/**
 * SQLite-backed sync state storage for replication.
 * Tracks per-DID sync progress separately from the repo.
 */

import type Database from "better-sqlite3";
import type {
  SyncState,
  SyncHistoryRow,
  SyncTrigger,
  AggregateMetrics,
  DidMetrics,
} from "./types.js";

/**
 * Persistence layer for replication bookkeeping.
 *
 * Every method is a thin, synchronous wrapper around one or more SQL
 * statements executed on the injected better-sqlite3 handle. Batch writes
 * are wrapped in a transaction so they either fully apply or not at all.
 */
export class SyncStorage {
  constructor(private readonly db: Database.Database) {}

  /**
   * Create the replication tables if they don't exist, then apply the
   * additive column migrations needed by databases created by older versions.
   * Safe to call repeatedly.
   */
  initSchema(): void {
    // Core per-DID sync state plus the block CIDs tracked for each DID.
    this.db.exec(`
      CREATE TABLE IF NOT EXISTS replication_state (
        did TEXT PRIMARY KEY,
        pds_endpoint TEXT NOT NULL,
        peer_id TEXT,
        peer_info_fetched_at TEXT,
        last_sync_rev TEXT,
        root_cid TEXT,
        last_sync_at TEXT,
        last_verified_at TEXT,
        status TEXT NOT NULL DEFAULT 'pending',
        error_message TEXT
      );
      CREATE TABLE IF NOT EXISTS replication_blocks (
        did TEXT NOT NULL,
        cid TEXT NOT NULL,
        PRIMARY KEY (did, cid)
      );
    `);

    // Firehose cursor table: stores the last-seen sequence number
    // for resumption after restart.
    this.db.exec(`
      CREATE TABLE IF NOT EXISTS firehose_cursor (
        key TEXT PRIMARY KEY DEFAULT 'cursor',
        seq INTEGER NOT NULL,
        updated_at TEXT NOT NULL DEFAULT (datetime('now'))
      );
    `);

    // Blob tracking table: tracks replicated blob CIDs per DID.
    this.db.exec(`
      CREATE TABLE IF NOT EXISTS replication_blobs (
        did TEXT NOT NULL,
        cid TEXT NOT NULL,
        fetched_at TEXT NOT NULL DEFAULT (datetime('now')),
        PRIMARY KEY (did, cid)
      );
    `);

    // Record paths table: tracks record paths per DID for challenge generation.
    this.db.exec(`
      CREATE TABLE IF NOT EXISTS replication_record_paths (
        did TEXT NOT NULL,
        record_path TEXT NOT NULL,
        PRIMARY KEY (did, record_path)
      );
    `);

    // Peer endpoints table: tracks which peers have which DIDs for P2P fallback fetch.
    this.db.exec(`
      CREATE TABLE IF NOT EXISTS peer_endpoints (
        target_did TEXT NOT NULL,
        peer_did TEXT NOT NULL,
        pds_endpoint TEXT NOT NULL,
        last_sync_rev TEXT,
        discovered_at TEXT NOT NULL DEFAULT (datetime('now')),
        PRIMARY KEY (target_did, peer_did)
      );
    `);

    // Admin-added DIDs table: persists DIDs added via admin UI.
    this.db.exec(`
      CREATE TABLE IF NOT EXISTS admin_tracked_dids (
        did TEXT PRIMARY KEY,
        added_at TEXT NOT NULL DEFAULT (datetime('now')),
        added_by TEXT
      );
    `);

    // Incoming offers table: tracks offers received from other nodes.
    this.db.exec(`
      CREATE TABLE IF NOT EXISTS incoming_offers (
        offerer_did TEXT NOT NULL,
        subject_did TEXT NOT NULL,
        offerer_pds_endpoint TEXT,
        offerer_endpoint TEXT,
        min_copies INTEGER NOT NULL DEFAULT 2,
        interval_sec INTEGER NOT NULL DEFAULT 600,
        priority INTEGER NOT NULL DEFAULT 50,
        received_at TEXT NOT NULL DEFAULT (datetime('now')),
        PRIMARY KEY (offerer_did, subject_did)
      );
    `);

    // PLC mirror table: stores PLC operation logs for tracked DIDs.
    this.db.exec(`
      CREATE TABLE IF NOT EXISTS plc_mirror (
        did TEXT PRIMARY KEY,
        operations_json TEXT NOT NULL,
        op_count INTEGER NOT NULL,
        last_fetched_at TEXT NOT NULL,
        last_op_created_at TEXT,
        validated INTEGER NOT NULL DEFAULT 1,
        is_tombstoned INTEGER NOT NULL DEFAULT 0
      );
    `);

    // Lexicon index table: aggregates NSID usage across all replicated repos.
    this.db.exec(`
      CREATE TABLE IF NOT EXISTS lexicon_index (
        nsid TEXT PRIMARY KEY,
        first_seen_at TEXT NOT NULL,
        last_seen_at TEXT NOT NULL,
        record_count INTEGER NOT NULL DEFAULT 0,
        repo_count INTEGER NOT NULL DEFAULT 0
      );
    `);

    // Offered DIDs table: tracks DIDs we've offered to replicate
    // but don't yet have mutual consent for.
    this.db.exec(`
      CREATE TABLE IF NOT EXISTS offered_dids (
        did TEXT PRIMARY KEY,
        pds_endpoint TEXT,
        offered_at TEXT NOT NULL DEFAULT (datetime('now'))
      );
    `);

    // Sync history table: logs each sync event with metrics.
    this.db.exec(`
      CREATE TABLE IF NOT EXISTS sync_history (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        did TEXT NOT NULL,
        source_type TEXT NOT NULL,
        trigger TEXT NOT NULL DEFAULT 'unknown',
        started_at TEXT NOT NULL,
        completed_at TEXT,
        status TEXT NOT NULL DEFAULT 'in_progress',
        error_message TEXT,
        blocks_added INTEGER NOT NULL DEFAULT 0,
        blobs_added INTEGER NOT NULL DEFAULT 0,
        car_bytes INTEGER NOT NULL DEFAULT 0,
        blob_bytes INTEGER NOT NULL DEFAULT 0,
        duration_ms INTEGER,
        rev TEXT,
        root_cid TEXT,
        incremental INTEGER NOT NULL DEFAULT 0
      );
      CREATE INDEX IF NOT EXISTS idx_sync_history_did
        ON sync_history(did, started_at DESC);
    `);

    // Migrations: add columns if missing (for existing databases).
    // CREATE TABLE IF NOT EXISTS never alters an existing table, so columns
    // introduced after a table first shipped must be added explicitly.
    this.ensureColumn("replication_state", "root_cid", "TEXT");
    this.ensureColumn("replication_state", "peer_multiaddrs", "TEXT");
    this.ensureColumn("replication_blocks", "size_bytes", "INTEGER NOT NULL DEFAULT 0");
    this.ensureColumn("replication_blobs", "size_bytes", "INTEGER NOT NULL DEFAULT 0");
    this.ensureColumn("replication_state", "needs_gc", "INTEGER NOT NULL DEFAULT 0");
    this.ensureColumn("sync_history", "trigger", "TEXT NOT NULL DEFAULT 'unknown'");
  }

