forked from
leaflet.pub/leaflet
a tool for shared writing and social publishing
1import { createClient } from "@supabase/supabase-js";
2import { Database, Json } from "supabase/database.types";
3import { IdResolver } from "@atproto/identity";
4const idResolver = new IdResolver();
5import { Firehose, MemoryRunner, Event } from "@atproto/sync";
6import { ids } from "lexicons/api/lexicons";
7import {
8 PubLeafletDocument,
9 PubLeafletGraphSubscription,
10 PubLeafletPublication,
11 PubLeafletComment,
12 PubLeafletPollVote,
13 PubLeafletPollDefinition,
14} from "lexicons/api";
15import {
16 AppBskyEmbedExternal,
17 AppBskyEmbedRecordWithMedia,
18 AppBskyFeedPost,
19 AppBskyRichtextFacet,
20} from "@atproto/api";
21import { AtUri } from "@atproto/syntax";
22import { writeFile, readFile } from "fs/promises";
23import { createIdentity } from "actions/createIdentity";
24import { drizzle } from "drizzle-orm/node-postgres";
25import { inngest } from "app/api/inngest/client";
26import { Client } from "pg";
27
/**
 * Path of the cursor file. Taken from the CURSOR_FILE environment variable
 * when it is set to a non-empty value, otherwise "/cursor/cursor".
 */
const cursorFile = process.env.CURSOR_FILE || "/cursor/cursor";

/** URL fragment that marks an external embed link as a quote link for handleEvent. */
const QUOTE_PARAM = "/l-quote/";

// Connection settings for Supabase, read once from the environment.
const supabaseUrl = process.env.NEXT_PUBLIC_SUPABASE_API_URL as string;
const supabaseServiceRoleKey = process.env.SUPABASE_SERVICE_ROLE_KEY as string;

/** Supabase client, typed against the generated Database schema, used for all table writes in this file. */
let supabase = createClient<Database>(supabaseUrl, supabaseServiceRoleKey);
/**
 * Boots the firehose consumer.
 *
 * Builds a Firehose subscription restricted to the Leaflet collections plus
 * "app.bsky.feed.post", routes every event to `handleEvent`, starts it, and
 * installs SIGINT/SIGTERM handlers that tear the subscription down before
 * exiting the process.
 */
async function main() {
  const runner = new MemoryRunner({});
  const firehose = new Firehose({
    service: "wss://relay1.us-west.bsky.network",
    subscriptionReconnectDelay: 3000,
    excludeAccount: true,
    excludeIdentity: true,
    runner,
    idResolver,
    filterCollections: [
      ids.PubLeafletDocument,
      ids.PubLeafletPublication,
      ids.PubLeafletGraphSubscription,
      ids.PubLeafletComment,
      ids.PubLeafletPollVote,
      ids.PubLeafletPollDefinition,
      // ids.AppBskyActorProfile,
      "app.bsky.feed.post",
    ],
    handleEvent,
    onError: (err) => {
      console.error(err);
    },
  });

  console.log("starting firehose consumer");
  // Deliberately not awaited, so the signal handlers below are installed
  // right away instead of after the subscription finishes.
  void firehose.start();

  let cleaningUp = false;
  const cleanup = async () => {
    // A second signal while shutdown is in progress is ignored.
    if (cleaningUp) return;
    cleaningUp = true;
    console.log("shutting down firehose...");
    try {
      await firehose.destroy();
      await runner.destroy();
    } catch (err) {
      // A failed teardown must not prevent the process from exiting.
      console.error("error while shutting down firehose", err);
      process.exitCode = 1;
    } finally {
      process.exit();
    }
  };

  process.on("SIGINT", cleanup);
  process.on("SIGTERM", cleanup);
}
74
// Entry point: report a startup failure explicitly instead of leaving the
// promise returned by main() without a rejection handler.
main().catch((err) => {
  console.error("firehose consumer failed to start", err);
  process.exit(1);
});
76
/** PostgreSQL SQLSTATE code for a foreign-key violation. */
const FOREIGN_KEY_VIOLATION = "23503";

/**
 * Creates the identity row for `did` over a short-lived Postgres connection.
 * The connection is always closed, even when `createIdentity` throws.
 */
async function createMissingIdentity(did: string): Promise<void> {
  console.log("creating identity");
  const client = new Client({ connectionString: process.env.DB_URL });
  try {
    // NOTE(review): the client is never explicitly connected in this file;
    // confirm that createIdentity/drizzle takes care of connecting.
    await createIdentity(drizzle(client), { atp_did: did });
  } finally {
    await client.end();
  }
}

