a tool for shared writing and social publishing
at update/reader 410 lines 14 kB view raw
import { createClient } from "@supabase/supabase-js";
import { Database, Json } from "supabase/database.types";
import { IdResolver } from "@atproto/identity";
import { Firehose, MemoryRunner, Event } from "@atproto/sync";
import { ids } from "lexicons/api/lexicons";
import {
  PubLeafletDocument,
  PubLeafletGraphSubscription,
  PubLeafletPublication,
  PubLeafletComment,
  PubLeafletPollVote,
  PubLeafletPollDefinition,
  PubLeafletInteractionsRecommend,
  SiteStandardDocument,
  SiteStandardPublication,
  SiteStandardGraphSubscription,
} from "lexicons/api";
import {
  AppBskyEmbedExternal,
  AppBskyEmbedRecordWithMedia,
  AppBskyFeedPost,
  AppBskyRichtextFacet,
} from "@atproto/api";
import { AtUri } from "@atproto/syntax";
import { writeFile, readFile } from "fs/promises";
import { inngest } from "app/api/inngest/client";

const idResolver = new IdResolver();

// NOTE(review): cursorFile (and the fs/promises imports) are not used anywhere
// in this file — presumably reserved for cursor persistence; confirm.
const cursorFile = process.env.CURSOR_FILE || "/cursor/cursor";

// Service-role Supabase client that every handler below writes through.
const supabase = createClient<Database>(
  process.env.NEXT_PUBLIC_SUPABASE_API_URL as string,
  process.env.SUPABASE_SERVICE_ROLE_KEY as string,
);

// URL fragment that marks an external embed as a quote link we want to index.
const QUOTE_PARAM = "/l-quote/";

/**
 * Builds the firehose consumer, starts it, and installs SIGINT/SIGTERM
 * handlers that tear the consumer down before exiting the process.
 */
async function main() {
  const runner = new MemoryRunner({});
  const firehose = new Firehose({
    service: "wss://relay1.us-west.bsky.network",
    subscriptionReconnectDelay: 3000,
    excludeAccount: true,
    excludeIdentity: true,
    runner,
    idResolver,
    filterCollections: [
      ids.PubLeafletDocument,
      ids.PubLeafletPublication,
      ids.PubLeafletGraphSubscription,
      ids.PubLeafletComment,
      ids.PubLeafletPollVote,
      ids.PubLeafletPollDefinition,
      ids.PubLeafletInteractionsRecommend,
      // ids.AppBskyActorProfile,
      "app.bsky.feed.post",
      ids.SiteStandardDocument,
      ids.SiteStandardPublication,
      ids.SiteStandardGraphSubscription,
    ],
    handleEvent,
    onError: (err) => {
      console.error(err);
    },
  });
  console.log("starting firehose consumer");
  // Not awaited, so the signal handlers below are registered right away
  // (presumably start() runs until the firehose is destroyed — confirm).
  firehose.start();

  let cleaningUp = false;
  const cleanup = async () => {
    // Both signals share this handler; only the first one does the teardown.
    if (cleaningUp) return;
    cleaningUp = true;
    console.log("shutting down firehose...");
    try {
      await firehose.destroy();
      await runner.destroy();
    } catch (err) {
      console.error(err);
    } finally {
      // Always exit, even if teardown failed; otherwise the process would
      // ignore every later signal because cleaningUp is already set.
      process.exit();
    }
  };

  process.on("SIGINT", cleanup);
  process.on("SIGTERM", cleanup);
}

main().catch((err) => {
  console.error(err);
  process.exit(1);
});

/** Logs a database error with a short context label; no-op when `error` is falsy. */
function logDbError(context: string, error: unknown): void {
  if (error) console.log(context, error);
}

/** Upserts a row in "identities" for `did`, keyed on the atp_did column. */
async function ensureIdentity(did: string): Promise<void> {
  const { error } = await supabase
    .from("identities")
    .upsert({ atp_did: did }, { onConflict: "atp_did" });
  logDbError("Error upserting identity:", error);
}

/**
 * Records that `documentUri` belongs to `publicationUri` and removes any rows
 * linking the same document to a different publication.
 */
