A Bluesky labeler that labels accounts hosted on PDSes operated by entities other than Bluesky PBC
38
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 250 lines 6.9 kB view raw
import { LabelerServer } from "@skyware/labeler";

/** Database handle type, taken from the `db` property of `LabelerServer`. */
type LabelerDatabase = typeof LabelerServer.prototype.db;

/** A PDS record as stored in the `known_pds` table. */
export type KnownPDS = {
  /** URI of the PDS; primary key of the `known_pds` table. */
  uri: string,
  /** Timestamp persisted in the `last_crawled` column (written as an ISO-8601 string). */
  lastCrawled: Date,
  /** Value persisted in the `total_repos` column. */
  totalRepos: number,
}

/**
 * Persistence layer for known PDSes and for PDS crawl failures.
 *
 * Two tables are managed: `known_pds` (uri, last_crawled, total_repos) and
 * `pds_crawling_failures` (uri, last_attempt). Both are created on construction
 * if they do not exist yet; every method that touches the database first waits
 * for that initialization to finish.
 */
export class KnownPDSStorage {
  /**
   * Promise that resolves when database initialization is complete.
   * This should be awaited before any database operations.
   */
  private readonly dbInitLock: Promise<void>;

  /**
   * @param db Database handle used for all queries. Table creation is started
   *           immediately; it is not awaited here (constructors cannot be async).
   */
  constructor(private readonly db: LabelerDatabase) {
    this.dbInitLock = this.initializeDatabase();
  }

  /**
   * Creates the `known_pds` and `pds_crawling_failures` tables if they are missing.
   * The statements use `IF NOT EXISTS`, so calling this again is harmless.
   */
  async initializeDatabase(): Promise<void> {
    await this.db.execute(`
      CREATE TABLE IF NOT EXISTS known_pds (
        uri TEXT NOT NULL,
        last_crawled DATETIME NOT NULL,
        total_repos INTEGER NOT NULL,
        PRIMARY KEY (uri)
      );
    `);
    await this.db.execute(`
      CREATE TABLE IF NOT EXISTS pds_crawling_failures (
        uri TEXT NOT NULL,
        last_attempt DATETIME NOT NULL,
        PRIMARY KEY (uri)
      );
    `);
  }

  /**
   * Converts a `known_pds` result row into a {@link KnownPDS}.
   *
   * @param row Row exposing the `uri`, `last_crawled` and `total_repos` columns.
   */
  private rowToKnownPDS(row: Record<string, unknown>): KnownPDS {
    return {
      uri: row.uri as string,
      // last_crawled is written via toISOString(), so it parses back with `new Date`.
      lastCrawled: new Date(row.last_crawled as string),
      totalRepos: row.total_repos as number,
    };
  }

  /**
   * @returns The number of rows in `known_pds`, or 0 if the query yields no row.
   */
  async countKnownPDSs(): Promise<number> {
    await this.dbInitLock;

    const result = await this.db.execute({
      sql: `
        SELECT
          COUNT(*) AS c
        FROM known_pds
      `,
    });

    const row = result.rows?.[0];
    if (!row) {
      return 0;
    }

    return row.c as number;
  }

  /**
   * Lazily iterates over known PDSes that are due for crawling, ordered by URI.
   *
   * A PDS is yielded when its `last_crawled` is strictly before `lastCrawledBefore`
   * and it has no failure entry whose `last_attempt` is at or after
   * `lastFailureBefore`. Rows are fetched in pages of 100, using the last yielded
   * URI as the cursor for the next page.
   *
   * @param lastCrawledBefore Upper bound (exclusive) for `last_crawled`; defaults to now.
   * @param lastFailureBefore PDSes with a recorded failure at or after this instant
   *                          are skipped; defaults to now.
   */
  async* getKnownPDSs(
    lastCrawledBefore: Date = new Date(),
    lastFailureBefore: Date = new Date(),
  ): AsyncGenerator<KnownPDS, void, unknown> {
    await this.dbInitLock;

    // The empty string sorts before every non-empty URI, so the first page starts at the beginning.
    let cursor = "";
    while (true) {
      const result = await this.db.execute({
        sql: `
          SELECT
            k.uri, k.last_crawled, k.total_repos
          FROM known_pds k
          WHERE
            k.uri > ? AND
            k.last_crawled < ? AND
            k.uri NOT IN (
              SELECT f.uri
              FROM pds_crawling_failures f
              WHERE f.last_attempt >= ?
            )
          ORDER BY uri
          LIMIT 100
        `,
        // Timestamps are bound as ISO-8601 strings, the same format the writers store.
        args: [cursor, lastCrawledBefore.toISOString(), lastFailureBefore.toISOString()],
      });

      if (!result.rows.length) {
        return;
      }

      for (const row of result.rows) {
        const pds = this.rowToKnownPDS(row);
        yield pds;
        // Advance only after the consumer resumed us, exactly one row at a time.
        cursor = pds.uri;
      }
    }
  }

