atproto user agency toolkit for individuals and groups
1import type Database from "better-sqlite3";
2import { encode as cborEncode, decode as cborDecode } from "./cbor-compat.js";
3import { CID } from "@atproto/lex-data";
4import { blocksToCarFile, type BlockMap } from "@atproto/repo";
5import type { RecordWriteOp } from "@atproto/repo";
6
/**
 * Commit event payload for the firehose.
 *
 * Built by `Sequencer.sequenceCommit`, stored CBOR-encoded without `seq`,
 * and rebuilt (with `seq` re-attached from the row) by
 * `Sequencer.getEventsSince`.
 */
export interface CommitEvent {
  /** Sequence number; `sequenceCommit` takes it from the inserted row's id. */
  seq: number;
  /** `sequenceCommit` always sets this to `false`. */
  rebase: boolean;
  /** True when the CAR bytes in `blocks` are longer than 1,000,000 bytes. */
  tooBig: boolean;
  /** DID the commit was made for (copied from `CommitData.did`). */
  repo: string;
  /** CID of the commit (copied from `CommitData.commit`). */
  commit: CID;
  /** Revision string (copied from `CommitData.rev`). */
  rev: string;
  /**
   * Copied from `CommitData.since`.
   * NOTE(review): presumably the preceding revision, `null` when there is
   * none — confirm against the caller.
   */
  since: string | null;
  /** CAR file bytes produced by `blocksToCarFile` from the commit's new blocks. */
  blocks: Uint8Array;
  /** Record operations contained in the commit. */
  ops: RepoOp[];
  /** `sequenceCommit` currently always emits an empty array here. */
  blobs: CID[];
  /** ISO-8601 timestamp (`Date#toISOString`) taken when the event was sequenced. */
  time: string;
}
23
/**
 * Identity event payload for the firehose.
 *
 * `Sequencer.getEventsSince` decodes this shape from rows whose `event_type`
 * is `"identity"`.
 */
export interface IdentityEvent {
  /** Sequence number; attached from the row's `seq` column when read back. */
  seq: number;
  /** NOTE(review): presumably the DID of the account the event is about — confirm. */
  did: string;
  /** NOTE(review): presumably the account's handle at the time of the event — confirm. */
  handle: string;
  /** Timestamp string carried inside the stored payload. */
  time: string;
}
33
/**
 * Repository operation in a commit.
 *
 * One entry per write op passed to `Sequencer.sequenceCommit`.
 */
export interface RepoOp {
  /** Kind of write applied to the record. */
  action: "create" | "update" | "delete";
  /** Record path, formatted as `<collection>/<rkey>`. */
  path: string;
  /**
   * CID taken from the write op, or `null` when the op carries none
   * (presumably the case for deletes — confirm against caller).
   */
  cid: CID | null;
}
42
/**
 * Sequenced commit event wrapper.
 */
export interface SeqCommitEvent {
  /** Sequence number; the same value is also placed on `event.seq`. */
  seq: number;
  /** Discriminant that selects the commit variant of `SeqEvent`. */
  type: "commit";
  /** The commit payload. */
  event: CommitEvent;
  /**
   * Event timestamp. `sequenceCommit` uses the same ISO string as
   * `event.time`; `getEventsSince` uses the row's `created_at` column instead.
   */
  time: string;
}
52
/**
 * Sequenced identity event wrapper.
 */
export interface SeqIdentityEvent {
  /** Sequence number; the same value is also placed on `event.seq`. */
  seq: number;
  /** Discriminant that selects the identity variant of `SeqEvent`. */
  type: "identity";
  /** The identity payload. */
  event: IdentityEvent;
  /** Event timestamp; `getEventsSince` fills it from the row's `created_at` column. */
  time: string;
}
62
/**
 * Sequenced event (commit or identity).
 *
 * Narrow on the `type` field to tell the two variants apart.
 */
export type SeqEvent = SeqCommitEvent | SeqIdentityEvent;
67
/**
 * Data needed to sequence a commit (input to `Sequencer.sequenceCommit`).
 */
export interface CommitData {
  /** DID the commit belongs to; becomes `CommitEvent.repo`. */
  did: string;
  /** CID of the commit; passed to `blocksToCarFile` and copied onto the event. */
  commit: CID;
  /** Revision string; copied onto the event. */
  rev: string;
  /**
   * Copied onto the event unchanged.
   * NOTE(review): presumably the preceding revision, `null` when there is
   * none — confirm against the caller.
   */
  since: string | null;
  /** Blocks serialized into the event's CAR bytes. */
  newBlocks: BlockMap;
  /** Write ops in the commit; an op without a truthy `cid` is emitted with `cid: null`. */
  ops: Array<RecordWriteOp & { cid?: CID | null }>;
}
79
/**
 * Sequencer manages the firehose event log.
 *
 * Stores commit events in the `firehose_events` SQLite table and provides
 * methods for:
 * - Sequencing new commits
 * - Backfilling events from a cursor
 * - Getting the latest sequence number
 * - Pruning old events
 *
 * All prepared statements are created lazily on first use, because the table
 * must already exist when a statement is prepared.
 */
export class Sequencer {
  private _insertStmt?: Database.Statement;
  private _getEventsSinceStmt?: Database.Statement;
  private _getLatestSeqStmt?: Database.Statement;
  private _pruneStmt?: Database.Statement;

  /**
   * @param db Open better-sqlite3 database that holds `firehose_events`.
   */
  constructor(private readonly db: Database.Database) {}

