A couple of Bluesky feeds focused around PDSes
1import { Client, ok, simpleFetchHandler } from "@atcute/client";
2import {
3 CompositeDidDocumentResolver,
4 PlcDidDocumentResolver,
5 WebDidDocumentResolver,
6} from "@atcute/identity-resolver";
7import type { ResourceUri } from "@atcute/lexicons/syntax";
8import { Jetstream, CommitType } from "@skyware/jetstream";
9
10import type {} from "@atcute/atproto";
11
12import { db } from "./common/db.ts";
13import type { Author, DID } from "./common/types.ts";
14
// Resolver for DID documents; handles both the `plc` and `web` DID methods.
const didResolver = new CompositeDidDocumentResolver({
  methods: {
    web: new WebDidDocumentResolver(),
    plc: new PlcDidDocumentResolver(),
  },
});

// Module worker loaded from ./ingest/worker.ts (resolved relative to this
// file). Every `postMessage` call in this file targets this worker.
const workerModule = new URL("./ingest/worker.ts", import.meta.url);
const worker = new Worker(workerModule.href, { type: "module" });

// Prepared statement: look up the stored PDS for a DID in the `authors` table.
const getAuthor = db.prepare("SELECT pds FROM authors WHERE did = ?");
27
/**
 * Looks up the PDS endpoint for a DID.
 *
 * Unless `ignoreCache` is set, the `authors` table is consulted first and a
 * stored, non-empty `pds` is returned as-is. Otherwise the DID document is
 * resolved and the FIRST service of type `AtprotoPersonalDataServer` with a
 * string endpoint is used; an op 4 message carrying the DID, the endpoint and
 * its base domain is posted to the worker (presumably so the worker can store
 * the mapping — confirm against ./ingest/worker.ts).
 *
 * @param did - DID whose PDS should be looked up.
 * @param ignoreCache - When true, skip the database lookup and always resolve.
 * @returns The PDS endpoint, or `undefined` when the DID document lists none.
 * @throws Whatever `didResolver.resolve` throws, or the `TypeError` raised by
 *   `getPDSBase` when the endpoint is not a valid URL.
 */
async function getPDS(
  did: DID,
  ignoreCache = false,
): Promise<string | undefined> {
  if (!ignoreCache) {
    const author = getAuthor.get<Author>(did);
    if (author?.pds) return author.pds;
  }

  const resolved = await didResolver.resolve(did);
  for (const service of resolved.service ?? []) {
    // A DID document may express `type` as one string or a list of strings;
    // normalise so the comparison is an explicit, strict match.
    const serviceTypes: readonly string[] = Array.isArray(service.type)
      ? service.type
      : [service.type];

    if (
      serviceTypes.includes("AtprotoPersonalDataServer") &&
      typeof service.serviceEndpoint === "string"
    ) {
      const pds = service.serviceEndpoint;
      worker.postMessage({
        op: 4,
        did,
        pds,
        pds_base: getPDSBase(pds),
      });
      // Stop at the first match: later duplicates must not override it or
      // trigger additional worker messages.
      return pds;
    }
  }

  return undefined;
}
56
/**
 * Derives the base domain of a PDS endpoint: the last two labels of its
 * hostname, e.g. `https://morel.us-east.host.bsky.network` -> `bsky.network`.
 *
 * Hosts that are not a multi-label domain are returned unchanged: a
 * single-label host such as `localhost` (previously `"undefined.localhost"`)
 * and IPv4 literals (previously truncated to their last two octets).
 *
 * @param pds - Absolute URL of the PDS endpoint.
 * @returns The base domain, or the full hostname for single-label/IP hosts.
 * @throws TypeError if `pds` is not a valid absolute URL.
 */
function getPDSBase(pds: string): string {
  const { hostname } = new URL(pds);

  // The octets of an IPv4 literal are not a domain hierarchy; keep it whole.
  if (/^\d{1,3}(\.\d{1,3}){3}$/.test(hostname)) return hostname;

  // slice(-2) copes with hostnames that have fewer than two labels.
  return hostname.split(".").slice(-2).join(".");
}
64
// Prepared statement: read the stored Jetstream cursor from the `state` table.
const getCursor = db.prepare("SELECT cursor FROM state WHERE id = 1");

