atproto user agency toolkit for individuals and groups
at main 224 lines 5.0 kB view raw
import type Database from "better-sqlite3";
import { encode as cborEncode, decode as cborDecode } from "./cbor-compat.js";
import { CID } from "@atproto/lex-data";
import { blocksToCarFile, type BlockMap } from "@atproto/repo";
import type { RecordWriteOp } from "@atproto/repo";

/**
 * Commit event payload for the firehose.
 */
export interface CommitEvent {
  seq: number;
  rebase: boolean;
  tooBig: boolean;
  repo: string;
  commit: CID;
  rev: string;
  since: string | null;
  blocks: Uint8Array;
  ops: RepoOp[];
  blobs: CID[];
  time: string;
}

/**
 * Identity event payload for the firehose.
 */
export interface IdentityEvent {
  seq: number;
  did: string;
  handle: string;
  time: string;
}

/**
 * A single repository operation within a commit.
 * `path` is `<collection>/<rkey>`; `cid` is null when no record CID applies
 * (e.g. deletes).
 */
export interface RepoOp {
  action: "create" | "update" | "delete";
  path: string;
  cid: CID | null;
}

/**
 * Sequenced commit event wrapper.
 */
export interface SeqCommitEvent {
  seq: number;
  type: "commit";
  event: CommitEvent;
  time: string;
}

/**
 * Sequenced identity event wrapper.
 */
export interface SeqIdentityEvent {
  seq: number;
  type: "identity";
  event: IdentityEvent;
  time: string;
}

/**
 * Sequenced event (commit or identity), discriminated on `type`.
 */
export type SeqEvent = SeqCommitEvent | SeqIdentityEvent;

/**
 * Data needed to sequence a commit.
 */
export interface CommitData {
  did: string;
  commit: CID;
  rev: string;
  since: string | null;
  newBlocks: BlockMap;
  ops: Array<RecordWriteOp & { cid?: CID | null }>;
}

/**
 * Sequencer manages the firehose event log.
 *
 * Events are stored as CBOR payloads in the SQLite table `firehose_events`
 * (columns used here: `seq`, `event_type`, `payload`, `created_at`).
 * Provides methods for:
 * - Sequencing new commits
 * - Backfilling events from a cursor
 * - Getting the latest sequence number
 * - Pruning old events
 *
 * Prepared statements are created lazily because the table must already
 * exist (created via initSchema elsewhere) before `prepare` succeeds.
 */
export class Sequencer {
  private _insertStmt?: Database.Statement;
  private _getEventsSinceStmt?: Database.Statement;
  private _getLatestSeqStmt?: Database.Statement;
  private _pruneStmt?: Database.Statement;

  constructor(private db: Database.Database) {}

  /** Lazily prepare statements (tables must exist first via initSchema) */
  private get insertStmt() {
    return (this._insertStmt ??= this.db.prepare(
      `INSERT INTO firehose_events (event_type, payload)
       VALUES ('commit', ?)`,
    ));
  }
  private get getEventsSinceStmt() {
    return (this._getEventsSinceStmt ??= this.db.prepare(
      `SELECT seq, event_type, payload, created_at
       FROM firehose_events
       WHERE seq > ?
       ORDER BY seq ASC
       LIMIT ?`,
    ));
  }
  private get getLatestSeqStmt() {
    return (this._getLatestSeqStmt ??= this.db.prepare(
      "SELECT MAX(seq) as seq FROM firehose_events",
    ));
  }
  /**
   * Deletes every event whose seq is at or below the (keepCount+1)-th newest
   * seq. If the table has keepCount rows or fewer, the subquery yields no row
   * (NULL), the comparison is NULL, and nothing is deleted. Unlike
   * `MAX(seq) - keepCount`, this keeps exactly `keepCount` rows even when the
   * seq column has gaps.
   */
  private get pruneStmt() {
    return (this._pruneStmt ??= this.db.prepare(
      `DELETE FROM firehose_events
       WHERE seq <= (
         SELECT seq FROM firehose_events
         ORDER BY seq DESC
         LIMIT 1 OFFSET ?
       )`,
    ));
  }

