// atproto user agency toolkit for individuals and groups
/**
 * Multi-node integration test: gossipsub + challenge-response + failover.
 *
 * Proves that gossipsub pub/sub and custom protocol streams (challenge-response)
 * coexist on the same libp2p instance pair with real networking, and that the
 * failover transport correctly resolves HTTP endpoints to multiaddrs via SyncStorage.
 */
8
9import { describe, it, expect, beforeEach, afterEach } from "vitest";
10import { mkdtempSync, rmSync } from "node:fs";
11import { tmpdir } from "node:os";
12import { join } from "node:path";
13import Database from "better-sqlite3";
14import type { Helia } from "@helia/interface";
15import type { Libp2p } from "@libp2p/interface";
16import { readCarWithRoot } from "@atproto/repo";
17
18import { IpfsService, commitTopic, type CommitNotification } from "../ipfs.js";
19import type { BlockStore } from "../ipfs.js";
20import { SqliteBlockstore } from "../sqlite-blockstore.js";
21import { SqliteDatastore } from "../sqlite-datastore.js";
22import { RepoManager } from "../repo-manager.js";
23import type { Config } from "../config.js";
24import { encode as cborEncode, decode as cborDecode } from "../cbor-compat.js";
25import { generateChallenge } from "./challenge-response/challenge-generator.js";
26import { respondToChallenge } from "./challenge-response/challenge-responder.js";
27import { verifyResponse } from "./challenge-response/challenge-verifier.js";
28import { Libp2pChallengeTransport } from "./challenge-response/libp2p-transport.js";
29import { FailoverChallengeTransport } from "./challenge-response/failover-transport.js";
30import type { ChallengeTransport } from "./challenge-response/transport.js";
31import type { StorageChallenge, StorageChallengeResponse } from "./challenge-response/types.js";
32import { SyncStorage } from "./sync-storage.js";
33
/**
 * Build the fixed configuration fixture used by this suite.
 *
 * Every value is a hard-coded placeholder except `DATA_DIR`, which is taken
 * from the caller. IPFS networking, the firehose, rate limiting and OAuth are
 * all switched off in the returned object.
 *
 * @param dataDir Directory stored in the returned config's `DATA_DIR` field.
 * @returns A fresh config object on every call.
 */
function testConfig(dataDir: string): Config {
  // Groups are spread in the same order the flat literal listed its keys.
  const identity = {
    DID: "did:plc:test123",
    HANDLE: "test.example.com",
    PDS_HOSTNAME: "test.example.com",
  };

  const credentials = {
    AUTH_TOKEN: "test-auth-token",
    SIGNING_KEY:
      "0000000000000000000000000000000000000000000000000000000000000001",
    SIGNING_KEY_PUBLIC: "zQ3shP2mWsZYWgvZM9GJ3EvMfRXQJwuTh6BdXLvJB9gFhT3Lr",
    JWT_SECRET: "test-jwt-secret",
    PASSWORD_HASH: "$2a$10$test",
  };

  const server = {
    DATA_DIR: dataDir,
    PORT: 3000,
  };

  const replication = {
    IPFS_ENABLED: true,
    IPFS_NETWORKING: false,
    REPLICATE_DIDS: [],
  };

  const firehose = {
    FIREHOSE_URL: "wss://localhost/xrpc/com.atproto.sync.subscribeRepos",
    FIREHOSE_ENABLED: false,
  };

  const rateLimits = {
    RATE_LIMIT_ENABLED: false,
    RATE_LIMIT_READ_PER_MIN: 300,
    RATE_LIMIT_SYNC_PER_MIN: 30,
    RATE_LIMIT_SESSION_PER_MIN: 10,
    RATE_LIMIT_WRITE_PER_MIN: 200,
    RATE_LIMIT_CHALLENGE_PER_MIN: 20,
    RATE_LIMIT_MAX_CONNECTIONS: 100,
    RATE_LIMIT_FIREHOSE_PER_IP: 3,
  };

  const oauth = {
    OAUTH_ENABLED: false,
    PUBLIC_URL: "http://localhost:3000",
  };

  return {
    ...identity,
    ...credentials,
    ...server,
    ...replication,
    ...firehose,
    ...rateLimits,
    ...oauth,
  };
}
64
/**
 * Create a minimal Helia node with TCP + gossipsub + identify.
 * Supports both gossipsub pub/sub and custom protocol streams.
 *
 * The libp2p host listens on `/ip4/127.0.0.1/tcp/0` and uses noise for
 * connection encryption and yamux for stream multiplexing. Block and data
 * storage are both backed by the given SQLite handle.
 *
 * @param db SQLite database handed to `SqliteBlockstore` and `SqliteDatastore`.
 * @returns The Helia node wrapping the newly created libp2p host.
 * @throws Re-throws any error from `createHelia` after stopping the libp2p
 *   host, so a failed setup does not leave a listener behind.
 */
