1package main
2
import (
	"compress/gzip"
	"context"
	"crypto/subtle"
	"encoding/csv"
	"encoding/json"
	"errors"
	"fmt"
	"log/slog"
	"net"
	"net/http"
	_ "net/http/pprof"
	"net/url"
	"os"
	"os/signal"
	"path/filepath"
	"regexp"
	"sort"
	"strconv"
	"strings"
	"sync"
	"syscall"
	"time"

	comatproto "github.com/bluesky-social/indigo/api/atproto"
	"github.com/bluesky-social/indigo/atproto/syntax"
	"github.com/bluesky-social/indigo/events"
	"github.com/bluesky-social/indigo/util/svcutil"
	"github.com/bluesky-social/indigo/xrpc"

	"github.com/hashicorp/golang-lru/v2"
	"github.com/labstack/echo/v4"
	"github.com/labstack/echo/v4/middleware"
	"github.com/prometheus/client_golang/prometheus/promhttp"
	"github.com/urfave/cli/v2"
)
38
// serveCmd runs the collections directory server: it records which
// (did, collection) pairs exist (fed by the firehose and/or PDS crawls)
// and serves them over an HTTP/XRPC API.
var serveCmd = &cli.Command{
	Name: "serve",
	Flags: []cli.Flag{
		&cli.StringFlag{
			// public API bind address
			Name:    "api-listen",
			Value:   ":2510",
			EnvVars: []string{"COLLECTIONS_API_LISTEN"},
		},
		&cli.StringFlag{
			// prometheus /metrics and pprof bind address
			Name:    "metrics-listen",
			Value:   ":2511",
			EnvVars: []string{"COLLECTIONS_METRICS_LISTEN"},
		},
		&cli.StringFlag{
			Name:     "pebble",
			Usage:    "path to store pebble db",
			Required: true,
		},
		&cli.StringFlag{
			// one db per UTC day is created under this directory
			Name:     "dau-directory",
			Usage:    "directory to store DAU pebble db",
			Required: true,
		},
		&cli.StringFlag{
			// firehose source; firehose consumption is skipped when unset
			Name:    "upstream",
			Usage:   "URL, e.g. wss://bsky.network",
			EnvVars: []string{"COLLECTIONS_UPSTREAM"},
		},
		&cli.StringFlag{
			Name:    "admin-token",
			Usage:   "admin authentication",
			EnvVars: []string{"COLLECTIONS_ADMIN_TOKEN"},
		},
		&cli.Float64Flag{
			Name:  "crawl-qps",
			Usage: "per-PDS crawl queries-per-second limit",
			Value: 100,
		},
		&cli.StringFlag{
			// sent as x-ratelimit-bypass when crawling
			Name:    "ratelimit-header",
			Usage:   "secret for friend PDSes",
			EnvVars: []string{"BSKY_SOCIAL_RATE_LIMIT_SKIP", "RATE_LIMIT_HEADER"},
		},
		&cli.Uint64Flag{
			Name:    "clist-min-dids",
			Usage:   "filter collection list to >= N dids",
			Value:   5,
			EnvVars: []string{"COLLECTIONS_CLIST_MIN_DIDS"},
		},
		&cli.IntFlag{
			Name:    "max-did-collections",
			Usage:   "stop recording new collections per did after it has >= this many collections",
			Value:   1000,
			EnvVars: []string{"COLLECTIONS_MAX_DID_COLLECTIONS"},
		},
		&cli.StringFlag{
			// same file format as hepa's word sets
			Name:    "sets-json-path",
			Usage:   "file path of JSON file containing static word sets",
			EnvVars: []string{"HEPA_SETS_JSON_PATH", "COLLECTIONS_SETS_JSON_PATH"},
		},
		&cli.BoolFlag{
			// enables debug-level logging
			Name: "verbose",
		},
	},
	Action: func(cctx *cli.Context) error {
		var server collectionServer
		return server.run(cctx)
	},
}
108
// BadwordChecker reports whether a string contains a configured bad word;
// used to filter the public collection listing.
type BadwordChecker interface {
	HasBadword(string) bool
}
112
// collectionServer holds all long-lived server state: the pebble-backed
// directories, ingest channels, stats cache, active-crawl bookkeeping, and
// the two HTTP servers.
type collectionServer struct {
	ctx context.Context

	// the primary directory, all repos ever and their collections
	pcd *PebbleCollectionDirectory

	// daily-active-user directory, new directory every 00:00:00 UTC
	dauDirectory *PebbleCollectionDirectory
	// NOTE(review): actual filename pattern is d{YYYY-MM-DD}.pebble (see
	// openDau), not {YYYY}{mm}{dd}.pebble
	dauDirectoryPath string // currently open dauDirectory, {dauDirectoryDir}/{YYYY}{mm}{dd}.pebble
	dauDay           time.Time // YYYY-MM-DD 00:00:00 UTC
	dauTomorrow      time.Time // start of the next UTC day; triggers rollover
	dauDirectoryDir  string

	// cached result of pcd.GetCollectionStats, guarded by statsCacheLock;
	// statsCacheFresh is broadcast when statsBuilder installs a new cache
	statsCache        *CollectionStats
	statsCacheWhen    time.Time
	statsCacheLock    sync.Mutex
	statsCacheFresh   sync.Cond
	statsCachePending bool

	// (did,collection) pairs from firehose
	ingestFirehose chan DidCollection
	// (did,collection) pairs from PDS crawl (don't apply to dauDirectory)
	ingestCrawl chan DidCollection

	log *slog.Logger

	AdminToken string
	// "Bearer " + AdminToken; (sic) the field name typo is kept because
	// renaming would touch every caller in the file
	ExepctedAuthHeader string
	PerPDSCrawlQPS     float64

	// in-flight crawls keyed by normalized host URL, guarded by activeCrawlsLock
	activeCrawls     map[string]activeCrawl
	activeCrawlsLock sync.Mutex

	// closed once to signal all goroutines to stop
	shutdown chan struct{}

	wg sync.WaitGroup

	ratelimitHeader string

	apiServer     *http.Server
	metricsServer *http.Server

	MinDidsForCollectionList uint64
	MaxDidCollections        int

	// per-did collection counts, cached to avoid a db count per ingest event
	didCollectionCounts *lru.Cache[string, int]

	badwords BadwordChecker
}
162
// activeCrawl tracks one in-flight PDS crawl: when it started and the live
// stats the crawler updates.
type activeCrawl struct {
	start time.Time
	stats *CrawlStats
}
167
// run initializes and starts the collections server — logging, badword
// filter, ingest channels, pebble dbs, API/metrics servers, and (optionally)
// the upstream firehose consumer — then blocks until SIGINT/SIGTERM.
func (cs *collectionServer) run(cctx *cli.Context) error {
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)
	cs.shutdown = make(chan struct{})
	level := slog.LevelInfo
	if cctx.Bool("verbose") {
		level = slog.LevelDebug
	}
	log := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: level}))
	slog.SetDefault(log)

	if cctx.IsSet("ratelimit-header") {
		cs.ratelimitHeader = cctx.String("ratelimit-header")
	}
	if cctx.IsSet("sets-json-path") {
		// optional badword filter applied to the public collection list
		badwords, err := loadBadwords(cctx.String("sets-json-path"))
		if err != nil {
			return err
		}
		cs.badwords = badwords
	}
	cs.MinDidsForCollectionList = cctx.Uint64("clist-min-dids")
	cs.MaxDidCollections = cctx.Int("max-did-collections")
	cs.ingestFirehose = make(chan DidCollection, 1000)
	cs.ingestCrawl = make(chan DidCollection, 1000)
	var err error
	cs.didCollectionCounts, err = lru.New[string, int](1_000_000) // TODO: configurable LRU size
	if err != nil {
		return fmt.Errorf("lru init, %w", err)
	}
	cs.log = log
	cs.ctx = cctx.Context
	cs.AdminToken = cctx.String("admin-token")
	cs.ExepctedAuthHeader = "Bearer " + cs.AdminToken
	cs.wg.Add(1)
	go cs.ingestReceiver()
	pebblePath := cctx.String("pebble")
	cs.pcd = &PebbleCollectionDirectory{
		log: cs.log,
	}
	err = cs.pcd.Open(pebblePath)
	if err != nil {
		return fmt.Errorf("%s: failed to open pebble db: %w", pebblePath, err)
	}
	cs.dauDirectoryDir = cctx.String("dau-directory")
	if cs.dauDirectoryDir != "" {
		err := cs.openDau()
		if err != nil {
			return err
		}
	}
	// the condition variable shares the stats-cache mutex
	cs.statsCacheFresh.L = &cs.statsCacheLock

	apiServerEcho, err := cs.createApiServer(cctx.Context, cctx.String("api-listen"))
	if err != nil {
		return err
	}
	cs.wg.Add(1)
	go func() { cs.StartApiServer(cctx.Context, apiServerEcho) }()

	cs.createMetricsServer(cctx.String("metrics-listen"))
	cs.wg.Add(1)
	go func() { cs.StartMetricsServer(cctx.Context) }()

	// with an upstream configured, consume the firehose, resuming from the
	// last sequence number persisted in the primary db (if any)
	upstream := cctx.String("upstream")
	if upstream != "" {
		fh := Firehose{
			Log:  log,
			Host: upstream,
			Seq:  -1,
		}
		seq, seqok, err := cs.pcd.GetSequence()
		if err != nil {
			cs.log.Warn("db get seq", "err", err)
		} else if seqok {
			fh.Seq = seq
		}
		fhevents := make(chan *events.XRPCStreamEvent, 1000)
		cs.wg.Add(1)
		go cs.firehoseThread(&fh, fhevents)
		cs.wg.Add(1)
		go cs.handleFirehose(fhevents)
	}

	<-signals
	log.Info("received shutdown signal")
	return cs.Shutdown()
}
256
257func (cs *collectionServer) openDau() error {
258 now := time.Now().UTC()
259 ymd := now.Format("2006-01-02")
260 fname := fmt.Sprintf("d%s.pebble", ymd)
261 fpath := filepath.Join(cs.dauDirectoryDir, fname)
262 daud := &PebbleCollectionDirectory{
263 log: cs.log,
264 }
265 err := daud.Open(fpath)
266 if err != nil {
267 return fmt.Errorf("%s: failed to open dau pebble db: %w", fpath, err)
268 }
269 cs.dauDirectory = daud
270 cs.dauDirectoryPath = fpath
271 cs.dauDay = time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.UTC)
272 cs.dauTomorrow = cs.dauDay.AddDate(0, 0, 1)
273 cs.log.Info("DAU db opened", "path", fpath)
274 return nil
275}
276
277func (cs *collectionServer) Shutdown() error {
278 close(cs.shutdown)
279
280 func() {
281 ctx, cancel := context.WithTimeout(context.Background(), time.Second)
282 defer cancel()
283
284 cs.log.Info("metrics shutdown start")
285 sherr := cs.metricsServer.Shutdown(ctx)
286 cs.log.Info("metrics shutdown", "err", sherr)
287 }()
288
289 func() {
290 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
291 defer cancel()
292
293 cs.log.Info("api shutdown start...")
294 err := cs.apiServer.Shutdown(ctx)
295 cs.log.Info("api shutdown, thread wait...", "err", err)
296 }()
297
298 cs.log.Info("threads done, db close...")
299 err := cs.pcd.Close()
300 if err != nil {
301 cs.log.Error("failed to shutdown pebble", "err", err)
302 }
303 cs.log.Info("db done. done.")
304 cs.wg.Wait()
305 return err
306}
307
308// firehoseThreads is responsible for connecting to upstream firehose source
309func (cs *collectionServer) firehoseThread(fh *Firehose, fhevents chan<- *events.XRPCStreamEvent) {
310 defer cs.wg.Done()
311 defer cs.log.Info("firehoseThread exit")
312 ctx, cancel := context.WithCancel(cs.ctx)
313 go func() {
314 <-cs.shutdown
315 cancel()
316 }()
317 err := fh.subscribeWithRedialer(ctx, fhevents)
318 if err != nil {
319 cs.log.Error("failed to subscribe to redialer", "err", err)
320 }
321 if fh.Seq >= 0 {
322 err := cs.pcd.SetSequence(fh.Seq)
323 if err != nil {
324 cs.log.Warn("db set seq", "err", err)
325 }
326 }
327}
328
329// handleFirehose consumes XRPCStreamEvent from firehoseThread(), further parses data and applies
330func (cs *collectionServer) handleFirehose(fhevents <-chan *events.XRPCStreamEvent) {
331 defer cs.wg.Done()
332 defer cs.log.Info("handleFirehose exit")
333 defer close(cs.ingestFirehose)
334 var lastSeq int64
335 lastSeqSet := false
336 notDone := true
337 for notDone {
338 select {
339 case <-cs.shutdown:
340 cs.log.Info("firehose handler shutdown")
341 notDone = false
342 case evt, ok := <-fhevents:
343 if !ok {
344 notDone = false
345 cs.log.Info("firehose handler closed")
346 break
347 }
348 firehoseReceivedCounter.Inc()
349 seq, ok := evt.GetSequence()
350 if ok {
351 lastSeq = seq
352 lastSeqSet = true
353 }
354 if evt.RepoCommit != nil {
355 firehoseCommits.Inc()
356 cs.handleCommit(evt.RepoCommit)
357 }
358 }
359 }
360 if lastSeqSet {
361 cs.pcd.SetSequence(lastSeq)
362 }
363}
364
365func (cs *collectionServer) handleCommit(commit *comatproto.SyncSubscribeRepos_Commit) {
366 for _, op := range commit.Ops {
367 // op.Path is collection/rkey
368 nsid, _, err := syntax.ParseRepoPath(op.Path)
369 if err != nil {
370 cs.log.Warn("bad op path", "repo", commit.Repo, "err", err)
371 return
372 }
373 firehoseCommitOps.WithLabelValues(op.Action).Inc()
374 if op.Action == "create" || op.Action == "update" {
375 firehoseDidcSet.Inc()
376 cs.ingestFirehose <- DidCollection{
377 Did: commit.Repo,
378 Collection: nsid.String(),
379 }
380 }
381 }
382}
383
384func (cs *collectionServer) createMetricsServer(addr string) {
385 e := echo.New()
386 e.GET("/metrics", echo.WrapHandler(promhttp.Handler()))
387 e.Any("/debug/pprof/*", echo.WrapHandler(http.DefaultServeMux))
388
389 cs.metricsServer = &http.Server{
390 Addr: addr,
391 Handler: e,
392 }
393}
394
395func (cs *collectionServer) StartMetricsServer(ctx context.Context) {
396 defer cs.wg.Done()
397 defer cs.log.Info("metrics server exit")
398
399 err := cs.metricsServer.ListenAndServe()
400 if err != nil && !errors.Is(err, http.ErrServerClosed) {
401 slog.Error("error in metrics server", "err", err)
402 os.Exit(1)
403 }
404}
405
406func (cs *collectionServer) createApiServer(ctx context.Context, addr string) (*echo.Echo, error) {
407 var lc net.ListenConfig
408 li, err := lc.Listen(ctx, "tcp", addr)
409 if err != nil {
410 return nil, err
411 }
412 e := echo.New()
413 e.HideBanner = true
414
415 e.Use(svcutil.MetricsMiddleware)
416 e.Use(middleware.CORSWithConfig(middleware.CORSConfig{
417 AllowOrigins: []string{"*"},
418 AllowHeaders: []string{echo.HeaderOrigin, echo.HeaderContentType, echo.HeaderAccept, echo.HeaderAuthorization},
419 }))
420
421 e.GET("/_health", cs.healthz)
422
423 e.GET("/xrpc/com.atproto.sync.listReposByCollection", cs.getDidsForCollection)
424 e.GET("/v1/getDidsForCollection", cs.getDidsForCollection)
425 e.GET("/v1/listCollections", cs.listCollections)
426
427 // TODO: allow public 'requestCrawl' API?
428 //e.GET("/xrpc/com.atproto.sync.requestCrawl", cs.crawlPds)
429 //e.POST("/xrpc/com.atproto.sync.requestCrawl", cs.crawlPds)
430
431 // admin auth heador required
432 e.POST("/admin/pds/requestCrawl", cs.crawlPds) // same as relay
433 e.GET("/admin/crawlStatus", cs.crawlStatus)
434
435 e.Listener = li
436 srv := &http.Server{
437 Handler: e,
438 }
439 cs.apiServer = srv
440 return e, nil
441}
442
443func (cs *collectionServer) StartApiServer(ctx context.Context, e *echo.Echo) {
444 defer cs.wg.Done()
445 defer cs.log.Info("api server exit")
446 err := cs.apiServer.Serve(e.Listener)
447 if err != nil && !errors.Is(err, http.ErrServerClosed) {
448 slog.Error("error in api server", "err", err)
449 os.Exit(1)
450 }
451}
452
// statsCacheDuration is the default maximum age of cached collection stats
// before a rebuild is triggered (5 minutes).
const statsCacheDuration = time.Second * 300
454
455func getLimit(c echo.Context, min, defaultLim, max int) int {
456 limstr := c.QueryParam("limit")
457 if limstr == "" {
458 return defaultLim
459 }
460 lvx, err := strconv.ParseInt(limstr, 10, 64)
461 if err != nil {
462 return defaultLim
463 }
464 lv := int(lvx)
465 if lv < min {
466 return min
467 }
468 if lv > max {
469 return max
470 }
471 return lv
472}
473
474// /xrpc/com.atproto.sync.listReposByCollection?collection={}&cursor={}&limit={50<=N<=1000}
475// /v1/getDidsForCollection?collection={}&cursor={}&limit={50<=N<=1000}
476//
477// returns
478// {"dids":["did:A", "..."], "cursor":"opaque text"}
479func (cs *collectionServer) getDidsForCollection(c echo.Context) error {
480 ctx := c.Request().Context()
481 collection := c.QueryParam("collection")
482 _, err := syntax.ParseNSID(collection)
483 if err != nil {
484 return c.JSON(http.StatusBadRequest, xrpc.XRPCError{ErrStr: "BadRequest", Message: fmt.Sprintf("bad collection nsid, %s", err.Error())})
485 }
486 cursor := c.QueryParam("cursor")
487 limit := getLimit(c, 1, 500, 10_000)
488 they, nextCursor, err := cs.pcd.ReadCollection(ctx, collection, cursor, limit)
489 if err != nil {
490 slog.Error("ReadCollection", "collection", collection, "cursor", cursor, "limit", limit, "err", err)
491 return c.JSON(http.StatusInternalServerError, xrpc.XRPCError{ErrStr: "DatabaseError", Message: "failed to read DIDs for collection"})
492 }
493 cs.log.Info("getDidsForCollection", "collection", collection, "cursor", cursor, "limit", limit, "count", len(they), "nextCursor", nextCursor)
494 var out comatproto.SyncListReposByCollection_Output
495 out.Repos = make([]*comatproto.SyncListReposByCollection_Repo, len(they))
496 for i, rec := range they {
497 out.Repos[i] = &comatproto.SyncListReposByCollection_Repo{Did: rec.Did}
498 }
499 if nextCursor != "" {
500 out.Cursor = &nextCursor
501 }
502 return c.JSON(http.StatusOK, out)
503}
504
505// return cached collection stats if they're fresh
506// return new collection stats if they can be calculated quickly
507// return stale cached collection stats if new stats take too long
508// just wait for fresh stats if there are no cached stats
509// stalenessAllowed is how old stats can be before we try to recalculate them, 0=default of 5 minutes
510func (cs *collectionServer) getStatsCache(stalenessAllowed time.Duration) (*CollectionStats, error) {
511 if stalenessAllowed <= 0 {
512 stalenessAllowed = statsCacheDuration
513 }
514 var statsCache *CollectionStats
515 var staleCache *CollectionStats
516 var waiter *freshStatsWaiter
517 cs.statsCacheLock.Lock()
518 if cs.statsCache != nil {
519 if time.Since(cs.statsCacheWhen) < stalenessAllowed {
520 // has fresh!
521 statsCache = cs.statsCache
522 } else if !cs.statsCachePending {
523 cs.statsCachePending = true
524 go cs.statsBuilder()
525 staleCache = cs.statsCache
526 } else {
527 staleCache = cs.statsCache
528 }
529 if staleCache != nil {
530 waiter = &freshStatsWaiter{
531 cs: cs,
532 freshCache: make(chan *CollectionStats),
533 }
534 go waiter.waiter()
535 }
536 } else if !cs.statsCachePending {
537 cs.statsCachePending = true
538 go cs.statsBuilder()
539 }
540 cs.statsCacheLock.Unlock()
541
542 if statsCache != nil {
543 // return fresh-enough data
544 return statsCache, nil
545 }
546
547 if staleCache == nil {
548 // block forever waiting for fresh data
549 cs.statsCacheLock.Lock()
550 for cs.statsCache == nil {
551 cs.statsCacheFresh.Wait()
552 }
553 statsCache = cs.statsCache
554 cs.statsCacheLock.Unlock()
555 return statsCache, nil
556 }
557
558 // wait for up to a second for fresh data, on timeout return stale data
559 timeout := time.NewTimer(time.Second)
560 defer timeout.Stop()
561 select {
562 case <-timeout.C:
563 cs.statsCacheLock.Lock()
564 waiter.l.Lock()
565 waiter.obsolete = true
566 waiter.l.Unlock()
567 cs.statsCacheLock.Unlock()
568 return staleCache, nil
569 case statsCache = <-waiter.freshCache:
570 return statsCache, nil
571 }
572}
573
// freshStatsWaiter bridges the statsCacheFresh condition variable to a
// channel so a request can select between fresh stats and a timeout.
// obsolete (guarded by l) is set by the request when it gives up waiting.
type freshStatsWaiter struct {
	cs         *collectionServer
	l          sync.Mutex
	obsolete   bool
	freshCache chan *CollectionStats
}
580
581func (fsw *freshStatsWaiter) waiter() {
582 fsw.cs.statsCacheLock.Lock()
583 defer fsw.cs.statsCacheLock.Unlock()
584 fsw.cs.statsCacheFresh.Wait()
585 fsw.l.Lock()
586 defer fsw.l.Unlock()
587 if fsw.obsolete {
588 close(fsw.freshCache)
589 } else {
590 fsw.freshCache <- fsw.cs.statsCache
591 }
592}
593
594func (cs *collectionServer) statsBuilder() {
595 for {
596 start := time.Now()
597 stats, err := cs.pcd.GetCollectionStats()
598 dt := time.Since(start)
599 if err == nil {
600 statsCalculations.Observe(dt.Seconds())
601 countsum := uint64(0)
602 for _, v := range stats.CollectionCounts {
603 countsum += v
604 }
605 cs.log.Info("stats built", "dt", dt, "total", countsum)
606 cs.statsCacheLock.Lock()
607 cs.statsCache = &stats
608 cs.statsCacheWhen = time.Now()
609 cs.statsCacheFresh.Broadcast()
610 cs.statsCachePending = false
611 cs.statsCacheLock.Unlock()
612 return
613 } else {
614 cs.log.Error("GetCollectionStats", "dt", dt, "err", err)
615 time.Sleep(2 * time.Second)
616 }
617 }
618}
619
620func (cs *collectionServer) hasBadword(collection string) bool {
621 if cs.badwords != nil {
622 return cs.badwords.HasBadword(collection)
623 }
624 return false
625}
626
627// /v1/listCollections?c={}&cursor={}&limit={50<=limit<=1000}
628//
629// admin may set ?stalesec={} for a maximum number of seconds stale data is accepted
630//
631// returns
632// {"collections":{"app.bsky.feed.post": 123456789, "some collection": 42}, "cursor":"opaque text"}
633func (cs *collectionServer) listCollections(c echo.Context) error {
634 stalenessAllowed := statsCacheDuration
635 stalesecStr := c.QueryParam("stalesec")
636 if stalesecStr != "" && cs.isAdmin(c) {
637 stalesec, err := strconv.ParseInt(stalesecStr, 10, 64)
638 if err != nil {
639 return c.JSON(http.StatusBadRequest, xrpc.XRPCError{ErrStr: "BadRequest", Message: "invalid 'stalesec' query parameter"})
640 }
641 if stalesec == 0 {
642 stalenessAllowed = 1
643 } else {
644 stalenessAllowed = time.Duration(stalesec) * time.Second
645 }
646 cs.log.Info("stalesec", "q", stalesecStr, "d", stalenessAllowed)
647 }
648 stats, err := cs.getStatsCache(stalenessAllowed)
649 if err != nil {
650 slog.Error("getStatsCache", "err", err)
651 return c.JSON(http.StatusInternalServerError, xrpc.XRPCError{ErrStr: "DatabaseError", Message: "failed to read stats"})
652 }
653 cursor := c.QueryParam("cursor")
654 collections, hasQueryCollections := c.QueryParams()["c"]
655 limit := getLimit(c, 50, 500, 1000)
656 var out ListCollectionsResponse
657 if hasQueryCollections {
658 out.Collections = make(map[string]uint64, len(collections))
659 for _, collection := range collections {
660 count, ok := stats.CollectionCounts[collection]
661 if ok {
662 out.Collections[collection] = count
663 }
664 }
665 } else {
666 allCollections := make([]string, 0, len(stats.CollectionCounts))
667 for collection := range stats.CollectionCounts {
668 allCollections = append(allCollections, collection)
669 }
670 sort.Strings(allCollections)
671 out.Collections = make(map[string]uint64, limit)
672 count := 0
673 for _, collection := range allCollections {
674 if (cursor == "") || (collection > cursor) {
675 if cs.hasBadword(collection) {
676 // don't show badwords in public list of collections
677 continue
678 }
679 if stats.CollectionCounts[collection] < cs.MinDidsForCollectionList {
680 // don't show experimental/spam collections only implemented by a few DIDs
681 continue
682 }
683 // TODO: probably regex based filter for collection-spam
684 out.Collections[collection] = stats.CollectionCounts[collection]
685 count++
686 if count >= limit {
687 out.Cursor = collection
688 }
689 }
690 }
691 }
692 return c.JSON(http.StatusOK, out)
693}
694
// ListCollectionsResponse is the JSON body returned by /v1/listCollections:
// a map of collection NSID -> did count, and a pagination cursor (empty on
// the last page).
type ListCollectionsResponse struct {
	Collections map[string]uint64 `json:"collections"`
	Cursor      string            `json:"cursor"`
}
699
700func (cs *collectionServer) ingestReceiver() {
701 defer cs.wg.Done()
702 defer cs.log.Info("ingestReceiver exit")
703 errcount := 0
704 for {
705 select {
706 case didc, ok := <-cs.ingestFirehose:
707 if !ok {
708 cs.log.Info("ingestFirehose closed")
709 return
710 }
711 err := cs.ingestDidc(didc, true)
712 if err != nil {
713 errcount++
714 } else {
715 errcount = 0
716 }
717 case didc := <-cs.ingestCrawl:
718 err := cs.ingestDidc(didc, false)
719 if err != nil {
720 errcount++
721 } else {
722 errcount = 0
723 }
724 case <-cs.shutdown:
725 cs.log.Info("shutting down ingestReceiver")
726 return
727 }
728 if errcount > 10 {
729 cs.log.Error("ingestReceiver too many errors")
730 return // TODO: cancel parent somehow
731 }
732 }
733}
734
// ingestDidc applies one (did, collection) pair to the primary directory
// and, when dau is true, to today's DAU directory. Dids that already have
// MaxDidCollections or more collections are skipped to bound spam.
func (cs *collectionServer) ingestDidc(didc DidCollection, dau bool) error {
	// per-did collection count, cached in an LRU to avoid a db count per event
	count, ok := cs.didCollectionCounts.Get(didc.Did)
	var err error
	if !ok {
		count, err = cs.pcd.CountDidCollections(didc.Did)
		if err != nil {
			return fmt.Errorf("count did collections, %s %w", didc.Did, err)
		}
		cs.didCollectionCounts.Add(didc.Did, count)
	}
	// NOTE(review): the cached count is never incremented after writes, so a
	// did can pass this check with a stale low count until its LRU entry is
	// evicted and recounted — confirm whether MaybeSetCollection can report
	// newly-added collections so the cache could be bumped accurately.
	if count >= cs.MaxDidCollections {
		cs.log.Warn("did too many collections", "did", didc.Did)
		return nil
	}
	err = cs.pcd.MaybeSetCollection(didc.Did, didc.Collection)
	if err != nil {
		cs.log.Warn("pcd write", "err", err)
		return err
	}
	if dau && cs.dauDirectory != nil {
		err = cs.maybeDauWrite(didc)
		if err != nil {
			cs.log.Warn("dau write", "err", err)
			return err
		}
	}
	return nil
}
763
764func (cs *collectionServer) maybeDauWrite(didc DidCollection) error {
765 now := time.Now()
766 if now.After(cs.dauTomorrow) {
767 go dauStats(cs.dauDirectory, cs.dauDay, cs.dauDirectoryDir, cs.log)
768 cs.dauDirectory = nil
769 err := cs.openDau()
770 if err != nil {
771 return fmt.Errorf("dau reopen, %w", err)
772 }
773 }
774 return cs.dauDirectory.MaybeSetCollection(didc.Did, didc.Collection)
775}
776
777// write {dauDirectoryDir}/d{YYYY-MM-DD}.pebble stats summary to {dauDirectoryDir}/d{YYYY-MM-DD}.csv.gz
778func dauStats(oldDau *PebbleCollectionDirectory, dauDay time.Time, dauDir string, log *slog.Logger) {
779 fname := fmt.Sprintf("d%s.csv.gz", dauDay.Format("2006-01-02"))
780 outstatsPath := filepath.Join(dauDir, fname)
781 log = log.With("path", outstatsPath)
782 log.Info("DAU stats summarize")
783 stats, err := oldDau.GetCollectionStats()
784 e2 := oldDau.Close()
785 if e2 != nil {
786 log.Error("old DAU close", "err", e2)
787 }
788 if err != nil {
789 log.Error("old DAU stats", "err", err)
790 } else {
791 log.Info("DAU stats summarized", "rows", len(stats.CollectionCounts))
792 pcdStatsToCsvGz(stats, outstatsPath, log)
793 }
794}
795
796func pcdStatsToCsvGz(stats CollectionStats, outpath string, log *slog.Logger) {
797 fout, err := os.Create(outpath)
798 if err != nil {
799 log.Error("DAU stats open", "err", err)
800 return
801 }
802 defer fout.Close()
803 gzout := gzip.NewWriter(fout)
804 defer gzout.Close()
805 csvout := csv.NewWriter(gzout)
806 defer csvout.Flush()
807 err = csvout.Write([]string{"collection", "count"})
808 if err != nil {
809 log.Error("DAU stats header", "err", err)
810 return
811 }
812 var row [2]string
813 rowcount := 0
814 for collection, count := range stats.CollectionCounts {
815 row[0] = collection
816 row[1] = strconv.FormatUint(count, 10)
817 err = csvout.Write(row[:])
818 if err != nil {
819 log.Error("DAU stats row", "err", err)
820 return
821 }
822 rowcount++
823 }
824 log.Info("DAU stats ok", "rows", rowcount)
825}
826
// CrawlRequest is the POST body for /admin/pds/requestCrawl: a single host
// (same field name as the relay API) and/or a batch of hosts.
type CrawlRequest struct {
	Host  string   `json:"hostname,omitempty"`
	Hosts []string `json:"hosts,omitempty"`
}

