very fast AT Protocol indexer with flexible filtering, XRPC queries, and a cursor-backed event stream, built on fjall
fjall at-protocol atproto indexer rust
at main 249 lines 7.4 kB view raw
1use std::fmt::Display; 2 3use jacquard_common::types::cid::IpldCid; 4use jacquard_common::types::string::Did; 5use jacquard_common::{CowStr, IntoStatic, types::string::Handle}; 6use serde::{Deserialize, Serialize}; 7use serde_json::Value; 8use smol_str::{SmolStr, ToSmolStr}; 9 10use crate::db::types::{DbAction, DbRkey, DbTid, DidKey, TrimmedDid}; 11use crate::resolver::MiniDoc; 12 13#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] 14pub enum RepoStatus { 15 Backfilling, 16 Synced, 17 Error(SmolStr), 18 Deactivated, 19 Takendown, 20 Suspended, 21} 22 23impl Display for RepoStatus { 24 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 25 match self { 26 RepoStatus::Backfilling => write!(f, "backfilling"), 27 RepoStatus::Synced => write!(f, "synced"), 28 RepoStatus::Error(e) => write!(f, "error({e})"), 29 RepoStatus::Deactivated => write!(f, "deactivated"), 30 RepoStatus::Takendown => write!(f, "takendown"), 31 RepoStatus::Suspended => write!(f, "suspended"), 32 } 33 } 34} 35 36#[derive(Debug, Clone, Serialize, Deserialize)] 37#[serde(bound(deserialize = "'i: 'de"))] 38pub struct RepoState<'i> { 39 pub status: RepoStatus, 40 pub rev: Option<DbTid>, 41 pub data: Option<IpldCid>, 42 // todo: is this actually valid? the spec says this is informal and intermadiate 43 // services may change it. we should probably document it. if we cant use this 44 // then how do we dedup account / identity ops? 45 /// ms since epoch of the last firehose message we processed for this repo. 46 /// used to deduplicate identity / account events that can arrive from multiple relays at 47 /// different wall-clock times but represent the same underlying PDS event. 
48 #[serde(default)] 49 pub last_message_time: Option<i64>, 50 /// this is when we *ingested* any last updates 51 pub last_updated_at: i64, // unix timestamp 52 /// whether we are ingesting events for this repo 53 pub tracked: bool, 54 /// index id in pending keyspace 55 pub index_id: u64, 56 #[serde(borrow)] 57 pub signing_key: Option<DidKey<'i>>, 58 #[serde(borrow)] 59 pub pds: Option<CowStr<'i>>, 60 #[serde(borrow)] 61 pub handle: Option<Handle<'i>>, 62} 63 64impl<'i> RepoState<'i> { 65 pub fn backfilling(index_id: u64) -> Self { 66 Self { 67 status: RepoStatus::Backfilling, 68 rev: None, 69 data: None, 70 last_updated_at: chrono::Utc::now().timestamp(), 71 index_id, 72 tracked: true, 73 handle: None, 74 pds: None, 75 signing_key: None, 76 last_message_time: None, 77 } 78 } 79 80 /// backfilling, but not tracked yet 81 pub fn untracked(index_id: u64) -> Self { 82 Self { 83 tracked: false, 84 ..Self::backfilling(index_id) 85 } 86 } 87 88 // advances the high-water mark to event_ms if it's newer than what we've seen 89 pub fn advance_message_time(&mut self, event_ms: i64) { 90 self.last_message_time = Some(event_ms.max(self.last_message_time.unwrap_or(0))); 91 } 92 93 // updates last_updated_at to now 94 pub fn touch(&mut self) { 95 self.last_updated_at = chrono::Utc::now().timestamp(); 96 } 97 98 pub fn update_from_doc(&mut self, doc: MiniDoc) -> bool { 99 let new_signing_key = doc.key.map(From::from); 100 let changed = self.pds.as_deref() != Some(doc.pds.as_str()) 101 || self.handle != doc.handle 102 || self.signing_key != new_signing_key; 103 self.pds = Some(CowStr::Owned(doc.pds.to_smolstr())); 104 self.handle = doc.handle; 105 self.signing_key = new_signing_key; 106 changed 107 } 108} 109 110impl<'i> IntoStatic for RepoState<'i> { 111 type Output = RepoState<'static>; 112 113 fn into_static(self) -> Self::Output { 114 RepoState { 115 status: self.status, 116 rev: self.rev, 117 data: self.data, 118 last_updated_at: self.last_updated_at, 119 index_id: 
self.index_id, 120 tracked: self.tracked, 121 handle: self.handle.map(IntoStatic::into_static), 122 pds: self.pds.map(IntoStatic::into_static), 123 signing_key: self.signing_key.map(IntoStatic::into_static), 124 last_message_time: self.last_message_time, 125 } 126 } 127} 128 129#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] 130pub enum ResyncErrorKind { 131 Ratelimited, 132 Transport, 133 Generic, 134} 135 136#[derive(Debug, Clone, Serialize, Deserialize)] 137pub enum ResyncState { 138 Error { 139 kind: ResyncErrorKind, 140 retry_count: u32, 141 next_retry: i64, // unix timestamp 142 }, 143 Gone { 144 status: RepoStatus, // deactivated, takendown, suspended 145 }, 146} 147 148impl ResyncState { 149 pub fn next_backoff(retry_count: u32) -> i64 { 150 // exponential backoff: 1m, 2m, 4m, 8m... up to 1h 151 let base = 60; 152 let cap = 3600; 153 let mult = 2u64.pow(retry_count.min(10)) as i64; 154 let delay = (base * mult).min(cap); 155 156 // add +/- 10% jitter 157 let jitter = (rand::random::<f64>() * 0.2 - 0.1) * delay as f64; 158 let delay = (delay as f64 + jitter) as i64; 159 160 chrono::Utc::now().timestamp() + delay 161 } 162} 163 164// from src/api/event.rs 165 166#[derive(Debug, Serialize, Clone)] 167pub struct MarshallableEvt<'i> { 168 pub id: u64, 169 #[serde(rename = "type")] 170 pub event_type: SmolStr, 171 #[serde(borrow)] 172 #[serde(skip_serializing_if = "Option::is_none")] 173 pub record: Option<RecordEvt<'i>>, 174 #[serde(borrow)] 175 #[serde(skip_serializing_if = "Option::is_none")] 176 pub identity: Option<IdentityEvt<'i>>, 177 #[serde(borrow)] 178 #[serde(skip_serializing_if = "Option::is_none")] 179 pub account: Option<AccountEvt<'i>>, 180} 181 182#[derive(Clone, Debug)] 183pub enum BroadcastEvent { 184 #[allow(dead_code)] 185 Persisted(u64), 186 Ephemeral(Box<MarshallableEvt<'static>>), 187} 188 189#[derive(Debug, Serialize, Clone)] 190pub struct RecordEvt<'i> { 191 pub live: bool, 192 #[serde(borrow)] 193 pub did: Did<'i>, 
194 pub rev: CowStr<'i>, 195 pub collection: CowStr<'i>, 196 pub rkey: CowStr<'i>, 197 pub action: CowStr<'i>, 198 #[serde(skip_serializing_if = "Option::is_none")] 199 pub record: Option<Value>, 200 #[serde(skip_serializing_if = "Option::is_none")] 201 pub cid: Option<CowStr<'i>>, 202} 203 204#[derive(Debug, Serialize, Clone)] 205pub struct IdentityEvt<'i> { 206 #[serde(borrow)] 207 pub did: Did<'i>, 208 #[serde(skip_serializing_if = "Option::is_none")] 209 pub handle: Option<Handle<'i>>, 210} 211 212#[derive(Debug, Serialize, Clone)] 213pub struct AccountEvt<'i> { 214 #[serde(borrow)] 215 pub did: Did<'i>, 216 pub active: bool, 217 #[serde(skip_serializing_if = "Option::is_none")] 218 pub status: Option<CowStr<'i>>, 219} 220 221#[derive(Debug, Serialize, Deserialize, Clone)] 222#[serde(bound(deserialize = "'i: 'de"))] 223pub struct StoredEvent<'i> { 224 #[serde(default)] 225 pub live: bool, 226 #[serde(borrow)] 227 pub did: TrimmedDid<'i>, 228 pub rev: DbTid, 229 #[serde(borrow)] 230 pub collection: CowStr<'i>, 231 pub rkey: DbRkey, 232 pub action: DbAction, 233 #[serde(default)] 234 #[serde(skip_serializing_if = "Option::is_none")] 235 pub cid: Option<IpldCid>, 236} 237 238#[derive(Debug, PartialEq, Eq, Clone, Copy)] 239pub enum GaugeState { 240 Synced, 241 Pending, 242 Resync(Option<ResyncErrorKind>), 243} 244 245impl GaugeState { 246 pub fn is_resync(&self) -> bool { 247 matches!(self, GaugeState::Resync(_)) 248 } 249}