A Transparent and Verifiable Way to Sync the AT Protocol's PLC Directory
package bundle

import (
	"context"
	"fmt"
	"io"
	"log"
	"os"
	"path/filepath"
	"runtime"
	"sort"
	"strconv"
	"strings"
	"sync"
	"time"

	"tangled.org/atscan.net/plcbundle/plc"
)

// defaultLogger is a simple logger implementation
type defaultLogger struct{}

func (d defaultLogger) Printf(format string, v ...interface{}) {
	log.Printf(format, v...)
}

func (d defaultLogger) Println(v ...interface{}) {
	log.Println(v...)
}

// Manager handles bundle operations
type Manager struct {
	config     *Config
	operations *Operations
	index      *Index
	indexPath  string
	plcClient  *plc.Client
	logger     Logger
	mempool    *Mempool
	didIndex   *DIDIndexManager
}

// NewManager creates a new bundle manager
func NewManager(config *Config, plcClient *plc.Client) (*Manager, error) {
	if config == nil {
		config = DefaultConfig("./plc_bundles")
	}

	if config.Logger == nil {
		config.Logger = defaultLogger{}
	}

	// Ensure directory exists
	if err := os.MkdirAll(config.BundleDir, 0755); err != nil {
		return nil, fmt.Errorf("failed to create bundle directory: %w", err)
	}

	// Initialize operations handler
	ops, err := NewOperations(config.Logger)
	if err != nil {
		return nil, fmt.Errorf("failed to initialize operations: %w", err)
	}

	// Determine origin
	var origin string
	if plcClient != nil {
		origin = plcClient.GetBaseURL()
	}

	// Load or create index
	indexPath := filepath.Join(config.BundleDir, INDEX_FILE)
	index, err := LoadIndex(indexPath)

	// Check for bundle files in directory
	bundleFiles, _ := filepath.Glob(filepath.Join(config.BundleDir, "*.jsonl.zst"))
	bundleFiles = filterBundleFiles(bundleFiles)
	hasBundleFiles := len(bundleFiles) > 0

	// Check if clone/download is in progress (look for .tmp files)
	tmpFiles, _ := filepath.Glob(filepath.Join(config.BundleDir, "*.tmp"))
	cloneInProgress := len(tmpFiles) > 0

	needsRebuild := false

	if err != nil {
		// Index doesn't exist or is invalid
		if hasBundleFiles {
			if cloneInProgress {
				config.Logger.Printf("Clone/download in progress, skipping auto-rebuild")
			} else {
				config.Logger.Printf("No valid index found, but detected %d bundle files", len(bundleFiles))
				needsRebuild = true
			}
		} else {
			// No index and no bundles - create fresh index
			config.Logger.Printf("Creating new index at %s", indexPath)
			index = NewIndex(origin) // Pass origin
			if err := index.Save(indexPath); err != nil {
				return nil, fmt.Errorf("failed to save new index: %w", err)
			}
		}
	} else {
		// Index exists - auto-populate origin if missing (ONLY TIME THIS HAPPENS)
		if index.Origin == "" {
			if origin != "" {
				config.Logger.Printf("⚠️ Upgrading old index: setting origin to %s", origin)
				index.Origin = origin
				if err := index.Save(indexPath); err != nil {
					return nil, fmt.Errorf("failed to update index with origin: %w", err)
				}
			} else {
				config.Logger.Printf("⚠️ Warning: index has no origin and no PLC client configured")
			}
		}

		// Validate origin matches if both are set
		if index.Origin != "" && origin != "" && index.Origin != origin {
			return nil, fmt.Errorf(
				"origin mismatch: index has origin %q but PLC client points to %q\n"+
					"Cannot mix bundles from different sources. Use a different directory or reconfigure PLC client",
				index.Origin, origin,
			)
		}

		config.Logger.Printf("Loaded index with %d bundles (origin: %s)", index.Count(), index.Origin)

		// Check if there are bundle files not in the index
		if hasBundleFiles && len(bundleFiles) > index.Count() {
			if cloneInProgress {
				config.Logger.Printf("Clone/download in progress (%d .tmp files), skipping auto-rebuild", len(tmpFiles))
			} else {
				config.Logger.Printf("Detected %d bundle files but index only has %d entries - rebuilding",
					len(bundleFiles), index.Count())
				needsRebuild = true
			}
		}
	}

	if index != nil && plcClient != nil {
		currentOrigin := plcClient.GetBaseURL()

		// Check if origins match
		if index.Origin != "" && index.Origin != currentOrigin {
			return nil, fmt.Errorf(
				"origin mismatch: index has origin %q but PLC client points to %q. "+
					"Cannot mix bundles from different sources",
				index.Origin, currentOrigin,
			)
		}

		// Set origin if not set (for backward compatibility with old indexes)
		if index.Origin == "" && currentOrigin != "" {
			index.Origin = currentOrigin
			config.Logger.Printf("Setting origin for existing index: %s", currentOrigin)
			if err := index.Save(indexPath); err != nil {
				return nil, fmt.Errorf("failed to update index with origin: %w", err)
			}
		}
	}

	// Perform rebuild if needed (using parallel scan)
	if needsRebuild && config.AutoRebuild {
		config.Logger.Printf("Rebuilding index from %d bundle files...", len(bundleFiles))

		// Create temporary manager for scanning
		tempMgr := &Manager{
			config:     config,
			operations: ops,
			index:      NewIndex("test-origin"),
			indexPath:  indexPath,
			logger:     config.Logger,
		}

		// Use parallel scan with auto-detected CPU count
		workers := config.RebuildWorkers
		if workers <= 0 {
			workers = runtime.NumCPU()
			if workers < 1 {
				workers = 1
			}
		}

		config.Logger.Printf("Using %d workers for parallel scan", workers)

		// Create progress callback wrapper with new signature
		var progressCallback func(current, total int, bytesProcessed int64)
		if config.RebuildProgress != nil {
			// Wrap the old-style callback to work with new signature
			oldCallback := config.RebuildProgress
			progressCallback = func(current, total int, bytesProcessed int64) {
				oldCallback(current, total)
			}
		} else {
			// Default: log every 100 bundles
			progressCallback = func(current, total int, bytesProcessed int64) {
				if current%100 == 0 || current == total {
					mbProcessed := float64(bytesProcessed) / (1024 * 1024)
					config.Logger.Printf("Rebuild progress: %d/%d bundles (%.1f%%), %.1f MB processed",
						current, total, float64(current)/float64(total)*100, mbProcessed)
				}
			}
		}

		start := time.Now()

		// Scan directory to rebuild index (parallel)
		result, err := tempMgr.ScanDirectoryParallel(workers, progressCallback)
		if err != nil {
			return nil, fmt.Errorf("failed to rebuild index: %w", err)
		}

		elapsed := time.Since(start)

		// Reload the rebuilt index
		index, err = LoadIndex(indexPath)
		if err != nil {
			return nil, fmt.Errorf("failed to load rebuilt index: %w", err)
		}

		// Calculate throughput
		mbPerSec := float64(result.TotalUncompressed) / elapsed.Seconds() / (1024 * 1024)

		config.Logger.Printf("✓ Index rebuilt with %d bundles in %s",
			index.Count(), elapsed.Round(time.Millisecond))
		config.Logger.Printf(" Speed: %.1f bundles/sec, %.1f MB/s (uncompressed)",
			float64(result.BundleCount)/elapsed.Seconds(), mbPerSec)

		// Verify all chain hashes are present
		bundles := index.GetBundles()
		missingHashes := 0
		for i, meta := range bundles {
			if meta.ContentHash == "" {
				missingHashes++
			}
			if i > 0 && meta.Hash == "" {
				missingHashes++
			}
		}
		if missingHashes > 0 {
			config.Logger.Printf("⚠️ Warning: %d bundles have missing hashes", missingHashes)
		}
	}

	if index == nil {
		index = NewIndex("test-origin")
	}

	// Initialize mempool for next bundle
	lastBundle := index.GetLastBundle()
	nextBundleNum := 1
	var minTimestamp time.Time

	if lastBundle != nil {
		nextBundleNum = lastBundle.BundleNumber + 1
		minTimestamp = lastBundle.EndTime
	}

	mempool, err := NewMempool(config.BundleDir, nextBundleNum, minTimestamp, config.Logger)
	if err != nil {
		return nil, fmt.Errorf("failed to initialize mempool: %w", err)
	}

	// Initialize DID index manager
	didIndex := NewDIDIndexManager(config.BundleDir, config.Logger)

	return &Manager{
		config:     config,
		operations: ops,
		index:      index,
		indexPath:  indexPath,
		plcClient:  plcClient,
		logger:     config.Logger,
		mempool:    mempool,
		didIndex:   didIndex,
	}, nil
}

// Close cleans up resources
func (m *Manager) Close() {
	if m.operations != nil {
		m.operations.Close()
	}
	if m.plcClient != nil {
		m.plcClient.Close()
	}
	if m.mempool != nil {
		if err := m.mempool.Save(); err != nil {
			m.logger.Printf("Warning: failed to save mempool: %v", err)
		}
	}
	if m.didIndex != nil {
		m.didIndex.Close()
	}
}

// GetIndex returns the current index
func (m *Manager) GetIndex() *Index {
	return m.index
}

// SaveIndex saves the index to disk
func (m *Manager) SaveIndex() error {
	return m.index.Save(m.indexPath)
}

// LoadBundle loads a bundle from disk
func (m *Manager) LoadBundle(ctx context.Context, bundleNumber int) (*Bundle, error) {
	// Get metadata from index
	meta, err := m.index.GetBundle(bundleNumber)
	if err != nil {
		return nil, fmt.Errorf("bundle not in index: %w", err)
	}

	// Load file
	path := filepath.Join(m.config.BundleDir, fmt.Sprintf("%06d.jsonl.zst", bundleNumber))
	if !m.operations.FileExists(path) {
		return nil, fmt.Errorf("bundle file not found: %s", path)
	}

	// Verify hash if enabled
	if m.config.VerifyOnLoad {
		valid, actualHash, err := m.operations.VerifyHash(path, meta.CompressedHash)
		if err != nil {
			return nil, fmt.Errorf("failed to verify hash: %w", err)
		}
		if !valid {
			return nil, fmt.Errorf("hash mismatch: expected %s, got %s", meta.CompressedHash, actualHash)
		}
	}

	// Load operations
	operations, err := m.operations.LoadBundle(path)
	if err != nil {
		return nil, fmt.Errorf("failed to load bundle: %w", err)
	}

	// Create bundle struct
	bundle := &Bundle{
		BundleNumber:     meta.BundleNumber,
		StartTime:        meta.StartTime,
		EndTime:          meta.EndTime,
		Operations:       operations,
		DIDCount:         meta.DIDCount,
		Hash:             meta.Hash,        // Chain hash (primary)
		ContentHash:      meta.ContentHash, // Content hash
		Parent:           meta.Parent,      // Parent chain hash
		CompressedHash:   meta.CompressedHash,
		CompressedSize:   meta.CompressedSize,
		UncompressedSize: meta.UncompressedSize,
		Cursor:           meta.Cursor,
		Compressed:       true,
		CreatedAt:        meta.CreatedAt,
	}

	return bundle, nil
}

// SaveBundle saves a bundle to disk and updates the index
func (m *Manager) SaveBundle(ctx context.Context, bundle *Bundle, quiet bool) error {
	if err := bundle.ValidateForSave(); err != nil {
		return fmt.Errorf("bundle validation failed: %w", err)
	}

	path := filepath.Join(m.config.BundleDir, fmt.Sprintf("%06d.jsonl.zst", bundle.BundleNumber))

	// Save to disk
	uncompressedHash, compressedHash, uncompressedSize, compressedSize, err := m.operations.SaveBundle(path, bundle.Operations)
	if err != nil {
		return fmt.Errorf("failed to save bundle: %w", err)
	}

	bundle.ContentHash = uncompressedHash
	bundle.CompressedHash = compressedHash
	bundle.UncompressedSize = uncompressedSize
	bundle.CompressedSize = compressedSize
	bundle.CreatedAt = time.Now().UTC()

	// Get parent
	var parent string
	if bundle.BundleNumber > 1 {
		prevBundle := m.index.GetLastBundle()
		if prevBundle != nil {
			parent = prevBundle.Hash
		} else {
			if prevMeta, err := m.index.GetBundle(bundle.BundleNumber - 1); err == nil {
				parent = prevMeta.Hash
			}
		}
	}

	bundle.Parent = parent
	bundle.Hash = m.operations.CalculateChainHash(parent, bundle.ContentHash)

	// Add to index
	m.index.AddBundle(bundle.ToMetadata())

	// Save index
	if err := m.SaveIndex(); err != nil {
		return fmt.Errorf("failed to save index: %w", err)
	}

	// Clean up old mempool (silent unless verbose)
	oldMempoolFile := m.mempool.GetFilename()
	if err := m.mempool.Delete(); err != nil && !quiet {
		m.logger.Printf("Warning: failed to delete old mempool %s: %v", oldMempoolFile, err)
	}

	// Create new mempool
	nextBundle := bundle.BundleNumber + 1
	minTimestamp := bundle.EndTime

	newMempool, err := NewMempool(m.config.BundleDir, nextBundle, minTimestamp, m.logger)
	if err != nil {
		return fmt.Errorf("failed to create new mempool: %w", err)
	}

	m.mempool = newMempool

	// Update DID index if enabled
	if m.didIndex != nil && m.didIndex.Exists() {
		if err := m.UpdateDIDIndexForBundle(ctx, bundle); err != nil {
			m.logger.Printf("Warning: failed to update DID index: %v", err)
		}
	}

	return nil
}

// FetchNextBundle fetches the next bundle from PLC directory
func (m *Manager) FetchNextBundle(ctx context.Context, quiet bool) (*Bundle, error) {
	if m.plcClient == nil {
		return nil, fmt.Errorf("PLC client not configured")
	}

	lastBundle := m.index.GetLastBundle()
	nextBundleNum := 1
	var afterTime string
	var prevBoundaryCIDs map[string]bool
	var prevBundleHash string

	if lastBundle != nil {
		nextBundleNum = lastBundle.BundleNumber + 1
		afterTime = lastBundle.EndTime.Format(time.RFC3339Nano)
		prevBundleHash = lastBundle.Hash

		prevBundle, err := m.LoadBundle(ctx, lastBundle.BundleNumber)
		if err == nil {
			_, prevBoundaryCIDs = m.operations.GetBoundaryCIDs(prevBundle.Operations)
		}
	}

	if !quiet {
		m.logger.Printf("Preparing bundle %06d (mempool: %d ops)...", nextBundleNum, m.mempool.Count())
	}

	for m.mempool.Count() < BUNDLE_SIZE {
		if !quiet {
			m.logger.Printf("Fetching more operations (have %d/%d)...", m.mempool.Count(), BUNDLE_SIZE)
		}

		err := m.fetchToMempool(ctx, afterTime, prevBoundaryCIDs, BUNDLE_SIZE-m.mempool.Count(), quiet)
		if err != nil {
			if m.mempool.Count() >= BUNDLE_SIZE {
				break
			}
			m.mempool.Save()
			return nil, fmt.Errorf("insufficient operations: have %d, need %d", m.mempool.Count(), BUNDLE_SIZE)
		}

		if m.mempool.Count() < BUNDLE_SIZE {
			m.mempool.Save()
			return nil, fmt.Errorf("insufficient operations: have %d, need %d (no more available)", m.mempool.Count(), BUNDLE_SIZE)
		}
	}

	if !quiet {
		m.logger.Printf("Creating bundle %06d from mempool", nextBundleNum)
	}
	operations, err := m.mempool.Take(BUNDLE_SIZE)
	if err != nil {
		m.mempool.Save()
		return nil, fmt.Errorf("failed to take operations from mempool: %w", err)
	}

	bundle := m.operations.CreateBundle(nextBundleNum, operations, afterTime, prevBundleHash)

	if err := m.mempool.Save(); err != nil {
		m.logger.Printf("Warning: failed to save mempool: %v", err)
	}

	if !quiet {
		m.logger.Printf("✓ Bundle %06d ready (%d ops, mempool: %d remaining)",
			nextBundleNum, len(operations), m.mempool.Count())
	}

	return bundle, nil
}

// fetchToMempool fetches operations and adds them to mempool (returns error if no progress)
func (m *Manager) fetchToMempool(ctx context.Context, afterTime string, prevBoundaryCIDs map[string]bool, target int, quiet bool) error {
	seenCIDs := make(map[string]bool)

	// Mark previous boundary CIDs as seen
	for cid := range prevBoundaryCIDs {
		seenCIDs[cid] = true
	}

	// Use last mempool time if available
	if m.mempool.Count() > 0 {
		afterTime = m.mempool.GetLastTime()
		if !quiet {
			m.logger.Printf(" Continuing from mempool cursor: %s", afterTime)
		}
	}

	currentAfter := afterTime
	maxFetches := 20
	totalAdded := 0
	startingCount := m.mempool.Count()

	for fetchNum := 0; fetchNum < maxFetches; fetchNum++ {
		// Calculate batch size
		remaining := target - (m.mempool.Count() - startingCount)
		if remaining <= 0 {
			break
		}

		batchSize := 1000
		if remaining < 500 {
			batchSize = 200
		}

		if !quiet {
			m.logger.Printf(" Fetch #%d: requesting %d operations (mempool: %d)",
				fetchNum+1, batchSize, m.mempool.Count())
		}

		batch, err := m.plcClient.Export(ctx, plc.ExportOptions{
			Count: batchSize,
			After: currentAfter,
		})
		if err != nil {
			m.mempool.Save()
			return fmt.Errorf("export failed: %w", err)
		}

		if len(batch) == 0 {
			if !quiet {
				m.logger.Printf(" No more operations available from PLC")
			}
			m.mempool.Save()
			if totalAdded > 0 {
				return nil
			}
			return fmt.Errorf("no operations available")
		}

		// Deduplicate
		uniqueOps := make([]plc.PLCOperation, 0)
		for _, op := range batch {
			if !seenCIDs[op.CID] {
				seenCIDs[op.CID] = true
				uniqueOps = append(uniqueOps, op)
			}
		}

		if len(uniqueOps) > 0 {
			added, err := m.mempool.Add(uniqueOps)
			if err != nil {
				m.mempool.Save()
				return fmt.Errorf("chronological validation failed: %w", err)
			}

			totalAdded += added
			if !quiet {
				m.logger.Printf(" Added %d new operations (mempool now: %d)", added, m.mempool.Count())
			}
		}

		// Update cursor
		if len(batch) > 0 {
			currentAfter = batch[len(batch)-1].CreatedAt.Format(time.RFC3339Nano)
		}

		// Stop if we got less than requested
		if len(batch) < batchSize {
			if !quiet {
				m.logger.Printf(" Received incomplete batch (%d/%d), caught up to latest", len(batch), batchSize)
			}
			break
		}
	}

	if totalAdded > 0 {
		if !quiet {
			m.logger.Printf("✓ Fetch complete: added %d operations (mempool: %d)", totalAdded, m.mempool.Count())
		}
		return nil
	}

	return fmt.Errorf("no new operations added")
}

// GetMempoolStats returns mempool statistics
func (m *Manager) GetMempoolStats() map[string]interface{} {
	return m.mempool.Stats()
}

// GetMempoolOperations returns all operations currently in mempool
func (m *Manager) GetMempoolOperations() ([]plc.PLCOperation, error) {
	if m.mempool == nil {
		return nil, fmt.Errorf("mempool not initialized")
	}

	// Use Peek to get operations without removing them
	count := m.mempool.Count()
	if count == 0 {
		return []plc.PLCOperation{}, nil
	}

	return m.mempool.Peek(count), nil
}

// VerifyBundle verifies a bundle's integrity
func (m *Manager) VerifyBundle(ctx context.Context, bundleNumber int) (*VerificationResult, error) {
	result := &VerificationResult{
		BundleNumber: bundleNumber,
	}

	// Get from index
	meta, err := m.index.GetBundle(bundleNumber)
	if err != nil {
		result.Error = err
		return result, nil
	}

	result.ExpectedHash = meta.CompressedHash

	// Check file exists
	path := filepath.Join(m.config.BundleDir, fmt.Sprintf("%06d.jsonl.zst", bundleNumber))
	result.FileExists = m.operations.FileExists(path)
	if !result.FileExists {
		result.Error = fmt.Errorf("file not found")
		return result, nil
	}

	// Verify BOTH compressed and content hashes
	compHash, compSize, contentHash, contentSize, err := m.operations.CalculateFileHashes(path)
	if err != nil {
		result.Error = err
		return result, nil
	}

	result.LocalHash = compHash

	// Verify compressed hash
	if compHash != meta.CompressedHash {
		result.HashMatch = false
		result.Valid = false
		result.Error = fmt.Errorf("compressed hash mismatch: expected %s, got %s", meta.CompressedHash, compHash)
		return result, nil
	}

	// Verify content hash
	if contentHash != meta.ContentHash {
		result.HashMatch = false
		result.Valid = false
		result.Error = fmt.Errorf("content hash mismatch: expected %s, got %s", meta.ContentHash, contentHash)
		return result, nil
	}

	// Verify sizes match
	if compSize != meta.CompressedSize {
		result.Valid = false
		result.Error = fmt.Errorf("compressed size mismatch: expected %d, got %d", meta.CompressedSize, compSize)
		return result, nil
	}

	if contentSize != meta.UncompressedSize {
		result.Valid = false
		result.Error = fmt.Errorf("uncompressed size mismatch: expected %d, got %d", meta.UncompressedSize, contentSize)
		return result, nil
	}

	result.HashMatch = true
	result.Valid = true

	return result, nil
}

// VerifyChain verifies the entire bundle chain
func (m *Manager) VerifyChain(ctx context.Context) (*ChainVerificationResult, error) {
	result := &ChainVerificationResult{
		VerifiedBundles: make([]int, 0),
	}

	bundles := m.index.GetBundles()
	if len(bundles) == 0 {
		result.Valid = true
		return result, nil
	}

	result.ChainLength = len(bundles)

	for i, meta := range bundles {
		// Verify file hash
		vr, err := m.VerifyBundle(ctx, meta.BundleNumber)
		if err != nil || !vr.Valid {
			result.Error = fmt.Sprintf("Bundle %d hash verification failed", meta.BundleNumber)
			result.BrokenAt = meta.BundleNumber
			return result, nil
		}

		// Verify chain link
		if i > 0 {
			prevMeta := bundles[i-1]

			// Check parent reference
			if meta.Parent != prevMeta.Hash {
				result.Error = fmt.Sprintf("Chain broken at bundle %d: parent mismatch", meta.BundleNumber)
				result.BrokenAt = meta.BundleNumber
				return result, nil
			}

			// Verify chain hash calculation
			expectedHash := m.operations.CalculateChainHash(prevMeta.Hash, meta.ContentHash)
			if meta.Hash != expectedHash {
				result.Error = fmt.Sprintf("Chain broken at bundle %d: hash mismatch", meta.BundleNumber)
				result.BrokenAt = meta.BundleNumber
				return result, nil
			}
		}

		result.VerifiedBundles = append(result.VerifiedBundles, meta.BundleNumber)
	}

	result.Valid = true
	return result, nil
}

// ScanDirectory scans the bundle directory and rebuilds the index
func (m *Manager) ScanDirectory() (*DirectoryScanResult, error) {
	result := &DirectoryScanResult{
		BundleDir: m.config.BundleDir,
	}

	m.logger.Printf("Scanning directory: %s", m.config.BundleDir)

	// Find all bundle files
	files, err := filepath.Glob(filepath.Join(m.config.BundleDir, "*.jsonl.zst"))
	if err != nil {
		return nil, fmt.Errorf("failed to scan directory: %w", err)
	}
	files = filterBundleFiles(files)

	if len(files) == 0 {
		m.logger.Printf("No bundle files found")
		return result, nil
	}

	// Parse bundle numbers
	var bundleNumbers []int
	for _, file := range files {
		base := filepath.Base(file)
		numStr := strings.TrimSuffix(base, ".jsonl.zst")
		num, err := strconv.Atoi(numStr)
		if err != nil {
			m.logger.Printf("Warning: skipping invalid filename: %s", base)
			continue
		}
		bundleNumbers = append(bundleNumbers, num)
	}

	sort.Ints(bundleNumbers)

	result.BundleCount = len(bundleNumbers)
	if len(bundleNumbers) > 0 {
		result.FirstBundle = bundleNumbers[0]
		result.LastBundle = bundleNumbers[len(bundleNumbers)-1]
	}

	// Find gaps
	if len(bundleNumbers) > 1 {
		for i := result.FirstBundle; i <= result.LastBundle; i++ {
			found := false
			for _, num := range bundleNumbers {
				if num == i {
					found = true
					break
				}
			}
			if !found {
				result.MissingGaps = append(result.MissingGaps, i)
			}
		}
	}

	m.logger.Printf("Found %d bundles (gaps: %d)", result.BundleCount, len(result.MissingGaps))

	// Load each bundle and rebuild index
	var newMetadata []*BundleMetadata
	var totalSize int64

	for _, num := range bundleNumbers {
		path := filepath.Join(m.config.BundleDir, fmt.Sprintf("%06d.jsonl.zst", num))

		// Load bundle
		ops, err := m.operations.LoadBundle(path)
		if err != nil {
			m.logger.Printf("Warning: failed to load bundle %d: %v", num, err)
			continue
		}

		// Get file size
		size, _ := m.operations.GetFileSize(path)
		totalSize += size

		// Calculate parent and cursor from previous bundle
		var parent string
		var cursor string
		if num > 1 && len(newMetadata) > 0 {
			prevMeta := newMetadata[len(newMetadata)-1]
			parent = prevMeta.Hash
			cursor = prevMeta.EndTime.Format(time.RFC3339Nano)
		}

		// Use the ONE method for metadata calculation
		meta, err := m.operations.CalculateBundleMetadata(num, path, ops, parent, cursor)
		if err != nil {
			m.logger.Printf("Warning: failed to calculate metadata for bundle %d: %v", num, err)
			continue
		}

		newMetadata = append(newMetadata, meta)

		m.logger.Printf(" Scanned bundle %06d: %d ops, %d DIDs", num, len(ops), meta.DIDCount)
	}

	result.TotalSize = totalSize

	// Rebuild index
	m.index.Rebuild(newMetadata)

	// Save index
	if err := m.SaveIndex(); err != nil {
		return nil, fmt.Errorf("failed to save index: %w", err)
	}

	result.IndexUpdated = true

	m.logger.Printf("Index rebuilt with %d bundles", len(newMetadata))

	return result, nil
}

// ScanDirectoryParallel scans the bundle directory in parallel and rebuilds the index
func (m *Manager) ScanDirectoryParallel(workers int, progressCallback func(current, total int, bytesProcessed int64)) (*DirectoryScanResult, error) {
	result := &DirectoryScanResult{
		BundleDir: m.config.BundleDir,
	}

	m.logger.Printf("Scanning directory (parallel, %d workers): %s", workers, m.config.BundleDir)

	// Find all bundle files
	files, err := filepath.Glob(filepath.Join(m.config.BundleDir, "*.jsonl.zst"))
	if err != nil {
		return nil, fmt.Errorf("failed to scan directory: %w", err)
	}
	files = filterBundleFiles(files)

	if len(files) == 0 {
		m.logger.Printf("No bundle files found")
		return result, nil
	}

	// Parse bundle numbers
	var bundleNumbers []int
	for _, file := range files {
		base := filepath.Base(file)
		numStr := strings.TrimSuffix(base, ".jsonl.zst")
		num, err := strconv.Atoi(numStr)
		if err != nil {
			m.logger.Printf("Warning: skipping invalid filename: %s", base)
			continue
		}
		bundleNumbers = append(bundleNumbers, num)
	}

	sort.Ints(bundleNumbers)

	result.BundleCount = len(bundleNumbers)
	if len(bundleNumbers) > 0 {
		result.FirstBundle = bundleNumbers[0]
		result.LastBundle = bundleNumbers[len(bundleNumbers)-1]
	}

	// Find gaps
	if len(bundleNumbers) > 1 {
		for i := result.FirstBundle; i <= result.LastBundle; i++ {
			found := false
			for _, num := range bundleNumbers {
				if num == i {
					found = true
					break
				}
			}
			if !found {
				result.MissingGaps = append(result.MissingGaps, i)
			}
		}
	}

	m.logger.Printf("Found %d bundles (gaps: %d)", result.BundleCount, len(result.MissingGaps))

	// Process bundles in parallel
	type bundleResult struct {
		index int
		meta  *BundleMetadata
		err   error
	}

	jobs := make(chan int, len(bundleNumbers))
	results := make(chan bundleResult, len(bundleNumbers))

	// Start workers
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for num := range jobs {
				path := filepath.Join(m.config.BundleDir, fmt.Sprintf("%06d.jsonl.zst", num))

				// Load and process bundle
				ops, err := m.operations.LoadBundle(path)
				if err != nil {
					results <- bundleResult{index: num, err: err}
					continue
				}

				// Use the FAST method (cursor will be set later in sequential phase)
				meta, err := m.operations.CalculateBundleMetadataFast(num, path, ops, "")
				if err != nil {
					results <- bundleResult{index: num, err: err}
					continue
				}

				results <- bundleResult{index: num, meta: meta}
			}
		}()
	}

	// Send jobs
	for _, num := range bundleNumbers {
		jobs <- num
	}
	close(jobs)

	// Wait for all workers to finish
	go func() {
		wg.Wait()
		close(results)
	}()

	// Collect results (in a map first, then sort)
	metadataMap := make(map[int]*BundleMetadata)
	var totalSize int64
	var totalUncompressed int64
	processed := 0

	for result := range results {
		processed++

		// Update progress WITH bytes
		if progressCallback != nil {
			if result.meta != nil {
				totalUncompressed += result.meta.UncompressedSize
			}
			progressCallback(processed, len(bundleNumbers), totalUncompressed)
		}

		if result.err != nil {
			m.logger.Printf("Warning: failed to process bundle %d: %v", result.index, result.err)
			continue
		}
		metadataMap[result.index] = result.meta
		totalSize += result.meta.CompressedSize
	}

	// Build ordered metadata slice and calculate chain hashes
	var newMetadata []*BundleMetadata
	var parent string // Parent chain hash

	for i, num := range bundleNumbers {
		meta, ok := metadataMap[num]
		if !ok {
			continue // Skip failed bundles
		}

		// Set cursor from previous bundle's EndTime
		if i > 0 && len(newMetadata) > 0 {
			prevMeta := newMetadata[len(newMetadata)-1]
			meta.Cursor = prevMeta.EndTime.Format(time.RFC3339Nano)
		}

		// Now calculate chain hash (must be done sequentially)
		meta.Hash = m.operations.CalculateChainHash(parent, meta.ContentHash)
		meta.Parent = parent

		newMetadata = append(newMetadata, meta)
		parent = meta.Hash // Store for next iteration
	}

	result.TotalSize = totalSize
	result.TotalUncompressed = totalUncompressed

	// Rebuild index
	m.index.Rebuild(newMetadata)

	// Save index
	if err := m.SaveIndex(); err != nil {
		return nil, fmt.Errorf("failed to save index: %w", err)
	}

	result.IndexUpdated = true

	m.logger.Printf("Index rebuilt with %d bundles", len(newMetadata))

	return result, nil
}

// GetInfo returns information about the bundle manager
func (m *Manager) GetInfo() map[string]interface{} {
	stats := m.index.GetStats()
	stats["bundle_dir"] = m.config.BundleDir
	stats["index_path"] = m.indexPath
	stats["verify_on_load"] = m.config.VerifyOnLoad
	return stats
}

// ExportOperations exports operations from bundles
func (m *Manager) ExportOperations(ctx context.Context, afterTime time.Time, count int) ([]plc.PLCOperation, error) {
	if count <= 0 {
		count = 1000
	}

	var result []plc.PLCOperation
	seenCIDs := make(map[string]bool)

	bundles := m.index.GetBundles()

	for _, meta := range bundles {
		if result != nil && len(result) >= count {
			break
		}

		// Skip bundles before afterTime
		if !afterTime.IsZero() && meta.EndTime.Before(afterTime) {
			continue
		}

		// Load bundle
		bundle, err := m.LoadBundle(ctx, meta.BundleNumber)
		if err != nil {
			m.logger.Printf("Warning: failed to load bundle %d: %v", meta.BundleNumber, err)
			continue
		}

		// Add operations
		for _, op := range bundle.Operations {
			if !afterTime.IsZero() && op.CreatedAt.Before(afterTime) {
				continue
			}

			if seenCIDs[op.CID] {
				continue
			}

			seenCIDs[op.CID] = true
			result = append(result, op)

			if len(result) >= count {
				break
			}
		}
	}

	return result, nil
}

// ScanBundle scans a single bundle file and returns its metadata
func (m *Manager) ScanBundle(path string, bundleNumber int) (*BundleMetadata, error) {
	// Load bundle file
	operations, err := m.operations.LoadBundle(path)
	if err != nil {
		return nil, fmt.Errorf("failed to load bundle: %w", err)
	}

	if len(operations) == 0 {
		return nil, fmt.Errorf("bundle is empty")
	}

	// Get parent chain hash and cursor from previous bundle
	var parent string
	var cursor string
	if bundleNumber > 1 {
		if prevMeta, err := m.index.GetBundle(bundleNumber - 1); err == nil {
			parent = prevMeta.Hash
			cursor = prevMeta.EndTime.Format(time.RFC3339Nano)
		}
	}

	// Use the ONE method
	return m.operations.CalculateBundleMetadata(bundleNumber, path, operations, parent, cursor)
}

// ScanAndIndexBundle scans a bundle file and adds it to the index
func (m *Manager) ScanAndIndexBundle(path string, bundleNumber int) (*BundleMetadata, error) {
	meta, err := m.ScanBundle(path, bundleNumber)
	if err != nil {
		return nil, err
	}

	// Add to index
	m.index.AddBundle(meta)

	// Save index
	if err := m.SaveIndex(); err != nil {
		return nil, fmt.Errorf("failed to save index: %w", err)
	}

	return meta, nil
}

// IsBundleIndexed checks if a bundle is already in the index
func (m *Manager) IsBundleIndexed(bundleNumber int) bool {
	_, err := m.index.GetBundle(bundleNumber)
	return err == nil
}

// RefreshMempool reloads mempool from disk (useful for debugging)
func (m *Manager) RefreshMempool() error {
	if m.mempool == nil {
		return fmt.Errorf("mempool not initialized")
	}
	return m.mempool.Load()
}

// ClearMempool clears all operations from the mempool and saves
func (m *Manager) ClearMempool() error {
	if m.mempool == nil {
		return fmt.Errorf("mempool not initialized")
	}

	m.logger.Printf("Clearing mempool...")

	// Get count before clearing
	count := m.mempool.Count()

	// Clear the mempool
	m.mempool.Clear()

	// Save the empty state (this will delete the file since it's empty)
	if err := m.mempool.Save(); err != nil {
		return fmt.Errorf("failed to save mempool: %w", err)
	}

	m.logger.Printf("Cleared %d operations from mempool", count)

	return nil
}

// ValidateMempool validates the current mempool state
func (m *Manager) ValidateMempool() error {
	if m.mempool == nil {
		return fmt.Errorf("mempool not initialized")
	}
	return m.mempool.Validate()
}

// StreamBundleRaw streams the raw compressed bundle file
func (m *Manager) StreamBundleRaw(ctx context.Context, bundleNumber int) (io.ReadCloser, error) {
	// Get metadata from index
	meta, err := m.index.GetBundle(bundleNumber)
	if err != nil {
		return nil, fmt.Errorf("bundle not in index: %w", err)
	}

	// Build file path
	path := filepath.Join(m.config.BundleDir, fmt.Sprintf("%06d.jsonl.zst", bundleNumber))
	if !m.operations.FileExists(path) {
		return nil, fmt.Errorf("bundle file not found: %s", path)
	}

	// Optionally verify hash before streaming
	if m.config.VerifyOnLoad {
		valid, actualHash, err := m.operations.VerifyHash(path, meta.CompressedHash)
		if err != nil {
			return nil, fmt.Errorf("failed to verify hash: %w", err)
		}
		if !valid {
			return nil, fmt.Errorf("hash mismatch: expected %s, got %s", meta.CompressedHash, actualHash)
		}
	}

	return m.operations.StreamRaw(path)
}

// StreamBundleDecompressed streams the decompressed bundle data as JSONL
func (m *Manager) StreamBundleDecompressed(ctx context.Context, bundleNumber int) (io.ReadCloser, error) {
	// Get metadata from index
	_, err := m.index.GetBundle(bundleNumber)
	if err != nil {
		return nil, fmt.Errorf("bundle not in index: %w", err)
	}

	// Build file path
	path := filepath.Join(m.config.BundleDir, fmt.Sprintf("%06d.jsonl.zst", bundleNumber))
	if !m.operations.FileExists(path) {
		return nil, fmt.Errorf("bundle file not found: %s", path)
	}

	return m.operations.StreamDecompressed(path)
}

// RefreshIndex reloads the index from disk if it has been modified
func (m *Manager) RefreshIndex() error {
	// Check if index file has been modified
	info, err := os.Stat(m.indexPath)
	if err != nil {
		return err
	}

	// If index was modified after we loaded it, reload
	if info.ModTime().After(m.index.UpdatedAt) {
		m.logger.Printf("Index file modified, reloading...")

		newIndex, err := LoadIndex(m.indexPath)
		if err != nil {
			return fmt.Errorf("failed to reload index: %w", err)
		}

		m.index = newIndex
		m.logger.Printf("Index reloaded: %d bundles", m.index.Count())
	}

	return nil
}

// filterBundleFiles filters out files starting with . or _ (system/temp files)
func filterBundleFiles(files []string) []string {
	filtered := make([]string, 0, len(files))
	for _, file := range files {
		basename := filepath.Base(file)
		// Skip files starting with . or _
		if len(basename) > 0 && (basename[0] == '.' || basename[0] == '_') {
			continue
		}
		filtered = append(filtered, file)
	}
	return filtered
}

// GetMempool returns the current mempool (for manual save operations)
func (m *Manager) GetMempool() *Mempool {
	return m.mempool
}

// SaveMempool saves the current mempool state to disk
func (m *Manager) SaveMempool() error {
	if m.mempool == nil {
		return fmt.Errorf("mempool not initialized")
	}
	return m.mempool.Save()
}

// GetPLCOrigin returns the PLC directory origin URL (empty if not configured)
func (m *Manager) GetPLCOrigin() string {
	if m.plcClient == nil {
		return ""
	}
	return m.plcClient.GetBaseURL()
}

// GetCurrentCursor returns the current latest cursor position (including mempool)
// Cursor format: (bundleNumber × BUNDLE_SIZE) + position
func (m *Manager) GetCurrentCursor() int {
	index := m.GetIndex()
	bundles := index.GetBundles()
	cursor := len(bundles) * BUNDLE_SIZE

	// Add mempool operations to get true latest position
	mempoolStats := m.GetMempoolStats()
	if count, ok := mempoolStats["count"].(int); ok {
		cursor += count
	}

	return cursor
}

// GetDIDIndex returns the DID index manager
func (m *Manager) GetDIDIndex() *DIDIndexManager {
	return m.didIndex
}

// LoadOperation loads a single operation from a bundle efficiently
// This is much faster than LoadBundle() when you only need one operation
func (m *Manager) LoadOperation(ctx context.Context, bundleNumber int, position int) (*plc.PLCOperation, error) {
	// Validate bundle exists in index
	meta, err := m.index.GetBundle(bundleNumber)
	if err != nil {
		return nil, fmt.Errorf("bundle not in index: %w", err)
	}

	// Validate position
	if position < 0 || position >= BUNDLE_SIZE {
		return nil, fmt.Errorf("invalid position: %d (must be 0-%d)", position, BUNDLE_SIZE-1)
	}

	// Build file path
	path := filepath.Join(m.config.BundleDir, fmt.Sprintf("%06d.jsonl.zst", bundleNumber))
	if !m.operations.FileExists(path) {
		return nil, fmt.Errorf("bundle file not found: %s", path)
	}

	// Verify hash if enabled (same as LoadBundle)
	if m.config.VerifyOnLoad {
		valid, actualHash, err := m.operations.VerifyHash(path, meta.CompressedHash)
		if err != nil {
			return nil, fmt.Errorf("failed to verify hash: %w", err)
		}
		if !valid {
			return nil, fmt.Errorf("hash mismatch: expected %s, got %s", meta.CompressedHash, actualHash)
		}
	}

	// Load just the one operation (efficient!)
	return m.operations.LoadOperationAtPosition(path, position)
}
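
For orientation, here is a minimal caller sketch for the Manager API above: fetch the next bundle from the PLC directory, save it, and verify the hash chain. The import path for this bundle package and the plc.NewClient constructor are assumptions for illustration (this file only shows the plc import path and the Manager methods), so adjust them to the actual module layout.

package main

import (
	"context"
	"log"

	"tangled.org/atscan.net/plcbundle/bundle" // assumed location of the package shown above
	"tangled.org/atscan.net/plcbundle/plc"
)

func main() {
	// Assumed constructor: the file above only consumes a *plc.Client,
	// it does not show how one is created.
	client := plc.NewClient("https://plc.directory")

	mgr, err := bundle.NewManager(bundle.DefaultConfig("./plc_bundles"), client)
	if err != nil {
		log.Fatal(err)
	}
	defer mgr.Close()

	ctx := context.Background()

	// Fetch the next full bundle; this returns an error until a complete
	// bundle's worth of operations has accumulated in the mempool.
	b, err := mgr.FetchNextBundle(ctx, false)
	if err != nil {
		log.Fatal(err)
	}

	// Persist it: computes content and chain hashes and updates the index.
	if err := mgr.SaveBundle(ctx, b, false); err != nil {
		log.Fatal(err)
	}

	// Verify the whole hash chain end to end.
	chain, err := mgr.VerifyChain(ctx)
	if err != nil {
		log.Fatal(err)
	}
	if !chain.Valid {
		log.Fatalf("chain broken at bundle %d: %s", chain.BrokenAt, chain.Error)
	}
	log.Printf("chain verified: %d bundles", chain.ChainLength)
}

The boolean passed to FetchNextBundle and SaveBundle is the quiet flag from the Manager methods above; pass true to suppress progress logging.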