Nix Observability Daemon
observability nix
at master 374 lines 12 kB view raw
1use anyhow::{Context, Result}; 2use chrono::{DateTime, Utc}; 3use rusqlite::Connection; 4use serde::{Deserialize, Serialize}; 5use std::collections::HashMap; 6use std::fmt; 7use std::path::PathBuf; 8use std::sync::{Arc, Mutex}; 9use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; 10use tokio::net::{UnixListener, UnixStream}; 11use tracing::{error, info}; 12 13use crate::stats::{collect_stats, collect_trend, BucketSize}; 14 15const fn schema_hash(s: &[u8]) -> u32 { 16 let mut h: u32 = 2166136261; 17 let mut i = 0; 18 while i < s.len() { h ^= s[i] as u32; h = h.wrapping_mul(16777619); i += 1; } 19 h 20} 21 22const SCHEMA: &str = " 23CREATE TABLE IF NOT EXISTS events ( 24 id INTEGER PRIMARY KEY AUTOINCREMENT, 25 nix_id INTEGER, 26 parent_id INTEGER, 27 event_type INTEGER, 28 text TEXT, 29 drv_path TEXT, 30 cache_url TEXT, 31 start_time TEXT, 32 end_time TEXT, 33 duration_ms INTEGER, 34 total_bytes INTEGER 35); 36"; 37 38// Append a new hash entry when the schema changes - SCHEMA_VERSION auto-increments. 
// One entry per schema revision; the value stored in SQLite's `user_version`
// is the index + 1 of the matching hash.
const SCHEMA_HASHES: &[u32] = &[
    0x9bc94a70, // v1
];
const SCHEMA_VERSION: u32 = SCHEMA_HASHES.len() as u32;
// Compile-time guard: if SCHEMA's bytes no longer hash to the latest entry,
// the build fails until a new hash is appended (which bumps SCHEMA_VERSION).
const _: () = assert!(
    schema_hash(SCHEMA.as_bytes()) == SCHEMA_HASHES[SCHEMA_VERSION as usize - 1],
    "schema changed - append new hash to SCHEMA_HASHES"
);

/// Activity kinds parsed from the numeric `type` field of `start` events.
/// The discriminants presumably mirror Nix's internal activity-type constants
/// (100..) from its JSON log output -- TODO confirm against the Nix version in use.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[repr(u64)]
pub enum ActivityType {
    Unknown = 0,
    CopyPath = 100,
    FileTransfer = 101, // Actual HTTP download
    Realise = 102,
    CopyPaths = 103,
    Builds = 104,
    Build = 105, // Individual derivation build
    OptimiseStore = 106,
    VerifyPaths = 107,
    Substitute = 108, // High-level cache substitution
    QueryPathInfo = 109, // Checking if path exists on cache
    PostBuildHook = 110,
    BuildWaiting = 111,
    FetchTree = 112,
}

// Raw wire code -> ActivityType; any unrecognized code maps to `Unknown`
// rather than failing, so newer Nix versions degrade gracefully.
impl From<u64> for ActivityType {
    fn from(n: u64) -> Self {
        match n {
            100 => Self::CopyPath,
            101 => Self::FileTransfer,
            102 => Self::Realise,
            103 => Self::CopyPaths,
            104 => Self::Builds,
            105 => Self::Build,
            106 => Self::OptimiseStore,
            107 => Self::VerifyPaths,
            108 => Self::Substitute,
            109 => Self::QueryPathInfo,
            110 => Self::PostBuildHook,
            111 => Self::BuildWaiting,
            112 => Self::FetchTree,
            _ => Self::Unknown,
        }
    }
}

// Display delegates to Debug so logs show the variant name (e.g. "FileTransfer").
impl fmt::Display for ActivityType {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{:?}", self)
    }
}

/// Result kinds parsed from the numeric `type` field of `result` events.
/// Unlike activities these are never persisted; they only update in-flight
/// state (see `process_event`'s Result arm, which tracks Progress totals).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ResultType {
    Unknown,
    FileLinked,
    BuildLogLine,
    UntrustedPath,
    CorruptedPath,
    SetPhase,
    Progress,
    SetExpected,
    PostBuildLogLine,
    FetchStatus,
}

