atproto user agency toolkit for individuals and groups
1/**
2 * CAR-over-libp2p repo sync protocol.
3 *
4 * Mirrors `com.atproto.sync.getRepo` but runs over libp2p streams,
5 * enabling peer-to-peer repo transfer without centralized PDS servers.
6 *
7 * Protocol ID: /p2pds/repo-sync/1.0.0
8 *
9 * Wire format (half-close request-response):
10 * 1. Requester: send CBOR { did: string, since?: string }, then close() (half-close write)
11 * 2. Responder: read request, generate CAR, send 1-byte status + CAR bytes, then close()
12 * 3. Requester: read all response bytes — first byte is status, rest is CAR data
13 *
14 * Status codes: 0 = ok, 1 = not found, 2 = error
15 */
16
17import type { Libp2p, Stream } from "@libp2p/interface";
18import { multiaddr } from "@multiformats/multiaddr";
19import { encode as cborEncode, decode as cborDecode } from "../cbor-compat.js";
20import type { BlockStore } from "../ipfs.js";
21import type { SyncStorage } from "./sync-storage.js";
22import { generateCarForDid } from "../xrpc/sync.js";
23
/** libp2p protocol ID under which the repo sync handler is registered and dialed. */
export const REPO_SYNC_PROTOCOL = "/p2pds/repo-sync/1.0.0";

/** Byte cap applied when the responder reads a request: 64 KB (a request is just a DID plus optional `since`). */
const MAX_REQUEST_SIZE = 64 * 1024;

/** Byte cap applied when the requester reads a response: 256 MB (status byte + CAR data). */
const MAX_RESPONSE_SIZE = 256 * 1024 ** 2;

/** Delay, in milliseconds, before the requester's abort timer fires (30 seconds). */
const SYNC_TIMEOUT_MS = 30 * 1000;

/** First byte of every response: the repo was found and CAR bytes follow. */
const STATUS_OK = 0;
/** First byte of every response: the responder could not produce a CAR for the DID. */
const STATUS_NOT_FOUND = 1;
/** First byte of every response: the request was rejected as invalid. */
const STATUS_ERROR = 2;

/** Request payload, CBOR-encoded on the wire. */
interface RepoSyncRequest {
  /** DID of the repo being requested. */
  did: string;
  /** Optional rev; when present the responder is asked for an incremental CAR. */
  since?: string;
}
44
/**
 * Drain an async iterable of byte chunks into one contiguous Uint8Array.
 *
 * Chunks that are not `Uint8Array` instances (e.g. Uint8ArrayList) are
 * flattened by calling their `subarray()`.
 *
 * @param stream - Source of chunks to read until exhaustion
 * @param maxSize - Maximum number of bytes accepted in total
 * @returns All bytes in arrival order; a lone chunk is returned as-is (no copy)
 * @throws Error as soon as the running total exceeds `maxSize`
 */
async function collectStream(
  stream: AsyncIterable<Uint8Array | { subarray(): Uint8Array }>,
  maxSize: number,
): Promise<Uint8Array> {
  const parts: Uint8Array[] = [];
  let received = 0;

  for await (const piece of stream) {
    const view = piece instanceof Uint8Array ? piece : piece.subarray();
    received += view.length;
    // Enforce the cap before buffering the chunk that crosses it.
    if (received > maxSize) {
      throw new Error(`Stream exceeded maximum size of ${maxSize} bytes`);
    }
    parts.push(view);
  }

  switch (parts.length) {
    case 0:
      return new Uint8Array(0);
    case 1:
      // Fast path: hand back the only chunk without copying it.
      return parts[0]!;
    default: {
      const joined = new Uint8Array(received);
      parts.reduce((cursor, part) => {
        joined.set(part, cursor);
        return cursor + part.length;
      }, 0);
      return joined;
    }
  }
}
74
/**
 * Decode and validate the CBOR request bytes sent by a peer.
 *
 * The payload is untrusted, so nothing about its shape is assumed: it must
 * decode to an object with a non-empty string `did` and, if present, a string
 * `since` (a null `since` is treated as absent).
 *
 * @returns The validated request, or null when the bytes are not a well-formed request
 */