async function createGossipsubChallengeNode(
  db: Database.Database,
): Promise<Helia> {
  // Heavy networking modules are loaded lazily, only when a node is needed.
  const { createHelia } = await import("helia");
  const { noise } = await import("@chainsafe/libp2p-noise");
  const { yamux } = await import("@chainsafe/libp2p-yamux");
  const { tcp } = await import("@libp2p/tcp");
  const { identify } = await import("@libp2p/identify");
  const { gossipsub } = await import("@libp2p/gossipsub");
  const { createLibp2p } = await import("libp2p");

  const blockstore = new SqliteBlockstore(db);
  const datastore = new SqliteDatastore(db);

  const libp2p = await createLibp2p({
    addresses: {
      listen: ["/ip4/127.0.0.1/tcp/0"],
    },
    transports: [tcp()],
    connectionEncrypters: [noise()],
    streamMuxers: [yamux()],
    services: {
      identify: identify(),
      pubsub: gossipsub({
        emitSelf: false,
        allowPublishToZeroTopicPeers: true,
      }),
    },
  });

  try {
    return await createHelia({
      libp2p,
      blockstore: blockstore as any,
      datastore: datastore as any,
    });
  } catch (err) {
    // createLibp2p starts the host unless `start: false` is passed, so it may
    // already be listening here. Stop it (best effort) so the failure does not
    // leak a TCP listener, then surface the original error.
    await libp2p.stop().catch(() => {});
    throw err;
  }
}
105
/**
 * Poll `fn` until it returns (or resolves to) `true`, or until the timeout
 * elapses.
 *
 * The predicate is always evaluated at least once, even when `timeoutMs` is
 * zero or negative, and once more when the deadline is reached, so a condition
 * that became true during the final sleep is not misreported as a timeout.
 * Sleeps between polls are clamped to the time remaining before the deadline.
 *
 * @param fn Predicate to poll; may be synchronous or asynchronous.
 * @param timeoutMs Maximum time to keep polling, in milliseconds.
 * @param intervalMs Delay between polls, in milliseconds.
 * @throws Error with message `waitFor timed out after <timeoutMs>ms` when the
 *   predicate is still false at the deadline. Errors thrown by `fn` propagate.
 */
async function waitFor(
  fn: () => Promise<boolean> | boolean,
  timeoutMs: number = 10_000,
  intervalMs: number = 200,
): Promise<void> {
  const deadline = Date.now() + timeoutMs;
  for (;;) {
    if (await fn()) return;
    const remaining = deadline - Date.now();
    if (remaining <= 0) break;
    // Never sleep past the deadline; the next iteration does the final check.
    await new Promise((r) => setTimeout(r, Math.min(intervalMs, remaining)));
  }
  throw new Error(`waitFor timed out after ${timeoutMs}ms`);
}
118
/**
 * Wrap a Helia blockstore as the BlockStore interface used by challenge-response.
 *
 * - `putBlock` parses the CID string and writes the bytes to `helia.blockstore`.
 * - `getBlock` reads with `{ offline: true }`, collects the yielded chunks and
 *   returns them as one `Uint8Array`; it returns `null` when nothing was
 *   yielded or when any step throws.
 * - `hasBlock` returns `false` when any step throws.
 * - `putBlocks` and `deleteBlock` are no-ops in this adapter.
 */
