atproto user agency toolkit for individuals and groups
1import { CID } from "multiformats";
2import type { Helia } from "@helia/interface";
3import type { BlockMap } from "@atproto/repo";
4import type Database from "better-sqlite3";
5import { encode as cborEncode, decode as cborDecode } from "./cbor-compat.js";
6import { SqliteBlockstore } from "./sqlite-blockstore.js";
7import { SqliteDatastore } from "./sqlite-datastore.js";
8import type { RateLimiter } from "./rate-limiter.js";
9import { DEFAULT_RATE_LIMIT_CONFIG } from "./rate-limiter.js";
10
/**
 * Upper bound, in bytes, on an inbound gossipsub payload (8 KiB).
 * Anything longer is discarded before it is CBOR-decoded.
 */
const MAX_GOSSIPSUB_MESSAGE_SIZE = 8 * 1024;
13
/**
 * Content-addressed byte storage keyed by CID string.
 * Storage only: nothing in this contract involves networking or peer identity.
 */
export interface BlockStore {
  /** Report whether a block is stored under `cidStr`. */
  hasBlock(cidStr: string): Promise<boolean>;

  /** Read the bytes stored under `cidStr`; resolves to `null` when none are available. */
  getBlock(cidStr: string): Promise<Uint8Array | null>;

  /** Store `bytes` under `cidStr`. */
  putBlock(cidStr: string, bytes: Uint8Array): Promise<void>;

  /** Store every entry of an @atproto/repo `BlockMap`. */
  putBlocks(blocks: BlockMap): Promise<void>;

  /** Remove the block stored under `cidStr`. */
  deleteBlock(cidStr: string): Promise<void>;
}
25
/**
 * Small gossipsub payload announcing a repo commit.
 * Carries metadata only; the blocks themselves travel over the existing sync paths.
 */
export interface CommitNotification {
  /** DID of the repo that produced the commit. */
  did: string;
  /** Commit CID, in string form. */
  commit: string;
  /** Repo revision associated with the commit. */
  rev: string;
  /** Timestamp string set by the publisher (ISO 8601 when produced by IpfsService). */
  time: string;
  /** Peer ID string of the publishing node. */
  peer: string;
}
37
/** Callback invoked for each accepted commit notification; may be synchronous or async. */
export type CommitNotificationHandler = (
  notification: CommitNotification,
) => Promise<void> | void;
39
/** Prefix shared by every per-DID commit gossipsub topic. */
export const COMMIT_TOPIC_PREFIX = "/p2pds/commits/1/";

/**
 * Build the commit gossipsub topic name for a DID.
 *
 * @param did - DID whose commit topic is wanted.
 * @returns `COMMIT_TOPIC_PREFIX` immediately followed by `did`.
 */
export function commitTopic(did: string): string {
  return COMMIT_TOPIC_PREFIX + did;
}
45
/**
 * Small gossipsub payload announcing a change to a peer's identity,
 * i.e. a new PeerID or a new set of multiaddrs.
 */
export interface IdentityNotification {
  /** DID the announcement is about. */
  did: string;
  /** Announced libp2p PeerID, in string form. */
  peerId: string;
  /** Announced multiaddrs, in string form. */
  multiaddrs: string[];
  /** Timestamp string set by the publisher (ISO 8601 when produced by IpfsService). */
  time: string;
}
56
/** Callback invoked for each accepted identity notification; may be synchronous or async. */
export type IdentityNotificationHandler = (
  notification: IdentityNotification,
) => Promise<void> | void;
58
/** Prefix shared by every per-DID identity gossipsub topic. */
export const IDENTITY_TOPIC_PREFIX = "/p2pds/identity/1/";

/**
 * Build the identity gossipsub topic name for a DID.
 *
 * @param did - DID whose identity topic is wanted.
 * @returns `IDENTITY_TOPIC_PREFIX` immediately followed by `did`.
 */
export function identityTopic(did: string): string {
  return IDENTITY_TOPIC_PREFIX + did;
}
64
/**
 * Networking contract: content routing, peer identity, connectivity and gossipsub.
 * Kept apart from BlockStore so a transport can be replaced without touching storage.
 */
export interface NetworkService {
  // --- Peer identity and connectivity -------------------------------------

