lightweight com.atproto.sync.listReposByCollection
at main 589 lines 20 kB view raw
1//! Timestamp-ordered resync queue. 2//! 3//! Keys: `"rsq"<ts_be:u64>\0<did>` 4//! Values: `[u16 BE retry_count][u16 BE reason_len][reason_bytes][commit_cbor_bytes]` 5 6use std::collections::HashSet; 7use std::sync::atomic::Ordering; 8 9use fjall::util::prefixed_range; 10use jacquard_common::types::string::Did; 11use tracing::{debug, trace, warn}; 12 13use crate::storage::{ 14 DbRef, PREFIX_RESYNC_QUEUE, 15 error::{StorageError, StorageResult}, 16 repo, 17}; 18 19const NUL: u8 = b'\0'; 20 21// --------------------------------------------------------------------------- 22// Key encoding 23// --------------------------------------------------------------------------- 24 25/// `"rsq"<ts_be:u64>\0<did>` — timestamp-ordered resync queue. 26/// 27/// Big-endian timestamp gives natural chronological ordering. 28fn key(ts: u64, did: &Did<'_>) -> Vec<u8> { 29 let d = did.as_str(); 30 let mut k = Vec::with_capacity(PREFIX_RESYNC_QUEUE.len() + 8 + 1 + d.len()); 31 k.extend_from_slice(&PREFIX_RESYNC_QUEUE); 32 k.extend_from_slice(&ts.to_be_bytes()); 33 k.push(NUL); 34 k.extend_from_slice(d.as_bytes()); 35 k 36} 37 38/// `"rsq"` — prefix for scanning the entire queue. 39fn key_prefix_all() -> Vec<u8> { 40 PREFIX_RESYNC_QUEUE.to_vec() 41} 42 43/// `<ts_be:u64>\0` — timestamp middle component, for use as an upper bound 44/// after concatenating with [`key_prefix_all`]. 45fn key_ts_midfix(ts: u64) -> Vec<u8> { 46 let mut k = Vec::with_capacity(9); 47 k.extend_from_slice(&ts.to_be_bytes()); 48 k.push(NUL); 49 k 50} 51 52/// Parse a timestamp and DID from a full resync queue key. 53fn key_parse(raw: &[u8]) -> StorageResult<(u64, Did<'static>)> { 54 let key_str = String::from_utf8_lossy(raw); 55 let rest = raw 56 .strip_prefix(&PREFIX_RESYNC_QUEUE) 57 .ok_or(StorageError::Corrupt { 58 key: key_str.to_string(), 59 reason: "wrong prefix for resync queue", 60 })?; 61 if rest.len() < 9 { 62 return Err(StorageError::Corrupt { 63 key: key_str.to_string(), 64 reason: "not enough suffix bytes for resync queue", 65 }); 66 } 67 let ts_bytes: [u8; 8] = rest[..8].try_into().map_err(|_| StorageError::Corrupt { 68 key: key_str.to_string(), 69 reason: "not enough bytes for timestamp in resync queue", 70 })?; 71 let ts = u64::from_be_bytes(ts_bytes); 72 let rest = rest[8..] 73 .strip_prefix(&[NUL]) 74 .ok_or(StorageError::Corrupt { 75 key: key_str.to_string(), 76 reason: "missing NUL separator in resync queue key", 77 })?; 78 let did_str = std::str::from_utf8(rest).map_err(|_| StorageError::Corrupt { 79 key: key_str.to_string(), 80 reason: "invalid UTF-8 for DID in resync queue", 81 })?; 82 let did = Did::new_owned(did_str).map_err(|_| StorageError::Corrupt { 83 key: key_str.to_string(), 84 reason: "invalid DID in resync queue", 85 })?; 86 Ok((ts, did)) 87} 88 89// --------------------------------------------------------------------------- 90// Value encoding 91// --------------------------------------------------------------------------- 92 93/// An item waiting in the resync queue. 94#[derive(Debug, Clone)] 95pub struct ResyncItem { 96 pub did: Did<'static>, 97 pub retry_count: u16, 98 pub retry_reason: String, 99 /// Raw CBOR of the triggering firehose commit. 100 pub commit_cbor: Vec<u8>, 101} 102 103fn encode(item: &ResyncItem) -> Vec<u8> { 104 let reason = item.retry_reason.as_bytes(); 105 let mut v = Vec::with_capacity(2 + 2 + reason.len() + item.commit_cbor.len()); 106 v.extend_from_slice(&item.retry_count.to_be_bytes()); 107 v.extend_from_slice(&(reason.len() as u16).to_be_bytes()); 108 v.extend_from_slice(reason); 109 v.extend_from_slice(&item.commit_cbor); 110 v 111} 112 113fn decode(bytes: &[u8], key_str: &str, did: Did<'static>) -> StorageResult<ResyncItem> { 114 if bytes.len() < 4 { 115 return Err(StorageError::Corrupt { 116 key: key_str.to_owned(), 117 reason: "value too short", 118 }); 119 } 120 let retry_count = u16::from_be_bytes([bytes[0], bytes[1]]); 121 let reason_len = u16::from_be_bytes([bytes[2], bytes[3]]) as usize; 122 let rest = &bytes[4..]; 123 if rest.len() < reason_len { 124 return Err(StorageError::Corrupt { 125 key: key_str.to_owned(), 126 reason: "reason truncated", 127 }); 128 } 129 let retry_reason = std::str::from_utf8(&rest[..reason_len]) 130 .map_err(|_| StorageError::Corrupt { 131 key: key_str.to_owned(), 132 reason: "reason not UTF-8", 133 })? 134 .to_owned(); 135 let commit_cbor = rest[reason_len..].to_vec(); 136 Ok(ResyncItem { 137 did, 138 retry_count, 139 retry_reason, 140 commit_cbor, 141 }) 142} 143 144// --------------------------------------------------------------------------- 145// CRUD 146// --------------------------------------------------------------------------- 147 148/// queue a repo into a batch 149pub fn enqueue_into(batch: &mut fjall::OwnedWriteBatch, db: &DbRef, ts: u64, item: &ResyncItem) { 150 if item.retry_reason == "backfill" { 151 trace!( 152 did = item.did.as_str(), 153 ts, 154 reason = %item.retry_reason, 155 retry = item.retry_count, 156 "enqueue resync to batch" 157 ); 158 } else { 159 debug!( 160 did = item.did.as_str(), 161 ts, 162 reason = %item.retry_reason, 163 retry = item.retry_count, 164 "enqueue resync to batch" 165 ); 166 } 167 batch.insert(&db.ks, key(ts, &item.did), encode(item)); 168 db.stats.resync_queue_depth.fetch_add(1, Ordering::Relaxed); 169} 170 171/// Count the total number of entries currently in the resync queue. 172/// 173/// Performs a full prefix scan; use only for admin/diagnostic views. 174pub fn count_queued(db: &DbRef) -> usize { 175 db.ks.prefix(key_prefix_all()).count() 176} 177 178/// Enqueue a repo for resync at the given Unix timestamp (seconds). 179pub fn enqueue(db: &DbRef, ts: u64, item: &ResyncItem) -> StorageResult<()> { 180 let mut batch = db.database.batch(); 181 enqueue_into(&mut batch, db, ts, item); 182 batch.commit()?; 183 Ok(()) 184} 185 186/// Dequeue and return the next item whose timestamp is ≤ `now`. 187/// 188/// Removes the entry from the queue atomically before returning it. 189/// 190/// TODO: no, this is not atomic currently 191/// 192/// note: deleted accounts aren't removed from the resync queue so we need to 193/// check that (or does the caller deal with it?) 194/// 195/// `since`: we actually want to pass in a cursor so we can efficiently skip 196/// over tombstones. we don't have to persist the cursor to disk, but the caller 197/// can hold it in memory over the app's lifetime so we only pay the tomb scan 198/// cost once on startup. 199pub fn dequeue_ready( 200 db: &DbRef, 201 now: u64, 202 since: Option<Vec<u8>>, 203) -> StorageResult<Option<(ResyncItem, Vec<u8>)>> { 204 let prefix = key_prefix_all(); 205 206 let lower_suffix = since.unwrap_or(vec![]); 207 let upper_suffix = key_ts_midfix(now); 208 209 let Some(guard) = db 210 .ks 211 .range(prefixed_range(&prefix, lower_suffix..upper_suffix)) 212 .next() 213 else { 214 return Ok(None); 215 }; 216 217 let (key_slice, val_slice) = guard.into_inner()?; 218 let key_bytes = key_slice.as_ref(); 219 let (ts, did) = key_parse(key_bytes)?; 220 assert!(ts < now); 221 let key_str = String::from_utf8_lossy(key_bytes).into_owned(); 222 let item = decode(val_slice.as_ref(), &key_str, did)?; 223 debug!( 224 did = item.did.as_str(), 225 ts, 226 reason = %item.retry_reason, 227 retry = item.retry_count, 228 "dequeue resync" 229 ); 230 db.ks.remove(key_bytes)?; 231 let next_since = key_bytes 232 .get(prefix.len()..) 233 .expect("a resync queue key must start with the resync queue prefix"); 234 Ok(Some((item, next_since.to_vec()))) 235} 236 237/// Claim the next ready resync job, skipping DIDs that are currently in flight. 238/// 239/// Scans the queue for the oldest entry whose timestamp is `< now` and whose 240/// DID is not in `busy`. On finding one it atomically (fjall batch): 241/// - removes the entry from the resync queue, and 242/// - writes `state = Resyncing` for the repo (preserving its account status). 243/// 244/// Returns the claimed item and an updated `since` cursor (same semantics as 245/// [`dequeue_ready`]). Returns `None` if no claimable entry exists. 246pub fn claim_resync( 247 db: &DbRef, 248 now: u64, 249 since: Option<Vec<u8>>, 250 busy: &HashSet<Did<'_>>, 251) -> StorageResult<Option<(ResyncItem, Vec<u8>)>> { 252 let prefix = key_prefix_all(); 253 let lower_suffix = since.unwrap_or_default(); 254 let upper_suffix = key_ts_midfix(now); 255 256 for guard in db 257 .ks 258 .range(prefixed_range(&prefix, lower_suffix..upper_suffix)) 259 { 260 let (key_slice, val_slice) = guard.into_inner()?; 261 let key_bytes = key_slice.as_ref(); 262 let (_, did) = key_parse(key_bytes)?; 263 264 if busy.contains(&did) { 265 debug!(did = did.as_str(), "skip busy did in resync queue"); 266 continue; 267 } 268 269 let key_str = String::from_utf8_lossy(key_bytes).into_owned(); 270 let item = decode(val_slice.as_ref(), &key_str, did.clone())?; 271 let next_since = key_bytes[prefix.len()..].to_vec(); 272 273 // Read current repo info to preserve account status across the transition. 274 let repo_key = repo::key(&did); 275 let new_info = match db.ks.get(&repo_key)? { 276 Some(b) => { 277 let rk = String::from_utf8_lossy(&repo_key).into_owned(); 278 let mut info = repo::decode_repo_info(&b, &rk)?; 279 info.state = repo::RepoState::Resyncing; 280 info.error = None; 281 info 282 } 283 None => { 284 warn!( 285 did = did.as_str(), 286 "claiming resync job for did with no repo record; inserting as resyncing/active" 287 ); 288 repo::RepoInfo { 289 state: repo::RepoState::Resyncing, 290 status: repo::AccountStatus::Active, 291 error: None, 292 } 293 } 294 }; 295 296 // Atomically: remove from queue + write state=Resyncing. 297 let mut batch = db.database.batch(); 298 batch.remove(&db.ks, key_bytes); 299 batch.insert(&db.ks, &repo_key, repo::encode_repo_info(&new_info)); 300 batch.commit()?; 301 302 db.stats.resync_queue_depth.fetch_sub(1, Ordering::Relaxed); 303 trace!( 304 did = item.did.as_str(), 305 reason = %item.retry_reason, 306 retry = item.retry_count, 307 "claimed resync job" 308 ); 309 return Ok(Some((item, next_since))); 310 } 311 312 Ok(None) 313} 314 315// --------------------------------------------------------------------------- 316// Tests 317// --------------------------------------------------------------------------- 318 319#[cfg(test)] 320mod tests { 321 use std::collections::HashSet; 322 323 use super::*; 324 use crate::storage::{open_temporary, repo}; 325 326 fn did(s: &str) -> Did<'static> { 327 Did::new_owned(s).unwrap() 328 } 329 330 fn item(did_str: &str, retry_count: u16, reason: &str, cbor: &[u8]) -> ResyncItem { 331 ResyncItem { 332 did: did(did_str), 333 retry_count, 334 retry_reason: reason.to_owned(), 335 commit_cbor: cbor.to_vec(), 336 } 337 } 338 339 // --- key encoding --- 340 341 #[test] 342 fn key_structure() { 343 let k = key(0x0102030405060708, &did("did:web:example.com")); 344 let mut expected = b"rsq".to_vec(); 345 expected.extend_from_slice(&0x0102030405060708u64.to_be_bytes()); 346 expected.push(b'\0'); 347 expected.extend_from_slice(b"did:web:example.com"); 348 assert_eq!(k, expected); 349 } 350 351 #[test] 352 fn key_sorts_by_timestamp() { 353 let earlier = key(100, &did("did:web:example.com")); 354 let later = key(200, &did("did:web:example.com")); 355 assert!(earlier < later); 356 } 357 358 #[test] 359 fn key_same_timestamp_sorts_by_did() { 360 let a = key(100, &did("did:web:a.com")); 361 let b = key(100, &did("did:web:b.com")); 362 assert!(a < b); 363 } 364 365 #[test] 366 fn key_parse_roundtrips() { 367 let ts = 0xdeadbeefcafe1234u64; 368 let d = did("did:web:example.com"); 369 let k = key(ts, &d); 370 let (parsed_ts, parsed_did) = key_parse(&k).unwrap(); 371 assert_eq!(parsed_ts, ts); 372 assert_eq!(parsed_did, d); 373 } 374 375 #[test] 376 fn key_prefix_all_is_prefix_of_any_entry() { 377 let prefix_all = key_prefix_all(); 378 let k = key(100, &did("did:web:example.com")); 379 assert!(k.starts_with(&prefix_all)); 380 } 381 382 #[test] 383 fn key_ts_midfix_upper_bound_excludes_entries_at_that_ts() { 384 let prefix_all = key_prefix_all(); 385 let entry = key(100, &did("did:web:example.com")); 386 let mut upper = prefix_all.clone(); 387 upper.extend_from_slice(&key_ts_midfix(100)); 388 assert!(entry >= upper); 389 } 390 391 #[test] 392 fn key_ts_midfix_upper_bound_includes_earlier_ts() { 393 let prefix_all = key_prefix_all(); 394 let entry = key(99, &did("did:web:example.com")); 395 let mut upper = prefix_all.clone(); 396 upper.extend_from_slice(&key_ts_midfix(100)); 397 assert!(entry < upper); 398 } 399 400 #[test] 401 fn key_parse_returns_error_for_truncated_key() { 402 assert!(key_parse(b"rsq").is_err()); 403 } 404 405 // --- encode / decode --- 406 407 #[test] 408 fn encode_decode_roundtrips() { 409 let original = item("did:web:example.com", 3, "detected gap", &[0xAB, 0xCD]); 410 let bytes = encode(&original); 411 let decoded = decode(&bytes, "test-key", did("did:web:example.com")).unwrap(); 412 assert_eq!(decoded.retry_count, 3); 413 assert_eq!(decoded.retry_reason, "detected gap"); 414 assert_eq!(decoded.commit_cbor, vec![0xAB, 0xCD]); 415 } 416 417 #[test] 418 fn encode_decode_empty_commit_cbor() { 419 let original = item("did:web:example.com", 0, "first attempt", &[]); 420 let bytes = encode(&original); 421 let decoded = decode(&bytes, "test-key", did("did:web:example.com")).unwrap(); 422 assert_eq!(decoded.retry_count, 0); 423 assert_eq!(decoded.retry_reason, "first attempt"); 424 assert!(decoded.commit_cbor.is_empty()); 425 } 426 427 #[test] 428 fn decode_rejects_truncated_header() { 429 assert!(decode(&[0, 1, 2], "k", did("did:web:example.com")).is_err()); 430 } 431 432 // --- enqueue / dequeue_ready --- 433 434 #[test] 435 fn dequeue_returns_none_on_empty_queue() { 436 let db = open_temporary().unwrap(); 437 assert!(dequeue_ready(&db, 9999, None).unwrap().is_none()); 438 } 439 440 #[test] 441 fn enqueue_and_dequeue_basic() { 442 let db = open_temporary().unwrap(); 443 enqueue(&db, 100, &item("did:web:a.com", 0, "backfill", &[1, 2, 3])).unwrap(); 444 445 let (got, _cursor) = dequeue_ready(&db, 101, None).unwrap().unwrap(); 446 assert_eq!(got.did.as_str(), "did:web:a.com"); 447 assert_eq!(got.retry_reason, "backfill"); 448 assert_eq!(got.commit_cbor, vec![1, 2, 3]); 449 450 assert!(dequeue_ready(&db, 101, None).unwrap().is_none()); 451 } 452 453 #[test] 454 fn dequeue_excludes_items_at_or_after_now() { 455 let db = open_temporary().unwrap(); 456 enqueue(&db, 100, &item("did:web:a.com", 0, "test", &[])).unwrap(); 457 458 assert!(dequeue_ready(&db, 100, None).unwrap().is_none()); 459 assert!(dequeue_ready(&db, 99, None).unwrap().is_none()); 460 assert!(dequeue_ready(&db, 101, None).unwrap().is_some()); 461 } 462 463 #[test] 464 fn dequeue_returns_oldest_entry_first() { 465 let db = open_temporary().unwrap(); 466 enqueue(&db, 200, &item("did:web:b.com", 0, "later", &[])).unwrap(); 467 enqueue(&db, 100, &item("did:web:a.com", 0, "earlier", &[])).unwrap(); 468 469 let (first, _) = dequeue_ready(&db, 9999, None).unwrap().unwrap(); 470 assert_eq!(first.retry_reason, "earlier"); 471 472 let (second, _) = dequeue_ready(&db, 9999, None).unwrap().unwrap(); 473 assert_eq!(second.retry_reason, "later"); 474 } 475 476 #[test] 477 fn since_cursor_skips_over_tombstone_region() { 478 let db = open_temporary().unwrap(); 479 enqueue(&db, 10, &item("did:web:a.com", 0, "first", &[])).unwrap(); 480 enqueue(&db, 20, &item("did:web:b.com", 0, "second", &[])).unwrap(); 481 482 let (first, cursor) = dequeue_ready(&db, 9999, None).unwrap().unwrap(); 483 assert_eq!(first.retry_reason, "first"); 484 485 enqueue(&db, 5, &item("did:web:late.com", 0, "late", &[])).unwrap(); 486 487 let (second, _) = dequeue_ready(&db, 9999, Some(cursor)).unwrap().unwrap(); 488 assert_eq!(second.retry_reason, "second"); 489 } 490 491 // --- claim_resync --- 492 493 fn pending_repo(db: &DbRef, did_str: &str) { 494 repo::put_info( 495 db, 496 &did(did_str), 497 &repo::RepoInfo { 498 state: repo::RepoState::Pending, 499 status: repo::AccountStatus::Active, 500 error: None, 501 }, 502 ) 503 .unwrap(); 504 } 505 506 #[test] 507 fn claim_resync_transitions_state_and_dequeues() { 508 let db = open_temporary().unwrap(); 509 pending_repo(&db, "did:web:a.com"); 510 enqueue(&db, 100, &item("did:web:a.com", 0, "backfill", &[])).unwrap(); 511 512 let (claimed, _cursor) = claim_resync(&db, 101, None, &HashSet::new()) 513 .unwrap() 514 .unwrap(); 515 assert_eq!(claimed.did.as_str(), "did:web:a.com"); 516 517 assert!(dequeue_ready(&db, 9999, None).unwrap().is_none()); 518 519 let (info, _) = repo::get(&db, &did("did:web:a.com")).unwrap().unwrap(); 520 assert_eq!(info.state, repo::RepoState::Resyncing); 521 } 522 523 #[test] 524 fn claim_resync_preserves_account_status() { 525 let db = open_temporary().unwrap(); 526 repo::put_info( 527 &db, 528 &did("did:web:a.com"), 529 &repo::RepoInfo { 530 state: repo::RepoState::Pending, 531 status: repo::AccountStatus::Suspended, 532 error: None, 533 }, 534 ) 535 .unwrap(); 536 enqueue(&db, 100, &item("did:web:a.com", 0, "backfill", &[])).unwrap(); 537 538 claim_resync(&db, 101, None, &HashSet::new()) 539 .unwrap() 540 .unwrap(); 541 542 let (info, _) = repo::get(&db, &did("did:web:a.com")).unwrap().unwrap(); 543 assert_eq!(info.status, repo::AccountStatus::Suspended); 544 } 545 546 #[test] 547 fn claim_resync_skips_busy_dids() { 548 let db = open_temporary().unwrap(); 549 pending_repo(&db, "did:web:a.com"); 550 pending_repo(&db, "did:web:b.com"); 551 enqueue(&db, 100, &item("did:web:a.com", 0, "first", &[])).unwrap(); 552 enqueue(&db, 101, &item("did:web:b.com", 0, "second", &[])).unwrap(); 553 554 let mut busy: HashSet<Did<'static>> = HashSet::new(); 555 busy.insert(did("did:web:a.com")); 556 557 let (claimed, _) = claim_resync(&db, 9999, None, &busy).unwrap().unwrap(); 558 assert_eq!(claimed.did.as_str(), "did:web:b.com"); 559 } 560 561 #[test] 562 fn claim_resync_returns_none_when_all_ready_are_busy() { 563 let db = open_temporary().unwrap(); 564 pending_repo(&db, "did:web:a.com"); 565 enqueue(&db, 100, &item("did:web:a.com", 0, "only", &[])).unwrap(); 566 567 let mut busy: HashSet<Did<'static>> = HashSet::new(); 568 busy.insert(did("did:web:a.com")); 569 570 assert!(claim_resync(&db, 9999, None, &busy).unwrap().is_none()); 571 } 572 573 #[test] 574 fn consecutive_dequeues_drain_in_order() { 575 let db = open_temporary().unwrap(); 576 enqueue(&db, 10, &item("did:web:a.com", 0, "first", &[])).unwrap(); 577 enqueue(&db, 20, &item("did:web:b.com", 0, "second", &[])).unwrap(); 578 enqueue(&db, 30, &item("did:web:c.com", 0, "third", &[])).unwrap(); 579 580 let (a, _) = dequeue_ready(&db, 9999, None).unwrap().unwrap(); 581 let (b, _) = dequeue_ready(&db, 9999, None).unwrap().unwrap(); 582 let (c, _) = dequeue_ready(&db, 9999, None).unwrap().unwrap(); 583 assert!(dequeue_ready(&db, 9999, None).unwrap().is_none()); 584 585 assert_eq!(a.retry_reason, "first"); 586 assert_eq!(b.retry_reason, "second"); 587 assert_eq!(c.retry_reason, "third"); 588 } 589}