a tool for shared writing and social publishing
at update/thread-viewer 383 lines 13 kB view raw
import { createClient } from "@supabase/supabase-js";
import { Database, Json } from "supabase/database.types";
import { IdResolver } from "@atproto/identity";
const idResolver = new IdResolver();
import { Firehose, MemoryRunner, Event } from "@atproto/sync";
import { ids } from "lexicons/api/lexicons";
import {
  PubLeafletDocument,
  PubLeafletGraphSubscription,
  PubLeafletPublication,
  PubLeafletComment,
  PubLeafletPollVote,
  PubLeafletPollDefinition,
  SiteStandardDocument,
  SiteStandardPublication,
  SiteStandardGraphSubscription,
} from "lexicons/api";
import {
  AppBskyEmbedExternal,
  AppBskyEmbedRecordWithMedia,
  AppBskyFeedPost,
  AppBskyRichtextFacet,
} from "@atproto/api";
import { AtUri } from "@atproto/syntax";
import { writeFile, readFile } from "fs/promises";
import { inngest } from "app/api/inngest/client";

// NOTE(review): nothing in this file reads or writes `cursorFile` (nor uses the
// fs/promises imports) — confirm whether cursor persistence is still intended.
const cursorFile = process.env.CURSOR_FILE || "/cursor/cursor";

let supabase = createClient<Database>(
  process.env.NEXT_PUBLIC_SUPABASE_API_URL as string,
  process.env.SUPABASE_SERVICE_ROLE_KEY as string,
);

/** Substring that marks an external-embed URL as a quote link we want to index. */
const QUOTE_PARAM = "/l-quote/";

/**
 * Events that carry a collection/uri (everything left once account, identity
 * and sync events have been filtered out).
 */
type CommitEvent = Exclude<
  Event,
  { event: "account" | "identity" | "sync" }
>;

/**
 * Builds the firehose subscription, starts it, and installs SIGINT/SIGTERM
 * handlers that tear the subscription and runner down before exiting.
 */
async function main(): Promise<void> {
  const runner = new MemoryRunner({});
  let firehose = new Firehose({
    service: "wss://relay1.us-west.bsky.network",
    subscriptionReconnectDelay: 3000,
    // NOTE(review): with excludeIdentity set, the identity branch in
    // handleEvent presumably never fires — confirm against @atproto/sync.
    excludeAccount: true,
    excludeIdentity: true,
    runner,
    idResolver,
    filterCollections: [
      ids.PubLeafletDocument,
      ids.PubLeafletPublication,
      ids.PubLeafletGraphSubscription,
      ids.PubLeafletComment,
      ids.PubLeafletPollVote,
      ids.PubLeafletPollDefinition,
      // ids.AppBskyActorProfile,
      "app.bsky.feed.post",
      ids.SiteStandardDocument,
      ids.SiteStandardPublication,
      ids.SiteStandardGraphSubscription,
    ],
    handleEvent,
    onError: (err) => {
      console.error(err);
    },
  });
  console.log("starting firehose consumer");
  // Deliberately not awaited so the signal handlers below are installed
  // right away.
  void firehose.start();

  let cleaningUp = false;
  const cleanup = async () => {
    // A second signal while shutdown is already in progress is ignored.
    if (cleaningUp) return;
    cleaningUp = true;
    console.log("shutting down firehose...");
    try {
      await firehose.destroy();
      await runner.destroy();
    } catch (err) {
      console.error("error while shutting down firehose:", err);
      process.exitCode = 1;
    } finally {
      // Always exit, even if teardown failed; exit code comes from
      // process.exitCode when it was set above.
      process.exit();
    }
  };

  process.on("SIGINT", cleanup);
  process.on("SIGTERM", cleanup);
}

main().catch((err: unknown) => {
  console.error("firehose consumer failed to start:", err);
  process.exit(1);
});

/**
 * Entry point handed to the firehose: routes one event to the handler for its
 * collection.
 *
 * Identity events update the stored handle; account and sync events are
 * ignored. Every non-post commit event is logged before being dispatched.
 *
 * @param evt - The event delivered by the firehose subscription.
 */
