1import pino from "pino";
2import { IdResolver } from "@atproto/identity";
3import { Firehose } from "@atproto/sync";
4import type { Database } from "#/db";
5import * as Paste from "#/lexicons/types/li/plonk/paste";
6import * as Comment from "#/lexicons/types/li/plonk/comment";
7
/** Collection NSIDs this ingester indexes. */
const PASTE_COLLECTION = "li.plonk.paste";
const COMMENT_COLLECTION = "li.plonk.comment";

/**
 * Builds a firehose consumer that mirrors `li.plonk.paste` and
 * `li.plonk.comment` records into the local database.
 *
 * - `create` / `update` events whose record passes the lexicon checks are
 *   upserted, keyed on the record URI.
 * - `delete` events remove the row with the matching URI.
 * - Records that fail validation, and events for other collections, are
 *   ignored.
 *
 * @param db - Database handle used for the `paste` and `comment` tables.
 * @param idResolver - Identity resolver handed to the firehose.
 * @returns The configured `Firehose` instance; this function does not start it.
 */
export function createIngester(db: Database, idResolver: IdResolver) {
  const logger = pino({ name: "firehose ingestion" });

  return new Firehose({
    idResolver,
    handleEvent: async (evt) => {
      if (evt.event === "create" || evt.event === "update") {
        const uri = evt.uri.toString();
        const indexedAt = new Date().toISOString();
        const record = evt.record;

        if (
          evt.collection === PASTE_COLLECTION &&
          Paste.isRecord(record) &&
          Paste.validateRecord(record).success
        ) {
          // Upsert the paste. On conflict only the mutable content fields are
          // refreshed; shortUrl, authorDid and createdAt keep the values from
          // the first insert.
          await db
            .insertInto("paste")
            .values({
              uri,
              shortUrl: record.shortUrl,
              authorDid: evt.did,
              code: record.code,
              lang: record.lang,
              title: record.title,
              createdAt: record.createdAt,
              indexedAt,
            })
            .onConflict((oc) =>
              oc.column("uri").doUpdateSet({
                code: record.code,
                lang: record.lang,
                title: record.title,
                indexedAt,
              }),
            )
            .execute();
        } else if (
          evt.collection === COMMENT_COLLECTION &&
          Comment.isRecord(record) &&
          Comment.validateRecord(record).success
        ) {
          // Upsert the comment along with the reference (uri + cid) to the
          // paste it was posted on.
          await db
            .insertInto("comment")
            .values({
              uri,
              authorDid: evt.did,
              body: record.content,
              pasteUri: record.post.uri,
              pasteCid: record.post.cid,
              createdAt: record.createdAt,
              indexedAt,
            })
            .onConflict((oc) =>
              oc.column("uri").doUpdateSet({
                body: record.content,
                pasteUri: record.post.uri,
                pasteCid: record.post.cid,
                indexedAt,
              }),
            )
            .execute();
        }
      } else if (evt.event === "delete") {
        const uri = evt.uri.toString();

        if (evt.collection === PASTE_COLLECTION) {
          // Remove the deleted paste from the local index.
          await db.deleteFrom("paste").where("uri", "=", uri).execute();
        } else if (evt.collection === COMMENT_COLLECTION) {
          // Remove the deleted comment from the local index.
          await db.deleteFrom("comment").where("uri", "=", uri).execute();
        }
      }
    },
    onError: (err) => {
      logger.error({ err }, "error on firehose ingestion");
    },
    // Both collections must be listed: the handler above indexes comments as
    // well as pastes, and events for unlisted collections never reach it.
    filterCollections: [PASTE_COLLECTION, COMMENT_COLLECTION],
    // Only record commits are handled above, so identity and account events
    // are excluded.
    excludeIdentity: true,
    excludeAccount: true,
  });
}