// Basic notification system for AT Protocol (atproto) events, delivered through ntfy.
import {
  Client,
  ClientResponse,
  FailedClientResponse,
  simpleFetchHandler,
} from "@atcute/client";
import { JetstreamSubscription } from "@atcute/jetstream";
import {
  CanonicalResourceUri,
  Did,
  parseCanonicalResourceUri,
  ParsedCanonicalResourceUri,
  RecordKey,
} from "@atcute/lexicons";

import { AppBskyFeedPost } from "@atcute/bluesky";
import {
  ProfileViewDetailed,
  VerificationView,
} from "@atcute/bluesky/types/app/actor/defs";
import {
  ShTangledFeedStar,
  ShTangledRepoIssue,
  ShTangledRepoIssueComment,
} from "@atcute/tangled";
import {
  CompositeDidDocumentResolver,
  PlcDidDocumentResolver,
  WebDidDocumentResolver,
} from "@atcute/identity-resolver";
import { AtprotoDid } from "@atcute/lexicons/syntax";
import { XRPCProcedures, XRPCQueries } from "@atcute/lexicons/ambient";

/** DID of the account whose interactions should produce notifications. */
const TARGET_DID = (process.env.TARGET_DID ||
  "did:plc:3c6vkaq7xf5kz3va3muptjh5") as Did;

const JETSTREAM_URL =
  process.env.JETSTREAM_URL ||
  "wss://jetstream2.us-east.bsky.network/subscribe";
const NTFY_URL = process.env.NTFY_URL || "http://0.0.0.0";
const BSKY_URL = process.env.BSKY_URL || "https://bsky.app";
const PDSLS_URL = process.env.PDSLS_URL || "https://pdsls.dev";
const TANGLED_URL = process.env.TANGLED_URL || "https://tangled.sh";

const CACHE_LIFETIME = 60 * 60 * 1000; // 60 minutes in milliseconds

/** In-memory cache shared by every lookup helper below; entries are never evicted. */
const cache = new Map<
  string,
  { value: unknown; timestamp: number; lifetime: number }
>();

/**
 * Returns the cached value for `key` while it is still fresh, otherwise runs
 * `fetcher` and stores its result.
 *
 * @param key - Cache key; callers are responsible for namespacing it.
 * @param fetcher - Produces the value on a cache miss. If it throws, nothing is stored.
 * @param lifetime - Freshness window in milliseconds (defaults to `CACHE_LIFETIME`).
 * @param shouldCache - Optional predicate; return `false` to hand the value back
 *   without storing it (used so failures are retried on the next call).
 * @returns The cached or freshly fetched value.
 */
const getWithCache = async <T>(
  key: string,
  fetcher: () => Promise<T>,
  lifetime?: number,
  shouldCache: (value: T) => boolean = () => true,
): Promise<T> => {
  const cached = cache.get(key);
  const now = Date.now();

  if (cached && now - cached.timestamp < cached.lifetime) {
    return cached.value as T;
  }

  const value = await fetcher();
  if (shouldCache(value)) {
    cache.set(key, {
      value,
      timestamp: now,
      lifetime: lifetime ?? CACHE_LIFETIME,
    });
  }
  return value;
};

const docResolver = new CompositeDidDocumentResolver({
  methods: {
    plc: new PlcDidDocumentResolver(),
    web: new WebDidDocumentResolver(),
  },
});

const bskyClient = new Client({
  handler: simpleFetchHandler({ service: "https://public.api.bsky.app" }),
});

/**
 * Fetches a record from the PDS listed in the repo owner's DID document.
 *
 * Never throws: any error raised while resolving the DID or fetching is
 * returned as `{ ok: false, data: err }`. Only successful responses are cached,
 * so a transient failure is retried on the next call.
 *
 * @param uri - Parsed `at://` URI of the record to fetch.
 */
