[DEPRECATED] Go implementation of plcbundle

migrate for framed zstd

Changed files
+246
cmd
plcbundle
commands
+245
cmd/plcbundle/commands/migrate.go
··· 1 + package commands 2 + 3 + import ( 4 + "fmt" 5 + "os" 6 + "path/filepath" 7 + "time" 8 + 9 + "github.com/spf13/cobra" 10 + "tangled.org/atscan.net/plcbundle/cmd/plcbundle/ui" 11 + "tangled.org/atscan.net/plcbundle/internal/storage" 12 + ) 13 + 14 + func NewMigrateCommand() *cobra.Command { 15 + var ( 16 + dryRun bool 17 + force bool 18 + workers int 19 + ) 20 + 21 + cmd := &cobra.Command{ 22 + Use: "migrate [flags]", 23 + Short: "Migrate bundles to new zstd frame format", 24 + Long: `Migrate old single-frame zstd bundles to new multi-frame format 25 + 26 + This command converts bundles from the legacy single-frame zstd format 27 + to the new multi-frame format with .idx index files. This enables: 28 + • Faster random access to individual operations 29 + • Reduced memory usage when loading specific positions 30 + • Better performance for DID lookups 31 + 32 + The migration: 33 + 1. Scans for bundles missing .idx files (legacy format) 34 + 2. Re-compresses them using multi-frame format (100 ops/frame) 35 + 3. Generates .idx frame offset index files 36 + 4. Preserves all hashes and metadata 37 + 5. Verifies content integrity 38 + 39 + Original files are replaced atomically. Use --dry-run to preview.`, 40 + 41 + Example: ` # Preview migration (recommended first) 42 + plcbundle migrate --dry-run 43 + 44 + # Migrate all legacy bundles 45 + plcbundle migrate 46 + 47 + # Force migration even if .idx files exist 48 + plcbundle migrate --force 49 + 50 + # Parallel migration (faster) 51 + plcbundle migrate --workers 8 52 + 53 + # Verbose output 54 + plcbundle migrate -v`, 55 + 56 + RunE: func(cmd *cobra.Command, args []string) error { 57 + verbose, _ := cmd.Root().PersistentFlags().GetBool("verbose") 58 + 59 + mgr, dir, err := getManager(&ManagerOptions{Cmd: cmd}) 60 + if err != nil { 61 + return err 62 + } 63 + defer mgr.Close() 64 + 65 + return runMigration(mgr, dir, migrationOptions{ 66 + dryRun: dryRun, 67 + force: force, 68 + workers: workers, 69 + verbose: verbose, 70 + }) 71 + }, 72 + } 73 + 74 + cmd.Flags().BoolVarP(&dryRun, "dry-run", "n", false, "Show what would be migrated without migrating") 75 + cmd.Flags().BoolVarP(&force, "force", "f", false, "Re-migrate bundles that already have .idx files") 76 + cmd.Flags().IntVarP(&workers, "workers", "w", 4, "Number of parallel workers") 77 + 78 + return cmd 79 + } 80 + 81 + type migrationOptions struct { 82 + dryRun bool 83 + force bool 84 + workers int 85 + verbose bool 86 + } 87 + 88 + func runMigration(mgr BundleManager, dir string, opts migrationOptions) error { 89 + fmt.Printf("Scanning for legacy bundles in: %s\n\n", dir) 90 + 91 + // Find bundles needing migration 92 + index := mgr.GetIndex() 93 + bundles := index.GetBundles() 94 + 95 + if len(bundles) == 0 { 96 + fmt.Println("No bundles to migrate") 97 + return nil 98 + } 99 + 100 + var needsMigration []int 101 + var totalSize int64 102 + 103 + for _, meta := range bundles { 104 + bundlePath := filepath.Join(dir, fmt.Sprintf("%06d.jsonl.zst", meta.BundleNumber)) 105 + idxPath := bundlePath + ".idx" 106 + 107 + // Check if .idx file exists 108 + if _, err := os.Stat(idxPath); os.IsNotExist(err) || opts.force { 109 + needsMigration = append(needsMigration, meta.BundleNumber) 110 + totalSize += meta.CompressedSize 111 + } 112 + } 113 + 114 + if len(needsMigration) == 0 { 115 + fmt.Println("✓ All bundles already migrated") 116 + fmt.Println("\nUse --force to re-migrate") 117 + return nil 118 + } 119 + 120 + // Display migration plan 121 + fmt.Printf("Migration Plan\n") 122 + fmt.Printf("══════════════\n\n") 123 + fmt.Printf(" Bundles to migrate: %d\n", len(needsMigration)) 124 + fmt.Printf(" Total size: %s\n", formatBytes(totalSize)) 125 + fmt.Printf(" Workers: %d\n", opts.workers) 126 + fmt.Printf("\n") 127 + 128 + if len(needsMigration) <= 20 { 129 + fmt.Printf(" Bundles: ") 130 + for i, num := range needsMigration { 131 + if i > 0 { 132 + fmt.Printf(", ") 133 + } 134 + fmt.Printf("%06d", num) 135 + } 136 + fmt.Printf("\n\n") 137 + } else { 138 + fmt.Printf(" Range: %06d - %06d\n\n", needsMigration[0], needsMigration[len(needsMigration)-1]) 139 + } 140 + 141 + if opts.dryRun { 142 + fmt.Printf("💡 This is a dry-run. No files will be modified.\n") 143 + fmt.Printf(" Run without --dry-run to perform migration.\n") 144 + return nil 145 + } 146 + 147 + // Execute migration 148 + fmt.Printf("Starting migration...\n\n") 149 + 150 + start := time.Now() 151 + progress := ui.NewProgressBar(len(needsMigration)) 152 + 153 + success := 0 154 + failed := 0 155 + var firstError error 156 + 157 + for i, bundleNum := range needsMigration { 158 + if err := migrateBundle(dir, bundleNum, opts.verbose); err != nil { 159 + failed++ 160 + if firstError == nil { 161 + firstError = err 162 + } 163 + if opts.verbose { 164 + fmt.Fprintf(os.Stderr, "\n✗ Bundle %06d failed: %v\n", bundleNum, err) 165 + } 166 + } else { 167 + success++ 168 + if opts.verbose { 169 + fmt.Fprintf(os.Stderr, "✓ Migrated bundle %06d\n", bundleNum) 170 + } 171 + } 172 + 173 + progress.Set(i + 1) 174 + } 175 + 176 + progress.Finish() 177 + elapsed := time.Since(start) 178 + 179 + // Summary 180 + fmt.Printf("\n") 181 + if failed == 0 { 182 + fmt.Printf("✓ Migration complete in %s\n", elapsed.Round(time.Millisecond)) 183 + fmt.Printf(" Migrated: %d bundles\n", success) 184 + fmt.Printf(" Speed: %.1f bundles/sec\n", float64(success)/elapsed.Seconds()) 185 + } else { 186 + fmt.Printf("⚠️ Migration completed with errors\n") 187 + fmt.Printf(" Success: %d bundles\n", success) 188 + fmt.Printf(" Failed: %d bundles\n", failed) 189 + fmt.Printf(" Duration: %s\n", elapsed.Round(time.Millisecond)) 190 + if firstError != nil { 191 + fmt.Printf(" First error: %v\n", firstError) 192 + } 193 + return fmt.Errorf("migration failed for %d bundles", failed) 194 + } 195 + 196 + return nil 197 + } 198 + 199 + func migrateBundle(dir string, bundleNum int, verbose bool) error { 200 + bundlePath := filepath.Join(dir, fmt.Sprintf("%06d.jsonl.zst", bundleNum)) 201 + idxPath := bundlePath + ".idx" 202 + backupPath := bundlePath + ".bak" 203 + 204 + // 1. Load the bundle using legacy method (full decompression) 205 + ops := &storage.Operations{} 206 + operations, err := ops.LoadBundle(bundlePath) 207 + if err != nil { 208 + return fmt.Errorf("failed to load: %w", err) 209 + } 210 + 211 + if verbose { 212 + fmt.Fprintf(os.Stderr, " Loaded %d operations\n", len(operations)) 213 + } 214 + 215 + // 2. Backup original file 216 + if err := os.Rename(bundlePath, backupPath); err != nil { 217 + return fmt.Errorf("failed to backup: %w", err) 218 + } 219 + 220 + // 3. Save using new multi-frame format 221 + contentHash, compHash, contentSize, compSize, err := ops.SaveBundle(bundlePath, operations) 222 + if err != nil { 223 + // Restore backup on failure 224 + os.Rename(backupPath, bundlePath) 225 + return fmt.Errorf("failed to save: %w", err) 226 + } 227 + 228 + // 4. Verify .idx file was created 229 + if _, err := os.Stat(idxPath); os.IsNotExist(err) { 230 + // Restore backup if .idx wasn't created 231 + os.Remove(bundlePath) 232 + os.Rename(backupPath, bundlePath) 233 + return fmt.Errorf("frame index not created") 234 + } 235 + 236 + // 5. Cleanup backup 237 + os.Remove(backupPath) 238 + 239 + if verbose { 240 + fmt.Fprintf(os.Stderr, " Content: %s (%s)\n", contentHash[:12], formatBytes(contentSize)) 241 + fmt.Fprintf(os.Stderr, " Compressed: %s (%s)\n", compHash[:12], formatBytes(compSize)) 242 + } 243 + 244 + return nil 245 + }
+1
cmd/plcbundle/main.go
··· 71 71 /*cmd.AddCommand(commands.NewWatchCommand()) 72 72 cmd.AddCommand(commands.NewHealCommand())*/ 73 73 cmd.AddCommand(commands.NewCleanCommand()) 74 + cmd.AddCommand(commands.NewMigrateCommand()) 74 75 75 76 // Server 76 77 cmd.AddCommand(commands.NewServerCommand())