  /** This node's PeerID string, or `null` when there is no network node. */
  getPeerId(): string | null;
  /** Multiaddr strings this node reports for itself. */
  getMultiaddrs(): string[];
  /** Number of currently open connections. */
  getConnectionCount(): number;
  /** Remote address strings of the open connections to `peerId`. */
  getRemoteAddrs(peerId: string): string[];

  // --- Content routing ------------------------------------------------------

  /** Announce the given CIDs to the routing layer. */
  provideBlocks(cidStrs: string[]): Promise<void>;
  /** Announce this node as a provider for `did`. */
  provideForDid(did: string): Promise<void>;
  /** Look up providers previously announced for `did`. */
  findProvidersForDid(did: string): Promise<string[]>;

  // --- Commit notifications ---------------------------------------------------

  /** Publish a commit notification for `did`. */
  publishCommitNotification(
    did: string,
    commitCid: string,
    rev: string,
  ): Promise<void>;
  /** Register a callback for incoming commit notifications. */
  onCommitNotification(handler: CommitNotificationHandler): void;
  /** Start listening for commit notifications about the given DIDs. */
  subscribeCommitTopics(dids: string[]): void;
  /** Stop listening for commit notifications about the given DIDs. */
  unsubscribeCommitTopics(dids: string[]): void;

  // --- Identity notifications -------------------------------------------------

  /** Publish an identity notification for `did`. */
  publishIdentityNotification(did: string, peerId: string, multiaddrs: string[]): Promise<void>;
  /** Register a callback for incoming identity notifications. */
  onIdentityNotification(handler: IdentityNotificationHandler): void;
  /** Start listening for identity notifications about the given DIDs. */
  subscribeIdentityTopics(dids: string[]): void;
  /** Stop listening for identity notifications about the given DIDs. */
  unsubscribeIdentityTopics(dids: string[]): void;
}
90
/** Construction options for IpfsService. */
export interface IpfsConfig {
  /** SQLite handle that backs the block store (and the datastore when networking is on). */
  db: Database.Database;
  /** `true` to create a Helia/libp2p node on start; `false` for local storage only. */
  networking: boolean;
}
95
/** Structural view of the libp2p pubsub service methods this module relies on. */
interface PubSubLike {
  publish(topic: string, data: Uint8Array): Promise<unknown>;
  subscribe(topic: string): void;
  unsubscribe(topic: string): void;
  addEventListener(event: string, handler: (evt: unknown) => void): void;
}

/**
 * Runtime check for a decoded commit notification.
 * Only `did`, `commit` and `rev` are verified; `time` and `peer` are passed
 * through unchecked, so handlers should not trust them blindly.
 */
function isCommitNotification(value: unknown): value is CommitNotification {
  if (typeof value !== "object" || value === null) return false;
  const candidate = value as Record<string, unknown>;
  return (
    typeof candidate.did === "string" &&
    typeof candidate.commit === "string" &&
    typeof candidate.rev === "string"
  );
}

/**
 * Runtime check for a decoded identity notification.
 * Verifies `did`, `peerId` and that `multiaddrs` is an array of strings;
 * `time` is passed through unchecked.
 */
function isIdentityNotification(value: unknown): value is IdentityNotification {
  if (typeof value !== "object" || value === null) return false;
  const candidate = value as Record<string, unknown>;
  return (
    typeof candidate.did === "string" &&
    typeof candidate.peerId === "string" &&
    Array.isArray(candidate.multiaddrs) &&
    candidate.multiaddrs.every((addr) => typeof addr === "string")
  );
}

/**
 * Call every handler with `notification`.
 * A handler that throws, or whose promise rejects, never affects the others.
 */
function dispatchNotification<T>(
  handlers: ReadonlyArray<(notification: T) => void | Promise<void>>,
  notification: T,
): void {
  for (const handler of handlers) {
    try {
      const result = handler(notification);
      if (result && typeof (result as Promise<void>).catch === "function") {
        (result as Promise<void>).catch(() => {});
      }
    } catch {
      // Individual handler errors don't affect other handlers
    }
  }
}

