A better Rust ATProto crate

event stream subscriptions and jetstream work!

- fixed issues in the codegen for subscription types,
- fixed decode to account for the header
- added jetstream as manual impl of the traits.
- added examples for both.

Orual 0b5d4cbe 6b0272d9

Changed files
+1018 -128
crates
jacquard
jacquard-api
jacquard-common
jacquard-lexicon
src
codegen
jacquard-oauth
src
examples
+114 -7
Cargo.lock
··· 803 803 ] 804 804 805 805 [[package]] 806 + name = "core-foundation" 807 + version = "0.10.1" 808 + source = "registry+https://github.com/rust-lang/crates.io-index" 809 + checksum = "b2a6cd9ae233e7f62ba4e9353e81a88df7fc8a5987b8d445b4d90c879bd156f6" 810 + dependencies = [ 811 + "core-foundation-sys", 812 + "libc", 813 + ] 814 + 815 + [[package]] 806 816 name = "core-foundation-sys" 807 817 version = "0.8.7" 808 818 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1246 1256 checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" 1247 1257 dependencies = [ 1248 1258 "libc", 1249 - "windows-sys 0.60.2", 1259 + "windows-sys 0.61.1", 1250 1260 ] 1251 1261 1252 1262 [[package]] ··· 2251 2261 "jacquard-derive 0.5.4", 2252 2262 "miette", 2253 2263 "serde", 2264 + "serde_ipld_dagcbor", 2254 2265 "thiserror 2.0.17", 2255 2266 ] 2256 2267 ··· 2326 2337 "bon", 2327 2338 "bytes", 2328 2339 "chrono", 2340 + "ciborium", 2329 2341 "cid", 2330 2342 "ed25519-dalek", 2331 2343 "futures", ··· 2358 2370 "tracing", 2359 2371 "trait-variant", 2360 2372 "url", 2373 + "zstd", 2361 2374 ] 2362 2375 2363 2376 [[package]] ··· 2978 2991 source = "registry+https://github.com/rust-lang/crates.io-index" 2979 2992 checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5" 2980 2993 dependencies = [ 2981 - "windows-sys 0.60.2", 2994 + "windows-sys 0.61.1", 2982 2995 ] 2983 2996 2984 2997 [[package]] ··· 3137 3150 version = "1.70.1" 3138 3151 source = "registry+https://github.com/rust-lang/crates.io-index" 3139 3152 checksum = "a4895175b425cb1f87721b59f0f286c2092bd4af812243672510e1ac53e2e0ad" 3153 + 3154 + [[package]] 3155 + name = "openssl-probe" 3156 + version = "0.1.6" 3157 + source = "registry+https://github.com/rust-lang/crates.io-index" 3158 + checksum = "d05e27ee213611ffe7d6348b942e8f942b37114c00cc03cec254295a4a17852e" 3140 3159 3141 3160 [[package]] 3142 3161 name = "ouroboros" ··· 3957 3976 "errno", 3958 3977 "libc", 3959 3978 "linux-raw-sys 0.11.0", 3960 - "windows-sys 0.60.2", 3979 + "windows-sys 0.61.1", 3961 3980 ] 3962 3981 3963 3982 [[package]] ··· 3975 3994 ] 3976 3995 3977 3996 [[package]] 3997 + name = "rustls-native-certs" 3998 + version = "0.8.2" 3999 + source = "registry+https://github.com/rust-lang/crates.io-index" 4000 + checksum = "9980d917ebb0c0536119ba501e90834767bffc3d60641457fd84a1f3fd337923" 4001 + dependencies = [ 4002 + "openssl-probe", 4003 + "rustls-pki-types", 4004 + "schannel", 4005 + "security-framework", 4006 + ] 4007 + 4008 + [[package]] 3978 4009 name = "rustls-pki-types" 3979 4010 version = "1.12.0" 3980 4011 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 4020 4051 checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502" 4021 4052 dependencies = [ 4022 4053 "winapi-util", 4054 + ] 4055 + 4056 + [[package]] 4057 + name = "schannel" 4058 + version = "0.1.28" 4059 + source = "registry+https://github.com/rust-lang/crates.io-index" 4060 + checksum = "891d81b926048e76efe18581bf793546b4c0eaf8448d72be8de2bbee5fd166e1" 4061 + dependencies = [ 4062 + "windows-sys 0.61.1", 4023 4063 ] 4024 4064 4025 4065 [[package]] ··· 4073 4113 ] 4074 4114 4075 4115 [[package]] 4116 + name = "security-framework" 4117 + version = "3.5.1" 4118 + source = "registry+https://github.com/rust-lang/crates.io-index" 4119 + checksum = "b3297343eaf830f66ede390ea39da1d462b6b0c1b000f420d0a83f898bbbe6ef" 4120 + dependencies = [ 4121 + "bitflags", 4122 + "core-foundation 0.10.1", 4123 + "core-foundation-sys", 4124 + "libc", 4125 + "security-framework-sys", 4126 + ] 4127 + 4128 + [[package]] 4129 + name = "security-framework-sys" 4130 + version = "2.15.0" 4131 + source = "registry+https://github.com/rust-lang/crates.io-index" 4132 + checksum = "cc1f0cbffaac4852523ce30d8bd3c5cdc873501d96ff467ca09b6767bb8cd5c0" 4133 + dependencies = [ 4134 + "core-foundation-sys", 4135 + "libc", 4136 + ] 4137 + 4138 + [[package]] 4076 4139 name = "semver" 4077 4140 version = "1.0.27" 4078 4141 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 4520 4583 checksum = "3c879d448e9d986b661742763247d3693ed13609438cf3d006f51f5368a5ba6b" 4521 4584 dependencies = [ 4522 4585 "bitflags", 4523 - "core-foundation", 4586 + "core-foundation 0.9.4", 4524 4587 "system-configuration-sys", 4525 4588 ] 4526 4589 ··· 4563 4626 "getrandom 0.3.4", 4564 4627 "once_cell", 4565 4628 "rustix 1.1.2", 4566 - "windows-sys 0.60.2", 4629 + "windows-sys 0.61.1", 4567 4630 ] 4568 4631 4569 4632 [[package]] ··· 4797 4860 dependencies = [ 4798 4861 "futures-util", 4799 4862 "log", 4863 + "rustls", 4864 + "rustls-native-certs", 4865 + "rustls-pki-types", 4800 4866 "tokio", 4867 + "tokio-rustls", 4801 4868 "tungstenite", 4802 4869 ] 4803 4870 ··· 4812 4879 "http", 4813 4880 "httparse", 4814 4881 "js-sys", 4882 + "rustls", 4815 4883 "thiserror 1.0.69", 4816 4884 "tokio", 4817 4885 "tokio-tungstenite", ··· 5007 5075 "httparse", 5008 5076 "log", 5009 5077 "rand 0.8.5", 5078 + "rustls", 5079 + "rustls-pki-types", 5010 5080 "sha1", 5011 5081 "thiserror 1.0.69", 5012 5082 "utf-8", ··· 5342 5412 source = "registry+https://github.com/rust-lang/crates.io-index" 5343 5413 checksum = "db67ae75a9405634f5882791678772c94ff5f16a66535aae186e26aa0841fc8b" 5344 5414 dependencies = [ 5345 - "core-foundation", 5415 + "core-foundation 0.9.4", 5346 5416 "home", 5347 5417 "jni", 5348 5418 "log", ··· 5396 5466 source = "registry+https://github.com/rust-lang/crates.io-index" 5397 5467 checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" 5398 5468 dependencies = [ 5399 - "windows-sys 0.60.2", 5469 + "windows-sys 0.61.1", 5400 5470 ] 5401 5471 5402 5472 [[package]] ··· 5601 5671 ] 5602 5672 5603 5673 [[package]] 5674 + name = "windows-sys" 5675 + version = "0.61.1" 5676 + source = "registry+https://github.com/rust-lang/crates.io-index" 5677 + checksum = "6f109e41dd4a3c848907eb83d5a42ea98b3769495597450cf6d153507b166f0f" 5678 + dependencies = [ 5679 + "windows-link 0.2.0", 5680 + ] 5681 + 5682 + [[package]] 5604 5683 name = "windows-targets" 5605 5684 version = "0.42.2" 5606 5685 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 6003 6082 "proc-macro2", 6004 6083 "quote", 6005 6084 "syn 2.0.106", 6085 + ] 6086 + 6087 + [[package]] 6088 + name = "zstd" 6089 + version = "0.13.3" 6090 + source = "registry+https://github.com/rust-lang/crates.io-index" 6091 + checksum = "e91ee311a569c327171651566e07972200e76fcfe2242a4fa446149a3881c08a" 6092 + dependencies = [ 6093 + "zstd-safe", 6094 + ] 6095 + 6096 + [[package]] 6097 + name = "zstd-safe" 6098 + version = "7.2.4" 6099 + source = "registry+https://github.com/rust-lang/crates.io-index" 6100 + checksum = "8f49c4d5f0abb602a93fb8736af2a4f4dd9512e36f7f570d66e65ff867ed3b9d" 6101 + dependencies = [ 6102 + "zstd-sys", 6103 + ] 6104 + 6105 + [[package]] 6106 + name = "zstd-sys" 6107 + version = "2.0.16+zstd.1.5.7" 6108 + source = "registry+https://github.com/rust-lang/crates.io-index" 6109 + checksum = "91e19ebc2adc8f83e43039e79776e3fda8ca919132d68a1fed6a5faca2683748" 6110 + dependencies = [ 6111 + "cc", 6112 + "pkg-config", 6006 6113 ] 6007 6114 6008 6115 [[package]]
+1
crates/jacquard-api/Cargo.toml
··· 21 21 jacquard-derive = { version = "0.5", path = "../jacquard-derive" } 22 22 miette.workspace = true 23 23 serde.workspace = true 24 + serde_ipld_dagcbor.workspace = true 24 25 thiserror.workspace = true 25 26 26 27
+6 -7
crates/jacquard-api/src/com_atproto/label.rs
··· 6 6 // Any manual changes will be overwritten on the next regeneration. 7 7 8 8 pub mod query_labels; 9 - #[cfg(feature = "streaming")] 10 9 pub mod subscribe_labels; 11 10 12 11 /// Metadata tag on an atproto resource (eg, repo or record). ··· 19 18 PartialEq, 20 19 Eq, 21 20 jacquard_derive::IntoStatic, 22 - bon::Builder, 21 + bon::Builder 23 22 )] 24 23 #[serde(rename_all = "camelCase")] 25 24 pub struct Label<'a> { ··· 189 188 PartialEq, 190 189 Eq, 191 190 jacquard_derive::IntoStatic, 192 - bon::Builder, 191 + bon::Builder 193 192 )] 194 193 #[serde(rename_all = "camelCase")] 195 194 pub struct LabelValueDefinition<'a> { ··· 228 227 PartialEq, 229 228 Eq, 230 229 jacquard_derive::IntoStatic, 231 - bon::Builder, 230 + bon::Builder 232 231 )] 233 232 #[serde(rename_all = "camelCase")] 234 233 pub struct LabelValueDefinitionStrings<'a> { ··· 254 253 PartialEq, 255 254 Eq, 256 255 jacquard_derive::IntoStatic, 257 - Default, 256 + Default 258 257 )] 259 258 #[serde(rename_all = "camelCase")] 260 259 pub struct SelfLabel<'a> { ··· 273 272 PartialEq, 274 273 Eq, 275 274 jacquard_derive::IntoStatic, 276 - bon::Builder, 275 + bon::Builder 277 276 )] 278 277 #[serde(rename_all = "camelCase")] 279 278 pub struct SelfLabels<'a> { 280 279 #[serde(borrow)] 281 280 pub values: Vec<crate::com_atproto::label::SelfLabel<'a>>, 282 - } 281 + }
+31
crates/jacquard-api/src/com_atproto/label/subscribe_labels.rs
··· 79 79 Info(Box<crate::com_atproto::label::subscribe_labels::Info<'a>>), 80 80 } 81 81 82 + impl<'a> SubscribeLabelsMessage<'a> { 83 + /// Decode a framed DAG-CBOR message (header + body). 84 + pub fn decode_framed<'de: 'a>( 85 + bytes: &'de [u8], 86 + ) -> Result<SubscribeLabelsMessage<'a>, jacquard_common::error::DecodeError> { 87 + let (header, body) = jacquard_common::xrpc::subscription::parse_event_header( 88 + bytes, 89 + )?; 90 + match header.t.as_str() { 91 + "#labels" => { 92 + let variant = serde_ipld_dagcbor::from_slice(body)?; 93 + Ok(Self::Labels(Box::new(variant))) 94 + } 95 + "#info" => { 96 + let variant = serde_ipld_dagcbor::from_slice(body)?; 97 + Ok(Self::Info(Box::new(variant))) 98 + } 99 + unknown => { 100 + Err( 101 + jacquard_common::error::DecodeError::UnknownEventType(unknown.into()), 102 + ) 103 + } 104 + } 105 + } 106 + } 107 + 82 108 #[jacquard_derive::open_union] 83 109 #[derive( 84 110 serde::Serialize, ··· 121 147 const ENCODING: jacquard_common::xrpc::MessageEncoding = jacquard_common::xrpc::MessageEncoding::DagCbor; 122 148 type Message<'de> = SubscribeLabelsMessage<'de>; 123 149 type Error<'de> = SubscribeLabelsError<'de>; 150 + fn decode_message<'de>( 151 + bytes: &'de [u8], 152 + ) -> Result<Self::Message<'de>, jacquard_common::error::DecodeError> { 153 + SubscribeLabelsMessage::decode_framed(bytes) 154 + } 124 155 } 125 156 126 157 impl jacquard_common::xrpc::XrpcSubscription for SubscribeLabels {
+1 -2
crates/jacquard-api/src/com_atproto/sync.rs
··· 20 20 pub mod list_repos_by_collection; 21 21 pub mod notify_of_update; 22 22 pub mod request_crawl; 23 - #[cfg(feature = "streaming")] 24 23 pub mod subscribe_repos; 25 24 26 25 #[derive(Debug, Clone, PartialEq, Eq, Hash)] ··· 112 111 HostStatus::Other(v) => HostStatus::Other(v.into_static()), 113 112 } 114 113 } 115 - } 114 + }
+55 -19
crates/jacquard-api/src/com_atproto/sync/subscribe_repos.rs
··· 15 15 PartialEq, 16 16 Eq, 17 17 jacquard_derive::IntoStatic, 18 - bon::Builder 18 + bon::Builder, 19 19 )] 20 20 #[serde(rename_all = "camelCase")] 21 21 pub struct Account<'a> { ··· 42 42 PartialEq, 43 43 Eq, 44 44 jacquard_derive::IntoStatic, 45 - bon::Builder 45 + bon::Builder, 46 46 )] 47 47 #[serde(rename_all = "camelCase")] 48 48 pub struct Commit<'a> { ··· 87 87 PartialEq, 88 88 Eq, 89 89 jacquard_derive::IntoStatic, 90 - bon::Builder 90 + bon::Builder, 91 91 )] 92 92 #[serde(rename_all = "camelCase")] 93 93 pub struct Identity<'a> { ··· 111 111 PartialEq, 112 112 Eq, 113 113 jacquard_derive::IntoStatic, 114 - Default 114 + Default, 115 115 )] 116 116 #[serde(rename_all = "camelCase")] 117 117 pub struct Info<'a> { ··· 130 130 PartialEq, 131 131 Eq, 132 132 bon::Builder, 133 - jacquard_derive::IntoStatic 133 + jacquard_derive::IntoStatic, 134 134 )] 135 135 #[builder(start_fn = new)] 136 136 #[serde(rename_all = "camelCase")] ··· 141 141 142 142 #[jacquard_derive::open_union] 143 143 #[derive( 144 - serde::Serialize, 145 - serde::Deserialize, 146 - Debug, 147 - Clone, 148 - PartialEq, 149 - Eq, 150 - jacquard_derive::IntoStatic 144 + serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq, Eq, jacquard_derive::IntoStatic, 151 145 )] 152 146 #[serde(tag = "$type")] 153 147 #[serde(bound(deserialize = "'de: 'a"))] ··· 164 158 Info(Box<crate::com_atproto::sync::subscribe_repos::Info<'a>>), 165 159 } 166 160 161 + impl<'a> SubscribeReposMessage<'a> { 162 + /// Decode a framed DAG-CBOR message (header + body). 163 + pub fn decode_framed<'de: 'a>( 164 + bytes: &'de [u8], 165 + ) -> Result<SubscribeReposMessage<'a>, jacquard_common::error::DecodeError> { 166 + let (header, body) = jacquard_common::xrpc::subscription::parse_event_header(bytes)?; 167 + match header.t.as_str() { 168 + "#commit" => { 169 + let variant = serde_ipld_dagcbor::from_slice(body)?; 170 + Ok(Self::Commit(Box::new(variant))) 171 + } 172 + "#sync" => { 173 + let variant = serde_ipld_dagcbor::from_slice(body)?; 174 + Ok(Self::Sync(Box::new(variant))) 175 + } 176 + "#identity" => { 177 + let variant = serde_ipld_dagcbor::from_slice(body)?; 178 + Ok(Self::Identity(Box::new(variant))) 179 + } 180 + "#account" => { 181 + let variant = serde_ipld_dagcbor::from_slice(body)?; 182 + Ok(Self::Account(Box::new(variant))) 183 + } 184 + "#info" => { 185 + let variant = serde_ipld_dagcbor::from_slice(body)?; 186 + Ok(Self::Info(Box::new(variant))) 187 + } 188 + unknown => Err(jacquard_common::error::DecodeError::UnknownEventType( 189 + unknown.into(), 190 + )), 191 + } 192 + } 193 + } 194 + 167 195 #[jacquard_derive::open_union] 168 196 #[derive( 169 197 serde::Serialize, ··· 174 202 Eq, 175 203 thiserror::Error, 176 204 miette::Diagnostic, 177 - jacquard_derive::IntoStatic 205 + jacquard_derive::IntoStatic, 178 206 )] 179 207 #[serde(tag = "error", content = "message")] 180 208 #[serde(bound(deserialize = "'de: 'a"))] ··· 213 241 pub struct SubscribeReposStream; 214 242 impl jacquard_common::xrpc::SubscriptionResp for SubscribeReposStream { 215 243 const NSID: &'static str = "com.atproto.sync.subscribeRepos"; 216 - const ENCODING: jacquard_common::xrpc::MessageEncoding = jacquard_common::xrpc::MessageEncoding::DagCbor; 244 + const ENCODING: jacquard_common::xrpc::MessageEncoding = 245 + jacquard_common::xrpc::MessageEncoding::DagCbor; 217 246 type Message<'de> = SubscribeReposMessage<'de>; 218 247 type Error<'de> = SubscribeReposError<'de>; 248 + fn decode_message<'de>( 249 + bytes: &'de [u8], 250 + ) -> Result<Self::Message<'de>, jacquard_common::error::DecodeError> { 251 + SubscribeReposMessage::decode_framed(bytes) 252 + } 219 253 } 220 254 221 255 impl jacquard_common::xrpc::XrpcSubscription for SubscribeRepos { 222 256 const NSID: &'static str = "com.atproto.sync.subscribeRepos"; 223 - const ENCODING: jacquard_common::xrpc::MessageEncoding = jacquard_common::xrpc::MessageEncoding::DagCbor; 257 + const ENCODING: jacquard_common::xrpc::MessageEncoding = 258 + jacquard_common::xrpc::MessageEncoding::DagCbor; 224 259 type Stream = SubscribeReposStream; 225 260 } 226 261 227 262 pub struct SubscribeReposEndpoint; 228 263 impl jacquard_common::xrpc::SubscriptionEndpoint for SubscribeReposEndpoint { 229 264 const PATH: &'static str = "/xrpc/com.atproto.sync.subscribeRepos"; 230 - const ENCODING: jacquard_common::xrpc::MessageEncoding = jacquard_common::xrpc::MessageEncoding::DagCbor; 265 + const ENCODING: jacquard_common::xrpc::MessageEncoding = 266 + jacquard_common::xrpc::MessageEncoding::DagCbor; 231 267 type Params<'de> = SubscribeRepos; 232 268 type Stream = SubscribeReposStream; 233 269 } ··· 242 278 PartialEq, 243 279 Eq, 244 280 jacquard_derive::IntoStatic, 245 - bon::Builder 281 + bon::Builder, 246 282 )] 247 283 #[serde(rename_all = "camelCase")] 248 284 pub struct RepoOp<'a> { ··· 272 308 PartialEq, 273 309 Eq, 274 310 jacquard_derive::IntoStatic, 275 - bon::Builder 311 + bon::Builder, 276 312 )] 277 313 #[serde(rename_all = "camelCase")] 278 314 pub struct Sync<'a> { ··· 289 325 pub seq: i64, 290 326 /// Timestamp of when this message was originally broadcast. 291 327 pub time: jacquard_common::types::string::Datetime, 292 - } 328 + }
+10 -4
crates/jacquard-api/src/place_stream/live/get_profile_card.rs
··· 13 13 PartialEq, 14 14 Eq, 15 15 bon::Builder, 16 - jacquard_derive::IntoStatic, 16 + jacquard_derive::IntoStatic 17 17 )] 18 18 #[builder(start_fn = new)] 19 19 #[serde(rename_all = "camelCase")] ··· 24 24 } 25 25 26 26 #[derive( 27 - serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq, Eq, jacquard_derive::IntoStatic, 27 + serde::Serialize, 28 + serde::Deserialize, 29 + Debug, 30 + Clone, 31 + PartialEq, 32 + Eq, 33 + jacquard_derive::IntoStatic 28 34 )] 29 35 #[serde(rename_all = "camelCase")] 30 36 pub struct GetProfileCardOutput { ··· 41 47 Eq, 42 48 thiserror::Error, 43 49 miette::Diagnostic, 44 - jacquard_derive::IntoStatic, 50 + jacquard_derive::IntoStatic 45 51 )] 46 52 #[serde(tag = "error", content = "message")] 47 53 #[serde(bound(deserialize = "'de: 'a"))] ··· 104 110 const METHOD: jacquard_common::xrpc::XrpcMethod = jacquard_common::xrpc::XrpcMethod::Query; 105 111 type Request<'de> = GetProfileCard<'de>; 106 112 type Response = GetProfileCardResponse; 107 - } 113 + }
+6 -3
crates/jacquard-common/Cargo.toml
··· 12 12 license.workspace = true 13 13 14 14 [features] 15 - default = ["service-auth", "reqwest-client", "crypto"] 15 + default = ["service-auth", "reqwest-client", "crypto", "websocket", "zstd"] 16 16 crypto = [] 17 17 crypto-ed25519 = ["crypto", "dep:ed25519-dalek"] 18 18 crypto-k256 = ["crypto", "dep:k256", "k256/ecdsa"] ··· 21 21 reqwest-client = ["dep:reqwest"] 22 22 tracing = ["dep:tracing"] 23 23 streaming = ["n0-future", "futures"] 24 - websocket = ["streaming", "tokio-tungstenite-wasm"] 24 + websocket = ["streaming", "tokio-tungstenite-wasm", "dep:ciborium"] 25 + zstd = ["dep:zstd"] 25 26 26 27 [dependencies] 27 28 trait-variant.workspace = true ··· 55 56 # Streaming support (optional) 56 57 n0-future = { workspace = true, optional = true } 57 58 futures = { version = "0.3", optional = true } 58 - tokio-tungstenite-wasm = { version = "0.4", optional = true } 59 + tokio-tungstenite-wasm = { version = "0.4", features = ["rustls-tls-native-roots"], optional = true } 60 + ciborium = {version = "0.2.0", optional = true } 59 61 genawaiter = { version = "0.99.1", features = ["futures03"] } 62 + zstd = { version = "0.13", optional = true } 60 63 61 64 [target.'cfg(target_family = "wasm")'.dependencies] 62 65 getrandom = { version = "0.3.4", features = ["wasm_js"] }
+12
crates/jacquard-common/src/error.rs
··· 98 98 #[source] 99 99 serde_ipld_dagcbor::DecodeError<std::convert::Infallible>, 100 100 ), 101 + #[cfg(feature = "websocket")] 102 + #[error("Failed to deserialize cbor header: {0}")] 103 + CborHeader( 104 + #[from] 105 + #[source] 106 + ciborium::de::Error<std::io::Error>, 107 + ), 108 + 109 + /// Unknown event type in framed message 110 + #[cfg(feature = "websocket")] 111 + #[error("Unknown event type: {0}")] 112 + UnknownEventType(smol_str::SmolStr), 101 113 } 102 114 103 115 /// HTTP error response (non-200 status codes outside of XRPC error handling)
+279
crates/jacquard-common/src/jetstream.rs
··· 1 + //! Jetstream subscription support 2 + //! 3 + //! Jetstream is a simplified JSON-based alternative to the atproto firehose. 4 + //! Unlike subscribeRepos which uses DAG-CBOR, Jetstream uses JSON encoding. 5 + 6 + use crate::types::string::{Datetime, Did, Handle}; 7 + use crate::xrpc::{MessageEncoding, SubscriptionResp, XrpcSubscription}; 8 + use crate::{CowStr, Data, IntoStatic}; 9 + use serde::{Deserialize, Serialize}; 10 + 11 + /// Parameters for subscribing to Jetstream 12 + #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, bon::Builder)] 13 + #[serde(rename_all = "camelCase")] 14 + #[builder(start_fn = new)] 15 + pub struct JetstreamParams<'a> { 16 + /// Filter by collection NSIDs (max 100) 17 + #[serde(skip_serializing_if = "Option::is_none")] 18 + #[serde(borrow)] 19 + #[builder(into)] 20 + pub wanted_collections: Option<Vec<crate::CowStr<'a>>>, 21 + 22 + /// Filter by DIDs (max 10,000) 23 + #[serde(skip_serializing_if = "Option::is_none")] 24 + #[serde(borrow)] 25 + #[builder(into)] 26 + pub wanted_dids: Option<Vec<crate::CowStr<'a>>>, 27 + 28 + /// Unix microseconds timestamp to start playback 29 + #[serde(skip_serializing_if = "Option::is_none")] 30 + pub cursor: Option<i64>, 31 + 32 + /// Maximum payload size in bytes 33 + #[serde(skip_serializing_if = "Option::is_none")] 34 + pub max_message_size_bytes: Option<u64>, 35 + 36 + /// Enable zstd compression 37 + #[serde(skip_serializing_if = "Option::is_none")] 38 + pub compress: Option<bool>, 39 + 40 + /// Pause stream until first options update 41 + #[serde(skip_serializing_if = "Option::is_none")] 42 + pub require_hello: Option<bool>, 43 + } 44 + 45 + /// Commit operation type 46 + #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] 47 + #[serde(rename_all = "lowercase")] 48 + pub enum CommitOperation { 49 + /// Create a new record 50 + Create, 51 + /// Update an existing record 52 + Update, 53 + /// Delete a record 54 + Delete, 55 + } 56 + 57 + /// Commit event details 58 + #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] 59 + pub struct JetstreamCommit<'a> { 60 + /// Revision string 61 + #[serde(borrow)] 62 + pub rev: CowStr<'a>, 63 + /// Operation type 64 + pub operation: CommitOperation, 65 + /// Collection NSID 66 + #[serde(borrow)] 67 + pub collection: CowStr<'a>, 68 + /// Record key 69 + #[serde(borrow)] 70 + pub rkey: CowStr<'a>, 71 + /// Record data (present for create/update) 72 + #[serde(skip_serializing_if = "Option::is_none")] 73 + #[serde(borrow)] 74 + pub record: Option<Data<'a>>, 75 + /// Content identifier 76 + #[serde(skip_serializing_if = "Option::is_none")] 77 + #[serde(borrow)] 78 + pub cid: Option<CowStr<'a>>, 79 + } 80 + 81 + /// Identity event details 82 + #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] 83 + pub struct JetstreamIdentity<'a> { 84 + /// DID 85 + #[serde(borrow)] 86 + pub did: Did<'a>, 87 + /// Handle 88 + #[serde(skip_serializing_if = "Option::is_none")] 89 + #[serde(borrow)] 90 + pub handle: Option<Handle<'a>>, 91 + /// Sequence number 92 + pub seq: i64, 93 + /// Timestamp 94 + pub time: Datetime, 95 + } 96 + 97 + /// Account event details 98 + #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] 99 + pub struct JetstreamAccount<'a> { 100 + /// Account active status 101 + pub active: bool, 102 + /// DID 103 + #[serde(borrow)] 104 + pub did: Did<'a>, 105 + /// Sequence number 106 + pub seq: i64, 107 + /// Timestamp 108 + pub time: Datetime, 109 + /// Optional status message 110 + #[serde(skip_serializing_if = "Option::is_none")] 111 + #[serde(borrow)] 112 + pub status: Option<CowStr<'a>>, 113 + } 114 + 115 + /// Jetstream event message 116 + #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] 117 + #[serde(tag = "kind")] 118 + #[serde(rename_all = "lowercase")] 119 + pub enum JetstreamMessage<'a> { 120 + /// Commit event 121 + Commit { 122 + /// DID 123 + #[serde(borrow)] 124 + did: Did<'a>, 125 + /// Unix microseconds timestamp 126 + time_us: i64, 127 + /// Commit details 128 + #[serde(borrow)] 129 + commit: JetstreamCommit<'a>, 130 + }, 131 + /// Identity event 132 + Identity { 133 + /// DID 134 + #[serde(borrow)] 135 + did: Did<'a>, 136 + /// Unix microseconds timestamp 137 + time_us: i64, 138 + /// Identity details 139 + #[serde(borrow)] 140 + identity: JetstreamIdentity<'a>, 141 + }, 142 + /// Account event 143 + Account { 144 + /// DID 145 + #[serde(borrow)] 146 + did: Did<'a>, 147 + /// Unix microseconds timestamp 148 + time_us: i64, 149 + /// Account details 150 + #[serde(borrow)] 151 + account: JetstreamAccount<'a>, 152 + }, 153 + } 154 + 155 + impl IntoStatic for CommitOperation { 156 + type Output = CommitOperation; 157 + 158 + fn into_static(self) -> Self::Output { 159 + self 160 + } 161 + } 162 + 163 + impl IntoStatic for JetstreamCommit<'_> { 164 + type Output = JetstreamCommit<'static>; 165 + 166 + fn into_static(self) -> Self::Output { 167 + JetstreamCommit { 168 + rev: self.rev.into_static(), 169 + operation: self.operation, 170 + collection: self.collection.into_static(), 171 + rkey: self.rkey.into_static(), 172 + record: self.record.map(|r| r.into_static()), 173 + cid: self.cid.map(|c| c.into_static()), 174 + } 175 + } 176 + } 177 + 178 + impl IntoStatic for JetstreamIdentity<'_> { 179 + type Output = JetstreamIdentity<'static>; 180 + 181 + fn into_static(self) -> Self::Output { 182 + JetstreamIdentity { 183 + did: self.did.into_static(), 184 + handle: self.handle.map(|h| h.into_static()), 185 + seq: self.seq, 186 + time: self.time, 187 + } 188 + } 189 + } 190 + 191 + impl IntoStatic for JetstreamAccount<'_> { 192 + type Output = JetstreamAccount<'static>; 193 + 194 + fn into_static(self) -> Self::Output { 195 + JetstreamAccount { 196 + active: self.active, 197 + did: self.did.into_static(), 198 + seq: self.seq, 199 + time: self.time, 200 + status: self.status.map(|s| s.into_static()), 201 + } 202 + } 203 + } 204 + 205 + impl IntoStatic for JetstreamMessage<'_> { 206 + type Output = JetstreamMessage<'static>; 207 + 208 + fn into_static(self) -> Self::Output { 209 + match self { 210 + JetstreamMessage::Commit { 211 + did, 212 + time_us, 213 + commit, 214 + } => JetstreamMessage::Commit { 215 + did: did.into_static(), 216 + time_us, 217 + commit: commit.into_static(), 218 + }, 219 + JetstreamMessage::Identity { 220 + did, 221 + time_us, 222 + identity, 223 + } => JetstreamMessage::Identity { 224 + did: did.into_static(), 225 + time_us, 226 + identity: identity.into_static(), 227 + }, 228 + JetstreamMessage::Account { 229 + did, 230 + time_us, 231 + account, 232 + } => JetstreamMessage::Account { 233 + did: did.into_static(), 234 + time_us, 235 + account: account.into_static(), 236 + }, 237 + } 238 + } 239 + } 240 + 241 + /// Stream response type for Jetstream subscriptions 242 + pub struct JetstreamStream; 243 + 244 + impl SubscriptionResp for JetstreamStream { 245 + const NSID: &'static str = "jetstream"; 246 + const ENCODING: MessageEncoding = MessageEncoding::Json; 247 + 248 + /// Typed Jetstream message 249 + type Message<'de> = JetstreamMessage<'de>; 250 + 251 + /// Generic error type 252 + type Error<'de> = crate::xrpc::GenericError<'de>; 253 + } 254 + 255 + impl<'a> XrpcSubscription for JetstreamParams<'a> { 256 + const NSID: &'static str = "jetstream"; 257 + const ENCODING: MessageEncoding = MessageEncoding::Json; 258 + const CUSTOM_PATH: Option<&'static str> = Some("/subscribe"); 259 + type Stream = JetstreamStream; 260 + } 261 + 262 + impl IntoStatic for JetstreamParams<'_> { 263 + type Output = JetstreamParams<'static>; 264 + 265 + fn into_static(self) -> Self::Output { 266 + JetstreamParams { 267 + wanted_collections: self 268 + .wanted_collections 269 + .map(|v| v.into_iter().map(|s| s.into_static()).collect()), 270 + wanted_dids: self 271 + .wanted_dids 272 + .map(|v| v.into_iter().map(|s| s.into_static()).collect()), 273 + cursor: self.cursor, 274 + max_message_size_bytes: self.max_message_size_bytes, 275 + compress: self.compress, 276 + require_hello: self.require_hello, 277 + } 278 + } 279 + }
+3
crates/jacquard-common/src/lib.rs
··· 234 234 pub mod websocket; 235 235 236 236 #[cfg(feature = "websocket")] 237 + pub mod jetstream; 238 + 239 + #[cfg(feature = "websocket")] 237 240 pub use websocket::{ 238 241 tungstenite_client::TungsteniteClient, CloseCode, CloseFrame, WebSocketClient, 239 242 WebSocketConnection, WsMessage, WsSink, WsStream, WsText,
+25 -27
crates/jacquard-common/src/types/cid.rs
··· 164 164 } 165 165 } 166 166 167 - // TODO: take another look at this, see if we can do more borrowed and such 168 167 impl<'de, 'a> Deserialize<'de> for Cid<'a> 169 168 where 170 169 'de: 'a, ··· 173 172 where 174 173 D: Deserializer<'de>, 175 174 { 176 - struct CidVisitor; 175 + if deserializer.is_human_readable() { 176 + // JSON: always a string 177 + struct StrVisitor; 177 178 178 - impl<'de> Visitor<'de> for CidVisitor { 179 - type Value = Cid<'de>; 179 + impl<'de> Visitor<'de> for StrVisitor { 180 + type Value = Cid<'de>; 180 181 181 - fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { 182 - formatter.write_str("either valid IPLD CID bytes or a str") 183 - } 182 + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { 183 + formatter.write_str("a CID string") 184 + } 184 185 185 - fn visit_borrowed_str<E>(self, v: &'de str) -> Result<Self::Value, E> 186 - where 187 - E: serde::de::Error, 188 - { 189 - Ok(Cid::str(v)) 190 - } 186 + fn visit_borrowed_str<E>(self, v: &'de str) -> Result<Self::Value, E> 187 + where 188 + E: serde::de::Error, 189 + { 190 + Ok(Cid::str(v)) 191 + } 191 192 192 - fn visit_str<E>(self, v: &str) -> Result<Self::Value, E> 193 - where 194 - E: serde::de::Error, 195 - { 196 - Ok(FromStr::from_str(v).unwrap()) 193 + fn visit_str<E>(self, v: &str) -> Result<Self::Value, E> 194 + where 195 + E: serde::de::Error, 196 + { 197 + Ok(FromStr::from_str(v).unwrap()) 198 + } 197 199 } 198 200 199 - fn visit_bytes<E>(self, v: &[u8]) -> Result<Self::Value, E> 200 - where 201 - E: serde::de::Error, 202 - { 203 - let hash = cid::multihash::Multihash::from_bytes(v).map_err(|e| E::custom(e))?; 204 - Ok(Cid::ipld(IpldCid::new_v1(ATP_CID_CODEC, hash))) 205 - } 201 + deserializer.deserialize_str(StrVisitor) 202 + } else { 203 + // CBOR: use IpldCid's deserializer which handles CBOR tag 42 204 + let cid = IpldCid::deserialize(deserializer)?; 205 + Ok(Cid::ipld(cid)) 206 206 } 207 - 208 - deserializer.deserialize_any(CidVisitor) 209 207 } 210 208 } 211 209
+147 -34
crates/jacquard-common/src/xrpc/subscription.rs
··· 10 10 use std::marker::PhantomData; 11 11 use url::Url; 12 12 13 + use crate::error::DecodeError; 13 14 use crate::stream::StreamError; 14 15 use crate::websocket::{WebSocketClient, WebSocketConnection, WsSink, WsStream}; 15 16 use crate::{CowStr, Data, IntoStatic, RawData, WsMessage}; ··· 42 43 43 44 /// Error union type 44 45 type Error<'de>: Error + Deserialize<'de> + IntoStatic; 46 + 47 + /// Decode a message from bytes. 48 + /// 49 + /// Default implementation uses simple deserialization via serde. 50 + /// Subscriptions that use framed encoding (header + body) can override 51 + /// this to do two-stage deserialization. 52 + fn decode_message<'de>(bytes: &'de [u8]) -> Result<Self::Message<'de>, DecodeError> { 53 + match Self::ENCODING { 54 + MessageEncoding::Json => serde_json::from_slice(bytes).map_err(DecodeError::from), 55 + MessageEncoding::DagCbor => { 56 + serde_ipld_dagcbor::from_slice(bytes).map_err(DecodeError::from) 57 + } 58 + } 59 + } 45 60 } 46 61 47 62 /// XRPC subscription (WebSocket) ··· 57 72 /// Message encoding (JSON or DAG-CBOR) 58 73 const ENCODING: MessageEncoding; 59 74 75 + /// Custom path override (e.g., "/subscribe" for Jetstream). 76 + /// If None, defaults to "/xrpc/{NSID}" 77 + const CUSTOM_PATH: Option<&'static str> = None; 78 + 60 79 /// Stream response type (marker struct) 61 80 type Stream: SubscriptionResp; 62 81 ··· 79 98 } 80 99 } 81 100 101 + #[derive(Debug, serde::Deserialize)] 102 + pub struct EventHeader { 103 + pub op: i64, 104 + pub t: smol_str::SmolStr, // type discriminator like "#commit" 105 + } 106 + 107 + pub fn parse_event_header<'a>(bytes: &'a [u8]) -> Result<(EventHeader, &'a [u8]), DecodeError> { 108 + let mut cursor = std::io::Cursor::new(bytes); 109 + let header: EventHeader = ciborium::de::from_reader(&mut cursor)?; 110 + let position = cursor.position() as usize; 111 + drop(cursor); // explicit drop before reborrowing bytes 112 + 113 + Ok((header, &bytes[position..])) 114 + } 115 + 82 116 /// Decode JSON messages from a WebSocket stream 83 - fn decode_json_msg<S: SubscriptionResp>( 117 + pub fn decode_json_msg<S: SubscriptionResp>( 84 118 msg_result: Result<crate::websocket::WsMessage, StreamError>, 85 119 ) -> Option<Result<StreamMessage<'static, S>, StreamError>> 86 120 where ··· 88 122 { 89 123 use crate::websocket::WsMessage; 90 124 91 - fn parse_msg<'a, S: SubscriptionResp>( 92 - bytes: &'a [u8], 93 - ) -> Result<S::Message<'a>, serde_json::Error> { 94 - serde_json::from_slice(bytes) 95 - } 96 - 97 125 match msg_result { 98 126 Ok(WsMessage::Text(text)) => Some( 99 - parse_msg::<S>(text.as_ref()) 127 + S::decode_message(text.as_ref()) 100 128 .map(|v| v.into_static()) 101 129 .map_err(StreamError::decode), 102 130 ), 103 - Ok(WsMessage::Binary(bytes)) => Some( 104 - parse_msg::<S>(&bytes) 105 - .map(|v| v.into_static()) 106 - .map_err(StreamError::decode), 107 - ), 131 + Ok(WsMessage::Binary(bytes)) => { 132 + #[cfg(feature = "zstd")] 133 + { 134 + // Try to decompress with zstd first (Jetstream uses zstd compression) 135 + match decompress_zstd(&bytes) { 136 + Ok(decompressed) => Some( 137 + S::decode_message(&decompressed) 138 + .map(|v| v.into_static()) 139 + .map_err(StreamError::decode), 140 + ), 141 + Err(_) => { 142 + // Not zstd-compressed, try direct decode 143 + Some( 144 + S::decode_message(&bytes) 145 + .map(|v| v.into_static()) 146 + .map_err(StreamError::decode), 147 + ) 148 + } 149 + } 150 + } 151 + #[cfg(not(feature = "zstd"))] 152 + { 153 + Some( 154 + S::decode_message(&bytes) 155 + .map(|v| v.into_static()) 156 + .map_err(StreamError::decode), 157 + ) 158 + } 159 + } 108 160 Ok(WsMessage::Close(_)) => Some(Err(StreamError::closed())), 109 161 Err(e) => Some(Err(e)), 110 162 } 111 163 } 112 164 165 + #[cfg(feature = "zstd")] 166 + fn decompress_zstd(bytes: &[u8]) -> Result<Vec<u8>, std::io::Error> { 167 + use std::sync::OnceLock; 168 + use zstd::stream::decode_all; 169 + 170 + static DICTIONARY: OnceLock<Vec<u8>> = OnceLock::new(); 171 + 172 + let dict = DICTIONARY.get_or_init(|| { 173 + include_bytes!("../../zstd_dictionary").to_vec() 174 + }); 175 + 176 + decode_all(std::io::Cursor::new(bytes)) 177 + .or_else(|_| { 178 + // Try with dictionary 179 + let mut decoder = zstd::Decoder::with_dictionary(std::io::Cursor::new(bytes), dict)?; 180 + let mut result = Vec::new(); 181 + std::io::Read::read_to_end(&mut decoder, &mut result)?; 182 + Ok(result) 183 + }) 184 + } 185 + 113 186 /// Decode CBOR messages from a WebSocket stream 114 - fn decode_cbor_msg<S: SubscriptionResp>( 187 + pub fn decode_cbor_msg<S: SubscriptionResp>( 115 188 msg_result: Result<crate::websocket::WsMessage, StreamError>, 116 189 ) -> Option<Result<StreamMessage<'static, S>, StreamError>> 117 190 where ··· 119 192 { 120 193 use crate::websocket::WsMessage; 121 194 122 - fn parse_cbor<'a, S: SubscriptionResp>( 123 - bytes: &'a [u8], 124 - ) -> Result<S::Message<'a>, serde_ipld_dagcbor::DecodeError<std::convert::Infallible>> { 125 - serde_ipld_dagcbor::from_slice(bytes) 126 - } 127 - 128 195 match msg_result { 129 196 Ok(WsMessage::Binary(bytes)) => Some( 130 - parse_cbor::<S>(&bytes) 197 + S::decode_message(&bytes) 131 198 .map(|v| v.into_static()) 132 - .map_err(|e| StreamError::decode(crate::error::DecodeError::from(e))), 199 + .map_err(StreamError::decode), 133 200 ), 134 201 Ok(WsMessage::Text(_)) => Some(Err(StreamError::wrong_message_format( 135 202 "expected binary frame for CBOR, got text", ··· 223 290 .map(|v| v.into_static()) 224 291 .map_err(StreamError::decode), 225 292 ), 226 - Ok(WsMessage::Binary(bytes)) => Some( 227 - parse_msg(&bytes) 228 - .map(|v| v.into_static()) 229 - .map_err(StreamError::decode), 230 - ), 293 + Ok(WsMessage::Binary(bytes)) => { 294 + #[cfg(feature = "zstd")] 295 + { 296 + match decompress_zstd(&bytes) { 297 + Ok(decompressed) => Some( 298 + parse_msg(&decompressed) 299 + .map(|v| v.into_static()) 300 + .map_err(StreamError::decode), 301 + ), 302 + Err(_) => Some( 303 + parse_msg(&bytes) 304 + .map(|v| v.into_static()) 305 + .map_err(StreamError::decode), 306 + ), 307 + } 308 + } 309 + #[cfg(not(feature = "zstd"))] 310 + { 311 + Some( 312 + parse_msg(&bytes) 313 + .map(|v| v.into_static()) 314 + .map_err(StreamError::decode), 315 + ) 316 + } 317 + } 231 318 Ok(WsMessage::Close(_)) => Some(Err(StreamError::closed())), 232 319 Err(e) => Some(Err(e)), 233 320 }) ··· 276 363 .map(|v| v.into_static()) 277 364 .map_err(StreamError::decode), 278 365 ), 279 - Ok(WsMessage::Binary(bytes)) => Some( 280 - parse_msg(&bytes) 281 - .map(|v| v.into_static()) 282 - .map_err(StreamError::decode), 283 - ), 366 + Ok(WsMessage::Binary(bytes)) => { 367 + #[cfg(feature = "zstd")] 368 + { 369 + match decompress_zstd(&bytes) { 370 + Ok(decompressed) => Some( 371 + parse_msg(&decompressed) 372 + .map(|v| v.into_static()) 373 + .map_err(StreamError::decode), 374 + ), 375 + Err(_) => Some( 376 + parse_msg(&bytes) 377 + .map(|v| v.into_static()) 378 + .map_err(StreamError::decode), 379 + ), 380 + } 381 + } 382 + #[cfg(not(feature = "zstd"))] 383 + { 384 + Some( 385 + parse_msg(&bytes) 386 + .map(|v| v.into_static()) 387 + .map_err(StreamError::decode), 388 + ) 389 + } 390 + } 284 391 Ok(WsMessage::Close(_)) => Some(Err(StreamError::closed())), 285 392 Err(e) => Some(Err(e)), 286 393 }) ··· 439 546 Sub: XrpcSubscription, 440 547 { 441 548 let mut url = self.base.clone(); 549 + 550 + // Use custom path if provided, otherwise construct from NSID 442 551 let mut path = url.path().trim_end_matches('/').to_owned(); 443 - path.push_str("/xrpc/"); 444 - path.push_str(Sub::NSID); 552 + if let Some(custom_path) = Sub::CUSTOM_PATH { 553 + path.push_str(custom_path); 554 + } else { 555 + path.push_str("/xrpc/"); 556 + path.push_str(Sub::NSID); 557 + } 445 558 url.set_path(&path); 446 559 447 560 let query_params = params.query_params();
crates/jacquard-common/zstd_dictionary

