porting all github actions from bluesky-social/indigo to tangled CI
at main 8.2 kB view raw
1package main 2 3import ( 4 "bufio" 5 "bytes" 6 "compress/gzip" 7 "encoding/csv" 8 "encoding/json" 9 "errors" 10 "fmt" 11 "io" 12 "log/slog" 13 "net/http" 14 "net/url" 15 "os" 16 "strconv" 17 "strings" 18 "time" 19 20 "github.com/carlmjohnson/versioninfo" 21 "github.com/urfave/cli/v2" 22) 23 24func main() { 25 app := cli.App{ 26 Name: "collectiondir", 27 Usage: "collection directory service", 28 Version: versioninfo.Short(), 29 Flags: []cli.Flag{ 30 &cli.BoolFlag{ 31 Name: "verbose", 32 }, 33 }, 34 Commands: []*cli.Command{ 35 serveCmd, 36 offlineCrawlCmd, 37 buildCmd, 38 statsCmd, 39 exportCmd, 40 adminCrawlCmd, 41 }, 42 } 43 err := app.Run(os.Args) 44 if err != nil { 45 fmt.Fprintf(os.Stderr, "%s\n", err.Error()) 46 os.Exit(1) 47 } 48} 49 50var statsCmd = &cli.Command{ 51 Name: "stats", 52 Usage: "read stats from a pebble db", 53 Flags: []cli.Flag{ 54 &cli.StringFlag{ 55 Name: "pebble", 56 Usage: "path to pebble db", 57 Required: true, 58 }, 59 }, 60 Action: func(cctx *cli.Context) error { 61 logLevel := slog.LevelInfo 62 if cctx.Bool("verbose") { 63 logLevel = slog.LevelDebug 64 } 65 log := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: logLevel})) 66 slog.SetDefault(log) 67 pebblePath := cctx.String("pebble") 68 var db PebbleCollectionDirectory 69 db.log = log 70 err := db.Open(pebblePath) 71 if err != nil { 72 return err 73 } 74 defer db.Close() 75 76 stats, err := db.GetCollectionStats() 77 if err != nil { 78 return err 79 } 80 blob, err := json.MarshalIndent(stats, "", " ") 81 os.Stdout.Write(blob) 82 os.Stdout.Write([]byte{'\n'}) 83 return nil 84 }, 85} 86 87var exportCmd = &cli.Command{ 88 Name: "export", 89 Usage: "export a pebble db to CSV on stdout", 90 Flags: []cli.Flag{ 91 &cli.StringFlag{ 92 Name: "pebble", 93 Usage: "path to pebble db", 94 Required: true, 95 }, 96 }, 97 Action: func(cctx *cli.Context) error { 98 logLevel := slog.LevelInfo 99 if cctx.Bool("verbose") { 100 logLevel = slog.LevelDebug 101 } 102 log := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: logLevel})) 103 slog.SetDefault(log) 104 pebblePath := cctx.String("pebble") 105 var db PebbleCollectionDirectory 106 db.log = log 107 err := db.Open(pebblePath) 108 if err != nil { 109 return err 110 } 111 defer db.Close() 112 113 rows := make(chan CollectionDidTime, 100) 114 go func() { 115 err := db.ReadAllPrimary(cctx.Context, rows) 116 if err != nil { 117 log.Error("db read", "path", pebblePath, "err", err) 118 } 119 }() 120 121 writer := csv.NewWriter(os.Stdout) 122 defer writer.Flush() 123 err = writer.Write([]string{"did", "collection", "millis"}) 124 if err != nil { 125 log.Error("csv write header", "err", err) 126 } 127 var row [3]string 128 for rowi := range rows { 129 row[0] = rowi.Did 130 row[1] = rowi.Collection 131 row[2] = strconv.FormatInt(rowi.UnixMillis, 10) 132 err = writer.Write(row[:]) 133 if err != nil { 134 log.Error("csv write row", "err", err) 135 } 136 } 137 138 return nil 139 }, 140} 141 142var buildCmd = &cli.Command{ 143 Name: "build", 144 Usage: "collect csv into a database", 145 Flags: []cli.Flag{ 146 &cli.StringFlag{ 147 Name: "csv", 148 Required: true, 149 }, 150 &cli.StringFlag{ 151 Name: "pebble", 152 Usage: "path to store pebble db", 153 Required: true, 154 }, 155 }, 156 Action: func(cctx *cli.Context) error { 157 logLevel := slog.LevelInfo 158 if cctx.Bool("verbose") { 159 logLevel = slog.LevelDebug 160 } 161 log := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: logLevel})) 162 slog.SetDefault(log) 163 pebblePath := cctx.String("pebble") 164 var db PebbleCollectionDirectory 165 db.log = log 166 err := db.Open(pebblePath) 167 if err != nil { 168 return err 169 } 170 defer db.Close() 171 csvPath := cctx.String("csv") 172 var fin io.Reader 173 if csvPath == "-" { 174 fin = os.Stdin 175 } else if strings.HasSuffix(csvPath, ".gz") { 176 osin, err := os.Open(csvPath) 177 if err != nil { 178 return fmt.Errorf("%s: could not open csv, %w", csvPath, err) 179 } 180 defer osin.Close() 181 gzin, err := gzip.NewReader(osin) 182 if err != nil { 183 return fmt.Errorf("%s: could not open csv, %w", csvPath, err) 184 } 185 defer gzin.Close() 186 fin = gzin 187 } else { 188 osin, err := os.Open(csvPath) 189 if err != nil { 190 return fmt.Errorf("%s: could not open csv, %w", csvPath, err) 191 } 192 defer osin.Close() 193 fin = osin 194 } 195 reader := csv.NewReader(fin) 196 rowcount := 0 197 results := make(chan DidCollection, 100) 198 go db.SetFromResults(results) 199 for { 200 row, err := reader.Read() 201 if errors.Is(err, io.EOF) { 202 break 203 } 204 did := row[0] 205 collection := row[1] 206 results <- DidCollection{ 207 Did: did, 208 Collection: collection, 209 } 210 rowcount++ 211 } 212 close(results) 213 log.Debug("read csv", "rows", rowcount) 214 return nil 215 }, 216} 217 218var adminCrawlCmd = &cli.Command{ 219 Name: "crawl", 220 Usage: "admin service to crawl one or more PDSes", 221 Flags: []cli.Flag{ 222 &cli.StringFlag{ 223 Name: "csv", 224 Usage: "path to load csv from, use column 'host' or 'hostname'", 225 }, 226 &cli.StringFlag{ 227 Name: "list", 228 Usage: "path to load hostname list from, one per line", 229 }, 230 &cli.StringFlag{ 231 Name: "url", 232 Usage: "host:port of collectiondir server", 233 Required: true, 234 EnvVars: []string{"COLLECTIONDIR_URL"}, 235 }, 236 &cli.StringFlag{ 237 Name: "auth", 238 Usage: "Auth token for admin api", 239 EnvVars: []string{"ADMIN_AUTH"}, 240 }, 241 }, 242 Action: func(cctx *cli.Context) error { 243 logLevel := slog.LevelInfo 244 if cctx.Bool("verbose") { 245 logLevel = slog.LevelDebug 246 } 247 log := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: logLevel})) 248 slog.SetDefault(log) 249 250 var hostList []string 251 252 serverUrl, err := url.Parse(cctx.String("url")) 253 if err != nil { 254 var e2 error 255 // try to fixup a bare host:port which can confuse url.Parse 256 serverUrl, e2 = url.Parse("http://" + cctx.String("url")) 257 if e2 != nil { 258 return fmt.Errorf("could not parse url, %w", err) 259 } 260 } 261 requestCrawlUrl := serverUrl.JoinPath("/admin/pds/requestCrawl") 262 263 if cctx.IsSet("list") { 264 fin, err := os.Open(cctx.String("list")) 265 if err != nil { 266 return fmt.Errorf("%s: could not open, %w", cctx.String("list"), err) 267 } 268 defer fin.Close() 269 bufin := bufio.NewScanner(fin) 270 for bufin.Scan() { 271 hostList = append(hostList, bufin.Text()) 272 } 273 err = bufin.Err() 274 if err != nil { 275 return fmt.Errorf("%s: error reading, %w", cctx.String("list"), err) 276 } 277 } else if cctx.IsSet("csv") { 278 fin, err := os.Open(cctx.String("csv")) 279 if err != nil { 280 return fmt.Errorf("%s: could not open, %w", cctx.String("csv"), err) 281 } 282 defer fin.Close() 283 data, err := csv.NewReader(fin).ReadAll() 284 if err != nil { 285 return fmt.Errorf("%s: could not read, %w", cctx.String("csv"), err) 286 } 287 if len(data) < 2 { 288 return fmt.Errorf("%s: empty CSV file", cctx.String("csv")) 289 } 290 headerRow := data[0] 291 hostCol := -1 292 for i, v := range headerRow { 293 v = strings.ToLower(v) 294 if v == "host" || v == "hostname" { 295 hostCol = i 296 break 297 } 298 } 299 if hostCol < 0 { 300 return fmt.Errorf("%s: header missing 'host' or 'hostname'", cctx.String("csv")) 301 } 302 for _, row := range data[1:] { 303 hostList = append(hostList, row[hostCol]) 304 } 305 } 306 307 if len(hostList) == 0 { 308 fmt.Println("no hosts") 309 } 310 311 client := http.Client{Timeout: 1 * time.Second} 312 var headers http.Header = make(http.Header) 313 if cctx.IsSet("auth") { 314 headers.Add("Authorization", "Bearer "+cctx.String("auth")) 315 } 316 headers.Add("Content-Type", "application/json") 317 var response *http.Response 318 postReqeust := CrawlRequest{ 319 Hosts: hostList, 320 } 321 reqBlob, err := json.Marshal(postReqeust) 322 reqReader := bytes.NewReader(reqBlob) 323 324 for try := 0; try < 3; try++ { 325 req, err := http.NewRequest("POST", requestCrawlUrl.String(), reqReader) 326 if err != nil { 327 return fmt.Errorf("could not create request, %w", err) 328 } 329 req.Header = headers 330 response, err = client.Do(req) 331 if err == nil && response.StatusCode == 200 { 332 break 333 } else { 334 log.Info("http err", "err", err, "status", response.StatusCode) 335 if try < 2 { 336 time.Sleep(time.Duration(try+1) * 2 * time.Second) 337 } 338 } 339 } 340 if err != nil { 341 return fmt.Errorf("POST %s err %w", requestCrawlUrl.String(), err) 342 } 343 if response.StatusCode != http.StatusOK { 344 return fmt.Errorf("POST %s err %s", requestCrawlUrl.String(), response.Status) 345 } 346 347 return nil 348 }, 349}