A Bluesky labeler that labels accounts hosted on PDSes operated by entities other than Bluesky PBC
1import { LabelerServer } from "@skyware/labeler";
2
/**
 * Bookkeeping for the labels that are currently active, stored as
 * `(uri, val)` rows in the `active_labels` table of the supplied database.
 *
 * The table is created (if missing) when the instance is constructed; every
 * operation waits for that initialization before running its own query.
 */
export class ActiveLabelsStorage {
    private readonly db: typeof LabelerServer.prototype.db;

    /**
     * Promise that resolves when database initialization is complete.
     * Every operation below awaits it before touching the table, so an
     * initialization failure surfaces as a rejection of that operation.
     */
    private readonly dbInitLock: Promise<void>;

    /**
     * @param db Database handle used for all queries; it must expose the
     *           `execute` method used throughout this class.
     */
    constructor(db: typeof LabelerServer.prototype.db) {
        this.db = db;

        // Kick off schema creation immediately; callers never need to await it
        // themselves because each method awaits `dbInitLock` first.
        this.dbInitLock = this.initializeDatabase();
    }

    /**
     * Creates the `active_labels` table and its `uri` index if they do not
     * exist yet. Both statements use `IF NOT EXISTS`, so running this again
     * on an initialized database is harmless.
     */
    async initializeDatabase(): Promise<void> {
        await this.db.execute(`
            CREATE TABLE IF NOT EXISTS active_labels (
                uri TEXT NOT NULL,
                val TEXT NOT NULL,
                PRIMARY KEY (uri, val)
            );
        `);
        // NOTE(review): the composite primary key (uri, val) presumably already
        // provides an index led by `uri`, which would make this one redundant.
        // It is kept so existing databases and new ones share the same schema.
        await this.db.execute(`
            CREATE INDEX IF NOT EXISTS idx_active_labels_uri ON active_labels (uri);
        `);
    }

    /**
     * Compares the labels stored for `uri` with the labels it should have.
     *
     * @param uri            Subject whose stored labels are inspected.
     * @param desiredOutcome Labels the subject should end up with. Repeated
     *                       entries are treated as a single label.
     * @returns `labelsToAdd`: desired labels that are not stored yet (each at
     *          most once, in order of first appearance in `desiredOutcome`);
     *          `labelsToNegate`: stored labels that are no longer desired.
     */
    async computeLabelsDiff(uri: string, desiredOutcome: string[]): Promise<{
        labelsToAdd: string[],
        labelsToNegate: string[],
    }> {
        await this.dbInitLock;

        const result = await this.db.execute({
            sql: `
                SELECT uri, val FROM active_labels
                WHERE uri = ?
            `,
            args: [uri],
        });

        const activeLabels = result.rows.map((row) => row.val as string);
        const activeLabelSet = new Set(activeLabels);
        const desiredOutcomeSet = new Set(desiredOutcome);

        const labelsToNegate = activeLabels.filter((label) => !desiredOutcomeSet.has(label));

        // Iterate the de-duplicated set rather than the raw array: a label
        // listed twice must be added only once, otherwise a caller registering
        // every entry would insert the same (uri, val) pair twice. Set
        // iteration follows insertion order, so the original ordering is kept.
        const labelsToAdd = [...desiredOutcomeSet].filter((label) => !activeLabelSet.has(label));

        return { labelsToAdd, labelsToNegate };
    }

    /**
     * Records that label `val` is now active on `uri`.
     *
     * @throws Error if the insert reports no affected rows. The pair is also
     *         the table's primary key, so inserting a pair that is already
     *         stored is presumably rejected by the database itself.
     */
    async registerLabelActivation(uri: string, val: string): Promise<void> {
        await this.dbInitLock;

        const result = await this.db.execute({
            sql: `
                INSERT INTO active_labels (uri, val)
                VALUES (?, ?)
            `,
            args: [uri, val],
        });

        if (!result.rowsAffected) throw new Error("Failed to register label activation");
    }

    /**
     * Records that label `val` is no longer active on `uri`.
     *
     * @throws Error if no row was deleted, i.e. the pair was not stored.
     */
    async registerLabelDeactivation(uri: string, val: string): Promise<void> {
        await this.dbInitLock;

        const result = await this.db.execute({
            sql: `
                DELETE FROM active_labels
                WHERE uri = ? AND val = ?
            `,
            args: [uri, val],
        });

        if (!result.rowsAffected) throw new Error("Failed to register label deactivation");
    }

    /**
     * Counts the distinct subjects (`uri` values) that have at least one
     * stored label.
     *
     * @returns The count, or 0 when the query yields no row.
     */
    async countActiveLabelSubjects(): Promise<number> {
        await this.dbInitLock;

        const result = await this.db.execute({
            sql: `
                SELECT COUNT (DISTINCT uri) AS count
                FROM active_labels
            `,
            args: [],
        });

        // NOTE(review): depending on how the driver is configured the aggregate
        // may arrive as a bigint or string instead of a number — confirm.
        // Number() normalizes all of them and is a no-op for plain numbers.
        return Number(result.rows[0]?.count ?? 0);
    }

    /**
     * Yields every distinct subject `uri` that has at least one stored label,
     * in ascending `uri` order.
     *
     * Subjects are fetched in pages of 100 using the last yielded `uri` as the
     * cursor (`WHERE uri > ?`), and iteration ends when a page comes back
     * empty.
     */
    async* getActiveLabelSubjects() {
        await this.dbInitLock;

        let cursor = "";
        while (true) {
            const page = await this.db.execute({
                sql: `
                    SELECT DISTINCT uri
                    FROM active_labels
                    WHERE uri > ?
                    ORDER BY uri
                    LIMIT 100
                `,
                args: [cursor],
            });

            if (!page.rows.length) {
                return;
            }

            for (const row of page.rows) {
                const subject = row.uri as string;
                yield subject;
                // Advance the cursor so the next page starts after this subject.
                cursor = subject;
            }
        }
    }
}