A better Rust ATProto crate
at main 477 lines 14 kB view raw
1//! Jetstream subscription support 2//! 3//! Jetstream is a simplified JSON-based alternative to the atproto firehose. 4//! Unlike subscribeRepos which uses DAG-CBOR, Jetstream uses JSON encoding. 5 6use crate::types::cid::Cid; 7use crate::types::nsid::Nsid; 8use crate::types::string::{Datetime, Did, Handle, Rkey}; 9use crate::xrpc::{MessageEncoding, SubscriptionResp, XrpcSubscription}; 10use crate::{CowStr, Data, IntoStatic, RawData}; 11use alloc::vec::Vec; 12use serde::{Deserialize, Serialize}; 13 14/// Parameters for subscribing to Jetstream 15#[cfg_attr(feature = "std", derive(bon::Builder))] 16#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] 17#[serde(rename_all = "camelCase")] 18#[cfg_attr(feature = "std", builder(start_fn = new))] 19pub struct JetstreamParams<'a> { 20 /// Filter by collection NSIDs (max 100) 21 #[serde(skip_serializing_if = "Option::is_none")] 22 #[serde(borrow)] 23 #[builder(into)] 24 pub wanted_collections: Option<Vec<Nsid<'a>>>, 25 26 /// Filter by DIDs (max 10,000) 27 #[serde(skip_serializing_if = "Option::is_none")] 28 #[serde(borrow)] 29 #[builder(into)] 30 pub wanted_dids: Option<Vec<Did<'a>>>, 31 32 /// Unix microseconds timestamp to start playback 33 #[serde(skip_serializing_if = "Option::is_none")] 34 pub cursor: Option<i64>, 35 36 /// Maximum payload size in bytes 37 #[serde(skip_serializing_if = "Option::is_none")] 38 pub max_message_size_bytes: Option<u64>, 39 40 /// Enable zstd compression 41 #[serde(skip_serializing_if = "Option::is_none")] 42 pub compress: Option<bool>, 43 44 /// Pause stream until first options update 45 #[serde(skip_serializing_if = "Option::is_none")] 46 pub require_hello: Option<bool>, 47} 48 49/// Commit operation type 50#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] 51#[serde(rename_all = "lowercase")] 52pub enum CommitOperation { 53 /// Create a new record 54 Create, 55 /// Update an existing record 56 Update, 57 /// Delete a record 58 Delete, 59} 60 61/// Commit event details (minimal validation) 62#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] 63pub struct RawJetstreamCommit<'a> { 64 /// Revision string 65 #[serde(borrow)] 66 pub rev: CowStr<'a>, 67 /// Operation type 68 pub operation: CommitOperation, 69 /// Collection NSID 70 #[serde(borrow)] 71 pub collection: CowStr<'a>, 72 /// Record key 73 #[serde(borrow)] 74 pub rkey: CowStr<'a>, 75 /// Record data (present for create/update) 76 #[serde(skip_serializing_if = "Option::is_none")] 77 #[serde(borrow)] 78 pub record: Option<RawData<'a>>, 79 /// Content identifier 80 #[serde(skip_serializing_if = "Option::is_none")] 81 #[serde(borrow)] 82 pub cid: Option<CowStr<'a>>, 83} 84 85/// Commit event details (additional validation) 86#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] 87pub struct JetstreamCommit<'a> { 88 /// Revision string 89 #[serde(borrow)] 90 pub rev: CowStr<'a>, 91 /// Operation type 92 pub operation: CommitOperation, 93 /// Collection NSID 94 #[serde(borrow)] 95 pub collection: Nsid<'a>, 96 /// Record key 97 #[serde(borrow)] 98 pub rkey: Rkey<'a>, 99 /// Record data (present for create/update) 100 #[serde(skip_serializing_if = "Option::is_none")] 101 #[serde(borrow)] 102 pub record: Option<Data<'a>>, 103 /// Content identifier 104 #[serde(skip_serializing_if = "Option::is_none")] 105 #[serde(borrow)] 106 pub cid: Option<Cid<'a>>, 107} 108 109/// Identity event details 110#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] 111pub struct JetstreamIdentity<'a> { 112 /// DID 113 #[serde(borrow)] 114 pub did: Did<'a>, 115 /// Handle 116 #[serde(skip_serializing_if = "Option::is_none")] 117 #[serde(borrow)] 118 pub handle: Option<Handle<'a>>, 119 /// Sequence number 120 pub seq: i64, 121 /// Timestamp 122 pub time: Datetime, 123} 124 125/// Account event details 126#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] 127pub struct JetstreamAccount<'a> { 128 /// Account active status 129 pub active: bool, 130 /// DID 131 #[serde(borrow)] 132 pub did: Did<'a>, 133 /// Sequence number 134 pub seq: i64, 135 /// Timestamp 136 pub time: Datetime, 137 /// Optional status message 138 #[serde(skip_serializing_if = "Option::is_none")] 139 #[serde(borrow)] 140 pub status: Option<CowStr<'a>>, 141} 142 143/// Jetstream event message 144#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] 145#[serde(tag = "kind")] 146#[serde(rename_all = "lowercase")] 147pub enum JetstreamMessage<'a> { 148 /// Commit event 149 Commit { 150 /// DID 151 #[serde(borrow)] 152 did: Did<'a>, 153 /// Unix microseconds timestamp 154 time_us: i64, 155 /// Commit details 156 #[serde(borrow)] 157 commit: JetstreamCommit<'a>, 158 }, 159 /// Identity event 160 Identity { 161 /// DID 162 #[serde(borrow)] 163 did: Did<'a>, 164 /// Unix microseconds timestamp 165 time_us: i64, 166 /// Identity details 167 #[serde(borrow)] 168 identity: JetstreamIdentity<'a>, 169 }, 170 /// Account event 171 Account { 172 /// DID 173 #[serde(borrow)] 174 did: Did<'a>, 175 /// Unix microseconds timestamp 176 time_us: i64, 177 /// Account details 178 #[serde(borrow)] 179 account: JetstreamAccount<'a>, 180 }, 181} 182 183/// Jetstream event message 184#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] 185#[serde(tag = "kind")] 186#[serde(rename_all = "lowercase")] 187pub enum RawJetstreamMessage<'a> { 188 /// Commit event 189 Commit { 190 /// DID 191 #[serde(borrow)] 192 did: Did<'a>, 193 /// Unix microseconds timestamp 194 time_us: i64, 195 /// Commit details 196 #[serde(borrow)] 197 commit: RawJetstreamCommit<'a>, 198 }, 199 /// Identity event 200 Identity { 201 /// DID 202 #[serde(borrow)] 203 did: Did<'a>, 204 /// Unix microseconds timestamp 205 time_us: i64, 206 /// Identity details 207 #[serde(borrow)] 208 identity: JetstreamIdentity<'a>, 209 }, 210 /// Account event 211 Account { 212 /// DID 213 #[serde(borrow)] 214 did: Did<'a>, 215 /// Unix microseconds timestamp 216 time_us: i64, 217 /// Account details 218 #[serde(borrow)] 219 account: JetstreamAccount<'a>, 220 }, 221 /// Unknown messsage type 222 #[serde(untagged)] 223 Unknown(RawData<'a>), 224} 225 226impl IntoStatic for CommitOperation { 227 type Output = CommitOperation; 228 229 fn into_static(self) -> Self::Output { 230 self 231 } 232} 233 234impl IntoStatic for JetstreamCommit<'_> { 235 type Output = JetstreamCommit<'static>; 236 237 fn into_static(self) -> Self::Output { 238 JetstreamCommit { 239 rev: self.rev.into_static(), 240 operation: self.operation, 241 collection: self.collection.into_static(), 242 rkey: self.rkey.into_static(), 243 record: self.record.map(|r| r.into_static()), 244 cid: self.cid.map(|c| c.into_static()), 245 } 246 } 247} 248 249impl IntoStatic for RawJetstreamCommit<'_> { 250 type Output = RawJetstreamCommit<'static>; 251 252 fn into_static(self) -> Self::Output { 253 RawJetstreamCommit { 254 rev: self.rev.into_static(), 255 operation: self.operation, 256 collection: self.collection.into_static(), 257 rkey: self.rkey.into_static(), 258 record: self.record.map(|r| r.into_static()), 259 cid: self.cid.map(|c| c.into_static()), 260 } 261 } 262} 263 264impl IntoStatic for JetstreamIdentity<'_> { 265 type Output = JetstreamIdentity<'static>; 266 267 fn into_static(self) -> Self::Output { 268 JetstreamIdentity { 269 did: self.did.into_static(), 270 handle: self.handle.map(|h| h.into_static()), 271 seq: self.seq, 272 time: self.time, 273 } 274 } 275} 276 277impl IntoStatic for JetstreamAccount<'_> { 278 type Output = JetstreamAccount<'static>; 279 280 fn into_static(self) -> Self::Output { 281 JetstreamAccount { 282 active: self.active, 283 did: self.did.into_static(), 284 seq: self.seq, 285 time: self.time, 286 status: self.status.map(|s| s.into_static()), 287 } 288 } 289} 290 291impl IntoStatic for JetstreamMessage<'_> { 292 type Output = JetstreamMessage<'static>; 293 294 fn into_static(self) -> Self::Output { 295 match self { 296 JetstreamMessage::Commit { 297 did, 298 time_us, 299 commit, 300 } => JetstreamMessage::Commit { 301 did: did.into_static(), 302 time_us, 303 commit: commit.into_static(), 304 }, 305 JetstreamMessage::Identity { 306 did, 307 time_us, 308 identity, 309 } => JetstreamMessage::Identity { 310 did: did.into_static(), 311 time_us, 312 identity: identity.into_static(), 313 }, 314 JetstreamMessage::Account { 315 did, 316 time_us, 317 account, 318 } => JetstreamMessage::Account { 319 did: did.into_static(), 320 time_us, 321 account: account.into_static(), 322 }, 323 } 324 } 325} 326 327impl IntoStatic for RawJetstreamMessage<'_> { 328 type Output = RawJetstreamMessage<'static>; 329 330 fn into_static(self) -> Self::Output { 331 match self { 332 RawJetstreamMessage::Commit { 333 did, 334 time_us, 335 commit, 336 } => RawJetstreamMessage::Commit { 337 did: did.into_static(), 338 time_us, 339 commit: commit.into_static(), 340 }, 341 RawJetstreamMessage::Identity { 342 did, 343 time_us, 344 identity, 345 } => RawJetstreamMessage::Identity { 346 did: did.into_static(), 347 time_us, 348 identity: identity.into_static(), 349 }, 350 RawJetstreamMessage::Account { 351 did, 352 time_us, 353 account, 354 } => RawJetstreamMessage::Account { 355 did: did.into_static(), 356 time_us, 357 account: account.into_static(), 358 }, 359 RawJetstreamMessage::Unknown(data) => RawJetstreamMessage::Unknown(data.into_static()), 360 } 361 } 362} 363 364/// Stream response type for Jetstream subscriptions 365pub struct JetstreamStream; 366 367impl SubscriptionResp for JetstreamStream { 368 const NSID: &'static str = "jetstream"; 369 const ENCODING: MessageEncoding = MessageEncoding::Json; 370 371 /// Typed Jetstream message 372 type Message<'de> = JetstreamMessage<'de>; 373 374 /// Generic error type 375 type Error<'de> = crate::xrpc::GenericError<'de>; 376} 377 378impl<'a> XrpcSubscription for JetstreamParams<'a> { 379 const NSID: &'static str = "jetstream"; 380 const ENCODING: MessageEncoding = MessageEncoding::Json; 381 const CUSTOM_PATH: Option<&'static str> = Some("/subscribe"); 382 type Stream = JetstreamStream; 383} 384 385impl IntoStatic for JetstreamParams<'_> { 386 type Output = JetstreamParams<'static>; 387 388 fn into_static(self) -> Self::Output { 389 JetstreamParams { 390 wanted_collections: self 391 .wanted_collections 392 .map(|v| v.into_iter().map(|s| s.into_static()).collect()), 393 wanted_dids: self 394 .wanted_dids 395 .map(|v| v.into_iter().map(|s| s.into_static()).collect()), 396 cursor: self.cursor, 397 max_message_size_bytes: self.max_message_size_bytes, 398 compress: self.compress, 399 require_hello: self.require_hello, 400 } 401 } 402} 403 404/// Parameters for subscribing to Jetstream 405#[cfg_attr(feature = "std", derive(bon::Builder))] 406#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] 407#[serde(rename_all = "camelCase")] 408#[cfg_attr(feature = "std", builder(start_fn = new))] 409pub struct RawJetstreamParams<'a> { 410 /// Filter by collection NSIDs (max 100) 411 #[serde(skip_serializing_if = "Option::is_none")] 412 #[serde(borrow)] 413 #[builder(into)] 414 pub wanted_collections: Option<Vec<crate::CowStr<'a>>>, 415 416 /// Filter by DIDs (max 10,000) 417 #[serde(skip_serializing_if = "Option::is_none")] 418 #[serde(borrow)] 419 #[builder(into)] 420 pub wanted_dids: Option<Vec<crate::CowStr<'a>>>, 421 422 /// Unix microseconds timestamp to start playback 423 #[serde(skip_serializing_if = "Option::is_none")] 424 pub cursor: Option<i64>, 425 426 /// Maximum payload size in bytes 427 #[serde(skip_serializing_if = "Option::is_none")] 428 pub max_message_size_bytes: Option<u64>, 429 430 /// Enable zstd compression 431 #[serde(skip_serializing_if = "Option::is_none")] 432 pub compress: Option<bool>, 433 434 /// Pause stream until first options update 435 #[serde(skip_serializing_if = "Option::is_none")] 436 pub require_hello: Option<bool>, 437} 438 439/// Stream response type for Jetstream subscriptions 440pub struct JetstreamRawStream; 441 442impl SubscriptionResp for JetstreamRawStream { 443 const NSID: &'static str = "jetstream"; 444 const ENCODING: MessageEncoding = MessageEncoding::Json; 445 446 /// Typed Jetstream message 447 type Message<'de> = RawJetstreamMessage<'de>; 448 449 /// Generic error type 450 type Error<'de> = crate::xrpc::GenericError<'de>; 451} 452 453impl<'a> XrpcSubscription for RawJetstreamParams<'a> { 454 const NSID: &'static str = "jetstream"; 455 const ENCODING: MessageEncoding = MessageEncoding::Json; 456 const CUSTOM_PATH: Option<&'static str> = Some("/subscribe"); 457 type Stream = JetstreamRawStream; 458} 459 460impl IntoStatic for RawJetstreamParams<'_> { 461 type Output = RawJetstreamParams<'static>; 462 463 fn into_static(self) -> Self::Output { 464 RawJetstreamParams { 465 wanted_collections: self 466 .wanted_collections 467 .map(|v| v.into_iter().map(|s| s.into_static()).collect()), 468 wanted_dids: self 469 .wanted_dids 470 .map(|v| v.into_iter().map(|s| s.into_static()).collect()), 471 cursor: self.cursor, 472 max_message_size_bytes: self.max_message_size_bytes, 473 compress: self.compress, 474 require_hello: self.require_hello, 475 } 476 } 477}