atproto user agency toolkit for individuals and groups
at main 379 lines 12 kB view raw
/**
 * End-to-end networking integration test.
 *
 * Proves two p2pds nodes can discover each other and exchange data
 * over the network using libp2p/Helia bitswap.
 *
 * These tests create real Helia nodes with TCP networking on localhost,
 * connect them directly, and verify block exchange via bitswap.
 */

import { describe, it, expect, beforeEach, afterEach } from "vitest";
import { mkdtempSync, rmSync } from "node:fs";
import { tmpdir } from "node:os";
import { join } from "node:path";
import Database from "better-sqlite3";
import { CID } from "multiformats";
// @ts-ignore -- multiformats v9 subpath exports lack type declarations
import * as raw from "multiformats/codecs/raw";
// @ts-ignore -- multiformats v9 subpath exports lack type declarations
import { sha256 } from "multiformats/hashes/sha2";
import type { Helia } from "@helia/interface";
import { SqliteBlockstore } from "../sqlite-blockstore.js";
import { SqliteDatastore } from "../sqlite-datastore.js";

/**
 * Create a CID from raw bytes using SHA-256.
 *
 * @param bytes - The block payload to address.
 * @returns A version-1 CID using the `raw` codec and the SHA-256 digest of `bytes`.
 */
async function cidFromBytes(bytes: Uint8Array): Promise<CID> {
  const hash = await sha256.digest(bytes);
  return CID.create(1, raw.code, hash);
}

/**
 * Collect an async iterable of Uint8Array chunks into a single Uint8Array.
 *
 * Helia/interface-blockstore's get() returns AsyncGenerator<Uint8Array>,
 * not a plain Uint8Array. The chunks may be Node.js Buffers, so we
 * normalize to a plain Uint8Array for consistent comparison.
 *
 * @param gen - Source of chunks; it is drained completely.
 * @returns The concatenation of all chunks. For zero chunks this is an empty
 *   array; for exactly one chunk it is a plain Uint8Array *view* over that
 *   chunk's memory (no copy); otherwise it is a freshly allocated array.
 */
async function collectBytes(
  gen: AsyncIterable<Uint8Array>,
): Promise<Uint8Array> {
  const chunks: Uint8Array[] = [];
  for await (const chunk of gen) {
    chunks.push(chunk);
  }

  if (chunks.length === 0) return new Uint8Array(0);

  // Always return a plain Uint8Array (not a Buffer subclass).
  if (chunks.length === 1) {
    const only = chunks[0]!;
    return new Uint8Array(only.buffer, only.byteOffset, only.byteLength);
  }

  const total = chunks.reduce((acc, c) => acc + c.length, 0);
  const result = new Uint8Array(total);
  let offset = 0;
  for (const c of chunks) {
    result.set(c, offset);
    offset += c.length;
  }
  return result;
}

/**
 * Create a minimal Helia node with TCP-only networking on localhost.
 *
 * Strips out bootstrap peers, mDNS, delegated routing, autoNAT, autoTLS,
 * uPnP, circuit relay, WebRTC, and WebSockets to avoid any external
 * network dependencies. Nodes must be connected manually via dial().
 *
 * @param db - Open SQLite database backing both the blockstore and the
 *   datastore of the node. The caller keeps ownership and must close it.
 * @returns The created Helia node; the caller is responsible for `stop()`.
 */
