// Forked from smokesignal.events/atproto-plc:
// Rust and WASM did-method-plc tools and structures.
1//! PLC Fork Visualizer
2//!
3//! This binary fetches DID audit logs and visualizes any forks in the operation chain,
4//! showing which operations won/lost and why based on rotation key priority and the
5//! 72-hour recovery window.
6
7use atproto_plc::{Did, Operation, VerifyingKey};
8use chrono::{DateTime, Utc};
9use clap::Parser;
10use reqwest::blocking::Client;
11use serde::Deserialize;
12use std::collections::HashMap;
13use std::process;
14
/// Command-line arguments
// NOTE(review): with clap's derive API the `///` comment on each field below is
// presumably what `--help` prints for that option, so those comments are treated as
// user-facing text and left untouched; maintainer notes use plain `//` comments.
// The program-level `about` / `long_about` texts are set explicitly in `#[command]`.
#[derive(Parser, Debug)]
#[command(
    name = "plc-fork-viz",
    about = "Visualize forks in PLC audit logs",
    long_about = "Fetches and visualizes fork points in DID operation chains from plc.directory,\nshowing which operations won/lost based on rotation key priority and recovery window rules"
)]
struct Args {
    /// The DID to analyze (e.g., did:plc:ewvi7nxzyoun6zhxrhs64oiz)
    // Positional argument; `main` validates it with `Did::parse` before any request.
    #[arg(value_name = "DID")]
    did: String,

    /// Custom PLC directory URL (default: https://plc.directory)
    // Used by `fetch_audit_log` as the base of the `<base>/<did>/log/audit` request.
    #[arg(long, default_value = "https://plc.directory")]
    plc_url: String,

    /// Show detailed information for each operation
    // Also enables the per-fork diagnostics printed while forks are being detected and
    // the linear-chain listing when no fork exists.
    #[arg(short, long)]
    verbose: bool,

    /// Show timing information (timestamps and recovery window calculations)
    #[arg(short, long)]
    timing: bool,

    /// Show full DIDs and CIDs instead of truncated versions
    // Read by `truncate`; when unset, long identifiers are abbreviated for display.
    #[arg(long)]
    full_ids: bool,

    /// Output format: tree (default), json, or markdown
    #[arg(short, long, default_value = "tree")]
    format: OutputFormat,
}
47
/// Renderer selected by the `--format` / `-f` option; `main` dispatches on it.
// NOTE(review): variant notes are plain `//` comments because clap may surface `///`
// comments on `ValueEnum` variants in the help output — confirm before converting.
#[derive(Debug, Clone, clap::ValueEnum)]
enum OutputFormat {
    // Rendered by `visualize_tree`.
    Tree,
    // Rendered by `visualize_json`.
    Json,
    // Rendered by `visualize_markdown`.
    Markdown,
}
54
/// One entry of the audit log returned by the directory's `/<did>/log/audit` endpoint.
///
/// `fetch_audit_log` deserializes the response body into a `Vec` of these.
#[derive(Debug, Deserialize, Clone)]
struct AuditLogEntry {
    /// The DID this operation is for
    // Deserialized for completeness but never read in this file, hence the allow.
    #[allow(dead_code)]
    did: String,

    /// The operation itself
    operation: Operation,

    /// CID of this operation; other entries refer to it through their `prev` value
    cid: String,

    /// Timestamp when this operation was created, kept as the raw string from the
    /// response (JSON key `createdAt`) and parsed later by `parse_timestamp`
    #[serde(rename = "createdAt")]
    created_at: String,

    /// Nullified flag (if this operation was invalidated by fork resolution)
    // Never read in this file: winners are recomputed locally by `resolve_fork`
    // instead of trusting this flag, hence the allow. Defaults to `false` when the
    // key is absent from the response.
    #[allow(dead_code)]
    #[serde(default)]
    nullified: bool,
}
77
/// Represents a fork point in the operation chain: two or more operations that all
/// name the same predecessor. Built by `detect_forks`.
#[derive(Debug, Clone)]
struct ForkPoint {
    /// The prev CID that all operations in this fork reference
    prev_cid: String,

    /// Competing operations at this fork, in the order `resolve_fork` left them
    /// (sorted by timestamp, earliest first)
    operations: Vec<ForkOperation>,

