// atproto user agency toolkit for individuals and groups
1/** 2 * Main replication orchestrator. 3 * Publishes manifest records, syncs remote repos to IPFS. 4 * Optionally driven by a PolicyEngine for per-DID intervals, priority, and filtering. 5 */ 6 7import type Database from "better-sqlite3"; 8import type { Config } from "../config.js"; 9import type { RepoManager } from "../repo-manager.js"; 10import { extractBlobCids } from "../repo-manager.js"; 11import { create as createCid, CODEC_RAW, toString as cidToString } from "@atcute/cid"; 12import type { BlockStore, NetworkService, CommitNotification, IdentityNotification } from "../ipfs.js"; 13import type { DidResolver } from "../did-resolver.js"; 14import { readCarWithRoot } from "@atproto/repo"; 15import { decode as cborDecode } from "../cbor-compat.js"; 16import type { ReplicatedRepoReader } from "./replicated-repo-reader.js"; 17import type { PolicyEngine } from "../policy/engine.js"; 18 19import { 20 MANIFEST_NSID, 21 didToRkey, 22 type ManifestRecord, 23 type SyncState, 24 type SyncTrigger, 25 type VerificationConfig, 26 type LayeredVerificationResult, 27 DEFAULT_VERIFICATION_CONFIG, 28} from "./types.js"; 29import { SyncStorage } from "./sync-storage.js"; 30import { RepoFetcher } from "./repo-fetcher.js"; 31import { PeerDiscovery } from "./peer-discovery.js"; 32import { BlockVerifier, RemoteVerifier } from "./verification.js"; 33import { extractAllRecordPaths, extractAllCids } from "./mst-proof.js"; 34import { 35 FirehoseSubscription, 36 type FirehoseCommitEvent, 37 type FirehoseAccountEvent, 38} from "./firehose-subscription.js"; 39import { ChallengeScheduler } from "./challenge-response/challenge-scheduler.js"; 40import { ChallengeStorage, type ChallengeHistoryRow, type PeerReliabilityRow } from "./challenge-response/challenge-storage.js"; 41import type { ChallengeTransport } from "./challenge-response/transport.js"; 42import { OfferManager, type RecordWriter } from "./offer-manager.js"; 43import { fetchRepoFromPeer } from "./libp2p-sync.js"; 44import type { 
Libp2p } from "@libp2p/interface";

/** Progress event emitted during sync operations for live UI updates. */
export interface SyncProgressEvent {
  // Event name, e.g. "sync-cycle:start" / "sync-cycle:complete" (see syncAll).
  type: string;
  // Subject DID of the event; "*" for whole-cycle events.
  did: string;
  sourceType?: string;
  carBytes?: number;
  blocksStored?: number;
  blocksSizeKb?: number;
  blobsFetched?: number;
  blobsTotal?: number;
  blobBytes?: number;
  durationMs?: number;
  error?: string;
  missingBlocks?: number;
}

/**
 * Extract unique NSIDs from record paths (collection/rkey format).
 * A path without a "/" (or starting with one) contributes nothing.
 */
function extractNsids(paths: string[]): string[] {
  const nsids = new Set<string>();
  for (const p of paths) {
    const slash = p.indexOf("/");
    if (slash > 0) nsids.add(p.slice(0, slash));
  }
  return [...nsids];
}

/** How old cached peer info can be before re-fetching (1 hour). */
const PEER_INFO_TTL_MS = 60 * 60 * 1000;

/** Default sync interval when no policy engine is present (5 minutes). */
const DEFAULT_SYNC_INTERVAL_MS = 5 * 60 * 1000;

/** Minimum tick interval to prevent excessive polling (10 seconds).
 */
const MIN_TICK_INTERVAL_MS = 10 * 1000;

export class ReplicationManager {
  private syncStorage: SyncStorage;
  private repoFetcher: RepoFetcher;
  private peerDiscovery: PeerDiscovery;
  private verifier: BlockVerifier;
  private remoteVerifier: RemoteVerifier;
  // Periodic timers; null when not started (or after stop).
  private syncTimer: ReturnType<typeof setInterval> | null = null;
  private verificationTimer: ReturnType<typeof setInterval> | null = null;
  private verificationConfig: VerificationConfig;
  // Last layered-verification result per DID, for status reporting.
  private lastVerificationResults: Map<string, LayeredVerificationResult> =
    new Map();
  private firehoseSubscription: FirehoseSubscription | null = null;
  private firehoseCursorSaveTimer: ReturnType<typeof setInterval> | null = null;
  private challengeStorage: ChallengeStorage;
  private challengeScheduler: ChallengeScheduler | null = null;
  // Set by stop(); checked in long-running loops to abort early.
  private stopped = false;
  private policyEngine: PolicyEngine | null = null;
  private offerManager: OfferManager | null = null;
  /** Per-DID last-sync timestamps (epoch ms) for policy-driven interval tracking. */
  private lastSyncTimestamps: Map<string, number> = new Map();
  /** Dedup set for gossipsub notifications, keyed by `${did}:${rev}`. */
  private recentNotifications: Set<string> = new Set();
  private notificationCleanupTimer: ReturnType<typeof setInterval> | null = null;
  /** libp2p node for peer-first repo sync. Set via setLibp2p(). */
  private libp2p: Libp2p | null = null;
  /** Cached multiaddrs from last peer record publish, for change detection. */
  private lastPublishedAddrs: Set<string> = new Set();
  /** Callback to republish peer record when multiaddrs change. */
  private publishPeerRecordFn?: () => Promise<void>;
  /** Sync progress event subscribers for live UI updates via SSE. */
  private progressCallbacks: Set<(event: SyncProgressEvent) => void> = new Set();
  /** Timer for periodic PLC mirror refresh (6 hours).
   */
  private plcRefreshTimer: ReturnType<typeof setInterval> | null = null;

  /**
   * @param repoManager Optional local repo for manifest records (may be absent
   *   before login); a remote pdsClient is preferred for offer records.
   * @param verificationConfig Partial overrides merged over defaults.
   * @param policyEngine Optional; enables per-DID intervals/priority/filtering
   *   and (with a RecordWriter) the OfferManager.
   */
  constructor(
    private db: Database.Database,
    private config: Config,
    private repoManager: RepoManager | undefined,
    private blockStore: BlockStore,
    private networkService: NetworkService,
    private didResolver: DidResolver,
    verificationConfig?: Partial<VerificationConfig>,
    private replicatedRepoReader?: ReplicatedRepoReader,
    policyEngine?: PolicyEngine,
    pdsClient?: RecordWriter,
  ) {
    this.syncStorage = new SyncStorage(db);
    this.syncStorage.initSchema();
    this.challengeStorage = new ChallengeStorage(db);
    this.challengeStorage.initSchema();
    this.repoFetcher = new RepoFetcher(didResolver);
    this.peerDiscovery = new PeerDiscovery(this.repoFetcher);
    this.verifier = new BlockVerifier(blockStore);
    this.verificationConfig = {
      ...DEFAULT_VERIFICATION_CONFIG,
      ...verificationConfig,
    };
    this.remoteVerifier = new RemoteVerifier(
      blockStore,
      this.verificationConfig,
    );
    if (policyEngine) {
      this.policyEngine = policyEngine;
      // Prefer remote PDS client for offer records; fall back to local repo
      const recordWriter: RecordWriter | undefined = pdsClient ?? repoManager;
      if (recordWriter) {
        this.offerManager = new OfferManager(
          recordWriter,
          this.peerDiscovery,
          policyEngine,
          config.DID ?? "",
        );
      }
    }
  }

  /**
   * Get the PolicyEngine, if one is configured.
   */
  getPolicyEngine(): PolicyEngine | null {
    return this.policyEngine;
  }

  /**
   * Get the OfferManager, if one is configured (requires PolicyEngine).
   */
  getOfferManager(): OfferManager | null {
    return this.offerManager;
  }

  /**
   * Late-bind a PdsClient (or any RecordWriter) after construction.
   * Called when OAuth login completes after the server has already started.
   * Creates the OfferManager if a PolicyEngine is present but OfferManager
   * wasn't created at construction time (because no RecordWriter was available).
   */
  setPdsClient(client: RecordWriter, did: string): void {
    // No-op if an OfferManager already exists or no PolicyEngine is configured.
    if (this.policyEngine && !this.offerManager) {
      this.offerManager = new OfferManager(
        client,
        this.peerDiscovery,
        this.policyEngine,
        did,
      );
    }
  }

  /**
   * Set the libp2p node for peer-first repo sync.
   * Called from start.ts after IPFS is started.
   */
  setLibp2p(libp2p: Libp2p): void {
    this.libp2p = libp2p;
  }

  /**
   * Set a callback to republish the peer record when multiaddrs change.
   * Called from start.ts after IPFS and replication are ready.
   */
  setPublishPeerRecordFn(fn: () => Promise<void>): void {
    this.publishPeerRecordFn = fn;
    // Seed the cache with current addrs so the first check doesn't spuriously trigger
    this.lastPublishedAddrs = new Set(this.networkService.getMultiaddrs());
  }

  /** Subscribe to sync progress events (for SSE streaming). */
  onSyncProgress(cb: (event: SyncProgressEvent) => void): void {
    this.progressCallbacks.add(cb);
  }

  /** Unsubscribe from sync progress events. */
  offSyncProgress(cb: (event: SyncProgressEvent) => void): void {
    this.progressCallbacks.delete(cb);
  }

  /** Emit a progress event to all subscribers; subscriber errors are swallowed. */
  private emitProgress(event: SyncProgressEvent): void {
    for (const cb of this.progressCallbacks) {
      try { cb(event); } catch { /* non-fatal */ }
    }
  }

