atproto user agency toolkit for individuals and groups
at main 317 lines 9.7 kB view raw
/**
 * Tests for incremental sync:
 * - SyncStorage.getRootCidForRev()
 * - generateCarForDid() with since parameter (incremental CAR)
 */

import { describe, it, expect, beforeEach, afterEach } from "vitest";
import { mkdtempSync, rmSync } from "node:fs";
import { tmpdir } from "node:os";
import { join } from "node:path";
import Database from "better-sqlite3";
import { IpfsService } from "../ipfs.js";
import { RepoManager } from "../repo-manager.js";
import type { Config } from "../config.js";
import { readCarWithRoot } from "@atproto/repo";

import { SyncStorage } from "./sync-storage.js";
import { generateCarForDid } from "../xrpc/sync.js";

/** Handle type of the SQLite database opened by each suite. */
type TestDb = InstanceType<typeof Database>;

/** Second argument of `SyncStorage.completeSyncEvent` (the recorded outcome). */
type SyncOutcome = Parameters<SyncStorage["completeSyncEvent"]>[1];

/**
 * Builds the Config used by the tests.
 *
 * IPFS is enabled with networking off; the firehose, rate limiting and OAuth
 * are all disabled. Every value other than DATA_DIR is a fixed placeholder.
 *
 * @param dataDir - Directory assigned to `DATA_DIR`.
 * @returns A fully populated Config object.
 */
function testConfig(dataDir: string): Config {
  return {
    DID: "did:plc:test123",
    HANDLE: "test.example.com",
    PDS_HOSTNAME: "test.example.com",
    AUTH_TOKEN: "test-auth-token",
    SIGNING_KEY:
      "0000000000000000000000000000000000000000000000000000000000000001",
    SIGNING_KEY_PUBLIC:
      "zQ3shP2mWsZYWgvZM9GJ3EvMfRXQJwuTh6BdXLvJB9gFhT3Lr",
    JWT_SECRET: "test-jwt-secret",
    PASSWORD_HASH: "$2a$10$test",
    DATA_DIR: dataDir,
    PORT: 3000,
    IPFS_ENABLED: true,
    IPFS_NETWORKING: false,
    REPLICATE_DIDS: [],
    FIREHOSE_URL: "wss://localhost/xrpc/com.atproto.sync.subscribeRepos",
    FIREHOSE_ENABLED: false,
    RATE_LIMIT_ENABLED: false,
    RATE_LIMIT_READ_PER_MIN: 300,
    RATE_LIMIT_SYNC_PER_MIN: 30,
    RATE_LIMIT_SESSION_PER_MIN: 10,
    RATE_LIMIT_WRITE_PER_MIN: 200,
    RATE_LIMIT_CHALLENGE_PER_MIN: 20,
    RATE_LIMIT_MAX_CONNECTIONS: 100,
    RATE_LIMIT_FIREHOSE_PER_IP: 3,
    OAUTH_ENABLED: false,
    PUBLIC_URL: "http://localhost:3000",
  };
}

/** DID under which the tests record sync state and history. */
const TEST_DID = "did:plc:testrepo";

/**
 * Closes the database and removes the per-test temp directory.
 *
 * The directory removal runs even when `close()` throws, so a failing
 * teardown step does not leave the temp directory behind. Removal itself is
 * best-effort: a failure to delete must not fail the test.
 *
 * @param db - Database handle opened in `beforeEach`.
 * @param dir - Temp directory created in `beforeEach`.
 */
function disposeDatabase(db: TestDb, dir: string): void {
  try {
    db.close();
  } finally {
    try {
      rmSync(dir, { recursive: true, force: true });
    } catch {
      // Best-effort cleanup; ignore removal errors.
    }
  }
}

// ============================================
// SyncStorage.getRootCidForRev()
// ============================================

