A Transparent and Verifiable Way to Sync the AT Protocol's PLC Directory
package bundle

import (
	"context"
	"fmt"
	"io"
	"log"
	"os"
	"path/filepath"
	"runtime"
	"sort"
	"strconv"
	"strings"
	"sync"
	"time"

	"tangled.org/atscan.net/plcbundle/plc"
)

// defaultLogger is a simple logger implementation
type defaultLogger struct{}

func (d defaultLogger) Printf(format string, v ...interface{}) {
	log.Printf(format, v...)
}

func (d defaultLogger) Println(v ...interface{}) {
	log.Println(v...)
}

// Manager handles bundle operations
type Manager struct {
	config     *Config
	operations *Operations
	index      *Index
	indexPath  string
	plcClient  *plc.Client
	logger     Logger
	mempool    *Mempool
	didIndex   *DIDIndexManager
}

// NewManager creates a new bundle manager
func NewManager(config *Config, plcClient *plc.Client) (*Manager, error) {
	if config == nil {
		config = DefaultConfig("./plc_bundles")
	}

	if config.Logger == nil {
		config.Logger = defaultLogger{}
	}

	// Ensure directory exists
	if err := os.MkdirAll(config.BundleDir, 0755); err != nil {
		return nil, fmt.Errorf("failed to create bundle directory: %w", err)
	}

	// Initialize operations handler
	ops, err := NewOperations(config.Logger)
	if err != nil {
		return nil, fmt.Errorf("failed to initialize operations: %w", err)
	}

	// Determine origin
	var origin string
	if plcClient != nil {
		origin = plcClient.GetBaseURL()
	}

	// Load or create index
	indexPath := filepath.Join(config.BundleDir, INDEX_FILE)
	index, err := LoadIndex(indexPath)

	// Check for bundle files in directory
	bundleFiles, _ := filepath.Glob(filepath.Join(config.BundleDir, "*.jsonl.zst"))
	bundleFiles = filterBundleFiles(bundleFiles)
	hasBundleFiles := len(bundleFiles) > 0

	// Check if clone/download is in progress (look for .tmp files)
	tmpFiles, _ := filepath.Glob(filepath.Join(config.BundleDir, "*.tmp"))
	cloneInProgress := len(tmpFiles) > 0

	needsRebuild := false

	if err != nil {
		// Index doesn't exist or is invalid
		if hasBundleFiles {
			if cloneInProgress {
				config.Logger.Printf("Clone/download in progress, skipping auto-rebuild")
			} else {
				config.Logger.Printf("No valid index found, but detected %d bundle files", len(bundleFiles))
				needsRebuild = true
			}
		} else {
			// No index and no bundles - create fresh index
			config.Logger.Printf("Creating new index at %s", indexPath)
			index = NewIndex(origin)
			if err := index.Save(indexPath); err != nil {
				return nil, fmt.Errorf("failed to save new index: %w", err)
			}
		}
	} else {
		// Index exists - auto-populate origin if missing
		if index.Origin == "" {
			if origin != "" {
				config.Logger.Printf("⚠️ Upgrading old index: setting origin to %s", origin)
				index.Origin = origin
				if err := index.Save(indexPath); err != nil {
					return nil, fmt.Errorf("failed to update index with origin: %w", err)
				}
			} else {
				config.Logger.Printf("⚠️ Warning: index has no origin and no PLC client configured")
			}
		}

		// Validate origin matches if both are set
		if index.Origin != "" && origin != "" && index.Origin != origin {
			return nil, fmt.Errorf(
				"origin mismatch: index has origin %q but PLC client points to %q\n"+
					"Cannot mix bundles from different sources. Use a different directory or reconfigure PLC client",
				index.Origin, origin,
			)
		}

		config.Logger.Printf("Loaded index with %d bundles (origin: %s)", index.Count(), index.Origin)

		// Check if there are bundle files not in the index
		if hasBundleFiles && len(bundleFiles) > index.Count() {
			if cloneInProgress {
				config.Logger.Printf("Clone/download in progress (%d .tmp files), skipping auto-rebuild", len(tmpFiles))
			} else {
				config.Logger.Printf("Detected %d bundle files but index only has %d entries - rebuilding",
					len(bundleFiles), index.Count())
				needsRebuild = true
			}
		}
	}

	if index != nil && plcClient != nil {
		currentOrigin := plcClient.GetBaseURL()

		// Check if origins match
		if index.Origin != "" && index.Origin != currentOrigin {
			return nil, fmt.Errorf(
				"origin mismatch: index has origin %q but PLC client points to %q. "+
					"Cannot mix bundles from different sources",
				index.Origin, currentOrigin,
			)
		}

		// Set origin if not set (for backward compatibility with old indexes)
		if index.Origin == "" && currentOrigin != "" {
			index.Origin = currentOrigin
			config.Logger.Printf("Setting origin for existing index: %s", currentOrigin)
			if err := index.Save(indexPath); err != nil {
				return nil, fmt.Errorf("failed to update index with origin: %w", err)
			}
		}
	}

	// Perform rebuild if needed (using parallel scan)
	if needsRebuild && config.AutoRebuild {
		config.Logger.Printf("Rebuilding index from %d bundle files...", len(bundleFiles))

		// Create temporary manager for scanning
		tempMgr := &Manager{
			config:     config,
			operations: ops,
			index:      NewIndex(origin),
			indexPath:  indexPath,
			logger:     config.Logger,
		}

		// Use parallel scan with auto-detected CPU count
		workers := config.RebuildWorkers
		if workers <= 0 {
			workers = runtime.NumCPU()
			if workers < 1 {
				workers = 1
			}
		}

		config.Logger.Printf("Using %d workers for parallel scan", workers)

		// Build a progress callback with the (current, total, bytesProcessed) signature
		var progressCallback func(current, total int, bytesProcessed int64)
		if config.RebuildProgress != nil {
			// Wrap the configured two-argument callback
			oldCallback := config.RebuildProgress
			progressCallback = func(current, total int, bytesProcessed int64) {
				oldCallback(current, total)
			}
		} else {
			// Default: log every 100 bundles
			progressCallback = func(current, total int, bytesProcessed int64) {
				if current%100 == 0 || current == total {
					mbProcessed := float64(bytesProcessed) / (1024 * 1024)
					config.Logger.Printf("Rebuild progress: %d/%d bundles (%.1f%%), %.1f MB processed",
						current, total, float64(current)/float64(total)*100, mbProcessed)
				}
			}
		}

		start := time.Now()

		// Scan directory to rebuild index (parallel)
		result, err := tempMgr.ScanDirectoryParallel(workers, progressCallback)
		if err != nil {
			return nil, fmt.Errorf("failed to rebuild index: %w", err)
		}

		elapsed := time.Since(start)

		// Reload the rebuilt index
		index, err = LoadIndex(indexPath)
		if err != nil {
			return nil, fmt.Errorf("failed to load rebuilt index: %w", err)
		}

		// Calculate throughput
		mbPerSec := float64(result.TotalUncompressed) / elapsed.Seconds() / (1024 * 1024)

		config.Logger.Printf("✓ Index rebuilt with %d bundles in %s",
			index.Count(), elapsed.Round(time.Millisecond))
		config.Logger.Printf(" Speed: %.1f bundles/sec, %.1f MB/s (uncompressed)",
			float64(result.BundleCount)/elapsed.Seconds(), mbPerSec)

		// Verify all chain hashes are present
		bundles := index.GetBundles()
		missingHashes := 0
		for i, meta := range bundles {
			if meta.ContentHash == "" {
				missingHashes++
			}
			if i > 0 && meta.Hash == "" {
				missingHashes++
			}
		}
		if missingHashes > 0 {
			config.Logger.Printf("⚠️ Warning: %d bundles have missing hashes", missingHashes)
		}
	}

	if index == nil {
		index = NewIndex(origin)
	}

	// Initialize mempool for next bundle
	lastBundle := index.GetLastBundle()
	nextBundleNum := 1
	var minTimestamp time.Time

	if lastBundle != nil {
		nextBundleNum = lastBundle.BundleNumber + 1
		minTimestamp = lastBundle.EndTime
	}

	mempool, err := NewMempool(config.BundleDir, nextBundleNum, minTimestamp, config.Logger)
	if err != nil {
		return nil, fmt.Errorf("failed to initialize mempool: %w", err)
	}

	// Initialize DID index manager
	didIndex := NewDIDIndexManager(config.BundleDir, config.Logger)

	return &Manager{
		config:     config,
		operations: ops,
		index:      index,
		indexPath:  indexPath,
		plcClient:  plcClient,
		logger:     config.Logger,
		mempool:    mempool,
		didIndex:   didIndex,
	}, nil
}

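// Usage sketch (illustrative, not part of the original file): constructing a
// Manager against a local bundle directory. The plc.NewClient constructor and
// its argument are assumptions here; substitute whatever constructor the plc
// package actually exposes. Passing a nil client is also supported and yields
// an offline manager that can read and verify bundles but not fetch new ones.
//
//	cfg := DefaultConfig("./plc_bundles")
//	client := plc.NewClient("https://plc.directory") // assumed constructor
//	mgr, err := NewManager(cfg, client)
//	if err != nil {
//		log.Fatalf("manager init failed: %v", err)
//	}
//	defer mgr.Close()
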
// Close cleans up resources
func (m *Manager) Close() {
	if m.operations != nil {
		m.operations.Close()
	}
	if m.plcClient != nil {
		m.plcClient.Close()
	}
	if m.mempool != nil {
		if err := m.mempool.Save(); err != nil {
			m.logger.Printf("Warning: failed to save mempool: %v", err)
		}
	}
	if m.didIndex != nil {
		m.didIndex.Close()
	}
}

// GetIndex returns the current index
func (m *Manager) GetIndex() *Index {
	return m.index
}

// SaveIndex saves the index to disk
func (m *Manager) SaveIndex() error {
	return m.index.Save(m.indexPath)
}

// LoadBundle loads a bundle from disk
func (m *Manager) LoadBundle(ctx context.Context, bundleNumber int) (*Bundle, error) {
	// Get metadata from index
	meta, err := m.index.GetBundle(bundleNumber)
	if err != nil {
		return nil, fmt.Errorf("bundle not in index: %w", err)
	}

	// Load file
	path := filepath.Join(m.config.BundleDir, fmt.Sprintf("%06d.jsonl.zst", bundleNumber))
	if !m.operations.FileExists(path) {
		return nil, fmt.Errorf("bundle file not found: %s", path)
	}

	// Verify hash if enabled
	if m.config.VerifyOnLoad {
		valid, actualHash, err := m.operations.VerifyHash(path, meta.CompressedHash)
		if err != nil {
			return nil, fmt.Errorf("failed to verify hash: %w", err)
		}
		if !valid {
			return nil, fmt.Errorf("hash mismatch: expected %s, got %s", meta.CompressedHash, actualHash)
		}
	}

	// Load operations
	operations, err := m.operations.LoadBundle(path)
	if err != nil {
		return nil, fmt.Errorf("failed to load bundle: %w", err)
	}

	// Create bundle struct
	bundle := &Bundle{
		BundleNumber:     meta.BundleNumber,
		StartTime:        meta.StartTime,
		EndTime:          meta.EndTime,
		Operations:       operations,
		DIDCount:         meta.DIDCount,
		Hash:             meta.Hash,        // Chain hash (primary)
		ContentHash:      meta.ContentHash, // Content hash
		Parent:           meta.Parent,      // Parent chain hash
		CompressedHash:   meta.CompressedHash,
		CompressedSize:   meta.CompressedSize,
		UncompressedSize: meta.UncompressedSize,
		Cursor:           meta.Cursor,
		Compressed:       true,
		CreatedAt:        meta.CreatedAt,
	}

	return bundle, nil
}

// SaveBundle saves a bundle to disk and updates the index
func (m *Manager) SaveBundle(ctx context.Context, bundle *Bundle, quiet bool) error {
	if err := bundle.ValidateForSave(); err != nil {
		return fmt.Errorf("bundle validation failed: %w", err)
	}

	path := filepath.Join(m.config.BundleDir, fmt.Sprintf("%06d.jsonl.zst", bundle.BundleNumber))

	// Save to disk
	uncompressedHash, compressedHash, uncompressedSize, compressedSize, err := m.operations.SaveBundle(path, bundle.Operations)
	if err != nil {
		return fmt.Errorf("failed to save bundle: %w", err)
	}

	bundle.ContentHash = uncompressedHash
	bundle.CompressedHash = compressedHash
	bundle.UncompressedSize = uncompressedSize
	bundle.CompressedSize = compressedSize
	bundle.CreatedAt = time.Now().UTC()

	// Get parent chain hash
	var parent string
	if bundle.BundleNumber > 1 {
		prevBundle := m.index.GetLastBundle()
		if prevBundle != nil {
			parent = prevBundle.Hash
		} else {
			if prevMeta, err := m.index.GetBundle(bundle.BundleNumber - 1); err == nil {
				parent = prevMeta.Hash
			}
		}
	}

	bundle.Parent = parent
	bundle.Hash = m.operations.CalculateChainHash(parent, bundle.ContentHash)

	// Add to index
	m.index.AddBundle(bundle.ToMetadata())

	// Save index
	if err := m.SaveIndex(); err != nil {
		return fmt.Errorf("failed to save index: %w", err)
	}

	// Clean up old mempool (warning suppressed when quiet)
	oldMempoolFile := m.mempool.GetFilename()
	if err := m.mempool.Delete(); err != nil && !quiet {
		m.logger.Printf("Warning: failed to delete old mempool %s: %v", oldMempoolFile, err)
	}

	// Create new mempool
	nextBundle := bundle.BundleNumber + 1
	minTimestamp := bundle.EndTime

	newMempool, err := NewMempool(m.config.BundleDir, nextBundle, minTimestamp, m.logger)
	if err != nil {
		return fmt.Errorf("failed to create new mempool: %w", err)
	}

	m.mempool = newMempool

	// Update DID index if enabled
	if m.didIndex != nil && m.didIndex.Exists() {
		if err := m.UpdateDIDIndexForBundle(ctx, bundle); err != nil {
			m.logger.Printf("Warning: failed to update DID index: %v", err)
		}
	}

	return nil
}

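// Chain construction sketch (illustrative): each bundle's chain hash commits to
// its own content hash and to the previous bundle's chain hash, so tampering
// with any historical bundle invalidates every later hash. Conceptually:
//
//	hash(1) = CalculateChainHash("", contentHash(1))        // genesis: empty parent
//	hash(n) = CalculateChainHash(hash(n-1), contentHash(n)) // n > 1
//
// VerifyChain recomputes this recurrence for each link from the index metadata.
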
// FetchNextBundle fetches the next bundle from PLC directory
func (m *Manager) FetchNextBundle(ctx context.Context, quiet bool) (*Bundle, error) {
	if m.plcClient == nil {
		return nil, fmt.Errorf("PLC client not configured")
	}

	lastBundle := m.index.GetLastBundle()
	nextBundleNum := 1
	var afterTime string
	var prevBoundaryCIDs map[string]bool
	var prevBundleHash string

	if lastBundle != nil {
		nextBundleNum = lastBundle.BundleNumber + 1
		afterTime = lastBundle.EndTime.Format(time.RFC3339Nano)
		prevBundleHash = lastBundle.Hash

		prevBundle, err := m.LoadBundle(ctx, lastBundle.BundleNumber)
		if err == nil {
			_, prevBoundaryCIDs = m.operations.GetBoundaryCIDs(prevBundle.Operations)
		}
	}

	if !quiet {
		m.logger.Printf("Preparing bundle %06d (mempool: %d ops)...", nextBundleNum, m.mempool.Count())
	}

	for m.mempool.Count() < BUNDLE_SIZE {
		if !quiet {
			m.logger.Printf("Fetching more operations (have %d/%d)...", m.mempool.Count(), BUNDLE_SIZE)
		}

		err := m.fetchToMempool(ctx, afterTime, prevBoundaryCIDs, BUNDLE_SIZE-m.mempool.Count(), quiet)
		if err != nil {
			if m.mempool.Count() >= BUNDLE_SIZE {
				break
			}
			m.mempool.Save()
			return nil, fmt.Errorf("insufficient operations: have %d, need %d", m.mempool.Count(), BUNDLE_SIZE)
		}

		if m.mempool.Count() < BUNDLE_SIZE {
			m.mempool.Save()
			return nil, fmt.Errorf("insufficient operations: have %d, need %d (no more available)", m.mempool.Count(), BUNDLE_SIZE)
		}
	}

	if !quiet {
		m.logger.Printf("Creating bundle %06d from mempool", nextBundleNum)
	}
	operations, err := m.mempool.Take(BUNDLE_SIZE)
	if err != nil {
		m.mempool.Save()
		return nil, fmt.Errorf("failed to take operations from mempool: %w", err)
	}

	bundle := m.operations.CreateBundle(nextBundleNum, operations, afterTime, prevBundleHash)

	if err := m.mempool.Save(); err != nil {
		m.logger.Printf("Warning: failed to save mempool: %v", err)
	}

	if !quiet {
		m.logger.Printf("✓ Bundle %06d ready (%d ops, mempool: %d remaining)",
			nextBundleNum, len(operations), m.mempool.Count())
	}

	return bundle, nil
}

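// Sync loop sketch (illustrative): repeatedly fetch the next full bundle and
// persist it until the PLC directory has fewer than BUNDLE_SIZE new operations
// available, at which point FetchNextBundle returns an "insufficient operations"
// error and the partial remainder stays in the mempool for the next run.
//
//	for {
//		b, err := mgr.FetchNextBundle(ctx, false)
//		if err != nil {
//			break // typically: not enough operations to seal another bundle yet
//		}
//		if err := mgr.SaveBundle(ctx, b, false); err != nil {
//			log.Fatalf("save failed: %v", err)
//		}
//	}
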
// fetchToMempool fetches operations and adds them to mempool (returns error if no progress)
func (m *Manager) fetchToMempool(ctx context.Context, afterTime string, prevBoundaryCIDs map[string]bool, target int, quiet bool) error {
	seenCIDs := make(map[string]bool)

	// Mark previous boundary CIDs as seen
	for cid := range prevBoundaryCIDs {
		seenCIDs[cid] = true
	}

	// Use last mempool time if available
	if m.mempool.Count() > 0 {
		afterTime = m.mempool.GetLastTime()
		if !quiet {
			m.logger.Printf(" Continuing from mempool cursor: %s", afterTime)
		}
	}

	currentAfter := afterTime
	maxFetches := 20
	totalAdded := 0
	startingCount := m.mempool.Count()

	for fetchNum := 0; fetchNum < maxFetches; fetchNum++ {
		// Calculate batch size
		remaining := target - (m.mempool.Count() - startingCount)
		if remaining <= 0 {
			break
		}

		batchSize := 1000
		if remaining < 500 {
			batchSize = 200
		}

		if !quiet {
			m.logger.Printf(" Fetch #%d: requesting %d operations (mempool: %d)",
				fetchNum+1, batchSize, m.mempool.Count())
		}

		batch, err := m.plcClient.Export(ctx, plc.ExportOptions{
			Count: batchSize,
			After: currentAfter,
		})
		if err != nil {
			m.mempool.Save()
			return fmt.Errorf("export failed: %w", err)
		}

		if len(batch) == 0 {
			if !quiet {
				m.logger.Printf(" No more operations available from PLC")
			}
			m.mempool.Save()
			if totalAdded > 0 {
				return nil
			}
			return fmt.Errorf("no operations available")
		}

		// Deduplicate
		uniqueOps := make([]plc.PLCOperation, 0)
		for _, op := range batch {
			if !seenCIDs[op.CID] {
				seenCIDs[op.CID] = true
				uniqueOps = append(uniqueOps, op)
			}
		}

		if len(uniqueOps) > 0 {
			added, err := m.mempool.Add(uniqueOps)
			if err != nil {
				m.mempool.Save()
				return fmt.Errorf("chronological validation failed: %w", err)
			}

			totalAdded += added
			if !quiet {
				m.logger.Printf(" Added %d new operations (mempool now: %d)", added, m.mempool.Count())
			}
		}

		// Update cursor
		if len(batch) > 0 {
			currentAfter = batch[len(batch)-1].CreatedAt.Format(time.RFC3339Nano)
		}

		// Stop if we received fewer operations than requested
		if len(batch) < batchSize {
			if !quiet {
				m.logger.Printf(" Received incomplete batch (%d/%d), caught up to latest", len(batch), batchSize)
			}
			break
		}
	}

	if totalAdded > 0 {
		if !quiet {
			m.logger.Printf("✓ Fetch complete: added %d operations (mempool: %d)", totalAdded, m.mempool.Count())
		}
		return nil
	}

	return fmt.Errorf("no new operations added")
}

// GetMempoolStats returns mempool statistics
func (m *Manager) GetMempoolStats() map[string]interface{} {
	return m.mempool.Stats()
}

// GetMempoolOperations returns all operations currently in mempool
func (m *Manager) GetMempoolOperations() ([]plc.PLCOperation, error) {
	if m.mempool == nil {
		return nil, fmt.Errorf("mempool not initialized")
	}

	// Use Peek to get operations without removing them
	count := m.mempool.Count()
	if count == 0 {
		return []plc.PLCOperation{}, nil
	}

	return m.mempool.Peek(count), nil
}

// VerifyBundle verifies a bundle's integrity
func (m *Manager) VerifyBundle(ctx context.Context, bundleNumber int) (*VerificationResult, error) {
	result := &VerificationResult{
		BundleNumber: bundleNumber,
	}

	// Get from index
	meta, err := m.index.GetBundle(bundleNumber)
	if err != nil {
		result.Error = err
		return result, nil
	}

	result.ExpectedHash = meta.CompressedHash

	// Check file exists
	path := filepath.Join(m.config.BundleDir, fmt.Sprintf("%06d.jsonl.zst", bundleNumber))
	result.FileExists = m.operations.FileExists(path)
	if !result.FileExists {
		result.Error = fmt.Errorf("file not found")
		return result, nil
	}

	// Verify BOTH compressed and content hashes
	compHash, compSize, contentHash, contentSize, err := m.operations.CalculateFileHashes(path)
	if err != nil {
		result.Error = err
		return result, nil
	}

	result.LocalHash = compHash

	// Verify compressed hash
	if compHash != meta.CompressedHash {
		result.HashMatch = false
		result.Valid = false
		result.Error = fmt.Errorf("compressed hash mismatch: expected %s, got %s", meta.CompressedHash, compHash)
		return result, nil
	}

	// Verify content hash
	if contentHash != meta.ContentHash {
		result.HashMatch = false
		result.Valid = false
		result.Error = fmt.Errorf("content hash mismatch: expected %s, got %s", meta.ContentHash, contentHash)
		return result, nil
	}

	// Verify sizes match
	if compSize != meta.CompressedSize {
		result.Valid = false
		result.Error = fmt.Errorf("compressed size mismatch: expected %d, got %d", meta.CompressedSize, compSize)
		return result, nil
	}

	if contentSize != meta.UncompressedSize {
		result.Valid = false
		result.Error = fmt.Errorf("uncompressed size mismatch: expected %d, got %d", meta.UncompressedSize, contentSize)
		return result, nil
	}

	result.HashMatch = true
	result.Valid = true

	return result, nil
}

// VerifyChain verifies the entire bundle chain
func (m *Manager) VerifyChain(ctx context.Context) (*ChainVerificationResult, error) {
	result := &ChainVerificationResult{
		VerifiedBundles: make([]int, 0),
	}

	bundles := m.index.GetBundles()
	if len(bundles) == 0 {
		result.Valid = true
		return result, nil
	}

	result.ChainLength = len(bundles)

	for i, meta := range bundles {
		// Verify file hash
		vr, err := m.VerifyBundle(ctx, meta.BundleNumber)
		if err != nil || !vr.Valid {
			result.Error = fmt.Sprintf("Bundle %d hash verification failed", meta.BundleNumber)
			result.BrokenAt = meta.BundleNumber
			return result, nil
		}

		// Verify chain link
		if i > 0 {
			prevMeta := bundles[i-1]

			// Check parent reference
			if meta.Parent != prevMeta.Hash {
				result.Error = fmt.Sprintf("Chain broken at bundle %d: parent mismatch", meta.BundleNumber)
				result.BrokenAt = meta.BundleNumber
				return result, nil
			}

			// Verify chain hash calculation
			expectedHash := m.operations.CalculateChainHash(prevMeta.Hash, meta.ContentHash)
			if meta.Hash != expectedHash {
				result.Error = fmt.Sprintf("Chain broken at bundle %d: hash mismatch", meta.BundleNumber)
				result.BrokenAt = meta.BundleNumber
				return result, nil
			}
		}

		result.VerifiedBundles = append(result.VerifiedBundles, meta.BundleNumber)
	}

	result.Valid = true
	return result, nil
}

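// Verification sketch (illustrative): a full audit checks every bundle file's
// hashes and the chain links between them in one call.
//
//	res, err := mgr.VerifyChain(ctx)
//	if err != nil {
//		log.Fatal(err)
//	}
//	if !res.Valid {
//		log.Printf("chain broken at bundle %06d: %s", res.BrokenAt, res.Error)
//	} else {
//		log.Printf("verified %d bundles", len(res.VerifiedBundles))
//	}
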
// ScanDirectory scans the bundle directory and rebuilds the index
func (m *Manager) ScanDirectory() (*DirectoryScanResult, error) {
	result := &DirectoryScanResult{
		BundleDir: m.config.BundleDir,
	}

	m.logger.Printf("Scanning directory: %s", m.config.BundleDir)

	// Find all bundle files
	files, err := filepath.Glob(filepath.Join(m.config.BundleDir, "*.jsonl.zst"))
	if err != nil {
		return nil, fmt.Errorf("failed to scan directory: %w", err)
	}
	files = filterBundleFiles(files)

	if len(files) == 0 {
		m.logger.Printf("No bundle files found")
		return result, nil
	}

	// Parse bundle numbers
	var bundleNumbers []int
	for _, file := range files {
		base := filepath.Base(file)
		numStr := strings.TrimSuffix(base, ".jsonl.zst")
		num, err := strconv.Atoi(numStr)
		if err != nil {
			m.logger.Printf("Warning: skipping invalid filename: %s", base)
			continue
		}
		bundleNumbers = append(bundleNumbers, num)
	}

	sort.Ints(bundleNumbers)

	result.BundleCount = len(bundleNumbers)
	if len(bundleNumbers) > 0 {
		result.FirstBundle = bundleNumbers[0]
		result.LastBundle = bundleNumbers[len(bundleNumbers)-1]
	}

	// Find gaps
	if len(bundleNumbers) > 1 {
		for i := result.FirstBundle; i <= result.LastBundle; i++ {
			found := false
			for _, num := range bundleNumbers {
				if num == i {
					found = true
					break
				}
			}
			if !found {
				result.MissingGaps = append(result.MissingGaps, i)
			}
		}
	}

	m.logger.Printf("Found %d bundles (gaps: %d)", result.BundleCount, len(result.MissingGaps))

	// Load each bundle and rebuild index
	var newMetadata []*BundleMetadata
	var totalSize int64

	for _, num := range bundleNumbers {
		path := filepath.Join(m.config.BundleDir, fmt.Sprintf("%06d.jsonl.zst", num))

		// Load bundle
		ops, err := m.operations.LoadBundle(path)
		if err != nil {
			m.logger.Printf("Warning: failed to load bundle %d: %v", num, err)
			continue
		}

		// Get file size
		size, _ := m.operations.GetFileSize(path)
		totalSize += size

		// Calculate parent and cursor from previous bundle
		var parent string
		var cursor string
		if num > 1 && len(newMetadata) > 0 {
			prevMeta := newMetadata[len(newMetadata)-1]
			parent = prevMeta.Hash
			cursor = prevMeta.EndTime.Format(time.RFC3339Nano)
		}

		// Use the canonical metadata calculation
		meta, err := m.operations.CalculateBundleMetadata(num, path, ops, parent, cursor)
		if err != nil {
			m.logger.Printf("Warning: failed to calculate metadata for bundle %d: %v", num, err)
			continue
		}

		newMetadata = append(newMetadata, meta)

		m.logger.Printf(" Scanned bundle %06d: %d ops, %d DIDs", num, len(ops), meta.DIDCount)
	}

	result.TotalSize = totalSize

	// Rebuild index
	m.index.Rebuild(newMetadata)

	// Save index
	if err := m.SaveIndex(); err != nil {
		return nil, fmt.Errorf("failed to save index: %w", err)
	}

	result.IndexUpdated = true

	m.logger.Printf("Index rebuilt with %d bundles", len(newMetadata))

	return result, nil
}

// ScanDirectoryParallel scans the bundle directory in parallel and rebuilds the index
func (m *Manager) ScanDirectoryParallel(workers int, progressCallback func(current, total int, bytesProcessed int64)) (*DirectoryScanResult, error) {
	result := &DirectoryScanResult{
		BundleDir: m.config.BundleDir,
	}

	m.logger.Printf("Scanning directory (parallel, %d workers): %s", workers, m.config.BundleDir)

	// Find all bundle files
	files, err := filepath.Glob(filepath.Join(m.config.BundleDir, "*.jsonl.zst"))
	if err != nil {
		return nil, fmt.Errorf("failed to scan directory: %w", err)
	}
	files = filterBundleFiles(files)

	if len(files) == 0 {
		m.logger.Printf("No bundle files found")
		return result, nil
	}

	// Parse bundle numbers
	var bundleNumbers []int
	for _, file := range files {
		base := filepath.Base(file)
		numStr := strings.TrimSuffix(base, ".jsonl.zst")
		num, err := strconv.Atoi(numStr)
		if err != nil {
			m.logger.Printf("Warning: skipping invalid filename: %s", base)
			continue
		}
		bundleNumbers = append(bundleNumbers, num)
	}

	sort.Ints(bundleNumbers)

	result.BundleCount = len(bundleNumbers)
	if len(bundleNumbers) > 0 {
		result.FirstBundle = bundleNumbers[0]
		result.LastBundle = bundleNumbers[len(bundleNumbers)-1]
	}

	// Find gaps
	if len(bundleNumbers) > 1 {
		for i := result.FirstBundle; i <= result.LastBundle; i++ {
			found := false
			for _, num := range bundleNumbers {
				if num == i {
					found = true
					break
				}
			}
			if !found {
				result.MissingGaps = append(result.MissingGaps, i)
			}
		}
	}

	m.logger.Printf("Found %d bundles (gaps: %d)", result.BundleCount, len(result.MissingGaps))

	// Process bundles in parallel
	type bundleResult struct {
		index int
		meta  *BundleMetadata
		err   error
	}

	jobs := make(chan int, len(bundleNumbers))
	results := make(chan bundleResult, len(bundleNumbers))

	// Start workers
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for num := range jobs {
				path := filepath.Join(m.config.BundleDir, fmt.Sprintf("%06d.jsonl.zst", num))

				// Load and process bundle
				ops, err := m.operations.LoadBundle(path)
				if err != nil {
					results <- bundleResult{index: num, err: err}
					continue
				}

				// Use the fast metadata path (cursor and chain hash are set later, in the sequential phase)
				meta, err := m.operations.CalculateBundleMetadataFast(num, path, ops, "")
				if err != nil {
					results <- bundleResult{index: num, err: err}
					continue
				}

				results <- bundleResult{index: num, meta: meta}
			}
		}()
	}

	// Send jobs
	for _, num := range bundleNumbers {
		jobs <- num
	}
	close(jobs)

	// Wait for all workers to finish
	go func() {
		wg.Wait()
		close(results)
	}()

	// Collect results (in a map first, then sort)
	metadataMap := make(map[int]*BundleMetadata)
	var totalSize int64
	var totalUncompressed int64
	processed := 0

	for result := range results {
		processed++

		// Report progress, including bytes processed
		if progressCallback != nil {
			if result.meta != nil {
				totalUncompressed += result.meta.UncompressedSize
			}
			progressCallback(processed, len(bundleNumbers), totalUncompressed)
		}

		if result.err != nil {
			m.logger.Printf("Warning: failed to process bundle %d: %v", result.index, result.err)
			continue
		}
		metadataMap[result.index] = result.meta
		totalSize += result.meta.CompressedSize
	}

	// Build ordered metadata slice and calculate chain hashes
	var newMetadata []*BundleMetadata
	var parent string // Parent chain hash

	for i, num := range bundleNumbers {
		meta, ok := metadataMap[num]
		if !ok {
			continue // Skip failed bundles
		}

		// Set cursor from previous bundle's EndTime
		if i > 0 && len(newMetadata) > 0 {
			prevMeta := newMetadata[len(newMetadata)-1]
			meta.Cursor = prevMeta.EndTime.Format(time.RFC3339Nano)
		}

		// Now calculate chain hash (must be done sequentially)
		meta.Hash = m.operations.CalculateChainHash(parent, meta.ContentHash)
		meta.Parent = parent

		newMetadata = append(newMetadata, meta)
		parent = meta.Hash // Store for next iteration
	}

	result.TotalSize = totalSize
	result.TotalUncompressed = totalUncompressed

	// Rebuild index
	m.index.Rebuild(newMetadata)

	// Save index
	if err := m.SaveIndex(); err != nil {
		return nil, fmt.Errorf("failed to save index: %w", err)
	}

	result.IndexUpdated = true

	m.logger.Printf("Index rebuilt with %d bundles", len(newMetadata))

	return result, nil
}

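// Rebuild sketch (illustrative): the parallel scan can also be invoked directly,
// e.g. from a CLI "reindex" command, with a custom progress callback matching the
// (current, total, bytesProcessed) signature.
//
//	workers := runtime.NumCPU()
//	res, err := mgr.ScanDirectoryParallel(workers, func(cur, total int, bytes int64) {
//		fmt.Printf("\rscanned %d/%d bundles (%.1f MB)", cur, total, float64(bytes)/(1024*1024))
//	})
//	if err != nil {
//		log.Fatal(err)
//	}
//	fmt.Printf("\nindexed %d bundles, %d gaps\n", res.BundleCount, len(res.MissingGaps))
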
// GetInfo returns information about the bundle manager
func (m *Manager) GetInfo() map[string]interface{} {
	stats := m.index.GetStats()
	stats["bundle_dir"] = m.config.BundleDir
	stats["index_path"] = m.indexPath
	stats["verify_on_load"] = m.config.VerifyOnLoad
	return stats
}

// ExportOperations exports operations from bundles
func (m *Manager) ExportOperations(ctx context.Context, afterTime time.Time, count int) ([]plc.PLCOperation, error) {
	if count <= 0 {
		count = 1000
	}

	var result []plc.PLCOperation
	seenCIDs := make(map[string]bool)

	bundles := m.index.GetBundles()

	for _, meta := range bundles {
		if len(result) >= count {
			break
		}

		// Skip bundles before afterTime
		if !afterTime.IsZero() && meta.EndTime.Before(afterTime) {
			continue
		}

		// Load bundle
		bundle, err := m.LoadBundle(ctx, meta.BundleNumber)
		if err != nil {
			m.logger.Printf("Warning: failed to load bundle %d: %v", meta.BundleNumber, err)
			continue
		}

		// Add operations
		for _, op := range bundle.Operations {
			if !afterTime.IsZero() && op.CreatedAt.Before(afterTime) {
				continue
			}

			if seenCIDs[op.CID] {
				continue
			}

			seenCIDs[op.CID] = true
			result = append(result, op)

			if len(result) >= count {
				break
			}
		}
	}

	return result, nil
}

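// Export sketch (illustrative): replaying operations from local bundles in the
// same shape as the PLC /export endpoint, starting from a given timestamp. Only
// fields used elsewhere in this file (CID, CreatedAt) are shown.
//
//	since := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
//	ops, err := mgr.ExportOperations(ctx, since, 1000)
//	if err != nil {
//		log.Fatal(err)
//	}
//	for _, op := range ops {
//		fmt.Println(op.CID, op.CreatedAt.Format(time.RFC3339Nano))
//	}
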
// ScanBundle scans a single bundle file and returns its metadata
func (m *Manager) ScanBundle(path string, bundleNumber int) (*BundleMetadata, error) {
	// Load bundle file
	operations, err := m.operations.LoadBundle(path)
	if err != nil {
		return nil, fmt.Errorf("failed to load bundle: %w", err)
	}

	if len(operations) == 0 {
		return nil, fmt.Errorf("bundle is empty")
	}

	// Get parent chain hash and cursor from previous bundle
	var parent string
	var cursor string
	if bundleNumber > 1 {
		if prevMeta, err := m.index.GetBundle(bundleNumber - 1); err == nil {
			parent = prevMeta.Hash
			cursor = prevMeta.EndTime.Format(time.RFC3339Nano)
		}
	}

	// Use the canonical metadata calculation
	return m.operations.CalculateBundleMetadata(bundleNumber, path, operations, parent, cursor)
}

// ScanAndIndexBundle scans a bundle file and adds it to the index
func (m *Manager) ScanAndIndexBundle(path string, bundleNumber int) (*BundleMetadata, error) {
	meta, err := m.ScanBundle(path, bundleNumber)
	if err != nil {
		return nil, err
	}

	// Add to index
	m.index.AddBundle(meta)

	// Save index
	if err := m.SaveIndex(); err != nil {
		return nil, fmt.Errorf("failed to save index: %w", err)
	}

	return meta, nil
}

// IsBundleIndexed checks if a bundle is already in the index
func (m *Manager) IsBundleIndexed(bundleNumber int) bool {
	_, err := m.index.GetBundle(bundleNumber)
	return err == nil
}

// RefreshMempool reloads mempool from disk (useful for debugging)
func (m *Manager) RefreshMempool() error {
	if m.mempool == nil {
		return fmt.Errorf("mempool not initialized")
	}
	return m.mempool.Load()
}

// ClearMempool clears all operations from the mempool and saves
func (m *Manager) ClearMempool() error {
	if m.mempool == nil {
		return fmt.Errorf("mempool not initialized")
	}

	m.logger.Printf("Clearing mempool...")

	// Get count before clearing
	count := m.mempool.Count()

	// Clear the mempool
	m.mempool.Clear()

	// Save the empty state (this will delete the file since it's empty)
	if err := m.mempool.Save(); err != nil {
		return fmt.Errorf("failed to save mempool: %w", err)
	}

	m.logger.Printf("Cleared %d operations from mempool", count)

	return nil
}

// ValidateMempool validates the current mempool contents
func (m *Manager) ValidateMempool() error {
	if m.mempool == nil {
		return fmt.Errorf("mempool not initialized")
	}
	return m.mempool.Validate()
}

// StreamBundleRaw streams the raw compressed bundle file
func (m *Manager) StreamBundleRaw(ctx context.Context, bundleNumber int) (io.ReadCloser, error) {
	// Get metadata from index
	meta, err := m.index.GetBundle(bundleNumber)
	if err != nil {
		return nil, fmt.Errorf("bundle not in index: %w", err)
	}

	// Build file path
	path := filepath.Join(m.config.BundleDir, fmt.Sprintf("%06d.jsonl.zst", bundleNumber))
	if !m.operations.FileExists(path) {
		return nil, fmt.Errorf("bundle file not found: %s", path)
	}

	// Optionally verify hash before streaming
	if m.config.VerifyOnLoad {
		valid, actualHash, err := m.operations.VerifyHash(path, meta.CompressedHash)
		if err != nil {
			return nil, fmt.Errorf("failed to verify hash: %w", err)
		}
		if !valid {
			return nil, fmt.Errorf("hash mismatch: expected %s, got %s", meta.CompressedHash, actualHash)
		}
	}

	return m.operations.StreamRaw(path)
}

// StreamBundleDecompressed streams the decompressed bundle data as JSONL
func (m *Manager) StreamBundleDecompressed(ctx context.Context, bundleNumber int) (io.ReadCloser, error) {
	// Get metadata from index
	_, err := m.index.GetBundle(bundleNumber)
	if err != nil {
		return nil, fmt.Errorf("bundle not in index: %w", err)
	}

	// Build file path
	path := filepath.Join(m.config.BundleDir, fmt.Sprintf("%06d.jsonl.zst", bundleNumber))
	if !m.operations.FileExists(path) {
		return nil, fmt.Errorf("bundle file not found: %s", path)
	}

	return m.operations.StreamDecompressed(path)
}

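// HTTP serving sketch (illustrative; assumes this runs inside a net/http handler
// with a ResponseWriter w and a request context ctx, neither of which is part of
// this file):
//
//	rc, err := mgr.StreamBundleDecompressed(ctx, 42)
//	if err != nil {
//		http.Error(w, err.Error(), http.StatusNotFound)
//		return
//	}
//	defer rc.Close()
//	if _, err := io.Copy(w, rc); err != nil {
//		log.Printf("stream aborted: %v", err)
//	}
//
// StreamBundleRaw is the cheaper variant: it serves the compressed .jsonl.zst
// bytes exactly as stored on disk, which is what a mirror would expose so that
// downstream peers can re-verify the published compressed hashes.
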
// RefreshIndex reloads the index from disk if it has been modified
func (m *Manager) RefreshIndex() error {
	// Check if index file has been modified
	info, err := os.Stat(m.indexPath)
	if err != nil {
		return err
	}

	// If index was modified after we loaded it, reload
	if info.ModTime().After(m.index.UpdatedAt) {
		m.logger.Printf("Index file modified, reloading...")

		newIndex, err := LoadIndex(m.indexPath)
		if err != nil {
			return fmt.Errorf("failed to reload index: %w", err)
		}

		m.index = newIndex
		m.logger.Printf("Index reloaded: %d bundles", m.index.Count())
	}

	return nil
}

// filterBundleFiles filters out files starting with . or _ (system/temp files)
func filterBundleFiles(files []string) []string {
	filtered := make([]string, 0, len(files))
	for _, file := range files {
		basename := filepath.Base(file)
		// Skip files starting with . or _
		if len(basename) > 0 && (basename[0] == '.' || basename[0] == '_') {
			continue
		}
		filtered = append(filtered, file)
	}
	return filtered
}

// GetMempool returns the current mempool (for manual save operations)
func (m *Manager) GetMempool() *Mempool {
	return m.mempool
}

// SaveMempool saves the current mempool state to disk
func (m *Manager) SaveMempool() error {
	if m.mempool == nil {
		return fmt.Errorf("mempool not initialized")
	}
	return m.mempool.Save()
}

// GetPLCOrigin returns the PLC directory origin URL (empty if not configured)
func (m *Manager) GetPLCOrigin() string {
	if m.plcClient == nil {
		return ""
	}
	return m.plcClient.GetBaseURL()
}

// GetCurrentCursor returns the current latest cursor position (including mempool)
// Cursor format: (number of sealed bundles × BUNDLE_SIZE) + operations currently in the mempool
func (m *Manager) GetCurrentCursor() int {
	index := m.GetIndex()
	bundles := index.GetBundles()
	cursor := len(bundles) * BUNDLE_SIZE

	// Add mempool operations to get true latest position
	mempoolStats := m.GetMempoolStats()
	if count, ok := mempoolStats["count"].(int); ok {
		cursor += count
	}

	return cursor
}

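// Worked example (illustrative; BUNDLE_SIZE = 10000 is assumed here purely for
// the arithmetic, the real constant is defined elsewhere in the package): with
// 37 sealed bundles and 2,500 operations in the mempool, the cursor is
// 37*10000 + 2500 = 372500. Conversely, an operation at zero-based position p
// of bundle b sits at global index (b-1)*BUNDLE_SIZE + p, which is how a cursor
// value maps onto the (bundleNumber, position) pair used by LoadOperation below.
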
// GetDIDIndex returns the DID index manager
func (m *Manager) GetDIDIndex() *DIDIndexManager {
	return m.didIndex
}

// LoadOperation loads a single operation from a bundle efficiently
// This is much faster than LoadBundle() when you only need one operation
func (m *Manager) LoadOperation(ctx context.Context, bundleNumber int, position int) (*plc.PLCOperation, error) {
	// Validate bundle exists in index
	meta, err := m.index.GetBundle(bundleNumber)
	if err != nil {
		return nil, fmt.Errorf("bundle not in index: %w", err)
	}

	// Validate position
	if position < 0 || position >= BUNDLE_SIZE {
		return nil, fmt.Errorf("invalid position: %d (must be 0-%d)", position, BUNDLE_SIZE-1)
	}

	// Build file path
	path := filepath.Join(m.config.BundleDir, fmt.Sprintf("%06d.jsonl.zst", bundleNumber))
	if !m.operations.FileExists(path) {
		return nil, fmt.Errorf("bundle file not found: %s", path)
	}

	// Verify hash if enabled (same as LoadBundle)
	if m.config.VerifyOnLoad {
		valid, actualHash, err := m.operations.VerifyHash(path, meta.CompressedHash)
		if err != nil {
			return nil, fmt.Errorf("failed to verify hash: %w", err)
		}
		if !valid {
			return nil, fmt.Errorf("hash mismatch: expected %s, got %s", meta.CompressedHash, actualHash)
		}
	}

	// Load just the one operation (efficient!)
	return m.operations.LoadOperationAtPosition(path, position)