// Raw wire code -> ResultType; unrecognized codes map to `Unknown`.
impl From<u64> for ResultType {
    fn from(n: u64) -> Self {
        match n {
            100 => Self::FileLinked,
            101 => Self::BuildLogLine,
            102 => Self::UntrustedPath,
            103 => Self::CorruptedPath,
            104 => Self::SetPhase,
            105 => Self::Progress,
            106 => Self::SetExpected,
            107 => Self::PostBuildLogLine,
            108 => Self::FetchStatus,
            _ => Self::Unknown,
        }
    }
}

// Display delegates to Debug so logs show the variant name.
impl fmt::Display for ResultType {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{:?}", self)
    }
}

/// One JSON line from Nix's structured log stream (presumably
/// `--log-format internal-json` -- TODO confirm producer side).
/// Every numeric field defaults to 0 / empty so partial messages still parse.
#[derive(Debug, Deserialize)]
#[serde(tag = "action", rename_all = "snake_case")]
enum NixEvent {
    Start {
        id: u64,
        #[serde(rename = "type", default)]
        event_type: u64,
        #[serde(default)]
        text: String,
        #[serde(default)]
        fields: Vec<serde_json::Value>,
        #[serde(default)]
        parent: u64,
    },
    Stop {
        id: u64,
    },
    Result {
        id: u64,
        #[serde(rename = "type", default)]
        event_type: u64,
        #[serde(default)]
        fields: Vec<serde_json::Value>,
    },
}

/// Query/maintenance commands sent by local clients over the same socket
/// that carries Nix events.
#[derive(Debug, Deserialize)]
#[serde(tag = "action", rename_all = "snake_case")]
enum ClientCommand {
    GetStats { since: Option<i64> },
    GetTrend { since: Option<i64>, bucket: BucketSize, drv: Option<String> },
    Clean,
}

// Untagged outer enum: serde tries ClientCommand first, then NixEvent.
// The action values never overlap so routing is unambiguous.
#[derive(Debug, Deserialize)]
#[serde(untagged)]
enum SocketMessage {
    Command(ClientCommand),
    Event(NixEvent),
}

/// An in-flight activity, created on `start` and held until the matching
/// `stop` event, at which point it is written to the database.
struct Activity {
    id: u64,
    parent_id: u64,
    event_type: u64,
    text: String,
    start_time: DateTime<Utc>,
    fields: Vec<serde_json::Value>,
    total_bytes: u64, // last non-zero Progress total seen for this activity
}

/// Shared mutable daemon state: activities keyed by Nix's activity id.
struct State {
    active_activities: HashMap<u64, Activity>,
}

/// One writer and one reader connection, each behind its own lock so stat
/// queries don't contend with event inserts.
pub struct DbConnections {
    pub writer: Mutex<Connection>,
    pub reader: Mutex<Connection>,
}