async function handleEvent(evt: Event): Promise<void> {
  if (evt.event === "identity") {
    if (evt.handle) {
      const { error } = await supabase
        .from("bsky_profiles")
        .update({ handle: evt.handle })
        .eq("did", evt.did);
      logDbError("Error updating profile handle:", error);
    }
    return;
  }
  if (evt.event === "account" || evt.event === "sync") return;

  // Posts are skipped here; every other filtered collection gets a log line.
  if (evt.collection !== "app.bsky.feed.post")
    console.log(
      `${evt.event} in ${evt.collection} ${evt.uri}: ${evt.seq} ${evt.time}`,
    );

  switch (evt.collection) {
    case ids.PubLeafletDocument:
      await handleLeafletDocument(evt);
      break;
    case ids.PubLeafletPublication:
    // site.standard.publication records go into the main "publications" table
    case ids.SiteStandardPublication:
      await handlePublication(evt);
      break;
    case ids.PubLeafletComment:
      await handleComment(evt);
      break;
    case ids.PubLeafletPollVote:
      await handlePollVote(evt);
      break;
    case ids.PubLeafletPollDefinition:
      await handlePollDefinition(evt);
      break;
    case ids.PubLeafletGraphSubscription:
    // site.standard.graph.subscription records go into the main "publication_subscriptions" table
    case ids.SiteStandardGraphSubscription:
      await handleSubscription(evt);
      break;
    // site.standard.document records go into the main "documents" table
    // The normalization layer handles reading both pub.leaflet and site.standard formats
    case ids.SiteStandardDocument:
      await handleStandardDocument(evt);
      break;
    // if (evt.collection === ids.AppBskyActorProfile) {
    //   //only listen to updates because we should fetch it for the first time when they subscribe!
    //   if (evt.event === "update") {
    //     await supabaseServerClient
    //       .from("bsky_profiles")
    //       .update({ record: evt.record as Json })
    //       .eq("did", evt.did);
    //   }
    // }
    case "app.bsky.feed.post":
      await handleBskyPost(evt);
      break;
  }
}

/** Logs a Supabase error with a context label; no-op when `error` is empty. */
function logDbError(context: string, error: unknown): void {
  if (error) console.log(context, error);
}

/** Makes sure an `identities` row exists for `did`. */
async function ensureIdentity(did: string): Promise<void> {
  const { error } = await supabase
    .from("identities")
    .upsert({ atp_did: did }, { onConflict: "atp_did" });
  logDbError("Error upserting identity:", error);
}

/** Removes the `documents` row for `uri`. */
async function deleteDocument(uri: string): Promise<void> {
  const { error } = await supabase.from("documents").delete().eq("uri", uri);
  logDbError("Error deleting document:", error);
}

/**
 * Points `documentUri` at `publicationUri` in `documents_in_publications` and
 * drops any rows linking the same document to a different publication.
 */
async function linkDocumentToPublication(
  documentUri: string,
  publicationUri: string,
): Promise<void> {
  const upsertResult = await supabase
    .from("documents_in_publications")
    .upsert({ publication: publicationUri, document: documentUri });
  // The stale-link cleanup runs whether or not the upsert succeeded.
  const deleteResult = await supabase
    .from("documents_in_publications")
    .delete()
    .neq("publication", publicationUri)
    .eq("document", documentUri);
  logDbError("Error linking document to publication:", upsertResult.error);
  logDbError("Error removing stale publication links:", deleteResult.error);
}

/**
 * Stores a validated document record and, when it names a publication, links
 * the two.
 *
 * @param uri - AT-URI of the document record.
 * @param data - The validated record, stored as-is.
 * @param publication - AT-URI of the publication the document claims, if any.
 * @param unauthorizedMessage - Logged when the publication's host differs
 *   from the document's host; the link is then skipped.
 */
async function indexDocument(
  uri: AtUri,
  data: Json,
  publication: string | undefined,
  unauthorizedMessage: string,
): Promise<void> {
  const documentUri = uri.toString();
  const { error } = await supabase
    .from("documents")
    .upsert({ uri: documentUri, data });
  logDbError("Error upserting document:", error);

  if (!publication) return;
  // Only link when the publication and the document share the same host.
  if (new AtUri(publication).host !== uri.host) {
    console.log(unauthorizedMessage);
    return;
  }
  await linkDocumentToPublication(documentUri, publication);
}

