A Bluesky labeler that labels accounts hosted on PDSes operated by entities other than Bluesky PBC
1import { LabelerServer } from "@skyware/labeler";
2
/**
 * A PDS tracked by the labeler, as persisted in the `known_pds` table.
 */
export type KnownPDS = {
    /** Identifier of the PDS; used as the primary key of `known_pds`. */
    uri: string;
    /** Timestamp stored in the `last_crawled` column (presumably the last successful crawl). */
    lastCrawled: Date;
    /** Value stored in the `total_repos` column (presumably the repo count seen on that crawl). */
    totalRepos: number;
};
8
/**
 * Persistence layer for the PDSes the labeler knows about and for the record
 * of crawl attempts that failed.
 *
 * Two tables are managed:
 * - `known_pds` (uri, last_crawled, total_repos)
 * - `pds_crawling_failures` (uri, last_attempt)
 *
 * Both are created on construction if they do not exist. Every method that
 * touches the database first awaits that initialization. Timestamps are
 * written as ISO-8601 strings produced by `Date.prototype.toISOString()`.
 */
export class KnownPDSStorage {
    /** Database handle supplied by the caller; all queries go through it. */
    private readonly db: typeof LabelerServer.prototype.db;

    /**
     * Promise that resolves when database initialization is complete.
     * This is awaited before any database operation.
     */
    private readonly dbInitLock: Promise<void>;

    /**
     * Stores the handle and immediately starts creating the tables.
     *
     * @param db Database handle to run all statements against.
     */
    constructor(db: typeof LabelerServer.prototype.db) {
        this.db = db;
        this.dbInitLock = this.initializeDatabase();
    }

    /**
     * Creates the `known_pds` and `pds_crawling_failures` tables when they
     * are missing. Called by the constructor; safe to call again because
     * both statements use `IF NOT EXISTS`.
     */
    async initializeDatabase(): Promise<void> {
        await this.db.execute(`
            CREATE TABLE IF NOT EXISTS known_pds (
                uri TEXT NOT NULL,
                last_crawled DATETIME NOT NULL,
                total_repos INTEGER NOT NULL,
                PRIMARY KEY (uri)
            );
        `);
        await this.db.execute(`
            CREATE TABLE IF NOT EXISTS pds_crawling_failures (
                uri TEXT NOT NULL,
                last_attempt DATETIME NOT NULL,
                PRIMARY KEY (uri)
            );
        `);
    }

    /**
     * Converts a `known_pds` result row into a {@link KnownPDS}.
     *
     * @param row Row exposing `uri`, `last_crawled` and `total_repos` columns.
     */
    private rowToKnownPDS(row: Record<string, unknown>): KnownPDS {
        return {
            uri: row.uri as string,
            lastCrawled: new Date(row.last_crawled as string),
            totalRepos: row.total_repos as number,
        };
    }

    /**
     * @returns Number of rows in `known_pds`, or 0 if the query yields no row.
     */
    async countKnownPDSs(): Promise<number> {
        await this.dbInitLock;

        const result = await this.db.execute({
            sql: `
                SELECT
                    COUNT(*) AS c
                FROM known_pds
            `,
        });

        const row = result.rows?.[0];
        return row ? (row.c as number) : 0;
    }

    /**
     * Iterates over known PDSes in ascending `uri` order, fetching them in
     * pages of 100 using the last yielded `uri` as the cursor.
     *
     * @param lastCrawledBefore Only PDSes whose `last_crawled` is strictly
     *   earlier than this are yielded. Defaults to the time of the call.
     * @param lastFailureBefore PDSes with a recorded crawl failure at or
     *   after this time are skipped. Defaults to the time of the call.
     */
    async *getKnownPDSs(
        lastCrawledBefore: Date = new Date(),
        lastFailureBefore: Date = new Date(),
    ): AsyncGenerator<KnownPDS, void, undefined> {
        await this.dbInitLock;

        // The bounds are fixed for the whole iteration; only the cursor moves.
        const crawledBefore = lastCrawledBefore.toISOString();
        const failureBefore = lastFailureBefore.toISOString();

        let cursor = "";
        while (true) {
            const result = await this.db.execute({
                sql: `
                    SELECT
                        k.uri, k.last_crawled, k.total_repos
                    FROM known_pds k
                    WHERE
                        k.uri > ? AND
                        k.last_crawled < ? AND
                        k.uri NOT IN (
                            SELECT f.uri
                            FROM pds_crawling_failures f
                            WHERE f.last_attempt >= ?
                        )
                    ORDER BY uri
                    LIMIT 100
                `,
                args: [cursor, crawledBefore, failureBefore],
            });

            // An empty page means the cursor has moved past the last match.
            if (!result.rows.length) {
                return;
            }

            for (const row of result.rows) {
                const pds = this.rowToKnownPDS(row);
                yield pds;
                cursor = pds.uri;
            }
        }
    }

