/** * Main replication orchestrator. * Publishes manifest records, syncs remote repos to IPFS. * Optionally driven by a PolicyEngine for per-DID intervals, priority, and filtering. */ import type Database from "better-sqlite3"; import type { Config } from "../config.js"; import type { RepoManager } from "../repo-manager.js"; import { extractBlobCids } from "../repo-manager.js"; import { create as createCid, CODEC_RAW, toString as cidToString } from "@atcute/cid"; import type { BlockStore, NetworkService, CommitNotification, IdentityNotification } from "../ipfs.js"; import type { DidResolver } from "../did-resolver.js"; import { readCarWithRoot } from "@atproto/repo"; import { decode as cborDecode } from "../cbor-compat.js"; import type { ReplicatedRepoReader } from "./replicated-repo-reader.js"; import type { PolicyEngine } from "../policy/engine.js"; import { MANIFEST_NSID, didToRkey, type ManifestRecord, type SyncState, type SyncTrigger, type VerificationConfig, type LayeredVerificationResult, DEFAULT_VERIFICATION_CONFIG, } from "./types.js"; import { SyncStorage } from "./sync-storage.js"; import { RepoFetcher } from "./repo-fetcher.js"; import { PeerDiscovery } from "./peer-discovery.js"; import { BlockVerifier, RemoteVerifier } from "./verification.js"; import { extractAllRecordPaths, extractAllCids } from "./mst-proof.js"; import { FirehoseSubscription, type FirehoseCommitEvent, type FirehoseAccountEvent, } from "./firehose-subscription.js"; import { ChallengeScheduler } from "./challenge-response/challenge-scheduler.js"; import { ChallengeStorage, type ChallengeHistoryRow, type PeerReliabilityRow } from "./challenge-response/challenge-storage.js"; import type { ChallengeTransport } from "./challenge-response/transport.js"; import { OfferManager, type RecordWriter } from "./offer-manager.js"; import { fetchRepoFromPeer } from "./libp2p-sync.js"; import type { Libp2p } from "@libp2p/interface"; /** Progress event emitted during sync operations for live UI 
updates. */ export interface SyncProgressEvent { type: string; did: string; sourceType?: string; carBytes?: number; blocksStored?: number; blocksSizeKb?: number; blobsFetched?: number; blobsTotal?: number; blobBytes?: number; durationMs?: number; error?: string; missingBlocks?: number; } /** Extract unique NSIDs from record paths (collection/rkey format). */ function extractNsids(paths: string[]): string[] { const nsids = new Set(); for (const p of paths) { const slash = p.indexOf("/"); if (slash > 0) nsids.add(p.slice(0, slash)); } return [...nsids]; } /** How old cached peer info can be before re-fetching (1 hour). */ const PEER_INFO_TTL_MS = 60 * 60 * 1000; /** Default sync interval when no policy engine is present (5 minutes). */ const DEFAULT_SYNC_INTERVAL_MS = 5 * 60 * 1000; /** Minimum tick interval to prevent excessive polling (10 seconds). */ const MIN_TICK_INTERVAL_MS = 10 * 1000; export class ReplicationManager { private syncStorage: SyncStorage; private repoFetcher: RepoFetcher; private peerDiscovery: PeerDiscovery; private verifier: BlockVerifier; private remoteVerifier: RemoteVerifier; private syncTimer: ReturnType | null = null; private verificationTimer: ReturnType | null = null; private verificationConfig: VerificationConfig; private lastVerificationResults: Map = new Map(); private firehoseSubscription: FirehoseSubscription | null = null; private firehoseCursorSaveTimer: ReturnType | null = null; private challengeStorage: ChallengeStorage; private challengeScheduler: ChallengeScheduler | null = null; private stopped = false; private policyEngine: PolicyEngine | null = null; private offerManager: OfferManager | null = null; /** Per-DID last-sync timestamps (epoch ms) for policy-driven interval tracking. */ private lastSyncTimestamps: Map = new Map(); /** Dedup set for gossipsub notifications, keyed by `${did}:${rev}`. 
*/ private recentNotifications: Set = new Set(); private notificationCleanupTimer: ReturnType | null = null; /** libp2p node for peer-first repo sync. Set via setLibp2p(). */ private libp2p: Libp2p | null = null; /** Cached multiaddrs from last peer record publish, for change detection. */ private lastPublishedAddrs: Set = new Set(); /** Callback to republish peer record when multiaddrs change. */ private publishPeerRecordFn?: () => Promise; /** Sync progress event subscribers for live UI updates via SSE. */ private progressCallbacks: Set<(event: SyncProgressEvent) => void> = new Set(); /** Timer for periodic PLC mirror refresh (6 hours). */ private plcRefreshTimer: ReturnType | null = null; constructor( private db: Database.Database, private config: Config, private repoManager: RepoManager | undefined, private blockStore: BlockStore, private networkService: NetworkService, private didResolver: DidResolver, verificationConfig?: Partial, private replicatedRepoReader?: ReplicatedRepoReader, policyEngine?: PolicyEngine, pdsClient?: RecordWriter, ) { this.syncStorage = new SyncStorage(db); this.syncStorage.initSchema(); this.challengeStorage = new ChallengeStorage(db); this.challengeStorage.initSchema(); this.repoFetcher = new RepoFetcher(didResolver); this.peerDiscovery = new PeerDiscovery(this.repoFetcher); this.verifier = new BlockVerifier(blockStore); this.verificationConfig = { ...DEFAULT_VERIFICATION_CONFIG, ...verificationConfig, }; this.remoteVerifier = new RemoteVerifier( blockStore, this.verificationConfig, ); if (policyEngine) { this.policyEngine = policyEngine; // Prefer remote PDS client for offer records; fall back to local repo const recordWriter: RecordWriter | undefined = pdsClient ?? repoManager; if (recordWriter) { this.offerManager = new OfferManager( recordWriter, this.peerDiscovery, policyEngine, config.DID ?? "", ); } } } /** * Get the PolicyEngine, if one is configured. 
*/ getPolicyEngine(): PolicyEngine | null { return this.policyEngine; } /** * Get the OfferManager, if one is configured (requires PolicyEngine). */ getOfferManager(): OfferManager | null { return this.offerManager; } /** * Late-bind a PdsClient (or any RecordWriter) after construction. * Called when OAuth login completes after the server has already started. * Creates the OfferManager if a PolicyEngine is present but OfferManager * wasn't created at construction time (because no RecordWriter was available). */ setPdsClient(client: RecordWriter, did: string): void { if (this.policyEngine && !this.offerManager) { this.offerManager = new OfferManager( client, this.peerDiscovery, this.policyEngine, did, ); } } /** * Set the libp2p node for peer-first repo sync. * Called from start.ts after IPFS is started. */ setLibp2p(libp2p: Libp2p): void { this.libp2p = libp2p; } /** * Set a callback to republish the peer record when multiaddrs change. * Called from start.ts after IPFS and replication are ready. */ setPublishPeerRecordFn(fn: () => Promise): void { this.publishPeerRecordFn = fn; // Seed the cache with current addrs so the first check doesn't spuriously trigger this.lastPublishedAddrs = new Set(this.networkService.getMultiaddrs()); } /** Subscribe to sync progress events (for SSE streaming). */ onSyncProgress(cb: (event: SyncProgressEvent) => void): void { this.progressCallbacks.add(cb); } /** Unsubscribe from sync progress events. */ offSyncProgress(cb: (event: SyncProgressEvent) => void): void { this.progressCallbacks.delete(cb); } /** Emit a progress event to all subscribers. */ private emitProgress(event: SyncProgressEvent): void { for (const cb of this.progressCallbacks) { try { cb(event); } catch { /* non-fatal */ } } } /** * Check if multiaddrs have changed since last peer record publish. * If changed, republish the peer record and update the cache. 
*/ private async _checkMultiaddrsChanged(): Promise { if (!this.publishPeerRecordFn) return; const current = new Set(this.networkService.getMultiaddrs()); if (current.size === 0) return; // Compare sets if ( current.size === this.lastPublishedAddrs.size && [...current].every((a) => this.lastPublishedAddrs.has(a)) ) { return; } this.lastPublishedAddrs = current; try { await this.publishPeerRecordFn(); console.log("[replication] Multiaddrs changed, republished peer record"); } catch (err) { console.warn( "[replication] Failed to republish peer record:", err instanceof Error ? err.message : String(err), ); } } /** * Initialize replication: create tables, sync manifests. */ async init(): Promise { this.syncStorage.initSchema(); this.challengeStorage.initSchema(); await this.syncManifests(); await this.discoverPeerEndpoints(); await this.runOfferDiscovery(); this.setupGossipsubSubscription(); } /** * Get the list of DIDs to replicate. * * When a PolicyEngine is present, uses getActiveDids() as the single source of truth. * Falls back to legacy three-source merge when no PolicyEngine is configured. */ getReplicateDids(): string[] { if (this.policyEngine) { return this.policyEngine.getActiveDids(); } // Legacy fallback: merge config + admin DIDs (no policy engine) const allDids = new Set(this.config.REPLICATE_DIDS); for (const did of this.syncStorage.getAdminDids()) { allDids.add(did); } return [...allDids]; } /** * Determine the source of a tracked DID by looking up its policy. 
*/ getDidSource(did: string): "config" | "user" | "policy" | null { if (this.policyEngine) { const configPolicy = this.policyEngine.getStoredPolicy(`config:${did}`); if (configPolicy) return "config"; const archivePolicy = this.policyEngine.getStoredPolicy(`archive:${did}`); if (archivePolicy) return "user"; const p2pPolicy = this.policyEngine.getStoredPolicy(`p2p:${did}`); if (p2pPolicy) return "policy"; // Check if any other policy covers this DID if (this.policyEngine.shouldReplicate(did)) return "policy"; } // Legacy fallback if (this.config.REPLICATE_DIDS.includes(did)) return "config"; if (this.syncStorage.isAdminDid(did)) return "user"; return null; } /** * Add a DID via the admin interface. * Creates an archive policy (+ dual-write to admin_tracked_dids for rollback safety), * creates manifest + sync state, subscribes gossipsub, updates firehose. * Returns the status and source if already tracked. */ async addDid(did: string): Promise<{ status: "added" | "already_tracked"; source?: string }> { const existingSource = this.getDidSource(did); if (existingSource) { return { status: "already_tracked", source: existingSource }; } // Check if already tracked via getReplicateDids (covers pattern/all policies) if (this.getReplicateDids().includes(did)) { return { status: "already_tracked", source: "policy" }; } // Create archive policy in PolicyEngine (auto-persists to DB) if (this.policyEngine) { const policyId = `archive:${did}`; if (!this.policyEngine.getPolicies().some((p) => p.id === policyId)) { const { archive } = await import("../policy/presets.js"); this.policyEngine.addPolicy(archive(did)); } } // Dual-write to admin_tracked_dids for rollback safety this.syncStorage.addAdminDid(did); await this.syncManifestForDid(did); this.networkService.subscribeCommitTopics([did]); this.networkService.subscribeIdentityTopics([did]); this.updateFirehoseDids(); // Trigger initial sync in background (fire-and-forget) this.syncDid(did, "manual").catch((err) => { 
console.error(`[replication] Initial sync for admin-added ${did} failed:`, err); }); // Fetch PLC log in background (fire-and-forget) if (did.startsWith("did:plc:")) { this.fetchPlcLog(did).catch((err) => { console.warn(`[replication] PLC log fetch for ${did} failed:`, err instanceof Error ? err.message : String(err)); }); } return { status: "added" }; } /** * Offer to replicate a DID (consent-gated). * Publishes an offer record and stores in offered_dids, but does NOT sync. * Replication begins only when mutual consent is detected during offer discovery. */ async offerDid(did: string): Promise<{ status: "offered" | "already_tracked" | "already_offered"; source?: string }> { // Already fully tracked? const existingSource = this.getDidSource(did); if (existingSource) { return { status: "already_tracked", source: existingSource }; } // Already offered? if (this.syncStorage.isOfferedDid(did)) { return { status: "already_offered" }; } // Require active session to publish offer record if (!this.offerManager) { throw new Error("No active session — log in first to publish offers"); } // Resolve PDS endpoint const pdsEndpoint = await this.repoFetcher.resolvePds(did); // Publish offer record to user's PDS await this.offerManager.publishOffer(did); // Store in offered_dids (does NOT create replication_state or trigger sync) this.syncStorage.addOfferedDid(did, pdsEndpoint ?? null); // Push notification: tell the target node about this offer (fire-and-forget) // Small delay to ensure PDS has committed the record before the peer verifies it setTimeout(() => { this.pushOfferNotification(did).catch((err) => { console.warn( `[replication] Failed to push offer notification to ${did}:`, err instanceof Error ? err.message : String(err), ); }); }, 2000); return { status: "offered" }; } /** * Remove an offered DID: revoke the offer, notify the peer, and remove from offered_dids. 
*/ async removeOfferedDid(did: string): Promise<{ status: "removed" | "not_found" }> { if (!this.syncStorage.isOfferedDid(did)) { return { status: "not_found" }; } // Revoke offer record if OfferManager is available if (this.offerManager) { await this.offerManager.revokeOffer(did); } this.syncStorage.removeOfferedDid(did); // Notify peer that the offer was revoked (fire-and-forget) this.pushRevokeNotification(did).catch((err) => { console.warn(`[replication] Failed to push revoke notification to ${did}:`, err instanceof Error ? err.message : String(err)); }); return { status: "removed" }; } /** * Revoke consent and remove a DID that was promoted to active replication via mutual offer. * Revokes the offer record, purges all data, and notifies the peer. */ async revokeReplication(did: string): Promise<{ status: "revoked" | "not_found" | "error"; error?: string }> { // Must be an actively tracked DID const source = this.getDidSource(did); const hasState = this.syncStorage.getAllStates().some((s) => s.did === did); if (!source && !hasState) { return { status: "not_found" }; } // Revoke offer record on PDS if (this.offerManager) { try { await this.offerManager.revokeOffer(did); } catch { // Non-fatal: offer record may not exist } } // Transition P2P policy to terminated if (this.policyEngine) { const policyId = `p2p:${did}`; this.policyEngine.transitionPolicy(policyId, "terminated"); } // Remove from offered_dids if present if (this.syncStorage.isOfferedDid(did)) { this.syncStorage.removeOfferedDid(did); } // Notify peer BEFORE purging (purge clears peer endpoints needed for notification) try { await this.pushRevokeNotification(did); } catch (err) { console.warn(`[replication] Failed to push revoke notification to ${did}:`, err instanceof Error ? 
err.message : String(err)); } // Remove from replication with full data purge const result = await this.removeDid(did, true); if (result.status === "error") { return { status: "error", error: result.error }; } return { status: "revoked" }; } /** * Handle a revocation from a remote peer: stop replicating their DID and purge data. */ async handleRemoteRevocation(revokerDid: string): Promise { // Remove from incoming offers if present const incomingOffers = this.syncStorage.getIncomingOffers(); for (const offer of incomingOffers) { if (offer.offererDid === revokerDid) { this.syncStorage.removeIncomingOffer(revokerDid, offer.subjectDid); } } // Remove from offered_dids if present if (this.syncStorage.isOfferedDid(revokerDid)) { if (this.offerManager) { try { await this.offerManager.revokeOffer(revokerDid); } catch { /* non-fatal */ } } this.syncStorage.removeOfferedDid(revokerDid); } // Transition P2P policy to terminated if (this.policyEngine) { const policyId = `p2p:${revokerDid}`; if (this.policyEngine.transitionPolicy(policyId, "terminated")) { console.log(`[replication] Terminated P2P policy ${policyId}`); } } // Remove from active replication with data purge const isTracked = this.getDidSource(revokerDid) !== null || this.syncStorage.getAllStates().some((s) => s.did === revokerDid); if (isTracked) { console.log(`[replication] Remote revocation from ${revokerDid} — removing DID and purging data`); await this.removeDid(revokerDid, true); } } /** * Push an offer notification to the target node's p2pds endpoint. * Resolves the target's org.p2pds.peer record to find their endpoint URL, * then POSTs to their notifyOffer XRPC method. 
*/ private async pushOfferNotification(targetDid: string): Promise { const peerInfo = await this.peerDiscovery.discoverPeer(targetDid); if (!peerInfo?.endpoint) { console.log(`[replication] No p2pds endpoint found for ${targetDid}, skipping push notification`); return; } const url = `${peerInfo.endpoint}/xrpc/org.p2pds.replication.notifyOffer`; const body = { offererDid: this.config.DID, subjectDid: targetDid, offererPdsEndpoint: await this.repoFetcher.resolvePds(this.config.DID ?? ""), params: { minCopies: 2, intervalSec: 600, priority: 50, }, }; const res = await fetch(url, { method: "POST", headers: { "Content-Type": "application/json" }, body: JSON.stringify(body), }); if (!res.ok) { const text = await res.text().catch(() => ""); console.warn(`[replication] Push notification to ${targetDid} failed (${res.status}): ${text}`); } else { console.log(`[replication] Push notification sent to ${targetDid} at ${peerInfo.endpoint}`); } } /** * Push a revocation notification to the target node's p2pds endpoint. */ private async pushRevokeNotification(targetDid: string): Promise { const peerInfo = await this.peerDiscovery.discoverPeer(targetDid); if (!peerInfo?.endpoint) { console.log(`[replication] No p2pds endpoint found for ${targetDid}, skipping revoke notification`); return; } const url = `${peerInfo.endpoint}/xrpc/org.p2pds.replication.notifyRevoke`; const body = { revokerDid: this.config.DID, subjectDid: targetDid, }; const res = await fetch(url, { method: "POST", headers: { "Content-Type": "application/json" }, body: JSON.stringify(body), }); if (!res.ok) { const text = await res.text().catch(() => ""); console.warn(`[replication] Revoke notification to ${targetDid} failed (${res.status}): ${text}`); } else { console.log(`[replication] Revoke notification sent to ${targetDid} at ${peerInfo.endpoint}`); } } /** * Accept an incoming offer: create a reciprocal offer and remove from incoming_offers. 
*/ async acceptOffer(offererDid: string): Promise<{ status: "accepted" | "not_found" | "error"; error?: string }> { // Find the incoming offer to get the subject DID const offers = this.syncStorage.getIncomingOffers(); const offer = offers.find((o) => o.offererDid === offererDid); if (!offer) { return { status: "not_found" }; } // Create reciprocal offer (which also push-notifies back) try { await this.offerDid(offererDid); } catch (err) { return { status: "error", error: `Failed to create reciprocal offer: ${err instanceof Error ? err.message : String(err)}`, }; } // Remove from incoming_offers this.syncStorage.removeIncomingOffer(offererDid, offer.subjectDid); // Run offer discovery immediately to detect mutual agreement and start replication this.triggerOfferDiscovery(); return { status: "accepted" }; } /** * Reject an incoming offer: remove from incoming_offers without creating a reciprocal offer. */ rejectOffer(offererDid: string): { status: "rejected" | "not_found" } { const offers = this.syncStorage.getIncomingOffers(); const offer = offers.find((o) => o.offererDid === offererDid); if (!offer) { return { status: "not_found" }; } this.syncStorage.removeIncomingOffer(offererDid, offer.subjectDid); return { status: "rejected" }; } /** * Trigger offer discovery asynchronously (fire-and-forget). * Used after accepting an offer or receiving a push notification * to immediately detect mutual agreements. */ triggerOfferDiscovery(): void { this.runOfferDiscovery().catch((err) => { console.warn("[replication] Triggered offer discovery failed:", err instanceof Error ? err.message : String(err)); }); } /** * Get all incoming offers (from other nodes wanting to replicate our data). 
*/ getIncomingOffers(): Array<{ offererDid: string; subjectDid: string; offererPdsEndpoint: string | null; offererEndpoint: string | null; minCopies: number; intervalSec: number; priority: number; receivedAt: string; }> { return this.syncStorage.getIncomingOffers(); } /** * Get the PeerDiscovery instance (for verifying incoming offers). */ getPeerDiscovery(): PeerDiscovery { return this.peerDiscovery; } /** * Get the RepoFetcher instance (for resolving PDS endpoints). */ getRepoFetcher(): RepoFetcher { return this.repoFetcher; } /** * Remove an admin-added DID. * Returns error info if the DID cannot be removed (config/policy origin). * If purgeData is true, deletes all associated data. Otherwise pauses. */ async removeDid(did: string, purgeData: boolean = false): Promise<{ status: "removed" | "error"; purged: boolean; error?: string; }> { // Check if it's a config-origin DID (not removable via UI) const configPolicyId = `config:${did}`; if (this.policyEngine?.getStoredPolicy(configPolicyId) || this.config.REPLICATE_DIDS.includes(did)) { return { status: "error", purged: false, error: "Cannot remove config-origin DID. Remove from REPLICATE_DIDS env var." 
}; } // Transition archive/p2p policies to purged state if (this.policyEngine) { const archivePolicyId = `archive:${did}`; this.policyEngine.transitionPolicy(archivePolicyId, "purged"); const p2pPolicyId = `p2p:${did}`; this.policyEngine.transitionPolicy(p2pPolicyId, "purged"); } // Dual-write: also remove from legacy table this.syncStorage.removeAdminDid(did); this.networkService.unsubscribeCommitTopics([did]); this.networkService.unsubscribeIdentityTopics([did]); this.updateFirehoseDids(); if (purgeData) { this.syncStorage.clearBlocks(did); this.syncStorage.clearBlobs(did); this.syncStorage.clearRecordPaths(did); this.syncStorage.clearPeerEndpoints(did); this.syncStorage.deleteState(did); // Remove manifest record if (this.repoManager) { const rkey = didToRkey(did); try { await this.repoManager.deleteRecord(MANIFEST_NSID, rkey); } catch { // Non-fatal: manifest may not exist } } } else { // Pause: update manifest status but keep data if (this.repoManager) { const rkey = didToRkey(did); const existing = await this.repoManager.getRecord(MANIFEST_NSID, rkey); if (existing) { const manifest: ManifestRecord = { ...(existing.record as ManifestRecord), status: "paused", }; await this.repoManager.putRecord(MANIFEST_NSID, rkey, manifest); } } } return { status: "removed", purged: purgeData }; } /** * Update firehose subscription with the current set of tracked DIDs. */ private updateFirehoseDids(): void { if (this.firehoseSubscription) { this.firehoseSubscription.updateDids(new Set(this.getReplicateDids())); } } /** * Ensure a manifest record and sync state exist for a single DID. * Extracted from syncManifests() for use by addDid(). 
*/ private async syncManifestForDid(did: string): Promise { if (this.repoManager) { const rkey = didToRkey(did); const existing = await this.repoManager.getRecord(MANIFEST_NSID, rkey); if (!existing) { const manifest: ManifestRecord = { $type: MANIFEST_NSID, subject: did, status: "active", lastSyncRev: null, lastSyncAt: null, createdAt: new Date().toISOString(), }; await this.repoManager.putRecord(MANIFEST_NSID, rkey, manifest); } } const pdsEndpoint = await this.repoFetcher.resolvePds(did); if (pdsEndpoint) { this.syncStorage.upsertState({ did, pdsEndpoint }); } } /** * Ensure a manifest record exists for each configured DID. * Creates new manifests or updates existing ones. */ async syncManifests(): Promise { for (const did of this.getReplicateDids()) { await this.syncManifestForDid(did); } } /** * Run offer discovery: gather peers from sync state AND offered_dids, * then discover mutual agreements. * When a mutual agreement is detected for an offered DID, * promotes it to full replication via addDid() and removes from offered_dids. * Non-fatal: errors are logged but don't block sync. */ private async runOfferDiscovery(): Promise { if (!this.offerManager) return; try { // Existing: peers from replication_state const states = this.syncStorage.getAllStates(); const syncPeers = states .filter((s) => s.pdsEndpoint) .map((s) => ({ did: s.did, pdsEndpoint: s.pdsEndpoint })); // New: peers from offered_dids const offeredDids = this.syncStorage.getOfferedDids(); const offeredPeers = offeredDids .filter((o) => o.pdsEndpoint) .map((o) => ({ did: o.did, pdsEndpoint: o.pdsEndpoint! 
})); // Merge and deduplicate by DID const seenDids = new Set(); const allPeers: Array<{ did: string; pdsEndpoint: string }> = []; for (const peer of [...syncPeers, ...offeredPeers]) { if (!seenDids.has(peer.did)) { seenDids.add(peer.did); allPeers.push(peer); } } if (allPeers.length > 0) { const agreements = await this.offerManager.discoverAgreements(allPeers); // Check if any offered DIDs now have mutual agreements for (const offered of offeredDids) { const hasAgreement = agreements.some( (a) => a.localOffer.subject === offered.did || a.remoteOffer.subject === offered.did, ); if (hasAgreement) { console.log(`[replication] Mutual agreement detected for offered DID ${offered.did} — promoting to replication`); try { await this.addDid(offered.did); this.syncStorage.removeOfferedDid(offered.did); } catch (err) { console.error( `[replication] Failed to promote offered DID ${offered.did}:`, err instanceof Error ? err.message : String(err), ); } } } // Detect broken agreements: P2P-origin DIDs whose remote offer was revoked const activePeerDids = new Set(agreements.map((a) => a.counterpartyDid)); for (const state of states) { // Only check P2P-origin DIDs (those that were promoted from offers) const policyId = `p2p:${state.did}`; const hasP2pPolicy = this.policyEngine?.getPolicies().some((p) => p.id === policyId); if (hasP2pPolicy && !activePeerDids.has(state.did)) { console.log(`[replication] Agreement broken for ${state.did} (remote offer revoked) — removing and purging data`); try { await this.removeDid(state.did, true); } catch (err) { console.error(`[replication] Failed to remove broken-agreement DID ${state.did}:`, err instanceof Error ? err.message : String(err)); } } } // Still run full discoverAndSync for sync-state peers await this.offerManager.discoverAndSync(syncPeers); } } catch (err) { console.error( "[replication] Offer discovery error:", err instanceof Error ? 
err.message : String(err), ); } } /** * Discover peer endpoints by scanning manifests of known peers. * For each peer whose manifest includes a DID we're tracking, record * that peer as a potential fallback source for that DID. */ private async discoverPeerEndpoints(): Promise { const trackedDids = new Set(this.getReplicateDids()); const states = this.syncStorage.getAllStates(); for (const state of states) { if (!state.pdsEndpoint) continue; try { const manifests = await this.peerDiscovery.discoverManifests( state.did, state.pdsEndpoint, ); for (const manifest of manifests) { if ( manifest.subject && trackedDids.has(manifest.subject) && manifest.status === "active" ) { this.syncStorage.upsertPeerEndpoint( manifest.subject, state.did, state.pdsEndpoint, manifest.lastSyncRev ?? null, ); } } } catch { // Non-fatal: peer endpoint discovery is best-effort } } } /** * Fetch a repo from known peer endpoints when the source PDS is unavailable. * Tries peers in order of freshest data (highest lastSyncRev). * Throws the original error if no peers succeed. */ private async fetchFromPeersOrThrow( did: string, since: string | undefined, originalError: Error, ): Promise { const peers = this.syncStorage.getPeerEndpoints(did); if (peers.length === 0) throw originalError; // Sort by lastSyncRev descending (prefer freshest data) peers.sort((a, b) => { if (!a.lastSyncRev && !b.lastSyncRev) return 0; if (!a.lastSyncRev) return 1; if (!b.lastSyncRev) return -1; return b.lastSyncRev.localeCompare(a.lastSyncRev); }); for (const peer of peers) { try { return await this.repoFetcher.fetchRepo( peer.pdsEndpoint, did, since, ); } catch { // Try next peer } } throw originalError; } /** * Sync all configured DIDs. 
* * When a PolicyEngine is present: * - Merges config DIDs with policy explicit DIDs * - Filters out DIDs where shouldReplicate is false * - Sorts by priority (highest first) * - Respects per-DID sync intervals (skips DIDs not yet due) */ async syncAll(): Promise { const dids = this.getReplicateDids(); // Sort by priority (highest first) when policy engine is present const sortedDids = this.policyEngine ? this.sortDidsByPriority(dids) : dids; this.emitProgress({ type: "sync-cycle:start", did: "*" }); for (const did of sortedDids) { if (this.stopped) break; // Skip tombstoned DIDs (handled separately by cleanupTombstonedDids) const currentState = this.syncStorage.getState(did); if (currentState?.status === "tombstoned") continue; // Check per-DID interval when policy engine is present if (this.policyEngine && !this.isDidDueForSync(did)) { continue; } try { await this.syncDid(did, "periodic"); // Record successful sync timestamp this.lastSyncTimestamps.set(did, Date.now()); } catch (err) { const message = err instanceof Error ? err.message : String(err); this.syncStorage.updateStatus(did, "error", message); } } // Run deferred GC for DIDs flagged by firehose delete/update ops await this.runDeferredGc(); // Clean up tombstoned DIDs after grace period await this.cleanupTombstonedDids(); // Discover peer endpoints for P2P fallback await this.discoverPeerEndpoints(); // Re-run offer discovery to pick up new/revoked offers await this.runOfferDiscovery(); // Check if multiaddrs changed and republish peer record if needed await this._checkMultiaddrsChanged(); this.emitProgress({ type: "sync-cycle:complete", did: "*" }); } /** * Sort DIDs by their effective policy priority (highest first). * DIDs without a matching policy get priority 0. 
 */
/** Sort DIDs by policy priority, highest priority first (no-op without a policy engine). */
private sortDidsByPriority(dids: string[]): string[] {
  if (!this.policyEngine) return dids;
  // Copy before sorting so the caller's array is not mutated.
  return [...dids].sort((a, b) => {
    const pa = this.policyEngine!.evaluate(a).priority;
    const pb = this.policyEngine!.evaluate(b).priority;
    return pb - pa; // descending
  });
}
/**
 * Check whether a DID is due for sync based on its policy interval.
 * DIDs that have never been synced are always due.
 */
private isDidDueForSync(did: string): boolean {
  if (!this.policyEngine) return true;
  const lastSync = this.lastSyncTimestamps.get(did);
  if (lastSync === undefined) return true; // never synced
  const config = this.policyEngine.getReplicationConfig(did);
  // Config DIDs without a matching policy use the default interval
  const intervalMs = config ? config.sync.intervalSec * 1000 : DEFAULT_SYNC_INTERVAL_MS;
  return Date.now() - lastSync >= intervalMs;
}
/**
 * Get the effective sync interval for a DID in milliseconds.
 * Returns the policy-driven interval if a policy engine is present,
 * otherwise returns the default 5-minute interval.
 */
getEffectiveSyncIntervalMs(did: string): number {
  if (!this.policyEngine) return DEFAULT_SYNC_INTERVAL_MS;
  const config = this.policyEngine.getReplicationConfig(did);
  return config ? config.sync.intervalSec * 1000 : DEFAULT_SYNC_INTERVAL_MS;
}
/**
 * Sync a single DID: fetch repo, store blocks in IPFS, verify, update state.
 *
 * Order of operations matters: status goes to "syncing" first; the sync
 * event row is only opened once a CAR has been obtained (so fetch failures
 * do not leave a dangling event); state/manifest updates happen only after
 * blocks are stored and verified.
 *
 * @param did      The DID whose repo should be synced.
 * @param trigger  What initiated this sync (periodic, firehose, gossipsub, …),
 *                 recorded on the sync event for diagnostics.
 * @throws Re-throws any error that occurs after the sync event is opened,
 *         after recording it on the event and emitting a "sync:error" progress.
 */
async syncDid(did: string, trigger: SyncTrigger = "unknown"): Promise {
  this.syncStorage.updateStatus(did, "syncing");
  const syncStart = Date.now();
  let sourceType = "pds";
  // 1. Resolve PDS endpoint
  let state = this.syncStorage.getState(did);
  let pdsEndpoint = await this.repoFetcher.resolvePds(did);
  if (!pdsEndpoint) {
    // If previously had a PDS endpoint, this may be a tombstoned/deleted account
    if (state?.pdsEndpoint) {
      this.syncStorage.markTombstoned(did);
      console.warn(`[replication] DID ${did} no longer resolvable — marked as tombstoned`);
    } else {
      this.syncStorage.updateStatus(did, "error", "Could not resolve PDS endpoint");
    }
    return;
  }
  // Update PDS endpoint if it changed
  this.syncStorage.upsertState({ did, pdsEndpoint });
  state = this.syncStorage.getState(did);
  // 2. Optionally refresh peer info (peerId + multiaddrs)
  if (state && this.shouldRefreshPeerInfo(state)) {
    try {
      const peerInfo = await this.peerDiscovery.discoverPeer(did);
      if (peerInfo) {
        // Detect PeerID changes (peer restarted with new identity)
        if (state.peerId && peerInfo.peerId && state.peerId !== peerInfo.peerId) {
          console.warn(
            `[replication] PeerID changed for ${did}: ${state.peerId} → ${peerInfo.peerId}`,
          );
        }
        this.syncStorage.updatePeerInfo(did, peerInfo.peerId, peerInfo.multiaddrs);
        // Merge observed multiaddrs from active libp2p connections
        if (peerInfo.peerId) {
          const observed = this.networkService.getRemoteAddrs(peerInfo.peerId);
          if (observed.length > 0) {
            const merged = [...new Set([...peerInfo.multiaddrs, ...observed])];
            if (merged.length > peerInfo.multiaddrs.length) {
              this.syncStorage.updatePeerInfo(did, peerInfo.peerId, merged);
            }
          }
        }
      }
    } catch {
      // Non-fatal: peer discovery is optional
    }
  }
  // 3. Fetch repo (with incremental sync if we have a previous rev)
  const since = state?.lastSyncRev ?? undefined;
  // 3a. Try libp2p peer-first sync if we have peer info
  let carBytes: Uint8Array | null = null;
  state = this.syncStorage.getState(did); // re-read for fresh peer info
  if (this.libp2p && state?.peerId && state.peerMultiaddrs && state.peerMultiaddrs.length > 0) {
    try {
      console.log(`[sync] ${did} — fetching repo from peer ${state.peerId.slice(0, 12)}… via libp2p`);
      carBytes = await fetchRepoFromPeer(
        this.libp2p,
        state.peerMultiaddrs,
        did,
        since,
      );
      sourceType = "libp2p";
      console.log(`[sync] ${did} — received ${(carBytes.length / 1024).toFixed(1)} KB from peer via libp2p`);
    } catch (err) {
      console.log(
        `[sync] ${did} — libp2p sync failed, falling back to HTTP: ${err instanceof Error ? err.message : String(err)}`,
      );
      carBytes = null;
    }
  }
  // 3b. Fall back to HTTP PDS fetch
  if (!carBytes) {
    sourceType = "pds";
    console.log(`[sync] ${did} — fetching repo from ${pdsEndpoint}${since ? ` (since: ${since.slice(0, 8)}…)` : ""}`);
    try {
      carBytes = await this.repoFetcher.fetchRepo(
        pdsEndpoint,
        did,
        since,
      );
    } catch (sourceErr) {
      // On failure, clear cached peer info and trigger re-discovery
      this.syncStorage.clearPeerInfo(did);
      this.peerDiscovery.discoverPeer(did).then((peerInfo) => {
        if (peerInfo) {
          this.syncStorage.updatePeerInfo(did, peerInfo.peerId, peerInfo.multiaddrs);
        }
      }).catch(() => {});
      const err = sourceErr instanceof Error ? sourceErr : new Error(String(sourceErr));
      if (since) {
        // Retry full sync from source, then fall back to peers
        try {
          carBytes = await this.repoFetcher.fetchRepo(pdsEndpoint, did);
        } catch {
          sourceType = "peer_fallback";
          carBytes = await this.fetchFromPeersOrThrow(did, since, err);
        }
      } else {
        sourceType = "peer_fallback";
        carBytes = await this.fetchFromPeersOrThrow(did, undefined, err);
      }
    }
  }
  // Start sync event after source is determined
  this.emitProgress({ type: "sync:start", did, sourceType });
  const syncEventId = this.syncStorage.startSyncEvent(did, sourceType, trigger);
  try {
    // 4. Parse CAR and store blocks
    const carSizeKb = (carBytes.length / 1024).toFixed(1);
    console.log(`[sync] ${did} — CAR received: ${carSizeKb} KB, parsing...`);
    this.emitProgress({ type: "sync:car-received", did, carBytes: carBytes.length });
    const { root, blocks } = await readCarWithRoot(carBytes);
    await this.blockStore.putBlocks(blocks);
    // 5. Collect CID strings + sizes for DHT announcement + verification
    // NOTE(review): reaches into BlockMap's internal `map` property — assumes
    // the @atproto/repo BlockMap keeps a Map keyed by CID string; confirm on upgrade.
    const cidStrs: string[] = [];
    const blockEntries: Array<{ cid: string; sizeBytes: number }> = [];
    const internalMap = (
      blocks as unknown as { map: Map }
    ).map;
    if (internalMap) {
      for (const [cidStr, blockBytes] of internalMap.entries()) {
        cidStrs.push(cidStr);
        blockEntries.push({ cid: cidStr, sizeBytes: blockBytes.length });
      }
    }
    console.log(`[sync] ${did} — stored ${blockEntries.length} blocks, verifying...`);
    // 5b. Track block CIDs with sizes for remote verification
    if (blockEntries.length > 0) {
      this.syncStorage.trackBlocksWithSize(did, blockEntries);
    }
    const totalBlockSizeKb = blockEntries.reduce((s, b) => s + b.sizeBytes, 0) / 1024;
    this.emitProgress({ type: "sync:blocks-stored", did, blocksStored: blockEntries.length, blocksSizeKb: Math.round(totalBlockSizeKb) });
    // 6. Announce to DHT (fire-and-forget)
    this.networkService.provideBlocks(cidStrs).catch(() => {});
    // 7. Verify local block availability
    const verification = await this.verifier.verifyBlockAvailability(cidStrs);
    if (verification.missing.length > 0) {
      console.warn(
        `Local verification: ${verification.missing.length}/${verification.checked} blocks missing for ${did}`,
      );
    }
    this.syncStorage.updateVerifiedAt(did);
    this.emitProgress({ type: "sync:verified", did, missingBlocks: verification.missing.length });
    // 8. Extract actual rev from the commit block
    const rootCidStr = root.toString();
    let rev = rootCidStr; // fallback
    const commitBytes = internalMap?.get(rootCidStr);
    if (commitBytes) {
      try {
        const commitObj = cborDecode(commitBytes) as Record;
        if (typeof commitObj.rev === "string") {
          rev = commitObj.rev;
        }
      } catch {
        // If CBOR decode fails, fall back to root CID as rev
      }
    }
    // 9. Update sync state with both rev and root CID
    this.syncStorage.updateSyncProgress(did, rev, rootCidStr);
    // 9a. Publish gossipsub notification (fire-and-forget)
    this.networkService.publishCommitNotification(did, rootCidStr, rev).catch(() => {});
    // 9b. Invalidate cached ReadableRepo so it reloads with new root
    this.replicatedRepoReader?.invalidateCache(did);
    // 9c. Track record paths for challenge generation (full MST walk)
    try {
      const recordPaths = await extractAllRecordPaths(
        this.blockStore,
        rootCidStr,
      );
      this.syncStorage.clearRecordPaths(did);
      this.syncStorage.trackRecordPaths(did, recordPaths);
      // Update lexicon index with NSIDs from these paths
      const nsids = extractNsids(recordPaths);
      if (nsids.length > 0) {
        this.syncStorage.updateLexiconIndex(nsids);
      }
    } catch {
      // Non-fatal: path extraction is best-effort
    }
    // 9d. Reconcile blocks: remove orphaned blocks from this DID's tracking
    try {
      const liveCids = await extractAllCids(this.blockStore, rootCidStr);
      const trackedCids = this.syncStorage.getBlockCidSet(did);
      // Find CIDs that are tracked but no longer live
      const orphanedFromDid: string[] = [];
      for (const cid of trackedCids) {
        if (!liveCids.has(cid)) {
          orphanedFromDid.push(cid);
        }
      }
      if (orphanedFromDid.length > 0) {
        // Remove from this DID's tracking
        this.syncStorage.removeBlocks(did, orphanedFromDid);
        // Find which are truly orphaned (no other DID references them)
        const trulyOrphaned = this.syncStorage.findOrphanedCids(orphanedFromDid);
        // Delete from blockstore
        for (const cid of trulyOrphaned) {
          await this.blockStore.deleteBlock(cid);
        }
      }
      // Clear the needs_gc flag since we just did a full reconciliation
      this.syncStorage.clearNeedsGc(did);
    } catch {
      // Non-fatal: GC is best-effort
    }
    // 9e. Reconcile blobs: remove blobs for deleted records
    try {
      const currentBlobCids = new Set();
      if (this.replicatedRepoReader) {
        const repo = await this.replicatedRepoReader.getRepo(did);
        if (repo) {
          for await (const entry of repo.walkRecords()) {
            const cids = extractBlobCids(entry.record);
            for (const cid of cids) {
              currentBlobCids.add(cid);
            }
          }
        }
      }
      const trackedBlobCids = this.syncStorage.getBlobCids(did);
      const orphanedBlobs = trackedBlobCids.filter(cid => !currentBlobCids.has(cid));
      if (orphanedBlobs.length > 0) {
        this.syncStorage.removeBlobs(did, orphanedBlobs);
        const trulyOrphanedBlobs = this.syncStorage.findOrphanedBlobCids(orphanedBlobs);
        for (const cid of trulyOrphanedBlobs) {
          await this.blockStore.deleteBlock(cid);
        }
      }
    } catch {
      // Non-fatal: blob GC is best-effort
    }
    // 10. Update manifest record
    if (this.repoManager) {
      const rkey = didToRkey(did);
      const existingManifest = await this.repoManager.getRecord(
        MANIFEST_NSID,
        rkey,
      );
      if (existingManifest) {
        const manifest: ManifestRecord = {
          $type: MANIFEST_NSID,
          subject: did,
          status: "active",
          lastSyncRev: rev,
          lastSyncAt: new Date().toISOString(),
          // Preserve the original creation timestamp when present
          createdAt: (existingManifest.record as Record)
            ?.createdAt as string ?? new Date().toISOString(),
        };
        await this.repoManager.putRecord(MANIFEST_NSID, rkey, manifest);
      }
    }
    // 11. Sync blobs and capture result
    console.log(`[sync] ${did} — syncing blobs...`);
    const blobResult = await this.syncBlobs(did);
    // 12. Complete sync event with metrics
    const totalDuration = ((Date.now() - syncStart) / 1000).toFixed(1);
    const totalBlobKb = (blobResult.totalBytes / 1024).toFixed(1);
    console.log(
      `[sync] ${did} — complete: ${blockEntries.length} blocks, ${blobResult.fetched} blobs (${totalBlobKb} KB), ${totalDuration}s`,
    );
    this.syncStorage.completeSyncEvent(syncEventId, {
      status: "success",
      blocksAdded: blockEntries.length,
      blobsAdded: blobResult.fetched,
      carBytes: carBytes.length,
      blobBytes: blobResult.totalBytes,
      durationMs: Date.now() - syncStart,
      rev,
      rootCid: rootCidStr,
      incremental: !!since,
    });
    this.emitProgress({
      type: "sync:complete",
      did,
      blocksStored: blockEntries.length,
      blobsFetched: blobResult.fetched,
      blobBytes: blobResult.totalBytes,
      durationMs: Date.now() - syncStart,
    });
  } catch (err) {
    this.syncStorage.completeSyncEvent(syncEventId, {
      status: "error",
      errorMessage: err instanceof Error ? err.message : String(err),
      durationMs: Date.now() - syncStart,
      incremental: !!since,
    });
    this.emitProgress({
      type: "sync:error",
      did,
      error: err instanceof Error ? err.message : String(err),
    });
    throw err;
  }
}
/** Grace period before purging tombstoned DID data (24 hours). */
private static readonly TOMBSTONE_GRACE_MS = 24 * 60 * 60 * 1000;
/**
 * Clean up tombstoned DIDs: re-verify resolution, purge if still dead.
 * Only purges after a grace period (24 hours) to avoid premature deletion
 * from transient DID resolution failures.
 */
private async cleanupTombstonedDids(): Promise {
  const states = this.syncStorage.getAllStates();
  const tombstoned = states.filter((s) => s.status === "tombstoned");
  for (const state of tombstoned) {
    if (this.stopped) break; // bail out promptly on shutdown
    // Check grace period: only purge if tombstoned for > 24 hours
    const lastSyncAt = state.lastSyncAt ? new Date(state.lastSyncAt).getTime() : 0;
    const timeSinceLastSync = Date.now() - lastSyncAt;
    if (timeSinceLastSync < ReplicationManager.TOMBSTONE_GRACE_MS) {
      continue;
    }
    // Re-verify: try to resolve the DID again
    try {
      const pdsEndpoint = await this.repoFetcher.resolvePds(state.did);
      if (pdsEndpoint) {
        // DID is alive again! Un-tombstone and resume.
        this.syncStorage.updateStatus(state.did, "pending");
        this.syncStorage.upsertState({ did: state.did, pdsEndpoint });
        console.log(`[replication] Tombstoned DID ${state.did} is alive again, resuming`);
        // Fire-and-forget: sync errors are recorded on the state row
        this.syncDid(state.did, "tombstone-recovery").catch((err) => {
          const message = err instanceof Error ? err.message : String(err);
          this.syncStorage.updateStatus(state.did, "error", message);
        });
        continue;
      }
    } catch {
      // Resolution failed, proceed with purge
    }
    // Still dead — purge data
    console.warn(`[replication] Purging data for tombstoned DID ${state.did}`);
    const { blocksRemoved, blobsRemoved } = this.syncStorage.purgeDidData(state.did);
    // Delete truly orphaned blocks/blobs from blockstore
    // (blocks/blobs still referenced by other DIDs are kept)
    if (blocksRemoved.length > 0) {
      const orphanedBlocks = this.syncStorage.findOrphanedCids(blocksRemoved);
      for (const cid of orphanedBlocks) {
        await this.blockStore.deleteBlock(cid);
      }
    }
    if (blobsRemoved.length > 0) {
      const orphanedBlobs = this.syncStorage.findOrphanedBlobCids(blobsRemoved);
      for (const cid of orphanedBlobs) {
        await this.blockStore.deleteBlock(cid);
      }
    }
    // Remove manifest record
    if (this.repoManager) {
      const rkey = didToRkey(state.did);
      try {
        await this.repoManager.deleteRecord(MANIFEST_NSID, rkey);
      } catch {
        // Non-fatal
      }
    }
  }
}
/**
 * Run deferred GC for DIDs that were flagged by firehose delete/update ops.
 * Triggers a full sync for each flagged DID, which includes block/blob reconciliation.
 */
private async runDeferredGc(): Promise {
  const didsNeedingGc = this.syncStorage.getDidsNeedingGc();
  for (const did of didsNeedingGc) {
    if (this.stopped) break; // bail out promptly on shutdown
    try {
      // A full sync runs the 9d/9e reconciliation steps, which clear needs_gc
      await this.syncDid(did, "gc");
      this.lastSyncTimestamps.set(did, Date.now());
    } catch (err) {
      const message = err instanceof Error ? err.message : String(err);
      this.syncStorage.updateStatus(did, "error", message);
    }
  }
}
/**
 * Compute the tick interval for periodic sync.
 *
 * When a PolicyEngine is present, uses the minimum sync interval
 * across all DIDs so that no DID misses its window. Each DID's
 * individual interval is checked inside syncAll().
 *
 * Falls back to the provided intervalMs (default 5 minutes).
 */
private computeTickIntervalMs(fallbackMs: number): number {
  if (!this.policyEngine) return fallbackMs;
  const dids = this.getReplicateDids();
  if (dids.length === 0) return fallbackMs;
  // Find the smallest per-DID interval so no DID misses its window
  let minInterval = Infinity;
  for (const did of dids) {
    const config = this.policyEngine.getReplicationConfig(did);
    const intervalMs = config ? config.sync.intervalSec * 1000 : DEFAULT_SYNC_INTERVAL_MS;
    if (intervalMs < minInterval) {
      minInterval = intervalMs;
    }
  }
  // Clamp to minimum tick interval to prevent excessive polling
  return Math.max(
    minInterval === Infinity ? fallbackMs : minInterval,
    MIN_TICK_INTERVAL_MS,
  );
}
/**
 * Start periodic sync and verification at their respective intervals.
 *
 * When a PolicyEngine is present, the tick rate is derived from the
 * minimum sync interval across all DIDs. Each tick, syncAll() checks
 * which DIDs are actually due for sync based on their individual intervals.
 *
 * Idempotent: returns immediately if the sync timer is already running.
 */
startPeriodicSync(intervalMs: number = 5 * 60 * 1000): void {
  if (this.syncTimer) return;
  this.stopped = false;
  const tickMs = this.computeTickIntervalMs(intervalMs);
  // Run first sync after a short delay to let startup complete
  setTimeout(() => {
    if (!this.stopped) {
      this.syncAll().catch((err) => {
        console.error("Periodic sync error:", err);
      });
    }
  }, 5000);
  this.syncTimer = setInterval(() => {
    if (!this.stopped) {
      this.syncAll().catch((err) => {
        console.error("Periodic sync error:", err);
      });
    }
  }, tickMs);
  // Start periodic PLC mirror refresh (6 hours)
  const PLC_REFRESH_MS = 6 * 60 * 60 * 1000;
  this.plcRefreshTimer = setInterval(() => {
    if (!this.stopped) {
      this.refreshAllPlcLogs().catch((err) => {
        console.error("[replication] PLC mirror refresh error:", err);
      });
    }
  }, PLC_REFRESH_MS);
  // Initial PLC log fetch for all tracked did:plc DIDs (fire-and-forget)
  this.refreshAllPlcLogs().catch((err) => {
    console.error("[replication] Initial PLC mirror fetch error:", err);
  });
  // Build lexicon index from existing record paths
  try {
    this.syncStorage.rebuildLexiconIndex();
  } catch (err) {
    console.error("[replication] Lexicon index rebuild error:", err);
  }
  // Run verification once on startup, then on a timer
  this.runVerification().catch((err) => {
    console.error("Initial verification error:", err);
  });
  this.verificationTimer = setInterval(() => {
    if (!this.stopped) {
      this.runVerification().catch((err) => {
        console.error("Periodic verification error:", err);
      });
    }
  }, this.verificationConfig.verificationIntervalMs);
}
/**
 * Start the firehose subscription for real-time updates.
 * The firehose provides streaming updates for serviced DIDs,
 * complementing the periodic polling sync.
 *
 * No-op when already started, firehose is disabled in config, or there
 * are no DIDs to replicate.
 */
startFirehose(): void {
  if (this.firehoseSubscription) return;
  if (!this.config.FIREHOSE_ENABLED) return;
  const replicateDids = this.getReplicateDids();
  if (replicateDids.length === 0) return;
  this.firehoseSubscription = new FirehoseSubscription({
    firehoseUrl: this.config.FIREHOSE_URL,
  });
  // Register handler for commit events
  this.firehoseSubscription.onCommit((event) => {
    this.handleFirehoseCommit(event).catch((err) => {
      console.error(`[replication] Firehose commit handler error for ${event.repo}:`, err);
    });
  });
  // Register handler for account status events (tombstone/deactivation)
  this.firehoseSubscription.onAccount((event) => {
    this.handleFirehoseAccount(event).catch((err) => {
      console.error(`[replication] Firehose account handler error for ${event.did}:`, err);
    });
  });
  // Load saved cursor for resumption
  const savedCursor = this.syncStorage.getFirehoseCursor();
  // Start with the merged set of DIDs
  const dids = new Set(replicateDids);
  this.firehoseSubscription.start(dids, savedCursor);
  // Periodically save the cursor to SQLite (every 30 seconds)
  this.firehoseCursorSaveTimer = setInterval(() => {
    this.saveFirehoseCursor();
  }, 30_000);
  console.log(
    `[replication] Firehose subscription started` +
      (savedCursor !== null ? ` (resuming from cursor ${savedCursor})` : "") +
      ` — tracking ${dids.size} DIDs`,
  );
}
/**
 * Handle a commit event from the firehose.
* * Optimization: the firehose event already contains the blocks (as CAR bytes) * and the commit rev/CID. We apply them directly to our blockstore, avoiding * an HTTP round-trip to the source PDS. * * Falls back to full syncDid() if incremental apply fails (e.g., tooBig event, * empty blocks, gap in sequence, or CAR parsing error). */ private async handleFirehoseCommit(event: FirehoseCommitEvent): Promise { const did = event.repo; // Only process if this DID is one we are tracking const replicateDids = this.getReplicateDids(); if (!replicateDids.includes(did)) return; // Determine if we should attempt incremental apply or fall back to full sync. // Fall back when: // - tooBig is set (blocks were too large to include in the event) // - rebase occurred (repo structure changed, need full sync) // - blocks are empty (nothing to apply) if (event.tooBig || event.rebase || event.blocks.length === 0) { try { await this.syncDid(did, "firehose-resync"); this.lastSyncTimestamps.set(did, Date.now()); } catch (err) { const message = err instanceof Error ? err.message : String(err); this.syncStorage.updateStatus(did, "error", message); } return; } // Check for sequence gaps: if we have a `since` (previous rev) in the event // and it doesn't match our last sync rev, we may have missed events. const state = this.syncStorage.getState(did); if (event.since !== null && state !== null && state.lastSyncRev !== null) { if (state.lastSyncRev !== event.since) { // Gap detected: our last known rev doesn't match the event's `since`. // Fall back to full sync to catch up. console.warn( `[replication] Gap detected for ${did}: local rev=${state.lastSyncRev}, event.since=${event.since}. Falling back to full sync.`, ); try { await this.syncDid(did, "firehose-resync"); this.lastSyncTimestamps.set(did, Date.now()); } catch (err) { const message = err instanceof Error ? 
err.message : String(err); this.syncStorage.updateStatus(did, "error", message); } return; } } // Attempt incremental block application from firehose event try { await this.applyFirehoseBlocks(did, event); this.lastSyncTimestamps.set(did, Date.now()); } catch (err) { // Incremental apply failed — fall back to full sync console.warn( `[replication] Incremental apply failed for ${did}, falling back to full sync:`, err instanceof Error ? err.message : String(err), ); try { await this.syncDid(did, "firehose-resync"); this.lastSyncTimestamps.set(did, Date.now()); } catch (syncErr) { const message = syncErr instanceof Error ? syncErr.message : String(syncErr); this.syncStorage.updateStatus(did, "error", message); } } } /** * Handle an account status event from the firehose. * Detects tombstoned/deactivated accounts and marks them accordingly. * Handles re-activation by clearing tombstone and triggering sync. */ private async handleFirehoseAccount(event: FirehoseAccountEvent): Promise { const did = event.did; // Only process DIDs we are tracking const replicateDids = this.getReplicateDids(); if (!replicateDids.includes(did)) return; if (!event.active || event.status === "deleted" || event.status === "takendown") { // Refresh PLC log to capture tombstone operation if (did.startsWith("did:plc:")) { this.fetchPlcLog(did).catch((err) => { console.warn(`[replication] PLC log refresh on tombstone for ${did} failed:`, err instanceof Error ? 
err.message : String(err)); }); } // Mark as tombstoned this.syncStorage.markTombstoned(did); // Update manifest to paused if (this.repoManager) { const rkey = didToRkey(did); const existing = await this.repoManager.getRecord(MANIFEST_NSID, rkey); if (existing) { const manifest: ManifestRecord = { ...(existing.record as ManifestRecord), status: "paused", }; await this.repoManager.putRecord(MANIFEST_NSID, rkey, manifest); } } console.warn( `[replication] Account ${did} deactivated/tombstoned (status=${event.status}, active=${event.active})`, ); } else if (event.active) { // Re-activated: clear tombstone and trigger full sync const state = this.syncStorage.getState(did); if (state?.status === "tombstoned") { this.syncStorage.updateStatus(did, "pending"); console.log(`[replication] Account ${did} re-activated, triggering sync`); this.syncDid(did, "tombstone-recovery").catch((err) => { const message = err instanceof Error ? err.message : String(err); this.syncStorage.updateStatus(did, "error", message); }); // Refresh PLC log on reactivation if (did.startsWith("did:plc:")) { this.fetchPlcLog(did).catch((err) => { console.warn(`[replication] PLC log refresh on reactivation for ${did} failed:`, err instanceof Error ? err.message : String(err)); }); } } } } /** * Apply blocks from a firehose commit event directly to the blockstore. * The event's `blocks` field contains CAR-encoded bytes that can be parsed * and stored without fetching from the source PDS. */ private async applyFirehoseBlocks( did: string, event: FirehoseCommitEvent, ): Promise { const syncStart = Date.now(); const syncEventId = this.syncStorage.startSyncEvent(did, "firehose", "firehose"); try { // 1. Parse the CAR bytes from the firehose event const { root, blocks } = await readCarWithRoot(event.blocks); // 2. Store blocks in our blockstore await this.blockStore.putBlocks(blocks); // 3. 
Collect CID strings + sizes for DHT announcement + block tracking const cidStrs: string[] = []; const blockEntries: Array<{ cid: string; sizeBytes: number }> = []; const internalMap = ( blocks as unknown as { map: Map } ).map; if (internalMap) { for (const [cidStr, blockBytes] of internalMap.entries()) { cidStrs.push(cidStr); blockEntries.push({ cid: cidStr, sizeBytes: blockBytes.length }); } } // 4. Track block CIDs with sizes if (blockEntries.length > 0) { this.syncStorage.trackBlocksWithSize(did, blockEntries); } const fhBlockSizeKb = blockEntries.reduce((s, b) => s + b.sizeBytes, 0) / 1024; this.emitProgress({ type: "sync:blocks-stored", did, blocksStored: blockEntries.length, blocksSizeKb: Math.round(fhBlockSizeKb) }); // 5. Announce to DHT (fire-and-forget) this.networkService.provideBlocks(cidStrs).catch(() => {}); // 6. Determine rev and root CID const rootCidStr = root.toString(); let rev = event.rev; // Use the rev directly from the firehose event // If no rev in the event, try to extract from commit block if (!rev) { const commitBytes = internalMap?.get(rootCidStr); if (commitBytes) { try { const commitObj = cborDecode(commitBytes) as Record; if (typeof commitObj.rev === "string") { rev = commitObj.rev; } } catch { // Fall back to root CID as rev rev = rootCidStr; } } else { rev = rootCidStr; } } // 7. Update sync state this.syncStorage.updateSyncProgress(did, rev, rootCidStr); // 7a. Publish gossipsub notification (fire-and-forget) this.networkService.publishCommitNotification(did, rootCidStr, rev).catch(() => {}); // 8. Invalidate cached ReadableRepo so it reloads with new root this.replicatedRepoReader?.invalidateCache(did); // 8b. 
Track record paths incrementally from firehose ops try { const createdPaths = event.ops .filter((op) => op.action === "create" || op.action === "update") .map((op) => op.path); const deletedPaths = event.ops .filter((op) => op.action === "delete") .map((op) => op.path); if (createdPaths.length > 0) { this.syncStorage.trackRecordPaths(did, createdPaths); // Update lexicon index with NSIDs from created paths const nsids = extractNsids(createdPaths); if (nsids.length > 0) { this.syncStorage.updateLexiconIndex(nsids); } } if (deletedPaths.length > 0) { this.syncStorage.removeRecordPaths(did, deletedPaths); } // Flag for deferred GC if deletes or updates occurred // (updates may orphan old MST nodes/record blocks) const hasDeletesOrUpdates = event.ops.some( (op) => op.action === "delete" || op.action === "update", ); if (hasDeletesOrUpdates) { this.syncStorage.setNeedsGc(did); } } catch { // Non-fatal: path tracking is best-effort } // 9. Update manifest record if (this.repoManager) { const rkey = didToRkey(did); const existingManifest = await this.repoManager.getRecord( MANIFEST_NSID, rkey, ); if (existingManifest) { const manifest: ManifestRecord = { $type: MANIFEST_NSID, subject: did, status: "active", lastSyncRev: rev, lastSyncAt: new Date().toISOString(), createdAt: (existingManifest.record as Record) ?.createdAt as string ?? new Date().toISOString(), }; await this.repoManager.putRecord(MANIFEST_NSID, rkey, manifest); } } // 10. Sync blobs for firehose ops (fire-and-forget) const pdsEndpoint = this.syncStorage.getState(did)?.pdsEndpoint; if (pdsEndpoint) { this.syncBlobsForOps(did, pdsEndpoint, event.ops).catch((err) => { console.warn( `[replication] Blob sync error for firehose ops (${did}):`, err instanceof Error ? err.message : String(err), ); }); } // 11. 
Complete sync event this.syncStorage.completeSyncEvent(syncEventId, { status: "success", blocksAdded: blockEntries.length, carBytes: event.blocks.length, durationMs: Date.now() - syncStart, rev, rootCid: rootCidStr, incremental: true, }); this.emitProgress({ type: "sync:complete", did, blocksStored: blockEntries.length, durationMs: Date.now() - syncStart, }); } catch (err) { this.syncStorage.completeSyncEvent(syncEventId, { status: "error", errorMessage: err instanceof Error ? err.message : String(err), durationMs: Date.now() - syncStart, incremental: true, }); this.emitProgress({ type: "sync:error", did, error: err instanceof Error ? err.message : String(err), }); throw err; } } /** * Save the current firehose cursor to persistent storage. */ private saveFirehoseCursor(): void { if (!this.firehoseSubscription) return; const cursor = this.firehoseSubscription.getCursor(); if (cursor !== null) { this.syncStorage.saveFirehoseCursor(cursor); } } /** * Get firehose subscription statistics. */ getFirehoseStats(): { connected: boolean; cursor: number | null; eventsReceived: number; eventsProcessed: number; trackedDids: number; } | null { if (!this.firehoseSubscription) return null; return this.firehoseSubscription.getStats(); } /** * Set up gossipsub subscription for commit notifications. * Subscribes to topics for all tracked DIDs and registers a handler * that triggers syncDid() when a newer rev is received. */ private setupGossipsubSubscription(): void { const dids = this.getReplicateDids(); if (dids.length === 0) return; this.networkService.onCommitNotification((notification) => { this.handleGossipsubNotification(notification).catch((err) => { console.error( `[replication] Gossipsub notification handler error for ${notification.did}:`, err instanceof Error ? 
err.message : String(err), ); }); }); this.networkService.onIdentityNotification((notification) => { this.handleIdentityNotification(notification); }); this.networkService.subscribeCommitTopics(dids); this.networkService.subscribeIdentityTopics(dids); // Periodically clear the dedup set to prevent unbounded growth this.notificationCleanupTimer = setInterval(() => { this.recentNotifications.clear(); }, 60_000); } /** * Handle an incoming gossipsub commit notification. * Deduplicates by did:rev, skips if local state is already current, * and triggers syncDid() if the notification represents a newer commit. */ private async handleGossipsubNotification(notification: CommitNotification): Promise { const did = notification.did; // Only process DIDs we are tracking const trackedDids = this.getReplicateDids(); if (!trackedDids.includes(did)) return; // Dedup: skip if we've already processed this did:rev const dedupKey = `${did}:${notification.rev}`; if (this.recentNotifications.has(dedupKey)) return; this.recentNotifications.add(dedupKey); // Skip if local state already matches this rev const state = this.syncStorage.getState(did); if (state?.lastSyncRev === notification.rev) return; try { await this.syncDid(did, "gossipsub"); this.lastSyncTimestamps.set(did, Date.now()); } catch (err) { const message = err instanceof Error ? err.message : String(err); this.syncStorage.updateStatus(did, "error", message); } } /** * Handle an incoming gossipsub identity notification. * Updates peer info immediately if the DID is tracked. 
 */
private handleIdentityNotification(notification: IdentityNotification): void {
  const did = notification.did;
  // Only process DIDs we are tracking
  const trackedDids = this.getReplicateDids();
  if (!trackedDids.includes(did)) return;
  // Log PeerID changes
  const state = this.syncStorage.getState(did);
  if (state?.peerId && state.peerId !== notification.peerId) {
    console.warn(
      `[replication] Identity notification: PeerID changed for ${did}: ${state.peerId} → ${notification.peerId}`,
    );
  }
  // Update peer info immediately
  this.syncStorage.updatePeerInfo(did, notification.peerId, notification.multiaddrs);
  // Refresh PLC log on identity change (key rotation, PDS migration, etc.)
  if (did.startsWith("did:plc:")) {
    this.fetchPlcLog(did).catch((err) => {
      console.warn(`[replication] PLC log refresh on identity change for ${did} failed:`, err instanceof Error ? err.message : String(err));
    });
  }
}
/**
 * Stop periodic sync, verification, and firehose subscription.
 * Clears every timer, persists the firehose cursor before tearing the
 * subscription down, and unsubscribes gossipsub identity topics.
 * Safe to call more than once.
 */
stop(): void {
  this.stopped = true;
  if (this.syncTimer) {
    clearInterval(this.syncTimer);
    this.syncTimer = null;
  }
  if (this.verificationTimer) {
    clearInterval(this.verificationTimer);
    this.verificationTimer = null;
  }
  // Save cursor and stop firehose
  if (this.firehoseCursorSaveTimer) {
    clearInterval(this.firehoseCursorSaveTimer);
    this.firehoseCursorSaveTimer = null;
  }
  if (this.firehoseSubscription) {
    // Persist the cursor before the subscription (and its cursor) goes away
    this.saveFirehoseCursor();
    this.firehoseSubscription.stop();
    this.firehoseSubscription = null;
  }
  // Stop PLC mirror refresh
  if (this.plcRefreshTimer) {
    clearInterval(this.plcRefreshTimer);
    this.plcRefreshTimer = null;
  }
  // Stop challenge scheduler
  if (this.challengeScheduler) {
    this.challengeScheduler.stop();
    this.challengeScheduler = null;
  }
  // Stop gossipsub notification cleanup and unsubscribe identity topics
  if (this.notificationCleanupTimer) {
    clearInterval(this.notificationCleanupTimer);
    this.notificationCleanupTimer = null;
  }
  try {
    this.networkService.unsubscribeIdentityTopics(this.getReplicateDids());
  } catch {
    // Gossipsub streams may already be closed during shutdown
  }
}
/**
 * Run remote verification for all synced DIDs.
 * Per-DID failures are logged and skipped; successful results are cached
 * in lastVerificationResults.
 */
async runVerification(): Promise {
  const results: LayeredVerificationResult[] = [];
  for (const did of this.getReplicateDids()) {
    if (this.stopped) break; // bail out promptly on shutdown
    try {
      const result = await this.verifyDid(did);
      if (result) {
        results.push(result);
        this.lastVerificationResults.set(did, result);
      }
    } catch (err) {
      console.error(`Verification error for ${did}:`, err);
    }
  }
  return results;
}
/**
 * Run layered verification for a single DID against its source PDS.
 * Returns null when there is no state or no known PDS endpoint.
 */
async verifyDid(did: string): Promise {
  const state = this.syncStorage.getState(did);
  if (!state || !state.pdsEndpoint) return null;
  // Get tracked block CIDs for this DID
  const blockCids = this.syncStorage.getBlockCids(did);
  // Use the root CID (commit root) for verification
  const rootCid = state.rootCid ?? state.lastSyncRev ?? null;
  // Get tracked record paths for challenge generation
  const recordPaths = this.syncStorage.getRecordPaths(did);
  const result = await this.remoteVerifier.verifyPeer(
    did,
    state.pdsEndpoint,
    rootCid,
    blockCids,
    recordPaths,
  );
  if (result.overallPassed) {
    this.syncStorage.updateVerifiedAt(did);
  }
  return result;
}
/**
 * Get the underlying SyncStorage instance.
 */
getSyncStorage(): SyncStorage {
  return this.syncStorage;
}
/**
 * Get the underlying ChallengeStorage instance.
 */
getChallengeStorage(): ChallengeStorage {
  return this.challengeStorage;
}
/**
 * Set the replicated repo reader for cache invalidation after sync.
 */
setReplicatedRepoReader(reader: ReplicatedRepoReader): void {
  this.replicatedRepoReader = reader;
}
/**
 * Get sync states for all tracked DIDs.
 */
getSyncStates(): SyncState[] {
  return this.syncStorage.getAllStates();
}
/**
 * Query the lexicon index with optional prefix filter.
 */
getLexiconIndex(prefix?: string, limit?: number) {
  return this.syncStorage.getLexiconIndex(prefix, limit);
}
/**
 * Get aggregate lexicon stats.
 */
getLexiconStats() {
  return this.syncStorage.getLexiconStats();
}
/**
 * Get all offered DIDs (awaiting mutual consent).
 */
getOfferedDids(): Array<{ did: string; pdsEndpoint: string | null; offeredAt: string }> {
  return this.syncStorage.getOfferedDids();
}
/**
 * Get the most recent verification results.
 */
getVerificationResults(): Map {
  return this.lastVerificationResults;
}
// ============================================
// Blob replication
// ============================================
/**
 * Sync blobs for a DID: walk all records, extract blob CIDs, fetch new ones.
 * Already-tracked blobs and upstream-deleted blobs (404) count as skipped;
 * CID mismatches and fetch failures count as errors but do not abort the loop.
 *
 * @returns Counters for fetched/skipped/errored blobs plus total bytes fetched.
 */
async syncBlobs(did: string): Promise<{ fetched: number; skipped: number; errors: number; totalBytes: number }> {
  const state = this.syncStorage.getState(did);
  if (!state?.pdsEndpoint || !state.rootCid) {
    return { fetched: 0, skipped: 0, errors: 0, totalBytes: 0 };
  }
  if (!this.replicatedRepoReader) {
    return { fetched: 0, skipped: 0, errors: 0, totalBytes: 0 };
  }
  // Walk all records and collect blob CIDs
  const allBlobCids = new Set();
  const repo = await this.replicatedRepoReader.getRepo(did);
  if (!repo) {
    return { fetched: 0, skipped: 0, errors: 0, totalBytes: 0 };
  }
  for await (const entry of repo.walkRecords()) {
    const cids = extractBlobCids(entry.record);
    for (const cid of cids) {
      allBlobCids.add(cid);
    }
  }
  // Filter to blobs not already fetched
  let fetched = 0;
  let skipped = 0;
  let errors = 0;
  let totalBytes = 0;
  const total = allBlobCids.size;
  for (const blobCid of allBlobCids) {
    if (this.syncStorage.hasBlobCid(did, blobCid)) {
      skipped++;
      continue;
    }
    try {
      const bytes = await this.repoFetcher.fetchBlob(
        state.pdsEndpoint,
        did,
        blobCid,
      );
      if (!bytes) {
        skipped++; // 404 — blob deleted upstream
        continue;
      }
      // Reject bytes whose hash does not match the declared CID
      if (!await this.verifyBlobCid(blobCid, bytes)) {
        console.warn(`[replication] Blob CID mismatch for ${blobCid} (${did})`);
        errors++;
        continue;
      }
      await this.blockStore.putBlock(blobCid, bytes);
      this.syncStorage.trackBlobsWithSize(did, [{ cid: blobCid, sizeBytes: bytes.length }]);
      this.networkService.provideBlocks([blobCid]).catch(() => {});
      fetched++;
      totalBytes += bytes.length;
      this.emitProgress({ type: "sync:blob-progress", did, blobsFetched: fetched, blobsTotal: total, blobBytes: totalBytes });
    } catch (err) {
      console.warn(
        `[replication] Failed to fetch blob ${blobCid} for ${did}:`,
        err instanceof Error ? err.message : String(err),
      );
      errors++;
    }
  }
  return { fetched, skipped, errors, totalBytes };
}
/**
 * Sync blobs for specific firehose ops (create/update only).
 * Best-effort: per-op failures are logged and the remaining ops continue.
 */
private async syncBlobsForOps(
  did: string,
  pdsEndpoint: string,
  ops: Array<{ action: string; path: string }>,
): Promise {
  if (!this.replicatedRepoReader) return;
  for (const op of ops) {
    if (op.action !== "create" && op.action !== "update") continue;
    // Path format is "collection/rkey"; the collection may itself contain "/"
    const parts = op.path.split("/");
    if (parts.length < 2) continue;
    const collection = parts.slice(0, -1).join("/");
    const rkey = parts[parts.length - 1]!;
    try {
      const record = await this.replicatedRepoReader.getRecord(did, collection, rkey);
      if (!record) continue;
      const blobCids = extractBlobCids(record.value);
      for (const blobCid of blobCids) {
        if (this.syncStorage.hasBlobCid(did, blobCid)) continue;
        const bytes = await this.repoFetcher.fetchBlob(pdsEndpoint, did, blobCid);
        if (!bytes) continue;
        // Reject bytes whose hash does not match the declared CID
        if (!await this.verifyBlobCid(blobCid, bytes)) {
          console.warn(`[replication] Blob CID mismatch for ${blobCid} (${did})`);
          continue;
        }
        await this.blockStore.putBlock(blobCid, bytes);
        this.syncStorage.trackBlobs(did, [blobCid]);
        this.networkService.provideBlocks([blobCid]).catch(() => {});
      }
    } catch (err) {
      console.warn(
        `[replication] Failed to sync blobs for op ${op.path} (${did}):`,
        err instanceof Error ? err.message : String(err),
      );
    }
  }
}
/**
 * Verify that blob bytes match the expected CID.
*/ private async verifyBlobCid(expectedCid: string, bytes: Uint8Array): Promise { const actualCid = await createCid(CODEC_RAW, bytes); return cidToString(actualCid) === expectedCid; } // ============================================ // Challenge-response verification // ============================================ /** * Start the challenge scheduler for periodic proof-of-storage verification. * Requires a ChallengeTransport to communicate with peers. */ startChallengeScheduler(transport: ChallengeTransport): void { if (this.challengeScheduler) return; this.challengeScheduler = new ChallengeScheduler( this.config.DID ?? "", this.policyEngine, this.syncStorage, this.challengeStorage, this.blockStore, transport, ); this.challengeScheduler.start(); } /** * Get challenge history for a target peer. */ getChallengeResults( targetDid: string, subjectDid?: string, ): ChallengeHistoryRow[] { return this.challengeStorage.getHistory(targetDid, subjectDid); } /** * Get peer reliability scores. * If peerDid is provided, returns reliability for that specific peer. * Otherwise, returns all peer reliability scores. */ getPeerReliability(peerDid?: string): PeerReliabilityRow[] { if (peerDid) { return this.challengeStorage.getReliability(peerDid); } return this.challengeStorage.getAllReliability(); } /** * Refresh peer info for all DIDs associated with a PDS endpoint. * Called when a connection to that endpoint fails (e.g. challenge transport fallback). * Fire-and-forget: clears stale cache and triggers re-discovery. 
*/ refreshPeerInfoForEndpoint(pdsEndpoint: string): void { const states = this.syncStorage.getAllStates(); for (const state of states) { if (state.pdsEndpoint === pdsEndpoint) { this.syncStorage.clearPeerInfo(state.did); this.peerDiscovery.discoverPeer(state.did).then((peerInfo) => { if (peerInfo) { this.syncStorage.updatePeerInfo(state.did, peerInfo.peerId, peerInfo.multiaddrs); } }).catch(() => {}); } } } // ============================================ // PLC mirror // ============================================ /** * Fetch and validate the PLC audit log for a single DID. */ private async fetchPlcLog(did: string): Promise { const { fetchAndValidateLog } = await import("../identity/plc-mirror.js"); await fetchAndValidateLog(this.db, did); // Announce via DHT that we hold this DID's PLC log this.networkService.provideForDid(did).catch(() => {}); } /** * Refresh PLC logs for all tracked did:plc DIDs. */ private async refreshAllPlcLogs(): Promise { const { getAllPlcMirrorDids, fetchAndValidateLog, publishPlcProviders } = await import("../identity/plc-mirror.js"); // Refresh existing mirrored DIDs const mirroredDids = getAllPlcMirrorDids(this.db); for (const did of mirroredDids) { if (this.stopped) break; try { await fetchAndValidateLog(this.db, did); } catch (err) { console.warn( `[replication] PLC log refresh for ${did} failed:`, err instanceof Error ? err.message : String(err), ); // Fallback: try discovering PLC log from DHT peers await this.fetchPlcLogFromPeers(did).catch(() => {}); } } // Also fetch for any tracked did:plc DIDs that don't have a mirror entry yet const trackedDids = this.getReplicateDids().filter((d) => d.startsWith("did:plc:")); const mirroredSet = new Set(mirroredDids); for (const did of trackedDids) { if (this.stopped) break; if (mirroredSet.has(did)) continue; try { await fetchAndValidateLog(this.db, did); } catch (err) { console.warn( `[replication] PLC log fetch for ${did} failed:`, err instanceof Error ? 
err.message : String(err), ); // Fallback: try discovering PLC log from DHT peers await this.fetchPlcLogFromPeers(did).catch(() => {}); } } // Re-announce all mirrored DIDs to the DHT const allMirrored = getAllPlcMirrorDids(this.db); if (allMirrored.length > 0) { const helia = this.networkService as unknown as { getLibp2p?(): unknown }; if (typeof helia.getLibp2p === "function" && helia.getLibp2p()) { // Use the IpfsService routing directly via provideForDid for (const did of allMirrored) { this.networkService.provideForDid(did).catch(() => {}); } } } } /** * Fallback: discover PLC log providers via DHT and fetch from a peer. * Used when plc.directory is unavailable. */ private async fetchPlcLogFromPeers(did: string): Promise { const providers = await this.networkService.findProvidersForDid(did); if (providers.length === 0) return; const { validateOperationChain, getStoredLog } = await import("../identity/plc-mirror.js"); for (const multiaddr of providers) { try { // Extract the host:port from the multiaddr for an HTTP fetch const hostMatch = multiaddr.match(/\/ip[46]\/([^/]+)\/tcp\/(\d+)/); if (!hostMatch) continue; const [, host, port] = hostMatch; const url = `http://${host}:${port}/xrpc/org.p2pds.plc.getLog?did=${encodeURIComponent(did)}`; const res = await fetch(url, { signal: AbortSignal.timeout(10000) }); if (!res.ok) continue; const data = await res.json() as { operations?: unknown[]; status?: { validated?: boolean } }; if (!data.operations || !Array.isArray(data.operations)) continue; // Validate the returned chain independently const validation = await validateOperationChain(data.operations as any, did); if (!validation.valid) continue; // Store it — we verified it ourselves const existingLog = getStoredLog(this.db, did); const newOpCount = data.operations.length; // Accept if we don't have it yet, or if the new chain is longer if (!existingLog || newOpCount > existingLog.status.opCount) { const isTombstoned = (data.operations as any[]).some( (op: any) => 
!op.nullified && op.operation?.type === "plc_tombstone", ); const lastOpCreatedAt = newOpCount > 0 ? (data.operations as any[])[newOpCount - 1]!.createdAt ?? null : null; this.db.prepare( `INSERT INTO plc_mirror (did, operations_json, op_count, last_fetched_at, last_op_created_at, validated, is_tombstoned) VALUES (?, ?, ?, ?, ?, ?, ?) ON CONFLICT(did) DO UPDATE SET operations_json = excluded.operations_json, op_count = excluded.op_count, last_fetched_at = excluded.last_fetched_at, last_op_created_at = excluded.last_op_created_at, validated = excluded.validated, is_tombstoned = excluded.is_tombstoned`, ).run( did, JSON.stringify(data.operations), newOpCount, new Date().toISOString(), lastOpCreatedAt, validation.valid ? 1 : 0, isTombstoned ? 1 : 0, ); console.log(`[replication] PLC log for ${did} fetched from peer (${newOpCount} ops)`); return; } } catch { // Try next provider } } } /** * Get PLC log status for a single DID. */ getPlcLogStatus(did: string): { opCount: number; lastFetchedAt: string; lastOpCreatedAt: string | null; validated: boolean; isTombstoned: boolean } | null { const row = this.db .prepare("SELECT * FROM plc_mirror WHERE did = ?") .get(did) as Record | undefined; if (!row) return null; return { opCount: row.op_count as number, lastFetchedAt: row.last_fetched_at as string, lastOpCreatedAt: (row.last_op_created_at as string) ?? null, validated: (row.validated as number) === 1, isTombstoned: (row.is_tombstoned as number) === 1, }; } /** * Get aggregate PLC mirror stats. 
*/ getPlcMirrorStatus(): { mirroredDids: number; totalOps: number; lastRefresh: string | null } { const countRow = this.db .prepare("SELECT COUNT(*) as count, COALESCE(SUM(op_count), 0) as total_ops FROM plc_mirror") .get() as { count: number; total_ops: number }; const lastRow = this.db .prepare("SELECT MAX(last_fetched_at) as last_refresh FROM plc_mirror") .get() as { last_refresh: string | null }; return { mirroredDids: countRow.count, totalOps: countRow.total_ops, lastRefresh: lastRow.last_refresh, }; } /** * Check if peer info should be refreshed based on TTL. */ private shouldRefreshPeerInfo(state: SyncState): boolean { if (!state.peerInfoFetchedAt) return true; const fetchedAt = new Date(state.peerInfoFetchedAt).getTime(); return Date.now() - fetchedAt > PEER_INFO_TTL_MS; } }