A Transparent and Verifiable Way to Sync the AT Protocol's PLC Directory
at test-validate 398 lines 9.8 kB view raw
1package bundle 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "io" 8 "net/http" 9 "os" 10 "path/filepath" 11 "strings" 12 "sync" 13 "time" 14) 15 16// CloneFromRemote clones bundles from a remote HTTP endpoint 17func (m *Manager) CloneFromRemote(ctx context.Context, opts CloneOptions) (*CloneResult, error) { 18 if opts.Workers <= 0 { 19 opts.Workers = 4 20 } 21 if opts.SaveInterval <= 0 { 22 opts.SaveInterval = 5 * time.Second 23 } 24 if opts.Logger == nil { 25 opts.Logger = m.logger 26 } 27 28 result := &CloneResult{} 29 startTime := time.Now() 30 31 // Step 1: Fetch remote index 32 opts.Logger.Printf("Fetching remote index from %s", opts.RemoteURL) 33 remoteIndex, err := m.loadRemoteIndex(opts.RemoteURL) 34 if err != nil { 35 return nil, fmt.Errorf("failed to load remote index: %w", err) 36 } 37 38 remoteBundles := remoteIndex.GetBundles() 39 if len(remoteBundles) == 0 { 40 opts.Logger.Printf("Remote has no bundles") 41 return result, nil 42 } 43 44 result.RemoteBundles = len(remoteBundles) 45 opts.Logger.Printf("Remote has %d bundles", len(remoteBundles)) 46 47 // Step 2: Determine which bundles to download 48 localBundleMap := make(map[int]*BundleMetadata) 49 for _, meta := range m.index.GetBundles() { 50 localBundleMap[meta.BundleNumber] = meta 51 } 52 53 // Create map of remote metadata for easy lookup 54 remoteBundleMap := make(map[int]*BundleMetadata) 55 for _, meta := range remoteBundles { 56 remoteBundleMap[meta.BundleNumber] = meta 57 } 58 59 var bundlesToDownload []int 60 var totalBytes int64 61 for _, meta := range remoteBundles { 62 if opts.SkipExisting && localBundleMap[meta.BundleNumber] != nil { 63 result.Skipped++ 64 if opts.Verbose { 65 opts.Logger.Printf("Skipping existing bundle %06d", meta.BundleNumber) 66 } 67 continue 68 } 69 bundlesToDownload = append(bundlesToDownload, meta.BundleNumber) 70 totalBytes += meta.CompressedSize 71 } 72 73 if len(bundlesToDownload) == 0 { 74 opts.Logger.Printf("All bundles already exist locally") 75 return result, nil 76 } 77 78 opts.Logger.Printf("Downloading %d bundles (%d bytes)", len(bundlesToDownload), totalBytes) 79 80 // Step 3: Set up periodic index saving (using remote metadata) 81 saveCtx, saveCancel := context.WithCancel(ctx) 82 defer saveCancel() 83 84 var downloadedBundles []int 85 var downloadedMu sync.Mutex 86 87 saveDone := make(chan struct{}) 88 go func() { 89 defer close(saveDone) 90 ticker := time.NewTicker(opts.SaveInterval) 91 defer ticker.Stop() 92 93 for { 94 select { 95 case <-saveCtx.Done(): 96 return 97 case <-ticker.C: 98 // Save index using remote metadata for downloaded bundles 99 downloadedMu.Lock() 100 bundles := make([]int, len(downloadedBundles)) 101 copy(bundles, downloadedBundles) 102 downloadedMu.Unlock() 103 104 if opts.Verbose { 105 opts.Logger.Printf("Periodic save: updating index with %d bundles", len(bundles)) 106 } 107 m.updateIndexFromRemote(bundles, remoteBundleMap, false) // silent during periodic save 108 } 109 } 110 }() 111 112 // Step 4: Download bundles concurrently 113 successList, failedList, bytes := m.downloadBundlesConcurrent( 114 ctx, 115 opts.RemoteURL, 116 bundlesToDownload, 117 remoteBundleMap, // Pass the metadata map for hash verification 118 totalBytes, 119 opts.Workers, 120 opts.ProgressFunc, 121 opts.Verbose, 122 &downloadedBundles, 123 &downloadedMu, 124 ) 125 126 result.Downloaded = len(successList) 127 result.Failed = len(failedList) 128 result.TotalBytes = bytes 129 result.FailedBundles = failedList 130 result.Interrupted = ctx.Err() != nil 131 132 // Stop periodic saves 133 saveCancel() 134 <-saveDone 135 136 // Step 5: Final index update using remote metadata 137 opts.Logger.Printf("Updating local index...") 138 if err := m.updateIndexFromRemote(successList, remoteBundleMap, opts.Verbose); err != nil { 139 return result, fmt.Errorf("failed to update index: %w", err) 140 } 141 142 result.Duration = time.Since(startTime) 143 return result, nil 144} 145 146// downloadBundlesConcurrent downloads bundles using a worker pool 147func (m *Manager) downloadBundlesConcurrent( 148 ctx context.Context, 149 baseURL string, 150 bundleNumbers []int, 151 remoteBundleMap map[int]*BundleMetadata, 152 totalBytes int64, 153 workers int, 154 progressFunc func(downloaded, total int, bytesDownloaded, bytesTotal int64), 155 verbose bool, 156 downloadedBundles *[]int, 157 downloadedMu *sync.Mutex, 158) (successList []int, failedList []int, downloadedBytes int64) { 159 160 type job struct { 161 bundleNum int 162 expectedHash string 163 } 164 165 type result struct { 166 bundleNum int 167 success bool 168 bytes int64 169 err error 170 } 171 172 jobs := make(chan job, len(bundleNumbers)) 173 results := make(chan result, len(bundleNumbers)) 174 175 // Shared state 176 var ( 177 mu sync.Mutex 178 processedCount int 179 processedBytes int64 180 success []int 181 failed []int 182 ) 183 184 // Start workers 185 var wg sync.WaitGroup 186 client := &http.Client{ 187 Timeout: 120 * time.Second, 188 } 189 190 for w := 0; w < workers; w++ { 191 wg.Add(1) 192 go func() { 193 defer wg.Done() 194 for j := range jobs { 195 // Check cancellation 196 select { 197 case <-ctx.Done(): 198 results <- result{ 199 bundleNum: j.bundleNum, 200 success: false, 201 err: ctx.Err(), 202 } 203 continue 204 default: 205 } 206 207 // Download bundle with hash verification 208 bytes, err := m.downloadBundle(client, baseURL, j.bundleNum, j.expectedHash) 209 210 // Update progress 211 mu.Lock() 212 processedCount++ 213 if err == nil { 214 processedBytes += bytes 215 success = append(success, j.bundleNum) 216 if downloadedMu != nil && downloadedBundles != nil { 217 downloadedMu.Lock() 218 *downloadedBundles = append(*downloadedBundles, j.bundleNum) 219 downloadedMu.Unlock() 220 } 221 } else { 222 failed = append(failed, j.bundleNum) 223 } 224 225 if progressFunc != nil { 226 progressFunc(processedCount, len(bundleNumbers), processedBytes, totalBytes) 227 } 228 mu.Unlock() 229 230 results <- result{ 231 bundleNum: j.bundleNum, 232 success: err == nil, 233 bytes: bytes, 234 err: err, 235 } 236 } 237 }() 238 } 239 240 // Send jobs with expected hashes 241 for _, num := range bundleNumbers { 242 expectedHash := "" 243 if meta, exists := remoteBundleMap[num]; exists { 244 expectedHash = meta.CompressedHash 245 } 246 jobs <- job{ 247 bundleNum: num, 248 expectedHash: expectedHash, 249 } 250 } 251 close(jobs) 252 253 // Wait for completion 254 go func() { 255 wg.Wait() 256 close(results) 257 }() 258 259 // Collect results 260 for res := range results { 261 if res.err != nil && res.err != context.Canceled { 262 m.logger.Printf("Failed to download bundle %06d: %v", res.bundleNum, res.err) 263 } else if res.success && verbose { 264 m.logger.Printf("✓ Downloaded and verified bundle %06d (%d bytes)", res.bundleNum, res.bytes) 265 } 266 } 267 268 mu.Lock() 269 successList = success 270 failedList = failed 271 downloadedBytes = processedBytes 272 mu.Unlock() 273 274 return 275} 276 277// updateIndexFromRemote updates local index with metadata from remote index 278func (m *Manager) updateIndexFromRemote(bundleNumbers []int, remoteMeta map[int]*BundleMetadata, verbose bool) error { 279 if len(bundleNumbers) == 0 { 280 return nil 281 } 282 283 // Add/update bundles in local index using remote metadata 284 // Hash verification was already done during download 285 for _, num := range bundleNumbers { 286 if meta, exists := remoteMeta[num]; exists { 287 // Verify the file exists locally 288 path := filepath.Join(m.config.BundleDir, fmt.Sprintf("%06d.jsonl.zst", num)) 289 if !m.operations.FileExists(path) { 290 m.logger.Printf("Warning: bundle %06d not found locally, skipping", num) 291 continue 292 } 293 294 // Add to index (no need to re-verify hash - already verified during download) 295 m.index.AddBundle(meta) 296 297 if verbose { 298 m.logger.Printf("Added bundle %06d to index", num) 299 } 300 } 301 } 302 303 // Save index 304 return m.SaveIndex() 305} 306 307// loadRemoteIndex loads an index from a remote URL 308func (m *Manager) loadRemoteIndex(baseURL string) (*Index, error) { 309 indexURL := strings.TrimSuffix(baseURL, "/") + "/index.json" 310 311 client := &http.Client{ 312 Timeout: 30 * time.Second, 313 } 314 315 resp, err := client.Get(indexURL) 316 if err != nil { 317 return nil, fmt.Errorf("failed to download: %w", err) 318 } 319 defer resp.Body.Close() 320 321 if resp.StatusCode != http.StatusOK { 322 return nil, fmt.Errorf("unexpected status code: %d", resp.StatusCode) 323 } 324 325 data, err := io.ReadAll(resp.Body) 326 if err != nil { 327 return nil, fmt.Errorf("failed to read response: %w", err) 328 } 329 330 var idx Index 331 if err := json.Unmarshal(data, &idx); err != nil { 332 return nil, fmt.Errorf("failed to parse index: %w", err) 333 } 334 335 return &idx, nil 336} 337 338// downloadBundle downloads a single bundle file and verifies its hash 339func (m *Manager) downloadBundle(client *http.Client, baseURL string, bundleNum int, expectedHash string) (int64, error) { 340 url := fmt.Sprintf("%s/data/%d", strings.TrimSuffix(baseURL, "/"), bundleNum) 341 filename := fmt.Sprintf("%06d.jsonl.zst", bundleNum) 342 filepath := filepath.Join(m.config.BundleDir, filename) 343 344 // Create request 345 req, err := http.NewRequest("GET", url, nil) 346 if err != nil { 347 return 0, err 348 } 349 350 // Download 351 resp, err := client.Do(req) 352 if err != nil { 353 return 0, err 354 } 355 defer resp.Body.Close() 356 357 if resp.StatusCode != http.StatusOK { 358 body, _ := io.ReadAll(resp.Body) 359 return 0, fmt.Errorf("HTTP %d: %s", resp.StatusCode, string(body)) 360 } 361 362 // Write to temp file (atomic write) 363 tempPath := filepath + ".tmp" 364 outFile, err := os.Create(tempPath) 365 if err != nil { 366 return 0, err 367 } 368 369 written, err := io.Copy(outFile, resp.Body) 370 outFile.Close() 371 372 if err != nil { 373 os.Remove(tempPath) 374 return 0, err 375 } 376 377 // Verify hash before committing 378 if expectedHash != "" { 379 valid, actualHash, err := m.operations.VerifyHash(tempPath, expectedHash) 380 if err != nil { 381 os.Remove(tempPath) 382 return 0, fmt.Errorf("hash verification failed: %w", err) 383 } 384 if !valid { 385 os.Remove(tempPath) 386 return 0, fmt.Errorf("hash mismatch: expected %s, got %s", 387 expectedHash[:16]+"...", actualHash[:16]+"...") 388 } 389 } 390 391 // Rename to final location 392 if err := os.Rename(tempPath, filepath); err != nil { 393 os.Remove(tempPath) 394 return 0, err 395 } 396 397 return written, nil 398}