//! Nix Observability Daemon
//!
//! Tags: observability, nix
1use anyhow::{Context, Result};
2use rusqlite::Connection;
3use serde::{Deserialize, Serialize};
4use std::sync::Mutex;
5
/// Aggregate statistics over the recorded event log.
#[derive(Debug, Serialize, Deserialize)]
pub struct Stats {
    /// Number of build events (event_type 105).
    pub build_count: i64,
    /// Total wall time of builds, in milliseconds.
    pub build_total_ms: i64,
    /// Number of substitution events (event_type 108).
    pub subst_count: i64,
    /// Total wall time of substitutions, in milliseconds.
    pub subst_total_ms: i64,
    /// Total bytes downloaded (FileTransfer, event_type 101).
    pub download_bytes: i64,
    /// Total download time, in milliseconds.
    pub download_ms: i64,
    /// Up to 10 slowest builds, longest first.
    pub slowest_builds: Vec<SlowBuild>,
    /// Per-cache average substitution latency, slowest first.
    pub cache_latency: Vec<CacheStat>,
}
17
/// One entry in the "slowest builds" list.
#[derive(Debug, Serialize, Deserialize)]
pub struct SlowBuild {
    /// Build duration in milliseconds.
    pub duration_ms: i64,
    /// Derivation store path, if recorded.
    pub drv_path: Option<String>,
    /// Free-form event text; used as a fallback label when `drv_path` is absent.
    pub text: Option<String>,
}
24
/// Average substitution latency for one binary cache.
#[derive(Debug, Serialize, Deserialize)]
pub struct CacheStat {
    /// Cache URL as recorded on the event.
    pub cache_url: String,
    /// Mean substitution duration in milliseconds.
    pub avg_ms: f64,
    /// Number of substitutions served by this cache.
    pub count: i64,
}
31
32pub fn collect_stats(db: &Mutex<Connection>, since: Option<i64>) -> Result<Stats> {
33 let conn = db.lock().unwrap();
34
35 // SQL NULL makes the WHERE condition vacuously true, giving us "no filter".
36 let since_str: Option<String> = since
37 .and_then(|ts| chrono::DateTime::from_timestamp(ts, 0))
38 .map(|dt| dt.to_rfc3339());
39 let p = since_str.as_deref();
40
41 let (build_count, build_total_ms, subst_count, subst_total_ms, download_bytes, download_ms) =
42 conn.query_row(
43 "SELECT
44 COUNT(*) FILTER (WHERE event_type = 105),
45 COALESCE(SUM(duration_ms) FILTER (WHERE event_type = 105), 0),
46 COUNT(*) FILTER (WHERE event_type = 108),
47 COALESCE(SUM(duration_ms) FILTER (WHERE event_type = 108), 0),
48 COALESCE(SUM(total_bytes) FILTER (WHERE event_type = 101), 0),
49 COALESCE(SUM(duration_ms) FILTER (WHERE event_type = 101), 0)
50 FROM events WHERE (?1 IS NULL OR start_time >= ?1)",
51 rusqlite::params![p],
52 |r| Ok((r.get::<_, i64>(0)?, r.get::<_, i64>(1)?, r.get::<_, i64>(2)?,
53 r.get::<_, i64>(3)?, r.get::<_, i64>(4)?, r.get::<_, i64>(5)?)),
54 ).context("Failed to query summary stats")?;
55
56 let mut stmt = conn.prepare(
57 "SELECT duration_ms, drv_path, text
58 FROM events WHERE event_type = 105 AND (?1 IS NULL OR start_time >= ?1)
59 ORDER BY duration_ms DESC LIMIT 10",
60 ).context("Failed to prepare slowest builds query")?;
61 let slowest_builds: Vec<SlowBuild> = stmt.query_map(rusqlite::params![p], |r| {
62 Ok(SlowBuild {
63 duration_ms: r.get(0)?,
64 drv_path: r.get(1)?,
65 text: r.get(2)?,
66 })
67 })?.filter_map(|r| r.ok()).collect();
68
69 // Substitute (108) measures full substitution time per cache; QueryPathInfo (109)
70 // only measures metadata lookup and would undercount latency.
71 let mut stmt = conn.prepare(
72 "SELECT cache_url, AVG(duration_ms), COUNT(*)
73 FROM events WHERE event_type = 108 AND cache_url IS NOT NULL AND (?1 IS NULL OR start_time >= ?1)
74 GROUP BY cache_url ORDER BY AVG(duration_ms) DESC",
75 ).context("Failed to prepare cache latency query")?;
76 let cache_latency: Vec<CacheStat> = stmt.query_map(rusqlite::params![p], |r| {
77 Ok(CacheStat {
78 cache_url: r.get(0)?,
79 avg_ms: r.get(1)?,
80 count: r.get(2)?,
81 })
82 })?.filter_map(|r| r.ok()).collect();
83
84 Ok(Stats { build_count, build_total_ms, subst_count, subst_total_ms, download_bytes, download_ms, slowest_builds, cache_latency })
85}
86
// Mann-Whitney U is non-parametric and makes no distributional assumptions,
// which is appropriate for build times that are right-skewed.
/// Outcome of a two-tailed Mann-Whitney U test.
#[derive(Debug)]
pub struct MannWhitneyResult {
    /// Two-tailed p-value from the tie-corrected normal approximation.
    pub p_value: f64,
}

