1package main
2
3import (
4 "bufio"
5 "bytes"
6 "compress/gzip"
7 "encoding/csv"
8 "encoding/json"
9 "errors"
10 "fmt"
11 "io"
12 "log/slog"
13 "net/http"
14 "net/url"
15 "os"
16 "strconv"
17 "strings"
18 "time"
19
20 "github.com/carlmjohnson/versioninfo"
21 "github.com/urfave/cli/v2"
22)
23
24func main() {
25 app := cli.App{
26 Name: "collectiondir",
27 Usage: "collection directory service",
28 Version: versioninfo.Short(),
29 Flags: []cli.Flag{
30 &cli.BoolFlag{
31 Name: "verbose",
32 },
33 },
34 Commands: []*cli.Command{
35 serveCmd,
36 offlineCrawlCmd,
37 buildCmd,
38 statsCmd,
39 exportCmd,
40 adminCrawlCmd,
41 },
42 }
43 err := app.Run(os.Args)
44 if err != nil {
45 fmt.Fprintf(os.Stderr, "%s\n", err.Error())
46 os.Exit(1)
47 }
48}
49
50var statsCmd = &cli.Command{
51 Name: "stats",
52 Usage: "read stats from a pebble db",
53 Flags: []cli.Flag{
54 &cli.StringFlag{
55 Name: "pebble",
56 Usage: "path to pebble db",
57 Required: true,
58 },
59 },
60 Action: func(cctx *cli.Context) error {
61 logLevel := slog.LevelInfo
62 if cctx.Bool("verbose") {
63 logLevel = slog.LevelDebug
64 }
65 log := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: logLevel}))
66 slog.SetDefault(log)
67 pebblePath := cctx.String("pebble")
68 var db PebbleCollectionDirectory
69 db.log = log
70 err := db.Open(pebblePath)
71 if err != nil {
72 return err
73 }
74 defer db.Close()
75
76 stats, err := db.GetCollectionStats()
77 if err != nil {
78 return err
79 }
80 blob, err := json.MarshalIndent(stats, "", " ")
81 os.Stdout.Write(blob)
82 os.Stdout.Write([]byte{'\n'})
83 return nil
84 },
85}
86
87var exportCmd = &cli.Command{
88 Name: "export",
89 Usage: "export a pebble db to CSV on stdout",
90 Flags: []cli.Flag{
91 &cli.StringFlag{
92 Name: "pebble",
93 Usage: "path to pebble db",
94 Required: true,
95 },
96 },
97 Action: func(cctx *cli.Context) error {
98 logLevel := slog.LevelInfo
99 if cctx.Bool("verbose") {
100 logLevel = slog.LevelDebug
101 }
102 log := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: logLevel}))
103 slog.SetDefault(log)
104 pebblePath := cctx.String("pebble")
105 var db PebbleCollectionDirectory
106 db.log = log
107 err := db.Open(pebblePath)
108 if err != nil {
109 return err
110 }
111 defer db.Close()
112
113 rows := make(chan CollectionDidTime, 100)
114 go func() {
115 err := db.ReadAllPrimary(cctx.Context, rows)
116 if err != nil {
117 log.Error("db read", "path", pebblePath, "err", err)
118 }
119 }()
120
121 writer := csv.NewWriter(os.Stdout)
122 defer writer.Flush()
123 err = writer.Write([]string{"did", "collection", "millis"})
124 if err != nil {
125 log.Error("csv write header", "err", err)
126 }
127 var row [3]string
128 for rowi := range rows {
129 row[0] = rowi.Did
130 row[1] = rowi.Collection
131 row[2] = strconv.FormatInt(rowi.UnixMillis, 10)
132 err = writer.Write(row[:])
133 if err != nil {
134 log.Error("csv write row", "err", err)
135 }
136 }
137
138 return nil
139 },
140}
141
142var buildCmd = &cli.Command{
143 Name: "build",
144 Usage: "collect csv into a database",
145 Flags: []cli.Flag{
146 &cli.StringFlag{
147 Name: "csv",
148 Required: true,
149 },
150 &cli.StringFlag{
151 Name: "pebble",
152 Usage: "path to store pebble db",
153 Required: true,
154 },
155 },
156 Action: func(cctx *cli.Context) error {
157 logLevel := slog.LevelInfo
158 if cctx.Bool("verbose") {
159 logLevel = slog.LevelDebug
160 }
161 log := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: logLevel}))
162 slog.SetDefault(log)
163 pebblePath := cctx.String("pebble")
164 var db PebbleCollectionDirectory
165 db.log = log
166 err := db.Open(pebblePath)
167 if err != nil {
168 return err
169 }
170 defer db.Close()
171 csvPath := cctx.String("csv")
172 var fin io.Reader
173 if csvPath == "-" {
174 fin = os.Stdin
175 } else if strings.HasSuffix(csvPath, ".gz") {
176 osin, err := os.Open(csvPath)
177 if err != nil {
178 return fmt.Errorf("%s: could not open csv, %w", csvPath, err)
179 }
180 defer osin.Close()
181 gzin, err := gzip.NewReader(osin)
182 if err != nil {
183 return fmt.Errorf("%s: could not open csv, %w", csvPath, err)
184 }
185 defer gzin.Close()
186 fin = gzin
187 } else {
188 osin, err := os.Open(csvPath)
189 if err != nil {
190 return fmt.Errorf("%s: could not open csv, %w", csvPath, err)
191 }
192 defer osin.Close()
193 fin = osin
194 }
195 reader := csv.NewReader(fin)
196 rowcount := 0
197 results := make(chan DidCollection, 100)
198 go db.SetFromResults(results)
199 for {
200 row, err := reader.Read()
201 if errors.Is(err, io.EOF) {
202 break
203 }
204 did := row[0]
205 collection := row[1]
206 results <- DidCollection{
207 Did: did,
208 Collection: collection,
209 }
210 rowcount++
211 }
212 close(results)
213 log.Debug("read csv", "rows", rowcount)
214 return nil
215 },
216}
217
218var adminCrawlCmd = &cli.Command{
219 Name: "crawl",
220 Usage: "admin service to crawl one or more PDSes",
221 Flags: []cli.Flag{
222 &cli.StringFlag{
223 Name: "csv",
224 Usage: "path to load csv from, use column 'host' or 'hostname'",
225 },
226 &cli.StringFlag{
227 Name: "list",
228 Usage: "path to load hostname list from, one per line",
229 },
230 &cli.StringFlag{
231 Name: "url",
232 Usage: "host:port of collectiondir server",
233 Required: true,
234 EnvVars: []string{"COLLECTIONDIR_URL"},
235 },
236 &cli.StringFlag{
237 Name: "auth",
238 Usage: "Auth token for admin api",
239 EnvVars: []string{"ADMIN_AUTH"},
240 },
241 },
242 Action: func(cctx *cli.Context) error {
243 logLevel := slog.LevelInfo
244 if cctx.Bool("verbose") {
245 logLevel = slog.LevelDebug
246 }
247 log := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: logLevel}))
248 slog.SetDefault(log)
249
250 var hostList []string
251
252 serverUrl, err := url.Parse(cctx.String("url"))
253 if err != nil {
254 var e2 error
255 // try to fixup a bare host:port which can confuse url.Parse
256 serverUrl, e2 = url.Parse("http://" + cctx.String("url"))
257 if e2 != nil {
258 return fmt.Errorf("could not parse url, %w", err)
259 }
260 }
261 requestCrawlUrl := serverUrl.JoinPath("/admin/pds/requestCrawl")
262
263 if cctx.IsSet("list") {
264 fin, err := os.Open(cctx.String("list"))
265 if err != nil {
266 return fmt.Errorf("%s: could not open, %w", cctx.String("list"), err)
267 }
268 defer fin.Close()
269 bufin := bufio.NewScanner(fin)
270 for bufin.Scan() {
271 hostList = append(hostList, bufin.Text())
272 }
273 err = bufin.Err()
274 if err != nil {
275 return fmt.Errorf("%s: error reading, %w", cctx.String("list"), err)
276 }
277 } else if cctx.IsSet("csv") {
278 fin, err := os.Open(cctx.String("csv"))
279 if err != nil {
280 return fmt.Errorf("%s: could not open, %w", cctx.String("csv"), err)
281 }
282 defer fin.Close()
283 data, err := csv.NewReader(fin).ReadAll()
284 if err != nil {
285 return fmt.Errorf("%s: could not read, %w", cctx.String("csv"), err)
286 }
287 if len(data) < 2 {
288 return fmt.Errorf("%s: empty CSV file", cctx.String("csv"))
289 }
290 headerRow := data[0]
291 hostCol := -1
292 for i, v := range headerRow {
293 v = strings.ToLower(v)
294 if v == "host" || v == "hostname" {
295 hostCol = i
296 break
297 }
298 }
299 if hostCol < 0 {
300 return fmt.Errorf("%s: header missing 'host' or 'hostname'", cctx.String("csv"))
301 }
302 for _, row := range data[1:] {
303 hostList = append(hostList, row[hostCol])
304 }
305 }
306
307 if len(hostList) == 0 {
308 fmt.Println("no hosts")
309 }
310
311 client := http.Client{Timeout: 1 * time.Second}
312 var headers http.Header = make(http.Header)
313 if cctx.IsSet("auth") {
314 headers.Add("Authorization", "Bearer "+cctx.String("auth"))
315 }
316 headers.Add("Content-Type", "application/json")
317 var response *http.Response
318 postReqeust := CrawlRequest{
319 Hosts: hostList,
320 }
321 reqBlob, err := json.Marshal(postReqeust)
322 reqReader := bytes.NewReader(reqBlob)
323
324 for try := 0; try < 3; try++ {
325 req, err := http.NewRequest("POST", requestCrawlUrl.String(), reqReader)
326 if err != nil {
327 return fmt.Errorf("could not create request, %w", err)
328 }
329 req.Header = headers
330 response, err = client.Do(req)
331 if err == nil && response.StatusCode == 200 {
332 break
333 } else {
334 log.Info("http err", "err", err, "status", response.StatusCode)
335 if try < 2 {
336 time.Sleep(time.Duration(try+1) * 2 * time.Second)
337 }
338 }
339 }
340 if err != nil {
341 return fmt.Errorf("POST %s err %w", requestCrawlUrl.String(), err)
342 }
343 if response.StatusCode != http.StatusOK {
344 return fmt.Errorf("POST %s err %s", requestCrawlUrl.String(), response.Status)
345 }
346
347 return nil
348 },
349}