//! Nix Observability Daemon
//!
//! Keywords: observability, nix
use anyhow::{Context, Result};
use chrono::{DateTime, Utc};
use rusqlite::Connection;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::fmt;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::{UnixListener, UnixStream};
use tracing::{error, info};

use crate::stats::{collect_stats, collect_trend, BucketSize};
14
/// 32-bit FNV-1a hash, evaluable at compile time.
///
/// Used by the `const` assertion below to detect any byte-level change to
/// `SCHEMA`. A `while` loop is used because iterators are not available in
/// `const fn`.
const fn schema_hash(bytes: &[u8]) -> u32 {
    // FNV-1a offset basis (2166136261).
    let mut hash: u32 = 0x811c_9dc5;
    let mut idx = 0;
    while idx < bytes.len() {
        hash ^= bytes[idx] as u32;
        // FNV prime (16777619); wrapping mul matches FNV's mod-2^32 arithmetic.
        hash = hash.wrapping_mul(0x0100_0193);
        idx += 1;
    }
    hash
}
21
// Event-log table DDL. NOTE: the literal's exact bytes are hashed by
// `schema_hash` in the compile-time check below, so ANY edit here — even
// whitespace — counts as a schema revision and requires appending a new
// entry to SCHEMA_HASHES.
const SCHEMA: &str = "
CREATE TABLE IF NOT EXISTS events (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    nix_id INTEGER,
    parent_id INTEGER,
    event_type INTEGER,
    text TEXT,
    drv_path TEXT,
    cache_url TEXT,
    start_time TEXT,
    end_time TEXT,
    duration_ms INTEGER,
    total_bytes INTEGER
);
";
37
// Append a new hash entry when the schema changes - SCHEMA_VERSION auto-increments.
const SCHEMA_HASHES: &[u32] = &[
    0x9bc94a70, // v1
];
// Stored in the database via PRAGMA user_version; equals the number of
// recorded schema revisions, so appending a hash bumps the version.
const SCHEMA_VERSION: u32 = SCHEMA_HASHES.len() as u32;
// Compile-time guard: the build fails if SCHEMA's bytes no longer hash to
// the latest recorded entry, forcing the revision list above to be updated.
const _: () = assert!(
    schema_hash(SCHEMA.as_bytes()) == SCHEMA_HASHES[SCHEMA_VERSION as usize - 1],
    "schema changed - append new hash to SCHEMA_HASHES"
);
47
/// Activity categories appearing in Nix's structured JSON log.
///
/// Discriminants mirror the raw numeric codes Nix emits (hence `repr(u64)`
/// and the explicit values). Codes not listed here decode to `Unknown` via
/// the `From<u64>` impl below.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[repr(u64)]
pub enum ActivityType {
    Unknown = 0,
    CopyPath = 100,
    FileTransfer = 101, // Actual HTTP download
    Realise = 102,
    CopyPaths = 103,
    Builds = 104,
    Build = 105, // Individual derivation build
    OptimiseStore = 106,
    VerifyPaths = 107,
    Substitute = 108, // High-level cache substitution
    QueryPathInfo = 109, // Checking if path exists on cache
    PostBuildHook = 110,
    BuildWaiting = 111,
    FetchTree = 112,
}
66
67impl From<u64> for ActivityType {
68 fn from(n: u64) -> Self {
69 match n {
70 100 => Self::CopyPath,
71 101 => Self::FileTransfer,
72 102 => Self::Realise,
73 103 => Self::CopyPaths,
74 104 => Self::Builds,
75 105 => Self::Build,
76 106 => Self::OptimiseStore,
77 107 => Self::VerifyPaths,
78 108 => Self::Substitute,
79 109 => Self::QueryPathInfo,
80 110 => Self::PostBuildHook,
81 111 => Self::BuildWaiting,
82 112 => Self::FetchTree,
83 _ => Self::Unknown,
84 }
85 }
86}
87
88impl fmt::Display for ActivityType {
89 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
90 write!(f, "{:?}", self)
91 }
92}
93
/// Result-event categories from Nix's structured JSON log.
///
/// Unlike `ActivityType`, the raw numeric codes are not stored as
/// discriminants here; decoding (codes 100..=108, everything else
/// `Unknown`) happens in the `From<u64>` impl below.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ResultType {
    Unknown,
    FileLinked,
    BuildLogLine,
    UntrustedPath,
    CorruptedPath,
    SetPhase,
    Progress,
    SetExpected,
    PostBuildLogLine,
    FetchStatus,
}
107
108impl From<u64> for ResultType {
109 fn from(n: u64) -> Self {
110 match n {
111 100 => Self::FileLinked,
112 101 => Self::BuildLogLine,
113 102 => Self::UntrustedPath,
114 103 => Self::CorruptedPath,
115 104 => Self::SetPhase,
116 105 => Self::Progress,
117 106 => Self::SetExpected,
118 107 => Self::PostBuildLogLine,
119 108 => Self::FetchStatus,
120 _ => Self::Unknown,
121 }
122 }
123}
124
125impl fmt::Display for ResultType {
126 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
127 write!(f, "{:?}", self)
128 }
129}
130
/// One event from Nix's structured JSON log stream, discriminated by the
/// `action` field ("start" / "stop" / "result").
///
/// All payload fields except `id` are defaulted so partial messages still
/// deserialize.
#[derive(Debug, Deserialize)]
#[serde(tag = "action", rename_all = "snake_case")]
enum NixEvent {
    /// An activity began; `id` identifies it in later Result/Stop events.
    Start {
        id: u64,
        /// Raw activity-type code — see `ActivityType`.
        #[serde(rename = "type", default)]
        event_type: u64,
        /// Human-readable description (may be empty; some activities put it
        /// in `fields[0]` instead).
        #[serde(default)]
        text: String,
        /// Positional, activity-type-specific payload values.
        #[serde(default)]
        fields: Vec<serde_json::Value>,
        /// Id of the parent activity (0 when top-level).
        #[serde(default)]
        parent: u64,
    },
    /// The activity with this id finished.
    Stop {
        id: u64,
    },
    /// An intermediate result (progress, log line, …) for a running activity.
    Result {
        id: u64,
        /// Raw result-type code — see `ResultType`.
        #[serde(rename = "type", default)]
        event_type: u64,
        #[serde(default)]
        fields: Vec<serde_json::Value>,
    },
}
156
/// Control commands a client may send over the unix socket, as JSON
/// discriminated by the `action` field.
#[derive(Debug, Deserialize)]
#[serde(tag = "action", rename_all = "snake_case")]
enum ClientCommand {
    /// Aggregate statistics; `since` limits the window
    /// (NOTE(review): presumably epoch seconds — confirm against `collect_stats`).
    GetStats { since: Option<i64> },
    /// Time-bucketed trend data, optionally filtered to one derivation path.
    GetTrend { since: Option<i64>, bucket: BucketSize, drv: Option<String> },
    /// Delete all stored events and compact the database.
    Clean,
}
164
// Untagged outer enum: serde tries the variants in declaration order, so
// ClientCommand is attempted first, then NixEvent. The "action" values of
// the two inner enums never overlap, so routing is unambiguous.
#[derive(Debug, Deserialize)]
#[serde(untagged)]
enum SocketMessage {
    /// A control command from a CLI client.
    Command(ClientCommand),
    /// A forwarded Nix log event.
    Event(NixEvent),
}
173
/// An in-flight Nix activity, tracked from its Start event until the
/// matching Stop, at which point it is written to the database.
struct Activity {
    /// Nix-assigned activity id (also the key in `State::active_activities`).
    id: u64,
    /// Id of the parent activity (0 when reported as top-level).
    parent_id: u64,
    /// Raw activity-type code as received (stored undecoded).
    event_type: u64,
    /// Description taken from the Start event's `text` or `fields[0]`.
    text: String,
    /// Daemon-local receive time of the Start event (not a Nix timestamp).
    start_time: DateTime<Utc>,
    /// Positional payload from the Start event, kept for later extraction
    /// of drv_path / cache_url on Stop.
    fields: Vec<serde_json::Value>,
    /// Last non-zero total reported by a Progress result; 0 if none seen.
    total_bytes: u64,
}
183
/// Mutable daemon state shared across all connections: activities that have
/// started but not yet stopped, keyed by Nix activity id.
struct State {
    active_activities: HashMap<u64, Activity>,
}
187
/// Dedicated writer and reader SQLite connections, each behind its own
/// mutex. With the WAL journal mode configured in `open_db`, a long read
/// (stats query) on `reader` does not block inserts on `writer`.
pub struct DbConnections {
    pub writer: Mutex<Connection>,
    pub reader: Mutex<Connection>,
}
192
193pub fn open_db(path: &PathBuf) -> Result<Connection> {
194 let conn = Connection::open(path)
195 .with_context(|| format!("Failed to open database at {}", path.display()))?;
196
197 // Check schema version; reset if mismatched.
198 let version: u32 = conn
199 .query_row("PRAGMA user_version", [], |r| r.get(0))
200 .unwrap_or(0);
201
202 if version != 0 && version != SCHEMA_VERSION {
203 info!(current = version, expected = SCHEMA_VERSION, "Schema version mismatch, resetting database");
204 drop(conn);
205 std::fs::remove_file(path)
206 .with_context(|| format!("Failed to remove stale database at {}", path.display()))?;
207 return open_db(path);
208 }
209
210 conn.execute_batch(SCHEMA)
211 .context("Failed to create schema")?;
212 conn.execute_batch(&format!("PRAGMA user_version = {}", SCHEMA_VERSION))
213 .context("Failed to set schema version")?;
214 conn.execute_batch("
215 PRAGMA journal_mode = WAL;
216 PRAGMA synchronous = NORMAL;
217 PRAGMA temp_store = MEMORY;
218 PRAGMA mmap_size = 134217728;
219 PRAGMA cache_size = -8000;
220 PRAGMA wal_autocheckpoint = 0;
221 ").context("Failed to configure database")?;
222
223 Ok(conn)
224}
225
226pub async fn run_daemon(socket_path: PathBuf, db: Arc<DbConnections>) -> Result<()> {
227 let state = Arc::new(Mutex::new(State {
228 active_activities: HashMap::new(),
229 }));
230
231 if socket_path.exists() {
232 std::fs::remove_file(&socket_path)
233 .with_context(|| format!("Failed to remove existing socket at {}", socket_path.display()))?;
234 }
235
236 let listener = UnixListener::bind(&socket_path)
237 .with_context(|| format!("Failed to bind to socket at {}", socket_path.display()))?;
238 info!("Daemon listening on {}", socket_path.display());
239
240 loop {
241 let (stream, _) = listener.accept().await?;
242 let state = Arc::clone(&state);
243 let db = Arc::clone(&db);
244 tokio::spawn(async move {
245 if let Err(e) = handle_connection(stream, state, db).await {
246 error!("Connection error: {}", e);
247 }
248 });
249 }
250}
251
252async fn handle_connection(mut stream: UnixStream, state: Arc<Mutex<State>>, db: Arc<DbConnections>) -> Result<()> {
253 let (reader, mut writer) = stream.split();
254 let mut reader = BufReader::new(reader);
255 let mut line = String::new();
256 loop {
257 line.clear();
258 if reader.read_line(&mut line).await? == 0 { break; }
259
260 match serde_json::from_str::<SocketMessage>(line.trim()) {
261 Ok(SocketMessage::Command(ClientCommand::GetStats { since })) => {
262 let db = Arc::clone(&db);
263 let stats = tokio::task::spawn_blocking(move || collect_stats(&db.reader, since))
264 .await??;
265 writer.write_all((serde_json::to_string(&stats)? + "\n").as_bytes()).await?;
266 break;
267 }
268 Ok(SocketMessage::Command(ClientCommand::GetTrend { since, bucket, drv })) => {
269 let db = Arc::clone(&db);
270 let trend = tokio::task::spawn_blocking(move || collect_trend(&db.reader, since, bucket, drv))
271 .await??;
272 writer.write_all((serde_json::to_string(&trend)? + "\n").as_bytes()).await?;
273 break;
274 }
275 Ok(SocketMessage::Command(ClientCommand::Clean)) => {
276 let db = Arc::clone(&db);
277 tokio::task::spawn_blocking(move || -> Result<()> {
278 let conn = db.writer.lock().unwrap();
279 conn.execute_batch("DELETE FROM events; VACUUM; PRAGMA wal_checkpoint(TRUNCATE);")?;
280 Ok(())
281 }).await??;
282 writer.write_all(b"ok\n").await?;
283 info!("Database cleared via socket command");
284 break;
285 }
286 Ok(SocketMessage::Event(event)) => {
287 if let Err(e) = process_event(event, &state, &db) {
288 error!("Failed to process event: {}", e);
289 }
290 }
291 Err(e) => error!("Invalid message: {}", e),
292 }
293 }
294 Ok(())
295}
296
/// Apply one Nix log event to the in-memory activity table and, on `Stop`,
/// persist the completed activity to SQLite.
///
/// The `state` mutex is released (via the explicit `drop(s)`) before the
/// database insert, so bookkeeping for other events is not blocked by a
/// slow write.
fn process_event(event: NixEvent, state: &Arc<Mutex<State>>, db: &Arc<DbConnections>) -> Result<()> {
    let mut s = state.lock().unwrap();

    match event {
        NixEvent::Start { id, event_type, text, fields, parent } => {
            let act_type = ActivityType::from(event_type);
            // Some activities carry their description in `text`, others only
            // in fields[0] — prefer whichever is non-empty.
            let text = if !text.is_empty() {
                text
            } else {
                fields.get(0).and_then(|v| v.as_str()).unwrap_or("").to_string()
            };

            info!(id, parent, act_type = %act_type, text = %text, fields = ?fields, "start");

            // Track until the matching Stop; duration is measured with the
            // daemon's own clock, not timestamps from Nix.
            s.active_activities.insert(id, Activity {
                id,
                parent_id: parent,
                event_type,
                text,
                start_time: Utc::now(),
                fields,
                total_bytes: 0,
            });
        }
        NixEvent::Result { id, event_type, fields } => {
            let res_type = ResultType::from(event_type);

            // High-frequency result types are excluded from logging to keep
            // the output readable.
            if res_type != ResultType::BuildLogLine && res_type != ResultType::Progress && res_type != ResultType::SetExpected {
                info!(id, res_type = %res_type, fields = ?fields, "result");
            }

            if let Some(act) = s.active_activities.get_mut(&id) {
                if res_type == ResultType::Progress {
                    // NOTE(review): fields[1] is read as the activity's total
                    // byte count (layout assumed [done, expected, ...]) —
                    // confirm against Nix's progress-result field order.
                    // Zero totals are ignored so a final empty progress
                    // report doesn't erase an earlier value.
                    if let Some(total) = fields.get(1).and_then(|v| v.as_u64()) {
                        if total > 0 { act.total_bytes = total; }
                    }
                }
            }
        }
        NixEvent::Stop { id } => {
            // A Stop without a matching Start (e.g. daemon attached
            // mid-build) is silently ignored.
            if let Some(act) = s.active_activities.remove(&id) {
                let act_type = ActivityType::from(act.event_type);
                let end_time = Utc::now();
                let duration_ms = end_time.signed_duration_since(act.start_time).num_milliseconds();

                info!(
                    id = act.id, act_type = %act_type, duration_ms,
                    total_bytes = act.total_bytes, text = %act.text, fields = ?act.fields,
                    "stop"
                );

                // fields[0] is treated as the store/derivation path when it
                // is a string; other activity types leave it NULL.
                let drv_path = act.fields.get(0).and_then(|v| v.as_str()).map(|s| s.to_string());
                // Both Substitute (fields[1] = substituter URL) and QueryPathInfo (fields[1] = cache URL)
                // have the same layout. Store cache_url for both.
                let cache_url = match act_type {
                    ActivityType::Substitute | ActivityType::QueryPathInfo => {
                        act.fields.get(1).and_then(|v| v.as_str()).map(|s| s.to_string())
                    }
                    _ => None,
                };

                // Release the state lock before the (potentially slow) insert.
                drop(s);
                db.writer.lock().unwrap().execute(
                    "INSERT INTO events (nix_id, parent_id, event_type, text, drv_path, cache_url, start_time, end_time, duration_ms, total_bytes)
                     VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)",
                    rusqlite::params![
                        act.id as i64, act.parent_id as i64, act.event_type as i64,
                        act.text, drv_path, cache_url,
                        act.start_time.to_rfc3339(), end_time.to_rfc3339(),
                        duration_ms, act.total_bytes as i64,
                    ],
                ).context("Failed to insert event")?;
            }
        }
    }

    Ok(())
}