···100 UrlParseError(#[from] url::ParseError),
101 #[error(transparent)]
102 ReqwestError(#[from] reqwest::Error),
103+ #[error(transparent)]
104+ InvalidHeader(#[from] reqwest::header::InvalidHeaderValue),
105+ #[error(transparent)]
106+ IdentityError(#[from] IdentityError),
107+ #[error("upstream service could not be resolved")]
108+ ServiceNotFound,
109+ #[error("upstream service was found but no services matched")]
110+ ServiceNotMatched,
111}
+208-16
slingshot/src/identity.rs
···1718use crate::error::IdentityError;
19use atrium_api::{
20- did_doc::DidDocument,
21 types::string::{Did, Handle},
22};
23use atrium_common::resolver::Resolver;
···41pub enum IdentityKey {
42 Handle(Handle),
43 Did(Did),
044}
4546impl IdentityKey {
···48 let s = match self {
49 IdentityKey::Handle(h) => h.as_str(),
50 IdentityKey::Did(d) => d.as_str(),
051 };
52 std::mem::size_of::<Self>() + std::mem::size_of_val(s)
53 }
···59#[derive(Debug, Serialize, Deserialize)]
60enum IdentityData {
61 NotFound,
62- Did(Did),
63- Doc(PartialMiniDoc),
064}
6566impl IdentityVal {
···71 IdentityData::Did(d) => std::mem::size_of_val(d.as_str()),
72 IdentityData::Doc(d) => {
73 std::mem::size_of_val(d.unverified_handle.as_str())
74- + std::mem::size_of_val(d.pds.as_str())
75- + std::mem::size_of_val(d.signing_key.as_str())
000000000076 }
77 };
78 wrapping + inner
···168 }
169}
17000000000000000000000000000000000000000000000000000000000171/// multi-producer *single-consumer* queue structures (wrap in arc-mutex plz)
172///
/// the hashset allows testing for presence of items in the queue.
···296 let now = UtcDateTime::now();
297 let IdentityVal(last_fetch, data) = entry.value();
298 match data {
299- IdentityData::Doc(_) => {
300- log::error!("identity value mixup: got a doc from a handle key (should be a did)");
301- Err(IdentityError::IdentityValTypeMixup(handle.to_string()))
302- }
303 IdentityData::NotFound => {
304 if (now - *last_fetch) >= MIN_NOT_FOUND_TTL {
305 metrics::counter!("identity_handle_refresh_queued", "reason" => "ttl", "found" => "false").increment(1);
···313 self.queue_refresh(key).await;
314 }
315 Ok(Some(did.clone()))
0000316 }
317 }
318 }
···362 let now = UtcDateTime::now();
363 let IdentityVal(last_fetch, data) = entry.value();
364 match data {
365- IdentityData::Did(_) => {
366- log::error!("identity value mixup: got a did from a did key (should be a doc)");
367- Err(IdentityError::IdentityValTypeMixup(did.to_string()))
368- }
369 IdentityData::NotFound => {
370 if (now - *last_fetch) >= MIN_NOT_FOUND_TTL {
371 metrics::counter!("identity_did_refresh_queued", "reason" => "ttl", "found" => "false").increment(1);
···373 }
374 Ok(None)
375 }
376- IdentityData::Doc(mini_did) => {
377 if (now - *last_fetch) >= MIN_TTL {
378 metrics::counter!("identity_did_refresh_queued", "reason" => "ttl", "found" => "true").increment(1);
379 self.queue_refresh(key).await;
380 }
381- Ok(Some(mini_did.clone()))
000000000000000000000000000000000000000000000000000000000000000000000000000382 }
383 }
384 }
···519 log::warn!(
520 "refreshed did doc failed: wrong did doc id. dropping refresh."
521 );
0522 continue;
523 }
524 let mini_doc = match did_doc.try_into() {
···526 Err(e) => {
527 metrics::counter!("identity_did_refresh", "success" => "false", "reason" => "bad doc").increment(1);
528 log::warn!(
529- "converting mini doc failed: {e:?}. dropping refresh."
530 );
0531 continue;
532 }
533 };
···554 }
555556 self.complete_refresh(&task_key).await?; // failures are bugs, so break loop
00000000000000000000000000000000000000000000000000557 }
558 }
559 }
···1718use crate::error::IdentityError;
19use atrium_api::{
20+ did_doc::{DidDocument, Service as DidDocServic},
21 types::string::{Did, Handle},
22};
23use atrium_common::resolver::Resolver;
/// Cache key for identity lookups -- one variant per kind of lookup
pub enum IdentityKey {
    /// resolve a handle (cached value is the did)
    Handle(Handle),
    /// resolve a did (cached value is a partial mini doc)
    Did(Did),
    /// resolve a did as a *service* (cached value is a mini service doc)
    ServiceDid(Did),
}
4647impl IdentityKey {
···49 let s = match self {
50 IdentityKey::Handle(h) => h.as_str(),
51 IdentityKey::Did(d) => d.as_str(),
52+ IdentityKey::ServiceDid(d) => d.as_str(),
53 };
54 std::mem::size_of::<Self>() + std::mem::size_of_val(s)
55 }
#[derive(Debug, Serialize, Deserialize)]
/// Cached resolution result; the variant must correspond to the
/// `IdentityKey` variant it was stored under (mixups are logged as errors)
enum IdentityData {
    /// negative cache entry: resolution returned not-found
    NotFound,
    /// resolved did (stored under a handle key)
    Did(Did), // from handle
    /// resolved partial doc (stored under a did key)
    Doc(PartialMiniDoc), // from did
    /// resolved service doc (stored under a service-did key)
    ServiceDoc(MiniServiceDoc), // from service did
}
6869impl IdentityVal {
···74 IdentityData::Did(d) => std::mem::size_of_val(d.as_str()),
75 IdentityData::Doc(d) => {
76 std::mem::size_of_val(d.unverified_handle.as_str())
77+ + std::mem::size_of_val(&d.pds)
78+ + std::mem::size_of_val(&d.signing_key)
79+ }
80+ IdentityData::ServiceDoc(d) => {
81+ let mut s = std::mem::size_of::<MiniServiceDoc>();
82+ s += std::mem::size_of_val(&d.services);
83+ for sv in &d.services {
84+ s += std::mem::size_of_val(&sv.full_id);
85+ s += std::mem::size_of_val(&sv.r#type);
86+ s += std::mem::size_of_val(&sv.endpoint);
87+ }
88+ s
89 }
90 };
91 wrapping + inner
···181 }
182}
183184+/// Simplified info from service DID docs
185+#[derive(Debug, Clone, Serialize, Deserialize)]
186+pub struct MiniServiceDoc {
187+ services: Vec<MiniService>,
188+}
189+190+impl MiniServiceDoc {
191+ pub fn get(&self, id_fragment: &str, service_type: Option<&str>) -> Option<&MiniService> {
192+ self.services.iter().find(|ms| {
193+ ms.full_id.ends_with(id_fragment)
194+ && service_type.map(|t| t == ms.r#type).unwrap_or(true)
195+ })
196+ }
197+}
/// The corresponding service info
///
/// One entry from a DID document's `service` list, reduced to the fields
/// needed for proxying.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MiniService {
    /// The full id
    ///
    /// for informational purposes only -- services are deduplicated by id fragment
    full_id: String,
    /// the service `type` string from the did doc (used for type-filtered lookups)
    r#type: String,
    /// HTTP endpoint for the actual service
    pub endpoint: String,
}
210+211+impl TryFrom<DidDocument> for MiniServiceDoc {
212+ type Error = String;
213+ fn try_from(did_doc: DidDocument) -> Result<Self, Self::Error> {
214+ let mut services = Vec::new();
215+ let mut seen = HashSet::new();
216+217+ for DidDocServic {
218+ id,
219+ r#type,
220+ service_endpoint,
221+ } in did_doc.service.unwrap_or(vec![])
222+ {
223+ let Some((_, id_fragment)) = id.rsplit_once('#') else {
224+ continue;
225+ };
226+ if !seen.insert((id_fragment.to_string(), r#type.clone())) {
227+ continue;
228+ }
229+ services.push(MiniService {
230+ full_id: id,
231+ r#type,
232+ endpoint: service_endpoint,
233+ });
234+ }
235+236+ Ok(Self { services })
237+ }
238+}
239+240/// multi-producer *single-consumer* queue structures (wrap in arc-mutex plz)
241///
/// the hashset allows testing for presence of items in the queue.
···365 let now = UtcDateTime::now();
366 let IdentityVal(last_fetch, data) = entry.value();
367 match data {
0000368 IdentityData::NotFound => {
369 if (now - *last_fetch) >= MIN_NOT_FOUND_TTL {
370 metrics::counter!("identity_handle_refresh_queued", "reason" => "ttl", "found" => "false").increment(1);
···378 self.queue_refresh(key).await;
379 }
380 Ok(Some(did.clone()))
381+ }
382+ _ => {
383+ log::error!("identity value mixup: got a doc from a handle key (should be a did)");
384+ Err(IdentityError::IdentityValTypeMixup(handle.to_string()))
385 }
386 }
387 }
···431 let now = UtcDateTime::now();
432 let IdentityVal(last_fetch, data) = entry.value();
433 match data {
0000434 IdentityData::NotFound => {
435 if (now - *last_fetch) >= MIN_NOT_FOUND_TTL {
436 metrics::counter!("identity_did_refresh_queued", "reason" => "ttl", "found" => "false").increment(1);
···438 }
439 Ok(None)
440 }
441+ IdentityData::Doc(mini_doc) => {
442 if (now - *last_fetch) >= MIN_TTL {
443 metrics::counter!("identity_did_refresh_queued", "reason" => "ttl", "found" => "true").increment(1);
444 self.queue_refresh(key).await;
445 }
446+ Ok(Some(mini_doc.clone()))
447+ }
448+ _ => {
449+ log::error!("identity value mixup: got a doc from a handle key (should be a did)");
450+ Err(IdentityError::IdentityValTypeMixup(did.to_string()))
451+ }
452+ }
453+ }
    /// Fetch (and cache) a service mini doc from a did
    ///
    /// Looks the did up in the shared cache, fetching (and timing) a fresh
    /// resolution on miss. Returns:
    /// - `Ok(Some(doc))` when a service doc is cached/resolved
    /// - `Ok(None)` when resolution returned not-found (negative-cached)
    /// - `Err(_)` on resolution/conversion failure, or if the cache holds a
    ///   value of the wrong variant for this key (a bug)
    ///
    /// Stale entries (older than MIN_TTL / MIN_NOT_FOUND_TTL) are still
    /// returned, but a background refresh is queued.
    pub async fn did_to_mini_service_doc(
        &self,
        did: &Did,
    ) -> Result<Option<MiniServiceDoc>, IdentityError> {
        let key = IdentityKey::ServiceDid(did.clone());
        metrics::counter!("slingshot_get_service_did_doc").increment(1);
        let entry = self
            .cache
            .get_or_fetch(&key, {
                // clones move into the fetch closure, which only runs on cache miss
                let did = did.clone();
                let resolver = self.did_resolver.clone();
                || async move {
                    let t0 = Instant::now();
                    let (res, success) = match resolver.resolve(&did).await {
                        Ok(did_doc) if did_doc.id != did.to_string() => (
                            // TODO: fix in atrium: should verify id is did
                            Err(IdentityError::BadDidDoc(
                                "did doc's id did not match did".to_string(),
                            )),
                            "false",
                        ),
                        Ok(did_doc) => match did_doc.try_into() {
                            Ok(mini_service_doc) => (
                                Ok(IdentityVal(
                                    UtcDateTime::now(),
                                    IdentityData::ServiceDoc(mini_service_doc),
                                )),
                                "true",
                            ),
                            Err(e) => (Err(IdentityError::BadDidDoc(e)), "false"),
                        },
                        // not-found is cached (negative cache) but counted as
                        // an unsuccessful fetch in the metrics
                        Err(atrium_identity::Error::NotFound) => (
                            Ok(IdentityVal(UtcDateTime::now(), IdentityData::NotFound)),
                            "false",
                        ),
                        Err(other) => (Err(IdentityError::ResolutionFailed(other)), "false"),
                    };
                    metrics::histogram!("slingshot_fetch_service_did_doc", "success" => success)
                        .record(t0.elapsed());
                    res
                }
            })
            .await?;

        let now = UtcDateTime::now();
        let IdentityVal(last_fetch, data) = entry.value();
        match data {
            IdentityData::NotFound => {
                // stale negative entry: serve it, but queue a refresh
                if (now - *last_fetch) >= MIN_NOT_FOUND_TTL {
                    metrics::counter!("identity_service_did_refresh_queued", "reason" => "ttl", "found" => "false").increment(1);
                    self.queue_refresh(key).await;
                }
                Ok(None)
            }
            IdentityData::ServiceDoc(mini_service_doc) => {
                // stale positive entry: serve it, but queue a refresh
                if (now - *last_fetch) >= MIN_TTL {
                    metrics::counter!("identity_service_did_refresh_queued", "reason" => "ttl", "found" => "true").increment(1);
                    self.queue_refresh(key).await;
                }
                Ok(Some(mini_service_doc.clone()))
            }
            _ => {
                // a ServiceDid key should only ever map to ServiceDoc/NotFound;
                // anything else indicates a cache keying bug
                log::error!(
                    "identity value mixup: got a doc from a different key type (should be a service did)"
                );
                Err(IdentityError::IdentityValTypeMixup(did.to_string()))
            }
        }
    }
