mod daemon; mod stats; use anyhow::{Context, Result}; use chrono::Utc; use clap::{Parser, Subcommand}; use directories::ProjectDirs; use std::path::PathBuf; use std::sync::{Arc, Mutex}; use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; use tokio::net::UnixStream; use tracing::{error, info}; use daemon::{open_db, run_daemon, DbConnections}; use stats::{display_stats, display_trend, display_trend_test, output_csv_trend, BucketSize, Stats, Trend}; #[derive(Clone, clap::ValueEnum)] enum BucketArg { Hour, Day, Week, Month, } #[derive(Parser)] struct Cli { #[command(subcommand)] command: Option, } #[derive(Subcommand)] enum Commands { /// Run the observability daemon Daemon { /// Path to the SQLite database file #[arg(short, long, env = "NOD_DB")] db: Option, /// Path to the Unix socket #[arg(short, long, env = "NOD_SOCKET")] socket: Option, }, /// Show statistics from the daemon Stats { /// Path to the Unix socket #[arg(short, long, env = "NOD_SOCKET")] socket: Option, /// Limit to last N days #[arg(short = 'd', long)] days: Option, /// Limit to last N months #[arg(short = 'm', long)] months: Option, /// Limit to last N years #[arg(short = 'y', long)] years: Option, }, /// Show how builds, substitutions, and downloads trend over time Trend { /// Path to the Unix socket #[arg(short, long, env = "NOD_SOCKET")] socket: Option, /// Limit to last N days #[arg(short = 'd', long)] days: Option, /// Limit to last N months #[arg(short = 'm', long)] months: Option, /// Limit to last N years #[arg(short = 'y', long)] years: Option, /// Time bucket granularity (auto-detected from period if omitted) #[arg(short = 'b', long, value_enum)] bucket: Option, /// Filter builds and substitutions to derivations matching this substring #[arg(long)] drv: Option, /// Run Mann-Whitney U test between adjacent periods instead of showing the plain table #[arg(long)] test: bool, /// Output as CSV #[arg(long)] csv: bool, }, /// Clear all data from the database Clean { /// Path to the Unix socket #[arg(short, long, env = "NOD_SOCKET")] socket: Option, }, } #[tokio::main] async fn main() -> Result<()> { tracing_subscriber::fmt::init(); let cli = Cli::parse(); let project_dirs = ProjectDirs::from("org", "nixos", "nod"); let command = cli.command.unwrap_or(Commands::Stats { socket: None, days: None, months: None, years: None }); match command { Commands::Daemon { db, socket } => { let db_path = db.unwrap_or_else(|| { project_dirs.as_ref() .map(|d| d.data_dir().join("nod.db")) .unwrap_or_else(|| PathBuf::from("nod.db")) }); let socket_path = socket.unwrap_or_else(|| { project_dirs.as_ref() .and_then(|d| d.runtime_dir()) .map(|d| d.join("nod.sock")) .unwrap_or_else(|| PathBuf::from("/tmp/nod.sock")) }); if let Some(parent) = db_path.parent() { if !parent.as_os_str().is_empty() { std::fs::create_dir_all(parent) .with_context(|| format!("Failed to create database directory: {}", parent.display()))?; } } if let Some(parent) = socket_path.parent() { if !parent.as_os_str().is_empty() { std::fs::create_dir_all(parent) .with_context(|| format!("Failed to create socket directory: {}", parent.display()))?; } } // Check if a daemon is already listening before we remove and rebind the socket. // A successful connect means another process owns it — refuse to start. if UnixStream::connect(&socket_path).await.is_ok() { anyhow::bail!("Daemon already running at {}", socket_path.display()); } let writer = open_db(&db_path)?; let reader = open_db(&db_path)?; let db = Arc::new(DbConnections { writer: Mutex::new(writer), reader: Mutex::new(reader), }); run_daemon(socket_path, db).await.context("Daemon failed")? } Commands::Stats { socket, days, months, years } => { let socket_path = socket.unwrap_or_else(|| { project_dirs.as_ref() .and_then(|d| d.runtime_dir()) .map(|d| d.join("nod.sock")) .unwrap_or_else(|| PathBuf::from("/tmp/nod.sock")) }); let since: Option = if days.is_some() || months.is_some() || years.is_some() { let mut t = Utc::now(); if let Some(y) = years { t = t - chrono::Months::new(y * 12); } if let Some(m) = months { t = t - chrono::Months::new(m); } if let Some(d) = days { t = t - chrono::Duration::days(d as i64); } Some(t.timestamp()) } else { None }; let mut stream = UnixStream::connect(&socket_path) .await .with_context(|| format!("Failed to connect to daemon at {}", socket_path.display()))?; let cmd = serde_json::json!({"action": "get_stats", "since": since}); stream.write_all((cmd.to_string() + "\n").as_bytes()).await?; let mut reader = BufReader::new(stream); let mut line = String::new(); reader.read_line(&mut line).await.context("Daemon closed connection without response")?; let stats: Stats = serde_json::from_str(&line).context("Invalid response from daemon")?; display_stats(stats); } Commands::Trend { socket, days, months, years, bucket, drv, test, csv } => { let socket_path = socket.unwrap_or_else(|| { project_dirs.as_ref() .and_then(|d| d.runtime_dir()) .map(|d| d.join("nod.sock")) .unwrap_or_else(|| PathBuf::from("/tmp/nod.sock")) }); let since: Option = if days.is_some() || months.is_some() || years.is_some() { let mut t = Utc::now(); if let Some(y) = years { t = t - chrono::Months::new(y * 12); } if let Some(m) = months { t = t - chrono::Months::new(m); } if let Some(d) = days { t = t - chrono::Duration::days(d as i64); } Some(t.timestamp()) } else { None }; // Auto-detect bucket granularity from the period length when not specified. let bucket_size = match bucket { Some(BucketArg::Hour) => BucketSize::Hour, Some(BucketArg::Day) => BucketSize::Day, Some(BucketArg::Week) => BucketSize::Week, Some(BucketArg::Month) => BucketSize::Month, None => match since { None => BucketSize::Month, Some(ts) => { let days_span = (Utc::now().timestamp() - ts) / 86400; if days_span <= 2 { BucketSize::Hour } else if days_span <= 60 { BucketSize::Day } else if days_span <= 365 { BucketSize::Week } else { BucketSize::Month } } }, }; let mut stream = UnixStream::connect(&socket_path) .await .with_context(|| format!("Failed to connect to daemon at {}", socket_path.display()))?; let cmd = serde_json::json!({"action": "get_trend", "since": since, "bucket": bucket_size, "drv": drv}); stream.write_all((cmd.to_string() + "\n").as_bytes()).await?; let mut reader = BufReader::new(stream); let mut line = String::new(); reader.read_line(&mut line).await.context("Daemon closed connection without response")?; let trend: Trend = serde_json::from_str(&line).context("Invalid response from daemon")?; if csv { output_csv_trend(&trend); } else if test { display_trend_test(&trend); } else { display_trend(&trend); } } Commands::Clean { socket } => { let socket_path = socket.unwrap_or_else(|| { project_dirs.as_ref() .and_then(|d| d.runtime_dir()) .map(|d| d.join("nod.sock")) .unwrap_or_else(|| PathBuf::from("/tmp/nod.sock")) }); let mut stream = UnixStream::connect(&socket_path) .await .with_context(|| format!("Failed to connect to daemon at {}", socket_path.display()))?; stream.write_all(b"{\"action\":\"clean\"}\n").await?; let mut reader = BufReader::new(stream); let mut line = String::new(); reader.read_line(&mut line).await.context("Daemon closed connection")?; if line.trim() == "ok" { info!("Database cleared successfully."); } else { error!("Daemon failed to clear database."); } } } Ok(()) }