A couple of Bluesky feeds focused around PDSes
at main 3.1 kB view raw
import { createClient } from "redis";

import { db } from "../common/db.ts";
import type { Post } from "../common/types.ts";

/** The subset of a post needed to delete it: everything except cid, indexed_at and author. */
type ShallowPost = Omit<Post, "cid" | "indexed_at" | "author">;

/** Maximum length a per-PDS Redis list may reach before it is trimmed. */
const MAX_POSTS_PER_PDS = 30000;

/** Interval, in milliseconds, between database pruning passes. */
const PRUNE_INTERVAL_MS = 60000;

const redis = createClient();
redis.on("error", (err) => console.log("Redis Client Error", err));
await redis.connect();

const insertPost = db.prepare(
  `INSERT INTO posts ("uri", "cid", "author", "indexed_at") VALUES (?1, ?2, ?3, ?4) ON CONFLICT DO NOTHING;`
);

/**
 * Per-PDS cutoff timestamps collected when a Redis list overflows.
 * The periodic pruning pass below deletes database rows older than the cutoff.
 */
const lastPostTimes = new Map<string, string>();

/**
 * Pushes a freshly inserted post onto its PDS list in Redis and, when the list
 * grows past MAX_POSTS_PER_PDS, trims it and records the popped entry's
 * timestamp as the pruning cutoff for that PDS.
 *
 * Never rejects: Redis failures are logged so that a fire-and-forget call
 * cannot surface as an unhandled promise rejection.
 */
async function cachePost(post: Post): Promise<void> {
  const pdsKey = `posts:${post.pds}`;
  try {
    const length = await redis.lPush(pdsKey, `${post.uri};${post.indexed_at}`);
    if (length <= MAX_POSTS_PER_PDS) {
      return;
    }
    // Both commands are issued back-to-back (as before) so they reach Redis in
    // order; Promise.all makes sure a failure of either one is observed.
    const [, last] = await Promise.all([
      redis.lTrim(pdsKey, 0, MAX_POSTS_PER_PDS - 1),
      redis.rPop(pdsKey),
    ]);
    if (!last) {
      return;
    }
    // List entries are stored as "<uri>;<indexed_at>".
    const indexTime = last.split(";")[1];
    if (indexTime?.trim() && post.pds) {
      lastPostTimes.set(post.pds, indexTime);
    }
  } catch (err) {
    console.error(`Failed to update Redis list ${pdsKey}`, err);
  }
}

/**
 * Inserts posts in a single transaction. Rows that already exist are skipped
 * (ON CONFLICT DO NOTHING); only rows that were actually inserted are mirrored
 * to Redis.
 *
 * NOTE(review): the Redis update is started before the transaction commits, so
 * a rollback would leave Redis ahead of the database — confirm this is acceptable.
 */
const insertPosts = db.transaction((posts: Post[]) => {
  for (const post of posts) {
    const changes = insertPost.run(
      post.uri,
      post.cid,
      post.author,
      post.indexed_at
    );
    if (changes > 0) {
      void cachePost(post);
    }
  }
});

const removePostByURL = db.prepare(
  `DELETE FROM posts WHERE uri = ?1 RETURNING indexed_at, author;`
);
const removePostByPDS = db.prepare(
  `DELETE FROM posts WHERE rowid IN (SELECT a.rowid FROM posts a INNER JOIN authors b ON a.author = b.did WHERE b.pds = ?1 AND a.indexed_at < ?2);`
);

/**
 * Deletes posts by URI in a single transaction and removes the matching
 * "<uri>;<indexed_at>" entry from the PDS list in Redis for every row that
 * was actually deleted.
 */
const removePosts = db.transaction((posts: ShallowPost[]) => {
  for (const post of posts) {
    const dbResult = removePostByURL.get<Omit<Post, "uri" | "cid" | "pds">>(
      post.uri
    );
    if (dbResult) {
      const pdsKey = `posts:${post.pds}`;
      redis
        .lRem(pdsKey, 0, `${post.uri};${dbResult.indexed_at}`)
        .catch((err) =>
          // Fire-and-forget: log instead of leaving the rejection unhandled.
          console.error(`Failed to remove ${post.uri} from Redis list ${pdsKey}`, err)
        );
    }
  }
});

const upsertAuthor = db.prepare(
  "INSERT OR REPLACE INTO authors (did, pds, pds_base) VALUES (?1, ?2, ?3)"
);

const updateCursor = db.prepare("UPDATE state SET cursor = ? WHERE id = 1");

// Periodically delete database rows older than the cutoff recorded for each PDS.
// Each cutoff is consumed (removed from the map) before its delete is run.
setInterval(() => {
  for (const [pds, time] of lastPostTimes) {
    lastPostTimes.delete(pds);
    removePostByPDS.run(pds, time);
  }
}, PRUNE_INTERVAL_MS);

/**
 * Dispatches a message on its numeric `op` field:
 *   0 — insert a single post, stamped with the current time
 *   1 — delete a single post by URI
 *   2 — insert a batch of records for one author
 *   3 — store the cursor
 *   4 — insert or replace an author row
 * Any other `op` is ignored. Database errors are not caught here and propagate
 * to the caller of the handler.
 */
self.onmessage = (e: MessageEvent) => {
  switch (e.data.op) {
    case 0: {
      const indexed_at = new Date().toISOString();
      insertPosts.immediate([
        {
          uri: e.data.atUri,
          cid: e.data.cid,
          author: e.data.did,
          indexed_at,
          pds: e.data.pds,
        },
      ]);
      break;
    }
    case 1:
      removePosts.immediate([
        {
          uri: e.data.atUri,
          pds: e.data.pds,
        },
      ]);
      break;
    case 2: {
      const posts = e.data.records.map((v) => ({
        uri: v.uri,
        cid: v.cid,
        author: e.data.did,
        // A record without `value` or `createdAt` falls back to the current
        // time instead of throwing and aborting the whole batch.
        indexed_at: v.value?.createdAt ?? new Date().toISOString(),
        pds: e.data.pds,
      }));
      insertPosts.immediate(posts);
      break;
    }
    case 3:
      updateCursor.run(e.data.cursor);
      break;
    case 4:
      upsertAuthor.run(e.data.did, e.data.pds, e.data.pds_base);
      break;
    default:
      break;
  }
};