Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm

Compare changes

Choose any two refs to compare.

+59 -861
+2 -3
Cargo.lock
··· 803 803 804 804 [[package]] 805 805 name = "bytes" 806 - version = "1.10.1" 806 + version = "1.11.1" 807 807 source = "registry+https://github.com/rust-lang/crates.io-index" 808 - checksum = "d71b6127be86fdcfddb610f7182ac57211d4b18a3e9c82eb2d17662f2227ad6a" 808 + checksum = "1e748733b7cbc798e1434b6ac524f0c1ff2ab456fe201501e6497c8417a4fc33" 809 809 810 810 [[package]] 811 811 name = "byteview" ··· 5965 5965 "form_urlencoded", 5966 5966 "idna", 5967 5967 "percent-encoding", 5968 - "serde", 5969 5968 ] 5970 5969 5971 5970 [[package]]
+7 -1
constellation/src/bin/main.rs
··· 45 45 #[arg(short, long)] 46 46 #[clap(value_enum, default_value_t = StorageBackend::Memory)] 47 47 backend: StorageBackend, 48 + /// Serve a did:web document for this domain 49 + #[arg(long)] 50 + did_web_domain: Option<String>, 48 51 /// Initiate a database backup into this dir, if supported by the storage 49 52 #[arg(long)] 50 53 backup: Option<PathBuf>, ··· 103 106 MemStorage::new(), 104 107 fixture, 105 108 None, 109 + args.did_web_domain, 106 110 stream, 107 111 bind, 108 112 metrics_bind, ··· 138 142 rocks, 139 143 fixture, 140 144 args.data, 145 + args.did_web_domain, 141 146 stream, 142 147 bind, 143 148 metrics_bind, ··· 159 164 mut storage: impl LinkStorage, 160 165 fixture: Option<PathBuf>, 161 166 data_dir: Option<PathBuf>, 167 + did_web_domain: Option<String>, 162 168 stream: String, 163 169 bind: SocketAddr, 164 170 metrics_bind: SocketAddr, ··· 211 217 if collect_metrics { 212 218 install_metrics_server(metrics_bind)?; 213 219 } 214 - serve(readable, bind, staying_alive).await 220 + serve(readable, bind, did_web_domain, staying_alive).await 215 221 }) 216 222 .unwrap(); 217 223 stay_alive.drop_guard();
+32 -7
constellation/src/server/mod.rs
··· 3 3 extract::{Query, Request}, 4 4 http::{self, header}, 5 5 middleware::{self, Next}, 6 - response::{IntoResponse, Response}, 6 + response::{IntoResponse, Json, Response}, 7 7 routing::get, 8 8 Router, 9 9 }; ··· 37 37 http::StatusCode::INTERNAL_SERVER_ERROR 38 38 } 39 39 40 - pub async fn serve<S, A>(store: S, addr: A, stay_alive: CancellationToken) -> anyhow::Result<()> 41 - where 42 - S: LinkReader, 43 - A: ToSocketAddrs, 44 - { 45 - let app = Router::new() 40 + pub async fn serve<S: LinkReader, A: ToSocketAddrs>( 41 + store: S, 42 + addr: A, 43 + did_web_domain: Option<String>, 44 + stay_alive: CancellationToken, 45 + ) -> anyhow::Result<()> { 46 + let mut app = Router::new(); 47 + 48 + if let Some(d) = did_web_domain { 49 + app = app.route( 50 + "/.well-known/did.json", 51 + get({ 52 + let domain = d.clone(); 53 + move || did_web(domain) 54 + }), 55 + ) 56 + } 57 + 58 + let app = app 46 59 .route("/robots.txt", get(robots)) 47 60 .route( 48 61 "/", ··· 204 217 User-agent: * 205 218 Disallow: /links 206 219 Disallow: /links/ 220 + Disallow: /xrpc/ 207 221 " 222 + } 223 + 224 + async fn did_web(domain: String) -> impl IntoResponse { 225 + Json(serde_json::json!({ 226 + "id": format!("did:web:{domain}"), 227 + "service": [{ 228 + "id": "#constellation", 229 + "type": "ConstellationGraphService", 230 + "serviceEndpoint": format!("https://{domain}") 231 + }] 232 + })) 208 233 } 209 234 210 235 #[derive(Template, Serialize, Deserialize)]
+1 -1
slingshot/Cargo.toml
··· 28 28 tokio = { version = "1.47.0", features = ["full"] } 29 29 tokio-util = "0.7.15" 30 30 tracing-subscriber = { version = "0.3.19", features = ["env-filter"] } 31 - url = { version = "2.5.4", features = ["serde"] } 31 + url = "2.5.4"
-10
slingshot/src/error.rs
··· 91 91 #[error("upstream non-atproto bad request")] 92 92 UpstreamBadBadNotGoodRequest(reqwest::Error), 93 93 } 94 - 95 - #[derive(Debug, Error)] 96 - pub enum ProxyError { 97 - #[error("failed to parse path: {0}")] 98 - PathParseError(String), 99 - #[error(transparent)] 100 - UrlParseError(#[from] url::ParseError), 101 - #[error(transparent)] 102 - ReqwestError(#[from] reqwest::Error), 103 - }
-2
slingshot/src/lib.rs
··· 3 3 mod firehose_cache; 4 4 mod healthcheck; 5 5 mod identity; 6 - mod proxy; 7 6 mod record; 8 7 mod server; 9 8 ··· 11 10 pub use firehose_cache::firehose_cache; 12 11 pub use healthcheck::healthcheck; 13 12 pub use identity::{Identity, IdentityKey}; 14 - pub use proxy::Proxy; 15 13 pub use record::{CachedRecord, ErrorResponseObject, Repo}; 16 14 pub use server::serve;
+3 -4
slingshot/src/main.rs
··· 2 2 // use foyer::{Engine, DirectFsDeviceOptions, HybridCacheBuilder}; 3 3 use metrics_exporter_prometheus::PrometheusBuilder; 4 4 use slingshot::{ 5 - Identity, Proxy, Repo, consume, error::MainTaskError, firehose_cache, healthcheck, serve, 5 + Identity, Repo, consume, error::MainTaskError, firehose_cache, healthcheck, serve, 6 6 }; 7 7 use std::net::SocketAddr; 8 8 use std::path::PathBuf; ··· 143 143 ) 144 144 .await 145 145 .map_err(|e| format!("identity setup failed: {e:?}"))?; 146 + 147 + log::info!("identity service ready."); 146 148 let identity_refresher = identity.clone(); 147 149 let identity_shutdown = shutdown.clone(); 148 150 tasks.spawn(async move { 149 151 identity_refresher.run_refresher(identity_shutdown).await?; 150 152 Ok(()) 151 153 }); 152 - log::info!("identity service ready."); 153 154 154 155 let repo = Repo::new(identity.clone()); 155 - let proxy = Proxy::new(repo.clone()); 156 156 157 157 let identity_for_server = identity.clone(); 158 158 let server_shutdown = shutdown.clone(); ··· 163 163 server_cache_handle, 164 164 identity_for_server, 165 165 repo, 166 - proxy, 167 166 args.acme_domain, 168 167 args.acme_contact, 169 168 args.acme_cache_path,
-505
slingshot/src/proxy.rs
··· 1 - use serde::Deserialize; 2 - use url::Url; 3 - use std::{collections::HashMap, time::Duration}; 4 - use crate::{Repo, server::HydrationSource, error::ProxyError}; 5 - use reqwest::Client; 6 - use serde_json::{Map, Value}; 7 - 8 - pub enum ParamValue { 9 - String(Vec<String>), 10 - Int(Vec<i64>), 11 - Bool(Vec<bool>), 12 - } 13 - pub struct Params(HashMap<String, ParamValue>); 14 - 15 - impl TryFrom<Map<String, Value>> for Params { 16 - type Error = (); // TODO 17 - fn try_from(val: Map<String, Value>) -> Result<Self, Self::Error> { 18 - let mut out = HashMap::new(); 19 - for (k, v) in val { 20 - match v { 21 - Value::String(s) => out.insert(k, ParamValue::String(vec![s])), 22 - Value::Bool(b) => out.insert(k, ParamValue::Bool(vec![b])), 23 - Value::Number(n) => { 24 - let Some(i) = n.as_i64() else { 25 - return Err(()); 26 - }; 27 - out.insert(k, ParamValue::Int(vec![i])) 28 - } 29 - Value::Array(a) => { 30 - let Some(first) = a.first() else { 31 - continue; 32 - }; 33 - if first.is_string() { 34 - let mut vals = Vec::with_capacity(a.len()); 35 - for v in a { 36 - let Some(v) = v.as_str() else { 37 - return Err(()); 38 - }; 39 - vals.push(v.to_string()); 40 - } 41 - out.insert(k, ParamValue::String(vals)); 42 - } else if first.is_i64() { 43 - let mut vals = Vec::with_capacity(a.len()); 44 - for v in a { 45 - let Some(v) = v.as_i64() else { 46 - return Err(()); 47 - }; 48 - vals.push(v); 49 - } 50 - out.insert(k, ParamValue::Int(vals)); 51 - } else if first.is_boolean() { 52 - let mut vals = Vec::with_capacity(a.len()); 53 - for v in a { 54 - let Some(v) = v.as_bool() else { 55 - return Err(()); 56 - }; 57 - vals.push(v); 58 - } 59 - out.insert(k, ParamValue::Bool(vals)); 60 - } 61 - todo!(); 62 - } 63 - _ => return Err(()), 64 - }; 65 - } 66 - 67 - Ok(Self(out)) 68 - } 69 - } 70 - 71 - #[derive(Clone)] 72 - pub struct Proxy { 73 - repo: Repo, 74 - client: Client, 75 - } 76 - 77 - impl Proxy { 78 - pub fn new(repo: Repo) -> Self { 79 - let client = Client::builder() 80 - .user_agent(format!( 81 - "microcosm slingshot v{} (contact: @bad-example.com)", 82 - env!("CARGO_PKG_VERSION") 83 - )) 84 - .no_proxy() 85 - .timeout(Duration::from_secs(6)) 86 - .build() 87 - .unwrap(); 88 - Self { repo, client } 89 - } 90 - 91 - pub async fn proxy( 92 - &self, 93 - xrpc: String, 94 - service: String, 95 - params: Option<Map<String, Value>>, 96 - ) -> Result<Value, ProxyError> { 97 - 98 - // hackin it to start 99 - 100 - // 1. assume did-web (TODO) and get the did doc 101 - #[derive(Debug, Deserialize)] 102 - struct ServiceDoc { 103 - id: String, 104 - service: Vec<ServiceItem>, 105 - } 106 - #[derive(Debug, Deserialize)] 107 - struct ServiceItem { 108 - id: String, 109 - #[expect(unused)] 110 - r#type: String, 111 - #[serde(rename = "serviceEndpoint")] 112 - service_endpoint: Url, 113 - } 114 - let dw = service.strip_prefix("did:web:").expect("a did web"); 115 - let (dw, service_id) = dw.split_once("#").expect("whatever"); 116 - let mut dw_url = Url::parse(&format!("https://{dw}"))?; 117 - dw_url.set_path("/.well-known/did.json"); 118 - let doc: ServiceDoc = self.client 119 - .get(dw_url) 120 - .send() 121 - .await? 122 - .error_for_status()? 123 - .json() 124 - .await?; 125 - 126 - assert_eq!(doc.id, format!("did:web:{}", dw)); 127 - 128 - let mut upstream = None; 129 - for ServiceItem { id, service_endpoint, .. } in doc.service { 130 - let Some((_, id)) = id.split_once("#") else { continue; }; 131 - if id != service_id { continue; }; 132 - upstream = Some(service_endpoint); 133 - break; 134 - } 135 - 136 - // 2. proxy the request forward 137 - let mut upstream = upstream.expect("to find it"); 138 - upstream.set_path(&format!("/xrpc/{xrpc}")); // TODO: validate nsid 139 - 140 - if let Some(params) = params { 141 - let mut query = upstream.query_pairs_mut(); 142 - let Params(ps) = params.try_into().expect("valid params"); 143 - for (k, pvs) in ps { 144 - match pvs { 145 - ParamValue::String(s) => { 146 - for s in s { 147 - query.append_pair(&k, &s); 148 - } 149 - } 150 - ParamValue::Int(i) => { 151 - for i in i { 152 - query.append_pair(&k, &i.to_string()); 153 - } 154 - } 155 - ParamValue::Bool(b) => { 156 - for b in b { 157 - query.append_pair(&k, &b.to_string()); 158 - } 159 - } 160 - } 161 - } 162 - } 163 - 164 - // TODO: other headers to proxy 165 - Ok(self.client 166 - .get(upstream) 167 - .send() 168 - .await? 169 - .error_for_status()? 170 - .json() 171 - .await?) 172 - } 173 - } 174 - 175 - #[derive(Debug, PartialEq)] 176 - pub enum PathPart { 177 - Scalar(String), 178 - Vector(String, Option<String>), // key, $type 179 - } 180 - 181 - pub fn parse_record_path(input: &str) -> Result<Vec<PathPart>, String> { 182 - let mut out = Vec::new(); 183 - 184 - let mut key_acc = String::new(); 185 - let mut type_acc = String::new(); 186 - let mut in_bracket = false; 187 - let mut chars = input.chars().enumerate(); 188 - while let Some((i, c)) = chars.next() { 189 - match c { 190 - '[' if in_bracket => return Err(format!("nested opening bracket not allowed, at {i}")), 191 - '[' if key_acc.is_empty() => return Err(format!("missing key before opening bracket, at {i}")), 192 - '[' => in_bracket = true, 193 - ']' if in_bracket => { 194 - in_bracket = false; 195 - let key = std::mem::take(&mut key_acc); 196 - let r#type = std::mem::take(&mut type_acc); 197 - let t = if r#type.is_empty() { None } else { Some(r#type) }; 198 - out.push(PathPart::Vector(key, t)); 199 - // peek ahead because we need a dot after array if there's more and i don't want to add more loop state 200 - let Some((i, c)) = chars.next() else { 201 - break; 202 - }; 203 - if c != '.' { 204 - return Err(format!("expected dot after close bracket, found {c:?} at {i}")); 205 - } 206 - } 207 - ']' => return Err(format!("unexpected close bracket at {i}")), 208 - '.' if in_bracket => type_acc.push(c), 209 - '.' if key_acc.is_empty() => return Err(format!("missing key before next segment, at {i}")), 210 - '.' => { 211 - let key = std::mem::take(&mut key_acc); 212 - assert!(type_acc.is_empty()); 213 - out.push(PathPart::Scalar(key)); 214 - } 215 - _ if in_bracket => type_acc.push(c), 216 - _ => key_acc.push(c), 217 - } 218 - } 219 - if in_bracket { 220 - return Err("unclosed bracket".into()); 221 - } 222 - if !key_acc.is_empty() { 223 - out.push(PathPart::Scalar(key_acc)); 224 - } 225 - Ok(out) 226 - } 227 - 228 - #[derive(Debug, Clone, PartialEq)] 229 - pub enum RefShape { 230 - StrongRef, 231 - AtUri, 232 - AtUriParts, 233 - Did, 234 - Handle, 235 - AtIdentifier, 236 - Blob, 237 - // TODO: blob with type? 238 - } 239 - 240 - impl TryFrom<&str> for RefShape { 241 - type Error = String; 242 - fn try_from(s: &str) -> Result<Self, Self::Error> { 243 - match s { 244 - "strong-ref" => Ok(Self::StrongRef), 245 - "at-uri" => Ok(Self::AtUri), 246 - "at-uri-parts" => Ok(Self::AtUriParts), 247 - "did" => Ok(Self::Did), 248 - "handle" => Ok(Self::Handle), 249 - "at-identifier" => Ok(Self::AtIdentifier), 250 - "blob" => Ok(Self::Blob), 251 - _ => Err(format!("unknown shape: {s}")), 252 - } 253 - } 254 - } 255 - 256 - #[derive(Debug, PartialEq)] 257 - pub enum MatchedRef { 258 - AtUri { 259 - uri: String, 260 - cid: Option<String>, 261 - }, 262 - Identifier(String), 263 - Blob { 264 - link: String, 265 - mime: String, 266 - size: u64, 267 - } 268 - } 269 - 270 - pub fn match_shape(shape: &RefShape, val: &Value) -> Option<MatchedRef> { 271 - // TODO: actually validate at-uri format 272 - // TODO: actually validate everything else also 273 - // TODO: should this function normalize identifiers to DIDs probably? 274 - // or just return at-uri parts so the caller can resolve and reassemble 275 - match shape { 276 - RefShape::StrongRef => { 277 - let o = val.as_object()?; 278 - let uri = o.get("uri")?.as_str()?.to_string(); 279 - let cid = o.get("cid")?.as_str()?.to_string(); 280 - Some(MatchedRef::AtUri { uri, cid: Some(cid) }) 281 - } 282 - RefShape::AtUri => { 283 - let uri = val.as_str()?.to_string(); 284 - Some(MatchedRef::AtUri { uri, cid: None }) 285 - } 286 - RefShape::AtUriParts => { 287 - let o = val.as_object()?; 288 - let identifier = o.get("repo").or(o.get("did"))?.as_str()?.to_string(); 289 - let collection = o.get("collection")?.as_str()?.to_string(); 290 - let rkey = o.get("rkey")?.as_str()?.to_string(); 291 - let uri = format!("at://{identifier}/{collection}/{rkey}"); 292 - let cid = o.get("cid").and_then(|v| v.as_str()).map(str::to_string); 293 - Some(MatchedRef::AtUri { uri, cid }) 294 - } 295 - RefShape::Did => { 296 - let id = val.as_str()?; 297 - if !id.starts_with("did:") { 298 - return None; 299 - } 300 - Some(MatchedRef::Identifier(id.to_string())) 301 - } 302 - RefShape::Handle => { 303 - let id = val.as_str()?; 304 - if id.contains(':') { 305 - return None; 306 - } 307 - Some(MatchedRef::Identifier(id.to_string())) 308 - } 309 - RefShape::AtIdentifier => { 310 - Some(MatchedRef::Identifier(val.as_str()?.to_string())) 311 - } 312 - RefShape::Blob => { 313 - let o = val.as_object()?; 314 - if o.get("$type")? != "blob" { 315 - return None; 316 - } 317 - let link = o.get("ref")?.as_object()?.get("$link")?.as_str()?.to_string(); 318 - let mime = o.get("mimeType")?.as_str()?.to_string(); 319 - let size = o.get("size")?.as_u64()?; 320 - Some(MatchedRef::Blob { link, mime, size }) 321 - } 322 - } 323 - } 324 - 325 - // TODO: send back metadata about the matching 326 - pub fn extract_links( 327 - sources: Vec<HydrationSource>, 328 - skeleton: &Value, 329 - ) -> Result<Vec<MatchedRef>, String> { 330 - // collect early to catch errors from the client 331 - // (TODO maybe the handler should do this and pass in the processed stuff probably definitely yeah) 332 - let sources = sources 333 - .into_iter() 334 - .map(|HydrationSource { path, shape }| { 335 - let path_parts = parse_record_path(&path)?; 336 - let shape: RefShape = shape.as_str().try_into()?; 337 - Ok((path_parts, shape)) 338 - }) 339 - .collect::<Result<Vec<_>, String>>()?; 340 - 341 - // lazy first impl, just re-walk the skeleton as many times as needed 342 - // not deduplicating for now 343 - let mut out = Vec::new(); 344 - for (path_parts, shape) in sources { 345 - for val in PathWalker::new(&path_parts, skeleton) { 346 - if let Some(matched) = match_shape(&shape, val) { 347 - out.push(matched); 348 - } 349 - } 350 - } 351 - 352 - Ok(out) 353 - } 354 - 355 - struct PathWalker<'a> { 356 - todo: Vec<(&'a [PathPart], &'a Value)>, 357 - } 358 - impl<'a> PathWalker<'a> { 359 - fn new(path_parts: &'a [PathPart], skeleton: &'a Value) -> Self { 360 - Self { todo: vec![(path_parts, skeleton)] } 361 - } 362 - } 363 - impl<'a> Iterator for PathWalker<'a> { 364 - type Item = &'a Value; 365 - fn next(&mut self) -> Option<Self::Item> { 366 - loop { 367 - let (parts, val) = self.todo.pop()?; 368 - let Some((part, rest)) = parts.split_first() else { 369 - return Some(val); 370 - }; 371 - let Some(o) = val.as_object() else { 372 - continue; 373 - }; 374 - match part { 375 - PathPart::Scalar(k) => { 376 - let Some(v) = o.get(k) else { 377 - continue; 378 - }; 379 - self.todo.push((rest, v)); 380 - } 381 - PathPart::Vector(k, t) => { 382 - let Some(a) = o.get(k).and_then(|v| v.as_array()) else { 383 - continue; 384 - }; 385 - for v in a 386 - .iter() 387 - .rev() 388 - .filter(|c| { 389 - let Some(t) = t else { return true }; 390 - c 391 - .as_object() 392 - .and_then(|o| o.get("$type")) 393 - .and_then(|v| v.as_str()) 394 - .map(|s| s == t) 395 - .unwrap_or(false) 396 - }) 397 - { 398 - self.todo.push((rest, v)) 399 - } 400 - } 401 - } 402 - } 403 - } 404 - } 405 - 406 - 407 - #[cfg(test)] 408 - mod tests { 409 - use super::*; 410 - use serde_json::json; 411 - 412 - #[test] 413 - fn test_parse_record_path() -> Result<(), Box<dyn std::error::Error>> { 414 - let cases = [ 415 - ("", vec![]), 416 - ("subject", vec![PathPart::Scalar("subject".into())]), 417 - ("authorDid", vec![PathPart::Scalar("authorDid".into())]), 418 - ("subject.uri", vec![PathPart::Scalar("subject".into()), PathPart::Scalar("uri".into())]), 419 - ("members[]", vec![PathPart::Vector("members".into(), None)]), 420 - ("add[].key", vec![ 421 - PathPart::Vector("add".into(), None), 422 - PathPart::Scalar("key".into()), 423 - ]), 424 - ("a[b]", vec![PathPart::Vector("a".into(), Some("b".into()))]), 425 - ("a[b.c]", vec![PathPart::Vector("a".into(), Some("b.c".into()))]), 426 - ("facets[app.bsky.richtext.facet].features[app.bsky.richtext.facet#mention].did", vec![ 427 - PathPart::Vector("facets".into(), Some("app.bsky.richtext.facet".into())), 428 - PathPart::Vector("features".into(), Some("app.bsky.richtext.facet#mention".into())), 429 - PathPart::Scalar("did".into()), 430 - ]), 431 - ]; 432 - 433 - for (path, expected) in cases { 434 - let parsed = parse_record_path(path)?; 435 - assert_eq!(parsed, expected, "path: {path:?}"); 436 - } 437 - 438 - Ok(()) 439 - } 440 - 441 - #[test] 442 - fn test_match_shape() { 443 - let cases = [ 444 - ("strong-ref", json!(""), None), 445 - ("strong-ref", json!({}), None), 446 - ("strong-ref", json!({ "uri": "abc" }), None), 447 - ("strong-ref", json!({ "cid": "def" }), None), 448 - ( 449 - "strong-ref", 450 - json!({ "uri": "abc", "cid": "def" }), 451 - Some(MatchedRef::AtUri { uri: "abc".to_string(), cid: Some("def".to_string()) }), 452 - ), 453 - ("at-uri", json!({ "uri": "abc" }), None), 454 - ("at-uri", json!({ "uri": "abc", "cid": "def" }), None), 455 - ( 456 - "at-uri", 457 - json!("abc"), 458 - Some(MatchedRef::AtUri { uri: "abc".to_string(), cid: None }), 459 - ), 460 - ("at-uri-parts", json!("abc"), None), 461 - ("at-uri-parts", json!({}), None), 462 - ( 463 - "at-uri-parts", 464 - json!({"repo": "a", "collection": "b", "rkey": "c"}), 465 - Some(MatchedRef::AtUri { uri: "at://a/b/c".to_string(), cid: None }), 466 - ), 467 - ( 468 - "at-uri-parts", 469 - json!({"did": "a", "collection": "b", "rkey": "c"}), 470 - Some(MatchedRef::AtUri { uri: "at://a/b/c".to_string(), cid: None }), 471 - ), 472 - ( 473 - "at-uri-parts", 474 - // 'repo' takes precedence over 'did' 475 - json!({"did": "a", "repo": "z", "collection": "b", "rkey": "c"}), 476 - Some(MatchedRef::AtUri { uri: "at://z/b/c".to_string(), cid: None }), 477 - ), 478 - ( 479 - "at-uri-parts", 480 - json!({"repo": "a", "collection": "b", "rkey": "c", "cid": "def"}), 481 - Some(MatchedRef::AtUri { uri: "at://a/b/c".to_string(), cid: Some("def".to_string()) }), 482 - ), 483 - ( 484 - "at-uri-parts", 485 - json!({"repo": "a", "collection": "b", "rkey": "c", "cid": {}}), 486 - Some(MatchedRef::AtUri { uri: "at://a/b/c".to_string(), cid: None }), 487 - ), 488 - ("did", json!({}), None), 489 - ("did", json!(""), None), 490 - ("did", json!("bad-example.com"), None), 491 - ("did", json!("did:plc:xyz"), Some(MatchedRef::Identifier("did:plc:xyz".to_string()))), 492 - ("handle", json!({}), None), 493 - ("handle", json!("bad-example.com"), Some(MatchedRef::Identifier("bad-example.com".to_string()))), 494 - ("handle", json!("did:plc:xyz"), None), 495 - ("at-identifier", json!({}), None), 496 - ("at-identifier", json!("bad-example.com"), Some(MatchedRef::Identifier("bad-example.com".to_string()))), 497 - ("at-identifier", json!("did:plc:xyz"), Some(MatchedRef::Identifier("did:plc:xyz".to_string()))), 498 - ]; 499 - for (shape, val, expected) in cases { 500 - let s = shape.try_into().unwrap(); 501 - let matched = match_shape(&s, &val); 502 - assert_eq!(matched, expected, "shape: {shape:?}, val: {val:?}"); 503 - } 504 - } 505 - }
+2 -2
slingshot/src/record.rs
··· 11 11 12 12 #[derive(Debug, Serialize, Deserialize)] 13 13 pub struct RawRecord { 14 - pub cid: Cid, 15 - pub record: String, 14 + cid: Cid, 15 + record: String, 16 16 } 17 17 18 18 // TODO: should be able to do typed CID
+12 -326
slingshot/src/server.rs
··· 1 1 use crate::{ 2 - CachedRecord, ErrorResponseObject, Identity, Proxy, Repo, 2 + CachedRecord, ErrorResponseObject, Identity, Repo, 3 3 error::{RecordError, ServerError}, 4 - proxy::{extract_links, MatchedRef}, 5 - record::RawRecord, 6 4 }; 7 5 use atrium_api::types::string::{Cid, Did, Handle, Nsid, RecordKey}; 8 6 use foyer::HybridCache; 9 7 use links::at_uri::parse_at_uri as normalize_at_uri; 10 8 use serde::Serialize; 11 - use std::{path::PathBuf, str::FromStr, sync::Arc, time::Instant, collections::HashMap}; 12 - use tokio::sync::mpsc; 9 + use std::path::PathBuf; 10 + use std::str::FromStr; 11 + use std::sync::Arc; 12 + use std::time::Instant; 13 13 use tokio_util::sync::CancellationToken; 14 14 15 15 use poem::{ ··· 24 24 }; 25 25 use poem_openapi::{ 26 26 ApiResponse, ContactObject, ExternalDocumentObject, Object, OpenApi, OpenApiService, Tags, 27 - Union, 28 27 param::Query, payload::Json, types::Example, 29 28 }; 30 29 ··· 55 54 "zQ3shpq1g134o7HGDb86CtQFxnHqzx5pZWknrVX2Waum3fF6j".to_string() 56 55 } 57 56 58 - #[derive(Debug, Object)] 57 + #[derive(Object)] 59 58 #[oai(example = true)] 60 59 struct XrpcErrorResponseObject { 61 60 /// Should correspond an error `name` in the lexicon errors array ··· 88 87 89 88 fn bad_request_handler_resolve_mini(err: poem::Error) -> ResolveMiniIDResponse { 90 89 ResolveMiniIDResponse::BadRequest(Json(XrpcErrorResponseObject { 91 - error: "InvalidRequest".to_string(), 92 - message: format!("Bad request, here's some info that maybe should not be exposed: {err}"), 93 - })) 94 - } 95 - 96 - fn bad_request_handler_proxy_query(err: poem::Error) -> ProxyHydrateResponse { 97 - ProxyHydrateResponse::BadRequest(Json(XrpcErrorResponseObject { 98 90 error: "InvalidRequest".to_string(), 99 91 message: format!("Bad request, here's some info that maybe should not be exposed: {err}"), 100 92 })) ··· 199 191 } 200 192 201 193 #[derive(Object)] 202 - struct ProxyHydrationError { 203 - reason: String, 204 - } 205 - 206 - #[derive(Object)] 207 - struct ProxyHydrationPending { 208 - url: String, 209 - } 210 - 211 - #[derive(Object)] 212 - struct ProxyHydrationRecordFound { 213 - record: serde_json::Value, 214 - } 215 - 216 - #[derive(Object)] 217 - struct ProxyHydrationIdentifierFound { 218 - mini_doc: MiniDocResponseObject, 219 - } 220 - 221 - #[derive(Object)] 222 - #[oai(rename_all = "camelCase")] 223 - struct ProxyHydrationBlobFound { 224 - /// cdn url 225 - link: String, 226 - mime_type: String, 227 - size: u64, 228 - } 229 - 230 - // todo: there's gotta be a supertrait that collects these? 231 - use poem_openapi::types::{Type, ToJSON, ParseFromJSON, IsObjectType}; 232 - 233 - #[derive(Union)] 234 - #[oai(discriminator_name = "status", rename_all = "camelCase")] 235 - enum Hydration<T: Send + Sync + Type + ToJSON + ParseFromJSON + IsObjectType> { 236 - Error(ProxyHydrationError), 237 - Pending(ProxyHydrationPending), 238 - Found(T), 239 - } 240 - 241 - #[derive(Object)] 242 - #[oai(example = true)] 243 - struct ProxyHydrateResponseObject { 244 - /// The original upstream response content 245 - output: serde_json::Value, 246 - /// Any hydrated records 247 - records: HashMap<String, Hydration<ProxyHydrationRecordFound>>, 248 - /// Any hydrated identifiers 249 - /// 250 - /// TODO: "identifiers" feels wrong as the name, probably "identities"? 251 - identifiers: HashMap<String, Hydration<ProxyHydrationIdentifierFound>>, 252 - /// Any hydrated blob CDN urls 253 - blobs: HashMap<String, Hydration<ProxyHydrationBlobFound>>, 254 - } 255 - impl Example for ProxyHydrateResponseObject { 256 - fn example() -> Self { 257 - Self { 258 - output: serde_json::json!({}), 259 - records: HashMap::from([ 260 - ("asdf".into(), Hydration::Pending(ProxyHydrationPending { url: "todo".into() })), 261 - ]), 262 - identifiers: HashMap::new(), 263 - blobs: HashMap::new(), 264 - } 265 - } 266 - } 267 - 268 - #[derive(ApiResponse)] 269 - #[oai(bad_request_handler = "bad_request_handler_proxy_query")] 270 - enum ProxyHydrateResponse { 271 - #[oai(status = 200)] 272 - Ok(Json<ProxyHydrateResponseObject>), 273 - #[oai(status = 400)] 274 - BadRequest(XrpcError) 275 - } 276 - 277 - #[derive(Object)] 278 - pub struct HydrationSource { 279 - /// Record Path syntax for locating fields 280 - pub path: String, 281 - /// What to expect at the path: 'strong-ref', 'at-uri', 'at-uri-parts', 'did', 'handle', or 'at-identifier'. 282 - /// 283 - /// - `strong-ref`: object in the shape of `com.atproto.repo.strongRef` with `uri` and `cid` keys. 284 - /// - `at-uri`: string, must have all segments present (identifier, collection, rkey) 285 - /// - `at-uri-parts`: object with keys (`repo` or `did`), `collection`, `rkey`, and optional `cid`. Other keys may be present and will be ignored. 286 - /// - `did`: string, `did` format 287 - /// - `handle`: string, `handle` format 288 - /// - `at-identifier`: string, `did` or `handle` format 289 - pub shape: String, 290 - } 291 - 292 - #[derive(Object)] 293 - #[oai(example = true)] 294 - struct ProxyQueryPayload { 295 - /// The NSID of the XRPC you wish to forward 296 - xrpc: String, 297 - /// The destination service the request will be forwarded to 298 - atproto_proxy: String, 299 - /// The `params` for the destination service XRPC endpoint 300 - /// 301 - /// Currently this will be passed along unchecked, but a future version of 302 - /// slingshot may attempt to do lexicon resolution to validate `params` 303 - /// based on the upstream service 304 - params: Option<serde_json::Value>, 305 - /// Paths within the response to look for at-uris that can be hydrated 306 - hydration_sources: Vec<HydrationSource>, 307 - // todo: deadline thing 308 - 309 - } 310 - impl Example for ProxyQueryPayload { 311 - fn example() -> Self { 312 - Self { 313 - xrpc: "app.bsky.feed.getFeedSkeleton".to_string(), 314 - atproto_proxy: "did:web:blue.mackuba.eu#bsky_fg".to_string(), 315 - params: Some(serde_json::json!({ 316 - "feed": "at://did:plc:oio4hkxaop4ao4wz2pp3f4cr/app.bsky.feed.generator/atproto", 317 - })), 318 - hydration_sources: vec![ 319 - HydrationSource { 320 - path: "feed[].post".to_string(), 321 - shape: "at-uri".to_string(), 322 - } 323 - ], 324 - } 325 - } 326 - } 327 - 328 - #[derive(Object)] 329 194 #[oai(example = true)] 330 195 struct FoundDidResponseObject { 331 196 /// the DID, bi-directionally verified if using Slingshot ··· 356 221 struct Xrpc { 357 222 cache: HybridCache<String, CachedRecord>, 358 223 identity: Identity, 359 - proxy: Arc<Proxy>, 360 224 repo: Arc<Repo>, 361 225 } 362 226 ··· 611 475 #[oai(example = "example_handle")] 612 476 Query(identifier): Query<String>, 613 477 ) -> ResolveMiniIDResponse { 614 - Self::resolve_mini_doc_impl(&identifier, self.identity.clone()).await 615 - } 616 - 617 - async fn resolve_mini_doc_impl(identifier: &str, identity: Identity) -> ResolveMiniIDResponse { 618 478 let invalid = |reason: &'static str| { 619 479 ResolveMiniIDResponse::BadRequest(xrpc_error("InvalidRequest", reason)) 620 480 }; 621 481 622 482 let mut unverified_handle = None; 623 - let did = match Did::new(identifier.to_string()) { 483 + let did = match Did::new(identifier.clone()) { 624 484 Ok(did) => did, 625 485 Err(_) => { 626 486 let Ok(alleged_handle) = Handle::new(identifier.to_lowercase()) else { 627 487 return invalid("Identifier was not a valid DID or handle"); 628 488 }; 629 489 630 - match identity.handle_to_did(alleged_handle.clone()).await { 490 + match self.identity.handle_to_did(alleged_handle.clone()).await { 631 491 Ok(res) => { 632 492 if let Some(did) = res { 633 493 // we did it joe ··· 645 505 } 646 506 } 647 507 }; 648 - let Ok(partial_doc) = identity.did_to_partial_mini_doc(&did).await else { 508 + let Ok(partial_doc) = self.identity.did_to_partial_mini_doc(&did).await else { 649 509 return invalid("Failed to get DID doc"); 650 510 }; 651 511 let Some(partial_doc) = partial_doc else { ··· 665 525 "handle.invalid".to_string() 666 526 } 667 527 } else { 668 - let Ok(handle_did) = identity 528 + let Ok(handle_did) = self 529 + .identity 669 530 .handle_to_did(partial_doc.unverified_handle.clone()) 670 531 .await 671 532 else { ··· 689 550 })) 690 551 } 691 552 692 - /// com.bad-example.proxy.hydrateQueryResponse 693 - /// 694 - /// > [!important] 695 - /// > Unstable! This endpoint is experimental and may change. 696 - /// 697 - /// Fetch + include records referenced from an upstream xrpc query response 698 - #[oai( 699 - path = "/com.bad-example.proxy.hydrateQueryResponse", 700 - method = "post", 701 - tag = "ApiTags::Custom" 702 - )] 703 - async fn proxy_hydrate_query( 704 - &self, 705 - Json(payload): Json<ProxyQueryPayload>, 706 - ) -> ProxyHydrateResponse { 707 - // TODO: the Accept request header, if present, gotta be json 708 - // TODO: find any Authorization header and verify it. TBD about `aud`. 709 - 710 - let params = if let Some(p) = payload.params { 711 - let serde_json::Value::Object(map) = p else { 712 - panic!("params have to be an object"); 713 - }; 714 - Some(map) 715 - } else { None }; 716 - 717 - match self.proxy.proxy( 718 - payload.xrpc, 719 - payload.atproto_proxy, 720 - params, 721 - ).await { 722 - Ok(skeleton) => { 723 - let links = match extract_links(payload.hydration_sources, &skeleton) { 724 - Ok(l) => l, 725 - Err(e) => { 726 - log::warn!("problem extracting: {e:?}"); 727 - return ProxyHydrateResponse::BadRequest(xrpc_error("oop", "sorry, error extracting")) 728 - } 729 - }; 730 - let mut records = HashMap::new(); 731 - let mut identifiers = HashMap::new(); 732 - let mut blobs = HashMap::new(); 733 - 734 - enum GetThing { 735 - Record(String, Hydration<ProxyHydrationRecordFound>), 736 - Identifier(String, Hydration<ProxyHydrationIdentifierFound>), 737 - Blob(String, Hydration<ProxyHydrationBlobFound>), 738 - } 739 - 740 - let (tx, mut rx) = mpsc::channel(1); 741 - 742 - for link in links { 743 - match link { 744 - MatchedRef::AtUri { uri, cid } => { 745 - if records.contains_key(&uri) { 746 - log::warn!("skipping duplicate record without checking cid"); 747 - continue; 748 - } 749 - let mut u = url::Url::parse("https://example.com").unwrap(); 750 - u.query_pairs_mut().append_pair("at_uri", &uri); // BLEH todo 751 - records.insert(uri.clone(), Hydration::Pending(ProxyHydrationPending { 752 - url: format!("/xrpc/blue.microcosm.repo.getRecordByUri?{}", u.query().unwrap()), // TODO better; with cid, etc. 753 - })); 754 - let tx = tx.clone(); 755 - let identity = self.identity.clone(); 756 - let repo = self.repo.clone(); 757 - tokio::task::spawn(async move { 758 - let rest = uri.strip_prefix("at://").unwrap(); 759 - let (identifier, rest) = rest.split_once('/').unwrap(); 760 - let (collection, rkey) = rest.split_once('/').unwrap(); 761 - 762 - let did = if identifier.starts_with("did:") { 763 - Did::new(identifier.to_string()).unwrap() 764 - } else { 765 - let handle = Handle::new(identifier.to_string()).unwrap(); 766 - identity.handle_to_did(handle).await.unwrap().unwrap() 767 - }; 768 - 769 - let res = match repo.get_record( 770 - &did, 771 - &Nsid::new(collection.to_string()).unwrap(), 772 - &RecordKey::new(rkey.to_string()).unwrap(), 773 - &cid.as_ref().map(|s| Cid::from_str(s).unwrap()), 774 - ).await { 775 - Ok(CachedRecord::Deleted) => 776 - Hydration::Error(ProxyHydrationError { 777 - reason: "record deleted".to_string(), 778 - }), 779 - Ok(CachedRecord::Found(RawRecord { cid: found_cid, record })) => { 780 - if let Some(c) = cid && found_cid.as_ref().to_string() != c { 781 - log::warn!("ignoring cid mismatch"); 782 - } 783 - let value = serde_json::from_str(&record).unwrap(); 784 - Hydration::Found(ProxyHydrationRecordFound { 785 - record: value, 786 - }) 787 - } 788 - Err(e) => { 789 - log::warn!("finally oop {e:?}"); 790 - Hydration::Error(ProxyHydrationError { 791 - reason: "failed to fetch record".to_string(), 792 - }) 793 - } 794 - }; 795 - tx.send(GetThing::Record(uri, res)).await 796 - }); 797 - } 798 - MatchedRef::Identifier(id) => { 799 - if identifiers.contains_key(&id) { 800 - continue; 801 - } 802 - let mut u = url::Url::parse("https://example.com").unwrap(); 803 - u.query_pairs_mut().append_pair("identifier", &id); 804 - identifiers.insert(id.clone(), Hydration::Pending(ProxyHydrationPending { 805 - url: format!("/xrpc/blue.microcosm.identity.resolveMiniDoc?{}", u.query().unwrap()), // gross 806 - })); 807 - let tx = tx.clone(); 808 - let identity = self.identity.clone(); 809 - tokio::task::spawn(async move { 810 - let res = match Self::resolve_mini_doc_impl(&id, identity).await { 811 - ResolveMiniIDResponse::Ok(Json(mini_doc)) => Hydration::Found(ProxyHydrationIdentifierFound { 812 - mini_doc 813 - }), 814 - ResolveMiniIDResponse::BadRequest(e) => { 815 - log::warn!("minidoc fail: {:?}", e.0); 816 - Hydration::Error(ProxyHydrationError { 817 - reason: "failed to resolve mini doc".to_string(), 818 - }) 819 - } 820 - }; 821 - tx.send(GetThing::Identifier(id, res)).await 822 - }); 823 - } 824 - MatchedRef::Blob { link, mime, size: _ } => { 825 - if blobs.contains_key(&link) { 826 - continue; 827 - } 828 - if mime != "image/jpeg" { 829 - Hydration::<ProxyHydrationBlobFound>::Error(ProxyHydrationError { 830 - reason: "only image/jpeg supported for now".to_string(), 831 - }); 832 - } 833 - todo!("oops we need to know the account too") 834 - } 835 - } 836 - } 837 - // so the channel can close when all are completed 838 - // (we shoudl be doing a timeout...) 839 - drop(tx); 840 - 841 - while let Some(hydration) = rx.recv().await { 842 - match hydration { 843 - GetThing::Record(uri, h) => { records.insert(uri, h); } 844 - GetThing::Identifier(uri, md) => { identifiers.insert(uri, md); } 845 - GetThing::Blob(cid, asdf) => { blobs.insert(cid, asdf); } 846 - }; 847 - } 848 - 849 - ProxyHydrateResponse::Ok(Json(ProxyHydrateResponseObject { 850 - output: skeleton, 851 - records, 852 - identifiers, 853 - blobs, 854 - })) 855 - } 856 - Err(e) => { 857 - log::warn!("oh no: {e:?}"); 858 - ProxyHydrateResponse::BadRequest(xrpc_error("oop", "sorry")) 859 - } 860 - } 861 - 862 - } 863 - 864 553 async fn get_record_impl( 865 554 &self, 866 555 repo: String, ··· 1059 748 cache: HybridCache<String, CachedRecord>, 1060 749 identity: Identity, 1061 750 repo: Repo, 1062 - proxy: Proxy, 1063 751 acme_domain: Option<String>, 1064 752 acme_contact: Option<String>, 1065 753 acme_cache_path: Option<PathBuf>, ··· 1068 756 bind: std::net::SocketAddr, 1069 757 ) -> Result<(), ServerError> { 1070 758 let repo = Arc::new(repo); 1071 - let proxy = Arc::new(proxy); 1072 759 let api_service = OpenApiService::new( 1073 760 Xrpc { 1074 761 cache, 1075 762 identity, 1076 - proxy, 1077 763 repo, 1078 764 }, 1079 765 "Slingshot", ··· 1137 823 .with( 1138 824 Cors::new() 1139 825 .allow_origin_regex("*") 1140 - .allow_methods([Method::GET, Method::POST]) 826 + .allow_methods([Method::GET]) 1141 827 .allow_credentials(false), 1142 828 ) 1143 829 .with(CatchPanic::new())