/**
 * Bidirectional replication E2E test.
 *
 * Two p2pds nodes, each with a simulated user account on a mock PDS.
 * Each node tracks the other's DID, syncs data, publishes offers,
 * discovers mutual agreements, and serves replicated data via sync endpoints.
 */
import { describe, it, expect, afterEach } from "vitest";
import { mkdtempSync, rmSync } from "node:fs";
import { tmpdir } from "node:os";
import { join, resolve } from "node:path";
import Database from "better-sqlite3";
import type { Config } from "./config.js";
import { startServer, type ServerHandle } from "./start.js";
import {
  createTestRepo,
  startMockPds,
  type MockPds,
} from "./replication/test-helpers.js";
import type { DidResolver, DidDocument } from "./did-resolver.js";
import { PolicyEngine } from "./policy/engine.js";
import { OFFER_NSID, didToRkey } from "./replication/types.js";

/** DID for Node A's user identity. */
const DID_NODE_A = "did:plc:node-a-user";
/** DID for Node B's user identity. */
const DID_NODE_B = "did:plc:node-b-user";

/**
 * Build the baseline server Config shared by both test nodes.
 *
 * Networking-heavy subsystems (firehose, rate limiting, OAuth, IPFS
 * networking) are disabled so each test only exercises replication.
 *
 * @param dataDir per-node temp directory used as DATA_DIR (isolates SQLite DBs)
 * @returns a Config with PORT 0 (ephemeral port chosen by the OS)
 */
function baseConfig(dataDir: string): Config {
  return {
    PDS_HOSTNAME: "local.test",
    AUTH_TOKEN: "test-auth-token",
    JWT_SECRET: "test-jwt-secret",
    PASSWORD_HASH: "$2a$10$test", // bcrypt-shaped placeholder; never verified in these tests
    DATA_DIR: dataDir,
    PORT: 0, // 0 => bind an ephemeral port
    IPFS_ENABLED: true,
    IPFS_NETWORKING: false, // keep IPFS local-only for determinism
    REPLICATE_DIDS: [], // tests add DIDs at runtime via the addDid endpoint
    FIREHOSE_URL: "wss://localhost/xrpc/com.atproto.sync.subscribeRepos",
    FIREHOSE_ENABLED: false,
    RATE_LIMIT_ENABLED: false,
    RATE_LIMIT_READ_PER_MIN: 300,
    RATE_LIMIT_SYNC_PER_MIN: 30,
    RATE_LIMIT_SESSION_PER_MIN: 10,
    RATE_LIMIT_WRITE_PER_MIN: 200,
    RATE_LIMIT_CHALLENGE_PER_MIN: 20,
    RATE_LIMIT_MAX_CONNECTIONS: 100,
    RATE_LIMIT_FIREHOSE_PER_IP: 3,
    OAUTH_ENABLED: false,
    PUBLIC_URL: "http://localhost:3000",
  };
}

