/**
 * Multi-node integration test: gossipsub + challenge-response + failover.
 *
 * Proves that gossipsub pub/sub and custom protocol streams (challenge-response)
 * coexist on the same libp2p instance pair with real networking, and that the
 * failover transport correctly resolves HTTP endpoints to multiaddrs via SyncStorage.
 */

import { describe, it, expect, beforeEach, afterEach } from "vitest";
import { mkdtempSync, rmSync } from "node:fs";
import { tmpdir } from "node:os";
import { join } from "node:path";
import Database from "better-sqlite3";
import type { Helia } from "@helia/interface";
import type { Libp2p } from "@libp2p/interface";
import { readCarWithRoot } from "@atproto/repo";
import { IpfsService, commitTopic, type CommitNotification } from "../ipfs.js";
import type { BlockStore } from "../ipfs.js";
import { SqliteBlockstore } from "../sqlite-blockstore.js";
import { SqliteDatastore } from "../sqlite-datastore.js";
import { RepoManager } from "../repo-manager.js";
import type { Config } from "../config.js";
import { encode as cborEncode, decode as cborDecode } from "../cbor-compat.js";
import { generateChallenge } from "./challenge-response/challenge-generator.js";
import { respondToChallenge } from "./challenge-response/challenge-responder.js";
import { verifyResponse } from "./challenge-response/challenge-verifier.js";
import { Libp2pChallengeTransport } from "./challenge-response/libp2p-transport.js";
import { FailoverChallengeTransport } from "./challenge-response/failover-transport.js";
import type { ChallengeTransport } from "./challenge-response/transport.js";
import type { StorageChallenge, StorageChallengeResponse } from "./challenge-response/types.js";
import { SyncStorage } from "./sync-storage.js";

/**
 * Build the fixed Config used by every test in this file.
 *
 * @param dataDir - Directory stored in the config's `DATA_DIR` field.
 * @returns A Config with IPFS enabled, and IPFS networking, firehose,
 *   rate limiting and OAuth all disabled.
 */
function testConfig(dataDir: string): Config {
  return {
    DID: "did:plc:test123",
    HANDLE: "test.example.com",
    PDS_HOSTNAME: "test.example.com",
    AUTH_TOKEN: "test-auth-token",
    SIGNING_KEY:
      "0000000000000000000000000000000000000000000000000000000000000001",
    SIGNING_KEY_PUBLIC: "zQ3shP2mWsZYWgvZM9GJ3EvMfRXQJwuTh6BdXLvJB9gFhT3Lr",
    JWT_SECRET: "test-jwt-secret",
    PASSWORD_HASH: "$2a$10$test",
    DATA_DIR: dataDir,
    PORT: 3000,
    IPFS_ENABLED: true,
    IPFS_NETWORKING: false,
    REPLICATE_DIDS: [],
    FIREHOSE_URL: "wss://localhost/xrpc/com.atproto.sync.subscribeRepos",
    FIREHOSE_ENABLED: false,
    RATE_LIMIT_ENABLED: false,
    RATE_LIMIT_READ_PER_MIN: 300,
    RATE_LIMIT_SYNC_PER_MIN: 30,
    RATE_LIMIT_SESSION_PER_MIN: 10,
    RATE_LIMIT_WRITE_PER_MIN: 200,
    RATE_LIMIT_CHALLENGE_PER_MIN: 20,
    RATE_LIMIT_MAX_CONNECTIONS: 100,
    RATE_LIMIT_FIREHOSE_PER_IP: 3,
    OAUTH_ENABLED: false,
    PUBLIC_URL: "http://localhost:3000",
  };
}

/**
 * Create a minimal Helia node with TCP + gossipsub + identify.
 * Supports both gossipsub pub/sub and custom protocol streams.
 *
 * The node listens on 127.0.0.1 with an OS-assigned TCP port and stores its
 * blocks and datastore entries in the supplied SQLite database.
 *
 * @param db - SQLite database backing the node's blockstore and datastore.
 * @returns The Helia node produced by `createHelia`.
 */
