A Transparent and Verifiable Way to Sync the AT Protocol's PLC Directory
1package bundle
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "io"
8 "net/http"
9 "os"
10 "path/filepath"
11 "strings"
12 "sync"
13 "time"
14)
15
16// CloneFromRemote clones bundles from a remote HTTP endpoint
17func (m *Manager) CloneFromRemote(ctx context.Context, opts CloneOptions) (*CloneResult, error) {
18 if opts.Workers <= 0 {
19 opts.Workers = 4
20 }
21 if opts.SaveInterval <= 0 {
22 opts.SaveInterval = 5 * time.Second
23 }
24 if opts.Logger == nil {
25 opts.Logger = m.logger
26 }
27
28 result := &CloneResult{}
29 startTime := time.Now()
30
31 // Step 1: Fetch remote index
32 opts.Logger.Printf("Fetching remote index from %s", opts.RemoteURL)
33 remoteIndex, err := m.loadRemoteIndex(opts.RemoteURL)
34 if err != nil {
35 return nil, fmt.Errorf("failed to load remote index: %w", err)
36 }
37
38 remoteBundles := remoteIndex.GetBundles()
39 if len(remoteBundles) == 0 {
40 opts.Logger.Printf("Remote has no bundles")
41 return result, nil
42 }
43
44 result.RemoteBundles = len(remoteBundles)
45 opts.Logger.Printf("Remote has %d bundles", len(remoteBundles))
46
47 // Step 2: Determine which bundles to download
48 localBundleMap := make(map[int]*BundleMetadata)
49 for _, meta := range m.index.GetBundles() {
50 localBundleMap[meta.BundleNumber] = meta
51 }
52
53 // Create map of remote metadata for easy lookup
54 remoteBundleMap := make(map[int]*BundleMetadata)
55 for _, meta := range remoteBundles {
56 remoteBundleMap[meta.BundleNumber] = meta
57 }
58
59 var bundlesToDownload []int
60 var totalBytes int64
61 for _, meta := range remoteBundles {
62 if opts.SkipExisting && localBundleMap[meta.BundleNumber] != nil {
63 result.Skipped++
64 if opts.Verbose {
65 opts.Logger.Printf("Skipping existing bundle %06d", meta.BundleNumber)
66 }
67 continue
68 }
69 bundlesToDownload = append(bundlesToDownload, meta.BundleNumber)
70 totalBytes += meta.CompressedSize
71 }
72
73 if len(bundlesToDownload) == 0 {
74 opts.Logger.Printf("All bundles already exist locally")
75 return result, nil
76 }
77
78 opts.Logger.Printf("Downloading %d bundles (%d bytes)", len(bundlesToDownload), totalBytes)
79
80 // Step 3: Set up periodic index saving (using remote metadata)
81 saveCtx, saveCancel := context.WithCancel(ctx)
82 defer saveCancel()
83
84 var downloadedBundles []int
85 var downloadedMu sync.Mutex
86
87 saveDone := make(chan struct{})
88 go func() {
89 defer close(saveDone)
90 ticker := time.NewTicker(opts.SaveInterval)
91 defer ticker.Stop()
92
93 for {
94 select {
95 case <-saveCtx.Done():
96 return
97 case <-ticker.C:
98 // Save index using remote metadata for downloaded bundles
99 downloadedMu.Lock()
100 bundles := make([]int, len(downloadedBundles))
101 copy(bundles, downloadedBundles)
102 downloadedMu.Unlock()
103
104 if opts.Verbose {
105 opts.Logger.Printf("Periodic save: updating index with %d bundles", len(bundles))
106 }
107 m.updateIndexFromRemote(bundles, remoteBundleMap, false) // silent during periodic save
108 }
109 }
110 }()
111
112 // Step 4: Download bundles concurrently
113 successList, failedList, bytes := m.downloadBundlesConcurrent(
114 ctx,
115 opts.RemoteURL,
116 bundlesToDownload,
117 remoteBundleMap, // Pass the metadata map for hash verification
118 totalBytes,
119 opts.Workers,
120 opts.ProgressFunc,
121 opts.Verbose,
122 &downloadedBundles,
123 &downloadedMu,
124 )
125
126 result.Downloaded = len(successList)
127 result.Failed = len(failedList)
128 result.TotalBytes = bytes
129 result.FailedBundles = failedList
130 result.Interrupted = ctx.Err() != nil
131
132 // Stop periodic saves
133 saveCancel()
134 <-saveDone
135
136 // Step 5: Final index update using remote metadata
137 opts.Logger.Printf("Updating local index...")
138 if err := m.updateIndexFromRemote(successList, remoteBundleMap, opts.Verbose); err != nil {
139 return result, fmt.Errorf("failed to update index: %w", err)
140 }
141
142 result.Duration = time.Since(startTime)
143 return result, nil
144}
145
146// downloadBundlesConcurrent downloads bundles using a worker pool
147func (m *Manager) downloadBundlesConcurrent(
148 ctx context.Context,
149 baseURL string,
150 bundleNumbers []int,
151 remoteBundleMap map[int]*BundleMetadata,
152 totalBytes int64,
153 workers int,
154 progressFunc func(downloaded, total int, bytesDownloaded, bytesTotal int64),
155 verbose bool,
156 downloadedBundles *[]int,
157 downloadedMu *sync.Mutex,
158) (successList []int, failedList []int, downloadedBytes int64) {
159
160 type job struct {
161 bundleNum int
162 expectedHash string
163 }
164
165 type result struct {
166 bundleNum int
167 success bool
168 bytes int64
169 err error
170 }
171
172 jobs := make(chan job, len(bundleNumbers))
173 results := make(chan result, len(bundleNumbers))
174
175 // Shared state
176 var (
177 mu sync.Mutex
178 processedCount int
179 processedBytes int64
180 success []int
181 failed []int
182 )
183
184 // Start workers
185 var wg sync.WaitGroup
186 client := &http.Client{
187 Timeout: 120 * time.Second,
188 }
189
190 for w := 0; w < workers; w++ {
191 wg.Add(1)
192 go func() {
193 defer wg.Done()
194 for j := range jobs {
195 // Check cancellation
196 select {
197 case <-ctx.Done():
198 results <- result{
199 bundleNum: j.bundleNum,
200 success: false,
201 err: ctx.Err(),
202 }
203 continue
204 default:
205 }
206
207 // Download bundle with hash verification
208 bytes, err := m.downloadBundle(client, baseURL, j.bundleNum, j.expectedHash)
209
210 // Update progress
211 mu.Lock()
212 processedCount++
213 if err == nil {
214 processedBytes += bytes
215 success = append(success, j.bundleNum)
216 if downloadedMu != nil && downloadedBundles != nil {
217 downloadedMu.Lock()
218 *downloadedBundles = append(*downloadedBundles, j.bundleNum)
219 downloadedMu.Unlock()
220 }
221 } else {
222 failed = append(failed, j.bundleNum)
223 }
224
225 if progressFunc != nil {
226 progressFunc(processedCount, len(bundleNumbers), processedBytes, totalBytes)
227 }
228 mu.Unlock()
229
230 results <- result{
231 bundleNum: j.bundleNum,
232 success: err == nil,
233 bytes: bytes,
234 err: err,
235 }
236 }
237 }()
238 }
239
240 // Send jobs with expected hashes
241 for _, num := range bundleNumbers {
242 expectedHash := ""
243 if meta, exists := remoteBundleMap[num]; exists {
244 expectedHash = meta.CompressedHash
245 }
246 jobs <- job{
247 bundleNum: num,
248 expectedHash: expectedHash,
249 }
250 }
251 close(jobs)
252
253 // Wait for completion
254 go func() {
255 wg.Wait()
256 close(results)
257 }()
258
259 // Collect results
260 for res := range results {
261 if res.err != nil && res.err != context.Canceled {
262 m.logger.Printf("Failed to download bundle %06d: %v", res.bundleNum, res.err)
263 } else if res.success && verbose {
264 m.logger.Printf("✓ Downloaded and verified bundle %06d (%d bytes)", res.bundleNum, res.bytes)
265 }
266 }
267
268 mu.Lock()
269 successList = success
270 failedList = failed
271 downloadedBytes = processedBytes
272 mu.Unlock()
273
274 return
275}
276
277// updateIndexFromRemote updates local index with metadata from remote index
278func (m *Manager) updateIndexFromRemote(bundleNumbers []int, remoteMeta map[int]*BundleMetadata, verbose bool) error {
279 if len(bundleNumbers) == 0 {
280 return nil
281 }
282
283 // Add/update bundles in local index using remote metadata
284 // Hash verification was already done during download
285 for _, num := range bundleNumbers {
286 if meta, exists := remoteMeta[num]; exists {
287 // Verify the file exists locally
288 path := filepath.Join(m.config.BundleDir, fmt.Sprintf("%06d.jsonl.zst", num))
289 if !m.operations.FileExists(path) {
290 m.logger.Printf("Warning: bundle %06d not found locally, skipping", num)
291 continue
292 }
293
294 // Add to index (no need to re-verify hash - already verified during download)
295 m.index.AddBundle(meta)
296
297 if verbose {
298 m.logger.Printf("Added bundle %06d to index", num)
299 }
300 }
301 }
302
303 // Save index
304 return m.SaveIndex()
305}
306
307// loadRemoteIndex loads an index from a remote URL
308func (m *Manager) loadRemoteIndex(baseURL string) (*Index, error) {
309 indexURL := strings.TrimSuffix(baseURL, "/") + "/index.json"
310
311 client := &http.Client{
312 Timeout: 30 * time.Second,
313 }
314
315 resp, err := client.Get(indexURL)
316 if err != nil {
317 return nil, fmt.Errorf("failed to download: %w", err)
318 }
319 defer resp.Body.Close()
320
321 if resp.StatusCode != http.StatusOK {
322 return nil, fmt.Errorf("unexpected status code: %d", resp.StatusCode)
323 }
324
325 data, err := io.ReadAll(resp.Body)
326 if err != nil {
327 return nil, fmt.Errorf("failed to read response: %w", err)
328 }
329
330 var idx Index
331 if err := json.Unmarshal(data, &idx); err != nil {
332 return nil, fmt.Errorf("failed to parse index: %w", err)
333 }
334
335 return &idx, nil
336}
337
338// downloadBundle downloads a single bundle file and verifies its hash
339func (m *Manager) downloadBundle(client *http.Client, baseURL string, bundleNum int, expectedHash string) (int64, error) {
340 url := fmt.Sprintf("%s/data/%d", strings.TrimSuffix(baseURL, "/"), bundleNum)
341 filename := fmt.Sprintf("%06d.jsonl.zst", bundleNum)
342 filepath := filepath.Join(m.config.BundleDir, filename)
343
344 // Create request
345 req, err := http.NewRequest("GET", url, nil)
346 if err != nil {
347 return 0, err
348 }
349
350 // Download
351 resp, err := client.Do(req)
352 if err != nil {
353 return 0, err
354 }
355 defer resp.Body.Close()
356
357 if resp.StatusCode != http.StatusOK {
358 body, _ := io.ReadAll(resp.Body)
359 return 0, fmt.Errorf("HTTP %d: %s", resp.StatusCode, string(body))
360 }
361
362 // Write to temp file (atomic write)
363 tempPath := filepath + ".tmp"
364 outFile, err := os.Create(tempPath)
365 if err != nil {
366 return 0, err
367 }
368
369 written, err := io.Copy(outFile, resp.Body)
370 outFile.Close()
371
372 if err != nil {
373 os.Remove(tempPath)
374 return 0, err
375 }
376
377 // Verify hash before committing
378 if expectedHash != "" {
379 valid, actualHash, err := m.operations.VerifyHash(tempPath, expectedHash)
380 if err != nil {
381 os.Remove(tempPath)
382 return 0, fmt.Errorf("hash verification failed: %w", err)
383 }
384 if !valid {
385 os.Remove(tempPath)
386 return 0, fmt.Errorf("hash mismatch: expected %s, got %s",
387 expectedHash[:16]+"...", actualHash[:16]+"...")
388 }
389 }
390
391 // Rename to final location
392 if err := os.Rename(tempPath, filepath); err != nil {
393 os.Remove(tempPath)
394 return 0, err
395 }
396
397 return written, nil
398}