grain.social is a photo sharing platform built on atproto.
1import type { Label } from "$lexicon/types/com/atproto/label/defs.ts";
2import type { Labels } from "$lexicon/types/com/atproto/label/subscribeLabels.ts";
3import { cborEncode, noUndefinedVals } from "@atproto/common";
4import { Keypair, Secp256k1Keypair } from "@atproto/crypto";
5import { stringifyLex } from "@atproto/lexicon";
6import { assertEquals, assertExists } from "@std/assert";
7import { DatabaseSync } from "node:sqlite";
8import * as ui8 from "uint8arrays";
9
10if (import.meta.main) {
11 const cfg = await createConfig();
12 const db = createDb(cfg);
13 const modService = createModService(db);
14 const handler = createHandler(modService);
15
16 Deno.serve({
17 port: cfg.port,
18 onListen() {
19 console.log(`Listening on http://localhost:${cfg.port}`);
20 },
21 onError(err) {
22 console.error("Error occurred:", err);
23 return new Response("Internal Server Error", {
24 status: 500,
25 });
26 },
27 }, handler);
28
29 Deno.addSignalListener("SIGINT", () => {
30 console.log("Shutting down server...");
31 Deno.exit(0);
32 });
33}
34
35type Config = {
36 port: number;
37 databaseUrl: string;
38 signingKey: string;
39};
40
41export async function createConfig(): Promise<Config> {
42 return {
43 port: Number(Deno.env.get("MOD_SERVICE_PORT")) || 8080,
44 databaseUrl: Deno.env.get("MOD_SERVICE_DATABASE_URL") ?? ":memory:",
45 signingKey: Deno.env.get("MOD_SERVICE_SIGNING_KEY") ??
46 await createSigningKey(),
47 };
48}
49
50async function createSigningKey() {
51 const serviceKeypair = await Secp256k1Keypair.create({ exportable: true });
52 return ui8.toString(await serviceKeypair.export(), "hex");
53}
54
55// Track all connected WebSocket clients for label subscriptions
56const labelSubscribers = new Set<WebSocket>();
57
58export function broadcastLabel(label: LabelRow) {
59 // Only broadcast if this label is active (not negated, not expired, and latest)
60 // Gather all labels for this (src, uri, val) and check if this is the latest and not negated/expired
61 // For simplicity, assume label is already the latest for this key
62 if (label.exp && new Date(label.exp).getTime() < Date.now()) return;
63 if (label.neg) return;
64 const msg = stringifyLex({
65 seq: label.id,
66 labels: [formatLabel(label)],
67 } as Labels);
68 for (const ws of labelSubscribers) {
69 try {
70 ws.send(msg);
71 } catch (e) {
72 console.error("Error sending label to subscriber:", e);
73 labelSubscribers.delete(ws);
74 }
75 }
76}
77
78export async function handleSubscribeLabels(
79 req: Request,
80 modService: ReturnType<typeof createModService>,
81): Promise<Response> {
82 const { searchParams } = new URL(req.url);
83 const cursorParam = searchParams.get("cursor");
84 const cursor = cursorParam ? parseInt(cursorParam, 10) : 0;
85 if (cursorParam && Number.isNaN(cursor)) {
86 return new Response(
87 JSON.stringify({ error: "Cursor must be an integer" }),
88 { status: 400 },
89 );
90 }
91 const { response, socket } = Deno.upgradeWebSocket(req);
92 // On open, send all labels after the cursor (including negations and expired)
93 socket.onopen = () => {
94 try {
95 const { rows } = modService.getLabels({
96 patterns: [],
97 sources: [],
98 limit: 1000,
99 cursor,
100 });
101 // Send ALL rows, not just active (per ATProto spec)
102 for (const row of rows) {
103 const msg = stringifyLex({
104 seq: row.id,
105 labels: [formatLabel(row)],
106 } as Labels);
107 socket.send(msg);
108 }
109 labelSubscribers.add(socket);
110 const userAgent = req.headers.get("user-agent");
111 const origin = req.headers.get("origin");
112 console.log(
113 `New subscriber connected, total: ${labelSubscribers.size}, user-agent: ${userAgent}, origin: ${origin}, time: ${
114 new Date().toISOString()
115 }`,
116 );
117 } catch (e) {
118 console.error("Error sending initial labels:", e);
119 socket.close();
120 }
121 };
122 socket.onclose = () => {
123 labelSubscribers.delete(socket);
124 };
125 return response;
126}
127
128// Filter for active (non-negated, non-expired) labels
129// Used for API hydration only. For the WebSocket stream, send all labels (including negations and expired).
130function filterActiveLabels(labels: LabelRow[]): LabelRow[] {
131 const now = Date.now();
132 const latest: Record<string, LabelRow> = {};
133 for (const label of labels) {
134 if (label.exp && new Date(label.exp).getTime() < now) continue;
135 const key = `${label.src}|${label.uri}|${label.val}`;
136 if (!latest[key] || new Date(label.cts) > new Date(latest[key].cts)) {
137 latest[key] = label;
138 }
139 }
140 return Object.values(latest).filter((label) => !label.neg);
141}
142
143function createHandler(
144 modService: ReturnType<typeof createModService>,
145) {
146 return async (req: Request): Promise<Response> => {
147 const url = new URL(req.url);
148 const pathname = url.pathname;
149
150 if (
151 pathname === "/xrpc/com.atproto.label.subscribeLabels" &&
152 req.headers.get("upgrade")?.toLowerCase() === "websocket"
153 ) {
154 return await handleSubscribeLabels(req, modService);
155 } else if (
156 req.method === "GET" && pathname === "/xrpc/com.atproto.label.queryLabels"
157 ) {
158 // Parse query params
159 const uriPatternsParam = url.searchParams.getAll("uriPatterns");
160 const sourcesParam = url.searchParams.getAll("sources");
161 const cursorParam = url.searchParams.get("cursor");
162 const limitParam = url.searchParams.get("limit");
163
164 const uriPatterns: string[] = uriPatternsParam.length
165 ? uriPatternsParam
166 : [];
167 const sources: string[] = sourcesParam.length ? sourcesParam : [];
168 const cursor = cursorParam ? parseInt(cursorParam, 10) : 0;
169 if (cursorParam && Number.isNaN(cursor)) {
170 return new Response(
171 JSON.stringify({
172 error: "Cursor must be an integer",
173 }),
174 { status: 400 },
175 );
176 }
177 const limit = limitParam ? parseInt(limitParam, 10) : 50;
178 if (Number.isNaN(limit) || limit < 1 || limit > 250) {
179 return new Response(
180 JSON.stringify({
181 error: "Limit must be an integer between 1 and 250",
182 }),
183 { status: 400 },
184 );
185 }
186
187 // Handle wildcards and SQL LIKE
188 const patterns = uriPatterns.includes("*")
189 ? []
190 : uriPatterns.map((pattern) => {
191 pattern = pattern.replace(/%/g, "").replace(/_/g, "\\_");
192 const starIndex = pattern.indexOf("*");
193 if (starIndex === -1) return pattern;
194 if (starIndex !== pattern.length - 1) {
195 return undefined; // Only trailing wildcards supported
196 }
197 return pattern.slice(0, -1) + "%";
198 }).filter(Boolean) as string[];
199
200 const { rows: labelRows, nextCursor } = modService.getLabels({
201 patterns,
202 sources,
203 limit,
204 cursor,
205 });
206 const activeLabels = filterActiveLabels(labelRows);
207 const formattedRows = activeLabels.map(formatLabel);
208 return new Response(
209 stringifyLex({ cursor: nextCursor, labels: formattedRows }),
210 {
211 headers: { "content-type": "application/json" },
212 },
213 );
214 } else if (req.method === "GET" && pathname === "/health") {
215 // Use the modService's DB to check health
216 modService.getLabels({
217 patterns: [],
218 sources: [],
219 limit: 1,
220 cursor: 0,
221 });
222 return new Response(JSON.stringify({ version: "0.1.0" }), {
223 headers: { "content-type": "application/json" },
224 });
225 } else if (pathname.startsWith("/xrpc/")) {
226 return new Response("Method Not Implemented", { status: 501 });
227 }
228
229 return new Response("Not Found", { status: 404 });
230 };
231}
232
233export function createDb(cfg: Config) {
234 const db = new DatabaseSync(cfg.databaseUrl);
235
236 db.exec(`
237 PRAGMA journal_mode = WAL;
238
239 CREATE TABLE IF NOT EXISTS labels (
240 id INTEGER PRIMARY KEY AUTOINCREMENT,
241 src TEXT NOT NULL,
242 uri TEXT NOT NULL,
243 cid TEXT,
244 val TEXT NOT NULL,
245 neg BOOLEAN DEFAULT FALSE,
246 cts DATETIME NOT NULL,
247 exp DATETIME,
248 sig BLOB
249 );
250 `);
251
252 return db;
253}
254
255export type LabelRow = {
256 id: number;
257 src: string;
258 uri: string;
259 cid: string | null;
260 val: string;
261 neg: boolean;
262 cts: string;
263 exp: string | null;
264 sig: Uint8Array;
265};
266
267type UnsignedLabel = Omit<Label, "sig">;
268type SignedLabel = Label & { sig: Uint8Array };
269
270export function createModService(db: DatabaseSync) {
271 return {
272 insertLabel: (label: SignedLabel) => {
273 const result = db.prepare(
274 `INSERT INTO labels (src, uri, cid, val, neg, cts, exp, sig)
275 VALUES (?, ?, ?, ?, ?, ?, ?, ?)`,
276 ).run(
277 label.src,
278 label.uri,
279 label.cid ?? null,
280 label.val,
281 label.neg ? 1 : 0,
282 label.cts,
283 label.exp ?? null,
284 label.sig,
285 );
286 return result.lastInsertRowid as number;
287 },
288 getLabels: (opts: {
289 patterns: string[];
290 sources: string[];
291 limit: number;
292 cursor: number;
293 }): { rows: LabelRow[]; nextCursor: string } => {
294 const { patterns, sources, limit, cursor } = opts;
295 const conditions: string[] = [];
296 const params: (string | number | Uint8Array | null)[] = [];
297 if (patterns.length) {
298 conditions.push(
299 "(" + patterns.map(() => "uri LIKE ?").join(" OR ") + ")",
300 );
301 params.push(...patterns);
302 }
303 if (sources.length) {
304 conditions.push(`src IN (${sources.map(() => "?").join(", ")})`);
305 params.push(...sources);
306 }
307 if (cursor) {
308 conditions.push("id > ?");
309 params.push(cursor);
310 }
311 params.push(limit);
312 const whereClause = conditions.length
313 ? `WHERE ${conditions.join(" AND ")}`
314 : "";
315 const sql = `SELECT * FROM labels ${whereClause} ORDER BY id ASC LIMIT ?`;
316 const stmt = db.prepare(sql);
317 const rows = stmt.all(...params) as unknown[];
318 function rowToLabelRow(row: unknown): LabelRow {
319 if (typeof row !== "object" || row === null) {
320 throw new Error("Invalid row");
321 }
322 const r = row as Record<string, unknown>;
323 const src = typeof r.src === "string" ? r.src : "";
324 const uri = typeof r.uri === "string" ? r.uri : "";
325 let cid: string | null = null;
326 if (typeof r.cid === "string") cid = r.cid;
327 const val = typeof r.val === "string" ? r.val : "";
328 const cts = typeof r.cts === "string" ? r.cts : "";
329 let exp: string | null = null;
330 if (typeof r.exp === "string") exp = r.exp;
331 const neg = typeof r.neg === "boolean" ? r.neg : Number(r.neg) === 1;
332 let sig: Uint8Array;
333 if (r.sig instanceof Uint8Array) sig = r.sig;
334 else if (Array.isArray(r.sig)) sig = new Uint8Array(r.sig);
335 else sig = new Uint8Array();
336 const id = typeof r.id === "number" ? r.id : Number(r.id);
337 return { id, src, uri, cid, val, neg, cts, exp, sig };
338 }
339 const labelRows = rows.map(rowToLabelRow);
340 let nextCursor = "0";
341 if (rows.length > 0) {
342 const lastId = (rows[rows.length - 1] as Record<string, unknown>).id;
343 nextCursor = typeof lastId === "string" || typeof lastId === "number"
344 ? String(lastId)
345 : "0";
346 }
347 return { rows: labelRows, nextCursor };
348 },
349 };
350}
351
352export function createLabel(
353 cfg: Config,
354 modService: ReturnType<typeof createModService>,
355) {
356 return async (
357 label: UnsignedLabel,
358 ) => {
359 const serviceSigningKey = cfg.signingKey;
360 if (!serviceSigningKey) {
361 throw new Error("MOD_SERVICE_SIGNING_KEY is not set");
362 }
363 const signingKey = await Secp256k1Keypair.import(serviceSigningKey);
364
365 const signed = await signLabel(label, signingKey);
366
367 const id = modService.insertLabel(signed);
368 // Broadcast the label to all subscribers after insert
369 const labelRow: LabelRow = {
370 id: Number(id),
371 src: signed.src,
372 uri: signed.uri,
373 cid: signed.cid ?? null,
374 val: signed.val,
375 neg: signed.neg === true, // ensure boolean
376 cts: signed.cts,
377 exp: signed.exp ?? null,
378 sig: signed.sig,
379 };
380 broadcastLabel(labelRow);
381 };
382}
383
384const formatLabel = (row: LabelRow): Label => {
385 return noUndefinedVals(
386 {
387 ver: 1,
388 src: row.src,
389 uri: row.uri,
390 cid: row.cid === "" || row.cid === null ? undefined : row.cid,
391 val: row.val,
392 neg: row.neg === true ? true : undefined,
393 cts: row.cts,
394 exp: row.exp ?? undefined,
395 sig: row.sig ? new Uint8Array(row.sig) : undefined,
396 } satisfies Label,
397 ) as unknown as Label;
398};
399
400const signLabel = async (
401 label: Label,
402 signingKey: Keypair,
403): Promise<SignedLabel> => {
404 const { ver, src, uri, cid, val, neg, cts, exp } = label;
405 const reformatted = noUndefinedVals(
406 {
407 ver: ver ?? 1,
408 src,
409 uri,
410 cid,
411 val,
412 neg: neg === true ? true : undefined,
413 cts,
414 exp,
415 } satisfies Label,
416 ) as unknown as Label;
417
418 const bytes = cborEncode(reformatted);
419 const sig = await signingKey.sign(bytes);
420 return {
421 ...reformatted,
422 sig,
423 };
424};
425
426Deno.test("insertLabel inserts a signed label and returns an id", async () => {
427 const cfg = await createConfig();
428 const db = createDb(cfg);
429 const modService = createModService(db);
430
431 const label = {
432 src: "did:example:alice",
433 uri: "at://did:example:bob/app.bsky.feed.post/123",
434 val: "spam",
435 neg: false,
436 cts: new Date().toISOString(),
437 sig: new Uint8Array([1, 2, 3]),
438 };
439
440 const id = modService.insertLabel(label);
441 assertExists(id);
442
443 // Check that the label is in the database
444 const row = db.prepare("SELECT * FROM labels WHERE id = ?").get(id);
445 assertExists(row);
446 assertEquals(row.src, label.src);
447 assertEquals(row.uri, label.uri);
448 assertEquals(row.val, label.val);
449 assertEquals(row.neg, 0);
450 assertEquals(row.sig, label.sig);
451});
452
453Deno.test("signLabel produces a valid signature", async () => {
454 const cfg = await createConfig();
455 const keyHex = cfg.signingKey;
456 const signingKey = await Secp256k1Keypair.import(keyHex);
457
458 const label: Label = {
459 ver: 1,
460 src: "did:example:alice",
461 uri: "at://did:example:bob/app.bsky.feed.post/123",
462 val: "spam",
463 cts: new Date().toISOString(),
464 };
465
466 const signed = await signLabel(label, signingKey);
467
468 assertExists(signed.sig);
469 assertEquals(signed.src, label.src);
470 assertEquals(signed.uri, label.uri);
471 assertEquals(signed.val, label.val);
472});
473
474Deno.test("getLabels retrieves labels with filtering and pagination", async () => {
475 const cfg = await createConfig();
476 const db = createDb(cfg);
477 const modService = createModService(db);
478
479 // Insert some test labels
480 for (let i = 1; i <= 10; i++) {
481 modService.insertLabel({
482 src: "did:example:alice",
483 uri: `at://did:example:bob/app.bsky.feed.post/${i}`,
484 val: i % 2 === 0 ? "spam" : "scam",
485 neg: false,
486 cts: new Date().toISOString(),
487 sig: new Uint8Array([1, 2, 3]),
488 });
489 }
490
491 // Retrieve labels with limit and cursor
492 const { rows, nextCursor } = modService.getLabels({
493 patterns: ["at://did:example:bob/app.bsky.feed.post/%"],
494 sources: ["did:example:alice"],
495 limit: 5,
496 cursor: 0,
497 });
498
499 assertEquals(rows.length, 5);
500 assertEquals(nextCursor, "5");
501
502 // Retrieve next page
503 const { rows: nextRows } = modService.getLabels({
504 patterns: ["at://did:example:bob/app.bsky.feed.post/%"],
505 sources: ["did:example:alice"],
506 limit: 5,
507 cursor: parseInt(nextCursor, 10),
508 });
509
510 assertEquals(nextRows.length, 5);
511});
512
513export type ModService = ReturnType<typeof createModService>;