async function linkDocumentToPublication(
  documentUri: string,
  publicationUri: string,
): Promise<void> {
  const linkResult = await supabase
    .from("documents_in_publications")
    .upsert({
      publication: publicationUri,
      document: documentUri,
    });
  const unlinkResult = await supabase
    .from("documents_in_publications")
    .delete()
    .neq("publication", publicationUri)
    .eq("document", documentUri);

  if (linkResult.error) console.log(linkResult.error);
  logDbError("Error removing stale publication links:", unlinkResult.error);
}

/**
 * Cheap pre-validation check: does an (unvalidated) post embed carry an
 * external link containing QUOTE_PARAM, either directly or under `media`?
 * Non-string `uri` values are treated as "no match" instead of throwing.
 */
function hasQuoteLink(embed: unknown): boolean {
  const candidate = embed as
    | {
        external?: { uri?: unknown };
        media?: { external?: { uri?: unknown } };
      }
    | null
    | undefined;
  const uris = [candidate?.external?.uri, candidate?.media?.external?.uri];
  return uris.some(
    (uri) => typeof uri === "string" && uri.includes(QUOTE_PARAM),
  );
}

/**
 * Firehose callback: mirrors create/update/delete events for the filtered
 * collections into Supabase tables, and forwards Bluesky posts that embed a
 * quote link to inngest for indexing.
 *
 * Database errors are logged, not thrown. Records that fail lexicon
 * validation are skipped.
 */
