// atproto user agency toolkit for individuals and groups
1/**
2 * Capstone E2E test: full on-protocol bidirectional replication.
3 *
4 * Two startServer() instances with IPFS_NETWORKING: true, real libp2p,
5 * CAR-over-libp2p protocol. Exercises the complete loop:
6 * self-sync -> peer dial -> cross-sync via libp2p -> XRPC serving ->
7 * incremental re-sync -> mutual offers -> auto-policies
8 */
9
10import { describe, it, expect, afterEach } from "vitest";
11import { mkdtempSync, rmSync } from "node:fs";
12import { tmpdir } from "node:os";
13import { join, resolve } from "node:path";
14import Database from "better-sqlite3";
15
16import type { Config } from "./config.js";
17import { startServer, type ServerHandle } from "./start.js";
18import {
19 createTestRepo,
20 createTestRepoWithUpdate,
21} from "./replication/test-helpers.js";
22import type { DidResolver, DidDocument } from "./did-resolver.js";
23import { PolicyEngine } from "./policy/engine.js";
24import { OFFER_NSID, didToRkey } from "./replication/types.js";
25
26const DID_A = "did:plc:capstone-alice";
27const DID_B = "did:plc:capstone-bob";
28
29function baseConfig(dataDir: string): Config {
30 return {
31 PDS_HOSTNAME: "local.test",
32 AUTH_TOKEN: "test-auth-token",
33 JWT_SECRET: "test-jwt-secret",
34 PASSWORD_HASH: "$2a$10$test",
35 DATA_DIR: dataDir,
36 PORT: 0,
37 IPFS_ENABLED: true,
38 IPFS_NETWORKING: true,
39 REPLICATE_DIDS: [],
40 FIREHOSE_URL: "wss://localhost/xrpc/com.atproto.sync.subscribeRepos",
41 FIREHOSE_ENABLED: false,
42 RATE_LIMIT_ENABLED: false,
43 RATE_LIMIT_READ_PER_MIN: 300,
44 RATE_LIMIT_SYNC_PER_MIN: 30,
45 RATE_LIMIT_SESSION_PER_MIN: 10,
46 RATE_LIMIT_WRITE_PER_MIN: 200,
47 RATE_LIMIT_CHALLENGE_PER_MIN: 20,
48 RATE_LIMIT_MAX_CONNECTIONS: 100,
49 RATE_LIMIT_FIREHOSE_PER_IP: 3,
50 OAUTH_ENABLED: false, PUBLIC_URL: "http://localhost:3000",
51 };
52}
53
54// ---- Enhanced mock PDS ----
55
/** Handle to the in-process mock PDS created by startEnhancedMockPds(). */
interface EnhancedMockPds {
  /** Base URL of the mock server, e.g. "http://127.0.0.1:<port>". */
  url: string;
  /** TCP port the server is listening on. */
  port: number;
  /** Shut the HTTP server down. */
  close: () => Promise<void>;
  /** Replace the CAR bytes served for `did` by com.atproto.sync.getRepo. */
  updateAccount: (did: string, carBytes: Uint8Array) => void;
  /** Insert or overwrite a record served by repo.listRecords / repo.getRecord. */
  addRecord: (did: string, collection: string, rkey: string, value: unknown) => void;
}
63
/**
 * Start an in-memory mock PDS over HTTP on an ephemeral port.
 *
 * Serves three XRPC endpoints:
 * - com.atproto.sync.getRepo   — returns the CAR bytes seeded/updated per DID
 * - com.atproto.repo.listRecords — returns all records for a did+collection
 *   (NOTE(review): the `limit` query param callers pass is accepted but
 *   ignored here — fine for small test fixtures, confirm if that changes)
 * - com.atproto.repo.getRecord — returns one record or 404 RecordNotFound
 *
 * Any other path returns 404 NotFound. Accounts and records are mutable
 * after startup via the returned handle's updateAccount()/addRecord().
 */
