A basic notification system for AT Protocol (atproto) events, delivered via ntfy.
1import {
2 Client,
3 ClientResponse,
4 FailedClientResponse,
5 simpleFetchHandler,
6} from "@atcute/client";
7import { JetstreamSubscription } from "@atcute/jetstream";
8import {
9 CanonicalResourceUri,
10 Did,
11 parseCanonicalResourceUri,
12 ParsedCanonicalResourceUri,
13 RecordKey,
14} from "@atcute/lexicons";
15
16import { AppBskyFeedPost } from "@atcute/bluesky";
17import {
18 ProfileViewDetailed,
19 VerificationView,
20} from "@atcute/bluesky/types/app/actor/defs";
21import {
22 ShTangledFeedStar,
23 ShTangledRepoIssue,
24 ShTangledRepoIssueComment,
25} from "@atcute/tangled";
26import {
27 CompositeDidDocumentResolver,
28 PlcDidDocumentResolver,
29 WebDidDocumentResolver,
30} from "@atcute/identity-resolver";
31import { AtprotoDid } from "@atcute/lexicons/syntax";
32import { XRPCProcedures, XRPCQueries } from "@atcute/lexicons/ambient";
33
/**
 * Reads environment variable `name`, falling back to `fallback` when the
 * variable is unset or empty.
 */
const envOr = (name: string, fallback: string): string =>
  process.env[name] || fallback;

/** DID of the account being watched; records that mention it trigger notifications. */
const TARGET_DID = envOr("TARGET_DID", "did:plc:3c6vkaq7xf5kz3va3muptjh5") as Did;

/** Jetstream WebSocket endpoint the subscription connects to. */
const JETSTREAM_URL = envOr(
  "JETSTREAM_URL",
  "wss://jetstream2.us-east.bsky.network/subscribe",
);
/** URL that notifications are POSTed to. */
const NTFY_URL = envOr("NTFY_URL", "http://0.0.0.0");
/** Base URL for Bluesky links attached to notifications. */
const BSKY_URL = envOr("BSKY_URL", "https://bsky.app");
/** Base URL for PDSls links attached to notifications. */
const PDSLS_URL = envOr("PDSLS_URL", "https://pdsls.dev");
/** Base URL for Tangled links attached to notifications. */
const TANGLED_URL = envOr("TANGLED_URL", "https://tangled.sh");
44
/** Default time-to-live for cache entries: 60 minutes, in milliseconds. */
const CACHE_LIFETIME = 60 * 60 * 1000;

/** A cached value together with the time it was stored and how long it stays valid. */
type CacheEntry = { value: unknown; timestamp: number; lifetime: number };

/** Completed lookups, keyed by caller-chosen cache key. */
const cache = new Map<string, CacheEntry>();

/** Lookups currently being fetched, so concurrent callers can share one request. */
const inFlight = new Map<string, Promise<unknown>>();

/**
 * Returns the cached value for `key` while it is still fresh; otherwise runs
 * `fetcher`, stores its result and returns it.
 *
 * Concurrent calls for the same key share a single `fetcher` invocation
 * instead of each starting their own request. A rejected fetch is never
 * stored, so the next call retries.
 *
 * @param key - Cache key; callers are responsible for keeping keys unique.
 * @param fetcher - Produces the value when no fresh entry exists.
 * @param lifetime - Time-to-live in milliseconds; defaults to `CACHE_LIFETIME`.
 * @returns The cached or freshly fetched value.
 */
const getWithCache = async <T>(
  key: string,
  fetcher: () => Promise<T>,
  lifetime?: number,
): Promise<T> => {
  const cached = cache.get(key);
  if (cached && Date.now() - cached.timestamp < cached.lifetime) {
    return cached.value as T;
  }

  const pending = inFlight.get(key);
  if (pending) {
    return pending as Promise<T>;
  }

  const request = (async (): Promise<T> => {
    const value = await fetcher();
    cache.set(key, {
      value,
      // Stamp on completion so a slow fetch does not eat into the lifetime.
      timestamp: Date.now(),
      lifetime: lifetime ?? CACHE_LIFETIME,
    });
    return value;
  })();

  inFlight.set(key, request);
  // Registered after `set`, so the cleanup can never run before the entry
  // exists; handling both outcomes keeps this side chain from rejecting.
  const release = (): void => {
    if (inFlight.get(key) === request) inFlight.delete(key);
  };
  request.then(release, release);

  return request;
};
72
// One resolver per supported DID method.
const plcDocResolver = new PlcDidDocumentResolver();
const webDocResolver = new WebDidDocumentResolver();

