fork of indigo with slightly nicer lexgen
at main 4.2 kB view raw
1package main 2 3import ( 4 "context" 5 "encoding/csv" 6 "fmt" 7 "io" 8 "log/slog" 9 "net/url" 10 "os" 11 "strings" 12 "sync/atomic" 13 14 "github.com/bluesky-social/indigo/api/atproto" 15 "github.com/bluesky-social/indigo/util" 16 "github.com/bluesky-social/indigo/xrpc" 17 18 "github.com/urfave/cli/v2" 19 "golang.org/x/time/rate" 20) 21 22type DidCollection struct { 23 Did string `json:"d"` 24 Collection string `json:"c"` 25} 26 27func DidCollectionsToCsv(out io.Writer, sources <-chan DidCollection) { 28 writer := csv.NewWriter(out) 29 defer writer.Flush() 30 var row [2]string 31 for dc := range sources { 32 row[0] = dc.Did 33 row[1] = dc.Collection 34 writer.Write(row[:]) 35 } 36} 37 38var offlineCrawlCmd = &cli.Command{ 39 Name: "offline_crawl", 40 Usage: "crawl a PDS to csv out", 41 Flags: []cli.Flag{ 42 &cli.StringFlag{ 43 Name: "host", 44 Usage: "hostname or URL of PDS", 45 }, 46 &cli.StringFlag{ 47 Name: "csv-out", 48 Usage: "path for output or - for stdout", 49 }, 50 &cli.Float64Flag{ 51 Name: "qps", 52 Usage: "queries per second to do vs target PDS", 53 Value: 50, // large PDS: 500_000 repos, 10_000 seconds, ~3 hours 54 }, 55 &cli.StringFlag{ 56 Name: "ratelimit-header", 57 Usage: "secret for friend PDSes", 58 EnvVars: []string{"BSKY_SOCIAL_RATE_LIMIT_SKIP", "RATE_LIMIT_HEADER"}, 59 }, 60 }, 61 Action: func(cctx *cli.Context) error { 62 log := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelInfo})) 63 ctx, cancel := context.WithCancel(context.Background()) 64 defer cancel() 65 hostname := cctx.String("host") 66 hosturl, err := url.Parse(hostname) 67 if err != nil { 68 hosturl = new(url.URL) 69 hosturl.Scheme = "https" 70 hosturl.Host = hostname 71 } 72 rpcClient := xrpc.Client{ 73 Host: hosturl.String(), 74 Client: util.RobustHTTPClient(), 75 } 76 if cctx.IsSet("ratelimit-header") { 77 rpcClient.Headers = map[string]string{ 78 "x-ratelimit-bypass": cctx.String("ratelimit-header"), 79 } 80 } 81 log.Info("will crawl", "url", rpcClient.Host) 82 csvOutPath := cctx.String("csv-out") 83 var fout io.Writer = os.Stdout 84 if csvOutPath != "" { 85 if csvOutPath == "-" { 86 fout = os.Stdout 87 } else { 88 fout, err = os.Create(csvOutPath) 89 if err != nil { 90 return fmt.Errorf("%s: could not open for writing: %w", csvOutPath, err) 91 } 92 } 93 } 94 qps := cctx.Float64("qps") 95 results := make(chan DidCollection, 100) 96 defer close(results) 97 go DidCollectionsToCsv(fout, results) 98 crawler := Crawler{ 99 Ctx: ctx, 100 RpcClient: &rpcClient, 101 QPS: qps, 102 Results: results, 103 Log: log, 104 } 105 err = crawler.CrawlPDSRepoCollections() 106 log.Info("done") 107 108 return err 109 }, 110} 111 112type Crawler struct { 113 Ctx context.Context 114 RpcClient *xrpc.Client 115 QPS float64 116 Results chan<- DidCollection 117 Log *slog.Logger 118 Stats *CrawlStats 119} 120 121type CrawlStats struct { 122 ReposDescribed atomic.Uint32 123} 124 125// CrawlPDSRepoCollections 126// write results to chan 127// does _not_ close chan 128// (allow multiple threads of PDS queries running to one output chan, e.g. feeding into SetFromResults() ) 129func (cr *Crawler) CrawlPDSRepoCollections() error { 130 var cursor string 131 limiter := rate.NewLimiter(rate.Limit(cr.QPS), 1) 132 for { 133 limiter.Wait(cr.Ctx) 134 repos, err := atproto.SyncListRepos(cr.Ctx, cr.RpcClient, cursor, 1000) 135 if err != nil { 136 return fmt.Errorf("%s: sync repos: %w", cr.RpcClient.Host, err) 137 } 138 pdsRepoPages.Inc() 139 slog.Debug("got repo list", "count", len(repos.Repos)) 140 for _, xr := range repos.Repos { 141 limiter.Wait(cr.Ctx) 142 desc, err := atproto.RepoDescribeRepo(cr.Ctx, cr.RpcClient, xr.Did) 143 if err != nil { 144 erst := err.Error() 145 if strings.Contains(erst, "RepoDeactivated") || strings.Contains(erst, "RepoTakendown") { 146 slog.Info("repo unavail", "host", cr.RpcClient.Host, "did", xr.Did, "err", err) 147 } else { 148 slog.Warn("repo desc", "host", cr.RpcClient.Host, "did", xr.Did, "err", err) 149 } 150 continue 151 } 152 pdsReposDescribed.Inc() 153 for _, collection := range desc.Collections { 154 cr.Results <- DidCollection{Did: xr.Did, Collection: collection} 155 } 156 if cr.Stats != nil { 157 cr.Stats.ReposDescribed.Add(1) 158 } 159 } 160 if repos.Cursor != nil { 161 cursor = *repos.Cursor 162 } else { 163 break 164 } 165 } 166 return nil 167}