atproto user agency toolkit for individuals and groups
1/**
2 * Two-node DID-less startup test.
3 *
4 * Two clean p2pds servers start without any DID. Each establishes identity
5 * (simulating OAuth), then each replicates a different external account
6 * from mock PDSes. Verifies the full flow: startup → identity → add DID → sync.
7 */
8
9import { describe, it, expect, afterEach } from "vitest";
10import { mkdtempSync, rmSync } from "node:fs";
11import { tmpdir } from "node:os";
12import { join, resolve } from "node:path";
13import Database from "better-sqlite3";
14
15import type { Config } from "./config.js";
16import { startServer, type ServerHandle } from "./start.js";
17import {
18 createTestRepo,
19 startMockPds,
20 createMockDidResolver,
21 type MockPds,
22} from "./replication/test-helpers.js";
23
24const DID_ALICE = "did:plc:alice111";
25const DID_BOB = "did:plc:bob222";
26
27function didlessConfig(dataDir: string): Config {
28 return {
29 PDS_HOSTNAME: "local.test",
30 AUTH_TOKEN: "test-auth-token",
31 JWT_SECRET: "test-jwt-secret",
32 PASSWORD_HASH: "$2a$10$test",
33 DATA_DIR: dataDir,
34 PORT: 0,
35 IPFS_ENABLED: true,
36 IPFS_NETWORKING: false,
37 REPLICATE_DIDS: [],
38 FIREHOSE_URL: "wss://localhost/xrpc/com.atproto.sync.subscribeRepos",
39 FIREHOSE_ENABLED: false,
40 RATE_LIMIT_ENABLED: false,
41 RATE_LIMIT_READ_PER_MIN: 300,
42 RATE_LIMIT_SYNC_PER_MIN: 30,
43 RATE_LIMIT_SESSION_PER_MIN: 10,
44 RATE_LIMIT_WRITE_PER_MIN: 200,
45 RATE_LIMIT_CHALLENGE_PER_MIN: 20,
46 RATE_LIMIT_MAX_CONNECTIONS: 100,
47 RATE_LIMIT_FIREHOSE_PER_IP: 3,
48 OAUTH_ENABLED: false, PUBLIC_URL: "http://localhost:3000",
49 };
50}
51
52describe("Two-node DID-less replication", () => {
53 let tmpDirA: string;
54 let tmpDirB: string;
55 let handleA: ServerHandle | undefined;
56 let handleB: ServerHandle | undefined;
57 let mockPds: MockPds | undefined;
58
59 afterEach(async () => {
60 if (handleA) { await handleA.close(); handleA = undefined; }
61 if (handleB) { await handleB.close(); handleB = undefined; }
62 if (mockPds) { await mockPds.close(); mockPds = undefined; }
63 if (tmpDirA) rmSync(tmpDirA, { recursive: true, force: true });
64 if (tmpDirB) rmSync(tmpDirB, { recursive: true, force: true });
65 });
66
67 it("two clean nodes establish identity and replicate different accounts", async () => {
68 tmpDirA = mkdtempSync(join(tmpdir(), "two-node-a-"));
69 tmpDirB = mkdtempSync(join(tmpdir(), "two-node-b-"));
70
71 // Create test repos for two external accounts
72 const aliceCar = await createTestRepo(DID_ALICE, [
73 { collection: "app.bsky.feed.post", rkey: "a1", record: { text: "Alice post 1", createdAt: new Date().toISOString() } },
74 { collection: "app.bsky.feed.post", rkey: "a2", record: { text: "Alice post 2", createdAt: new Date().toISOString() } },
75 ]);
76 const bobCar = await createTestRepo(DID_BOB, [
77 { collection: "app.bsky.feed.post", rkey: "b1", record: { text: "Bob post 1", createdAt: new Date().toISOString() } },
78 ]);
79
80 // Mock PDS serves both accounts
81 mockPds = await startMockPds([
82 { did: DID_ALICE, carBytes: aliceCar },
83 { did: DID_BOB, carBytes: bobCar },
84 ]);
85 const mockResolver = createMockDidResolver({
86 [DID_ALICE]: mockPds.url,
87 [DID_BOB]: mockPds.url,
88 });
89
90 // Start two clean servers — no DID, no signing key
91 const configA = didlessConfig(tmpDirA);
92 const configB = didlessConfig(tmpDirB);
93 handleA = await startServer(configA, { didResolver: mockResolver });
94 handleB = await startServer(configB, { didResolver: mockResolver });
95
96 // Both should be healthy without DID
97 const healthA = await fetch(`${handleA.url}/xrpc/_health`);
98 const healthB = await fetch(`${handleB.url}/xrpc/_health`);
99 expect(healthA.status).toBe(200);
100 expect(healthB.status).toBe(200);
101
102 // Both should have replication managers
103 expect(handleA.replicationManager).toBeDefined();
104 expect(handleB.replicationManager).toBeDefined();
105
106 // Simulate OAuth identity establishment on each node
107 // (In production this happens in the OAuth callback)
108 const dbA = new Database(resolve(tmpDirA, "pds.db"));
109 dbA.prepare("INSERT INTO node_identity (did, handle) VALUES (?, ?)").run(
110 "did:plc:node-a-identity",
111 "node-a.test",
112 );
113 configA.DID = "did:plc:node-a-identity";
114 configA.HANDLE = "node-a.test";
115 dbA.close();
116
117 const dbB = new Database(resolve(tmpDirB, "pds.db"));
118 dbB.prepare("INSERT INTO node_identity (did, handle) VALUES (?, ?)").run(
119 "did:plc:node-b-identity",
120 "node-b.test",
121 );
122 configB.DID = "did:plc:node-b-identity";
123 configB.HANDLE = "node-b.test";
124 dbB.close();
125
126 // Verify identity shows up in overview
127 const overviewA = await fetch(`${handleA.url}/xrpc/org.p2pds.app.getOverview`, {
128 headers: { Authorization: `Bearer ${configA.AUTH_TOKEN}` },
129 });
130 expect(overviewA.status).toBe(200);
131 const ovA = (await overviewA.json()) as { did: string };
132 expect(ovA.did).toBe("did:plc:node-a-identity");
133
134 // Node A replicates Alice's account
135 const addAlice = await fetch(`${handleA.url}/xrpc/org.p2pds.app.addDid`, {
136 method: "POST",
137 headers: {
138 Authorization: `Bearer ${configA.AUTH_TOKEN}`,
139 "Content-Type": "application/json",
140 },
141 body: JSON.stringify({ did: DID_ALICE }),
142 });
143 expect(addAlice.status).toBe(200);
144
145 // Node B replicates Bob's account
146 const addBob = await fetch(`${handleB.url}/xrpc/org.p2pds.app.addDid`, {
147 method: "POST",
148 headers: {
149 Authorization: `Bearer ${configB.AUTH_TOKEN}`,
150 "Content-Type": "application/json",
151 },
152 body: JSON.stringify({ did: DID_BOB }),
153 });
154 expect(addBob.status).toBe(200);
155
156 // Trigger sync on both
157 await fetch(`${handleA.url}/xrpc/org.p2pds.replication.syncNow`, {
158 method: "POST",
159 headers: { Authorization: `Bearer ${configA.AUTH_TOKEN}` },
160 });
161 await fetch(`${handleB.url}/xrpc/org.p2pds.replication.syncNow`, {
162 method: "POST",
163 headers: { Authorization: `Bearer ${configB.AUTH_TOKEN}` },
164 });
165
166 // Wait for async sync
167 await new Promise((r) => setTimeout(r, 3000));
168
169 // Verify Node A synced Alice's data
170 const statusA = await fetch(
171 `${handleA.url}/xrpc/org.p2pds.app.getDidStatus?did=${DID_ALICE}`,
172 { headers: { Authorization: `Bearer ${configA.AUTH_TOKEN}` } },
173 );
174 expect(statusA.status).toBe(200);
175 const didStatusA = (await statusA.json()) as {
176 did: string;
177 blockCount: number;
178 syncState: { status: string };
179 };
180 expect(didStatusA.did).toBe(DID_ALICE);
181 expect(didStatusA.blockCount).toBeGreaterThan(0);
182 expect(didStatusA.syncState.status).toBe("synced");
183
184 // Verify Node B synced Bob's data
185 const statusB = await fetch(
186 `${handleB.url}/xrpc/org.p2pds.app.getDidStatus?did=${DID_BOB}`,
187 { headers: { Authorization: `Bearer ${configB.AUTH_TOKEN}` } },
188 );
189 expect(statusB.status).toBe(200);
190 const didStatusB = (await statusB.json()) as {
191 did: string;
192 blockCount: number;
193 syncState: { status: string };
194 };
195 expect(didStatusB.did).toBe(DID_BOB);
196 expect(didStatusB.blockCount).toBeGreaterThan(0);
197 expect(didStatusB.syncState.status).toBe("synced");
198 }, 30_000);
199
200 it("identity persists across restart and replication continues", async () => {
201 tmpDirA = mkdtempSync(join(tmpdir(), "two-node-restart-"));
202 tmpDirB = ""; // not used
203
204 // Create test repo
205 const aliceCar = await createTestRepo(DID_ALICE, [
206 { collection: "app.bsky.feed.post", rkey: "r1", record: { text: "restart test", createdAt: new Date().toISOString() } },
207 ]);
208 mockPds = await startMockPds([{ did: DID_ALICE, carBytes: aliceCar }]);
209 const mockResolver = createMockDidResolver({ [DID_ALICE]: mockPds.url });
210
211 // First boot: start clean, establish identity, add DID, sync
212 const config1 = didlessConfig(tmpDirA);
213 handleA = await startServer(config1, { didResolver: mockResolver });
214
215 // Simulate identity establishment
216 const db1 = new Database(resolve(tmpDirA, "pds.db"));
217 db1.prepare("INSERT INTO node_identity (did, handle) VALUES (?, ?)").run(
218 "did:plc:persistent-node",
219 "persistent.test",
220 );
221 config1.DID = "did:plc:persistent-node";
222 db1.close();
223
224 // Add and sync
225 await fetch(`${handleA.url}/xrpc/org.p2pds.app.addDid`, {
226 method: "POST",
227 headers: {
228 Authorization: `Bearer ${config1.AUTH_TOKEN}`,
229 "Content-Type": "application/json",
230 },
231 body: JSON.stringify({ did: DID_ALICE }),
232 });
233 await fetch(`${handleA.url}/xrpc/org.p2pds.replication.syncNow`, {
234 method: "POST",
235 headers: { Authorization: `Bearer ${config1.AUTH_TOKEN}` },
236 });
237 await new Promise((r) => setTimeout(r, 2000));
238
239 // Verify sync worked
240 const status1 = await fetch(
241 `${handleA.url}/xrpc/org.p2pds.app.getDidStatus?did=${DID_ALICE}`,
242 { headers: { Authorization: `Bearer ${config1.AUTH_TOKEN}` } },
243 );
244 const ds1 = (await status1.json()) as { blockCount: number };
245 expect(ds1.blockCount).toBeGreaterThan(0);
246
247 // Shut down
248 await handleA.close();
249 handleA = undefined;
250
251 // Second boot: restart with same data dir — identity should load automatically
252 const config2 = didlessConfig(tmpDirA);
253 expect(config2.DID).toBeUndefined(); // not set in config
254 handleA = await startServer(config2, { didResolver: mockResolver });
255
256 // Identity should have been loaded from node_identity
257 expect(config2.DID).toBe("did:plc:persistent-node");
258 expect(config2.HANDLE).toBe("persistent.test");
259
260 // Replication state should persist — DID_ALICE is still tracked
261 const overview = await fetch(`${handleA.url}/xrpc/org.p2pds.app.getOverview`, {
262 headers: { Authorization: `Bearer ${config2.AUTH_TOKEN}` },
263 });
264 const ov = (await overview.json()) as {
265 did: string;
266 replication: { trackedDids: string[] };
267 };
268 expect(ov.did).toBe("did:plc:persistent-node");
269 expect(ov.replication.trackedDids).toContain(DID_ALICE);
270
271 // Blocks should still be in IPFS (persisted to disk)
272 const status2 = await fetch(
273 `${handleA.url}/xrpc/org.p2pds.app.getDidStatus?did=${DID_ALICE}`,
274 { headers: { Authorization: `Bearer ${config2.AUTH_TOKEN}` } },
275 );
276 const ds2 = (await status2.json()) as {
277 blockCount: number;
278 syncState: { status: string };
279 };
280 expect(ds2.blockCount).toBeGreaterThan(0);
281 expect(ds2.syncState.status).toBe("synced");
282 }, 30_000);
283});