AT Protocol indexer with flexible filtering, XRPC queries, and a cursor-backed event stream, built on fjall
at-protocol
atproto
indexer
rust
fjall
1use fjall::OwnedWriteBatch;
2use fjall::Slice;
3
4use crate::db::refcount::RefcountedBatch;
5use jacquard_common::CowStr;
6use jacquard_common::IntoStatic;
7use jacquard_common::types::cid::Cid;
8use jacquard_common::types::crypto::PublicKey;
9use jacquard_common::types::did::Did;
10use jacquard_repo::car::reader::parse_car_bytes;
11use miette::{Context, IntoDiagnostic, Result};
12use rand::{Rng, rng};
13use std::collections::HashMap;
14use std::sync::atomic::Ordering;
15use std::time::Instant;
16use tracing::{debug, trace};
17
18use crate::db::types::{DbAction, DbRkey, DbTid, TrimmedDid};
19use crate::db::{self, Db, keys, ser_repo_state};
20use crate::filter::FilterConfig;
21use crate::ingest::stream::Commit;
22use crate::types::{
23 AccountEvt, BroadcastEvent, IdentityEvt, MarshallableEvt, RepoState, RepoStatus, ResyncState,
24 StoredEvent,
25};
26
27pub fn persist_to_resync_buffer(db: &Db, did: &Did, commit: &Commit) -> Result<()> {
28 let key = keys::resync_buffer_key(did, DbTid::from(&commit.rev));
29 let value = rmp_serde::to_vec(commit).into_diagnostic()?;
30 db.resync_buffer.insert(key, value).into_diagnostic()?;
31 debug!(
32 did = %did,
33 seq = commit.seq,
34 "buffered commit to resync_buffer"
35 );
36 Ok(())
37}
38
39pub fn has_buffered_commits(db: &Db, did: &Did) -> bool {
40 let prefix = keys::resync_buffer_prefix(did);
41 db.resync_buffer.prefix(&prefix).next().is_some()
42}
43
44// emitting identity is ephemeral
45// we dont replay these, consumers can just fetch identity themselves if they need it
46pub fn make_identity_event(db: &Db, evt: IdentityEvt<'static>) -> BroadcastEvent {
47 let event_id = db.next_event_id.fetch_add(1, Ordering::SeqCst);
48 let marshallable = MarshallableEvt {
49 id: event_id,
50 event_type: "identity".into(),
51 record: None,
52 identity: Some(evt),
53 account: None,
54 };
55 BroadcastEvent::Ephemeral(Box::new(marshallable))
56}
57
58pub fn make_account_event(db: &Db, evt: AccountEvt<'static>) -> BroadcastEvent {
59 let event_id = db.next_event_id.fetch_add(1, Ordering::SeqCst);
60 let marshallable = MarshallableEvt {
61 id: event_id,
62 event_type: "account".into(),
63 record: None,
64 identity: None,
65 account: Some(evt),
66 };
67 BroadcastEvent::Ephemeral(Box::new(marshallable))
68}
69
70pub fn delete_repo(
71 batch: &mut RefcountedBatch<'_>,
72 db: &Db,
73 did: &Did,
74 repo_state: &RepoState,
75) -> Result<()> {
76 debug!(did = %did, "deleting repo");
77
78 let repo_key = keys::repo_key(did);
79 let pending_key = keys::pending_key(repo_state.index_id);
80
81 // 1. delete from repos, pending, resync
82 batch.batch_mut().remove(&db.repos, &repo_key);
83 match repo_state.status {
84 RepoStatus::Synced => {}
85 RepoStatus::Backfilling => {
86 batch.batch_mut().remove(&db.pending, &pending_key);
87 }
88 _ => {
89 batch.batch_mut().remove(&db.resync, &repo_key);
90 }
91 }
92
93 // 2. delete from resync buffer
94 let resync_prefix = keys::resync_buffer_prefix(did);
95 for guard in db.resync_buffer.prefix(&resync_prefix) {
96 let k = guard.key().into_diagnostic()?;
97 batch.batch_mut().remove(&db.resync_buffer, k);
98 }
99
100 // 3. delete from records
101 let records_prefix = keys::record_prefix_did(did);
102 for guard in db.records.prefix(&records_prefix) {
103 let (k, cid_bytes) = guard.into_inner().into_diagnostic()?;
104 batch.update_block_refcount(cid_bytes, -1)?;
105 batch.batch_mut().remove(&db.records, k);
106 }
107
108 // 4. reset collection counts
109 let mut count_prefix = Vec::new();
110 count_prefix.push(b'r');
111 count_prefix.push(keys::SEP);
112 TrimmedDid::from(did).write_to_vec(&mut count_prefix);
113 count_prefix.push(keys::SEP);
114
115 for guard in db.counts.prefix(&count_prefix) {
116 let k = guard.key().into_diagnostic()?;
117 batch.batch_mut().remove(&db.counts, k);
118 }
119
120 Ok(())
121}
122
/// Transition a repo to `new_status`, maintaining the pending/resync queue
/// entries that mirror the status, and persist the updated state row.
///
/// Queue invariants (as reflected by the match arms below):
/// - `pending` is keyed by `repo_state.index_id` and holds repos waiting for
///   backfill; `resync` is keyed by the repo key and holds errored/gone repos.
/// - A repo in `resync` must pass through `Backfilling` (pending) before it
///   can become `Synced` again.
///
/// Returns the updated `RepoState` (with a fresh `index_id` when re-queued
/// for backfill). Errors if serializing resync state or repo state fails.
pub fn update_repo_status<'batch, 's>(
    batch: &'batch mut OwnedWriteBatch,
    db: &Db,
    did: &Did,
    mut repo_state: RepoState<'s>,
    new_status: RepoStatus,
) -> Result<RepoState<'s>> {
    debug!(did = %did, status = ?new_status, "updating repo status");

    let repo_key = keys::repo_key(did);
    // pending entry for the *current* index_id; removed below on transition
    let pending_key = keys::pending_key(repo_state.index_id);

    // manage queues
    match &new_status {
        RepoStatus::Synced => {
            batch.remove(&db.pending, &pending_key);
            // we dont have to remove from resync here because it has to transition resync -> pending first
        }
        RepoStatus::Backfilling => {
            // if we are coming from an error state, remove from resync
            if !matches!(repo_state.status, RepoStatus::Synced) {
                batch.remove(&db.resync, &repo_key);
            }
            // remove the old entry
            batch.remove(&db.pending, &pending_key);
            // add as new entry — a fresh random index_id re-shuffles the
            // repo's position in the pending queue
            repo_state.index_id = rng().next_u64();
            batch.insert(
                &db.pending,
                keys::pending_key(repo_state.index_id),
                &repo_key,
            );
        }
        RepoStatus::Error(_msg) => {
            batch.remove(&db.pending, &pending_key);
            // TODO: we need to make errors have kind instead of "message" in repo status
            // and then pass it to resync error kind
            // NOTE(review): retry_count restarts at 0 here even if the repo was
            // already in an error state — confirm that reset is intentional
            let resync_state = crate::types::ResyncState::Error {
                kind: crate::types::ResyncErrorKind::Generic,
                retry_count: 0,
                next_retry: chrono::Utc::now().timestamp(),
            };
            batch.insert(
                &db.resync,
                &repo_key,
                rmp_serde::to_vec(&resync_state).into_diagnostic()?,
            );
        }
        RepoStatus::Deactivated | RepoStatus::Takendown | RepoStatus::Suspended => {
            // this shouldnt be needed since a repo wont be in a pending state when it gets to any of these states
            // batch.remove(&db.pending, &pending_key);
            let resync_state = ResyncState::Gone {
                status: new_status.clone(),
            };
            batch.insert(
                &db.resync,
                &repo_key,
                rmp_serde::to_vec(&resync_state).into_diagnostic()?,
            );
        }
    }

    repo_state.status = new_status;
    repo_state.last_updated_at = chrono::Utc::now().timestamp();

    batch.insert(&db.repos, &repo_key, ser_repo_state(&repo_state)?);

    Ok(repo_state)
}
192
193pub fn verify_sync_event(blocks: &[u8], key: Option<&PublicKey>) -> Result<(Cid<'static>, String)> {
194 let parsed = tokio::task::block_in_place(|| {
195 tokio::runtime::Handle::current()
196 .block_on(parse_car_bytes(blocks))
197 .into_diagnostic()
198 })?;
199
200 let root_bytes = parsed
201 .blocks
202 .get(&parsed.root)
203 .ok_or_else(|| miette::miette!("root block missing from CAR"))?;
204
205 let repo_commit = jacquard_repo::commit::Commit::from_cbor(root_bytes).into_diagnostic()?;
206
207 if let Some(key) = key {
208 repo_commit
209 .verify(key)
210 .map_err(|e| miette::miette!("signature verification failed: {e}"))?;
211 }
212
213 Ok((
214 Cid::ipld(repo_commit.data).into_static(),
215 repo_commit.rev.to_string(),
216 ))
217}
218
/// Outcome of applying a single commit via `apply_commit`.
pub struct ApplyCommitResults<'s> {
    /// Repo state updated with the commit's rev, data CID, and timestamp.
    pub repo_state: RepoState<'s>,
    /// Net change in indexed record count (creates minus deletes).
    pub records_delta: i64,
    /// Number of blocks written to the block store for this commit.
    pub blocks_count: i64,
}
224
225pub fn apply_commit<'db, 'commit, 's>(
226 batch: &mut RefcountedBatch<'db>,
227 db: &'db Db,
228 mut repo_state: RepoState<'s>,
229 commit: &'commit Commit<'commit>,
230 signing_key: Option<&PublicKey>,
231 filter: &FilterConfig,
232 ephemeral: bool,
233) -> Result<ApplyCommitResults<'s>> {
234 let did = &commit.repo;
235 debug!(did = %did, commit = %commit.commit, "applying commit");
236
237 // 1. parse CAR blocks and store them in CAS
238 let start = Instant::now();
239 let parsed = tokio::task::block_in_place(|| {
240 tokio::runtime::Handle::current()
241 .block_on(parse_car_bytes(commit.blocks.as_ref()))
242 .into_diagnostic()
243 })?;
244
245 trace!(did = %did, elapsed = ?start.elapsed(), "parsed car");
246
247 let root_bytes = parsed
248 .blocks
249 .get(&parsed.root)
250 .ok_or_else(|| miette::miette!("root block missing from CAR"))?;
251
252 let repo_commit = jacquard_repo::commit::Commit::from_cbor(root_bytes).into_diagnostic()?;
253
254 if let Some(key) = signing_key {
255 repo_commit
256 .verify(key)
257 .map_err(|e| miette::miette!("signature verification failed for {did}: {e}"))?;
258 trace!(did = %did, "signature verified");
259 }
260
261 repo_state.rev = Some((&commit.rev).into());
262 repo_state.data = Some(repo_commit.data);
263 repo_state.last_updated_at = chrono::Utc::now().timestamp();
264
265 batch
266 .batch_mut()
267 .insert(&db.repos, keys::repo_key(did), ser_repo_state(&repo_state)?);
268
269 // 2. iterate ops and update records index
270 let mut records_delta = 0;
271 let mut blocks_count = 0;
272 let mut collection_deltas: HashMap<&str, i64> = HashMap::new();
273
274 for op in &commit.ops {
275 let (collection, rkey) = parse_path(&op.path)?;
276
277 if !filter.matches_collection(collection) {
278 continue;
279 }
280
281 let rkey = DbRkey::new(rkey);
282 let db_key = keys::record_key(did, collection, &rkey);
283
284 let event_id = db.next_event_id.fetch_add(1, Ordering::SeqCst);
285
286 let action = DbAction::try_from(op.action.as_str())?;
287 match action {
288 DbAction::Create | DbAction::Update => {
289 let Some(cid) = &op.cid else {
290 continue;
291 };
292 let cid_ipld = cid
293 .to_ipld()
294 .into_diagnostic()
295 .wrap_err("expected valid cid from relay")?;
296
297 let Some(bytes) = parsed.blocks.get(&cid_ipld) else {
298 return Err(miette::miette!(
299 "block {cid} not found in CAR for record {did}/{collection}/{rkey}"
300 ));
301 };
302 let cid_bytes = Slice::from(cid_ipld.to_bytes());
303 batch
304 .batch_mut()
305 .insert(&db.blocks, cid_bytes.clone(), bytes.to_vec());
306 blocks_count += 1;
307 batch.update_block_refcount(
308 cid_bytes.clone(),
309 ephemeral.then_some(1).unwrap_or(2),
310 )?;
311
312 if !ephemeral {
313 batch
314 .batch_mut()
315 .insert(&db.records, db_key.clone(), cid_ipld.to_bytes());
316 // for Update, also decrement old CID refcount
317 if action == DbAction::Update {
318 let Some(old_cid_bytes) = db.records.get(&db_key).into_diagnostic()? else {
319 return Err(miette::miette!(
320 "!!! THIS IS A BUG !!! expected previous cid to be there for record being updated ({did}/{collection}/{rkey}). how did we get here?"
321 ));
322 };
323 if old_cid_bytes != cid_bytes {
324 batch.update_block_refcount(old_cid_bytes, -1)?;
325 }
326 }
327 // accumulate counts
328 if action == DbAction::Create {
329 records_delta += 1;
330 *collection_deltas.entry(collection).or_default() += 1;
331 }
332 }
333 }
334 DbAction::Delete => {
335 if !ephemeral {
336 // decrement block refcount
337 let old_cid_bytes = db.records.get(&db_key).into_diagnostic()?;
338 if let Some(cid_bytes) = old_cid_bytes {
339 batch.update_block_refcount(cid_bytes, -1)?;
340 }
341 batch.batch_mut().remove(&db.records, db_key);
342
343 // accumulate counts
344 records_delta -= 1;
345 *collection_deltas.entry(collection).or_default() -= 1;
346 }
347 }
348 }
349
350 let evt = StoredEvent {
351 live: true,
352 did: TrimmedDid::from(did),
353 rev: DbTid::from(&commit.rev),
354 collection: CowStr::Borrowed(collection),
355 rkey,
356 action,
357 cid: op.cid.as_ref().map(|c| c.to_ipld().expect("valid cid")),
358 };
359
360 let bytes = rmp_serde::to_vec(&evt).into_diagnostic()?;
361 batch
362 .batch_mut()
363 .insert(&db.events, keys::event_key(event_id), bytes);
364 }
365
366 // update counts
367 if !ephemeral {
368 for (col, delta) in collection_deltas {
369 db::update_record_count(batch.batch_mut(), db, did, col, delta)?;
370 }
371 }
372
373 Ok(ApplyCommitResults {
374 repo_state,
375 records_delta,
376 blocks_count,
377 })
378}
379
380pub fn parse_path(path: &str) -> Result<(&str, &str)> {
381 let mut parts = path.splitn(2, '/');
382 let collection = parts.next().wrap_err("missing collection")?;
383 let rkey = parts.next().wrap_err("missing rkey")?;
384 Ok((collection, rkey))
385}