this repo has no description
1//! SQLite persistence layer for Jetstream events.
2
3use bevy::prelude::*;
4use crossbeam_channel::{Receiver, Sender};
5use jacquard::types::string::{Did, Nsid};
6use rusqlite::{Connection, params};
7
/// Request handled by the SQLite writer thread (`run_writer`).
pub enum DbWriteRequest {
    /// Insert the given events; `write_events` stores the whole batch inside a
    /// single transaction.
    WriteEvents(Vec<DbEvent>),
}
11
/// Request handled by the SQLite reader thread (`run_reader`).
///
/// Each variant carries its own reply channel; the reader thread ignores a
/// failed reply send.
pub enum DbReadRequest {
    /// Fetch every event whose `time_us` lies in `[start_us, end_us]`
    /// (both bounds inclusive), ordered by ascending `time_us`.
    QueryTimeRange {
        /// Inclusive lower bound on `time_us`.
        start_us: i64,
        /// Inclusive upper bound on `time_us`.
        end_us: i64,
        /// Receives one `Vec` of column values per matching row, in the order
        /// `id, kind, did, time_us, collection, message_json`. An empty `Vec`
        /// is sent if the query fails.
        response_tx: Sender<Vec<Vec<rusqlite::types::Value>>>,
    },
    /// Fetch the largest stored `time_us`.
    GetLatestCursor {
        /// Receives `MAX(time_us)`, or `None` when the query yields NULL or
        /// fails.
        response_tx: Sender<Option<i64>>,
    },
}
22
/// Event data for SQLite insertion.
///
/// Each field maps to the column of the same name in the `events` table.
pub struct DbEvent {
    /// Event kind, stored in the `kind` TEXT column.
    pub kind: String,
    /// DID associated with the event; stored via `as_str()`.
    pub did: Did,
    /// Event timestamp — presumably microseconds given the `_us` suffix;
    /// confirm against the producer.
    pub time_us: i64,
    /// Optional collection NSID; stored as NULL when `None`.
    pub collection: Option<Nsid>,
    /// Event payload stored in the `message_json` TEXT column (presumably the
    /// raw JSON message — confirm against the producer).
    pub message_json: String,
}
31
/// Bevy resource holding the sending halves of the database request channels.
///
/// Inserted by `spawn_db_thread`; cloning it clones the underlying senders.
#[derive(Resource, Clone)]
pub struct DbChannel {
    /// Sends write requests to the writer thread.
    pub writer: Sender<DbWriteRequest>,
    /// Sends read requests to the reader thread.
    pub reader: Sender<DbReadRequest>,
}
37
/// DDL for the `events` table; a no-op when the table already exists.
///
/// `collection` is the only nullable column.
const CREATE_EVENTS_TABLE: &str = "
CREATE TABLE IF NOT EXISTS events (
    id INTEGER PRIMARY KEY,
    kind TEXT NOT NULL,
    did TEXT NOT NULL,
    time_us INTEGER NOT NULL,
    collection TEXT,
    message_json TEXT NOT NULL
);
";
48
/// Index on `events(time_us)`; the time-range and latest-cursor queries in
/// this file filter or aggregate on that column.
const CREATE_IDX_TIME_US: &str =
    "CREATE INDEX IF NOT EXISTS idx_events_time_us ON events(time_us);";
/// Index on `events(collection)`.
const CREATE_IDX_COLLECTION: &str =
    "CREATE INDEX IF NOT EXISTS idx_events_collection ON events(collection);";
