A Transparent and Verifiable Way to Sync the AT Protocol's PLC Directory
1package bundle
2
3import (
4 "context"
5 "fmt"
6 "io"
7 "net/http"
8 "os"
9 "path/filepath"
10 "strings"
11 "sync"
12 "time"
13
14 "github.com/goccy/go-json"
15)
16
17// CloneFromRemote clones bundles from a remote HTTP endpoint
18func (m *Manager) CloneFromRemote(ctx context.Context, opts CloneOptions) (*CloneResult, error) {
19 if opts.Workers <= 0 {
20 opts.Workers = 4
21 }
22 if opts.SaveInterval <= 0 {
23 opts.SaveInterval = 5 * time.Second
24 }
25 if opts.Logger == nil {
26 opts.Logger = m.logger
27 }
28
29 result := &CloneResult{}
30 startTime := time.Now()
31
32 // Step 1: Fetch remote index
33 opts.Logger.Printf("Fetching remote index from %s", opts.RemoteURL)
34 remoteIndex, err := m.loadRemoteIndex(opts.RemoteURL)
35 if err != nil {
36 return nil, fmt.Errorf("failed to load remote index: %w", err)
37 }
38
39 remoteBundles := remoteIndex.GetBundles()
40 if len(remoteBundles) == 0 {
41 opts.Logger.Printf("Remote has no bundles")
42 return result, nil
43 }
44
45 result.RemoteBundles = len(remoteBundles)
46 opts.Logger.Printf("Remote has %d bundles", len(remoteBundles))
47
48 // Step 2: Determine which bundles to download
49 localBundleMap := make(map[int]*BundleMetadata)
50 for _, meta := range m.index.GetBundles() {
51 localBundleMap[meta.BundleNumber] = meta
52 }
53
54 // Create map of remote metadata for easy lookup
55 remoteBundleMap := make(map[int]*BundleMetadata)
56 for _, meta := range remoteBundles {
57 remoteBundleMap[meta.BundleNumber] = meta
58 }
59
60 var bundlesToDownload []int
61 var totalBytes int64
62 for _, meta := range remoteBundles {
63 if opts.SkipExisting && localBundleMap[meta.BundleNumber] != nil {
64 result.Skipped++
65 if opts.Verbose {
66 opts.Logger.Printf("Skipping existing bundle %06d", meta.BundleNumber)
67 }
68 continue
69 }
70 bundlesToDownload = append(bundlesToDownload, meta.BundleNumber)
71 totalBytes += meta.CompressedSize
72 }
73
74 if len(bundlesToDownload) == 0 {
75 opts.Logger.Printf("All bundles already exist locally")
76 return result, nil
77 }
78
79 opts.Logger.Printf("Downloading %d bundles (%d bytes)", len(bundlesToDownload), totalBytes)
80
81 // Step 3: Set up periodic index saving (using remote metadata)
82 saveCtx, saveCancel := context.WithCancel(ctx)
83 defer saveCancel()
84
85 var downloadedBundles []int
86 var downloadedMu sync.Mutex
87
88 saveDone := make(chan struct{})
89 go func() {
90 defer close(saveDone)
91 ticker := time.NewTicker(opts.SaveInterval)
92 defer ticker.Stop()
93
94 for {
95 select {
96 case <-saveCtx.Done():
97 return
98 case <-ticker.C:
99 // Save index using remote metadata for downloaded bundles
100 downloadedMu.Lock()
101 bundles := make([]int, len(downloadedBundles))
102 copy(bundles, downloadedBundles)
103 downloadedMu.Unlock()
104
105 if opts.Verbose {
106 opts.Logger.Printf("Periodic save: updating index with %d bundles", len(bundles))
107 }
108 m.updateIndexFromRemote(bundles, remoteBundleMap, false) // silent during periodic save
109 }
110 }
111 }()
112
113 // Step 4: Download bundles concurrently
114 successList, failedList, bytes := m.downloadBundlesConcurrent(
115 ctx,
116 opts.RemoteURL,
117 bundlesToDownload,
118 remoteBundleMap, // Pass the metadata map for hash verification
119 totalBytes,
120 opts.Workers,
121 opts.ProgressFunc,
122 opts.Verbose,
123 &downloadedBundles,
124 &downloadedMu,
125 )
126
127 result.Downloaded = len(successList)
128 result.Failed = len(failedList)
129 result.TotalBytes = bytes
130 result.FailedBundles = failedList
131 result.Interrupted = ctx.Err() != nil
132
133 // Stop periodic saves
134 saveCancel()
135 <-saveDone
136
137 // Step 5: Final index update using remote metadata
138 opts.Logger.Printf("Updating local index...")
139 if err := m.updateIndexFromRemote(successList, remoteBundleMap, opts.Verbose); err != nil {
140 return result, fmt.Errorf("failed to update index: %w", err)
141 }
142
143 result.Duration = time.Since(startTime)
144 return result, nil
145}
146
147// downloadBundlesConcurrent downloads bundles using a worker pool
148func (m *Manager) downloadBundlesConcurrent(
149 ctx context.Context,
150 baseURL string,
151 bundleNumbers []int,
152 remoteBundleMap map[int]*BundleMetadata,
153 totalBytes int64,
154 workers int,
155 progressFunc func(downloaded, total int, bytesDownloaded, bytesTotal int64),
156 verbose bool,
157 downloadedBundles *[]int,
158 downloadedMu *sync.Mutex,
159) (successList []int, failedList []int, downloadedBytes int64) {
160
161 type job struct {
162 bundleNum int
163 expectedHash string
164 }
165
166 type result struct {
167 bundleNum int
168 success bool
169 bytes int64
170 err error
171 }
172
173 jobs := make(chan job, len(bundleNumbers))
174 results := make(chan result, len(bundleNumbers))
175
176 // Shared state
177 var (
178 mu sync.Mutex
179 processedCount int
180 processedBytes int64
181 success []int
182 failed []int
183 )
184
185 // Start workers
186 var wg sync.WaitGroup
187 client := &http.Client{
188 Timeout: 120 * time.Second,
189 }
190
191 for w := 0; w < workers; w++ {
192 wg.Add(1)
193 go func() {
194 defer wg.Done()
195 for j := range jobs {
196 // Check cancellation
197 select {
198 case <-ctx.Done():
199 results <- result{
200 bundleNum: j.bundleNum,
201 success: false,
202 err: ctx.Err(),
203 }
204 continue
205 default:
206 }
207
208 // Download bundle with hash verification
209 bytes, err := m.downloadBundle(client, baseURL, j.bundleNum, j.expectedHash)
210
211 // Update progress
212 mu.Lock()
213 processedCount++
214 if err == nil {
215 processedBytes += bytes
216 success = append(success, j.bundleNum)
217 if downloadedMu != nil && downloadedBundles != nil {
218 downloadedMu.Lock()
219 *downloadedBundles = append(*downloadedBundles, j.bundleNum)
220 downloadedMu.Unlock()
221 }
222 } else {
223 failed = append(failed, j.bundleNum)
224 }
225
226 if progressFunc != nil {
227 progressFunc(processedCount, len(bundleNumbers), processedBytes, totalBytes)
228 }
229 mu.Unlock()
230
231 results <- result{
232 bundleNum: j.bundleNum,
233 success: err == nil,
234 bytes: bytes,
235 err: err,
236 }
237 }
238 }()
239 }
240
241 // Send jobs with expected hashes
242 for _, num := range bundleNumbers {
243 expectedHash := ""
244 if meta, exists := remoteBundleMap[num]; exists {
245 expectedHash = meta.CompressedHash
246 }
247 jobs <- job{
248 bundleNum: num,
249 expectedHash: expectedHash,
250 }
251 }
252 close(jobs)
253
254 // Wait for completion
255 go func() {
256 wg.Wait()
257 close(results)
258 }()
259
260 // Collect results
261 for res := range results {
262 if res.err != nil && res.err != context.Canceled {
263 m.logger.Printf("Failed to download bundle %06d: %v", res.bundleNum, res.err)
264 } else if res.success && verbose {
265 m.logger.Printf("✓ Downloaded and verified bundle %06d (%d bytes)", res.bundleNum, res.bytes)
266 }
267 }
268
269 mu.Lock()
270 successList = success
271 failedList = failed
272 downloadedBytes = processedBytes
273 mu.Unlock()
274
275 return
276}
277
278// updateIndexFromRemote updates local index with metadata from remote index
279func (m *Manager) updateIndexFromRemote(bundleNumbers []int, remoteMeta map[int]*BundleMetadata, verbose bool) error {
280 if len(bundleNumbers) == 0 {
281 return nil
282 }
283
284 // Add/update bundles in local index using remote metadata
285 // Hash verification was already done during download
286 for _, num := range bundleNumbers {
287 if meta, exists := remoteMeta[num]; exists {
288 // Verify the file exists locally
289 path := filepath.Join(m.config.BundleDir, fmt.Sprintf("%06d.jsonl.zst", num))
290 if !m.operations.FileExists(path) {
291 m.logger.Printf("Warning: bundle %06d not found locally, skipping", num)
292 continue
293 }
294
295 // Add to index (no need to re-verify hash - already verified during download)
296 m.index.AddBundle(meta)
297
298 if verbose {
299 m.logger.Printf("Added bundle %06d to index", num)
300 }
301 }
302 }
303
304 // Save index
305 return m.SaveIndex()
306}
307
308// loadRemoteIndex loads an index from a remote URL
309func (m *Manager) loadRemoteIndex(baseURL string) (*Index, error) {
310 indexURL := strings.TrimSuffix(baseURL, "/") + "/index.json"
311
312 client := &http.Client{
313 Timeout: 30 * time.Second,
314 }
315
316 resp, err := client.Get(indexURL)
317 if err != nil {
318 return nil, fmt.Errorf("failed to download: %w", err)
319 }
320 defer resp.Body.Close()
321
322 if resp.StatusCode != http.StatusOK {
323 return nil, fmt.Errorf("unexpected status code: %d", resp.StatusCode)
324 }
325
326 data, err := io.ReadAll(resp.Body)
327 if err != nil {
328 return nil, fmt.Errorf("failed to read response: %w", err)
329 }
330
331 var idx Index
332 if err := json.Unmarshal(data, &idx); err != nil {
333 return nil, fmt.Errorf("failed to parse index: %w", err)
334 }
335
336 return &idx, nil
337}
338
339// downloadBundle downloads a single bundle file and verifies its hash
340func (m *Manager) downloadBundle(client *http.Client, baseURL string, bundleNum int, expectedHash string) (int64, error) {
341 url := fmt.Sprintf("%s/data/%d", strings.TrimSuffix(baseURL, "/"), bundleNum)
342 filename := fmt.Sprintf("%06d.jsonl.zst", bundleNum)
343 filepath := filepath.Join(m.config.BundleDir, filename)
344
345 // Create request
346 req, err := http.NewRequest("GET", url, nil)
347 if err != nil {
348 return 0, err
349 }
350
351 // Download
352 resp, err := client.Do(req)
353 if err != nil {
354 return 0, err
355 }
356 defer resp.Body.Close()
357
358 if resp.StatusCode != http.StatusOK {
359 body, _ := io.ReadAll(resp.Body)
360 return 0, fmt.Errorf("HTTP %d: %s", resp.StatusCode, string(body))
361 }
362
363 // Write to temp file (atomic write)
364 tempPath := filepath + ".tmp"
365 outFile, err := os.Create(tempPath)
366 if err != nil {
367 return 0, err
368 }
369
370 written, err := io.Copy(outFile, resp.Body)
371 outFile.Close()
372
373 if err != nil {
374 os.Remove(tempPath)
375 return 0, err
376 }
377
378 // Verify hash before committing
379 if expectedHash != "" {
380 valid, actualHash, err := m.operations.VerifyHash(tempPath, expectedHash)
381 if err != nil {
382 os.Remove(tempPath)
383 return 0, fmt.Errorf("hash verification failed: %w", err)
384 }
385 if !valid {
386 os.Remove(tempPath)
387 return 0, fmt.Errorf("hash mismatch: expected %s, got %s",
388 expectedHash[:16]+"...", actualHash[:16]+"...")
389 }
390 }
391
392 // Rename to final location
393 if err := os.Rename(tempPath, filepath); err != nil {
394 os.Remove(tempPath)
395 return 0, err
396 }
397
398 return written, nil
399}