// Snapshot of branch `main`: 262 lines, 8.3 kB (raw view).
1//! Telemetry infrastructure for weaver services. 2//! 3//! Provides: 4//! - Prometheus metrics with `/metrics` endpoint 5//! - Tracing with pretty console output + optional Loki push 6//! 7//! # Usage 8//! 9//! ```ignore 10//! use weaver_common::telemetry::{self, TelemetryConfig}; 11//! 12//! #[tokio::main] 13//! async fn main() { 14//! // Initialize telemetry (metrics + tracing) 15//! let config = TelemetryConfig::from_env("weaver-index"); 16//! telemetry::init(config).await; 17//! 18//! // Mount the metrics endpoint in your axum router 19//! let app = Router::new() 20//! .route("/metrics", get(|| async { telemetry::render() })); 21//! 22//! // Use metrics 23//! metrics::counter!("requests_total").increment(1); 24//! 25//! // Use tracing (goes to both console and loki if configured) 26//! tracing::info!("server started"); 27//! } 28//! ``` 29 30use metrics_exporter_prometheus::{PrometheusBuilder, PrometheusHandle}; 31use std::sync::OnceLock; 32use tracing::Level; 33use tracing_subscriber::layer::SubscriberExt; 34use tracing_subscriber::util::SubscriberInitExt; 35use tracing_subscriber::{EnvFilter, Layer}; 36 37static PROMETHEUS_HANDLE: OnceLock<PrometheusHandle> = OnceLock::new(); 38 39/// Telemetry configuration 40#[derive(Debug, Clone)] 41pub struct TelemetryConfig { 42 /// Service name for labeling (e.g., "weaver-index", "weaver-app") 43 pub service_name: String, 44 /// Loki push URL (e.g., "http://localhost:3100"). None disables Loki. 45 pub loki_url: Option<String>, 46 /// Console log level (default: INFO, DEBUG in debug builds) 47 pub console_level: Level, 48} 49 50impl TelemetryConfig { 51 /// Load config from environment variables. 
52 /// 53 /// - `LOKI_URL`: Loki push endpoint (optional) 54 /// - `RUST_LOG`: Standard env filter (optional, overrides console_level) 55 pub fn from_env(service_name: impl Into<String>) -> Self { 56 let console_level = if cfg!(debug_assertions) { 57 Level::DEBUG 58 } else { 59 Level::INFO 60 }; 61 62 Self { 63 service_name: service_name.into(), 64 loki_url: std::env::var("LOKI_URL").ok(), 65 console_level, 66 } 67 } 68} 69 70/// Opaque handle for the Loki background task. 71pub struct LokiTask(tracing_loki::BackgroundTask); 72 73/// Initialize telemetry (metrics + tracing). 74/// 75/// Call once at application startup. If `LOKI_URL` is set, spawns a background 76/// task to push logs to Loki. 77pub async fn init(config: TelemetryConfig) { 78 // Initialize prometheus metrics 79 init_metrics(); 80 81 // Initialize tracing subscriber 82 if let Some(task) = init_tracing(config) { 83 // Spawn the loki background task 84 tokio::spawn(task.0); 85 } 86} 87 88/// Initialize telemetry without spawning the Loki task. 89/// 90/// Use this when you need to set up tracing before a tokio runtime is available. 91/// Returns the Loki task if configured - caller must spawn it later with `spawn_loki_task`. 92pub fn init_sync(config: TelemetryConfig) -> Option<LokiTask> { 93 init_metrics(); 94 init_tracing(config) 95} 96 97/// Spawn the Loki background task. 98/// 99/// Call this inside a tokio runtime after `init_sync`. 100pub fn spawn_loki_task(task: LokiTask) { 101 tokio::spawn(task.0); 102} 103 104/// Initialize just the prometheus metrics recorder. 
105pub fn init_metrics() -> &'static PrometheusHandle { 106 PROMETHEUS_HANDLE.get_or_init(|| { 107 // HTTP request duration buckets (in seconds) 108 let http_buckets = vec![ 109 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 110 ]; 111 112 PrometheusBuilder::new() 113 .set_buckets_for_metric( 114 metrics_exporter_prometheus::Matcher::Prefix("http_request_duration".to_string()), 115 &http_buckets, 116 ) 117 .expect("failed to set histogram buckets") 118 .install_recorder() 119 .expect("failed to install prometheus recorder") 120 }) 121} 122 123/// Initialize tracing with console + optional Loki layers. 124/// 125/// Returns the Loki background task if Loki is configured. 126fn init_tracing(config: TelemetryConfig) -> Option<LokiTask> { 127 let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| { 128 EnvFilter::new(format!("{}", config.console_level.as_str().to_lowercase())) 129 }); 130 131 // Pretty console layer for human-readable stdout 132 let console_layer = tracing_subscriber::fmt::layer() 133 .with_target(true) 134 .with_thread_ids(false) 135 .with_file(false) 136 .with_line_number(false) 137 .compact() 138 .with_filter(env_filter); 139 140 // Optional Loki layer for structured logs 141 if let Some(loki_url) = config.loki_url { 142 match tracing_loki::url::Url::parse(&loki_url) { 143 Ok(url) => { 144 let (loki_layer, loki_task) = tracing_loki::builder() 145 .label("service", config.service_name.clone()) 146 .expect("invalid label") 147 .build_url(url) 148 .expect("failed to build loki layer"); 149 150 tracing_subscriber::registry() 151 .with(console_layer) 152 .with(loki_layer) 153 .init(); 154 155 tracing::info!( 156 service = %config.service_name, 157 loki_url = %loki_url, 158 "telemetry initialized with loki" 159 ); 160 161 Some(LokiTask(loki_task)) 162 } 163 Err(e) => { 164 // Invalid URL - fall back to console only 165 tracing_subscriber::registry().with(console_layer).init(); 166 167 tracing::warn!( 168 error = %e, 169 
loki_url = %loki_url, 170 "invalid LOKI_URL, falling back to console only" 171 ); 172 None 173 } 174 } 175 } else { 176 // No Loki URL - console only 177 tracing_subscriber::registry().with(console_layer).init(); 178 179 tracing::debug!( 180 service = %config.service_name, 181 "telemetry initialized (console only, set LOKI_URL to enable loki)" 182 ); 183 None 184 } 185} 186 187/// Get the prometheus handle. 188pub fn handle() -> &'static PrometheusHandle { 189 PROMETHEUS_HANDLE.get_or_init(|| { 190 PrometheusBuilder::new() 191 .install_recorder() 192 .expect("failed to install prometheus recorder") 193 }) 194} 195 196/// Render metrics in prometheus text format. 197pub fn render() -> String { 198 handle().render() 199} 200 201// Re-export the metrics crate for convenience 202pub use metrics::{counter, gauge, histogram}; 203 204/// HTTP metrics middleware for axum. 205/// 206/// Records `http_requests_total` counter and `http_request_duration_seconds` histogram. 207/// Use with `axum::middleware::from_fn`. 
///
/// # Example
/// ```ignore
/// use axum::middleware;
/// use weaver_common::telemetry::http_metrics;
///
/// let app = Router::new()
///     .route("/", get(handler))
///     .layer(middleware::from_fn(http_metrics));
/// ```
///
/// Labels: `method` and `path` on both metrics, plus `status` on the counter.
/// The `path` label is the request path reduced to its first three segments.
#[cfg(feature = "telemetry")]
pub async fn http_metrics(
    req: axum::extract::Request,
    next: axum::middleware::Next,
) -> axum::response::Response {
    let start = std::time::Instant::now();

    // Capture the labels before `req` is moved into the next handler. The path
    // is normalized once here and shared by both metrics.
    let method = req.method().to_string();
    let path = normalize_path(req.uri().path());

    let response = next.run(req).await;

    let duration = start.elapsed().as_secs_f64();
    let status = response.status().as_u16().to_string();

    metrics::counter!(
        "http_requests_total",
        "method" => method.clone(),
        "path" => path.clone(),
        "status" => status
    )
    .increment(1);

    metrics::histogram!(
        "http_request_duration_seconds",
        "method" => method,
        "path" => path
    )
    .record(duration);

    response
}

/// Normalize path for metrics labels.
/// Keeps first 3 segments, collapses rest to reduce cardinality.
///
/// Empty segments (leading, trailing or repeated `/`) are ignored. A path with
/// no segments becomes `/`; a path with more than three segments ends in `/*`.
#[cfg(feature = "telemetry")]
fn normalize_path(path: &str) -> String {
    const KEPT_SEGMENTS: usize = 3;

    let mut segments = path.split('/').filter(|segment| !segment.is_empty());
    let mut normalized = String::with_capacity(path.len() + 2);

    // `by_ref` leaves the iterator usable afterwards; `take` does not pull a
    // fourth segment, so the check below still sees it.
    for segment in segments.by_ref().take(KEPT_SEGMENTS) {
        normalized.push('/');
        normalized.push_str(segment);
    }

    if normalized.is_empty() {
        return "/".to_string();
    }

    if segments.next().is_some() {
        normalized.push_str("/*");
    }

    normalized
}