atproto pastebin service: https://plonk.li
at plonkin 2.7 kB view raw
1import pino from "pino"; 2import { IdResolver } from "@atproto/identity"; 3import { Firehose } from "@atproto/sync"; 4import type { Database } from "#/db"; 5import * as Paste from "#/lexicons/types/li/plonk/paste"; 6import * as Comment from "#/lexicons/types/li/plonk/comment"; 7 8export function createIngester(db: Database, idResolver: IdResolver) { 9 const logger = pino({ name: "firehose ingestion" }); 10 return new Firehose({ 11 idResolver, 12 handleEvent: async (evt) => { 13 // Watch for write events 14 if (evt.event === "create" || evt.event === "update") { 15 const now = new Date(); 16 const record = evt.record; 17 18 // If the write is a valid status update 19 if ( 20 evt.collection === "li.plonk.paste" && 21 Paste.isRecord(record) && 22 Paste.validateRecord(record).success 23 ) { 24 await db 25 .insertInto("paste") 26 .values({ 27 uri: evt.uri.toString(), 28 shortUrl: record.shortUrl, 29 authorDid: evt.did, 30 code: record.code, 31 lang: record.lang, 32 title: record.title, 33 createdAt: record.createdAt, 34 indexedAt: now.toISOString(), 35 }) 36 .onConflict((oc) => 37 oc.column("uri").doUpdateSet({ 38 code: record.code, 39 lang: record.lang, 40 title: record.title, 41 indexedAt: now.toISOString(), 42 }), 43 ) 44 .execute(); 45 } else if ( 46 evt.collection === "li.plonk.comment" && 47 Comment.isRecord(record) && 48 Comment.validateRecord(record).success 49 ) { 50 await db 51 .insertInto("comment") 52 .values({ 53 uri: evt.uri.toString(), 54 authorDid: evt.did, 55 body: record.content, 56 pasteUri: record.post.uri, 57 pasteCid: record.post.cid, 58 createdAt: record.createdAt, 59 indexedAt: now.toISOString(), 60 }) 61 .onConflict((oc) => 62 oc.column("uri").doUpdateSet({ 63 body: record.content, 64 pasteUri: record.post.uri, 65 pasteCid: record.post.cid, 66 indexedAt: now.toISOString(), 67 }), 68 ) 69 .execute(); 70 } 71 } else if ( 72 evt.event === "delete" && 73 evt.collection === "li.plonk.paste" 74 ) { 75 // Remove the status from our SQLite 76 await db 77 .deleteFrom("paste") 78 .where("uri", "=", evt.uri.toString()) 79 .execute(); 80 } else if ( 81 evt.event === "delete" && 82 evt.collection === "li.plonk.comment" 83 ) { 84 // Remove the status from our SQLite 85 await db 86 .deleteFrom("comment") 87 .where("uri", "=", evt.uri.toString()) 88 .execute(); 89 } 90 }, 91 onError: (err) => { 92 logger.error({ err }, "error on firehose ingestion"); 93 }, 94 filterCollections: ["li.plonk.paste"], 95 excludeIdentity: true, 96 excludeAccount: true, 97 }); 98}