/**
 * Two-node DID-less startup test.
 *
 * Two clean p2pds servers start without any DID. Each establishes identity
 * (simulating OAuth), then each replicates a different external account
 * from a shared mock PDS. Verifies the full flow:
 * startup → identity → add DID → sync.
 */

import { describe, it, expect, afterEach } from "vitest";
import { mkdtempSync, rmSync } from "node:fs";
import { tmpdir } from "node:os";
import { join, resolve } from "node:path";
import Database from "better-sqlite3";
import type { Config } from "./config.js";
import { startServer, type ServerHandle } from "./start.js";
import {
  createTestRepo,
  startMockPds,
  createMockDidResolver,
  type MockPds,
} from "./replication/test-helpers.js";

const DID_ALICE = "did:plc:alice111";
const DID_BOB = "did:plc:bob222";

/** Upper bound on how long a test polls for a DID to report as synced. */
const SYNC_TIMEOUT_MS = 10_000;
/** Pause between two consecutive status polls. */
const SYNC_POLL_INTERVAL_MS = 100;

/** Subset of the `org.p2pds.app.getDidStatus` response that these tests read. */
interface DidStatus {
  did: string;
  blockCount: number;
  syncState: { status: string };
}

/**
 * Builds a server config that carries no DID/handle, with firehose,
 * rate limiting, OAuth and IPFS networking switched off.
 *
 * @param dataDir - Directory used as `DATA_DIR` for the node.
 */
function didlessConfig(dataDir: string): Config {
  return {
    PDS_HOSTNAME: "local.test",
    AUTH_TOKEN: "test-auth-token",
    JWT_SECRET: "test-jwt-secret",
    PASSWORD_HASH: "$2a$10$test",
    DATA_DIR: dataDir,
    PORT: 0,
    IPFS_ENABLED: true,
    IPFS_NETWORKING: false,
    REPLICATE_DIDS: [],
    FIREHOSE_URL: "wss://localhost/xrpc/com.atproto.sync.subscribeRepos",
    FIREHOSE_ENABLED: false,
    RATE_LIMIT_ENABLED: false,
    RATE_LIMIT_READ_PER_MIN: 300,
    RATE_LIMIT_SYNC_PER_MIN: 30,
    RATE_LIMIT_SESSION_PER_MIN: 10,
    RATE_LIMIT_WRITE_PER_MIN: 200,
    RATE_LIMIT_CHALLENGE_PER_MIN: 20,
    RATE_LIMIT_MAX_CONNECTIONS: 100,
    RATE_LIMIT_FIREHOSE_PER_IP: 3,
    OAUTH_ENABLED: false,
    PUBLIC_URL: "http://localhost:3000",
  };
}

/** Bearer-token header for the admin endpoints of the node using `config`. */
function authHeader(config: Config): Record<string, string> {
  return { Authorization: `Bearer ${config.AUTH_TOKEN}` };
}

/**
 * Writes a row into the `node_identity` table of the node's `pds.db`,
 * simulating what the OAuth callback does in production.
 *
 * The connection is closed in a `finally` so a failing insert does not
 * leave an open SQLite handle behind.
 */
function insertNodeIdentity(dataDir: string, did: string, handle: string): void {
  const db = new Database(resolve(dataDir, "pds.db"));
  try {
    db.prepare("INSERT INTO node_identity (did, handle) VALUES (?, ?)").run(did, handle);
  } finally {
    db.close();
  }
}

/** POSTs `org.p2pds.app.addDid` for `did` to the given node. */
function addDid(node: ServerHandle, config: Config, did: string): Promise<Response> {
  return fetch(`${node.url}/xrpc/org.p2pds.app.addDid`, {
    method: "POST",
    headers: { ...authHeader(config), "Content-Type": "application/json" },
    body: JSON.stringify({ did }),
  });
}

/** POSTs `org.p2pds.replication.syncNow` to the given node. */
function syncNow(node: ServerHandle, config: Config): Promise<Response> {
  return fetch(`${node.url}/xrpc/org.p2pds.replication.syncNow`, {
    method: "POST",
    headers: authHeader(config),
  });
}

/** GETs `org.p2pds.app.getOverview` from the given node. */
function fetchOverview(node: ServerHandle, config: Config): Promise<Response> {
  return fetch(`${node.url}/xrpc/org.p2pds.app.getOverview`, {
    headers: authHeader(config),
  });
}

/** GETs `org.p2pds.app.getDidStatus` for `did` from the given node. */
function fetchDidStatus(node: ServerHandle, config: Config, did: string): Promise<Response> {
  return fetch(`${node.url}/xrpc/org.p2pds.app.getDidStatus?did=${did}`, {
    headers: authHeader(config),
  });
}

