atproto user agency toolkit for individuals and groups
at main 680 lines 21 kB view raw
1import { CID } from "multiformats"; 2import type { Helia } from "@helia/interface"; 3import type { BlockMap } from "@atproto/repo"; 4import type Database from "better-sqlite3"; 5import { encode as cborEncode, decode as cborDecode } from "./cbor-compat.js"; 6import { SqliteBlockstore } from "./sqlite-blockstore.js"; 7import { SqliteDatastore } from "./sqlite-datastore.js"; 8import type { RateLimiter } from "./rate-limiter.js"; 9import { DEFAULT_RATE_LIMIT_CONFIG } from "./rate-limiter.js"; 10 11/** Maximum gossipsub message size (8 KB). Messages larger than this are dropped before decoding. */ 12const MAX_GOSSIPSUB_MESSAGE_SIZE = 8192; 13 14/** 15 * Pure storage: put, get, has blocks by CID string. 16 * No networking, no peer identity — just content-addressed bytes. 17 */ 18export interface BlockStore { 19 putBlock(cidStr: string, bytes: Uint8Array): Promise<void>; 20 getBlock(cidStr: string): Promise<Uint8Array | null>; 21 hasBlock(cidStr: string): Promise<boolean>; 22 putBlocks(blocks: BlockMap): Promise<void>; 23 deleteBlock(cidStr: string): Promise<void>; 24} 25 26/** 27 * Lightweight gossipsub notification for a repo commit. 28 * Contains only metadata — actual blocks are fetched via existing sync mechanisms. 29 */ 30export interface CommitNotification { 31 did: string; 32 commit: string; 33 rev: string; 34 time: string; 35 peer: string; 36} 37 38export type CommitNotificationHandler = (notification: CommitNotification) => void | Promise<void>; 39 40export const COMMIT_TOPIC_PREFIX = "/p2pds/commits/1/"; 41 42export function commitTopic(did: string): string { 43 return `${COMMIT_TOPIC_PREFIX}${did}`; 44} 45 46/** 47 * Lightweight gossipsub notification for a peer identity change. 48 * Published when a peer's multiaddrs or PeerID changes. 49 */ 50export interface IdentityNotification { 51 did: string; 52 peerId: string; 53 multiaddrs: string[]; 54 time: string; 55} 56 57export type IdentityNotificationHandler = (notification: IdentityNotification) => void | Promise<void>; 58 59export const IDENTITY_TOPIC_PREFIX = "/p2pds/identity/1/"; 60 61export function identityTopic(did: string): string { 62 return `${IDENTITY_TOPIC_PREFIX}${did}`; 63} 64 65/** 66 * P2P networking: content routing, peer identity, connectivity, gossipsub. 67 * Separated from storage so transports can be swapped independently. 68 */ 69export interface NetworkService { 70 provideBlocks(cidStrs: string[]): Promise<void>; 71 publishCommitNotification( 72 did: string, 73 commitCid: string, 74 rev: string, 75 ): Promise<void>; 76 onCommitNotification(handler: CommitNotificationHandler): void; 77 subscribeCommitTopics(dids: string[]): void; 78 unsubscribeCommitTopics(dids: string[]): void; 79 getPeerId(): string | null; 80 getMultiaddrs(): string[]; 81 getConnectionCount(): number; 82 getRemoteAddrs(peerId: string): string[]; 83 publishIdentityNotification(did: string, peerId: string, multiaddrs: string[]): Promise<void>; 84 onIdentityNotification(handler: IdentityNotificationHandler): void; 85 subscribeIdentityTopics(dids: string[]): void; 86 unsubscribeIdentityTopics(dids: string[]): void; 87 provideForDid(did: string): Promise<void>; 88 findProvidersForDid(did: string): Promise<string[]>; 89} 90 91export interface IpfsConfig { 92 db: Database.Database; 93 networking: boolean; 94} 95 96/** 97 * IpfsService encapsulates all Helia/IPFS functionality. 98 * 99 * Implements both BlockStore (storage) and NetworkService (P2P networking). 100 * When networking is enabled, a full Helia node is created (libp2p + bitswap + DHT + gossipsub). 101 * When networking is disabled, only a local FsBlockstore is used (for testing). 102 * 103 * All methods no-op gracefully if the service hasn't started yet. 104 * String-based CID interface at the boundary — decouples from @atproto's CID class. 105 */ 106export class IpfsService implements BlockStore, NetworkService { 107 private helia: Helia | null = null; 108 private blockstore: SqliteBlockstore | null = null; 109 private config: IpfsConfig; 110 private running = false; 111 private commitHandlers: CommitNotificationHandler[] = []; 112 private identityHandlers: IdentityNotificationHandler[] = []; 113 private subscribedTopics: Set<string> = new Set(); 114 private rateLimiter: RateLimiter | null = null; 115 116 constructor(config: IpfsConfig) { 117 this.config = config; 118 } 119 120 /** 121 * Set a rate limiter for gossipsub message rate limiting. 122 * Called from server.ts after construction. 123 */ 124 setRateLimiter(limiter: RateLimiter): void { 125 this.rateLimiter = limiter; 126 } 127 128 async start(): Promise<void> { 129 this.blockstore = new SqliteBlockstore(this.config.db); 130 131 if (this.config.networking) { 132 const { createHelia } = await import("helia"); 133 const { tcp } = await import("@libp2p/tcp"); 134 const { noise } = await import("@chainsafe/libp2p-noise"); 135 const { yamux } = await import("@chainsafe/libp2p-yamux"); 136 const { identify } = await import("@libp2p/identify"); 137 const { autoNAT } = await import("@libp2p/autonat"); 138 const { kadDHT, removePrivateAddressesMapper } = await import("@libp2p/kad-dht"); 139 const { bootstrap } = await import("@libp2p/bootstrap"); 140 const { ping } = await import("@libp2p/ping"); 141 const datastore = new SqliteDatastore(this.config.db); 142 143 // libp2p config: direct peer connections + lightweight diagnostics. 144 // - Amino DHT in client mode: queries the public IPFS DHT for content 145 // routing and PLC log discovery, but doesn't serve DHT queries. 146 // - AutoNAT: asks connected peers to dial us back, confirming reachability. 147 // - Bootstrap: connects to standard IPFS bootstrap peers on startup. 148 // Cast SQLite stores to Helia's expected interfaces. 149 // Our implementations are duck-type compatible but use Promise 150 // returns instead of AwaitGenerator, which Helia handles fine at runtime. 151 this.helia = await createHelia({ 152 libp2p: { 153 addresses: { listen: ["/ip4/0.0.0.0/tcp/0"] }, 154 transports: [tcp()], 155 connectionEncrypters: [noise()], 156 streamMuxers: [yamux()], 157 services: { 158 identify: identify(), 159 ping: ping(), 160 autoNAT: autoNAT(), 161 aminoDHT: kadDHT({ 162 protocol: "/ipfs/kad/1.0.0", 163 peerInfoMapper: removePrivateAddressesMapper, 164 clientMode: true, 165 }), 166 }, 167 peerDiscovery: [ 168 bootstrap({ 169 list: [ 170 "/dnsaddr/bootstrap.libp2p.io/p2p/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN", 171 "/dnsaddr/bootstrap.libp2p.io/p2p/QmQCU2EcMqAqQPR2i9bChDtGNJchTbq5TbXJJ16u19uLTa", 172 ], 173 }), 174 ], 175 connectionManager: { 176 maxConnections: 10, 177 maxIncomingPendingConnections: 3, 178 inboundConnectionThreshold: 2, 179 }, 180 }, 181 blockstore: this.blockstore as any, 182 datastore: datastore as any, 183 }); 184 } 185 186 this.running = true; 187 } 188 189 async stop(): Promise<void> { 190 if (this.helia) { 191 await this.helia.stop(); 192 this.helia = null; 193 } 194 this.blockstore = null; 195 this.running = false; 196 this.subscribedTopics.clear(); 197 } 198 199 async putBlock(cidStr: string, bytes: Uint8Array): Promise<void> { 200 if (!this.blockstore) return; 201 const cid = CID.parse(cidStr); 202 if (this.helia) { 203 await this.helia.blockstore.put(cid, bytes); 204 } else { 205 await this.blockstore.put(cid, bytes); 206 } 207 } 208 209 async putBlocks(blocks: BlockMap): Promise<void> { 210 if (!this.blockstore) return; 211 const internalMap = ( 212 blocks as unknown as { map: Map<string, Uint8Array> } 213 ).map; 214 if (!internalMap) return; 215 for (const [cidStr, bytes] of internalMap) { 216 await this.putBlock(cidStr, bytes); 217 } 218 } 219 220 async getBlock(cidStr: string): Promise<Uint8Array | null> { 221 if (!this.blockstore) return null; 222 try { 223 const cid = CID.parse(cidStr); 224 // Both Helia's blockstore and SqliteBlockstore return AsyncGenerator<Uint8Array> 225 const gen = this.helia 226 ? this.helia.blockstore.get(cid, { offline: true }) 227 : this.blockstore.get(cid); 228 const chunks: Uint8Array[] = []; 229 for await (const chunk of gen) { 230 chunks.push(chunk); 231 } 232 if (chunks.length === 0) return null; 233 if (chunks.length === 1) return chunks[0]!; 234 const total = chunks.reduce((acc, c) => acc + c.length, 0); 235 const result = new Uint8Array(total); 236 let offset = 0; 237 for (const c of chunks) { 238 result.set(c, offset); 239 offset += c.length; 240 } 241 return result; 242 } catch { 243 return null; 244 } 245 } 246 247 async hasBlock(cidStr: string): Promise<boolean> { 248 if (!this.blockstore) return false; 249 try { 250 const cid = CID.parse(cidStr); 251 if (this.helia) { 252 return await this.helia.blockstore.has(cid); 253 } 254 return await this.blockstore.has(cid); 255 } catch { 256 return false; 257 } 258 } 259 260 async deleteBlock(cidStr: string): Promise<void> { 261 if (!this.blockstore) return; 262 try { 263 const cid = CID.parse(cidStr); 264 if (this.helia) { 265 await this.helia.blockstore.delete(cid); 266 } else { 267 await this.blockstore.delete(cid); 268 } 269 } catch { 270 // No-op if block doesn't exist 271 } 272 } 273 274 /** 275 * Announce blocks to the DHT. Fire-and-forget, non-blocking. 276 * Only works when networking is enabled. 277 */ 278 async provideBlocks(cidStrs: string[]): Promise<void> { 279 if (!this.helia) return; 280 for (const cidStr of cidStrs) { 281 try { 282 const cid = CID.parse(cidStr); 283 await this.helia.routing.provide(cid); 284 } catch { 285 // fire-and-forget: log but don't throw 286 } 287 } 288 } 289 290 /** 291 * Publish a commit notification via gossipsub. 292 * CBOR-encodes { did, commit, rev, time, peer } and publishes to the DID's topic. 293 * No-op when networking is disabled. 294 */ 295 async publishCommitNotification( 296 did: string, 297 commitCid: string, 298 rev: string, 299 ): Promise<void> { 300 if (!this.helia) return; 301 const pubsub = (this.helia.libp2p.services as Record<string, unknown>).pubsub as 302 { publish(topic: string, data: Uint8Array): Promise<unknown> } | undefined; 303 if (!pubsub) return; 304 305 const notification: CommitNotification = { 306 did, 307 commit: commitCid, 308 rev, 309 time: new Date().toISOString(), 310 peer: this.helia.libp2p.peerId.toString(), 311 }; 312 313 const data = cborEncode(notification); 314 await pubsub.publish(commitTopic(did), data); 315 } 316 317 /** 318 * Register a handler for incoming commit notifications. 319 * Multiple handlers can be registered; all are called for each notification. 320 */ 321 onCommitNotification(handler: CommitNotificationHandler): void { 322 this.commitHandlers.push(handler); 323 } 324 325 /** 326 * Subscribe to gossipsub topics for the given DIDs. 327 * No-op when networking is disabled. 328 */ 329 subscribeCommitTopics(dids: string[]): void { 330 if (!this.helia) return; 331 const pubsub = (this.helia.libp2p.services as Record<string, unknown>).pubsub as 332 { subscribe(topic: string): void } | undefined; 333 if (!pubsub) return; 334 335 for (const did of dids) { 336 const topic = commitTopic(did); 337 if (!this.subscribedTopics.has(topic)) { 338 pubsub.subscribe(topic); 339 this.subscribedTopics.add(topic); 340 } 341 } 342 } 343 344 /** 345 * Unsubscribe from gossipsub topics for the given DIDs. 346 * No-op when networking is disabled. 347 */ 348 unsubscribeCommitTopics(dids: string[]): void { 349 if (!this.helia) return; 350 const pubsub = (this.helia.libp2p.services as Record<string, unknown>).pubsub as 351 { unsubscribe(topic: string): void } | undefined; 352 if (!pubsub) return; 353 354 for (const did of dids) { 355 const topic = commitTopic(did); 356 if (this.subscribedTopics.has(topic)) { 357 pubsub.unsubscribe(topic); 358 this.subscribedTopics.delete(topic); 359 } 360 } 361 } 362 363 /** 364 * Publish an identity notification via gossipsub. 365 * CBOR-encodes { did, peerId, multiaddrs, time } and publishes to the DID's identity topic. 366 */ 367 async publishIdentityNotification( 368 did: string, 369 peerId: string, 370 multiaddrs: string[], 371 ): Promise<void> { 372 if (!this.helia) return; 373 const pubsub = (this.helia.libp2p.services as Record<string, unknown>).pubsub as 374 { publish(topic: string, data: Uint8Array): Promise<unknown> } | undefined; 375 if (!pubsub) return; 376 377 const notification: IdentityNotification = { 378 did, 379 peerId, 380 multiaddrs, 381 time: new Date().toISOString(), 382 }; 383 384 const data = cborEncode(notification); 385 await pubsub.publish(identityTopic(did), data); 386 } 387 388 /** 389 * Register a handler for incoming identity notifications. 390 */ 391 onIdentityNotification(handler: IdentityNotificationHandler): void { 392 this.identityHandlers.push(handler); 393 } 394 395 /** 396 * Subscribe to identity gossipsub topics for the given DIDs. 397 */ 398 subscribeIdentityTopics(dids: string[]): void { 399 if (!this.helia) return; 400 const pubsub = (this.helia.libp2p.services as Record<string, unknown>).pubsub as 401 { subscribe(topic: string): void } | undefined; 402 if (!pubsub) return; 403 404 for (const did of dids) { 405 const topic = identityTopic(did); 406 if (!this.subscribedTopics.has(topic)) { 407 pubsub.subscribe(topic); 408 this.subscribedTopics.add(topic); 409 } 410 } 411 } 412 413 /** 414 * Unsubscribe from identity gossipsub topics for the given DIDs. 415 */ 416 unsubscribeIdentityTopics(dids: string[]): void { 417 if (!this.helia) return; 418 const pubsub = (this.helia.libp2p.services as Record<string, unknown>).pubsub as 419 { unsubscribe(topic: string): void } | undefined; 420 if (!pubsub) return; 421 422 for (const did of dids) { 423 const topic = identityTopic(did); 424 if (this.subscribedTopics.has(topic)) { 425 pubsub.unsubscribe(topic); 426 this.subscribedTopics.delete(topic); 427 } 428 } 429 } 430 431 /** 432 * Set up the gossipsub message handler. 433 * Listens for "message" events, CBOR-decodes, and dispatches to all registered handlers. 434 */ 435 private setupGossipsubHandler(): void { 436 if (!this.helia) return; 437 const pubsub = (this.helia.libp2p.services as Record<string, unknown>).pubsub as 438 { addEventListener(event: string, handler: (evt: unknown) => void): void } | undefined; 439 if (!pubsub) return; 440 441 pubsub.addEventListener("message", (evt: unknown) => { 442 try { 443 const detail = (evt as { detail: { topic: string; data: Uint8Array } }).detail; 444 445 // Drop oversized messages before decoding 446 if (detail.data.length > MAX_GOSSIPSUB_MESSAGE_SIZE) { 447 return; 448 } 449 450 // Per-topic rate limiting via the shared RateLimiter 451 if (this.rateLimiter) { 452 const isCommit = detail.topic.startsWith(COMMIT_TOPIC_PREFIX); 453 const isIdentity = detail.topic.startsWith(IDENTITY_TOPIC_PREFIX); 454 if (isCommit || isIdentity) { 455 const rule = isCommit 456 ? DEFAULT_RATE_LIMIT_CONFIG.gossipsubCommit 457 : DEFAULT_RATE_LIMIT_CONFIG.gossipsubIdentity; 458 const result = this.rateLimiter.check("gossipsub", detail.topic, rule); 459 if (!result.allowed) return; // silently drop 460 } 461 } 462 463 if (detail.topic.startsWith(COMMIT_TOPIC_PREFIX)) { 464 const notification = cborDecode(detail.data) as CommitNotification; 465 if ( 466 typeof notification.did !== "string" || 467 typeof notification.commit !== "string" || 468 typeof notification.rev !== "string" 469 ) { 470 return; 471 } 472 473 for (const handler of this.commitHandlers) { 474 try { 475 const result = handler(notification); 476 if (result && typeof (result as Promise<void>).catch === "function") { 477 (result as Promise<void>).catch(() => {}); 478 } 479 } catch { 480 // Individual handler errors don't affect other handlers 481 } 482 } 483 } else if (detail.topic.startsWith(IDENTITY_TOPIC_PREFIX)) { 484 const notification = cborDecode(detail.data) as IdentityNotification; 485 if ( 486 typeof notification.did !== "string" || 487 typeof notification.peerId !== "string" || 488 !Array.isArray(notification.multiaddrs) 489 ) { 490 return; 491 } 492 493 for (const handler of this.identityHandlers) { 494 try { 495 const result = handler(notification); 496 if (result && typeof (result as Promise<void>).catch === "function") { 497 (result as Promise<void>).catch(() => {}); 498 } 499 } catch { 500 // Individual handler errors don't affect other handlers 501 } 502 } 503 } 504 } catch { 505 // Malformed messages are silently dropped 506 } 507 }); 508 } 509 510 getMultiaddrs(): string[] { 511 if (!this.helia) return []; 512 return this.helia.libp2p.getMultiaddrs().map(ma => ma.toString()); 513 } 514 515 getPeerId(): string | null { 516 if (!this.helia) return null; 517 return this.helia.libp2p.peerId.toString(); 518 } 519 520 getConnectionCount(): number { 521 if (!this.helia) return 0; 522 return this.helia.libp2p.getConnections().length; 523 } 524 525 getRemoteAddrs(peerId: string): string[] { 526 if (!this.helia) return []; 527 return this.helia.libp2p 528 .getConnections() 529 .filter((conn) => conn.remotePeer.toString() === peerId) 530 .map((conn) => conn.remoteAddr.toString()); 531 } 532 533 /** 534 * Announce this node as a provider for a DID's PLC log via the DHT. 535 * Computes a deterministic CID from the DID and calls routing.provide(). 536 */ 537 async provideForDid(did: string): Promise<void> { 538 if (!this.helia) return; 539 try { 540 const { computeDiscoveryCid } = await import("./identity/plc-mirror.js"); 541 const cidStr = await computeDiscoveryCid(did); 542 const cid = CID.parse(cidStr); 543 await this.helia.routing.provide(cid); 544 } catch { 545 // fire-and-forget 546 } 547 } 548 549 /** 550 * Find providers for a DID's PLC log via the DHT. 551 * Returns multiaddrs of peers that have announced they hold this DID's log. 552 */ 553 async findProvidersForDid(did: string): Promise<string[]> { 554 if (!this.helia) return []; 555 try { 556 const { computeDiscoveryCid } = await import("./identity/plc-mirror.js"); 557 const cidStr = await computeDiscoveryCid(did); 558 const cid = CID.parse(cidStr); 559 const providers: string[] = []; 560 for await (const provider of this.helia.routing.findProviders(cid)) { 561 for (const ma of provider.multiaddrs) { 562 providers.push(ma.toString()); 563 } 564 if (providers.length >= 10) break; 565 } 566 return providers; 567 } catch { 568 return []; 569 } 570 } 571 572 /** 573 * Delete all blocks from the underlying blockstore. 574 * Used during full disconnect to wipe the node clean. 575 */ 576 clearBlockstore(): void { 577 if (this.blockstore) { 578 this.blockstore.clear(); 579 } 580 } 581 582 /** 583 * Get dialability diagnostics: listening addrs, public addrs, NAT status, 584 * and HTTP reachability (if PUBLIC_URL is configured). 585 */ 586 async getDialability(publicUrl?: string): Promise<{ 587 listeningAddrs: string[]; 588 publicAddrs: string[]; 589 natStatus: "unknown" | "public" | "private"; 590 httpReachable: boolean | null; 591 }> { 592 const allAddrs = this.getMultiaddrs(); 593 const publicAddrs = allAddrs.filter((a) => !isPrivateMultiaddr(a)); 594 595 // Determine NAT status from multiaddr analysis 596 // getMultiaddrs() includes addresses confirmed by identify/autoNAT 597 let natStatus: "unknown" | "public" | "private" = "unknown"; 598 if (this.helia) { 599 // If we have public addresses in our advertised multiaddrs, 600 // that indicates we're publicly reachable 601 if (publicAddrs.length > 0) { 602 natStatus = "public"; 603 } else if (allAddrs.length > 0) { 604 natStatus = "private"; 605 } 606 } 607 608 // Check HTTP reachability 609 let httpReachable: boolean | null = null; 610 if (publicUrl) { 611 try { 612 const res = await fetch(publicUrl + "/health", { 613 signal: AbortSignal.timeout(5000), 614 }); 615 httpReachable = res.ok; 616 } catch { 617 httpReachable = false; 618 } 619 } 620 621 return { listeningAddrs: allAddrs, publicAddrs, natStatus, httpReachable }; 622 } 623 624 isRunning(): boolean { 625 return this.running; 626 } 627 628 /** 629 * Get the underlying libp2p node, if networking is enabled. 630 * Typed as `unknown` at the boundary to avoid importing libp2p types 631 * into this module. Callers cast to `Libp2p` when needed. 632 */ 633 getLibp2p(): unknown | null { 634 return this.helia?.libp2p ?? null; 635 } 636 637 /** 638 * Dial a specific multiaddr to establish a connection. 639 * No-op when networking is disabled. 640 */ 641 async dial(multiaddr: string): Promise<void> { 642 if (!this.helia) return; 643 const { multiaddr: ma } = await import("@multiformats/multiaddr"); 644 await this.helia.libp2p.dial(ma(multiaddr)); 645 } 646} 647 648/** 649 * Check if a multiaddr string contains a private/loopback IP address. 650 * Filters: 127.x.x.x, 10.x.x.x, 192.168.x.x, 172.16-31.x.x, ::1, fe80:: 651 */ 652function isPrivateMultiaddr(addr: string): boolean { 653 // Extract the IP from the multiaddr (e.g. /ip4/10.0.0.1/tcp/4001 → 10.0.0.1) 654 const ip4Match = addr.match(/\/ip4\/([\d.]+)/); 655 if (ip4Match) { 656 const ip = ip4Match[1]!; 657 if (ip.startsWith("127.")) return true; 658 if (ip.startsWith("10.")) return true; 659 if (ip.startsWith("192.168.")) return true; 660 // 172.16.0.0 - 172.31.255.255 661 const parts = ip.split("."); 662 if (parts[0] === "172") { 663 const second = parseInt(parts[1]!, 10); 664 if (second >= 16 && second <= 31) return true; 665 } 666 return false; 667 } 668 669 const ip6Match = addr.match(/\/ip6\/([a-fA-F0-9:]+)/); 670 if (ip6Match) { 671 const ip = ip6Match[1]!.toLowerCase(); 672 if (ip === "::1") return true; 673 if (ip.startsWith("fe80")) return true; 674 if (ip === "::") return true; 675 return false; 676 } 677 678 // dnsaddr and other non-IP addrs are considered public 679 return false; 680}