/** Resolves DID documents, dispatching to the resolver registered for the DID's method. */
const docResolver = new CompositeDidDocumentResolver({
  methods: { plc: plcDocResolver, web: webDocResolver },
});

const publicBskyHandler = simpleFetchHandler({
  service: "https://public.api.bsky.app",
});

/** XRPC client pointed at the public Bluesky API service. */
const bskyClient = new Client({ handler: publicBskyHandler });
/**
 * Fetches a record with `com.atproto.repo.getRecord` from the PDS listed in
 * the DID document of `uri.repo`.
 *
 * Successful responses are cached for 24x `CACHE_LIFETIME`. Failures are never
 * cached, so a transient outage does not hide the record for a day.
 *
 * This function does not reject. Any failure (DID resolution, missing PDS
 * endpoint, network error, error response) is returned as a failed response
 * whose `data` carries an `error` string, so callers can rely on `ok` or on
 * `"error" in data`.
 *
 * @param uri - Parsed AT URI of the record to fetch.
 * @returns The getRecord response, or a failed response describing the problem.
 */
const clientGetRecord = async (
  uri: ParsedCanonicalResourceUri,
): Promise<
  ClientResponse<
    XRPCQueries["com.atproto.repo.getRecord"],
    XRPCQueries["com.atproto.repo.getRecord"]
  >
> => {
  // Holds the PDS's own error response so it can be returned verbatim after
  // the fetcher throws to keep it out of the cache.
  const outcome: { pdsFailure?: FailedClientResponse } = {};

  try {
    return await getWithCache(
      // Separators keep different (repo, collection, rkey) triples from
      // concatenating to the same key.
      `record_${uri.repo}/${uri.collection}/${uri.rkey}`,
      async () => {
        const doc = await docResolver.resolve(uri.repo as AtprotoDid);
        const pdsService = doc.service?.find(
          (s) =>
            s.id === "#atproto_pds" && s.type === "AtprotoPersonalDataServer",
        );
        const pdsEndpoint = pdsService?.serviceEndpoint;
        if (!pdsEndpoint || typeof pdsEndpoint !== "string") {
          throw new Error("No PDS service endpoint found");
        }

        const client = new Client({
          handler: simpleFetchHandler({ service: pdsEndpoint }),
        });
        const response = await client.get("com.atproto.repo.getRecord", {
          params: uri,
        });
        if (!response.ok) {
          outcome.pdsFailure = response;
          throw new Error("getRecord returned an error response");
        }
        return response;
      },
      CACHE_LIFETIME * 24,
    );
  } catch (err) {
    if (outcome.pdsFailure) return outcome.pdsFailure;

    return {
      ok: false,
      data: {
        error: "RecordFetchFailed",
        message: err instanceof Error ? err.message : String(err),
      },
    } as FailedClientResponse;
  }
};
115
/**
 * Picks the identifier shown for a profile: its handle, or its DID when the
 * handle is the placeholder "handle.invalid".
 */
const getId = (profile: ProfileViewDetailed) => {
  if (profile.handle === "handle.invalid") {
    return profile.did;
  }
  return profile.handle;
};
119
/**
 * Loads the Bluesky profile for `did` through `app.bsky.actor.getProfile`.
 *
 * Successful lookups are cached for 4x `CACHE_LIFETIME`. When the lookup fails
 * (error response or thrown network error) the failure is logged and a
 * placeholder profile with handle "handle.invalid" is returned. The
 * placeholder is not cached, so the next call retries.
 *
 * @param did - DID of the account to look up.
 * @returns The profile, or the placeholder profile on failure; never rejects.
 */
