// A tool for shared writing and social publishing.
1import { createClient } from "@supabase/supabase-js";
2import { Database, Json } from "supabase/database.types";
3import { IdResolver } from "@atproto/identity";
4const idResolver = new IdResolver();
5import { Firehose, MemoryRunner, Event } from "@atproto/sync";
6import { ids } from "lexicons/api/lexicons";
7import {
8 PubLeafletDocument,
9 PubLeafletGraphSubscription,
10 PubLeafletPublication,
11 PubLeafletComment,
12 PubLeafletPollVote,
13 PubLeafletPollDefinition,
14 SiteStandardDocument,
15 SiteStandardPublication,
16 SiteStandardGraphSubscription,
17} from "lexicons/api";
18import {
19 AppBskyEmbedExternal,
20 AppBskyEmbedRecordWithMedia,
21 AppBskyFeedPost,
22 AppBskyRichtextFacet,
23} from "@atproto/api";
24import { AtUri } from "@atproto/syntax";
25import { writeFile, readFile } from "fs/promises";
26import { inngest } from "app/api/inngest/client";
27
28const cursorFile = process.env.CURSOR_FILE || "/cursor/cursor";
29
30let supabase = createClient<Database>(
31 process.env.NEXT_PUBLIC_SUPABASE_API_URL as string,
32 process.env.SUPABASE_SERVICE_ROLE_KEY as string,
33);
34const QUOTE_PARAM = "/l-quote/";
35async function main() {
36 const runner = new MemoryRunner({});
37 let firehose = new Firehose({
38 service: "wss://relay1.us-west.bsky.network",
39 subscriptionReconnectDelay: 3000,
40 excludeAccount: true,
41 excludeIdentity: true,
42 runner,
43 idResolver,
44 filterCollections: [
45 ids.PubLeafletDocument,
46 ids.PubLeafletPublication,
47 ids.PubLeafletGraphSubscription,
48 ids.PubLeafletComment,
49 ids.PubLeafletPollVote,
50 ids.PubLeafletPollDefinition,
51 // ids.AppBskyActorProfile,
52 "app.bsky.feed.post",
53 ids.SiteStandardDocument,
54 ids.SiteStandardPublication,
55 ids.SiteStandardGraphSubscription,
56 ],
57 handleEvent,
58 onError: (err) => {
59 console.error(err);
60 },
61 });
62 console.log("starting firehose consumer");
63 firehose.start();
64 let cleaningUp = false;
65 const cleanup = async () => {
66 if (cleaningUp) return;
67 cleaningUp = true;
68 console.log("shutting down firehose...");
69 await firehose.destroy();
70 await runner.destroy();
71 process.exit();
72 };
73
74 process.on("SIGINT", cleanup);
75 process.on("SIGTERM", cleanup);
76}
77
78main();
79
80async function handleEvent(evt: Event) {
81 if (evt.event === "identity") {
82 if (evt.handle)
83 await supabase
84 .from("bsky_profiles")
85 .update({ handle: evt.handle })
86 .eq("did", evt.did);
87 }
88 if (
89 evt.event == "account" ||
90 evt.event === "identity" ||
91 evt.event === "sync"
92 )
93 return;
94 if (evt.collection !== "app.bsky.feed.post")
95 console.log(
96 `${evt.event} in ${evt.collection} ${evt.uri}: ${evt.seq} ${evt.time}`,
97 );
98 if (evt.collection === ids.PubLeafletDocument) {
99 if (evt.event === "create" || evt.event === "update") {
100 let record = PubLeafletDocument.validateRecord(evt.record);
101 if (!record.success) {
102 console.log(record.error);
103 return;
104 }
105 let docResult = await supabase.from("documents").upsert({
106 uri: evt.uri.toString(),
107 data: record.value as Json,
108 });
109 if (docResult.error) console.log(docResult.error);
110 if (record.value.publication) {
111 let publicationURI = new AtUri(record.value.publication);
112
113 if (publicationURI.host !== evt.uri.host) {
114 console.log("Unauthorized to create post!");
115 return;
116 }
117 let docInPublicationResult = await supabase
118 .from("documents_in_publications")
119 .upsert({
120 publication: record.value.publication,
121 document: evt.uri.toString(),
122 });
123 await supabase
124 .from("documents_in_publications")
125 .delete()
126 .neq("publication", record.value.publication)
127 .eq("document", evt.uri.toString());
128
129 if (docInPublicationResult.error)
130 console.log(docInPublicationResult.error);
131 }
132 }
133 if (evt.event === "delete") {
134 await supabase.from("documents").delete().eq("uri", evt.uri.toString());
135 }
136 }
137 if (evt.collection === ids.PubLeafletPublication) {
138 if (evt.event === "create" || evt.event === "update") {
139 let record = PubLeafletPublication.validateRecord(evt.record);
140 if (!record.success) return;
141 await supabase
142 .from("identities")
143 .upsert({ atp_did: evt.did }, { onConflict: "atp_did" });
144 await supabase.from("publications").upsert({
145 uri: evt.uri.toString(),
146 identity_did: evt.did,
147 name: record.value.name,
148 record: record.value as Json,
149 });
150 }
151 if (evt.event === "delete") {
152 await supabase
153 .from("publications")
154 .delete()
155 .eq("uri", evt.uri.toString());
156 }
157 }
158 if (evt.collection === ids.PubLeafletComment) {
159 if (evt.event === "create" || evt.event === "update") {
160 let record = PubLeafletComment.validateRecord(evt.record);
161 if (!record.success) return;
162 let { error } = await supabase.from("comments_on_documents").upsert({
163 uri: evt.uri.toString(),
164 profile: evt.did,
165 document: record.value.subject,
166 record: record.value as Json,
167 });
168 }
169 if (evt.event === "delete") {
170 await supabase
171 .from("comments_on_documents")
172 .delete()
173 .eq("uri", evt.uri.toString());
174 }
175 }
176 if (evt.collection === ids.PubLeafletPollVote) {
177 if (evt.event === "create" || evt.event === "update") {
178 let record = PubLeafletPollVote.validateRecord(evt.record);
179 if (!record.success) return;
180 let { error } = await supabase.from("atp_poll_votes").upsert({
181 uri: evt.uri.toString(),
182 voter_did: evt.did,
183 poll_uri: record.value.poll.uri,
184 poll_cid: record.value.poll.cid,
185 record: record.value as Json,
186 });
187 }
188 if (evt.event === "delete") {
189 await supabase
190 .from("atp_poll_votes")
191 .delete()
192 .eq("uri", evt.uri.toString());
193 }
194 }
195 if (evt.collection === ids.PubLeafletPollDefinition) {
196 if (evt.event === "create" || evt.event === "update") {
197 let record = PubLeafletPollDefinition.validateRecord(evt.record);
198 if (!record.success) return;
199 let { error } = await supabase.from("atp_poll_records").upsert({
200 uri: evt.uri.toString(),
201 cid: evt.cid.toString(),
202 record: record.value as Json,
203 });
204 if (error) console.log("Error upserting poll definition:", error);
205 }
206 if (evt.event === "delete") {
207 await supabase
208 .from("atp_poll_records")
209 .delete()
210 .eq("uri", evt.uri.toString());
211 }
212 }
213 if (evt.collection === ids.PubLeafletGraphSubscription) {
214 if (evt.event === "create" || evt.event === "update") {
215 let record = PubLeafletGraphSubscription.validateRecord(evt.record);
216 if (!record.success) return;
217 await supabase
218 .from("identities")
219 .upsert({ atp_did: evt.did }, { onConflict: "atp_did" });
220 await supabase.from("publication_subscriptions").upsert({
221 uri: evt.uri.toString(),
222 identity: evt.did,
223 publication: record.value.publication,
224 record: record.value as Json,
225 });
226 }
227 if (evt.event === "delete") {
228 await supabase
229 .from("publication_subscriptions")
230 .delete()
231 .eq("uri", evt.uri.toString());
232 }
233 }
234 // site.standard.document records go into the main "documents" table
235 // The normalization layer handles reading both pub.leaflet and site.standard formats
236 if (evt.collection === ids.SiteStandardDocument) {
237 if (evt.event === "create" || evt.event === "update") {
238 let record = SiteStandardDocument.validateRecord(evt.record);
239 if (!record.success) {
240 console.log(record.error);
241 return;
242 }
243 let docResult = await supabase.from("documents").upsert({
244 uri: evt.uri.toString(),
245 data: record.value as Json,
246 });
247 if (docResult.error) console.log(docResult.error);
248
249 // site.standard.document uses "site" field to reference the publication
250 // For documents in publications, site is an AT-URI (at://did:plc:xxx/site.standard.publication/rkey)
251 // For standalone documents, site is an HTTPS URL (https://leaflet.pub/p/did:plc:xxx)
252 // Only link to publications table for AT-URI sites
253 if (record.value.site && record.value.site.startsWith("at://")) {
254 let siteURI = new AtUri(record.value.site);
255
256 if (siteURI.host !== evt.uri.host) {
257 console.log("Unauthorized to create document in site!");
258 return;
259 }
260 let docInPublicationResult = await supabase
261 .from("documents_in_publications")
262 .upsert({
263 publication: record.value.site,
264 document: evt.uri.toString(),
265 });
266 await supabase
267 .from("documents_in_publications")
268 .delete()
269 .neq("publication", record.value.site)
270 .eq("document", evt.uri.toString());
271
272 if (docInPublicationResult.error)
273 console.log(docInPublicationResult.error);
274 }
275 }
276 if (evt.event === "delete") {
277 await supabase.from("documents").delete().eq("uri", evt.uri.toString());
278 }
279 }
280
281 // site.standard.publication records go into the main "publications" table
282 if (evt.collection === ids.SiteStandardPublication) {
283 if (evt.event === "create" || evt.event === "update") {
284 let record = SiteStandardPublication.validateRecord(evt.record);
285 if (!record.success) return;
286 await supabase
287 .from("identities")
288 .upsert({ atp_did: evt.did }, { onConflict: "atp_did" });
289 await supabase.from("publications").upsert({
290 uri: evt.uri.toString(),
291 identity_did: evt.did,
292 name: record.value.name,
293 record: record.value as Json,
294 });
295 }
296 if (evt.event === "delete") {
297 await supabase
298 .from("publications")
299 .delete()
300 .eq("uri", evt.uri.toString());
301 }
302 }
303
304 // site.standard.graph.subscription records go into the main "publication_subscriptions" table
305 if (evt.collection === ids.SiteStandardGraphSubscription) {
306 if (evt.event === "create" || evt.event === "update") {
307 let record = SiteStandardGraphSubscription.validateRecord(evt.record);
308 if (!record.success) return;
309 await supabase
310 .from("identities")
311 .upsert({ atp_did: evt.did }, { onConflict: "atp_did" });
312 await supabase.from("publication_subscriptions").upsert({
313 uri: evt.uri.toString(),
314 identity: evt.did,
315 publication: record.value.publication,
316 record: record.value as Json,
317 });
318 }
319 if (evt.event === "delete") {
320 await supabase
321 .from("publication_subscriptions")
322 .delete()
323 .eq("uri", evt.uri.toString());
324 }
325 }
326 // if (evt.collection === ids.AppBskyActorProfile) {
327 // //only listen to updates because we should fetch it for the first time when they subscribe!
328 // if (evt.event === "update") {
329 // await supabaseServerClient
330 // .from("bsky_profiles")
331 // .update({ record: evt.record as Json })
332 // .eq("did", evt.did);
333 // }
334 // }
335 if (evt.collection === "app.bsky.feed.post") {
336 if (evt.event !== "create") return;
337
338 // Early exit if no embed
339 if (
340 !evt.record ||
341 typeof evt.record !== "object" ||
342 !("embed" in evt.record)
343 )
344 return;
345
346 // Check if embed contains our quote param using optional chaining
347 const embedRecord = evt.record as any;
348 const hasQuoteParam =
349 embedRecord.embed?.external?.uri?.includes(QUOTE_PARAM) ||
350 embedRecord.embed?.media?.external?.uri?.includes(QUOTE_PARAM);
351
352 if (!hasQuoteParam) return;
353 console.log("FOUND EMBED!!!");
354
355 // Now validate the record since we know it contains our quote param
356 let record = AppBskyFeedPost.validateRecord(evt.record);
357 if (!record.success) return;
358
359 let embed: string | null = null;
360 if (
361 AppBskyEmbedExternal.isMain(record.value.embed) &&
362 record.value.embed.external.uri.includes(QUOTE_PARAM)
363 ) {
364 embed = record.value.embed.external.uri;
365 }
366 if (
367 AppBskyEmbedRecordWithMedia.isMain(record.value.embed) &&
368 AppBskyEmbedExternal.isMain(record.value.embed.media) &&
369 record.value.embed.media?.external?.uri.includes(QUOTE_PARAM)
370 ) {
371 embed = record.value.embed.media.external.uri;
372 }
373 if (embed) {
374 console.log(
375 "processing post mention: " + embed + " in " + evt.uri.toString(),
376 );
377 await inngest.send({
378 name: "appview/index-bsky-post-mention",
379 data: { post_uri: evt.uri.toString(), document_link: embed },
380 });
381 }
382 }
383}