atproto user agency toolkit for individuals and groups
at main 472 lines 15 kB view raw
/**
 * Multi-node integration test: gossipsub + challenge-response + failover.
 *
 * Proves that gossipsub pub/sub and custom protocol streams (challenge-response)
 * coexist on the same libp2p instance pair with real networking, and that the
 * failover transport correctly resolves HTTP endpoints to multiaddrs via SyncStorage.
 */

import { describe, it, expect, beforeEach, afterEach } from "vitest";
import { mkdtempSync, rmSync } from "node:fs";
import { tmpdir } from "node:os";
import { join } from "node:path";
import Database from "better-sqlite3";
import type { Helia } from "@helia/interface";
import type { Libp2p } from "@libp2p/interface";
import { readCarWithRoot } from "@atproto/repo";

import { IpfsService, commitTopic, type CommitNotification } from "../ipfs.js";
import type { BlockStore } from "../ipfs.js";
import { SqliteBlockstore } from "../sqlite-blockstore.js";
import { SqliteDatastore } from "../sqlite-datastore.js";
import { RepoManager } from "../repo-manager.js";
import type { Config } from "../config.js";
import { encode as cborEncode, decode as cborDecode } from "../cbor-compat.js";
import { generateChallenge } from "./challenge-response/challenge-generator.js";
import { respondToChallenge } from "./challenge-response/challenge-responder.js";
import { verifyResponse } from "./challenge-response/challenge-verifier.js";
import { Libp2pChallengeTransport } from "./challenge-response/libp2p-transport.js";
import { FailoverChallengeTransport } from "./challenge-response/failover-transport.js";
import type { ChallengeTransport } from "./challenge-response/transport.js";
import type { StorageChallenge, StorageChallengeResponse } from "./challenge-response/types.js";
import { SyncStorage } from "./sync-storage.js";

/**
 * Build the fixed Config used by this suite.
 *
 * Every value is a hard-coded test fixture except the data directory. IPFS is
 * enabled with networking off; firehose, rate limiting and OAuth are disabled.
 *
 * @param dataDir - Directory stored in `DATA_DIR`.
 * @returns A Config object populated with test values.
 */
function testConfig(dataDir: string): Config {
  return {
    DID: "did:plc:test123",
    HANDLE: "test.example.com",
    PDS_HOSTNAME: "test.example.com",
    AUTH_TOKEN: "test-auth-token",
    SIGNING_KEY:
      "0000000000000000000000000000000000000000000000000000000000000001",
    SIGNING_KEY_PUBLIC:
      "zQ3shP2mWsZYWgvZM9GJ3EvMfRXQJwuTh6BdXLvJB9gFhT3Lr",
    JWT_SECRET: "test-jwt-secret",
    PASSWORD_HASH: "$2a$10$test",
    DATA_DIR: dataDir,
    PORT: 3000,
    IPFS_ENABLED: true,
    IPFS_NETWORKING: false,
    REPLICATE_DIDS: [],
    FIREHOSE_URL: "wss://localhost/xrpc/com.atproto.sync.subscribeRepos",
    FIREHOSE_ENABLED: false,
    RATE_LIMIT_ENABLED: false,
    RATE_LIMIT_READ_PER_MIN: 300,
    RATE_LIMIT_SYNC_PER_MIN: 30,
    RATE_LIMIT_SESSION_PER_MIN: 10,
    RATE_LIMIT_WRITE_PER_MIN: 200,
    RATE_LIMIT_CHALLENGE_PER_MIN: 20,
    RATE_LIMIT_MAX_CONNECTIONS: 100,
    RATE_LIMIT_FIREHOSE_PER_IP: 3,
    OAUTH_ENABLED: false,
    PUBLIC_URL: "http://localhost:3000",
  };
}

/**
 * Create a minimal Helia node with TCP + gossipsub + identify.
 * Supports both gossipsub pub/sub and custom protocol streams.
 *
 * The node listens on a loopback TCP address with an OS-assigned port and
 * stores blocks/data in the given SQLite database.
 *
 * @param db - SQLite database backing the node's blockstore and datastore.
 * @returns The Helia node wrapping the newly created libp2p instance.
 * @throws Whatever `createLibp2p` or `createHelia` throws. When `createHelia`
 *   fails, the libp2p instance is stopped first so it is not leaked.
 */
