atproto user agency toolkit for individuals and groups
at main 587 lines 22 kB view raw
/**
 * Capstone E2E test: full on-protocol bidirectional replication.
 *
 * Two startServer() instances with IPFS_NETWORKING: true, real libp2p,
 * CAR-over-libp2p protocol. Exercises the complete loop:
 *   self-sync -> peer dial -> cross-sync via libp2p -> XRPC serving ->
 *   incremental re-sync -> mutual offers -> auto-policies
 */

import { describe, it, expect, afterEach } from "vitest";
import { mkdtempSync, rmSync } from "node:fs";
import type { ServerResponse } from "node:http";
import { tmpdir } from "node:os";
import { join, resolve } from "node:path";
import Database from "better-sqlite3";

import type { Config } from "./config.js";
import { startServer, type ServerHandle } from "./start.js";
import {
  createTestRepo,
  createTestRepoWithUpdate,
} from "./replication/test-helpers.js";
import type { DidResolver, DidDocument } from "./did-resolver.js";
import { PolicyEngine } from "./policy/engine.js";
import { OFFER_NSID, didToRkey } from "./replication/types.js";

const DID_A = "did:plc:capstone-alice";
const DID_B = "did:plc:capstone-bob";

/**
 * Build the server config shared by both test nodes.
 *
 * IPFS and IPFS networking are enabled; firehose, rate limiting and OAuth
 * are disabled. PORT is 0.
 *
 * @param dataDir - Directory used as the node's DATA_DIR.
 * @returns A fresh Config object (callers mutate DID/HANDLE on it).
 */
function baseConfig(dataDir: string): Config {
  return {
    PDS_HOSTNAME: "local.test",
    AUTH_TOKEN: "test-auth-token",
    JWT_SECRET: "test-jwt-secret",
    PASSWORD_HASH: "$2a$10$test",
    DATA_DIR: dataDir,
    PORT: 0,
    IPFS_ENABLED: true,
    IPFS_NETWORKING: true,
    REPLICATE_DIDS: [],
    FIREHOSE_URL: "wss://localhost/xrpc/com.atproto.sync.subscribeRepos",
    FIREHOSE_ENABLED: false,
    RATE_LIMIT_ENABLED: false,
    RATE_LIMIT_READ_PER_MIN: 300,
    RATE_LIMIT_SYNC_PER_MIN: 30,
    RATE_LIMIT_SESSION_PER_MIN: 10,
    RATE_LIMIT_WRITE_PER_MIN: 200,
    RATE_LIMIT_CHALLENGE_PER_MIN: 20,
    RATE_LIMIT_MAX_CONNECTIONS: 100,
    RATE_LIMIT_FIREHOSE_PER_IP: 3,
    OAUTH_ENABLED: false,
    PUBLIC_URL: "http://localhost:3000",
  };
}

// ---- Enhanced mock PDS ----

/** Handle to the in-process mock PDS started by startEnhancedMockPds(). */
interface EnhancedMockPds {
  /** Base URL of the mock server (http://127.0.0.1:<port>). */
  url: string;
  /** TCP port the mock server is listening on. */
  port: number;
  /** Stop the HTTP server; resolves once server.close() calls back. */
  close: () => Promise<void>;
  /** Replace the CAR bytes served by getRepo for a DID. */
  updateAccount: (did: string, carBytes: Uint8Array) => void;
  /** Insert or overwrite a record served by listRecords / getRecord. */
  addRecord: (did: string, collection: string, rkey: string, value: unknown) => void;
}

/** Write a JSON response with the given status code and body. */
function sendJson(res: ServerResponse, status: number, body: unknown): void {
  res.writeHead(status, { "Content-Type": "application/json" });
  res.end(JSON.stringify(body));
}

/**
 * Start an HTTP server on 127.0.0.1 (ephemeral port) that answers
 * com.atproto.sync.getRepo, com.atproto.repo.listRecords and
 * com.atproto.repo.getRecord from in-memory maps.
 *
 * @param accounts - Initial DID -> CAR bytes pairs served by getRepo.
 * @returns A handle exposing the server URL plus mutators for its state.
 *   The promise rejects if the server emits an error before it is listening.
 */