This is a binary file and will not be displayed.

+47 -1
crates/jacquard-lexicon/src/codegen/xrpc.rs
··· 224 224 let enum_ident = syn::Ident::new(&enum_name, proc_macro2::Span::call_site()); 225 225 226 226 let mut variants = Vec::new(); 227 + let mut decode_arms = Vec::new(); 228 + 227 229 for ref_str in &union.refs { 228 230 let ref_str_s = ref_str.as_ref(); 229 231 ··· 255 257 #[serde(rename = #ref_str_s)] 256 258 #variant_ident(Box<#type_path>) 257 259 }); 260 + 261 + // Generate decode arm for framed decoding 262 + decode_arms.push(quote! { 263 + #ref_str_s => { 264 + let variant = serde_ipld_dagcbor::from_slice(body)?; 265 + Ok(Self::#variant_ident(Box::new(variant))) 266 + } 267 + }); 258 268 } 259 269 260 270 let doc = self.generate_doc_comment(union.description.as_ref()); 261 271 272 + // Generate decode_framed method for DAG-CBOR subscriptions 273 + let decode_framed_impl = quote! { 274 + impl<'a> #enum_ident<'a> { 275 + /// Decode a framed DAG-CBOR message (header + body). 276 + pub fn decode_framed<'de: 'a>(bytes: &'de [u8]) -> Result<#enum_ident<'a>, jacquard_common::error::DecodeError> { 277 + let (header, body) = jacquard_common::xrpc::subscription::parse_event_header(bytes)?; 278 + match header.t.as_str() { 279 + #(#decode_arms)* 280 + unknown => Err(jacquard_common::error::DecodeError::UnknownEventType( 281 + unknown.into() 282 + )), 283 + } 284 + } 285 + } 286 + }; 287 + 262 288 Ok(quote! { 263 289 #doc 264 290 #[jacquard_derive::open_union] ··· 268 294 pub enum #enum_ident<'a> { 269 295 #(#variants,)* 270 296 } 297 + 298 + #decode_framed_impl 271 299 }) 272 300 } 273 301 LexXrpcSubscriptionMessageSchema::Object(obj) => { ··· 1252 1280 1253 1281 // Determine encoding from nsid convention 1254 1282 // ATProto subscriptions use DAG-CBOR, community ones might use JSON 1255 - let encoding = if nsid.starts_with("com.atproto") { 1283 + let is_dag_cbor = nsid.starts_with("com.atproto"); 1284 + let encoding = if is_dag_cbor { 1256 1285 quote! { jacquard_common::xrpc::MessageEncoding::DagCbor } 1257 1286 } else { 1258 1287 quote! { jacquard_common::xrpc::MessageEncoding::Json } 1259 1288 }; 1260 1289 1261 1290 // Generate SubscriptionResp impl 1291 + // For DAG-CBOR subscriptions, override decode_message to use framed decoding 1292 + let decode_message_override = if is_dag_cbor && has_message { 1293 + let msg_ident = syn::Ident::new( 1294 + &format!("{}Message", type_base), 1295 + proc_macro2::Span::call_site(), 1296 + ); 1297 + quote! { 1298 + fn decode_message<'de>(bytes: &'de [u8]) -> Result<Self::Message<'de>, jacquard_common::error::DecodeError> { 1299 + #msg_ident::decode_framed(bytes) 1300 + } 1301 + } 1302 + } else { 1303 + quote! {} 1304 + }; 1305 + 1262 1306 let stream_resp_impl = quote! { 1263 1307 #[doc = "Stream response type for "] 1264 1308 #[doc = #nsid] ··· 1270 1314 1271 1315 type Message<'de> = #message_type; 1272 1316 type Error<'de> = #error_type; 1317 + 1318 + #decode_message_override 1273 1319 } 1274 1320 }; 1275 1321
+1 -1
crates/jacquard-oauth/src/client.rs
··· 815 815 Sub: XrpcSubscription + Send + Sync, 816 816 { 817 817 use jacquard_common::xrpc::SubscriptionExt; 818 - let base = self.base_uri(); 818 + let base = self.base_uri().await; 819 819 self.subscription(base) 820 820 .with_options(opts) 821 821 .subscribe(params)
+11
crates/jacquard/Cargo.toml
··· 110 110 path = "../../examples/app_password_create_post.rs" 111 111 required-features = ["api_bluesky"] 112 112 113 + [[example]] 114 + name = "subscribe_repos" 115 + path = "../../examples/subscribe_repos.rs" 116 + required-features = ["api_bluesky", "streaming"] 117 + 118 + [[example]] 119 + name = "subscribe_jetstream" 120 + path = "../../examples/subscribe_jetstream.rs" 121 + required-features = ["streaming"] 122 + 113 123 114 124 [dependencies] 115 125 jacquard-api = { version = "0.5", path = "../jacquard-api" } ··· 156 166 [dev-dependencies] 157 167 clap.workspace = true 158 168 miette = { workspace = true, features = ["fancy"] } 169 + tokio = { workspace = true, features = ["signal"] } 159 170 viuer = { version = "0.9", features = ["print-file", "sixel"] } 160 171 tiff = { version = "0.6.0-alpha" } 161 172 image = { version = "0.25" }
+5 -22
crates/jacquard/src/client/credential_session.rs
··· 769 769 T: Send + Sync + 'static, 770 770 W: WebSocketClient + Send + Sync, 771 771 { 772 - fn base_uri(&self) -> Url { 773 - #[cfg(not(target_arch = "wasm32"))] 774 - if tokio::runtime::Handle::try_current().is_ok() { 775 - tokio::task::block_in_place(|| { 776 - self.endpoint.blocking_read().clone().unwrap_or( 777 - Url::parse("https://public.bsky.app") 778 - .expect("public appview should be valid url"), 779 - ) 780 - }) 781 - } else { 782 - self.endpoint.blocking_read().clone().unwrap_or( 783 - Url::parse("https://public.bsky.app").expect("public appview should be valid url"), 784 - ) 785 - } 786 - 787 - #[cfg(target_arch = "wasm32")] 788 - { 789 - self.endpoint.blocking_read().clone().unwrap_or( 790 - Url::parse("https://public.bsky.app").expect("public appview should be valid url"), 791 - ) 792 - } 772 + async fn base_uri(&self) -> Url { 773 + self.endpoint.read().await.clone().unwrap_or( 774 + Url::parse("https://public.bsky.app").expect("public appview should be valid url"), 775 + ) 793 776 } 794 777 795 778 async fn subscription_opts(&self) -> jacquard_common::xrpc::SubscriptionOptions<'_> { ··· 825 808 Sub: XrpcSubscription + Send + Sync, 826 809 { 827 810 use jacquard_common::xrpc::SubscriptionExt; 828 - let base = self.base_uri(); 811 + let base = self.base_uri().await; 829 812 self.subscription(base) 830 813 .with_options(opts) 831 814 .subscribe(params)
+133
examples/subscribe_jetstream.rs
··· 1 + //! Example: Subscribe to Jetstream firehose 2 + //! 3 + //! Jetstream is a JSON-based alternative to the standard DAG-CBOR firehose. 4 + //! It streams all public network updates in a simplified format. 5 + //! 6 + //! Usage: 7 + //! cargo run --example subscribe_jetstream 8 + //! cargo run --example subscribe_jetstream -- jetstream2.us-west.bsky.network 9 + 10 + use clap::Parser; 11 + use jacquard_common::jetstream::{CommitOperation, JetstreamMessage, JetstreamParams}; 12 + use jacquard_common::xrpc::{SubscriptionClient, TungsteniteSubscriptionClient}; 13 + use miette::IntoDiagnostic; 14 + use n0_future::StreamExt; 15 + use url::Url; 16 + 17 + #[derive(Parser, Debug)] 18 + #[command(author, version, about = "Subscribe to Jetstream firehose")] 19 + struct Args { 20 + /// Jetstream URL (e.g., jetstream1.us-east.fire.hose.cam) 21 + #[arg(default_value = "jetstream1.us-east.fire.hose.cam")] 22 + jetstream_url: String, 23 + } 24 + 25 + fn normalize_url(input: &str) -> Result<Url, url::ParseError> { 26 + // Strip any existing scheme 27 + let without_scheme = input 28 + .trim_start_matches("https://") 29 + .trim_start_matches("http://") 30 + .trim_start_matches("wss://") 31 + .trim_start_matches("ws://"); 32 + 33 + // Prepend wss:// and parse 34 + Url::parse(&format!("wss://{}", without_scheme)) 35 + } 36 + 37 + fn print_message(msg: &JetstreamMessage) { 38 + match msg { 39 + JetstreamMessage::Commit { 40 + did, 41 + time_us, 42 + commit, 43 + } => { 44 + let op = match commit.operation { 45 + CommitOperation::Create => "create", 46 + CommitOperation::Update => "update", 47 + CommitOperation::Delete => "delete", 48 + }; 49 + println!( 50 + "Commit | did={} time_us={} op={} collection={} rkey={} cid={:?}", 51 + did, time_us, op, commit.collection, commit.rkey, commit.cid 52 + ); 53 + } 54 + JetstreamMessage::Identity { 55 + did, 56 + time_us, 57 + identity, 58 + } => { 59 + println!( 60 + "Identity | did={} time_us={} handle={:?} seq={} time={}", 61 + did, time_us, identity.handle, identity.seq, identity.time 62 + ); 63 + } 64 + JetstreamMessage::Account { 65 + did, 66 + time_us, 67 + account, 68 + } => { 69 + println!( 70 + "Account | did={} time_us={} active={} seq={} time={} status={:?}", 71 + did, time_us, account.active, account.seq, account.time, account.status 72 + ); 73 + } 74 + } 75 + } 76 + 77 + #[tokio::main] 78 + async fn main() -> miette::Result<()> { 79 + let args = Args::parse(); 80 + 81 + let base_url = normalize_url(&args.jetstream_url).into_diagnostic()?; 82 + println!("Connecting to {}", base_url); 83 + 84 + // Create subscription client 85 + let client = TungsteniteSubscriptionClient::from_base_uri(base_url); 86 + 87 + // Subscribe with no filters (firehose mode) 88 + let mut params_builder = JetstreamParams::new(); 89 + 90 + // Enable compression if zstd feature is available 91 + #[cfg(feature = "zstd")] 92 + { 93 + params_builder = params_builder.compress(true); 94 + } 95 + 96 + let params = params_builder.build(); 97 + let stream = client.subscribe(&params).await.into_diagnostic()?; 98 + 99 + println!("Connected! Streaming messages (Ctrl-C to stop)...\n"); 100 + 101 + // Set up Ctrl-C handler 102 + let (tx, mut rx) = tokio::sync::oneshot::channel(); 103 + tokio::spawn(async move { 104 + tokio::signal::ctrl_c().await.ok(); 105 + let _ = tx.send(()); 106 + }); 107 + 108 + // Convert to typed message stream 109 + let (_sink, mut messages) = stream.into_stream(); 110 + 111 + let mut count = 0u64; 112 + 113 + loop { 114 + tokio::select! { 115 + Some(result) = messages.next() => { 116 + match result { 117 + Ok(msg) => { 118 + count += 1; 119 + print_message(&msg); 120 + } 121 + Err(e) => eprintln!("Error: {}", e), 122 + } 123 + } 124 + _ = &mut rx => { 125 + println!("\nReceived {} messages", count); 126 + println!("Shutting down..."); 127 + break; 128 + } 129 + } 130 + } 131 + 132 + Ok(()) 133 + }
+130
examples/subscribe_repos.rs
··· 1 + //! Example: Subscribe to a PDS's subscribeRepos endpoint 2 + //! 3 + //! This demonstrates consuming the repo event stream directly from a PDS, 4 + //! which is what a Relay does to ingest updates from PDSes. 5 + //! 6 + //! Usage: 7 + //! cargo run --example subscribe_repos -- atproto.systems 8 + 9 + use clap::Parser; 10 + use jacquard::api::com_atproto::sync::subscribe_repos::{SubscribeRepos, SubscribeReposMessage}; 11 + use jacquard_common::xrpc::{SubscriptionClient, TungsteniteSubscriptionClient}; 12 + use miette::IntoDiagnostic; 13 + use n0_future::StreamExt; 14 + use url::Url; 15 + 16 + #[derive(Parser, Debug)] 17 + #[command(author, version, about = "Subscribe to a PDS's subscribeRepos endpoint")] 18 + struct Args { 19 + /// PDS URL (e.g., atproto.systems or https://atproto.systems) 20 + pds_url: String, 21 + 22 + /// Starting cursor position 23 + #[arg(short, long)] 24 + cursor: Option<i64>, 25 + } 26 + 27 + fn normalize_url(input: &str) -> Result<Url, url::ParseError> { 28 + // Strip any existing scheme 29 + let without_scheme = input 30 + .trim_start_matches("https://") 31 + .trim_start_matches("http://") 32 + .trim_start_matches("wss://") 33 + .trim_start_matches("ws://"); 34 + 35 + // Prepend wss:// 36 + Url::parse(&format!("wss://{}", without_scheme)) 37 + } 38 + 39 + fn print_message(msg: &SubscribeReposMessage) { 40 + match msg { 41 + SubscribeReposMessage::Commit(commit) => { 42 + println!( 43 + "Commit | repo={} seq={} time={} rev={} commit={} ops={} prev={}", 44 + commit.repo, 45 + commit.seq, 46 + commit.time, 47 + commit.rev, 48 + commit.commit, 49 + commit.ops.len(), 50 + commit.since, 51 + ); 52 + } 53 + SubscribeReposMessage::Identity(identity) => { 54 + println!( 55 + "Identity | did={} seq={} time={} handle={:?}", 56 + identity.did, identity.seq, identity.time, identity.handle 57 + ); 58 + } 59 + SubscribeReposMessage::Account(account) => { 60 + println!( 61 + "Account | did={} seq={} time={} active={} status={:?}", 62 + account.did, account.seq, account.time, account.active, account.status 63 + ); 64 + } 65 + SubscribeReposMessage::Sync(sync) => { 66 + println!( 67 + "Sync | did={} seq={} time={} rev={} blocks={}b", 68 + sync.did, 69 + sync.seq, 70 + sync.time, 71 + sync.rev, 72 + sync.blocks.len() 73 + ); 74 + } 75 + SubscribeReposMessage::Info(info) => { 76 + println!("Info | name={} message={:?}", info.name, info.message); 77 + } 78 + SubscribeReposMessage::Unknown(data) => { 79 + println!("Unknown message: {:?}", data); 80 + } 81 + } 82 + } 83 + 84 + #[tokio::main] 85 + async fn main() -> miette::Result<()> { 86 + let args = Args::parse(); 87 + 88 + let base_url = normalize_url(&args.pds_url).into_diagnostic()?; 89 + println!("Connecting to {}", base_url); 90 + 91 + // Create subscription client 92 + let client = TungsteniteSubscriptionClient::from_base_uri(base_url); 93 + 94 + // Subscribe with optional cursor 95 + let params = if let Some(cursor) = args.cursor { 96 + SubscribeRepos::new().cursor(cursor).build() 97 + } else { 98 + SubscribeRepos::new().build() 99 + }; 100 + let stream = client.subscribe(&params).await.into_diagnostic()?; 101 + 102 + println!("Connected! Streaming messages (Ctrl-C to stop)...\n"); 103 + 104 + // Set up Ctrl-C handler 105 + let (tx, mut rx) = tokio::sync::oneshot::channel(); 106 + tokio::spawn(async move { 107 + tokio::signal::ctrl_c().await.ok(); 108 + let _ = tx.send(()); 109 + }); 110 + 111 + // Convert to typed message stream 112 + let (_sink, mut messages) = stream.into_stream(); 113 + 114 + loop { 115 + tokio::select! { 116 + Some(result) = messages.next() => { 117 + match result { 118 + Ok(msg) => print_message(&msg), 119 + Err(e) => eprintln!("Error: {}", e), 120 + } 121 + } 122 + _ = &mut rx => { 123 + println!("\nShutting down..."); 124 + break; 125 + } 126 + } 127 + } 128 + 129 + Ok(()) 130 + }
+1 -1
justfile
··· 56 56 example NAME *ARGS: 57 57 #!/usr/bin/env bash 58 58 if [ -f "examples/{{NAME}}.rs" ]; then 59 - cargo run -p jacquard --features=api_bluesky --example {{NAME}} -- {{ARGS}} 59 + cargo run -p jacquard --features=api_bluesky,streaming --example {{NAME}} -- {{ARGS}} 60 60 elif cargo metadata --format-version=1 --no-deps | \ 61 61 jq -e '.packages[] | select(.name == "jacquard-axum") | .targets[] | select(.kind[] == "example" and .name == "{{NAME}}")' > /dev/null; then 62 62 cargo run -p jacquard-axum --example {{NAME}} -- {{ARGS}}