atproto user agency toolkit for individuals and groups
at main 250 lines 7.1 kB view raw
/**
 * CAR-over-libp2p repo sync protocol.
 *
 * Mirrors `com.atproto.sync.getRepo` but runs over libp2p streams,
 * enabling peer-to-peer repo transfer without centralized PDS servers.
 *
 * Protocol ID: /p2pds/repo-sync/1.0.0
 *
 * Wire format (half-close request-response):
 *   1. Requester: send CBOR { did: string, since?: string }, then close() (half-close write)
 *   2. Responder: read request, generate CAR, send 1-byte status + CAR bytes, then close()
 *   3. Requester: read all response bytes — first byte is status, rest is CAR data
 *
 * Status codes: 0 = ok, 1 = not found, 2 = error (malformed request, or the
 * responder failed while generating the CAR).
 */

import type { Libp2p, Stream } from "@libp2p/interface";
import { multiaddr } from "@multiformats/multiaddr";
import { encode as cborEncode, decode as cborDecode } from "../cbor-compat.js";
import type { BlockStore } from "../ipfs.js";
import type { SyncStorage } from "./sync-storage.js";
import { generateCarForDid } from "../xrpc/sync.js";

export const REPO_SYNC_PROTOCOL = "/p2pds/repo-sync/1.0.0";

/** Maximum request size (64 KB — just DID + since). */
const MAX_REQUEST_SIZE = 64 * 1024;

/** Maximum response size (256 MB — full repo CARs can be large). */
const MAX_RESPONSE_SIZE = 256 * 1024 * 1024;

/**
 * Timeout for the entire client-side sync against one address (30 seconds):
 * dialing, sending the request, and reading the full response.
 */
const SYNC_TIMEOUT_MS = 30_000;

/** Status codes in the response. */
const STATUS_OK = 0;
const STATUS_NOT_FOUND = 1;
const STATUS_ERROR = 2;

interface RepoSyncRequest {
  did: string;
  since?: string;
}

/** Chunk shapes a libp2p stream may yield (Uint8Array or a Uint8ArrayList-like). */
type StreamChunk = Uint8Array | { subarray(): Uint8Array };

/** Normalize an unknown thrown value into an Error instance. */
function toError(value: unknown): Error {
  return value instanceof Error ? value : new Error(String(value));
}

/**
 * Collect all chunks from a libp2p stream into a single Uint8Array.
 * Stream chunks may be Uint8Array or Uint8ArrayList; normalize via subarray().
 * Throws if accumulated bytes exceed maxSize.
 */
async function collectStream(
  stream: AsyncIterable<StreamChunk>,
  maxSize: number,
): Promise<Uint8Array> {
  const chunks: Uint8Array[] = [];
  let totalSize = 0;
  for await (const chunk of stream) {
    const bytes = chunk instanceof Uint8Array ? chunk : chunk.subarray();
    totalSize += bytes.length;
    if (totalSize > maxSize) {
      throw new Error(`Stream exceeded maximum size of ${maxSize} bytes`);
    }
    chunks.push(bytes);
  }
  if (chunks.length === 0) return new Uint8Array(0);
  if (chunks.length === 1) return chunks[0]!;
  const result = new Uint8Array(totalSize);
  let offset = 0;
  for (const c of chunks) {
    result.set(c, offset);
    offset += c.length;
  }
  return result;
}

/**
 * Read a libp2p stream until the remote side half-closes, bounded by maxSize.
 */
function readStream(stream: Stream, maxSize: number): Promise<Uint8Array> {
  // The stream is consumed as an async iterable of chunks; the cast bridges
  // its declared type to the chunk shape collectStream accepts.
  return collectStream(
    stream as unknown as AsyncIterable<StreamChunk>,
    maxSize,
  );
}

/**
 * Decode and validate a CBOR repo sync request received from a peer.
 *
 * @returns The request, or null when the bytes are not decodable or do not
 *   contain a non-empty string `did` and an optional string `since`
 *   (`since: null` is treated as absent).
 */
function parseRepoSyncRequest(bytes: Uint8Array): RepoSyncRequest | null {
  let decoded: unknown;
  try {
    decoded = cborDecode(bytes);
  } catch {
    return null;
  }
  if (typeof decoded !== "object" || decoded === null) return null;

  const { did, since } = decoded as { did?: unknown; since?: unknown };
  if (typeof did !== "string" || did.length === 0) return null;
  if (since === undefined || since === null) return { did };
  if (typeof since !== "string") return null;
  return { did, since };
}

/**
 * Send a status byte, optionally followed by a payload, then half-close the
 * write side of the stream.
 */
async function sendResponse(
  stream: Stream,
  status: number,
  payload?: Uint8Array,
): Promise<void> {
  const frame = new Uint8Array(1 + (payload?.length ?? 0));
  frame[0] = status;
  if (payload) frame.set(payload, 1);
  stream.send(frame);
  await stream.close();
}

/**
 * Serve a single inbound repo sync request on an already-open stream.
 * Protocol-level problems (bad request, CAR generation failure, unknown repo)
 * are reported via the status byte; transport errors propagate to the caller.
 */
