atproto blogging
//! Telemetry infrastructure for weaver services.
//!
//! Provides:
//! - Prometheus metrics with a `/metrics` endpoint
//! - Tracing with pretty console output + optional Loki push
//!
//! # Usage
//!
//! ```ignore
//! use weaver_common::telemetry::{self, TelemetryConfig};
//!
//! #[tokio::main]
//! async fn main() {
//!     // Initialize telemetry (metrics + tracing)
//!     let config = TelemetryConfig::from_env("weaver-index");
//!     telemetry::init(config).await;
//!
//!     // Mount the metrics endpoint in your axum router
//!     let app = Router::new()
//!         .route("/metrics", get(|| async { telemetry::render() }));
//!
//!     // Use metrics
//!     metrics::counter!("requests_total").increment(1);
//!
//!     // Use tracing (goes to both console and loki if configured)
//!     tracing::info!("server started");
//! }
//! ```
29
30use metrics_exporter_prometheus::{PrometheusBuilder, PrometheusHandle};
31use std::sync::OnceLock;
32use tracing::Level;
33use tracing_subscriber::layer::SubscriberExt;
34use tracing_subscriber::util::SubscriberInitExt;
35use tracing_subscriber::{EnvFilter, Layer};
36
/// Process-wide Prometheus handle.
///
/// Populated on first access through `OnceLock::get_or_init`; every later
/// access reuses the handle stored by that first initialization.
static PROMETHEUS_HANDLE: OnceLock<PrometheusHandle> = OnceLock::new();
38
/// Telemetry configuration.
///
/// Passed to [`init`] or [`init_sync`]; can be built field by field or with
/// [`TelemetryConfig::from_env`].
#[derive(Debug, Clone)]
pub struct TelemetryConfig {
    /// Service name for labeling (e.g., "weaver-index", "weaver-app").
    /// Attached as the `service` label on the Loki layer.
    pub service_name: String,
    /// Loki push URL (e.g., "http://localhost:3100"). `None` disables Loki.
    pub loki_url: Option<String>,
    /// Console log level, used as the filter when no usable `RUST_LOG` is set
    /// (`from_env` picks DEBUG in debug builds and INFO otherwise).
    pub console_level: Level,
}
49
50impl TelemetryConfig {
51 /// Load config from environment variables.
52 ///
53 /// - `LOKI_URL`: Loki push endpoint (optional)
54 /// - `RUST_LOG`: Standard env filter (optional, overrides console_level)
55 pub fn from_env(service_name: impl Into<String>) -> Self {
56 let console_level = if cfg!(debug_assertions) {
57 Level::DEBUG
58 } else {
59 Level::INFO
60 };
61
62 Self {
63 service_name: service_name.into(),
64 loki_url: std::env::var("LOKI_URL").ok(),
65 console_level,
66 }
67 }
68}
69
/// Opaque handle for the Loki background task.
///
/// Wraps the `tracing_loki::BackgroundTask` created together with the Loki
/// layer. The inner value is private; pass the handle to [`spawn_loki_task`]
/// to run it.
pub struct LokiTask(tracing_loki::BackgroundTask);
72
73/// Initialize telemetry (metrics + tracing).
74///
75/// Call once at application startup. If `LOKI_URL` is set, spawns a background
76/// task to push logs to Loki.
77pub async fn init(config: TelemetryConfig) {
78 // Initialize prometheus metrics
79 init_metrics();
80
81 // Initialize tracing subscriber
82 if let Some(task) = init_tracing(config) {
83 // Spawn the loki background task
84 tokio::spawn(task.0);
85 }
86}
87
88/// Initialize telemetry without spawning the Loki task.
89///
90/// Use this when you need to set up tracing before a tokio runtime is available.
91/// Returns the Loki task if configured - caller must spawn it later with `spawn_loki_task`.
92pub fn init_sync(config: TelemetryConfig) -> Option<LokiTask> {
93 init_metrics();
94 init_tracing(config)
95}
96
97/// Spawn the Loki background task.
98///
99/// Call this inside a tokio runtime after `init_sync`.
100pub fn spawn_loki_task(task: LokiTask) {
101 tokio::spawn(task.0);
102}
103
104/// Initialize just the prometheus metrics recorder.
105pub fn init_metrics() -> &'static PrometheusHandle {
106 PROMETHEUS_HANDLE.get_or_init(|| {
107 // HTTP request duration buckets (in seconds)
108 let http_buckets = vec![
109 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0,
110 ];
111
112 PrometheusBuilder::new()
113 .set_buckets_for_metric(
114 metrics_exporter_prometheus::Matcher::Prefix("http_request_duration".to_string()),
115 &http_buckets,
116 )
117 .expect("failed to set histogram buckets")
118 .install_recorder()
119 .expect("failed to install prometheus recorder")
120 })
121}
122
123/// Initialize tracing with console + optional Loki layers.
124///
125/// Returns the Loki background task if Loki is configured.
126fn init_tracing(config: TelemetryConfig) -> Option<LokiTask> {
127 let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| {
128 EnvFilter::new(format!("{}", config.console_level.as_str().to_lowercase()))
129 });
130
131 // Pretty console layer for human-readable stdout
132 let console_layer = tracing_subscriber::fmt::layer()
133 .with_target(true)
134 .with_thread_ids(false)
135 .with_file(false)
136 .with_line_number(false)
137 .compact()
138 .with_filter(env_filter);
139
140 // Optional Loki layer for structured logs
141 if let Some(loki_url) = config.loki_url {
142 match tracing_loki::url::Url::parse(&loki_url) {
143 Ok(url) => {
144 let (loki_layer, loki_task) = tracing_loki::builder()
145 .label("service", config.service_name.clone())
146 .expect("invalid label")
147 .build_url(url)
148 .expect("failed to build loki layer");
149
150 tracing_subscriber::registry()
151 .with(console_layer)
152 .with(loki_layer)
153 .init();
154
155 tracing::info!(
156 service = %config.service_name,
157 loki_url = %loki_url,
158 "telemetry initialized with loki"
159 );
160
161 Some(LokiTask(loki_task))
162 }
163 Err(e) => {
164 // Invalid URL - fall back to console only
165 tracing_subscriber::registry().with(console_layer).init();
166
167 tracing::warn!(
168 error = %e,
169 loki_url = %loki_url,
170 "invalid LOKI_URL, falling back to console only"
171 );
172 None
173 }
174 }
175 } else {
176 // No Loki URL - console only
177 tracing_subscriber::registry().with(console_layer).init();
178
179 tracing::debug!(
180 service = %config.service_name,
181 "telemetry initialized (console only, set LOKI_URL to enable loki)"
182 );
183 None
184 }
185}
186
187/// Get the prometheus handle.
188pub fn handle() -> &'static PrometheusHandle {
189 PROMETHEUS_HANDLE.get_or_init(|| {
190 PrometheusBuilder::new()
191 .install_recorder()
192 .expect("failed to install prometheus recorder")
193 })
194}
195
196/// Render metrics in prometheus text format.
197pub fn render() -> String {
198 handle().render()
199}
200
201// Re-export the metrics crate for convenience
202pub use metrics::{counter, gauge, histogram};
203
/// HTTP metrics middleware for axum.
///
/// Records the `http_requests_total` counter (labels: `method`, `path`,
/// `status`) and the `http_request_duration_seconds` histogram (labels:
/// `method`, `path`). The `path` label is the request path passed through
/// `normalize_path`.
/// Use with `axum::middleware::from_fn`.
///
/// # Example
/// ```ignore
/// use axum::middleware;
/// use weaver_common::telemetry::http_metrics;
///
/// let app = Router::new()
///     .route("/", get(handler))
///     .layer(middleware::from_fn(http_metrics));
/// ```
#[cfg(feature = "telemetry")]
pub async fn http_metrics(
    req: axum::extract::Request,
    next: axum::middleware::Next,
) -> axum::response::Response {
    let start = std::time::Instant::now();
    // Copy what the labels need before `req` is moved into the inner service.
    let method = req.method().to_string();
    let path = req.uri().path().to_string();

    let response = next.run(req).await;

    let duration = start.elapsed().as_secs_f64();
    let status = response.status().as_u16().to_string();
    // Normalize once and share the result between both metrics.
    let path = normalize_path(&path);

    metrics::counter!(
        "http_requests_total",
        "method" => method.clone(),
        "path" => path.clone(),
        "status" => status
    )
    .increment(1);

    metrics::histogram!(
        "http_request_duration_seconds",
        "method" => method,
        "path" => path
    )
    .record(duration);

    response
}
249
250/// Normalize path for metrics labels.
251/// Keeps first 3 segments, collapses rest to reduce cardinality.
252#[cfg(feature = "telemetry")]
fn normalize_path(path: &str) -> String {
    // Number of leading path segments kept verbatim in the label.
    const KEPT_SEGMENTS: usize = 3;

    // Empty segments (leading, trailing or doubled slashes) are ignored.
    let mut segments = path.split('/').filter(|segment| !segment.is_empty());

    let mut normalized = String::with_capacity(path.len() + 2);
    for segment in segments.by_ref().take(KEPT_SEGMENTS) {
        normalized.push('/');
        normalized.push_str(segment);
    }

    if normalized.is_empty() {
        // No segments at all: report the root path.
        normalized.push('/');
    } else if segments.next().is_some() {
        // Anything beyond the kept prefix collapses into a single wildcard.
        normalized.push_str("/*");
    }

    normalized
}