Nix Observability Daemon
observability
nix
1mod daemon;
2mod stats;
3
4use anyhow::{Context, Result};
5use chrono::Utc;
6use clap::{Parser, Subcommand};
7use directories::ProjectDirs;
8use std::path::PathBuf;
9use std::sync::{Arc, Mutex};
10use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
11use tokio::net::UnixStream;
12use tracing::{error, info};
13
14use daemon::{open_db, run_daemon, DbConnections};
15use stats::{display_stats, display_trend, display_trend_test, output_csv_trend, BucketSize, Stats, Trend};
16
17#[derive(Clone, clap::ValueEnum)]
18enum BucketArg {
19 Hour,
20 Day,
21 Week,
22 Month,
23}
24
25#[derive(Parser)]
26struct Cli {
27 #[command(subcommand)]
28 command: Option<Commands>,
29}
30
31#[derive(Subcommand)]
32enum Commands {
33 /// Run the observability daemon
34 Daemon {
35 /// Path to the SQLite database file
36 #[arg(short, long, env = "NOD_DB")]
37 db: Option<PathBuf>,
38 /// Path to the Unix socket
39 #[arg(short, long, env = "NOD_SOCKET")]
40 socket: Option<PathBuf>,
41 },
42 /// Show statistics from the daemon
43 Stats {
44 /// Path to the Unix socket
45 #[arg(short, long, env = "NOD_SOCKET")]
46 socket: Option<PathBuf>,
47 /// Limit to last N days
48 #[arg(short = 'd', long)]
49 days: Option<u32>,
50 /// Limit to last N months
51 #[arg(short = 'm', long)]
52 months: Option<u32>,
53 /// Limit to last N years
54 #[arg(short = 'y', long)]
55 years: Option<u32>,
56 },
57 /// Show how builds, substitutions, and downloads trend over time
58 Trend {
59 /// Path to the Unix socket
60 #[arg(short, long, env = "NOD_SOCKET")]
61 socket: Option<PathBuf>,
62 /// Limit to last N days
63 #[arg(short = 'd', long)]
64 days: Option<u32>,
65 /// Limit to last N months
66 #[arg(short = 'm', long)]
67 months: Option<u32>,
68 /// Limit to last N years
69 #[arg(short = 'y', long)]
70 years: Option<u32>,
71 /// Time bucket granularity (auto-detected from period if omitted)
72 #[arg(short = 'b', long, value_enum)]
73 bucket: Option<BucketArg>,
74 /// Filter builds and substitutions to derivations matching this substring
75 #[arg(long)]
76 drv: Option<String>,
77 /// Run Mann-Whitney U test between adjacent periods instead of showing the plain table
78 #[arg(long)]
79 test: bool,
80 /// Output as CSV
81 #[arg(long)]
82 csv: bool,
83 },
84 /// Clear all data from the database
85 Clean {
86 /// Path to the Unix socket
87 #[arg(short, long, env = "NOD_SOCKET")]
88 socket: Option<PathBuf>,
89 },
90}
91
92#[tokio::main]
93async fn main() -> Result<()> {
94 tracing_subscriber::fmt::init();
95 let cli = Cli::parse();
96
97 let project_dirs = ProjectDirs::from("org", "nixos", "nod");
98
99 let command = cli.command.unwrap_or(Commands::Stats { socket: None, days: None, months: None, years: None });
100
101 match command {
102 Commands::Daemon { db, socket } => {
103 let db_path = db.unwrap_or_else(|| {
104 project_dirs.as_ref()
105 .map(|d| d.data_dir().join("nod.db"))
106 .unwrap_or_else(|| PathBuf::from("nod.db"))
107 });
108
109 let socket_path = socket.unwrap_or_else(|| {
110 project_dirs.as_ref()
111 .and_then(|d| d.runtime_dir())
112 .map(|d| d.join("nod.sock"))
113 .unwrap_or_else(|| PathBuf::from("/tmp/nod.sock"))
114 });
115
116 if let Some(parent) = db_path.parent() {
117 if !parent.as_os_str().is_empty() {
118 std::fs::create_dir_all(parent)
119 .with_context(|| format!("Failed to create database directory: {}", parent.display()))?;
120 }
121 }
122
123 if let Some(parent) = socket_path.parent() {
124 if !parent.as_os_str().is_empty() {
125 std::fs::create_dir_all(parent)
126 .with_context(|| format!("Failed to create socket directory: {}", parent.display()))?;
127 }
128 }
129
130 // Check if a daemon is already listening before we remove and rebind the socket.
131 // A successful connect means another process owns it — refuse to start.
132 if UnixStream::connect(&socket_path).await.is_ok() {
133 anyhow::bail!("Daemon already running at {}", socket_path.display());
134 }
135
136 let writer = open_db(&db_path)?;
137 let reader = open_db(&db_path)?;
138 let db = Arc::new(DbConnections {
139 writer: Mutex::new(writer),
140 reader: Mutex::new(reader),
141 });
142 run_daemon(socket_path, db).await.context("Daemon failed")?
143 }
144 Commands::Stats { socket, days, months, years } => {
145 let socket_path = socket.unwrap_or_else(|| {
146 project_dirs.as_ref()
147 .and_then(|d| d.runtime_dir())
148 .map(|d| d.join("nod.sock"))
149 .unwrap_or_else(|| PathBuf::from("/tmp/nod.sock"))
150 });
151
152 let since: Option<i64> = if days.is_some() || months.is_some() || years.is_some() {
153 let mut t = Utc::now();
154 if let Some(y) = years { t = t - chrono::Months::new(y * 12); }
155 if let Some(m) = months { t = t - chrono::Months::new(m); }
156 if let Some(d) = days { t = t - chrono::Duration::days(d as i64); }
157 Some(t.timestamp())
158 } else {
159 None
160 };
161
162 let mut stream = UnixStream::connect(&socket_path)
163 .await
164 .with_context(|| format!("Failed to connect to daemon at {}", socket_path.display()))?;
165
166 let cmd = serde_json::json!({"action": "get_stats", "since": since});
167 stream.write_all((cmd.to_string() + "\n").as_bytes()).await?;
168
169 let mut reader = BufReader::new(stream);
170 let mut line = String::new();
171 reader.read_line(&mut line).await.context("Daemon closed connection without response")?;
172 let stats: Stats = serde_json::from_str(&line).context("Invalid response from daemon")?;
173 display_stats(stats);
174 }
175 Commands::Trend { socket, days, months, years, bucket, drv, test, csv } => {
176 let socket_path = socket.unwrap_or_else(|| {
177 project_dirs.as_ref()
178 .and_then(|d| d.runtime_dir())
179 .map(|d| d.join("nod.sock"))
180 .unwrap_or_else(|| PathBuf::from("/tmp/nod.sock"))
181 });
182
183 let since: Option<i64> = if days.is_some() || months.is_some() || years.is_some() {
184 let mut t = Utc::now();
185 if let Some(y) = years { t = t - chrono::Months::new(y * 12); }
186 if let Some(m) = months { t = t - chrono::Months::new(m); }
187 if let Some(d) = days { t = t - chrono::Duration::days(d as i64); }
188 Some(t.timestamp())
189 } else {
190 None
191 };
192
193 // Auto-detect bucket granularity from the period length when not specified.
194 let bucket_size = match bucket {
195 Some(BucketArg::Hour) => BucketSize::Hour,
196 Some(BucketArg::Day) => BucketSize::Day,
197 Some(BucketArg::Week) => BucketSize::Week,
198 Some(BucketArg::Month) => BucketSize::Month,
199 None => match since {
200 None => BucketSize::Month,
201 Some(ts) => {
202 let days_span = (Utc::now().timestamp() - ts) / 86400;
203 if days_span <= 2 { BucketSize::Hour }
204 else if days_span <= 60 { BucketSize::Day }
205 else if days_span <= 365 { BucketSize::Week }
206 else { BucketSize::Month }
207 }
208 },
209 };
210
211 let mut stream = UnixStream::connect(&socket_path)
212 .await
213 .with_context(|| format!("Failed to connect to daemon at {}", socket_path.display()))?;
214
215 let cmd = serde_json::json!({"action": "get_trend", "since": since, "bucket": bucket_size, "drv": drv});
216 stream.write_all((cmd.to_string() + "\n").as_bytes()).await?;
217
218 let mut reader = BufReader::new(stream);
219 let mut line = String::new();
220 reader.read_line(&mut line).await.context("Daemon closed connection without response")?;
221 let trend: Trend = serde_json::from_str(&line).context("Invalid response from daemon")?;
222 if csv { output_csv_trend(&trend); } else if test { display_trend_test(&trend); } else { display_trend(&trend); }
223 }
224 Commands::Clean { socket } => {
225 let socket_path = socket.unwrap_or_else(|| {
226 project_dirs.as_ref()
227 .and_then(|d| d.runtime_dir())
228 .map(|d| d.join("nod.sock"))
229 .unwrap_or_else(|| PathBuf::from("/tmp/nod.sock"))
230 });
231
232 let mut stream = UnixStream::connect(&socket_path)
233 .await
234 .with_context(|| format!("Failed to connect to daemon at {}", socket_path.display()))?;
235
236 stream.write_all(b"{\"action\":\"clean\"}\n").await?;
237
238 let mut reader = BufReader::new(stream);
239 let mut line = String::new();
240 reader.read_line(&mut line).await.context("Daemon closed connection")?;
241
242 if line.trim() == "ok" {
243 info!("Database cleared successfully.");
244 } else {
245 error!("Daemon failed to clear database.");
246 }
247 }
248 }
249
250 Ok(())
251}