···659 log::warn!(
660 "refreshed did doc failed: wrong did doc id. dropping refresh."
661 );
662+ self.complete_refresh(&task_key).await?;
663 continue;
664 }
665 let mini_doc = match did_doc.try_into() {
···667 Err(e) => {
668 metrics::counter!("identity_did_refresh", "success" => "false", "reason" => "bad doc").increment(1);
669 log::warn!(
670+ "converting mini doc for {did:?} failed: {e:?}. dropping refresh."
671 );
672+ self.complete_refresh(&task_key).await?;
673 continue;
674 }
675 };
···696 }
697698 self.complete_refresh(&task_key).await?; // failures are bugs, so break loop
699+ }
700+ IdentityKey::ServiceDid(ref did) => {
701+ log::trace!("refreshing service did doc: {did:?}");
702+703+ match self.did_resolver.resolve(did).await {
704+ Ok(did_doc) => {
705+ // TODO: fix in atrium: should verify id is did
706+ if did_doc.id != did.to_string() {
707+ metrics::counter!("identity_service_did_refresh", "success" => "false", "reason" => "wrong did").increment(1);
708+ log::warn!(
709+ "refreshed did doc failed: wrong did doc id. dropping refresh."
710+ );
711+ self.complete_refresh(&task_key).await?;
712+ continue;
713+ }
714+ let mini_service_doc = match did_doc.try_into() {
715+ Ok(md) => md,
716+ Err(e) => {
717+ metrics::counter!("identity_service_did_refresh", "success" => "false", "reason" => "bad doc").increment(1);
718+ log::warn!(
719+ "converting mini service doc failed: {e:?}. dropping refresh."
720+ );
721+ self.complete_refresh(&task_key).await?;
722+ continue;
723+ }
724+ };
725+ metrics::counter!("identity_service_did_refresh", "success" => "true")
726+ .increment(1);
727+ self.cache.insert(
728+ task_key.clone(),
729+ IdentityVal(
730+ UtcDateTime::now(),
731+ IdentityData::ServiceDoc(mini_service_doc),
732+ ),
733+ );
734+ }
735+ Err(atrium_identity::Error::NotFound) => {
736+ metrics::counter!("identity_service_did_refresh", "success" => "false", "reason" => "not found").increment(1);
737+ self.cache.insert(
738+ task_key.clone(),
739+ IdentityVal(UtcDateTime::now(), IdentityData::NotFound),
740+ );
741+ }
742+ Err(err) => {
743+ metrics::counter!("identity_service_did_refresh", "success" => "false", "reason" => "other").increment(1);
744+ log::warn!(
745+ "failed to refresh did doc: {err:?}. leaving stale (should we eventually do something?)"
746+ );
747+ }
748+ }
749 }
750 }
751 }
+61-25
slingshot/src/main.rs
···1-// use foyer::HybridCache;
2-// use foyer::{Engine, DirectFsDeviceOptions, HybridCacheBuilder};
3use metrics_exporter_prometheus::PrometheusBuilder;
4use slingshot::{
5 Identity, Proxy, Repo, consume, error::MainTaskError, firehose_cache, healthcheck, serve,
···910use clap::Parser;
11use tokio_util::sync::CancellationToken;
01213/// Slingshot record edge cache
14#[derive(Parser, Debug, Clone)]
···48 #[arg(long, env = "SLINGSHOT_IDENTITY_CACHE_DISK_DB")]
49 #[clap(default_value_t = 1)]
50 identity_cache_disk_gb: usize,
0000051 /// the domain pointing to this server
52 ///
53 /// if present:
54 /// - a did:web document will be served at /.well-known/did.json
55- /// - an HTTPS certs will be automatically configured with Acme/letsencrypt
0000056 /// - TODO: a rate-limiter will be installed
57 #[arg(
58 long,
59 conflicts_with("bind"),
60- requires("acme_cache_path"),
61- env = "SLINGSHOT_ACME_DOMAIN"
00000000000000000000062 )]
63- acme_domain: Option<String>,
64 /// email address for letsencrypt contact
65 ///
66 /// recommended in production, i guess?
67- #[arg(long, requires("acme_domain"), env = "SLINGSHOT_ACME_CONTACT")]
68 acme_contact: Option<String>,
69- /// a location to cache acme https certs
70 ///
71- /// required when (and only used when) --acme-domain is specified.
72- ///
73- /// recommended in production, but mind the file permissions.
74- #[arg(long, requires("acme_domain"), env = "SLINGSHOT_ACME_CACHE_PATH")]
75- acme_cache_path: Option<PathBuf>,
76- /// listen for ipv6 when using acme
77- ///
78- /// you must also configure the relevant DNS records for this to work
79- #[arg(long, action, requires("acme_domain"), env = "SLINGSHOT_ACME_IPV6")]
80- acme_ipv6: bool,
    /// a web address to send healthcheck pings to every ~51s or so