  /**
   * Insert or update sync state for a DID.
   *
   * On conflict only `pds_endpoint` and `peer_id` are updated; an existing
   * row keeps its current `status`, and a null/omitted `peerId` keeps the
   * previously stored peer ID.
   *
   * @param state - DID, its PDS endpoint, and optional peer ID / initial
   *   status (status defaults to `"pending"` for newly inserted rows).
   */
  upsertState(state: {
    did: string;
    pdsEndpoint: string;
    peerId?: string | null;
    status?: string;
  }): void {
    this.db
      .prepare(
        `INSERT INTO replication_state (did, pds_endpoint, peer_id, status)
         VALUES (?, ?, ?, ?)
         ON CONFLICT(did) DO UPDATE SET
           pds_endpoint = excluded.pds_endpoint,
           peer_id = COALESCE(excluded.peer_id, replication_state.peer_id)`,
      )
      .run(
        state.did,
        state.pdsEndpoint,
        state.peerId ?? null,
        state.status ?? "pending",
      );
  }

  /**
   * Get sync state for a single DID.
   *
   * @returns The mapped state, or null when the DID has no row.
   */
  getState(did: string): SyncState | null {
    const row = this.db
      .prepare("SELECT * FROM replication_state WHERE did = ?")
      .get(did) as Record<string, unknown> | undefined;
    return row ? this.rowToState(row) : null;
  }

  /**
   * Get sync states for all tracked DIDs, ordered by DID.
   */
  getAllStates(): SyncState[] {
    const rows = this.db
      .prepare("SELECT * FROM replication_state ORDER BY did")
      .all() as Array<Record<string, unknown>>;
    return rows.map((row) => this.rowToState(row));
  }

  /**
   * Update sync progress after a successful sync: records the revision,
   * stamps `last_sync_at`, sets status to `synced` and clears any error.
   *
   * @param rootCid - New root CID; when omitted the stored value is kept.
   */
  updateSyncProgress(did: string, rev: string, rootCid?: string): void {
    this.db
      .prepare(
        `UPDATE replication_state
         SET last_sync_rev = ?,
             root_cid = COALESCE(?, root_cid),
             last_sync_at = datetime('now'),
             status = 'synced',
             error_message = NULL
         WHERE did = ?`,
      )
      .run(rev, rootCid ?? null, did);
  }

  /**
   * Update status and optionally set an error message.
   * Omitting `errorMessage` clears any previously stored message.
   */
  updateStatus(
    did: string,
    status: SyncState["status"],
    errorMessage?: string,
  ): void {
    this.db
      .prepare(
        `UPDATE replication_state SET status = ?, error_message = ? WHERE did = ?`,
      )
      .run(status, errorMessage ?? null, did);
  }

  /**
   * Update cached peer info for a DID and stamp the fetch time.
   *
   * @param multiaddrs - Stored as a JSON array; an omitted or empty list is
   *   stored as NULL.
   */
  updatePeerInfo(did: string, peerId: string | null, multiaddrs?: string[]): void {
    const encoded =
      multiaddrs && multiaddrs.length > 0 ? JSON.stringify(multiaddrs) : null;
    this.db
      .prepare(
        `UPDATE replication_state
         SET peer_id = ?,
             peer_multiaddrs = ?,
             peer_info_fetched_at = datetime('now')
         WHERE did = ?`,
      )
      .run(peerId, encoded, did);
  }

  /**
   * Clear cached peer info (e.g. on connection failure).
   */
  clearPeerInfo(did: string): void {
    this.db
      .prepare(
        `UPDATE replication_state
         SET peer_id = NULL,
             peer_multiaddrs = NULL,
             peer_info_fetched_at = NULL
         WHERE did = ?`,
      )
      .run(did);
  }

  /**
   * Update the last verified timestamp to the current time.
   */
  updateVerifiedAt(did: string): void {
    this.db
      .prepare(
        `UPDATE replication_state SET last_verified_at = datetime('now') WHERE did = ?`,
      )
      .run(did);
  }

  /**
   * Track block CIDs for a DID (batch insert, ignores duplicates).
   */
  trackBlocks(did: string, cids: string[]): void {
    if (cids.length === 0) return;
    const insert = this.db.prepare(
      "INSERT OR IGNORE INTO replication_blocks (did, cid) VALUES (?, ?)",
    );
    const batch = this.db.transaction((items: string[]) => {
      for (const cid of items) {
        insert.run(did, cid);
      }
    });
    batch(cids);
  }

  /**
   * Get all tracked block CIDs for a DID.
   */
  getBlockCids(did: string): string[] {
    const rows = this.db
      .prepare("SELECT cid FROM replication_blocks WHERE did = ?")
      .all(did) as Array<{ cid: string }>;
    return rows.map((r) => r.cid);
  }