function parseRepoSyncRequest(
  requestBytes: Uint8Array,
): RepoSyncRequest | null {
  let decoded: unknown;
  try {
    decoded = cborDecode(requestBytes);
  } catch {
    // Undecodable bytes are a bad request, not an internal failure.
    return null;
  }

  if (typeof decoded !== "object" || decoded === null) return null;

  const { did, since } = decoded as { did?: unknown; since?: unknown };
  if (typeof did !== "string" || did.length === 0) return null;
  if (since != null && typeof since !== "string") return null;

  return typeof since === "string" ? { did, since } : { did };
}

/**
 * Register the repo sync protocol handler on a libp2p node (server side).
 *
 * For every inbound stream the handler:
 * 1. Reads the CBOR-encoded request (DID + optional since), capped at MAX_REQUEST_SIZE
 * 2. Replies with a single STATUS_ERROR byte if the request is malformed
 * 3. Generates a CAR via `generateCarForDid` (incremental if since is provided)
 * 4. Replies with STATUS_NOT_FOUND if no CAR was produced, otherwise
 *    STATUS_OK followed by the CAR bytes
 *
 * Any other failure (oversized request, CAR generation throwing, send/close
 * failing) is logged and the stream is aborted.
 *
 * @param libp2p - Node on which to register the handler
 * @param blockStore - Passed through to `generateCarForDid`
 * @param syncStorage - Passed through to `generateCarForDid`
 */
export function registerRepoSyncProtocol(
  libp2p: Libp2p,
  blockStore: BlockStore,
  syncStorage: SyncStorage,
): void {
  libp2p.handle(
    REPO_SYNC_PROTOCOL,
    async (stream: Stream) => {
      /** Send a bare status byte and close our side of the stream. */
      const replyWithStatus = async (status: number): Promise<void> => {
        stream.send(new Uint8Array([status]));
        await stream.close();
      };

      try {
        // Read request
        const requestBytes = await collectStream(
          stream as unknown as AsyncIterable<
            Uint8Array | { subarray(): Uint8Array }
          >,
          MAX_REQUEST_SIZE,
        );

        const request = parseRepoSyncRequest(requestBytes);
        if (request === null) {
          await replyWithStatus(STATUS_ERROR);
          return;
        }

        console.log(
          `[libp2p-sync] Serving repo for ${request.did}${request.since ? ` (since: ${request.since.slice(0, 8)}…)` : ""}`,
        );

        // Generate CAR (incremental if since is provided)
        const carBytes = await generateCarForDid(
          request.did,
          blockStore,
          syncStorage,
          request.since,
        );

        if (!carBytes) {
          await replyWithStatus(STATUS_NOT_FOUND);
          return;
        }

        // Send status + CAR bytes as one buffer: [status, ...car]
        const response = new Uint8Array(1 + carBytes.length);
        response[0] = STATUS_OK;
        response.set(carBytes, 1);
        stream.send(response);
        await stream.close();

        console.log(
          `[libp2p-sync] Served ${(carBytes.length / 1024).toFixed(1)} KB ${request.since ? "incremental" : "full"} CAR for ${request.did}`,
        );
      } catch (err) {
        const error = err instanceof Error ? err : new Error(String(err));
        // Surface the failure locally; the peer only sees the stream abort.
        console.error(
          `[libp2p-sync] Failed to serve repo sync request: ${error.message}`,
        );
        stream.abort(error);
      }
    },
  );
}
145
/**
 * Remove the repo sync protocol handler from a libp2p node.
 *
 * @param libp2p - Node from which REPO_SYNC_PROTOCOL is unhandled
 * @returns Resolves once `libp2p.unhandle` has completed
 */
export async function unregisterRepoSyncProtocol(libp2p: Libp2p): Promise<void> {
  const protocolId = REPO_SYNC_PROTOCOL;
  await libp2p.unhandle(protocolId);
}
154
/**
 * Fetch a repo from a peer via the libp2p repo sync protocol (client side).
 *
 * The peer's multiaddrs are attempted one at a time, in the order given; the
 * CAR bytes from the first address that succeeds are returned. If every
 * address fails, the failure from the last attempt is thrown (non-Error
 * throwables are wrapped in an Error).
 *
 * @param libp2p - The local libp2p node
 * @param multiaddrs - Peer's multiaddrs (from org.p2pds.peer record)
 * @param did - The DID to fetch
 * @param since - Optional rev for incremental sync
 * @returns CAR bytes for the repo
 * @throws Error when `multiaddrs` is empty or no address yields a repo
 */