82 #[arg(long, env = "SLINGSHOT_HEALTHCHECK")]
83 healthcheck: Option<String>,
···101102 let args = Args::parse();
103000000000104 if args.collect_metrics {
105 log::trace!("installing metrics server...");
106 if let Err(e) = install_metrics_server(args.bind_metrics) {
···152 log::info!("identity service ready.");
153154 let repo = Repo::new(identity.clone());
155- let proxy = Proxy::new(repo.clone());
156157 let identity_for_server = identity.clone();
158 let server_shutdown = shutdown.clone();
···164 identity_for_server,
165 repo,
166 proxy,
167- args.acme_domain,
0000168 args.acme_contact,
169- args.acme_cache_path,
170- args.acme_ipv6,
171 server_shutdown,
172 bind,
173 )
···236) -> Result<(), metrics_exporter_prometheus::BuildError> {
237 log::info!("installing metrics server...");
238 PrometheusBuilder::new()
239- .set_quantiles(&[0.5, 0.9, 0.99, 1.0])?
240- .set_bucket_duration(std::time::Duration::from_secs(300))?
241- .set_bucket_count(std::num::NonZero::new(12).unwrap()) // count * duration = 60 mins. stuff doesn't happen that fast here.
242 .set_enable_unit_suffix(false) // this seemed buggy for constellation (sometimes wouldn't engage)
243 .with_http_listener(bind_metrics)
244 .install()?;
···001use metrics_exporter_prometheus::PrometheusBuilder;
2use slingshot::{
3 Identity, Proxy, Repo, consume, error::MainTaskError, firehose_cache, healthcheck, serve,
···78use clap::Parser;
9use tokio_util::sync::CancellationToken;
10+use url::Url;
1112/// Slingshot record edge cache
13#[derive(Parser, Debug, Clone)]
···47 #[arg(long, env = "SLINGSHOT_IDENTITY_CACHE_DISK_DB")]
48 #[clap(default_value_t = 1)]
49 identity_cache_disk_gb: usize,
50+ /// the address of this server
51+ ///
    /// used if `--tls-domain` is not set, defaulting to `--bind`
53+ #[arg(long, conflicts_with("tls_domain"), env = "SLINGSHOT_PUBLIC_HOST")]
54+ base_url: Option<Url>,
55 /// the domain pointing to this server
56 ///
57 /// if present:
58 /// - a did:web document will be served at /.well-known/did.json
59+ /// - the server will bind on port 443
60+ /// - if `--acme-contact` is present, the server will bind port 80 for http
61+ /// challenges and attempt to auto-provision certs for `--tls-domain`
    /// - if `--acme-contact` is absent, the server will load certs from the
63+ /// `--tls-certs` folder, and try to reload them twice daily, guarded by
64+ /// a lock file called `.cert-lock` in the `--tls-certs` folder.
65 /// - TODO: a rate-limiter will be installed
66 #[arg(
67 long,
68 conflicts_with("bind"),
69+ requires("tls_certs"),
70+ env = "SLINGSHOT_TLS_DOMAIN"
71+ )]
72+ tls_domain: Option<String>,
73+ /// a location to find/cache acme or other tls certs
74+ ///
75+ /// recommended in production, mind the file permissions.
76+ #[arg(long, env = "SLINGSHOT_TLS_CERTS_PATH")]
77+ tls_certs: Option<PathBuf>,
78+ /// listen for ipv6 when using acme or other tls
79+ ///
80+ /// you must also configure the relevant DNS records for this to work
81+ #[arg(long, action, requires("tls_domain"), env = "SLINGSHOT_TLS_IPV6")]
82+ tls_ipv6: bool,
83+ /// redirect acme http-01 challenges to this url
84+ ///
85+ /// useful if you're setting up a second instance that synchronizes its
86+ /// certs from a main instance doing acme.
87+ #[arg(
88+ long,
89+ conflicts_with("acme_contact"),
90+ requires("tls_domain"),
91+ env = "SLINGSHOT_ACME_CHALLENGE_REDIRECT"
92 )]
93+ acme_challenge_redirect: Option<String>,
94 /// email address for letsencrypt contact
95 ///
96 /// recommended in production, i guess?
97+ #[arg(long, requires("tls_domain"), env = "SLINGSHOT_ACME_CONTACT")]
98 acme_contact: Option<String>,
99+ /// use the staging environment for letsencrypt
100 ///
101+ /// recommended to initially test out new deployments with this to avoid
102+ /// letsencrypt rate limit problems.
103+ #[arg(long, action, requires("acme_contact"), env = "SLINGSHOT_ACME_STAGING")]
104+ acme_staging: bool,
    /// a web address to send healthcheck pings to every ~51s or so
