atproto user agency toolkit for individuals and groups
1/**
2 * Extracted server startup logic — importable and testable.
3 *
4 * startServer(config, opts?) does everything server.ts used to do at module scope:
5 * creates DB, IPFS, replication, HTTP server, etc. and returns a ServerHandle
6 * for programmatic control (tests, Tauri sidecar, etc.).
7 */
8
9import { getRequestListener } from "@hono/node-server";
10import { createServer, type Server } from "node:http";
11import { mkdirSync } from "node:fs";
12import { resolve } from "node:path";
13import Database from "better-sqlite3";
14import { WebSocketServer } from "ws";
15import pc from "picocolors";
16
17import { loadPolicies } from "./config.js";
18import type { Config } from "./config.js";
19import { RepoManager } from "./repo-manager.js";
20import { BlobStore } from "./blobs.js";
21import { Firehose } from "./firehose.js";
22import { IpfsService } from "./ipfs.js";
23import { createApp } from "./index.js";
24import { DidResolver } from "./did-resolver.js";
25import { InMemoryDidCache } from "./did-cache.js";
26import { ReplicationManager } from "./replication/replication-manager.js";
27import { ReplicatedRepoReader } from "./replication/replicated-repo-reader.js";
28import { PolicyEngine } from "./policy/engine.js";
29import { PolicyStorage } from "./policy/storage.js";
30import { migrateToNewPolicies } from "./policy/migrate.js";
31import { HttpChallengeTransport } from "./replication/challenge-response/http-transport.js";
32import { Libp2pChallengeTransport } from "./replication/challenge-response/libp2p-transport.js";
33import { FailoverChallengeTransport } from "./replication/challenge-response/failover-transport.js";
34import type { ChallengeTransport } from "./replication/challenge-response/transport.js";
35import type { Libp2p } from "@libp2p/interface";
36import { RateLimiter } from "./rate-limiter.js";
37import { createOAuthClient, type OAuthClientManager } from "./oauth/client.js";
38import { PdsClient } from "./oauth/pds-client.js";
39import { type PdsClientRef, publishPeerRecord } from "./oauth/routes.js";
40import { registerRepoSyncProtocol } from "./replication/libp2p-sync.js";
41
/**
 * Optional overrides for {@link startServer}. Currently test-oriented only:
 * production callers normally pass nothing and get the default wiring.
 */
export interface StartServerOpts {
  /** Override DID resolver (e.g. mock resolver for tests). */
  didResolver?: DidResolver;
}
46
/**
 * Handle returned by {@link startServer} for programmatic control of a
 * running node (tests, Tauri sidecar, embedding).
 */