  /**
   * Get the count of tracked blocks for a DID.
   */
  getBlockCount(did: string): number {
    const row = this.db
      .prepare(
        "SELECT COUNT(*) as count FROM replication_blocks WHERE did = ?",
      )
      .get(did) as { count: number };
    return row.count;
  }

  /**
   * Clear all tracked blocks for a DID.
   */
  clearBlocks(did: string): void {
    this.db
      .prepare("DELETE FROM replication_blocks WHERE did = ?")
      .run(did);
  }

  // ============================================
  // Firehose cursor persistence
  // ============================================

  /**
   * Save the firehose cursor (last processed sequence number).
   * A single row keyed `'cursor'` is overwritten on each call.
   */
  saveFirehoseCursor(seq: number): void {
    this.db
      .prepare(
        `INSERT INTO firehose_cursor (key, seq, updated_at)
         VALUES ('cursor', ?, datetime('now'))
         ON CONFLICT(key) DO UPDATE SET
           seq = excluded.seq,
           updated_at = datetime('now')`,
      )
      .run(seq);
  }

  /**
   * Get the saved firehose cursor, or null if none exists.
   */
  getFirehoseCursor(): number | null {
    const row = this.db
      .prepare("SELECT seq FROM firehose_cursor WHERE key = 'cursor'")
      .get() as { seq: number } | undefined;
    return row?.seq ?? null;
  }

  /**
   * Clear the firehose cursor (e.g., on full re-sync).
   */
  clearFirehoseCursor(): void {
    this.db
      .prepare("DELETE FROM firehose_cursor WHERE key = 'cursor'")
      .run();
  }

  // ============================================
  // Record path tracking (for challenge generation)
  // ============================================

  /**
   * Track record paths for a DID (batch insert, ignores duplicates).
   */
  trackRecordPaths(did: string, paths: string[]): void {
    if (paths.length === 0) return;
    const insert = this.db.prepare(
      "INSERT OR IGNORE INTO replication_record_paths (did, record_path) VALUES (?, ?)",
    );
    const batch = this.db.transaction((items: string[]) => {
      for (const path of items) {
        insert.run(did, path);
      }
    });
    batch(paths);
  }

  /**
   * Get all tracked record paths for a DID.
   */
  getRecordPaths(did: string): string[] {
    const rows = this.db
      .prepare(
        "SELECT record_path FROM replication_record_paths WHERE did = ?",
      )
      .all(did) as Array<{ record_path: string }>;
    return rows.map((r) => r.record_path);
  }

  /**
   * Remove specific record paths for a DID (batch delete).
   */
  removeRecordPaths(did: string, paths: string[]): void {
    if (paths.length === 0) return;
    const remove = this.db.prepare(
      "DELETE FROM replication_record_paths WHERE did = ? AND record_path = ?",
    );
    const batch = this.db.transaction((items: string[]) => {
      for (const path of items) {
        remove.run(did, path);
      }
    });
    batch(paths);
  }

  /**
   * Clear all tracked record paths for a DID.
   */
  clearRecordPaths(did: string): void {
    this.db
      .prepare("DELETE FROM replication_record_paths WHERE did = ?")
      .run(did);
  }

  // ============================================
  // Blob tracking (for replicated blob CIDs)
  // ============================================

  /**
   * Track blob CIDs for a DID (batch insert, ignores duplicates).
   */
  trackBlobs(did: string, cids: string[]): void {
    if (cids.length === 0) return;
    const insert = this.db.prepare(
      "INSERT OR IGNORE INTO replication_blobs (did, cid) VALUES (?, ?)",
    );
    const batch = this.db.transaction((items: string[]) => {
      for (const cid of items) {
        insert.run(did, cid);
      }
    });
    batch(cids);
  }

  /**
   * Check if a blob CID has been fetched for a DID.
   */
  hasBlobCid(did: string, cid: string): boolean {
    const row = this.db
      .prepare(
        "SELECT 1 FROM replication_blobs WHERE did = ? AND cid = ?",
      )
      .get(did, cid);
    return row !== undefined;
  }

  /**
   * Get all tracked blob CIDs for a DID.
   */
  getBlobCids(did: string): string[] {
    const rows = this.db
      .prepare("SELECT cid FROM replication_blobs WHERE did = ?")
      .all(did) as Array<{ cid: string }>;
    return rows.map((r) => r.cid);
  }

  /**
   * Get the count of tracked blobs for a DID.
   */
  getBlobCount(did: string): number {
    const row = this.db
      .prepare(
        "SELECT COUNT(*) as count FROM replication_blobs WHERE did = ?",
      )
      .get(did) as { count: number };
    return row.count;
  }

  // ============================================
  // Peer endpoint tracking (for P2P fallback fetch)
  // ============================================

  /**
   * Upsert a peer endpoint entry for a target DID.
   * On conflict the endpoint, revision and discovery time are all refreshed.
   */
  upsertPeerEndpoint(
    targetDid: string,
    peerDid: string,
    pdsEndpoint: string,
    lastSyncRev: string | null,
  ): void {
    this.db
      .prepare(
        `INSERT INTO peer_endpoints (target_did, peer_did, pds_endpoint, last_sync_rev)
         VALUES (?, ?, ?, ?)
         ON CONFLICT(target_did, peer_did) DO UPDATE SET
           pds_endpoint = excluded.pds_endpoint,
           last_sync_rev = excluded.last_sync_rev,
           discovered_at = datetime('now')`,
      )
      .run(targetDid, peerDid, pdsEndpoint, lastSyncRev);
  }

