A couple of Bluesky feeds focused around PDSes
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

Switch to official redis lib

+58 -30
+2 -2
deno.json
··· 7 7 "@atcute/lexicons": "npm:@atcute/lexicons@^1.2.2", 8 8 "@atcute/xrpc-server": "npm:@atcute/xrpc-server@^0.1.2", 9 9 "@db/sqlite": "jsr:@db/sqlite@^0.12.0", 10 - "@iuioiua/redis": "jsr:@iuioiua/redis@^1.1.9", 11 - "@skyware/jetstream": "npm:@skyware/jetstream@^0.2.5" 10 + "@skyware/jetstream": "npm:@skyware/jetstream@^0.2.5", 11 + "redis": "npm:redis@^5.8.3" 12 12 }, 13 13 "tasks": { 14 14 "start-feedgen": "deno serve --no-prompt --env-file --allow-net --allow-env --allow-ffi --allow-read --port=4000 feedgen.ts",
+47 -7
deno.lock
··· 3 3 "specifiers": { 4 4 "jsr:@db/sqlite@0.12": "0.12.0", 5 5 "jsr:@denosaurs/plug@1": "1.1.0", 6 - "jsr:@iuioiua/redis@^1.1.9": "1.1.9", 7 6 "jsr:@std/assert@0.217": "0.217.0", 8 7 "jsr:@std/encoding@1": "1.0.10", 9 8 "jsr:@std/fmt@1": "1.0.8", ··· 18 17 "npm:@atcute/identity-resolver@^1.1.4": "1.1.4_@atcute+identity@1.1.1", 19 18 "npm:@atcute/lexicons@^1.2.2": "1.2.2", 20 19 "npm:@atcute/xrpc-server@~0.1.2": "0.1.2_@atcute+identity@1.1.1", 21 - "npm:@skyware/jetstream@~0.2.5": "0.2.5" 20 + "npm:@skyware/jetstream@~0.2.5": "0.2.5", 21 + "npm:redis@^5.8.3": "5.8.3_@redis+client@5.8.3" 22 22 }, 23 23 "jsr": { 24 24 "@db/sqlite@0.12.0": { ··· 36 36 "jsr:@std/fs", 37 37 "jsr:@std/path@1" 38 38 ] 39 - }, 40 - "@iuioiua/redis@1.1.9": { 41 - "integrity": "167d9031d0b8cf050d63756cf6b2d59872d9095784b2dff2b8a40adcf744c1e4" 42 39 }, 43 40 "@std/assert@0.217.0": { 44 41 "integrity": "c98e279362ca6982d5285c3b89517b757c1e3477ee9f14eb2fdf80a45aaa9642" ··· 158 155 "@noble/secp256k1@2.3.0": { 159 156 "integrity": "sha512-0TQed2gcBbIrh7Ccyw+y/uZQvbJwm7Ao4scBUxqpBCcsOlZG0O4KGfjtNAy/li4W8n1xt3dxrwJ0beZ2h2G6Kw==" 160 157 }, 158 + "@redis/bloom@5.8.3_@redis+client@5.8.3": { 159 + "integrity": "sha512-1eldTzHvdW3Oi0TReb8m1yiFt8ZwyF6rv1NpZyG5R4TpCwuAdKQetBKoCw7D96tNFgsVVd6eL+NaGZZCqhRg4g==", 160 + "dependencies": [ 161 + "@redis/client" 162 + ] 163 + }, 164 + "@redis/client@5.8.3": { 165 + "integrity": "sha512-MZVUE+l7LmMIYlIjubPosruJ9ltSLGFmJqsXApTqPLyHLjsJUSAbAJb/A3N34fEqean4ddiDkdWzNu4ZKPvRUg==", 166 + "dependencies": [ 167 + "cluster-key-slot" 168 + ] 169 + }, 170 + "@redis/json@5.8.3_@redis+client@5.8.3": { 171 + "integrity": "sha512-DRR09fy/u8gynHGJ4gzXYeM7D8nlS6EMv5o+h20ndTJiAc7RGR01fdk2FNjnn1Nz5PjgGGownF+s72bYG4nZKQ==", 172 + "dependencies": [ 173 + "@redis/client" 174 + ] 175 + }, 176 + "@redis/search@5.8.3_@redis+client@5.8.3": { 177 + "integrity": "sha512-EMIvEeGRR2I0BJEz4PV88DyCuPmMT1rDtznlsHY3cKSDcc9vj0Q411jUnX0iU2vVowUgWn/cpySKjpXdZ8m+5g==", 178 + "dependencies": [ 179 + "@redis/client" 180 + ] 181 + }, 182 + "@redis/time-series@5.8.3_@redis+client@5.8.3": { 183 + "integrity": "sha512-5Jwy3ilsUYQjzpE7WZ1lEeG1RkqQ5kHtwV1p8yxXHSEmyUbC/T/AVgyjMcm52Olj/Ov/mhDKjx6ndYUi14bXsw==", 184 + "dependencies": [ 185 + "@redis/client" 186 + ] 187 + }, 161 188 "@skyware/jetstream@0.2.5": { 162 189 "integrity": "sha512-fM/zs03DLwqRyzZZJFWN20e76KrdqIp97Tlm8Cek+vxn96+tu5d/fx79V6H85L0QN6HvGiX2l9A8hWFqHvYlOA==", 163 190 "dependencies": [ ··· 171 198 "@standard-schema/spec@1.0.0": { 172 199 "integrity": "sha512-m2bOd0f2RT9k8QJx1JN85cZYyH1RqFBdlwtkSlf4tBDYLCiiZnv1fIIwacK6cqwXavOydf0NPToMQgpKq+dVlA==" 173 200 }, 201 + "cluster-key-slot@1.1.2": { 202 + "integrity": "sha512-RMr0FhtfXemyinomL4hrWcYJxmX6deFdCxpJzhDttxgO1+bcCnkk+9drydLVDmAMG7NE6aN/fl4F7ucU/90gAA==" 203 + }, 174 204 "esm-env@1.2.2": { 175 205 "integrity": "sha512-Epxrv+Nr/CaL4ZcFGPJIYLWFom+YeV1DqMLHJoEd9SYRxNbaFruBwfEX/kkHUJf55j2+TUbmDcmuilbP1TmXHA==" 176 206 }, ··· 187 217 "event-target-polyfill" 188 218 ] 189 219 }, 220 + "redis@5.8.3_@redis+client@5.8.3": { 221 + "integrity": "sha512-MfSrfV6+tEfTw8c4W0yFp6XWX8Il4laGU7Bx4kvW4uiYM1AuZ3KGqEGt1LdQHeD1nEyLpIWetZ/SpY3kkbgrYw==", 222 + "dependencies": [ 223 + "@redis/bloom", 224 + "@redis/client", 225 + "@redis/json", 226 + "@redis/search", 227 + "@redis/time-series" 228 + ] 229 + }, 190 230 "tiny-emitter@2.1.0": { 191 231 "integrity": "sha512-NB6Dk1A9xgQPMoGqC5CVXn123gWyte215ONT5Pp5a0yt4nlEoO1ZWeCwpncaekPHXO60i47ihFnZPiRPjRMq4Q==" 192 232 } ··· 194 234 "workspace": { 195 235 "dependencies": [ 196 236 "jsr:@db/sqlite@0.12", 197 - "jsr:@iuioiua/redis@^1.1.9", 198 237 "npm:@atcute/atproto@^3.1.7", 199 238 "npm:@atcute/bluesky@^3.2.7", 200 239 "npm:@atcute/client@^4.0.5", 201 240 "npm:@atcute/identity-resolver@^1.1.4", 202 241 "npm:@atcute/lexicons@^1.2.2", 203 242 "npm:@atcute/xrpc-server@~0.1.2", 204 - "npm:@skyware/jetstream@~0.2.5" 243 + "npm:@skyware/jetstream@~0.2.5", 244 + "npm:redis@^5.8.3" 205 245 ] 206 246 } 207 247 }
+9 -21
ingest.ts
··· 6 6 WebDidDocumentResolver, 7 7 } from "@atcute/identity-resolver"; 8 8 import type { ResourceUri } from "@atcute/lexicons/syntax"; 9 - import { RedisClient } from "@iuioiua/redis"; 9 + import { createClient } from "redis"; 10 10 import { Jetstream, CommitType } from "@skyware/jetstream"; 11 11 12 12 import type {} from "@atcute/atproto"; ··· 16 16 17 17 type ShallowPost = Omit<Post, "cid" | "indexed_at" | "author">; 18 18 19 - const redisConn = await Deno.connect({ port: 6379 }); 20 - const redisClient = new RedisClient(redisConn); 19 + const redis = createClient(); 20 + redis.on("error", (err) => console.log("Redis Client Error", err)); 21 + await redis.connect(); 21 22 22 23 const insertPost = db.prepare( 23 24 `INSERT INTO posts ("uri", "cid", "author", "indexed_at") VALUES (?1, ?2, ?3, ?4) ON CONFLICT DO NOTHING;` ··· 33 34 ); 34 35 if (changes > 0) { 35 36 const pdsKey = `posts:${post.pds}`; 36 - redisClient 37 - .sendCommand(["LPUSH", pdsKey, `${post.uri};${post.indexed_at}`]) 37 + redis 38 + .lPush(pdsKey, `${post.uri};${post.indexed_at}`) 38 39 .then((length) => { 39 - if (typeof length !== "number") { 40 - console.error(`Incorrect LPUSH return type! Got ${length}`); 41 - return; 42 - } 43 40 if (length > 30000) { 44 - redisClient.sendCommand(["LTRIM", pdsKey, 0, 29999]); // let it run in background 45 - return redisClient.sendCommand(["RPOP", pdsKey]); 41 + redis.lTrim(pdsKey, 0, 29999); 42 + return redis.rPop(pdsKey); 46 43 } 47 44 }) 48 45 .then((last) => { 49 46 if (last) { 50 - if (typeof last !== "string") { 51 - console.error(`Incorrect RPOP return type! Got ${last}`); 52 - return; 53 - } 54 47 const indexTime = last.split(";")[1]; 55 48 if (indexTime?.trim()) { 56 49 removePostByPDS.run(post.pds, indexTime); ··· 174 167 const dbResult = 175 168 removePostByURL.get<Omit<Post, "uri" | "cid" | "pds">>(atUri); 176 169 if (dbResult) { 177 - await redisClient.sendCommand([ 178 - "LREM", 179 - `posts:${pds}`, 180 - 0, 181 - `${atUri};${dbResult.indexed_at}`, 182 - ]); 170 + await redis.lRem(`posts:${pds}`, 0, `${atUri};${dbResult.indexed_at}`); 183 171 } 184 172 } 185 173 });