fork of indigo with slightly nicer lexgen
package main

import (
	"archive/tar"
	"bufio"
	"compress/gzip"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"log"
	"log/slog"
	"net/http"
	"os"
	"os/signal"
	"path/filepath"
	"strings"
	"sync"
	"syscall"
	"time"

	"github.com/bluesky-social/indigo/atproto/data"
	"github.com/bluesky-social/indigo/repo"
	"github.com/ipfs/go-cid"
	_ "github.com/joho/godotenv/autoload"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"github.com/prometheus/client_golang/prometheus/promhttp"

	"golang.org/x/time/rate"

	"github.com/carlmjohnson/versioninfo"
	"github.com/urfave/cli/v2"
)

func main() {
	app := cli.App{
		Name:    "netsync",
		Usage:   "atproto network cloning tool",
		Version: versioninfo.Short(),
	}

	app.Flags = []cli.Flag{
		&cli.IntFlag{
			Name:  "port",
			Usage: "listen port for metrics server",
			Value: 8753,
		},
		&cli.IntFlag{
			Name:  "worker-count",
			Usage: "number of workers to run concurrently",
			Value: 10,
		},
		&cli.Float64Flag{
			Name:  "checkout-limit",
			Usage: "maximum number of repos per second to checkout",
			Value: 4,
		},
		&cli.StringFlag{
			Name:  "out-dir",
			Usage: "directory to write cloned repos to",
			Value: "netsync-out",
		},
		&cli.StringFlag{
			Name:  "repo-list",
			Usage: "path to file containing list of repos to clone",
			Value: "repos.txt",
		},
		&cli.StringFlag{
			Name:  "state-file",
			Usage: "path to file to write state to",
			Value: "state.json",
		},
		&cli.StringFlag{
			Name:  "checkout-path",
			Usage: "path to checkout endpoint",
			Value: "https://bsky.network/xrpc/com.atproto.sync.getRepo",
		},
		&cli.StringFlag{
			Name:    "magic-header-key",
			Usage:   "header key to send with checkout request",
			Value:   "",
			EnvVars: []string{"MAGIC_HEADER_KEY"},
		},
		&cli.StringFlag{
			Name:    "magic-header-val",
			Usage:   "header value to send with checkout request",
			Value:   "",
			EnvVars: []string{"MAGIC_HEADER_VAL"},
		},
	}

	app.Commands = []*cli.Command{
		{
			Name:  "retry",
			Usage: "requeue failed repos",
			Action: func(cctx *cli.Context) error {
				state := &NetsyncState{
					StatePath: cctx.String("state-file"),
				}

				err := state.Resume()
				if err != nil {
					return err
				}

				// Guard against a state file that has no enqueued map
				if state.EnqueuedRepos == nil {
					state.EnqueuedRepos = make(map[string]*RepoState)
				}

				// Look through finished repos for failed ones
				for _, repoState := range state.FinishedRepos {
					// Don't retry repos that failed due to a 400 (they've been deleted)
					if strings.HasPrefix(repoState.State, "failed") && repoState.State != "failed (status: 400)" {
						state.EnqueuedRepos[repoState.Repo] = &RepoState{
							Repo:  repoState.Repo,
							State: "enqueued",
						}
					}
				}

				// Save state
				return state.Save()
			},
		},
	}

	app.Action = Netsync

	err := app.Run(os.Args)
	if err != nil {
		log.Fatal(err)
	}
}

type RepoState struct {
	Repo       string
	State      string
	FinishedAt time.Time
}

type NetsyncState struct {
	EnqueuedRepos map[string]*RepoState
	FinishedRepos map[string]*RepoState
	StatePath     string
	CheckoutPath  string

	outDir         string
	magicHeaderKey string
	magicHeaderVal string

	logger *slog.Logger

	lk          sync.RWMutex
	wg          sync.WaitGroup
	exit        chan struct{}
	limiter     *rate.Limiter
	workerCount int
	client      *http.Client
}

type instrumentedReader struct {
	source  io.ReadCloser
	counter prometheus.Counter
}

func (r instrumentedReader) Read(b []byte) (int, error) {
	n, err := r.source.Read(b)
	r.counter.Add(float64(n))
	return n, err
}