  /**
   * Get all known peer endpoints for a target DID.
   */
  getPeerEndpoints(
    targetDid: string,
  ): Array<{ peerDid: string; pdsEndpoint: string; lastSyncRev: string | null }> {
    const rows = this.db
      .prepare(
        "SELECT peer_did, pds_endpoint, last_sync_rev FROM peer_endpoints WHERE target_did = ?",
      )
      .all(targetDid) as Array<{
      peer_did: string;
      pds_endpoint: string;
      last_sync_rev: string | null;
    }>;
    return rows.map((r) => ({
      peerDid: r.peer_did,
      pdsEndpoint: r.pds_endpoint,
      lastSyncRev: r.last_sync_rev,
    }));
  }

  /**
   * Clear all peer endpoint entries for a target DID.
   */
  clearPeerEndpoints(targetDid: string): void {
    this.db
      .prepare("DELETE FROM peer_endpoints WHERE target_did = ?")
      .run(targetDid);
  }

  /**
   * Look up the first available multiaddr for a PDS endpoint.
   * Searches up to five replication_state rows matching the given PDS
   * endpoint. Within a row, a multiaddr containing a /p2p/ component
   * (peer ID) is preferred; otherwise the row's first address is used.
   * Rows whose stored JSON is malformed or holds no addresses are skipped.
   *
   * @returns The chosen multiaddr, or null when no row yields one.
   */
  getMultiaddrForPdsEndpoint(pdsEndpoint: string): string | null {
    const rows = this.db
      .prepare(
        `SELECT peer_multiaddrs FROM replication_state
         WHERE pds_endpoint = ? AND peer_multiaddrs IS NOT NULL
         LIMIT 5`,
      )
      .all(pdsEndpoint) as Array<{ peer_multiaddrs: string }>;
    for (const row of rows) {
      const addrs = this.parseMultiaddrs(row.peer_multiaddrs);
      // Prefer multiaddrs that include /p2p/ (have peer ID); fall back to
      // the first addr if none have /p2p/.
      const chosen = addrs.find((a) => a.includes("/p2p/")) ?? addrs[0];
      if (chosen !== undefined) return chosen;
    }
    return null;
  }

  // ============================================
  // Admin-added DID management
  // ============================================

  /**
   * Add a DID to the admin-tracked list (idempotent).
   */
  addAdminDid(did: string): void {
    this.db
      .prepare(
        "INSERT OR IGNORE INTO admin_tracked_dids (did) VALUES (?)",
      )
      .run(did);
  }

  /**
   * Remove a DID from the admin-tracked list.
   * Returns true if the DID was actually removed.
   */
  removeAdminDid(did: string): boolean {
    const result = this.db
      .prepare("DELETE FROM admin_tracked_dids WHERE did = ?")
      .run(did);
    return result.changes > 0;
  }

  /**
   * Get all admin-added DIDs, oldest first.
   */
  getAdminDids(): string[] {
    const rows = this.db
      .prepare("SELECT did FROM admin_tracked_dids ORDER BY added_at")
      .all() as Array<{ did: string }>;
    return rows.map((r) => r.did);
  }

  /**
   * Check if a DID was added via the admin interface.
   */
  isAdminDid(did: string): boolean {
    const row = this.db
      .prepare("SELECT 1 FROM admin_tracked_dids WHERE did = ?")
      .get(did);
    return row !== undefined;
  }

  // ============================================
  // Offered DID management
  // ============================================

  /**
   * Add a DID to the offered list (idempotent).
   * Re-adding with a null endpoint keeps the previously stored endpoint.
   */
  addOfferedDid(did: string, pdsEndpoint: string | null): void {
    this.db
      .prepare(
        `INSERT INTO offered_dids (did, pds_endpoint)
         VALUES (?, ?)
         ON CONFLICT(did) DO UPDATE SET
           pds_endpoint = COALESCE(excluded.pds_endpoint, offered_dids.pds_endpoint)`,
      )
      .run(did, pdsEndpoint);
  }

  /**
   * Get all offered DIDs with their PDS endpoints, oldest offer first.
   */
  getOfferedDids(): Array<{ did: string; pdsEndpoint: string | null; offeredAt: string }> {
    const rows = this.db
      .prepare("SELECT did, pds_endpoint, offered_at FROM offered_dids ORDER BY offered_at")
      .all() as Array<{ did: string; pds_endpoint: string | null; offered_at: string }>;
    return rows.map((r) => ({
      did: r.did,
      pdsEndpoint: r.pds_endpoint,
      offeredAt: r.offered_at,
    }));
  }

  /**
   * Remove a DID from the offered list.
   * Returns true if the DID was actually removed.
   */
  removeOfferedDid(did: string): boolean {
    const result = this.db
      .prepare("DELETE FROM offered_dids WHERE did = ?")
      .run(did);
    return result.changes > 0;
  }

  /**
   * Check if a DID is in the offered list.
   */
  isOfferedDid(did: string): boolean {
    const row = this.db
      .prepare("SELECT 1 FROM offered_dids WHERE did = ?")
      .get(did);
    return row !== undefined;
  }

  // ============================================
  // Incoming offer management
  // ============================================

  /**
   * Add or update an incoming offer (idempotent upsert).
   *
   * On conflict, null endpoints keep the stored value while `min_copies`,
   * `interval_sec` and `priority` are always overwritten (with the defaults
   * 2 / 600 / 50 when omitted) and `received_at` is refreshed.
   */
  addIncomingOffer(offer: {
    offererDid: string;
    subjectDid: string;
    offererPdsEndpoint?: string | null;
    offererEndpoint?: string | null;
    minCopies?: number;
    intervalSec?: number;
    priority?: number;
  }): void {
    this.db
      .prepare(
        `INSERT INTO incoming_offers
           (offerer_did, subject_did, offerer_pds_endpoint, offerer_endpoint,
            min_copies, interval_sec, priority)
         VALUES (?, ?, ?, ?, ?, ?, ?)
         ON CONFLICT(offerer_did, subject_did) DO UPDATE SET
           offerer_pds_endpoint = COALESCE(excluded.offerer_pds_endpoint, incoming_offers.offerer_pds_endpoint),
           offerer_endpoint = COALESCE(excluded.offerer_endpoint, incoming_offers.offerer_endpoint),
           min_copies = excluded.min_copies,
           interval_sec = excluded.interval_sec,
           priority = excluded.priority,
           received_at = datetime('now')`,
      )
      .run(
        offer.offererDid,
        offer.subjectDid,
        offer.offererPdsEndpoint ?? null,
        offer.offererEndpoint ?? null,
        offer.minCopies ?? 2,
        offer.intervalSec ?? 600,
        offer.priority ?? 50,
      );
  }

