use anyhow::{Context, Result}; use chrono::{DateTime, Utc}; use rusqlite::Connection; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::fmt; use std::path::PathBuf; use std::sync::{Arc, Mutex}; use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; use tokio::net::{UnixListener, UnixStream}; use tracing::{error, info}; use crate::stats::{collect_stats, collect_trend, BucketSize}; const fn schema_hash(s: &[u8]) -> u32 { let mut h: u32 = 2166136261; let mut i = 0; while i < s.len() { h ^= s[i] as u32; h = h.wrapping_mul(16777619); i += 1; } h } const SCHEMA: &str = " CREATE TABLE IF NOT EXISTS events ( id INTEGER PRIMARY KEY AUTOINCREMENT, nix_id INTEGER, parent_id INTEGER, event_type INTEGER, text TEXT, drv_path TEXT, cache_url TEXT, start_time TEXT, end_time TEXT, duration_ms INTEGER, total_bytes INTEGER ); "; // Append a new hash entry when the schema changes - SCHEMA_VERSION auto-increments. const SCHEMA_HASHES: &[u32] = &[ 0x9bc94a70, // v1 ]; const SCHEMA_VERSION: u32 = SCHEMA_HASHES.len() as u32; const _: () = assert!( schema_hash(SCHEMA.as_bytes()) == SCHEMA_HASHES[SCHEMA_VERSION as usize - 1], "schema changed - append new hash to SCHEMA_HASHES" ); #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] #[repr(u64)] pub enum ActivityType { Unknown = 0, CopyPath = 100, FileTransfer = 101, // Actual HTTP download Realise = 102, CopyPaths = 103, Builds = 104, Build = 105, // Individual derivation build OptimiseStore = 106, VerifyPaths = 107, Substitute = 108, // High-level cache substitution QueryPathInfo = 109, // Checking if path exists on cache PostBuildHook = 110, BuildWaiting = 111, FetchTree = 112, } impl From for ActivityType { fn from(n: u64) -> Self { match n { 100 => Self::CopyPath, 101 => Self::FileTransfer, 102 => Self::Realise, 103 => Self::CopyPaths, 104 => Self::Builds, 105 => Self::Build, 106 => Self::OptimiseStore, 107 => Self::VerifyPaths, 108 => Self::Substitute, 109 => Self::QueryPathInfo, 110 => Self::PostBuildHook, 111 => Self::BuildWaiting, 112 => Self::FetchTree, _ => Self::Unknown, } } } impl fmt::Display for ActivityType { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "{:?}", self) } } #[derive(Debug, Clone, Copy, PartialEq, Eq)] enum ResultType { Unknown, FileLinked, BuildLogLine, UntrustedPath, CorruptedPath, SetPhase, Progress, SetExpected, PostBuildLogLine, FetchStatus, } impl From for ResultType { fn from(n: u64) -> Self { match n { 100 => Self::FileLinked, 101 => Self::BuildLogLine, 102 => Self::UntrustedPath, 103 => Self::CorruptedPath, 104 => Self::SetPhase, 105 => Self::Progress, 106 => Self::SetExpected, 107 => Self::PostBuildLogLine, 108 => Self::FetchStatus, _ => Self::Unknown, } } } impl fmt::Display for ResultType { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "{:?}", self) } } #[derive(Debug, Deserialize)] #[serde(tag = "action", rename_all = "snake_case")] enum NixEvent { Start { id: u64, #[serde(rename = "type", default)] event_type: u64, #[serde(default)] text: String, #[serde(default)] fields: Vec, #[serde(default)] parent: u64, }, Stop { id: u64, }, Result { id: u64, #[serde(rename = "type", default)] event_type: u64, #[serde(default)] fields: Vec, }, } #[derive(Debug, Deserialize)] #[serde(tag = "action", rename_all = "snake_case")] enum ClientCommand { GetStats { since: Option }, GetTrend { since: Option, bucket: BucketSize, drv: Option }, Clean, } // Untagged outer enum: serde tries ClientCommand first, then NixEvent. // The action values never overlap so routing is unambiguous. #[derive(Debug, Deserialize)] #[serde(untagged)] enum SocketMessage { Command(ClientCommand), Event(NixEvent), } struct Activity { id: u64, parent_id: u64, event_type: u64, text: String, start_time: DateTime, fields: Vec, total_bytes: u64, } struct State { active_activities: HashMap, } pub struct DbConnections { pub writer: Mutex, pub reader: Mutex, } pub fn open_db(path: &PathBuf) -> Result { let conn = Connection::open(path) .with_context(|| format!("Failed to open database at {}", path.display()))?; // Check schema version; reset if mismatched. let version: u32 = conn .query_row("PRAGMA user_version", [], |r| r.get(0)) .unwrap_or(0); if version != 0 && version != SCHEMA_VERSION { info!(current = version, expected = SCHEMA_VERSION, "Schema version mismatch, resetting database"); drop(conn); std::fs::remove_file(path) .with_context(|| format!("Failed to remove stale database at {}", path.display()))?; return open_db(path); } conn.execute_batch(SCHEMA) .context("Failed to create schema")?; conn.execute_batch(&format!("PRAGMA user_version = {}", SCHEMA_VERSION)) .context("Failed to set schema version")?; conn.execute_batch(" PRAGMA journal_mode = WAL; PRAGMA synchronous = NORMAL; PRAGMA temp_store = MEMORY; PRAGMA mmap_size = 134217728; PRAGMA cache_size = -8000; PRAGMA wal_autocheckpoint = 0; ").context("Failed to configure database")?; Ok(conn) } pub async fn run_daemon(socket_path: PathBuf, db: Arc) -> Result<()> { let state = Arc::new(Mutex::new(State { active_activities: HashMap::new(), })); if socket_path.exists() { std::fs::remove_file(&socket_path) .with_context(|| format!("Failed to remove existing socket at {}", socket_path.display()))?; } let listener = UnixListener::bind(&socket_path) .with_context(|| format!("Failed to bind to socket at {}", socket_path.display()))?; info!("Daemon listening on {}", socket_path.display()); loop { let (stream, _) = listener.accept().await?; let state = Arc::clone(&state); let db = Arc::clone(&db); tokio::spawn(async move { if let Err(e) = handle_connection(stream, state, db).await { error!("Connection error: {}", e); } }); } } async fn handle_connection(mut stream: UnixStream, state: Arc>, db: Arc) -> Result<()> { let (reader, mut writer) = stream.split(); let mut reader = BufReader::new(reader); let mut line = String::new(); loop { line.clear(); if reader.read_line(&mut line).await? == 0 { break; } match serde_json::from_str::(line.trim()) { Ok(SocketMessage::Command(ClientCommand::GetStats { since })) => { let db = Arc::clone(&db); let stats = tokio::task::spawn_blocking(move || collect_stats(&db.reader, since)) .await??; writer.write_all((serde_json::to_string(&stats)? + "\n").as_bytes()).await?; break; } Ok(SocketMessage::Command(ClientCommand::GetTrend { since, bucket, drv })) => { let db = Arc::clone(&db); let trend = tokio::task::spawn_blocking(move || collect_trend(&db.reader, since, bucket, drv)) .await??; writer.write_all((serde_json::to_string(&trend)? + "\n").as_bytes()).await?; break; } Ok(SocketMessage::Command(ClientCommand::Clean)) => { let db = Arc::clone(&db); tokio::task::spawn_blocking(move || -> Result<()> { let conn = db.writer.lock().unwrap(); conn.execute_batch("DELETE FROM events; VACUUM; PRAGMA wal_checkpoint(TRUNCATE);")?; Ok(()) }).await??; writer.write_all(b"ok\n").await?; info!("Database cleared via socket command"); break; } Ok(SocketMessage::Event(event)) => { if let Err(e) = process_event(event, &state, &db) { error!("Failed to process event: {}", e); } } Err(e) => error!("Invalid message: {}", e), } } Ok(()) } fn process_event(event: NixEvent, state: &Arc>, db: &Arc) -> Result<()> { let mut s = state.lock().unwrap(); match event { NixEvent::Start { id, event_type, text, fields, parent } => { let act_type = ActivityType::from(event_type); let text = if !text.is_empty() { text } else { fields.get(0).and_then(|v| v.as_str()).unwrap_or("").to_string() }; info!(id, parent, act_type = %act_type, text = %text, fields = ?fields, "start"); s.active_activities.insert(id, Activity { id, parent_id: parent, event_type, text, start_time: Utc::now(), fields, total_bytes: 0, }); } NixEvent::Result { id, event_type, fields } => { let res_type = ResultType::from(event_type); if res_type != ResultType::BuildLogLine && res_type != ResultType::Progress && res_type != ResultType::SetExpected { info!(id, res_type = %res_type, fields = ?fields, "result"); } if let Some(act) = s.active_activities.get_mut(&id) { if res_type == ResultType::Progress { if let Some(total) = fields.get(1).and_then(|v| v.as_u64()) { if total > 0 { act.total_bytes = total; } } } } } NixEvent::Stop { id } => { if let Some(act) = s.active_activities.remove(&id) { let act_type = ActivityType::from(act.event_type); let end_time = Utc::now(); let duration_ms = end_time.signed_duration_since(act.start_time).num_milliseconds(); info!( id = act.id, act_type = %act_type, duration_ms, total_bytes = act.total_bytes, text = %act.text, fields = ?act.fields, "stop" ); let drv_path = act.fields.get(0).and_then(|v| v.as_str()).map(|s| s.to_string()); // Both Substitute (fields[1] = substituter URL) and QueryPathInfo (fields[1] = cache URL) // have the same layout. Store cache_url for both. let cache_url = match act_type { ActivityType::Substitute | ActivityType::QueryPathInfo => { act.fields.get(1).and_then(|v| v.as_str()).map(|s| s.to_string()) } _ => None, }; drop(s); db.writer.lock().unwrap().execute( "INSERT INTO events (nix_id, parent_id, event_type, text, drv_path, cache_url, start_time, end_time, duration_ms, total_bytes) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)", rusqlite::params![ act.id as i64, act.parent_id as i64, act.event_type as i64, act.text, drv_path, cache_url, act.start_time.to_rfc3339(), end_time.to_rfc3339(), duration_ms, act.total_bytes as i64, ], ).context("Failed to insert event")?; } } } Ok(()) }