Alternative ATProto PDS implementation

reorganize service_proxy

Changed files
+145 -109
src
+2 -109
src/main.rs
··· 13 13 mod oauth; 14 14 mod plc; 15 15 mod schema; 16 + mod service_proxy; 16 17 #[cfg(test)] 17 18 mod tests; 18 19 ··· 49 50 use http_cache_reqwest::{CacheMode, HttpCacheOptions, MokaManager}; 50 51 use rand::Rng as _; 51 52 use serde::{Deserialize, Serialize}; 53 + use service_proxy::service_proxy; 52 54 use std::{ 53 55 net::{IpAddr, Ipv4Addr, SocketAddr}, 54 56 path::PathBuf, ··· 191 193 Code: https://github.com/DrChat/bluepds 192 194 Protocol: https://atproto.com 193 195 " 194 - } 195 - 196 - /// Service proxy. 197 - /// 198 - /// Reference: <https://atproto.com/specs/xrpc#service-proxying> 199 - async fn service_proxy( 200 - uri: Uri, 201 - user: AuthenticatedUser, 202 - State(skey): State<SigningKey>, 203 - State(client): State<reqwest::Client>, 204 - headers: HeaderMap, 205 - request: Request<Body>, 206 - ) -> Result<Response<Body>> { 207 - let url_path = uri.path_and_query().context("invalid service proxy url")?; 208 - let lxm = url_path 209 - .path() 210 - .strip_prefix("/") 211 - .with_context(|| format!("invalid service proxy url prefix: {}", url_path.path()))?; 212 - 213 - let user_did = user.did(); 214 - let (did, id) = match headers.get("atproto-proxy") { 215 - Some(val) => { 216 - let val = 217 - std::str::from_utf8(val.as_bytes()).context("proxy header not valid utf-8")?; 218 - 219 - let (did, id) = val.split_once('#').context("invalid proxy header")?; 220 - 221 - let did = 222 - Did::from_str(did).map_err(|e| anyhow!("atproto proxy not a valid DID: {e}"))?; 223 - 224 - (did, format!("#{id}")) 225 - } 226 - // HACK: Assume the bluesky appview by default. 227 - None => ( 228 - Did::new("did:web:api.bsky.app".to_owned()) 229 - .expect("service proxy should be a valid DID"), 230 - "#bsky_appview".to_owned(), 231 - ), 232 - }; 233 - 234 - let did_doc = did::resolve(&Client::new(client.clone(), []), did.clone()) 235 - .await 236 - .with_context(|| format!("failed to resolve did document {}", did.as_str()))?; 237 - 238 - let Some(service) = did_doc.service.iter().find(|s| s.id == id) else { 239 - return Err(Error::with_status( 240 - StatusCode::BAD_REQUEST, 241 - anyhow!("could not find resolve service #{id}"), 242 - )); 243 - }; 244 - 245 - let target_url: url::Url = service 246 - .service_endpoint 247 - .join(&format!("/xrpc{url_path}")) 248 - .context("failed to construct target url")?; 249 - 250 - let exp = (chrono::Utc::now().checked_add_signed(chrono::Duration::minutes(1))) 251 - .context("should be valid expiration datetime")? 252 - .timestamp(); 253 - let jti = rand::thread_rng() 254 - .sample_iter(rand::distributions::Alphanumeric) 255 - .take(10) 256 - .map(char::from) 257 - .collect::<String>(); 258 - 259 - // Mint a bearer token by signing a JSON web token. 260 - // https://github.com/DavidBuchanan314/millipds/blob/5c7529a739d394e223c0347764f1cf4e8fd69f94/src/millipds/appview_proxy.py#L47-L59 261 - let token = auth::sign( 262 - &skey, 263 - "JWT", 264 - &serde_json::json!({ 265 - "iss": user_did.as_str(), 266 - "aud": did.as_str(), 267 - "lxm": lxm, 268 - "exp": exp, 269 - "jti": jti, 270 - }), 271 - ) 272 - .context("failed to sign jwt")?; 273 - 274 - let mut h = HeaderMap::new(); 275 - if let Some(hdr) = request.headers().get("atproto-accept-labelers") { 276 - drop(h.insert("atproto-accept-labelers", hdr.clone())); 277 - } 278 - if let Some(hdr) = request.headers().get(http::header::CONTENT_TYPE) { 279 - drop(h.insert(http::header::CONTENT_TYPE, hdr.clone())); 280 - } 281 - 282 - let r = client 283 - .request(request.method().clone(), target_url) 284 - .headers(h) 285 - .header(http::header::AUTHORIZATION, format!("Bearer {token}")) 286 - .body(reqwest::Body::wrap_stream( 287 - request.into_body().into_data_stream(), 288 - )) 289 - .send() 290 - .await 291 - .context("failed to send request")?; 292 - 293 - let mut resp = Response::builder().status(r.status()); 294 - if let Some(hdrs) = resp.headers_mut() { 295 - *hdrs = r.headers().clone(); 296 - } 297 - 298 - let resp = resp 299 - .body(Body::from_stream(r.bytes_stream())) 300 - .context("failed to construct response")?; 301 - 302 - Ok(resp) 303 196 } 304 197 305 198 /// The main application entry point.
+143
src/service_proxy.rs
··· 1 + /// Service proxy. 2 + /// 3 + /// Reference: <https://atproto.com/specs/xrpc#service-proxying> 4 + use anyhow::{Context as _, anyhow}; 5 + use atrium_api::types::string::Did; 6 + use atrium_crypto::keypair::{Export as _, Secp256k1Keypair}; 7 + use axum::{ 8 + Router, 9 + body::Body, 10 + extract::{FromRef, Request, State}, 11 + http::{self, HeaderMap, Response, StatusCode, Uri}, 12 + response::IntoResponse, 13 + routing::get, 14 + }; 15 + use azure_core::credentials::TokenCredential; 16 + use clap::Parser; 17 + use clap_verbosity_flag::{InfoLevel, Verbosity, log::LevelFilter}; 18 + use deadpool_diesel::sqlite::Pool; 19 + use diesel::prelude::*; 20 + use diesel_migrations::{EmbeddedMigrations, embed_migrations}; 21 + use figment::{Figment, providers::Format as _}; 22 + use http_cache_reqwest::{CacheMode, HttpCacheOptions, MokaManager}; 23 + use rand::Rng as _; 24 + use serde::{Deserialize, Serialize}; 25 + use std::{ 26 + net::{IpAddr, Ipv4Addr, SocketAddr}, 27 + path::PathBuf, 28 + str::FromStr as _, 29 + sync::Arc, 30 + }; 31 + use tokio::net::TcpListener; 32 + use tower_http::{cors::CorsLayer, trace::TraceLayer}; 33 + use tracing::{info, warn}; 34 + use uuid::Uuid; 35 + 36 + use super::{Client, Error, Result}; 37 + use crate::{AuthenticatedUser, SigningKey}; 38 + 39 + pub(super) async fn service_proxy( 40 + uri: Uri, 41 + user: AuthenticatedUser, 42 + State(skey): State<SigningKey>, 43 + State(client): State<reqwest::Client>, 44 + headers: HeaderMap, 45 + request: Request<Body>, 46 + ) -> Result<Response<Body>> { 47 + let url_path = uri.path_and_query().context("invalid service proxy url")?; 48 + let lxm = url_path 49 + .path() 50 + .strip_prefix("/") 51 + .with_context(|| format!("invalid service proxy url prefix: {}", url_path.path()))?; 52 + 53 + let user_did = user.did(); 54 + let (did, id) = match headers.get("atproto-proxy") { 55 + Some(val) => { 56 + let val = 57 + std::str::from_utf8(val.as_bytes()).context("proxy header not valid utf-8")?; 58 + 59 + let (did, id) = val.split_once('#').context("invalid proxy header")?; 60 + 61 + let did = 62 + Did::from_str(did).map_err(|e| anyhow!("atproto proxy not a valid DID: {e}"))?; 63 + 64 + (did, format!("#{id}")) 65 + } 66 + // HACK: Assume the bluesky appview by default. 67 + None => ( 68 + Did::new("did:web:api.bsky.app".to_owned()) 69 + .expect("service proxy should be a valid DID"), 70 + "#bsky_appview".to_owned(), 71 + ), 72 + }; 73 + 74 + let did_doc = super::did::resolve(&Client::new(client.clone(), []), did.clone()) 75 + .await 76 + .with_context(|| format!("failed to resolve did document {}", did.as_str()))?; 77 + 78 + let Some(service) = did_doc.service.iter().find(|s| s.id == id) else { 79 + return Err(Error::with_status( 80 + StatusCode::BAD_REQUEST, 81 + anyhow!("could not find resolve service #{id}"), 82 + )); 83 + }; 84 + 85 + let target_url: url::Url = service 86 + .service_endpoint 87 + .join(&format!("/xrpc{url_path}")) 88 + .context("failed to construct target url")?; 89 + 90 + let exp = (chrono::Utc::now().checked_add_signed(chrono::Duration::minutes(1))) 91 + .context("should be valid expiration datetime")? 92 + .timestamp(); 93 + let jti = rand::thread_rng() 94 + .sample_iter(rand::distributions::Alphanumeric) 95 + .take(10) 96 + .map(char::from) 97 + .collect::<String>(); 98 + 99 + // Mint a bearer token by signing a JSON web token. 100 + // https://github.com/DavidBuchanan314/millipds/blob/5c7529a739d394e223c0347764f1cf4e8fd69f94/src/millipds/appview_proxy.py#L47-L59 101 + let token = super::auth::sign( 102 + &skey, 103 + "JWT", 104 + &serde_json::json!({ 105 + "iss": user_did.as_str(), 106 + "aud": did.as_str(), 107 + "lxm": lxm, 108 + "exp": exp, 109 + "jti": jti, 110 + }), 111 + ) 112 + .context("failed to sign jwt")?; 113 + 114 + let mut h = HeaderMap::new(); 115 + if let Some(hdr) = request.headers().get("atproto-accept-labelers") { 116 + drop(h.insert("atproto-accept-labelers", hdr.clone())); 117 + } 118 + if let Some(hdr) = request.headers().get(http::header::CONTENT_TYPE) { 119 + drop(h.insert(http::header::CONTENT_TYPE, hdr.clone())); 120 + } 121 + 122 + let r = client 123 + .request(request.method().clone(), target_url) 124 + .headers(h) 125 + .header(http::header::AUTHORIZATION, format!("Bearer {token}")) 126 + .body(reqwest::Body::wrap_stream( 127 + request.into_body().into_data_stream(), 128 + )) 129 + .send() 130 + .await 131 + .context("failed to send request")?; 132 + 133 + let mut resp = Response::builder().status(r.status()); 134 + if let Some(hdrs) = resp.headers_mut() { 135 + *hdrs = r.headers().clone(); 136 + } 137 + 138 + let resp = resp 139 + .body(Body::from_stream(r.bytes_stream())) 140 + .context("failed to construct response")?; 141 + 142 + Ok(resp) 143 + }