A Transparent and Verifiable Way to Sync the AT Protocol's PLC Directory
package bundle

import (
	"context"
	"fmt"
	"io"
	"log"
	"os"
	"path/filepath"
	"runtime"
	"sort"
	"strconv"
	"strings"
	"sync"
	"time"

	"tangled.org/atscan.net/plcbundle/plc"
)

// defaultLogger is a simple logger implementation
type defaultLogger struct{}

func (d defaultLogger) Printf(format string, v ...interface{}) {
	log.Printf(format, v...)
}

func (d defaultLogger) Println(v ...interface{}) {
	log.Println(v...)
}

// Manager handles bundle operations
type Manager struct {
	config     *Config
	operations *Operations
	index      *Index
	indexPath  string
	plcClient  *plc.Client
	logger     Logger
	mempool    *Mempool
}

// NewManager creates a new bundle manager
func NewManager(config *Config, plcClient *plc.Client) (*Manager, error) {
	if config == nil {
		config = DefaultConfig("./plc_bundles")
	}

	if config.Logger == nil {
		config.Logger = defaultLogger{}
	}

	// Ensure directory exists
	if err := os.MkdirAll(config.BundleDir, 0755); err != nil {
		return nil, fmt.Errorf("failed to create bundle directory: %w", err)
	}

	// Initialize operations handler
	ops, err := NewOperations(config.Logger)
	if err != nil {
		return nil, fmt.Errorf("failed to initialize operations: %w", err)
	}

	// Load or create index
	indexPath := filepath.Join(config.BundleDir, INDEX_FILE)
	index, err := LoadIndex(indexPath)

	// Check for bundle files in directory
	bundleFiles, _ := filepath.Glob(filepath.Join(config.BundleDir, "*.jsonl.zst"))
	bundleFiles = filterBundleFiles(bundleFiles)
	hasBundleFiles := len(bundleFiles) > 0

	// Check if clone/download is in progress (look for .tmp files)
	tmpFiles, _ := filepath.Glob(filepath.Join(config.BundleDir, "*.tmp"))
	cloneInProgress := len(tmpFiles) > 0

	needsRebuild := false

	if err != nil {
		// Index doesn't exist or is invalid
		if hasBundleFiles {
			if cloneInProgress {
				config.Logger.Printf("Clone/download in progress, skipping auto-rebuild")
			} else {
				// We have bundles but no index - need to rebuild
				config.Logger.Printf("No valid index found, but detected %d bundle files", len(bundleFiles))
				needsRebuild = true
			}
		} else {
			// No index and no bundles - create fresh index
			config.Logger.Printf("Creating new index at %s", indexPath)
			index = NewIndex()
			if err := index.Save(indexPath); err != nil {
				return nil, fmt.Errorf("failed to save new index: %w", err)
			}
		}
	} else {
		// Index exists - check if it's complete
		config.Logger.Printf("Loaded index with %d bundles", index.Count())

		// Check if there are bundle files not in the index
		if hasBundleFiles && len(bundleFiles) > index.Count() {
			if cloneInProgress {
				config.Logger.Printf("Clone/download in progress (%d .tmp files), skipping auto-rebuild", len(tmpFiles))
			} else {
				config.Logger.Printf("Detected %d bundle files but index only has %d entries - rebuilding",
					len(bundleFiles), index.Count())
				needsRebuild = true
			}
		}
	}

	// Perform rebuild if needed (using parallel scan)
	if needsRebuild && config.AutoRebuild {
		config.Logger.Printf("Rebuilding index from %d bundle files...", len(bundleFiles))

		// Create temporary manager for scanning
		tempMgr := &Manager{
			config:     config,
			operations: ops,
			index:      NewIndex(),
			indexPath:  indexPath,
			logger:     config.Logger,
		}

		// Use parallel scan with auto-detected CPU count
		workers := config.RebuildWorkers
		if workers <= 0 {
			workers = runtime.NumCPU()
			if workers < 1 {
				workers = 1
			}
		}

		config.Logger.Printf("Using %d workers for parallel scan", workers)

		// Create progress callback wrapper with new signature
		var progressCallback func(current, total int, bytesProcessed int64)
		if config.RebuildProgress != nil {
			// Wrap the old-style callback to work with new signature
			oldCallback := config.RebuildProgress
			progressCallback = func(current, total int, bytesProcessed int64) {
				oldCallback(current, total)
			}
		} else {
			// Default: log every 100 bundles
			progressCallback = func(current, total int, bytesProcessed int64) {
				if current%100 == 0 || current == total {
					mbProcessed := float64(bytesProcessed) / (1024 * 1024)
					config.Logger.Printf("Rebuild progress: %d/%d bundles (%.1f%%), %.1f MB processed",
						current, total, float64(current)/float64(total)*100, mbProcessed)
				}
			}
		}

		start := time.Now()

		// Scan directory to rebuild index (parallel)
		result, err := tempMgr.ScanDirectoryParallel(workers, progressCallback)
		if err != nil {
			return nil, fmt.Errorf("failed to rebuild index: %w", err)
		}

		elapsed := time.Since(start)

		// Reload the rebuilt index
		index, err = LoadIndex(indexPath)
		if err != nil {
			return nil, fmt.Errorf("failed to load rebuilt index: %w", err)
		}

		// Calculate throughput
		mbPerSec := float64(result.TotalUncompressed) / elapsed.Seconds() / (1024 * 1024)

		config.Logger.Printf("✓ Index rebuilt with %d bundles in %s",
			index.Count(), elapsed.Round(time.Millisecond))
		config.Logger.Printf(" Speed: %.1f bundles/sec, %.1f MB/s (uncompressed)",
			float64(result.BundleCount)/elapsed.Seconds(), mbPerSec)

		// Verify all chain hashes are present
		bundles := index.GetBundles()
		missingHashes := 0
		for i, meta := range bundles {
			if meta.ContentHash == "" {
				missingHashes++
			}
			if i > 0 && meta.Hash == "" {
				missingHashes++
			}
		}
		if missingHashes > 0 {
			config.Logger.Printf("⚠️ Warning: %d bundles have missing hashes", missingHashes)
		}
	}

	if index == nil {
		index = NewIndex()
	}

	// Initialize mempool for next bundle
	lastBundle := index.GetLastBundle()
	nextBundleNum := 1
	var minTimestamp time.Time

	if lastBundle != nil {
		nextBundleNum = lastBundle.BundleNumber + 1
		minTimestamp = lastBundle.EndTime
	}

	mempool, err := NewMempool(config.BundleDir, nextBundleNum, minTimestamp, config.Logger)
	if err != nil {
		return nil, fmt.Errorf("failed to initialize mempool: %w", err)
	}

	return &Manager{
		config:     config,
		operations: ops,
		index:      index,
		indexPath:  indexPath,
		plcClient:  plcClient,
		logger:     config.Logger,
		mempool:    mempool,
	}, nil
}

// Close cleans up resources
func (m *Manager) Close() {
	if m.operations != nil {
		m.operations.Close()
	}
	if m.plcClient != nil {
		m.plcClient.Close()
	}
	if m.mempool != nil {
		if err := m.mempool.Save(); err != nil {
			m.logger.Printf("Warning: failed to save mempool: %v", err)
		}
	}
}

// GetIndex returns the current index
func (m *Manager) GetIndex() *Index {
	return m.index
}

// SaveIndex saves the index to disk
func (m *Manager) SaveIndex() error {
	return m.index.Save(m.indexPath)
}

// LoadBundle loads a bundle from disk
func (m *Manager) LoadBundle(ctx context.Context, bundleNumber int) (*Bundle, error) {
	// Get metadata from index
	meta, err := m.index.GetBundle(bundleNumber)
	if err != nil {
		return nil, fmt.Errorf("bundle not in index: %w", err)
	}

	// Load file
	path := filepath.Join(m.config.BundleDir, fmt.Sprintf("%06d.jsonl.zst", bundleNumber))
	if !m.operations.FileExists(path) {
		return nil, fmt.Errorf("bundle file not found: %s", path)
	}

	// Verify hash if enabled
	if m.config.VerifyOnLoad {
		valid, actualHash, err := m.operations.VerifyHash(path, meta.CompressedHash)
		if err != nil {
			return nil, fmt.Errorf("failed to verify hash: %w", err)
		}
		if !valid {
			return nil, fmt.Errorf("hash mismatch: expected %s, got %s", meta.CompressedHash, actualHash)
		}
	}

	// Load operations
	operations, err := m.operations.LoadBundle(path)
	if err != nil {
		return nil, fmt.Errorf("failed to load bundle: %w", err)
	}

	// Create bundle struct
	bundle := &Bundle{
		BundleNumber:     meta.BundleNumber,
		StartTime:        meta.StartTime,
		EndTime:          meta.EndTime,
		Operations:       operations,
		DIDCount:         meta.DIDCount,
		Hash:             meta.Hash,        // Chain hash (primary)
		ContentHash:      meta.ContentHash, // Content hash
		Parent:           meta.Parent,      // Parent chain hash
		CompressedHash:   meta.CompressedHash,
		CompressedSize:   meta.CompressedSize,
		UncompressedSize: meta.UncompressedSize,
		Cursor:           meta.Cursor,
		Compressed:       true,
		CreatedAt:        meta.CreatedAt,
	}

	return bundle, nil
}

// SaveBundle saves a bundle to disk and updates the index
func (m *Manager) SaveBundle(ctx context.Context, bundle *Bundle) error {
	if err := bundle.ValidateForSave(); err != nil {
		return fmt.Errorf("bundle validation failed: %w", err)
	}

	path := filepath.Join(m.config.BundleDir, fmt.Sprintf("%06d.jsonl.zst", bundle.BundleNumber))

	// Save to disk
	uncompressedHash, compressedHash, uncompressedSize, compressedSize, err := m.operations.SaveBundle(path, bundle.Operations)
	if err != nil {
		return fmt.Errorf("failed to save bundle: %w", err)
	}

	// Set content hash and compressed metadata
	bundle.ContentHash = uncompressedHash
	bundle.CompressedHash = compressedHash
	bundle.UncompressedSize = uncompressedSize
	bundle.CompressedSize = compressedSize
	bundle.CreatedAt = time.Now().UTC()

	// Get parent (previous chain hash)
	var parent string
	if bundle.BundleNumber > 1 {
		prevBundle := m.index.GetLastBundle()
		if prevBundle != nil {
			parent = prevBundle.Hash // Get previous chain hash

			m.logger.Printf("Previous bundle %06d: hash=%s, content=%s",
				prevBundle.BundleNumber,
				formatHashPreview(prevBundle.Hash),
				formatHashPreview(prevBundle.ContentHash))
		} else {
			// Try to get specific previous bundle
			if prevMeta, err := m.index.GetBundle(bundle.BundleNumber - 1); err == nil {
				parent = prevMeta.Hash

				m.logger.Printf("Found previous bundle %06d: hash=%s, content=%s",
					prevMeta.BundleNumber,
					formatHashPreview(prevMeta.Hash),
					formatHashPreview(prevMeta.ContentHash))
			}
		}
	}

	bundle.Parent = parent

	// Calculate chain hash (now stored as primary Hash)
	bundle.Hash = m.operations.CalculateChainHash(parent, bundle.ContentHash)

	m.logger.Printf("Bundle %06d: hash=%s, content=%s, parent=%s",
		bundle.BundleNumber,
		formatHashPreview(bundle.Hash),
		formatHashPreview(bundle.ContentHash),
		formatHashPreview(bundle.Parent))

	// Add to index
	m.index.AddBundle(bundle.ToMetadata())

	// Save index
	if err := m.SaveIndex(); err != nil {
		return fmt.Errorf("failed to save index: %w", err)
	}

	m.logger.Printf("Saved bundle %06d (hash: %s)",
		bundle.BundleNumber,
		formatHashPreview(bundle.Hash))

	// IMPORTANT: Clean up old mempool and create new one for next bundle
	oldMempoolFile := m.mempool.GetFilename()
	if err := m.mempool.Delete(); err != nil {
		m.logger.Printf("Warning: failed to delete old mempool %s: %v", oldMempoolFile, err)
	} else {
		m.logger.Printf("Deleted mempool: %s", oldMempoolFile)
	}

	// Create new mempool for next bundle
	nextBundle := bundle.BundleNumber + 1
	minTimestamp := bundle.EndTime

	newMempool, err := NewMempool(m.config.BundleDir, nextBundle, minTimestamp, m.logger)
	if err != nil {
		return fmt.Errorf("failed to create new mempool: %w", err)
	}

	m.mempool = newMempool
	m.logger.Printf("Created new mempool for bundle %06d (min timestamp: %s)",
		nextBundle, minTimestamp.Format(time.RFC3339Nano))

	return nil
}

// formatHashPreview safely formats a hash for display
func formatHashPreview(hash string) string {
	if len(hash) == 0 {
		return "(empty)"
	}
	if len(hash) < 16 {
		return hash
	}
	return hash[:16] + "..."
}

// FetchNextBundle fetches the next bundle from PLC directory
func (m *Manager) FetchNextBundle(ctx context.Context) (*Bundle, error) {
	if m.plcClient == nil {
		return nil, fmt.Errorf("PLC client not configured")
	}

	// Determine next bundle number
	lastBundle := m.index.GetLastBundle()
	nextBundleNum := 1
	var afterTime string
	var prevBoundaryCIDs map[string]bool
	var prevBundleHash string

	if lastBundle != nil {
		nextBundleNum = lastBundle.BundleNumber + 1
		afterTime = lastBundle.EndTime.Format(time.RFC3339Nano)
		prevBundleHash = lastBundle.Hash

		// Try to load boundary CIDs from previous bundle
		prevBundle, err := m.LoadBundle(ctx, lastBundle.BundleNumber)
		if err == nil {
			_, prevBoundaryCIDs = m.operations.GetBoundaryCIDs(prevBundle.Operations)
		}
	}

	m.logger.Printf("Preparing bundle %06d (mempool: %d ops)...", nextBundleNum, m.mempool.Count())

	// Keep fetching until we have enough operations
	for m.mempool.Count() < BUNDLE_SIZE {
		m.logger.Printf("Fetching more operations (have %d/%d)...", m.mempool.Count(), BUNDLE_SIZE)

		err := m.fetchToMempool(ctx, afterTime, prevBoundaryCIDs, BUNDLE_SIZE-m.mempool.Count())
		if err != nil {
			// If we can't fetch more, check if we have enough
			if m.mempool.Count() >= BUNDLE_SIZE {
				break // We have enough now
			}
			// Save current state and return error
			m.mempool.Save()
			return nil, fmt.Errorf("insufficient operations: have %d, need %d", m.mempool.Count(), BUNDLE_SIZE)
		}

		// Check if we made progress
		if m.mempool.Count() < BUNDLE_SIZE {
			// Didn't get enough, but got some - save and return error
			m.mempool.Save()
			return nil, fmt.Errorf("insufficient operations: have %d, need %d (no more available)", m.mempool.Count(), BUNDLE_SIZE)
		}
	}

	// Create bundle from mempool
	m.logger.Printf("Creating bundle %06d from mempool", nextBundleNum)
	operations, err := m.mempool.Take(BUNDLE_SIZE) // ✅ Fixed - handle both return values
	if err != nil {
		m.mempool.Save()
		return nil, fmt.Errorf("failed to take operations from mempool: %w", err)
	}

	bundle := m.operations.CreateBundle(nextBundleNum, operations, afterTime, prevBundleHash)

	// Save mempool state
	if err := m.mempool.Save(); err != nil {
		m.logger.Printf("Warning: failed to save mempool: %v", err)
	}

	m.logger.Printf("✓ Bundle %06d ready (%d ops, mempool: %d remaining)",
		nextBundleNum, len(operations), m.mempool.Count())

	return bundle, nil
}

// fetchToMempool fetches operations and adds them to mempool (returns error if no progress)
func (m *Manager) fetchToMempool(ctx context.Context, afterTime string, prevBoundaryCIDs map[string]bool, target int) error {
	seenCIDs := make(map[string]bool)

	// Mark previous boundary CIDs as seen
	for cid := range prevBoundaryCIDs {
		seenCIDs[cid] = true
	}

	// Use last mempool time if available
	if m.mempool.Count() > 0 {
		afterTime = m.mempool.GetLastTime()
		m.logger.Printf(" Continuing from mempool cursor: %s", afterTime)
	}

	currentAfter := afterTime
	maxFetches := 20
	totalAdded := 0
	startingCount := m.mempool.Count()

	for fetchNum := 0; fetchNum < maxFetches; fetchNum++ {
		// Calculate batch size
		remaining := target - (m.mempool.Count() - startingCount)
		if remaining <= 0 {
			break
		}

		batchSize := 1000
		if remaining < 500 {
			batchSize = 200
		}

		m.logger.Printf(" Fetch #%d: requesting %d operations (mempool: %d)",
			fetchNum+1, batchSize, m.mempool.Count())

		batch, err := m.plcClient.Export(ctx, plc.ExportOptions{
			Count: batchSize,
			After: currentAfter,
		})
		if err != nil {
			m.mempool.Save()
			return fmt.Errorf("export failed: %w", err)
		}

		if len(batch) == 0 {
			m.logger.Printf(" No more operations available from PLC")
			m.mempool.Save()
			if totalAdded > 0 {
				return nil
			}
			return fmt.Errorf("no operations available")
		}

		// Deduplicate
		uniqueOps := make([]plc.PLCOperation, 0)
		for _, op := range batch {
			if !seenCIDs[op.CID] {
				seenCIDs[op.CID] = true
				uniqueOps = append(uniqueOps, op)
			}
		}

		if len(uniqueOps) > 0 {
			// CRITICAL: Add with validation
			added, err := m.mempool.Add(uniqueOps)
			if err != nil {
				// Validation error - save current state and return
				m.mempool.Save()
				return fmt.Errorf("chronological validation failed: %w", err)
			}

			totalAdded += added
			m.logger.Printf(" Added %d new operations (mempool now: %d)", added, m.mempool.Count())

			// Save after each successful addition
			if err := m.mempool.Save(); err != nil {
				m.logger.Printf(" Warning: failed to save mempool: %v", err)
			}
		}

		// Update cursor
		if len(batch) > 0 {
			currentAfter = batch[len(batch)-1].CreatedAt.Format(time.RFC3339Nano)
		}

		// Stop if we got less than requested (caught up)
		if len(batch) < batchSize {
			m.logger.Printf(" Received incomplete batch (%d/%d), caught up to latest", len(batch), batchSize)
			break
		}
	}

	// Final save
	m.mempool.Save()

	if totalAdded > 0 {
		m.logger.Printf("✓ Fetch complete: added %d operations (mempool: %d)", totalAdded, m.mempool.Count())
		return nil
	}

	return fmt.Errorf("no new operations added")
}

// GetMempoolStats returns mempool statistics
func (m *Manager) GetMempoolStats() map[string]interface{} {
	return m.mempool.Stats()
}

// GetMempoolOperations returns all operations currently in mempool
func (m *Manager) GetMempoolOperations() ([]plc.PLCOperation, error) {
	if m.mempool == nil {
		return nil, fmt.Errorf("mempool not initialized")
	}

	// Use Peek to get operations without removing them
	count := m.mempool.Count()
	if count == 0 {
		return []plc.PLCOperation{}, nil
	}

	return m.mempool.Peek(count), nil
}

// VerifyBundle verifies a bundle's integrity
func (m *Manager) VerifyBundle(ctx context.Context, bundleNumber int) (*VerificationResult, error) {
	result := &VerificationResult{
		BundleNumber: bundleNumber,
	}

	// Get from index
	meta, err := m.index.GetBundle(bundleNumber)
	if err != nil {
		result.Error = err
		return result, nil
	}

	result.ExpectedHash = meta.CompressedHash

	// Check file exists
	path := filepath.Join(m.config.BundleDir, fmt.Sprintf("%06d.jsonl.zst", bundleNumber))
	result.FileExists = m.operations.FileExists(path)
	if !result.FileExists {
		result.Error = fmt.Errorf("file not found")
		return result, nil
	}

	// Verify hash
	valid, actualHash, err := m.operations.VerifyHash(path, meta.CompressedHash)
	if err != nil {
		result.Error = err
		return result, nil
	}

	result.LocalHash = actualHash
	result.HashMatch = valid
	result.Valid = valid

	return result, nil
}

// VerifyChain verifies the entire bundle chain
func (m *Manager) VerifyChain(ctx context.Context) (*ChainVerificationResult, error) {
	result := &ChainVerificationResult{
		VerifiedBundles: make([]int, 0),
	}

	bundles := m.index.GetBundles()
	if len(bundles) == 0 {
		result.Valid = true
		return result, nil
	}

	result.ChainLength = len(bundles)

	for i, meta := range bundles {
		// Verify file hash
		vr, err := m.VerifyBundle(ctx, meta.BundleNumber)
		if err != nil || !vr.Valid {
			result.Error = fmt.Sprintf("Bundle %d hash verification failed", meta.BundleNumber)
			result.BrokenAt = meta.BundleNumber
			return result, nil
		}

		// Verify chain link
		if i > 0 {
			prevMeta := bundles[i-1]

			// Check parent reference
			if meta.Parent != prevMeta.Hash {
				result.Error = fmt.Sprintf("Chain broken at bundle %d: parent mismatch", meta.BundleNumber)
				result.BrokenAt = meta.BundleNumber
				return result, nil
			}

			// Verify chain hash calculation
			expectedHash := m.operations.CalculateChainHash(prevMeta.Hash, meta.ContentHash)
			if meta.Hash != expectedHash {
				result.Error = fmt.Sprintf("Chain broken at bundle %d: hash mismatch", meta.BundleNumber)
				result.BrokenAt = meta.BundleNumber
				return result, nil
			}
		}

		result.VerifiedBundles = append(result.VerifiedBundles, meta.BundleNumber)
	}

	result.Valid = true
	return result, nil
}

// ScanDirectory scans the bundle directory and rebuilds the index
func (m *Manager) ScanDirectory() (*DirectoryScanResult, error) {
	result := &DirectoryScanResult{
		BundleDir: m.config.BundleDir,
	}

	m.logger.Printf("Scanning directory: %s", m.config.BundleDir)

	// Find all bundle files
	files, err := filepath.Glob(filepath.Join(m.config.BundleDir, "*.jsonl.zst"))
	if err != nil {
		return nil, fmt.Errorf("failed to scan directory: %w", err)
	}
	files = filterBundleFiles(files)

	if len(files) == 0 {
		m.logger.Printf("No bundle files found")
		return result, nil
	}

	// Parse bundle numbers
	var bundleNumbers []int
	for _, file := range files {
		base := filepath.Base(file)
		numStr := strings.TrimSuffix(base, ".jsonl.zst")
		num, err := strconv.Atoi(numStr)
		if err != nil {
			m.logger.Printf("Warning: skipping invalid filename: %s", base)
			continue
		}
		bundleNumbers = append(bundleNumbers, num)
	}

	sort.Ints(bundleNumbers)

	result.BundleCount = len(bundleNumbers)
	if len(bundleNumbers) > 0 {
		result.FirstBundle = bundleNumbers[0]
		result.LastBundle = bundleNumbers[len(bundleNumbers)-1]
	}

	// Find gaps
	if len(bundleNumbers) > 1 {
		for i := result.FirstBundle; i <= result.LastBundle; i++ {
			found := false
			for _, num := range bundleNumbers {
				if num == i {
					found = true
					break
				}
			}
			if !found {
				result.MissingGaps = append(result.MissingGaps, i)
			}
		}
	}

	m.logger.Printf("Found %d bundles (gaps: %d)", result.BundleCount, len(result.MissingGaps))

	// Load each bundle and rebuild index
	var newMetadata []*BundleMetadata
	var totalSize int64

	for _, num := range bundleNumbers {
		path := filepath.Join(m.config.BundleDir, fmt.Sprintf("%06d.jsonl.zst", num))

		// Load bundle
		ops, err := m.operations.LoadBundle(path)
		if err != nil {
			m.logger.Printf("Warning: failed to load bundle %d: %v", num, err)
			continue
		}

		// Get file size
		size, _ := m.operations.GetFileSize(path)
		totalSize += size

		// Calculate parent and cursor from previous bundle
		var parent string
		var cursor string
		if num > 1 && len(newMetadata) > 0 {
			prevMeta := newMetadata[len(newMetadata)-1]
			parent = prevMeta.Hash
			cursor = prevMeta.EndTime.Format(time.RFC3339Nano)
		}

		// Use the ONE method for metadata calculation
		meta, err := m.operations.CalculateBundleMetadata(num, path, ops, parent, cursor)
		if err != nil {
			m.logger.Printf("Warning: failed to calculate metadata for bundle %d: %v", num, err)
			continue
		}

		newMetadata = append(newMetadata, meta)

		m.logger.Printf(" Scanned bundle %06d: %d ops, %d DIDs", num, len(ops), meta.DIDCount)
	}

	result.TotalSize = totalSize

	// Rebuild index
	m.index.Rebuild(newMetadata)

	// Save index
	if err := m.SaveIndex(); err != nil {
		return nil, fmt.Errorf("failed to save index: %w", err)
	}

	result.IndexUpdated = true

	m.logger.Printf("Index rebuilt with %d bundles", len(newMetadata))

	return result, nil
}

// ScanDirectoryParallel scans the bundle directory in parallel and rebuilds the index
func (m *Manager) ScanDirectoryParallel(workers int, progressCallback func(current, total int, bytesProcessed int64)) (*DirectoryScanResult, error) {
	result := &DirectoryScanResult{
		BundleDir: m.config.BundleDir,
	}

	m.logger.Printf("Scanning directory (parallel, %d workers): %s", workers, m.config.BundleDir)

	// Find all bundle files
	files, err := filepath.Glob(filepath.Join(m.config.BundleDir, "*.jsonl.zst"))
	if err != nil {
		return nil, fmt.Errorf("failed to scan directory: %w", err)
	}
	files = filterBundleFiles(files)

	if len(files) == 0 {
		m.logger.Printf("No bundle files found")
		return result, nil
	}

	// Parse bundle numbers
	var bundleNumbers []int
	for _, file := range files {
		base := filepath.Base(file)
		numStr := strings.TrimSuffix(base, ".jsonl.zst")
		num, err := strconv.Atoi(numStr)
		if err != nil {
			m.logger.Printf("Warning: skipping invalid filename: %s", base)
			continue
		}
		bundleNumbers = append(bundleNumbers, num)
	}

	sort.Ints(bundleNumbers)

	result.BundleCount = len(bundleNumbers)
	if len(bundleNumbers) > 0 {
		result.FirstBundle = bundleNumbers[0]
		result.LastBundle = bundleNumbers[len(bundleNumbers)-1]
	}

	// Find gaps
	if len(bundleNumbers) > 1 {
		for i := result.FirstBundle; i <= result.LastBundle; i++ {
			found := false
			for _, num := range bundleNumbers {
				if num == i {
					found = true
					break
				}
			}
			if !found {
				result.MissingGaps = append(result.MissingGaps, i)
			}
		}
	}

	m.logger.Printf("Found %d bundles (gaps: %d)", result.BundleCount, len(result.MissingGaps))

	// Process bundles in parallel
	type bundleResult struct {
		index int
		meta  *BundleMetadata
		err   error
	}

	jobs := make(chan int, len(bundleNumbers))
	results := make(chan bundleResult, len(bundleNumbers))

	// Start workers
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for num := range jobs {
				path := filepath.Join(m.config.BundleDir, fmt.Sprintf("%06d.jsonl.zst", num))

				// Load and process bundle
				ops, err := m.operations.LoadBundle(path)
				if err != nil {
					results <- bundleResult{index: num, err: err}
					continue
				}

				// Use the FAST method (cursor will be set later in sequential phase)
				meta, err := m.operations.CalculateBundleMetadataFast(num, path, ops, "")
				if err != nil {
					results <- bundleResult{index: num, err: err}
					continue
				}

				results <- bundleResult{index: num, meta: meta}
			}
		}()
	}

	// Send jobs
	for _, num := range bundleNumbers {
		jobs <- num
	}
	close(jobs)

	// Wait for all workers to finish
	go func() {
		wg.Wait()
		close(results)
	}()

	// Collect results (in a map first, then sort)
	metadataMap := make(map[int]*BundleMetadata)
	var totalSize int64
	var totalUncompressed int64
	processed := 0

	for result := range results {
		processed++

		// Update progress WITH bytes
		if progressCallback != nil {
			if result.meta != nil {
				totalUncompressed += result.meta.UncompressedSize
			}
			progressCallback(processed, len(bundleNumbers), totalUncompressed)
		}

		if result.err != nil {
			m.logger.Printf("Warning: failed to process bundle %d: %v", result.index, result.err)
			continue
		}
		metadataMap[result.index] = result.meta
		totalSize += result.meta.CompressedSize
	}

	// Build ordered metadata slice and calculate chain hashes
	var newMetadata []*BundleMetadata
	var parent string // Parent chain hash

	for i, num := range bundleNumbers {
		meta, ok := metadataMap[num]
		if !ok {
			continue // Skip failed bundles
		}

		// Set cursor from previous bundle's EndTime
		if i > 0 && len(newMetadata) > 0 {
			prevMeta := newMetadata[len(newMetadata)-1]
			meta.Cursor = prevMeta.EndTime.Format(time.RFC3339Nano)
		}

		// Now calculate chain hash (must be done sequentially)
		meta.Hash = m.operations.CalculateChainHash(parent, meta.ContentHash)
		meta.Parent = parent

		newMetadata = append(newMetadata, meta)
		parent = meta.Hash // Store for next iteration
	}

	result.TotalSize = totalSize
	result.TotalUncompressed = totalUncompressed

	// Rebuild index
	m.index.Rebuild(newMetadata)

	// Save index
	if err := m.SaveIndex(); err != nil {
		return nil, fmt.Errorf("failed to save index: %w", err)
	}

	result.IndexUpdated = true

	m.logger.Printf("Index rebuilt with %d bundles", len(newMetadata))

	return result, nil
}

// GetInfo returns information about the bundle manager
func (m *Manager) GetInfo() map[string]interface{} {
	stats := m.index.GetStats()
	stats["bundle_dir"] = m.config.BundleDir
	stats["index_path"] = m.indexPath
	stats["verify_on_load"] = m.config.VerifyOnLoad
	return stats
}

// ExportOperations exports operations from bundles
func (m *Manager) ExportOperations(ctx context.Context, afterTime time.Time, count int) ([]plc.PLCOperation, error) {
	if count <= 0 {
		count = 1000
	}

	var result []plc.PLCOperation
	seenCIDs := make(map[string]bool)

	bundles := m.index.GetBundles()

	for _, meta := range bundles {
		if result != nil && len(result) >= count {
			break
		}

		// Skip bundles before afterTime
		if !afterTime.IsZero() && meta.EndTime.Before(afterTime) {
			continue
		}

		// Load bundle
		bundle, err := m.LoadBundle(ctx, meta.BundleNumber)
		if err != nil {
			m.logger.Printf("Warning: failed to load bundle %d: %v", meta.BundleNumber, err)
			continue
		}

		// Add operations
		for _, op := range bundle.Operations {
			if !afterTime.IsZero() && op.CreatedAt.Before(afterTime) {
				continue
			}

			if seenCIDs[op.CID] {
				continue
			}

			seenCIDs[op.CID] = true
			result = append(result, op)

			if len(result) >= count {
				break
			}
		}
	}

	return result, nil
}

// ScanBundle scans a single bundle file and returns its metadata
func (m *Manager) ScanBundle(path string, bundleNumber int) (*BundleMetadata, error) {
	// Load bundle file
	operations, err := m.operations.LoadBundle(path)
	if err != nil {
		return nil, fmt.Errorf("failed to load bundle: %w", err)
	}

	if len(operations) == 0 {
		return nil, fmt.Errorf("bundle is empty")
	}

	// Get parent chain hash and cursor from previous bundle
	var parent string
	var cursor string
	if bundleNumber > 1 {
		if prevMeta, err := m.index.GetBundle(bundleNumber - 1); err == nil {
			parent = prevMeta.Hash
			cursor = prevMeta.EndTime.Format(time.RFC3339Nano)
		}
	}

	// Use the ONE method
	return m.operations.CalculateBundleMetadata(bundleNumber, path, operations, parent, cursor)
}

// ScanAndIndexBundle scans a bundle file and adds it to the index
func (m *Manager) ScanAndIndexBundle(path string, bundleNumber int) (*BundleMetadata, error) {
	meta, err := m.ScanBundle(path, bundleNumber)
	if err != nil {
		return nil, err
	}

	// Add to index
	m.index.AddBundle(meta)

	// Save index
	if err := m.SaveIndex(); err != nil {
		return nil, fmt.Errorf("failed to save index: %w", err)
	}

	return meta, nil
}

// IsBundleIndexed checks if a bundle is already in the index
func (m *Manager) IsBundleIndexed(bundleNumber int) bool {
	_, err := m.index.GetBundle(bundleNumber)
	return err == nil
}

// RefreshMempool reloads mempool from disk (useful for debugging)
func (m *Manager) RefreshMempool() error {
	if m.mempool == nil {
		return fmt.Errorf("mempool not initialized")
	}
	return m.mempool.Load()
}

// ClearMempool clears all operations from the mempool and saves
func (m *Manager) ClearMempool() error {
	if m.mempool == nil {
		return fmt.Errorf("mempool not initialized")
	}

	m.logger.Printf("Clearing mempool...")

	// Get count before clearing
	count := m.mempool.Count()

	// Clear the mempool
	m.mempool.Clear()

	// Save the empty state (this will delete the file since it's empty)
	if err := m.mempool.Save(); err != nil {
		return fmt.Errorf("failed to save mempool: %w", err)
	}

	m.logger.Printf("Cleared %d operations from mempool", count)

	return nil
}

// Add validation method
func (m *Manager) ValidateMempool() error {
	if m.mempool == nil {
		return fmt.Errorf("mempool not initialized")
	}
	return m.mempool.Validate()
}

// StreamBundleRaw streams the raw compressed bundle file
func (m *Manager) StreamBundleRaw(ctx context.Context, bundleNumber int) (io.ReadCloser, error) {
	// Get metadata from index
	meta, err := m.index.GetBundle(bundleNumber)
	if err != nil {
		return nil, fmt.Errorf("bundle not in index: %w", err)
	}

	// Build file path
	path := filepath.Join(m.config.BundleDir, fmt.Sprintf("%06d.jsonl.zst", bundleNumber))
	if !m.operations.FileExists(path) {
		return nil, fmt.Errorf("bundle file not found: %s", path)
	}

	// Optionally verify hash before streaming
	if m.config.VerifyOnLoad {
		valid, actualHash, err := m.operations.VerifyHash(path, meta.CompressedHash)
		if err != nil {
			return nil, fmt.Errorf("failed to verify hash: %w", err)
		}
		if !valid {
			return nil, fmt.Errorf("hash mismatch: expected %s, got %s", meta.CompressedHash, actualHash)
		}
	}

	return m.operations.StreamRaw(path)
}

// StreamBundleDecompressed streams the decompressed bundle data as JSONL
func (m *Manager) StreamBundleDecompressed(ctx context.Context, bundleNumber int) (io.ReadCloser, error) {
	// Get metadata from index
	_, err := m.index.GetBundle(bundleNumber)
	if err != nil {
		return nil, fmt.Errorf("bundle not in index: %w", err)
	}

	// Build file path
	path := filepath.Join(m.config.BundleDir, fmt.Sprintf("%06d.jsonl.zst", bundleNumber))
	if !m.operations.FileExists(path) {
		return nil, fmt.Errorf("bundle file not found: %s", path)
	}

	return m.operations.StreamDecompressed(path)
}

// RefreshIndex reloads the index from disk if it has been modified
func (m *Manager) RefreshIndex() error {
	// Check if index file has been modified
	info, err := os.Stat(m.indexPath)
	if err != nil {
		return err
	}

	// If index was modified after we loaded it, reload
	if info.ModTime().After(m.index.UpdatedAt) {
		m.logger.Printf("Index file modified, reloading...")

		newIndex, err := LoadIndex(m.indexPath)
		if err != nil {
			return fmt.Errorf("failed to reload index: %w", err)
		}

		m.index = newIndex
		m.logger.Printf("Index reloaded: %d bundles", m.index.Count())
	}

	return nil
}

// filterBundleFiles filters out files starting with . or _ (system/temp files)
func filterBundleFiles(files []string) []string {
	filtered := make([]string, 0, len(files))
	for _, file := range files {
		basename := filepath.Base(file)
		// Skip files starting with . or _
		if len(basename) > 0 && (basename[0] == '.' || basename[0] == '_') {
			continue
		}
		filtered = append(filtered, file)
	}
	return filtered
}

// ValidateBundle validates all operations in a bundle using go-didplc
func (m *Manager) ValidateBundle(ctx context.Context, bundleNumber int) error {
	bundle, err := m.LoadBundle(ctx, bundleNumber)
	if err != nil {
		return err
	}

	validator := NewValidator(m.logger)
	return validator.ValidateBundleOperations(bundle.Operations)
}

// ValidateAllBundles validates all bundles in the repository
func (m *Manager) ValidateAllBundles(ctx context.Context, progressFunc func(current, total int)) error {
	index := m.GetIndex()
	bundles := index.GetBundles()

	m.logger.Printf("Validating %d bundles...", len(bundles))

	validator := NewValidator(m.logger)
	errors := 0

	for i, meta := range bundles {
		if progressFunc != nil {
			progressFunc(i+1, len(bundles))
		}

		bundle, err := m.LoadBundle(ctx, meta.BundleNumber)
		if err != nil {
			m.logger.Printf("Failed to load bundle %d: %v", meta.BundleNumber, err)
			errors++
			continue
		}

		if err := validator.ValidateBundleOperations(bundle.Operations); err != nil {
			m.logger.Printf("Bundle %d validation failed: %v", meta.BundleNumber, err)
			errors++
		}
	}

	if errors > 0 {
		return fmt.Errorf("%d bundles failed validation", errors)
	}

	return nil
}

// ValidateBundleWithDetails validates a bundle and returns invalid operations
func (m *Manager) ValidateBundleWithDetails(ctx context.Context, bundleNumber int) ([]InvalidOperation, error) {
	bundle, err := m.LoadBundle(ctx, bundleNumber)
	if err != nil {
		return nil, err
	}

	validator := NewValidator(m.logger)
	return validator.ValidateBundleOperationsWithDetails(bundle.Operations)
}

// ValidateAllBundlesWithDetails validates all bundles and returns all invalid operations
func (m *Manager) ValidateAllBundlesWithDetails(ctx context.Context, progressFunc func(current, total int)) ([]InvalidOperation, error) {
	index := m.GetIndex()
	bundles := index.GetBundles()

	validator := NewValidator(m.logger)
	var allInvalid []InvalidOperation
	errors := 0

	for i, meta := range bundles {
		if progressFunc != nil {
			progressFunc(i+1, len(bundles))
		}

		bundle, err := m.LoadBundle(ctx, meta.BundleNumber)
		if err != nil {
			m.logger.Printf("Failed to load bundle %d: %v", meta.BundleNumber, err)
			errors++
			continue
		}

		invalid, err := validator.ValidateBundleOperationsWithDetails(bundle.Operations)
		if err != nil {
			m.logger.Printf("Bundle %d validation failed: %v", meta.BundleNumber, err)
			errors++
		}

		allInvalid = append(allInvalid, invalid...)
	}

	if errors > 0 {
		return allInvalid, fmt.Errorf("%d bundles failed validation", errors)
	}

	return allInvalid, nil
}

// ValidateBundleStreaming validates a bundle and streams invalid operations
func (m *Manager) ValidateBundleStreaming(ctx context.Context, bundleNumber int, callback InvalidCallback) error {
	bundle, err := m.LoadBundle(ctx, bundleNumber)
	if err != nil {
		return err
	}

	validator := NewValidator(m.logger)
	return validator.ValidateBundleOperationsStreaming(bundle.Operations, callback)
}

// ValidateAllBundlesStreaming validates all bundles and streams invalid operations
func (m *Manager) ValidateAllBundlesStreaming(ctx context.Context, callback InvalidCallback, progressFunc func(current, total int)) error {
	index := m.GetIndex()
	bundles := index.GetBundles()

	validator := NewValidator(m.logger)
	errors := 0

	for i, meta := range bundles {
		if progressFunc != nil {
			progressFunc(i+1, len(bundles))
		}

		bundle, err := m.LoadBundle(ctx, meta.BundleNumber)
		if err != nil {
			m.logger.Printf("Failed to load bundle %d: %v", meta.BundleNumber, err)
			errors++
			continue
		}

		if err := validator.ValidateBundleOperationsStreaming(bundle.Operations, callback); err != nil {
			// Errors already streamed via callback, just count them
			errors++
		}
	}

	if errors > 0 {
		return fmt.Errorf("%d bundles had validation errors", errors)
	}

	return nil
}
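
A minimal usage sketch of the Manager API defined above: fetch the next bundle from the PLC directory, persist it, and verify the hash chain. It assumes the rest of the plcbundle package (DefaultConfig, NewManager, etc.) as shown in this file; the import path for the bundle package and the plc.NewClient constructor are assumptions for illustration and may differ from the actual package.

// Illustrative sketch only, not part of the package.
package main

import (
	"context"
	"log"

	"tangled.org/atscan.net/plcbundle/bundle" // assumed import path for this package
	"tangled.org/atscan.net/plcbundle/plc"
)

func main() {
	ctx := context.Background()

	// plc.NewClient is assumed here; see the plc package for the real constructor.
	client := plc.NewClient("https://plc.directory")

	mgr, err := bundle.NewManager(bundle.DefaultConfig("./plc_bundles"), client)
	if err != nil {
		log.Fatalf("init: %v", err)
	}
	defer mgr.Close()

	// Fetch a full bundle worth of operations and persist it (updates index and mempool).
	b, err := mgr.FetchNextBundle(ctx)
	if err != nil {
		log.Fatalf("fetch: %v", err) // e.g. not enough operations available yet
	}
	if err := mgr.SaveBundle(ctx, b); err != nil {
		log.Fatalf("save: %v", err)
	}

	// Verify file hashes and the parent/chain-hash links across all bundles.
	res, err := mgr.VerifyChain(ctx)
	if err != nil {
		log.Fatalf("verify: %v", err)
	}
	if !res.Valid {
		log.Fatalf("chain broken at bundle %d: %s", res.BrokenAt, res.Error)
	}
	log.Printf("chain OK: %d bundles verified", len(res.VerifiedBundles))
}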