atproto user agency toolkit for individuals and groups
at main 445 lines 15 kB view raw
1/** 2 * Extracted server startup logic — importable and testable. 3 * 4 * startServer(config, opts?) does everything server.ts used to do at module scope: 5 * creates DB, IPFS, replication, HTTP server, etc. and returns a ServerHandle 6 * for programmatic control (tests, Tauri sidecar, etc.). 7 */ 8 9import { getRequestListener } from "@hono/node-server"; 10import { createServer, type Server } from "node:http"; 11import { mkdirSync } from "node:fs"; 12import { resolve } from "node:path"; 13import Database from "better-sqlite3"; 14import { WebSocketServer } from "ws"; 15import pc from "picocolors"; 16 17import { loadPolicies } from "./config.js"; 18import type { Config } from "./config.js"; 19import { RepoManager } from "./repo-manager.js"; 20import { BlobStore } from "./blobs.js"; 21import { Firehose } from "./firehose.js"; 22import { IpfsService } from "./ipfs.js"; 23import { createApp } from "./index.js"; 24import { DidResolver } from "./did-resolver.js"; 25import { InMemoryDidCache } from "./did-cache.js"; 26import { ReplicationManager } from "./replication/replication-manager.js"; 27import { ReplicatedRepoReader } from "./replication/replicated-repo-reader.js"; 28import { PolicyEngine } from "./policy/engine.js"; 29import { PolicyStorage } from "./policy/storage.js"; 30import { migrateToNewPolicies } from "./policy/migrate.js"; 31import { HttpChallengeTransport } from "./replication/challenge-response/http-transport.js"; 32import { Libp2pChallengeTransport } from "./replication/challenge-response/libp2p-transport.js"; 33import { FailoverChallengeTransport } from "./replication/challenge-response/failover-transport.js"; 34import type { ChallengeTransport } from "./replication/challenge-response/transport.js"; 35import type { Libp2p } from "@libp2p/interface"; 36import { RateLimiter } from "./rate-limiter.js"; 37import { createOAuthClient, type OAuthClientManager } from "./oauth/client.js"; 38import { PdsClient } from "./oauth/pds-client.js"; 39import { type PdsClientRef, publishPeerRecord } from "./oauth/routes.js"; 40import { registerRepoSyncProtocol } from "./replication/libp2p-sync.js"; 41 42export interface StartServerOpts { 43 /** Override DID resolver (e.g. mock resolver for tests). */ 44 didResolver?: DidResolver; 45} 46 47export interface ServerHandle { 48 url: string; 49 port: number; 50 close: () => Promise<void>; 51 replicationManager?: ReplicationManager; 52 ipfsService?: IpfsService; 53} 54 55export async function startServer( 56 config: Config, 57 opts?: StartServerOpts, 58): Promise<ServerHandle> { 59 // Initialize rate limiter 60 let rateLimiter: RateLimiter | undefined; 61 if (config.RATE_LIMIT_ENABLED) { 62 rateLimiter = new RateLimiter(); 63 rateLimiter.startCleanup(60_000); 64 } 65 66 // Ensure data directory exists 67 const dataDir = resolve(config.DATA_DIR); 68 mkdirSync(dataDir, { recursive: true }); 69 70 // Initialize SQLite database 71 const dbPath = resolve(dataDir, "pds.db"); 72 const db = new Database(dbPath); 73 db.pragma("journal_mode = WAL"); 74 db.pragma("synchronous = NORMAL"); 75 76 // Create node_identity table for OAuth-established identity 77 db.exec(` 78 CREATE TABLE IF NOT EXISTS node_identity ( 79 did TEXT PRIMARY KEY, 80 handle TEXT, 81 created_at TEXT NOT NULL DEFAULT (datetime('now')) 82 ) 83 `); 84 85 // Persist AUTH_TOKEN so the UI token survives restarts 86 db.exec(` 87 CREATE TABLE IF NOT EXISTS node_settings ( 88 key TEXT PRIMARY KEY, 89 value TEXT NOT NULL 90 ) 91 `); 92 if (!process.env.AUTH_TOKEN) { 93 const storedToken = db.prepare("SELECT value FROM node_settings WHERE key = 'auth_token'").get() as 94 | { value: string } 95 | undefined; 96 if (storedToken) { 97 config.AUTH_TOKEN = storedToken.value; 98 } else { 99 db.prepare("INSERT OR REPLACE INTO node_settings (key, value) VALUES ('auth_token', ?)").run(config.AUTH_TOKEN); 100 } 101 } 102 103 // Load stored identity from previous OAuth login (if not overridden by env) 104 const storedIdentity = db.prepare("SELECT did, handle FROM node_identity LIMIT 1").get() as 105 | { did: string; handle: string | null } 106 | undefined; 107 if (storedIdentity && !config.DID) { 108 config.DID = storedIdentity.did; 109 if (storedIdentity.handle) { 110 config.HANDLE = storedIdentity.handle; 111 } 112 } 113 114 // Initialize IPFS service (if enabled) 115 let ipfsService: IpfsService | undefined; 116 if (config.IPFS_ENABLED) { 117 ipfsService = new IpfsService({ 118 db, 119 networking: config.IPFS_NETWORKING, 120 }); 121 } 122 123 // Initialize repo manager (requires DID + signing key) 124 let repoManager: RepoManager | undefined; 125 let blobStore: BlobStore | undefined; 126 if (config.DID && config.SIGNING_KEY) { 127 repoManager = new RepoManager(db, config as typeof config & { DID: string; SIGNING_KEY: string }); 128 blobStore = new BlobStore(dataDir, config.DID); 129 repoManager.init(blobStore, ipfsService, ipfsService); 130 } 131 132 // Initialize firehose (repoManager is optional — without it, no local event backfill) 133 const firehose = new Firehose(repoManager); 134 135 // Initialize DID resolver (allow override for tests) 136 const didResolver = opts?.didResolver ?? new DidResolver({ 137 didCache: new InMemoryDidCache(), 138 }); 139 140 // Initialize policy storage and engine (always created — needed for offer management) 141 const policyStorage = new PolicyStorage(db); 142 policyStorage.initSchema(); 143 144 const policySet = config.POLICY_FILE ? loadPolicies(config) : null; 145 const policyEngine = new PolicyEngine( 146 policySet ?? { version: 1, policies: [] }, 147 policyStorage, 148 ); 149 150 // Seed from POLICY_FILE only if DB is empty (first run with a policy file) 151 if (policySet && policyStorage.count() === 0) { 152 policyEngine.persistAll(); 153 console.log(pc.dim(` Policies: seeded ${policySet.policies.length} from ${config.POLICY_FILE}`)); 154 } else if (policySet) { 155 console.log(pc.dim(` Policies: loaded ${policySet.policies.length} from ${config.POLICY_FILE}`)); 156 } 157 158 // Load persisted policies from DB (merges with any file-loaded policies) 159 policyEngine.loadFromDb(); 160 161 // Run one-time migration: config DIDs + admin_tracked_dids → policy engine 162 const migrated = migrateToNewPolicies(db, policyStorage, policyEngine, config.REPLICATE_DIDS); 163 if (migrated > 0) { 164 console.log(pc.dim(` Policies: migrated ${migrated} legacy DIDs`)); 165 } 166 167 // Initialize OAuth client (if enabled) 168 let oauthClientManager: OAuthClientManager | undefined; 169 const pdsClientRef: PdsClientRef = { current: undefined }; 170 if (config.OAUTH_ENABLED) { 171 oauthClientManager = await createOAuthClient(db, config); 172 if (config.DID) { 173 pdsClientRef.current = new PdsClient(oauthClientManager.client, config.DID); 174 } 175 } 176 177 // Initialize replication manager and replicated repo reader (if IPFS enabled) 178 // repoManager is optional — without it, manifest records are skipped but sync works 179 let replicationManager: ReplicationManager | undefined; 180 let replicatedRepoReader: ReplicatedRepoReader | undefined; 181 if (ipfsService) { 182 replicationManager = new ReplicationManager( 183 db, 184 config, 185 repoManager, 186 ipfsService, 187 ipfsService, 188 didResolver, 189 undefined, 190 undefined, 191 policyEngine, 192 pdsClientRef.current, 193 ); 194 replicatedRepoReader = new ReplicatedRepoReader( 195 ipfsService, 196 replicationManager.getSyncStorage(), 197 ); 198 replicationManager.setReplicatedRepoReader(replicatedRepoReader); 199 } 200 201 // Pass rate limiter to IPFS service for gossipsub rate limiting 202 if (rateLimiter && ipfsService) { 203 ipfsService.setRateLimiter(rateLimiter); 204 } 205 206 // --- Lazy IPFS startup: deferred until identity is established --- 207 let ipfsStarted = false; 208 209 async function startIpfsReplication(): Promise<void> { 210 if (ipfsStarted || !ipfsService) return; 211 ipfsStarted = true; 212 213 try { 214 console.log(pc.dim(` IPFS: starting...`)); 215 await ipfsService.start(); 216 const peerId = ipfsService.getPeerId(); 217 if (peerId) { 218 console.log(pc.dim(` PeerID: ${peerId}`)); 219 } else { 220 console.log(pc.dim(` IPFS: local blockstore only (networking disabled)`)); 221 } 222 await backfillIpfs(); 223 // Start replication after IPFS is ready 224 if (replicationManager) { 225 try { 226 await replicationManager.init(); 227 228 // Auto-track the node's own DID (idempotent — no-op if already tracked) 229 if (config.DID) { 230 await replicationManager.addDid(config.DID).catch((err) => { 231 console.warn("[start] Self-replication addDid failed:", err instanceof Error ? err.message : String(err)); 232 }); 233 } 234 235 // Register libp2p repo sync protocol and set libp2p on ReplicationManager 236 const libp2pForSync = ipfsService?.getLibp2p(); 237 if (libp2pForSync) { 238 const syncStorage = replicationManager.getSyncStorage(); 239 registerRepoSyncProtocol( 240 libp2pForSync as Libp2p, 241 ipfsService!, 242 syncStorage, 243 ); 244 replicationManager.setLibp2p(libp2pForSync as Libp2p); 245 console.log(pc.dim(` Repo sync: libp2p protocol registered`)); 246 } 247 248 replicationManager.startPeriodicSync(); 249 const trackedDids = replicationManager.getReplicateDids(); 250 console.log(pc.dim(` Replication: tracking ${trackedDids.length} DIDs`)); 251 // Start firehose subscription for real-time updates 252 if (config.FIREHOSE_ENABLED) { 253 replicationManager.startFirehose(); 254 } 255 // Start challenge scheduler if policy engine is available 256 if (policyEngine) { 257 const libp2pNode = ipfsService?.getLibp2p(); 258 let challengeTransport: ChallengeTransport; 259 if (libp2pNode) { 260 const libp2pTransport = new Libp2pChallengeTransport(libp2pNode as Libp2p); 261 const httpTransport = new HttpChallengeTransport(); 262 const syncStorage = replicationManager.getSyncStorage(); 263 challengeTransport = new FailoverChallengeTransport( 264 libp2pTransport, 265 httpTransport, 266 { 267 resolveEndpoint: (httpEndpoint) => syncStorage.getMultiaddrForPdsEndpoint(httpEndpoint), 268 onFallback: (endpoint, error) => { 269 console.log(pc.dim(` Challenge: libp2p failed for ${endpoint}, falling back to HTTP: ${error.message}`)); 270 replicationManager!.refreshPeerInfoForEndpoint(endpoint); 271 }, 272 }, 273 ); 274 } else { 275 challengeTransport = new HttpChallengeTransport(); 276 } 277 replicationManager.startChallengeScheduler(challengeTransport); 278 console.log(pc.dim(` Challenges: scheduler started (${libp2pNode ? "libp2p+HTTP failover" : "HTTP"} transport)`)); 279 } 280 281 // Set up auto-republish of peer record when multiaddrs change 282 replicationManager.setPublishPeerRecordFn(async () => { 283 if (pdsClientRef.current && ipfsService) { 284 await publishPeerRecord(pdsClientRef.current, ipfsService, config.PUBLIC_URL); 285 } 286 }); 287 } catch (err) { 288 console.error(pc.red(` Replication startup failed:`), err); 289 } 290 } 291 } catch (err) { 292 ipfsStarted = false; // Allow retry on failure 293 console.error(pc.red(` IPFS startup failed:`), err); 294 } 295 } 296 297 // Create Hono app 298 const app = createApp( 299 config, 300 firehose, 301 ipfsService, 302 ipfsService, 303 blobStore, 304 replicationManager, 305 replicatedRepoReader, 306 repoManager, 307 rateLimiter, 308 oauthClientManager, 309 pdsClientRef, 310 db, 311 startIpfsReplication, 312 ipfsService, 313 ); 314 315 // Create HTTP server using @hono/node-server's request listener 316 const requestListener = getRequestListener(app.fetch); 317 const httpServer = createServer(requestListener); 318 319 // Set up WebSocket server for firehose with per-IP connection limits 320 const wss = new WebSocketServer({ noServer: true }); 321 const firehoseConnections = new Map<string, number>(); 322 const maxFirehosePerIp = config.RATE_LIMIT_FIREHOSE_PER_IP; 323 324 httpServer.on("upgrade", (request, socket, head) => { 325 const url = new URL(request.url ?? "/", `http://localhost:${config.PORT}`); 326 327 if (url.pathname === "/xrpc/com.atproto.sync.subscribeRepos") { 328 const ip = 329 (request.headers["x-forwarded-for"] as string)?.split(",")[0]?.trim() ?? 330 request.headers["x-real-ip"] as string ?? 331 "unknown"; 332 333 const current = firehoseConnections.get(ip) ?? 0; 334 if (config.RATE_LIMIT_ENABLED && current >= maxFirehosePerIp) { 335 socket.destroy(); 336 return; 337 } 338 339 wss.handleUpgrade(request, socket, head, (ws) => { 340 firehoseConnections.set(ip, (firehoseConnections.get(ip) ?? 0) + 1); 341 ws.on("close", () => { 342 const count = (firehoseConnections.get(ip) ?? 1) - 1; 343 if (count <= 0) { 344 firehoseConnections.delete(ip); 345 } else { 346 firehoseConnections.set(ip, count); 347 } 348 }); 349 firehose.handleConnection(ws, request); 350 }); 351 } else { 352 socket.destroy(); 353 } 354 }); 355 356 // Backfill existing blocks to IPFS (only when RepoManager created the blocks table) 357 async function backfillIpfs(): Promise<void> { 358 if (!ipfsService || !repoManager) return; 359 360 const rows = db 361 .prepare("SELECT cid, bytes FROM blocks") 362 .all() as Array<{ cid: string; bytes: Buffer }>; 363 364 if (rows.length === 0) return; 365 366 let count = 0; 367 for (const row of rows) { 368 const hasIt = await ipfsService.hasBlock(row.cid); 369 if (!hasIt) { 370 await ipfsService.putBlock(row.cid, new Uint8Array(row.bytes)); 371 count++; 372 } 373 } 374 375 if (count > 0) { 376 console.log(pc.dim(` IPFS: backfilled ${count} blocks`)); 377 } 378 } 379 380 // Start the HTTP server and async initialization 381 const { url, port } = await new Promise<{ url: string; port: number }>((resolveStart) => { 382 httpServer.listen(config.PORT, async () => { 383 const addr = httpServer.address() as { port: number }; 384 const actualPort = addr.port; 385 const url = `http://localhost:${actualPort}`; 386 387 console.log( 388 pc.bold(`\nP2PDS running at `) + 389 pc.cyan(url), 390 ); 391 // Machine-readable line for sidecar consumers (Tauri, tests) 392 console.log(`P2PDS_READY ${JSON.stringify({ port: actualPort, url })}`); 393 if (config.DID) { 394 console.log(pc.dim(` DID: ${config.DID}`)); 395 } 396 if (config.HANDLE) { 397 console.log(pc.dim(` Handle: @${config.HANDLE}`)); 398 } 399 console.log(pc.dim(` Data: ${dataDir}`)); 400 if (oauthClientManager) { 401 if (pdsClientRef.current && await pdsClientRef.current.hasSession().catch(() => false)) { 402 console.log(pc.dim(` OAuth: session active for ${config.DID}`)); 403 } else { 404 console.log(pc.dim(` OAuth: enabled (no active session)`)); 405 } 406 } 407 408 // Start IPFS after HTTP server is listening (IPFS startup can be slow) 409 if (ipfsService && config.DID) { 410 await startIpfsReplication(); 411 } else if (ipfsService) { 412 console.log(pc.dim(` IPFS: waiting for login`)); 413 } 414 415 console.log(); 416 resolveStart({ url, port: actualPort }); 417 }); 418 }); 419 420 // Build close function 421 const close = async (): Promise<void> => { 422 if (rateLimiter) { 423 rateLimiter.stop(); 424 } 425 if (replicationManager) { 426 replicationManager.stop(); 427 } 428 wss.close(); 429 // Force-close keep-alive connections so httpServer.close() resolves promptly 430 httpServer.closeAllConnections(); 431 await new Promise<void>((res) => httpServer.close(() => res())); 432 if (ipfsService) { 433 await ipfsService.stop(); 434 } 435 db.close(); 436 }; 437 438 return { 439 url, 440 port, 441 close, 442 replicationManager, 443 ipfsService, 444 }; 445}