use anyhow::{Context, Result}; use rusqlite::Connection; use serde::{Deserialize, Serialize}; use std::sync::Mutex; #[derive(Debug, Serialize, Deserialize)] pub struct Stats { pub build_count: i64, pub build_total_ms: i64, pub subst_count: i64, pub subst_total_ms: i64, pub download_bytes: i64, pub download_ms: i64, pub slowest_builds: Vec, pub cache_latency: Vec, } #[derive(Debug, Serialize, Deserialize)] pub struct SlowBuild { pub duration_ms: i64, pub drv_path: Option, pub text: Option, } #[derive(Debug, Serialize, Deserialize)] pub struct CacheStat { pub cache_url: String, pub avg_ms: f64, pub count: i64, } pub fn collect_stats(db: &Mutex, since: Option) -> Result { let conn = db.lock().unwrap(); // SQL NULL makes the WHERE condition vacuously true, giving us "no filter". let since_str: Option = since .and_then(|ts| chrono::DateTime::from_timestamp(ts, 0)) .map(|dt| dt.to_rfc3339()); let p = since_str.as_deref(); let (build_count, build_total_ms, subst_count, subst_total_ms, download_bytes, download_ms) = conn.query_row( "SELECT COUNT(*) FILTER (WHERE event_type = 105), COALESCE(SUM(duration_ms) FILTER (WHERE event_type = 105), 0), COUNT(*) FILTER (WHERE event_type = 108), COALESCE(SUM(duration_ms) FILTER (WHERE event_type = 108), 0), COALESCE(SUM(total_bytes) FILTER (WHERE event_type = 101), 0), COALESCE(SUM(duration_ms) FILTER (WHERE event_type = 101), 0) FROM events WHERE (?1 IS NULL OR start_time >= ?1)", rusqlite::params![p], |r| Ok((r.get::<_, i64>(0)?, r.get::<_, i64>(1)?, r.get::<_, i64>(2)?, r.get::<_, i64>(3)?, r.get::<_, i64>(4)?, r.get::<_, i64>(5)?)), ).context("Failed to query summary stats")?; let mut stmt = conn.prepare( "SELECT duration_ms, drv_path, text FROM events WHERE event_type = 105 AND (?1 IS NULL OR start_time >= ?1) ORDER BY duration_ms DESC LIMIT 10", ).context("Failed to prepare slowest builds query")?; let slowest_builds: Vec = stmt.query_map(rusqlite::params![p], |r| { Ok(SlowBuild { duration_ms: r.get(0)?, drv_path: r.get(1)?, 
text: r.get(2)?, }) })?.filter_map(|r| r.ok()).collect(); // Substitute (108) measures full substitution time per cache; QueryPathInfo (109) // only measures metadata lookup and would undercount latency. let mut stmt = conn.prepare( "SELECT cache_url, AVG(duration_ms), COUNT(*) FROM events WHERE event_type = 108 AND cache_url IS NOT NULL AND (?1 IS NULL OR start_time >= ?1) GROUP BY cache_url ORDER BY AVG(duration_ms) DESC", ).context("Failed to prepare cache latency query")?; let cache_latency: Vec = stmt.query_map(rusqlite::params![p], |r| { Ok(CacheStat { cache_url: r.get(0)?, avg_ms: r.get(1)?, count: r.get(2)?, }) })?.filter_map(|r| r.ok()).collect(); Ok(Stats { build_count, build_total_ms, subst_count, subst_total_ms, download_bytes, download_ms, slowest_builds, cache_latency }) } // Mann-Whitney U is non-parametric and makes no distributional assumptions, // which is appropriate for build times that are right-skewed. pub struct MannWhitneyResult { pub p_value: f64, } pub fn mann_whitney_u(a: &[i64], b: &[i64]) -> Option { if a.is_empty() || b.is_empty() { return None; } let n1 = a.len(); let n2 = b.len(); let n1f = n1 as f64; let n2f = n2 as f64; let mut combined: Vec<(i64, usize)> = a.iter().map(|&v| (v, 0)) .chain(b.iter().map(|&v| (v, 1))) .collect(); combined.sort_unstable_by_key(|&(v, _)| v); let n = combined.len(); // Average ranks within tie groups and accumulate the tie-correction term Σ(t³ - t). 
let mut rank_sum_a = 0.0f64; let mut tie_correction = 0.0f64; let mut i = 0; while i < n { let mut j = i; while j < n && combined[j].0 == combined[i].0 { j += 1; } let avg_rank = (i as f64 + 1.0 + j as f64) / 2.0; for k in i..j { if combined[k].1 == 0 { rank_sum_a += avg_rank; } } let t = (j - i) as f64; if t > 1.0 { tie_correction += t * t * t - t; } i = j; } assert!(rank_sum_a >= 0.0); let u_a = rank_sum_a - n1f * (n1f + 1.0) / 2.0; let u_b = n1f * n2f - u_a; assert!(u_a >= 0.0); assert!(u_b >= 0.0); assert!((u_a + u_b - n1f * n2f).abs() < 1e-6, "U_A + U_B must equal n1*n2"); let cliffs_delta = (u_a - u_b) / (n1f * n2f); assert!(cliffs_delta >= -1.0 - 1e-9); assert!(cliffs_delta <= 1.0 + 1e-9); let _ = cliffs_delta; // Var[U] = n1*n2/12 * [(n+1) - Σ(t³-t)/(n*(n-1))] let nf = n as f64; let variance = (n1f * n2f / 12.0) * ((nf + 1.0) - tie_correction / (nf * (nf - 1.0))); let p_value = if variance <= 0.0 { 1.0 } else { let u_min = u_a.min(u_b); let mean_u = n1f * n2f / 2.0; // Continuity correction: +0.5 shifts U_min toward the mean, making z conservative. let z = (u_min - mean_u + 0.5) / variance.sqrt(); assert!(z <= 0.0 + 1e-9, "z must be non-positive for U_min ≤ mean_U"); (2.0 * normal_cdf(z)).min(1.0) }; assert!(p_value >= 0.0); assert!(p_value <= 1.0); Some(MannWhitneyResult { p_value }) } // Abramowitz & Stegun 7.1.26, max error ≈ 1.5×10⁻⁷. 
/// Φ(z): standard normal CDF via the erf approximation below.
fn normal_cdf(z: f64) -> f64 {
    0.5 * (1.0 + erf_approx(z / std::f64::consts::SQRT_2))
}