async function startEnhancedMockPds(
  accounts: Array<{ did: string; carBytes: Uint8Array }>,
): Promise<EnhancedMockPds> {
  const { createServer } = await import("node:http");

  const accountMap = new Map<string, Uint8Array>();
  for (const a of accounts) {
    accountMap.set(a.did, a.carBytes);
  }

  // Records: did -> collection -> rkey -> { uri, cid, value }
  const records = new Map<
    string,
    Map<string, Map<string, { uri: string; cid: string; value: unknown }>>
  >();

  const server = createServer((req, res) => {
    const url = new URL(req.url ?? "/", "http://localhost");
    const pathname = url.pathname;

    if (pathname === "/xrpc/com.atproto.sync.getRepo") {
      const did = url.searchParams.get("did");
      if (!did || !accountMap.has(did)) {
        sendJson(res, 404, { error: "RepoNotFound" });
        return;
      }
      const carBytes = accountMap.get(did)!;
      res.writeHead(200, {
        "Content-Type": "application/vnd.ipld.car",
        "Content-Length": String(carBytes.length),
      });
      res.end(Buffer.from(carBytes));
      return;
    }

    if (pathname === "/xrpc/com.atproto.repo.listRecords") {
      const did = url.searchParams.get("repo");
      const collection = url.searchParams.get("collection");
      if (!did || !collection) {
        sendJson(res, 400, { error: "InvalidRequest" });
        return;
      }
      const didRecords = records.get(did)?.get(collection);
      const recordList = didRecords ? Array.from(didRecords.values()) : [];
      sendJson(res, 200, { records: recordList });
      return;
    }

    if (pathname === "/xrpc/com.atproto.repo.getRecord") {
      const did = url.searchParams.get("repo");
      const collection = url.searchParams.get("collection");
      const rkey = url.searchParams.get("rkey");
      if (!did || !collection || !rkey) {
        sendJson(res, 400, { error: "InvalidRequest" });
        return;
      }
      const record = records.get(did)?.get(collection)?.get(rkey);
      if (!record) {
        sendJson(res, 404, { error: "RecordNotFound" });
        return;
      }
      sendJson(res, 200, record);
      return;
    }

    sendJson(res, 404, { error: "NotFound" });
  });

  return new Promise((resolvePromise, rejectPromise) => {
    // Surface bind failures as a rejection instead of leaving the promise
    // pending; the listener is removed once the server is listening.
    server.once("error", rejectPromise);
    server.listen(0, "127.0.0.1", () => {
      server.off("error", rejectPromise);
      const addr = server.address() as { port: number };
      const pdsUrl = `http://127.0.0.1:${addr.port}`;
      resolvePromise({
        url: pdsUrl,
        port: addr.port,
        close: () => new Promise<void>((res) => server.close(() => res())),
        updateAccount: (did: string, carBytes: Uint8Array) => {
          accountMap.set(did, carBytes);
        },
        addRecord: (did: string, collection: string, rkey: string, value: unknown) => {
          if (!records.has(did)) records.set(did, new Map());
          const didMap = records.get(did)!;
          if (!didMap.has(collection)) didMap.set(collection, new Map());
          didMap.get(collection)!.set(rkey, {
            uri: `at://${did}/${collection}/${rkey}`,
            cid: "bafytest",
            value,
          });
        },
      });
    });
  });
}

/**
 * Build a DidResolver whose resolve() returns a minimal DID document with a
 * single #atproto_pds service entry, or null for DIDs not in the mapping.
 *
 * @param mapping - DID -> PDS service endpoint URL.
 */
