A couple of Bluesky feeds focused around PDSes
at main 3.8 kB view raw
import { Client, ok, simpleFetchHandler } from "@atcute/client";
import {
  CompositeDidDocumentResolver,
  PlcDidDocumentResolver,
  WebDidDocumentResolver,
} from "@atcute/identity-resolver";
import type { ResourceUri } from "@atcute/lexicons/syntax";
import { Jetstream, CommitType } from "@skyware/jetstream";

import type {} from "@atcute/atproto";

import { db } from "./common/db.ts";
import type { Author, DID } from "./common/types.ts";

/**
 * Numeric `op` codes carried by every message posted to the ingest worker.
 * The values are the ones this module has always sent; the names describe the
 * event that triggers each message.
 * NOTE(review): the worker is not visible from here — keep in sync with
 * `./ingest/worker.ts`.
 */
const Op = {
  /** A post was created (payload: atUri, cid, did, pds). */
  PostCreated: 0,
  /** A post was deleted (payload: atUri, pds). */
  PostDeleted: 1,
  /** Records listed from an account whose PDS changed (payload: records, did, pds). */
  Backfill: 2,
  /** Cursor checkpoint (payload: cursor). */
  Cursor: 3,
  /** A freshly resolved PDS for an author (payload: did, pds, pds_base). */
  AuthorPds: 4,
} as const;

/** `type` value of the DID-document service entry that points at a PDS. */
const PDS_SERVICE_TYPE = "AtprotoPersonalDataServer";

/**
 * How far to rewind the stored cursor on startup so playback overlaps the
 * previous run. Cursors are taken from `time_us`, so this is presumably
 * microseconds (10 s).
 */
const CURSOR_REWIND = 10_000_000;

/** A cursor checkpoint is posted to the worker once per this many commits. */
const CHECKPOINT_INTERVAL = 1024;

/** Resolves `did:plc` and `did:web` identifiers to DID documents. */
const didResolver = new CompositeDidDocumentResolver({
  methods: {
    plc: new PlcDidDocumentResolver(),
    web: new WebDidDocumentResolver(),
  },
});

/** Module worker that receives every message posted below. */
const worker = new Worker(new URL("./ingest/worker.ts", import.meta.url).href, {
  type: "module",
});

const getAuthor = db.prepare("SELECT pds FROM authors WHERE did = ?");

/**
 * Looks up the PDS endpoint for a DID.
 *
 * Unless `ignoreCache` is set, the `authors` table is consulted first. On a
 * miss (or when the cache is bypassed) the DID document is resolved, the
 * first PDS service entry with a string endpoint is taken, and the result is
 * posted to the worker as an `Op.AuthorPds` message.
 *
 * @param did - The account's DID.
 * @param ignoreCache - Skip the `authors` lookup and always resolve.
 * @returns The PDS endpoint, or `undefined` when the document has none.
 * @throws Whatever the resolver throws, or a `TypeError` from `getPDSBase`
 *   when the endpoint is not a parseable URL.
 */
async function getPDS(
  did: DID,
  ignoreCache = false,
): Promise<string | undefined> {
  if (!ignoreCache) {
    const author = getAuthor.get<Author>(did);
    if (author?.pds) return author.pds;
  }

  const resolved = await didResolver.resolve(did);
  for (const service of resolved.service ?? []) {
    // A service `type` may be a single string or a list of strings; accept
    // either form explicitly instead of relying on loose `==` coercion.
    const types: readonly unknown[] = Array.isArray(service.type)
      ? service.type
      : [service.type];
    if (!types.includes(PDS_SERVICE_TYPE)) continue;
    if (typeof service.serviceEndpoint !== "string") continue;

    const endpoint = service.serviceEndpoint;
    worker.postMessage({
      op: Op.AuthorPds,
      did,
      pds: endpoint,
      pds_base: getPDSBase(endpoint),
    });
    // Stop at the first match so a document with several PDS entries yields
    // exactly one cache message and a deterministic result.
    return endpoint;
  }

  return undefined;
}

/**
 * Reduces a PDS URL to the last two labels of its hostname, e.g.
 * `https://shard.host.example.com` -> `example.com`.
 *
 * Hostnames without two labels (`localhost`, bracketed IPv6) and IPv4
 * literals are returned whole, since slicing them would produce nonsense.
 * This is a plain label split, not a public-suffix lookup, so
 * `foo.example.co.uk` yields `co.uk`.
 *
 * @param pds - Absolute URL of the PDS.
 * @returns The base domain, or the full hostname when it cannot be shortened.
 * @throws TypeError if `pds` is not a valid URL.
 */
