atproto user agency toolkit for individuals and groups
/**
 * End-to-end networking integration test.
 *
 * Proves that two p2pds nodes can connect to each other and exchange data
 * over the network using libp2p/Helia bitswap.
 *
 * These tests create real Helia nodes with TCP networking on localhost,
 * connect them directly (no peer discovery is involved), and verify block
 * exchange via bitswap.
 */
10
11import { describe, it, expect, beforeEach, afterEach } from "vitest";
12import { mkdtempSync, rmSync } from "node:fs";
13import { tmpdir } from "node:os";
14import { join } from "node:path";
15import Database from "better-sqlite3";
16import { CID } from "multiformats";
17// @ts-ignore -- multiformats v9 subpath exports lack type declarations
18import * as raw from "multiformats/codecs/raw";
19// @ts-ignore -- multiformats v9 subpath exports lack type declarations
20import { sha256 } from "multiformats/hashes/sha2";
21import type { Helia } from "@helia/interface";
22import { SqliteBlockstore } from "../sqlite-blockstore.js";
23import { SqliteDatastore } from "../sqlite-datastore.js";
24
/**
 * Derive the content identifier for a raw block.
 *
 * Hashes `bytes` with SHA-256 and wraps the digest in a version-1 CID that
 * uses the `raw` codec.
 *
 * @param bytes - The block payload to address.
 * @returns The CID addressing `bytes`.
 */
async function cidFromBytes(bytes: Uint8Array): Promise<CID> {
  const cidVersion = 1;
  const digest = await sha256.digest(bytes);
  return CID.create(cidVersion, raw.code, digest);
}
32
/**
 * Drain an async iterable of byte chunks into one plain Uint8Array.
 *
 * Helia/interface-blockstore's get() yields its data as
 * AsyncGenerator<Uint8Array> rather than a single Uint8Array, and the chunks
 * may be Node.js Buffers. The result is always a plain Uint8Array (never a
 * Buffer subclass) so that equality comparisons behave consistently.
 *
 * @param gen - The chunk source to consume until exhaustion.
 * @returns An empty array when nothing was yielded; a view over the same
 *   memory as the chunk when exactly one chunk was yielded; otherwise a newly
 *   allocated array holding all chunks back to back.
 */
async function collectBytes(
  gen: AsyncIterable<Uint8Array>,
): Promise<Uint8Array> {
  const parts: Uint8Array[] = [];
  let totalLength = 0;
  for await (const part of gen) {
    parts.push(part);
    totalLength += part.length;
  }

  switch (parts.length) {
    case 0:
      return new Uint8Array(0);

    case 1: {
      // Re-wrap rather than copy: same bytes, but with the plain
      // Uint8Array prototype instead of a possible Buffer subclass.
      const only = parts[0]!;
      return new Uint8Array(only.buffer, only.byteOffset, only.byteLength);
    }

    default: {
      const joined = new Uint8Array(totalLength);
      let cursor = 0;
      for (const part of parts) {
        joined.set(part, cursor);
        cursor += part.length;
      }
      return joined;
    }
  }
}
61
/**
 * Create a minimal Helia node with TCP-only networking on localhost.
 *
 * Strips out bootstrap peers, mDNS, delegated routing, autoNAT, autoTLS,
 * uPnP, circuit relay, WebRTC, and WebSockets to avoid any external
 * network dependencies. Nodes must be connected manually via dial().
 *
 * The libp2p instance is assembled by hand with only the TCP transport,
 * Noise encryption, Yamux stream muxing and the identify service, listening
 * on 127.0.0.1 with port 0 so concurrently running nodes never collide.
 *
 * @param db - Open better-sqlite3 handle used for both the blockstore and
 *   the datastore. This function never closes it.
 * @returns The Helia node. The caller is responsible for calling stop().
 * @throws Rethrows any failure from libp2p or Helia construction. When Helia
 *   construction fails, the libp2p node created here is stopped first so no
 *   listening socket is left behind.
 */