/// Index on `events(did)`.
const CREATE_IDX_DID: &str = "CREATE INDEX IF NOT EXISTS idx_events_did ON events(did);";
54
55pub fn spawn_db_thread(mut commands: Commands) {
56 let (write_tx, write_rx) = crossbeam_channel::bounded::<DbWriteRequest>(256);
57 let (read_tx, read_rx) = crossbeam_channel::bounded::<DbReadRequest>(64);
58
59 std::thread::spawn(move || {
60 run_writer(write_rx);
61 });
62
63 std::thread::spawn(move || {
64 run_reader(read_rx);
65 });
66
67 commands.insert_resource(DbChannel {
68 writer: write_tx,
69 reader: read_tx,
70 });
71 info!("SQLite writer + reader threads started");
72}
73
74fn run_writer(rx: Receiver<DbWriteRequest>) {
75 let conn = match open_and_init() {
76 Some(c) => c,
77 None => return,
78 };
79
80 loop {
81 match rx.recv() {
82 Ok(DbWriteRequest::WriteEvents(events)) => {
83 if let Err(e) = write_events(&conn, &events) {
84 error!("failed to write events to SQLite: {e}");
85 }
86 }
87 Err(_) => {
88 info!("SQLite writer shutting down");
89 break;
90 }
91 }
92 }
93}
94
95fn run_reader(rx: Receiver<DbReadRequest>) {
96 let conn = match open_and_init() {
97 Some(c) => c,
98 None => return,
99 };
100
101 loop {
102 match rx.recv() {
103 Ok(DbReadRequest::QueryTimeRange {
104 start_us,
105 end_us,
106 response_tx,
107 }) => {
108 let result = query_time_range(&conn, start_us, end_us);
109 match result {
110 Ok(rows) => {
111 let _ = response_tx.send(rows);
112 }
113 Err(e) => {
114 error!("failed to query time range: {e}");
115 let _ = response_tx.send(vec![]);
116 }
117 }
118 }
119 Ok(DbReadRequest::GetLatestCursor { response_tx }) => match get_latest_cursor(&conn) {
120 Ok(cursor) => {
121 let _ = response_tx.send(cursor);
122 }
123 Err(e) => {
124 error!("failed to get latest cursor: {e}");
125 let _ = response_tx.send(None);
126 }
127 },
128 Err(_) => {
129 info!("SQLite reader shutting down");
130 break;
131 }
132 }
133 }
134}
135
136fn open_and_init() -> Option<Connection> {
137 let conn = match Connection::open("jetstream_events.db") {
138 Ok(c) => c,
139 Err(e) => {
140 error!("failed to open SQLite database: {e}");
141 return None;
142 }
143 };
144
145 if let Err(e) = apply_schema(&conn) {
146 error!("failed to apply SQLite schema: {e}");
147 return None;
148 }
149
150 if let Err(e) = conn.execute_batch("PRAGMA journal_mode = WAL; PRAGMA synchronous = NORMAL;") {
151 error!("failed to set SQLite pragmas: {e}");
152 return None;
153 }
154
155 Some(conn)
156}
157
158fn apply_schema(conn: &Connection) -> rusqlite::Result<()> {
159 let has_message_json: bool = conn
160 .prepare("PRAGMA table_info(events)")?
161 .query_map([], |row| row.get::<_, String>(1))?
162 .filter_map(Result::ok)
163 .any(|col| col == "message_json");
164
165 let table_exists: bool = conn
166 .prepare("SELECT count(*) FROM sqlite_master WHERE type='table' AND name='events'")?
167 .query_row([], |row| row.get::<_, i64>(0))
168 .unwrap_or(0)
169 > 0;
170
171 if table_exists && !has_message_json {
172 warn!("existing events table has incompatible schema — recreating");
173 conn.execute_batch("DROP TABLE events")?;
174 }
175
176 conn.execute_batch(CREATE_EVENTS_TABLE)?;
177 conn.execute_batch(CREATE_IDX_TIME_US)?;
178 conn.execute_batch(CREATE_IDX_COLLECTION)?;
179 conn.execute_batch(CREATE_IDX_DID)?;
180 Ok(())
181}
182
183fn write_events(conn: &Connection, events: &[DbEvent]) -> rusqlite::Result<()> {
184 if events.is_empty() {
185 return Ok(());
186 }
187
188 let tx = conn.unchecked_transaction()?;
189
190 {
191 let mut stmt = tx.prepare_cached(
192 "INSERT INTO events (kind, did, time_us, collection, message_json)
193 VALUES (?1, ?2, ?3, ?4, ?5)",
194 )?;
195
196 for event in events {
197 let collection_str = event.collection.as_ref().map(|n| n.as_str().to_owned());
198 stmt.execute(params![
199 event.kind,
200 event.did.as_str(),
201 event.time_us,
202 collection_str,
203 event.message_json,
204 ])?;
205 }
206 }
207
208 tx.commit()?;
209 Ok(())
210}
211
212fn query_time_range(
213 conn: &Connection,
214 start_us: i64,
215 end_us: i64,
216) -> rusqlite::Result<Vec<Vec<rusqlite::types::Value>>> {
217 let mut stmt = conn.prepare_cached(
218 "SELECT id, kind, did, time_us, collection, message_json
219 FROM events
220 WHERE time_us >= ?1 AND time_us <= ?2
221 ORDER BY time_us ASC",
222 )?;
223
224 let rows = stmt.query_map(params![start_us, end_us], |row| {
225 let col_count = row.as_ref().column_count();
226 let mut values = Vec::with_capacity(col_count);
227 for i in 0..col_count {
228 values.push(row.get::<_, rusqlite::types::Value>(i)?);
229 }
230 Ok(values)
231 })?;
232
233 rows.collect()
234}
235
236fn get_latest_cursor(conn: &Connection) -> rusqlite::Result<Option<i64>> {
237 conn.query_row("SELECT MAX(time_us) FROM events", [], |row| {
238 row.get::<_, Option<i64>>(0)
239 })
240}