//! SQLite persistence layer for Jetstream events. use bevy::prelude::*; use crossbeam_channel::{Receiver, Sender}; use jacquard::types::string::{Did, Nsid}; use rusqlite::{Connection, params}; pub enum DbWriteRequest { WriteEvents(Vec), } pub enum DbReadRequest { QueryTimeRange { start_us: i64, end_us: i64, response_tx: Sender>>, }, GetLatestCursor { response_tx: Sender>, }, } /// Event data for SQLite insertion. pub struct DbEvent { pub kind: String, pub did: Did, pub time_us: i64, pub collection: Option, pub message_json: String, } #[derive(Resource, Clone)] pub struct DbChannel { pub writer: Sender, pub reader: Sender, } const CREATE_EVENTS_TABLE: &str = " CREATE TABLE IF NOT EXISTS events ( id INTEGER PRIMARY KEY, kind TEXT NOT NULL, did TEXT NOT NULL, time_us INTEGER NOT NULL, collection TEXT, message_json TEXT NOT NULL ); "; const CREATE_IDX_TIME_US: &str = "CREATE INDEX IF NOT EXISTS idx_events_time_us ON events(time_us);"; const CREATE_IDX_COLLECTION: &str = "CREATE INDEX IF NOT EXISTS idx_events_collection ON events(collection);"; const CREATE_IDX_DID: &str = "CREATE INDEX IF NOT EXISTS idx_events_did ON events(did);"; pub fn spawn_db_thread(mut commands: Commands) { let (write_tx, write_rx) = crossbeam_channel::bounded::(256); let (read_tx, read_rx) = crossbeam_channel::bounded::(64); std::thread::spawn(move || { run_writer(write_rx); }); std::thread::spawn(move || { run_reader(read_rx); }); commands.insert_resource(DbChannel { writer: write_tx, reader: read_tx, }); info!("SQLite writer + reader threads started"); } fn run_writer(rx: Receiver) { let conn = match open_and_init() { Some(c) => c, None => return, }; loop { match rx.recv() { Ok(DbWriteRequest::WriteEvents(events)) => { if let Err(e) = write_events(&conn, &events) { error!("failed to write events to SQLite: {e}"); } } Err(_) => { info!("SQLite writer shutting down"); break; } } } } fn run_reader(rx: Receiver) { let conn = match open_and_init() { Some(c) => c, None => return, }; loop { match rx.recv() { Ok(DbReadRequest::QueryTimeRange { start_us, end_us, response_tx, }) => { let result = query_time_range(&conn, start_us, end_us); match result { Ok(rows) => { let _ = response_tx.send(rows); } Err(e) => { error!("failed to query time range: {e}"); let _ = response_tx.send(vec![]); } } } Ok(DbReadRequest::GetLatestCursor { response_tx }) => match get_latest_cursor(&conn) { Ok(cursor) => { let _ = response_tx.send(cursor); } Err(e) => { error!("failed to get latest cursor: {e}"); let _ = response_tx.send(None); } }, Err(_) => { info!("SQLite reader shutting down"); break; } } } } fn open_and_init() -> Option { let conn = match Connection::open("jetstream_events.db") { Ok(c) => c, Err(e) => { error!("failed to open SQLite database: {e}"); return None; } }; if let Err(e) = apply_schema(&conn) { error!("failed to apply SQLite schema: {e}"); return None; } if let Err(e) = conn.execute_batch("PRAGMA journal_mode = WAL; PRAGMA synchronous = NORMAL;") { error!("failed to set SQLite pragmas: {e}"); return None; } Some(conn) } fn apply_schema(conn: &Connection) -> rusqlite::Result<()> { let has_message_json: bool = conn .prepare("PRAGMA table_info(events)")? .query_map([], |row| row.get::<_, String>(1))? .filter_map(Result::ok) .any(|col| col == "message_json"); let table_exists: bool = conn .prepare("SELECT count(*) FROM sqlite_master WHERE type='table' AND name='events'")? .query_row([], |row| row.get::<_, i64>(0)) .unwrap_or(0) > 0; if table_exists && !has_message_json { warn!("existing events table has incompatible schema — recreating"); conn.execute_batch("DROP TABLE events")?; } conn.execute_batch(CREATE_EVENTS_TABLE)?; conn.execute_batch(CREATE_IDX_TIME_US)?; conn.execute_batch(CREATE_IDX_COLLECTION)?; conn.execute_batch(CREATE_IDX_DID)?; Ok(()) } fn write_events(conn: &Connection, events: &[DbEvent]) -> rusqlite::Result<()> { if events.is_empty() { return Ok(()); } let tx = conn.unchecked_transaction()?; { let mut stmt = tx.prepare_cached( "INSERT INTO events (kind, did, time_us, collection, message_json) VALUES (?1, ?2, ?3, ?4, ?5)", )?; for event in events { let collection_str = event.collection.as_ref().map(|n| n.as_str().to_owned()); stmt.execute(params![ event.kind, event.did.as_str(), event.time_us, collection_str, event.message_json, ])?; } } tx.commit()?; Ok(()) } fn query_time_range( conn: &Connection, start_us: i64, end_us: i64, ) -> rusqlite::Result>> { let mut stmt = conn.prepare_cached( "SELECT id, kind, did, time_us, collection, message_json FROM events WHERE time_us >= ?1 AND time_us <= ?2 ORDER BY time_us ASC", )?; let rows = stmt.query_map(params![start_us, end_us], |row| { let col_count = row.as_ref().column_count(); let mut values = Vec::with_capacity(col_count); for i in 0..col_count { values.push(row.get::<_, rusqlite::types::Value>(i)?); } Ok(values) })?; rows.collect() } fn get_latest_cursor(conn: &Connection) -> rusqlite::Result> { conn.query_row("SELECT MAX(time_us) FROM events", [], |row| { row.get::<_, Option>(0) }) }