async function createTestHeliaNode(
  db: Database.Database,
): Promise<Helia> {
  // The networking stack is loaded lazily; the modules are independent of
  // each other, so they are imported in parallel.
  const [
    { createHelia },
    { noise },
    { yamux },
    { tcp },
    { identify },
    { bitswap },
    { libp2pRouting },
    { createLibp2p },
  ] = await Promise.all([
    import("helia"),
    import("@chainsafe/libp2p-noise"),
    import("@chainsafe/libp2p-yamux"),
    import("@libp2p/tcp"),
    import("@libp2p/identify"),
    import("@helia/block-brokers"),
    import("@helia/routers"),
    import("libp2p"),
  ]);

  const blockstore = new SqliteBlockstore(db);
  const datastore = new SqliteDatastore(db);

  const libp2p = await createLibp2p({
    addresses: {
      listen: ["/ip4/127.0.0.1/tcp/0"],
    },
    transports: [tcp()],
    connectionEncrypters: [noise()],
    streamMuxers: [yamux()],
    services: {
      identify: identify(),
    },
    // No peer discovery -- we connect manually
  });

  try {
    return await createHelia({
      libp2p,
      // The SQLite-backed stores are cast because their declared types do
      // not line up with Helia's store option types.
      blockstore: blockstore as any,
      datastore: datastore as any,
      blockBrokers: [bitswap()],
      routers: [libp2pRouting(libp2p)],
    });
  } catch (err) {
    // libp2p is already running at this point; shut it down so a failed
    // Helia construction does not leak a listening socket. Cleanup is
    // best-effort so the original error is the one that surfaces.
    await libp2p.stop().catch(() => {});
    throw err;
  }
}
107
/**
 * Poll a condition until it holds, failing once a deadline has passed.
 *
 * The condition is always evaluated at least once (even when `timeoutMs` is
 * zero or negative), and the pause between polls is clamped to the time
 * remaining so that one final evaluation happens right at the deadline
 * instead of sleeping past it.
 *
 * @param fn - Condition to poll; may return a boolean or a promise of one.
 *   Any error it throws or rejects with propagates to the caller.
 * @param timeoutMs - Maximum total time to keep polling, in milliseconds.
 * @param intervalMs - Pause between consecutive polls, in milliseconds.
 * @returns Resolves as soon as `fn` yields a truthy result.
 * @throws Error with message `waitFor timed out after ${timeoutMs}ms` when
 *   the condition is still false at the deadline.
 */
async function waitFor(
  fn: () => Promise<boolean> | boolean,
  timeoutMs: number = 10_000,
  intervalMs: number = 200,
): Promise<void> {
  const deadline = Date.now() + timeoutMs;
  for (;;) {
    if (await fn()) return;

    const remainingMs = deadline - Date.now();
    if (remainingMs <= 0) break;

    // Never sleep beyond the deadline: the last poll should land on it.
    const pauseMs = Math.min(intervalMs, remainingMs);
    await new Promise<void>((resolve) => setTimeout(resolve, pauseMs));
  }
  throw new Error(`waitFor timed out after ${timeoutMs}ms`);
}
123
/**
 * End-to-end suite exercising two Helia nodes networked over localhost TCP.
 *
 * Every test gets a fresh temporary directory (created in `beforeEach`) that
 * holds the per-node SQLite files. Tests that use raw Helia nodes assign the
 * nodes and their databases to the suite-level variables below, so that
 * `afterEach` can stop and close them even when an assertion fails midway.
 */