const getProfile = async (did: Did): Promise<ProfileViewDetailed> => {
  try {
    return await getWithCache(
      "bskyProfile_" + did,
      async () => {
        const response = await bskyClient.get("app.bsky.actor.getProfile", {
          params: {
            actor: did,
          },
        });

        // Throwing keeps the failure out of the cache.
        if (!response.ok) {
          throw new Error(
            `getProfile returned an error: ${response.data.error}`,
          );
        }

        return response.data;
      },
      CACHE_LIFETIME * 4,
    );
  } catch (err) {
    console.error(`Failed to load profile for ${did}:`, err);

    return {
      $type: "app.bsky.actor.defs#profileViewDetailed",
      did: did,
      handle: "handle.invalid",
      displayName: "silent error!",
    } as ProfileViewDetailed;
  }
};
145
/** Placeholder returned when a repository record cannot be loaded. */
const errorRepo = {
  name: "Repository not found",
};

/**
 * Loads the repository record behind `uri` and returns its name.
 *
 * Returns `errorRepo` when the URI does not parse, the record cannot be
 * fetched, or the record has no string `name`. Failures are logged and not
 * cached, so a later call retries.
 *
 * @param uri - AT URI of the repository record.
 * @returns The repository name, or the `errorRepo` placeholder; never rejects.
 */
const getTangledRepo = async (
  uri: CanonicalResourceUri,
): Promise<{ name: string }> => {
  try {
    const parsed = parseCanonicalResourceUri(uri);
    if (!parsed.ok) return errorRepo;

    // Prefixed so a repo entry can never be served for another kind of
    // lookup made with the same URI.
    return await getWithCache("tangledRepo_" + uri, async () => {
      const response = await clientGetRecord(parsed.value);

      // Check `ok` rather than probing `data`: a failed response is not
      // guaranteed to carry an `error` field, and has no `value` to read.
      if (!response.ok) {
        throw new Error("record could not be fetched");
      }

      const record = response.data.value as { name?: unknown } | undefined;
      if (typeof record?.name !== "string") {
        throw new Error("record has no name");
      }

      return { name: record.name };
    });
  } catch (err) {
    console.error(`Could not load Tangled repo ${uri}:`, err);
    return errorRepo;
  }
};
164
/**
 * Placeholder returned when an issue record cannot be loaded. `repo` is a
 * syntactically valid URI that points at nothing.
 */
const errorIssue = {
  title: "Issue not found",
  repo: "at://did:web:fake/nope.nada/nada" as CanonicalResourceUri,
};

/**
 * Loads the issue record behind `uri` and returns its title and the URI of
 * the repository it belongs to.
 *
 * Returns `errorIssue` when the URI does not parse, the record cannot be
 * fetched, or the record has no string `title`. An issue with a title but no
 * usable `repo` keeps its title and gets the placeholder repo URI. Failures
 * are logged and not cached, so a later call retries.
 *
 * @param uri - AT URI of the issue record.
 * @returns The issue title and repo URI, or the `errorIssue` placeholder;
 *   never rejects.
 */
const getTangledIssue = async (
  uri: CanonicalResourceUri,
): Promise<{ title: string; repo: CanonicalResourceUri }> => {
  try {
    const parsed = parseCanonicalResourceUri(uri);
    if (!parsed.ok) return errorIssue;

    // Prefixed so an issue entry can never be served for another kind of
    // lookup made with the same URI.
    return await getWithCache("tangledIssue_" + uri, async () => {
      const response = await clientGetRecord(parsed.value);

      // Check `ok` rather than probing `data`: a failed response is not
      // guaranteed to carry an `error` field, and has no `value` to read.
      if (!response.ok) {
        throw new Error("record could not be fetched");
      }

      const record = response.data.value as
        | { title?: unknown; repo?: unknown }
        | undefined;
      if (typeof record?.title !== "string") {
        throw new Error("record has no title");
      }

      return {
        title: record.title,
        repo:
          typeof record.repo === "string"
            ? (record.repo as CanonicalResourceUri)
            : errorIssue.repo,
      };
    });
  } catch (err) {
    console.error(`Could not load Tangled issue ${uri}:`, err);
    return errorIssue;
  }
};
187
/**
 * Publishes one notification by POSTing to `NTFY_URL`.
 *
 * The message goes in the request body; the other fields are sent as the
 * `Title`, `Icon`, `Priority`, `Click` and `Attach` headers. Delivery is best
 * effort: a non-2xx response or a network failure is logged and the returned
 * promise still resolves.
 *
 * @param args.title - Notification title.
 * @param args.icon - URL of the icon to show.
 * @param args.message - Notification body text.
 * @param args.url - URL sent in the `Click` header.
 * @param args.priority - Priority sent in the `Priority` header; defaults to 3.
 * @param args.picture - URL sent in the `Attach` header.
 */
