Nix Observability Daemon
observability nix

chore: refactor socket communication

+129 -82
+103 -65
src/daemon.rs
··· 12 12 13 13 use crate::stats::collect_stats; 14 14 15 - // Schema version - increment when the schema changes incompatibly. 16 - const SCHEMA_VERSION: u32 = 1; 15 + const fn schema_hash(s: &[u8]) -> u32 { 16 + let mut h: u32 = 2166136261; 17 + let mut i = 0; 18 + while i < s.len() { h ^= s[i] as u32; h = h.wrapping_mul(16777619); i += 1; } 19 + h 20 + } 17 21 18 22 const SCHEMA: &str = " 19 23 CREATE TABLE IF NOT EXISTS events ( ··· 30 34 total_bytes INTEGER 31 35 ); 32 36 "; 37 + 38 + // Append a new hash entry when the schema changes - SCHEMA_VERSION auto-increments. 39 + const SCHEMA_HASHES: &[u32] = &[ 40 + 0x9bc94a70, // v1 41 + ]; 42 + const SCHEMA_VERSION: u32 = SCHEMA_HASHES.len() as u32; 43 + const _: () = assert!( 44 + schema_hash(SCHEMA.as_bytes()) == SCHEMA_HASHES[SCHEMA_VERSION as usize - 1], 45 + "schema changed - append new hash to SCHEMA_HASHES" 46 + ); 33 47 34 48 #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] 35 49 #[repr(u64)] ··· 114 128 } 115 129 } 116 130 117 - #[derive(Debug, Deserialize, Serialize)] 118 - struct NixEvent { 119 - action: String, 120 - id: u64, 121 - // Raw number: ActivityType for "start", ResultType for "result", absent on "stop" 122 - #[serde(default, rename = "type")] 123 - event_type: u64, 124 - #[serde(default)] 125 - text: String, 126 - #[serde(default)] 127 - fields: Vec<serde_json::Value>, 128 - #[serde(default)] 129 - parent: u64, 131 + #[derive(Debug, Deserialize)] 132 + #[serde(tag = "action", rename_all = "snake_case")] 133 + enum NixEvent { 134 + Start { 135 + id: u64, 136 + #[serde(rename = "type", default)] 137 + event_type: u64, 138 + #[serde(default)] 139 + text: String, 140 + #[serde(default)] 141 + fields: Vec<serde_json::Value>, 142 + #[serde(default)] 143 + parent: u64, 144 + }, 145 + Stop { 146 + id: u64, 147 + }, 148 + Result { 149 + id: u64, 150 + #[serde(rename = "type", default)] 151 + event_type: u64, 152 + #[serde(default)] 153 + fields: Vec<serde_json::Value>, 154 + }, 155 + } 156 + 157 + #[derive(Debug, Deserialize)] 158 + #[serde(tag = "action", rename_all = "snake_case")] 159 + enum ClientCommand { 160 + GetStats { since: Option<i64> }, 161 + Clean, 162 + } 163 + 164 + // Untagged outer enum: serde tries ClientCommand first, then NixEvent. 165 + // The action values never overlap so routing is unambiguous. 166 + #[derive(Debug, Deserialize)] 167 + #[serde(untagged)] 168 + enum SocketMessage { 169 + Command(ClientCommand), 170 + Event(NixEvent), 130 171 } 131 172 132 173 struct Activity { ··· 211 252 loop { 212 253 line.clear(); 213 254 if reader.read_line(&mut line).await? == 0 { break; } 214 - let cmd = line.trim(); 215 255 216 - if cmd.starts_with("get_stats") { 217 - let since = cmd.split_whitespace().nth(1) 218 - .unwrap_or("1970-01-01T00:00:00+00:00") 219 - .to_string(); 220 - let db = state.lock().unwrap().db.clone(); 221 - let stats = tokio::task::spawn_blocking(move || collect_stats(&db, &since)) 222 - .await??; 223 - writer.write_all((serde_json::to_string(&stats)? + "\n").as_bytes()).await?; 224 - break; 225 - } 226 - if cmd == "clean" { 227 - let db = state.lock().unwrap().db.clone(); 228 - tokio::task::spawn_blocking(move || -> Result<()> { 229 - let conn = db.lock().unwrap(); 230 - conn.execute_batch("DELETE FROM events; VACUUM; PRAGMA wal_checkpoint(TRUNCATE);")?; 231 - Ok(()) 232 - }).await??; 233 - writer.write_all(b"ok\n").await?; 234 - info!("Database cleared via socket command"); 235 - break; 236 - } 237 - 238 - if let Ok(event) = serde_json::from_str::<NixEvent>(cmd) { 239 - process_event(event, &state)?; 256 + match serde_json::from_str::<SocketMessage>(line.trim()) { 257 + Ok(SocketMessage::Command(ClientCommand::GetStats { since })) => { 258 + let db = state.lock().unwrap().db.clone(); 259 + let stats = tokio::task::spawn_blocking(move || collect_stats(&db, since)) 260 + .await??; 261 + writer.write_all((serde_json::to_string(&stats)? + "\n").as_bytes()).await?; 262 + break; 263 + } 264 + Ok(SocketMessage::Command(ClientCommand::Clean)) => { 265 + let db = state.lock().unwrap().db.clone(); 266 + tokio::task::spawn_blocking(move || -> Result<()> { 267 + let conn = db.lock().unwrap(); 268 + conn.execute_batch("DELETE FROM events; VACUUM; PRAGMA wal_checkpoint(TRUNCATE);")?; 269 + Ok(()) 270 + }).await??; 271 + writer.write_all(b"ok\n").await?; 272 + info!("Database cleared via socket command"); 273 + break; 274 + } 275 + Ok(SocketMessage::Event(event)) => { 276 + if let Err(e) = process_event(event, &state) { 277 + error!("Failed to process event: {}", e); 278 + } 279 + } 280 + Err(e) => error!("Invalid message: {}", e), 240 281 } 241 282 } 242 283 Ok(()) ··· 245 286 fn process_event(event: NixEvent, state: &Arc<Mutex<State>>) -> Result<()> { 246 287 let mut s = state.lock().unwrap(); 247 288 248 - match event.action.as_str() { 249 - "start" => { 250 - let act_type = ActivityType::from(event.event_type); 251 - let text = if !event.text.is_empty() { 252 - event.text.clone() 289 + match event { 290 + NixEvent::Start { id, event_type, text, fields, parent } => { 291 + let act_type = ActivityType::from(event_type); 292 + let text = if !text.is_empty() { 293 + text 253 294 } else { 254 - event.fields.get(0).and_then(|v| v.as_str()).unwrap_or("").to_string() 295 + fields.get(0).and_then(|v| v.as_str()).unwrap_or("").to_string() 255 296 }; 256 297 257 298 info!( 258 - id = event.id, 259 - parent = event.parent, 299 + id, 300 + parent, 260 301 act_type = %act_type, 261 302 text = %text, 262 - fields = ?event.fields, 303 + fields = ?fields, 263 304 "start" 264 305 ); 265 306 266 - s.active_activities.insert(event.id, Activity { 267 - id: event.id, 268 - parent_id: event.parent, 269 - event_type: event.event_type, 307 + s.active_activities.insert(id, Activity { 308 + id, 309 + parent_id: parent, 310 + event_type, 270 311 text, 271 312 start_time: Utc::now(), 272 - fields: event.fields, 313 + fields, 273 314 total_bytes: 0, 274 315 }); 275 316 } 276 - "result" => { 277 - let res_type = ResultType::from(event.event_type); 317 + NixEvent::Result { id, event_type, fields } => { 318 + let res_type = ResultType::from(event_type); 278 319 279 320 if res_type != ResultType::BuildLogLine && res_type != ResultType::Progress && res_type != ResultType::SetExpected { 280 321 info!( 281 - id = event.id, 322 + id, 282 323 res_type = %res_type, 283 - fields = ?event.fields, 324 + fields = ?fields, 284 325 "result" 285 326 ); 286 327 } 287 328 288 - if let Some(act) = s.active_activities.get_mut(&event.id) { 329 + if let Some(act) = s.active_activities.get_mut(&id) { 289 330 if res_type == ResultType::Progress { 290 - if let Some(total) = event.fields.get(1).and_then(|v| v.as_u64()) { 331 + if let Some(total) = fields.get(1).and_then(|v| v.as_u64()) { 291 332 if total > 0 { act.total_bytes = total; } 292 333 } 293 334 } 294 335 } 295 336 } 296 - "stop" => { 297 - if let Some(act) = s.active_activities.remove(&event.id) { 337 + NixEvent::Stop { id } => { 338 + if let Some(act) = s.active_activities.remove(&id) { 298 339 let act_type = ActivityType::from(act.event_type); 299 340 let end_time = Utc::now(); 300 341 let duration_ms = end_time.signed_duration_since(act.start_time).num_milliseconds(); ··· 337 378 ], 338 379 ).context("Failed to insert event")?; 339 380 } 340 - } 341 - _ => { 342 - info!(action = %event.action, id = event.id, "unknown action"); 343 381 } 344 382 } 345 383 Ok(())
+12 -10
src/main.rs
··· 103 103 .unwrap_or_else(|| PathBuf::from("/tmp/nod.sock")) 104 104 }); 105 105 106 - // Compute the since timestamp. Flags are additive (e.g. -y 1 -d 3 = 1 year + 3 days ago). 107 - let mut since = Utc::now(); 108 - if let Some(y) = years { since = since - chrono::Months::new(y * 12); } 109 - if let Some(m) = months { since = since - chrono::Months::new(m); } 110 - if let Some(d) = days { since = since - chrono::Duration::days(d as i64); } 111 - let has_filter = days.is_some() || months.is_some() || years.is_some(); 112 - // Epoch as the "no filter" sentinel — a WHERE start_time >= epoch matches everything. 113 - let since_str = if has_filter { since.to_rfc3339() } else { "1970-01-01T00:00:00+00:00".to_string() }; 106 + let since: Option<i64> = if days.is_some() || months.is_some() || years.is_some() { 107 + let mut t = Utc::now(); 108 + if let Some(y) = years { t = t - chrono::Months::new(y * 12); } 109 + if let Some(m) = months { t = t - chrono::Months::new(m); } 110 + if let Some(d) = days { t = t - chrono::Duration::days(d as i64); } 111 + Some(t.timestamp()) 112 + } else { 113 + None 114 + }; 114 115 115 116 let mut stream = UnixStream::connect(&socket_path) 116 117 .await 117 118 .with_context(|| format!("Failed to connect to daemon at {}", socket_path.display()))?; 118 119 119 - stream.write_all(format!("get_stats {}\n", since_str).as_bytes()).await?; 120 + let cmd = serde_json::json!({"action": "get_stats", "since": since}); 121 + stream.write_all((cmd.to_string() + "\n").as_bytes()).await?; 120 122 121 123 let mut reader = BufReader::new(stream); 122 124 let mut line = String::new(); ··· 136 138 .await 137 139 .with_context(|| format!("Failed to connect to daemon at {}", socket_path.display()))?; 138 140 139 - stream.write_all(b"clean\n").await?; 141 + stream.write_all(b"{\"action\":\"clean\"}\n").await?; 140 142 141 143 let mut reader = BufReader::new(stream); 142 144 let mut line = String::new();
+14 -7
src/stats.rs
··· 30 30 pub count: i64, 31 31 } 32 32 33 - pub fn collect_stats(db: &Arc<Mutex<Connection>>, since: &str) -> Result<Stats> { 33 + pub fn collect_stats(db: &Arc<Mutex<Connection>>, since: Option<i64>) -> Result<Stats> { 34 34 let conn = db.lock().unwrap(); 35 35 36 + // Convert unix timestamp to RFC3339 for comparison with stored start_time strings. 37 + // None means no filter - SQL NULL makes the condition vacuously true. 38 + let since_str: Option<String> = since 39 + .and_then(|ts| chrono::DateTime::from_timestamp(ts, 0)) 40 + .map(|dt| dt.to_rfc3339()); 41 + let p = since_str.as_deref(); 42 + 36 43 let (build_count, build_total_ms, subst_count, subst_total_ms, download_bytes, download_ms) = 37 44 conn.query_row( 38 45 "SELECT ··· 42 49 COALESCE(SUM(duration_ms) FILTER (WHERE event_type = 108), 0), 43 50 COALESCE(SUM(total_bytes) FILTER (WHERE event_type = 101), 0), 44 51 COALESCE(SUM(duration_ms) FILTER (WHERE event_type = 101), 0) 45 - FROM events WHERE start_time >= ?1", 46 - [since], 52 + FROM events WHERE (?1 IS NULL OR start_time >= ?1)", 53 + rusqlite::params![p], 47 54 |r| Ok((r.get::<_, i64>(0)?, r.get::<_, i64>(1)?, r.get::<_, i64>(2)?, 48 55 r.get::<_, i64>(3)?, r.get::<_, i64>(4)?, r.get::<_, i64>(5)?)), 49 56 ).context("Failed to query summary stats")?; 50 57 51 58 let mut stmt = conn.prepare( 52 59 "SELECT duration_ms, drv_path, text 53 - FROM events WHERE event_type = 105 AND start_time >= ?1 60 + FROM events WHERE event_type = 105 AND (?1 IS NULL OR start_time >= ?1) 54 61 ORDER BY duration_ms DESC LIMIT 10", 55 62 ).context("Failed to prepare slowest builds query")?; 56 - let slowest_builds: Vec<SlowBuild> = stmt.query_map([since], |r| { 63 + let slowest_builds: Vec<SlowBuild> = stmt.query_map(rusqlite::params![p], |r| { 57 64 Ok(SlowBuild { 58 65 duration_ms: r.get(0)?, 59 66 drv_path: r.get(1)?, ··· 65 72 // per substituter, not just metadata query time (QueryPathInfo). 66 73 let mut stmt = conn.prepare( 67 74 "SELECT cache_url, AVG(duration_ms), COUNT(*) 68 - FROM events WHERE event_type = 108 AND cache_url IS NOT NULL AND start_time >= ?1 75 + FROM events WHERE event_type = 108 AND cache_url IS NOT NULL AND (?1 IS NULL OR start_time >= ?1) 69 76 GROUP BY cache_url ORDER BY AVG(duration_ms) DESC", 70 77 ).context("Failed to prepare cache latency query")?; 71 - let cache_latency: Vec<CacheStat> = stmt.query_map([since], |r| { 78 + let cache_latency: Vec<CacheStat> = stmt.query_map(rusqlite::params![p], |r| { 72 79 Ok(CacheStat { 73 80 cache_url: r.get(0)?, 74 81 avg_ms: r.get(1)?,