crossing the streams
at main 375 lines 12 kB view raw
1use tracing::span::{Attributes, Id}; 2use tracing::{field::Visit, Event, Level, Subscriber}; 3use tracing_subscriber::layer::Context; 4use tracing_subscriber::prelude::*; 5use tracing_subscriber::registry::LookupSpan; 6use tracing_subscriber::Layer; 7use tracing_subscriber::Registry; 8 9use chrono::{Local, Utc}; 10use console::{style, Term}; 11use std::collections::HashMap; 12use std::sync::{Arc, Mutex}; 13use std::time::{Duration, Instant}; 14 15use crate::store::{FollowOption, ReadOptions, Store}; 16 17const INITIAL_BACKOFF: Duration = Duration::from_secs(10); 18const MAX_BACKOFF: Duration = Duration::from_secs(1800); // 30 minutes 19 20#[derive(Debug, Clone)] 21struct TraceNode { 22 level: Level, 23 name: String, 24 parent_id: Option<Id>, 25 children: Vec<Child>, 26 module_path: Option<String>, 27 line: Option<u32>, 28 fields: HashMap<String, String>, 29 start_time: Option<Instant>, 30 took: Option<u128>, // Duration in microseconds 31} 32 33#[derive(Debug, Clone)] 34enum Child { 35 Event(TraceNode), 36 Span(Id), 37} 38 39impl Visit for TraceNode { 40 fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) { 41 self.fields 42 .insert(field.name().to_string(), format!("{:?}", value)); 43 } 44} 45 46impl TraceNode { 47 fn new( 48 level: Level, 49 name: String, 50 parent_id: Option<Id>, 51 module_path: Option<String>, 52 line: Option<u32>, 53 ) -> Self { 54 Self { 55 level, 56 name, 57 parent_id, 58 children: Vec::new(), 59 module_path, 60 line, 61 fields: HashMap::new(), 62 start_time: None, 63 took: None, 64 } 65 } 66 67 fn duration_text(&self) -> String { 68 match self.took { 69 Some(micros) if micros >= 1000 => format!("{}ms", micros / 1000), 70 _ => String::new(), 71 } 72 } 73 74 fn format_message(&self) -> String { 75 let mut parts = Vec::new(); 76 77 // Name is styled in cyan for spans (which have took value) 78 if self.took.is_some() { 79 parts.push(style(&self.name).cyan().to_string()); 80 } else { 81 parts.push(self.name.clone()); 82 } 83 84 // Message field doesn't get key=value treatment 85 if let Some(msg) = self.fields.get("message") { 86 parts.push(style(msg.trim_matches('"')).italic().to_string()); 87 } 88 89 // Other fields get key=value format 90 let fields: String = self 91 .fields 92 .iter() 93 .filter(|(k, _)| *k != "message") 94 .map(|(k, v)| format!("{}={}", k, v.trim_matches('"'))) 95 .collect::<Vec<_>>() 96 .join(" "); 97 98 if !fields.is_empty() { 99 parts.push(fields); 100 } 101 102 parts.join(" ") 103 } 104} 105 106#[derive(Clone)] 107pub struct HierarchicalSubscriber { 108 spans: Arc<Mutex<HashMap<Id, TraceNode>>>, 109 next_log_delta: Arc<Mutex<HashMap<Id, Duration>>>, 110} 111 112impl Default for HierarchicalSubscriber { 113 fn default() -> Self { 114 Self::new() 115 } 116} 117 118impl HierarchicalSubscriber { 119 pub fn new() -> Self { 120 HierarchicalSubscriber { 121 spans: Arc::new(Mutex::new(HashMap::new())), 122 next_log_delta: Arc::new(Mutex::new(HashMap::new())), 123 } 124 } 125 126 fn format_trace_node(&self, node: &TraceNode, depth: usize, is_last: bool) -> String { 127 let now = Utc::now().with_timezone(&Local); 128 let formatted_time = now.format("%H:%M:%S%.3f").to_string(); 129 130 // Format location info using module_path instead of file 131 let loc = if let Some(module_path) = &node.module_path { 132 if let Some(line) = node.line { 133 format!("{}:{}", module_path, line) 134 } else { 135 module_path.clone() 136 } 137 } else { 138 String::new() 139 }; 140 141 // Build the tree visualization 142 let mut prefix = String::new(); 143 if depth > 0 { 144 prefix.push_str(&"".repeat(depth - 1)); 145 prefix.push_str(if is_last { "└─ " } else { "├─ " }); 146 } 147 148 // Format duration with proper alignment 149 let duration_text = format!("{:>7}", node.duration_text()); 150 151 // Build the message content 152 let mut message = format!( 153 "{} {:>5} {} {}{}", 154 formatted_time, 155 node.level, 156 duration_text, 157 prefix, 158 node.format_message() 159 ); 160 161 // Add right-aligned module path 162 let terminal_width = Term::stdout().size().1 as usize; 163 let content_width = 164 console::measure_text_width(&message) + console::measure_text_width(&loc); 165 let padding = " ".repeat(terminal_width.saturating_sub(content_width)); 166 message.push_str(&padding); 167 message.push_str(&loc); 168 169 message 170 } 171 172 fn print_span_tree(&self, span_id: &Id, depth: usize, spans: &HashMap<Id, TraceNode>) { 173 if let Some(node) = spans.get(span_id) { 174 eprintln!("{}", self.format_trace_node(node, depth, false)); 175 let children_count = node.children.len(); 176 for (idx, child) in node.children.iter().enumerate() { 177 let is_last = idx == children_count - 1; 178 match child { 179 Child::Event(event_node) => { 180 eprintln!("{}", self.format_trace_node(event_node, depth + 1, is_last)); 181 } 182 Child::Span(child_id) => { 183 self.print_span_tree(child_id, depth + 1, spans); 184 } 185 } 186 } 187 } 188 } 189 190 pub fn monitor_long_spans(&self) { 191 let spans = self.spans.lock().unwrap(); 192 let mut next_log_delta = self.next_log_delta.lock().unwrap(); 193 let now = Instant::now(); 194 for (span_id, node) in spans.iter() { 195 if let Some(start_time) = node.start_time { 196 let next_delta = next_log_delta 197 .entry(span_id.clone()) 198 .or_insert_with(|| INITIAL_BACKOFF); 199 if now >= start_time + *next_delta { 200 eprintln!( 201 "{}", 202 self.format_trace_node_with_incomplete( 203 node, 204 now.duration_since(start_time) 205 ) 206 ); 207 self.print_span_tree(span_id, 1, &spans); 208 *next_delta = calculate_next_backoff(*next_delta); 209 } 210 } 211 } 212 } 213 214 fn format_trace_node_with_incomplete(&self, node: &TraceNode, duration: Duration) -> String { 215 let now = Utc::now().with_timezone(&Local); 216 let formatted_time = now.format("%H:%M:%S%.3f").to_string(); 217 218 let loc = if let Some(module_path) = &node.module_path { 219 if let Some(line) = node.line { 220 format!("{}:{}", module_path, line) 221 } else { 222 module_path.clone() 223 } 224 } else { 225 String::new() 226 }; 227 228 // Highlight incomplete spans 229 let duration_text = format!( 230 "{}{:>7}ms", 231 style(">").yellow(), 232 style(duration.as_millis()).yellow() 233 ); 234 235 let mut message = format!( 236 "{} {:>5} {} {}", 237 formatted_time, 238 node.level, 239 duration_text, 240 style(&node.name).yellow() 241 ); 242 243 let terminal_width = Term::stdout().size().1 as usize; 244 let content_width = 245 console::measure_text_width(&message) + console::measure_text_width(&loc); 246 let padding = " ".repeat(terminal_width.saturating_sub(content_width)); 247 message.push_str(&padding); 248 message.push_str(&loc); 249 250 message 251 } 252} 253 254impl<S> Layer<S> for HierarchicalSubscriber 255where 256 S: Subscriber + for<'a> LookupSpan<'a>, 257{ 258 fn on_enter(&self, id: &Id, _ctx: Context<'_, S>) { 259 let mut spans = self.spans.lock().unwrap(); 260 if let Some(node) = spans.get_mut(id) { 261 node.start_time = Some(Instant::now()); 262 } 263 } 264 265 fn on_exit(&self, id: &Id, _ctx: Context<'_, S>) { 266 let mut spans = self.spans.lock().unwrap(); 267 if let Some(node) = spans.get_mut(id) { 268 if let Some(start_time) = node.start_time.take() { 269 let elapsed = start_time.elapsed().as_micros(); 270 node.took = Some(node.took.unwrap_or(0) + elapsed); 271 } 272 } 273 } 274 275 fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) { 276 let metadata = event.metadata(); 277 278 let mut event_node = TraceNode::new( 279 *metadata.level(), 280 metadata.name().to_string(), 281 None, 282 metadata.module_path().map(ToString::to_string), 283 metadata.line(), 284 ); 285 286 event.record(&mut event_node); 287 288 let mut spans = self.spans.lock().unwrap(); 289 290 if let Some(span) = ctx.lookup_current() { 291 let id = span.id(); 292 if let Some(parent_span) = spans.get_mut(&id) { 293 parent_span.children.push(Child::Event(event_node.clone())); 294 } 295 } else { 296 eprintln!("{}", self.format_trace_node(&event_node, 0, true)); 297 } 298 } 299 300 fn on_new_span(&self, attrs: &Attributes<'_>, id: &Id, ctx: Context<'_, S>) { 301 let metadata = attrs.metadata(); 302 303 let curr = ctx.current_span(); 304 let parent_id = curr.id(); 305 306 let mut node = TraceNode::new( 307 *metadata.level(), 308 metadata.name().to_string(), 309 parent_id.cloned(), 310 metadata.module_path().map(ToString::to_string), 311 metadata.line(), 312 ); 313 attrs.record(&mut node); 314 315 let mut spans = self.spans.lock().unwrap(); 316 317 if let Some(parent_id) = &parent_id { 318 if let Some(parent_node) = spans.get_mut(parent_id) { 319 parent_node.children.push(Child::Span(id.clone())); 320 } 321 } 322 323 spans.insert(id.clone(), node); 324 } 325 326 fn on_close(&self, id: Id, _ctx: Context<'_, S>) { 327 let spans = self.spans.lock().unwrap(); 328 if let Some(node) = spans.get(&id) { 329 // Only print when a root span closes 330 if node.parent_id.is_none() { 331 self.print_span_tree(&id, 0, &spans); 332 } 333 } else { 334 eprintln!("DEBUG: No node found for closing span"); 335 } 336 } 337} 338 339fn calculate_next_backoff(current_backoff: Duration) -> Duration { 340 if current_backoff > MAX_BACKOFF { 341 current_backoff + MAX_BACKOFF 342 } else { 343 current_backoff * 2 344 } 345} 346 347pub async fn log_stream(store: Store) { 348 let options = ReadOptions::builder() 349 .follow(FollowOption::On) 350 .tail(true) 351 .build(); 352 let mut recver = store.read(options).await; 353 while let Some(frame) = recver.recv().await { 354 let now = Utc::now().with_timezone(&Local); 355 let formatted_time = now.format("%H:%M:%S%.3f").to_string(); 356 let id = frame.id.to_string(); 357 let id = &id[id.len() - 5..]; 358 eprintln!("{} {:>5} {}", formatted_time, id, frame.topic); 359 } 360} 361 362pub fn init() { 363 let subscriber = HierarchicalSubscriber::new(); 364 365 // Clone the subscriber for monitoring 366 let monitor_subscriber = Arc::new(subscriber.clone()); 367 std::thread::spawn(move || loop { 368 std::thread::sleep(Duration::from_secs(1)); 369 monitor_subscriber.monitor_long_spans(); 370 }); 371 372 // Register the subscriber directly 373 let registry = Registry::default().with(subscriber); 374 tracing::subscriber::set_global_default(registry).expect("setting tracing default failed"); 375}