/**
 * IpfsService encapsulates all Helia/IPFS functionality.
 *
 * Implements both BlockStore (storage) and NetworkService (P2P networking).
 * Blocks always live in a SqliteBlockstore backed by `config.db`.
 * When networking is enabled, start() additionally creates a Helia node whose
 * libp2p stack uses TCP + noise + yamux with identify, ping, AutoNAT, a
 * client-mode Amino DHT and bootstrap peer discovery.
 * When networking is disabled, only the local SqliteBlockstore is used.
 *
 * The gossipsub methods look up a libp2p service named `pubsub` at call time
 * and silently no-op when none is registered. The libp2p config built in
 * start() does not list such a service.
 * NOTE(review): confirm whether a gossipsub service is meant to be wired in.
 *
 * Methods no-op (or return an empty value) when the service has not been
 * started; network methods also no-op when networking is disabled.
 * String-based CID interface at the boundary — decouples from @atproto's CID class.
 */
export class IpfsService implements BlockStore, NetworkService {
  private helia: Helia | null = null;
  private blockstore: SqliteBlockstore | null = null;
  private config: IpfsConfig;
  private running = false;
  private commitHandlers: CommitNotificationHandler[] = [];
  private identityHandlers: IdentityNotificationHandler[] = [];
  private subscribedTopics: Set<string> = new Set();
  private rateLimiter: RateLimiter | null = null;

  constructor(config: IpfsConfig) {
    this.config = config;
  }

  /**
   * Set a rate limiter for gossipsub message rate limiting.
   * Called from server.ts after construction.
   */
  setRateLimiter(limiter: RateLimiter): void {
    this.rateLimiter = limiter;
  }

  /**
   * Open the SQLite block store and, when networking is enabled, create the
   * Helia node and attach the gossipsub message listener.
   * Calling start() on an already running service is a no-op.
   */
  async start(): Promise<void> {
    // A second start would replace (and leak) the running Helia node.
    if (this.running) return;

    this.blockstore = new SqliteBlockstore(this.config.db);

    if (this.config.networking) {
      const { createHelia } = await import("helia");
      const { tcp } = await import("@libp2p/tcp");
      const { noise } = await import("@chainsafe/libp2p-noise");
      const { yamux } = await import("@chainsafe/libp2p-yamux");
      const { identify } = await import("@libp2p/identify");
      const { autoNAT } = await import("@libp2p/autonat");
      const { kadDHT, removePrivateAddressesMapper } = await import("@libp2p/kad-dht");
      const { bootstrap } = await import("@libp2p/bootstrap");
      const { ping } = await import("@libp2p/ping");
      const datastore = new SqliteDatastore(this.config.db);

      // libp2p config: direct peer connections + lightweight diagnostics.
      // - Amino DHT in client mode: queries the public IPFS DHT for content
      //   routing and PLC log discovery, but doesn't serve DHT queries.
      // - AutoNAT: asks connected peers to dial us back, confirming reachability.
      // - Bootstrap: connects to standard IPFS bootstrap peers on startup.
      // Cast SQLite stores to Helia's expected interfaces.
      // Our implementations are duck-type compatible but use Promise
      // returns instead of AwaitGenerator, which Helia handles fine at runtime.
      this.helia = await createHelia({
        libp2p: {
          addresses: { listen: ["/ip4/0.0.0.0/tcp/0"] },
          transports: [tcp()],
          connectionEncrypters: [noise()],
          streamMuxers: [yamux()],
          services: {
            identify: identify(),
            ping: ping(),
            autoNAT: autoNAT(),
            aminoDHT: kadDHT({
              protocol: "/ipfs/kad/1.0.0",
              peerInfoMapper: removePrivateAddressesMapper,
              clientMode: true,
            }),
          },
          peerDiscovery: [
            bootstrap({
              list: [
                "/dnsaddr/bootstrap.libp2p.io/p2p/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN",
                "/dnsaddr/bootstrap.libp2p.io/p2p/QmQCU2EcMqAqQPR2i9bChDtGNJchTbq5TbXJJ16u19uLTa",
              ],
            }),
          ],
          connectionManager: {
            maxConnections: 10,
            maxIncomingPendingConnections: 3,
            inboundConnectionThreshold: 2,
          },
        },
        blockstore: this.blockstore as any,
        datastore: datastore as any,
      });

      // Without this the registered commit/identity handlers would never be
      // invoked. It is a no-op when no `pubsub` service is registered.
      this.setupGossipsubHandler();
    }

    this.running = true;
  }

  /**
   * Stop the Helia node (if any) and reset all runtime state.
   * State is reset even when stopping Helia fails; that failure is rethrown.
   */
  async stop(): Promise<void> {
    try {
      if (this.helia) {
        await this.helia.stop();
      }
    } finally {
      this.helia = null;
      this.blockstore = null;
      this.running = false;
      this.subscribedTopics.clear();
    }
  }

