import type Database from "better-sqlite3";
import { encode as cborEncode, decode as cborDecode } from "./cbor-compat.js";
import { CID } from "@atproto/lex-data";
import { blocksToCarFile, type BlockMap } from "@atproto/repo";
import type { RecordWriteOp } from "@atproto/repo";

/**
 * CAR byte length above which `sequenceCommit` sets `tooBig` on the event.
 */
const TOO_BIG_THRESHOLD_BYTES = 1_000_000;

/**
 * Commit event payload for the firehose
 */
export interface CommitEvent {
  seq: number;
  rebase: boolean;
  tooBig: boolean;
  repo: string;
  commit: CID;
  rev: string;
  since: string | null;
  blocks: Uint8Array;
  ops: RepoOp[];
  blobs: CID[];
  time: string;
}

/**
 * Identity event payload for the firehose
 */
export interface IdentityEvent {
  seq: number;
  did: string;
  handle: string;
  time: string;
}

/**
 * Repository operation in a commit
 */
export interface RepoOp {
  action: "create" | "update" | "delete";
  path: string;
  cid: CID | null;
}

/**
 * Sequenced commit event wrapper
 */
export interface SeqCommitEvent {
  seq: number;
  type: "commit";
  event: CommitEvent;
  time: string;
}

/**
 * Sequenced identity event wrapper
 */
export interface SeqIdentityEvent {
  seq: number;
  type: "identity";
  event: IdentityEvent;
  time: string;
}

/**
 * Sequenced event (commit or identity)
 */
export type SeqEvent = SeqCommitEvent | SeqIdentityEvent;

/**
 * Data needed to sequence a commit
 */
export interface CommitData {
  did: string;
  commit: CID;
  rev: string;
  since: string | null;
  newBlocks: BlockMap;
  ops: Array<RecordWriteOp>;
}

/**
 * Shape of a row returned by the "events since cursor" query.
 */
interface FirehoseEventRow {
  seq: number;
  event_type: string;
  payload: Buffer;
  created_at: string;
}

/**
 * Sequencer manages the firehose event log.
 *
 * Stores commit events in SQLite and provides methods for:
 * - Sequencing new commits
 * - Backfilling events from a cursor
 * - Getting the latest sequence number
 * - Pruning old events
 */
export class Sequencer {
  private _insertStmt?: Database.Statement;
  private _getEventsSinceStmt?: Database.Statement;
  private _getLatestSeqStmt?: Database.Statement;
  private _pruneStmt?: Database.Statement;

  constructor(private db: Database.Database) {}

  /** Lazily prepare statements (tables must exist first via initSchema) */
  private get insertStmt() {
    return (this._insertStmt ??= this.db.prepare(
      `INSERT INTO firehose_events (event_type, payload) VALUES ('commit', ?)`,
    ));
  }

  private get getEventsSinceStmt() {
    return (this._getEventsSinceStmt ??= this.db.prepare(
      `SELECT seq, event_type, payload, created_at FROM firehose_events WHERE seq > ? ORDER BY seq ASC LIMIT ?`,
    ));
  }

  private get getLatestSeqStmt() {
    return (this._getLatestSeqStmt ??= this.db.prepare(
      "SELECT MAX(seq) as seq FROM firehose_events",
    ));
  }

  /** Prepared lazily like the other statements instead of on every prune call. */
  private get pruneStmt() {
    return (this._pruneStmt ??= this.db.prepare(
      `DELETE FROM firehose_events WHERE seq < (SELECT MAX(seq) - ? FROM firehose_events)`,
    ));
  }

  /**
   * Add a commit to the firehose sequence.
   *
   * Builds a CAR file from the commit's new blocks, CBOR-encodes the event
   * payload (without `seq`) and inserts it into `firehose_events`. The
   * inserted row id becomes the event's sequence number.
   *
   * @param data - Commit details to sequence.
   * @returns The complete sequenced event for broadcasting.
   */
  async sequenceCommit(data: CommitData): Promise<SeqCommitEvent> {
    // Create CAR slice with commit diff
    const carBytes = await blocksToCarFile(data.commit, data.newBlocks);
    const time = new Date().toISOString();

    // Build event payload; `seq` is only known after the insert.
    const eventPayload: Omit<CommitEvent, "seq"> = {
      repo: data.did,
      commit: data.commit,
      rev: data.rev,
      since: data.since,
      blocks: carBytes,
      ops: data.ops.map(
        (op): RepoOp => ({
          action: op.action as "create" | "update" | "delete",
          path: `${op.collection}/${op.rkey}`,
          // Ops that carry no (truthy) cid are recorded with a null cid.
          cid: ("cid" in op && op.cid ? op.cid : null) as CID | null,
        }),
      ),
      rebase: false,
      tooBig: carBytes.length > TOO_BIG_THRESHOLD_BYTES,
      blobs: [],
      time,
    };

    // Store in SQLite
    const payload = cborEncode(eventPayload);
    const result = this.insertStmt.run(Buffer.from(payload));
    const seq = Number(result.lastInsertRowid);

    return {
      seq,
      type: "commit",
      event: { ...eventPayload, seq },
      time,
    };
  }

  /**
   * Get events from a cursor position.
   *
   * Rows are read in ascending `seq` order. Rows whose `event_type` is
   * "identity" are decoded as identity events (and skipped when their payload
   * is empty); every other row is decoded as a commit event. The wrapper's
   * `time` is the row's `created_at` value.
   *
   * @param cursor - Only events with `seq` greater than this are returned.
   * @param limit - Maximum number of rows to read (default 100).
   * @returns The decoded events, oldest first.
   */
  async getEventsSince(cursor: number, limit = 100): Promise<SeqEvent[]> {
    const rows = this.getEventsSinceStmt.all(cursor, limit) as FirehoseEventRow[];
    const events: SeqEvent[] = [];

    for (const row of rows) {
      const payload = new Uint8Array(row.payload);
      const seq = row.seq;
      const time = row.created_at;

      if (row.event_type === "identity") {
        // Nothing to decode for an identity row stored without a payload.
        if (payload.length === 0) continue;
        const decoded = cborDecode(payload) as Omit<IdentityEvent, "seq">;
        events.push({
          seq,
          type: "identity",
          event: { ...decoded, seq },
          time,
        });
      } else {
        const decoded = cborDecode(payload) as Omit<CommitEvent, "seq">;
        events.push({
          seq,
          type: "commit",
          event: { ...decoded, seq },
          time,
        });
      }
    }

    return events;
  }

  /**
   * Get the latest sequence number.
   *
   * @returns The largest `seq` in `firehose_events`, or 0 when the query
   *   yields no value.
   */
  getLatestSeq(): number {
    const result = this.getLatestSeqStmt.get() as {
      seq: number | null;
    };
    return result?.seq ?? 0;
  }

  /**
   * Prune old events to keep the log from growing indefinitely.
   *
   * Deletes every row whose `seq` is strictly less than `MAX(seq) - keepCount`.
   * The row at exactly `MAX(seq) - keepCount` survives, so when sequence
   * numbers are contiguous `keepCount + 1` events remain.
   *
   * @param keepCount - Distance below the newest `seq` that is retained
   *   (default 10000).
   */
  async pruneOldEvents(keepCount = 10000): Promise<void> {
    this.pruneStmt.run(keepCount);
  }
}