WIP! A BB-style forum, on the ATmosphere! We're still working... we'll be back soon when we have something to show off!
node typescript hono htmx atproto
at main 317 lines 11 kB view raw
import { Jetstream } from "@skyware/jetstream";
import { type Database } from "@atbb/db";
import type { Logger } from "@atbb/logger";
import { Indexer } from "./indexer.js";
import { CursorManager } from "./cursor-manager.js";
import { CircuitBreaker } from "./circuit-breaker.js";
import { ReconnectionManager } from "./reconnection-manager.js";
import { EventHandlerRegistry } from "./event-handler-registry.js";
import type { BackfillManager } from "./backfill-manager.js";
import { BackfillStatus } from "./backfill-manager.js";

/**
 * Firehose service that subscribes to AT Proto Jetstream
 * and indexes space.atbb.* records into the database.
 *
 * Responsibilities:
 * - WebSocket connection management via Jetstream
 * - Event routing to indexer handlers
 * - Health status monitoring
 *
 * Delegates to:
 * - CursorManager: cursor persistence
 * - CircuitBreaker: failure tracking and circuit breaking
 * - ReconnectionManager: reconnection with exponential backoff
 */
export class FirehoseService {
  /** How far to rewind a saved cursor before resuming: 10 seconds, in microseconds. */
  private static readonly CURSOR_REWIND_US = 10_000_000;

  // Reassigned in start() when a saved cursor exists, so it cannot be readonly.
  private jetstream: Jetstream;
  private readonly indexer: Indexer;
  private running = false;
  private lastEventTime: Date | null = null;
  private readonly cursorManager: CursorManager;
  private readonly circuitBreaker: CircuitBreaker;
  private readonly reconnectionManager: ReconnectionManager;

  // Event handler registry
  private readonly handlerRegistry: EventHandlerRegistry;

  private backfillManager: BackfillManager | null = null;

  // Guard: only run startup backfill on the initial start, not on reconnects.
  // Note: the flag is cleared the first time start() runs *with* a
  // BackfillManager attached; a start() without one leaves it set.
  private isInitialStart = true;

  // Collections we're interested in (full lexicon IDs)
  private readonly wantedCollections: string[];

  /**
   * @param db - Database handle passed on to the Indexer and CursorManager.
   * @param jetstreamUrl - Jetstream endpoint to subscribe to.
   * @param logger - Logger shared with all helper classes.
   */
  constructor(
    private db: Database,
    private jetstreamUrl: string,
    private logger: Logger
  ) {
    // Initialize the indexer instance with the database
    this.indexer = new Indexer(db, logger);

    // Initialize helper classes.
    // NOTE(review): the numeric arguments are presumably a failure threshold (100),
    // a max attempt count (10) and a base delay in ms (5000) — confirm against the
    // CircuitBreaker / ReconnectionManager constructors.
    this.cursorManager = new CursorManager(db, logger);
    this.circuitBreaker = new CircuitBreaker(100, () => this.stop(), logger);
    this.reconnectionManager = new ReconnectionManager(10, 5000, logger);

    // Build handler registry
    this.handlerRegistry = this.createHandlerRegistry();
    this.wantedCollections = this.handlerRegistry.getCollections();

    // Initialize with a placeholder - will be recreated with cursor in start()
    this.jetstream = this.createJetstream();
    this.setupEventHandlers();
  }

  /**
   * Normalise an unknown thrown value into a loggable message.
   */
  private static errorMessage(error: unknown): string {
    return error instanceof Error ? error.message : String(error);
  }

  /**
   * Create a new Jetstream instance with optional cursor
   *
   * @param cursor - Position to resume from; omitted for a fresh subscription.
   */
  private createJetstream(cursor?: number): Jetstream {
    return new Jetstream({
      wantedCollections: this.wantedCollections,
      endpoint: this.jetstreamUrl,
      cursor,
    });
  }

  /**
   * Factory method that creates a wrapped handler for a given Indexer method.
   * The returned handler delegates to the indexer method with circuit breaker protection.
   *
   * @param methodName - Name of the Indexer method to invoke for each event.
   */
  private createWrappedHandler<M extends keyof Indexer>(methodName: M) {
    // `any` is kept here on purpose: the event shape differs per collection and
    // the Indexer member is looked up dynamically by name.
    return async (event: any) => {
      await this.circuitBreaker.execute(
        () => (this.indexer[methodName] as any).call(this.indexer, event),
        methodName as string
      );
    };
  }

