// NOTE(review): stray header lines ("lightweight",
// "com.atproto.sync.listReposByCollection") look like paste/metadata residue,
// not Rust — preserved as a comment so the file parses.
1//! Per-repo firehose event buffer for the resyncing state.
2//!
3//! While a repo's full CAR fetch is in flight, incoming firehose commits are
4//! buffered here keyed by `"rsb"<did>\0<seq_be:u64>`. After the
5//! fetch completes the worker drains the buffer, applying each event that
6//! is newer than the fetched rev, and acks (deletes) it as it goes.
7//!
8//! Keys are written exactly once — firehose sequence numbers are unique — so
9//! removals could use a weak/single delete to avoid full tombstones. Fjall
10//! 3.0.3 does not yet expose that API; switch to it when available.
11
12use std::sync::atomic::Ordering;
13
14use jacquard_common::types::string::Did;
15use tracing::debug;
16
17use crate::storage::{DbRef, PREFIX_RESYNC_BUFFER, error::StorageResult};
18
19const NUL: u8 = b'\0';
20
21fn key(did: Did<'_>, seq: u64) -> Vec<u8> {
22 let d = did.as_str();
23 let mut k = Vec::with_capacity(PREFIX_RESYNC_BUFFER.len() + d.len() + 1 + 8);
24 k.extend_from_slice(&PREFIX_RESYNC_BUFFER);
25 k.extend_from_slice(d.as_bytes());
26 k.push(NUL);
27 k.extend_from_slice(&seq.to_be_bytes());
28 k
29}
30
31fn key_prefix(did: Did<'_>) -> Vec<u8> {
32 let d = did.as_str();
33 let mut k = Vec::with_capacity(PREFIX_RESYNC_BUFFER.len() + d.len() + 1);
34 k.extend_from_slice(&PREFIX_RESYNC_BUFFER);
35 k.extend_from_slice(d.as_bytes());
36 k.push(NUL);
37 k
38}
39
/// A single buffered firehose event.
///
/// `PartialEq`/`Eq` are derived so tests and drain logic can compare events
/// directly instead of field-by-field (purely additive, backward compatible).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BufferedEvent {
    /// Firehose sequence number (relay-assigned, monotonically increasing).
    pub seq: u64,
    /// Raw CBOR bytes of the firehose event.
    pub cbor: Vec<u8>,
}
48
49/// Buffer one incoming firehose event for `did` at firehose sequence `seq`.
50///
51/// Called by the firehose handler when the repo's state is [`Resyncing`].
52///
53/// [`Resyncing`]: crate::storage::repo::RepoState::Resyncing
54pub fn push_buffer(db: &DbRef, did: Did<'_>, seq: u64, cbor: &[u8]) -> StorageResult<()> {
55 debug!(
56 did = did.as_str(),
57 seq, "buffer firehose event for resyncing repo"
58 );
59 let key = key(did, seq);
60 db.ks.insert(key, cbor)?;
61 db.stats.resync_buffer_count.fetch_add(1, Ordering::Relaxed);
62 Ok(())
63}
64
65/// Return all buffered events for `did` in sequence order.
66///
67/// Eagerly loads into memory. Buffers cover only the narrow window of an
68/// in-flight CAR fetch, so they are expected to be small.
69pub fn scan_buffer(db: &DbRef, did: Did<'_>) -> StorageResult<Vec<BufferedEvent>> {
70 let prefix = key_prefix(did);
71 let mut events = Vec::new();
72 for guard in db.ks.prefix(&prefix) {
73 let (key_slice, val_slice) = guard.into_inner()?;
74 let key_bytes = key_slice.as_ref();
75 // The suffix after the prefix is the 8-byte BE sequence number.
76 let suffix = &key_bytes[prefix.len()..];
77 if suffix.len() != 8 {
78 continue; // corrupt key, skip
79 }
80 let seq = u64::from_be_bytes(suffix.try_into().unwrap());
81 events.push(BufferedEvent {
82 seq,
83 cbor: val_slice.as_ref().to_vec(),
84 });
85 }
86 Ok(events)
87}
88
89/// Delete a single buffered event after it has been successfully applied.
90///
91/// Call this after confirming the event was processed, not before — if the
92/// worker crashes between acks, the remaining events are replayed from the
93/// start on the next resync attempt (which is safe since `apply_if_newer`
94/// skips events already covered by the fresh fetch).
95pub fn ack_buffer_entry(db: &DbRef, did: Did<'_>, seq: u64) -> StorageResult<()> {
96 debug!(did = did.as_str(), seq, "ack buffered event");
97 let key = key(did, seq);
98 db.ks.remove(key)?;
99 db.stats.resync_buffer_count.fetch_sub(1, Ordering::Relaxed);
100 Ok(())
101}
102
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::open_temporary;

    /// Shorthand for building an owned test DID.
    fn did(s: &str) -> Did<'static> {
        Did::new_owned(s).unwrap()
    }

    #[test]
    fn push_and_scan_returns_events_in_seq_order() {
        let db = open_temporary().unwrap();
        let d = did("did:web:a.com");
        // Insert deliberately out of order; the scan must come back sorted.
        for (seq, byte) in [(300u64, 0x03u8), (100, 0x01), (200, 0x02)] {
            push_buffer(&db, d.clone(), seq, &[byte]).unwrap();
        }

        let events = scan_buffer(&db, d).unwrap();
        let seqs: Vec<u64> = events.iter().map(|e| e.seq).collect();
        assert_eq!(seqs, vec![100, 200, 300]);
        assert_eq!(events[0].cbor, &[0x01]);
    }

    #[test]
    fn scan_is_isolated_per_did() {
        let db = open_temporary().unwrap();
        push_buffer(&db, did("did:web:a.com"), 1, &[0xAA]).unwrap();
        push_buffer(&db, did("did:web:b.com"), 2, &[0xBB]).unwrap();

        let a = scan_buffer(&db, did("did:web:a.com")).unwrap();
        let b = scan_buffer(&db, did("did:web:b.com")).unwrap();
        assert_eq!((a.len(), b.len()), (1, 1));
        assert_eq!(a[0].cbor, &[0xAA]);
        assert_eq!(b[0].cbor, &[0xBB]);
    }

    #[test]
    fn ack_removes_entry_and_leaves_others() {
        let db = open_temporary().unwrap();
        let d = did("did:web:a.com");
        for (seq, byte) in [(1u64, 0x01u8), (2, 0x02)] {
            push_buffer(&db, d.clone(), seq, &[byte]).unwrap();
        }

        ack_buffer_entry(&db, d.clone(), 1).unwrap();

        let remaining = scan_buffer(&db, d).unwrap();
        assert_eq!(remaining.len(), 1);
        assert_eq!(remaining[0].seq, 2);
    }

    #[test]
    fn scan_empty_buffer_returns_empty_vec() {
        let db = open_temporary().unwrap();
        assert!(scan_buffer(&db, did("did:web:a.com")).unwrap().is_empty());
    }
}
162}