internal/plc/bundle.go (+319, -437)
···
      "github.com/klauspost/compress/zstd"
  )

- // BUNDLE_SIZE is the number of operations per bundle
  const BUNDLE_SIZE = 10000

  type BundleManager struct {
···
      decoder *zstd.Decoder
      db      storage.Database
  }

  func NewBundleManager(dir string, enabled bool, db storage.Database) (*BundleManager, error) {
      if !enabled {
···

      return &BundleManager{
          dir:     dir,
-         enabled: true,
          encoder: encoder,
          decoder: decoder,
          db:      db,
···
      }
  }

- // GetBundleFilename returns filename for bundle number (6-digit decimal, JSONL format)
- func (bm *BundleManager) GetBundleFilename(bundleNumber int) string {
-     return fmt.Sprintf("%06d.jsonl.zst", bundleNumber)
  }

- // GetBundlePath returns full path for bundle number
- func (bm *BundleManager) GetBundlePath(bundleNumber int) string {
-     return filepath.Join(bm.dir, bm.GetBundleFilename(bundleNumber))
  }

- // BundleExists checks if bundle file exists locally
- func (bm *BundleManager) BundleExists(bundleNumber int) bool {
-     _, err := os.Stat(bm.GetBundlePath(bundleNumber))
      return err == nil
  }

- // LoadBundle returns exactly 1000 unique operations by fetching additional batches if needed
- func (bm *BundleManager) LoadBundle(ctx context.Context, bundleNumber int, plcClient *Client) ([]PLCOperation, bool, error) {
-     if !bm.enabled {
-         return nil, false, fmt.Errorf("bundle manager disabled")
      }

-     path := bm.GetBundlePath(bundleNumber)

-     // Try to load from local file first
-     if bm.BundleExists(bundleNumber) {
-         log.Verbose("→ Loading bundle %06d from local file", bundleNumber)

-         // Check if bundle exists in database
-         dbBundle, dbErr := bm.db.GetBundleByNumber(ctx, bundleNumber)
-         bundleInDB := dbErr == nil && dbBundle != nil

-         if bundleInDB {
-             // Verify compressed file hash
-             if dbBundle.CompressedHash != "" {
-                 valid, err := bm.verifyBundleHash(path, dbBundle.CompressedHash)
-                 if err != nil {
-                     log.Error("Warning: failed to verify compressed hash for bundle %06d: %v", bundleNumber, err)
-                 } else if !valid {
-                     log.Error("⚠ Compressed hash mismatch for bundle %06d! Re-fetching...", bundleNumber)
-                     os.Remove(path)
-                     return bm.LoadBundle(ctx, bundleNumber, plcClient)
-                 } else {
-                     log.Verbose("✓ Hash verified for bundle %06d", bundleNumber)
-                 }
-             }
-         }

-         // Load operations from file
-         operations, err := bm.loadBundleFromFile(path)
-         if err != nil {
-             return nil, false, fmt.Errorf("failed to load bundle from file: %w", err)
-         }

-         // If not in database, index it now
-         if !bundleInDB {
-             // Calculate both hashes
-             fileData, err := os.ReadFile(path)
-             if err != nil {
-                 log.Error("Warning: failed to read file: %v", err)
-             } else {
-                 compressedHash := bm.calculateHash(fileData)

-                 // Calculate uncompressed hash
-                 var jsonlData []byte
-                 for _, op := range operations {
-                     jsonlData = append(jsonlData, op.RawJSON...)
-                     jsonlData = append(jsonlData, '\n')
-                 }
-                 uncompressedHash := bm.calculateHash(jsonlData)

-                 if err := bm.indexBundleWithHash(ctx, bundleNumber, operations, path, uncompressedHash, compressedHash); err != nil {
-                     log.Error("Warning: failed to index bundle: %v", err)
-                 } else {
-                     log.Info("✓ Indexed bundle %06d", bundleNumber)
-                 }
-             }
          }
-
-         // If loaded from disk, it's always complete
-         return operations, true, nil
      }

-     // Bundle doesn't exist locally - fetch from PLC directory
-     log.Info("→ Bundle %06d not found locally, fetching from PLC directory...", bundleNumber)
-
-     var afterTimestamp string
-     var prevBoundaryCIDs map[string]bool

-     if bundleNumber > 1 {
-         prevBundle, err := bm.db.GetBundleByNumber(ctx, bundleNumber-1)
-         if err == nil && prevBundle != nil {
-             afterTimestamp = prevBundle.EndTime.Format(time.RFC3339Nano)
-
-             // Get boundary CIDs from previous bundle
-             if len(prevBundle.BoundaryCIDs) > 0 {
-                 prevBoundaryCIDs = make(map[string]bool)
-                 for _, cid := range prevBundle.BoundaryCIDs {
-                     prevBoundaryCIDs[cid] = true
-                 }
-                 log.Verbose(" Using %d boundary CIDs from previous bundle", len(prevBoundaryCIDs))
-             } else {
-                 // Fallback: load previous bundle's operations
-                 prevPath := bm.GetBundlePath(bundleNumber - 1)
-                 if bm.BundleExists(bundleNumber - 1) {
-                     prevOps, err := bm.loadBundleFromFile(prevPath)
-                     if err == nil {
-                         _, prevBoundaryCIDs = GetBoundaryCIDs(prevOps)
-                         log.Verbose(" Computed %d boundary CIDs from previous bundle file", len(prevBoundaryCIDs))
-                     }
-                 }
-             }
-         }
      }

-     // Collect operations until we have exactly BUNDLE_SIZE unique ones
-     var allOperations []PLCOperation
-     seenCIDs := make(map[string]bool)

-     // Track what we've already seen from previous bundle
      for cid := range prevBoundaryCIDs {
-         seenCIDs[cid] = true
      }

-     currentAfter := afterTimestamp
-
-     // Scale maxFetches based on bundle size
-     // Assume worst case: 90% dedup rate, need buffer
-     maxFetches := (BUNDLE_SIZE / 900) + 5 // For 10k: ~16 fetches, for 1k: ~6 fetches
-     fetchCount := 0
-
-     for len(allOperations) < BUNDLE_SIZE && fetchCount < maxFetches {
-         fetchCount++
-
-         // Calculate how many more operations we need
-         remaining := BUNDLE_SIZE - len(allOperations)
-
-         // Determine fetch size based on remaining operations
-         var fetchSize int
-         if fetchCount == 1 {
-             // First fetch: always get 1000 (PLC limit)
-             fetchSize = 1000
-         } else if remaining < 100 {
-             // Need less than 100: fetch 50
-             fetchSize = 50
-         } else if remaining < 500 {
-             // Need 100-500: fetch 200
-             fetchSize = 200
-         } else {
-             // Need 500+: fetch 1000
-             fetchSize = 1000
-         }

-         // Fetch next batch
-         log.Verbose(" Fetch #%d: need %d more, requesting %d", fetchCount, remaining, fetchSize)

-         rawOperations, err := bm.fetchBundleFromPLCWithCount(ctx, plcClient, currentAfter, fetchSize)
-         if err != nil {
-             return nil, false, fmt.Errorf("failed to fetch bundle from PLC: %w", err)
-         }

-         if len(rawOperations) == 0 {
-             // No more data available
-             log.Info(" No more operations available after %d fetches (got %d/%d)",
-                 fetchCount, len(allOperations), BUNDLE_SIZE)
-             break
-         }

-         log.Verbose(" Got %d raw operations", len(rawOperations))

-         // Filter out duplicates and add unique operations
-         newOpsAdded := 0
-         for _, op := range rawOperations {
-             if !seenCIDs[op.CID] {
-                 seenCIDs[op.CID] = true
-                 allOperations = append(allOperations, op)
-                 newOpsAdded++

-                 if len(allOperations) >= BUNDLE_SIZE {
-                     break
                  }
              }
          }

-         log.Verbose(" Added %d unique operations (total: %d/%d, %d dupes)",
-             newOpsAdded, len(allOperations), BUNDLE_SIZE, len(rawOperations)-newOpsAdded)
-
-         // If we added no new operations, we're stuck
-         if newOpsAdded == 0 {
-             log.Error(" No new unique operations found, stopping")
              break
          }

-         // Update cursor for next fetch
-         if len(rawOperations) > 0 {
-             lastOp := rawOperations[len(rawOperations)-1]
-             currentAfter = lastOp.CreatedAt.Format(time.RFC3339Nano)
-         }

-         // If PLC returned less than requested, we've reached the end
-         if len(rawOperations) < fetchSize {
-             log.Info(" Reached end of PLC data (got %d < %d requested)", len(rawOperations), fetchSize)
-             break
-         }
      }
-
-     // Warn if we hit the fetch limit
-     if fetchCount >= maxFetches {
-         log.Verbose(" ⚠ Hit maxFetches limit (%d) with only %d/%d operations",
-             maxFetches, len(allOperations), BUNDLE_SIZE)
      }

-     // Check if we got exactly BUNDLE_SIZE operations
-     isComplete := len(allOperations) >= BUNDLE_SIZE

-     if len(allOperations) > BUNDLE_SIZE {
-         // Trim to exactly BUNDLE_SIZE
-         log.Verbose(" Trimming from %d to %d operations", len(allOperations), BUNDLE_SIZE)
-         allOperations = allOperations[:BUNDLE_SIZE]
      }

-     log.Info(" Collected %d unique operations after %d fetches (complete=%v, target=%d)",
-         len(allOperations), fetchCount, isComplete, BUNDLE_SIZE)
-
-     // Only save as bundle if complete
-     if isComplete {
-         // Save bundle with both hashes
-         uncompressedHash, compressedHash, err := bm.saveBundleFileWithHash(path, allOperations)
-         if err != nil {
-             log.Error("Warning: failed to save bundle file: %v", err)
-         } else {
-             // Index with both hashes
-             if err := bm.indexBundleWithHash(ctx, bundleNumber, allOperations, path, uncompressedHash, compressedHash); err != nil {
-                 log.Error("Warning: failed to index bundle: %v", err)
-             } else {
-                 log.Info("✓ Bundle %06d saved [%d ops, hash: %s, compressed: %s]",
-                     bundleNumber, len(allOperations), uncompressedHash[:16]+"...", compressedHash[:16]+"...")
-             }
-         }
      }

-     return allOperations, isComplete, nil
  }

- // fetchBundleFromPLCWithCount fetches operations with a specific count
- func (bm *BundleManager) fetchBundleFromPLCWithCount(ctx context.Context, client *Client, afterTimestamp string, count int) ([]PLCOperation, error) {
-     return client.Export(ctx, ExportOptions{
-         Count: count,
-         After: afterTimestamp,
-     })
- }

- // saveBundleFileWithHash - NO trailing newline
- func (bm *BundleManager) saveBundleFileWithHash(path string, operations []PLCOperation) (string, string, error) {
-     var jsonlData []byte
-     for _, op := range operations {
-         jsonlData = append(jsonlData, op.RawJSON...)
-         jsonlData = append(jsonlData, '\n')
      }

-     uncompressedHash := bm.calculateHash(jsonlData)
-     compressed := bm.encoder.EncodeAll(jsonlData, nil)
-     compressedHash := bm.calculateHash(compressed)

-     if err := os.WriteFile(path, compressed, 0644); err != nil {
-         return "", "", err
      }

-     return uncompressedHash, compressedHash, nil
  }

- // fetchBundleFromPLC fetches operations from PLC directory (returns RAW operations)
- func (bm *BundleManager) fetchBundleFromPLC(ctx context.Context, client *Client, afterTimestamp string) ([]PLCOperation, error) {
-     // Just fetch - no deduplication here
-     return client.Export(ctx, ExportOptions{
-         Count: 1000,
-         After: afterTimestamp,
-     })
- }

- // StripBoundaryDuplicates removes operations that were already seen on the previous page
- // This is exported so it can be used in verification
- func StripBoundaryDuplicates(operations []PLCOperation, boundaryTimestamp string, prevBoundaryCIDs map[string]bool) []PLCOperation {
-     if len(operations) == 0 {
-         return operations
      }

-     boundaryTime, err := time.Parse(time.RFC3339Nano, boundaryTimestamp)
-     if err != nil {
-         return operations
      }

-     // Skip operations at the start that match the boundary
-     startIdx := 0
-     for startIdx < len(operations) {
-         op := operations[startIdx]
-
-         // If timestamp is AFTER boundary, we're past duplicates
-         if op.CreatedAt.After(boundaryTime) {
-             break
-         }
-
-         // Same timestamp - check if we've seen this CID before
-         if op.CreatedAt.Equal(boundaryTime) {
-             if prevBoundaryCIDs[op.CID] {
-                 // This is a duplicate, skip it
-                 startIdx++
-                 continue
-             }
-             // Same timestamp but new CID - keep it
-             break
-         }
-
-         // Earlier timestamp (shouldn't happen)
-         break
      }

-     return operations[startIdx:]
  }

- // Keep the private version for internal use
- func stripBoundaryDuplicates(operations []PLCOperation, boundaryTimestamp string, prevBoundaryCIDs map[string]bool) []PLCOperation {
-     return StripBoundaryDuplicates(operations, boundaryTimestamp, prevBoundaryCIDs)
- }

- // GetBoundaryCIDs returns all CIDs that share the same timestamp as the last operation
- func GetBoundaryCIDs(operations []PLCOperation) (time.Time, map[string]bool) {
-     if len(operations) == 0 {
-         return time.Time{}, nil
-     }

-     lastOp := operations[len(operations)-1]
-     boundaryTime := lastOp.CreatedAt
-     cidSet := make(map[string]bool)

-     // Walk backwards from the end, collecting all CIDs with the same timestamp
-     for i := len(operations) - 1; i >= 0; i-- {
-         op := operations[i]
-         if op.CreatedAt.Equal(boundaryTime) {
-             cidSet[op.CID] = true
          } else {
-             // Different timestamp, we're done
-             break
          }
      }

-     return boundaryTime, cidSet
- }
-
- // saveBundleFile (keep for compatibility, calls saveBundleFileWithHash)
- func (bm *BundleManager) saveBundleFile(path string, operations []PLCOperation) error {
-     _, _, err := bm.saveBundleFileWithHash(path, operations)
-     return err
  }

- // loadBundleFromFile loads operations from bundle file (JSONL format)
- func (bm *BundleManager) loadBundleFromFile(path string) ([]PLCOperation, error) {
-     // Read compressed file
-     compressedData, err := os.ReadFile(path)
-     if err != nil {
-         return nil, fmt.Errorf("failed to read bundle file: %w", err)
      }

-     // Decompress
-     decompressed, err := bm.decoder.DecodeAll(compressedData, nil)
      if err != nil {
-         return nil, fmt.Errorf("failed to decompress bundle: %w", err)
      }

-     // Parse JSONL (newline-delimited JSON)
-     var operations []PLCOperation
-     scanner := bufio.NewScanner(bytes.NewReader(decompressed))

-     lineNum := 0
-     for scanner.Scan() {
-         lineNum++
-         line := scanner.Bytes()
-
-         // Skip empty lines
-         if len(line) == 0 {
-             continue
          }

-         var op PLCOperation
-         if err := json.Unmarshal(line, &op); err != nil {
-             return nil, fmt.Errorf("failed to parse operation on line %d: %w", lineNum, err)
          }
-
-         // CRITICAL: Store the original raw JSON bytes
-         op.RawJSON = make([]byte, len(line))
-         copy(op.RawJSON, line)
-
-         operations = append(operations, op)
      }

-     if err := scanner.Err(); err != nil {
-         return nil, fmt.Errorf("error reading JSONL: %w", err)
-     }
-
-     return operations, nil
  }

- // indexBundleWithHash stores bundle with both hashes
- func (bm *BundleManager) indexBundleWithHash(ctx context.Context, bundleNumber int, operations []PLCOperation, path string, uncompressedHash, compressedHash string) error {
-     // Get previous bundle's hash (uncompressed)
-     var prevBundleHash string
-     if bundleNumber > 1 {
-         prevBundle, err := bm.db.GetBundleByNumber(ctx, bundleNumber-1)
-         if err == nil && prevBundle != nil {
-             prevBundleHash = prevBundle.Hash // Use uncompressed hash for chain
-             log.Verbose(" Linking to previous bundle %06d (hash: %s)", bundleNumber-1, prevBundleHash[:16]+"...")
          }
      }

-     // Extract unique DIDs
-     didSet := make(map[string]bool)
-     for _, op := range operations {
-         didSet[op.DID] = true
-     }
-
-     dids := make([]string, 0, len(didSet))
-     for did := range didSet {
-         dids = append(dids, did)
-     }
-
-     // Get compressed file size
-     fileInfo, _ := os.Stat(path)
-     compressedSize := int64(0)
-     if fileInfo != nil {
-         compressedSize = fileInfo.Size()
-     }

      bundle := &storage.PLCBundle{
-         BundleNumber: bundleNumber,
-         StartTime:    operations[0].CreatedAt,
-         EndTime:      operations[len(operations)-1].CreatedAt,
          DIDs:         dids,
-         Hash:           uncompressedHash, // Primary hash (JSONL)
-         CompressedHash: compressedHash,   // File integrity hash
-         CompressedSize: compressedSize,   // Compressed size
-         PrevBundleHash: prevBundleHash,   // Chain link
          Compressed: true,
          CreatedAt:  time.Now(),
      }
···
      return bm.db.CreateBundle(ctx, bundle)
  }

- // indexBundle (keep for compatibility) - FIX: Calculate both hashes
- func (bm *BundleManager) indexBundle(ctx context.Context, bundleNumber int, operations []PLCOperation, path string) error {
-     // Calculate compressed hash from file
-     fileData, err := os.ReadFile(path)
-     if err != nil {
-         return err
      }
-     compressedHash := bm.calculateHash(fileData)

-     // Calculate uncompressed hash from operations
-     var jsonlData []byte
-     for _, op := range operations {
-         jsonlData = append(jsonlData, op.RawJSON...)
-         jsonlData = append(jsonlData, '\n')
      }
-     uncompressedHash := bm.calculateHash(jsonlData)
-
-     return bm.indexBundleWithHash(ctx, bundleNumber, operations, path, uncompressedHash, compressedHash)
  }

- // Update CreateBundleFromMempool
  func (bm *BundleManager) CreateBundleFromMempool(ctx context.Context, operations []PLCOperation) (int, error) {
      if !bm.enabled {
          return 0, fmt.Errorf("bundle manager disabled")
···
      if err != nil {
          return 0, err
      }
-     bundleNumber := lastBundle + 1

-     path := bm.GetBundlePath(bundleNumber)

-     // Save bundle with both hashes
-     uncompressedHash, compressedHash, err := bm.saveBundleFileWithHash(path, operations)
-     if err != nil {
          return 0, err
      }

-     // Index bundle
-     if err := bm.indexBundleWithHash(ctx, bundleNumber, operations, path, uncompressedHash, compressedHash); err != nil {
          return 0, err
      }

-     log.Info("✓ Created bundle %06d from mempool (hash: %s)",
-         bundleNumber, uncompressedHash[:16]+"...")
-
-     return bundleNumber, nil
- }
-
- // EnsureBundleContinuity checks that all bundles from 1 to N exist
- func (bm *BundleManager) EnsureBundleContinuity(ctx context.Context, targetBundle int) error {
-     if !bm.enabled {
-         return nil
-     }
-
-     for i := 1; i < targetBundle; i++ {
-         if !bm.BundleExists(i) {
-             // Check if in database
-             _, err := bm.db.GetBundleByNumber(ctx, i)
-             if err != nil {
-                 return fmt.Errorf("bundle %06d is missing (required for continuity)", i)
-             }
-         }
-     }
-
-     return nil
- }

- // GetStats returns bundle statistics
- func (bm *BundleManager) GetStats(ctx context.Context) (int64, int64, error) {
-     if !bm.enabled {
-         return 0, 0, nil
-     }
-     return bm.db.GetBundleStats(ctx)
  }

- // calculateHash computes SHA256 hash of data
- func (bm *BundleManager) calculateHash(data []byte) string {
-     hash := sha256.Sum256(data)
-     return hex.EncodeToString(hash[:])
- }

- // verifyBundleHash checks if file hash matches expected hash
- func (bm *BundleManager) verifyBundleHash(path string, expectedHash string) (bool, error) {
-     data, err := os.ReadFile(path)
-     if err != nil {
-         return false, err
-     }
-
-     actualHash := bm.calculateHash(data)
-     return actualHash == expectedHash, nil
- }
-
- // VerifyChain - FIX
  func (bm *BundleManager) VerifyChain(ctx context.Context, endBundle int) error {
      if !bm.enabled {
          return fmt.Errorf("bundle manager disabled")
···
              return fmt.Errorf("bundle %06d not found: %w", i, err)
          }

-         // Compute file path
-         filePath := bm.GetBundlePath(i)
-
          // Verify file hash
-         valid, err := bm.verifyBundleHash(filePath, bundle.CompressedHash)
-         if err != nil {
              return fmt.Errorf("bundle %06d hash verification failed: %w", i, err)
-         }
-         if !valid {
-             return fmt.Errorf("bundle %06d compressed hash mismatch!", i)
          }

          // Verify chain link
···
      return nil
  }

- // GetChainInfo returns information about the bundle chain
  func (bm *BundleManager) GetChainInfo(ctx context.Context) (map[string]interface{}, error) {
      lastBundle, err := bm.db.GetLastBundleNumber(ctx)
      if err != nil {
···
          }, nil
      }

-     // Quick check first and last
-     firstBundle, err := bm.db.GetBundleByNumber(ctx, 1)
-     if err != nil {
-         return nil, err
-     }
-
-     lastBundleData, err := bm.db.GetBundleByNumber(ctx, lastBundle)
-     if err != nil {
-         return nil, err
-     }

      return map[string]interface{}{
          "chain_length": lastBundle,
···
          "chain_head_hash": lastBundleData.Hash,
      }, nil
  }
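Both versions hinge on the same invariant: the PLC directory export is paginated by createdAt, so a fetch that resumes at the previous bundle's EndTime can return operations carrying that exact timestamp again, even though they were already bundled. The previous bundle's boundary CIDs (every CID sharing its final timestamp) are therefore seeded into the dedup set before fetching. A minimal, self-contained sketch of that boundary walk, using a toy PLCOperation with only the two fields the logic touches (illustration only; the real struct lives elsewhere in internal/plc):

package main

import (
    "fmt"
    "time"
)

// Toy stand-in for internal/plc's PLCOperation.
type PLCOperation struct {
    CID       string
    CreatedAt time.Time
}

// Same walk as GetBoundaryCIDs: collect every CID that shares the last
// operation's timestamp, scanning backwards from the end.
func boundaryCIDs(ops []PLCOperation) map[string]bool {
    if len(ops) == 0 {
        return nil
    }
    last := ops[len(ops)-1].CreatedAt
    set := make(map[string]bool)
    for i := len(ops) - 1; i >= 0 && ops[i].CreatedAt.Equal(last); i-- {
        set[ops[i].CID] = true
    }
    return set
}

func main() {
    t0 := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
    t1 := t0.Add(time.Second)
    ops := []PLCOperation{{"opA", t0}, {"opB", t1}, {"opC", t1}}

    // opB and opC share the bundle's final timestamp, so the next page can
    // serve them again; they must be skipped by CID, not by time alone.
    fmt.Println(boundaryCIDs(ops)) // map[opB:true opC:true]
}

The rewrite below keeps this walk in GetBoundaryCIDs and seeds the fetcher's seen-set through newBundleFetcher.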
···
      "github.com/klauspost/compress/zstd"
  )

  const BUNDLE_SIZE = 10000

  type BundleManager struct {
···
      decoder *zstd.Decoder
      db      storage.Database
  }
+
+ // ===== INITIALIZATION =====

  func NewBundleManager(dir string, enabled bool, db storage.Database) (*BundleManager, error) {
      if !enabled {
···

      return &BundleManager{
          dir:     dir,
+         enabled: enabled,
          encoder: encoder,
          decoder: decoder,
          db:      db,
···
      }
  }

+ // ===== BUNDLE FILE ABSTRACTION =====
+
+ type bundleFile struct {
+     path             string
+     operations       []PLCOperation
+     uncompressedHash string
+     compressedHash   string
  }

+ func (bm *BundleManager) newBundleFile(bundleNum int) *bundleFile {
+     return &bundleFile{
+         path: filepath.Join(bm.dir, fmt.Sprintf("%06d.jsonl.zst", bundleNum)),
+     }
  }

+ func (bf *bundleFile) exists() bool {
+     _, err := os.Stat(bf.path)
      return err == nil
  }

+ func (bm *BundleManager) load(bf *bundleFile) error {
+     compressed, err := os.ReadFile(bf.path)
+     if err != nil {
+         return fmt.Errorf("read failed: %w", err)
      }

+     decompressed, err := bm.decoder.DecodeAll(compressed, nil)
+     if err != nil {
+         return fmt.Errorf("decompress failed: %w", err)
+     }

+     bf.operations = bm.parseJSONL(decompressed)
+     return nil
+ }

+ func (bm *BundleManager) save(bf *bundleFile) error {
+     jsonlData := bm.serializeJSONL(bf.operations)
+     bf.uncompressedHash = bm.hash(jsonlData)

+     compressed := bm.encoder.EncodeAll(jsonlData, nil)
+     bf.compressedHash = bm.hash(compressed)

+     return os.WriteFile(bf.path, compressed, 0644)
+ }

+ func (bm *BundleManager) parseJSONL(data []byte) []PLCOperation {
+     var ops []PLCOperation
+     scanner := bufio.NewScanner(bytes.NewReader(data))

+     for scanner.Scan() {
+         line := scanner.Bytes()
+         if len(line) == 0 {
+             continue
+         }

+         var op PLCOperation
+         if err := json.Unmarshal(line, &op); err == nil {
+             op.RawJSON = append([]byte(nil), line...)
+             ops = append(ops, op)
          }
      }

+     return ops
+ }

+ func (bm *BundleManager) serializeJSONL(ops []PLCOperation) []byte {
+     var buf []byte
+     for _, op := range ops {
+         buf = append(buf, op.RawJSON...)
+         buf = append(buf, '\n')
      }
+     return buf
+ }

+ // ===== BUNDLE FETCHING =====

+ type bundleFetcher struct {
+     client       *Client
+     seenCIDs     map[string]bool
+     currentAfter string
+     fetchCount   int
+ }
+
+ func newBundleFetcher(client *Client, afterTime string, prevBoundaryCIDs map[string]bool) *bundleFetcher {
+     seen := make(map[string]bool)
      for cid := range prevBoundaryCIDs {
+         seen[cid] = true
      }

+     return &bundleFetcher{
+         client:       client,
+         seenCIDs:     seen,
+         currentAfter: afterTime,
+     }
+ }

+ func (bf *bundleFetcher) fetchUntilComplete(ctx context.Context, target int) ([]PLCOperation, bool) {
+     var ops []PLCOperation
+     maxFetches := (target / 900) + 5

+     for len(ops) < target && bf.fetchCount < maxFetches {
+         bf.fetchCount++
+         batchSize := bf.calculateBatchSize(target - len(ops))

+         log.Verbose(" Fetch #%d: need %d more, requesting %d", bf.fetchCount, target-len(ops), batchSize)

+         batch, shouldContinue := bf.fetchBatch(ctx, batchSize)

+         for _, op := range batch {
+             if !bf.seenCIDs[op.CID] {
+                 bf.seenCIDs[op.CID] = true
+                 ops = append(ops, op)

+                 if len(ops) >= target {
+                     return ops[:target], true
                  }
              }
          }

+         if !shouldContinue {
              break
          }
+     }

+     return ops, len(ops) >= target
+ }

+ func (bf *bundleFetcher) calculateBatchSize(remaining int) int {
+     if bf.fetchCount == 1 {
+         return 1000
+     }
+     if remaining < 100 {
+         return 50
      }
+     if remaining < 500 {
+         return 200
      }
+     return 1000
+ }

+ func (bf *bundleFetcher) fetchBatch(ctx context.Context, size int) ([]PLCOperation, bool) {
+     ops, err := bf.client.Export(ctx, ExportOptions{
+         Count: size,
+         After: bf.currentAfter,
+     })

+     if err != nil || len(ops) == 0 {
+         return nil, false
      }

+     bf.currentAfter = ops[len(ops)-1].CreatedAt.Format(time.RFC3339Nano)

+     return ops, len(ops) >= size
  }

+ // ===== MAIN BUNDLE LOADING =====

+ func (bm *BundleManager) LoadBundle(ctx context.Context, bundleNum int, plcClient *Client) ([]PLCOperation, bool, error) {
+     if !bm.enabled {
+         return nil, false, fmt.Errorf("bundle manager disabled")
      }

+     bf := bm.newBundleFile(bundleNum)

+     // Try local file first
+     if bf.exists() {
+         return bm.loadFromFile(ctx, bundleNum, bf)
      }

+     // Fetch from PLC
+     return bm.fetchFromPLC(ctx, bundleNum, bf, plcClient)
  }

+ func (bm *BundleManager) loadFromFile(ctx context.Context, bundleNum int, bf *bundleFile) ([]PLCOperation, bool, error) {
+     log.Verbose("→ Loading bundle %06d from local file", bundleNum)

+     // Verify hash if bundle is in DB
+     if dbBundle, err := bm.db.GetBundleByNumber(ctx, bundleNum); err == nil && dbBundle != nil {
+         if err := bm.verifyHash(bf.path, dbBundle.CompressedHash); err != nil {
+             log.Error("⚠ Compressed hash mismatch for bundle %06d! Removing corrupt file", bundleNum)
+             os.Remove(bf.path)
+             return nil, false, fmt.Errorf("bundle %06d: compressed hash mismatch", bundleNum)
+         }
+         log.Verbose("✓ Hash verified for bundle %06d", bundleNum)
      }

+     if err := bm.load(bf); err != nil {
+         return nil, false, err
      }

+     // Index if not in DB
+     if _, err := bm.db.GetBundleByNumber(ctx, bundleNum); err != nil {
+         bf.compressedHash = bm.hashFile(bf.path)
+         bf.uncompressedHash = bm.hash(bm.serializeJSONL(bf.operations))
+         if err := bm.indexBundle(ctx, bundleNum, bf); err != nil {
+             log.Error("Warning: failed to index bundle: %v", err)
+         }
      }

+     return bf.operations, true, nil
  }

+ func (bm *BundleManager) fetchFromPLC(ctx context.Context, bundleNum int, bf *bundleFile, client *Client) ([]PLCOperation, bool, error) {
+     log.Info("→ Bundle %06d not found locally, fetching from PLC directory...", bundleNum)

+     afterTime, prevCIDs := bm.getBoundaryInfo(ctx, bundleNum)
+     fetcher := newBundleFetcher(client, afterTime, prevCIDs)

+     ops, isComplete := fetcher.fetchUntilComplete(ctx, BUNDLE_SIZE)

+     log.Info(" Collected %d unique operations after %d fetches (complete=%v)",
+         len(ops), fetcher.fetchCount, isComplete)
+
+     if isComplete {
+         bf.operations = ops
+         if err := bm.save(bf); err != nil {
+             log.Error("Warning: failed to save bundle: %v", err)
          } else {
+             if err := bm.indexBundle(ctx, bundleNum, bf); err != nil {
+                 log.Error("Warning: failed to index bundle: %v", err)
+             } else {
+                 log.Info("✓ Bundle %06d saved [%d ops, hash: %s...]",
+                     bundleNum, len(ops), bf.uncompressedHash[:16])
+             }
          }
      }

+     return ops, isComplete, nil
  }

+ func (bm *BundleManager) getBoundaryInfo(ctx context.Context, bundleNum int) (string, map[string]bool) {
+     if bundleNum == 1 {
+         return "", nil
      }

+     prevBundle, err := bm.db.GetBundleByNumber(ctx, bundleNum-1)
      if err != nil {
+         return "", nil
      }

+     afterTime := prevBundle.EndTime.Format(time.RFC3339Nano)

+     // Return stored boundary CIDs if available
+     if len(prevBundle.BoundaryCIDs) > 0 {
+         cids := make(map[string]bool)
+         for _, cid := range prevBundle.BoundaryCIDs {
+             cids[cid] = true
          }
+         return afterTime, cids
+     }

+     // Fallback: compute from file
+     bf := bm.newBundleFile(bundleNum - 1)
+     if bf.exists() {
+         if err := bm.load(bf); err == nil {
+             _, cids := GetBoundaryCIDs(bf.operations)
+             return afterTime, cids
          }
      }

+     return afterTime, nil
  }

+ // ===== BUNDLE INDEXING =====
+
+ func (bm *BundleManager) indexBundle(ctx context.Context, bundleNum int, bf *bundleFile) error {
+     prevHash := ""
+     if bundleNum > 1 {
+         if prev, err := bm.db.GetBundleByNumber(ctx, bundleNum-1); err == nil {
+             prevHash = prev.Hash
          }
      }

+     dids := bm.extractUniqueDIDs(bf.operations)
+     fileSize := bm.getFileSize(bf.path)

      bundle := &storage.PLCBundle{
+         BundleNumber: bundleNum,
+         StartTime:    bf.operations[0].CreatedAt,
+         EndTime:      bf.operations[len(bf.operations)-1].CreatedAt,
          DIDs:         dids,
+         Hash:           bf.uncompressedHash,
+         CompressedHash: bf.compressedHash,
+         CompressedSize: fileSize,
+         PrevBundleHash: prevHash,
          Compressed: true,
          CreatedAt:  time.Now(),
      }
···
      return bm.db.CreateBundle(ctx, bundle)
  }

+ func (bm *BundleManager) extractUniqueDIDs(ops []PLCOperation) []string {
+     didSet := make(map[string]bool)
+     for _, op := range ops {
+         didSet[op.DID] = true
      }

+     dids := make([]string, 0, len(didSet))
+     for did := range didSet {
+         dids = append(dids, did)
      }
+     return dids
  }

+ // ===== MEMPOOL BUNDLE CREATION =====
+
  func (bm *BundleManager) CreateBundleFromMempool(ctx context.Context, operations []PLCOperation) (int, error) {
      if !bm.enabled {
          return 0, fmt.Errorf("bundle manager disabled")
···
      if err != nil {
          return 0, err
      }
+     bundleNum := lastBundle + 1

+     bf := bm.newBundleFile(bundleNum)
+     bf.operations = operations

+     if err := bm.save(bf); err != nil {
          return 0, err
      }

+     if err := bm.indexBundle(ctx, bundleNum, bf); err != nil {
          return 0, err
      }

+     log.Info("✓ Created bundle %06d from mempool (hash: %s...)",
+         bundleNum, bf.uncompressedHash[:16])

+     return bundleNum, nil
  }

+ // ===== VERIFICATION =====

  func (bm *BundleManager) VerifyChain(ctx context.Context, endBundle int) error {
      if !bm.enabled {
          return fmt.Errorf("bundle manager disabled")
···
              return fmt.Errorf("bundle %06d not found: %w", i, err)
          }

          // Verify file hash
+         path := bm.newBundleFile(i).path
+         if err := bm.verifyHash(path, bundle.CompressedHash); err != nil {
              return fmt.Errorf("bundle %06d hash verification failed: %w", i, err)
          }

          // Verify chain link
···
      return nil
  }

+ func (bm *BundleManager) EnsureBundleContinuity(ctx context.Context, targetBundle int) error {
+     if !bm.enabled {
+         return nil
+     }
+
+     for i := 1; i < targetBundle; i++ {
+         if !bm.newBundleFile(i).exists() {
+             if _, err := bm.db.GetBundleByNumber(ctx, i); err != nil {
+                 return fmt.Errorf("bundle %06d is missing (required for continuity)", i)
+             }
+         }
+     }
+
+     return nil
+ }
+
+ // ===== UTILITY METHODS =====
+
+ func (bm *BundleManager) hash(data []byte) string {
+     h := sha256.Sum256(data)
+     return hex.EncodeToString(h[:])
+ }
+
+ func (bm *BundleManager) hashFile(path string) string {
+     data, _ := os.ReadFile(path)
+     return bm.hash(data)
+ }
+
+ func (bm *BundleManager) verifyHash(path, expectedHash string) error {
+     if expectedHash == "" {
+         return nil
+     }
+
+     actualHash := bm.hashFile(path)
+     if actualHash != expectedHash {
+         return fmt.Errorf("hash mismatch")
+     }
+     return nil
+ }
+
+ func (bm *BundleManager) getFileSize(path string) int64 {
+     if info, err := os.Stat(path); err == nil {
+         return info.Size()
+     }
+     return 0
+ }
+
+ func (bm *BundleManager) GetStats(ctx context.Context) (int64, int64, error) {
+     if !bm.enabled {
+         return 0, 0, nil
+     }
+     return bm.db.GetBundleStats(ctx)
+ }
+
  func (bm *BundleManager) GetChainInfo(ctx context.Context) (map[string]interface{}, error) {
      lastBundle, err := bm.db.GetLastBundleNumber(ctx)
      if err != nil {
···
          }, nil
      }

+     firstBundle, _ := bm.db.GetBundleByNumber(ctx, 1)
+     lastBundleData, _ := bm.db.GetBundleByNumber(ctx, lastBundle)

      return map[string]interface{}{
          "chain_length": lastBundle,
···
          "chain_head_hash": lastBundleData.Hash,
      }, nil
  }
+
+ // ===== EXPORTED HELPERS =====
+
+ func GetBoundaryCIDs(operations []PLCOperation) (time.Time, map[string]bool) {
+     if len(operations) == 0 {
+         return time.Time{}, nil
+     }
+
+     lastOp := operations[len(operations)-1]
+     boundaryTime := lastOp.CreatedAt
+     cidSet := make(map[string]bool)
+
+     for i := len(operations) - 1; i >= 0; i-- {
+         op := operations[i]
+         if op.CreatedAt.Equal(boundaryTime) {
+             cidSet[op.CID] = true
+         } else {
+             break
+         }
+     }
+
+     return boundaryTime, cidSet
+ }
+
+ func StripBoundaryDuplicates(operations []PLCOperation, boundaryTimestamp string, prevBoundaryCIDs map[string]bool) []PLCOperation {
+     if len(operations) == 0 {
+         return operations
+     }
+
+     boundaryTime, err := time.Parse(time.RFC3339Nano, boundaryTimestamp)
+     if err != nil {
+         return operations
+     }
+
+     startIdx := 0
+     for startIdx < len(operations) {
+         op := operations[startIdx]
+
+         if op.CreatedAt.After(boundaryTime) {
+             break
+         }
+
+         if op.CreatedAt.Equal(boundaryTime) && prevBoundaryCIDs[op.CID] {
+             startIdx++
+             continue
+         }
+
+         break
+     }
+
+     return operations[startIdx:]
+ }
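For orientation, this is how the refactored surface reads at a call site. A hypothetical caller inside the same package, not code from this change: syncBundle is a made-up name, and the construction of the manager and client is assumed.

// Hypothetical usage sketch (not part of the diff).
func syncBundle(ctx context.Context, bm *BundleManager, client *Client, n int) error {
    ops, complete, err := bm.LoadBundle(ctx, n, client)
    if err != nil {
        return err
    }
    if !complete {
        // Fewer than BUNDLE_SIZE unique operations were available upstream:
        // LoadBundle returns the partial slice but neither saves nor indexes it.
        log.Info("bundle %06d incomplete: buffered %d ops", n, len(ops))
        return nil
    }
    log.Info("bundle %06d ready: %d ops", n, len(ops))
    return nil
}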
internal/plc/scanner.go (+25, -7)
···

  // ScanMetrics tracks scan progress
  type ScanMetrics struct {
-     totalProcessed int64
      endpointCounts map[string]int64
      currentBundle  int
      startTime      time.Time
···

  func (m *ScanMetrics) logSummary() {
      summary := formatEndpointCounts(m.endpointCounts)
-     log.Info("PLC scan completed: %d operations, %s in %v",
-         m.totalProcessed, summary, time.Since(m.startTime))
  }

  func (s *Scanner) Scan(ctx context.Context) error {
···
      }

      s.mergeCounts(m.endpointCounts, counts)
-     m.totalProcessed += int64(len(ops))

      batchTotal := sumCounts(counts)
      log.Verbose("✓ Processed bundle %06d: %d operations (after dedup), %d new endpoints",
268
}
269
270
fetchedCount := len(ops)
271
log.Verbose(" Fetched %d operations from PLC", fetchedCount)
272
273
if fetchedCount == 0 {
···
          return false, nil
      }

-     // ✅ Fix: Handle errors from GetMempoolCount
      beforeCount, err := s.db.GetMempoolCount(ctx)
      if err != nil {
          return false, err
      }

      if err := s.addToMempool(ctx, ops, m.endpointCounts); err != nil {
          return false, err
      }

      afterCount, err := s.db.GetMempoolCount(ctx)
      if err != nil {
          return false, err
      }

-     m.totalProcessed += int64(fetchedCount)
      log.Verbose(" Added %d new unique operations to mempool (%d were duplicates)",
-         afterCount-beforeCount, fetchedCount-(afterCount-beforeCount))

      // Continue only if got full batch
      shouldContinue := fetchedCount >= limit
···
      }

      // Process and update metrics
      counts, _ := s.processBatch(ctx, ops)
      s.mergeCounts(m.endpointCounts, counts)
      m.currentBundle = bundleNum

      if err := s.updateCursorForBundle(ctx, bundleNum, m.totalProcessed); err != nil {
···

  // ScanMetrics tracks scan progress
  type ScanMetrics struct {
+     totalFetched   int64 // Total ops fetched from PLC/bundles
+     totalProcessed int64 // Unique ops processed (after dedup)
+     newEndpoints   int64 // New endpoints discovered
      endpointCounts map[string]int64
      currentBundle  int
      startTime      time.Time
···

  func (m *ScanMetrics) logSummary() {
      summary := formatEndpointCounts(m.endpointCounts)
+     if m.newEndpoints > 0 {
+         log.Info("PLC scan completed: %d operations processed (%d fetched), %s in %v",
+             m.totalProcessed, m.totalFetched, summary, time.Since(m.startTime))
+     } else {
+         log.Info("PLC scan completed: %d operations processed (%d fetched), 0 new endpoints in %v",
+             m.totalProcessed, m.totalFetched, time.Since(m.startTime))
+     }
  }

  func (s *Scanner) Scan(ctx context.Context) error {
···
      }

      s.mergeCounts(m.endpointCounts, counts)
+     m.totalProcessed += int64(len(ops)) // Unique ops after dedup
+     m.newEndpoints += sumCounts(counts) // Track newly discovered endpoints

      batchTotal := sumCounts(counts)
      log.Verbose("✓ Processed bundle %06d: %d operations (after dedup), %d new endpoints",
···
      }

      fetchedCount := len(ops)
+     m.totalFetched += int64(fetchedCount) // Track all fetched
      log.Verbose(" Fetched %d operations from PLC", fetchedCount)

      if fetchedCount == 0 {
···
          return false, nil
      }

      beforeCount, err := s.db.GetMempoolCount(ctx)
      if err != nil {
          return false, err
      }

+     endpointsBefore := sumCounts(m.endpointCounts)
      if err := s.addToMempool(ctx, ops, m.endpointCounts); err != nil {
          return false, err
      }
+     endpointsAfter := sumCounts(m.endpointCounts)
+     m.newEndpoints += endpointsAfter - endpointsBefore

      afterCount, err := s.db.GetMempoolCount(ctx)
      if err != nil {
          return false, err
      }

+     uniqueAdded := int64(afterCount - beforeCount)
+     m.totalProcessed += uniqueAdded // Track unique ops processed

      log.Verbose(" Added %d new unique operations to mempool (%d were duplicates)",
+         uniqueAdded, int64(fetchedCount)-uniqueAdded)

      // Continue only if got full batch
      shouldContinue := fetchedCount >= limit
···
      }

      // Process and update metrics
+     countsBefore := sumCounts(m.endpointCounts)
      counts, _ := s.processBatch(ctx, ops)
      s.mergeCounts(m.endpointCounts, counts)
+     newEndpointsFound := sumCounts(m.endpointCounts) - countsBefore
+
+     m.totalProcessed += int64(len(ops))
+     m.newEndpoints += newEndpointsFound // Track newly discovered endpoints
      m.currentBundle = bundleNum

      if err := s.updateCursorForBundle(ctx, bundleNum, m.totalProcessed); err != nil {
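
The fetched-versus-processed split above is easiest to see with numbers. A toy, runnable sketch of the same bookkeeping; the counts are invented:

package main

import "fmt"

// Mirrors the ScanMetrics arithmetic with plain integers: totalFetched counts
// every operation pulled from PLC, totalProcessed only the unique ones that
// survived mempool dedup.
func main() {
    var totalFetched, totalProcessed int64

    fetched := int64(1000)                    // len(ops) from one export call
    before, after := int64(5000), int64(5880) // mempool counts around addToMempool

    totalFetched += fetched
    uniqueAdded := after - before
    totalProcessed += uniqueAdded

    fmt.Printf("added %d unique ops (%d duplicates)\n", uniqueAdded, fetched-uniqueAdded)
    fmt.Printf("fetched=%d processed=%d\n", totalFetched, totalProcessed)
}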