/** * Capstone E2E test: full on-protocol bidirectional replication. * * Two startServer() instances with IPFS_NETWORKING: true, real libp2p, * CAR-over-libp2p protocol. Exercises the complete loop: * self-sync -> peer dial -> cross-sync via libp2p -> XRPC serving -> * incremental re-sync -> mutual offers -> auto-policies */ import { describe, it, expect, afterEach } from "vitest"; import { mkdtempSync, rmSync } from "node:fs"; import { tmpdir } from "node:os"; import { join, resolve } from "node:path"; import Database from "better-sqlite3"; import type { Config } from "./config.js"; import { startServer, type ServerHandle } from "./start.js"; import { createTestRepo, createTestRepoWithUpdate, } from "./replication/test-helpers.js"; import type { DidResolver, DidDocument } from "./did-resolver.js"; import { PolicyEngine } from "./policy/engine.js"; import { OFFER_NSID, didToRkey } from "./replication/types.js"; const DID_A = "did:plc:capstone-alice"; const DID_B = "did:plc:capstone-bob"; function baseConfig(dataDir: string): Config { return { PDS_HOSTNAME: "local.test", AUTH_TOKEN: "test-auth-token", JWT_SECRET: "test-jwt-secret", PASSWORD_HASH: "$2a$10$test", DATA_DIR: dataDir, PORT: 0, IPFS_ENABLED: true, IPFS_NETWORKING: true, REPLICATE_DIDS: [], FIREHOSE_URL: "wss://localhost/xrpc/com.atproto.sync.subscribeRepos", FIREHOSE_ENABLED: false, RATE_LIMIT_ENABLED: false, RATE_LIMIT_READ_PER_MIN: 300, RATE_LIMIT_SYNC_PER_MIN: 30, RATE_LIMIT_SESSION_PER_MIN: 10, RATE_LIMIT_WRITE_PER_MIN: 200, RATE_LIMIT_CHALLENGE_PER_MIN: 20, RATE_LIMIT_MAX_CONNECTIONS: 100, RATE_LIMIT_FIREHOSE_PER_IP: 3, OAUTH_ENABLED: false, PUBLIC_URL: "http://localhost:3000", }; } // ---- Enhanced mock PDS ---- interface EnhancedMockPds { url: string; port: number; close: () => Promise; updateAccount: (did: string, carBytes: Uint8Array) => void; addRecord: (did: string, collection: string, rkey: string, value: unknown) => void; } async function startEnhancedMockPds( accounts: Array<{ did: string; 
carBytes: Uint8Array }>, ): Promise { const { createServer } = await import("node:http"); const accountMap = new Map(); for (const a of accounts) { accountMap.set(a.did, a.carBytes); } // Records: did -> collection -> rkey -> { uri, cid, value } const records = new Map< string, Map> >(); const server = createServer((req, res) => { const url = new URL(req.url ?? "/", "http://localhost"); const pathname = url.pathname; if (pathname === "/xrpc/com.atproto.sync.getRepo") { const did = url.searchParams.get("did"); if (!did || !accountMap.has(did)) { res.writeHead(404, { "Content-Type": "application/json" }); res.end(JSON.stringify({ error: "RepoNotFound" })); return; } const carBytes = accountMap.get(did)!; res.writeHead(200, { "Content-Type": "application/vnd.ipld.car", "Content-Length": String(carBytes.length), }); res.end(Buffer.from(carBytes)); return; } if (pathname === "/xrpc/com.atproto.repo.listRecords") { const did = url.searchParams.get("repo"); const collection = url.searchParams.get("collection"); if (!did || !collection) { res.writeHead(400, { "Content-Type": "application/json" }); res.end(JSON.stringify({ error: "InvalidRequest" })); return; } const didRecords = records.get(did)?.get(collection); const recordList = didRecords ? 
Array.from(didRecords.values()) : []; res.writeHead(200, { "Content-Type": "application/json" }); res.end(JSON.stringify({ records: recordList })); return; } if (pathname === "/xrpc/com.atproto.repo.getRecord") { const did = url.searchParams.get("repo"); const collection = url.searchParams.get("collection"); const rkey = url.searchParams.get("rkey"); if (!did || !collection || !rkey) { res.writeHead(400, { "Content-Type": "application/json" }); res.end(JSON.stringify({ error: "InvalidRequest" })); return; } const record = records.get(did)?.get(collection)?.get(rkey); if (!record) { res.writeHead(404, { "Content-Type": "application/json" }); res.end(JSON.stringify({ error: "RecordNotFound" })); return; } res.writeHead(200, { "Content-Type": "application/json" }); res.end(JSON.stringify(record)); return; } res.writeHead(404, { "Content-Type": "application/json" }); res.end(JSON.stringify({ error: "NotFound" })); }); return new Promise((resolvePromise) => { server.listen(0, "127.0.0.1", () => { const addr = server.address() as { port: number }; const pdsUrl = `http://127.0.0.1:${addr.port}`; resolvePromise({ url: pdsUrl, port: addr.port, close: () => new Promise((res) => server.close(() => res())), updateAccount: (did: string, carBytes: Uint8Array) => { accountMap.set(did, carBytes); }, addRecord: (did: string, collection: string, rkey: string, value: unknown) => { if (!records.has(did)) records.set(did, new Map()); const didMap = records.get(did)!; if (!didMap.has(collection)) didMap.set(collection, new Map()); didMap.get(collection)!.set(rkey, { uri: `at://${did}/${collection}/${rkey}`, cid: "bafytest", value, }); }, }); }); }); } function createMockDidResolver(mapping: Record): DidResolver { return { resolve: async (did: string): Promise => { const pdsUrl = mapping[did]; if (!pdsUrl) return null; return { id: did, service: [ { id: "#atproto_pds", type: "AtprotoPersonalDataServer", serviceEndpoint: pdsUrl, }, ], } as unknown as DidDocument; }, } as DidResolver; } 
/**
 * Minimal RecordWriter facade over the mock PDS: putRecord writes into
 * the mock's in-memory record store; listRecords round-trips over HTTP
 * to exercise the mock's listRecords endpoint; deleteRecord is a no-op.
 */
