// atproto user agency toolkit for individuals and groups
1/**
2 * Main replication orchestrator.
3 * Publishes manifest records, syncs remote repos to IPFS.
4 * Optionally driven by a PolicyEngine for per-DID intervals, priority, and filtering.
5 */
6
7import type Database from "better-sqlite3";
8import type { Config } from "../config.js";
9import type { RepoManager } from "../repo-manager.js";
10import { extractBlobCids } from "../repo-manager.js";
11import { create as createCid, CODEC_RAW, toString as cidToString } from "@atcute/cid";
12import type { BlockStore, NetworkService, CommitNotification, IdentityNotification } from "../ipfs.js";
13import type { DidResolver } from "../did-resolver.js";
14import { readCarWithRoot } from "@atproto/repo";
15import { decode as cborDecode } from "../cbor-compat.js";
16import type { ReplicatedRepoReader } from "./replicated-repo-reader.js";
17import type { PolicyEngine } from "../policy/engine.js";
18
19import {
20 MANIFEST_NSID,
21 didToRkey,
22 type ManifestRecord,
23 type SyncState,
24 type SyncTrigger,
25 type VerificationConfig,
26 type LayeredVerificationResult,
27 DEFAULT_VERIFICATION_CONFIG,
28} from "./types.js";
29import { SyncStorage } from "./sync-storage.js";
30import { RepoFetcher } from "./repo-fetcher.js";
31import { PeerDiscovery } from "./peer-discovery.js";
32import { BlockVerifier, RemoteVerifier } from "./verification.js";
33import { extractAllRecordPaths, extractAllCids } from "./mst-proof.js";
34import {
35 FirehoseSubscription,
36 type FirehoseCommitEvent,
37 type FirehoseAccountEvent,
38} from "./firehose-subscription.js";
39import { ChallengeScheduler } from "./challenge-response/challenge-scheduler.js";
40import { ChallengeStorage, type ChallengeHistoryRow, type PeerReliabilityRow } from "./challenge-response/challenge-storage.js";
41import type { ChallengeTransport } from "./challenge-response/transport.js";
42import { OfferManager, type RecordWriter } from "./offer-manager.js";
43import { fetchRepoFromPeer } from "./libp2p-sync.js";
44import type { Libp2p } from "@libp2p/interface";
45
/** Progress event emitted during sync operations for live UI updates (via SSE). */
export interface SyncProgressEvent {
  /** Event name, e.g. "sync-cycle:start" / "sync-cycle:complete". */
  type: string;
  /** DID the event refers to; "*" for whole-cycle events. */
  did: string;
  /** Source the repo was fetched from — presumably "pds" vs a peer; confirm at emit sites. */
  sourceType?: string;
  /** Size of the fetched CAR file in bytes. */
  carBytes?: number;
  /** Number of blocks written to the block store. */
  blocksStored?: number;
  /** Total size of the stored blocks, in kilobytes. */
  blocksSizeKb?: number;
  /** Blobs fetched so far in this operation. */
  blobsFetched?: number;
  /** Total number of blobs expected. */
  blobsTotal?: number;
  /** Total blob bytes transferred. */
  blobBytes?: number;
  /** Wall-clock duration of the operation in milliseconds. */
  durationMs?: number;
  /** Error message when the operation failed. */
  error?: string;
  /** Count of blocks still missing after the sync. */
  missingBlocks?: number;
}
61
62/** Extract unique NSIDs from record paths (collection/rkey format). */
63function extractNsids(paths: string[]): string[] {
64 const nsids = new Set<string>();
65 for (const p of paths) {
66 const slash = p.indexOf("/");
67 if (slash > 0) nsids.add(p.slice(0, slash));
68 }
69 return [...nsids];
70}
71
/** How old cached peer info can be before re-fetching (1 hour). Module-private. */
const PEER_INFO_TTL_MS = 60 * 60 * 1000;

/** Default sync interval when no policy engine supplies a per-DID interval (5 minutes). */
const DEFAULT_SYNC_INTERVAL_MS = 5 * 60 * 1000;

