A Transparent and Verifiable Way to Sync the AT Protocol's PLC Directory

clone cmd (+548 lines)

bundle/clone.go (+389 lines, new file)
```go
package bundle

import (
	"context"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"os"
	"path/filepath"
	"strings"
	"sync"
	"time"
)

// CloneFromRemote clones bundles from a remote HTTP endpoint
func (m *Manager) CloneFromRemote(ctx context.Context, opts CloneOptions) (*CloneResult, error) {
	if opts.Workers <= 0 {
		opts.Workers = 4
	}
	if opts.SaveInterval <= 0 {
		opts.SaveInterval = 5 * time.Second
	}
	if opts.Logger == nil {
		opts.Logger = m.logger
	}

	result := &CloneResult{}
	startTime := time.Now()

	// Step 1: Fetch remote index
	opts.Logger.Printf("Fetching remote index from %s", opts.RemoteURL)
	remoteIndex, err := m.loadRemoteIndex(opts.RemoteURL)
	if err != nil {
		return nil, fmt.Errorf("failed to load remote index: %w", err)
	}

	remoteBundles := remoteIndex.GetBundles()
	if len(remoteBundles) == 0 {
		opts.Logger.Printf("Remote has no bundles")
		return result, nil
	}

	result.RemoteBundles = len(remoteBundles)
	opts.Logger.Printf("Remote has %d bundles", len(remoteBundles))

	// Step 2: Determine which bundles to download
	localBundleMap := make(map[int]*BundleMetadata)
	for _, meta := range m.index.GetBundles() {
		localBundleMap[meta.BundleNumber] = meta
	}

	// Create map of remote metadata for easy lookup
	remoteBundleMap := make(map[int]*BundleMetadata)
	for _, meta := range remoteBundles {
		remoteBundleMap[meta.BundleNumber] = meta
	}

	var bundlesToDownload []int
	var totalBytes int64
	for _, meta := range remoteBundles {
		if opts.SkipExisting && localBundleMap[meta.BundleNumber] != nil {
			result.Skipped++
			if opts.Verbose {
				opts.Logger.Printf("Skipping existing bundle %06d", meta.BundleNumber)
			}
			continue
		}
		bundlesToDownload = append(bundlesToDownload, meta.BundleNumber)
		totalBytes += meta.CompressedSize
	}

	if len(bundlesToDownload) == 0 {
		opts.Logger.Printf("All bundles already exist locally")
		return result, nil
	}

	opts.Logger.Printf("Downloading %d bundles (%d bytes)", len(bundlesToDownload), totalBytes)

	// Step 3: Set up periodic index saving (using remote metadata)
	saveCtx, saveCancel := context.WithCancel(ctx)
	defer saveCancel()

	var downloadedBundles []int
	var downloadedMu sync.Mutex

	saveDone := make(chan struct{})
	go func() {
		defer close(saveDone)
		ticker := time.NewTicker(opts.SaveInterval)
		defer ticker.Stop()

		for {
			select {
			case <-saveCtx.Done():
				return
			case <-ticker.C:
				// Save index using remote metadata for downloaded bundles
				downloadedMu.Lock()
				bundles := make([]int, len(downloadedBundles))
				copy(bundles, downloadedBundles)
				downloadedMu.Unlock()

				if opts.Verbose {
					opts.Logger.Printf("Periodic save: updating index with %d bundles", len(bundles))
				}
				m.updateIndexFromRemote(bundles, remoteBundleMap, false) // silent during periodic save
			}
		}
	}()

	// Step 4: Download bundles concurrently
	successList, failedList, bytes := m.downloadBundlesConcurrent(
		ctx,
		opts.RemoteURL,
		bundlesToDownload,
		remoteBundleMap, // Pass the metadata map for hash verification
		totalBytes,
		opts.Workers,
		opts.ProgressFunc,
		opts.Verbose,
	)

	result.Downloaded = len(successList)
	result.Failed = len(failedList)
	result.TotalBytes = bytes
	result.FailedBundles = failedList
	result.Interrupted = ctx.Err() != nil

	// Stop periodic saves
	saveCancel()
	<-saveDone

	// Step 5: Final index update using remote metadata
	opts.Logger.Printf("Updating local index...")
	if err := m.updateIndexFromRemote(successList, remoteBundleMap, opts.Verbose); err != nil {
		return result, fmt.Errorf("failed to update index: %w", err)
	}

	result.Duration = time.Since(startTime)
	return result, nil
}

// downloadBundlesConcurrent downloads bundles using a worker pool
func (m *Manager) downloadBundlesConcurrent(
	ctx context.Context,
	baseURL string,
	bundleNumbers []int,
	remoteBundleMap map[int]*BundleMetadata,
	totalBytes int64,
	workers int,
	progressFunc func(downloaded, total int, bytesDownloaded, bytesTotal int64),
	verbose bool,
) (successList []int, failedList []int, downloadedBytes int64) {

	type job struct {
		bundleNum    int
		expectedHash string
	}

	type result struct {
		bundleNum int
		success   bool
		bytes     int64
		err       error
	}

	jobs := make(chan job, len(bundleNumbers))
	results := make(chan result, len(bundleNumbers))

	// Shared state
	var (
		mu             sync.Mutex
		processedCount int
		processedBytes int64
		success        []int
		failed         []int
	)

	// Start workers
	var wg sync.WaitGroup
	client := &http.Client{
		Timeout: 120 * time.Second,
	}

	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := range jobs {
				// Check cancellation
				select {
				case <-ctx.Done():
					results <- result{
						bundleNum: j.bundleNum,
						success:   false,
						err:       ctx.Err(),
					}
					continue
				default:
				}

				// Download bundle with hash verification
				bytes, err := m.downloadBundle(client, baseURL, j.bundleNum, j.expectedHash)

				// Update progress
				mu.Lock()
				processedCount++
				if err == nil {
					processedBytes += bytes
					success = append(success, j.bundleNum)
				} else {
					failed = append(failed, j.bundleNum)
				}

				if progressFunc != nil {
					progressFunc(processedCount, len(bundleNumbers), processedBytes, totalBytes)
				}
				mu.Unlock()

				results <- result{
					bundleNum: j.bundleNum,
					success:   err == nil,
					bytes:     bytes,
					err:       err,
				}
			}
		}()
	}

	// Send jobs with expected hashes
	for _, num := range bundleNumbers {
		expectedHash := ""
		if meta, exists := remoteBundleMap[num]; exists {
			expectedHash = meta.CompressedHash
		}
		jobs <- job{
			bundleNum:    num,
			expectedHash: expectedHash,
		}
	}
	close(jobs)

	// Wait for completion
	go func() {
		wg.Wait()
		close(results)
	}()

	// Collect results
	for res := range results {
		if res.err != nil && res.err != context.Canceled {
			m.logger.Printf("Failed to download bundle %06d: %v", res.bundleNum, res.err)
		} else if res.success && verbose {
			m.logger.Printf("✓ Downloaded and verified bundle %06d (%d bytes)", res.bundleNum, res.bytes)
		}
	}

	mu.Lock()
	successList = success
	failedList = failed
	downloadedBytes = processedBytes
	mu.Unlock()

	return
}

// updateIndexFromRemote updates local index with metadata from remote index
func (m *Manager) updateIndexFromRemote(bundleNumbers []int, remoteMeta map[int]*BundleMetadata, verbose bool) error {
	if len(bundleNumbers) == 0 {
		return nil
	}

	// Add/update bundles in local index using remote metadata
	// Hash verification was already done during download
	for _, num := range bundleNumbers {
		if meta, exists := remoteMeta[num]; exists {
			// Verify the file exists locally
			path := filepath.Join(m.config.BundleDir, fmt.Sprintf("%06d.jsonl.zst", num))
			if !m.operations.FileExists(path) {
				m.logger.Printf("Warning: bundle %06d not found locally, skipping", num)
				continue
			}

			// Add to index (no need to re-verify hash - already verified during download)
			m.index.AddBundle(meta)

			if verbose {
				m.logger.Printf("Added bundle %06d to index", num)
			}
		}
	}

	// Save index
	return m.SaveIndex()
}

// loadRemoteIndex loads an index from a remote URL
func (m *Manager) loadRemoteIndex(baseURL string) (*Index, error) {
	indexURL := strings.TrimSuffix(baseURL, "/") + "/index.json"

	client := &http.Client{
		Timeout: 30 * time.Second,
	}

	resp, err := client.Get(indexURL)
	if err != nil {
		return nil, fmt.Errorf("failed to download: %w", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("unexpected status code: %d", resp.StatusCode)
	}

	data, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, fmt.Errorf("failed to read response: %w", err)
	}

	var idx Index
	if err := json.Unmarshal(data, &idx); err != nil {
		return nil, fmt.Errorf("failed to parse index: %w", err)
	}

	return &idx, nil
}

// downloadBundle downloads a single bundle file and verifies its hash
func (m *Manager) downloadBundle(client *http.Client, baseURL string, bundleNum int, expectedHash string) (int64, error) {
	url := fmt.Sprintf("%s/data/%d", strings.TrimSuffix(baseURL, "/"), bundleNum)
	filename := fmt.Sprintf("%06d.jsonl.zst", bundleNum)
	filepath := filepath.Join(m.config.BundleDir, filename)

	// Create request
	req, err := http.NewRequest("GET", url, nil)
	if err != nil {
		return 0, err
	}

	// Download
	resp, err := client.Do(req)
	if err != nil {
		return 0, err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		body, _ := io.ReadAll(resp.Body)
		return 0, fmt.Errorf("HTTP %d: %s", resp.StatusCode, string(body))
	}

	// Write to temp file (atomic write)
	tempPath := filepath + ".tmp"
	outFile, err := os.Create(tempPath)
	if err != nil {
		return 0, err
	}

	written, err := io.Copy(outFile, resp.Body)
	outFile.Close()

	if err != nil {
		os.Remove(tempPath)
		return 0, err
	}

	// Verify hash before committing
	if expectedHash != "" {
		valid, actualHash, err := m.operations.VerifyHash(tempPath, expectedHash)
		if err != nil {
			os.Remove(tempPath)
			return 0, fmt.Errorf("hash verification failed: %w", err)
		}
		if !valid {
			os.Remove(tempPath)
			return 0, fmt.Errorf("hash mismatch: expected %s, got %s",
				expectedHash[:16]+"...", actualHash[:16]+"...")
		}
	}

	// Rename to final location
	if err := os.Rename(tempPath, filepath); err != nil {
		os.Remove(tempPath)
		return 0, err
	}

	return written, nil
}
```
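For callers that want to drive the clone logic from their own Go code rather than the CLI, the sketch below shows how the new API composes. It assumes a `*bundle.Manager` has already been constructed elsewhere (no constructor appears in this diff), and `syncFromMirror` is a hypothetical helper name, not part of the commit; the `CloneOptions` fields are the ones added in `bundle/types.go` below.

```go
package main

import (
	"context"
	"log"

	"tangled.org/atscan.net/plcbundle/bundle"
)

// syncFromMirror is an illustrative helper (not part of this commit) that
// drives CloneFromRemote against an already-constructed *bundle.Manager.
func syncFromMirror(ctx context.Context, mgr *bundle.Manager, remote string) error {
	result, err := mgr.CloneFromRemote(ctx, bundle.CloneOptions{
		RemoteURL:    remote,
		Workers:      8,    // raise from the default of 4 for a fast mirror
		SkipExisting: true, // re-runs resume instead of re-downloading
	})
	if err != nil {
		return err
	}
	log.Printf("cloned %d/%d bundles (%d bytes), %d failed, %d skipped",
		result.Downloaded, result.RemoteBundles, result.TotalBytes,
		result.Failed, result.Skipped)
	return nil
}
```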
bundle/types.go (+23 lines)

```go
// CloneOptions configures cloning behavior
type CloneOptions struct {
	RemoteURL    string
	Workers      int
	SkipExisting bool
	ProgressFunc func(downloaded, total int, bytesDownloaded, bytesTotal int64)
	SaveInterval time.Duration
	Verbose      bool
	Logger       Logger
}

// CloneResult contains cloning results
type CloneResult struct {
	RemoteBundles int
	Downloaded    int
	Failed        int
	Skipped       int
	TotalBytes    int64
	Duration      time.Duration
	Interrupted   bool
	FailedBundles []int
}
```
cmd/plcbundle/main.go (+136 lines)

```diff
 	"fmt"
 	"net/http"
 	"os"
+	"os/signal"
 	"path/filepath"
 	"runtime"
 	"runtime/debug"
 	"sort"
 	"strings"
+	"sync"
+	"syscall"
 	"time"

 	"tangled.org/atscan.net/plcbundle/bundle"
···
 	switch command {
 	case "fetch":
 		cmdFetch()
+	case "clone":
+		cmdClone()
 	case "rebuild":
 		cmdRebuild()
 	case "verify":
···
 Commands:
   fetch     Fetch next bundle from PLC directory
+  clone     Clone bundles from remote HTTP endpoint
   rebuild   Rebuild index from existing bundle files
   verify    Verify bundle integrity
   info      Show bundle information
···
 	} else {
 		fmt.Printf("\n✓ Already up to date!\n")
 	}
+}
+
+func cmdClone() {
+	fs := flag.NewFlagSet("clone", flag.ExitOnError)
+	workers := fs.Int("workers", 4, "number of concurrent download workers")
+	verbose := fs.Bool("v", false, "verbose output")
+	skipExisting := fs.Bool("skip-existing", true, "skip bundles that already exist locally")
+	saveInterval := fs.Duration("save-interval", 5*time.Second, "interval to save index during download")
+	fs.Parse(os.Args[2:])
+
+	if fs.NArg() < 1 {
+		fmt.Fprintf(os.Stderr, "Usage: plcbundle clone <remote-url> [options]\n")
+		fmt.Fprintf(os.Stderr, "\nClone bundles from a remote plcbundle HTTP endpoint\n\n")
+		fmt.Fprintf(os.Stderr, "Options:\n")
+		fs.PrintDefaults()
+		fmt.Fprintf(os.Stderr, "\nExample:\n")
+		fmt.Fprintf(os.Stderr, "  plcbundle clone https://plc.example.com\n")
+		fmt.Fprintf(os.Stderr, "  plcbundle clone https://plc.example.com --workers 8\n")
+		os.Exit(1)
+	}
+
+	remoteURL := strings.TrimSuffix(fs.Arg(0), "/")
+
+	// Create manager
+	mgr, dir, err := getManager("")
+	if err != nil {
+		fmt.Fprintf(os.Stderr, "Error: %v\n", err)
+		os.Exit(1)
+	}
+	defer mgr.Close()
+
+	fmt.Printf("Cloning from: %s\n", remoteURL)
+	fmt.Printf("Target directory: %s\n", dir)
+	fmt.Printf("Workers: %d\n", *workers)
+	fmt.Printf("(Press Ctrl+C to safely interrupt - progress will be saved)\n\n")
+
+	// Set up signal handling
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	sigChan := make(chan os.Signal, 1)
+	signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
+
+	go func() {
+		<-sigChan
+		fmt.Printf("\n\n⚠️ Interrupt received! Finishing current downloads and saving progress...\n")
+		cancel()
+	}()
+
+	// Set up progress bar
+	var progress *ProgressBar
+	var progressMu sync.Mutex
+
+	// Clone with library
+	result, err := mgr.CloneFromRemote(ctx, bundle.CloneOptions{
+		RemoteURL:    remoteURL,
+		Workers:      *workers,
+		SkipExisting: *skipExisting,
+		SaveInterval: *saveInterval,
+		Verbose:      *verbose,
+		ProgressFunc: func(downloaded, total int, bytesDownloaded, bytesTotal int64) {
+			progressMu.Lock()
+			defer progressMu.Unlock()
+
+			if progress == nil {
+				progress = NewProgressBarWithBytes(total, bytesTotal)
+				progress.showBytes = true
+			}
+			progress.SetWithBytes(downloaded, bytesDownloaded)
+		},
+	})
+
+	if progress != nil {
+		progress.Finish()
+	}
+
+	fmt.Printf("\n")
+
+	if err != nil {
+		fmt.Fprintf(os.Stderr, "Clone failed: %v\n", err)
+		os.Exit(1)
+	}
+
+	// Display results
+	if result.Interrupted {
+		fmt.Printf("⚠️ Download interrupted by user\n")
+	} else {
+		fmt.Printf("✓ Clone complete in %s\n", result.Duration.Round(time.Millisecond))
+	}
+
+	fmt.Printf("\nResults:\n")
+	fmt.Printf("  Remote bundles: %d\n", result.RemoteBundles)
+	if result.Skipped > 0 {
+		fmt.Printf("  Skipped (existing): %d\n", result.Skipped)
+	}
+	fmt.Printf("  Downloaded: %d\n", result.Downloaded)
+	if result.Failed > 0 {
+		fmt.Printf("  Failed: %d\n", result.Failed)
+	}
+	fmt.Printf("  Total size: %s\n", formatBytes(result.TotalBytes))
+
+	if result.Duration.Seconds() > 0 && result.Downloaded > 0 {
+		mbPerSec := float64(result.TotalBytes) / result.Duration.Seconds() / (1024 * 1024)
+		bundlesPerSec := float64(result.Downloaded) / result.Duration.Seconds()
+		fmt.Printf("  Average speed: %.1f MB/s (%.1f bundles/s)\n", mbPerSec, bundlesPerSec)
+	}
+
+	if result.Failed > 0 {
+		fmt.Printf("\n⚠️ Failed bundles: ")
+		for i, num := range result.FailedBundles {
+			if i > 0 {
+				fmt.Printf(", ")
+			}
+			if i > 10 {
+				fmt.Printf("... and %d more", len(result.FailedBundles)-10)
+				break
+			}
+			fmt.Printf("%06d", num)
+		}
+		fmt.Printf("\n")
+		fmt.Printf("Re-run the clone command to retry failed bundles.\n")
+		os.Exit(1)
+	}
+
+	if result.Interrupted {
+		fmt.Printf("\n✓ Progress saved. Re-run the clone command to resume.\n")
+		os.Exit(1)
+	}
+
+	fmt.Printf("\n✓ Clone complete!\n")
 }

 // isEndOfDataError checks if the error indicates we've reached the end of available data
```