    /**
     * Yields up to `limit` known PDSes ordered by `total_repos` descending,
     * with ties broken by `uri` ascending.
     *
     * @param limit Maximum number of PDSes to yield.
     */
    async *getBiggestKnownPDSs(limit: number): AsyncGenerator<KnownPDS, void, undefined> {
        await this.dbInitLock;

        const result = await this.db.execute({
            sql: `
                SELECT
                    uri, last_crawled, total_repos
                FROM known_pds
                ORDER BY total_repos DESC, uri
                LIMIT ?
            `,
            args: [limit],
        });

        for (const row of result.rows) {
            yield this.rowToKnownPDS(row);
        }
    }

    /**
     * Looks up a single known PDS.
     *
     * @param uri Primary key of the PDS to fetch.
     * @returns The stored record, or `undefined` when no row matches.
     */
    async getKnownPDS(uri: string): Promise<KnownPDS | undefined> {
        await this.dbInitLock;

        const result = await this.db.execute({
            sql: `
                SELECT
                    uri, last_crawled, total_repos
                FROM known_pds
                WHERE uri = ?
            `,
            args: [uri],
        });

        const [row] = result.rows;
        return row ? this.rowToKnownPDS(row) : undefined;
    }

    /**
     * Inserts the PDS, or updates `last_crawled` / `total_repos` when its
     * `uri` already exists, and removes any crawl-failure record for that
     * `uri`. Both statements run inside one write transaction.
     *
     * @param knownPDS Record to store.
     * @throws Error when the upsert reports no affected rows.
     */
    async upsertKnownPDS(knownPDS: KnownPDS): Promise<void> {
        await this.dbInitLock;

        const transaction = await this.db.transaction("write");

        try {
            const result = await transaction.execute({
                sql: `
                    INSERT INTO known_pds (uri, last_crawled, total_repos)
                    VALUES (?, ?, ?)
                    ON CONFLICT (uri) DO UPDATE SET
                        last_crawled = excluded.last_crawled,
                        total_repos = excluded.total_repos
                `,
                args: [knownPDS.uri, knownPDS.lastCrawled.toISOString(), knownPDS.totalRepos],
            });

            if (!result.rowsAffected) throw new Error("Failed to upsert known PDS");

            await transaction.execute({
                sql: `
                    DELETE FROM pds_crawling_failures
                    WHERE uri = ?
                `,
                args: [knownPDS.uri],
            });
            await transaction.commit();
        } finally {
            // Always release the transaction, including when a statement threw.
            // NOTE(review): libSQL documents close() as rolling back an
            // uncommitted transaction — confirm for the client version in use.
            transaction.close();
        }
    }

    /**
     * Deletes the PDS from `known_pds` and its crawl-failure record, inside
     * one write transaction. Deleting a `uri` that is not stored is not an
     * error.
     *
     * @param knownPDSURI Primary key of the PDS to remove.
     */
    async removeKnownPDS(knownPDSURI: string): Promise<void> {
        await this.dbInitLock;

        const transaction = await this.db.transaction("write");

        try {
            await transaction.execute({
                sql: `
                    DELETE FROM known_pds
                    WHERE uri = ?
                `,
                args: [knownPDSURI],
            });

            await transaction.execute({
                sql: `
                    DELETE FROM pds_crawling_failures
                    WHERE uri = ?
                `,
                args: [knownPDSURI],
            });
            await transaction.commit();
        } finally {
            // Always release the transaction, including when a statement threw.
            transaction.close();
        }
    }

    /**
     * @returns Number of rows in `pds_crawling_failures`, or 0 if the query
     *   yields no row.
     */
    async countPDSCrawlingFailures(): Promise<number> {
        await this.dbInitLock;

        const result = await this.db.execute({
            sql: `
                SELECT
                    COUNT(*) AS c
                FROM pds_crawling_failures
            `,
        });

        const row = result.rows?.[0];
        return row ? (row.c as number) : 0;
    }

    /**
     * Records a failed crawl attempt for `uri`, overwriting the timestamp of
     * any earlier failure recorded for the same `uri`.
     *
     * @param uri PDS whose crawl failed.
     * @param at Time of the failed attempt.
     * @throws Error when the statement reports no affected rows.
     */
    async markPDSCrawlFailure(uri: string, at: Date): Promise<void> {
        // Wait for table creation like every other method; without this, a
        // call made right after construction can run before the table exists.
        await this.dbInitLock;

        const result = await this.db.execute({
            sql: `
                INSERT INTO pds_crawling_failures (uri, last_attempt)
                VALUES (?, ?)
                ON CONFLICT (uri) DO UPDATE SET
                    last_attempt = excluded.last_attempt
            `,
            args: [uri, at.toISOString()],
        });

        if (!result.rowsAffected) {
            throw new Error(`Failed to record PDS crawl failure for ${uri}`);
        }
    }

    /**
     * Checks whether a crawl failure is recorded for `uri` with a
     * `last_attempt` strictly later than `since`.
     *
     * @param uri PDS to check.
     * @param since Lower bound (exclusive) for the failure timestamp.
     */
    async hasPDSCrawlFailed(uri: string, since: Date): Promise<boolean> {
        await this.dbInitLock;

        const result = await this.db.execute({
            sql: `
                SELECT 1
                FROM pds_crawling_failures
                WHERE uri = ? AND last_attempt > ?
            `,
            args: [uri, since.toISOString()],
        });

        return result.rows.length > 0;
    }
}