use bevy::prelude::*; use crossbeam_channel::{Receiver, bounded}; use crossbeam_queue::ArrayQueue; use futures_util::StreamExt; use jacquard::DefaultStr; use jacquard::jetstream::{JetstreamMessage, JetstreamParams}; use jacquard::types::string::{Did, Nsid}; use jacquard::xrpc::{BasicSubscriptionClient, SubscriptionClient}; use std::sync::Arc; use std::sync::atomic::{AtomicBool, AtomicI64}; use std::time::Duration; use crate::db::{DbChannel, DbEvent, DbReadRequest, DbWriteRequest}; use crate::helix::{self, HelixHierarchy, HelixState}; use crate::net::AsyncTungsteniteClient; use jacquard::api::app_bsky::feed::like::LikeRecord; use jacquard::api::app_bsky::feed::post::PostRecord; use jacquard::api::app_bsky::graph::follow::FollowRecord; use jacquard::common::types::collection::Collection; pub type IngestEvent = JetstreamMessage; /// Lock-free ring buffer: producer force_push, consumer pops per frame. #[derive(Resource)] pub struct JetstreamChannels { pub queue: Arc>, } #[derive(Resource)] pub struct EventDotAssets { pub mesh: Handle, pub post: Handle, pub like: Handle, pub follow: Handle, pub shiny: Handle, pub default_bsky: Handle, } #[derive(Resource)] pub struct SharedTimeState { pub anchor_us: Arc, pub latest_us: Arc, pub initialized: Arc, } #[derive(Component, Debug, Clone)] pub struct JetstreamEventMarker(pub JetstreamMessage); impl JetstreamEventMarker { pub fn did(&self) -> &Did { match &self.0 { JetstreamMessage::Commit { did, .. } => did, JetstreamMessage::Identity { did, .. } => did, JetstreamMessage::Account { did, .. } => did, } } pub fn time_us(&self) -> i64 { match &self.0 { JetstreamMessage::Commit { time_us, .. } => *time_us, JetstreamMessage::Identity { time_us, .. } => *time_us, JetstreamMessage::Account { time_us, .. } => *time_us, } } pub fn collection(&self) -> Option<&Nsid> { match &self.0 { JetstreamMessage::Commit { commit, .. } => Some(&commit.collection), _ => None, } } pub fn record(&self) -> Option<&jacquard::Data> { match &self.0 { JetstreamMessage::Commit { commit, .. } => commit.record.as_ref(), _ => None, } } } #[derive(Component)] pub struct NeedsParticle; /// Live = bloom outward, Historical = bloom inward. #[derive(Component, Debug, Clone, Copy, PartialEq)] pub enum EventSource { Live, Historical, } /// `t = (time_us - anchor_us) / MICROS_PER_TURN`. 1 t-unit = 1 minute. #[derive(Resource, Debug)] pub struct TimeWindow { pub anchor_us: i64, pub latest_t: f32, pub is_initialized: bool, } impl TimeWindow { pub const MICROS_PER_TURN: f64 = 60_000_000.0; pub fn time_to_t(&self, time_us: i64) -> f32 { ((time_us - self.anchor_us) as f64 / Self::MICROS_PER_TURN) as f32 } } /// Sorted, non-overlapping `(start_us, end_us)` intervals already loaded from SQLite. #[derive(Resource, Debug, Default)] pub struct LoadedRanges { pub ranges: Vec<(i64, i64)>, } impl LoadedRanges { pub fn is_covered(&self, start_us: i64, end_us: i64) -> bool { self.ranges .iter() .any(|(lo, hi)| *lo <= start_us && *hi >= end_us) } pub fn insert(&mut self, start_us: i64, end_us: i64) { self.ranges.push((start_us, end_us)); self.ranges.sort_unstable_by_key(|r| r.0); let mut merged: Vec<(i64, i64)> = Vec::with_capacity(self.ranges.len()); for (lo, hi) in self.ranges.drain(..) { if let Some(last) = merged.last_mut() { if lo <= last.1 + 1 { last.1 = last.1.max(hi); continue; } } merged.push((lo, hi)); } self.ranges = merged; } } #[derive(Resource, Default)] pub struct HistoryQueryState { pub pending: Option>>>, pub pending_range: Option<(i64, i64)>, } const SAMPLE_RATE: u64 = 10; const RING_BUFFER_CAPACITY: usize = 1024; fn emissive_mat( materials: &mut Assets, base: Color, emissive: LinearRgba, ) -> Handle { materials.add(StandardMaterial { base_color: base, emissive, ..default() }) } pub fn setup_event_dots( mut commands: Commands, mut meshes: ResMut>, mut materials: ResMut>, ) { let mesh = meshes.add(Sphere::new(0.002).mesh().ico(2).unwrap()); commands.insert_resource(EventDotAssets { mesh, post: emissive_mat( &mut materials, Color::srgb(0.08, 0.15, 0.4), LinearRgba::new(0.2, 0.4, 1.5, 1.0), ), like: emissive_mat( &mut materials, Color::srgb(0.4, 0.08, 0.25), LinearRgba::new(1.5, 0.2, 0.8, 1.0), ), follow: emissive_mat( &mut materials, Color::srgb(0.08, 0.4, 0.2), LinearRgba::new(0.2, 1.5, 0.4, 1.0), ), shiny: emissive_mat( &mut materials, Color::srgb(0.4, 0.35, 0.08), LinearRgba::new(1.5, 1.2, 0.2, 1.0), ), default_bsky: emissive_mat( &mut materials, Color::srgb(0.15, 0.15, 0.2), LinearRgba::new(0.4, 0.4, 0.5, 1.0), ), }); } pub fn spawn_jetstream_consumer(mut commands: Commands, db_channel: Res) { use std::sync::atomic::{AtomicBool, AtomicI64, Ordering}; let queue = Arc::new(ArrayQueue::::new(RING_BUFFER_CAPACITY)); let anchor_us = Arc::new(AtomicI64::new(0)); let latest_us = Arc::new(AtomicI64::new(0)); let initialized = Arc::new(AtomicBool::new(false)); commands.insert_resource(JetstreamChannels { queue: queue.clone(), }); commands.insert_resource(SharedTimeState { anchor_us: anchor_us.clone(), latest_us: latest_us.clone(), initialized: initialized.clone(), }); let queue_handle = queue; let anchor_handle = anchor_us; let latest_handle = latest_us; let init_handle = initialized; let db_writer = db_channel.writer.clone(); let db_reader = db_channel.reader.clone(); bevy::tasks::IoTaskPool::get() .spawn(async move { let client = AsyncTungsteniteClient; // Query latest cursor from SQLite for reconnection. let initial_cursor = { let (resp_tx, resp_rx) = bounded::>(1); if db_reader .send(DbReadRequest::GetLatestCursor { response_tx: resp_tx, }) .is_err() { warn!("failed to send GetLatestCursor to SQLite worker"); None } else { match resp_rx.recv() { Ok(cursor) => { if let Some(c) = cursor { info!("resuming Jetstream from SQLite cursor: {c}"); } cursor } Err(_) => None, } } }; let base_uri = match jacquard::deps::fluent_uri::Uri::parse( "wss://jetstream2.us-east.bsky.network".to_string(), ) { Ok(uri) => uri, Err(_) => { error!("failed to parse Jetstream URI"); return; } }; let sub_client = BasicSubscriptionClient::new(client, base_uri); let params = JetstreamParams::::new() .wanted_collections(vec![]) .compress(false) .build(); let mut cursor: Option = initial_cursor; let base_params = params; let mut sample_counter: u64 = 0; loop { let mut reconnect_params = base_params.clone(); if let Some(c) = cursor { reconnect_params.cursor = Some(c); } match sub_client.subscribe(&reconnect_params).await { Ok(subscription) => { let (_sink, mut stream) = subscription.into_stream(); let mut db_batch: Vec = Vec::with_capacity(100); let mut last_flush = std::time::Instant::now(); while let Some(msg_result) = stream.next().await { match msg_result { Ok(msg) => { let time_us = match &msg { JetstreamMessage::Commit { time_us, .. } => *time_us, JetstreamMessage::Identity { time_us, .. } => *time_us, JetstreamMessage::Account { time_us, .. } => *time_us, }; cursor = Some(time_us); if !init_handle.load(Ordering::Relaxed) { anchor_handle.store(time_us, Ordering::Relaxed); init_handle.store(true, Ordering::Release); } latest_handle.fetch_max(time_us, Ordering::Relaxed); if let Some(db_event) = jetstream_to_db_event(&msg) { db_batch.push(db_event); } let is_high_volume = match &msg { JetstreamMessage::Commit { commit, .. } => { let col: &str = commit.collection.as_ref(); col == ::NSID || col == ::NSID } _ => false, }; let send_to_bevy = if is_high_volume { sample_counter += 1; sample_counter % SAMPLE_RATE == 0 } else { true }; if send_to_bevy { queue_handle.force_push(msg); } if db_batch.len() >= 100 || last_flush.elapsed() >= Duration::from_millis(500) { flush_db_batch(&db_writer, &mut db_batch); last_flush = std::time::Instant::now(); } } Err(e) => { warn!("Jetstream stream error: {e:?}, reconnecting"); flush_db_batch(&db_writer, &mut db_batch); break; } } } } Err(e) => { warn!("Jetstream connection failed: {e:?}"); } } futures_timer::Delay::new(Duration::from_secs(2)).await; } }) .detach(); } fn flush_db_batch(db_writer: &crossbeam_channel::Sender, batch: &mut Vec) { if batch.is_empty() { return; } let events = std::mem::take(batch); if db_writer .try_send(DbWriteRequest::WriteEvents(events)) .is_err() { warn!("SQLite write channel full or closed, dropping batch"); } } fn jetstream_to_db_event(msg: &JetstreamMessage) -> Option { let message_json = match serde_json::to_string(msg) { Ok(json) => json, Err(e) => { warn!("failed to serialize JetstreamMessage to JSON: {}", e); return None; } }; let (kind, did, time_us, collection) = match msg { JetstreamMessage::Commit { did, time_us, commit, } => ( "commit".to_owned(), did.clone(), *time_us, Some(commit.collection.clone()), ), JetstreamMessage::Identity { did, time_us, .. } => { ("identity".to_owned(), did.clone(), *time_us, None) } JetstreamMessage::Account { did, time_us, .. } => { ("account".to_owned(), did.clone(), *time_us, None) } }; Some(DbEvent { kind, did, time_us, collection, message_json, }) } pub fn drain_jetstream_events( mut commands: Commands, channels: Res, hierarchy: Res, state: Res, mut time_window: ResMut, shared_time: Res, dot_assets: Res, ) { use std::sync::atomic::Ordering; if shared_time.initialized.load(Ordering::Acquire) { if !time_window.is_initialized { time_window.anchor_us = shared_time.anchor_us.load(Ordering::Relaxed); time_window.is_initialized = true; } let latest_us = shared_time.latest_us.load(Ordering::Relaxed); time_window.latest_t = time_window.time_to_t(latest_us); } // Paused: discard ring buffer (SQLite has everything for replay). if !state.auto_follow { while channels.queue.pop().is_some() {} return; } // Too far behind latest_t — discard to avoid off-screen dots. if time_window.is_initialized { let lag = time_window.latest_t - state.focal_time; if lag > 0.5 { while channels.queue.pop().is_some() {} return; } } let mut spawned = 0u32; while let Some(event) = channels.queue.pop() { if spawned >= 20 { break; } let time_us = match &event { JetstreamMessage::Commit { time_us, .. } => *time_us, JetstreamMessage::Identity { time_us, .. } => *time_us, JetstreamMessage::Account { time_us, .. } => *time_us, }; let t = time_window.time_to_t(time_us); let frame = helix::eval_coil(t, hierarchy.levels.len() - 1, &hierarchy); let dot_mat = match &event { JetstreamMessage::Commit { commit, .. } => { let col: &str = commit.collection.as_ref(); if col == ::NSID { dot_assets.post.clone() } else if col == ::NSID { dot_assets.like.clone() } else if col == ::NSID { dot_assets.follow.clone() } else if col.starts_with("app.bsky.") { dot_assets.default_bsky.clone() } else { dot_assets.shiny.clone() } } _ => dot_assets.shiny.clone(), }; commands.spawn(( JetstreamEventMarker(event), Transform::from_translation(frame.position), Visibility::default(), NeedsParticle, EventSource::Live, Mesh3d(dot_assets.mesh.clone()), MeshMaterial3d(dot_mat), )); spawned += 1; } } /// Startup system: query SQLite for recent events and spawn them as historical entities. pub fn load_historical_events( mut commands: Commands, db_channel: Res, hierarchy: Res, state: Res, mut time_window: ResMut, mut loaded_ranges: ResMut, ) { let latest_us = { let (resp_tx, resp_rx) = bounded::>(1); if db_channel .reader .send(DbReadRequest::GetLatestCursor { response_tx: resp_tx, }) .is_err() { warn!("load_historical_events: failed to send GetLatestCursor"); return; } match resp_rx.recv() { Ok(cursor) => cursor, Err(_) => { warn!("load_historical_events: SQLite worker closed before returning cursor"); return; } } }; let Some(end_us) = latest_us else { info!("load_historical_events: database is empty, skipping historical load"); return; }; const HISTORY_WINDOW_US: i64 = 60_000_000; let start_us = end_us - HISTORY_WINDOW_US; info!( "load_historical_events: loading events from {} to {} (1 hour window)", start_us, end_us ); let (resp_tx, resp_rx) = bounded::>>(1); if db_channel .reader .send(DbReadRequest::QueryTimeRange { start_us, end_us, response_tx: resp_tx, }) .is_err() { warn!("load_historical_events: failed to send QueryTimeRange"); return; } let rows = match resp_rx.recv() { Ok(r) => r, Err(_) => { warn!("load_historical_events: SQLite worker closed before returning rows"); return; } }; info!( "load_historical_events: received {} historical rows", rows.len() ); if !time_window.is_initialized && let Some(first_row) = rows.first() && let Some(t) = row_time_us(first_row) { time_window.anchor_us = t; time_window.is_initialized = true; info!( "load_historical_events: initialized TimeWindow anchor_us={}", t ); } let count = spawn_historical_entities(&mut commands, &rows, &hierarchy, &state, &mut time_window); info!( "load_historical_events: spawned {} historical entities", count ); loaded_ranges.insert(start_us, end_us); } pub fn poll_history_responses( mut commands: Commands, mut query_state: ResMut, mut loaded_ranges: ResMut, hierarchy: Res, state: Res, mut time_window: ResMut, ) { let Some(receiver) = &query_state.pending else { return; }; match receiver.try_recv() { Ok(rows) => { let count = spawn_historical_entities( &mut commands, &rows, &hierarchy, &state, &mut time_window, ); info!( "poll_history_responses: spawned {} historical entities from pan query", count ); if let Some((start_us, end_us)) = query_state.pending_range.take() { loaded_ranges.insert(start_us, end_us); } query_state.pending = None; } Err(crossbeam_channel::TryRecvError::Empty) => {} Err(crossbeam_channel::TryRecvError::Disconnected) => { warn!("poll_history_responses: SQLite worker closed response channel"); query_state.pending = None; query_state.pending_range = None; } } } pub fn row_time_us(row: &[rusqlite::types::Value]) -> Option { match row.get(3)? { rusqlite::types::Value::Integer(v) => Some(*v), _ => None, } } fn row_text(row: &[rusqlite::types::Value], idx: usize) -> Option<&str> { match row.get(idx)? { rusqlite::types::Value::Text(s) => Some(s.as_str()), _ => None, } } fn row_to_message(row: &[rusqlite::types::Value]) -> Option { let json = row_text(row, 5)?; match serde_json::from_str::(json) { Ok(msg) => Some(msg), Err(e) => { warn!("row_to_message: failed to deserialize message_json: {}", e); None } } } fn spawn_historical_entities( commands: &mut Commands, rows: &[Vec], hierarchy: &HelixHierarchy, state: &HelixState, time_window: &mut TimeWindow, ) -> usize { let mut count = 0; for row in rows { let Some(time_us) = row_time_us(row) else { warn!("spawn_historical_entities: row missing time_us, skipping"); continue; }; let Some(msg) = row_to_message(row) else { continue; }; if !time_window.is_initialized { time_window.anchor_us = time_us; time_window.is_initialized = true; } let t = time_window.time_to_t(time_us); time_window.latest_t = time_window.latest_t.max(t); let frame = helix::eval_coil(t, hierarchy.levels.len() - 1, hierarchy); commands.spawn(( JetstreamEventMarker(msg), Transform::from_translation(frame.position), Visibility::default(), EventSource::Historical, )); count += 1; } count } #[cfg(test)] mod tests { use super::*; use jacquard::jetstream::{ CommitOperation, JetstreamAccount, JetstreamCommit, JetstreamIdentity, }; use jacquard::types::string::{Cid, Datetime, Did, Nsid, Rkey}; use std::str::FromStr; #[test] fn is_covered_empty_ranges_returns_false() { let ranges = LoadedRanges::default(); assert!(!ranges.is_covered(100, 200)); } #[test] fn is_covered_exact_match() { let mut ranges = LoadedRanges::default(); ranges.insert(100, 200); assert!(ranges.is_covered(100, 200)); } #[test] fn is_covered_subset_of_loaded_range() { let mut ranges = LoadedRanges::default(); ranges.insert(0, 1000); assert!(ranges.is_covered(100, 200)); assert!(ranges.is_covered(0, 1000)); assert!(ranges.is_covered(500, 999)); } #[test] fn is_covered_partial_overlap_returns_false() { let mut ranges = LoadedRanges::default(); ranges.insert(100, 200); assert!(!ranges.is_covered(50, 200)); assert!(!ranges.is_covered(100, 250)); assert!(!ranges.is_covered(50, 250)); } #[test] fn is_covered_no_overlap_returns_false() { let mut ranges = LoadedRanges::default(); ranges.insert(100, 200); assert!(!ranges.is_covered(300, 400)); assert!(!ranges.is_covered(0, 50)); } #[test] fn insert_single_range() { let mut ranges = LoadedRanges::default(); ranges.insert(100, 200); assert_eq!(ranges.ranges, vec![(100, 200)]); } #[test] fn insert_non_overlapping_ranges_stay_separate() { let mut ranges = LoadedRanges::default(); ranges.insert(100, 200); ranges.insert(400, 500); assert_eq!(ranges.ranges, vec![(100, 200), (400, 500)]); } #[test] fn insert_overlapping_ranges_merge() { let mut ranges = LoadedRanges::default(); ranges.insert(100, 300); ranges.insert(200, 400); // Overlapping — should merge into a single interval. assert_eq!(ranges.ranges, vec![(100, 400)]); } #[test] fn insert_adjacent_ranges_merge() { let mut ranges = LoadedRanges::default(); ranges.insert(100, 200); // Adjacent (200 + 1 == 201) — should merge. ranges.insert(201, 300); assert_eq!(ranges.ranges, vec![(100, 300)]); } #[test] fn insert_idempotent() { let mut ranges = LoadedRanges::default(); ranges.insert(100, 200); ranges.insert(100, 200); // Identical insert should not add a duplicate. assert_eq!(ranges.ranges, vec![(100, 200)]); } #[test] fn insert_multiple_merges_into_one() { let mut ranges = LoadedRanges::default(); ranges.insert(100, 200); ranges.insert(300, 400); ranges.insert(500, 600); // A new range that spans all three should collapse them all. ranges.insert(150, 550); assert_eq!(ranges.ranges, vec![(100, 600)]); } fn make_commit_message() -> JetstreamMessage { let did = Did::new_static("did:plc:testuser123456789").expect("valid DID"); let collection = Nsid::new_static("app.bsky.feed.post").expect("valid NSID"); let rkey = Rkey::new_static("3jui7kd54c2").expect("valid rkey"); let commit = JetstreamCommit { rev: jacquard::deps::smol_str::SmolStr::new_static("rev1"), operation: jacquard::jetstream::CommitOperation::Create, collection, rkey, record: None, cid: None, }; JetstreamMessage::Commit { did, time_us: 1_700_000_000_000_000, commit, } } fn make_identity_message() -> JetstreamMessage { let did = Did::new_static("did:plc:testuser123456789").expect("valid DID"); let identity = JetstreamIdentity { did: did.clone(), handle: None, seq: 42, time: Datetime::now(), }; JetstreamMessage::Identity { did, time_us: 1_700_000_000_000_001, identity, } } fn make_account_message() -> JetstreamMessage { let did = Did::new_static("did:plc:testuser123456789").expect("valid DID"); let account = JetstreamAccount { did: did.clone(), active: false, seq: 99, time: Datetime::now(), status: None, }; JetstreamMessage::Account { did, time_us: 1_700_000_000_000_002, account, } } #[test] fn commit_message_roundtrip() { let original = make_commit_message(); let json = serde_json::to_string(&original).expect("serialize"); let decoded: JetstreamMessage = serde_json::from_str(&json).expect("deserialize"); assert_eq!(original, decoded); } #[test] fn commit_with_populated_fields_roundtrip() { let did = Did::new_static("did:plc:testuser123456789").expect("valid DID"); let collection = Nsid::new_static("app.bsky.feed.post").expect("valid NSID"); let rkey = Rkey::new_static("3jui7kd54c2").expect("valid rkey"); let cid = Cid::from_str("bafyreihffx5ateunq6ghc7q7x2s3stoabwrq3cq").unwrap(); // Build a Data value from JSON — avoids dealing with jacquard's Object wrapper directly let record: jacquard::Data = serde_json::from_str( r#"{"text":"hello from the test","createdAt":"2024-01-01T00:00:00Z"}"#, ) .expect("valid JSON → Data"); let commit = JetstreamCommit { rev: jacquard::deps::smol_str::SmolStr::new_static("rev1"), operation: CommitOperation::Create, collection, rkey, record: Some(record), cid: Some(cid), }; let original = JetstreamMessage::Commit { did, time_us: 1_700_000_000_000_000, commit, }; let json = serde_json::to_string(&original).expect("serialize"); let decoded: JetstreamMessage = serde_json::from_str(&json).expect("deserialize"); assert_eq!(original, decoded); } #[test] fn identity_message_roundtrip() { let original = make_identity_message(); let json = serde_json::to_string(&original).expect("serialize"); let decoded: JetstreamMessage = serde_json::from_str(&json).expect("deserialize"); assert_eq!(original, decoded); } #[test] fn account_message_roundtrip() { let original = make_account_message(); let json = serde_json::to_string(&original).expect("serialize"); let decoded: JetstreamMessage = serde_json::from_str(&json).expect("deserialize"); assert_eq!(original, decoded); } /// Verify that `jetstream_to_db_event` followed by `row_to_message` is a /// lossless round-trip. We can't go through real SQLite in a unit test, so /// we exercise the JSON encode/decode path directly. #[test] fn jetstream_to_db_event_round_trips_through_json() { let original = make_commit_message(); let db_event = jetstream_to_db_event(&original).expect("should produce DbEvent"); // Simulate the DB row: id=0, kind=1, did=2, time_us=3, collection=4, message_json=5 let row = vec![ rusqlite::types::Value::Integer(1), rusqlite::types::Value::Text(db_event.kind.clone()), rusqlite::types::Value::Text(db_event.did.as_str().to_owned()), rusqlite::types::Value::Integer(db_event.time_us), rusqlite::types::Value::Text( db_event .collection .as_ref() .map(|c| c.as_str().to_owned()) .unwrap_or_default(), ), rusqlite::types::Value::Text(db_event.message_json.clone()), ]; let decoded = row_to_message(&row).expect("should decode row"); assert_eq!(original, decoded); } }