async function createGossipsubChallengeNode(
  db: Database.Database,
): Promise<Helia> {
  // Networking packages are loaded lazily, only when a node is actually built.
  const { createHelia } = await import("helia");
  const { noise } = await import("@chainsafe/libp2p-noise");
  const { yamux } = await import("@chainsafe/libp2p-yamux");
  const { tcp } = await import("@libp2p/tcp");
  const { identify } = await import("@libp2p/identify");
  const { gossipsub } = await import("@libp2p/gossipsub");
  const { createLibp2p } = await import("libp2p");

  const blockstore = new SqliteBlockstore(db);
  const datastore = new SqliteDatastore(db);

  const libp2p = await createLibp2p({
    addresses: {
      listen: ["/ip4/127.0.0.1/tcp/0"],
    },
    transports: [tcp()],
    connectionEncrypters: [noise()],
    streamMuxers: [yamux()],
    services: {
      identify: identify(),
      pubsub: gossipsub({
        emitSelf: false,
        allowPublishToZeroTopicPeers: true,
      }),
    },
  });

  return createHelia({
    libp2p,
    blockstore: blockstore as any,
    datastore: datastore as any,
  });
}

/**
 * Poll `fn` until it yields a truthy value or the timeout elapses.
 *
 * The deadline is only checked before each call, so a slow `fn` can make the
 * total wait exceed `timeoutMs`. Errors thrown by `fn` propagate to the caller.
 *
 * @param fn - Condition to evaluate; may be synchronous or asynchronous.
 * @param timeoutMs - Maximum time to keep polling, in milliseconds.
 * @param intervalMs - Pause between two evaluations, in milliseconds.
 * @throws Error when the condition is still falsy once the deadline has passed.
 */
async function waitFor(
  fn: () => Promise<boolean> | boolean,
  timeoutMs: number = 10_000,
  intervalMs: number = 200,
): Promise<void> {
  const deadline = Date.now() + timeoutMs;
  while (Date.now() < deadline) {
    if (await fn()) return;
    await new Promise((r) => setTimeout(r, intervalMs));
  }
  throw new Error(`waitFor timed out after ${timeoutMs}ms`);
}

/**
 * Wrap a Helia blockstore as the BlockStore interface used by challenge-response.
 *
 * `getBlock` and `hasBlock` convert every failure (including an unparsable
 * CID string) into `null` / `false`; `putBlocks` and `deleteBlock` are no-ops.
 *
 * @param helia - Node whose blockstore is exposed.
 * @returns A BlockStore view over `helia.blockstore`.
 */
function makeBlockStoreAdapter(helia: Helia): BlockStore {
  return {
    async putBlock(cidStr: string, bytes: Uint8Array) {
      const { CID } = await import("multiformats");
      await helia.blockstore.put(CID.parse(cidStr), bytes);
    },

    async getBlock(cidStr: string) {
      try {
        const { CID } = await import("multiformats");
        const bytes = await helia.blockstore.get(CID.parse(cidStr), { offline: true } as any);

        // The result is consumed as an async iterable of chunks and then
        // flattened into one contiguous buffer.
        const chunks: Uint8Array[] = [];
        for await (const chunk of bytes) {
          chunks.push(chunk);
        }
        if (chunks.length === 0) return null;
        if (chunks.length === 1) return chunks[0]!;

        const total = chunks.reduce((acc, c) => acc + c.length, 0);
        const result = new Uint8Array(total);
        let offset = 0;
        for (const c of chunks) {
          result.set(c, offset);
          offset += c.length;
        }
        return result;
      } catch {
        return null;
      }
    },

    async hasBlock(cidStr: string) {
      try {
        const { CID } = await import("multiformats");
        return await helia.blockstore.has(CID.parse(cidStr));
      } catch {
        return false;
      }
    },

    async putBlocks() {},
    async deleteBlock() {},
  };
}

/**
 * Return the `pubsub` libp2p service of a node, typed with the three
 * operations these tests use.
 *
 * @param node - Helia node created by `createGossipsubChallengeNode`.
 */
