AT Protocol indexer with flexible filtering, XRPC queries, and a cursor-backed event stream, built on fjall
at-protocol
atproto
indexer
rust
fjall
1use std::convert::Infallible;
2use std::option::Option;
3
4use futures::StreamExt;
5use jacquard_common::bytes::Bytes;
6use jacquard_common::error::DecodeError;
7use jacquard_common::{
8 CowStr,
9 types::{
10 cid::CidLink,
11 string::{Did, Handle, Tid},
12 },
13};
14use miette::Diagnostic;
15use smol_str::format_smolstr;
16use thiserror::Error;
17use tokio::net::TcpStream;
18use tokio_tungstenite::{MaybeTlsStream, WebSocketStream, connect_async, tungstenite::Message};
19use tracing::trace;
20use url::Url;
21
22#[derive(Debug, Error, Diagnostic)]
23pub enum FirehoseError {
24 #[error("websocket error: {0}")]
25 WebSocket(#[from] tokio_tungstenite::tungstenite::Error),
26 #[error("unknown scheme: {0}")]
27 UnknownScheme(String),
28 #[error("decode error: {0}")]
29 Decode(#[from] DecodeError),
30 #[error("empty frame")]
31 EmptyFrame,
32 #[error("relay error {error}: {message:?}")]
33 RelayError {
34 error: String,
35 message: Option<String>,
36 },
37 #[error("unknown op: {0}")]
38 UnknownOp(i64),
39 #[error("missing type in header")]
40 MissingType,
41 #[error("unknown event type: {0}")]
42 UnknownType(String),
43 #[error("cbor decode error: {0}")]
44 Cbor(String),
45}
46
47impl From<serde_ipld_dagcbor::DecodeError<Infallible>> for FirehoseError {
48 fn from(e: serde_ipld_dagcbor::DecodeError<Infallible>) -> Self {
49 Self::Cbor(e.to_string())
50 }
51}
52
53pub struct FirehoseStream {
54 ws: WebSocketStream<MaybeTlsStream<TcpStream>>,
55}
56
57impl FirehoseStream {
58 pub async fn connect(mut relay: Url, cursor: Option<i64>) -> Result<Self, FirehoseError> {
59 let scheme = match relay.scheme() {
60 "https" | "wss" => "wss",
61 "http" | "ws" => "ws",
62 x => return Err(FirehoseError::UnknownScheme(x.to_string())),
63 };
64 relay.set_scheme(scheme).expect("to be valid url");
65 relay.set_path("/xrpc/com.atproto.sync.subscribeRepos");
66 let cursor = cursor.map(|c| format_smolstr!("cursor={c}"));
67 relay.set_query(cursor.as_deref());
68
69 let (ws, _) = connect_async(relay.as_str()).await?;
70 Ok(Self { ws })
71 }
72
73 /// gets the next message bytes from the firehose
74 pub async fn next(&mut self) -> Option<Result<Bytes, FirehoseError>> {
75 loop {
76 match self.ws.next().await? {
77 Err(e) => return Some(Err(e.into())),
78 Ok(Message::Binary(bytes)) => {
79 if bytes.is_empty() {
80 return Some(Err(FirehoseError::EmptyFrame));
81 }
82 return Some(Ok(bytes));
83 }
84 Ok(Message::Close(_)) => return None,
85 Ok(x) => {
86 trace!(msg = ?x, "relay sent unexpected message");
87 continue;
88 }
89 }
90 }
91 }
92}
93
94#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq, Eq)]
95#[serde(try_from = "String")]
96pub struct Datetime(pub chrono::DateTime<chrono::FixedOffset>);
97
98impl TryFrom<String> for Datetime {
99 type Error = chrono::ParseError;
100
101 fn try_from(s: String) -> Result<Self, Self::Error> {
102 chrono::DateTime::parse_from_rfc3339(&s)
103 .map(Self)
104 .or_else(|_| {
105 // no timezone — warn and assume UTC
106 tracing::warn!(
107 value = %s,
108 "datetime missing timezone suffix, assuming UTC"
109 );
110 chrono::NaiveDateTime::parse_from_str(&s, "%Y-%m-%dT%H:%M:%S%.f")
111 .map(|ndt| Self(ndt.and_utc().fixed_offset()))
112 })
113 }
114}
115
116impl jacquard_common::IntoStatic for Datetime {
117 type Output = Datetime;
118 fn into_static(self) -> Self::Output {
119 self
120 }
121}
122
123#[derive(Debug, Clone, PartialEq, Eq, Hash)]
124pub enum RepoOpAction<'a> {
125 Create,
126 Update,
127 Delete,
128 Other(jacquard_common::CowStr<'a>),
129}
130
131impl<'a> RepoOpAction<'a> {
132 pub fn as_str(&self) -> &str {
133 match self {
134 Self::Create => "create",
135 Self::Update => "update",
136 Self::Delete => "delete",
137 Self::Other(s) => s.as_ref(),
138 }
139 }
140}
141
142impl<'a> From<&'a str> for RepoOpAction<'a> {
143 fn from(s: &'a str) -> Self {
144 match s {
145 "create" => Self::Create,
146 "update" => Self::Update,
147 "delete" => Self::Delete,
148 _ => Self::Other(jacquard_common::CowStr::from(s)),
149 }
150 }
151}
152
153impl<'a> From<String> for RepoOpAction<'a> {
154 fn from(s: String) -> Self {
155 match s.as_str() {
156 "create" => Self::Create,
157 "update" => Self::Update,
158 "delete" => Self::Delete,
159 _ => Self::Other(jacquard_common::CowStr::from(s)),
160 }
161 }
162}
163
164impl<'a> core::fmt::Display for RepoOpAction<'a> {
165 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
166 write!(f, "{}", self.as_str())
167 }
168}
169
170impl<'a> AsRef<str> for RepoOpAction<'a> {
171 fn as_ref(&self) -> &str {
172 self.as_str()
173 }
174}
175
176impl<'a> serde::Serialize for RepoOpAction<'a> {
177 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
178 where
179 S: serde::Serializer,
180 {
181 serializer.serialize_str(self.as_str())
182 }
183}
184
185impl<'de, 'a> serde::Deserialize<'de> for RepoOpAction<'a>
186where
187 'de: 'a,
188{
189 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
190 where
191 D: serde::Deserializer<'de>,
192 {
193 let s = <&'de str>::deserialize(deserializer)?;
194 Ok(Self::from(s))
195 }
196}
197
198impl<'a> Default for RepoOpAction<'a> {
199 fn default() -> Self {
200 Self::Other(Default::default())
201 }
202}
203
204impl<'a> jacquard_common::IntoStatic for RepoOpAction<'a> {
205 type Output = RepoOpAction<'static>;
206 fn into_static(self) -> Self::Output {
207 match self {
208 RepoOpAction::Create => RepoOpAction::Create,
209 RepoOpAction::Update => RepoOpAction::Update,
210 RepoOpAction::Delete => RepoOpAction::Delete,
211 RepoOpAction::Other(v) => RepoOpAction::Other(v.into_static()),
212 }
213 }
214}
215
216#[derive(serde::Deserialize, serde::Serialize, Debug, Clone, jacquard_derive::IntoStatic)]
217#[serde(rename_all = "camelCase")]
218pub struct RepoOp<'a> {
219 #[serde(borrow)]
220 pub action: RepoOpAction<'a>,
221 /// For creates and updates, the new record CID. For deletions, null.
222 #[serde(borrow)]
223 pub cid: Option<CidLink<'a>>,
224 #[serde(borrow)]
225 pub path: jacquard_common::CowStr<'a>,
226 /// For updates and deletes, the previous record CID (required for inductive firehose). For creations, field should not be defined.
227 #[serde(skip_serializing_if = "std::option::Option::is_none")]
228 #[serde(default)]
229 #[serde(borrow)]
230 pub prev: Option<CidLink<'a>>,
231}
232
233#[derive(serde::Deserialize, serde::Serialize, Debug, Clone, jacquard_derive::IntoStatic)]
234#[serde(rename_all = "camelCase")]
235pub struct Commit<'a> {
236 #[serde(borrow)]
237 pub blobs: Vec<CidLink<'a>>,
238 #[serde(with = "jacquard_common::serde_bytes_helper")]
239 pub blocks: Bytes,
240 #[serde(borrow)]
241 pub commit: CidLink<'a>,
242 #[serde(borrow)]
243 pub ops: Vec<RepoOp<'a>>,
244 #[serde(skip_serializing_if = "Option::is_none")]
245 #[serde(default)]
246 #[serde(borrow)]
247 pub prev_data: Option<CidLink<'a>>,
248 pub rebase: bool,
249 #[serde(borrow)]
250 pub repo: Did<'a>,
251 pub rev: Tid,
252 pub seq: i64,
253 #[serde(deserialize_with = "deserialize_tid_or_empty")]
254 pub since: Option<Tid>,
255 pub time: Datetime,
256 pub too_big: bool,
257}
258
259#[derive(serde::Deserialize, Debug, Clone, jacquard_derive::IntoStatic)]
260#[serde(rename_all = "camelCase")]
261pub struct Identity<'a> {
262 #[serde(borrow)]
263 pub did: Did<'a>,
264 #[serde(skip_serializing_if = "Option::is_none")]
265 #[serde(default)]
266 #[serde(borrow)]
267 pub handle: Option<Handle<'a>>,
268 pub seq: i64,
269 pub time: Datetime,
270}
271
272#[derive(Debug, Clone, PartialEq, Eq, Hash)]
273pub enum AccountStatus<'a> {
274 Takendown,
275 Suspended,
276 Deleted,
277 Deactivated,
278 Desynchronized,
279 Throttled,
280 Other(jacquard_common::CowStr<'a>),
281}
282
283impl<'a> AccountStatus<'a> {
284 pub fn as_str(&self) -> &str {
285 match self {
286 Self::Takendown => "takendown",
287 Self::Suspended => "suspended",
288 Self::Deleted => "deleted",
289 Self::Deactivated => "deactivated",
290 Self::Desynchronized => "desynchronized",
291 Self::Throttled => "throttled",
292 Self::Other(s) => s.as_ref(),
293 }
294 }
295}
296
297impl<'a> From<&'a str> for AccountStatus<'a> {
298 fn from(s: &'a str) -> Self {
299 match s {
300 "takendown" => Self::Takendown,
301 "suspended" => Self::Suspended,
302 "deleted" => Self::Deleted,
303 "deactivated" => Self::Deactivated,
304 "desynchronized" => Self::Desynchronized,
305 "throttled" => Self::Throttled,
306 _ => Self::Other(jacquard_common::CowStr::from(s)),
307 }
308 }
309}
310
311impl<'a> From<String> for AccountStatus<'a> {
312 fn from(s: String) -> Self {
313 match s.as_str() {
314 "takendown" => Self::Takendown,
315 "suspended" => Self::Suspended,
316 "deleted" => Self::Deleted,
317 "deactivated" => Self::Deactivated,
318 "desynchronized" => Self::Desynchronized,
319 "throttled" => Self::Throttled,
320 _ => Self::Other(jacquard_common::CowStr::from(s)),
321 }
322 }
323}
324
325impl<'a> core::fmt::Display for AccountStatus<'a> {
326 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
327 write!(f, "{}", self.as_str())
328 }
329}
330
331impl<'a> AsRef<str> for AccountStatus<'a> {
332 fn as_ref(&self) -> &str {
333 self.as_str()
334 }
335}
336
337impl<'a> serde::Serialize for AccountStatus<'a> {
338 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
339 where
340 S: serde::Serializer,
341 {
342 serializer.serialize_str(self.as_str())
343 }
344}
345
346impl<'de, 'a> serde::Deserialize<'de> for AccountStatus<'a>
347where
348 'de: 'a,
349{
350 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
351 where
352 D: serde::Deserializer<'de>,
353 {
354 let s = <&'de str>::deserialize(deserializer)?;
355 Ok(Self::from(s))
356 }
357}
358
359impl<'a> Default for AccountStatus<'a> {
360 fn default() -> Self {
361 Self::Other(Default::default())
362 }
363}
364
365impl jacquard_common::IntoStatic for AccountStatus<'_> {
366 type Output = AccountStatus<'static>;
367 fn into_static(self) -> Self::Output {
368 match self {
369 AccountStatus::Takendown => AccountStatus::Takendown,
370 AccountStatus::Suspended => AccountStatus::Suspended,
371 AccountStatus::Deleted => AccountStatus::Deleted,
372 AccountStatus::Deactivated => AccountStatus::Deactivated,
373 AccountStatus::Desynchronized => AccountStatus::Desynchronized,
374 AccountStatus::Throttled => AccountStatus::Throttled,
375 AccountStatus::Other(v) => AccountStatus::Other(v.into_static()),
376 }
377 }
378}
379
380#[derive(serde::Deserialize, Debug, Clone, jacquard_derive::IntoStatic)]
381#[serde(rename_all = "camelCase")]
382pub struct Account<'a> {
383 pub active: bool,
384 #[serde(borrow)]
385 pub did: Did<'a>,
386 pub seq: i64,
387 pub status: Option<AccountStatus<'a>>,
388 pub time: Datetime,
389}
390
391#[derive(serde::Deserialize, Debug, Clone, jacquard_derive::IntoStatic)]
392#[serde(rename_all = "camelCase")]
393pub struct Sync<'a> {
394 #[serde(with = "jacquard_common::serde_bytes_helper")]
395 pub blocks: Bytes,
396 #[serde(borrow)]
397 pub did: Did<'a>,
398 #[serde(borrow)]
399 pub rev: CowStr<'a>,
400 pub seq: i64,
401 pub time: Datetime,
402}
403
404#[derive(Debug, Clone, PartialEq, Eq, Hash)]
405pub enum InfoName<'a> {
406 OutdatedCursor,
407 Other(jacquard_common::CowStr<'a>),
408}
409
410impl<'a> InfoName<'a> {
411 pub fn as_str(&self) -> &str {
412 match self {
413 Self::OutdatedCursor => "OutdatedCursor",
414 Self::Other(s) => s.as_ref(),
415 }
416 }
417}
418
419impl<'a> From<&'a str> for InfoName<'a> {
420 fn from(s: &'a str) -> Self {
421 match s {
422 "OutdatedCursor" => Self::OutdatedCursor,
423 _ => Self::Other(jacquard_common::CowStr::from(s)),
424 }
425 }
426}
427
428impl<'a> From<String> for InfoName<'a> {
429 fn from(s: String) -> Self {
430 match s.as_str() {
431 "OutdatedCursor" => Self::OutdatedCursor,
432 _ => Self::Other(jacquard_common::CowStr::from(s)),
433 }
434 }
435}
436
437impl<'a> core::fmt::Display for InfoName<'a> {
438 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
439 write!(f, "{}", self.as_str())
440 }
441}
442
443impl<'a> AsRef<str> for InfoName<'a> {
444 fn as_ref(&self) -> &str {
445 self.as_str()
446 }
447}
448
449impl<'a> serde::Serialize for InfoName<'a> {
450 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
451 where
452 S: serde::Serializer,
453 {
454 serializer.serialize_str(self.as_str())
455 }
456}
457
458impl<'de, 'a> serde::Deserialize<'de> for InfoName<'a>
459where
460 'de: 'a,
461{
462 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
463 where
464 D: serde::Deserializer<'de>,
465 {
466 let s = <&'de str>::deserialize(deserializer)?;
467 Ok(Self::from(s))
468 }
469}
470
471impl<'a> Default for InfoName<'a> {
472 fn default() -> Self {
473 Self::Other(Default::default())
474 }
475}
476
477impl<'a> jacquard_common::IntoStatic for InfoName<'a> {
478 type Output = InfoName<'static>;
479 fn into_static(self) -> Self::Output {
480 match self {
481 InfoName::OutdatedCursor => InfoName::OutdatedCursor,
482 InfoName::Other(v) => InfoName::Other(v.into_static()),
483 }
484 }
485}
486
487#[derive(serde::Deserialize, Debug, Clone, jacquard_derive::IntoStatic)]
488#[serde(rename_all = "camelCase")]
489pub struct Info<'a> {
490 #[serde(skip_serializing_if = "Option::is_none")]
491 #[serde(borrow)]
492 pub message: Option<CowStr<'a>>,
493 #[serde(borrow)]
494 pub name: InfoName<'a>,
495}
496
497#[derive(Debug, Clone, jacquard_derive::IntoStatic)]
498pub enum SubscribeReposMessage<'i> {
499 Commit(Box<Commit<'i>>),
500 Sync(Box<Sync<'i>>),
501 Identity(Box<Identity<'i>>),
502 Account(Box<Account<'i>>),
503 Info(Box<Info<'i>>),
504}
505
506use serde::Deserialize;
507use serde_ipld_dagcbor::de::Deserializer;
508
509// some relays send `""` for `since` when there is no previous revision instead of null
510fn deserialize_tid_or_empty<'de, D>(deserializer: D) -> Result<Option<Tid>, D::Error>
511where
512 D: serde::Deserializer<'de>,
513{
514 let s = <Option<String>>::deserialize(deserializer)?;
515 match s.as_deref() {
516 None => Ok(None),
517 Some("") => {
518 tracing::warn!("received since with empty string instead of null");
519 Ok(None)
520 }
521 Some(s) => s.parse::<Tid>().map(Some).map_err(serde::de::Error::custom),
522 }
523}
524
525#[derive(Debug, Deserialize)]
526struct EventHeader {
527 op: i64,
528 t: Option<String>,
529}
530
531#[derive(Deserialize)]
532struct ErrorFrame {
533 error: String,
534 message: Option<String>,
535}
536
537pub fn decode_frame<'i>(bytes: &'i [u8]) -> Result<SubscribeReposMessage<'i>, FirehoseError> {
538 let mut de = Deserializer::from_slice(bytes);
539 let header = EventHeader::deserialize(&mut de)?;
540
541 match header.op {
542 -1 => {
543 let err = ErrorFrame::deserialize(&mut de)?;
544 return Err(FirehoseError::RelayError {
545 error: err.error,
546 message: err.message,
547 });
548 }
549 1 => {}
550 op => return Err(FirehoseError::UnknownOp(op)),
551 }
552
553 let t = header.t.ok_or(FirehoseError::MissingType)?;
554
555 let msg = match t.as_str() {
556 "#commit" => SubscribeReposMessage::Commit(Box::new(Deserialize::deserialize(&mut de)?)),
557 "#account" => SubscribeReposMessage::Account(Box::new(Deserialize::deserialize(&mut de)?)),
558 "#identity" => {
559 SubscribeReposMessage::Identity(Box::new(Deserialize::deserialize(&mut de)?))
560 }
561 "#sync" => SubscribeReposMessage::Sync(Box::new(Deserialize::deserialize(&mut de)?)),
562 "#info" => SubscribeReposMessage::Info(Box::new(Deserialize::deserialize(&mut de)?)),
563 other => return Err(FirehoseError::UnknownType(other.to_string())),
564 };
565
566 Ok(msg)
567}
568
#[cfg(test)]
mod test {
    use super::{
        AccountStatus, Datetime, InfoName, RepoOpAction, SubscribeReposMessage, decode_frame,
    };

    /// Decodes a base64-encoded captured frame into raw bytes.
    fn frame_bytes(b64: &[u8]) -> Vec<u8> {
        data_encoding::BASE64.decode(b64).unwrap()
    }

    #[test]
    fn test_decode_account() {
        const FRAME: &[u8] = b"omF0aCNhY2NvdW50Ym9wAaRjZGlkeCBkaWQ6cGxjOjNuNDNncWY2YTZua3J3MzU1cnNjNnJ0ZGNzZXEbAAAAAlP5Zt5kdGltZXgbMjAyNi0wMi0yNFQxNDowNToyMC41MjE1NDY5ZmFjdGl2ZfU=";
        let bytes = frame_bytes(FRAME);
        let msg = decode_frame(&bytes).unwrap();
        assert!(matches!(msg, SubscribeReposMessage::Account(_)));
    }

    /// regression: some relays send `since: ""` (empty string) instead of null/absent for the initial commit.
    /// this should decode cleanly with `since = None`.
    /// TODO: is this behaviour we should reject?
    #[test]
    fn test_decode_commit_empty_since() {
        const FRAME: &[u8] = b"omF0ZyNjb21taXRib3ABq2NvcHOAY3Jldm0zbWdkeWNmdWIyeTIyY3NlcRsAAAACZ8sHJmRyZXBveCBkaWQ6cGxjOmpxM3p2cmI1ZXdnMnFndXA3M3Fzb3V6ZWR0aW1leBsyMDI2LTAzLTA1VDE1OjQ3OjU0LjcxNjMyOFplYmxvYnOAZXNpbmNlYGZibG9ja3NZAUk6omVyb290c4HYKlglAAFxEiAlaO/rjabPL4/e2QlxkoxzCCwv69hE4P3Vdxpv7f6uEWd2ZXJzaW9uAeABAXESICVo7+uNps8vj97ZCXGSjHMILC/r2ETg/dV3Gm/t/q4RpmNkaWR4IGRpZDpwbGM6anEzenZyYjVld2cycWd1cDczcXNvdXplY3Jldm0zbWdkeWNmdWIyeTIyY3NpZ1hAwKfrZtwwbN7dW0uSbviOs65NWQRvlS9Qc7oRtiorybMTEYxKGJaFK2kHIMEWIJqumb4751En2aJEpsilWlaQOWRkYXRh2CpYJQABcRIgnf7+Yd126j3K5QI4gLCDedV63yBILW/b4nWSifZHZ3tkcHJldvZndmVyc2lvbgMrAXESIJ3+/mHdduo9yuUCOICwg3nVet8gSC1v2+J1kon2R2d7omFlgGFs9mZjb21taXTYKlglAAFxEiAlaO/rjabPL4/e2QlxkoxzCCwv69hE4P3Vdxpv7f6uEWZyZWJhc2X0ZnRvb0JpZ/Q=";
        let bytes = frame_bytes(FRAME);
        let msg = decode_frame(&bytes).unwrap();
        let SubscribeReposMessage::Commit(c) = msg else {
            panic!("expected Commit");
        };
        assert!(c.since.is_none(), "since should be None for empty string");
    }

    /// an explicit offset is preserved as-is
    #[test]
    fn test_datetime_keeps_explicit_offset() {
        let dt = Datetime::try_from("2026-03-05T15:47:54+02:00".to_string()).unwrap();
        assert_eq!(dt.0.offset().local_minus_utc(), 2 * 3600);
    }

    /// timestamps without an offset are accepted and interpreted as UTC
    #[test]
    fn test_datetime_missing_offset_assumes_utc() {
        let naive = Datetime::try_from("2026-02-24T14:05:20.5215469".to_string()).unwrap();
        let explicit = Datetime::try_from("2026-02-24T14:05:20.5215469Z".to_string()).unwrap();
        assert_eq!(naive, explicit);
        assert_eq!(naive.0.offset().local_minus_utc(), 0);
    }

    #[test]
    fn test_datetime_rejects_garbage() {
        assert!(Datetime::try_from("not a timestamp".to_string()).is_err());
        assert!(Datetime::try_from(String::new()).is_err());
    }

    /// known values map to their variants; unknown ones are preserved verbatim
    #[test]
    fn test_string_enums_round_trip() {
        for s in ["create", "update", "delete", "somethingElse"] {
            assert_eq!(RepoOpAction::from(s).as_str(), s);
        }
        assert_eq!(RepoOpAction::from("create"), RepoOpAction::Create);
        assert_eq!(AccountStatus::from("takendown"), AccountStatus::Takendown);
        assert!(matches!(AccountStatus::from("mystery"), AccountStatus::Other(_)));
        assert_eq!(InfoName::from("OutdatedCursor"), InfoName::OutdatedCursor);
        assert_eq!(InfoName::from(String::from("Other")).as_str(), "Other");
    }
}