at protocol indexer with flexible filtering, xrpc queries, and a cursor-backed event stream, built on fjall
at-protocol atproto indexer rust fjall
at main 735 lines 30 kB view raw
1use crate::db::{self, keys}; 2use crate::filter::FilterMode; 3use crate::ingest::stream::{Commit, SubscribeReposMessage}; 4use crate::ingest::{BufferRx, IngestMessage}; 5use crate::ops; 6use crate::resolver::{NoSigningKeyError, ResolverError}; 7use crate::state::AppState; 8use crate::types::{AccountEvt, BroadcastEvent, GaugeState, IdentityEvt, RepoState, RepoStatus}; 9 10use crate::db::refcount::RefcountedBatch; 11 12use jacquard_common::IntoStatic; 13use jacquard_common::cowstr::ToCowStr; 14use jacquard_common::types::crypto::PublicKey; 15use jacquard_common::types::did::Did; 16use jacquard_repo::error::CommitError; 17use miette::{Context, Diagnostic, IntoDiagnostic, Result}; 18use rand::Rng; 19use smol_str::ToSmolStr; 20use std::collections::hash_map::DefaultHasher; 21use std::hash::{Hash, Hasher}; 22use std::sync::Arc; 23use thiserror::Error; 24use tokio::sync::mpsc; 25use tracing::{debug, error, info, trace, warn}; 26 27#[derive(Debug, Diagnostic, Error)] 28enum IngestError { 29 #[error("{0}")] 30 Generic(miette::Report), 31 32 #[error(transparent)] 33 #[diagnostic(transparent)] 34 Resolver(#[from] ResolverError), 35 36 #[error(transparent)] 37 #[diagnostic(transparent)] 38 Commit(#[from] CommitError), 39 40 #[error(transparent)] 41 #[diagnostic(transparent)] 42 NoSigningKey(#[from] NoSigningKeyError), 43} 44 45impl From<miette::Report> for IngestError { 46 fn from(report: miette::Report) -> Self { 47 IngestError::Generic(report) 48 } 49} 50 51#[derive(Debug)] 52enum RepoProcessResult<'s, 'c> { 53 Deleted, 54 Syncing(Option<&'c Commit<'c>>), 55 Ok(RepoState<'s>), 56} 57 58pub struct FirehoseWorker { 59 state: Arc<AppState>, 60 rx: BufferRx, 61 verify_signatures: bool, 62 ephemeral: bool, 63 num_shards: usize, 64} 65 66struct WorkerContext<'a> { 67 verify_signatures: bool, 68 ephemeral: bool, 69 state: &'a AppState, 70 batch: RefcountedBatch<'a>, 71 added_blocks: &'a mut i64, 72 records_delta: &'a mut i64, 73 broadcast_events: &'a mut Vec<BroadcastEvent>, 74 handle: &'a tokio::runtime::Handle, 75} 76 77impl FirehoseWorker { 78 pub fn new( 79 state: Arc<AppState>, 80 rx: BufferRx, 81 verify_signatures: bool, 82 ephemeral: bool, 83 num_shards: usize, 84 ) -> Self { 85 Self { 86 state, 87 rx, 88 verify_signatures, 89 ephemeral, 90 num_shards, 91 } 92 } 93 94 // starts the worker threads and the main dispatch loop 95 // the dispatch loop reads from the firehose channel and 96 // distributes messages to shards based on the hash of the DID 97 pub fn run(mut self, handle: tokio::runtime::Handle) -> Result<()> { 98 let mut shards = Vec::with_capacity(self.num_shards); 99 100 for i in 0..self.num_shards { 101 let (tx, rx) = mpsc::unbounded_channel(); 102 shards.push(tx); 103 104 let state = self.state.clone(); 105 let verify = self.verify_signatures; 106 let ephemeral = self.ephemeral; 107 let handle = handle.clone(); 108 109 std::thread::Builder::new() 110 .name(format!("ingest-shard-{i}")) 111 .spawn(move || { 112 Self::shard(i, rx, state, verify, ephemeral, handle); 113 }) 114 .into_diagnostic()?; 115 } 116 117 info!("started {} ingest shards", self.num_shards); 118 119 let _g = handle.enter(); 120 121 // dispatch loop 122 while let Some(msg) = self.rx.blocking_recv() { 123 let did = match &msg { 124 IngestMessage::Firehose(m) => match m { 125 SubscribeReposMessage::Commit(c) => &c.repo, 126 SubscribeReposMessage::Identity(i) => &i.did, 127 SubscribeReposMessage::Account(a) => &a.did, 128 SubscribeReposMessage::Sync(s) => &s.did, 129 _ => continue, 130 }, 131 IngestMessage::BackfillFinished(did) => did, 132 }; 133 134 let mut hasher = DefaultHasher::new(); 135 did.hash(&mut hasher); 136 let hash = hasher.finish(); 137 let shard_idx = (hash as usize) % self.num_shards; 138 139 if let Err(e) = shards[shard_idx].send(msg) { 140 error!(shard = shard_idx, err = %e, "failed to send message to shard"); 141 // break if send fails; receiver likely closed 142 break; 143 } 144 } 145 146 error!("firehose worker dispatcher shutting down"); 147 148 Ok(()) 149 } 150 151 #[inline(always)] 152 fn shard( 153 id: usize, 154 mut rx: mpsc::UnboundedReceiver<IngestMessage>, 155 state: Arc<AppState>, 156 verify_signatures: bool, 157 ephemeral: bool, 158 handle: tokio::runtime::Handle, 159 ) { 160 let _guard = handle.enter(); 161 debug!(shard = id, "shard started"); 162 163 let mut broadcast_events = Vec::new(); 164 165 while let Some(msg) = rx.blocking_recv() { 166 let batch = RefcountedBatch::new(&state.db); 167 broadcast_events.clear(); 168 169 let mut added_blocks = 0; 170 let mut records_delta = 0; 171 172 let mut ctx = WorkerContext { 173 state: &state, 174 batch, 175 added_blocks: &mut added_blocks, 176 records_delta: &mut records_delta, 177 broadcast_events: &mut broadcast_events, 178 handle: &handle, 179 verify_signatures, 180 ephemeral, 181 }; 182 183 match msg { 184 IngestMessage::BackfillFinished(did) => { 185 debug!(did = %did, "backfill finished, verifying state and draining buffer"); 186 187 // load repo state to transition status and draining buffer 188 let repo_key = keys::repo_key(&did); 189 if let Ok(Some(state_bytes)) = state.db.repos.get(&repo_key).into_diagnostic() { 190 match crate::db::deser_repo_state(&state_bytes) { 191 Ok(repo_state) => { 192 let repo_state = repo_state.into_static(); 193 194 match Self::drain_resync_buffer(&mut ctx, &did, repo_state) { 195 Ok(res) => match res { 196 RepoProcessResult::Ok(s) => { 197 // TODO: there might be a race condition here where we get a new commit 198 // while the resync buffer is being drained, we should handle that probably 199 // but also it should still be fine since we'll sync eventually anyway 200 let res = ops::update_repo_status( 201 ctx.batch.batch_mut(), 202 &state.db, 203 &did, 204 s, 205 RepoStatus::Synced, 206 ); 207 if let Err(e) = res { 208 // this can only fail if serde retry fails which would be really weird 209 error!(did = %did, err = %e, "failed to transition to synced"); 210 } 211 } 212 // we don't have to handle this since drain_resync_buffer doesn't delete 213 // the commits from the resync buffer so they will get retried later 214 RepoProcessResult::Syncing(_) => {} 215 RepoProcessResult::Deleted => {} 216 }, 217 Err(e) => { 218 error!(did = %did, err = %e, "failed to drain resync buffer") 219 } 220 }; 221 } 222 Err(e) => error!(did = %did, err = %e, "failed to deser repo state"), 223 } 224 } 225 } 226 IngestMessage::Firehose(msg) => { 227 let (did, seq) = match &msg { 228 SubscribeReposMessage::Commit(c) => (&c.repo, c.seq), 229 SubscribeReposMessage::Identity(i) => (&i.did, i.seq), 230 SubscribeReposMessage::Account(a) => (&a.did, a.seq), 231 SubscribeReposMessage::Sync(s) => (&s.did, s.seq), 232 _ => continue, 233 }; 234 235 match Self::process_message(&mut ctx, &msg, did) { 236 Ok(RepoProcessResult::Ok(_)) => {} 237 Ok(RepoProcessResult::Deleted) => {} 238 Ok(RepoProcessResult::Syncing(Some(commit))) => { 239 if let Err(e) = ops::persist_to_resync_buffer(&state.db, did, commit) { 240 error!(did = %did, err = %e, "failed to persist commit to resync_buffer"); 241 } 242 } 243 Ok(RepoProcessResult::Syncing(None)) => {} 244 Err(e) => { 245 if let IngestError::Generic(e) = &e { 246 db::check_poisoned_report(e); 247 } 248 error!(did = %did, err = %e, "error processing message"); 249 if Self::check_if_retriable_failure(&e) { 250 if let SubscribeReposMessage::Commit(commit) = &msg { 251 if let Err(e) = 252 ops::persist_to_resync_buffer(&state.db, did, commit) 253 { 254 error!( 255 did = %did, err = %e, 256 "failed to persist commit to resync_buffer" 257 ); 258 } 259 } 260 } 261 } 262 } 263 264 state 265 .cur_firehose 266 .store(seq, std::sync::atomic::Ordering::SeqCst); 267 } 268 } 269 270 if let Err(e) = ctx.batch.commit() { 271 error!(shard = id, err = %e, "failed to commit batch"); 272 } 273 274 if added_blocks > 0 { 275 state.db.update_count("blocks", added_blocks); 276 } 277 if records_delta != 0 { 278 state.db.update_count("records", records_delta); 279 } 280 for evt in broadcast_events.drain(..) { 281 let _ = state.db.event_tx.send(evt); 282 } 283 284 state.db.inner.persist(fjall::PersistMode::Buffer).ok(); 285 } 286 } 287 288 // dont retry commit or sync on key fetch errors 289 // since we'll just try again later if we get commit or sync again 290 fn check_if_retriable_failure(e: &IngestError) -> bool { 291 matches!( 292 e, 293 IngestError::Generic(_) 294 | IngestError::Resolver(ResolverError::Ratelimited) 295 | IngestError::Resolver(ResolverError::Transport(_)) 296 ) 297 } 298 299 fn process_message<'s, 'c>( 300 ctx: &mut WorkerContext, 301 msg: &'c SubscribeReposMessage<'static>, 302 did: &Did, 303 ) -> Result<RepoProcessResult<'s, 'c>, IngestError> { 304 let check_repo_res = Self::check_repo_state(ctx, did, msg)?; 305 let mut repo_state = match check_repo_res { 306 RepoProcessResult::Syncing(_) | RepoProcessResult::Deleted => { 307 return Ok(check_repo_res); 308 } 309 RepoProcessResult::Ok(s) => s, 310 }; 311 312 match msg { 313 SubscribeReposMessage::Commit(commit) => { 314 trace!(did = %did, "processing buffered commit"); 315 316 return Self::process_commit(ctx, did, repo_state, commit); 317 } 318 SubscribeReposMessage::Sync(sync) => { 319 debug!(did = %did, "processing buffered sync"); 320 321 Self::refresh_doc(ctx, &mut repo_state, did)?; 322 323 match ops::verify_sync_event( 324 sync.blocks.as_ref(), 325 Self::fetch_key(ctx, did)?.as_ref(), 326 ) { 327 Ok((root, rev)) => { 328 if let Some(current_data) = &repo_state.data { 329 if current_data == &root.to_ipld().expect("valid cid") { 330 debug!(did = %did, "skipping noop sync"); 331 return Ok(RepoProcessResult::Ok(repo_state)); 332 } 333 } 334 335 if let Some(current_rev) = &repo_state.rev { 336 if rev.as_str() <= current_rev.to_tid().as_str() { 337 debug!(did = %did, "skipping replayed sync"); 338 return Ok(RepoProcessResult::Ok(repo_state)); 339 } 340 } 341 342 warn!(did = %did, "sync event, triggering backfill"); 343 let mut batch = ctx.state.db.inner.batch(); 344 repo_state = ops::update_repo_status( 345 &mut batch, 346 &ctx.state.db, 347 did, 348 repo_state, 349 RepoStatus::Backfilling, 350 )?; 351 batch.commit().into_diagnostic()?; 352 ctx.state 353 .db 354 .update_gauge_diff(&GaugeState::Synced, &GaugeState::Pending); 355 ctx.state.notify_backfill(); 356 return Ok(RepoProcessResult::Ok(repo_state)); 357 } 358 Err(e) => { 359 error!(did = %did, err = %e, "failed to process sync event"); 360 } 361 } 362 } 363 SubscribeReposMessage::Identity(identity) => { 364 debug!(did = %did, "processing buffered identity"); 365 366 if identity.handle.is_none() { 367 // we invalidate only if no handle is sent since its like a 368 // "invalidate your caches" message then basically 369 ctx.state.resolver.invalidate_sync(did); 370 let doc = ctx.handle.block_on(ctx.state.resolver.resolve_doc(did))?; 371 repo_state.update_from_doc(doc); 372 } 373 374 let handle = identity.handle.as_ref().map(|h| h.clone()); 375 repo_state.handle = handle.or(repo_state.handle); 376 ctx.batch.batch_mut().insert( 377 &ctx.state.db.repos, 378 keys::repo_key(did), 379 crate::db::ser_repo_state(&repo_state)?, 380 ); 381 382 let evt = IdentityEvt { 383 did: did.clone().into_static(), 384 handle: repo_state.handle.clone(), 385 }; 386 ctx.broadcast_events 387 .push(ops::make_identity_event(&ctx.state.db, evt)); 388 } 389 SubscribeReposMessage::Account(account) => { 390 debug!(did = %did, "processing buffered account"); 391 let evt = AccountEvt { 392 did: did.clone().into_static(), 393 active: account.active, 394 status: account.status.as_ref().map(|s| s.to_cowstr().into_static()), 395 }; 396 397 Self::refresh_doc(ctx, &mut repo_state, did)?; 398 399 if !account.active { 400 use crate::ingest::stream::AccountStatus; 401 match &account.status { 402 Some(AccountStatus::Deleted) => { 403 debug!(did = %did, "account deleted, wiping data"); 404 crate::ops::delete_repo( 405 &mut ctx.batch, 406 &ctx.state.db, 407 did, 408 &repo_state, 409 )?; 410 return Ok(RepoProcessResult::Deleted); 411 } 412 status => { 413 let target_status = match status { 414 Some(status) => match status { 415 AccountStatus::Deleted => { 416 unreachable!("deleted account status is handled before") 417 } 418 AccountStatus::Takendown => RepoStatus::Takendown, 419 AccountStatus::Suspended => RepoStatus::Suspended, 420 AccountStatus::Deactivated => RepoStatus::Deactivated, 421 AccountStatus::Throttled => { 422 RepoStatus::Error("throttled".into()) 423 } 424 AccountStatus::Desynchronized => { 425 RepoStatus::Error("desynchronized".into()) 426 } 427 AccountStatus::Other(s) => { 428 warn!( 429 did = %did, status = %s, 430 "unknown account status, will put in error state" 431 ); 432 RepoStatus::Error(s.to_smolstr()) 433 } 434 }, 435 None => { 436 warn!(did = %did, "account inactive but no status provided"); 437 RepoStatus::Error("unknown".into()) 438 } 439 }; 440 441 if repo_state.status == target_status { 442 debug!(did = %did, ?target_status, "account status unchanged"); 443 return Ok(RepoProcessResult::Ok(repo_state)); 444 } 445 446 repo_state = ops::update_repo_status( 447 ctx.batch.batch_mut(), 448 &ctx.state.db, 449 did, 450 repo_state, 451 target_status, 452 )?; 453 ctx.state 454 .db 455 .update_gauge_diff(&GaugeState::Synced, &GaugeState::Resync(None)); 456 } 457 } 458 } else { 459 // normally we would initiate backfill here 460 // but we don't have to do anything because: 461 // 1. we handle changing repo status to Synced before this (in check repo state) 462 // 2. initiating backfilling is also handled there 463 } 464 ctx.broadcast_events 465 .push(ops::make_account_event(&ctx.state.db, evt)); 466 } 467 _ => { 468 warn!(did = %did, "unknown message type in buffer"); 469 } 470 } 471 472 Ok(RepoProcessResult::Ok(repo_state)) 473 } 474 475 fn process_commit<'c, 'ns, 's: 'ns>( 476 ctx: &mut WorkerContext, 477 did: &Did, 478 repo_state: RepoState<'s>, 479 commit: &'c Commit<'c>, 480 ) -> Result<RepoProcessResult<'ns, 'c>, IngestError> { 481 // check for replayed events (already seen revision) 482 if matches!(repo_state.rev, Some(ref rev) if commit.rev.as_str() <= rev.to_tid().as_str()) { 483 debug!( 484 did = %did, 485 commit_rev = %commit.rev, 486 state_rev = %repo_state.rev.as_ref().map(|r| r.to_tid()).expect("we checked in if"), 487 "skipping replayed event" 488 ); 489 return Ok(RepoProcessResult::Ok(repo_state)); 490 } 491 492 if let (Some(repo), Some(prev_commit)) = (&repo_state.data, &commit.prev_data) 493 && repo 494 != &prev_commit 495 .0 496 .to_ipld() 497 .into_diagnostic() 498 .wrap_err("invalid cid from relay")? 499 { 500 warn!( 501 did = %did, 502 repo = %repo, 503 prev_commit = %prev_commit.0, 504 "gap detected, triggering backfill" 505 ); 506 507 let mut batch = ctx.state.db.inner.batch(); 508 let _repo_state = ops::update_repo_status( 509 &mut batch, 510 &ctx.state.db, 511 did, 512 repo_state, 513 RepoStatus::Backfilling, 514 )?; 515 batch.commit().into_diagnostic()?; 516 ctx.state.db.update_gauge_diff( 517 &crate::types::GaugeState::Synced, 518 &crate::types::GaugeState::Pending, 519 ); 520 ctx.state.notify_backfill(); 521 return Ok(RepoProcessResult::Syncing(Some(commit))); 522 } 523 524 let signing_key = Self::fetch_key(ctx, did)?; 525 let res = ops::apply_commit( 526 &mut ctx.batch, 527 &ctx.state.db, 528 repo_state, 529 &commit, 530 signing_key.as_ref(), 531 &ctx.state.filter.load(), 532 ctx.ephemeral, 533 )?; 534 let repo_state = res.repo_state; 535 *ctx.added_blocks += res.blocks_count; 536 *ctx.records_delta += res.records_delta; 537 ctx.broadcast_events.push(BroadcastEvent::Persisted( 538 ctx.state 539 .db 540 .next_event_id 541 .load(std::sync::atomic::Ordering::SeqCst) 542 - 1, 543 )); 544 545 Ok(RepoProcessResult::Ok(repo_state)) 546 } 547 548 // checks the current state of the repo in the database 549 // if the repo is new, creates initial state and triggers backfill 550 // handles transitions between states (backfilling -> synced, etc) 551 fn check_repo_state<'s, 'c>( 552 ctx: &mut WorkerContext, 553 did: &Did<'_>, 554 msg: &'c SubscribeReposMessage<'static>, 555 ) -> Result<RepoProcessResult<'s, 'c>, IngestError> { 556 let repo_key = keys::repo_key(&did); 557 let Some(state_bytes) = ctx.state.db.repos.get(&repo_key).into_diagnostic()? else { 558 let filter = ctx.state.filter.load(); 559 560 if filter.mode == FilterMode::Filter && !filter.signals.is_empty() { 561 let commit = match msg { 562 SubscribeReposMessage::Commit(c) => c, 563 _ => return Ok(RepoProcessResult::Syncing(None)), 564 }; 565 let touches_signal = commit.ops.iter().any(|op| { 566 op.path 567 .split_once('/') 568 .map(|(col, _)| { 569 let m = filter.matches_signal(col); 570 debug!( 571 did = %did, path = %op.path, col = %col, signals = ?filter.signals, matched = m, 572 "signal check" 573 ); 574 m 575 }) 576 .unwrap_or(false) 577 }); 578 if !touches_signal { 579 trace!(did = %did, "dropping commit, no signal-matching ops"); 580 return Ok(RepoProcessResult::Syncing(None)); 581 } 582 } 583 584 debug!(did = %did, "discovered new account from firehose, queueing backfill"); 585 586 let repo_state = RepoState::untracked(rand::rng().next_u64()); 587 let mut batch = ctx.state.db.inner.batch(); 588 batch.insert( 589 &ctx.state.db.repos, 590 &repo_key, 591 crate::db::ser_repo_state(&repo_state)?, 592 ); 593 batch.insert( 594 &ctx.state.db.pending, 595 keys::pending_key(repo_state.index_id), 596 &repo_key, 597 ); 598 batch.commit().into_diagnostic()?; 599 600 ctx.state.db.update_count("repos", 1); 601 ctx.state.db.update_gauge_diff( 602 &crate::types::GaugeState::Synced, 603 &crate::types::GaugeState::Pending, 604 ); 605 606 ctx.state.notify_backfill(); 607 608 return Ok(RepoProcessResult::Syncing(None)); 609 }; 610 let mut repo_state = crate::db::deser_repo_state(&state_bytes)?.into_static(); 611 612 if !repo_state.tracked && repo_state.status != RepoStatus::Backfilling { 613 trace!(did = %did, "ignoring active status as it is explicitly untracked"); 614 return Ok(RepoProcessResult::Syncing(None)); 615 } 616 617 // if we are backfilling or it is new, DON'T mark it as synced yet 618 // the backfill worker will do that when it finishes 619 match &repo_state.status { 620 RepoStatus::Synced => { 621 // lazy drain: if there are buffered commits, drain them now 622 if ops::has_buffered_commits(&ctx.state.db, did) { 623 Self::drain_resync_buffer(ctx, did, repo_state) 624 } else { 625 Ok(RepoProcessResult::Ok(repo_state)) 626 } 627 } 628 RepoStatus::Backfilling | RepoStatus::Error(_) => { 629 debug!( 630 did = %did, status = ?repo_state.status, 631 "ignoring active status" 632 ); 633 Ok(RepoProcessResult::Syncing(None)) 634 } 635 RepoStatus::Deactivated | RepoStatus::Suspended | RepoStatus::Takendown => { 636 // if it was in deactivated/takendown/suspended state, we can mark it as synced 637 // because we are receiving live events now 638 // UNLESS it is an account status event that keeps it deactivated 639 if let SubscribeReposMessage::Account(acc) = msg { 640 if !acc.active { 641 return Ok(RepoProcessResult::Ok(repo_state)); 642 } 643 } 644 repo_state = ops::update_repo_status( 645 ctx.batch.batch_mut(), 646 &ctx.state.db, 647 did, 648 repo_state, 649 RepoStatus::Synced, 650 )?; 651 ctx.state.db.update_gauge_diff( 652 &crate::types::GaugeState::Resync(None), 653 &crate::types::GaugeState::Synced, 654 ); 655 Ok(RepoProcessResult::Ok(repo_state)) 656 } 657 } 658 } 659 660 fn drain_resync_buffer<'s>( 661 ctx: &mut WorkerContext, 662 did: &Did, 663 mut repo_state: RepoState<'s>, 664 ) -> Result<RepoProcessResult<'s, 'static>, IngestError> { 665 let prefix = keys::resync_buffer_prefix(did); 666 667 for guard in ctx.state.db.resync_buffer.prefix(&prefix) { 668 let (key, value) = guard.into_inner().into_diagnostic()?; 669 let commit: Commit = rmp_serde::from_slice(&value).into_diagnostic()?; 670 671 let res = Self::process_commit(ctx, did, repo_state, &commit); 672 let res = match res { 673 Ok(r) => r, 674 Err(e) => { 675 if !Self::check_if_retriable_failure(&e) { 676 ctx.batch 677 .batch_mut() 678 .remove(&ctx.state.db.resync_buffer, key); 679 } 680 return Err(e); 681 } 682 }; 683 match res { 684 RepoProcessResult::Ok(rs) => { 685 ctx.batch 686 .batch_mut() 687 .remove(&ctx.state.db.resync_buffer, key); 688 repo_state = rs; 689 } 690 RepoProcessResult::Syncing(_) => { 691 return Ok(RepoProcessResult::Syncing(None)); 692 } 693 RepoProcessResult::Deleted => { 694 ctx.batch 695 .batch_mut() 696 .remove(&ctx.state.db.resync_buffer, key); 697 return Ok(RepoProcessResult::Deleted); 698 } 699 } 700 } 701 702 Ok(RepoProcessResult::Ok(repo_state)) 703 } 704 705 // refreshes the handle, pds url and signing key of a did 706 fn refresh_doc( 707 ctx: &mut WorkerContext, 708 repo_state: &mut RepoState, 709 did: &Did, 710 ) -> Result<(), IngestError> { 711 ctx.state.resolver.invalidate_sync(did); 712 let doc = ctx.handle.block_on(ctx.state.resolver.resolve_doc(did))?; 713 repo_state.update_from_doc(doc); 714 ctx.batch.batch_mut().insert( 715 &ctx.state.db.repos, 716 keys::repo_key(did), 717 crate::db::ser_repo_state(&repo_state)?, 718 ); 719 Ok(()) 720 } 721 722 fn fetch_key( 723 ctx: &WorkerContext, 724 did: &Did, 725 ) -> Result<Option<PublicKey<'static>>, IngestError> { 726 if ctx.verify_signatures { 727 let key = ctx 728 .handle 729 .block_on(ctx.state.resolver.resolve_signing_key(did))?; 730 Ok(Some(key)) 731 } else { 732 Ok(None) 733 } 734 } 735}