AT Protocol indexer with flexible filtering, XRPC queries, and a cursor-backed event stream, built on fjall
at-protocol atproto indexer rust fjall

[types] use Did for DIDs in event types

ptr.pet 85291e9d 61539b51

verified
+41 -33
+3 -3
src/backfill/mod.rs
··· 4 use crate::types::{AccountEvt, BroadcastEvent, RepoState, RepoStatus, ResyncState, StoredEvent}; 5 use futures::TryFutureExt; 6 use jacquard::api::com_atproto::sync::get_repo::{GetRepo, GetRepoError}; 7 - use jacquard::prelude::*; 8 use jacquard::types::did::Did; 9 use jacquard_common::xrpc::XrpcError; 10 use jacquard_repo::mst::Mst; 11 use jacquard_repo::MemoryBlockStore; ··· 237 238 let emit_identity = |status: &RepoStatus| { 239 let evt = AccountEvt { 240 - did: did.as_str().into(), 241 active: !matches!( 242 status, 243 RepoStatus::Deactivated | RepoStatus::Takendown | RepoStatus::Suspended ··· 403 app_state.db.next_event_id.fetch_add(1, Ordering::SeqCst); 404 let evt = StoredEvent::Record { 405 live: false, 406 - did: did.as_str().into(), 407 rev: rev.as_str().into(), 408 collection: collection.into(), 409 rkey: rkey.into(),
··· 4 use crate::types::{AccountEvt, BroadcastEvent, RepoState, RepoStatus, ResyncState, StoredEvent}; 5 use futures::TryFutureExt; 6 use jacquard::api::com_atproto::sync::get_repo::{GetRepo, GetRepoError}; 7 use jacquard::types::did::Did; 8 + use jacquard::{prelude::*, IntoStatic}; 9 use jacquard_common::xrpc::XrpcError; 10 use jacquard_repo::mst::Mst; 11 use jacquard_repo::MemoryBlockStore; ··· 237 238 let emit_identity = |status: &RepoStatus| { 239 let evt = AccountEvt { 240 + did: did.clone(), 241 active: !matches!( 242 status, 243 RepoStatus::Deactivated | RepoStatus::Takendown | RepoStatus::Suspended ··· 403 app_state.db.next_event_id.fetch_add(1, Ordering::SeqCst); 404 let evt = StoredEvent::Record { 405 live: false, 406 + did: did.clone().into_static(), 407 rev: rev.as_str().into(), 408 collection: collection.into(), 409 rkey: rkey.into(),
+5 -8
src/buffer/processor.rs
··· 40 let mut to_remove: Vec<Did<'static>> = Vec::new(); 41 42 loop { 43 - // receive new messages (non-blocking drain) 44 while let Ok(msg) = self.rx.try_recv() { 45 queues.entry(msg.did.clone()).or_default().push_back(msg); 46 } 47 48 - // process unblocked DIDs 49 for (did, queue) in &mut queues { 50 if self.state.blocked_dids.contains_sync(did) { 51 continue; ··· 98 debug!("processing buffered identity for {did}"); 99 let handle = identity.handle.as_ref().map(|h| h.as_str().to_smolstr()); 100 let evt = IdentityEvt { 101 - did: did.to_smolstr(), 102 handle, 103 }; 104 ops::emit_identity_event(&self.state.db, evt); ··· 106 SubscribeReposMessage::Account(account) => { 107 debug!("processing buffered account for {did}"); 108 let evt = AccountEvt { 109 - did: did.to_smolstr(), 110 active: account.active, 111 status: account.status.as_ref().map(|s| s.to_smolstr()), 112 }; 113 ops::emit_account_event(&self.state.db, evt); 114 115 let state = self.state.clone(); 116 - let did = did.clone(); 117 - let account = account.clone(); // Account is 'static in BufferedMessage 118 119 tokio::task::spawn_blocking(move || -> Result<()> { 120 - // handle status updates 121 if !account.active { 122 use jacquard::api::com_atproto::sync::subscribe_repos::AccountStatus; 123 if let Some(status) = &account.status { ··· 208 .flatten() 209 } 210 211 - async fn remove_from_db_buffer(&self, did: &str, buffered_at: i64) -> Result<()> { 212 let key = keys::buffer_key(did, buffered_at); 213 self.state.db.buffer.remove(key).into_diagnostic()?; 214 Ok(())
··· 40 let mut to_remove: Vec<Did<'static>> = Vec::new(); 41 42 loop { 43 while let Ok(msg) = self.rx.try_recv() { 44 queues.entry(msg.did.clone()).or_default().push_back(msg); 45 } 46 47 for (did, queue) in &mut queues { 48 if self.state.blocked_dids.contains_sync(did) { 49 continue; ··· 96 debug!("processing buffered identity for {did}"); 97 let handle = identity.handle.as_ref().map(|h| h.as_str().to_smolstr()); 98 let evt = IdentityEvt { 99 + did: did.clone(), 100 handle, 101 }; 102 ops::emit_identity_event(&self.state.db, evt); ··· 104 SubscribeReposMessage::Account(account) => { 105 debug!("processing buffered account for {did}"); 106 let evt = AccountEvt { 107 + did: did.clone(), 108 active: account.active, 109 status: account.status.as_ref().map(|s| s.to_smolstr()), 110 }; 111 ops::emit_account_event(&self.state.db, evt); 112 113 + let did = did.clone(); 114 let state = self.state.clone(); 115 + let account = account.clone(); 116 117 tokio::task::spawn_blocking(move || -> Result<()> { 118 if !account.active { 119 use jacquard::api::com_atproto::sync::subscribe_repos::AccountStatus; 120 if let Some(status) = &account.status { ··· 205 .flatten() 206 } 207 208 + async fn remove_from_db_buffer(&self, did: &Did<'_>, buffered_at: i64) -> Result<()> { 209 let key = keys::buffer_key(did, buffered_at); 210 self.state.db.buffer.remove(key).into_diagnostic()?; 211 Ok(())
+2 -2
src/db/keys.rs
··· 66 } 67 68 // key format: {DID}\x00{timestamp} (for buffer entries) 69 - pub fn buffer_key(did: &str, timestamp: i64) -> Vec<u8> { 70 let mut key = Vec::with_capacity(did.len() + 1 + 8); 71 - key.extend_from_slice(did.as_bytes()); 72 key.push(SEP); 73 key.extend_from_slice(&timestamp.to_be_bytes()); 74 key
··· 66 } 67 68 // key format: {DID}\x00{timestamp} (for buffer entries) 69 + pub fn buffer_key(did: &Did, timestamp: i64) -> Vec<u8> { 70 let mut key = Vec::with_capacity(did.len() + 1 + 8); 71 + key.extend_from_slice(did_prefix(did).as_bytes()); 72 key.push(SEP); 73 key.extend_from_slice(&timestamp.to_be_bytes()); 74 key
+3 -2
src/ingest/mod.rs
··· 8 use n0_future::StreamExt; 9 use std::sync::atomic::Ordering; 10 use std::sync::Arc; 11 use tracing::{debug, error, info}; 12 use url::Url; 13 ··· 62 Ok(s) => s, 63 Err(e) => { 64 error!("failed to connect to firehose: {e}, retrying in 5s..."); 65 - tokio::time::sleep(std::time::Duration::from_secs(5)).await; 66 continue; 67 } 68 }; ··· 83 } 84 85 error!("firehose disconnected, reconnecting in 5s..."); 86 - tokio::time::sleep(std::time::Duration::from_secs(5)).await; 87 } 88 } 89
··· 8 use n0_future::StreamExt; 9 use std::sync::atomic::Ordering; 10 use std::sync::Arc; 11 + use std::time::Duration; 12 use tracing::{debug, error, info}; 13 use url::Url; 14 ··· 63 Ok(s) => s, 64 Err(e) => { 65 error!("failed to connect to firehose: {e}, retrying in 5s..."); 66 + tokio::time::sleep(Duration::from_secs(5)).await; 67 continue; 68 } 69 }; ··· 84 } 85 86 error!("firehose disconnected, reconnecting in 5s..."); 87 + tokio::time::sleep(Duration::from_secs(5)).await; 88 } 89 } 90
+4 -3
src/ops.rs
··· 2 use crate::types::{AccountEvt, BroadcastEvent, IdentityEvt, MarshallableEvt, StoredEvent}; 3 use jacquard::api::com_atproto::sync::subscribe_repos::Commit; 4 use jacquard::cowstr::ToCowStr; 5 use jacquard_repo::car::reader::parse_car_bytes; 6 use miette::{IntoDiagnostic, Result}; 7 use smol_str::{SmolStr, ToSmolStr}; ··· 12 13 // emitting identity is ephemeral 14 // we dont replay these, consumers can just fetch identity themselves if they need it 15 - pub fn emit_identity_event(db: &Db, evt: IdentityEvt) { 16 let event_id = db.next_event_id.fetch_add(1, Ordering::SeqCst); 17 let marshallable = MarshallableEvt { 18 id: event_id, ··· 24 let _ = db.event_tx.send(BroadcastEvent::Ephemeral(marshallable)); 25 } 26 27 - pub fn emit_account_event(db: &Db, evt: AccountEvt) { 28 let event_id = db.next_event_id.fetch_add(1, Ordering::SeqCst); 29 let marshallable = MarshallableEvt { 30 id: event_id, ··· 186 187 let evt = StoredEvent::Record { 188 live, 189 - did: did.as_str().into(), 190 rev: commit.rev.as_str().into(), 191 collection: collection.into(), 192 rkey: rkey.into(),
··· 2 use crate::types::{AccountEvt, BroadcastEvent, IdentityEvt, MarshallableEvt, StoredEvent}; 3 use jacquard::api::com_atproto::sync::subscribe_repos::Commit; 4 use jacquard::cowstr::ToCowStr; 5 + use jacquard::IntoStatic; 6 use jacquard_repo::car::reader::parse_car_bytes; 7 use miette::{IntoDiagnostic, Result}; 8 use smol_str::{SmolStr, ToSmolStr}; ··· 13 14 // emitting identity is ephemeral 15 // we dont replay these, consumers can just fetch identity themselves if they need it 16 + pub fn emit_identity_event(db: &Db, evt: IdentityEvt<'static>) { 17 let event_id = db.next_event_id.fetch_add(1, Ordering::SeqCst); 18 let marshallable = MarshallableEvt { 19 id: event_id, ··· 25 let _ = db.event_tx.send(BroadcastEvent::Ephemeral(marshallable)); 26 } 27 28 + pub fn emit_account_event(db: &Db, evt: AccountEvt<'static>) { 29 let event_id = db.next_event_id.fetch_add(1, Ordering::SeqCst); 30 let marshallable = MarshallableEvt { 31 id: event_id, ··· 187 188 let evt = StoredEvent::Record { 189 live, 190 + did: did.clone().into_static(), 191 rev: commit.rev.as_str().into(), 192 collection: collection.into(), 193 rkey: rkey.into(),
+24 -15
src/types.rs
··· 69 // from src/api/event.rs 70 71 #[derive(Debug, Serialize, Deserialize, Clone)] 72 - pub struct MarshallableEvt { 73 pub id: u64, 74 #[serde(rename = "type")] 75 pub event_type: SmolStr, 76 #[serde(skip_serializing_if = "Option::is_none")] 77 - pub record: Option<RecordEvt>, 78 #[serde(skip_serializing_if = "Option::is_none")] 79 - pub identity: Option<IdentityEvt>, 80 #[serde(skip_serializing_if = "Option::is_none")] 81 - pub account: Option<AccountEvt>, 82 } 83 84 #[derive(Clone, Debug)] 85 pub enum BroadcastEvent { 86 Persisted(u64), 87 - Ephemeral(MarshallableEvt), 88 } 89 90 #[derive(Debug, Serialize, Deserialize, Clone)] 91 - pub struct RecordEvt { 92 pub live: bool, 93 - pub did: SmolStr, 94 pub rev: SmolStr, 95 pub collection: SmolStr, 96 pub rkey: SmolStr, ··· 102 } 103 104 #[derive(Debug, Serialize, Deserialize, Clone)] 105 - pub struct IdentityEvt { 106 - pub did: SmolStr, 107 #[serde(skip_serializing_if = "Option::is_none")] 108 pub handle: Option<SmolStr>, 109 } 110 111 #[derive(Debug, Serialize, Deserialize, Clone)] 112 - pub struct AccountEvt { 113 - pub did: SmolStr, 114 pub active: bool, 115 #[serde(skip_serializing_if = "Option::is_none")] 116 pub status: Option<SmolStr>, 117 } 118 119 #[derive(Debug, Serialize, Deserialize, Clone)] 120 - pub enum StoredEvent { 121 Record { 122 live: bool, 123 - did: SmolStr, 124 rev: SmolStr, 125 collection: SmolStr, 126 rkey: SmolStr, 127 action: SmolStr, 128 cid: Option<SmolStr>, 129 }, 130 - Identity(IdentityEvt), 131 - Account(AccountEvt), 132 }
··· 69 // from src/api/event.rs 70 71 #[derive(Debug, Serialize, Deserialize, Clone)] 72 + pub struct MarshallableEvt<'i> { 73 pub id: u64, 74 #[serde(rename = "type")] 75 pub event_type: SmolStr, 76 + #[serde(borrow)] 77 #[serde(skip_serializing_if = "Option::is_none")] 78 + pub record: Option<RecordEvt<'i>>, 79 + #[serde(borrow)] 80 #[serde(skip_serializing_if = "Option::is_none")] 81 + pub identity: Option<IdentityEvt<'i>>, 82 + #[serde(borrow)] 83 #[serde(skip_serializing_if = "Option::is_none")] 84 + pub account: Option<AccountEvt<'i>>, 85 } 86 87 #[derive(Clone, Debug)] 88 pub enum BroadcastEvent { 89 Persisted(u64), 90 + Ephemeral(MarshallableEvt<'static>), 91 } 92 93 #[derive(Debug, Serialize, Deserialize, Clone)] 94 + pub struct RecordEvt<'i> { 95 pub live: bool, 96 + #[serde(borrow)] 97 + pub did: Did<'i>, 98 pub rev: SmolStr, 99 pub collection: SmolStr, 100 pub rkey: SmolStr, ··· 106 } 107 108 #[derive(Debug, Serialize, Deserialize, Clone)] 109 + pub struct IdentityEvt<'i> { 110 + #[serde(borrow)] 111 + pub did: Did<'i>, 112 #[serde(skip_serializing_if = "Option::is_none")] 113 pub handle: Option<SmolStr>, 114 } 115 116 #[derive(Debug, Serialize, Deserialize, Clone)] 117 + pub struct AccountEvt<'i> { 118 + #[serde(borrow)] 119 + pub did: Did<'i>, 120 pub active: bool, 121 #[serde(skip_serializing_if = "Option::is_none")] 122 pub status: Option<SmolStr>, 123 } 124 125 #[derive(Debug, Serialize, Deserialize, Clone)] 126 + pub enum StoredEvent<'i> { 127 Record { 128 live: bool, 129 + #[serde(borrow)] 130 + did: Did<'i>, 131 rev: SmolStr, 132 collection: SmolStr, 133 rkey: SmolStr, 134 action: SmolStr, 135 cid: Option<SmolStr>, 136 }, 137 + #[serde(borrow)] 138 + Identity(IdentityEvt<'i>), 139 + #[serde(borrow)] 140 + Account(AccountEvt<'i>), 141 }