Atproto user-agency toolkit for individuals and groups.
1/**
2 * SQLite-backed sync state storage for replication.
3 * Tracks per-DID sync progress separately from the repo.
4 */
5
6import type Database from "better-sqlite3";
7import type { SyncState, SyncHistoryRow, SyncTrigger, AggregateMetrics, DidMetrics } from "./types.js";
8
9export class SyncStorage {
  /** @param db - Open better-sqlite3 handle; the caller owns its lifecycle (open/close). */
  constructor(private db: Database.Database) {}
11
  /**
   * Create the replication tables if they don't exist, then apply
   * additive column migrations for databases created by older versions.
   * Idempotent: safe to call on every startup. Must run before any other
   * method on this class touches the database.
   */
  initSchema(): void {
    // Core per-DID sync state plus the block-tracking table.
    this.db.exec(`
      CREATE TABLE IF NOT EXISTS replication_state (
        did TEXT PRIMARY KEY,
        pds_endpoint TEXT NOT NULL,
        peer_id TEXT,
        peer_info_fetched_at TEXT,
        last_sync_rev TEXT,
        root_cid TEXT,
        last_sync_at TEXT,
        last_verified_at TEXT,
        status TEXT NOT NULL DEFAULT 'pending',
        error_message TEXT
      );

      CREATE TABLE IF NOT EXISTS replication_blocks (
        did TEXT NOT NULL,
        cid TEXT NOT NULL,
        PRIMARY KEY (did, cid)
      );
    `);

    // Firehose cursor table: stores the last-seen sequence number
    // for resumption after restart. Single row keyed by the literal 'cursor'.
    this.db.exec(`
      CREATE TABLE IF NOT EXISTS firehose_cursor (
        key TEXT PRIMARY KEY DEFAULT 'cursor',
        seq INTEGER NOT NULL,
        updated_at TEXT NOT NULL DEFAULT (datetime('now'))
      );
    `);

    // Blob tracking table: tracks replicated blob CIDs per DID.
    this.db.exec(`
      CREATE TABLE IF NOT EXISTS replication_blobs (
        did TEXT NOT NULL,
        cid TEXT NOT NULL,
        fetched_at TEXT NOT NULL DEFAULT (datetime('now')),
        PRIMARY KEY (did, cid)
      );
    `);

    // Record paths table: tracks record paths per DID for challenge generation.
    this.db.exec(`
      CREATE TABLE IF NOT EXISTS replication_record_paths (
        did TEXT NOT NULL,
        record_path TEXT NOT NULL,
        PRIMARY KEY (did, record_path)
      );
    `);

    // Peer endpoints table: tracks which peers have which DIDs for P2P fallback fetch.
    this.db.exec(`
      CREATE TABLE IF NOT EXISTS peer_endpoints (
        target_did TEXT NOT NULL,
        peer_did TEXT NOT NULL,
        pds_endpoint TEXT NOT NULL,
        last_sync_rev TEXT,
        discovered_at TEXT NOT NULL DEFAULT (datetime('now')),
        PRIMARY KEY (target_did, peer_did)
      );
    `);

    // Admin-added DIDs table: persists DIDs added via admin UI.
    this.db.exec(`
      CREATE TABLE IF NOT EXISTS admin_tracked_dids (
        did TEXT PRIMARY KEY,
        added_at TEXT NOT NULL DEFAULT (datetime('now')),
        added_by TEXT
      );
    `);

    // Incoming offers table: tracks offers received from other nodes.
    this.db.exec(`
      CREATE TABLE IF NOT EXISTS incoming_offers (
        offerer_did TEXT NOT NULL,
        subject_did TEXT NOT NULL,
        offerer_pds_endpoint TEXT,
        offerer_endpoint TEXT,
        min_copies INTEGER NOT NULL DEFAULT 2,
        interval_sec INTEGER NOT NULL DEFAULT 600,
        priority INTEGER NOT NULL DEFAULT 50,
        received_at TEXT NOT NULL DEFAULT (datetime('now')),
        PRIMARY KEY (offerer_did, subject_did)
      );
    `);

    // PLC mirror table: stores PLC operation logs for tracked DIDs.
    this.db.exec(`
      CREATE TABLE IF NOT EXISTS plc_mirror (
        did TEXT PRIMARY KEY,
        operations_json TEXT NOT NULL,
        op_count INTEGER NOT NULL,
        last_fetched_at TEXT NOT NULL,
        last_op_created_at TEXT,
        validated INTEGER NOT NULL DEFAULT 1,
        is_tombstoned INTEGER NOT NULL DEFAULT 0
      );
    `);

    // Lexicon index table: aggregates NSID usage across all replicated repos.
    this.db.exec(`
      CREATE TABLE IF NOT EXISTS lexicon_index (
        nsid TEXT PRIMARY KEY,
        first_seen_at TEXT NOT NULL,
        last_seen_at TEXT NOT NULL,
        record_count INTEGER NOT NULL DEFAULT 0,
        repo_count INTEGER NOT NULL DEFAULT 0
      );
    `);

    // Offered DIDs table: tracks DIDs we've offered to replicate
    // but don't yet have mutual consent for.
    this.db.exec(`
      CREATE TABLE IF NOT EXISTS offered_dids (
        did TEXT PRIMARY KEY,
        pds_endpoint TEXT,
        offered_at TEXT NOT NULL DEFAULT (datetime('now'))
      );
    `);

    // Sync history table: logs each sync event with metrics.
    // NOTE(review): "trigger" is a SQLite keyword used here as a bare column
    // name; SQLite accepts it as an identifier in this position — verify on
    // any future SQLite/driver upgrade.
    this.db.exec(`
      CREATE TABLE IF NOT EXISTS sync_history (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        did TEXT NOT NULL,
        source_type TEXT NOT NULL,
        trigger TEXT NOT NULL DEFAULT 'unknown',
        started_at TEXT NOT NULL,
        completed_at TEXT,
        status TEXT NOT NULL DEFAULT 'in_progress',
        error_message TEXT,
        blocks_added INTEGER NOT NULL DEFAULT 0,
        blobs_added INTEGER NOT NULL DEFAULT 0,
        car_bytes INTEGER NOT NULL DEFAULT 0,
        blob_bytes INTEGER NOT NULL DEFAULT 0,
        duration_ms INTEGER,
        rev TEXT,
        root_cid TEXT,
        incremental INTEGER NOT NULL DEFAULT 0
      );
      CREATE INDEX IF NOT EXISTS idx_sync_history_did ON sync_history(did, started_at DESC);
    `);

    // Migrations: add columns if missing (for existing databases).
    // `columns` is a snapshot of replication_state taken BEFORE any ALTERs
    // below, so each check compares against the pre-migration schema.
    const columns = this.db
      .prepare("PRAGMA table_info(replication_state)")
      .all() as Array<{ name: string }>;
    if (!columns.some((c) => c.name === "root_cid")) {
      this.db.exec(
        "ALTER TABLE replication_state ADD COLUMN root_cid TEXT",
      );
    }
    if (!columns.some((c) => c.name === "peer_multiaddrs")) {
      this.db.exec(
        "ALTER TABLE replication_state ADD COLUMN peer_multiaddrs TEXT",
      );
    }

    // Migration: add size_bytes to replication_blocks and replication_blobs
    // (defaults to 0 so pre-migration rows report zero bytes held).
    const blockCols = this.db
      .prepare("PRAGMA table_info(replication_blocks)")
      .all() as Array<{ name: string }>;
    if (!blockCols.some((c) => c.name === "size_bytes")) {
      this.db.exec(
        "ALTER TABLE replication_blocks ADD COLUMN size_bytes INTEGER NOT NULL DEFAULT 0",
      );
    }

    const blobCols = this.db
      .prepare("PRAGMA table_info(replication_blobs)")
      .all() as Array<{ name: string }>;
    if (!blobCols.some((c) => c.name === "size_bytes")) {
      this.db.exec(
        "ALTER TABLE replication_blobs ADD COLUMN size_bytes INTEGER NOT NULL DEFAULT 0",
      );
    }

    // Migration: add needs_gc flag to replication_state (reuses the
    // pre-migration `columns` snapshot above).
    if (!columns.some((c) => c.name === "needs_gc")) {
      this.db.exec(
        "ALTER TABLE replication_state ADD COLUMN needs_gc INTEGER NOT NULL DEFAULT 0",
      );
    }

    // Migration: add trigger column to sync_history.
    const syncHistoryCols = this.db
      .prepare("PRAGMA table_info(sync_history)")
      .all() as Array<{ name: string }>;
    if (!syncHistoryCols.some((c) => c.name === "trigger")) {
      this.db.exec(
        "ALTER TABLE sync_history ADD COLUMN trigger TEXT NOT NULL DEFAULT 'unknown'",
      );
    }
  }
210
211 /**
212 * Insert or update sync state for a DID.
213 */
214 upsertState(state: {
215 did: string;
216 pdsEndpoint: string;
217 peerId?: string | null;
218 status?: string;
219 }): void {
220 this.db
221 .prepare(
222 `INSERT INTO replication_state (did, pds_endpoint, peer_id, status)
223 VALUES (?, ?, ?, ?)
224 ON CONFLICT(did) DO UPDATE SET
225 pds_endpoint = excluded.pds_endpoint,
226 peer_id = COALESCE(excluded.peer_id, replication_state.peer_id)`,
227 )
228 .run(
229 state.did,
230 state.pdsEndpoint,
231 state.peerId ?? null,
232 state.status ?? "pending",
233 );
234 }
235
236 /**
237 * Get sync state for a single DID.
238 */
239 getState(did: string): SyncState | null {
240 const row = this.db
241 .prepare("SELECT * FROM replication_state WHERE did = ?")
242 .get(did) as Record<string, unknown> | undefined;
243 if (!row) return null;
244 return this.rowToState(row);
245 }
246
247 /**
248 * Get sync states for all tracked DIDs.
249 */
250 getAllStates(): SyncState[] {
251 const rows = this.db
252 .prepare("SELECT * FROM replication_state ORDER BY did")
253 .all() as Array<Record<string, unknown>>;
254 return rows.map((row) => this.rowToState(row));
255 }
256
257 /**
258 * Update sync progress after a successful sync.
259 */
260 updateSyncProgress(did: string, rev: string, rootCid?: string): void {
261 this.db
262 .prepare(
263 `UPDATE replication_state
264 SET last_sync_rev = ?, root_cid = COALESCE(?, root_cid),
265 last_sync_at = datetime('now'),
266 status = 'synced', error_message = NULL
267 WHERE did = ?`,
268 )
269 .run(rev, rootCid ?? null, did);
270 }
271
272 /**
273 * Update status and optionally set an error message.
274 */
275 updateStatus(
276 did: string,
277 status: SyncState["status"],
278 errorMessage?: string,
279 ): void {
280 this.db
281 .prepare(
282 `UPDATE replication_state
283 SET status = ?, error_message = ?
284 WHERE did = ?`,
285 )
286 .run(status, errorMessage ?? null, did);
287 }
288
289 /**
290 * Update cached peer info for a DID.
291 */
292 updatePeerInfo(did: string, peerId: string | null, multiaddrs?: string[]): void {
293 this.db
294 .prepare(
295 `UPDATE replication_state
296 SET peer_id = ?, peer_multiaddrs = ?, peer_info_fetched_at = datetime('now')
297 WHERE did = ?`,
298 )
299 .run(peerId, multiaddrs && multiaddrs.length > 0 ? JSON.stringify(multiaddrs) : null, did);
300 }
301
302 /**
303 * Clear cached peer info (e.g. on connection failure).
304 */
305 clearPeerInfo(did: string): void {
306 this.db
307 .prepare(
308 `UPDATE replication_state
309 SET peer_id = NULL, peer_multiaddrs = NULL, peer_info_fetched_at = NULL
310 WHERE did = ?`,
311 )
312 .run(did);
313 }
314
315 /**
316 * Update the last verified timestamp.
317 */
318 updateVerifiedAt(did: string): void {
319 this.db
320 .prepare(
321 `UPDATE replication_state
322 SET last_verified_at = datetime('now')
323 WHERE did = ?`,
324 )
325 .run(did);
326 }
327
328 /**
329 * Track block CIDs for a DID (batch insert, ignores duplicates).
330 */
331 trackBlocks(did: string, cids: string[]): void {
332 if (cids.length === 0) return;
333 const insert = this.db.prepare(
334 "INSERT OR IGNORE INTO replication_blocks (did, cid) VALUES (?, ?)",
335 );
336 const batch = this.db.transaction((items: string[]) => {
337 for (const cid of items) {
338 insert.run(did, cid);
339 }
340 });
341 batch(cids);
342 }
343
344 /**
345 * Get all tracked block CIDs for a DID.
346 */
347 getBlockCids(did: string): string[] {
348 const rows = this.db
349 .prepare("SELECT cid FROM replication_blocks WHERE did = ?")
350 .all(did) as Array<{ cid: string }>;
351 return rows.map((r) => r.cid);
352 }
353
354 /**
355 * Get the count of tracked blocks for a DID.
356 */
357 getBlockCount(did: string): number {
358 const row = this.db
359 .prepare(
360 "SELECT COUNT(*) as count FROM replication_blocks WHERE did = ?",
361 )
362 .get(did) as { count: number };
363 return row.count;
364 }
365
366 /**
367 * Clear all tracked blocks for a DID.
368 */
369 clearBlocks(did: string): void {
370 this.db
371 .prepare("DELETE FROM replication_blocks WHERE did = ?")
372 .run(did);
373 }
374
375 // ============================================
376 // Firehose cursor persistence
377 // ============================================
378
379 /**
380 * Save the firehose cursor (last processed sequence number).
381 */
382 saveFirehoseCursor(seq: number): void {
383 this.db
384 .prepare(
385 `INSERT INTO firehose_cursor (key, seq, updated_at)
386 VALUES ('cursor', ?, datetime('now'))
387 ON CONFLICT(key) DO UPDATE SET
388 seq = excluded.seq,
389 updated_at = datetime('now')`,
390 )
391 .run(seq);
392 }
393
394 /**
395 * Get the saved firehose cursor, or null if none exists.
396 */
397 getFirehoseCursor(): number | null {
398 const row = this.db
399 .prepare("SELECT seq FROM firehose_cursor WHERE key = 'cursor'")
400 .get() as { seq: number } | undefined;
401 return row?.seq ?? null;
402 }
403
404 /**
405 * Clear the firehose cursor (e.g., on full re-sync).
406 */
407 clearFirehoseCursor(): void {
408 this.db
409 .prepare("DELETE FROM firehose_cursor WHERE key = 'cursor'")
410 .run();
411 }
412
413 // ============================================
414 // Record path tracking (for challenge generation)
415 // ============================================
416
417 /**
418 * Track record paths for a DID (batch insert, ignores duplicates).
419 */
420 trackRecordPaths(did: string, paths: string[]): void {
421 if (paths.length === 0) return;
422 const insert = this.db.prepare(
423 "INSERT OR IGNORE INTO replication_record_paths (did, record_path) VALUES (?, ?)",
424 );
425 const batch = this.db.transaction((items: string[]) => {
426 for (const path of items) {
427 insert.run(did, path);
428 }
429 });
430 batch(paths);
431 }
432
433 /**
434 * Get all tracked record paths for a DID.
435 */
436 getRecordPaths(did: string): string[] {
437 const rows = this.db
438 .prepare(
439 "SELECT record_path FROM replication_record_paths WHERE did = ?",
440 )
441 .all(did) as Array<{ record_path: string }>;
442 return rows.map((r) => r.record_path);
443 }
444
445 /**
446 * Remove specific record paths for a DID (batch delete).
447 */
448 removeRecordPaths(did: string, paths: string[]): void {
449 if (paths.length === 0) return;
450 const remove = this.db.prepare(
451 "DELETE FROM replication_record_paths WHERE did = ? AND record_path = ?",
452 );
453 const batch = this.db.transaction((items: string[]) => {
454 for (const path of items) {
455 remove.run(did, path);
456 }
457 });
458 batch(paths);
459 }
460
461 /**
462 * Clear all tracked record paths for a DID.
463 */
464 clearRecordPaths(did: string): void {
465 this.db
466 .prepare("DELETE FROM replication_record_paths WHERE did = ?")
467 .run(did);
468 }
469
470 // ============================================
471 // Blob tracking (for replicated blob CIDs)
472 // ============================================
473
474 /**
475 * Track blob CIDs for a DID (batch insert, ignores duplicates).
476 */
477 trackBlobs(did: string, cids: string[]): void {
478 if (cids.length === 0) return;
479 const insert = this.db.prepare(
480 "INSERT OR IGNORE INTO replication_blobs (did, cid) VALUES (?, ?)",
481 );
482 const batch = this.db.transaction((items: string[]) => {
483 for (const cid of items) {
484 insert.run(did, cid);
485 }
486 });
487 batch(cids);
488 }
489
490 /**
491 * Check if a blob CID has been fetched for a DID.
492 */
493 hasBlobCid(did: string, cid: string): boolean {
494 const row = this.db
495 .prepare(
496 "SELECT 1 FROM replication_blobs WHERE did = ? AND cid = ?",
497 )
498 .get(did, cid);
499 return row !== undefined;
500 }
501
502 /**
503 * Get all tracked blob CIDs for a DID.
504 */
505 getBlobCids(did: string): string[] {
506 const rows = this.db
507 .prepare("SELECT cid FROM replication_blobs WHERE did = ?")
508 .all(did) as Array<{ cid: string }>;
509 return rows.map((r) => r.cid);
510 }
511
512 /**
513 * Get the count of tracked blobs for a DID.
514 */
515 getBlobCount(did: string): number {
516 const row = this.db
517 .prepare(
518 "SELECT COUNT(*) as count FROM replication_blobs WHERE did = ?",
519 )
520 .get(did) as { count: number };
521 return row.count;
522 }
523
524 // ============================================
525 // Peer endpoint tracking (for P2P fallback fetch)
526 // ============================================
527
528 /**
529 * Upsert a peer endpoint entry for a target DID.
530 */
531 upsertPeerEndpoint(
532 targetDid: string,
533 peerDid: string,
534 pdsEndpoint: string,
535 lastSyncRev: string | null,
536 ): void {
537 this.db
538 .prepare(
539 `INSERT INTO peer_endpoints (target_did, peer_did, pds_endpoint, last_sync_rev)
540 VALUES (?, ?, ?, ?)
541 ON CONFLICT(target_did, peer_did) DO UPDATE SET
542 pds_endpoint = excluded.pds_endpoint,
543 last_sync_rev = excluded.last_sync_rev,
544 discovered_at = datetime('now')`,
545 )
546 .run(targetDid, peerDid, pdsEndpoint, lastSyncRev);
547 }
548
549 /**
550 * Get all known peer endpoints for a target DID.
551 */
552 getPeerEndpoints(
553 targetDid: string,
554 ): Array<{ peerDid: string; pdsEndpoint: string; lastSyncRev: string | null }> {
555 const rows = this.db
556 .prepare(
557 "SELECT peer_did, pds_endpoint, last_sync_rev FROM peer_endpoints WHERE target_did = ?",
558 )
559 .all(targetDid) as Array<{
560 peer_did: string;
561 pds_endpoint: string;
562 last_sync_rev: string | null;
563 }>;
564 return rows.map((r) => ({
565 peerDid: r.peer_did,
566 pdsEndpoint: r.pds_endpoint,
567 lastSyncRev: r.last_sync_rev,
568 }));
569 }
570
571 /**
572 * Clear all peer endpoint entries for a target DID.
573 */
574 clearPeerEndpoints(targetDid: string): void {
575 this.db
576 .prepare("DELETE FROM peer_endpoints WHERE target_did = ?")
577 .run(targetDid);
578 }
579
580 /**
581 * Look up the first available multiaddr for a PDS endpoint.
582 * Searches replication_state rows matching the given PDS endpoint,
583 * returning the first multiaddr that contains a /p2p/ component (peer ID).
584 */
585 getMultiaddrForPdsEndpoint(pdsEndpoint: string): string | null {
586 const rows = this.db
587 .prepare(
588 `SELECT peer_multiaddrs FROM replication_state
589 WHERE pds_endpoint = ? AND peer_multiaddrs IS NOT NULL
590 LIMIT 5`,
591 )
592 .all(pdsEndpoint) as Array<{ peer_multiaddrs: string }>;
593
594 for (const row of rows) {
595 try {
596 const addrs = JSON.parse(row.peer_multiaddrs) as string[];
597 // Prefer multiaddrs that include /p2p/ (have peer ID)
598 const withPeerId = addrs.find((a) => a.includes("/p2p/"));
599 if (withPeerId) return withPeerId;
600 // Fall back to first addr if none have /p2p/
601 if (addrs.length > 0) return addrs[0]!;
602 } catch {
603 // Malformed JSON, skip
604 }
605 }
606 return null;
607 }
608
609 // ============================================
610 // Admin-added DID management
611 // ============================================
612
613 /**
614 * Add a DID to the admin-tracked list (idempotent).
615 */
616 addAdminDid(did: string): void {
617 this.db
618 .prepare(
619 "INSERT OR IGNORE INTO admin_tracked_dids (did) VALUES (?)",
620 )
621 .run(did);
622 }
623
624 /**
625 * Remove a DID from the admin-tracked list.
626 * Returns true if the DID was actually removed.
627 */
628 removeAdminDid(did: string): boolean {
629 const result = this.db
630 .prepare("DELETE FROM admin_tracked_dids WHERE did = ?")
631 .run(did);
632 return result.changes > 0;
633 }
634
635 /**
636 * Get all admin-added DIDs.
637 */
638 getAdminDids(): string[] {
639 const rows = this.db
640 .prepare("SELECT did FROM admin_tracked_dids ORDER BY added_at")
641 .all() as Array<{ did: string }>;
642 return rows.map((r) => r.did);
643 }
644
645 /**
646 * Check if a DID was added via the admin interface.
647 */
648 isAdminDid(did: string): boolean {
649 const row = this.db
650 .prepare("SELECT 1 FROM admin_tracked_dids WHERE did = ?")
651 .get(did);
652 return row !== undefined;
653 }
654
655 // ============================================
656 // Offered DID management
657 // ============================================
658
659 /**
660 * Add a DID to the offered list (idempotent).
661 */
662 addOfferedDid(did: string, pdsEndpoint: string | null): void {
663 this.db
664 .prepare(
665 `INSERT INTO offered_dids (did, pds_endpoint)
666 VALUES (?, ?)
667 ON CONFLICT(did) DO UPDATE SET
668 pds_endpoint = COALESCE(excluded.pds_endpoint, offered_dids.pds_endpoint)`,
669 )
670 .run(did, pdsEndpoint);
671 }
672
673 /**
674 * Get all offered DIDs with their PDS endpoints.
675 */
676 getOfferedDids(): Array<{ did: string; pdsEndpoint: string | null; offeredAt: string }> {
677 const rows = this.db
678 .prepare("SELECT did, pds_endpoint, offered_at FROM offered_dids ORDER BY offered_at")
679 .all() as Array<{ did: string; pds_endpoint: string | null; offered_at: string }>;
680 return rows.map((r) => ({
681 did: r.did,
682 pdsEndpoint: r.pds_endpoint,
683 offeredAt: r.offered_at,
684 }));
685 }
686
687 /**
688 * Remove a DID from the offered list.
689 * Returns true if the DID was actually removed.
690 */
691 removeOfferedDid(did: string): boolean {
692 const result = this.db
693 .prepare("DELETE FROM offered_dids WHERE did = ?")
694 .run(did);
695 return result.changes > 0;
696 }
697
698 /**
699 * Check if a DID is in the offered list.
700 */
701 isOfferedDid(did: string): boolean {
702 const row = this.db
703 .prepare("SELECT 1 FROM offered_dids WHERE did = ?")
704 .get(did);
705 return row !== undefined;
706 }
707
708 // ============================================
709 // Incoming offer management
710 // ============================================
711
712 /**
713 * Add or update an incoming offer (idempotent upsert).
714 */
715 addIncomingOffer(offer: {
716 offererDid: string;
717 subjectDid: string;
718 offererPdsEndpoint?: string | null;
719 offererEndpoint?: string | null;
720 minCopies?: number;
721 intervalSec?: number;
722 priority?: number;
723 }): void {
724 this.db
725 .prepare(
726 `INSERT INTO incoming_offers (offerer_did, subject_did, offerer_pds_endpoint, offerer_endpoint, min_copies, interval_sec, priority)
727 VALUES (?, ?, ?, ?, ?, ?, ?)
728 ON CONFLICT(offerer_did, subject_did) DO UPDATE SET
729 offerer_pds_endpoint = COALESCE(excluded.offerer_pds_endpoint, incoming_offers.offerer_pds_endpoint),
730 offerer_endpoint = COALESCE(excluded.offerer_endpoint, incoming_offers.offerer_endpoint),
731 min_copies = excluded.min_copies,
732 interval_sec = excluded.interval_sec,
733 priority = excluded.priority,
734 received_at = datetime('now')`,
735 )
736 .run(
737 offer.offererDid,
738 offer.subjectDid,
739 offer.offererPdsEndpoint ?? null,
740 offer.offererEndpoint ?? null,
741 offer.minCopies ?? 2,
742 offer.intervalSec ?? 600,
743 offer.priority ?? 50,
744 );
745 }
746
747 /**
748 * Get all incoming offers.
749 */
750 getIncomingOffers(): Array<{
751 offererDid: string;
752 subjectDid: string;
753 offererPdsEndpoint: string | null;
754 offererEndpoint: string | null;
755 minCopies: number;
756 intervalSec: number;
757 priority: number;
758 receivedAt: string;
759 }> {
760 const rows = this.db
761 .prepare("SELECT * FROM incoming_offers ORDER BY received_at DESC")
762 .all() as Array<Record<string, unknown>>;
763 return rows.map((r) => ({
764 offererDid: r.offerer_did as string,
765 subjectDid: r.subject_did as string,
766 offererPdsEndpoint: (r.offerer_pds_endpoint as string) ?? null,
767 offererEndpoint: (r.offerer_endpoint as string) ?? null,
768 minCopies: r.min_copies as number,
769 intervalSec: r.interval_sec as number,
770 priority: r.priority as number,
771 receivedAt: r.received_at as string,
772 }));
773 }
774
775 /**
776 * Remove an incoming offer.
777 * Returns true if the offer was actually removed.
778 */
779 removeIncomingOffer(offererDid: string, subjectDid: string): boolean {
780 const result = this.db
781 .prepare("DELETE FROM incoming_offers WHERE offerer_did = ? AND subject_did = ?")
782 .run(offererDid, subjectDid);
783 return result.changes > 0;
784 }
785
786 /**
787 * Check if an incoming offer exists.
788 */
789 hasIncomingOffer(offererDid: string, subjectDid: string): boolean {
790 const row = this.db
791 .prepare("SELECT 1 FROM incoming_offers WHERE offerer_did = ? AND subject_did = ?")
792 .get(offererDid, subjectDid);
793 return row !== undefined;
794 }
795
796 /**
797 * Delete sync state for a DID.
798 */
799 deleteState(did: string): void {
800 this.db
801 .prepare("DELETE FROM replication_state WHERE did = ?")
802 .run(did);
803 }
804
805 /**
806 * Clear all tracked blobs for a DID.
807 */
808 clearBlobs(did: string): void {
809 this.db
810 .prepare("DELETE FROM replication_blobs WHERE did = ?")
811 .run(did);
812 }
813
814 // ============================================
815 // Block/blob GC and orphan detection
816 // ============================================
817
818 /**
819 * Remove specific block tracking entries for a DID (batch delete).
820 */
821 removeBlocks(did: string, cids: string[]): void {
822 if (cids.length === 0) return;
823 const remove = this.db.prepare(
824 "DELETE FROM replication_blocks WHERE did = ? AND cid = ?",
825 );
826 const batch = this.db.transaction((items: string[]) => {
827 for (const cid of items) {
828 remove.run(did, cid);
829 }
830 });
831 batch(cids);
832 }
833
834 /**
835 * Find CIDs that have zero remaining references across all DIDs.
836 * Given a list of CID strings, returns those with no rows in replication_blocks.
837 */
838 findOrphanedCids(cids: string[]): string[] {
839 if (cids.length === 0) return [];
840 const orphaned: string[] = [];
841 const check = this.db.prepare(
842 "SELECT 1 FROM replication_blocks WHERE cid = ? LIMIT 1",
843 );
844 for (const cid of cids) {
845 const row = check.get(cid);
846 if (!row) orphaned.push(cid);
847 }
848 return orphaned;
849 }
850
851 /**
852 * Remove specific blob tracking entries for a DID (batch delete).
853 */
854 removeBlobs(did: string, cids: string[]): void {
855 if (cids.length === 0) return;
856 const remove = this.db.prepare(
857 "DELETE FROM replication_blobs WHERE did = ? AND cid = ?",
858 );
859 const batch = this.db.transaction((items: string[]) => {
860 for (const cid of items) {
861 remove.run(did, cid);
862 }
863 });
864 batch(cids);
865 }
866
867 /**
868 * Find blob CIDs that have zero remaining references across all DIDs.
869 */
870 findOrphanedBlobCids(cids: string[]): string[] {
871 if (cids.length === 0) return [];
872 const orphaned: string[] = [];
873 const check = this.db.prepare(
874 "SELECT 1 FROM replication_blobs WHERE cid = ? LIMIT 1",
875 );
876 for (const cid of cids) {
877 const row = check.get(cid);
878 if (!row) orphaned.push(cid);
879 }
880 return orphaned;
881 }
882
883 /**
884 * Get all tracked block CIDs for a DID as a Set for efficient diffing.
885 */
886 getBlockCidSet(did: string): Set<string> {
887 return new Set(this.getBlockCids(did));
888 }
889
890 /**
891 * Mark a DID as needing garbage collection.
892 */
893 setNeedsGc(did: string): void {
894 this.db
895 .prepare("UPDATE replication_state SET needs_gc = 1 WHERE did = ?")
896 .run(did);
897 }
898
899 /**
900 * Clear the needs_gc flag for a DID.
901 */
902 clearNeedsGc(did: string): void {
903 this.db
904 .prepare("UPDATE replication_state SET needs_gc = 0 WHERE did = ?")
905 .run(did);
906 }
907
908 /**
909 * Get all DIDs that need garbage collection.
910 */
911 getDidsNeedingGc(): string[] {
912 const rows = this.db
913 .prepare("SELECT did FROM replication_state WHERE needs_gc = 1")
914 .all() as Array<{ did: string }>;
915 return rows.map((r) => r.did);
916 }
917
918 /**
919 * Mark a DID as tombstoned (account deleted/deactivated upstream).
920 */
921 markTombstoned(did: string): void {
922 this.db
923 .prepare("UPDATE replication_state SET status = 'tombstoned' WHERE did = ?")
924 .run(did);
925 }
926
927 /**
928 * Purge all tracking data for a DID.
929 * Returns the CID lists before deletion so the caller can check for orphans.
930 */
931 purgeDidData(did: string): { blocksRemoved: string[]; blobsRemoved: string[] } {
932 const blocksRemoved = this.getBlockCids(did);
933 const blobsRemoved = this.getBlobCids(did);
934
935 const purge = this.db.transaction(() => {
936 this.db.prepare("DELETE FROM replication_blocks WHERE did = ?").run(did);
937 this.db.prepare("DELETE FROM replication_blobs WHERE did = ?").run(did);
938 this.db.prepare("DELETE FROM replication_record_paths WHERE did = ?").run(did);
939 this.db.prepare("DELETE FROM replication_state WHERE did = ?").run(did);
940 this.db.prepare("DELETE FROM peer_endpoints WHERE target_did = ?").run(did);
941 });
942 purge();
943
944 return { blocksRemoved, blobsRemoved };
945 }
946
947 // ============================================
948 // Sync history tracking
949 // ============================================
950
951 /**
952 * Start a sync event, returning its ID for later completion.
953 */
954 startSyncEvent(did: string, sourceType: string, trigger: SyncTrigger = "unknown"): number {
955 const result = this.db
956 .prepare(
957 `INSERT INTO sync_history (did, source_type, trigger, started_at, status)
958 VALUES (?, ?, ?, datetime('now'), 'in_progress')`,
959 )
960 .run(did, sourceType, trigger);
961 return Number(result.lastInsertRowid);
962 }
963
964 /**
965 * Complete a sync event with final metrics.
966 */
967 completeSyncEvent(
968 id: number,
969 data: {
970 status: "success" | "error";
971 errorMessage?: string;
972 blocksAdded?: number;
973 blobsAdded?: number;
974 carBytes?: number;
975 blobBytes?: number;
976 durationMs?: number;
977 rev?: string;
978 rootCid?: string;
979 incremental?: boolean;
980 },
981 ): void {
982 this.db
983 .prepare(
984 `UPDATE sync_history SET
985 completed_at = datetime('now'),
986 status = ?,
987 error_message = ?,
988 blocks_added = ?,
989 blobs_added = ?,
990 car_bytes = ?,
991 blob_bytes = ?,
992 duration_ms = ?,
993 rev = ?,
994 root_cid = ?,
995 incremental = ?
996 WHERE id = ?`,
997 )
998 .run(
999 data.status,
1000 data.errorMessage ?? null,
1001 data.blocksAdded ?? 0,
1002 data.blobsAdded ?? 0,
1003 data.carBytes ?? 0,
1004 data.blobBytes ?? 0,
1005 data.durationMs ?? null,
1006 data.rev ?? null,
1007 data.rootCid ?? null,
1008 data.incremental ? 1 : 0,
1009 id,
1010 );
1011 }
1012
1013 /**
1014 * Get sync history, optionally filtered by DID.
1015 */
1016 getSyncHistory(did?: string, limit: number = 50): SyncHistoryRow[] {
1017 const query = did
1018 ? "SELECT * FROM sync_history WHERE did = ? ORDER BY started_at DESC LIMIT ?"
1019 : "SELECT * FROM sync_history ORDER BY started_at DESC LIMIT ?";
1020 const params = did ? [did, limit] : [limit];
1021 const rows = this.db.prepare(query).all(...params) as Array<Record<string, unknown>>;
1022 return rows.map((r) => this.rowToSyncHistory(r));
1023 }
1024
1025 /**
1026 * Get the count of tracked records for a DID.
1027 */
1028 getRecordCount(did: string): number {
1029 const row = this.db
1030 .prepare(
1031 "SELECT COUNT(*) as count FROM replication_record_paths WHERE did = ?",
1032 )
1033 .get(did) as { count: number };
1034 return row.count;
1035 }
1036
1037 /**
1038 * Get aggregate metrics across all replicated DIDs.
1039 */
1040 getAggregateMetrics(): AggregateMetrics {
1041 const dids = this.db
1042 .prepare("SELECT COUNT(DISTINCT did) as count FROM replication_state")
1043 .get() as { count: number };
1044 const blocks = this.db
1045 .prepare("SELECT COUNT(*) as count, COALESCE(SUM(size_bytes), 0) as bytes FROM replication_blocks")
1046 .get() as { count: number; bytes: number };
1047 const blobs = this.db
1048 .prepare("SELECT COUNT(*) as count, COALESCE(SUM(size_bytes), 0) as bytes FROM replication_blobs")
1049 .get() as { count: number; bytes: number };
1050 const records = this.db
1051 .prepare("SELECT COUNT(*) as count FROM replication_record_paths")
1052 .get() as { count: number };
1053 const syncs = this.db
1054 .prepare("SELECT COUNT(*) as count FROM sync_history")
1055 .get() as { count: number };
1056 const recentTransfer = this.db
1057 .prepare(
1058 `SELECT COALESCE(SUM(car_bytes + blob_bytes), 0) as bytes
1059 FROM sync_history
1060 WHERE started_at >= datetime('now', '-24 hours') AND status = 'success'`,
1061 )
1062 .get() as { bytes: number };
1063
1064 return {
1065 totalDids: dids.count,
1066 totalBlocks: blocks.count,
1067 totalBlobs: blobs.count,
1068 totalRecords: records.count,
1069 totalBytesHeld: blocks.bytes + blobs.bytes,
1070 totalSyncs: syncs.count,
1071 recentTransferredBytes: recentTransfer.bytes,
1072 };
1073 }
1074
1075 /**
1076 * Get per-DID metrics summary.
1077 */
1078 getDidMetrics(did: string): DidMetrics {
1079 const blocks = this.db
1080 .prepare("SELECT COUNT(*) as count, COALESCE(SUM(size_bytes), 0) as bytes FROM replication_blocks WHERE did = ?")
1081 .get(did) as { count: number; bytes: number };
1082 const blobs = this.db
1083 .prepare("SELECT COUNT(*) as count, COALESCE(SUM(size_bytes), 0) as bytes FROM replication_blobs WHERE did = ?")
1084 .get(did) as { count: number; bytes: number };
1085 const records = this.getRecordCount(did);
1086 const recentSyncs = this.getSyncHistory(did, 10);
1087
1088 return {
1089 blocks: blocks.count,
1090 blobs: blobs.count,
1091 records,
1092 bytesHeld: blocks.bytes + blobs.bytes,
1093 recentSyncs,
1094 };
1095 }
1096
1097 /**
1098 * Track blocks with their sizes (batch upsert).
1099 */
1100 trackBlocksWithSize(did: string, entries: Array<{ cid: string; sizeBytes: number }>): void {
1101 if (entries.length === 0) return;
1102 const insert = this.db.prepare(
1103 `INSERT INTO replication_blocks (did, cid, size_bytes) VALUES (?, ?, ?)
1104 ON CONFLICT(did, cid) DO UPDATE SET size_bytes = excluded.size_bytes`,
1105 );
1106 const batch = this.db.transaction((items: Array<{ cid: string; sizeBytes: number }>) => {
1107 for (const entry of items) {
1108 insert.run(did, entry.cid, entry.sizeBytes);
1109 }
1110 });
1111 batch(entries);
1112 }
1113
1114 /**
1115 * Track blobs with their sizes (batch upsert).
1116 */
1117 trackBlobsWithSize(did: string, entries: Array<{ cid: string; sizeBytes: number }>): void {
1118 if (entries.length === 0) return;
1119 const insert = this.db.prepare(
1120 `INSERT INTO replication_blobs (did, cid, size_bytes) VALUES (?, ?, ?)
1121 ON CONFLICT(did, cid) DO UPDATE SET size_bytes = excluded.size_bytes`,
1122 );
1123 const batch = this.db.transaction((items: Array<{ cid: string; sizeBytes: number }>) => {
1124 for (const entry of items) {
1125 insert.run(did, entry.cid, entry.sizeBytes);
1126 }
1127 });
1128 batch(entries);
1129 }
1130
1131 /**
1132 * Get trigger breakdown for a DID's recent syncs.
1133 * Returns a map of trigger → count for the most recent N sync events.
1134 */
1135 getTriggerBreakdown(did: string, limit: number = 20): Record<string, number> {
1136 const rows = this.db
1137 .prepare(
1138 `SELECT trigger, COUNT(*) as count FROM (
1139 SELECT trigger FROM sync_history WHERE did = ? ORDER BY started_at DESC LIMIT ?
1140 ) GROUP BY trigger`,
1141 )
1142 .all(did, limit) as Array<{ trigger: string; count: number }>;
1143
1144 const result: Record<string, number> = {};
1145 for (const row of rows) {
1146 result[row.trigger] = row.count;
1147 }
1148 return result;
1149 }
1150
1151 /**
1152 * Look up the root CID for a specific (did, rev) from sync_history.
1153 * Returns null if no successful sync with that rev is found.
1154 */
1155 getRootCidForRev(did: string, rev: string): string | null {
1156 const row = this.db
1157 .prepare(
1158 `SELECT root_cid FROM sync_history
1159 WHERE did = ? AND rev = ? AND status = 'success' AND root_cid IS NOT NULL
1160 ORDER BY id DESC LIMIT 1`,
1161 )
1162 .get(did, rev) as { root_cid: string } | undefined;
1163 return row?.root_cid ?? null;
1164 }
1165
1166 // ============================================
1167 // Lexicon index
1168 // ============================================
1169
1170 /**
1171 * Upsert NSIDs into the lexicon index.
1172 * Updates last_seen_at and recomputes record_count/repo_count from source data.
1173 */
1174 updateLexiconIndex(nsids: string[]): void {
1175 if (nsids.length === 0) return;
1176 const now = new Date().toISOString();
1177 const upsert = this.db.prepare(
1178 `INSERT INTO lexicon_index (nsid, first_seen_at, last_seen_at, record_count, repo_count)
1179 VALUES (?, ?, ?, 0, 0)
1180 ON CONFLICT(nsid) DO UPDATE SET last_seen_at = excluded.last_seen_at`,
1181 );
1182 const countRecords = this.db.prepare(
1183 `SELECT COUNT(*) as cnt FROM replication_record_paths
1184 WHERE record_path LIKE ? || '/%'`,
1185 );
1186 const countRepos = this.db.prepare(
1187 `SELECT COUNT(DISTINCT did) as cnt FROM replication_record_paths
1188 WHERE record_path LIKE ? || '/%'`,
1189 );
1190 const updateCounts = this.db.prepare(
1191 `UPDATE lexicon_index SET record_count = ?, repo_count = ? WHERE nsid = ?`,
1192 );
1193
1194 const batch = this.db.transaction((items: string[]) => {
1195 for (const nsid of items) {
1196 upsert.run(nsid, now, now);
1197 const rc = countRecords.get(nsid) as { cnt: number };
1198 const rp = countRepos.get(nsid) as { cnt: number };
1199 updateCounts.run(rc.cnt, rp.cnt, nsid);
1200 }
1201 });
1202 batch(nsids);
1203 }
1204
1205 /**
1206 * Full rebuild of the lexicon index from replication_record_paths.
1207 */
1208 rebuildLexiconIndex(): void {
1209 const now = new Date().toISOString();
1210 this.db.exec("DELETE FROM lexicon_index");
1211
1212 const rows = this.db.prepare(
1213 `SELECT
1214 SUBSTR(record_path, 1, INSTR(record_path, '/') - 1) as nsid,
1215 COUNT(*) as record_count,
1216 COUNT(DISTINCT did) as repo_count,
1217 MIN(?) as first_seen_at,
1218 MAX(?) as last_seen_at
1219 FROM replication_record_paths
1220 WHERE INSTR(record_path, '/') > 0
1221 GROUP BY SUBSTR(record_path, 1, INSTR(record_path, '/') - 1)`,
1222 ).all(now, now) as Array<{
1223 nsid: string;
1224 record_count: number;
1225 repo_count: number;
1226 first_seen_at: string;
1227 last_seen_at: string;
1228 }>;
1229
1230 if (rows.length === 0) return;
1231
1232 const insert = this.db.prepare(
1233 `INSERT INTO lexicon_index (nsid, first_seen_at, last_seen_at, record_count, repo_count)
1234 VALUES (?, ?, ?, ?, ?)`,
1235 );
1236 const batch = this.db.transaction(() => {
1237 for (const r of rows) {
1238 insert.run(r.nsid, r.first_seen_at, r.last_seen_at, r.record_count, r.repo_count);
1239 }
1240 });
1241 batch();
1242 }
1243
1244 /**
1245 * Query the lexicon index with optional NSID prefix filter.
1246 */
1247 getLexiconIndex(prefix?: string, limit: number = 100): Array<{
1248 nsid: string;
1249 firstSeenAt: string;
1250 lastSeenAt: string;
1251 recordCount: number;
1252 repoCount: number;
1253 }> {
1254 const query = prefix
1255 ? `SELECT * FROM lexicon_index WHERE nsid LIKE ? || '%' ORDER BY record_count DESC LIMIT ?`
1256 : `SELECT * FROM lexicon_index ORDER BY record_count DESC LIMIT ?`;
1257 const params = prefix ? [prefix, limit] : [limit];
1258 const rows = this.db.prepare(query).all(...params) as Array<{
1259 nsid: string;
1260 first_seen_at: string;
1261 last_seen_at: string;
1262 record_count: number;
1263 repo_count: number;
1264 }>;
1265 return rows.map((r) => ({
1266 nsid: r.nsid,
1267 firstSeenAt: r.first_seen_at,
1268 lastSeenAt: r.last_seen_at,
1269 recordCount: r.record_count,
1270 repoCount: r.repo_count,
1271 }));
1272 }
1273
1274 /**
1275 * Get aggregate lexicon stats.
1276 */
1277 getLexiconStats(): { uniqueNsids: number; totalRecords: number } {
1278 const row = this.db.prepare(
1279 `SELECT COUNT(*) as unique_nsids, COALESCE(SUM(record_count), 0) as total_records FROM lexicon_index`,
1280 ).get() as { unique_nsids: number; total_records: number };
1281 return {
1282 uniqueNsids: row.unique_nsids,
1283 totalRecords: row.total_records,
1284 };
1285 }
1286
1287 /**
1288 * Delete all data from all replication tables in a single transaction.
1289 * Used during full disconnect to wipe the node clean.
1290 */
1291 purgeAllData(): void {
1292 const purge = this.db.transaction(() => {
1293 this.db.prepare("DELETE FROM replication_blocks").run();
1294 this.db.prepare("DELETE FROM replication_blobs").run();
1295 this.db.prepare("DELETE FROM replication_record_paths").run();
1296 this.db.prepare("DELETE FROM replication_state").run();
1297 this.db.prepare("DELETE FROM peer_endpoints").run();
1298 this.db.prepare("DELETE FROM admin_tracked_dids").run();
1299 this.db.prepare("DELETE FROM offered_dids").run();
1300 this.db.prepare("DELETE FROM incoming_offers").run();
1301 this.db.prepare("DELETE FROM sync_history").run();
1302 this.db.prepare("DELETE FROM firehose_cursor").run();
1303 this.db.prepare("DELETE FROM plc_mirror").run();
1304 this.db.prepare("DELETE FROM lexicon_index").run();
1305 });
1306 purge();
1307 }
1308
1309 private rowToSyncHistory(row: Record<string, unknown>): SyncHistoryRow {
1310 return {
1311 id: row.id as number,
1312 did: row.did as string,
1313 sourceType: row.source_type as string,
1314 trigger: (row.trigger as SyncTrigger) ?? "unknown",
1315 startedAt: row.started_at as string,
1316 completedAt: (row.completed_at as string) ?? null,
1317 status: row.status as string,
1318 errorMessage: (row.error_message as string) ?? null,
1319 blocksAdded: row.blocks_added as number,
1320 blobsAdded: row.blobs_added as number,
1321 carBytes: row.car_bytes as number,
1322 blobBytes: row.blob_bytes as number,
1323 durationMs: (row.duration_ms as number) ?? null,
1324 rev: (row.rev as string) ?? null,
1325 rootCid: (row.root_cid as string) ?? null,
1326 incremental: (row.incremental as number) === 1,
1327 };
1328 }
1329
1330 private rowToState(row: Record<string, unknown>): SyncState {
1331 let peerMultiaddrs: string[] = [];
1332 if (typeof row.peer_multiaddrs === "string") {
1333 try {
1334 peerMultiaddrs = JSON.parse(row.peer_multiaddrs) as string[];
1335 } catch {
1336 // Malformed JSON, default to empty
1337 }
1338 }
1339
1340 return {
1341 did: row.did as string,
1342 pdsEndpoint: row.pds_endpoint as string,
1343 peerId: (row.peer_id as string) ?? null,
1344 peerMultiaddrs,
1345 peerInfoFetchedAt: (row.peer_info_fetched_at as string) ?? null,
1346 lastSyncRev: (row.last_sync_rev as string) ?? null,
1347 rootCid: (row.root_cid as string) ?? null,
1348 lastSyncAt: (row.last_sync_at as string) ?? null,
1349 lastVerifiedAt: (row.last_verified_at as string) ?? null,
1350 status: row.status as SyncState["status"],
1351 errorMessage: (row.error_message as string) ?? null,
1352 needsGc: (row.needs_gc as number) === 1,
1353 };
1354 }
1355}