/**
 * Enhanced mock PDS that serves configurable records per DID per collection.
 * Supports adding offer records dynamically to simulate users publishing offers.
*/ interface EnhancedMockPds extends MockPds { addRecord(did: string, collection: string, rkey: string, value: unknown): void; } async function startEnhancedMockPds( accounts: Array<{ did: string; carBytes: Uint8Array }>, ): Promise { const { createServer } = await import("node:http"); const accountMap = new Map(); for (const a of accounts) { accountMap.set(a.did, a.carBytes); } // Records store: did -> collection -> rkey -> { uri, cid, value } const records = new Map>>(); const server = createServer((req, res) => { const url = new URL(req.url ?? "/", "http://localhost"); const pathname = url.pathname; if (pathname === "/xrpc/com.atproto.sync.getRepo") { const did = url.searchParams.get("did"); if (!did || !accountMap.has(did)) { res.writeHead(404, { "Content-Type": "application/json" }); res.end(JSON.stringify({ error: "RepoNotFound" })); return; } const carBytes = accountMap.get(did)!; res.writeHead(200, { "Content-Type": "application/vnd.ipld.car", "Content-Length": String(carBytes.length), }); res.end(Buffer.from(carBytes)); return; } if (pathname === "/xrpc/com.atproto.repo.listRecords") { const did = url.searchParams.get("repo"); const collection = url.searchParams.get("collection"); if (!did || !collection) { res.writeHead(400, { "Content-Type": "application/json" }); res.end(JSON.stringify({ error: "InvalidRequest" })); return; } const didRecords = records.get(did)?.get(collection); const recordList = didRecords ? 
Array.from(didRecords.values()) : []; res.writeHead(200, { "Content-Type": "application/json" }); res.end(JSON.stringify({ records: recordList })); return; } if (pathname === "/xrpc/com.atproto.repo.getRecord") { const did = url.searchParams.get("repo"); const collection = url.searchParams.get("collection"); const rkey = url.searchParams.get("rkey"); if (!did || !collection || !rkey) { res.writeHead(400, { "Content-Type": "application/json" }); res.end(JSON.stringify({ error: "InvalidRequest" })); return; } const record = records.get(did)?.get(collection)?.get(rkey); if (!record) { res.writeHead(404, { "Content-Type": "application/json" }); res.end(JSON.stringify({ error: "RecordNotFound" })); return; } res.writeHead(200, { "Content-Type": "application/json" }); res.end(JSON.stringify(record)); return; } res.writeHead(404, { "Content-Type": "application/json" }); res.end(JSON.stringify({ error: "NotFound" })); }); return new Promise((resolve) => { server.listen(0, "127.0.0.1", () => { const addr = server.address() as { port: number }; const pdsUrl = `http://127.0.0.1:${addr.port}`; resolve({ url: pdsUrl, port: addr.port, close: () => new Promise((res) => server.close(() => res())), updateAccount: (did: string, carBytes: Uint8Array) => { accountMap.set(did, carBytes); }, addRecord: (did: string, collection: string, rkey: string, value: unknown) => { if (!records.has(did)) records.set(did, new Map()); const didMap = records.get(did)!; if (!didMap.has(collection)) didMap.set(collection, new Map()); didMap.get(collection)!.set(rkey, { uri: `at://${did}/${collection}/${rkey}`, cid: "bafytest", value, }); }, }); }); }); } function createMockDidResolver(mapping: Record): DidResolver { return { resolve: async (did: string): Promise => { const pdsUrl = mapping[did]; if (!pdsUrl) return null; return { id: did, service: [ { id: "#atproto_pds", type: "AtprotoPersonalDataServer", serviceEndpoint: pdsUrl, }, ], } as unknown as DidDocument; }, } as DidResolver; } 
// End-to-end suite: two live p2pds servers replicate each other's repos via a
// shared mock PDS, then serve the replicated data over standard sync/repo XRPC
// endpoints. Servers, the mock PDS, and temp dirs are torn down in afterEach.
describe("Bidirectional replication E2E", () => {
  let tmpDirA: string;
  let tmpDirB: string;
  let handleA: ServerHandle | undefined;
  let handleB: ServerHandle | undefined;
  let mockPds: EnhancedMockPds | undefined;

  afterEach(async () => {
    // Close servers before the mock PDS so in-flight syncs don't hit a dead socket.
    if (handleA) {
      await handleA.close();
      handleA = undefined;
    }
    if (handleB) {
      await handleB.close();
      handleB = undefined;
    }
    if (mockPds) {
      await mockPds.close();
      mockPds = undefined;
    }
    if (tmpDirA) rmSync(tmpDirA, { recursive: true, force: true });
    if (tmpDirB) rmSync(tmpDirB, { recursive: true, force: true });
  });

  it("two nodes sync each other's data and serve it via sync endpoints", async () => {
    tmpDirA = mkdtempSync(join(tmpdir(), "bidir-a-"));
    tmpDirB = mkdtempSync(join(tmpdir(), "bidir-b-"));

    // Create test repos for each user's account
    const aliceCar = await createTestRepo(DID_NODE_A, [
      { collection: "app.bsky.feed.post", rkey: "a1", record: { text: "Alice post", createdAt: new Date().toISOString() } },
      { collection: "app.bsky.feed.post", rkey: "a2", record: { text: "Alice post 2", createdAt: new Date().toISOString() } },
    ]);
    const bobCar = await createTestRepo(DID_NODE_B, [
      { collection: "app.bsky.feed.post", rkey: "b1", record: { text: "Bob post", createdAt: new Date().toISOString() } },
    ]);

    // Mock PDS serves both accounts
    mockPds = await startEnhancedMockPds([
      { did: DID_NODE_A, carBytes: aliceCar },
      { did: DID_NODE_B, carBytes: bobCar },
    ]);
    // Both DIDs resolve to the same mock PDS instance.
    const resolver = createMockDidResolver({
      [DID_NODE_A]: mockPds.url,
      [DID_NODE_B]: mockPds.url,
    });

    // Start two servers — each with their own identity
    const configA = baseConfig(tmpDirA);
    const configB = baseConfig(tmpDirB);

    // Pre-set identities (simulating OAuth already done) by seeding each
    // node's SQLite DB before the server first opens it.
    const dbA = new Database(resolve(tmpDirA, "pds.db"));
    dbA.pragma("journal_mode = WAL");
    dbA.exec("CREATE TABLE IF NOT EXISTS node_identity (did TEXT PRIMARY KEY, handle TEXT, created_at TEXT NOT NULL DEFAULT (datetime('now')))");
    dbA.prepare("INSERT INTO node_identity (did, handle) VALUES (?, ?)").run(DID_NODE_A, "alice.test");
    dbA.close();
    configA.DID = DID_NODE_A;
    configA.HANDLE = "alice.test";

    const dbB = new Database(resolve(tmpDirB, "pds.db"));
    dbB.pragma("journal_mode = WAL");
    dbB.exec("CREATE TABLE IF NOT EXISTS node_identity (did TEXT PRIMARY KEY, handle TEXT, created_at TEXT NOT NULL DEFAULT (datetime('now')))");
    dbB.prepare("INSERT INTO node_identity (did, handle) VALUES (?, ?)").run(DID_NODE_B, "bob.test");
    dbB.close();
    configB.DID = DID_NODE_B;
    configB.HANDLE = "bob.test";

    handleA = await startServer(configA, { didResolver: resolver });
    handleB = await startServer(configB, { didResolver: resolver });

    // Both nodes should have replication managers
    expect(handleA.replicationManager).toBeDefined();
    expect(handleB.replicationManager).toBeDefined();

    // Node A adds Node B's DID, Node B adds Node A's DID
    const addBToA = await fetch(`${handleA.url}/xrpc/org.p2pds.app.addDid`, {
      method: "POST",
      headers: { Authorization: `Bearer ${configA.AUTH_TOKEN}`, "Content-Type": "application/json" },
      body: JSON.stringify({ did: DID_NODE_B }),
    });
    expect(addBToA.status).toBe(200);
    const addAToB = await fetch(`${handleB.url}/xrpc/org.p2pds.app.addDid`, {
      method: "POST",
      headers: { Authorization: `Bearer ${configB.AUTH_TOKEN}`, "Content-Type": "application/json" },
      body: JSON.stringify({ did: DID_NODE_A }),
    });
    expect(addAToB.status).toBe(200);

    // Trigger sync on both
    await fetch(`${handleA.url}/xrpc/org.p2pds.replication.syncNow`, {
      method: "POST",
      headers: { Authorization: `Bearer ${configA.AUTH_TOKEN}` },
    });
    await fetch(`${handleB.url}/xrpc/org.p2pds.replication.syncNow`, {
      method: "POST",
      headers: { Authorization: `Bearer ${configB.AUTH_TOKEN}` },
    });

    // Wait for async sync
    // NOTE(review): fixed 3s sleep — polling getDidStatus until "synced" would
    // be less timing-sensitive; confirm before changing.
    await new Promise((r) => setTimeout(r, 3000));

    // Verify Node A synced Bob's data
    const statusA = await fetch(
      `${handleA.url}/xrpc/org.p2pds.app.getDidStatus?did=${DID_NODE_B}`,
      { headers: { Authorization: `Bearer ${configA.AUTH_TOKEN}` } },
    );
    const dsA = (await statusA.json()) as { did: string; blockCount: number; syncState: { status: string } };
    expect(dsA.did).toBe(DID_NODE_B);
    expect(dsA.blockCount).toBeGreaterThan(0);
    expect(dsA.syncState.status).toBe("synced");

    // Verify Node B synced Alice's data
    const statusB = await fetch(
      `${handleB.url}/xrpc/org.p2pds.app.getDidStatus?did=${DID_NODE_A}`,
      { headers: { Authorization: `Bearer ${configB.AUTH_TOKEN}` } },
    );
    const dsB = (await statusB.json()) as { did: string; blockCount: number; syncState: { status: string } };
    expect(dsB.did).toBe(DID_NODE_A);
    expect(dsB.blockCount).toBeGreaterThan(0);
    expect(dsB.syncState.status).toBe("synced");

    // ---- Verify sync endpoints serve replicated data ----

    // Node A serves Bob's repo via getRepo
    const getRepoA = await fetch(
      `${handleA.url}/xrpc/com.atproto.sync.getRepo?did=${DID_NODE_B}`,
    );
    expect(getRepoA.status).toBe(200);
    expect(getRepoA.headers.get("content-type")).toBe("application/vnd.ipld.car");
    const carBytesA = new Uint8Array(await getRepoA.arrayBuffer());
    expect(carBytesA.length).toBeGreaterThan(0);

    // Node B serves Alice's repo via getRepo
    const getRepoB = await fetch(
      `${handleB.url}/xrpc/com.atproto.sync.getRepo?did=${DID_NODE_A}`,
    );
    expect(getRepoB.status).toBe(200);
    const carBytesB = new Uint8Array(await getRepoB.arrayBuffer());
    expect(carBytesB.length).toBeGreaterThan(0);

    // Node A serves Bob's repo via getRepoStatus
    const repoStatusA = await fetch(
      `${handleA.url}/xrpc/com.atproto.sync.getRepoStatus?did=${DID_NODE_B}`,
    );
    expect(repoStatusA.status).toBe(200);
    const rsA = (await repoStatusA.json()) as { did: string; active: boolean; rev: string | null };
    expect(rsA.did).toBe(DID_NODE_B);
    expect(rsA.rev).toBeTruthy();

    // Node B lists all repos (should include Alice's)
    const listReposB = await fetch(
      `${handleB.url}/xrpc/com.atproto.sync.listRepos`,
    );
    expect(listReposB.status).toBe(200);
    const reposB = (await listReposB.json()) as { repos: Array<{ did: string }> };
    const replicatedDids = reposB.repos.map((r) => r.did);
    expect(replicatedDids).toContain(DID_NODE_A);

    // Node A can read Bob's records via repo.getRecord
    const recordA = await fetch(
      `${handleA.url}/xrpc/com.atproto.repo.getRecord?repo=${DID_NODE_B}&collection=app.bsky.feed.post&rkey=b1`,
    );
    expect(recordA.status).toBe(200);
    const recA = (await recordA.json()) as { uri: string; value: { text: string } };
    expect(recA.value.text).toBe("Bob post");

    // Node B can read Alice's records via repo.getRecord
    const recordB = await fetch(
      `${handleB.url}/xrpc/com.atproto.repo.getRecord?repo=${DID_NODE_A}&collection=app.bsky.feed.post&rkey=a1`,
    );
    expect(recordB.status).toBe(200);
    const recB = (await recordB.json()) as { uri: string; value: { text: string } };
    expect(recB.value.text).toBe("Alice post");

    // Node B can list Alice's records
    const listRecsB = await fetch(
      `${handleB.url}/xrpc/com.atproto.repo.listRecords?repo=${DID_NODE_A}&collection=app.bsky.feed.post`,
    );
    expect(listRecsB.status).toBe(200);
    const recsB = (await listRecsB.json()) as { records: Array<{ value: { text: string } }> };
    expect(recsB.records.length).toBe(2);

    // Node A can describe Bob's repo
    const describeA = await fetch(
      `${handleA.url}/xrpc/com.atproto.repo.describeRepo?repo=${DID_NODE_B}`,
    );
    expect(describeA.status).toBe(200);
    const descA = (await describeA.json()) as { did: string; collections: string[] };
    expect(descA.did).toBe(DID_NODE_B);
    expect(descA.collections).toContain("app.bsky.feed.post");
  }, 30_000);

  it("mutual offers create P2P replication policies", async () => {
    tmpDirA = mkdtempSync(join(tmpdir(), "bidir-offer-a-"));
    tmpDirB = mkdtempSync(join(tmpdir(), "bidir-offer-b-"));

    // Create minimal repos
    const aliceCar = await createTestRepo(DID_NODE_A, [
      { collection: "app.bsky.feed.post", rkey: "x1", record: { text: "test", createdAt: new Date().toISOString() } },
    ]);
    const bobCar = await createTestRepo(DID_NODE_B, [
      { collection: "app.bsky.feed.post", rkey: "y1", record: { text: "test", createdAt: new Date().toISOString() } },
    ]);

    // Mock PDS with offer records
    mockPds = await startEnhancedMockPds([
      { did: DID_NODE_A, carBytes: aliceCar },
      { did: DID_NODE_B, carBytes: bobCar },
    ]);

    // Simulate both users having published offers for each other.
    // Node A's user published an offer for Node B's DID
    mockPds.addRecord(DID_NODE_A, OFFER_NSID, didToRkey(DID_NODE_B), {
      $type: OFFER_NSID,
      subject: DID_NODE_B,
      minCopies: 2,
      intervalSec: 300,
      priority: 50,
      createdAt: new Date().toISOString(),
    });
    // Node B's user published an offer for Node A's DID
    mockPds.addRecord(DID_NODE_B, OFFER_NSID, didToRkey(DID_NODE_A), {
      $type: OFFER_NSID,
      subject: DID_NODE_A,
      minCopies: 3,
      intervalSec: 600,
      priority: 75,
      createdAt: new Date().toISOString(),
    });

    const resolver = createMockDidResolver({
      [DID_NODE_A]: mockPds.url,
      [DID_NODE_B]: mockPds.url,
    });

    // Create configs with policy engines and identities pre-set
    const configA = baseConfig(tmpDirA);
    configA.DID = DID_NODE_A;
    configA.HANDLE = "alice.test";
    const dbA = new Database(resolve(tmpDirA, "pds.db"));
    dbA.pragma("journal_mode = WAL");
    dbA.exec("CREATE TABLE IF NOT EXISTS node_identity (did TEXT PRIMARY KEY, handle TEXT, created_at TEXT NOT NULL DEFAULT (datetime('now')))");
    dbA.prepare("INSERT INTO node_identity (did, handle) VALUES (?, ?)").run(DID_NODE_A, "alice.test");
    dbA.close();

    const configB = baseConfig(tmpDirB);
    configB.DID = DID_NODE_B;
    configB.HANDLE = "bob.test";
    const dbB = new Database(resolve(tmpDirB, "pds.db"));
    dbB.pragma("journal_mode = WAL");
    dbB.exec("CREATE TABLE IF NOT EXISTS node_identity (did TEXT PRIMARY KEY, handle TEXT, created_at TEXT NOT NULL DEFAULT (datetime('now')))");
    dbB.prepare("INSERT INTO node_identity (did, handle) VALUES (?, ?)").run(DID_NODE_B, "bob.test");
    dbB.close();

    // We need to inject PolicyEngine + a mock RecordWriter into the ReplicationManager.
    // The simplest approach: start the servers, then use the OfferManager directly.
    handleA = await startServer(configA, { didResolver: resolver });
    handleB = await startServer(configB, { didResolver: resolver });
    const rmA = handleA.replicationManager!;
    const rmB = handleB.replicationManager!;

    // Create mock RecordWriters that use the mock PDS records store
    // (In production this would be the PdsClient via OAuth)
    const mockWriterA = createMockRecordWriter(DID_NODE_A, mockPds);
    const mockWriterB = createMockRecordWriter(DID_NODE_B, mockPds);

    // Inject policy engine and setPdsClient
    const peA = new PolicyEngine({ version: 1, policies: [] });
    const peB = new PolicyEngine({ version: 1, policies: [] });
    // Access private field to set policy engine — test-only hack
    (rmA as unknown as { policyEngine: PolicyEngine }).policyEngine = peA;
    (rmB as unknown as { policyEngine: PolicyEngine }).policyEngine = peB;
    rmA.setPdsClient(mockWriterA, DID_NODE_A);
    rmB.setPdsClient(mockWriterB, DID_NODE_B);

    // Both nodes add each other's DID
    await rmA.addDid(DID_NODE_B);
    await rmB.addDid(DID_NODE_A);

    // Wait for initial sync (fixed 3s; see note in the first test)
    await new Promise((r) => setTimeout(r, 3000));

    // Now run offer discovery on both
    const offerManagerA = rmA.getOfferManager();
    const offerManagerB = rmB.getOfferManager();
    expect(offerManagerA).toBeDefined();
    expect(offerManagerB).toBeDefined();

    // Node A discovers agreements: it should find that Node B has an offer for Node A
    const statesA = rmA.getSyncStates();
    const peersA = statesA.filter((s) => s.pdsEndpoint).map((s) => ({ did: s.did, pdsEndpoint: s.pdsEndpoint }));
    const agreementsA = await offerManagerA!.discoverAndSync(peersA);

    // Node B discovers agreements similarly
    const statesB = rmB.getSyncStates();
    const peersB = statesB.filter((s) => s.pdsEndpoint).map((s) => ({ did: s.did, pdsEndpoint: s.pdsEndpoint }));
    const agreementsB = await offerManagerB!.discoverAndSync(peersB);

    // Both should detect one mutual agreement
    expect(agreementsA.length).toBe(1);
    expect(agreementsA[0]!.counterpartyDid).toBe(DID_NODE_B);
    expect(agreementsB.length).toBe(1);
    expect(agreementsB[0]!.counterpartyDid).toBe(DID_NODE_A);

    // Verify effective params: max(minCopies), min(intervalSec), max(priority)
    expect(agreementsA[0]!.effectiveParams.minCopies).toBe(3); // max(2, 3)
    expect(agreementsA[0]!.effectiveParams.intervalSec).toBe(300); // min(300, 600)
    expect(agreementsA[0]!.effectiveParams.priority).toBe(75); // max(50, 75)

    // Verify P2P policies were created in the policy engine
    const p2pPolicyA = peA.getPolicies().find((p) => p.id === `p2p:${DID_NODE_B}`);
    expect(p2pPolicyA).toBeDefined();
    expect(p2pPolicyA!.replication.minCopies).toBe(3);
    const p2pPolicyB = peB.getPolicies().find((p) => p.id === `p2p:${DID_NODE_A}`);
    expect(p2pPolicyB).toBeDefined();
  }, 30_000);
});

