High-performance implementation of plcbundle written in Rust
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

update

+1195 -18
+3
README.md
··· 1 + # plcbundle-rs 2 + 3 + The main purpose of this package is to provide a simple, universal interface — as a Rust library or through C/Go bindings — for working with a plcbundle repository.
+109
bindings/go/README.md
··· 1 + # PLC Bundle Go Bindings 2 + 3 + High-performance Go bindings for the PLC Bundle Rust library. 4 + 5 + ## Installation 6 + 7 + ```bash 8 + # Build the Rust library first 9 + cd ../.. 10 + cargo build --release 11 + ``` 12 + 13 + ## Quick Start 14 + 15 + ```go 16 + import "plcbundle" 17 + 18 + manager, err := plcbundle.NewBundleManager("./bundles") 19 + if err != nil { 20 + log.Fatal(err) 21 + } 22 + defer manager.Close() 23 + ``` 24 + 25 + ## API Reference 26 + 27 + ### BundleManager Methods 28 + 29 + - `NewBundleManager(bundleDir string) (*BundleManager, error)` - Create a new manager 30 + - `Close()` - Free resources 31 + 32 + **Loading:** 33 + - `LoadBundle(bundleNum uint32, opts *LoadOptions) (*LoadResult, error)` 34 + - `GetOperationsBatch(requests []OperationRequest) ([]Operation, error)` 35 + 36 + **DID Operations:** 37 + - `GetDIDOperations(did string) ([]Operation, error)` 38 + - `BatchResolveDIDs(dids []string) (map[string][]Operation, error)` 39 + 40 + **Query:** 41 + - `Query(queryStr string, bundleStart, bundleEnd uint32) ([]Operation, error)` 42 + 43 + **Export:** 44 + - `Export(spec ExportSpec, opts *ExportOptions) (*ExportStats, error)` 45 + 46 + **Verification:** 47 + - `VerifyBundle(bundleNum uint32, checkHash, checkChain bool) (*VerifyResult, error)` 48 + - `VerifyChain(startBundle, endBundle uint32) error` 49 + 50 + **Information:** 51 + - `GetBundleInfo(bundleNum uint32, includeOperations, includeDIDs bool) (*BundleInfo, error)` 52 + 53 + **Cache Management:** 54 + - `PrefetchBundles(bundleNums []uint32)` 55 + - `WarmUp(strategy WarmUpStrategy, startBundle, endBundle uint32) error` 56 + - `ClearCaches()` 57 + 58 + **DID Index:** 59 + - `RebuildDIDIndex(progressCallback func(uint32, uint64)) (*RebuildStats, error)` 60 + - `GetDIDIndexStats() (*DIDIndexStats, error)` 61 + 62 + **Observability:** 63 + - `GetStats() (*ManagerStats, error)` 64 + 65 + ## Types 66 + 67 + **Options & Results:** 68 + - `LoadOptions` - Bundle loading options 69 + 
- `LoadResult` - Load operation result 70 + - `ExportSpec` - Export configuration 71 + - `ExportOptions` - Export options (writer, progress callback) 72 + - `ExportStats` - Export statistics 73 + - `VerifyResult` - Verification result 74 + 75 + **Data:** 76 + - `Operation` - Operation data (DID, OpType, CID, JSON, etc.) 77 + - `OperationRequest` - Request for specific operation 78 + - `BundleInfo` - Bundle metadata 79 + 80 + **Statistics:** 81 + - `ManagerStats` - Manager performance stats 82 + - `DIDIndexStats` - DID index statistics 83 + - `RebuildStats` - Index rebuild statistics 84 + 85 + **Constants:** 86 + - `WarmUpStrategy` - WarmUpRecent, WarmUpAll, WarmUpRange 87 + 88 + ## Examples 89 + 90 + **Export:** 91 + ```go 92 + spec := plcbundle.ExportSpec{ 93 + BundleStart: 1, 94 + BundleEnd: 100, 95 + Format: 0, // 0=jsonl, 1=json, 2=csv 96 + } 97 + opts := &plcbundle.ExportOptions{Writer: os.Stdout} 98 + stats, err := manager.Export(spec, opts) 99 + ``` 100 + 101 + **Query:** 102 + ```go 103 + ops, err := manager.Query("@", 1, 100) // All operations from bundles 1-100 104 + ``` 105 + 106 + **DID Lookup:** 107 + ```go 108 + ops, err := manager.GetDIDOperations("did:plc:example") 109 + ```
-2
bindings/go/callback.c
··· 1 - // Callback utilities - currently unused 2 - // This file can be removed or used for future callback implementations
+136
bindings/go/cmd/example/main.go
··· 3 3 import ( 4 4 "flag" 5 5 "fmt" 6 + "io" 6 7 "log" 7 8 "os" 8 9 "strconv" ··· 29 30 fmt.Println(" warm-up Warm up the cache") 30 31 fmt.Println(" rebuild-index Rebuild the DID index") 31 32 fmt.Println(" index-stats Display DID index statistics") 33 + fmt.Println(" export Export operations to file or stdout") 32 34 fmt.Println(" help Show this help message") 33 35 fmt.Println("\nExamples:") 34 36 fmt.Println(" plcbundle-go -dir test_bundles load -bundle 1") ··· 107 109 cmdRebuildIndex(manager, args[1:]) 108 110 case "index-stats": 109 111 cmdIndexStats(manager, args[1:]) 112 + case "export": 113 + cmdExport(manager, args[1:]) 110 114 case "help": 111 115 printUsage() 112 116 default: ··· 487 491 fmt.Printf("Index Size: %d bytes (%.2f MB)\n", 488 492 stats.IndexSizeBytes, float64(stats.IndexSizeBytes)/1024/1024) 489 493 } 494 + 495 + func cmdExport(manager *plcbundle.BundleManager, args []string) { 496 + fs := flag.NewFlagSet("export", flag.ExitOnError) 497 + rangeFlag := fs.String("range", "", "Bundle range (e.g., \"1-100\")") 498 + all := fs.Bool("all", false, "Export all bundles") 499 + count := fs.Uint64("count", 0, "Limit number of operations (0 = no limit)") 500 + after := fs.String("after", "", "Export operations after timestamp (ISO 8601)") 501 + did := fs.String("did", "", "Filter by DID") 502 + opType := fs.String("op-type", "", "Filter by operation type") 503 + format := fs.String("f", "jsonl", "Output format: jsonl, json, csv") 504 + output := fs.String("o", "", "Output file (default: stdout)") 505 + quiet := fs.Bool("q", false, "Suppress progress output") 506 + 507 + fs.Usage = func() { 508 + fmt.Fprintf(os.Stderr, "Export operations from bundles\n\n") 509 + fmt.Fprintf(os.Stderr, "Usage: plcbundle-go export [OPTIONS]\n\n") 510 + fmt.Fprintf(os.Stderr, "Options:\n") 511 + fs.PrintDefaults() 512 + fmt.Fprintf(os.Stderr, "\nExamples:\n") 513 + fmt.Fprintf(os.Stderr, " plcbundle-go export --range 1-100\n") 514 + fmt.Fprintf(os.Stderr, " plcbundle-go 
export --all --count 50000\n") 515 + fmt.Fprintf(os.Stderr, " plcbundle-go export --all --after 2024-01-01T00:00:00Z\n") 516 + fmt.Fprintf(os.Stderr, " plcbundle-go export --range 1-100 --count 10000 --after 2024-01-01T00:00:00Z\n") 517 + } 518 + 519 + fs.Parse(args) 520 + 521 + // Determine bundle range 522 + var bundleStart, bundleEnd uint32 523 + exportAll := *all 524 + 525 + if !exportAll { 526 + if *rangeFlag == "" { 527 + log.Fatal("Must specify either --range or --all") 528 + } 529 + 530 + // Parse range like "1-100" 531 + parts := strings.Split(*rangeFlag, "-") 532 + if len(parts) == 2 { 533 + start, err := strconv.ParseUint(parts[0], 10, 32) 534 + if err != nil { 535 + log.Fatalf("Invalid range start: %v", err) 536 + } 537 + end, err := strconv.ParseUint(parts[1], 10, 32) 538 + if err != nil { 539 + log.Fatalf("Invalid range end: %v", err) 540 + } 541 + bundleStart = uint32(start) 542 + bundleEnd = uint32(end) 543 + } else { 544 + // Single bundle 545 + num, err := strconv.ParseUint(*rangeFlag, 10, 32) 546 + if err != nil { 547 + log.Fatalf("Invalid bundle number: %v", err) 548 + } 549 + bundleStart = uint32(num) 550 + bundleEnd = uint32(num) 551 + } 552 + } 553 + 554 + // Determine format 555 + var formatCode uint8 556 + switch *format { 557 + case "jsonl": 558 + formatCode = 0 559 + case "json": 560 + formatCode = 1 561 + case "csv": 562 + formatCode = 2 563 + default: 564 + log.Fatalf("Invalid format: %s (use jsonl, json, or csv)", *format) 565 + } 566 + 567 + // Open output 568 + var writer io.Writer 569 + if *output == "" { 570 + writer = os.Stdout 571 + } else { 572 + file, err := os.Create(*output) 573 + if err != nil { 574 + log.Fatalf("Failed to create output file: %v", err) 575 + } 576 + defer file.Close() 577 + writer = file 578 + } 579 + 580 + // Create export spec 581 + spec := plcbundle.ExportSpec{ 582 + BundleStart: bundleStart, 583 + BundleEnd: bundleEnd, 584 + ExportAll: exportAll, 585 + Format: formatCode, 586 + CountLimit: *count, 587 + 
AfterTimestamp: *after, 588 + DIDFilter: *did, 589 + OpTypeFilter: *opType, 590 + } 591 + 592 + // Progress callback 593 + var progressCallback func(uint64, uint64) 594 + if !*quiet { 595 + progressCallback = func(records, bytes uint64) { 596 + if records%10000 == 0 { 597 + fmt.Fprintf(os.Stderr, "\r Exported: %d operations (%.2f MB)", records, float64(bytes)/1024/1024) 598 + } 599 + } 600 + } 601 + 602 + // Export options 603 + opts := &plcbundle.ExportOptions{ 604 + Writer: writer, 605 + Progress: progressCallback, 606 + } 607 + 608 + if !*quiet { 609 + if exportAll { 610 + fmt.Fprintf(os.Stderr, "📤 Exporting all bundles...\n") 611 + } else { 612 + fmt.Fprintf(os.Stderr, "📤 Exporting bundles %d-%d...\n", bundleStart, bundleEnd) 613 + } 614 + } 615 + 616 + stats, err := manager.Export(spec, opts) 617 + if err != nil { 618 + log.Fatalf("Export failed: %v", err) 619 + } 620 + 621 + if !*quiet { 622 + fmt.Fprintf(os.Stderr, "\r Exported: %d operations (%.2f MB)\n", stats.RecordsWritten, float64(stats.BytesWritten)/1024/1024) 623 + fmt.Fprintf(os.Stderr, "✅ Export complete\n") 624 + } 625 + }
+11
bindings/go/export_bridge.c
··· 1 + #include "plcbundle.h" 2 + #include <stddef.h> 3 + 4 + // This function will be called by Rust, and it will call the Go function 5 + // The Go function is exported via //export, so it's available as a C function 6 + int exportCallbackBridge(const char* data, size_t len, void* user_data) { 7 + // The Go-exported function will be linked in 8 + extern int exportCallbackGo(const char* data, size_t len, void* user_data); 9 + return exportCallbackGo(data, len, user_data); 10 + } 11 +
+171
bindings/go/plcbundle.go
··· 1 1 package plcbundle 2 2 3 3 // #cgo LDFLAGS: -L../../target/release -lplcbundle -lm -ldl -lpthread 4 + // #cgo CFLAGS: -I. 4 5 // #include <stdlib.h> 6 + // #include <stddef.h> 5 7 // #include "plcbundle.h" 8 + // extern int exportCallbackBridge(const char* data, size_t len, void* user_data); 6 9 import "C" 7 10 import ( 8 11 "fmt" 12 + "io" 9 13 "runtime" 14 + "strings" 15 + "sync" 10 16 "unsafe" 11 17 ) 12 18 ··· 77 83 OperationsIndexed uint64 78 84 UniqueDIDs uint 79 85 DurationMs uint64 86 + } 87 + 88 + type ExportSpec struct { 89 + BundleStart uint32 90 + BundleEnd uint32 91 + ExportAll bool 92 + Format uint8 // 0=jsonl, 1=json, 2=csv 93 + CountLimit uint64 // 0 = no limit 94 + AfterTimestamp string // Empty = no filter 95 + DIDFilter string // Empty = no filter 96 + OpTypeFilter string // Empty = no filter 97 + } 98 + 99 + type ExportStats struct { 100 + RecordsWritten uint64 101 + BytesWritten uint64 80 102 } 81 103 82 104 type WarmUpStrategy uint8 ··· 463 485 } 464 486 465 487 return stats, nil 488 + } 489 + 490 + // ============================================================================ 491 + // Export 492 + // ============================================================================ 493 + 494 + // ExportOptions configures export behavior 495 + type ExportOptions struct { 496 + // Writer receives exported data in batches 497 + Writer io.Writer 498 + // Progress callback (optional) 499 + Progress func(records, bytes uint64) 500 + } 501 + 502 + // Export exports operations according to the spec 503 + func (m *BundleManager) Export(spec ExportSpec, opts *ExportOptions) (*ExportStats, error) { 504 + if opts == nil || opts.Writer == nil { 505 + return nil, fmt.Errorf("writer is required") 506 + } 507 + 508 + // Convert Go spec to C spec 509 + cSpec := C.CExportSpec{ 510 + bundle_start: C.uint32_t(spec.BundleStart), 511 + bundle_end: C.uint32_t(spec.BundleEnd), 512 + export_all: C._Bool(spec.ExportAll), 513 + format: C.uint8_t(spec.Format), 514 + 
count_limit: C.uint64_t(spec.CountLimit), 515 + after_timestamp: nil, 516 + did_filter: nil, 517 + op_type_filter: nil, 518 + } 519 + 520 + // Convert string filters to C strings 521 + var afterTs, didFilter, opTypeFilter *C.char 522 + if spec.AfterTimestamp != "" { 523 + afterTs = C.CString(spec.AfterTimestamp) 524 + defer C.free(unsafe.Pointer(afterTs)) 525 + cSpec.after_timestamp = afterTs 526 + } 527 + 528 + if spec.DIDFilter != "" { 529 + didFilter = C.CString(spec.DIDFilter) 530 + defer C.free(unsafe.Pointer(didFilter)) 531 + cSpec.did_filter = didFilter 532 + } 533 + 534 + if spec.OpTypeFilter != "" { 535 + opTypeFilter = C.CString(spec.OpTypeFilter) 536 + defer C.free(unsafe.Pointer(opTypeFilter)) 537 + cSpec.op_type_filter = opTypeFilter 538 + } 539 + 540 + // Create callback context 541 + ctx := &exportCallbackCtx{ 542 + writer: opts.Writer, 543 + progress: opts.Progress, 544 + } 545 + 546 + var cStats C.CExportStats 547 + 548 + // Use a global callback registry to work around cgo limitations 549 + exportCallbackID := registerExportCallback(ctx) 550 + defer unregisterExportCallback(exportCallbackID) 551 + 552 + // Use C bridge function that calls the Go-exported function 553 + result := C.bundle_manager_export( 554 + m.ptr, 555 + &cSpec, 556 + (C.ExportCallback)(C.exportCallbackBridge), 557 + unsafe.Pointer(uintptr(exportCallbackID)), 558 + &cStats, 559 + ) 560 + 561 + if result != 0 { 562 + return nil, fmt.Errorf("export failed") 563 + } 564 + 565 + stats := &ExportStats{ 566 + RecordsWritten: uint64(cStats.records_written), 567 + BytesWritten: uint64(cStats.bytes_written), 568 + } 569 + 570 + return stats, nil 571 + } 572 + 573 + // ============================================================================ 574 + // Export callback registry 575 + // ============================================================================ 576 + 577 + type exportCallbackCtx struct { 578 + writer io.Writer 579 + progress func(uint64, uint64) 580 + records uint64 581 + 
bytes uint64 582 + } 583 + 584 + var ( 585 + exportCallbacks = make(map[uintptr]*exportCallbackCtx) 586 + exportCallbackMux sync.Mutex 587 + exportCallbackID uintptr = 1 588 + ) 589 + 590 + func registerExportCallback(ctx *exportCallbackCtx) uintptr { 591 + exportCallbackMux.Lock() 592 + defer exportCallbackMux.Unlock() 593 + id := exportCallbackID 594 + exportCallbackID++ 595 + exportCallbacks[id] = ctx 596 + return id 597 + } 598 + 599 + func unregisterExportCallback(id uintptr) { 600 + exportCallbackMux.Lock() 601 + defer exportCallbackMux.Unlock() 602 + delete(exportCallbacks, id) 603 + } 604 + 605 + //export exportCallbackGo 606 + func exportCallbackGo(data *C.char, len C.size_t, userData unsafe.Pointer) C.int { 607 + id := uintptr(userData) 608 + exportCallbackMux.Lock() 609 + ctx, ok := exportCallbacks[id] 610 + exportCallbackMux.Unlock() 611 + 612 + if !ok || ctx == nil { 613 + return 1 614 + } 615 + 616 + if data == nil || len == 0 { 617 + return 0 618 + } 619 + 620 + // Copy C data to Go slice 621 + goData := C.GoBytes(unsafe.Pointer(data), C.int(len)) 622 + _, err := ctx.writer.Write(goData) 623 + if err != nil { 624 + return 1 // Stop on error 625 + } 626 + 627 + ctx.bytes += uint64(len) 628 + // Estimate records (rough - count newlines) 629 + ctx.records += uint64(strings.Count(string(goData), "\n")) 630 + 631 + // Call progress callback if provided 632 + if ctx.progress != nil { 633 + ctx.progress(ctx.records, ctx.bytes) 634 + } 635 + 636 + return 0 // Continue 466 637 } 467 638 468 639 // ============================================================================
+36
bindings/go/plcbundle.h
··· 84 84 uint64_t duration_ms; 85 85 } CRebuildStats; 86 86 87 + typedef struct { 88 + uint32_t bundle_start; 89 + uint32_t bundle_end; 90 + bool export_all; 91 + uint8_t format; // 0=jsonl, 1=json, 2=csv 92 + uint64_t count_limit; // 0 = no limit 93 + const char* after_timestamp; // NULL = no filter 94 + const char* did_filter; // NULL = no filter 95 + const char* op_type_filter; // NULL = no filter 96 + } CExportSpec; 97 + 98 + typedef struct { 99 + uint64_t records_written; 100 + uint64_t bytes_written; 101 + } CExportStats; 102 + 87 103 // Warm-up strategies 88 104 #define WARMUP_RECENT 0 89 105 #define WARMUP_ALL 1 ··· 300 316 int bundle_manager_get_stats( 301 317 CBundleManager manager, 302 318 CManagerStats* out_stats 319 + ); 320 + 321 + // ============================================================================ 322 + // Export 323 + // ============================================================================ 324 + 325 + /** 326 + * Export operations with streaming callback. 327 + * callback(data, len, user_data) is called for each batch of data. 328 + * Returns 0 to continue, non-zero to stop. 329 + * Returns 0 on success, -1 on error. 330 + */ 331 + typedef int (*ExportCallback)(const char* data, size_t len, void* user_data); 332 + 333 + int bundle_manager_export( 334 + CBundleManager manager, 335 + const CExportSpec* spec, 336 + ExportCallback callback, 337 + void* user_data, 338 + CExportStats* out_stats // NULL if stats not needed 303 339 ); 304 340 305 341 // ============================================================================
+315 -12
src/bin/plcbundle-rs.rs
··· 112 112 #[arg(short = 'd', long, default_value = ".")] 113 113 dir: PathBuf, 114 114 115 - /// Bundle range 115 + /// Bundle range (e.g., "1-100") 116 116 #[arg(short, long)] 117 + range: Option<String>, 118 + 119 + /// Export all bundles 120 + #[arg(long)] 121 + all: bool, 122 + 123 + /// Bundle range (legacy, use --range instead) 124 + #[arg(long, hide = true)] 117 125 bundles: Option<String>, 118 126 119 127 /// Output format 120 128 #[arg(short = 'f', long, default_value = "jsonl")] 121 129 format: ExportFormat, 122 130 123 - /// Output file 131 + /// Output file (default: stdout) 124 132 #[arg(short, long)] 125 - output: PathBuf, 133 + output: Option<PathBuf>, 134 + 135 + /// Limit number of operations to export 136 + #[arg(long)] 137 + count: Option<usize>, 138 + 139 + /// Export operations after this timestamp (ISO 8601 format) 140 + #[arg(long)] 141 + after: Option<String>, 126 142 127 143 /// Filter by DID 128 144 #[arg(long)] ··· 292 308 } 293 309 Commands::Export { 294 310 dir, 311 + range, 312 + all, 295 313 bundles, 296 314 format, 297 315 output, 316 + count, 317 + after, 298 318 did, 299 319 op_type, 300 320 compress, 301 321 } => { 302 - cmd_export(dir, bundles, format, output, did, op_type, compress, cli.quiet)?; 322 + cmd_export( 323 + dir, range, all, bundles, format, output, count, after, 324 + did, op_type, compress, cli.quiet, cli.verbose 325 + )?; 303 326 } 304 327 Commands::Stats { 305 328 dir, ··· 544 567 } 545 568 546 569 fn cmd_export( 547 - _dir: PathBuf, 548 - _bundles: Option<String>, 549 - _format: ExportFormat, 550 - _output: PathBuf, 551 - _did: Option<String>, 552 - _op_type: Option<String>, 570 + dir: PathBuf, 571 + range: Option<String>, 572 + all: bool, 573 + bundles: Option<String>, // Legacy flag 574 + format: ExportFormat, 575 + output: Option<PathBuf>, 576 + count: Option<usize>, 577 + after: Option<String>, 578 + did: Option<String>, 579 + op_type: Option<String>, 553 580 _compress: bool, 554 - _quiet: bool, 581 + 
quiet: bool, 582 + verbose: bool, 555 583 ) -> Result<()> { 556 - println!("Export not yet implemented"); 584 + use std::io::{BufWriter, Write}; 585 + use std::fs::File; 586 + use std::io::BufRead; 587 + 588 + // Load index to get max bundle and metadata 589 + let index = plcbundle::Index::load(&dir)?; 590 + let max_bundle = index.last_bundle; 591 + 592 + // Determine bundle numbers to process 593 + let bundle_numbers: Vec<u32> = if let Some(range_str) = range { 594 + // Parse range like "1-100" 595 + if range_str.contains('-') { 596 + let parts: Vec<&str> = range_str.split('-').collect(); 597 + if parts.len() == 2 { 598 + let start: u32 = parts[0].parse()?; 599 + let end: u32 = parts[1].parse()?; 600 + if start > end || start == 0 || end > max_bundle { 601 + anyhow::bail!("Invalid range: {}-{}", start, end); 602 + } 603 + (start..=end).collect() 604 + } else { 605 + anyhow::bail!("Invalid range format: {}", range_str); 606 + } 607 + } else { 608 + // Single bundle number 609 + let num: u32 = range_str.parse()?; 610 + if num == 0 || num > max_bundle { 611 + anyhow::bail!("Bundle number {} out of range", num); 612 + } 613 + vec![num] 614 + } 615 + } else if all { 616 + (1..=max_bundle).collect() 617 + } else if let Some(bundles_str) = bundles { 618 + // Legacy --bundles flag 619 + parse_bundle_spec(Some(bundles_str), max_bundle)? 
620 + } else { 621 + anyhow::bail!("Must specify either --range, --all, or --bundles"); 622 + }; 623 + 624 + // Filter bundles by timestamp metadata if --after is specified 625 + let bundle_numbers: Vec<u32> = if let Some(ref after_ts) = after { 626 + bundle_numbers.into_iter() 627 + .filter_map(|num| { 628 + if let Some(meta) = index.get_bundle(num) { 629 + // Check if bundle's end_time is after the filter timestamp 630 + // If bundle ends before the filter, skip it 631 + if meta.end_time >= *after_ts { 632 + Some(num) 633 + } else { 634 + None 635 + } 636 + } else { 637 + Some(num) // Include if metadata not found (will be checked during processing) 638 + } 639 + }) 640 + .collect() 641 + } else { 642 + bundle_numbers 643 + }; 644 + 645 + if verbose && !quiet { 646 + eprintln!("📦 Index: v{} ({})", index.version, index.origin); 647 + eprintln!("📊 Processing {} bundles", bundle_numbers.len()); 648 + if let Some(ref count) = count { 649 + eprintln!("🔢 Export limit: {} operations", format_number(*count as u64)); 650 + } 651 + if let Some(ref after) = after { 652 + eprintln!("⏰ After timestamp: {}", after); 653 + } 654 + } 655 + 656 + // Open output with buffering 657 + let writer: Box<dyn Write> = if let Some(output_path) = output { 658 + Box::new(BufWriter::with_capacity(1024 * 1024, File::create(output_path)?)) 659 + } else { 660 + Box::new(BufWriter::with_capacity(1024 * 1024, io::stdout())) 661 + }; 662 + let mut writer = writer; 663 + 664 + if !quiet { 665 + eprintln!("📤 Exporting operations..."); 666 + } 667 + 668 + let start = Instant::now(); 669 + let mut exported_count = 0; 670 + let mut output_buffer = String::with_capacity(1024 * 1024); // 1MB buffer 671 + const BATCH_SIZE: usize = 10000; 672 + 673 + // Process bundles directly from files 674 + for bundle_num in bundle_numbers { 675 + // Check count limit 676 + if let Some(limit) = count { 677 + if exported_count >= limit { 678 + break; 679 + } 680 + } 681 + 682 + let bundle_path = 
dir.join(format!("{:06}.jsonl.zst", bundle_num)); 683 + if !bundle_path.exists() { 684 + continue; 685 + } 686 + 687 + // Open and decode bundle file 688 + let file = File::open(&bundle_path)?; 689 + let decoder = zstd::Decoder::new(file)?; 690 + let reader = std::io::BufReader::with_capacity(1024 * 1024, decoder); 691 + 692 + // Fast path: no filters and Jsonl format - just pass through lines 693 + let needs_parsing = after.is_some() || did.is_some() || op_type.is_some() || 694 + matches!(format, ExportFormat::Json | ExportFormat::Csv | ExportFormat::Parquet); 695 + 696 + if !needs_parsing { 697 + // Fast path: no parsing needed, just copy lines 698 + for line in reader.lines() { 699 + // Check count limit 700 + if let Some(limit) = count { 701 + if exported_count >= limit { 702 + break; 703 + } 704 + } 705 + 706 + let line = line?; 707 + if line.is_empty() { 708 + continue; 709 + } 710 + 711 + output_buffer.push_str(&line); 712 + output_buffer.push('\n'); 713 + exported_count += 1; 714 + 715 + // Flush buffer when it gets large 716 + if output_buffer.len() >= 1024 * 1024 { 717 + writer.write_all(output_buffer.as_bytes())?; 718 + output_buffer.clear(); 719 + } 720 + 721 + // Progress update 722 + if !quiet && exported_count % BATCH_SIZE == 0 { 723 + eprint!("\r Exported: {} operations", format_number(exported_count as u64)); 724 + io::stderr().flush()?; 725 + } 726 + } 727 + } else { 728 + // Slow path: need to parse for filtering or formatting 729 + use sonic_rs::JsonValueTrait; 730 + 731 + for line in reader.lines() { 732 + // Check count limit 733 + if let Some(limit) = count { 734 + if exported_count >= limit { 735 + break; 736 + } 737 + } 738 + 739 + let line = line?; 740 + if line.is_empty() { 741 + continue; 742 + } 743 + 744 + // Parse JSON using sonic-rs for faster parsing 745 + let data: sonic_rs::Value = match sonic_rs::from_str(&line) { 746 + Ok(data) => data, 747 + Err(_) => continue, // Skip invalid JSON 748 + }; 749 + 750 + // Apply filters using 
sonic-rs Value API 751 + if let Some(ref after_ts) = after { 752 + if let Some(created_at) = data.get("createdAt").or_else(|| data.get("created_at")) { 753 + if let Some(ts_str) = created_at.as_str() { 754 + if ts_str < after_ts.as_str() { 755 + continue; 756 + } 757 + } else { 758 + continue; 759 + } 760 + } else { 761 + continue; 762 + } 763 + } 764 + 765 + if let Some(ref did_filter) = did { 766 + if let Some(did_val) = data.get("did") { 767 + if let Some(did_str) = did_val.as_str() { 768 + if did_str != did_filter { 769 + continue; 770 + } 771 + } else { 772 + continue; 773 + } 774 + } else { 775 + continue; 776 + } 777 + } 778 + 779 + if let Some(ref op_type_filter) = op_type { 780 + if let Some(op_val) = data.get("operation") { 781 + let matches = if op_val.is_str() { 782 + op_val.as_str().unwrap() == op_type_filter 783 + } else if op_val.is_object() { 784 + if let Some(typ_val) = op_val.get("type") { 785 + typ_val.is_str() && typ_val.as_str().unwrap() == op_type_filter 786 + } else { 787 + false 788 + } 789 + } else { 790 + false 791 + }; 792 + if !matches { 793 + continue; 794 + } 795 + } else { 796 + continue; 797 + } 798 + } 799 + 800 + // Format operation 801 + let formatted = match format { 802 + ExportFormat::Jsonl => { 803 + // Already have the JSON string, use it directly 804 + line 805 + } 806 + ExportFormat::Json => { 807 + // Pretty print using sonic-rs 808 + sonic_rs::to_string_pretty(&data)? 
809 + } 810 + ExportFormat::Csv => { 811 + let did = data.get("did").and_then(|v| v.as_str()).unwrap_or(""); 812 + let op = data.get("operation").map(|v| sonic_rs::to_string(v).unwrap_or_default()).unwrap_or_default(); 813 + let created_at = data.get("createdAt").or_else(|| data.get("created_at")) 814 + .and_then(|v| v.as_str()).unwrap_or(""); 815 + let nullified = data.get("nullified").and_then(|v| v.as_bool()).unwrap_or(false); 816 + format!("{},{},{},{}", did, op, created_at, nullified) 817 + } 818 + ExportFormat::Parquet => { 819 + // Fall back to JSON for now 820 + sonic_rs::to_string(&data)? 821 + } 822 + }; 823 + 824 + output_buffer.push_str(&formatted); 825 + output_buffer.push('\n'); 826 + exported_count += 1; 827 + 828 + // Flush buffer when it gets large 829 + if output_buffer.len() >= 1024 * 1024 { 830 + writer.write_all(output_buffer.as_bytes())?; 831 + output_buffer.clear(); 832 + } 833 + 834 + // Progress update 835 + if !quiet && exported_count % BATCH_SIZE == 0 { 836 + eprint!("\r Exported: {} operations", format_number(exported_count as u64)); 837 + io::stderr().flush()?; 838 + } 839 + } 840 + } 841 + } 842 + 843 + // Flush remaining buffer 844 + if !output_buffer.is_empty() { 845 + writer.write_all(output_buffer.as_bytes())?; 846 + } 847 + writer.flush()?; 848 + 849 + if !quiet { 850 + eprintln!("\r Exported: {} operations", format_number(exported_count as u64)); 851 + let elapsed = start.elapsed(); 852 + eprintln!("✅ Complete in {}", HumanDuration(elapsed)); 853 + if elapsed.as_secs_f64() > 0.0 { 854 + eprintln!(" Throughput: {:.0} ops/sec", 855 + exported_count as f64 / elapsed.as_secs_f64() 856 + ); 857 + } 858 + } 859 + 557 860 Ok(()) 558 861 } 559 862
+295
src/ffi.rs
··· 89 89 pub duration_ms: u64, 90 90 } 91 91 92 + #[repr(C)] 93 + pub struct CExportSpec { 94 + pub bundle_start: u32, 95 + pub bundle_end: u32, 96 + pub export_all: bool, 97 + pub format: u8, // 0=jsonl, 1=json, 2=csv 98 + pub count_limit: u64, // 0 = no limit 99 + pub after_timestamp: *const c_char, // NULL = no filter 100 + pub did_filter: *const c_char, // NULL = no filter 101 + pub op_type_filter: *const c_char, // NULL = no filter 102 + } 103 + 104 + #[repr(C)] 105 + pub struct CExportStats { 106 + pub records_written: u64, 107 + pub bytes_written: u64, 108 + } 109 + 92 110 // ============================================================================ 93 111 // BundleManager lifecycle 94 112 // ============================================================================ ··· 697 715 let _ = Vec::from_raw_parts(ops, count, count); 698 716 } 699 717 } 718 + } 719 + 720 + // ============================================================================ 721 + // Export 722 + // ============================================================================ 723 + 724 + /// Callback type for streaming export 725 + /// Returns 0 to continue, non-zero to stop 726 + pub type ExportCallback = extern "C" fn(data: *const c_char, len: usize, user_data: *mut std::ffi::c_void) -> i32; 727 + 728 + #[no_mangle] 729 + pub extern "C" fn bundle_manager_export( 730 + manager: *const CBundleManager, 731 + spec: *const CExportSpec, 732 + callback: ExportCallback, 733 + user_data: *mut std::ffi::c_void, 734 + out_stats: *mut CExportStats, 735 + ) -> i32 { 736 + if manager.is_null() || spec.is_null() || callback as usize == 0 { 737 + return -1; 738 + } 739 + 740 + let manager = unsafe { &*manager }; 741 + let spec = unsafe { &*spec }; 742 + 743 + use std::io::{BufRead, BufWriter, Write}; 744 + use std::fs::File; 745 + use crate::index::Index; 746 + 747 + // Get directory path 748 + let dir_path = manager.manager.directory().clone(); 749 + // Load index 750 + let index = match 
Index::load(&dir_path) { 751 + Ok(idx) => idx, 752 + Err(_) => return -1, 753 + }; 754 + 755 + // Determine bundle range 756 + let bundle_numbers: Vec<u32> = if spec.export_all { 757 + (1..=index.last_bundle).collect() 758 + } else if spec.bundle_end > 0 && spec.bundle_end >= spec.bundle_start { 759 + (spec.bundle_start..=spec.bundle_end).collect() 760 + } else { 761 + vec![spec.bundle_start] 762 + }; 763 + 764 + // Filter bundles by timestamp if specified 765 + let bundle_numbers: Vec<u32> = if !spec.after_timestamp.is_null() { 766 + let after_ts = unsafe { 767 + match CStr::from_ptr(spec.after_timestamp).to_str() { 768 + Ok(s) => s.to_string(), 769 + Err(_) => return -1, 770 + } 771 + }; 772 + bundle_numbers.into_iter() 773 + .filter_map(|num| { 774 + if let Some(meta) = index.get_bundle(num) { 775 + if meta.end_time >= after_ts { 776 + Some(num) 777 + } else { 778 + None 779 + } 780 + } else { 781 + Some(num) 782 + } 783 + }) 784 + .collect() 785 + } else { 786 + bundle_numbers 787 + }; 788 + 789 + let mut exported_count = 0u64; 790 + let mut bytes_written = 0u64; 791 + let mut output_buffer = Vec::with_capacity(1024 * 1024); 792 + 793 + // Determine if we need parsing 794 + let needs_parsing = !spec.after_timestamp.is_null() || 795 + !spec.did_filter.is_null() || 796 + !spec.op_type_filter.is_null() || 797 + spec.format == 1 || spec.format == 2; // JSON or CSV 798 + 799 + // Process bundles 800 + for bundle_num in bundle_numbers { 801 + if spec.count_limit > 0 && exported_count >= spec.count_limit { 802 + break; 803 + } 804 + 805 + let bundle_path = dir_path.join(format!("{:06}.jsonl.zst", bundle_num)); 806 + if !bundle_path.exists() { 807 + continue; 808 + } 809 + 810 + let file = match File::open(&bundle_path) { 811 + Ok(f) => f, 812 + Err(_) => continue, 813 + }; 814 + 815 + let decoder = match zstd::Decoder::new(file) { 816 + Ok(d) => d, 817 + Err(_) => continue, 818 + }; 819 + 820 + let reader = std::io::BufReader::with_capacity(1024 * 1024, decoder); 821 
+ 822 + if !needs_parsing { 823 + // Fast path: no parsing 824 + for line in reader.lines() { 825 + if spec.count_limit > 0 && exported_count >= spec.count_limit { 826 + break; 827 + } 828 + 829 + let line = match line { 830 + Ok(l) => l, 831 + Err(_) => continue, 832 + }; 833 + 834 + if line.is_empty() { 835 + continue; 836 + } 837 + 838 + output_buffer.extend_from_slice(line.as_bytes()); 839 + output_buffer.push(b'\n'); 840 + exported_count += 1; 841 + bytes_written += line.len() as u64 + 1; 842 + 843 + // Flush buffer when large 844 + if output_buffer.len() >= 1024 * 1024 { 845 + let result = callback(output_buffer.as_ptr() as *const c_char, output_buffer.len(), user_data); 846 + if result != 0 { 847 + break; 848 + } 849 + output_buffer.clear(); 850 + } 851 + } 852 + } else { 853 + // Slow path: parse and filter 854 + use sonic_rs::JsonValueTrait; 855 + 856 + let after_ts = if !spec.after_timestamp.is_null() { 857 + Some(unsafe { 858 + CStr::from_ptr(spec.after_timestamp).to_str().unwrap().to_string() 859 + }) 860 + } else { 861 + None 862 + }; 863 + 864 + let did_filter = if !spec.did_filter.is_null() { 865 + Some(unsafe { 866 + CStr::from_ptr(spec.did_filter).to_str().unwrap().to_string() 867 + }) 868 + } else { 869 + None 870 + }; 871 + 872 + let op_type_filter = if !spec.op_type_filter.is_null() { 873 + Some(unsafe { 874 + CStr::from_ptr(spec.op_type_filter).to_str().unwrap().to_string() 875 + }) 876 + } else { 877 + None 878 + }; 879 + 880 + for line in reader.lines() { 881 + if spec.count_limit > 0 && exported_count >= spec.count_limit { 882 + break; 883 + } 884 + 885 + let line = match line { 886 + Ok(l) => l, 887 + Err(_) => continue, 888 + }; 889 + 890 + if line.is_empty() { 891 + continue; 892 + } 893 + 894 + // Parse JSON 895 + let data: sonic_rs::Value = match sonic_rs::from_str(&line) { 896 + Ok(d) => d, 897 + Err(_) => continue, 898 + }; 899 + 900 + // Apply filters 901 + if let Some(ref after_ts) = after_ts { 902 + if let Some(created_at) = 
data.get("createdAt").or_else(|| data.get("created_at")) { 903 + if let Some(ts_str) = created_at.as_str() { 904 + if ts_str < after_ts.as_str() { 905 + continue; 906 + } 907 + } else { 908 + continue; 909 + } 910 + } else { 911 + continue; 912 + } 913 + } 914 + 915 + if let Some(ref did_filter) = did_filter { 916 + if let Some(did_val) = data.get("did") { 917 + if let Some(did_str) = did_val.as_str() { 918 + if did_str != did_filter { 919 + continue; 920 + } 921 + } else { 922 + continue; 923 + } 924 + } else { 925 + continue; 926 + } 927 + } 928 + 929 + if let Some(ref op_type_filter) = op_type_filter { 930 + if let Some(op_val) = data.get("operation") { 931 + let matches = if op_val.is_str() { 932 + op_val.as_str().unwrap() == op_type_filter 933 + } else if op_val.is_object() { 934 + if let Some(typ_val) = op_val.get("type") { 935 + typ_val.is_str() && typ_val.as_str().unwrap() == op_type_filter 936 + } else { 937 + false 938 + } 939 + } else { 940 + false 941 + }; 942 + if !matches { 943 + continue; 944 + } 945 + } else { 946 + continue; 947 + } 948 + } 949 + 950 + // Format 951 + let formatted = match spec.format { 952 + 0 => line, // JSONL - use original 953 + 1 => sonic_rs::to_string_pretty(&data).unwrap_or_default(), // JSON 954 + 2 => { 955 + let did = data.get("did").and_then(|v| v.as_str()).unwrap_or(""); 956 + let op = data.get("operation").map(|v| sonic_rs::to_string(v).unwrap_or_default()).unwrap_or_default(); 957 + let created_at = data.get("createdAt").or_else(|| data.get("created_at")) 958 + .and_then(|v| v.as_str()).unwrap_or(""); 959 + let nullified = data.get("nullified").and_then(|v| v.as_bool()).unwrap_or(false); 960 + format!("{},{},{},{}", did, op, created_at, nullified) 961 + } 962 + _ => line, 963 + }; 964 + 965 + output_buffer.extend_from_slice(formatted.as_bytes()); 966 + output_buffer.push(b'\n'); 967 + exported_count += 1; 968 + bytes_written += formatted.len() as u64 + 1; 969 + 970 + // Flush buffer when large 971 + if 
output_buffer.len() >= 1024 * 1024 { 972 + let result = callback(output_buffer.as_ptr() as *const c_char, output_buffer.len(), user_data); 973 + if result != 0 { 974 + break; 975 + } 976 + output_buffer.clear(); 977 + } 978 + } 979 + } 980 + } 981 + 982 + // Flush remaining buffer 983 + if !output_buffer.is_empty() { 984 + callback(output_buffer.as_ptr() as *const c_char, output_buffer.len(), user_data); 985 + } 986 + 987 + if !out_stats.is_null() { 988 + unsafe { 989 + (*out_stats).records_written = exported_count; 990 + (*out_stats).bytes_written = bytes_written; 991 + } 992 + } 993 + 994 + 0 700 995 }
+112 -4
src/iterators.rs
··· 91 91 pub struct ExportIterator { 92 92 manager: Arc<BundleManager>, 93 93 spec: ExportSpec, 94 - // TODO: Implement export iterator 94 + bundle_numbers: Vec<u32>, 95 + current_bundle_idx: usize, 96 + current_ops: Vec<Operation>, 97 + current_op_idx: usize, 98 + count_remaining: Option<usize>, 95 99 } 96 100 97 101 impl ExportIterator { 98 102 pub fn new(manager: Arc<BundleManager>, spec: ExportSpec) -> Self { 99 - Self { manager, spec } 103 + let bundle_numbers = match &spec.bundles { 104 + crate::manager::BundleRange::All => { 105 + let last = manager.get_last_bundle(); 106 + (1..=last).collect() 107 + } 108 + crate::manager::BundleRange::Single(n) => vec![*n], 109 + crate::manager::BundleRange::Range(start, end) => (*start..=*end).collect(), 110 + crate::manager::BundleRange::List(list) => list.clone(), 111 + }; 112 + 113 + Self { 114 + manager, 115 + spec, 116 + bundle_numbers, 117 + current_bundle_idx: 0, 118 + current_ops: Vec::new(), 119 + current_op_idx: 0, 120 + count_remaining: None, 121 + } 122 + } 123 + 124 + fn format_operation(&self, op: &Operation) -> Result<String> { 125 + match self.spec.format { 126 + crate::manager::ExportFormat::JsonLines => { 127 + Ok(serde_json::to_string(op)?) 128 + } 129 + crate::manager::ExportFormat::Csv => { 130 + // Simple CSV format - could be enhanced 131 + Ok(format!( 132 + "{},{},{},{}", 133 + op.did, 134 + op.operation, 135 + op.created_at, 136 + op.nullified 137 + )) 138 + } 139 + crate::manager::ExportFormat::Parquet => { 140 + // Parquet would require additional dependencies 141 + // For now, fall back to JSON 142 + Ok(serde_json::to_string(op)?) 
143 + } 144 + } 145 + } 146 + 147 + fn matches_timestamp_filter(&self, op: &Operation) -> bool { 148 + if let Some(ref after) = self.spec.after_timestamp { 149 + // Compare timestamps as strings (ISO 8601 format) 150 + op.created_at >= *after 151 + } else { 152 + true 153 + } 100 154 } 101 155 } 102 156 ··· 104 158 type Item = Result<String>; 105 159 106 160 fn next(&mut self) -> Option<Self::Item> { 107 - // TODO: Implement 108 - None 161 + // Check count limit 162 + if let Some(ref mut remaining) = self.count_remaining { 163 + if *remaining == 0 { 164 + return None; 165 + } 166 + } 167 + 168 + loop { 169 + // Check if we have more operations in current bundle 170 + if self.current_op_idx < self.current_ops.len() { 171 + let op = &self.current_ops[self.current_op_idx]; 172 + self.current_op_idx += 1; 173 + 174 + // Apply timestamp filter 175 + if !self.matches_timestamp_filter(op) { 176 + continue; 177 + } 178 + 179 + // Decrement count if limit is set 180 + if let Some(ref mut remaining) = self.count_remaining { 181 + *remaining -= 1; 182 + } 183 + 184 + return Some(self.format_operation(op)); 185 + } 186 + 187 + // Load next bundle 188 + if self.current_bundle_idx >= self.bundle_numbers.len() { 189 + return None; 190 + } 191 + 192 + let bundle_num = self.bundle_numbers[self.current_bundle_idx]; 193 + self.current_bundle_idx += 1; 194 + 195 + // Initialize count_remaining on first bundle load 196 + if self.count_remaining.is_none() { 197 + self.count_remaining = self.spec.count; 198 + } 199 + 200 + match self.manager.load_bundle( 201 + bundle_num, 202 + LoadOptions { 203 + filter: self.spec.filter.clone(), 204 + ..Default::default() 205 + }, 206 + ) { 207 + Ok(result) => { 208 + self.current_ops = result.operations; 209 + self.current_op_idx = 0; 210 + } 211 + Err(_e) => { 212 + // Skip bundles that fail to load 213 + continue; 214 + } 215 + } 216 + } 109 217 } 110 218 }
+1
src/lib.rs
··· 17 17 pub use query::QueryEngine; 18 18 pub use index::{Index, BundleMetadata}; 19 19 pub use manager::{BundleManager, LoadOptions, LoadResult, QuerySpec, ExportSpec, 20 + BundleRange, ExportFormat, CompressionType, 20 21 VerifySpec, VerifyResult, ChainVerifySpec, ChainVerifyResult, 21 22 BundleInfo, InfoFlags, RollbackSpec, RollbackPlan, RollbackResult, 22 23 WarmUpSpec, WarmUpStrategy, RebuildStats, DIDIndexStats, ManagerStats};
+6
src/manager.rs
··· 334 334 self.index.read().unwrap().last_bundle 335 335 } 336 336 337 + pub fn directory(&self) -> &PathBuf { 338 + &self.directory 339 + } 340 + 337 341 fn clone_for_arc(&self) -> Self { 338 342 Self { 339 343 directory: self.directory.clone(), ··· 459 463 pub format: ExportFormat, 460 464 pub filter: Option<OperationFilter>, 461 465 pub compression: Option<CompressionType>, 466 + pub count: Option<usize>, 467 + pub after_timestamp: Option<String>, 462 468 } 463 469 464 470 #[derive(Debug, Clone)]