// A tool for shared writing and social publishing.
1import { createClient } from "@supabase/supabase-js";
2import { Database, Json } from "supabase/database.types";
3import { IdResolver } from "@atproto/identity";
4const idResolver = new IdResolver();
5import { Firehose, MemoryRunner, Event } from "@atproto/sync";
6import { ids } from "lexicons/api/lexicons";
7import {
8 PubLeafletDocument,
9 PubLeafletGraphSubscription,
10 PubLeafletPublication,
11 PubLeafletComment,
12 PubLeafletPollVote,
13 PubLeafletPollDefinition,
14 PubLeafletInteractionsRecommend,
15 SiteStandardDocument,
16 SiteStandardPublication,
17 SiteStandardGraphSubscription,
18} from "lexicons/api";
19import {
20 AppBskyEmbedExternal,
21 AppBskyEmbedRecordWithMedia,
22 AppBskyFeedPost,
23 AppBskyRichtextFacet,
24} from "@atproto/api";
25import { AtUri } from "@atproto/syntax";
26import { writeFile, readFile } from "fs/promises";
27import { inngest } from "app/api/inngest/client";
28
// Marker looked for inside Bluesky external-embed URIs to detect links that
// quote a document.
const QUOTE_PARAM = "/l-quote/";

// Path read from CURSOR_FILE, falling back to "/cursor/cursor" when the
// variable is unset or empty.
// NOTE(review): not referenced anywhere else in the visible code — presumably
// reserved for persisting the firehose cursor; confirm before removing.
const cursorFile = process.env.CURSOR_FILE || "/cursor/cursor";

// Typed Supabase client built from the API URL and the service-role key found
// in the environment. Both variables are asserted to be strings without a
// runtime check.
const supabase = createClient<Database>(
  process.env.NEXT_PUBLIC_SUPABASE_API_URL as string,
  process.env.SUPABASE_SERVICE_ROLE_KEY as string,
);
/**
 * Starts the firehose consumer.
 *
 * Builds a Firehose subscription limited to the record collections this
 * service indexes, hands every event to `handleEvent`, and registers
 * SIGINT/SIGTERM handlers that tear the subscription down before exiting.
 */
async function main() {
  // Record collections the relay subscription is filtered to.
  const watchedCollections = [
    ids.PubLeafletDocument,
    ids.PubLeafletPublication,
    ids.PubLeafletGraphSubscription,
    ids.PubLeafletComment,
    ids.PubLeafletPollVote,
    ids.PubLeafletPollDefinition,
    ids.PubLeafletInteractionsRecommend,
    // ids.AppBskyActorProfile,
    "app.bsky.feed.post",
    ids.SiteStandardDocument,
    ids.SiteStandardPublication,
    ids.SiteStandardGraphSubscription,
  ];

  const eventRunner = new MemoryRunner({});
  const consumer = new Firehose({
    service: "wss://relay1.us-west.bsky.network",
    subscriptionReconnectDelay: 3000,
    excludeAccount: true,
    excludeIdentity: true,
    runner: eventRunner,
    idResolver,
    filterCollections: watchedCollections,
    handleEvent,
    onError: (err) => console.error(err),
  });

  console.log("starting firehose consumer");
  consumer.start();

  // Guards against running the teardown twice when several signals arrive.
  let shuttingDown = false;

  async function shutdown() {
    if (shuttingDown) return;
    shuttingDown = true;
    console.log("shutting down firehose...");
    await consumer.destroy();
    await eventRunner.destroy();
    process.exit();
  }

  for (const signal of ["SIGINT", "SIGTERM"] as const) {
    process.on(signal, shutdown);
  }
}

void main();
81
/**
 * Logs a failed database call together with a short context label.
 * Does nothing when `error` is falsy.
 */
function logDbError(context: string, error: unknown): void {
  if (error) console.log(context, error);
}

/**
 * Returns true only when `uri` is a string containing QUOTE_PARAM.
 *
 * Used on records that have not been validated yet, so the value may be of
 * any type; calling `.includes` on a non-string would otherwise throw.
 */
function isQuoteLink(uri: unknown): boolean {
  return typeof uri === "string" && uri.includes(QUOTE_PARAM);
}

