A better Rust ATProto crate

run tests in CI, split jetstream subscription traits into two types with different levels of validation

Orual 85153488 edd2477c

Changed files
+211 -13
.tangled
workflows
.zed
crates
jacquard-common
+6 -6
.tangled/workflows/build.yml
··· 11 11 - gcc 12 12 13 13 steps: 14 - - name: build crate 15 - command: | 16 - cargo build 17 - 18 - - name: check wasm compatibility 14 + - name: check basic wasm compatibility 19 15 command: | 20 16 rustup target add wasm32-unknown-unknown 21 17 cargo build -p jacquard-common \ 22 18 --target wasm32-unknown-unknown \ 23 - --features streaming \ 19 + --features websocket \ 24 20 --no-default-features 21 + 22 + - name: run tests 23 + command: | 24 + cargo test
+3
.zed/settings.json
··· 8 8 "initialization_options": { 9 9 "rust": { 10 10 "analyzerTargetDir": true 11 + }, 12 + "cargo": { 13 + "allFeatures": true 11 14 } 12 15 } 13 16 }
+202 -7
crates/jacquard-common/src/jetstream.rs
··· 3 3 //! Jetstream is a simplified JSON-based alternative to the atproto firehose. 4 4 //! Unlike subscribeRepos which uses DAG-CBOR, Jetstream uses JSON encoding. 5 5 6 - use crate::types::string::{Datetime, Did, Handle}; 6 + use crate::types::cid::Cid; 7 + use crate::types::nsid::Nsid; 8 + use crate::types::string::{Datetime, Did, Handle, Rkey}; 7 9 use crate::xrpc::{MessageEncoding, SubscriptionResp, XrpcSubscription}; 8 - use crate::{CowStr, Data, IntoStatic}; 10 + use crate::{CowStr, Data, IntoStatic, RawData}; 9 11 use serde::{Deserialize, Serialize}; 10 12 11 13 /// Parameters for subscribing to Jetstream ··· 17 19 #[serde(skip_serializing_if = "Option::is_none")] 18 20 #[serde(borrow)] 19 21 #[builder(into)] 20 - pub wanted_collections: Option<Vec<crate::CowStr<'a>>>, 22 + pub wanted_collections: Option<Vec<Nsid<'a>>>, 21 23 22 24 /// Filter by DIDs (max 10,000) 23 25 #[serde(skip_serializing_if = "Option::is_none")] 24 26 #[serde(borrow)] 25 27 #[builder(into)] 26 - pub wanted_dids: Option<Vec<crate::CowStr<'a>>>, 28 + pub wanted_dids: Option<Vec<Did<'a>>>, 27 29 28 30 /// Unix microseconds timestamp to start playback 29 31 #[serde(skip_serializing_if = "Option::is_none")] ··· 54 56 Delete, 55 57 } 56 58 57 - /// Commit event details 59 + /// Commit event details (minimal validation) 58 60 #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] 59 - pub struct JetstreamCommit<'a> { 61 + pub struct RawJetstreamCommit<'a> { 60 62 /// Revision string 61 63 #[serde(borrow)] 62 64 pub rev: CowStr<'a>, ··· 71 73 /// Record data (present for create/update) 72 74 #[serde(skip_serializing_if = "Option::is_none")] 73 75 #[serde(borrow)] 76 + pub record: Option<RawData<'a>>, 77 + /// Content identifier 78 + #[serde(skip_serializing_if = "Option::is_none")] 79 + #[serde(borrow)] 80 + pub cid: Option<CowStr<'a>>, 81 + } 82 + 83 + /// Commit event details (additional validation) 84 + #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] 85 + pub struct JetstreamCommit<'a> { 86 + /// Revision string 87 + #[serde(borrow)] 88 + pub rev: CowStr<'a>, 89 + /// Operation type 90 + pub operation: CommitOperation, 91 + /// Collection NSID 92 + #[serde(borrow)] 93 + pub collection: Nsid<'a>, 94 + /// Record key 95 + #[serde(borrow)] 96 + pub rkey: Rkey<'a>, 97 + /// Record data (present for create/update) 98 + #[serde(skip_serializing_if = "Option::is_none")] 99 + #[serde(borrow)] 74 100 pub record: Option<Data<'a>>, 75 101 /// Content identifier 76 102 #[serde(skip_serializing_if = "Option::is_none")] 77 103 #[serde(borrow)] 78 - pub cid: Option<CowStr<'a>>, 104 + pub cid: Option<Cid<'a>>, 79 105 } 80 106 81 107 /// Identity event details ··· 152 178 }, 153 179 } 154 180 181 + /// Jetstream event message 182 + #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] 183 + #[serde(tag = "kind")] 184 + #[serde(rename_all = "lowercase")] 185 + pub enum RawJetstreamMessage<'a> { 186 + /// Commit event 187 + Commit { 188 + /// DID 189 + #[serde(borrow)] 190 + did: Did<'a>, 191 + /// Unix microseconds timestamp 192 + time_us: i64, 193 + /// Commit details 194 + #[serde(borrow)] 195 + commit: RawJetstreamCommit<'a>, 196 + }, 197 + /// Identity event 198 + Identity { 199 + /// DID 200 + #[serde(borrow)] 201 + did: Did<'a>, 202 + /// Unix microseconds timestamp 203 + time_us: i64, 204 + /// Identity details 205 + #[serde(borrow)] 206 + identity: JetstreamIdentity<'a>, 207 + }, 208 + /// Account event 209 + Account { 210 + /// DID 211 + #[serde(borrow)] 212 + did: Did<'a>, 213 + /// Unix microseconds timestamp 214 + time_us: i64, 215 + /// Account details 216 + #[serde(borrow)] 217 + account: JetstreamAccount<'a>, 218 + }, 219 + /// Unknown messsage type 220 + #[serde(untagged)] 221 + Unknown(RawData<'a>), 222 + } 223 + 155 224 impl IntoStatic for CommitOperation { 156 225 type Output = CommitOperation; 157 226 ··· 165 234 166 235 fn into_static(self) -> Self::Output { 167 236 JetstreamCommit { 237 + rev: self.rev.into_static(), 238 + operation: self.operation, 239 + collection: self.collection.into_static(), 240 + rkey: self.rkey.into_static(), 241 + record: self.record.map(|r| r.into_static()), 242 + cid: self.cid.map(|c| c.into_static()), 243 + } 244 + } 245 + } 246 + 247 + impl IntoStatic for RawJetstreamCommit<'_> { 248 + type Output = RawJetstreamCommit<'static>; 249 + 250 + fn into_static(self) -> Self::Output { 251 + RawJetstreamCommit { 168 252 rev: self.rev.into_static(), 169 253 operation: self.operation, 170 254 collection: self.collection.into_static(), ··· 238 322 } 239 323 } 240 324 325 + impl IntoStatic for RawJetstreamMessage<'_> { 326 + type Output = RawJetstreamMessage<'static>; 327 + 328 + fn into_static(self) -> Self::Output { 329 + match self { 330 + RawJetstreamMessage::Commit { 331 + did, 332 + time_us, 333 + commit, 334 + } => RawJetstreamMessage::Commit { 335 + did: did.into_static(), 336 + time_us, 337 + commit: commit.into_static(), 338 + }, 339 + RawJetstreamMessage::Identity { 340 + did, 341 + time_us, 342 + identity, 343 + } => RawJetstreamMessage::Identity { 344 + did: did.into_static(), 345 + time_us, 346 + identity: identity.into_static(), 347 + }, 348 + RawJetstreamMessage::Account { 349 + did, 350 + time_us, 351 + account, 352 + } => RawJetstreamMessage::Account { 353 + did: did.into_static(), 354 + time_us, 355 + account: account.into_static(), 356 + }, 357 + RawJetstreamMessage::Unknown(data) => RawJetstreamMessage::Unknown(data.into_static()), 358 + } 359 + } 360 + } 361 + 241 362 /// Stream response type for Jetstream subscriptions 242 363 pub struct JetstreamStream; 243 364 ··· 277 398 } 278 399 } 279 400 } 401 + 402 + /// Parameters for subscribing to Jetstream 403 + #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, bon::Builder)] 404 + #[serde(rename_all = "camelCase")] 405 + #[builder(start_fn = new)] 406 + pub struct RawJetstreamParams<'a> { 407 + /// Filter by collection NSIDs (max 100) 408 + #[serde(skip_serializing_if = "Option::is_none")] 409 + #[serde(borrow)] 410 + #[builder(into)] 411 + pub wanted_collections: Option<Vec<crate::CowStr<'a>>>, 412 + 413 + /// Filter by DIDs (max 10,000) 414 + #[serde(skip_serializing_if = "Option::is_none")] 415 + #[serde(borrow)] 416 + #[builder(into)] 417 + pub wanted_dids: Option<Vec<crate::CowStr<'a>>>, 418 + 419 + /// Unix microseconds timestamp to start playback 420 + #[serde(skip_serializing_if = "Option::is_none")] 421 + pub cursor: Option<i64>, 422 + 423 + /// Maximum payload size in bytes 424 + #[serde(skip_serializing_if = "Option::is_none")] 425 + pub max_message_size_bytes: Option<u64>, 426 + 427 + /// Enable zstd compression 428 + #[serde(skip_serializing_if = "Option::is_none")] 429 + pub compress: Option<bool>, 430 + 431 + /// Pause stream until first options update 432 + #[serde(skip_serializing_if = "Option::is_none")] 433 + pub require_hello: Option<bool>, 434 + } 435 + 436 + /// Stream response type for Jetstream subscriptions 437 + pub struct JetstreamRawStream; 438 + 439 + impl SubscriptionResp for JetstreamRawStream { 440 + const NSID: &'static str = "jetstream"; 441 + const ENCODING: MessageEncoding = MessageEncoding::Json; 442 + 443 + /// Typed Jetstream message 444 + type Message<'de> = RawJetstreamMessage<'de>; 445 + 446 + /// Generic error type 447 + type Error<'de> = crate::xrpc::GenericError<'de>; 448 + } 449 + 450 + impl<'a> XrpcSubscription for RawJetstreamParams<'a> { 451 + const NSID: &'static str = "jetstream"; 452 + const ENCODING: MessageEncoding = MessageEncoding::Json; 453 + const CUSTOM_PATH: Option<&'static str> = Some("/subscribe"); 454 + type Stream = JetstreamRawStream; 455 + } 456 + 457 + impl IntoStatic for RawJetstreamParams<'_> { 458 + type Output = RawJetstreamParams<'static>; 459 + 460 + fn into_static(self) -> Self::Output { 461 + RawJetstreamParams { 462 + wanted_collections: self 463 + .wanted_collections 464 + .map(|v| v.into_iter().map(|s| s.into_static()).collect()), 465 + wanted_dids: self 466 + .wanted_dids 467 + .map(|v| v.into_iter().map(|s| s.into_static()).collect()), 468 + cursor: self.cursor, 469 + max_message_size_bytes: self.max_message_size_bytes, 470 + compress: self.compress, 471 + require_hello: self.require_hello, 472 + } 473 + } 474 + }