  /**
   * Get all incoming offers, most recently received first.
   */
  getIncomingOffers(): Array<{
    offererDid: string;
    subjectDid: string;
    offererPdsEndpoint: string | null;
    offererEndpoint: string | null;
    minCopies: number;
    intervalSec: number;
    priority: number;
    receivedAt: string;
  }> {
    const rows = this.db
      .prepare("SELECT * FROM incoming_offers ORDER BY received_at DESC")
      .all() as Array<Record<string, unknown>>;
    return rows.map((r) => ({
      offererDid: r.offerer_did as string,
      subjectDid: r.subject_did as string,
      offererPdsEndpoint: (r.offerer_pds_endpoint as string) ?? null,
      offererEndpoint: (r.offerer_endpoint as string) ?? null,
      minCopies: r.min_copies as number,
      intervalSec: r.interval_sec as number,
      priority: r.priority as number,
      receivedAt: r.received_at as string,
    }));
  }

  /**
   * Remove an incoming offer.
   * Returns true if the offer was actually removed.
   */
  removeIncomingOffer(offererDid: string, subjectDid: string): boolean {
    const result = this.db
      .prepare("DELETE FROM incoming_offers WHERE offerer_did = ? AND subject_did = ?")
      .run(offererDid, subjectDid);
    return result.changes > 0;
  }

  /**
   * Check if an incoming offer exists.
   */
  hasIncomingOffer(offererDid: string, subjectDid: string): boolean {
    const row = this.db
      .prepare("SELECT 1 FROM incoming_offers WHERE offerer_did = ? AND subject_did = ?")
      .get(offererDid, subjectDid);
    return row !== undefined;
  }

  /**
   * Delete sync state for a DID.
   */
  deleteState(did: string): void {
    this.db
      .prepare("DELETE FROM replication_state WHERE did = ?")
      .run(did);
  }

  /**
   * Clear all tracked blobs for a DID.
   */
  clearBlobs(did: string): void {
    this.db
      .prepare("DELETE FROM replication_blobs WHERE did = ?")
      .run(did);
  }

  // ============================================
  // Block/blob GC and orphan detection
  // ============================================

  /**
   * Remove specific block tracking entries for a DID (batch delete).
   */
  removeBlocks(did: string, cids: string[]): void {
    if (cids.length === 0) return;
    const remove = this.db.prepare(
      "DELETE FROM replication_blocks WHERE did = ? AND cid = ?",
    );
    const batch = this.db.transaction((items: string[]) => {
      for (const cid of items) {
        remove.run(did, cid);
      }
    });
    batch(cids);
  }

  /**
   * Find CIDs that have zero remaining references across all DIDs.
   * Given a list of CID strings, returns those with no rows in replication_blocks.
   */
  findOrphanedCids(cids: string[]): string[] {
    if (cids.length === 0) return [];
    const check = this.db.prepare(
      "SELECT 1 FROM replication_blocks WHERE cid = ? LIMIT 1",
    );
    return cids.filter((cid) => !check.get(cid));
  }

  /**
   * Remove specific blob tracking entries for a DID (batch delete).
   */
  removeBlobs(did: string, cids: string[]): void {
    if (cids.length === 0) return;
    const remove = this.db.prepare(
      "DELETE FROM replication_blobs WHERE did = ? AND cid = ?",
    );
    const batch = this.db.transaction((items: string[]) => {
      for (const cid of items) {
        remove.run(did, cid);
      }
    });
    batch(cids);
  }

  /**
   * Find blob CIDs that have zero remaining references across all DIDs.
   * Returns the input CIDs with no rows in replication_blobs.
   */
  findOrphanedBlobCids(cids: string[]): string[] {
    if (cids.length === 0) return [];
    const check = this.db.prepare(
      "SELECT 1 FROM replication_blobs WHERE cid = ? LIMIT 1",
    );
    return cids.filter((cid) => !check.get(cid));
  }

  /**
   * Get all tracked block CIDs for a DID as a Set for efficient diffing.
   */
  getBlockCidSet(did: string): Set<string> {
    return new Set(this.getBlockCids(did));
  }

  /**
   * Mark a DID as needing garbage collection.
   */
  setNeedsGc(did: string): void {
    this.db
      .prepare("UPDATE replication_state SET needs_gc = 1 WHERE did = ?")
      .run(did);
  }

  /**
   * Clear the needs_gc flag for a DID.
   */
  clearNeedsGc(did: string): void {
    this.db
      .prepare("UPDATE replication_state SET needs_gc = 0 WHERE did = ?")
      .run(did);
  }

  /**
   * Get all DIDs that need garbage collection.
   */
  getDidsNeedingGc(): string[] {
    const rows = this.db
      .prepare("SELECT did FROM replication_state WHERE needs_gc = 1")
      .all() as Array<{ did: string }>;
    return rows.map((r) => r.did);
  }

  /**
   * Mark a DID as tombstoned (account deleted/deactivated upstream).
   */
  markTombstoned(did: string): void {
    this.db
      .prepare("UPDATE replication_state SET status = 'tombstoned' WHERE did = ?")
      .run(did);
  }