  /**
   * Check if multiaddrs have changed since last peer record publish.
   * If changed, republish the peer record and update the cache.
226 */ 227 private async _checkMultiaddrsChanged(): Promise<void> { 228 if (!this.publishPeerRecordFn) return; 229 230 const current = new Set(this.networkService.getMultiaddrs()); 231 if (current.size === 0) return; 232 233 // Compare sets 234 if ( 235 current.size === this.lastPublishedAddrs.size && 236 [...current].every((a) => this.lastPublishedAddrs.has(a)) 237 ) { 238 return; 239 } 240 241 this.lastPublishedAddrs = current; 242 try { 243 await this.publishPeerRecordFn(); 244 console.log("[replication] Multiaddrs changed, republished peer record"); 245 } catch (err) { 246 console.warn( 247 "[replication] Failed to republish peer record:", 248 err instanceof Error ? err.message : String(err), 249 ); 250 } 251 } 252 253 /** 254 * Initialize replication: create tables, sync manifests. 255 */ 256 async init(): Promise<void> { 257 this.syncStorage.initSchema(); 258 this.challengeStorage.initSchema(); 259 await this.syncManifests(); 260 await this.discoverPeerEndpoints(); 261 await this.runOfferDiscovery(); 262 this.setupGossipsubSubscription(); 263 } 264 265 /** 266 * Get the list of DIDs to replicate. 267 * 268 * When a PolicyEngine is present, uses getActiveDids() as the single source of truth. 269 * Falls back to legacy three-source merge when no PolicyEngine is configured. 270 */ 271 getReplicateDids(): string[] { 272 if (this.policyEngine) { 273 return this.policyEngine.getActiveDids(); 274 } 275 276 // Legacy fallback: merge config + admin DIDs (no policy engine) 277 const allDids = new Set(this.config.REPLICATE_DIDS); 278 for (const did of this.syncStorage.getAdminDids()) { 279 allDids.add(did); 280 } 281 return [...allDids]; 282 } 283 284 /** 285 * Determine the source of a tracked DID by looking up its policy. 
   */
  getDidSource(did: string): "config" | "user" | "policy" | null {
    if (this.policyEngine) {
      // Precedence: config > archive(user) > p2p > any other matching policy.
      const configPolicy = this.policyEngine.getStoredPolicy(`config:${did}`);
      if (configPolicy) return "config";
      const archivePolicy = this.policyEngine.getStoredPolicy(`archive:${did}`);
      if (archivePolicy) return "user";
      const p2pPolicy = this.policyEngine.getStoredPolicy(`p2p:${did}`);
      if (p2pPolicy) return "policy";
      // Check if any other policy covers this DID
      if (this.policyEngine.shouldReplicate(did)) return "policy";
    }
    // Legacy fallback (also reached when a policy engine matched nothing above)
    if (this.config.REPLICATE_DIDS.includes(did)) return "config";
    if (this.syncStorage.isAdminDid(did)) return "user";
    return null;
  }

  /**
   * Add a DID via the admin interface.
   * Creates an archive policy (+ dual-write to admin_tracked_dids for rollback safety),
   * creates manifest + sync state, subscribes gossipsub, updates firehose.
   * Returns the status and source if already tracked.
   */
  async addDid(did: string): Promise<{ status: "added" | "already_tracked"; source?: string }> {
    const existingSource = this.getDidSource(did);
    if (existingSource) {
      return { status: "already_tracked", source: existingSource };
    }

    // Check if already tracked via getReplicateDids (covers pattern/all policies)
    if (this.getReplicateDids().includes(did)) {
      return { status: "already_tracked", source: "policy" };
    }

    // Create archive policy in PolicyEngine (auto-persists to DB)
    if (this.policyEngine) {
      const policyId = `archive:${did}`;
      if (!this.policyEngine.getPolicies().some((p) => p.id === policyId)) {
        // Lazy import keeps presets out of the hot path / avoids a cycle.
        const { archive } = await import("../policy/presets.js");
        this.policyEngine.addPolicy(archive(did));
      }
    }

    // Dual-write to admin_tracked_dids for rollback safety
    this.syncStorage.addAdminDid(did);

    await this.syncManifestForDid(did);
    this.networkService.subscribeCommitTopics([did]);
    this.networkService.subscribeIdentityTopics([did]);
    this.updateFirehoseDids();

    // Trigger initial sync in background (fire-and-forget)
    this.syncDid(did, "manual").catch((err) => {
      console.error(`[replication] Initial sync for admin-added ${did} failed:`, err);
    });

    // Fetch PLC log in background (fire-and-forget)
    if (did.startsWith("did:plc:")) {
      this.fetchPlcLog(did).catch((err) => {
        console.warn(`[replication] PLC log fetch for ${did} failed:`, err instanceof Error ? err.message : String(err));
      });
    }

    return { status: "added" };
  }

  /**
   * Offer to replicate a DID (consent-gated).
   * Publishes an offer record and stores in offered_dids, but does NOT sync.
   * Replication begins only when mutual consent is detected during offer discovery.
   */
  async offerDid(did: string): Promise<{ status: "offered" | "already_tracked" | "already_offered"; source?: string }> {
    // Already fully tracked?
    const existingSource = this.getDidSource(did);
    if (existingSource) {
      return { status: "already_tracked", source: existingSource };
    }

    // Already offered?
    if (this.syncStorage.isOfferedDid(did)) {
      return { status: "already_offered" };
    }

    // Require active session to publish offer record
    if (!this.offerManager) {
      throw new Error("No active session — log in first to publish offers");
    }

    // Resolve PDS endpoint
    const pdsEndpoint = await this.repoFetcher.resolvePds(did);

    // Publish offer record to user's PDS
    await this.offerManager.publishOffer(did);

    // Store in offered_dids (does NOT create replication_state or trigger sync)
    this.syncStorage.addOfferedDid(did, pdsEndpoint ?? null);

    // Push notification: tell the target node about this offer (fire-and-forget)
    // Small delay to ensure PDS has committed the record before the peer verifies it
    setTimeout(() => {
      this.pushOfferNotification(did).catch((err) => {
        console.warn(
          `[replication] Failed to push offer notification to ${did}:`,
          err instanceof Error ? err.message : String(err),
        );
      });
    }, 2000);

    return { status: "offered" };
  }

  /**
   * Remove an offered DID: revoke the offer, notify the peer, and remove from offered_dids.
   */
  async removeOfferedDid(did: string): Promise<{ status: "removed" | "not_found" }> {
    if (!this.syncStorage.isOfferedDid(did)) {
      return { status: "not_found" };
    }

    // Revoke offer record if OfferManager is available
    if (this.offerManager) {
      await this.offerManager.revokeOffer(did);
    }

    this.syncStorage.removeOfferedDid(did);

    // Notify peer that the offer was revoked (fire-and-forget)
    this.pushRevokeNotification(did).catch((err) => {
      console.warn(`[replication] Failed to push revoke notification to ${did}:`, err instanceof Error ?
err.message : String(err));
    });

    return { status: "removed" };
  }

  /**
   * Revoke consent and remove a DID that was promoted to active replication via mutual offer.
   * Revokes the offer record, purges all data, and notifies the peer.
   */
  async revokeReplication(did: string): Promise<{ status: "revoked" | "not_found" | "error"; error?: string }> {
    // Must be an actively tracked DID
    const source = this.getDidSource(did);
    const hasState = this.syncStorage.getAllStates().some((s) => s.did === did);
    if (!source && !hasState) {
      return { status: "not_found" };
    }

    // Revoke offer record on PDS
    if (this.offerManager) {
      try {
        await this.offerManager.revokeOffer(did);
      } catch {
        // Non-fatal: offer record may not exist
      }
    }

    // Transition P2P policy to terminated
    if (this.policyEngine) {
      const policyId = `p2p:${did}`;
      this.policyEngine.transitionPolicy(policyId, "terminated");
    }

    // Remove from offered_dids if present
    if (this.syncStorage.isOfferedDid(did)) {
      this.syncStorage.removeOfferedDid(did);
    }

    // Notify peer BEFORE purging (purge clears peer endpoints needed for notification)
    try {
      await this.pushRevokeNotification(did);
    } catch (err) {
      console.warn(`[replication] Failed to push revoke notification to ${did}:`, err instanceof Error ? err.message : String(err));
    }

    // Remove from replication with full data purge
    const result = await this.removeDid(did, true);
    if (result.status === "error") {
      return { status: "error", error: result.error };
    }

    return { status: "revoked" };
  }

  /**
   * Handle a revocation from a remote peer: stop replicating their DID and purge data.
   */
  async handleRemoteRevocation(revokerDid: string): Promise<void> {
    // Remove from incoming offers if present
    const incomingOffers = this.syncStorage.getIncomingOffers();
    for (const offer of incomingOffers) {
      if (offer.offererDid === revokerDid) {
        this.syncStorage.removeIncomingOffer(revokerDid, offer.subjectDid);
      }
    }

    // Remove from offered_dids if present (and best-effort revoke our own offer)
    if (this.syncStorage.isOfferedDid(revokerDid)) {
      if (this.offerManager) {
        try { await this.offerManager.revokeOffer(revokerDid); } catch { /* non-fatal */ }
      }
      this.syncStorage.removeOfferedDid(revokerDid);
    }

    // Transition P2P policy to terminated
    if (this.policyEngine) {
      const policyId = `p2p:${revokerDid}`;
      if (this.policyEngine.transitionPolicy(policyId, "terminated")) {
        console.log(`[replication] Terminated P2P policy ${policyId}`);
      }
    }

    // Remove from active replication with data purge
    const isTracked = this.getDidSource(revokerDid) !== null
      || this.syncStorage.getAllStates().some((s) => s.did === revokerDid);
    if (isTracked) {
      console.log(`[replication] Remote revocation from ${revokerDid} — removing DID and purging data`);
      await this.removeDid(revokerDid, true);
    }
  }

  /**
   * Push an offer notification to the target node's p2pds endpoint.
   * Resolves the target's org.p2pds.peer record to find their endpoint URL,
   * then POSTs to their notifyOffer XRPC method.
   */
  private async pushOfferNotification(targetDid: string): Promise<void> {
    const peerInfo = await this.peerDiscovery.discoverPeer(targetDid);
    if (!peerInfo?.endpoint) {
      console.log(`[replication] No p2pds endpoint found for ${targetDid}, skipping push notification`);
      return;
    }

    const url = `${peerInfo.endpoint}/xrpc/org.p2pds.replication.notifyOffer`;
    const body = {
      offererDid: this.config.DID,
      subjectDid: targetDid,
      offererPdsEndpoint: await this.repoFetcher.resolvePds(this.config.DID ?? ""),
      // Default offer terms; presumably mirrored by the peer's policy — TODO confirm
      params: {
        minCopies: 2,
        intervalSec: 600,
        priority: 50,
      },
    };

    const res = await fetch(url, {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify(body),
    });

    if (!res.ok) {
      const text = await res.text().catch(() => "");
      console.warn(`[replication] Push notification to ${targetDid} failed (${res.status}): ${text}`);
    } else {
      console.log(`[replication] Push notification sent to ${targetDid} at ${peerInfo.endpoint}`);
    }
  }

  /**
   * Push a revocation notification to the target node's p2pds endpoint.
   */
  private async pushRevokeNotification(targetDid: string): Promise<void> {
    const peerInfo = await this.peerDiscovery.discoverPeer(targetDid);
    if (!peerInfo?.endpoint) {
      console.log(`[replication] No p2pds endpoint found for ${targetDid}, skipping revoke notification`);
      return;
    }

    const url = `${peerInfo.endpoint}/xrpc/org.p2pds.replication.notifyRevoke`;
    const body = {
      revokerDid: this.config.DID,
      subjectDid: targetDid,
    };

    const res = await fetch(url, {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify(body),
    });

    if (!res.ok) {
      const text = await res.text().catch(() => "");
      console.warn(`[replication] Revoke notification to ${targetDid} failed (${res.status}): ${text}`);
    } else {
      console.log(`[replication] Revoke notification sent to ${targetDid} at ${peerInfo.endpoint}`);
    }
  }

  /**
   * Accept an incoming offer: create a reciprocal offer and remove from incoming_offers.
   */
  async acceptOffer(offererDid: string): Promise<{ status: "accepted" | "not_found" | "error"; error?: string }> {
    // Find the incoming offer to get the subject DID
    const offers = this.syncStorage.getIncomingOffers();
    const offer = offers.find((o) => o.offererDid === offererDid);
    if (!offer) {
      return { status: "not_found" };
    }

    // Create reciprocal offer (which also push-notifies back)
    try {
      await this.offerDid(offererDid);
    } catch (err) {
      return {
        status: "error",
        error: `Failed to create reciprocal offer: ${err instanceof Error ?
err.message : String(err)}`,
      };
    }

    // Remove from incoming_offers
    this.syncStorage.removeIncomingOffer(offererDid, offer.subjectDid);

    // Run offer discovery immediately to detect mutual agreement and start replication
    this.triggerOfferDiscovery();

    return { status: "accepted" };
  }

  /**
   * Reject an incoming offer: remove from incoming_offers without creating a reciprocal offer.
   */
  rejectOffer(offererDid: string): { status: "rejected" | "not_found" } {
    const offers = this.syncStorage.getIncomingOffers();
    const offer = offers.find((o) => o.offererDid === offererDid);
    if (!offer) {
      return { status: "not_found" };
    }

    this.syncStorage.removeIncomingOffer(offererDid, offer.subjectDid);
    return { status: "rejected" };
  }

  /**
   * Trigger offer discovery asynchronously (fire-and-forget).
   * Used after accepting an offer or receiving a push notification
   * to immediately detect mutual agreements.
   */
  triggerOfferDiscovery(): void {
    this.runOfferDiscovery().catch((err) => {
      console.warn("[replication] Triggered offer discovery failed:", err instanceof Error ? err.message : String(err));
    });
  }

  /**
   * Get all incoming offers (from other nodes wanting to replicate our data).
   */
  getIncomingOffers(): Array<{
    offererDid: string;
    subjectDid: string;
    offererPdsEndpoint: string | null;
    offererEndpoint: string | null;
    minCopies: number;
    intervalSec: number;
    priority: number;
    receivedAt: string;
  }> {
    return this.syncStorage.getIncomingOffers();
  }

  /**
   * Get the PeerDiscovery instance (for verifying incoming offers).
   */
  getPeerDiscovery(): PeerDiscovery {
    return this.peerDiscovery;
  }

  /**
   * Get the RepoFetcher instance (for resolving PDS endpoints).
   */
  getRepoFetcher(): RepoFetcher {
    return this.repoFetcher;
  }

  /**
   * Remove an admin-added DID.
   * Returns error info if the DID cannot be removed (config/policy origin).
   * If purgeData is true, deletes all associated data. Otherwise pauses
   * (manifest marked "paused", replicated data retained).
   */
  async removeDid(did: string, purgeData: boolean = false): Promise<{
    status: "removed" | "error";
    purged: boolean;
    error?: string;
  }> {
    // Check if it's a config-origin DID (not removable via UI)
    const configPolicyId = `config:${did}`;
    if (this.policyEngine?.getStoredPolicy(configPolicyId) || this.config.REPLICATE_DIDS.includes(did)) {
      return { status: "error", purged: false, error: "Cannot remove config-origin DID. Remove from REPLICATE_DIDS env var." };
    }

    // Transition archive/p2p policies to purged state
    if (this.policyEngine) {
      const archivePolicyId = `archive:${did}`;
      this.policyEngine.transitionPolicy(archivePolicyId, "purged");
      const p2pPolicyId = `p2p:${did}`;
      this.policyEngine.transitionPolicy(p2pPolicyId, "purged");
    }

    // Dual-write: also remove from legacy table
    this.syncStorage.removeAdminDid(did);
    this.networkService.unsubscribeCommitTopics([did]);
    this.networkService.unsubscribeIdentityTopics([did]);
    this.updateFirehoseDids();

    if (purgeData) {
      this.syncStorage.clearBlocks(did);
      this.syncStorage.clearBlobs(did);
      this.syncStorage.clearRecordPaths(did);
      this.syncStorage.clearPeerEndpoints(did);
      this.syncStorage.deleteState(did);

      // Remove manifest record
      if (this.repoManager) {
        const rkey = didToRkey(did);
        try {
          await this.repoManager.deleteRecord(MANIFEST_NSID, rkey);
        } catch {
          // Non-fatal: manifest may not exist
        }
      }
    } else {
      // Pause: update manifest status but keep data
      if (this.repoManager) {
        const rkey = didToRkey(did);
        const existing = await this.repoManager.getRecord(MANIFEST_NSID, rkey);
        if (existing) {
          const manifest: ManifestRecord = {
            ...(existing.record as ManifestRecord),
            status: "paused",
          };
          await this.repoManager.putRecord(MANIFEST_NSID, rkey, manifest);
        }
      }
    }

    return { status: "removed", purged: purgeData };
  }

  /**
   * Update firehose subscription with the current set of tracked DIDs.
   */
  private updateFirehoseDids(): void {
    if (this.firehoseSubscription) {
      this.firehoseSubscription.updateDids(new Set(this.getReplicateDids()));
    }
  }

  /**
   * Ensure a manifest record and sync state exist for a single DID.
   * Extracted from syncManifests() for use by addDid().
   */
  private async syncManifestForDid(did: string): Promise<void> {
    if (this.repoManager) {
      const rkey = didToRkey(did);
      const existing = await this.repoManager.getRecord(MANIFEST_NSID, rkey);

      // Only create a fresh manifest; never overwrite an existing one here.
      if (!existing) {
        const manifest: ManifestRecord = {
          $type: MANIFEST_NSID,
          subject: did,
          status: "active",
          lastSyncRev: null,
          lastSyncAt: null,
          createdAt: new Date().toISOString(),
        };
        await this.repoManager.putRecord(MANIFEST_NSID, rkey, manifest);
      }
    }

    const pdsEndpoint = await this.repoFetcher.resolvePds(did);
    if (pdsEndpoint) {
      this.syncStorage.upsertState({ did, pdsEndpoint });
    }
  }

  /**
   * Ensure a manifest record exists for each configured DID.
   * Creates new manifests or updates existing ones.
   */
  async syncManifests(): Promise<void> {
    for (const did of this.getReplicateDids()) {
      await this.syncManifestForDid(did);
    }
  }

  /**
   * Run offer discovery: gather peers from sync state AND offered_dids,
   * then discover mutual agreements.
   * When a mutual agreement is detected for an offered DID,
   * promotes it to full replication via addDid() and removes from offered_dids.
   * Non-fatal: errors are logged but don't block sync.
   */
  private async runOfferDiscovery(): Promise<void> {
    if (!this.offerManager) return;

    try {
      // Existing: peers from replication_state
      const states = this.syncStorage.getAllStates();
      const syncPeers = states
        .filter((s) => s.pdsEndpoint)
        .map((s) => ({ did: s.did, pdsEndpoint: s.pdsEndpoint }));

      // New: peers from offered_dids
      const offeredDids = this.syncStorage.getOfferedDids();
      const offeredPeers = offeredDids
        .filter((o) => o.pdsEndpoint)
        .map((o) => ({ did: o.did, pdsEndpoint: o.pdsEndpoint! }));

      // Merge and deduplicate by DID (sync-state entry wins on conflict)
      const seenDids = new Set<string>();
      const allPeers: Array<{ did: string; pdsEndpoint: string }> = [];
      for (const peer of [...syncPeers, ...offeredPeers]) {
        if (!seenDids.has(peer.did)) {
          seenDids.add(peer.did);
          allPeers.push(peer);
        }
      }

      if (allPeers.length > 0) {
        const agreements = await this.offerManager.discoverAgreements(allPeers);

        // Check if any offered DIDs now have mutual agreements
        for (const offered of offeredDids) {
          const hasAgreement = agreements.some(
            (a) => a.localOffer.subject === offered.did || a.remoteOffer.subject === offered.did,
          );
          if (hasAgreement) {
            console.log(`[replication] Mutual agreement detected for offered DID ${offered.did} — promoting to replication`);
            try {
              await this.addDid(offered.did);
              this.syncStorage.removeOfferedDid(offered.did);
            } catch (err) {
              console.error(
                `[replication] Failed to promote offered DID ${offered.did}:`,
                err instanceof Error ?
err.message : String(err),
              );
            }
          }
        }

        // Detect broken agreements: P2P-origin DIDs whose remote offer was revoked
        const activePeerDids = new Set(agreements.map((a) => a.counterpartyDid));
        for (const state of states) {
          // Only check P2P-origin DIDs (those that were promoted from offers)
          const policyId = `p2p:${state.did}`;
          const hasP2pPolicy = this.policyEngine?.getPolicies().some((p) => p.id === policyId);
          if (hasP2pPolicy && !activePeerDids.has(state.did)) {
            console.log(`[replication] Agreement broken for ${state.did} (remote offer revoked) — removing and purging data`);
            try {
              await this.removeDid(state.did, true);
            } catch (err) {
              console.error(`[replication] Failed to remove broken-agreement DID ${state.did}:`, err instanceof Error ? err.message : String(err));
            }
          }
        }

        // Still run full discoverAndSync for sync-state peers
        await this.offerManager.discoverAndSync(syncPeers);
      }
    } catch (err) {
      console.error(
        "[replication] Offer discovery error:",
        err instanceof Error ? err.message : String(err),
      );
    }
  }

  /**
   * Discover peer endpoints by scanning manifests of known peers.
   * For each peer whose manifest includes a DID we're tracking, record
   * that peer as a potential fallback source for that DID.
   */
  private async discoverPeerEndpoints(): Promise<void> {
    const trackedDids = new Set(this.getReplicateDids());
    const states = this.syncStorage.getAllStates();

    for (const state of states) {
      if (!state.pdsEndpoint) continue;

      try {
        const manifests = await this.peerDiscovery.discoverManifests(
          state.did,
          state.pdsEndpoint,
        );

        for (const manifest of manifests) {
          if (
            manifest.subject &&
            trackedDids.has(manifest.subject) &&
            manifest.status === "active"
          ) {
            this.syncStorage.upsertPeerEndpoint(
              manifest.subject,
              state.did,
              state.pdsEndpoint,
              manifest.lastSyncRev ?? null,
            );
          }
        }
      } catch {
        // Non-fatal: peer endpoint discovery is best-effort
      }
    }
  }

  /**
   * Fetch a repo from known peer endpoints when the source PDS is unavailable.
   * Tries peers in order of freshest data (highest lastSyncRev).
   * Throws the original error if no peers succeed.
   */
  private async fetchFromPeersOrThrow(
    did: string,
    since: string | undefined,
    originalError: Error,
  ): Promise<Uint8Array> {
    const peers = this.syncStorage.getPeerEndpoints(did);
    if (peers.length === 0) throw originalError;

    // Sort by lastSyncRev descending (prefer freshest data); peers without a
    // rev sort last. Revs compare lexicographically (TID ordering).
    peers.sort((a, b) => {
      if (!a.lastSyncRev && !b.lastSyncRev) return 0;
      if (!a.lastSyncRev) return 1;
      if (!b.lastSyncRev) return -1;
      return b.lastSyncRev.localeCompare(a.lastSyncRev);
    });

    for (const peer of peers) {
      try {
        return await this.repoFetcher.fetchRepo(
          peer.pdsEndpoint,
          did,
          since,
        );
      } catch {
        // Try next peer
      }
    }

    throw originalError;
  }

  /**
   * Sync all configured DIDs.
928 * 929 * When a PolicyEngine is present: 930 * - Merges config DIDs with policy explicit DIDs 931 * - Filters out DIDs where shouldReplicate is false 932 * - Sorts by priority (highest first) 933 * - Respects per-DID sync intervals (skips DIDs not yet due) 934 */ 935 async syncAll(): Promise<void> { 936 const dids = this.getReplicateDids(); 937 938 // Sort by priority (highest first) when policy engine is present 939 const sortedDids = this.policyEngine 940 ? this.sortDidsByPriority(dids) 941 : dids; 942 943 this.emitProgress({ type: "sync-cycle:start", did: "*" }); 944 945 for (const did of sortedDids) { 946 if (this.stopped) break; 947 948 // Skip tombstoned DIDs (handled separately by cleanupTombstonedDids) 949 const currentState = this.syncStorage.getState(did); 950 if (currentState?.status === "tombstoned") continue; 951 952 // Check per-DID interval when policy engine is present 953 if (this.policyEngine && !this.isDidDueForSync(did)) { 954 continue; 955 } 956 957 try { 958 await this.syncDid(did, "periodic"); 959 // Record successful sync timestamp 960 this.lastSyncTimestamps.set(did, Date.now()); 961 } catch (err) { 962 const message = 963 err instanceof Error ? err.message : String(err); 964 this.syncStorage.updateStatus(did, "error", message); 965 } 966 } 967 968 // Run deferred GC for DIDs flagged by firehose delete/update ops 969 await this.runDeferredGc(); 970 971 // Clean up tombstoned DIDs after grace period 972 await this.cleanupTombstonedDids(); 973 974 // Discover peer endpoints for P2P fallback 975 await this.discoverPeerEndpoints(); 976 977 // Re-run offer discovery to pick up new/revoked offers 978 await this.runOfferDiscovery(); 979 980 // Check if multiaddrs changed and republish peer record if needed 981 await this._checkMultiaddrsChanged(); 982 983 this.emitProgress({ type: "sync-cycle:complete", did: "*" }); 984 } 985 986 /** 987 * Sort DIDs by their effective policy priority (highest first). 
988 * DIDs without a matching policy get priority 0. 989 */ 990 private sortDidsByPriority(dids: string[]): string[] { 991 if (!this.policyEngine) return dids; 992 return [...dids].sort((a, b) => { 993 const pa = this.policyEngine!.evaluate(a).priority; 994 const pb = this.policyEngine!.evaluate(b).priority; 995 return pb - pa; 996 }); 997 } 998 999 /** 1000 * Check whether a DID is due for sync based on its policy interval. 1001 * DIDs that have never been synced are always due. 1002 */ 1003 private isDidDueForSync(did: string): boolean { 1004 if (!this.policyEngine) return true; 1005 1006 const lastSync = this.lastSyncTimestamps.get(did); 1007 if (lastSync === undefined) return true; // never synced 1008 1009 const config = this.policyEngine.getReplicationConfig(did); 1010 // Config DIDs without a matching policy use the default interval 1011 const intervalMs = config 1012 ? config.sync.intervalSec * 1000 1013 : DEFAULT_SYNC_INTERVAL_MS; 1014 1015 return Date.now() - lastSync >= intervalMs; 1016 } 1017 1018 /** 1019 * Get the effective sync interval for a DID in milliseconds. 1020 * Returns the policy-driven interval if a policy engine is present, 1021 * otherwise returns the default 5-minute interval. 1022 */ 1023 getEffectiveSyncIntervalMs(did: string): number { 1024 if (!this.policyEngine) return DEFAULT_SYNC_INTERVAL_MS; 1025 1026 const config = this.policyEngine.getReplicationConfig(did); 1027 return config 1028 ? config.sync.intervalSec * 1000 1029 : DEFAULT_SYNC_INTERVAL_MS; 1030 } 1031 1032 /** 1033 * Sync a single DID: fetch repo, store blocks in IPFS, verify, update state. 1034 */ 1035 async syncDid(did: string, trigger: SyncTrigger = "unknown"): Promise<void> { 1036 this.syncStorage.updateStatus(did, "syncing"); 1037 const syncStart = Date.now(); 1038 let sourceType = "pds"; 1039 1040 // 1. 
Resolve PDS endpoint
    let state = this.syncStorage.getState(did);
    let pdsEndpoint = await this.repoFetcher.resolvePds(did);

    if (!pdsEndpoint) {
      // If previously had a PDS endpoint, this may be a tombstoned/deleted account
      if (state?.pdsEndpoint) {
        this.syncStorage.markTombstoned(did);
        console.warn(`[replication] DID ${did} no longer resolvable — marked as tombstoned`);
      } else {
        // Never had an endpoint: treat as a plain resolution error instead.
        this.syncStorage.updateStatus(did, "error", "Could not resolve PDS endpoint");
      }
      return;
    }

    // Update PDS endpoint if it changed, then re-read for the fresh row
    this.syncStorage.upsertState({ did, pdsEndpoint });
    state = this.syncStorage.getState(did);

    // 2. Optionally refresh peer info (peerId + multiaddrs)
    if (state && this.shouldRefreshPeerInfo(state)) {
      try {
        const peerInfo = await this.peerDiscovery.discoverPeer(did);
        if (peerInfo) {
          // Detect PeerID changes (peer restarted with new identity)
          if (state.peerId && peerInfo.peerId && state.peerId !== peerInfo.peerId) {
            console.warn(
              `[replication] PeerID changed for ${did}: ${state.peerId} → ${peerInfo.peerId}`,
            );
          }
          this.syncStorage.updatePeerInfo(did, peerInfo.peerId, peerInfo.multiaddrs);
          // Merge observed multiaddrs from active libp2p connections
          if (peerInfo.peerId) {
            const observed = this.networkService.getRemoteAddrs(peerInfo.peerId);
            if (observed.length > 0) {
              const merged = [...new Set([...peerInfo.multiaddrs, ...observed])];
              // Only rewrite the row when the merge actually added addresses
              if (merged.length > peerInfo.multiaddrs.length) {
                this.syncStorage.updatePeerInfo(did, peerInfo.peerId, merged);
              }
            }
          }
        }
      } catch {
        // Non-fatal: peer discovery is optional
      }
    }

    // 3. Fetch repo (with incremental sync if we have a previous rev)
    const since = state?.lastSyncRev ?? undefined;

    // 3a. Try libp2p peer-first sync if we have peer info
    let carBytes: Uint8Array | null = null;
    state = this.syncStorage.getState(did); // re-read for fresh peer info
    if (this.libp2p && state?.peerId && state.peerMultiaddrs && state.peerMultiaddrs.length > 0) {
      try {
        console.log(`[sync] ${did} — fetching repo from peer ${state.peerId.slice(0, 12)}… via libp2p`);
        carBytes = await fetchRepoFromPeer(
          this.libp2p,
          state.peerMultiaddrs,
          did,
          since,
        );
        sourceType = "libp2p";
        console.log(`[sync] ${did} — received ${(carBytes.length / 1024).toFixed(1)} KB from peer via libp2p`);
      } catch (err) {
        console.log(
          `[sync] ${did} — libp2p sync failed, falling back to HTTP: ${err instanceof Error ? err.message : String(err)}`,
        );
        // Null out so the HTTP path below runs
        carBytes = null;
      }
    }

    // 3b. Fall back to HTTP PDS fetch
    if (!carBytes) {
      sourceType = "pds";
      console.log(`[sync] ${did} — fetching repo from ${pdsEndpoint}${since ? ` (since: ${since.slice(0, 8)}…)` : ""}`);
      try {
        carBytes = await this.repoFetcher.fetchRepo(
          pdsEndpoint,
          did,
          since,
        );
      } catch (sourceErr) {
        // On failure, clear cached peer info and trigger re-discovery
        // (fire-and-forget; errors here are deliberately swallowed)
        this.syncStorage.clearPeerInfo(did);
        this.peerDiscovery.discoverPeer(did).then((peerInfo) => {
          if (peerInfo) {
            this.syncStorage.updatePeerInfo(did, peerInfo.peerId, peerInfo.multiaddrs);
          }
        }).catch(() => {});
        const err = sourceErr instanceof Error ? sourceErr : new Error(String(sourceErr));
        if (since) {
          // Retry full sync from source, then fall back to peers
          try {
            carBytes = await this.repoFetcher.fetchRepo(pdsEndpoint, did);
          } catch {
            sourceType = "peer_fallback";
            carBytes = await this.fetchFromPeersOrThrow(did, since, err);
          }
        } else {
          sourceType = "peer_fallback";
          carBytes = await this.fetchFromPeersOrThrow(did, undefined, err);
        }
      }
    }

    // Start sync event after source is determined (fetch failures above are
    // therefore not recorded as sync events)
    this.emitProgress({ type: "sync:start", did, sourceType });
    const syncEventId = this.syncStorage.startSyncEvent(did, sourceType, trigger);

    try {
      // 4. Parse CAR and store blocks
      const carSizeKb = (carBytes.length / 1024).toFixed(1);
      console.log(`[sync] ${did} — CAR received: ${carSizeKb} KB, parsing...`);
      this.emitProgress({ type: "sync:car-received", did, carBytes: carBytes.length });
      const { root, blocks } = await readCarWithRoot(carBytes);

      await this.blockStore.putBlocks(blocks);

      // 5. Collect CID strings + sizes for DHT announcement + verification
      // NOTE(review): reaches into the block container's private `map` field —
      // an undocumented implementation detail of @atproto/repo; confirm this
      // still holds when upgrading that dependency.
      const cidStrs: string[] = [];
      const blockEntries: Array<{ cid: string; sizeBytes: number }> = [];
      const internalMap = (
        blocks as unknown as { map: Map<string, Uint8Array> }
      ).map;
      if (internalMap) {
        for (const [cidStr, blockBytes] of internalMap.entries()) {
          cidStrs.push(cidStr);
          blockEntries.push({ cid: cidStr, sizeBytes: blockBytes.length });
        }
      }

      console.log(`[sync] ${did} — stored ${blockEntries.length} blocks, verifying...`);

      // 5b. Track block CIDs with sizes for remote verification
      if (blockEntries.length > 0) {
        this.syncStorage.trackBlocksWithSize(did, blockEntries);
      }

      const totalBlockSizeKb = blockEntries.reduce((s, b) => s + b.sizeBytes, 0) / 1024;
      this.emitProgress({ type: "sync:blocks-stored", did, blocksStored: blockEntries.length, blocksSizeKb: Math.round(totalBlockSizeKb) });

      // 6. Announce to DHT (fire-and-forget)
      this.networkService.provideBlocks(cidStrs).catch(() => {});

      // 7. Verify local block availability (missing blocks are logged but
      // do not fail the sync)
      const verification =
        await this.verifier.verifyBlockAvailability(cidStrs);
      if (verification.missing.length > 0) {
        console.warn(
          `Local verification: ${verification.missing.length}/${verification.checked} blocks missing for ${did}`,
        );
      }
      this.syncStorage.updateVerifiedAt(did);
      this.emitProgress({ type: "sync:verified", did, missingBlocks: verification.missing.length });

      // 8. Extract actual rev from the commit block (the CAR root is the
      // commit; its CBOR body carries the `rev` string)
      const rootCidStr = root.toString();
      let rev = rootCidStr; // fallback
      const commitBytes = internalMap?.get(rootCidStr);
      if (commitBytes) {
        try {
          const commitObj = cborDecode(commitBytes) as Record<string, unknown>;
          if (typeof commitObj.rev === "string") {
            rev = commitObj.rev;
          }
        } catch {
          // If CBOR decode fails, fall back to root CID as rev
        }
      }

      // 9. Update sync state with both rev and root CID
      this.syncStorage.updateSyncProgress(did, rev, rootCidStr);

      // 9a. Publish gossipsub notification (fire-and-forget)
      this.networkService.publishCommitNotification(did, rootCidStr, rev).catch(() => {});

      // 9b. Invalidate cached ReadableRepo so it reloads with new root
      this.replicatedRepoReader?.invalidateCache(did);

      // 9c. Track record paths for challenge generation (full MST walk)
      try {
        const recordPaths = await extractAllRecordPaths(
          this.blockStore,
          rootCidStr,
        );
        // Replace (not merge) this DID's paths: clear, then re-track
        this.syncStorage.clearRecordPaths(did);
        this.syncStorage.trackRecordPaths(did, recordPaths);
        // Update lexicon index with NSIDs from these paths
        const nsids = extractNsids(recordPaths);
        if (nsids.length > 0) {
          this.syncStorage.updateLexiconIndex(nsids);
        }
      } catch {
        // Non-fatal: path extraction is best-effort
      }

      // 9d. Reconcile blocks: remove orphaned blocks from this DID's tracking
      try {
        const liveCids = await extractAllCids(this.blockStore, rootCidStr);
        const trackedCids = this.syncStorage.getBlockCidSet(did);

        // Find CIDs that are tracked but no longer live
        const orphanedFromDid: string[] = [];
        for (const cid of trackedCids) {
          if (!liveCids.has(cid)) {
            orphanedFromDid.push(cid);
          }
        }

        if (orphanedFromDid.length > 0) {
          // Remove from this DID's tracking
          this.syncStorage.removeBlocks(did, orphanedFromDid);

          // Find which are truly orphaned (no other DID references them)
          const trulyOrphaned = this.syncStorage.findOrphanedCids(orphanedFromDid);

          // Delete from blockstore
          for (const cid of trulyOrphaned) {
            await this.blockStore.deleteBlock(cid);
          }
        }

        // Clear the needs_gc flag since we just did a full reconciliation
        this.syncStorage.clearNeedsGc(did);
      } catch {
        // Non-fatal: GC is best-effort
      }

      // 9e. Reconcile blobs: remove blobs for deleted records
      try {
        // Walk all live records and collect every blob CID they reference
        const currentBlobCids = new Set<string>();
        if (this.replicatedRepoReader) {
          const repo = await this.replicatedRepoReader.getRepo(did);
          if (repo) {
            for await (const entry of repo.walkRecords()) {
              const cids = extractBlobCids(entry.record);
              for (const cid of cids) {
                currentBlobCids.add(cid);
              }
            }
          }
        }

        const trackedBlobCids = this.syncStorage.getBlobCids(did);
        const orphanedBlobs = trackedBlobCids.filter(cid => !currentBlobCids.has(cid));

        if (orphanedBlobs.length > 0) {
          this.syncStorage.removeBlobs(did, orphanedBlobs);
          // Only physically delete blobs no other DID still references
          const trulyOrphanedBlobs = this.syncStorage.findOrphanedBlobCids(orphanedBlobs);
          for (const cid of trulyOrphanedBlobs) {
            await this.blockStore.deleteBlock(cid);
          }
        }
      } catch {
        // Non-fatal: blob GC is best-effort
      }

      // 10. Update manifest record (only if one already exists for this DID)
      if (this.repoManager) {
        const rkey = didToRkey(did);
        const existingManifest = await this.repoManager.getRecord(
          MANIFEST_NSID,
          rkey,
        );
        if (existingManifest) {
          const manifest: ManifestRecord = {
            $type: MANIFEST_NSID,
            subject: did,
            status: "active",
            lastSyncRev: rev,
            lastSyncAt: new Date().toISOString(),
            // Preserve the original createdAt when present
            createdAt:
              (existingManifest.record as Record<string, unknown>)
                ?.createdAt as string ?? new Date().toISOString(),
          };
          await this.repoManager.putRecord(MANIFEST_NSID, rkey, manifest);
        }
      }

      // 11. Sync blobs and capture result
      console.log(`[sync] ${did} — syncing blobs...`);
      const blobResult = await this.syncBlobs(did);

      // 12. Complete sync event with metrics
      const totalDuration = ((Date.now() - syncStart) / 1000).toFixed(1);
      const totalBlobKb = (blobResult.totalBytes / 1024).toFixed(1);
      console.log(
        `[sync] ${did} — complete: ${blockEntries.length} blocks, ${blobResult.fetched} blobs (${totalBlobKb} KB), ${totalDuration}s`,
      );
      this.syncStorage.completeSyncEvent(syncEventId, {
        status: "success",
        blocksAdded: blockEntries.length,
        blobsAdded: blobResult.fetched,
        carBytes: carBytes.length,
        blobBytes: blobResult.totalBytes,
        durationMs: Date.now() - syncStart,
        rev,
        rootCid: rootCidStr,
        incremental: !!since,
      });
      this.emitProgress({
        type: "sync:complete",
        did,
        blocksStored: blockEntries.length,
        blobsFetched: blobResult.fetched,
        blobBytes: blobResult.totalBytes,
        durationMs: Date.now() - syncStart,
      });
    } catch (err) {
      // Record the failure on the sync event, then surface it to the caller
      this.syncStorage.completeSyncEvent(syncEventId, {
        status: "error",
        errorMessage: err instanceof Error ? err.message : String(err),
        durationMs: Date.now() - syncStart,
        incremental: !!since,
      });
      this.emitProgress({
        type: "sync:error",
        did,
        error: err instanceof Error ? err.message : String(err),
      });
      throw err;
    }
  }

  /** Grace period before purging tombstoned DID data (24 hours). */
  private static readonly TOMBSTONE_GRACE_MS = 24 * 60 * 60 * 1000;

  /**
   * Clean up tombstoned DIDs: re-verify resolution, purge if still dead.
   * Only purges after a grace period (24 hours) to avoid premature deletion
   * from transient DID resolution failures.
1372 */ 1373 private async cleanupTombstonedDids(): Promise<void> { 1374 const states = this.syncStorage.getAllStates(); 1375 const tombstoned = states.filter((s) => s.status === "tombstoned"); 1376 1377 for (const state of tombstoned) { 1378 if (this.stopped) break; 1379 1380 // Check grace period: only purge if tombstoned for > 24 hours 1381 const lastSyncAt = state.lastSyncAt ? new Date(state.lastSyncAt).getTime() : 0; 1382 const timeSinceLastSync = Date.now() - lastSyncAt; 1383 if (timeSinceLastSync < ReplicationManager.TOMBSTONE_GRACE_MS) { 1384 continue; 1385 } 1386 1387 // Re-verify: try to resolve the DID again 1388 try { 1389 const pdsEndpoint = await this.repoFetcher.resolvePds(state.did); 1390 if (pdsEndpoint) { 1391 // DID is alive again! Un-tombstone and resume. 1392 this.syncStorage.updateStatus(state.did, "pending"); 1393 this.syncStorage.upsertState({ did: state.did, pdsEndpoint }); 1394 console.log(`[replication] Tombstoned DID ${state.did} is alive again, resuming`); 1395 this.syncDid(state.did, "tombstone-recovery").catch((err) => { 1396 const message = err instanceof Error ? 
err.message : String(err); 1397 this.syncStorage.updateStatus(state.did, "error", message); 1398 }); 1399 continue; 1400 } 1401 } catch { 1402 // Resolution failed, proceed with purge 1403 } 1404 1405 // Still dead — purge data 1406 console.warn(`[replication] Purging data for tombstoned DID ${state.did}`); 1407 const { blocksRemoved, blobsRemoved } = this.syncStorage.purgeDidData(state.did); 1408 1409 // Delete truly orphaned blocks/blobs from blockstore 1410 if (blocksRemoved.length > 0) { 1411 const orphanedBlocks = this.syncStorage.findOrphanedCids(blocksRemoved); 1412 for (const cid of orphanedBlocks) { 1413 await this.blockStore.deleteBlock(cid); 1414 } 1415 } 1416 if (blobsRemoved.length > 0) { 1417 const orphanedBlobs = this.syncStorage.findOrphanedBlobCids(blobsRemoved); 1418 for (const cid of orphanedBlobs) { 1419 await this.blockStore.deleteBlock(cid); 1420 } 1421 } 1422 1423 // Remove manifest record 1424 if (this.repoManager) { 1425 const rkey = didToRkey(state.did); 1426 try { 1427 await this.repoManager.deleteRecord(MANIFEST_NSID, rkey); 1428 } catch { 1429 // Non-fatal 1430 } 1431 } 1432 } 1433 } 1434 1435 /** 1436 * Run deferred GC for DIDs that were flagged by firehose delete/update ops. 1437 * Triggers a full sync for each flagged DID, which includes block/blob reconciliation. 1438 */ 1439 private async runDeferredGc(): Promise<void> { 1440 const didsNeedingGc = this.syncStorage.getDidsNeedingGc(); 1441 for (const did of didsNeedingGc) { 1442 if (this.stopped) break; 1443 try { 1444 await this.syncDid(did, "gc"); 1445 this.lastSyncTimestamps.set(did, Date.now()); 1446 } catch (err) { 1447 const message = err instanceof Error ? err.message : String(err); 1448 this.syncStorage.updateStatus(did, "error", message); 1449 } 1450 } 1451 } 1452 1453 /** 1454 * Compute the tick interval for periodic sync. 1455 * 1456 * When a PolicyEngine is present, uses the minimum sync interval 1457 * across all DIDs so that no DID misses its window. 
Each DID's 1458 * individual interval is checked inside syncAll(). 1459 * 1460 * Falls back to the provided intervalMs (default 5 minutes). 1461 */ 1462 private computeTickIntervalMs(fallbackMs: number): number { 1463 if (!this.policyEngine) return fallbackMs; 1464 1465 const dids = this.getReplicateDids(); 1466 if (dids.length === 0) return fallbackMs; 1467 1468 let minInterval = Infinity; 1469 for (const did of dids) { 1470 const config = this.policyEngine.getReplicationConfig(did); 1471 const intervalMs = config 1472 ? config.sync.intervalSec * 1000 1473 : DEFAULT_SYNC_INTERVAL_MS; 1474 if (intervalMs < minInterval) { 1475 minInterval = intervalMs; 1476 } 1477 } 1478 1479 // Clamp to minimum tick interval to prevent excessive polling 1480 return Math.max( 1481 minInterval === Infinity ? fallbackMs : minInterval, 1482 MIN_TICK_INTERVAL_MS, 1483 ); 1484 } 1485 1486 /** 1487 * Start periodic sync and verification at their respective intervals. 1488 * 1489 * When a PolicyEngine is present, the tick rate is derived from the 1490 * minimum sync interval across all DIDs. Each tick, syncAll() checks 1491 * which DIDs are actually due for sync based on their individual intervals. 
1492 */ 1493 startPeriodicSync(intervalMs: number = 5 * 60 * 1000): void { 1494 if (this.syncTimer) return; 1495 this.stopped = false; 1496 1497 const tickMs = this.computeTickIntervalMs(intervalMs); 1498 1499 // Run first sync after a short delay to let startup complete 1500 setTimeout(() => { 1501 if (!this.stopped) { 1502 this.syncAll().catch((err) => { 1503 console.error("Periodic sync error:", err); 1504 }); 1505 } 1506 }, 5000); 1507 1508 this.syncTimer = setInterval(() => { 1509 if (!this.stopped) { 1510 this.syncAll().catch((err) => { 1511 console.error("Periodic sync error:", err); 1512 }); 1513 } 1514 }, tickMs); 1515 1516 // Start periodic PLC mirror refresh (6 hours) 1517 const PLC_REFRESH_MS = 6 * 60 * 60 * 1000; 1518 this.plcRefreshTimer = setInterval(() => { 1519 if (!this.stopped) { 1520 this.refreshAllPlcLogs().catch((err) => { 1521 console.error("[replication] PLC mirror refresh error:", err); 1522 }); 1523 } 1524 }, PLC_REFRESH_MS); 1525 1526 // Initial PLC log fetch for all tracked did:plc DIDs (fire-and-forget) 1527 this.refreshAllPlcLogs().catch((err) => { 1528 console.error("[replication] Initial PLC mirror fetch error:", err); 1529 }); 1530 1531 // Build lexicon index from existing record paths 1532 try { 1533 this.syncStorage.rebuildLexiconIndex(); 1534 } catch (err) { 1535 console.error("[replication] Lexicon index rebuild error:", err); 1536 } 1537 1538 // Run verification once on startup, then on a timer 1539 this.runVerification().catch((err) => { 1540 console.error("Initial verification error:", err); 1541 }); 1542 this.verificationTimer = setInterval(() => { 1543 if (!this.stopped) { 1544 this.runVerification().catch((err) => { 1545 console.error("Periodic verification error:", err); 1546 }); 1547 } 1548 }, this.verificationConfig.verificationIntervalMs); 1549 } 1550 1551 /** 1552 * Start the firehose subscription for real-time updates. 
1553 * The firehose provides streaming updates for serviced DIDs, 1554 * complementing the periodic polling sync. 1555 */ 1556 startFirehose(): void { 1557 if (this.firehoseSubscription) return; 1558 if (!this.config.FIREHOSE_ENABLED) return; 1559 1560 const replicateDids = this.getReplicateDids(); 1561 if (replicateDids.length === 0) return; 1562 1563 this.firehoseSubscription = new FirehoseSubscription({ 1564 firehoseUrl: this.config.FIREHOSE_URL, 1565 }); 1566 1567 // Register handler for commit events 1568 this.firehoseSubscription.onCommit((event) => { 1569 this.handleFirehoseCommit(event).catch((err) => { 1570 console.error(`[replication] Firehose commit handler error for ${event.repo}:`, err); 1571 }); 1572 }); 1573 1574 // Register handler for account status events (tombstone/deactivation) 1575 this.firehoseSubscription.onAccount((event) => { 1576 this.handleFirehoseAccount(event).catch((err) => { 1577 console.error(`[replication] Firehose account handler error for ${event.did}:`, err); 1578 }); 1579 }); 1580 1581 // Load saved cursor for resumption 1582 const savedCursor = this.syncStorage.getFirehoseCursor(); 1583 1584 // Start with the merged set of DIDs 1585 const dids = new Set(replicateDids); 1586 this.firehoseSubscription.start(dids, savedCursor); 1587 1588 // Periodically save the cursor to SQLite (every 30 seconds) 1589 this.firehoseCursorSaveTimer = setInterval(() => { 1590 this.saveFirehoseCursor(); 1591 }, 30_000); 1592 1593 console.log( 1594 `[replication] Firehose subscription started` + 1595 (savedCursor !== null ? ` (resuming from cursor ${savedCursor})` : "") + 1596 ` — tracking ${dids.size} DIDs`, 1597 ); 1598 } 1599 1600 /** 1601 * Handle a commit event from the firehose. 1602 * 1603 * Optimization: the firehose event already contains the blocks (as CAR bytes) 1604 * and the commit rev/CID. We apply them directly to our blockstore, avoiding 1605 * an HTTP round-trip to the source PDS. 
1606 * 1607 * Falls back to full syncDid() if incremental apply fails (e.g., tooBig event, 1608 * empty blocks, gap in sequence, or CAR parsing error). 1609 */ 1610 private async handleFirehoseCommit(event: FirehoseCommitEvent): Promise<void> { 1611 const did = event.repo; 1612 1613 // Only process if this DID is one we are tracking 1614 const replicateDids = this.getReplicateDids(); 1615 if (!replicateDids.includes(did)) return; 1616 1617 // Determine if we should attempt incremental apply or fall back to full sync. 1618 // Fall back when: 1619 // - tooBig is set (blocks were too large to include in the event) 1620 // - rebase occurred (repo structure changed, need full sync) 1621 // - blocks are empty (nothing to apply) 1622 if (event.tooBig || event.rebase || event.blocks.length === 0) { 1623 try { 1624 await this.syncDid(did, "firehose-resync"); 1625 this.lastSyncTimestamps.set(did, Date.now()); 1626 } catch (err) { 1627 const message = err instanceof Error ? err.message : String(err); 1628 this.syncStorage.updateStatus(did, "error", message); 1629 } 1630 return; 1631 } 1632 1633 // Check for sequence gaps: if we have a `since` (previous rev) in the event 1634 // and it doesn't match our last sync rev, we may have missed events. 1635 const state = this.syncStorage.getState(did); 1636 if (event.since !== null && state !== null && state.lastSyncRev !== null) { 1637 if (state.lastSyncRev !== event.since) { 1638 // Gap detected: our last known rev doesn't match the event's `since`. 1639 // Fall back to full sync to catch up. 1640 console.warn( 1641 `[replication] Gap detected for ${did}: local rev=${state.lastSyncRev}, event.since=${event.since}. Falling back to full sync.`, 1642 ); 1643 try { 1644 await this.syncDid(did, "firehose-resync"); 1645 this.lastSyncTimestamps.set(did, Date.now()); 1646 } catch (err) { 1647 const message = err instanceof Error ? 
err.message : String(err); 1648 this.syncStorage.updateStatus(did, "error", message); 1649 } 1650 return; 1651 } 1652 } 1653 1654 // Attempt incremental block application from firehose event 1655 try { 1656 await this.applyFirehoseBlocks(did, event); 1657 this.lastSyncTimestamps.set(did, Date.now()); 1658 } catch (err) { 1659 // Incremental apply failed — fall back to full sync 1660 console.warn( 1661 `[replication] Incremental apply failed for ${did}, falling back to full sync:`, 1662 err instanceof Error ? err.message : String(err), 1663 ); 1664 try { 1665 await this.syncDid(did, "firehose-resync"); 1666 this.lastSyncTimestamps.set(did, Date.now()); 1667 } catch (syncErr) { 1668 const message = syncErr instanceof Error ? syncErr.message : String(syncErr); 1669 this.syncStorage.updateStatus(did, "error", message); 1670 } 1671 } 1672 } 1673 1674 /** 1675 * Handle an account status event from the firehose. 1676 * Detects tombstoned/deactivated accounts and marks them accordingly. 1677 * Handles re-activation by clearing tombstone and triggering sync. 1678 */ 1679 private async handleFirehoseAccount(event: FirehoseAccountEvent): Promise<void> { 1680 const did = event.did; 1681 1682 // Only process DIDs we are tracking 1683 const replicateDids = this.getReplicateDids(); 1684 if (!replicateDids.includes(did)) return; 1685 1686 if (!event.active || event.status === "deleted" || event.status === "takendown") { 1687 // Refresh PLC log to capture tombstone operation 1688 if (did.startsWith("did:plc:")) { 1689 this.fetchPlcLog(did).catch((err) => { 1690 console.warn(`[replication] PLC log refresh on tombstone for ${did} failed:`, err instanceof Error ? 
err.message : String(err)); 1691 }); 1692 } 1693 1694 // Mark as tombstoned 1695 this.syncStorage.markTombstoned(did); 1696 1697 // Update manifest to paused 1698 if (this.repoManager) { 1699 const rkey = didToRkey(did); 1700 const existing = await this.repoManager.getRecord(MANIFEST_NSID, rkey); 1701 if (existing) { 1702 const manifest: ManifestRecord = { 1703 ...(existing.record as ManifestRecord), 1704 status: "paused", 1705 }; 1706 await this.repoManager.putRecord(MANIFEST_NSID, rkey, manifest); 1707 } 1708 } 1709 1710 console.warn( 1711 `[replication] Account ${did} deactivated/tombstoned (status=${event.status}, active=${event.active})`, 1712 ); 1713 } else if (event.active) { 1714 // Re-activated: clear tombstone and trigger full sync 1715 const state = this.syncStorage.getState(did); 1716 if (state?.status === "tombstoned") { 1717 this.syncStorage.updateStatus(did, "pending"); 1718 console.log(`[replication] Account ${did} re-activated, triggering sync`); 1719 this.syncDid(did, "tombstone-recovery").catch((err) => { 1720 const message = err instanceof Error ? err.message : String(err); 1721 this.syncStorage.updateStatus(did, "error", message); 1722 }); 1723 // Refresh PLC log on reactivation 1724 if (did.startsWith("did:plc:")) { 1725 this.fetchPlcLog(did).catch((err) => { 1726 console.warn(`[replication] PLC log refresh on reactivation for ${did} failed:`, err instanceof Error ? err.message : String(err)); 1727 }); 1728 } 1729 } 1730 } 1731 } 1732 1733 /** 1734 * Apply blocks from a firehose commit event directly to the blockstore. 1735 * The event's `blocks` field contains CAR-encoded bytes that can be parsed 1736 * and stored without fetching from the source PDS. 1737 */ 1738 private async applyFirehoseBlocks( 1739 did: string, 1740 event: FirehoseCommitEvent, 1741 ): Promise<void> { 1742 const syncStart = Date.now(); 1743 const syncEventId = this.syncStorage.startSyncEvent(did, "firehose", "firehose"); 1744 1745 try { 1746 // 1. 
Parse the CAR bytes from the firehose event 1747 const { root, blocks } = await readCarWithRoot(event.blocks); 1748 1749 // 2. Store blocks in our blockstore 1750 await this.blockStore.putBlocks(blocks); 1751 1752 // 3. Collect CID strings + sizes for DHT announcement + block tracking 1753 const cidStrs: string[] = []; 1754 const blockEntries: Array<{ cid: string; sizeBytes: number }> = []; 1755 const internalMap = ( 1756 blocks as unknown as { map: Map<string, Uint8Array> } 1757 ).map; 1758 if (internalMap) { 1759 for (const [cidStr, blockBytes] of internalMap.entries()) { 1760 cidStrs.push(cidStr); 1761 blockEntries.push({ cid: cidStr, sizeBytes: blockBytes.length }); 1762 } 1763 } 1764 1765 // 4. Track block CIDs with sizes 1766 if (blockEntries.length > 0) { 1767 this.syncStorage.trackBlocksWithSize(did, blockEntries); 1768 } 1769 1770 const fhBlockSizeKb = blockEntries.reduce((s, b) => s + b.sizeBytes, 0) / 1024; 1771 this.emitProgress({ type: "sync:blocks-stored", did, blocksStored: blockEntries.length, blocksSizeKb: Math.round(fhBlockSizeKb) }); 1772 1773 // 5. Announce to DHT (fire-and-forget) 1774 this.networkService.provideBlocks(cidStrs).catch(() => {}); 1775 1776 // 6. Determine rev and root CID 1777 const rootCidStr = root.toString(); 1778 let rev = event.rev; // Use the rev directly from the firehose event 1779 1780 // If no rev in the event, try to extract from commit block 1781 if (!rev) { 1782 const commitBytes = internalMap?.get(rootCidStr); 1783 if (commitBytes) { 1784 try { 1785 const commitObj = cborDecode(commitBytes) as Record<string, unknown>; 1786 if (typeof commitObj.rev === "string") { 1787 rev = commitObj.rev; 1788 } 1789 } catch { 1790 // Fall back to root CID as rev 1791 rev = rootCidStr; 1792 } 1793 } else { 1794 rev = rootCidStr; 1795 } 1796 } 1797 1798 // 7. Update sync state 1799 this.syncStorage.updateSyncProgress(did, rev, rootCidStr); 1800 1801 // 7a. 
Publish gossipsub notification (fire-and-forget) 1802 this.networkService.publishCommitNotification(did, rootCidStr, rev).catch(() => {}); 1803 1804 // 8. Invalidate cached ReadableRepo so it reloads with new root 1805 this.replicatedRepoReader?.invalidateCache(did); 1806 1807 // 8b. Track record paths incrementally from firehose ops 1808 try { 1809 const createdPaths = event.ops 1810 .filter((op) => op.action === "create" || op.action === "update") 1811 .map((op) => op.path); 1812 const deletedPaths = event.ops 1813 .filter((op) => op.action === "delete") 1814 .map((op) => op.path); 1815 if (createdPaths.length > 0) { 1816 this.syncStorage.trackRecordPaths(did, createdPaths); 1817 // Update lexicon index with NSIDs from created paths 1818 const nsids = extractNsids(createdPaths); 1819 if (nsids.length > 0) { 1820 this.syncStorage.updateLexiconIndex(nsids); 1821 } 1822 } 1823 if (deletedPaths.length > 0) { 1824 this.syncStorage.removeRecordPaths(did, deletedPaths); 1825 } 1826 1827 // Flag for deferred GC if deletes or updates occurred 1828 // (updates may orphan old MST nodes/record blocks) 1829 const hasDeletesOrUpdates = event.ops.some( 1830 (op) => op.action === "delete" || op.action === "update", 1831 ); 1832 if (hasDeletesOrUpdates) { 1833 this.syncStorage.setNeedsGc(did); 1834 } 1835 } catch { 1836 // Non-fatal: path tracking is best-effort 1837 } 1838 1839 // 9. Update manifest record 1840 if (this.repoManager) { 1841 const rkey = didToRkey(did); 1842 const existingManifest = await this.repoManager.getRecord( 1843 MANIFEST_NSID, 1844 rkey, 1845 ); 1846 if (existingManifest) { 1847 const manifest: ManifestRecord = { 1848 $type: MANIFEST_NSID, 1849 subject: did, 1850 status: "active", 1851 lastSyncRev: rev, 1852 lastSyncAt: new Date().toISOString(), 1853 createdAt: 1854 (existingManifest.record as Record<string, unknown>) 1855 ?.createdAt as string ?? 
              // createdAt fallback: the existing manifest record had no createdAt field.
              new Date().toISOString(),
          };
          await this.repoManager.putRecord(MANIFEST_NSID, rkey, manifest);
        }
      }

      // 10. Sync blobs for firehose ops (fire-and-forget; failures logged, not fatal)
      const pdsEndpoint = this.syncStorage.getState(did)?.pdsEndpoint;
      if (pdsEndpoint) {
        this.syncBlobsForOps(did, pdsEndpoint, event.ops).catch((err) => {
          console.warn(
            `[replication] Blob sync error for firehose ops (${did}):`,
            err instanceof Error ? err.message : String(err),
          );
        });
      }

      // 11. Complete sync event: record success metrics for this incremental sync.
      this.syncStorage.completeSyncEvent(syncEventId, {
        status: "success",
        blocksAdded: blockEntries.length,
        carBytes: event.blocks.length,
        durationMs: Date.now() - syncStart,
        rev,
        rootCid: rootCidStr,
        incremental: true,
      });
      this.emitProgress({
        type: "sync:complete",
        did,
        blocksStored: blockEntries.length,
        durationMs: Date.now() - syncStart,
      });
    } catch (err) {
      // Persist the failure and notify progress listeners, then rethrow so the
      // caller can apply its own error handling.
      this.syncStorage.completeSyncEvent(syncEventId, {
        status: "error",
        errorMessage: err instanceof Error ? err.message : String(err),
        durationMs: Date.now() - syncStart,
        incremental: true,
      });
      this.emitProgress({
        type: "sync:error",
        did,
        error: err instanceof Error ? err.message : String(err),
      });
      throw err;
    }
  }

  /**
   * Save the current firehose cursor to persistent storage.
   * No-op if the firehose subscription is inactive or has not yet
   * received a cursor (getCursor() returns null).
   */
  private saveFirehoseCursor(): void {
    if (!this.firehoseSubscription) return;
    const cursor = this.firehoseSubscription.getCursor();
    if (cursor !== null) {
      this.syncStorage.saveFirehoseCursor(cursor);
    }
  }

  /**
   * Get firehose subscription statistics.
1917 */ 1918 getFirehoseStats(): { 1919 connected: boolean; 1920 cursor: number | null; 1921 eventsReceived: number; 1922 eventsProcessed: number; 1923 trackedDids: number; 1924 } | null { 1925 if (!this.firehoseSubscription) return null; 1926 return this.firehoseSubscription.getStats(); 1927 } 1928 1929 /** 1930 * Set up gossipsub subscription for commit notifications. 1931 * Subscribes to topics for all tracked DIDs and registers a handler 1932 * that triggers syncDid() when a newer rev is received. 1933 */ 1934 private setupGossipsubSubscription(): void { 1935 const dids = this.getReplicateDids(); 1936 if (dids.length === 0) return; 1937 1938 this.networkService.onCommitNotification((notification) => { 1939 this.handleGossipsubNotification(notification).catch((err) => { 1940 console.error( 1941 `[replication] Gossipsub notification handler error for ${notification.did}:`, 1942 err instanceof Error ? err.message : String(err), 1943 ); 1944 }); 1945 }); 1946 1947 this.networkService.onIdentityNotification((notification) => { 1948 this.handleIdentityNotification(notification); 1949 }); 1950 1951 this.networkService.subscribeCommitTopics(dids); 1952 this.networkService.subscribeIdentityTopics(dids); 1953 1954 // Periodically clear the dedup set to prevent unbounded growth 1955 this.notificationCleanupTimer = setInterval(() => { 1956 this.recentNotifications.clear(); 1957 }, 60_000); 1958 } 1959 1960 /** 1961 * Handle an incoming gossipsub commit notification. 1962 * Deduplicates by did:rev, skips if local state is already current, 1963 * and triggers syncDid() if the notification represents a newer commit. 
1964 */ 1965 private async handleGossipsubNotification(notification: CommitNotification): Promise<void> { 1966 const did = notification.did; 1967 1968 // Only process DIDs we are tracking 1969 const trackedDids = this.getReplicateDids(); 1970 if (!trackedDids.includes(did)) return; 1971 1972 // Dedup: skip if we've already processed this did:rev 1973 const dedupKey = `${did}:${notification.rev}`; 1974 if (this.recentNotifications.has(dedupKey)) return; 1975 this.recentNotifications.add(dedupKey); 1976 1977 // Skip if local state already matches this rev 1978 const state = this.syncStorage.getState(did); 1979 if (state?.lastSyncRev === notification.rev) return; 1980 1981 try { 1982 await this.syncDid(did, "gossipsub"); 1983 this.lastSyncTimestamps.set(did, Date.now()); 1984 } catch (err) { 1985 const message = err instanceof Error ? err.message : String(err); 1986 this.syncStorage.updateStatus(did, "error", message); 1987 } 1988 } 1989 1990 /** 1991 * Handle an incoming gossipsub identity notification. 1992 * Updates peer info immediately if the DID is tracked. 1993 */ 1994 private handleIdentityNotification(notification: IdentityNotification): void { 1995 const did = notification.did; 1996 1997 // Only process DIDs we are tracking 1998 const trackedDids = this.getReplicateDids(); 1999 if (!trackedDids.includes(did)) return; 2000 2001 // Log PeerID changes 2002 const state = this.syncStorage.getState(did); 2003 if (state?.peerId && state.peerId !== notification.peerId) { 2004 console.warn( 2005 `[replication] Identity notification: PeerID changed for ${did}: ${state.peerId}${notification.peerId}`, 2006 ); 2007 } 2008 2009 // Update peer info immediately 2010 this.syncStorage.updatePeerInfo(did, notification.peerId, notification.multiaddrs); 2011 2012 // Refresh PLC log on identity change (key rotation, PDS migration, etc.) 
2013 if (did.startsWith("did:plc:")) { 2014 this.fetchPlcLog(did).catch((err) => { 2015 console.warn(`[replication] PLC log refresh on identity change for ${did} failed:`, err instanceof Error ? err.message : String(err)); 2016 }); 2017 } 2018 } 2019 2020 /** 2021 * Stop periodic sync, verification, and firehose subscription. 2022 */ 2023 stop(): void { 2024 this.stopped = true; 2025 if (this.syncTimer) { 2026 clearInterval(this.syncTimer); 2027 this.syncTimer = null; 2028 } 2029 if (this.verificationTimer) { 2030 clearInterval(this.verificationTimer); 2031 this.verificationTimer = null; 2032 } 2033 // Save cursor and stop firehose 2034 if (this.firehoseCursorSaveTimer) { 2035 clearInterval(this.firehoseCursorSaveTimer); 2036 this.firehoseCursorSaveTimer = null; 2037 } 2038 if (this.firehoseSubscription) { 2039 this.saveFirehoseCursor(); 2040 this.firehoseSubscription.stop(); 2041 this.firehoseSubscription = null; 2042 } 2043 // Stop PLC mirror refresh 2044 if (this.plcRefreshTimer) { 2045 clearInterval(this.plcRefreshTimer); 2046 this.plcRefreshTimer = null; 2047 } 2048 // Stop challenge scheduler 2049 if (this.challengeScheduler) { 2050 this.challengeScheduler.stop(); 2051 this.challengeScheduler = null; 2052 } 2053 // Stop gossipsub notification cleanup and unsubscribe identity topics 2054 if (this.notificationCleanupTimer) { 2055 clearInterval(this.notificationCleanupTimer); 2056 this.notificationCleanupTimer = null; 2057 } 2058 try { 2059 this.networkService.unsubscribeIdentityTopics(this.getReplicateDids()); 2060 } catch { 2061 // Gossipsub streams may already be closed during shutdown 2062 } 2063 } 2064 2065 /** 2066 * Run remote verification for all synced DIDs. 
2067 */ 2068 async runVerification(): Promise<LayeredVerificationResult[]> { 2069 const results: LayeredVerificationResult[] = []; 2070 2071 for (const did of this.getReplicateDids()) { 2072 if (this.stopped) break; 2073 try { 2074 const result = await this.verifyDid(did); 2075 if (result) { 2076 results.push(result); 2077 this.lastVerificationResults.set(did, result); 2078 } 2079 } catch (err) { 2080 console.error(`Verification error for ${did}:`, err); 2081 } 2082 } 2083 2084 return results; 2085 } 2086 2087 /** 2088 * Run layered verification for a single DID against its source PDS. 2089 */ 2090 async verifyDid(did: string): Promise<LayeredVerificationResult | null> { 2091 const state = this.syncStorage.getState(did); 2092 if (!state || !state.pdsEndpoint) return null; 2093 2094 // Get tracked block CIDs for this DID 2095 const blockCids = this.syncStorage.getBlockCids(did); 2096 2097 // Use the root CID (commit root) for verification 2098 const rootCid = state.rootCid ?? state.lastSyncRev ?? null; 2099 2100 // Get tracked record paths for challenge generation 2101 const recordPaths = this.syncStorage.getRecordPaths(did); 2102 2103 const result = await this.remoteVerifier.verifyPeer( 2104 did, 2105 state.pdsEndpoint, 2106 rootCid, 2107 blockCids, 2108 recordPaths, 2109 ); 2110 2111 if (result.overallPassed) { 2112 this.syncStorage.updateVerifiedAt(did); 2113 } 2114 2115 return result; 2116 } 2117 2118 /** 2119 * Get the underlying SyncStorage instance. 2120 */ 2121 getSyncStorage(): SyncStorage { 2122 return this.syncStorage; 2123 } 2124 2125 /** 2126 * Get the underlying ChallengeStorage instance. 2127 */ 2128 getChallengeStorage(): ChallengeStorage { 2129 return this.challengeStorage; 2130 } 2131 2132 /** 2133 * Set the replicated repo reader for cache invalidation after sync. 2134 */ 2135 setReplicatedRepoReader(reader: ReplicatedRepoReader): void { 2136 this.replicatedRepoReader = reader; 2137 } 2138 2139 /** 2140 * Get sync states for all tracked DIDs. 
   */
  getSyncStates(): SyncState[] {
    return this.syncStorage.getAllStates();
  }

  /**
   * Query the lexicon index with optional prefix filter.
   *
   * @param prefix - optional NSID prefix to filter by
   * @param limit - optional maximum number of rows to return
   */
  getLexiconIndex(prefix?: string, limit?: number) {
    return this.syncStorage.getLexiconIndex(prefix, limit);
  }

  /**
   * Get aggregate lexicon stats.
   */
  getLexiconStats() {
    return this.syncStorage.getLexiconStats();
  }

  /**
   * Get all offered DIDs (awaiting mutual consent).
   */
  getOfferedDids(): Array<{ did: string; pdsEndpoint: string | null; offeredAt: string }> {
    return this.syncStorage.getOfferedDids();
  }

  /**
   * Get the most recent verification results.
   * Note: returns the live internal map, not a copy — callers should
   * treat it as read-only.
   */
  getVerificationResults(): Map<string, LayeredVerificationResult> {
    return this.lastVerificationResults;
  }

  // ============================================
  // Blob replication
  // ============================================

  /**
   * Sync blobs for a DID: walk all records, extract blob CIDs, fetch new ones.
2180 */ 2181 async syncBlobs(did: string): Promise<{ fetched: number; skipped: number; errors: number; totalBytes: number }> { 2182 const state = this.syncStorage.getState(did); 2183 if (!state?.pdsEndpoint || !state.rootCid) { 2184 return { fetched: 0, skipped: 0, errors: 0, totalBytes: 0 }; 2185 } 2186 2187 if (!this.replicatedRepoReader) { 2188 return { fetched: 0, skipped: 0, errors: 0, totalBytes: 0 }; 2189 } 2190 2191 // Walk all records and collect blob CIDs 2192 const allBlobCids = new Set<string>(); 2193 const repo = await this.replicatedRepoReader.getRepo(did); 2194 if (!repo) { 2195 return { fetched: 0, skipped: 0, errors: 0, totalBytes: 0 }; 2196 } 2197 2198 for await (const entry of repo.walkRecords()) { 2199 const cids = extractBlobCids(entry.record); 2200 for (const cid of cids) { 2201 allBlobCids.add(cid); 2202 } 2203 } 2204 2205 // Filter to blobs not already fetched 2206 let fetched = 0; 2207 let skipped = 0; 2208 let errors = 0; 2209 let totalBytes = 0; 2210 const total = allBlobCids.size; 2211 2212 for (const blobCid of allBlobCids) { 2213 if (this.syncStorage.hasBlobCid(did, blobCid)) { 2214 skipped++; 2215 continue; 2216 } 2217 2218 try { 2219 const bytes = await this.repoFetcher.fetchBlob( 2220 state.pdsEndpoint, 2221 did, 2222 blobCid, 2223 ); 2224 if (!bytes) { 2225 skipped++; // 404 — blob deleted upstream 2226 continue; 2227 } 2228 2229 if (!await this.verifyBlobCid(blobCid, bytes)) { 2230 console.warn(`[replication] Blob CID mismatch for ${blobCid} (${did})`); 2231 errors++; 2232 continue; 2233 } 2234 2235 await this.blockStore.putBlock(blobCid, bytes); 2236 this.syncStorage.trackBlobsWithSize(did, [{ cid: blobCid, sizeBytes: bytes.length }]); 2237 this.networkService.provideBlocks([blobCid]).catch(() => {}); 2238 fetched++; 2239 totalBytes += bytes.length; 2240 this.emitProgress({ type: "sync:blob-progress", did, blobsFetched: fetched, blobsTotal: total, blobBytes: totalBytes }); 2241 } catch (err) { 2242 console.warn( 2243 
`[replication] Failed to fetch blob ${blobCid} for ${did}:`, 2244 err instanceof Error ? err.message : String(err), 2245 ); 2246 errors++; 2247 } 2248 } 2249 2250 return { fetched, skipped, errors, totalBytes }; 2251 } 2252 2253 /** 2254 * Sync blobs for specific firehose ops (create/update only). 2255 */ 2256 private async syncBlobsForOps( 2257 did: string, 2258 pdsEndpoint: string, 2259 ops: Array<{ action: string; path: string }>, 2260 ): Promise<void> { 2261 if (!this.replicatedRepoReader) return; 2262 2263 for (const op of ops) { 2264 if (op.action !== "create" && op.action !== "update") continue; 2265 2266 const parts = op.path.split("/"); 2267 if (parts.length < 2) continue; 2268 const collection = parts.slice(0, -1).join("/"); 2269 const rkey = parts[parts.length - 1]!; 2270 2271 try { 2272 const record = await this.replicatedRepoReader.getRecord(did, collection, rkey); 2273 if (!record) continue; 2274 2275 const blobCids = extractBlobCids(record.value); 2276 for (const blobCid of blobCids) { 2277 if (this.syncStorage.hasBlobCid(did, blobCid)) continue; 2278 2279 const bytes = await this.repoFetcher.fetchBlob(pdsEndpoint, did, blobCid); 2280 if (!bytes) continue; 2281 2282 if (!await this.verifyBlobCid(blobCid, bytes)) { 2283 console.warn(`[replication] Blob CID mismatch for ${blobCid} (${did})`); 2284 continue; 2285 } 2286 2287 await this.blockStore.putBlock(blobCid, bytes); 2288 this.syncStorage.trackBlobs(did, [blobCid]); 2289 this.networkService.provideBlocks([blobCid]).catch(() => {}); 2290 } 2291 } catch (err) { 2292 console.warn( 2293 `[replication] Failed to sync blobs for op ${op.path} (${did}):`, 2294 err instanceof Error ? err.message : String(err), 2295 ); 2296 } 2297 } 2298 } 2299 2300 /** 2301 * Verify that blob bytes match the expected CID. 
2302 */ 2303 private async verifyBlobCid(expectedCid: string, bytes: Uint8Array): Promise<boolean> { 2304 const actualCid = await createCid(CODEC_RAW, bytes); 2305 return cidToString(actualCid) === expectedCid; 2306 } 2307 2308 // ============================================ 2309 // Challenge-response verification 2310 // ============================================ 2311 2312 /** 2313 * Start the challenge scheduler for periodic proof-of-storage verification. 2314 * Requires a ChallengeTransport to communicate with peers. 2315 */ 2316 startChallengeScheduler(transport: ChallengeTransport): void { 2317 if (this.challengeScheduler) return; 2318 2319 this.challengeScheduler = new ChallengeScheduler( 2320 this.config.DID ?? "", 2321 this.policyEngine, 2322 this.syncStorage, 2323 this.challengeStorage, 2324 this.blockStore, 2325 transport, 2326 ); 2327 this.challengeScheduler.start(); 2328 } 2329 2330 /** 2331 * Get challenge history for a target peer. 2332 */ 2333 getChallengeResults( 2334 targetDid: string, 2335 subjectDid?: string, 2336 ): ChallengeHistoryRow[] { 2337 return this.challengeStorage.getHistory(targetDid, subjectDid); 2338 } 2339 2340 /** 2341 * Get peer reliability scores. 2342 * If peerDid is provided, returns reliability for that specific peer. 2343 * Otherwise, returns all peer reliability scores. 2344 */ 2345 getPeerReliability(peerDid?: string): PeerReliabilityRow[] { 2346 if (peerDid) { 2347 return this.challengeStorage.getReliability(peerDid); 2348 } 2349 return this.challengeStorage.getAllReliability(); 2350 } 2351 2352 /** 2353 * Refresh peer info for all DIDs associated with a PDS endpoint. 2354 * Called when a connection to that endpoint fails (e.g. challenge transport fallback). 2355 * Fire-and-forget: clears stale cache and triggers re-discovery. 
2356 */ 2357 refreshPeerInfoForEndpoint(pdsEndpoint: string): void { 2358 const states = this.syncStorage.getAllStates(); 2359 for (const state of states) { 2360 if (state.pdsEndpoint === pdsEndpoint) { 2361 this.syncStorage.clearPeerInfo(state.did); 2362 this.peerDiscovery.discoverPeer(state.did).then((peerInfo) => { 2363 if (peerInfo) { 2364 this.syncStorage.updatePeerInfo(state.did, peerInfo.peerId, peerInfo.multiaddrs); 2365 } 2366 }).catch(() => {}); 2367 } 2368 } 2369 } 2370 2371 // ============================================ 2372 // PLC mirror 2373 // ============================================ 2374 2375 /** 2376 * Fetch and validate the PLC audit log for a single DID. 2377 */ 2378 private async fetchPlcLog(did: string): Promise<void> { 2379 const { fetchAndValidateLog } = await import("../identity/plc-mirror.js"); 2380 await fetchAndValidateLog(this.db, did); 2381 // Announce via DHT that we hold this DID's PLC log 2382 this.networkService.provideForDid(did).catch(() => {}); 2383 } 2384 2385 /** 2386 * Refresh PLC logs for all tracked did:plc DIDs. 2387 */ 2388 private async refreshAllPlcLogs(): Promise<void> { 2389 const { getAllPlcMirrorDids, fetchAndValidateLog, publishPlcProviders } = await import("../identity/plc-mirror.js"); 2390 2391 // Refresh existing mirrored DIDs 2392 const mirroredDids = getAllPlcMirrorDids(this.db); 2393 for (const did of mirroredDids) { 2394 if (this.stopped) break; 2395 try { 2396 await fetchAndValidateLog(this.db, did); 2397 } catch (err) { 2398 console.warn( 2399 `[replication] PLC log refresh for ${did} failed:`, 2400 err instanceof Error ? 
err.message : String(err), 2401 ); 2402 // Fallback: try discovering PLC log from DHT peers 2403 await this.fetchPlcLogFromPeers(did).catch(() => {}); 2404 } 2405 } 2406 2407 // Also fetch for any tracked did:plc DIDs that don't have a mirror entry yet 2408 const trackedDids = this.getReplicateDids().filter((d) => d.startsWith("did:plc:")); 2409 const mirroredSet = new Set(mirroredDids); 2410 for (const did of trackedDids) { 2411 if (this.stopped) break; 2412 if (mirroredSet.has(did)) continue; 2413 try { 2414 await fetchAndValidateLog(this.db, did); 2415 } catch (err) { 2416 console.warn( 2417 `[replication] PLC log fetch for ${did} failed:`, 2418 err instanceof Error ? err.message : String(err), 2419 ); 2420 // Fallback: try discovering PLC log from DHT peers 2421 await this.fetchPlcLogFromPeers(did).catch(() => {}); 2422 } 2423 } 2424 2425 // Re-announce all mirrored DIDs to the DHT 2426 const allMirrored = getAllPlcMirrorDids(this.db); 2427 if (allMirrored.length > 0) { 2428 const helia = this.networkService as unknown as { getLibp2p?(): unknown }; 2429 if (typeof helia.getLibp2p === "function" && helia.getLibp2p()) { 2430 // Use the IpfsService routing directly via provideForDid 2431 for (const did of allMirrored) { 2432 this.networkService.provideForDid(did).catch(() => {}); 2433 } 2434 } 2435 } 2436 } 2437 2438 /** 2439 * Fallback: discover PLC log providers via DHT and fetch from a peer. 2440 * Used when plc.directory is unavailable. 
2441 */ 2442 private async fetchPlcLogFromPeers(did: string): Promise<void> { 2443 const providers = await this.networkService.findProvidersForDid(did); 2444 if (providers.length === 0) return; 2445 2446 const { validateOperationChain, getStoredLog } = await import("../identity/plc-mirror.js"); 2447 2448 for (const multiaddr of providers) { 2449 try { 2450 // Extract the host:port from the multiaddr for an HTTP fetch 2451 const hostMatch = multiaddr.match(/\/ip[46]\/([^/]+)\/tcp\/(\d+)/); 2452 if (!hostMatch) continue; 2453 const [, host, port] = hostMatch; 2454 2455 const url = `http://${host}:${port}/xrpc/org.p2pds.plc.getLog?did=${encodeURIComponent(did)}`; 2456 const res = await fetch(url, { signal: AbortSignal.timeout(10000) }); 2457 if (!res.ok) continue; 2458 2459 const data = await res.json() as { operations?: unknown[]; status?: { validated?: boolean } }; 2460 if (!data.operations || !Array.isArray(data.operations)) continue; 2461 2462 // Validate the returned chain independently 2463 const validation = await validateOperationChain(data.operations as any, did); 2464 if (!validation.valid) continue; 2465 2466 // Store it — we verified it ourselves 2467 const existingLog = getStoredLog(this.db, did); 2468 const newOpCount = data.operations.length; 2469 2470 // Accept if we don't have it yet, or if the new chain is longer 2471 if (!existingLog || newOpCount > existingLog.status.opCount) { 2472 const isTombstoned = (data.operations as any[]).some( 2473 (op: any) => !op.nullified && op.operation?.type === "plc_tombstone", 2474 ); 2475 const lastOpCreatedAt = newOpCount > 0 2476 ? (data.operations as any[])[newOpCount - 1]!.createdAt ?? null 2477 : null; 2478 2479 this.db.prepare( 2480 `INSERT INTO plc_mirror (did, operations_json, op_count, last_fetched_at, last_op_created_at, validated, is_tombstoned) 2481 VALUES (?, ?, ?, ?, ?, ?, ?) 
2482 ON CONFLICT(did) DO UPDATE SET 2483 operations_json = excluded.operations_json, 2484 op_count = excluded.op_count, 2485 last_fetched_at = excluded.last_fetched_at, 2486 last_op_created_at = excluded.last_op_created_at, 2487 validated = excluded.validated, 2488 is_tombstoned = excluded.is_tombstoned`, 2489 ).run( 2490 did, 2491 JSON.stringify(data.operations), 2492 newOpCount, 2493 new Date().toISOString(), 2494 lastOpCreatedAt, 2495 validation.valid ? 1 : 0, 2496 isTombstoned ? 1 : 0, 2497 ); 2498 console.log(`[replication] PLC log for ${did} fetched from peer (${newOpCount} ops)`); 2499 return; 2500 } 2501 } catch { 2502 // Try next provider 2503 } 2504 } 2505 } 2506 2507 /** 2508 * Get PLC log status for a single DID. 2509 */ 2510 getPlcLogStatus(did: string): { opCount: number; lastFetchedAt: string; lastOpCreatedAt: string | null; validated: boolean; isTombstoned: boolean } | null { 2511 const row = this.db 2512 .prepare("SELECT * FROM plc_mirror WHERE did = ?") 2513 .get(did) as Record<string, unknown> | undefined; 2514 if (!row) return null; 2515 return { 2516 opCount: row.op_count as number, 2517 lastFetchedAt: row.last_fetched_at as string, 2518 lastOpCreatedAt: (row.last_op_created_at as string) ?? null, 2519 validated: (row.validated as number) === 1, 2520 isTombstoned: (row.is_tombstoned as number) === 1, 2521 }; 2522 } 2523 2524 /** 2525 * Get aggregate PLC mirror stats. 
2526 */ 2527 getPlcMirrorStatus(): { mirroredDids: number; totalOps: number; lastRefresh: string | null } { 2528 const countRow = this.db 2529 .prepare("SELECT COUNT(*) as count, COALESCE(SUM(op_count), 0) as total_ops FROM plc_mirror") 2530 .get() as { count: number; total_ops: number }; 2531 const lastRow = this.db 2532 .prepare("SELECT MAX(last_fetched_at) as last_refresh FROM plc_mirror") 2533 .get() as { last_refresh: string | null }; 2534 return { 2535 mirroredDids: countRow.count, 2536 totalOps: countRow.total_ops, 2537 lastRefresh: lastRow.last_refresh, 2538 }; 2539 } 2540 2541 /** 2542 * Check if peer info should be refreshed based on TTL. 2543 */ 2544 private shouldRefreshPeerInfo(state: SyncState): boolean { 2545 if (!state.peerInfoFetchedAt) return true; 2546 const fetchedAt = new Date(state.peerInfoFetchedAt).getTime(); 2547 return Date.now() - fetchedAt > PEER_INFO_TTL_MS; 2548 } 2549}