  /** Lazily prepare statements (tables must exist first via initSchema) */
  private get insertStmt() {
    return (this._insertStmt ??= this.db.prepare(
      `INSERT INTO firehose_events (event_type, payload)
       VALUES ('commit', ?)`,
    ));
  }

  /** Statement that pages through events with `seq` greater than a cursor. */
  private get getEventsSinceStmt() {
    return (this._getEventsSinceStmt ??= this.db.prepare(
      `SELECT seq, event_type, payload, created_at
       FROM firehose_events
       WHERE seq > ?
       ORDER BY seq ASC
       LIMIT ?`,
    ));
  }

  /** Statement that reads the highest stored `seq` (NULL on an empty table). */
  private get getLatestSeqStmt() {
    return (this._getLatestSeqStmt ??= this.db.prepare(
      "SELECT MAX(seq) as seq FROM firehose_events",
    ));
  }

  /**
   * Statement that deletes every row whose `seq` is at or below
   * `MAX(seq) - ?`, leaving only rows in the window `(MAX - ?, MAX]`.
   */
  private get pruneStmt() {
    return (this._pruneStmt ??= this.db.prepare(
      `DELETE FROM firehose_events
       WHERE seq <= (SELECT MAX(seq) - ? FROM firehose_events)`,
    ));
  }

  /**
   * Add a commit to the firehose sequence.
   *
   * Serializes the commit's new blocks into a CAR file, stores the
   * CBOR-encoded payload (without `seq`) as a `'commit'` row, and uses the
   * inserted row id as the sequence number.
   *
   * @param data Commit details and the blocks/ops it introduced.
   * @returns The complete sequenced event for broadcasting.
   */
  async sequenceCommit(data: CommitData): Promise<SeqEvent> {
    // Create CAR slice with commit diff
    const carBytes = await blocksToCarFile(data.commit, data.newBlocks);
    const time = new Date().toISOString();

    // Build event payload
    const eventPayload: Omit<CommitEvent, "seq"> = {
      repo: data.did,
      commit: data.commit,
      rev: data.rev,
      since: data.since,
      blocks: carBytes,
      ops: data.ops.map(
        (op): RepoOp => ({
          action: op.action as "create" | "update" | "delete",
          path: `${op.collection}/${op.rkey}`,
          // Ops without a (truthy) cid are emitted with an explicit null.
          cid: ("cid" in op && op.cid ? op.cid : null) as CID | null,
        }),
      ),
      rebase: false,
      tooBig: carBytes.length > 1_000_000,
      blobs: [],
      time,
    };

    // Store in SQLite; seq is not part of the stored payload because it is
    // only known after the insert.
    const payload = cborEncode(eventPayload);
    const result = this.insertStmt.run(Buffer.from(payload));
    const seq = Number(result.lastInsertRowid);

    return {
      seq,
      type: "commit",
      event: {
        ...eventPayload,
        seq,
      },
      time,
    };
  }

  /**
   * Get events from a cursor position.
   *
   * @param cursor Exclusive lower bound: only rows with `seq > cursor` are returned.
   * @param limit Maximum number of rows to read (default 100).
   * @returns Events in ascending `seq` order. Each wrapper's `time` comes from
   *   the row's `created_at` column, not from the decoded payload.
   */
  async getEventsSince(cursor: number, limit = 100): Promise<SeqEvent[]> {
    const rows = this.getEventsSinceStmt.all(cursor, limit) as Array<{
      seq: number;
      event_type: string;
      payload: Buffer;
      created_at: string;
    }>;

    const events: SeqEvent[] = [];

    for (const row of rows) {
      const payload = new Uint8Array(row.payload);
      const seq = row.seq;
      const time = row.created_at;

      if (row.event_type === "identity") {
        // Identity rows with an empty payload cannot be decoded; skip them.
        if (payload.length === 0) continue;
        const decoded = cborDecode(payload) as Omit<IdentityEvent, "seq">;
        events.push({
          seq,
          type: "identity",
          event: { ...decoded, seq },
          time,
        });
      } else {
        // Every non-identity row is treated as a commit.
        const decoded = cborDecode(payload) as Omit<CommitEvent, "seq">;
        events.push({
          seq,
          type: "commit",
          event: { ...decoded, seq },
          time,
        });
      }
    }

    return events;
  }

  /**
   * Get the latest sequence number.
   *
   * @returns The highest stored `seq`, or 0 when the table is empty.
   */
  getLatestSeq(): number {
    const result = this.getLatestSeqStmt.get() as {
      seq: number | null;
    };
    return result?.seq ?? 0;
  }

  /**
   * Prune old events to keep the log from growing indefinitely.
   *
   * Retains only rows whose `seq` lies in the window
   * `(MAX(seq) - keepCount, MAX(seq)]`, i.e. at most `keepCount` rows
   * (fewer if there are gaps in `seq`).
   *
   * @param keepCount Number of most recent sequence numbers to retain
   *   (default 10000). Fractions are floored and values below 1 are treated
   *   as 1, so the newest event always survives and `getLatestSeq()` keeps
   *   returning the same value after pruning. Non-finite values (`NaN`,
   *   `Infinity`) prune nothing.
   */
  async pruneOldEvents(keepCount = 10000): Promise<void> {
    if (!Number.isFinite(keepCount)) return;
    const keep = Math.max(1, Math.floor(keepCount));
    this.pruneStmt.run(keep);
  }
}