describe("SyncStorage.getRootCidForRev", () => {
  let tmpDir: string;
  let db: TestDb;
  let storage: SyncStorage;

  beforeEach(() => {
    tmpDir = mkdtempSync(join(tmpdir(), "incremental-sync-test-"));
    db = new Database(join(tmpDir, "test.db"));
    storage = new SyncStorage(db);
    storage.initSchema();
  });

  afterEach(() => {
    disposeDatabase(db, tmpDir);
  });

  /**
   * Records one finished sync event for TEST_DID from source "pds".
   *
   * @param outcome - Passed unchanged to `completeSyncEvent`.
   */
  function recordSync(outcome: SyncOutcome): void {
    const eventId = storage.startSyncEvent(TEST_DID, "pds");
    storage.completeSyncEvent(eventId, outcome);
  }

  it("returns root_cid for a known successful sync", () => {
    recordSync({
      status: "success",
      rev: "rev-001",
      rootCid: "bafyreiabc123",
    });

    const result = storage.getRootCidForRev(TEST_DID, "rev-001");
    expect(result).toBe("bafyreiabc123");
  });

  it("returns null for unknown rev", () => {
    recordSync({
      status: "success",
      rev: "rev-001",
      rootCid: "bafyreiabc123",
    });

    const result = storage.getRootCidForRev(TEST_DID, "rev-unknown");
    expect(result).toBeNull();
  });

  it("returns null for unknown DID", () => {
    recordSync({
      status: "success",
      rev: "rev-001",
      rootCid: "bafyreiabc123",
    });

    const result = storage.getRootCidForRev("did:plc:unknown", "rev-001");
    expect(result).toBeNull();
  });

  it("excludes failed syncs", () => {
    recordSync({
      status: "error",
      rev: "rev-001",
      rootCid: "bafyreiabc123",
      errorMessage: "fetch failed",
    });

    const result = storage.getRootCidForRev(TEST_DID, "rev-001");
    expect(result).toBeNull();
  });

  it("excludes syncs without root_cid", () => {
    recordSync({
      status: "success",
      rev: "rev-001",
    });

    const result = storage.getRootCidForRev(TEST_DID, "rev-001");
    expect(result).toBeNull();
  });

  it("returns most recent root_cid when multiple syncs exist for same rev", () => {
    // Two successful syncs for the same rev; the later one must win.
    recordSync({
      status: "success",
      rev: "rev-001",
      rootCid: "bafyreiold",
    });
    recordSync({
      status: "success",
      rev: "rev-001",
      rootCid: "bafyreinew",
    });

    const result = storage.getRootCidForRev(TEST_DID, "rev-001");
    expect(result).toBe("bafyreinew");
  });
});

// ============================================
// Incremental CAR generation
// ============================================