// CrawlRequestResponse acknowledges a crawl request; crawls run in the
// background, so this carries no results.
type CrawlRequestResponse struct {
	Message string `json:"message,omitempty"`
	Error   string `json:"error,omitempty"`
}
836
// hostOrUrlToUrl normalizes a bare hostname into an https:// URL while
// leaving inputs that already carry a scheme untouched.
func hostOrUrlToUrl(host string) string {
	parsed, err := url.Parse(host)
	switch {
	case err != nil:
		// unparseable: treat the whole input as a hostname
		u := url.URL{Scheme: "https", Host: host}
		return u.String()
	case parsed.Scheme == "":
		parsed.Scheme = "https"
		return parsed.String()
	default:
		return host
	}
}
850
851func (cs *collectionServer) isAdmin(c echo.Context) bool {
852 authHeader := c.Request().Header.Get("Authorization")
853 if authHeader == "" {
854 return false
855 }
856 if authHeader == cs.ExepctedAuthHeader {
857 return true
858 }
859 cs.log.Info("wrong auth header", "header", authHeader, "expected", cs.ExepctedAuthHeader)
860 return false
861}
862
863// /admin/pds/requestCrawl
864// same API signature as relay admin requestCrawl
865// starts a crawl and returns. See /v1/crawlStatus
866// requires header `Authorization: Bearer {admin token}`
867//
868// POST {"hostname":"one hostname or URL", "hosts":["up to 1000 hosts", "..."]}
869// OR
870// POST /admin/pds/requestCrawl?hostname={one host}
871func (cs *collectionServer) crawlPds(c echo.Context) error {
872 isAdmin := cs.isAdmin(c)
873 if !isAdmin {
874 return c.JSON(http.StatusForbidden, xrpc.XRPCError{ErrStr: "AdminRequired", Message: "this endpoint requires admin auth"})
875 }
876 hostQ := c.QueryParam("host")
877 if hostQ != "" {
878 go cs.crawlThread(hostQ)
879 return c.JSON(http.StatusOK, CrawlRequestResponse{Message: "ok"})
880 }
881
882 var req CrawlRequest
883 err := c.Bind(&req)
884 if err != nil {
885 cs.log.Info("bad crawl bind", "err", err)
886 return c.JSON(http.StatusBadRequest, xrpc.XRPCError{ErrStr: "BadRequest", Message: fmt.Sprintf("failed to parse body: %s", err)})
887 }
888 if req.Host != "" {
889 go cs.crawlThread(req.Host)
890 }
891 for _, host := range req.Hosts {
892 go cs.crawlThread(host)
893 }
894 return c.JSON(http.StatusOK, CrawlRequestResponse{Message: "ok"})
895}
896
897func (cs *collectionServer) crawlThread(hostIn string) {
898 host := hostOrUrlToUrl(hostIn)
899 if host != hostIn {
900 cs.log.Info("going to crawl", "in", hostIn, "as", host)
901 }
902 httpClient := http.Client{}
903 rpcClient := xrpc.Client{
904 Host: host,
905 Client: &httpClient,
906 }
907 if cs.ratelimitHeader != "" {
908 rpcClient.Headers = map[string]string{
909 "x-ratelimit-bypass": cs.ratelimitHeader,
910 }
911 }
912 crawler := Crawler{
913 Ctx: cs.ctx,
914 RpcClient: &rpcClient,
915 QPS: cs.PerPDSCrawlQPS,
916 Results: cs.ingestCrawl,
917 Log: cs.log,
918 }
919 start := time.Now()
920 ok, crawlStats := cs.recordCrawlStart(host, start)
921 if !ok {
922 cs.log.Info("not crawling dup", "host", host)
923 return
924 }
925 crawler.Stats = crawlStats
926 cs.log.Info("crawling", "host", host)
927 err := crawler.CrawlPDSRepoCollections()
928 cs.clearActiveCrawl(host)
929 pdsCrawledCounter.Inc()
930 if err != nil {
931 cs.log.Warn("crawl err", "host", host, "err", err)
932 } else {
933 dt := time.Since(start)
934 cs.log.Info("crawl done", "host", host, "dt", dt)
935 }
936}
937
938// recordCrawlStart returns true if ok, false if duplicate
939func (cs *collectionServer) recordCrawlStart(host string, start time.Time) (ok bool, stats *CrawlStats) {
940 cs.activeCrawlsLock.Lock()
941 defer cs.activeCrawlsLock.Unlock()
942 if cs.activeCrawls == nil {
943 cs.activeCrawls = make(map[string]activeCrawl)
944 } else {
945 _, dup := cs.activeCrawls[host]
946 if dup {
947 return false, nil
948 }
949 }
950 stats = new(CrawlStats)
951 cs.activeCrawls[host] = activeCrawl{
952 start: start,
953 stats: stats,
954 }
955 return true, stats
956}
957
958func (cs *collectionServer) clearActiveCrawl(host string) {
959 cs.activeCrawlsLock.Lock()
960 defer cs.activeCrawlsLock.Unlock()
961 if cs.activeCrawls == nil {
962 return
963 }
964 delete(cs.activeCrawls, host)
965}
966
// CrawlStatusResponse is the JSON body of /admin/crawlStatus: per-host crawl
// progress plus the server's current time for reference.
type CrawlStatusResponse struct {
	HostCrawls map[string]HostCrawl `json:"host_starts"`
	ServerTime string               `json:"server_time"`
}