const sendNotification = async (args: {
  title?: string;
  icon?: `${string}:${string}` | undefined;
  message?: string;
  url?: string;
  priority?: number;
  picture?: string | undefined;
}) => {
  try {
    const res = await fetch(NTFY_URL, {
      method: "POST",
      headers: {
        Title: args.title ?? "",
        Icon: args.icon ?? "",
        Priority: args.priority?.toString() ?? "3",
        Click: args.url ?? "",
        Attach: args.picture ?? "",
      },
      body: args.message ?? null,
    });

    // A fetch Response has no `error` member; `ok` is what signals an HTTP
    // failure status.
    if (!res.ok) {
      const detail = await res.text().catch(() => "");
      console.error(
        `ntfy responded with ${res.status} ${res.statusText}: ${detail}`,
      );
    }
  } catch (err) {
    // Callers fire notifications without handling rejections, so a network
    // failure is logged here instead of rejecting.
    console.error("Failed to send notification:", err);
  }
};
212
/**
 * Record collections requested from Jetstream. A collection listed here
 * without an entry in `notificationHandlers` is received but produces no
 * notification.
 */
const wantedCollections = [
  "app.bsky.feed.post",
  // Bluesky follow records live under `graph`, not `feed`.
  "app.bsky.graph.follow",
  "app.bsky.graph.verification",
  "sh.tangled.graph.follow",
  "sh.tangled.feed.star",
  "sh.tangled.repo.issue",
  "sh.tangled.repo.issue.comment",
  "sh.tangled.repo.issue.state",
];

/**
 * Builds and sends the notification for one newly created record.
 *
 * @param did - DID of the account that created the record.
 * @param rkey - Record key of the created record.
 * @param record - The record body; each handler narrows it to its own
 *   collection's shape.
 */
type NotificationHandler = (
  did: Did,
  rkey: RecordKey,
  // eslint-disable-next-line @typescript-eslint/no-explicit-any -- narrowed per handler
  record: any,
) => Promise<void>;

/**
 * Notification handlers keyed by record collection. Each handler resolves
 * once its notification has been handed to `sendNotification`.
 */