async function startEnhancedMockPds(
  accounts: Array<{ did: string; carBytes: Uint8Array }>,
): Promise<EnhancedMockPds> {
  const { createServer } = await import("node:http");

  // DID -> current CAR bytes served by getRepo.
  const accountMap = new Map<string, Uint8Array>();
  for (const a of accounts) {
    accountMap.set(a.did, a.carBytes);
  }

  // Records: did -> collection -> rkey -> { uri, cid, value }
  const records = new Map<
    string,
    Map<string, Map<string, { uri: string; cid: string; value: unknown }>>
  >();

  const server = createServer((req, res) => {
    // Base host is arbitrary; only pathname and query params are used.
    const url = new URL(req.url ?? "/", "http://localhost");
    const pathname = url.pathname;

    if (pathname === "/xrpc/com.atproto.sync.getRepo") {
      const did = url.searchParams.get("did");
      if (!did || !accountMap.has(did)) {
        res.writeHead(404, { "Content-Type": "application/json" });
        res.end(JSON.stringify({ error: "RepoNotFound" }));
        return;
      }
      const carBytes = accountMap.get(did)!;
      res.writeHead(200, {
        "Content-Type": "application/vnd.ipld.car",
        "Content-Length": String(carBytes.length),
      });
      res.end(Buffer.from(carBytes));
      return;
    }

    if (pathname === "/xrpc/com.atproto.repo.listRecords") {
      const did = url.searchParams.get("repo");
      const collection = url.searchParams.get("collection");
      if (!did || !collection) {
        res.writeHead(400, { "Content-Type": "application/json" });
        res.end(JSON.stringify({ error: "InvalidRequest" }));
        return;
      }
      // Unknown did/collection yields an empty list, not a 404.
      const didRecords = records.get(did)?.get(collection);
      const recordList = didRecords ? Array.from(didRecords.values()) : [];
      res.writeHead(200, { "Content-Type": "application/json" });
      res.end(JSON.stringify({ records: recordList }));
      return;
    }

    if (pathname === "/xrpc/com.atproto.repo.getRecord") {
      const did = url.searchParams.get("repo");
      const collection = url.searchParams.get("collection");
      const rkey = url.searchParams.get("rkey");
      if (!did || !collection || !rkey) {
        res.writeHead(400, { "Content-Type": "application/json" });
        res.end(JSON.stringify({ error: "InvalidRequest" }));
        return;
      }
      const record = records.get(did)?.get(collection)?.get(rkey);
      if (!record) {
        res.writeHead(404, { "Content-Type": "application/json" });
        res.end(JSON.stringify({ error: "RecordNotFound" }));
        return;
      }
      res.writeHead(200, { "Content-Type": "application/json" });
      res.end(JSON.stringify(record));
      return;
    }

    // Fallback for any unhandled path.
    res.writeHead(404, { "Content-Type": "application/json" });
    res.end(JSON.stringify({ error: "NotFound" }));
  });

  // Resolve once the ephemeral port is known; the handle closes over the
  // same accountMap/records maps so post-start mutation is visible.
  return new Promise((resolvePromise) => {
    server.listen(0, "127.0.0.1", () => {
      const addr = server.address() as { port: number };
      const pdsUrl = `http://127.0.0.1:${addr.port}`;
      resolvePromise({
        url: pdsUrl,
        port: addr.port,
        close: () => new Promise<void>((res) => server.close(() => res())),
        updateAccount: (did: string, carBytes: Uint8Array) => {
          accountMap.set(did, carBytes);
        },
        addRecord: (did: string, collection: string, rkey: string, value: unknown) => {
          if (!records.has(did)) records.set(did, new Map());
          const didMap = records.get(did)!;
          if (!didMap.has(collection)) didMap.set(collection, new Map());
          didMap.get(collection)!.set(rkey, {
            uri: `at://${did}/${collection}/${rkey}`,
            cid: "bafytest",
            value,
          });
        },
      });
    });
  });
}
164
165function createMockDidResolver(mapping: Record<string, string>): DidResolver {
166 return {
167 resolve: async (did: string): Promise<DidDocument | null> => {
168 const pdsUrl = mapping[did];
169 if (!pdsUrl) return null;
170 return {
171 id: did,
172 service: [
173 {
174 id: "#atproto_pds",
175 type: "AtprotoPersonalDataServer",
176 serviceEndpoint: pdsUrl,
177 },
178 ],
179 } as unknown as DidDocument;
180 },
181 } as DidResolver;
182}
183
184function createMockRecordWriter(did: string, pds: EnhancedMockPds) {
185 return {
186 putRecord: async (collection: string, rkey: string, record: unknown) => {
187 pds.addRecord(did, collection, rkey, record);
188 return { uri: `at://${did}/${collection}/${rkey}`, cid: "bafytest" };
189 },
190 deleteRecord: async (_collection: string, _rkey: string) => {},
191 listRecords: async (collection: string, _opts: { limit: number }) => {
192 const res = await fetch(
193 `${pds.url}/xrpc/com.atproto.repo.listRecords?repo=${encodeURIComponent(did)}&collection=${encodeURIComponent(collection)}&limit=100`,
194 );
195 if (!res.ok) return { records: [] };
196 return (await res.json()) as { records: Array<{ uri: string; cid: string; value: unknown }> };
197 },
198 };
199}
200
201/**
202 * Wait for a condition to become true, with a timeout.
203 */
204async function waitFor(
205 fn: () => Promise<boolean> | boolean,
206 timeoutMs: number = 10_000,
207 intervalMs: number = 200,
208): Promise<void> {
209 const deadline = Date.now() + timeoutMs;
210 while (Date.now() < deadline) {
211 if (await fn()) return;
212 await new Promise((r) => setTimeout(r, intervalMs));
213 }
214 throw new Error(`waitFor timed out after ${timeoutMs}ms`);
215}
216
217/**
218 * Pre-create identity in the SQLite database (simulates OAuth already done).
219 */
220function presetIdentity(dataDir: string, did: string, handle: string): void {
221 const db = new Database(resolve(dataDir, "pds.db"));
222 db.pragma("journal_mode = WAL");
223 db.exec(
224 "CREATE TABLE IF NOT EXISTS node_identity (did TEXT PRIMARY KEY, handle TEXT, created_at TEXT NOT NULL DEFAULT (datetime('now')))",
225 );
226 db.prepare("INSERT INTO node_identity (did, handle) VALUES (?, ?)").run(did, handle);
227 db.close();
228}
229
230// ---- Test suite ----
231
describe("Capstone E2E: on-protocol bidirectional replication", () => {
  // Shared fixtures; torn down in afterEach so a failed test cannot leak
  // servers, the mock PDS, or temp directories into the next test.
  let tmpDirA: string;
  let tmpDirB: string;
  let handleA: ServerHandle | undefined;
  let handleB: ServerHandle | undefined;
  let mockPds: EnhancedMockPds | undefined;

  afterEach(async () => {
    // Close both servers in parallel; swallow close errors so teardown
    // always reaches the mock-PDS and filesystem cleanup below.
    const closes: Promise<void>[] = [];
    if (handleA) closes.push(handleA.close().catch(() => {}));
    if (handleB) closes.push(handleB.close().catch(() => {}));
    await Promise.all(closes);
    handleA = undefined;
    handleB = undefined;

    if (mockPds) { await mockPds.close().catch(() => {}); mockPds = undefined; }
    if (tmpDirA) rmSync(tmpDirA, { recursive: true, force: true });
    if (tmpDirB) rmSync(tmpDirB, { recursive: true, force: true });
  });

  it("full flow: self-sync, libp2p cross-sync, XRPC serve, incremental re-sync, mutual offers", async () => {
    // ================================================================
    // Step 1: Setup — create repos, mock PDS, start two servers
    // ================================================================
    tmpDirA = mkdtempSync(join(tmpdir(), "capstone-a-"));
    tmpDirB = mkdtempSync(join(tmpdir(), "capstone-b-"));

    // Create repos with update support for incremental sync:
    // initialCar holds the first record set, fullCar additionally holds
    // the update records (a3 / b2) used in Step 6.
    const { initialCar: aliceInitialCar, fullCar: aliceFullCar } = await createTestRepoWithUpdate(
      DID_A,
      [
        { collection: "app.bsky.feed.post", rkey: "a1", record: { text: "Alice post 1", createdAt: new Date().toISOString() } },
        { collection: "app.bsky.feed.post", rkey: "a2", record: { text: "Alice post 2", createdAt: new Date().toISOString() } },
      ],
      [
        { collection: "app.bsky.feed.post", rkey: "a3", record: { text: "Alice post 3 (update)", createdAt: new Date().toISOString() } },
      ],
    );
    const { initialCar: bobInitialCar, fullCar: bobFullCar } = await createTestRepoWithUpdate(
      DID_B,
      [
        { collection: "app.bsky.feed.post", rkey: "b1", record: { text: "Bob post 1", createdAt: new Date().toISOString() } },
      ],
      [
        { collection: "app.bsky.feed.post", rkey: "b2", record: { text: "Bob post 2 (update)", createdAt: new Date().toISOString() } },
      ],
    );

    // Start mock PDS with initial CARs
    mockPds = await startEnhancedMockPds([
      { did: DID_A, carBytes: aliceInitialCar },
      { did: DID_B, carBytes: bobInitialCar },
    ]);

    // Pre-inject offer records so they're available for discovery later.
    // Step 7 asserts the merged effective params of these two offers.
    mockPds.addRecord(DID_A, OFFER_NSID, didToRkey(DID_B), {
      $type: OFFER_NSID,
      subject: DID_B,
      minCopies: 2,
      intervalSec: 300,
      priority: 50,
      createdAt: new Date().toISOString(),
    });
    mockPds.addRecord(DID_B, OFFER_NSID, didToRkey(DID_A), {
      $type: OFFER_NSID,
      subject: DID_A,
      minCopies: 3,
      intervalSec: 600,
      priority: 75,
      createdAt: new Date().toISOString(),
    });

    // Both DIDs resolve to the single shared mock PDS.
    const resolver = createMockDidResolver({
      [DID_A]: mockPds.url,
      [DID_B]: mockPds.url,
    });

    // Pre-set identities
    const configA = baseConfig(tmpDirA);
    configA.DID = DID_A;
    configA.HANDLE = "alice.test";
    presetIdentity(tmpDirA, DID_A, "alice.test");

    const configB = baseConfig(tmpDirB);
    configB.DID = DID_B;
    configB.HANDLE = "bob.test";
    presetIdentity(tmpDirB, DID_B, "bob.test");

    // Start both servers with real networking
    handleA = await startServer(configA, { didResolver: resolver });
    handleB = await startServer(configB, { didResolver: resolver });

    expect(handleA.replicationManager).toBeDefined();
    expect(handleB.replicationManager).toBeDefined();
    expect(handleA.ipfsService).toBeDefined();
    expect(handleB.ipfsService).toBeDefined();

    const rmA = handleA.replicationManager!;
    const rmB = handleB.replicationManager!;
    const ipfsA = handleA.ipfsService!;
    const ipfsB = handleB.ipfsService!;

    // Prevent periodic sync from interfering — set stopped flag directly
    // (don't call stop() which would unsubscribe topics)
    (rmA as unknown as { stopped: boolean }).stopped = true;
    (rmB as unknown as { stopped: boolean }).stopped = true;
    const syncTimerA = (rmA as unknown as { syncTimer: ReturnType<typeof setInterval> | null }).syncTimer;
    if (syncTimerA) { clearInterval(syncTimerA); (rmA as unknown as { syncTimer: null }).syncTimer = null; }
    const syncTimerB = (rmB as unknown as { syncTimer: ReturnType<typeof setInterval> | null }).syncTimer;
    if (syncTimerB) { clearInterval(syncTimerB); (rmB as unknown as { syncTimer: null }).syncTimer = null; }

    // Verify both nodes have peer IDs (networking is on)
    const peerIdA = ipfsA.getPeerId()!;
    const peerIdB = ipfsB.getPeerId()!;
    expect(peerIdA).toBeTruthy();
    expect(peerIdB).toBeTruthy();
    expect(peerIdA).not.toBe(peerIdB);

    const addrsA = ipfsA.getMultiaddrs();
    const addrsB = ipfsB.getMultiaddrs();
    expect(addrsA.length).toBeGreaterThan(0);
    expect(addrsB.length).toBeGreaterThan(0);

    // ================================================================
    // Step 2: Self-sync — each node syncs its own DID
    // ================================================================
    await rmA.addDid(DID_A);
    await waitFor(() => {
      const state = rmA.getSyncStorage().getState(DID_A);
      return state?.status === "synced";
    }, 15_000);

    await rmB.addDid(DID_B);
    await waitFor(() => {
      const state = rmB.getSyncStorage().getState(DID_B);
      return state?.status === "synced";
    }, 15_000);

    // Verify self-sync completed
    expect(rmA.getSyncStorage().getState(DID_A)?.status).toBe("synced");
    expect(rmB.getSyncStorage().getState(DID_B)?.status).toBe("synced");

    // ================================================================
    // Step 3: Peer dial + peer record injection
    // ================================================================
    // Add org.p2pds.peer/self records to mock PDS so discoverPeer()
    // finds the real peer info during sync. Each DID's peer record
    // contains the multiaddrs of the node that hosts that DID.
    const PEER_NSID = "org.p2pds.peer";
    mockPds.addRecord(DID_A, PEER_NSID, "self", {
      $type: PEER_NSID,
      peerId: peerIdA,
      multiaddrs: addrsA,
      createdAt: new Date().toISOString(),
    });
    mockPds.addRecord(DID_B, PEER_NSID, "self", {
      $type: PEER_NSID,
      peerId: peerIdB,
      multiaddrs: addrsB,
      createdAt: new Date().toISOString(),
    });

    // Connect the two nodes via libp2p
    await ipfsA.dial(addrsB[0]!);
    await waitFor(
      () => ipfsA.getConnectionCount() > 0 && ipfsB.getConnectionCount() > 0,
      10_000,
    );

    // ================================================================
    // Step 4: Cross-sync — addDid discovers peer info, syncs via libp2p
    // ================================================================
    // addDid fires background syncDid → discoverPeer finds peer record →
    // peer info populated → libp2p path activated.
    // Sync sequentially to avoid concurrent stream conflicts on the
    // same libp2p connection (yamux muxer doesn't handle concurrent
    // dialProtocol from both sides well).
    await rmA.addDid(DID_B);
    await waitFor(() => {
      const state = rmA.getSyncStorage().getState(DID_B);
      if (state?.status !== "synced") return false;
      // Also wait for sync_history to be completed (not just state table)
      const history = rmA.getSyncStorage().getSyncHistory(DID_B, 5);
      return history.some((h) => h.sourceType === "libp2p" && h.status === "success");
    }, 30_000);

    await rmB.addDid(DID_A);
    await waitFor(() => {
      const state = rmB.getSyncStorage().getState(DID_A);
      if (state?.status !== "synced") return false;
      const history = rmB.getSyncStorage().getSyncHistory(DID_A, 5);
      return history.some((h) => h.sourceType === "libp2p" && h.status === "success");
    }, 30_000);

    // Verify peer info was discovered and stored
    const stateAforB = rmA.getSyncStorage().getState(DID_B);
    expect(stateAforB?.peerId).toBe(peerIdB);
    expect(stateAforB?.peerMultiaddrs?.length).toBeGreaterThan(0);

    const stateBforA = rmB.getSyncStorage().getState(DID_A);
    expect(stateBforA?.peerId).toBe(peerIdA);
    expect(stateBforA?.peerMultiaddrs?.length).toBeGreaterThan(0);

    // Verify sync used libp2p source (the initial sync should have used it
    // since peer info was discovered before the fetch step)
    const historyA = rmA.getSyncStorage().getSyncHistory(DID_B, 5);
    const libp2pSyncA = historyA.find((h) => h.sourceType === "libp2p");
    expect(libp2pSyncA).toBeDefined();
    expect(libp2pSyncA!.status).toBe("success");

    const historyB = rmB.getSyncStorage().getSyncHistory(DID_A, 5);
    const libp2pSyncB = historyB.find((h) => h.sourceType === "libp2p");
    expect(libp2pSyncB).toBeDefined();
    expect(libp2pSyncB!.status).toBe("success");

    // ================================================================
    // Step 5: Verify cross-serving via XRPC
    // ================================================================

    // Node A serves Bob's repo via getRepo
    const getRepoA = await fetch(
      `${handleA.url}/xrpc/com.atproto.sync.getRepo?did=${DID_B}`,
    );
    expect(getRepoA.status).toBe(200);
    expect(getRepoA.headers.get("content-type")).toBe("application/vnd.ipld.car");
    const carBytesFromA = new Uint8Array(await getRepoA.arrayBuffer());
    expect(carBytesFromA.length).toBeGreaterThan(0);

    // Node B serves Alice's repo via getRepo
    const getRepoB = await fetch(
      `${handleB.url}/xrpc/com.atproto.sync.getRepo?did=${DID_A}`,
    );
    expect(getRepoB.status).toBe(200);
    const carBytesFromB = new Uint8Array(await getRepoB.arrayBuffer());
    expect(carBytesFromB.length).toBeGreaterThan(0);

    // Node A can read Bob's record
    const recordA = await fetch(
      `${handleA.url}/xrpc/com.atproto.repo.getRecord?repo=${DID_B}&collection=app.bsky.feed.post&rkey=b1`,
    );
    expect(recordA.status).toBe(200);
    const recA = (await recordA.json()) as { value: { text: string } };
    expect(recA.value.text).toBe("Bob post 1");

    // Node B can read Alice's record
    const recordB = await fetch(
      `${handleB.url}/xrpc/com.atproto.repo.getRecord?repo=${DID_A}&collection=app.bsky.feed.post&rkey=a1`,
    );
    expect(recordB.status).toBe(200);
    const recB = (await recordB.json()) as { value: { text: string } };
    expect(recB.value.text).toBe("Alice post 1");

    // Node A can describe Bob's repo
    const describeA = await fetch(
      `${handleA.url}/xrpc/com.atproto.repo.describeRepo?repo=${DID_B}`,
    );
    expect(describeA.status).toBe(200);
    const descA = (await describeA.json()) as { did: string; collections: string[] };
    expect(descA.did).toBe(DID_B);
    expect(descA.collections).toContain("app.bsky.feed.post");

    // Node A getRepoStatus for Bob
    const repoStatusA = await fetch(
      `${handleA.url}/xrpc/com.atproto.sync.getRepoStatus?did=${DID_B}`,
    );
    expect(repoStatusA.status).toBe(200);
    const rsA = (await repoStatusA.json()) as { did: string; rev: string | null };
    expect(rsA.did).toBe(DID_B);
    expect(rsA.rev).toBeTruthy();

    // ================================================================
    // Step 6: Update repos + incremental re-sync via libp2p
    // ================================================================
    // Update mock PDS with full CARs (which include additional records)
    mockPds.updateAccount(DID_A, aliceFullCar);
    mockPds.updateAccount(DID_B, bobFullCar);

    // First, re-sync each node's own DID to update local state
    // (so the peer can serve the updated data)
    await rmA.syncDid(DID_A, "manual");
    await rmB.syncDid(DID_B, "manual");

    // Now cross-sync: each node fetches the other's updated data via libp2p
    await rmA.syncDid(DID_B, "manual");
    await rmB.syncDid(DID_A, "manual");

    // Verify incremental sync occurred
    const historyA2 = rmA.getSyncStorage().getSyncHistory(DID_B, 10);
    const incrementalSyncA = historyA2.find((h) => h.incremental && h.sourceType === "libp2p");
    expect(incrementalSyncA).toBeDefined();
    expect(incrementalSyncA!.status).toBe("success");
    // Verify the incremental sync actually transferred data
    expect(incrementalSyncA!.carBytes).toBeGreaterThan(0);

    // Verify the new records are accessible
    const newRecordA = await fetch(
      `${handleA.url}/xrpc/com.atproto.repo.getRecord?repo=${DID_B}&collection=app.bsky.feed.post&rkey=b2`,
    );
    expect(newRecordA.status).toBe(200);
    const newRecA = (await newRecordA.json()) as { value: { text: string } };
    expect(newRecA.value.text).toBe("Bob post 2 (update)");

    const newRecordB = await fetch(
      `${handleB.url}/xrpc/com.atproto.repo.getRecord?repo=${DID_A}&collection=app.bsky.feed.post&rkey=a3`,
    );
    expect(newRecordB.status).toBe(200);
    const newRecB = (await newRecordB.json()) as { value: { text: string } };
    expect(newRecB.value.text).toBe("Alice post 3 (update)");

    // ================================================================
    // Step 7: Mutual offers -> auto-policies
    // ================================================================
    // Inject PolicyEngine and RecordWriter into both ReplicationManagers
    const peA = new PolicyEngine({ version: 1, policies: [] });
    const peB = new PolicyEngine({ version: 1, policies: [] });
    (rmA as unknown as { policyEngine: PolicyEngine }).policyEngine = peA;
    (rmB as unknown as { policyEngine: PolicyEngine }).policyEngine = peB;
    rmA.setPdsClient(createMockRecordWriter(DID_A, mockPds), DID_A);
    rmB.setPdsClient(createMockRecordWriter(DID_B, mockPds), DID_B);

    // Discover offers
    const offerManagerA = rmA.getOfferManager();
    const offerManagerB = rmB.getOfferManager();
    expect(offerManagerA).toBeDefined();
    expect(offerManagerB).toBeDefined();

    const statesA = rmA.getSyncStates();
    const peersA = statesA.filter((s) => s.pdsEndpoint).map((s) => ({ did: s.did, pdsEndpoint: s.pdsEndpoint }));
    const agreementsA = await offerManagerA!.discoverAndSync(peersA);

    const statesB = rmB.getSyncStates();
    const peersB = statesB.filter((s) => s.pdsEndpoint).map((s) => ({ did: s.did, pdsEndpoint: s.pdsEndpoint }));
    const agreementsB = await offerManagerB!.discoverAndSync(peersB);

    // Both should detect one mutual agreement
    expect(agreementsA.length).toBe(1);
    expect(agreementsA[0]!.counterpartyDid).toBe(DID_B);
    expect(agreementsB.length).toBe(1);
    expect(agreementsB[0]!.counterpartyDid).toBe(DID_A);

    // Verify effective params: max(minCopies), min(intervalSec), max(priority)
    // (offers above were minCopies 2/3, intervalSec 300/600, priority 50/75)
    expect(agreementsA[0]!.effectiveParams.minCopies).toBe(3);
    expect(agreementsA[0]!.effectiveParams.intervalSec).toBe(300);
    expect(agreementsA[0]!.effectiveParams.priority).toBe(75);

    // Verify policies were created
    const policiesA = peA.getPolicies();
    expect(policiesA.length).toBe(1);
    expect(policiesA[0]!.id).toBe(`p2p:${DID_B}`);
    expect(policiesA[0]!.replication.minCopies).toBe(3);

    const policiesB = peB.getPolicies();
    expect(policiesB.length).toBe(1);
    expect(policiesB[0]!.id).toBe(`p2p:${DID_A}`);
  }, 120_000);
});