-3
bundle/manager.go
-3
bundle/manager.go
···
1268
1268
}
1269
1269
}
1270
1270
1271
-
// ✨ REMOVED: m.mempool.Save() - now handled by FetchToMempool
1272
-
1273
1271
totalDuration := time.Since(attemptStart)
1274
1272
1275
1273
if m.mempool.Count() < types.BUNDLE_SIZE {
···
1578
1576
input, input)
1579
1577
}
1580
1578
1581
-
// ✨ TIME THE RESOLUTION
1582
1579
resolveStart := time.Now()
1583
1580
m.logger.Printf("Resolving handle: %s", input)
1584
1581
did, err := m.handleResolver.ResolveHandle(ctx, input)
+3
-4
cmd/plcbundle/commands/clone.go
+3
-4
cmd/plcbundle/commands/clone.go
···
40
40
4. Updates local index
41
41
5. Can be interrupted and resumed safely`,
42
42
43
-
Args: cobra.RangeArgs(1, 2), // ✨ 1 or 2 arguments
43
+
Args: cobra.RangeArgs(1, 2),
44
44
45
45
Example: ` # Clone into default 'bundles' directory
46
46
plcbundle clone https://plc.example.com
···
60
60
RunE: func(cmd *cobra.Command, args []string) error {
61
61
remoteURL := strings.TrimSuffix(args[0], "/")
62
62
63
-
// ✨ Optional directory argument (default: "bundles")
64
63
targetDir := "bundles"
65
64
if len(args) > 1 {
66
65
targetDir = args[1]
···
93
92
}
94
93
95
94
func runClone(remoteURL string, targetDir string, opts cloneOptions) error {
96
-
// ✨ Create target directory if it doesn't exist
95
+
// Create target directory if it doesn't exist
97
96
absDir, err := filepath.Abs(targetDir)
98
97
if err != nil {
99
98
return fmt.Errorf("invalid directory path: %w", err)
100
99
}
101
100
102
-
// ✨ Clone creates new repository in specific directory
101
+
// Clone creates new repository in specific directory
103
102
mgr, dir, err := getManager(&ManagerOptions{
104
103
Dir: absDir,
105
104
PLCURL: "https://plc.directory",
+6
-2
cmd/plcbundle/commands/common.go
+6
-2
cmd/plcbundle/commands/common.go
···
55
55
type PLCOperationWithLocation = bundle.PLCOperationWithLocation
56
56
57
57
// ============================================================================
58
-
// ✨ MANAGER OPTIONS STRUCT
58
+
// MANAGER OPTIONS STRUCT
59
59
// ============================================================================
60
60
61
61
// ManagerOptions configures manager creation
···
68
68
}
69
69
70
70
// ============================================================================
71
-
// ✨ SINGLE UNIFIED getManager METHOD
71
+
// SINGLE UNIFIED getManager METHOD
72
72
// ============================================================================
73
73
74
74
// getManager creates or opens a bundle manager
···
200
200
// Formatting helpers
201
201
202
202
func formatBytes(bytes int64) string {
203
+
if bytes < 0 {
204
+
return fmt.Sprintf("-%s", formatBytes(-bytes))
205
+
}
206
+
203
207
const unit = 1000
204
208
if bytes < unit {
205
209
return fmt.Sprintf("%d B", bytes)
+3
-3
cmd/plcbundle/commands/detector.go
+3
-3
cmd/plcbundle/commands/detector.go
···
444
444
func runDetectionParallel(ctx context.Context, mgr BundleManager, setup *detectorSetup, start, end int, workers int, showProgress bool) error {
445
445
totalBundles := end - start + 1
446
446
447
-
// ✨ FIX: Don't create more workers than bundles
447
+
// Don't create more workers than bundles
448
448
if workers > totalBundles {
449
449
workers = totalBundles
450
450
}
451
451
452
-
// ✨ FIX: Use unbuffered channels to avoid blocking issues
452
+
// Use unbuffered channels to avoid blocking issues
453
453
jobs := make(chan int, workers*2) // Small buffer for job numbers only
454
454
results := make(chan detectionResult, workers*2)
455
455
···
538
538
// Collect matches
539
539
allMatches = append(allMatches, res.matches...)
540
540
541
-
// ✨ FIX: Output immediately (don't buffer too much)
541
+
// Output immediately (don't buffer too much)
542
542
if len(allMatches) >= 500 {
543
543
for _, match := range allMatches {
544
544
fmt.Printf("%d,%s,%d,%.2f,%s\n",
+4
-4
cmd/plcbundle/commands/did.go
+4
-4
cmd/plcbundle/commands/did.go
···
97
97
}
98
98
defer mgr.Close()
99
99
100
-
// ✨ Resolve handle to DID with timing
100
+
// Resolve handle to DID with timing
101
101
ctx := context.Background()
102
102
did, handleResolveTime, err := mgr.ResolveHandleOrDID(ctx, input)
103
103
if err != nil {
···
209
209
210
210
ctx := context.Background()
211
211
212
-
// ✨ Resolve handle to DID with timing
212
+
// Resolve handle to DID with timing
213
213
did, handleResolveTime, err := mgr.ResolveHandleOrDID(ctx, input)
214
214
if err != nil {
215
215
return err
···
1030
1030
writeResult(nil, fmt.Errorf("not found"))
1031
1031
}
1032
1032
1033
-
// ✨ Process bundles in parallel - LoadOperations once per bundle
1033
+
// Process bundles in parallel - LoadOperations once per bundle
1034
1034
bundleJobs := make(chan bundleGroup, len(bundles))
1035
1035
var wg sync.WaitGroup
1036
1036
···
1046
1046
positions[i] = locations[didIdx].position
1047
1047
}
1048
1048
1049
-
// ✨ Load operations once for this bundle
1049
+
// Load operations once for this bundle
1050
1050
ops, err := mgr.LoadOperations(ctx, job.bundleNum, positions)
1051
1051
1052
1052
if err != nil {
+204
-75
cmd/plcbundle/commands/migrate.go
+204
-75
cmd/plcbundle/commands/migrate.go
···
1
1
package commands
2
2
3
3
import (
4
+
"encoding/binary"
4
5
"fmt"
5
6
"os"
6
7
"path/filepath"
···
86
87
verbose bool
87
88
}
88
89
90
+
type bundleMigrationInfo struct {
91
+
bundleNumber int
92
+
oldSize int64
93
+
uncompressedSize int64
94
+
oldFormat string
95
+
oldCompressionRatio float64
96
+
}
97
+
89
98
func runMigration(mgr BundleManager, dir string, opts migrationOptions) error {
90
99
fmt.Printf("Scanning for legacy bundles in: %s\n\n", dir)
91
100
···
97
106
return nil
98
107
}
99
108
100
-
// Get plcbundle version
101
109
version := GetVersion()
110
+
ops := &storage.Operations{}
102
111
103
-
var needsMigration []int
112
+
var needsMigration []bundleMigrationInfo
104
113
var totalSize int64
105
114
106
-
ops := &storage.Operations{}
107
115
for _, meta := range bundles {
108
116
bundlePath := filepath.Join(dir, fmt.Sprintf("%06d.jsonl.zst", meta.BundleNumber))
117
+
embeddedMeta, err := ops.ExtractBundleMetadata(bundlePath)
109
118
110
-
// Check if already has embedded metadata
111
-
_, err := ops.ExtractBundleMetadata(bundlePath)
119
+
info := bundleMigrationInfo{
120
+
bundleNumber: meta.BundleNumber,
121
+
oldSize: meta.CompressedSize,
122
+
uncompressedSize: meta.UncompressedSize,
123
+
}
124
+
125
+
if meta.CompressedSize > 0 {
126
+
info.oldCompressionRatio = float64(meta.UncompressedSize) / float64(meta.CompressedSize)
127
+
}
128
+
129
+
if err != nil {
130
+
info.oldFormat = "v0 (single-frame)"
131
+
} else {
132
+
info.oldFormat = embeddedMeta.Format
133
+
}
112
134
113
135
if err != nil || opts.force {
114
-
needsMigration = append(needsMigration, meta.BundleNumber)
136
+
needsMigration = append(needsMigration, info)
115
137
totalSize += meta.CompressedSize
116
138
}
117
139
}
···
122
144
return nil
123
145
}
124
146
125
-
// Display plan
147
+
// COMPACT PLAN
126
148
fmt.Printf("Migration Plan\n")
127
149
fmt.Printf("══════════════\n\n")
128
-
fmt.Printf(" Bundles to migrate: %d\n", len(needsMigration))
129
-
fmt.Printf(" Total size: %s\n", formatBytes(totalSize))
130
-
fmt.Printf(" Workers: %d\n", opts.workers)
131
-
fmt.Printf(" plcbundle version: %s\n", version)
132
-
fmt.Printf("\n")
150
+
151
+
formatCounts := make(map[string]int)
152
+
var totalUncompressed int64
153
+
for _, info := range needsMigration {
154
+
formatCounts[info.oldFormat]++
155
+
totalUncompressed += info.uncompressedSize
156
+
}
157
+
158
+
fmt.Printf(" Format: ")
159
+
first := true
160
+
for format, count := range formatCounts {
161
+
if !first {
162
+
fmt.Printf(" + ")
163
+
}
164
+
fmt.Printf("%s (%d)", format, count)
165
+
first = false
166
+
}
167
+
fmt.Printf(" → v%d\n", storage.MetadataFormatVersion)
168
+
169
+
fmt.Printf(" Bundles: %d\n", len(needsMigration))
170
+
fmt.Printf(" Size: %s (%.3fx compression)\n",
171
+
formatBytes(totalSize),
172
+
float64(totalUncompressed)/float64(totalSize))
173
+
fmt.Printf(" Workers: %d, Compression Level: %d\n\n", opts.workers, storage.CompressionLevel)
133
174
134
175
if opts.dryRun {
135
-
fmt.Printf("💡 This is a dry-run. No files will be modified.\n")
176
+
fmt.Printf("💡 Dry-run mode\n")
136
177
return nil
137
178
}
138
179
139
180
// Execute migration
140
-
fmt.Printf("Starting migration...\n\n")
181
+
fmt.Printf("Migrating...\n\n")
141
182
142
183
start := time.Now()
143
184
progress := ui.NewProgressBar(len(needsMigration))
···
145
186
success := 0
146
187
failed := 0
147
188
var firstError error
148
-
hashChanges := make([]int, 0, len(needsMigration))
189
+
hashChanges := make([]int, 0)
149
190
150
-
for i, bundleNum := range needsMigration {
151
-
// Pass version to migrateBundle
152
-
if err := migrateBundle(dir, bundleNum, index, version, opts.verbose); err != nil {
191
+
var totalOldSize int64
192
+
var totalNewSize int64
193
+
var totalOldUncompressed int64
194
+
var totalNewUncompressed int64
195
+
196
+
for i, info := range needsMigration {
197
+
totalOldSize += info.oldSize
198
+
totalOldUncompressed += info.uncompressedSize
199
+
200
+
sizeDiff, newUncompressedSize, err := migrateBundle(dir, info.bundleNumber, index, version, opts.verbose)
201
+
if err != nil {
153
202
failed++
154
203
if firstError == nil {
155
204
firstError = err
156
205
}
157
206
if opts.verbose {
158
-
fmt.Fprintf(os.Stderr, "\n✗ Bundle %06d failed: %v\n", bundleNum, err)
207
+
fmt.Fprintf(os.Stderr, "\n✗ Bundle %06d failed: %v\n", info.bundleNumber, err)
159
208
}
160
209
} else {
161
210
success++
162
-
hashChanges = append(hashChanges, bundleNum)
211
+
hashChanges = append(hashChanges, info.bundleNumber)
212
+
213
+
newSize := info.oldSize + sizeDiff
214
+
totalNewSize += newSize
215
+
totalNewUncompressed += newUncompressedSize
163
216
164
217
if opts.verbose {
165
-
fmt.Fprintf(os.Stderr, "✓ Migrated bundle %06d\n", bundleNum)
218
+
oldRatio := float64(info.uncompressedSize) / float64(info.oldSize)
219
+
newRatio := float64(newUncompressedSize) / float64(newSize)
220
+
221
+
fmt.Fprintf(os.Stderr, "✓ %06d: %.3fx→%.3fx %+s\n",
222
+
info.bundleNumber, oldRatio, newRatio, formatBytes(sizeDiff))
166
223
}
167
224
}
168
225
···
172
229
progress.Finish()
173
230
elapsed := time.Since(start)
174
231
175
-
// Update index with new compressed hashes
232
+
// Update index
176
233
if len(hashChanges) > 0 {
177
-
fmt.Printf("\nUpdating bundle index...\n")
234
+
fmt.Printf("\nUpdating index...\n")
178
235
updateStart := time.Now()
179
236
180
237
updated := 0
181
238
for _, bundleNum := range hashChanges {
182
239
bundlePath := filepath.Join(dir, fmt.Sprintf("%06d.jsonl.zst", bundleNum))
183
-
184
-
// Recalculate hashes
185
-
compHash, compSize, contentHash, contentSize, err := ops.CalculateFileHashes(bundlePath)
240
+
compHash, compSize, _, contentSize, err := ops.CalculateFileHashes(bundlePath)
186
241
if err != nil {
187
-
fmt.Fprintf(os.Stderr, " ⚠️ Failed to hash bundle %06d: %v\n", bundleNum, err)
242
+
fmt.Fprintf(os.Stderr, " ⚠️ Failed to hash %06d: %v\n", bundleNum, err)
188
243
continue
189
244
}
190
245
191
-
// Get and update metadata
192
246
bundleMeta, err := index.GetBundle(bundleNum)
193
247
if err != nil {
194
248
continue
195
249
}
196
250
197
-
// Verify content hash unchanged
198
-
if bundleMeta.ContentHash != contentHash {
199
-
fmt.Fprintf(os.Stderr, " ⚠️ Content hash changed for %06d (unexpected!)\n", bundleNum)
200
-
}
201
-
202
-
// Update compressed info (this changed due to skippable frames)
203
251
bundleMeta.CompressedHash = compHash
204
252
bundleMeta.CompressedSize = compSize
205
253
bundleMeta.UncompressedSize = contentSize
···
208
256
updated++
209
257
}
210
258
211
-
// Save index
212
259
if err := mgr.SaveIndex(); err != nil {
213
260
fmt.Fprintf(os.Stderr, " ⚠️ Failed to save index: %v\n", err)
214
261
} else {
215
-
fmt.Printf(" ✓ Updated %d entries in %s\n", updated, time.Since(updateStart).Round(time.Millisecond))
262
+
fmt.Printf(" ✓ %d entries in %s\n", updated, time.Since(updateStart).Round(time.Millisecond))
216
263
}
217
264
}
218
265
219
-
// Summary
266
+
// COMPACT SUMMARY
220
267
fmt.Printf("\n")
221
268
if failed == 0 {
222
-
fmt.Printf("✓ Migration complete in %s\n", elapsed.Round(time.Millisecond))
223
-
fmt.Printf(" Migrated: %d bundles\n", success)
224
-
fmt.Printf(" Index updated: %d entries\n", len(hashChanges))
225
-
fmt.Printf(" Speed: %.1f bundles/sec\n\n", float64(success)/elapsed.Seconds())
269
+
fmt.Printf("✓ Complete: %d bundles in %s\n\n", success, elapsed.Round(time.Millisecond))
270
+
271
+
if totalOldSize > 0 && success > 0 {
272
+
sizeDiff := totalNewSize - totalOldSize
273
+
oldRatio := float64(totalOldUncompressed) / float64(totalOldSize)
274
+
newRatio := float64(totalNewUncompressed) / float64(totalNewSize)
275
+
ratioDiff := newRatio - oldRatio
276
+
277
+
// MEASURE ACTUAL METADATA SIZE (not estimated)
278
+
var totalActualMetadata int64
279
+
for _, bundleNum := range hashChanges {
280
+
bundlePath := filepath.Join(dir, fmt.Sprintf("%06d.jsonl.zst", bundleNum))
281
+
metaSize, _ := measureMetadataSize(bundlePath)
282
+
totalActualMetadata += metaSize
283
+
}
284
+
285
+
// FIXED ALIGNMENT
286
+
fmt.Printf(" Old New Change\n")
287
+
fmt.Printf(" ──────── ──────── ─────────\n")
288
+
fmt.Printf("Size %-13s %-13s %+s (%.1f%%)\n",
289
+
formatBytes(totalOldSize),
290
+
formatBytes(totalNewSize),
291
+
formatBytes(sizeDiff),
292
+
float64(sizeDiff)/float64(totalOldSize)*100)
293
+
fmt.Printf("Ratio %-13s %-13s %+s\n",
294
+
fmt.Sprintf("%.3fx", oldRatio), fmt.Sprintf("%.3fx", newRatio), fmt.Sprintf("%+.3fx", ratioDiff))
295
+
fmt.Printf("Avg/bundle %-13s %-13s %+s\n\n",
296
+
formatBytes(totalOldSize/int64(success)),
297
+
formatBytes(totalNewSize/int64(success)),
298
+
formatBytes(sizeDiff/int64(success)))
299
+
300
+
// FIXED BREAKDOWN - use actual metadata size
301
+
if totalActualMetadata > 0 {
302
+
compressionEfficiency := sizeDiff - totalActualMetadata
303
+
304
+
fmt.Printf("Breakdown:\n")
305
+
fmt.Printf(" Metadata: %+s (~%s/bundle, structural)\n",
306
+
formatBytes(totalActualMetadata),
307
+
formatBytes(totalActualMetadata/int64(success)))
308
+
309
+
// FIX: Use absolute threshold based on old size, not metadata size
310
+
threshold := totalOldSize / 1000 // 0.1% of old size
226
311
312
+
if abs(compressionEfficiency) > threshold {
313
+
if compressionEfficiency > 0 {
314
+
// Compression got worse
315
+
pctWorse := float64(compressionEfficiency) / float64(totalOldSize) * 100
316
+
fmt.Printf(" Compression: %+s (%.2f%% worse)\n",
317
+
formatBytes(compressionEfficiency), pctWorse)
318
+
} else if compressionEfficiency < 0 {
319
+
// Compression improved
320
+
pctBetter := float64(-compressionEfficiency) / float64(totalOldSize) * 100
321
+
fmt.Printf(" Compression: %s (%.2f%% better)\n",
322
+
formatBytes(compressionEfficiency), pctBetter)
323
+
}
324
+
} else {
325
+
// Truly negligible
326
+
fmt.Printf(" Compression: unchanged\n")
327
+
}
328
+
}
329
+
330
+
fmt.Printf("\n")
331
+
}
227
332
} else {
228
-
fmt.Printf("⚠️ Migration completed with errors\n")
229
-
fmt.Printf(" Success: %d bundles\n", success)
230
-
fmt.Printf(" Failed: %d bundles\n", failed)
231
-
fmt.Printf(" Duration: %s\n", elapsed.Round(time.Millisecond))
333
+
fmt.Printf("⚠️ Failed: %d bundles\n", failed)
232
334
if firstError != nil {
233
-
fmt.Printf(" First error: %v\n", firstError)
335
+
fmt.Printf(" Error: %v\n", firstError)
234
336
}
235
337
return fmt.Errorf("migration failed for %d bundles", failed)
236
338
}
···
238
340
return nil
239
341
}
240
342
241
-
func migrateBundle(dir string, bundleNum int, index *bundleindex.Index, version string, verbose bool) error {
343
+
func migrateBundle(dir string, bundleNum int, index *bundleindex.Index, version string, verbose bool) (sizeDiff int64, newUncompressedSize int64, err error) {
242
344
bundlePath := filepath.Join(dir, fmt.Sprintf("%06d.jsonl.zst", bundleNum))
243
345
backupPath := bundlePath + ".bak"
244
346
245
-
// 1. Get metadata from index
246
347
meta, err := index.GetBundle(bundleNum)
247
348
if err != nil {
248
-
return fmt.Errorf("bundle not in index: %w", err)
349
+
return 0, 0, fmt.Errorf("bundle not in index: %w", err)
249
350
}
250
351
251
-
// 2. Load the bundle using old format
352
+
oldSize := meta.CompressedSize
353
+
252
354
ops := &storage.Operations{}
253
355
operations, err := ops.LoadBundle(bundlePath)
254
356
if err != nil {
255
-
return fmt.Errorf("failed to load: %w", err)
256
-
}
257
-
258
-
if verbose {
259
-
fmt.Fprintf(os.Stderr, " Loaded %d operations\n", len(operations))
357
+
return 0, 0, fmt.Errorf("failed to load: %w", err)
260
358
}
261
359
262
-
// 3. Backup original file
263
360
if err := os.Rename(bundlePath, backupPath); err != nil {
264
-
return fmt.Errorf("failed to backup: %w", err)
361
+
return 0, 0, fmt.Errorf("failed to backup: %w", err)
265
362
}
266
363
267
-
// 4. Get hostname (optional)
268
364
hostname, _ := os.Hostname()
269
365
270
-
// 5. Create BundleInfo for new format
271
366
bundleInfo := &storage.BundleInfo{
272
367
BundleNumber: meta.BundleNumber,
273
-
Origin: index.Origin, // From index
368
+
Origin: index.Origin,
274
369
ParentHash: meta.Parent,
275
370
Cursor: meta.Cursor,
276
371
CreatedBy: fmt.Sprintf("plcbundle/%s", version),
277
372
Hostname: hostname,
278
373
}
279
374
280
-
// 6. Save using new format (with skippable frame metadata)
281
-
contentHash, compHash, contentSize, compSize, err := ops.SaveBundle(bundlePath, operations, bundleInfo)
375
+
contentHash, _, contentSize, compSize, err := ops.SaveBundle(bundlePath, operations, bundleInfo)
282
376
if err != nil {
283
-
// Restore backup on failure
284
377
os.Rename(backupPath, bundlePath)
285
-
return fmt.Errorf("failed to save: %w", err)
378
+
return 0, 0, fmt.Errorf("failed to save: %w", err)
286
379
}
287
380
288
-
// 7. Verify embedded metadata was created
289
381
embeddedMeta, err := ops.ExtractBundleMetadata(bundlePath)
290
382
if err != nil {
291
383
os.Remove(bundlePath)
292
384
os.Rename(backupPath, bundlePath)
293
-
return fmt.Errorf("embedded metadata not created: %w", err)
385
+
return 0, 0, fmt.Errorf("embedded metadata not created: %w", err)
294
386
}
295
387
296
-
// 8. Verify frame offsets are present
297
388
if len(embeddedMeta.FrameOffsets) == 0 {
298
389
os.Remove(bundlePath)
299
390
os.Rename(backupPath, bundlePath)
300
-
return fmt.Errorf("frame offsets missing in metadata")
391
+
return 0, 0, fmt.Errorf("frame offsets missing in metadata")
301
392
}
302
393
303
-
// 9. Verify content hash matches (should be unchanged)
304
394
if contentHash != meta.ContentHash {
305
-
fmt.Fprintf(os.Stderr, " ⚠️ Content hash changed (unexpected): %s → %s\n",
395
+
fmt.Fprintf(os.Stderr, " ⚠️ Content hash changed: %s → %s\n",
306
396
meta.ContentHash[:12], contentHash[:12])
307
397
}
308
398
309
-
// 10. Cleanup backup
310
399
os.Remove(backupPath)
311
400
401
+
// Calculate changes
402
+
newSize := compSize
403
+
sizeDiff = newSize - oldSize
404
+
newUncompressedSize = contentSize
405
+
312
406
if verbose {
313
-
fmt.Fprintf(os.Stderr, " Content hash: %s (%s)\n", contentHash[:12], formatBytes(contentSize))
314
-
fmt.Fprintf(os.Stderr, " New compressed hash: %s (%s)\n", compHash[:12], formatBytes(compSize))
315
-
fmt.Fprintf(os.Stderr, " Frames: %d (embedded in metadata)\n", len(embeddedMeta.FrameOffsets)-1)
407
+
oldRatio := float64(meta.UncompressedSize) / float64(oldSize)
408
+
newRatio := float64(contentSize) / float64(newSize)
409
+
410
+
fmt.Fprintf(os.Stderr, " Frames: %d, Ratio: %.3fx→%.3fx, Size: %+s\n",
411
+
len(embeddedMeta.FrameOffsets)-1, oldRatio, newRatio, formatBytes(sizeDiff))
412
+
}
413
+
414
+
return sizeDiff, newUncompressedSize, nil
415
+
}
416
+
417
+
func measureMetadataSize(bundlePath string) (int64, error) {
418
+
file, err := os.Open(bundlePath)
419
+
if err != nil {
420
+
return 0, err
421
+
}
422
+
defer file.Close()
423
+
424
+
// Read magic (4 bytes) + size (4 bytes)
425
+
header := make([]byte, 8)
426
+
if _, err := file.Read(header); err != nil {
427
+
return 0, err
316
428
}
317
429
318
-
return nil
430
+
// Check if it's a skippable frame
431
+
magic := binary.LittleEndian.Uint32(header[0:4])
432
+
if magic < 0x184D2A50 || magic > 0x184D2A5F {
433
+
return 0, nil // No metadata frame
434
+
}
435
+
436
+
// Get frame data size
437
+
frameSize := binary.LittleEndian.Uint32(header[4:8])
438
+
439
+
// Total metadata size = 4 (magic) + 4 (size) + frameSize (data)
440
+
return int64(8 + frameSize), nil
441
+
}
442
+
443
+
func abs(n int64) int64 {
444
+
if n < 0 {
445
+
return -n
446
+
}
447
+
return n
319
448
}
+1
-1
cmd/plcbundle/commands/op.go
+1
-1
cmd/plcbundle/commands/op.go
···
291
291
func findOperationByCID(mgr BundleManager, cid string) error {
292
292
ctx := context.Background()
293
293
294
-
// ✨ CHECK MEMPOOL FIRST (most recent data)
294
+
// CHECK MEMPOOL FIRST (most recent data)
295
295
fmt.Fprintf(os.Stderr, "Checking mempool...\n")
296
296
mempoolOps, err := mgr.GetMempoolOperations()
297
297
if err == nil && len(mempoolOps) > 0 {
+1
-1
cmd/plcbundle/commands/server.go
+1
-1
cmd/plcbundle/commands/server.go
···
76
76
RunE: func(cmd *cobra.Command, args []string) error {
77
77
verbose, _ := cmd.Root().PersistentFlags().GetBool("verbose")
78
78
79
-
// ✨ Server in sync mode can create repo, read-only mode cannot
79
+
// Server in sync mode can create repo, read-only mode cannot
80
80
mgr, dir, err := getManager(&ManagerOptions{
81
81
Cmd: cmd,
82
82
PLCURL: plcURL,
+1
-1
cmd/plcbundle/commands/sync.go
+1
-1
cmd/plcbundle/commands/sync.go
···
58
58
verbose, _ := cmd.Root().PersistentFlags().GetBool("verbose")
59
59
quiet, _ := cmd.Root().PersistentFlags().GetBool("quiet")
60
60
61
-
// ✨ Sync creates repository if missing
61
+
// Sync creates repository if missing
62
62
mgr, dir, err := getManager(&ManagerOptions{
63
63
Cmd: cmd,
64
64
PLCURL: plcURL,
+4
-4
cmd/plcbundle/ui/progress.go
+4
-4
cmd/plcbundle/ui/progress.go
···
106
106
eta = time.Duration(float64(remaining)/speed) * time.Second
107
107
}
108
108
109
-
// ✨ FIX: Check if complete
109
+
// FIX: Check if complete
110
110
isComplete := pb.current >= pb.total
111
111
112
112
if pb.showBytes && pb.currentBytes > 0 {
···
114
114
mbPerSec := mbProcessed / elapsed.Seconds()
115
115
116
116
if isComplete {
117
-
// ✨ Don't show ETA when done
117
+
// Don't show ETA when done
118
118
fmt.Fprintf(os.Stderr, "\r [%s] %6.2f%% | %d/%d | %.1f/s | %.1f MB/s | Done ",
119
119
bar, percent, pb.current, pb.total, speed, mbPerSec)
120
120
} else {
···
123
123
}
124
124
} else {
125
125
if isComplete {
126
-
// ✨ Don't show ETA when done
126
+
// Don't show ETA when done
127
127
fmt.Fprintf(os.Stderr, "\r [%s] %6.2f%% | %d/%d | %.1f/s | Done ",
128
128
bar, percent, pb.current, pb.total, speed)
129
129
} else {
···
134
134
}
135
135
136
136
func formatETA(d time.Duration) string {
137
-
// ✨ This should never be called with 0 now, but keep as fallback
137
+
// This should never be called with 0 now, but keep as fallback
138
138
if d == 0 {
139
139
return "0s"
140
140
}
+1
-1
detector/script.go
+1
-1
detector/script.go
+1
-1
internal/mempool/mempool.go
+1
-1
internal/mempool/mempool.go
···
199
199
// Remove taken operations
200
200
m.operations = m.operations[n:]
201
201
202
-
// ✨ FIX: ALWAYS reset tracking after Take
202
+
// ALWAYS reset tracking after Take
203
203
// Take() means we're consuming these ops for a bundle
204
204
// Any remaining ops are "new" and unsaved
205
205
m.lastSavedLen = 0
+2
-2
internal/storage/zstd.go
+2
-2
internal/storage/zstd.go
···
17
17
// ============================================================================
18
18
19
19
const (
20
-
CompressionLevel = 2
20
+
CompressionLevel = 1
21
21
FrameSize = 100
22
22
23
23
SkippableMagicMetadata = 0x184D2A50
···
211
211
// ============================================================================
212
212
213
213
func CompressFrame(data []byte) ([]byte, error) {
214
-
compressed := gozstd.Compress(nil, data)
214
+
compressed := gozstd.CompressLevel(nil, data, CompressionLevel)
215
215
return compressed, nil
216
216
}
217
217