describe("E2E networking: two Helia nodes", () => {
  let tmpDir: string;
  let nodeA: Helia | null = null;
  let nodeB: Helia | null = null;
  let dbA: Database.Database | null = null;
  let dbB: Database.Database | null = null;

  beforeEach(() => {
    tmpDir = mkdtempSync(join(tmpdir(), "e2e-networking-test-"));
  });

  afterEach(async () => {
    // Stop nodes in parallel for faster cleanup. Stop failures are ignored
    // on purpose: teardown is best-effort and must not mask a test failure.
    const stops: Promise<void>[] = [];
    if (nodeA) stops.push(nodeA.stop().catch(() => {}));
    if (nodeB) stops.push(nodeB.stop().catch(() => {}));
    await Promise.all(stops);
    nodeA = null;
    nodeB = null;

    // Close both databases and always remove the temp directory, even if
    // one close() throws; the first close error is re-thrown afterwards.
    const closeErrors: unknown[] = [];
    for (const db of [dbA, dbB]) {
      try {
        db?.close();
      } catch (err) {
        closeErrors.push(err);
      }
    }
    dbA = null;
    dbB = null;

    rmSync(tmpDir, { recursive: true, force: true });

    if (closeErrors.length > 0) throw closeErrors[0];
  });

  it("nodes can connect and exchange blocks via bitswap", { timeout: 60_000 }, async () => {
    // 1. Create two Helia nodes with real TCP networking on localhost
    dbA = new Database(join(tmpDir, "node-a.db"));
    dbB = new Database(join(tmpDir, "node-b.db"));
    nodeA = await createTestHeliaNode(dbA);
    nodeB = await createTestHeliaNode(dbB);

    // 2. Verify both nodes are running and have addresses
    const addrsA = nodeA.libp2p.getMultiaddrs();
    const addrsB = nodeB.libp2p.getMultiaddrs();
    expect(addrsA.length).toBeGreaterThan(0);
    expect(addrsB.length).toBeGreaterThan(0);

    const peerIdA = nodeA.libp2p.peerId.toString();
    const peerIdB = nodeB.libp2p.peerId.toString();
    expect(peerIdA).toBeTruthy();
    expect(peerIdB).toBeTruthy();
    expect(peerIdA).not.toBe(peerIdB);

    // 3. Connect node B to node A
    await nodeB.libp2p.dial(addrsA[0]!);

    // Wait for the connection to be established in both directions
    await waitFor(
      () =>
        nodeA!.libp2p.getConnections().length > 0 &&
        nodeB!.libp2p.getConnections().length > 0,
      5_000,
    );

    expect(nodeA.libp2p.getConnections().length).toBeGreaterThan(0);
    expect(nodeB.libp2p.getConnections().length).toBeGreaterThan(0);

    // 4. Store blocks on node A
    const testData = [
      new TextEncoder().encode("hello from node A"),
      new TextEncoder().encode("second block of data"),
      new TextEncoder().encode("third block for good measure"),
    ];

    const cids: CID[] = [];
    for (const bytes of testData) {
      const cid = await cidFromBytes(bytes);
      await nodeA.blockstore.put(cid, bytes);
      cids.push(cid);
    }

    // Verify blocks are on node A
    for (const cid of cids) {
      expect(await nodeA.blockstore.has(cid)).toBe(true);
    }

    // Verify blocks are NOT on node B yet
    for (const cid of cids) {
      expect(
        await nodeB.blockstore.has(cid, { offline: true } as any),
      ).toBe(false);
    }

    // 5. Retrieve blocks on node B via bitswap (network fetch)
    //    blockstore.get() returns AsyncGenerator<Uint8Array>
    for (let i = 0; i < cids.length; i++) {
      const cid = cids[i]!;
      // Each fetch gets its own timeout budget
      const signal = AbortSignal.timeout(15_000);
      const retrieved = await collectBytes(
        nodeB.blockstore.get(cid, { signal }) as any,
      );

      expect(retrieved).toBeDefined();
      expect(retrieved.length).toBe(testData[i]!.length);
      expect(retrieved).toEqual(testData[i]!);
    }

    // 6. Verify blocks are now cached on node B
    for (const cid of cids) {
      expect(await nodeB.blockstore.has(cid)).toBe(true);
    }
  });

  it("IpfsService instances can be connected and peer info is correct", { timeout: 60_000 }, async () => {
    // This test verifies that IpfsService with networking=true
    // exposes correct peer identity and multiaddr information.
    const { IpfsService } = await import("../ipfs.js");

    const svcDbA = new Database(join(tmpDir, "svc-a.db"));
    const svcDbB = new Database(join(tmpDir, "svc-b.db"));
    const serviceA = new IpfsService({
      db: svcDbA,
      networking: true,
    });
    const serviceB = new IpfsService({
      db: svcDbB,
      networking: true,
    });

    try {
      await serviceA.start();
      await serviceB.start();

      // Verify peer IDs are present and distinct
      const peerIdA = serviceA.getPeerId();
      const peerIdB = serviceB.getPeerId();
      expect(peerIdA).not.toBeNull();
      expect(peerIdB).not.toBeNull();
      expect(peerIdA).not.toBe(peerIdB);

      // Verify multiaddrs are available
      const addrsA = serviceA.getMultiaddrs();
      const addrsB = serviceB.getMultiaddrs();
      expect(addrsA.length).toBeGreaterThan(0);
      expect(addrsB.length).toBeGreaterThan(0);

      // Verify that IpfsService reports running
      expect(serviceA.isRunning()).toBe(true);
      expect(serviceB.isRunning()).toBe(true);
    } finally {
      // Nested finally blocks guarantee that a failure while stopping
      // service A can neither leave service B running nor leave the
      // database handles open.
      try {
        if (serviceA.isRunning()) await serviceA.stop();
      } finally {
        try {
          if (serviceB.isRunning()) await serviceB.stop();
        } finally {
          try {
            svcDbA.close();
          } finally {
            svcDbB.close();
          }
        }
      }
    }
  });

  it("block stored on one node is retrievable from the other after connection", { timeout: 60_000 }, async () => {
    // A focused test: one block, two nodes, verify bitswap fetch.
    dbA = new Database(join(tmpDir, "single-a.db"));
    dbB = new Database(join(tmpDir, "single-b.db"));
    nodeA = await createTestHeliaNode(dbA);
    nodeB = await createTestHeliaNode(dbB);

    // Connect
    const addrsA = nodeA.libp2p.getMultiaddrs();
    await nodeB.libp2p.dial(addrsA[0]!);
    await waitFor(
      () => nodeB!.libp2p.getConnections().length > 0,
      5_000,
    );

    // Store a single block on node A
    const data = new TextEncoder().encode(
      "single block e2e test payload",
    );
    const cid = await cidFromBytes(data);
    await nodeA.blockstore.put(cid, data);

    // Fetch from node B (will use bitswap to get from node A)
    const signal = AbortSignal.timeout(15_000);
    const fetched = await collectBytes(
      nodeB.blockstore.get(cid, { signal }) as any,
    );

    expect(fetched).toEqual(data);

    // Verify it is now cached locally on node B
    const cachedLocally = await collectBytes(
      nodeB.blockstore.get(cid, { offline: true }) as any,
    );
    expect(cachedLocally).toEqual(data);
  });

  it("nodes discover each other's peer IDs after connection", { timeout: 30_000 }, async () => {
    dbA = new Database(join(tmpDir, "disc-a.db"));
    dbB = new Database(join(tmpDir, "disc-b.db"));
    nodeA = await createTestHeliaNode(dbA);
    nodeB = await createTestHeliaNode(dbB);

    const peerIdA = nodeA.libp2p.peerId;
    const peerIdB = nodeB.libp2p.peerId;

    // Before connection, neither has a connection to the other
    expect(nodeA.libp2p.getConnections(peerIdB)).toHaveLength(0);
    expect(nodeB.libp2p.getConnections(peerIdA)).toHaveLength(0);

    // Connect B -> A
    const addrsA = nodeA.libp2p.getMultiaddrs();
    await nodeB.libp2p.dial(addrsA[0]!);

    // After connection, both should see the connection. Only the dialed
    // side (A) is polled here; B's side is asserted directly below.
    await waitFor(
      () => nodeA!.libp2p.getConnections(peerIdB).length > 0,
      5_000,
    );

    expect(
      nodeA.libp2p.getConnections(peerIdB).length,
    ).toBeGreaterThan(0);
    expect(
      nodeB.libp2p.getConnections(peerIdA).length,
    ).toBeGreaterThan(0);
  });

  it("bidirectional block exchange works", { timeout: 60_000 }, async () => {
    dbA = new Database(join(tmpDir, "bidir-a.db"));
    dbB = new Database(join(tmpDir, "bidir-b.db"));
    nodeA = await createTestHeliaNode(dbA);
    nodeB = await createTestHeliaNode(dbB);

    // Connect
    await nodeB.libp2p.dial(nodeA.libp2p.getMultiaddrs()[0]!);
    await waitFor(
      () =>
        nodeA!.libp2p.getConnections().length > 0 &&
        nodeB!.libp2p.getConnections().length > 0,
      5_000,
    );

    // Store block on A, different block on B
    const dataA = new TextEncoder().encode("block from A");
    const dataB = new TextEncoder().encode("block from B");
    const cidA = await cidFromBytes(dataA);
    const cidB = await cidFromBytes(dataB);

    await nodeA.blockstore.put(cidA, dataA);
    await nodeB.blockstore.put(cidB, dataB);

    // B fetches A's block. Each direction gets its own 15s budget so a
    // slow first fetch cannot eat into the time allowed for the second.
    const fetchedFromA = await collectBytes(
      nodeB.blockstore.get(cidA, {
        signal: AbortSignal.timeout(15_000),
      }) as any,
    );
    expect(fetchedFromA).toEqual(dataA);

    // A fetches B's block
    const fetchedFromB = await collectBytes(
      nodeA.blockstore.get(cidB, {
        signal: AbortSignal.timeout(15_000),
      }) as any,
    );
    expect(fetchedFromB).toEqual(dataB);
  });
});