···1112#[derive(Debug, Serialize, Deserialize)]
13pub struct RawRecord {
14- pub cid: Cid,
15- pub record: String,
16}
1718// TODO: should be able to do typed CID
···1112#[derive(Debug, Serialize, Deserialize)]
13pub struct RawRecord {
14+ cid: Cid,
15+ record: String,
16}
1718// TODO: should be able to do typed CID
+12-326
slingshot/src/server.rs
···1use crate::{
2- CachedRecord, ErrorResponseObject, Identity, Proxy, Repo,
3 error::{RecordError, ServerError},
4- proxy::{extract_links, MatchedRef},
5- record::RawRecord,
6};
7use atrium_api::types::string::{Cid, Did, Handle, Nsid, RecordKey};
8use foyer::HybridCache;
9use links::at_uri::parse_at_uri as normalize_at_uri;
10use serde::Serialize;
11-use std::{path::PathBuf, str::FromStr, sync::Arc, time::Instant, collections::HashMap};
12-use tokio::sync::mpsc;
0013use tokio_util::sync::CancellationToken;
1415use poem::{
···24};
25use poem_openapi::{
26 ApiResponse, ContactObject, ExternalDocumentObject, Object, OpenApi, OpenApiService, Tags,
27- Union,
28 param::Query, payload::Json, types::Example,
29};
30···55 "zQ3shpq1g134o7HGDb86CtQFxnHqzx5pZWknrVX2Waum3fF6j".to_string()
56}
5758-#[derive(Debug, Object)]
59#[oai(example = true)]
60struct XrpcErrorResponseObject {
61 /// Should correspond an error `name` in the lexicon errors array
···8889fn bad_request_handler_resolve_mini(err: poem::Error) -> ResolveMiniIDResponse {
90 ResolveMiniIDResponse::BadRequest(Json(XrpcErrorResponseObject {
91- error: "InvalidRequest".to_string(),
92- message: format!("Bad request, here's some info that maybe should not be exposed: {err}"),
93- }))
94-}
95-96-fn bad_request_handler_proxy_query(err: poem::Error) -> ProxyHydrateResponse {
97- ProxyHydrateResponse::BadRequest(Json(XrpcErrorResponseObject {
98 error: "InvalidRequest".to_string(),
99 message: format!("Bad request, here's some info that maybe should not be exposed: {err}"),
100 }))
···199}
200201#[derive(Object)]
202-struct ProxyHydrationError {
203- reason: String,
204-}
205-206-#[derive(Object)]
207-struct ProxyHydrationPending {
208- url: String,
209-}
210-211-#[derive(Object)]
212-struct ProxyHydrationRecordFound {
213- record: serde_json::Value,
214-}
215-216-#[derive(Object)]
217-struct ProxyHydrationIdentifierFound {
218- mini_doc: MiniDocResponseObject,
219-}
220-221-#[derive(Object)]
222-#[oai(rename_all = "camelCase")]
223-struct ProxyHydrationBlobFound {
224- /// cdn url
225- link: String,
226- mime_type: String,
227- size: u64,
228-}
229-230-// todo: there's gotta be a supertrait that collects these?
231-use poem_openapi::types::{Type, ToJSON, ParseFromJSON, IsObjectType};
232-233-#[derive(Union)]
234-#[oai(discriminator_name = "status", rename_all = "camelCase")]
235-enum Hydration<T: Send + Sync + Type + ToJSON + ParseFromJSON + IsObjectType> {
236- Error(ProxyHydrationError),
237- Pending(ProxyHydrationPending),
238- Found(T),
239-}
240-241-#[derive(Object)]
242-#[oai(example = true)]
243-struct ProxyHydrateResponseObject {
244- /// The original upstream response content
245- output: serde_json::Value,
246- /// Any hydrated records
247- records: HashMap<String, Hydration<ProxyHydrationRecordFound>>,
248- /// Any hydrated identifiers
249- ///
250- /// TODO: "identifiers" feels wrong as the name, probably "identities"?
251- identifiers: HashMap<String, Hydration<ProxyHydrationIdentifierFound>>,
252- /// Any hydrated blob CDN urls
253- blobs: HashMap<String, Hydration<ProxyHydrationBlobFound>>,
254-}
255-impl Example for ProxyHydrateResponseObject {
256- fn example() -> Self {
257- Self {
258- output: serde_json::json!({}),
259- records: HashMap::from([
260- ("asdf".into(), Hydration::Pending(ProxyHydrationPending { url: "todo".into() })),
261- ]),
262- identifiers: HashMap::new(),
263- blobs: HashMap::new(),
264- }
265- }
266-}
267-268-#[derive(ApiResponse)]
269-#[oai(bad_request_handler = "bad_request_handler_proxy_query")]
270-enum ProxyHydrateResponse {
271- #[oai(status = 200)]
272- Ok(Json<ProxyHydrateResponseObject>),
273- #[oai(status = 400)]
274- BadRequest(XrpcError)
275-}
276-277-#[derive(Object)]
278-pub struct HydrationSource {
279- /// Record Path syntax for locating fields
280- pub path: String,
281- /// What to expect at the path: 'strong-ref', 'at-uri', 'at-uri-parts', 'did', 'handle', or 'at-identifier'.
282- ///
283- /// - `strong-ref`: object in the shape of `com.atproto.repo.strongRef` with `uri` and `cid` keys.
284- /// - `at-uri`: string, must have all segments present (identifier, collection, rkey)
285- /// - `at-uri-parts`: object with keys (`repo` or `did`), `collection`, `rkey`, and optional `cid`. Other keys may be present and will be ignored.
286- /// - `did`: string, `did` format
287- /// - `handle`: string, `handle` format
288- /// - `at-identifier`: string, `did` or `handle` format
289- pub shape: String,
290-}
291-292-#[derive(Object)]
293-#[oai(example = true)]
294-struct ProxyQueryPayload {
295- /// The NSID of the XRPC you wish to forward
296- xrpc: String,
297- /// The destination service the request will be forwarded to
298- atproto_proxy: String,
299- /// The `params` for the destination service XRPC endpoint
300- ///
301- /// Currently this will be passed along unchecked, but a future version of
302- /// slingshot may attempt to do lexicon resolution to validate `params`
303- /// based on the upstream service
304- params: Option<serde_json::Value>,
305- /// Paths within the response to look for at-uris that can be hydrated
306- hydration_sources: Vec<HydrationSource>,
307- // todo: deadline thing
308-309-}
310-impl Example for ProxyQueryPayload {
311- fn example() -> Self {
312- Self {
313- xrpc: "app.bsky.feed.getFeedSkeleton".to_string(),
314- atproto_proxy: "did:web:blue.mackuba.eu#bsky_fg".to_string(),
315- params: Some(serde_json::json!({
316- "feed": "at://did:plc:oio4hkxaop4ao4wz2pp3f4cr/app.bsky.feed.generator/atproto",
317- })),
318- hydration_sources: vec![
319- HydrationSource {
320- path: "feed[].post".to_string(),
321- shape: "at-uri".to_string(),
322- }
323- ],
324- }
325- }
326-}
327-328-#[derive(Object)]
329#[oai(example = true)]
330struct FoundDidResponseObject {
331 /// the DID, bi-directionally verified if using Slingshot
···356struct Xrpc {
357 cache: HybridCache<String, CachedRecord>,
358 identity: Identity,
359- proxy: Arc<Proxy>,
360 repo: Arc<Repo>,
361}
362···611 #[oai(example = "example_handle")]
612 Query(identifier): Query<String>,
613 ) -> ResolveMiniIDResponse {
614- Self::resolve_mini_doc_impl(&identifier, self.identity.clone()).await
615- }
616-617- async fn resolve_mini_doc_impl(identifier: &str, identity: Identity) -> ResolveMiniIDResponse {
618 let invalid = |reason: &'static str| {
619 ResolveMiniIDResponse::BadRequest(xrpc_error("InvalidRequest", reason))
620 };
621622 let mut unverified_handle = None;
623- let did = match Did::new(identifier.to_string()) {
624 Ok(did) => did,
625 Err(_) => {
626 let Ok(alleged_handle) = Handle::new(identifier.to_lowercase()) else {
627 return invalid("Identifier was not a valid DID or handle");
628 };
629630- match identity.handle_to_did(alleged_handle.clone()).await {
631 Ok(res) => {
632 if let Some(did) = res {
633 // we did it joe
···645 }
646 }
647 };
648- let Ok(partial_doc) = identity.did_to_partial_mini_doc(&did).await else {
649 return invalid("Failed to get DID doc");
650 };
651 let Some(partial_doc) = partial_doc else {
···665 "handle.invalid".to_string()
666 }
667 } else {
668- let Ok(handle_did) = identity
0669 .handle_to_did(partial_doc.unverified_handle.clone())
670 .await
671 else {
···689 }))
690 }
691692- /// com.bad-example.proxy.hydrateQueryResponse
693- ///
694- /// > [!important]
695- /// > Unstable! This endpoint is experimental and may change.
696- ///
697- /// Fetch + include records referenced from an upstream xrpc query response
698- #[oai(
699- path = "/com.bad-example.proxy.hydrateQueryResponse",
700- method = "post",
701- tag = "ApiTags::Custom"
702- )]
703- async fn proxy_hydrate_query(
704- &self,
705- Json(payload): Json<ProxyQueryPayload>,
706- ) -> ProxyHydrateResponse {
707- // TODO: the Accept request header, if present, gotta be json
708- // TODO: find any Authorization header and verify it. TBD about `aud`.
709-710- let params = if let Some(p) = payload.params {
711- let serde_json::Value::Object(map) = p else {
712- panic!("params have to be an object");
713- };
714- Some(map)
715- } else { None };
716-717- match self.proxy.proxy(
718- payload.xrpc,
719- payload.atproto_proxy,
720- params,
721- ).await {
722- Ok(skeleton) => {
723- let links = match extract_links(payload.hydration_sources, &skeleton) {
724- Ok(l) => l,
725- Err(e) => {
726- log::warn!("problem extracting: {e:?}");
727- return ProxyHydrateResponse::BadRequest(xrpc_error("oop", "sorry, error extracting"))
728- }
729- };
730- let mut records = HashMap::new();
731- let mut identifiers = HashMap::new();
732- let mut blobs = HashMap::new();
733-734- enum GetThing {
735- Record(String, Hydration<ProxyHydrationRecordFound>),
736- Identifier(String, Hydration<ProxyHydrationIdentifierFound>),
737- Blob(String, Hydration<ProxyHydrationBlobFound>),
738- }
739-740- let (tx, mut rx) = mpsc::channel(1);
741-742- for link in links {
743- match link {
744- MatchedRef::AtUri { uri, cid } => {
745- if records.contains_key(&uri) {
746- log::warn!("skipping duplicate record without checking cid");
747- continue;
748- }
749- let mut u = url::Url::parse("https://example.com").unwrap();
750- u.query_pairs_mut().append_pair("at_uri", &uri); // BLEH todo
751- records.insert(uri.clone(), Hydration::Pending(ProxyHydrationPending {
752- url: format!("/xrpc/blue.microcosm.repo.getRecordByUri?{}", u.query().unwrap()), // TODO better; with cid, etc.
753- }));
754- let tx = tx.clone();
755- let identity = self.identity.clone();
756- let repo = self.repo.clone();
757- tokio::task::spawn(async move {
758- let rest = uri.strip_prefix("at://").unwrap();
759- let (identifier, rest) = rest.split_once('/').unwrap();
760- let (collection, rkey) = rest.split_once('/').unwrap();
761-762- let did = if identifier.starts_with("did:") {
763- Did::new(identifier.to_string()).unwrap()
764- } else {
765- let handle = Handle::new(identifier.to_string()).unwrap();
766- identity.handle_to_did(handle).await.unwrap().unwrap()
767- };
768-769- let res = match repo.get_record(
770- &did,
771- &Nsid::new(collection.to_string()).unwrap(),
772- &RecordKey::new(rkey.to_string()).unwrap(),
773- &cid.as_ref().map(|s| Cid::from_str(s).unwrap()),
774- ).await {
775- Ok(CachedRecord::Deleted) =>
776- Hydration::Error(ProxyHydrationError {
777- reason: "record deleted".to_string(),
778- }),
779- Ok(CachedRecord::Found(RawRecord { cid: found_cid, record })) => {
780- if let Some(c) = cid && found_cid.as_ref().to_string() != c {
781- log::warn!("ignoring cid mismatch");
782- }
783- let value = serde_json::from_str(&record).unwrap();
784- Hydration::Found(ProxyHydrationRecordFound {
785- record: value,
786- })
787- }
788- Err(e) => {
789- log::warn!("finally oop {e:?}");
790- Hydration::Error(ProxyHydrationError {
791- reason: "failed to fetch record".to_string(),
792- })
793- }
794- };
795- tx.send(GetThing::Record(uri, res)).await
796- });
797- }
798- MatchedRef::Identifier(id) => {
799- if identifiers.contains_key(&id) {
800- continue;
801- }
802- let mut u = url::Url::parse("https://example.com").unwrap();
803- u.query_pairs_mut().append_pair("identifier", &id);
804- identifiers.insert(id.clone(), Hydration::Pending(ProxyHydrationPending {
805- url: format!("/xrpc/blue.microcosm.identity.resolveMiniDoc?{}", u.query().unwrap()), // gross
806- }));
807- let tx = tx.clone();
808- let identity = self.identity.clone();
809- tokio::task::spawn(async move {
810- let res = match Self::resolve_mini_doc_impl(&id, identity).await {
811- ResolveMiniIDResponse::Ok(Json(mini_doc)) => Hydration::Found(ProxyHydrationIdentifierFound {
812- mini_doc
813- }),
814- ResolveMiniIDResponse::BadRequest(e) => {
815- log::warn!("minidoc fail: {:?}", e.0);
816- Hydration::Error(ProxyHydrationError {
817- reason: "failed to resolve mini doc".to_string(),
818- })
819- }
820- };
821- tx.send(GetThing::Identifier(id, res)).await
822- });
823- }
824- MatchedRef::Blob { link, mime, size: _ } => {
825- if blobs.contains_key(&link) {
826- continue;
827- }
828- if mime != "image/jpeg" {
829- Hydration::<ProxyHydrationBlobFound>::Error(ProxyHydrationError {
830- reason: "only image/jpeg supported for now".to_string(),
831- });
832- }
833- todo!("oops we need to know the account too")
834- }
835- }
836- }
837- // so the channel can close when all are completed
838- // (we shoudl be doing a timeout...)
839- drop(tx);
840-841- while let Some(hydration) = rx.recv().await {
842- match hydration {
843- GetThing::Record(uri, h) => { records.insert(uri, h); }
844- GetThing::Identifier(uri, md) => { identifiers.insert(uri, md); }
845- GetThing::Blob(cid, asdf) => { blobs.insert(cid, asdf); }
846- };
847- }
848-849- ProxyHydrateResponse::Ok(Json(ProxyHydrateResponseObject {
850- output: skeleton,
851- records,
852- identifiers,
853- blobs,
854- }))
855- }
856- Err(e) => {
857- log::warn!("oh no: {e:?}");
858- ProxyHydrateResponse::BadRequest(xrpc_error("oop", "sorry"))
859- }
860- }
861-862- }
863-864 async fn get_record_impl(
865 &self,
866 repo: String,
···1059 cache: HybridCache<String, CachedRecord>,
1060 identity: Identity,
1061 repo: Repo,
1062- proxy: Proxy,
1063 acme_domain: Option<String>,
1064 acme_contact: Option<String>,
1065 acme_cache_path: Option<PathBuf>,
···1068 bind: std::net::SocketAddr,
1069) -> Result<(), ServerError> {
1070 let repo = Arc::new(repo);
1071- let proxy = Arc::new(proxy);
1072 let api_service = OpenApiService::new(
1073 Xrpc {
1074 cache,
1075 identity,
1076- proxy,
1077 repo,
1078 },
1079 "Slingshot",
···1137 .with(
1138 Cors::new()
1139 .allow_origin_regex("*")
1140- .allow_methods([Method::GET, Method::POST])
1141 .allow_credentials(false),
1142 )
1143 .with(CatchPanic::new())
···1use crate::{
2+ CachedRecord, ErrorResponseObject, Identity, Repo,
3 error::{RecordError, ServerError},
004};
5use atrium_api::types::string::{Cid, Did, Handle, Nsid, RecordKey};
6use foyer::HybridCache;
7use links::at_uri::parse_at_uri as normalize_at_uri;
8use serde::Serialize;
9+use std::path::PathBuf;
10+use std::str::FromStr;
11+use std::sync::Arc;
12+use std::time::Instant;
13use tokio_util::sync::CancellationToken;
1415use poem::{
···24};
25use poem_openapi::{
26 ApiResponse, ContactObject, ExternalDocumentObject, Object, OpenApi, OpenApiService, Tags,
027 param::Query, payload::Json, types::Example,
28};
29···54 "zQ3shpq1g134o7HGDb86CtQFxnHqzx5pZWknrVX2Waum3fF6j".to_string()
55}
5657+#[derive(Object)]
58#[oai(example = true)]
59struct XrpcErrorResponseObject {
60 /// Should correspond an error `name` in the lexicon errors array
···8788fn bad_request_handler_resolve_mini(err: poem::Error) -> ResolveMiniIDResponse {
89 ResolveMiniIDResponse::BadRequest(Json(XrpcErrorResponseObject {
000000090 error: "InvalidRequest".to_string(),
91 message: format!("Bad request, here's some info that maybe should not be exposed: {err}"),
92 }))
···191}
192193#[derive(Object)]
0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000194#[oai(example = true)]
195struct FoundDidResponseObject {
196 /// the DID, bi-directionally verified if using Slingshot
···221struct Xrpc {
222 cache: HybridCache<String, CachedRecord>,
223 identity: Identity,
0224 repo: Arc<Repo>,
225}
226···475 #[oai(example = "example_handle")]
476 Query(identifier): Query<String>,
477 ) -> ResolveMiniIDResponse {
0000478 let invalid = |reason: &'static str| {
479 ResolveMiniIDResponse::BadRequest(xrpc_error("InvalidRequest", reason))
480 };
481482 let mut unverified_handle = None;
483+ let did = match Did::new(identifier.clone()) {
484 Ok(did) => did,
485 Err(_) => {
486 let Ok(alleged_handle) = Handle::new(identifier.to_lowercase()) else {
487 return invalid("Identifier was not a valid DID or handle");
488 };
489490+ match self.identity.handle_to_did(alleged_handle.clone()).await {
491 Ok(res) => {
492 if let Some(did) = res {
493 // we did it joe
···505 }
506 }
507 };
508+ let Ok(partial_doc) = self.identity.did_to_partial_mini_doc(&did).await else {
509 return invalid("Failed to get DID doc");
510 };
511 let Some(partial_doc) = partial_doc else {
···525 "handle.invalid".to_string()
526 }
527 } else {
528+ let Ok(handle_did) = self
529+ .identity
530 .handle_to_did(partial_doc.unverified_handle.clone())
531 .await
532 else {
···550 }))
551 }
5520000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000553 async fn get_record_impl(
554 &self,
555 repo: String,
···748 cache: HybridCache<String, CachedRecord>,
749 identity: Identity,
750 repo: Repo,
0751 acme_domain: Option<String>,
752 acme_contact: Option<String>,
753 acme_cache_path: Option<PathBuf>,
···756 bind: std::net::SocketAddr,
757) -> Result<(), ServerError> {
758 let repo = Arc::new(repo);
0759 let api_service = OpenApiService::new(
760 Xrpc {
761 cache,
762 identity,
0763 repo,
764 },
765 "Slingshot",
···823 .with(
824 Cors::new()
825 .allow_origin_regex("*")
826+ .allow_methods([Method::GET])
827 .allow_credentials(false),
828 )
829 .with(CatchPanic::new())