/// Two-tailed Mann-Whitney U test comparing samples `a` and `b`.
///
/// Uses average ranks for tied values, the tie-corrected variance, and a
/// normal approximation with continuity correction. Returns `None` when
/// either sample is empty; otherwise the p-value is in `[0, 1]`.
///
/// Bug fix: when `U_min == mean_U` (e.g. two identical samples with some
/// spread, such as a = b = [1, 2]), the continuity correction used to push
/// z above zero and trip `assert!(z <= 0.0 + 1e-9)`, panicking. The z-score
/// is now clamped at 0, yielding p = 1.0 for indistinguishable samples.
pub fn mann_whitney_u(a: &[i64], b: &[i64]) -> Option<MannWhitneyResult> {
    if a.is_empty() || b.is_empty() { return None; }

    let n1 = a.len();
    let n2 = b.len();
    let n1f = n1 as f64;
    let n2f = n2 as f64;

    // Tag each value with its sample of origin (0 = a, 1 = b), sort jointly.
    let mut combined: Vec<(i64, usize)> = a.iter().map(|&v| (v, 0))
        .chain(b.iter().map(|&v| (v, 1)))
        .collect();
    combined.sort_unstable_by_key(|&(v, _)| v);

    let n = combined.len();

    // Average ranks within tie groups and accumulate the tie-correction term Σ(t³ - t).
    let mut rank_sum_a = 0.0f64;
    let mut tie_correction = 0.0f64;
    let mut i = 0;
    while i < n {
        let mut j = i;
        while j < n && combined[j].0 == combined[i].0 { j += 1; }
        // Ranks are 1-based: positions i..j share the average rank (i+1 + j) / 2.
        let avg_rank = (i as f64 + 1.0 + j as f64) / 2.0;
        for k in i..j {
            if combined[k].1 == 0 { rank_sum_a += avg_rank; }
        }
        let t = (j - i) as f64;
        if t > 1.0 { tie_correction += t * t * t - t; }
        i = j;
    }

    let u_a = rank_sum_a - n1f * (n1f + 1.0) / 2.0;
    let u_b = n1f * n2f - u_a;

    assert!(u_a >= 0.0);
    assert!(u_b >= 0.0);
    assert!((u_a + u_b - n1f * n2f).abs() < 1e-6, "U_A + U_B must equal n1*n2");

    // Var[U] = n1*n2/12 * [(n+1) - Σ(t³-t)/(n*(n-1))]
    let nf = n as f64;
    let variance = (n1f * n2f / 12.0) * ((nf + 1.0) - tie_correction / (nf * (nf - 1.0)));

    let p_value = if variance <= 0.0 {
        // Every observation tied: the samples are indistinguishable.
        1.0
    } else {
        let u_min = u_a.min(u_b);
        let mean_u = n1f * n2f / 2.0;
        // Continuity correction: +0.5 shifts U_min toward the mean, making z
        // conservative. Clamp at 0 so the correction never crosses the mean
        // (the uncorrected formula gives a positive z when U_min == mean_U).
        let z = ((u_min - mean_u + 0.5) / variance.sqrt()).min(0.0);
        (2.0 * normal_cdf(z)).min(1.0)
    };

    assert!(p_value >= 0.0);
    assert!(p_value <= 1.0);

    Some(MannWhitneyResult { p_value })
}

// Abramowitz & Stegun 7.1.26, max error ≈ 1.5×10⁻⁷.
/// Standard normal CDF Φ(z), via the erf approximation below.
fn normal_cdf(z: f64) -> f64 {
    0.5 * (1.0 + erf_approx(z / std::f64::consts::SQRT_2))
}

