this repo has no description
at main 240 lines 6.8 kB view raw
1//! SQLite persistence layer for Jetstream events. 2 3use bevy::prelude::*; 4use crossbeam_channel::{Receiver, Sender}; 5use jacquard::types::string::{Did, Nsid}; 6use rusqlite::{Connection, params}; 7 8pub enum DbWriteRequest { 9 WriteEvents(Vec<DbEvent>), 10} 11 12pub enum DbReadRequest { 13 QueryTimeRange { 14 start_us: i64, 15 end_us: i64, 16 response_tx: Sender<Vec<Vec<rusqlite::types::Value>>>, 17 }, 18 GetLatestCursor { 19 response_tx: Sender<Option<i64>>, 20 }, 21} 22 23/// Event data for SQLite insertion. 24pub struct DbEvent { 25 pub kind: String, 26 pub did: Did, 27 pub time_us: i64, 28 pub collection: Option<Nsid>, 29 pub message_json: String, 30} 31 32#[derive(Resource, Clone)] 33pub struct DbChannel { 34 pub writer: Sender<DbWriteRequest>, 35 pub reader: Sender<DbReadRequest>, 36} 37 38const CREATE_EVENTS_TABLE: &str = " 39CREATE TABLE IF NOT EXISTS events ( 40 id INTEGER PRIMARY KEY, 41 kind TEXT NOT NULL, 42 did TEXT NOT NULL, 43 time_us INTEGER NOT NULL, 44 collection TEXT, 45 message_json TEXT NOT NULL 46); 47"; 48 49const CREATE_IDX_TIME_US: &str = 50 "CREATE INDEX IF NOT EXISTS idx_events_time_us ON events(time_us);"; 51const CREATE_IDX_COLLECTION: &str = 52 "CREATE INDEX IF NOT EXISTS idx_events_collection ON events(collection);"; 53const CREATE_IDX_DID: &str = "CREATE INDEX IF NOT EXISTS idx_events_did ON events(did);"; 54 55pub fn spawn_db_thread(mut commands: Commands) { 56 let (write_tx, write_rx) = crossbeam_channel::bounded::<DbWriteRequest>(256); 57 let (read_tx, read_rx) = crossbeam_channel::bounded::<DbReadRequest>(64); 58 59 std::thread::spawn(move || { 60 run_writer(write_rx); 61 }); 62 63 std::thread::spawn(move || { 64 run_reader(read_rx); 65 }); 66 67 commands.insert_resource(DbChannel { 68 writer: write_tx, 69 reader: read_tx, 70 }); 71 info!("SQLite writer + reader threads started"); 72} 73 74fn run_writer(rx: Receiver<DbWriteRequest>) { 75 let conn = match open_and_init() { 76 Some(c) => c, 77 None => return, 78 }; 79 80 loop { 81 match rx.recv() { 82 Ok(DbWriteRequest::WriteEvents(events)) => { 83 if let Err(e) = write_events(&conn, &events) { 84 error!("failed to write events to SQLite: {e}"); 85 } 86 } 87 Err(_) => { 88 info!("SQLite writer shutting down"); 89 break; 90 } 91 } 92 } 93} 94 95fn run_reader(rx: Receiver<DbReadRequest>) { 96 let conn = match open_and_init() { 97 Some(c) => c, 98 None => return, 99 }; 100 101 loop { 102 match rx.recv() { 103 Ok(DbReadRequest::QueryTimeRange { 104 start_us, 105 end_us, 106 response_tx, 107 }) => { 108 let result = query_time_range(&conn, start_us, end_us); 109 match result { 110 Ok(rows) => { 111 let _ = response_tx.send(rows); 112 } 113 Err(e) => { 114 error!("failed to query time range: {e}"); 115 let _ = response_tx.send(vec![]); 116 } 117 } 118 } 119 Ok(DbReadRequest::GetLatestCursor { response_tx }) => match get_latest_cursor(&conn) { 120 Ok(cursor) => { 121 let _ = response_tx.send(cursor); 122 } 123 Err(e) => { 124 error!("failed to get latest cursor: {e}"); 125 let _ = response_tx.send(None); 126 } 127 }, 128 Err(_) => { 129 info!("SQLite reader shutting down"); 130 break; 131 } 132 } 133 } 134} 135 136fn open_and_init() -> Option<Connection> { 137 let conn = match Connection::open("jetstream_events.db") { 138 Ok(c) => c, 139 Err(e) => { 140 error!("failed to open SQLite database: {e}"); 141 return None; 142 } 143 }; 144 145 if let Err(e) = apply_schema(&conn) { 146 error!("failed to apply SQLite schema: {e}"); 147 return None; 148 } 149 150 if let Err(e) = conn.execute_batch("PRAGMA journal_mode = WAL; PRAGMA synchronous = NORMAL;") { 151 error!("failed to set SQLite pragmas: {e}"); 152 return None; 153 } 154 155 Some(conn) 156} 157 158fn apply_schema(conn: &Connection) -> rusqlite::Result<()> { 159 let has_message_json: bool = conn 160 .prepare("PRAGMA table_info(events)")? 161 .query_map([], |row| row.get::<_, String>(1))? 162 .filter_map(Result::ok) 163 .any(|col| col == "message_json"); 164 165 let table_exists: bool = conn 166 .prepare("SELECT count(*) FROM sqlite_master WHERE type='table' AND name='events'")? 167 .query_row([], |row| row.get::<_, i64>(0)) 168 .unwrap_or(0) 169 > 0; 170 171 if table_exists && !has_message_json { 172 warn!("existing events table has incompatible schema — recreating"); 173 conn.execute_batch("DROP TABLE events")?; 174 } 175 176 conn.execute_batch(CREATE_EVENTS_TABLE)?; 177 conn.execute_batch(CREATE_IDX_TIME_US)?; 178 conn.execute_batch(CREATE_IDX_COLLECTION)?; 179 conn.execute_batch(CREATE_IDX_DID)?; 180 Ok(()) 181} 182 183fn write_events(conn: &Connection, events: &[DbEvent]) -> rusqlite::Result<()> { 184 if events.is_empty() { 185 return Ok(()); 186 } 187 188 let tx = conn.unchecked_transaction()?; 189 190 { 191 let mut stmt = tx.prepare_cached( 192 "INSERT INTO events (kind, did, time_us, collection, message_json) 193 VALUES (?1, ?2, ?3, ?4, ?5)", 194 )?; 195 196 for event in events { 197 let collection_str = event.collection.as_ref().map(|n| n.as_str().to_owned()); 198 stmt.execute(params![ 199 event.kind, 200 event.did.as_str(), 201 event.time_us, 202 collection_str, 203 event.message_json, 204 ])?; 205 } 206 } 207 208 tx.commit()?; 209 Ok(()) 210} 211 212fn query_time_range( 213 conn: &Connection, 214 start_us: i64, 215 end_us: i64, 216) -> rusqlite::Result<Vec<Vec<rusqlite::types::Value>>> { 217 let mut stmt = conn.prepare_cached( 218 "SELECT id, kind, did, time_us, collection, message_json 219 FROM events 220 WHERE time_us >= ?1 AND time_us <= ?2 221 ORDER BY time_us ASC", 222 )?; 223 224 let rows = stmt.query_map(params![start_us, end_us], |row| { 225 let col_count = row.as_ref().column_count(); 226 let mut values = Vec::with_capacity(col_count); 227 for i in 0..col_count { 228 values.push(row.get::<_, rusqlite::types::Value>(i)?); 229 } 230 Ok(values) 231 })?; 232 233 rows.collect() 234} 235 236fn get_latest_cursor(conn: &Connection) -> rusqlite::Result<Option<i64>> { 237 conn.query_row("SELECT MAX(time_us) FROM events", [], |row| { 238 row.get::<_, Option<i64>>(0) 239 }) 240}