AT Protocol indexer with flexible filtering, XRPC queries, and a cursor-backed event stream, built on fjall
at-protocol
atproto
indexer
rust
fjall
1use crate::db::{self, keys};
2use crate::filter::FilterMode;
3use crate::ingest::stream::{Commit, SubscribeReposMessage};
4use crate::ingest::{BufferRx, IngestMessage};
5use crate::ops;
6use crate::resolver::{NoSigningKeyError, ResolverError};
7use crate::state::AppState;
8use crate::types::{AccountEvt, BroadcastEvent, GaugeState, IdentityEvt, RepoState, RepoStatus};
9
10use crate::db::refcount::RefcountedBatch;
11
12use jacquard_common::IntoStatic;
13use jacquard_common::cowstr::ToCowStr;
14use jacquard_common::types::crypto::PublicKey;
15use jacquard_common::types::did::Did;
16use jacquard_repo::error::CommitError;
17use miette::{Context, Diagnostic, IntoDiagnostic, Result};
18use rand::Rng;
19use smol_str::ToSmolStr;
20use std::collections::hash_map::DefaultHasher;
21use std::hash::{Hash, Hasher};
22use std::sync::Arc;
23use thiserror::Error;
24use tokio::sync::mpsc;
25use tracing::{debug, error, info, trace, warn};
26
/// Errors that can occur while ingesting firehose messages.
///
/// Domain errors (resolver, commit verification, missing signing key) are
/// wrapped transparently so their diagnostics pass through unchanged; the
/// `Generic` variant is a catch-all for ad-hoc `miette` reports.
#[derive(Debug, Diagnostic, Error)]
enum IngestError {
    /// Ad-hoc error converted from a `miette::Report`.
    #[error("{0}")]
    Generic(miette::Report),

