Built for people who think better out loud.
at main 430 lines 13 kB view raw
1use std::sync::Arc; 2use std::time::{Instant, SystemTime, UNIX_EPOCH}; 3 4use axiom_rs::Client; 5use axum::body::Body; 6use axum::extract::State; 7use axum::http::{HeaderMap, Request, header}; 8use axum::middleware::Next; 9use axum::response::Response; 10use serde::Serialize; 11use serde_json::{Map, Value}; 12use tokio::sync::Mutex; 13use uuid::Uuid; 14 15use crate::config::Config; 16use crate::state::AppState; 17 18/// Shared, request-scoped event handle stored in request extensions. 19/// 20/// Usage: 21/// ```rust 22/// # use slipnote_backend::logging::SharedEvent; 23/// # use axum::Extension; 24/// async fn handler(Extension(event): Extension<SharedEvent>) { 25/// let mut event = event.lock().await; 26/// event.set_nested_field("request", "checked", true); 27/// } 28/// ``` 29pub type SharedEvent = Arc<Mutex<WideEvent>>; 30 31/// A wide event that accumulates request-scoped context for logging. 32/// 33/// Usage: 34/// ```rust 35/// # use slipnote_backend::logging::WideEvent; 36/// let mut event = WideEvent::default(); 37/// event.set_nested_field("request", "request_id", "req-123"); 38/// event.set_nested_field("transcription", "language", "en"); 39/// ``` 40#[derive(Debug, Clone, Default, Serialize)] 41pub struct WideEvent { 42 #[serde(flatten)] 43 fields: Map<String, Value>, 44} 45 46impl WideEvent { 47 /// Builds a new event from request metadata. 48 pub fn from_request(request: &Request<Body>) -> Self { 49 let mut event = Self::default(); 50 let request_id = header_value(request.headers(), "x-request-id") 51 .unwrap_or_else(|| Uuid::new_v4().to_string()); 52 let timestamp_ms = SystemTime::now() 53 .duration_since(UNIX_EPOCH) 54 .map(|duration| u64::try_from(duration.as_millis()).unwrap_or(u64::MAX)) 55 .unwrap_or_default(); 56 57 event.set_nested_field("request", "request_id", request_id); 58 event.set_nested_field("request", "timestamp", timestamp_ms); 59 event.set_nested_field("request", "path", request.uri().path()); 60 61 let user_agent = request 62 .headers() 63 .get(header::USER_AGENT) 64 .and_then(|value| value.to_str().ok()) 65 .map(str::to_string); 66 event.set_optional_nested_field("request", "user_agent", user_agent); 67 event.set_optional_nested_field("request", "ip", extract_ip(request.headers())); 68 69 event 70 } 71 72 /// Sets a nested field if the value is present. 73 /// 74 /// Usage: 75 /// ```rust 76 /// # use slipnote_backend::logging::WideEvent; 77 /// let mut event = WideEvent::default(); 78 /// event.set_optional_nested_field("request", "user_agent", Some("ua")); 79 /// event.set_optional_nested_field::<&str>("request", "ip", None); 80 /// ``` 81 pub fn set_optional_nested_field<T: Serialize>( 82 &mut self, 83 object_key: impl Into<String>, 84 field_key: impl Into<String>, 85 value: Option<T>, 86 ) { 87 let Some(value) = value else { 88 return; 89 }; 90 self.set_nested_field(object_key, field_key, value); 91 } 92 93 /// Sets a nested field inside an object, creating it if needed. 94 /// 95 /// Usage: 96 /// ```rust 97 /// # use slipnote_backend::logging::WideEvent; 98 /// let mut event = WideEvent::default(); 99 /// event.set_nested_field("transcription", "language", "en"); 100 /// event.set_nested_field("transcription", "duration", 1.2); 101 /// ``` 102 pub fn set_nested_field<T: Serialize>( 103 &mut self, 104 object_key: impl Into<String>, 105 field_key: impl Into<String>, 106 value: T, 107 ) { 108 let object_key = object_key.into(); 109 let field_key = field_key.into(); 110 let Ok(value) = serde_json::to_value(value) else { 111 return; 112 }; 113 114 if let Some(Value::Object(existing)) = self.fields.get_mut(&object_key) { 115 existing.insert(field_key, value); 116 } else { 117 let mut map = Map::new(); 118 map.insert(field_key, value); 119 self.fields.insert(object_key, Value::Object(map)); 120 } 121 } 122 123 /// Records an error message on the event. 124 /// 125 /// Usage: 126 /// ```rust 127 /// # use slipnote_backend::logging::WideEvent; 128 /// let mut event = WideEvent::default(); 129 /// event.set_error("upstream failed"); 130 /// ``` 131 pub fn set_error(&mut self, err: impl std::fmt::Display) { 132 self.set_nested_field("error", "message", err.to_string()); 133 } 134} 135 136/// A lightweight wrapper around the Axiom client. 137/// 138/// Usage: 139/// ```rust,no_run 140/// # use slipnote_backend::config::Config; 141/// # use slipnote_backend::logging::Logger; 142/// let config = Config::from_env().expect("config"); 143/// let logger = Logger::from_config(&config).expect("logger"); 144/// ``` 145#[derive(Clone, Default)] 146pub struct Logger { 147 inner: Option<Arc<LoggerInner>>, 148 sample_rate: f64, 149 #[cfg(test)] 150 test_sender: Option<tokio::sync::mpsc::Sender<WideEvent>>, 151} 152 153#[derive(Debug)] 154struct LoggerInner { 155 client: Client, 156 dataset: String, 157} 158 159impl Logger { 160 /// Builds a logger from config, enabling ingestion when Axiom is configured. 161 pub fn from_config(config: &Config) -> Result<Self, axiom_rs::Error> { 162 let sample_rate = clamp_sample_rate(config.log_sample_rate); 163 let Some(token) = config.axiom_token.clone() else { 164 return Ok(Self { 165 inner: None, 166 sample_rate, 167 #[cfg(test)] 168 test_sender: None, 169 }); 170 }; 171 let Some(dataset) = config.axiom_dataset.clone() else { 172 return Ok(Self { 173 inner: None, 174 sample_rate, 175 #[cfg(test)] 176 test_sender: None, 177 }); 178 }; 179 180 let mut builder = Client::builder().with_token(token); 181 if let Some(url) = config.axiom_url.clone() { 182 builder = builder.with_url(url); 183 } 184 let client = builder.build()?; 185 186 Ok(Self { 187 inner: Some(Arc::new(LoggerInner { client, dataset })), 188 sample_rate, 189 #[cfg(test)] 190 test_sender: None, 191 }) 192 } 193 194 /// Returns a disabled logger that drops events. 195 pub fn disabled() -> Self { 196 Self { 197 inner: None, 198 sample_rate: 1.0, 199 #[cfg(test)] 200 test_sender: None, 201 } 202 } 203 204 #[cfg(test)] 205 /// Builds a logger that captures events for tests. 206 pub fn with_test_sender(sender: tokio::sync::mpsc::Sender<WideEvent>) -> Self { 207 Self { 208 inner: None, 209 sample_rate: 1.0, 210 test_sender: Some(sender), 211 } 212 } 213 214 /// Emits a single event in the background when enabled. 215 pub fn emit(&self, event: WideEvent) { 216 if !should_sample(&event, self.sample_rate) { 217 return; 218 } 219 220 if cfg!(debug_assertions) && let Ok(line) = serde_json::to_string(&event) { 221 println!("{line}"); 222 } 223 224 #[cfg(test)] 225 if let Some(sender) = &self.test_sender { 226 let _ = sender.try_send(event.clone()); 227 } 228 229 let Some(inner) = &self.inner else { 230 return; 231 }; 232 233 let client = inner.client.clone(); 234 let dataset = inner.dataset.clone(); 235 tokio::spawn(async move { 236 let _ = client.ingest(dataset, vec![event]).await; 237 }); 238 } 239} 240 241fn clamp_sample_rate(rate: f64) -> f64 { 242 if !rate.is_finite() { 243 return 1.0; 244 } 245 rate.clamp(0.0, 1.0) 246} 247 248fn should_sample(event: &WideEvent, rate: f64) -> bool { 249 let request = (event 250 .fields 251 .get("request")) 252 .and_then(|value| value.as_object()); 253 let status_code = request 254 .and_then(|map| map.get("status_code")) 255 .and_then(|value| value.as_u64()); 256 if status_code.unwrap_or(0) >= 500 { 257 return true; 258 } 259 260 if event.fields.get("error").is_some() { 261 return true; 262 } 263 264 let duration_ms = request 265 .and_then(|map| map.get("duration_ms")) 266 .and_then(|value| value.as_u64()); 267 if duration_ms.unwrap_or(0) > 2000 { 268 return true; 269 } 270 271 let rate = clamp_sample_rate(rate); 272 if rate >= 1.0 { 273 return true; 274 } 275 if rate <= 0.0 { 276 return false; 277 } 278 279 rand::random::<f64>() < rate 280} 281 282/// Attaches a request-scoped event, then emits it after the handler completes. 283pub async fn logging_middleware( 284 State(state): State<AppState>, 285 mut request: Request<Body>, 286 next: Next, 287) -> Response { 288 let event = Arc::new(Mutex::new(WideEvent::from_request(&request))); 289 { 290 let mut record = event.lock().await; 291 record.set_nested_field("request", "environment", state.config.slipnote_env.clone()); 292 } 293 request.extensions_mut().insert(event.clone()); 294 let start = Instant::now(); 295 let response = next.run(request).await; 296 let duration_ms = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX); 297 298 let record = { 299 let mut record = event.lock().await; 300 let status = response.status(); 301 record.set_nested_field("request", "status_code", status.as_u16()); 302 record.set_nested_field("request", "duration_ms", duration_ms); 303 record.set_nested_field( 304 "request", 305 "outcome", 306 if status.is_success() { "success" } else { "error" }, 307 ); 308 record.clone() 309 }; 310 311 state.logger.emit(record); 312 313 response 314} 315 316fn extract_ip(headers: &HeaderMap) -> Option<String> { 317 if let Some(value) = header_value(headers, "x-forwarded-for") { 318 return value.split(',').next().map(|value| value.trim().to_string()); 319 } 320 header_value(headers, "x-real-ip") 321} 322 323fn header_value(headers: &HeaderMap, name: &str) -> Option<String> { 324 headers 325 .get(name) 326 .and_then(|value| value.to_str().ok()) 327 .map(str::to_string) 328} 329 330#[cfg(test)] 331mod tests { 332 use super::*; 333 use axum::routing::get; 334 use axum::{Extension, Router}; 335 use proptest::prelude::*; 336 use tokio::sync::mpsc; 337 use tower::ServiceExt; 338 339 use crate::test_support::TestStateBuilder; 340 341 #[tokio::test] 342 async fn middleware_injects_and_emits_event() { 343 let (sender, mut receiver) = mpsc::channel(1); 344 let logger = Logger::with_test_sender(sender); 345 let state = TestStateBuilder::new() 346 .with_logger(logger.clone()) 347 .build(); 348 349 let app = Router::new() 350 .route( 351 "/health", 352 get(|Extension(event): Extension<SharedEvent>| async move { 353 let mut event = event.lock().await; 354 event.set_nested_field("request", "checked", true); 355 "ok" 356 }), 357 ) 358 .layer(axum::middleware::from_fn_with_state( 359 state.clone(), 360 logging_middleware, 361 )) 362 .with_state(state); 363 364 let response = app 365 .oneshot(Request::builder().uri("/health").body(Body::empty()).unwrap()) 366 .await 367 .expect("health response"); 368 369 assert_eq!(response.status(), axum::http::StatusCode::OK); 370 let emitted = receiver.recv().await.expect("event"); 371 let request = emitted 372 .fields 373 .get("request") 374 .and_then(|value| value.as_object()) 375 .expect("request object"); 376 assert!(request.contains_key("request_id")); 377 } 378 379 #[test] 380 fn nested_fields_merge_into_object() { 381 let mut event = WideEvent::default(); 382 event.set_nested_field("transcription", "language", "en"); 383 event.set_nested_field("transcription", "duration", 1.2); 384 385 let transcription = event 386 .fields 387 .get("transcription") 388 .and_then(|value| value.as_object()) 389 .expect("transcription object"); 390 assert_eq!(transcription.get("language"), Some(&Value::String("en".to_string()))); 391 assert_eq!(transcription.get("duration"), Some(&Value::from(1.2))); 392 } 393 394 #[test] 395 fn optional_nested_fields_skip_none() { 396 let mut event = WideEvent::default(); 397 event.set_optional_nested_field("request", "user_agent", None::<String>); 398 event.set_optional_nested_field("request", "user_agent", Some("ua".to_string())); 399 400 let request = event 401 .fields 402 .get("request") 403 .and_then(|value| value.as_object()) 404 .expect("request object"); 405 assert_eq!(request.get("user_agent"), Some(&Value::String("ua".to_string()))); 406 } 407 408 proptest! { 409 #[test] 410 fn nested_field_overwrite_is_consistent( 411 object_key in "obj_[a-z]{1,8}", 412 field_key in "field_[a-z]{1,8}", 413 first in ".*", 414 second in ".*", 415 ) { 416 let mut event = WideEvent::default(); 417 event.set_nested_field(&object_key, &field_key, &first); 418 event.set_nested_field(&object_key, &field_key, &second); 419 420 let stored = event 421 .fields 422 .get(&object_key) 423 .and_then(|value| value.as_object()) 424 .and_then(|map| map.get(&field_key)) 425 .and_then(|value| value.as_str()) 426 .unwrap_or(""); 427 prop_assert_eq!(stored, second); 428 } 429 } 430}