/**
 * Applies a single firehose event to the database.
 *
 * - identity events: update the stored handle in `bsky_profiles`.
 * - account / sync events: ignored.
 * - create / update / delete of the watched collections: validate the record
 *   with the matching lexicon validator and upsert or delete the row keyed by
 *   the record URI.
 * - `app.bsky.feed.post` creates whose external embed URI contains
 *   QUOTE_PARAM: send an "appview/index-bsky-post-mention" event to inngest.
 *
 * Records that fail validation are skipped. Database errors are logged and do
 * not abort processing of the event.
 *
 * @param evt Event delivered by the firehose subscription.
 */
async function handleEvent(evt: Event): Promise<void> {
  if (evt.event === "identity") {
    // NOTE(review): main() sets excludeIdentity: true, so this branch is
    // presumably not reached by that subscription — confirm before removing.
    if (evt.handle) {
      const { error } = await supabase
        .from("bsky_profiles")
        .update({ handle: evt.handle })
        .eq("did", evt.did);
      logDbError("Error updating profile handle:", error);
    }
    return;
  }
  if (evt.event === "account" || evt.event === "sync") return;

  // From here on the event is a record create/update/delete.
  const uri = evt.uri.toString();

  // Posts are far too frequent to log individually.
  if (evt.collection !== "app.bsky.feed.post")
    console.log(
      `${evt.event} in ${evt.collection} ${evt.uri}: ${evt.seq} ${evt.time}`,
    );

  if (evt.collection === ids.PubLeafletDocument) {
    if (evt.event === "create" || evt.event === "update") {
      const record = PubLeafletDocument.validateRecord(evt.record);
      if (!record.success) {
        console.log(record.error);
        return;
      }
      const docResult = await supabase.from("documents").upsert({
        uri,
        data: record.value as Json,
      });
      if (docResult.error) console.log(docResult.error);
      if (record.value.publication) {
        const publicationURI = new AtUri(record.value.publication);

        // A document may only be attached to a publication that lives in the
        // same repo (same URI host) as the document itself.
        if (publicationURI.host !== evt.uri.host) {
          console.log("Unauthorized to create post!");
          return;
        }
        const docInPublicationResult = await supabase
          .from("documents_in_publications")
          .upsert({
            publication: record.value.publication,
            document: uri,
          });
        // Drop links from this document to any other publication.
        const staleLinksResult = await supabase
          .from("documents_in_publications")
          .delete()
          .neq("publication", record.value.publication)
          .eq("document", uri);

        if (docInPublicationResult.error)
          console.log(docInPublicationResult.error);
        logDbError(
          "Error removing stale publication links:",
          staleLinksResult.error,
        );
      }
    }
    if (evt.event === "delete") {
      const { error } = await supabase
        .from("documents")
        .delete()
        .eq("uri", uri);
      logDbError("Error deleting document:", error);
    }
  }

  if (evt.collection === ids.PubLeafletPublication) {
    if (evt.event === "create" || evt.event === "update") {
      const record = PubLeafletPublication.validateRecord(evt.record);
      if (!record.success) return;
      // Make sure an identity row exists for the author before writing the
      // publication that references it.
      const identityResult = await supabase
        .from("identities")
        .upsert({ atp_did: evt.did }, { onConflict: "atp_did" });
      logDbError("Error upserting identity:", identityResult.error);
      const publicationResult = await supabase.from("publications").upsert({
        uri,
        identity_did: evt.did,
        name: record.value.name,
        record: record.value as Json,
      });
      logDbError("Error upserting publication:", publicationResult.error);
    }
    if (evt.event === "delete") {
      const { error } = await supabase
        .from("publications")
        .delete()
        .eq("uri", uri);
      logDbError("Error deleting publication:", error);
    }
  }

  if (evt.collection === ids.PubLeafletComment) {
    if (evt.event === "create" || evt.event === "update") {
      const record = PubLeafletComment.validateRecord(evt.record);
      if (!record.success) return;
      const { error } = await supabase.from("comments_on_documents").upsert({
        uri,
        profile: evt.did,
        document: record.value.subject,
        record: record.value as Json,
      });
      logDbError("Error upserting comment:", error);
    }
    if (evt.event === "delete") {
      const { error } = await supabase
        .from("comments_on_documents")
        .delete()
        .eq("uri", uri);
      logDbError("Error deleting comment:", error);
    }
  }

  if (evt.collection === ids.PubLeafletPollVote) {
    if (evt.event === "create" || evt.event === "update") {
      const record = PubLeafletPollVote.validateRecord(evt.record);
      if (!record.success) return;
      const { error } = await supabase.from("atp_poll_votes").upsert({
        uri,
        voter_did: evt.did,
        poll_uri: record.value.poll.uri,
        poll_cid: record.value.poll.cid,
        record: record.value as Json,
      });
      logDbError("Error upserting poll vote:", error);
    }
    if (evt.event === "delete") {
      const { error } = await supabase
        .from("atp_poll_votes")
        .delete()
        .eq("uri", uri);
      logDbError("Error deleting poll vote:", error);
    }
  }

  if (evt.collection === ids.PubLeafletPollDefinition) {
    if (evt.event === "create" || evt.event === "update") {
      const record = PubLeafletPollDefinition.validateRecord(evt.record);
      if (!record.success) return;
      const { error } = await supabase.from("atp_poll_records").upsert({
        uri,
        cid: evt.cid.toString(),
        record: record.value as Json,
      });
      logDbError("Error upserting poll definition:", error);
    }
    if (evt.event === "delete") {
      const { error } = await supabase
        .from("atp_poll_records")
        .delete()
        .eq("uri", uri);
      logDbError("Error deleting poll definition:", error);
    }
  }

  if (evt.collection === ids.PubLeafletInteractionsRecommend) {
    if (evt.event === "create" || evt.event === "update") {
      const record = PubLeafletInteractionsRecommend.validateRecord(evt.record);
      if (!record.success) return;
      const identityResult = await supabase
        .from("identities")
        .upsert({ atp_did: evt.did }, { onConflict: "atp_did" });
      logDbError("Error upserting identity:", identityResult.error);
      const recommendResult = await supabase
        .from("recommends_on_documents")
        .upsert({
          uri,
          recommender_did: evt.did,
          document: record.value.subject,
          record: record.value as Json,
        });
      logDbError("Error upserting recommend:", recommendResult.error);
    }
    if (evt.event === "delete") {
      const { error } = await supabase
        .from("recommends_on_documents")
        .delete()
        .eq("uri", uri);
      logDbError("Error deleting recommend:", error);
    }
  }

  if (evt.collection === ids.PubLeafletGraphSubscription) {
    if (evt.event === "create" || evt.event === "update") {
      const record = PubLeafletGraphSubscription.validateRecord(evt.record);
      if (!record.success) return;
      const identityResult = await supabase
        .from("identities")
        .upsert({ atp_did: evt.did }, { onConflict: "atp_did" });
      logDbError("Error upserting identity:", identityResult.error);
      const subscriptionResult = await supabase
        .from("publication_subscriptions")
        .upsert({
          uri,
          identity: evt.did,
          publication: record.value.publication,
          record: record.value as Json,
        });
      logDbError("Error upserting subscription:", subscriptionResult.error);
    }
    if (evt.event === "delete") {
      const { error } = await supabase
        .from("publication_subscriptions")
        .delete()
        .eq("uri", uri);
      logDbError("Error deleting subscription:", error);
    }
  }

  // site.standard.document records go into the main "documents" table.
  // The normalization layer handles reading both pub.leaflet and site.standard formats.
  if (evt.collection === ids.SiteStandardDocument) {
    if (evt.event === "create" || evt.event === "update") {
      const record = SiteStandardDocument.validateRecord(evt.record);
      if (!record.success) {
        console.log(record.error);
        return;
      }
      const docResult = await supabase.from("documents").upsert({
        uri,
        data: record.value as Json,
      });
      if (docResult.error) console.log(docResult.error);

      // site.standard.document uses the "site" field to reference the publication.
      // For documents in publications, site is an AT-URI (at://did:plc:xxx/site.standard.publication/rkey).
      // For standalone documents, site is an HTTPS URL (https://leaflet.pub/p/did:plc:xxx).
      // Only AT-URI sites are linked in the documents_in_publications table.
      if (record.value.site && record.value.site.startsWith("at://")) {
        const siteURI = new AtUri(record.value.site);

        // Same-repo rule as for pub.leaflet documents.
        if (siteURI.host !== evt.uri.host) {
          console.log("Unauthorized to create document in site!");
          return;
        }
        const docInPublicationResult = await supabase
          .from("documents_in_publications")
          .upsert({
            publication: record.value.site,
            document: uri,
          });
        // Drop links from this document to any other publication.
        const staleLinksResult = await supabase
          .from("documents_in_publications")
          .delete()
          .neq("publication", record.value.site)
          .eq("document", uri);

        if (docInPublicationResult.error)
          console.log(docInPublicationResult.error);
        logDbError(
          "Error removing stale publication links:",
          staleLinksResult.error,
        );
      }
    }
    if (evt.event === "delete") {
      const { error } = await supabase
        .from("documents")
        .delete()
        .eq("uri", uri);
      logDbError("Error deleting document:", error);
    }
  }

  // site.standard.publication records go into the main "publications" table.
  if (evt.collection === ids.SiteStandardPublication) {
    if (evt.event === "create" || evt.event === "update") {
      const record = SiteStandardPublication.validateRecord(evt.record);
      if (!record.success) return;
      const identityResult = await supabase
        .from("identities")
        .upsert({ atp_did: evt.did }, { onConflict: "atp_did" });
      logDbError("Error upserting identity:", identityResult.error);
      const publicationResult = await supabase.from("publications").upsert({
        uri,
        identity_did: evt.did,
        name: record.value.name,
        record: record.value as Json,
      });
      logDbError("Error upserting publication:", publicationResult.error);
    }
    if (evt.event === "delete") {
      const { error } = await supabase
        .from("publications")
        .delete()
        .eq("uri", uri);
      logDbError("Error deleting publication:", error);
    }
  }

  // site.standard.graph.subscription records go into the main "publication_subscriptions" table.
  if (evt.collection === ids.SiteStandardGraphSubscription) {
    if (evt.event === "create" || evt.event === "update") {
      const record = SiteStandardGraphSubscription.validateRecord(evt.record);
      if (!record.success) return;
      const identityResult = await supabase
        .from("identities")
        .upsert({ atp_did: evt.did }, { onConflict: "atp_did" });
      logDbError("Error upserting identity:", identityResult.error);
      const subscriptionResult = await supabase
        .from("publication_subscriptions")
        .upsert({
          uri,
          identity: evt.did,
          publication: record.value.publication,
          record: record.value as Json,
        });
      logDbError("Error upserting subscription:", subscriptionResult.error);
    }
    if (evt.event === "delete") {
      const { error } = await supabase
        .from("publication_subscriptions")
        .delete()
        .eq("uri", uri);
      logDbError("Error deleting subscription:", error);
    }
  }

  // if (evt.collection === ids.AppBskyActorProfile) {
  //   //only listen to updates because we should fetch it for the first time when they subscribe!
  //   if (evt.event === "update") {
  //     await supabaseServerClient
  //       .from("bsky_profiles")
  //       .update({ record: evt.record as Json })
  //       .eq("did", evt.did);
  //   }
  // }

  if (evt.collection === "app.bsky.feed.post") {
    if (evt.event !== "create") return;

    // Early exit if there is no embed at all.
    if (
      !evt.record ||
      typeof evt.record !== "object" ||
      !("embed" in evt.record)
    )
      return;

    // Cheap pre-filter on the raw, not-yet-validated record so that full
    // lexicon validation only runs for posts that might link to a quote.
    // The payload is untrusted, hence the typeof guard inside isQuoteLink.
    // eslint-disable-next-line @typescript-eslint/no-explicit-any -- unvalidated firehose payload
    const rawPost = evt.record as any;
    const hasQuoteParam =
      isQuoteLink(rawPost.embed?.external?.uri) ||
      isQuoteLink(rawPost.embed?.media?.external?.uri);

    if (!hasQuoteParam) return;
    console.log("FOUND EMBED!!!");

    // Now validate the record since we know it contains our quote param.
    const record = AppBskyFeedPost.validateRecord(evt.record);
    if (!record.success) {
      console.log(record.error);
      return;
    }

    let embed: string | null = null;
    if (
      AppBskyEmbedExternal.isMain(record.value.embed) &&
      record.value.embed.external.uri.includes(QUOTE_PARAM)
    ) {
      embed = record.value.embed.external.uri;
    }
    if (
      AppBskyEmbedRecordWithMedia.isMain(record.value.embed) &&
      AppBskyEmbedExternal.isMain(record.value.embed.media) &&
      record.value.embed.media?.external?.uri.includes(QUOTE_PARAM)
    ) {
      embed = record.value.embed.media.external.uri;
    }
    if (embed) {
      console.log("processing post mention: " + embed + " in " + uri);
      await inngest.send({
        name: "appview/index-bsky-post-mention",
        data: { post_uri: uri, document_link: embed },
      });
    }
  }
}
410}