  /**
   * Store a block. Goes through Helia's blockstore when networking is on,
   * otherwise directly to the SQLite store. No-op before start().
   *
   * @throws if `cidStr` is not a parseable CID or the underlying store rejects.
   */
  async putBlock(cidStr: string, bytes: Uint8Array): Promise<void> {
    if (!this.blockstore) return;
    const cid = CID.parse(cidStr);
    if (this.helia) {
      await this.helia.blockstore.put(cid, bytes);
    } else {
      await this.blockstore.put(cid, bytes);
    }
  }

  /**
   * Store every block of a BlockMap, one after another.
   * Reads BlockMap's internal `map` field (CID string → bytes); if that field
   * is absent nothing is written.
   */
  async putBlocks(blocks: BlockMap): Promise<void> {
    if (!this.blockstore) return;
    const internalMap = (
      blocks as unknown as { map: Map<string, Uint8Array> }
    ).map;
    if (!internalMap) return;
    for (const [cidStr, bytes] of internalMap) {
      await this.putBlock(cidStr, bytes);
    }
  }

  /**
   * Read a block from local storage only (Helia is queried with `offline: true`).
   *
   * @returns the block bytes, or `null` when the service is not started, the
   *   CID is invalid, the block is missing, or the store throws.
   */
  async getBlock(cidStr: string): Promise<Uint8Array | null> {
    if (!this.blockstore) return null;
    try {
      const cid = CID.parse(cidStr);
      // Both Helia's blockstore and SqliteBlockstore return AsyncGenerator<Uint8Array>
      const gen = this.helia
        ? this.helia.blockstore.get(cid, { offline: true })
        : this.blockstore.get(cid);
      const chunks: Uint8Array[] = [];
      for await (const chunk of gen) {
        chunks.push(chunk);
      }
      if (chunks.length === 0) return null;
      if (chunks.length === 1) return chunks[0]!;

      // Several chunks: join them into one contiguous buffer.
      const total = chunks.reduce((acc, c) => acc + c.length, 0);
      const result = new Uint8Array(total);
      let offset = 0;
      for (const c of chunks) {
        result.set(c, offset);
        offset += c.length;
      }
      return result;
    } catch {
      return null;
    }
  }

  /** Whether a block is stored; `false` on any error or before start(). */
  async hasBlock(cidStr: string): Promise<boolean> {
    if (!this.blockstore) return false;
    try {
      const cid = CID.parse(cidStr);
      if (this.helia) {
        return await this.helia.blockstore.has(cid);
      }
      return await this.blockstore.has(cid);
    } catch {
      return false;
    }
  }

  /** Delete a block. Errors (missing block, invalid CID, …) are swallowed. */
  async deleteBlock(cidStr: string): Promise<void> {
    if (!this.blockstore) return;
    try {
      const cid = CID.parse(cidStr);
      if (this.helia) {
        await this.helia.blockstore.delete(cid);
      } else {
        await this.blockstore.delete(cid);
      }
    } catch {
      // No-op if block doesn't exist
    }
  }

  /**
   * Announce blocks to the DHT, best-effort.
   * CIDs are announced one at a time and each announcement is awaited, so the
   * returned promise settles only after the last attempt; per-CID failures
   * are swallowed. Only works when networking is enabled.
   */
  async provideBlocks(cidStrs: string[]): Promise<void> {
    if (!this.helia) return;
    for (const cidStr of cidStrs) {
      try {
        const cid = CID.parse(cidStr);
        await this.helia.routing.provide(cid);
      } catch {
        // best-effort: a failed announcement must not abort the rest
      }
    }
  }

  /**
   * Publish a commit notification via gossipsub.
   * CBOR-encodes { did, commit, rev, time, peer } and publishes to the DID's topic.
   * No-op when networking is disabled or no pubsub service is registered.
   */
  async publishCommitNotification(
    did: string,
    commitCid: string,
    rev: string,
  ): Promise<void> {
    const helia = this.helia;
    const pubsub = this.getPubsub();
    if (!helia || !pubsub) return;

    const notification: CommitNotification = {
      did,
      commit: commitCid,
      rev,
      time: new Date().toISOString(),
      peer: helia.libp2p.peerId.toString(),
    };

    await pubsub.publish(commitTopic(did), cborEncode(notification));
  }