/// Rational/polynomial approximation of erf(x); odd symmetry is recovered
/// from |x| by flipping the sign at the end.
fn erf_approx(x: f64) -> f64 {
    let t = 1.0 / (1.0 + 0.3275911 * x.abs());
    let poly = t * (0.254829592
        + t * (-0.284496736
            + t * (1.421413741 + t * (-1.453152027 + t * 1.061405429))));
    let magnitude = 1.0 - poly * (-x * x).exp();
    if x >= 0.0 { magnitude } else { -magnitude }
}

/// Median of an already-sorted, non-empty slice.
fn median_sorted(sorted: &[i64]) -> f64 {
    assert!(!sorted.is_empty());
    let n = sorted.len();
    let mid = n / 2;
    if n % 2 == 1 {
        sorted[mid] as f64
    } else {
        (sorted[mid - 1] + sorted[mid]) as f64 / 2.0
    }
}

/// Human-readable duration: "950ms", "2.3s", or "1m12.5s".
fn fmt_ms(ms: i64) -> String {
    match ms {
        m if m < 1000 => format!("{}ms", m),
        m if m < 60_000 => format!("{:.1}s", m as f64 / 1000.0),
        m => format!("{}m{:.1}s", m / 60_000, (m % 60_000) as f64 / 1000.0),
    }
}

/// Strip the store prefix for display; non-store strings pass through unchanged.
fn drv_name(path: &str) -> &str {
    path.strip_prefix("/nix/store/").unwrap_or(path)
}

/// Granularity of a trend bucket; serialized in snake_case.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum BucketSize {
    Hour,
    Day,
    Week,
    Month,
}