  /**
   * Yields up to `limit` known PDSes, largest `total_repos` first (ties broken by URI).
   *
   * @param limit Maximum number of rows to fetch; bound to the SQL `LIMIT`.
   */
  async* getBiggestKnownPDSs(limit: number): AsyncGenerator<KnownPDS, void, unknown> {
    await this.dbInitLock;

    const result = await this.db.execute({
      sql: `
        SELECT
          uri, last_crawled, total_repos
        FROM known_pds
        ORDER BY total_repos DESC, uri
        LIMIT ?
      `,
      args: [limit],
    });

    for (const row of result.rows) {
      yield this.rowToKnownPDS(row);
    }
  }

  /**
   * Looks up a single known PDS.
   *
   * @param uri URI of the PDS to fetch.
   * @returns The stored record, or `undefined` when no row has that URI.
   */
  async getKnownPDS(uri: string): Promise<KnownPDS | undefined> {
    await this.dbInitLock;

    const result = await this.db.execute({
      sql: `
        SELECT
          uri, last_crawled, total_repos
        FROM known_pds
        WHERE uri = ?
      `,
      args: [uri],
    });

    const [row] = result.rows;
    if (row === undefined) {
      return undefined;
    }

    return this.rowToKnownPDS(row);
  }

  /**
   * Inserts or updates a known PDS and removes any crawl-failure entry for it,
   * both inside one write transaction.
   *
   * @param knownPDS Record to store; an existing row with the same URI gets its
   *                 `last_crawled` and `total_repos` overwritten.
   * @throws Error if the upsert reports no affected rows.
   */
  async upsertKnownPDS(knownPDS: KnownPDS): Promise<void> {
    await this.dbInitLock;

    const transaction = await this.db.transaction("write");

    try {
      const result = await transaction.execute({
        sql: `
          INSERT INTO known_pds (uri, last_crawled, total_repos)
          VALUES (?, ?, ?)
          ON CONFLICT (uri) DO UPDATE SET
            last_crawled = excluded.last_crawled,
            total_repos = excluded.total_repos
        `,
        args: [knownPDS.uri, knownPDS.lastCrawled.toISOString(), knownPDS.totalRepos],
      });

      if (!result.rowsAffected) throw new Error("Failed to upsert known PDS");

      await transaction.execute({
        sql: `
          DELETE FROM pds_crawling_failures
          WHERE uri = ?
        `,
        args: [knownPDS.uri],
      });
      await transaction.commit();
    } finally {
      // Always release the transaction, including when a statement above threw.
      // NOTE(review): presumably close() discards uncommitted work — confirm in the client docs.
      transaction.close();
    }
  }

  /**
   * Deletes a PDS from `known_pds` together with its crawl-failure entry,
   * both inside one write transaction. Unknown URIs are not treated as an error.
   *
   * @param knownPDSURI URI of the PDS to remove.
   */
  async removeKnownPDS(knownPDSURI: string): Promise<void> {
    await this.dbInitLock;

    const transaction = await this.db.transaction("write");

    try {
      await transaction.execute({
        sql: `
          DELETE FROM known_pds
          WHERE uri = ?
        `,
        args: [knownPDSURI],
      });

      await transaction.execute({
        sql: `
          DELETE FROM pds_crawling_failures
          WHERE uri = ?
        `,
        args: [knownPDSURI],
      });
      await transaction.commit();
    } finally {
      // Always release the transaction, including when a statement above threw.
      transaction.close();
    }
  }

  /**
   * @returns The number of rows in `pds_crawling_failures`, or 0 if the query yields no row.
   */
  async countPDSCrawlingFailures(): Promise<number> {
    await this.dbInitLock;

    const result = await this.db.execute({
      sql: `
        SELECT
          COUNT(*) AS c
        FROM pds_crawling_failures
      `,
    });

    const row = result.rows?.[0];
    if (!row) {
      return 0;
    }

    return row.c as number;
  }

  /**
   * Records (or refreshes) a crawl failure for a PDS.
   *
   * @param uri URI of the PDS whose crawl failed.
   * @param at  Time of the failed attempt; replaces any previously stored `last_attempt`.
   * @throws Error if the upsert reports no affected rows.
   */
  async markPDSCrawlFailure(uri: string, at: Date): Promise<void> {
    // Wait for table creation like every other method; otherwise an early call
    // could reach the database before pds_crawling_failures exists.
    await this.dbInitLock;

    const result = await this.db.execute({
      sql: `
        INSERT INTO pds_crawling_failures (uri, last_attempt)
        VALUES (?, ?)
        ON CONFLICT (uri) DO UPDATE SET
          last_attempt = excluded.last_attempt
      `,
      args: [uri, at.toISOString()],
    });

    if (!result.rowsAffected) throw new Error("Failed to record PDS crawl failure");
  }

  /**
   * Tells whether a crawl failure was recorded for a PDS strictly after `since`.
   *
   * NOTE(review): this uses `last_attempt > ?`, whereas getKnownPDSs excludes
   * failures with `last_attempt >= ?` — confirm the differing boundary is intended.
   *
   * @param uri   URI of the PDS to check.
   * @param since Only failures later than this instant count.
   */
  async hasPDSCrawlFailed(uri: string, since: Date): Promise<boolean> {
    await this.dbInitLock;

    const result = await this.db.execute({
      sql: `
        SELECT 1
        FROM pds_crawling_failures
        WHERE uri = ? AND last_attempt > ?
      `,
      args: [uri, since.toISOString()],
    });

    return result.rows.length > 0;
  }
}