import { CID } from "multiformats"; import type { Helia } from "@helia/interface"; import type { BlockMap } from "@atproto/repo"; import type Database from "better-sqlite3"; import { encode as cborEncode, decode as cborDecode } from "./cbor-compat.js"; import { SqliteBlockstore } from "./sqlite-blockstore.js"; import { SqliteDatastore } from "./sqlite-datastore.js"; import type { RateLimiter } from "./rate-limiter.js"; import { DEFAULT_RATE_LIMIT_CONFIG } from "./rate-limiter.js"; /** Maximum gossipsub message size (8 KB). Messages larger than this are dropped before decoding. */ const MAX_GOSSIPSUB_MESSAGE_SIZE = 8192; /** * Pure storage: put, get, has blocks by CID string. * No networking, no peer identity — just content-addressed bytes. */ export interface BlockStore { putBlock(cidStr: string, bytes: Uint8Array): Promise; getBlock(cidStr: string): Promise; hasBlock(cidStr: string): Promise; putBlocks(blocks: BlockMap): Promise; deleteBlock(cidStr: string): Promise; } /** * Lightweight gossipsub notification for a repo commit. * Contains only metadata — actual blocks are fetched via existing sync mechanisms. */ export interface CommitNotification { did: string; commit: string; rev: string; time: string; peer: string; } export type CommitNotificationHandler = (notification: CommitNotification) => void | Promise; export const COMMIT_TOPIC_PREFIX = "/p2pds/commits/1/"; export function commitTopic(did: string): string { return `${COMMIT_TOPIC_PREFIX}${did}`; } /** * Lightweight gossipsub notification for a peer identity change. * Published when a peer's multiaddrs or PeerID changes. */ export interface IdentityNotification { did: string; peerId: string; multiaddrs: string[]; time: string; } export type IdentityNotificationHandler = (notification: IdentityNotification) => void | Promise; export const IDENTITY_TOPIC_PREFIX = "/p2pds/identity/1/"; export function identityTopic(did: string): string { return `${IDENTITY_TOPIC_PREFIX}${did}`; } /** * P2P networking: content routing, peer identity, connectivity, gossipsub. * Separated from storage so transports can be swapped independently. */ export interface NetworkService { provideBlocks(cidStrs: string[]): Promise; publishCommitNotification( did: string, commitCid: string, rev: string, ): Promise; onCommitNotification(handler: CommitNotificationHandler): void; subscribeCommitTopics(dids: string[]): void; unsubscribeCommitTopics(dids: string[]): void; getPeerId(): string | null; getMultiaddrs(): string[]; getConnectionCount(): number; getRemoteAddrs(peerId: string): string[]; publishIdentityNotification(did: string, peerId: string, multiaddrs: string[]): Promise; onIdentityNotification(handler: IdentityNotificationHandler): void; subscribeIdentityTopics(dids: string[]): void; unsubscribeIdentityTopics(dids: string[]): void; provideForDid(did: string): Promise; findProvidersForDid(did: string): Promise; } export interface IpfsConfig { db: Database.Database; networking: boolean; } /** * IpfsService encapsulates all Helia/IPFS functionality. * * Implements both BlockStore (storage) and NetworkService (P2P networking). * When networking is enabled, a full Helia node is created (libp2p + bitswap + DHT + gossipsub). * When networking is disabled, only a local FsBlockstore is used (for testing). * * All methods no-op gracefully if the service hasn't started yet. * String-based CID interface at the boundary — decouples from @atproto's CID class. */ export class IpfsService implements BlockStore, NetworkService { private helia: Helia | null = null; private blockstore: SqliteBlockstore | null = null; private config: IpfsConfig; private running = false; private commitHandlers: CommitNotificationHandler[] = []; private identityHandlers: IdentityNotificationHandler[] = []; private subscribedTopics: Set = new Set(); private rateLimiter: RateLimiter | null = null; constructor(config: IpfsConfig) { this.config = config; } /** * Set a rate limiter for gossipsub message rate limiting. * Called from server.ts after construction. */ setRateLimiter(limiter: RateLimiter): void { this.rateLimiter = limiter; } async start(): Promise { this.blockstore = new SqliteBlockstore(this.config.db); if (this.config.networking) { const { createHelia } = await import("helia"); const { tcp } = await import("@libp2p/tcp"); const { noise } = await import("@chainsafe/libp2p-noise"); const { yamux } = await import("@chainsafe/libp2p-yamux"); const { identify } = await import("@libp2p/identify"); const { autoNAT } = await import("@libp2p/autonat"); const { kadDHT, removePrivateAddressesMapper } = await import("@libp2p/kad-dht"); const { bootstrap } = await import("@libp2p/bootstrap"); const { ping } = await import("@libp2p/ping"); const datastore = new SqliteDatastore(this.config.db); // libp2p config: direct peer connections + lightweight diagnostics. // - Amino DHT in client mode: queries the public IPFS DHT for content // routing and PLC log discovery, but doesn't serve DHT queries. // - AutoNAT: asks connected peers to dial us back, confirming reachability. // - Bootstrap: connects to standard IPFS bootstrap peers on startup. // Cast SQLite stores to Helia's expected interfaces. // Our implementations are duck-type compatible but use Promise // returns instead of AwaitGenerator, which Helia handles fine at runtime. this.helia = await createHelia({ libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0"] }, transports: [tcp()], connectionEncrypters: [noise()], streamMuxers: [yamux()], services: { identify: identify(), ping: ping(), autoNAT: autoNAT(), aminoDHT: kadDHT({ protocol: "/ipfs/kad/1.0.0", peerInfoMapper: removePrivateAddressesMapper, clientMode: true, }), }, peerDiscovery: [ bootstrap({ list: [ "/dnsaddr/bootstrap.libp2p.io/p2p/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN", "/dnsaddr/bootstrap.libp2p.io/p2p/QmQCU2EcMqAqQPR2i9bChDtGNJchTbq5TbXJJ16u19uLTa", ], }), ], connectionManager: { maxConnections: 10, maxIncomingPendingConnections: 3, inboundConnectionThreshold: 2, }, }, blockstore: this.blockstore as any, datastore: datastore as any, }); } this.running = true; } async stop(): Promise { if (this.helia) { await this.helia.stop(); this.helia = null; } this.blockstore = null; this.running = false; this.subscribedTopics.clear(); } async putBlock(cidStr: string, bytes: Uint8Array): Promise { if (!this.blockstore) return; const cid = CID.parse(cidStr); if (this.helia) { await this.helia.blockstore.put(cid, bytes); } else { await this.blockstore.put(cid, bytes); } } async putBlocks(blocks: BlockMap): Promise { if (!this.blockstore) return; const internalMap = ( blocks as unknown as { map: Map } ).map; if (!internalMap) return; for (const [cidStr, bytes] of internalMap) { await this.putBlock(cidStr, bytes); } } async getBlock(cidStr: string): Promise { if (!this.blockstore) return null; try { const cid = CID.parse(cidStr); // Both Helia's blockstore and SqliteBlockstore return AsyncGenerator const gen = this.helia ? this.helia.blockstore.get(cid, { offline: true }) : this.blockstore.get(cid); const chunks: Uint8Array[] = []; for await (const chunk of gen) { chunks.push(chunk); } if (chunks.length === 0) return null; if (chunks.length === 1) return chunks[0]!; const total = chunks.reduce((acc, c) => acc + c.length, 0); const result = new Uint8Array(total); let offset = 0; for (const c of chunks) { result.set(c, offset); offset += c.length; } return result; } catch { return null; } } async hasBlock(cidStr: string): Promise { if (!this.blockstore) return false; try { const cid = CID.parse(cidStr); if (this.helia) { return await this.helia.blockstore.has(cid); } return await this.blockstore.has(cid); } catch { return false; } } async deleteBlock(cidStr: string): Promise { if (!this.blockstore) return; try { const cid = CID.parse(cidStr); if (this.helia) { await this.helia.blockstore.delete(cid); } else { await this.blockstore.delete(cid); } } catch { // No-op if block doesn't exist } } /** * Announce blocks to the DHT. Fire-and-forget, non-blocking. * Only works when networking is enabled. */ async provideBlocks(cidStrs: string[]): Promise { if (!this.helia) return; for (const cidStr of cidStrs) { try { const cid = CID.parse(cidStr); await this.helia.routing.provide(cid); } catch { // fire-and-forget: log but don't throw } } } /** * Publish a commit notification via gossipsub. * CBOR-encodes { did, commit, rev, time, peer } and publishes to the DID's topic. * No-op when networking is disabled. */ async publishCommitNotification( did: string, commitCid: string, rev: string, ): Promise { if (!this.helia) return; const pubsub = (this.helia.libp2p.services as Record).pubsub as { publish(topic: string, data: Uint8Array): Promise } | undefined; if (!pubsub) return; const notification: CommitNotification = { did, commit: commitCid, rev, time: new Date().toISOString(), peer: this.helia.libp2p.peerId.toString(), }; const data = cborEncode(notification); await pubsub.publish(commitTopic(did), data); } /** * Register a handler for incoming commit notifications. * Multiple handlers can be registered; all are called for each notification. */ onCommitNotification(handler: CommitNotificationHandler): void { this.commitHandlers.push(handler); } /** * Subscribe to gossipsub topics for the given DIDs. * No-op when networking is disabled. */ subscribeCommitTopics(dids: string[]): void { if (!this.helia) return; const pubsub = (this.helia.libp2p.services as Record).pubsub as { subscribe(topic: string): void } | undefined; if (!pubsub) return; for (const did of dids) { const topic = commitTopic(did); if (!this.subscribedTopics.has(topic)) { pubsub.subscribe(topic); this.subscribedTopics.add(topic); } } } /** * Unsubscribe from gossipsub topics for the given DIDs. * No-op when networking is disabled. */ unsubscribeCommitTopics(dids: string[]): void { if (!this.helia) return; const pubsub = (this.helia.libp2p.services as Record).pubsub as { unsubscribe(topic: string): void } | undefined; if (!pubsub) return; for (const did of dids) { const topic = commitTopic(did); if (this.subscribedTopics.has(topic)) { pubsub.unsubscribe(topic); this.subscribedTopics.delete(topic); } } } /** * Publish an identity notification via gossipsub. * CBOR-encodes { did, peerId, multiaddrs, time } and publishes to the DID's identity topic. */ async publishIdentityNotification( did: string, peerId: string, multiaddrs: string[], ): Promise { if (!this.helia) return; const pubsub = (this.helia.libp2p.services as Record).pubsub as { publish(topic: string, data: Uint8Array): Promise } | undefined; if (!pubsub) return; const notification: IdentityNotification = { did, peerId, multiaddrs, time: new Date().toISOString(), }; const data = cborEncode(notification); await pubsub.publish(identityTopic(did), data); } /** * Register a handler for incoming identity notifications. */ onIdentityNotification(handler: IdentityNotificationHandler): void { this.identityHandlers.push(handler); } /** * Subscribe to identity gossipsub topics for the given DIDs. */ subscribeIdentityTopics(dids: string[]): void { if (!this.helia) return; const pubsub = (this.helia.libp2p.services as Record).pubsub as { subscribe(topic: string): void } | undefined; if (!pubsub) return; for (const did of dids) { const topic = identityTopic(did); if (!this.subscribedTopics.has(topic)) { pubsub.subscribe(topic); this.subscribedTopics.add(topic); } } } /** * Unsubscribe from identity gossipsub topics for the given DIDs. */ unsubscribeIdentityTopics(dids: string[]): void { if (!this.helia) return; const pubsub = (this.helia.libp2p.services as Record).pubsub as { unsubscribe(topic: string): void } | undefined; if (!pubsub) return; for (const did of dids) { const topic = identityTopic(did); if (this.subscribedTopics.has(topic)) { pubsub.unsubscribe(topic); this.subscribedTopics.delete(topic); } } } /** * Set up the gossipsub message handler. * Listens for "message" events, CBOR-decodes, and dispatches to all registered handlers. */ private setupGossipsubHandler(): void { if (!this.helia) return; const pubsub = (this.helia.libp2p.services as Record).pubsub as { addEventListener(event: string, handler: (evt: unknown) => void): void } | undefined; if (!pubsub) return; pubsub.addEventListener("message", (evt: unknown) => { try { const detail = (evt as { detail: { topic: string; data: Uint8Array } }).detail; // Drop oversized messages before decoding if (detail.data.length > MAX_GOSSIPSUB_MESSAGE_SIZE) { return; } // Per-topic rate limiting via the shared RateLimiter if (this.rateLimiter) { const isCommit = detail.topic.startsWith(COMMIT_TOPIC_PREFIX); const isIdentity = detail.topic.startsWith(IDENTITY_TOPIC_PREFIX); if (isCommit || isIdentity) { const rule = isCommit ? DEFAULT_RATE_LIMIT_CONFIG.gossipsubCommit : DEFAULT_RATE_LIMIT_CONFIG.gossipsubIdentity; const result = this.rateLimiter.check("gossipsub", detail.topic, rule); if (!result.allowed) return; // silently drop } } if (detail.topic.startsWith(COMMIT_TOPIC_PREFIX)) { const notification = cborDecode(detail.data) as CommitNotification; if ( typeof notification.did !== "string" || typeof notification.commit !== "string" || typeof notification.rev !== "string" ) { return; } for (const handler of this.commitHandlers) { try { const result = handler(notification); if (result && typeof (result as Promise).catch === "function") { (result as Promise).catch(() => {}); } } catch { // Individual handler errors don't affect other handlers } } } else if (detail.topic.startsWith(IDENTITY_TOPIC_PREFIX)) { const notification = cborDecode(detail.data) as IdentityNotification; if ( typeof notification.did !== "string" || typeof notification.peerId !== "string" || !Array.isArray(notification.multiaddrs) ) { return; } for (const handler of this.identityHandlers) { try { const result = handler(notification); if (result && typeof (result as Promise).catch === "function") { (result as Promise).catch(() => {}); } } catch { // Individual handler errors don't affect other handlers } } } } catch { // Malformed messages are silently dropped } }); } getMultiaddrs(): string[] { if (!this.helia) return []; return this.helia.libp2p.getMultiaddrs().map(ma => ma.toString()); } getPeerId(): string | null { if (!this.helia) return null; return this.helia.libp2p.peerId.toString(); } getConnectionCount(): number { if (!this.helia) return 0; return this.helia.libp2p.getConnections().length; } getRemoteAddrs(peerId: string): string[] { if (!this.helia) return []; return this.helia.libp2p .getConnections() .filter((conn) => conn.remotePeer.toString() === peerId) .map((conn) => conn.remoteAddr.toString()); } /** * Announce this node as a provider for a DID's PLC log via the DHT. * Computes a deterministic CID from the DID and calls routing.provide(). */ async provideForDid(did: string): Promise { if (!this.helia) return; try { const { computeDiscoveryCid } = await import("./identity/plc-mirror.js"); const cidStr = await computeDiscoveryCid(did); const cid = CID.parse(cidStr); await this.helia.routing.provide(cid); } catch { // fire-and-forget } } /** * Find providers for a DID's PLC log via the DHT. * Returns multiaddrs of peers that have announced they hold this DID's log. */ async findProvidersForDid(did: string): Promise { if (!this.helia) return []; try { const { computeDiscoveryCid } = await import("./identity/plc-mirror.js"); const cidStr = await computeDiscoveryCid(did); const cid = CID.parse(cidStr); const providers: string[] = []; for await (const provider of this.helia.routing.findProviders(cid)) { for (const ma of provider.multiaddrs) { providers.push(ma.toString()); } if (providers.length >= 10) break; } return providers; } catch { return []; } } /** * Delete all blocks from the underlying blockstore. * Used during full disconnect to wipe the node clean. */ clearBlockstore(): void { if (this.blockstore) { this.blockstore.clear(); } } /** * Get dialability diagnostics: listening addrs, public addrs, NAT status, * and HTTP reachability (if PUBLIC_URL is configured). */ async getDialability(publicUrl?: string): Promise<{ listeningAddrs: string[]; publicAddrs: string[]; natStatus: "unknown" | "public" | "private"; httpReachable: boolean | null; }> { const allAddrs = this.getMultiaddrs(); const publicAddrs = allAddrs.filter((a) => !isPrivateMultiaddr(a)); // Determine NAT status from multiaddr analysis // getMultiaddrs() includes addresses confirmed by identify/autoNAT let natStatus: "unknown" | "public" | "private" = "unknown"; if (this.helia) { // If we have public addresses in our advertised multiaddrs, // that indicates we're publicly reachable if (publicAddrs.length > 0) { natStatus = "public"; } else if (allAddrs.length > 0) { natStatus = "private"; } } // Check HTTP reachability let httpReachable: boolean | null = null; if (publicUrl) { try { const res = await fetch(publicUrl + "/health", { signal: AbortSignal.timeout(5000), }); httpReachable = res.ok; } catch { httpReachable = false; } } return { listeningAddrs: allAddrs, publicAddrs, natStatus, httpReachable }; } isRunning(): boolean { return this.running; } /** * Get the underlying libp2p node, if networking is enabled. * Typed as `unknown` at the boundary to avoid importing libp2p types * into this module. Callers cast to `Libp2p` when needed. */ getLibp2p(): unknown | null { return this.helia?.libp2p ?? null; } /** * Dial a specific multiaddr to establish a connection. * No-op when networking is disabled. */ async dial(multiaddr: string): Promise { if (!this.helia) return; const { multiaddr: ma } = await import("@multiformats/multiaddr"); await this.helia.libp2p.dial(ma(multiaddr)); } } /** * Check if a multiaddr string contains a private/loopback IP address. * Filters: 127.x.x.x, 10.x.x.x, 192.168.x.x, 172.16-31.x.x, ::1, fe80:: */ function isPrivateMultiaddr(addr: string): boolean { // Extract the IP from the multiaddr (e.g. /ip4/10.0.0.1/tcp/4001 → 10.0.0.1) const ip4Match = addr.match(/\/ip4\/([\d.]+)/); if (ip4Match) { const ip = ip4Match[1]!; if (ip.startsWith("127.")) return true; if (ip.startsWith("10.")) return true; if (ip.startsWith("192.168.")) return true; // 172.16.0.0 - 172.31.255.255 const parts = ip.split("."); if (parts[0] === "172") { const second = parseInt(parts[1]!, 10); if (second >= 16 && second <= 31) return true; } return false; } const ip6Match = addr.match(/\/ip6\/([a-fA-F0-9:]+)/); if (ip6Match) { const ip = ip6Match[1]!.toLowerCase(); if (ip === "::1") return true; if (ip.startsWith("fe80")) return true; if (ip === "::") return true; return false; } // dnsaddr and other non-IP addrs are considered public return false; }