package main

import (
	"archive/tar"
	"bufio"
	"compress/gzip"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"log"
	"log/slog"
	"net/http"
	"os"
	"os/signal"
	"path/filepath"
	"strings"
	"sync"
	"syscall"
	"time"

	"github.com/bluesky-social/indigo/atproto/data"
	"github.com/bluesky-social/indigo/repo"
	"github.com/ipfs/go-cid"
	_ "github.com/joho/godotenv/autoload"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"github.com/prometheus/client_golang/prometheus/promhttp"

	"golang.org/x/time/rate"

	"github.com/carlmjohnson/versioninfo"
	"github.com/urfave/cli/v2"
)

func main() {
	app := cli.App{
		Name:    "netsync",
		Usage:   "atproto network cloning tool",
		Version: versioninfo.Short(),
	}

	app.Flags = []cli.Flag{
		&cli.IntFlag{
			Name:  "port",
			Usage: "listen port for metrics server",
			Value: 8753,
		},
		&cli.IntFlag{
			Name:  "worker-count",
			Usage: "number of workers to run concurrently",
			Value: 10,
		},
		&cli.Float64Flag{
			Name:  "checkout-limit",
			Usage: "maximum number of repos per second to check out",
			Value: 4,
		},
		&cli.StringFlag{
			Name:  "out-dir",
			Usage: "directory to write cloned repos to",
			Value: "netsync-out",
		},
		&cli.StringFlag{
			Name:  "repo-list",
			Usage: "path to file containing the list of repos to clone",
			Value: "repos.txt",
		},
		&cli.StringFlag{
			Name:  "state-file",
			Usage: "path to file to write state to",
			Value: "state.json",
		},
		&cli.StringFlag{
			Name:  "checkout-path",
			Usage: "path to checkout endpoint",
			Value: "https://bsky.network/xrpc/com.atproto.sync.getRepo",
		},
		&cli.StringFlag{
			Name:    "magic-header-key",
			Usage:   "header key to send with checkout requests",
			Value:   "",
			EnvVars: []string{"MAGIC_HEADER_KEY"},
		},
		&cli.StringFlag{
			Name:    "magic-header-val",
			Usage:   "header value to send with checkout requests",
			Value:   "",
			EnvVars: []string{"MAGIC_HEADER_VAL"},
		},
	}

	app.Commands = []*cli.Command{
		{
			Name:  "retry",
			Usage: "requeue failed repos",
			Action: func(cctx *cli.Context) error {
				state := &NetsyncState{
					StatePath: cctx.String("state-file"),
				}

				err := state.Resume()
				if err != nil {
					return err
				}

				// Guard against a state file with no enqueued repos
				if state.EnqueuedRepos == nil {
					state.EnqueuedRepos = make(map[string]*RepoState)
				}

				// Look through finished repos for failed ones
				for _, repoState := range state.FinishedRepos {
					// Don't retry repos that failed due to a 400 (they've been deleted)
					if strings.HasPrefix(repoState.State, "failed") && repoState.State != "failed (status: 400)" {
						state.EnqueuedRepos[repoState.Repo] = &RepoState{
							Repo:  repoState.Repo,
							State: "enqueued",
						}
					}
				}

				// Save state
				return state.Save()
			},
		},
	}

	app.Action = Netsync

	err := app.Run(os.Args)
	if err != nil {
		log.Fatal(err)
	}
}

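// RepoState tracks a single repo as it moves through the clone queue,
// recording its DID, current status, and completion time once finished.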
type RepoState struct {
	Repo       string
	State      string
	FinishedAt time.Time
}

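// NetsyncState holds the work queue, results, and shared resources
// (HTTP client, rate limiter, logger) used by the worker pool. The
// exported fields are what gets persisted to the state file; the
// unexported fields are runtime-only.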
type NetsyncState struct {
	EnqueuedRepos map[string]*RepoState
	FinishedRepos map[string]*RepoState
	StatePath     string
	CheckoutPath  string

	outDir         string
	magicHeaderKey string
	magicHeaderVal string

	logger *slog.Logger

	lk          sync.RWMutex
	wg          sync.WaitGroup
	exit        chan struct{}
	limiter     *rate.Limiter
	workerCount int
	client      *http.Client
}

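// instrumentedReader wraps an io.ReadCloser and counts every byte read
// through it in a Prometheus counter.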
type instrumentedReader struct {
	source  io.ReadCloser
	counter prometheus.Counter
}

func (r instrumentedReader) Read(b []byte) (int, error) {
	n, err := r.source.Read(b)
	r.counter.Add(float64(n))
	return n, err
}

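// Close drains whatever is left in the underlying reader (so the remaining
// bytes are still counted and the HTTP connection can be reused) and then
// closes it.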
func (r instrumentedReader) Close() error {
	var buf [32]byte
	var n int
	var err error
	for err == nil {
		n, err = r.source.Read(buf[:])
		r.counter.Add(float64(n))
	}
	closeErr := r.source.Close()
	if err != nil && err != io.EOF {
		return err
	}
	return closeErr
}

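// Save serializes the current state to the configured state file,
// replacing any previous contents.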
func (s *NetsyncState) Save() error {
	s.lk.RLock()
	defer s.lk.RUnlock()

	// Truncate so a shorter state doesn't leave stale JSON behind.
	stateFile, err := os.OpenFile(s.StatePath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644)
	if err != nil {
		return err
	}
	defer stateFile.Close()

	stateBytes, err := json.Marshal(s)
	if err != nil {
		return err
	}

	_, err = stateFile.Write(stateBytes)
	return err
}

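// Resume loads previously saved state from the state file, if one exists.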
func (s *NetsyncState) Resume() error {
	stateFile, err := os.Open(s.StatePath)
	if err != nil {
		return err
	}
	defer stateFile.Close()

	stateBytes, err := io.ReadAll(stateFile)
	if err != nil {
		return err
	}

	return json.Unmarshal(stateBytes, s)
}

var enqueuedJobs = promauto.NewGauge(prometheus.GaugeOpts{
	Name: "netsync_enqueued_jobs",
	Help: "Number of enqueued jobs",
})

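// Dequeue returns the next repo waiting to be cloned and marks it as
// dequeued, or returns an empty string if nothing is left in the queue.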
func (s *NetsyncState) Dequeue() string {
	s.lk.Lock()
	defer s.lk.Unlock()

	enqueuedJobs.Set(float64(len(s.EnqueuedRepos)))

	for repo, state := range s.EnqueuedRepos {
		if state.State == "enqueued" {
			state.State = "dequeued"
			return repo
		}
	}

	return ""
}

var finishedJobs = promauto.NewGauge(prometheus.GaugeOpts{
	Name: "netsync_finished_jobs",
	Help: "Number of finished jobs",
})

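// Finish records the final state of a repo and removes it from the queue.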
func (s *NetsyncState) Finish(repo string, state string) {
	s.lk.Lock()
	defer s.lk.Unlock()

	s.FinishedRepos[repo] = &RepoState{
		Repo:       repo,
		State:      state,
		FinishedAt: time.Now(),
	}

	finishedJobs.Set(float64(len(s.FinishedRepos)))

	delete(s.EnqueuedRepos, repo)
}

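// Netsync is the main CLI action: it restores or builds the work queue,
// starts the metrics server and the worker pool, and coordinates shutdown
// on signal, context cancellation, or queue exhaustion.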
func Netsync(cctx *cli.Context) error {
	ctx := cctx.Context
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	logLevel := slog.LevelInfo
	logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: logLevel, AddSource: true}))
	slog.SetDefault(logger)

	state := &NetsyncState{
		StatePath:    cctx.String("state-file"),
		CheckoutPath: cctx.String("checkout-path"),

		outDir:         cctx.String("out-dir"),
		workerCount:    cctx.Int("worker-count"),
		limiter:        rate.NewLimiter(rate.Limit(cctx.Float64("checkout-limit")), 1),
		magicHeaderKey: cctx.String("magic-header-key"),
		magicHeaderVal: cctx.String("magic-header-val"),

		exit: make(chan struct{}),
		wg:   sync.WaitGroup{},
		client: &http.Client{
			Timeout: 180 * time.Second,
		},

		logger: logger,
	}

	if state.magicHeaderKey != "" && state.magicHeaderVal != "" {
		logger.Info("using magic header")
	}

	// Create the output directory
	err := os.MkdirAll(state.outDir, 0755)
	if err != nil {
		return err
	}

	// Try to resume from the state file
	err = state.Resume()
	if state.EnqueuedRepos == nil {
		state.EnqueuedRepos = make(map[string]*RepoState)
	} else {
		// Reset any dequeued repos so they get picked up again
		for _, repoState := range state.EnqueuedRepos {
			if repoState.State == "dequeued" {
				repoState.State = "enqueued"
			}
		}
	}

	if state.FinishedRepos == nil {
		state.FinishedRepos = make(map[string]*RepoState)
	}

	if err != nil {
		// No usable state file, so build the queue from the repo list
		repoListFile, err := os.Open(cctx.String("repo-list"))
		if err != nil {
			return err
		}
		defer repoListFile.Close()

		fileScanner := bufio.NewScanner(repoListFile)
		fileScanner.Split(bufio.ScanLines)

		for fileScanner.Scan() {
			repo := strings.TrimSpace(fileScanner.Text())
			if repo == "" {
				continue
			}
			state.EnqueuedRepos[repo] = &RepoState{
				Repo:  repo,
				State: "enqueued",
			}
		}
		if err := fileScanner.Err(); err != nil {
			return err
		}
	} else {
		logger.Info("Resuming from state file")
	}

	// Start metrics server
	mux := http.NewServeMux()
	mux.Handle("/metrics", promhttp.Handler())

	metricsServer := &http.Server{
		Addr:    fmt.Sprintf(":%d", cctx.Int("port")),
		Handler: mux,
	}

	state.wg.Add(1)
	go func() {
		defer state.wg.Done()
		if err := metricsServer.ListenAndServe(); err != http.ErrServerClosed {
			logger.Error("failed to start metrics server", "err", err)
			os.Exit(1)
		}
		logger.Info("metrics server shut down successfully")
	}()

	// Start workers
	for i := 0; i < state.workerCount; i++ {
		state.wg.Add(1)
		go func(id int) {
			defer state.wg.Done()
			err := state.worker(id)
			if err != nil {
				logger.Error("worker failed", "err", err)
			}
		}(i)
	}

	// Periodically save state and shut down once the queue is empty
	state.wg.Add(1)
	go func() {
		defer state.wg.Done()
		t := time.NewTicker(30 * time.Second)
		defer t.Stop()
		for {
			select {
			case <-ctx.Done():
				err := state.Save()
				if err != nil {
					logger.Error("failed to save state", "err", err)
				}
				return
			case <-t.C:
				err := state.Save()
				if err != nil {
					logger.Error("failed to save state", "err", err)
				}
				state.lk.RLock()
				remaining := len(state.EnqueuedRepos)
				state.lk.RUnlock()
				if remaining == 0 {
					logger.Info("no more repos to clone, shutting down")
					// Cancel the shared context to trigger shutdown; the exit
					// channel is closed in exactly one place, by the main goroutine.
					cancel()
					return
				}
			}
		}
	}()

	// Trap SIGINT/SIGTERM to trigger a shutdown.
	logger.Info("listening for signals")
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)

	select {
	case sig := <-signals:
		logger.Info("shutting down on signal", "signal", sig)
	case <-ctx.Done():
		logger.Info("shutting down on context done")
	}

	cancel()
	// Tell the workers to stop.
	close(state.exit)

	logger.Info("shutting down, waiting for workers to clean up...")

	// Use a fresh context here: ctx is already cancelled and would abort the
	// graceful shutdown immediately.
	shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer shutdownCancel()
	if err := metricsServer.Shutdown(shutdownCtx); err != nil {
		logger.Error("failed to shut down metrics server", "err", err)
	}

	state.wg.Wait()

	// Final save so the state file reflects the last finished repos.
	if err := state.Save(); err != nil {
		logger.Error("failed to save state", "err", err)
	}

	logger.Info("shut down successfully")

	return nil
}

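// worker pulls repos off the queue and clones them until the queue is
// drained or the exit channel is closed.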
func (s *NetsyncState) worker(id int) error {
	log := s.logger.With("worker", id)
	log.Info("starting worker")
	defer log.Info("worker stopped")
	for {
		select {
		case <-s.exit:
			log.Info("worker exiting due to exit signal")
			return nil
		default:
			ctx := context.Background()
			// Dequeue repo
			repo := s.Dequeue()
			if repo == "" {
				// No more repos to clone
				return nil
			}

			// Wait for the rate limiter
			if err := s.limiter.Wait(ctx); err != nil {
				return err
			}

			// Clone repo
			cloneState, err := s.cloneRepo(ctx, repo)
			if err != nil {
				log.Error("failed to clone repo", "repo", repo, "err", err)
			}

			// Update state
			s.Finish(repo, cloneState)
			log.Info("worker finished", "repo", repo, "status", cloneState)
		}
	}
}

var repoCloneDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
	Name: "netsync_repo_clone_duration_seconds",
	Help: "Duration of repo clone operations",
}, []string{"status"})

var bytesProcessed = promauto.NewCounter(prometheus.CounterOpts{
	Name: "netsync_bytes_processed",
	Help: "Number of bytes processed",
})

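// cloneRepo fetches a repo's CAR file from the checkout endpoint, walks its
// records, and writes each record as JSON into a per-repo tar.gz archive.
// It returns a short status string used for metrics and state tracking.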
func (s *NetsyncState) cloneRepo(ctx context.Context, did string) (cloneState string, err error) {
	log := s.logger.With("repo", did, "source", "cloneRepo")
	log.Info("cloning repo")

	start := time.Now()
	defer func() {
		duration := time.Since(start)
		repoCloneDuration.WithLabelValues(cloneState).Observe(duration.Seconds())
	}()

	url := fmt.Sprintf("%s?did=%s", s.CheckoutPath, did)

	// Clone repo
	req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
	if err != nil {
		cloneState = "failed (request-creation)"
		return cloneState, fmt.Errorf("failed to create request: %w", err)
	}

	req.Header.Set("Accept", "application/vnd.ipld.car")
	req.Header.Set("User-Agent", "jaz-atproto-netsync/0.0.1")
	if s.magicHeaderKey != "" && s.magicHeaderVal != "" {
		req.Header.Set(s.magicHeaderKey, s.magicHeaderVal)
	}

	resp, err := s.client.Do(req)
	if err != nil {
		cloneState = "failed (client.do)"
		return cloneState, fmt.Errorf("failed to get repo: %w", err)
	}

	if resp.StatusCode != http.StatusOK {
		resp.Body.Close()
		cloneState = fmt.Sprintf("failed (status: %d)", resp.StatusCode)
		return cloneState, fmt.Errorf("failed to get repo: %s", resp.Status)
	}

	body := instrumentedReader{
		source:  resp.Body,
		counter: bytesProcessed,
	}
	defer body.Close()

	// Write to file
	outPath, err := filepath.Abs(fmt.Sprintf("%s/%s.tar.gz", s.outDir, did))
	if err != nil {
		cloneState = "failed (file.abs)"
		return cloneState, fmt.Errorf("failed to get absolute path: %w", err)
	}

	tarFile, err := os.OpenFile(outPath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644)
	if err != nil {
		cloneState = "failed (file.open)"
		return cloneState, fmt.Errorf("failed to open file: %w", err)
	}
	defer tarFile.Close()

	gzipWriter := gzip.NewWriter(tarFile)
	defer gzipWriter.Close()

	tarWriter := tar.NewWriter(gzipWriter)
	defer tarWriter.Close()

	numRecords := 0
	collectionsSeen := make(map[string]struct{})

	r, err := repo.ReadRepoFromCar(ctx, body)
	if err != nil {
		log.Error("error reading repo", "err", err)
		cloneState = "failed (read-repo)"
		return cloneState, fmt.Errorf("failed to read repo from CAR: %w", err)
	}

	err = r.ForEach(ctx, "", func(path string, nodeCid cid.Cid) error {
		log := log.With("path", path, "nodeCid", nodeCid)

		recordCid, rec, err := r.GetRecordBytes(ctx, path)
		if err != nil {
			log.Error("error getting record", "err", err)
			return nil
		}

		// Verify that the record CID matches the node CID
		if recordCid != nodeCid {
			log.Error("mismatch in record and node CID", "recordCID", recordCid, "nodeCID", nodeCid)
			return nil
		}

		parts := strings.Split(path, "/")
		if len(parts) != 2 {
			log.Error("path does not have 2 parts", "path", path)
			return nil
		}

		collection := parts[0]
		rkey := parts[1]

		numRecords++
		if _, ok := collectionsSeen[collection]; !ok {
			collectionsSeen[collection] = struct{}{}
		}

		asCbor, err := data.UnmarshalCBOR(*rec)
		if err != nil {
			log.Error("error unmarshalling record", "err", err)
			return fmt.Errorf("failed to unmarshal record: %w", err)
		}

		recJSON, err := json.Marshal(asCbor)
		if err != nil {
			log.Error("error marshalling record to JSON", "err", err)
			return fmt.Errorf("failed to marshal record to JSON: %w", err)
		}

		// Write the record directly to the tar.gz file
		hdr := &tar.Header{
			Name: fmt.Sprintf("%s/%s.json", collection, rkey),
			Mode: 0600,
			Size: int64(len(recJSON)),
		}
		if err := tarWriter.WriteHeader(hdr); err != nil {
			log.Error("error writing tar header", "err", err)
			return err
		}
		if _, err := tarWriter.Write(recJSON); err != nil {
			log.Error("error writing record to tar file", "err", err)
			return err
		}

		return nil
	})
	if err != nil {
		log.Error("error during ForEach", "err", err)
		cloneState = "failed (for-each)"
		return cloneState, fmt.Errorf("error during ForEach: %w", err)
	}

	log.Info("checkout complete", "numRecords", numRecords, "numCollections", len(collectionsSeen))

	cloneState = "success"
	return cloneState, nil
}