  /**
   * Register a handler for incoming commit notifications.
   * Multiple handlers can be registered; all are called for each notification.
   */
  onCommitNotification(handler: CommitNotificationHandler): void {
    this.commitHandlers.push(handler);
  }

  /**
   * Subscribe to the commit gossipsub topics for the given DIDs.
   * No-op when networking is disabled or no pubsub service is registered.
   */
  subscribeCommitTopics(dids: string[]): void {
    this.subscribeTopics(dids.map((did) => commitTopic(did)));
  }

  /**
   * Unsubscribe from the commit gossipsub topics for the given DIDs.
   * No-op when networking is disabled or no pubsub service is registered.
   */
  unsubscribeCommitTopics(dids: string[]): void {
    this.unsubscribeTopics(dids.map((did) => commitTopic(did)));
  }

  /**
   * Publish an identity notification via gossipsub.
   * CBOR-encodes { did, peerId, multiaddrs, time } and publishes to the DID's identity topic.
   * No-op when networking is disabled or no pubsub service is registered.
   */
  async publishIdentityNotification(
    did: string,
    peerId: string,
    multiaddrs: string[],
  ): Promise<void> {
    const pubsub = this.getPubsub();
    if (!pubsub) return;

    const notification: IdentityNotification = {
      did,
      peerId,
      multiaddrs,
      time: new Date().toISOString(),
    };

    await pubsub.publish(identityTopic(did), cborEncode(notification));
  }

  /**
   * Register a handler for incoming identity notifications.
   */
  onIdentityNotification(handler: IdentityNotificationHandler): void {
    this.identityHandlers.push(handler);
  }

  /**
   * Subscribe to identity gossipsub topics for the given DIDs.
   */
  subscribeIdentityTopics(dids: string[]): void {
    this.subscribeTopics(dids.map((did) => identityTopic(did)));
  }

  /**
   * Unsubscribe from identity gossipsub topics for the given DIDs.
   */
  unsubscribeIdentityTopics(dids: string[]): void {
    this.unsubscribeTopics(dids.map((did) => identityTopic(did)));
  }

  /** The libp2p `pubsub` service, or `undefined` when absent or networking is off. */
  private getPubsub(): PubSubLike | undefined {
    if (!this.helia) return undefined;
    return (this.helia.libp2p.services as Record<string, unknown>).pubsub as
      PubSubLike | undefined;
  }

  /** Subscribe to each topic not already tracked in `subscribedTopics`. */
  private subscribeTopics(topics: string[]): void {
    const pubsub = this.getPubsub();
    if (!pubsub) return;

    for (const topic of topics) {
      if (this.subscribedTopics.has(topic)) continue;
      pubsub.subscribe(topic);
      this.subscribedTopics.add(topic);
    }
  }

  /** Unsubscribe from each topic currently tracked in `subscribedTopics`. */
  private unsubscribeTopics(topics: string[]): void {
    const pubsub = this.getPubsub();
    if (!pubsub) return;

    for (const topic of topics) {
      if (!this.subscribedTopics.has(topic)) continue;
      pubsub.unsubscribe(topic);
      this.subscribedTopics.delete(topic);
    }
  }

  /**
   * Set up the gossipsub message handler.
   * Listens for "message" events, CBOR-decodes, and dispatches to all registered handlers.
   */
  private setupGossipsubHandler(): void {
    const pubsub = this.getPubsub();
    if (!pubsub) return;

    pubsub.addEventListener("message", (evt: unknown) => {
      try {
        const detail = (evt as { detail: { topic: string; data: Uint8Array } }).detail;
        this.handleGossipsubMessage(detail.topic, detail.data);
      } catch {
        // Malformed messages are silently dropped
      }
    });
  }

