Highly ambitious ATProtocol AppView service and sdks
at main 9.5 kB view raw
1//! Authentication and authorization utilities for OAuth and AT Protocol. 2//! 3//! This module provides functions for: 4//! - Extracting and validating OAuth bearer tokens 5//! - Verifying tokens with the authorization server 6//! - Managing AT Protocol DPoP (Demonstrating Proof-of-Possession) authentication 7//! - Caching authentication state for performance (5-minute TTL) 8 9use crate::cache::SliceCache; 10use atproto_client::client::DPoPAuth; 11use atproto_identity::key::KeyData; 12use atproto_oauth::jwk::WrappedJsonWebKey; 13use axum::http::{HeaderMap, StatusCode}; 14use serde::{Deserialize, Serialize}; 15use std::sync::Arc; 16use tokio::sync::Mutex; 17 18/// OAuth userinfo response containing the authenticated user's identity. 19#[derive(Serialize, Deserialize, Debug)] 20pub struct UserInfoResponse { 21 /// Subject identifier (user ID) from the OAuth provider 22 pub sub: String, 23 /// Decentralized identifier for the user in AT Protocol 24 pub did: Option<String>, 25} 26 27/// Cached AT Protocol session data to avoid repeated auth server requests. 28#[derive(Serialize, Deserialize, Debug, Clone)] 29struct CachedSession { 30 /// Personal Data Server endpoint URL for the user 31 pds_url: String, 32 /// AT Protocol access token for PDS operations 33 atproto_access_token: String, 34 /// DPoP JSON Web Key for proof-of-possession 35 dpop_jwk: serde_json::Value, 36} 37 38/// Extracts the bearer token from the Authorization header. 39/// 40/// # Arguments 41/// * `headers` - HTTP request headers 42/// 43/// # Returns 44/// * `Ok(String)` - The extracted bearer token 45/// * `Err(StatusCode::UNAUTHORIZED)` - If the header is missing, malformed, or not a Bearer token 46/// 47/// # Example 48/// ```ignore 49/// let token = extract_bearer_token(&headers)?; 50/// ``` 51pub fn extract_bearer_token(headers: &HeaderMap) -> Result<String, StatusCode> { 52 let auth_header = headers 53 .get("authorization") 54 .and_then(|h| h.to_str().ok()) 55 .ok_or(StatusCode::UNAUTHORIZED)?; 56 57 if !auth_header.starts_with("Bearer ") { 58 return Err(StatusCode::UNAUTHORIZED); 59 } 60 61 // Safe to unwrap since we just verified the prefix exists 62 let token = auth_header.strip_prefix("Bearer ").unwrap().to_string(); 63 Ok(token) 64} 65 66/// Verifies an OAuth bearer token with the authorization server. 67/// 68/// This function first checks the cache for a previously validated token to avoid 69/// unnecessary network calls. If not found in cache, it validates with the auth server 70/// and caches the result for 5 minutes. 71/// 72/// # Arguments 73/// * `token` - The OAuth bearer token to verify 74/// * `auth_base_url` - Base URL of the authorization server 75/// * `cache` - Optional cache instance (falls back to direct verification if None) 76/// 77/// # Returns 78/// * `Ok(UserInfoResponse)` - User information if the token is valid 79/// * `Err(StatusCode)` - HTTP status code indicating the failure reason 80/// - `UNAUTHORIZED` - Invalid or expired token 81/// - `INTERNAL_SERVER_ERROR` - Network or parsing errors 82/// 83/// # Cache Behavior 84/// - Cache key format: `oauth_userinfo:{token}` 85/// - TTL: 300 seconds (5 minutes) 86/// - Cache miss triggers verification with auth server 87pub async fn verify_oauth_token_cached( 88 token: &str, 89 auth_base_url: &str, 90 cache: Option<Arc<Mutex<SliceCache>>>, 91) -> Result<UserInfoResponse, StatusCode> { 92 // Try cache first if provided to avoid network round-trip 93 if let Some(cache) = &cache { 94 let cached_result = { 95 let mut cache_lock = cache.lock().await; 96 cache_lock.get_cached_oauth_userinfo(token).await 97 }; 98 99 if let Ok(Some(user_info_value)) = cached_result { 100 let user_info: UserInfoResponse = serde_json::from_value(user_info_value) 101 .map_err(|_e| StatusCode::INTERNAL_SERVER_ERROR)?; 102 return Ok(user_info); 103 } 104 } 105 106 // Cache miss - verify token by calling the OAuth userinfo endpoint 107 let client = reqwest::Client::new(); 108 let userinfo_url = format!("{}/oauth/userinfo", auth_base_url); 109 110 let response = client 111 .get(&userinfo_url) 112 .header("Authorization", format!("Bearer {}", token)) 113 .send() 114 .await 115 .map_err(|_e| StatusCode::INTERNAL_SERVER_ERROR)?; 116 117 if !response.status().is_success() { 118 return Err(StatusCode::UNAUTHORIZED); 119 } 120 121 let user_info: UserInfoResponse = response 122 .json() 123 .await 124 .map_err(|_e| StatusCode::INTERNAL_SERVER_ERROR)?; 125 126 // Cache the validated userinfo for 5 minutes to improve performance 127 if let Some(cache) = &cache { 128 let user_info_value = 129 serde_json::to_value(&user_info).map_err(|_e| StatusCode::INTERNAL_SERVER_ERROR)?; 130 let mut cache_lock = cache.lock().await; 131 let _ = cache_lock 132 .cache_oauth_userinfo(token, &user_info_value, 300) 133 .await; 134 } 135 136 Ok(user_info) 137} 138 139/// Retrieves AT Protocol DPoP authentication credentials and PDS URL for a user. 140/// 141/// DPoP (Demonstrating Proof-of-Possession) is a security mechanism that binds tokens 142/// to specific cryptographic keys, preventing token theft and replay attacks. 143/// 144/// This function first checks the cache for existing credentials, then falls back to 145/// fetching from the auth server if needed. Results are cached for 5 minutes. 146/// 147/// # Arguments 148/// * `token` - OAuth bearer token identifying the user 149/// * `auth_base_url` - Base URL of the authorization server 150/// * `cache` - Optional cache instance (falls back to direct fetch if None) 151/// 152/// # Returns 153/// * `Ok((DPoPAuth, String))` - Tuple of (DPoP authentication object, PDS endpoint URL) 154/// * `Err(StatusCode)` - HTTP status code indicating the failure reason 155/// - `UNAUTHORIZED` - Invalid token or session expired 156/// - `INTERNAL_SERVER_ERROR` - Network, parsing, or key conversion errors 157/// 158/// # Cache Behavior 159/// - Cache key format: `atproto_session:{token}` 160/// - TTL: 300 seconds (5 minutes) 161/// - Stores serialized CachedSession with PDS URL, access token, and DPoP JWK 162pub async fn get_atproto_auth_for_user_cached( 163 token: &str, 164 auth_base_url: &str, 165 cache: Option<Arc<Mutex<SliceCache>>>, 166) -> Result<(DPoPAuth, String), StatusCode> { 167 // Try cache first if provided to avoid expensive auth server call 168 if let Some(cache) = &cache { 169 let cached_result = { 170 let mut cache_lock = cache.lock().await; 171 cache_lock.get_cached_atproto_session(token).await 172 }; 173 174 if let Ok(Some(session_value)) = cached_result { 175 let cached_session: CachedSession = serde_json::from_value(session_value) 176 .map_err(|_e| StatusCode::INTERNAL_SERVER_ERROR)?; 177 178 // Reconstruct DPoP auth from cached session data 179 let dpop_jwk: WrappedJsonWebKey = serde_json::from_value(cached_session.dpop_jwk) 180 .map_err(|_e| StatusCode::INTERNAL_SERVER_ERROR)?; 181 182 let dpop_private_key_data = 183 KeyData::try_from(dpop_jwk).map_err(|_e| StatusCode::INTERNAL_SERVER_ERROR)?; 184 185 let dpop_auth = DPoPAuth { 186 dpop_private_key_data, 187 oauth_access_token: cached_session.atproto_access_token, 188 }; 189 190 return Ok((dpop_auth, cached_session.pds_url)); 191 } 192 } 193 194 // Cache miss - fetch fresh session data from auth server 195 let client = reqwest::Client::new(); 196 let session_url = format!("{}/api/atprotocol/session", auth_base_url); 197 198 let session_response = client 199 .get(&session_url) 200 .header("Authorization", format!("Bearer {}", token)) 201 .send() 202 .await 203 .map_err(|_e| StatusCode::INTERNAL_SERVER_ERROR)?; 204 205 if !session_response.status().is_success() { 206 return Err(StatusCode::UNAUTHORIZED); 207 } 208 209 let session_data: serde_json::Value = session_response 210 .json() 211 .await 212 .map_err(|_e| StatusCode::INTERNAL_SERVER_ERROR)?; 213 214 // Extract the user's Personal Data Server endpoint URL 215 let pds_url = session_data["pds_endpoint"] 216 .as_str() 217 .ok_or(StatusCode::INTERNAL_SERVER_ERROR)? 218 .to_string(); 219 220 // Extract the access token used for authenticating with the PDS 221 let atproto_access_token = session_data["access_token"] 222 .as_str() 223 .ok_or(StatusCode::INTERNAL_SERVER_ERROR)? 224 .to_string(); 225 226 // Extract and convert the DPoP JSON Web Key to internal key representation 227 let dpop_jwk_value = session_data["dpop_jwk"].clone(); 228 let dpop_jwk: WrappedJsonWebKey = serde_json::from_value(dpop_jwk_value.clone()) 229 .map_err(|_e| StatusCode::INTERNAL_SERVER_ERROR)?; 230 231 let dpop_private_key_data = 232 KeyData::try_from(dpop_jwk).map_err(|_e| StatusCode::INTERNAL_SERVER_ERROR)?; 233 234 let dpop_auth = DPoPAuth { 235 dpop_private_key_data, 236 oauth_access_token: atproto_access_token.clone(), 237 }; 238 239 // Cache the complete session for 5 minutes to avoid repeated auth server calls 240 if let Some(cache) = &cache { 241 let cached_session = CachedSession { 242 pds_url: pds_url.clone(), 243 atproto_access_token, 244 dpop_jwk: dpop_jwk_value, 245 }; 246 let session_value = serde_json::to_value(&cached_session) 247 .map_err(|_e| StatusCode::INTERNAL_SERVER_ERROR)?; 248 let mut cache_lock = cache.lock().await; 249 let _ = cache_lock 250 .cache_atproto_session(token, &session_value, 300) 251 .await; 252 } 253 254 Ok((dpop_auth, pds_url)) 255}