Alternative ATProto PDS implementation
1/// Service proxy.
2///
3/// Reference: <https://atproto.com/specs/xrpc#service-proxying>
4use anyhow::{Context as _, anyhow};
5use atrium_api::types::string::Did;
6use axum::{
7 body::Body,
8 extract::{Request, State},
9 http::{self, HeaderMap, Response, StatusCode, Uri},
10};
11use rand::Rng as _;
12use std::str::FromStr as _;
13
14use super::{
15 auth::AuthenticatedUser,
16 serve::{Client, Error, Result, SigningKey},
17};
18
19pub(super) async fn service_proxy(
20 uri: Uri,
21 user: AuthenticatedUser,
22 State(skey): State<SigningKey>,
23 State(client): State<reqwest::Client>,
24 headers: HeaderMap,
25 request: Request<Body>,
26) -> Result<Response<Body>> {
27 let url_path = uri.path_and_query().context("invalid service proxy url")?;
28 let lxm = url_path
29 .path()
30 .strip_prefix("/")
31 .with_context(|| format!("invalid service proxy url prefix: {}", url_path.path()))?;
32
33 let user_did = user.did();
34 let (did, id) = match headers.get("atproto-proxy") {
35 Some(val) => {
36 let val =
37 std::str::from_utf8(val.as_bytes()).context("proxy header not valid utf-8")?;
38
39 let (did, id) = val.split_once('#').context("invalid proxy header")?;
40
41 let did =
42 Did::from_str(did).map_err(|e| anyhow!("atproto proxy not a valid DID: {e}"))?;
43
44 (did, format!("#{id}"))
45 }
46 // HACK: Assume the bluesky appview by default.
47 None => (
48 Did::new("did:web:api.bsky.app".to_owned())
49 .expect("service proxy should be a valid DID"),
50 "#bsky_appview".to_owned(),
51 ),
52 };
53
54 let did_doc = super::did::resolve(&Client::new(client.clone(), []), did.clone())
55 .await
56 .with_context(|| format!("failed to resolve did document {}", did.as_str()))?;
57
58 let Some(service) = did_doc.service.iter().find(|s| s.id == id) else {
59 return Err(Error::with_status(
60 StatusCode::BAD_REQUEST,
61 anyhow!("could not find resolve service #{id}"),
62 ));
63 };
64
65 let target_url: url::Url = service
66 .service_endpoint
67 .join(&format!("/xrpc{url_path}"))
68 .context("failed to construct target url")?;
69
70 let exp = (chrono::Utc::now().checked_add_signed(chrono::Duration::minutes(1)))
71 .context("should be valid expiration datetime")?
72 .timestamp();
73 let jti = rand::thread_rng()
74 .sample_iter(rand::distributions::Alphanumeric)
75 .take(10)
76 .map(char::from)
77 .collect::<String>();
78
79 // Mint a bearer token by signing a JSON web token.
80 // https://github.com/DavidBuchanan314/millipds/blob/5c7529a739d394e223c0347764f1cf4e8fd69f94/src/millipds/appview_proxy.py#L47-L59
81 let token = super::auth::sign(
82 &skey,
83 "JWT",
84 &serde_json::json!({
85 "iss": user_did.as_str(),
86 "aud": did.as_str(),
87 "lxm": lxm,
88 "exp": exp,
89 "jti": jti,
90 }),
91 )
92 .context("failed to sign jwt")?;
93
94 let mut h = HeaderMap::new();
95 if let Some(hdr) = request.headers().get("atproto-accept-labelers") {
96 drop(h.insert("atproto-accept-labelers", hdr.clone()));
97 }
98 if let Some(hdr) = request.headers().get(http::header::CONTENT_TYPE) {
99 drop(h.insert(http::header::CONTENT_TYPE, hdr.clone()));
100 }
101
102 let r = client
103 .request(request.method().clone(), target_url)
104 .headers(h)
105 .header(http::header::AUTHORIZATION, format!("Bearer {token}"))
106 .body(reqwest::Body::wrap_stream(
107 request.into_body().into_data_stream(),
108 ))
109 .send()
110 .await
111 .context("failed to send request")?;
112
113 let mut resp = Response::builder().status(r.status());
114 if let Some(hdrs) = resp.headers_mut() {
115 *hdrs = r.headers().clone();
116 }
117
118 let resp = resp
119 .body(Body::from_stream(r.bytes_stream()))
120 .context("failed to construct response")?;
121
122 Ok(resp)
123}