A Transparent and Verifiable Way to Sync the AT Protocol's PLC Directory
at did-resolver 399 lines 9.8 kB view raw
1package bundle 2 3import ( 4 "context" 5 "fmt" 6 "io" 7 "net/http" 8 "os" 9 "path/filepath" 10 "strings" 11 "sync" 12 "time" 13 14 "github.com/goccy/go-json" 15) 16 17// CloneFromRemote clones bundles from a remote HTTP endpoint 18func (m *Manager) CloneFromRemote(ctx context.Context, opts CloneOptions) (*CloneResult, error) { 19 if opts.Workers <= 0 { 20 opts.Workers = 4 21 } 22 if opts.SaveInterval <= 0 { 23 opts.SaveInterval = 5 * time.Second 24 } 25 if opts.Logger == nil { 26 opts.Logger = m.logger 27 } 28 29 result := &CloneResult{} 30 startTime := time.Now() 31 32 // Step 1: Fetch remote index 33 opts.Logger.Printf("Fetching remote index from %s", opts.RemoteURL) 34 remoteIndex, err := m.loadRemoteIndex(opts.RemoteURL) 35 if err != nil { 36 return nil, fmt.Errorf("failed to load remote index: %w", err) 37 } 38 39 remoteBundles := remoteIndex.GetBundles() 40 if len(remoteBundles) == 0 { 41 opts.Logger.Printf("Remote has no bundles") 42 return result, nil 43 } 44 45 result.RemoteBundles = len(remoteBundles) 46 opts.Logger.Printf("Remote has %d bundles", len(remoteBundles)) 47 48 // Step 2: Determine which bundles to download 49 localBundleMap := make(map[int]*BundleMetadata) 50 for _, meta := range m.index.GetBundles() { 51 localBundleMap[meta.BundleNumber] = meta 52 } 53 54 // Create map of remote metadata for easy lookup 55 remoteBundleMap := make(map[int]*BundleMetadata) 56 for _, meta := range remoteBundles { 57 remoteBundleMap[meta.BundleNumber] = meta 58 } 59 60 var bundlesToDownload []int 61 var totalBytes int64 62 for _, meta := range remoteBundles { 63 if opts.SkipExisting && localBundleMap[meta.BundleNumber] != nil { 64 result.Skipped++ 65 if opts.Verbose { 66 opts.Logger.Printf("Skipping existing bundle %06d", meta.BundleNumber) 67 } 68 continue 69 } 70 bundlesToDownload = append(bundlesToDownload, meta.BundleNumber) 71 totalBytes += meta.CompressedSize 72 } 73 74 if len(bundlesToDownload) == 0 { 75 opts.Logger.Printf("All bundles already exist locally") 76 return result, nil 77 } 78 79 opts.Logger.Printf("Downloading %d bundles (%d bytes)", len(bundlesToDownload), totalBytes) 80 81 // Step 3: Set up periodic index saving (using remote metadata) 82 saveCtx, saveCancel := context.WithCancel(ctx) 83 defer saveCancel() 84 85 var downloadedBundles []int 86 var downloadedMu sync.Mutex 87 88 saveDone := make(chan struct{}) 89 go func() { 90 defer close(saveDone) 91 ticker := time.NewTicker(opts.SaveInterval) 92 defer ticker.Stop() 93 94 for { 95 select { 96 case <-saveCtx.Done(): 97 return 98 case <-ticker.C: 99 // Save index using remote metadata for downloaded bundles 100 downloadedMu.Lock() 101 bundles := make([]int, len(downloadedBundles)) 102 copy(bundles, downloadedBundles) 103 downloadedMu.Unlock() 104 105 if opts.Verbose { 106 opts.Logger.Printf("Periodic save: updating index with %d bundles", len(bundles)) 107 } 108 m.updateIndexFromRemote(bundles, remoteBundleMap, false) // silent during periodic save 109 } 110 } 111 }() 112 113 // Step 4: Download bundles concurrently 114 successList, failedList, bytes := m.downloadBundlesConcurrent( 115 ctx, 116 opts.RemoteURL, 117 bundlesToDownload, 118 remoteBundleMap, // Pass the metadata map for hash verification 119 totalBytes, 120 opts.Workers, 121 opts.ProgressFunc, 122 opts.Verbose, 123 &downloadedBundles, 124 &downloadedMu, 125 ) 126 127 result.Downloaded = len(successList) 128 result.Failed = len(failedList) 129 result.TotalBytes = bytes 130 result.FailedBundles = failedList 131 result.Interrupted = ctx.Err() != nil 132 133 // Stop periodic saves 134 saveCancel() 135 <-saveDone 136 137 // Step 5: Final index update using remote metadata 138 opts.Logger.Printf("Updating local index...") 139 if err := m.updateIndexFromRemote(successList, remoteBundleMap, opts.Verbose); err != nil { 140 return result, fmt.Errorf("failed to update index: %w", err) 141 } 142 143 result.Duration = time.Since(startTime) 144 return result, nil 145} 146 147// downloadBundlesConcurrent downloads bundles using a worker pool 148func (m *Manager) downloadBundlesConcurrent( 149 ctx context.Context, 150 baseURL string, 151 bundleNumbers []int, 152 remoteBundleMap map[int]*BundleMetadata, 153 totalBytes int64, 154 workers int, 155 progressFunc func(downloaded, total int, bytesDownloaded, bytesTotal int64), 156 verbose bool, 157 downloadedBundles *[]int, 158 downloadedMu *sync.Mutex, 159) (successList []int, failedList []int, downloadedBytes int64) { 160 161 type job struct { 162 bundleNum int 163 expectedHash string 164 } 165 166 type result struct { 167 bundleNum int 168 success bool 169 bytes int64 170 err error 171 } 172 173 jobs := make(chan job, len(bundleNumbers)) 174 results := make(chan result, len(bundleNumbers)) 175 176 // Shared state 177 var ( 178 mu sync.Mutex 179 processedCount int 180 processedBytes int64 181 success []int 182 failed []int 183 ) 184 185 // Start workers 186 var wg sync.WaitGroup 187 client := &http.Client{ 188 Timeout: 120 * time.Second, 189 } 190 191 for w := 0; w < workers; w++ { 192 wg.Add(1) 193 go func() { 194 defer wg.Done() 195 for j := range jobs { 196 // Check cancellation 197 select { 198 case <-ctx.Done(): 199 results <- result{ 200 bundleNum: j.bundleNum, 201 success: false, 202 err: ctx.Err(), 203 } 204 continue 205 default: 206 } 207 208 // Download bundle with hash verification 209 bytes, err := m.downloadBundle(client, baseURL, j.bundleNum, j.expectedHash) 210 211 // Update progress 212 mu.Lock() 213 processedCount++ 214 if err == nil { 215 processedBytes += bytes 216 success = append(success, j.bundleNum) 217 if downloadedMu != nil && downloadedBundles != nil { 218 downloadedMu.Lock() 219 *downloadedBundles = append(*downloadedBundles, j.bundleNum) 220 downloadedMu.Unlock() 221 } 222 } else { 223 failed = append(failed, j.bundleNum) 224 } 225 226 if progressFunc != nil { 227 progressFunc(processedCount, len(bundleNumbers), processedBytes, totalBytes) 228 } 229 mu.Unlock() 230 231 results <- result{ 232 bundleNum: j.bundleNum, 233 success: err == nil, 234 bytes: bytes, 235 err: err, 236 } 237 } 238 }() 239 } 240 241 // Send jobs with expected hashes 242 for _, num := range bundleNumbers { 243 expectedHash := "" 244 if meta, exists := remoteBundleMap[num]; exists { 245 expectedHash = meta.CompressedHash 246 } 247 jobs <- job{ 248 bundleNum: num, 249 expectedHash: expectedHash, 250 } 251 } 252 close(jobs) 253 254 // Wait for completion 255 go func() { 256 wg.Wait() 257 close(results) 258 }() 259 260 // Collect results 261 for res := range results { 262 if res.err != nil && res.err != context.Canceled { 263 m.logger.Printf("Failed to download bundle %06d: %v", res.bundleNum, res.err) 264 } else if res.success && verbose { 265 m.logger.Printf("✓ Downloaded and verified bundle %06d (%d bytes)", res.bundleNum, res.bytes) 266 } 267 } 268 269 mu.Lock() 270 successList = success 271 failedList = failed 272 downloadedBytes = processedBytes 273 mu.Unlock() 274 275 return 276} 277 278// updateIndexFromRemote updates local index with metadata from remote index 279func (m *Manager) updateIndexFromRemote(bundleNumbers []int, remoteMeta map[int]*BundleMetadata, verbose bool) error { 280 if len(bundleNumbers) == 0 { 281 return nil 282 } 283 284 // Add/update bundles in local index using remote metadata 285 // Hash verification was already done during download 286 for _, num := range bundleNumbers { 287 if meta, exists := remoteMeta[num]; exists { 288 // Verify the file exists locally 289 path := filepath.Join(m.config.BundleDir, fmt.Sprintf("%06d.jsonl.zst", num)) 290 if !m.operations.FileExists(path) { 291 m.logger.Printf("Warning: bundle %06d not found locally, skipping", num) 292 continue 293 } 294 295 // Add to index (no need to re-verify hash - already verified during download) 296 m.index.AddBundle(meta) 297 298 if verbose { 299 m.logger.Printf("Added bundle %06d to index", num) 300 } 301 } 302 } 303 304 // Save index 305 return m.SaveIndex() 306} 307 308// loadRemoteIndex loads an index from a remote URL 309func (m *Manager) loadRemoteIndex(baseURL string) (*Index, error) { 310 indexURL := strings.TrimSuffix(baseURL, "/") + "/index.json" 311 312 client := &http.Client{ 313 Timeout: 30 * time.Second, 314 } 315 316 resp, err := client.Get(indexURL) 317 if err != nil { 318 return nil, fmt.Errorf("failed to download: %w", err) 319 } 320 defer resp.Body.Close() 321 322 if resp.StatusCode != http.StatusOK { 323 return nil, fmt.Errorf("unexpected status code: %d", resp.StatusCode) 324 } 325 326 data, err := io.ReadAll(resp.Body) 327 if err != nil { 328 return nil, fmt.Errorf("failed to read response: %w", err) 329 } 330 331 var idx Index 332 if err := json.Unmarshal(data, &idx); err != nil { 333 return nil, fmt.Errorf("failed to parse index: %w", err) 334 } 335 336 return &idx, nil 337} 338 339// downloadBundle downloads a single bundle file and verifies its hash 340func (m *Manager) downloadBundle(client *http.Client, baseURL string, bundleNum int, expectedHash string) (int64, error) { 341 url := fmt.Sprintf("%s/data/%d", strings.TrimSuffix(baseURL, "/"), bundleNum) 342 filename := fmt.Sprintf("%06d.jsonl.zst", bundleNum) 343 filepath := filepath.Join(m.config.BundleDir, filename) 344 345 // Create request 346 req, err := http.NewRequest("GET", url, nil) 347 if err != nil { 348 return 0, err 349 } 350 351 // Download 352 resp, err := client.Do(req) 353 if err != nil { 354 return 0, err 355 } 356 defer resp.Body.Close() 357 358 if resp.StatusCode != http.StatusOK { 359 body, _ := io.ReadAll(resp.Body) 360 return 0, fmt.Errorf("HTTP %d: %s", resp.StatusCode, string(body)) 361 } 362 363 // Write to temp file (atomic write) 364 tempPath := filepath + ".tmp" 365 outFile, err := os.Create(tempPath) 366 if err != nil { 367 return 0, err 368 } 369 370 written, err := io.Copy(outFile, resp.Body) 371 outFile.Close() 372 373 if err != nil { 374 os.Remove(tempPath) 375 return 0, err 376 } 377 378 // Verify hash before committing 379 if expectedHash != "" { 380 valid, actualHash, err := m.operations.VerifyHash(tempPath, expectedHash) 381 if err != nil { 382 os.Remove(tempPath) 383 return 0, fmt.Errorf("hash verification failed: %w", err) 384 } 385 if !valid { 386 os.Remove(tempPath) 387 return 0, fmt.Errorf("hash mismatch: expected %s, got %s", 388 expectedHash[:16]+"...", actualHash[:16]+"...") 389 } 390 } 391 392 // Rename to final location 393 if err := os.Rename(tempPath, filepath); err != nil { 394 os.Remove(tempPath) 395 return 0, err 396 } 397 398 return written, nil 399}