A couple of Bluesky feeds focused around PDSes

init

essem.space 267ee4b2

+5
.env.example
··· 1 + DB=./data.db 2 + 3 + JETSTREAM="wss://jetstream1.us-east.bsky.network/subscribe" 4 + HOSTNAME="feeds.example.com" 5 + PUBLISHER="did:plc:fasdv..."
+2
.gitignore
··· 1 + *.db* 2 + .env
+5
.vscode/extensions.json
··· 1 + { 2 + "recommendations": [ 3 + "denoland.vscode-deno" 4 + ] 5 + }
+6
.vscode/settings.json
··· 1 + { 2 + "deno.enablePaths": [ 3 + "./" 4 + ], 5 + "editor.inlayHints.enabled": "off" 6 + }
+12
README.md
··· 1 + # pds-feedgen 2 + Source code for the [Your PDS](https://bsky.app/profile/did:plc:ca4b3evcz7rjhni6mngjwfzl/feed/your-pds) and [ATmosphere Dwellers](https://bsky.app/profile/did:plc:ca4b3evcz7rjhni6mngjwfzl/feed/non-bsky-pds) feeds. 3 + 4 + Ingestion and feed generation are handled separately. To start the ingestor, run: 5 + ``` 6 + $ deno task start-ingest 7 + ``` 8 + 9 + To start the feed generator, run: 10 + ``` 11 + $ deno task start-feedgen 12 + ```
+61
common/db.ts
··· 1 + import { Database } from "@db/sqlite"; 2 + 3 + const schema = [ 4 + `CREATE TABLE posts ( 5 + uri VARCHAR PRIMARY KEY, 6 + cid VARCHAR NOT NULL, 7 + author VARCHAR NOT NULL, 8 + indexed_at VARCHAR NOT NULL 9 + ); 10 + CREATE TABLE authors ( 11 + did VARCHAR PRIMARY KEY, 12 + pds VARCHAR NOT NULL, 13 + pds_base VARCHAR NOT NULL 14 + );`, 15 + `CREATE INDEX authors_pds_idx ON authors (did, pds); 16 + CREATE INDEX posts_author_idx ON posts (uri, author, indexed_at DESC);`, 17 + `CREATE TABLE state ( 18 + id smallint PRIMARY KEY, 19 + cursor integer, 20 + CHECK(id = 1) 21 + ); 22 + INSERT INTO state (id) VALUES (1);`, 23 + ]; 24 + 25 + const db = new Database(Deno.env.get("DB") ?? ":memory:", { 26 + create: true, 27 + }); 28 + db.exec("PRAGMA journal_mode = WAL;"); 29 + db.exec("PRAGMA synchronous = NORMAL;"); 30 + db.exec("PRAGMA temp_store = memory;"); 31 + 32 + try { 33 + db.transaction(() => { 34 + let version: number; 35 + const result = db.prepare("PRAGMA user_version").get() as { 36 + user_version: number; 37 + }; 38 + version = result?.user_version ?? 0; 39 + const latestVersion = schema.length; 40 + if (version < latestVersion) { 41 + console.log( 42 + `Migrating database, which is currently at version ${version}...` 43 + ); 44 + while (version < latestVersion) { 45 + console.log(`Running version ${version} update script...`); 46 + db.exec(schema[version]); 47 + version++; 48 + } 49 + } else { 50 + return; 51 + } 52 + db.exec(`PRAGMA user_version = ${latestVersion}`); 53 + })(); 54 + } catch (e) { 55 + console.error(`Database migration failed:`); 56 + console.error(e); 57 + console.error("Unable to start, quitting now."); 58 + Deno.exit(1); 59 + } 60 + 61 + export { db };
+17
common/types.ts
··· 1 + import type { ResourceUri } from "@atcute/lexicons"; 2 + 3 + export type DID = `did:${"plc" | "web"}:${string}`; 4 + 5 + export type Post = { 6 + uri: ResourceUri; 7 + cid: string; 8 + indexed_at: string; 9 + author: string; 10 + pds?: string; 11 + }; 12 + 13 + export type Author = { 14 + did: DID; 15 + pds: string; 16 + pds_base: string; 17 + };
+16
deno.json
··· 1 + { 2 + "imports": { 3 + "@atcute/bluesky": "npm:@atcute/bluesky@^3.2.3", 4 + "@atcute/client": "npm:@atcute/client@^4.0.3", 5 + "@atcute/identity-resolver": "npm:@atcute/identity-resolver@^1.1.3", 6 + "@atcute/lexicons": "npm:@atcute/lexicons@^1.1.1", 7 + "@atcute/xrpc-server": "npm:@atcute/xrpc-server@^0.1.1", 8 + "@db/sqlite": "jsr:@db/sqlite@^0.12.0", 9 + "@skyware/jetstream": "npm:@skyware/jetstream@^0.2.5", 10 + "redis": "npm:redis@^5.8.2" 11 + }, 12 + "tasks": { 13 + "start-feedgen": "deno serve --no-prompt --env-file --allow-env --allow-ffi --allow-read --port=4000 feedgen.ts", 14 + "start-ingest": "deno run --no-prompt --env-file --allow-net --allow-env --allow-ffi --allow-read ingest.ts" 15 + } 16 + }
+240
deno.lock
··· 1 + { 2 + "version": "5", 3 + "specifiers": { 4 + "jsr:@db/sqlite@0.12": "0.12.0", 5 + "jsr:@denosaurs/plug@1": "1.1.0", 6 + "jsr:@std/assert@0.217": "0.217.0", 7 + "jsr:@std/encoding@1": "1.0.10", 8 + "jsr:@std/fmt@1": "1.0.8", 9 + "jsr:@std/fs@1": "1.0.19", 10 + "jsr:@std/internal@^1.0.9": "1.0.9", 11 + "jsr:@std/path@0.217": "0.217.0", 12 + "jsr:@std/path@1": "1.1.1", 13 + "jsr:@std/path@^1.1.1": "1.1.1", 14 + "npm:@atcute/bluesky@^3.2.3": "3.2.3", 15 + "npm:@atcute/client@^4.0.3": "4.0.3", 16 + "npm:@atcute/identity-resolver@^1.1.3": "1.1.3_@atcute+identity@1.1.0", 17 + "npm:@atcute/lexicons@^1.1.1": "1.1.1", 18 + "npm:@atcute/xrpc-server@~0.1.1": "0.1.1_@atcute+identity@1.1.0", 19 + "npm:@skyware/jetstream@~0.2.5": "0.2.5", 20 + "npm:redis@^5.8.2": "5.8.2_@redis+client@5.8.2" 21 + }, 22 + "jsr": { 23 + "@db/sqlite@0.12.0": { 24 + "integrity": "dd1ef7f621ad50fc1e073a1c3609c4470bd51edc0994139c5bf9851de7a6d85f", 25 + "dependencies": [ 26 + "jsr:@denosaurs/plug", 27 + "jsr:@std/path@0.217" 28 + ] 29 + }, 30 + "@denosaurs/plug@1.1.0": { 31 + "integrity": "eb2f0b7546c7bca2000d8b0282c54d50d91cf6d75cb26a80df25a6de8c4bc044", 32 + "dependencies": [ 33 + "jsr:@std/encoding", 34 + "jsr:@std/fmt", 35 + "jsr:@std/fs", 36 + "jsr:@std/path@1" 37 + ] 38 + }, 39 + "@std/assert@0.217.0": { 40 + "integrity": "c98e279362ca6982d5285c3b89517b757c1e3477ee9f14eb2fdf80a45aaa9642" 41 + }, 42 + "@std/encoding@1.0.10": { 43 + "integrity": "8783c6384a2d13abd5e9e87a7ae0520a30e9f56aeeaa3bdf910a3eaaf5c811a1" 44 + }, 45 + "@std/fmt@1.0.8": { 46 + "integrity": "71e1fc498787e4434d213647a6e43e794af4fd393ef8f52062246e06f7e372b7" 47 + }, 48 + "@std/fs@1.0.19": { 49 + "integrity": "051968c2b1eae4d2ea9f79a08a3845740ef6af10356aff43d3e2ef11ed09fb06", 50 + "dependencies": [ 51 + "jsr:@std/internal", 52 + "jsr:@std/path@^1.1.1" 53 + ] 54 + }, 55 + "@std/internal@1.0.9": { 56 + "integrity": "bdfb97f83e4db7a13e8faab26fb1958d1b80cc64366501af78a0aee151696eb8" 57 + }, 58 + "@std/path@0.217.0": { 59 + "integrity": "1217cc25534bca9a2f672d7fe7c6f356e4027df400c0e85c0ef3e4343bc67d11", 60 + "dependencies": [ 61 + "jsr:@std/assert" 62 + ] 63 + }, 64 + "@std/path@1.1.1": { 65 + "integrity": "fe00026bd3a7e6a27f73709b83c607798be40e20c81dde655ce34052fd82ec76", 66 + "dependencies": [ 67 + "jsr:@std/internal" 68 + ] 69 + } 70 + }, 71 + "npm": { 72 + "@atcute/atproto@3.1.4": { 73 + "integrity": "sha512-v0/ue7mZYtjYw4vWbtda51bLwW88mqsUQB8F/UZNO18ANAQWmKq1HDceVqjvruaLe2QPqE43XM3WkEyZ2FhOrA==", 74 + "dependencies": [ 75 + "@atcute/lexicons" 76 + ] 77 + }, 78 + "@atcute/bluesky@3.2.3": { 79 + "integrity": "sha512-IdPQQ54F1BLhW5z49k81ZUC/GQl/tVygZ+CzLHYvQySHA6GJRcvPzwEf8aV21u0SZOJF+yF4CWEGNgtryyxPmg==", 80 + "dependencies": [ 81 + "@atcute/atproto", 82 + "@atcute/lexicons" 83 + ] 84 + }, 85 + "@atcute/client@4.0.3": { 86 + "integrity": "sha512-RIOZWFVLca/HiPAAUDqQPOdOreCxTbL5cb+WUf5yqQOKIu5yEAP3eksinmlLmgIrlr5qVOE7brazUUzaskFCfw==", 87 + "dependencies": [ 88 + "@atcute/identity", 89 + "@atcute/lexicons" 90 + ] 91 + }, 92 + "@atcute/crypto@2.2.3": { 93 + "integrity": "sha512-jJI/8WDK6rKvpoUKi0C9Q7pjRRrHGGAagRxnFvpBM5ycZT9eABz7p309LmRKBCWLasmCs/qee8WK4dqOA2e7Dw==", 94 + "dependencies": [ 95 + "@atcute/multibase", 96 + "@atcute/uint8array", 97 + "@noble/secp256k1" 98 + ] 99 + }, 100 + "@atcute/identity-resolver@1.1.3_@atcute+identity@1.1.0": { 101 + "integrity": "sha512-KZgGgg99CWaV7Df3+h3X/WMrDzTPQVfsaoIVbTNLx2B56BvCL2EmaxPSVw/7BFUJMZHlVU4rtoEB4lyvNyMswA==", 102 + "dependencies": [ 103 + "@atcute/identity", 104 + "@atcute/lexicons", 105 + "@atcute/util-fetch", 106 + "@badrap/valita" 107 + ] 108 + }, 109 + "@atcute/identity@1.1.0": { 110 + "integrity": "sha512-6vRvRqJatDB+JUQsb+UswYmtBGQnSZcqC3a2y6H5DB/v5KcIh+6nFFtc17G0+3W9rxdk7k9M4KkgkdKf/YDNoQ==", 111 + "dependencies": [ 112 + "@atcute/lexicons", 113 + "@badrap/valita" 114 + ] 115 + }, 116 + "@atcute/lexicons@1.1.1": { 117 + "integrity": "sha512-k6qy5p3j9fJJ6ekaMPfEfp3ni4TW/XNuH9ZmsuwC0fi0tOjp+Fa8ZQakHwnqOzFt/cVBfGcmYE/lKNAbeTjgUg==", 118 + "dependencies": [ 119 + "esm-env" 120 + ] 121 + }, 122 + "@atcute/multibase@1.1.4": { 123 + "integrity": "sha512-NUf5AeeSOmuZHGU+4GAaMtISJoG+ZHtW/vUVA4lK/YDt/7LODAW0Fd0NNIIUPVUoW0xJS6zSEIWvwLLuxmEHhA==", 124 + "dependencies": [ 125 + "@atcute/uint8array" 126 + ] 127 + }, 128 + "@atcute/uint8array@1.0.3": { 129 + "integrity": "sha512-M/K+ihiVW8Pl2PFLzaC4E3l4JaZ1IH05Q0AbPWUC4cVHnd/gZ/1kAF5ngdtGvJeDMirHZ2VAy7OmAsPwR/2nlA==" 130 + }, 131 + "@atcute/util-fetch@1.0.2": { 132 + "dependencies": [ 133 + "@badrap/valita" 134 + ] 135 + }, 136 + "@atcute/xrpc-server@0.1.1_@atcute+identity@1.1.0": { 137 + "integrity": "sha512-pkr1yGtcCoApqELLjTIU7DO6SnVly5bAPbVswLHEM2xQfk6X4F/dKFDdPEsXwbRrSSr6leKgPwQPVcUpPQnQ4A==", 138 + "dependencies": [ 139 + "@atcute/crypto", 140 + "@atcute/identity", 141 + "@atcute/identity-resolver", 142 + "@atcute/lexicons", 143 + "@atcute/multibase", 144 + "@atcute/uint8array", 145 + "@badrap/valita", 146 + "nanoid" 147 + ] 148 + }, 149 + "@badrap/valita@0.4.6": { 150 + "integrity": "sha512-4kdqcjyxo/8RQ8ayjms47HCWZIF5981oE5nIenbfThKDxWXtEHKipAOWlflpPJzZx9y/JWYQkp18Awr7VuepFg==" 151 + }, 152 + "@noble/secp256k1@2.3.0": { 153 + "integrity": "sha512-0TQed2gcBbIrh7Ccyw+y/uZQvbJwm7Ao4scBUxqpBCcsOlZG0O4KGfjtNAy/li4W8n1xt3dxrwJ0beZ2h2G6Kw==" 154 + }, 155 + "@redis/bloom@5.8.2_@redis+client@5.8.2": { 156 + "integrity": "sha512-855DR0ChetZLarblio5eM0yLwxA9Dqq50t8StXKp5bAtLT0G+rZ+eRzzqxl37sPqQKjUudSYypz55o6nNhbz0A==", 157 + "dependencies": [ 158 + "@redis/client" 159 + ] 160 + }, 161 + "@redis/client@5.8.2": { 162 + "integrity": "sha512-WtMScno3+eBpTac1Uav2zugXEoXqaU23YznwvFgkPwBQVwEHTDgOG7uEAObtZ/Nyn8SmAMbqkEubJaMOvnqdsQ==", 163 + "dependencies": [ 164 + "cluster-key-slot" 165 + ] 166 + }, 167 + "@redis/json@5.8.2_@redis+client@5.8.2": { 168 + "integrity": "sha512-uxpVfas3I0LccBX9rIfDgJ0dBrUa3+0Gc8sEwmQQH0vHi7C1Rx1Qn8Nv1QWz5bohoeIXMICFZRcyDONvum2l/w==", 169 + "dependencies": [ 170 + "@redis/client" 171 + ] 172 + }, 173 + "@redis/search@5.8.2_@redis+client@5.8.2": { 174 + "integrity": "sha512-cNv7HlgayavCBXqPXgaS97DRPVWFznuzsAmmuemi2TMCx5scwLiP50TeZvUS06h/MG96YNPe6A0Zt57yayfxwA==", 175 + "dependencies": [ 176 + "@redis/client" 177 + ] 178 + }, 179 + "@redis/time-series@5.8.2_@redis+client@5.8.2": { 180 + "integrity": "sha512-g2NlHM07fK8H4k+613NBsk3y70R2JIM2dPMSkhIjl2Z17SYvaYKdusz85d7VYOrZBWtDrHV/WD2E3vGu+zni8A==", 181 + "dependencies": [ 182 + "@redis/client" 183 + ] 184 + }, 185 + "@skyware/jetstream@0.2.5": { 186 + "integrity": "sha512-fM/zs03DLwqRyzZZJFWN20e76KrdqIp97Tlm8Cek+vxn96+tu5d/fx79V6H85L0QN6HvGiX2l9A8hWFqHvYlOA==", 187 + "dependencies": [ 188 + "@atcute/atproto", 189 + "@atcute/bluesky", 190 + "@atcute/lexicons", 191 + "partysocket", 192 + "tiny-emitter" 193 + ] 194 + }, 195 + "cluster-key-slot@1.1.2": { 196 + "integrity": "sha512-RMr0FhtfXemyinomL4hrWcYJxmX6deFdCxpJzhDttxgO1+bcCnkk+9drydLVDmAMG7NE6aN/fl4F7ucU/90gAA==" 197 + }, 198 + "esm-env@1.2.2": { 199 + "integrity": "sha512-Epxrv+Nr/CaL4ZcFGPJIYLWFom+YeV1DqMLHJoEd9SYRxNbaFruBwfEX/kkHUJf55j2+TUbmDcmuilbP1TmXHA==" 200 + }, 201 + "event-target-polyfill@0.0.4": { 202 + "integrity": "sha512-Gs6RLjzlLRdT8X9ZipJdIZI/Y6/HhRLyq9RdDlCsnpxr/+Nn6bU2EFGuC94GjxqhM+Nmij2Vcq98yoHrU8uNFQ==" 203 + }, 204 + "nanoid@5.1.5": { 205 + "integrity": "sha512-Ir/+ZpE9fDsNH0hQ3C68uyThDXzYcim2EqcZ8zn8Chtt1iylPT9xXJB0kPCnqzgcEGikO9RxSrh63MsmVCU7Fw==", 206 + "bin": true 207 + }, 208 + "partysocket@1.1.6": { 209 + "integrity": "sha512-LkEk8N9hMDDsDT0iDK0zuwUDFVrVMUXFXCeN3850Ng8wtjPqPBeJlwdeY6ROlJSEh3tPoTTasXoSBYH76y118w==", 210 + "dependencies": [ 211 + "event-target-polyfill" 212 + ] 213 + }, 214 + "redis@5.8.2_@redis+client@5.8.2": { 215 + "integrity": "sha512-31vunZj07++Y1vcFGcnNWEf5jPoTkGARgfWI4+Tk55vdwHxhAvug8VEtW7Cx+/h47NuJTEg/JL77zAwC6E0OeA==", 216 + "dependencies": [ 217 + "@redis/bloom", 218 + "@redis/client", 219 + "@redis/json", 220 + "@redis/search", 221 + "@redis/time-series" 222 + ] 223 + }, 224 + "tiny-emitter@2.1.0": { 225 + "integrity": "sha512-NB6Dk1A9xgQPMoGqC5CVXn123gWyte215ONT5Pp5a0yt4nlEoO1ZWeCwpncaekPHXO60i47ihFnZPiRPjRMq4Q==" 226 + } 227 + }, 228 + "workspace": { 229 + "dependencies": [ 230 + "jsr:@db/sqlite@0.12", 231 + "npm:@atcute/bluesky@^3.2.3", 232 + "npm:@atcute/client@^4.0.3", 233 + "npm:@atcute/identity-resolver@^1.1.3", 234 + "npm:@atcute/lexicons@^1.1.1", 235 + "npm:@atcute/xrpc-server@~0.1.1", 236 + "npm:@skyware/jetstream@~0.2.5", 237 + "npm:redis@^5.8.2" 238 + ] 239 + } 240 + }
+179
feedgen.ts
··· 1 + import { 2 + AppBskyFeedDescribeFeedGenerator, 3 + AppBskyFeedGetFeedSkeleton, 4 + } from "@atcute/bluesky"; 5 + import { 6 + CompositeDidDocumentResolver, 7 + PlcDidDocumentResolver, 8 + WebDidDocumentResolver, 9 + } from "@atcute/identity-resolver"; 10 + import { parseResourceUri, type ResourceUri } from "@atcute/lexicons/syntax"; 11 + import { 12 + AuthRequiredError, 13 + InvalidRequestError, 14 + XRPCRouter, 15 + json, 16 + } from "@atcute/xrpc-server"; 17 + import { ServiceJwtVerifier } from "@atcute/xrpc-server/auth"; 18 + import { cors } from "@atcute/xrpc-server/middlewares/cors"; 19 + import type { Statement } from "@db/sqlite"; 20 + 21 + import { db } from "./common/db.ts"; 22 + import type { DID, Post } from "./common/types.ts"; 23 + 24 + const publisher = Deno.env.get("PUBLISHER") ?? "did:example:bob"; 25 + const hostname = Deno.env.get("HOSTNAME"); 26 + if (!hostname) { 27 + console.error("HOSTNAME not provided! Exiting now."); 28 + Deno.exit(1); 29 + } 30 + 31 + const baseDID: DID = `did:web:${hostname}`; 32 + 33 + const app = new XRPCRouter({ middlewares: [cors()] }); 34 + const didResolver = new CompositeDidDocumentResolver({ 35 + methods: { 36 + plc: new PlcDidDocumentResolver(), 37 + web: new WebDidDocumentResolver(), 38 + }, 39 + }); 40 + const verifier = new ServiceJwtVerifier({ 41 + serviceDid: baseDID, 42 + resolver: didResolver, 43 + }); 44 + 45 + const feeds: Record< 46 + string, 47 + { default: Statement; cursor: Statement; pds: boolean } 48 + > = { 49 + "your-pds": { 50 + default: db.prepare( 51 + `SELECT a.uri, a.indexed_at FROM posts a 52 + INNER JOIN authors b ON a.author = b.did 53 + WHERE b.pds = ?1 54 + ORDER BY a.indexed_at DESC, a.cid DESC LIMIT ?2;` 55 + ), 56 + cursor: db.prepare( 57 + `SELECT a.uri, a.indexed_at FROM posts a 58 + INNER JOIN authors b ON a.author = b.did 59 + WHERE b.pds = ?1 60 + AND a.indexed_at < ?2 61 + ORDER BY a.indexed_at DESC, a.cid DESC LIMIT ?3;` 62 + ), 63 + pds: true, 64 + }, 65 + "non-bsky-pds": { 66 + default: db.prepare( 67 + `SELECT a.uri, a.indexed_at FROM posts a 68 + INNER JOIN authors b ON a.author = b.did 69 + WHERE b.pds_base != 'bsky.network' 70 + AND b.pds_base != 'brid.gy' 71 + ORDER BY a.indexed_at DESC, a.cid DESC LIMIT ?1;` 72 + ), 73 + cursor: db.prepare( 74 + `SELECT a.uri, a.indexed_at FROM posts a 75 + INNER JOIN authors b ON a.author = b.did 76 + WHERE b.pds_base != 'bsky.network' 77 + AND b.pds_base != 'brid.gy' 78 + AND a.indexed_at < ?1 79 + ORDER BY a.indexed_at DESC, a.cid DESC LIMIT ?2;` 80 + ), 81 + pds: false, 82 + }, 83 + }; 84 + 85 + app.add(AppBskyFeedGetFeedSkeleton.mainSchema, { 86 + async handler({ request, params: { feed, limit, cursor } }) { 87 + const feedUri = parseResourceUri(feed); 88 + if (!feedUri.ok || !feedUri.value.rkey) { 89 + throw new InvalidRequestError(); 90 + } 91 + 92 + const feedQuery = feeds[feedUri.value.rkey]; 93 + 94 + if ( 95 + feedUri.value.repo !== publisher || 96 + feedUri.value.collection !== "app.bsky.feed.generator" || 97 + !feedQuery 98 + ) { 99 + throw new InvalidRequestError({ 100 + error: "UnsupportedAlgorithm", 101 + description: "Unsupported algorithm", 102 + }); 103 + } 104 + 105 + let pds = ""; 106 + 107 + if (feedQuery.pds) { 108 + const authorization = request.headers.get("authorization") ?? ""; 109 + if (!authorization.startsWith("Bearer ")) { 110 + throw new AuthRequiredError(); 111 + } 112 + const jwt = authorization.replace("Bearer ", "").trim(); 113 + const parsed = await verifier.verify(jwt); 114 + if (!parsed.ok) { 115 + throw new AuthRequiredError(); 116 + } 117 + 118 + const resolved = await didResolver.resolve(parsed.value.issuer as DID); 119 + for (const service of resolved.service ?? []) { 120 + if ( 121 + service.type == "AtprotoPersonalDataServer" && 122 + typeof service.serviceEndpoint === "string" 123 + ) { 124 + pds = service.serviceEndpoint; 125 + } 126 + } 127 + if (typeof pds !== "string") 128 + throw new InvalidRequestError({ 129 + error: "NoServiceEndpoint", 130 + description: "No service endpoint", 131 + }); 132 + } 133 + 134 + let res: Post[]; 135 + if (cursor) { 136 + const timeStr = new Date(parseInt(cursor, 10)).toISOString(); 137 + if (feedQuery.pds) { 138 + res = feedQuery.cursor.all(pds, timeStr, limit); 139 + } else { 140 + res = feedQuery.cursor.all(timeStr, limit); 141 + } 142 + } else { 143 + if (feedQuery.pds) { 144 + res = feedQuery.default.all(pds, limit); 145 + } else { 146 + res = feedQuery.default.all(limit); 147 + } 148 + } 149 + 150 + const posts = res.map((row) => ({ 151 + post: row.uri, 152 + })); 153 + 154 + let cs: string | undefined; 155 + const last = res.at(-1); 156 + if (last) { 157 + cs = new Date(last.indexed_at).getTime().toString(10); 158 + } 159 + 160 + return json({ 161 + cursor: cs, 162 + feed: posts, 163 + }); 164 + }, 165 + }); 166 + 167 + app.add(AppBskyFeedDescribeFeedGenerator.mainSchema, { 168 + handler() { 169 + const feedArray = Object.keys(feeds).map((v) => ({ 170 + uri: `at://${publisher}/app.bsky.feed.generator/${v}` as ResourceUri, 171 + })); 172 + return json({ 173 + did: baseDID, 174 + feeds: feedArray, 175 + }); 176 + }, 177 + }); 178 + 179 + export default app;
+186
ingest.ts
··· 1 + import { 2 + CompositeDidDocumentResolver, 3 + PlcDidDocumentResolver, 4 + WebDidDocumentResolver, 5 + } from "@atcute/identity-resolver"; 6 + import type { ResourceUri } from "@atcute/lexicons/syntax"; 7 + import { Jetstream, CommitType } from "@skyware/jetstream"; 8 + 9 + import { createClient } from "redis"; 10 + 11 + import { db } from "./common/db.ts"; 12 + import { Post, type Author, type DID } from "./common/types.ts"; 13 + 14 + type ShallowPost = Omit<Post, "cid" | "indexed_at" | "author">; 15 + 16 + const postQueue: Post[] = []; 17 + const delQueue: ShallowPost[] = []; 18 + 19 + const redis = createClient(); 20 + redis.on("error", (err: Error) => console.error("Redis Client Error", err)); 21 + await redis.connect(); 22 + 23 + const insertPost = db.prepare( 24 + `INSERT INTO posts ("uri", "cid", "author", "indexed_at") VALUES (?1, ?2, ?3, ?4) ON CONFLICT DO NOTHING;` 25 + ); 26 + 27 + const insertPosts = db.transaction(async (posts: Post[]) => { 28 + for (const post of posts) { 29 + const dbResult = insertPost.run( 30 + post.uri, 31 + post.cid, 32 + post.author, 33 + post.indexed_at 34 + ); 35 + if (dbResult > 0) { 36 + const pdsKey = `posts:${post.pds}`; 37 + const length = await redis.lPush( 38 + pdsKey, 39 + `${post.uri};${post.indexed_at}` 40 + ); 41 + if (length > 30000) { 42 + const last = await redis.rPop(pdsKey); 43 + await redis.lTrim(pdsKey, 0, 29999); 44 + if (last) { 45 + const indexTime = last.split(";")[1]; 46 + if (indexTime?.trim()) { 47 + removePostByPDS.run(post.pds, indexTime); 48 + } 49 + } 50 + } 51 + } 52 + } 53 + }); 54 + 55 + const removePostByURL = db.prepare( 56 + `DELETE FROM posts WHERE uri = ?1 RETURNING indexed_at;` 57 + ); 58 + const removePostByPDS = db.prepare( 59 + `DELETE FROM posts WHERE ROWID IN (SELECT a.ROWID FROM posts a INNER JOIN authors b ON a.author = b.did WHERE b.pds = ?1 AND a.indexed_at < ?2);` 60 + ); 61 + 62 + const removePosts = db.transaction(async (posts: ShallowPost[]) => { 63 + for (const post of posts) { 64 + const dbResult = removePostByURL.get<Post>(post.uri); 65 + if (dbResult) 66 + await redis.lRem( 67 + `posts:${post.pds}`, 68 + 0, 69 + `${post.uri};${dbResult.indexed_at}` 70 + ); 71 + } 72 + }); 73 + 74 + const getAuthor = db.prepare("SELECT pds FROM authors WHERE did = ?"); 75 + const upsertAuthor = db.prepare( 76 + "INSERT OR REPLACE INTO authors (did, pds, pds_base) VALUES (?1, ?2, ?3)" 77 + ); 78 + 79 + const getCursor = db.prepare("SELECT cursor FROM state WHERE id = 1"); 80 + const updateCursor = db.prepare("UPDATE state SET cursor = ? WHERE id = 1"); 81 + 82 + const didResolver = new CompositeDidDocumentResolver({ 83 + methods: { 84 + plc: new PlcDidDocumentResolver(), 85 + web: new WebDidDocumentResolver(), 86 + }, 87 + }); 88 + 89 + async function getPDS(did: DID, ignoreCache = false) { 90 + let pds: string | undefined; 91 + 92 + if (!ignoreCache) { 93 + const author = getAuthor.get<Author>(did); 94 + if (author) pds = author.pds; 95 + } 96 + 97 + if (!pds) { 98 + const resolved = await didResolver.resolve(did); 99 + for (const service of resolved.service ?? []) { 100 + if ( 101 + service.type == "AtprotoPersonalDataServer" && 102 + typeof service.serviceEndpoint === "string" 103 + ) { 104 + upsertAuthor.run( 105 + did, 106 + service.serviceEndpoint, 107 + getPDSBase(service.serviceEndpoint) 108 + ); 109 + pds = service.serviceEndpoint; 110 + } 111 + } 112 + } 113 + 114 + return pds; 115 + } 116 + 117 + function getPDSBase(pds: string) { 118 + const url = new URL(pds); 119 + const splitDomain = url.hostname.split("."); 120 + return `${splitDomain[splitDomain.length - 2]}.${ 121 + splitDomain[splitDomain.length - 1] 122 + }`; 123 + } 124 + 125 + const dbCursor = getCursor.get<{ cursor?: string }>(); 126 + const cursor = dbCursor ? Number(dbCursor.cursor) : 0; 127 + const jetstream = new Jetstream({ 128 + wantedCollections: ["app.bsky.feed.post"], 129 + cursor: cursor - 10000000, // back up a bit for seamless playback 130 + endpoint: 131 + Deno.env.get("JETSTREAM") ?? 132 + "wss://jetstream1.us-east.bsky.network/subscribe", 133 + }); 134 + 135 + jetstream.on("open", () => console.log("Listening to the jetstream...")); 136 + 137 + jetstream.on("error", (e, c) => { 138 + console.error(e); 139 + updateCursor.run(c); 140 + }); 141 + 142 + let count = 0; 143 + 144 + jetstream.on("commit", async (e) => { 145 + count++; 146 + if (count >= 1024) { 147 + count = 0; 148 + updateCursor.run(e.time_us); 149 + } 150 + 151 + const atUri: ResourceUri = `at://${e.did}/app.bsky.feed.post/${e.commit.rkey}`; 152 + const pds = await getPDS(e.did as DID); 153 + 154 + if (!pds) { 155 + console.error(`PDS not found for ${e.did}`); 156 + return; 157 + } 158 + 159 + if (e.commit.operation === CommitType.Create) { 160 + const indexed_at = new Date().toISOString(); 161 + postQueue.push({ 162 + uri: atUri, 163 + cid: e.commit.cid, 164 + author: e.did, 165 + indexed_at, 166 + pds, 167 + }); 168 + if (postQueue.length > 127) { 169 + insertPosts.immediate(postQueue.splice(0, 128)); 170 + } 171 + } else if (e.commit.operation === CommitType.Delete) { 172 + delQueue.push({ 173 + uri: atUri, 174 + pds, 175 + }); 176 + if (delQueue.length > 63) { 177 + removePosts.immediate(delQueue.splice(0, 64)); 178 + } 179 + } 180 + }); 181 + 182 + jetstream.on("identity", async (e) => { 183 + await getPDS(e.did as DID, true); 184 + }); 185 + 186 + jetstream.start();