function makeBlockStoreAdapter(helia: Helia): BlockStore {
  // `multiformats` is imported lazily on each call, then the string is parsed.
  const toCid = async (cidStr: string) => {
    const { CID } = await import("multiformats");
    return CID.parse(cidStr);
  };

  return {
    async putBlock(cidStr: string, bytes: Uint8Array) {
      const cid = await toCid(cidStr);
      await helia.blockstore.put(cid, bytes);
    },

    async getBlock(cidStr: string) {
      try {
        const cid = await toCid(cidStr);
        const stream = await helia.blockstore.get(cid, { offline: true } as any);

        const parts: Uint8Array[] = [];
        for await (const part of stream) {
          parts.push(part);
        }

        if (parts.length === 0) return null;
        // A single chunk is handed back as-is, without copying.
        if (parts.length === 1) return parts[0]!;

        let size = 0;
        for (const part of parts) {
          size += part.length;
        }
        const merged = new Uint8Array(size);
        parts.reduce((cursor, part) => {
          merged.set(part, cursor);
          return cursor + part.length;
        }, 0);
        return merged;
      } catch {
        // Any failure (bad CID, missing block, read error) reads as "not found".
        return null;
      }
    },

    async hasBlock(cidStr: string) {
      try {
        const cid = await toCid(cidStr);
        return await helia.blockstore.has(cid);
      } catch {
        return false;
      }
    },

    async putBlocks() {},
    async deleteBlock() {},
  };
}
162
/**
 * Fetch the `pubsub` service from a node's libp2p services, typed as the
 * small subscribe / publish / addEventListener surface this suite uses.
 *
 * The cast is unchecked: no runtime validation is performed on the service.
 */
function getPubsub(node: Helia) {
  const services = node.libp2p.services as Record<string, unknown>;
  return services.pubsub as {
    subscribe(topic: string): void;
    publish(topic: string, data: Uint8Array): Promise<unknown>;
    addEventListener(event: string, handler: (evt: unknown) => void): void;
  };
}
170
/**
 * End-to-end suite running two real libp2p/Helia nodes on loopback TCP.
 *
 * Each test gets a fresh temp directory, a local repo with five posts, and
 * (via `setupConnectedNodes`) two connected nodes where node A holds the
 * repo blocks and acts as the prover while node B acts as the challenger.
 */