impl BucketSize {
    /// SQLite strftime() pattern mapping a timestamp onto this bucket.
    fn strftime_fmt(&self) -> &'static str {
        match self {
            BucketSize::Hour => "%Y-%m-%dT%H",
            BucketSize::Day => "%Y-%m-%d",
            BucketSize::Week => "%Y-W%W",
            BucketSize::Month => "%Y-%m",
        }
    }

    /// Display width of a bucket label (e.g. "2024-01-02T03" is 13 chars).
    fn col_width(&self) -> usize {
        match self {
            BucketSize::Hour => 13,
            BucketSize::Day => 10,
            BucketSize::Week => 8,
            BucketSize::Month => 7,
        }
    }
}

// Raw durations are carried per-bucket so adjacent buckets can be compared
// without a second round-trip to the daemon.
#[derive(Debug, Serialize, Deserialize)] pub struct TrendBucket { pub bucket: String, pub build_durations: Vec, pub subst_durations: Vec, pub download_bytes: i64, } #[derive(Debug, Serialize, Deserialize)] pub struct Trend { pub buckets: Vec, pub bucket_size: BucketSize, pub drv_filter: Option, } pub fn collect_trend( db: &Mutex, since: Option, bucket: BucketSize, drv: Option, ) -> Result { if let Some(ref d) = drv { assert!(!d.is_empty(), "drv filter must not be empty"); } let conn = db.lock().unwrap(); let since_str: Option = since .and_then(|ts| chrono::DateTime::from_timestamp(ts, 0)) .map(|dt| dt.to_rfc3339()); let p = since_str.as_deref(); let drv_ref = drv.as_deref(); let fmt = bucket.strftime_fmt(); // Rows come out ordered by bucket then time, so grouping by sequential scan is valid. // FileTransfer (101) has NULL drv_path and is intentionally excluded by the drv filter. let mut stmt = conn.prepare( "SELECT strftime(?3, start_time), event_type, duration_ms, total_bytes FROM events WHERE event_type IN (101, 105, 108) AND (?1 IS NULL OR start_time >= ?1) AND (?2 IS NULL OR drv_path LIKE '%' || ?2 || '%') ORDER BY strftime(?3, start_time) ASC, start_time ASC", ).context("Failed to prepare trend query")?; let mut buckets: Vec = vec![]; for row in stmt.query_map(rusqlite::params![p, drv_ref, fmt], |r| { Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)?, r.get::<_, i64>(2)?, r.get::<_, i64>(3)?)) })?.filter_map(|r| r.ok()) { let (b, etype, dur, bytes) = row; if buckets.last().map(|x: &TrendBucket| x.bucket.as_str()) != Some(&b) { buckets.push(TrendBucket { bucket: b, build_durations: vec![], subst_durations: vec![], download_bytes: 0 }); } let last = buckets.last_mut().unwrap(); match etype { 105 => { assert!(dur >= 0); last.build_durations.push(dur); } 108 => { assert!(dur >= 0); last.subst_durations.push(dur); } 101 => { assert!(bytes >= 0); last.download_bytes += bytes; } _ => {} } } for i in 1..buckets.len() { assert!(buckets[i].bucket > buckets[i - 1].bucket, 
"buckets must be strictly ascending"); } Ok(Trend { buckets, bucket_size: bucket, drv_filter: drv }) } pub fn display_trend(trend: &Trend) { let has_downloads = trend.drv_filter.is_none() && trend.buckets.iter().any(|b| b.download_bytes > 0); let bw = trend.bucket_size.col_width(); if let Some(ref drv) = trend.drv_filter { println!("filter: {}", drv); } print!("{:6} {:>10} {:>6} {:>10}", "period", "builds", "build med", "subst", "subst med"); if has_downloads { print!(" {:>8}", "dl (MB)"); } println!(); if trend.buckets.is_empty() { println!("(no data)"); return; } for b in &trend.buckets { let mut bs = b.build_durations.clone(); bs.sort_unstable(); let mut ss = b.subst_durations.clone(); ss.sort_unstable(); let build_med = if bs.is_empty() { 0 } else { median_sorted(&bs) as i64 }; let subst_med = if ss.is_empty() { 0 } else { median_sorted(&ss) as i64 }; print!("{:6} {:>10} {:>6} {:>10}", b.bucket, b.build_durations.len(), fmt_ms(build_med), b.subst_durations.len(), fmt_ms(subst_med)); if has_downloads { print!(" {:>8.1}", b.download_bytes as f64 / 1_048_576.0); } println!(); } } fn print_test_section(label: &str, buckets: &[TrendBucket], get_durs: F, bw: usize) where F: Fn(&TrendBucket) -> &[i64], { let any_data = buckets.iter().any(|b| !get_durs(b).is_empty()); if !any_data { return; } println!("{}", label); println!("{:5} {:>10} {:>7} {:>8}", "period", "n", "median", "Δ", "p-value"); for (i, b) in buckets.iter().enumerate() { let durs = get_durs(b); let mut sorted = durs.to_vec(); sorted.sort_unstable(); let med = if sorted.is_empty() { 0.0 } else { median_sorted(&sorted) }; let (delta_str, p_str) = if i == 0 || get_durs(&buckets[i - 1]).is_empty() { (String::new(), String::new()) } else { let prev = get_durs(&buckets[i - 1]); let mut prev_sorted = prev.to_vec(); prev_sorted.sort_unstable(); let prev_med = median_sorted(&prev_sorted); let delta = if prev_med > 0.0 { let pct = (med - prev_med) / prev_med * 100.0; let sign = if pct >= 0.0 { "+" } else { "" }; 
format!("{}{:.0}%", sign, pct) } else { String::new() }; let p = match mann_whitney_u(prev, durs) { Some(r) => format!("{:.3}", r.p_value), None => String::new(), }; (delta, p) }; println!("{:5} {:>10} {:>7} {:>8}", b.bucket, durs.len(), fmt_ms(med as i64), delta_str, p_str); } println!(); } pub fn display_trend_test(trend: &Trend) { let bw = trend.bucket_size.col_width(); if let Some(ref drv) = trend.drv_filter { println!("filter: {}", drv); } print_test_section("builds", &trend.buckets, |b| &b.build_durations, bw); print_test_section("substitutions", &trend.buckets, |b| &b.subst_durations, bw); println!("Mann-Whitney U (two-tailed). H0: adjacent periods have the same duration distribution."); } pub fn output_csv_trend(trend: &Trend) { println!("period,build_count,build_median_ms,subst_count,subst_median_ms,download_bytes"); for b in &trend.buckets { let mut bs = b.build_durations.clone(); bs.sort_unstable(); let mut ss = b.subst_durations.clone(); ss.sort_unstable(); let build_med = if bs.is_empty() { 0 } else { median_sorted(&bs) as i64 }; let subst_med = if ss.is_empty() { 0 } else { median_sorted(&ss) as i64 }; assert!(build_med >= 0); assert!(subst_med >= 0); println!("{},{},{},{},{},{}", b.bucket, b.build_durations.len(), build_med, b.subst_durations.len(), subst_med, b.download_bytes); } } pub fn display_stats(stats: Stats) { let build_avg = if stats.build_count > 0 { stats.build_total_ms / stats.build_count } else { 0 }; let subst_avg = if stats.subst_count > 0 { stats.subst_total_ms / stats.subst_count } else { 0 }; let mb = stats.download_bytes as f64 / 1_048_576.0; let dl_speed = if stats.download_ms > 0 { mb / (stats.download_ms as f64 / 1000.0) } else { 0.0 }; println!("{:<14} {:>6} total {:>9} avg {:>9}", "built", stats.build_count, fmt_ms(stats.build_total_ms), fmt_ms(build_avg)); println!("{:<14} {:>6} total {:>9} avg {:>9}", "substituted", stats.subst_count, fmt_ms(stats.subst_total_ms), fmt_ms(subst_avg)); println!("{:<14} {:>8.1} MB avg {:>6.1} 
MB/s", "downloaded", mb, dl_speed); if !stats.slowest_builds.is_empty() { println!(); for row in &stats.slowest_builds { let path = row.drv_path.as_deref().or(row.text.as_deref()).unwrap_or("?"); println!("{:>9} {}", fmt_ms(row.duration_ms), drv_name(path)); } } if !stats.cache_latency.is_empty() { let url_w = stats.cache_latency.iter().map(|r| r.cache_url.len()).max().unwrap_or(0); println!(); for row in &stats.cache_latency { println!("{:7} {:>6} queries", row.cache_url, fmt_ms(row.avg_ms as i64), row.count, width = url_w); } } }