/** Returns `container.external.uri` when it is present and a string. */
function externalUriOf(container: unknown): string | undefined {
  if (!container || typeof container !== "object") return undefined;
  const external = (container as { external?: unknown }).external;
  if (!external || typeof external !== "object") return undefined;
  const uri = (external as { uri?: unknown }).uri;
  return typeof uri === "string" ? uri : undefined;
}

/**
 * Cheap pre-validation probe: does the raw record carry an external embed
 * (directly or under `embed.media`) whose uri contains `marker`?
 * Never throws, whatever shape the unvalidated record has.
 */
function mentionsQuoteLink(record: unknown, marker: string): boolean {
  if (!record || typeof record !== "object" || !("embed" in record))
    return false;
  const embed = (record as { embed?: unknown }).embed;
  const media =
    embed && typeof embed === "object"
      ? (embed as { media?: unknown }).media
      : undefined;
  return [externalUriOf(embed), externalUriOf(media)].some(
    (uri) => uri !== undefined && uri.includes(marker),
  );
}

/**
 * Indexes a single firehose event into Supabase.
 *
 * - identity events: update the stored handle in `bsky_profiles`.
 * - account / identity / sync events: otherwise ignored.
 * - Leaflet documents, publications, comments, poll votes, poll definitions
 *   and subscriptions: create/update upserts the validated record into its
 *   table, delete removes the row with the same uri.
 * - "app.bsky.feed.post" creates: when the post embeds an external link
 *   containing QUOTE_PARAM, an "appview/index-bsky-post-mention" event is sent
 *   to inngest.
 *
 * Records that fail lexicon validation are skipped. Supabase errors are
 * logged and do not throw.
 *
 * @param evt Event delivered by the Firehose subscription.
 */
