Rust and WASM did-method-plc tools and structures
at main 21 kB view raw
1//! PLC Fork Visualizer 2//! 3//! This binary fetches DID audit logs and visualizes any forks in the operation chain, 4//! showing which operations won/lost and why based on rotation key priority and the 5//! 72-hour recovery window. 6 7use atproto_plc::{Did, Operation, VerifyingKey}; 8use chrono::{DateTime, Utc}; 9use clap::Parser; 10use reqwest::blocking::Client; 11use serde::Deserialize; 12use std::collections::HashMap; 13use std::process; 14 15/// Command-line arguments 16#[derive(Parser, Debug)] 17#[command( 18 name = "plc-fork-viz", 19 about = "Visualize forks in PLC audit logs", 20 long_about = "Fetches and visualizes fork points in DID operation chains from plc.directory,\nshowing which operations won/lost based on rotation key priority and recovery window rules" 21)] 22struct Args { 23 /// The DID to analyze (e.g., did:plc:ewvi7nxzyoun6zhxrhs64oiz) 24 #[arg(value_name = "DID")] 25 did: String, 26 27 /// Custom PLC directory URL (default: https://plc.directory) 28 #[arg(long, default_value = "https://plc.directory")] 29 plc_url: String, 30 31 /// Show detailed information for each operation 32 #[arg(short, long)] 33 verbose: bool, 34 35 /// Show timing information (timestamps and recovery window calculations) 36 #[arg(short, long)] 37 timing: bool, 38 39 /// Show full DIDs and CIDs instead of truncated versions 40 #[arg(long)] 41 full_ids: bool, 42 43 /// Output format: tree (default), json, or markdown 44 #[arg(short, long, default_value = "tree")] 45 format: OutputFormat, 46} 47 48#[derive(Debug, Clone, clap::ValueEnum)] 49enum OutputFormat { 50 Tree, 51 Json, 52 Markdown, 53} 54 55/// Audit log response from plc.directory 56#[derive(Debug, Deserialize, Clone)] 57struct AuditLogEntry { 58 /// The DID this operation is for 59 #[allow(dead_code)] 60 did: String, 61 62 /// The operation itself 63 operation: Operation, 64 65 /// CID of this operation 66 cid: String, 67 68 /// Timestamp when this operation was created 69 #[serde(rename = "createdAt")] 70 created_at: String, 71 72 /// Nullified flag (if this operation was invalidated by fork resolution) 73 #[allow(dead_code)] 74 #[serde(default)] 75 nullified: bool, 76} 77 78/// Represents a fork point in the operation chain 79#[derive(Debug, Clone)] 80struct ForkPoint { 81 /// The prev CID that all operations in this fork reference 82 prev_cid: String, 83 84 /// Competing operations at this fork 85 operations: Vec<ForkOperation>, 86 87 /// The winning operation (after fork resolution) 88 winner_cid: String, 89} 90 91/// An operation that's part of a fork 92#[derive(Debug, Clone)] 93struct ForkOperation { 94 cid: String, 95 operation: Operation, 96 timestamp: DateTime<Utc>, 97 signing_key_index: Option<usize>, 98 signing_key: Option<String>, 99 is_winner: bool, 100 rejection_reason: Option<String>, 101} 102 103fn main() { 104 let args = Args::parse(); 105 106 // Parse and validate the DID 107 let did = match Did::parse(&args.did) { 108 Ok(did) => did, 109 Err(e) => { 110 eprintln!("❌ Error: Invalid DID format: {}", e); 111 eprintln!(" Expected format: did:plc:<24 lowercase base32 characters>"); 112 process::exit(1); 113 } 114 }; 115 116 println!("🔍 Analyzing forks in: {}", did); 117 println!(" Source: {}", args.plc_url); 118 println!(); 119 120 // Fetch the audit log 121 let audit_log = match fetch_audit_log(&args.plc_url, &did) { 122 Ok(log) => log, 123 Err(e) => { 124 eprintln!("❌ Error: Failed to fetch audit log: {}", e); 125 process::exit(1); 126 } 127 }; 128 129 if audit_log.is_empty() { 130 eprintln!("❌ Error: No operations found in audit log"); 131 process::exit(1); 132 } 133 134 println!("📊 Audit log contains {} operations", audit_log.len()); 135 136 // Detect forks 137 let forks = detect_forks(&audit_log, &args); 138 139 if forks.is_empty() { 140 println!("\n✅ No forks detected - this is a linear operation chain"); 141 println!(" All operations form a single canonical path from genesis to tip."); 142 143 if args.verbose { 144 println!("\n📋 Linear chain visualization:"); 145 visualize_linear_chain(&audit_log, &args); 146 } 147 148 return; 149 } 150 151 println!("⚠️ Detected {} fork point(s)", forks.len()); 152 println!(); 153 154 // Visualize based on format 155 match args.format { 156 OutputFormat::Tree => visualize_tree(&audit_log, &forks, &args), 157 OutputFormat::Json => visualize_json(&forks), 158 OutputFormat::Markdown => visualize_markdown(&audit_log, &forks, &args), 159 } 160} 161 162/// Detect fork points in the audit log 163fn detect_forks(audit_log: &[AuditLogEntry], args: &Args) -> Vec<ForkPoint> { 164 let mut prev_to_operations: HashMap<String, Vec<AuditLogEntry>> = HashMap::new(); 165 166 // Group operations by their prev CID 167 for entry in audit_log { 168 if let Some(prev) = entry.operation.prev() { 169 prev_to_operations 170 .entry(prev.to_string()) 171 .or_default() 172 .push(entry.clone()); 173 } 174 } 175 176 // Build operation map for state reconstruction 177 let operation_map: HashMap<String, AuditLogEntry> = audit_log 178 .iter() 179 .map(|e| (e.cid.clone(), e.clone())) 180 .collect(); 181 182 let mut forks = Vec::new(); 183 184 // Find fork points (where multiple operations reference the same prev) 185 for (prev_cid, operations) in prev_to_operations { 186 if operations.len() > 1 { 187 if args.verbose { 188 println!("🔀 Fork detected at {}", truncate(&prev_cid, args)); 189 println!(" {} competing operations", operations.len()); 190 } 191 192 // Get the state at the prev operation to determine rotation keys 193 let state = if let Some(prev_entry) = operation_map.get(&prev_cid) { 194 get_state_from_operation(&prev_entry.operation) 195 } else { 196 // This shouldn't happen in a valid chain 197 continue; 198 }; 199 200 // Analyze each operation in the fork 201 let mut fork_ops = Vec::new(); 202 for entry in &operations { 203 let timestamp = parse_timestamp(&entry.created_at); 204 205 // Determine which rotation key signed this operation 206 let (signing_key_index, signing_key) = if !state.rotation_keys.is_empty() { 207 find_signing_key(&entry.operation, &state.rotation_keys) 208 } else { 209 (None, None) 210 }; 211 212 fork_ops.push(ForkOperation { 213 cid: entry.cid.clone(), 214 operation: entry.operation.clone(), 215 timestamp, 216 signing_key_index, 217 signing_key, 218 is_winner: false, 219 rejection_reason: None, 220 }); 221 } 222 223 // Resolve the fork to determine winner 224 let winner_cid = resolve_fork(&mut fork_ops); 225 226 forks.push(ForkPoint { 227 prev_cid, 228 operations: fork_ops, 229 winner_cid, 230 }); 231 } 232 } 233 234 // Sort forks chronologically 235 forks.sort_by_key(|f| { 236 f.operations 237 .iter() 238 .map(|op| op.timestamp) 239 .min() 240 .unwrap_or_else(Utc::now) 241 }); 242 243 forks 244} 245 246/// Resolve a fork point and mark the winner 247fn resolve_fork(fork_ops: &mut [ForkOperation]) -> String { 248 // Sort by timestamp (chronological order) 249 fork_ops.sort_by_key(|op| op.timestamp); 250 251 // First-received is the default winner 252 let mut winner_idx = 0; 253 fork_ops[0].is_winner = true; 254 255 // Check if any later operation can invalidate based on priority 256 for i in 1..fork_ops.len() { 257 let competing_key_idx = fork_ops[i].signing_key_index; 258 let winner_key_idx = fork_ops[winner_idx].signing_key_index; 259 260 match (competing_key_idx, winner_key_idx) { 261 (Some(competing_idx), Some(winner_idx_val)) => { 262 if competing_idx < winner_idx_val { 263 // Higher priority (lower index) 264 let time_diff = fork_ops[i].timestamp - fork_ops[winner_idx].timestamp; 265 266 if time_diff <= chrono::Duration::hours(72) { 267 // Within recovery window - this operation wins 268 fork_ops[winner_idx].is_winner = false; 269 fork_ops[winner_idx].rejection_reason = Some(format!( 270 "Invalidated by higher-priority key[{}] within recovery window", 271 competing_idx 272 )); 273 274 fork_ops[i].is_winner = true; 275 winner_idx = i; 276 } else { 277 // Outside recovery window 278 fork_ops[i].rejection_reason = Some(format!( 279 "Higher-priority key[{}] but outside 72-hour recovery window ({:.1}h late)", 280 competing_idx, 281 time_diff.num_hours() as f64 282 )); 283 } 284 } else { 285 // Lower priority 286 fork_ops[i].rejection_reason = Some(format!( 287 "Lower-priority key[{}] (current winner has key[{}])", 288 competing_idx, 289 winner_idx_val 290 )); 291 } 292 } 293 _ => { 294 fork_ops[i].rejection_reason = Some("Could not determine signing key".to_string()); 295 } 296 } 297 } 298 299 fork_ops[winner_idx].cid.clone() 300} 301 302/// Find which rotation key signed an operation 303fn find_signing_key(operation: &Operation, rotation_keys: &[String]) -> (Option<usize>, Option<String>) { 304 for (index, key_did) in rotation_keys.iter().enumerate() { 305 if let Ok(verifying_key) = VerifyingKey::from_did_key(key_did) { 306 if operation.verify(&[verifying_key]).is_ok() { 307 return (Some(index), Some(key_did.clone())); 308 } 309 } 310 } 311 (None, None) 312} 313 314/// Get state from an operation 315fn get_state_from_operation(operation: &Operation) -> atproto_plc::PlcState { 316 match operation { 317 Operation::PlcOperation { 318 rotation_keys, 319 verification_methods, 320 also_known_as, 321 services, 322 .. 323 } => atproto_plc::PlcState { 324 rotation_keys: rotation_keys.clone(), 325 verification_methods: verification_methods.clone(), 326 also_known_as: also_known_as.clone(), 327 services: services.clone(), 328 }, 329 _ => atproto_plc::PlcState::new(), 330 } 331} 332 333/// Parse ISO 8601 timestamp 334fn parse_timestamp(timestamp: &str) -> DateTime<Utc> { 335 timestamp 336 .parse::<DateTime<Utc>>() 337 .unwrap_or_else(|_| Utc::now()) 338} 339 340/// Visualize forks as a tree 341fn visualize_tree(audit_log: &[AuditLogEntry], forks: &[ForkPoint], args: &Args) { 342 println!("📊 Fork Visualization (Tree Format)"); 343 println!("═══════════════════════════════════════════════════════════════"); 344 println!(); 345 346 // Build a map of which operations are part of forks 347 let mut fork_map: HashMap<String, &ForkPoint> = HashMap::new(); 348 for fork in forks { 349 for op in &fork.operations { 350 fork_map.insert(op.cid.clone(), fork); 351 } 352 } 353 354 // Track which prev CIDs have been processed 355 let mut processed_forks: std::collections::HashSet<String> = std::collections::HashSet::new(); 356 357 for entry in audit_log.iter() { 358 let is_genesis = entry.operation.is_genesis(); 359 let prev = entry.operation.prev(); 360 361 // Check if this operation is part of a fork 362 if let Some(_prev_cid) = prev { 363 if let Some(fork) = fork_map.get(&entry.cid) { 364 // This is a fork point 365 if !processed_forks.contains(&fork.prev_cid) { 366 processed_forks.insert(fork.prev_cid.clone()); 367 368 println!("Fork at operation referencing {}", truncate(&fork.prev_cid, args)); 369 370 for (j, fork_op) in fork.operations.iter().enumerate() { 371 let symbol = if fork_op.is_winner { "" } else { "" }; 372 let color = if fork_op.is_winner { "🟢" } else { "🔴" }; 373 let prefix = if j == fork.operations.len() - 1 { "└─" } else { "├─" }; 374 375 println!( 376 " {} {} {} CID: {}", 377 prefix, 378 color, 379 symbol, 380 truncate(&fork_op.cid, args) 381 ); 382 383 if let Some(key_idx) = fork_op.signing_key_index { 384 println!(" │ Signed by: rotation_key[{}]", key_idx); 385 if args.verbose { 386 if let Some(key) = &fork_op.signing_key { 387 println!(" │ Key: {}", truncate(key, args)); 388 } 389 } 390 } 391 392 if args.timing { 393 println!( 394 " │ Timestamp: {}", 395 fork_op.timestamp.format("%Y-%m-%d %H:%M:%S UTC") 396 ); 397 } 398 399 if !fork_op.is_winner { 400 if let Some(reason) = &fork_op.rejection_reason { 401 println!(" │ Reason: {}", reason); 402 } 403 } else { 404 println!(" │ Status: CANONICAL (winner)"); 405 } 406 407 if args.verbose { 408 if let Some(Operation::PlcOperation { services, .. }) = Some(&fork_op.operation) { 409 if !services.is_empty() { 410 println!(" │ Services: {} configured", services.len()); 411 } 412 } 413 } 414 415 if j < fork.operations.len() - 1 { 416 println!(""); 417 } 418 } 419 println!(); 420 } 421 continue; 422 } 423 } 424 425 // Regular operation (not part of a fork) 426 if is_genesis { 427 println!("🌱 Genesis"); 428 println!(" CID: {}", truncate(&entry.cid, args)); 429 if args.timing { 430 println!(" Timestamp: {}", entry.created_at); 431 } 432 if args.verbose { 433 if let Operation::PlcOperation { rotation_keys, .. } = &entry.operation { 434 println!(" Rotation keys: {}", rotation_keys.len()); 435 } 436 } 437 println!(); 438 } 439 } 440 441 // Summary 442 println!("═══════════════════════════════════════════════════════════════"); 443 println!("📈 Summary:"); 444 println!(" Total operations: {}", audit_log.len()); 445 println!(" Fork points: {}", forks.len()); 446 447 let total_competing_ops: usize = forks.iter().map(|f| f.operations.len()).sum(); 448 let rejected_ops = total_competing_ops - forks.len(); 449 println!(" Rejected operations: {}", rejected_ops); 450 451 if !forks.is_empty() { 452 println!("\n🔐 Fork Resolution Details:"); 453 for (i, fork) in forks.iter().enumerate() { 454 let winner = fork.operations.iter().find(|op| op.is_winner).unwrap(); 455 println!( 456 " Fork {}: Winner is {} (signed by key[{}])", 457 i + 1, 458 truncate(&winner.cid, args), 459 winner.signing_key_index.unwrap_or(999) 460 ); 461 } 462 } 463} 464 465/// Visualize linear chain (no forks) 466fn visualize_linear_chain(audit_log: &[AuditLogEntry], args: &Args) { 467 for (i, entry) in audit_log.iter().enumerate() { 468 let symbol = if i == 0 { "🌱" } else { "" }; 469 println!("{} Operation {}: {}", symbol, i, truncate(&entry.cid, args)); 470 471 if args.timing { 472 println!(" Timestamp: {}", entry.created_at); 473 } 474 475 if args.verbose { 476 if let Some(prev) = entry.operation.prev() { 477 println!(" Previous: {}", truncate(prev, args)); 478 } 479 } 480 } 481} 482 483/// Visualize forks as JSON 484fn visualize_json(forks: &[ForkPoint]) { 485 #[derive(serde::Serialize)] 486 struct JsonFork { 487 prev_cid: String, 488 winner_cid: String, 489 operations: Vec<JsonForkOp>, 490 } 491 492 #[derive(serde::Serialize)] 493 struct JsonForkOp { 494 cid: String, 495 timestamp: String, 496 signing_key_index: Option<usize>, 497 is_winner: bool, 498 rejection_reason: Option<String>, 499 } 500 501 let json_forks: Vec<JsonFork> = forks 502 .iter() 503 .map(|f| JsonFork { 504 prev_cid: f.prev_cid.clone(), 505 winner_cid: f.winner_cid.clone(), 506 operations: f 507 .operations 508 .iter() 509 .map(|op| JsonForkOp { 510 cid: op.cid.clone(), 511 timestamp: op.timestamp.to_rfc3339(), 512 signing_key_index: op.signing_key_index, 513 is_winner: op.is_winner, 514 rejection_reason: op.rejection_reason.clone(), 515 }) 516 .collect(), 517 }) 518 .collect(); 519 520 println!( 521 "{}", 522 serde_json::to_string_pretty(&json_forks).unwrap() 523 ); 524} 525 526/// Visualize forks as Markdown 527fn visualize_markdown(audit_log: &[AuditLogEntry], forks: &[ForkPoint], args: &Args) { 528 println!("# PLC Fork Analysis Report\n"); 529 println!("**DID**: {}\n", args.did); 530 println!("**Total Operations**: {}", audit_log.len()); 531 println!("**Fork Points**: {}\n", forks.len()); 532 533 if forks.is_empty() { 534 println!("✅ No forks detected - linear operation chain\n"); 535 return; 536 } 537 538 println!("## Fork Details\n"); 539 540 for (i, fork) in forks.iter().enumerate() { 541 println!("### Fork {} (at {})\n", i + 1, truncate(&fork.prev_cid, args)); 542 println!("| Status | CID | Key Index | Timestamp | Reason |"); 543 println!("|--------|-----|-----------|-----------|--------|"); 544 545 for op in &fork.operations { 546 let status = if op.is_winner { "✅ Winner" } else { "❌ Rejected" }; 547 let key_idx = op 548 .signing_key_index 549 .map(|i| i.to_string()) 550 .unwrap_or_else(|| "?".to_string()); 551 let timestamp = op.timestamp.format("%Y-%m-%d %H:%M:%S"); 552 let reason = op 553 .rejection_reason 554 .as_deref() 555 .unwrap_or("Canonical operation"); 556 557 println!( 558 "| {} | {} | {} | {} | {} |", 559 status, 560 truncate(&op.cid, args), 561 key_idx, 562 timestamp, 563 reason 564 ); 565 } 566 567 println!(); 568 } 569 570 println!("## Summary\n"); 571 println!("- Total competing operations: {}", forks.iter().map(|f| f.operations.len()).sum::<usize>()); 572 println!("- Rejected operations: {}", forks.iter().map(|f| f.operations.len() - 1).sum::<usize>()); 573} 574 575/// Truncate a string for display 576fn truncate(s: &str, args: &Args) -> String { 577 if args.full_ids { 578 s.to_string() 579 } else { 580 if s.len() > 20 { 581 format!("{}...{}", &s[..8], &s[s.len() - 8..]) 582 } else { 583 s.to_string() 584 } 585 } 586} 587 588/// Fetch the audit log for a DID from plc.directory 589fn fetch_audit_log( 590 plc_url: &str, 591 did: &Did, 592) -> Result<Vec<AuditLogEntry>, Box<dyn std::error::Error>> { 593 let url = format!("{}/{}/log/audit", plc_url, did); 594 595 let client = Client::builder() 596 .user_agent("atproto-plc-fork-viz/0.2.0") 597 .timeout(std::time::Duration::from_secs(30)) 598 .build()?; 599 600 let response = client.get(&url).send()?; 601 602 if !response.status().is_success() { 603 return Err(format!( 604 "HTTP error: {} - {}", 605 response.status(), 606 response.text().unwrap_or_default() 607 ) 608 .into()); 609 } 610 611 let audit_log: Vec<AuditLogEntry> = response.json()?; 612 Ok(audit_log) 613}