package main

import (
	"context"
	"encoding/csv"
	"fmt"
	"io"
	"log/slog"
	"net/url"
	"os"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/bluesky-social/indigo/api/atproto"
	"github.com/bluesky-social/indigo/util"
	"github.com/bluesky-social/indigo/xrpc"

	"github.com/urfave/cli/v2"
	"golang.org/x/time/rate"
)

// DidCollection is one (DID, collection NSID) pair observed on a PDS.
type DidCollection struct {
	Did        string `json:"d"`
	Collection string `json:"c"`
}

// DidCollectionsToCsv drains sources, writing one CSV row per DidCollection
// until the channel is closed, then flushes the writer.
func DidCollectionsToCsv(out io.Writer, sources <-chan DidCollection) {
	writer := csv.NewWriter(out)
	defer writer.Flush()
	var row [2]string
	for dc := range sources {
		row[0] = dc.Did
		row[1] = dc.Collection
		if err := writer.Write(row[:]); err != nil {
			slog.Error("csv write", "err", err)
			return
		}
	}
}
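
// DidCollectionsFromCsv is an illustrative inverse of DidCollectionsToCsv
// (a hypothetical helper; nothing in this file calls it): it re-reads a CSV
// produced above and sends each row on out until EOF. It shows the expected
// two-field row shape a downstream consumer would rely on.
func DidCollectionsFromCsv(in io.Reader, out chan<- DidCollection) error {
	reader := csv.NewReader(in)
	for {
		row, err := reader.Read()
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}
		if len(row) != 2 {
			return fmt.Errorf("expected 2 fields, got %d", len(row))
		}
		out <- DidCollection{Did: row[0], Collection: row[1]}
	}
}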

var offlineCrawlCmd = &cli.Command{
	Name:  "offline_crawl",
	Usage: "crawl a PDS and write its repos' collections to CSV",
	Flags: []cli.Flag{
		&cli.StringFlag{
			Name:     "host",
			Usage:    "hostname or URL of the PDS to crawl",
			Required: true,
		},
		&cli.StringFlag{
			Name:  "csv-out",
			Usage: "path for CSV output, or - for stdout",
		},
		&cli.Float64Flag{
			Name:  "qps",
			Usage: "queries per second to issue against the target PDS",
			Value: 50, // large PDS: 500_000 repos, 10_000 seconds, ~3 hours
		},
		&cli.StringFlag{
			Name:    "ratelimit-header",
			Usage:   "rate-limit bypass secret for friendly PDSes",
			EnvVars: []string{"BSKY_SOCIAL_RATE_LIMIT_SKIP", "RATE_LIMIT_HEADER"},
		},
	},
	Action: func(cctx *cli.Context) error {
		log := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelInfo}))
		ctx, cancel := context.WithCancel(context.Background())
		defer cancel()
		hostname := cctx.String("host")
		// url.Parse accepts bare hostnames without error (treating them as a
		// path), so check for a missing host rather than only the error.
		hosturl, err := url.Parse(hostname)
		if err != nil || hosturl.Host == "" {
			hosturl = &url.URL{Scheme: "https", Host: hostname}
		}
		rpcClient := xrpc.Client{
			Host:   hosturl.String(),
			Client: util.RobustHTTPClient(),
		}
		if cctx.IsSet("ratelimit-header") {
			rpcClient.Headers = map[string]string{
				"x-ratelimit-bypass": cctx.String("ratelimit-header"),
			}
		}
		log.Info("will crawl", "url", rpcClient.Host)
		csvOutPath := cctx.String("csv-out")
		var fout io.Writer = os.Stdout
		if csvOutPath != "" && csvOutPath != "-" {
			f, err := os.Create(csvOutPath)
			if err != nil {
				return fmt.Errorf("%s: could not open for writing: %w", csvOutPath, err)
			}
			defer f.Close()
			fout = f
		}
		qps := cctx.Float64("qps")
		results := make(chan DidCollection, 100)
		// Close results only after the crawl finishes, then wait for the CSV
		// writer goroutine to drain and flush; a deferred close would race
		// with process exit and could truncate output.
		writerDone := make(chan struct{})
		go func() {
			defer close(writerDone)
			DidCollectionsToCsv(fout, results)
		}()
		crawler := Crawler{
			Ctx:       ctx,
			RpcClient: &rpcClient,
			QPS:       qps,
			Results:   results,
			Log:       log,
		}
		err = crawler.CrawlPDSRepoCollections()
		close(results)
		<-writerDone
		log.Info("done")

		return err
	},
}
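
// Illustrative sketch (assumed wiring, not shown in this file): elsewhere in
// the package, offlineCrawlCmd is presumably registered with a urfave/cli
// app, roughly:
//
//	app := &cli.App{
//		Name:     "crawler", // hypothetical app name
//		Commands: []*cli.Command{offlineCrawlCmd},
//	}
//	if err := app.Run(os.Args); err != nil {
//		slog.Error("exit", "err", err)
//		os.Exit(1)
//	}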

// Crawler holds the shared state for crawling one PDS.
type Crawler struct {
	Ctx       context.Context
	RpcClient *xrpc.Client
	QPS       float64
	Results   chan<- DidCollection
	Log       *slog.Logger
	Stats     *CrawlStats // optional; counting is skipped when nil
}

// CrawlStats carries counters that are safe to read from other goroutines.
type CrawlStats struct {
	ReposDescribed atomic.Uint32
}
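
// reportProgress is an illustrative helper (hypothetical; not referenced
// elsewhere in this file): it logs the ReposDescribed counter at a fixed
// interval until ctx is canceled, which is handy when a crawl runs for hours.
func reportProgress(ctx context.Context, log *slog.Logger, stats *CrawlStats, every time.Duration) {
	ticker := time.NewTicker(every)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			log.Info("crawl progress", "repos_described", stats.ReposDescribed.Load())
		}
	}
}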

// CrawlPDSRepoCollections pages through the PDS's repo list, describes each
// repo, and writes one DidCollection per (repo, collection) to cr.Results.
// It does _not_ close the channel, so multiple crawler goroutines can feed
// one output channel (e.g. feeding into SetFromResults()).
func (cr *Crawler) CrawlPDSRepoCollections() error {
	var cursor string
	limiter := rate.NewLimiter(rate.Limit(cr.QPS), 1)
	for {
		if err := limiter.Wait(cr.Ctx); err != nil {
			return err
		}
		repos, err := atproto.SyncListRepos(cr.Ctx, cr.RpcClient, cursor, 1000)
		if err != nil {
			return fmt.Errorf("%s: sync repos: %w", cr.RpcClient.Host, err)
		}
		pdsRepoPages.Inc() // package-level counter defined elsewhere
		cr.Log.Debug("got repo list", "count", len(repos.Repos))
		for _, xr := range repos.Repos {
			if err := limiter.Wait(cr.Ctx); err != nil {
				return err
			}
			desc, err := atproto.RepoDescribeRepo(cr.Ctx, cr.RpcClient, xr.Did)
			if err != nil {
				erst := err.Error()
				if strings.Contains(erst, "RepoDeactivated") || strings.Contains(erst, "RepoTakendown") {
					cr.Log.Info("repo unavail", "host", cr.RpcClient.Host, "did", xr.Did, "err", err)
				} else {
					cr.Log.Warn("repo desc", "host", cr.RpcClient.Host, "did", xr.Did, "err", err)
				}
				continue
			}
			pdsReposDescribed.Inc()
			for _, collection := range desc.Collections {
				cr.Results <- DidCollection{Did: xr.Did, Collection: collection}
			}
			if cr.Stats != nil {
				cr.Stats.ReposDescribed.Add(1)
			}
		}
		// Treat a nil or empty cursor as end-of-list; retrying with an empty
		// cursor would restart the listing from the beginning.
		if repos.Cursor == nil || *repos.Cursor == "" {
			break
		}
		cursor = *repos.Cursor
	}
	return nil
}
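
// crawlManyPDS is an illustrative sketch (hypothetical; not called in this
// file) of the fan-in pattern the CrawlPDSRepoCollections doc comment
// describes: several Crawlers share one Results channel, and the channel is
// closed only after every crawler has returned, so the consumer's range loop
// terminates cleanly.
func crawlManyPDS(ctx context.Context, hosts []string, qps float64, log *slog.Logger, results chan DidCollection) {
	var wg sync.WaitGroup
	for _, host := range hosts {
		wg.Add(1)
		go func(host string) {
			defer wg.Done()
			cr := Crawler{
				Ctx:       ctx,
				RpcClient: &xrpc.Client{Host: host, Client: util.RobustHTTPClient()},
				QPS:       qps,
				Results:   results,
				Log:       log,
			}
			if err := cr.CrawlPDSRepoCollections(); err != nil {
				log.Warn("crawl failed", "host", host, "err", err)
			}
		}(host)
	}
	wg.Wait()
	close(results) // safe: all senders have returned
}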