A couple of Bluesky feeds focused around PDSes
1import { createClient } from "redis";
2
3import { db } from "../common/db.ts";
4import type { Post } from "../common/types.ts";
5
/**
 * A post stripped of `cid`, `indexed_at` and `author`; this is the shape
 * handed to `removePosts`.
 */
type ShallowPost = Omit<Post, "cid" | "indexed_at" | "author">;

/** Logs any error event emitted by the Redis client. */
const reportRedisError = (error: unknown): void => {
  console.log("Redis Client Error", error);
};

// Module-wide Redis client, created with no explicit options. The top-level
// await means nothing below runs until the connection attempt has settled.
const redis = createClient();
redis.on("error", reportRedisError);
await redis.connect();
11
// Binds (uri, cid, author, indexed_at) in that order. A row that conflicts
// with an existing one is skipped rather than replaced (ON CONFLICT DO NOTHING).
const insertPostSql = `INSERT INTO posts ("uri", "cid", "author", "indexed_at") VALUES (?1, ?2, ?3, ?4) ON CONFLICT DO NOTHING;`;

/** Prepared statement that adds a single row to `posts`. */
const insertPost = db.prepare(insertPostSql);
15
/**
 * Pending prune cutoffs, keyed by PDS. Each value is the `indexed_at` part of
 * the entry most recently popped from that PDS's Redis list after the list
 * grew past the cap.
 */
const lastPostTimes = new Map<string, string>();

/** Maximum number of entries kept in each per-PDS Redis list. */
const MAX_CACHED_POSTS_PER_PDS = 30000;

/**
 * Pushes a freshly inserted post onto the head of its PDS's Redis list
 * (`posts:<pds>`, entries formatted as `<uri>;<indexed_at>`). When the list
 * grows past the cap it is trimmed back and the tail entry's timestamp is
 * recorded in `lastPostTimes`.
 *
 * @param post The post that was just inserted into the database.
 * @returns A promise that settles once all Redis commands for this post have
 *   completed; it rejects if any of them fails.
 */
async function cachePostForPds(post: Post): Promise<void> {
  const pdsKey = `posts:${post.pds}`;
  const length = await redis.lPush(pdsKey, `${post.uri};${post.indexed_at}`);
  if (!(length > MAX_CACHED_POSTS_PER_PDS)) {
    return;
  }

  // Both commands are issued back-to-back so nothing else from this client is
  // queued between them, and both promises are awaited so that neither
  // rejection goes unobserved.
  // NOTE(review): rPop removes the entry it reads, leaving the list one below
  // the cap; confirm this is intended (lIndex(pdsKey, -1) would only peek).
  const [, last] = await Promise.all([
    redis.lTrim(pdsKey, 0, MAX_CACHED_POSTS_PER_PDS - 1),
    redis.rPop(pdsKey),
  ]);
  if (!last) {
    return;
  }

  const indexTime = last.split(";")[1];
  if (indexTime?.trim() && post.pds) {
    lastPostTimes.set(post.pds, indexTime);
  }
}

/**
 * Inserts posts into the database inside one transaction. Each post that
 * produced a new row (i.e. was not skipped as a conflict) is also mirrored
 * into Redis.
 *
 * The Redis work is fire-and-forget: it is started inside the transaction
 * callback but not awaited, so it may complete after this function returns.
 * Failures are logged instead of surfacing as unhandled promise rejections.
 *
 * @param posts Posts to insert.
 */
const insertPosts = db.transaction((posts: Post[]) => {
  for (const post of posts) {
    const changes = insertPost.run(
      post.uri,
      post.cid,
      post.author,
      post.indexed_at,
    );
    if (changes > 0) {
      cachePostForPds(post).catch((err: unknown) => {
        console.error(`Failed to cache post ${post.uri} in Redis`, err);
      });
    }
  }
});
47
// Binds the post URI as ?1 and yields the deleted row's `indexed_at` and
// `author` through the RETURNING clause.
const removePostByUrlSql = `DELETE FROM posts WHERE uri = ?1 RETURNING indexed_at, author;`;

/** Prepared statement that deletes a single post by its URI. */
const removePostByURL = db.prepare(removePostByUrlSql);

// Binds the PDS as ?1 and the cutoff as ?2. A post is matched when its author
// row in `authors` has that PDS and its `indexed_at` compares below the cutoff.
const removePostByPdsSql = `DELETE FROM posts WHERE rowid IN (SELECT a.rowid FROM posts a INNER JOIN authors b ON a.author = b.did WHERE b.pds = ?1 AND a.indexed_at < ?2);`;