const clientGetRecord = async (
  uri: ParsedCanonicalResourceUri,
): Promise<
  ClientResponse<
    XRPCQueries["com.atproto.repo.getRecord"],
    XRPCQueries["com.atproto.repo.getRecord"]
  >
> => {
  return getWithCache(
    // Separators keep different (repo, collection, rkey) triples from
    // concatenating to the same key.
    `record_at://${uri.repo}/${uri.collection}/${uri.rkey}`,
    async () => {
      try {
        const doc = await docResolver.resolve(uri.repo as AtprotoDid);
        const atprotoPdsService = doc.service?.find(
          (s) =>
            s.id === "#atproto_pds" && s.type === "AtprotoPersonalDataServer",
        );
        const pdsServiceEndpoint = atprotoPdsService?.serviceEndpoint;
        if (!pdsServiceEndpoint || typeof pdsServiceEndpoint !== "string") {
          throw new Error("No PDS service endpoint found");
        }
        const client = new Client({
          handler: simpleFetchHandler({ service: pdsServiceEndpoint }),
        });
        return await client.get("com.atproto.repo.getRecord", { params: uri });
      } catch (err) {
        return { ok: false, data: err } as FailedClientResponse;
      }
    },
    CACHE_LIFETIME * 24,
    (response) => response.ok,
  );
};

/** Type guard: `value` is an object whose `key` property is a string. */
const hasStringField = <K extends string>(
  value: unknown,
  key: K,
): value is Record<K, string> =>
  typeof value === "object" &&
  value !== null &&
  typeof (value as Record<string, unknown>)[key] === "string";

/** Returns the profile's handle, or its DID when the handle is `handle.invalid`. */
const getId = (profile: ProfileViewDetailed) => {
  return profile.handle !== "handle.invalid" ? profile.handle : profile.did;
};

/**
 * Looks up a Bluesky profile through the public AppView.
 *
 * Never throws: when the lookup fails a placeholder profile carrying only the
 * DID is returned. The placeholder is not cached, so the next call retries.
 *
 * @param did - DID of the account to look up.
 */
const getProfile = async (did: Did): Promise<ProfileViewDetailed> => {
  const fallback = {
    $type: "app.bsky.actor.defs#profileViewDetailed",
    did: did,
    handle: "handle.invalid",
    displayName: "silent error!",
  } as ProfileViewDetailed;

  return getWithCache(
    "bskyProfile_" + did,
    async () => {
      try {
        const response = await bskyClient.get("app.bsky.actor.getProfile", {
          params: {
            actor: did,
          },
        });
        return response.ok ? response.data : fallback;
      } catch (err) {
        console.error(`Failed to fetch profile for ${did}:`, err);
        return fallback;
      }
    },
    CACHE_LIFETIME * 4,
    (profile) => profile !== fallback,
  );
};

const errorRepo = {
  name: "Repository not found",
};

/**
 * Resolves a Tangled repo record to its name.
 *
 * Returns `errorRepo` when the URI is malformed, the fetch fails, or the record
 * has no string `name`. Caching is handled by `clientGetRecord`.
 *
 * @param uri - `at://` URI of the repo record.
 */
const getTangledRepo = async (
  uri: CanonicalResourceUri,
): Promise<{ name: string }> => {
  const res = parseCanonicalResourceUri(uri);
  if (!res.ok) return errorRepo;

  const response = await clientGetRecord(res.value);
  // Branch on `ok`: a failed response may carry a thrown Error as `data`,
  // which has no `value` to inspect.
  if (!response.ok) return errorRepo;

  const value: unknown = response.data.value;
  return hasStringField(value, "name") ? value : errorRepo;
};

const errorIssue = {
  title: "Repository not found",
  repo: "at://did:web:fake/nope.nada/nada" as CanonicalResourceUri,
};

/**
 * Resolves a Tangled issue record to its title and the URI of its repo.
 *
 * Returns `errorIssue` when the URI is malformed, the fetch fails, or the record
 * lacks a string `title` or `repo`. Caching is handled by `clientGetRecord`.
 *
 * @param uri - `at://` URI of the issue record.
 */