/** Lower bound on the sync tick interval, to prevent excessive polling (10 seconds). */
const MIN_TICK_INTERVAL_MS = 10 * 1000;
80
export class ReplicationManager {
  /** SQLite-backed storage for replication state, peers, and offers. */
  private syncStorage: SyncStorage;
  /** Fetches remote repos and resolves PDS endpoints. */
  private repoFetcher: RepoFetcher;
  /** Discovers peer nodes, their manifests, and endpoints. */
  private peerDiscovery: PeerDiscovery;
  /** Block-level verifier over the block store. */
  private verifier: BlockVerifier;
  /** Layered remote verification (see ./verification.js). */
  private remoteVerifier: RemoteVerifier;
  /** Periodic sync loop timer; null until started. */
  private syncTimer: ReturnType<typeof setInterval> | null = null;
  /** Periodic verification loop timer; null until started. */
  private verificationTimer: ReturnType<typeof setInterval> | null = null;
  /** Effective settings: DEFAULT_VERIFICATION_CONFIG merged with constructor overrides. */
  private verificationConfig: VerificationConfig;
  /** Most recent layered verification result, keyed by DID. */
  private lastVerificationResults: Map<string, LayeredVerificationResult> =
    new Map();
  /** Firehose connection for live commit events; null when not subscribed. */
  private firehoseSubscription: FirehoseSubscription | null = null;
  /** Timer that periodically persists the firehose cursor. */
  private firehoseCursorSaveTimer: ReturnType<typeof setInterval> | null = null;
  /** Storage for challenge/response history and peer reliability. */
  private challengeStorage: ChallengeStorage;
  /** Challenge scheduler; null when challenges are not running. */
  private challengeScheduler: ChallengeScheduler | null = null;
  /** When true, long-running loops (e.g. syncAll's per-DID loop) exit early. */
  private stopped = false;
  /** Optional policy engine driving per-DID intervals, priority, and filtering. */
  private policyEngine: PolicyEngine | null = null;
  /** Offer record manager; requires both a PolicyEngine and a RecordWriter. */
  private offerManager: OfferManager | null = null;
  /** Per-DID last-sync timestamps (epoch ms) for policy-driven interval tracking. */
  private lastSyncTimestamps: Map<string, number> = new Map();
  /** Dedup set for gossipsub notifications, keyed by `${did}:${rev}`. */
  private recentNotifications: Set<string> = new Set();
  /** Timer that prunes recentNotifications; null until started. */
  private notificationCleanupTimer: ReturnType<typeof setInterval> | null = null;
  /** libp2p node for peer-first repo sync. Set via setLibp2p(). */
  private libp2p: Libp2p | null = null;
  /** Cached multiaddrs from last peer record publish, for change detection. */
  private lastPublishedAddrs: Set<string> = new Set();
  /** Callback to republish peer record when multiaddrs change. */
  private publishPeerRecordFn?: () => Promise<void>;
  /** Sync progress event subscribers for live UI updates via SSE. */
  private progressCallbacks: Set<(event: SyncProgressEvent) => void> = new Set();
  /** Timer for periodic PLC mirror refresh (6 hours). */
  private plcRefreshTimer: ReturnType<typeof setInterval> | null = null;
114
  /**
   * Wires up storage (schemas created eagerly), fetchers, verifiers, and —
   * when a PolicyEngine plus some RecordWriter are available — the OfferManager.
   *
   * @param db SQLite handle shared by sync and challenge storage.
   * @param config Application config (REPLICATE_DIDS, DID, …).
   * @param repoManager Local repo for manifest records; may be undefined.
   * @param blockStore Backing block storage used by the verifiers.
   * @param networkService IPFS/libp2p network facade (gossipsub, multiaddrs).
   * @param didResolver Resolves DIDs for the repo fetcher.
   * @param verificationConfig Partial overrides merged over DEFAULT_VERIFICATION_CONFIG.
   * @param replicatedRepoReader Optional reader over replicated repos.
   * @param policyEngine Optional; enables policy-driven replication and offers.
   * @param pdsClient Optional remote writer, preferred over repoManager for offer records.
   */
  constructor(
    private db: Database.Database,
    private config: Config,
    private repoManager: RepoManager | undefined,
    private blockStore: BlockStore,
    private networkService: NetworkService,
    private didResolver: DidResolver,
    verificationConfig?: Partial<VerificationConfig>,
    private replicatedRepoReader?: ReplicatedRepoReader,
    policyEngine?: PolicyEngine,
    pdsClient?: RecordWriter,
  ) {
    this.syncStorage = new SyncStorage(db);
    this.syncStorage.initSchema();
    this.challengeStorage = new ChallengeStorage(db);
    this.challengeStorage.initSchema();
    this.repoFetcher = new RepoFetcher(didResolver);
    this.peerDiscovery = new PeerDiscovery(this.repoFetcher);
    this.verifier = new BlockVerifier(blockStore);
    this.verificationConfig = {
      ...DEFAULT_VERIFICATION_CONFIG,
      ...verificationConfig,
    };
    this.remoteVerifier = new RemoteVerifier(
      blockStore,
      this.verificationConfig,
    );
    if (policyEngine) {
      this.policyEngine = policyEngine;
      // Prefer remote PDS client for offer records; fall back to local repo
      const recordWriter: RecordWriter | undefined = pdsClient ?? repoManager;
      if (recordWriter) {
        this.offerManager = new OfferManager(
          recordWriter,
          this.peerDiscovery,
          policyEngine,
          config.DID ?? "",
        );
      }
    }
  }
156
157 /**
158 * Get the PolicyEngine, if one is configured.
159 */
160 getPolicyEngine(): PolicyEngine | null {
161 return this.policyEngine;
162 }
163
164 /**
165 * Get the OfferManager, if one is configured (requires PolicyEngine).
166 */
167 getOfferManager(): OfferManager | null {
168 return this.offerManager;
169 }
170
171 /**
172 * Late-bind a PdsClient (or any RecordWriter) after construction.
173 * Called when OAuth login completes after the server has already started.
174 * Creates the OfferManager if a PolicyEngine is present but OfferManager
175 * wasn't created at construction time (because no RecordWriter was available).
176 */
177 setPdsClient(client: RecordWriter, did: string): void {
178 if (this.policyEngine && !this.offerManager) {
179 this.offerManager = new OfferManager(
180 client,
181 this.peerDiscovery,
182 this.policyEngine,
183 did,
184 );
185 }
186 }
187
188 /**
189 * Set the libp2p node for peer-first repo sync.
190 * Called from start.ts after IPFS is started.
191 */
192 setLibp2p(libp2p: Libp2p): void {
193 this.libp2p = libp2p;
194 }
195
196 /**
197 * Set a callback to republish the peer record when multiaddrs change.
198 * Called from start.ts after IPFS and replication are ready.
199 */
200 setPublishPeerRecordFn(fn: () => Promise<void>): void {
201 this.publishPeerRecordFn = fn;
202 // Seed the cache with current addrs so the first check doesn't spuriously trigger
203 this.lastPublishedAddrs = new Set(this.networkService.getMultiaddrs());
204 }
205
206 /** Subscribe to sync progress events (for SSE streaming). */
207 onSyncProgress(cb: (event: SyncProgressEvent) => void): void {
208 this.progressCallbacks.add(cb);
209 }
210
211 /** Unsubscribe from sync progress events. */
212 offSyncProgress(cb: (event: SyncProgressEvent) => void): void {
213 this.progressCallbacks.delete(cb);
214 }
215
216 /** Emit a progress event to all subscribers. */
217 private emitProgress(event: SyncProgressEvent): void {
218 for (const cb of this.progressCallbacks) {
219 try { cb(event); } catch { /* non-fatal */ }
220 }
221 }
222
223 /**
224 * Check if multiaddrs have changed since last peer record publish.
225 * If changed, republish the peer record and update the cache.
226 */
227 private async _checkMultiaddrsChanged(): Promise<void> {
228 if (!this.publishPeerRecordFn) return;
229
230 const current = new Set(this.networkService.getMultiaddrs());
231 if (current.size === 0) return;
232
233 // Compare sets
234 if (
235 current.size === this.lastPublishedAddrs.size &&
236 [...current].every((a) => this.lastPublishedAddrs.has(a))
237 ) {
238 return;
239 }
240
241 this.lastPublishedAddrs = current;
242 try {
243 await this.publishPeerRecordFn();
244 console.log("[replication] Multiaddrs changed, republished peer record");
245 } catch (err) {
246 console.warn(
247 "[replication] Failed to republish peer record:",
248 err instanceof Error ? err.message : String(err),
249 );
250 }
251 }
252
  /**
   * Initialize replication: create tables, sync manifests, discover peers,
   * run offer discovery, and subscribe to gossipsub topics.
   * Schema init also runs in the constructor; it is repeated here before the
   * first network work.
   */
  async init(): Promise<void> {
    this.syncStorage.initSchema();
    this.challengeStorage.initSchema();
    await this.syncManifests();
    await this.discoverPeerEndpoints();
    await this.runOfferDiscovery();
    this.setupGossipsubSubscription();
  }
264
265 /**
266 * Get the list of DIDs to replicate.
267 *
268 * When a PolicyEngine is present, uses getActiveDids() as the single source of truth.
269 * Falls back to legacy three-source merge when no PolicyEngine is configured.
270 */
271 getReplicateDids(): string[] {
272 if (this.policyEngine) {
273 return this.policyEngine.getActiveDids();
274 }
275
276 // Legacy fallback: merge config + admin DIDs (no policy engine)
277 const allDids = new Set(this.config.REPLICATE_DIDS);
278 for (const did of this.syncStorage.getAdminDids()) {
279 allDids.add(did);
280 }
281 return [...allDids];
282 }
283
284 /**
285 * Determine the source of a tracked DID by looking up its policy.
286 */
287 getDidSource(did: string): "config" | "user" | "policy" | null {
288 if (this.policyEngine) {
289 const configPolicy = this.policyEngine.getStoredPolicy(`config:${did}`);
290 if (configPolicy) return "config";
291 const archivePolicy = this.policyEngine.getStoredPolicy(`archive:${did}`);
292 if (archivePolicy) return "user";
293 const p2pPolicy = this.policyEngine.getStoredPolicy(`p2p:${did}`);
294 if (p2pPolicy) return "policy";
295 // Check if any other policy covers this DID
296 if (this.policyEngine.shouldReplicate(did)) return "policy";
297 }
298 // Legacy fallback
299 if (this.config.REPLICATE_DIDS.includes(did)) return "config";
300 if (this.syncStorage.isAdminDid(did)) return "user";
301 return null;
302 }
303
304 /**
305 * Add a DID via the admin interface.
306 * Creates an archive policy (+ dual-write to admin_tracked_dids for rollback safety),
307 * creates manifest + sync state, subscribes gossipsub, updates firehose.
308 * Returns the status and source if already tracked.
309 */
310 async addDid(did: string): Promise<{ status: "added" | "already_tracked"; source?: string }> {
311 const existingSource = this.getDidSource(did);
312 if (existingSource) {
313 return { status: "already_tracked", source: existingSource };
314 }
315
316 // Check if already tracked via getReplicateDids (covers pattern/all policies)
317 if (this.getReplicateDids().includes(did)) {
318 return { status: "already_tracked", source: "policy" };
319 }
320
321 // Create archive policy in PolicyEngine (auto-persists to DB)
322 if (this.policyEngine) {
323 const policyId = `archive:${did}`;
324 if (!this.policyEngine.getPolicies().some((p) => p.id === policyId)) {
325 const { archive } = await import("../policy/presets.js");
326 this.policyEngine.addPolicy(archive(did));
327 }
328 }
329
330 // Dual-write to admin_tracked_dids for rollback safety
331 this.syncStorage.addAdminDid(did);
332
333 await this.syncManifestForDid(did);
334 this.networkService.subscribeCommitTopics([did]);
335 this.networkService.subscribeIdentityTopics([did]);
336 this.updateFirehoseDids();
337
338 // Trigger initial sync in background (fire-and-forget)
339 this.syncDid(did, "manual").catch((err) => {
340 console.error(`[replication] Initial sync for admin-added ${did} failed:`, err);
341 });
342
343 // Fetch PLC log in background (fire-and-forget)
344 if (did.startsWith("did:plc:")) {
345 this.fetchPlcLog(did).catch((err) => {
346 console.warn(`[replication] PLC log fetch for ${did} failed:`, err instanceof Error ? err.message : String(err));
347 });
348 }
349
350 return { status: "added" };
351 }
352
353 /**
354 * Offer to replicate a DID (consent-gated).
355 * Publishes an offer record and stores in offered_dids, but does NOT sync.
356 * Replication begins only when mutual consent is detected during offer discovery.
357 */
358 async offerDid(did: string): Promise<{ status: "offered" | "already_tracked" | "already_offered"; source?: string }> {
359 // Already fully tracked?
360 const existingSource = this.getDidSource(did);
361 if (existingSource) {
362 return { status: "already_tracked", source: existingSource };
363 }
364
365 // Already offered?
366 if (this.syncStorage.isOfferedDid(did)) {
367 return { status: "already_offered" };
368 }
369
370 // Require active session to publish offer record
371 if (!this.offerManager) {
372 throw new Error("No active session — log in first to publish offers");
373 }
374
375 // Resolve PDS endpoint
376 const pdsEndpoint = await this.repoFetcher.resolvePds(did);
377
378 // Publish offer record to user's PDS
379 await this.offerManager.publishOffer(did);
380
381 // Store in offered_dids (does NOT create replication_state or trigger sync)
382 this.syncStorage.addOfferedDid(did, pdsEndpoint ?? null);
383
384 // Push notification: tell the target node about this offer (fire-and-forget)
385 // Small delay to ensure PDS has committed the record before the peer verifies it
386 setTimeout(() => {
387 this.pushOfferNotification(did).catch((err) => {
388 console.warn(
389 `[replication] Failed to push offer notification to ${did}:`,
390 err instanceof Error ? err.message : String(err),
391 );
392 });
393 }, 2000);
394
395 return { status: "offered" };
396 }
397
398 /**
399 * Remove an offered DID: revoke the offer, notify the peer, and remove from offered_dids.
400 */
401 async removeOfferedDid(did: string): Promise<{ status: "removed" | "not_found" }> {
402 if (!this.syncStorage.isOfferedDid(did)) {
403 return { status: "not_found" };
404 }
405
406 // Revoke offer record if OfferManager is available
407 if (this.offerManager) {
408 await this.offerManager.revokeOffer(did);
409 }
410
411 this.syncStorage.removeOfferedDid(did);
412
413 // Notify peer that the offer was revoked (fire-and-forget)
414 this.pushRevokeNotification(did).catch((err) => {
415 console.warn(`[replication] Failed to push revoke notification to ${did}:`, err instanceof Error ? err.message : String(err));
416 });
417
418 return { status: "removed" };
419 }
420
  /**
   * Revoke consent and remove a DID that was promoted to active replication via mutual offer.
   * Revokes the offer record, purges all data, and notifies the peer.
   *
   * Ordering matters: the peer is notified BEFORE removeDid() purges data,
   * because the purge clears the peer endpoints needed for notification.
   *
   * @returns "not_found" when the DID is neither tracked nor has sync state;
   *          "error" (with message) when the final removal/purge fails.
   */
  async revokeReplication(did: string): Promise<{ status: "revoked" | "not_found" | "error"; error?: string }> {
    // Must be an actively tracked DID (by source lookup or existing sync state)
    const source = this.getDidSource(did);
    const hasState = this.syncStorage.getAllStates().some((s) => s.did === did);
    if (!source && !hasState) {
      return { status: "not_found" };
    }

    // Revoke offer record on PDS
    if (this.offerManager) {
      try {
        await this.offerManager.revokeOffer(did);
      } catch {
        // Non-fatal: offer record may not exist
      }
    }

    // Transition P2P policy to terminated
    if (this.policyEngine) {
      const policyId = `p2p:${did}`;
      this.policyEngine.transitionPolicy(policyId, "terminated");
    }

    // Remove from offered_dids if present
    if (this.syncStorage.isOfferedDid(did)) {
      this.syncStorage.removeOfferedDid(did);
    }

    // Notify peer BEFORE purging (purge clears peer endpoints needed for notification)
    try {
      await this.pushRevokeNotification(did);
    } catch (err) {
      console.warn(`[replication] Failed to push revoke notification to ${did}:`, err instanceof Error ? err.message : String(err));
    }

    // Remove from replication with full data purge
    const result = await this.removeDid(did, true);
    if (result.status === "error") {
      return { status: "error", error: result.error };
    }

    return { status: "revoked" };
  }
468
  /**
   * Handle a revocation from a remote peer: stop replicating their DID and purge data.
   * Cleans up, in order: incoming offers, our outgoing offer record +
   * offered_dids entry, the p2p policy, and finally active replication state.
   */
  async handleRemoteRevocation(revokerDid: string): Promise<void> {
    // Remove from incoming offers if present
    const incomingOffers = this.syncStorage.getIncomingOffers();
    for (const offer of incomingOffers) {
      if (offer.offererDid === revokerDid) {
        this.syncStorage.removeIncomingOffer(revokerDid, offer.subjectDid);
      }
    }

    // Remove from offered_dids if present (also revoke our own offer record)
    if (this.syncStorage.isOfferedDid(revokerDid)) {
      if (this.offerManager) {
        try { await this.offerManager.revokeOffer(revokerDid); } catch { /* non-fatal */ }
      }
      this.syncStorage.removeOfferedDid(revokerDid);
    }

    // Transition P2P policy to terminated
    if (this.policyEngine) {
      const policyId = `p2p:${revokerDid}`;
      if (this.policyEngine.transitionPolicy(policyId, "terminated")) {
        console.log(`[replication] Terminated P2P policy ${policyId}`);
      }
    }

    // Remove from active replication with data purge
    const isTracked = this.getDidSource(revokerDid) !== null
      || this.syncStorage.getAllStates().some((s) => s.did === revokerDid);
    if (isTracked) {
      console.log(`[replication] Remote revocation from ${revokerDid} — removing DID and purging data`);
      await this.removeDid(revokerDid, true);
    }
  }
505
506 /**
507 * Push an offer notification to the target node's p2pds endpoint.
508 * Resolves the target's org.p2pds.peer record to find their endpoint URL,
509 * then POSTs to their notifyOffer XRPC method.
510 */
511 private async pushOfferNotification(targetDid: string): Promise<void> {
512 const peerInfo = await this.peerDiscovery.discoverPeer(targetDid);
513 if (!peerInfo?.endpoint) {
514 console.log(`[replication] No p2pds endpoint found for ${targetDid}, skipping push notification`);
515 return;
516 }
517
518 const url = `${peerInfo.endpoint}/xrpc/org.p2pds.replication.notifyOffer`;
519 const body = {
520 offererDid: this.config.DID,
521 subjectDid: targetDid,
522 offererPdsEndpoint: await this.repoFetcher.resolvePds(this.config.DID ?? ""),
523 params: {
524 minCopies: 2,
525 intervalSec: 600,
526 priority: 50,
527 },
528 };
529
530 const res = await fetch(url, {
531 method: "POST",
532 headers: { "Content-Type": "application/json" },
533 body: JSON.stringify(body),
534 });
535
536 if (!res.ok) {
537 const text = await res.text().catch(() => "");
538 console.warn(`[replication] Push notification to ${targetDid} failed (${res.status}): ${text}`);
539 } else {
540 console.log(`[replication] Push notification sent to ${targetDid} at ${peerInfo.endpoint}`);
541 }
542 }
543
544 /**
545 * Push a revocation notification to the target node's p2pds endpoint.
546 */
547 private async pushRevokeNotification(targetDid: string): Promise<void> {
548 const peerInfo = await this.peerDiscovery.discoverPeer(targetDid);
549 if (!peerInfo?.endpoint) {
550 console.log(`[replication] No p2pds endpoint found for ${targetDid}, skipping revoke notification`);
551 return;
552 }
553
554 const url = `${peerInfo.endpoint}/xrpc/org.p2pds.replication.notifyRevoke`;
555 const body = {
556 revokerDid: this.config.DID,
557 subjectDid: targetDid,
558 };
559
560 const res = await fetch(url, {
561 method: "POST",
562 headers: { "Content-Type": "application/json" },
563 body: JSON.stringify(body),
564 });
565
566 if (!res.ok) {
567 const text = await res.text().catch(() => "");
568 console.warn(`[replication] Revoke notification to ${targetDid} failed (${res.status}): ${text}`);
569 } else {
570 console.log(`[replication] Revoke notification sent to ${targetDid} at ${peerInfo.endpoint}`);
571 }
572 }
573
574 /**
575 * Accept an incoming offer: create a reciprocal offer and remove from incoming_offers.
576 */
577 async acceptOffer(offererDid: string): Promise<{ status: "accepted" | "not_found" | "error"; error?: string }> {
578 // Find the incoming offer to get the subject DID
579 const offers = this.syncStorage.getIncomingOffers();
580 const offer = offers.find((o) => o.offererDid === offererDid);
581 if (!offer) {
582 return { status: "not_found" };
583 }
584
585 // Create reciprocal offer (which also push-notifies back)
586 try {
587 await this.offerDid(offererDid);
588 } catch (err) {
589 return {
590 status: "error",
591 error: `Failed to create reciprocal offer: ${err instanceof Error ? err.message : String(err)}`,
592 };
593 }
594
595 // Remove from incoming_offers
596 this.syncStorage.removeIncomingOffer(offererDid, offer.subjectDid);
597
598 // Run offer discovery immediately to detect mutual agreement and start replication
599 this.triggerOfferDiscovery();
600
601 return { status: "accepted" };
602 }
603
604 /**
605 * Reject an incoming offer: remove from incoming_offers without creating a reciprocal offer.
606 */
607 rejectOffer(offererDid: string): { status: "rejected" | "not_found" } {
608 const offers = this.syncStorage.getIncomingOffers();
609 const offer = offers.find((o) => o.offererDid === offererDid);
610 if (!offer) {
611 return { status: "not_found" };
612 }
613
614 this.syncStorage.removeIncomingOffer(offererDid, offer.subjectDid);
615 return { status: "rejected" };
616 }
617
618 /**
619 * Trigger offer discovery asynchronously (fire-and-forget).
620 * Used after accepting an offer or receiving a push notification
621 * to immediately detect mutual agreements.
622 */
623 triggerOfferDiscovery(): void {
624 this.runOfferDiscovery().catch((err) => {
625 console.warn("[replication] Triggered offer discovery failed:", err instanceof Error ? err.message : String(err));
626 });
627 }
628
629 /**
630 * Get all incoming offers (from other nodes wanting to replicate our data).
631 */
632 getIncomingOffers(): Array<{
633 offererDid: string;
634 subjectDid: string;
635 offererPdsEndpoint: string | null;
636 offererEndpoint: string | null;
637 minCopies: number;
638 intervalSec: number;
639 priority: number;
640 receivedAt: string;
641 }> {
642 return this.syncStorage.getIncomingOffers();
643 }
644
645 /**
646 * Get the PeerDiscovery instance (for verifying incoming offers).
647 */
648 getPeerDiscovery(): PeerDiscovery {
649 return this.peerDiscovery;
650 }
651
652 /**
653 * Get the RepoFetcher instance (for resolving PDS endpoints).
654 */
655 getRepoFetcher(): RepoFetcher {
656 return this.repoFetcher;
657 }
658
  /**
   * Remove an admin-added DID.
   * Returns an error if the DID is config-origin (only removable by editing
   * the REPLICATE_DIDS env var). With purgeData=true all associated data and
   * the manifest record are deleted; otherwise the manifest is marked
   * "paused" and data is kept.
   *
   * @param did DID to remove.
   * @param purgeData true = delete data + manifest; false = pause only.
   */
  async removeDid(did: string, purgeData: boolean = false): Promise<{
    status: "removed" | "error";
    purged: boolean;
    error?: string;
  }> {
    // Check if it's a config-origin DID (not removable via UI)
    const configPolicyId = `config:${did}`;
    if (this.policyEngine?.getStoredPolicy(configPolicyId) || this.config.REPLICATE_DIDS.includes(did)) {
      return { status: "error", purged: false, error: "Cannot remove config-origin DID. Remove from REPLICATE_DIDS env var." };
    }

    // Transition archive/p2p policies to purged state
    if (this.policyEngine) {
      const archivePolicyId = `archive:${did}`;
      this.policyEngine.transitionPolicy(archivePolicyId, "purged");
      const p2pPolicyId = `p2p:${did}`;
      this.policyEngine.transitionPolicy(p2pPolicyId, "purged");
    }

    // Dual-write: also remove from legacy table
    this.syncStorage.removeAdminDid(did);
    this.networkService.unsubscribeCommitTopics([did]);
    this.networkService.unsubscribeIdentityTopics([did]);
    this.updateFirehoseDids();

    if (purgeData) {
      // Drop every per-DID table, then the sync state row itself
      this.syncStorage.clearBlocks(did);
      this.syncStorage.clearBlobs(did);
      this.syncStorage.clearRecordPaths(did);
      this.syncStorage.clearPeerEndpoints(did);
      this.syncStorage.deleteState(did);

      // Remove manifest record
      if (this.repoManager) {
        const rkey = didToRkey(did);
        try {
          await this.repoManager.deleteRecord(MANIFEST_NSID, rkey);
        } catch {
          // Non-fatal: manifest may not exist
        }
      }
    } else {
      // Pause: update manifest status but keep data
      if (this.repoManager) {
        const rkey = didToRkey(did);
        const existing = await this.repoManager.getRecord(MANIFEST_NSID, rkey);
        if (existing) {
          const manifest: ManifestRecord = {
            ...(existing.record as ManifestRecord),
            status: "paused",
          };
          await this.repoManager.putRecord(MANIFEST_NSID, rkey, manifest);
        }
      }
    }

    return { status: "removed", purged: purgeData };
  }
722
723 /**
724 * Update firehose subscription with the current set of tracked DIDs.
725 */
726 private updateFirehoseDids(): void {
727 if (this.firehoseSubscription) {
728 this.firehoseSubscription.updateDids(new Set(this.getReplicateDids()));
729 }
730 }
731
732 /**
733 * Ensure a manifest record and sync state exist for a single DID.
734 * Extracted from syncManifests() for use by addDid().
735 */
736 private async syncManifestForDid(did: string): Promise<void> {
737 if (this.repoManager) {
738 const rkey = didToRkey(did);
739 const existing = await this.repoManager.getRecord(MANIFEST_NSID, rkey);
740
741 if (!existing) {
742 const manifest: ManifestRecord = {
743 $type: MANIFEST_NSID,
744 subject: did,
745 status: "active",
746 lastSyncRev: null,
747 lastSyncAt: null,
748 createdAt: new Date().toISOString(),
749 };
750 await this.repoManager.putRecord(MANIFEST_NSID, rkey, manifest);
751 }
752 }
753
754 const pdsEndpoint = await this.repoFetcher.resolvePds(did);
755 if (pdsEndpoint) {
756 this.syncStorage.upsertState({ did, pdsEndpoint });
757 }
758 }
759
760 /**
761 * Ensure a manifest record exists for each configured DID.
762 * Creates new manifests or updates existing ones.
763 */
764 async syncManifests(): Promise<void> {
765 for (const did of this.getReplicateDids()) {
766 await this.syncManifestForDid(did);
767 }
768 }
769
  /**
   * Run offer discovery: gather peers from sync state AND offered_dids,
   * then discover mutual agreements.
   * When a mutual agreement is detected for an offered DID,
   * promotes it to full replication via addDid() and removes it from
   * offered_dids. Conversely, P2P-origin DIDs whose remote offer has
   * disappeared are removed with a full data purge.
   * Non-fatal: errors are logged but don't block sync.
   */
  private async runOfferDiscovery(): Promise<void> {
    if (!this.offerManager) return;

    try {
      // Existing: peers from replication_state
      const states = this.syncStorage.getAllStates();
      const syncPeers = states
        .filter((s) => s.pdsEndpoint)
        .map((s) => ({ did: s.did, pdsEndpoint: s.pdsEndpoint }));

      // New: peers from offered_dids
      const offeredDids = this.syncStorage.getOfferedDids();
      const offeredPeers = offeredDids
        .filter((o) => o.pdsEndpoint)
        .map((o) => ({ did: o.did, pdsEndpoint: o.pdsEndpoint! }));

      // Merge and deduplicate by DID (sync-state entries take precedence)
      const seenDids = new Set<string>();
      const allPeers: Array<{ did: string; pdsEndpoint: string }> = [];
      for (const peer of [...syncPeers, ...offeredPeers]) {
        if (!seenDids.has(peer.did)) {
          seenDids.add(peer.did);
          allPeers.push(peer);
        }
      }

      if (allPeers.length > 0) {
        const agreements = await this.offerManager.discoverAgreements(allPeers);

        // Check if any offered DIDs now have mutual agreements
        for (const offered of offeredDids) {
          const hasAgreement = agreements.some(
            (a) => a.localOffer.subject === offered.did || a.remoteOffer.subject === offered.did,
          );
          if (hasAgreement) {
            console.log(`[replication] Mutual agreement detected for offered DID ${offered.did} — promoting to replication`);
            try {
              // addDid() starts full replication; only then drop the offer entry
              await this.addDid(offered.did);
              this.syncStorage.removeOfferedDid(offered.did);
            } catch (err) {
              console.error(
                `[replication] Failed to promote offered DID ${offered.did}:`,
                err instanceof Error ? err.message : String(err),
              );
            }
          }
        }

        // Detect broken agreements: P2P-origin DIDs whose remote offer was revoked
        const activePeerDids = new Set(agreements.map((a) => a.counterpartyDid));
        for (const state of states) {
          // Only check P2P-origin DIDs (those that were promoted from offers)
          const policyId = `p2p:${state.did}`;
          const hasP2pPolicy = this.policyEngine?.getPolicies().some((p) => p.id === policyId);
          if (hasP2pPolicy && !activePeerDids.has(state.did)) {
            console.log(`[replication] Agreement broken for ${state.did} (remote offer revoked) — removing and purging data`);
            try {
              await this.removeDid(state.did, true);
            } catch (err) {
              console.error(`[replication] Failed to remove broken-agreement DID ${state.did}:`, err instanceof Error ? err.message : String(err));
            }
          }
        }

        // Still run full discoverAndSync for sync-state peers
        await this.offerManager.discoverAndSync(syncPeers);
      }
    } catch (err) {
      console.error(
        "[replication] Offer discovery error:",
        err instanceof Error ? err.message : String(err),
      );
    }
  }
851
852 /**
853 * Discover peer endpoints by scanning manifests of known peers.
854 * For each peer whose manifest includes a DID we're tracking, record
855 * that peer as a potential fallback source for that DID.
856 */
857 private async discoverPeerEndpoints(): Promise<void> {
858 const trackedDids = new Set(this.getReplicateDids());
859 const states = this.syncStorage.getAllStates();
860
861 for (const state of states) {
862 if (!state.pdsEndpoint) continue;
863
864 try {
865 const manifests = await this.peerDiscovery.discoverManifests(
866 state.did,
867 state.pdsEndpoint,
868 );
869
870 for (const manifest of manifests) {
871 if (
872 manifest.subject &&
873 trackedDids.has(manifest.subject) &&
874 manifest.status === "active"
875 ) {
876 this.syncStorage.upsertPeerEndpoint(
877 manifest.subject,
878 state.did,
879 state.pdsEndpoint,
880 manifest.lastSyncRev ?? null,
881 );
882 }
883 }
884 } catch {
885 // Non-fatal: peer endpoint discovery is best-effort
886 }
887 }
888 }
889
890 /**
891 * Fetch a repo from known peer endpoints when the source PDS is unavailable.
892 * Tries peers in order of freshest data (highest lastSyncRev).
893 * Throws the original error if no peers succeed.
894 */
895 private async fetchFromPeersOrThrow(
896 did: string,
897 since: string | undefined,
898 originalError: Error,
899 ): Promise<Uint8Array> {
900 const peers = this.syncStorage.getPeerEndpoints(did);
901 if (peers.length === 0) throw originalError;
902
903 // Sort by lastSyncRev descending (prefer freshest data)
904 peers.sort((a, b) => {
905 if (!a.lastSyncRev && !b.lastSyncRev) return 0;
906 if (!a.lastSyncRev) return 1;
907 if (!b.lastSyncRev) return -1;
908 return b.lastSyncRev.localeCompare(a.lastSyncRev);
909 });
910
911 for (const peer of peers) {
912 try {
913 return await this.repoFetcher.fetchRepo(
914 peer.pdsEndpoint,
915 did,
916 since,
917 );
918 } catch {
919 // Try next peer
920 }
921 }
922
923 throw originalError;
924 }
925
926 /**
927 * Sync all configured DIDs.
928 *
929 * When a PolicyEngine is present:
930 * - Merges config DIDs with policy explicit DIDs
931 * - Filters out DIDs where shouldReplicate is false
932 * - Sorts by priority (highest first)
933 * - Respects per-DID sync intervals (skips DIDs not yet due)
934 */
935 async syncAll(): Promise<void> {
936 const dids = this.getReplicateDids();
937
938 // Sort by priority (highest first) when policy engine is present
939 const sortedDids = this.policyEngine
940 ? this.sortDidsByPriority(dids)
941 : dids;
942
943 this.emitProgress({ type: "sync-cycle:start", did: "*" });
944
945 for (const did of sortedDids) {
946 if (this.stopped) break;
947
948 // Skip tombstoned DIDs (handled separately by cleanupTombstonedDids)
949 const currentState = this.syncStorage.getState(did);
950 if (currentState?.status === "tombstoned") continue;
951
952 // Check per-DID interval when policy engine is present
953 if (this.policyEngine && !this.isDidDueForSync(did)) {
954 continue;
955 }
956
957 try {
958 await this.syncDid(did, "periodic");
959 // Record successful sync timestamp
960 this.lastSyncTimestamps.set(did, Date.now());
961 } catch (err) {
962 const message =
963 err instanceof Error ? err.message : String(err);
964 this.syncStorage.updateStatus(did, "error", message);
965 }
966 }
967
968 // Run deferred GC for DIDs flagged by firehose delete/update ops
969 await this.runDeferredGc();
970
971 // Clean up tombstoned DIDs after grace period
972 await this.cleanupTombstonedDids();
973
974 // Discover peer endpoints for P2P fallback
975 await this.discoverPeerEndpoints();
976
977 // Re-run offer discovery to pick up new/revoked offers
978 await this.runOfferDiscovery();
979
980 // Check if multiaddrs changed and republish peer record if needed
981 await this._checkMultiaddrsChanged();
982
983 this.emitProgress({ type: "sync-cycle:complete", did: "*" });
984 }
985
986 /**
987 * Sort DIDs by their effective policy priority (highest first).
988 * DIDs without a matching policy get priority 0.
989 */
990 private sortDidsByPriority(dids: string[]): string[] {
991 if (!this.policyEngine) return dids;
992 return [...dids].sort((a, b) => {
993 const pa = this.policyEngine!.evaluate(a).priority;
994 const pb = this.policyEngine!.evaluate(b).priority;
995 return pb - pa;
996 });
997 }
998
999 /**
1000 * Check whether a DID is due for sync based on its policy interval.
1001 * DIDs that have never been synced are always due.
1002 */
1003 private isDidDueForSync(did: string): boolean {
1004 if (!this.policyEngine) return true;
1005
1006 const lastSync = this.lastSyncTimestamps.get(did);
1007 if (lastSync === undefined) return true; // never synced
1008
1009 const config = this.policyEngine.getReplicationConfig(did);
1010 // Config DIDs without a matching policy use the default interval
1011 const intervalMs = config
1012 ? config.sync.intervalSec * 1000
1013 : DEFAULT_SYNC_INTERVAL_MS;
1014
1015 return Date.now() - lastSync >= intervalMs;
1016 }
1017
1018 /**
1019 * Get the effective sync interval for a DID in milliseconds.
1020 * Returns the policy-driven interval if a policy engine is present,
1021 * otherwise returns the default 5-minute interval.
1022 */
1023 getEffectiveSyncIntervalMs(did: string): number {
1024 if (!this.policyEngine) return DEFAULT_SYNC_INTERVAL_MS;
1025
1026 const config = this.policyEngine.getReplicationConfig(did);
1027 return config
1028 ? config.sync.intervalSec * 1000
1029 : DEFAULT_SYNC_INTERVAL_MS;
1030 }
1031
  /**
   * Sync a single DID end-to-end: resolve its PDS, fetch the repo CAR
   * (libp2p peer first, HTTP PDS second, tracked peer endpoints as a last
   * resort), store and verify blocks, reconcile orphaned blocks/blobs,
   * update the published manifest, and sync blobs.
   *
   * @param did     DID to sync.
   * @param trigger What initiated this sync (recorded on the sync event).
   * @throws Rethrows any error raised after the CAR was obtained; the sync
   *         event is completed with status "error" before rethrowing.
   *         Fetch-phase failures before the sync event starts either return
   *         early (unresolvable DID) or throw from fetchFromPeersOrThrow.
   */
  async syncDid(did: string, trigger: SyncTrigger = "unknown"): Promise<void> {
    this.syncStorage.updateStatus(did, "syncing");
    const syncStart = Date.now();
    let sourceType = "pds";

    // 1. Resolve PDS endpoint
    let state = this.syncStorage.getState(did);
    let pdsEndpoint = await this.repoFetcher.resolvePds(did);

    if (!pdsEndpoint) {
      // If we previously had a PDS endpoint, losing resolution suggests a
      // tombstoned/deleted account rather than a first-time failure.
      if (state?.pdsEndpoint) {
        this.syncStorage.markTombstoned(did);
        console.warn(`[replication] DID ${did} no longer resolvable — marked as tombstoned`);
      } else {
        this.syncStorage.updateStatus(did, "error", "Could not resolve PDS endpoint");
      }
      return;
    }

    // Update PDS endpoint if it changed, then re-read state to pick it up.
    this.syncStorage.upsertState({ did, pdsEndpoint });
    state = this.syncStorage.getState(did);

    // 2. Optionally refresh peer info (peerId + multiaddrs)
    if (state && this.shouldRefreshPeerInfo(state)) {
      try {
        const peerInfo = await this.peerDiscovery.discoverPeer(did);
        if (peerInfo) {
          // Detect PeerID changes (peer restarted with a new identity) —
          // logged only; the new identity is stored below regardless.
          if (state.peerId && peerInfo.peerId && state.peerId !== peerInfo.peerId) {
            console.warn(
              `[replication] PeerID changed for ${did}: ${state.peerId} → ${peerInfo.peerId}`,
            );
          }
          this.syncStorage.updatePeerInfo(did, peerInfo.peerId, peerInfo.multiaddrs);
          // Merge observed multiaddrs from active libp2p connections; only
          // re-persist when the merge actually added addresses.
          if (peerInfo.peerId) {
            const observed = this.networkService.getRemoteAddrs(peerInfo.peerId);
            if (observed.length > 0) {
              const merged = [...new Set([...peerInfo.multiaddrs, ...observed])];
              if (merged.length > peerInfo.multiaddrs.length) {
                this.syncStorage.updatePeerInfo(did, peerInfo.peerId, merged);
              }
            }
          }
        }
      } catch {
        // Non-fatal: peer discovery is optional
      }
    }

    // 3. Fetch repo (incremental when we have a previous rev to diff from)
    const since = state?.lastSyncRev ?? undefined;

    // 3a. Try libp2p peer-first sync if we have peer info
    let carBytes: Uint8Array | null = null;
    state = this.syncStorage.getState(did); // re-read for fresh peer info
    if (this.libp2p && state?.peerId && state.peerMultiaddrs && state.peerMultiaddrs.length > 0) {
      try {
        console.log(`[sync] ${did} — fetching repo from peer ${state.peerId.slice(0, 12)}… via libp2p`);
        carBytes = await fetchRepoFromPeer(
          this.libp2p,
          state.peerMultiaddrs,
          did,
          since,
        );
        sourceType = "libp2p";
        console.log(`[sync] ${did} — received ${(carBytes.length / 1024).toFixed(1)} KB from peer via libp2p`);
      } catch (err) {
        console.log(
          `[sync] ${did} — libp2p sync failed, falling back to HTTP: ${err instanceof Error ? err.message : String(err)}`,
        );
        carBytes = null; // force the HTTP path below
      }
    }

    // 3b. Fall back to HTTP PDS fetch
    if (!carBytes) {
      sourceType = "pds";
      console.log(`[sync] ${did} — fetching repo from ${pdsEndpoint}${since ? ` (since: ${since.slice(0, 8)}…)` : ""}`);
      try {
        carBytes = await this.repoFetcher.fetchRepo(
          pdsEndpoint,
          did,
          since,
        );
      } catch (sourceErr) {
        // On failure, clear cached peer info and kick off re-discovery in
        // the background (fire-and-forget; errors intentionally dropped).
        this.syncStorage.clearPeerInfo(did);
        this.peerDiscovery.discoverPeer(did).then((peerInfo) => {
          if (peerInfo) {
            this.syncStorage.updatePeerInfo(did, peerInfo.peerId, peerInfo.multiaddrs);
          }
        }).catch(() => {});
        const err = sourceErr instanceof Error ? sourceErr : new Error(String(sourceErr));
        if (since) {
          // Incremental fetch failed: retry a FULL fetch from the source
          // (the incremental window may be gone), then fall back to peers.
          try {
            carBytes = await this.repoFetcher.fetchRepo(pdsEndpoint, did);
          } catch {
            sourceType = "peer_fallback";
            carBytes = await this.fetchFromPeersOrThrow(did, since, err);
          }
        } else {
          sourceType = "peer_fallback";
          carBytes = await this.fetchFromPeersOrThrow(did, undefined, err);
        }
      }
    }

    // Start the sync event only after the source is determined, so the
    // recorded sourceType reflects where the CAR actually came from.
    this.emitProgress({ type: "sync:start", did, sourceType });
    const syncEventId = this.syncStorage.startSyncEvent(did, sourceType, trigger);

    try {
      // 4. Parse CAR and store blocks
      const carSizeKb = (carBytes.length / 1024).toFixed(1);
      console.log(`[sync] ${did} — CAR received: ${carSizeKb} KB, parsing...`);
      this.emitProgress({ type: "sync:car-received", did, carBytes: carBytes.length });
      const { root, blocks } = await readCarWithRoot(carBytes);

      await this.blockStore.putBlocks(blocks);

      // 5. Collect CID strings + sizes for DHT announcement + verification.
      // NOTE(review): this reaches into the BlockMap's internal `map`
      // property — an implementation detail of @atproto/repo; confirm it
      // survives library upgrades.
      const cidStrs: string[] = [];
      const blockEntries: Array<{ cid: string; sizeBytes: number }> = [];
      const internalMap = (
        blocks as unknown as { map: Map<string, Uint8Array> }
      ).map;
      if (internalMap) {
        for (const [cidStr, blockBytes] of internalMap.entries()) {
          cidStrs.push(cidStr);
          blockEntries.push({ cid: cidStr, sizeBytes: blockBytes.length });
        }
      }

      console.log(`[sync] ${did} — stored ${blockEntries.length} blocks, verifying...`);

      // 5b. Track block CIDs with sizes for remote verification
      if (blockEntries.length > 0) {
        this.syncStorage.trackBlocksWithSize(did, blockEntries);
      }

      const totalBlockSizeKb = blockEntries.reduce((s, b) => s + b.sizeBytes, 0) / 1024;
      this.emitProgress({ type: "sync:blocks-stored", did, blocksStored: blockEntries.length, blocksSizeKb: Math.round(totalBlockSizeKb) });

      // 6. Announce to DHT (fire-and-forget)
      this.networkService.provideBlocks(cidStrs).catch(() => {});

      // 7. Verify local block availability (missing blocks are logged but
      // do not fail the sync)
      const verification =
        await this.verifier.verifyBlockAvailability(cidStrs);
      if (verification.missing.length > 0) {
        console.warn(
          `Local verification: ${verification.missing.length}/${verification.checked} blocks missing for ${did}`,
        );
      }
      this.syncStorage.updateVerifiedAt(did);
      this.emitProgress({ type: "sync:verified", did, missingBlocks: verification.missing.length });

      // 8. Extract the actual rev from the commit block; if the commit
      // cannot be decoded, the root CID string doubles as the rev.
      const rootCidStr = root.toString();
      let rev = rootCidStr; // fallback
      const commitBytes = internalMap?.get(rootCidStr);
      if (commitBytes) {
        try {
          const commitObj = cborDecode(commitBytes) as Record<string, unknown>;
          if (typeof commitObj.rev === "string") {
            rev = commitObj.rev;
          }
        } catch {
          // If CBOR decode fails, fall back to root CID as rev
        }
      }

      // 9. Update sync state with both rev and root CID
      this.syncStorage.updateSyncProgress(did, rev, rootCidStr);

      // 9a. Publish gossipsub notification (fire-and-forget)
      this.networkService.publishCommitNotification(did, rootCidStr, rev).catch(() => {});

      // 9b. Invalidate cached ReadableRepo so it reloads with new root
      this.replicatedRepoReader?.invalidateCache(did);

      // 9c. Track record paths for challenge generation (full MST walk),
      // replacing the previous path set wholesale.
      try {
        const recordPaths = await extractAllRecordPaths(
          this.blockStore,
          rootCidStr,
        );
        this.syncStorage.clearRecordPaths(did);
        this.syncStorage.trackRecordPaths(did, recordPaths);
        // Update lexicon index with NSIDs from these paths
        const nsids = extractNsids(recordPaths);
        if (nsids.length > 0) {
          this.syncStorage.updateLexiconIndex(nsids);
        }
      } catch {
        // Non-fatal: path extraction is best-effort
      }

      // 9d. Reconcile blocks: remove orphaned blocks from this DID's tracking
      try {
        const liveCids = await extractAllCids(this.blockStore, rootCidStr);
        const trackedCids = this.syncStorage.getBlockCidSet(did);

        // Find CIDs that are tracked but no longer reachable from the root
        const orphanedFromDid: string[] = [];
        for (const cid of trackedCids) {
          if (!liveCids.has(cid)) {
            orphanedFromDid.push(cid);
          }
        }

        if (orphanedFromDid.length > 0) {
          // Remove from this DID's tracking
          this.syncStorage.removeBlocks(did, orphanedFromDid);

          // Only delete blocks no other DID still references
          const trulyOrphaned = this.syncStorage.findOrphanedCids(orphanedFromDid);

          // Delete from blockstore
          for (const cid of trulyOrphaned) {
            await this.blockStore.deleteBlock(cid);
          }
        }

        // Clear the needs_gc flag since we just did a full reconciliation
        this.syncStorage.clearNeedsGc(did);
      } catch {
        // Non-fatal: GC is best-effort
      }

      // 9e. Reconcile blobs: remove blobs referenced only by deleted records
      try {
        const currentBlobCids = new Set<string>();
        if (this.replicatedRepoReader) {
          const repo = await this.replicatedRepoReader.getRepo(did);
          if (repo) {
            // Walk every record and collect all blob CIDs still in use
            for await (const entry of repo.walkRecords()) {
              const cids = extractBlobCids(entry.record);
              for (const cid of cids) {
                currentBlobCids.add(cid);
              }
            }
          }
        }

        const trackedBlobCids = this.syncStorage.getBlobCids(did);
        const orphanedBlobs = trackedBlobCids.filter(cid => !currentBlobCids.has(cid));

        if (orphanedBlobs.length > 0) {
          this.syncStorage.removeBlobs(did, orphanedBlobs);
          // As with blocks, only delete blobs no other DID references
          const trulyOrphanedBlobs = this.syncStorage.findOrphanedBlobCids(orphanedBlobs);
          for (const cid of trulyOrphanedBlobs) {
            await this.blockStore.deleteBlock(cid);
          }
        }
      } catch {
        // Non-fatal: blob GC is best-effort
      }

      // 10. Update manifest record — only when one already exists; initial
      // manifest creation happens elsewhere.
      if (this.repoManager) {
        const rkey = didToRkey(did);
        const existingManifest = await this.repoManager.getRecord(
          MANIFEST_NSID,
          rkey,
        );
        if (existingManifest) {
          const manifest: ManifestRecord = {
            $type: MANIFEST_NSID,
            subject: did,
            status: "active",
            lastSyncRev: rev,
            lastSyncAt: new Date().toISOString(),
            createdAt:
              (existingManifest.record as Record<string, unknown>)
                ?.createdAt as string ?? new Date().toISOString(),
          };
          await this.repoManager.putRecord(MANIFEST_NSID, rkey, manifest);
        }
      }

      // 11. Sync blobs and capture result
      console.log(`[sync] ${did} — syncing blobs...`);
      const blobResult = await this.syncBlobs(did);

      // 12. Complete sync event with metrics
      const totalDuration = ((Date.now() - syncStart) / 1000).toFixed(1);
      const totalBlobKb = (blobResult.totalBytes / 1024).toFixed(1);
      console.log(
        `[sync] ${did} — complete: ${blockEntries.length} blocks, ${blobResult.fetched} blobs (${totalBlobKb} KB), ${totalDuration}s`,
      );
      this.syncStorage.completeSyncEvent(syncEventId, {
        status: "success",
        blocksAdded: blockEntries.length,
        blobsAdded: blobResult.fetched,
        carBytes: carBytes.length,
        blobBytes: blobResult.totalBytes,
        durationMs: Date.now() - syncStart,
        rev,
        rootCid: rootCidStr,
        incremental: !!since,
      });
      this.emitProgress({
        type: "sync:complete",
        did,
        blocksStored: blockEntries.length,
        blobsFetched: blobResult.fetched,
        blobBytes: blobResult.totalBytes,
        durationMs: Date.now() - syncStart,
      });
    } catch (err) {
      // Record the failure on the sync event, notify listeners, and rethrow
      // so the caller can update the DID's status.
      this.syncStorage.completeSyncEvent(syncEventId, {
        status: "error",
        errorMessage: err instanceof Error ? err.message : String(err),
        durationMs: Date.now() - syncStart,
        incremental: !!since,
      });
      this.emitProgress({
        type: "sync:error",
        did,
        error: err instanceof Error ? err.message : String(err),
      });
      throw err;
    }
  }
1364
  /**
   * Grace period before purging tombstoned DID data (24 hours).
   * Measured against the state's lastSyncAt in cleanupTombstonedDids().
   */
  private static readonly TOMBSTONE_GRACE_MS = 24 * 60 * 60 * 1000;
1367
  /**
   * Clean up tombstoned DIDs: re-verify resolution, purge if still dead.
   * Only purges after a grace period (24 hours) to avoid premature deletion
   * caused by transient DID resolution failures. A DID that resolves again
   * is un-tombstoned and resynced instead of purged.
   */
  private async cleanupTombstonedDids(): Promise<void> {
    const states = this.syncStorage.getAllStates();
    const tombstoned = states.filter((s) => s.status === "tombstoned");

    for (const state of tombstoned) {
      if (this.stopped) break;

      // Grace period: only purge if tombstoned for > 24 hours.
      // NOTE(review): the grace window is measured from lastSyncAt, not the
      // tombstone time; a state with no lastSyncAt yields 0 and is purged
      // immediately — confirm this is intended.
      const lastSyncAt = state.lastSyncAt ? new Date(state.lastSyncAt).getTime() : 0;
      const timeSinceLastSync = Date.now() - lastSyncAt;
      if (timeSinceLastSync < ReplicationManager.TOMBSTONE_GRACE_MS) {
        continue;
      }

      // Re-verify: try to resolve the DID again
      try {
        const pdsEndpoint = await this.repoFetcher.resolvePds(state.did);
        if (pdsEndpoint) {
          // DID is alive again! Un-tombstone and resume; the resync runs in
          // the background (fire-and-forget) with its own error capture.
          this.syncStorage.updateStatus(state.did, "pending");
          this.syncStorage.upsertState({ did: state.did, pdsEndpoint });
          console.log(`[replication] Tombstoned DID ${state.did} is alive again, resuming`);
          this.syncDid(state.did, "tombstone-recovery").catch((err) => {
            const message = err instanceof Error ? err.message : String(err);
            this.syncStorage.updateStatus(state.did, "error", message);
          });
          continue;
        }
      } catch {
        // Resolution failed, proceed with purge
      }

      // Still dead — purge tracking data, then physically delete whatever
      // no other DID still references.
      console.warn(`[replication] Purging data for tombstoned DID ${state.did}`);
      const { blocksRemoved, blobsRemoved } = this.syncStorage.purgeDidData(state.did);

      // Delete truly orphaned blocks/blobs from blockstore
      if (blocksRemoved.length > 0) {
        const orphanedBlocks = this.syncStorage.findOrphanedCids(blocksRemoved);
        for (const cid of orphanedBlocks) {
          await this.blockStore.deleteBlock(cid);
        }
      }
      if (blobsRemoved.length > 0) {
        const orphanedBlobs = this.syncStorage.findOrphanedBlobCids(blobsRemoved);
        for (const cid of orphanedBlobs) {
          await this.blockStore.deleteBlock(cid);
        }
      }

      // Remove our published manifest record for the dead DID
      if (this.repoManager) {
        const rkey = didToRkey(state.did);
        try {
          await this.repoManager.deleteRecord(MANIFEST_NSID, rkey);
        } catch {
          // Non-fatal
        }
      }
    }
  }
1434
1435 /**
1436 * Run deferred GC for DIDs that were flagged by firehose delete/update ops.
1437 * Triggers a full sync for each flagged DID, which includes block/blob reconciliation.
1438 */
1439 private async runDeferredGc(): Promise<void> {
1440 const didsNeedingGc = this.syncStorage.getDidsNeedingGc();
1441 for (const did of didsNeedingGc) {
1442 if (this.stopped) break;
1443 try {
1444 await this.syncDid(did, "gc");
1445 this.lastSyncTimestamps.set(did, Date.now());
1446 } catch (err) {
1447 const message = err instanceof Error ? err.message : String(err);
1448 this.syncStorage.updateStatus(did, "error", message);
1449 }
1450 }
1451 }
1452
1453 /**
1454 * Compute the tick interval for periodic sync.
1455 *
1456 * When a PolicyEngine is present, uses the minimum sync interval
1457 * across all DIDs so that no DID misses its window. Each DID's
1458 * individual interval is checked inside syncAll().
1459 *
1460 * Falls back to the provided intervalMs (default 5 minutes).
1461 */
1462 private computeTickIntervalMs(fallbackMs: number): number {
1463 if (!this.policyEngine) return fallbackMs;
1464
1465 const dids = this.getReplicateDids();
1466 if (dids.length === 0) return fallbackMs;
1467
1468 let minInterval = Infinity;
1469 for (const did of dids) {
1470 const config = this.policyEngine.getReplicationConfig(did);
1471 const intervalMs = config
1472 ? config.sync.intervalSec * 1000
1473 : DEFAULT_SYNC_INTERVAL_MS;
1474 if (intervalMs < minInterval) {
1475 minInterval = intervalMs;
1476 }
1477 }
1478
1479 // Clamp to minimum tick interval to prevent excessive polling
1480 return Math.max(
1481 minInterval === Infinity ? fallbackMs : minInterval,
1482 MIN_TICK_INTERVAL_MS,
1483 );
1484 }
1485
1486 /**
1487 * Start periodic sync and verification at their respective intervals.
1488 *
1489 * When a PolicyEngine is present, the tick rate is derived from the
1490 * minimum sync interval across all DIDs. Each tick, syncAll() checks
1491 * which DIDs are actually due for sync based on their individual intervals.
1492 */
1493 startPeriodicSync(intervalMs: number = 5 * 60 * 1000): void {
1494 if (this.syncTimer) return;
1495 this.stopped = false;
1496
1497 const tickMs = this.computeTickIntervalMs(intervalMs);
1498
1499 // Run first sync after a short delay to let startup complete
1500 setTimeout(() => {
1501 if (!this.stopped) {
1502 this.syncAll().catch((err) => {
1503 console.error("Periodic sync error:", err);
1504 });
1505 }
1506 }, 5000);
1507
1508 this.syncTimer = setInterval(() => {
1509 if (!this.stopped) {
1510 this.syncAll().catch((err) => {
1511 console.error("Periodic sync error:", err);
1512 });
1513 }
1514 }, tickMs);
1515
1516 // Start periodic PLC mirror refresh (6 hours)
1517 const PLC_REFRESH_MS = 6 * 60 * 60 * 1000;
1518 this.plcRefreshTimer = setInterval(() => {
1519 if (!this.stopped) {
1520 this.refreshAllPlcLogs().catch((err) => {
1521 console.error("[replication] PLC mirror refresh error:", err);
1522 });
1523 }
1524 }, PLC_REFRESH_MS);
1525
1526 // Initial PLC log fetch for all tracked did:plc DIDs (fire-and-forget)
1527 this.refreshAllPlcLogs().catch((err) => {
1528 console.error("[replication] Initial PLC mirror fetch error:", err);
1529 });
1530
1531 // Build lexicon index from existing record paths
1532 try {
1533 this.syncStorage.rebuildLexiconIndex();
1534 } catch (err) {
1535 console.error("[replication] Lexicon index rebuild error:", err);
1536 }
1537
1538 // Run verification once on startup, then on a timer
1539 this.runVerification().catch((err) => {
1540 console.error("Initial verification error:", err);
1541 });
1542 this.verificationTimer = setInterval(() => {
1543 if (!this.stopped) {
1544 this.runVerification().catch((err) => {
1545 console.error("Periodic verification error:", err);
1546 });
1547 }
1548 }, this.verificationConfig.verificationIntervalMs);
1549 }
1550
1551 /**
1552 * Start the firehose subscription for real-time updates.
1553 * The firehose provides streaming updates for serviced DIDs,
1554 * complementing the periodic polling sync.
1555 */
1556 startFirehose(): void {
1557 if (this.firehoseSubscription) return;
1558 if (!this.config.FIREHOSE_ENABLED) return;
1559
1560 const replicateDids = this.getReplicateDids();
1561 if (replicateDids.length === 0) return;
1562
1563 this.firehoseSubscription = new FirehoseSubscription({
1564 firehoseUrl: this.config.FIREHOSE_URL,
1565 });
1566
1567 // Register handler for commit events
1568 this.firehoseSubscription.onCommit((event) => {
1569 this.handleFirehoseCommit(event).catch((err) => {
1570 console.error(`[replication] Firehose commit handler error for ${event.repo}:`, err);
1571 });
1572 });
1573
1574 // Register handler for account status events (tombstone/deactivation)
1575 this.firehoseSubscription.onAccount((event) => {
1576 this.handleFirehoseAccount(event).catch((err) => {
1577 console.error(`[replication] Firehose account handler error for ${event.did}:`, err);
1578 });
1579 });
1580
1581 // Load saved cursor for resumption
1582 const savedCursor = this.syncStorage.getFirehoseCursor();
1583
1584 // Start with the merged set of DIDs
1585 const dids = new Set(replicateDids);
1586 this.firehoseSubscription.start(dids, savedCursor);
1587
1588 // Periodically save the cursor to SQLite (every 30 seconds)
1589 this.firehoseCursorSaveTimer = setInterval(() => {
1590 this.saveFirehoseCursor();
1591 }, 30_000);
1592
1593 console.log(
1594 `[replication] Firehose subscription started` +
1595 (savedCursor !== null ? ` (resuming from cursor ${savedCursor})` : "") +
1596 ` — tracking ${dids.size} DIDs`,
1597 );
1598 }
1599
1600 /**
1601 * Handle a commit event from the firehose.
1602 *
1603 * Optimization: the firehose event already contains the blocks (as CAR bytes)
1604 * and the commit rev/CID. We apply them directly to our blockstore, avoiding
1605 * an HTTP round-trip to the source PDS.
1606 *
1607 * Falls back to full syncDid() if incremental apply fails (e.g., tooBig event,
1608 * empty blocks, gap in sequence, or CAR parsing error).
1609 */
1610 private async handleFirehoseCommit(event: FirehoseCommitEvent): Promise<void> {
1611 const did = event.repo;
1612
1613 // Only process if this DID is one we are tracking
1614 const replicateDids = this.getReplicateDids();
1615 if (!replicateDids.includes(did)) return;
1616
1617 // Determine if we should attempt incremental apply or fall back to full sync.
1618 // Fall back when:
1619 // - tooBig is set (blocks were too large to include in the event)
1620 // - rebase occurred (repo structure changed, need full sync)
1621 // - blocks are empty (nothing to apply)
1622 if (event.tooBig || event.rebase || event.blocks.length === 0) {
1623 try {
1624 await this.syncDid(did, "firehose-resync");
1625 this.lastSyncTimestamps.set(did, Date.now());
1626 } catch (err) {
1627 const message = err instanceof Error ? err.message : String(err);
1628 this.syncStorage.updateStatus(did, "error", message);
1629 }
1630 return;
1631 }
1632
1633 // Check for sequence gaps: if we have a `since` (previous rev) in the event
1634 // and it doesn't match our last sync rev, we may have missed events.
1635 const state = this.syncStorage.getState(did);
1636 if (event.since !== null && state !== null && state.lastSyncRev !== null) {
1637 if (state.lastSyncRev !== event.since) {
1638 // Gap detected: our last known rev doesn't match the event's `since`.
1639 // Fall back to full sync to catch up.
1640 console.warn(
1641 `[replication] Gap detected for ${did}: local rev=${state.lastSyncRev}, event.since=${event.since}. Falling back to full sync.`,
1642 );
1643 try {
1644 await this.syncDid(did, "firehose-resync");
1645 this.lastSyncTimestamps.set(did, Date.now());
1646 } catch (err) {
1647 const message = err instanceof Error ? err.message : String(err);
1648 this.syncStorage.updateStatus(did, "error", message);
1649 }
1650 return;
1651 }
1652 }
1653
1654 // Attempt incremental block application from firehose event
1655 try {
1656 await this.applyFirehoseBlocks(did, event);
1657 this.lastSyncTimestamps.set(did, Date.now());
1658 } catch (err) {
1659 // Incremental apply failed — fall back to full sync
1660 console.warn(
1661 `[replication] Incremental apply failed for ${did}, falling back to full sync:`,
1662 err instanceof Error ? err.message : String(err),
1663 );
1664 try {
1665 await this.syncDid(did, "firehose-resync");
1666 this.lastSyncTimestamps.set(did, Date.now());
1667 } catch (syncErr) {
1668 const message = syncErr instanceof Error ? syncErr.message : String(syncErr);
1669 this.syncStorage.updateStatus(did, "error", message);
1670 }
1671 }
1672 }
1673
1674 /**
1675 * Handle an account status event from the firehose.
1676 * Detects tombstoned/deactivated accounts and marks them accordingly.
1677 * Handles re-activation by clearing tombstone and triggering sync.
1678 */
1679 private async handleFirehoseAccount(event: FirehoseAccountEvent): Promise<void> {
1680 const did = event.did;
1681
1682 // Only process DIDs we are tracking
1683 const replicateDids = this.getReplicateDids();
1684 if (!replicateDids.includes(did)) return;
1685
1686 if (!event.active || event.status === "deleted" || event.status === "takendown") {
1687 // Refresh PLC log to capture tombstone operation
1688 if (did.startsWith("did:plc:")) {
1689 this.fetchPlcLog(did).catch((err) => {
1690 console.warn(`[replication] PLC log refresh on tombstone for ${did} failed:`, err instanceof Error ? err.message : String(err));
1691 });
1692 }
1693
1694 // Mark as tombstoned
1695 this.syncStorage.markTombstoned(did);
1696
1697 // Update manifest to paused
1698 if (this.repoManager) {
1699 const rkey = didToRkey(did);
1700 const existing = await this.repoManager.getRecord(MANIFEST_NSID, rkey);
1701 if (existing) {
1702 const manifest: ManifestRecord = {
1703 ...(existing.record as ManifestRecord),
1704 status: "paused",
1705 };
1706 await this.repoManager.putRecord(MANIFEST_NSID, rkey, manifest);
1707 }
1708 }
1709
1710 console.warn(
1711 `[replication] Account ${did} deactivated/tombstoned (status=${event.status}, active=${event.active})`,
1712 );
1713 } else if (event.active) {
1714 // Re-activated: clear tombstone and trigger full sync
1715 const state = this.syncStorage.getState(did);
1716 if (state?.status === "tombstoned") {
1717 this.syncStorage.updateStatus(did, "pending");
1718 console.log(`[replication] Account ${did} re-activated, triggering sync`);
1719 this.syncDid(did, "tombstone-recovery").catch((err) => {
1720 const message = err instanceof Error ? err.message : String(err);
1721 this.syncStorage.updateStatus(did, "error", message);
1722 });
1723 // Refresh PLC log on reactivation
1724 if (did.startsWith("did:plc:")) {
1725 this.fetchPlcLog(did).catch((err) => {
1726 console.warn(`[replication] PLC log refresh on reactivation for ${did} failed:`, err instanceof Error ? err.message : String(err));
1727 });
1728 }
1729 }
1730 }
1731 }
1732
1733 /**
1734 * Apply blocks from a firehose commit event directly to the blockstore.
1735 * The event's `blocks` field contains CAR-encoded bytes that can be parsed
1736 * and stored without fetching from the source PDS.
1737 */
1738 private async applyFirehoseBlocks(
1739 did: string,
1740 event: FirehoseCommitEvent,
1741 ): Promise<void> {
1742 const syncStart = Date.now();
1743 const syncEventId = this.syncStorage.startSyncEvent(did, "firehose", "firehose");
1744
1745 try {
1746 // 1. Parse the CAR bytes from the firehose event
1747 const { root, blocks } = await readCarWithRoot(event.blocks);
1748
1749 // 2. Store blocks in our blockstore
1750 await this.blockStore.putBlocks(blocks);
1751
1752 // 3. Collect CID strings + sizes for DHT announcement + block tracking
1753 const cidStrs: string[] = [];
1754 const blockEntries: Array<{ cid: string; sizeBytes: number }> = [];
1755 const internalMap = (
1756 blocks as unknown as { map: Map<string, Uint8Array> }
1757 ).map;
1758 if (internalMap) {
1759 for (const [cidStr, blockBytes] of internalMap.entries()) {
1760 cidStrs.push(cidStr);
1761 blockEntries.push({ cid: cidStr, sizeBytes: blockBytes.length });
1762 }
1763 }
1764
1765 // 4. Track block CIDs with sizes
1766 if (blockEntries.length > 0) {
1767 this.syncStorage.trackBlocksWithSize(did, blockEntries);
1768 }
1769
1770 const fhBlockSizeKb = blockEntries.reduce((s, b) => s + b.sizeBytes, 0) / 1024;
1771 this.emitProgress({ type: "sync:blocks-stored", did, blocksStored: blockEntries.length, blocksSizeKb: Math.round(fhBlockSizeKb) });
1772
1773 // 5. Announce to DHT (fire-and-forget)
1774 this.networkService.provideBlocks(cidStrs).catch(() => {});
1775
1776 // 6. Determine rev and root CID
1777 const rootCidStr = root.toString();
1778 let rev = event.rev; // Use the rev directly from the firehose event
1779
1780 // If no rev in the event, try to extract from commit block
1781 if (!rev) {
1782 const commitBytes = internalMap?.get(rootCidStr);
1783 if (commitBytes) {
1784 try {
1785 const commitObj = cborDecode(commitBytes) as Record<string, unknown>;
1786 if (typeof commitObj.rev === "string") {
1787 rev = commitObj.rev;
1788 }
1789 } catch {
1790 // Fall back to root CID as rev
1791 rev = rootCidStr;
1792 }
1793 } else {
1794 rev = rootCidStr;
1795 }
1796 }
1797
1798 // 7. Update sync state
1799 this.syncStorage.updateSyncProgress(did, rev, rootCidStr);
1800
1801 // 7a. Publish gossipsub notification (fire-and-forget)
1802 this.networkService.publishCommitNotification(did, rootCidStr, rev).catch(() => {});
1803
1804 // 8. Invalidate cached ReadableRepo so it reloads with new root
1805 this.replicatedRepoReader?.invalidateCache(did);
1806
1807 // 8b. Track record paths incrementally from firehose ops
1808 try {
1809 const createdPaths = event.ops
1810 .filter((op) => op.action === "create" || op.action === "update")
1811 .map((op) => op.path);
1812 const deletedPaths = event.ops
1813 .filter((op) => op.action === "delete")
1814 .map((op) => op.path);
1815 if (createdPaths.length > 0) {
1816 this.syncStorage.trackRecordPaths(did, createdPaths);
1817 // Update lexicon index with NSIDs from created paths
1818 const nsids = extractNsids(createdPaths);
1819 if (nsids.length > 0) {
1820 this.syncStorage.updateLexiconIndex(nsids);
1821 }
1822 }
1823 if (deletedPaths.length > 0) {
1824 this.syncStorage.removeRecordPaths(did, deletedPaths);
1825 }
1826
1827 // Flag for deferred GC if deletes or updates occurred
1828 // (updates may orphan old MST nodes/record blocks)
1829 const hasDeletesOrUpdates = event.ops.some(
1830 (op) => op.action === "delete" || op.action === "update",
1831 );
1832 if (hasDeletesOrUpdates) {
1833 this.syncStorage.setNeedsGc(did);
1834 }
1835 } catch {
1836 // Non-fatal: path tracking is best-effort
1837 }
1838
1839 // 9. Update manifest record
1840 if (this.repoManager) {
1841 const rkey = didToRkey(did);
1842 const existingManifest = await this.repoManager.getRecord(
1843 MANIFEST_NSID,
1844 rkey,
1845 );
1846 if (existingManifest) {
1847 const manifest: ManifestRecord = {
1848 $type: MANIFEST_NSID,
1849 subject: did,
1850 status: "active",
1851 lastSyncRev: rev,
1852 lastSyncAt: new Date().toISOString(),
1853 createdAt:
1854 (existingManifest.record as Record<string, unknown>)
1855 ?.createdAt as string ?? new Date().toISOString(),
1856 };
1857 await this.repoManager.putRecord(MANIFEST_NSID, rkey, manifest);
1858 }
1859 }
1860
1861 // 10. Sync blobs for firehose ops (fire-and-forget)
1862 const pdsEndpoint = this.syncStorage.getState(did)?.pdsEndpoint;
1863 if (pdsEndpoint) {
1864 this.syncBlobsForOps(did, pdsEndpoint, event.ops).catch((err) => {
1865 console.warn(
1866 `[replication] Blob sync error for firehose ops (${did}):`,
1867 err instanceof Error ? err.message : String(err),
1868 );
1869 });
1870 }
1871
1872 // 11. Complete sync event
1873 this.syncStorage.completeSyncEvent(syncEventId, {
1874 status: "success",
1875 blocksAdded: blockEntries.length,
1876 carBytes: event.blocks.length,
1877 durationMs: Date.now() - syncStart,
1878 rev,
1879 rootCid: rootCidStr,
1880 incremental: true,
1881 });
1882 this.emitProgress({
1883 type: "sync:complete",
1884 did,
1885 blocksStored: blockEntries.length,
1886 durationMs: Date.now() - syncStart,
1887 });
1888 } catch (err) {
1889 this.syncStorage.completeSyncEvent(syncEventId, {
1890 status: "error",
1891 errorMessage: err instanceof Error ? err.message : String(err),
1892 durationMs: Date.now() - syncStart,
1893 incremental: true,
1894 });
1895 this.emitProgress({
1896 type: "sync:error",
1897 did,
1898 error: err instanceof Error ? err.message : String(err),
1899 });
1900 throw err;
1901 }
1902 }
1903
1904 /**
1905 * Save the current firehose cursor to persistent storage.
1906 */
1907 private saveFirehoseCursor(): void {
1908 if (!this.firehoseSubscription) return;
1909 const cursor = this.firehoseSubscription.getCursor();
1910 if (cursor !== null) {
1911 this.syncStorage.saveFirehoseCursor(cursor);
1912 }
1913 }
1914
1915 /**
1916 * Get firehose subscription statistics.
1917 */
1918 getFirehoseStats(): {
1919 connected: boolean;
1920 cursor: number | null;
1921 eventsReceived: number;
1922 eventsProcessed: number;
1923 trackedDids: number;
1924 } | null {
1925 if (!this.firehoseSubscription) return null;
1926 return this.firehoseSubscription.getStats();
1927 }
1928
1929 /**
1930 * Set up gossipsub subscription for commit notifications.
1931 * Subscribes to topics for all tracked DIDs and registers a handler
1932 * that triggers syncDid() when a newer rev is received.
1933 */
1934 private setupGossipsubSubscription(): void {
1935 const dids = this.getReplicateDids();
1936 if (dids.length === 0) return;
1937
1938 this.networkService.onCommitNotification((notification) => {
1939 this.handleGossipsubNotification(notification).catch((err) => {
1940 console.error(
1941 `[replication] Gossipsub notification handler error for ${notification.did}:`,
1942 err instanceof Error ? err.message : String(err),
1943 );
1944 });
1945 });
1946
1947 this.networkService.onIdentityNotification((notification) => {
1948 this.handleIdentityNotification(notification);
1949 });
1950
1951 this.networkService.subscribeCommitTopics(dids);
1952 this.networkService.subscribeIdentityTopics(dids);
1953
1954 // Periodically clear the dedup set to prevent unbounded growth
1955 this.notificationCleanupTimer = setInterval(() => {
1956 this.recentNotifications.clear();
1957 }, 60_000);
1958 }
1959
1960 /**
1961 * Handle an incoming gossipsub commit notification.
1962 * Deduplicates by did:rev, skips if local state is already current,
1963 * and triggers syncDid() if the notification represents a newer commit.
1964 */
1965 private async handleGossipsubNotification(notification: CommitNotification): Promise<void> {
1966 const did = notification.did;
1967
1968 // Only process DIDs we are tracking
1969 const trackedDids = this.getReplicateDids();
1970 if (!trackedDids.includes(did)) return;
1971
1972 // Dedup: skip if we've already processed this did:rev
1973 const dedupKey = `${did}:${notification.rev}`;
1974 if (this.recentNotifications.has(dedupKey)) return;
1975 this.recentNotifications.add(dedupKey);
1976
1977 // Skip if local state already matches this rev
1978 const state = this.syncStorage.getState(did);
1979 if (state?.lastSyncRev === notification.rev) return;
1980
1981 try {
1982 await this.syncDid(did, "gossipsub");
1983 this.lastSyncTimestamps.set(did, Date.now());
1984 } catch (err) {
1985 const message = err instanceof Error ? err.message : String(err);
1986 this.syncStorage.updateStatus(did, "error", message);
1987 }
1988 }
1989
1990 /**
1991 * Handle an incoming gossipsub identity notification.
1992 * Updates peer info immediately if the DID is tracked.
1993 */
1994 private handleIdentityNotification(notification: IdentityNotification): void {
1995 const did = notification.did;
1996
1997 // Only process DIDs we are tracking
1998 const trackedDids = this.getReplicateDids();
1999 if (!trackedDids.includes(did)) return;
2000
2001 // Log PeerID changes
2002 const state = this.syncStorage.getState(did);
2003 if (state?.peerId && state.peerId !== notification.peerId) {
2004 console.warn(
2005 `[replication] Identity notification: PeerID changed for ${did}: ${state.peerId} → ${notification.peerId}`,
2006 );
2007 }
2008
2009 // Update peer info immediately
2010 this.syncStorage.updatePeerInfo(did, notification.peerId, notification.multiaddrs);
2011
2012 // Refresh PLC log on identity change (key rotation, PDS migration, etc.)
2013 if (did.startsWith("did:plc:")) {
2014 this.fetchPlcLog(did).catch((err) => {
2015 console.warn(`[replication] PLC log refresh on identity change for ${did} failed:`, err instanceof Error ? err.message : String(err));
2016 });
2017 }
2018 }
2019
2020 /**
2021 * Stop periodic sync, verification, and firehose subscription.
2022 */
2023 stop(): void {
2024 this.stopped = true;
2025 if (this.syncTimer) {
2026 clearInterval(this.syncTimer);
2027 this.syncTimer = null;
2028 }
2029 if (this.verificationTimer) {
2030 clearInterval(this.verificationTimer);
2031 this.verificationTimer = null;
2032 }
2033 // Save cursor and stop firehose
2034 if (this.firehoseCursorSaveTimer) {
2035 clearInterval(this.firehoseCursorSaveTimer);
2036 this.firehoseCursorSaveTimer = null;
2037 }
2038 if (this.firehoseSubscription) {
2039 this.saveFirehoseCursor();
2040 this.firehoseSubscription.stop();
2041 this.firehoseSubscription = null;
2042 }
2043 // Stop PLC mirror refresh
2044 if (this.plcRefreshTimer) {
2045 clearInterval(this.plcRefreshTimer);
2046 this.plcRefreshTimer = null;
2047 }
2048 // Stop challenge scheduler
2049 if (this.challengeScheduler) {
2050 this.challengeScheduler.stop();
2051 this.challengeScheduler = null;
2052 }
2053 // Stop gossipsub notification cleanup and unsubscribe identity topics
2054 if (this.notificationCleanupTimer) {
2055 clearInterval(this.notificationCleanupTimer);
2056 this.notificationCleanupTimer = null;
2057 }
2058 try {
2059 this.networkService.unsubscribeIdentityTopics(this.getReplicateDids());
2060 } catch {
2061 // Gossipsub streams may already be closed during shutdown
2062 }
2063 }
2064
2065 /**
2066 * Run remote verification for all synced DIDs.
2067 */
2068 async runVerification(): Promise<LayeredVerificationResult[]> {
2069 const results: LayeredVerificationResult[] = [];
2070
2071 for (const did of this.getReplicateDids()) {
2072 if (this.stopped) break;
2073 try {
2074 const result = await this.verifyDid(did);
2075 if (result) {
2076 results.push(result);
2077 this.lastVerificationResults.set(did, result);
2078 }
2079 } catch (err) {
2080 console.error(`Verification error for ${did}:`, err);
2081 }
2082 }
2083
2084 return results;
2085 }
2086
2087 /**
2088 * Run layered verification for a single DID against its source PDS.
2089 */
2090 async verifyDid(did: string): Promise<LayeredVerificationResult | null> {
2091 const state = this.syncStorage.getState(did);
2092 if (!state || !state.pdsEndpoint) return null;
2093
2094 // Get tracked block CIDs for this DID
2095 const blockCids = this.syncStorage.getBlockCids(did);
2096
2097 // Use the root CID (commit root) for verification
2098 const rootCid = state.rootCid ?? state.lastSyncRev ?? null;
2099
2100 // Get tracked record paths for challenge generation
2101 const recordPaths = this.syncStorage.getRecordPaths(did);
2102
2103 const result = await this.remoteVerifier.verifyPeer(
2104 did,
2105 state.pdsEndpoint,
2106 rootCid,
2107 blockCids,
2108 recordPaths,
2109 );
2110
2111 if (result.overallPassed) {
2112 this.syncStorage.updateVerifiedAt(did);
2113 }
2114
2115 return result;
2116 }
2117
2118 /**
2119 * Get the underlying SyncStorage instance.
2120 */
2121 getSyncStorage(): SyncStorage {
2122 return this.syncStorage;
2123 }
2124
2125 /**
2126 * Get the underlying ChallengeStorage instance.
2127 */
2128 getChallengeStorage(): ChallengeStorage {
2129 return this.challengeStorage;
2130 }
2131
2132 /**
2133 * Set the replicated repo reader for cache invalidation after sync.
2134 */
2135 setReplicatedRepoReader(reader: ReplicatedRepoReader): void {
2136 this.replicatedRepoReader = reader;
2137 }
2138
2139 /**
2140 * Get sync states for all tracked DIDs.
2141 */
2142 getSyncStates(): SyncState[] {
2143 return this.syncStorage.getAllStates();
2144 }
2145
2146 /**
2147 * Query the lexicon index with optional prefix filter.
2148 */
2149 getLexiconIndex(prefix?: string, limit?: number) {
2150 return this.syncStorage.getLexiconIndex(prefix, limit);
2151 }
2152
2153 /**
2154 * Get aggregate lexicon stats.
2155 */
2156 getLexiconStats() {
2157 return this.syncStorage.getLexiconStats();
2158 }
2159
2160 /**
2161 * Get all offered DIDs (awaiting mutual consent).
2162 */
2163 getOfferedDids(): Array<{ did: string; pdsEndpoint: string | null; offeredAt: string }> {
2164 return this.syncStorage.getOfferedDids();
2165 }
2166
2167 /**
2168 * Get the most recent verification results.
2169 */
2170 getVerificationResults(): Map<string, LayeredVerificationResult> {
2171 return this.lastVerificationResults;
2172 }
2173
2174 // ============================================
2175 // Blob replication
2176 // ============================================
2177
2178 /**
2179 * Sync blobs for a DID: walk all records, extract blob CIDs, fetch new ones.
2180 */
2181 async syncBlobs(did: string): Promise<{ fetched: number; skipped: number; errors: number; totalBytes: number }> {
2182 const state = this.syncStorage.getState(did);
2183 if (!state?.pdsEndpoint || !state.rootCid) {
2184 return { fetched: 0, skipped: 0, errors: 0, totalBytes: 0 };
2185 }
2186
2187 if (!this.replicatedRepoReader) {
2188 return { fetched: 0, skipped: 0, errors: 0, totalBytes: 0 };
2189 }
2190
2191 // Walk all records and collect blob CIDs
2192 const allBlobCids = new Set<string>();
2193 const repo = await this.replicatedRepoReader.getRepo(did);
2194 if (!repo) {
2195 return { fetched: 0, skipped: 0, errors: 0, totalBytes: 0 };
2196 }
2197
2198 for await (const entry of repo.walkRecords()) {
2199 const cids = extractBlobCids(entry.record);
2200 for (const cid of cids) {
2201 allBlobCids.add(cid);
2202 }
2203 }
2204
2205 // Filter to blobs not already fetched
2206 let fetched = 0;
2207 let skipped = 0;
2208 let errors = 0;
2209 let totalBytes = 0;
2210 const total = allBlobCids.size;
2211
2212 for (const blobCid of allBlobCids) {
2213 if (this.syncStorage.hasBlobCid(did, blobCid)) {
2214 skipped++;
2215 continue;
2216 }
2217
2218 try {
2219 const bytes = await this.repoFetcher.fetchBlob(
2220 state.pdsEndpoint,
2221 did,
2222 blobCid,
2223 );
2224 if (!bytes) {
2225 skipped++; // 404 — blob deleted upstream
2226 continue;
2227 }
2228
2229 if (!await this.verifyBlobCid(blobCid, bytes)) {
2230 console.warn(`[replication] Blob CID mismatch for ${blobCid} (${did})`);
2231 errors++;
2232 continue;
2233 }
2234
2235 await this.blockStore.putBlock(blobCid, bytes);
2236 this.syncStorage.trackBlobsWithSize(did, [{ cid: blobCid, sizeBytes: bytes.length }]);
2237 this.networkService.provideBlocks([blobCid]).catch(() => {});
2238 fetched++;
2239 totalBytes += bytes.length;
2240 this.emitProgress({ type: "sync:blob-progress", did, blobsFetched: fetched, blobsTotal: total, blobBytes: totalBytes });
2241 } catch (err) {
2242 console.warn(
2243 `[replication] Failed to fetch blob ${blobCid} for ${did}:`,
2244 err instanceof Error ? err.message : String(err),
2245 );
2246 errors++;
2247 }
2248 }
2249
2250 return { fetched, skipped, errors, totalBytes };
2251 }
2252
2253 /**
2254 * Sync blobs for specific firehose ops (create/update only).
2255 */
2256 private async syncBlobsForOps(
2257 did: string,
2258 pdsEndpoint: string,
2259 ops: Array<{ action: string; path: string }>,
2260 ): Promise<void> {
2261 if (!this.replicatedRepoReader) return;
2262
2263 for (const op of ops) {
2264 if (op.action !== "create" && op.action !== "update") continue;
2265
2266 const parts = op.path.split("/");
2267 if (parts.length < 2) continue;
2268 const collection = parts.slice(0, -1).join("/");
2269 const rkey = parts[parts.length - 1]!;
2270
2271 try {
2272 const record = await this.replicatedRepoReader.getRecord(did, collection, rkey);
2273 if (!record) continue;
2274
2275 const blobCids = extractBlobCids(record.value);
2276 for (const blobCid of blobCids) {
2277 if (this.syncStorage.hasBlobCid(did, blobCid)) continue;
2278
2279 const bytes = await this.repoFetcher.fetchBlob(pdsEndpoint, did, blobCid);
2280 if (!bytes) continue;
2281
2282 if (!await this.verifyBlobCid(blobCid, bytes)) {
2283 console.warn(`[replication] Blob CID mismatch for ${blobCid} (${did})`);
2284 continue;
2285 }
2286
2287 await this.blockStore.putBlock(blobCid, bytes);
2288 this.syncStorage.trackBlobs(did, [blobCid]);
2289 this.networkService.provideBlocks([blobCid]).catch(() => {});
2290 }
2291 } catch (err) {
2292 console.warn(
2293 `[replication] Failed to sync blobs for op ${op.path} (${did}):`,
2294 err instanceof Error ? err.message : String(err),
2295 );
2296 }
2297 }
2298 }
2299
2300 /**
2301 * Verify that blob bytes match the expected CID.
2302 */
2303 private async verifyBlobCid(expectedCid: string, bytes: Uint8Array): Promise<boolean> {
2304 const actualCid = await createCid(CODEC_RAW, bytes);
2305 return cidToString(actualCid) === expectedCid;
2306 }
2307
2308 // ============================================
2309 // Challenge-response verification
2310 // ============================================
2311
2312 /**
2313 * Start the challenge scheduler for periodic proof-of-storage verification.
2314 * Requires a ChallengeTransport to communicate with peers.
2315 */
2316 startChallengeScheduler(transport: ChallengeTransport): void {
2317 if (this.challengeScheduler) return;
2318
2319 this.challengeScheduler = new ChallengeScheduler(
2320 this.config.DID ?? "",
2321 this.policyEngine,
2322 this.syncStorage,
2323 this.challengeStorage,
2324 this.blockStore,
2325 transport,
2326 );
2327 this.challengeScheduler.start();
2328 }
2329
2330 /**
2331 * Get challenge history for a target peer.
2332 */
2333 getChallengeResults(
2334 targetDid: string,
2335 subjectDid?: string,
2336 ): ChallengeHistoryRow[] {
2337 return this.challengeStorage.getHistory(targetDid, subjectDid);
2338 }
2339
2340 /**
2341 * Get peer reliability scores.
2342 * If peerDid is provided, returns reliability for that specific peer.
2343 * Otherwise, returns all peer reliability scores.
2344 */
2345 getPeerReliability(peerDid?: string): PeerReliabilityRow[] {
2346 if (peerDid) {
2347 return this.challengeStorage.getReliability(peerDid);
2348 }
2349 return this.challengeStorage.getAllReliability();
2350 }
2351
2352 /**
2353 * Refresh peer info for all DIDs associated with a PDS endpoint.
2354 * Called when a connection to that endpoint fails (e.g. challenge transport fallback).
2355 * Fire-and-forget: clears stale cache and triggers re-discovery.
2356 */
2357 refreshPeerInfoForEndpoint(pdsEndpoint: string): void {
2358 const states = this.syncStorage.getAllStates();
2359 for (const state of states) {
2360 if (state.pdsEndpoint === pdsEndpoint) {
2361 this.syncStorage.clearPeerInfo(state.did);
2362 this.peerDiscovery.discoverPeer(state.did).then((peerInfo) => {
2363 if (peerInfo) {
2364 this.syncStorage.updatePeerInfo(state.did, peerInfo.peerId, peerInfo.multiaddrs);
2365 }
2366 }).catch(() => {});
2367 }
2368 }
2369 }
2370
2371 // ============================================
2372 // PLC mirror
2373 // ============================================
2374
2375 /**
2376 * Fetch and validate the PLC audit log for a single DID.
2377 */
2378 private async fetchPlcLog(did: string): Promise<void> {
2379 const { fetchAndValidateLog } = await import("../identity/plc-mirror.js");
2380 await fetchAndValidateLog(this.db, did);
2381 // Announce via DHT that we hold this DID's PLC log
2382 this.networkService.provideForDid(did).catch(() => {});
2383 }
2384
2385 /**
2386 * Refresh PLC logs for all tracked did:plc DIDs.
2387 */
2388 private async refreshAllPlcLogs(): Promise<void> {
2389 const { getAllPlcMirrorDids, fetchAndValidateLog, publishPlcProviders } = await import("../identity/plc-mirror.js");
2390
2391 // Refresh existing mirrored DIDs
2392 const mirroredDids = getAllPlcMirrorDids(this.db);
2393 for (const did of mirroredDids) {
2394 if (this.stopped) break;
2395 try {
2396 await fetchAndValidateLog(this.db, did);
2397 } catch (err) {
2398 console.warn(
2399 `[replication] PLC log refresh for ${did} failed:`,
2400 err instanceof Error ? err.message : String(err),
2401 );
2402 // Fallback: try discovering PLC log from DHT peers
2403 await this.fetchPlcLogFromPeers(did).catch(() => {});
2404 }
2405 }
2406
2407 // Also fetch for any tracked did:plc DIDs that don't have a mirror entry yet
2408 const trackedDids = this.getReplicateDids().filter((d) => d.startsWith("did:plc:"));
2409 const mirroredSet = new Set(mirroredDids);
2410 for (const did of trackedDids) {
2411 if (this.stopped) break;
2412 if (mirroredSet.has(did)) continue;
2413 try {
2414 await fetchAndValidateLog(this.db, did);
2415 } catch (err) {
2416 console.warn(
2417 `[replication] PLC log fetch for ${did} failed:`,
2418 err instanceof Error ? err.message : String(err),
2419 );
2420 // Fallback: try discovering PLC log from DHT peers
2421 await this.fetchPlcLogFromPeers(did).catch(() => {});
2422 }
2423 }
2424
2425 // Re-announce all mirrored DIDs to the DHT
2426 const allMirrored = getAllPlcMirrorDids(this.db);
2427 if (allMirrored.length > 0) {
2428 const helia = this.networkService as unknown as { getLibp2p?(): unknown };
2429 if (typeof helia.getLibp2p === "function" && helia.getLibp2p()) {
2430 // Use the IpfsService routing directly via provideForDid
2431 for (const did of allMirrored) {
2432 this.networkService.provideForDid(did).catch(() => {});
2433 }
2434 }
2435 }
2436 }
2437
  /**
   * Fallback: discover PLC log providers via DHT and fetch from a peer.
   * Used when plc.directory is unavailable.
   *
   * Tries each discovered provider in order, validating the returned
   * operation chain locally before trusting it, and stops after the first
   * provider whose chain is accepted and stored. Returns without error if
   * no provider yields a usable chain.
   *
   * @param did the did:plc identifier whose audit log is wanted
   */
  private async fetchPlcLogFromPeers(did: string): Promise<void> {
    const providers = await this.networkService.findProvidersForDid(did);
    if (providers.length === 0) return;

    const { validateOperationChain, getStoredLog } = await import("../identity/plc-mirror.js");

    for (const multiaddr of providers) {
      try {
        // Extract the host:port from the multiaddr for an HTTP fetch
        // (only /ip4 and /ip6 TCP addrs are usable; others are skipped).
        const hostMatch = multiaddr.match(/\/ip[46]\/([^/]+)\/tcp\/(\d+)/);
        if (!hostMatch) continue;
        const [, host, port] = hostMatch;

        // NOTE(review): plain HTTP with a 10s timeout — presumably peers
        // serve this XRPC endpoint without TLS on their libp2p host; confirm.
        const url = `http://${host}:${port}/xrpc/org.p2pds.plc.getLog?did=${encodeURIComponent(did)}`;
        const res = await fetch(url, { signal: AbortSignal.timeout(10000) });
        if (!res.ok) continue;

        const data = await res.json() as { operations?: unknown[]; status?: { validated?: boolean } };
        if (!data.operations || !Array.isArray(data.operations)) continue;

        // Validate the returned chain independently
        const validation = await validateOperationChain(data.operations as any, did);
        if (!validation.valid) continue;

        // Store it — we verified it ourselves
        const existingLog = getStoredLog(this.db, did);
        const newOpCount = data.operations.length;

        // Accept if we don't have it yet, or if the new chain is longer
        if (!existingLog || newOpCount > existingLog.status.opCount) {
          // Tombstoned when any non-nullified op is a plc_tombstone.
          const isTombstoned = (data.operations as any[]).some(
            (op: any) => !op.nullified && op.operation?.type === "plc_tombstone",
          );
          // Timestamp of the newest operation, if any.
          const lastOpCreatedAt = newOpCount > 0
            ? (data.operations as any[])[newOpCount - 1]!.createdAt ?? null
            : null;

          // Upsert the mirror row; ON CONFLICT overwrites every column so
          // a longer chain fully replaces the previous snapshot.
          this.db.prepare(
            `INSERT INTO plc_mirror (did, operations_json, op_count, last_fetched_at, last_op_created_at, validated, is_tombstoned)
             VALUES (?, ?, ?, ?, ?, ?, ?)
             ON CONFLICT(did) DO UPDATE SET
               operations_json = excluded.operations_json,
               op_count = excluded.op_count,
               last_fetched_at = excluded.last_fetched_at,
               last_op_created_at = excluded.last_op_created_at,
               validated = excluded.validated,
               is_tombstoned = excluded.is_tombstoned`,
          ).run(
            did,
            JSON.stringify(data.operations),
            newOpCount,
            new Date().toISOString(),
            lastOpCreatedAt,
            validation.valid ? 1 : 0,
            isTombstoned ? 1 : 0,
          );
          console.log(`[replication] PLC log for ${did} fetched from peer (${newOpCount} ops)`);
          return;
        }
      } catch {
        // Try next provider
      }
    }
  }
2506
2507 /**
2508 * Get PLC log status for a single DID.
2509 */
2510 getPlcLogStatus(did: string): { opCount: number; lastFetchedAt: string; lastOpCreatedAt: string | null; validated: boolean; isTombstoned: boolean } | null {
2511 const row = this.db
2512 .prepare("SELECT * FROM plc_mirror WHERE did = ?")
2513 .get(did) as Record<string, unknown> | undefined;
2514 if (!row) return null;
2515 return {
2516 opCount: row.op_count as number,
2517 lastFetchedAt: row.last_fetched_at as string,
2518 lastOpCreatedAt: (row.last_op_created_at as string) ?? null,
2519 validated: (row.validated as number) === 1,
2520 isTombstoned: (row.is_tombstoned as number) === 1,
2521 };
2522 }
2523
2524 /**
2525 * Get aggregate PLC mirror stats.
2526 */
2527 getPlcMirrorStatus(): { mirroredDids: number; totalOps: number; lastRefresh: string | null } {
2528 const countRow = this.db
2529 .prepare("SELECT COUNT(*) as count, COALESCE(SUM(op_count), 0) as total_ops FROM plc_mirror")
2530 .get() as { count: number; total_ops: number };
2531 const lastRow = this.db
2532 .prepare("SELECT MAX(last_fetched_at) as last_refresh FROM plc_mirror")
2533 .get() as { last_refresh: string | null };
2534 return {
2535 mirroredDids: countRow.count,
2536 totalOps: countRow.total_ops,
2537 lastRefresh: lastRow.last_refresh,
2538 };
2539 }
2540
2541 /**
2542 * Check if peer info should be refreshed based on TTL.
2543 */
2544 private shouldRefreshPeerInfo(state: SyncState): boolean {
2545 if (!state.peerInfoFetchedAt) return true;
2546 const fetchedAt = new Date(state.peerInfoFetchedAt).getTime();
2547 return Date.now() - fetchedAt > PEER_INFO_TTL_MS;
2548 }
2549}