    /// DID document / signing key resolution failed (includes rate-limit
    /// and transport failures, which are treated as retriable).
    #[error(transparent)]
    #[diagnostic(transparent)]
    Resolver(#[from] ResolverError),

    /// Commit verification or application failed.
    #[error(transparent)]
    #[diagnostic(transparent)]
    Commit(#[from] CommitError),

    /// No signing key was available for signature verification.
    #[error(transparent)]
    #[diagnostic(transparent)]
    NoSigningKey(#[from] NoSigningKeyError),
}
44
45impl From<miette::Report> for IngestError {
46 fn from(report: miette::Report) -> Self {
47 IngestError::Generic(report)
48 }
49}
50
/// Outcome of processing one message (or buffered commit) for a repo.
#[derive(Debug)]
enum RepoProcessResult<'s, 'c> {
    /// The repo's account was deleted and its data wiped.
    Deleted,
    /// The repo is not in a live-processable state (new, backfilling,
    /// untracked, or filtered out); when a commit is carried, the caller
    /// should park it in the resync buffer for later replay.
    Syncing(Option<&'c Commit<'c>>),
    /// Message applied; carries the (possibly updated) repo state.
    Ok(RepoState<'s>),
}
57
/// Ingest worker that consumes firehose messages and fans them out to
/// per-DID shard threads so events for one repo are processed in order.
pub struct FirehoseWorker {
    // shared application state (db, resolver, filter, backfill notifier)
    state: Arc<AppState>,
    // buffered receiver of ingest messages from the firehose consumer
    rx: BufferRx,
    // whether to resolve signing keys and verify commit/sync signatures
    verify_signatures: bool,
    // forwarded to ops::apply_commit; presumably skips durable record
    // storage — NOTE(review): confirm exact semantics in ops
    ephemeral: bool,
    // number of shard threads; DIDs are hash-partitioned across them
    num_shards: usize,
}
65
/// Mutable context threaded through the processing helpers for a single
/// message handled by a shard; accumulators are flushed after the batch
/// commit in `FirehoseWorker::shard`.
struct WorkerContext<'a> {
    // whether commit/sync signatures should be verified
    verify_signatures: bool,
    // forwarded to ops::apply_commit (see FirehoseWorker.ephemeral)
    ephemeral: bool,
    // shared application state
    state: &'a AppState,
    // write batch collecting all db mutations for this message
    batch: RefcountedBatch<'a>,
    // net blocks added by apply_commit, flushed to the "blocks" counter
    added_blocks: &'a mut i64,
    // net record-count change, flushed to the "records" counter
    records_delta: &'a mut i64,
    // events to broadcast to subscribers after the batch commits
    broadcast_events: &'a mut Vec<BroadcastEvent>,
    // tokio runtime handle used to block_on async resolver calls
    handle: &'a tokio::runtime::Handle,
}
76
77impl FirehoseWorker {
78 pub fn new(
79 state: Arc<AppState>,
80 rx: BufferRx,
81 verify_signatures: bool,
82 ephemeral: bool,
83 num_shards: usize,
84 ) -> Self {
85 Self {
86 state,
87 rx,
88 verify_signatures,
89 ephemeral,
90 num_shards,
91 }
92 }
93
94 // starts the worker threads and the main dispatch loop
95 // the dispatch loop reads from the firehose channel and
96 // distributes messages to shards based on the hash of the DID
97 pub fn run(mut self, handle: tokio::runtime::Handle) -> Result<()> {
98 let mut shards = Vec::with_capacity(self.num_shards);
99
100 for i in 0..self.num_shards {
101 let (tx, rx) = mpsc::unbounded_channel();
102 shards.push(tx);
103
104 let state = self.state.clone();
105 let verify = self.verify_signatures;
106 let ephemeral = self.ephemeral;
107 let handle = handle.clone();
108
109 std::thread::Builder::new()
110 .name(format!("ingest-shard-{i}"))
111 .spawn(move || {
112 Self::shard(i, rx, state, verify, ephemeral, handle);
113 })
114 .into_diagnostic()?;
115 }
116
117 info!("started {} ingest shards", self.num_shards);
118
119 let _g = handle.enter();
120
121 // dispatch loop
122 while let Some(msg) = self.rx.blocking_recv() {
123 let did = match &msg {
124 IngestMessage::Firehose(m) => match m {
125 SubscribeReposMessage::Commit(c) => &c.repo,
126 SubscribeReposMessage::Identity(i) => &i.did,
127 SubscribeReposMessage::Account(a) => &a.did,
128 SubscribeReposMessage::Sync(s) => &s.did,
129 _ => continue,
130 },
131 IngestMessage::BackfillFinished(did) => did,
132 };
133
134 let mut hasher = DefaultHasher::new();
135 did.hash(&mut hasher);
136 let hash = hasher.finish();
137 let shard_idx = (hash as usize) % self.num_shards;
138
139 if let Err(e) = shards[shard_idx].send(msg) {
140 error!(shard = shard_idx, err = %e, "failed to send message to shard");
141 // break if send fails; receiver likely closed
142 break;
143 }
144 }
145
146 error!("firehose worker dispatcher shutting down");
147
148 Ok(())
149 }
150
151 #[inline(always)]
152 fn shard(
153 id: usize,
154 mut rx: mpsc::UnboundedReceiver<IngestMessage>,
155 state: Arc<AppState>,
156 verify_signatures: bool,
157 ephemeral: bool,
158 handle: tokio::runtime::Handle,
159 ) {
160 let _guard = handle.enter();
161 debug!(shard = id, "shard started");
162
163 let mut broadcast_events = Vec::new();
164
165 while let Some(msg) = rx.blocking_recv() {
166 let batch = RefcountedBatch::new(&state.db);
167 broadcast_events.clear();
168
169 let mut added_blocks = 0;
170 let mut records_delta = 0;
171
172 let mut ctx = WorkerContext {
173 state: &state,
174 batch,
175 added_blocks: &mut added_blocks,
176 records_delta: &mut records_delta,
177 broadcast_events: &mut broadcast_events,
178 handle: &handle,
179 verify_signatures,
180 ephemeral,
181 };
182
183 match msg {
184 IngestMessage::BackfillFinished(did) => {
185 debug!(did = %did, "backfill finished, verifying state and draining buffer");
186
187 // load repo state to transition status and draining buffer
188 let repo_key = keys::repo_key(&did);
189 if let Ok(Some(state_bytes)) = state.db.repos.get(&repo_key).into_diagnostic() {
190 match crate::db::deser_repo_state(&state_bytes) {
191 Ok(repo_state) => {
192 let repo_state = repo_state.into_static();
193
194 match Self::drain_resync_buffer(&mut ctx, &did, repo_state) {
195 Ok(res) => match res {
196 RepoProcessResult::Ok(s) => {
197 // TODO: there might be a race condition here where we get a new commit
198 // while the resync buffer is being drained, we should handle that probably
199 // but also it should still be fine since we'll sync eventually anyway
200 let res = ops::update_repo_status(
201 ctx.batch.batch_mut(),
202 &state.db,
203 &did,
204 s,
205 RepoStatus::Synced,
206 );
207 if let Err(e) = res {
208 // this can only fail if serde retry fails which would be really weird
209 error!(did = %did, err = %e, "failed to transition to synced");
210 }
211 }
212 // we don't have to handle this since drain_resync_buffer doesn't delete
213 // the commits from the resync buffer so they will get retried later
214 RepoProcessResult::Syncing(_) => {}
215 RepoProcessResult::Deleted => {}
216 },
217 Err(e) => {
218 error!(did = %did, err = %e, "failed to drain resync buffer")
219 }
220 };
221 }
222 Err(e) => error!(did = %did, err = %e, "failed to deser repo state"),
223 }
224 }
225 }
226 IngestMessage::Firehose(msg) => {
227 let (did, seq) = match &msg {
228 SubscribeReposMessage::Commit(c) => (&c.repo, c.seq),
229 SubscribeReposMessage::Identity(i) => (&i.did, i.seq),
230 SubscribeReposMessage::Account(a) => (&a.did, a.seq),
231 SubscribeReposMessage::Sync(s) => (&s.did, s.seq),
232 _ => continue,
233 };
234
235 match Self::process_message(&mut ctx, &msg, did) {
236 Ok(RepoProcessResult::Ok(_)) => {}
237 Ok(RepoProcessResult::Deleted) => {}
238 Ok(RepoProcessResult::Syncing(Some(commit))) => {
239 if let Err(e) = ops::persist_to_resync_buffer(&state.db, did, commit) {
240 error!(did = %did, err = %e, "failed to persist commit to resync_buffer");
241 }
242 }
243 Ok(RepoProcessResult::Syncing(None)) => {}
244 Err(e) => {
245 if let IngestError::Generic(e) = &e {
246 db::check_poisoned_report(e);
247 }
248 error!(did = %did, err = %e, "error processing message");
249 if Self::check_if_retriable_failure(&e) {
250 if let SubscribeReposMessage::Commit(commit) = &msg {
251 if let Err(e) =
252 ops::persist_to_resync_buffer(&state.db, did, commit)
253 {
254 error!(
255 did = %did, err = %e,
256 "failed to persist commit to resync_buffer"
257 );
258 }
259 }
260 }
261 }
262 }
263
264 state
265 .cur_firehose
266 .store(seq, std::sync::atomic::Ordering::SeqCst);
267 }
268 }
269
270 if let Err(e) = ctx.batch.commit() {
271 error!(shard = id, err = %e, "failed to commit batch");
272 }
273
274 if added_blocks > 0 {
275 state.db.update_count("blocks", added_blocks);
276 }
277 if records_delta != 0 {
278 state.db.update_count("records", records_delta);
279 }
280 for evt in broadcast_events.drain(..) {
281 let _ = state.db.event_tx.send(evt);
282 }
283
284 state.db.inner.persist(fjall::PersistMode::Buffer).ok();
285 }
286 }
287
288 // dont retry commit or sync on key fetch errors
289 // since we'll just try again later if we get commit or sync again
290 fn check_if_retriable_failure(e: &IngestError) -> bool {
291 matches!(
292 e,
293 IngestError::Generic(_)
294 | IngestError::Resolver(ResolverError::Ratelimited)
295 | IngestError::Resolver(ResolverError::Transport(_))
296 )
297 }
298
299 fn process_message<'s, 'c>(
300 ctx: &mut WorkerContext,
301 msg: &'c SubscribeReposMessage<'static>,
302 did: &Did,
303 ) -> Result<RepoProcessResult<'s, 'c>, IngestError> {
304 let check_repo_res = Self::check_repo_state(ctx, did, msg)?;
305 let mut repo_state = match check_repo_res {
306 RepoProcessResult::Syncing(_) | RepoProcessResult::Deleted => {
307 return Ok(check_repo_res);
308 }
309 RepoProcessResult::Ok(s) => s,
310 };
311
312 match msg {
313 SubscribeReposMessage::Commit(commit) => {
314 trace!(did = %did, "processing buffered commit");
315
316 return Self::process_commit(ctx, did, repo_state, commit);
317 }
318 SubscribeReposMessage::Sync(sync) => {
319 debug!(did = %did, "processing buffered sync");
320
321 Self::refresh_doc(ctx, &mut repo_state, did)?;
322
323 match ops::verify_sync_event(
324 sync.blocks.as_ref(),
325 Self::fetch_key(ctx, did)?.as_ref(),
326 ) {
327 Ok((root, rev)) => {
328 if let Some(current_data) = &repo_state.data {
329 if current_data == &root.to_ipld().expect("valid cid") {
330 debug!(did = %did, "skipping noop sync");
331 return Ok(RepoProcessResult::Ok(repo_state));
332 }
333 }
334
335 if let Some(current_rev) = &repo_state.rev {
336 if rev.as_str() <= current_rev.to_tid().as_str() {
337 debug!(did = %did, "skipping replayed sync");
338 return Ok(RepoProcessResult::Ok(repo_state));
339 }
340 }
341
342 warn!(did = %did, "sync event, triggering backfill");
343 let mut batch = ctx.state.db.inner.batch();
344 repo_state = ops::update_repo_status(
345 &mut batch,
346 &ctx.state.db,
347 did,
348 repo_state,
349 RepoStatus::Backfilling,
350 )?;
351 batch.commit().into_diagnostic()?;
352 ctx.state
353 .db
354 .update_gauge_diff(&GaugeState::Synced, &GaugeState::Pending);
355 ctx.state.notify_backfill();
356 return Ok(RepoProcessResult::Ok(repo_state));
357 }
358 Err(e) => {
359 error!(did = %did, err = %e, "failed to process sync event");
360 }
361 }
362 }
363 SubscribeReposMessage::Identity(identity) => {
364 debug!(did = %did, "processing buffered identity");
365
366 if identity.handle.is_none() {
367 // we invalidate only if no handle is sent since its like a
368 // "invalidate your caches" message then basically
369 ctx.state.resolver.invalidate_sync(did);
370 let doc = ctx.handle.block_on(ctx.state.resolver.resolve_doc(did))?;
371 repo_state.update_from_doc(doc);
372 }
373
374 let handle = identity.handle.as_ref().map(|h| h.clone());
375 repo_state.handle = handle.or(repo_state.handle);
376 ctx.batch.batch_mut().insert(
377 &ctx.state.db.repos,
378 keys::repo_key(did),
379 crate::db::ser_repo_state(&repo_state)?,
380 );
381
382 let evt = IdentityEvt {
383 did: did.clone().into_static(),
384 handle: repo_state.handle.clone(),
385 };
386 ctx.broadcast_events
387 .push(ops::make_identity_event(&ctx.state.db, evt));
388 }
389 SubscribeReposMessage::Account(account) => {
390 debug!(did = %did, "processing buffered account");
391 let evt = AccountEvt {
392 did: did.clone().into_static(),
393 active: account.active,
394 status: account.status.as_ref().map(|s| s.to_cowstr().into_static()),
395 };
396
397 Self::refresh_doc(ctx, &mut repo_state, did)?;
398
399 if !account.active {
400 use crate::ingest::stream::AccountStatus;
401 match &account.status {
402 Some(AccountStatus::Deleted) => {
403 debug!(did = %did, "account deleted, wiping data");
404 crate::ops::delete_repo(
405 &mut ctx.batch,
406 &ctx.state.db,
407 did,
408 &repo_state,
409 )?;
410 return Ok(RepoProcessResult::Deleted);
411 }
412 status => {
413 let target_status = match status {
414 Some(status) => match status {
415 AccountStatus::Deleted => {
416 unreachable!("deleted account status is handled before")
417 }
418 AccountStatus::Takendown => RepoStatus::Takendown,
419 AccountStatus::Suspended => RepoStatus::Suspended,
420 AccountStatus::Deactivated => RepoStatus::Deactivated,
421 AccountStatus::Throttled => {
422 RepoStatus::Error("throttled".into())
423 }
424 AccountStatus::Desynchronized => {
425 RepoStatus::Error("desynchronized".into())
426 }
427 AccountStatus::Other(s) => {
428 warn!(
429 did = %did, status = %s,
430 "unknown account status, will put in error state"
431 );
432 RepoStatus::Error(s.to_smolstr())
433 }
434 },
435 None => {
436 warn!(did = %did, "account inactive but no status provided");
437 RepoStatus::Error("unknown".into())
438 }
439 };
440
441 if repo_state.status == target_status {
442 debug!(did = %did, ?target_status, "account status unchanged");
443 return Ok(RepoProcessResult::Ok(repo_state));
444 }
445
446 repo_state = ops::update_repo_status(
447 ctx.batch.batch_mut(),
448 &ctx.state.db,
449 did,
450 repo_state,
451 target_status,
452 )?;
453 ctx.state
454 .db
455 .update_gauge_diff(&GaugeState::Synced, &GaugeState::Resync(None));
456 }
457 }
458 } else {
459 // normally we would initiate backfill here
460 // but we don't have to do anything because:
461 // 1. we handle changing repo status to Synced before this (in check repo state)
462 // 2. initiating backfilling is also handled there
463 }
464 ctx.broadcast_events
465 .push(ops::make_account_event(&ctx.state.db, evt));
466 }
467 _ => {
468 warn!(did = %did, "unknown message type in buffer");
469 }
470 }
471
472 Ok(RepoProcessResult::Ok(repo_state))
473 }
474
475 fn process_commit<'c, 'ns, 's: 'ns>(
476 ctx: &mut WorkerContext,
477 did: &Did,
478 repo_state: RepoState<'s>,
479 commit: &'c Commit<'c>,
480 ) -> Result<RepoProcessResult<'ns, 'c>, IngestError> {
481 // check for replayed events (already seen revision)
482 if matches!(repo_state.rev, Some(ref rev) if commit.rev.as_str() <= rev.to_tid().as_str()) {
483 debug!(
484 did = %did,
485 commit_rev = %commit.rev,
486 state_rev = %repo_state.rev.as_ref().map(|r| r.to_tid()).expect("we checked in if"),
487 "skipping replayed event"
488 );
489 return Ok(RepoProcessResult::Ok(repo_state));
490 }
491
492 if let (Some(repo), Some(prev_commit)) = (&repo_state.data, &commit.prev_data)
493 && repo
494 != &prev_commit
495 .0
496 .to_ipld()
497 .into_diagnostic()
498 .wrap_err("invalid cid from relay")?
499 {
500 warn!(
501 did = %did,
502 repo = %repo,
503 prev_commit = %prev_commit.0,
504 "gap detected, triggering backfill"
505 );
506
507 let mut batch = ctx.state.db.inner.batch();
508 let _repo_state = ops::update_repo_status(
509 &mut batch,
510 &ctx.state.db,
511 did,
512 repo_state,
513 RepoStatus::Backfilling,
514 )?;
515 batch.commit().into_diagnostic()?;
516 ctx.state.db.update_gauge_diff(
517 &crate::types::GaugeState::Synced,
518 &crate::types::GaugeState::Pending,
519 );
520 ctx.state.notify_backfill();
521 return Ok(RepoProcessResult::Syncing(Some(commit)));
522 }
523
524 let signing_key = Self::fetch_key(ctx, did)?;
525 let res = ops::apply_commit(
526 &mut ctx.batch,
527 &ctx.state.db,
528 repo_state,
529 &commit,
530 signing_key.as_ref(),
531 &ctx.state.filter.load(),
532 ctx.ephemeral,
533 )?;
534 let repo_state = res.repo_state;
535 *ctx.added_blocks += res.blocks_count;
536 *ctx.records_delta += res.records_delta;
537 ctx.broadcast_events.push(BroadcastEvent::Persisted(
538 ctx.state
539 .db
540 .next_event_id
541 .load(std::sync::atomic::Ordering::SeqCst)
542 - 1,
543 ));
544
545 Ok(RepoProcessResult::Ok(repo_state))
546 }
547
    // checks the current state of the repo in the database
    // if the repo is new, creates initial state and triggers backfill
    // handles transitions between states (backfilling -> synced, etc)
    //
    // returns Ok(RepoProcessResult::Ok(..)) when the caller should process
    // the message live, and Syncing(None) when the message should be dropped
    // (unknown/filtered/untracked repo or an in-progress backfill)
    fn check_repo_state<'s, 'c>(
        ctx: &mut WorkerContext,
        did: &Did<'_>,
        msg: &'c SubscribeReposMessage<'static>,
    ) -> Result<RepoProcessResult<'s, 'c>, IngestError> {
        let repo_key = keys::repo_key(&did);
        let Some(state_bytes) = ctx.state.db.repos.get(&repo_key).into_diagnostic()? else {
            // repo is unknown to us; decide whether it's worth tracking
            let filter = ctx.state.filter.load();

            if filter.mode == FilterMode::Filter && !filter.signals.is_empty() {
                // in signal-filter mode, only start tracking a new repo when a
                // commit touches at least one signal collection; non-commit
                // messages for unknown repos are dropped outright
                let commit = match msg {
                    SubscribeReposMessage::Commit(c) => c,
                    _ => return Ok(RepoProcessResult::Syncing(None)),
                };
                let touches_signal = commit.ops.iter().any(|op| {
                    // op.path is "<collection>/<rkey>"; match on the collection
                    op.path
                        .split_once('/')
                        .map(|(col, _)| {
                            let m = filter.matches_signal(col);
                            debug!(
                                did = %did, path = %op.path, col = %col, signals = ?filter.signals, matched = m,
                                "signal check"
                            );
                            m
                        })
                        .unwrap_or(false)
                });
                if !touches_signal {
                    trace!(did = %did, "dropping commit, no signal-matching ops");
                    return Ok(RepoProcessResult::Syncing(None));
                }
            }

            debug!(did = %did, "discovered new account from firehose, queueing backfill");

            // seed initial state with a random index id and queue it for backfill
            let repo_state = RepoState::untracked(rand::rng().next_u64());
            let mut batch = ctx.state.db.inner.batch();
            batch.insert(
                &ctx.state.db.repos,
                &repo_key,
                crate::db::ser_repo_state(&repo_state)?,
            );
            batch.insert(
                &ctx.state.db.pending,
                keys::pending_key(repo_state.index_id),
                &repo_key,
            );
            batch.commit().into_diagnostic()?;

            ctx.state.db.update_count("repos", 1);
            ctx.state.db.update_gauge_diff(
                &crate::types::GaugeState::Synced,
                &crate::types::GaugeState::Pending,
            );

            ctx.state.notify_backfill();

            return Ok(RepoProcessResult::Syncing(None));
        };
        let mut repo_state = crate::db::deser_repo_state(&state_bytes)?.into_static();

        if !repo_state.tracked && repo_state.status != RepoStatus::Backfilling {
            trace!(did = %did, "ignoring active status as it is explicitly untracked");
            return Ok(RepoProcessResult::Syncing(None));
        }

        // if we are backfilling or it is new, DON'T mark it as synced yet
        // the backfill worker will do that when it finishes
        match &repo_state.status {
            RepoStatus::Synced => {
                // lazy drain: if there are buffered commits, drain them now
                if ops::has_buffered_commits(&ctx.state.db, did) {
                    Self::drain_resync_buffer(ctx, did, repo_state)
                } else {
                    Ok(RepoProcessResult::Ok(repo_state))
                }
            }
            RepoStatus::Backfilling | RepoStatus::Error(_) => {
                debug!(
                    did = %did, status = ?repo_state.status,
                    "ignoring active status"
                );
                Ok(RepoProcessResult::Syncing(None))
            }
            RepoStatus::Deactivated | RepoStatus::Suspended | RepoStatus::Takendown => {
                // if it was in deactivated/takendown/suspended state, we can mark it as synced
                // because we are receiving live events now
                // UNLESS it is an account status event that keeps it deactivated
                if let SubscribeReposMessage::Account(acc) = msg {
                    if !acc.active {
                        return Ok(RepoProcessResult::Ok(repo_state));
                    }
                }
                repo_state = ops::update_repo_status(
                    ctx.batch.batch_mut(),
                    &ctx.state.db,
                    did,
                    repo_state,
                    RepoStatus::Synced,
                )?;
                ctx.state.db.update_gauge_diff(
                    &crate::types::GaugeState::Resync(None),
                    &crate::types::GaugeState::Synced,
                );
                Ok(RepoProcessResult::Ok(repo_state))
            }
        }
    }
659
    // replays commits parked in the resync buffer for `did`
    //
    // successfully applied (or deleted) entries are removed via the shared
    // batch; entries that fail with a retriable error stay buffered so a
    // later drain can retry them, and the error is propagated to the caller
    fn drain_resync_buffer<'s>(
        ctx: &mut WorkerContext,
        did: &Did,
        mut repo_state: RepoState<'s>,
    ) -> Result<RepoProcessResult<'s, 'static>, IngestError> {
        let prefix = keys::resync_buffer_prefix(did);

        for guard in ctx.state.db.resync_buffer.prefix(&prefix) {
            let (key, value) = guard.into_inner().into_diagnostic()?;
            // buffered entries are msgpack-encoded commits
            let commit: Commit = rmp_serde::from_slice(&value).into_diagnostic()?;

            let res = Self::process_commit(ctx, did, repo_state, &commit);
            let res = match res {
                Ok(r) => r,
                Err(e) => {
                    // non-retriable failures are dropped from the buffer so we
                    // don't wedge forever on a permanently broken commit
                    if !Self::check_if_retriable_failure(&e) {
                        ctx.batch
                            .batch_mut()
                            .remove(&ctx.state.db.resync_buffer, key);
                    }
                    return Err(e);
                }
            };
            match res {
                RepoProcessResult::Ok(rs) => {
                    // applied: remove from buffer and carry the state forward
                    ctx.batch
                        .batch_mut()
                        .remove(&ctx.state.db.resync_buffer, key);
                    repo_state = rs;
                }
                RepoProcessResult::Syncing(_) => {
                    // repo went back into backfill; leave remaining entries buffered
                    return Ok(RepoProcessResult::Syncing(None));
                }
                RepoProcessResult::Deleted => {
                    ctx.batch
                        .batch_mut()
                        .remove(&ctx.state.db.resync_buffer, key);
                    return Ok(RepoProcessResult::Deleted);
                }
            }
        }

        Ok(RepoProcessResult::Ok(repo_state))
    }
704
    // refreshes the handle, pds url and signing key of a did
    // by re-resolving the DID document and staging the updated repo state
    // into the shared batch (nothing is durable until the batch commits)
    fn refresh_doc(
        ctx: &mut WorkerContext,
        repo_state: &mut RepoState,
        did: &Did,
    ) -> Result<(), IngestError> {
        // drop any cached resolution first so we fetch a fresh document
        ctx.state.resolver.invalidate_sync(did);
        let doc = ctx.handle.block_on(ctx.state.resolver.resolve_doc(did))?;
        repo_state.update_from_doc(doc);
        ctx.batch.batch_mut().insert(
            &ctx.state.db.repos,
            keys::repo_key(did),
            crate::db::ser_repo_state(&repo_state)?,
        );
        Ok(())
    }
721
722 fn fetch_key(
723 ctx: &WorkerContext,
724 did: &Did,
725 ) -> Result<Option<PublicKey<'static>>, IngestError> {
726 if ctx.verify_signatures {
727 let key = ctx
728 .handle
729 .block_on(ctx.state.resolver.resolve_signing_key(did))?;
730 Ok(Some(key))
731 } else {
732 Ok(None)
733 }
734 }
735}