func (r instrumentedReader) Close() error {
	// Drain any remaining bytes so they are counted and the underlying
	// connection can be reused, then close the source.
	var buf [32]byte
	var n int
	var err error
	for err == nil {
		n, err = r.source.Read(buf[:])
		r.counter.Add(float64(n))
	}
	closeerr := r.source.Close()
	if err != nil && err != io.EOF {
		return err
	}
	return closeerr
}

func (s *NetsyncState) Save() error {
	s.lk.RLock()
	defer s.lk.RUnlock()

	// Truncate so a shrinking state doesn't leave stale JSON at the end of the file
	stateFile, err := os.OpenFile(s.StatePath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644)
	if err != nil {
		return err
	}
	defer stateFile.Close()

	stateBytes, err := json.Marshal(s)
	if err != nil {
		return err
	}

	_, err = stateFile.Write(stateBytes)
	return err
}

func (s *NetsyncState) Resume() error {
	stateFile, err := os.Open(s.StatePath)
	if err != nil {
		return err
	}
	defer stateFile.Close()

	stateBytes, err := io.ReadAll(stateFile)
	if err != nil {
		return err
	}

	return json.Unmarshal(stateBytes, s)
}

var enqueuedJobs = promauto.NewGauge(prometheus.GaugeOpts{
	Name: "netsync_enqueued_jobs",
	Help: "Number of enqueued jobs",
})

func (s *NetsyncState) Dequeue() string {
	s.lk.Lock()
	defer s.lk.Unlock()

	enqueuedJobs.Set(float64(len(s.EnqueuedRepos)))

	for repo, state := range s.EnqueuedRepos {
		if state.State == "enqueued" {
			state.State = "dequeued"
			return repo
		}
	}

	return ""
}

var finishedJobs = promauto.NewGauge(prometheus.GaugeOpts{
	Name: "netsync_finished_jobs",
	Help: "Number of finished jobs",
})

func (s *NetsyncState) Finish(repo string, state string) {
	s.lk.Lock()
	defer s.lk.Unlock()

	s.FinishedRepos[repo] = &RepoState{
		Repo:       repo,
		State:      state,
		FinishedAt: time.Now(),
	}

	finishedJobs.Set(float64(len(s.FinishedRepos)))

	delete(s.EnqueuedRepos, repo)
}

