atproto user agency toolkit for individuals and groups
1/**
2 * Tests for incremental sync:
3 * - SyncStorage.getRootCidForRev()
4 * - generateCarForDid() with since parameter (incremental CAR)
5 */
6
7import { describe, it, expect, beforeEach, afterEach } from "vitest";
8import { mkdtempSync, rmSync } from "node:fs";
9import { tmpdir } from "node:os";
10import { join } from "node:path";
11import Database from "better-sqlite3";
12import { IpfsService } from "../ipfs.js";
13import { RepoManager } from "../repo-manager.js";
14import type { Config } from "../config.js";
15import { readCarWithRoot } from "@atproto/repo";
16
17import { SyncStorage } from "./sync-storage.js";
18import { generateCarForDid } from "../xrpc/sync.js";
19
/**
 * Builds a fully populated {@link Config} for tests.
 *
 * Every field is a fixed value except `DATA_DIR`, which is taken from the
 * argument. IPFS is enabled with networking off; the firehose, rate limiting
 * and OAuth are all disabled.
 *
 * @param dataDir - Directory to use as `DATA_DIR`.
 * @returns The test configuration.
 */
function testConfig(dataDir: string): Config {
  const config: Config = {
    // Identity and credentials
    DID: "did:plc:test123",
    HANDLE: "test.example.com",
    PDS_HOSTNAME: "test.example.com",
    AUTH_TOKEN: "test-auth-token",
    SIGNING_KEY: "0000000000000000000000000000000000000000000000000000000000000001",
    SIGNING_KEY_PUBLIC: "zQ3shP2mWsZYWgvZM9GJ3EvMfRXQJwuTh6BdXLvJB9gFhT3Lr",
    JWT_SECRET: "test-jwt-secret",
    PASSWORD_HASH: "$2a$10$test",

    // Storage and server
    DATA_DIR: dataDir,
    PORT: 3000,

    // IPFS and replication
    IPFS_ENABLED: true,
    IPFS_NETWORKING: false,
    REPLICATE_DIDS: [],

    // Firehose
    FIREHOSE_URL: "wss://localhost/xrpc/com.atproto.sync.subscribeRepos",
    FIREHOSE_ENABLED: false,

    // Rate limiting (disabled; limits are still filled in)
    RATE_LIMIT_ENABLED: false,
    RATE_LIMIT_READ_PER_MIN: 300,
    RATE_LIMIT_SYNC_PER_MIN: 30,
    RATE_LIMIT_SESSION_PER_MIN: 10,
    RATE_LIMIT_WRITE_PER_MIN: 200,
    RATE_LIMIT_CHALLENGE_PER_MIN: 20,
    RATE_LIMIT_MAX_CONNECTIONS: 100,
    RATE_LIMIT_FIREHOSE_PER_IP: 3,

    // OAuth
    OAUTH_ENABLED: false,
    PUBLIC_URL: "http://localhost:3000",
  };
  return config;
}

