···100100 UrlParseError(#[from] url::ParseError),
101101 #[error(transparent)]
102102 ReqwestError(#[from] reqwest::Error),
103103+ #[error(transparent)]
104104+ InvalidHeader(#[from] reqwest::header::InvalidHeaderValue),
105105+ #[error(transparent)]
106106+ IdentityError(#[from] IdentityError),
107107+ #[error("upstream service could not be resolved")]
108108+ ServiceNotFound,
109109+ #[error("upstream service was found but no services matched")]
110110+ ServiceNotMatched,
103111}
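A quick illustration of what the new `#[error(transparent)]` / `#[from]` variants buy at call sites. This is a hedged sketch: the enum's real name is folded out of the hunk above, so `SketchError` below is a stand-in with the same shape.

```rust
use reqwest::header::{HeaderValue, InvalidHeaderValue};
use thiserror::Error;

// Stand-in for the (folded) error enum above, mirroring its new variant.
#[derive(Debug, Error)]
enum SketchError {
    #[error(transparent)]
    InvalidHeader(#[from] InvalidHeaderValue),
}

// #[from] derives a From impl, so `?` converts reqwest's header error
// into the enum without an explicit map_err.
fn bearer_header(token: &str) -> Result<HeaderValue, SketchError> {
    Ok(HeaderValue::from_str(&format!("Bearer {token}"))?)
}
```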
+208-16
slingshot/src/identity.rs
···17171818use crate::error::IdentityError;
1919use atrium_api::{
2020- did_doc::DidDocument,
2020+ did_doc::{DidDocument, Service as DidDocService},
2121 types::string::{Did, Handle},
2222};
2323use atrium_common::resolver::Resolver;
···4141pub enum IdentityKey {
4242 Handle(Handle),
4343 Did(Did),
4444+ ServiceDid(Did),
4445}
45464647impl IdentityKey {
···4849 let s = match self {
4950 IdentityKey::Handle(h) => h.as_str(),
5051 IdentityKey::Did(d) => d.as_str(),
5252+ IdentityKey::ServiceDid(d) => d.as_str(),
5153 };
5254 std::mem::size_of::<Self>() + std::mem::size_of_val(s)
5355 }
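Worth noting, since `std::mem::size_of_val` is easy to misread: for a `&str` it returns the length of the pointed-to bytes, so this method really does charge the cache for the key's string contents. A minimal check:

```rust
fn main() {
    // size_of_val on &str measures the pointed-to bytes (str is unsized):
    let s: &str = "did:plc:abc123";
    assert_eq!(std::mem::size_of_val(s), 14);
    // ...while a &String measures only the ptr/len/cap header:
    let owned = String::from("did:plc:abc123");
    assert_eq!(std::mem::size_of_val(&owned), std::mem::size_of::<String>());
}
```

The same distinction is why the `ServiceDoc` weight arm below should count `as_str()` contents per field rather than the `String` headers alone.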
···5961#[derive(Debug, Serialize, Deserialize)]
6062enum IdentityData {
6163 NotFound,
6262- Did(Did),
6363- Doc(PartialMiniDoc),
6464+ Did(Did), // from handle
6565+ Doc(PartialMiniDoc), // from did
6666+ ServiceDoc(MiniServiceDoc), // from service did
6467}
65686669impl IdentityVal {
···7174 IdentityData::Did(d) => std::mem::size_of_val(d.as_str()),
7275 IdentityData::Doc(d) => {
7376 std::mem::size_of_val(d.unverified_handle.as_str())
7474- + std::mem::size_of_val(d.pds.as_str())
7575- + std::mem::size_of_val(d.signing_key.as_str())
7777+ + std::mem::size_of_val(&d.pds)
7878+ + std::mem::size_of_val(&d.signing_key)
7979+ }
8080+ IdentityData::ServiceDoc(d) => {
8181+ let mut s = std::mem::size_of::<MiniServiceDoc>();
8282+ for sv in &d.services {
8383+ s += std::mem::size_of::<MiniService>(); // element's inline storage
8484+ s += std::mem::size_of_val(sv.full_id.as_str()); // string heap bytes
8585+ s += std::mem::size_of_val(sv.r#type.as_str());
8686+ s += std::mem::size_of_val(sv.endpoint.as_str());
8787+ }
8888+ s
7689 }
7790 };
7891 wrapping + inner
···168181 }
169182}
170183184184+/// Simplified info from service DID docs
185185+#[derive(Debug, Clone, Serialize, Deserialize)]
186186+pub struct MiniServiceDoc {
187187+ services: Vec<MiniService>,
188188+}
189189+190190+impl MiniServiceDoc {
191191+ pub fn get(&self, id_fragment: &str, service_type: Option<&str>) -> Option<&MiniService> {
192192+ self.services.iter().find(|ms| {
193193+ ms.full_id.ends_with(id_fragment)
194194+ && service_type.map(|t| t == ms.r#type).unwrap_or(true)
195195+ })
196196+ }
197197+}
198198+199199+/// A single service entry from the did doc
200200+#[derive(Debug, Clone, Serialize, Deserialize)]
201201+pub struct MiniService {
202202+ /// The full id
203203+ ///
204204+ /// for informational purposes only -- services are deduplicated by (id fragment, type)
205205+ full_id: String,
206206+ r#type: String,
207207+ /// HTTP endpoint for the actual service
208208+ pub endpoint: String,
209209+}
210210+211211+impl TryFrom<DidDocument> for MiniServiceDoc {
212212+ type Error = String;
213213+ fn try_from(did_doc: DidDocument) -> Result<Self, Self::Error> {
214214+ let mut services = Vec::new();
215215+ let mut seen = HashSet::new();
216216+217217+ for DidDocService {
218218+ id,
219219+ r#type,
220220+ service_endpoint,
221221+ } in did_doc.service.unwrap_or_default()
222222+ {
223223+ let Some((_, id_fragment)) = id.rsplit_once('#') else {
224224+ continue;
225225+ };
226226+ if !seen.insert((id_fragment.to_string(), r#type.clone())) {
227227+ continue;
228228+ }
229229+ services.push(MiniService {
230230+ full_id: id,
231231+ r#type,
232232+ endpoint: service_endpoint,
233233+ });
234234+ }
235235+236236+ Ok(Self { services })
237237+ }
238238+}
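A test sketch for `get`'s matching rules, module-internal since the fields are private. The sample id and type follow the usual atproto `#atproto_pds` convention. One caveat: `ends_with` is a suffix match, so a fragment like `"pds"` would also match `"atproto_pds"`; if exact fragment equality is intended, comparing against the `rsplit_once('#')` remainder would be stricter.

```rust
#[cfg(test)]
mod mini_service_doc_tests {
    use super::*;

    #[test]
    fn get_matches_by_fragment_suffix_and_optional_type() {
        let doc = MiniServiceDoc {
            services: vec![MiniService {
                full_id: "did:web:example.com#atproto_pds".to_string(),
                r#type: "AtprotoPersonalDataServer".to_string(),
                endpoint: "https://pds.example.com".to_string(),
            }],
        };
        // the fragment matches with or without the type filter...
        assert!(doc.get("atproto_pds", None).is_some());
        assert!(doc
            .get("atproto_pds", Some("AtprotoPersonalDataServer"))
            .is_some());
        // ...but a mismatched type or unknown fragment yields None
        assert!(doc.get("atproto_pds", Some("SomethingElse")).is_none());
        assert!(doc.get("labeler", None).is_none());
    }
}
```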
239239+171240/// multi-producer *single-consumer* queue structures (wrap in arc-mutex plz)
172241///
173242/// the hashset allows testing for presence of items in the queue.
···296365 let now = UtcDateTime::now();
297366 let IdentityVal(last_fetch, data) = entry.value();
298367 match data {
299299- IdentityData::Doc(_) => {
300300- log::error!("identity value mixup: got a doc from a handle key (should be a did)");
301301- Err(IdentityError::IdentityValTypeMixup(handle.to_string()))
302302- }
303368 IdentityData::NotFound => {
304369 if (now - *last_fetch) >= MIN_NOT_FOUND_TTL {
305370 metrics::counter!("identity_handle_refresh_queued", "reason" => "ttl", "found" => "false").increment(1);
···313378 self.queue_refresh(key).await;
314379 }
315380 Ok(Some(did.clone()))
381381+ }
382382+ _ => {
383383+ log::error!("identity value mixup: got an unexpected value from a handle key (should be a did)");
384384+ Err(IdentityError::IdentityValTypeMixup(handle.to_string()))
316385 }
317386 }
318387 }
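One note on the new `_` catch-all arms here and in the two lookups below: they trade match exhaustiveness for brevity, so a future `IdentityData` variant would silently land in the mixup error instead of failing to compile. A sketch of the stricter alternative, using a stand-in enum:

```rust
// Stand-in for IdentityData, to show the shape only.
enum Data {
    NotFound,
    Did,
    Doc,
    ServiceDoc,
}

fn classify(d: &Data) -> &'static str {
    match d {
        Data::Did => "expected",
        Data::NotFound => "missing",
        // spelled out instead of `_`: adding a Data variant later becomes
        // a compile error here, not a silent trip into the mixup path
        Data::Doc | Data::ServiceDoc => "mixup",
    }
}
```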
···362431 let now = UtcDateTime::now();
363432 let IdentityVal(last_fetch, data) = entry.value();
364433 match data {
365365- IdentityData::Did(_) => {
366366- log::error!("identity value mixup: got a did from a did key (should be a doc)");
367367- Err(IdentityError::IdentityValTypeMixup(did.to_string()))
368368- }
369434 IdentityData::NotFound => {
370435 if (now - *last_fetch) >= MIN_NOT_FOUND_TTL {
371436 metrics::counter!("identity_did_refresh_queued", "reason" => "ttl", "found" => "false").increment(1);
···373438 }
374439 Ok(None)
375440 }
376376- IdentityData::Doc(mini_did) => {
441441+ IdentityData::Doc(mini_doc) => {
377442 if (now - *last_fetch) >= MIN_TTL {
378443 metrics::counter!("identity_did_refresh_queued", "reason" => "ttl", "found" => "true").increment(1);
379444 self.queue_refresh(key).await;
380445 }
381381- Ok(Some(mini_did.clone()))
446446+ Ok(Some(mini_doc.clone()))
447447+ }
448448+ _ => {
449449+ log::error!("identity value mixup: got an unexpected value from a did key (should be a doc)");
450450+ Err(IdentityError::IdentityValTypeMixup(did.to_string()))
451451+ }
452452+ }
453453+ }
454454+455455+ /// Fetch (and cache) a service mini doc from a did
456456+ pub async fn did_to_mini_service_doc(
457457+ &self,
458458+ did: &Did,
459459+ ) -> Result<Option<MiniServiceDoc>, IdentityError> {
460460+ let key = IdentityKey::ServiceDid(did.clone());
461461+ metrics::counter!("slingshot_get_service_did_doc").increment(1);
462462+ let entry = self
463463+ .cache
464464+ .get_or_fetch(&key, {
465465+ let did = did.clone();
466466+ let resolver = self.did_resolver.clone();
467467+ || async move {
468468+ let t0 = Instant::now();
469469+ let (res, success) = match resolver.resolve(&did).await {
470470+ Ok(did_doc) if did_doc.id != did.to_string() => (
471471+ // TODO: fix in atrium: should verify id is did
472472+ Err(IdentityError::BadDidDoc(
473473+ "did doc's id did not match did".to_string(),
474474+ )),
475475+ "false",
476476+ ),
477477+ Ok(did_doc) => match did_doc.try_into() {
478478+ Ok(mini_service_doc) => (
479479+ Ok(IdentityVal(
480480+ UtcDateTime::now(),
481481+ IdentityData::ServiceDoc(mini_service_doc),
482482+ )),
483483+ "true",
484484+ ),
485485+ Err(e) => (Err(IdentityError::BadDidDoc(e)), "false"),
486486+ },
487487+ Err(atrium_identity::Error::NotFound) => (
488488+ Ok(IdentityVal(UtcDateTime::now(), IdentityData::NotFound)),
489489+ "false",
490490+ ),
491491+ Err(other) => (Err(IdentityError::ResolutionFailed(other)), "false"),
492492+ };
493493+ metrics::histogram!("slingshot_fetch_service_did_doc", "success" => success)
494494+ .record(t0.elapsed());
495495+ res
496496+ }
497497+ })
498498+ .await?;
499499+500500+ let now = UtcDateTime::now();
501501+ let IdentityVal(last_fetch, data) = entry.value();
502502+ match data {
503503+ IdentityData::NotFound => {
504504+ if (now - *last_fetch) >= MIN_NOT_FOUND_TTL {
505505+ metrics::counter!("identity_service_did_refresh_queued", "reason" => "ttl", "found" => "false").increment(1);
506506+ self.queue_refresh(key).await;
507507+ }
508508+ Ok(None)
509509+ }
510510+ IdentityData::ServiceDoc(mini_service_doc) => {
511511+ if (now - *last_fetch) >= MIN_TTL {
512512+ metrics::counter!("identity_service_did_refresh_queued", "reason" => "ttl", "found" => "true").increment(1);
513513+ self.queue_refresh(key).await;
514514+ }
515515+ Ok(Some(mini_service_doc.clone()))
516516+ }
517517+ _ => {
518518+ log::error!(
519519+ "identity value mixup: got a doc from a different key type (should be a service did)"
520520+ );
521521+ Err(IdentityError::IdentityValTypeMixup(did.to_string()))
382522 }
383523 }
384524 }
···519659 log::warn!(
520660 "refreshed did doc failed: wrong did doc id. dropping refresh."
521661 );
662662+ self.complete_refresh(&task_key).await?;
522663 continue;
523664 }
524665 let mini_doc = match did_doc.try_into() {
···526667 Err(e) => {
527668 metrics::counter!("identity_did_refresh", "success" => "false", "reason" => "bad doc").increment(1);
528669 log::warn!(
529529- "converting mini doc failed: {e:?}. dropping refresh."
670670+ "converting mini doc for {did:?} failed: {e:?}. dropping refresh."
530671 );
672672+ self.complete_refresh(&task_key).await?;
531673 continue;
532674 }
533675 };
···554696 }
555697556698 self.complete_refresh(&task_key).await?; // failures are bugs, so break loop
699699+ }
700700+ IdentityKey::ServiceDid(ref did) => {
701701+ log::trace!("refreshing service did doc: {did:?}");
702702+703703+ match self.did_resolver.resolve(did).await {
704704+ Ok(did_doc) => {
705705+ // TODO: fix in atrium: should verify id is did
706706+ if did_doc.id != did.to_string() {
707707+ metrics::counter!("identity_service_did_refresh", "success" => "false", "reason" => "wrong did").increment(1);
708708+ log::warn!(
709709+ "refreshed did doc failed: wrong did doc id. dropping refresh."
710710+ );
711711+ self.complete_refresh(&task_key).await?;
712712+ continue;
713713+ }
714714+ let mini_service_doc = match did_doc.try_into() {
715715+ Ok(md) => md,
716716+ Err(e) => {
717717+ metrics::counter!("identity_service_did_refresh", "success" => "false", "reason" => "bad doc").increment(1);
718718+ log::warn!(
719719+ "converting mini service doc failed: {e:?}. dropping refresh."
720720+ );
721721+ self.complete_refresh(&task_key).await?;
722722+ continue;
723723+ }
724724+ };
725725+ metrics::counter!("identity_service_did_refresh", "success" => "true")
726726+ .increment(1);
727727+ self.cache.insert(
728728+ task_key.clone(),
729729+ IdentityVal(
730730+ UtcDateTime::now(),
731731+ IdentityData::ServiceDoc(mini_service_doc),
732732+ ),
733733+ );
734734+ }
735735+ Err(atrium_identity::Error::NotFound) => {
736736+ metrics::counter!("identity_service_did_refresh", "success" => "false", "reason" => "not found").increment(1);
737737+ self.cache.insert(
738738+ task_key.clone(),
739739+ IdentityVal(UtcDateTime::now(), IdentityData::NotFound),
740740+ );
741741+ }
742742+ Err(err) => {
743743+ metrics::counter!("identity_service_did_refresh", "success" => "false", "reason" => "other").increment(1);
744744+ log::warn!(
745745+ "failed to refresh did doc: {err:?}. leaving stale (should we eventually do something?)"
746746+ );
747747+ }
748748+ }
557749 }
558750 }
559751 }
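The `complete_refresh` calls added on the early-`continue` paths are a real fix: the queue's presence hashset (see its doc comment above) would otherwise keep the key marked as in-flight and swallow every later refresh for it. If more early exits accumulate, a drop guard would keep that invariant enforced on every path; a sketch with stand-in types, not the actual async API in this file:

```rust
use std::collections::HashSet;

// Illustrative guard: removes the key from the presence set on every exit
// path, `continue` and `?` included.
struct CompleteOnDrop<'a> {
    pending: &'a mut HashSet<String>,
    key: String,
}

impl Drop for CompleteOnDrop<'_> {
    fn drop(&mut self) {
        self.pending.remove(&self.key);
    }
}
```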
+61-25
slingshot/src/main.rs
···11-// use foyer::HybridCache;
22-// use foyer::{Engine, DirectFsDeviceOptions, HybridCacheBuilder};
31use metrics_exporter_prometheus::PrometheusBuilder;
42use slingshot::{
53 Identity, Proxy, Repo, consume, error::MainTaskError, firehose_cache, healthcheck, serve,
···97108use clap::Parser;
119use tokio_util::sync::CancellationToken;
1010+use url::Url;
12111312/// Slingshot record edge cache
1413#[derive(Parser, Debug, Clone)]
···4847 #[arg(long, env = "SLINGSHOT_IDENTITY_CACHE_DISK_DB")]
4948 #[clap(default_value_t = 1)]
5049 identity_cache_disk_gb: usize,
5050+ /// the address of this server
5151+ ///
5252+ /// used if `--tls-domain` is not set, defaulting to `http://<--bind>`
5353+ #[arg(long, conflicts_with("tls_domain"), env = "SLINGSHOT_PUBLIC_HOST")]
5454+ base_url: Option<Url>,
5155 /// the domain pointing to this server
5256 ///
5357 /// if present:
5458 /// - a did:web document will be served at /.well-known/did.json
5555- /// - an HTTPS certs will be automatically configured with Acme/letsencrypt
5959+ /// - the server will bind on port 443
6060+ /// - if `--acme-contact` is present, the server will bind port 80 for http
6161+ /// challenges and attempt to auto-provision certs for `--tls-domain`
6262+ /// - if `--acme-contact` is absent, the server will load certs from the
6363+ /// `--tls-certs` folder, and try to reload them twice daily, guarded by
6464+ /// a lock file called `.cert-lock` in the `--tls-certs` folder.
5665 /// - TODO: a rate-limiter will be installed
5766 #[arg(
5867 long,
5968 conflicts_with("bind"),
6060- requires("acme_cache_path"),
6161- env = "SLINGSHOT_ACME_DOMAIN"
6969+ requires("tls_certs"),
7070+ env = "SLINGSHOT_TLS_DOMAIN"
7171+ )]
7272+ tls_domain: Option<String>,
7373+ /// a location to find/cache acme or other tls certs
7474+ ///
7575+ /// recommended in production, mind the file permissions.
7676+ #[arg(long, env = "SLINGSHOT_TLS_CERTS_PATH")]
7777+ tls_certs: Option<PathBuf>,
7878+ /// listen for ipv6 when using acme or other tls
7979+ ///
8080+ /// you must also configure the relevant DNS records for this to work
8181+ #[arg(long, action, requires("tls_domain"), env = "SLINGSHOT_TLS_IPV6")]
8282+ tls_ipv6: bool,
8383+ /// redirect acme http-01 challenges to this url
8484+ ///
8585+ /// useful if you're setting up a second instance that synchronizes its
8686+ /// certs from a main instance doing acme.
8787+ #[arg(
8888+ long,
8989+ conflicts_with("acme_contact"),
9090+ requires("tls_domain"),
9191+ env = "SLINGSHOT_ACME_CHALLENGE_REDIRECT"
6292 )]
6363- acme_domain: Option<String>,
9393+ acme_challenge_redirect: Option<String>,
6494 /// email address for letsencrypt contact
6595 ///
6696 /// recommended in production, i guess?
6767- #[arg(long, requires("acme_domain"), env = "SLINGSHOT_ACME_CONTACT")]
9797+ #[arg(long, requires("tls_domain"), env = "SLINGSHOT_ACME_CONTACT")]
6898 acme_contact: Option<String>,
6969- /// a location to cache acme https certs
9999+ /// use the staging environment for letsencrypt
70100 ///
7171- /// required when (and only used when) --acme-domain is specified.
7272- ///
7373- /// recommended in production, but mind the file permissions.
7474- #[arg(long, requires("acme_domain"), env = "SLINGSHOT_ACME_CACHE_PATH")]
7575- acme_cache_path: Option<PathBuf>,
7676- /// listen for ipv6 when using acme
7777- ///
7878- /// you must also configure the relevant DNS records for this to work
7979- #[arg(long, action, requires("acme_domain"), env = "SLINGSHOT_ACME_IPV6")]
8080- acme_ipv6: bool,
101101+ /// recommended when first testing a new deployment, to avoid
102102+ /// letsencrypt rate limit problems.
103103+ #[arg(long, action, requires("acme_contact"), env = "SLINGSHOT_ACME_STAGING")]
104104+ acme_staging: bool,
81105 /// a web address to send healthcheck pings to every ~51s or so
82106 #[arg(long, env = "SLINGSHOT_HEALTHCHECK")]
83107 healthcheck: Option<String>,
···101125102126 let args = Args::parse();
103127128128+ let base_url: Url = args
129129+ .base_url
130130+ .or_else(|| {
131131+ args.tls_domain
132132+ .as_ref()
133133+ .map(|d| Url::parse(&format!("https://{d}")).unwrap())
134134+ })
135135+ .unwrap_or_else(|| Url::parse(&format!("http://{}", args.bind)).unwrap());
136136+104137 if args.collect_metrics {
105138 log::trace!("installing metrics server...");
106139 if let Err(e) = install_metrics_server(args.bind_metrics) {
···152185 log::info!("identity service ready.");
153186154187 let repo = Repo::new(identity.clone());
155155- let proxy = Proxy::new(repo.clone());
188188+ let proxy = Proxy::new(identity.clone());
156189157190 let identity_for_server = identity.clone();
158191 let server_shutdown = shutdown.clone();
···164197 identity_for_server,
165198 repo,
166199 proxy,
167167- args.acme_domain,
200200+ base_url,
201201+ args.tls_domain,
202202+ args.tls_certs,
203203+ args.tls_ipv6,
204204+ args.acme_challenge_redirect,
168205 args.acme_contact,
169169- args.acme_cache_path,
170170- args.acme_ipv6,
206206+ args.acme_staging,
171207 server_shutdown,
172208 bind,
173209 )
···236272) -> Result<(), metrics_exporter_prometheus::BuildError> {
237273 log::info!("installing metrics server...");
238274 PrometheusBuilder::new()
239239- .set_quantiles(&[0.5, 0.9, 0.99, 1.0])?
240240- .set_bucket_duration(std::time::Duration::from_secs(300))?
241241- .set_bucket_count(std::num::NonZero::new(12).unwrap()) // count * duration = 60 mins. stuff doesn't happen that fast here.
275275+ .set_buckets(&[0.001, 0.006, 0.036, 0.216, 1.296, 7.776, 46.656])?
276276+ .set_bucket_duration(std::time::Duration::from_secs(15))?
277277+ .set_bucket_count(std::num::NonZero::new(4).unwrap()) // count * duration = 60s retained window
242278 .set_enable_unit_suffix(false) // this seemed buggy for constellation (sometimes wouldn't engage)
243279 .with_http_listener(bind_metrics)
244280 .install()?;
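On the bucket edges in `set_buckets`: they form a geometric series, x6 per step from 1ms, which is why the final edge reads as a typo for 46.656 (45.656 breaks the pattern) and is corrected above. A quick derivation; the comparison is safe because each side rounds to the same nearest `f64` of the same decimal:

```rust
fn main() {
    // x6 geometric series from 1ms: 6^n is exact in f64, and division by
    // 1000 is correctly rounded, matching how the literals parse.
    let buckets: Vec<f64> = (0..7).map(|n| 6.0_f64.powi(n) / 1000.0).collect();
    assert_eq!(
        buckets,
        vec![0.001, 0.006, 0.036, 0.216, 1.296, 7.776, 46.656]
    );
}
```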