···1+use serde::Deserialize;
2+use url::Url;
3+use std::{collections::HashMap, time::Duration};
4+use crate::{Repo, server::HydrationSource, error::ProxyError};
5+use reqwest::Client;
6+use serde_json::{Map, Value};
7+8+pub enum ParamValue {
9+ String(Vec<String>),
10+ Int(Vec<i64>),
11+ Bool(Vec<bool>),
12+}
13+pub struct Params(HashMap<String, ParamValue>);
14+15+impl TryFrom<Map<String, Value>> for Params {
16+ type Error = (); // TODO
17+ fn try_from(val: Map<String, Value>) -> Result<Self, Self::Error> {
18+ let mut out = HashMap::new();
19+ for (k, v) in val {
20+ match v {
21+ Value::String(s) => out.insert(k, ParamValue::String(vec![s])),
22+ Value::Bool(b) => out.insert(k, ParamValue::Bool(vec![b])),
23+ Value::Number(n) => {
24+ let Some(i) = n.as_i64() else {
25+ return Err(());
26+ };
27+ out.insert(k, ParamValue::Int(vec![i]))
28+ }
29+ Value::Array(a) => {
30+ let Some(first) = a.first() else {
31+ continue;
32+ };
33+ if first.is_string() {
34+ let mut vals = Vec::with_capacity(a.len());
35+ for v in a {
36+ let Some(v) = v.as_str() else {
37+ return Err(());
38+ };
39+ vals.push(v.to_string());
40+ }
41+ out.insert(k, ParamValue::String(vals));
42+ } else if first.is_i64() {
43+ let mut vals = Vec::with_capacity(a.len());
44+ for v in a {
45+ let Some(v) = v.as_i64() else {
46+ return Err(());
47+ };
48+ vals.push(v);
49+ }
50+ out.insert(k, ParamValue::Int(vals));
51+ } else if first.is_boolean() {
52+ let mut vals = Vec::with_capacity(a.len());
53+ for v in a {
54+ let Some(v) = v.as_bool() else {
55+ return Err(());
56+ };
57+ vals.push(v);
58+ }
59+ out.insert(k, ParamValue::Bool(vals));
60+ }
61+ todo!();
62+ }
63+ _ => return Err(()),
64+ };
65+ }
66+67+ Ok(Self(out))
68+ }
69+}
70+71+#[derive(Clone)]
72+pub struct Proxy {
73+ repo: Repo,
74+ client: Client,
75+}
76+77+impl Proxy {
78+ pub fn new(repo: Repo) -> Self {
79+ let client = Client::builder()
80+ .user_agent(format!(
81+ "microcosm slingshot v{} (contact: @bad-example.com)",
82+ env!("CARGO_PKG_VERSION")
83+ ))
84+ .no_proxy()
85+ .timeout(Duration::from_secs(6))
86+ .build()
87+ .unwrap();
88+ Self { repo, client }
89+ }
90+91+ pub async fn proxy(
92+ &self,
93+ xrpc: String,
94+ service: String,
95+ params: Option<Map<String, Value>>,
96+ ) -> Result<Value, ProxyError> {
97+98+ // hackin it to start
99+100+ // 1. assume did-web (TODO) and get the did doc
101+ #[derive(Debug, Deserialize)]
102+ struct ServiceDoc {
103+ id: String,
104+ service: Vec<ServiceItem>,
105+ }
106+ #[derive(Debug, Deserialize)]
107+ struct ServiceItem {
108+ id: String,
109+ #[expect(unused)]
110+ r#type: String,
111+ #[serde(rename = "serviceEndpoint")]
112+ service_endpoint: Url,
113+ }
114+ let dw = service.strip_prefix("did:web:").expect("a did web");
115+ let (dw, service_id) = dw.split_once("#").expect("whatever");
116+ let mut dw_url = Url::parse(&format!("https://{dw}"))?;
117+ dw_url.set_path("/.well-known/did.json");
118+ let doc: ServiceDoc = self.client
119+ .get(dw_url)
120+ .send()
121+ .await?
122+ .error_for_status()?
123+ .json()
124+ .await?;
125+126+ assert_eq!(doc.id, format!("did:web:{}", dw));
127+128+ let mut upstream = None;
129+ for ServiceItem { id, service_endpoint, .. } in doc.service {
130+ let Some((_, id)) = id.split_once("#") else { continue; };
131+ if id != service_id { continue; };
132+ upstream = Some(service_endpoint);
133+ break;
134+ }
135+136+ // 2. proxy the request forward
137+ let mut upstream = upstream.expect("to find it");
138+ upstream.set_path(&format!("/xrpc/{xrpc}")); // TODO: validate nsid
139+140+ if let Some(params) = params {
141+ let mut query = upstream.query_pairs_mut();
142+ let Params(ps) = params.try_into().expect("valid params");
143+ for (k, pvs) in ps {
144+ match pvs {
145+ ParamValue::String(s) => {
146+ for s in s {
147+ query.append_pair(&k, &s);
148+ }
149+ }
150+ ParamValue::Int(i) => {
151+ for i in i {
152+ query.append_pair(&k, &i.to_string());
153+ }
154+ }
155+ ParamValue::Bool(b) => {
156+ for b in b {
157+ query.append_pair(&k, &b.to_string());
158+ }
159+ }
160+ }
161+ }
162+ }
163+164+ // TODO: other headers to proxy
165+ Ok(self.client
166+ .get(upstream)
167+ .send()
168+ .await?
169+ .error_for_status()?
170+ .json()
171+ .await?)
172+ }
173+}
174+175+#[derive(Debug, PartialEq)]
176+pub enum PathPart {
177+ Scalar(String),
178+ Vector(String, Option<String>), // key, $type
179+}
180+181+pub fn parse_record_path(input: &str) -> Result<Vec<PathPart>, String> {
182+ let mut out = Vec::new();
183+184+ let mut key_acc = String::new();
185+ let mut type_acc = String::new();
186+ let mut in_bracket = false;
187+ let mut chars = input.chars().enumerate();
188+ while let Some((i, c)) = chars.next() {
189+ match c {
190+ '[' if in_bracket => return Err(format!("nested opening bracket not allowed, at {i}")),
191+ '[' if key_acc.is_empty() => return Err(format!("missing key before opening bracket, at {i}")),
192+ '[' => in_bracket = true,
193+ ']' if in_bracket => {
194+ in_bracket = false;
195+ let key = std::mem::take(&mut key_acc);
196+ let r#type = std::mem::take(&mut type_acc);
197+ let t = if r#type.is_empty() { None } else { Some(r#type) };
198+ out.push(PathPart::Vector(key, t));
199+ // peek ahead because we need a dot after array if there's more and i don't want to add more loop state
200+ let Some((i, c)) = chars.next() else {
201+ break;
202+ };
203+ if c != '.' {
204+ return Err(format!("expected dot after close bracket, found {c:?} at {i}"));
205+ }
206+ }
207+ ']' => return Err(format!("unexpected close bracket at {i}")),
208+ '.' if in_bracket => type_acc.push(c),
209+ '.' if key_acc.is_empty() => return Err(format!("missing key before next segment, at {i}")),
210+ '.' => {
211+ let key = std::mem::take(&mut key_acc);
212+ assert!(type_acc.is_empty());
213+ out.push(PathPart::Scalar(key));
214+ }
215+ _ if in_bracket => type_acc.push(c),
216+ _ => key_acc.push(c),
217+ }
218+ }
219+ if in_bracket {
220+ return Err("unclosed bracket".into());
221+ }
222+ if !key_acc.is_empty() {
223+ out.push(PathPart::Scalar(key_acc));
224+ }
225+ Ok(out)
226+}
227+228+#[derive(Debug, Clone, PartialEq)]
229+pub enum RefShape {
230+ StrongRef,
231+ AtUri,
232+ AtUriParts,
233+ Did,
234+ Handle,
235+ AtIdentifier,
236+ Blob,
237+ // TODO: blob with type?
238+}
239+240+impl TryFrom<&str> for RefShape {
241+ type Error = String;
242+ fn try_from(s: &str) -> Result<Self, Self::Error> {
243+ match s {
244+ "strong-ref" => Ok(Self::StrongRef),
245+ "at-uri" => Ok(Self::AtUri),
246+ "at-uri-parts" => Ok(Self::AtUriParts),
247+ "did" => Ok(Self::Did),
248+ "handle" => Ok(Self::Handle),
249+ "at-identifier" => Ok(Self::AtIdentifier),
250+ "blob" => Ok(Self::Blob),
251+ _ => Err(format!("unknown shape: {s}")),
252+ }
253+ }
254+}
255+256+#[derive(Debug, PartialEq)]
257+pub enum MatchedRef {
258+ AtUri {
259+ uri: String,
260+ cid: Option<String>,
261+ },
262+ Identifier(String),
263+ Blob {
264+ link: String,
265+ mime: String,
266+ size: u64,
267+ }
268+}
269+270+pub fn match_shape(shape: &RefShape, val: &Value) -> Option<MatchedRef> {
271+ // TODO: actually validate at-uri format
272+ // TODO: actually validate everything else also
273+ // TODO: should this function normalize identifiers to DIDs probably?
274+ // or just return at-uri parts so the caller can resolve and reassemble
275+ match shape {
276+ RefShape::StrongRef => {
277+ let o = val.as_object()?;
278+ let uri = o.get("uri")?.as_str()?.to_string();
279+ let cid = o.get("cid")?.as_str()?.to_string();
280+ Some(MatchedRef::AtUri { uri, cid: Some(cid) })
281+ }
282+ RefShape::AtUri => {
283+ let uri = val.as_str()?.to_string();
284+ Some(MatchedRef::AtUri { uri, cid: None })
285+ }
286+ RefShape::AtUriParts => {
287+ let o = val.as_object()?;
288+ let identifier = o.get("repo").or(o.get("did"))?.as_str()?.to_string();
289+ let collection = o.get("collection")?.as_str()?.to_string();
290+ let rkey = o.get("rkey")?.as_str()?.to_string();
291+ let uri = format!("at://{identifier}/{collection}/{rkey}");
292+ let cid = o.get("cid").and_then(|v| v.as_str()).map(str::to_string);
293+ Some(MatchedRef::AtUri { uri, cid })
294+ }
295+ RefShape::Did => {
296+ let id = val.as_str()?;
297+ if !id.starts_with("did:") {
298+ return None;
299+ }
300+ Some(MatchedRef::Identifier(id.to_string()))
301+ }
302+ RefShape::Handle => {
303+ let id = val.as_str()?;
304+ if id.contains(':') {
305+ return None;
306+ }
307+ Some(MatchedRef::Identifier(id.to_string()))
308+ }
309+ RefShape::AtIdentifier => {
310+ Some(MatchedRef::Identifier(val.as_str()?.to_string()))
311+ }
312+ RefShape::Blob => {
313+ let o = val.as_object()?;
314+ if o.get("$type")? != "blob" {
315+ return None;
316+ }
317+ let link = o.get("ref")?.as_object()?.get("$link")?.as_str()?.to_string();
318+ let mime = o.get("mimeType")?.as_str()?.to_string();
319+ let size = o.get("size")?.as_u64()?;
320+ Some(MatchedRef::Blob { link, mime, size })
321+ }
322+ }
323+}
324+325+// TODO: send back metadata about the matching
326+pub fn extract_links(
327+ sources: Vec<HydrationSource>,
328+ skeleton: &Value,
329+) -> Result<Vec<MatchedRef>, String> {
330+ // collect early to catch errors from the client
331+ // (TODO maybe the handler should do this and pass in the processed stuff probably definitely yeah)
332+ let sources = sources
333+ .into_iter()
334+ .map(|HydrationSource { path, shape }| {
335+ let path_parts = parse_record_path(&path)?;
336+ let shape: RefShape = shape.as_str().try_into()?;
337+ Ok((path_parts, shape))
338+ })
339+ .collect::<Result<Vec<_>, String>>()?;
340+341+ // lazy first impl, just re-walk the skeleton as many times as needed
342+ // not deduplicating for now
343+ let mut out = Vec::new();
344+ for (path_parts, shape) in sources {
345+ for val in PathWalker::new(&path_parts, skeleton) {
346+ if let Some(matched) = match_shape(&shape, val) {
347+ out.push(matched);
348+ }
349+ }
350+ }
351+352+ Ok(out)
353+}
354+355+struct PathWalker<'a> {
356+ todo: Vec<(&'a [PathPart], &'a Value)>,
357+}
358+impl<'a> PathWalker<'a> {
359+ fn new(path_parts: &'a [PathPart], skeleton: &'a Value) -> Self {
360+ Self { todo: vec![(path_parts, skeleton)] }
361+ }
362+}
363+impl<'a> Iterator for PathWalker<'a> {
364+ type Item = &'a Value;
365+ fn next(&mut self) -> Option<Self::Item> {
366+ loop {
367+ let (parts, val) = self.todo.pop()?;
368+ let Some((part, rest)) = parts.split_first() else {
369+ return Some(val);
370+ };
371+ let Some(o) = val.as_object() else {
372+ continue;
373+ };
374+ match part {
375+ PathPart::Scalar(k) => {
376+ let Some(v) = o.get(k) else {
377+ continue;
378+ };
379+ self.todo.push((rest, v));
380+ }
381+ PathPart::Vector(k, t) => {
382+ let Some(a) = o.get(k).and_then(|v| v.as_array()) else {
383+ continue;
384+ };
385+ for v in a
386+ .iter()
387+ .rev()
388+ .filter(|c| {
389+ let Some(t) = t else { return true };
390+ c
391+ .as_object()
392+ .and_then(|o| o.get("$type"))
393+ .and_then(|v| v.as_str())
394+ .map(|s| s == t)
395+ .unwrap_or(false)
396+ })
397+ {
398+ self.todo.push((rest, v))
399+ }
400+ }
401+ }
402+ }
403+ }
404+}
405+406+407+#[cfg(test)]
408+mod tests {
409+ use super::*;
410+ use serde_json::json;
411+412+ #[test]
413+ fn test_parse_record_path() -> Result<(), Box<dyn std::error::Error>> {
414+ let cases = [
415+ ("", vec![]),
416+ ("subject", vec![PathPart::Scalar("subject".into())]),
417+ ("authorDid", vec![PathPart::Scalar("authorDid".into())]),
418+ ("subject.uri", vec![PathPart::Scalar("subject".into()), PathPart::Scalar("uri".into())]),
419+ ("members[]", vec![PathPart::Vector("members".into(), None)]),
420+ ("add[].key", vec![
421+ PathPart::Vector("add".into(), None),
422+ PathPart::Scalar("key".into()),
423+ ]),
424+ ("a[b]", vec![PathPart::Vector("a".into(), Some("b".into()))]),
425+ ("a[b.c]", vec![PathPart::Vector("a".into(), Some("b.c".into()))]),
426+ ("facets[app.bsky.richtext.facet].features[app.bsky.richtext.facet#mention].did", vec![
427+ PathPart::Vector("facets".into(), Some("app.bsky.richtext.facet".into())),
428+ PathPart::Vector("features".into(), Some("app.bsky.richtext.facet#mention".into())),
429+ PathPart::Scalar("did".into()),
430+ ]),
431+ ];
432+433+ for (path, expected) in cases {
434+ let parsed = parse_record_path(path)?;
435+ assert_eq!(parsed, expected, "path: {path:?}");
436+ }
437+438+ Ok(())
439+ }
440+441+ #[test]
442+ fn test_match_shape() {
443+ let cases = [
444+ ("strong-ref", json!(""), None),
445+ ("strong-ref", json!({}), None),
446+ ("strong-ref", json!({ "uri": "abc" }), None),
447+ ("strong-ref", json!({ "cid": "def" }), None),
448+ (
449+ "strong-ref",
450+ json!({ "uri": "abc", "cid": "def" }),
451+ Some(MatchedRef::AtUri { uri: "abc".to_string(), cid: Some("def".to_string()) }),
452+ ),
453+ ("at-uri", json!({ "uri": "abc" }), None),
454+ ("at-uri", json!({ "uri": "abc", "cid": "def" }), None),
455+ (
456+ "at-uri",
457+ json!("abc"),
458+ Some(MatchedRef::AtUri { uri: "abc".to_string(), cid: None }),
459+ ),
460+ ("at-uri-parts", json!("abc"), None),
461+ ("at-uri-parts", json!({}), None),
462+ (
463+ "at-uri-parts",
464+ json!({"repo": "a", "collection": "b", "rkey": "c"}),
465+ Some(MatchedRef::AtUri { uri: "at://a/b/c".to_string(), cid: None }),
466+ ),
467+ (
468+ "at-uri-parts",
469+ json!({"did": "a", "collection": "b", "rkey": "c"}),
470+ Some(MatchedRef::AtUri { uri: "at://a/b/c".to_string(), cid: None }),
471+ ),
472+ (
473+ "at-uri-parts",
474+ // 'repo' takes precedence over 'did'
475+ json!({"did": "a", "repo": "z", "collection": "b", "rkey": "c"}),
476+ Some(MatchedRef::AtUri { uri: "at://z/b/c".to_string(), cid: None }),
477+ ),
478+ (
479+ "at-uri-parts",
480+ json!({"repo": "a", "collection": "b", "rkey": "c", "cid": "def"}),
481+ Some(MatchedRef::AtUri { uri: "at://a/b/c".to_string(), cid: Some("def".to_string()) }),
482+ ),
483+ (
484+ "at-uri-parts",
485+ json!({"repo": "a", "collection": "b", "rkey": "c", "cid": {}}),
486+ Some(MatchedRef::AtUri { uri: "at://a/b/c".to_string(), cid: None }),
487+ ),
488+ ("did", json!({}), None),
489+ ("did", json!(""), None),
490+ ("did", json!("bad-example.com"), None),
491+ ("did", json!("did:plc:xyz"), Some(MatchedRef::Identifier("did:plc:xyz".to_string()))),
492+ ("handle", json!({}), None),
493+ ("handle", json!("bad-example.com"), Some(MatchedRef::Identifier("bad-example.com".to_string()))),
494+ ("handle", json!("did:plc:xyz"), None),
495+ ("at-identifier", json!({}), None),
496+ ("at-identifier", json!("bad-example.com"), Some(MatchedRef::Identifier("bad-example.com".to_string()))),
497+ ("at-identifier", json!("did:plc:xyz"), Some(MatchedRef::Identifier("did:plc:xyz".to_string()))),
498+ ];
499+ for (shape, val, expected) in cases {
500+ let s = shape.try_into().unwrap();
501+ let matched = match_shape(&s, &val);
502+ assert_eq!(matched, expected, "shape: {shape:?}, val: {val:?}");
503+ }
504+ }
505+}
+2-2
slingshot/src/record.rs
···1112#[derive(Debug, Serialize, Deserialize)]
13pub struct RawRecord {
14- cid: Cid,
15- record: String,
16}
1718// TODO: should be able to do typed CID
···1112#[derive(Debug, Serialize, Deserialize)]
13pub struct RawRecord {
14+ pub cid: Cid,
15+ pub record: String,
16}
1718// TODO: should be able to do typed CID
+326-12
slingshot/src/server.rs
···1use crate::{
2- CachedRecord, ErrorResponseObject, Identity, Repo,
3 error::{RecordError, ServerError},
004};
5use atrium_api::types::string::{Cid, Did, Handle, Nsid, RecordKey};
6use foyer::HybridCache;
7use links::at_uri::parse_at_uri as normalize_at_uri;
8use serde::Serialize;
9-use std::path::PathBuf;
10-use std::str::FromStr;
11-use std::sync::Arc;
12-use std::time::Instant;
13use tokio_util::sync::CancellationToken;
1415use poem::{
···24};
25use poem_openapi::{
26 ApiResponse, ContactObject, ExternalDocumentObject, Object, OpenApi, OpenApiService, Tags,
027 param::Query, payload::Json, types::Example,
28};
29···54 "zQ3shpq1g134o7HGDb86CtQFxnHqzx5pZWknrVX2Waum3fF6j".to_string()
55}
5657-#[derive(Object)]
58#[oai(example = true)]
59struct XrpcErrorResponseObject {
60 /// Should correspond an error `name` in the lexicon errors array
···8788fn bad_request_handler_resolve_mini(err: poem::Error) -> ResolveMiniIDResponse {
89 ResolveMiniIDResponse::BadRequest(Json(XrpcErrorResponseObject {
000000090 error: "InvalidRequest".to_string(),
91 message: format!("Bad request, here's some info that maybe should not be exposed: {err}"),
92 }))
···191}
192193#[derive(Object)]
0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000194#[oai(example = true)]
195struct FoundDidResponseObject {
196 /// the DID, bi-directionally verified if using Slingshot
···221struct Xrpc {
222 cache: HybridCache<String, CachedRecord>,
223 identity: Identity,
0224 repo: Arc<Repo>,
225}
226···475 #[oai(example = "example_handle")]
476 Query(identifier): Query<String>,
477 ) -> ResolveMiniIDResponse {
0000478 let invalid = |reason: &'static str| {
479 ResolveMiniIDResponse::BadRequest(xrpc_error("InvalidRequest", reason))
480 };
481482 let mut unverified_handle = None;
483- let did = match Did::new(identifier.clone()) {
484 Ok(did) => did,
485 Err(_) => {
486 let Ok(alleged_handle) = Handle::new(identifier.to_lowercase()) else {
487 return invalid("Identifier was not a valid DID or handle");
488 };
489490- match self.identity.handle_to_did(alleged_handle.clone()).await {
491 Ok(res) => {
492 if let Some(did) = res {
493 // we did it joe
···505 }
506 }
507 };
508- let Ok(partial_doc) = self.identity.did_to_partial_mini_doc(&did).await else {
509 return invalid("Failed to get DID doc");
510 };
511 let Some(partial_doc) = partial_doc else {
···525 "handle.invalid".to_string()
526 }
527 } else {
528- let Ok(handle_did) = self
529- .identity
530 .handle_to_did(partial_doc.unverified_handle.clone())
531 .await
532 else {
···550 }))
551 }
5520000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000553 async fn get_record_impl(
554 &self,
555 repo: String,
···748 cache: HybridCache<String, CachedRecord>,
749 identity: Identity,
750 repo: Repo,
0751 acme_domain: Option<String>,
752 acme_contact: Option<String>,
753 acme_cache_path: Option<PathBuf>,
···756 bind: std::net::SocketAddr,
757) -> Result<(), ServerError> {
758 let repo = Arc::new(repo);
0759 let api_service = OpenApiService::new(
760 Xrpc {
761 cache,
762 identity,
0763 repo,
764 },
765 "Slingshot",
···823 .with(
824 Cors::new()
825 .allow_origin_regex("*")
826- .allow_methods([Method::GET])
827 .allow_credentials(false),
828 )
829 .with(CatchPanic::new())
···1use crate::{
2+ CachedRecord, ErrorResponseObject, Identity, Proxy, Repo,
3 error::{RecordError, ServerError},
4+ proxy::{extract_links, MatchedRef},
5+ record::RawRecord,
6};
7use atrium_api::types::string::{Cid, Did, Handle, Nsid, RecordKey};
8use foyer::HybridCache;
9use links::at_uri::parse_at_uri as normalize_at_uri;
10use serde::Serialize;
11+use std::{path::PathBuf, str::FromStr, sync::Arc, time::Instant, collections::HashMap};
12+use tokio::sync::mpsc;
0013use tokio_util::sync::CancellationToken;
1415use poem::{
···24};
25use poem_openapi::{
26 ApiResponse, ContactObject, ExternalDocumentObject, Object, OpenApi, OpenApiService, Tags,
27+ Union,
28 param::Query, payload::Json, types::Example,
29};
30···55 "zQ3shpq1g134o7HGDb86CtQFxnHqzx5pZWknrVX2Waum3fF6j".to_string()
56}
5758+#[derive(Debug, Object)]
59#[oai(example = true)]
60struct XrpcErrorResponseObject {
61 /// Should correspond an error `name` in the lexicon errors array
···8889fn bad_request_handler_resolve_mini(err: poem::Error) -> ResolveMiniIDResponse {
90 ResolveMiniIDResponse::BadRequest(Json(XrpcErrorResponseObject {
91+ error: "InvalidRequest".to_string(),
92+ message: format!("Bad request, here's some info that maybe should not be exposed: {err}"),
93+ }))
94+}
95+96+fn bad_request_handler_proxy_query(err: poem::Error) -> ProxyHydrateResponse {
97+ ProxyHydrateResponse::BadRequest(Json(XrpcErrorResponseObject {
98 error: "InvalidRequest".to_string(),
99 message: format!("Bad request, here's some info that maybe should not be exposed: {err}"),
100 }))
···199}
200201#[derive(Object)]
202+struct ProxyHydrationError {
203+ reason: String,
204+}
205+206+#[derive(Object)]
207+struct ProxyHydrationPending {
208+ url: String,
209+}
210+211+#[derive(Object)]
212+struct ProxyHydrationRecordFound {
213+ record: serde_json::Value,
214+}
215+216+#[derive(Object)]
217+struct ProxyHydrationIdentifierFound {
218+ mini_doc: MiniDocResponseObject,
219+}
220+221+#[derive(Object)]
222+#[oai(rename_all = "camelCase")]
223+struct ProxyHydrationBlobFound {
224+ /// cdn url
225+ link: String,
226+ mime_type: String,
227+ size: u64,
228+}
229+230+// todo: there's gotta be a supertrait that collects these?
231+use poem_openapi::types::{Type, ToJSON, ParseFromJSON, IsObjectType};
232+233+#[derive(Union)]
234+#[oai(discriminator_name = "status", rename_all = "camelCase")]
235+enum Hydration<T: Send + Sync + Type + ToJSON + ParseFromJSON + IsObjectType> {
236+ Error(ProxyHydrationError),
237+ Pending(ProxyHydrationPending),
238+ Found(T),
239+}
240+241+#[derive(Object)]
242+#[oai(example = true)]
243+struct ProxyHydrateResponseObject {
244+ /// The original upstream response content
245+ output: serde_json::Value,
246+ /// Any hydrated records
247+ records: HashMap<String, Hydration<ProxyHydrationRecordFound>>,
248+ /// Any hydrated identifiers
249+ ///
250+ /// TODO: "identifiers" feels wrong as the name, probably "identities"?
251+ identifiers: HashMap<String, Hydration<ProxyHydrationIdentifierFound>>,
252+ /// Any hydrated blob CDN urls
253+ blobs: HashMap<String, Hydration<ProxyHydrationBlobFound>>,
254+}
255+impl Example for ProxyHydrateResponseObject {
256+ fn example() -> Self {
257+ Self {
258+ output: serde_json::json!({}),
259+ records: HashMap::from([
260+ ("asdf".into(), Hydration::Pending(ProxyHydrationPending { url: "todo".into() })),
261+ ]),
262+ identifiers: HashMap::new(),
263+ blobs: HashMap::new(),
264+ }
265+ }
266+}
267+268+#[derive(ApiResponse)]
269+#[oai(bad_request_handler = "bad_request_handler_proxy_query")]
270+enum ProxyHydrateResponse {
271+ #[oai(status = 200)]
272+ Ok(Json<ProxyHydrateResponseObject>),
273+ #[oai(status = 400)]
274+ BadRequest(XrpcError)
275+}
276+277+#[derive(Object)]
278+pub struct HydrationSource {
279+ /// Record Path syntax for locating fields
280+ pub path: String,
281+ /// What to expect at the path: 'strong-ref', 'at-uri', 'at-uri-parts', 'did', 'handle', or 'at-identifier'.
282+ ///
283+ /// - `strong-ref`: object in the shape of `com.atproto.repo.strongRef` with `uri` and `cid` keys.
284+ /// - `at-uri`: string, must have all segments present (identifier, collection, rkey)
285+ /// - `at-uri-parts`: object with keys (`repo` or `did`), `collection`, `rkey`, and optional `cid`. Other keys may be present and will be ignored.
286+ /// - `did`: string, `did` format
287+ /// - `handle`: string, `handle` format
288+ /// - `at-identifier`: string, `did` or `handle` format
289+ pub shape: String,
290+}
291+292+#[derive(Object)]
293+#[oai(example = true)]
294+struct ProxyQueryPayload {
295+ /// The NSID of the XRPC you wish to forward
296+ xrpc: String,
297+ /// The destination service the request will be forwarded to
298+ atproto_proxy: String,
299+ /// The `params` for the destination service XRPC endpoint
300+ ///
301+ /// Currently this will be passed along unchecked, but a future version of
302+ /// slingshot may attempt to do lexicon resolution to validate `params`
303+ /// based on the upstream service
304+ params: Option<serde_json::Value>,
305+ /// Paths within the response to look for at-uris that can be hydrated
306+ hydration_sources: Vec<HydrationSource>,
307+ // todo: deadline thing
308+309+}
310+impl Example for ProxyQueryPayload {
311+ fn example() -> Self {
312+ Self {
313+ xrpc: "app.bsky.feed.getFeedSkeleton".to_string(),
314+ atproto_proxy: "did:web:blue.mackuba.eu#bsky_fg".to_string(),
315+ params: Some(serde_json::json!({
316+ "feed": "at://did:plc:oio4hkxaop4ao4wz2pp3f4cr/app.bsky.feed.generator/atproto",
317+ })),
318+ hydration_sources: vec![
319+ HydrationSource {
320+ path: "feed[].post".to_string(),
321+ shape: "at-uri".to_string(),
322+ }
323+ ],
324+ }
325+ }
326+}
327+328+#[derive(Object)]
329#[oai(example = true)]
330struct FoundDidResponseObject {
331 /// the DID, bi-directionally verified if using Slingshot
···356struct Xrpc {
357 cache: HybridCache<String, CachedRecord>,
358 identity: Identity,
359+ proxy: Arc<Proxy>,
360 repo: Arc<Repo>,
361}
362···611 #[oai(example = "example_handle")]
612 Query(identifier): Query<String>,
613 ) -> ResolveMiniIDResponse {
614+ Self::resolve_mini_doc_impl(&identifier, self.identity.clone()).await
615+ }
616+617+ async fn resolve_mini_doc_impl(identifier: &str, identity: Identity) -> ResolveMiniIDResponse {
618 let invalid = |reason: &'static str| {
619 ResolveMiniIDResponse::BadRequest(xrpc_error("InvalidRequest", reason))
620 };
621622 let mut unverified_handle = None;
623+ let did = match Did::new(identifier.to_string()) {
624 Ok(did) => did,
625 Err(_) => {
626 let Ok(alleged_handle) = Handle::new(identifier.to_lowercase()) else {
627 return invalid("Identifier was not a valid DID or handle");
628 };
629630+ match identity.handle_to_did(alleged_handle.clone()).await {
631 Ok(res) => {
632 if let Some(did) = res {
633 // we did it joe
···645 }
646 }
647 };
648+ let Ok(partial_doc) = identity.did_to_partial_mini_doc(&did).await else {
649 return invalid("Failed to get DID doc");
650 };
651 let Some(partial_doc) = partial_doc else {
···665 "handle.invalid".to_string()
666 }
667 } else {
668+ let Ok(handle_did) = identity
0669 .handle_to_did(partial_doc.unverified_handle.clone())
670 .await
671 else {
···689 }))
690 }
691692+ /// com.bad-example.proxy.hydrateQueryResponse
693+ ///
694+ /// > [!important]
695+ /// > Unstable! This endpoint is experimental and may change.
696+ ///
697+ /// Fetch + include records referenced from an upstream xrpc query response
698+ #[oai(
699+ path = "/com.bad-example.proxy.hydrateQueryResponse",
700+ method = "post",
701+ tag = "ApiTags::Custom"
702+ )]
703+ async fn proxy_hydrate_query(
704+ &self,
705+ Json(payload): Json<ProxyQueryPayload>,
706+ ) -> ProxyHydrateResponse {
707+ // TODO: the Accept request header, if present, gotta be json
708+ // TODO: find any Authorization header and verify it. TBD about `aud`.
709+710+ let params = if let Some(p) = payload.params {
711+ let serde_json::Value::Object(map) = p else {
712+ panic!("params have to be an object");
713+ };
714+ Some(map)
715+ } else { None };
716+717+ match self.proxy.proxy(
718+ payload.xrpc,
719+ payload.atproto_proxy,
720+ params,
721+ ).await {
722+ Ok(skeleton) => {
723+ let links = match extract_links(payload.hydration_sources, &skeleton) {
724+ Ok(l) => l,
725+ Err(e) => {
726+ log::warn!("problem extracting: {e:?}");
727+ return ProxyHydrateResponse::BadRequest(xrpc_error("oop", "sorry, error extracting"))
728+ }
729+ };
730+ let mut records = HashMap::new();
731+ let mut identifiers = HashMap::new();
732+ let mut blobs = HashMap::new();
733+734+ enum GetThing {
735+ Record(String, Hydration<ProxyHydrationRecordFound>),
736+ Identifier(String, Hydration<ProxyHydrationIdentifierFound>),
737+ Blob(String, Hydration<ProxyHydrationBlobFound>),
738+ }
739+740+ let (tx, mut rx) = mpsc::channel(1);
741+742+ for link in links {
743+ match link {
744+ MatchedRef::AtUri { uri, cid } => {
745+ if records.contains_key(&uri) {
746+ log::warn!("skipping duplicate record without checking cid");
747+ continue;
748+ }
749+ let mut u = url::Url::parse("https://example.com").unwrap();
750+ u.query_pairs_mut().append_pair("at_uri", &uri); // BLEH todo
751+ records.insert(uri.clone(), Hydration::Pending(ProxyHydrationPending {
752+ url: format!("/xrpc/blue.microcosm.repo.getRecordByUri?{}", u.query().unwrap()), // TODO better; with cid, etc.
753+ }));
754+ let tx = tx.clone();
755+ let identity = self.identity.clone();
756+ let repo = self.repo.clone();
757+ tokio::task::spawn(async move {
758+ let rest = uri.strip_prefix("at://").unwrap();
759+ let (identifier, rest) = rest.split_once('/').unwrap();
760+ let (collection, rkey) = rest.split_once('/').unwrap();
761+762+ let did = if identifier.starts_with("did:") {
763+ Did::new(identifier.to_string()).unwrap()
764+ } else {
765+ let handle = Handle::new(identifier.to_string()).unwrap();
766+ identity.handle_to_did(handle).await.unwrap().unwrap()
767+ };
768+769+ let res = match repo.get_record(
770+ &did,
771+ &Nsid::new(collection.to_string()).unwrap(),
772+ &RecordKey::new(rkey.to_string()).unwrap(),
773+ &cid.as_ref().map(|s| Cid::from_str(s).unwrap()),
774+ ).await {
775+ Ok(CachedRecord::Deleted) =>
776+ Hydration::Error(ProxyHydrationError {
777+ reason: "record deleted".to_string(),
778+ }),
779+ Ok(CachedRecord::Found(RawRecord { cid: found_cid, record })) => {
780+ if let Some(c) = cid && found_cid.as_ref().to_string() != c {
781+ log::warn!("ignoring cid mismatch");
782+ }
783+ let value = serde_json::from_str(&record).unwrap();
784+ Hydration::Found(ProxyHydrationRecordFound {
785+ record: value,
786+ })
787+ }
788+ Err(e) => {
789+ log::warn!("finally oop {e:?}");
790+ Hydration::Error(ProxyHydrationError {
791+ reason: "failed to fetch record".to_string(),
792+ })
793+ }
794+ };
795+ tx.send(GetThing::Record(uri, res)).await
796+ });
797+ }
798+ MatchedRef::Identifier(id) => {
799+ if identifiers.contains_key(&id) {
800+ continue;
801+ }
802+ let mut u = url::Url::parse("https://example.com").unwrap();
803+ u.query_pairs_mut().append_pair("identifier", &id);
804+ identifiers.insert(id.clone(), Hydration::Pending(ProxyHydrationPending {
805+ url: format!("/xrpc/blue.microcosm.identity.resolveMiniDoc?{}", u.query().unwrap()), // gross
806+ }));
807+ let tx = tx.clone();
808+ let identity = self.identity.clone();
809+ tokio::task::spawn(async move {
810+ let res = match Self::resolve_mini_doc_impl(&id, identity).await {
811+ ResolveMiniIDResponse::Ok(Json(mini_doc)) => Hydration::Found(ProxyHydrationIdentifierFound {
812+ mini_doc
813+ }),
814+ ResolveMiniIDResponse::BadRequest(e) => {
815+ log::warn!("minidoc fail: {:?}", e.0);
816+ Hydration::Error(ProxyHydrationError {
817+ reason: "failed to resolve mini doc".to_string(),
818+ })
819+ }
820+ };
821+ tx.send(GetThing::Identifier(id, res)).await
822+ });
823+ }
824+ MatchedRef::Blob { link, mime, size: _ } => {
825+ if blobs.contains_key(&link) {
826+ continue;
827+ }
828+ if mime != "image/jpeg" {
829+ Hydration::<ProxyHydrationBlobFound>::Error(ProxyHydrationError {
830+ reason: "only image/jpeg supported for now".to_string(),
831+ });
832+ }
833+ todo!("oops we need to know the account too")
834+ }
835+ }
836+ }
837+ // so the channel can close when all are completed
838+ // (we shoudl be doing a timeout...)
839+ drop(tx);
840+841+ while let Some(hydration) = rx.recv().await {
842+ match hydration {
843+ GetThing::Record(uri, h) => { records.insert(uri, h); }
844+ GetThing::Identifier(uri, md) => { identifiers.insert(uri, md); }
845+ GetThing::Blob(cid, asdf) => { blobs.insert(cid, asdf); }
846+ };
847+ }
848+849+ ProxyHydrateResponse::Ok(Json(ProxyHydrateResponseObject {
850+ output: skeleton,
851+ records,
852+ identifiers,
853+ blobs,
854+ }))
855+ }
856+ Err(e) => {
857+ log::warn!("oh no: {e:?}");
858+ ProxyHydrateResponse::BadRequest(xrpc_error("oop", "sorry"))
859+ }
860+ }
861+862+ }
863+864 async fn get_record_impl(
865 &self,
866 repo: String,
···1059 cache: HybridCache<String, CachedRecord>,
1060 identity: Identity,
1061 repo: Repo,
1062+ proxy: Proxy,
1063 acme_domain: Option<String>,
1064 acme_contact: Option<String>,
1065 acme_cache_path: Option<PathBuf>,
···1068 bind: std::net::SocketAddr,
1069) -> Result<(), ServerError> {
1070 let repo = Arc::new(repo);
1071+ let proxy = Arc::new(proxy);
1072 let api_service = OpenApiService::new(
1073 Xrpc {
1074 cache,
1075 identity,
1076+ proxy,
1077 repo,
1078 },
1079 "Slingshot",
···1137 .with(
1138 Cors::new()
1139 .allow_origin_regex("*")
1140+ .allow_methods([Method::GET, Method::POST])
1141 .allow_credentials(false),
1142 )
1143 .with(CatchPanic::new())