a tool for shared writing and social publishing
1import { createClient } from "@supabase/supabase-js"; 2import { Database, Json } from "supabase/database.types"; 3import { IdResolver } from "@atproto/identity"; 4const idResolver = new IdResolver(); 5import { Firehose, MemoryRunner, Event } from "@atproto/sync"; 6import { ids } from "lexicons/api/lexicons"; 7import { 8 PubLeafletDocument, 9 PubLeafletGraphSubscription, 10 PubLeafletPublication, 11 PubLeafletComment, 12 PubLeafletPollVote, 13 PubLeafletPollDefinition, 14} from "lexicons/api"; 15import { 16 AppBskyEmbedExternal, 17 AppBskyEmbedRecordWithMedia, 18 AppBskyFeedPost, 19 AppBskyRichtextFacet, 20} from "@atproto/api"; 21import { AtUri } from "@atproto/syntax"; 22import { writeFile, readFile } from "fs/promises"; 23import { createIdentity } from "actions/createIdentity"; 24import { drizzle } from "drizzle-orm/node-postgres"; 25import { inngest } from "app/api/inngest/client"; 26import { Client } from "pg"; 27 28const cursorFile = process.env.CURSOR_FILE || "/cursor/cursor"; 29 30let supabase = createClient<Database>( 31 process.env.NEXT_PUBLIC_SUPABASE_API_URL as string, 32 process.env.SUPABASE_SERVICE_ROLE_KEY as string, 33); 34const QUOTE_PARAM = "/l-quote/"; 35async function main() { 36 const runner = new MemoryRunner({}); 37 let firehose = new Firehose({ 38 service: "wss://relay1.us-west.bsky.network", 39 subscriptionReconnectDelay: 3000, 40 excludeAccount: true, 41 excludeIdentity: true, 42 runner, 43 idResolver, 44 filterCollections: [ 45 ids.PubLeafletDocument, 46 ids.PubLeafletPublication, 47 ids.PubLeafletGraphSubscription, 48 ids.PubLeafletComment, 49 ids.PubLeafletPollVote, 50 ids.PubLeafletPollDefinition, 51 // ids.AppBskyActorProfile, 52 "app.bsky.feed.post", 53 ], 54 handleEvent, 55 onError: (err) => { 56 console.error(err); 57 }, 58 }); 59 console.log("starting firehose consumer"); 60 firehose.start(); 61 let cleaningUp = false; 62 const cleanup = async () => { 63 if (cleaningUp) return; 64 cleaningUp = true; 65 console.log("shutting down firehose..."); 66 await firehose.destroy(); 67 await runner.destroy(); 68 process.exit(); 69 }; 70 71 process.on("SIGINT", cleanup); 72 process.on("SIGTERM", cleanup); 73} 74 75main(); 76 77async function handleEvent(evt: Event) { 78 if (evt.event === "identity") { 79 if (evt.handle) 80 await supabase 81 .from("bsky_profiles") 82 .update({ handle: evt.handle }) 83 .eq("did", evt.did); 84 } 85 if ( 86 evt.event == "account" || 87 evt.event === "identity" || 88 evt.event === "sync" 89 ) 90 return; 91 if (evt.collection !== "app.bsky.feed.post") 92 console.log( 93 `${evt.event} in ${evt.collection} ${evt.uri}: ${evt.seq} ${evt.time}`, 94 ); 95 if (evt.collection === ids.PubLeafletDocument) { 96 if (evt.event === "create" || evt.event === "update") { 97 let record = PubLeafletDocument.validateRecord(evt.record); 98 if (!record.success) { 99 console.log(record.error); 100 return; 101 } 102 let docResult = await supabase.from("documents").upsert({ 103 uri: evt.uri.toString(), 104 data: record.value as Json, 105 }); 106 if (docResult.error) console.log(docResult.error); 107 let publicationURI = new AtUri(record.value.publication); 108 109 if (publicationURI.host !== evt.uri.host) { 110 console.log("Unauthorized to create post!"); 111 return; 112 } 113 let docInPublicationResult = await supabase 114 .from("documents_in_publications") 115 .upsert({ 116 publication: record.value.publication, 117 document: evt.uri.toString(), 118 }); 119 await supabase 120 .from("documents_in_publications") 121 .delete() 122 .neq("publication", record.value.publication) 123 .eq("document", evt.uri.toString()); 124 if (docInPublicationResult.error) 125 console.log(docInPublicationResult.error); 126 } 127 if (evt.event === "delete") { 128 await supabase.from("documents").delete().eq("uri", evt.uri.toString()); 129 } 130 } 131 if (evt.collection === ids.PubLeafletPublication) { 132 if (evt.event === "create" || evt.event === "update") { 133 let record = PubLeafletPublication.validateRecord(evt.record); 134 if (!record.success) return; 135 let { error } = await supabase.from("publications").upsert({ 136 uri: evt.uri.toString(), 137 identity_did: evt.did, 138 name: record.value.name, 139 record: record.value as Json, 140 }); 141 142 if (error && error.code === "23503") { 143 console.log("creating identity"); 144 let client = new Client({ connectionString: process.env.DB_URL }); 145 let db = drizzle(client); 146 await createIdentity(db, { atp_did: evt.did }); 147 client.end(); 148 await supabase.from("publications").upsert({ 149 uri: evt.uri.toString(), 150 identity_did: evt.did, 151 name: record.value.name, 152 record: record.value as Json, 153 }); 154 } 155 } 156 if (evt.event === "delete") { 157 await supabase 158 .from("publications") 159 .delete() 160 .eq("uri", evt.uri.toString()); 161 } 162 } 163 if (evt.collection === ids.PubLeafletComment) { 164 if (evt.event === "create" || evt.event === "update") { 165 let record = PubLeafletComment.validateRecord(evt.record); 166 if (!record.success) return; 167 let { error } = await supabase.from("comments_on_documents").upsert({ 168 uri: evt.uri.toString(), 169 profile: evt.did, 170 document: record.value.subject, 171 record: record.value as Json, 172 }); 173 } 174 if (evt.event === "delete") { 175 await supabase 176 .from("comments_on_documents") 177 .delete() 178 .eq("uri", evt.uri.toString()); 179 } 180 } 181 if (evt.collection === ids.PubLeafletPollVote) { 182 if (evt.event === "create" || evt.event === "update") { 183 let record = PubLeafletPollVote.validateRecord(evt.record); 184 if (!record.success) return; 185 let { error } = await supabase.from("atp_poll_votes").upsert({ 186 uri: evt.uri.toString(), 187 voter_did: evt.did, 188 poll_uri: record.value.poll.uri, 189 poll_cid: record.value.poll.cid, 190 record: record.value as Json, 191 }); 192 } 193 if (evt.event === "delete") { 194 await supabase 195 .from("atp_poll_votes") 196 .delete() 197 .eq("uri", evt.uri.toString()); 198 } 199 } 200 if (evt.collection === ids.PubLeafletPollDefinition) { 201 if (evt.event === "create" || evt.event === "update") { 202 let record = PubLeafletPollDefinition.validateRecord(evt.record); 203 if (!record.success) return; 204 let { error } = await supabase.from("atp_poll_records").upsert({ 205 uri: evt.uri.toString(), 206 cid: evt.cid.toString(), 207 record: record.value as Json, 208 }); 209 if (error) console.log("Error upserting poll definition:", error); 210 } 211 if (evt.event === "delete") { 212 await supabase 213 .from("atp_poll_records") 214 .delete() 215 .eq("uri", evt.uri.toString()); 216 } 217 } 218 if (evt.collection === ids.PubLeafletGraphSubscription) { 219 if (evt.event === "create" || evt.event === "update") { 220 let record = PubLeafletGraphSubscription.validateRecord(evt.record); 221 if (!record.success) return; 222 let { error } = await supabase.from("publication_subscriptions").upsert({ 223 uri: evt.uri.toString(), 224 identity: evt.did, 225 publication: record.value.publication, 226 record: record.value as Json, 227 }); 228 if (error && error.code === "23503") { 229 console.log("creating identity"); 230 let client = new Client({ connectionString: process.env.DB_URL }); 231 let db = drizzle(client); 232 await createIdentity(db, { atp_did: evt.did }); 233 client.end(); 234 await supabase.from("publication_subscriptions").upsert({ 235 uri: evt.uri.toString(), 236 identity: evt.did, 237 publication: record.value.publication, 238 record: record.value as Json, 239 }); 240 } 241 } 242 if (evt.event === "delete") { 243 await supabase 244 .from("publication_subscriptions") 245 .delete() 246 .eq("uri", evt.uri.toString()); 247 } 248 } 249 // if (evt.collection === ids.AppBskyActorProfile) { 250 // //only listen to updates because we should fetch it for the first time when they subscribe! 251 // if (evt.event === "update") { 252 // await supabaseServerClient 253 // .from("bsky_profiles") 254 // .update({ record: evt.record as Json }) 255 // .eq("did", evt.did); 256 // } 257 // } 258 if (evt.collection === "app.bsky.feed.post") { 259 if (evt.event !== "create") return; 260 261 // Early exit if no embed 262 if ( 263 !evt.record || 264 typeof evt.record !== "object" || 265 !("embed" in evt.record) 266 ) 267 return; 268 269 // Check if embed contains our quote param using optional chaining 270 const embedRecord = evt.record as any; 271 const hasQuoteParam = 272 embedRecord.embed?.external?.uri?.includes(QUOTE_PARAM) || 273 embedRecord.embed?.media?.external?.uri?.includes(QUOTE_PARAM); 274 275 if (!hasQuoteParam) return; 276 console.log("FOUND EMBED!!!"); 277 278 // Now validate the record since we know it contains our quote param 279 let record = AppBskyFeedPost.validateRecord(evt.record); 280 if (!record.success) return; 281 282 let embed: string | null = null; 283 if ( 284 AppBskyEmbedExternal.isMain(record.value.embed) && 285 record.value.embed.external.uri.includes(QUOTE_PARAM) 286 ) { 287 embed = record.value.embed.external.uri; 288 } 289 if ( 290 AppBskyEmbedRecordWithMedia.isMain(record.value.embed) && 291 AppBskyEmbedExternal.isMain(record.value.embed.media) && 292 record.value.embed.media?.external?.uri.includes(QUOTE_PARAM) 293 ) { 294 embed = record.value.embed.media.external.uri; 295 } 296 if (embed) { 297 console.log( 298 "processing post mention: " + embed + " in " + evt.uri.toString(), 299 ); 300 await inngest.send({ 301 name: "appview/index-bsky-post-mention", 302 data: { post_uri: evt.uri.toString(), document_link: embed }, 303 }); 304 } 305 } 306}