Built for people who think better out loud.
1use std::sync::Arc;
2use std::time::{Instant, SystemTime, UNIX_EPOCH};
3
4use axiom_rs::Client;
5use axum::body::Body;
6use axum::extract::State;
7use axum::http::{HeaderMap, Request, header};
8use axum::middleware::Next;
9use axum::response::Response;
10use serde::Serialize;
11use serde_json::{Map, Value};
12use tokio::sync::Mutex;
13use uuid::Uuid;
14
15use crate::config::Config;
16use crate::state::AppState;
17
18/// Shared, request-scoped event handle stored in request extensions.
19///
20/// Usage:
21/// ```rust
22/// # use slipnote_backend::logging::SharedEvent;
23/// # use axum::Extension;
24/// async fn handler(Extension(event): Extension<SharedEvent>) {
25/// let mut event = event.lock().await;
26/// event.set_nested_field("request", "checked", true);
27/// }
28/// ```
29pub type SharedEvent = Arc<Mutex<WideEvent>>;
30
31/// A wide event that accumulates request-scoped context for logging.
32///
33/// Usage:
34/// ```rust
35/// # use slipnote_backend::logging::WideEvent;
36/// let mut event = WideEvent::default();
37/// event.set_nested_field("request", "request_id", "req-123");
38/// event.set_nested_field("transcription", "language", "en");
39/// ```
40#[derive(Debug, Clone, Default, Serialize)]
41pub struct WideEvent {
42 #[serde(flatten)]
43 fields: Map<String, Value>,
44}
45
46impl WideEvent {
47 /// Builds a new event from request metadata.
48 pub fn from_request(request: &Request<Body>) -> Self {
49 let mut event = Self::default();
50 let request_id = header_value(request.headers(), "x-request-id")
51 .unwrap_or_else(|| Uuid::new_v4().to_string());
52 let timestamp_ms = SystemTime::now()
53 .duration_since(UNIX_EPOCH)
54 .map(|duration| u64::try_from(duration.as_millis()).unwrap_or(u64::MAX))
55 .unwrap_or_default();
56
57 event.set_nested_field("request", "request_id", request_id);
58 event.set_nested_field("request", "timestamp", timestamp_ms);
59 event.set_nested_field("request", "path", request.uri().path());
60
61 let user_agent = request
62 .headers()
63 .get(header::USER_AGENT)
64 .and_then(|value| value.to_str().ok())
65 .map(str::to_string);
66 event.set_optional_nested_field("request", "user_agent", user_agent);
67 event.set_optional_nested_field("request", "ip", extract_ip(request.headers()));
68
69 event
70 }
71
72 /// Sets a nested field if the value is present.
73 ///
74 /// Usage:
75 /// ```rust
76 /// # use slipnote_backend::logging::WideEvent;
77 /// let mut event = WideEvent::default();
78 /// event.set_optional_nested_field("request", "user_agent", Some("ua"));
79 /// event.set_optional_nested_field::<&str>("request", "ip", None);
80 /// ```
81 pub fn set_optional_nested_field<T: Serialize>(
82 &mut self,
83 object_key: impl Into<String>,
84 field_key: impl Into<String>,
85 value: Option<T>,
86 ) {
87 let Some(value) = value else {
88 return;
89 };
90 self.set_nested_field(object_key, field_key, value);
91 }
92
93 /// Sets a nested field inside an object, creating it if needed.
94 ///
95 /// Usage:
96 /// ```rust
97 /// # use slipnote_backend::logging::WideEvent;
98 /// let mut event = WideEvent::default();
99 /// event.set_nested_field("transcription", "language", "en");
100 /// event.set_nested_field("transcription", "duration", 1.2);
101 /// ```
102 pub fn set_nested_field<T: Serialize>(
103 &mut self,
104 object_key: impl Into<String>,
105 field_key: impl Into<String>,
106 value: T,
107 ) {
108 let object_key = object_key.into();
109 let field_key = field_key.into();
110 let Ok(value) = serde_json::to_value(value) else {
111 return;
112 };
113
114 if let Some(Value::Object(existing)) = self.fields.get_mut(&object_key) {
115 existing.insert(field_key, value);
116 } else {
117 let mut map = Map::new();
118 map.insert(field_key, value);
119 self.fields.insert(object_key, Value::Object(map));
120 }
121 }
122
123 /// Records an error message on the event.
124 ///
125 /// Usage:
126 /// ```rust
127 /// # use slipnote_backend::logging::WideEvent;
128 /// let mut event = WideEvent::default();
129 /// event.set_error("upstream failed");
130 /// ```
131 pub fn set_error(&mut self, err: impl std::fmt::Display) {
132 self.set_nested_field("error", "message", err.to_string());
133 }
134}
135
136/// A lightweight wrapper around the Axiom client.
137///
138/// Usage:
139/// ```rust,no_run
140/// # use slipnote_backend::config::Config;
141/// # use slipnote_backend::logging::Logger;
142/// let config = Config::from_env().expect("config");
143/// let logger = Logger::from_config(&config).expect("logger");
144/// ```
145#[derive(Clone, Default)]
146pub struct Logger {
147 inner: Option<Arc<LoggerInner>>,
148 sample_rate: f64,
149 #[cfg(test)]
150 test_sender: Option<tokio::sync::mpsc::Sender<WideEvent>>,
151}
152
153#[derive(Debug)]
154struct LoggerInner {
155 client: Client,
156 dataset: String,
157}
158
159impl Logger {
160 /// Builds a logger from config, enabling ingestion when Axiom is configured.
161 pub fn from_config(config: &Config) -> Result<Self, axiom_rs::Error> {
162 let sample_rate = clamp_sample_rate(config.log_sample_rate);
163 let Some(token) = config.axiom_token.clone() else {
164 return Ok(Self {
165 inner: None,
166 sample_rate,
167 #[cfg(test)]
168 test_sender: None,
169 });
170 };
171 let Some(dataset) = config.axiom_dataset.clone() else {
172 return Ok(Self {
173 inner: None,
174 sample_rate,
175 #[cfg(test)]
176 test_sender: None,
177 });
178 };
179
180 let mut builder = Client::builder().with_token(token);
181 if let Some(url) = config.axiom_url.clone() {
182 builder = builder.with_url(url);
183 }
184 let client = builder.build()?;
185
186 Ok(Self {
187 inner: Some(Arc::new(LoggerInner { client, dataset })),
188 sample_rate,
189 #[cfg(test)]
190 test_sender: None,
191 })
192 }
193
194 /// Returns a disabled logger that drops events.
195 pub fn disabled() -> Self {
196 Self {
197 inner: None,
198 sample_rate: 1.0,
199 #[cfg(test)]
200 test_sender: None,
201 }
202 }
203
204 #[cfg(test)]
205 /// Builds a logger that captures events for tests.
206 pub fn with_test_sender(sender: tokio::sync::mpsc::Sender<WideEvent>) -> Self {
207 Self {
208 inner: None,
209 sample_rate: 1.0,
210 test_sender: Some(sender),
211 }
212 }
213
214 /// Emits a single event in the background when enabled.
215 pub fn emit(&self, event: WideEvent) {
216 if !should_sample(&event, self.sample_rate) {
217 return;
218 }
219
220 if cfg!(debug_assertions) && let Ok(line) = serde_json::to_string(&event) {
221 println!("{line}");
222 }
223
224 #[cfg(test)]
225 if let Some(sender) = &self.test_sender {
226 let _ = sender.try_send(event.clone());
227 }
228
229 let Some(inner) = &self.inner else {
230 return;
231 };
232
233 let client = inner.client.clone();
234 let dataset = inner.dataset.clone();
235 tokio::spawn(async move {
236 let _ = client.ingest(dataset, vec![event]).await;
237 });
238 }
239}
240
/// Normalises a sample rate into the range `0.0..=1.0`.
///
/// NaN and both infinities are mapped to `1.0`.
fn clamp_sample_rate(rate: f64) -> f64 {
    if rate.is_finite() {
        rate.clamp(0.0, 1.0)
    } else {
        1.0
    }
}
247
248fn should_sample(event: &WideEvent, rate: f64) -> bool {
249 let request = (event
250 .fields
251 .get("request"))
252 .and_then(|value| value.as_object());
253 let status_code = request
254 .and_then(|map| map.get("status_code"))
255 .and_then(|value| value.as_u64());
256 if status_code.unwrap_or(0) >= 500 {
257 return true;
258 }
259
260 if event.fields.get("error").is_some() {
261 return true;
262 }
263
264 let duration_ms = request
265 .and_then(|map| map.get("duration_ms"))
266 .and_then(|value| value.as_u64());
267 if duration_ms.unwrap_or(0) > 2000 {
268 return true;
269 }
270
271 let rate = clamp_sample_rate(rate);
272 if rate >= 1.0 {
273 return true;
274 }
275 if rate <= 0.0 {
276 return false;
277 }
278
279 rand::random::<f64>() < rate
280}
281
282/// Attaches a request-scoped event, then emits it after the handler completes.
283pub async fn logging_middleware(
284 State(state): State<AppState>,
285 mut request: Request<Body>,
286 next: Next,
287) -> Response {
288 let event = Arc::new(Mutex::new(WideEvent::from_request(&request)));
289 {
290 let mut record = event.lock().await;
291 record.set_nested_field("request", "environment", state.config.slipnote_env.clone());
292 }
293 request.extensions_mut().insert(event.clone());
294 let start = Instant::now();
295 let response = next.run(request).await;
296 let duration_ms = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX);
297
298 let record = {
299 let mut record = event.lock().await;
300 let status = response.status();
301 record.set_nested_field("request", "status_code", status.as_u16());
302 record.set_nested_field("request", "duration_ms", duration_ms);
303 record.set_nested_field(
304 "request",
305 "outcome",
306 if status.is_success() { "success" } else { "error" },
307 );
308 record.clone()
309 };
310
311 state.logger.emit(record);
312
313 response
314}
315
316fn extract_ip(headers: &HeaderMap) -> Option<String> {
317 if let Some(value) = header_value(headers, "x-forwarded-for") {
318 return value.split(',').next().map(|value| value.trim().to_string());
319 }
320 header_value(headers, "x-real-ip")
321}
322
323fn header_value(headers: &HeaderMap, name: &str) -> Option<String> {
324 headers
325 .get(name)
326 .and_then(|value| value.to_str().ok())
327 .map(str::to_string)
328}
329
#[cfg(test)]
mod tests {
    use super::*;
    use axum::routing::get;
    use axum::{Extension, Router};
    use proptest::prelude::*;
    use tokio::sync::mpsc;
    use tower::ServiceExt;