describe("Incremental CAR generation", () => {
  let tmpDir: string;
  let db: TestDb;
  let ipfsService: IpfsService;
  let repoManager: RepoManager;
  let syncStorage: SyncStorage;

  beforeEach(async () => {
    tmpDir = mkdtempSync(join(tmpdir(), "incremental-car-test-"));
    const config = testConfig(tmpDir);

    db = new Database(join(tmpDir, "test.db"));
    ipfsService = new IpfsService({
      db,
      networking: false,
    });
    await ipfsService.start();

    repoManager = new RepoManager(db, config);
    repoManager.init(undefined, ipfsService, ipfsService);

    syncStorage = new SyncStorage(db);
    syncStorage.initSchema();
  });

  afterEach(async () => {
    try {
      if (ipfsService.isRunning()) {
        await ipfsService.stop();
      }
    } finally {
      // Release the SQLite handle and temp dir even if stopping IPFS rejects.
      disposeDatabase(db, tmpDir);
    }
  });

  /**
   * Helper: snapshot current repo state into IPFS + SyncStorage.
   *
   * Exports the repo as a CAR, stores its blocks through `ipfsService`, then
   * replaces the tracked blocks for TEST_DID and records a successful sync
   * event carrying the repo's current rev and the CAR root.
   *
   * @returns The snapshot's `rev`, `rootCid` and the CIDs of all its blocks.
   */
  async function snapshotRepo(): Promise<{ rev: string; rootCid: string; blockCids: string[] }> {
    const status = await repoManager.getRepoStatus();
    const carBytes = await repoManager.getRepoCar();
    const { root, blocks } = await readCarWithRoot(carBytes);
    await ipfsService.putBlocks(blocks);

    const rootCid = root.toString();
    // NOTE(review): this reads the block container's internal `map` field via
    // a cast. If the library exposes a public accessor for the CIDs, prefer
    // it -- confirm against the @atproto/repo version in use.
    const blockMap = (blocks as unknown as { map: Map<string, Uint8Array> }).map;
    const blockCids = [...blockMap.keys()];

    // Track in sync storage
    syncStorage.upsertState({ did: TEST_DID, pdsEndpoint: "https://pds.example.com" });
    syncStorage.updateSyncProgress(TEST_DID, status.rev, rootCid);
    syncStorage.clearBlocks(TEST_DID);
    syncStorage.trackBlocks(TEST_DID, blockCids);

    // Record in sync history
    const eventId = syncStorage.startSyncEvent(TEST_DID, "test");
    syncStorage.completeSyncEvent(eventId, {
      status: "success",
      rev: status.rev,
      rootCid,
      blocksAdded: blockCids.length,
    });

    return { rev: status.rev, rootCid, blockCids };
  }

  it("returns smaller CAR for incremental sync after changes", async () => {
    // Create initial records
    await repoManager.createRecord("app.bsky.feed.post", undefined, {
      $type: "app.bsky.feed.post",
      text: "First post",
      createdAt: "2025-01-01T00:00:00.000Z",
    });
    await repoManager.createRecord("app.bsky.feed.post", undefined, {
      $type: "app.bsky.feed.post",
      text: "Second post",
      createdAt: "2025-01-01T00:00:01.000Z",
    });

    const snapshot1 = await snapshotRepo();

    // Add more records
    await repoManager.createRecord("app.bsky.feed.post", undefined, {
      $type: "app.bsky.feed.post",
      text: "Third post",
      createdAt: "2025-01-01T00:00:02.000Z",
    });

    // Second snapshot: only its side effects on IPFS/SyncStorage are needed.
    await snapshotRepo();

    // Full CAR (no since)
    const fullCar = await generateCarForDid(TEST_DID, ipfsService, syncStorage);
    expect(fullCar).not.toBeNull();

    // Incremental CAR (since first snapshot)
    const incrementalCar = await generateCarForDid(TEST_DID, ipfsService, syncStorage, snapshot1.rev);
    expect(incrementalCar).not.toBeNull();

    // Incremental should be smaller than full
    expect(incrementalCar!.length).toBeLessThan(fullCar!.length);
  });

  it("falls back to full CAR for unknown since rev", async () => {
    await repoManager.createRecord("app.bsky.feed.post", undefined, {
      $type: "app.bsky.feed.post",
      text: "Hello",
      createdAt: "2025-01-01T00:00:00.000Z",
    });

    await snapshotRepo();

    // Full CAR
    const fullCar = await generateCarForDid(TEST_DID, ipfsService, syncStorage);
    expect(fullCar).not.toBeNull();

    // Incremental with unknown rev → should fall back to full
    const fallbackCar = await generateCarForDid(TEST_DID, ipfsService, syncStorage, "unknown-rev");
    expect(fallbackCar).not.toBeNull();

    // Should be same size as full (fallback)
    expect(fallbackCar!.length).toBe(fullCar!.length);
  });

  it("returns minimal CAR when since matches current rev", async () => {
    await repoManager.createRecord("app.bsky.feed.post", undefined, {
      $type: "app.bsky.feed.post",
      text: "Hello",
      createdAt: "2025-01-01T00:00:00.000Z",
    });

    const snapshot = await snapshotRepo();

    // Full CAR
    const fullCar = await generateCarForDid(TEST_DID, ipfsService, syncStorage);
    expect(fullCar).not.toBeNull();

    // Incremental with since = current rev → minimal CAR (just commit block)
    const minimalCar = await generateCarForDid(TEST_DID, ipfsService, syncStorage, snapshot.rev);
    expect(minimalCar).not.toBeNull();

    // Minimal should be much smaller than full
    expect(minimalCar!.length).toBeLessThan(fullCar!.length);

    // Parse the minimal CAR and verify its root is the snapshot's root CID
    const { root } = await readCarWithRoot(minimalCar!);
    expect(root.toString()).toBe(snapshot.rootCid);
  });

  it("returns null for DID with no sync state", async () => {
    const result = await generateCarForDid("did:plc:nonexistent", ipfsService, syncStorage);
    expect(result).toBeNull();
  });

  it("returns null for DID with no sync state even with since", async () => {
    const result = await generateCarForDid("did:plc:nonexistent", ipfsService, syncStorage, "some-rev");
    expect(result).toBeNull();
  });
});