const getTangledIssue = async (
  uri: CanonicalResourceUri,
): Promise<{ title: string; repo: CanonicalResourceUri }> => {
  const res = parseCanonicalResourceUri(uri);
  if (!res.ok) return errorIssue;

  const response = await clientGetRecord(res.value);
  if (!response.ok) return errorIssue;

  const value: unknown = response.data.value;
  if (!hasStringField(value, "title") || !hasStringField(value, "repo")) {
    return errorIssue;
  }

  return {
    title: value.title,
    repo: value.repo as CanonicalResourceUri,
  };
};

/**
 * Publishes one notification by POSTing to `NTFY_URL`.
 *
 * Never throws: network errors and non-2xx responses are logged, so a failing
 * ntfy server cannot take the event loop down.
 *
 * @param args - Notification fields. `message` is sent as the request body;
 *   the rest are sent as request headers, and only when they are set.
 */
const sendNotification = async (args: {
  title?: string;
  icon?: `${string}:${string}` | undefined;
  message?: string;
  url?: string;
  priority?: number;
  picture?: string | undefined;
}) => {
  const headers: Record<string, string> = {
    Priority: args.priority?.toString() ?? "3",
  };
  if (args.title) headers.Title = args.title;
  if (args.icon) headers.Icon = args.icon;
  if (args.url) headers.Click = args.url;
  if (args.picture) headers.Attach = args.picture;

  try {
    const res = await fetch(NTFY_URL, {
      method: "POST",
      headers,
      body: args.message ?? null,
    });

    if (!res.ok) {
      console.error(
        `ntfy responded with ${res.status} ${res.statusText}: ${await res.text()}`,
      );
    }
  } catch (err) {
    console.error("Failed to send notification:", err);
  }
};

/** Collections requested from Jetstream. */
const wantedCollections = [
  "app.bsky.feed.post",
  // Bluesky follow records live under `graph`, not `feed`.
  "app.bsky.graph.follow",
  "app.bsky.graph.verification",
  "sh.tangled.graph.follow",
  "sh.tangled.feed.star",
  "sh.tangled.repo.issue",
  "sh.tangled.repo.issue.comment",
  // Subscribed, but no handler is registered for this collection.
  "sh.tangled.repo.issue.state",
];

/**
 * Handles one newly created record.
 *
 * `record` is `any` because each handler narrows it to its own lexicon type in
 * its parameter list; the payload arrives unvalidated from Jetstream.
 */
type NotificationHandler = (
  did: Did,
  rkey: RecordKey,
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  record: any,
) => Promise<void>;

/** Human-readable labels for post embeds, keyed by embed `$type`. */
const embedLabels: Record<string, string> = {
  "app.bsky.embed.external": "External Link",
  "app.bsky.embed.images": "Image",
  "app.bsky.embed.record": "Record",
  "app.bsky.embed.recordWithMedia": "Record with Media",
  "app.bsky.embed.video": "Video",
};