  /**
   * Validate one inbound gossipsub message and hand it to the matching handlers.
   * Messages are untrusted, so a message is dropped when it is oversized,
   * arrives on an unrelated topic, is rate limited, fails shape validation, or
   * names a DID different from the one encoded in its topic.
   */
  private handleGossipsubMessage(topic: string, data: Uint8Array): void {
    // Drop oversized messages before decoding
    if (data.length > MAX_GOSSIPSUB_MESSAGE_SIZE) return;

    const isCommit = topic.startsWith(COMMIT_TOPIC_PREFIX);
    const isIdentity = !isCommit && topic.startsWith(IDENTITY_TOPIC_PREFIX);
    if (!isCommit && !isIdentity) return;

    // Per-topic rate limiting via the shared RateLimiter
    if (this.rateLimiter) {
      const rule = isCommit
        ? DEFAULT_RATE_LIMIT_CONFIG.gossipsubCommit
        : DEFAULT_RATE_LIMIT_CONFIG.gossipsubIdentity;
      const verdict = this.rateLimiter.check("gossipsub", topic, rule);
      if (!verdict.allowed) return; // silently drop
    }

    const decoded: unknown = cborDecode(data);

    if (isCommit) {
      if (!isCommitNotification(decoded)) return;
      // The topic is what we subscribed to; a payload naming another DID
      // would bypass that subscription filter.
      if (topic !== commitTopic(decoded.did)) return;
      dispatchNotification(this.commitHandlers, decoded);
      return;
    }

    if (!isIdentityNotification(decoded)) return;
    if (topic !== identityTopic(decoded.did)) return;
    dispatchNotification(this.identityHandlers, decoded);
  }

  /** Multiaddr strings reported by libp2p; empty when networking is off. */
  getMultiaddrs(): string[] {
    if (!this.helia) return [];
    return this.helia.libp2p.getMultiaddrs().map((ma) => ma.toString());
  }

  /** This node's PeerID string, or `null` when networking is off. */
  getPeerId(): string | null {
    if (!this.helia) return null;
    return this.helia.libp2p.peerId.toString();
  }

  /** Number of open libp2p connections; 0 when networking is off. */
  getConnectionCount(): number {
    if (!this.helia) return 0;
    return this.helia.libp2p.getConnections().length;
  }

  /** Remote address strings of every open connection to `peerId`. */
  getRemoteAddrs(peerId: string): string[] {
    if (!this.helia) return [];
    return this.helia.libp2p
      .getConnections()
      .filter((conn) => conn.remotePeer.toString() === peerId)
      .map((conn) => conn.remoteAddr.toString());
  }

  /** Resolve the deterministic discovery CID that stands for a DID's PLC log. */
  private async discoveryCidForDid(did: string): Promise<CID> {
    const { computeDiscoveryCid } = await import("./identity/plc-mirror.js");
    return CID.parse(await computeDiscoveryCid(did));
  }

  /**
   * Announce this node as a provider for a DID's PLC log via the DHT.
   * Computes a deterministic CID from the DID and calls routing.provide().
   * Best-effort: every failure is swallowed.
   */
  async provideForDid(did: string): Promise<void> {
    if (!this.helia) return;
    try {
      const cid = await this.discoveryCidForDid(did);
      await this.helia.routing.provide(cid);
    } catch {
      // best-effort
    }
  }

  /**
   * Find providers for a DID's PLC log via the DHT.
   * Returns multiaddrs of peers that have announced they hold this DID's log.
   * The search stops once at least 10 multiaddrs have been collected, so the
   * result may hold slightly more than 10. Returns `[]` on any error.
   */
  async findProvidersForDid(did: string): Promise<string[]> {
    if (!this.helia) return [];
    try {
      const cid = await this.discoveryCidForDid(did);
      const providers: string[] = [];
      for await (const provider of this.helia.routing.findProviders(cid)) {
        for (const ma of provider.multiaddrs) {
          providers.push(ma.toString());
        }
        if (providers.length >= 10) break;
      }
      return providers;
    } catch {
      return [];
    }
  }

  /**
   * Delete all blocks from the underlying blockstore.
   * Used during full disconnect to wipe the node clean.
   */
  clearBlockstore(): void {
    if (this.blockstore) {
      this.blockstore.clear();
    }
  }