export async function fetchRepoFromPeer(
  libp2p: Libp2p,
  multiaddrs: string[],
  did: string,
  since?: string,
): Promise<Uint8Array> {
  if (!multiaddrs.length) {
    throw new Error("No multiaddrs provided for peer");
  }

  // Remember only the most recent failure; it is what the caller sees.
  let mostRecentFailure: Error | undefined;

  for (const candidate of multiaddrs) {
    try {
      const car = await fetchRepoFromAddr(libp2p, candidate, did, since);
      return car;
    } catch (cause) {
      mostRecentFailure =
        cause instanceof Error ? cause : new Error(String(cause));
    }
  }

  throw mostRecentFailure ?? new Error("No multiaddrs to try");
}
190
/**
 * Fetch a repo from one multiaddr over the repo sync protocol.
 *
 * Dials `addr`, sends the CBOR-encoded request, half-closes the write side,
 * then reads the full response: one status byte followed by CAR bytes.
 *
 * A single SYNC_TIMEOUT_MS deadline covers both the dial and the whole
 * request/response exchange. Without racing the exchange against the
 * deadline, a peer that accepts the stream but never replies or closes would
 * leave the caller waiting indefinitely.
 *
 * @param libp2p - The local libp2p node
 * @param addr - Multiaddr string of the peer to dial
 * @param did - The DID to fetch
 * @param since - Optional rev for incremental sync (omitted from the request when falsy)
 * @returns CAR bytes (the response without its leading status byte)
 * @throws Error on timeout, empty response, or any status other than STATUS_OK;
 *   errors from `multiaddr()` and `dialProtocol()` propagate unchanged
 */
async function fetchRepoFromAddr(
  libp2p: Libp2p,
  addr: string,
  did: string,
  since?: string,
): Promise<Uint8Array> {
  const ma = multiaddr(addr);

  // One deadline for the whole operation (dial + exchange).
  const controller = new AbortController();
  const timeout = setTimeout(() => controller.abort(), SYNC_TIMEOUT_MS);

  try {
    const stream = await libp2p.dialProtocol(ma, REPO_SYNC_PROTOCOL, {
      signal: controller.signal,
    });

    try {
      // Rejects when the deadline fires; never resolves otherwise.
      const deadline = new Promise<never>((_resolve, reject) => {
        const onTimeout = (): void =>
          reject(
            new Error(
              `Repo sync with ${addr} timed out after ${SYNC_TIMEOUT_MS} ms`,
            ),
          );
        if (controller.signal.aborted) {
          // The timer fired between the dial resolving and this point.
          onTimeout();
        } else {
          controller.signal.addEventListener("abort", onTimeout, {
            once: true,
          });
        }
      });

      // Send request, half-close, then read everything the peer sends back.
      const exchange = (async (): Promise<Uint8Array> => {
        const request: RepoSyncRequest = { did };
        if (since) request.since = since;
        stream.send(cborEncode(request));
        await stream.close();

        return collectStream(
          stream as unknown as AsyncIterable<
            Uint8Array | { subarray(): Uint8Array }
          >,
          MAX_RESPONSE_SIZE,
        );
      })();

      // Promise.race observes both promises, so the loser never surfaces as
      // an unhandled rejection.
      const responseBytes = await Promise.race([exchange, deadline]);

      if (responseBytes.length === 0) {
        throw new Error("Empty response from peer");
      }

      const status = responseBytes[0];
      if (status === STATUS_NOT_FOUND) {
        throw new Error(`Peer does not have repo for ${did}`);
      }
      if (status === STATUS_ERROR) {
        throw new Error(`Peer returned error for ${did}`);
      }
      if (status !== STATUS_OK) {
        throw new Error(`Unknown status code ${status} from peer`);
      }

      // Rest of response is CAR bytes (copied so the result owns its buffer)
      return responseBytes.slice(1);
    } catch (err) {
      // Tear the stream down on any failure, including the deadline firing.
      stream.abort(
        err instanceof Error ? err : new Error(String(err)),
      );
      throw err;
    }
  } finally {
    clearTimeout(timeout);
  }
}
250}