A Transparent and Verifiable Way to Sync the AT Protocol's PLC Directory
package bundle

import (
	"context"
	"fmt"
	"io"
	"log"
	"os"
	"path/filepath"
	"runtime"
	"sort"
	"strconv"
	"strings"
	"sync"
	"time"

	"tangled.org/atscan.net/plcbundle/plc"
)

// defaultLogger is a simple logger implementation
type defaultLogger struct{}

func (d defaultLogger) Printf(format string, v ...interface{}) {
	log.Printf(format, v...)
}

func (d defaultLogger) Println(v ...interface{}) {
	log.Println(v...)
}

// Manager handles bundle operations
type Manager struct {
	config     *Config
	operations *Operations
	index      *Index
	indexPath  string
	plcClient  *plc.Client
	logger     Logger
	mempool    *Mempool
}

// NewManager creates a new bundle manager
func NewManager(config *Config, plcClient *plc.Client) (*Manager, error) {
	if config == nil {
		config = DefaultConfig("./plc_bundles")
	}

	if config.Logger == nil {
		config.Logger = defaultLogger{}
	}

	// Ensure directory exists
	if err := os.MkdirAll(config.BundleDir, 0755); err != nil {
		return nil, fmt.Errorf("failed to create bundle directory: %w", err)
	}

	// Initialize operations handler
	ops, err := NewOperations(config.Logger)
	if err != nil {
		return nil, fmt.Errorf("failed to initialize operations: %w", err)
	}

	// Load or create index
	indexPath := filepath.Join(config.BundleDir, INDEX_FILE)
	index, err := LoadIndex(indexPath)

	// Check for bundle files in directory
	bundleFiles, _ := filepath.Glob(filepath.Join(config.BundleDir, "*.jsonl.zst"))
	bundleFiles = filterBundleFiles(bundleFiles)
	hasBundleFiles := len(bundleFiles) > 0

	// Check if clone/download is in progress (look for .tmp files)
	tmpFiles, _ := filepath.Glob(filepath.Join(config.BundleDir, "*.tmp"))
	cloneInProgress := len(tmpFiles) > 0

	needsRebuild := false

	if err != nil {
		// Index doesn't exist or is invalid
		if hasBundleFiles {
			if cloneInProgress {
				config.Logger.Printf("Clone/download in progress, skipping auto-rebuild")
			} else {
				// We have bundles but no index - need to rebuild
				config.Logger.Printf("No valid index found, but detected %d bundle files", len(bundleFiles))
				needsRebuild = true
			}
		} else {
			// No index and no bundles - create fresh index
			config.Logger.Printf("Creating new index at %s", indexPath)
			index = NewIndex()
			if err := index.Save(indexPath); err != nil {
				return nil, fmt.Errorf("failed to save new index: %w", err)
			}
		}
	} else {
		// Index exists - check if it's complete
		config.Logger.Printf("Loaded index with %d bundles", index.Count())

		// Check if there are bundle files not in the index
		if hasBundleFiles && len(bundleFiles) > index.Count() {
			if cloneInProgress {
				config.Logger.Printf("Clone/download in progress (%d .tmp files), skipping auto-rebuild", len(tmpFiles))
			} else {
				config.Logger.Printf("Detected %d bundle files but index only has %d entries - rebuilding",
					len(bundleFiles), index.Count())
				needsRebuild = true
			}
		}
	}

	// Perform rebuild if needed (using parallel scan)
	if needsRebuild && config.AutoRebuild {
		config.Logger.Printf("Rebuilding index from %d bundle files...", len(bundleFiles))

		// Create temporary manager for scanning
		tempMgr := &Manager{
			config:     config,
			operations: ops,
			index:      NewIndex(),
			indexPath:  indexPath,
			logger:     config.Logger,
		}

		// Use parallel scan with auto-detected CPU count
		workers := config.RebuildWorkers
		if workers <= 0 {
			workers = runtime.NumCPU()
			if workers < 1 {
				workers = 1
			}
		}

		config.Logger.Printf("Using %d workers for parallel scan", workers)

		// Create progress callback wrapper with new signature
		var progressCallback func(current, total int, bytesProcessed int64)
		if config.RebuildProgress != nil {
			// Wrap the old-style callback to work with new signature
			oldCallback := config.RebuildProgress
			progressCallback = func(current, total int, bytesProcessed int64) {
				oldCallback(current, total)
			}
		} else {
			// Default: log every 100 bundles
			progressCallback = func(current, total int, bytesProcessed int64) {
				if current%100 == 0 || current == total {
					mbProcessed := float64(bytesProcessed) / (1024 * 1024)
					config.Logger.Printf("Rebuild progress: %d/%d bundles (%.1f%%), %.1f MB processed",
						current, total, float64(current)/float64(total)*100, mbProcessed)
				}
			}
		}

		start := time.Now()

		// Scan directory to rebuild index (parallel)
		result, err := tempMgr.ScanDirectoryParallel(workers, progressCallback)
		if err != nil {
			return nil, fmt.Errorf("failed to rebuild index: %w", err)
		}

		elapsed := time.Since(start)

		// Reload the rebuilt index
		index, err = LoadIndex(indexPath)
		if err != nil {
			return nil, fmt.Errorf("failed to load rebuilt index: %w", err)
		}

		// Calculate throughput
		mbPerSec := float64(result.TotalUncompressed) / elapsed.Seconds() / (1024 * 1024)

		config.Logger.Printf("✓ Index rebuilt with %d bundles in %s",
			index.Count(), elapsed.Round(time.Millisecond))
		config.Logger.Printf(" Speed: %.1f bundles/sec, %.1f MB/s (uncompressed)",
			float64(result.BundleCount)/elapsed.Seconds(), mbPerSec)

		// Verify all chain hashes are present
		bundles := index.GetBundles()
		missingHashes := 0
		for i, meta := range bundles {
			if meta.ContentHash == "" {
				missingHashes++
			}
			if i > 0 && meta.Hash == "" {
				missingHashes++
			}
		}
		if missingHashes > 0 {
			config.Logger.Printf("⚠️ Warning: %d bundles have missing hashes", missingHashes)
		}
	}

	if index == nil {
		index = NewIndex()
	}

	// Initialize mempool for next bundle
	lastBundle := index.GetLastBundle()
	nextBundleNum := 1
	var minTimestamp time.Time

	if lastBundle != nil {
		nextBundleNum = lastBundle.BundleNumber + 1
		minTimestamp = lastBundle.EndTime
	}

	mempool, err := NewMempool(config.BundleDir, nextBundleNum, minTimestamp, config.Logger)
	if err != nil {
		return nil, fmt.Errorf("failed to initialize mempool: %w", err)
	}

	return &Manager{
		config:     config,
		operations: ops,
		index:      index,
		indexPath:  indexPath,
		plcClient:  plcClient,
		logger:     config.Logger,
		mempool:    mempool,
	}, nil
}

// Close cleans up resources
func (m *Manager) Close() {
	if m.operations != nil {
		m.operations.Close()
	}
	if m.plcClient != nil {
		m.plcClient.Close()
	}
	if m.mempool != nil {
		if err := m.mempool.Save(); err != nil {
			m.logger.Printf("Warning: failed to save mempool: %v", err)
		}
	}
}

// GetIndex returns the current index
func (m *Manager) GetIndex() *Index {
	return m.index
}

// SaveIndex saves the index to disk
func (m *Manager) SaveIndex() error {
	return m.index.Save(m.indexPath)
}

251func (m *Manager) LoadBundle(ctx context.Context, bundleNumber int) (*Bundle, error) {
252 // Get metadata from index
253 meta, err := m.index.GetBundle(bundleNumber)
254 if err != nil {
255 return nil, fmt.Errorf("bundle not in index: %w", err)
256 }
257
258 // Load file
259 path := filepath.Join(m.config.BundleDir, fmt.Sprintf("%06d.jsonl.zst", bundleNumber))
260 if !m.operations.FileExists(path) {
261 return nil, fmt.Errorf("bundle file not found: %s", path)
262 }
263
264 // Verify hash if enabled
265 if m.config.VerifyOnLoad {
266 valid, actualHash, err := m.operations.VerifyHash(path, meta.CompressedHash)
267 if err != nil {
268 return nil, fmt.Errorf("failed to verify hash: %w", err)
269 }
270 if !valid {
271 return nil, fmt.Errorf("hash mismatch: expected %s, got %s", meta.CompressedHash, actualHash)
272 }
273 }
274
275 // Load operations
276 operations, err := m.operations.LoadBundle(path)
277 if err != nil {
278 return nil, fmt.Errorf("failed to load bundle: %w", err)
279 }
280
281 // Create bundle struct
282 bundle := &Bundle{
283 BundleNumber: meta.BundleNumber,
284 StartTime: meta.StartTime,
285 EndTime: meta.EndTime,
286 Operations: operations,
287 DIDCount: meta.DIDCount,
288 Hash: meta.Hash, // Chain hash (primary)
289 ContentHash: meta.ContentHash, // Content hash
290 Parent: meta.Parent, // Parent chain hash
291 CompressedHash: meta.CompressedHash,
292 CompressedSize: meta.CompressedSize,
293 UncompressedSize: meta.UncompressedSize,
294 Cursor: meta.Cursor,
295 Compressed: true,
296 CreatedAt: meta.CreatedAt,
297 }
298
299 return bundle, nil
300}
301
// SaveBundle saves a bundle to disk and updates the index
func (m *Manager) SaveBundle(ctx context.Context, bundle *Bundle) error {
	if err := bundle.ValidateForSave(); err != nil {
		return fmt.Errorf("bundle validation failed: %w", err)
	}

	path := filepath.Join(m.config.BundleDir, fmt.Sprintf("%06d.jsonl.zst", bundle.BundleNumber))

	// Save to disk
	uncompressedHash, compressedHash, uncompressedSize, compressedSize, err := m.operations.SaveBundle(path, bundle.Operations)
	if err != nil {
		return fmt.Errorf("failed to save bundle: %w", err)
	}

	// Set content hash and compressed metadata
	bundle.ContentHash = uncompressedHash
	bundle.CompressedHash = compressedHash
	bundle.UncompressedSize = uncompressedSize
	bundle.CompressedSize = compressedSize
	bundle.CreatedAt = time.Now().UTC()

	// Get parent (previous chain hash)
	var parent string
	if bundle.BundleNumber > 1 {
		prevBundle := m.index.GetLastBundle()
		if prevBundle != nil {
			parent = prevBundle.Hash // Get previous chain hash

			m.logger.Printf("Previous bundle %06d: hash=%s, content=%s",
				prevBundle.BundleNumber,
				formatHashPreview(prevBundle.Hash),
				formatHashPreview(prevBundle.ContentHash))
		} else {
			// Try to get specific previous bundle
			if prevMeta, err := m.index.GetBundle(bundle.BundleNumber - 1); err == nil {
				parent = prevMeta.Hash

				m.logger.Printf("Found previous bundle %06d: hash=%s, content=%s",
					prevMeta.BundleNumber,
					formatHashPreview(prevMeta.Hash),
					formatHashPreview(prevMeta.ContentHash))
			}
		}
	}

	bundle.Parent = parent

	// Calculate chain hash (now stored as primary Hash)
	bundle.Hash = m.operations.CalculateChainHash(parent, bundle.ContentHash)

	m.logger.Printf("Bundle %06d: hash=%s, content=%s, parent=%s",
		bundle.BundleNumber,
		formatHashPreview(bundle.Hash),
		formatHashPreview(bundle.ContentHash),
		formatHashPreview(bundle.Parent))

	// Add to index
	m.index.AddBundle(bundle.ToMetadata())

	// Save index
	if err := m.SaveIndex(); err != nil {
		return fmt.Errorf("failed to save index: %w", err)
	}

	m.logger.Printf("Saved bundle %06d (hash: %s)",
		bundle.BundleNumber,
		formatHashPreview(bundle.Hash))

	// IMPORTANT: Clean up old mempool and create new one for next bundle
	oldMempoolFile := m.mempool.GetFilename()
	if err := m.mempool.Delete(); err != nil {
		m.logger.Printf("Warning: failed to delete old mempool %s: %v", oldMempoolFile, err)
	} else {
		m.logger.Printf("Deleted mempool: %s", oldMempoolFile)
	}

	// Create new mempool for next bundle
	nextBundle := bundle.BundleNumber + 1
	minTimestamp := bundle.EndTime

	newMempool, err := NewMempool(m.config.BundleDir, nextBundle, minTimestamp, m.logger)
	if err != nil {
		return fmt.Errorf("failed to create new mempool: %w", err)
	}

	m.mempool = newMempool
	m.logger.Printf("Created new mempool for bundle %06d (min timestamp: %s)",
		nextBundle, minTimestamp.Format(time.RFC3339Nano))

	return nil
}

// formatHashPreview safely formats a hash for display
func formatHashPreview(hash string) string {
	if len(hash) == 0 {
		return "(empty)"
	}
	if len(hash) < 16 {
		return hash
	}
	return hash[:16] + "..."
}

// FetchNextBundle fetches the next bundle from PLC directory
func (m *Manager) FetchNextBundle(ctx context.Context) (*Bundle, error) {
	if m.plcClient == nil {
		return nil, fmt.Errorf("PLC client not configured")
	}

	// Determine next bundle number
	lastBundle := m.index.GetLastBundle()
	nextBundleNum := 1
	var afterTime string
	var prevBoundaryCIDs map[string]bool
	var prevBundleHash string

	if lastBundle != nil {
		nextBundleNum = lastBundle.BundleNumber + 1
		afterTime = lastBundle.EndTime.Format(time.RFC3339Nano)
		prevBundleHash = lastBundle.Hash

		// Try to load boundary CIDs from previous bundle
		prevBundle, err := m.LoadBundle(ctx, lastBundle.BundleNumber)
		if err == nil {
			_, prevBoundaryCIDs = m.operations.GetBoundaryCIDs(prevBundle.Operations)
		}
	}

	m.logger.Printf("Preparing bundle %06d (mempool: %d ops)...", nextBundleNum, m.mempool.Count())

	// Keep fetching until we have enough operations
	for m.mempool.Count() < BUNDLE_SIZE {
		m.logger.Printf("Fetching more operations (have %d/%d)...", m.mempool.Count(), BUNDLE_SIZE)

		err := m.fetchToMempool(ctx, afterTime, prevBoundaryCIDs, BUNDLE_SIZE-m.mempool.Count())
		if err != nil {
			// If we can't fetch more, check if we have enough
			if m.mempool.Count() >= BUNDLE_SIZE {
				break // We have enough now
			}
			// Save current state and return error
			m.mempool.Save()
			return nil, fmt.Errorf("insufficient operations: have %d, need %d", m.mempool.Count(), BUNDLE_SIZE)
		}

		// Check if we made progress
		if m.mempool.Count() < BUNDLE_SIZE {
			// Didn't get enough, but got some - save and return error
			m.mempool.Save()
			return nil, fmt.Errorf("insufficient operations: have %d, need %d (no more available)", m.mempool.Count(), BUNDLE_SIZE)
		}
	}

	// Create bundle from mempool
	m.logger.Printf("Creating bundle %06d from mempool", nextBundleNum)
	operations, err := m.mempool.Take(BUNDLE_SIZE) // removes BUNDLE_SIZE operations from the mempool
	if err != nil {
		m.mempool.Save()
		return nil, fmt.Errorf("failed to take operations from mempool: %w", err)
	}

	bundle := m.operations.CreateBundle(nextBundleNum, operations, afterTime, prevBundleHash)

	// Save mempool state
	if err := m.mempool.Save(); err != nil {
		m.logger.Printf("Warning: failed to save mempool: %v", err)
	}

	m.logger.Printf("✓ Bundle %06d ready (%d ops, mempool: %d remaining)",
		nextBundleNum, len(operations), m.mempool.Count())

	return bundle, nil
}

// fetchToMempool fetches operations and adds them to mempool (returns error if no progress)
func (m *Manager) fetchToMempool(ctx context.Context, afterTime string, prevBoundaryCIDs map[string]bool, target int) error {
	seenCIDs := make(map[string]bool)

	// Mark previous boundary CIDs as seen
	for cid := range prevBoundaryCIDs {
		seenCIDs[cid] = true
	}

	// Use last mempool time if available
	if m.mempool.Count() > 0 {
		afterTime = m.mempool.GetLastTime()
		m.logger.Printf(" Continuing from mempool cursor: %s", afterTime)
	}

	currentAfter := afterTime
	maxFetches := 20
	totalAdded := 0
	startingCount := m.mempool.Count()

	for fetchNum := 0; fetchNum < maxFetches; fetchNum++ {
		// Calculate batch size
		remaining := target - (m.mempool.Count() - startingCount)
		if remaining <= 0 {
			break
		}

		batchSize := 1000
		if remaining < 500 {
			batchSize = 200
		}

		m.logger.Printf(" Fetch #%d: requesting %d operations (mempool: %d)",
			fetchNum+1, batchSize, m.mempool.Count())

		batch, err := m.plcClient.Export(ctx, plc.ExportOptions{
			Count: batchSize,
			After: currentAfter,
		})
		if err != nil {
			m.mempool.Save()
			return fmt.Errorf("export failed: %w", err)
		}

		if len(batch) == 0 {
			m.logger.Printf(" No more operations available from PLC")
			m.mempool.Save()
			if totalAdded > 0 {
				return nil
			}
			return fmt.Errorf("no operations available")
		}

		// Deduplicate
		uniqueOps := make([]plc.PLCOperation, 0)
		for _, op := range batch {
			if !seenCIDs[op.CID] {
				seenCIDs[op.CID] = true
				uniqueOps = append(uniqueOps, op)
			}
		}

		if len(uniqueOps) > 0 {
			// CRITICAL: Add with validation
			added, err := m.mempool.Add(uniqueOps)
			if err != nil {
				// Validation error - save current state and return
				m.mempool.Save()
				return fmt.Errorf("chronological validation failed: %w", err)
			}

			totalAdded += added
			m.logger.Printf(" Added %d new operations (mempool now: %d)", added, m.mempool.Count())

			// Save after each successful addition
			if err := m.mempool.Save(); err != nil {
				m.logger.Printf(" Warning: failed to save mempool: %v", err)
			}
		}

		// Update cursor
		if len(batch) > 0 {
			currentAfter = batch[len(batch)-1].CreatedAt.Format(time.RFC3339Nano)
		}

		// Stop if we got less than requested (caught up)
		if len(batch) < batchSize {
			m.logger.Printf(" Received incomplete batch (%d/%d), caught up to latest", len(batch), batchSize)
			break
		}
	}

	// Final save
	m.mempool.Save()

	if totalAdded > 0 {
		m.logger.Printf("✓ Fetch complete: added %d operations (mempool: %d)", totalAdded, m.mempool.Count())
		return nil
	}

	return fmt.Errorf("no new operations added")
}

// GetMempoolStats returns mempool statistics
func (m *Manager) GetMempoolStats() map[string]interface{} {
	return m.mempool.Stats()
}

// GetMempoolOperations returns all operations currently in mempool
func (m *Manager) GetMempoolOperations() ([]plc.PLCOperation, error) {
	if m.mempool == nil {
		return nil, fmt.Errorf("mempool not initialized")
	}

	// Use Peek to get operations without removing them
	count := m.mempool.Count()
	if count == 0 {
		return []plc.PLCOperation{}, nil
	}

	return m.mempool.Peek(count), nil
}

// VerifyBundle verifies a bundle's integrity
func (m *Manager) VerifyBundle(ctx context.Context, bundleNumber int) (*VerificationResult, error) {
	result := &VerificationResult{
		BundleNumber: bundleNumber,
	}

	// Get from index
	meta, err := m.index.GetBundle(bundleNumber)
	if err != nil {
		result.Error = err
		return result, nil
	}

	result.ExpectedHash = meta.CompressedHash

	// Check file exists
	path := filepath.Join(m.config.BundleDir, fmt.Sprintf("%06d.jsonl.zst", bundleNumber))
	result.FileExists = m.operations.FileExists(path)
	if !result.FileExists {
		result.Error = fmt.Errorf("file not found")
		return result, nil
	}

	// Verify hash
	valid, actualHash, err := m.operations.VerifyHash(path, meta.CompressedHash)
	if err != nil {
		result.Error = err
		return result, nil
	}

	result.LocalHash = actualHash
	result.HashMatch = valid
	result.Valid = valid

	return result, nil
}

// VerifyChain verifies the entire bundle chain
func (m *Manager) VerifyChain(ctx context.Context) (*ChainVerificationResult, error) {
	result := &ChainVerificationResult{
		VerifiedBundles: make([]int, 0),
	}

	bundles := m.index.GetBundles()
	if len(bundles) == 0 {
		result.Valid = true
		return result, nil
	}

	result.ChainLength = len(bundles)

	for i, meta := range bundles {
		// Verify file hash
		vr, err := m.VerifyBundle(ctx, meta.BundleNumber)
		if err != nil || !vr.Valid {
			result.Error = fmt.Sprintf("Bundle %d hash verification failed", meta.BundleNumber)
			result.BrokenAt = meta.BundleNumber
			return result, nil
		}

		// Verify chain link
		if i > 0 {
			prevMeta := bundles[i-1]

			// Check parent reference
			if meta.Parent != prevMeta.Hash {
				result.Error = fmt.Sprintf("Chain broken at bundle %d: parent mismatch", meta.BundleNumber)
				result.BrokenAt = meta.BundleNumber
				return result, nil
			}

			// Verify chain hash calculation
			expectedHash := m.operations.CalculateChainHash(prevMeta.Hash, meta.ContentHash)
			if meta.Hash != expectedHash {
				result.Error = fmt.Sprintf("Chain broken at bundle %d: hash mismatch", meta.BundleNumber)
				result.BrokenAt = meta.BundleNumber
				return result, nil
			}
		}

		result.VerifiedBundles = append(result.VerifiedBundles, meta.BundleNumber)
	}

	result.Valid = true
	return result, nil
}

// ScanDirectory scans the bundle directory and rebuilds the index
func (m *Manager) ScanDirectory() (*DirectoryScanResult, error) {
	result := &DirectoryScanResult{
		BundleDir: m.config.BundleDir,
	}

	m.logger.Printf("Scanning directory: %s", m.config.BundleDir)

	// Find all bundle files
	files, err := filepath.Glob(filepath.Join(m.config.BundleDir, "*.jsonl.zst"))
	if err != nil {
		return nil, fmt.Errorf("failed to scan directory: %w", err)
	}
	files = filterBundleFiles(files)

	if len(files) == 0 {
		m.logger.Printf("No bundle files found")
		return result, nil
	}

	// Parse bundle numbers
	var bundleNumbers []int
	for _, file := range files {
		base := filepath.Base(file)
		numStr := strings.TrimSuffix(base, ".jsonl.zst")
		num, err := strconv.Atoi(numStr)
		if err != nil {
			m.logger.Printf("Warning: skipping invalid filename: %s", base)
			continue
		}
		bundleNumbers = append(bundleNumbers, num)
	}

	sort.Ints(bundleNumbers)

	result.BundleCount = len(bundleNumbers)
	if len(bundleNumbers) > 0 {
		result.FirstBundle = bundleNumbers[0]
		result.LastBundle = bundleNumbers[len(bundleNumbers)-1]
	}

	// Find gaps
	if len(bundleNumbers) > 1 {
		for i := result.FirstBundle; i <= result.LastBundle; i++ {
			found := false
			for _, num := range bundleNumbers {
				if num == i {
					found = true
					break
				}
			}
			if !found {
				result.MissingGaps = append(result.MissingGaps, i)
			}
		}
	}

	m.logger.Printf("Found %d bundles (gaps: %d)", result.BundleCount, len(result.MissingGaps))

	// Load each bundle and rebuild index
	var newMetadata []*BundleMetadata
	var totalSize int64

	for _, num := range bundleNumbers {
		path := filepath.Join(m.config.BundleDir, fmt.Sprintf("%06d.jsonl.zst", num))

		// Load bundle
		ops, err := m.operations.LoadBundle(path)
		if err != nil {
			m.logger.Printf("Warning: failed to load bundle %d: %v", num, err)
			continue
		}

		// Get file size
		size, _ := m.operations.GetFileSize(path)
		totalSize += size

		// Calculate parent and cursor from previous bundle
		var parent string
		var cursor string
		if num > 1 && len(newMetadata) > 0 {
			prevMeta := newMetadata[len(newMetadata)-1]
			parent = prevMeta.Hash
			cursor = prevMeta.EndTime.Format(time.RFC3339Nano)
		}

		// Use the ONE method for metadata calculation
		meta, err := m.operations.CalculateBundleMetadata(num, path, ops, parent, cursor)
		if err != nil {
			m.logger.Printf("Warning: failed to calculate metadata for bundle %d: %v", num, err)
			continue
		}

		newMetadata = append(newMetadata, meta)

		m.logger.Printf(" Scanned bundle %06d: %d ops, %d DIDs", num, len(ops), meta.DIDCount)
	}

	result.TotalSize = totalSize

	// Rebuild index
	m.index.Rebuild(newMetadata)

	// Save index
	if err := m.SaveIndex(); err != nil {
		return nil, fmt.Errorf("failed to save index: %w", err)
	}

	result.IndexUpdated = true

	m.logger.Printf("Index rebuilt with %d bundles", len(newMetadata))

	return result, nil
}

// ScanDirectoryParallel scans the bundle directory in parallel and rebuilds the index
func (m *Manager) ScanDirectoryParallel(workers int, progressCallback func(current, total int, bytesProcessed int64)) (*DirectoryScanResult, error) {
	result := &DirectoryScanResult{
		BundleDir: m.config.BundleDir,
	}

	m.logger.Printf("Scanning directory (parallel, %d workers): %s", workers, m.config.BundleDir)

	// Find all bundle files
	files, err := filepath.Glob(filepath.Join(m.config.BundleDir, "*.jsonl.zst"))
	if err != nil {
		return nil, fmt.Errorf("failed to scan directory: %w", err)
	}
	files = filterBundleFiles(files)

	if len(files) == 0 {
		m.logger.Printf("No bundle files found")
		return result, nil
	}

	// Parse bundle numbers
	var bundleNumbers []int
	for _, file := range files {
		base := filepath.Base(file)
		numStr := strings.TrimSuffix(base, ".jsonl.zst")
		num, err := strconv.Atoi(numStr)
		if err != nil {
			m.logger.Printf("Warning: skipping invalid filename: %s", base)
			continue
		}
		bundleNumbers = append(bundleNumbers, num)
	}

	sort.Ints(bundleNumbers)

	result.BundleCount = len(bundleNumbers)
	if len(bundleNumbers) > 0 {
		result.FirstBundle = bundleNumbers[0]
		result.LastBundle = bundleNumbers[len(bundleNumbers)-1]
	}

	// Find gaps
	if len(bundleNumbers) > 1 {
		for i := result.FirstBundle; i <= result.LastBundle; i++ {
			found := false
			for _, num := range bundleNumbers {
				if num == i {
					found = true
					break
				}
			}
			if !found {
				result.MissingGaps = append(result.MissingGaps, i)
			}
		}
	}

	m.logger.Printf("Found %d bundles (gaps: %d)", result.BundleCount, len(result.MissingGaps))

	// Process bundles in parallel
	type bundleResult struct {
		index int
		meta  *BundleMetadata
		err   error
	}

	jobs := make(chan int, len(bundleNumbers))
	results := make(chan bundleResult, len(bundleNumbers))

	// Start workers
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for num := range jobs {
				path := filepath.Join(m.config.BundleDir, fmt.Sprintf("%06d.jsonl.zst", num))

				// Load and process bundle
				ops, err := m.operations.LoadBundle(path)
				if err != nil {
					results <- bundleResult{index: num, err: err}
					continue
				}

				// Use the FAST method (cursor will be set later in sequential phase)
				meta, err := m.operations.CalculateBundleMetadataFast(num, path, ops, "")
				if err != nil {
					results <- bundleResult{index: num, err: err}
					continue
				}

				results <- bundleResult{index: num, meta: meta}
			}
		}()
	}

	// Send jobs
	for _, num := range bundleNumbers {
		jobs <- num
	}
	close(jobs)

	// Wait for all workers to finish
	go func() {
		wg.Wait()
		close(results)
	}()

	// Collect results (in a map first, then order by bundle number)
	metadataMap := make(map[int]*BundleMetadata)
	var totalSize int64
	var totalUncompressed int64
	processed := 0

	for res := range results {
		processed++

		// Update progress WITH bytes
		if progressCallback != nil {
			if res.meta != nil {
				totalUncompressed += res.meta.UncompressedSize
			}
			progressCallback(processed, len(bundleNumbers), totalUncompressed)
		}

		if res.err != nil {
			m.logger.Printf("Warning: failed to process bundle %d: %v", res.index, res.err)
			continue
		}
		metadataMap[res.index] = res.meta
		totalSize += res.meta.CompressedSize
	}

	// Build ordered metadata slice and calculate chain hashes
	var newMetadata []*BundleMetadata
	var parent string // Parent chain hash

	for i, num := range bundleNumbers {
		meta, ok := metadataMap[num]
		if !ok {
			continue // Skip failed bundles
		}

		// Set cursor from previous bundle's EndTime
		if i > 0 && len(newMetadata) > 0 {
			prevMeta := newMetadata[len(newMetadata)-1]
			meta.Cursor = prevMeta.EndTime.Format(time.RFC3339Nano)
		}

		// Now calculate chain hash (must be done sequentially)
		meta.Hash = m.operations.CalculateChainHash(parent, meta.ContentHash)
		meta.Parent = parent

		newMetadata = append(newMetadata, meta)
		parent = meta.Hash // Store for next iteration
	}

	result.TotalSize = totalSize
	result.TotalUncompressed = totalUncompressed

	// Rebuild index
	m.index.Rebuild(newMetadata)

	// Save index
	if err := m.SaveIndex(); err != nil {
		return nil, fmt.Errorf("failed to save index: %w", err)
	}

	result.IndexUpdated = true

	m.logger.Printf("Index rebuilt with %d bundles", len(newMetadata))

	return result, nil
}

// GetInfo returns information about the bundle manager
func (m *Manager) GetInfo() map[string]interface{} {
	stats := m.index.GetStats()
	stats["bundle_dir"] = m.config.BundleDir
	stats["index_path"] = m.indexPath
	stats["verify_on_load"] = m.config.VerifyOnLoad
	return stats
}

// ExportOperations exports operations from bundles
func (m *Manager) ExportOperations(ctx context.Context, afterTime time.Time, count int) ([]plc.PLCOperation, error) {
	if count <= 0 {
		count = 1000
	}

	var result []plc.PLCOperation
	seenCIDs := make(map[string]bool)

	bundles := m.index.GetBundles()

	for _, meta := range bundles {
		if len(result) >= count {
			break
		}

		// Skip bundles before afterTime
		if !afterTime.IsZero() && meta.EndTime.Before(afterTime) {
			continue
		}

		// Load bundle
		bundle, err := m.LoadBundle(ctx, meta.BundleNumber)
		if err != nil {
			m.logger.Printf("Warning: failed to load bundle %d: %v", meta.BundleNumber, err)
			continue
		}

		// Add operations
		for _, op := range bundle.Operations {
			if !afterTime.IsZero() && op.CreatedAt.Before(afterTime) {
				continue
			}

			if seenCIDs[op.CID] {
				continue
			}

			seenCIDs[op.CID] = true
			result = append(result, op)

			if len(result) >= count {
				break
			}
		}
	}

	return result, nil
}

// ScanBundle scans a single bundle file and returns its metadata
func (m *Manager) ScanBundle(path string, bundleNumber int) (*BundleMetadata, error) {
	// Load bundle file
	operations, err := m.operations.LoadBundle(path)
	if err != nil {
		return nil, fmt.Errorf("failed to load bundle: %w", err)
	}

	if len(operations) == 0 {
		return nil, fmt.Errorf("bundle is empty")
	}

	// Get parent chain hash and cursor from previous bundle
	var parent string
	var cursor string
	if bundleNumber > 1 {
		if prevMeta, err := m.index.GetBundle(bundleNumber - 1); err == nil {
			parent = prevMeta.Hash
			cursor = prevMeta.EndTime.Format(time.RFC3339Nano)
		}
	}

	// Use the ONE method
	return m.operations.CalculateBundleMetadata(bundleNumber, path, operations, parent, cursor)
}

// ScanAndIndexBundle scans a bundle file and adds it to the index
func (m *Manager) ScanAndIndexBundle(path string, bundleNumber int) (*BundleMetadata, error) {
	meta, err := m.ScanBundle(path, bundleNumber)
	if err != nil {
		return nil, err
	}

	// Add to index
	m.index.AddBundle(meta)

	// Save index
	if err := m.SaveIndex(); err != nil {
		return nil, fmt.Errorf("failed to save index: %w", err)
	}

	return meta, nil
}

// IsBundleIndexed checks if a bundle is already in the index
func (m *Manager) IsBundleIndexed(bundleNumber int) bool {
	_, err := m.index.GetBundle(bundleNumber)
	return err == nil
}

// RefreshMempool reloads mempool from disk (useful for debugging)
func (m *Manager) RefreshMempool() error {
	if m.mempool == nil {
		return fmt.Errorf("mempool not initialized")
	}
	return m.mempool.Load()
}

// ClearMempool clears all operations from the mempool and saves
func (m *Manager) ClearMempool() error {
	if m.mempool == nil {
		return fmt.Errorf("mempool not initialized")
	}

	m.logger.Printf("Clearing mempool...")

	// Get count before clearing
	count := m.mempool.Count()

	// Clear the mempool
	m.mempool.Clear()

	// Save the empty state (this will delete the file since it's empty)
	if err := m.mempool.Save(); err != nil {
		return fmt.Errorf("failed to save mempool: %w", err)
	}

	m.logger.Printf("Cleared %d operations from mempool", count)

	return nil
}

// ValidateMempool validates the operations currently held in the mempool
func (m *Manager) ValidateMempool() error {
	if m.mempool == nil {
		return fmt.Errorf("mempool not initialized")
	}
	return m.mempool.Validate()
}

// StreamBundleRaw streams the raw compressed bundle file
func (m *Manager) StreamBundleRaw(ctx context.Context, bundleNumber int) (io.ReadCloser, error) {
	// Get metadata from index
	meta, err := m.index.GetBundle(bundleNumber)
	if err != nil {
		return nil, fmt.Errorf("bundle not in index: %w", err)
	}

	// Build file path
	path := filepath.Join(m.config.BundleDir, fmt.Sprintf("%06d.jsonl.zst", bundleNumber))
	if !m.operations.FileExists(path) {
		return nil, fmt.Errorf("bundle file not found: %s", path)
	}

	// Optionally verify hash before streaming
	if m.config.VerifyOnLoad {
		valid, actualHash, err := m.operations.VerifyHash(path, meta.CompressedHash)
		if err != nil {
			return nil, fmt.Errorf("failed to verify hash: %w", err)
		}
		if !valid {
			return nil, fmt.Errorf("hash mismatch: expected %s, got %s", meta.CompressedHash, actualHash)
		}
	}

	return m.operations.StreamRaw(path)
}

// StreamBundleDecompressed streams the decompressed bundle data as JSONL
func (m *Manager) StreamBundleDecompressed(ctx context.Context, bundleNumber int) (io.ReadCloser, error) {
	// Get metadata from index
	_, err := m.index.GetBundle(bundleNumber)
	if err != nil {
		return nil, fmt.Errorf("bundle not in index: %w", err)
	}

	// Build file path
	path := filepath.Join(m.config.BundleDir, fmt.Sprintf("%06d.jsonl.zst", bundleNumber))
	if !m.operations.FileExists(path) {
		return nil, fmt.Errorf("bundle file not found: %s", path)
	}

	return m.operations.StreamDecompressed(path)
}

// RefreshIndex reloads the index from disk if it has been modified
func (m *Manager) RefreshIndex() error {
	// Check if index file has been modified
	info, err := os.Stat(m.indexPath)
	if err != nil {
		return err
	}

	// If index was modified after we loaded it, reload
	if info.ModTime().After(m.index.UpdatedAt) {
		m.logger.Printf("Index file modified, reloading...")

		newIndex, err := LoadIndex(m.indexPath)
		if err != nil {
			return fmt.Errorf("failed to reload index: %w", err)
		}

		m.index = newIndex
		m.logger.Printf("Index reloaded: %d bundles", m.index.Count())
	}

	return nil
}

// filterBundleFiles filters out files starting with . or _ (system/temp files)
func filterBundleFiles(files []string) []string {
	filtered := make([]string, 0, len(files))
	for _, file := range files {
		basename := filepath.Base(file)
		// Skip files starting with . or _
		if len(basename) > 0 && (basename[0] == '.' || basename[0] == '_') {
			continue
		}
		filtered = append(filtered, file)
	}
	return filtered
}

// ValidateBundle validates all operations in a bundle using go-didplc
func (m *Manager) ValidateBundle(ctx context.Context, bundleNumber int) error {
	bundle, err := m.LoadBundle(ctx, bundleNumber)
	if err != nil {
		return err
	}

	validator := NewValidator(m.logger)
	return validator.ValidateBundleOperations(bundle.Operations)
}

// ValidateAllBundles validates all bundles in the repository
func (m *Manager) ValidateAllBundles(ctx context.Context, progressFunc func(current, total int)) error {
	index := m.GetIndex()
	bundles := index.GetBundles()

	m.logger.Printf("Validating %d bundles...", len(bundles))

	validator := NewValidator(m.logger)
	errors := 0

	for i, meta := range bundles {
		if progressFunc != nil {
			progressFunc(i+1, len(bundles))
		}

		bundle, err := m.LoadBundle(ctx, meta.BundleNumber)
		if err != nil {
			m.logger.Printf("Failed to load bundle %d: %v", meta.BundleNumber, err)
			errors++
			continue
		}

		if err := validator.ValidateBundleOperations(bundle.Operations); err != nil {
			m.logger.Printf("Bundle %d validation failed: %v", meta.BundleNumber, err)
			errors++
		}
	}

	if errors > 0 {
		return fmt.Errorf("%d bundles failed validation", errors)
	}

	return nil
}

// ValidateBundleWithDetails validates a bundle and returns invalid operations
func (m *Manager) ValidateBundleWithDetails(ctx context.Context, bundleNumber int) ([]InvalidOperation, error) {
	bundle, err := m.LoadBundle(ctx, bundleNumber)
	if err != nil {
		return nil, err
	}

	validator := NewValidator(m.logger)
	return validator.ValidateBundleOperationsWithDetails(bundle.Operations)
}

// ValidateAllBundlesWithDetails validates all bundles and returns all invalid operations
func (m *Manager) ValidateAllBundlesWithDetails(ctx context.Context, progressFunc func(current, total int)) ([]InvalidOperation, error) {
	index := m.GetIndex()
	bundles := index.GetBundles()

	validator := NewValidator(m.logger)
	var allInvalid []InvalidOperation
	errors := 0

	for i, meta := range bundles {
		if progressFunc != nil {
			progressFunc(i+1, len(bundles))
		}

		bundle, err := m.LoadBundle(ctx, meta.BundleNumber)
		if err != nil {
			m.logger.Printf("Failed to load bundle %d: %v", meta.BundleNumber, err)
			errors++
			continue
		}

		invalid, err := validator.ValidateBundleOperationsWithDetails(bundle.Operations)
		if err != nil {
			m.logger.Printf("Bundle %d validation failed: %v", meta.BundleNumber, err)
			errors++
		}

		allInvalid = append(allInvalid, invalid...)
	}

	if errors > 0 {
		return allInvalid, fmt.Errorf("%d bundles failed validation", errors)
	}

	return allInvalid, nil
}

// ValidateBundleStreaming validates a bundle and streams invalid operations
func (m *Manager) ValidateBundleStreaming(ctx context.Context, bundleNumber int, callback InvalidCallback) error {
	bundle, err := m.LoadBundle(ctx, bundleNumber)
	if err != nil {
		return err
	}

	validator := NewValidator(m.logger)
	return validator.ValidateBundleOperationsStreaming(bundle.Operations, callback)
}

// ValidateAllBundlesStreaming validates all bundles and streams invalid operations
func (m *Manager) ValidateAllBundlesStreaming(ctx context.Context, callback InvalidCallback, progressFunc func(current, total int)) error {
	index := m.GetIndex()
	bundles := index.GetBundles()

	validator := NewValidator(m.logger)
	errors := 0

	for i, meta := range bundles {
		if progressFunc != nil {
			progressFunc(i+1, len(bundles))
		}

		bundle, err := m.LoadBundle(ctx, meta.BundleNumber)
		if err != nil {
			m.logger.Printf("Failed to load bundle %d: %v", meta.BundleNumber, err)
			errors++
			continue
		}

		if err := validator.ValidateBundleOperationsStreaming(bundle.Operations, callback); err != nil {
			// Errors already streamed via callback, just count them
			errors++
		}
	}

	if errors > 0 {
		return fmt.Errorf("%d bundles had validation errors", errors)
	}

	return nil
}
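
Usage sketch: the manager above exposes everything needed for a fetch → save → verify loop. The snippet below is a minimal sketch of that lifecycle, not part of the package itself; the import path for the bundle package and the plc.NewClient constructor are assumptions for illustration, while NewManager, FetchNextBundle, SaveBundle, VerifyChain, and Close are the methods defined above (a nil config falls back to DefaultConfig("./plc_bundles") inside NewManager).

package main

import (
	"context"
	"log"

	"tangled.org/atscan.net/plcbundle/bundle" // assumed import path for the package shown above
	"tangled.org/atscan.net/plcbundle/plc"
)

func main() {
	ctx := context.Background()

	// Hypothetical constructor; the real plc package may expose a different one.
	plcClient := plc.NewClient("https://plc.directory")

	// A nil config falls back to DefaultConfig("./plc_bundles").
	mgr, err := bundle.NewManager(nil, plcClient)
	if err != nil {
		log.Fatalf("create manager: %v", err)
	}
	defer mgr.Close()

	// Fill the mempool from the PLC export stream and cut the next bundle.
	b, err := mgr.FetchNextBundle(ctx)
	if err != nil {
		log.Fatalf("fetch: %v", err)
	}

	// Persist it: SaveBundle computes the content hash, links it to the
	// previous bundle's chain hash, and rotates the mempool.
	if err := mgr.SaveBundle(ctx, b); err != nil {
		log.Fatalf("save: %v", err)
	}

	// Verify the whole chain: each bundle's Parent must equal the previous
	// chain hash and its Hash must recompute from (parent, content hash).
	res, err := mgr.VerifyChain(ctx)
	if err != nil {
		log.Fatalf("verify: %v", err)
	}
	if !res.Valid {
		log.Fatalf("chain broken at bundle %d: %s", res.BrokenAt, res.Error)
	}
	log.Printf("chain OK: %d bundles verified", res.ChainLength)
}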