function createMockRecordWriter(did: string, pds: EnhancedMockPds) {
  return {
    putRecord: async (collection: string, rkey: string, record: unknown) => {
      pds.addRecord(did, collection, rkey, record);
      return { uri: `at://${did}/${collection}/${rkey}`, cid: "bafytest" };
    },
    deleteRecord: async (_collection: string, _rkey: string) => {},
    listRecords: async (collection: string, _opts: { limit: number }) => {
      const res = await fetch(
        `${pds.url}/xrpc/com.atproto.repo.listRecords?repo=${encodeURIComponent(did)}&collection=${encodeURIComponent(collection)}&limit=100`,
      );
      if (!res.ok) return { records: [] };
      return (await res.json()) as { records: Array<{ uri: string; cid: string; value: unknown }> };
    },
  };
}

/**
 * Wait for a condition to become true, with a timeout.
 *
 * Polls `fn` every `intervalMs` until it returns true or `timeoutMs`
 * elapses, then throws. `fn` may be sync or async.
 */
async function waitFor(
  fn: () => Promise<boolean> | boolean,
  timeoutMs: number = 10_000,
  intervalMs: number = 200,
): Promise<void> {
  const deadline = Date.now() + timeoutMs;
  while (Date.now() < deadline) {
    if (await fn()) return;
    await new Promise((r) => setTimeout(r, intervalMs));
  }
  throw new Error(`waitFor timed out after ${timeoutMs}ms`);
}