/**
 * Create a mock RecordWriter backed by the enhanced mock PDS.
 * This simulates what PdsClient does: read/write records to the user's PDS.
 *
 * @param did DID whose repo the writer reads and writes
 * @param pds enhanced mock PDS holding the in-memory record store
 * @returns an object with putRecord / deleteRecord (no-op) / listRecords
 */
function createMockRecordWriter(did: string, pds: EnhancedMockPds) {
  return {
    // Writes go straight into the mock PDS store; cid is a fixed placeholder.
    putRecord: async (collection: string, rkey: string, record: unknown) => {
      pds.addRecord(did, collection, rkey, record);
      return { uri: `at://${did}/${collection}/${rkey}`, cid: "bafytest" };
    },
    deleteRecord: async (_collection: string, _rkey: string) => {
      // No-op for test
    },
    // Reads go over HTTP so the mock PDS endpoint logic is exercised too.
    listRecords: async (collection: string, _opts: { limit: number }) => {
      // Fetch from mock PDS
      const res = await fetch(
        `${pds.url}/xrpc/com.atproto.repo.listRecords?repo=${encodeURIComponent(did)}&collection=${encodeURIComponent(collection)}&limit=100`,
      );
      if (!res.ok) return { records: [] };
      return (await res.json()) as { records: Array<{ uri: string; cid: string; value: unknown }> };
    },
  };
}