/** Handles create/update/delete of pub.leaflet.document records. */
async function handleLeafletDocument(evt: CommitEvent): Promise<void> {
  if (evt.event === "create" || evt.event === "update") {
    const record = PubLeafletDocument.validateRecord(evt.record);
    if (!record.success) {
      console.log(record.error);
      return;
    }
    await indexDocument(
      evt.uri,
      record.value as Json,
      record.value.publication,
      "Unauthorized to create post!",
    );
  }
  if (evt.event === "delete") await deleteDocument(evt.uri.toString());
}

/** Handles create/update/delete of site.standard.document records. */
async function handleStandardDocument(evt: CommitEvent): Promise<void> {
  if (evt.event === "create" || evt.event === "update") {
    const record = SiteStandardDocument.validateRecord(evt.record);
    if (!record.success) {
      console.log(record.error);
      return;
    }
    // site.standard.document uses "site" field to reference the publication
    // For documents in publications, site is an AT-URI (at://did:plc:xxx/site.standard.publication/rkey)
    // For standalone documents, site is an HTTPS URL (https://leaflet.pub/p/did:plc:xxx)
    // Only link to publications table for AT-URI sites
    const site = record.value.site;
    const publication = site && site.startsWith("at://") ? site : undefined;
    await indexDocument(
      evt.uri,
      record.value as Json,
      publication,
      "Unauthorized to create document in site!",
    );
  }
  if (evt.event === "delete") await deleteDocument(evt.uri.toString());
}

/**
 * Handles pub.leaflet.publication and site.standard.publication records; both
 * are written to the `publications` table.
 */
async function handlePublication(evt: CommitEvent): Promise<void> {
  if (evt.event === "create" || evt.event === "update") {
    const record =
      evt.collection === ids.PubLeafletPublication
        ? PubLeafletPublication.validateRecord(evt.record)
        : SiteStandardPublication.validateRecord(evt.record);
    if (!record.success) return;
    await ensureIdentity(evt.did);
    const { error } = await supabase.from("publications").upsert({
      uri: evt.uri.toString(),
      identity_did: evt.did,
      name: record.value.name,
      record: record.value as Json,
    });
    logDbError("Error upserting publication:", error);
  }
  if (evt.event === "delete") {
    const { error } = await supabase
      .from("publications")
      .delete()
      .eq("uri", evt.uri.toString());
    logDbError("Error deleting publication:", error);
  }
}

/** Handles create/update/delete of pub.leaflet.comment records. */
async function handleComment(evt: CommitEvent): Promise<void> {
  if (evt.event === "create" || evt.event === "update") {
    const record = PubLeafletComment.validateRecord(evt.record);
    if (!record.success) return;
    const { error } = await supabase.from("comments_on_documents").upsert({
      uri: evt.uri.toString(),
      profile: evt.did,
      document: record.value.subject,
      record: record.value as Json,
    });
    logDbError("Error upserting comment:", error);
  }
  if (evt.event === "delete") {
    const { error } = await supabase
      .from("comments_on_documents")
      .delete()
      .eq("uri", evt.uri.toString());
    logDbError("Error deleting comment:", error);
  }
}

/** Handles create/update/delete of pub.leaflet.poll.vote records. */
async function handlePollVote(evt: CommitEvent): Promise<void> {
  if (evt.event === "create" || evt.event === "update") {
    const record = PubLeafletPollVote.validateRecord(evt.record);
    if (!record.success) return;
    const { error } = await supabase.from("atp_poll_votes").upsert({
      uri: evt.uri.toString(),
      voter_did: evt.did,
      poll_uri: record.value.poll.uri,
      poll_cid: record.value.poll.cid,
      record: record.value as Json,
    });
    logDbError("Error upserting poll vote:", error);
  }
  if (evt.event === "delete") {
    const { error } = await supabase
      .from("atp_poll_votes")
      .delete()
      .eq("uri", evt.uri.toString());
    logDbError("Error deleting poll vote:", error);
  }
}