  /**
   * Add a commit to the firehose sequence.
   *
   * Builds a CAR file rooted at `data.commit` from `data.newBlocks`, stores the
   * CBOR-encoded event with event_type 'commit', and uses the inserted row id
   * as the sequence number.
   *
   * @param data Commit metadata, new blocks and write ops to sequence.
   * @returns The complete sequenced event, ready for broadcasting.
   */
  async sequenceCommit(data: CommitData): Promise<SeqEvent> {
    // Create CAR slice with commit diff
    const carBytes = await blocksToCarFile(data.commit, data.newBlocks);
    const time = new Date().toISOString();

    // Build event payload
    const eventPayload: Omit<CommitEvent, "seq"> = {
      repo: data.did,
      commit: data.commit,
      rev: data.rev,
      since: data.since,
      blocks: carBytes,
      ops: data.ops.map(
        (op): RepoOp => ({
          action: op.action as RepoOp["action"],
          path: `${op.collection}/${op.rkey}`,
          cid: op.cid ?? null,
        }),
      ),
      rebase: false,
      // Flag oversized diffs (CAR larger than 1,000,000 bytes).
      tooBig: carBytes.length > 1_000_000,
      blobs: [],
      time,
    };

    // Store in SQLite
    const payload = cborEncode(eventPayload);
    const result = this.insertStmt.run(Buffer.from(payload));
    const seq = Number(result.lastInsertRowid);

    return {
      seq,
      type: "commit",
      event: {
        ...eventPayload,
        seq,
      },
      time,
    };
  }

  /**
   * Get events strictly after a cursor position, in ascending seq order.
   *
   * Identity rows with an empty payload are skipped. Any row whose event_type
   * is not 'identity' is decoded as a commit.
   * NOTE(review): other event types would be mis-decoded as commits — confirm
   * only 'commit' and 'identity' rows are ever written.
   *
   * The wrapper `time` prefers the `time` stored inside the payload, which is
   * the same value `sequenceCommit` broadcasts live. It falls back to the row's
   * `created_at` column, so replayed events carry the same timestamp as the
   * live ones.
   *
   * @param cursor Return only events with `seq > cursor`.
   * @param limit Maximum number of rows to read (default 100).
   */
  async getEventsSince(cursor: number, limit = 100): Promise<SeqEvent[]> {
    const rows = this.getEventsSinceStmt.all(cursor, limit) as Array<{
      seq: number;
      event_type: string;
      payload: Buffer;
      created_at: string;
    }>;

    const events: SeqEvent[] = [];

    for (const row of rows) {
      const payload = new Uint8Array(row.payload);
      const seq = row.seq;

      if (row.event_type === "identity") {
        if (payload.length === 0) continue;
        const decoded = cborDecode(payload) as Omit<IdentityEvent, "seq">;
        events.push({
          seq,
          type: "identity",
          event: { ...decoded, seq },
          time: decoded.time ?? row.created_at,
        });
      } else {
        const decoded = cborDecode(payload) as Omit<CommitEvent, "seq">;
        events.push({
          seq,
          type: "commit",
          event: { ...decoded, seq },
          time: decoded.time ?? row.created_at,
        });
      }
    }

    return events;
  }

  /**
   * Get the latest sequence number, or 0 when the log is empty.
   */
  getLatestSeq(): number {
    const result = this.getLatestSeqStmt.get() as {
      seq: number | null;
    };
    return result?.seq ?? 0;
  }

  /**
   * Prune old events so that at most `keepCount` of the newest events remain.
   *
   * @param keepCount Number of most-recent events to retain (default 10000).
   *   0 deletes every event.
   * @throws RangeError if `keepCount` is not a non-negative integer. A negative
   *   value previously deleted the entire log.
   */
  async pruneOldEvents(keepCount = 10000): Promise<void> {
    if (!Number.isInteger(keepCount) || keepCount < 0) {
      throw new RangeError(
        `keepCount must be a non-negative integer, got ${keepCount}`,
      );
    }
    this.pruneStmt.run(keepCount);
  }
}