  /**
   * Purge all tracking data for a DID (blocks, blobs, record paths, sync
   * state and peer endpoints).
   * Returns the CID lists before deletion so the caller can check for orphans.
   *
   * The CID snapshot is taken inside the same transaction as the deletes so
   * the returned lists always match what was actually removed.
   */
  purgeDidData(did: string): { blocksRemoved: string[]; blobsRemoved: string[] } {
    const purge = this.db.transaction(() => {
      const blocksRemoved = this.getBlockCids(did);
      const blobsRemoved = this.getBlobCids(did);
      this.db.prepare("DELETE FROM replication_blocks WHERE did = ?").run(did);
      this.db.prepare("DELETE FROM replication_blobs WHERE did = ?").run(did);
      this.db.prepare("DELETE FROM replication_record_paths WHERE did = ?").run(did);
      this.db.prepare("DELETE FROM replication_state WHERE did = ?").run(did);
      this.db.prepare("DELETE FROM peer_endpoints WHERE target_did = ?").run(did);
      return { blocksRemoved, blobsRemoved };
    });
    return purge();
  }

  // ============================================
  // Sync history tracking
  // ============================================

  /**
   * Start a sync event, returning its ID for later completion.
   * The row is inserted with status `in_progress` and the current time.
   */
  startSyncEvent(did: string, sourceType: string, trigger: SyncTrigger = "unknown"): number {
    const result = this.db
      .prepare(
        `INSERT INTO sync_history (did, source_type, trigger, started_at, status)
         VALUES (?, ?, ?, datetime('now'), 'in_progress')`,
      )
      .run(did, sourceType, trigger);
    return Number(result.lastInsertRowid);
  }

  /**
   * Complete a sync event with final metrics.
   *
   * Omitted counters are stored as 0; omitted `errorMessage`, `durationMs`,
   * `rev` and `rootCid` are stored as NULL.
   *
   * @param id - ID returned by {@link startSyncEvent}.
   */
  completeSyncEvent(
    id: number,
    data: {
      status: "success" | "error";
      errorMessage?: string;
      blocksAdded?: number;
      blobsAdded?: number;
      carBytes?: number;
      blobBytes?: number;
      durationMs?: number;
      rev?: string;
      rootCid?: string;
      incremental?: boolean;
    },
  ): void {
    this.db
      .prepare(
        `UPDATE sync_history
         SET completed_at = datetime('now'),
             status = ?,
             error_message = ?,
             blocks_added = ?,
             blobs_added = ?,
             car_bytes = ?,
             blob_bytes = ?,
             duration_ms = ?,
             rev = ?,
             root_cid = ?,
             incremental = ?
         WHERE id = ?`,
      )
      .run(
        data.status,
        data.errorMessage ?? null,
        data.blocksAdded ?? 0,
        data.blobsAdded ?? 0,
        data.carBytes ?? 0,
        data.blobBytes ?? 0,
        data.durationMs ?? null,
        data.rev ?? null,
        data.rootCid ?? null,
        data.incremental ? 1 : 0,
        id,
      );
  }

  /**
   * Get sync history, optionally filtered by DID, newest first.
   *
   * @param limit - Maximum number of rows to return (default 50).
   */
  getSyncHistory(did?: string, limit: number = 50): SyncHistoryRow[] {
    const query = did
      ? "SELECT * FROM sync_history WHERE did = ? ORDER BY started_at DESC LIMIT ?"
      : "SELECT * FROM sync_history ORDER BY started_at DESC LIMIT ?";
    const params = did ? [did, limit] : [limit];
    const rows = this.db.prepare(query).all(...params) as Array<Record<string, unknown>>;
    return rows.map((r) => this.rowToSyncHistory(r));
  }

  /**
   * Get the count of tracked records for a DID.
   */
  getRecordCount(did: string): number {
    const row = this.db
      .prepare(
        "SELECT COUNT(*) as count FROM replication_record_paths WHERE did = ?",
      )
      .get(did) as { count: number };
    return row.count;
  }

  /**
   * Get aggregate metrics across all replicated DIDs.
   * `recentTransferredBytes` sums CAR and blob bytes of successful syncs
   * started within the last 24 hours.
   */
  getAggregateMetrics(): AggregateMetrics {
    const dids = this.db
      .prepare("SELECT COUNT(DISTINCT did) as count FROM replication_state")
      .get() as { count: number };
    const blocks = this.db
      .prepare("SELECT COUNT(*) as count, COALESCE(SUM(size_bytes), 0) as bytes FROM replication_blocks")
      .get() as { count: number; bytes: number };
    const blobs = this.db
      .prepare("SELECT COUNT(*) as count, COALESCE(SUM(size_bytes), 0) as bytes FROM replication_blobs")
      .get() as { count: number; bytes: number };
    const records = this.db
      .prepare("SELECT COUNT(*) as count FROM replication_record_paths")
      .get() as { count: number };
    const syncs = this.db
      .prepare("SELECT COUNT(*) as count FROM sync_history")
      .get() as { count: number };
    const recentTransfer = this.db
      .prepare(
        `SELECT COALESCE(SUM(car_bytes + blob_bytes), 0) as bytes
         FROM sync_history
         WHERE started_at >= datetime('now', '-24 hours') AND status = 'success'`,
      )
      .get() as { bytes: number };

    return {
      totalDids: dids.count,
      totalBlocks: blocks.count,
      totalBlobs: blobs.count,
      totalRecords: records.count,
      totalBytesHeld: blocks.bytes + blobs.bytes,
      totalSyncs: syncs.count,
      recentTransferredBytes: recentTransfer.bytes,
    };
  }

  /**
   * Get per-DID metrics summary, including the ten most recent sync events.
   */
  getDidMetrics(did: string): DidMetrics {
    const blocks = this.db
      .prepare("SELECT COUNT(*) as count, COALESCE(SUM(size_bytes), 0) as bytes FROM replication_blocks WHERE did = ?")
      .get(did) as { count: number; bytes: number };
    const blobs = this.db
      .prepare("SELECT COUNT(*) as count, COALESCE(SUM(size_bytes), 0) as bytes FROM replication_blobs WHERE did = ?")
      .get(did) as { count: number; bytes: number };
    const records = this.getRecordCount(did);
    const recentSyncs = this.getSyncHistory(did, 10);

    return {
      blocks: blocks.count,
      blobs: blobs.count,
      records,
      bytesHeld: blocks.bytes + blobs.bytes,
      recentSyncs,
    };
  }

