A library for ATProtocol identities.

Compare changes

Choose any two refs to compare.

Changed files
+501 -194
crates
atproto-identity
src
atproto-oauth
src
atproto-tap
atproto-xrpcs
atproto-xrpcs-helloworld
src
+1 -1
Cargo.toml
··· 63 63 secrecy = { version = "0.10", features = ["serde"] } 64 64 serde = { version = "1.0", features = ["derive"] } 65 65 serde_ipld_dagcbor = "0.6" 66 - serde_json = "1.0" 66 + serde_json = { version = "1.0", features = ["unbounded_depth"] } 67 67 sha2 = "0.10" 68 68 thiserror = "2.0" 69 69 tokio = { version = "1.41", features = ["macros", "rt", "rt-multi-thread"] }
+4 -4
README.md
··· 131 131 ### XRPC Service 132 132 133 133 ```rust 134 - use atproto_xrpcs::authorization::ResolvingAuthorization; 134 + use atproto_xrpcs::authorization::Authorization; 135 135 use axum::{Json, Router, extract::Query, routing::get}; 136 136 use serde::Deserialize; 137 137 use serde_json::json; ··· 143 143 144 144 async fn handle_hello( 145 145 params: Query<HelloParams>, 146 - authorization: Option<ResolvingAuthorization>, 146 + authorization: Option<Authorization>, 147 147 ) -> Json<serde_json::Value> { 148 148 let subject = params.subject.as_deref().unwrap_or("World"); 149 - 149 + 150 150 let message = if let Some(auth) = authorization { 151 151 format!("Hello, authenticated {}! (caller: {})", subject, auth.subject()) 152 152 } else { 153 153 format!("Hello, {}!", subject) 154 154 }; 155 - 155 + 156 156 Json(json!({ "message": message })) 157 157 } 158 158
+19 -1
crates/atproto-identity/src/model.rs
··· 70 70 /// The DID identifier (e.g., "did:plc:abc123"). 71 71 pub id: String, 72 72 /// Alternative identifiers like handles and domains. 73 + #[serde(default)] 73 74 pub also_known_as: Vec<String>, 74 75 /// Available services for this identity. 76 + #[serde(default)] 75 77 pub service: Vec<Service>, 76 78 77 79 /// Cryptographic verification methods. 78 - #[serde(alias = "verificationMethod")] 80 + #[serde(alias = "verificationMethod", default)] 79 81 pub verification_method: Vec<VerificationMethod>, 80 82 81 83 /// Additional document properties not explicitly defined. ··· 402 404 let document = document.unwrap(); 403 405 assert_eq!(document.id, "did:plc:cbkjy5n7bk3ax2wplmtjofq2"); 404 406 } 407 + } 408 + 409 + #[test] 410 + fn test_deserialize_service_did_document() { 411 + // DID document from api.bsky.app - a service DID without alsoKnownAs 412 + let document = serde_json::from_str::<Document>( 413 + r##"{"@context":["https://www.w3.org/ns/did/v1","https://w3id.org/security/multikey/v1"],"id":"did:web:api.bsky.app","verificationMethod":[{"id":"did:web:api.bsky.app#atproto","type":"Multikey","controller":"did:web:api.bsky.app","publicKeyMultibase":"zQ3shpRzb2NDriwCSSsce6EqGxG23kVktHZc57C3NEcuNy1jg"}],"service":[{"id":"#bsky_notif","type":"BskyNotificationService","serviceEndpoint":"https://api.bsky.app"},{"id":"#bsky_appview","type":"BskyAppView","serviceEndpoint":"https://api.bsky.app"}]}"##, 414 + ); 415 + assert!(document.is_ok(), "Failed to parse: {:?}", document.err()); 416 + 417 + let document = document.unwrap(); 418 + assert_eq!(document.id, "did:web:api.bsky.app"); 419 + assert!(document.also_known_as.is_empty()); 420 + assert_eq!(document.service.len(), 2); 421 + assert_eq!(document.service[0].id, "#bsky_notif"); 422 + assert_eq!(document.service[1].id, "#bsky_appview"); 405 423 } 406 424 }
+283
crates/atproto-oauth/src/scopes.rs
··· 38 38 Atproto, 39 39 /// Transition scope for migration operations 40 40 Transition(TransitionScope), 41 + /// Include scope for referencing permission sets by NSID 42 + Include(IncludeScope), 41 43 /// OpenID Connect scope - required for OpenID Connect authentication 42 44 OpenId, 43 45 /// Profile scope - access to user profile information ··· 91 93 Generic, 92 94 /// Email transition operations 93 95 Email, 96 + } 97 + 98 + /// Include scope for referencing permission sets by NSID 99 + #[derive(Debug, Clone, PartialEq, Eq, Hash)] 100 + pub struct IncludeScope { 101 + /// The permission set NSID (e.g., "app.example.authFull") 102 + pub nsid: String, 103 + /// Optional audience DID for inherited RPC permissions 104 + pub aud: Option<String>, 94 105 } 95 106 96 107 /// Blob scope with mime type constraints ··· 310 321 "rpc", 311 322 "atproto", 312 323 "transition", 324 + "include", 313 325 "openid", 314 326 "profile", 315 327 "email", ··· 349 361 "rpc" => Self::parse_rpc(suffix), 350 362 "atproto" => Self::parse_atproto(suffix), 351 363 "transition" => Self::parse_transition(suffix), 364 + "include" => Self::parse_include(suffix), 352 365 "openid" => Self::parse_openid(suffix), 353 366 "profile" => Self::parse_profile(suffix), 354 367 "email" => Self::parse_email(suffix), ··· 573 586 Ok(Scope::Transition(scope)) 574 587 } 575 588 589 + fn parse_include(suffix: Option<&str>) -> Result<Self, ParseError> { 590 + let (nsid, params) = match suffix { 591 + Some(s) => { 592 + if let Some(pos) = s.find('?') { 593 + (&s[..pos], Some(&s[pos + 1..])) 594 + } else { 595 + (s, None) 596 + } 597 + } 598 + None => return Err(ParseError::MissingResource), 599 + }; 600 + 601 + if nsid.is_empty() { 602 + return Err(ParseError::MissingResource); 603 + } 604 + 605 + let aud = if let Some(params) = params { 606 + let parsed_params = parse_query_string(params); 607 + parsed_params 608 + .get("aud") 609 + .and_then(|v| v.first()) 610 + .map(|s| url_decode(s)) 611 + } else { 612 + None 613 + }; 614 + 615 + Ok(Scope::Include(IncludeScope { 616 + nsid: nsid.to_string(), 617 + aud, 618 + })) 619 + } 620 + 576 621 fn parse_openid(suffix: Option<&str>) -> Result<Self, ParseError> { 577 622 if suffix.is_some() { 578 623 return Err(ParseError::InvalidResource( ··· 730 775 TransitionScope::Generic => "transition:generic".to_string(), 731 776 TransitionScope::Email => "transition:email".to_string(), 732 777 }, 778 + Scope::Include(scope) => { 779 + if let Some(ref aud) = scope.aud { 780 + format!("include:{}?aud={}", scope.nsid, url_encode(aud)) 781 + } else { 782 + format!("include:{}", scope.nsid) 783 + } 784 + } 733 785 Scope::OpenId => "openid".to_string(), 734 786 Scope::Profile => "profile".to_string(), 735 787 Scope::Email => "email".to_string(), ··· 749 801 // Other scopes don't grant transition scopes 750 802 (_, Scope::Transition(_)) => false, 751 803 (Scope::Transition(_), _) => false, 804 + // Include scopes only grant themselves (exact match including aud) 805 + (Scope::Include(a), Scope::Include(b)) => a == b, 806 + // Other scopes don't grant include scopes 807 + (_, Scope::Include(_)) => false, 808 + (Scope::Include(_), _) => false, 752 809 // OpenID Connect scopes only grant themselves 753 810 (Scope::OpenId, Scope::OpenId) => true, 754 811 (Scope::OpenId, _) => false, ··· 888 945 } 889 946 890 947 params 948 + } 949 + 950 + /// Decode a percent-encoded string 951 + fn url_decode(s: &str) -> String { 952 + let mut result = String::with_capacity(s.len()); 953 + let mut chars = s.chars().peekable(); 954 + 955 + while let Some(c) = chars.next() { 956 + if c == '%' { 957 + let hex: String = chars.by_ref().take(2).collect(); 958 + if hex.len() == 2 { 959 + if let Ok(byte) = u8::from_str_radix(&hex, 16) { 960 + result.push(byte as char); 961 + continue; 962 + } 963 + } 964 + result.push('%'); 965 + result.push_str(&hex); 966 + } else { 967 + result.push(c); 968 + } 969 + } 970 + 971 + result 972 + } 973 + 974 + /// Encode a string for use in a URL query parameter 975 + fn url_encode(s: &str) -> String { 976 + let mut result = String::with_capacity(s.len() * 3); 977 + 978 + for c in s.chars() { 979 + match c { 980 + 'A'..='Z' | 'a'..='z' | '0'..='9' | '-' | '_' | '.' | '~' | ':' => { 981 + result.push(c); 982 + } 983 + _ => { 984 + for byte in c.to_string().as_bytes() { 985 + result.push_str(&format!("%{:02X}", byte)); 986 + } 987 + } 988 + } 989 + } 990 + 991 + result 891 992 } 892 993 893 994 /// Error type for scope parsing ··· 1921 2022 let reduced = Scope::parse_multiple_reduced("repo:app.bsky.feed.* repo:app.bsky.graph.* repo:*").unwrap(); 1922 2023 assert_eq!(reduced.len(), 1); 1923 2024 assert_eq!(reduced[0], repo_all); 2025 + } 2026 + 2027 + #[test] 2028 + fn test_include_scope_parsing() { 2029 + // Test basic include scope 2030 + let scope = Scope::parse("include:app.example.authFull").unwrap(); 2031 + assert_eq!( 2032 + scope, 2033 + Scope::Include(IncludeScope { 2034 + nsid: "app.example.authFull".to_string(), 2035 + aud: None, 2036 + }) 2037 + ); 2038 + 2039 + // Test include scope with audience 2040 + let scope = Scope::parse("include:app.example.authFull?aud=did:web:api.example.com").unwrap(); 2041 + assert_eq!( 2042 + scope, 2043 + Scope::Include(IncludeScope { 2044 + nsid: "app.example.authFull".to_string(), 2045 + aud: Some("did:web:api.example.com".to_string()), 2046 + }) 2047 + ); 2048 + 2049 + // Test include scope with URL-encoded audience (with fragment) 2050 + let scope = Scope::parse("include:app.example.authFull?aud=did:web:api.example.com%23svc_chat").unwrap(); 2051 + assert_eq!( 2052 + scope, 2053 + Scope::Include(IncludeScope { 2054 + nsid: "app.example.authFull".to_string(), 2055 + aud: Some("did:web:api.example.com#svc_chat".to_string()), 2056 + }) 2057 + ); 2058 + 2059 + // Test missing NSID 2060 + assert!(matches!( 2061 + Scope::parse("include"), 2062 + Err(ParseError::MissingResource) 2063 + )); 2064 + 2065 + // Test empty NSID with query params 2066 + assert!(matches!( 2067 + Scope::parse("include:?aud=did:example:123"), 2068 + Err(ParseError::MissingResource) 2069 + )); 2070 + } 2071 + 2072 + #[test] 2073 + fn test_include_scope_normalization() { 2074 + // Test normalization without audience 2075 + let scope = Scope::parse("include:com.example.authBasic").unwrap(); 2076 + assert_eq!(scope.to_string_normalized(), "include:com.example.authBasic"); 2077 + 2078 + // Test normalization with audience (no special chars) 2079 + let scope = Scope::parse("include:com.example.authBasic?aud=did:plc:xyz123").unwrap(); 2080 + assert_eq!( 2081 + scope.to_string_normalized(), 2082 + "include:com.example.authBasic?aud=did:plc:xyz123" 2083 + ); 2084 + 2085 + // Test normalization with URL encoding (fragment needs encoding) 2086 + let scope = Scope::parse("include:app.example.authFull?aud=did:web:api.example.com%23svc_chat").unwrap(); 2087 + let normalized = scope.to_string_normalized(); 2088 + assert_eq!( 2089 + normalized, 2090 + "include:app.example.authFull?aud=did:web:api.example.com%23svc_chat" 2091 + ); 2092 + } 2093 + 2094 + #[test] 2095 + fn test_include_scope_grants() { 2096 + let include1 = Scope::parse("include:app.example.authFull").unwrap(); 2097 + let include2 = Scope::parse("include:app.example.authBasic").unwrap(); 2098 + let include1_with_aud = Scope::parse("include:app.example.authFull?aud=did:plc:xyz").unwrap(); 2099 + let account = Scope::parse("account:email").unwrap(); 2100 + 2101 + // Include scopes only grant themselves (exact match) 2102 + assert!(include1.grants(&include1)); 2103 + assert!(!include1.grants(&include2)); 2104 + assert!(!include1.grants(&include1_with_aud)); // Different because aud differs 2105 + assert!(include1_with_aud.grants(&include1_with_aud)); 2106 + 2107 + // Include scopes don't grant other scope types 2108 + assert!(!include1.grants(&account)); 2109 + assert!(!account.grants(&include1)); 2110 + 2111 + // Include scopes don't grant atproto or transition 2112 + let atproto = Scope::parse("atproto").unwrap(); 2113 + let transition = Scope::parse("transition:generic").unwrap(); 2114 + assert!(!include1.grants(&atproto)); 2115 + assert!(!include1.grants(&transition)); 2116 + assert!(!atproto.grants(&include1)); 2117 + assert!(!transition.grants(&include1)); 2118 + } 2119 + 2120 + #[test] 2121 + fn test_parse_multiple_with_include() { 2122 + let scopes = Scope::parse_multiple("atproto include:app.example.auth repo:*").unwrap(); 2123 + assert_eq!(scopes.len(), 3); 2124 + assert_eq!(scopes[0], Scope::Atproto); 2125 + assert!(matches!(scopes[1], Scope::Include(_))); 2126 + assert!(matches!(scopes[2], Scope::Repo(_))); 2127 + 2128 + // Test with URL-encoded audience 2129 + let scopes = Scope::parse_multiple( 2130 + "include:app.example.auth?aud=did:web:api.example.com%23svc account:email" 2131 + ).unwrap(); 2132 + assert_eq!(scopes.len(), 2); 2133 + if let Scope::Include(inc) = &scopes[0] { 2134 + assert_eq!(inc.nsid, "app.example.auth"); 2135 + assert_eq!(inc.aud, Some("did:web:api.example.com#svc".to_string())); 2136 + } else { 2137 + panic!("Expected Include scope"); 2138 + } 2139 + } 2140 + 2141 + #[test] 2142 + fn test_parse_multiple_reduced_with_include() { 2143 + // Include scopes don't reduce each other (each is distinct) 2144 + let scopes = Scope::parse_multiple_reduced( 2145 + "include:app.example.auth include:app.example.other include:app.example.auth" 2146 + ).unwrap(); 2147 + assert_eq!(scopes.len(), 2); // Duplicates are removed 2148 + assert!(scopes.contains(&Scope::Include(IncludeScope { 2149 + nsid: "app.example.auth".to_string(), 2150 + aud: None, 2151 + }))); 2152 + assert!(scopes.contains(&Scope::Include(IncludeScope { 2153 + nsid: "app.example.other".to_string(), 2154 + aud: None, 2155 + }))); 2156 + 2157 + // Include scopes with different audiences are not duplicates 2158 + let scopes = Scope::parse_multiple_reduced( 2159 + "include:app.example.auth include:app.example.auth?aud=did:plc:xyz" 2160 + ).unwrap(); 2161 + assert_eq!(scopes.len(), 2); 2162 + } 2163 + 2164 + #[test] 2165 + fn test_serialize_multiple_with_include() { 2166 + let scopes = vec![ 2167 + Scope::parse("repo:*").unwrap(), 2168 + Scope::parse("include:app.example.authFull").unwrap(), 2169 + Scope::Atproto, 2170 + ]; 2171 + let result = Scope::serialize_multiple(&scopes); 2172 + assert_eq!(result, "atproto include:app.example.authFull repo:*"); 2173 + 2174 + // Test with URL-encoded audience 2175 + let scopes = vec![ 2176 + Scope::Include(IncludeScope { 2177 + nsid: "app.example.auth".to_string(), 2178 + aud: Some("did:web:api.example.com#svc".to_string()), 2179 + }), 2180 + ]; 2181 + let result = Scope::serialize_multiple(&scopes); 2182 + assert_eq!(result, "include:app.example.auth?aud=did:web:api.example.com%23svc"); 2183 + } 2184 + 2185 + #[test] 2186 + fn test_remove_scope_with_include() { 2187 + let scopes = vec![ 2188 + Scope::Atproto, 2189 + Scope::parse("include:app.example.auth").unwrap(), 2190 + Scope::parse("account:email").unwrap(), 2191 + ]; 2192 + let to_remove = Scope::parse("include:app.example.auth").unwrap(); 2193 + let result = Scope::remove_scope(&scopes, &to_remove); 2194 + assert_eq!(result.len(), 2); 2195 + assert!(!result.contains(&to_remove)); 2196 + assert!(result.contains(&Scope::Atproto)); 2197 + } 2198 + 2199 + #[test] 2200 + fn test_include_scope_roundtrip() { 2201 + // Test that parse and serialize are inverses 2202 + let original = "include:com.example.authBasicFeatures?aud=did:web:api.example.com%23svc_appview"; 2203 + let scope = Scope::parse(original).unwrap(); 2204 + let serialized = scope.to_string_normalized(); 2205 + let reparsed = Scope::parse(&serialized).unwrap(); 2206 + assert_eq!(scope, reparsed); 1924 2207 } 1925 2208 }
+3 -3
crates/atproto-tap/Cargo.toml
··· 39 39 40 40 [features] 41 41 default = [] 42 - cli = ["dep:clap", "dep:tracing-subscriber", "dep:atproto-client", "tokio/rt-multi-thread", "tokio/macros", "tokio/signal"] 42 + clap = ["dep:clap", "dep:tracing-subscriber", "dep:atproto-client", "tokio/rt-multi-thread", "tokio/macros", "tokio/signal"] 43 43 44 44 [[bin]] 45 45 name = "atproto-tap-client" 46 - required-features = ["cli"] 46 + required-features = ["clap"] 47 47 48 48 [[bin]] 49 49 name = "atproto-tap-extras" 50 - required-features = ["cli"] 50 + required-features = ["clap"] 51 51 52 52 [lints] 53 53 workspace = true
+112
crates/atproto-tap/src/events.rs
··· 7 7 //! - `serde_json::Value` for record payloads (allows lazy access) 8 8 9 9 use compact_str::CompactString; 10 + use serde::de::{self, Deserializer, IgnoredAny, MapAccess, Visitor}; 10 11 use serde::{Deserialize, Serialize, de::DeserializeOwned}; 12 + use std::fmt; 11 13 12 14 /// A TAP event received from the stream. 13 15 /// ··· 40 42 TapEvent::Record { id, .. } => *id, 41 43 TapEvent::Identity { id, .. } => *id, 42 44 } 45 + } 46 + } 47 + 48 + /// Extract only the event ID from a JSON string without fully parsing it. 49 + /// 50 + /// This is a fallback parser used when full `TapEvent` parsing fails (e.g., due to 51 + /// deeply nested records hitting serde_json's recursion limit). It uses `IgnoredAny` 52 + /// to efficiently skip over nested content without building data structures, allowing 53 + /// us to extract the ID for acknowledgment even when full parsing fails. 54 + /// 55 + /// # Example 56 + /// 57 + /// ``` 58 + /// use atproto_tap::extract_event_id; 59 + /// 60 + /// let json = r#"{"type":"record","id":12345,"record":{"deeply":"nested"}}"#; 61 + /// assert_eq!(extract_event_id(json), Some(12345)); 62 + /// ``` 63 + pub fn extract_event_id(json: &str) -> Option<u64> { 64 + let mut deserializer = serde_json::Deserializer::from_str(json); 65 + deserializer.disable_recursion_limit(); 66 + EventIdOnly::deserialize(&mut deserializer).ok().map(|e| e.id) 67 + } 68 + 69 + /// Internal struct for extracting only the "id" field from a TAP event. 70 + #[derive(Debug)] 71 + struct EventIdOnly { 72 + id: u64, 73 + } 74 + 75 + impl<'de> Deserialize<'de> for EventIdOnly { 76 + fn deserialize<D>(deserializer: D) -> Result<Self, D::Error> 77 + where 78 + D: Deserializer<'de>, 79 + { 80 + deserializer.deserialize_map(EventIdOnlyVisitor) 81 + } 82 + } 83 + 84 + struct EventIdOnlyVisitor; 85 + 86 + impl<'de> Visitor<'de> for EventIdOnlyVisitor { 87 + type Value = EventIdOnly; 88 + 89 + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { 90 + formatter.write_str("a map with an 'id' field") 91 + } 92 + 93 + fn visit_map<M>(self, mut map: M) -> Result<Self::Value, M::Error> 94 + where 95 + M: MapAccess<'de>, 96 + { 97 + let mut id: Option<u64> = None; 98 + 99 + while let Some(key) = map.next_key::<&str>()? { 100 + if key == "id" { 101 + id = Some(map.next_value()?); 102 + // Found what we need - skip the rest efficiently using IgnoredAny 103 + // which handles deeply nested structures without recursion issues 104 + while map.next_entry::<IgnoredAny, IgnoredAny>()?.is_some() {} 105 + break; 106 + } else { 107 + // Skip this value without fully parsing it 108 + map.next_value::<IgnoredAny>()?; 109 + } 110 + } 111 + 112 + id.map(|id| EventIdOnly { id }) 113 + .ok_or_else(|| de::Error::missing_field("id")) 43 114 } 44 115 } 45 116 ··· 372 443 }, 373 444 }; 374 445 assert_eq!(identity_event.id(), 200); 446 + } 447 + 448 + #[test] 449 + fn test_extract_event_id_simple() { 450 + let json = r#"{"type":"record","id":12345,"record":{"deeply":"nested"}}"#; 451 + assert_eq!(extract_event_id(json), Some(12345)); 452 + } 453 + 454 + #[test] 455 + fn test_extract_event_id_at_end() { 456 + let json = r#"{"type":"record","record":{"deeply":"nested"},"id":99999}"#; 457 + assert_eq!(extract_event_id(json), Some(99999)); 458 + } 459 + 460 + #[test] 461 + fn test_extract_event_id_missing() { 462 + let json = r#"{"type":"record","record":{"deeply":"nested"}}"#; 463 + assert_eq!(extract_event_id(json), None); 464 + } 465 + 466 + #[test] 467 + fn test_extract_event_id_invalid_json() { 468 + let json = r#"{"type":"record","id":123"#; // Truncated JSON 469 + assert_eq!(extract_event_id(json), None); 470 + } 471 + 472 + #[test] 473 + fn test_extract_event_id_deeply_nested() { 474 + // Create a deeply nested JSON that would exceed serde_json's default recursion limit 475 + let mut json = String::from(r#"{"id":42,"record":{"nested":"#); 476 + for _ in 0..200 { 477 + json.push_str("["); 478 + } 479 + json.push_str("1"); 480 + for _ in 0..200 { 481 + json.push_str("]"); 482 + } 483 + json.push_str("}}"); 484 + 485 + // extract_event_id should still work because it uses IgnoredAny with disabled recursion limit 486 + assert_eq!(extract_event_id(&json), Some(42)); 375 487 } 376 488 }
+1 -1
crates/atproto-tap/src/lib.rs
··· 115 115 pub use client::RepoStatus; 116 116 pub use config::{TapConfig, TapConfigBuilder}; 117 117 pub use errors::TapError; 118 - pub use events::{IdentityEvent, IdentityStatus, RecordAction, RecordEvent, TapEvent}; 118 + pub use events::{IdentityEvent, IdentityStatus, RecordAction, RecordEvent, TapEvent, extract_event_id}; 119 119 pub use stream::{TapStream, connect, connect_to};
+15 -1
crates/atproto-tap/src/stream.rs
··· 15 15 use crate::config::TapConfig; 16 16 use crate::connection::TapConnection; 17 17 use crate::errors::TapError; 18 - use crate::events::TapEvent; 18 + use crate::events::{TapEvent, extract_event_id}; 19 19 use futures::Stream; 20 20 use std::pin::Pin; 21 21 use std::sync::Arc; ··· 187 187 Err(err) => { 188 188 // Parse errors don't affect connection 189 189 tracing::warn!(error = %err, "Failed to parse TAP message"); 190 + 191 + // Try to extract just the ID using fallback parser 192 + // so we can still ack the message even if full parsing fails 193 + if config.send_acks { 194 + if let Some(event_id) = extract_event_id(&msg) { 195 + tracing::debug!(event_id, "Extracted event ID via fallback parser"); 196 + if let Err(ack_err) = conn.send_ack(event_id).await { 197 + tracing::warn!(error = %ack_err, "Failed to send ack for unparseable message"); 198 + } 199 + } else { 200 + tracing::warn!("Could not extract event ID from unparseable message"); 201 + } 202 + } 203 + 190 204 if event_tx.send(Err(TapError::ParseError(err.to_string()))).await.is_err() { 191 205 tracing::debug!("Event receiver dropped, closing connection"); 192 206 let _ = conn.close().await;
+13 -13
crates/atproto-xrpcs/README.md
··· 23 23 ### Basic XRPC Service 24 24 25 25 ```rust 26 - use atproto_xrpcs::authorization::ResolvingAuthorization; 26 + use atproto_xrpcs::authorization::Authorization; 27 27 use axum::{Json, Router, extract::Query, routing::get}; 28 28 use serde::Deserialize; 29 29 use serde_json::json; ··· 35 35 36 36 async fn handle_hello( 37 37 params: Query<HelloParams>, 38 - authorization: Option<ResolvingAuthorization>, 38 + authorization: Option<Authorization>, 39 39 ) -> Json<serde_json::Value> { 40 40 let name = params.name.as_deref().unwrap_or("World"); 41 - 41 + 42 42 let message = if authorization.is_some() { 43 43 format!("Hello, authenticated {}!", name) 44 44 } else { 45 45 format!("Hello, {}!", name) 46 46 }; 47 - 47 + 48 48 Json(json!({ "message": message })) 49 49 } 50 50 ··· 56 56 ### JWT Authorization 57 57 58 58 ```rust 59 - use atproto_xrpcs::authorization::ResolvingAuthorization; 59 + use atproto_xrpcs::authorization::Authorization; 60 60 61 61 async fn handle_secure_endpoint( 62 - authorization: ResolvingAuthorization, // Required authorization 62 + authorization: Authorization, // Required authorization 63 63 ) -> Json<serde_json::Value> { 64 - // The ResolvingAuthorization extractor automatically: 64 + // The Authorization extractor automatically: 65 65 // 1. Validates the JWT token 66 - // 2. Resolves the caller's DID document 66 + // 2. Resolves the caller's DID document 67 67 // 3. Verifies the signature against the DID document 68 68 // 4. Provides access to caller identity information 69 - 69 + 70 70 let caller_did = authorization.subject(); 71 71 Json(json!({"caller": caller_did, "status": "authenticated"})) 72 72 } ··· 79 79 use axum::{response::IntoResponse, http::StatusCode}; 80 80 81 81 async fn protected_handler( 82 - authorization: Result<ResolvingAuthorization, AuthorizationError>, 82 + authorization: Result<Authorization, AuthorizationError>, 83 83 ) -> impl IntoResponse { 84 84 match authorization { 85 85 Ok(auth) => (StatusCode::OK, "Access granted").into_response(), 86 - Err(AuthorizationError::InvalidJWTToken { .. }) => { 86 + Err(AuthorizationError::InvalidJWTFormat) => { 87 87 (StatusCode::UNAUTHORIZED, "Invalid token").into_response() 88 88 } 89 - Err(AuthorizationError::DIDDocumentResolutionFailed { .. }) => { 89 + Err(AuthorizationError::SubjectResolutionFailed { .. }) => { 90 90 (StatusCode::FORBIDDEN, "Identity verification failed").into_response() 91 91 } 92 92 Err(_) => { ··· 98 98 99 99 ## Authorization Flow 100 100 101 - The `ResolvingAuthorization` extractor implements: 101 + The `Authorization` extractor implements: 102 102 103 103 1. JWT extraction from HTTP Authorization headers 104 104 2. Token validation (signature and claims structure)
+42 -108
crates/atproto-xrpcs/src/authorization.rs
··· 1 1 //! JWT authorization extractors for XRPC services. 2 2 //! 3 - //! Axum extractors for JWT validation against DID documents with 4 - //! cached and resolving authorization modes. 3 + //! Axum extractors for JWT validation against DID documents resolved 4 + //! via an identity resolver. 5 5 6 6 use anyhow::Result; 7 7 use atproto_identity::key::identify_key; 8 - use atproto_identity::resolve::IdentityResolver; 9 - use atproto_identity::traits::DidDocumentStorage; 8 + use atproto_identity::traits::IdentityResolver; 10 9 use atproto_oauth::jwt::{Claims, Header}; 11 10 use axum::extract::{FromRef, OptionalFromRequestParts}; 12 11 use axum::http::request::Parts; ··· 17 16 18 17 use crate::errors::AuthorizationError; 19 18 20 - /// JWT authorization extractor that validates tokens against cached DID documents. 19 + /// JWT authorization extractor that validates tokens against DID documents. 21 20 /// 22 21 /// Contains JWT header, validated claims, original token, and validation status. 23 - /// Only validates against DID documents already present in storage. 22 + /// Resolves DID documents via the configured identity resolver. 23 + #[derive(Clone)] 24 24 pub struct Authorization(pub Header, pub Claims, pub String, pub bool); 25 25 26 - /// JWT authorization extractor with automatic DID document resolution. 27 - /// 28 - /// Contains JWT header, validated claims, original token, and validation status. 29 - /// Attempts to resolve missing DID documents from authoritative sources when needed. 30 - pub struct ResolvingAuthorization(pub Header, pub Claims, pub String, pub bool); 31 - 32 - impl<S> OptionalFromRequestParts<S> for Authorization 33 - where 34 - S: Send + Sync, 35 - Arc<dyn DidDocumentStorage>: FromRef<S>, 36 - { 37 - type Rejection = Infallible; 38 - 39 - async fn from_request_parts( 40 - parts: &mut Parts, 41 - state: &S, 42 - ) -> Result<Option<Self>, Self::Rejection> { 43 - let auth_header = parts 44 - .headers 45 - .get("authorization") 46 - .and_then(|value| value.to_str().ok()) 47 - .and_then(|s| s.strip_prefix("Bearer ")); 48 - 49 - let token = match auth_header { 50 - Some(token) => token.to_string(), 51 - None => { 52 - return Ok(None); 53 - } 54 - }; 55 - 56 - let did_document_storage = Arc::<dyn DidDocumentStorage>::from_ref(state); 57 - 58 - match validate_jwt(&token, did_document_storage, None).await { 59 - Ok((header, claims)) => Ok(Some(Authorization(header, claims, token, true))), 60 - Err(_) => { 61 - // Return unvalidated authorization so the handler can decide what to do 62 - let header = Header::default(); 63 - let claims = Claims::default(); 64 - Ok(Some(Authorization(header, claims, token, false))) 65 - } 26 + impl Authorization { 27 + /// identity returns the optional issuer claim of the authorization structure. 28 + pub fn identity(&self) -> Option<&str> { 29 + if self.3 { 30 + return self.1.jose.issuer.as_deref(); 66 31 } 32 + None 67 33 } 68 34 } 69 35 70 - impl<S> OptionalFromRequestParts<S> for ResolvingAuthorization 36 + impl<S> OptionalFromRequestParts<S> for Authorization 71 37 where 72 38 S: Send + Sync, 73 - Arc<dyn DidDocumentStorage>: FromRef<S>, 74 39 Arc<dyn IdentityResolver>: FromRef<S>, 75 40 { 76 41 type Rejection = Infallible; ··· 92 57 } 93 58 }; 94 59 95 - let did_document_storage = Arc::<dyn DidDocumentStorage>::from_ref(state); 96 60 let identity_resolver = Arc::<dyn IdentityResolver>::from_ref(state); 97 61 98 - match validate_jwt(&token, did_document_storage, Some(identity_resolver)).await { 99 - Ok((header, claims)) => Ok(Some(ResolvingAuthorization(header, claims, token, true))), 62 + match validate_jwt(&token, identity_resolver).await { 63 + Ok((header, claims)) => Ok(Some(Authorization(header, claims, token, true))), 100 64 Err(_) => { 101 65 // Return unvalidated authorization so the handler can decide what to do 102 66 let header = Header::default(); 103 67 let claims = Claims::default(); 104 - Ok(Some(ResolvingAuthorization(header, claims, token, false))) 68 + Ok(Some(Authorization(header, claims, token, false))) 105 69 } 106 70 } 107 71 } ··· 109 73 110 74 async fn validate_jwt( 111 75 token: &str, 112 - storage: Arc<dyn DidDocumentStorage + Send + Sync>, 113 - identity_resolver: Option<Arc<dyn IdentityResolver>>, 76 + identity_resolver: Arc<dyn IdentityResolver>, 114 77 ) -> Result<(Header, Claims)> { 115 78 // Split and decode JWT 116 79 let parts: Vec<&str> = token.split('.').collect(); ··· 134 97 .as_ref() 135 98 .ok_or_else(|| AuthorizationError::NoIssuerInClaims)?; 136 99 137 - // Try to look up DID document directly first 138 - let mut did_document = storage.get_document_by_did(issuer).await?; 139 - 140 - // If not found, try to resolve the subject 141 - if did_document.is_none() 142 - && let Some(identity_resolver) = identity_resolver 143 - { 144 - did_document = match identity_resolver.resolve(issuer).await { 145 - Ok(value) => { 146 - storage 147 - .store_document(value.clone()) 148 - .await 149 - .map_err(|err| AuthorizationError::DocumentStorageFailed { error: err })?; 150 - 151 - Some(value) 152 - } 153 - Err(err) => { 154 - return Err(AuthorizationError::SubjectResolutionFailed { 155 - issuer: issuer.to_string(), 156 - error: err, 157 - } 158 - .into()); 159 - } 160 - }; 161 - } 162 - 163 - let did_document = did_document.ok_or_else(|| AuthorizationError::DIDDocumentNotFound { 164 - issuer: issuer.to_string(), 100 + // Resolve the DID document via identity resolver 101 + let did_document = identity_resolver.resolve(issuer).await.map_err(|err| { 102 + AuthorizationError::SubjectResolutionFailed { 103 + issuer: issuer.to_string(), 104 + error: err, 105 + } 165 106 })?; 166 107 167 108 // Extract keys from DID document ··· 206 147 mod tests { 207 148 use super::*; 208 149 use atproto_identity::model::{Document, VerificationMethod}; 209 - use atproto_identity::traits::DidDocumentStorage; 210 150 use axum::extract::FromRef; 211 151 use axum::http::{Method, Request}; 212 152 use std::collections::HashMap; 213 153 214 154 #[derive(Clone)] 215 - struct MockStorage { 155 + struct MockResolver { 216 156 document: Document, 217 157 } 218 158 219 159 #[async_trait::async_trait] 220 - impl DidDocumentStorage for MockStorage { 221 - async fn get_document_by_did(&self, did: &str) -> Result<Option<Document>> { 222 - if did == self.document.id { 223 - Ok(Some(self.document.clone())) 160 + impl IdentityResolver for MockResolver { 161 + async fn resolve(&self, subject: &str) -> Result<Document> { 162 + if subject == self.document.id { 163 + Ok(self.document.clone()) 224 164 } else { 225 - Ok(None) 165 + Err(anyhow::anyhow!( 166 + "error-atproto-xrpcs-authorization-1 DID not found: {}", 167 + subject 168 + )) 226 169 } 227 170 } 228 - 229 - async fn store_document(&self, _document: Document) -> Result<()> { 230 - Ok(()) 231 - } 232 - 233 - async fn delete_document_by_did(&self, _did: &str) -> Result<()> { 234 - Ok(()) 235 - } 236 171 } 237 172 238 173 #[derive(Clone)] 239 174 struct TestState { 240 - storage: Arc<dyn DidDocumentStorage + Send + Sync>, 175 + resolver: Arc<dyn IdentityResolver>, 241 176 } 242 177 243 - impl FromRef<TestState> for Arc<dyn DidDocumentStorage> { 178 + impl FromRef<TestState> for Arc<dyn IdentityResolver> { 244 179 fn from_ref(state: &TestState) -> Self { 245 - state.storage.clone() 180 + state.resolver.clone() 246 181 } 247 182 } 248 183 ··· 266 201 extra: HashMap::new(), 267 202 }; 268 203 269 - // Create mock storage 270 - let storage = 271 - Arc::new(MockStorage { document }) as Arc<dyn DidDocumentStorage + Send + Sync>; 272 - let state = TestState { storage }; 204 + // Create mock resolver 205 + let resolver = Arc::new(MockResolver { document }) as Arc<dyn IdentityResolver>; 206 + let state = TestState { resolver }; 273 207 274 208 // Create request with Authorization header 275 209 let request = Request::builder() ··· 307 241 308 242 #[tokio::test] 309 243 async fn test_authorization_no_header() { 310 - // Create mock storage 311 - let storage = Arc::new(MockStorage { 244 + // Create mock resolver 245 + let resolver = Arc::new(MockResolver { 312 246 document: Document { 313 247 context: vec![], 314 248 id: "did:plc:test".to_string(), ··· 317 251 verification_method: vec![], 318 252 extra: HashMap::new(), 319 253 }, 320 - }) as Arc<dyn DidDocumentStorage + Send + Sync>; 321 - let state = TestState { storage }; 254 + }) as Arc<dyn IdentityResolver>; 255 + let state = TestState { resolver }; 322 256 323 257 // Create request without Authorization header 324 258 let request = Request::builder()
+5 -49
crates/atproto-xrpcs/src/errors.rs
··· 42 42 #[error("error-atproto-xrpcs-authorization-4 No issuer found in JWT claims")] 43 43 NoIssuerInClaims, 44 44 45 - /// Occurs when DID document is not found for the issuer 46 - #[error("error-atproto-xrpcs-authorization-5 DID document not found for issuer: {issuer}")] 47 - DIDDocumentNotFound { 48 - /// The issuer DID that was not found 49 - issuer: String, 50 - }, 51 - 52 45 /// Occurs when no verification keys are found in DID document 53 - #[error("error-atproto-xrpcs-authorization-6 No verification keys found in DID document")] 46 + #[error("error-atproto-xrpcs-authorization-5 No verification keys found in DID document")] 54 47 NoVerificationKeys, 55 48 56 49 /// Occurs when JWT header cannot be base64 decoded 57 - #[error("error-atproto-xrpcs-authorization-7 Failed to decode JWT header: {error}")] 50 + #[error("error-atproto-xrpcs-authorization-6 Failed to decode JWT header: {error}")] 58 51 HeaderDecodeError { 59 52 /// The underlying base64 decode error 60 53 error: base64::DecodeError, 61 54 }, 62 55 63 56 /// Occurs when JWT header cannot be parsed as JSON 64 - #[error("error-atproto-xrpcs-authorization-8 Failed to parse JWT header: {error}")] 57 + #[error("error-atproto-xrpcs-authorization-7 Failed to parse JWT header: {error}")] 65 58 HeaderParseError { 66 59 /// The underlying JSON parse error 67 60 error: serde_json::Error, 68 61 }, 69 62 70 63 /// Occurs when JWT validation fails with all available keys 71 - #[error("error-atproto-xrpcs-authorization-9 JWT validation failed with all available keys")] 64 + #[error("error-atproto-xrpcs-authorization-8 JWT validation failed with all available keys")] 72 65 ValidationFailedAllKeys, 73 66 74 67 /// Occurs when subject resolution fails during DID document lookup 75 - #[error("error-atproto-xrpcs-authorization-10 Subject resolution failed: {issuer} {error}")] 68 + #[error("error-atproto-xrpcs-authorization-9 Subject resolution failed: {issuer} {error}")] 76 69 SubjectResolutionFailed { 77 70 /// The issuer that failed to resolve 78 71 issuer: String, 79 72 /// The underlying resolution error 80 - error: anyhow::Error, 81 - }, 82 - 83 - /// Occurs when DID document lookup fails after successful resolution 84 - #[error( 85 - "error-atproto-xrpcs-authorization-11 DID document not found for resolved issuer: {resolved_did}" 86 - )] 87 - ResolvedDIDDocumentNotFound { 88 - /// The resolved DID that was not found in storage 89 - resolved_did: String, 90 - }, 91 - 92 - /// Occurs when PLC directory query fails 93 - #[error("error-atproto-xrpcs-authorization-12 PLC directory query failed: {error}")] 94 - PLCQueryFailed { 95 - /// The underlying PLC query error 96 - error: anyhow::Error, 97 - }, 98 - 99 - /// Occurs when web DID query fails 100 - #[error("error-atproto-xrpcs-authorization-13 Web DID query failed: {error}")] 101 - WebDIDQueryFailed { 102 - /// The underlying web DID query error 103 - error: anyhow::Error, 104 - }, 105 - 106 - /// Occurs when DID document storage operation fails 107 - #[error("error-atproto-xrpcs-authorization-14 DID document storage failed: {error}")] 108 - DocumentStorageFailed { 109 - /// The underlying storage error 110 - error: anyhow::Error, 111 - }, 112 - 113 - /// Occurs when input parsing fails for resolved DID 114 - #[error("error-atproto-xrpcs-authorization-15 Input parsing failed for resolved DID: {error}")] 115 - InputParsingFailed { 116 - /// The underlying parsing error 117 73 error: anyhow::Error, 118 74 }, 119 75 }
+3 -13
crates/atproto-xrpcs-helloworld/src/main.rs
··· 7 7 config::{CertificateBundles, DnsNameservers, default_env, optional_env, require_env, version}, 8 8 key::{KeyData, KeyResolver, identify_key, to_public}, 9 9 resolve::{HickoryDnsResolver, IdentityResolver, InnerIdentityResolver}, 10 - storage_lru::LruDidDocumentStorage, 11 - traits::DidDocumentStorage, 12 10 }; 13 - use atproto_xrpcs::authorization::ResolvingAuthorization; 11 + use atproto_xrpcs::authorization::Authorization; 14 12 use axum::{ 15 13 Json, Router, 16 14 extract::{FromRef, Query, State}, ··· 21 19 use http::{HeaderMap, StatusCode}; 22 20 use serde::Deserialize; 23 21 use serde_json::json; 24 - use std::{collections::HashMap, num::NonZeroUsize, ops::Deref, sync::Arc}; 22 + use std::{collections::HashMap, ops::Deref, sync::Arc}; 25 23 26 24 #[derive(Clone)] 27 25 pub struct SimpleKeyResolver { ··· 61 59 62 60 pub struct InnerWebContext { 63 61 pub http_client: reqwest::Client, 64 - pub document_storage: Arc<dyn DidDocumentStorage>, 65 62 pub key_resolver: Arc<dyn KeyResolver>, 66 63 pub service_document: ServiceDocument, 67 64 pub service_did: ServiceDID, ··· 97 94 } 98 95 } 99 96 100 - impl FromRef<WebContext> for Arc<dyn DidDocumentStorage> { 101 - fn from_ref(context: &WebContext) -> Self { 102 - context.0.document_storage.clone() 103 - } 104 - } 105 - 106 97 impl FromRef<WebContext> for Arc<dyn KeyResolver> { 107 98 fn from_ref(context: &WebContext) -> Self { 108 99 context.0.key_resolver.clone() ··· 216 207 217 208 let web_context = WebContext(Arc::new(InnerWebContext { 218 209 http_client: http_client.clone(), 219 - document_storage: Arc::new(LruDidDocumentStorage::new(NonZeroUsize::new(255).unwrap())), 220 210 key_resolver: Arc::new(SimpleKeyResolver { 221 211 keys: signing_key_storage, 222 212 }), ··· 284 274 async fn handle_xrpc_hello_world( 285 275 parameters: Query<HelloParameters>, 286 276 headers: HeaderMap, 287 - authorization: Option<ResolvingAuthorization>, 277 + authorization: Option<Authorization>, 288 278 ) -> Json<serde_json::Value> { 289 279 println!("headers {headers:?}"); 290 280 let subject = parameters.subject.as_deref().unwrap_or("World");