this repo has no description
1use bevy::prelude::*;
2use crossbeam_channel::{Receiver, bounded};
3use crossbeam_queue::ArrayQueue;
4use futures_util::StreamExt;
5use jacquard::DefaultStr;
6use jacquard::jetstream::{JetstreamMessage, JetstreamParams};
7use jacquard::types::string::{Did, Nsid};
8use jacquard::xrpc::{BasicSubscriptionClient, SubscriptionClient};
9use std::sync::Arc;
10use std::sync::atomic::{AtomicBool, AtomicI64};
11use std::time::Duration;
12
13use crate::db::{DbChannel, DbEvent, DbReadRequest, DbWriteRequest};
14use crate::helix::{self, HelixHierarchy, HelixState};
15use crate::net::AsyncTungsteniteClient;
16
17use jacquard::api::app_bsky::feed::like::LikeRecord;
18use jacquard::api::app_bsky::feed::post::PostRecord;
19use jacquard::api::app_bsky::graph::follow::FollowRecord;
20use jacquard::common::types::collection::Collection;
21
22pub type IngestEvent = JetstreamMessage;
23
24/// Lock-free ring buffer: producer force_push, consumer pops per frame.
25#[derive(Resource)]
26pub struct JetstreamChannels {
27 pub queue: Arc<ArrayQueue<IngestEvent>>,
28}
29
30#[derive(Resource)]
31pub struct EventDotAssets {
32 pub mesh: Handle<Mesh>,
33 pub post: Handle<StandardMaterial>,
34 pub like: Handle<StandardMaterial>,
35 pub follow: Handle<StandardMaterial>,
36 pub shiny: Handle<StandardMaterial>,
37 pub default_bsky: Handle<StandardMaterial>,
38}
39
40#[derive(Resource)]
41pub struct SharedTimeState {
42 pub anchor_us: Arc<AtomicI64>,
43 pub latest_us: Arc<AtomicI64>,
44 pub initialized: Arc<AtomicBool>,
45}
46
47#[derive(Component, Debug, Clone)]
48pub struct JetstreamEventMarker(pub JetstreamMessage);
49
50impl JetstreamEventMarker {
51 pub fn did(&self) -> &Did {
52 match &self.0 {
53 JetstreamMessage::Commit { did, .. } => did,
54 JetstreamMessage::Identity { did, .. } => did,
55 JetstreamMessage::Account { did, .. } => did,
56 }
57 }
58
59 pub fn time_us(&self) -> i64 {
60 match &self.0 {
61 JetstreamMessage::Commit { time_us, .. } => *time_us,
62 JetstreamMessage::Identity { time_us, .. } => *time_us,
63 JetstreamMessage::Account { time_us, .. } => *time_us,
64 }
65 }
66
67 pub fn collection(&self) -> Option<&Nsid> {
68 match &self.0 {
69 JetstreamMessage::Commit { commit, .. } => Some(&commit.collection),
70 _ => None,
71 }
72 }
73
74 pub fn record(&self) -> Option<&jacquard::Data> {
75 match &self.0 {
76 JetstreamMessage::Commit { commit, .. } => commit.record.as_ref(),
77 _ => None,
78 }
79 }
80}
81
82#[derive(Component)]
83pub struct NeedsParticle;
84
85/// Live = bloom outward, Historical = bloom inward.
86#[derive(Component, Debug, Clone, Copy, PartialEq)]
87pub enum EventSource {
88 Live,
89 Historical,
90}
91
92/// `t = (time_us - anchor_us) / MICROS_PER_TURN`. 1 t-unit = 1 minute.
93#[derive(Resource, Debug)]
94pub struct TimeWindow {
95 pub anchor_us: i64,
96 pub latest_t: f32,
97 pub is_initialized: bool,
98}
99
100impl TimeWindow {
101 pub const MICROS_PER_TURN: f64 = 60_000_000.0;
102
103 pub fn time_to_t(&self, time_us: i64) -> f32 {
104 ((time_us - self.anchor_us) as f64 / Self::MICROS_PER_TURN) as f32
105 }
106}
107
108/// Sorted, non-overlapping `(start_us, end_us)` intervals already loaded from SQLite.
109#[derive(Resource, Debug, Default)]
110pub struct LoadedRanges {
111 pub ranges: Vec<(i64, i64)>,
112}
113
114impl LoadedRanges {
115 pub fn is_covered(&self, start_us: i64, end_us: i64) -> bool {
116 self.ranges
117 .iter()
118 .any(|(lo, hi)| *lo <= start_us && *hi >= end_us)
119 }
120
121 pub fn insert(&mut self, start_us: i64, end_us: i64) {
122 self.ranges.push((start_us, end_us));
123 self.ranges.sort_unstable_by_key(|r| r.0);
124
125 let mut merged: Vec<(i64, i64)> = Vec::with_capacity(self.ranges.len());
126 for (lo, hi) in self.ranges.drain(..) {
127 if let Some(last) = merged.last_mut() {
128 if lo <= last.1 + 1 {
129 last.1 = last.1.max(hi);
130 continue;
131 }
132 }
133 merged.push((lo, hi));
134 }
135 self.ranges = merged;
136 }
137}
138
139#[derive(Resource, Default)]
140pub struct HistoryQueryState {
141 pub pending: Option<Receiver<Vec<Vec<rusqlite::types::Value>>>>,
142 pub pending_range: Option<(i64, i64)>,
143}
144
145const SAMPLE_RATE: u64 = 10;
146const RING_BUFFER_CAPACITY: usize = 1024;
147
148fn emissive_mat(
149 materials: &mut Assets<StandardMaterial>,
150 base: Color,
151 emissive: LinearRgba,
152) -> Handle<StandardMaterial> {
153 materials.add(StandardMaterial {
154 base_color: base,
155 emissive,
156 ..default()
157 })
158}
159
160pub fn setup_event_dots(
161 mut commands: Commands,
162 mut meshes: ResMut<Assets<Mesh>>,
163 mut materials: ResMut<Assets<StandardMaterial>>,
164) {
165 let mesh = meshes.add(Sphere::new(0.002).mesh().ico(2).unwrap());
166 commands.insert_resource(EventDotAssets {
167 mesh,
168 post: emissive_mat(
169 &mut materials,
170 Color::srgb(0.08, 0.15, 0.4),
171 LinearRgba::new(0.2, 0.4, 1.5, 1.0),
172 ),
173 like: emissive_mat(
174 &mut materials,
175 Color::srgb(0.4, 0.08, 0.25),
176 LinearRgba::new(1.5, 0.2, 0.8, 1.0),
177 ),
178 follow: emissive_mat(
179 &mut materials,
180 Color::srgb(0.08, 0.4, 0.2),
181 LinearRgba::new(0.2, 1.5, 0.4, 1.0),
182 ),
183 shiny: emissive_mat(
184 &mut materials,
185 Color::srgb(0.4, 0.35, 0.08),
186 LinearRgba::new(1.5, 1.2, 0.2, 1.0),
187 ),
188 default_bsky: emissive_mat(
189 &mut materials,
190 Color::srgb(0.15, 0.15, 0.2),
191 LinearRgba::new(0.4, 0.4, 0.5, 1.0),
192 ),
193 });
194}
195
196pub fn spawn_jetstream_consumer(mut commands: Commands, db_channel: Res<DbChannel>) {
197 use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
198
199 let queue = Arc::new(ArrayQueue::<IngestEvent>::new(RING_BUFFER_CAPACITY));
200
201 let anchor_us = Arc::new(AtomicI64::new(0));
202 let latest_us = Arc::new(AtomicI64::new(0));
203 let initialized = Arc::new(AtomicBool::new(false));
204
205 commands.insert_resource(JetstreamChannels {
206 queue: queue.clone(),
207 });
208 commands.insert_resource(SharedTimeState {
209 anchor_us: anchor_us.clone(),
210 latest_us: latest_us.clone(),
211 initialized: initialized.clone(),
212 });
213
214 let queue_handle = queue;
215 let anchor_handle = anchor_us;
216 let latest_handle = latest_us;
217 let init_handle = initialized;
218 let db_writer = db_channel.writer.clone();
219 let db_reader = db_channel.reader.clone();
220
221 bevy::tasks::IoTaskPool::get()
222 .spawn(async move {
223 let client = AsyncTungsteniteClient;
224
225 // Query latest cursor from SQLite for reconnection.
226 let initial_cursor = {
227 let (resp_tx, resp_rx) = bounded::<Option<i64>>(1);
228 if db_reader
229 .send(DbReadRequest::GetLatestCursor {
230 response_tx: resp_tx,
231 })
232 .is_err()
233 {
234 warn!("failed to send GetLatestCursor to SQLite worker");
235 None
236 } else {
237 match resp_rx.recv() {
238 Ok(cursor) => {
239 if let Some(c) = cursor {
240 info!("resuming Jetstream from SQLite cursor: {c}");
241 }
242 cursor
243 }
244 Err(_) => None,
245 }
246 }
247 };
248
249 let base_uri = match jacquard::deps::fluent_uri::Uri::parse(
250 "wss://jetstream2.us-east.bsky.network".to_string(),
251 ) {
252 Ok(uri) => uri,
253 Err(_) => {
254 error!("failed to parse Jetstream URI");
255 return;
256 }
257 };
258
259 let sub_client = BasicSubscriptionClient::new(client, base_uri);
260
261 let params = JetstreamParams::<DefaultStr>::new()
262 .wanted_collections(vec![])
263 .compress(false)
264 .build();
265
266 let mut cursor: Option<i64> = initial_cursor;
267 let base_params = params;
268 let mut sample_counter: u64 = 0;
269
270 loop {
271 let mut reconnect_params = base_params.clone();
272 if let Some(c) = cursor {
273 reconnect_params.cursor = Some(c);
274 }
275
276 match sub_client.subscribe(&reconnect_params).await {
277 Ok(subscription) => {
278 let (_sink, mut stream) = subscription.into_stream();
279
280 let mut db_batch: Vec<DbEvent> = Vec::with_capacity(100);
281 let mut last_flush = std::time::Instant::now();
282
283 while let Some(msg_result) = stream.next().await {
284 match msg_result {
285 Ok(msg) => {
286 let time_us = match &msg {
287 JetstreamMessage::Commit { time_us, .. } => *time_us,
288 JetstreamMessage::Identity { time_us, .. } => *time_us,
289 JetstreamMessage::Account { time_us, .. } => *time_us,
290 };
291 cursor = Some(time_us);
292
293 if !init_handle.load(Ordering::Relaxed) {
294 anchor_handle.store(time_us, Ordering::Relaxed);
295 init_handle.store(true, Ordering::Release);
296 }
297 latest_handle.fetch_max(time_us, Ordering::Relaxed);
298
299 if let Some(db_event) = jetstream_to_db_event(&msg) {
300 db_batch.push(db_event);
301 }
302
303 let is_high_volume = match &msg {
304 JetstreamMessage::Commit { commit, .. } => {
305 let col: &str = commit.collection.as_ref();
306 col == <PostRecord as Collection>::NSID
307 || col == <LikeRecord as Collection>::NSID
308 }
309 _ => false,
310 };
311
312 let send_to_bevy = if is_high_volume {
313 sample_counter += 1;
314 sample_counter % SAMPLE_RATE == 0
315 } else {
316 true
317 };
318
319 if send_to_bevy {
320 queue_handle.force_push(msg);
321 }
322
323 if db_batch.len() >= 100
324 || last_flush.elapsed() >= Duration::from_millis(500)
325 {
326 flush_db_batch(&db_writer, &mut db_batch);
327 last_flush = std::time::Instant::now();
328 }
329 }
330 Err(e) => {
331 warn!("Jetstream stream error: {e:?}, reconnecting");
332 flush_db_batch(&db_writer, &mut db_batch);
333 break;
334 }
335 }
336 }
337 }
338 Err(e) => {
339 warn!("Jetstream connection failed: {e:?}");
340 }
341 }
342
343 futures_timer::Delay::new(Duration::from_secs(2)).await;
344 }
345 })
346 .detach();
347}
348
349fn flush_db_batch(db_writer: &crossbeam_channel::Sender<DbWriteRequest>, batch: &mut Vec<DbEvent>) {
350 if batch.is_empty() {
351 return;
352 }
353 let events = std::mem::take(batch);
354 if db_writer
355 .try_send(DbWriteRequest::WriteEvents(events))
356 .is_err()
357 {
358 warn!("SQLite write channel full or closed, dropping batch");
359 }
360}
361
362fn jetstream_to_db_event(msg: &JetstreamMessage) -> Option<DbEvent> {
363 let message_json = match serde_json::to_string(msg) {
364 Ok(json) => json,
365 Err(e) => {
366 warn!("failed to serialize JetstreamMessage to JSON: {}", e);
367 return None;
368 }
369 };
370
371 let (kind, did, time_us, collection) = match msg {
372 JetstreamMessage::Commit {
373 did,
374 time_us,
375 commit,
376 } => (
377 "commit".to_owned(),
378 did.clone(),
379 *time_us,
380 Some(commit.collection.clone()),
381 ),
382 JetstreamMessage::Identity { did, time_us, .. } => {
383 ("identity".to_owned(), did.clone(), *time_us, None)
384 }
385 JetstreamMessage::Account { did, time_us, .. } => {
386 ("account".to_owned(), did.clone(), *time_us, None)
387 }
388 };
389
390 Some(DbEvent {
391 kind,
392 did,
393 time_us,
394 collection,
395 message_json,
396 })
397}
398
399pub fn drain_jetstream_events(
400 mut commands: Commands,
401 channels: Res<JetstreamChannels>,
402 hierarchy: Res<HelixHierarchy>,
403 state: Res<HelixState>,
404 mut time_window: ResMut<TimeWindow>,
405 shared_time: Res<SharedTimeState>,
406 dot_assets: Res<EventDotAssets>,
407) {
408 use std::sync::atomic::Ordering;
409
410 if shared_time.initialized.load(Ordering::Acquire) {
411 if !time_window.is_initialized {
412 time_window.anchor_us = shared_time.anchor_us.load(Ordering::Relaxed);
413 time_window.is_initialized = true;
414 }
415 let latest_us = shared_time.latest_us.load(Ordering::Relaxed);
416 time_window.latest_t = time_window.time_to_t(latest_us);
417 }
418
419 // Paused: discard ring buffer (SQLite has everything for replay).
420 if !state.auto_follow {
421 while channels.queue.pop().is_some() {}
422 return;
423 }
424
425 // Too far behind latest_t — discard to avoid off-screen dots.
426 if time_window.is_initialized {
427 let lag = time_window.latest_t - state.focal_time;
428 if lag > 0.5 {
429 while channels.queue.pop().is_some() {}
430 return;
431 }
432 }
433
434 let mut spawned = 0u32;
435 while let Some(event) = channels.queue.pop() {
436 if spawned >= 20 {
437 break;
438 }
439
440 let time_us = match &event {
441 JetstreamMessage::Commit { time_us, .. } => *time_us,
442 JetstreamMessage::Identity { time_us, .. } => *time_us,
443 JetstreamMessage::Account { time_us, .. } => *time_us,
444 };
445
446 let t = time_window.time_to_t(time_us);
447 let frame = helix::eval_coil(t, hierarchy.levels.len() - 1, &hierarchy);
448
449 let dot_mat = match &event {
450 JetstreamMessage::Commit { commit, .. } => {
451 let col: &str = commit.collection.as_ref();
452 if col == <PostRecord as Collection>::NSID {
453 dot_assets.post.clone()
454 } else if col == <LikeRecord as Collection>::NSID {
455 dot_assets.like.clone()
456 } else if col == <FollowRecord as Collection>::NSID {
457 dot_assets.follow.clone()
458 } else if col.starts_with("app.bsky.") {
459 dot_assets.default_bsky.clone()
460 } else {
461 dot_assets.shiny.clone()
462 }
463 }
464 _ => dot_assets.shiny.clone(),
465 };
466
467 commands.spawn((
468 JetstreamEventMarker(event),
469 Transform::from_translation(frame.position),
470 Visibility::default(),
471 NeedsParticle,
472 EventSource::Live,
473 Mesh3d(dot_assets.mesh.clone()),
474 MeshMaterial3d(dot_mat),
475 ));
476 spawned += 1;
477 }
478}
479
480/// Startup system: query SQLite for recent events and spawn them as historical entities.
481pub fn load_historical_events(
482 mut commands: Commands,
483 db_channel: Res<DbChannel>,
484 hierarchy: Res<HelixHierarchy>,
485 state: Res<HelixState>,
486 mut time_window: ResMut<TimeWindow>,
487 mut loaded_ranges: ResMut<LoadedRanges>,
488) {
489 let latest_us = {
490 let (resp_tx, resp_rx) = bounded::<Option<i64>>(1);
491 if db_channel
492 .reader
493 .send(DbReadRequest::GetLatestCursor {
494 response_tx: resp_tx,
495 })
496 .is_err()
497 {
498 warn!("load_historical_events: failed to send GetLatestCursor");
499 return;
500 }
501 match resp_rx.recv() {
502 Ok(cursor) => cursor,
503 Err(_) => {
504 warn!("load_historical_events: SQLite worker closed before returning cursor");
505 return;
506 }
507 }
508 };
509
510 let Some(end_us) = latest_us else {
511 info!("load_historical_events: database is empty, skipping historical load");
512 return;
513 };
514
515 const HISTORY_WINDOW_US: i64 = 60_000_000;
516 let start_us = end_us - HISTORY_WINDOW_US;
517
518 info!(
519 "load_historical_events: loading events from {} to {} (1 hour window)",
520 start_us, end_us
521 );
522
523 let (resp_tx, resp_rx) = bounded::<Vec<Vec<rusqlite::types::Value>>>(1);
524 if db_channel
525 .reader
526 .send(DbReadRequest::QueryTimeRange {
527 start_us,
528 end_us,
529 response_tx: resp_tx,
530 })
531 .is_err()
532 {
533 warn!("load_historical_events: failed to send QueryTimeRange");
534 return;
535 }
536
537 let rows = match resp_rx.recv() {
538 Ok(r) => r,
539 Err(_) => {
540 warn!("load_historical_events: SQLite worker closed before returning rows");
541 return;
542 }
543 };
544
545 info!(
546 "load_historical_events: received {} historical rows",
547 rows.len()
548 );
549
550 if !time_window.is_initialized
551 && let Some(first_row) = rows.first()
552 && let Some(t) = row_time_us(first_row)
553 {
554 time_window.anchor_us = t;
555 time_window.is_initialized = true;
556 info!(
557 "load_historical_events: initialized TimeWindow anchor_us={}",
558 t
559 );
560 }
561
562 let count =
563 spawn_historical_entities(&mut commands, &rows, &hierarchy, &state, &mut time_window);
564
565 info!(
566 "load_historical_events: spawned {} historical entities",
567 count
568 );
569
570 loaded_ranges.insert(start_us, end_us);
571}
572
573pub fn poll_history_responses(
574 mut commands: Commands,
575 mut query_state: ResMut<HistoryQueryState>,
576 mut loaded_ranges: ResMut<LoadedRanges>,
577 hierarchy: Res<HelixHierarchy>,
578 state: Res<HelixState>,
579 mut time_window: ResMut<TimeWindow>,
580) {
581 let Some(receiver) = &query_state.pending else {
582 return;
583 };
584
585 match receiver.try_recv() {
586 Ok(rows) => {
587 let count = spawn_historical_entities(
588 &mut commands,
589 &rows,
590 &hierarchy,
591 &state,
592 &mut time_window,
593 );
594 info!(
595 "poll_history_responses: spawned {} historical entities from pan query",
596 count
597 );
598
599 if let Some((start_us, end_us)) = query_state.pending_range.take() {
600 loaded_ranges.insert(start_us, end_us);
601 }
602
603 query_state.pending = None;
604 }
605 Err(crossbeam_channel::TryRecvError::Empty) => {}
606
607 Err(crossbeam_channel::TryRecvError::Disconnected) => {
608 warn!("poll_history_responses: SQLite worker closed response channel");
609 query_state.pending = None;
610 query_state.pending_range = None;
611 }
612 }
613}
614
615pub fn row_time_us(row: &[rusqlite::types::Value]) -> Option<i64> {
616 match row.get(3)? {
617 rusqlite::types::Value::Integer(v) => Some(*v),
618 _ => None,
619 }
620}
621
622fn row_text(row: &[rusqlite::types::Value], idx: usize) -> Option<&str> {
623 match row.get(idx)? {
624 rusqlite::types::Value::Text(s) => Some(s.as_str()),
625 _ => None,
626 }
627}
628
629fn row_to_message(row: &[rusqlite::types::Value]) -> Option<JetstreamMessage> {
630 let json = row_text(row, 5)?;
631 match serde_json::from_str::<JetstreamMessage>(json) {
632 Ok(msg) => Some(msg),
633 Err(e) => {
634 warn!("row_to_message: failed to deserialize message_json: {}", e);
635 None
636 }
637 }
638}
639
640fn spawn_historical_entities(
641 commands: &mut Commands,
642 rows: &[Vec<rusqlite::types::Value>],
643 hierarchy: &HelixHierarchy,
644 state: &HelixState,
645 time_window: &mut TimeWindow,
646) -> usize {
647 let mut count = 0;
648
649 for row in rows {
650 let Some(time_us) = row_time_us(row) else {
651 warn!("spawn_historical_entities: row missing time_us, skipping");
652 continue;
653 };
654
655 let Some(msg) = row_to_message(row) else {
656 continue;
657 };
658
659 if !time_window.is_initialized {
660 time_window.anchor_us = time_us;
661 time_window.is_initialized = true;
662 }
663
664 let t = time_window.time_to_t(time_us);
665 time_window.latest_t = time_window.latest_t.max(t);
666
667 let frame = helix::eval_coil(t, hierarchy.levels.len() - 1, hierarchy);
668
669 commands.spawn((
670 JetstreamEventMarker(msg),
671 Transform::from_translation(frame.position),
672 Visibility::default(),
673 EventSource::Historical,
674 ));
675
676 count += 1;
677 }
678
679 count
680}
681
682#[cfg(test)]
683mod tests {
684 use super::*;
685 use jacquard::jetstream::{
686 CommitOperation, JetstreamAccount, JetstreamCommit, JetstreamIdentity,
687 };
688 use jacquard::types::string::{Cid, Datetime, Did, Nsid, Rkey};
689 use std::str::FromStr;
690
691
692 #[test]
693 fn is_covered_empty_ranges_returns_false() {
694 let ranges = LoadedRanges::default();
695 assert!(!ranges.is_covered(100, 200));
696 }
697
698 #[test]
699 fn is_covered_exact_match() {
700 let mut ranges = LoadedRanges::default();
701 ranges.insert(100, 200);
702 assert!(ranges.is_covered(100, 200));
703 }
704
705 #[test]
706 fn is_covered_subset_of_loaded_range() {
707 let mut ranges = LoadedRanges::default();
708 ranges.insert(0, 1000);
709 assert!(ranges.is_covered(100, 200));
710 assert!(ranges.is_covered(0, 1000));
711 assert!(ranges.is_covered(500, 999));
712 }
713
714 #[test]
715 fn is_covered_partial_overlap_returns_false() {
716 let mut ranges = LoadedRanges::default();
717 ranges.insert(100, 200);
718 assert!(!ranges.is_covered(50, 200));
719 assert!(!ranges.is_covered(100, 250));
720 assert!(!ranges.is_covered(50, 250));
721 }
722
723 #[test]
724 fn is_covered_no_overlap_returns_false() {
725 let mut ranges = LoadedRanges::default();
726 ranges.insert(100, 200);
727 assert!(!ranges.is_covered(300, 400));
728 assert!(!ranges.is_covered(0, 50));
729 }
730
731
732 #[test]
733 fn insert_single_range() {
734 let mut ranges = LoadedRanges::default();
735 ranges.insert(100, 200);
736 assert_eq!(ranges.ranges, vec![(100, 200)]);
737 }
738
739 #[test]
740 fn insert_non_overlapping_ranges_stay_separate() {
741 let mut ranges = LoadedRanges::default();
742 ranges.insert(100, 200);
743 ranges.insert(400, 500);
744 assert_eq!(ranges.ranges, vec![(100, 200), (400, 500)]);
745 }
746
747 #[test]
748 fn insert_overlapping_ranges_merge() {
749 let mut ranges = LoadedRanges::default();
750 ranges.insert(100, 300);
751 ranges.insert(200, 400);
752 // Overlapping — should merge into a single interval.
753 assert_eq!(ranges.ranges, vec![(100, 400)]);
754 }
755
756 #[test]
757 fn insert_adjacent_ranges_merge() {
758 let mut ranges = LoadedRanges::default();
759 ranges.insert(100, 200);
760 // Adjacent (200 + 1 == 201) — should merge.
761 ranges.insert(201, 300);
762 assert_eq!(ranges.ranges, vec![(100, 300)]);
763 }
764
765 #[test]
766 fn insert_idempotent() {
767 let mut ranges = LoadedRanges::default();
768 ranges.insert(100, 200);
769 ranges.insert(100, 200);
770 // Identical insert should not add a duplicate.
771 assert_eq!(ranges.ranges, vec![(100, 200)]);
772 }
773
774 #[test]
775 fn insert_multiple_merges_into_one() {
776 let mut ranges = LoadedRanges::default();
777 ranges.insert(100, 200);
778 ranges.insert(300, 400);
779 ranges.insert(500, 600);
780 // A new range that spans all three should collapse them all.
781 ranges.insert(150, 550);
782 assert_eq!(ranges.ranges, vec![(100, 600)]);
783 }
784
785
786 fn make_commit_message() -> JetstreamMessage {
787 let did = Did::new_static("did:plc:testuser123456789").expect("valid DID");
788 let collection = Nsid::new_static("app.bsky.feed.post").expect("valid NSID");
789 let rkey = Rkey::new_static("3jui7kd54c2").expect("valid rkey");
790
791 let commit = JetstreamCommit {
792 rev: jacquard::deps::smol_str::SmolStr::new_static("rev1"),
793 operation: jacquard::jetstream::CommitOperation::Create,
794 collection,
795 rkey,
796 record: None,
797 cid: None,
798 };
799
800 JetstreamMessage::Commit {
801 did,
802 time_us: 1_700_000_000_000_000,
803 commit,
804 }
805 }
806
807 fn make_identity_message() -> JetstreamMessage {
808 let did = Did::new_static("did:plc:testuser123456789").expect("valid DID");
809 let identity = JetstreamIdentity {
810 did: did.clone(),
811 handle: None,
812 seq: 42,
813 time: Datetime::now(),
814 };
815 JetstreamMessage::Identity {
816 did,
817 time_us: 1_700_000_000_000_001,
818 identity,
819 }
820 }
821
822 fn make_account_message() -> JetstreamMessage {
823 let did = Did::new_static("did:plc:testuser123456789").expect("valid DID");
824 let account = JetstreamAccount {
825 did: did.clone(),
826 active: false,
827 seq: 99,
828 time: Datetime::now(),
829 status: None,
830 };
831 JetstreamMessage::Account {
832 did,
833 time_us: 1_700_000_000_000_002,
834 account,
835 }
836 }
837
838 #[test]
839 fn commit_message_roundtrip() {
840 let original = make_commit_message();
841 let json = serde_json::to_string(&original).expect("serialize");
842 let decoded: JetstreamMessage = serde_json::from_str(&json).expect("deserialize");
843 assert_eq!(original, decoded);
844 }
845
846 #[test]
847 fn commit_with_populated_fields_roundtrip() {
848 let did = Did::new_static("did:plc:testuser123456789").expect("valid DID");
849 let collection = Nsid::new_static("app.bsky.feed.post").expect("valid NSID");
850 let rkey = Rkey::new_static("3jui7kd54c2").expect("valid rkey");
851 let cid = Cid::from_str("bafyreihffx5ateunq6ghc7q7x2s3stoabwrq3cq").unwrap();
852
853 // Build a Data value from JSON — avoids dealing with jacquard's Object wrapper directly
854 let record: jacquard::Data = serde_json::from_str(
855 r#"{"text":"hello from the test","createdAt":"2024-01-01T00:00:00Z"}"#,
856 )
857 .expect("valid JSON → Data");
858
859 let commit = JetstreamCommit {
860 rev: jacquard::deps::smol_str::SmolStr::new_static("rev1"),
861 operation: CommitOperation::Create,
862 collection,
863 rkey,
864 record: Some(record),
865 cid: Some(cid),
866 };
867
868 let original = JetstreamMessage::Commit {
869 did,
870 time_us: 1_700_000_000_000_000,
871 commit,
872 };
873 let json = serde_json::to_string(&original).expect("serialize");
874 let decoded: JetstreamMessage = serde_json::from_str(&json).expect("deserialize");
875 assert_eq!(original, decoded);
876 }
877
878 #[test]
879 fn identity_message_roundtrip() {
880 let original = make_identity_message();
881 let json = serde_json::to_string(&original).expect("serialize");
882 let decoded: JetstreamMessage = serde_json::from_str(&json).expect("deserialize");
883 assert_eq!(original, decoded);
884 }
885
886 #[test]
887 fn account_message_roundtrip() {
888 let original = make_account_message();
889 let json = serde_json::to_string(&original).expect("serialize");
890 let decoded: JetstreamMessage = serde_json::from_str(&json).expect("deserialize");
891 assert_eq!(original, decoded);
892 }
893
894 /// Verify that `jetstream_to_db_event` followed by `row_to_message` is a
895 /// lossless round-trip. We can't go through real SQLite in a unit test, so
896 /// we exercise the JSON encode/decode path directly.
897 #[test]
898 fn jetstream_to_db_event_round_trips_through_json() {
899 let original = make_commit_message();
900 let db_event = jetstream_to_db_event(&original).expect("should produce DbEvent");
901
902 // Simulate the DB row: id=0, kind=1, did=2, time_us=3, collection=4, message_json=5
903 let row = vec![
904 rusqlite::types::Value::Integer(1),
905 rusqlite::types::Value::Text(db_event.kind.clone()),
906 rusqlite::types::Value::Text(db_event.did.as_str().to_owned()),
907 rusqlite::types::Value::Integer(db_event.time_us),
908 rusqlite::types::Value::Text(
909 db_event
910 .collection
911 .as_ref()
912 .map(|c| c.as_str().to_owned())
913 .unwrap_or_default(),
914 ),
915 rusqlite::types::Value::Text(db_event.message_json.clone()),
916 ];
917
918 let decoded = row_to_message(&row).expect("should decode row");
919 assert_eq!(original, decoded);
920 }
921}