/** DID under which the tests in this file record sync state and history. */
const TEST_DID = "did:plc:testrepo";
52
53// ============================================
54// SyncStorage.getRootCidForRev()
55// ============================================
56
describe("SyncStorage.getRootCidForRev", () => {
  let scratchDir: string;
  let sqlite: InstanceType<typeof Database>;
  let syncStore: SyncStorage;

  /**
   * Starts a sync event for TEST_DID with source "pds" and completes it
   * straight away with the given outcome.
   */
  const recordSync = (
    outcome: Parameters<SyncStorage["completeSyncEvent"]>[1],
  ): void => {
    const id = syncStore.startSyncEvent(TEST_DID, "pds");
    syncStore.completeSyncEvent(id, outcome);
  };

  // Each test gets its own temp directory and a freshly initialised database.
  beforeEach(() => {
    scratchDir = mkdtempSync(join(tmpdir(), "incremental-sync-test-"));
    sqlite = new Database(join(scratchDir, "test.db"));
    syncStore = new SyncStorage(sqlite);
    syncStore.initSchema();
  });

  afterEach(() => {
    sqlite.close();
    // Best-effort cleanup: a failure to delete the temp dir must not fail the test.
    try {
      rmSync(scratchDir, { recursive: true, force: true });
    } catch {}
  });

  it("returns root_cid for a known successful sync", () => {
    recordSync({ status: "success", rev: "rev-001", rootCid: "bafyreiabc123" });

    expect(syncStore.getRootCidForRev(TEST_DID, "rev-001")).toBe("bafyreiabc123");
  });

  it("returns null for unknown rev", () => {
    recordSync({ status: "success", rev: "rev-001", rootCid: "bafyreiabc123" });

    expect(syncStore.getRootCidForRev(TEST_DID, "rev-unknown")).toBeNull();
  });

  it("returns null for unknown DID", () => {
    recordSync({ status: "success", rev: "rev-001", rootCid: "bafyreiabc123" });

    expect(syncStore.getRootCidForRev("did:plc:unknown", "rev-001")).toBeNull();
  });

  it("excludes failed syncs", () => {
    // Same rev and root CID as the happy path, but recorded with an error status.
    recordSync({
      status: "error",
      rev: "rev-001",
      rootCid: "bafyreiabc123",
      errorMessage: "fetch failed",
    });

    expect(syncStore.getRootCidForRev(TEST_DID, "rev-001")).toBeNull();
  });

  it("excludes syncs without root_cid", () => {
    recordSync({ status: "success", rev: "rev-001" });

    expect(syncStore.getRootCidForRev(TEST_DID, "rev-001")).toBeNull();
  });

  it("returns most recent root_cid when multiple syncs exist for same rev", () => {
    // Two successful events for the same rev; the later one should win.
    recordSync({ status: "success", rev: "rev-001", rootCid: "bafyreiold" });
    recordSync({ status: "success", rev: "rev-001", rootCid: "bafyreinew" });

    expect(syncStore.getRootCidForRev(TEST_DID, "rev-001")).toBe("bafyreinew");
  });
});
153
154// ============================================
155// Incremental CAR generation
156// ============================================
157
describe("Incremental CAR generation", () => {
  let tmpDir: string;
  let db: InstanceType<typeof Database>;
  let ipfsService: IpfsService;
  let repoManager: RepoManager;
  let syncStorage: SyncStorage;

  // Each test gets a fresh temp directory, database, IPFS service (networking
  // off), repo manager and sync storage.
  beforeEach(async () => {
    tmpDir = mkdtempSync(join(tmpdir(), "incremental-car-test-"));
    const config = testConfig(tmpDir);

    db = new Database(join(tmpDir, "test.db"));
    ipfsService = new IpfsService({
      db,
      networking: false,
    });
    await ipfsService.start();

    repoManager = new RepoManager(db, config);
    // NOTE(review): the same IPFS service is passed for both trailing
    // arguments; see RepoManager.init for what each slot means.
    repoManager.init(undefined, ipfsService, ipfsService);

    syncStorage = new SyncStorage(db);
    syncStorage.initSchema();
  });

  afterEach(async () => {
    if (ipfsService.isRunning()) {
      await ipfsService.stop();
    }
    db.close();
    // Best-effort cleanup: a failure to delete the temp dir must not fail the test.
    try {
      rmSync(tmpDir, { recursive: true, force: true });
    } catch {}
  });

  /**
   * Helper: create one `app.bsky.feed.post` record in the local repo.
   *
   * @param text - Post text.
   * @param createdAt - Value stored in the record's `createdAt` field.
   */
  async function createPost(text: string, createdAt: string): Promise<void> {
    await repoManager.createRecord("app.bsky.feed.post", undefined, {
      $type: "app.bsky.feed.post",
      text,
      createdAt,
    });
  }

  /**
   * Helper: snapshot current repo state into IPFS + SyncStorage.
   *
   * Exports the repo as a CAR, stores its blocks in the IPFS service, then
   * records the result for TEST_DID: sync state and progress, the tracked
   * block list (replacing any previous one), and a successful sync-history
   * event carrying the rev and root CID.
   *
   * @returns The repo rev, the CAR root CID, and the CIDs of all blocks in the CAR.
   */
  async function snapshotRepo(): Promise<{ rev: string; rootCid: string; blockCids: string[] }> {
    const status = await repoManager.getRepoStatus();
    const carBytes = await repoManager.getRepoCar();
    const { root, blocks } = await readCarWithRoot(carBytes);
    await ipfsService.putBlocks(blocks);

    const rootCid = root.toString();
    // Use BlockMap's public accessor rather than reaching into its private map.
    const blockCids = blocks.cids().map((cid) => cid.toString());

    // Track in sync storage
    syncStorage.upsertState({ did: TEST_DID, pdsEndpoint: "https://pds.example.com" });
    syncStorage.updateSyncProgress(TEST_DID, status.rev, rootCid);
    syncStorage.clearBlocks(TEST_DID);
    syncStorage.trackBlocks(TEST_DID, blockCids);

    // Record in sync history
    const eventId = syncStorage.startSyncEvent(TEST_DID, "test");
    syncStorage.completeSyncEvent(eventId, {
      status: "success",
      rev: status.rev,
      rootCid,
      blocksAdded: blockCids.length,
    });

    return { rev: status.rev, rootCid, blockCids };
  }

  it("returns smaller CAR for incremental sync after changes", async () => {
    // Create initial records
    await createPost("First post", "2025-01-01T00:00:00.000Z");
    await createPost("Second post", "2025-01-01T00:00:01.000Z");

    const snapshot1 = await snapshotRepo();

    // Add more records
    await createPost("Third post", "2025-01-01T00:00:02.000Z");

    // Second snapshot is taken only for its side effects (it becomes the
    // current state); its return value is not needed.
    await snapshotRepo();

    // Full CAR (no since)
    const fullCar = await generateCarForDid(TEST_DID, ipfsService, syncStorage);
    expect(fullCar).not.toBeNull();

    // Incremental CAR (since first snapshot)
    const incrementalCar = await generateCarForDid(TEST_DID, ipfsService, syncStorage, snapshot1.rev);
    expect(incrementalCar).not.toBeNull();

    // Incremental should be smaller than full
    expect(incrementalCar!.length).toBeLessThan(fullCar!.length);
  });

  it("falls back to full CAR for unknown since rev", async () => {
    await createPost("Hello", "2025-01-01T00:00:00.000Z");

    await snapshotRepo();

    // Full CAR
    const fullCar = await generateCarForDid(TEST_DID, ipfsService, syncStorage);
    expect(fullCar).not.toBeNull();

    // Incremental with unknown rev → should fall back to full
    const fallbackCar = await generateCarForDid(TEST_DID, ipfsService, syncStorage, "unknown-rev");
    expect(fallbackCar).not.toBeNull();

    // Should be same size as full (fallback)
    expect(fallbackCar!.length).toBe(fullCar!.length);
  });

  it("returns minimal CAR when since matches current rev", async () => {
    await createPost("Hello", "2025-01-01T00:00:00.000Z");

    const snapshot = await snapshotRepo();

    // Full CAR
    const fullCar = await generateCarForDid(TEST_DID, ipfsService, syncStorage);
    expect(fullCar).not.toBeNull();

    // Incremental with since = current rev → minimal CAR (just commit block)
    const minimalCar = await generateCarForDid(TEST_DID, ipfsService, syncStorage, snapshot.rev);
    expect(minimalCar).not.toBeNull();

    // Minimal should be much smaller than full
    expect(minimalCar!.length).toBeLessThan(fullCar!.length);

    // Parse the minimal CAR and check that its root is the snapshot's root CID
    const { root } = await readCarWithRoot(minimalCar!);
    expect(root.toString()).toBe(snapshot.rootCid);
  });

  it("returns null for DID with no sync state", async () => {
    const result = await generateCarForDid("did:plc:nonexistent", ipfsService, syncStorage);
    expect(result).toBeNull();
  });

  it("returns null for DID with no sync state even with since", async () => {
    const result = await generateCarForDid("did:plc:nonexistent", ipfsService, syncStorage, "some-rev");
    expect(result).toBeNull();
  });
});
317});