/**
 * Polls `getDidStatus` until the DID reports `syncState.status === "synced"`
 * with at least one block, or until `timeoutMs` has elapsed.
 *
 * Replaces a fixed sleep: the test proceeds as soon as the sync is visible
 * and tolerates slow machines up to the timeout.
 *
 * @returns The HTTP status and parsed body of the last poll. `body` is
 *   `undefined` when the last response was not a 200, so callers still run
 *   their own assertions on the result.
 */
async function waitForDidSynced(
  node: ServerHandle,
  config: Config,
  did: string,
  timeoutMs: number = SYNC_TIMEOUT_MS,
): Promise<{ httpStatus: number; body: DidStatus | undefined }> {
  const deadline = Date.now() + timeoutMs;
  for (;;) {
    const res = await fetchDidStatus(node, config, did);
    let body: DidStatus | undefined;
    if (res.status === 200) {
      body = (await res.json()) as DidStatus;
    } else {
      // Drain the unused body so the connection can be reused.
      await res.text();
    }

    const synced =
      body !== undefined && body.blockCount > 0 && body.syncState?.status === "synced";
    if (synced || Date.now() >= deadline) {
      return { httpStatus: res.status, body };
    }
    await new Promise((r) => setTimeout(r, SYNC_POLL_INTERVAL_MS));
  }
}