    use crate::test_support::TestStateBuilder;

    /// Returns the object stored under `key`, panicking with "`key` object"
    /// when it is missing or not an object.
    fn object_at<'a>(event: &'a WideEvent, key: &str) -> &'a Map<String, Value> {
        event
            .fields
            .get(key)
            .and_then(Value::as_object)
            .unwrap_or_else(|| panic!("{key} object"))
    }

    /// Handler that marks the shared event so the test exercises extraction.
    async fn mark_checked(Extension(event): Extension<SharedEvent>) -> &'static str {
        event
            .lock()
            .await
            .set_nested_field("request", "checked", true);
        "ok"
    }

    #[tokio::test]
    async fn middleware_injects_and_emits_event() {
        let (sender, mut receiver) = mpsc::channel(1);
        let logger = Logger::with_test_sender(sender);
        let state = TestStateBuilder::new().with_logger(logger.clone()).build();

        let logging_layer =
            axum::middleware::from_fn_with_state(state.clone(), logging_middleware);
        let app = Router::new()
            .route("/health", get(mark_checked))
            .layer(logging_layer)
            .with_state(state);

        let health_request = Request::builder()
            .uri("/health")
            .body(Body::empty())
            .unwrap();
        let response = app.oneshot(health_request).await.expect("health response");

        assert_eq!(response.status(), axum::http::StatusCode::OK);
        let emitted = receiver.recv().await.expect("event");
        assert!(object_at(&emitted, "request").contains_key("request_id"));
    }

    #[test]
    fn nested_fields_merge_into_object() {
        let mut event = WideEvent::default();
        event.set_nested_field("transcription", "language", "en");
        event.set_nested_field("transcription", "duration", 1.2);

        let transcription = object_at(&event, "transcription");
        assert_eq!(
            transcription.get("language"),
            Some(&Value::String("en".to_string()))
        );
        assert_eq!(transcription.get("duration"), Some(&Value::from(1.2)));
    }

    #[test]
    fn optional_nested_fields_skip_none() {
        let mut event = WideEvent::default();
        event.set_optional_nested_field("request", "user_agent", None::<String>);
        event.set_optional_nested_field("request", "user_agent", Some("ua".to_string()));

        let request = object_at(&event, "request");
        assert_eq!(
            request.get("user_agent"),
            Some(&Value::String("ua".to_string()))
        );
    }

    proptest! {
        #[test]
        fn nested_field_overwrite_is_consistent(
            object_key in "obj_[a-z]{1,8}",
            field_key in "field_[a-z]{1,8}",
            first in ".*",
            second in ".*",
        ) {
            let mut event = WideEvent::default();
            event.set_nested_field(&object_key, &field_key, &first);
            event.set_nested_field(&object_key, &field_key, &second);

            // The second write must replace the first under the same key.
            let stored = event
                .fields
                .get(&object_key)
                .and_then(Value::as_object)
                .and_then(|members| members.get(&field_key))
                .and_then(Value::as_str)
                .unwrap_or("");
            prop_assert_eq!(stored, second);
        }
    }
}