async function serveRepoSyncRequest(
  stream: Stream,
  blockStore: BlockStore,
  syncStorage: SyncStorage,
): Promise<void> {
  const requestBytes = await readStream(stream, MAX_REQUEST_SIZE);
  const request = parseRepoSyncRequest(requestBytes);
  if (!request) {
    await sendResponse(stream, STATUS_ERROR);
    return;
  }
  const { did, since } = request;

  console.log(
    `[libp2p-sync] Serving repo for ${did}${since ? ` (since: ${since.slice(0, 8)}…)` : ""}`,
  );

  // Generate CAR (incremental if since is provided). A failure here is a
  // responder-side error, which the protocol reports as STATUS_ERROR rather
  // than resetting the stream.
  let carBytes: Awaited<ReturnType<typeof generateCarForDid>>;
  try {
    carBytes = await generateCarForDid(did, blockStore, syncStorage, since);
  } catch (err) {
    console.error(`[libp2p-sync] Failed to generate CAR for ${did}:`, err);
    await sendResponse(stream, STATUS_ERROR);
    return;
  }

  if (!carBytes) {
    await sendResponse(stream, STATUS_NOT_FOUND);
    return;
  }

  await sendResponse(stream, STATUS_OK, carBytes);

  console.log(
    `[libp2p-sync] Served ${(carBytes.length / 1024).toFixed(1)} KB ${since ? "incremental" : "full"} CAR for ${did}`,
  );
}

/**
 * Register the repo sync protocol handler on a libp2p node (server side).
 *
 * When a peer requests a repo via this protocol, the handler:
 * 1. Reads and validates the CBOR-encoded request (DID + optional since)
 * 2. Generates a CAR file from the local blockstore (incremental if since is provided)
 * 3. Sends status byte + CAR bytes back (status only on not-found / error)
 *
 * Transport failures (e.g. an oversized request, or send/close failing)
 * abort the stream.
 */
export function registerRepoSyncProtocol(
  libp2p: Libp2p,
  blockStore: BlockStore,
  syncStorage: SyncStorage,
): void {
  libp2p.handle(REPO_SYNC_PROTOCOL, async (stream: Stream) => {
    try {
      await serveRepoSyncRequest(stream, blockStore, syncStorage);
    } catch (err) {
      stream.abort(toError(err));
    }
  });
}

/**
 * Unregister the repo sync protocol handler.
 */
export async function unregisterRepoSyncProtocol(
  libp2p: Libp2p,
): Promise<void> {
  await libp2p.unhandle(REPO_SYNC_PROTOCOL);
}

/**
 * Fetch a repo from a peer via the libp2p repo sync protocol (client side).
 *
 * Dials the peer at the given multiaddrs, sends a CBOR-encoded request,
 * and returns the CAR bytes. Throws on timeout, protocol error, or if
 * the peer doesn't have the repo.
 *
 * @param libp2p - The local libp2p node
 * @param multiaddrs - Peer's multiaddrs (from org.p2pds.peer record)
 * @param did - The DID to fetch
 * @param since - Optional rev for incremental sync
 * @returns CAR bytes for the repo
 * @throws The error from the last address tried, if every address fails
 */
export async function fetchRepoFromPeer(
  libp2p: Libp2p,
  multiaddrs: string[],
  did: string,
  since?: string,
): Promise<Uint8Array> {
  if (multiaddrs.length === 0) {
    throw new Error("No multiaddrs provided for peer");
  }

  // Try each multiaddr in order until one works.
  let lastError: Error | null = null;
  for (const addr of multiaddrs) {
    try {
      return await fetchRepoFromAddr(libp2p, addr, did, since);
    } catch (err) {
      lastError = toError(err);
    }
  }

  throw lastError ?? new Error("No multiaddrs to try");
}

/**
 * Interpret a raw repo sync response: the first byte is the status, the rest
 * is CAR data.
 *
 * @returns The CAR bytes when the status is STATUS_OK
 * @throws If the response is empty or carries a non-OK status
 */
function parseRepoSyncResponse(
  responseBytes: Uint8Array,
  did: string,
): Uint8Array {
  if (responseBytes.length === 0) {
    throw new Error("Empty response from peer");
  }

  const status = responseBytes[0];
  switch (status) {
    case STATUS_OK:
      // Rest of response is CAR bytes.
      return responseBytes.slice(1);
    case STATUS_NOT_FOUND:
      throw new Error(`Peer does not have repo for ${did}`);
    case STATUS_ERROR:
      throw new Error(`Peer returned error for ${did}`);
    default:
      throw new Error(`Unknown status code ${status} from peer`);
  }
}

/**
 * Run one repo sync exchange against a single multiaddr, bounded by
 * SYNC_TIMEOUT_MS for the whole dial + request + response.
 */
async function fetchRepoFromAddr(
  libp2p: Libp2p,
  addr: string,
  did: string,
  since?: string,
): Promise<Uint8Array> {
  const ma = multiaddr(addr);

  const controller = new AbortController();
  const timeoutError = new Error(
    `Repo sync via ${addr} timed out after ${SYNC_TIMEOUT_MS} ms`,
  );
  const timeout = setTimeout(() => controller.abort(), SYNC_TIMEOUT_MS);

  try {
    const stream = await libp2p.dialProtocol(ma, REPO_SYNC_PROTOCOL, {
      signal: controller.signal,
    });

    // The dial signal only governs dialing. To bound the transfer as well,
    // abort the opened stream when the deadline passes; this makes a pending
    // read fail instead of hanging on a stalled peer.
    const abortStream = (): void => {
      stream.abort(timeoutError);
    };
    controller.signal.addEventListener("abort", abortStream, { once: true });

    try {
      // The deadline may have passed while the dial was completing.
      if (controller.signal.aborted) throw timeoutError;

      // Send request and half-close.
      const request: RepoSyncRequest = { did };
      if (since) request.since = since;
      stream.send(cborEncode(request));
      await stream.close();

      // Read response until the peer closes.
      const responseBytes = await readStream(stream, MAX_RESPONSE_SIZE);
      return parseRepoSyncResponse(responseBytes, did);
    } catch (err) {
      stream.abort(toError(err));
      throw err;
    } finally {
      controller.signal.removeEventListener("abort", abortStream);
    }
  } finally {
    clearTimeout(timeout);
  }
}