Barazo AppView backend
barazo.forum
1import { eq } from 'drizzle-orm'
2import { firehoseCursor } from '../db/schema/firehose.js'
3import type { Database } from '../db/index.js'
4
5const DEFAULT_DEBOUNCE_MS = 5000
6
7export class CursorStore {
8 private db: Database
9 private debounceMs: number
10 private pendingCursor: bigint | null = null
11 private timer: ReturnType<typeof setTimeout> | null = null
12
13 constructor(db: Database, debounceMs = DEFAULT_DEBOUNCE_MS) {
14 this.db = db
15 this.debounceMs = debounceMs
16 }
17
18 async getCursor(): Promise<bigint | null> {
19 const rows = await this.db.select().from(firehoseCursor).where(eq(firehoseCursor.id, 'default'))
20
21 const row = rows[0]
22 return row?.cursor ?? null
23 }
24
25 saveCursor(cursor: bigint): void {
26 this.pendingCursor = cursor
27
28 if (this.timer !== null) {
29 return
30 }
31
32 this.timer = setTimeout(() => {
33 void this.writeCursor()
34 }, this.debounceMs)
35 }
36
37 async flush(): Promise<void> {
38 if (this.timer !== null) {
39 clearTimeout(this.timer)
40 this.timer = null
41 }
42 await this.writeCursor()
43 }
44
45 private async writeCursor(): Promise<void> {
46 this.timer = null
47 const cursor = this.pendingCursor
48 if (cursor === null) {
49 return
50 }
51 this.pendingCursor = null
52
53 await this.db
54 .update(firehoseCursor)
55 .set({ cursor, updatedAt: new Date() })
56 .where(eq(firehoseCursor.id, 'default'))
57 }
58}