  /**
   * Track blocks with their sizes (batch upsert).
   * An already-tracked (did, cid) pair has its size overwritten.
   */
  trackBlocksWithSize(did: string, entries: Array<{ cid: string; sizeBytes: number }>): void {
    if (entries.length === 0) return;
    const insert = this.db.prepare(
      `INSERT INTO replication_blocks (did, cid, size_bytes) VALUES (?, ?, ?)
       ON CONFLICT(did, cid) DO UPDATE SET size_bytes = excluded.size_bytes`,
    );
    const batch = this.db.transaction((items: Array<{ cid: string; sizeBytes: number }>) => {
      for (const entry of items) {
        insert.run(did, entry.cid, entry.sizeBytes);
      }
    });
    batch(entries);
  }

  /**
   * Track blobs with their sizes (batch upsert).
   * An already-tracked (did, cid) pair has its size overwritten.
   */
  trackBlobsWithSize(did: string, entries: Array<{ cid: string; sizeBytes: number }>): void {
    if (entries.length === 0) return;
    const insert = this.db.prepare(
      `INSERT INTO replication_blobs (did, cid, size_bytes) VALUES (?, ?, ?)
       ON CONFLICT(did, cid) DO UPDATE SET size_bytes = excluded.size_bytes`,
    );
    const batch = this.db.transaction((items: Array<{ cid: string; sizeBytes: number }>) => {
      for (const entry of items) {
        insert.run(did, entry.cid, entry.sizeBytes);
      }
    });
    batch(entries);
  }

  /**
   * Get trigger breakdown for a DID's recent syncs.
   * Returns a map of trigger → count for the most recent N sync events.
   *
   * @param limit - Number of most recent events to consider (default 20).
   */
  getTriggerBreakdown(did: string, limit: number = 20): Record<string, number> {
    const rows = this.db
      .prepare(
        `SELECT trigger, COUNT(*) as count FROM (
           SELECT trigger FROM sync_history
           WHERE did = ?
           ORDER BY started_at DESC
           LIMIT ?
         ) GROUP BY trigger`,
      )
      .all(did, limit) as Array<{ trigger: string; count: number }>;
    const result: Record<string, number> = {};
    for (const row of rows) {
      result[row.trigger] = row.count;
    }
    return result;
  }

  /**
   * Look up the root CID for a specific (did, rev) from sync_history.
   * Returns null if no successful sync with that rev is found.
   * When several match, the most recently inserted event wins.
   */
  getRootCidForRev(did: string, rev: string): string | null {
    const row = this.db
      .prepare(
        `SELECT root_cid FROM sync_history
         WHERE did = ? AND rev = ? AND status = 'success' AND root_cid IS NOT NULL
         ORDER BY id DESC
         LIMIT 1`,
      )
      .get(did, rev) as { root_cid: string } | undefined;
    return row?.root_cid ?? null;
  }

  // ============================================
  // Lexicon index
  // ============================================

  /**
   * Upsert NSIDs into the lexicon index.
   * Updates last_seen_at and recomputes record_count/repo_count from
   * replication_record_paths (paths of the form `<nsid>/...`).
   */
  updateLexiconIndex(nsids: string[]): void {
    if (nsids.length === 0) return;
    const now = new Date().toISOString();
    const upsert = this.db.prepare(
      `INSERT INTO lexicon_index (nsid, first_seen_at, last_seen_at, record_count, repo_count)
       VALUES (?, ?, ?, 0, 0)
       ON CONFLICT(nsid) DO UPDATE SET last_seen_at = excluded.last_seen_at`,
    );
    const countRecords = this.db.prepare(
      `SELECT COUNT(*) as cnt FROM replication_record_paths WHERE record_path LIKE ? || '/%'`,
    );
    const countRepos = this.db.prepare(
      `SELECT COUNT(DISTINCT did) as cnt FROM replication_record_paths WHERE record_path LIKE ? || '/%'`,
    );
    const updateCounts = this.db.prepare(
      `UPDATE lexicon_index SET record_count = ?, repo_count = ? WHERE nsid = ?`,
    );
    const batch = this.db.transaction((items: string[]) => {
      for (const nsid of items) {
        upsert.run(nsid, now, now);
        const rc = countRecords.get(nsid) as { cnt: number };
        const rp = countRepos.get(nsid) as { cnt: number };
        updateCounts.run(rc.cnt, rp.cnt, nsid);
      }
    });
    batch(nsids);
  }

  /**
   * Full rebuild of the lexicon index from replication_record_paths.
   *
   * The NSID of a record path is everything before its first `/`; paths
   * without a `/` are ignored. Every rebuilt row gets the current time as
   * both first_seen_at and last_seen_at.
   *
   * The wipe and the re-population run in one transaction, so a failure
   * part-way through leaves the previous index intact rather than empty.
   */
  rebuildLexiconIndex(): void {
    const now = new Date().toISOString();
    const clear = this.db.prepare("DELETE FROM lexicon_index");
    const aggregate = this.db.prepare(
      `SELECT
         SUBSTR(record_path, 1, INSTR(record_path, '/') - 1) as nsid,
         COUNT(*) as record_count,
         COUNT(DISTINCT did) as repo_count
       FROM replication_record_paths
       WHERE INSTR(record_path, '/') > 0
       GROUP BY SUBSTR(record_path, 1, INSTR(record_path, '/') - 1)`,
    );
    const insert = this.db.prepare(
      `INSERT INTO lexicon_index (nsid, first_seen_at, last_seen_at, record_count, repo_count)
       VALUES (?, ?, ?, ?, ?)`,
    );
    const rebuild = this.db.transaction(() => {
      clear.run();
      const rows = aggregate.all() as Array<{
        nsid: string;
        record_count: number;
        repo_count: number;
      }>;
      for (const r of rows) {
        insert.run(r.nsid, now, now, r.record_count, r.repo_count);
      }
    });
    rebuild();
  }

