// atproto user agency toolkit for individuals and groups
1/**
2 * Bidirectional replication E2E test.
3 *
4 * Two p2pds nodes, each with a simulated user account on a mock PDS.
5 * Each node tracks the other's DID, syncs data, publishes offers,
6 * discovers mutual agreements, and serves replicated data via sync endpoints.
7 */
8
9import { describe, it, expect, afterEach } from "vitest";
10import { mkdtempSync, rmSync } from "node:fs";
11import { tmpdir } from "node:os";
12import { join, resolve } from "node:path";
13import Database from "better-sqlite3";
14
15import type { Config } from "./config.js";
16import { startServer, type ServerHandle } from "./start.js";
17import {
18 createTestRepo,
19 startMockPds,
20 type MockPds,
21} from "./replication/test-helpers.js";
22import type { DidResolver, DidDocument } from "./did-resolver.js";
23import { PolicyEngine } from "./policy/engine.js";
24import { OFFER_NSID, didToRkey } from "./replication/types.js";
25
/** DID for Node A's user identity ("Alice" in these tests). */
const DID_NODE_A = "did:plc:node-a-user";
/** DID for Node B's user identity ("Bob" in these tests). */
const DID_NODE_B = "did:plc:node-b-user";
30
31function baseConfig(dataDir: string): Config {
32 return {
33 PDS_HOSTNAME: "local.test",
34 AUTH_TOKEN: "test-auth-token",
35 JWT_SECRET: "test-jwt-secret",
36 PASSWORD_HASH: "$2a$10$test",
37 DATA_DIR: dataDir,
38 PORT: 0,
39 IPFS_ENABLED: true,
40 IPFS_NETWORKING: false,
41 REPLICATE_DIDS: [],
42 FIREHOSE_URL: "wss://localhost/xrpc/com.atproto.sync.subscribeRepos",
43 FIREHOSE_ENABLED: false,
44 RATE_LIMIT_ENABLED: false,
45 RATE_LIMIT_READ_PER_MIN: 300,
46 RATE_LIMIT_SYNC_PER_MIN: 30,
47 RATE_LIMIT_SESSION_PER_MIN: 10,
48 RATE_LIMIT_WRITE_PER_MIN: 200,
49 RATE_LIMIT_CHALLENGE_PER_MIN: 20,
50 RATE_LIMIT_MAX_CONNECTIONS: 100,
51 RATE_LIMIT_FIREHOSE_PER_IP: 3,
52 OAUTH_ENABLED: false, PUBLIC_URL: "http://localhost:3000",
53 };
54}
55
56/**
57 * Enhanced mock PDS that serves configurable records per DID per collection.
58 * Supports adding offer records dynamically to simulate users publishing offers.
59 */
60interface EnhancedMockPds extends MockPds {
61 addRecord(did: string, collection: string, rkey: string, value: unknown): void;
62}
63
64async function startEnhancedMockPds(
65 accounts: Array<{ did: string; carBytes: Uint8Array }>,
66): Promise<EnhancedMockPds> {
67 const { createServer } = await import("node:http");
68
69 const accountMap = new Map<string, Uint8Array>();
70 for (const a of accounts) {
71 accountMap.set(a.did, a.carBytes);
72 }
73
74 // Records store: did -> collection -> rkey -> { uri, cid, value }
75 const records = new Map<string, Map<string, Map<string, { uri: string; cid: string; value: unknown }>>>();
76
77 const server = createServer((req, res) => {
78 const url = new URL(req.url ?? "/", "http://localhost");
79 const pathname = url.pathname;
80
81 if (pathname === "/xrpc/com.atproto.sync.getRepo") {
82 const did = url.searchParams.get("did");
83 if (!did || !accountMap.has(did)) {
84 res.writeHead(404, { "Content-Type": "application/json" });
85 res.end(JSON.stringify({ error: "RepoNotFound" }));
86 return;
87 }
88 const carBytes = accountMap.get(did)!;
89 res.writeHead(200, {
90 "Content-Type": "application/vnd.ipld.car",
91 "Content-Length": String(carBytes.length),
92 });
93 res.end(Buffer.from(carBytes));
94 return;
95 }
96
97 if (pathname === "/xrpc/com.atproto.repo.listRecords") {
98 const did = url.searchParams.get("repo");
99 const collection = url.searchParams.get("collection");
100 if (!did || !collection) {
101 res.writeHead(400, { "Content-Type": "application/json" });
102 res.end(JSON.stringify({ error: "InvalidRequest" }));
103 return;
104 }
105 const didRecords = records.get(did)?.get(collection);
106 const recordList = didRecords ? Array.from(didRecords.values()) : [];
107 res.writeHead(200, { "Content-Type": "application/json" });
108 res.end(JSON.stringify({ records: recordList }));
109 return;
110 }
111
112 if (pathname === "/xrpc/com.atproto.repo.getRecord") {
113 const did = url.searchParams.get("repo");
114 const collection = url.searchParams.get("collection");
115 const rkey = url.searchParams.get("rkey");
116 if (!did || !collection || !rkey) {
117 res.writeHead(400, { "Content-Type": "application/json" });
118 res.end(JSON.stringify({ error: "InvalidRequest" }));
119 return;
120 }
121 const record = records.get(did)?.get(collection)?.get(rkey);
122 if (!record) {
123 res.writeHead(404, { "Content-Type": "application/json" });
124 res.end(JSON.stringify({ error: "RecordNotFound" }));
125 return;
126 }
127 res.writeHead(200, { "Content-Type": "application/json" });
128 res.end(JSON.stringify(record));
129 return;
130 }
131
132 res.writeHead(404, { "Content-Type": "application/json" });
133 res.end(JSON.stringify({ error: "NotFound" }));
134 });
135
136 return new Promise((resolve) => {
137 server.listen(0, "127.0.0.1", () => {
138 const addr = server.address() as { port: number };
139 const pdsUrl = `http://127.0.0.1:${addr.port}`;
140 resolve({
141 url: pdsUrl,
142 port: addr.port,
143 close: () => new Promise<void>((res) => server.close(() => res())),
144 updateAccount: (did: string, carBytes: Uint8Array) => {
145 accountMap.set(did, carBytes);
146 },
147 addRecord: (did: string, collection: string, rkey: string, value: unknown) => {
148 if (!records.has(did)) records.set(did, new Map());
149 const didMap = records.get(did)!;
150 if (!didMap.has(collection)) didMap.set(collection, new Map());
151 didMap.get(collection)!.set(rkey, {
152 uri: `at://${did}/${collection}/${rkey}`,
153 cid: "bafytest",
154 value,
155 });
156 },
157 });
158 });
159 });
160}
161
162function createMockDidResolver(mapping: Record<string, string>): DidResolver {
163 return {
164 resolve: async (did: string): Promise<DidDocument | null> => {
165 const pdsUrl = mapping[did];
166 if (!pdsUrl) return null;
167 return {
168 id: did,
169 service: [
170 {
171 id: "#atproto_pds",
172 type: "AtprotoPersonalDataServer",
173 serviceEndpoint: pdsUrl,
174 },
175 ],
176 } as unknown as DidDocument;
177 },
178 } as DidResolver;
179}
180
describe("Bidirectional replication E2E", () => {
  // Shared per-test fixtures. afterEach tears down whatever each test created,
  // so tests may leave these set on failure without leaking processes/dirs.
  let tmpDirA: string;
  let tmpDirB: string;
  let handleA: ServerHandle | undefined;
  let handleB: ServerHandle | undefined;
  let mockPds: EnhancedMockPds | undefined;

  afterEach(async () => {
    // Servers are closed before the mock PDS and before temp dirs are removed.
    if (handleA) { await handleA.close(); handleA = undefined; }
    if (handleB) { await handleB.close(); handleB = undefined; }
    if (mockPds) { await mockPds.close(); mockPds = undefined; }
    if (tmpDirA) rmSync(tmpDirA, { recursive: true, force: true });
    if (tmpDirB) rmSync(tmpDirB, { recursive: true, force: true });
  });

  it("two nodes sync each other's data and serve it via sync endpoints", async () => {
    tmpDirA = mkdtempSync(join(tmpdir(), "bidir-a-"));
    tmpDirB = mkdtempSync(join(tmpdir(), "bidir-b-"));

    // Create test repos for each user's account
    const aliceCar = await createTestRepo(DID_NODE_A, [
      { collection: "app.bsky.feed.post", rkey: "a1", record: { text: "Alice post", createdAt: new Date().toISOString() } },
      { collection: "app.bsky.feed.post", rkey: "a2", record: { text: "Alice post 2", createdAt: new Date().toISOString() } },
    ]);
    const bobCar = await createTestRepo(DID_NODE_B, [
      { collection: "app.bsky.feed.post", rkey: "b1", record: { text: "Bob post", createdAt: new Date().toISOString() } },
    ]);

    // Mock PDS serves both accounts
    mockPds = await startEnhancedMockPds([
      { did: DID_NODE_A, carBytes: aliceCar },
      { did: DID_NODE_B, carBytes: bobCar },
    ]);
    // Both DIDs resolve to the single mock PDS instance.
    const resolver = createMockDidResolver({
      [DID_NODE_A]: mockPds.url,
      [DID_NODE_B]: mockPds.url,
    });

    // Start two servers — each with their own identity
    const configA = baseConfig(tmpDirA);
    const configB = baseConfig(tmpDirB);

    // Pre-set identities (simulating OAuth already done)
    // NOTE(review): seeds the node_identity table directly in SQLite — assumes
    // startServer reads this table on boot and that the schema here matches the
    // production migration; confirm if migrations change.
    const dbA = new Database(resolve(tmpDirA, "pds.db"));
    dbA.pragma("journal_mode = WAL");
    dbA.exec("CREATE TABLE IF NOT EXISTS node_identity (did TEXT PRIMARY KEY, handle TEXT, created_at TEXT NOT NULL DEFAULT (datetime('now')))");
    dbA.prepare("INSERT INTO node_identity (did, handle) VALUES (?, ?)").run(DID_NODE_A, "alice.test");
    dbA.close();
    configA.DID = DID_NODE_A;
    configA.HANDLE = "alice.test";

    const dbB = new Database(resolve(tmpDirB, "pds.db"));
    dbB.pragma("journal_mode = WAL");
    dbB.exec("CREATE TABLE IF NOT EXISTS node_identity (did TEXT PRIMARY KEY, handle TEXT, created_at TEXT NOT NULL DEFAULT (datetime('now')))");
    dbB.prepare("INSERT INTO node_identity (did, handle) VALUES (?, ?)").run(DID_NODE_B, "bob.test");
    dbB.close();
    configB.DID = DID_NODE_B;
    configB.HANDLE = "bob.test";

    handleA = await startServer(configA, { didResolver: resolver });
    handleB = await startServer(configB, { didResolver: resolver });

    // Both nodes should have replication managers
    expect(handleA.replicationManager).toBeDefined();
    expect(handleB.replicationManager).toBeDefined();

    // Node A adds Node B's DID, Node B adds Node A's DID
    const addBToA = await fetch(`${handleA.url}/xrpc/org.p2pds.app.addDid`, {
      method: "POST",
      headers: { Authorization: `Bearer ${configA.AUTH_TOKEN}`, "Content-Type": "application/json" },
      body: JSON.stringify({ did: DID_NODE_B }),
    });
    expect(addBToA.status).toBe(200);

    const addAToB = await fetch(`${handleB.url}/xrpc/org.p2pds.app.addDid`, {
      method: "POST",
      headers: { Authorization: `Bearer ${configB.AUTH_TOKEN}`, "Content-Type": "application/json" },
      body: JSON.stringify({ did: DID_NODE_A }),
    });
    expect(addAToB.status).toBe(200);

    // Trigger sync on both
    await fetch(`${handleA.url}/xrpc/org.p2pds.replication.syncNow`, {
      method: "POST",
      headers: { Authorization: `Bearer ${configA.AUTH_TOKEN}` },
    });
    await fetch(`${handleB.url}/xrpc/org.p2pds.replication.syncNow`, {
      method: "POST",
      headers: { Authorization: `Bearer ${configB.AUTH_TOKEN}` },
    });

    // Wait for async sync
    // NOTE(review): fixed 3s sleep is flaky-prone; polling getDidStatus until
    // "synced" would be more robust.
    await new Promise((r) => setTimeout(r, 3000));

    // Verify Node A synced Bob's data
    const statusA = await fetch(
      `${handleA.url}/xrpc/org.p2pds.app.getDidStatus?did=${DID_NODE_B}`,
      { headers: { Authorization: `Bearer ${configA.AUTH_TOKEN}` } },
    );
    const dsA = (await statusA.json()) as { did: string; blockCount: number; syncState: { status: string } };
    expect(dsA.did).toBe(DID_NODE_B);
    expect(dsA.blockCount).toBeGreaterThan(0);
    expect(dsA.syncState.status).toBe("synced");

    // Verify Node B synced Alice's data
    const statusB = await fetch(
      `${handleB.url}/xrpc/org.p2pds.app.getDidStatus?did=${DID_NODE_A}`,
      { headers: { Authorization: `Bearer ${configB.AUTH_TOKEN}` } },
    );
    const dsB = (await statusB.json()) as { did: string; blockCount: number; syncState: { status: string } };
    expect(dsB.did).toBe(DID_NODE_A);
    expect(dsB.blockCount).toBeGreaterThan(0);
    expect(dsB.syncState.status).toBe("synced");

    // ---- Verify sync endpoints serve replicated data ----

    // Node A serves Bob's repo via getRepo
    const getRepoA = await fetch(
      `${handleA.url}/xrpc/com.atproto.sync.getRepo?did=${DID_NODE_B}`,
    );
    expect(getRepoA.status).toBe(200);
    expect(getRepoA.headers.get("content-type")).toBe("application/vnd.ipld.car");
    const carBytesA = new Uint8Array(await getRepoA.arrayBuffer());
    expect(carBytesA.length).toBeGreaterThan(0);

    // Node B serves Alice's repo via getRepo
    const getRepoB = await fetch(
      `${handleB.url}/xrpc/com.atproto.sync.getRepo?did=${DID_NODE_A}`,
    );
    expect(getRepoB.status).toBe(200);
    const carBytesB = new Uint8Array(await getRepoB.arrayBuffer());
    expect(carBytesB.length).toBeGreaterThan(0);

    // Node A serves Bob's repo via getRepoStatus
    const repoStatusA = await fetch(
      `${handleA.url}/xrpc/com.atproto.sync.getRepoStatus?did=${DID_NODE_B}`,
    );
    expect(repoStatusA.status).toBe(200);
    const rsA = (await repoStatusA.json()) as { did: string; active: boolean; rev: string | null };
    expect(rsA.did).toBe(DID_NODE_B);
    expect(rsA.rev).toBeTruthy();

    // Node B lists all repos (should include Alice's)
    const listReposB = await fetch(
      `${handleB.url}/xrpc/com.atproto.sync.listRepos`,
    );
    expect(listReposB.status).toBe(200);
    const reposB = (await listReposB.json()) as { repos: Array<{ did: string }> };
    const replicatedDids = reposB.repos.map((r) => r.did);
    expect(replicatedDids).toContain(DID_NODE_A);

    // Node A can read Bob's records via repo.getRecord
    const recordA = await fetch(
      `${handleA.url}/xrpc/com.atproto.repo.getRecord?repo=${DID_NODE_B}&collection=app.bsky.feed.post&rkey=b1`,
    );
    expect(recordA.status).toBe(200);
    const recA = (await recordA.json()) as { uri: string; value: { text: string } };
    expect(recA.value.text).toBe("Bob post");

    // Node B can read Alice's records via repo.getRecord
    const recordB = await fetch(
      `${handleB.url}/xrpc/com.atproto.repo.getRecord?repo=${DID_NODE_A}&collection=app.bsky.feed.post&rkey=a1`,
    );
    expect(recordB.status).toBe(200);
    const recB = (await recordB.json()) as { uri: string; value: { text: string } };
    expect(recB.value.text).toBe("Alice post");

    // Node B can list Alice's records
    const listRecsB = await fetch(
      `${handleB.url}/xrpc/com.atproto.repo.listRecords?repo=${DID_NODE_A}&collection=app.bsky.feed.post`,
    );
    expect(listRecsB.status).toBe(200);
    const recsB = (await listRecsB.json()) as { records: Array<{ value: { text: string } }> };
    // Alice's repo was created with exactly two app.bsky.feed.post records (a1, a2).
    expect(recsB.records.length).toBe(2);

    // Node A can describe Bob's repo
    const describeA = await fetch(
      `${handleA.url}/xrpc/com.atproto.repo.describeRepo?repo=${DID_NODE_B}`,
    );
    expect(describeA.status).toBe(200);
    const descA = (await describeA.json()) as { did: string; collections: string[] };
    expect(descA.did).toBe(DID_NODE_B);
    expect(descA.collections).toContain("app.bsky.feed.post");
  }, 30_000);

  it("mutual offers create P2P replication policies", async () => {
    tmpDirA = mkdtempSync(join(tmpdir(), "bidir-offer-a-"));
    tmpDirB = mkdtempSync(join(tmpdir(), "bidir-offer-b-"));

    // Create minimal repos
    const aliceCar = await createTestRepo(DID_NODE_A, [
      { collection: "app.bsky.feed.post", rkey: "x1", record: { text: "test", createdAt: new Date().toISOString() } },
    ]);
    const bobCar = await createTestRepo(DID_NODE_B, [
      { collection: "app.bsky.feed.post", rkey: "y1", record: { text: "test", createdAt: new Date().toISOString() } },
    ]);

    // Mock PDS with offer records
    mockPds = await startEnhancedMockPds([
      { did: DID_NODE_A, carBytes: aliceCar },
      { did: DID_NODE_B, carBytes: bobCar },
    ]);

    // Simulate both users having published offers for each other
    // Node A's user published an offer for Node B's DID
    mockPds.addRecord(DID_NODE_A, OFFER_NSID, didToRkey(DID_NODE_B), {
      $type: OFFER_NSID,
      subject: DID_NODE_B,
      minCopies: 2,
      intervalSec: 300,
      priority: 50,
      createdAt: new Date().toISOString(),
    });
    // Node B's user published an offer for Node A's DID
    // (deliberately different params so effective-param merging is observable below)
    mockPds.addRecord(DID_NODE_B, OFFER_NSID, didToRkey(DID_NODE_A), {
      $type: OFFER_NSID,
      subject: DID_NODE_A,
      minCopies: 3,
      intervalSec: 600,
      priority: 75,
      createdAt: new Date().toISOString(),
    });

    const resolver = createMockDidResolver({
      [DID_NODE_A]: mockPds.url,
      [DID_NODE_B]: mockPds.url,
    });

    // Create configs with policy engines and identities pre-set
    const configA = baseConfig(tmpDirA);
    configA.DID = DID_NODE_A;
    configA.HANDLE = "alice.test";
    const dbA = new Database(resolve(tmpDirA, "pds.db"));
    dbA.pragma("journal_mode = WAL");
    dbA.exec("CREATE TABLE IF NOT EXISTS node_identity (did TEXT PRIMARY KEY, handle TEXT, created_at TEXT NOT NULL DEFAULT (datetime('now')))");
    dbA.prepare("INSERT INTO node_identity (did, handle) VALUES (?, ?)").run(DID_NODE_A, "alice.test");
    dbA.close();

    const configB = baseConfig(tmpDirB);
    configB.DID = DID_NODE_B;
    configB.HANDLE = "bob.test";
    const dbB = new Database(resolve(tmpDirB, "pds.db"));
    dbB.pragma("journal_mode = WAL");
    dbB.exec("CREATE TABLE IF NOT EXISTS node_identity (did TEXT PRIMARY KEY, handle TEXT, created_at TEXT NOT NULL DEFAULT (datetime('now')))");
    dbB.prepare("INSERT INTO node_identity (did, handle) VALUES (?, ?)").run(DID_NODE_B, "bob.test");
    dbB.close();

    // We need to inject PolicyEngine + a mock RecordWriter into the ReplicationManager
    // The simplest approach: start the servers, then use the OfferManager directly
    handleA = await startServer(configA, { didResolver: resolver });
    handleB = await startServer(configB, { didResolver: resolver });

    const rmA = handleA.replicationManager!;
    const rmB = handleB.replicationManager!;

    // Create mock RecordWriters that use the mock PDS records store
    // (In production this would be the PdsClient via OAuth)
    const mockWriterA = createMockRecordWriter(DID_NODE_A, mockPds);
    const mockWriterB = createMockRecordWriter(DID_NODE_B, mockPds);

    // Inject policy engine and setPdsClient
    const peA = new PolicyEngine({ version: 1, policies: [] });
    const peB = new PolicyEngine({ version: 1, policies: [] });
    // Access private field to set policy engine — test-only hack
    // NOTE(review): bypasses the type system; breaks silently if the private
    // field is renamed in ReplicationManager.
    (rmA as unknown as { policyEngine: PolicyEngine }).policyEngine = peA;
    (rmB as unknown as { policyEngine: PolicyEngine }).policyEngine = peB;
    rmA.setPdsClient(mockWriterA, DID_NODE_A);
    rmB.setPdsClient(mockWriterB, DID_NODE_B);

    // Both nodes add each other's DID
    await rmA.addDid(DID_NODE_B);
    await rmB.addDid(DID_NODE_A);

    // Wait for initial sync
    // NOTE(review): fixed 3s sleep — same flakiness caveat as the first test.
    await new Promise((r) => setTimeout(r, 3000));

    // Now run offer discovery on both
    const offerManagerA = rmA.getOfferManager();
    const offerManagerB = rmB.getOfferManager();
    expect(offerManagerA).toBeDefined();
    expect(offerManagerB).toBeDefined();

    // Node A discovers agreements: it should find that Node B has an offer for Node A
    const statesA = rmA.getSyncStates();
    const peersA = statesA.filter((s) => s.pdsEndpoint).map((s) => ({ did: s.did, pdsEndpoint: s.pdsEndpoint }));
    const agreementsA = await offerManagerA!.discoverAndSync(peersA);

    // Node B discovers agreements similarly
    const statesB = rmB.getSyncStates();
    const peersB = statesB.filter((s) => s.pdsEndpoint).map((s) => ({ did: s.did, pdsEndpoint: s.pdsEndpoint }));
    const agreementsB = await offerManagerB!.discoverAndSync(peersB);

    // Both should detect one mutual agreement
    expect(agreementsA.length).toBe(1);
    expect(agreementsA[0]!.counterpartyDid).toBe(DID_NODE_B);
    expect(agreementsB.length).toBe(1);
    expect(agreementsB[0]!.counterpartyDid).toBe(DID_NODE_A);

    // Verify effective params: max(minCopies), min(intervalSec), max(priority)
    expect(agreementsA[0]!.effectiveParams.minCopies).toBe(3); // max(2, 3)
    expect(agreementsA[0]!.effectiveParams.intervalSec).toBe(300); // min(300, 600)
    expect(agreementsA[0]!.effectiveParams.priority).toBe(75); // max(50, 75)

    // Verify P2P policies were created in the policy engine
    const p2pPolicyA = peA.getPolicies().find((p) => p.id === `p2p:${DID_NODE_B}`);
    expect(p2pPolicyA).toBeDefined();
    expect(p2pPolicyA!.replication.minCopies).toBe(3);

    const p2pPolicyB = peB.getPolicies().find((p) => p.id === `p2p:${DID_NODE_A}`);
    expect(p2pPolicyB).toBeDefined();
  }, 30_000);
});
493
494/**
495 * Create a mock RecordWriter backed by the enhanced mock PDS.
496 * This simulates what PdsClient does: read/write records to the user's PDS.
497 */
498function createMockRecordWriter(did: string, pds: EnhancedMockPds) {
499 return {
500 putRecord: async (collection: string, rkey: string, record: unknown) => {
501 pds.addRecord(did, collection, rkey, record);
502 return { uri: `at://${did}/${collection}/${rkey}`, cid: "bafytest" };
503 },
504 deleteRecord: async (_collection: string, _rkey: string) => {
505 // No-op for test
506 },
507 listRecords: async (collection: string, _opts: { limit: number }) => {
508 // Fetch from mock PDS
509 const res = await fetch(
510 `${pds.url}/xrpc/com.atproto.repo.listRecords?repo=${encodeURIComponent(did)}&collection=${encodeURIComponent(collection)}&limit=100`,
511 );
512 if (!res.ok) return { records: [] };
513 return (await res.json()) as { records: Array<{ uri: string; cid: string; value: unknown }> };
514 },
515 };
516}