  /**
   * Create and configure the event handler registry
   *
   * Each collection is registered with create/update/delete handlers that
   * forward to the matching Indexer method.
   */
  private createHandlerRegistry(): EventHandlerRegistry {
    return new EventHandlerRegistry()
      .register({
        collection: "space.atbb.post",
        onCreate: this.createWrappedHandler("handlePostCreate"),
        onUpdate: this.createWrappedHandler("handlePostUpdate"),
        onDelete: this.createWrappedHandler("handlePostDelete"),
      })
      .register({
        collection: "space.atbb.forum.forum",
        onCreate: this.createWrappedHandler("handleForumCreate"),
        onUpdate: this.createWrappedHandler("handleForumUpdate"),
        onDelete: this.createWrappedHandler("handleForumDelete"),
      })
      .register({
        collection: "space.atbb.forum.category",
        onCreate: this.createWrappedHandler("handleCategoryCreate"),
        onUpdate: this.createWrappedHandler("handleCategoryUpdate"),
        onDelete: this.createWrappedHandler("handleCategoryDelete"),
      })
      .register({
        collection: "space.atbb.forum.board",
        onCreate: this.createWrappedHandler("handleBoardCreate"),
        onUpdate: this.createWrappedHandler("handleBoardUpdate"),
        onDelete: this.createWrappedHandler("handleBoardDelete"),
      })
      .register({
        collection: "space.atbb.forum.role",
        onCreate: this.createWrappedHandler("handleRoleCreate"),
        onUpdate: this.createWrappedHandler("handleRoleUpdate"),
        onDelete: this.createWrappedHandler("handleRoleDelete"),
      })
      .register({
        collection: "space.atbb.membership",
        onCreate: this.createWrappedHandler("handleMembershipCreate"),
        onUpdate: this.createWrappedHandler("handleMembershipUpdate"),
        onDelete: this.createWrappedHandler("handleMembershipDelete"),
      })
      .register({
        collection: "space.atbb.modAction",
        onCreate: this.createWrappedHandler("handleModActionCreate"),
        onUpdate: this.createWrappedHandler("handleModActionUpdate"),
        onDelete: this.createWrappedHandler("handleModActionDelete"),
      })
      .register({
        collection: "space.atbb.reaction",
        onCreate: this.createWrappedHandler("handleReactionCreate"),
        onUpdate: this.createWrappedHandler("handleReactionUpdate"),
        onDelete: this.createWrappedHandler("handleReactionDelete"),
      })
      .register({
        collection: "space.atbb.forum.theme",
        onCreate: this.createWrappedHandler("handleThemeCreate"),
        onUpdate: this.createWrappedHandler("handleThemeUpdate"),
        onDelete: this.createWrappedHandler("handleThemeDelete"),
      })
      .register({
        collection: "space.atbb.forum.themePolicy",
        onCreate: this.createWrappedHandler("handleThemePolicyCreate"),
        onUpdate: this.createWrappedHandler("handleThemePolicyUpdate"),
        onDelete: this.createWrappedHandler("handleThemePolicyDelete"),
      });
  }

  /**
   * Set up event handlers using the registry
   *
   * Must be called again whenever `this.jetstream` is replaced, because the
   * listeners are attached to the specific instance.
   */
  private setupEventHandlers(): void {
    // Apply all handlers from the registry
    this.handlerRegistry.applyTo(this.jetstream);

    // Listen to all commits to track cursor and last event time
    this.jetstream.on("commit", async (event) => {
      this.lastEventTime = new Date();
      try {
        await this.cursorManager.update(event.time_us);
      } catch (error) {
        // An async listener's rejection would otherwise surface as an unhandled
        // promise rejection; a failed cursor write must not take the process down.
        this.logger.error("Failed to persist firehose cursor", {
          error: FirehoseService.errorMessage(error),
        });
      }
    });

    // Handle errors and disconnections
    this.jetstream.on("error", (error) => {
      this.logger.error("Jetstream error", { error: FirehoseService.errorMessage(error) });
      // Fire-and-forget: handleReconnect() catches its own failures.
      void this.handleReconnect();
    });
  }

