// lightweight resync queue — related XRPC: com.atproto.sync.listReposByCollection
1//! Timestamp-ordered resync queue.
2//!
3//! Keys: `"rsq"<ts_be:u64>\0<did>`
4//! Values: `[u16 BE retry_count][u16 BE reason_len][reason_bytes][commit_cbor_bytes]`
5
6use std::collections::HashSet;
7use std::sync::atomic::Ordering;
8
9use fjall::util::prefixed_range;
10use jacquard_common::types::string::Did;
11use tracing::{debug, trace, warn};
12
13use crate::storage::{
14 DbRef, PREFIX_RESYNC_QUEUE,
15 error::{StorageError, StorageResult},
16 repo,
17};
18
19const NUL: u8 = b'\0';
20
21// ---------------------------------------------------------------------------
22// Key encoding
23// ---------------------------------------------------------------------------
24
25/// `"rsq"<ts_be:u64>\0<did>` — timestamp-ordered resync queue.
26///
27/// Big-endian timestamp gives natural chronological ordering.
28fn key(ts: u64, did: &Did<'_>) -> Vec<u8> {
29 let d = did.as_str();
30 let mut k = Vec::with_capacity(PREFIX_RESYNC_QUEUE.len() + 8 + 1 + d.len());
31 k.extend_from_slice(&PREFIX_RESYNC_QUEUE);
32 k.extend_from_slice(&ts.to_be_bytes());
33 k.push(NUL);
34 k.extend_from_slice(d.as_bytes());
35 k
36}
37
38/// `"rsq"` — prefix for scanning the entire queue.
39fn key_prefix_all() -> Vec<u8> {
40 PREFIX_RESYNC_QUEUE.to_vec()
41}
42
43/// `<ts_be:u64>\0` — timestamp middle component, for use as an upper bound
44/// after concatenating with [`key_prefix_all`].
45fn key_ts_midfix(ts: u64) -> Vec<u8> {
46 let mut k = Vec::with_capacity(9);
47 k.extend_from_slice(&ts.to_be_bytes());
48 k.push(NUL);
49 k
50}
51
52/// Parse a timestamp and DID from a full resync queue key.
53fn key_parse(raw: &[u8]) -> StorageResult<(u64, Did<'static>)> {
54 let key_str = String::from_utf8_lossy(raw);
55 let rest = raw
56 .strip_prefix(&PREFIX_RESYNC_QUEUE)
57 .ok_or(StorageError::Corrupt {
58 key: key_str.to_string(),
59 reason: "wrong prefix for resync queue",
60 })?;
61 if rest.len() < 9 {
62 return Err(StorageError::Corrupt {
63 key: key_str.to_string(),
64 reason: "not enough suffix bytes for resync queue",
65 });
66 }
67 let ts_bytes: [u8; 8] = rest[..8].try_into().map_err(|_| StorageError::Corrupt {
68 key: key_str.to_string(),
69 reason: "not enough bytes for timestamp in resync queue",
70 })?;
71 let ts = u64::from_be_bytes(ts_bytes);
72 let rest = rest[8..]
73 .strip_prefix(&[NUL])
74 .ok_or(StorageError::Corrupt {
75 key: key_str.to_string(),
76 reason: "missing NUL separator in resync queue key",
77 })?;
78 let did_str = std::str::from_utf8(rest).map_err(|_| StorageError::Corrupt {
79 key: key_str.to_string(),
80 reason: "invalid UTF-8 for DID in resync queue",
81 })?;
82 let did = Did::new_owned(did_str).map_err(|_| StorageError::Corrupt {
83 key: key_str.to_string(),
84 reason: "invalid DID in resync queue",
85 })?;
86 Ok((ts, did))
87}
88
89// ---------------------------------------------------------------------------
90// Value encoding
91// ---------------------------------------------------------------------------
92
/// An item waiting in the resync queue.
#[derive(Debug, Clone)]
pub struct ResyncItem {
    /// Repo this resync job targets.
    pub did: Did<'static>,
    /// Number of prior attempts; stored on disk as a big-endian u16.
    pub retry_count: u16,
    /// Why the resync was scheduled (e.g. "backfill"); length-prefixed on disk.
    pub retry_reason: String,
    /// Raw CBOR of the triggering firehose commit.
    pub commit_cbor: Vec<u8>,
}
102
103fn encode(item: &ResyncItem) -> Vec<u8> {
104 let reason = item.retry_reason.as_bytes();
105 let mut v = Vec::with_capacity(2 + 2 + reason.len() + item.commit_cbor.len());
106 v.extend_from_slice(&item.retry_count.to_be_bytes());
107 v.extend_from_slice(&(reason.len() as u16).to_be_bytes());
108 v.extend_from_slice(reason);
109 v.extend_from_slice(&item.commit_cbor);
110 v
111}
112
113fn decode(bytes: &[u8], key_str: &str, did: Did<'static>) -> StorageResult<ResyncItem> {
114 if bytes.len() < 4 {
115 return Err(StorageError::Corrupt {
116 key: key_str.to_owned(),
117 reason: "value too short",
118 });
119 }
120 let retry_count = u16::from_be_bytes([bytes[0], bytes[1]]);
121 let reason_len = u16::from_be_bytes([bytes[2], bytes[3]]) as usize;
122 let rest = &bytes[4..];
123 if rest.len() < reason_len {
124 return Err(StorageError::Corrupt {
125 key: key_str.to_owned(),
126 reason: "reason truncated",
127 });
128 }
129 let retry_reason = std::str::from_utf8(&rest[..reason_len])
130 .map_err(|_| StorageError::Corrupt {
131 key: key_str.to_owned(),
132 reason: "reason not UTF-8",
133 })?
134 .to_owned();
135 let commit_cbor = rest[reason_len..].to_vec();
136 Ok(ResyncItem {
137 did,
138 retry_count,
139 retry_reason,
140 commit_cbor,
141 })
142}
143
144// ---------------------------------------------------------------------------
145// CRUD
146// ---------------------------------------------------------------------------
147
/// Append an enqueue operation for `item` at timestamp `ts` to `batch`.
///
/// The write only becomes visible once the caller commits the batch.
///
/// NOTE(review): the queue-depth gauge is incremented here, before the batch
/// commits — if a caller ever drops the batch uncommitted the gauge drifts
/// high. Confirm all callers commit.
pub fn enqueue_into(batch: &mut fjall::OwnedWriteBatch, db: &DbRef, ts: u64, item: &ResyncItem) {
    // "backfill" enqueues are logged at TRACE, everything else at DEBUG —
    // presumably because backfill enqueues arrive in bulk (TODO confirm).
    // The bodies are duplicated because tracing's trace!/debug! macros take
    // the level statically.
    if item.retry_reason == "backfill" {
        trace!(
            did = item.did.as_str(),
            ts,
            reason = %item.retry_reason,
            retry = item.retry_count,
            "enqueue resync to batch"
        );
    } else {
        debug!(
            did = item.did.as_str(),
            ts,
            reason = %item.retry_reason,
            retry = item.retry_count,
            "enqueue resync to batch"
        );
    }
    batch.insert(&db.ks, key(ts, &item.did), encode(item));
    db.stats.resync_queue_depth.fetch_add(1, Ordering::Relaxed);
}
170
171/// Count the total number of entries currently in the resync queue.
172///
173/// Performs a full prefix scan; use only for admin/diagnostic views.
174pub fn count_queued(db: &DbRef) -> usize {
175 db.ks.prefix(key_prefix_all()).count()
176}
177
178/// Enqueue a repo for resync at the given Unix timestamp (seconds).
179pub fn enqueue(db: &DbRef, ts: u64, item: &ResyncItem) -> StorageResult<()> {
180 let mut batch = db.database.batch();
181 enqueue_into(&mut batch, db, ts, item);
182 batch.commit()?;
183 Ok(())
184}
185
186/// Dequeue and return the next item whose timestamp is ≤ `now`.
187///
188/// Removes the entry from the queue atomically before returning it.
189///
190/// TODO: no, this is not atomic currently
191///
192/// note: deleted accounts aren't removed from the resync queue so we need to
193/// check that (or does the caller deal with it?)
194///
195/// `since`: we actually want to pass in a cursor so we can efficiently skip
196/// over tombstones. we don't have to persist the cursor to disk, but the caller
197/// can hold it in memory over the app's lifetime so we only pay the tomb scan
198/// cost once on startup.
199pub fn dequeue_ready(
200 db: &DbRef,
201 now: u64,
202 since: Option<Vec<u8>>,
203) -> StorageResult<Option<(ResyncItem, Vec<u8>)>> {
204 let prefix = key_prefix_all();
205
206 let lower_suffix = since.unwrap_or(vec![]);
207 let upper_suffix = key_ts_midfix(now);
208
209 let Some(guard) = db
210 .ks
211 .range(prefixed_range(&prefix, lower_suffix..upper_suffix))
212 .next()
213 else {
214 return Ok(None);
215 };
216
217 let (key_slice, val_slice) = guard.into_inner()?;
218 let key_bytes = key_slice.as_ref();
219 let (ts, did) = key_parse(key_bytes)?;
220 assert!(ts < now);
221 let key_str = String::from_utf8_lossy(key_bytes).into_owned();
222 let item = decode(val_slice.as_ref(), &key_str, did)?;
223 debug!(
224 did = item.did.as_str(),
225 ts,
226 reason = %item.retry_reason,
227 retry = item.retry_count,
228 "dequeue resync"
229 );
230 db.ks.remove(key_bytes)?;
231 let next_since = key_bytes
232 .get(prefix.len()..)
233 .expect("a resync queue key must start with the resync queue prefix");
234 Ok(Some((item, next_since.to_vec())))
235}
236
/// Claim the next ready resync job, skipping DIDs that are currently in flight.
///
/// Scans the queue for the oldest entry whose timestamp is `< now` and whose
/// DID is not in `busy`. On finding one it atomically (fjall batch):
/// - removes the entry from the resync queue, and
/// - writes `state = Resyncing` for the repo (preserving its account status).
///
/// Returns the claimed item and an updated `since` cursor (same semantics as
/// [`dequeue_ready`]). Returns `None` if no claimable entry exists.
///
/// NOTE(review): the repo-info read happens outside the batch, so a
/// concurrent writer could modify the repo record between the read and the
/// commit and have its change overwritten — confirm whether callers
/// serialize claims.
pub fn claim_resync(
    db: &DbRef,
    now: u64,
    since: Option<Vec<u8>>,
    busy: &HashSet<Did<'_>>,
) -> StorageResult<Option<(ResyncItem, Vec<u8>)>> {
    let prefix = key_prefix_all();
    let lower_suffix = since.unwrap_or_default();
    // Exclusive upper bound: `<ts_be(now)>\0` sorts before any key at `now`.
    let upper_suffix = key_ts_midfix(now);

    for guard in db
        .ks
        .range(prefixed_range(&prefix, lower_suffix..upper_suffix))
    {
        let (key_slice, val_slice) = guard.into_inner()?;
        let key_bytes = key_slice.as_ref();
        let (_, did) = key_parse(key_bytes)?;

        // Busy DIDs stay queued; a later scan will pick them up.
        if busy.contains(&did) {
            debug!(did = did.as_str(), "skip busy did in resync queue");
            continue;
        }

        let key_str = String::from_utf8_lossy(key_bytes).into_owned();
        let item = decode(val_slice.as_ref(), &key_str, did.clone())?;
        // Cursor = key suffix past the queue prefix, same shape as `since`.
        let next_since = key_bytes[prefix.len()..].to_vec();

        // Read current repo info to preserve account status across the transition.
        let repo_key = repo::key(&did);
        let new_info = match db.ks.get(&repo_key)? {
            Some(b) => {
                let rk = String::from_utf8_lossy(&repo_key).into_owned();
                let mut info = repo::decode_repo_info(&b, &rk)?;
                info.state = repo::RepoState::Resyncing;
                info.error = None;
                info
            }
            None => {
                // Queue entry with no repo record: log loudly and synthesize
                // a record so the claim can still proceed.
                warn!(
                    did = did.as_str(),
                    "claiming resync job for did with no repo record; inserting as resyncing/active"
                );
                repo::RepoInfo {
                    state: repo::RepoState::Resyncing,
                    status: repo::AccountStatus::Active,
                    error: None,
                }
            }
        };

        // Atomically: remove from queue + write state=Resyncing.
        let mut batch = db.database.batch();
        batch.remove(&db.ks, key_bytes);
        batch.insert(&db.ks, &repo_key, repo::encode_repo_info(&new_info));
        batch.commit()?;

        db.stats.resync_queue_depth.fetch_sub(1, Ordering::Relaxed);
        trace!(
            did = item.did.as_str(),
            reason = %item.retry_reason,
            retry = item.retry_count,
            "claimed resync job"
        );
        return Ok(Some((item, next_since)));
    }

    Ok(None)
}
314
315// ---------------------------------------------------------------------------
316// Tests
317// ---------------------------------------------------------------------------
318
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use super::*;
    use crate::storage::{open_temporary, repo};

    /// Test shorthand: parse a DID literal or panic.
    fn did(s: &str) -> Did<'static> {
        Did::new_owned(s).unwrap()
    }

    /// Test shorthand: build a [`ResyncItem`] from its parts.
    fn item(did_str: &str, retry_count: u16, reason: &str, cbor: &[u8]) -> ResyncItem {
        ResyncItem {
            did: did(did_str),
            retry_count,
            retry_reason: reason.to_owned(),
            commit_cbor: cbor.to_vec(),
        }
    }

    // --- key encoding ---

    #[test]
    fn key_structure() {
        // Pins the exact on-disk layout: "rsq" + ts_be + NUL + did.
        let k = key(0x0102030405060708, &did("did:web:example.com"));
        let mut expected = b"rsq".to_vec();
        expected.extend_from_slice(&0x0102030405060708u64.to_be_bytes());
        expected.push(b'\0');
        expected.extend_from_slice(b"did:web:example.com");
        assert_eq!(k, expected);
    }

    #[test]
    fn key_sorts_by_timestamp() {
        let earlier = key(100, &did("did:web:example.com"));
        let later = key(200, &did("did:web:example.com"));
        assert!(earlier < later);
    }

    #[test]
    fn key_same_timestamp_sorts_by_did() {
        let a = key(100, &did("did:web:a.com"));
        let b = key(100, &did("did:web:b.com"));
        assert!(a < b);
    }

    #[test]
    fn key_parse_roundtrips() {
        let ts = 0xdeadbeefcafe1234u64;
        let d = did("did:web:example.com");
        let k = key(ts, &d);
        let (parsed_ts, parsed_did) = key_parse(&k).unwrap();
        assert_eq!(parsed_ts, ts);
        assert_eq!(parsed_did, d);
    }

    #[test]
    fn key_prefix_all_is_prefix_of_any_entry() {
        let prefix_all = key_prefix_all();
        let k = key(100, &did("did:web:example.com"));
        assert!(k.starts_with(&prefix_all));
    }

    // The next two tests pin the exclusive-upper-bound semantics that
    // dequeue_ready/claim_resync rely on (`ts < now`, not `<=`).
    #[test]
    fn key_ts_midfix_upper_bound_excludes_entries_at_that_ts() {
        let prefix_all = key_prefix_all();
        let entry = key(100, &did("did:web:example.com"));
        let mut upper = prefix_all.clone();
        upper.extend_from_slice(&key_ts_midfix(100));
        assert!(entry >= upper);
    }

    #[test]
    fn key_ts_midfix_upper_bound_includes_earlier_ts() {
        let prefix_all = key_prefix_all();
        let entry = key(99, &did("did:web:example.com"));
        let mut upper = prefix_all.clone();
        upper.extend_from_slice(&key_ts_midfix(100));
        assert!(entry < upper);
    }

    #[test]
    fn key_parse_returns_error_for_truncated_key() {
        assert!(key_parse(b"rsq").is_err());
    }

    // --- encode / decode ---

    #[test]
    fn encode_decode_roundtrips() {
        let original = item("did:web:example.com", 3, "detected gap", &[0xAB, 0xCD]);
        let bytes = encode(&original);
        let decoded = decode(&bytes, "test-key", did("did:web:example.com")).unwrap();
        assert_eq!(decoded.retry_count, 3);
        assert_eq!(decoded.retry_reason, "detected gap");
        assert_eq!(decoded.commit_cbor, vec![0xAB, 0xCD]);
    }

    #[test]
    fn encode_decode_empty_commit_cbor() {
        let original = item("did:web:example.com", 0, "first attempt", &[]);
        let bytes = encode(&original);
        let decoded = decode(&bytes, "test-key", did("did:web:example.com")).unwrap();
        assert_eq!(decoded.retry_count, 0);
        assert_eq!(decoded.retry_reason, "first attempt");
        assert!(decoded.commit_cbor.is_empty());
    }

    #[test]
    fn decode_rejects_truncated_header() {
        // 3 bytes is shorter than the fixed 4-byte header.
        assert!(decode(&[0, 1, 2], "k", did("did:web:example.com")).is_err());
    }

    // --- enqueue / dequeue_ready ---

    #[test]
    fn dequeue_returns_none_on_empty_queue() {
        let db = open_temporary().unwrap();
        assert!(dequeue_ready(&db, 9999, None).unwrap().is_none());
    }

    #[test]
    fn enqueue_and_dequeue_basic() {
        let db = open_temporary().unwrap();
        enqueue(&db, 100, &item("did:web:a.com", 0, "backfill", &[1, 2, 3])).unwrap();

        let (got, _cursor) = dequeue_ready(&db, 101, None).unwrap().unwrap();
        assert_eq!(got.did.as_str(), "did:web:a.com");
        assert_eq!(got.retry_reason, "backfill");
        assert_eq!(got.commit_cbor, vec![1, 2, 3]);

        // Dequeue removes the entry, so a second call finds nothing.
        assert!(dequeue_ready(&db, 101, None).unwrap().is_none());
    }

    #[test]
    fn dequeue_excludes_items_at_or_after_now() {
        let db = open_temporary().unwrap();
        enqueue(&db, 100, &item("did:web:a.com", 0, "test", &[])).unwrap();

        // Strictly-less-than semantics: ts == now is not yet ready.
        assert!(dequeue_ready(&db, 100, None).unwrap().is_none());
        assert!(dequeue_ready(&db, 99, None).unwrap().is_none());
        assert!(dequeue_ready(&db, 101, None).unwrap().is_some());
    }

    #[test]
    fn dequeue_returns_oldest_entry_first() {
        let db = open_temporary().unwrap();
        enqueue(&db, 200, &item("did:web:b.com", 0, "later", &[])).unwrap();
        enqueue(&db, 100, &item("did:web:a.com", 0, "earlier", &[])).unwrap();

        let (first, _) = dequeue_ready(&db, 9999, None).unwrap().unwrap();
        assert_eq!(first.retry_reason, "earlier");

        let (second, _) = dequeue_ready(&db, 9999, None).unwrap().unwrap();
        assert_eq!(second.retry_reason, "later");
    }

    #[test]
    fn since_cursor_skips_over_tombstone_region() {
        let db = open_temporary().unwrap();
        enqueue(&db, 10, &item("did:web:a.com", 0, "first", &[])).unwrap();
        enqueue(&db, 20, &item("did:web:b.com", 0, "second", &[])).unwrap();

        let (first, cursor) = dequeue_ready(&db, 9999, None).unwrap().unwrap();
        assert_eq!(first.retry_reason, "first");

        // This entry sorts before the cursor, so a cursor-resumed dequeue
        // must not see it.
        enqueue(&db, 5, &item("did:web:late.com", 0, "late", &[])).unwrap();

        let (second, _) = dequeue_ready(&db, 9999, Some(cursor)).unwrap().unwrap();
        assert_eq!(second.retry_reason, "second");
    }

    // --- claim_resync ---

    /// Insert a Pending/Active repo record for `did_str`.
    fn pending_repo(db: &DbRef, did_str: &str) {
        repo::put_info(
            db,
            &did(did_str),
            &repo::RepoInfo {
                state: repo::RepoState::Pending,
                status: repo::AccountStatus::Active,
                error: None,
            },
        )
        .unwrap();
    }

    #[test]
    fn claim_resync_transitions_state_and_dequeues() {
        let db = open_temporary().unwrap();
        pending_repo(&db, "did:web:a.com");
        enqueue(&db, 100, &item("did:web:a.com", 0, "backfill", &[])).unwrap();

        let (claimed, _cursor) = claim_resync(&db, 101, None, &HashSet::new())
            .unwrap()
            .unwrap();
        assert_eq!(claimed.did.as_str(), "did:web:a.com");

        // Claiming removed the queue entry...
        assert!(dequeue_ready(&db, 9999, None).unwrap().is_none());

        // ...and flipped the repo state.
        let (info, _) = repo::get(&db, &did("did:web:a.com")).unwrap().unwrap();
        assert_eq!(info.state, repo::RepoState::Resyncing);
    }

    #[test]
    fn claim_resync_preserves_account_status() {
        let db = open_temporary().unwrap();
        repo::put_info(
            &db,
            &did("did:web:a.com"),
            &repo::RepoInfo {
                state: repo::RepoState::Pending,
                status: repo::AccountStatus::Suspended,
                error: None,
            },
        )
        .unwrap();
        enqueue(&db, 100, &item("did:web:a.com", 0, "backfill", &[])).unwrap();

        claim_resync(&db, 101, None, &HashSet::new())
            .unwrap()
            .unwrap();

        // Status must survive the Pending -> Resyncing transition.
        let (info, _) = repo::get(&db, &did("did:web:a.com")).unwrap().unwrap();
        assert_eq!(info.status, repo::AccountStatus::Suspended);
    }

    #[test]
    fn claim_resync_skips_busy_dids() {
        let db = open_temporary().unwrap();
        pending_repo(&db, "did:web:a.com");
        pending_repo(&db, "did:web:b.com");
        enqueue(&db, 100, &item("did:web:a.com", 0, "first", &[])).unwrap();
        enqueue(&db, 101, &item("did:web:b.com", 0, "second", &[])).unwrap();

        let mut busy: HashSet<Did<'static>> = HashSet::new();
        busy.insert(did("did:web:a.com"));

        let (claimed, _) = claim_resync(&db, 9999, None, &busy).unwrap().unwrap();
        assert_eq!(claimed.did.as_str(), "did:web:b.com");
    }

    #[test]
    fn claim_resync_returns_none_when_all_ready_are_busy() {
        let db = open_temporary().unwrap();
        pending_repo(&db, "did:web:a.com");
        enqueue(&db, 100, &item("did:web:a.com", 0, "only", &[])).unwrap();

        let mut busy: HashSet<Did<'static>> = HashSet::new();
        busy.insert(did("did:web:a.com"));

        assert!(claim_resync(&db, 9999, None, &busy).unwrap().is_none());
    }

    #[test]
    fn consecutive_dequeues_drain_in_order() {
        let db = open_temporary().unwrap();
        enqueue(&db, 10, &item("did:web:a.com", 0, "first", &[])).unwrap();
        enqueue(&db, 20, &item("did:web:b.com", 0, "second", &[])).unwrap();
        enqueue(&db, 30, &item("did:web:c.com", 0, "third", &[])).unwrap();

        let (a, _) = dequeue_ready(&db, 9999, None).unwrap().unwrap();
        let (b, _) = dequeue_ready(&db, 9999, None).unwrap().unwrap();
        let (c, _) = dequeue_ready(&db, 9999, None).unwrap().unwrap();
        assert!(dequeue_ready(&db, 9999, None).unwrap().is_none());

        assert_eq!(a.retry_reason, "first");
        assert_eq!(b.retry_reason, "second");
        assert_eq!(c.retry_reason, "third");
    }
}
588 }
589}