106 #[arg(long, env = "SLINGSHOT_HEALTHCHECK")]
107 healthcheck: Option<String>,
···125126 let args = Args::parse();
127128+ let base_url: Url = args
129+ .base_url
130+ .or_else(|| {
131+ args.tls_domain
132+ .as_ref()
133+ .map(|d| Url::parse(&format!("https://{d}")).unwrap())
134+ })
135+ .unwrap_or_else(|| Url::parse(&format!("http://{}", args.bind)).unwrap());
136+137 if args.collect_metrics {
138 log::trace!("installing metrics server...");
139 if let Err(e) = install_metrics_server(args.bind_metrics) {
···185 log::info!("identity service ready.");
186187 let repo = Repo::new(identity.clone());
188+ let proxy = Proxy::new(identity.clone());
189190 let identity_for_server = identity.clone();
191 let server_shutdown = shutdown.clone();
···197 identity_for_server,
198 repo,
199 proxy,
200+ base_url,
201+ args.tls_domain,
202+ args.tls_certs,
203+ args.tls_ipv6,
204+ args.acme_challenge_redirect,
205 args.acme_contact,
206+ args.acme_staging,
0207 server_shutdown,
208 bind,
209 )
···272) -> Result<(), metrics_exporter_prometheus::BuildError> {
273 log::info!("installing metrics server...");
274 PrometheusBuilder::new()
275+ .set_buckets(&[0.001, 0.006, 0.036, 0.216, 1.296, 7.776, 45.656])?
276+ .set_bucket_duration(std::time::Duration::from_secs(15))?
277+ .set_bucket_count(std::num::NonZero::new(4).unwrap()) // count * duration = bucket lifetime
278 .set_enable_unit_suffix(false) // this seemed buggy for constellation (sometimes wouldn't engage)
279 .with_http_listener(bind_metrics)
280 .install()?;
+251-155
slingshot/src/proxy.rs
···1-use serde::Deserialize;
2-use url::Url;
3-use std::{collections::HashMap, time::Duration};
4-use crate::{Repo, server::HydrationSource, error::ProxyError};
5use reqwest::Client;
6use serde_json::{Map, Value};
0078pub enum ParamValue {
9 String(Vec<String>),
···13pub struct Params(HashMap<String, ParamValue>);
1415impl TryFrom<Map<String, Value>> for Params {
16- type Error = (); // TODO
17 fn try_from(val: Map<String, Value>) -> Result<Self, Self::Error> {
18 let mut out = HashMap::new();
19 for (k, v) in val {
···7071#[derive(Clone)]
72pub struct Proxy {
73- repo: Repo,
74 client: Client,
75}
7677impl Proxy {
78- pub fn new(repo: Repo) -> Self {
79 let client = Client::builder()
80 .user_agent(format!(
81 "microcosm slingshot v{} (contact: @bad-example.com)",
···85 .timeout(Duration::from_secs(6))
86 .build()
87 .unwrap();
88- Self { repo, client }
89 }
9091 pub async fn proxy(
92 &self,
93- xrpc: String,
94- service: String,
00095 params: Option<Map<String, Value>>,
96 ) -> Result<Value, ProxyError> {
97-98- // hackin it to start
99-100- // 1. assume did-web (TODO) and get the did doc
101- #[derive(Debug, Deserialize)]
102- struct ServiceDoc {
103- id: String,
104- service: Vec<ServiceItem>,
105- }
106- #[derive(Debug, Deserialize)]
107- struct ServiceItem {
108- id: String,
109- #[expect(unused)]
110- r#type: String,
111- #[serde(rename = "serviceEndpoint")]
112- service_endpoint: Url,
113- }
114- let dw = service.strip_prefix("did:web:").expect("a did web");
115- let (dw, service_id) = dw.split_once("#").expect("whatever");
116- let mut dw_url = Url::parse(&format!("https://{dw}"))?;
117- dw_url.set_path("/.well-known/did.json");
118- let doc: ServiceDoc = self.client
119- .get(dw_url)
120- .send()
121 .await?
122- .error_for_status()?
123- .json()
124- .await?;
00125126- assert_eq!(doc.id, format!("did:web:{}", dw));
127-128- let mut upstream = None;
129- for ServiceItem { id, service_endpoint, .. } in doc.service {
130- let Some((_, id)) = id.split_once("#") else { continue; };
131- if id != service_id { continue; };
132- upstream = Some(service_endpoint);
133- break;
134- }
135-136- // 2. proxy the request forward
137- let mut upstream = upstream.expect("to find it");
138- upstream.set_path(&format!("/xrpc/{xrpc}")); // TODO: validate nsid
139140 if let Some(params) = params {
141 let mut query = upstream.query_pairs_mut();
···161 }
162 }
163164- // TODO: other headers to proxy
165- Ok(self.client
00000000000166 .get(upstream)
0167 .send()
168- .await?
169- .error_for_status()?
170- .json()
171- .await?)
00000000172 }
173}
174···188 while let Some((i, c)) = chars.next() {
189 match c {
190 '[' if in_bracket => return Err(format!("nested opening bracket not allowed, at {i}")),
191- '[' if key_acc.is_empty() => return Err(format!("missing key before opening bracket, at {i}")),
00192 '[' => in_bracket = true,
193 ']' if in_bracket => {
194 in_bracket = false;
195 let key = std::mem::take(&mut key_acc);
196 let r#type = std::mem::take(&mut type_acc);
197- let t = if r#type.is_empty() { None } else { Some(r#type) };
0000198 out.push(PathPart::Vector(key, t));
199 // peek ahead because we need a dot after array if there's more and i don't want to add more loop state
200 let Some((i, c)) = chars.next() else {
201 break;
202 };
203 if c != '.' {
204- return Err(format!("expected dot after close bracket, found {c:?} at {i}"));
00205 }
206 }
207 ']' => return Err(format!("unexpected close bracket at {i}")),
208 '.' if in_bracket => type_acc.push(c),
209- '.' if key_acc.is_empty() => return Err(format!("missing key before next segment, at {i}")),
00210 '.' => {
211 let key = std::mem::take(&mut key_acc);
212 assert!(type_acc.is_empty());
···225 Ok(out)
226}
227228-#[derive(Debug, Clone, PartialEq)]
229pub enum RefShape {
230 StrongRef,
231 AtUri,
···233 Did,
234 Handle,
235 AtIdentifier,
236- Blob,
237- // TODO: blob with type?
238}
239240impl TryFrom<&str> for RefShape {
···247 "did" => Ok(Self::Did),
248 "handle" => Ok(Self::Handle),
249 "at-identifier" => Ok(Self::AtIdentifier),
250- "blob" => Ok(Self::Blob),
251 _ => Err(format!("unknown shape: {s}")),
252 }
253 }
254}
255256#[derive(Debug, PartialEq)]
000000000000000000000000000000257pub enum MatchedRef {
258- AtUri {
259- uri: String,
260- cid: Option<String>,
261- },
262- Identifier(String),
263- Blob {
264- link: String,
265- mime: String,
266- size: u64,
267- }
268}
269270-pub fn match_shape(shape: &RefShape, val: &Value) -> Option<MatchedRef> {
271 // TODO: actually validate at-uri format
272 // TODO: actually validate everything else also
273 // TODO: should this function normalize identifiers to DIDs probably?
···276 RefShape::StrongRef => {
277 let o = val.as_object()?;
278 let uri = o.get("uri")?.as_str()?.to_string();
279- let cid = o.get("cid")?.as_str()?.to_string();
280- Some(MatchedRef::AtUri { uri, cid: Some(cid) })
000000281 }
282 RefShape::AtUri => {
283 let uri = val.as_str()?.to_string();
284- Some(MatchedRef::AtUri { uri, cid: None })
000000285 }
286 RefShape::AtUriParts => {
287 let o = val.as_object()?;
288- let identifier = o.get("repo").or(o.get("did"))?.as_str()?.to_string();
289- let collection = o.get("collection")?.as_str()?.to_string();
290- let rkey = o.get("rkey")?.as_str()?.to_string();
291- let uri = format!("at://{identifier}/{collection}/{rkey}");
292- let cid = o.get("cid").and_then(|v| v.as_str()).map(str::to_string);
293- Some(MatchedRef::AtUri { uri, cid })
0000000294 }
295 RefShape::Did => {
296- let id = val.as_str()?;
297- if !id.starts_with("did:") {
298- return None;
299- }
300- Some(MatchedRef::Identifier(id.to_string()))
301 }
302 RefShape::Handle => {
303- let id = val.as_str()?;
304- if id.contains(':') {
305- return None;
306- }
307- Some(MatchedRef::Identifier(id.to_string()))
308 }
309 RefShape::AtIdentifier => {
310- Some(MatchedRef::Identifier(val.as_str()?.to_string()))
311- }
312- RefShape::Blob => {
313- let o = val.as_object()?;
314- if o.get("$type")? != "blob" {
315- return None;
316- }
317- let link = o.get("ref")?.as_object()?.get("$link")?.as_str()?.to_string();
318- let mime = o.get("mimeType")?.as_str()?.to_string();
319- let size = o.get("size")?.as_u64()?;
320- Some(MatchedRef::Blob { link, mime, size })
321 }
322 }
323}
···343 let mut out = Vec::new();
344 for (path_parts, shape) in sources {
345 for val in PathWalker::new(&path_parts, skeleton) {
346- if let Some(matched) = match_shape(&shape, val) {
347 out.push(matched);
348 }
349 }
···357}
358impl<'a> PathWalker<'a> {
359 fn new(path_parts: &'a [PathPart], skeleton: &'a Value) -> Self {
360- Self { todo: vec![(path_parts, skeleton)] }
00361 }
362}
363impl<'a> Iterator for PathWalker<'a> {
···382 let Some(a) = o.get(k).and_then(|v| v.as_array()) else {
383 continue;
384 };
385- for v in a
386- .iter()
387- .rev()
388- .filter(|c| {
389- let Some(t) = t else { return true };
390- c
391- .as_object()
392- .and_then(|o| o.get("$type"))
393- .and_then(|v| v.as_str())
394- .map(|s| s == t)
395- .unwrap_or(false)
396- })
397- {
398 self.todo.push((rest, v))
399 }
400 }
···403 }
404}
405406-407#[cfg(test)]
408mod tests {
409 use super::*;
410 use serde_json::json;
41100412 #[test]
413 fn test_parse_record_path() -> Result<(), Box<dyn std::error::Error>> {
414 let cases = [
415 ("", vec![]),
416 ("subject", vec![PathPart::Scalar("subject".into())]),
417 ("authorDid", vec![PathPart::Scalar("authorDid".into())]),
418- ("subject.uri", vec![PathPart::Scalar("subject".into()), PathPart::Scalar("uri".into())]),
000000419 ("members[]", vec![PathPart::Vector("members".into(), None)]),
420- ("add[].key", vec![
421- PathPart::Vector("add".into(), None),
422- PathPart::Scalar("key".into()),
423- ]),
000424 ("a[b]", vec![PathPart::Vector("a".into(), Some("b".into()))]),
425- ("a[b.c]", vec![PathPart::Vector("a".into(), Some("b.c".into()))]),
426- ("facets[app.bsky.richtext.facet].features[app.bsky.richtext.facet#mention].did", vec![
427- PathPart::Vector("facets".into(), Some("app.bsky.richtext.facet".into())),
428- PathPart::Vector("features".into(), Some("app.bsky.richtext.facet#mention".into())),
429- PathPart::Scalar("did".into()),
430- ]),
000000000431 ];
432433 for (path, expected) in cases {
···444 ("strong-ref", json!(""), None),
445 ("strong-ref", json!({}), None),
446 ("strong-ref", json!({ "uri": "abc" }), None),
447- ("strong-ref", json!({ "cid": "def" }), None),
448 (
449 "strong-ref",
450- json!({ "uri": "abc", "cid": "def" }),
451- Some(MatchedRef::AtUri { uri: "abc".to_string(), cid: Some("def".to_string()) }),
00000452 ),
453 ("at-uri", json!({ "uri": "abc" }), None),
454- ("at-uri", json!({ "uri": "abc", "cid": "def" }), None),
455 (
456 "at-uri",
457- json!("abc"),
458- Some(MatchedRef::AtUri { uri: "abc".to_string(), cid: None }),
0000000000459 ),
460 ("at-uri-parts", json!("abc"), None),
461 ("at-uri-parts", json!({}), None),
462 (
463 "at-uri-parts",
464- json!({"repo": "a", "collection": "b", "rkey": "c"}),
465- Some(MatchedRef::AtUri { uri: "at://a/b/c".to_string(), cid: None }),
00000466 ),
467 (
468 "at-uri-parts",
469- json!({"did": "a", "collection": "b", "rkey": "c"}),
470- Some(MatchedRef::AtUri { uri: "at://a/b/c".to_string(), cid: None }),
00000471 ),
472 (
473 "at-uri-parts",
474 // 'repo' takes precedence over 'did'
475- json!({"did": "a", "repo": "z", "collection": "b", "rkey": "c"}),
476- Some(MatchedRef::AtUri { uri: "at://z/b/c".to_string(), cid: None }),
00000477 ),
478 (
479 "at-uri-parts",
480- json!({"repo": "a", "collection": "b", "rkey": "c", "cid": "def"}),
481- Some(MatchedRef::AtUri { uri: "at://a/b/c".to_string(), cid: Some("def".to_string()) }),
00000482 ),
483 (
484 "at-uri-parts",
485- json!({"repo": "a", "collection": "b", "rkey": "c", "cid": {}}),
486- Some(MatchedRef::AtUri { uri: "at://a/b/c".to_string(), cid: None }),
00000487 ),
488 ("did", json!({}), None),
489 ("did", json!(""), None),
490 ("did", json!("bad-example.com"), None),
491- ("did", json!("did:plc:xyz"), Some(MatchedRef::Identifier("did:plc:xyz".to_string()))),
0000492 ("handle", json!({}), None),
493- ("handle", json!("bad-example.com"), Some(MatchedRef::Identifier("bad-example.com".to_string()))),
0000494 ("handle", json!("did:plc:xyz"), None),
495 ("at-identifier", json!({}), None),
496- ("at-identifier", json!("bad-example.com"), Some(MatchedRef::Identifier("bad-example.com".to_string()))),
497- ("at-identifier", json!("did:plc:xyz"), Some(MatchedRef::Identifier("did:plc:xyz".to_string()))),
00000000498 ];
499- for (shape, val, expected) in cases {
500 let s = shape.try_into().unwrap();
501- let matched = match_shape(&s, &val);
502- assert_eq!(matched, expected, "shape: {shape:?}, val: {val:?}");
503 }
504 }
505}