  /**
   * Get dialability diagnostics: listening addrs, public addrs, NAT status,
   * and HTTP reachability.
   *
   * @param publicUrl - Base URL of this server. When given, `<publicUrl>/health`
   *   is fetched with a 5 second timeout; trailing slashes are ignored.
   * @returns `httpReachable` is `null` when no `publicUrl` was given, otherwise
   *   whether the health request returned an OK status.
   */
  async getDialability(publicUrl?: string): Promise<{
    listeningAddrs: string[];
    publicAddrs: string[];
    natStatus: "unknown" | "public" | "private";
    httpReachable: boolean | null;
  }> {
    const allAddrs = this.getMultiaddrs();
    const publicAddrs = allAddrs.filter((a) => !isPrivateMultiaddr(a));

    // NAT status is inferred purely from the advertised multiaddrs: any
    // non-private address counts as "public", only private ones as "private".
    let natStatus: "unknown" | "public" | "private" = "unknown";
    if (this.helia) {
      if (publicAddrs.length > 0) {
        natStatus = "public";
      } else if (allAddrs.length > 0) {
        natStatus = "private";
      }
    }

    // Check HTTP reachability
    let httpReachable: boolean | null = null;
    if (publicUrl) {
      // Strip trailing slashes so "https://host/" does not become "https://host//health".
      const healthUrl = publicUrl.replace(/\/+$/, "") + "/health";
      try {
        const res = await fetch(healthUrl, {
          signal: AbortSignal.timeout(5000),
        });
        httpReachable = res.ok;
      } catch {
        httpReachable = false;
      }
    }

    return { listeningAddrs: allAddrs, publicAddrs, natStatus, httpReachable };
  }

  /** Whether start() has completed and stop() has not been called since. */
  isRunning(): boolean {
    return this.running;
  }

  /**
   * Get the underlying libp2p node, if networking is enabled.
   * Typed as `unknown` at the boundary to avoid importing libp2p types
   * into this module. Callers cast to `Libp2p` when needed.
   */
  getLibp2p(): unknown | null {
    return this.helia?.libp2p ?? null;
  }

  /**
   * Dial a specific multiaddr to establish a connection.
   * No-op when networking is disabled.
   *
   * @throws if the multiaddr string cannot be parsed or the dial fails.
   */
  async dial(multiaddr: string): Promise<void> {
    if (!this.helia) return;
    const { multiaddr: ma } = await import("@multiformats/multiaddr");
    await this.helia.libp2p.dial(ma(multiaddr));
  }
}
647
/**
 * Check whether a multiaddr string names an IP address that is not publicly
 * routable (private, loopback, link-local, shared or unspecified).
 *
 * IPv4 ranges treated as non-public:
 *   0.0.0.0/8 (unspecified), 10.0.0.0/8, 100.64.0.0/10 (carrier-grade NAT),
 *   127.0.0.0/8 (loopback), 169.254.0.0/16 (link-local), 172.16.0.0/12,
 *   192.168.0.0/16.
 * IPv6 addresses treated as non-public:
 *   `::` (unspecified), `::1` (loopback), fe80::/10 (link-local),
 *   fc00::/7 (unique local).
 *
 * Only the first `/ip4/` component is inspected; `/ip6/` is inspected only
 * when there is no `/ip4/` component.
 *
 * @param addr - Multiaddr in string form, e.g. `/ip4/10.0.0.1/tcp/4001`.
 * @returns `true` for a non-public IP; `false` for public IPs and for
 *   multiaddrs without an IP component (dnsaddr etc.).
 */
function isPrivateMultiaddr(addr: string): boolean {
  // Extract the IP from the multiaddr (e.g. /ip4/10.0.0.1/tcp/4001 → 10.0.0.1)
  const ip4Match = addr.match(/\/ip4\/([\d.]+)/);
  if (ip4Match) {
    const octets = ip4Match[1]!.split(".");
    // parseInt yields NaN for a missing/empty octet; every comparison below is
    // then false, so malformed addresses fall through to "public".
    const first = parseInt(octets[0] ?? "", 10);
    const second = parseInt(octets[1] ?? "", 10);

    if (first === 0 || first === 10 || first === 127) return true;
    if (first === 100 && second >= 64 && second <= 127) return true;
    if (first === 169 && second === 254) return true;
    // 172.16.0.0 - 172.31.255.255
    if (first === 172 && second >= 16 && second <= 31) return true;
    if (first === 192 && second === 168) return true;
    return false;
  }

  const ip6Match = addr.match(/\/ip6\/([a-fA-F0-9:]+)/);
  if (ip6Match) {
    const ip = ip6Match[1]!.toLowerCase();
    if (ip === "::" || ip === "::1") return true;

    // Leading 16-bit group; NaN (address starts with "::") makes both masks miss.
    const firstGroup = parseInt(ip.split(":")[0] ?? "", 16);
    if ((firstGroup & 0xffc0) === 0xfe80) return true; // fe80::/10 link-local
    if ((firstGroup & 0xfe00) === 0xfc00) return true; // fc00::/7 unique local
    return false;
  }

  // dnsaddr and other non-IP addrs are considered public
  return false;
}