async function createTestHeliaNode(
  db: Database.Database,
): Promise<Helia> {
  const { createHelia } = await import("helia");
  const { noise } = await import("@chainsafe/libp2p-noise");
  const { yamux } = await import("@chainsafe/libp2p-yamux");
  const { tcp } = await import("@libp2p/tcp");
  const { identify } = await import("@libp2p/identify");
  const { bitswap } = await import("@helia/block-brokers");
  const { libp2pRouting } = await import("@helia/routers");
  const { createLibp2p } = await import("libp2p");

  const blockstore = new SqliteBlockstore(db);
  const datastore = new SqliteDatastore(db);

  const libp2p = await createLibp2p({
    addresses: {
      // Port 0 lets the OS pick a free port so parallel nodes never collide.
      listen: ["/ip4/127.0.0.1/tcp/0"],
    },
    transports: [tcp()],
    connectionEncrypters: [noise()],
    streamMuxers: [yamux()],
    services: {
      identify: identify(),
    },
    // No peer discovery -- we connect manually
  });

  try {
    return await createHelia({
      libp2p,
      blockstore: blockstore as any,
      datastore: datastore as any,
      blockBrokers: [bitswap()],
      routers: [libp2pRouting(libp2p)],
    });
  } catch (err) {
    // The libp2p node exists (and, unless `start: false` is passed, is already
    // listening) at this point; release it so a failed Helia setup does not
    // leak a TCP listener. Cleanup is best-effort: the original error wins.
    await libp2p.stop().catch(() => {});
    throw err;
  }
}

/**
 * Wait for a condition to become true, with a timeout.
 *
 * The condition is always evaluated at least once, and once more when the
 * deadline is reached, so a condition that turns true during the final sleep
 * is still observed.
 *
 * @param fn - Condition to poll; may be sync or async.
 * @param timeoutMs - Maximum total time to wait, in milliseconds.
 * @param intervalMs - Delay between polls, in milliseconds.
 * @throws Error if the condition is still false once `timeoutMs` has elapsed.
 */
async function waitFor(
  fn: () => Promise<boolean> | boolean,
  timeoutMs: number = 10_000,
  intervalMs: number = 200,
): Promise<void> {
  const deadline = Date.now() + timeoutMs;
  for (;;) {
    if (await fn()) return;

    const remaining = deadline - Date.now();
    if (remaining <= 0) break;

    // Never sleep past the deadline; the loop re-checks right after waking.
    const delay = Math.min(intervalMs, remaining);
    await new Promise((r) => setTimeout(r, delay));
  }
  throw new Error(`waitFor timed out after ${timeoutMs}ms`);
}

