A Transparent and Verifiable Way to Sync the AT Protocol's PLC Directory

zstd frames update

Changed files
+559 -253
+1 -1
bundle/bundle_test.go
···
    path := filepath.Join(tmpDir, "test_bundle.jsonl.zst")

    // Save
-   uncompHash, compHash, uncompSize, compSize, err := ops.SaveBundle(path, operations)
    if err != nil {
        t.Fatalf("SaveBundle failed: %v", err)
    }
···
    path := filepath.Join(tmpDir, "test_bundle.jsonl.zst")

    // Save
+   uncompHash, compHash, uncompSize, compSize, err := ops.SaveBundle(path, operations, nil)
    if err != nil {
        t.Fatalf("SaveBundle failed: %v", err)
    }
+51 -20
bundle/manager.go
··· 8 "os" 9 "path/filepath" 10 "runtime" 11 "sort" 12 "strings" 13 "sync" ··· 434 } 435 436 // SaveBundle saves a bundle to disk and updates the index 437 - // Returns the DID index update duration 438 func (m *Manager) SaveBundle(ctx context.Context, bundle *Bundle, quiet bool) (time.Duration, error) { 439 if err := bundle.ValidateForSave(); err != nil { 440 return 0, fmt.Errorf("bundle validation failed: %w", err) ··· 442 443 path := filepath.Join(m.config.BundleDir, fmt.Sprintf("%06d.jsonl.zst", bundle.BundleNumber)) 444 445 - // Save to disk 446 - uncompressedHash, compressedHash, uncompressedSize, compressedSize, err := m.operations.SaveBundle(path, bundle.Operations) 447 - if err != nil { 448 - return 0, fmt.Errorf("failed to save bundle: %w", err) 449 - } 450 - 451 - bundle.ContentHash = uncompressedHash 452 - bundle.CompressedHash = compressedHash 453 - bundle.UncompressedSize = uncompressedSize 454 - bundle.CompressedSize = compressedSize 455 - bundle.CreatedAt = time.Now().UTC() 456 - 457 // Get parent 458 var parent string 459 if bundle.BundleNumber > 1 { ··· 466 } 467 } 468 } 469 470 - bundle.Parent = parent 471 bundle.Hash = m.operations.CalculateChainHash(parent, bundle.ContentHash) 472 473 // Add to index 474 m.index.AddBundle(bundle.ToMetadata()) 475 476 // Save index 477 if err := m.SaveIndex(); err != nil { 478 return 0, fmt.Errorf("failed to save index: %w", err) 479 } 480 481 // Clean up old mempool 482 oldMempoolFile := m.mempool.GetFilename() 483 if err := m.mempool.Delete(); err != nil && !quiet { ··· 493 return 0, fmt.Errorf("failed to create new mempool: %w", err) 494 } 495 496 - oldMempool := m.mempool 497 m.mempool = newMempool 498 499 - oldMempool.Clear() 500 - 501 - // ✨ Update DID index if enabled and track timing 502 var indexUpdateDuration time.Duration 503 if m.didIndex != nil && m.didIndex.Exists() { 504 indexUpdateStart := time.Now() 505 - 506 if err := m.updateDIDIndexForBundle(ctx, bundle); err != nil { 507 m.logger.Printf("Warning: failed to update DID index: %v", err) 508 } else { 509 indexUpdateDuration = time.Since(indexUpdateStart) 510 - 511 if !quiet { 512 m.logger.Printf(" [DID Index] Updated in %s", indexUpdateDuration) 513 }
··· 8 "os" 9 "path/filepath" 10 "runtime" 11 + "runtime/debug" 12 "sort" 13 "strings" 14 "sync" ··· 435 } 436 437 // SaveBundle saves a bundle to disk and updates the index 438 func (m *Manager) SaveBundle(ctx context.Context, bundle *Bundle, quiet bool) (time.Duration, error) { 439 if err := bundle.ValidateForSave(); err != nil { 440 return 0, fmt.Errorf("bundle validation failed: %w", err) ··· 442 443 path := filepath.Join(m.config.BundleDir, fmt.Sprintf("%06d.jsonl.zst", bundle.BundleNumber)) 444 445 // Get parent 446 var parent string 447 if bundle.BundleNumber > 1 { ··· 454 } 455 } 456 } 457 + bundle.Parent = parent 458 459 + // Get origin 460 + origin := m.index.Origin 461 + if m.plcClient != nil { 462 + origin = m.plcClient.GetBaseURL() 463 + } 464 + 465 + // Get version 466 + version := "dev" 467 + if info, ok := debug.ReadBuildInfo(); ok && info.Main.Version != "" && info.Main.Version != "(devel)" { 468 + version = info.Main.Version 469 + } 470 + 471 + // Get hostname 472 + hostname, _ := os.Hostname() 473 + 474 + // ✅ Create BundleInfo 475 + bundleInfo := &storage.BundleInfo{ 476 + BundleNumber: bundle.BundleNumber, 477 + Origin: origin, 478 + ParentHash: parent, 479 + Cursor: bundle.Cursor, 480 + CreatedBy: fmt.Sprintf("plcbundle/%s", version), 481 + Hostname: hostname, 482 + } 483 + 484 + m.logger.Printf("DEBUG: Calling operations.SaveBundle with bundle=%d", bundleInfo.BundleNumber) 485 + 486 + // ✅ Save to disk with 3 parameters 487 + uncompressedHash, compressedHash, uncompressedSize, compressedSize, err := m.operations.SaveBundle(path, bundle.Operations, bundleInfo) 488 + if err != nil { 489 + m.logger.Printf("DEBUG: SaveBundle FAILED: %v", err) 490 + return 0, fmt.Errorf("failed to save bundle: %w", err) 491 + } 492 + 493 + m.logger.Printf("DEBUG: SaveBundle SUCCESS, setting bundle fields") 494 + 495 + bundle.ContentHash = uncompressedHash 496 + bundle.CompressedHash = compressedHash 497 + bundle.UncompressedSize = uncompressedSize 498 + bundle.CompressedSize = compressedSize 499 + bundle.CreatedAt = time.Now().UTC() 500 bundle.Hash = m.operations.CalculateChainHash(parent, bundle.ContentHash) 501 + 502 + m.logger.Printf("DEBUG: Adding bundle %d to index", bundle.BundleNumber) 503 504 // Add to index 505 m.index.AddBundle(bundle.ToMetadata()) 506 507 + m.logger.Printf("DEBUG: Index now has %d bundles", m.index.Count()) 508 + 509 // Save index 510 if err := m.SaveIndex(); err != nil { 511 + m.logger.Printf("DEBUG: SaveIndex FAILED: %v", err) 512 return 0, fmt.Errorf("failed to save index: %w", err) 513 } 514 515 + m.logger.Printf("DEBUG: Index saved, last bundle = %d", m.index.GetLastBundle().BundleNumber) 516 + 517 // Clean up old mempool 518 oldMempoolFile := m.mempool.GetFilename() 519 if err := m.mempool.Delete(); err != nil && !quiet { ··· 529 return 0, fmt.Errorf("failed to create new mempool: %w", err) 530 } 531 532 m.mempool = newMempool 533 534 + // DID index update (if enabled) 535 var indexUpdateDuration time.Duration 536 if m.didIndex != nil && m.didIndex.Exists() { 537 indexUpdateStart := time.Now() 538 if err := m.updateDIDIndexForBundle(ctx, bundle); err != nil { 539 m.logger.Printf("Warning: failed to update DID index: %v", err) 540 } else { 541 indexUpdateDuration = time.Since(indexUpdateStart) 542 if !quiet { 543 m.logger.Printf(" [DID Index] Updated in %s", indexUpdateDuration) 544 }
+73 -45
cmd/plcbundle/commands/inspect.go
···
    result.FileSize = info.Size()

    // Check for frame index
-   indexPath := bundlePath + ".idx"
-   if _, err := os.Stat(indexPath); err == nil {
-       result.HasFrameIndex = true
    }

    fmt.Fprintf(os.Stderr, "Inspecting: %s\n", filepath.Base(bundlePath))
···
// DISPLAY FUNCTIONS
// ============================================================================

- func displayInspectHuman(result *inspectResult, analysis *bundleAnalysis, opts inspectOptions) error {
    fmt.Printf("\n")
    fmt.Printf("═══════════════════════════════════════════════════════════════\n")
    fmt.Printf(" Bundle Deep Inspection\n")
···
    meta := result.Metadata
    fmt.Printf("📋 Embedded Metadata (Skippable Frame)\n")
    fmt.Printf("──────────────────────────────────────\n")
    fmt.Printf(" Bundle Number: %06d\n", meta.BundleNumber)
    if meta.Origin != "" {
        fmt.Printf(" Origin: %s\n", meta.Origin)
    }
-   fmt.Printf(" Operations: %s\n", formatNumber(meta.OperationCount))
-   fmt.Printf(" DIDs: %s unique\n", formatNumber(meta.DIDCount))
-   fmt.Printf(" Frames: %d\n", meta.FrameCount)
-   fmt.Printf(" Uncompressed: %s\n", formatBytes(meta.UncompressedSize))
-   fmt.Printf(" Compressed: %s (%.2fx)\n",
-       formatBytes(meta.CompressedSize),
-       float64(meta.UncompressedSize)/float64(meta.CompressedSize))
-   fmt.Printf(" Timespan: %s → %s\n",
        meta.StartTime.Format("2006-01-02 15:04:05"),
        meta.EndTime.Format("2006-01-02 15:04:05"))
-   fmt.Printf(" Duration: %s\n",
        formatDuration(meta.EndTime.Sub(meta.StartTime)))

-   if meta.ContentHash != "" {
-       fmt.Printf("\n Hashes:\n")
-       fmt.Printf(" Content: %s\n", meta.ContentHash[:16]+"...")
-       fmt.Printf(" Compressed: %s\n", meta.CompressedHash[:16]+"...")
-       if meta.ParentHash != "" {
-           fmt.Printf(" Parent: %s\n", meta.ParentHash[:16]+"...")
-       }
    }
    fmt.Printf("\n")
}
···
func verifyCrypto(cmd *cobra.Command, path string, meta *storage.BundleMetadata, bundleNum int, verbose bool) (contentValid, compressedValid, metadataValid bool) {
    ops := &storage.Operations{}

-   // Calculate actual hashes
    compHash, compSize, contentHash, contentSize, err := ops.CalculateFileHashes(path)
    if err != nil {
        if verbose {
···
    compressedValid = true
    metadataValid = true

-   // Verify against embedded metadata if available
    if meta != nil {
        if meta.ContentHash != "" && meta.ContentHash != contentHash {
            contentValid = false
            if verbose {
···
        }
    }

-   if meta.CompressedHash != "" && meta.CompressedHash != compHash {
-       compressedValid = false
-       if verbose {
-           fmt.Fprintf(os.Stderr, " ✗ Compressed hash mismatch!\n")
-       }
    }

-   if meta.UncompressedSize != contentSize {
-       metadataValid = false
-       if verbose {
-           fmt.Fprintf(os.Stderr, " ✗ Uncompressed size mismatch: meta=%d, actual=%d\n",
-               meta.UncompressedSize, contentSize)
-       }
-   }

-   if meta.CompressedSize != compSize {
-       metadataValid = false
-       if verbose {
-           fmt.Fprintf(os.Stderr, " ✗ Compressed size mismatch: meta=%d, actual=%d\n",
-               meta.CompressedSize, compSize)
-       }
    }
}

-   // Also verify against repository index if bundle number is known
    if bundleNum > 0 {
-       mgr, _, err := getManager(nil)
        if err == nil {
            defer mgr.Close()

            ctx := context.Background()
            vr, err := mgr.VerifyBundle(ctx, bundleNum)
-           if err == nil {
-               contentValid = contentValid && vr.Valid
-               compressedValid = compressedValid && vr.HashMatch
            }
        }
    }

    return contentValid, compressedValid, metadataValid
···
    result.FileSize = info.Size()

    // Check for frame index
+   ops := &storage.Operations{}
+   if _, err := ops.ExtractBundleMetadata(bundlePath); err == nil {
+       result.HasFrameIndex = true // Has embedded index
+   } else {
+       // Check for external .idx file (legacy)
+       indexPath := bundlePath + ".idx"
+       if _, err := os.Stat(indexPath); err == nil {
+           result.HasFrameIndex = true
+       }
    }

    fmt.Fprintf(os.Stderr, "Inspecting: %s\n", filepath.Base(bundlePath))
···
// DISPLAY FUNCTIONS
// ============================================================================

+ func displayInspectHuman(result *inspectResult, _ *bundleAnalysis, opts inspectOptions) error {
    fmt.Printf("\n")
    fmt.Printf("═══════════════════════════════════════════════════════════════\n")
    fmt.Printf(" Bundle Deep Inspection\n")
···
    meta := result.Metadata
    fmt.Printf("📋 Embedded Metadata (Skippable Frame)\n")
    fmt.Printf("──────────────────────────────────────\n")
+   fmt.Printf(" Format: %s (v%d)\n", meta.Format, meta.Version)
+   if meta.SpecURL != "" {
+       fmt.Printf(" Specification: %s\n", meta.SpecURL)
+   }
    fmt.Printf(" Bundle Number: %06d\n", meta.BundleNumber)
    if meta.Origin != "" {
        fmt.Printf(" Origin: %s\n", meta.Origin)
    }
+   if meta.CreatedBy != "" {
+       fmt.Printf(" Created by: %s\n", meta.CreatedBy)
+   }
+   if meta.CreatedByHost != "" {
+       fmt.Printf(" Created on: %s\n", meta.CreatedByHost)
+   }
+   fmt.Printf(" Created at: %s\n", meta.CreatedAt.Format("2006-01-02 15:04:05 MST"))
+
+   fmt.Printf("\n Content:\n")
+   fmt.Printf(" Operations: %s\n", formatNumber(meta.OperationCount))
+   fmt.Printf(" Unique DIDs: %s\n", formatNumber(meta.DIDCount))
+   fmt.Printf(" Frames: %d × %d ops\n", meta.FrameCount, meta.FrameSize)
+   fmt.Printf(" Timespan: %s → %s\n",
        meta.StartTime.Format("2006-01-02 15:04:05"),
        meta.EndTime.Format("2006-01-02 15:04:05"))
+   fmt.Printf(" Duration: %s\n",
        formatDuration(meta.EndTime.Sub(meta.StartTime)))

+   fmt.Printf("\n Integrity:\n")
+   fmt.Printf(" Content hash: %s\n", meta.ContentHash)
+   if meta.ParentHash != "" {
+       fmt.Printf(" Parent hash: %s\n", meta.ParentHash)
+   }
+
+   if len(meta.FrameOffsets) > 0 {
+       fmt.Printf("\n Frame Index: %d offsets (embedded)\n", len(meta.FrameOffsets))
+       firstDataOffset := meta.FrameOffsets[0]
+       fmt.Printf(" Metadata size: %s\n", formatBytes(firstDataOffset))
+       fmt.Printf(" First data frame: offset %d\n", firstDataOffset)
    }
    fmt.Printf("\n")
}
···
func verifyCrypto(cmd *cobra.Command, path string, meta *storage.BundleMetadata, bundleNum int, verbose bool) (contentValid, compressedValid, metadataValid bool) {
    ops := &storage.Operations{}

+   // Calculate actual hashes from file
    compHash, compSize, contentHash, contentSize, err := ops.CalculateFileHashes(path)
    if err != nil {
        if verbose {
···
    compressedValid = true
    metadataValid = true

+   // ✅ Verify against embedded metadata if available
    if meta != nil {
+       // Check content hash (this is in the metadata)
        if meta.ContentHash != "" && meta.ContentHash != contentHash {
            contentValid = false
            if verbose {
···
            }
        }

+       if meta.OperationCount > 0 {
+           // We can't verify this without loading, so skip
+           metadataValid = true
        }

+       // ✅ Note: We don't check compressed hash/size because they're not in metadata
+       // (The file IS the compressed data, so it's redundant)

+       if verbose {
+           fmt.Fprintf(os.Stderr, " Embedded metadata:\n")
+           fmt.Fprintf(os.Stderr, " Content hash: %s\n", meta.ContentHash[:16]+"...")
+           fmt.Fprintf(os.Stderr, " Operations: %d\n", meta.OperationCount)
+           fmt.Fprintf(os.Stderr, " DIDs: %d\n", meta.DIDCount)
        }
    }

+   // ✅ Also verify against repository index if bundle number is known
    if bundleNum > 0 {
+       mgr, _, err := getManager(&ManagerOptions{Cmd: cmd})
        if err == nil {
            defer mgr.Close()

            ctx := context.Background()
            vr, err := mgr.VerifyBundle(ctx, bundleNum)
+           if err == nil && vr != nil {
+               // Index verification
+               indexContentValid := vr.Valid
+               indexHashMatch := vr.HashMatch
+
+               if verbose {
+                   fmt.Fprintf(os.Stderr, " Repository index:\n")
+                   fmt.Fprintf(os.Stderr, " Content valid: %v\n", indexContentValid)
+                   fmt.Fprintf(os.Stderr, " Hash match: %v\n", indexHashMatch)
+               }
+
+               contentValid = contentValid && indexContentValid
+               compressedValid = compressedValid && indexHashMatch
            }
        }
+   }
+
+   if verbose {
+       fmt.Fprintf(os.Stderr, " Calculated hashes:\n")
+       fmt.Fprintf(os.Stderr, " Content: %s (%s)\n", contentHash[:16]+"...", formatBytes(contentSize))
+       fmt.Fprintf(os.Stderr, " Compressed: %s (%s)\n", compHash[:16]+"...", formatBytes(compSize))
    }

    return contentValid, compressedValid, metadataValid
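The verification above goes through CalculateFileHashes, but the same check can be reproduced with nothing beyond a zstd decoder and crypto/sha256, since the metadata declares ContentHash as the SHA-256 of the uncompressed JSONL. A hedged sketch of that independent check (the path is hypothetical, and it assumes gozstd's streaming reader skips the leading skippable metadata frame, as the zstd format permits):

package main

import (
    "crypto/sha256"
    "encoding/hex"
    "fmt"
    "io"
    "log"
    "os"

    "github.com/valyala/gozstd"
)

func main() {
    f, err := os.Open("000042.jsonl.zst") // hypothetical bundle path
    if err != nil {
        log.Fatal(err)
    }
    defer f.Close()

    // Streaming decode: the metadata frame is skipped, data frames are
    // concatenated back into the original JSONL.
    zr := gozstd.NewReader(f)
    defer zr.Release()

    h := sha256.New()
    if _, err := io.Copy(h, zr); err != nil {
        log.Fatal(err)
    }
    fmt.Println("content hash:", hex.EncodeToString(h.Sum(nil)))
}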
+116 -35
cmd/plcbundle/commands/migrate.go
··· 8 9 "github.com/spf13/cobra" 10 "tangled.org/atscan.net/plcbundle/cmd/plcbundle/ui" 11 "tangled.org/atscan.net/plcbundle/internal/storage" 12 ) 13 ··· 88 func runMigration(mgr BundleManager, dir string, opts migrationOptions) error { 89 fmt.Printf("Scanning for legacy bundles in: %s\n\n", dir) 90 91 - // Find bundles needing migration 92 index := mgr.GetIndex() 93 bundles := index.GetBundles() 94 ··· 97 return nil 98 } 99 100 var needsMigration []int 101 var totalSize int64 102 103 for _, meta := range bundles { 104 bundlePath := filepath.Join(dir, fmt.Sprintf("%06d.jsonl.zst", meta.BundleNumber)) 105 - idxPath := bundlePath + ".idx" 106 107 - // Check if .idx file exists 108 - if _, err := os.Stat(idxPath); os.IsNotExist(err) || opts.force { 109 needsMigration = append(needsMigration, meta.BundleNumber) 110 totalSize += meta.CompressedSize 111 } ··· 117 return nil 118 } 119 120 - // Display migration plan 121 fmt.Printf("Migration Plan\n") 122 fmt.Printf("══════════════\n\n") 123 fmt.Printf(" Bundles to migrate: %d\n", len(needsMigration)) 124 fmt.Printf(" Total size: %s\n", formatBytes(totalSize)) 125 fmt.Printf(" Workers: %d\n", opts.workers) 126 fmt.Printf("\n") 127 128 - if len(needsMigration) <= 20 { 129 - fmt.Printf(" Bundles: ") 130 - for i, num := range needsMigration { 131 - if i > 0 { 132 - fmt.Printf(", ") 133 - } 134 - fmt.Printf("%06d", num) 135 - } 136 - fmt.Printf("\n\n") 137 - } else { 138 - fmt.Printf(" Range: %06d - %06d\n\n", needsMigration[0], needsMigration[len(needsMigration)-1]) 139 - } 140 - 141 if opts.dryRun { 142 fmt.Printf("💡 This is a dry-run. No files will be modified.\n") 143 - fmt.Printf(" Run without --dry-run to perform migration.\n") 144 return nil 145 } 146 ··· 153 success := 0 154 failed := 0 155 var firstError error 156 157 for i, bundleNum := range needsMigration { 158 - if err := migrateBundle(dir, bundleNum, opts.verbose); err != nil { 159 failed++ 160 if firstError == nil { 161 firstError = err ··· 165 } 166 } else { 167 success++ 168 if opts.verbose { 169 fmt.Fprintf(os.Stderr, "✓ Migrated bundle %06d\n", bundleNum) 170 } ··· 176 progress.Finish() 177 elapsed := time.Since(start) 178 179 // Summary 180 fmt.Printf("\n") 181 if failed == 0 { 182 fmt.Printf("✓ Migration complete in %s\n", elapsed.Round(time.Millisecond)) 183 - fmt.Printf(" Migrated: %d bundles\n", success) 184 - fmt.Printf(" Speed: %.1f bundles/sec\n", float64(success)/elapsed.Seconds()) 185 } else { 186 fmt.Printf("⚠️ Migration completed with errors\n") 187 fmt.Printf(" Success: %d bundles\n", success) ··· 196 return nil 197 } 198 199 - func migrateBundle(dir string, bundleNum int, verbose bool) error { 200 bundlePath := filepath.Join(dir, fmt.Sprintf("%06d.jsonl.zst", bundleNum)) 201 - idxPath := bundlePath + ".idx" 202 backupPath := bundlePath + ".bak" 203 204 - // 1. Load the bundle using legacy method (full decompression) 205 ops := &storage.Operations{} 206 operations, err := ops.LoadBundle(bundlePath) 207 if err != nil { ··· 212 fmt.Fprintf(os.Stderr, " Loaded %d operations\n", len(operations)) 213 } 214 215 - // 2. Backup original file 216 if err := os.Rename(bundlePath, backupPath); err != nil { 217 return fmt.Errorf("failed to backup: %w", err) 218 } 219 220 - // 3. Save using new multi-frame format 221 - contentHash, compHash, contentSize, compSize, err := ops.SaveBundle(bundlePath, operations) 222 if err != nil { 223 // Restore backup on failure 224 os.Rename(backupPath, bundlePath) 225 return fmt.Errorf("failed to save: %w", err) 226 } 227 228 - // 4. 
Verify .idx file was created 229 - if _, err := os.Stat(idxPath); os.IsNotExist(err) { 230 - // Restore backup if .idx wasn't created 231 os.Remove(bundlePath) 232 os.Rename(backupPath, bundlePath) 233 - return fmt.Errorf("frame index not created") 234 } 235 236 - // 5. Cleanup backup 237 os.Remove(backupPath) 238 239 if verbose { 240 - fmt.Fprintf(os.Stderr, " Content: %s (%s)\n", contentHash[:12], formatBytes(contentSize)) 241 - fmt.Fprintf(os.Stderr, " Compressed: %s (%s)\n", compHash[:12], formatBytes(compSize)) 242 } 243 244 return nil
··· 8 9 "github.com/spf13/cobra" 10 "tangled.org/atscan.net/plcbundle/cmd/plcbundle/ui" 11 + "tangled.org/atscan.net/plcbundle/internal/bundleindex" 12 "tangled.org/atscan.net/plcbundle/internal/storage" 13 ) 14 ··· 89 func runMigration(mgr BundleManager, dir string, opts migrationOptions) error { 90 fmt.Printf("Scanning for legacy bundles in: %s\n\n", dir) 91 92 index := mgr.GetIndex() 93 bundles := index.GetBundles() 94 ··· 97 return nil 98 } 99 100 + // Get plcbundle version 101 + version := GetVersion() 102 + 103 var needsMigration []int 104 var totalSize int64 105 106 + ops := &storage.Operations{} 107 for _, meta := range bundles { 108 bundlePath := filepath.Join(dir, fmt.Sprintf("%06d.jsonl.zst", meta.BundleNumber)) 109 110 + // Check if already has embedded metadata 111 + _, err := ops.ExtractBundleMetadata(bundlePath) 112 + 113 + if err != nil || opts.force { 114 needsMigration = append(needsMigration, meta.BundleNumber) 115 totalSize += meta.CompressedSize 116 } ··· 122 return nil 123 } 124 125 + // Display plan 126 fmt.Printf("Migration Plan\n") 127 fmt.Printf("══════════════\n\n") 128 fmt.Printf(" Bundles to migrate: %d\n", len(needsMigration)) 129 fmt.Printf(" Total size: %s\n", formatBytes(totalSize)) 130 fmt.Printf(" Workers: %d\n", opts.workers) 131 + fmt.Printf(" plcbundle version: %s\n", version) 132 fmt.Printf("\n") 133 134 if opts.dryRun { 135 fmt.Printf("💡 This is a dry-run. No files will be modified.\n") 136 return nil 137 } 138 ··· 145 success := 0 146 failed := 0 147 var firstError error 148 + hashChanges := make([]int, 0, len(needsMigration)) 149 150 for i, bundleNum := range needsMigration { 151 + // ✅ Pass version to migrateBundle 152 + if err := migrateBundle(dir, bundleNum, index, version, opts.verbose); err != nil { 153 failed++ 154 if firstError == nil { 155 firstError = err ··· 159 } 160 } else { 161 success++ 162 + hashChanges = append(hashChanges, bundleNum) 163 + 164 if opts.verbose { 165 fmt.Fprintf(os.Stderr, "✓ Migrated bundle %06d\n", bundleNum) 166 } ··· 172 progress.Finish() 173 elapsed := time.Since(start) 174 175 + // ✅ Update index with new compressed hashes 176 + if len(hashChanges) > 0 { 177 + fmt.Printf("\nUpdating bundle index...\n") 178 + updateStart := time.Now() 179 + 180 + updated := 0 181 + for _, bundleNum := range hashChanges { 182 + bundlePath := filepath.Join(dir, fmt.Sprintf("%06d.jsonl.zst", bundleNum)) 183 + 184 + // Recalculate hashes 185 + compHash, compSize, contentHash, contentSize, err := ops.CalculateFileHashes(bundlePath) 186 + if err != nil { 187 + fmt.Fprintf(os.Stderr, " ⚠️ Failed to hash bundle %06d: %v\n", bundleNum, err) 188 + continue 189 + } 190 + 191 + // Get and update metadata 192 + bundleMeta, err := index.GetBundle(bundleNum) 193 + if err != nil { 194 + continue 195 + } 196 + 197 + // Verify content hash unchanged 198 + if bundleMeta.ContentHash != contentHash { 199 + fmt.Fprintf(os.Stderr, " ⚠️ Content hash changed for %06d (unexpected!)\n", bundleNum) 200 + } 201 + 202 + // Update compressed info (this changed due to skippable frames) 203 + bundleMeta.CompressedHash = compHash 204 + bundleMeta.CompressedSize = compSize 205 + bundleMeta.UncompressedSize = contentSize 206 + 207 + index.AddBundle(bundleMeta) 208 + updated++ 209 + } 210 + 211 + // Save index 212 + if err := mgr.SaveIndex(); err != nil { 213 + fmt.Fprintf(os.Stderr, " ⚠️ Failed to save index: %v\n", err) 214 + } else { 215 + fmt.Printf(" ✓ Updated %d entries in %s\n", updated, time.Since(updateStart).Round(time.Millisecond)) 216 + } 217 + } 218 + 
219 // Summary 220 fmt.Printf("\n") 221 if failed == 0 { 222 fmt.Printf("✓ Migration complete in %s\n", elapsed.Round(time.Millisecond)) 223 + fmt.Printf(" Migrated: %d bundles\n", success) 224 + fmt.Printf(" Index updated: %d entries\n", len(hashChanges)) 225 + fmt.Printf(" Speed: %.1f bundles/sec\n\n", float64(success)/elapsed.Seconds()) 226 + 227 + fmt.Printf("✨ New bundle format features:\n") 228 + fmt.Printf(" • Embedded metadata (JSON in skippable frame)\n") 229 + fmt.Printf(" • Frame offsets for instant random access\n") 230 + fmt.Printf(" • Multi-frame compression (100 ops/frame)\n") 231 + fmt.Printf(" • Self-contained (no .idx files)\n") 232 + fmt.Printf(" • Provenance tracking (version, origin, creator)\n") 233 + fmt.Printf(" • Compatible with standard zstd tools\n") 234 } else { 235 fmt.Printf("⚠️ Migration completed with errors\n") 236 fmt.Printf(" Success: %d bundles\n", success) ··· 245 return nil 246 } 247 248 + func migrateBundle(dir string, bundleNum int, index *bundleindex.Index, version string, verbose bool) error { 249 bundlePath := filepath.Join(dir, fmt.Sprintf("%06d.jsonl.zst", bundleNum)) 250 backupPath := bundlePath + ".bak" 251 252 + // 1. Get metadata from index 253 + meta, err := index.GetBundle(bundleNum) 254 + if err != nil { 255 + return fmt.Errorf("bundle not in index: %w", err) 256 + } 257 + 258 + // 2. Load the bundle using old format 259 ops := &storage.Operations{} 260 operations, err := ops.LoadBundle(bundlePath) 261 if err != nil { ··· 266 fmt.Fprintf(os.Stderr, " Loaded %d operations\n", len(operations)) 267 } 268 269 + // 3. Backup original file 270 if err := os.Rename(bundlePath, backupPath); err != nil { 271 return fmt.Errorf("failed to backup: %w", err) 272 } 273 274 + // 4. Get hostname (optional) 275 + hostname, _ := os.Hostname() 276 + 277 + // 5. ✅ Create BundleInfo for new format 278 + bundleInfo := &storage.BundleInfo{ 279 + BundleNumber: meta.BundleNumber, 280 + Origin: index.Origin, // From index 281 + ParentHash: meta.Parent, 282 + Cursor: meta.Cursor, 283 + CreatedBy: fmt.Sprintf("plcbundle/%s", version), 284 + Hostname: hostname, 285 + } 286 + 287 + // 6. Save using new format (with skippable frame metadata) 288 + contentHash, compHash, contentSize, compSize, err := ops.SaveBundle(bundlePath, operations, bundleInfo) 289 if err != nil { 290 // Restore backup on failure 291 os.Rename(backupPath, bundlePath) 292 return fmt.Errorf("failed to save: %w", err) 293 } 294 295 + // 7. Verify embedded metadata was created 296 + embeddedMeta, err := ops.ExtractBundleMetadata(bundlePath) 297 + if err != nil { 298 + os.Remove(bundlePath) 299 + os.Rename(backupPath, bundlePath) 300 + return fmt.Errorf("embedded metadata not created: %w", err) 301 + } 302 + 303 + // 8. Verify frame offsets are present 304 + if len(embeddedMeta.FrameOffsets) == 0 { 305 os.Remove(bundlePath) 306 os.Rename(backupPath, bundlePath) 307 + return fmt.Errorf("frame offsets missing in metadata") 308 } 309 310 + // 9. Verify content hash matches (should be unchanged) 311 + if contentHash != meta.ContentHash { 312 + fmt.Fprintf(os.Stderr, " ⚠️ Content hash changed (unexpected): %s → %s\n", 313 + meta.ContentHash[:12], contentHash[:12]) 314 + } 315 + 316 + // 10. 
Cleanup backup 317 os.Remove(backupPath) 318 319 if verbose { 320 + fmt.Fprintf(os.Stderr, " Content hash: %s (%s)\n", contentHash[:12], formatBytes(contentSize)) 321 + fmt.Fprintf(os.Stderr, " New compressed hash: %s (%s)\n", compHash[:12], formatBytes(compSize)) 322 + fmt.Fprintf(os.Stderr, " Frames: %d (embedded in metadata)\n", len(embeddedMeta.FrameOffsets)-1) 323 } 324 325 return nil
+198 -76
internal/storage/storage.go
··· 16 "tangled.org/atscan.net/plcbundle/internal/plcclient" 17 ) 18 19 // Operations handles low-level bundle file operations 20 type Operations struct { 21 logger Logger ··· 33 34 func (op *Operations) Close() { 35 // Nothing to close 36 } 37 38 // ======================================== ··· 86 // FILE OPERATIONS (using zstd abstraction) 87 // ======================================== 88 89 - // SaveBundle saves operations to disk with embedded metadata 90 - func (op *Operations) SaveBundle(path string, operations []plcclient.PLCOperation) (string, string, int64, int64, error) { 91 - // 1. Serialize all operations once 92 jsonlData := op.SerializeJSONL(operations) 93 contentSize := int64(len(jsonlData)) 94 contentHash := op.Hash(jsonlData) 95 96 - // 2. Create the destination file 97 - bundleFile, err := os.Create(path) 98 - if err != nil { 99 - return "", "", 0, 0, fmt.Errorf("could not create bundle file: %w", err) 100 } 101 - defer bundleFile.Close() 102 103 - // ✅ 3. Write metadata as skippable frame FIRST (placeholder - will update later) 104 - // We write a placeholder now and update after we know the compressed size 105 - placeholderMeta := &BundleMetadata{ 106 - Version: 1, 107 - BundleNumber: 0, // Will be set by caller 108 - UncompressedSize: contentSize, 109 - OperationCount: len(operations), 110 - FrameCount: (len(operations) + FrameSize - 1) / FrameSize, 111 - CreatedAt: time.Now().UTC(), 112 - } 113 114 - metadataStart, err := WriteMetadataFrame(bundleFile, placeholderMeta) 115 - if err != nil { 116 - return "", "", 0, 0, fmt.Errorf("failed to write metadata frame: %w", err) 117 - } 118 - 119 - frameOffsets := []int64{metadataStart} // First data frame starts after metadata 120 - 121 - // 4. Write data frames 122 for i := 0; i < len(operations); i += FrameSize { 123 end := i + FrameSize 124 if end > len(operations) { ··· 127 opChunk := operations[i:end] 128 chunkJsonlData := op.SerializeJSONL(opChunk) 129 130 - // Compress this chunk 131 compressedChunk, err := CompressFrame(chunkJsonlData) 132 if err != nil { 133 return "", "", 0, 0, fmt.Errorf("failed to compress frame: %w", err) 134 } 135 136 - // Write frame to file 137 - _, err = bundleFile.Write(compressedChunk) 138 - if err != nil { 139 - return "", "", 0, 0, fmt.Errorf("failed to write frame: %w", err) 140 - } 141 142 - // Get current offset for next frame 143 - currentOffset, err := bundleFile.Seek(0, io.SeekCurrent) 144 - if err != nil { 145 - return "", "", 0, 0, fmt.Errorf("failed to get file offset: %w", err) 146 - } 147 148 - if end < len(operations) { 149 - frameOffsets = append(frameOffsets, currentOffset) 150 - } 151 } 152 153 - // 5. Get final file size 154 - finalSize, _ := bundleFile.Seek(0, io.SeekCurrent) 155 - frameOffsets = append(frameOffsets, finalSize) 156 157 - // 6. Sync to disk 158 - if err := bundleFile.Sync(); err != nil { 159 - return "", "", 0, 0, fmt.Errorf("failed to sync file: %w", err) 160 } 161 162 - // 7. Save frame index (still useful for random access) 163 - indexPath := path + ".idx" 164 - indexData, _ := json.Marshal(frameOffsets) 165 - if err := os.WriteFile(indexPath, indexData, 0644); err != nil { 166 - os.Remove(path) 167 - return "", "", 0, 0, fmt.Errorf("failed to write frame index: %w", err) 168 } 169 170 - // 8. 
Calculate compressed hash 171 compressedData, err := os.ReadFile(path) 172 if err != nil { 173 - return "", "", 0, 0, fmt.Errorf("failed to re-read bundle for hashing: %w", err) 174 } 175 compressedHash := op.Hash(compressedData) 176 177 - return contentHash, compressedHash, contentSize, finalSize, nil 178 } 179 180 // LoadBundle loads a compressed bundle ··· 410 return nil, fmt.Errorf("invalid position: %d", position) 411 } 412 413 - indexPath := path + ".idx" 414 415 - // 1. Try to load frame index 416 indexData, err := os.ReadFile(indexPath) 417 if err != nil { 418 - if os.IsNotExist(err) { 419 - // Fallback to legacy full scan 420 - if op.logger != nil { 421 - op.logger.Printf("Frame index not found for %s, using legacy scan", filepath.Base(path)) 422 - } 423 - return op.loadOperationAtPositionLegacy(path, position) 424 - } 425 - return nil, fmt.Errorf("could not read frame index: %w", err) 426 } 427 428 - var frameOffsets []int64 429 - if err := json.Unmarshal(indexData, &frameOffsets); err != nil { 430 - return nil, fmt.Errorf("could not parse frame index: %w", err) 431 } 432 433 - // 2. Calculate target frame 434 frameIndex := position / FrameSize 435 lineInFrame := position % FrameSize 436 ··· 439 position, frameIndex, len(frameOffsets)-1) 440 } 441 442 - // 3. Read the specific frame from file 443 startOffset := frameOffsets[frameIndex] 444 endOffset := frameOffsets[frameIndex+1] 445 frameLength := endOffset - startOffset 446 447 - if frameLength <= 0 { 448 - return nil, fmt.Errorf("invalid frame length: %d", frameLength) 449 } 450 451 bundleFile, err := os.Open(path) ··· 455 defer bundleFile.Close() 456 457 compressedFrame := make([]byte, frameLength) 458 - _, err = bundleFile.ReadAt(compressedFrame, startOffset) 459 if err != nil { 460 - return nil, fmt.Errorf("failed to read frame %d: %w", frameIndex, err) 461 } 462 463 - // 4. ✅ Decompress this single frame 464 decompressed, err := DecompressFrame(compressedFrame) 465 if err != nil { 466 return nil, fmt.Errorf("failed to decompress frame %d: %w", frameIndex, err) 467 } 468 469 - // 5. Scan the decompressed data to find the target line 470 scanner := bufio.NewScanner(bytes.NewReader(decompressed)) 471 lineNum := 0 472
··· 16 "tangled.org/atscan.net/plcbundle/internal/plcclient" 17 ) 18 19 + const ( 20 + MetadataFormatVersion = 1 21 + ) 22 + 23 + // BundleMetadata - Self-describing bundle (content-focused, not container) 24 + type BundleMetadata struct { 25 + // === Format Info === 26 + Version int `json:"version"` // Metadata schema version (1) 27 + Format string `json:"format"` // "plcbundle-v1" 28 + SpecURL string `json:"spec_url"` // "https://github.com/atscan-net/plcbundle" 29 + 30 + // === Bundle Identity === 31 + BundleNumber int `json:"bundle_number"` // Sequential bundle number 32 + Origin string `json:"origin"` // Source PLC directory URL 33 + 34 + // === Creation Provenance === 35 + CreatedAt time.Time `json:"created_at"` // When bundle was created 36 + CreatedBy string `json:"created_by"` // "plcbundle/v1.2.3" 37 + CreatedByHost string `json:"created_by_host,omitempty"` // Optional: hostname that created it 38 + 39 + // === Content Integrity === 40 + ContentHash string `json:"content_hash"` // SHA256 of uncompressed JSONL content 41 + ParentHash string `json:"parent_hash,omitempty"` // Hash of previous bundle (chain) 42 + 43 + // === Content Description === 44 + OperationCount int `json:"operation_count"` // Always 10000 for complete bundles 45 + DIDCount int `json:"did_count"` // Unique DIDs in this bundle 46 + StartTime time.Time `json:"start_time"` // First operation timestamp 47 + EndTime time.Time `json:"end_time"` // Last operation timestamp 48 + 49 + // === Frame Structure (for random access) === 50 + FrameCount int `json:"frame_count"` // Number of zstd frames (usually 100) 51 + FrameSize int `json:"frame_size"` // Operations per frame (100) 52 + FrameOffsets []int64 `json:"frame_offsets"` // Byte offsets of each frame 53 + 54 + // === Optional Context === 55 + Cursor string `json:"cursor,omitempty"` // PLC export cursor for this bundle 56 + Notes string `json:"notes,omitempty"` // Optional description 57 + } 58 + 59 // Operations handles low-level bundle file operations 60 type Operations struct { 61 logger Logger ··· 73 74 func (op *Operations) Close() { 75 // Nothing to close 76 + } 77 + 78 + // BundleInfo contains info needed to create metadata 79 + type BundleInfo struct { 80 + BundleNumber int 81 + Origin string 82 + ParentHash string 83 + Cursor string 84 + CreatedBy string // "plcbundle/v1.2.3" 85 + Hostname string // Optional 86 } 87 88 // ======================================== ··· 136 // FILE OPERATIONS (using zstd abstraction) 137 // ======================================== 138 139 + // SaveBundle saves operations with metadata containing RELATIVE frame offsets 140 + func (op *Operations) SaveBundle(path string, operations []plcclient.PLCOperation, bundleInfo *BundleInfo) (string, string, int64, int64, error) { 141 + // 1. Calculate content 142 jsonlData := op.SerializeJSONL(operations) 143 contentSize := int64(len(jsonlData)) 144 contentHash := op.Hash(jsonlData) 145 + dids := op.ExtractUniqueDIDs(operations) 146 147 + hostnameHash := "" 148 + if bundleInfo.Hostname != "" { 149 + hostnameHash = op.Hash([]byte(bundleInfo.Hostname))[:16] // First 16 chars (64 bits) 150 } 151 152 + // 2. 
Compress all frames 153 + compressedFrames := make([][]byte, 0) 154 155 for i := 0; i < len(operations); i += FrameSize { 156 end := i + FrameSize 157 if end > len(operations) { ··· 160 opChunk := operations[i:end] 161 chunkJsonlData := op.SerializeJSONL(opChunk) 162 163 compressedChunk, err := CompressFrame(chunkJsonlData) 164 if err != nil { 165 return "", "", 0, 0, fmt.Errorf("failed to compress frame: %w", err) 166 } 167 168 + compressedFrames = append(compressedFrames, compressedChunk) 169 + } 170 171 + // 3. ✅ Calculate RELATIVE offsets (relative to first data frame) 172 + relativeOffsets := make([]int64, len(compressedFrames)+1) 173 + relativeOffsets[0] = 0 174 175 + cumulative := int64(0) 176 + for i, frame := range compressedFrames { 177 + cumulative += int64(len(frame)) 178 + relativeOffsets[i+1] = cumulative 179 } 180 181 + // 4. ✅ Build metadata with RELATIVE offsets 182 + metadata := &BundleMetadata{ 183 + Version: MetadataFormatVersion, 184 + Format: "plcbundle-v1", 185 + SpecURL: "https://github.com/atscan-net/plcbundle", 186 + BundleNumber: bundleInfo.BundleNumber, 187 + Origin: bundleInfo.Origin, 188 + CreatedAt: time.Now().UTC(), 189 + CreatedBy: bundleInfo.CreatedBy, 190 + CreatedByHost: hostnameHash, 191 + ContentHash: contentHash, 192 + ParentHash: bundleInfo.ParentHash, 193 + OperationCount: len(operations), 194 + DIDCount: len(dids), 195 + FrameCount: len(compressedFrames), 196 + FrameSize: FrameSize, 197 + Cursor: bundleInfo.Cursor, 198 + FrameOffsets: relativeOffsets, // ✅ RELATIVE to data start! 199 + } 200 201 + if len(operations) > 0 { 202 + metadata.StartTime = operations[0].CreatedAt 203 + metadata.EndTime = operations[len(operations)-1].CreatedAt 204 } 205 206 + // 5. Write final file 207 + finalFile, err := os.Create(path) 208 + if err != nil { 209 + return "", "", 0, 0, fmt.Errorf("failed to create file: %w", err) 210 } 211 + defer func() { 212 + finalFile.Close() 213 + if err != nil { 214 + os.Remove(path) 215 + } 216 + }() 217 218 + // Write metadata frame 219 + if _, err := WriteMetadataFrame(finalFile, metadata); err != nil { 220 + return "", "", 0, 0, fmt.Errorf("failed to write metadata: %w", err) 221 + } 222 + 223 + // Write all data frames 224 + for _, frame := range compressedFrames { 225 + if _, err := finalFile.Write(frame); err != nil { 226 + return "", "", 0, 0, fmt.Errorf("failed to write frame: %w", err) 227 + } 228 + } 229 + 230 + finalFile.Sync() 231 + finalFile.Close() 232 + 233 + // 6. Hash 234 compressedData, err := os.ReadFile(path) 235 if err != nil { 236 + return "", "", 0, 0, err 237 } 238 compressedHash := op.Hash(compressedData) 239 240 + os.Remove(path + ".idx") 241 + 242 + return contentHash, compressedHash, contentSize, int64(len(compressedData)), nil 243 } 244 245 // LoadBundle loads a compressed bundle ··· 475 return nil, fmt.Errorf("invalid position: %d", position) 476 } 477 478 + // ✅ Try multiple sources for frame index (no goto!) 
479 + frameOffsets, err := op.loadFrameIndex(path) 480 + if err != nil { 481 + // No frame index available - use legacy full scan 482 + if op.logger != nil { 483 + op.logger.Printf("No frame index found for %s, using legacy scan", filepath.Base(path)) 484 + } 485 + return op.loadOperationAtPositionLegacy(path, position) 486 + } 487 + 488 + // We have frame index - use it for fast random access 489 + return op.loadOperationFromFrame(path, position, frameOffsets) 490 + } 491 492 + // loadFrameIndex loads frame offsets and converts to absolute positions 493 + func (op *Operations) loadFrameIndex(path string) ([]int64, error) { 494 + // Try embedded metadata first 495 + meta, err := ExtractMetadataFromFile(path) 496 + if err == nil && len(meta.FrameOffsets) > 0 { 497 + // ✅ Convert relative offsets to absolute 498 + // First, get metadata frame size by re-reading 499 + file, _ := os.Open(path) 500 + if file != nil { 501 + defer file.Close() 502 + 503 + // Read metadata frame to find where data starts 504 + magic, data, readErr := ReadSkippableFrame(file) 505 + if readErr == nil && magic == SkippableMagicMetadata { 506 + // Metadata frame size = 4 (magic) + 4 (size) + len(data) 507 + metadataFrameSize := int64(8 + len(data)) 508 + 509 + // Convert relative to absolute 510 + absoluteOffsets := make([]int64, len(meta.FrameOffsets)) 511 + for i, relOffset := range meta.FrameOffsets { 512 + absoluteOffsets[i] = metadataFrameSize + relOffset 513 + } 514 + 515 + return absoluteOffsets, nil 516 + } 517 + } 518 + } 519 + 520 + // Fallback to external .idx file 521 + indexPath := path + ".idx" 522 indexData, err := os.ReadFile(indexPath) 523 if err != nil { 524 + return nil, fmt.Errorf("no frame index available: %w", err) 525 } 526 527 + var offsets []int64 528 + if err := json.Unmarshal(indexData, &offsets); err != nil { 529 + return nil, fmt.Errorf("invalid frame index: %w", err) 530 } 531 532 + return offsets, nil 533 + } 534 + 535 + // loadOperationFromFrame loads operation using frame index 536 + func (op *Operations) loadOperationFromFrame(path string, position int, frameOffsets []int64) (*plcclient.PLCOperation, error) { 537 frameIndex := position / FrameSize 538 lineInFrame := position % FrameSize 539 ··· 542 position, frameIndex, len(frameOffsets)-1) 543 } 544 545 startOffset := frameOffsets[frameIndex] 546 endOffset := frameOffsets[frameIndex+1] 547 frameLength := endOffset - startOffset 548 549 + // ✅ DEBUG 550 + if op.logger != nil { 551 + op.logger.Printf("DEBUG: Frame %d: offset %d-%d, length %d bytes", 552 + frameIndex, startOffset, endOffset, frameLength) 553 + } 554 + 555 + if frameLength <= 0 || frameLength > 10*1024*1024 { 556 + return nil, fmt.Errorf("invalid frame length: %d (offsets: %d-%d)", 557 + frameLength, startOffset, endOffset) 558 } 559 560 bundleFile, err := os.Open(path) ··· 564 defer bundleFile.Close() 565 566 compressedFrame := make([]byte, frameLength) 567 + n, err := bundleFile.ReadAt(compressedFrame, startOffset) 568 if err != nil { 569 + return nil, fmt.Errorf("failed to read frame %d (offset %d, length %d): %w", 570 + frameIndex, startOffset, frameLength, err) 571 } 572 573 + if op.logger != nil { 574 + op.logger.Printf("DEBUG: Read %d bytes from offset %d", n, startOffset) 575 + } 576 + 577 + // Decompress 578 decompressed, err := DecompressFrame(compressedFrame) 579 if err != nil { 580 + // ✅ DEBUG: Show first few bytes to diagnose 581 + if op.logger != nil { 582 + preview := compressedFrame 583 + if len(preview) > 16 { 584 + preview = preview[:16] 585 + } 586 + 
op.logger.Printf("DEBUG: Failed frame data (first 16 bytes): % x", preview) 587 + } 588 return nil, fmt.Errorf("failed to decompress frame %d: %w", frameIndex, err) 589 } 590 591 + // Scan to find the line 592 scanner := bufio.NewScanner(bytes.NewReader(decompressed)) 593 lineNum := 0 594
+13 -13
internal/storage/storage_test.go
···
    path := filepath.Join(tmpDir, tt.name+".jsonl.zst")

    // Save
-   _, _, _, _, err := ops.SaveBundle(path, original)
    if err != nil {
        t.Fatalf("SaveBundle failed: %v", err)
    }
···
    operations := makeTestOperations(10000)
    path := filepath.Join(tmpDir, "compression_test.jsonl.zst")

-   _, _, uncompSize, compSize, err := ops.SaveBundle(path, operations)
    if err != nil {
        t.Fatalf("SaveBundle failed: %v", err)
    }
···
    operations := makeTestOperations(100)
    path := filepath.Join(tmpDir, "integrity_test.jsonl.zst")

-   contentHash, compHash, _, _, err := ops.SaveBundle(path, operations)
    if err != nil {
        t.Fatalf("SaveBundle failed: %v", err)
    }
···
    // Create test bundle
    operations := makeTestOperations(10000)
    path := filepath.Join(tmpDir, "parallel_test.jsonl.zst")
-   _, _, _, _, err := ops.SaveBundle(path, operations)
    if err != nil {
        t.Fatalf("SaveBundle failed: %v", err)
    }
···
    // Critical test - this is heavily used by DID lookups
    operations := makeTestOperations(10000)
    path := filepath.Join(tmpDir, "position_test.jsonl.zst")
-   _, _, _, _, err := ops.SaveBundle(path, operations)
    if err != nil {
        t.Fatalf("SaveBundle failed: %v", err)
    }
···
    t.Run("ConcurrentHashVerification", func(t *testing.T) {
        operations := makeTestOperations(1000)
        path := filepath.Join(tmpDir, "verify_test.jsonl.zst")
-       _, compHash, _, _, err := ops.SaveBundle(path, operations)
        if err != nil {
            t.Fatalf("SaveBundle failed: %v", err)
        }
···
    t.Run("TruncatedFile", func(t *testing.T) {
        operations := makeTestOperations(100)
        path := filepath.Join(tmpDir, "truncated.jsonl.zst")
-       ops.SaveBundle(path, operations)

        // Read and truncate
        data, _ := os.ReadFile(path)
···

        // Manually compress invalid data
        operations := makeTestOperations(10)
-       ops.SaveBundle(path, operations) // Create valid file first

        // Now corrupt it with invalid JSON
        // This is hard to test properly since SaveBundle enforces valid data
···
    t.Run("InvalidPosition", func(t *testing.T) {
        operations := makeTestOperations(100)
        path := filepath.Join(tmpDir, "position_test.jsonl.zst")
-       ops.SaveBundle(path, operations)

        // Negative position
        _, err := ops.LoadOperationAtPosition(path, -1)
···
    t.Run("StreamRaw", func(t *testing.T) {
        operations := makeTestOperations(100)
        path := filepath.Join(tmpDir, "stream_raw.jsonl.zst")
-       _, _, _, _, err := ops.SaveBundle(path, operations)
        if err != nil {
            t.Fatalf("SaveBundle failed: %v", err)
        }
···
    t.Run("StreamDecompressed", func(t *testing.T) {
        operations := makeTestOperations(100)
        path := filepath.Join(tmpDir, "stream_decomp.jsonl.zst")
-       ops.SaveBundle(path, operations)

        reader, err := ops.StreamDecompressed(path)
        if err != nil {
···
    b.Run("SaveBundle", func(b *testing.B) {
        for i := 0; i < b.N; i++ {
            path := filepath.Join(tmpDir, fmt.Sprintf("bench_%d.jsonl.zst", i))
-           ops.SaveBundle(path, operations)
        }
    })

    // Create bundle for read benchmarks
    testPath := filepath.Join(tmpDir, "bench_read.jsonl.zst")
-   ops.SaveBundle(testPath, operations)

    b.Run("LoadBundle", func(b *testing.B) {
        for i := 0; i < b.N; i++ {
···
    path := filepath.Join(tmpDir, tt.name+".jsonl.zst")

    // Save
+   _, _, _, _, err := ops.SaveBundle(path, original, nil)
    if err != nil {
        t.Fatalf("SaveBundle failed: %v", err)
    }
···
    operations := makeTestOperations(10000)
    path := filepath.Join(tmpDir, "compression_test.jsonl.zst")

+   _, _, uncompSize, compSize, err := ops.SaveBundle(path, operations, nil)
    if err != nil {
        t.Fatalf("SaveBundle failed: %v", err)
    }
···
    operations := makeTestOperations(100)
    path := filepath.Join(tmpDir, "integrity_test.jsonl.zst")

+   contentHash, compHash, _, _, err := ops.SaveBundle(path, operations, nil)
    if err != nil {
        t.Fatalf("SaveBundle failed: %v", err)
    }
···
    // Create test bundle
    operations := makeTestOperations(10000)
    path := filepath.Join(tmpDir, "parallel_test.jsonl.zst")
+   _, _, _, _, err := ops.SaveBundle(path, operations, nil)
    if err != nil {
        t.Fatalf("SaveBundle failed: %v", err)
    }
···
    // Critical test - this is heavily used by DID lookups
    operations := makeTestOperations(10000)
    path := filepath.Join(tmpDir, "position_test.jsonl.zst")
+   _, _, _, _, err := ops.SaveBundle(path, operations, nil)
    if err != nil {
        t.Fatalf("SaveBundle failed: %v", err)
    }
···
    t.Run("ConcurrentHashVerification", func(t *testing.T) {
        operations := makeTestOperations(1000)
        path := filepath.Join(tmpDir, "verify_test.jsonl.zst")
+       _, compHash, _, _, err := ops.SaveBundle(path, operations, nil)
        if err != nil {
            t.Fatalf("SaveBundle failed: %v", err)
        }
···
    t.Run("TruncatedFile", func(t *testing.T) {
        operations := makeTestOperations(100)
        path := filepath.Join(tmpDir, "truncated.jsonl.zst")
+       ops.SaveBundle(path, operations, nil)

        // Read and truncate
        data, _ := os.ReadFile(path)
···

        // Manually compress invalid data
        operations := makeTestOperations(10)
+       ops.SaveBundle(path, operations, nil) // Create valid file first

        // Now corrupt it with invalid JSON
        // This is hard to test properly since SaveBundle enforces valid data
···
    t.Run("InvalidPosition", func(t *testing.T) {
        operations := makeTestOperations(100)
        path := filepath.Join(tmpDir, "position_test.jsonl.zst")
+       ops.SaveBundle(path, operations, nil)

        // Negative position
        _, err := ops.LoadOperationAtPosition(path, -1)
···
    t.Run("StreamRaw", func(t *testing.T) {
        operations := makeTestOperations(100)
        path := filepath.Join(tmpDir, "stream_raw.jsonl.zst")
+       _, _, _, _, err := ops.SaveBundle(path, operations, nil)
        if err != nil {
            t.Fatalf("SaveBundle failed: %v", err)
        }
···
    t.Run("StreamDecompressed", func(t *testing.T) {
        operations := makeTestOperations(100)
        path := filepath.Join(tmpDir, "stream_decomp.jsonl.zst")
+       ops.SaveBundle(path, operations, nil)

        reader, err := ops.StreamDecompressed(path)
        if err != nil {
···
    b.Run("SaveBundle", func(b *testing.B) {
        for i := 0; i < b.N; i++ {
            path := filepath.Join(tmpDir, fmt.Sprintf("bench_%d.jsonl.zst", i))
+           ops.SaveBundle(path, operations, nil)
        }
    })

    // Create bundle for read benchmarks
    testPath := filepath.Join(tmpDir, "bench_read.jsonl.zst")
+   ops.SaveBundle(testPath, operations, nil)

    b.Run("LoadBundle", func(b *testing.B) {
        for i := 0; i < b.N; i++ {
+106 -62
internal/storage/zstd.go
···
package storage

import (
    "encoding/binary"
    "encoding/json"
    "fmt"
    "io"
    "os"
-   "time"

    "github.com/valyala/gozstd"
)
···
// ============================================================================

const (
-   CompressionLevel = 3
    FrameSize = 100

-   // Skippable frame magic numbers (0x184D2A50 to 0x184D2A5F)
-   // We use 0x184D2A50 for bundle metadata
    SkippableMagicMetadata = 0x184D2A50
)

- // BundleMetadata is stored in skippable frame at start of bundle file
- type BundleMetadata struct {
-     Version      int    `json:"version"` // Metadata format version
-     BundleNumber int    `json:"bundle_number"`
-     Origin       string `json:"origin,omitempty"`
-
-     // Hashes
-     ContentHash    string `json:"content_hash"`
-     CompressedHash string `json:"compressed_hash"`
-     ParentHash     string `json:"parent_hash,omitempty"`
-
-     // Sizes
-     UncompressedSize int64 `json:"uncompressed_size"`
-     CompressedSize   int64 `json:"compressed_size"`
-
-     // Timestamps
-     StartTime time.Time `json:"start_time"`
-     EndTime   time.Time `json:"end_time"`
-     CreatedAt time.Time `json:"created_at"`
-
-     // Counts
-     OperationCount int `json:"operation_count"`
-     DIDCount       int `json:"did_count"`
-     FrameCount     int `json:"frame_count"`
-
-     // Additional info
-     Cursor string `json:"cursor,omitempty"`
- }
-
// ============================================================================
// SKIPPABLE FRAME FUNCTIONS
// ============================================================================

// WriteSkippableFrame writes a skippable frame with the given data
- // Returns the number of bytes written
func WriteSkippableFrame(w io.Writer, magicNumber uint32, data []byte) (int64, error) {
-   // Skippable frame format:
-   // [4 bytes] Magic Number (0x184D2A5X)
-   // [4 bytes] Frame Size (little-endian uint32)
-   // [N bytes] Frame Data
-
    frameSize := uint32(len(data))

-   // Write magic number
    if err := binary.Write(w, binary.LittleEndian, magicNumber); err != nil {
        return 0, err
    }

-   // Write frame size
    if err := binary.Write(w, binary.LittleEndian, frameSize); err != nil {
        return 0, err
    }
···
    return totalBytes, nil
}

- // ReadSkippableFrame reads a skippable frame from the reader
- // Returns the magic number and data, or error if not a skippable frame
func ReadSkippableFrame(r io.Reader) (uint32, []byte, error) {
-   // Read magic number
    var magic uint32
    if err := binary.Read(r, binary.LittleEndian, &magic); err != nil {
-       return 0, nil, err
    }

-   // Verify it's a skippable frame (0x184D2A50 to 0x184D2A5F)
    if magic < 0x184D2A50 || magic > 0x184D2A5F {
-       return 0, nil, fmt.Errorf("not a skippable frame: magic=0x%08X", magic)
    }

-   // Read frame size
    var frameSize uint32
    if err := binary.Read(r, binary.LittleEndian, &frameSize); err != nil {
-       return 0, nil, err
    }

-   // Read frame data
    data := make([]byte, frameSize)
    if _, err := io.ReadFull(r, data); err != nil {
-       return 0, nil, err
    }

    return magic, data, nil
}

- // WriteMetadataFrame writes bundle metadata as a skippable frame
func WriteMetadataFrame(w io.Writer, meta *BundleMetadata) (int64, error) {
-   // Serialize metadata to JSON
    jsonData, err := json.Marshal(meta)
    if err != nil {
        return 0, fmt.Errorf("failed to marshal metadata: %w", err)
    }
-
-   // Write as skippable frame
    return WriteSkippableFrame(w, SkippableMagicMetadata, jsonData)
}
···
    return &meta, nil
}

- // ExtractMetadataFromFile reads just the metadata without decompressing the bundle
func ExtractMetadataFromFile(path string) (*BundleMetadata, error) {
    file, err := os.Open(path)
    if err != nil {
···
    defer file.Close()

-   // Try to read skippable frame at start
    meta, err := ReadMetadataFrame(file)
    if err != nil {
        return nil, fmt.Errorf("no metadata frame found: %w", err)
···
    return meta, nil
}

// ============================================================================
// COMPRESSION/DECOMPRESSION
// ============================================================================

- // CompressFrame compresses a single chunk of data into a zstd frame
func CompressFrame(data []byte) ([]byte, error) {
    compressed := gozstd.Compress(nil, data)
    return compressed, nil
}

- // DecompressAll decompresses all frames in the compressed data
func DecompressAll(compressed []byte) ([]byte, error) {
    decompressed, err := gozstd.Decompress(nil, compressed)
    if err != nil {
···
    return decompressed, nil
}

- // DecompressFrame decompresses a single frame
func DecompressFrame(compressedFrame []byte) ([]byte, error) {
    return gozstd.Decompress(nil, compressedFrame)
}

- // NewStreamingReader creates a streaming decompressor
func NewStreamingReader(r io.Reader) (StreamReader, error) {
    reader := gozstd.NewReader(r)
    return &gozstdReader{reader: reader}, nil
}

- // NewStreamingWriter creates a streaming compressor at default level
func NewStreamingWriter(w io.Writer) (StreamWriter, error) {
    writer := gozstd.NewWriterLevel(w, CompressionLevel)
    return &gozstdWriter{writer: writer}, nil
···
package storage

import (
+   "bytes"
    "encoding/binary"
    "encoding/json"
    "fmt"
    "io"
    "os"
+   "path/filepath"

    "github.com/valyala/gozstd"
)
···
// ============================================================================

const (
+   CompressionLevel = 2
    FrameSize = 100

    SkippableMagicMetadata = 0x184D2A50
)

// ============================================================================
// SKIPPABLE FRAME FUNCTIONS
// ============================================================================

// WriteSkippableFrame writes a skippable frame with the given data
func WriteSkippableFrame(w io.Writer, magicNumber uint32, data []byte) (int64, error) {
    frameSize := uint32(len(data))

+   // Write magic number (little-endian)
    if err := binary.Write(w, binary.LittleEndian, magicNumber); err != nil {
        return 0, err
    }

+   // Write frame size (little-endian)
    if err := binary.Write(w, binary.LittleEndian, frameSize); err != nil {
        return 0, err
    }
···
    return totalBytes, nil
}

+ // ReadSkippableFrame with debug
func ReadSkippableFrame(r io.Reader) (uint32, []byte, error) {
    var magic uint32
    if err := binary.Read(r, binary.LittleEndian, &magic); err != nil {
+       return 0, nil, fmt.Errorf("failed to read magic: %w", err)
    }

+   fmt.Fprintf(os.Stderr, "DEBUG: Read magic number: 0x%08X\n", magic)
+
    if magic < 0x184D2A50 || magic > 0x184D2A5F {
+       return 0, nil, fmt.Errorf("not a skippable frame: magic=0x%08X (expected 0x184D2A50-0x184D2A5F)", magic)
    }

    var frameSize uint32
    if err := binary.Read(r, binary.LittleEndian, &frameSize); err != nil {
+       return 0, nil, fmt.Errorf("failed to read frame size: %w", err)
    }

+   fmt.Fprintf(os.Stderr, "DEBUG: Frame size: %d bytes\n", frameSize)
+
    data := make([]byte, frameSize)
    if _, err := io.ReadFull(r, data); err != nil {
+       return 0, nil, fmt.Errorf("failed to read frame data: %w", err)
    }

+   fmt.Fprintf(os.Stderr, "DEBUG: Read %d bytes of frame data\n", len(data))
+
    return magic, data, nil
}

+ // WriteMetadataFrame writes bundle metadata as skippable frame (compact JSON)
func WriteMetadataFrame(w io.Writer, meta *BundleMetadata) (int64, error) {
    jsonData, err := json.Marshal(meta)
    if err != nil {
        return 0, fmt.Errorf("failed to marshal metadata: %w", err)
    }
    return WriteSkippableFrame(w, SkippableMagicMetadata, jsonData)
}
···
    return &meta, nil
}

+ // ExtractMetadataFromFile reads metadata without decompressing
func ExtractMetadataFromFile(path string) (*BundleMetadata, error) {
    file, err := os.Open(path)
    if err != nil {
···
    defer file.Close()

+   // ✅ DEBUG: Check first bytes
+   header := make([]byte, 8)
+   if _, err := file.Read(header); err != nil {
+       return nil, fmt.Errorf("failed to read header: %w", err)
+   }
+
+   fmt.Fprintf(os.Stderr, "DEBUG: First 8 bytes: % x\n", header)
+
+   // Seek back to start
+   file.Seek(0, io.SeekStart)
+
    meta, err := ReadMetadataFrame(file)
    if err != nil {
        return nil, fmt.Errorf("no metadata frame found: %w", err)
···
    return meta, nil
}

+ // ExtractFrameIndexFromFile now just reads from metadata
+ func ExtractFrameIndexFromFile(path string) ([]int64, error) {
+     meta, err := ExtractMetadataFromFile(path)
+     if err != nil {
+         return nil, err
+     }
+
+     if len(meta.FrameOffsets) == 0 {
+         return nil, fmt.Errorf("metadata has no frame offsets")
+     }
+
+     return meta.FrameOffsets, nil
+ }
+
+ // DebugFrameOffsets extracts and displays frame offset information
+ func DebugFrameOffsets(path string) error {
+     meta, err := ExtractMetadataFromFile(path)
+     if err != nil {
+         return fmt.Errorf("failed to extract metadata: %w", err)
+     }
+
+     fmt.Printf("Frame Offset Debug for: %s\n\n", filepath.Base(path))
+     fmt.Printf("Metadata:\n")
+     fmt.Printf(" Bundle: %d\n", meta.BundleNumber)
+     fmt.Printf(" Frames: %d\n", meta.FrameCount)
+     fmt.Printf(" Frame size: %d ops\n", meta.FrameSize)
+     fmt.Printf(" Total ops: %d\n", meta.OperationCount)
+
+     fmt.Printf("\nFrame Offsets (%d total):\n", len(meta.FrameOffsets))
+     for i, offset := range meta.FrameOffsets {
+         if i < len(meta.FrameOffsets)-1 {
+             nextOffset := meta.FrameOffsets[i+1]
+             frameSize := nextOffset - offset
+             fmt.Printf(" Frame %3d: offset %10d, size %10d bytes\n", i, offset, frameSize)
+         } else {
+             fmt.Printf(" End mark: offset %10d\n", offset)
+         }
+     }
+
+     // Try to verify first frame
+     fmt.Printf("\nVerifying first frame...\n")
+     file, err := os.Open(path)
+     if err != nil {
+         return err
+     }
+     defer file.Close()
+
+     if len(meta.FrameOffsets) < 2 {
+         return fmt.Errorf("not enough frame offsets")
+     }
+
+     startOffset := meta.FrameOffsets[0]
+     endOffset := meta.FrameOffsets[1]
+     frameLength := endOffset - startOffset
+
+     fmt.Printf(" Start: %d, End: %d, Length: %d\n", startOffset, endOffset, frameLength)
+
+     compressedFrame := make([]byte, frameLength)
+     _, err = file.ReadAt(compressedFrame, startOffset)
+     if err != nil {
+         return fmt.Errorf("failed to read: %w", err)
+     }
+
+     decompressed, err := DecompressFrame(compressedFrame)
+     if err != nil {
+         return fmt.Errorf("failed to decompress: %w", err)
+     }
+
+     fmt.Printf(" ✓ Decompressed: %d bytes\n", len(decompressed))
+
+     // Count lines
+     lines := bytes.Count(decompressed, []byte("\n"))
+     fmt.Printf(" ✓ Lines: %d\n", lines)
+
+     return nil
+ }
+
// ============================================================================
// COMPRESSION/DECOMPRESSION
// ============================================================================

func CompressFrame(data []byte) ([]byte, error) {
    compressed := gozstd.Compress(nil, data)
    return compressed, nil
}

func DecompressAll(compressed []byte) ([]byte, error) {
    decompressed, err := gozstd.Decompress(nil, compressed)
    if err != nil {
···
    return decompressed, nil
}

func DecompressFrame(compressedFrame []byte) ([]byte, error) {
    return gozstd.Decompress(nil, compressedFrame)
}

func NewStreamingReader(r io.Reader) (StreamReader, error) {
    reader := gozstd.NewReader(r)
    return &gozstdReader{reader: reader}, nil
}

func NewStreamingWriter(w io.Writer) (StreamWriter, error) {
    writer := gozstd.NewWriterLevel(w, CompressionLevel)
    return &gozstdWriter{writer: writer}, nil
+1 -1
server/server_test.go
···
    path := filepath.Join(tmpDir, fmt.Sprintf("%06d.jsonl.zst", i))
    ops := makeMinimalTestOperations(10000, i*10000) // Unique ops per bundle

-   contentHash, compHash, uncompSize, compSize, err := storageOps.SaveBundle(path, ops)
    if err != nil {
        t.Fatalf("failed to save test bundle %d: %v", i, err)
    }
···
    path := filepath.Join(tmpDir, fmt.Sprintf("%06d.jsonl.zst", i))
    ops := makeMinimalTestOperations(10000, i*10000) // Unique ops per bundle

+   contentHash, compHash, uncompSize, compSize, err := storageOps.SaveBundle(path, ops, nil)
    if err != nil {
        t.Fatalf("failed to save test bundle %d: %v", i, err)
    }