//! Timestamp-ordered resync queue. //! //! Keys: `"rsq"\0` //! Values: `[u16 BE retry_count][u16 BE reason_len][reason_bytes][commit_cbor_bytes]` use std::collections::HashSet; use std::sync::atomic::Ordering; use std::time::SystemTime; use fjall::util::prefixed_range; use jacquard_common::types::string::Did; use tracing::{debug, trace, warn}; use crate::storage::{ DbRef, PREFIX_RESYNC_QUEUE, error::{StorageError, StorageResult}, repo, }; const NUL: u8 = b'\0'; // --------------------------------------------------------------------------- // Key encoding // --------------------------------------------------------------------------- /// `"rsq"\0` — timestamp-ordered resync queue. /// /// Big-endian timestamp gives natural chronological ordering. fn key(ts: u64, did: &Did<'_>) -> Vec { let d = did.as_str(); let mut k = Vec::with_capacity(PREFIX_RESYNC_QUEUE.len() + 8 + 1 + d.len()); k.extend_from_slice(&PREFIX_RESYNC_QUEUE); k.extend_from_slice(&ts.to_be_bytes()); k.push(NUL); k.extend_from_slice(d.as_bytes()); k } /// `"rsq"` — prefix for scanning the entire queue. fn key_prefix_all() -> Vec { PREFIX_RESYNC_QUEUE.to_vec() } /// `\0` — timestamp middle component, for use as an upper bound /// after concatenating with [`key_prefix_all`]. fn key_ts_midfix(ts: u64) -> Vec { let mut k = Vec::with_capacity(9); k.extend_from_slice(&ts.to_be_bytes()); k.push(NUL); k } /// Parse a timestamp and DID from a full resync queue key. 
fn key_parse(raw: &[u8]) -> StorageResult<(u64, Did<'static>)> { let key_str = String::from_utf8_lossy(raw); let rest = raw .strip_prefix(&PREFIX_RESYNC_QUEUE) .ok_or(StorageError::Corrupt { key: key_str.to_string(), reason: "wrong prefix for resync queue", })?; if rest.len() < 9 { return Err(StorageError::Corrupt { key: key_str.to_string(), reason: "not enough suffix bytes for resync queue", }); } let ts_bytes: [u8; 8] = rest[..8].try_into().map_err(|_| StorageError::Corrupt { key: key_str.to_string(), reason: "not enough bytes for timestamp in resync queue", })?; let ts = u64::from_be_bytes(ts_bytes); let rest = rest[8..] .strip_prefix(&[NUL]) .ok_or(StorageError::Corrupt { key: key_str.to_string(), reason: "missing NUL separator in resync queue key", })?; let did_str = std::str::from_utf8(rest).map_err(|_| StorageError::Corrupt { key: key_str.to_string(), reason: "invalid UTF-8 for DID in resync queue", })?; let did = Did::new_owned(did_str).map_err(|_| StorageError::Corrupt { key: key_str.to_string(), reason: "invalid DID in resync queue", })?; Ok((ts, did)) } // --------------------------------------------------------------------------- // Value encoding // --------------------------------------------------------------------------- /// An item waiting in the resync queue. #[derive(Debug, Clone)] pub struct ResyncItem { pub did: Did<'static>, pub retry_count: u16, pub retry_reason: String, /// Raw CBOR of the triggering firehose commit. 
pub commit_cbor: Vec, } fn encode(item: &ResyncItem) -> Vec { let reason = item.retry_reason.as_bytes(); let mut v = Vec::with_capacity(2 + 2 + reason.len() + item.commit_cbor.len()); v.extend_from_slice(&item.retry_count.to_be_bytes()); v.extend_from_slice(&(reason.len() as u16).to_be_bytes()); v.extend_from_slice(reason); v.extend_from_slice(&item.commit_cbor); v } fn decode(bytes: &[u8], key_str: &str, did: Did<'static>) -> StorageResult { if bytes.len() < 4 { return Err(StorageError::Corrupt { key: key_str.to_owned(), reason: "value too short", }); } let retry_count = u16::from_be_bytes([bytes[0], bytes[1]]); let reason_len = u16::from_be_bytes([bytes[2], bytes[3]]) as usize; let rest = &bytes[4..]; if rest.len() < reason_len { return Err(StorageError::Corrupt { key: key_str.to_owned(), reason: "reason truncated", }); } let retry_reason = std::str::from_utf8(&rest[..reason_len]) .map_err(|_| StorageError::Corrupt { key: key_str.to_owned(), reason: "reason not UTF-8", })? .to_owned(); let commit_cbor = rest[reason_len..].to_vec(); Ok(ResyncItem { did, retry_count, retry_reason, commit_cbor, }) } // --------------------------------------------------------------------------- // CRUD // --------------------------------------------------------------------------- /// queue a repo into a batch pub fn enqueue_into( batch: &mut fjall::OwnedWriteBatch, db: &DbRef, when: SystemTime, item: &ResyncItem, ) { let ts = crate::util::to_millis(when); if item.retry_reason == "backfill" { trace!( did = item.did.as_str(), ts, reason = %item.retry_reason, retry = item.retry_count, "enqueue resync to batch" ); } else { debug!( did = item.did.as_str(), ts, reason = %item.retry_reason, retry = item.retry_count, "enqueue resync to batch" ); } batch.insert(&db.ks, key(ts, &item.did), encode(item)); db.stats.resync_queue_depth.fetch_add(1, Ordering::Relaxed); } /// Count the total number of entries currently in the resync queue. 
///
/// Performs a full prefix scan; use only for admin/diagnostic views.
pub fn count_queued(db: &DbRef) -> usize {
    db.ks.prefix(key_prefix_all()).count()
}

/// Enqueue a repo for resync at the given time.
pub fn enqueue(db: &DbRef, when: SystemTime, item: &ResyncItem) -> StorageResult<()> {
    let mut batch = db.database.batch();
    enqueue_into(&mut batch, db, when, item);
    batch.commit()?;
    Ok(())
}

/// Dequeue and return the next item whose timestamp is strictly `< now`.
///
/// Removes the entry from the queue before returning it.
///
/// TODO: no, this is not atomic currently
///
/// note: deleted accounts aren't removed from the resync queue so we need to
/// check that (or does the caller deal with it?)
///
/// `since`: we actually want to pass in a cursor so we can efficiently skip
/// over tombstones. we don't have to persist the cursor to disk, but the caller
/// can hold it in memory over the app's lifetime so we only pay the tomb scan
/// cost once on startup.
pub fn dequeue_ready(
    db: &DbRef,
    now: SystemTime,
    since: Option<Vec<u8>>,
) -> StorageResult<Option<(ResyncItem, Vec<u8>)>> {
    let now_ms = crate::util::to_millis(now);
    let prefix = key_prefix_all();
    // Lower bound: resume from the caller-held cursor (or the queue start).
    // Upper bound: everything strictly before `now` — a key at exactly
    // `now_ms` sorts >= `prefix ++ ts ++ NUL` and is excluded.
    let lower_suffix = since.unwrap_or_default();
    let upper_suffix = key_ts_midfix(now_ms);
    let Some(guard) = db
        .ks
        .range(prefixed_range(&prefix, lower_suffix..upper_suffix))
        .next()
    else {
        return Ok(None);
    };
    let (key_slice, val_slice) = guard.into_inner()?;
    let key_bytes = key_slice.as_ref();
    let (ts, did) = key_parse(key_bytes)?;
    // Invariant guaranteed by the exclusive upper bound above.
    debug_assert!(ts < now_ms);
    let key_str = String::from_utf8_lossy(key_bytes).into_owned();
    let item = decode(val_slice.as_ref(), &key_str, did)?;
    debug!(
        did = item.did.as_str(),
        ts,
        reason = %item.retry_reason,
        retry = item.retry_count,
        "dequeue resync"
    );
    db.ks.remove(key_bytes)?;
    // Keep the depth gauge symmetric with `enqueue_into` (increments) and
    // `claim_resync` (decrements on removal); previously this path forgot
    // to decrement, so the gauge drifted upward.
    db.stats.resync_queue_depth.fetch_sub(1, Ordering::Relaxed);
    // The cursor is the key suffix past the shared prefix.
    let next_since = key_bytes
        .get(prefix.len()..)
        .expect("a resync queue key must start with the resync queue prefix");
    Ok(Some((item, next_since.to_vec())))
}

/// Claim the next ready resync job, skipping DIDs that are currently in flight.
/// /// Scans the queue for the oldest entry whose timestamp is `< now` and whose /// DID is not in `busy`. On finding one it atomically (fjall batch): /// - removes the entry from the resync queue, and /// - writes `state = Resyncing` for the repo (preserving its account status). /// /// Returns the claimed item and an updated `since` cursor (same semantics as /// [`dequeue_ready`]). Returns `None` if no claimable entry exists. pub fn claim_resync( db: &DbRef, now: SystemTime, since: Option>, busy: &HashSet>, ) -> StorageResult)>> { let now_ms = crate::util::to_millis(now); let prefix = key_prefix_all(); let lower_suffix = since.unwrap_or_default(); let upper_suffix = key_ts_midfix(now_ms); for guard in db .ks .range(prefixed_range(&prefix, lower_suffix..upper_suffix)) { let (key_slice, val_slice) = guard.into_inner()?; let key_bytes = key_slice.as_ref(); let (_, did) = key_parse(key_bytes)?; if busy.contains(&did) { debug!(did = did.as_str(), "skip busy did in resync queue"); continue; } let key_str = String::from_utf8_lossy(key_bytes).into_owned(); let item = decode(val_slice.as_ref(), &key_str, did.clone())?; let next_since = key_bytes[prefix.len()..].to_vec(); // Read current repo info to preserve account status across the transition. let repo_key = repo::key(&did); let new_info = match db.ks.get(&repo_key)? { Some(b) => { let rk = String::from_utf8_lossy(&repo_key).into_owned(); let mut info = repo::decode_repo_info(&b, &rk)?; info.state = repo::RepoState::Resyncing; info.error = None; info } None => { warn!( did = did.as_str(), "claiming resync job for did with no repo record; inserting as resyncing/active" ); repo::RepoInfo { state: repo::RepoState::Resyncing, status: repo::AccountStatus::Active, error: None, } } }; // Atomically: remove from queue + write state=Resyncing. 
let mut batch = db.database.batch(); batch.remove(&db.ks, key_bytes); batch.insert(&db.ks, &repo_key, repo::encode_repo_info(&new_info)); batch.commit()?; db.stats.resync_queue_depth.fetch_sub(1, Ordering::Relaxed); trace!( did = item.did.as_str(), reason = %item.retry_reason, retry = item.retry_count, "claimed resync job" ); return Ok(Some((item, next_since))); } Ok(None) } // --------------------------------------------------------------------------- // Tests // --------------------------------------------------------------------------- #[cfg(test)] mod tests { use std::collections::HashSet; use super::*; use crate::storage::{open_temporary, repo}; use crate::util::from_millis; fn did(s: &str) -> Did<'static> { Did::new_owned(s).unwrap() } fn item(did_str: &str, retry_count: u16, reason: &str, cbor: &[u8]) -> ResyncItem { ResyncItem { did: did(did_str), retry_count, retry_reason: reason.to_owned(), commit_cbor: cbor.to_vec(), } } // --- key encoding --- #[test] fn key_structure() { let k = key(0x0102030405060708, &did("did:web:example.com")); let mut expected = b"rsq".to_vec(); expected.extend_from_slice(&0x0102030405060708u64.to_be_bytes()); expected.push(b'\0'); expected.extend_from_slice(b"did:web:example.com"); assert_eq!(k, expected); } #[test] fn key_sorts_by_timestamp() { let earlier = key(100, &did("did:web:example.com")); let later = key(200, &did("did:web:example.com")); assert!(earlier < later); } #[test] fn key_same_timestamp_sorts_by_did() { let a = key(100, &did("did:web:a.com")); let b = key(100, &did("did:web:b.com")); assert!(a < b); } #[test] fn key_parse_roundtrips() { let ts = 0xdeadbeefcafe1234u64; let d = did("did:web:example.com"); let k = key(ts, &d); let (parsed_ts, parsed_did) = key_parse(&k).unwrap(); assert_eq!(parsed_ts, ts); assert_eq!(parsed_did, d); } #[test] fn key_prefix_all_is_prefix_of_any_entry() { let prefix_all = key_prefix_all(); let k = key(100, &did("did:web:example.com")); assert!(k.starts_with(&prefix_all)); } #[test] 
fn key_ts_midfix_upper_bound_excludes_entries_at_that_ts() { let prefix_all = key_prefix_all(); let entry = key(100, &did("did:web:example.com")); let mut upper = prefix_all.clone(); upper.extend_from_slice(&key_ts_midfix(100)); assert!(entry >= upper); } #[test] fn key_ts_midfix_upper_bound_includes_earlier_ts() { let prefix_all = key_prefix_all(); let entry = key(99, &did("did:web:example.com")); let mut upper = prefix_all.clone(); upper.extend_from_slice(&key_ts_midfix(100)); assert!(entry < upper); } #[test] fn key_parse_returns_error_for_truncated_key() { assert!(key_parse(b"rsq").is_err()); } // --- encode / decode --- #[test] fn encode_decode_roundtrips() { let original = item("did:web:example.com", 3, "detected gap", &[0xAB, 0xCD]); let bytes = encode(&original); let decoded = decode(&bytes, "test-key", did("did:web:example.com")).unwrap(); assert_eq!(decoded.retry_count, 3); assert_eq!(decoded.retry_reason, "detected gap"); assert_eq!(decoded.commit_cbor, vec![0xAB, 0xCD]); } #[test] fn encode_decode_empty_commit_cbor() { let original = item("did:web:example.com", 0, "first attempt", &[]); let bytes = encode(&original); let decoded = decode(&bytes, "test-key", did("did:web:example.com")).unwrap(); assert_eq!(decoded.retry_count, 0); assert_eq!(decoded.retry_reason, "first attempt"); assert!(decoded.commit_cbor.is_empty()); } #[test] fn decode_rejects_truncated_header() { assert!(decode(&[0, 1, 2], "k", did("did:web:example.com")).is_err()); } // --- enqueue / dequeue_ready --- #[test] fn dequeue_returns_none_on_empty_queue() { let db = open_temporary().unwrap(); assert!( dequeue_ready(&db, from_millis(9999), None) .unwrap() .is_none() ); } #[test] fn enqueue_and_dequeue_basic() { let db = open_temporary().unwrap(); enqueue( &db, from_millis(100), &item("did:web:a.com", 0, "backfill", &[1, 2, 3]), ) .unwrap(); let (got, _cursor) = dequeue_ready(&db, from_millis(101), None).unwrap().unwrap(); assert_eq!(got.did.as_str(), "did:web:a.com"); 
assert_eq!(got.retry_reason, "backfill"); assert_eq!(got.commit_cbor, vec![1, 2, 3]); assert!( dequeue_ready(&db, from_millis(101), None) .unwrap() .is_none() ); } #[test] fn dequeue_excludes_items_at_or_after_now() { let db = open_temporary().unwrap(); enqueue( &db, from_millis(100), &item("did:web:a.com", 0, "test", &[]), ) .unwrap(); assert!( dequeue_ready(&db, from_millis(100), None) .unwrap() .is_none() ); assert!(dequeue_ready(&db, from_millis(99), None).unwrap().is_none()); assert!( dequeue_ready(&db, from_millis(101), None) .unwrap() .is_some() ); } #[test] fn dequeue_returns_oldest_entry_first() { let db = open_temporary().unwrap(); enqueue( &db, from_millis(200), &item("did:web:b.com", 0, "later", &[]), ) .unwrap(); enqueue( &db, from_millis(100), &item("did:web:a.com", 0, "earlier", &[]), ) .unwrap(); let (first, _) = dequeue_ready(&db, from_millis(9999), None) .unwrap() .unwrap(); assert_eq!(first.retry_reason, "earlier"); let (second, _) = dequeue_ready(&db, from_millis(9999), None) .unwrap() .unwrap(); assert_eq!(second.retry_reason, "later"); } #[test] fn since_cursor_skips_over_tombstone_region() { let db = open_temporary().unwrap(); enqueue( &db, from_millis(10), &item("did:web:a.com", 0, "first", &[]), ) .unwrap(); enqueue( &db, from_millis(20), &item("did:web:b.com", 0, "second", &[]), ) .unwrap(); let (first, cursor) = dequeue_ready(&db, from_millis(9999), None) .unwrap() .unwrap(); assert_eq!(first.retry_reason, "first"); enqueue( &db, from_millis(5), &item("did:web:late.com", 0, "late", &[]), ) .unwrap(); let (second, _) = dequeue_ready(&db, from_millis(9999), Some(cursor)) .unwrap() .unwrap(); assert_eq!(second.retry_reason, "second"); } // --- claim_resync --- fn pending_repo(db: &DbRef, did_str: &str) { repo::put_info( db, &did(did_str), &repo::RepoInfo { state: repo::RepoState::Pending, status: repo::AccountStatus::Active, error: None, }, ) .unwrap(); } #[test] fn claim_resync_transitions_state_and_dequeues() { let db = 
open_temporary().unwrap(); pending_repo(&db, "did:web:a.com"); enqueue( &db, from_millis(100), &item("did:web:a.com", 0, "backfill", &[]), ) .unwrap(); let (claimed, _cursor) = claim_resync(&db, from_millis(101), None, &HashSet::new()) .unwrap() .unwrap(); assert_eq!(claimed.did.as_str(), "did:web:a.com"); assert!( dequeue_ready(&db, from_millis(9999), None) .unwrap() .is_none() ); let (info, _) = repo::get(&db, &did("did:web:a.com")).unwrap().unwrap(); assert_eq!(info.state, repo::RepoState::Resyncing); } #[test] fn claim_resync_preserves_account_status() { let db = open_temporary().unwrap(); repo::put_info( &db, &did("did:web:a.com"), &repo::RepoInfo { state: repo::RepoState::Pending, status: repo::AccountStatus::Suspended, error: None, }, ) .unwrap(); enqueue( &db, from_millis(100), &item("did:web:a.com", 0, "backfill", &[]), ) .unwrap(); claim_resync(&db, from_millis(101), None, &HashSet::new()) .unwrap() .unwrap(); let (info, _) = repo::get(&db, &did("did:web:a.com")).unwrap().unwrap(); assert_eq!(info.status, repo::AccountStatus::Suspended); } #[test] fn claim_resync_skips_busy_dids() { let db = open_temporary().unwrap(); pending_repo(&db, "did:web:a.com"); pending_repo(&db, "did:web:b.com"); enqueue( &db, from_millis(100), &item("did:web:a.com", 0, "first", &[]), ) .unwrap(); enqueue( &db, from_millis(101), &item("did:web:b.com", 0, "second", &[]), ) .unwrap(); let mut busy: HashSet> = HashSet::new(); busy.insert(did("did:web:a.com")); let (claimed, _) = claim_resync(&db, from_millis(9999), None, &busy) .unwrap() .unwrap(); assert_eq!(claimed.did.as_str(), "did:web:b.com"); } #[test] fn claim_resync_returns_none_when_all_ready_are_busy() { let db = open_temporary().unwrap(); pending_repo(&db, "did:web:a.com"); enqueue( &db, from_millis(100), &item("did:web:a.com", 0, "only", &[]), ) .unwrap(); let mut busy: HashSet> = HashSet::new(); busy.insert(did("did:web:a.com")); assert!( claim_resync(&db, from_millis(9999), None, &busy) .unwrap() .is_none() ); } 
#[test] fn consecutive_dequeues_drain_in_order() { let db = open_temporary().unwrap(); enqueue( &db, from_millis(10), &item("did:web:a.com", 0, "first", &[]), ) .unwrap(); enqueue( &db, from_millis(20), &item("did:web:b.com", 0, "second", &[]), ) .unwrap(); enqueue( &db, from_millis(30), &item("did:web:c.com", 0, "third", &[]), ) .unwrap(); let (a, _) = dequeue_ready(&db, from_millis(9999), None) .unwrap() .unwrap(); let (b, _) = dequeue_ready(&db, from_millis(9999), None) .unwrap() .unwrap(); let (c, _) = dequeue_ready(&db, from_millis(9999), None) .unwrap() .unwrap(); assert!( dequeue_ready(&db, from_millis(9999), None) .unwrap() .is_none() ); assert_eq!(a.retry_reason, "first"); assert_eq!(b.retry_reason, "second"); assert_eq!(c.retry_reason, "third"); } }