A very fast AT Protocol indexer with flexible filtering, XRPC queries, and a cursor-backed event stream, built on fjall.
fjall
at-protocol
atproto
indexer
rust
1use std::fmt::Display;
2
3use jacquard_common::types::cid::IpldCid;
4use jacquard_common::types::string::Did;
5use jacquard_common::{CowStr, IntoStatic, types::string::Handle};
6use serde::{Deserialize, Serialize};
7use serde_json::Value;
8use smol_str::{SmolStr, ToSmolStr};
9
10use crate::db::types::{DbAction, DbRkey, DbTid, DidKey, TrimmedDid};
11use crate::resolver::MiniDoc;
12
13#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
14pub enum RepoStatus {
15 Backfilling,
16 Synced,
17 Error(SmolStr),
18 Deactivated,
19 Takendown,
20 Suspended,
21}
22
23impl Display for RepoStatus {
24 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
25 match self {
26 RepoStatus::Backfilling => write!(f, "backfilling"),
27 RepoStatus::Synced => write!(f, "synced"),
28 RepoStatus::Error(e) => write!(f, "error({e})"),
29 RepoStatus::Deactivated => write!(f, "deactivated"),
30 RepoStatus::Takendown => write!(f, "takendown"),
31 RepoStatus::Suspended => write!(f, "suspended"),
32 }
33 }
34}
35
36#[derive(Debug, Clone, Serialize, Deserialize)]
37#[serde(bound(deserialize = "'i: 'de"))]
38pub struct RepoState<'i> {
39 pub status: RepoStatus,
40 pub rev: Option<DbTid>,
41 pub data: Option<IpldCid>,
42 // todo: is this actually valid? the spec says this is informal and intermadiate
43 // services may change it. we should probably document it. if we cant use this
44 // then how do we dedup account / identity ops?
45 /// ms since epoch of the last firehose message we processed for this repo.
46 /// used to deduplicate identity / account events that can arrive from multiple relays at
47 /// different wall-clock times but represent the same underlying PDS event.
48 #[serde(default)]
49 pub last_message_time: Option<i64>,
50 /// this is when we *ingested* any last updates
51 pub last_updated_at: i64, // unix timestamp
52 /// whether we are ingesting events for this repo
53 pub tracked: bool,
54 /// index id in pending keyspace
55 pub index_id: u64,
56 #[serde(borrow)]
57 pub signing_key: Option<DidKey<'i>>,
58 #[serde(borrow)]
59 pub pds: Option<CowStr<'i>>,
60 #[serde(borrow)]
61 pub handle: Option<Handle<'i>>,
62}
63
64impl<'i> RepoState<'i> {
65 pub fn backfilling(index_id: u64) -> Self {
66 Self {
67 status: RepoStatus::Backfilling,
68 rev: None,
69 data: None,
70 last_updated_at: chrono::Utc::now().timestamp(),
71 index_id,
72 tracked: true,
73 handle: None,
74 pds: None,
75 signing_key: None,
76 last_message_time: None,
77 }
78 }
79
80 /// backfilling, but not tracked yet
81 pub fn untracked(index_id: u64) -> Self {
82 Self {
83 tracked: false,
84 ..Self::backfilling(index_id)
85 }
86 }
87
88 // advances the high-water mark to event_ms if it's newer than what we've seen
89 pub fn advance_message_time(&mut self, event_ms: i64) {
90 self.last_message_time = Some(event_ms.max(self.last_message_time.unwrap_or(0)));
91 }
92
93 // updates last_updated_at to now
94 pub fn touch(&mut self) {
95 self.last_updated_at = chrono::Utc::now().timestamp();
96 }
97
98 pub fn update_from_doc(&mut self, doc: MiniDoc) -> bool {
99 let new_signing_key = doc.key.map(From::from);
100 let changed = self.pds.as_deref() != Some(doc.pds.as_str())
101 || self.handle != doc.handle
102 || self.signing_key != new_signing_key;
103 self.pds = Some(CowStr::Owned(doc.pds.to_smolstr()));
104 self.handle = doc.handle;
105 self.signing_key = new_signing_key;
106 changed
107 }
108}
109
110impl<'i> IntoStatic for RepoState<'i> {
111 type Output = RepoState<'static>;
112
113 fn into_static(self) -> Self::Output {
114 RepoState {
115 status: self.status,
116 rev: self.rev,
117 data: self.data,
118 last_updated_at: self.last_updated_at,
119 index_id: self.index_id,
120 tracked: self.tracked,
121 handle: self.handle.map(IntoStatic::into_static),
122 pds: self.pds.map(IntoStatic::into_static),
123 signing_key: self.signing_key.map(IntoStatic::into_static),
124 last_message_time: self.last_message_time,
125 }
126 }
127}
128
129#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
130pub enum ResyncErrorKind {
131 Ratelimited,
132 Transport,
133 Generic,
134}
135
136#[derive(Debug, Clone, Serialize, Deserialize)]
137pub enum ResyncState {
138 Error {
139 kind: ResyncErrorKind,
140 retry_count: u32,
141 next_retry: i64, // unix timestamp
142 },
143 Gone {
144 status: RepoStatus, // deactivated, takendown, suspended
145 },
146}
147
148impl ResyncState {
149 pub fn next_backoff(retry_count: u32) -> i64 {
150 // exponential backoff: 1m, 2m, 4m, 8m... up to 1h
151 let base = 60;
152 let cap = 3600;
153 let mult = 2u64.pow(retry_count.min(10)) as i64;
154 let delay = (base * mult).min(cap);
155
156 // add +/- 10% jitter
157 let jitter = (rand::random::<f64>() * 0.2 - 0.1) * delay as f64;
158 let delay = (delay as f64 + jitter) as i64;
159
160 chrono::Utc::now().timestamp() + delay
161 }
162}
163
164// from src/api/event.rs
165
166#[derive(Debug, Serialize, Clone)]
167pub struct MarshallableEvt<'i> {
168 pub id: u64,
169 #[serde(rename = "type")]
170 pub event_type: SmolStr,
171 #[serde(borrow)]
172 #[serde(skip_serializing_if = "Option::is_none")]
173 pub record: Option<RecordEvt<'i>>,
174 #[serde(borrow)]
175 #[serde(skip_serializing_if = "Option::is_none")]
176 pub identity: Option<IdentityEvt<'i>>,
177 #[serde(borrow)]
178 #[serde(skip_serializing_if = "Option::is_none")]
179 pub account: Option<AccountEvt<'i>>,
180}
181
182#[derive(Clone, Debug)]
183pub enum BroadcastEvent {
184 #[allow(dead_code)]
185 Persisted(u64),
186 Ephemeral(Box<MarshallableEvt<'static>>),
187}
188
189#[derive(Debug, Serialize, Clone)]
190pub struct RecordEvt<'i> {
191 pub live: bool,
192 #[serde(borrow)]
193 pub did: Did<'i>,
194 pub rev: CowStr<'i>,
195 pub collection: CowStr<'i>,
196 pub rkey: CowStr<'i>,
197 pub action: CowStr<'i>,
198 #[serde(skip_serializing_if = "Option::is_none")]
199 pub record: Option<Value>,
200 #[serde(skip_serializing_if = "Option::is_none")]
201 pub cid: Option<CowStr<'i>>,
202}
203
204#[derive(Debug, Serialize, Clone)]
205pub struct IdentityEvt<'i> {
206 #[serde(borrow)]
207 pub did: Did<'i>,
208 #[serde(skip_serializing_if = "Option::is_none")]
209 pub handle: Option<Handle<'i>>,
210}
211
212#[derive(Debug, Serialize, Clone)]
213pub struct AccountEvt<'i> {
214 #[serde(borrow)]
215 pub did: Did<'i>,
216 pub active: bool,
217 #[serde(skip_serializing_if = "Option::is_none")]
218 pub status: Option<CowStr<'i>>,
219}
220
221#[derive(Debug, Serialize, Deserialize, Clone)]
222#[serde(bound(deserialize = "'i: 'de"))]
223pub struct StoredEvent<'i> {
224 #[serde(default)]
225 pub live: bool,
226 #[serde(borrow)]
227 pub did: TrimmedDid<'i>,
228 pub rev: DbTid,
229 #[serde(borrow)]
230 pub collection: CowStr<'i>,
231 pub rkey: DbRkey,
232 pub action: DbAction,
233 #[serde(default)]
234 #[serde(skip_serializing_if = "Option::is_none")]
235 pub cid: Option<IpldCid>,
236}
237
238#[derive(Debug, PartialEq, Eq, Clone, Copy)]
239pub enum GaugeState {
240 Synced,
241 Pending,
242 Resync(Option<ResyncErrorKind>),
243}
244
245impl GaugeState {
246 pub fn is_resync(&self) -> bool {
247 matches!(self, GaugeState::Resync(_))
248 }
249}