// HostCrawl describes one active crawl: start time (RFC3339Nano) and how
// many repos have been described so far.
type HostCrawl struct {
	Start          string `json:"start"`
	ReposDescribed uint32 `json:"seen"`
}
975
976// GET /v1/crawlStatus
977func (cs *collectionServer) crawlStatus(c echo.Context) error {
978 authHeader := c.Request().Header.Get("Authorization")
979 if authHeader != cs.ExepctedAuthHeader {
980 return c.JSON(http.StatusBadRequest, xrpc.XRPCError{ErrStr: "AdminAuthRequired", Message: "this endpoint requires admin-level auth"})
981 }
982 var out CrawlStatusResponse
983 out.HostCrawls = make(map[string]HostCrawl)
984 cs.activeCrawlsLock.Lock()
985 defer cs.activeCrawlsLock.Unlock()
986 for host, rec := range cs.activeCrawls {
987 start := rec.start
988 out.HostCrawls[host] = HostCrawl{
989 Start: start.UTC().Format(time.RFC3339Nano),
990 ReposDescribed: rec.stats.ReposDescribed.Load(),
991 }
992 }
993 out.ServerTime = time.Now().UTC().Format(time.RFC3339Nano)
994 return c.JSON(http.StatusOK, out)
995}
996
997func (cs *collectionServer) healthz(c echo.Context) error {
998 // TODO: check database or upstream health?
999 return c.JSON(http.StatusOK, map[string]any{"status": "ok"})
1000}
1001
1002func loadBadwords(path string) (*BadwordsRE, error) {
1003 fin, err := os.Open(path)
1004 if err != nil {
1005 return nil, fmt.Errorf("%s: could not open badwords, %w", path, err)
1006 }
1007 dec := json.NewDecoder(fin)
1008 var rules map[string][]string
1009 err = dec.Decode(&rules)
1010 if err != nil {
1011 return nil, fmt.Errorf("%s: badwords json, %w", path, err)
1012 }
1013
1014 // compile a regex to search a string for any instance of a bad word, because we're expecting things runpooptogether
1015 badwords := rules["worst-words"]
1016 rwords := make([]string, len(badwords))
1017 for i, word := range badwords {
1018 rwords[i] = regexp.QuoteMeta(word)
1019 }
1020 reStr := strings.Join(rwords, "|")
1021 re, err := regexp.Compile(reStr)
1022 if err != nil {
1023 return nil, fmt.Errorf("%s: badwords regex, %w", path, err)
1024 }
1025 return &BadwordsRE{re: re}, nil
1026}
1027
1028type BadwordsRE struct {
1029 re *regexp.Regexp
1030}
1031
1032func (bw *BadwordsRE) HasBadword(s string) bool {
1033 // TODO: if this is too slow, try more specialized algorithm e.g. https://en.wikipedia.org/wiki/Aho%E2%80%93Corasick_algorithm
1034 return bw.re.FindString(s) != ""
1035}