A better Rust ATProto crate

disabling streaming support by default for now

Orual 6b0272d9 d45b5d1b

Changed files
+90 -82
crates
jacquard
jacquard-api
src
com_atproto
jacquard-common
jacquard-oauth
+2
crates/jacquard-api/Cargo.toml
··· 36 36 lexicon_community = ["community_lexicon", "my_skylights", "com_whtwnd", "social_psky", "blue_linkat", "minimal"] 37 37 ufos = [ "moe_karashiiro", "dev_regnault", "buzz_bookhive", "uk_ewancroft", "win_tomo_x", "dev_fudgeu", "dev_ocbwoy3", "blog_pckt", "net_bnewbold", "club_stellz", "dev_baileytownsend", "net_aftertheinter", "org_devcon","org_robocracy", "garden_lexicon", "social_clippr", "social_grain", "social_pmsky", "com_crabdance", "app_blebbit", "app_ocho", "uk_skyblur", "us_polhem", "blue_atplane", "net_mmatt", "minimal", "app_rocksky"] 38 38 39 + streaming = [] 40 + 39 41 # --- generated --- 40 42 # Generated namespace features 41 43 app_blebbit = []
+7 -6
crates/jacquard-api/src/com_atproto/label.rs
··· 6 6 // Any manual changes will be overwritten on the next regeneration. 7 7 8 8 pub mod query_labels; 9 + #[cfg(feature = "streaming")] 9 10 pub mod subscribe_labels; 10 11 11 12 /// Metadata tag on an atproto resource (eg, repo or record). ··· 18 19 PartialEq, 19 20 Eq, 20 21 jacquard_derive::IntoStatic, 21 - bon::Builder 22 + bon::Builder, 22 23 )] 23 24 #[serde(rename_all = "camelCase")] 24 25 pub struct Label<'a> { ··· 188 189 PartialEq, 189 190 Eq, 190 191 jacquard_derive::IntoStatic, 191 - bon::Builder 192 + bon::Builder, 192 193 )] 193 194 #[serde(rename_all = "camelCase")] 194 195 pub struct LabelValueDefinition<'a> { ··· 227 228 PartialEq, 228 229 Eq, 229 230 jacquard_derive::IntoStatic, 230 - bon::Builder 231 + bon::Builder, 231 232 )] 232 233 #[serde(rename_all = "camelCase")] 233 234 pub struct LabelValueDefinitionStrings<'a> { ··· 253 254 PartialEq, 254 255 Eq, 255 256 jacquard_derive::IntoStatic, 256 - Default 257 + Default, 257 258 )] 258 259 #[serde(rename_all = "camelCase")] 259 260 pub struct SelfLabel<'a> { ··· 272 273 PartialEq, 273 274 Eq, 274 275 jacquard_derive::IntoStatic, 275 - bon::Builder 276 + bon::Builder, 276 277 )] 277 278 #[serde(rename_all = "camelCase")] 278 279 pub struct SelfLabels<'a> { 279 280 #[serde(borrow)] 280 281 pub values: Vec<crate::com_atproto::label::SelfLabel<'a>>, 281 - } 282 + }
+2 -1
crates/jacquard-api/src/com_atproto/sync.rs
··· 20 20 pub mod list_repos_by_collection; 21 21 pub mod notify_of_update; 22 22 pub mod request_crawl; 23 + #[cfg(feature = "streaming")] 23 24 pub mod subscribe_repos; 24 25 25 26 #[derive(Debug, Clone, PartialEq, Eq, Hash)] ··· 111 112 HostStatus::Other(v) => HostStatus::Other(v.into_static()), 112 113 } 113 114 } 114 - } 115 + }
+12 -11
crates/jacquard-common/Cargo.toml
··· 11 11 exclude.workspace = true 12 12 license.workspace = true 13 13 14 + [features] 15 + default = ["service-auth", "reqwest-client", "crypto"] 16 + crypto = [] 17 + crypto-ed25519 = ["crypto", "dep:ed25519-dalek"] 18 + crypto-k256 = ["crypto", "dep:k256", "k256/ecdsa"] 19 + crypto-p256 = ["crypto", "dep:p256", "p256/ecdsa"] 20 + service-auth = ["crypto-k256", "crypto-p256", "dep:signature"] 21 + reqwest-client = ["dep:reqwest"] 22 + tracing = ["dep:tracing"] 23 + streaming = ["n0-future", "futures"] 24 + websocket = ["streaming", "tokio-tungstenite-wasm"] 14 25 15 26 [dependencies] 16 27 trait-variant.workspace = true ··· 54 65 reqwest = { workspace = true, optional = true, features = [ "http2", "system-proxy", "rustls-tls"] } 55 66 tokio-util = { version = "0.7.16", features = ["io"] } 56 67 57 - [features] 58 - default = ["service-auth", "reqwest-client", "crypto", "websocket"] 59 - crypto = [] 60 - crypto-ed25519 = ["crypto", "dep:ed25519-dalek"] 61 - crypto-k256 = ["crypto", "dep:k256", "k256/ecdsa"] 62 - crypto-p256 = ["crypto", "dep:p256", "p256/ecdsa"] 63 - service-auth = ["crypto-k256", "crypto-p256", "dep:signature"] 64 - reqwest-client = ["dep:reqwest"] 65 - tracing = ["dep:tracing"] 66 - streaming = ["n0-future", "futures"] 67 - websocket = ["streaming", "tokio-tungstenite-wasm"] 68 + 68 69 69 70 [dependencies.ed25519-dalek] 70 71 version = "2"
-12
crates/jacquard-common/src/stream.rs
··· 167 167 168 168 impl ByteStream { 169 169 /// Create a new byte stream from any compatible stream 170 - #[cfg(not(target_arch = "wasm32"))] 171 170 pub fn new<S>(stream: S) -> Self 172 171 where 173 172 S: n0_future::Stream<Item = Result<Bytes, StreamError>> + Unpin + Send + 'static, 174 - { 175 - Self { 176 - inner: Box::pin(stream), 177 - } 178 - } 179 - 180 - /// Create a new byte stream from any compatible stream 181 - #[cfg(target_arch = "wasm32")] 182 - pub fn new<S>(stream: S) -> Self 183 - where 184 - S: n0_future::Stream<Item = Result<Bytes, StreamError>> + Unpin + 'static, 185 173 { 186 174 Self { 187 175 inner: Box::pin(stream),
+2 -13
crates/jacquard-common/src/types/blob.rs
··· 1 - use crate::{ 2 - CowStr, IntoStatic, 3 - types::cid::{Cid, CidLink}, 4 - }; 5 - #[allow(unused)] 1 + use crate::{CowStr, IntoStatic, types::cid::CidLink}; 6 2 use serde::{Deserialize, Deserializer, Serialize, Serializer, de::Error}; 7 3 use smol_str::ToSmolStr; 8 4 use std::convert::Infallible; 9 - #[allow(unused)] 10 - use std::{ 11 - borrow::Cow, 12 - fmt, 13 - hash::{Hash, Hasher}, 14 - ops::Deref, 15 - str::FromStr, 16 - }; 5 + use std::{fmt, hash::Hash, ops::Deref, str::FromStr}; 17 6 18 7 /// Blob reference for binary data in AT Protocol 19 8 ///
+32 -19
crates/jacquard-common/src/xrpc.rs
··· 22 22 #[cfg(feature = "websocket")] 23 23 pub mod subscription; 24 24 25 - #[cfg(feature = "websocket")] 26 - pub use subscription::{ 27 - BasicSubscriptionClient, MessageEncoding, SubscriptionCall, SubscriptionClient, 28 - SubscriptionEndpoint, SubscriptionExt, SubscriptionOptions, SubscriptionResp, 29 - SubscriptionStream, TungsteniteSubscriptionClient, XrpcSubscription, 30 - }; 31 - 25 + #[cfg(feature = "streaming")] 26 + use crate::StreamError; 27 + use crate::http_client::HttpClient; 28 + #[cfg(feature = "streaming")] 29 + use crate::http_client::HttpClientExt; 30 + use crate::types::value::Data; 31 + use crate::{AuthorizationToken, error::AuthError}; 32 + use crate::{CowStr, error::XrpcResult}; 33 + use crate::{IntoStatic, error::DecodeError}; 34 + use crate::{error::TransportError, types::value::RawData}; 32 35 use bytes::Bytes; 33 36 use http::{ 34 37 HeaderName, HeaderValue, Request, StatusCode, ··· 38 41 use smol_str::SmolStr; 39 42 use std::fmt::{self, Debug}; 40 43 use std::{error::Error, marker::PhantomData}; 44 + #[cfg(feature = "websocket")] 45 + pub use subscription::{ 46 + BasicSubscriptionClient, MessageEncoding, SubscriptionCall, SubscriptionClient, 47 + SubscriptionEndpoint, SubscriptionExt, SubscriptionOptions, SubscriptionResp, 48 + SubscriptionStream, TungsteniteSubscriptionClient, XrpcSubscription, 49 + }; 41 50 use url::Url; 42 - 43 - use crate::http_client::{HttpClient, HttpClientExt}; 44 - use crate::types::value::Data; 45 - use crate::{AuthorizationToken, error::AuthError}; 46 - use crate::{CowStr, error::XrpcResult}; 47 - use crate::{IntoStatic, error::DecodeError}; 48 - #[cfg(feature = "streaming")] 49 - use crate::StreamError; 50 - use crate::{error::TransportError, types::value::RawData}; 51 51 52 52 /// Error type for encoding XRPC requests 53 53 #[derive(Debug, thiserror::Error, miette::Diagnostic)] ··· 315 315 where 316 316 R: XrpcRequest + Send + Sync, 317 317 <R as XrpcRequest>::Response: Send + Sync; 318 - 319 318 } 320 319 321 320 /// Stateful XRPC streaming client trait ··· 347 346 fn stream<S>( 348 347 &self, 349 348 stream: XrpcProcedureSend<S::Frame<'static>>, 350 - ) -> impl Future<Output = Result<XrpcResponseStream<<<S as XrpcProcedureStream>::Response as XrpcStreamResp>::Frame<'static>>, StreamError>> 349 + ) -> impl Future< 350 + Output = Result< 351 + XrpcResponseStream< 352 + <<S as XrpcProcedureStream>::Response as XrpcStreamResp>::Frame<'static>, 353 + >, 354 + StreamError, 355 + >, 356 + > 351 357 where 352 358 S: XrpcProcedureStream + 'static, 353 359 <<S as XrpcProcedureStream>::Response as XrpcStreamResp>::Frame<'static>: XrpcStreamResp, ··· 358 364 fn stream<S>( 359 365 &self, 360 366 stream: XrpcProcedureSend<S::Frame<'static>>, 361 - ) -> impl Future<Output = Result<XrpcResponseStream<<<S as XrpcProcedureStream>::Response as XrpcStreamResp>::Frame<'static>>, StreamError>> 367 + ) -> impl Future< 368 + Output = Result< 369 + XrpcResponseStream< 370 + <<S as XrpcProcedureStream>::Response as XrpcStreamResp>::Frame<'static>, 371 + >, 372 + StreamError, 373 + >, 374 + > 362 375 where 363 376 S: XrpcProcedureStream + 'static, 364 377 <<S as XrpcProcedureStream>::Response as XrpcStreamResp>::Frame<'static>: XrpcStreamResp;
+9 -8
crates/jacquard-oauth/Cargo.toml
··· 11 11 exclude.workspace = true 12 12 license.workspace = true 13 13 14 + 15 + [features] 16 + default = [] 17 + loopback = ["dep:rouille"] 18 + browser-open = ["dep:webbrowser"] 19 + tracing = ["dep:tracing"] 20 + websocket = ["jacquard-common/websocket"] 21 + streaming = ["jacquard-common/streaming", "dep:n0-future"] 22 + 14 23 [dependencies] 15 24 jacquard-common = { version = "0.5", path = "../jacquard-common", features = ["reqwest-client"] } 16 25 jacquard-identity = { version = "0.5", path = "../jacquard-identity" } ··· 44 53 [target.'cfg(not(target_arch = "wasm32"))'.dependencies] 45 54 tokio = { workspace = true, features = ["rt", "net", "time"] } 46 55 rouille = { version = "3.6.2", optional = true } 47 - 48 - [features] 49 - default = [] 50 - loopback = ["dep:rouille"] 51 - browser-open = ["dep:webbrowser"] 52 - tracing = ["dep:tracing"] 53 - websocket = ["jacquard-common/websocket"] 54 - streaming = ["jacquard-common/streaming", "dep:n0-future"]
+4 -3
crates/jacquard/Cargo.toml
··· 12 12 license.workspace = true 13 13 14 14 [features] 15 - default = ["api_full", "dns", "loopback", "derive", "streaming"] 15 + default = ["api_full", "dns", "loopback", "derive"] 16 16 derive = ["dep:jacquard-derive"] 17 17 # Minimal API bindings 18 18 api = ["jacquard-api/minimal"] ··· 39 39 ] 40 40 dns = ["jacquard-identity/dns"] 41 41 streaming = [ 42 - "jacquard-common/streaming", 42 + "jacquard-common/websocket", 43 43 "jacquard-oauth/streaming", 44 44 "jacquard-identity/streaming", 45 45 "dep:n0-future", 46 - "dep:futures" 46 + "dep:futures", 47 + "jacquard-api/streaming", 47 48 ] 48 49 websocket = ["jacquard-common/websocket"] 49 50
+19 -9
crates/jacquard/src/client/credential_session.rs
··· 595 595 let mut opts = self.options.read().await.clone(); 596 596 opts.auth = self.access_token().await; 597 597 598 - let mut url = base_uri; 598 + let mut url = base_uri.clone(); 599 599 let mut path = url.path().trim_end_matches('/').to_owned(); 600 600 path.push_str("/xrpc/"); 601 601 path.push_str(<Str::Request as jacquard_common::xrpc::XrpcRequest>::NSID); ··· 642 642 let body_stream = 643 643 jacquard_common::stream::ByteStream::new(stream.0.map_ok(|f| f.buffer).boxed()); 644 644 645 + // Clone the stream for potential retry 646 + let (body1, body2) = body_stream.tee(); 647 + 645 648 let response = self 646 649 .client 647 - .send_http_bidirectional(parts.clone(), body_stream.into_inner()) 650 + .send_http_bidirectional(parts.clone(), body1.into_inner()) 648 651 .await 649 652 .map_err(StreamError::transport)?; 650 653 ··· 694 697 .map_err(|e| StreamError::protocol(e.to_string()))? 695 698 .into_parts(); 696 699 697 - // Can't retry with the same stream - it's been consumed 698 - // This is a limitation of streaming upload with auth refresh 699 - return Err(StreamError::protocol("Authentication failed on streaming upload and stream cannot be retried".to_string())); 700 + // Retry with the second half of the cloned stream 701 + let response = self 702 + .client 703 + .send_http_bidirectional(parts, body2.into_inner()) 704 + .await 705 + .map_err(StreamError::transport)?; 706 + let (resp_parts, resp_body) = response.into_parts(); 707 + Ok(jacquard_common::xrpc::streaming::XrpcResponseStream::from_typed_parts( 708 + resp_parts, resp_body, 709 + )) 710 + } else { 711 + Ok(jacquard_common::xrpc::streaming::XrpcResponseStream::from_typed_parts( 712 + resp_parts, resp_body, 713 + )) 700 714 } 701 - 702 - Ok(jacquard_common::xrpc::streaming::XrpcResponseStream::from_typed_parts( 703 - resp_parts, resp_body, 704 - )) 705 715 } 706 716 } 707 717
+1
crates/jacquard/src/lib.rs
··· 220 220 pub mod client; 221 221 222 222 #[cfg(feature = "streaming")] 223 + /// Experimental streaming endpoints 223 224 pub mod streaming; 224 225 225 226 pub use common::*;