Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm

Compare changes

Choose any two refs to compare.

+861 -59
+3 -2
Cargo.lock
··· 803 803 804 804 [[package]] 805 805 name = "bytes" 806 - version = "1.11.1" 806 + version = "1.10.1" 807 807 source = "registry+https://github.com/rust-lang/crates.io-index" 808 - checksum = "1e748733b7cbc798e1434b6ac524f0c1ff2ab456fe201501e6497c8417a4fc33" 808 + checksum = "d71b6127be86fdcfddb610f7182ac57211d4b18a3e9c82eb2d17662f2227ad6a" 809 809 810 810 [[package]] 811 811 name = "byteview" ··· 5965 5965 "form_urlencoded", 5966 5966 "idna", 5967 5967 "percent-encoding", 5968 + "serde", 5968 5969 ] 5969 5970 5970 5971 [[package]]
+1 -7
constellation/src/bin/main.rs
··· 45 45 #[arg(short, long)] 46 46 #[clap(value_enum, default_value_t = StorageBackend::Memory)] 47 47 backend: StorageBackend, 48 - /// Serve a did:web document for this domain 49 - #[arg(long)] 50 - did_web_domain: Option<String>, 51 48 /// Initiate a database backup into this dir, if supported by the storage 52 49 #[arg(long)] 53 50 backup: Option<PathBuf>, ··· 106 103 MemStorage::new(), 107 104 fixture, 108 105 None, 109 - args.did_web_domain, 110 106 stream, 111 107 bind, 112 108 metrics_bind, ··· 142 138 rocks, 143 139 fixture, 144 140 args.data, 145 - args.did_web_domain, 146 141 stream, 147 142 bind, 148 143 metrics_bind, ··· 164 159 mut storage: impl LinkStorage, 165 160 fixture: Option<PathBuf>, 166 161 data_dir: Option<PathBuf>, 167 - did_web_domain: Option<String>, 168 162 stream: String, 169 163 bind: SocketAddr, 170 164 metrics_bind: SocketAddr, ··· 217 211 if collect_metrics { 218 212 install_metrics_server(metrics_bind)?; 219 213 } 220 - serve(readable, bind, did_web_domain, staying_alive).await 214 + serve(readable, bind, staying_alive).await 221 215 }) 222 216 .unwrap(); 223 217 stay_alive.drop_guard();
+7 -32
constellation/src/server/mod.rs
··· 3 3 extract::{Query, Request}, 4 4 http::{self, header}, 5 5 middleware::{self, Next}, 6 - response::{IntoResponse, Json, Response}, 6 + response::{IntoResponse, Response}, 7 7 routing::get, 8 8 Router, 9 9 }; ··· 37 37 http::StatusCode::INTERNAL_SERVER_ERROR 38 38 } 39 39 40 - pub async fn serve<S: LinkReader, A: ToSocketAddrs>( 41 - store: S, 42 - addr: A, 43 - did_web_domain: Option<String>, 44 - stay_alive: CancellationToken, 45 - ) -> anyhow::Result<()> { 46 - let mut app = Router::new(); 47 - 48 - if let Some(d) = did_web_domain { 49 - app = app.route( 50 - "/.well-known/did.json", 51 - get({ 52 - let domain = d.clone(); 53 - move || did_web(domain) 54 - }), 55 - ) 56 - } 57 - 58 - let app = app 40 + pub async fn serve<S, A>(store: S, addr: A, stay_alive: CancellationToken) -> anyhow::Result<()> 41 + where 42 + S: LinkReader, 43 + A: ToSocketAddrs, 44 + { 45 + let app = Router::new() 59 46 .route("/robots.txt", get(robots)) 60 47 .route( 61 48 "/", ··· 217 204 User-agent: * 218 205 Disallow: /links 219 206 Disallow: /links/ 220 - Disallow: /xrpc/ 221 207 " 222 - } 223 - 224 - async fn did_web(domain: String) -> impl IntoResponse { 225 - Json(serde_json::json!({ 226 - "id": format!("did:web:{domain}"), 227 - "service": [{ 228 - "id": "#constellation", 229 - "type": "ConstellationGraphService", 230 - "serviceEndpoint": format!("https://{domain}") 231 - }] 232 - })) 233 208 } 234 209 235 210 #[derive(Template, Serialize, Deserialize)]
+1 -1
slingshot/Cargo.toml
··· 28 28 tokio = { version = "1.47.0", features = ["full"] } 29 29 tokio-util = "0.7.15" 30 30 tracing-subscriber = { version = "0.3.19", features = ["env-filter"] } 31 - url = "2.5.4" 31 + url = { version = "2.5.4", features = ["serde"] }
+10
slingshot/src/error.rs
··· 91 91 #[error("upstream non-atproto bad request")] 92 92 UpstreamBadBadNotGoodRequest(reqwest::Error), 93 93 } 94 + 95 + #[derive(Debug, Error)] 96 + pub enum ProxyError { 97 + #[error("failed to parse path: {0}")] 98 + PathParseError(String), 99 + #[error(transparent)] 100 + UrlParseError(#[from] url::ParseError), 101 + #[error(transparent)] 102 + ReqwestError(#[from] reqwest::Error), 103 + }
+2
slingshot/src/lib.rs
··· 3 3 mod firehose_cache; 4 4 mod healthcheck; 5 5 mod identity; 6 + mod proxy; 6 7 mod record; 7 8 mod server; 8 9 ··· 10 11 pub use firehose_cache::firehose_cache; 11 12 pub use healthcheck::healthcheck; 12 13 pub use identity::{Identity, IdentityKey}; 14 + pub use proxy::Proxy; 13 15 pub use record::{CachedRecord, ErrorResponseObject, Repo}; 14 16 pub use server::serve;
+4 -3
slingshot/src/main.rs
··· 2 2 // use foyer::{Engine, DirectFsDeviceOptions, HybridCacheBuilder}; 3 3 use metrics_exporter_prometheus::PrometheusBuilder; 4 4 use slingshot::{ 5 - Identity, Repo, consume, error::MainTaskError, firehose_cache, healthcheck, serve, 5 + Identity, Proxy, Repo, consume, error::MainTaskError, firehose_cache, healthcheck, serve, 6 6 }; 7 7 use std::net::SocketAddr; 8 8 use std::path::PathBuf; ··· 143 143 ) 144 144 .await 145 145 .map_err(|e| format!("identity setup failed: {e:?}"))?; 146 - 147 - log::info!("identity service ready."); 148 146 let identity_refresher = identity.clone(); 149 147 let identity_shutdown = shutdown.clone(); 150 148 tasks.spawn(async move { 151 149 identity_refresher.run_refresher(identity_shutdown).await?; 152 150 Ok(()) 153 151 }); 152 + log::info!("identity service ready."); 154 153 155 154 let repo = Repo::new(identity.clone()); 155 + let proxy = Proxy::new(repo.clone()); 156 156 157 157 let identity_for_server = identity.clone(); 158 158 let server_shutdown = shutdown.clone(); ··· 163 163 server_cache_handle, 164 164 identity_for_server, 165 165 repo, 166 + proxy, 166 167 args.acme_domain, 167 168 args.acme_contact, 168 169 args.acme_cache_path,
+505
slingshot/src/proxy.rs
··· 1 + use serde::Deserialize; 2 + use url::Url; 3 + use std::{collections::HashMap, time::Duration}; 4 + use crate::{Repo, server::HydrationSource, error::ProxyError}; 5 + use reqwest::Client; 6 + use serde_json::{Map, Value}; 7 + 8 + pub enum ParamValue { 9 + String(Vec<String>), 10 + Int(Vec<i64>), 11 + Bool(Vec<bool>), 12 + } 13 + pub struct Params(HashMap<String, ParamValue>); 14 + 15 + impl TryFrom<Map<String, Value>> for Params { 16 + type Error = (); // TODO 17 + fn try_from(val: Map<String, Value>) -> Result<Self, Self::Error> { 18 + let mut out = HashMap::new(); 19 + for (k, v) in val { 20 + match v { 21 + Value::String(s) => out.insert(k, ParamValue::String(vec![s])), 22 + Value::Bool(b) => out.insert(k, ParamValue::Bool(vec![b])), 23 + Value::Number(n) => { 24 + let Some(i) = n.as_i64() else { 25 + return Err(()); 26 + }; 27 + out.insert(k, ParamValue::Int(vec![i])) 28 + } 29 + Value::Array(a) => { 30 + let Some(first) = a.first() else { 31 + continue; 32 + }; 33 + if first.is_string() { 34 + let mut vals = Vec::with_capacity(a.len()); 35 + for v in a { 36 + let Some(v) = v.as_str() else { 37 + return Err(()); 38 + }; 39 + vals.push(v.to_string()); 40 + } 41 + out.insert(k, ParamValue::String(vals)); 42 + } else if first.is_i64() { 43 + let mut vals = Vec::with_capacity(a.len()); 44 + for v in a { 45 + let Some(v) = v.as_i64() else { 46 + return Err(()); 47 + }; 48 + vals.push(v); 49 + } 50 + out.insert(k, ParamValue::Int(vals)); 51 + } else if first.is_boolean() { 52 + let mut vals = Vec::with_capacity(a.len()); 53 + for v in a { 54 + let Some(v) = v.as_bool() else { 55 + return Err(()); 56 + }; 57 + vals.push(v); 58 + } 59 + out.insert(k, ParamValue::Bool(vals)); 60 + } 61 + todo!(); 62 + } 63 + _ => return Err(()), 64 + }; 65 + } 66 + 67 + Ok(Self(out)) 68 + } 69 + } 70 + 71 + #[derive(Clone)] 72 + pub struct Proxy { 73 + repo: Repo, 74 + client: Client, 75 + } 76 + 77 + impl Proxy { 78 + pub fn new(repo: Repo) -> Self { 79 + let client = Client::builder() 80 + .user_agent(format!( 81 + "microcosm slingshot v{} (contact: @bad-example.com)", 82 + env!("CARGO_PKG_VERSION") 83 + )) 84 + .no_proxy() 85 + .timeout(Duration::from_secs(6)) 86 + .build() 87 + .unwrap(); 88 + Self { repo, client } 89 + } 90 + 91 + pub async fn proxy( 92 + &self, 93 + xrpc: String, 94 + service: String, 95 + params: Option<Map<String, Value>>, 96 + ) -> Result<Value, ProxyError> { 97 + 98 + // hackin it to start 99 + 100 + // 1. assume did-web (TODO) and get the did doc 101 + #[derive(Debug, Deserialize)] 102 + struct ServiceDoc { 103 + id: String, 104 + service: Vec<ServiceItem>, 105 + } 106 + #[derive(Debug, Deserialize)] 107 + struct ServiceItem { 108 + id: String, 109 + #[expect(unused)] 110 + r#type: String, 111 + #[serde(rename = "serviceEndpoint")] 112 + service_endpoint: Url, 113 + } 114 + let dw = service.strip_prefix("did:web:").expect("a did web"); 115 + let (dw, service_id) = dw.split_once("#").expect("whatever"); 116 + let mut dw_url = Url::parse(&format!("https://{dw}"))?; 117 + dw_url.set_path("/.well-known/did.json"); 118 + let doc: ServiceDoc = self.client 119 + .get(dw_url) 120 + .send() 121 + .await? 122 + .error_for_status()? 123 + .json() 124 + .await?; 125 + 126 + assert_eq!(doc.id, format!("did:web:{}", dw)); 127 + 128 + let mut upstream = None; 129 + for ServiceItem { id, service_endpoint, .. } in doc.service { 130 + let Some((_, id)) = id.split_once("#") else { continue; }; 131 + if id != service_id { continue; }; 132 + upstream = Some(service_endpoint); 133 + break; 134 + } 135 + 136 + // 2. proxy the request forward 137 + let mut upstream = upstream.expect("to find it"); 138 + upstream.set_path(&format!("/xrpc/{xrpc}")); // TODO: validate nsid 139 + 140 + if let Some(params) = params { 141 + let mut query = upstream.query_pairs_mut(); 142 + let Params(ps) = params.try_into().expect("valid params"); 143 + for (k, pvs) in ps { 144 + match pvs { 145 + ParamValue::String(s) => { 146 + for s in s { 147 + query.append_pair(&k, &s); 148 + } 149 + } 150 + ParamValue::Int(i) => { 151 + for i in i { 152 + query.append_pair(&k, &i.to_string()); 153 + } 154 + } 155 + ParamValue::Bool(b) => { 156 + for b in b { 157 + query.append_pair(&k, &b.to_string()); 158 + } 159 + } 160 + } 161 + } 162 + } 163 + 164 + // TODO: other headers to proxy 165 + Ok(self.client 166 + .get(upstream) 167 + .send() 168 + .await? 169 + .error_for_status()? 170 + .json() 171 + .await?) 172 + } 173 + } 174 + 175 + #[derive(Debug, PartialEq)] 176 + pub enum PathPart { 177 + Scalar(String), 178 + Vector(String, Option<String>), // key, $type 179 + } 180 + 181 + pub fn parse_record_path(input: &str) -> Result<Vec<PathPart>, String> { 182 + let mut out = Vec::new(); 183 + 184 + let mut key_acc = String::new(); 185 + let mut type_acc = String::new(); 186 + let mut in_bracket = false; 187 + let mut chars = input.chars().enumerate(); 188 + while let Some((i, c)) = chars.next() { 189 + match c { 190 + '[' if in_bracket => return Err(format!("nested opening bracket not allowed, at {i}")), 191 + '[' if key_acc.is_empty() => return Err(format!("missing key before opening bracket, at {i}")), 192 + '[' => in_bracket = true, 193 + ']' if in_bracket => { 194 + in_bracket = false; 195 + let key = std::mem::take(&mut key_acc); 196 + let r#type = std::mem::take(&mut type_acc); 197 + let t = if r#type.is_empty() { None } else { Some(r#type) }; 198 + out.push(PathPart::Vector(key, t)); 199 + // peek ahead because we need a dot after array if there's more and i don't want to add more loop state 200 + let Some((i, c)) = chars.next() else { 201 + break; 202 + }; 203 + if c != '.' { 204 + return Err(format!("expected dot after close bracket, found {c:?} at {i}")); 205 + } 206 + } 207 + ']' => return Err(format!("unexpected close bracket at {i}")), 208 + '.' if in_bracket => type_acc.push(c), 209 + '.' if key_acc.is_empty() => return Err(format!("missing key before next segment, at {i}")), 210 + '.' => { 211 + let key = std::mem::take(&mut key_acc); 212 + assert!(type_acc.is_empty()); 213 + out.push(PathPart::Scalar(key)); 214 + } 215 + _ if in_bracket => type_acc.push(c), 216 + _ => key_acc.push(c), 217 + } 218 + } 219 + if in_bracket { 220 + return Err("unclosed bracket".into()); 221 + } 222 + if !key_acc.is_empty() { 223 + out.push(PathPart::Scalar(key_acc)); 224 + } 225 + Ok(out) 226 + } 227 + 228 + #[derive(Debug, Clone, PartialEq)] 229 + pub enum RefShape { 230 + StrongRef, 231 + AtUri, 232 + AtUriParts, 233 + Did, 234 + Handle, 235 + AtIdentifier, 236 + Blob, 237 + // TODO: blob with type? 238 + } 239 + 240 + impl TryFrom<&str> for RefShape { 241 + type Error = String; 242 + fn try_from(s: &str) -> Result<Self, Self::Error> { 243 + match s { 244 + "strong-ref" => Ok(Self::StrongRef), 245 + "at-uri" => Ok(Self::AtUri), 246 + "at-uri-parts" => Ok(Self::AtUriParts), 247 + "did" => Ok(Self::Did), 248 + "handle" => Ok(Self::Handle), 249 + "at-identifier" => Ok(Self::AtIdentifier), 250 + "blob" => Ok(Self::Blob), 251 + _ => Err(format!("unknown shape: {s}")), 252 + } 253 + } 254 + } 255 + 256 + #[derive(Debug, PartialEq)] 257 + pub enum MatchedRef { 258 + AtUri { 259 + uri: String, 260 + cid: Option<String>, 261 + }, 262 + Identifier(String), 263 + Blob { 264 + link: String, 265 + mime: String, 266 + size: u64, 267 + } 268 + } 269 + 270 + pub fn match_shape(shape: &RefShape, val: &Value) -> Option<MatchedRef> { 271 + // TODO: actually validate at-uri format 272 + // TODO: actually validate everything else also 273 + // TODO: should this function normalize identifiers to DIDs probably? 274 + // or just return at-uri parts so the caller can resolve and reassemble 275 + match shape { 276 + RefShape::StrongRef => { 277 + let o = val.as_object()?; 278 + let uri = o.get("uri")?.as_str()?.to_string(); 279 + let cid = o.get("cid")?.as_str()?.to_string(); 280 + Some(MatchedRef::AtUri { uri, cid: Some(cid) }) 281 + } 282 + RefShape::AtUri => { 283 + let uri = val.as_str()?.to_string(); 284 + Some(MatchedRef::AtUri { uri, cid: None }) 285 + } 286 + RefShape::AtUriParts => { 287 + let o = val.as_object()?; 288 + let identifier = o.get("repo").or(o.get("did"))?.as_str()?.to_string(); 289 + let collection = o.get("collection")?.as_str()?.to_string(); 290 + let rkey = o.get("rkey")?.as_str()?.to_string(); 291 + let uri = format!("at://{identifier}/{collection}/{rkey}"); 292 + let cid = o.get("cid").and_then(|v| v.as_str()).map(str::to_string); 293 + Some(MatchedRef::AtUri { uri, cid }) 294 + } 295 + RefShape::Did => { 296 + let id = val.as_str()?; 297 + if !id.starts_with("did:") { 298 + return None; 299 + } 300 + Some(MatchedRef::Identifier(id.to_string())) 301 + } 302 + RefShape::Handle => { 303 + let id = val.as_str()?; 304 + if id.contains(':') { 305 + return None; 306 + } 307 + Some(MatchedRef::Identifier(id.to_string())) 308 + } 309 + RefShape::AtIdentifier => { 310 + Some(MatchedRef::Identifier(val.as_str()?.to_string())) 311 + } 312 + RefShape::Blob => { 313 + let o = val.as_object()?; 314 + if o.get("$type")? != "blob" { 315 + return None; 316 + } 317 + let link = o.get("ref")?.as_object()?.get("$link")?.as_str()?.to_string(); 318 + let mime = o.get("mimeType")?.as_str()?.to_string(); 319 + let size = o.get("size")?.as_u64()?; 320 + Some(MatchedRef::Blob { link, mime, size }) 321 + } 322 + } 323 + } 324 + 325 + // TODO: send back metadata about the matching 326 + pub fn extract_links( 327 + sources: Vec<HydrationSource>, 328 + skeleton: &Value, 329 + ) -> Result<Vec<MatchedRef>, String> { 330 + // collect early to catch errors from the client 331 + // (TODO maybe the handler should do this and pass in the processed stuff probably definitely yeah) 332 + let sources = sources 333 + .into_iter() 334 + .map(|HydrationSource { path, shape }| { 335 + let path_parts = parse_record_path(&path)?; 336 + let shape: RefShape = shape.as_str().try_into()?; 337 + Ok((path_parts, shape)) 338 + }) 339 + .collect::<Result<Vec<_>, String>>()?; 340 + 341 + // lazy first impl, just re-walk the skeleton as many times as needed 342 + // not deduplicating for now 343 + let mut out = Vec::new(); 344 + for (path_parts, shape) in sources { 345 + for val in PathWalker::new(&path_parts, skeleton) { 346 + if let Some(matched) = match_shape(&shape, val) { 347 + out.push(matched); 348 + } 349 + } 350 + } 351 + 352 + Ok(out) 353 + } 354 + 355 + struct PathWalker<'a> { 356 + todo: Vec<(&'a [PathPart], &'a Value)>, 357 + } 358 + impl<'a> PathWalker<'a> { 359 + fn new(path_parts: &'a [PathPart], skeleton: &'a Value) -> Self { 360 + Self { todo: vec![(path_parts, skeleton)] } 361 + } 362 + } 363 + impl<'a> Iterator for PathWalker<'a> { 364 + type Item = &'a Value; 365 + fn next(&mut self) -> Option<Self::Item> { 366 + loop { 367 + let (parts, val) = self.todo.pop()?; 368 + let Some((part, rest)) = parts.split_first() else { 369 + return Some(val); 370 + }; 371 + let Some(o) = val.as_object() else { 372 + continue; 373 + }; 374 + match part { 375 + PathPart::Scalar(k) => { 376 + let Some(v) = o.get(k) else { 377 + continue; 378 + }; 379 + self.todo.push((rest, v)); 380 + } 381 + PathPart::Vector(k, t) => { 382 + let Some(a) = o.get(k).and_then(|v| v.as_array()) else { 383 + continue; 384 + }; 385 + for v in a 386 + .iter() 387 + .rev() 388 + .filter(|c| { 389 + let Some(t) = t else { return true }; 390 + c 391 + .as_object() 392 + .and_then(|o| o.get("$type")) 393 + .and_then(|v| v.as_str()) 394 + .map(|s| s == t) 395 + .unwrap_or(false) 396 + }) 397 + { 398 + self.todo.push((rest, v)) 399 + } 400 + } 401 + } 402 + } 403 + } 404 + } 405 + 406 + 407 + #[cfg(test)] 408 + mod tests { 409 + use super::*; 410 + use serde_json::json; 411 + 412 + #[test] 413 + fn test_parse_record_path() -> Result<(), Box<dyn std::error::Error>> { 414 + let cases = [ 415 + ("", vec![]), 416 + ("subject", vec![PathPart::Scalar("subject".into())]), 417 + ("authorDid", vec![PathPart::Scalar("authorDid".into())]), 418 + ("subject.uri", vec![PathPart::Scalar("subject".into()), PathPart::Scalar("uri".into())]), 419 + ("members[]", vec![PathPart::Vector("members".into(), None)]), 420 + ("add[].key", vec![ 421 + PathPart::Vector("add".into(), None), 422 + PathPart::Scalar("key".into()), 423 + ]), 424 + ("a[b]", vec![PathPart::Vector("a".into(), Some("b".into()))]), 425 + ("a[b.c]", vec![PathPart::Vector("a".into(), Some("b.c".into()))]), 426 + ("facets[app.bsky.richtext.facet].features[app.bsky.richtext.facet#mention].did", vec![ 427 + PathPart::Vector("facets".into(), Some("app.bsky.richtext.facet".into())), 428 + PathPart::Vector("features".into(), Some("app.bsky.richtext.facet#mention".into())), 429 + PathPart::Scalar("did".into()), 430 + ]), 431 + ]; 432 + 433 + for (path, expected) in cases { 434 + let parsed = parse_record_path(path)?; 435 + assert_eq!(parsed, expected, "path: {path:?}"); 436 + } 437 + 438 + Ok(()) 439 + } 440 + 441 + #[test] 442 + fn test_match_shape() { 443 + let cases = [ 444 + ("strong-ref", json!(""), None), 445 + ("strong-ref", json!({}), None), 446 + ("strong-ref", json!({ "uri": "abc" }), None), 447 + ("strong-ref", json!({ "cid": "def" }), None), 448 + ( 449 + "strong-ref", 450 + json!({ "uri": "abc", "cid": "def" }), 451 + Some(MatchedRef::AtUri { uri: "abc".to_string(), cid: Some("def".to_string()) }), 452 + ), 453 + ("at-uri", json!({ "uri": "abc" }), None), 454 + ("at-uri", json!({ "uri": "abc", "cid": "def" }), None), 455 + ( 456 + "at-uri", 457 + json!("abc"), 458 + Some(MatchedRef::AtUri { uri: "abc".to_string(), cid: None }), 459 + ), 460 + ("at-uri-parts", json!("abc"), None), 461 + ("at-uri-parts", json!({}), None), 462 + ( 463 + "at-uri-parts", 464 + json!({"repo": "a", "collection": "b", "rkey": "c"}), 465 + Some(MatchedRef::AtUri { uri: "at://a/b/c".to_string(), cid: None }), 466 + ), 467 + ( 468 + "at-uri-parts", 469 + json!({"did": "a", "collection": "b", "rkey": "c"}), 470 + Some(MatchedRef::AtUri { uri: "at://a/b/c".to_string(), cid: None }), 471 + ), 472 + ( 473 + "at-uri-parts", 474 + // 'repo' takes precedence over 'did' 475 + json!({"did": "a", "repo": "z", "collection": "b", "rkey": "c"}), 476 + Some(MatchedRef::AtUri { uri: "at://z/b/c".to_string(), cid: None }), 477 + ), 478 + ( 479 + "at-uri-parts", 480 + json!({"repo": "a", "collection": "b", "rkey": "c", "cid": "def"}), 481 + Some(MatchedRef::AtUri { uri: "at://a/b/c".to_string(), cid: Some("def".to_string()) }), 482 + ), 483 + ( 484 + "at-uri-parts", 485 + json!({"repo": "a", "collection": "b", "rkey": "c", "cid": {}}), 486 + Some(MatchedRef::AtUri { uri: "at://a/b/c".to_string(), cid: None }), 487 + ), 488 + ("did", json!({}), None), 489 + ("did", json!(""), None), 490 + ("did", json!("bad-example.com"), None), 491 + ("did", json!("did:plc:xyz"), Some(MatchedRef::Identifier("did:plc:xyz".to_string()))), 492 + ("handle", json!({}), None), 493 + ("handle", json!("bad-example.com"), Some(MatchedRef::Identifier("bad-example.com".to_string()))), 494 + ("handle", json!("did:plc:xyz"), None), 495 + ("at-identifier", json!({}), None), 496 + ("at-identifier", json!("bad-example.com"), Some(MatchedRef::Identifier("bad-example.com".to_string()))), 497 + ("at-identifier", json!("did:plc:xyz"), Some(MatchedRef::Identifier("did:plc:xyz".to_string()))), 498 + ]; 499 + for (shape, val, expected) in cases { 500 + let s = shape.try_into().unwrap(); 501 + let matched = match_shape(&s, &val); 502 + assert_eq!(matched, expected, "shape: {shape:?}, val: {val:?}"); 503 + } 504 + } 505 + }
+2 -2
slingshot/src/record.rs
··· 11 11 12 12 #[derive(Debug, Serialize, Deserialize)] 13 13 pub struct RawRecord { 14 - cid: Cid, 15 - record: String, 14 + pub cid: Cid, 15 + pub record: String, 16 16 } 17 17 18 18 // TODO: should be able to do typed CID
+326 -12
slingshot/src/server.rs
··· 1 1 use crate::{ 2 - CachedRecord, ErrorResponseObject, Identity, Repo, 2 + CachedRecord, ErrorResponseObject, Identity, Proxy, Repo, 3 3 error::{RecordError, ServerError}, 4 + proxy::{extract_links, MatchedRef}, 5 + record::RawRecord, 4 6 }; 5 7 use atrium_api::types::string::{Cid, Did, Handle, Nsid, RecordKey}; 6 8 use foyer::HybridCache; 7 9 use links::at_uri::parse_at_uri as normalize_at_uri; 8 10 use serde::Serialize; 9 - use std::path::PathBuf; 10 - use std::str::FromStr; 11 - use std::sync::Arc; 12 - use std::time::Instant; 11 + use std::{path::PathBuf, str::FromStr, sync::Arc, time::Instant, collections::HashMap}; 12 + use tokio::sync::mpsc; 13 13 use tokio_util::sync::CancellationToken; 14 14 15 15 use poem::{ ··· 24 24 }; 25 25 use poem_openapi::{ 26 26 ApiResponse, ContactObject, ExternalDocumentObject, Object, OpenApi, OpenApiService, Tags, 27 + Union, 27 28 param::Query, payload::Json, types::Example, 28 29 }; 29 30 ··· 54 55 "zQ3shpq1g134o7HGDb86CtQFxnHqzx5pZWknrVX2Waum3fF6j".to_string() 55 56 } 56 57 57 - #[derive(Object)] 58 + #[derive(Debug, Object)] 58 59 #[oai(example = true)] 59 60 struct XrpcErrorResponseObject { 60 61 /// Should correspond an error `name` in the lexicon errors array ··· 87 88 88 89 fn bad_request_handler_resolve_mini(err: poem::Error) -> ResolveMiniIDResponse { 89 90 ResolveMiniIDResponse::BadRequest(Json(XrpcErrorResponseObject { 91 + error: "InvalidRequest".to_string(), 92 + message: format!("Bad request, here's some info that maybe should not be exposed: {err}"), 93 + })) 94 + } 95 + 96 + fn bad_request_handler_proxy_query(err: poem::Error) -> ProxyHydrateResponse { 97 + ProxyHydrateResponse::BadRequest(Json(XrpcErrorResponseObject { 90 98 error: "InvalidRequest".to_string(), 91 99 message: format!("Bad request, here's some info that maybe should not be exposed: {err}"), 92 100 })) ··· 191 199 } 192 200 193 201 #[derive(Object)] 202 + struct ProxyHydrationError { 203 + reason: String, 204 + } 205 + 206 + #[derive(Object)] 207 + struct ProxyHydrationPending { 208 + url: String, 209 + } 210 + 211 + #[derive(Object)] 212 + struct ProxyHydrationRecordFound { 213 + record: serde_json::Value, 214 + } 215 + 216 + #[derive(Object)] 217 + struct ProxyHydrationIdentifierFound { 218 + mini_doc: MiniDocResponseObject, 219 + } 220 + 221 + #[derive(Object)] 222 + #[oai(rename_all = "camelCase")] 223 + struct ProxyHydrationBlobFound { 224 + /// cdn url 225 + link: String, 226 + mime_type: String, 227 + size: u64, 228 + } 229 + 230 + // todo: there's gotta be a supertrait that collects these? 231 + use poem_openapi::types::{Type, ToJSON, ParseFromJSON, IsObjectType}; 232 + 233 + #[derive(Union)] 234 + #[oai(discriminator_name = "status", rename_all = "camelCase")] 235 + enum Hydration<T: Send + Sync + Type + ToJSON + ParseFromJSON + IsObjectType> { 236 + Error(ProxyHydrationError), 237 + Pending(ProxyHydrationPending), 238 + Found(T), 239 + } 240 + 241 + #[derive(Object)] 242 + #[oai(example = true)] 243 + struct ProxyHydrateResponseObject { 244 + /// The original upstream response content 245 + output: serde_json::Value, 246 + /// Any hydrated records 247 + records: HashMap<String, Hydration<ProxyHydrationRecordFound>>, 248 + /// Any hydrated identifiers 249 + /// 250 + /// TODO: "identifiers" feels wrong as the name, probably "identities"? 251 + identifiers: HashMap<String, Hydration<ProxyHydrationIdentifierFound>>, 252 + /// Any hydrated blob CDN urls 253 + blobs: HashMap<String, Hydration<ProxyHydrationBlobFound>>, 254 + } 255 + impl Example for ProxyHydrateResponseObject { 256 + fn example() -> Self { 257 + Self { 258 + output: serde_json::json!({}), 259 + records: HashMap::from([ 260 + ("asdf".into(), Hydration::Pending(ProxyHydrationPending { url: "todo".into() })), 261 + ]), 262 + identifiers: HashMap::new(), 263 + blobs: HashMap::new(), 264 + } 265 + } 266 + } 267 + 268 + #[derive(ApiResponse)] 269 + #[oai(bad_request_handler = "bad_request_handler_proxy_query")] 270 + enum ProxyHydrateResponse { 271 + #[oai(status = 200)] 272 + Ok(Json<ProxyHydrateResponseObject>), 273 + #[oai(status = 400)] 274 + BadRequest(XrpcError) 275 + } 276 + 277 + #[derive(Object)] 278 + pub struct HydrationSource { 279 + /// Record Path syntax for locating fields 280 + pub path: String, 281 + /// What to expect at the path: 'strong-ref', 'at-uri', 'at-uri-parts', 'did', 'handle', or 'at-identifier'. 282 + /// 283 + /// - `strong-ref`: object in the shape of `com.atproto.repo.strongRef` with `uri` and `cid` keys. 284 + /// - `at-uri`: string, must have all segments present (identifier, collection, rkey) 285 + /// - `at-uri-parts`: object with keys (`repo` or `did`), `collection`, `rkey`, and optional `cid`. Other keys may be present and will be ignored. 286 + /// - `did`: string, `did` format 287 + /// - `handle`: string, `handle` format 288 + /// - `at-identifier`: string, `did` or `handle` format 289 + pub shape: String, 290 + } 291 + 292 + #[derive(Object)] 293 + #[oai(example = true)] 294 + struct ProxyQueryPayload { 295 + /// The NSID of the XRPC you wish to forward 296 + xrpc: String, 297 + /// The destination service the request will be forwarded to 298 + atproto_proxy: String, 299 + /// The `params` for the destination service XRPC endpoint 300 + /// 301 + /// Currently this will be passed along unchecked, but a future version of 302 + /// slingshot may attempt to do lexicon resolution to validate `params` 303 + /// based on the upstream service 304 + params: Option<serde_json::Value>, 305 + /// Paths within the response to look for at-uris that can be hydrated 306 + hydration_sources: Vec<HydrationSource>, 307 + // todo: deadline thing 308 + 309 + } 310 + impl Example for ProxyQueryPayload { 311 + fn example() -> Self { 312 + Self { 313 + xrpc: "app.bsky.feed.getFeedSkeleton".to_string(), 314 + atproto_proxy: "did:web:blue.mackuba.eu#bsky_fg".to_string(), 315 + params: Some(serde_json::json!({ 316 + "feed": "at://did:plc:oio4hkxaop4ao4wz2pp3f4cr/app.bsky.feed.generator/atproto", 317 + })), 318 + hydration_sources: vec![ 319 + HydrationSource { 320 + path: "feed[].post".to_string(), 321 + shape: "at-uri".to_string(), 322 + } 323 + ], 324 + } 325 + } 326 + } 327 + 328 + #[derive(Object)] 194 329 #[oai(example = true)] 195 330 struct FoundDidResponseObject { 196 331 /// the DID, bi-directionally verified if using Slingshot ··· 221 356 struct Xrpc { 222 357 cache: HybridCache<String, CachedRecord>, 223 358 identity: Identity, 359 + proxy: Arc<Proxy>, 224 360 repo: Arc<Repo>, 225 361 } 226 362 ··· 475 611 #[oai(example = "example_handle")] 476 612 Query(identifier): Query<String>, 477 613 ) -> ResolveMiniIDResponse { 614 + Self::resolve_mini_doc_impl(&identifier, self.identity.clone()).await 615 + } 616 + 617 + async fn resolve_mini_doc_impl(identifier: &str, identity: Identity) -> ResolveMiniIDResponse { 478 618 let invalid = |reason: &'static str| { 479 619 ResolveMiniIDResponse::BadRequest(xrpc_error("InvalidRequest", reason)) 480 620 }; 481 621 482 622 let mut unverified_handle = None; 483 - let did = match Did::new(identifier.clone()) { 623 + let did = match Did::new(identifier.to_string()) { 484 624 Ok(did) => did, 485 625 Err(_) => { 486 626 let Ok(alleged_handle) = Handle::new(identifier.to_lowercase()) else { 487 627 return invalid("Identifier was not a valid DID or handle"); 488 628 }; 489 629 490 - match self.identity.handle_to_did(alleged_handle.clone()).await { 630 + match identity.handle_to_did(alleged_handle.clone()).await { 491 631 Ok(res) => { 492 632 if let Some(did) = res { 493 633 // we did it joe ··· 505 645 } 506 646 } 507 647 }; 508 - let Ok(partial_doc) = self.identity.did_to_partial_mini_doc(&did).await else { 648 + let Ok(partial_doc) = identity.did_to_partial_mini_doc(&did).await else { 509 649 return invalid("Failed to get DID doc"); 510 650 }; 511 651 let Some(partial_doc) = partial_doc else { ··· 525 665 "handle.invalid".to_string() 526 666 } 527 667 } else { 528 - let Ok(handle_did) = self 529 - .identity 668 + let Ok(handle_did) = identity 530 669 .handle_to_did(partial_doc.unverified_handle.clone()) 531 670 .await 532 671 else { ··· 550 689 })) 551 690 } 552 691 692 + /// com.bad-example.proxy.hydrateQueryResponse 693 + /// 694 + /// > [!important] 695 + /// > Unstable! This endpoint is experimental and may change. 696 + /// 697 + /// Fetch + include records referenced from an upstream xrpc query response 698 + #[oai( 699 + path = "/com.bad-example.proxy.hydrateQueryResponse", 700 + method = "post", 701 + tag = "ApiTags::Custom" 702 + )] 703 + async fn proxy_hydrate_query( 704 + &self, 705 + Json(payload): Json<ProxyQueryPayload>, 706 + ) -> ProxyHydrateResponse { 707 + // TODO: the Accept request header, if present, gotta be json 708 + // TODO: find any Authorization header and verify it. TBD about `aud`. 709 + 710 + let params = if let Some(p) = payload.params { 711 + let serde_json::Value::Object(map) = p else { 712 + panic!("params have to be an object"); 713 + }; 714 + Some(map) 715 + } else { None }; 716 + 717 + match self.proxy.proxy( 718 + payload.xrpc, 719 + payload.atproto_proxy, 720 + params, 721 + ).await { 722 + Ok(skeleton) => { 723 + let links = match extract_links(payload.hydration_sources, &skeleton) { 724 + Ok(l) => l, 725 + Err(e) => { 726 + log::warn!("problem extracting: {e:?}"); 727 + return ProxyHydrateResponse::BadRequest(xrpc_error("oop", "sorry, error extracting")) 728 + } 729 + }; 730 + let mut records = HashMap::new(); 731 + let mut identifiers = HashMap::new(); 732 + let mut blobs = HashMap::new(); 733 + 734 + enum GetThing { 735 + Record(String, Hydration<ProxyHydrationRecordFound>), 736 + Identifier(String, Hydration<ProxyHydrationIdentifierFound>), 737 + Blob(String, Hydration<ProxyHydrationBlobFound>), 738 + } 739 + 740 + let (tx, mut rx) = mpsc::channel(1); 741 + 742 + for link in links { 743 + match link { 744 + MatchedRef::AtUri { uri, cid } => { 745 + if records.contains_key(&uri) { 746 + log::warn!("skipping duplicate record without checking cid"); 747 + continue; 748 + } 749 + let mut u = url::Url::parse("https://example.com").unwrap(); 750 + u.query_pairs_mut().append_pair("at_uri", &uri); // BLEH todo 751 + records.insert(uri.clone(), Hydration::Pending(ProxyHydrationPending { 752 + url: format!("/xrpc/blue.microcosm.repo.getRecordByUri?{}", u.query().unwrap()), // TODO better; with cid, etc. 753 + })); 754 + let tx = tx.clone(); 755 + let identity = self.identity.clone(); 756 + let repo = self.repo.clone(); 757 + tokio::task::spawn(async move { 758 + let rest = uri.strip_prefix("at://").unwrap(); 759 + let (identifier, rest) = rest.split_once('/').unwrap(); 760 + let (collection, rkey) = rest.split_once('/').unwrap(); 761 + 762 + let did = if identifier.starts_with("did:") { 763 + Did::new(identifier.to_string()).unwrap() 764 + } else { 765 + let handle = Handle::new(identifier.to_string()).unwrap(); 766 + identity.handle_to_did(handle).await.unwrap().unwrap() 767 + }; 768 + 769 + let res = match repo.get_record( 770 + &did, 771 + &Nsid::new(collection.to_string()).unwrap(), 772 + &RecordKey::new(rkey.to_string()).unwrap(), 773 + &cid.as_ref().map(|s| Cid::from_str(s).unwrap()), 774 + ).await { 775 + Ok(CachedRecord::Deleted) => 776 + Hydration::Error(ProxyHydrationError { 777 + reason: "record deleted".to_string(), 778 + }), 779 + Ok(CachedRecord::Found(RawRecord { cid: found_cid, record })) => { 780 + if let Some(c) = cid && found_cid.as_ref().to_string() != c { 781 + log::warn!("ignoring cid mismatch"); 782 + } 783 + let value = serde_json::from_str(&record).unwrap(); 784 + Hydration::Found(ProxyHydrationRecordFound { 785 + record: value, 786 + }) 787 + } 788 + Err(e) => { 789 + log::warn!("finally oop {e:?}"); 790 + Hydration::Error(ProxyHydrationError { 791 + reason: "failed to fetch record".to_string(), 792 + }) 793 + } 794 + }; 795 + tx.send(GetThing::Record(uri, res)).await 796 + }); 797 + } 798 + MatchedRef::Identifier(id) => { 799 + if identifiers.contains_key(&id) { 800 + continue; 801 + } 802 + let mut u = url::Url::parse("https://example.com").unwrap(); 803 + u.query_pairs_mut().append_pair("identifier", &id); 804 + identifiers.insert(id.clone(), Hydration::Pending(ProxyHydrationPending { 805 + url: format!("/xrpc/blue.microcosm.identity.resolveMiniDoc?{}", u.query().unwrap()), // gross 806 + })); 807 + let tx = tx.clone(); 808 + let identity = self.identity.clone(); 809 + tokio::task::spawn(async move { 810 + let res = match Self::resolve_mini_doc_impl(&id, identity).await { 811 + ResolveMiniIDResponse::Ok(Json(mini_doc)) => Hydration::Found(ProxyHydrationIdentifierFound { 812 + mini_doc 813 + }), 814 + ResolveMiniIDResponse::BadRequest(e) => { 815 + log::warn!("minidoc fail: {:?}", e.0); 816 + Hydration::Error(ProxyHydrationError { 817 + reason: "failed to resolve mini doc".to_string(), 818 + }) 819 + } 820 + }; 821 + tx.send(GetThing::Identifier(id, res)).await 822 + }); 823 + } 824 + MatchedRef::Blob { link, mime, size: _ } => { 825 + if blobs.contains_key(&link) { 826 + continue; 827 + } 828 + if mime != "image/jpeg" { 829 + Hydration::<ProxyHydrationBlobFound>::Error(ProxyHydrationError { 830 + reason: "only image/jpeg supported for now".to_string(), 831 + }); 832 + } 833 + todo!("oops we need to know the account too") 834 + } 835 + } 836 + } 837 + // so the channel can close when all are completed 838 + // (we shoudl be doing a timeout...) 839 + drop(tx); 840 + 841 + while let Some(hydration) = rx.recv().await { 842 + match hydration { 843 + GetThing::Record(uri, h) => { records.insert(uri, h); } 844 + GetThing::Identifier(uri, md) => { identifiers.insert(uri, md); } 845 + GetThing::Blob(cid, asdf) => { blobs.insert(cid, asdf); } 846 + }; 847 + } 848 + 849 + ProxyHydrateResponse::Ok(Json(ProxyHydrateResponseObject { 850 + output: skeleton, 851 + records, 852 + identifiers, 853 + blobs, 854 + })) 855 + } 856 + Err(e) => { 857 + log::warn!("oh no: {e:?}"); 858 + ProxyHydrateResponse::BadRequest(xrpc_error("oop", "sorry")) 859 + } 860 + } 861 + 862 + } 863 + 553 864 async fn get_record_impl( 554 865 &self, 555 866 repo: String, ··· 748 1059 cache: HybridCache<String, CachedRecord>, 749 1060 identity: Identity, 750 1061 repo: Repo, 1062 + proxy: Proxy, 751 1063 acme_domain: Option<String>, 752 1064 acme_contact: Option<String>, 753 1065 acme_cache_path: Option<PathBuf>, ··· 756 1068 bind: std::net::SocketAddr, 757 1069 ) -> Result<(), ServerError> { 758 1070 let repo = Arc::new(repo); 1071 + let proxy = Arc::new(proxy); 759 1072 let api_service = OpenApiService::new( 760 1073 Xrpc { 761 1074 cache, 762 1075 identity, 1076 + proxy, 763 1077 repo, 764 1078 }, 765 1079 "Slingshot", ··· 823 1137 .with( 824 1138 Cors::new() 825 1139 .allow_origin_regex("*") 826 - .allow_methods([Method::GET]) 1140 + .allow_methods([Method::GET, Method::POST]) 827 1141 .allow_credentials(false), 828 1142 ) 829 1143 .with(CatchPanic::new())