function createMockDidResolver(mapping: Record<string, string>): DidResolver {
  return {
    resolve: async (did: string): Promise<DidDocument | null> => {
      const pdsUrl = mapping[did];
      if (!pdsUrl) return null;
      return {
        id: did,
        service: [
          {
            id: "#atproto_pds",
            type: "AtprotoPersonalDataServer",
            serviceEndpoint: pdsUrl,
          },
        ],
      } as unknown as DidDocument;
    },
  } as DidResolver;
}

/**
 * Build a record-writer stub backed by the mock PDS.
 *
 * putRecord stores the record in the mock PDS, deleteRecord is a no-op, and
 * listRecords fetches from the mock PDS's listRecords endpoint (returning an
 * empty list on a non-OK response).
 *
 * @param did - Repo DID the writer acts on.
 * @param pds - Mock PDS handle to write to / read from.
 */
function createMockRecordWriter(did: string, pds: EnhancedMockPds) {
  return {
    putRecord: async (collection: string, rkey: string, record: unknown) => {
      pds.addRecord(did, collection, rkey, record);
      return { uri: `at://${did}/${collection}/${rkey}`, cid: "bafytest" };
    },
    deleteRecord: async (_collection: string, _rkey: string) => {},
    listRecords: async (collection: string, opts?: { limit: number }) => {
      // Forward the caller's limit (falling back to the previous fixed 100).
      const limit = opts?.limit ?? 100;
      const res = await fetch(
        `${pds.url}/xrpc/com.atproto.repo.listRecords?repo=${encodeURIComponent(did)}&collection=${encodeURIComponent(collection)}&limit=${limit}`,
      );
      if (!res.ok) return { records: [] };
      return (await res.json()) as { records: Array<{ uri: string; cid: string; value: unknown }> };
    },
  };
}

/**
 * Wait for a condition to become true, with a timeout.
 *
 * @param fn - Condition to poll; may be sync or async.
 * @param timeoutMs - Maximum time to keep polling.
 * @param intervalMs - Delay between polls.
 * @throws Error if the deadline passes before fn() returns true.
 */
async function waitFor(
  fn: () => Promise<boolean> | boolean,
  timeoutMs: number = 10_000,
  intervalMs: number = 200,
): Promise<void> {
  const deadline = Date.now() + timeoutMs;
  while (Date.now() < deadline) {
    if (await fn()) return;
    await new Promise((r) => setTimeout(r, intervalMs));
  }
  throw new Error(`waitFor timed out after ${timeoutMs}ms`);
}

/**
 * Pre-create identity in the SQLite database (simulates OAuth already done).
 *
 * Opens <dataDir>/pds.db, ensures the node_identity table exists and inserts
 * one (did, handle) row. The database handle is closed even if a statement
 * throws.
 */
function presetIdentity(dataDir: string, did: string, handle: string): void {
  const db = new Database(resolve(dataDir, "pds.db"));
  try {
    db.pragma("journal_mode = WAL");
    db.exec(
      "CREATE TABLE IF NOT EXISTS node_identity (did TEXT PRIMARY KEY, handle TEXT, created_at TEXT NOT NULL DEFAULT (datetime('now')))",
    );
    db.prepare("INSERT INTO node_identity (did, handle) VALUES (?, ?)").run(did, handle);
  } finally {
    db.close();
  }
}

/**
 * Prevent periodic sync from interfering with the test by setting the
 * manager's `stopped` flag directly and clearing its `syncTimer`
 * (don't call stop() which would unsubscribe topics).
 *
 * @param manager - A replication manager; its private fields are accessed
 *   through a structural cast.
 */
function disablePeriodicSync(manager: unknown): void {
  const internals = manager as {
    stopped: boolean;
    syncTimer: ReturnType<typeof setInterval> | null;
  };
  internals.stopped = true;
  if (internals.syncTimer) {
    clearInterval(internals.syncTimer);
    internals.syncTimer = null;
  }
}

// ---- Test suite ----

