+1 -1 bundle/bundle_test.go
+4 -4 bundle/manager.go
···
  }

  // Initialize operations handler
- ops, err := storage.NewOperations(config.Logger)
+ ops, err := storage.NewOperations(config.Logger, config.Verbose)
  if err != nil {
      return nil, fmt.Errorf("failed to initialize operations: %w", err)
  }
···
  // Get hostname
  hostname, _ := os.Hostname()

- // ✅ Create BundleInfo
+ // Create BundleInfo
  bundleInfo := &storage.BundleInfo{
      BundleNumber: bundle.BundleNumber,
      Origin:       origin,
···
  m.logger.Printf("DEBUG: Calling operations.SaveBundle with bundle=%d", bundleInfo.BundleNumber)

- // ✅ Save to disk with 3 parameters
+ // Save to disk with 3 parameters
  uncompressedHash, compressedHash, uncompressedSize, compressedSize, err := m.operations.SaveBundle(path, bundle.Operations, bundleInfo)
  if err != nil {
      m.logger.Printf("DEBUG: SaveBundle FAILED: %v", err)
···
  if err := plcclient.ValidateDIDFormat(input); err != nil {
      return "", 0, err
  }
- return input, 0, nil // ✅ No resolution needed
+ return input, 0, nil // No resolution needed
  }

  // Support did:web too
+1 -1 bundle/metadata.go
···
  }
  defer file.Close()

- // ✅ Use abstracted reader from storage package
+ // Use abstracted reader from storage package
  reader, err := storage.NewStreamingReader(file)
  if err != nil {
      return 0, 0, time.Time{}, time.Time{}, fmt.Errorf("failed to create reader: %w", err)
+1 -1 bundle/scanner.go
···
  for num := range jobs {
      path := filepath.Join(m.config.BundleDir, fmt.Sprintf("%06d.jsonl.zst", num))

-     // ✅ NEW: Stream metadata WITHOUT loading all operations
+     // Stream metadata WITHOUT loading all operations
      meta, err := m.CalculateMetadataStreaming(num, path)
      if err != nil {
          results <- bundleResult{index: num, err: err}
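
Note: the streaming path works because per-bundle stats now live in the embedded metadata frame. A minimal sketch of the idea, assuming the `storage.Operations.ExtractBundleMetadata` API shown later in this diff (which reads only the skippable frame, never the compressed operation data):

```go
// Sketch: derive bundle stats without decompressing any operations.
func bundleStats(ops *storage.Operations, path string) (start, end time.Time, frames int, err error) {
	meta, err := ops.ExtractBundleMetadata(path)
	if err != nil {
		return time.Time{}, time.Time{}, 0, err
	}
	return meta.StartTime, meta.EndTime, meta.FrameCount, nil
}
```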
+7 -5 cmd/plcbundle/commands/common.go
···
  config := bundle.DefaultConfig(absDir)
  config.AutoInit = opts.AutoInit

- // Set verbose from command if available
+ // Check BOTH global AND local verbose flags
  if opts.Cmd != nil {
-     if verbose, err := opts.Cmd.Root().PersistentFlags().GetBool("verbose"); err == nil {
-         config.Verbose = verbose
-     }
+     globalVerbose, _ := opts.Cmd.Root().PersistentFlags().GetBool("verbose")
+     localVerbose, _ := opts.Cmd.Flags().GetBool("verbose")
+
+     // Use OR logic: verbose if EITHER flag is set
+     config.Verbose = globalVerbose || localVerbose
  }

  // Create PLC client if URL provided
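
Note: for reference, a self-contained sketch of the flag layout this change assumes — a persistent `--verbose` on the root command plus a local `--verbose` on the subcommand, merged with OR. Command names here are illustrative, not from the codebase:

```go
package main

import (
	"fmt"

	"github.com/spf13/cobra"
)

func main() {
	root := &cobra.Command{Use: "plcbundle"}
	root.PersistentFlags().Bool("verbose", false, "global verbose output")

	inspect := &cobra.Command{
		Use: "inspect",
		RunE: func(cmd *cobra.Command, args []string) error {
			globalVerbose, _ := cmd.Root().PersistentFlags().GetBool("verbose")
			localVerbose, _ := cmd.Flags().GetBool("verbose")
			// Either flag enables verbose mode.
			fmt.Println("verbose:", globalVerbose || localVerbose)
			return nil
		},
	}
	inspect.Flags().Bool("verbose", false, "verbose output for this command")

	root.AddCommand(inspect)
	_ = root.Execute()
}
```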
···
  // Set handle resolver URL from flag or option
  handleResolverURL := opts.HandleResolverURL
  if handleResolverURL == "" && opts.Cmd != nil {
-     handleResolverURL, _ = opts.Cmd.Root().PersistentFlags().GetString("handle-resolver") // ✅ Fixed flag name
+     handleResolverURL, _ = opts.Cmd.Root().PersistentFlags().GetString("handle-resolver")
  }
  // Only override default if explicitly provided
  if handleResolverURL != "" {
+10 -8 cmd/plcbundle/commands/inspect.go
···
  result.FileSize = info.Size()

  // Check for frame index
- ops := &storage.Operations{}
+ ops, _ := storage.NewOperations(nil, opts.verbose)
+
  if _, err := ops.ExtractBundleMetadata(bundlePath); err == nil {
      result.HasFrameIndex = true // Has embedded index
  } else {
···
  fmt.Fprintf(os.Stderr, "Reading embedded metadata...\n")
  metaStart := time.Now()

- ops := &storage.Operations{}
+ ops, _ := storage.NewOperations(nil, opts.verbose)
+
  meta, err := ops.ExtractBundleMetadata(bundlePath)
  if err != nil {
      if opts.verbose {
···
  fmt.Fprintf(os.Stderr, "Verifying cryptographic hashes...\n")
  verifyStart := time.Now()

- // ✅ Pass cmd parameter
+ // Pass cmd parameter
  result.ContentHashValid, result.CompressedHashValid, result.MetadataValid =
      verifyCrypto(cmd, bundlePath, result.Metadata, bundleNum, opts.verbose)

···
  // ============================================================================

  func analyzeBundle(path string, opts inspectOptions) (*bundleAnalysis, error) {
-     ops := &storage.Operations{}
+     ops, _ := storage.NewOperations(nil, opts.verbose)
      operations, err := ops.LoadBundle(path)
      if err != nil {
          return nil, err
···
  }

  func verifyCrypto(cmd *cobra.Command, path string, meta *storage.BundleMetadata, bundleNum int, verbose bool) (contentValid, compressedValid, metadataValid bool) {
-     ops := &storage.Operations{}
+     ops, _ := storage.NewOperations(nil, verbose)

      // Calculate actual hashes from file
      compHash, compSize, contentHash, contentSize, err := ops.CalculateFileHashes(path)
···
  compressedValid = true
  metadataValid = true

- // ✅ Verify against embedded metadata if available
+ // Verify against embedded metadata if available
  if meta != nil {
      // Check content hash (this is in the metadata)
      if meta.ContentHash != "" && meta.ContentHash != contentHash {
···
      metadataValid = true
  }

- // ✅ Note: We don't check compressed hash/size because they're not in metadata
+ // Note: We don't check compressed hash/size because they're not in metadata
  // (The file IS the compressed data, so it's redundant)

  if verbose {
···
      }
  }

- // ✅ Also verify against repository index if bundle number is known
+ // Also verify against repository index if bundle number is known
  if bundleNum > 0 {
      mgr, _, err := getManager(&ManagerOptions{Cmd: cmd})
      if err == nil {
+3 -10 cmd/plcbundle/commands/migrate.go
···
  hashChanges := make([]int, 0, len(needsMigration))

  for i, bundleNum := range needsMigration {
-     // ✅ Pass version to migrateBundle
+     // Pass version to migrateBundle
      if err := migrateBundle(dir, bundleNum, index, version, opts.verbose); err != nil {
          failed++
          if firstError == nil {
···
  progress.Finish()
  elapsed := time.Since(start)

- // ✅ Update index with new compressed hashes
+ // Update index with new compressed hashes
  if len(hashChanges) > 0 {
      fmt.Printf("\nUpdating bundle index...\n")
      updateStart := time.Now()
···
  fmt.Printf(" Index updated: %d entries\n", len(hashChanges))
  fmt.Printf(" Speed: %.1f bundles/sec\n\n", float64(success)/elapsed.Seconds())

- fmt.Printf("✨ New bundle format features:\n")
- fmt.Printf(" • Embedded metadata (JSON in skippable frame)\n")
- fmt.Printf(" • Frame offsets for instant random access\n")
- fmt.Printf(" • Multi-frame compression (100 ops/frame)\n")
- fmt.Printf(" • Self-contained (no .idx files)\n")
- fmt.Printf(" • Provenance tracking (version, origin, creator)\n")
- fmt.Printf(" • Compatible with standard zstd tools\n")
  } else {
      fmt.Printf("⚠️ Migration completed with errors\n")
      fmt.Printf(" Success: %d bundles\n", success)
···
  // 4. Get hostname (optional)
  hostname, _ := os.Hostname()

- // 5. ✅ Create BundleInfo for new format
+ // 5. Create BundleInfo for new format
  bundleInfo := &storage.BundleInfo{
      BundleNumber: meta.BundleNumber,
      Origin:       index.Origin, // From index
+8 -18 cmd/plcbundle/commands/op.go
···

  ctx := context.Background()

- // ✅ Time the operation load
+ // Time the operation load
  totalStart := time.Now()
  op, err := mgr.LoadOperation(ctx, bundleNum, position)
  totalDuration := time.Since(totalStart)
···
      return err
  }

- // Output timing to stderr if verbose
  if verbose {
      globalPos := (bundleNum * 10000) + position

-     fmt.Fprintf(os.Stderr, "Operation Load Metrics\n")
-     fmt.Fprintf(os.Stderr, "══════════════════════\n\n")
-     fmt.Fprintf(os.Stderr, " Location: Bundle %06d, Position %04d\n", bundleNum, position)
-     fmt.Fprintf(os.Stderr, " Global Position: %d\n", globalPos)
-     fmt.Fprintf(os.Stderr, " Total Time: %s\n", totalDuration)
-
-     // Calculate throughput
-     if len(op.RawJSON) > 0 {
-         mbPerSec := float64(len(op.RawJSON)) / totalDuration.Seconds() / (1024 * 1024)
-         fmt.Fprintf(os.Stderr, " Data Size: %d bytes\n", len(op.RawJSON))
-         fmt.Fprintf(os.Stderr, " Throughput: %.2f MB/s\n", mbPerSec)
-     }
-
+     // Log-style output (compact, single-line friendly)
+     fmt.Fprintf(os.Stderr, "[Load] Bundle %06d:%04d (pos=%d) in %s",
+         bundleNum, position, globalPos, totalDuration)
+     fmt.Fprintf(os.Stderr, " | %d bytes", len(op.RawJSON))
      fmt.Fprintf(os.Stderr, "\n")
  }

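
Note: the `(bundleNum * 10000) + position` arithmetic encodes the fixed bundle size of 10,000 operations. A small sketch of the mapping in both directions (helper names are illustrative, not from the codebase):

```go
const opsPerBundle = 10000

// globalPosition flattens (bundle, position) into a single index.
func globalPosition(bundleNum, position int) int {
	return bundleNum*opsPerBundle + position
}

// splitPosition recovers the bundle number and in-bundle position.
func splitPosition(globalPos int) (bundleNum, position int) {
	return globalPos / opsPerBundle, globalPos % opsPerBundle
}
```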
···

  ctx := context.Background()

- // ✅ Time the operation
+ // Time the operation
  loadStart := time.Now()
  op, err := mgr.LoadOperation(ctx, bundleNum, position)
  loadDuration := time.Since(loadStart)
···
      return err
  }

- // ✅ Time the parsing
+ // Time the parsing
  parseStart := time.Now()
  opData, parseErr := op.GetOperationData()
  parseDuration := time.Since(parseStart)
···
  fmt.Printf("──────\n")
  fmt.Printf(" %s\n\n", status)

- // ✅ Performance metrics (always shown if verbose)
+ // Performance metrics (always shown if verbose)
  if verbose {
      totalTime := loadDuration + parseDuration

+2 -2 internal/didindex/builder.go
···

  type tempEntry struct {
      identifier string
-     location   OpLocation // ← Single packed value
+     location   OpLocation
  }

  entries := make([]tempEntry, entryCount)
  for i := 0; i < entryCount; i++ {
-     offset := i * 28 // ← 28 bytes
+     offset := i * 28
      entries[i] = tempEntry{
          identifier: string(data[offset : offset+24]),
          location:   OpLocation(binary.LittleEndian.Uint32(data[offset+24 : offset+28])),
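
Note: each temp-index record is fixed-width — 24 bytes of DID identifier followed by a 4-byte packed location, 28 bytes total. A sketch of the encoder side that would produce what this loop reads (function name hypothetical; assumes `encoding/binary` and the package's `OpLocation` type):

```go
// appendTempEntry writes one fixed-width 28-byte record:
// bytes 0-23: identifier, bytes 24-27: packed OpLocation (little-endian).
func appendTempEntry(buf []byte, identifier string, loc OpLocation) []byte {
	var rec [28]byte
	copy(rec[:24], identifier) // identifier is assumed to be exactly 24 bytes
	binary.LittleEndian.PutUint32(rec[24:28], uint32(loc))
	return append(buf, rec[:]...)
}
```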
+5 -5 internal/didindex/manager.go
···
      dim.logger.Printf("DEBUG: Shard %02x loaded, size: %d bytes", shardNum, len(shard.data))
  }

- // ✅ Safe to read - refcount prevents eviction
+ // Safe to read - refcount prevents eviction
  locations := dim.searchShard(shard, identifier)

  if dim.verbose {
···
  // Read locations
  locations := make([]OpLocation, count)
  for i := 0; i < int(count); i++ {
-     if offset+4 > len(data) { // ← 4 bytes now
+     if offset+4 > len(data) {
          return locations[:i]
      }

···
      packed := binary.LittleEndian.Uint32(data[offset : offset+4])
      locations[i] = OpLocation(packed)

-     offset += 4 // ← 4 bytes
+     offset += 4
  }

  return locations
···
  for i, id := range identifiers {
      offsetTable[i] = uint32(currentOffset)
      locations := builder.entries[id]
-     entrySize := DID_IDENTIFIER_LEN + 2 + (len(locations) * 4) // ← 4 bytes
+     entrySize := DID_IDENTIFIER_LEN + 2 + (len(locations) * 4)
      currentOffset += entrySize
  }

···
  for _, loc := range locations {
      // Write packed uint32 (global position + nullified bit)
      binary.LittleEndian.PutUint32(buf[offset:offset+4], uint32(loc))
-     offset += 4 // ← 4 bytes per location
+     offset += 4
  }
  }

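
Note: the comment above says each location is a packed uint32 carrying a global position plus a nullified bit, but the exact bit layout is not visible in this diff. A sketch assuming the top bit carries the flag — treat the layout as an assumption, not the package's actual encoding:

```go
type OpLocation uint32

const nullifiedBit = 1 << 31 // ASSUMED layout: top bit = nullified flag

func packLocation(globalPos uint32, nullified bool) OpLocation {
	if nullified {
		return OpLocation(globalPos | nullifiedBit)
	}
	return OpLocation(globalPos)
}

func (l OpLocation) GlobalPos() uint32 { return uint32(l) &^ nullifiedBit }
func (l OpLocation) Nullified() bool   { return uint32(l)&nullifiedBit != 0 }
```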
+1 -2 internal/mempool/mempool_test.go
···
  m.Save()

  // Add 10 more and save
- // FIX: makeTestOperationsFrom(start, COUNT) - so we want (10, 10) not (10, 20)
- ops2 := makeTestOperationsFrom(10, 10) // ← Changed from (10, 20)
+ ops2 := makeTestOperationsFrom(10, 10)
  m.Add(ops2)
  m.Save()

+1 -1 internal/plcclient/plc_test.go
+4 -4 internal/plcclient/resolver.go
···
  // Base contexts - ALWAYS include multikey (matches PLC directory behavior)
  contexts := []string{
      "https://www.w3.org/ns/did/v1",
-     "https://w3id.org/security/multikey/v1", // ← Always include this
+     "https://w3id.org/security/multikey/v1",
  }

  hasSecp256k1 := false
···
  doc := &DIDDocument{
      Context: contexts,
      ID:      state.DID,
-     AlsoKnownAs:        []string{}, // ← Empty slice
-     VerificationMethod: []VerificationMethod{}, // ← Empty slice
-     Service:            []Service{}, // ← Empty slice
+     AlsoKnownAs:        []string{},
+     VerificationMethod: []VerificationMethod{},
+     Service:            []Service{},
  }

  // Copy alsoKnownAs if present
+35 -40 internal/storage/storage.go
···
  StartTime time.Time `json:"start_time"` // First operation timestamp
  EndTime   time.Time `json:"end_time"`   // Last operation timestamp

- // === Frame Structure (for random access) ===
- FrameCount   int     `json:"frame_count"`   // Number of zstd frames (usually 100)
- FrameSize    int     `json:"frame_size"`    // Operations per frame (100)
- FrameOffsets []int64 `json:"frame_offsets"` // Byte offsets of each frame
+ // === Creation Provenance ===
+ CreatedAt     time.Time `json:"created_at"`                // When bundle was created
+ CreatedBy     string    `json:"created_by"`                // "plcbundle/v1.2.3"
+ CreatedByHost string    `json:"created_by_host,omitempty"` // Optional: hostname that created it

  // === Optional Context ===
  Cursor string `json:"cursor,omitempty"` // PLC export cursor for this bundle
  Notes  string `json:"notes,omitempty"`  // Optional description

- // === Creation Provenance ===
- CreatedAt     time.Time `json:"created_at"`                // When bundle was created
- CreatedBy     string    `json:"created_by"`                // "plcbundle/v1.2.3"
- CreatedByHost string    `json:"created_by_host,omitempty"` // Optional: hostname that created it
+ // === Frame Structure (for random access) ===
+ FrameCount   int     `json:"frame_count"`   // Number of zstd frames (usually 100)
+ FrameSize    int     `json:"frame_size"`    // Operations per frame (100)
+ FrameOffsets []int64 `json:"frame_offsets"` // Byte offsets of each frame
  }

  // Operations handles low-level bundle file operations
  type Operations struct {
-     logger Logger
+     logger  Logger
+     verbose bool
  }

  // Logger interface
···
      Println(v ...interface{})
  }

- func NewOperations(logger Logger) (*Operations, error) {
-     return &Operations{logger: logger}, nil
+ func NewOperations(logger Logger, verbose bool) (*Operations, error) {
+     return &Operations{
+         logger:  logger,
+         verbose: verbose,
+     }, nil
  }

  func (op *Operations) Close() {
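
Note: this signature change ripples through every call site (the tests below all pass `false`). A sketch of a typical updated caller, assuming a value satisfying the `Logger` interface and an enclosing function that returns an error:

```go
ops, err := storage.NewOperations(logger, config.Verbose)
if err != nil {
	return fmt.Errorf("failed to initialize operations: %w", err)
}
defer ops.Close()
```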
···
      compressedFrames = append(compressedFrames, compressedChunk)
  }

- // 3. ✅ Calculate RELATIVE offsets (relative to first data frame)
+ // 3. Calculate RELATIVE offsets (relative to first data frame)
  relativeOffsets := make([]int64, len(compressedFrames)+1)
  relativeOffsets[0] = 0

···
      relativeOffsets[i+1] = cumulative
  }

- // 4. ✅ Build metadata with RELATIVE offsets
+ // 4. Build metadata with RELATIVE offsets
  metadata := &BundleMetadata{
      Format:       fmt.Sprintf("plcbundle-v%d", MetadataFormatVersion),
      BundleNumber: bundleInfo.BundleNumber,
···
      FrameCount: len(compressedFrames),
      FrameSize:  FrameSize,
      Cursor:     bundleInfo.Cursor,
-     FrameOffsets: relativeOffsets, // ✅ RELATIVE to data start!
+     FrameOffsets: relativeOffsets, // RELATIVE to data start!
  }

  if len(operations) > 0 {
···
  }()

  // Write metadata frame
- if _, err := WriteMetadataFrame(finalFile, metadata); err != nil {
+ if _, err := op.WriteMetadataFrame(finalFile, metadata); err != nil {
      return "", "", 0, 0, fmt.Errorf("failed to write metadata: %w", err)
  }

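
Note: offsets are stored relative to the first data frame so they stay valid regardless of how large the metadata frame itself grows. Readers convert them back as `loadFrameIndex` does later in this diff; a condensed sketch of that conversion, using the 8-byte skippable-frame header size shown there:

```go
// Sketch: convert metadata-relative frame offsets to absolute file offsets.
// metadataFrameSize = 4 (magic) + 4 (size) + len(metadata JSON payload).
func absoluteOffsets(relative []int64, metadataFrameSize int64) []int64 {
	abs := make([]int64, len(relative))
	for i, off := range relative {
		abs[i] = metadataFrameSize + off
	}
	return abs
}
```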
···
  }
  defer file.Close()

- // ✅ Use abstracted streaming reader
+ // Use abstracted streaming reader
  reader, err := NewStreamingReader(file)
  if err != nil {
      return nil, fmt.Errorf("failed to create reader: %w", err)
···
      return nil, fmt.Errorf("failed to open bundle: %w", err)
  }

- // ✅ Use abstracted reader
+ // Use abstracted reader
  reader, err := NewStreamingReader(file)
  if err != nil {
      file.Close()
···
  compressedHash = op.Hash(compressedData)
  compressedSize = int64(len(compressedData))

- // ✅ Use abstracted decompression
+ // Use abstracted decompression
  decompressed, err := DecompressAll(compressedData)
  if err != nil {
      return "", 0, "", 0, fmt.Errorf("failed to decompress: %w", err)
···
      return nil, fmt.Errorf("invalid position: %d", position)
  }

- // ✅ Try multiple sources for frame index (no goto!)
+ // Try multiple sources for frame index (no goto!)
  frameOffsets, err := op.loadFrameIndex(path)
  if err != nil {
      // No frame index available - use legacy full scan
···
  // loadFrameIndex loads frame offsets and converts to absolute positions
  func (op *Operations) loadFrameIndex(path string) ([]int64, error) {
      // Try embedded metadata first
-     meta, err := ExtractMetadataFromFile(path)
+     meta, err := op.ExtractMetadataFromFile(path)
      if err == nil && len(meta.FrameOffsets) > 0 {
-         // ✅ Convert relative offsets to absolute
+         // Convert relative offsets to absolute
          // First, get metadata frame size by re-reading
          file, _ := os.Open(path)
          if file != nil {
              defer file.Close()

              // Read metadata frame to find where data starts
-             magic, data, readErr := ReadSkippableFrame(file)
+             magic, data, readErr := op.ReadSkippableFrame(file)
              if readErr == nil && magic == SkippableMagicMetadata {
                  // Metadata frame size = 4 (magic) + 4 (size) + len(data)
                  metadataFrameSize := int64(8 + len(data))
···
  endOffset := frameOffsets[frameIndex+1]
  frameLength := endOffset - startOffset

- // ✅ DEBUG
- if op.logger != nil {
-     op.logger.Printf("DEBUG: Frame %d: offset %d-%d, length %d bytes",
-         frameIndex, startOffset, endOffset, frameLength)
- }
-
  if frameLength <= 0 || frameLength > 10*1024*1024 {
      return nil, fmt.Errorf("invalid frame length: %d (offsets: %d-%d)",
          frameLength, startOffset, endOffset)
···
  defer bundleFile.Close()

  compressedFrame := make([]byte, frameLength)
- n, err := bundleFile.ReadAt(compressedFrame, startOffset)
+ _, err = bundleFile.ReadAt(compressedFrame, startOffset)
  if err != nil {
      return nil, fmt.Errorf("failed to read frame %d (offset %d, length %d): %w",
          frameIndex, startOffset, frameLength, err)
  }

- if op.logger != nil {
-     op.logger.Printf("DEBUG: Read %d bytes from offset %d", n, startOffset)
- }
-
  // Decompress
  decompressed, err := DecompressFrame(compressedFrame)
  if err != nil {
-     // ✅ DEBUG: Show first few bytes to diagnose
      if op.logger != nil {
          preview := compressedFrame
          if len(preview) > 16 {
              preview = preview[:16]
          }
-         op.logger.Printf("DEBUG: Failed frame data (first 16 bytes): % x", preview)
+         if op.verbose {
+             op.logger.Printf("DEBUG: Failed frame data (first 16 bytes): % x", preview)
+         }
      }
      return nil, fmt.Errorf("failed to decompress frame %d: %w", frameIndex, err)
  }
···
  }
  defer file.Close()

- // ✅ Use abstracted streaming reader
+ // Use abstracted streaming reader
  reader, err := NewStreamingReader(file)
  if err != nil {
      return nil, fmt.Errorf("failed to create reader: %w", err)
···
  }
  defer file.Close()

- // ✅ Use abstracted streaming reader
+ // Use abstracted streaming reader
  reader, err := NewStreamingReader(file)
  if err != nil {
      return nil, fmt.Errorf("failed to create reader: %w", err)
···
  }
  defer file.Close()

- // ✅ Use abstracted reader
+ // Use abstracted reader
  reader, err := NewStreamingReader(file)
  if err != nil {
      return 0, 0, time.Time{}, time.Time{}, fmt.Errorf("failed to create reader: %w", err)
···

  // ExtractBundleMetadata extracts metadata from bundle file without decompressing
  func (op *Operations) ExtractBundleMetadata(path string) (*BundleMetadata, error) {
-     meta, err := ExtractMetadataFromFile(path)
+     meta, err := op.ExtractMetadataFromFile(path)
      if err != nil {
          return nil, fmt.Errorf("failed to extract metadata: %w", err)
      }
···
  defer file.Close()

  // 1. Try to read metadata frame first
- meta, err := ReadMetadataFrame(file)
+ meta, err := op.ReadMetadataFrame(file)
  if err != nil {
      // No metadata frame - fall back to regular load
      file.Seek(0, io.SeekStart) // Reset to beginning
+9 -9 internal/storage/storage_test.go
···
  func TestStorageCompression(t *testing.T) {
      tmpDir := t.TempDir()
      logger := &testLogger{t: t}
-     ops, err := storage.NewOperations(logger)
+     ops, err := storage.NewOperations(logger, false)
      if err != nil {
          t.Fatalf("NewOperations failed: %v", err)
      }
···

  func TestStorageHashing(t *testing.T) {
      logger := &testLogger{t: t}
-     ops, err := storage.NewOperations(logger)
+     ops, err := storage.NewOperations(logger, false)
      if err != nil {
          t.Fatalf("NewOperations failed: %v", err)
      }
···
  func TestStorageConcurrency(t *testing.T) {
      tmpDir := t.TempDir()
      logger := &testLogger{t: t}
-     ops, err := storage.NewOperations(logger)
+     ops, err := storage.NewOperations(logger, false)
      if err != nil {
          t.Fatalf("NewOperations failed: %v", err)
      }
···
  func TestStorageEdgeCases(t *testing.T) {
      tmpDir := t.TempDir()
      logger := &testLogger{t: t}
-     ops, err := storage.NewOperations(logger)
+     ops, err := storage.NewOperations(logger, false)
      if err != nil {
          t.Fatalf("NewOperations failed: %v", err)
      }
···

  func TestStorageBoundaryConditions(t *testing.T) {
      logger := &testLogger{t: t}
-     ops, err := storage.NewOperations(logger)
+     ops, err := storage.NewOperations(logger, false)
      if err != nil {
          t.Fatalf("NewOperations failed: %v", err)
      }
···

  func TestStorageSerialization(t *testing.T) {
      logger := &testLogger{t: t}
-     ops, err := storage.NewOperations(logger)
+     ops, err := storage.NewOperations(logger, false)
      if err != nil {
          t.Fatalf("NewOperations failed: %v", err)
      }
···
  func TestStorageUtilities(t *testing.T) {
      tmpDir := t.TempDir()
      logger := &testLogger{t: t}
-     ops, err := storage.NewOperations(logger)
+     ops, err := storage.NewOperations(logger, false)
      if err != nil {
          t.Fatalf("NewOperations failed: %v", err)
      }
···
  func TestStorageStreaming(t *testing.T) {
      tmpDir := t.TempDir()
      logger := &testLogger{t: t}
-     ops, err := storage.NewOperations(logger)
+     ops, err := storage.NewOperations(logger, false)
      if err != nil {
          t.Fatalf("NewOperations failed: %v", err)
      }
···
  func BenchmarkStorageOperations(b *testing.B) {
      tmpDir := b.TempDir()
      logger := &testLogger{t: &testing.T{}}
-     ops, _ := storage.NewOperations(logger)
+     ops, _ := storage.NewOperations(logger, false)
      defer ops.Close()

      operations := makeTestOperations(10000)
+11 -19 internal/storage/zstd.go
···
  }

  // ReadSkippableFrame with debug
- func ReadSkippableFrame(r io.Reader) (uint32, []byte, error) {
+ func (ops *Operations) ReadSkippableFrame(r io.Reader) (uint32, []byte, error) {
      var magic uint32
      if err := binary.Read(r, binary.LittleEndian, &magic); err != nil {
          return 0, nil, fmt.Errorf("failed to read magic: %w", err)
      }
-
-     fmt.Fprintf(os.Stderr, "DEBUG: Read magic number: 0x%08X\n", magic)

      if magic < 0x184D2A50 || magic > 0x184D2A5F {
          return 0, nil, fmt.Errorf("not a skippable frame: magic=0x%08X (expected 0x184D2A50-0x184D2A5F)", magic)
···
      return 0, nil, fmt.Errorf("failed to read frame size: %w", err)
  }

- fmt.Fprintf(os.Stderr, "DEBUG: Frame size: %d bytes\n", frameSize)
-
  data := make([]byte, frameSize)
  if _, err := io.ReadFull(r, data); err != nil {
      return 0, nil, fmt.Errorf("failed to read frame data: %w", err)
  }
-
- fmt.Fprintf(os.Stderr, "DEBUG: Read %d bytes of frame data\n", len(data))

  return magic, data, nil
  }

  // WriteMetadataFrame writes bundle metadata as skippable frame (compact JSON)
- func WriteMetadataFrame(w io.Writer, meta *BundleMetadata) (int64, error) {
+ func (op *Operations) WriteMetadataFrame(w io.Writer, meta *BundleMetadata) (int64, error) {
      jsonData, err := json.Marshal(meta)
      if err != nil {
          return 0, fmt.Errorf("failed to marshal metadata: %w", err)
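
Note: `WriteMetadataFrame`'s body is elided here, but the reader above pins down the frame layout: a 4-byte little-endian magic in the zstd skippable range (0x184D2A50-0x184D2A5F), a 4-byte little-endian payload size, then the payload. A sketch of a writer matching that layout (helper name hypothetical; assumes `io` and `encoding/binary`):

```go
// writeSkippableFrame emits: magic (4 bytes LE) | size (4 bytes LE) | data.
// Standard zstd decoders skip frames whose magic falls in 0x184D2A50-0x184D2A5F.
func writeSkippableFrame(w io.Writer, magic uint32, data []byte) (int64, error) {
	var header [8]byte
	binary.LittleEndian.PutUint32(header[0:4], magic)
	binary.LittleEndian.PutUint32(header[4:8], uint32(len(data)))
	if _, err := w.Write(header[:]); err != nil {
		return 0, err
	}
	n, err := w.Write(data)
	return int64(8 + n), err
}
```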
···
  }

  // ReadMetadataFrame reads bundle metadata from skippable frame
- func ReadMetadataFrame(r io.Reader) (*BundleMetadata, error) {
-     magic, data, err := ReadSkippableFrame(r)
+ func (ops *Operations) ReadMetadataFrame(r io.Reader) (*BundleMetadata, error) {
+     magic, data, err := ops.ReadSkippableFrame(r)
      if err != nil {
          return nil, err
      }
···
  }

  // ExtractMetadataFromFile reads metadata without decompressing
- func ExtractMetadataFromFile(path string) (*BundleMetadata, error) {
+ func (ops *Operations) ExtractMetadataFromFile(path string) (*BundleMetadata, error) {
      file, err := os.Open(path)
      if err != nil {
          return nil, err
      }
      defer file.Close()

-     // ✅ DEBUG: Check first bytes
+     // Check first bytes
      header := make([]byte, 8)
      if _, err := file.Read(header); err != nil {
          return nil, fmt.Errorf("failed to read header: %w", err)
      }

-     fmt.Fprintf(os.Stderr, "DEBUG: First 8 bytes: % x\n", header)
-
      // Seek back to start
      file.Seek(0, io.SeekStart)

-     meta, err := ReadMetadataFrame(file)
+     meta, err := ops.ReadMetadataFrame(file)
      if err != nil {
          return nil, fmt.Errorf("no metadata frame found: %w", err)
      }
···
  }

  // ExtractFrameIndexFromFile now just reads from metadata
- func ExtractFrameIndexFromFile(path string) ([]int64, error) {
-     meta, err := ExtractMetadataFromFile(path)
+ func (ops *Operations) ExtractFrameIndexFromFile(path string) ([]int64, error) {
+     meta, err := ops.ExtractMetadataFromFile(path)
      if err != nil {
          return nil, err
      }
···
  }

  // DebugFrameOffsets extracts and displays frame offset information
- func DebugFrameOffsets(path string) error {
-     meta, err := ExtractMetadataFromFile(path)
+ func (ops *Operations) DebugFrameOffsets(path string) error {
+     meta, err := ops.ExtractMetadataFromFile(path)
      if err != nil {
          return fmt.Errorf("failed to extract metadata: %w", err)
      }
+4 -4 internal/sync/fetcher.go
···

  seenCIDs := make(map[string]bool)

- // ✅ Initialize current boundaries from previous bundle (or empty if first fetch)
+ // Initialize current boundaries from previous bundle (or empty if first fetch)
  currentBoundaryCIDs := prevBoundaryCIDs
  if currentBoundaryCIDs == nil {
      currentBoundaryCIDs = make(map[string]bool)
···
  originalBatchSize := len(batch)
  totalReceived += originalBatchSize

- // ✅ CRITICAL: Strip boundary duplicates using current boundaries
+ // CRITICAL: Strip boundary duplicates using current boundaries
  batch = f.operations.StripBoundaryDuplicates(
      batch,
      currentAfter,
···
      }
  }

- // ✅ CRITICAL: Calculate NEW boundary CIDs from this fetch for next iteration
+ // CRITICAL: Calculate NEW boundary CIDs from this fetch for next iteration
  if len(batch) > 0 {
      boundaryTime, newBoundaryCIDs := f.operations.GetBoundaryCIDs(batch)
      currentBoundaryCIDs = newBoundaryCIDs
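
Note: the boundary bookkeeping exists because export pages are cursored by timestamp, so operations sharing the boundary timestamp can reappear at the start of the next page. A minimal sketch of the dedup idea — generic, not the actual `StripBoundaryDuplicates` implementation, and assuming an `Op` type with `CID` and `CreatedAt` fields:

```go
// Sketch: drop ops already ingested at the previous page's boundary.
// An op is a boundary duplicate if its timestamp equals the cursor time
// and its CID was recorded in the previous batch's boundary set.
func stripBoundaryDuplicates(batch []Op, boundaryTime time.Time, boundaryCIDs map[string]bool) []Op {
	out := batch[:0]
	for _, op := range batch {
		if op.CreatedAt.Equal(boundaryTime) && boundaryCIDs[op.CID] {
			continue // already seen in the previous page
		}
		out = append(out, op)
	}
	return out
}
```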
···
  }
  allNewOps = append(allNewOps, batchNewOps...)

- // ✅ Still update boundaries even without mempool
+ // Still update boundaries even without mempool
  if len(batch) > 0 {
      boundaryTime, newBoundaryCIDs := f.operations.GetBoundaryCIDs(batch)
      currentBoundaryCIDs = newBoundaryCIDs
+6 -6 internal/sync/sync_test.go
···
          }
      }

-     return addedCount, nil // ← Return actual added count
+     return addedCount, nil
  }

  func (m *mockMempool) Save() error {
···
  defer client.Close()

  logger := &testLogger{t: t}
- ops, _ := storage.NewOperations(logger)
+ ops, _ := storage.NewOperations(logger, false)
  defer ops.Close()

  fetcher := internalsync.NewFetcher(client, ops, logger)
···
  defer client.Close()

  logger := &testLogger{t: t}
- storageOps, _ := storage.NewOperations(logger)
+ storageOps, _ := storage.NewOperations(logger, false)
  defer storageOps.Close()

  fetcher := internalsync.NewFetcher(client, storageOps, logger)
···
  defer client.Close()

  logger := &testLogger{t: t}
- storageOps, _ := storage.NewOperations(logger)
+ storageOps, _ := storage.NewOperations(logger, false)
  defer storageOps.Close()

  fetcher := internalsync.NewFetcher(client, storageOps, logger)
···
  defer client.Close()

  logger := &testLogger{t: t}
- storageOps, _ := storage.NewOperations(logger)
+ storageOps, _ := storage.NewOperations(logger, false)
  defer storageOps.Close()

  fetcher := internalsync.NewFetcher(client, storageOps, logger)
···

  func TestBundlerCreateBundle(t *testing.T) {
      logger := &testLogger{t: t}
-     storageOps, _ := storage.NewOperations(logger)
+     storageOps, _ := storage.NewOperations(logger, false)
      defer storageOps.Close()

      t.Run("BasicBundleCreation", func(t *testing.T) {
+1 -1 server/server_test.go
···

  // Create storage operations ONCE and reuse
  logger := &testLogger{t: t}
- storageOps, err := storage.NewOperations(logger)
+ storageOps, err := storage.NewOperations(logger, false)
  if err != nil {
      t.Fatalf("failed to create storage operations: %v", err)
  }