this repo has no description
at main 921 lines 30 kB view raw
1use bevy::prelude::*; 2use crossbeam_channel::{Receiver, bounded}; 3use crossbeam_queue::ArrayQueue; 4use futures_util::StreamExt; 5use jacquard::DefaultStr; 6use jacquard::jetstream::{JetstreamMessage, JetstreamParams}; 7use jacquard::types::string::{Did, Nsid}; 8use jacquard::xrpc::{BasicSubscriptionClient, SubscriptionClient}; 9use std::sync::Arc; 10use std::sync::atomic::{AtomicBool, AtomicI64}; 11use std::time::Duration; 12 13use crate::db::{DbChannel, DbEvent, DbReadRequest, DbWriteRequest}; 14use crate::helix::{self, HelixHierarchy, HelixState}; 15use crate::net::AsyncTungsteniteClient; 16 17use jacquard::api::app_bsky::feed::like::LikeRecord; 18use jacquard::api::app_bsky::feed::post::PostRecord; 19use jacquard::api::app_bsky::graph::follow::FollowRecord; 20use jacquard::common::types::collection::Collection; 21 22pub type IngestEvent = JetstreamMessage; 23 24/// Lock-free ring buffer: producer force_push, consumer pops per frame. 25#[derive(Resource)] 26pub struct JetstreamChannels { 27 pub queue: Arc<ArrayQueue<IngestEvent>>, 28} 29 30#[derive(Resource)] 31pub struct EventDotAssets { 32 pub mesh: Handle<Mesh>, 33 pub post: Handle<StandardMaterial>, 34 pub like: Handle<StandardMaterial>, 35 pub follow: Handle<StandardMaterial>, 36 pub shiny: Handle<StandardMaterial>, 37 pub default_bsky: Handle<StandardMaterial>, 38} 39 40#[derive(Resource)] 41pub struct SharedTimeState { 42 pub anchor_us: Arc<AtomicI64>, 43 pub latest_us: Arc<AtomicI64>, 44 pub initialized: Arc<AtomicBool>, 45} 46 47#[derive(Component, Debug, Clone)] 48pub struct JetstreamEventMarker(pub JetstreamMessage); 49 50impl JetstreamEventMarker { 51 pub fn did(&self) -> &Did { 52 match &self.0 { 53 JetstreamMessage::Commit { did, .. } => did, 54 JetstreamMessage::Identity { did, .. } => did, 55 JetstreamMessage::Account { did, .. } => did, 56 } 57 } 58 59 pub fn time_us(&self) -> i64 { 60 match &self.0 { 61 JetstreamMessage::Commit { time_us, .. } => *time_us, 62 JetstreamMessage::Identity { time_us, .. } => *time_us, 63 JetstreamMessage::Account { time_us, .. } => *time_us, 64 } 65 } 66 67 pub fn collection(&self) -> Option<&Nsid> { 68 match &self.0 { 69 JetstreamMessage::Commit { commit, .. } => Some(&commit.collection), 70 _ => None, 71 } 72 } 73 74 pub fn record(&self) -> Option<&jacquard::Data> { 75 match &self.0 { 76 JetstreamMessage::Commit { commit, .. } => commit.record.as_ref(), 77 _ => None, 78 } 79 } 80} 81 82#[derive(Component)] 83pub struct NeedsParticle; 84 85/// Live = bloom outward, Historical = bloom inward. 86#[derive(Component, Debug, Clone, Copy, PartialEq)] 87pub enum EventSource { 88 Live, 89 Historical, 90} 91 92/// `t = (time_us - anchor_us) / MICROS_PER_TURN`. 1 t-unit = 1 minute. 93#[derive(Resource, Debug)] 94pub struct TimeWindow { 95 pub anchor_us: i64, 96 pub latest_t: f32, 97 pub is_initialized: bool, 98} 99 100impl TimeWindow { 101 pub const MICROS_PER_TURN: f64 = 60_000_000.0; 102 103 pub fn time_to_t(&self, time_us: i64) -> f32 { 104 ((time_us - self.anchor_us) as f64 / Self::MICROS_PER_TURN) as f32 105 } 106} 107 108/// Sorted, non-overlapping `(start_us, end_us)` intervals already loaded from SQLite. 109#[derive(Resource, Debug, Default)] 110pub struct LoadedRanges { 111 pub ranges: Vec<(i64, i64)>, 112} 113 114impl LoadedRanges { 115 pub fn is_covered(&self, start_us: i64, end_us: i64) -> bool { 116 self.ranges 117 .iter() 118 .any(|(lo, hi)| *lo <= start_us && *hi >= end_us) 119 } 120 121 pub fn insert(&mut self, start_us: i64, end_us: i64) { 122 self.ranges.push((start_us, end_us)); 123 self.ranges.sort_unstable_by_key(|r| r.0); 124 125 let mut merged: Vec<(i64, i64)> = Vec::with_capacity(self.ranges.len()); 126 for (lo, hi) in self.ranges.drain(..) { 127 if let Some(last) = merged.last_mut() { 128 if lo <= last.1 + 1 { 129 last.1 = last.1.max(hi); 130 continue; 131 } 132 } 133 merged.push((lo, hi)); 134 } 135 self.ranges = merged; 136 } 137} 138 139#[derive(Resource, Default)] 140pub struct HistoryQueryState { 141 pub pending: Option<Receiver<Vec<Vec<rusqlite::types::Value>>>>, 142 pub pending_range: Option<(i64, i64)>, 143} 144 145const SAMPLE_RATE: u64 = 10; 146const RING_BUFFER_CAPACITY: usize = 1024; 147 148fn emissive_mat( 149 materials: &mut Assets<StandardMaterial>, 150 base: Color, 151 emissive: LinearRgba, 152) -> Handle<StandardMaterial> { 153 materials.add(StandardMaterial { 154 base_color: base, 155 emissive, 156 ..default() 157 }) 158} 159 160pub fn setup_event_dots( 161 mut commands: Commands, 162 mut meshes: ResMut<Assets<Mesh>>, 163 mut materials: ResMut<Assets<StandardMaterial>>, 164) { 165 let mesh = meshes.add(Sphere::new(0.002).mesh().ico(2).unwrap()); 166 commands.insert_resource(EventDotAssets { 167 mesh, 168 post: emissive_mat( 169 &mut materials, 170 Color::srgb(0.08, 0.15, 0.4), 171 LinearRgba::new(0.2, 0.4, 1.5, 1.0), 172 ), 173 like: emissive_mat( 174 &mut materials, 175 Color::srgb(0.4, 0.08, 0.25), 176 LinearRgba::new(1.5, 0.2, 0.8, 1.0), 177 ), 178 follow: emissive_mat( 179 &mut materials, 180 Color::srgb(0.08, 0.4, 0.2), 181 LinearRgba::new(0.2, 1.5, 0.4, 1.0), 182 ), 183 shiny: emissive_mat( 184 &mut materials, 185 Color::srgb(0.4, 0.35, 0.08), 186 LinearRgba::new(1.5, 1.2, 0.2, 1.0), 187 ), 188 default_bsky: emissive_mat( 189 &mut materials, 190 Color::srgb(0.15, 0.15, 0.2), 191 LinearRgba::new(0.4, 0.4, 0.5, 1.0), 192 ), 193 }); 194} 195 196pub fn spawn_jetstream_consumer(mut commands: Commands, db_channel: Res<DbChannel>) { 197 use std::sync::atomic::{AtomicBool, AtomicI64, Ordering}; 198 199 let queue = Arc::new(ArrayQueue::<IngestEvent>::new(RING_BUFFER_CAPACITY)); 200 201 let anchor_us = Arc::new(AtomicI64::new(0)); 202 let latest_us = Arc::new(AtomicI64::new(0)); 203 let initialized = Arc::new(AtomicBool::new(false)); 204 205 commands.insert_resource(JetstreamChannels { 206 queue: queue.clone(), 207 }); 208 commands.insert_resource(SharedTimeState { 209 anchor_us: anchor_us.clone(), 210 latest_us: latest_us.clone(), 211 initialized: initialized.clone(), 212 }); 213 214 let queue_handle = queue; 215 let anchor_handle = anchor_us; 216 let latest_handle = latest_us; 217 let init_handle = initialized; 218 let db_writer = db_channel.writer.clone(); 219 let db_reader = db_channel.reader.clone(); 220 221 bevy::tasks::IoTaskPool::get() 222 .spawn(async move { 223 let client = AsyncTungsteniteClient; 224 225 // Query latest cursor from SQLite for reconnection. 226 let initial_cursor = { 227 let (resp_tx, resp_rx) = bounded::<Option<i64>>(1); 228 if db_reader 229 .send(DbReadRequest::GetLatestCursor { 230 response_tx: resp_tx, 231 }) 232 .is_err() 233 { 234 warn!("failed to send GetLatestCursor to SQLite worker"); 235 None 236 } else { 237 match resp_rx.recv() { 238 Ok(cursor) => { 239 if let Some(c) = cursor { 240 info!("resuming Jetstream from SQLite cursor: {c}"); 241 } 242 cursor 243 } 244 Err(_) => None, 245 } 246 } 247 }; 248 249 let base_uri = match jacquard::deps::fluent_uri::Uri::parse( 250 "wss://jetstream2.us-east.bsky.network".to_string(), 251 ) { 252 Ok(uri) => uri, 253 Err(_) => { 254 error!("failed to parse Jetstream URI"); 255 return; 256 } 257 }; 258 259 let sub_client = BasicSubscriptionClient::new(client, base_uri); 260 261 let params = JetstreamParams::<DefaultStr>::new() 262 .wanted_collections(vec![]) 263 .compress(false) 264 .build(); 265 266 let mut cursor: Option<i64> = initial_cursor; 267 let base_params = params; 268 let mut sample_counter: u64 = 0; 269 270 loop { 271 let mut reconnect_params = base_params.clone(); 272 if let Some(c) = cursor { 273 reconnect_params.cursor = Some(c); 274 } 275 276 match sub_client.subscribe(&reconnect_params).await { 277 Ok(subscription) => { 278 let (_sink, mut stream) = subscription.into_stream(); 279 280 let mut db_batch: Vec<DbEvent> = Vec::with_capacity(100); 281 let mut last_flush = std::time::Instant::now(); 282 283 while let Some(msg_result) = stream.next().await { 284 match msg_result { 285 Ok(msg) => { 286 let time_us = match &msg { 287 JetstreamMessage::Commit { time_us, .. } => *time_us, 288 JetstreamMessage::Identity { time_us, .. } => *time_us, 289 JetstreamMessage::Account { time_us, .. } => *time_us, 290 }; 291 cursor = Some(time_us); 292 293 if !init_handle.load(Ordering::Relaxed) { 294 anchor_handle.store(time_us, Ordering::Relaxed); 295 init_handle.store(true, Ordering::Release); 296 } 297 latest_handle.fetch_max(time_us, Ordering::Relaxed); 298 299 if let Some(db_event) = jetstream_to_db_event(&msg) { 300 db_batch.push(db_event); 301 } 302 303 let is_high_volume = match &msg { 304 JetstreamMessage::Commit { commit, .. } => { 305 let col: &str = commit.collection.as_ref(); 306 col == <PostRecord as Collection>::NSID 307 || col == <LikeRecord as Collection>::NSID 308 } 309 _ => false, 310 }; 311 312 let send_to_bevy = if is_high_volume { 313 sample_counter += 1; 314 sample_counter % SAMPLE_RATE == 0 315 } else { 316 true 317 }; 318 319 if send_to_bevy { 320 queue_handle.force_push(msg); 321 } 322 323 if db_batch.len() >= 100 324 || last_flush.elapsed() >= Duration::from_millis(500) 325 { 326 flush_db_batch(&db_writer, &mut db_batch); 327 last_flush = std::time::Instant::now(); 328 } 329 } 330 Err(e) => { 331 warn!("Jetstream stream error: {e:?}, reconnecting"); 332 flush_db_batch(&db_writer, &mut db_batch); 333 break; 334 } 335 } 336 } 337 } 338 Err(e) => { 339 warn!("Jetstream connection failed: {e:?}"); 340 } 341 } 342 343 futures_timer::Delay::new(Duration::from_secs(2)).await; 344 } 345 }) 346 .detach(); 347} 348 349fn flush_db_batch(db_writer: &crossbeam_channel::Sender<DbWriteRequest>, batch: &mut Vec<DbEvent>) { 350 if batch.is_empty() { 351 return; 352 } 353 let events = std::mem::take(batch); 354 if db_writer 355 .try_send(DbWriteRequest::WriteEvents(events)) 356 .is_err() 357 { 358 warn!("SQLite write channel full or closed, dropping batch"); 359 } 360} 361 362fn jetstream_to_db_event(msg: &JetstreamMessage) -> Option<DbEvent> { 363 let message_json = match serde_json::to_string(msg) { 364 Ok(json) => json, 365 Err(e) => { 366 warn!("failed to serialize JetstreamMessage to JSON: {}", e); 367 return None; 368 } 369 }; 370 371 let (kind, did, time_us, collection) = match msg { 372 JetstreamMessage::Commit { 373 did, 374 time_us, 375 commit, 376 } => ( 377 "commit".to_owned(), 378 did.clone(), 379 *time_us, 380 Some(commit.collection.clone()), 381 ), 382 JetstreamMessage::Identity { did, time_us, .. } => { 383 ("identity".to_owned(), did.clone(), *time_us, None) 384 } 385 JetstreamMessage::Account { did, time_us, .. } => { 386 ("account".to_owned(), did.clone(), *time_us, None) 387 } 388 }; 389 390 Some(DbEvent { 391 kind, 392 did, 393 time_us, 394 collection, 395 message_json, 396 }) 397} 398 399pub fn drain_jetstream_events( 400 mut commands: Commands, 401 channels: Res<JetstreamChannels>, 402 hierarchy: Res<HelixHierarchy>, 403 state: Res<HelixState>, 404 mut time_window: ResMut<TimeWindow>, 405 shared_time: Res<SharedTimeState>, 406 dot_assets: Res<EventDotAssets>, 407) { 408 use std::sync::atomic::Ordering; 409 410 if shared_time.initialized.load(Ordering::Acquire) { 411 if !time_window.is_initialized { 412 time_window.anchor_us = shared_time.anchor_us.load(Ordering::Relaxed); 413 time_window.is_initialized = true; 414 } 415 let latest_us = shared_time.latest_us.load(Ordering::Relaxed); 416 time_window.latest_t = time_window.time_to_t(latest_us); 417 } 418 419 // Paused: discard ring buffer (SQLite has everything for replay). 420 if !state.auto_follow { 421 while channels.queue.pop().is_some() {} 422 return; 423 } 424 425 // Too far behind latest_t — discard to avoid off-screen dots. 426 if time_window.is_initialized { 427 let lag = time_window.latest_t - state.focal_time; 428 if lag > 0.5 { 429 while channels.queue.pop().is_some() {} 430 return; 431 } 432 } 433 434 let mut spawned = 0u32; 435 while let Some(event) = channels.queue.pop() { 436 if spawned >= 20 { 437 break; 438 } 439 440 let time_us = match &event { 441 JetstreamMessage::Commit { time_us, .. } => *time_us, 442 JetstreamMessage::Identity { time_us, .. } => *time_us, 443 JetstreamMessage::Account { time_us, .. } => *time_us, 444 }; 445 446 let t = time_window.time_to_t(time_us); 447 let frame = helix::eval_coil(t, hierarchy.levels.len() - 1, &hierarchy); 448 449 let dot_mat = match &event { 450 JetstreamMessage::Commit { commit, .. } => { 451 let col: &str = commit.collection.as_ref(); 452 if col == <PostRecord as Collection>::NSID { 453 dot_assets.post.clone() 454 } else if col == <LikeRecord as Collection>::NSID { 455 dot_assets.like.clone() 456 } else if col == <FollowRecord as Collection>::NSID { 457 dot_assets.follow.clone() 458 } else if col.starts_with("app.bsky.") { 459 dot_assets.default_bsky.clone() 460 } else { 461 dot_assets.shiny.clone() 462 } 463 } 464 _ => dot_assets.shiny.clone(), 465 }; 466 467 commands.spawn(( 468 JetstreamEventMarker(event), 469 Transform::from_translation(frame.position), 470 Visibility::default(), 471 NeedsParticle, 472 EventSource::Live, 473 Mesh3d(dot_assets.mesh.clone()), 474 MeshMaterial3d(dot_mat), 475 )); 476 spawned += 1; 477 } 478} 479 480/// Startup system: query SQLite for recent events and spawn them as historical entities. 481pub fn load_historical_events( 482 mut commands: Commands, 483 db_channel: Res<DbChannel>, 484 hierarchy: Res<HelixHierarchy>, 485 state: Res<HelixState>, 486 mut time_window: ResMut<TimeWindow>, 487 mut loaded_ranges: ResMut<LoadedRanges>, 488) { 489 let latest_us = { 490 let (resp_tx, resp_rx) = bounded::<Option<i64>>(1); 491 if db_channel 492 .reader 493 .send(DbReadRequest::GetLatestCursor { 494 response_tx: resp_tx, 495 }) 496 .is_err() 497 { 498 warn!("load_historical_events: failed to send GetLatestCursor"); 499 return; 500 } 501 match resp_rx.recv() { 502 Ok(cursor) => cursor, 503 Err(_) => { 504 warn!("load_historical_events: SQLite worker closed before returning cursor"); 505 return; 506 } 507 } 508 }; 509 510 let Some(end_us) = latest_us else { 511 info!("load_historical_events: database is empty, skipping historical load"); 512 return; 513 }; 514 515 const HISTORY_WINDOW_US: i64 = 60_000_000; 516 let start_us = end_us - HISTORY_WINDOW_US; 517 518 info!( 519 "load_historical_events: loading events from {} to {} (1 hour window)", 520 start_us, end_us 521 ); 522 523 let (resp_tx, resp_rx) = bounded::<Vec<Vec<rusqlite::types::Value>>>(1); 524 if db_channel 525 .reader 526 .send(DbReadRequest::QueryTimeRange { 527 start_us, 528 end_us, 529 response_tx: resp_tx, 530 }) 531 .is_err() 532 { 533 warn!("load_historical_events: failed to send QueryTimeRange"); 534 return; 535 } 536 537 let rows = match resp_rx.recv() { 538 Ok(r) => r, 539 Err(_) => { 540 warn!("load_historical_events: SQLite worker closed before returning rows"); 541 return; 542 } 543 }; 544 545 info!( 546 "load_historical_events: received {} historical rows", 547 rows.len() 548 ); 549 550 if !time_window.is_initialized 551 && let Some(first_row) = rows.first() 552 && let Some(t) = row_time_us(first_row) 553 { 554 time_window.anchor_us = t; 555 time_window.is_initialized = true; 556 info!( 557 "load_historical_events: initialized TimeWindow anchor_us={}", 558 t 559 ); 560 } 561 562 let count = 563 spawn_historical_entities(&mut commands, &rows, &hierarchy, &state, &mut time_window); 564 565 info!( 566 "load_historical_events: spawned {} historical entities", 567 count 568 ); 569 570 loaded_ranges.insert(start_us, end_us); 571} 572 573pub fn poll_history_responses( 574 mut commands: Commands, 575 mut query_state: ResMut<HistoryQueryState>, 576 mut loaded_ranges: ResMut<LoadedRanges>, 577 hierarchy: Res<HelixHierarchy>, 578 state: Res<HelixState>, 579 mut time_window: ResMut<TimeWindow>, 580) { 581 let Some(receiver) = &query_state.pending else { 582 return; 583 }; 584 585 match receiver.try_recv() { 586 Ok(rows) => { 587 let count = spawn_historical_entities( 588 &mut commands, 589 &rows, 590 &hierarchy, 591 &state, 592 &mut time_window, 593 ); 594 info!( 595 "poll_history_responses: spawned {} historical entities from pan query", 596 count 597 ); 598 599 if let Some((start_us, end_us)) = query_state.pending_range.take() { 600 loaded_ranges.insert(start_us, end_us); 601 } 602 603 query_state.pending = None; 604 } 605 Err(crossbeam_channel::TryRecvError::Empty) => {} 606 607 Err(crossbeam_channel::TryRecvError::Disconnected) => { 608 warn!("poll_history_responses: SQLite worker closed response channel"); 609 query_state.pending = None; 610 query_state.pending_range = None; 611 } 612 } 613} 614 615pub fn row_time_us(row: &[rusqlite::types::Value]) -> Option<i64> { 616 match row.get(3)? { 617 rusqlite::types::Value::Integer(v) => Some(*v), 618 _ => None, 619 } 620} 621 622fn row_text(row: &[rusqlite::types::Value], idx: usize) -> Option<&str> { 623 match row.get(idx)? { 624 rusqlite::types::Value::Text(s) => Some(s.as_str()), 625 _ => None, 626 } 627} 628 629fn row_to_message(row: &[rusqlite::types::Value]) -> Option<JetstreamMessage> { 630 let json = row_text(row, 5)?; 631 match serde_json::from_str::<JetstreamMessage>(json) { 632 Ok(msg) => Some(msg), 633 Err(e) => { 634 warn!("row_to_message: failed to deserialize message_json: {}", e); 635 None 636 } 637 } 638} 639 640fn spawn_historical_entities( 641 commands: &mut Commands, 642 rows: &[Vec<rusqlite::types::Value>], 643 hierarchy: &HelixHierarchy, 644 state: &HelixState, 645 time_window: &mut TimeWindow, 646) -> usize { 647 let mut count = 0; 648 649 for row in rows { 650 let Some(time_us) = row_time_us(row) else { 651 warn!("spawn_historical_entities: row missing time_us, skipping"); 652 continue; 653 }; 654 655 let Some(msg) = row_to_message(row) else { 656 continue; 657 }; 658 659 if !time_window.is_initialized { 660 time_window.anchor_us = time_us; 661 time_window.is_initialized = true; 662 } 663 664 let t = time_window.time_to_t(time_us); 665 time_window.latest_t = time_window.latest_t.max(t); 666 667 let frame = helix::eval_coil(t, hierarchy.levels.len() - 1, hierarchy); 668 669 commands.spawn(( 670 JetstreamEventMarker(msg), 671 Transform::from_translation(frame.position), 672 Visibility::default(), 673 EventSource::Historical, 674 )); 675 676 count += 1; 677 } 678 679 count 680} 681 682#[cfg(test)] 683mod tests { 684 use super::*; 685 use jacquard::jetstream::{ 686 CommitOperation, JetstreamAccount, JetstreamCommit, JetstreamIdentity, 687 }; 688 use jacquard::types::string::{Cid, Datetime, Did, Nsid, Rkey}; 689 use std::str::FromStr; 690 691 692 #[test] 693 fn is_covered_empty_ranges_returns_false() { 694 let ranges = LoadedRanges::default(); 695 assert!(!ranges.is_covered(100, 200)); 696 } 697 698 #[test] 699 fn is_covered_exact_match() { 700 let mut ranges = LoadedRanges::default(); 701 ranges.insert(100, 200); 702 assert!(ranges.is_covered(100, 200)); 703 } 704 705 #[test] 706 fn is_covered_subset_of_loaded_range() { 707 let mut ranges = LoadedRanges::default(); 708 ranges.insert(0, 1000); 709 assert!(ranges.is_covered(100, 200)); 710 assert!(ranges.is_covered(0, 1000)); 711 assert!(ranges.is_covered(500, 999)); 712 } 713 714 #[test] 715 fn is_covered_partial_overlap_returns_false() { 716 let mut ranges = LoadedRanges::default(); 717 ranges.insert(100, 200); 718 assert!(!ranges.is_covered(50, 200)); 719 assert!(!ranges.is_covered(100, 250)); 720 assert!(!ranges.is_covered(50, 250)); 721 } 722 723 #[test] 724 fn is_covered_no_overlap_returns_false() { 725 let mut ranges = LoadedRanges::default(); 726 ranges.insert(100, 200); 727 assert!(!ranges.is_covered(300, 400)); 728 assert!(!ranges.is_covered(0, 50)); 729 } 730 731 732 #[test] 733 fn insert_single_range() { 734 let mut ranges = LoadedRanges::default(); 735 ranges.insert(100, 200); 736 assert_eq!(ranges.ranges, vec![(100, 200)]); 737 } 738 739 #[test] 740 fn insert_non_overlapping_ranges_stay_separate() { 741 let mut ranges = LoadedRanges::default(); 742 ranges.insert(100, 200); 743 ranges.insert(400, 500); 744 assert_eq!(ranges.ranges, vec![(100, 200), (400, 500)]); 745 } 746 747 #[test] 748 fn insert_overlapping_ranges_merge() { 749 let mut ranges = LoadedRanges::default(); 750 ranges.insert(100, 300); 751 ranges.insert(200, 400); 752 // Overlapping — should merge into a single interval. 753 assert_eq!(ranges.ranges, vec![(100, 400)]); 754 } 755 756 #[test] 757 fn insert_adjacent_ranges_merge() { 758 let mut ranges = LoadedRanges::default(); 759 ranges.insert(100, 200); 760 // Adjacent (200 + 1 == 201) — should merge. 761 ranges.insert(201, 300); 762 assert_eq!(ranges.ranges, vec![(100, 300)]); 763 } 764 765 #[test] 766 fn insert_idempotent() { 767 let mut ranges = LoadedRanges::default(); 768 ranges.insert(100, 200); 769 ranges.insert(100, 200); 770 // Identical insert should not add a duplicate. 771 assert_eq!(ranges.ranges, vec![(100, 200)]); 772 } 773 774 #[test] 775 fn insert_multiple_merges_into_one() { 776 let mut ranges = LoadedRanges::default(); 777 ranges.insert(100, 200); 778 ranges.insert(300, 400); 779 ranges.insert(500, 600); 780 // A new range that spans all three should collapse them all. 781 ranges.insert(150, 550); 782 assert_eq!(ranges.ranges, vec![(100, 600)]); 783 } 784 785 786 fn make_commit_message() -> JetstreamMessage { 787 let did = Did::new_static("did:plc:testuser123456789").expect("valid DID"); 788 let collection = Nsid::new_static("app.bsky.feed.post").expect("valid NSID"); 789 let rkey = Rkey::new_static("3jui7kd54c2").expect("valid rkey"); 790 791 let commit = JetstreamCommit { 792 rev: jacquard::deps::smol_str::SmolStr::new_static("rev1"), 793 operation: jacquard::jetstream::CommitOperation::Create, 794 collection, 795 rkey, 796 record: None, 797 cid: None, 798 }; 799 800 JetstreamMessage::Commit { 801 did, 802 time_us: 1_700_000_000_000_000, 803 commit, 804 } 805 } 806 807 fn make_identity_message() -> JetstreamMessage { 808 let did = Did::new_static("did:plc:testuser123456789").expect("valid DID"); 809 let identity = JetstreamIdentity { 810 did: did.clone(), 811 handle: None, 812 seq: 42, 813 time: Datetime::now(), 814 }; 815 JetstreamMessage::Identity { 816 did, 817 time_us: 1_700_000_000_000_001, 818 identity, 819 } 820 } 821 822 fn make_account_message() -> JetstreamMessage { 823 let did = Did::new_static("did:plc:testuser123456789").expect("valid DID"); 824 let account = JetstreamAccount { 825 did: did.clone(), 826 active: false, 827 seq: 99, 828 time: Datetime::now(), 829 status: None, 830 }; 831 JetstreamMessage::Account { 832 did, 833 time_us: 1_700_000_000_000_002, 834 account, 835 } 836 } 837 838 #[test] 839 fn commit_message_roundtrip() { 840 let original = make_commit_message(); 841 let json = serde_json::to_string(&original).expect("serialize"); 842 let decoded: JetstreamMessage = serde_json::from_str(&json).expect("deserialize"); 843 assert_eq!(original, decoded); 844 } 845 846 #[test] 847 fn commit_with_populated_fields_roundtrip() { 848 let did = Did::new_static("did:plc:testuser123456789").expect("valid DID"); 849 let collection = Nsid::new_static("app.bsky.feed.post").expect("valid NSID"); 850 let rkey = Rkey::new_static("3jui7kd54c2").expect("valid rkey"); 851 let cid = Cid::from_str("bafyreihffx5ateunq6ghc7q7x2s3stoabwrq3cq").unwrap(); 852 853 // Build a Data value from JSON — avoids dealing with jacquard's Object wrapper directly 854 let record: jacquard::Data = serde_json::from_str( 855 r#"{"text":"hello from the test","createdAt":"2024-01-01T00:00:00Z"}"#, 856 ) 857 .expect("valid JSON → Data"); 858 859 let commit = JetstreamCommit { 860 rev: jacquard::deps::smol_str::SmolStr::new_static("rev1"), 861 operation: CommitOperation::Create, 862 collection, 863 rkey, 864 record: Some(record), 865 cid: Some(cid), 866 }; 867 868 let original = JetstreamMessage::Commit { 869 did, 870 time_us: 1_700_000_000_000_000, 871 commit, 872 }; 873 let json = serde_json::to_string(&original).expect("serialize"); 874 let decoded: JetstreamMessage = serde_json::from_str(&json).expect("deserialize"); 875 assert_eq!(original, decoded); 876 } 877 878 #[test] 879 fn identity_message_roundtrip() { 880 let original = make_identity_message(); 881 let json = serde_json::to_string(&original).expect("serialize"); 882 let decoded: JetstreamMessage = serde_json::from_str(&json).expect("deserialize"); 883 assert_eq!(original, decoded); 884 } 885 886 #[test] 887 fn account_message_roundtrip() { 888 let original = make_account_message(); 889 let json = serde_json::to_string(&original).expect("serialize"); 890 let decoded: JetstreamMessage = serde_json::from_str(&json).expect("deserialize"); 891 assert_eq!(original, decoded); 892 } 893 894 /// Verify that `jetstream_to_db_event` followed by `row_to_message` is a 895 /// lossless round-trip. We can't go through real SQLite in a unit test, so 896 /// we exercise the JSON encode/decode path directly. 897 #[test] 898 fn jetstream_to_db_event_round_trips_through_json() { 899 let original = make_commit_message(); 900 let db_event = jetstream_to_db_event(&original).expect("should produce DbEvent"); 901 902 // Simulate the DB row: id=0, kind=1, did=2, time_us=3, collection=4, message_json=5 903 let row = vec![ 904 rusqlite::types::Value::Integer(1), 905 rusqlite::types::Value::Text(db_event.kind.clone()), 906 rusqlite::types::Value::Text(db_event.did.as_str().to_owned()), 907 rusqlite::types::Value::Integer(db_event.time_us), 908 rusqlite::types::Value::Text( 909 db_event 910 .collection 911 .as_ref() 912 .map(|c| c.as_str().to_owned()) 913 .unwrap_or_default(), 914 ), 915 rusqlite::types::Value::Text(db_event.message_json.clone()), 916 ]; 917 918 let decoded = row_to_message(&row).expect("should decode row"); 919 assert_eq!(original, decoded); 920 } 921}