+400
Diff
round #1
+400
crates/tranquil-lexicon/src/resolve.rs
+400
crates/tranquil-lexicon/src/resolve.rs
···
1
+
use crate::schema::LexiconDoc;
2
+
use hickory_resolver::TokioAsyncResolver;
3
+
use hickory_resolver::config::{ResolverConfig, ResolverOpts};
4
+
use reqwest::Client;
5
+
use std::sync::OnceLock;
6
+
use std::time::Duration;
7
+
8
+
static RESOLVER_CLIENT: OnceLock<Client> = OnceLock::new();
9
+
10
+
const MAX_RESPONSE_BYTES: usize = 512 * 1024;
11
+
12
+
fn client() -> &'static Client {
13
+
RESOLVER_CLIENT.get_or_init(|| {
14
+
Client::builder()
15
+
.timeout(Duration::from_secs(10))
16
+
.connect_timeout(Duration::from_secs(5))
17
+
.pool_max_idle_per_host(4)
18
+
.pool_idle_timeout(Duration::from_secs(60))
19
+
.redirect(reqwest::redirect::Policy::limited(3))
20
+
.build()
21
+
.expect("failed to build lexicon resolver HTTP client")
22
+
})
23
+
}
24
+
25
+
const DEFAULT_PLC_DIRECTORY: &str = "https://plc.directory";
26
+
27
+
async fn read_body_limited(resp: reqwest::Response, max_bytes: usize) -> Result<Vec<u8>, String> {
28
+
if let Some(len) = resp.content_length()
29
+
&& len > max_bytes as u64
30
+
{
31
+
return Err(format!(
32
+
"response too large: {} bytes (max {})",
33
+
len, max_bytes
34
+
));
35
+
}
36
+
37
+
let bytes = resp
38
+
.bytes()
39
+
.await
40
+
.map_err(|e| format!("failed to read response body: {}", e))?;
41
+
42
+
if bytes.len() > max_bytes {
43
+
return Err(format!(
44
+
"response too large: {} bytes (max {})",
45
+
bytes.len(),
46
+
max_bytes
47
+
));
48
+
}
49
+
50
+
Ok(bytes.to_vec())
51
+
}
52
+
53
+
#[derive(Debug, thiserror::Error)]
54
+
pub enum ResolveError {
55
+
#[error("failed to derive authority from NSID: {0}")]
56
+
InvalidNsid(String),
57
+
#[error("DNS lookup failed for {domain}: {reason}")]
58
+
DnsLookup { domain: String, reason: String },
59
+
#[error("no DID found in DNS TXT records for {domain}")]
60
+
NoDid { domain: String },
61
+
#[error("DID document fetch failed for {did}: {reason}")]
62
+
DidResolution { did: String, reason: String },
63
+
#[error("no PDS endpoint found in DID document for {did}")]
64
+
NoPdsEndpoint { did: String },
65
+
#[error("schema fetch failed from {url}: {reason}")]
66
+
SchemaFetch { url: String, reason: String },
67
+
#[error("schema deserialization failed: {0}")]
68
+
InvalidSchema(String),
69
+
#[error("schema resolution recently failed for {nsid}, cached for {ttl_secs}s")]
70
+
NegativelyCached { nsid: String, ttl_secs: u64 },
71
+
#[error("network resolution disabled")]
72
+
NetworkDisabled,
73
+
}
74
+
75
+
pub fn nsid_to_authority(nsid: &str) -> Result<String, ResolveError> {
76
+
let mut segments: Vec<&str> = nsid.split('.').collect();
77
+
if segments.len() < 3 {
78
+
return Err(ResolveError::InvalidNsid(nsid.to_string()));
79
+
}
80
+
segments.pop();
81
+
segments.reverse();
82
+
Ok(segments.join("."))
83
+
}
84
+
85
+
pub async fn resolve_did_from_dns(authority: &str) -> Result<String, ResolveError> {
86
+
let resolver = TokioAsyncResolver::tokio(ResolverConfig::default(), ResolverOpts::default());
87
+
88
+
let extract_did = |lookup: hickory_resolver::lookup::TxtLookup| -> Option<String> {
89
+
lookup
90
+
.iter()
91
+
.flat_map(|record| record.txt_data())
92
+
.find_map(|txt| {
93
+
let txt_str = String::from_utf8_lossy(txt);
94
+
txt_str.strip_prefix("did=").and_then(|did| {
95
+
let did = did.trim();
96
+
did.starts_with("did:").then(|| did.to_string())
97
+
})
98
+
})
99
+
};
100
+
101
+
let lexicon_query = format!("_lexicon.{}", authority);
102
+
if let Ok(lookup) = resolver.txt_lookup(&lexicon_query).await
103
+
&& let Some(did) = extract_did(lookup)
104
+
{
105
+
return Ok(did);
106
+
}
107
+
108
+
let atproto_query = format!("_atproto.{}", authority);
109
+
let lookup =
110
+
resolver
111
+
.txt_lookup(&atproto_query)
112
+
.await
113
+
.map_err(|e| ResolveError::DnsLookup {
114
+
domain: authority.to_string(),
115
+
reason: e.to_string(),
116
+
})?;
117
+
118
+
extract_did(lookup).ok_or(ResolveError::NoDid {
119
+
domain: authority.to_string(),
120
+
})
121
+
}
122
+
123
+
pub async fn resolve_pds_endpoint(
124
+
did: &str,
125
+
plc_directory_url: Option<&str>,
126
+
) -> Result<String, ResolveError> {
127
+
let plc_base = plc_directory_url.unwrap_or(DEFAULT_PLC_DIRECTORY);
128
+
129
+
let url = match did
130
+
.split_once(':')
131
+
.and_then(|(_, rest)| rest.split_once(':'))
132
+
{
133
+
Some(("plc", _)) => format!("{}/{}", plc_base.trim_end_matches('/'), did),
134
+
Some(("web", domain)) => format!("https://{}/.well-known/did.json", domain),
135
+
_ => {
136
+
return Err(ResolveError::DidResolution {
137
+
did: did.to_string(),
138
+
reason: "unsupported DID method".to_string(),
139
+
});
140
+
}
141
+
};
142
+
143
+
let resp = client()
144
+
.get(&url)
145
+
.send()
146
+
.await
147
+
.map_err(|e| ResolveError::DidResolution {
148
+
did: did.to_string(),
149
+
reason: e.to_string(),
150
+
})?;
151
+
152
+
let body = read_body_limited(resp, MAX_RESPONSE_BYTES)
153
+
.await
154
+
.map_err(|reason| ResolveError::DidResolution {
155
+
did: did.to_string(),
156
+
reason,
157
+
})?;
158
+
159
+
let doc: serde_json::Value =
160
+
serde_json::from_slice(&body).map_err(|e| ResolveError::DidResolution {
161
+
did: did.to_string(),
162
+
reason: e.to_string(),
163
+
})?;
164
+
165
+
extract_pds_endpoint(&doc).ok_or(ResolveError::NoPdsEndpoint {
166
+
did: did.to_string(),
167
+
})
168
+
}
169
+
170
+
fn extract_pds_endpoint(doc: &serde_json::Value) -> Option<String> {
171
+
doc.get("service")
172
+
.and_then(|s| s.as_array())
173
+
.and_then(|services| {
174
+
services.iter().find_map(|svc| {
175
+
let is_pds = svc
176
+
.get("type")
177
+
.and_then(|t| t.as_str())
178
+
.is_some_and(|t| t == "AtprotoPersonalDataServer");
179
+
is_pds
180
+
.then(|| svc.get("serviceEndpoint").and_then(|ep| ep.as_str()))?
181
+
.map(|s| s.to_string())
182
+
})
183
+
})
184
+
}
185
+
186
+
pub async fn fetch_schema_from_pds(
187
+
pds_endpoint: &str,
188
+
did: &str,
189
+
nsid: &str,
190
+
) -> Result<LexiconDoc, ResolveError> {
191
+
let url = format!(
192
+
"{}/xrpc/com.atproto.repo.getRecord?repo={}&collection=com.atproto.lexicon.schema&rkey={}",
193
+
pds_endpoint.trim_end_matches('/'),
194
+
urlencoding::encode(did),
195
+
urlencoding::encode(nsid)
196
+
);
197
+
198
+
let resp = client()
199
+
.get(&url)
200
+
.send()
201
+
.await
202
+
.map_err(|e| ResolveError::SchemaFetch {
203
+
url: url.clone(),
204
+
reason: e.to_string(),
205
+
})?;
206
+
207
+
let status = resp.status();
208
+
if !status.is_success() {
209
+
return Err(ResolveError::SchemaFetch {
210
+
url,
211
+
reason: format!("HTTP {}", status),
212
+
});
213
+
}
214
+
215
+
let body = read_body_limited(resp, MAX_RESPONSE_BYTES)
216
+
.await
217
+
.map_err(|reason| ResolveError::SchemaFetch {
218
+
url: url.clone(),
219
+
reason,
220
+
})?;
221
+
222
+
let resp_value: serde_json::Value =
223
+
serde_json::from_slice(&body).map_err(|e| ResolveError::SchemaFetch {
224
+
url: url.clone(),
225
+
reason: e.to_string(),
226
+
})?;
227
+
228
+
let value = resp_value
229
+
.get("value")
230
+
.ok_or_else(|| ResolveError::SchemaFetch {
231
+
url: url.clone(),
232
+
reason: "response missing 'value' field".to_string(),
233
+
})?;
234
+
235
+
serde_json::from_value::<LexiconDoc>(value.clone())
236
+
.map_err(|e| ResolveError::InvalidSchema(e.to_string()))
237
+
}
238
+
239
+
fn validate_fetched_schema(doc: &LexiconDoc, nsid: &str) -> Result<(), ResolveError> {
240
+
if doc.id != nsid {
241
+
return Err(ResolveError::InvalidSchema(format!(
242
+
"schema id '{}' does not match requested NSID '{}'",
243
+
doc.id, nsid
244
+
)));
245
+
}
246
+
if doc.lexicon != 1 {
247
+
return Err(ResolveError::InvalidSchema(format!(
248
+
"unsupported lexicon version: {}",
249
+
doc.lexicon
250
+
)));
251
+
}
252
+
Ok(())
253
+
}
254
+
255
+
pub async fn resolve_lexicon(nsid: &str) -> Result<LexiconDoc, ResolveError> {
256
+
resolve_lexicon_with_config(nsid, None).await
257
+
}
258
+
259
+
pub async fn resolve_lexicon_with_config(
260
+
nsid: &str,
261
+
plc_directory_url: Option<&str>,
262
+
) -> Result<LexiconDoc, ResolveError> {
263
+
let authority = nsid_to_authority(nsid)?;
264
+
tracing::debug!(nsid = nsid, authority = %authority, "resolving lexicon schema");
265
+
266
+
let did = resolve_did_from_dns(&authority).await?;
267
+
tracing::debug!(nsid = nsid, did = %did, "resolved authority DID");
268
+
269
+
let pds_endpoint = resolve_pds_endpoint(&did, plc_directory_url).await?;
270
+
tracing::debug!(nsid = nsid, pds = %pds_endpoint, "resolved PDS endpoint");
271
+
272
+
let doc = fetch_schema_from_pds(&pds_endpoint, &did, nsid).await?;
273
+
validate_fetched_schema(&doc, nsid)?;
274
+
275
+
Ok(doc)
276
+
}
277
+
278
+
pub async fn resolve_lexicon_from_did(
279
+
nsid: &str,
280
+
did: &str,
281
+
plc_directory_url: Option<&str>,
282
+
) -> Result<LexiconDoc, ResolveError> {
283
+
let pds_endpoint = resolve_pds_endpoint(did, plc_directory_url).await?;
284
+
let doc = fetch_schema_from_pds(&pds_endpoint, did, nsid).await?;
285
+
validate_fetched_schema(&doc, nsid)?;
286
+
Ok(doc)
287
+
}
288
+
289
+
#[cfg(test)]
290
+
mod tests {
291
+
use super::*;
292
+
293
+
#[test]
294
+
fn test_nsid_to_authority() {
295
+
assert_eq!(
296
+
nsid_to_authority("app.bsky.feed.post").unwrap(),
297
+
"feed.bsky.app"
298
+
);
299
+
assert_eq!(
300
+
nsid_to_authority("com.atproto.repo.strongRef").unwrap(),
301
+
"repo.atproto.com"
302
+
);
303
+
assert_eq!(
304
+
nsid_to_authority("com.germnetwork.social.post").unwrap(),
305
+
"social.germnetwork.com"
306
+
);
307
+
assert!(nsid_to_authority("tooShort").is_err());
308
+
}
309
+
310
+
#[test]
311
+
fn test_nsid_to_authority_three_segments() {
312
+
assert_eq!(
313
+
nsid_to_authority("org.example.record").unwrap(),
314
+
"example.org"
315
+
);
316
+
}
317
+
318
+
#[test]
319
+
fn test_extract_pds_endpoint_valid() {
320
+
let doc = serde_json::json!({
321
+
"service": [{
322
+
"type": "AtprotoPersonalDataServer",
323
+
"serviceEndpoint": "https://pds.example.com"
324
+
}]
325
+
});
326
+
assert_eq!(
327
+
extract_pds_endpoint(&doc),
328
+
Some("https://pds.example.com".to_string())
329
+
);
330
+
}
331
+
332
+
#[test]
333
+
fn test_extract_pds_endpoint_multiple_services() {
334
+
let doc = serde_json::json!({
335
+
"service": [
336
+
{
337
+
"type": "AtprotoLabeler",
338
+
"serviceEndpoint": "https://labeler.example.com"
339
+
},
340
+
{
341
+
"type": "AtprotoPersonalDataServer",
342
+
"serviceEndpoint": "https://pds.example.com"
343
+
}
344
+
]
345
+
});
346
+
assert_eq!(
347
+
extract_pds_endpoint(&doc),
348
+
Some("https://pds.example.com".to_string())
349
+
);
350
+
}
351
+
352
+
#[test]
353
+
fn test_extract_pds_endpoint_missing() {
354
+
let doc = serde_json::json!({
355
+
"service": [{
356
+
"type": "AtprotoLabeler",
357
+
"serviceEndpoint": "https://labeler.example.com"
358
+
}]
359
+
});
360
+
assert_eq!(extract_pds_endpoint(&doc), None);
361
+
}
362
+
363
+
#[test]
364
+
fn test_extract_pds_endpoint_no_services() {
365
+
let doc = serde_json::json!({});
366
+
assert_eq!(extract_pds_endpoint(&doc), None);
367
+
}
368
+
369
+
#[test]
370
+
fn test_validate_fetched_schema_ok() {
371
+
let doc = LexiconDoc {
372
+
lexicon: 1,
373
+
id: "com.example.thing".to_string(),
374
+
defs: Default::default(),
375
+
};
376
+
assert!(validate_fetched_schema(&doc, "com.example.thing").is_ok());
377
+
}
378
+
379
+
#[test]
380
+
fn test_validate_fetched_schema_id_mismatch() {
381
+
let doc = LexiconDoc {
382
+
lexicon: 1,
383
+
id: "com.example.other".to_string(),
384
+
defs: Default::default(),
385
+
};
386
+
let err = validate_fetched_schema(&doc, "com.example.thing").unwrap_err();
387
+
assert!(matches!(err, ResolveError::InvalidSchema(_)));
388
+
}
389
+
390
+
#[test]
391
+
fn test_validate_fetched_schema_bad_version() {
392
+
let doc = LexiconDoc {
393
+
lexicon: 99,
394
+
id: "com.example.thing".to_string(),
395
+
defs: Default::default(),
396
+
};
397
+
let err = validate_fetched_schema(&doc, "com.example.thing").unwrap_err();
398
+
assert!(matches!(err, ResolveError::InvalidSchema(_)));
399
+
}
400
+
}
History
2 rounds
0 comments
oyster.cafe
submitted
#1
1 commit
expand
collapse
feat(lexicon): schema reference resolution
expand 0 comments
pull request successfully merged
oyster.cafe
submitted
#0
1 commit
expand
collapse
feat(lexicon): schema reference resolution