/** * CAR-over-libp2p repo sync protocol. * * Mirrors `com.atproto.sync.getRepo` but runs over libp2p streams, * enabling peer-to-peer repo transfer without centralized PDS servers. * * Protocol ID: /p2pds/repo-sync/1.0.0 * * Wire format (half-close request-response): * 1. Requester: send CBOR { did: string, since?: string }, then close() (half-close write) * 2. Responder: read request, generate CAR, send 1-byte status + CAR bytes, then close() * 3. Requester: read all response bytes — first byte is status, rest is CAR data * * Status codes: 0 = ok, 1 = not found, 2 = error */ import type { Libp2p, Stream } from "@libp2p/interface"; import { multiaddr } from "@multiformats/multiaddr"; import { encode as cborEncode, decode as cborDecode } from "../cbor-compat.js"; import type { BlockStore } from "../ipfs.js"; import type { SyncStorage } from "./sync-storage.js"; import { generateCarForDid } from "../xrpc/sync.js"; export const REPO_SYNC_PROTOCOL = "/p2pds/repo-sync/1.0.0"; /** Maximum request size (64 KB — just DID + since). */ const MAX_REQUEST_SIZE = 64 * 1024; /** Maximum response size (256 MB — full repo CARs can be large). */ const MAX_RESPONSE_SIZE = 256 * 1024 * 1024; /** Timeout for the entire sync operation (30 seconds). */ const SYNC_TIMEOUT_MS = 30_000; /** Status codes in the response. */ const STATUS_OK = 0; const STATUS_NOT_FOUND = 1; const STATUS_ERROR = 2; interface RepoSyncRequest { did: string; since?: string; } /** * Collect all chunks from a libp2p stream into a single Uint8Array. * Stream chunks may be Uint8Array or Uint8ArrayList; normalize via subarray(). * Throws if accumulated bytes exceed maxSize. */ async function collectStream( stream: AsyncIterable, maxSize: number, ): Promise { const chunks: Uint8Array[] = []; let totalSize = 0; for await (const chunk of stream) { const bytes = chunk instanceof Uint8Array ? chunk : chunk.subarray(); totalSize += bytes.length; if (totalSize > maxSize) { throw new Error(`Stream exceeded maximum size of ${maxSize} bytes`); } chunks.push(bytes); } if (chunks.length === 0) return new Uint8Array(0); if (chunks.length === 1) return chunks[0]!; const result = new Uint8Array(totalSize); let offset = 0; for (const c of chunks) { result.set(c, offset); offset += c.length; } return result; } /** * Register the repo sync protocol handler on a libp2p node (server side). * * When a peer requests a repo via this protocol, the handler: * 1. Reads the CBOR-encoded request (DID + optional since) * 2. Generates a CAR file from the local blockstore (incremental if since is provided) * 3. Sends status byte + CAR bytes back */ export function registerRepoSyncProtocol( libp2p: Libp2p, blockStore: BlockStore, syncStorage: SyncStorage, ): void { libp2p.handle( REPO_SYNC_PROTOCOL, async (stream: Stream) => { try { // Read request const requestBytes = await collectStream( stream as unknown as AsyncIterable< Uint8Array | { subarray(): Uint8Array } >, MAX_REQUEST_SIZE, ); const request = cborDecode(requestBytes) as RepoSyncRequest; if (!request.did || typeof request.did !== "string") { const errorResponse = new Uint8Array([STATUS_ERROR]); stream.send(errorResponse); await stream.close(); return; } console.log( `[libp2p-sync] Serving repo for ${request.did}${request.since ? ` (since: ${request.since.slice(0, 8)}…)` : ""}`, ); // Generate CAR (incremental if since is provided) const carBytes = await generateCarForDid( request.did, blockStore, syncStorage, request.since, ); if (!carBytes) { const notFoundResponse = new Uint8Array([STATUS_NOT_FOUND]); stream.send(notFoundResponse); await stream.close(); return; } // Send status + CAR bytes const response = new Uint8Array(1 + carBytes.length); response[0] = STATUS_OK; response.set(carBytes, 1); stream.send(response); await stream.close(); console.log( `[libp2p-sync] Served ${(carBytes.length / 1024).toFixed(1)} KB ${request.since ? "incremental" : "full"} CAR for ${request.did}`, ); } catch (err) { stream.abort( err instanceof Error ? err : new Error(String(err)), ); } }, ); } /** * Unregister the repo sync protocol handler. */ export async function unregisterRepoSyncProtocol( libp2p: Libp2p, ): Promise { await libp2p.unhandle(REPO_SYNC_PROTOCOL); } /** * Fetch a repo from a peer via the libp2p repo sync protocol (client side). * * Dials the peer at the given multiaddrs, sends a CBOR-encoded request, * and returns the CAR bytes. Throws on timeout, protocol error, or if * the peer doesn't have the repo. * * @param libp2p - The local libp2p node * @param multiaddrs - Peer's multiaddrs (from org.p2pds.peer record) * @param did - The DID to fetch * @param since - Optional rev for incremental sync * @returns CAR bytes for the repo */ export async function fetchRepoFromPeer( libp2p: Libp2p, multiaddrs: string[], did: string, since?: string, ): Promise { if (multiaddrs.length === 0) { throw new Error("No multiaddrs provided for peer"); } // Try each multiaddr until one works let lastError: Error | null = null; for (const addr of multiaddrs) { try { return await fetchRepoFromAddr(libp2p, addr, did, since); } catch (err) { lastError = err instanceof Error ? err : new Error(String(err)); } } throw lastError ?? new Error("No multiaddrs to try"); } async function fetchRepoFromAddr( libp2p: Libp2p, addr: string, did: string, since?: string, ): Promise { const ma = multiaddr(addr); // Apply timeout const controller = new AbortController(); const timeout = setTimeout(() => controller.abort(), SYNC_TIMEOUT_MS); try { const stream = await libp2p.dialProtocol(ma, REPO_SYNC_PROTOCOL, { signal: controller.signal, }); try { // Send request and half-close const request: RepoSyncRequest = { did }; if (since) request.since = since; const requestBytes = cborEncode(request); stream.send(requestBytes); await stream.close(); // Read response const responseBytes = await collectStream( stream as unknown as AsyncIterable< Uint8Array | { subarray(): Uint8Array } >, MAX_RESPONSE_SIZE, ); if (responseBytes.length === 0) { throw new Error("Empty response from peer"); } const status = responseBytes[0]; if (status === STATUS_NOT_FOUND) { throw new Error(`Peer does not have repo for ${did}`); } if (status === STATUS_ERROR) { throw new Error(`Peer returned error for ${did}`); } if (status !== STATUS_OK) { throw new Error(`Unknown status code ${status} from peer`); } // Rest of response is CAR bytes return responseBytes.slice(1); } catch (err) { stream.abort( err instanceof Error ? err : new Error(String(err)), ); throw err; } } finally { clearTimeout(timeout); } }