atproto user agency toolkit for individuals and groups
at main 516 lines 20 kB view raw
1/** 2 * Bidirectional replication E2E test. 3 * 4 * Two p2pds nodes, each with a simulated user account on a mock PDS. 5 * Each node tracks the other's DID, syncs data, publishes offers, 6 * discovers mutual agreements, and serves replicated data via sync endpoints. 7 */ 8 9import { describe, it, expect, afterEach } from "vitest"; 10import { mkdtempSync, rmSync } from "node:fs"; 11import { tmpdir } from "node:os"; 12import { join, resolve } from "node:path"; 13import Database from "better-sqlite3"; 14 15import type { Config } from "./config.js"; 16import { startServer, type ServerHandle } from "./start.js"; 17import { 18 createTestRepo, 19 startMockPds, 20 type MockPds, 21} from "./replication/test-helpers.js"; 22import type { DidResolver, DidDocument } from "./did-resolver.js"; 23import { PolicyEngine } from "./policy/engine.js"; 24import { OFFER_NSID, didToRkey } from "./replication/types.js"; 25 26/** DID for Node A's user identity. */ 27const DID_NODE_A = "did:plc:node-a-user"; 28/** DID for Node B's user identity. */ 29const DID_NODE_B = "did:plc:node-b-user"; 30 31function baseConfig(dataDir: string): Config { 32 return { 33 PDS_HOSTNAME: "local.test", 34 AUTH_TOKEN: "test-auth-token", 35 JWT_SECRET: "test-jwt-secret", 36 PASSWORD_HASH: "$2a$10$test", 37 DATA_DIR: dataDir, 38 PORT: 0, 39 IPFS_ENABLED: true, 40 IPFS_NETWORKING: false, 41 REPLICATE_DIDS: [], 42 FIREHOSE_URL: "wss://localhost/xrpc/com.atproto.sync.subscribeRepos", 43 FIREHOSE_ENABLED: false, 44 RATE_LIMIT_ENABLED: false, 45 RATE_LIMIT_READ_PER_MIN: 300, 46 RATE_LIMIT_SYNC_PER_MIN: 30, 47 RATE_LIMIT_SESSION_PER_MIN: 10, 48 RATE_LIMIT_WRITE_PER_MIN: 200, 49 RATE_LIMIT_CHALLENGE_PER_MIN: 20, 50 RATE_LIMIT_MAX_CONNECTIONS: 100, 51 RATE_LIMIT_FIREHOSE_PER_IP: 3, 52 OAUTH_ENABLED: false, PUBLIC_URL: "http://localhost:3000", 53 }; 54} 55 56/** 57 * Enhanced mock PDS that serves configurable records per DID per collection. 58 * Supports adding offer records dynamically to simulate users publishing offers. 59 */ 60interface EnhancedMockPds extends MockPds { 61 addRecord(did: string, collection: string, rkey: string, value: unknown): void; 62} 63 64async function startEnhancedMockPds( 65 accounts: Array<{ did: string; carBytes: Uint8Array }>, 66): Promise<EnhancedMockPds> { 67 const { createServer } = await import("node:http"); 68 69 const accountMap = new Map<string, Uint8Array>(); 70 for (const a of accounts) { 71 accountMap.set(a.did, a.carBytes); 72 } 73 74 // Records store: did -> collection -> rkey -> { uri, cid, value } 75 const records = new Map<string, Map<string, Map<string, { uri: string; cid: string; value: unknown }>>>(); 76 77 const server = createServer((req, res) => { 78 const url = new URL(req.url ?? "/", "http://localhost"); 79 const pathname = url.pathname; 80 81 if (pathname === "/xrpc/com.atproto.sync.getRepo") { 82 const did = url.searchParams.get("did"); 83 if (!did || !accountMap.has(did)) { 84 res.writeHead(404, { "Content-Type": "application/json" }); 85 res.end(JSON.stringify({ error: "RepoNotFound" })); 86 return; 87 } 88 const carBytes = accountMap.get(did)!; 89 res.writeHead(200, { 90 "Content-Type": "application/vnd.ipld.car", 91 "Content-Length": String(carBytes.length), 92 }); 93 res.end(Buffer.from(carBytes)); 94 return; 95 } 96 97 if (pathname === "/xrpc/com.atproto.repo.listRecords") { 98 const did = url.searchParams.get("repo"); 99 const collection = url.searchParams.get("collection"); 100 if (!did || !collection) { 101 res.writeHead(400, { "Content-Type": "application/json" }); 102 res.end(JSON.stringify({ error: "InvalidRequest" })); 103 return; 104 } 105 const didRecords = records.get(did)?.get(collection); 106 const recordList = didRecords ? Array.from(didRecords.values()) : []; 107 res.writeHead(200, { "Content-Type": "application/json" }); 108 res.end(JSON.stringify({ records: recordList })); 109 return; 110 } 111 112 if (pathname === "/xrpc/com.atproto.repo.getRecord") { 113 const did = url.searchParams.get("repo"); 114 const collection = url.searchParams.get("collection"); 115 const rkey = url.searchParams.get("rkey"); 116 if (!did || !collection || !rkey) { 117 res.writeHead(400, { "Content-Type": "application/json" }); 118 res.end(JSON.stringify({ error: "InvalidRequest" })); 119 return; 120 } 121 const record = records.get(did)?.get(collection)?.get(rkey); 122 if (!record) { 123 res.writeHead(404, { "Content-Type": "application/json" }); 124 res.end(JSON.stringify({ error: "RecordNotFound" })); 125 return; 126 } 127 res.writeHead(200, { "Content-Type": "application/json" }); 128 res.end(JSON.stringify(record)); 129 return; 130 } 131 132 res.writeHead(404, { "Content-Type": "application/json" }); 133 res.end(JSON.stringify({ error: "NotFound" })); 134 }); 135 136 return new Promise((resolve) => { 137 server.listen(0, "127.0.0.1", () => { 138 const addr = server.address() as { port: number }; 139 const pdsUrl = `http://127.0.0.1:${addr.port}`; 140 resolve({ 141 url: pdsUrl, 142 port: addr.port, 143 close: () => new Promise<void>((res) => server.close(() => res())), 144 updateAccount: (did: string, carBytes: Uint8Array) => { 145 accountMap.set(did, carBytes); 146 }, 147 addRecord: (did: string, collection: string, rkey: string, value: unknown) => { 148 if (!records.has(did)) records.set(did, new Map()); 149 const didMap = records.get(did)!; 150 if (!didMap.has(collection)) didMap.set(collection, new Map()); 151 didMap.get(collection)!.set(rkey, { 152 uri: `at://${did}/${collection}/${rkey}`, 153 cid: "bafytest", 154 value, 155 }); 156 }, 157 }); 158 }); 159 }); 160} 161 162function createMockDidResolver(mapping: Record<string, string>): DidResolver { 163 return { 164 resolve: async (did: string): Promise<DidDocument | null> => { 165 const pdsUrl = mapping[did]; 166 if (!pdsUrl) return null; 167 return { 168 id: did, 169 service: [ 170 { 171 id: "#atproto_pds", 172 type: "AtprotoPersonalDataServer", 173 serviceEndpoint: pdsUrl, 174 }, 175 ], 176 } as unknown as DidDocument; 177 }, 178 } as DidResolver; 179} 180 181describe("Bidirectional replication E2E", () => { 182 let tmpDirA: string; 183 let tmpDirB: string; 184 let handleA: ServerHandle | undefined; 185 let handleB: ServerHandle | undefined; 186 let mockPds: EnhancedMockPds | undefined; 187 188 afterEach(async () => { 189 if (handleA) { await handleA.close(); handleA = undefined; } 190 if (handleB) { await handleB.close(); handleB = undefined; } 191 if (mockPds) { await mockPds.close(); mockPds = undefined; } 192 if (tmpDirA) rmSync(tmpDirA, { recursive: true, force: true }); 193 if (tmpDirB) rmSync(tmpDirB, { recursive: true, force: true }); 194 }); 195 196 it("two nodes sync each other's data and serve it via sync endpoints", async () => { 197 tmpDirA = mkdtempSync(join(tmpdir(), "bidir-a-")); 198 tmpDirB = mkdtempSync(join(tmpdir(), "bidir-b-")); 199 200 // Create test repos for each user's account 201 const aliceCar = await createTestRepo(DID_NODE_A, [ 202 { collection: "app.bsky.feed.post", rkey: "a1", record: { text: "Alice post", createdAt: new Date().toISOString() } }, 203 { collection: "app.bsky.feed.post", rkey: "a2", record: { text: "Alice post 2", createdAt: new Date().toISOString() } }, 204 ]); 205 const bobCar = await createTestRepo(DID_NODE_B, [ 206 { collection: "app.bsky.feed.post", rkey: "b1", record: { text: "Bob post", createdAt: new Date().toISOString() } }, 207 ]); 208 209 // Mock PDS serves both accounts 210 mockPds = await startEnhancedMockPds([ 211 { did: DID_NODE_A, carBytes: aliceCar }, 212 { did: DID_NODE_B, carBytes: bobCar }, 213 ]); 214 const resolver = createMockDidResolver({ 215 [DID_NODE_A]: mockPds.url, 216 [DID_NODE_B]: mockPds.url, 217 }); 218 219 // Start two servers — each with their own identity 220 const configA = baseConfig(tmpDirA); 221 const configB = baseConfig(tmpDirB); 222 223 // Pre-set identities (simulating OAuth already done) 224 const dbA = new Database(resolve(tmpDirA, "pds.db")); 225 dbA.pragma("journal_mode = WAL"); 226 dbA.exec("CREATE TABLE IF NOT EXISTS node_identity (did TEXT PRIMARY KEY, handle TEXT, created_at TEXT NOT NULL DEFAULT (datetime('now')))"); 227 dbA.prepare("INSERT INTO node_identity (did, handle) VALUES (?, ?)").run(DID_NODE_A, "alice.test"); 228 dbA.close(); 229 configA.DID = DID_NODE_A; 230 configA.HANDLE = "alice.test"; 231 232 const dbB = new Database(resolve(tmpDirB, "pds.db")); 233 dbB.pragma("journal_mode = WAL"); 234 dbB.exec("CREATE TABLE IF NOT EXISTS node_identity (did TEXT PRIMARY KEY, handle TEXT, created_at TEXT NOT NULL DEFAULT (datetime('now')))"); 235 dbB.prepare("INSERT INTO node_identity (did, handle) VALUES (?, ?)").run(DID_NODE_B, "bob.test"); 236 dbB.close(); 237 configB.DID = DID_NODE_B; 238 configB.HANDLE = "bob.test"; 239 240 handleA = await startServer(configA, { didResolver: resolver }); 241 handleB = await startServer(configB, { didResolver: resolver }); 242 243 // Both nodes should have replication managers 244 expect(handleA.replicationManager).toBeDefined(); 245 expect(handleB.replicationManager).toBeDefined(); 246 247 // Node A adds Node B's DID, Node B adds Node A's DID 248 const addBToA = await fetch(`${handleA.url}/xrpc/org.p2pds.app.addDid`, { 249 method: "POST", 250 headers: { Authorization: `Bearer ${configA.AUTH_TOKEN}`, "Content-Type": "application/json" }, 251 body: JSON.stringify({ did: DID_NODE_B }), 252 }); 253 expect(addBToA.status).toBe(200); 254 255 const addAToB = await fetch(`${handleB.url}/xrpc/org.p2pds.app.addDid`, { 256 method: "POST", 257 headers: { Authorization: `Bearer ${configB.AUTH_TOKEN}`, "Content-Type": "application/json" }, 258 body: JSON.stringify({ did: DID_NODE_A }), 259 }); 260 expect(addAToB.status).toBe(200); 261 262 // Trigger sync on both 263 await fetch(`${handleA.url}/xrpc/org.p2pds.replication.syncNow`, { 264 method: "POST", 265 headers: { Authorization: `Bearer ${configA.AUTH_TOKEN}` }, 266 }); 267 await fetch(`${handleB.url}/xrpc/org.p2pds.replication.syncNow`, { 268 method: "POST", 269 headers: { Authorization: `Bearer ${configB.AUTH_TOKEN}` }, 270 }); 271 272 // Wait for async sync 273 await new Promise((r) => setTimeout(r, 3000)); 274 275 // Verify Node A synced Bob's data 276 const statusA = await fetch( 277 `${handleA.url}/xrpc/org.p2pds.app.getDidStatus?did=${DID_NODE_B}`, 278 { headers: { Authorization: `Bearer ${configA.AUTH_TOKEN}` } }, 279 ); 280 const dsA = (await statusA.json()) as { did: string; blockCount: number; syncState: { status: string } }; 281 expect(dsA.did).toBe(DID_NODE_B); 282 expect(dsA.blockCount).toBeGreaterThan(0); 283 expect(dsA.syncState.status).toBe("synced"); 284 285 // Verify Node B synced Alice's data 286 const statusB = await fetch( 287 `${handleB.url}/xrpc/org.p2pds.app.getDidStatus?did=${DID_NODE_A}`, 288 { headers: { Authorization: `Bearer ${configB.AUTH_TOKEN}` } }, 289 ); 290 const dsB = (await statusB.json()) as { did: string; blockCount: number; syncState: { status: string } }; 291 expect(dsB.did).toBe(DID_NODE_A); 292 expect(dsB.blockCount).toBeGreaterThan(0); 293 expect(dsB.syncState.status).toBe("synced"); 294 295 // ---- Verify sync endpoints serve replicated data ---- 296 297 // Node A serves Bob's repo via getRepo 298 const getRepoA = await fetch( 299 `${handleA.url}/xrpc/com.atproto.sync.getRepo?did=${DID_NODE_B}`, 300 ); 301 expect(getRepoA.status).toBe(200); 302 expect(getRepoA.headers.get("content-type")).toBe("application/vnd.ipld.car"); 303 const carBytesA = new Uint8Array(await getRepoA.arrayBuffer()); 304 expect(carBytesA.length).toBeGreaterThan(0); 305 306 // Node B serves Alice's repo via getRepo 307 const getRepoB = await fetch( 308 `${handleB.url}/xrpc/com.atproto.sync.getRepo?did=${DID_NODE_A}`, 309 ); 310 expect(getRepoB.status).toBe(200); 311 const carBytesB = new Uint8Array(await getRepoB.arrayBuffer()); 312 expect(carBytesB.length).toBeGreaterThan(0); 313 314 // Node A serves Bob's repo via getRepoStatus 315 const repoStatusA = await fetch( 316 `${handleA.url}/xrpc/com.atproto.sync.getRepoStatus?did=${DID_NODE_B}`, 317 ); 318 expect(repoStatusA.status).toBe(200); 319 const rsA = (await repoStatusA.json()) as { did: string; active: boolean; rev: string | null }; 320 expect(rsA.did).toBe(DID_NODE_B); 321 expect(rsA.rev).toBeTruthy(); 322 323 // Node B lists all repos (should include Alice's) 324 const listReposB = await fetch( 325 `${handleB.url}/xrpc/com.atproto.sync.listRepos`, 326 ); 327 expect(listReposB.status).toBe(200); 328 const reposB = (await listReposB.json()) as { repos: Array<{ did: string }> }; 329 const replicatedDids = reposB.repos.map((r) => r.did); 330 expect(replicatedDids).toContain(DID_NODE_A); 331 332 // Node A can read Bob's records via repo.getRecord 333 const recordA = await fetch( 334 `${handleA.url}/xrpc/com.atproto.repo.getRecord?repo=${DID_NODE_B}&collection=app.bsky.feed.post&rkey=b1`, 335 ); 336 expect(recordA.status).toBe(200); 337 const recA = (await recordA.json()) as { uri: string; value: { text: string } }; 338 expect(recA.value.text).toBe("Bob post"); 339 340 // Node B can read Alice's records via repo.getRecord 341 const recordB = await fetch( 342 `${handleB.url}/xrpc/com.atproto.repo.getRecord?repo=${DID_NODE_A}&collection=app.bsky.feed.post&rkey=a1`, 343 ); 344 expect(recordB.status).toBe(200); 345 const recB = (await recordB.json()) as { uri: string; value: { text: string } }; 346 expect(recB.value.text).toBe("Alice post"); 347 348 // Node B can list Alice's records 349 const listRecsB = await fetch( 350 `${handleB.url}/xrpc/com.atproto.repo.listRecords?repo=${DID_NODE_A}&collection=app.bsky.feed.post`, 351 ); 352 expect(listRecsB.status).toBe(200); 353 const recsB = (await listRecsB.json()) as { records: Array<{ value: { text: string } }> }; 354 expect(recsB.records.length).toBe(2); 355 356 // Node A can describe Bob's repo 357 const describeA = await fetch( 358 `${handleA.url}/xrpc/com.atproto.repo.describeRepo?repo=${DID_NODE_B}`, 359 ); 360 expect(describeA.status).toBe(200); 361 const descA = (await describeA.json()) as { did: string; collections: string[] }; 362 expect(descA.did).toBe(DID_NODE_B); 363 expect(descA.collections).toContain("app.bsky.feed.post"); 364 }, 30_000); 365 366 it("mutual offers create P2P replication policies", async () => { 367 tmpDirA = mkdtempSync(join(tmpdir(), "bidir-offer-a-")); 368 tmpDirB = mkdtempSync(join(tmpdir(), "bidir-offer-b-")); 369 370 // Create minimal repos 371 const aliceCar = await createTestRepo(DID_NODE_A, [ 372 { collection: "app.bsky.feed.post", rkey: "x1", record: { text: "test", createdAt: new Date().toISOString() } }, 373 ]); 374 const bobCar = await createTestRepo(DID_NODE_B, [ 375 { collection: "app.bsky.feed.post", rkey: "y1", record: { text: "test", createdAt: new Date().toISOString() } }, 376 ]); 377 378 // Mock PDS with offer records 379 mockPds = await startEnhancedMockPds([ 380 { did: DID_NODE_A, carBytes: aliceCar }, 381 { did: DID_NODE_B, carBytes: bobCar }, 382 ]); 383 384 // Simulate both users having published offers for each other 385 // Node A's user published an offer for Node B's DID 386 mockPds.addRecord(DID_NODE_A, OFFER_NSID, didToRkey(DID_NODE_B), { 387 $type: OFFER_NSID, 388 subject: DID_NODE_B, 389 minCopies: 2, 390 intervalSec: 300, 391 priority: 50, 392 createdAt: new Date().toISOString(), 393 }); 394 // Node B's user published an offer for Node A's DID 395 mockPds.addRecord(DID_NODE_B, OFFER_NSID, didToRkey(DID_NODE_A), { 396 $type: OFFER_NSID, 397 subject: DID_NODE_A, 398 minCopies: 3, 399 intervalSec: 600, 400 priority: 75, 401 createdAt: new Date().toISOString(), 402 }); 403 404 const resolver = createMockDidResolver({ 405 [DID_NODE_A]: mockPds.url, 406 [DID_NODE_B]: mockPds.url, 407 }); 408 409 // Create configs with policy engines and identities pre-set 410 const configA = baseConfig(tmpDirA); 411 configA.DID = DID_NODE_A; 412 configA.HANDLE = "alice.test"; 413 const dbA = new Database(resolve(tmpDirA, "pds.db")); 414 dbA.pragma("journal_mode = WAL"); 415 dbA.exec("CREATE TABLE IF NOT EXISTS node_identity (did TEXT PRIMARY KEY, handle TEXT, created_at TEXT NOT NULL DEFAULT (datetime('now')))"); 416 dbA.prepare("INSERT INTO node_identity (did, handle) VALUES (?, ?)").run(DID_NODE_A, "alice.test"); 417 dbA.close(); 418 419 const configB = baseConfig(tmpDirB); 420 configB.DID = DID_NODE_B; 421 configB.HANDLE = "bob.test"; 422 const dbB = new Database(resolve(tmpDirB, "pds.db")); 423 dbB.pragma("journal_mode = WAL"); 424 dbB.exec("CREATE TABLE IF NOT EXISTS node_identity (did TEXT PRIMARY KEY, handle TEXT, created_at TEXT NOT NULL DEFAULT (datetime('now')))"); 425 dbB.prepare("INSERT INTO node_identity (did, handle) VALUES (?, ?)").run(DID_NODE_B, "bob.test"); 426 dbB.close(); 427 428 // We need to inject PolicyEngine + a mock RecordWriter into the ReplicationManager 429 // The simplest approach: start the servers, then use the OfferManager directly 430 handleA = await startServer(configA, { didResolver: resolver }); 431 handleB = await startServer(configB, { didResolver: resolver }); 432 433 const rmA = handleA.replicationManager!; 434 const rmB = handleB.replicationManager!; 435 436 // Create mock RecordWriters that use the mock PDS records store 437 // (In production this would be the PdsClient via OAuth) 438 const mockWriterA = createMockRecordWriter(DID_NODE_A, mockPds); 439 const mockWriterB = createMockRecordWriter(DID_NODE_B, mockPds); 440 441 // Inject policy engine and setPdsClient 442 const peA = new PolicyEngine({ version: 1, policies: [] }); 443 const peB = new PolicyEngine({ version: 1, policies: [] }); 444 // Access private field to set policy engine — test-only hack 445 (rmA as unknown as { policyEngine: PolicyEngine }).policyEngine = peA; 446 (rmB as unknown as { policyEngine: PolicyEngine }).policyEngine = peB; 447 rmA.setPdsClient(mockWriterA, DID_NODE_A); 448 rmB.setPdsClient(mockWriterB, DID_NODE_B); 449 450 // Both nodes add each other's DID 451 await rmA.addDid(DID_NODE_B); 452 await rmB.addDid(DID_NODE_A); 453 454 // Wait for initial sync 455 await new Promise((r) => setTimeout(r, 3000)); 456 457 // Now run offer discovery on both 458 const offerManagerA = rmA.getOfferManager(); 459 const offerManagerB = rmB.getOfferManager(); 460 expect(offerManagerA).toBeDefined(); 461 expect(offerManagerB).toBeDefined(); 462 463 // Node A discovers agreements: it should find that Node B has an offer for Node A 464 const statesA = rmA.getSyncStates(); 465 const peersA = statesA.filter((s) => s.pdsEndpoint).map((s) => ({ did: s.did, pdsEndpoint: s.pdsEndpoint })); 466 const agreementsA = await offerManagerA!.discoverAndSync(peersA); 467 468 // Node B discovers agreements similarly 469 const statesB = rmB.getSyncStates(); 470 const peersB = statesB.filter((s) => s.pdsEndpoint).map((s) => ({ did: s.did, pdsEndpoint: s.pdsEndpoint })); 471 const agreementsB = await offerManagerB!.discoverAndSync(peersB); 472 473 // Both should detect one mutual agreement 474 expect(agreementsA.length).toBe(1); 475 expect(agreementsA[0]!.counterpartyDid).toBe(DID_NODE_B); 476 expect(agreementsB.length).toBe(1); 477 expect(agreementsB[0]!.counterpartyDid).toBe(DID_NODE_A); 478 479 // Verify effective params: max(minCopies), min(intervalSec), max(priority) 480 expect(agreementsA[0]!.effectiveParams.minCopies).toBe(3); // max(2, 3) 481 expect(agreementsA[0]!.effectiveParams.intervalSec).toBe(300); // min(300, 600) 482 expect(agreementsA[0]!.effectiveParams.priority).toBe(75); // max(50, 75) 483 484 // Verify P2P policies were created in the policy engine 485 const p2pPolicyA = peA.getPolicies().find((p) => p.id === `p2p:${DID_NODE_B}`); 486 expect(p2pPolicyA).toBeDefined(); 487 expect(p2pPolicyA!.replication.minCopies).toBe(3); 488 489 const p2pPolicyB = peB.getPolicies().find((p) => p.id === `p2p:${DID_NODE_A}`); 490 expect(p2pPolicyB).toBeDefined(); 491 }, 30_000); 492}); 493 494/** 495 * Create a mock RecordWriter backed by the enhanced mock PDS. 496 * This simulates what PdsClient does: read/write records to the user's PDS. 497 */ 498function createMockRecordWriter(did: string, pds: EnhancedMockPds) { 499 return { 500 putRecord: async (collection: string, rkey: string, record: unknown) => { 501 pds.addRecord(did, collection, rkey, record); 502 return { uri: `at://${did}/${collection}/${rkey}`, cid: "bafytest" }; 503 }, 504 deleteRecord: async (_collection: string, _rkey: string) => { 505 // No-op for test 506 }, 507 listRecords: async (collection: string, _opts: { limit: number }) => { 508 // Fetch from mock PDS 509 const res = await fetch( 510 `${pds.url}/xrpc/com.atproto.repo.listRecords?repo=${encodeURIComponent(did)}&collection=${encodeURIComponent(collection)}&limit=100`, 511 ); 512 if (!res.ok) return { records: [] }; 513 return (await res.json()) as { records: Array<{ uri: string; cid: string; value: unknown }> }; 514 }, 515 }; 516}