async function handleEvent(evt: Event): Promise<void> {
  if (evt.event === "identity") {
    // NOTE(review): the Firehose is configured with excludeIdentity: true, so
    // this branch presumably never runs with the current options — confirm.
    if (evt.handle) {
      const { error } = await supabase
        .from("bsky_profiles")
        .update({ handle: evt.handle })
        .eq("did", evt.did);
      logDbError("Error updating profile handle:", error);
    }
  }
  if (
    evt.event === "account" ||
    evt.event === "identity" ||
    evt.event === "sync"
  )
    return;

  // Posts are far too frequent to log individually.
  if (evt.collection !== "app.bsky.feed.post")
    console.log(
      `${evt.event} in ${evt.collection} ${evt.uri}: ${evt.seq} ${evt.time}`,
    );

  if (evt.collection === ids.PubLeafletDocument) {
    if (evt.event === "create" || evt.event === "update") {
      const record = PubLeafletDocument.validateRecord(evt.record);
      if (!record.success) {
        console.log(record.error);
        return;
      }
      const docResult = await supabase.from("documents").upsert({
        uri: evt.uri.toString(),
        data: record.value as Json,
      });
      if (docResult.error) console.log(docResult.error);

      // NOTE(review): when an update drops the publication reference, existing
      // documents_in_publications rows for this document are left in place —
      // confirm whether they should be removed.
      if (record.value.publication) {
        const publicationURI = new AtUri(record.value.publication);

        // Only the repo that owns the publication may add documents to it.
        if (publicationURI.host !== evt.uri.host) {
          console.log("Unauthorized to create post!");
          return;
        }
        await linkDocumentToPublication(
          evt.uri.toString(),
          record.value.publication,
        );
      }
    }
    if (evt.event === "delete") {
      const { error } = await supabase
        .from("documents")
        .delete()
        .eq("uri", evt.uri.toString());
      logDbError("Error deleting document:", error);
    }
  }

  if (evt.collection === ids.PubLeafletPublication) {
    if (evt.event === "create" || evt.event === "update") {
      const record = PubLeafletPublication.validateRecord(evt.record);
      if (!record.success) return;
      await ensureIdentity(evt.did);
      const { error } = await supabase.from("publications").upsert({
        uri: evt.uri.toString(),
        identity_did: evt.did,
        name: record.value.name,
        record: record.value as Json,
      });
      logDbError("Error upserting publication:", error);
    }
    if (evt.event === "delete") {
      const { error } = await supabase
        .from("publications")
        .delete()
        .eq("uri", evt.uri.toString());
      logDbError("Error deleting publication:", error);
    }
  }

  if (evt.collection === ids.PubLeafletComment) {
    if (evt.event === "create" || evt.event === "update") {
      const record = PubLeafletComment.validateRecord(evt.record);
      if (!record.success) return;
      const { error } = await supabase.from("comments_on_documents").upsert({
        uri: evt.uri.toString(),
        profile: evt.did,
        document: record.value.subject,
        record: record.value as Json,
      });
      logDbError("Error upserting comment:", error);
    }
    if (evt.event === "delete") {
      const { error } = await supabase
        .from("comments_on_documents")
        .delete()
        .eq("uri", evt.uri.toString());
      logDbError("Error deleting comment:", error);
    }
  }

  if (evt.collection === ids.PubLeafletPollVote) {
    if (evt.event === "create" || evt.event === "update") {
      const record = PubLeafletPollVote.validateRecord(evt.record);
      if (!record.success) return;
      const { error } = await supabase.from("atp_poll_votes").upsert({
        uri: evt.uri.toString(),
        voter_did: evt.did,
        poll_uri: record.value.poll.uri,
        poll_cid: record.value.poll.cid,
        record: record.value as Json,
      });
      logDbError("Error upserting poll vote:", error);
    }
    if (evt.event === "delete") {
      const { error } = await supabase
        .from("atp_poll_votes")
        .delete()
        .eq("uri", evt.uri.toString());
      logDbError("Error deleting poll vote:", error);
    }
  }

  if (evt.collection === ids.PubLeafletPollDefinition) {
    if (evt.event === "create" || evt.event === "update") {
      const record = PubLeafletPollDefinition.validateRecord(evt.record);
      if (!record.success) return;
      const { error } = await supabase.from("atp_poll_records").upsert({
        uri: evt.uri.toString(),
        cid: evt.cid.toString(),
        record: record.value as Json,
      });
      logDbError("Error upserting poll definition:", error);
    }
    if (evt.event === "delete") {
      const { error } = await supabase
        .from("atp_poll_records")
        .delete()
        .eq("uri", evt.uri.toString());
      logDbError("Error deleting poll definition:", error);
    }
  }

  if (evt.collection === ids.PubLeafletInteractionsRecommend) {
    if (evt.event === "create" || evt.event === "update") {
      const record = PubLeafletInteractionsRecommend.validateRecord(evt.record);
      if (!record.success) return;
      await ensureIdentity(evt.did);
      const { error } = await supabase.from("recommends_on_documents").upsert({
        uri: evt.uri.toString(),
        recommender_did: evt.did,
        document: record.value.subject,
        record: record.value as Json,
      });
      logDbError("Error upserting recommend:", error);
    }
    if (evt.event === "delete") {
      const { error } = await supabase
        .from("recommends_on_documents")
        .delete()
        .eq("uri", evt.uri.toString());
      logDbError("Error deleting recommend:", error);
    }
  }

  if (evt.collection === ids.PubLeafletGraphSubscription) {
    if (evt.event === "create" || evt.event === "update") {
      const record = PubLeafletGraphSubscription.validateRecord(evt.record);
      if (!record.success) return;
      await ensureIdentity(evt.did);
      const { error } = await supabase.from("publication_subscriptions").upsert({
        uri: evt.uri.toString(),
        identity: evt.did,
        publication: record.value.publication,
        record: record.value as Json,
      });
      logDbError("Error upserting publication subscription:", error);
    }
    if (evt.event === "delete") {
      const { error } = await supabase
        .from("publication_subscriptions")
        .delete()
        .eq("uri", evt.uri.toString());
      logDbError("Error deleting publication subscription:", error);
    }
  }

  // site.standard.document records go into the main "documents" table
  // The normalization layer handles reading both pub.leaflet and site.standard formats
  if (evt.collection === ids.SiteStandardDocument) {
    if (evt.event === "create" || evt.event === "update") {
      const record = SiteStandardDocument.validateRecord(evt.record);
      if (!record.success) {
        console.log(record.error);
        return;
      }
      const docResult = await supabase.from("documents").upsert({
        uri: evt.uri.toString(),
        data: record.value as Json,
      });
      if (docResult.error) console.log(docResult.error);

      // site.standard.document uses "site" field to reference the publication
      // For documents in publications, site is an AT-URI (at://did:plc:xxx/site.standard.publication/rkey)
      // For standalone documents, site is an HTTPS URL (https://leaflet.pub/p/did:plc:xxx)
      // Only link to publications table for AT-URI sites
      if (record.value.site && record.value.site.startsWith("at://")) {
        const siteURI = new AtUri(record.value.site);

        // Only the repo that owns the site may add documents to it.
        if (siteURI.host !== evt.uri.host) {
          console.log("Unauthorized to create document in site!");
          return;
        }
        await linkDocumentToPublication(evt.uri.toString(), record.value.site);
      }
    }
    if (evt.event === "delete") {
      const { error } = await supabase
        .from("documents")
        .delete()
        .eq("uri", evt.uri.toString());
      logDbError("Error deleting document:", error);
    }
  }

  // site.standard.publication records go into the main "publications" table
  if (evt.collection === ids.SiteStandardPublication) {
    if (evt.event === "create" || evt.event === "update") {
      const record = SiteStandardPublication.validateRecord(evt.record);
      if (!record.success) return;
      await ensureIdentity(evt.did);
      const { error } = await supabase.from("publications").upsert({
        uri: evt.uri.toString(),
        identity_did: evt.did,
        name: record.value.name,
        record: record.value as Json,
      });
      logDbError("Error upserting publication:", error);
    }
    if (evt.event === "delete") {
      const { error } = await supabase
        .from("publications")
        .delete()
        .eq("uri", evt.uri.toString());
      logDbError("Error deleting publication:", error);
    }
  }

  // site.standard.graph.subscription records go into the main "publication_subscriptions" table
  if (evt.collection === ids.SiteStandardGraphSubscription) {
    if (evt.event === "create" || evt.event === "update") {
      const record = SiteStandardGraphSubscription.validateRecord(evt.record);
      if (!record.success) return;
      await ensureIdentity(evt.did);
      const { error } = await supabase.from("publication_subscriptions").upsert({
        uri: evt.uri.toString(),
        identity: evt.did,
        publication: record.value.publication,
        record: record.value as Json,
      });
      logDbError("Error upserting publication subscription:", error);
    }
    if (evt.event === "delete") {
      const { error } = await supabase
        .from("publication_subscriptions")
        .delete()
        .eq("uri", evt.uri.toString());
      logDbError("Error deleting publication subscription:", error);
    }
  }

  // if (evt.collection === ids.AppBskyActorProfile) {
  //   //only listen to updates because we should fetch it for the first time when they subscribe!
  //   if (evt.event === "update") {
  //     await supabaseServerClient
  //       .from("bsky_profiles")
  //       .update({ record: evt.record as Json })
  //       .eq("did", evt.did);
  //   }
  // }

  if (evt.collection === "app.bsky.feed.post") {
    if (evt.event !== "create") return;

    // Early exit if no embed
    if (
      !evt.record ||
      typeof evt.record !== "object" ||
      !("embed" in evt.record)
    )
      return;

    // Cheap structural check before paying for full lexicon validation.
    if (!hasQuoteLink((evt.record as { embed?: unknown }).embed)) return;
    console.log("FOUND EMBED!!!");

    // Now validate the record since we know it contains our quote param
    const record = AppBskyFeedPost.validateRecord(evt.record);
    if (!record.success) {
      console.log(record.error);
      return;
    }

    let embed: string | null = null;
    if (
      AppBskyEmbedExternal.isMain(record.value.embed) &&
      record.value.embed.external.uri.includes(QUOTE_PARAM)
    ) {
      embed = record.value.embed.external.uri;
    }
    if (
      AppBskyEmbedRecordWithMedia.isMain(record.value.embed) &&
      AppBskyEmbedExternal.isMain(record.value.embed.media) &&
      record.value.embed.media?.external?.uri.includes(QUOTE_PARAM)
    ) {
      embed = record.value.embed.media.external.uri;
    }
    if (embed) {
      console.log(
        "processing post mention: " + embed + " in " + evt.uri.toString(),
      );
      await inngest.send({
        name: "appview/index-bsky-post-mention",
        data: { post_uri: evt.uri.toString(), document_link: embed },
      });
    }
  }
}