Our Personal Data Server from scratch! tranquil.farm
atproto pds rust postgresql fun oauth

feat(lexicon): schema reference resolution #46

merged opened by oyster.cafe targeting main from feat/real-lex-schema-validation
Labels

None yet.

assignee

None yet.

Participants 1
AT URI
at://did:plc:3fwecdnvtcscjnrx2p4n7alz/sh.tangled.repo.pull/3mgvbqsbiej22
+400
Diff #1
+400
crates/tranquil-lexicon/src/resolve.rs
··· 1 + use crate::schema::LexiconDoc; 2 + use hickory_resolver::TokioAsyncResolver; 3 + use hickory_resolver::config::{ResolverConfig, ResolverOpts}; 4 + use reqwest::Client; 5 + use std::sync::OnceLock; 6 + use std::time::Duration; 7 + 8 + static RESOLVER_CLIENT: OnceLock<Client> = OnceLock::new(); 9 + 10 + const MAX_RESPONSE_BYTES: usize = 512 * 1024; 11 + 12 + fn client() -> &'static Client { 13 + RESOLVER_CLIENT.get_or_init(|| { 14 + Client::builder() 15 + .timeout(Duration::from_secs(10)) 16 + .connect_timeout(Duration::from_secs(5)) 17 + .pool_max_idle_per_host(4) 18 + .pool_idle_timeout(Duration::from_secs(60)) 19 + .redirect(reqwest::redirect::Policy::limited(3)) 20 + .build() 21 + .expect("failed to build lexicon resolver HTTP client") 22 + }) 23 + } 24 + 25 + const DEFAULT_PLC_DIRECTORY: &str = "https://plc.directory"; 26 + 27 + async fn read_body_limited(resp: reqwest::Response, max_bytes: usize) -> Result<Vec<u8>, String> { 28 + if let Some(len) = resp.content_length() 29 + && len > max_bytes as u64 30 + { 31 + return Err(format!( 32 + "response too large: {} bytes (max {})", 33 + len, max_bytes 34 + )); 35 + } 36 + 37 + let bytes = resp 38 + .bytes() 39 + .await 40 + .map_err(|e| format!("failed to read response body: {}", e))?; 41 + 42 + if bytes.len() > max_bytes { 43 + return Err(format!( 44 + "response too large: {} bytes (max {})", 45 + bytes.len(), 46 + max_bytes 47 + )); 48 + } 49 + 50 + Ok(bytes.to_vec()) 51 + } 52 + 53 + #[derive(Debug, thiserror::Error)] 54 + pub enum ResolveError { 55 + #[error("failed to derive authority from NSID: {0}")] 56 + InvalidNsid(String), 57 + #[error("DNS lookup failed for {domain}: {reason}")] 58 + DnsLookup { domain: String, reason: String }, 59 + #[error("no DID found in DNS TXT records for {domain}")] 60 + NoDid { domain: String }, 61 + #[error("DID document fetch failed for {did}: {reason}")] 62 + DidResolution { did: String, reason: String }, 63 + #[error("no PDS endpoint found in DID document for {did}")] 64 + NoPdsEndpoint { did: String }, 65 + #[error("schema fetch failed from {url}: {reason}")] 66 + SchemaFetch { url: String, reason: String }, 67 + #[error("schema deserialization failed: {0}")] 68 + InvalidSchema(String), 69 + #[error("schema resolution recently failed for {nsid}, cached for {ttl_secs}s")] 70 + NegativelyCached { nsid: String, ttl_secs: u64 }, 71 + #[error("network resolution disabled")] 72 + NetworkDisabled, 73 + } 74 + 75 + pub fn nsid_to_authority(nsid: &str) -> Result<String, ResolveError> { 76 + let mut segments: Vec<&str> = nsid.split('.').collect(); 77 + if segments.len() < 3 { 78 + return Err(ResolveError::InvalidNsid(nsid.to_string())); 79 + } 80 + segments.pop(); 81 + segments.reverse(); 82 + Ok(segments.join(".")) 83 + } 84 + 85 + pub async fn resolve_did_from_dns(authority: &str) -> Result<String, ResolveError> { 86 + let resolver = TokioAsyncResolver::tokio(ResolverConfig::default(), ResolverOpts::default()); 87 + 88 + let extract_did = |lookup: hickory_resolver::lookup::TxtLookup| -> Option<String> { 89 + lookup 90 + .iter() 91 + .flat_map(|record| record.txt_data()) 92 + .find_map(|txt| { 93 + let txt_str = String::from_utf8_lossy(txt); 94 + txt_str.strip_prefix("did=").and_then(|did| { 95 + let did = did.trim(); 96 + did.starts_with("did:").then(|| did.to_string()) 97 + }) 98 + }) 99 + }; 100 + 101 + let lexicon_query = format!("_lexicon.{}", authority); 102 + if let Ok(lookup) = resolver.txt_lookup(&lexicon_query).await 103 + && let Some(did) = extract_did(lookup) 104 + { 105 + return Ok(did); 106 + } 107 + 108 + let atproto_query = format!("_atproto.{}", authority); 109 + let lookup = 110 + resolver 111 + .txt_lookup(&atproto_query) 112 + .await 113 + .map_err(|e| ResolveError::DnsLookup { 114 + domain: authority.to_string(), 115 + reason: e.to_string(), 116 + })?; 117 + 118 + extract_did(lookup).ok_or(ResolveError::NoDid { 119 + domain: authority.to_string(), 120 + }) 121 + } 122 + 123 + pub async fn resolve_pds_endpoint( 124 + did: &str, 125 + plc_directory_url: Option<&str>, 126 + ) -> Result<String, ResolveError> { 127 + let plc_base = plc_directory_url.unwrap_or(DEFAULT_PLC_DIRECTORY); 128 + 129 + let url = match did 130 + .split_once(':') 131 + .and_then(|(_, rest)| rest.split_once(':')) 132 + { 133 + Some(("plc", _)) => format!("{}/{}", plc_base.trim_end_matches('/'), did), 134 + Some(("web", domain)) => format!("https://{}/.well-known/did.json", domain), 135 + _ => { 136 + return Err(ResolveError::DidResolution { 137 + did: did.to_string(), 138 + reason: "unsupported DID method".to_string(), 139 + }); 140 + } 141 + }; 142 + 143 + let resp = client() 144 + .get(&url) 145 + .send() 146 + .await 147 + .map_err(|e| ResolveError::DidResolution { 148 + did: did.to_string(), 149 + reason: e.to_string(), 150 + })?; 151 + 152 + let body = read_body_limited(resp, MAX_RESPONSE_BYTES) 153 + .await 154 + .map_err(|reason| ResolveError::DidResolution { 155 + did: did.to_string(), 156 + reason, 157 + })?; 158 + 159 + let doc: serde_json::Value = 160 + serde_json::from_slice(&body).map_err(|e| ResolveError::DidResolution { 161 + did: did.to_string(), 162 + reason: e.to_string(), 163 + })?; 164 + 165 + extract_pds_endpoint(&doc).ok_or(ResolveError::NoPdsEndpoint { 166 + did: did.to_string(), 167 + }) 168 + } 169 + 170 + fn extract_pds_endpoint(doc: &serde_json::Value) -> Option<String> { 171 + doc.get("service") 172 + .and_then(|s| s.as_array()) 173 + .and_then(|services| { 174 + services.iter().find_map(|svc| { 175 + let is_pds = svc 176 + .get("type") 177 + .and_then(|t| t.as_str()) 178 + .is_some_and(|t| t == "AtprotoPersonalDataServer"); 179 + is_pds 180 + .then(|| svc.get("serviceEndpoint").and_then(|ep| ep.as_str()))? 181 + .map(|s| s.to_string()) 182 + }) 183 + }) 184 + } 185 + 186 + pub async fn fetch_schema_from_pds( 187 + pds_endpoint: &str, 188 + did: &str, 189 + nsid: &str, 190 + ) -> Result<LexiconDoc, ResolveError> { 191 + let url = format!( 192 + "{}/xrpc/com.atproto.repo.getRecord?repo={}&collection=com.atproto.lexicon.schema&rkey={}", 193 + pds_endpoint.trim_end_matches('/'), 194 + urlencoding::encode(did), 195 + urlencoding::encode(nsid) 196 + ); 197 + 198 + let resp = client() 199 + .get(&url) 200 + .send() 201 + .await 202 + .map_err(|e| ResolveError::SchemaFetch { 203 + url: url.clone(), 204 + reason: e.to_string(), 205 + })?; 206 + 207 + let status = resp.status(); 208 + if !status.is_success() { 209 + return Err(ResolveError::SchemaFetch { 210 + url, 211 + reason: format!("HTTP {}", status), 212 + }); 213 + } 214 + 215 + let body = read_body_limited(resp, MAX_RESPONSE_BYTES) 216 + .await 217 + .map_err(|reason| ResolveError::SchemaFetch { 218 + url: url.clone(), 219 + reason, 220 + })?; 221 + 222 + let resp_value: serde_json::Value = 223 + serde_json::from_slice(&body).map_err(|e| ResolveError::SchemaFetch { 224 + url: url.clone(), 225 + reason: e.to_string(), 226 + })?; 227 + 228 + let value = resp_value 229 + .get("value") 230 + .ok_or_else(|| ResolveError::SchemaFetch { 231 + url: url.clone(), 232 + reason: "response missing 'value' field".to_string(), 233 + })?; 234 + 235 + serde_json::from_value::<LexiconDoc>(value.clone()) 236 + .map_err(|e| ResolveError::InvalidSchema(e.to_string())) 237 + } 238 + 239 + fn validate_fetched_schema(doc: &LexiconDoc, nsid: &str) -> Result<(), ResolveError> { 240 + if doc.id != nsid { 241 + return Err(ResolveError::InvalidSchema(format!( 242 + "schema id '{}' does not match requested NSID '{}'", 243 + doc.id, nsid 244 + ))); 245 + } 246 + if doc.lexicon != 1 { 247 + return Err(ResolveError::InvalidSchema(format!( 248 + "unsupported lexicon version: {}", 249 + doc.lexicon 250 + ))); 251 + } 252 + Ok(()) 253 + } 254 + 255 + pub async fn resolve_lexicon(nsid: &str) -> Result<LexiconDoc, ResolveError> { 256 + resolve_lexicon_with_config(nsid, None).await 257 + } 258 + 259 + pub async fn resolve_lexicon_with_config( 260 + nsid: &str, 261 + plc_directory_url: Option<&str>, 262 + ) -> Result<LexiconDoc, ResolveError> { 263 + let authority = nsid_to_authority(nsid)?; 264 + tracing::debug!(nsid = nsid, authority = %authority, "resolving lexicon schema"); 265 + 266 + let did = resolve_did_from_dns(&authority).await?; 267 + tracing::debug!(nsid = nsid, did = %did, "resolved authority DID"); 268 + 269 + let pds_endpoint = resolve_pds_endpoint(&did, plc_directory_url).await?; 270 + tracing::debug!(nsid = nsid, pds = %pds_endpoint, "resolved PDS endpoint"); 271 + 272 + let doc = fetch_schema_from_pds(&pds_endpoint, &did, nsid).await?; 273 + validate_fetched_schema(&doc, nsid)?; 274 + 275 + Ok(doc) 276 + } 277 + 278 + pub async fn resolve_lexicon_from_did( 279 + nsid: &str, 280 + did: &str, 281 + plc_directory_url: Option<&str>, 282 + ) -> Result<LexiconDoc, ResolveError> { 283 + let pds_endpoint = resolve_pds_endpoint(did, plc_directory_url).await?; 284 + let doc = fetch_schema_from_pds(&pds_endpoint, did, nsid).await?; 285 + validate_fetched_schema(&doc, nsid)?; 286 + Ok(doc) 287 + } 288 + 289 + #[cfg(test)] 290 + mod tests { 291 + use super::*; 292 + 293 + #[test] 294 + fn test_nsid_to_authority() { 295 + assert_eq!( 296 + nsid_to_authority("app.bsky.feed.post").unwrap(), 297 + "feed.bsky.app" 298 + ); 299 + assert_eq!( 300 + nsid_to_authority("com.atproto.repo.strongRef").unwrap(), 301 + "repo.atproto.com" 302 + ); 303 + assert_eq!( 304 + nsid_to_authority("com.germnetwork.social.post").unwrap(), 305 + "social.germnetwork.com" 306 + ); 307 + assert!(nsid_to_authority("tooShort").is_err()); 308 + } 309 + 310 + #[test] 311 + fn test_nsid_to_authority_three_segments() { 312 + assert_eq!( 313 + nsid_to_authority("org.example.record").unwrap(), 314 + "example.org" 315 + ); 316 + } 317 + 318 + #[test] 319 + fn test_extract_pds_endpoint_valid() { 320 + let doc = serde_json::json!({ 321 + "service": [{ 322 + "type": "AtprotoPersonalDataServer", 323 + "serviceEndpoint": "https://pds.example.com" 324 + }] 325 + }); 326 + assert_eq!( 327 + extract_pds_endpoint(&doc), 328 + Some("https://pds.example.com".to_string()) 329 + ); 330 + } 331 + 332 + #[test] 333 + fn test_extract_pds_endpoint_multiple_services() { 334 + let doc = serde_json::json!({ 335 + "service": [ 336 + { 337 + "type": "AtprotoLabeler", 338 + "serviceEndpoint": "https://labeler.example.com" 339 + }, 340 + { 341 + "type": "AtprotoPersonalDataServer", 342 + "serviceEndpoint": "https://pds.example.com" 343 + } 344 + ] 345 + }); 346 + assert_eq!( 347 + extract_pds_endpoint(&doc), 348 + Some("https://pds.example.com".to_string()) 349 + ); 350 + } 351 + 352 + #[test] 353 + fn test_extract_pds_endpoint_missing() { 354 + let doc = serde_json::json!({ 355 + "service": [{ 356 + "type": "AtprotoLabeler", 357 + "serviceEndpoint": "https://labeler.example.com" 358 + }] 359 + }); 360 + assert_eq!(extract_pds_endpoint(&doc), None); 361 + } 362 + 363 + #[test] 364 + fn test_extract_pds_endpoint_no_services() { 365 + let doc = serde_json::json!({}); 366 + assert_eq!(extract_pds_endpoint(&doc), None); 367 + } 368 + 369 + #[test] 370 + fn test_validate_fetched_schema_ok() { 371 + let doc = LexiconDoc { 372 + lexicon: 1, 373 + id: "com.example.thing".to_string(), 374 + defs: Default::default(), 375 + }; 376 + assert!(validate_fetched_schema(&doc, "com.example.thing").is_ok()); 377 + } 378 + 379 + #[test] 380 + fn test_validate_fetched_schema_id_mismatch() { 381 + let doc = LexiconDoc { 382 + lexicon: 1, 383 + id: "com.example.other".to_string(), 384 + defs: Default::default(), 385 + }; 386 + let err = validate_fetched_schema(&doc, "com.example.thing").unwrap_err(); 387 + assert!(matches!(err, ResolveError::InvalidSchema(_))); 388 + } 389 + 390 + #[test] 391 + fn test_validate_fetched_schema_bad_version() { 392 + let doc = LexiconDoc { 393 + lexicon: 99, 394 + id: "com.example.thing".to_string(), 395 + defs: Default::default(), 396 + }; 397 + let err = validate_fetched_schema(&doc, "com.example.thing").unwrap_err(); 398 + assert!(matches!(err, ResolveError::InvalidSchema(_))); 399 + } 400 + }

History

2 rounds 0 comments
sign up or login to add to the discussion
1 commit
expand
feat(lexicon): schema reference resolution
expand 0 comments
pull request successfully merged
1 commit
expand
feat(lexicon): schema reference resolution
expand 0 comments