async function createGossipsubChallengeNode(
  db: Database.Database,
): Promise<Helia> {
  // Loaded lazily so the heavy networking stack is only pulled in when a
  // test actually builds a node.
  const { createHelia } = await import("helia");
  const { noise } = await import("@chainsafe/libp2p-noise");
  const { yamux } = await import("@chainsafe/libp2p-yamux");
  const { tcp } = await import("@libp2p/tcp");
  const { identify } = await import("@libp2p/identify");
  const { gossipsub } = await import("@libp2p/gossipsub");
  const { createLibp2p } = await import("libp2p");

  const blockstore = new SqliteBlockstore(db);
  const datastore = new SqliteDatastore(db);

  const libp2p = await createLibp2p({
    addresses: {
      // Port 0: let the OS pick a free port so parallel nodes never collide.
      listen: ["/ip4/127.0.0.1/tcp/0"],
    },
    transports: [tcp()],
    connectionEncrypters: [noise()],
    streamMuxers: [yamux()],
    services: {
      identify: identify(),
      pubsub: gossipsub({
        emitSelf: false,
        allowPublishToZeroTopicPeers: true,
      }),
    },
  });

  try {
    return await createHelia({
      libp2p,
      blockstore: blockstore as any,
      datastore: datastore as any,
    });
  } catch (err) {
    // The caller never receives a handle to this libp2p instance, so it must
    // be shut down here or it would outlive the test.
    try {
      await libp2p.stop();
    } catch {
      // Best-effort cleanup; the createHelia failure is the error to report.
    }
    throw err;
  }
}

/**
 * Poll `fn` until it reports success or the timeout elapses.
 *
 * The condition is always evaluated at least once, and once more when the
 * deadline is reached, so a condition that turns true during the final sleep
 * is not reported as a timeout.
 *
 * @param fn - Condition to poll; may be sync or async. Errors propagate.
 * @param timeoutMs - Maximum time to keep polling, in milliseconds.
 * @param intervalMs - Delay between polls, in milliseconds.
 * @throws Error when the condition is still false at the deadline.
 */
async function waitFor(
  fn: () => Promise<boolean> | boolean,
  timeoutMs: number = 10_000,
  intervalMs: number = 200,
): Promise<void> {
  const deadline = Date.now() + timeoutMs;
  for (;;) {
    if (await fn()) return;
    const remaining = deadline - Date.now();
    if (remaining <= 0) break;
    // Never sleep past the deadline; the next iteration is the final check.
    await new Promise((r) => setTimeout(r, Math.min(intervalMs, remaining)));
  }
  throw new Error(`waitFor timed out after ${timeoutMs}ms`);
}

/**
 * Wrap a Helia blockstore as the BlockStore interface used by challenge-response.
 *
 * Reads are best-effort: any failure in `getBlock` yields `null` and any
 * failure in `hasBlock` yields `false`. `putBlocks` and `deleteBlock` are
 * no-ops in this adapter.
 *
 * @param helia - Node whose blockstore backs the adapter.
 * @returns A BlockStore view over `helia.blockstore`.
 */
function makeBlockStoreAdapter(helia: Helia): BlockStore {
  return {
    async putBlock(cidStr: string, bytes: Uint8Array) {
      const { CID } = await import("multiformats");
      await helia.blockstore.put(CID.parse(cidStr), bytes);
    },
    async getBlock(cidStr: string) {
      try {
        const { CID } = await import("multiformats");
        const bytes = await helia.blockstore.get(CID.parse(cidStr), { offline: true } as any);
        // The blockstore result is consumed as an async iterable of chunks.
        const chunks: Uint8Array[] = [];
        for await (const chunk of bytes) {
          chunks.push(chunk);
        }
        if (chunks.length === 0) return null;
        if (chunks.length === 1) return chunks[0]!;
        // Several chunks: concatenate them into one contiguous buffer.
        const total = chunks.reduce((acc, c) => acc + c.length, 0);
        const result = new Uint8Array(total);
        let offset = 0;
        for (const c of chunks) {
          result.set(c, offset);
          offset += c.length;
        }
        return result;
      } catch {
        return null;
      }
    },
    async hasBlock(cidStr: string) {
      try {
        const { CID } = await import("multiformats");
        return await helia.blockstore.has(CID.parse(cidStr));
      } catch {
        return false;
      }
    },
    async putBlocks() {},
    async deleteBlock() {},
  };
}