export interface ServerHandle {
  /** Base URL the HTTP server is listening on, e.g. "http://localhost:3000". */
  url: string;
  /** Actual bound port (differs from config.PORT when 0 = OS-assigned). */
  port: number;
  /**
   * Graceful shutdown: stops rate limiter and replication, closes the
   * WebSocket and HTTP servers (force-closing keep-alive connections),
   * stops IPFS, then closes the SQLite database — in that order.
   */
  close: () => Promise<void>;
  /** Only set when IPFS is enabled (replication requires the IPFS service). */
  replicationManager?: ReplicationManager;
  /** Only set when config.IPFS_ENABLED is true. */
  ipfsService?: IpfsService;
}
54
55export async function startServer(
56 config: Config,
57 opts?: StartServerOpts,
58): Promise<ServerHandle> {
59 // Initialize rate limiter
60 let rateLimiter: RateLimiter | undefined;
61 if (config.RATE_LIMIT_ENABLED) {
62 rateLimiter = new RateLimiter();
63 rateLimiter.startCleanup(60_000);
64 }
65
66 // Ensure data directory exists
67 const dataDir = resolve(config.DATA_DIR);
68 mkdirSync(dataDir, { recursive: true });
69
70 // Initialize SQLite database
71 const dbPath = resolve(dataDir, "pds.db");
72 const db = new Database(dbPath);
73 db.pragma("journal_mode = WAL");
74 db.pragma("synchronous = NORMAL");
75
76 // Create node_identity table for OAuth-established identity
77 db.exec(`
78 CREATE TABLE IF NOT EXISTS node_identity (
79 did TEXT PRIMARY KEY,
80 handle TEXT,
81 created_at TEXT NOT NULL DEFAULT (datetime('now'))
82 )
83 `);
84
85 // Persist AUTH_TOKEN so the UI token survives restarts
86 db.exec(`
87 CREATE TABLE IF NOT EXISTS node_settings (
88 key TEXT PRIMARY KEY,
89 value TEXT NOT NULL
90 )
91 `);
92 if (!process.env.AUTH_TOKEN) {
93 const storedToken = db.prepare("SELECT value FROM node_settings WHERE key = 'auth_token'").get() as
94 | { value: string }
95 | undefined;
96 if (storedToken) {
97 config.AUTH_TOKEN = storedToken.value;
98 } else {
99 db.prepare("INSERT OR REPLACE INTO node_settings (key, value) VALUES ('auth_token', ?)").run(config.AUTH_TOKEN);
100 }
101 }
102
103 // Load stored identity from previous OAuth login (if not overridden by env)
104 const storedIdentity = db.prepare("SELECT did, handle FROM node_identity LIMIT 1").get() as
105 | { did: string; handle: string | null }
106 | undefined;
107 if (storedIdentity && !config.DID) {
108 config.DID = storedIdentity.did;
109 if (storedIdentity.handle) {
110 config.HANDLE = storedIdentity.handle;
111 }
112 }
113
114 // Initialize IPFS service (if enabled)
115 let ipfsService: IpfsService | undefined;
116 if (config.IPFS_ENABLED) {
117 ipfsService = new IpfsService({
118 db,
119 networking: config.IPFS_NETWORKING,
120 });
121 }
122
123 // Initialize repo manager (requires DID + signing key)
124 let repoManager: RepoManager | undefined;
125 let blobStore: BlobStore | undefined;
126 if (config.DID && config.SIGNING_KEY) {
127 repoManager = new RepoManager(db, config as typeof config & { DID: string; SIGNING_KEY: string });
128 blobStore = new BlobStore(dataDir, config.DID);
129 repoManager.init(blobStore, ipfsService, ipfsService);
130 }
131
132 // Initialize firehose (repoManager is optional — without it, no local event backfill)
133 const firehose = new Firehose(repoManager);
134
135 // Initialize DID resolver (allow override for tests)
136 const didResolver = opts?.didResolver ?? new DidResolver({
137 didCache: new InMemoryDidCache(),
138 });
139
140 // Initialize policy storage and engine (always created — needed for offer management)
141 const policyStorage = new PolicyStorage(db);
142 policyStorage.initSchema();
143
144 const policySet = config.POLICY_FILE ? loadPolicies(config) : null;
145 const policyEngine = new PolicyEngine(
146 policySet ?? { version: 1, policies: [] },
147 policyStorage,
148 );
149
150 // Seed from POLICY_FILE only if DB is empty (first run with a policy file)
151 if (policySet && policyStorage.count() === 0) {
152 policyEngine.persistAll();
153 console.log(pc.dim(` Policies: seeded ${policySet.policies.length} from ${config.POLICY_FILE}`));
154 } else if (policySet) {
155 console.log(pc.dim(` Policies: loaded ${policySet.policies.length} from ${config.POLICY_FILE}`));
156 }
157
158 // Load persisted policies from DB (merges with any file-loaded policies)
159 policyEngine.loadFromDb();
160
161 // Run one-time migration: config DIDs + admin_tracked_dids → policy engine
162 const migrated = migrateToNewPolicies(db, policyStorage, policyEngine, config.REPLICATE_DIDS);
163 if (migrated > 0) {
164 console.log(pc.dim(` Policies: migrated ${migrated} legacy DIDs`));
165 }
166
167 // Initialize OAuth client (if enabled)
168 let oauthClientManager: OAuthClientManager | undefined;
169 const pdsClientRef: PdsClientRef = { current: undefined };
170 if (config.OAUTH_ENABLED) {
171 oauthClientManager = await createOAuthClient(db, config);
172 if (config.DID) {
173 pdsClientRef.current = new PdsClient(oauthClientManager.client, config.DID);
174 }
175 }
176
177 // Initialize replication manager and replicated repo reader (if IPFS enabled)
178 // repoManager is optional — without it, manifest records are skipped but sync works
179 let replicationManager: ReplicationManager | undefined;
180 let replicatedRepoReader: ReplicatedRepoReader | undefined;
181 if (ipfsService) {
182 replicationManager = new ReplicationManager(
183 db,
184 config,
185 repoManager,
186 ipfsService,
187 ipfsService,
188 didResolver,
189 undefined,
190 undefined,
191 policyEngine,
192 pdsClientRef.current,
193 );
194 replicatedRepoReader = new ReplicatedRepoReader(
195 ipfsService,
196 replicationManager.getSyncStorage(),
197 );
198 replicationManager.setReplicatedRepoReader(replicatedRepoReader);
199 }
200
201 // Pass rate limiter to IPFS service for gossipsub rate limiting
202 if (rateLimiter && ipfsService) {
203 ipfsService.setRateLimiter(rateLimiter);
204 }
205
206 // --- Lazy IPFS startup: deferred until identity is established ---
207 let ipfsStarted = false;
208
209 async function startIpfsReplication(): Promise<void> {
210 if (ipfsStarted || !ipfsService) return;
211 ipfsStarted = true;
212
213 try {
214 console.log(pc.dim(` IPFS: starting...`));
215 await ipfsService.start();
216 const peerId = ipfsService.getPeerId();
217 if (peerId) {
218 console.log(pc.dim(` PeerID: ${peerId}`));
219 } else {
220 console.log(pc.dim(` IPFS: local blockstore only (networking disabled)`));
221 }
222 await backfillIpfs();
223 // Start replication after IPFS is ready
224 if (replicationManager) {
225 try {
226 await replicationManager.init();
227
228 // Auto-track the node's own DID (idempotent — no-op if already tracked)
229 if (config.DID) {
230 await replicationManager.addDid(config.DID).catch((err) => {
231 console.warn("[start] Self-replication addDid failed:", err instanceof Error ? err.message : String(err));
232 });
233 }
234
235 // Register libp2p repo sync protocol and set libp2p on ReplicationManager
236 const libp2pForSync = ipfsService?.getLibp2p();
237 if (libp2pForSync) {
238 const syncStorage = replicationManager.getSyncStorage();
239 registerRepoSyncProtocol(
240 libp2pForSync as Libp2p,
241 ipfsService!,
242 syncStorage,
243 );
244 replicationManager.setLibp2p(libp2pForSync as Libp2p);
245 console.log(pc.dim(` Repo sync: libp2p protocol registered`));
246 }
247
248 replicationManager.startPeriodicSync();
249 const trackedDids = replicationManager.getReplicateDids();
250 console.log(pc.dim(` Replication: tracking ${trackedDids.length} DIDs`));
251 // Start firehose subscription for real-time updates
252 if (config.FIREHOSE_ENABLED) {
253 replicationManager.startFirehose();
254 }
255 // Start challenge scheduler if policy engine is available
256 if (policyEngine) {
257 const libp2pNode = ipfsService?.getLibp2p();
258 let challengeTransport: ChallengeTransport;
259 if (libp2pNode) {
260 const libp2pTransport = new Libp2pChallengeTransport(libp2pNode as Libp2p);
261 const httpTransport = new HttpChallengeTransport();
262 const syncStorage = replicationManager.getSyncStorage();
263 challengeTransport = new FailoverChallengeTransport(
264 libp2pTransport,
265 httpTransport,
266 {
267 resolveEndpoint: (httpEndpoint) => syncStorage.getMultiaddrForPdsEndpoint(httpEndpoint),
268 onFallback: (endpoint, error) => {
269 console.log(pc.dim(` Challenge: libp2p failed for ${endpoint}, falling back to HTTP: ${error.message}`));
270 replicationManager!.refreshPeerInfoForEndpoint(endpoint);
271 },
272 },
273 );
274 } else {
275 challengeTransport = new HttpChallengeTransport();
276 }
277 replicationManager.startChallengeScheduler(challengeTransport);
278 console.log(pc.dim(` Challenges: scheduler started (${libp2pNode ? "libp2p+HTTP failover" : "HTTP"} transport)`));
279 }
280
281 // Set up auto-republish of peer record when multiaddrs change
282 replicationManager.setPublishPeerRecordFn(async () => {
283 if (pdsClientRef.current && ipfsService) {
284 await publishPeerRecord(pdsClientRef.current, ipfsService, config.PUBLIC_URL);
285 }
286 });
287 } catch (err) {
288 console.error(pc.red(` Replication startup failed:`), err);
289 }
290 }
291 } catch (err) {
292 ipfsStarted = false; // Allow retry on failure
293 console.error(pc.red(` IPFS startup failed:`), err);
294 }
295 }
296
297 // Create Hono app
298 const app = createApp(
299 config,
300 firehose,
301 ipfsService,
302 ipfsService,
303 blobStore,
304 replicationManager,
305 replicatedRepoReader,
306 repoManager,
307 rateLimiter,
308 oauthClientManager,
309 pdsClientRef,
310 db,
311 startIpfsReplication,
312 ipfsService,
313 );
314
315 // Create HTTP server using @hono/node-server's request listener
316 const requestListener = getRequestListener(app.fetch);
317 const httpServer = createServer(requestListener);
318
319 // Set up WebSocket server for firehose with per-IP connection limits
320 const wss = new WebSocketServer({ noServer: true });
321 const firehoseConnections = new Map<string, number>();
322 const maxFirehosePerIp = config.RATE_LIMIT_FIREHOSE_PER_IP;
323
324 httpServer.on("upgrade", (request, socket, head) => {
325 const url = new URL(request.url ?? "/", `http://localhost:${config.PORT}`);
326
327 if (url.pathname === "/xrpc/com.atproto.sync.subscribeRepos") {
328 const ip =
329 (request.headers["x-forwarded-for"] as string)?.split(",")[0]?.trim() ??
330 request.headers["x-real-ip"] as string ??
331 "unknown";
332
333 const current = firehoseConnections.get(ip) ?? 0;
334 if (config.RATE_LIMIT_ENABLED && current >= maxFirehosePerIp) {
335 socket.destroy();
336 return;
337 }
338
339 wss.handleUpgrade(request, socket, head, (ws) => {
340 firehoseConnections.set(ip, (firehoseConnections.get(ip) ?? 0) + 1);
341 ws.on("close", () => {
342 const count = (firehoseConnections.get(ip) ?? 1) - 1;
343 if (count <= 0) {
344 firehoseConnections.delete(ip);
345 } else {
346 firehoseConnections.set(ip, count);
347 }
348 });
349 firehose.handleConnection(ws, request);
350 });
351 } else {
352 socket.destroy();
353 }
354 });
355
356 // Backfill existing blocks to IPFS (only when RepoManager created the blocks table)
357 async function backfillIpfs(): Promise<void> {
358 if (!ipfsService || !repoManager) return;
359
360 const rows = db
361 .prepare("SELECT cid, bytes FROM blocks")
362 .all() as Array<{ cid: string; bytes: Buffer }>;
363
364 if (rows.length === 0) return;
365
366 let count = 0;
367 for (const row of rows) {
368 const hasIt = await ipfsService.hasBlock(row.cid);
369 if (!hasIt) {
370 await ipfsService.putBlock(row.cid, new Uint8Array(row.bytes));
371 count++;
372 }
373 }
374
375 if (count > 0) {
376 console.log(pc.dim(` IPFS: backfilled ${count} blocks`));
377 }
378 }
379
380 // Start the HTTP server and async initialization
381 const { url, port } = await new Promise<{ url: string; port: number }>((resolveStart) => {
382 httpServer.listen(config.PORT, async () => {
383 const addr = httpServer.address() as { port: number };
384 const actualPort = addr.port;
385 const url = `http://localhost:${actualPort}`;
386
387 console.log(
388 pc.bold(`\nP2PDS running at `) +
389 pc.cyan(url),
390 );
391 // Machine-readable line for sidecar consumers (Tauri, tests)
392 console.log(`P2PDS_READY ${JSON.stringify({ port: actualPort, url })}`);
393 if (config.DID) {
394 console.log(pc.dim(` DID: ${config.DID}`));
395 }
396 if (config.HANDLE) {
397 console.log(pc.dim(` Handle: @${config.HANDLE}`));
398 }
399 console.log(pc.dim(` Data: ${dataDir}`));
400 if (oauthClientManager) {
401 if (pdsClientRef.current && await pdsClientRef.current.hasSession().catch(() => false)) {
402 console.log(pc.dim(` OAuth: session active for ${config.DID}`));
403 } else {
404 console.log(pc.dim(` OAuth: enabled (no active session)`));
405 }
406 }
407
408 // Start IPFS after HTTP server is listening (IPFS startup can be slow)
409 if (ipfsService && config.DID) {
410 await startIpfsReplication();
411 } else if (ipfsService) {
412 console.log(pc.dim(` IPFS: waiting for login`));
413 }
414
415 console.log();
416 resolveStart({ url, port: actualPort });
417 });
418 });
419
420 // Build close function
421 const close = async (): Promise<void> => {
422 if (rateLimiter) {
423 rateLimiter.stop();
424 }
425 if (replicationManager) {
426 replicationManager.stop();
427 }
428 wss.close();
429 // Force-close keep-alive connections so httpServer.close() resolves promptly
430 httpServer.closeAllConnections();
431 await new Promise<void>((res) => httpServer.close(() => res()));
432 if (ipfsService) {
433 await ipfsService.stop();
434 }
435 db.close();
436 };
437
438 return {
439 url,
440 port,
441 close,
442 replicationManager,
443 ipfsService,
444 };
445}