/** Notification handlers keyed by collection NSID. */
const notificationHandlers: Record<string, NotificationHandler> = {
  "app.bsky.feed.post": async (did, rkey, record: AppBskyFeedPost.Main) => {
    const profile = await getProfile(did);

    const typeOfPost =
      record.reply?.parent.uri.includes(TARGET_DID) ||
      record.reply?.root.uri.includes(TARGET_DID)
        ? "replied"
        : "mentioned you";

    // Fall back to a generic label for embed types not listed above.
    const embedSuffix = record.embed
      ? (record.text.length > 0 ? " " : "") +
        `[${embedLabels[record.embed.$type] ?? "Embed"}]`
      : "";

    await sendNotification({
      title: "Bluesky",
      icon: profile.avatar,
      message: `${getId(profile)} ${typeOfPost}: ${record.text}` + embedSuffix,
      url: `${BSKY_URL}/profile/${profile.did}/post/${rkey}`,
    });
  },
  "app.bsky.graph.follow": async (did) => {
    const profile = await getProfile(did);

    await sendNotification({
      title: "Bluesky",
      icon: profile.avatar,
      message: `${getId(profile)} followed you`,
      url: `${BSKY_URL}/profile/${profile.did}`,
      priority: 2,
    });
  },
  "app.bsky.graph.verification": async (did, rkey) => {
    const profile = await getProfile(did);

    await sendNotification({
      title: "Bluesky",
      icon: profile.avatar,
      message: `${getId(profile)} verified you`,
      // The record carries no `uri` of its own, so address it by
      // issuer DID, collection and record key.
      url: `${PDSLS_URL}/at://${did}/app.bsky.graph.verification/${rkey}`,
      priority: 2,
    });
  },
  "sh.tangled.graph.follow": async (did) => {
    const profile = await getProfile(did);

    await sendNotification({
      title: "Tangled",
      icon: profile.avatar,
      message: `${getId(profile)} followed you`,
      url: `${TANGLED_URL}/@${profile.did}`,
    });
  },
  "sh.tangled.feed.star": async (did, rkey, record: ShTangledFeedStar.Main) => {
    const profile = await getProfile(did);
    const repo = await getTangledRepo(record.subject as CanonicalResourceUri);

    await sendNotification({
      title: "Tangled",
      icon: profile.avatar,
      message: `${getId(profile)} starred ${repo.name}`,
      url: `${TANGLED_URL}/@${profile.did}`,
      priority: 2,
    });
  },
  "sh.tangled.repo.issue": async (
    did,
    rkey,
    record: ShTangledRepoIssue.Main,
  ) => {
    const profile = await getProfile(did);
    const repo = await getTangledRepo(record.repo as CanonicalResourceUri);

    await sendNotification({
      title: "Tangled",
      icon: profile.avatar,
      message: `${getId(profile)} opened an issue, "${record.title}", on ${repo.name}: ${record.body}`,
      url: `${TANGLED_URL}`,
    });
  },
  "sh.tangled.repo.issue.comment": async (
    did,
    rkey,
    record: ShTangledRepoIssueComment.Main,
  ) => {
    const profile = await getProfile(did);
    const issue = await getTangledIssue(record.issue as CanonicalResourceUri);
    const repo = await getTangledRepo(issue.repo as CanonicalResourceUri);

    await sendNotification({
      title: "Tangled",
      icon: profile.avatar,
      message: `${getId(profile)} commented on issue "${issue.title}", on ${repo.name}: ${record.body}`,
      url: `${TANGLED_URL}/@${profile.did}`,
    });
  },
};

/**
 * Subscribes to Jetstream and dispatches a handler for every record that is
 * created by someone other than `TARGET_DID` and mentions `TARGET_DID`
 * anywhere in its JSON.
 */
async function main() {
  console.log("Started notification server.");

  const subscription = new JetstreamSubscription({
    url: JETSTREAM_URL,
    wantedCollections: wantedCollections,
  });

  for await (const event of subscription) {
    if (event.did === TARGET_DID || event.kind !== "commit") continue;

    const commit = event.commit;
    if (
      commit.operation !== "create" ||
      !wantedCollections.includes(commit.collection)
    ) {
      continue;
    }

    const record = commit.record;
    if (!JSON.stringify(record).includes(TARGET_DID)) continue;

    const handler = notificationHandlers[commit.collection];
    if (!handler) continue;

    // Deliberately not awaited, so a slow lookup does not stall the stream;
    // the catch keeps a failing handler from becoming an unhandled rejection.
    void handler(event.did, commit.rkey, record).catch((err: unknown) => {
      console.error(`Handler for ${commit.collection} failed:`, err);
    });
  }
}

main().catch((err: unknown) => {
  console.error("Notification server stopped:", err);
  process.exitCode = 1;
});