    /// The winning operation (after fork resolution); this is the value returned by
    /// `resolve_fork` and matches the entry in `operations` flagged `is_winner`
    winner_cid: String,
}
90
/// An operation that's part of a fork, together with the outcome of fork resolution.
#[derive(Debug, Clone)]
struct ForkOperation {
    /// CID of this competing operation
    cid: String,
    /// The operation payload copied from the audit log entry
    operation: Operation,
    /// Creation time, parsed from the entry's `created_at` string
    timestamp: DateTime<Utc>,
    /// Position of the signing key within the rotation keys of the operation being
    /// forked from (lower index = higher priority); `None` if no key verified
    signing_key_index: Option<usize>,
    /// The rotation key at `signing_key_index`, if one was identified
    signing_key: Option<String>,
    /// Set by `resolve_fork` on the single operation that wins the fork
    is_winner: bool,
    /// Human-readable explanation filled in by `resolve_fork` for losing operations
    rejection_reason: Option<String>,
}
102
103fn main() {
104 let args = Args::parse();
105
106 // Parse and validate the DID
107 let did = match Did::parse(&args.did) {
108 Ok(did) => did,
109 Err(e) => {
110 eprintln!("❌ Error: Invalid DID format: {}", e);
111 eprintln!(" Expected format: did:plc:<24 lowercase base32 characters>");
112 process::exit(1);
113 }
114 };
115
116 println!("🔍 Analyzing forks in: {}", did);
117 println!(" Source: {}", args.plc_url);
118 println!();
119
120 // Fetch the audit log
121 let audit_log = match fetch_audit_log(&args.plc_url, &did) {
122 Ok(log) => log,
123 Err(e) => {
124 eprintln!("❌ Error: Failed to fetch audit log: {}", e);
125 process::exit(1);
126 }
127 };
128
129 if audit_log.is_empty() {
130 eprintln!("❌ Error: No operations found in audit log");
131 process::exit(1);
132 }
133
134 println!("📊 Audit log contains {} operations", audit_log.len());
135
136 // Detect forks
137 let forks = detect_forks(&audit_log, &args);
138
139 if forks.is_empty() {
140 println!("\n✅ No forks detected - this is a linear operation chain");
141 println!(" All operations form a single canonical path from genesis to tip.");
142
143 if args.verbose {
144 println!("\n📋 Linear chain visualization:");
145 visualize_linear_chain(&audit_log, &args);
146 }
147
148 return;
149 }
150
151 println!("⚠️ Detected {} fork point(s)", forks.len());
152 println!();
153
154 // Visualize based on format
155 match args.format {
156 OutputFormat::Tree => visualize_tree(&audit_log, &forks, &args),
157 OutputFormat::Json => visualize_json(&forks),
158 OutputFormat::Markdown => visualize_markdown(&audit_log, &forks, &args),
159 }
160}
161
162/// Detect fork points in the audit log
163fn detect_forks(audit_log: &[AuditLogEntry], args: &Args) -> Vec<ForkPoint> {
164 let mut prev_to_operations: HashMap<String, Vec<AuditLogEntry>> = HashMap::new();
165
166 // Group operations by their prev CID
167 for entry in audit_log {
168 if let Some(prev) = entry.operation.prev() {
169 prev_to_operations
170 .entry(prev.to_string())
171 .or_default()
172 .push(entry.clone());
173 }
174 }
175
176 // Build operation map for state reconstruction
177 let operation_map: HashMap<String, AuditLogEntry> = audit_log
178 .iter()
179 .map(|e| (e.cid.clone(), e.clone()))
180 .collect();
181
182 let mut forks = Vec::new();
183
184 // Find fork points (where multiple operations reference the same prev)
185 for (prev_cid, operations) in prev_to_operations {
186 if operations.len() > 1 {
187 if args.verbose {
188 println!("🔀 Fork detected at {}", truncate(&prev_cid, args));
189 println!(" {} competing operations", operations.len());
190 }
191
192 // Get the state at the prev operation to determine rotation keys
193 let state = if let Some(prev_entry) = operation_map.get(&prev_cid) {
194 get_state_from_operation(&prev_entry.operation)
195 } else {
196 // This shouldn't happen in a valid chain
197 continue;
198 };
199
200 // Analyze each operation in the fork
201 let mut fork_ops = Vec::new();
202 for entry in &operations {
203 let timestamp = parse_timestamp(&entry.created_at);
204
205 // Determine which rotation key signed this operation
206 let (signing_key_index, signing_key) = if !state.rotation_keys.is_empty() {
207 find_signing_key(&entry.operation, &state.rotation_keys)
208 } else {
209 (None, None)
210 };
211
212 fork_ops.push(ForkOperation {
213 cid: entry.cid.clone(),
214 operation: entry.operation.clone(),
215 timestamp,
216 signing_key_index,
217 signing_key,
218 is_winner: false,
219 rejection_reason: None,
220 });
221 }
222
223 // Resolve the fork to determine winner
224 let winner_cid = resolve_fork(&mut fork_ops);
225
226 forks.push(ForkPoint {
227 prev_cid,
228 operations: fork_ops,
229 winner_cid,
230 });
231 }
232 }
233
234 // Sort forks chronologically
235 forks.sort_by_key(|f| {
236 f.operations
237 .iter()
238 .map(|op| op.timestamp)
239 .min()
240 .unwrap_or_else(Utc::now)
241 });
242
243 forks
244}
245
246/// Resolve a fork point and mark the winner
247fn resolve_fork(fork_ops: &mut [ForkOperation]) -> String {
248 // Sort by timestamp (chronological order)
249 fork_ops.sort_by_key(|op| op.timestamp);
250
251 // First-received is the default winner
252 let mut winner_idx = 0;
253 fork_ops[0].is_winner = true;
254
255 // Check if any later operation can invalidate based on priority
256 for i in 1..fork_ops.len() {
257 let competing_key_idx = fork_ops[i].signing_key_index;
258 let winner_key_idx = fork_ops[winner_idx].signing_key_index;
259
260 match (competing_key_idx, winner_key_idx) {
261 (Some(competing_idx), Some(winner_idx_val)) => {
262 if competing_idx < winner_idx_val {
263 // Higher priority (lower index)
264 let time_diff = fork_ops[i].timestamp - fork_ops[winner_idx].timestamp;
265
266 if time_diff <= chrono::Duration::hours(72) {
267 // Within recovery window - this operation wins
268 fork_ops[winner_idx].is_winner = false;
269 fork_ops[winner_idx].rejection_reason = Some(format!(
270 "Invalidated by higher-priority key[{}] within recovery window",
271 competing_idx
272 ));
273
274 fork_ops[i].is_winner = true;
275 winner_idx = i;
276 } else {
277 // Outside recovery window
278 fork_ops[i].rejection_reason = Some(format!(
279 "Higher-priority key[{}] but outside 72-hour recovery window ({:.1}h late)",
280 competing_idx,
281 time_diff.num_hours() as f64
282 ));
283 }
284 } else {
285 // Lower priority
286 fork_ops[i].rejection_reason = Some(format!(
287 "Lower-priority key[{}] (current winner has key[{}])",
288 competing_idx,
289 winner_idx_val
290 ));
291 }
292 }
293 _ => {
294 fork_ops[i].rejection_reason = Some("Could not determine signing key".to_string());
295 }
296 }
297 }
298
299 fork_ops[winner_idx].cid.clone()
300}
301
302/// Find which rotation key signed an operation
303fn find_signing_key(operation: &Operation, rotation_keys: &[String]) -> (Option<usize>, Option<String>) {
304 for (index, key_did) in rotation_keys.iter().enumerate() {
305 if let Ok(verifying_key) = VerifyingKey::from_did_key(key_did) {
306 if operation.verify(&[verifying_key]).is_ok() {
307 return (Some(index), Some(key_did.clone()));
308 }
309 }
310 }
311 (None, None)
312}
313
314/// Get state from an operation
315fn get_state_from_operation(operation: &Operation) -> atproto_plc::PlcState {
316 match operation {
317 Operation::PlcOperation {
318 rotation_keys,
319 verification_methods,
320 also_known_as,
321 services,
322 ..
323 } => atproto_plc::PlcState {
324 rotation_keys: rotation_keys.clone(),
325 verification_methods: verification_methods.clone(),
326 also_known_as: also_known_as.clone(),
327 services: services.clone(),
328 },
329 _ => atproto_plc::PlcState::new(),
330 }
331}
332
333/// Parse ISO 8601 timestamp
334fn parse_timestamp(timestamp: &str) -> DateTime<Utc> {
335 timestamp
336 .parse::<DateTime<Utc>>()
337 .unwrap_or_else(|_| Utc::now())
338}
339
340/// Visualize forks as a tree
341fn visualize_tree(audit_log: &[AuditLogEntry], forks: &[ForkPoint], args: &Args) {
342 println!("📊 Fork Visualization (Tree Format)");
343 println!("═══════════════════════════════════════════════════════════════");
344 println!();
345
346 // Build a map of which operations are part of forks
347 let mut fork_map: HashMap<String, &ForkPoint> = HashMap::new();
348 for fork in forks {
349 for op in &fork.operations {
350 fork_map.insert(op.cid.clone(), fork);
351 }
352 }
353
354 // Track which prev CIDs have been processed
355 let mut processed_forks: std::collections::HashSet<String> = std::collections::HashSet::new();
356
357 for entry in audit_log.iter() {
358 let is_genesis = entry.operation.is_genesis();
359 let prev = entry.operation.prev();
360
361 // Check if this operation is part of a fork
362 if let Some(_prev_cid) = prev {
363 if let Some(fork) = fork_map.get(&entry.cid) {
364 // This is a fork point
365 if !processed_forks.contains(&fork.prev_cid) {
366 processed_forks.insert(fork.prev_cid.clone());
367
368 println!("Fork at operation referencing {}", truncate(&fork.prev_cid, args));
369
370 for (j, fork_op) in fork.operations.iter().enumerate() {
371 let symbol = if fork_op.is_winner { "✓" } else { "✗" };
372 let color = if fork_op.is_winner { "🟢" } else { "🔴" };
373 let prefix = if j == fork.operations.len() - 1 { "└─" } else { "├─" };
374
375 println!(
376 " {} {} {} CID: {}",
377 prefix,
378 color,
379 symbol,
380 truncate(&fork_op.cid, args)
381 );
382
383 if let Some(key_idx) = fork_op.signing_key_index {
384 println!(" │ Signed by: rotation_key[{}]", key_idx);
385 if args.verbose {
386 if let Some(key) = &fork_op.signing_key {
387 println!(" │ Key: {}", truncate(key, args));
388 }
389 }
390 }
391
392 if args.timing {
393 println!(
394 " │ Timestamp: {}",
395 fork_op.timestamp.format("%Y-%m-%d %H:%M:%S UTC")
396 );
397 }
398
399 if !fork_op.is_winner {
400 if let Some(reason) = &fork_op.rejection_reason {
401 println!(" │ Reason: {}", reason);
402 }
403 } else {
404 println!(" │ Status: CANONICAL (winner)");
405 }
406
407 if args.verbose {
408 if let Some(Operation::PlcOperation { services, .. }) = Some(&fork_op.operation) {
409 if !services.is_empty() {
410 println!(" │ Services: {} configured", services.len());
411 }
412 }
413 }
414
415 if j < fork.operations.len() - 1 {
416 println!(" │");
417 }
418 }
419 println!();
420 }
421 continue;
422 }
423 }
424
425 // Regular operation (not part of a fork)
426 if is_genesis {
427 println!("🌱 Genesis");
428 println!(" CID: {}", truncate(&entry.cid, args));
429 if args.timing {
430 println!(" Timestamp: {}", entry.created_at);
431 }
432 if args.verbose {
433 if let Operation::PlcOperation { rotation_keys, .. } = &entry.operation {
434 println!(" Rotation keys: {}", rotation_keys.len());
435 }
436 }
437 println!();
438 }
439 }
440
441 // Summary
442 println!("═══════════════════════════════════════════════════════════════");
443 println!("📈 Summary:");
444 println!(" Total operations: {}", audit_log.len());
445 println!(" Fork points: {}", forks.len());
446
447 let total_competing_ops: usize = forks.iter().map(|f| f.operations.len()).sum();
448 let rejected_ops = total_competing_ops - forks.len();
449 println!(" Rejected operations: {}", rejected_ops);
450
451 if !forks.is_empty() {
452 println!("\n🔐 Fork Resolution Details:");
453 for (i, fork) in forks.iter().enumerate() {
454 let winner = fork.operations.iter().find(|op| op.is_winner).unwrap();
455 println!(
456 " Fork {}: Winner is {} (signed by key[{}])",
457 i + 1,
458 truncate(&winner.cid, args),
459 winner.signing_key_index.unwrap_or(999)
460 );
461 }
462 }
463}
464
465/// Visualize linear chain (no forks)
466fn visualize_linear_chain(audit_log: &[AuditLogEntry], args: &Args) {
467 for (i, entry) in audit_log.iter().enumerate() {
468 let symbol = if i == 0 { "🌱" } else { " ↓" };
469 println!("{} Operation {}: {}", symbol, i, truncate(&entry.cid, args));
470
471 if args.timing {
472 println!(" Timestamp: {}", entry.created_at);
473 }
474
475 if args.verbose {
476 if let Some(prev) = entry.operation.prev() {
477 println!(" Previous: {}", truncate(prev, args));
478 }
479 }
480 }
481}
482
483/// Visualize forks as JSON
484fn visualize_json(forks: &[ForkPoint]) {
485 #[derive(serde::Serialize)]
486 struct JsonFork {
487 prev_cid: String,
488 winner_cid: String,
489 operations: Vec<JsonForkOp>,
490 }
491
492 #[derive(serde::Serialize)]
493 struct JsonForkOp {
494 cid: String,
495 timestamp: String,
496 signing_key_index: Option<usize>,
497 is_winner: bool,
498 rejection_reason: Option<String>,
499 }
500
501 let json_forks: Vec<JsonFork> = forks
502 .iter()
503 .map(|f| JsonFork {
504 prev_cid: f.prev_cid.clone(),
505 winner_cid: f.winner_cid.clone(),
506 operations: f
507 .operations
508 .iter()
509 .map(|op| JsonForkOp {
510 cid: op.cid.clone(),
511 timestamp: op.timestamp.to_rfc3339(),
512 signing_key_index: op.signing_key_index,
513 is_winner: op.is_winner,
514 rejection_reason: op.rejection_reason.clone(),
515 })
516 .collect(),
517 })
518 .collect();
519
520 println!(
521 "{}",
522 serde_json::to_string_pretty(&json_forks).unwrap()
523 );
524}
525
526/// Visualize forks as Markdown
527fn visualize_markdown(audit_log: &[AuditLogEntry], forks: &[ForkPoint], args: &Args) {
528 println!("# PLC Fork Analysis Report\n");
529 println!("**DID**: {}\n", args.did);
530 println!("**Total Operations**: {}", audit_log.len());
531 println!("**Fork Points**: {}\n", forks.len());
532
533 if forks.is_empty() {
534 println!("✅ No forks detected - linear operation chain\n");
535 return;
536 }
537
538 println!("## Fork Details\n");
539
540 for (i, fork) in forks.iter().enumerate() {
541 println!("### Fork {} (at {})\n", i + 1, truncate(&fork.prev_cid, args));
542 println!("| Status | CID | Key Index | Timestamp | Reason |");
543 println!("|--------|-----|-----------|-----------|--------|");
544
545 for op in &fork.operations {
546 let status = if op.is_winner { "✅ Winner" } else { "❌ Rejected" };
547 let key_idx = op
548 .signing_key_index
549 .map(|i| i.to_string())
550 .unwrap_or_else(|| "?".to_string());
551 let timestamp = op.timestamp.format("%Y-%m-%d %H:%M:%S");
552 let reason = op
553 .rejection_reason
554 .as_deref()
555 .unwrap_or("Canonical operation");
556
557 println!(
558 "| {} | {} | {} | {} | {} |",
559 status,
560 truncate(&op.cid, args),
561 key_idx,
562 timestamp,
563 reason
564 );
565 }
566
567 println!();
568 }
569
570 println!("## Summary\n");
571 println!("- Total competing operations: {}", forks.iter().map(|f| f.operations.len()).sum::<usize>());
572 println!("- Rejected operations: {}", forks.iter().map(|f| f.operations.len() - 1).sum::<usize>());
573}
574
575/// Truncate a string for display
576fn truncate(s: &str, args: &Args) -> String {
577 if args.full_ids {
578 s.to_string()
579 } else {
580 if s.len() > 20 {
581 format!("{}...{}", &s[..8], &s[s.len() - 8..])
582 } else {
583 s.to_string()
584 }
585 }
586}
587
588/// Fetch the audit log for a DID from plc.directory
589fn fetch_audit_log(
590 plc_url: &str,
591 did: &Did,
592) -> Result<Vec<AuditLogEntry>, Box<dyn std::error::Error>> {
593 let url = format!("{}/{}/log/audit", plc_url, did);
594
595 let client = Client::builder()
596 .user_agent("atproto-plc-fork-viz/0.2.0")
597 .timeout(std::time::Duration::from_secs(30))
598 .build()?;
599
600 let response = client.get(&url).send()?;
601
602 if !response.status().is_success() {
603 return Err(format!(
604 "HTTP error: {} - {}",
605 response.status(),
606 response.text().unwrap_or_default()
607 )
608 .into());
609 }
610
611 let audit_log: Vec<AuditLogEntry> = response.json()?;
612 Ok(audit_log)
613}