/// Polynomial approximation of erf(x); odd symmetry handled explicitly.
fn erf_approx(x: f64) -> f64 {
    let t = 1.0 / (1.0 + 0.3275911 * x.abs());
    let poly = t * (0.254829592
        + t * (-0.284496736
        + t * (1.421413741
        + t * (-1.453152027
        + t * 1.061405429))));
    let result = 1.0 - poly * (-x * x).exp();
    if x >= 0.0 { result } else { -result }
}
174
/// Median of a non-empty, ascending-sorted slice.
///
/// The even-length case averages in `f64` so it cannot overflow `i64`
/// (the previous `a + b` could panic in debug builds / wrap in release
/// for values near `i64::MAX`).
///
/// # Panics
/// Panics if `sorted` is empty.
fn median_sorted(sorted: &[i64]) -> f64 {
    assert!(!sorted.is_empty());
    let n = sorted.len();
    let mid = n / 2;
    if n % 2 == 0 {
        (sorted[mid - 1] as f64 + sorted[mid] as f64) / 2.0
    } else {
        sorted[mid] as f64
    }
}
184
/// Render a millisecond duration compactly: "950ms", "2.3s", or "4m12.5s".
fn fmt_ms(ms: i64) -> String {
    match ms {
        v if v < 1000 => format!("{}ms", v),
        v if v < 60_000 => format!("{:.1}s", v as f64 / 1000.0),
        v => {
            let minutes = v / 60_000;
            let seconds = (v % 60_000) as f64 / 1000.0;
            format!("{}m{:.1}s", minutes, seconds)
        }
    }
}
194
/// Trim the `/nix/store/` prefix from a store path, if present.
fn drv_name(path: &str) -> &str {
    match path.strip_prefix("/nix/store/") {
        Some(stripped) => stripped,
        None => path,
    }
}
198
/// Time-bucket granularity for trend aggregation.
///
/// Serialized in snake_case (e.g. "hour", "day") on the wire.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum BucketSize {
    Hour,
    Day,
    Week,
    Month,
}
207
impl BucketSize {
    /// SQLite `strftime` pattern mapping a timestamp to its bucket label.
    fn strftime_fmt(&self) -> &'static str {
        match self {
            BucketSize::Hour => "%Y-%m-%dT%H",
            BucketSize::Day => "%Y-%m-%d",
            BucketSize::Week => "%Y-W%W",
            BucketSize::Month => "%Y-%m",
        }
    }

    /// Display column width for the period column: the rendered length of
    /// the corresponding label (e.g. "2024-05-01T13" is 13 chars for Hour,
    /// "2024-W18" is 8 for Week).
    fn col_width(&self) -> usize {
        match self {
            BucketSize::Hour => 13,
            BucketSize::Day => 10,
            BucketSize::Week => 8,
            BucketSize::Month => 7,
        }
    }
}
227
// Raw durations are carried per-bucket so adjacent buckets can be compared
// without a second round-trip to the daemon.
/// One time bucket of trend data.
#[derive(Debug, Serialize, Deserialize)]
pub struct TrendBucket {
    /// Bucket label produced by `BucketSize::strftime_fmt` (e.g. "2024-05-01").
    pub bucket: String,
    /// Individual build durations (ms) falling in this bucket.
    pub build_durations: Vec<i64>,
    /// Individual substitution durations (ms) falling in this bucket.
    pub subst_durations: Vec<i64>,
    /// Total bytes downloaded during this bucket.
    pub download_bytes: i64,
}
237
/// Trend data: chronological buckets plus the parameters used to build them.
#[derive(Debug, Serialize, Deserialize)]
pub struct Trend {
    /// Buckets in strictly ascending label order.
    pub buckets: Vec<TrendBucket>,
    /// Granularity the buckets were aggregated at.
    pub bucket_size: BucketSize,
    /// Substring filter applied to `drv_path`, if any.
    pub drv_filter: Option<String>,
}
244
245pub fn collect_trend(
246 db: &Mutex<Connection>,
247 since: Option<i64>,
248 bucket: BucketSize,
249 drv: Option<String>,
250) -> Result<Trend> {
251 if let Some(ref d) = drv {
252 assert!(!d.is_empty(), "drv filter must not be empty");
253 }
254
255 let conn = db.lock().unwrap();
256
257 let since_str: Option<String> = since
258 .and_then(|ts| chrono::DateTime::from_timestamp(ts, 0))
259 .map(|dt| dt.to_rfc3339());
260 let p = since_str.as_deref();
261 let drv_ref = drv.as_deref();
262 let fmt = bucket.strftime_fmt();
263
264 // Rows come out ordered by bucket then time, so grouping by sequential scan is valid.
265 // FileTransfer (101) has NULL drv_path and is intentionally excluded by the drv filter.
266 let mut stmt = conn.prepare(
267 "SELECT strftime(?3, start_time), event_type, duration_ms, total_bytes
268 FROM events
269 WHERE event_type IN (101, 105, 108)
270 AND (?1 IS NULL OR start_time >= ?1)
271 AND (?2 IS NULL OR drv_path LIKE '%' || ?2 || '%')
272 ORDER BY strftime(?3, start_time) ASC, start_time ASC",
273 ).context("Failed to prepare trend query")?;
274
275 let mut buckets: Vec<TrendBucket> = vec![];
276 for row in stmt.query_map(rusqlite::params![p, drv_ref, fmt], |r| {
277 Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)?, r.get::<_, i64>(2)?, r.get::<_, i64>(3)?))
278 })?.filter_map(|r| r.ok()) {
279 let (b, etype, dur, bytes) = row;
280 if buckets.last().map(|x: &TrendBucket| x.bucket.as_str()) != Some(&b) {
281 buckets.push(TrendBucket { bucket: b, build_durations: vec![], subst_durations: vec![], download_bytes: 0 });
282 }
283 let last = buckets.last_mut().unwrap();
284 match etype {
285 105 => { assert!(dur >= 0); last.build_durations.push(dur); }
286 108 => { assert!(dur >= 0); last.subst_durations.push(dur); }
287 101 => { assert!(bytes >= 0); last.download_bytes += bytes; }
288 _ => {}
289 }
290 }
291
292 for i in 1..buckets.len() {
293 assert!(buckets[i].bucket > buckets[i - 1].bucket, "buckets must be strictly ascending");
294 }
295
296 Ok(Trend { buckets, bucket_size: bucket, drv_filter: drv })
297}
298
299pub fn display_trend(trend: &Trend) {
300 let has_downloads = trend.drv_filter.is_none()
301 && trend.buckets.iter().any(|b| b.download_bytes > 0);
302 let bw = trend.bucket_size.col_width();
303
304 if let Some(ref drv) = trend.drv_filter {
305 println!("filter: {}", drv);
306 }
307
308 print!("{:<bw$} {:>6} {:>10} {:>6} {:>10}", "period", "builds", "build med", "subst", "subst med");
309 if has_downloads { print!(" {:>8}", "dl (MB)"); }
310 println!();
311
312 if trend.buckets.is_empty() {
313 println!("(no data)");
314 return;
315 }
316
317 for b in &trend.buckets {
318 let mut bs = b.build_durations.clone(); bs.sort_unstable();
319 let mut ss = b.subst_durations.clone(); ss.sort_unstable();
320 let build_med = if bs.is_empty() { 0 } else { median_sorted(&bs) as i64 };
321 let subst_med = if ss.is_empty() { 0 } else { median_sorted(&ss) as i64 };
322
323 print!("{:<bw$} {:>6} {:>10} {:>6} {:>10}",
324 b.bucket, b.build_durations.len(), fmt_ms(build_med),
325 b.subst_durations.len(), fmt_ms(subst_med));
326 if has_downloads { print!(" {:>8.1}", b.download_bytes as f64 / 1_048_576.0); }
327 println!();
328 }
329}
330
/// Print one section (builds or substitutions) of the significance table.
///
/// Each bucket row shows: sample size, median duration, percent change of
/// the median vs the previous bucket, and the Mann-Whitney p-value comparing
/// the two buckets' samples. The Δ and p-value columns stay blank for the
/// first bucket or when the previous bucket has no samples. The whole
/// section is skipped when no bucket has data for `get_durs`.
fn print_test_section<F>(label: &str, buckets: &[TrendBucket], get_durs: F, bw: usize)
where
    F: Fn(&TrendBucket) -> &[i64],
{
    let any_data = buckets.iter().any(|b| !get_durs(b).is_empty());
    if !any_data { return; }

    println!("{}", label);
    println!("{:<bw$} {:>5} {:>10} {:>7} {:>8}", "period", "n", "median", "Δ", "p-value");

    for (i, b) in buckets.iter().enumerate() {
        let durs = get_durs(b);
        let mut sorted = durs.to_vec(); sorted.sort_unstable();
        // median_sorted requires non-empty input; 0.0 stands in for "no samples".
        let med = if sorted.is_empty() { 0.0 } else { median_sorted(&sorted) };

        // No comparison is possible against a missing or empty baseline.
        let (delta_str, p_str) = if i == 0 || get_durs(&buckets[i - 1]).is_empty() {
            (String::new(), String::new())
        } else {
            let prev = get_durs(&buckets[i - 1]);
            let mut prev_sorted = prev.to_vec(); prev_sorted.sort_unstable();
            let prev_med = median_sorted(&prev_sorted);

            // Percent change vs previous median; undefined when the baseline is 0.
            let delta = if prev_med > 0.0 {
                let pct = (med - prev_med) / prev_med * 100.0;
                let sign = if pct >= 0.0 { "+" } else { "" };
                format!("{}{:.0}%", sign, pct)
            } else {
                String::new()
            };

            // mann_whitney_u returns None when either sample is empty.
            let p = match mann_whitney_u(prev, durs) {
                Some(r) => format!("{:.3}", r.p_value),
                None => String::new(),
            };

            (delta, p)
        };

        println!("{:<bw$} {:>5} {:>10} {:>7} {:>8}",
            b.bucket, durs.len(), fmt_ms(med as i64), delta_str, p_str);
    }
    println!();
}
374
375pub fn display_trend_test(trend: &Trend) {
376 let bw = trend.bucket_size.col_width();
377
378 if let Some(ref drv) = trend.drv_filter {
379 println!("filter: {}", drv);
380 }
381
382 print_test_section("builds", &trend.buckets, |b| &b.build_durations, bw);
383 print_test_section("substitutions", &trend.buckets, |b| &b.subst_durations, bw);
384
385 println!("Mann-Whitney U (two-tailed). H0: adjacent periods have the same duration distribution.");
386}
387
388pub fn output_csv_trend(trend: &Trend) {
389 println!("period,build_count,build_median_ms,subst_count,subst_median_ms,download_bytes");
390 for b in &trend.buckets {
391 let mut bs = b.build_durations.clone(); bs.sort_unstable();
392 let mut ss = b.subst_durations.clone(); ss.sort_unstable();
393 let build_med = if bs.is_empty() { 0 } else { median_sorted(&bs) as i64 };
394 let subst_med = if ss.is_empty() { 0 } else { median_sorted(&ss) as i64 };
395 assert!(build_med >= 0);
396 assert!(subst_med >= 0);
397 println!("{},{},{},{},{},{}", b.bucket, b.build_durations.len(), build_med, b.subst_durations.len(), subst_med, b.download_bytes);
398 }
399}
400
401pub fn display_stats(stats: Stats) {
402 let build_avg = if stats.build_count > 0 { stats.build_total_ms / stats.build_count } else { 0 };
403 let subst_avg = if stats.subst_count > 0 { stats.subst_total_ms / stats.subst_count } else { 0 };
404 let mb = stats.download_bytes as f64 / 1_048_576.0;
405 let dl_speed = if stats.download_ms > 0 { mb / (stats.download_ms as f64 / 1000.0) } else { 0.0 };
406
407 println!("{:<14} {:>6} total {:>9} avg {:>9}", "built", stats.build_count, fmt_ms(stats.build_total_ms), fmt_ms(build_avg));
408 println!("{:<14} {:>6} total {:>9} avg {:>9}", "substituted", stats.subst_count, fmt_ms(stats.subst_total_ms), fmt_ms(subst_avg));
409 println!("{:<14} {:>8.1} MB avg {:>6.1} MB/s", "downloaded", mb, dl_speed);
410
411 if !stats.slowest_builds.is_empty() {
412 println!();
413 for row in &stats.slowest_builds {
414 let path = row.drv_path.as_deref().or(row.text.as_deref()).unwrap_or("?");
415 println!("{:>9} {}", fmt_ms(row.duration_ms), drv_name(path));
416 }
417 }
418
419 if !stats.cache_latency.is_empty() {
420 let url_w = stats.cache_latency.iter().map(|r| r.cache_url.len()).max().unwrap_or(0);
421 println!();
422 for row in &stats.cache_latency {
423 println!("{:<width$} avg {:>7} {:>6} queries", row.cache_url, fmt_ms(row.avg_ms as i64), row.count, width = url_w);
424 }
425 }
426}