Porting all GitHub Actions workflows from bluesky-social/indigo to Tangled CI
at main 28 kB view raw
1package main 2 3import ( 4 "compress/gzip" 5 "context" 6 "encoding/csv" 7 "encoding/json" 8 "errors" 9 "fmt" 10 "log/slog" 11 "net" 12 "net/http" 13 _ "net/http/pprof" 14 "net/url" 15 "os" 16 "os/signal" 17 "path/filepath" 18 "regexp" 19 "sort" 20 "strconv" 21 "strings" 22 "sync" 23 "syscall" 24 "time" 25 26 comatproto "github.com/bluesky-social/indigo/api/atproto" 27 "github.com/bluesky-social/indigo/atproto/syntax" 28 "github.com/bluesky-social/indigo/events" 29 "github.com/bluesky-social/indigo/util/svcutil" 30 "github.com/bluesky-social/indigo/xrpc" 31 32 "github.com/hashicorp/golang-lru/v2" 33 "github.com/labstack/echo/v4" 34 "github.com/labstack/echo/v4/middleware" 35 "github.com/prometheus/client_golang/prometheus/promhttp" 36 "github.com/urfave/cli/v2" 37) 38 39var serveCmd = &cli.Command{ 40 Name: "serve", 41 Flags: []cli.Flag{ 42 &cli.StringFlag{ 43 Name: "api-listen", 44 Value: ":2510", 45 EnvVars: []string{"COLLECTIONS_API_LISTEN"}, 46 }, 47 &cli.StringFlag{ 48 Name: "metrics-listen", 49 Value: ":2511", 50 EnvVars: []string{"COLLECTIONS_METRICS_LISTEN"}, 51 }, 52 &cli.StringFlag{ 53 Name: "pebble", 54 Usage: "path to store pebble db", 55 Required: true, 56 }, 57 &cli.StringFlag{ 58 Name: "dau-directory", 59 Usage: "directory to store DAU pebble db", 60 Required: true, 61 }, 62 &cli.StringFlag{ 63 Name: "upstream", 64 Usage: "URL, e.g. 
wss://bsky.network", 65 EnvVars: []string{"COLLECTIONS_UPSTREAM"}, 66 }, 67 &cli.StringFlag{ 68 Name: "admin-token", 69 Usage: "admin authentication", 70 EnvVars: []string{"COLLECTIONS_ADMIN_TOKEN"}, 71 }, 72 &cli.Float64Flag{ 73 Name: "crawl-qps", 74 Usage: "per-PDS crawl queries-per-second limit", 75 Value: 100, 76 }, 77 &cli.StringFlag{ 78 Name: "ratelimit-header", 79 Usage: "secret for friend PDSes", 80 EnvVars: []string{"BSKY_SOCIAL_RATE_LIMIT_SKIP", "RATE_LIMIT_HEADER"}, 81 }, 82 &cli.Uint64Flag{ 83 Name: "clist-min-dids", 84 Usage: "filter collection list to >= N dids", 85 Value: 5, 86 EnvVars: []string{"COLLECTIONS_CLIST_MIN_DIDS"}, 87 }, 88 &cli.IntFlag{ 89 Name: "max-did-collections", 90 Usage: "stop recording new collections per did after it has >= this many collections", 91 Value: 1000, 92 EnvVars: []string{"COLLECTIONS_MAX_DID_COLLECTIONS"}, 93 }, 94 &cli.StringFlag{ 95 Name: "sets-json-path", 96 Usage: "file path of JSON file containing static word sets", 97 EnvVars: []string{"HEPA_SETS_JSON_PATH", "COLLECTIONS_SETS_JSON_PATH"}, 98 }, 99 &cli.BoolFlag{ 100 Name: "verbose", 101 }, 102 }, 103 Action: func(cctx *cli.Context) error { 104 var server collectionServer 105 return server.run(cctx) 106 }, 107} 108 109type BadwordChecker interface { 110 HasBadword(string) bool 111} 112 113type collectionServer struct { 114 ctx context.Context 115 116 // the primary directory, all repos ever and their collections 117 pcd *PebbleCollectionDirectory 118 119 // daily-active-user directory, new directory every 00:00:00 UTC 120 dauDirectory *PebbleCollectionDirectory 121 dauDirectoryPath string // currently open dauDirectory, {dauDirectoryDir}/{YYYY}{mm}{dd}.pebble 122 dauDay time.Time // YYYY-MM-DD 00:00:00 UTC 123 dauTomorrow time.Time 124 dauDirectoryDir string 125 126 statsCache *CollectionStats 127 statsCacheWhen time.Time 128 statsCacheLock sync.Mutex 129 statsCacheFresh sync.Cond 130 statsCachePending bool 131 132 // (did,collection) pairs from firehose 133 
ingestFirehose chan DidCollection 134 // (did,collection) pairs from PDS crawl (don't apply to dauDirectory) 135 ingestCrawl chan DidCollection 136 137 log *slog.Logger 138 139 AdminToken string 140 ExepctedAuthHeader string 141 PerPDSCrawlQPS float64 142 143 activeCrawls map[string]activeCrawl 144 activeCrawlsLock sync.Mutex 145 146 shutdown chan struct{} 147 148 wg sync.WaitGroup 149 150 ratelimitHeader string 151 152 apiServer *http.Server 153 metricsServer *http.Server 154 155 MinDidsForCollectionList uint64 156 MaxDidCollections int 157 158 didCollectionCounts *lru.Cache[string, int] 159 160 badwords BadwordChecker 161} 162 163type activeCrawl struct { 164 start time.Time 165 stats *CrawlStats 166} 167 168func (cs *collectionServer) run(cctx *cli.Context) error { 169 signals := make(chan os.Signal, 1) 170 signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM) 171 cs.shutdown = make(chan struct{}) 172 level := slog.LevelInfo 173 if cctx.Bool("verbose") { 174 level = slog.LevelDebug 175 } 176 log := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: level})) 177 slog.SetDefault(log) 178 179 if cctx.IsSet("ratelimit-header") { 180 cs.ratelimitHeader = cctx.String("ratelimit-header") 181 } 182 if cctx.IsSet("sets-json-path") { 183 badwords, err := loadBadwords(cctx.String("sets-json-path")) 184 if err != nil { 185 return err 186 } 187 cs.badwords = badwords 188 } 189 cs.MinDidsForCollectionList = cctx.Uint64("clist-min-dids") 190 cs.MaxDidCollections = cctx.Int("max-did-collections") 191 cs.ingestFirehose = make(chan DidCollection, 1000) 192 cs.ingestCrawl = make(chan DidCollection, 1000) 193 var err error 194 cs.didCollectionCounts, err = lru.New[string, int](1_000_000) // TODO: configurable LRU size 195 if err != nil { 196 return fmt.Errorf("lru init, %w", err) 197 } 198 cs.log = log 199 cs.ctx = cctx.Context 200 cs.AdminToken = cctx.String("admin-token") 201 cs.ExepctedAuthHeader = "Bearer " + cs.AdminToken 202 cs.wg.Add(1) 203 go 
cs.ingestReceiver() 204 pebblePath := cctx.String("pebble") 205 cs.pcd = &PebbleCollectionDirectory{ 206 log: cs.log, 207 } 208 err = cs.pcd.Open(pebblePath) 209 if err != nil { 210 return fmt.Errorf("%s: failed to open pebble db: %w", pebblePath, err) 211 } 212 cs.dauDirectoryDir = cctx.String("dau-directory") 213 if cs.dauDirectoryDir != "" { 214 err := cs.openDau() 215 if err != nil { 216 return err 217 } 218 } 219 cs.statsCacheFresh.L = &cs.statsCacheLock 220 221 apiServerEcho, err := cs.createApiServer(cctx.Context, cctx.String("api-listen")) 222 if err != nil { 223 return err 224 } 225 cs.wg.Add(1) 226 go func() { cs.StartApiServer(cctx.Context, apiServerEcho) }() 227 228 cs.createMetricsServer(cctx.String("metrics-listen")) 229 cs.wg.Add(1) 230 go func() { cs.StartMetricsServer(cctx.Context) }() 231 232 upstream := cctx.String("upstream") 233 if upstream != "" { 234 fh := Firehose{ 235 Log: log, 236 Host: upstream, 237 Seq: -1, 238 } 239 seq, seqok, err := cs.pcd.GetSequence() 240 if err != nil { 241 cs.log.Warn("db get seq", "err", err) 242 } else if seqok { 243 fh.Seq = seq 244 } 245 fhevents := make(chan *events.XRPCStreamEvent, 1000) 246 cs.wg.Add(1) 247 go cs.firehoseThread(&fh, fhevents) 248 cs.wg.Add(1) 249 go cs.handleFirehose(fhevents) 250 } 251 252 <-signals 253 log.Info("received shutdown signal") 254 return cs.Shutdown() 255} 256 257func (cs *collectionServer) openDau() error { 258 now := time.Now().UTC() 259 ymd := now.Format("2006-01-02") 260 fname := fmt.Sprintf("d%s.pebble", ymd) 261 fpath := filepath.Join(cs.dauDirectoryDir, fname) 262 daud := &PebbleCollectionDirectory{ 263 log: cs.log, 264 } 265 err := daud.Open(fpath) 266 if err != nil { 267 return fmt.Errorf("%s: failed to open dau pebble db: %w", fpath, err) 268 } 269 cs.dauDirectory = daud 270 cs.dauDirectoryPath = fpath 271 cs.dauDay = time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.UTC) 272 cs.dauTomorrow = cs.dauDay.AddDate(0, 0, 1) 273 cs.log.Info("DAU db opened", 
"path", fpath) 274 return nil 275} 276 277func (cs *collectionServer) Shutdown() error { 278 close(cs.shutdown) 279 280 func() { 281 ctx, cancel := context.WithTimeout(context.Background(), time.Second) 282 defer cancel() 283 284 cs.log.Info("metrics shutdown start") 285 sherr := cs.metricsServer.Shutdown(ctx) 286 cs.log.Info("metrics shutdown", "err", sherr) 287 }() 288 289 func() { 290 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) 291 defer cancel() 292 293 cs.log.Info("api shutdown start...") 294 err := cs.apiServer.Shutdown(ctx) 295 cs.log.Info("api shutdown, thread wait...", "err", err) 296 }() 297 298 cs.log.Info("threads done, db close...") 299 err := cs.pcd.Close() 300 if err != nil { 301 cs.log.Error("failed to shutdown pebble", "err", err) 302 } 303 cs.log.Info("db done. done.") 304 cs.wg.Wait() 305 return err 306} 307 308// firehoseThreads is responsible for connecting to upstream firehose source 309func (cs *collectionServer) firehoseThread(fh *Firehose, fhevents chan<- *events.XRPCStreamEvent) { 310 defer cs.wg.Done() 311 defer cs.log.Info("firehoseThread exit") 312 ctx, cancel := context.WithCancel(cs.ctx) 313 go func() { 314 <-cs.shutdown 315 cancel() 316 }() 317 err := fh.subscribeWithRedialer(ctx, fhevents) 318 if err != nil { 319 cs.log.Error("failed to subscribe to redialer", "err", err) 320 } 321 if fh.Seq >= 0 { 322 err := cs.pcd.SetSequence(fh.Seq) 323 if err != nil { 324 cs.log.Warn("db set seq", "err", err) 325 } 326 } 327} 328 329// handleFirehose consumes XRPCStreamEvent from firehoseThread(), further parses data and applies 330func (cs *collectionServer) handleFirehose(fhevents <-chan *events.XRPCStreamEvent) { 331 defer cs.wg.Done() 332 defer cs.log.Info("handleFirehose exit") 333 defer close(cs.ingestFirehose) 334 var lastSeq int64 335 lastSeqSet := false 336 notDone := true 337 for notDone { 338 select { 339 case <-cs.shutdown: 340 cs.log.Info("firehose handler shutdown") 341 notDone = false 342 case evt, ok 
:= <-fhevents: 343 if !ok { 344 notDone = false 345 cs.log.Info("firehose handler closed") 346 break 347 } 348 firehoseReceivedCounter.Inc() 349 seq, ok := evt.GetSequence() 350 if ok { 351 lastSeq = seq 352 lastSeqSet = true 353 } 354 if evt.RepoCommit != nil { 355 firehoseCommits.Inc() 356 cs.handleCommit(evt.RepoCommit) 357 } 358 } 359 } 360 if lastSeqSet { 361 cs.pcd.SetSequence(lastSeq) 362 } 363} 364 365func (cs *collectionServer) handleCommit(commit *comatproto.SyncSubscribeRepos_Commit) { 366 for _, op := range commit.Ops { 367 // op.Path is collection/rkey 368 nsid, _, err := syntax.ParseRepoPath(op.Path) 369 if err != nil { 370 cs.log.Warn("bad op path", "repo", commit.Repo, "err", err) 371 return 372 } 373 firehoseCommitOps.WithLabelValues(op.Action).Inc() 374 if op.Action == "create" || op.Action == "update" { 375 firehoseDidcSet.Inc() 376 cs.ingestFirehose <- DidCollection{ 377 Did: commit.Repo, 378 Collection: nsid.String(), 379 } 380 } 381 } 382} 383 384func (cs *collectionServer) createMetricsServer(addr string) { 385 e := echo.New() 386 e.GET("/metrics", echo.WrapHandler(promhttp.Handler())) 387 e.Any("/debug/pprof/*", echo.WrapHandler(http.DefaultServeMux)) 388 389 cs.metricsServer = &http.Server{ 390 Addr: addr, 391 Handler: e, 392 } 393} 394 395func (cs *collectionServer) StartMetricsServer(ctx context.Context) { 396 defer cs.wg.Done() 397 defer cs.log.Info("metrics server exit") 398 399 err := cs.metricsServer.ListenAndServe() 400 if err != nil && !errors.Is(err, http.ErrServerClosed) { 401 slog.Error("error in metrics server", "err", err) 402 os.Exit(1) 403 } 404} 405 406func (cs *collectionServer) createApiServer(ctx context.Context, addr string) (*echo.Echo, error) { 407 var lc net.ListenConfig 408 li, err := lc.Listen(ctx, "tcp", addr) 409 if err != nil { 410 return nil, err 411 } 412 e := echo.New() 413 e.HideBanner = true 414 415 e.Use(svcutil.MetricsMiddleware) 416 e.Use(middleware.CORSWithConfig(middleware.CORSConfig{ 417 AllowOrigins: 
[]string{"*"}, 418 AllowHeaders: []string{echo.HeaderOrigin, echo.HeaderContentType, echo.HeaderAccept, echo.HeaderAuthorization}, 419 })) 420 421 e.GET("/_health", cs.healthz) 422 423 e.GET("/xrpc/com.atproto.sync.listReposByCollection", cs.getDidsForCollection) 424 e.GET("/v1/getDidsForCollection", cs.getDidsForCollection) 425 e.GET("/v1/listCollections", cs.listCollections) 426 427 // TODO: allow public 'requestCrawl' API? 428 //e.GET("/xrpc/com.atproto.sync.requestCrawl", cs.crawlPds) 429 //e.POST("/xrpc/com.atproto.sync.requestCrawl", cs.crawlPds) 430 431 // admin auth heador required 432 e.POST("/admin/pds/requestCrawl", cs.crawlPds) // same as relay 433 e.GET("/admin/crawlStatus", cs.crawlStatus) 434 435 e.Listener = li 436 srv := &http.Server{ 437 Handler: e, 438 } 439 cs.apiServer = srv 440 return e, nil 441} 442 443func (cs *collectionServer) StartApiServer(ctx context.Context, e *echo.Echo) { 444 defer cs.wg.Done() 445 defer cs.log.Info("api server exit") 446 err := cs.apiServer.Serve(e.Listener) 447 if err != nil && !errors.Is(err, http.ErrServerClosed) { 448 slog.Error("error in api server", "err", err) 449 os.Exit(1) 450 } 451} 452 453const statsCacheDuration = time.Second * 300 454 455func getLimit(c echo.Context, min, defaultLim, max int) int { 456 limstr := c.QueryParam("limit") 457 if limstr == "" { 458 return defaultLim 459 } 460 lvx, err := strconv.ParseInt(limstr, 10, 64) 461 if err != nil { 462 return defaultLim 463 } 464 lv := int(lvx) 465 if lv < min { 466 return min 467 } 468 if lv > max { 469 return max 470 } 471 return lv 472} 473 474// /xrpc/com.atproto.sync.listReposByCollection?collection={}&cursor={}&limit={50<=N<=1000} 475// /v1/getDidsForCollection?collection={}&cursor={}&limit={50<=N<=1000} 476// 477// returns 478// {"dids":["did:A", "..."], "cursor":"opaque text"} 479func (cs *collectionServer) getDidsForCollection(c echo.Context) error { 480 ctx := c.Request().Context() 481 collection := c.QueryParam("collection") 482 _, err := 
syntax.ParseNSID(collection) 483 if err != nil { 484 return c.JSON(http.StatusBadRequest, xrpc.XRPCError{ErrStr: "BadRequest", Message: fmt.Sprintf("bad collection nsid, %s", err.Error())}) 485 } 486 cursor := c.QueryParam("cursor") 487 limit := getLimit(c, 1, 500, 10_000) 488 they, nextCursor, err := cs.pcd.ReadCollection(ctx, collection, cursor, limit) 489 if err != nil { 490 slog.Error("ReadCollection", "collection", collection, "cursor", cursor, "limit", limit, "err", err) 491 return c.JSON(http.StatusInternalServerError, xrpc.XRPCError{ErrStr: "DatabaseError", Message: "failed to read DIDs for collection"}) 492 } 493 cs.log.Info("getDidsForCollection", "collection", collection, "cursor", cursor, "limit", limit, "count", len(they), "nextCursor", nextCursor) 494 var out comatproto.SyncListReposByCollection_Output 495 out.Repos = make([]*comatproto.SyncListReposByCollection_Repo, len(they)) 496 for i, rec := range they { 497 out.Repos[i] = &comatproto.SyncListReposByCollection_Repo{Did: rec.Did} 498 } 499 if nextCursor != "" { 500 out.Cursor = &nextCursor 501 } 502 return c.JSON(http.StatusOK, out) 503} 504 505// return cached collection stats if they're fresh 506// return new collection stats if they can be calculated quickly 507// return stale cached collection stats if new stats take too long 508// just wait for fresh stats if there are no cached stats 509// stalenessAllowed is how old stats can be before we try to recalculate them, 0=default of 5 minutes 510func (cs *collectionServer) getStatsCache(stalenessAllowed time.Duration) (*CollectionStats, error) { 511 if stalenessAllowed <= 0 { 512 stalenessAllowed = statsCacheDuration 513 } 514 var statsCache *CollectionStats 515 var staleCache *CollectionStats 516 var waiter *freshStatsWaiter 517 cs.statsCacheLock.Lock() 518 if cs.statsCache != nil { 519 if time.Since(cs.statsCacheWhen) < stalenessAllowed { 520 // has fresh! 
521 statsCache = cs.statsCache 522 } else if !cs.statsCachePending { 523 cs.statsCachePending = true 524 go cs.statsBuilder() 525 staleCache = cs.statsCache 526 } else { 527 staleCache = cs.statsCache 528 } 529 if staleCache != nil { 530 waiter = &freshStatsWaiter{ 531 cs: cs, 532 freshCache: make(chan *CollectionStats), 533 } 534 go waiter.waiter() 535 } 536 } else if !cs.statsCachePending { 537 cs.statsCachePending = true 538 go cs.statsBuilder() 539 } 540 cs.statsCacheLock.Unlock() 541 542 if statsCache != nil { 543 // return fresh-enough data 544 return statsCache, nil 545 } 546 547 if staleCache == nil { 548 // block forever waiting for fresh data 549 cs.statsCacheLock.Lock() 550 for cs.statsCache == nil { 551 cs.statsCacheFresh.Wait() 552 } 553 statsCache = cs.statsCache 554 cs.statsCacheLock.Unlock() 555 return statsCache, nil 556 } 557 558 // wait for up to a second for fresh data, on timeout return stale data 559 timeout := time.NewTimer(time.Second) 560 defer timeout.Stop() 561 select { 562 case <-timeout.C: 563 cs.statsCacheLock.Lock() 564 waiter.l.Lock() 565 waiter.obsolete = true 566 waiter.l.Unlock() 567 cs.statsCacheLock.Unlock() 568 return staleCache, nil 569 case statsCache = <-waiter.freshCache: 570 return statsCache, nil 571 } 572} 573 574type freshStatsWaiter struct { 575 cs *collectionServer 576 l sync.Mutex 577 obsolete bool 578 freshCache chan *CollectionStats 579} 580 581func (fsw *freshStatsWaiter) waiter() { 582 fsw.cs.statsCacheLock.Lock() 583 defer fsw.cs.statsCacheLock.Unlock() 584 fsw.cs.statsCacheFresh.Wait() 585 fsw.l.Lock() 586 defer fsw.l.Unlock() 587 if fsw.obsolete { 588 close(fsw.freshCache) 589 } else { 590 fsw.freshCache <- fsw.cs.statsCache 591 } 592} 593 594func (cs *collectionServer) statsBuilder() { 595 for { 596 start := time.Now() 597 stats, err := cs.pcd.GetCollectionStats() 598 dt := time.Since(start) 599 if err == nil { 600 statsCalculations.Observe(dt.Seconds()) 601 countsum := uint64(0) 602 for _, v := range 
stats.CollectionCounts { 603 countsum += v 604 } 605 cs.log.Info("stats built", "dt", dt, "total", countsum) 606 cs.statsCacheLock.Lock() 607 cs.statsCache = &stats 608 cs.statsCacheWhen = time.Now() 609 cs.statsCacheFresh.Broadcast() 610 cs.statsCachePending = false 611 cs.statsCacheLock.Unlock() 612 return 613 } else { 614 cs.log.Error("GetCollectionStats", "dt", dt, "err", err) 615 time.Sleep(2 * time.Second) 616 } 617 } 618} 619 620func (cs *collectionServer) hasBadword(collection string) bool { 621 if cs.badwords != nil { 622 return cs.badwords.HasBadword(collection) 623 } 624 return false 625} 626 627// /v1/listCollections?c={}&cursor={}&limit={50<=limit<=1000} 628// 629// admin may set ?stalesec={} for a maximum number of seconds stale data is accepted 630// 631// returns 632// {"collections":{"app.bsky.feed.post": 123456789, "some collection": 42}, "cursor":"opaque text"} 633func (cs *collectionServer) listCollections(c echo.Context) error { 634 stalenessAllowed := statsCacheDuration 635 stalesecStr := c.QueryParam("stalesec") 636 if stalesecStr != "" && cs.isAdmin(c) { 637 stalesec, err := strconv.ParseInt(stalesecStr, 10, 64) 638 if err != nil { 639 return c.JSON(http.StatusBadRequest, xrpc.XRPCError{ErrStr: "BadRequest", Message: "invalid 'stalesec' query parameter"}) 640 } 641 if stalesec == 0 { 642 stalenessAllowed = 1 643 } else { 644 stalenessAllowed = time.Duration(stalesec) * time.Second 645 } 646 cs.log.Info("stalesec", "q", stalesecStr, "d", stalenessAllowed) 647 } 648 stats, err := cs.getStatsCache(stalenessAllowed) 649 if err != nil { 650 slog.Error("getStatsCache", "err", err) 651 return c.JSON(http.StatusInternalServerError, xrpc.XRPCError{ErrStr: "DatabaseError", Message: "failed to read stats"}) 652 } 653 cursor := c.QueryParam("cursor") 654 collections, hasQueryCollections := c.QueryParams()["c"] 655 limit := getLimit(c, 50, 500, 1000) 656 var out ListCollectionsResponse 657 if hasQueryCollections { 658 out.Collections = 
make(map[string]uint64, len(collections)) 659 for _, collection := range collections { 660 count, ok := stats.CollectionCounts[collection] 661 if ok { 662 out.Collections[collection] = count 663 } 664 } 665 } else { 666 allCollections := make([]string, 0, len(stats.CollectionCounts)) 667 for collection := range stats.CollectionCounts { 668 allCollections = append(allCollections, collection) 669 } 670 sort.Strings(allCollections) 671 out.Collections = make(map[string]uint64, limit) 672 count := 0 673 for _, collection := range allCollections { 674 if (cursor == "") || (collection > cursor) { 675 if cs.hasBadword(collection) { 676 // don't show badwords in public list of collections 677 continue 678 } 679 if stats.CollectionCounts[collection] < cs.MinDidsForCollectionList { 680 // don't show experimental/spam collections only implemented by a few DIDs 681 continue 682 } 683 // TODO: probably regex based filter for collection-spam 684 out.Collections[collection] = stats.CollectionCounts[collection] 685 count++ 686 if count >= limit { 687 out.Cursor = collection 688 } 689 } 690 } 691 } 692 return c.JSON(http.StatusOK, out) 693} 694 695type ListCollectionsResponse struct { 696 Collections map[string]uint64 `json:"collections"` 697 Cursor string `json:"cursor"` 698} 699 700func (cs *collectionServer) ingestReceiver() { 701 defer cs.wg.Done() 702 defer cs.log.Info("ingestReceiver exit") 703 errcount := 0 704 for { 705 select { 706 case didc, ok := <-cs.ingestFirehose: 707 if !ok { 708 cs.log.Info("ingestFirehose closed") 709 return 710 } 711 err := cs.ingestDidc(didc, true) 712 if err != nil { 713 errcount++ 714 } else { 715 errcount = 0 716 } 717 case didc := <-cs.ingestCrawl: 718 err := cs.ingestDidc(didc, false) 719 if err != nil { 720 errcount++ 721 } else { 722 errcount = 0 723 } 724 case <-cs.shutdown: 725 cs.log.Info("shutting down ingestReceiver") 726 return 727 } 728 if errcount > 10 { 729 cs.log.Error("ingestReceiver too many errors") 730 return // TODO: cancel 
parent somehow 731 } 732 } 733} 734 735func (cs *collectionServer) ingestDidc(didc DidCollection, dau bool) error { 736 count, ok := cs.didCollectionCounts.Get(didc.Did) 737 var err error 738 if !ok { 739 count, err = cs.pcd.CountDidCollections(didc.Did) 740 if err != nil { 741 return fmt.Errorf("count did collections, %s %w", didc.Did, err) 742 } 743 cs.didCollectionCounts.Add(didc.Did, count) 744 } 745 if count >= cs.MaxDidCollections { 746 cs.log.Warn("did too many collections", "did", didc.Did) 747 return nil 748 } 749 err = cs.pcd.MaybeSetCollection(didc.Did, didc.Collection) 750 if err != nil { 751 cs.log.Warn("pcd write", "err", err) 752 return err 753 } 754 if dau && cs.dauDirectory != nil { 755 err = cs.maybeDauWrite(didc) 756 if err != nil { 757 cs.log.Warn("dau write", "err", err) 758 return err 759 } 760 } 761 return nil 762} 763 764func (cs *collectionServer) maybeDauWrite(didc DidCollection) error { 765 now := time.Now() 766 if now.After(cs.dauTomorrow) { 767 go dauStats(cs.dauDirectory, cs.dauDay, cs.dauDirectoryDir, cs.log) 768 cs.dauDirectory = nil 769 err := cs.openDau() 770 if err != nil { 771 return fmt.Errorf("dau reopen, %w", err) 772 } 773 } 774 return cs.dauDirectory.MaybeSetCollection(didc.Did, didc.Collection) 775} 776 777// write {dauDirectoryDir}/d{YYYY-MM-DD}.pebble stats summary to {dauDirectoryDir}/d{YYYY-MM-DD}.csv.gz 778func dauStats(oldDau *PebbleCollectionDirectory, dauDay time.Time, dauDir string, log *slog.Logger) { 779 fname := fmt.Sprintf("d%s.csv.gz", dauDay.Format("2006-01-02")) 780 outstatsPath := filepath.Join(dauDir, fname) 781 log = log.With("path", outstatsPath) 782 log.Info("DAU stats summarize") 783 stats, err := oldDau.GetCollectionStats() 784 e2 := oldDau.Close() 785 if e2 != nil { 786 log.Error("old DAU close", "err", e2) 787 } 788 if err != nil { 789 log.Error("old DAU stats", "err", err) 790 } else { 791 log.Info("DAU stats summarized", "rows", len(stats.CollectionCounts)) 792 pcdStatsToCsvGz(stats, outstatsPath, 
log) 793 } 794} 795 796func pcdStatsToCsvGz(stats CollectionStats, outpath string, log *slog.Logger) { 797 fout, err := os.Create(outpath) 798 if err != nil { 799 log.Error("DAU stats open", "err", err) 800 return 801 } 802 defer fout.Close() 803 gzout := gzip.NewWriter(fout) 804 defer gzout.Close() 805 csvout := csv.NewWriter(gzout) 806 defer csvout.Flush() 807 err = csvout.Write([]string{"collection", "count"}) 808 if err != nil { 809 log.Error("DAU stats header", "err", err) 810 return 811 } 812 var row [2]string 813 rowcount := 0 814 for collection, count := range stats.CollectionCounts { 815 row[0] = collection 816 row[1] = strconv.FormatUint(count, 10) 817 err = csvout.Write(row[:]) 818 if err != nil { 819 log.Error("DAU stats row", "err", err) 820 return 821 } 822 rowcount++ 823 } 824 log.Info("DAU stats ok", "rows", rowcount) 825} 826 827type CrawlRequest struct { 828 Host string `json:"hostname,omitempty"` 829 Hosts []string `json:"hosts,omitempty"` 830} 831 832type CrawlRequestResponse struct { 833 Message string `json:"message,omitempty"` 834 Error string `json:"error,omitempty"` 835} 836 837func hostOrUrlToUrl(host string) string { 838 xu, err := url.Parse(host) 839 if err != nil { 840 xu = new(url.URL) 841 xu.Host = host 842 xu.Scheme = "https" 843 return xu.String() 844 } else if xu.Scheme == "" { 845 xu.Scheme = "https" 846 return xu.String() 847 } 848 return host 849} 850 851func (cs *collectionServer) isAdmin(c echo.Context) bool { 852 authHeader := c.Request().Header.Get("Authorization") 853 if authHeader == "" { 854 return false 855 } 856 if authHeader == cs.ExepctedAuthHeader { 857 return true 858 } 859 cs.log.Info("wrong auth header", "header", authHeader, "expected", cs.ExepctedAuthHeader) 860 return false 861} 862 863// /admin/pds/requestCrawl 864// same API signature as relay admin requestCrawl 865// starts a crawl and returns. 
See /v1/crawlStatus 866// requires header `Authorization: Bearer {admin token}` 867// 868// POST {"hostname":"one hostname or URL", "hosts":["up to 1000 hosts", "..."]} 869// OR 870// POST /admin/pds/requestCrawl?hostname={one host} 871func (cs *collectionServer) crawlPds(c echo.Context) error { 872 isAdmin := cs.isAdmin(c) 873 if !isAdmin { 874 return c.JSON(http.StatusForbidden, xrpc.XRPCError{ErrStr: "AdminRequired", Message: "this endpoint requires admin auth"}) 875 } 876 hostQ := c.QueryParam("host") 877 if hostQ != "" { 878 go cs.crawlThread(hostQ) 879 return c.JSON(http.StatusOK, CrawlRequestResponse{Message: "ok"}) 880 } 881 882 var req CrawlRequest 883 err := c.Bind(&req) 884 if err != nil { 885 cs.log.Info("bad crawl bind", "err", err) 886 return c.JSON(http.StatusBadRequest, xrpc.XRPCError{ErrStr: "BadRequest", Message: fmt.Sprintf("failed to parse body: %s", err)}) 887 } 888 if req.Host != "" { 889 go cs.crawlThread(req.Host) 890 } 891 for _, host := range req.Hosts { 892 go cs.crawlThread(host) 893 } 894 return c.JSON(http.StatusOK, CrawlRequestResponse{Message: "ok"}) 895} 896 897func (cs *collectionServer) crawlThread(hostIn string) { 898 host := hostOrUrlToUrl(hostIn) 899 if host != hostIn { 900 cs.log.Info("going to crawl", "in", hostIn, "as", host) 901 } 902 httpClient := http.Client{} 903 rpcClient := xrpc.Client{ 904 Host: host, 905 Client: &httpClient, 906 } 907 if cs.ratelimitHeader != "" { 908 rpcClient.Headers = map[string]string{ 909 "x-ratelimit-bypass": cs.ratelimitHeader, 910 } 911 } 912 crawler := Crawler{ 913 Ctx: cs.ctx, 914 RpcClient: &rpcClient, 915 QPS: cs.PerPDSCrawlQPS, 916 Results: cs.ingestCrawl, 917 Log: cs.log, 918 } 919 start := time.Now() 920 ok, crawlStats := cs.recordCrawlStart(host, start) 921 if !ok { 922 cs.log.Info("not crawling dup", "host", host) 923 return 924 } 925 crawler.Stats = crawlStats 926 cs.log.Info("crawling", "host", host) 927 err := crawler.CrawlPDSRepoCollections() 928 cs.clearActiveCrawl(host) 929 
pdsCrawledCounter.Inc() 930 if err != nil { 931 cs.log.Warn("crawl err", "host", host, "err", err) 932 } else { 933 dt := time.Since(start) 934 cs.log.Info("crawl done", "host", host, "dt", dt) 935 } 936} 937 938// recordCrawlStart returns true if ok, false if duplicate 939func (cs *collectionServer) recordCrawlStart(host string, start time.Time) (ok bool, stats *CrawlStats) { 940 cs.activeCrawlsLock.Lock() 941 defer cs.activeCrawlsLock.Unlock() 942 if cs.activeCrawls == nil { 943 cs.activeCrawls = make(map[string]activeCrawl) 944 } else { 945 _, dup := cs.activeCrawls[host] 946 if dup { 947 return false, nil 948 } 949 } 950 stats = new(CrawlStats) 951 cs.activeCrawls[host] = activeCrawl{ 952 start: start, 953 stats: stats, 954 } 955 return true, stats 956} 957 958func (cs *collectionServer) clearActiveCrawl(host string) { 959 cs.activeCrawlsLock.Lock() 960 defer cs.activeCrawlsLock.Unlock() 961 if cs.activeCrawls == nil { 962 return 963 } 964 delete(cs.activeCrawls, host) 965} 966 967type CrawlStatusResponse struct { 968 HostCrawls map[string]HostCrawl `json:"host_starts"` 969 ServerTime string `json:"server_time"` 970} 971type HostCrawl struct { 972 Start string `json:"start"` 973 ReposDescribed uint32 `json:"seen"` 974} 975 976// GET /v1/crawlStatus 977func (cs *collectionServer) crawlStatus(c echo.Context) error { 978 authHeader := c.Request().Header.Get("Authorization") 979 if authHeader != cs.ExepctedAuthHeader { 980 return c.JSON(http.StatusBadRequest, xrpc.XRPCError{ErrStr: "AdminAuthRequired", Message: "this endpoint requires admin-level auth"}) 981 } 982 var out CrawlStatusResponse 983 out.HostCrawls = make(map[string]HostCrawl) 984 cs.activeCrawlsLock.Lock() 985 defer cs.activeCrawlsLock.Unlock() 986 for host, rec := range cs.activeCrawls { 987 start := rec.start 988 out.HostCrawls[host] = HostCrawl{ 989 Start: start.UTC().Format(time.RFC3339Nano), 990 ReposDescribed: rec.stats.ReposDescribed.Load(), 991 } 992 } 993 out.ServerTime = 
time.Now().UTC().Format(time.RFC3339Nano) 994 return c.JSON(http.StatusOK, out) 995} 996 997func (cs *collectionServer) healthz(c echo.Context) error { 998 // TODO: check database or upstream health? 999 return c.JSON(http.StatusOK, map[string]any{"status": "ok"}) 1000} 1001 1002func loadBadwords(path string) (*BadwordsRE, error) { 1003 fin, err := os.Open(path) 1004 if err != nil { 1005 return nil, fmt.Errorf("%s: could not open badwords, %w", path, err) 1006 } 1007 dec := json.NewDecoder(fin) 1008 var rules map[string][]string 1009 err = dec.Decode(&rules) 1010 if err != nil { 1011 return nil, fmt.Errorf("%s: badwords json, %w", path, err) 1012 } 1013 1014 // compile a regex to search a string for any instance of a bad word, because we're expecting things runpooptogether 1015 badwords := rules["worst-words"] 1016 rwords := make([]string, len(badwords)) 1017 for i, word := range badwords { 1018 rwords[i] = regexp.QuoteMeta(word) 1019 } 1020 reStr := strings.Join(rwords, "|") 1021 re, err := regexp.Compile(reStr) 1022 if err != nil { 1023 return nil, fmt.Errorf("%s: badwords regex, %w", path, err) 1024 } 1025 return &BadwordsRE{re: re}, nil 1026} 1027 1028type BadwordsRE struct { 1029 re *regexp.Regexp 1030} 1031 1032func (bw *BadwordsRE) HasBadword(s string) bool { 1033 // TODO: if this is too slow, try more specialized algorithm e.g. https://en.wikipedia.org/wiki/Aho%E2%80%93Corasick_algorithm 1034 return bw.re.FindString(s) != "" 1035}