AT Protocol indexer with flexible filtering, XRPC queries, and a cursor-backed event stream, built on fjall
at-protocol atproto indexer rust fjall
at main 594 lines 18 kB view raw
1use std::convert::Infallible; 2use std::option::Option; 3 4use futures::StreamExt; 5use jacquard_common::bytes::Bytes; 6use jacquard_common::error::DecodeError; 7use jacquard_common::{ 8 CowStr, 9 types::{ 10 cid::CidLink, 11 string::{Did, Handle, Tid}, 12 }, 13}; 14use miette::Diagnostic; 15use smol_str::format_smolstr; 16use thiserror::Error; 17use tokio::net::TcpStream; 18use tokio_tungstenite::{MaybeTlsStream, WebSocketStream, connect_async, tungstenite::Message}; 19use tracing::trace; 20use url::Url; 21 22#[derive(Debug, Error, Diagnostic)] 23pub enum FirehoseError { 24 #[error("websocket error: {0}")] 25 WebSocket(#[from] tokio_tungstenite::tungstenite::Error), 26 #[error("unknown scheme: {0}")] 27 UnknownScheme(String), 28 #[error("decode error: {0}")] 29 Decode(#[from] DecodeError), 30 #[error("empty frame")] 31 EmptyFrame, 32 #[error("relay error {error}: {message:?}")] 33 RelayError { 34 error: String, 35 message: Option<String>, 36 }, 37 #[error("unknown op: {0}")] 38 UnknownOp(i64), 39 #[error("missing type in header")] 40 MissingType, 41 #[error("unknown event type: {0}")] 42 UnknownType(String), 43 #[error("cbor decode error: {0}")] 44 Cbor(String), 45} 46 47impl From<serde_ipld_dagcbor::DecodeError<Infallible>> for FirehoseError { 48 fn from(e: serde_ipld_dagcbor::DecodeError<Infallible>) -> Self { 49 Self::Cbor(e.to_string()) 50 } 51} 52 53pub struct FirehoseStream { 54 ws: WebSocketStream<MaybeTlsStream<TcpStream>>, 55} 56 57impl FirehoseStream { 58 pub async fn connect(mut relay: Url, cursor: Option<i64>) -> Result<Self, FirehoseError> { 59 let scheme = match relay.scheme() { 60 "https" | "wss" => "wss", 61 "http" | "ws" => "ws", 62 x => return Err(FirehoseError::UnknownScheme(x.to_string())), 63 }; 64 relay.set_scheme(scheme).expect("to be valid url"); 65 relay.set_path("/xrpc/com.atproto.sync.subscribeRepos"); 66 let cursor = cursor.map(|c| format_smolstr!("cursor={c}")); 67 relay.set_query(cursor.as_deref()); 68 69 let (ws, _) = 
connect_async(relay.as_str()).await?; 70 Ok(Self { ws }) 71 } 72 73 /// gets the next message bytes from the firehose 74 pub async fn next(&mut self) -> Option<Result<Bytes, FirehoseError>> { 75 loop { 76 match self.ws.next().await? { 77 Err(e) => return Some(Err(e.into())), 78 Ok(Message::Binary(bytes)) => { 79 if bytes.is_empty() { 80 return Some(Err(FirehoseError::EmptyFrame)); 81 } 82 return Some(Ok(bytes)); 83 } 84 Ok(Message::Close(_)) => return None, 85 Ok(x) => { 86 trace!(msg = ?x, "relay sent unexpected message"); 87 continue; 88 } 89 } 90 } 91 } 92} 93 94#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq, Eq)] 95#[serde(try_from = "String")] 96pub struct Datetime(pub chrono::DateTime<chrono::FixedOffset>); 97 98impl TryFrom<String> for Datetime { 99 type Error = chrono::ParseError; 100 101 fn try_from(s: String) -> Result<Self, Self::Error> { 102 chrono::DateTime::parse_from_rfc3339(&s) 103 .map(Self) 104 .or_else(|_| { 105 // no timezone — warn and assume UTC 106 tracing::warn!( 107 value = %s, 108 "datetime missing timezone suffix, assuming UTC" 109 ); 110 chrono::NaiveDateTime::parse_from_str(&s, "%Y-%m-%dT%H:%M:%S%.f") 111 .map(|ndt| Self(ndt.and_utc().fixed_offset())) 112 }) 113 } 114} 115 116impl jacquard_common::IntoStatic for Datetime { 117 type Output = Datetime; 118 fn into_static(self) -> Self::Output { 119 self 120 } 121} 122 123#[derive(Debug, Clone, PartialEq, Eq, Hash)] 124pub enum RepoOpAction<'a> { 125 Create, 126 Update, 127 Delete, 128 Other(jacquard_common::CowStr<'a>), 129} 130 131impl<'a> RepoOpAction<'a> { 132 pub fn as_str(&self) -> &str { 133 match self { 134 Self::Create => "create", 135 Self::Update => "update", 136 Self::Delete => "delete", 137 Self::Other(s) => s.as_ref(), 138 } 139 } 140} 141 142impl<'a> From<&'a str> for RepoOpAction<'a> { 143 fn from(s: &'a str) -> Self { 144 match s { 145 "create" => Self::Create, 146 "update" => Self::Update, 147 "delete" => Self::Delete, 148 _ => 
Self::Other(jacquard_common::CowStr::from(s)), 149 } 150 } 151} 152 153impl<'a> From<String> for RepoOpAction<'a> { 154 fn from(s: String) -> Self { 155 match s.as_str() { 156 "create" => Self::Create, 157 "update" => Self::Update, 158 "delete" => Self::Delete, 159 _ => Self::Other(jacquard_common::CowStr::from(s)), 160 } 161 } 162} 163 164impl<'a> core::fmt::Display for RepoOpAction<'a> { 165 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { 166 write!(f, "{}", self.as_str()) 167 } 168} 169 170impl<'a> AsRef<str> for RepoOpAction<'a> { 171 fn as_ref(&self) -> &str { 172 self.as_str() 173 } 174} 175 176impl<'a> serde::Serialize for RepoOpAction<'a> { 177 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> 178 where 179 S: serde::Serializer, 180 { 181 serializer.serialize_str(self.as_str()) 182 } 183} 184 185impl<'de, 'a> serde::Deserialize<'de> for RepoOpAction<'a> 186where 187 'de: 'a, 188{ 189 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error> 190 where 191 D: serde::Deserializer<'de>, 192 { 193 let s = <&'de str>::deserialize(deserializer)?; 194 Ok(Self::from(s)) 195 } 196} 197 198impl<'a> Default for RepoOpAction<'a> { 199 fn default() -> Self { 200 Self::Other(Default::default()) 201 } 202} 203 204impl<'a> jacquard_common::IntoStatic for RepoOpAction<'a> { 205 type Output = RepoOpAction<'static>; 206 fn into_static(self) -> Self::Output { 207 match self { 208 RepoOpAction::Create => RepoOpAction::Create, 209 RepoOpAction::Update => RepoOpAction::Update, 210 RepoOpAction::Delete => RepoOpAction::Delete, 211 RepoOpAction::Other(v) => RepoOpAction::Other(v.into_static()), 212 } 213 } 214} 215 216#[derive(serde::Deserialize, serde::Serialize, Debug, Clone, jacquard_derive::IntoStatic)] 217#[serde(rename_all = "camelCase")] 218pub struct RepoOp<'a> { 219 #[serde(borrow)] 220 pub action: RepoOpAction<'a>, 221 /// For creates and updates, the new record CID. For deletions, null. 
222 #[serde(borrow)] 223 pub cid: Option<CidLink<'a>>, 224 #[serde(borrow)] 225 pub path: jacquard_common::CowStr<'a>, 226 /// For updates and deletes, the previous record CID (required for inductive firehose). For creations, field should not be defined. 227 #[serde(skip_serializing_if = "std::option::Option::is_none")] 228 #[serde(default)] 229 #[serde(borrow)] 230 pub prev: Option<CidLink<'a>>, 231} 232 233#[derive(serde::Deserialize, serde::Serialize, Debug, Clone, jacquard_derive::IntoStatic)] 234#[serde(rename_all = "camelCase")] 235pub struct Commit<'a> { 236 #[serde(borrow)] 237 pub blobs: Vec<CidLink<'a>>, 238 #[serde(with = "jacquard_common::serde_bytes_helper")] 239 pub blocks: Bytes, 240 #[serde(borrow)] 241 pub commit: CidLink<'a>, 242 #[serde(borrow)] 243 pub ops: Vec<RepoOp<'a>>, 244 #[serde(skip_serializing_if = "Option::is_none")] 245 #[serde(default)] 246 #[serde(borrow)] 247 pub prev_data: Option<CidLink<'a>>, 248 pub rebase: bool, 249 #[serde(borrow)] 250 pub repo: Did<'a>, 251 pub rev: Tid, 252 pub seq: i64, 253 #[serde(deserialize_with = "deserialize_tid_or_empty")] 254 pub since: Option<Tid>, 255 pub time: Datetime, 256 pub too_big: bool, 257} 258 259#[derive(serde::Deserialize, Debug, Clone, jacquard_derive::IntoStatic)] 260#[serde(rename_all = "camelCase")] 261pub struct Identity<'a> { 262 #[serde(borrow)] 263 pub did: Did<'a>, 264 #[serde(skip_serializing_if = "Option::is_none")] 265 #[serde(default)] 266 #[serde(borrow)] 267 pub handle: Option<Handle<'a>>, 268 pub seq: i64, 269 pub time: Datetime, 270} 271 272#[derive(Debug, Clone, PartialEq, Eq, Hash)] 273pub enum AccountStatus<'a> { 274 Takendown, 275 Suspended, 276 Deleted, 277 Deactivated, 278 Desynchronized, 279 Throttled, 280 Other(jacquard_common::CowStr<'a>), 281} 282 283impl<'a> AccountStatus<'a> { 284 pub fn as_str(&self) -> &str { 285 match self { 286 Self::Takendown => "takendown", 287 Self::Suspended => "suspended", 288 Self::Deleted => "deleted", 289 Self::Deactivated => 
"deactivated", 290 Self::Desynchronized => "desynchronized", 291 Self::Throttled => "throttled", 292 Self::Other(s) => s.as_ref(), 293 } 294 } 295} 296 297impl<'a> From<&'a str> for AccountStatus<'a> { 298 fn from(s: &'a str) -> Self { 299 match s { 300 "takendown" => Self::Takendown, 301 "suspended" => Self::Suspended, 302 "deleted" => Self::Deleted, 303 "deactivated" => Self::Deactivated, 304 "desynchronized" => Self::Desynchronized, 305 "throttled" => Self::Throttled, 306 _ => Self::Other(jacquard_common::CowStr::from(s)), 307 } 308 } 309} 310 311impl<'a> From<String> for AccountStatus<'a> { 312 fn from(s: String) -> Self { 313 match s.as_str() { 314 "takendown" => Self::Takendown, 315 "suspended" => Self::Suspended, 316 "deleted" => Self::Deleted, 317 "deactivated" => Self::Deactivated, 318 "desynchronized" => Self::Desynchronized, 319 "throttled" => Self::Throttled, 320 _ => Self::Other(jacquard_common::CowStr::from(s)), 321 } 322 } 323} 324 325impl<'a> core::fmt::Display for AccountStatus<'a> { 326 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { 327 write!(f, "{}", self.as_str()) 328 } 329} 330 331impl<'a> AsRef<str> for AccountStatus<'a> { 332 fn as_ref(&self) -> &str { 333 self.as_str() 334 } 335} 336 337impl<'a> serde::Serialize for AccountStatus<'a> { 338 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> 339 where 340 S: serde::Serializer, 341 { 342 serializer.serialize_str(self.as_str()) 343 } 344} 345 346impl<'de, 'a> serde::Deserialize<'de> for AccountStatus<'a> 347where 348 'de: 'a, 349{ 350 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error> 351 where 352 D: serde::Deserializer<'de>, 353 { 354 let s = <&'de str>::deserialize(deserializer)?; 355 Ok(Self::from(s)) 356 } 357} 358 359impl<'a> Default for AccountStatus<'a> { 360 fn default() -> Self { 361 Self::Other(Default::default()) 362 } 363} 364 365impl jacquard_common::IntoStatic for AccountStatus<'_> { 366 type Output = AccountStatus<'static>; 367 fn 
into_static(self) -> Self::Output { 368 match self { 369 AccountStatus::Takendown => AccountStatus::Takendown, 370 AccountStatus::Suspended => AccountStatus::Suspended, 371 AccountStatus::Deleted => AccountStatus::Deleted, 372 AccountStatus::Deactivated => AccountStatus::Deactivated, 373 AccountStatus::Desynchronized => AccountStatus::Desynchronized, 374 AccountStatus::Throttled => AccountStatus::Throttled, 375 AccountStatus::Other(v) => AccountStatus::Other(v.into_static()), 376 } 377 } 378} 379 380#[derive(serde::Deserialize, Debug, Clone, jacquard_derive::IntoStatic)] 381#[serde(rename_all = "camelCase")] 382pub struct Account<'a> { 383 pub active: bool, 384 #[serde(borrow)] 385 pub did: Did<'a>, 386 pub seq: i64, 387 pub status: Option<AccountStatus<'a>>, 388 pub time: Datetime, 389} 390 391#[derive(serde::Deserialize, Debug, Clone, jacquard_derive::IntoStatic)] 392#[serde(rename_all = "camelCase")] 393pub struct Sync<'a> { 394 #[serde(with = "jacquard_common::serde_bytes_helper")] 395 pub blocks: Bytes, 396 #[serde(borrow)] 397 pub did: Did<'a>, 398 #[serde(borrow)] 399 pub rev: CowStr<'a>, 400 pub seq: i64, 401 pub time: Datetime, 402} 403 404#[derive(Debug, Clone, PartialEq, Eq, Hash)] 405pub enum InfoName<'a> { 406 OutdatedCursor, 407 Other(jacquard_common::CowStr<'a>), 408} 409 410impl<'a> InfoName<'a> { 411 pub fn as_str(&self) -> &str { 412 match self { 413 Self::OutdatedCursor => "OutdatedCursor", 414 Self::Other(s) => s.as_ref(), 415 } 416 } 417} 418 419impl<'a> From<&'a str> for InfoName<'a> { 420 fn from(s: &'a str) -> Self { 421 match s { 422 "OutdatedCursor" => Self::OutdatedCursor, 423 _ => Self::Other(jacquard_common::CowStr::from(s)), 424 } 425 } 426} 427 428impl<'a> From<String> for InfoName<'a> { 429 fn from(s: String) -> Self { 430 match s.as_str() { 431 "OutdatedCursor" => Self::OutdatedCursor, 432 _ => Self::Other(jacquard_common::CowStr::from(s)), 433 } 434 } 435} 436 437impl<'a> core::fmt::Display for InfoName<'a> { 438 fn fmt(&self, f: 
&mut core::fmt::Formatter<'_>) -> core::fmt::Result { 439 write!(f, "{}", self.as_str()) 440 } 441} 442 443impl<'a> AsRef<str> for InfoName<'a> { 444 fn as_ref(&self) -> &str { 445 self.as_str() 446 } 447} 448 449impl<'a> serde::Serialize for InfoName<'a> { 450 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> 451 where 452 S: serde::Serializer, 453 { 454 serializer.serialize_str(self.as_str()) 455 } 456} 457 458impl<'de, 'a> serde::Deserialize<'de> for InfoName<'a> 459where 460 'de: 'a, 461{ 462 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error> 463 where 464 D: serde::Deserializer<'de>, 465 { 466 let s = <&'de str>::deserialize(deserializer)?; 467 Ok(Self::from(s)) 468 } 469} 470 471impl<'a> Default for InfoName<'a> { 472 fn default() -> Self { 473 Self::Other(Default::default()) 474 } 475} 476 477impl<'a> jacquard_common::IntoStatic for InfoName<'a> { 478 type Output = InfoName<'static>; 479 fn into_static(self) -> Self::Output { 480 match self { 481 InfoName::OutdatedCursor => InfoName::OutdatedCursor, 482 InfoName::Other(v) => InfoName::Other(v.into_static()), 483 } 484 } 485} 486 487#[derive(serde::Deserialize, Debug, Clone, jacquard_derive::IntoStatic)] 488#[serde(rename_all = "camelCase")] 489pub struct Info<'a> { 490 #[serde(skip_serializing_if = "Option::is_none")] 491 #[serde(borrow)] 492 pub message: Option<CowStr<'a>>, 493 #[serde(borrow)] 494 pub name: InfoName<'a>, 495} 496 497#[derive(Debug, Clone, jacquard_derive::IntoStatic)] 498pub enum SubscribeReposMessage<'i> { 499 Commit(Box<Commit<'i>>), 500 Sync(Box<Sync<'i>>), 501 Identity(Box<Identity<'i>>), 502 Account(Box<Account<'i>>), 503 Info(Box<Info<'i>>), 504} 505 506use serde::Deserialize; 507use serde_ipld_dagcbor::de::Deserializer; 508 509// some relays send `""` for `since` when there is no previous revision instead of null 510fn deserialize_tid_or_empty<'de, D>(deserializer: D) -> Result<Option<Tid>, D::Error> 511where 512 D: serde::Deserializer<'de>, 513{ 514 let s = 
<Option<String>>::deserialize(deserializer)?; 515 match s.as_deref() { 516 None => Ok(None), 517 Some("") => { 518 tracing::warn!("received since with empty string instead of null"); 519 Ok(None) 520 } 521 Some(s) => s.parse::<Tid>().map(Some).map_err(serde::de::Error::custom), 522 } 523} 524 525#[derive(Debug, Deserialize)] 526struct EventHeader { 527 op: i64, 528 t: Option<String>, 529} 530 531#[derive(Deserialize)] 532struct ErrorFrame { 533 error: String, 534 message: Option<String>, 535} 536 537pub fn decode_frame<'i>(bytes: &'i [u8]) -> Result<SubscribeReposMessage<'i>, FirehoseError> { 538 let mut de = Deserializer::from_slice(bytes); 539 let header = EventHeader::deserialize(&mut de)?; 540 541 match header.op { 542 -1 => { 543 let err = ErrorFrame::deserialize(&mut de)?; 544 return Err(FirehoseError::RelayError { 545 error: err.error, 546 message: err.message, 547 }); 548 } 549 1 => {} 550 op => return Err(FirehoseError::UnknownOp(op)), 551 } 552 553 let t = header.t.ok_or(FirehoseError::MissingType)?; 554 555 let msg = match t.as_str() { 556 "#commit" => SubscribeReposMessage::Commit(Box::new(Deserialize::deserialize(&mut de)?)), 557 "#account" => SubscribeReposMessage::Account(Box::new(Deserialize::deserialize(&mut de)?)), 558 "#identity" => { 559 SubscribeReposMessage::Identity(Box::new(Deserialize::deserialize(&mut de)?)) 560 } 561 "#sync" => SubscribeReposMessage::Sync(Box::new(Deserialize::deserialize(&mut de)?)), 562 "#info" => SubscribeReposMessage::Info(Box::new(Deserialize::deserialize(&mut de)?)), 563 other => return Err(FirehoseError::UnknownType(other.to_string())), 564 }; 565 566 Ok(msg) 567} 568 569#[cfg(test)] 570mod test { 571 use super::{SubscribeReposMessage, decode_frame}; 572 573 #[test] 574 fn test_decode_account() { 575 const FRAME: &[u8] = b"omF0aCNhY2NvdW50Ym9wAaRjZGlkeCBkaWQ6cGxjOjNuNDNncWY2YTZua3J3MzU1cnNjNnJ0ZGNzZXEbAAAAAlP5Zt5kdGltZXgbMjAyNi0wMi0yNFQxNDowNToyMC41MjE1NDY5ZmFjdGl2ZfU="; 576 let bytes = 
data_encoding::BASE64.decode(FRAME).unwrap(); 577 let msg = decode_frame(&bytes).unwrap(); 578 assert!(matches!(msg, SubscribeReposMessage::Account(_))); 579 } 580 581 /// regression: some relays send `since: ""` (empty string) instead of null/absent for the initial commit. 582 /// this should decode cleanly with `since = None`. 583 /// TODO: is this behaviour we should reject? 584 #[test] 585 fn test_decode_commit_empty_since() { 586 const FRAME: &[u8] = b"omF0ZyNjb21taXRib3ABq2NvcHOAY3Jldm0zbWdkeWNmdWIyeTIyY3NlcRsAAAACZ8sHJmRyZXBveCBkaWQ6cGxjOmpxM3p2cmI1ZXdnMnFndXA3M3Fzb3V6ZWR0aW1leBsyMDI2LTAzLTA1VDE1OjQ3OjU0LjcxNjMyOFplYmxvYnOAZXNpbmNlYGZibG9ja3NZAUk6omVyb290c4HYKlglAAFxEiAlaO/rjabPL4/e2QlxkoxzCCwv69hE4P3Vdxpv7f6uEWd2ZXJzaW9uAeABAXESICVo7+uNps8vj97ZCXGSjHMILC/r2ETg/dV3Gm/t/q4RpmNkaWR4IGRpZDpwbGM6anEzenZyYjVld2cycWd1cDczcXNvdXplY3Jldm0zbWdkeWNmdWIyeTIyY3NpZ1hAwKfrZtwwbN7dW0uSbviOs65NWQRvlS9Qc7oRtiorybMTEYxKGJaFK2kHIMEWIJqumb4751En2aJEpsilWlaQOWRkYXRh2CpYJQABcRIgnf7+Yd126j3K5QI4gLCDedV63yBILW/b4nWSifZHZ3tkcHJldvZndmVyc2lvbgMrAXESIJ3+/mHdduo9yuUCOICwg3nVet8gSC1v2+J1kon2R2d7omFlgGFs9mZjb21taXTYKlglAAFxEiAlaO/rjabPL4/e2QlxkoxzCCwv69hE4P3Vdxpv7f6uEWZyZWJhc2X0ZnRvb0JpZ/Q="; 587 let bytes = data_encoding::BASE64.decode(FRAME).unwrap(); 588 let msg = decode_frame(&bytes).unwrap(); 589 let SubscribeReposMessage::Commit(c) = msg else { 590 panic!("expected Commit"); 591 }; 592 assert!(c.since.is_none(), "since should be None for empty string"); 593 } 594}