const notificationHandlers: { [collection: string]: NotificationHandler } = {
  "app.bsky.feed.post": async (did, rkey, record: AppBskyFeedPost.Main) => {
    const embedLabels: Record<string, string> = {
      "app.bsky.embed.external": "External Link",
      "app.bsky.embed.images": "Image",
      "app.bsky.embed.record": "Record",
      "app.bsky.embed.recordWithMedia": "Record with Media",
      "app.bsky.embed.video": "Video",
    };

    const profile = await getProfile(did);

    // A reply whose parent or root belongs to the target is a reply to them;
    // anything else that reached this handler is reported as a mention.
    const isReply =
      record.reply?.parent.uri.includes(TARGET_DID) ||
      record.reply?.root.uri.includes(TARGET_DID);
    const typeOfPost = isReply ? "replied" : "mentioned you";

    let message = `${getId(profile)} ${typeOfPost}: ${record.text}`;
    if (record.embed) {
      // Embed types missing from the table get a generic label instead of
      // rendering "[undefined]".
      const label = embedLabels[record.embed.$type] ?? "Embed";
      message += (record.text.length > 0 ? " " : "") + `[${label}]`;
    }

    await sendNotification({
      title: "Bluesky",
      icon: profile.avatar,
      message,
      url: `${BSKY_URL}/profile/${profile.did}/post/${rkey}`,
    });
  },
  "app.bsky.graph.follow": async (did) => {
    const profile = await getProfile(did);

    await sendNotification({
      title: "Bluesky",
      icon: profile.avatar,
      message: `${getId(profile)} followed you`,
      url: `${BSKY_URL}/profile/${profile.did}`,
      priority: 2,
    });
  },
  "app.bsky.graph.verification": async (did, rkey) => {
    const profile = await getProfile(did);

    await sendNotification({
      title: "Bluesky",
      icon: profile.avatar,
      message: `${getId(profile)} verified you`,
      // The verification record carries no `uri` field, so the link is built
      // from the issuer's DID and the record key.
      url: `${PDSLS_URL}/at://${did}/app.bsky.graph.verification/${rkey}`,
      priority: 2,
    });
  },
  "sh.tangled.graph.follow": async (did) => {
    const profile = await getProfile(did);

    await sendNotification({
      title: "Tangled",
      icon: profile.avatar,
      message: `${getId(profile)} followed you`,
      url: `${TANGLED_URL}/@${profile.did}`,
    });
  },
  "sh.tangled.feed.star": async (did, rkey, record: ShTangledFeedStar.Main) => {
    // The two lookups are independent, so run them concurrently.
    const [profile, repo] = await Promise.all([
      getProfile(did),
      getTangledRepo(record.subject as CanonicalResourceUri),
    ]);

    await sendNotification({
      title: "Tangled",
      icon: profile.avatar,
      message: `${getId(profile)} starred ${repo.name}`,
      url: `${TANGLED_URL}/@${profile.did}`,
      priority: 2,
    });
  },
  "sh.tangled.repo.issue": async (
    did,
    rkey,
    record: ShTangledRepoIssue.Main,
  ) => {
    const [profile, repo] = await Promise.all([
      getProfile(did),
      getTangledRepo(record.repo as CanonicalResourceUri),
    ]);

    // Leave the body off rather than printing "undefined" when it is absent.
    const bodySuffix = record.body ? `: ${record.body}` : "";

    await sendNotification({
      title: "Tangled",
      icon: profile.avatar,
      message: `${getId(profile)} opened an issue, "${record.title}", on ${repo.name}${bodySuffix}`,
      url: `${TANGLED_URL}`,
    });
  },
  "sh.tangled.repo.issue.comment": async (
    did,
    rkey,
    record: ShTangledRepoIssueComment.Main,
  ) => {
    const [profile, issue] = await Promise.all([
      getProfile(did),
      getTangledIssue(record.issue as CanonicalResourceUri),
    ]);
    // The repo lookup needs the issue's `repo` URI, so it runs afterwards.
    const repo = await getTangledRepo(issue.repo as CanonicalResourceUri);

    await sendNotification({
      title: "Tangled",
      icon: profile.avatar,
      message: `${getId(profile)} commented on issue "${issue.title}", on ${repo.name}: ${record.body}`,
      url: `${TANGLED_URL}/@${profile.did}`,
    });
  },
};
341
/**
 * Subscribes to Jetstream and dispatches a notification handler for every
 * newly created record that mentions `TARGET_DID` and was written by someone
 * else.
 *
 * Handlers run without blocking the event loop of the subscription. A handler
 * that fails is logged and does not stop the stream.
 */
async function main() {
  console.log("Started notification server.");

  const subscription = new JetstreamSubscription({
    url: JETSTREAM_URL,
    wantedCollections: wantedCollections,
  });

  for await (const event of subscription) {
    // Only commits made by other accounts are of interest.
    if (event.kind !== "commit" || event.did === TARGET_DID) continue;

    const commit = event.commit;
    if (
      commit.operation !== "create" ||
      !wantedCollections.includes(commit.collection)
    ) {
      continue;
    }

    // Cheap relevance filter: the target's DID must appear somewhere in the
    // serialized record.
    const record = commit.record;
    if (!JSON.stringify(record).includes(TARGET_DID)) continue;

    const handler = notificationHandlers[commit.collection];
    if (!handler) continue;

    // Fire and forget, but always with a rejection handler attached so one
    // failing handler cannot surface as an unhandled rejection.
    void Promise.resolve()
      .then(() => handler(event.did, commit.rkey, record))
      .catch((err: unknown) => {
        console.error(
          `Notification handler for ${commit.collection} failed:`,
          err,
        );
      });
  }
}

main().catch((err: unknown) => {
  console.error("Notification server stopped:", err);
  process.exitCode = 1;
});