/** Handles create/update/delete of pub.leaflet.poll.definition records. */
async function handlePollDefinition(evt: CommitEvent): Promise<void> {
  if (evt.event === "create" || evt.event === "update") {
    const record = PubLeafletPollDefinition.validateRecord(evt.record);
    if (!record.success) return;
    const { error } = await supabase.from("atp_poll_records").upsert({
      uri: evt.uri.toString(),
      cid: evt.cid.toString(),
      record: record.value as Json,
    });
    logDbError("Error upserting poll definition:", error);
  }
  if (evt.event === "delete") {
    const { error } = await supabase
      .from("atp_poll_records")
      .delete()
      .eq("uri", evt.uri.toString());
    logDbError("Error deleting poll definition:", error);
  }
}

/**
 * Handles pub.leaflet.graph.subscription and site.standard.graph.subscription
 * records; both are written to the `publication_subscriptions` table.
 */
async function handleSubscription(evt: CommitEvent): Promise<void> {
  if (evt.event === "create" || evt.event === "update") {
    const record =
      evt.collection === ids.PubLeafletGraphSubscription
        ? PubLeafletGraphSubscription.validateRecord(evt.record)
        : SiteStandardGraphSubscription.validateRecord(evt.record);
    if (!record.success) return;
    await ensureIdentity(evt.did);
    const { error } = await supabase.from("publication_subscriptions").upsert({
      uri: evt.uri.toString(),
      identity: evt.did,
      publication: record.value.publication,
      record: record.value as Json,
    });
    logDbError("Error upserting publication subscription:", error);
  }
  if (evt.event === "delete") {
    const { error } = await supabase
      .from("publication_subscriptions")
      .delete()
      .eq("uri", evt.uri.toString());
    logDbError("Error deleting publication subscription:", error);
  }
}

/**
 * Walks `keys` through nested plain objects, returning `undefined` as soon as
 * a step is not an object. Used to peek into records that have not been
 * validated yet.
 */
function readPath(value: unknown, ...keys: string[]): unknown {
  let current = value;
  for (const key of keys) {
    if (current === null || typeof current !== "object") return undefined;
    current = (current as Record<string, unknown>)[key];
  }
  return current;
}

/**
 * Cheap pre-validation check: does the raw record carry an external embed
 * (directly or as record-with-media) whose uri contains QUOTE_PARAM?
 */
function mentionsQuoteParam(record: unknown): boolean {
  const candidates = [
    readPath(record, "embed", "external", "uri"),
    readPath(record, "embed", "media", "external", "uri"),
  ];
  return candidates.some(
    (uri) => typeof uri === "string" && uri.includes(QUOTE_PARAM),
  );
}

/**
 * Handles newly created app.bsky.feed.post records: when the post embeds a
 * quote link, sends an "appview/index-bsky-post-mention" event to inngest.
 * Updates and deletes of posts are ignored.
 */
async function handleBskyPost(evt: CommitEvent): Promise<void> {
  if (evt.event !== "create") return;

  // Probe the raw record first so full validation only runs on posts that
  // actually contain our quote param.
  if (!mentionsQuoteParam(evt.record)) return;
  console.log("FOUND EMBED!!!");

  // Now validate the record since we know it contains our quote param
  const record = AppBskyFeedPost.validateRecord(evt.record);
  if (!record.success) return;

  let embed: string | null = null;
  if (
    AppBskyEmbedExternal.isMain(record.value.embed) &&
    record.value.embed.external.uri.includes(QUOTE_PARAM)
  ) {
    embed = record.value.embed.external.uri;
  }
  if (
    AppBskyEmbedRecordWithMedia.isMain(record.value.embed) &&
    AppBskyEmbedExternal.isMain(record.value.embed.media) &&
    record.value.embed.media?.external?.uri.includes(QUOTE_PARAM)
  ) {
    embed = record.value.embed.media.external.uri;
  }
  if (embed) {
    console.log(
      "processing post mention: " + embed + " in " + evt.uri.toString(),
    );
    await inngest.send({
      name: "appview/index-bsky-post-mention",
      data: { post_uri: evt.uri.toString(), document_link: embed },
    });
  }
}