/**
 * Return the node's `pubsub` libp2p service, typed with the small surface
 * this suite uses (subscribe, publish, addEventListener).
 *
 * @param node - Helia node whose libp2p services include `pubsub`.
 */
function getPubsub(node: Helia) {
  return (node.libp2p.services as Record<string, unknown>).pubsub as {
    subscribe(topic: string): void;
    publish(topic: string, data: Uint8Array): Promise<unknown>;
    addEventListener(event: string, handler: (evt: unknown) => void): void;
  };
}

describe("E2E multi-node: gossipsub + challenge + failover", () => {
  let tmpDir: string;
  let db: InstanceType<typeof Database>;
  let ipfsService: IpfsService;
  let repoManager: RepoManager;
  let nodeA: Helia | null = null;
  let nodeB: Helia | null = null;
  let nodeDbA: Database.Database | null = null;
  let nodeDbB: Database.Database | null = null;
  let transportA: Libp2pChallengeTransport | null = null;
  let transportB: Libp2pChallengeTransport | null = null;

  beforeEach(async () => {
    tmpDir = mkdtempSync(join(tmpdir(), "e2e-multi-node-test-"));
    const config = testConfig(tmpDir);

    db = new Database(join(tmpDir, "test.db"));
    ipfsService = new IpfsService({
      db,
      networking: false,
    });
    await ipfsService.start();
    repoManager = new RepoManager(db, config);
    repoManager.init(undefined, ipfsService, ipfsService);

    // Create 5 test records
    for (let i = 0; i < 5; i++) {
      await repoManager.createRecord(
        "app.bsky.feed.post",
        undefined,
        {
          $type: "app.bsky.feed.post",
          text: `E2E multi-node test post ${i}`,
          createdAt: new Date().toISOString(),
        },
      );
    }
  });

  afterEach(async () => {
    // Transports and nodes are stopped best-effort; a failed stop must not
    // prevent the remaining cleanup.
    const stops: Promise<void>[] = [];
    if (transportA) stops.push(transportA.stop().catch(() => {}));
    if (transportB) stops.push(transportB.stop().catch(() => {}));
    if (nodeA) stops.push(nodeA.stop().catch(() => {}));
    if (nodeB) stops.push(nodeB.stop().catch(() => {}));
    await Promise.all(stops);
    transportA = null;
    transportB = null;
    nodeA = null;
    nodeB = null;

    if (nodeDbA) { nodeDbA.close(); nodeDbA = null; }
    if (nodeDbB) { nodeDbB.close(); nodeDbB = null; }

    try {
      if (ipfsService.isRunning()) {
        await ipfsService.stop();
      }
    } finally {
      // Release the SQLite handle and temp directory even if stop() rejects,
      // so one failing teardown does not leak files into later tests.
      db.close();
      rmSync(tmpDir, { recursive: true, force: true });
    }
  });

  /**
   * Export the repo as a CAR, store its blocks in `ipfsService`, and return
   * the root CID as a string.
   */
  async function getRepoRootCid(): Promise<string> {
    const carBytes = await repoManager.getRepoCar();
    const { root, blocks } = await readCarWithRoot(carBytes);
    await ipfsService.putBlocks(blocks);
    return root.toString();
  }

  /**
   * List up to 100 `app.bsky.feed.post` records and return them as
   * `collection/rkey` paths, taking the rkey from the last URI segment.
   */
  async function getRecordPaths(): Promise<string[]> {
    const records = await repoManager.listRecords("app.bsky.feed.post", {
      limit: 100,
    });
    return records.records.map((r) => {
      const rkey = r.uri.split("/").pop()!;
      return `app.bsky.feed.post/${rkey}`;
    });
  }

  /**
   * Create two gossipsub+challenge nodes, connect them, copy repo blocks to Node A,
   * and create Libp2pChallengeTransport on both.
   *
   * @throws Error if the repo blocks cannot be enumerated for copying.
   */
  async function setupConnectedNodes(): Promise<void> {
    nodeDbA = new Database(join(tmpDir, "node-a.db"));
    nodeDbB = new Database(join(tmpDir, "node-b.db"));
    nodeA = await createGossipsubChallengeNode(nodeDbA);
    nodeB = await createGossipsubChallengeNode(nodeDbB);

    // Connect B -> A
    await nodeB.libp2p.dial(nodeA.libp2p.getMultiaddrs()[0]!);
    await waitFor(
      () =>
        nodeA!.libp2p.getConnections().length > 0 &&
        nodeB!.libp2p.getConnections().length > 0,
      5_000,
    );

    // Copy all repo blocks to node A's blockstore
    const carBytes = await repoManager.getRepoCar();
    const { blocks } = await readCarWithRoot(carBytes);
    // NOTE(review): this reaches into a non-public `map` field of the block
    // collection; confirm it still exists when upgrading @atproto/repo.
    const internalMap = (
      blocks as unknown as { map?: Map<string, Uint8Array> }
    ).map;
    if (!internalMap) {
      // Fail here rather than later with an opaque proof failure caused by
      // node A holding none of the repo blocks.
      throw new Error(
        "setupConnectedNodes: repo block map is unavailable; cannot copy blocks to node A",
      );
    }
    const { CID } = await import("multiformats");
    for (const [cidStr, bytes] of internalMap) {
      const cid = CID.parse(cidStr);
      await nodeA!.blockstore.put(cid, bytes);
    }

    transportA = new Libp2pChallengeTransport(nodeA.libp2p as unknown as Libp2p);
    transportB = new Libp2pChallengeTransport(nodeB.libp2p as unknown as Libp2p);
  }

  it(
    "gossipsub notification + challenge roundtrip on same node pair",
    { timeout: 60_000 },
    async () => {
      await setupConnectedNodes();

      const rootCid = await getRepoRootCid();
      const recordPaths = await getRecordPaths();
      const testDid = "did:plc:test123";
      const topic = commitTopic(testDid);

      // Register challenge handler on Node A
      const nodeABlockStore = makeBlockStoreAdapter(nodeA!);
      transportA!.onChallenge(async (challenge) => {
        return respondToChallenge(challenge, nodeABlockStore, "did:plc:prover");
      });

      // Both nodes subscribe (needed for gossipsub mesh formation)
      const pubsubA = getPubsub(nodeA!);
      const pubsubB = getPubsub(nodeB!);
      pubsubA.subscribe(topic);
      pubsubB.subscribe(topic);

      // Node B collects received gossipsub notifications
      const received: CommitNotification[] = [];
      pubsubB.addEventListener("message", (evt: unknown) => {
        try {
          const detail = (evt as { detail: { topic: string; data: Uint8Array } }).detail;
          if (detail.topic === topic) {
            const notification = cborDecode(detail.data) as CommitNotification;
            received.push(notification);
          }
        } catch {
          // ignore decode errors
        }
      });

      // Node A publishes CBOR notification (with retry loop for mesh formation)
      const notification: CommitNotification = {
        did: testDid,
        commit: rootCid,
        rev: "3jui7kd2xxxx2",
        time: new Date().toISOString(),
        peer: nodeA!.libp2p.peerId.toString(),
      };
      const data = cborEncode(notification);

      await waitFor(
        async () => {
          // Stop re-publishing as soon as one notification has arrived.
          if (received.length > 0) return true;
          await pubsubA.publish(topic, data).catch(() => {});
          await new Promise((r) => setTimeout(r, 1000));
          return received.length > 0;
        },
        30_000,
        500,
      );

      // Assert notification received with correct fields
      expect(received.length).toBe(1);
      expect(received[0]!.did).toBe(testDid);
      expect(received[0]!.commit).toBe(rootCid);
      expect(received[0]!.rev).toBe("3jui7kd2xxxx2");
      expect(received[0]!.peer).toBe(nodeA!.libp2p.peerId.toString());

      // Node B generates MST-proof challenge using the received commitCid
      const challenge = generateChallenge({
        challengerDid: "did:plc:verifier",
        targetDid: "did:plc:prover",
        subjectDid: testDid,
        commitCid: received[0]!.commit,
        availableRecordPaths: recordPaths,
        challengeType: "mst-proof",
        epoch: 1,
        nonce: "e2e-multi-node-nonce",
        config: { recordCount: 2 },
      });

      // Node B sends challenge to Node A via libp2p
      const addrA = nodeA!.libp2p.getMultiaddrs()[0]!.toString();
      const response = await transportB!.sendChallenge(addrA, challenge);

      // Assert challenge response is valid
      expect(response.challengeId).toBe(challenge.id);
      expect(response.mstProofs).toBeDefined();
      expect(response.mstProofs!.length).toBe(challenge.recordPaths.length);

      // Verify proof cryptographically
      const result = await verifyResponse(challenge, response, ipfsService);
      expect(result.passed).toBe(true);
      expect(result.mstResults).toBeDefined();
      expect(result.mstResults!.every((r) => r.valid)).toBe(true);
    },
  );

  it(
    "failover transport resolves HTTP endpoint to multiaddr via SyncStorage",
    { timeout: 60_000 },
    async () => {
      await setupConnectedNodes();

      const rootCid = await getRepoRootCid();
      const recordPaths = await getRecordPaths();

      // Register challenge handler on Node A
      const nodeABlockStore = makeBlockStoreAdapter(nodeA!);
      transportA!.onChallenge(async (challenge) => {
        return respondToChallenge(challenge, nodeABlockStore, "did:plc:prover");
      });

      // Create SyncStorage with Node A's multiaddr mapped to a fake HTTP endpoint
      const syncStorage = new SyncStorage(db);
      syncStorage.initSchema();

      const fakeEndpoint = "https://pds-a.example.com";
      const nodeAMultiaddr = nodeA!.libp2p.getMultiaddrs()[0]!.toString();
      // Ensure multiaddr includes /p2p/ suffix with peer ID
      const nodeAPeerId = nodeA!.libp2p.peerId.toString();
      const fullMultiaddr = nodeAMultiaddr.includes("/p2p/")
        ? nodeAMultiaddr
        : `${nodeAMultiaddr}/p2p/${nodeAPeerId}`;

      // Store the mapping: fake endpoint -> Node A's multiaddr
      syncStorage.upsertState({
        did: "did:plc:nodeA",
        pdsEndpoint: fakeEndpoint,
        status: "synced",
      });
      syncStorage.updatePeerInfo("did:plc:nodeA", nodeAPeerId, [fullMultiaddr]);

      // Sanity check: resolver returns a multiaddr with /p2p/
      const resolved = syncStorage.getMultiaddrForPdsEndpoint(fakeEndpoint);
      expect(resolved).not.toBeNull();
      expect(resolved!).toContain("/p2p/");

      // Create mock HTTP transport that tracks calls and throws if invoked
      let httpCalls = 0;
      let fallbackCalled = false;
      const mockHttp: ChallengeTransport = {
        async sendChallenge() {
          httpCalls++;
          throw new Error("HTTP transport should not be called");
        },
        onChallenge() {},
      };

      // Create failover transport: libp2p primary, mock HTTP fallback
      const failover = new FailoverChallengeTransport(
        transportB!,
        mockHttp,
        {
          resolveEndpoint: (endpoint) =>
            syncStorage.getMultiaddrForPdsEndpoint(endpoint),
          onFallback: () => {
            fallbackCalled = true;
          },
        },
      );

      // Generate MST-proof challenge
      const challenge = generateChallenge({
        challengerDid: "did:plc:verifier",
        targetDid: "did:plc:prover",
        subjectDid: "did:plc:test123",
        commitCid: rootCid,
        availableRecordPaths: recordPaths,
        challengeType: "mst-proof",
        epoch: 1,
        nonce: "e2e-failover-nonce",
        config: { recordCount: 2 },
      });

      // Send via failover transport using the HTTP endpoint
      const response = await failover.sendChallenge(fakeEndpoint, challenge);

      // Assert: response valid, HTTP not called, no fallback
      expect(response.challengeId).toBe(challenge.id);
      expect(response.mstProofs).toBeDefined();
      expect(httpCalls).toBe(0);
      expect(fallbackCalled).toBe(false);

      // Verify proof cryptographically
      const result = await verifyResponse(challenge, response, ipfsService);
      expect(result.passed).toBe(true);
    },
  );
});