//! Per-repo firehose event buffer for the resyncing state.
//!
//! While a repo's full CAR fetch is in flight, incoming firehose commits are
//! buffered here keyed by `PREFIX_RESYNC_BUFFER ("rsb") + did + '\0' + seq_be`.
//! After the fetch completes the worker drains the buffer, applying each event
//! that is newer than the fetched rev, and acks (deletes) it as it goes.
//!
//! Keys are written exactly once — firehose sequence numbers are unique — so
//! removals could use a weak/single delete to avoid full tombstones. Fjall
//! 3.0.3 does not yet expose that API; switch to it when available.

use std::sync::atomic::Ordering;

use jacquard_common::types::string::Did;
use tracing::debug;

use crate::storage::{DbRef, PREFIX_RESYNC_BUFFER, error::StorageResult};

/// Separator between the DID and the sequence suffix in buffer keys.
const NUL: u8 = b'\0';

/// Build the full buffer key: `PREFIX_RESYNC_BUFFER + did + '\0' + seq_be`.
///
/// The sequence number is encoded big-endian so that lexicographic key order
/// equals numeric sequence order — a prefix scan then yields events in seq
/// order for free.
fn key(did: Did<'_>, seq: u64) -> Vec<u8> {
    let d = did.as_str();
    let mut k = Vec::with_capacity(PREFIX_RESYNC_BUFFER.len() + d.len() + 1 + 8);
    k.extend_from_slice(&PREFIX_RESYNC_BUFFER);
    k.extend_from_slice(d.as_bytes());
    k.push(NUL);
    k.extend_from_slice(&seq.to_be_bytes());
    k
}

/// Build the scan prefix for `did`: the full key minus the 8-byte seq suffix.
fn key_prefix(did: Did<'_>) -> Vec<u8> {
    let d = did.as_str();
    let mut k = Vec::with_capacity(PREFIX_RESYNC_BUFFER.len() + d.len() + 1);
    k.extend_from_slice(&PREFIX_RESYNC_BUFFER);
    k.extend_from_slice(d.as_bytes());
    k.push(NUL);
    k
}

/// A single buffered firehose event.
#[derive(Debug, Clone)]
pub struct BufferedEvent {
    /// Firehose sequence number (relay-assigned, monotonically increasing).
    pub seq: u64,
    /// Raw CBOR bytes of the firehose event.
    pub cbor: Vec<u8>,
}

/// Buffer one incoming firehose event for `did` at firehose sequence `seq`.
///
/// Called by the firehose handler when the repo's state is [`Resyncing`].
///
/// [`Resyncing`]: crate::storage::repo::RepoState::Resyncing
pub fn push_buffer(db: &DbRef, did: Did<'_>, seq: u64, cbor: &[u8]) -> StorageResult<()> {
    debug!(
        did = did.as_str(),
        seq, "buffer firehose event for resyncing repo"
    );
    let key = key(did, seq);
    db.ks.insert(key, cbor)?;
    // Advisory metrics counter; no ordering dependency, so Relaxed suffices.
    db.stats.resync_buffer_count.fetch_add(1, Ordering::Relaxed);
    Ok(())
}

/// Return all buffered events for `did` in sequence order.
///
/// Eagerly loads into memory. Buffers cover only the narrow window of an
/// in-flight CAR fetch, so they are expected to be small.
///
/// Order comes from the key encoding: big-endian seq suffixes make
/// lexicographic prefix-scan order equal numeric seq order.
pub fn scan_buffer(db: &DbRef, did: Did<'_>) -> StorageResult<Vec<BufferedEvent>> {
    let prefix = key_prefix(did);
    let mut events = Vec::new();
    for guard in db.ks.prefix(&prefix) {
        let (key_slice, val_slice) = guard.into_inner()?;
        let key_bytes = key_slice.as_ref();
        // The suffix after the prefix is the 8-byte BE sequence number.
        let suffix = &key_bytes[prefix.len()..];
        if suffix.len() != 8 {
            continue; // corrupt key, skip
        }
        // Length was checked just above, so try_into cannot fail.
        let seq = u64::from_be_bytes(suffix.try_into().unwrap());
        events.push(BufferedEvent {
            seq,
            cbor: val_slice.as_ref().to_vec(),
        });
    }
    Ok(events)
}

/// Delete a single buffered event after it has been successfully applied.
///
/// Call this after confirming the event was processed, not before — if the
/// worker crashes between acks, the remaining events are replayed from the
/// start on the next resync attempt (which is safe since `apply_if_newer`
/// skips events already covered by the fresh fetch).
/// Remove the buffered event for `did` at sequence `seq` from the keyspace
/// and decrement the buffered-event counter.
pub fn ack_buffer_entry(db: &DbRef, did: Did<'_>, seq: u64) -> StorageResult<()> {
    debug!(did = did.as_str(), seq, "ack buffered event");
    let k = key(did, seq);
    db.ks.remove(k)?;
    db.stats.resync_buffer_count.fetch_sub(1, Ordering::Relaxed);
    Ok(())
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::open_temporary;

    /// Shorthand for building an owned DID in tests.
    fn did(s: &str) -> Did<'static> {
        Did::new_owned(s).unwrap()
    }

    #[test]
    fn push_and_scan_returns_events_in_seq_order() {
        let db = open_temporary().unwrap();
        let d = did("did:web:a.com");
        // Insert deliberately out of order; the scan must come back sorted.
        for (seq, byte) in [(300u64, 0x03u8), (100, 0x01), (200, 0x02)] {
            push_buffer(&db, d.clone(), seq, &[byte]).unwrap();
        }
        let got = scan_buffer(&db, d).unwrap();
        let seqs: Vec<u64> = got.iter().map(|e| e.seq).collect();
        assert_eq!(seqs, vec![100, 200, 300]);
        assert_eq!(got[0].cbor, &[0x01]);
    }

    #[test]
    fn scan_is_isolated_per_did() {
        let db = open_temporary().unwrap();
        push_buffer(&db, did("did:web:a.com"), 1, &[0xAA]).unwrap();
        push_buffer(&db, did("did:web:b.com"), 2, &[0xBB]).unwrap();
        // Each DID's scan must see only its own entry.
        let for_a = scan_buffer(&db, did("did:web:a.com")).unwrap();
        assert_eq!(for_a.len(), 1);
        assert_eq!(for_a[0].cbor, &[0xAA]);
        let for_b = scan_buffer(&db, did("did:web:b.com")).unwrap();
        assert_eq!(for_b.len(), 1);
        assert_eq!(for_b[0].cbor, &[0xBB]);
    }

    #[test]
    fn ack_removes_entry_and_leaves_others() {
        let db = open_temporary().unwrap();
        let d = did("did:web:a.com");
        push_buffer(&db, d.clone(), 1, &[0x01]).unwrap();
        push_buffer(&db, d.clone(), 2, &[0x02]).unwrap();
        // Acking seq 1 must leave seq 2 untouched.
        ack_buffer_entry(&db, d.clone(), 1).unwrap();
        let remaining = scan_buffer(&db, d).unwrap();
        assert_eq!(remaining.len(), 1);
        assert_eq!(remaining[0].seq, 2);
    }

    #[test]
    fn scan_empty_buffer_returns_empty_vec() {
        let db = open_temporary().unwrap();
        assert!(scan_buffer(&db, did("did:web:a.com")).unwrap().is_empty());
    }
}