describe("E2E multi-node: gossipsub + challenge + failover", () => {
  let tmpDir: string;
  let db: InstanceType<typeof Database>;
  let ipfsService: IpfsService;
  let repoManager: RepoManager;
  let nodeA: Helia | null = null;
  let nodeB: Helia | null = null;
  let nodeDbA: Database.Database | null = null;
  let nodeDbB: Database.Database | null = null;
  let transportA: Libp2pChallengeTransport | null = null;
  let transportB: Libp2pChallengeTransport | null = null;

  beforeEach(async () => {
    tmpDir = mkdtempSync(join(tmpdir(), "e2e-multi-node-test-"));
    const config = testConfig(tmpDir);

    db = new Database(join(tmpDir, "test.db"));
    ipfsService = new IpfsService({
      db,
      networking: false,
    });
    await ipfsService.start();
    repoManager = new RepoManager(db, config);
    repoManager.init(undefined, ipfsService, ipfsService);

    // Create 5 test records
    for (let i = 0; i < 5; i++) {
      await repoManager.createRecord(
        "app.bsky.feed.post",
        undefined,
        {
          $type: "app.bsky.feed.post",
          text: `E2E multi-node test post ${i}`,
          createdAt: new Date().toISOString(),
        },
      );
    }
  });

  afterEach(async () => {
    try {
      // Stop transports and nodes first; individual stop failures are ignored
      // so one bad shutdown does not block the others.
      const stops: Promise<void>[] = [];
      if (transportA) stops.push(transportA.stop().catch(() => {}));
      if (transportB) stops.push(transportB.stop().catch(() => {}));
      if (nodeA) stops.push(nodeA.stop().catch(() => {}));
      if (nodeB) stops.push(nodeB.stop().catch(() => {}));
      await Promise.all(stops);

      // `ipfsService` is unassigned if beforeEach failed early, hence `?.`.
      if (ipfsService?.isRunning()) {
        await ipfsService.stop();
      }
    } finally {
      // Always release handles and the temp dir, even if a stop above threw,
      // so a failing teardown does not leak SQLite files between tests.
      transportA = null;
      transportB = null;
      nodeA = null;
      nodeB = null;

      if (nodeDbA) { nodeDbA.close(); nodeDbA = null; }
      if (nodeDbB) { nodeDbB.close(); nodeDbB = null; }

      db?.close();
      if (tmpDir) rmSync(tmpDir, { recursive: true, force: true });
    }
  });

  /**
   * Export the repo as a CAR, store its blocks in the local IpfsService and
   * return the root CID as a string.
   */
  async function getRepoRootCid(): Promise<string> {
    const carBytes = await repoManager.getRepoCar();
    const { root, blocks } = await readCarWithRoot(carBytes);
    await ipfsService.putBlocks(blocks);
    return root.toString();
  }

  /** List up to 100 post records and return them as `collection/rkey` paths. */
  async function getRecordPaths(): Promise<string[]> {
    const records = await repoManager.listRecords("app.bsky.feed.post", {
      limit: 100,
    });
    return records.records.map((r) => {
      const rkey = r.uri.split("/").pop()!;
      return `app.bsky.feed.post/${rkey}`;
    });
  }

  /**
   * Create two gossipsub+challenge nodes, connect them, copy repo blocks to Node A,
   * and create Libp2pChallengeTransport on both.
   *
   * @throws Error if the parsed CAR block map does not expose the internal
   *   `map` this helper reads, instead of silently copying nothing.
   */
  async function setupConnectedNodes(): Promise<void> {
    nodeDbA = new Database(join(tmpDir, "node-a.db"));
    nodeDbB = new Database(join(tmpDir, "node-b.db"));
    nodeA = await createGossipsubChallengeNode(nodeDbA);
    nodeB = await createGossipsubChallengeNode(nodeDbB);

    // Connect B -> A
    await nodeB.libp2p.dial(nodeA.libp2p.getMultiaddrs()[0]!);
    await waitFor(
      () =>
        nodeA!.libp2p.getConnections().length > 0 &&
        nodeB!.libp2p.getConnections().length > 0,
      5_000,
    );

    // Copy all repo blocks to node A's blockstore
    const carBytes = await repoManager.getRepoCar();
    const { blocks } = await readCarWithRoot(carBytes);
    const internalMap = (
      blocks as unknown as { map?: Map<string, Uint8Array> }
    ).map;
    if (!internalMap) {
      // Without the blocks node A cannot answer any challenge; fail here with
      // a clear message rather than later with an unrelated proof error.
      throw new Error(
        "setupConnectedNodes: block map has no internal `map`; cannot seed node A's blockstore",
      );
    }
    const { CID } = await import("multiformats");
    for (const [cidStr, bytes] of internalMap) {
      const cid = CID.parse(cidStr);
      await nodeA!.blockstore.put(cid, bytes);
    }

    transportA = new Libp2pChallengeTransport(nodeA.libp2p as unknown as Libp2p);
    transportB = new Libp2pChallengeTransport(nodeB.libp2p as unknown as Libp2p);
  }

  it(
    "gossipsub notification + challenge roundtrip on same node pair",
    { timeout: 60_000 },
    async () => {
      await setupConnectedNodes();

      const rootCid = await getRepoRootCid();
      const recordPaths = await getRecordPaths();
      const testDid = "did:plc:test123";
      const topic = commitTopic(testDid);

      // Register challenge handler on Node A
      const nodeABlockStore = makeBlockStoreAdapter(nodeA!);
      transportA!.onChallenge(async (challenge) => {
        return respondToChallenge(challenge, nodeABlockStore, "did:plc:prover");
      });

      // Both nodes subscribe (needed for gossipsub mesh formation)
      const pubsubA = getPubsub(nodeA!);
      const pubsubB = getPubsub(nodeB!);
      pubsubA.subscribe(topic);
      pubsubB.subscribe(topic);

      // Node B collects received gossipsub notifications
      const received: CommitNotification[] = [];
      pubsubB.addEventListener("message", (evt: unknown) => {
        try {
          const detail = (evt as { detail: { topic: string; data: Uint8Array } }).detail;
          if (detail.topic === topic) {
            const notification = cborDecode(detail.data) as CommitNotification;
            received.push(notification);
          }
        } catch {
          // ignore decode errors
        }
      });

      // Node A publishes CBOR notification (with retry loop for mesh formation)
      const notification: CommitNotification = {
        did: testDid,
        commit: rootCid,
        rev: "3jui7kd2xxxx2",
        time: new Date().toISOString(),
        peer: nodeA!.libp2p.peerId.toString(),
      };
      const data = cborEncode(notification);

      await waitFor(
        async () => {
          if (received.length > 0) return true;
          await pubsubA.publish(topic, data).catch(() => {});
          await new Promise((r) => setTimeout(r, 1000));
          return received.length > 0;
        },
        30_000,
        500,
      );

      // Assert notification received with correct fields.
      // The loop above republishes until something arrives, so delivery is
      // at-least-once: more than one copy may have been received by now.
      expect(received.length).toBeGreaterThanOrEqual(1);
      expect(received[0]!.did).toBe(testDid);
      expect(received[0]!.commit).toBe(rootCid);
      expect(received[0]!.rev).toBe("3jui7kd2xxxx2");
      expect(received[0]!.peer).toBe(nodeA!.libp2p.peerId.toString());

      // Node B generates MST-proof challenge using the received commitCid
      const challenge = generateChallenge({
        challengerDid: "did:plc:verifier",
        targetDid: "did:plc:prover",
        subjectDid: testDid,
        commitCid: received[0]!.commit,
        availableRecordPaths: recordPaths,
        challengeType: "mst-proof",
        epoch: 1,
        nonce: "e2e-multi-node-nonce",
        config: { recordCount: 2 },
      });

      // Node B sends challenge to Node A via libp2p
      const addrA = nodeA!.libp2p.getMultiaddrs()[0]!.toString();
      const response = await transportB!.sendChallenge(addrA, challenge);

      // Assert challenge response is valid
      expect(response.challengeId).toBe(challenge.id);
      expect(response.mstProofs).toBeDefined();
      expect(response.mstProofs!.length).toBe(challenge.recordPaths.length);

      // Verify proof cryptographically
      const result = await verifyResponse(challenge, response, ipfsService);
      expect(result.passed).toBe(true);
      expect(result.mstResults).toBeDefined();
      expect(result.mstResults!.every((r) => r.valid)).toBe(true);
    },
  );

  it(
    "failover transport resolves HTTP endpoint to multiaddr via SyncStorage",
    { timeout: 60_000 },
    async () => {
      await setupConnectedNodes();

      const rootCid = await getRepoRootCid();
      const recordPaths = await getRecordPaths();

      // Register challenge handler on Node A
      const nodeABlockStore = makeBlockStoreAdapter(nodeA!);
      transportA!.onChallenge(async (challenge) => {
        return respondToChallenge(challenge, nodeABlockStore, "did:plc:prover");
      });

      // Create SyncStorage with Node A's multiaddr mapped to a fake HTTP endpoint
      const syncStorage = new SyncStorage(db);
      syncStorage.initSchema();

      const fakeEndpoint = "https://pds-a.example.com";
      const nodeAMultiaddr = nodeA!.libp2p.getMultiaddrs()[0]!.toString();
      // Ensure multiaddr includes /p2p/ suffix with peer ID
      const nodeAPeerId = nodeA!.libp2p.peerId.toString();
      const fullMultiaddr = nodeAMultiaddr.includes("/p2p/")
        ? nodeAMultiaddr
        : `${nodeAMultiaddr}/p2p/${nodeAPeerId}`;

      // Store the mapping: fake endpoint -> Node A's multiaddr
      syncStorage.upsertState({
        did: "did:plc:nodeA",
        pdsEndpoint: fakeEndpoint,
        status: "synced",
      });
      syncStorage.updatePeerInfo("did:plc:nodeA", nodeAPeerId, [fullMultiaddr]);

      // Sanity check: resolver returns a multiaddr with /p2p/
      const resolved = syncStorage.getMultiaddrForPdsEndpoint(fakeEndpoint);
      expect(resolved).not.toBeNull();
      expect(resolved!).toContain("/p2p/");

      // Create mock HTTP transport that tracks calls and throws if invoked
      let httpCalls = 0;
      let fallbackCalled = false;
      const mockHttp: ChallengeTransport = {
        async sendChallenge() {
          httpCalls++;
          throw new Error("HTTP transport should not be called");
        },
        onChallenge() {},
      };

      // Create failover transport: libp2p primary, mock HTTP fallback
      const failover = new FailoverChallengeTransport(
        transportB!,
        mockHttp,
        {
          resolveEndpoint: (endpoint) =>
            syncStorage.getMultiaddrForPdsEndpoint(endpoint),
          onFallback: () => {
            fallbackCalled = true;
          },
        },
      );

      // Generate MST-proof challenge
      const challenge = generateChallenge({
        challengerDid: "did:plc:verifier",
        targetDid: "did:plc:prover",
        subjectDid: "did:plc:test123",
        commitCid: rootCid,
        availableRecordPaths: recordPaths,
        challengeType: "mst-proof",
        epoch: 1,
        nonce: "e2e-failover-nonce",
        config: { recordCount: 2 },
      });

      // Send via failover transport using the HTTP endpoint
      const response = await failover.sendChallenge(fakeEndpoint, challenge);

      // Assert: response valid, HTTP not called, no fallback
      expect(response.challengeId).toBe(challenge.id);
      expect(response.mstProofs).toBeDefined();
      expect(httpCalls).toBe(0);
      expect(fallbackCalled).toBe(false);

      // Verify proof cryptographically
      const result = await verifyResponse(challenge, response, ipfsService);
      expect(result.passed).toBe(true);
    },
  );
});