  /**
   * Query the lexicon index with optional NSID prefix filter,
   * ordered by record count (highest first).
   *
   * @param prefix - When given (and non-empty), only NSIDs starting with it
   *   are returned.
   * @param limit - Maximum number of rows to return (default 100).
   */
  getLexiconIndex(prefix?: string, limit: number = 100): Array<{
    nsid: string;
    firstSeenAt: string;
    lastSeenAt: string;
    recordCount: number;
    repoCount: number;
  }> {
    const query = prefix
      ? `SELECT * FROM lexicon_index WHERE nsid LIKE ? || '%' ORDER BY record_count DESC LIMIT ?`
      : `SELECT * FROM lexicon_index ORDER BY record_count DESC LIMIT ?`;
    const params = prefix ? [prefix, limit] : [limit];
    const rows = this.db.prepare(query).all(...params) as Array<{
      nsid: string;
      first_seen_at: string;
      last_seen_at: string;
      record_count: number;
      repo_count: number;
    }>;
    return rows.map((r) => ({
      nsid: r.nsid,
      firstSeenAt: r.first_seen_at,
      lastSeenAt: r.last_seen_at,
      recordCount: r.record_count,
      repoCount: r.repo_count,
    }));
  }

  /**
   * Get aggregate lexicon stats: number of indexed NSIDs and the sum of
   * their record counts.
   */
  getLexiconStats(): { uniqueNsids: number; totalRecords: number } {
    const row = this.db.prepare(
      `SELECT COUNT(*) as unique_nsids, COALESCE(SUM(record_count), 0) as total_records
       FROM lexicon_index`,
    ).get() as { unique_nsids: number; total_records: number };
    return {
      uniqueNsids: row.unique_nsids,
      totalRecords: row.total_records,
    };
  }

  /**
   * Delete all data from all replication tables in a single transaction.
   * Used during full disconnect to wipe the node clean.
   */
  purgeAllData(): void {
    const purge = this.db.transaction(() => {
      this.db.prepare("DELETE FROM replication_blocks").run();
      this.db.prepare("DELETE FROM replication_blobs").run();
      this.db.prepare("DELETE FROM replication_record_paths").run();
      this.db.prepare("DELETE FROM replication_state").run();
      this.db.prepare("DELETE FROM peer_endpoints").run();
      this.db.prepare("DELETE FROM admin_tracked_dids").run();
      this.db.prepare("DELETE FROM offered_dids").run();
      this.db.prepare("DELETE FROM incoming_offers").run();
      this.db.prepare("DELETE FROM sync_history").run();
      this.db.prepare("DELETE FROM firehose_cursor").run();
      this.db.prepare("DELETE FROM plc_mirror").run();
      this.db.prepare("DELETE FROM lexicon_index").run();
    });
    purge();
  }

  /**
   * Add `column` to `table` unless the table already has it.
   *
   * All three arguments are spliced into the SQL text (identifiers cannot be
   * bound as parameters), so they must only ever be literals from this file.
   */
  private ensureColumn(table: string, column: string, definition: string): void {
    const existing = this.db
      .prepare(`PRAGMA table_info(${table})`)
      .all() as Array<{ name: string }>;
    if (!existing.some((c) => c.name === column)) {
      this.db.exec(`ALTER TABLE ${table} ADD COLUMN ${column} ${definition}`);
    }
  }

  /**
   * Decode the JSON text stored in `peer_multiaddrs`.
   *
   * Returns an empty list for anything that is not a string, not valid
   * JSON, or not a JSON array; non-string array entries are dropped.
   */
  private parseMultiaddrs(raw: unknown): string[] {
    if (typeof raw !== "string") return [];
    let parsed: unknown;
    try {
      parsed = JSON.parse(raw);
    } catch {
      // Malformed JSON, default to empty
      return [];
    }
    if (!Array.isArray(parsed)) return [];
    return parsed.filter((a): a is string => typeof a === "string");
  }

  /** Map a raw `sync_history` row to its camelCase representation. */
  private rowToSyncHistory(row: Record<string, unknown>): SyncHistoryRow {
    return {
      id: row.id as number,
      did: row.did as string,
      sourceType: row.source_type as string,
      trigger: (row.trigger as SyncTrigger) ?? "unknown",
      startedAt: row.started_at as string,
      completedAt: (row.completed_at as string) ?? null,
      status: row.status as string,
      errorMessage: (row.error_message as string) ?? null,
      blocksAdded: row.blocks_added as number,
      blobsAdded: row.blobs_added as number,
      carBytes: row.car_bytes as number,
      blobBytes: row.blob_bytes as number,
      durationMs: (row.duration_ms as number) ?? null,
      rev: (row.rev as string) ?? null,
      rootCid: (row.root_cid as string) ?? null,
      incremental: (row.incremental as number) === 1,
    };
  }

  /**
   * Map a raw `replication_state` row to a {@link SyncState}.
   * Missing or undecodable `peer_multiaddrs` yields an empty list.
   */
  private rowToState(row: Record<string, unknown>): SyncState {
    return {
      did: row.did as string,
      pdsEndpoint: row.pds_endpoint as string,
      peerId: (row.peer_id as string) ?? null,
      peerMultiaddrs: this.parseMultiaddrs(row.peer_multiaddrs),
      peerInfoFetchedAt: (row.peer_info_fetched_at as string) ?? null,
      lastSyncRev: (row.last_sync_rev as string) ?? null,
      rootCid: (row.root_cid as string) ?? null,
      lastSyncAt: (row.last_sync_at as string) ?? null,
      lastVerifiedAt: (row.last_verified_at as string) ?? null,
      status: row.status as SyncState["status"],
      errorMessage: (row.error_message as string) ?? null,
      needsGc: (row.needs_gc as number) === 1,
    };
  }
}