WIP! A BB-style forum, on the ATmosphere! We're still working... we'll be back soon when we have something to show off!
node typescript hono htmx atproto
at main 91 lines 2.7 kB view raw
1import { type Database, firehoseCursor } from "@atbb/db"; 2import type { Logger } from "@atbb/logger"; 3import { eq } from "drizzle-orm"; 4 5/** 6 * Manages firehose cursor persistence in the database. 7 * 8 * The cursor tracks the last processed event timestamp (in microseconds) 9 * to enable resumption after restart or reconnection. 10 */ 11export class CursorManager { 12 constructor(private db: Database, private logger: Logger) {} 13 14 /** 15 * Load the last cursor from database 16 * 17 * @param service - Service name (default: "jetstream") 18 * @returns The last cursor value, or null if none exists 19 */ 20 async load(service: string = "jetstream"): Promise<bigint | null> { 21 try { 22 const result = await this.db 23 .select() 24 .from(firehoseCursor) 25 .where(eq(firehoseCursor.service, service)) 26 .limit(1); 27 28 return result.length > 0 ? result[0].cursor : null; 29 } catch (error) { 30 this.logger.error("Failed to load cursor from database", { 31 error: error instanceof Error ? error.message : String(error), 32 }); 33 return null; 34 } 35 } 36 37 /** 38 * Update the cursor in database 39 * 40 * @param timeUs - Timestamp in microseconds 41 * @param service - Service name (default: "jetstream") 42 */ 43 async update(timeUs: number, service: string = "jetstream"): Promise<void> { 44 try { 45 await this.db 46 .insert(firehoseCursor) 47 .values({ 48 service, 49 cursor: BigInt(timeUs), 50 updatedAt: new Date(), 51 }) 52 .onConflictDoUpdate({ 53 target: firehoseCursor.service, 54 set: { 55 cursor: BigInt(timeUs), 56 updatedAt: new Date(), 57 }, 58 }); 59 } catch (error) { 60 // Don't throw - we don't want cursor updates to break the stream 61 this.logger.error("Failed to update cursor", { 62 error: error instanceof Error ? error.message : String(error), 63 }); 64 } 65 } 66 67 /** 68 * Rewind cursor by specified microseconds for safety margin 69 * 70 * @param cursor - Current cursor value 71 * @param microseconds - Amount to rewind in microseconds 72 * @returns Rewound cursor value 73 */ 74 rewind(cursor: bigint, microseconds: number): bigint { 75 return cursor - BigInt(microseconds); 76 } 77 78 /** 79 * Calculate cursor age in hours. 80 * Cursor values are Jetstream timestamps in microseconds since epoch. 81 * 82 * @param cursor - Cursor value (microseconds), or null 83 * @returns Age in hours, or null if cursor is null 84 */ 85 getCursorAgeHours(cursor: bigint | null): number | null { 86 if (cursor === null) return null; 87 const cursorMs = Number(cursor / 1000n); 88 const ageMs = Date.now() - cursorMs; 89 return ageMs / (1000 * 60 * 60); 90 } 91}