// High-performance implementation of plcbundle written in Rust.
// Captured from the repository web viewer: branch main, 751 lines, 22 kB (raw view).
1//! Query engine and processor supporting simple path and JMESPath queries with parallel bundle processing and batched output 2use crate::constants; 3use crate::index::Index; 4use crate::options::{Options, QueryMode}; 5use anyhow::Result; 6use rayon::prelude::*; 7use sonic_rs::JsonValueTrait; 8use std::fs::File; 9use std::io::{BufRead, BufReader}; 10use std::path::PathBuf; 11use std::sync::{Arc, Mutex}; 12 13#[derive(Default, Clone, Debug)] 14pub struct Stats { 15 pub operations: usize, 16 pub matches: usize, 17 pub total_bytes: u64, 18 pub matched_bytes: u64, 19} 20 21/// Query engine that supports both simple and JMESPath queries 22pub enum QueryEngine { 23 JmesPath(jmespath::Expression<'static>), 24 Simple(Vec<String>), 25} 26 27impl QueryEngine { 28 pub fn new(query: &str, mode: QueryMode) -> Result<Self> { 29 match mode { 30 QueryMode::Simple => { 31 let path: Vec<String> = query.split('.').map(|s| s.to_string()).collect(); 32 Ok(QueryEngine::Simple(path)) 33 } 34 QueryMode::JmesPath => { 35 let expr = jmespath::compile(query) 36 .map_err(|e| anyhow::anyhow!("Failed to compile JMESPath: {}", e))?; 37 let expr = Box::leak(Box::new(expr)); 38 Ok(QueryEngine::JmesPath(expr.clone())) 39 } 40 } 41 } 42 43 pub fn query(&self, json_line: &str) -> Result<Option<String>> { 44 match self { 45 QueryEngine::Simple(path) => self.query_simple(json_line, path), 46 QueryEngine::JmesPath(expr) => self.query_jmespath(json_line, expr), 47 } 48 } 49 50 fn query_simple(&self, json_line: &str, path: &[String]) -> Result<Option<String>> { 51 let data: sonic_rs::Value = sonic_rs::from_str(json_line)?; 52 53 let mut current = &data; 54 for key in path { 55 match current.get(key) { 56 Some(v) => current = v, 57 None => return Ok(None), 58 } 59 } 60 61 if current.is_null() { 62 return Ok(None); 63 } 64 65 let output = if current.is_str() { 66 current.as_str().unwrap().to_string() 67 } else { 68 sonic_rs::to_string(current)? 
69 }; 70 71 Ok(Some(output)) 72 } 73 74 fn query_jmespath( 75 &self, 76 json_line: &str, 77 expr: &jmespath::Expression<'_>, 78 ) -> Result<Option<String>> { 79 let data = jmespath::Variable::from_json(json_line) 80 .map_err(|e| anyhow::anyhow!("Failed to parse JSON: {}", e))?; 81 82 let result = expr 83 .search(&data) 84 .map_err(|e| anyhow::anyhow!("JMESPath search failed: {}", e))?; 85 86 if result.is_null() { 87 return Ok(None); 88 } 89 90 let output = if result.is_string() { 91 result.as_string().unwrap().to_string() 92 } else { 93 // Convert jmespath result to JSON string 94 // Note: jmespath uses serde_json internally, so we use serde_json here (not bundle/operation data) 95 serde_json::to_string(&*result) 96 .map_err(|e| anyhow::anyhow!("Failed to serialize result: {}", e))? 97 }; 98 99 Ok(Some(output)) 100 } 101} 102 103/// Main processor for PLC bundles 104pub struct Processor { 105 options: Options, 106 query_engine: QueryEngine, 107} 108 109pub trait OutputHandler: Send + Sync { 110 fn write_batch(&self, batch: &str) -> Result<()>; 111} 112 113pub struct OutputBuffer { 114 buffer: String, 115 capacity: usize, 116 count: usize, 117 matched_bytes: u64, 118} 119 120impl OutputBuffer { 121 pub fn new(capacity: usize) -> Self { 122 Self { 123 buffer: String::with_capacity(capacity * 100), 124 capacity, 125 count: 0, 126 matched_bytes: 0, 127 } 128 } 129 130 pub fn push(&mut self, line: &str) -> bool { 131 self.matched_bytes += line.len() as u64 + 1; 132 self.buffer.push_str(line); 133 self.buffer.push('\n'); 134 self.count += 1; 135 self.count >= self.capacity 136 } 137 138 pub fn flush(&mut self) -> String { 139 self.count = 0; 140 std::mem::replace(&mut self.buffer, String::with_capacity(self.capacity * 100)) 141 } 142 143 pub fn is_empty(&self) -> bool { 144 self.count == 0 145 } 146 147 pub fn get_matched_bytes(&self) -> u64 { 148 self.matched_bytes 149 } 150} 151 152impl Processor { 153 /// Create a new processor with the given options 154 pub fn 
new(options: Options) -> Result<Self> { 155 let query_engine = QueryEngine::new(&options.query, options.query_mode)?; 156 157 Ok(Self { 158 options, 159 query_engine, 160 }) 161 } 162 163 /// Load the bundle index 164 pub fn load_index(&self) -> Result<Index> { 165 Index::load(&self.options.directory) 166 } 167 168 /// Get the directory path 169 pub fn directory(&self) -> &PathBuf { 170 &self.options.directory 171 } 172 173 /// Process specified bundles 174 pub fn process<F>( 175 &self, 176 bundle_numbers: &[u32], 177 output_handler: Arc<dyn OutputHandler>, 178 progress_callback: Option<F>, 179 ) -> Result<Stats> 180 where 181 F: Fn(usize, &Stats) + Send + Sync, 182 { 183 if self.options.num_threads > 1 { 184 rayon::ThreadPoolBuilder::new() 185 .num_threads(self.options.num_threads) 186 .build_global()?; 187 } 188 189 let total_stats = Arc::new(Mutex::new(Stats::default())); 190 let callback = Arc::new(progress_callback); 191 192 let process_fn = |bundle_num: &u32| -> Result<()> { 193 let bundle_path = constants::bundle_path(&self.options.directory, *bundle_num); 194 195 if !bundle_path.exists() { 196 return Ok(()); 197 } 198 199 let stats = self.process_bundle(&bundle_path, &output_handler)?; 200 201 let mut total = total_stats.lock().unwrap(); 202 total.operations += stats.operations; 203 total.matches += stats.matches; 204 total.total_bytes += stats.total_bytes; 205 total.matched_bytes += stats.matched_bytes; 206 207 if let Some(ref cb) = *callback { 208 cb(1, &total); 209 } 210 211 Ok(()) 212 }; 213 214 if self.options.num_threads == 1 { 215 bundle_numbers.iter().try_for_each(process_fn)?; 216 } else { 217 bundle_numbers.par_iter().try_for_each(process_fn)?; 218 } 219 220 let final_stats = total_stats.lock().unwrap().clone(); 221 Ok(final_stats) 222 } 223 224 fn process_bundle( 225 &self, 226 bundle_path: &PathBuf, 227 output_handler: &Arc<dyn OutputHandler>, 228 ) -> Result<Stats> { 229 let file = File::open(bundle_path)?; 230 let decoder = 
zstd::Decoder::new(file)?; 231 let reader = BufReader::with_capacity(1024 * 1024, decoder); 232 233 let mut stats = Stats::default(); 234 let mut output_buffer = OutputBuffer::new(self.options.batch_size); 235 236 for line in reader.lines() { 237 let line = line?; 238 if line.is_empty() { 239 continue; 240 } 241 242 stats.operations += 1; 243 stats.total_bytes += line.len() as u64 + 1; 244 245 if let Some(result) = self.query_engine.query(&line)? { 246 stats.matches += 1; 247 if output_buffer.push(&result) { 248 output_handler.write_batch(&output_buffer.flush())?; 249 } 250 } 251 } 252 253 if !output_buffer.is_empty() { 254 output_handler.write_batch(&output_buffer.flush())?; 255 } 256 257 stats.matched_bytes = output_buffer.get_matched_bytes(); 258 Ok(stats) 259 } 260} 261 262/// Resolve bundle keyword to concrete bundle number 263fn resolve_bundle_keyword(keyword: &str, max_bundle: u32) -> Result<u32> { 264 let keyword = keyword.trim(); 265 266 if keyword == "root" { 267 return Ok(1); 268 } 269 270 if keyword == "head" { 271 return Ok(max_bundle); 272 } 273 274 // Handle "head~N" syntax 275 if let Some(rest) = keyword.strip_prefix("head~") { 276 let offset: u32 = rest 277 .parse() 278 .map_err(|_| anyhow::anyhow!("Invalid offset in 'head~{}': expected a number", rest))?; 279 if offset >= max_bundle { 280 anyhow::bail!( 281 "Offset {} in 'head~{}' exceeds maximum bundle {}", 282 offset, 283 rest, 284 max_bundle 285 ); 286 } 287 return Ok(max_bundle - offset); 288 } 289 290 // Handle "~N" shorthand syntax 291 if let Some(rest) = keyword.strip_prefix('~') { 292 let offset: u32 = rest 293 .parse() 294 .map_err(|_| anyhow::anyhow!("Invalid offset in '~{}': expected a number", rest))?; 295 if offset >= max_bundle { 296 anyhow::bail!( 297 "Offset {} in '~{}' exceeds maximum bundle {}", 298 offset, 299 rest, 300 max_bundle 301 ); 302 } 303 return Ok(max_bundle - offset); 304 } 305 306 // Not a keyword, try parsing as number 307 let num: u32 = keyword.parse().map_err(|_| 
{ 308 anyhow::anyhow!( 309 "Invalid bundle specifier: '{}' (expected number, 'root', 'head', 'head~N', or '~N')", 310 keyword 311 ) 312 })?; 313 Ok(num) 314} 315 316/// Parse bundle range specification (e.g., "1-10,15,20-25", "head", "head~5", "root") 317pub fn parse_bundle_range(spec: &str, max_bundle: u32) -> Result<Vec<u32>> { 318 if max_bundle == 0 { 319 anyhow::bail!("No bundles available"); 320 } 321 322 let mut bundles = Vec::new(); 323 for part in spec.split(',') { 324 let part = part.trim(); 325 if part.is_empty() { 326 continue; 327 } 328 329 if part.contains('-') { 330 let range: Vec<&str> = part.split('-').collect(); 331 if range.len() != 2 { 332 anyhow::bail!("Invalid range format: {}", part); 333 } 334 let start_str = range[0].trim(); 335 let end_str = range[1].trim(); 336 337 let start = resolve_bundle_keyword(start_str, max_bundle)?; 338 let end = resolve_bundle_keyword(end_str, max_bundle)?; 339 340 if start == 0 || end == 0 { 341 anyhow::bail!("Bundle number cannot be 0"); 342 } 343 if start > end { 344 anyhow::bail!("Invalid range: {} > {} (start must be <= end)", start, end); 345 } 346 if start > max_bundle || end > max_bundle { 347 anyhow::bail!( 348 "Invalid range: {}-{} (exceeds maximum bundle {})", 349 start, 350 end, 351 max_bundle 352 ); 353 } 354 bundles.extend(start..=end); 355 } else { 356 let num = resolve_bundle_keyword(part, max_bundle)?; 357 if num == 0 { 358 anyhow::bail!("Bundle number cannot be 0"); 359 } 360 if num > max_bundle { 361 anyhow::bail!("Bundle number {} out of range (max: {})", num, max_bundle); 362 } 363 bundles.push(num); 364 } 365 } 366 bundles.sort_unstable(); 367 bundles.dedup(); 368 Ok(bundles) 369} 370 371/// Parse operation range specification (e.g., "0-999", "3255,553,0-9") 372/// Operations are 0-indexed global positions (0 = first operation, bundle 1 position 0) 373pub fn parse_operation_range(spec: &str, max_operation: u64) -> Result<Vec<u64>> { 374 use anyhow::Context; 375 376 if max_operation == 0 { 377 
anyhow::bail!("No operations available"); 378 } 379 380 let mut operations = Vec::new(); 381 for part in spec.split(',') { 382 let part = part.trim(); 383 if part.is_empty() { 384 continue; 385 } 386 387 if part.contains('-') { 388 let range: Vec<&str> = part.split('-').collect(); 389 if range.len() != 2 { 390 anyhow::bail!("Invalid range format: {}", part); 391 } 392 let start_str = range[0].trim(); 393 let end_str = range[1].trim(); 394 395 let start: u64 = start_str 396 .parse() 397 .with_context(|| format!("Invalid operation number: {}", start_str))?; 398 let end: u64 = end_str 399 .parse() 400 .with_context(|| format!("Invalid operation number: {}", end_str))?; 401 402 if start > end { 403 anyhow::bail!("Invalid range: {} > {} (start must be <= end)", start, end); 404 } 405 if start > max_operation || end > max_operation { 406 anyhow::bail!( 407 "Invalid range: {}-{} (exceeds maximum operation {})", 408 start, 409 end, 410 max_operation 411 ); 412 } 413 operations.extend(start..=end); 414 } else { 415 let num: u64 = part 416 .parse() 417 .with_context(|| format!("Invalid operation number: {}", part))?; 418 if num > max_operation { 419 anyhow::bail!( 420 "Operation number {} out of range (max: {})", 421 num, 422 max_operation 423 ); 424 } 425 operations.push(num); 426 } 427 } 428 operations.sort_unstable(); 429 operations.dedup(); 430 Ok(operations) 431} 432 433#[cfg(test)] 434mod tests { 435 use super::*; 436 437 #[test] 438 fn test_query_engine_simple() { 439 let engine = QueryEngine::new("did", QueryMode::Simple).unwrap(); 440 let json = r#"{"did": "did:plc:test", "operation": {"type": "create"}}"#; 441 let result = engine.query(json).unwrap(); 442 assert_eq!(result, Some("did:plc:test".to_string())); 443 } 444 445 #[test] 446 fn test_query_engine_simple_nested() { 447 let engine = QueryEngine::new("operation.type", QueryMode::Simple).unwrap(); 448 let json = r#"{"did": "did:plc:test", "operation": {"type": "create"}}"#; 449 let result = 
engine.query(json).unwrap(); 450 assert_eq!(result, Some("create".to_string())); 451 } 452 453 #[test] 454 fn test_query_engine_simple_missing() { 455 let engine = QueryEngine::new("missing.field", QueryMode::Simple).unwrap(); 456 let json = r#"{"did": "did:plc:test"}"#; 457 let result = engine.query(json).unwrap(); 458 assert_eq!(result, None); 459 } 460 461 #[test] 462 fn test_query_engine_simple_null() { 463 let engine = QueryEngine::new("null_field", QueryMode::Simple).unwrap(); 464 let json = r#"{"null_field": null}"#; 465 let result = engine.query(json).unwrap(); 466 assert_eq!(result, None); 467 } 468 469 #[test] 470 fn test_query_engine_jmespath() { 471 let engine = QueryEngine::new("did", QueryMode::JmesPath).unwrap(); 472 let json = r#"{"did": "did:plc:test", "operation": {"type": "create"}}"#; 473 let result = engine.query(json).unwrap(); 474 assert_eq!(result, Some("did:plc:test".to_string())); 475 } 476 477 #[test] 478 fn test_query_engine_jmespath_nested() { 479 let engine = QueryEngine::new("operation.type", QueryMode::JmesPath).unwrap(); 480 let json = r#"{"did": "did:plc:test", "operation": {"type": "create"}}"#; 481 let result = engine.query(json).unwrap(); 482 assert_eq!(result, Some("create".to_string())); 483 } 484 485 #[test] 486 fn test_output_buffer_new() { 487 let buffer = OutputBuffer::new(100); 488 assert!(buffer.is_empty()); 489 assert_eq!(buffer.get_matched_bytes(), 0); 490 } 491 492 #[test] 493 fn test_output_buffer_push() { 494 let mut buffer = OutputBuffer::new(2); 495 assert!(!buffer.push("line1")); // count = 1, capacity = 2, so false 496 // When we push line2, count becomes 2, which equals capacity, so returns true 497 assert!(buffer.push("line2")); // Should return true when capacity reached 498 // After flush, count resets to 0 499 buffer.flush(); 500 assert!(!buffer.push("line3")); // count = 1 again 501 } 502 503 #[test] 504 fn test_output_buffer_flush() { 505 let mut buffer = OutputBuffer::new(10); 506 buffer.push("line1"); 
507 buffer.push("line2"); 508 509 let flushed = buffer.flush(); 510 assert_eq!(flushed, "line1\nline2\n"); 511 assert!(buffer.is_empty()); 512 } 513 514 #[test] 515 fn test_output_buffer_matched_bytes() { 516 let mut buffer = OutputBuffer::new(10); 517 buffer.push("line1"); 518 buffer.push("line2"); 519 520 // Each line adds len + 1 (for newline) 521 assert_eq!(buffer.get_matched_bytes(), 5 + 1 + 5 + 1); 522 } 523 524 #[test] 525 fn test_parse_bundle_range_single() { 526 let result = parse_bundle_range("1", 10).unwrap(); 527 assert_eq!(result, vec![1]); 528 } 529 530 #[test] 531 fn test_parse_bundle_range_multiple() { 532 let result = parse_bundle_range("1,3,5", 10).unwrap(); 533 assert_eq!(result, vec![1, 3, 5]); 534 } 535 536 #[test] 537 fn test_parse_bundle_range_range() { 538 let result = parse_bundle_range("1-3", 10).unwrap(); 539 assert_eq!(result, vec![1, 2, 3]); 540 } 541 542 #[test] 543 fn test_parse_bundle_range_mixed() { 544 let result = parse_bundle_range("1,3-5,7", 10).unwrap(); 545 assert_eq!(result, vec![1, 3, 4, 5, 7]); 546 } 547 548 #[test] 549 fn test_parse_bundle_range_dedup() { 550 let result = parse_bundle_range("1,1,2,2", 10).unwrap(); 551 assert_eq!(result, vec![1, 2]); 552 } 553 554 #[test] 555 fn test_parse_bundle_range_empty_max() { 556 let result = parse_bundle_range("1", 0); 557 assert!(result.is_err()); 558 assert!( 559 result 560 .unwrap_err() 561 .to_string() 562 .contains("No bundles available") 563 ); 564 } 565 566 #[test] 567 fn test_parse_bundle_range_out_of_range() { 568 let result = parse_bundle_range("11", 10); 569 assert!(result.is_err()); 570 assert!(result.unwrap_err().to_string().contains("out of range")); 571 } 572 573 #[test] 574 fn test_parse_bundle_range_invalid_range() { 575 let result = parse_bundle_range("5-3", 10); 576 assert!(result.is_err()); 577 assert!( 578 result 579 .unwrap_err() 580 .to_string() 581 .contains("start must be <= end") 582 ); 583 } 584 585 #[test] 586 fn test_parse_bundle_range_invalid_format() 
{ 587 let result = parse_bundle_range("1-2-3", 10); 588 assert!(result.is_err()); 589 assert!( 590 result 591 .unwrap_err() 592 .to_string() 593 .contains("Invalid range format") 594 ); 595 } 596 597 #[test] 598 fn test_parse_bundle_range_keyword_root() { 599 let result = parse_bundle_range("root", 10).unwrap(); 600 assert_eq!(result, vec![1]); 601 } 602 603 #[test] 604 fn test_parse_bundle_range_keyword_head() { 605 let result = parse_bundle_range("head", 10).unwrap(); 606 assert_eq!(result, vec![10]); 607 } 608 609 #[test] 610 fn test_parse_bundle_range_keyword_head_offset() { 611 let result = parse_bundle_range("head~3", 10).unwrap(); 612 assert_eq!(result, vec![7]); // 10 - 3 = 7 613 } 614 615 #[test] 616 fn test_parse_bundle_range_keyword_shorthand_offset() { 617 let result = parse_bundle_range("~2", 10).unwrap(); 618 assert_eq!(result, vec![8]); // 10 - 2 = 8 619 } 620 621 #[test] 622 fn test_parse_bundle_range_keyword_mixed() { 623 let result = parse_bundle_range("root,head,head~1", 10).unwrap(); 624 assert_eq!(result, vec![1, 9, 10]); 625 } 626 627 #[test] 628 fn test_parse_bundle_range_keyword_head_offset_invalid() { 629 let result = parse_bundle_range("head~10", 10); 630 assert!(result.is_err()); 631 assert!( 632 result 633 .unwrap_err() 634 .to_string() 635 .contains("exceeds maximum bundle") 636 ); 637 } 638 639 #[test] 640 fn test_parse_bundle_range_keyword_shorthand_offset_invalid() { 641 let result = parse_bundle_range("~10", 10); 642 assert!(result.is_err()); 643 assert!( 644 result 645 .unwrap_err() 646 .to_string() 647 .contains("exceeds maximum bundle") 648 ); 649 } 650 651 #[test] 652 fn test_parse_bundle_range_keyword_invalid_offset() { 653 let result = parse_bundle_range("head~abc", 10); 654 assert!(result.is_err()); 655 assert!(result.unwrap_err().to_string().contains("Invalid offset")); 656 } 657 658 #[test] 659 fn test_parse_operation_range_single() { 660 let result = parse_operation_range("0", 1000).unwrap(); 661 assert_eq!(result, 
vec![0]); 662 } 663 664 #[test] 665 fn test_parse_operation_range_multiple() { 666 let result = parse_operation_range("0,5,10", 1000).unwrap(); 667 assert_eq!(result, vec![0, 5, 10]); 668 } 669 670 #[test] 671 fn test_parse_operation_range_range() { 672 let result = parse_operation_range("0-2", 1000).unwrap(); 673 assert_eq!(result, vec![0, 1, 2]); 674 } 675 676 #[test] 677 fn test_parse_operation_range_mixed() { 678 let result = parse_operation_range("0,5-7,10", 1000).unwrap(); 679 assert_eq!(result, vec![0, 5, 6, 7, 10]); 680 } 681 682 #[test] 683 fn test_parse_operation_range_dedup() { 684 let result = parse_operation_range("0,0,1,1", 1000).unwrap(); 685 assert_eq!(result, vec![0, 1]); 686 } 687 688 #[test] 689 fn test_parse_operation_range_empty_max() { 690 let result = parse_operation_range("0", 0); 691 assert!(result.is_err()); 692 assert!( 693 result 694 .unwrap_err() 695 .to_string() 696 .contains("No operations available") 697 ); 698 } 699 700 #[test] 701 fn test_parse_operation_range_out_of_range() { 702 let result = parse_operation_range("1001", 1000); 703 assert!(result.is_err()); 704 assert!(result.unwrap_err().to_string().contains("out of range")); 705 } 706 707 #[test] 708 fn test_parse_operation_range_invalid_range() { 709 let result = parse_operation_range("10-5", 1000); 710 assert!(result.is_err()); 711 assert!( 712 result 713 .unwrap_err() 714 .to_string() 715 .contains("start must be <= end") 716 ); 717 } 718 719 #[test] 720 fn test_parse_operation_range_invalid_format() { 721 let result = parse_operation_range("1-2-3", 1000); 722 assert!(result.is_err()); 723 assert!( 724 result 725 .unwrap_err() 726 .to_string() 727 .contains("Invalid range format") 728 ); 729 } 730 731 #[test] 732 fn test_parse_operation_range_invalid_number() { 733 let result = parse_operation_range("abc", 1000); 734 assert!(result.is_err()); 735 assert!( 736 result 737 .unwrap_err() 738 .to_string() 739 .contains("Invalid operation number") 740 ); 741 } 742 743 #[test] 744 
fn test_stats_default() { 745 let stats = Stats::default(); 746 assert_eq!(stats.operations, 0); 747 assert_eq!(stats.matches, 0); 748 assert_eq!(stats.total_bytes, 0); 749 assert_eq!(stats.matched_bytes, 0); 750 } 751}