function getPubsub(node: Helia) {
  return (node.libp2p.services as Record<string, unknown>).pubsub as {
    subscribe(topic: string): void;
    publish(topic: string, data: Uint8Array): Promise<unknown>;
    addEventListener(event: string, handler: (evt: unknown) => void): void;
  };
}

describe("E2E multi-node: gossipsub + challenge + failover", () => {
  let tmpDir: string;
  let db: InstanceType<typeof Database>;
  let ipfsService: IpfsService;
  let repoManager: RepoManager;

  // Populated by setupConnectedNodes(); reset to null by afterEach().
  let nodeA: Helia | null = null;
  let nodeB: Helia | null = null;
  let nodeDbA: Database.Database | null = null;
  let nodeDbB: Database.Database | null = null;
  let transportA: Libp2pChallengeTransport | null = null;
  let transportB: Libp2pChallengeTransport | null = null;

  beforeEach(async () => {
    tmpDir = mkdtempSync(join(tmpdir(), "e2e-multi-node-test-"));
    const config = testConfig(tmpDir);

    db = new Database(join(tmpDir, "test.db"));
    ipfsService = new IpfsService({
      db,
      networking: false,
    });
    await ipfsService.start();

    repoManager = new RepoManager(db, config);
    repoManager.init(undefined, ipfsService, ipfsService);

    // Create 5 test records
    for (let i = 0; i < 5; i++) {
      await repoManager.createRecord(
        "app.bsky.feed.post",
        undefined,
        {
          $type: "app.bsky.feed.post",
          text: `E2E multi-node test post ${i}`,
          createdAt: new Date().toISOString(),
        },
      );
    }
  });

  afterEach(async () => {
    // Shutdown is best-effort: a failing stop() must not hide the test result.
    const stops: Promise<unknown>[] = [];
    if (transportA) stops.push(transportA.stop().catch(() => {}));
    if (transportB) stops.push(transportB.stop().catch(() => {}));
    if (nodeA) stops.push(nodeA.stop().catch(() => {}));
    if (nodeB) stops.push(nodeB.stop().catch(() => {}));
    await Promise.all(stops);

    transportA = null;
    transportB = null;
    nodeA = null;
    nodeB = null;

    // The node databases are closed only after the nodes have been stopped.
    if (nodeDbA) {
      nodeDbA.close();
      nodeDbA = null;
    }
    if (nodeDbB) {
      nodeDbB.close();
      nodeDbB = null;
    }

    if (ipfsService.isRunning()) {
      await ipfsService.stop();
    }
    db.close();
    rmSync(tmpDir, { recursive: true, force: true });
  });

  /**
   * Export the repo as a CAR, store its blocks in the local IpfsService and
   * return the CAR root CID as a string.
   */
  async function getRepoRootCid(): Promise<string> {
    const carBytes = await repoManager.getRepoCar();
    const { root, blocks } = await readCarWithRoot(carBytes);
    await ipfsService.putBlocks(blocks);
    return root.toString();
  }

  /**
   * List up to 100 `app.bsky.feed.post` records and return their paths in
   * `collection/rkey` form, taking the rkey from the last URI segment.
   */
  async function getRecordPaths(): Promise<string[]> {
    const records = await repoManager.listRecords("app.bsky.feed.post", {
      limit: 100,
    });
    return records.records.map((r) => {
      const rkey = r.uri.split("/").pop()!;
      return `app.bsky.feed.post/${rkey}`;
    });
  }

  /**
   * Create two gossipsub+challenge nodes, connect them, copy repo blocks to Node A,
   * and create Libp2pChallengeTransport on both.
   *
   * @throws Error if the two nodes do not report a connection within 5 seconds,
   *   or if the parsed CAR exposes no internal block map to copy from.
   */
  async function setupConnectedNodes(): Promise<void> {
    nodeDbA = new Database(join(tmpDir, "node-a.db"));
    nodeDbB = new Database(join(tmpDir, "node-b.db"));

    nodeA = await createGossipsubChallengeNode(nodeDbA);
    nodeB = await createGossipsubChallengeNode(nodeDbB);

    // Connect B -> A
    await nodeB.libp2p.dial(nodeA.libp2p.getMultiaddrs()[0]!);
    await waitFor(
      () =>
        nodeA!.libp2p.getConnections().length > 0 &&
        nodeB!.libp2p.getConnections().length > 0,
      5_000,
    );

    // Copy all repo blocks to node A's blockstore
    const carBytes = await repoManager.getRepoCar();
    const { blocks } = await readCarWithRoot(carBytes);
    const internalMap = (
      blocks as unknown as { map?: Map<string, Uint8Array> }
    ).map;
    if (!internalMap) {
      // Fail here rather than continuing with an empty Node A blockstore,
      // which would only surface later as an unrelated proof failure.
      throw new Error(
        "readCarWithRoot() result exposes no internal block map; cannot copy repo blocks to node A",
      );
    }

    const { CID } = await import("multiformats");
    for (const [cidStr, bytes] of internalMap) {
      const cid = CID.parse(cidStr);
      await nodeA!.blockstore.put(cid, bytes);
    }

    transportA = new Libp2pChallengeTransport(nodeA.libp2p as unknown as Libp2p);
    transportB = new Libp2pChallengeTransport(nodeB.libp2p as unknown as Libp2p);
  }

  it(
    "gossipsub notification + challenge roundtrip on same node pair",
    { timeout: 60_000 },
    async () => {
      await setupConnectedNodes();
      const rootCid = await getRepoRootCid();
      const recordPaths = await getRecordPaths();

      const testDid = "did:plc:test123";
      const topic = commitTopic(testDid);

      // Register challenge handler on Node A
      const nodeABlockStore = makeBlockStoreAdapter(nodeA!);
      transportA!.onChallenge(async (challenge) => {
        return respondToChallenge(challenge, nodeABlockStore, "did:plc:prover");
      });

      // Both nodes subscribe (needed for gossipsub mesh formation)
      const pubsubA = getPubsub(nodeA!);
      const pubsubB = getPubsub(nodeB!);
      pubsubA.subscribe(topic);
      pubsubB.subscribe(topic);

      // Node B collects received gossipsub notifications
      const received: CommitNotification[] = [];
      pubsubB.addEventListener("message", (evt: unknown) => {
        try {
          const detail = (evt as { detail: { topic: string; data: Uint8Array } }).detail;
          if (detail.topic === topic) {
            const notification = cborDecode(detail.data) as CommitNotification;
            received.push(notification);
          }
        } catch {
          // ignore decode errors
        }
      });

      // Node A publishes CBOR notification (with retry loop for mesh formation)
      const notification: CommitNotification = {
        did: testDid,
        commit: rootCid,
        rev: "3jui7kd2xxxx2",
        time: new Date().toISOString(),
        peer: nodeA!.libp2p.peerId.toString(),
      };
      const data = cborEncode(notification);

      await waitFor(
        async () => {
          // Stop publishing as soon as one notification has arrived.
          if (received.length > 0) return true;
          await pubsubA.publish(topic, data).catch(() => {});
          await new Promise((r) => setTimeout(r, 1000));
          return received.length > 0;
        },
        30_000,
        500,
      );

      // Assert notification received with correct fields
      expect(received.length).toBe(1);
      expect(received[0]!.did).toBe(testDid);
      expect(received[0]!.commit).toBe(rootCid);
      expect(received[0]!.rev).toBe("3jui7kd2xxxx2");
      expect(received[0]!.peer).toBe(nodeA!.libp2p.peerId.toString());

      // Node B generates MST-proof challenge using the received commitCid
      const challenge = generateChallenge({
        challengerDid: "did:plc:verifier",
        targetDid: "did:plc:prover",
        subjectDid: testDid,
        commitCid: received[0]!.commit,
        availableRecordPaths: recordPaths,
        challengeType: "mst-proof",
        epoch: 1,
        nonce: "e2e-multi-node-nonce",
        config: { recordCount: 2 },
      });

      // Node B sends challenge to Node A via libp2p
      const addrA = nodeA!.libp2p.getMultiaddrs()[0]!.toString();
      const response = await transportB!.sendChallenge(addrA, challenge);

      // Assert challenge response is valid
      expect(response.challengeId).toBe(challenge.id);
      expect(response.mstProofs).toBeDefined();
      expect(response.mstProofs!.length).toBe(challenge.recordPaths.length);

      // Verify proof cryptographically
      const result = await verifyResponse(challenge, response, ipfsService);
      expect(result.passed).toBe(true);
      expect(result.mstResults).toBeDefined();
      expect(result.mstResults!.every((r) => r.valid)).toBe(true);
    },
  );

  it(
    "failover transport resolves HTTP endpoint to multiaddr via SyncStorage",
    { timeout: 60_000 },
    async () => {
      await setupConnectedNodes();
      const rootCid = await getRepoRootCid();
      const recordPaths = await getRecordPaths();

      // Register challenge handler on Node A
      const nodeABlockStore = makeBlockStoreAdapter(nodeA!);
      transportA!.onChallenge(async (challenge) => {
        return respondToChallenge(challenge, nodeABlockStore, "did:plc:prover");
      });

      // Create SyncStorage with Node A's multiaddr mapped to a fake HTTP endpoint
      const syncStorage = new SyncStorage(db);
      syncStorage.initSchema();

      const fakeEndpoint = "https://pds-a.example.com";
      const nodeAMultiaddr = nodeA!.libp2p.getMultiaddrs()[0]!.toString();

      // Ensure multiaddr includes /p2p/ suffix with peer ID
      const nodeAPeerId = nodeA!.libp2p.peerId.toString();
      const fullMultiaddr = nodeAMultiaddr.includes("/p2p/")
        ? nodeAMultiaddr
        : `${nodeAMultiaddr}/p2p/${nodeAPeerId}`;

      // Store the mapping: fake endpoint -> Node A's multiaddr
      syncStorage.upsertState({
        did: "did:plc:nodeA",
        pdsEndpoint: fakeEndpoint,
        status: "synced",
      });
      syncStorage.updatePeerInfo("did:plc:nodeA", nodeAPeerId, [fullMultiaddr]);

      // Sanity check: resolver returns a multiaddr with /p2p/
      const resolved = syncStorage.getMultiaddrForPdsEndpoint(fakeEndpoint);
      expect(resolved).not.toBeNull();
      expect(resolved!).toContain("/p2p/");

      // Create mock HTTP transport that tracks calls and throws if invoked
      let httpCalls = 0;
      let fallbackCalled = false;
      const mockHttp: ChallengeTransport = {
        async sendChallenge() {
          httpCalls++;
          throw new Error("HTTP transport should not be called");
        },
        onChallenge() {},
      };

      // Create failover transport: libp2p primary, mock HTTP fallback
      const failover = new FailoverChallengeTransport(
        transportB!,
        mockHttp,
        {
          resolveEndpoint: (endpoint) =>
            syncStorage.getMultiaddrForPdsEndpoint(endpoint),
          onFallback: () => {
            fallbackCalled = true;
          },
        },
      );

      // Generate MST-proof challenge
      const challenge = generateChallenge({
        challengerDid: "did:plc:verifier",
        targetDid: "did:plc:prover",
        subjectDid: "did:plc:test123",
        commitCid: rootCid,
        availableRecordPaths: recordPaths,
        challengeType: "mst-proof",
        epoch: 1,
        nonce: "e2e-failover-nonce",
        config: { recordCount: 2 },
      });

      // Send via failover transport using the HTTP endpoint
      const response = await failover.sendChallenge(fakeEndpoint, challenge);

      // Assert: response valid, HTTP not called, no fallback
      expect(response.challengeId).toBe(challenge.id);
      expect(response.mstProofs).toBeDefined();
      expect(httpCalls).toBe(0);
      expect(fallbackCalled).toBe(false);

      // Verify proof cryptographically
      const result = await verifyResponse(challenge, response, ipfsService);
      expect(result.passed).toBe(true);
    },
  );
});