A Bluesky labeler that labels accounts hosted on PDSes operated by entities other than Bluesky PBC
at main 142 lines 3.7 kB view raw
import { LabelerServer } from "@skyware/labeler";

/**
 * Tracks which label values are currently active for each subject URI,
 * persisted in an `active_labels` table on the supplied database handle.
 *
 * Table setup is started in the constructor; every other operation awaits
 * that setup before issuing its own statement.
 */
export class ActiveLabelsStorage {
  private db: typeof LabelerServer.prototype.db;

  /**
   * Promise that resolves when database initialization is complete.
   * This should be awaited before any database operations.
   */
  private readonly dbInitLock: Promise<void>;

  /**
   * @param db Database handle used for all statements issued by this class.
   */
  constructor(db: typeof this.db) {
    this.db = db;

    this.dbInitLock = this.initializeDatabase();
  }

  /**
   * Creates the `active_labels` table and the index on `uri` when they do
   * not exist yet.
   *
   * The constructor already calls this and stores the resulting promise, so
   * the other methods only need to await that promise. Both statements use
   * `IF NOT EXISTS`.
   */
  async initializeDatabase(): Promise<void> {
    await this.db.execute(`
      CREATE TABLE IF NOT EXISTS active_labels (
        uri TEXT NOT NULL,
        val TEXT NOT NULL,
        PRIMARY KEY (uri, val)
      );
    `);
    await this.db.execute(`
      CREATE INDEX IF NOT EXISTS idx_active_labels_uri ON active_labels (uri);
    `);
  }

  /**
   * Compares the labels stored for `uri` with the labels the caller wants.
   *
   * @param uri Subject whose stored labels are looked up.
   * @param desiredOutcome Label values that should be active afterwards.
   *   Duplicates are ignored.
   * @returns `labelsToAdd`: desired values not stored yet (each at most once,
   *   in order of first appearance in `desiredOutcome`);
   *   `labelsToNegate`: stored values that are no longer desired.
   */
  async computeLabelsDiff(uri: string, desiredOutcome: string[]): Promise<{
    labelsToAdd: string[],
    labelsToNegate: string[],
  }> {
    await this.dbInitLock;

    // De-duplicate up front: a repeated desired label must be reported only
    // once, otherwise registering every reported label would attempt to
    // insert the same (uri, val) pair twice.
    const desiredOutcomeSet = new Set(desiredOutcome);

    const result = await this.db.execute({
      sql: `
        SELECT uri, val FROM active_labels
        WHERE uri = ?
      `,
      args: [uri],
    });

    const activeLabelSet = new Set(result.rows.map((row) => row.val as string));

    return {
      labelsToAdd: [...desiredOutcomeSet].filter((label) => !activeLabelSet.has(label)),
      labelsToNegate: [...activeLabelSet].filter((label) => !desiredOutcomeSet.has(label)),
    };
  }

  /**
   * Records that label `val` is now active on `uri`.
   *
   * @throws Error when the insert reports no affected row.
   */
  async registerLabelActivation(uri: string, val: string): Promise<void> {
    await this.dbInitLock;

    const result = await this.db.execute({
      sql: `
        INSERT INTO active_labels (uri, val)
        VALUES (?, ?)
      `,
      args: [uri, val],
    });

    if (!result.rowsAffected) throw new Error("Failed to register label activation");
  }

  /**
   * Records that label `val` is no longer active on `uri`.
   *
   * @throws Error when no row was deleted, i.e. the pair was not stored.
   */
  async registerLabelDeactivation(uri: string, val: string): Promise<void> {
    await this.dbInitLock;

    const result = await this.db.execute({
      sql: `
        DELETE FROM active_labels
        WHERE uri = ? AND val = ?
      `,
      args: [uri, val],
    });

    if (!result.rowsAffected) throw new Error("Failed to register label deactivation");
  }

  /**
   * Counts the distinct subject URIs that have at least one stored label.
   *
   * @returns The count reported by the database, or `0` when the query
   *   yields no row.
   */
  async countActiveLabelSubjects(): Promise<number> {
    await this.dbInitLock;

    const result = await this.db.execute({
      sql: `
        SELECT COUNT (DISTINCT uri) AS count
        FROM active_labels
      `,
      args: [],
    });

    return (result.rows[0]?.count as number | undefined) ?? 0;
  }

  /**
   * Lazily yields every distinct subject URI in ascending order.
   *
   * Rows are fetched in pages of up to 100, resuming after the last URI
   * yielded; iteration ends when a page comes back empty.
   */
  async* getActiveLabelSubjects() {
    await this.dbInitLock;

    // Keyset pagination: the cursor is the last URI handed to the consumer.
    let cursor = "";
    while (true) {
      const result = await this.db.execute({
        sql: `
          SELECT DISTINCT uri
          FROM active_labels
          WHERE uri > ?
          ORDER BY uri
          LIMIT 100
        `,
        args: [cursor],
      });

      if (!result.rows.length) {
        return;
      }

      for (const row of result.rows) {
        const v = row.uri as string;
        yield v;
        cursor = v;
      }
    }
  }
}