describe("Capstone E2E: on-protocol bidirectional replication", () => {
  let tmpDirA: string;
  let tmpDirB: string;
  let handleA: ServerHandle | undefined;
  let handleB: ServerHandle | undefined;
  let mockPds: EnhancedMockPds | undefined;

  afterEach(async () => {
    // Best-effort teardown: close failures are ignored so cleanup continues.
    const closes: Promise<void>[] = [];
    if (handleA) closes.push(handleA.close().catch(() => {}));
    if (handleB) closes.push(handleB.close().catch(() => {}));
    await Promise.all(closes);
    handleA = undefined;
    handleB = undefined;

    if (mockPds) {
      await mockPds.close().catch(() => {});
      mockPds = undefined;
    }
    if (tmpDirA) rmSync(tmpDirA, { recursive: true, force: true });
    if (tmpDirB) rmSync(tmpDirB, { recursive: true, force: true });
  });

  it("full flow: self-sync, libp2p cross-sync, XRPC serve, incremental re-sync, mutual offers", async () => {
    // ================================================================
    // Step 1: Setup — create repos, mock PDS, start two servers
    // ================================================================
    tmpDirA = mkdtempSync(join(tmpdir(), "capstone-a-"));
    tmpDirB = mkdtempSync(join(tmpdir(), "capstone-b-"));

    // Create repos with update support for incremental sync
    const { initialCar: aliceInitialCar, fullCar: aliceFullCar } = await createTestRepoWithUpdate(
      DID_A,
      [
        { collection: "app.bsky.feed.post", rkey: "a1", record: { text: "Alice post 1", createdAt: new Date().toISOString() } },
        { collection: "app.bsky.feed.post", rkey: "a2", record: { text: "Alice post 2", createdAt: new Date().toISOString() } },
      ],
      [
        { collection: "app.bsky.feed.post", rkey: "a3", record: { text: "Alice post 3 (update)", createdAt: new Date().toISOString() } },
      ],
    );
    const { initialCar: bobInitialCar, fullCar: bobFullCar } = await createTestRepoWithUpdate(
      DID_B,
      [
        { collection: "app.bsky.feed.post", rkey: "b1", record: { text: "Bob post 1", createdAt: new Date().toISOString() } },
      ],
      [
        { collection: "app.bsky.feed.post", rkey: "b2", record: { text: "Bob post 2 (update)", createdAt: new Date().toISOString() } },
      ],
    );

    // Start mock PDS with initial CARs
    mockPds = await startEnhancedMockPds([
      { did: DID_A, carBytes: aliceInitialCar },
      { did: DID_B, carBytes: bobInitialCar },
    ]);

    // Pre-inject offer records so they're available for discovery later
    mockPds.addRecord(DID_A, OFFER_NSID, didToRkey(DID_B), {
      $type: OFFER_NSID,
      subject: DID_B,
      minCopies: 2,
      intervalSec: 300,
      priority: 50,
      createdAt: new Date().toISOString(),
    });
    mockPds.addRecord(DID_B, OFFER_NSID, didToRkey(DID_A), {
      $type: OFFER_NSID,
      subject: DID_A,
      minCopies: 3,
      intervalSec: 600,
      priority: 75,
      createdAt: new Date().toISOString(),
    });

    const resolver = createMockDidResolver({
      [DID_A]: mockPds.url,
      [DID_B]: mockPds.url,
    });

    // Pre-set identities
    const configA = baseConfig(tmpDirA);
    configA.DID = DID_A;
    configA.HANDLE = "alice.test";
    presetIdentity(tmpDirA, DID_A, "alice.test");

    const configB = baseConfig(tmpDirB);
    configB.DID = DID_B;
    configB.HANDLE = "bob.test";
    presetIdentity(tmpDirB, DID_B, "bob.test");

    // Start both servers with real networking
    handleA = await startServer(configA, { didResolver: resolver });
    handleB = await startServer(configB, { didResolver: resolver });

    expect(handleA.replicationManager).toBeDefined();
    expect(handleB.replicationManager).toBeDefined();
    expect(handleA.ipfsService).toBeDefined();
    expect(handleB.ipfsService).toBeDefined();

    const rmA = handleA.replicationManager!;
    const rmB = handleB.replicationManager!;
    const ipfsA = handleA.ipfsService!;
    const ipfsB = handleB.ipfsService!;

    // Prevent periodic sync from interfering — set stopped flag directly
    // (don't call stop() which would unsubscribe topics)
    disablePeriodicSync(rmA);
    disablePeriodicSync(rmB);

    // Verify both nodes have peer IDs (networking is on)
    const peerIdA = ipfsA.getPeerId()!;
    const peerIdB = ipfsB.getPeerId()!;
    expect(peerIdA).toBeTruthy();
    expect(peerIdB).toBeTruthy();
    expect(peerIdA).not.toBe(peerIdB);

    const addrsA = ipfsA.getMultiaddrs();
    const addrsB = ipfsB.getMultiaddrs();
    expect(addrsA.length).toBeGreaterThan(0);
    expect(addrsB.length).toBeGreaterThan(0);

    // ================================================================
    // Step 2: Self-sync — each node syncs its own DID
    // ================================================================
    await rmA.addDid(DID_A);
    await waitFor(() => {
      const state = rmA.getSyncStorage().getState(DID_A);
      return state?.status === "synced";
    }, 15_000);

    await rmB.addDid(DID_B);
    await waitFor(() => {
      const state = rmB.getSyncStorage().getState(DID_B);
      return state?.status === "synced";
    }, 15_000);

    // Verify self-sync completed
    expect(rmA.getSyncStorage().getState(DID_A)?.status).toBe("synced");
    expect(rmB.getSyncStorage().getState(DID_B)?.status).toBe("synced");

    // ================================================================
    // Step 3: Peer dial + peer record injection
    // ================================================================
    // Add org.p2pds.peer/self records to mock PDS so discoverPeer()
    // finds the real peer info during sync. Each DID's peer record
    // contains the multiaddrs of the node that hosts that DID.
    const PEER_NSID = "org.p2pds.peer";
    mockPds.addRecord(DID_A, PEER_NSID, "self", {
      $type: PEER_NSID,
      peerId: peerIdA,
      multiaddrs: addrsA,
      createdAt: new Date().toISOString(),
    });
    mockPds.addRecord(DID_B, PEER_NSID, "self", {
      $type: PEER_NSID,
      peerId: peerIdB,
      multiaddrs: addrsB,
      createdAt: new Date().toISOString(),
    });

    // Connect the two nodes via libp2p
    await ipfsA.dial(addrsB[0]!);
    await waitFor(
      () => ipfsA.getConnectionCount() > 0 && ipfsB.getConnectionCount() > 0,
      10_000,
    );

    // ================================================================
    // Step 4: Cross-sync — addDid discovers peer info, syncs via libp2p
    // ================================================================
    // addDid fires background syncDid → discoverPeer finds peer record →
    // peer info populated → libp2p path activated.
    // Sync sequentially to avoid concurrent stream conflicts on the
    // same libp2p connection (yamux muxer doesn't handle concurrent
    // dialProtocol from both sides well).
    await rmA.addDid(DID_B);
    await waitFor(() => {
      const state = rmA.getSyncStorage().getState(DID_B);
      if (state?.status !== "synced") return false;
      // Also wait for sync_history to be completed (not just state table)
      const history = rmA.getSyncStorage().getSyncHistory(DID_B, 5);
      return history.some((h) => h.sourceType === "libp2p" && h.status === "success");
    }, 30_000);

    await rmB.addDid(DID_A);
    await waitFor(() => {
      const state = rmB.getSyncStorage().getState(DID_A);
      if (state?.status !== "synced") return false;
      const history = rmB.getSyncStorage().getSyncHistory(DID_A, 5);
      return history.some((h) => h.sourceType === "libp2p" && h.status === "success");
    }, 30_000);

    // Verify peer info was discovered and stored
    const stateAforB = rmA.getSyncStorage().getState(DID_B);
    expect(stateAforB?.peerId).toBe(peerIdB);
    expect(stateAforB?.peerMultiaddrs?.length).toBeGreaterThan(0);

    const stateBforA = rmB.getSyncStorage().getState(DID_A);
    expect(stateBforA?.peerId).toBe(peerIdA);
    expect(stateBforA?.peerMultiaddrs?.length).toBeGreaterThan(0);

    // Verify sync used libp2p source (the initial sync should have used it
    // since peer info was discovered before the fetch step)
    const historyA = rmA.getSyncStorage().getSyncHistory(DID_B, 5);
    const libp2pSyncA = historyA.find((h) => h.sourceType === "libp2p");
    expect(libp2pSyncA).toBeDefined();
    expect(libp2pSyncA!.status).toBe("success");

    const historyB = rmB.getSyncStorage().getSyncHistory(DID_A, 5);
    const libp2pSyncB = historyB.find((h) => h.sourceType === "libp2p");
    expect(libp2pSyncB).toBeDefined();
    expect(libp2pSyncB!.status).toBe("success");

    // ================================================================
    // Step 5: Verify cross-serving via XRPC
    // ================================================================

    // Node A serves Bob's repo via getRepo
    const getRepoA = await fetch(
      `${handleA.url}/xrpc/com.atproto.sync.getRepo?did=${DID_B}`,
    );
    expect(getRepoA.status).toBe(200);
    expect(getRepoA.headers.get("content-type")).toBe("application/vnd.ipld.car");
    const carBytesFromA = new Uint8Array(await getRepoA.arrayBuffer());
    expect(carBytesFromA.length).toBeGreaterThan(0);

    // Node B serves Alice's repo via getRepo
    const getRepoB = await fetch(
      `${handleB.url}/xrpc/com.atproto.sync.getRepo?did=${DID_A}`,
    );
    expect(getRepoB.status).toBe(200);
    const carBytesFromB = new Uint8Array(await getRepoB.arrayBuffer());
    expect(carBytesFromB.length).toBeGreaterThan(0);

    // Node A can read Bob's record
    const recordA = await fetch(
      `${handleA.url}/xrpc/com.atproto.repo.getRecord?repo=${DID_B}&collection=app.bsky.feed.post&rkey=b1`,
    );
    expect(recordA.status).toBe(200);
    const recA = (await recordA.json()) as { value: { text: string } };
    expect(recA.value.text).toBe("Bob post 1");

    // Node B can read Alice's record
    const recordB = await fetch(
      `${handleB.url}/xrpc/com.atproto.repo.getRecord?repo=${DID_A}&collection=app.bsky.feed.post&rkey=a1`,
    );
    expect(recordB.status).toBe(200);
    const recB = (await recordB.json()) as { value: { text: string } };
    expect(recB.value.text).toBe("Alice post 1");

    // Node A can describe Bob's repo
    const describeA = await fetch(
      `${handleA.url}/xrpc/com.atproto.repo.describeRepo?repo=${DID_B}`,
    );
    expect(describeA.status).toBe(200);
    const descA = (await describeA.json()) as { did: string; collections: string[] };
    expect(descA.did).toBe(DID_B);
    expect(descA.collections).toContain("app.bsky.feed.post");

    // Node A getRepoStatus for Bob
    const repoStatusA = await fetch(
      `${handleA.url}/xrpc/com.atproto.sync.getRepoStatus?did=${DID_B}`,
    );
    expect(repoStatusA.status).toBe(200);
    const rsA = (await repoStatusA.json()) as { did: string; rev: string | null };
    expect(rsA.did).toBe(DID_B);
    expect(rsA.rev).toBeTruthy();

    // ================================================================
    // Step 6: Update repos + incremental re-sync via libp2p
    // ================================================================
    // Update mock PDS with full CARs (which include additional records)
    mockPds.updateAccount(DID_A, aliceFullCar);
    mockPds.updateAccount(DID_B, bobFullCar);

    // First, re-sync each node's own DID to update local state
    // (so the peer can serve the updated data)
    await rmA.syncDid(DID_A, "manual");
    await rmB.syncDid(DID_B, "manual");

    // Now cross-sync: each node fetches the other's updated data via libp2p
    await rmA.syncDid(DID_B, "manual");
    await rmB.syncDid(DID_A, "manual");

    // Verify incremental sync occurred
    const historyA2 = rmA.getSyncStorage().getSyncHistory(DID_B, 10);
    const incrementalSyncA = historyA2.find((h) => h.incremental && h.sourceType === "libp2p");
    expect(incrementalSyncA).toBeDefined();
    expect(incrementalSyncA!.status).toBe("success");
    // Verify the incremental sync actually transferred data
    expect(incrementalSyncA!.carBytes).toBeGreaterThan(0);

    // Verify the new records are accessible
    const newRecordA = await fetch(
      `${handleA.url}/xrpc/com.atproto.repo.getRecord?repo=${DID_B}&collection=app.bsky.feed.post&rkey=b2`,
    );
    expect(newRecordA.status).toBe(200);
    const newRecA = (await newRecordA.json()) as { value: { text: string } };
    expect(newRecA.value.text).toBe("Bob post 2 (update)");

    const newRecordB = await fetch(
      `${handleB.url}/xrpc/com.atproto.repo.getRecord?repo=${DID_A}&collection=app.bsky.feed.post&rkey=a3`,
    );
    expect(newRecordB.status).toBe(200);
    const newRecB = (await newRecordB.json()) as { value: { text: string } };
    expect(newRecB.value.text).toBe("Alice post 3 (update)");

    // ================================================================
    // Step 7: Mutual offers -> auto-policies
    // ================================================================
    // Inject PolicyEngine and RecordWriter into both ReplicationManagers
    const peA = new PolicyEngine({ version: 1, policies: [] });
    const peB = new PolicyEngine({ version: 1, policies: [] });
    (rmA as unknown as { policyEngine: PolicyEngine }).policyEngine = peA;
    (rmB as unknown as { policyEngine: PolicyEngine }).policyEngine = peB;
    rmA.setPdsClient(createMockRecordWriter(DID_A, mockPds), DID_A);
    rmB.setPdsClient(createMockRecordWriter(DID_B, mockPds), DID_B);

    // Discover offers
    const offerManagerA = rmA.getOfferManager();
    const offerManagerB = rmB.getOfferManager();
    expect(offerManagerA).toBeDefined();
    expect(offerManagerB).toBeDefined();

    const statesA = rmA.getSyncStates();
    const peersA = statesA.filter((s) => s.pdsEndpoint).map((s) => ({ did: s.did, pdsEndpoint: s.pdsEndpoint }));
    const agreementsA = await offerManagerA!.discoverAndSync(peersA);

    const statesB = rmB.getSyncStates();
    const peersB = statesB.filter((s) => s.pdsEndpoint).map((s) => ({ did: s.did, pdsEndpoint: s.pdsEndpoint }));
    const agreementsB = await offerManagerB!.discoverAndSync(peersB);

    // Both should detect one mutual agreement
    expect(agreementsA.length).toBe(1);
    expect(agreementsA[0]!.counterpartyDid).toBe(DID_B);
    expect(agreementsB.length).toBe(1);
    expect(agreementsB[0]!.counterpartyDid).toBe(DID_A);

    // Verify effective params: max(minCopies), min(intervalSec), max(priority)
    expect(agreementsA[0]!.effectiveParams.minCopies).toBe(3);
    expect(agreementsA[0]!.effectiveParams.intervalSec).toBe(300);
    expect(agreementsA[0]!.effectiveParams.priority).toBe(75);

    // Verify policies were created
    const policiesA = peA.getPolicies();
    expect(policiesA.length).toBe(1);
    expect(policiesA[0]!.id).toBe(`p2p:${DID_B}`);
    expect(policiesA[0]!.replication.minCopies).toBe(3);

    const policiesB = peB.getPolicies();
    expect(policiesB.length).toBe(1);
    expect(policiesB[0]!.id).toBe(`p2p:${DID_A}`);
  }, 120_000);
});