// How far to rewind the stored cursor when resuming, in microseconds
// (10_000_000 µs = 10 s), so no events are missed across a restart.
const CURSOR_REWIND_US = 10_000_000;

const dbCursor = getCursor.get<{ cursor?: string }>();
const cursor = dbCursor ? Number(dbCursor.cursor) : 0;

// Resume only from a usable stored cursor. Previously a missing row produced
// a negative cursor (0 - 10000000) and a row without a value produced NaN;
// leaving the option unset lets the stream start from live events instead.
const resumeCursor =
  Number.isFinite(cursor) && cursor > CURSOR_REWIND_US
    ? cursor - CURSOR_REWIND_US // back up a bit for seamless playback
    : undefined;

// Jetstream subscription restricted to post records; the endpoint can be
// overridden through the JETSTREAM environment variable.
const jetstream = new Jetstream({
  wantedCollections: ["app.bsky.feed.post"],
  cursor: resumeCursor,
  endpoint:
    Deno.env.get("JETSTREAM") ??
    "wss://jetstream1.us-east.bsky.network/subscribe",
});
76
// Announce when the Jetstream connection has been opened.
jetstream.on("open", () => {
  console.log("Listening to the jetstream...");
});

// On a stream error: log it, then hand the cursor supplied with the error to
// the worker as an op 3 message (the same op used for the periodic cursor
// checkpoints in the commit handler).
jetstream.on("error", (err, lastCursor) => {
  console.error(err);
  worker.postMessage({ op: 3, cursor: lastCursor });
});
86
// Number of commit events seen since the cursor was last sent to the worker.
let count = 0;

// Handles every post commit from the stream: checkpoints the cursor every
// 1024 events, looks up the author's PDS, and forwards creates (op 0) and
// deletes (op 1) to the worker. Other operations are ignored.
jetstream.on("commit", async (event) => {
  count += 1;
  if (count >= 1024) {
    count = 0;
    worker.postMessage({ op: 3, cursor: event.time_us });
  }

  const { did, commit } = event;
  const atUri: ResourceUri = `at://${did}/app.bsky.feed.post/${commit.rkey}`;

  let pds: string | undefined;
  try {
    pds = await getPDS(did as DID);
  } catch (err) {
    // Lookup failed; log it and drop this event.
    console.error(err);
    return;
  }

  if (!pds) {
    console.error(`PDS not found for ${did}`);
    return;
  }

  switch (commit.operation) {
    case CommitType.Create:
      worker.postMessage({ op: 0, atUri, cid: commit.cid, did, pds });
      break;
    case CommitType.Delete:
      worker.postMessage({ op: 1, atUri, pds });
      break;
  }
});
129
// Handles identity events: re-resolves the account's PDS (bypassing the
// database cache) and, when it differs from the stored one, fetches the
// account's post records from the new PDS and sends them to the worker (op 2).
jetstream.on("identity", async (e) => {
  // Read the stored value first: getPDS(…, true) posts an op 4 message to the
  // worker for the freshly resolved PDS.
  const cached = getAuthor.get<Author>(e.did);

  let pds: string | undefined;
  try {
    pds = await getPDS(e.did as DID, true);
  } catch (err) {
    // Without this guard a failed resolution rejected the handler's promise
    // with nothing to catch it (the commit handler already guards this call).
    console.error(`Failed to resolve PDS for ${e.did}: ${err}`);
    return;
  }
  if (!pds || cached?.pds === pds) return;

  const handler = simpleFetchHandler({ service: pds });
  const rpc = new Client({ handler });
  try {
    // NOTE(review): this is a single listRecords call with no limit/cursor,
    // so only the first page of posts is backfilled — confirm that is intended.
    const { records } = await ok(
      rpc.get("com.atproto.repo.listRecords", {
        params: {
          repo: e.did,
          collection: "app.bsky.feed.post",
        },
      }),
    );
    worker.postMessage({
      op: 2,
      records,
      did: e.did,
      pds,
    });
  } catch (err) {
    console.error(`Failed to backfill posts: ${err}`);
  }
});
155
// Start the Jetstream subscription.
jetstream.start();

// Default export typed as a Deno serve handler: every request is answered
// with "Pong!".
export default {
  fetch: () => new Response("Pong!"),
} satisfies Deno.ServeDefaultExport;
162} satisfies Deno.ServeDefaultExport;