function getPDSBase(pds: string): string {
  const { hostname } = new URL(pds);
  if (/^\d{1,3}(\.\d{1,3}){3}$/.test(hostname)) return hostname;

  // Dropping empty labels makes a fully-qualified "example.com." behave
  // like "example.com".
  const labels = hostname.split(".").filter((label) => label.length > 0);
  if (labels.length < 2) return hostname;
  return labels.slice(-2).join(".");
}

const getCursor = db.prepare("SELECT cursor FROM state WHERE id = 1");

// A missing row, a NULL/empty cursor, or a cursor smaller than the rewind
// window must not turn into a negative or NaN cursor: start without one.
const savedCursor = Number(getCursor.get<{ cursor?: string }>()?.cursor);
const startCursor =
  Number.isFinite(savedCursor) && savedCursor > CURSOR_REWIND
    ? savedCursor - CURSOR_REWIND // back up a bit for seamless playback
    : undefined;

const jetstream = new Jetstream({
  wantedCollections: ["app.bsky.feed.post"],
  cursor: startCursor,
  endpoint:
    Deno.env.get("JETSTREAM") ??
    "wss://jetstream1.us-east.bsky.network/subscribe",
});

jetstream.on("open", () => console.log("Listening to the jetstream..."));

jetstream.on("error", (e, c) => {
  console.error(e);
  // Only checkpoint a real cursor; an undefined one would overwrite good state.
  if (c != null) {
    worker.postMessage({
      op: Op.Cursor,
      cursor: c,
    });
  }
});

/** Commits seen since the last cursor checkpoint. */
let count = 0;

// Forward post creations and deletions to the worker, tagged with the
// author's PDS, and checkpoint the cursor every CHECKPOINT_INTERVAL commits.
jetstream.on("commit", async (e) => {
  count++;
  if (count >= CHECKPOINT_INTERVAL) {
    count = 0;
    worker.postMessage({
      op: Op.Cursor,
      cursor: e.time_us,
    });
  }

  const atUri: ResourceUri = `at://${e.did}/app.bsky.feed.post/${e.commit.rkey}`;
  let pds: string | undefined;
  try {
    pds = await getPDS(e.did as DID);
  } catch (err) {
    console.error(err);
    return;
  }

  if (!pds) {
    console.error(`PDS not found for ${e.did}`);
    return;
  }

  if (e.commit.operation === CommitType.Create) {
    worker.postMessage({
      op: Op.PostCreated,
      atUri,
      cid: e.commit.cid,
      did: e.did,
      pds,
    });
  } else if (e.commit.operation === CommitType.Delete) {
    worker.postMessage({
      op: Op.PostDeleted,
      atUri,
      pds,
    });
  }
});

// On an identity event, re-resolve the account's PDS; when it differs from
// the cached one, list the account's posts from the new PDS and hand them to
// the worker.
jetstream.on("identity", async (e) => {
  let pds: string | undefined;
  try {
    const cached = getAuthor.get<Author>(e.did);
    pds = await getPDS(e.did as DID, true);
    if (!pds || cached?.pds === pds) return;
  } catch (err) {
    // Without this guard a resolution failure would surface as an unhandled
    // rejection from the event callback.
    console.error(`Failed to resolve PDS for ${e.did}: ${err}`);
    return;
  }

  const handler = simpleFetchHandler({ service: pds });
  const rpc = new Client({ handler });
  try {
    // NOTE(review): no `limit`/`cursor` is passed, so only the first page the
    // server returns is backfilled — confirm whether pagination is wanted.
    const { records } = await ok(
      rpc.get("com.atproto.repo.listRecords", {
        params: {
          repo: e.did,
          collection: "app.bsky.feed.post",
        },
      }),
    );
    worker.postMessage({
      op: Op.Backfill,
      records,
      did: e.did,
      pds,
    });
  } catch (err) {
    console.error(`Failed to backfill posts: ${err}`);
  }
});

jetstream.start();

/** Minimal HTTP handler so the process can be served and probed. */
export default {
  fetch() {
    return new Response("Pong!");
  },
} satisfies Deno.ServeDefaultExport;