High-performance implementation of plcbundle written in Rust
1//! Query engine and processor supporting simple path and JMESPath queries with parallel bundle processing and batched output
2use crate::constants;
3use crate::index::Index;
4use crate::options::{Options, QueryMode};
5use anyhow::Result;
6use rayon::prelude::*;
7use sonic_rs::JsonValueTrait;
8use std::fs::File;
9use std::io::{BufRead, BufReader};
10use std::path::PathBuf;
11use std::sync::{Arc, Mutex};
12
/// Running counters collected while bundles are processed.
///
/// Byte counts include one extra byte per line for the newline terminator.
#[derive(Debug, Clone, Default)]
pub struct Stats {
    /// Number of non-empty lines (operations) that were read.
    pub operations: usize,
    /// Number of operations for which the query produced a result.
    pub matches: usize,
    /// Bytes read across all operations.
    pub total_bytes: u64,
    /// Bytes emitted for matching results.
    pub matched_bytes: u64,
}
20
21/// Query engine that supports both simple and JMESPath queries
22pub enum QueryEngine {
23 JmesPath(jmespath::Expression<'static>),
24 Simple(Vec<String>),
25}
26
27impl QueryEngine {
28 pub fn new(query: &str, mode: QueryMode) -> Result<Self> {
29 match mode {
30 QueryMode::Simple => {
31 let path: Vec<String> = query.split('.').map(|s| s.to_string()).collect();
32 Ok(QueryEngine::Simple(path))
33 }
34 QueryMode::JmesPath => {
35 let expr = jmespath::compile(query)
36 .map_err(|e| anyhow::anyhow!("Failed to compile JMESPath: {}", e))?;
37 let expr = Box::leak(Box::new(expr));
38 Ok(QueryEngine::JmesPath(expr.clone()))
39 }
40 }
41 }
42
43 pub fn query(&self, json_line: &str) -> Result<Option<String>> {
44 match self {
45 QueryEngine::Simple(path) => self.query_simple(json_line, path),
46 QueryEngine::JmesPath(expr) => self.query_jmespath(json_line, expr),
47 }
48 }
49
50 fn query_simple(&self, json_line: &str, path: &[String]) -> Result<Option<String>> {
51 let data: sonic_rs::Value = sonic_rs::from_str(json_line)?;
52
53 let mut current = &data;
54 for key in path {
55 match current.get(key) {
56 Some(v) => current = v,
57 None => return Ok(None),
58 }
59 }
60
61 if current.is_null() {
62 return Ok(None);
63 }
64
65 let output = if current.is_str() {
66 current.as_str().unwrap().to_string()
67 } else {
68 sonic_rs::to_string(current)?
69 };
70
71 Ok(Some(output))
72 }
73
74 fn query_jmespath(
75 &self,
76 json_line: &str,
77 expr: &jmespath::Expression<'_>,
78 ) -> Result<Option<String>> {
79 let data = jmespath::Variable::from_json(json_line)
80 .map_err(|e| anyhow::anyhow!("Failed to parse JSON: {}", e))?;
81
82 let result = expr
83 .search(&data)
84 .map_err(|e| anyhow::anyhow!("JMESPath search failed: {}", e))?;
85
86 if result.is_null() {
87 return Ok(None);
88 }
89
90 let output = if result.is_string() {
91 result.as_string().unwrap().to_string()
92 } else {
93 // Convert jmespath result to JSON string
94 // Note: jmespath uses serde_json internally, so we use serde_json here (not bundle/operation data)
95 serde_json::to_string(&*result)
96 .map_err(|e| anyhow::anyhow!("Failed to serialize result: {}", e))?
97 };
98
99 Ok(Some(output))
100 }
101}
102
103/// Main processor for PLC bundles
104pub struct Processor {
105 options: Options,
106 query_engine: QueryEngine,
107}
108
109pub trait OutputHandler: Send + Sync {
110 fn write_batch(&self, batch: &str) -> Result<()>;
111}
112
/// Accumulates result lines and signals when a full batch is ready.
pub struct OutputBuffer {
    /// Newline-terminated lines collected since the last flush.
    buffer: String,
    /// Number of lines that make up a full batch.
    capacity: usize,
    /// Lines pushed since the last flush.
    count: usize,
    /// Total bytes pushed over the buffer's lifetime (not reset by `flush`).
    matched_bytes: u64,
}

impl OutputBuffer {
    /// Bytes reserved per line when pre-sizing the backing string.
    const RESERVED_BYTES_PER_LINE: usize = 100;

    /// Create an empty buffer that reports "full" after `capacity` lines.
    pub fn new(capacity: usize) -> Self {
        OutputBuffer {
            buffer: String::with_capacity(capacity * Self::RESERVED_BYTES_PER_LINE),
            capacity,
            count: 0,
            matched_bytes: 0,
        }
    }

    /// Append `line` plus a trailing newline.
    ///
    /// Returns `true` once the number of buffered lines has reached the capacity.
    pub fn push(&mut self, line: &str) -> bool {
        self.buffer.push_str(line);
        self.buffer.push('\n');
        self.count += 1;
        // One extra byte accounts for the newline terminator.
        self.matched_bytes += line.len() as u64 + 1;
        self.count >= self.capacity
    }

    /// Take the buffered text, leaving a fresh pre-sized buffer behind.
    pub fn flush(&mut self) -> String {
        let fresh = String::with_capacity(self.capacity * Self::RESERVED_BYTES_PER_LINE);
        self.count = 0;
        std::mem::replace(&mut self.buffer, fresh)
    }

    /// Whether no lines have been pushed since the last flush.
    pub fn is_empty(&self) -> bool {
        self.count == 0
    }

    /// Total bytes pushed so far, including newline terminators.
    pub fn get_matched_bytes(&self) -> u64 {
        self.matched_bytes
    }
}
151
152impl Processor {
153 /// Create a new processor with the given options
154 pub fn new(options: Options) -> Result<Self> {
155 let query_engine = QueryEngine::new(&options.query, options.query_mode)?;
156
157 Ok(Self {
158 options,
159 query_engine,
160 })
161 }
162
163 /// Load the bundle index
164 pub fn load_index(&self) -> Result<Index> {
165 Index::load(&self.options.directory)
166 }
167
168 /// Get the directory path
169 pub fn directory(&self) -> &PathBuf {
170 &self.options.directory
171 }
172
173 /// Process specified bundles
174 pub fn process<F>(
175 &self,
176 bundle_numbers: &[u32],
177 output_handler: Arc<dyn OutputHandler>,
178 progress_callback: Option<F>,
179 ) -> Result<Stats>
180 where
181 F: Fn(usize, &Stats) + Send + Sync,
182 {
183 if self.options.num_threads > 1 {
184 rayon::ThreadPoolBuilder::new()
185 .num_threads(self.options.num_threads)
186 .build_global()?;
187 }
188
189 let total_stats = Arc::new(Mutex::new(Stats::default()));
190 let callback = Arc::new(progress_callback);
191
192 let process_fn = |bundle_num: &u32| -> Result<()> {
193 let bundle_path = constants::bundle_path(&self.options.directory, *bundle_num);
194
195 if !bundle_path.exists() {
196 return Ok(());
197 }
198
199 let stats = self.process_bundle(&bundle_path, &output_handler)?;
200
201 let mut total = total_stats.lock().unwrap();
202 total.operations += stats.operations;
203 total.matches += stats.matches;
204 total.total_bytes += stats.total_bytes;
205 total.matched_bytes += stats.matched_bytes;
206
207 if let Some(ref cb) = *callback {
208 cb(1, &total);
209 }
210
211 Ok(())
212 };
213
214 if self.options.num_threads == 1 {
215 bundle_numbers.iter().try_for_each(process_fn)?;
216 } else {
217 bundle_numbers.par_iter().try_for_each(process_fn)?;
218 }
219
220 let final_stats = total_stats.lock().unwrap().clone();
221 Ok(final_stats)
222 }
223
224 fn process_bundle(
225 &self,
226 bundle_path: &PathBuf,
227 output_handler: &Arc<dyn OutputHandler>,
228 ) -> Result<Stats> {
229 let file = File::open(bundle_path)?;
230 let decoder = zstd::Decoder::new(file)?;
231 let reader = BufReader::with_capacity(1024 * 1024, decoder);
232
233 let mut stats = Stats::default();
234 let mut output_buffer = OutputBuffer::new(self.options.batch_size);
235
236 for line in reader.lines() {
237 let line = line?;
238 if line.is_empty() {
239 continue;
240 }
241
242 stats.operations += 1;
243 stats.total_bytes += line.len() as u64 + 1;
244
245 if let Some(result) = self.query_engine.query(&line)? {
246 stats.matches += 1;
247 if output_buffer.push(&result) {
248 output_handler.write_batch(&output_buffer.flush())?;
249 }
250 }
251 }
252
253 if !output_buffer.is_empty() {
254 output_handler.write_batch(&output_buffer.flush())?;
255 }
256
257 stats.matched_bytes = output_buffer.get_matched_bytes();
258 Ok(stats)
259 }
260}
261
262/// Resolve bundle keyword to concrete bundle number
263fn resolve_bundle_keyword(keyword: &str, max_bundle: u32) -> Result<u32> {
264 let keyword = keyword.trim();
265
266 if keyword == "root" {
267 return Ok(1);
268 }
269
270 if keyword == "head" {
271 return Ok(max_bundle);
272 }
273
274 // Handle "head~N" syntax
275 if let Some(rest) = keyword.strip_prefix("head~") {
276 let offset: u32 = rest
277 .parse()
278 .map_err(|_| anyhow::anyhow!("Invalid offset in 'head~{}': expected a number", rest))?;
279 if offset >= max_bundle {
280 anyhow::bail!(
281 "Offset {} in 'head~{}' exceeds maximum bundle {}",
282 offset,
283 rest,
284 max_bundle
285 );
286 }
287 return Ok(max_bundle - offset);
288 }
289
290 // Handle "~N" shorthand syntax
291 if let Some(rest) = keyword.strip_prefix('~') {
292 let offset: u32 = rest
293 .parse()
294 .map_err(|_| anyhow::anyhow!("Invalid offset in '~{}': expected a number", rest))?;
295 if offset >= max_bundle {
296 anyhow::bail!(
297 "Offset {} in '~{}' exceeds maximum bundle {}",
298 offset,
299 rest,
300 max_bundle
301 );
302 }
303 return Ok(max_bundle - offset);
304 }
305
306 // Not a keyword, try parsing as number
307 let num: u32 = keyword.parse().map_err(|_| {
308 anyhow::anyhow!(
309 "Invalid bundle specifier: '{}' (expected number, 'root', 'head', 'head~N', or '~N')",
310 keyword
311 )
312 })?;
313 Ok(num)
314}
315
316/// Parse bundle range specification (e.g., "1-10,15,20-25", "head", "head~5", "root")
317pub fn parse_bundle_range(spec: &str, max_bundle: u32) -> Result<Vec<u32>> {
318 if max_bundle == 0 {
319 anyhow::bail!("No bundles available");
320 }
321
322 let mut bundles = Vec::new();
323 for part in spec.split(',') {
324 let part = part.trim();
325 if part.is_empty() {
326 continue;
327 }
328
329 if part.contains('-') {
330 let range: Vec<&str> = part.split('-').collect();
331 if range.len() != 2 {
332 anyhow::bail!("Invalid range format: {}", part);
333 }
334 let start_str = range[0].trim();
335 let end_str = range[1].trim();
336
337 let start = resolve_bundle_keyword(start_str, max_bundle)?;
338 let end = resolve_bundle_keyword(end_str, max_bundle)?;
339
340 if start == 0 || end == 0 {
341 anyhow::bail!("Bundle number cannot be 0");
342 }
343 if start > end {
344 anyhow::bail!("Invalid range: {} > {} (start must be <= end)", start, end);
345 }
346 if start > max_bundle || end > max_bundle {
347 anyhow::bail!(
348 "Invalid range: {}-{} (exceeds maximum bundle {})",
349 start,
350 end,
351 max_bundle
352 );
353 }
354 bundles.extend(start..=end);
355 } else {
356 let num = resolve_bundle_keyword(part, max_bundle)?;
357 if num == 0 {
358 anyhow::bail!("Bundle number cannot be 0");
359 }
360 if num > max_bundle {
361 anyhow::bail!("Bundle number {} out of range (max: {})", num, max_bundle);
362 }
363 bundles.push(num);
364 }
365 }
366 bundles.sort_unstable();
367 bundles.dedup();
368 Ok(bundles)
369}
370
371/// Parse operation range specification (e.g., "0-999", "3255,553,0-9")
372/// Operations are 0-indexed global positions (0 = first operation, bundle 1 position 0)
373pub fn parse_operation_range(spec: &str, max_operation: u64) -> Result<Vec<u64>> {
374 use anyhow::Context;
375
376 if max_operation == 0 {
377 anyhow::bail!("No operations available");
378 }
379
380 let mut operations = Vec::new();
381 for part in spec.split(',') {
382 let part = part.trim();
383 if part.is_empty() {
384 continue;
385 }
386
387 if part.contains('-') {
388 let range: Vec<&str> = part.split('-').collect();
389 if range.len() != 2 {
390 anyhow::bail!("Invalid range format: {}", part);
391 }
392 let start_str = range[0].trim();
393 let end_str = range[1].trim();
394
395 let start: u64 = start_str
396 .parse()
397 .with_context(|| format!("Invalid operation number: {}", start_str))?;
398 let end: u64 = end_str
399 .parse()
400 .with_context(|| format!("Invalid operation number: {}", end_str))?;
401
402 if start > end {
403 anyhow::bail!("Invalid range: {} > {} (start must be <= end)", start, end);
404 }
405 if start > max_operation || end > max_operation {
406 anyhow::bail!(
407 "Invalid range: {}-{} (exceeds maximum operation {})",
408 start,
409 end,
410 max_operation
411 );
412 }
413 operations.extend(start..=end);
414 } else {
415 let num: u64 = part
416 .parse()
417 .with_context(|| format!("Invalid operation number: {}", part))?;
418 if num > max_operation {
419 anyhow::bail!(
420 "Operation number {} out of range (max: {})",
421 num,
422 max_operation
423 );
424 }
425 operations.push(num);
426 }
427 }
428 operations.sort_unstable();
429 operations.dedup();
430 Ok(operations)
431}
432
#[cfg(test)]
mod tests {
    use super::*;

    /// Sample operation record shared by the query tests.
    const SAMPLE_OP: &str = r#"{"did": "did:plc:test", "operation": {"type": "create"}}"#;

    /// Upper bounds used by the range-parsing tests.
    const MAX_BUNDLE: u32 = 10;
    const MAX_OPERATION: u64 = 1000;

    /// Build an engine for `query` and evaluate it against `json`.
    fn run_query(query: &str, mode: QueryMode, json: &str) -> Option<String> {
        QueryEngine::new(query, mode).unwrap().query(json).unwrap()
    }

    /// Parse a bundle spec that is expected to succeed.
    fn bundles(spec: &str) -> Vec<u32> {
        parse_bundle_range(spec, MAX_BUNDLE).unwrap()
    }

    /// Parse an operation spec that is expected to succeed.
    fn operations(spec: &str) -> Vec<u64> {
        parse_operation_range(spec, MAX_OPERATION).unwrap()
    }

    /// Assert that `outcome` is an error whose message contains `fragment`.
    fn assert_fails_with<T>(outcome: anyhow::Result<T>, fragment: &str) {
        match outcome {
            Ok(_) => panic!("expected an error containing {:?}", fragment),
            Err(err) => assert!(err.to_string().contains(fragment)),
        }
    }

    // ---- QueryEngine -------------------------------------------------------

    #[test]
    fn test_query_engine_simple() {
        let found = run_query("did", QueryMode::Simple, SAMPLE_OP);
        assert_eq!(found, Some("did:plc:test".to_string()));
    }

    #[test]
    fn test_query_engine_simple_nested() {
        let found = run_query("operation.type", QueryMode::Simple, SAMPLE_OP);
        assert_eq!(found, Some("create".to_string()));
    }

    #[test]
    fn test_query_engine_simple_missing() {
        let found = run_query(
            "missing.field",
            QueryMode::Simple,
            r#"{"did": "did:plc:test"}"#,
        );
        assert_eq!(found, None);
    }

    #[test]
    fn test_query_engine_simple_null() {
        let found = run_query("null_field", QueryMode::Simple, r#"{"null_field": null}"#);
        assert_eq!(found, None);
    }

    #[test]
    fn test_query_engine_jmespath() {
        let found = run_query("did", QueryMode::JmesPath, SAMPLE_OP);
        assert_eq!(found, Some("did:plc:test".to_string()));
    }

    #[test]
    fn test_query_engine_jmespath_nested() {
        let found = run_query("operation.type", QueryMode::JmesPath, SAMPLE_OP);
        assert_eq!(found, Some("create".to_string()));
    }

    // ---- OutputBuffer ------------------------------------------------------

    #[test]
    fn test_output_buffer_new() {
        let buffer = OutputBuffer::new(100);
        assert!(buffer.is_empty());
        assert_eq!(buffer.get_matched_bytes(), 0);
    }

    #[test]
    fn test_output_buffer_push() {
        let mut buffer = OutputBuffer::new(2);
        // First line: 1 of 2, batch not yet full.
        assert!(!buffer.push("line1"));
        // Second line reaches the capacity, so the batch is reported as full.
        assert!(buffer.push("line2"));
        // Flushing resets the line count.
        buffer.flush();
        assert!(!buffer.push("line3"));
    }

    #[test]
    fn test_output_buffer_flush() {
        let mut buffer = OutputBuffer::new(10);
        for line in ["line1", "line2"] {
            buffer.push(line);
        }

        assert_eq!(buffer.flush(), "line1\nline2\n");
        assert!(buffer.is_empty());
    }

    #[test]
    fn test_output_buffer_matched_bytes() {
        let mut buffer = OutputBuffer::new(10);
        for line in ["line1", "line2"] {
            buffer.push(line);
        }

        // Each line contributes its length plus one byte for the newline.
        assert_eq!(buffer.get_matched_bytes(), 5 + 1 + 5 + 1);
    }

    // ---- parse_bundle_range ------------------------------------------------

    #[test]
    fn test_parse_bundle_range_single() {
        assert_eq!(bundles("1"), vec![1]);
    }

    #[test]
    fn test_parse_bundle_range_multiple() {
        assert_eq!(bundles("1,3,5"), vec![1, 3, 5]);
    }

    #[test]
    fn test_parse_bundle_range_range() {
        assert_eq!(bundles("1-3"), vec![1, 2, 3]);
    }

    #[test]
    fn test_parse_bundle_range_mixed() {
        assert_eq!(bundles("1,3-5,7"), vec![1, 3, 4, 5, 7]);
    }

    #[test]
    fn test_parse_bundle_range_dedup() {
        assert_eq!(bundles("1,1,2,2"), vec![1, 2]);
    }

    #[test]
    fn test_parse_bundle_range_empty_max() {
        assert_fails_with(parse_bundle_range("1", 0), "No bundles available");
    }

    #[test]
    fn test_parse_bundle_range_out_of_range() {
        assert_fails_with(parse_bundle_range("11", MAX_BUNDLE), "out of range");
    }

    #[test]
    fn test_parse_bundle_range_invalid_range() {
        assert_fails_with(parse_bundle_range("5-3", MAX_BUNDLE), "start must be <= end");
    }

    #[test]
    fn test_parse_bundle_range_invalid_format() {
        assert_fails_with(
            parse_bundle_range("1-2-3", MAX_BUNDLE),
            "Invalid range format",
        );
    }

    #[test]
    fn test_parse_bundle_range_keyword_root() {
        assert_eq!(bundles("root"), vec![1]);
    }

    #[test]
    fn test_parse_bundle_range_keyword_head() {
        assert_eq!(bundles("head"), vec![10]);
    }

    #[test]
    fn test_parse_bundle_range_keyword_head_offset() {
        // 10 - 3 = 7
        assert_eq!(bundles("head~3"), vec![7]);
    }

    #[test]
    fn test_parse_bundle_range_keyword_shorthand_offset() {
        // 10 - 2 = 8
        assert_eq!(bundles("~2"), vec![8]);
    }

    #[test]
    fn test_parse_bundle_range_keyword_mixed() {
        assert_eq!(bundles("root,head,head~1"), vec![1, 9, 10]);
    }

    #[test]
    fn test_parse_bundle_range_keyword_head_offset_invalid() {
        assert_fails_with(
            parse_bundle_range("head~10", MAX_BUNDLE),
            "exceeds maximum bundle",
        );
    }

    #[test]
    fn test_parse_bundle_range_keyword_shorthand_offset_invalid() {
        assert_fails_with(
            parse_bundle_range("~10", MAX_BUNDLE),
            "exceeds maximum bundle",
        );
    }

    #[test]
    fn test_parse_bundle_range_keyword_invalid_offset() {
        assert_fails_with(parse_bundle_range("head~abc", MAX_BUNDLE), "Invalid offset");
    }

    // ---- parse_operation_range ---------------------------------------------

    #[test]
    fn test_parse_operation_range_single() {
        assert_eq!(operations("0"), vec![0]);
    }

    #[test]
    fn test_parse_operation_range_multiple() {
        assert_eq!(operations("0,5,10"), vec![0, 5, 10]);
    }

    #[test]
    fn test_parse_operation_range_range() {
        assert_eq!(operations("0-2"), vec![0, 1, 2]);
    }

    #[test]
    fn test_parse_operation_range_mixed() {
        assert_eq!(operations("0,5-7,10"), vec![0, 5, 6, 7, 10]);
    }

    #[test]
    fn test_parse_operation_range_dedup() {
        assert_eq!(operations("0,0,1,1"), vec![0, 1]);
    }

    #[test]
    fn test_parse_operation_range_empty_max() {
        assert_fails_with(parse_operation_range("0", 0), "No operations available");
    }

    #[test]
    fn test_parse_operation_range_out_of_range() {
        assert_fails_with(parse_operation_range("1001", MAX_OPERATION), "out of range");
    }

    #[test]
    fn test_parse_operation_range_invalid_range() {
        assert_fails_with(
            parse_operation_range("10-5", MAX_OPERATION),
            "start must be <= end",
        );
    }

    #[test]
    fn test_parse_operation_range_invalid_format() {
        assert_fails_with(
            parse_operation_range("1-2-3", MAX_OPERATION),
            "Invalid range format",
        );
    }

    #[test]
    fn test_parse_operation_range_invalid_number() {
        assert_fails_with(
            parse_operation_range("abc", MAX_OPERATION),
            "Invalid operation number",
        );
    }

    // ---- Stats -------------------------------------------------------------

    #[test]
    fn test_stats_default() {
        let Stats {
            operations,
            matches,
            total_bytes,
            matched_bytes,
        } = Stats::default();
        assert_eq!(operations, 0);
        assert_eq!(matches, 0);
        assert_eq!(total_bytes, 0);
        assert_eq!(matched_bytes, 0);
    }
}