func Netsync(cctx *cli.Context) error {
	ctx := cctx.Context
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	logLevel := slog.LevelInfo
	logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: logLevel, AddSource: true}))
	slog.SetDefault(slog.New(logger.Handler()))

	state := &NetsyncState{
		StatePath:    cctx.String("state-file"),
		CheckoutPath: cctx.String("checkout-path"),

		outDir:         cctx.String("out-dir"),
		workerCount:    cctx.Int("worker-count"),
		limiter:        rate.NewLimiter(rate.Limit(cctx.Float64("checkout-limit")), 1),
		magicHeaderKey: cctx.String("magic-header-key"),
		magicHeaderVal: cctx.String("magic-header-val"),

		exit: make(chan struct{}),
		wg:   sync.WaitGroup{},
		client: &http.Client{
			Timeout: 180 * time.Second,
		},

		logger: logger,
	}

	if state.magicHeaderKey != "" && state.magicHeaderVal != "" {
		logger.Info("using magic header")
	}

	// Create out dir
	err := os.MkdirAll(state.outDir, 0755)
	if err != nil {
		return err
	}

	// Try to resume from state file
	err = state.Resume()
	if state.EnqueuedRepos == nil {
		state.EnqueuedRepos = make(map[string]*RepoState)
	} else {
		// Reset any dequeued repos
		for _, repoState := range state.EnqueuedRepos {
			if repoState.State == "dequeued" {
				repoState.State = "enqueued"
			}
		}
	}

	if state.FinishedRepos == nil {
		state.FinishedRepos = make(map[string]*RepoState)
	}

	if err != nil {
		// No usable state file; read the repo list instead
		repoListFile, err := os.Open(cctx.String("repo-list"))
		if err != nil {
			return err
		}
		defer repoListFile.Close()

		fileScanner := bufio.NewScanner(repoListFile)
		fileScanner.Split(bufio.ScanLines)

		for fileScanner.Scan() {
			repo := fileScanner.Text()
			state.EnqueuedRepos[repo] = &RepoState{
				Repo:  repo,
				State: "enqueued",
			}
		}
	} else {
		logger.Info("Resuming from state file")
	}

	// Start metrics server
	mux := http.NewServeMux()
	mux.Handle("/metrics", promhttp.Handler())

	metricsServer := &http.Server{
		Addr:    fmt.Sprintf(":%d", cctx.Int("port")),
		Handler: mux,
	}

	state.wg.Add(1)
	go func() {
		defer state.wg.Done()
		if err := metricsServer.ListenAndServe(); err != http.ErrServerClosed {
			logger.Error("failed to start metrics server", "err", err)
			os.Exit(1)
		}
		logger.Info("metrics server shut down successfully")
	}()

	// Start workers
	for i := 0; i < state.workerCount; i++ {
		state.wg.Add(1)
		go func(id int) {
			defer state.wg.Done()
			err := state.worker(id)
			if err != nil {
				logger.Error("worker failed", "err", err)
			}
		}(i)
	}

	// Periodically save state and check for an empty queue
	state.wg.Add(1)
	go func() {
		defer state.wg.Done()
		t := time.NewTicker(30 * time.Second)
		defer t.Stop()
		for {
			select {
			case <-ctx.Done():
				err := state.Save()
				if err != nil {
					logger.Error("failed to save state", "err", err)
				}
				return
			case <-t.C:
				err := state.Save()
				if err != nil {
					logger.Error("failed to save state", "err", err)
				}
				state.lk.RLock()
				queueEmpty := len(state.EnqueuedRepos) == 0
				state.lk.RUnlock()
				if queueEmpty {
					logger.Info("no more repos to clone, shutting down")
					close(state.exit)
					return
				}
			}
		}
	}()

	// Trap SIGINT to trigger a shutdown.
	logger.Info("listening for signals")
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)

	select {
	case sig := <-signals:
		cancel()
		close(state.exit)
		logger.Info("shutting down on signal", "signal", sig)
	case <-ctx.Done():
		cancel()
		close(state.exit)
		logger.Info("shutting down on context done")
	case <-state.exit:
		cancel()
		logger.Info("shutting down on exit signal")
	}

	logger.Info("shutting down, waiting for workers to clean up...")

	// ctx is already cancelled here, so give the metrics server its own shutdown deadline
	shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer shutdownCancel()
	if err := metricsServer.Shutdown(shutdownCtx); err != nil {
		logger.Error("failed to shut down metrics server", "err", err)
	}

	state.wg.Wait()

	logger.Info("shut down successfully")

	return nil
}

func (s *NetsyncState) worker(id int) error {
	log := s.logger.With("worker", id)
	log.Info("starting worker")
	defer log.Info("worker stopped")
	for {
		select {
		case <-s.exit:
			log.Info("worker exiting due to exit signal")
			return nil
		default:
			ctx := context.Background()
			// Dequeue repo
			repo := s.Dequeue()
			if repo == "" {
				// No more repos to clone
				return nil
			}

			// Wait for rate limiter
			s.limiter.Wait(ctx)

			// Clone repo
			cloneState, err := s.cloneRepo(ctx, repo)
			if err != nil {
				log.Error("failed to clone repo", "repo", repo, "err", err)
			}

			// Update state
			s.Finish(repo, cloneState)
			log.Info("worker finished", "repo", repo, "status", cloneState)
		}
	}
}

var repoCloneDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
	Name: "netsync_repo_clone_duration_seconds",
	Help: "Duration of repo clone operations",
}, []string{"status"})

var bytesProcessed = promauto.NewCounter(prometheus.CounterOpts{
	Name: "netsync_bytes_processed",
	Help: "Number of bytes processed",
})