describe("E2E networking: two Helia nodes", () => {
  // Shared handles so afterEach can clean up even when a test fails midway.
  let tmpDir: string;
  let nodeA: Helia | null = null;
  let nodeB: Helia | null = null;
  let dbA: Database.Database | null = null;
  let dbB: Database.Database | null = null;

  beforeEach(() => {
    tmpDir = mkdtempSync(join(tmpdir(), "e2e-networking-test-"));
  });

  afterEach(async () => {
    // Stop nodes in parallel for faster cleanup. Stop failures are ignored on
    // purpose: teardown is best-effort and must not mask the test result.
    await Promise.all(
      [nodeA, nodeB].map((node) => node?.stop().catch(() => {})),
    );
    nodeA = null;
    nodeB = null;

    // Nested try/finally: a failing close() on one database must not prevent
    // closing the other one or removing the temp directory.
    try {
      dbA?.close();
    } finally {
      dbA = null;
      try {
        dbB?.close();
      } finally {
        dbB = null;
        rmSync(tmpDir, { recursive: true, force: true });
      }
    }
  });

  it("nodes can connect and exchange blocks via bitswap", { timeout: 60_000 }, async () => {
    // 1. Create two Helia nodes with real TCP networking on localhost
    dbA = new Database(join(tmpDir, "node-a.db"));
    dbB = new Database(join(tmpDir, "node-b.db"));
    nodeA = await createTestHeliaNode(dbA);
    nodeB = await createTestHeliaNode(dbB);

    // 2. Verify both nodes are running and have addresses
    const addrsA = nodeA.libp2p.getMultiaddrs();
    const addrsB = nodeB.libp2p.getMultiaddrs();
    expect(addrsA.length).toBeGreaterThan(0);
    expect(addrsB.length).toBeGreaterThan(0);

    const peerIdA = nodeA.libp2p.peerId.toString();
    const peerIdB = nodeB.libp2p.peerId.toString();
    expect(peerIdA).toBeTruthy();
    expect(peerIdB).toBeTruthy();
    expect(peerIdA).not.toBe(peerIdB);

    // 3. Connect node B to node A
    await nodeB.libp2p.dial(addrsA[0]!);

    // Wait for the connection to be established in both directions
    await waitFor(
      () =>
        nodeA!.libp2p.getConnections().length > 0 &&
        nodeB!.libp2p.getConnections().length > 0,
      5_000,
    );

    expect(nodeA.libp2p.getConnections().length).toBeGreaterThan(0);
    expect(nodeB.libp2p.getConnections().length).toBeGreaterThan(0);

    // 4. Store blocks on node A
    const encoder = new TextEncoder();
    const testData = [
      encoder.encode("hello from node A"),
      encoder.encode("second block of data"),
      encoder.encode("third block for good measure"),
    ];

    const cids: CID[] = [];
    for (const bytes of testData) {
      const cid = await cidFromBytes(bytes);
      await nodeA.blockstore.put(cid, bytes);
      cids.push(cid);
    }

    // Verify blocks are on node A
    for (const cid of cids) {
      expect(await nodeA.blockstore.has(cid)).toBe(true);
    }

    // Verify blocks are NOT on node B yet
    for (const cid of cids) {
      expect(
        await nodeB.blockstore.has(cid, { offline: true } as any),
      ).toBe(false);
    }

    // 5. Retrieve blocks on node B via bitswap (network fetch)
    // blockstore.get() returns AsyncGenerator<Uint8Array>
    for (let i = 0; i < cids.length; i++) {
      const cid = cids[i]!;
      // Fresh signal per block so every fetch gets the full 15s budget.
      const signal = AbortSignal.timeout(15_000);
      const retrieved = await collectBytes(
        nodeB.blockstore.get(cid, { signal }) as any,
      );

      expect(retrieved).toBeDefined();
      expect(retrieved.length).toBe(testData[i]!.length);
      expect(retrieved).toEqual(testData[i]!);
    }

    // 6. Verify blocks are now cached on node B
    for (const cid of cids) {
      expect(await nodeB.blockstore.has(cid)).toBe(true);
    }
  });

  it("IpfsService instances can be connected and peer info is correct", { timeout: 60_000 }, async () => {
    // This test verifies that IpfsService with networking=true
    // exposes correct peer identity and multiaddr information.
    const { IpfsService } = await import("../ipfs.js");

    const svcDbA = new Database(join(tmpDir, "svc-a.db"));
    const svcDbB = new Database(join(tmpDir, "svc-b.db"));
    const serviceA = new IpfsService({
      db: svcDbA,
      networking: true,
    });
    const serviceB = new IpfsService({
      db: svcDbB,
      networking: true,
    });

    try {
      await serviceA.start();
      await serviceB.start();

      // Verify peer IDs are present and distinct
      const peerIdA = serviceA.getPeerId();
      const peerIdB = serviceB.getPeerId();
      expect(peerIdA).not.toBeNull();
      expect(peerIdB).not.toBeNull();
      expect(peerIdA).not.toBe(peerIdB);

      // Verify multiaddrs are available
      const addrsA = serviceA.getMultiaddrs();
      const addrsB = serviceB.getMultiaddrs();
      expect(addrsA.length).toBeGreaterThan(0);
      expect(addrsB.length).toBeGreaterThan(0);

      // Verify that IpfsService reports running
      expect(serviceA.isRunning()).toBe(true);
      expect(serviceB.isRunning()).toBe(true);
    } finally {
      // Nested try/finally so that a failing stop() on one service still
      // stops the other and closes both databases; errors still propagate.
      try {
        if (serviceA.isRunning()) await serviceA.stop();
      } finally {
        try {
          if (serviceB.isRunning()) await serviceB.stop();
        } finally {
          try {
            svcDbA.close();
          } finally {
            svcDbB.close();
          }
        }
      }
    }
  });

  it("block stored on one node is retrievable from the other after connection", { timeout: 60_000 }, async () => {
    // A focused test: one block, two nodes, verify bitswap fetch.
    dbA = new Database(join(tmpDir, "single-a.db"));
    dbB = new Database(join(tmpDir, "single-b.db"));
    nodeA = await createTestHeliaNode(dbA);
    nodeB = await createTestHeliaNode(dbB);

    // Connect
    const addrsA = nodeA.libp2p.getMultiaddrs();
    await nodeB.libp2p.dial(addrsA[0]!);
    await waitFor(
      () => nodeB!.libp2p.getConnections().length > 0,
      5_000,
    );

    // Store a single block on node A
    const data = new TextEncoder().encode(
      "single block e2e test payload",
    );
    const cid = await cidFromBytes(data);
    await nodeA.blockstore.put(cid, data);

    // Fetch from node B (will use bitswap to get from node A)
    const signal = AbortSignal.timeout(15_000);
    const fetched = await collectBytes(
      nodeB.blockstore.get(cid, { signal }) as any,
    );

    expect(fetched).toEqual(data);

    // Verify it is now cached locally on node B
    const cachedLocally = await collectBytes(
      nodeB.blockstore.get(cid, { offline: true }) as any,
    );
    expect(cachedLocally).toEqual(data);
  });

  it("nodes discover each other's peer IDs after connection", { timeout: 30_000 }, async () => {
    dbA = new Database(join(tmpDir, "disc-a.db"));
    dbB = new Database(join(tmpDir, "disc-b.db"));
    nodeA = await createTestHeliaNode(dbA);
    nodeB = await createTestHeliaNode(dbB);

    const peerIdA = nodeA.libp2p.peerId;
    const peerIdB = nodeB.libp2p.peerId;

    // Before connection, neither knows the other
    expect(nodeA.libp2p.getConnections(peerIdB)).toHaveLength(0);
    expect(nodeB.libp2p.getConnections(peerIdA)).toHaveLength(0);

    // Connect B -> A
    const addrsA = nodeA.libp2p.getMultiaddrs();
    await nodeB.libp2p.dial(addrsA[0]!);

    // After connection, both should see the connection
    await waitFor(
      () => nodeA!.libp2p.getConnections(peerIdB).length > 0,
      5_000,
    );

    expect(
      nodeA.libp2p.getConnections(peerIdB).length,
    ).toBeGreaterThan(0);
    expect(
      nodeB.libp2p.getConnections(peerIdA).length,
    ).toBeGreaterThan(0);
  });

  it("bidirectional block exchange works", { timeout: 60_000 }, async () => {
    dbA = new Database(join(tmpDir, "bidir-a.db"));
    dbB = new Database(join(tmpDir, "bidir-b.db"));
    nodeA = await createTestHeliaNode(dbA);
    nodeB = await createTestHeliaNode(dbB);

    // Connect
    await nodeB.libp2p.dial(nodeA.libp2p.getMultiaddrs()[0]!);
    await waitFor(
      () =>
        nodeA!.libp2p.getConnections().length > 0 &&
        nodeB!.libp2p.getConnections().length > 0,
      5_000,
    );

    // Store block on A, different block on B
    const dataA = new TextEncoder().encode("block from A");
    const dataB = new TextEncoder().encode("block from B");
    const cidA = await cidFromBytes(dataA);
    const cidB = await cidFromBytes(dataB);

    await nodeA.blockstore.put(cidA, dataA);
    await nodeB.blockstore.put(cidB, dataB);

    // B fetches A's block. Each direction gets its own timeout signal: a
    // shared signal would leave the second fetch only the time the first
    // one did not use.
    const fetchedFromA = await collectBytes(
      nodeB.blockstore.get(cidA, {
        signal: AbortSignal.timeout(15_000),
      }) as any,
    );
    expect(fetchedFromA).toEqual(dataA);

    // A fetches B's block
    const fetchedFromB = await collectBytes(
      nodeA.blockstore.get(cidB, {
        signal: AbortSignal.timeout(15_000),
      }) as any,
    );
    expect(fetchedFromB).toEqual(dataB);
  });
});