Nix Observability Daemon
observability nix
at master 251 lines 9.8 kB view raw
1mod daemon; 2mod stats; 3 4use anyhow::{Context, Result}; 5use chrono::Utc; 6use clap::{Parser, Subcommand}; 7use directories::ProjectDirs; 8use std::path::PathBuf; 9use std::sync::{Arc, Mutex}; 10use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; 11use tokio::net::UnixStream; 12use tracing::{error, info}; 13 14use daemon::{open_db, run_daemon, DbConnections}; 15use stats::{display_stats, display_trend, display_trend_test, output_csv_trend, BucketSize, Stats, Trend}; 16 17#[derive(Clone, clap::ValueEnum)] 18enum BucketArg { 19 Hour, 20 Day, 21 Week, 22 Month, 23} 24 25#[derive(Parser)] 26struct Cli { 27 #[command(subcommand)] 28 command: Option<Commands>, 29} 30 31#[derive(Subcommand)] 32enum Commands { 33 /// Run the observability daemon 34 Daemon { 35 /// Path to the SQLite database file 36 #[arg(short, long, env = "NOD_DB")] 37 db: Option<PathBuf>, 38 /// Path to the Unix socket 39 #[arg(short, long, env = "NOD_SOCKET")] 40 socket: Option<PathBuf>, 41 }, 42 /// Show statistics from the daemon 43 Stats { 44 /// Path to the Unix socket 45 #[arg(short, long, env = "NOD_SOCKET")] 46 socket: Option<PathBuf>, 47 /// Limit to last N days 48 #[arg(short = 'd', long)] 49 days: Option<u32>, 50 /// Limit to last N months 51 #[arg(short = 'm', long)] 52 months: Option<u32>, 53 /// Limit to last N years 54 #[arg(short = 'y', long)] 55 years: Option<u32>, 56 }, 57 /// Show how builds, substitutions, and downloads trend over time 58 Trend { 59 /// Path to the Unix socket 60 #[arg(short, long, env = "NOD_SOCKET")] 61 socket: Option<PathBuf>, 62 /// Limit to last N days 63 #[arg(short = 'd', long)] 64 days: Option<u32>, 65 /// Limit to last N months 66 #[arg(short = 'm', long)] 67 months: Option<u32>, 68 /// Limit to last N years 69 #[arg(short = 'y', long)] 70 years: Option<u32>, 71 /// Time bucket granularity (auto-detected from period if omitted) 72 #[arg(short = 'b', long, value_enum)] 73 bucket: Option<BucketArg>, 74 /// Filter builds and substitutions to derivations matching this substring 75 #[arg(long)] 76 drv: Option<String>, 77 /// Run Mann-Whitney U test between adjacent periods instead of showing the plain table 78 #[arg(long)] 79 test: bool, 80 /// Output as CSV 81 #[arg(long)] 82 csv: bool, 83 }, 84 /// Clear all data from the database 85 Clean { 86 /// Path to the Unix socket 87 #[arg(short, long, env = "NOD_SOCKET")] 88 socket: Option<PathBuf>, 89 }, 90} 91 92#[tokio::main] 93async fn main() -> Result<()> { 94 tracing_subscriber::fmt::init(); 95 let cli = Cli::parse(); 96 97 let project_dirs = ProjectDirs::from("org", "nixos", "nod"); 98 99 let command = cli.command.unwrap_or(Commands::Stats { socket: None, days: None, months: None, years: None }); 100 101 match command { 102 Commands::Daemon { db, socket } => { 103 let db_path = db.unwrap_or_else(|| { 104 project_dirs.as_ref() 105 .map(|d| d.data_dir().join("nod.db")) 106 .unwrap_or_else(|| PathBuf::from("nod.db")) 107 }); 108 109 let socket_path = socket.unwrap_or_else(|| { 110 project_dirs.as_ref() 111 .and_then(|d| d.runtime_dir()) 112 .map(|d| d.join("nod.sock")) 113 .unwrap_or_else(|| PathBuf::from("/tmp/nod.sock")) 114 }); 115 116 if let Some(parent) = db_path.parent() { 117 if !parent.as_os_str().is_empty() { 118 std::fs::create_dir_all(parent) 119 .with_context(|| format!("Failed to create database directory: {}", parent.display()))?; 120 } 121 } 122 123 if let Some(parent) = socket_path.parent() { 124 if !parent.as_os_str().is_empty() { 125 std::fs::create_dir_all(parent) 126 .with_context(|| format!("Failed to create socket directory: {}", parent.display()))?; 127 } 128 } 129 130 // Check if a daemon is already listening before we remove and rebind the socket. 131 // A successful connect means another process owns it — refuse to start. 132 if UnixStream::connect(&socket_path).await.is_ok() { 133 anyhow::bail!("Daemon already running at {}", socket_path.display()); 134 } 135 136 let writer = open_db(&db_path)?; 137 let reader = open_db(&db_path)?; 138 let db = Arc::new(DbConnections { 139 writer: Mutex::new(writer), 140 reader: Mutex::new(reader), 141 }); 142 run_daemon(socket_path, db).await.context("Daemon failed")? 143 } 144 Commands::Stats { socket, days, months, years } => { 145 let socket_path = socket.unwrap_or_else(|| { 146 project_dirs.as_ref() 147 .and_then(|d| d.runtime_dir()) 148 .map(|d| d.join("nod.sock")) 149 .unwrap_or_else(|| PathBuf::from("/tmp/nod.sock")) 150 }); 151 152 let since: Option<i64> = if days.is_some() || months.is_some() || years.is_some() { 153 let mut t = Utc::now(); 154 if let Some(y) = years { t = t - chrono::Months::new(y * 12); } 155 if let Some(m) = months { t = t - chrono::Months::new(m); } 156 if let Some(d) = days { t = t - chrono::Duration::days(d as i64); } 157 Some(t.timestamp()) 158 } else { 159 None 160 }; 161 162 let mut stream = UnixStream::connect(&socket_path) 163 .await 164 .with_context(|| format!("Failed to connect to daemon at {}", socket_path.display()))?; 165 166 let cmd = serde_json::json!({"action": "get_stats", "since": since}); 167 stream.write_all((cmd.to_string() + "\n").as_bytes()).await?; 168 169 let mut reader = BufReader::new(stream); 170 let mut line = String::new(); 171 reader.read_line(&mut line).await.context("Daemon closed connection without response")?; 172 let stats: Stats = serde_json::from_str(&line).context("Invalid response from daemon")?; 173 display_stats(stats); 174 } 175 Commands::Trend { socket, days, months, years, bucket, drv, test, csv } => { 176 let socket_path = socket.unwrap_or_else(|| { 177 project_dirs.as_ref() 178 .and_then(|d| d.runtime_dir()) 179 .map(|d| d.join("nod.sock")) 180 .unwrap_or_else(|| PathBuf::from("/tmp/nod.sock")) 181 }); 182 183 let since: Option<i64> = if days.is_some() || months.is_some() || years.is_some() { 184 let mut t = Utc::now(); 185 if let Some(y) = years { t = t - chrono::Months::new(y * 12); } 186 if let Some(m) = months { t = t - chrono::Months::new(m); } 187 if let Some(d) = days { t = t - chrono::Duration::days(d as i64); } 188 Some(t.timestamp()) 189 } else { 190 None 191 }; 192 193 // Auto-detect bucket granularity from the period length when not specified. 194 let bucket_size = match bucket { 195 Some(BucketArg::Hour) => BucketSize::Hour, 196 Some(BucketArg::Day) => BucketSize::Day, 197 Some(BucketArg::Week) => BucketSize::Week, 198 Some(BucketArg::Month) => BucketSize::Month, 199 None => match since { 200 None => BucketSize::Month, 201 Some(ts) => { 202 let days_span = (Utc::now().timestamp() - ts) / 86400; 203 if days_span <= 2 { BucketSize::Hour } 204 else if days_span <= 60 { BucketSize::Day } 205 else if days_span <= 365 { BucketSize::Week } 206 else { BucketSize::Month } 207 } 208 }, 209 }; 210 211 let mut stream = UnixStream::connect(&socket_path) 212 .await 213 .with_context(|| format!("Failed to connect to daemon at {}", socket_path.display()))?; 214 215 let cmd = serde_json::json!({"action": "get_trend", "since": since, "bucket": bucket_size, "drv": drv}); 216 stream.write_all((cmd.to_string() + "\n").as_bytes()).await?; 217 218 let mut reader = BufReader::new(stream); 219 let mut line = String::new(); 220 reader.read_line(&mut line).await.context("Daemon closed connection without response")?; 221 let trend: Trend = serde_json::from_str(&line).context("Invalid response from daemon")?; 222 if csv { output_csv_trend(&trend); } else if test { display_trend_test(&trend); } else { display_trend(&trend); } 223 } 224 Commands::Clean { socket } => { 225 let socket_path = socket.unwrap_or_else(|| { 226 project_dirs.as_ref() 227 .and_then(|d| d.runtime_dir()) 228 .map(|d| d.join("nod.sock")) 229 .unwrap_or_else(|| PathBuf::from("/tmp/nod.sock")) 230 }); 231 232 let mut stream = UnixStream::connect(&socket_path) 233 .await 234 .with_context(|| format!("Failed to connect to daemon at {}", socket_path.display()))?; 235 236 stream.write_all(b"{\"action\":\"clean\"}\n").await?; 237 238 let mut reader = BufReader::new(stream); 239 let mut line = String::new(); 240 reader.read_line(&mut line).await.context("Daemon closed connection")?; 241 242 if line.trim() == "ok" { 243 info!("Database cleared successfully."); 244 } else { 245 error!("Daemon failed to clear database."); 246 } 247 } 248 } 249 250 Ok(()) 251}