/**
 * Pre-create identity in the SQLite database (simulates OAuth already done).
*/ function presetIdentity(dataDir: string, did: string, handle: string): void { const db = new Database(resolve(dataDir, "pds.db")); db.pragma("journal_mode = WAL"); db.exec( "CREATE TABLE IF NOT EXISTS node_identity (did TEXT PRIMARY KEY, handle TEXT, created_at TEXT NOT NULL DEFAULT (datetime('now')))", ); db.prepare("INSERT INTO node_identity (did, handle) VALUES (?, ?)").run(did, handle); db.close(); } // ---- Test suite ---- describe("Capstone E2E: on-protocol bidirectional replication", () => { let tmpDirA: string; let tmpDirB: string; let handleA: ServerHandle | undefined; let handleB: ServerHandle | undefined; let mockPds: EnhancedMockPds | undefined; afterEach(async () => { const closes: Promise[] = []; if (handleA) closes.push(handleA.close().catch(() => {})); if (handleB) closes.push(handleB.close().catch(() => {})); await Promise.all(closes); handleA = undefined; handleB = undefined; if (mockPds) { await mockPds.close().catch(() => {}); mockPds = undefined; } if (tmpDirA) rmSync(tmpDirA, { recursive: true, force: true }); if (tmpDirB) rmSync(tmpDirB, { recursive: true, force: true }); }); it("full flow: self-sync, libp2p cross-sync, XRPC serve, incremental re-sync, mutual offers", async () => { // ================================================================ // Step 1: Setup — create repos, mock PDS, start two servers // ================================================================ tmpDirA = mkdtempSync(join(tmpdir(), "capstone-a-")); tmpDirB = mkdtempSync(join(tmpdir(), "capstone-b-")); // Create repos with update support for incremental sync const { initialCar: aliceInitialCar, fullCar: aliceFullCar } = await createTestRepoWithUpdate( DID_A, [ { collection: "app.bsky.feed.post", rkey: "a1", record: { text: "Alice post 1", createdAt: new Date().toISOString() } }, { collection: "app.bsky.feed.post", rkey: "a2", record: { text: "Alice post 2", createdAt: new Date().toISOString() } }, ], [ { collection: "app.bsky.feed.post", rkey: "a3", record: { 
text: "Alice post 3 (update)", createdAt: new Date().toISOString() } }, ], ); const { initialCar: bobInitialCar, fullCar: bobFullCar } = await createTestRepoWithUpdate( DID_B, [ { collection: "app.bsky.feed.post", rkey: "b1", record: { text: "Bob post 1", createdAt: new Date().toISOString() } }, ], [ { collection: "app.bsky.feed.post", rkey: "b2", record: { text: "Bob post 2 (update)", createdAt: new Date().toISOString() } }, ], ); // Start mock PDS with initial CARs mockPds = await startEnhancedMockPds([ { did: DID_A, carBytes: aliceInitialCar }, { did: DID_B, carBytes: bobInitialCar }, ]); // Pre-inject offer records so they're available for discovery later mockPds.addRecord(DID_A, OFFER_NSID, didToRkey(DID_B), { $type: OFFER_NSID, subject: DID_B, minCopies: 2, intervalSec: 300, priority: 50, createdAt: new Date().toISOString(), }); mockPds.addRecord(DID_B, OFFER_NSID, didToRkey(DID_A), { $type: OFFER_NSID, subject: DID_A, minCopies: 3, intervalSec: 600, priority: 75, createdAt: new Date().toISOString(), }); const resolver = createMockDidResolver({ [DID_A]: mockPds.url, [DID_B]: mockPds.url, }); // Pre-set identities const configA = baseConfig(tmpDirA); configA.DID = DID_A; configA.HANDLE = "alice.test"; presetIdentity(tmpDirA, DID_A, "alice.test"); const configB = baseConfig(tmpDirB); configB.DID = DID_B; configB.HANDLE = "bob.test"; presetIdentity(tmpDirB, DID_B, "bob.test"); // Start both servers with real networking handleA = await startServer(configA, { didResolver: resolver }); handleB = await startServer(configB, { didResolver: resolver }); expect(handleA.replicationManager).toBeDefined(); expect(handleB.replicationManager).toBeDefined(); expect(handleA.ipfsService).toBeDefined(); expect(handleB.ipfsService).toBeDefined(); const rmA = handleA.replicationManager!; const rmB = handleB.replicationManager!; const ipfsA = handleA.ipfsService!; const ipfsB = handleB.ipfsService!; // Prevent periodic sync from interfering — set stopped flag directly // (don't 
call stop() which would unsubscribe topics) (rmA as unknown as { stopped: boolean }).stopped = true; (rmB as unknown as { stopped: boolean }).stopped = true; const syncTimerA = (rmA as unknown as { syncTimer: ReturnType | null }).syncTimer; if (syncTimerA) { clearInterval(syncTimerA); (rmA as unknown as { syncTimer: null }).syncTimer = null; } const syncTimerB = (rmB as unknown as { syncTimer: ReturnType | null }).syncTimer; if (syncTimerB) { clearInterval(syncTimerB); (rmB as unknown as { syncTimer: null }).syncTimer = null; } // Verify both nodes have peer IDs (networking is on) const peerIdA = ipfsA.getPeerId()!; const peerIdB = ipfsB.getPeerId()!; expect(peerIdA).toBeTruthy(); expect(peerIdB).toBeTruthy(); expect(peerIdA).not.toBe(peerIdB); const addrsA = ipfsA.getMultiaddrs(); const addrsB = ipfsB.getMultiaddrs(); expect(addrsA.length).toBeGreaterThan(0); expect(addrsB.length).toBeGreaterThan(0); // ================================================================ // Step 2: Self-sync — each node syncs its own DID // ================================================================ await rmA.addDid(DID_A); await waitFor(() => { const state = rmA.getSyncStorage().getState(DID_A); return state?.status === "synced"; }, 15_000); await rmB.addDid(DID_B); await waitFor(() => { const state = rmB.getSyncStorage().getState(DID_B); return state?.status === "synced"; }, 15_000); // Verify self-sync completed expect(rmA.getSyncStorage().getState(DID_A)?.status).toBe("synced"); expect(rmB.getSyncStorage().getState(DID_B)?.status).toBe("synced"); // ================================================================ // Step 3: Peer dial + peer record injection // ================================================================ // Add org.p2pds.peer/self records to mock PDS so discoverPeer() // finds the real peer info during sync. Each DID's peer record // contains the multiaddrs of the node that hosts that DID. 
const PEER_NSID = "org.p2pds.peer"; mockPds.addRecord(DID_A, PEER_NSID, "self", { $type: PEER_NSID, peerId: peerIdA, multiaddrs: addrsA, createdAt: new Date().toISOString(), }); mockPds.addRecord(DID_B, PEER_NSID, "self", { $type: PEER_NSID, peerId: peerIdB, multiaddrs: addrsB, createdAt: new Date().toISOString(), }); // Connect the two nodes via libp2p await ipfsA.dial(addrsB[0]!); await waitFor( () => ipfsA.getConnectionCount() > 0 && ipfsB.getConnectionCount() > 0, 10_000, ); // ================================================================ // Step 4: Cross-sync — addDid discovers peer info, syncs via libp2p // ================================================================ // addDid fires background syncDid → discoverPeer finds peer record → // peer info populated → libp2p path activated. // Sync sequentially to avoid concurrent stream conflicts on the // same libp2p connection (yamux muxer doesn't handle concurrent // dialProtocol from both sides well). await rmA.addDid(DID_B); await waitFor(() => { const state = rmA.getSyncStorage().getState(DID_B); if (state?.status !== "synced") return false; // Also wait for sync_history to be completed (not just state table) const history = rmA.getSyncStorage().getSyncHistory(DID_B, 5); return history.some((h) => h.sourceType === "libp2p" && h.status === "success"); }, 30_000); await rmB.addDid(DID_A); await waitFor(() => { const state = rmB.getSyncStorage().getState(DID_A); if (state?.status !== "synced") return false; const history = rmB.getSyncStorage().getSyncHistory(DID_A, 5); return history.some((h) => h.sourceType === "libp2p" && h.status === "success"); }, 30_000); // Verify peer info was discovered and stored const stateAforB = rmA.getSyncStorage().getState(DID_B); expect(stateAforB?.peerId).toBe(peerIdB); expect(stateAforB?.peerMultiaddrs?.length).toBeGreaterThan(0); const stateBforA = rmB.getSyncStorage().getState(DID_A); expect(stateBforA?.peerId).toBe(peerIdA); 
expect(stateBforA?.peerMultiaddrs?.length).toBeGreaterThan(0); // Verify sync used libp2p source (the initial sync should have used it // since peer info was discovered before the fetch step) const historyA = rmA.getSyncStorage().getSyncHistory(DID_B, 5); const libp2pSyncA = historyA.find((h) => h.sourceType === "libp2p"); expect(libp2pSyncA).toBeDefined(); expect(libp2pSyncA!.status).toBe("success"); const historyB = rmB.getSyncStorage().getSyncHistory(DID_A, 5); const libp2pSyncB = historyB.find((h) => h.sourceType === "libp2p"); expect(libp2pSyncB).toBeDefined(); expect(libp2pSyncB!.status).toBe("success"); // ================================================================ // Step 6: Verify cross-serving via XRPC // ================================================================ // Node A serves Bob's repo via getRepo const getRepoA = await fetch( `${handleA.url}/xrpc/com.atproto.sync.getRepo?did=${DID_B}`, ); expect(getRepoA.status).toBe(200); expect(getRepoA.headers.get("content-type")).toBe("application/vnd.ipld.car"); const carBytesFromA = new Uint8Array(await getRepoA.arrayBuffer()); expect(carBytesFromA.length).toBeGreaterThan(0); // Node B serves Alice's repo via getRepo const getRepoB = await fetch( `${handleB.url}/xrpc/com.atproto.sync.getRepo?did=${DID_A}`, ); expect(getRepoB.status).toBe(200); const carBytesFromB = new Uint8Array(await getRepoB.arrayBuffer()); expect(carBytesFromB.length).toBeGreaterThan(0); // Node A can read Bob's record const recordA = await fetch( `${handleA.url}/xrpc/com.atproto.repo.getRecord?repo=${DID_B}&collection=app.bsky.feed.post&rkey=b1`, ); expect(recordA.status).toBe(200); const recA = (await recordA.json()) as { value: { text: string } }; expect(recA.value.text).toBe("Bob post 1"); // Node B can read Alice's record const recordB = await fetch( `${handleB.url}/xrpc/com.atproto.repo.getRecord?repo=${DID_A}&collection=app.bsky.feed.post&rkey=a1`, ); expect(recordB.status).toBe(200); const recB = (await recordB.json()) 
as { value: { text: string } }; expect(recB.value.text).toBe("Alice post 1"); // Node A can describe Bob's repo const describeA = await fetch( `${handleA.url}/xrpc/com.atproto.repo.describeRepo?repo=${DID_B}`, ); expect(describeA.status).toBe(200); const descA = (await describeA.json()) as { did: string; collections: string[] }; expect(descA.did).toBe(DID_B); expect(descA.collections).toContain("app.bsky.feed.post"); // Node A getRepoStatus for Bob const repoStatusA = await fetch( `${handleA.url}/xrpc/com.atproto.sync.getRepoStatus?did=${DID_B}`, ); expect(repoStatusA.status).toBe(200); const rsA = (await repoStatusA.json()) as { did: string; rev: string | null }; expect(rsA.did).toBe(DID_B); expect(rsA.rev).toBeTruthy(); // ================================================================ // Step 7: Update repos + incremental re-sync via libp2p // ================================================================ // Update mock PDS with full CARs (which include additional records) mockPds.updateAccount(DID_A, aliceFullCar); mockPds.updateAccount(DID_B, bobFullCar); // First, re-sync each node's own DID to update local state // (so the peer can serve the updated data) await rmA.syncDid(DID_A, "manual"); await rmB.syncDid(DID_B, "manual"); // Now cross-sync: each node fetches the other's updated data via libp2p await rmA.syncDid(DID_B, "manual"); await rmB.syncDid(DID_A, "manual"); // Verify incremental sync occurred const historyA2 = rmA.getSyncStorage().getSyncHistory(DID_B, 10); const incrementalSyncA = historyA2.find((h) => h.incremental && h.sourceType === "libp2p"); expect(incrementalSyncA).toBeDefined(); expect(incrementalSyncA!.status).toBe("success"); // Verify the incremental sync actually transferred data expect(incrementalSyncA!.carBytes).toBeGreaterThan(0); // Verify the new records are accessible const newRecordA = await fetch( `${handleA.url}/xrpc/com.atproto.repo.getRecord?repo=${DID_B}&collection=app.bsky.feed.post&rkey=b2`, ); 
expect(newRecordA.status).toBe(200); const newRecA = (await newRecordA.json()) as { value: { text: string } }; expect(newRecA.value.text).toBe("Bob post 2 (update)"); const newRecordB = await fetch( `${handleB.url}/xrpc/com.atproto.repo.getRecord?repo=${DID_A}&collection=app.bsky.feed.post&rkey=a3`, ); expect(newRecordB.status).toBe(200); const newRecB = (await newRecordB.json()) as { value: { text: string } }; expect(newRecB.value.text).toBe("Alice post 3 (update)"); // ================================================================ // Step 8: Mutual offers -> auto-policies // ================================================================ // Inject PolicyEngine and RecordWriter into both ReplicationManagers const peA = new PolicyEngine({ version: 1, policies: [] }); const peB = new PolicyEngine({ version: 1, policies: [] }); (rmA as unknown as { policyEngine: PolicyEngine }).policyEngine = peA; (rmB as unknown as { policyEngine: PolicyEngine }).policyEngine = peB; rmA.setPdsClient(createMockRecordWriter(DID_A, mockPds), DID_A); rmB.setPdsClient(createMockRecordWriter(DID_B, mockPds), DID_B); // Discover offers const offerManagerA = rmA.getOfferManager(); const offerManagerB = rmB.getOfferManager(); expect(offerManagerA).toBeDefined(); expect(offerManagerB).toBeDefined(); const statesA = rmA.getSyncStates(); const peersA = statesA.filter((s) => s.pdsEndpoint).map((s) => ({ did: s.did, pdsEndpoint: s.pdsEndpoint })); const agreementsA = await offerManagerA!.discoverAndSync(peersA); const statesB = rmB.getSyncStates(); const peersB = statesB.filter((s) => s.pdsEndpoint).map((s) => ({ did: s.did, pdsEndpoint: s.pdsEndpoint })); const agreementsB = await offerManagerB!.discoverAndSync(peersB); // Both should detect one mutual agreement expect(agreementsA.length).toBe(1); expect(agreementsA[0]!.counterpartyDid).toBe(DID_B); expect(agreementsB.length).toBe(1); expect(agreementsB[0]!.counterpartyDid).toBe(DID_A); // Verify effective params: max(minCopies), 
min(intervalSec), max(priority) expect(agreementsA[0]!.effectiveParams.minCopies).toBe(3); expect(agreementsA[0]!.effectiveParams.intervalSec).toBe(300); expect(agreementsA[0]!.effectiveParams.priority).toBe(75); // Verify policies were created const policiesA = peA.getPolicies(); expect(policiesA.length).toBe(1); expect(policiesA[0]!.id).toBe(`p2p:${DID_B}`); expect(policiesA[0]!.replication.minCopies).toBe(3); const policiesB = peB.getPolicies(); expect(policiesB.length).toBe(1); expect(policiesB[0]!.id).toBe(`p2p:${DID_A}`); }, 120_000); });