// lightweight com.atproto.sync.listReposByCollection
// (file-viewer chrome removed: "at main · 162 lines · 5.7 kB · view raw")
1//! Per-repo firehose event buffer for the resyncing state. 2//! 3//! While a repo's full CAR fetch is in flight, incoming firehose commits are 4//! buffered here keyed by `"rsb"<did>\0<seq_be:u64>`. After the 5//! fetch completes the worker drains the buffer, applying each event that 6//! is newer than the fetched rev, and acks (deletes) it as it goes. 7//! 8//! Keys are written exactly once — firehose sequence numbers are unique — so 9//! removals could use a weak/single delete to avoid full tombstones. Fjall 10//! 3.0.3 does not yet expose that API; switch to it when available. 11 12use std::sync::atomic::Ordering; 13 14use jacquard_common::types::string::Did; 15use tracing::debug; 16 17use crate::storage::{DbRef, PREFIX_RESYNC_BUFFER, error::StorageResult}; 18 19const NUL: u8 = b'\0'; 20 21fn key(did: Did<'_>, seq: u64) -> Vec<u8> { 22 let d = did.as_str(); 23 let mut k = Vec::with_capacity(PREFIX_RESYNC_BUFFER.len() + d.len() + 1 + 8); 24 k.extend_from_slice(&PREFIX_RESYNC_BUFFER); 25 k.extend_from_slice(d.as_bytes()); 26 k.push(NUL); 27 k.extend_from_slice(&seq.to_be_bytes()); 28 k 29} 30 31fn key_prefix(did: Did<'_>) -> Vec<u8> { 32 let d = did.as_str(); 33 let mut k = Vec::with_capacity(PREFIX_RESYNC_BUFFER.len() + d.len() + 1); 34 k.extend_from_slice(&PREFIX_RESYNC_BUFFER); 35 k.extend_from_slice(d.as_bytes()); 36 k.push(NUL); 37 k 38} 39 40/// A single buffered firehose event. 41#[derive(Debug, Clone)] 42pub struct BufferedEvent { 43 /// Firehose sequence number (relay-assigned, monotonically increasing). 44 pub seq: u64, 45 /// Raw CBOR bytes of the firehose event. 46 pub cbor: Vec<u8>, 47} 48 49/// Buffer one incoming firehose event for `did` at firehose sequence `seq`. 50/// 51/// Called by the firehose handler when the repo's state is [`Resyncing`]. 
52/// 53/// [`Resyncing`]: crate::storage::repo::RepoState::Resyncing 54pub fn push_buffer(db: &DbRef, did: Did<'_>, seq: u64, cbor: &[u8]) -> StorageResult<()> { 55 debug!( 56 did = did.as_str(), 57 seq, "buffer firehose event for resyncing repo" 58 ); 59 let key = key(did, seq); 60 db.ks.insert(key, cbor)?; 61 db.stats.resync_buffer_count.fetch_add(1, Ordering::Relaxed); 62 Ok(()) 63} 64 65/// Return all buffered events for `did` in sequence order. 66/// 67/// Eagerly loads into memory. Buffers cover only the narrow window of an 68/// in-flight CAR fetch, so they are expected to be small. 69pub fn scan_buffer(db: &DbRef, did: Did<'_>) -> StorageResult<Vec<BufferedEvent>> { 70 let prefix = key_prefix(did); 71 let mut events = Vec::new(); 72 for guard in db.ks.prefix(&prefix) { 73 let (key_slice, val_slice) = guard.into_inner()?; 74 let key_bytes = key_slice.as_ref(); 75 // The suffix after the prefix is the 8-byte BE sequence number. 76 let suffix = &key_bytes[prefix.len()..]; 77 if suffix.len() != 8 { 78 continue; // corrupt key, skip 79 } 80 let seq = u64::from_be_bytes(suffix.try_into().unwrap()); 81 events.push(BufferedEvent { 82 seq, 83 cbor: val_slice.as_ref().to_vec(), 84 }); 85 } 86 Ok(events) 87} 88 89/// Delete a single buffered event after it has been successfully applied. 90/// 91/// Call this after confirming the event was processed, not before — if the 92/// worker crashes between acks, the remaining events are replayed from the 93/// start on the next resync attempt (which is safe since `apply_if_newer` 94/// skips events already covered by the fresh fetch). 
95pub fn ack_buffer_entry(db: &DbRef, did: Did<'_>, seq: u64) -> StorageResult<()> { 96 debug!(did = did.as_str(), seq, "ack buffered event"); 97 let key = key(did, seq); 98 db.ks.remove(key)?; 99 db.stats.resync_buffer_count.fetch_sub(1, Ordering::Relaxed); 100 Ok(()) 101} 102 103#[cfg(test)] 104mod tests { 105 use super::*; 106 use crate::storage::open_temporary; 107 108 fn did(s: &str) -> Did<'static> { 109 Did::new_owned(s).unwrap() 110 } 111 112 #[test] 113 fn push_and_scan_returns_events_in_seq_order() { 114 let db = open_temporary().unwrap(); 115 let d = did("did:web:a.com"); 116 push_buffer(&db, d.clone(), 300, &[0x03]).unwrap(); 117 push_buffer(&db, d.clone(), 100, &[0x01]).unwrap(); 118 push_buffer(&db, d.clone(), 200, &[0x02]).unwrap(); 119 120 let events = scan_buffer(&db, d).unwrap(); 121 assert_eq!(events.len(), 3); 122 assert_eq!(events[0].seq, 100); 123 assert_eq!(events[1].seq, 200); 124 assert_eq!(events[2].seq, 300); 125 assert_eq!(events[0].cbor, &[0x01]); 126 } 127 128 #[test] 129 fn scan_is_isolated_per_did() { 130 let db = open_temporary().unwrap(); 131 push_buffer(&db, did("did:web:a.com"), 1, &[0xAA]).unwrap(); 132 push_buffer(&db, did("did:web:b.com"), 2, &[0xBB]).unwrap(); 133 134 let a_events = scan_buffer(&db, did("did:web:a.com")).unwrap(); 135 let b_events = scan_buffer(&db, did("did:web:b.com")).unwrap(); 136 assert_eq!(a_events.len(), 1); 137 assert_eq!(a_events[0].cbor, &[0xAA]); 138 assert_eq!(b_events.len(), 1); 139 assert_eq!(b_events[0].cbor, &[0xBB]); 140 } 141 142 #[test] 143 fn ack_removes_entry_and_leaves_others() { 144 let db = open_temporary().unwrap(); 145 let d = did("did:web:a.com"); 146 push_buffer(&db, d.clone(), 1, &[0x01]).unwrap(); 147 push_buffer(&db, d.clone(), 2, &[0x02]).unwrap(); 148 149 ack_buffer_entry(&db, d.clone(), 1).unwrap(); 150 151 let events = scan_buffer(&db, d).unwrap(); 152 assert_eq!(events.len(), 1); 153 assert_eq!(events[0].seq, 2); 154 } 155 156 #[test] 157 fn 
scan_empty_buffer_returns_empty_vec() { 158 let db = open_temporary().unwrap(); 159 let events = scan_buffer(&db, did("did:web:a.com")).unwrap(); 160 assert!(events.is_empty()); 161 } 162}