  /**
   * Start the firehose subscription
   *
   * On the initial start (with a BackfillManager attached) this first resumes an
   * interrupted backfill or runs a new one if needed, then subscribes to
   * Jetstream, resuming from the saved cursor when one exists. Failures to start
   * are logged and handed to the reconnection logic rather than thrown.
   */
  async start(): Promise<void> {
    if (this.running) {
      this.logger.warn("Firehose service is already running");
      return;
    }

    // Check for backfill before starting firehose — only on the initial start.
    // Reconnects skip this block to avoid re-running a completed backfill every
    // time the Jetstream WebSocket drops and recovers.
    // Wrapped in try-catch so a transient DB error at startup doesn't kill the process —
    // stale data served from the firehose is better than no data at all.
    if (this.isInitialStart && this.backfillManager) {
      this.isInitialStart = false;
      try {
        const interrupted = await this.backfillManager.checkForInterruptedBackfill();
        if (interrupted) {
          this.logger.info("Resuming interrupted backfill", {
            event: "firehose.backfill.resuming_interrupted",
            backfillId: interrupted.id.toString(),
            lastProcessedDid: interrupted.lastProcessedDid,
          });
          await this.backfillManager.resumeBackfill(interrupted);
          this.logger.info("Interrupted backfill resumed", {
            event: "firehose.backfill.resumed",
            backfillId: interrupted.id.toString(),
          });
        } else {
          const savedCursorForCheck = await this.cursorManager.load();
          const backfillStatus = await this.backfillManager.checkIfNeeded(savedCursorForCheck);

          if (backfillStatus !== BackfillStatus.NotNeeded) {
            this.logger.info("Starting backfill", {
              event: "firehose.backfill.starting",
              type: backfillStatus,
            });
            await this.backfillManager.performBackfill(backfillStatus);
            this.logger.info("Backfill completed", {
              event: "firehose.backfill.completed",
              type: backfillStatus,
            });
          }
        }
      } catch (error) {
        this.logger.error("Backfill skipped due to startup error — firehose will start without it", {
          event: "firehose.backfill.startup_error",
          error: FirehoseService.errorMessage(error),
        });
        // Continue to start firehose — stale data is better than no data
      }
    }

    try {
      // Load the last cursor from database
      const savedCursor = await this.cursorManager.load();
      if (savedCursor) {
        this.logger.info("Resuming from cursor", { cursor: savedCursor.toString() });
        // Rewind by 10 seconds to ensure we don't miss any events
        const rewindedCursor = this.cursorManager.rewind(
          savedCursor,
          FirehoseService.CURSOR_REWIND_US
        );

        // Recreate Jetstream instance with cursor
        this.jetstream = this.createJetstream(Number(rewindedCursor));
        this.setupEventHandlers();
      }

      this.logger.info("Starting Jetstream firehose subscription", { url: this.jetstreamUrl });
      await this.jetstream.start();
      this.running = true;
      this.reconnectionManager.reset();
      this.logger.info("Jetstream firehose subscription started successfully");
    } catch (error) {
      this.logger.error("Failed to start Jetstream firehose", {
        error: FirehoseService.errorMessage(error),
      });
      // Fire-and-forget: awaiting here would nest every retry inside the previous one.
      void this.handleReconnect();
    }
  }

  /**
   * Stop the firehose subscription
   *
   * No-op when the service is not currently running.
   */
  async stop(): Promise<void> {
    if (!this.running) {
      return;
    }

    this.logger.info("Stopping Jetstream firehose subscription");
    await this.jetstream.close();
    this.running = false;
    this.logger.info("Jetstream firehose subscription stopped");
  }

  /**
   * Check if the firehose is currently running
   */
  isRunning(): boolean {
    return this.running;
  }

  /**
   * Get the timestamp of the last received event
   *
   * @returns The time the most recent commit event was received, or null if none yet.
   */
  getLastEventTime(): Date | null {
    return this.lastEventTime;
  }

  /**
   * Inject the BackfillManager. Called during AppContext wiring.
   */
  setBackfillManager(manager: BackfillManager): void {
    this.backfillManager = manager;
  }

  /**
   * Expose the Indexer instance for BackfillManager wiring.
   */
  getIndexer(): Indexer {
    return this.indexer;
  }

  /**
   * Close the current Jetstream connection, logging instead of throwing on failure.
   *
   * Used before a reconnect so the previous connection and its listeners do not
   * keep running alongside the replacement.
   */
  private async closeStaleJetstream(): Promise<void> {
    try {
      await this.jetstream.close();
    } catch (error) {
      this.logger.warn("Failed to close previous Jetstream connection before reconnect", {
        error: FirehoseService.errorMessage(error),
      });
    }
  }

  /**
   * Handle reconnection with exponential backoff
   *
   * Never rejects: if the ReconnectionManager throws, the failure is logged as
   * fatal and the service is left marked as not running.
   */
  private async handleReconnect(): Promise<void> {
    try {
      await this.reconnectionManager.attemptReconnect(async () => {
        this.running = false;
        // start() either replaces this.jetstream or restarts it; in both cases the
        // previous connection must be closed first or it would keep delivering
        // events (and errors) in parallel with the new one.
        await this.closeStaleJetstream();
        await this.start();
      });
    } catch (error) {
      this.logger.fatal("Firehose indexing has stopped. The appview will continue serving stale data.", {
        event: "firehose.reconnect.exhausted",
        error: FirehoseService.errorMessage(error),
      });
      this.running = false;
    }
  }
}