/** Prepared statement that deletes a PDS's posts older than a cutoff. */
const removePostByPDS = db.prepare(removePostByPdsSql);
54
/**
 * Deletes posts from the database inside one transaction. For every post that
 * actually existed, the matching `<uri>;<indexed_at>` entry is also removed
 * from the `posts:<pds>` Redis list.
 *
 * The Redis removal is fire-and-forget; a failure is logged instead of
 * surfacing as an unhandled promise rejection.
 *
 * @param posts Posts to remove, identified by `uri` and `pds`.
 */
const removePosts = db.transaction((posts: ShallowPost[]) => {
  for (const { uri, pds } of posts) {
    const deleted = removePostByURL.get<Omit<Post, "uri" | "cid" | "pds">>(uri);
    if (!deleted) {
      // Nothing was stored for this URI, so there is no list entry to drop.
      continue;
    }

    // A count of 0 asks Redis to remove every occurrence of the entry.
    redis
      .lRem(`posts:${pds}`, 0, `${uri};${deleted.indexed_at}`)
      .catch((err: unknown) => {
        console.error(`Failed to remove post ${uri} from Redis`, err);
      });
  }
});
65
// Binds (did, pds, pds_base) in that order; an existing row is replaced.
const upsertAuthorSql =
  "INSERT OR REPLACE INTO authors (did, pds, pds_base) VALUES (?1, ?2, ?3)";

/** Prepared statement that inserts or replaces a row in `authors`. */
const upsertAuthor = db.prepare(upsertAuthorSql);

// Writes the bound value into the `cursor` column of the `state` row with id 1.
const updateCursorSql = "UPDATE state SET cursor = ? WHERE id = 1";

/** Prepared statement that stores the current cursor. */
const updateCursor = db.prepare(updateCursorSql);
71
/** Delay between prune passes, in milliseconds. */
const pruneIntervalMs = 60000;

/**
 * Drains `lastPostTimes`: for every PDS with a recorded cutoff, the cutoff is
 * removed from the map and the PDS's posts older than it are deleted from the
 * database.
 */
const prunePostsBeforeCutoffs = (): void => {
  lastPostTimes.forEach((cutoff, pds) => {
    // The entry is dropped before the delete runs, matching a one-shot cutoff.
    lastPostTimes.delete(pds);
    removePostByPDS.run(pds, cutoff);
  });
};

setInterval(prunePostsBeforeCutoffs, pruneIntervalMs);
78
/**
 * One record inside a bulk-insert (op 2) message.
 * NOTE(review): field types are inferred from how the handler uses them;
 * confirm against the sending side.
 */
interface BulkRecord {
  uri: string;
  cid: string;
  value: { createdAt?: string };
}

/**
 * Messages this worker accepts, discriminated by the numeric `op` field.
 * NOTE(review): the field names come from the handler below; the primitive
 * types are presumed and should be confirmed against the sending side.
 */
type WorkerMessage =
  | { op: 0; atUri: string; cid: string; did: string; pds: string }
  | { op: 1; atUri: string; pds: string }
  | { op: 2; did: string; pds: string; records: BulkRecord[] }
  | { op: 3; cursor: number | string }
  | { op: 4; did: string; pds: string; pds_base: string };

/**
 * Dispatches an incoming worker message by `op`:
 * - 0: insert a single post, stamped with the current time;
 * - 1: remove a single post;
 * - 2: insert a batch of records for one author, using each record's
 *      `createdAt` (or the current time when it is absent) as `indexed_at`;
 * - 3: persist the cursor;
 * - 4: insert or replace an author row.
 *
 * Messages with any other `op` are ignored.
 */
self.onmessage = (e: MessageEvent<WorkerMessage>) => {
  const msg = e.data;
  switch (msg.op) {
    case 0: {
      const indexed_at = new Date().toISOString();
      insertPosts.immediate([
        {
          uri: msg.atUri,
          cid: msg.cid,
          author: msg.did,
          indexed_at,
          pds: msg.pds,
        },
      ]);
      break;
    }
    case 1:
      removePosts.immediate([
        {
          uri: msg.atUri,
          pds: msg.pds,
        },
      ]);
      break;
    case 2: {
      const posts = msg.records.map((record) => ({
        uri: record.uri,
        cid: record.cid,
        author: msg.did,
        indexed_at: record.value.createdAt ?? new Date().toISOString(),
        pds: msg.pds,
      }));
      insertPosts.immediate(posts);
      break;
    }
    case 3:
      updateCursor.run(msg.cursor);
      break;
    case 4:
      upsertAuthor.run(msg.did, msg.pds, msg.pds_base);
      break;
    default:
      // Unknown operations are deliberately left unhandled.
      break;
  }
};