func (s *NetsyncState) cloneRepo(ctx context.Context, did string) (cloneState string, err error) {
	log := s.logger.With("repo", did, "source", "cloneRepo")
	log.Info("cloning repo")

	start := time.Now()
	defer func() {
		duration := time.Since(start)
		repoCloneDuration.WithLabelValues(cloneState).Observe(duration.Seconds())
	}()

	var url = fmt.Sprintf("%s?did=%s", s.CheckoutPath, did)

	// Clone repo
	req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
	if err != nil {
		cloneState = "failed (request-creation)"
		return cloneState, fmt.Errorf("failed to create request: %w", err)
	}

	req.Header.Set("Accept", "application/vnd.ipld.car")
	req.Header.Set("User-Agent", "jaz-atproto-netsync/0.0.1")
	if s.magicHeaderKey != "" && s.magicHeaderVal != "" {
		req.Header.Set(s.magicHeaderKey, s.magicHeaderVal)
	}

	resp, err := s.client.Do(req)
	if err != nil {
		cloneState = "failed (client.do)"
		return cloneState, fmt.Errorf("failed to get repo: %w", err)
	}

	if resp.StatusCode != http.StatusOK {
		resp.Body.Close()
		cloneState = fmt.Sprintf("failed (status: %d)", resp.StatusCode)
		return cloneState, fmt.Errorf("failed to get repo: %s", resp.Status)
	}

	instrumentedReader := instrumentedReader{
		source:  resp.Body,
		counter: bytesProcessed,
	}
	defer instrumentedReader.Close()

	// Write to file
	outPath, err := filepath.Abs(fmt.Sprintf("%s/%s.tar.gz", s.outDir, did))
	if err != nil {
		cloneState = "failed (file.abs)"
		return cloneState, fmt.Errorf("failed to get absolute path: %w", err)
	}

	tarFile, err := os.OpenFile(outPath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644)
	if err != nil {
		cloneState = "failed (file.open)"
		return cloneState, fmt.Errorf("failed to open file: %w", err)
	}
	defer tarFile.Close()

	gzipWriter := gzip.NewWriter(tarFile)
	defer gzipWriter.Close()

	tarWriter := tar.NewWriter(gzipWriter)
	defer tarWriter.Close()

	numRecords := 0
	collectionsSeen := make(map[string]struct{})

	r, err := repo.ReadRepoFromCar(ctx, instrumentedReader)
	if err != nil {
		log.Error("Error reading repo", "err", err)
		return "failed (read-repo)", fmt.Errorf("failed to read repo from CAR: %w", err)
	}

	err = r.ForEach(ctx, "", func(path string, nodeCid cid.Cid) error {
		log := log.With("path", path, "nodeCid", nodeCid)

		recordCid, rec, err := r.GetRecordBytes(ctx, path)
		if err != nil {
			log.Error("Error getting record", "err", err)
			return nil
		}

		// Verify that the record CID matches the node CID
		if recordCid != nodeCid {
			log.Error("Mismatch in record and node CID", "recordCID", recordCid, "nodeCID", nodeCid)
			return nil
		}

		parts := strings.Split(path, "/")
		if len(parts) != 2 {
			log.Error("Path does not have 2 parts", "path", path)
			return nil
		}

		collection := parts[0]
		rkey := parts[1]

		numRecords++
		if _, ok := collectionsSeen[collection]; !ok {
			collectionsSeen[collection] = struct{}{}
		}

		asCbor, err := data.UnmarshalCBOR(*rec)
		if err != nil {
			log.Error("Error unmarshalling record", "err", err)
			return fmt.Errorf("failed to unmarshal record: %w", err)
		}

		recJSON, err := json.Marshal(asCbor)
		if err != nil {
			log.Error("Error marshalling record to JSON", "err", err)
			return fmt.Errorf("failed to marshal record to JSON: %w", err)
		}

		// Write the record directly to the tar.gz file
		hdr := &tar.Header{
			Name: fmt.Sprintf("%s/%s.json", collection, rkey),
			Mode: 0600,
			Size: int64(len(recJSON)),
		}
		if err := tarWriter.WriteHeader(hdr); err != nil {
			log.Error("Error writing tar header", "err", err)
			return err
		}
		if _, err := tarWriter.Write(recJSON); err != nil {
			log.Error("Error writing record to tar file", "err", err)
			return err
		}

		return nil
	})
	if err != nil {
		log.Error("Error during ForEach", "err", err)
		return "failed (for-each)", fmt.Errorf("error during ForEach: %w", err)
	}

	log.Info("checkout complete", "numRecords", numRecords, "numCollections", len(collectionsSeen))

	cloneState = "success"
	return cloneState, nil
}