atproto user agency toolkit for individuals and groups
at main 283 lines 10 kB view raw
1/** 2 * Two-node DID-less startup test. 3 * 4 * Two clean p2pds servers start without any DID. Each establishes identity 5 * (simulating OAuth), then each replicates a different external account 6 * from mock PDSes. Verifies the full flow: startup → identity → add DID → sync. 7 */ 8 9import { describe, it, expect, afterEach } from "vitest"; 10import { mkdtempSync, rmSync } from "node:fs"; 11import { tmpdir } from "node:os"; 12import { join, resolve } from "node:path"; 13import Database from "better-sqlite3"; 14 15import type { Config } from "./config.js"; 16import { startServer, type ServerHandle } from "./start.js"; 17import { 18 createTestRepo, 19 startMockPds, 20 createMockDidResolver, 21 type MockPds, 22} from "./replication/test-helpers.js"; 23 24const DID_ALICE = "did:plc:alice111"; 25const DID_BOB = "did:plc:bob222"; 26 27function didlessConfig(dataDir: string): Config { 28 return { 29 PDS_HOSTNAME: "local.test", 30 AUTH_TOKEN: "test-auth-token", 31 JWT_SECRET: "test-jwt-secret", 32 PASSWORD_HASH: "$2a$10$test", 33 DATA_DIR: dataDir, 34 PORT: 0, 35 IPFS_ENABLED: true, 36 IPFS_NETWORKING: false, 37 REPLICATE_DIDS: [], 38 FIREHOSE_URL: "wss://localhost/xrpc/com.atproto.sync.subscribeRepos", 39 FIREHOSE_ENABLED: false, 40 RATE_LIMIT_ENABLED: false, 41 RATE_LIMIT_READ_PER_MIN: 300, 42 RATE_LIMIT_SYNC_PER_MIN: 30, 43 RATE_LIMIT_SESSION_PER_MIN: 10, 44 RATE_LIMIT_WRITE_PER_MIN: 200, 45 RATE_LIMIT_CHALLENGE_PER_MIN: 20, 46 RATE_LIMIT_MAX_CONNECTIONS: 100, 47 RATE_LIMIT_FIREHOSE_PER_IP: 3, 48 OAUTH_ENABLED: false, PUBLIC_URL: "http://localhost:3000", 49 }; 50} 51 52describe("Two-node DID-less replication", () => { 53 let tmpDirA: string; 54 let tmpDirB: string; 55 let handleA: ServerHandle | undefined; 56 let handleB: ServerHandle | undefined; 57 let mockPds: MockPds | undefined; 58 59 afterEach(async () => { 60 if (handleA) { await handleA.close(); handleA = undefined; } 61 if (handleB) { await handleB.close(); handleB = undefined; } 62 if (mockPds) { await mockPds.close(); mockPds = undefined; } 63 if (tmpDirA) rmSync(tmpDirA, { recursive: true, force: true }); 64 if (tmpDirB) rmSync(tmpDirB, { recursive: true, force: true }); 65 }); 66 67 it("two clean nodes establish identity and replicate different accounts", async () => { 68 tmpDirA = mkdtempSync(join(tmpdir(), "two-node-a-")); 69 tmpDirB = mkdtempSync(join(tmpdir(), "two-node-b-")); 70 71 // Create test repos for two external accounts 72 const aliceCar = await createTestRepo(DID_ALICE, [ 73 { collection: "app.bsky.feed.post", rkey: "a1", record: { text: "Alice post 1", createdAt: new Date().toISOString() } }, 74 { collection: "app.bsky.feed.post", rkey: "a2", record: { text: "Alice post 2", createdAt: new Date().toISOString() } }, 75 ]); 76 const bobCar = await createTestRepo(DID_BOB, [ 77 { collection: "app.bsky.feed.post", rkey: "b1", record: { text: "Bob post 1", createdAt: new Date().toISOString() } }, 78 ]); 79 80 // Mock PDS serves both accounts 81 mockPds = await startMockPds([ 82 { did: DID_ALICE, carBytes: aliceCar }, 83 { did: DID_BOB, carBytes: bobCar }, 84 ]); 85 const mockResolver = createMockDidResolver({ 86 [DID_ALICE]: mockPds.url, 87 [DID_BOB]: mockPds.url, 88 }); 89 90 // Start two clean servers — no DID, no signing key 91 const configA = didlessConfig(tmpDirA); 92 const configB = didlessConfig(tmpDirB); 93 handleA = await startServer(configA, { didResolver: mockResolver }); 94 handleB = await startServer(configB, { didResolver: mockResolver }); 95 96 // Both should be healthy without DID 97 const healthA = await fetch(`${handleA.url}/xrpc/_health`); 98 const healthB = await fetch(`${handleB.url}/xrpc/_health`); 99 expect(healthA.status).toBe(200); 100 expect(healthB.status).toBe(200); 101 102 // Both should have replication managers 103 expect(handleA.replicationManager).toBeDefined(); 104 expect(handleB.replicationManager).toBeDefined(); 105 106 // Simulate OAuth identity establishment on each node 107 // (In production this happens in the OAuth callback) 108 const dbA = new Database(resolve(tmpDirA, "pds.db")); 109 dbA.prepare("INSERT INTO node_identity (did, handle) VALUES (?, ?)").run( 110 "did:plc:node-a-identity", 111 "node-a.test", 112 ); 113 configA.DID = "did:plc:node-a-identity"; 114 configA.HANDLE = "node-a.test"; 115 dbA.close(); 116 117 const dbB = new Database(resolve(tmpDirB, "pds.db")); 118 dbB.prepare("INSERT INTO node_identity (did, handle) VALUES (?, ?)").run( 119 "did:plc:node-b-identity", 120 "node-b.test", 121 ); 122 configB.DID = "did:plc:node-b-identity"; 123 configB.HANDLE = "node-b.test"; 124 dbB.close(); 125 126 // Verify identity shows up in overview 127 const overviewA = await fetch(`${handleA.url}/xrpc/org.p2pds.app.getOverview`, { 128 headers: { Authorization: `Bearer ${configA.AUTH_TOKEN}` }, 129 }); 130 expect(overviewA.status).toBe(200); 131 const ovA = (await overviewA.json()) as { did: string }; 132 expect(ovA.did).toBe("did:plc:node-a-identity"); 133 134 // Node A replicates Alice's account 135 const addAlice = await fetch(`${handleA.url}/xrpc/org.p2pds.app.addDid`, { 136 method: "POST", 137 headers: { 138 Authorization: `Bearer ${configA.AUTH_TOKEN}`, 139 "Content-Type": "application/json", 140 }, 141 body: JSON.stringify({ did: DID_ALICE }), 142 }); 143 expect(addAlice.status).toBe(200); 144 145 // Node B replicates Bob's account 146 const addBob = await fetch(`${handleB.url}/xrpc/org.p2pds.app.addDid`, { 147 method: "POST", 148 headers: { 149 Authorization: `Bearer ${configB.AUTH_TOKEN}`, 150 "Content-Type": "application/json", 151 }, 152 body: JSON.stringify({ did: DID_BOB }), 153 }); 154 expect(addBob.status).toBe(200); 155 156 // Trigger sync on both 157 await fetch(`${handleA.url}/xrpc/org.p2pds.replication.syncNow`, { 158 method: "POST", 159 headers: { Authorization: `Bearer ${configA.AUTH_TOKEN}` }, 160 }); 161 await fetch(`${handleB.url}/xrpc/org.p2pds.replication.syncNow`, { 162 method: "POST", 163 headers: { Authorization: `Bearer ${configB.AUTH_TOKEN}` }, 164 }); 165 166 // Wait for async sync 167 await new Promise((r) => setTimeout(r, 3000)); 168 169 // Verify Node A synced Alice's data 170 const statusA = await fetch( 171 `${handleA.url}/xrpc/org.p2pds.app.getDidStatus?did=${DID_ALICE}`, 172 { headers: { Authorization: `Bearer ${configA.AUTH_TOKEN}` } }, 173 ); 174 expect(statusA.status).toBe(200); 175 const didStatusA = (await statusA.json()) as { 176 did: string; 177 blockCount: number; 178 syncState: { status: string }; 179 }; 180 expect(didStatusA.did).toBe(DID_ALICE); 181 expect(didStatusA.blockCount).toBeGreaterThan(0); 182 expect(didStatusA.syncState.status).toBe("synced"); 183 184 // Verify Node B synced Bob's data 185 const statusB = await fetch( 186 `${handleB.url}/xrpc/org.p2pds.app.getDidStatus?did=${DID_BOB}`, 187 { headers: { Authorization: `Bearer ${configB.AUTH_TOKEN}` } }, 188 ); 189 expect(statusB.status).toBe(200); 190 const didStatusB = (await statusB.json()) as { 191 did: string; 192 blockCount: number; 193 syncState: { status: string }; 194 }; 195 expect(didStatusB.did).toBe(DID_BOB); 196 expect(didStatusB.blockCount).toBeGreaterThan(0); 197 expect(didStatusB.syncState.status).toBe("synced"); 198 }, 30_000); 199 200 it("identity persists across restart and replication continues", async () => { 201 tmpDirA = mkdtempSync(join(tmpdir(), "two-node-restart-")); 202 tmpDirB = ""; // not used 203 204 // Create test repo 205 const aliceCar = await createTestRepo(DID_ALICE, [ 206 { collection: "app.bsky.feed.post", rkey: "r1", record: { text: "restart test", createdAt: new Date().toISOString() } }, 207 ]); 208 mockPds = await startMockPds([{ did: DID_ALICE, carBytes: aliceCar }]); 209 const mockResolver = createMockDidResolver({ [DID_ALICE]: mockPds.url }); 210 211 // First boot: start clean, establish identity, add DID, sync 212 const config1 = didlessConfig(tmpDirA); 213 handleA = await startServer(config1, { didResolver: mockResolver }); 214 215 // Simulate identity establishment 216 const db1 = new Database(resolve(tmpDirA, "pds.db")); 217 db1.prepare("INSERT INTO node_identity (did, handle) VALUES (?, ?)").run( 218 "did:plc:persistent-node", 219 "persistent.test", 220 ); 221 config1.DID = "did:plc:persistent-node"; 222 db1.close(); 223 224 // Add and sync 225 await fetch(`${handleA.url}/xrpc/org.p2pds.app.addDid`, { 226 method: "POST", 227 headers: { 228 Authorization: `Bearer ${config1.AUTH_TOKEN}`, 229 "Content-Type": "application/json", 230 }, 231 body: JSON.stringify({ did: DID_ALICE }), 232 }); 233 await fetch(`${handleA.url}/xrpc/org.p2pds.replication.syncNow`, { 234 method: "POST", 235 headers: { Authorization: `Bearer ${config1.AUTH_TOKEN}` }, 236 }); 237 await new Promise((r) => setTimeout(r, 2000)); 238 239 // Verify sync worked 240 const status1 = await fetch( 241 `${handleA.url}/xrpc/org.p2pds.app.getDidStatus?did=${DID_ALICE}`, 242 { headers: { Authorization: `Bearer ${config1.AUTH_TOKEN}` } }, 243 ); 244 const ds1 = (await status1.json()) as { blockCount: number }; 245 expect(ds1.blockCount).toBeGreaterThan(0); 246 247 // Shut down 248 await handleA.close(); 249 handleA = undefined; 250 251 // Second boot: restart with same data dir — identity should load automatically 252 const config2 = didlessConfig(tmpDirA); 253 expect(config2.DID).toBeUndefined(); // not set in config 254 handleA = await startServer(config2, { didResolver: mockResolver }); 255 256 // Identity should have been loaded from node_identity 257 expect(config2.DID).toBe("did:plc:persistent-node"); 258 expect(config2.HANDLE).toBe("persistent.test"); 259 260 // Replication state should persist — DID_ALICE is still tracked 261 const overview = await fetch(`${handleA.url}/xrpc/org.p2pds.app.getOverview`, { 262 headers: { Authorization: `Bearer ${config2.AUTH_TOKEN}` }, 263 }); 264 const ov = (await overview.json()) as { 265 did: string; 266 replication: { trackedDids: string[] }; 267 }; 268 expect(ov.did).toBe("did:plc:persistent-node"); 269 expect(ov.replication.trackedDids).toContain(DID_ALICE); 270 271 // Blocks should still be in IPFS (persisted to disk) 272 const status2 = await fetch( 273 `${handleA.url}/xrpc/org.p2pds.app.getDidStatus?did=${DID_ALICE}`, 274 { headers: { Authorization: `Bearer ${config2.AUTH_TOKEN}` } }, 275 ); 276 const ds2 = (await status2.json()) as { 277 blockCount: number; 278 syncState: { status: string }; 279 }; 280 expect(ds2.blockCount).toBeGreaterThan(0); 281 expect(ds2.syncState.status).toBe("synced"); 282 }, 30_000); 283});