async function handleEvent(evt: Event): Promise<void> {
  if (evt.event === "identity") {
    if (evt.handle)
      await supabase
        .from("bsky_profiles")
        .update({ handle: evt.handle })
        .eq("did", evt.did);
  }
  if (
    evt.event === "account" ||
    evt.event === "identity" ||
    evt.event === "sync"
  )
    return;

  const uri = evt.uri.toString();

  // Posts are far too frequent to log individually.
  if (evt.collection !== "app.bsky.feed.post")
    console.log(
      `${evt.event} in ${evt.collection} ${evt.uri}: ${evt.seq} ${evt.time}`,
    );

  if (evt.collection === ids.PubLeafletDocument) {
    if (evt.event === "create" || evt.event === "update") {
      const record = PubLeafletDocument.validateRecord(evt.record);
      if (!record.success) {
        console.log(record.error);
        return;
      }
      // The document row is stored before the publication check below.
      const docResult = await supabase.from("documents").upsert({
        uri,
        data: record.value as Json,
      });
      if (docResult.error) console.log(docResult.error);

      // Only link the document to a publication whose AT-URI host matches
      // the host of the document's own uri.
      const publicationURI = new AtUri(record.value.publication);
      if (publicationURI.host !== evt.uri.host) {
        console.log("Unauthorized to create post!");
        return;
      }
      const docInPublicationResult = await supabase
        .from("documents_in_publications")
        .upsert({
          publication: record.value.publication,
          document: uri,
        });
      // Drop links from this document to any other publication.
      const staleLinksResult = await supabase
        .from("documents_in_publications")
        .delete()
        .neq("publication", record.value.publication)
        .eq("document", uri);
      if (docInPublicationResult.error)
        console.log(docInPublicationResult.error);
      if (staleLinksResult.error) console.log(staleLinksResult.error);
    }
    if (evt.event === "delete") {
      const { error } = await supabase
        .from("documents")
        .delete()
        .eq("uri", uri);
      if (error) console.log("Error deleting document:", error);
    }
  }

  if (evt.collection === ids.PubLeafletPublication) {
    if (evt.event === "create" || evt.event === "update") {
      const record = PubLeafletPublication.validateRecord(evt.record);
      if (!record.success) return;
      const row = {
        uri,
        identity_did: evt.did,
        name: record.value.name,
        record: record.value as Json,
      };
      let { error } = await supabase.from("publications").upsert(row);
      if (error && error.code === FOREIGN_KEY_VIOLATION) {
        // The referenced identity does not exist yet: create it, then retry.
        await createMissingIdentity(evt.did);
        ({ error } = await supabase.from("publications").upsert(row));
      }
      if (error) console.log("Error upserting publication:", error);
    }
    if (evt.event === "delete") {
      const { error } = await supabase
        .from("publications")
        .delete()
        .eq("uri", uri);
      if (error) console.log("Error deleting publication:", error);
    }
  }

  if (evt.collection === ids.PubLeafletComment) {
    if (evt.event === "create" || evt.event === "update") {
      const record = PubLeafletComment.validateRecord(evt.record);
      if (!record.success) return;
      const { error } = await supabase.from("comments_on_documents").upsert({
        uri,
        profile: evt.did,
        document: record.value.subject,
        record: record.value as Json,
      });
      if (error) console.log("Error upserting comment:", error);
    }
    if (evt.event === "delete") {
      const { error } = await supabase
        .from("comments_on_documents")
        .delete()
        .eq("uri", uri);
      if (error) console.log("Error deleting comment:", error);
    }
  }

  if (evt.collection === ids.PubLeafletPollVote) {
    if (evt.event === "create" || evt.event === "update") {
      const record = PubLeafletPollVote.validateRecord(evt.record);
      if (!record.success) return;
      const { error } = await supabase.from("atp_poll_votes").upsert({
        uri,
        voter_did: evt.did,
        poll_uri: record.value.poll.uri,
        poll_cid: record.value.poll.cid,
        record: record.value as Json,
      });
      if (error) console.log("Error upserting poll vote:", error);
    }
    if (evt.event === "delete") {
      const { error } = await supabase
        .from("atp_poll_votes")
        .delete()
        .eq("uri", uri);
      if (error) console.log("Error deleting poll vote:", error);
    }
  }

  if (evt.collection === ids.PubLeafletPollDefinition) {
    if (evt.event === "create" || evt.event === "update") {
      const record = PubLeafletPollDefinition.validateRecord(evt.record);
      if (!record.success) return;
      const { error } = await supabase.from("atp_poll_records").upsert({
        uri,
        cid: evt.cid.toString(),
        record: record.value as Json,
      });
      if (error) console.log("Error upserting poll definition:", error);
    }
    if (evt.event === "delete") {
      const { error } = await supabase
        .from("atp_poll_records")
        .delete()
        .eq("uri", uri);
      if (error) console.log("Error deleting poll definition:", error);
    }
  }

  if (evt.collection === ids.PubLeafletGraphSubscription) {
    if (evt.event === "create" || evt.event === "update") {
      const record = PubLeafletGraphSubscription.validateRecord(evt.record);
      if (!record.success) return;
      const row = {
        uri,
        identity: evt.did,
        publication: record.value.publication,
        record: record.value as Json,
      };
      let { error } = await supabase
        .from("publication_subscriptions")
        .upsert(row);
      if (error && error.code === FOREIGN_KEY_VIOLATION) {
        // The referenced identity does not exist yet: create it, then retry.
        await createMissingIdentity(evt.did);
        ({ error } = await supabase
          .from("publication_subscriptions")
          .upsert(row));
      }
      if (error) console.log("Error upserting subscription:", error);
    }
    if (evt.event === "delete") {
      const { error } = await supabase
        .from("publication_subscriptions")
        .delete()
        .eq("uri", uri);
      if (error) console.log("Error deleting subscription:", error);
    }
  }

  // if (evt.collection === ids.AppBskyActorProfile) {
  //   //only listen to updates because we should fetch it for the first time when they subscribe!
  //   if (evt.event === "update") {
  //     await supabaseServerClient
  //       .from("bsky_profiles")
  //       .update({ record: evt.record as Json })
  //       .eq("did", evt.did);
  //   }
  // }

  if (evt.collection === "app.bsky.feed.post") {
    if (evt.event !== "create") return;

    // Cheap probe first, so full validation only runs on posts that carry
    // our quote link.
    if (!mentionsQuoteLink(evt.record, QUOTE_PARAM)) return;
    console.log("FOUND EMBED!!!");

    const record = AppBskyFeedPost.validateRecord(evt.record);
    if (!record.success) return;

    let embed: string | null = null;
    if (
      AppBskyEmbedExternal.isMain(record.value.embed) &&
      record.value.embed.external.uri.includes(QUOTE_PARAM)
    ) {
      embed = record.value.embed.external.uri;
    }
    if (
      AppBskyEmbedRecordWithMedia.isMain(record.value.embed) &&
      AppBskyEmbedExternal.isMain(record.value.embed.media) &&
      record.value.embed.media?.external?.uri.includes(QUOTE_PARAM)
    ) {
      embed = record.value.embed.media.external.uri;
    }
    if (embed) {
      console.log("processing post mention: " + embed + " in " + uri);
      await inngest.send({
        name: "appview/index-bsky-post-mention",
        data: { post_uri: uri, document_link: embed },
      });
    }
  }
}