···1-use serde_json::value::RawValue;
2-use crate::CachedRecord;
3use foyer::HybridCache;
4-use crate::error::ServerError;
5-use dropshot::{
6- ApiDescription, Body, ConfigDropshot, ConfigLogging,
7- ConfigLoggingLevel, HttpError, HttpResponse, Query, RequestContext,
8- ServerBuilder, ServerContext, endpoint,
9- ClientErrorStatusCode,
10-};
11-use http::{
12- Response, StatusCode,
13- header::{ORIGIN, USER_AGENT},
14-};
15-use metrics::{counter, histogram};
16-use std::sync::Arc;
17-18-use schemars::JsonSchema;
19-use serde::{Deserialize, Serialize};
20-use tokio::time::Instant;
21use tokio_util::sync::CancellationToken;
2223-const INDEX_HTML: &str = include_str!("../static/index.html");
24-const FAVICON: &[u8] = include_bytes!("../static/favicon.ico");
25-26-pub async fn serve(
27- cache: HybridCache<String, CachedRecord>,
28- shutdown: CancellationToken,
29-) -> Result<(), ServerError> {
30- let config_logging = ConfigLogging::StderrTerminal {
31- level: ConfigLoggingLevel::Info,
32- };
33-34- let log = config_logging
35- .to_logger("example-basic")
36- .map_err(ServerError::ConfigLogError)?;
37-38- let mut api = ApiDescription::new();
39- api.register(index).unwrap();
40- api.register(favicon).unwrap();
41- api.register(openapi).unwrap();
42- api.register(get_record).unwrap();
43-44- // TODO: put spec in a once cell / lazy lock thing?
45- let spec = Arc::new(
46- api.openapi(
47- "Slingshot",
48- env!("CARGO_PKG_VERSION")
49- .parse()
50- .inspect_err(|e| {
51- eprintln!("failed to parse cargo package version for openapi: {e:?}")
52- })
53- .unwrap_or(semver::Version::new(0, 0, 1)),
54- )
55- .description("A fast edge cache for getRecord")
56- .contact_name("part of @microcosm.blue")
57- .contact_url("https://microcosm.blue")
58- .json()
59- .map_err(ServerError::OpenApiJsonFail)?,
60- );
6162- let sub_shutdown = shutdown.clone();
63- let ctx = Context {
64- cache,
65- spec,
66- shutdown: sub_shutdown,
67- };
68-69- let server = ServerBuilder::new(api, ctx, log)
70- .config(ConfigDropshot {
71- bind_address: "0.0.0.0:9996".parse().unwrap(),
72- ..Default::default()
73- })
74- .start()?;
75-76- tokio::select! {
77- s = server.wait_for_shutdown() => {
78- s.map_err(ServerError::ServerExited)?;
79- log::info!("server shut down normally.");
80- },
81- _ = shutdown.cancelled() => {
82- log::info!("shutting down: closing server");
83- server.close().await.map_err(ServerError::BadClose)?;
84- },
85- }
86- Ok(())
87}
8889-#[derive(Debug, Clone)]
90-struct Context {
91- pub cache: HybridCache<String, CachedRecord>,
92- pub spec: Arc<serde_json::Value>,
93- pub shutdown: CancellationToken,
0094}
95-96-async fn instrument_handler<T, H, R>(ctx: &RequestContext<T>, handler: H) -> Result<R, HttpError>
97-where
98- R: HttpResponse,
99- H: Future<Output = Result<R, HttpError>>,
100- T: ServerContext,
101-{
102- let start = Instant::now();
103- let result = handler.await;
104- let latency = start.elapsed();
105- let status_code = match &result {
106- Ok(response) => response.status_code(),
107- Err(e) => e.status_code.as_status(),
108 }
109- .as_str() // just the number (.to_string()'s Display does eg `200 OK`)
110- .to_string();
111- let endpoint = ctx.endpoint.operation_id.clone();
112- let headers = ctx.request.headers();
113- let origin = headers
114- .get(ORIGIN)
115- .and_then(|v| v.to_str().ok())
116- .unwrap_or("")
117- .to_string();
118- let ua = headers
119- .get(USER_AGENT)
120- .and_then(|v| v.to_str().ok())
121- .map(|ua| {
122- if ua.starts_with("Mozilla/5.0 ") {
123- "browser"
124- } else {
125- ua
126- }
127- })
128- .unwrap_or("")
129- .to_string();
130- counter!("server_requests_total",
131- "endpoint" => endpoint.clone(),
132- "origin" => origin,
133- "ua" => ua,
134- "status_code" => status_code,
135- )
136- .increment(1);
137- histogram!("server_handler_latency", "endpoint" => endpoint).record(latency.as_micros() as f64);
138- result
139}
140141-use dropshot::{HttpResponseHeaders, HttpResponseOk};
142143-pub type OkCorsResponse<T> = Result<HttpResponseHeaders<HttpResponseOk<T>>, HttpError>;
144-145-/// Helper for constructing Ok responses: return OkCors(T).into()
146-/// (not happy with this yet)
147-pub struct OkCors<T: Serialize + JsonSchema + Send + Sync>(pub T);
148-149-impl<T> From<OkCors<T>> for OkCorsResponse<T>
150-where
151- T: Serialize + JsonSchema + Send + Sync,
152-{
153- fn from(ok: OkCors<T>) -> OkCorsResponse<T> {
154- let mut res = HttpResponseHeaders::new_unnamed(HttpResponseOk(ok.0));
155- res.headers_mut()
156- .insert("access-control-allow-origin", "*".parse().unwrap());
157- Ok(res)
158- }
159}
160161-pub fn cors_err(e: HttpError) -> HttpError {
162- e.with_header("access-control-allow-origin", "*").unwrap()
0000000000163}
164-165-166-// TODO: cors for HttpError
167-168-/// Serve index page as html
169-#[endpoint {
170- method = GET,
171- path = "/",
172- /*
173- * not useful to have this in openapi
174- */
175- unpublished = true,
176-}]
177-async fn index(ctx: RequestContext<Context>) -> Result<Response<Body>, HttpError> {
178- instrument_handler(&ctx, async {
179- Ok(Response::builder()
180- .status(StatusCode::OK)
181- .header(http::header::CONTENT_TYPE, "text/html")
182- .body(INDEX_HTML.into())?)
183- })
184- .await
185}
186187-/// Serve index page as html
188-#[endpoint {
189- method = GET,
190- path = "/favicon.ico",
191- /*
192- * not useful to have this in openapi
193- */
194- unpublished = true,
195-}]
196-async fn favicon(ctx: RequestContext<Context>) -> Result<Response<Body>, HttpError> {
197- instrument_handler(&ctx, async {
198- Ok(Response::builder()
199- .status(StatusCode::OK)
200- .header(http::header::CONTENT_TYPE, "image/x-icon")
201- .body(FAVICON.to_vec().into())?)
202- })
203- .await
204}
205206-/// Meta: get the openapi spec for this api
207-#[endpoint {
208- method = GET,
209- path = "/openapi",
210- /*
211- * not useful to have this in openapi
212- */
213- unpublished = true,
214-}]
215-async fn openapi(ctx: RequestContext<Context>) -> OkCorsResponse<serde_json::Value> {
216- instrument_handler(&ctx, async {
217- let spec = (*ctx.context().spec).clone();
218- OkCors(spec).into()
219- })
220- .await
221}
222223-224-#[derive(Debug, Deserialize, JsonSchema)]
225-struct GetRecordQuery {
226- /// The DID of the repo
227- ///
228- /// NOTE: handles should be accepted here but this is still TODO in slingshot
229- pub repo: String,
230- /// The NSID of the record collection
231- pub collection: String,
232- /// The Record key
233- pub rkey: String,
234- /// Optional: the CID of the version of the record.
235 ///
236- /// If not specified, then return the most recent version.
237 ///
238- /// If specified and a newer version of the record exists, returns 404 not
239- /// found. That is: slingshot only retains the most recent version of a
240- /// record.
241- #[serde(default)]
242- pub cid: Option<String>,
243-}
244-245-#[derive(Debug, Serialize, JsonSchema)]
246-struct GetRecordResponse {
247- pub uri: String,
248- pub cid: String,
249- pub value: Box<RawValue>,
250-}
251-252-/// com.atproto.repo.getRecord
253-///
254-/// Get a single record from a repository. Does not require auth.
255-///
256-/// See https://docs.bsky.app/docs/api/com-atproto-repo-get-record for the
257-/// canonical XRPC documentation that this endpoint aims to be compatible with.
258-#[endpoint {
259- method = GET,
260- path = "/xrpc/com.atproto.repo.getRecord",
261-}]
262-async fn get_record(
263- ctx: RequestContext<Context>,
264- query: Query<GetRecordQuery>,
265-) -> OkCorsResponse<GetRecordResponse> {
266-267- let Context { cache, .. } = ctx.context();
268- let GetRecordQuery { repo, collection, rkey, cid } = query.into_inner();
269270- // TODO: yeah yeah
271- let at_uri = format!(
272- "at://{}/{}/{}",
273- &*repo, &*collection, &*rkey
274- );
275-276- instrument_handler(&ctx, async {
277- let entry = cache
278 .fetch(at_uri.clone(), || async move {
279- Err(foyer::Error::Other(Box::new(ServerError::OhNo("booo".to_string()))))
280 })
281 .await
282 .unwrap();
00283284 match *entry {
285 CachedRecord::Found(ref raw) => {
286 let (found_cid, raw_value) = raw.into();
287 let found_cid = found_cid.as_ref().to_string();
288- if cid.map(|c| c != found_cid).unwrap_or(false) {
289- Err(HttpError::for_not_found(None, "CID mismatch".to_string()))
290- .map_err(cors_err)?;
00291 }
292- OkCors(GetRecordResponse {
00293 uri: at_uri,
294- cid: found_cid,
295- value: raw_value,
296- }).into()
297 },
298 CachedRecord::Deleted => {
299- Err(HttpError::for_client_error_with_status(
300- Some("Gone".to_string()),
301- ClientErrorStatusCode::GONE,
302- )).map_err(cors_err)
303 }
304 }
305- })
306- .await
307000000000000000000308}
···001use foyer::HybridCache;
2+use crate::{error::ServerError, CachedRecord};
00000000000000003use tokio_util::sync::CancellationToken;
45+use poem::{listener::TcpListener, Route, Server};
6+use poem_openapi::{
7+ payload::Json,
8+ param::Query,
9+ OpenApi, OpenApiService,
10+ ApiResponse,
11+ Object,
12+ types::Example,
13+};
000000000000000000000000000001415+fn example_did() -> String {
16+ "did:plc:hdhoaan3xa3jiuq4fg4mefid".to_string()
17+}
18+fn example_collection() -> String {
19+ "app.bsky.feed.like".to_string()
20+}
21+fn example_rkey() -> String {
22+ "3lv4ouczo2b2a".to_string()
0000000000000000023}
2425+#[derive(Object)]
26+#[oai(example = true)]
27+struct XrpcErrorResponseObject {
28+ /// Should correspond an error `name` in the lexicon errors array
29+ error: String,
30+ /// Human-readable description and possibly additonal context
31+ message: String,
32}
33+impl Example for XrpcErrorResponseObject {
34+ fn example() -> Self {
35+ Self {
36+ error: "RecordNotFound".to_string(),
37+ message: "This record was deleted".to_string(),
38+ }
000000039 }
00000000000000000000000000000040}
4104243+fn bad_request_handler(err: poem::Error) -> GetRecordResponse {
44+ GetRecordResponse::BadRequest(Json(XrpcErrorResponseObject {
45+ error: "InvalidRequest".to_string(),
46+ message: format!("Bad request, here's some info that maybe should not be exposed: {err}"),
47+ }))
0000000000048}
4950+#[derive(Object)]
51+#[oai(example = true)]
52+struct FoundRecordResponseObject {
53+ /// at-uri for this record
54+ uri: String,
55+ /// CID for this exact version of the record
56+ ///
57+ /// Slingshot will always return the CID, despite it not being a required
58+ /// response property in the official lexicon.
59+ cid: Option<String>,
60+ /// the record itself as JSON
61+ value: serde_json::Value,
62}
63+impl Example for FoundRecordResponseObject {
64+ fn example() -> Self {
65+ Self {
66+ uri: format!("at://{}/{}/{}", example_did(), example_collection(), example_rkey()),
67+ cid: Some("bafyreialv3mzvvxaoyrfrwoer3xmabbmdchvrbyhayd7bga47qjbycy74e".to_string()),
68+ value: serde_json::json!({
69+ "$type": "app.bsky.feed.like",
70+ "createdAt": "2025-07-29T18:02:02.327Z",
71+ "subject": {
72+ "cid": "bafyreia2gy6eyk5qfetgahvshpq35vtbwy6negpy3gnuulcdi723mi7vxy",
73+ "uri": "at://did:plc:vwzwgnygau7ed7b7wt5ux7y2/app.bsky.feed.post/3lv4lkb4vgs2k"
74+ }
75+ }),
76+ }
77+ }
00000078}
7980+#[derive(ApiResponse)]
81+#[oai(bad_request_handler = "bad_request_handler")]
82+enum GetRecordResponse {
83+ /// Record found
84+ #[oai(status = 200)]
85+ Ok(Json<FoundRecordResponseObject>),
86+ /// Bad request or no record to return
87+ ///
88+ /// The only error name in the repo.getRecord lexicon is `RecordNotFound`,
89+ /// but the [canonical api docs](https://docs.bsky.app/docs/api/com-atproto-repo-get-record)
90+ /// also list `InvalidRequest`, `ExpiredToken`, and `InvalidToken`. Of
91+ /// these, slingshot will only return `RecordNotFound` or `InvalidRequest`.
92+ #[oai(status = 400)]
93+ BadRequest(Json<XrpcErrorResponseObject>),
00094}
9596+struct Xrpc {
97+ cache: HybridCache<String, CachedRecord>,
000000000000098}
99100+#[OpenApi]
101+impl Xrpc {
102+ /// com.atproto.repo.getRecord
000000000103 ///
104+ /// Get a single record from a repository. Does not require auth.
105 ///
106+ /// See https://docs.bsky.app/docs/api/com-atproto-repo-get-record for the
107+ /// canonical XRPC documentation that this endpoint aims to be compatible
108+ /// with.
109+ #[oai(path = "/com.atproto.repo.getRecord", method = "get")]
110+ async fn get_record(
111+ &self,
112+ /// The DID of the repo
113+ ///
114+ /// NOTE: handles should be accepted here but this is still TODO in slingshot
115+ #[oai(example = "example_did")]
116+ repo: Query<String>,
117+ /// The NSID of the record collection
118+ #[oai(example = "example_collection")]
119+ collection: Query<String>,
120+ /// The Record key
121+ #[oai(example = "example_rkey")]
122+ rkey: Query<String>,
123+ /// Optional: the CID of the version of the record.
124+ ///
125+ /// If not specified, then return the most recent version.
126+ ///
127+ /// If specified and a newer version of the record exists, returns 404 not
128+ /// found. That is: slingshot only retains the most recent version of a
129+ /// record.
130+ cid: Query<Option<String>>,
131+ ) -> GetRecordResponse {
132+ // TODO: yeah yeah
133+ let at_uri = format!(
134+ "at://{}/{}/{}",
135+ &*repo, &*collection, &*rkey
136+ );
137138+ let entry = self.cache
0000000139 .fetch(at_uri.clone(), || async move {
140+ todo!()
141 })
142 .await
143 .unwrap();
144+145+ // TODO: actual 404
146147 match *entry {
148 CachedRecord::Found(ref raw) => {
149 let (found_cid, raw_value) = raw.into();
150 let found_cid = found_cid.as_ref().to_string();
151+ if cid.clone().map(|c| c != found_cid).unwrap_or(false) {
152+ return GetRecordResponse::BadRequest(Json(XrpcErrorResponseObject {
153+ error: "RecordNotFound".to_string(),
154+ message: "A record was found but its CID did not match that requested".to_string(),
155+ }));
156 }
157+ // TODO: thank u stellz: https://gist.github.com/stella3d/51e679e55b264adff89d00a1e58d0272
158+ let value = serde_json::from_str(raw_value.get()).expect("RawValue to be valid json");
159+ GetRecordResponse::Ok(Json(FoundRecordResponseObject {
160 uri: at_uri,
161+ cid: Some(found_cid),
162+ value,
163+ }))
164 },
165 CachedRecord::Deleted => {
166+ GetRecordResponse::BadRequest(Json(XrpcErrorResponseObject {
167+ error: "RecordNotFound".to_string(),
168+ message: "This record was deleted".to_string(),
169+ }))
170 }
171 }
172+ }
173+}
174175+pub async fn serve(
176+ cache: HybridCache<String, CachedRecord>,
177+ _shutdown: CancellationToken,
178+) -> Result<(), ServerError> {
179+ let api_service =
180+ OpenApiService::new(Xrpc { cache }, "Slingshot", env!("CARGO_PKG_VERSION"))
181+ .server("http://localhost:3000")
182+ .url_prefix("/xrpc");
183+184+ let app = Route::new()
185+ .nest("/", api_service.scalar())
186+ .nest("/openapi.json", api_service.spec_endpoint())
187+ .nest("/xrpc/", api_service);
188+189+ Server::new(TcpListener::bind("127.0.0.1:3000"))
190+ .run(app)
191+ .await
192+ .map_err(|e| ServerError::ServerExited(format!("uh oh: {e:?}")))
193}