/// Opens (or creates) the events database at `path` and configures it for
/// WAL journaling with relaxed sync.
///
/// If the stored `PRAGMA user_version` is non-zero and does not match
/// SCHEMA_VERSION, the file is deleted and `open_db` recurses once; the
/// fresh file reads user_version 0 and takes the normal create path.
///
/// NOTE(review): `&PathBuf` would more idiomatically be `&Path` /
/// `impl AsRef<Path>` -- callers elsewhere may rely on this signature.
pub fn open_db(path: &PathBuf) -> Result<Connection> {
    let conn = Connection::open(path)
        .with_context(|| format!("Failed to open database at {}", path.display()))?;

    // Check schema version; reset if mismatched.
    let version: u32 = conn
        .query_row("PRAGMA user_version", [], |r| r.get(0))
        .unwrap_or(0);

    if version != 0 && version != SCHEMA_VERSION {
        info!(current = version, expected = SCHEMA_VERSION, "Schema version mismatch, resetting database");
        // Close the connection before removing the file it has open.
        drop(conn);
        std::fs::remove_file(path)
            .with_context(|| format!("Failed to remove stale database at {}", path.display()))?;
        return open_db(path);
    }

    conn.execute_batch(SCHEMA)
        .context("Failed to create schema")?;
    conn.execute_batch(&format!("PRAGMA user_version = {}", SCHEMA_VERSION))
        .context("Failed to set schema version")?;
    // wal_autocheckpoint = 0 disables automatic checkpointing; the WAL is
    // only truncated explicitly (see the Clean command).
    conn.execute_batch("
        PRAGMA journal_mode = WAL;
        PRAGMA synchronous = NORMAL;
        PRAGMA temp_store = MEMORY;
        PRAGMA mmap_size = 134217728;
        PRAGMA cache_size = -8000;
        PRAGMA wal_autocheckpoint = 0;
    ").context("Failed to configure database")?;

    Ok(conn)
}

/// Accept loop for the daemon's Unix socket.
///
/// Removes any stale socket file, binds, and spawns one task per incoming
/// connection. Runs forever unless `accept` fails.
pub async fn run_daemon(socket_path: PathBuf, db: Arc<DbConnections>) -> Result<()> {
    let state = Arc::new(Mutex::new(State {
        active_activities: HashMap::new(),
    }));

    // A leftover socket file from a previous run would make bind() fail.
    if socket_path.exists() {
        std::fs::remove_file(&socket_path)
            .with_context(|| format!("Failed to remove existing socket at {}", socket_path.display()))?;
    }

    let listener = UnixListener::bind(&socket_path)
        .with_context(|| format!("Failed to bind to socket at {}", socket_path.display()))?;
    info!("Daemon listening on {}", socket_path.display());

    loop {
        let (stream, _) = listener.accept().await?;
        let state = Arc::clone(&state);
        let db = Arc::clone(&db);
        tokio::spawn(async move {
            if let Err(e) = handle_connection(stream, state, db).await {
                error!("Connection error: {}", e);
            }
        });
    }
}

/// Handles one socket connection carrying newline-delimited JSON.
///
/// Client commands get a one-line reply and then `break` (connection is
/// request/response and closes); Nix events keep the loop running so a single
/// connection can stream many events. Invalid lines are logged and skipped.
/// DB-touching commands run under `spawn_blocking` so rusqlite's synchronous
/// calls don't block the async runtime.
async fn handle_connection(mut stream: UnixStream, state: Arc<Mutex<State>>, db: Arc<DbConnections>) -> Result<()> {
    let (reader, mut writer) = stream.split();
    let mut reader = BufReader::new(reader);

    let mut line = String::new();
    loop {
        line.clear();
        // read_line returning 0 means EOF: peer closed its write side.
        if reader.read_line(&mut line).await? == 0 { break; }

        match serde_json::from_str::<SocketMessage>(line.trim()) {
            Ok(SocketMessage::Command(ClientCommand::GetStats { since })) => {
                let db = Arc::clone(&db);
                let stats = tokio::task::spawn_blocking(move || collect_stats(&db.reader, since))
                    .await??;
                writer.write_all((serde_json::to_string(&stats)? + "\n").as_bytes()).await?;
                break;
            }
            Ok(SocketMessage::Command(ClientCommand::GetTrend { since, bucket, drv })) => {
                let db = Arc::clone(&db);
                let trend = tokio::task::spawn_blocking(move || collect_trend(&db.reader, since, bucket, drv))
                    .await??;
                writer.write_all((serde_json::to_string(&trend)? + "\n").as_bytes()).await?;
                break;
            }
            Ok(SocketMessage::Command(ClientCommand::Clean)) => {
                let db = Arc::clone(&db);
                tokio::task::spawn_blocking(move || -> Result<()> {
                    let conn = db.writer.lock().unwrap();
                    conn.execute_batch("DELETE FROM events; VACUUM; PRAGMA wal_checkpoint(TRUNCATE);")?;
                    Ok(())
                }).await??;
                writer.write_all(b"ok\n").await?;
                info!("Database cleared via socket command");
                break;
            }
            Ok(SocketMessage::Event(event)) => {
                if let Err(e) = process_event(event, &state, &db) {
                    error!("Failed to process event: {}", e);
                }
            }
            Err(e) => error!("Invalid message: {}", e),
        }
    }
    Ok(())
}

/// Applies one Nix event to in-memory state; on `Stop`, persists the
/// completed activity to the events table.
///
/// Start: record the activity (falling back to fields[0] for its text).
/// Result: update the byte total from Progress events.
/// Stop: compute duration, then insert the row.
///
/// NOTE(review): this is a sync fn invoked directly from the async
/// `handle_connection` loop, so the `db.writer` lock + INSERT below run on a
/// runtime worker thread without `spawn_blocking` -- verify this is an
/// acceptable stall for the expected event rate.
fn process_event(event: NixEvent, state: &Arc<Mutex<State>>, db: &Arc<DbConnections>) -> Result<()> {
    let mut s = state.lock().unwrap();

    match event {
        NixEvent::Start { id, event_type, text, fields, parent } => {
            let act_type = ActivityType::from(event_type);
            // Some activities carry an empty `text` and put the interesting
            // string (e.g. a store path) in fields[0] instead.
            let text = if !text.is_empty() {
                text
            } else {
                fields.get(0).and_then(|v| v.as_str()).unwrap_or("").to_string()
            };

            info!(id, parent, act_type = %act_type, text = %text, fields = ?fields, "start");

            s.active_activities.insert(id, Activity {
                id,
                parent_id: parent,
                event_type,
                text,
                start_time: Utc::now(),
                fields,
                total_bytes: 0,
            });
        }
        NixEvent::Result { id, event_type, fields } => {
            let res_type = ResultType::from(event_type);

            // These three arrive in high volume; keep them out of the log.
            if res_type != ResultType::BuildLogLine && res_type != ResultType::Progress && res_type != ResultType::SetExpected {
                info!(id, res_type = %res_type, fields = ?fields, "result");
            }

            if let Some(act) = s.active_activities.get_mut(&id) {
                if res_type == ResultType::Progress {
                    // fields[1] is presumably the "expected" total of the
                    // progress tuple -- TODO confirm against Nix's field order.
                    if let Some(total) = fields.get(1).and_then(|v| v.as_u64()) {
                        if total > 0 { act.total_bytes = total; }
                    }
                }
            }
        }
        NixEvent::Stop { id } => {
            // Unmatched Stop ids (activity never Started, or already removed)
            // are silently ignored.
            if let Some(act) = s.active_activities.remove(&id) {
                let act_type = ActivityType::from(act.event_type);
                let end_time = Utc::now();
                let duration_ms = end_time.signed_duration_since(act.start_time).num_milliseconds();

                info!(
                    id = act.id, act_type = %act_type, duration_ms,
                    total_bytes = act.total_bytes, text = %act.text, fields = ?act.fields,
                    "stop"
                );

                let drv_path = act.fields.get(0).and_then(|v| v.as_str()).map(|s| s.to_string());
                // Both Substitute (fields[1] = substituter URL) and QueryPathInfo (fields[1] = cache URL)
                // have the same layout. Store cache_url for both.
                let cache_url = match act_type {
                    ActivityType::Substitute | ActivityType::QueryPathInfo => {
                        act.fields.get(1).and_then(|v| v.as_str()).map(|s| s.to_string())
                    }
                    _ => None,
                };

                // Release the state lock before the (potentially slow)
                // synchronous database write.
                drop(s);
                db.writer.lock().unwrap().execute(
                    "INSERT INTO events (nix_id, parent_id, event_type, text, drv_path, cache_url, start_time, end_time, duration_ms, total_bytes)
                     VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)",
                    rusqlite::params![
                        act.id as i64, act.parent_id as i64, act.event_type as i64,
                        act.text, drv_path, cache_url,
                        act.start_time.to_rfc3339(), end_time.to_rfc3339(),
                        duration_ms, act.total_bytes as i64,
                    ],
                ).context("Failed to insert event")?;
            }
        }
    }

    Ok(())
}