A better Rust ATProto crate

reworking legacy session

Orual 3a7692c4 80ccbe95

Changed files
+215 -30
crates
jacquard
jacquard-common
jacquard-identity
src
jacquard-oauth
src
+1 -1
crates/jacquard-common/src/session.rs
··· 25 25 26 26 async fn access_token(&self) -> Result<AuthorizationToken, SessionStoreError>; 27 27 28 - async fn refresh(&self) -> Result<(), SessionStoreError>; 28 + async fn refresh(&self) -> Result<AuthorizationToken, SessionStoreError>; 29 29 } 30 30 31 31 /// Errors emitted by session stores.
+11 -11
crates/jacquard-identity/src/lib.rs
··· 36 36 use hickory_resolver::{TokioAsyncResolver, config::ResolverConfig}; 37 37 38 38 /// Default resolver implementation with configurable fallback order. 39 - pub struct DefaultResolver { 39 + pub struct JacquardResolver { 40 40 http: reqwest::Client, 41 41 opts: ResolverOptions, 42 42 #[cfg(feature = "dns")] 43 43 dns: Option<TokioAsyncResolver>, 44 44 } 45 45 46 - impl DefaultResolver { 46 + impl JacquardResolver { 47 47 /// Create a new instance of the default resolver with all options (except DNS) up front 48 48 pub fn new(http: reqwest::Client, opts: ResolverOptions) -> Self { 49 49 Self { ··· 189 189 } 190 190 } 191 191 192 - impl DefaultResolver { 192 + impl JacquardResolver { 193 193 /// Resolve handle to DID via a PDS XRPC call (stateless, unauth by default) 194 194 pub async fn resolve_handle_via_pds( 195 195 &self, ··· 270 270 } 271 271 272 272 #[async_trait::async_trait] 273 - impl IdentityResolver for DefaultResolver { 273 + impl IdentityResolver for JacquardResolver { 274 274 fn options(&self) -> &ResolverOptions { 275 275 &self.opts 276 276 } ··· 417 417 } 418 418 } 419 419 420 - impl HttpClient for DefaultResolver { 420 + impl HttpClient for JacquardResolver { 421 421 async fn send_http( 422 422 &self, 423 423 request: http::Request<Vec<u8>>, ··· 438 438 }, 439 439 } 440 440 441 - impl DefaultResolver { 441 + impl JacquardResolver { 442 442 /// Resolve a handle to its DID, fetch the DID document, and return doc plus any warnings. 443 443 /// This applies the default equality check on the document id (error with doc if mismatch). 444 444 pub async fn resolve_handle_and_doc( ··· 523 523 } 524 524 525 525 /// Resolver specialized for unauthenticated/public flows using reqwest and stateless XRPC 526 - pub type PublicResolver = DefaultResolver; 526 + pub type PublicResolver = JacquardResolver; 527 527 528 528 impl Default for PublicResolver { 529 529 /// Build a resolver with: ··· 539 539 fn default() -> Self { 540 540 let http = reqwest::Client::new(); 541 541 let opts = ResolverOptions::default(); 542 - let resolver = DefaultResolver::new(http, opts); 542 + let resolver = JacquardResolver::new(http, opts); 543 543 #[cfg(feature = "dns")] 544 544 let resolver = resolver.with_system_dns(); 545 545 resolver ··· 552 552 let http = reqwest::Client::new(); 553 553 let mut opts = ResolverOptions::default(); 554 554 opts.plc_source = PlcSource::slingshot_default(); 555 - let resolver = DefaultResolver::new(http, opts); 555 + let resolver = JacquardResolver::new(http, opts); 556 556 #[cfg(feature = "dns")] 557 557 let resolver = resolver.with_system_dns(); 558 558 resolver ··· 564 564 565 565 #[test] 566 566 fn did_web_urls() { 567 - let r = DefaultResolver::new(reqwest::Client::new(), ResolverOptions::default()); 567 + let r = JacquardResolver::new(reqwest::Client::new(), ResolverOptions::default()); 568 568 assert_eq!( 569 569 r.test_did_web_url_raw("did:web:example.com"), 570 570 "https://example.com/.well-known/did.json" ··· 577 577 578 578 #[test] 579 579 fn slingshot_mini_doc_url_build() { 580 - let r = DefaultResolver::new(reqwest::Client::new(), ResolverOptions::default()); 580 + let r = JacquardResolver::new(reqwest::Client::new(), ResolverOptions::default()); 581 581 let base = Url::parse("https://slingshot.microcosm.blue").unwrap(); 582 582 let url = r.slingshot_mini_doc_url(&base, "bad-example.com").unwrap(); 583 583 assert_eq!(
+12 -14
crates/jacquard-oauth/src/client.rs
··· 1 + use crate::{ 2 + atproto::atproto_client_metadata, 3 + authstore::ClientAuthStore, 4 + dpop::DpopExt, 5 + error::{OAuthError, Result}, 6 + request::{OAuthMetadata, exchange_code, par}, 7 + resolver::OAuthResolver, 8 + scopes::Scope, 9 + session::{ClientData, ClientSessionData, DpopClientData, SessionRegistry}, 10 + types::{AuthorizeOptions, CallbackParams}, 11 + }; 1 12 use jacquard_common::{ 2 13 AuthorizationToken, CowStr, IntoStatic, 3 14 error::{AuthError, ClientError, TransportError, XrpcResult}, ··· 8 19 }, 9 20 }; 10 21 use jose_jwk::JwkSet; 11 - use smol_str::SmolStr; 12 22 use std::sync::Arc; 13 23 use tokio::sync::RwLock; 14 24 use url::Url; 15 - 16 - use crate::{ 17 - atproto::atproto_client_metadata, 18 - authstore::ClientAuthStore, 19 - dpop::DpopExt, 20 - error::{OAuthError, Result}, 21 - request::{OAuthMetadata, exchange_code, par}, 22 - resolver::OAuthResolver, 23 - scopes::Scope, 24 - session::{ClientData, ClientSessionData, DpopClientData, SessionRegistry}, 25 - types::{AuthorizeOptions, CallbackParams}, 26 - }; 27 25 28 26 pub struct OAuthClient<T, S> 29 27 where ··· 242 240 (data.account_did.clone(), data.session_id.clone()) 243 241 } 244 242 245 - pub async fn pds(&self) -> Url { 243 + pub async fn endpoint(&self) -> Url { 246 244 self.data.read().await.host_url.clone() 247 245 } 248 246
+1 -3
crates/jacquard/src/client.rs
··· 4 4 //! client implementation that manages session tokens. 5 5 6 6 mod at_client; 7 - 7 + pub mod credential_session; 8 8 mod token; 9 9 10 10 pub use at_client::{AtClient, SendOverrides}; ··· 20 20 }; 21 21 pub use token::FileAuthStore; 22 22 use url::Url; 23 - 24 - // Note: Stateless and stateful XRPC clients are implemented in xrpc_call.rs and at_client.rs 25 23 26 24 pub(crate) const NSID_REFRESH_SESSION: &str = "com.atproto.server.refreshSession"; 27 25
+189
crates/jacquard/src/client/credential_session.rs
··· 1 + use std::sync::Arc; 2 + 3 + use jacquard_api::com_atproto::server::refresh_session::RefreshSession; 4 + use jacquard_common::{ 5 + AuthorizationToken, CowStr, IntoStatic, 6 + error::{AuthError, ClientError, XrpcResult}, 7 + http_client::HttpClient, 8 + session::SessionStore, 9 + types::{ 10 + did::Did, 11 + xrpc::{CallOptions, Response, XrpcClient, XrpcError, XrpcExt, XrpcRequest}, 12 + }, 13 + }; 14 + use tokio::sync::RwLock; 15 + use url::Url; 16 + 17 + use crate::client::{AtpSession, token::StoredSession}; 18 + 19 + pub type SessionKey = (Did<'static>, CowStr<'static>); 20 + 21 + pub struct CredentialSession<S, T> 22 + where 23 + S: SessionStore<SessionKey, AtpSession>, 24 + { 25 + store: Arc<S>, 26 + client: Arc<T>, 27 + pub options: RwLock<CallOptions<'static>>, 28 + pub key: RwLock<Option<SessionKey>>, 29 + pub endpoint: RwLock<Option<Url>>, 30 + } 31 + 32 + impl<S, T> CredentialSession<S, T> 33 + where 34 + S: SessionStore<SessionKey, AtpSession>, 35 + { 36 + pub fn new(store: Arc<S>, client: Arc<T>) -> Self { 37 + Self { 38 + store, 39 + client, 40 + options: RwLock::new(CallOptions::default()), 41 + key: RwLock::new(None), 42 + endpoint: RwLock::new(None), 43 + } 44 + } 45 + } 46 + 47 + impl<S, T> CredentialSession<S, T> 48 + where 49 + S: SessionStore<SessionKey, AtpSession>, 50 + { 51 + pub fn with_options(self, options: CallOptions<'_>) -> Self { 52 + Self { 53 + client: self.client, 54 + store: self.store, 55 + options: RwLock::new(options.into_static()), 56 + key: self.key, 57 + endpoint: self.endpoint, 58 + } 59 + } 60 + 61 + pub async fn set_options(&self, options: CallOptions<'_>) { 62 + *self.options.write().await = options.into_static(); 63 + } 64 + 65 + pub async fn session_info(&self) -> Option<SessionKey> { 66 + self.key.read().await.clone() 67 + } 68 + 69 + pub async fn endpoint(&self) -> Url { 70 + self.endpoint.read().await.clone().unwrap_or( 71 + Url::parse("https://public.bsky.app").expect("public appview should be valid url"), 72 + ) 73 + } 74 + 75 + pub async fn set_endpoint(&self, endpoint: Url) { 76 + *self.endpoint.write().await = Some(endpoint); 77 + } 78 + 79 + pub async fn access_token(&self) -> Option<AuthorizationToken<'_>> { 80 + let key = self.key.read().await.clone()?; 81 + let session = self.store.get(&key).await; 82 + session.map(|session| AuthorizationToken::Bearer(session.access_jwt)) 83 + } 84 + 85 + pub async fn refresh_token(&self) -> Option<AuthorizationToken<'_>> { 86 + let key = self.key.read().await.clone()?; 87 + let session = self.store.get(&key).await; 88 + session.map(|session| AuthorizationToken::Bearer(session.refresh_jwt)) 89 + } 90 + } 91 + 92 + impl<S, T> CredentialSession<S, T> 93 + where 94 + S: SessionStore<SessionKey, AtpSession>, 95 + T: HttpClient, 96 + { 97 + pub async fn refresh(&self) -> Result<AuthorizationToken<'_>, ClientError> { 98 + let key = self.key.read().await.clone().ok_or(ClientError::Auth( 99 + jacquard_common::error::AuthError::NotAuthenticated, 100 + ))?; 101 + let session = self.store.get(&key).await; 102 + let endpoint = self.endpoint().await; 103 + let mut opts = self.options.read().await.clone(); 104 + opts.auth = session.map(|s| AuthorizationToken::Bearer(s.refresh_jwt)); 105 + let response = self 106 + .client 107 + .xrpc(endpoint) 108 + .with_options(opts) 109 + .send(&RefreshSession) 110 + .await?; 111 + let refresh = response 112 + .into_output() 113 + .map_err(|_| ClientError::Auth(jacquard_common::error::AuthError::RefreshFailed))?; 114 + 115 + let new_session: AtpSession = refresh.into(); 116 + let token = AuthorizationToken::Bearer(new_session.access_jwt.clone()); 117 + self.store 118 + .set(key, new_session) 119 + .await 120 + .map_err(|_| ClientError::Auth(jacquard_common::error::AuthError::RefreshFailed))?; 121 + 122 + Ok(token) 123 + } 124 + } 125 + 126 + impl<S, T> HttpClient for CredentialSession<S, T> 127 + where 128 + S: SessionStore<SessionKey, AtpSession> + Send + Sync + 'static, 129 + T: HttpClient + XrpcExt + Send + Sync + 'static, 130 + { 131 + type Error = T::Error; 132 + 133 + async fn send_http( 134 + &self, 135 + request: http::Request<Vec<u8>>, 136 + ) -> core::result::Result<http::Response<Vec<u8>>, Self::Error> { 137 + self.client.send_http(request).await 138 + } 139 + } 140 + 141 + impl<S, T> XrpcClient for CredentialSession<S, T> 142 + where 143 + S: SessionStore<SessionKey, AtpSession> + Send + Sync + 'static, 144 + T: HttpClient + XrpcExt + Send + Sync + 'static, 145 + { 146 + fn base_uri(&self) -> Url { 147 + self.endpoint.blocking_read().clone().unwrap_or( 148 + Url::parse("https://public.bsky.app").expect("public appview should be valid url"), 149 + ) 150 + } 151 + async fn send<R: jacquard_common::types::xrpc::XrpcRequest + Send>( 152 + self, 153 + request: &R, 154 + ) -> XrpcResult<Response<R>> { 155 + let base_uri = self.base_uri(); 156 + let auth = self.access_token().await; 157 + let mut opts = self.options.read().await.clone(); 158 + opts.auth = auth; 159 + let resp = self 160 + .client 161 + .xrpc(base_uri.clone()) 162 + .with_options(opts.clone()) 163 + .send(request) 164 + .await; 165 + 166 + if is_expired(&resp) { 167 + let auth = self.refresh().await?; 168 + opts.auth = Some(auth); 169 + self.client 170 + .xrpc(base_uri) 171 + .with_options(opts) 172 + .send(request) 173 + .await 174 + } else { 175 + resp 176 + } 177 + } 178 + } 179 + 180 + fn is_expired<R: XrpcRequest>(response: &XrpcResult<Response<R>>) -> bool { 181 + match response { 182 + Err(ClientError::Auth(AuthError::TokenExpired)) => true, 183 + Ok(resp) => match resp.parse() { 184 + Err(XrpcError::Auth(AuthError::TokenExpired)) => true, 185 + _ => false, 186 + }, 187 + _ => false, 188 + } 189 + }
+1 -1
crates/jacquard/src/client/token.rs
··· 15 15 use url::Url; 16 16 17 17 #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] 18 - enum StoredSession { 18 + pub enum StoredSession { 19 19 Atp(StoredAtSession), 20 20 OAuth(OAuthSession), 21 21 OAuthState(OAuthState),