Nix Observability Daemon
use anyhow::{Context, Result};
use rusqlite::Connection;
use serde::{Deserialize, Serialize};
use std::sync::Mutex;

#[derive(Debug, Serialize, Deserialize)]
pub struct Stats {
    pub build_count: i64,
    pub build_total_ms: i64,
    pub subst_count: i64,
    pub subst_total_ms: i64,
    pub download_bytes: i64,
    pub download_ms: i64,
    pub slowest_builds: Vec<SlowBuild>,
    pub cache_latency: Vec<CacheStat>,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct SlowBuild {
    pub duration_ms: i64,
    pub drv_path: Option<String>,
    pub text: Option<String>,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct CacheStat {
    pub cache_url: String,
    pub avg_ms: f64,
    pub count: i64,
}

pub fn collect_stats(db: &Mutex<Connection>, since: Option<i64>) -> Result<Stats> {
    let conn = db.lock().unwrap();

    // SQL NULL makes the WHERE condition vacuously true, giving us "no filter".
    let since_str: Option<String> = since
        .and_then(|ts| chrono::DateTime::from_timestamp(ts, 0))
        .map(|dt| dt.to_rfc3339());
    let p = since_str.as_deref();

    let (build_count, build_total_ms, subst_count, subst_total_ms, download_bytes, download_ms) =
        conn.query_row(
            "SELECT
                COUNT(*) FILTER (WHERE event_type = 105),
                COALESCE(SUM(duration_ms) FILTER (WHERE event_type = 105), 0),
                COUNT(*) FILTER (WHERE event_type = 108),
                COALESCE(SUM(duration_ms) FILTER (WHERE event_type = 108), 0),
                COALESCE(SUM(total_bytes) FILTER (WHERE event_type = 101), 0),
                COALESCE(SUM(duration_ms) FILTER (WHERE event_type = 101), 0)
            FROM events WHERE (?1 IS NULL OR start_time >= ?1)",
            rusqlite::params![p],
            |r| Ok((r.get::<_, i64>(0)?, r.get::<_, i64>(1)?, r.get::<_, i64>(2)?,
                    r.get::<_, i64>(3)?, r.get::<_, i64>(4)?, r.get::<_, i64>(5)?)),
        ).context("Failed to query summary stats")?;

    let mut stmt = conn.prepare(
        "SELECT duration_ms, drv_path, text
         FROM events WHERE event_type = 105 AND (?1 IS NULL OR start_time >= ?1)
         ORDER BY duration_ms DESC LIMIT 10",
    ).context("Failed to prepare slowest builds query")?;
    let slowest_builds: Vec<SlowBuild> = stmt.query_map(rusqlite::params![p], |r| {
        Ok(SlowBuild {
            duration_ms: r.get(0)?,
            drv_path: r.get(1)?,
            text: r.get(2)?,
        })
    })?.filter_map(|r| r.ok()).collect();

    // Substitute (108) measures full substitution time per cache; QueryPathInfo (109)
    // only measures metadata lookup and would undercount latency.
    let mut stmt = conn.prepare(
        "SELECT cache_url, AVG(duration_ms), COUNT(*)
         FROM events WHERE event_type = 108 AND cache_url IS NOT NULL AND (?1 IS NULL OR start_time >= ?1)
         GROUP BY cache_url ORDER BY AVG(duration_ms) DESC",
    ).context("Failed to prepare cache latency query")?;
    let cache_latency: Vec<CacheStat> = stmt.query_map(rusqlite::params![p], |r| {
        Ok(CacheStat {
            cache_url: r.get(0)?,
            avg_ms: r.get(1)?,
            count: r.get(2)?,
        })
    })?.filter_map(|r| r.ok()).collect();

    Ok(Stats { build_count, build_total_ms, subst_count, subst_total_ms, download_bytes, download_ms, slowest_builds, cache_latency })
}
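// A minimal smoke test for collect_stats, illustrating the NULL-filter trick:
// passing `since = None` binds SQL NULL to ?1, so `(?1 IS NULL OR ...)` matches
// every row. The `events` schema below is a hypothetical sketch inferred from the
// queries in this file, not necessarily the daemon's real migration, and it
// assumes a SQLite new enough for FILTER clauses on aggregates (3.30+), which
// rusqlite's bundled feature satisfies.
#[cfg(test)]
mod stats_tests {
    use super::*;

    fn test_db() -> Mutex<Connection> {
        let conn = Connection::open_in_memory().unwrap();
        conn.execute_batch(
            "CREATE TABLE events (
                event_type  INTEGER NOT NULL,
                start_time  TEXT NOT NULL,
                duration_ms INTEGER,
                total_bytes INTEGER,
                drv_path    TEXT,
                text        TEXT,
                cache_url   TEXT
            );
            INSERT INTO events VALUES
                (105, '2024-01-01T00:00:00+00:00', 1500, NULL, '/nix/store/aaa-hello.drv', NULL, NULL),
                (108, '2024-01-01T00:01:00+00:00', 200, NULL, '/nix/store/bbb-world.drv', NULL, 'https://cache.nixos.org'),
                (101, '2024-01-01T00:02:00+00:00', 300, 1048576, NULL, NULL, NULL);",
        ).unwrap();
        Mutex::new(conn)
    }

    #[test]
    fn none_since_matches_all_rows() {
        let db = test_db();
        let stats = collect_stats(&db, None).unwrap();
        assert_eq!(stats.build_count, 1);
        assert_eq!(stats.subst_count, 1);
        assert_eq!(stats.download_bytes, 1_048_576);
    }

    #[test]
    fn future_since_matches_nothing() {
        let db = test_db();
        // 4102444800 = 2100-01-01T00:00:00Z, later than every test row.
        let stats = collect_stats(&db, Some(4_102_444_800)).unwrap();
        assert_eq!(stats.build_count, 0);
        assert!(stats.slowest_builds.is_empty());
    }
}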
// Mann-Whitney U is non-parametric and makes no distributional assumptions,
// which is appropriate for build times that are right-skewed.
pub struct MannWhitneyResult {
    pub p_value: f64,
}

pub fn mann_whitney_u(a: &[i64], b: &[i64]) -> Option<MannWhitneyResult> {
    if a.is_empty() || b.is_empty() { return None; }

    let n1 = a.len();
    let n2 = b.len();
    let n1f = n1 as f64;
    let n2f = n2 as f64;

    let mut combined: Vec<(i64, usize)> = a.iter().map(|&v| (v, 0))
        .chain(b.iter().map(|&v| (v, 1)))
        .collect();
    combined.sort_unstable_by_key(|&(v, _)| v);

    let n = combined.len();

    // Average the ranks within each tie group and accumulate the tie-correction term Σ(t³ - t).
    let mut rank_sum_a = 0.0f64;
    let mut tie_correction = 0.0f64;
    let mut i = 0;
    while i < n {
        let mut j = i;
        while j < n && combined[j].0 == combined[i].0 { j += 1; }
        let avg_rank = (i as f64 + 1.0 + j as f64) / 2.0;
        for k in i..j {
            if combined[k].1 == 0 { rank_sum_a += avg_rank; }
        }
        let t = (j - i) as f64;
        if t > 1.0 { tie_correction += t * t * t - t; }
        i = j;
    }

    assert!(rank_sum_a >= 0.0);

    let u_a = rank_sum_a - n1f * (n1f + 1.0) / 2.0;
    let u_b = n1f * n2f - u_a;

    assert!(u_a >= 0.0);
    assert!(u_b >= 0.0);
    assert!((u_a + u_b - n1f * n2f).abs() < 1e-6, "U_A + U_B must equal n1*n2");

    let cliffs_delta = (u_a - u_b) / (n1f * n2f);
    assert!(cliffs_delta >= -1.0 - 1e-9);
    assert!(cliffs_delta <= 1.0 + 1e-9);
    let _ = cliffs_delta;

    // Var[U] = n1*n2/12 * [(n+1) - Σ(t³-t)/(n*(n-1))]
    let nf = n as f64;
    let variance = (n1f * n2f / 12.0) * ((nf + 1.0) - tie_correction / (nf * (nf - 1.0)));

    let p_value = if variance <= 0.0 {
        1.0
    } else {
        let u_min = u_a.min(u_b);
        let mean_u = n1f * n2f / 2.0;
        // Continuity correction: +0.5 shifts U_min toward the mean, making z conservative.
        // When U_A == U_B, U_min already equals the mean and the correction would overshoot
        // and flip the sign, so clamp the numerator at zero (z = 0 gives p = 1, which is
        // the right answer there).
        let z = (u_min - mean_u + 0.5).min(0.0) / variance.sqrt();
        assert!(z <= 0.0, "z must be non-positive for U_min ≤ mean_U");
        (2.0 * normal_cdf(z)).min(1.0)
    };

    assert!(p_value >= 0.0);
    assert!(p_value <= 1.0);

    Some(MannWhitneyResult { p_value })
}
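// Hedged sanity checks for mann_whitney_u: identical samples should give a
// p-value of 1, and fully separated samples a small one. Exact reference values
// would need a U table or an external implementation; these only assert the
// direction, which the normal approximation guarantees.
#[cfg(test)]
mod mann_whitney_tests {
    use super::*;

    #[test]
    fn identical_samples_are_not_significant() {
        let a = [100, 200, 300, 400];
        let r = mann_whitney_u(&a, &a).unwrap();
        // U_A == U_B here, so z is clamped to 0 and p must be exactly 1.
        assert!((r.p_value - 1.0).abs() < 1e-12);
    }

    #[test]
    fn disjoint_samples_are_significant() {
        let a = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
        let b = [100, 101, 102, 103, 104, 105, 106, 107, 108, 109];
        let r = mann_whitney_u(&a, &b).unwrap();
        assert!(r.p_value < 0.01, "fully separated samples: p = {}", r.p_value);
    }

    #[test]
    fn empty_sample_returns_none() {
        assert!(mann_whitney_u(&[], &[1, 2, 3]).is_none());
    }
}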
// Abramowitz & Stegun 7.1.26, max error ≈ 1.5×10⁻⁷.
fn normal_cdf(z: f64) -> f64 {
    0.5 * (1.0 + erf_approx(z / std::f64::consts::SQRT_2))
}

fn erf_approx(x: f64) -> f64 {
    let t = 1.0 / (1.0 + 0.3275911 * x.abs());
    let poly = t * (0.254829592
        + t * (-0.284496736
            + t * (1.421413741
                + t * (-1.453152027
                    + t * 1.061405429))));
    let result = 1.0 - poly * (-x * x).exp();
    if x >= 0.0 { result } else { -result }
}

fn median_sorted(sorted: &[i64]) -> f64 {
    assert!(!sorted.is_empty());
    let n = sorted.len();
    if n % 2 == 0 {
        (sorted[n / 2 - 1] + sorted[n / 2]) as f64 / 2.0
    } else {
        sorted[n / 2] as f64
    }
}

fn fmt_ms(ms: i64) -> String {
    if ms < 1000 {
        format!("{}ms", ms)
    } else if ms < 60_000 {
        format!("{:.1}s", ms as f64 / 1000.0)
    } else {
        format!("{}m{:.1}s", ms / 60_000, (ms % 60_000) as f64 / 1000.0)
    }
}

fn drv_name(path: &str) -> &str {
    path.strip_prefix("/nix/store/").unwrap_or(path)
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum BucketSize {
    Hour,
    Day,
    Week,
    Month,
}

impl BucketSize {
    fn strftime_fmt(&self) -> &'static str {
        match self {
            BucketSize::Hour => "%Y-%m-%dT%H",
            BucketSize::Day => "%Y-%m-%d",
            BucketSize::Week => "%Y-W%W",
            BucketSize::Month => "%Y-%m",
        }
    }

    fn col_width(&self) -> usize {
        match self {
            BucketSize::Hour => 13,
            BucketSize::Day => 10,
            BucketSize::Week => 8,
            BucketSize::Month => 7,
        }
    }
}
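// Accuracy checks for the A&S 7.1.26 approximation against published reference
// values (erf(1) ≈ 0.8427007929, Φ(1.96) ≈ 0.9750021), plus the small formatting
// helpers. The 1e-6 tolerance is deliberately looser than the stated 1.5×10⁻⁷
// maximum error.
#[cfg(test)]
mod approx_tests {
    use super::*;

    #[test]
    fn erf_matches_reference_values() {
        assert!((erf_approx(1.0) - 0.8427007929).abs() < 1e-6);
        assert!((erf_approx(-1.0) + 0.8427007929).abs() < 1e-6);
        assert!((normal_cdf(0.0) - 0.5).abs() < 1e-6);
        assert!((normal_cdf(1.96) - 0.9750021).abs() < 1e-6);
    }

    #[test]
    fn median_handles_even_and_odd_lengths() {
        assert_eq!(median_sorted(&[1, 2, 3]), 2.0);
        assert_eq!(median_sorted(&[1, 2, 3, 4]), 2.5);
    }

    #[test]
    fn fmt_ms_picks_sensible_units() {
        assert_eq!(fmt_ms(999), "999ms");
        assert_eq!(fmt_ms(1500), "1.5s");
        assert_eq!(fmt_ms(61_500), "1m1.5s");
    }
}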
// Raw durations are carried per-bucket so adjacent buckets can be compared
// without a second round-trip to the daemon.
#[derive(Debug, Serialize, Deserialize)]
pub struct TrendBucket {
    pub bucket: String,
    pub build_durations: Vec<i64>,
    pub subst_durations: Vec<i64>,
    pub download_bytes: i64,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct Trend {
    pub buckets: Vec<TrendBucket>,
    pub bucket_size: BucketSize,
    pub drv_filter: Option<String>,
}

pub fn collect_trend(
    db: &Mutex<Connection>,
    since: Option<i64>,
    bucket: BucketSize,
    drv: Option<String>,
) -> Result<Trend> {
    if let Some(ref d) = drv {
        assert!(!d.is_empty(), "drv filter must not be empty");
    }

    let conn = db.lock().unwrap();

    let since_str: Option<String> = since
        .and_then(|ts| chrono::DateTime::from_timestamp(ts, 0))
        .map(|dt| dt.to_rfc3339());
    let p = since_str.as_deref();
    let drv_ref = drv.as_deref();
    let fmt = bucket.strftime_fmt();

    // Rows come out ordered by bucket then time, so grouping with a single sequential
    // scan is valid. FileTransfer (101) has NULL drv_path and is intentionally excluded
    // by the drv filter. COALESCE guards the columns that are NULL for event types that
    // don't set them; without it, r.get::<_, i64> would fail on those rows and the
    // filter_map below would silently drop them.
    let mut stmt = conn.prepare(
        "SELECT strftime(?3, start_time), event_type, COALESCE(duration_ms, 0), COALESCE(total_bytes, 0)
         FROM events
         WHERE event_type IN (101, 105, 108)
           AND (?1 IS NULL OR start_time >= ?1)
           AND (?2 IS NULL OR drv_path LIKE '%' || ?2 || '%')
         ORDER BY strftime(?3, start_time) ASC, start_time ASC",
    ).context("Failed to prepare trend query")?;

    let mut buckets: Vec<TrendBucket> = vec![];
    for row in stmt.query_map(rusqlite::params![p, drv_ref, fmt], |r| {
        Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)?, r.get::<_, i64>(2)?, r.get::<_, i64>(3)?))
    })?.filter_map(|r| r.ok()) {
        let (b, etype, dur, bytes) = row;
        if buckets.last().map(|x: &TrendBucket| x.bucket.as_str()) != Some(&b) {
            buckets.push(TrendBucket { bucket: b, build_durations: vec![], subst_durations: vec![], download_bytes: 0 });
        }
        let last = buckets.last_mut().unwrap();
        match etype {
            105 => { assert!(dur >= 0); last.build_durations.push(dur); }
            108 => { assert!(dur >= 0); last.subst_durations.push(dur); }
            101 => { assert!(bytes >= 0); last.download_bytes += bytes; }
            _ => {}
        }
    }

    for i in 1..buckets.len() {
        assert!(buckets[i].bucket > buckets[i - 1].bucket, "buckets must be strictly ascending");
    }

    Ok(Trend { buckets, bucket_size: bucket, drv_filter: drv })
}

pub fn display_trend(trend: &Trend) {
    let has_downloads = trend.drv_filter.is_none()
        && trend.buckets.iter().any(|b| b.download_bytes > 0);
    let bw = trend.bucket_size.col_width();

    if let Some(ref drv) = trend.drv_filter {
        println!("filter: {}", drv);
    }

    print!("{:<bw$} {:>6} {:>10} {:>6} {:>10}", "period", "builds", "build med", "subst", "subst med");
    if has_downloads { print!(" {:>8}", "dl (MB)"); }
    println!();

    if trend.buckets.is_empty() {
        println!("(no data)");
        return;
    }

    for b in &trend.buckets {
        let mut bs = b.build_durations.clone(); bs.sort_unstable();
        let mut ss = b.subst_durations.clone(); ss.sort_unstable();
        let build_med = if bs.is_empty() { 0 } else { median_sorted(&bs) as i64 };
        let subst_med = if ss.is_empty() { 0 } else { median_sorted(&ss) as i64 };

        print!("{:<bw$} {:>6} {:>10} {:>6} {:>10}",
            b.bucket, b.build_durations.len(), fmt_ms(build_med),
            b.subst_durations.len(), fmt_ms(subst_med));
        if has_downloads { print!(" {:>8.1}", b.download_bytes as f64 / 1_048_576.0); }
        println!();
    }
}
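// A grouping check for collect_trend, reusing the hypothetical schema sketched in
// the stats test above: two builds on different days must land in two day buckets
// in ascending order, with the FileTransfer row contributing only bytes.
#[cfg(test)]
mod trend_tests {
    use super::*;

    #[test]
    fn buckets_group_and_sort_by_day() {
        let conn = Connection::open_in_memory().unwrap();
        conn.execute_batch(
            "CREATE TABLE events (
                event_type INTEGER NOT NULL, start_time TEXT NOT NULL,
                duration_ms INTEGER, total_bytes INTEGER,
                drv_path TEXT, text TEXT, cache_url TEXT
            );
            INSERT INTO events VALUES
                (105, '2024-01-02T10:00:00+00:00', 2000, NULL, '/nix/store/aaa-hello.drv', NULL, NULL),
                (105, '2024-01-01T10:00:00+00:00', 1000, NULL, '/nix/store/bbb-world.drv', NULL, NULL),
                (101, '2024-01-01T11:00:00+00:00', 50, 4096, NULL, NULL, NULL);",
        ).unwrap();
        let db = Mutex::new(conn);

        let trend = collect_trend(&db, None, BucketSize::Day, None).unwrap();
        assert_eq!(trend.buckets.len(), 2);
        assert_eq!(trend.buckets[0].bucket, "2024-01-01");
        assert_eq!(trend.buckets[0].build_durations, vec![1000]);
        assert_eq!(trend.buckets[0].download_bytes, 4096);
        assert_eq!(trend.buckets[1].bucket, "2024-01-02");
    }
}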
fn print_test_section<F>(label: &str, buckets: &[TrendBucket], get_durs: F, bw: usize)
where
    F: Fn(&TrendBucket) -> &[i64],
{
    let any_data = buckets.iter().any(|b| !get_durs(b).is_empty());
    if !any_data { return; }

    println!("{}", label);
    println!("{:<bw$} {:>5} {:>10} {:>7} {:>8}", "period", "n", "median", "Δ", "p-value");

    for (i, b) in buckets.iter().enumerate() {
        let durs = get_durs(b);
        let mut sorted = durs.to_vec(); sorted.sort_unstable();
        let med = if sorted.is_empty() { 0.0 } else { median_sorted(&sorted) };

        let (delta_str, p_str) = if i == 0 || get_durs(&buckets[i - 1]).is_empty() {
            (String::new(), String::new())
        } else {
            let prev = get_durs(&buckets[i - 1]);
            let mut prev_sorted = prev.to_vec(); prev_sorted.sort_unstable();
            let prev_med = median_sorted(&prev_sorted);

            let delta = if prev_med > 0.0 {
                let pct = (med - prev_med) / prev_med * 100.0;
                let sign = if pct >= 0.0 { "+" } else { "" };
                format!("{}{:.0}%", sign, pct)
            } else {
                String::new()
            };

            let p = match mann_whitney_u(prev, durs) {
                Some(r) => format!("{:.3}", r.p_value),
                None => String::new(),
            };

            (delta, p)
        };

        println!("{:<bw$} {:>5} {:>10} {:>7} {:>8}",
            b.bucket, durs.len(), fmt_ms(med as i64), delta_str, p_str);
    }
    println!();
}

pub fn display_trend_test(trend: &Trend) {
    let bw = trend.bucket_size.col_width();

    if let Some(ref drv) = trend.drv_filter {
        println!("filter: {}", drv);
    }

    print_test_section("builds", &trend.buckets, |b| &b.build_durations, bw);
    print_test_section("substitutions", &trend.buckets, |b| &b.subst_durations, bw);

    println!("Mann-Whitney U (two-tailed). H0: adjacent periods have the same duration distribution.");
}

pub fn output_csv_trend(trend: &Trend) {
    println!("period,build_count,build_median_ms,subst_count,subst_median_ms,download_bytes");
    for b in &trend.buckets {
        let mut bs = b.build_durations.clone(); bs.sort_unstable();
        let mut ss = b.subst_durations.clone(); ss.sort_unstable();
        let build_med = if bs.is_empty() { 0 } else { median_sorted(&bs) as i64 };
        let subst_med = if ss.is_empty() { 0 } else { median_sorted(&ss) as i64 };
        assert!(build_med >= 0);
        assert!(subst_med >= 0);
        println!("{},{},{},{},{},{}", b.bucket, b.build_durations.len(), build_med, b.subst_durations.len(), subst_med, b.download_bytes);
    }
}

pub fn display_stats(stats: Stats) {
    let build_avg = if stats.build_count > 0 { stats.build_total_ms / stats.build_count } else { 0 };
    let subst_avg = if stats.subst_count > 0 { stats.subst_total_ms / stats.subst_count } else { 0 };
    let mb = stats.download_bytes as f64 / 1_048_576.0;
    let dl_speed = if stats.download_ms > 0 { mb / (stats.download_ms as f64 / 1000.0) } else { 0.0 };

    println!("{:<14} {:>6} total {:>9} avg {:>9}", "built", stats.build_count, fmt_ms(stats.build_total_ms), fmt_ms(build_avg));
    println!("{:<14} {:>6} total {:>9} avg {:>9}", "substituted", stats.subst_count, fmt_ms(stats.subst_total_ms), fmt_ms(subst_avg));
    println!("{:<14} {:>8.1} MB avg {:>6.1} MB/s", "downloaded", mb, dl_speed);

    if !stats.slowest_builds.is_empty() {
        println!();
        for row in &stats.slowest_builds {
            let path = row.drv_path.as_deref().or(row.text.as_deref()).unwrap_or("?");
            println!("{:>9} {}", fmt_ms(row.duration_ms), drv_name(path));
        }
    }

    if !stats.cache_latency.is_empty() {
        let url_w = stats.cache_latency.iter().map(|r| r.cache_url.len()).max().unwrap_or(0);
        println!();
        for row in &stats.cache_latency {
            println!("{:<width$} avg {:>7} {:>6} queries", row.cache_url, fmt_ms(row.avg_ms as i64), row.count, width = url_w);
        }
    }
}
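// A hypothetical wiring sketch, not part of the daemon: how a CLI subcommand
// might open the events database and route to the display helpers above. The
// database path and the `csv` flag are illustrative assumptions.
#[allow(dead_code)]
fn example_report(db_path: &str, csv: bool) -> Result<()> {
    let conn = Connection::open(db_path).context("Failed to open events database")?;
    let db = Mutex::new(conn);

    // Last 7 days, bucketed by day, no derivation filter.
    let since = chrono::Utc::now().timestamp() - 7 * 24 * 3600;
    let trend = collect_trend(&db, Some(since), BucketSize::Day, None)?;
    if csv { output_csv_trend(&trend); } else { display_trend(&trend); }

    let stats = collect_stats(&db, Some(since))?;
    display_stats(stats);
    Ok(())
}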