describe("Two-node DID-less replication", () => {
  let tmpDirA = "";
  let tmpDirB = "";
  let handleA: ServerHandle | undefined;
  let handleB: ServerHandle | undefined;
  let mockPds: MockPds | undefined;

  afterEach(async () => {
    // Snapshot and reset shared state first so the next test always starts
    // clean, even if something below throws.
    const resources = [handleA, handleB, mockPds];
    const dirs = [tmpDirA, tmpDirB];
    handleA = undefined;
    handleB = undefined;
    mockPds = undefined;
    tmpDirA = "";
    tmpDirB = "";

    // Attempt every cleanup step; one failing close() must not leak the
    // remaining servers or temp directories.
    const failures: unknown[] = [];
    for (const resource of resources) {
      if (!resource) continue;
      try {
        await resource.close();
      } catch (err) {
        failures.push(err);
      }
    }
    for (const dir of dirs) {
      if (!dir) continue;
      try {
        rmSync(dir, { recursive: true, force: true });
      } catch (err) {
        failures.push(err);
      }
    }
    if (failures.length > 0) throw failures[0];
  });

  it("two clean nodes establish identity and replicate different accounts", async () => {
    tmpDirA = mkdtempSync(join(tmpdir(), "two-node-a-"));
    tmpDirB = mkdtempSync(join(tmpdir(), "two-node-b-"));

    // Create test repos for two external accounts
    const aliceCar = await createTestRepo(DID_ALICE, [
      {
        collection: "app.bsky.feed.post",
        rkey: "a1",
        record: { text: "Alice post 1", createdAt: new Date().toISOString() },
      },
      {
        collection: "app.bsky.feed.post",
        rkey: "a2",
        record: { text: "Alice post 2", createdAt: new Date().toISOString() },
      },
    ]);
    const bobCar = await createTestRepo(DID_BOB, [
      {
        collection: "app.bsky.feed.post",
        rkey: "b1",
        record: { text: "Bob post 1", createdAt: new Date().toISOString() },
      },
    ]);

    // Mock PDS serves both accounts
    const pds = await startMockPds([
      { did: DID_ALICE, carBytes: aliceCar },
      { did: DID_BOB, carBytes: bobCar },
    ]);
    mockPds = pds;
    const mockResolver = createMockDidResolver({
      [DID_ALICE]: pds.url,
      [DID_BOB]: pds.url,
    });

    // Start two clean servers — no DID, no signing key
    const configA = didlessConfig(tmpDirA);
    const configB = didlessConfig(tmpDirB);
    const nodeA = await startServer(configA, { didResolver: mockResolver });
    handleA = nodeA;
    const nodeB = await startServer(configB, { didResolver: mockResolver });
    handleB = nodeB;

    // Both should be healthy without DID
    const healthA = await fetch(`${nodeA.url}/xrpc/_health`);
    const healthB = await fetch(`${nodeB.url}/xrpc/_health`);
    expect(healthA.status).toBe(200);
    expect(healthB.status).toBe(200);

    // Both should have replication managers
    expect(nodeA.replicationManager).toBeDefined();
    expect(nodeB.replicationManager).toBeDefined();

    // Simulate OAuth identity establishment on each node
    // (In production this happens in the OAuth callback)
    insertNodeIdentity(tmpDirA, "did:plc:node-a-identity", "node-a.test");
    configA.DID = "did:plc:node-a-identity";
    configA.HANDLE = "node-a.test";

    insertNodeIdentity(tmpDirB, "did:plc:node-b-identity", "node-b.test");
    configB.DID = "did:plc:node-b-identity";
    configB.HANDLE = "node-b.test";

    // Verify identity shows up in overview
    const overviewA = await fetchOverview(nodeA, configA);
    expect(overviewA.status).toBe(200);
    const ovA = (await overviewA.json()) as { did: string };
    expect(ovA.did).toBe("did:plc:node-a-identity");

    // Node A replicates Alice's account
    const addAlice = await addDid(nodeA, configA, DID_ALICE);
    expect(addAlice.status).toBe(200);

    // Node B replicates Bob's account
    const addBob = await addDid(nodeB, configB, DID_BOB);
    expect(addBob.status).toBe(200);

    // Trigger sync on both
    await syncNow(nodeA, configA);
    await syncNow(nodeB, configB);

    // Wait for the async sync on both nodes (polled concurrently so the
    // worst case stays well inside the test timeout).
    const [statusA, statusB] = await Promise.all([
      waitForDidSynced(nodeA, configA, DID_ALICE),
      waitForDidSynced(nodeB, configB, DID_BOB),
    ]);

    // Verify Node A synced Alice's data
    expect(statusA.httpStatus).toBe(200);
    expect(statusA.body?.did).toBe(DID_ALICE);
    expect(statusA.body?.blockCount).toBeGreaterThan(0);
    expect(statusA.body?.syncState.status).toBe("synced");

    // Verify Node B synced Bob's data
    expect(statusB.httpStatus).toBe(200);
    expect(statusB.body?.did).toBe(DID_BOB);
    expect(statusB.body?.blockCount).toBeGreaterThan(0);
    expect(statusB.body?.syncState.status).toBe("synced");
  }, 30_000);

  it("identity persists across restart and replication continues", async () => {
    tmpDirA = mkdtempSync(join(tmpdir(), "two-node-restart-"));

    // Create test repo
    const aliceCar = await createTestRepo(DID_ALICE, [
      {
        collection: "app.bsky.feed.post",
        rkey: "r1",
        record: { text: "restart test", createdAt: new Date().toISOString() },
      },
    ]);

    const pds = await startMockPds([{ did: DID_ALICE, carBytes: aliceCar }]);
    mockPds = pds;
    const mockResolver = createMockDidResolver({ [DID_ALICE]: pds.url });

    // First boot: start clean, establish identity, add DID, sync
    const config1 = didlessConfig(tmpDirA);
    const firstBoot = await startServer(config1, { didResolver: mockResolver });
    handleA = firstBoot;

    // Simulate identity establishment
    insertNodeIdentity(tmpDirA, "did:plc:persistent-node", "persistent.test");
    config1.DID = "did:plc:persistent-node";

    // Add and sync
    const added = await addDid(firstBoot, config1, DID_ALICE);
    expect(added.status).toBe(200);
    await syncNow(firstBoot, config1);

    // Verify sync worked. Waiting for the "synced" state (not just the first
    // block) keeps the shutdown below from racing an in-flight sync.
    const status1 = await waitForDidSynced(firstBoot, config1, DID_ALICE);
    expect(status1.httpStatus).toBe(200);
    expect(status1.body?.blockCount).toBeGreaterThan(0);

    // Shut down
    await firstBoot.close();
    handleA = undefined;

    // Second boot: restart with same data dir — identity should load automatically
    const config2 = didlessConfig(tmpDirA);
    expect(config2.DID).toBeUndefined(); // not set in config

    const secondBoot = await startServer(config2, { didResolver: mockResolver });
    handleA = secondBoot;

    // Identity should have been loaded from node_identity
    expect(config2.DID).toBe("did:plc:persistent-node");
    expect(config2.HANDLE).toBe("persistent.test");

    // Replication state should persist — DID_ALICE is still tracked
    const overview = await fetchOverview(secondBoot, config2);
    const ov = (await overview.json()) as {
      did: string;
      replication: { trackedDids: string[] };
    };
    expect(ov.did).toBe("did:plc:persistent-node");
    expect(ov.replication.trackedDids).toContain(DID_ALICE);

    // Blocks should still be in IPFS (persisted to disk). Deliberately a
    // single fetch with no polling: the state must be there right after boot.
    const status2 = await fetchDidStatus(secondBoot, config2, DID_ALICE);
    const ds2 = (await status2.json()) as DidStatus;
    expect(ds2.blockCount).toBeGreaterThan(0);
    expect(ds2.syncState.status).toBe("synced");
  }, 30_000);
});