package bgs

import (
	"context"
	"database/sql"
	"errors"
	"fmt"
	"log/slog"
	"net"
	"net/http"
	_ "net/http/pprof"
	"net/url"
	"strconv"
	"strings"
	"sync"
	"time"

	atproto "github.com/bluesky-social/indigo/api/atproto"
	comatproto "github.com/bluesky-social/indigo/api/atproto"
	"github.com/bluesky-social/indigo/carstore"
	"github.com/bluesky-social/indigo/did"
	"github.com/bluesky-social/indigo/events"
	"github.com/bluesky-social/indigo/handles"
	"github.com/bluesky-social/indigo/indexer"
	"github.com/bluesky-social/indigo/models"
	"github.com/bluesky-social/indigo/repomgr"
	"github.com/bluesky-social/indigo/util/svcutil"
	"github.com/bluesky-social/indigo/xrpc"
	lru "github.com/hashicorp/golang-lru/v2"
	"golang.org/x/sync/semaphore"
	"golang.org/x/time/rate"

	"github.com/gorilla/websocket"
	ipld "github.com/ipfs/go-ipld-format"
	"github.com/labstack/echo/v4"
	"github.com/labstack/echo/v4/middleware"
	promclient "github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
	dto "github.com/prometheus/client_model/go"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	"gorm.io/gorm"
)

var tracer = otel.Tracer("bgs")

// serverListenerBootTimeout is how long to wait for the requested server socket
// to become available for use. This is an arbitrary timeout that should be safe
// on any platform, but there's no great way to weave this timeout without
// adding another parameter to the (at time of writing) long signature of
// NewServer.
const serverListenerBootTimeout = 5 * time.Second

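// BGS is the core state for a relay ("Big Graph Server") instance: it ties
// together the database, the indexer, the repo manager, the event manager,
// and the slurper that subscribes to upstream PDS event streams.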
type BGS struct {
	Index       *indexer.Indexer
	db          *gorm.DB
	slurper     *Slurper
	events      *events.EventManager
	didr        did.Resolver
	repoFetcher *indexer.RepoFetcher

	hr handles.HandleResolver

	// TODO: work on doing away with this flag in favor of more pluggable
	// pieces that abstract the need for explicit ssl checks
	ssl bool

	crawlOnly bool

	// TODO: at some point we will want to lock specific DIDs. This lock, as-is,
	// is overly broad, but I don't expect it to be a bottleneck for now.
	extUserLk sync.Mutex

	repoman *repomgr.RepoManager

	// Management of Socket Consumers
	consumersLk    sync.RWMutex
	nextConsumerID uint64
	consumers      map[uint64]*SocketConsumer

	// Management of Resyncs
	pdsResyncsLk sync.RWMutex
	pdsResyncs   map[uint]*PDSResync

	// Management of Compaction
	compactor *Compactor

	// User cache
	userCache *lru.Cache[string, *User]

	// nextCrawlers receives forwarded POST /xrpc/com.atproto.sync.requestCrawl
	nextCrawlers []*url.URL
	httpClient   http.Client

	log *slog.Logger
}

type PDSResync struct {
	PDS              models.PDS `json:"pds"`
	NumRepoPages     int        `json:"numRepoPages"`
	NumRepos         int        `json:"numRepos"`
	NumReposChecked  int        `json:"numReposChecked"`
	NumReposToResync int        `json:"numReposToResync"`
	Status           string     `json:"status"`
	StatusChangedAt  time.Time  `json:"statusChangedAt"`
}

type SocketConsumer struct {
	UserAgent   string
	RemoteAddr  string
	ConnectedAt time.Time
	EventsSent  promclient.Counter
}

type BGSConfig struct {
	SSL                  bool
	CompactInterval      time.Duration
	DefaultRepoLimit     int64
	ConcurrencyPerPDS    int64
	MaxQueuePerPDS       int64
	NumCompactionWorkers int

	// NextCrawlers receives forwarded POST /xrpc/com.atproto.sync.requestCrawl
	NextCrawlers []*url.URL
}

func DefaultBGSConfig() *BGSConfig {
	return &BGSConfig{
		SSL:                  true,
		CompactInterval:      4 * time.Hour,
		DefaultRepoLimit:     100,
		ConcurrencyPerPDS:    100,
		MaxQueuePerPDS:       1_000,
		NumCompactionWorkers: 2,
	}
}

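// NewBGS constructs a BGS from its component services, runs the database
// migrations, restarts any previously-active upstream subscriptions, and
// starts the background compactor.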
func NewBGS(db *gorm.DB, ix *indexer.Indexer, repoman *repomgr.RepoManager, evtman *events.EventManager, didr did.Resolver, rf *indexer.RepoFetcher, hr handles.HandleResolver, config *BGSConfig) (*BGS, error) {
	if config == nil {
		config = DefaultBGSConfig()
	}
	db.AutoMigrate(User{})
	db.AutoMigrate(AuthToken{})
	db.AutoMigrate(models.PDS{})
	db.AutoMigrate(models.DomainBan{})

	uc, _ := lru.New[string, *User](1_000_000)

	bgs := &BGS{
		Index:       ix,
		db:          db,
		repoFetcher: rf,

		hr:      hr,
		repoman: repoman,
		events:  evtman,
		didr:    didr,
		ssl:     config.SSL,

		consumersLk: sync.RWMutex{},
		consumers:   make(map[uint64]*SocketConsumer),

		pdsResyncs: make(map[uint]*PDSResync),

		userCache: uc,

		log: slog.Default().With("system", "bgs"),
	}

	ix.CreateExternalUser = bgs.createExternalUser
	slOpts := DefaultSlurperOptions()
	slOpts.SSL = config.SSL
	slOpts.DefaultRepoLimit = config.DefaultRepoLimit
	slOpts.ConcurrencyPerPDS = config.ConcurrencyPerPDS
	slOpts.MaxQueuePerPDS = config.MaxQueuePerPDS
	s, err := NewSlurper(db, bgs.handleFedEvent, slOpts)
	if err != nil {
		return nil, err
	}

	bgs.slurper = s

	if err := bgs.slurper.RestartAll(); err != nil {
		return nil, err
	}

	cOpts := DefaultCompactorOptions()
	cOpts.NumWorkers = config.NumCompactionWorkers
	compactor := NewCompactor(cOpts)
	compactor.requeueInterval = config.CompactInterval
	compactor.Start(bgs)
	bgs.compactor = compactor

	bgs.nextCrawlers = config.NextCrawlers
	bgs.httpClient.Timeout = time.Second * 5

	return bgs, nil
}

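// StartMetrics serves the Prometheus /metrics endpoint (and, via the blank
// net/http/pprof import above, the pprof debug handlers) on the default mux
// at the given address. It blocks, so callers typically run it in its own
// goroutine; a minimal sketch (the ":2112" port is just an example):
//
//	go func() {
//		if err := bgs.StartMetrics(":2112"); err != nil {
//			slog.Error("metrics server exited", "err", err)
//		}
//	}()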
func (bgs *BGS) StartMetrics(listen string) error {
	http.Handle("/metrics", promhttp.Handler())
	return http.ListenAndServe(listen, nil)
}

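// Start opens a TCP listener on addr, waiting up to serverListenerBootTimeout
// for the socket to become available, then serves the HTTP API on it.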
func (bgs *BGS) Start(addr string) error {
	var lc net.ListenConfig
	ctx, cancel := context.WithTimeout(context.Background(), serverListenerBootTimeout)
	defer cancel()

	li, err := lc.Listen(ctx, "tcp", addr)
	if err != nil {
		return err
	}
	return bgs.StartWithListener(li)
}

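// StartWithListener serves the HTTP API on an existing listener. This is what
// lets tests boot the server on a random port: create the listener first,
// then hand it to the Echo instance.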
func (bgs *BGS) StartWithListener(listen net.Listener) error {
	e := echo.New()
	e.HideBanner = true

	e.Use(middleware.CORSWithConfig(middleware.CORSConfig{
		AllowOrigins: []string{"*"},
		AllowHeaders: []string{echo.HeaderOrigin, echo.HeaderContentType, echo.HeaderAccept, echo.HeaderAuthorization},
	}))

	if !bgs.ssl {
		e.Use(middleware.LoggerWithConfig(middleware.LoggerConfig{
			Format: "method=${method}, uri=${uri}, status=${status} latency=${latency_human}\n",
		}))
	} else {
		e.Use(middleware.LoggerWithConfig(middleware.DefaultLoggerConfig))
	}

	// React uses a virtual router, so we need to serve the index.html for all
	// routes that aren't otherwise handled or in the /assets directory.
	e.File("/dash", "public/index.html")
	e.File("/dash/*", "public/index.html")
	e.Static("/assets", "public/assets")

	e.Use(svcutil.MetricsMiddleware)

	e.HTTPErrorHandler = func(err error, ctx echo.Context) {
		switch err := err.(type) {
		case *echo.HTTPError:
			if err2 := ctx.JSON(err.Code, map[string]any{
				"error": err.Message,
			}); err2 != nil {
				bgs.log.Error("Failed to write http error", "err", err2)
			}
		default:
			sendHeader := true
			if ctx.Path() == "/xrpc/com.atproto.sync.subscribeRepos" {
				sendHeader = false
			}

			bgs.log.Warn("handler error", "path", ctx.Path(), "err", err)

			if strings.HasPrefix(ctx.Path(), "/admin/") {
				ctx.JSON(500, map[string]any{
					"error": err.Error(),
				})
				return
			}

			if sendHeader {
				ctx.Response().WriteHeader(500)
			}
		}
	}

	// TODO: this API is temporary until we formalize what we want here

	e.GET("/xrpc/com.atproto.sync.subscribeRepos", bgs.EventsHandler)
	e.GET("/xrpc/com.atproto.sync.getRecord", bgs.HandleComAtprotoSyncGetRecord)
	e.GET("/xrpc/com.atproto.sync.getRepo", bgs.HandleComAtprotoSyncGetRepo)
	e.GET("/xrpc/com.atproto.sync.getBlocks", bgs.HandleComAtprotoSyncGetBlocks)
	e.GET("/xrpc/com.atproto.sync.requestCrawl", bgs.HandleComAtprotoSyncRequestCrawl)
	e.POST("/xrpc/com.atproto.sync.requestCrawl", bgs.HandleComAtprotoSyncRequestCrawl)
	e.GET("/xrpc/com.atproto.sync.listRepos", bgs.HandleComAtprotoSyncListRepos)
	e.GET("/xrpc/com.atproto.sync.getLatestCommit", bgs.HandleComAtprotoSyncGetLatestCommit)
	e.GET("/xrpc/com.atproto.sync.notifyOfUpdate", bgs.HandleComAtprotoSyncNotifyOfUpdate)
	e.GET("/xrpc/_health", bgs.HandleHealthCheck)
	e.GET("/_health", bgs.HandleHealthCheck)
	e.GET("/", bgs.HandleHomeMessage)

	admin := e.Group("/admin", bgs.checkAdminAuth)

	// Slurper-related Admin API
	admin.GET("/subs/getUpstreamConns", bgs.handleAdminGetUpstreamConns)
	admin.GET("/subs/getEnabled", bgs.handleAdminGetSubsEnabled)
	admin.GET("/subs/perDayLimit", bgs.handleAdminGetNewPDSPerDayRateLimit)
	admin.POST("/subs/setEnabled", bgs.handleAdminSetSubsEnabled)
	admin.POST("/subs/killUpstream", bgs.handleAdminKillUpstreamConn)
	admin.POST("/subs/setPerDayLimit", bgs.handleAdminSetNewPDSPerDayRateLimit)

	// Domain-related Admin API
	admin.GET("/subs/listDomainBans", bgs.handleAdminListDomainBans)
	admin.POST("/subs/banDomain", bgs.handleAdminBanDomain)
	admin.POST("/subs/unbanDomain", bgs.handleAdminUnbanDomain)

	// Repo-related Admin API
	admin.POST("/repo/takeDown", bgs.handleAdminTakeDownRepo)
	admin.POST("/repo/reverseTakedown", bgs.handleAdminReverseTakedown)
	admin.GET("/repo/takedowns", bgs.handleAdminListRepoTakeDowns)
	admin.POST("/repo/compact", bgs.handleAdminCompactRepo)
	admin.POST("/repo/compactAll", bgs.handleAdminCompactAllRepos)
	admin.POST("/repo/reset", bgs.handleAdminResetRepo)
	admin.POST("/repo/verify", bgs.handleAdminVerifyRepo)

	// PDS-related Admin API
	admin.POST("/pds/requestCrawl", bgs.handleAdminRequestCrawl)
	admin.GET("/pds/list", bgs.handleListPDSs)
	admin.POST("/pds/resync", bgs.handleAdminPostResyncPDS)
	admin.GET("/pds/resync", bgs.handleAdminGetResyncPDS)
	admin.POST("/pds/changeLimits", bgs.handleAdminChangePDSRateLimits)
	admin.POST("/pds/block", bgs.handleBlockPDS)
	admin.POST("/pds/unblock", bgs.handleUnblockPDS)
	admin.POST("/pds/addTrustedDomain", bgs.handleAdminAddTrustedDomain)

	// Consumer-related Admin API
	admin.GET("/consumers/list", bgs.handleAdminListConsumers)

	// In order to support booting on random ports in tests, we need to tell the
	// Echo instance it's already got a port, and then use its StartServer
	// method to re-use that listener.
	e.Listener = listen
	srv := &http.Server{}
	return e.StartServer(srv)
}

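// Shutdown stops the slurper, the event manager, and the compactor,
// collecting any errors encountered along the way.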
func (bgs *BGS) Shutdown() []error {
	errs := bgs.slurper.Shutdown()

	if err := bgs.events.Shutdown(context.TODO()); err != nil {
		errs = append(errs, err)
	}

	bgs.compactor.Shutdown()

	return errs
}

type HealthStatus struct {
	Status  string `json:"status"`
	Message string `json:"msg,omitempty"`
}

func (bgs *BGS) HandleHealthCheck(c echo.Context) error {
	if err := bgs.db.Exec("SELECT 1").Error; err != nil {
		bgs.log.Error("healthcheck can't connect to database", "err", err)
		return c.JSON(500, HealthStatus{Status: "error", Message: "can't connect to database"})
	} else {
		return c.JSON(200, HealthStatus{Status: "ok"})
	}
}

var homeMessage string = `
d8888b. d888888b  d888b  .d8888. db   dD db    db
88  '8D   '88'   88' Y8b 88'  YP 88 ,8P' '8b  d8'
88oooY'    88    88      '8bo.   88,8P    '8bd8'
88~~~b.    88    88  ooo   'Y8b. 88'8b      88
88   8D   .88.   88. ~8~ db   8D 88 '88.    88
Y8888P' Y888888P  Y888P  '8888Y' YP   YD    YP

This is an atproto [https://atproto.com] relay instance, running the 'bigsky' codebase [https://github.com/bluesky-social/indigo]

The firehose WebSocket path is at: /xrpc/com.atproto.sync.subscribeRepos
`

func (bgs *BGS) HandleHomeMessage(c echo.Context) error {
	return c.String(http.StatusOK, homeMessage)
}

type AuthToken struct {
	gorm.Model
	Token string `gorm:"index"`
}

func (bgs *BGS) lookupAdminToken(tok string) (bool, error) {
	var at AuthToken
	if err := bgs.db.Find(&at, "token = ?", tok).Error; err != nil {
		return false, err
	}

	if at.ID == 0 {
		return false, nil
	}

	return true, nil
}

func (bgs *BGS) CreateAdminToken(tok string) error {
	exists, err := bgs.lookupAdminToken(tok)
	if err != nil {
		return err
	}

	if exists {
		return nil
	}

	return bgs.db.Create(&AuthToken{
		Token: tok,
	}).Error
}

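// checkAdminAuth is the middleware guarding the /admin group: requests must
// carry an "Authorization: Bearer <token>" header whose token exists in the
// AuthToken table; anything else gets a 403.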
func (bgs *BGS) checkAdminAuth(next echo.HandlerFunc) echo.HandlerFunc {
	return func(e echo.Context) error {
		ctx, span := tracer.Start(e.Request().Context(), "checkAdminAuth")
		defer span.End()

		e.SetRequest(e.Request().WithContext(ctx))

		authheader := e.Request().Header.Get("Authorization")
		pref := "Bearer "
		if !strings.HasPrefix(authheader, pref) {
			return echo.ErrForbidden
		}

		token := authheader[len(pref):]

		exists, err := bgs.lookupAdminToken(token)
		if err != nil {
			return err
		}

		if !exists {
			return echo.ErrForbidden
		}

		return next(e)
	}
}

type User struct {
	ID          models.Uid `gorm:"primarykey;index:idx_user_id_active,where:taken_down = false AND tombstoned = false"`
	CreatedAt   time.Time
	UpdatedAt   time.Time
	DeletedAt   gorm.DeletedAt `gorm:"index"`
	Handle      sql.NullString `gorm:"index"`
	Did         string         `gorm:"uniqueIndex"`
	PDS         uint
	ValidHandle bool `gorm:"default:true"`

	// TakenDown is set to true if the user in question has been taken down.
	// A user in this state will have all future events related to it dropped
	// and no data about this user will be served.
	TakenDown  bool
	Tombstoned bool

	// UpstreamStatus is the state of the user as reported by the upstream PDS
	UpstreamStatus string `gorm:"index"`

	lk sync.Mutex
}

func (u *User) SetTakenDown(v bool) {
	u.lk.Lock()
	defer u.lk.Unlock()
	u.TakenDown = v
}

func (u *User) GetTakenDown() bool {
	u.lk.Lock()
	defer u.lk.Unlock()
	return u.TakenDown
}

func (u *User) SetTombstoned(v bool) {
	u.lk.Lock()
	defer u.lk.Unlock()
	u.Tombstoned = v
}

func (u *User) GetTombstoned() bool {
	u.lk.Lock()
	defer u.lk.Unlock()
	return u.Tombstoned
}

func (u *User) SetUpstreamStatus(v string) {
	u.lk.Lock()
	defer u.lk.Unlock()
	u.UpstreamStatus = v
}

func (u *User) GetUpstreamStatus() string {
	u.lk.Lock()
	defer u.lk.Unlock()
	return u.UpstreamStatus
}

type addTargetBody struct {
	Host string `json:"host"`
}

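// registerConsumer assigns the next consumer ID to a newly connected socket
// consumer and records it so it can be listed via the admin API and counted
// in metrics; cleanupConsumer undoes this on disconnect.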
func (bgs *BGS) registerConsumer(c *SocketConsumer) uint64 {
	bgs.consumersLk.Lock()
	defer bgs.consumersLk.Unlock()

	id := bgs.nextConsumerID
	bgs.nextConsumerID++

	bgs.consumers[id] = c

	return id
}

func (bgs *BGS) cleanupConsumer(id uint64) {
	bgs.consumersLk.Lock()
	defer bgs.consumersLk.Unlock()

	c := bgs.consumers[id]

	var m = &dto.Metric{}
	if err := c.EventsSent.Write(m); err != nil {
		bgs.log.Error("failed to get sent counter", "err", err)
	}

	bgs.log.Info("consumer disconnected",
		"consumer_id", id,
		"remote_addr", c.RemoteAddr,
		"user_agent", c.UserAgent,
		"events_sent", m.Counter.GetValue())

	delete(bgs.consumers, id)
}

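// EventsHandler implements the com.atproto.sync.subscribeRepos firehose: it
// upgrades the request to a WebSocket, subscribes to the event manager
// (optionally resuming from the "cursor" query parameter), and streams
// serialized events to the client, pinging idle connections to detect death.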
func (bgs *BGS) EventsHandler(c echo.Context) error {
	var since *int64
	if sinceVal := c.QueryParam("cursor"); sinceVal != "" {
		sval, err := strconv.ParseInt(sinceVal, 10, 64)
		if err != nil {
			return err
		}
		since = &sval
	}

	ctx, cancel := context.WithCancel(c.Request().Context())
	defer cancel()

	// TODO: authhhh
	conn, err := websocket.Upgrade(c.Response(), c.Request(), c.Response().Header(), 10<<10, 10<<10)
	if err != nil {
		return fmt.Errorf("upgrading websocket: %w", err)
	}

	defer conn.Close()

	lastWriteLk := sync.Mutex{}
	lastWrite := time.Now()

	// Start a goroutine to ping the client every 30 seconds to check if it's
	// still alive. If the client doesn't respond to a ping within 5 seconds,
	// we'll close the connection and teardown the consumer.
	go func() {
		ticker := time.NewTicker(30 * time.Second)
		defer ticker.Stop()

		for {
			select {
			case <-ticker.C:
				lastWriteLk.Lock()
				lw := lastWrite
				lastWriteLk.Unlock()

				if time.Since(lw) < 30*time.Second {
					continue
				}

				if err := conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(5*time.Second)); err != nil {
					bgs.log.Warn("failed to ping client", "err", err)
					cancel()
					return
				}
			case <-ctx.Done():
				return
			}
		}
	}()

	conn.SetPingHandler(func(message string) error {
		err := conn.WriteControl(websocket.PongMessage, []byte(message), time.Now().Add(time.Second*60))
		if err == websocket.ErrCloseSent {
			return nil
		} else if e, ok := err.(net.Error); ok && e.Temporary() {
			return nil
		}
		return err
	})

	// Start a goroutine to read messages from the client and discard them.
	go func() {
		for {
			_, _, err := conn.ReadMessage()
			if err != nil {
				bgs.log.Warn("failed to read message from client", "err", err)
				cancel()
				return
			}
		}
	}()

	ident := c.RealIP() + "-" + c.Request().UserAgent()

	evts, cleanup, err := bgs.events.Subscribe(ctx, ident, func(evt *events.XRPCStreamEvent) bool { return true }, since)
	if err != nil {
		return err
	}
	defer cleanup()

	// Keep track of the consumer for metrics and admin endpoints
	consumer := SocketConsumer{
		RemoteAddr:  c.RealIP(),
		UserAgent:   c.Request().UserAgent(),
		ConnectedAt: time.Now(),
	}
	sentCounter := eventsSentCounter.WithLabelValues(consumer.RemoteAddr, consumer.UserAgent)
	consumer.EventsSent = sentCounter

	consumerID := bgs.registerConsumer(&consumer)
	defer bgs.cleanupConsumer(consumerID)

	logger := bgs.log.With(
		"consumer_id", consumerID,
		"remote_addr", consumer.RemoteAddr,
		"user_agent", consumer.UserAgent,
	)

	logger.Info("new consumer", "cursor", since)

	for {
		select {
		case evt, ok := <-evts:
			if !ok {
				logger.Error("event stream closed unexpectedly")
				return nil
			}

			wc, err := conn.NextWriter(websocket.BinaryMessage)
			if err != nil {
				logger.Error("failed to get next writer", "err", err)
				return err
			}

			if evt.Preserialized != nil {
				_, err = wc.Write(evt.Preserialized)
			} else {
				err = evt.Serialize(wc)
			}
			if err != nil {
				return fmt.Errorf("failed to write event: %w", err)
			}

			if err := wc.Close(); err != nil {
				logger.Warn("failed to flush-close our event write", "err", err)
				return nil
			}

			lastWriteLk.Lock()
			lastWrite = time.Now()
			lastWriteLk.Unlock()
			sentCounter.Inc()
		case <-ctx.Done():
			return nil
		}
	}
}

// domainIsBanned checks if the given host is banned, starting with the host
// itself, then checking every parent domain up to the tld
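// (so, for example, a ban on "example.com" also covers "pds.example.com").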
func (s *BGS) domainIsBanned(ctx context.Context, host string) (bool, error) {
	// ignore ports when checking for ban status
	hostport := strings.Split(host, ":")

	segments := strings.Split(hostport[0], ".")

	// TODO: use normalize method once that merges
	var cleaned []string
	for _, s := range segments {
		if s == "" {
			continue
		}
		s = strings.ToLower(s)

		cleaned = append(cleaned, s)
	}
	segments = cleaned

	for i := 0; i < len(segments)-1; i++ {
		dchk := strings.Join(segments[i:], ".")
		found, err := s.findDomainBan(ctx, dchk)
		if err != nil {
			return false, err
		}

		if found {
			return true, nil
		}
	}
	return false, nil
}

func (s *BGS) findDomainBan(ctx context.Context, host string) (bool, error) {
	var db models.DomainBan
	if err := s.db.Find(&db, "domain = ?", host).Error; err != nil {
		return false, err
	}

	if db.ID == 0 {
		return false, nil
	}

	return true, nil
}

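// lookupUserByDid returns the locally-known user for a DID, consulting the
// in-memory LRU cache before falling back to the database.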
func (bgs *BGS) lookupUserByDid(ctx context.Context, did string) (*User, error) {
	ctx, span := tracer.Start(ctx, "lookupUserByDid")
	defer span.End()

	cu, ok := bgs.userCache.Get(did)
	if ok {
		return cu, nil
	}

	var u User
	if err := bgs.db.Find(&u, "did = ?", did).Error; err != nil {
		return nil, err
	}

	if u.ID == 0 {
		return nil, gorm.ErrRecordNotFound
	}

	bgs.userCache.Add(did, &u)

	return &u, nil
}

func (bgs *BGS) lookupUserByUID(ctx context.Context, uid models.Uid) (*User, error) {
	ctx, span := tracer.Start(ctx, "lookupUserByUID")
	defer span.End()

	var u User
	if err := bgs.db.Find(&u, "id = ?", uid).Error; err != nil {
		return nil, err
	}

	if u.ID == 0 {
		return nil, gorm.ErrRecordNotFound
	}

	return &u, nil
}

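// handleFedEvent is the callback the slurper invokes for every event received
// from an upstream PDS. It validates the event against what we know about the
// user and their PDS, then either applies it, queues a catch-up crawl, or
// drops it (for taken-down, suspended, or deactivated accounts).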
func (bgs *BGS) handleFedEvent(ctx context.Context, host *models.PDS, env *events.XRPCStreamEvent) error {
	ctx, span := tracer.Start(ctx, "handleFedEvent")
	defer span.End()

	start := time.Now()
	defer func() {
		eventsHandleDuration.WithLabelValues(host.Host).Observe(time.Since(start).Seconds())
	}()

	eventsReceivedCounter.WithLabelValues(host.Host).Add(1)

	switch {
	case env.RepoCommit != nil:
		repoCommitsReceivedCounter.WithLabelValues(host.Host).Add(1)
		evt := env.RepoCommit
		bgs.log.Debug("bgs got repo append event", "seq", evt.Seq, "pdsHost", host.Host, "repo", evt.Repo)

		s := time.Now()
		u, err := bgs.lookupUserByDid(ctx, evt.Repo)
		userLookupDuration.Observe(time.Since(s).Seconds())
		if err != nil {
			if !errors.Is(err, gorm.ErrRecordNotFound) {
				repoCommitsResultCounter.WithLabelValues(host.Host, "nou").Inc()
				return fmt.Errorf("looking up event user: %w", err)
			}

			newUsersDiscovered.Inc()
			start := time.Now()
			subj, err := bgs.createExternalUser(ctx, evt.Repo)
			newUserDiscoveryDuration.Observe(time.Since(start).Seconds())
			if err != nil {
				repoCommitsResultCounter.WithLabelValues(host.Host, "uerr").Inc()
				return fmt.Errorf("fed event create external user: %w", err)
			}

			u = new(User)
			u.ID = subj.Uid
			u.Did = evt.Repo
		}

		ustatus := u.GetUpstreamStatus()
		span.SetAttributes(attribute.String("upstream_status", ustatus))

		if u.GetTakenDown() || ustatus == events.AccountStatusTakendown {
			span.SetAttributes(attribute.Bool("taken_down_by_relay_admin", u.GetTakenDown()))
			bgs.log.Debug("dropping commit event from taken down user", "did", evt.Repo, "seq", evt.Seq, "pdsHost", host.Host)
			repoCommitsResultCounter.WithLabelValues(host.Host, "tdu").Inc()
			return nil
		}

		if ustatus == events.AccountStatusSuspended {
			bgs.log.Debug("dropping commit event from suspended user", "did", evt.Repo, "seq", evt.Seq, "pdsHost", host.Host)
			repoCommitsResultCounter.WithLabelValues(host.Host, "susu").Inc()
			return nil
		}

		if ustatus == events.AccountStatusDeactivated {
			bgs.log.Debug("dropping commit event from deactivated user", "did", evt.Repo, "seq", evt.Seq, "pdsHost", host.Host)
			repoCommitsResultCounter.WithLabelValues(host.Host, "du").Inc()
			return nil
		}

		if evt.Rebase {
			repoCommitsResultCounter.WithLabelValues(host.Host, "rebase").Inc()
			return fmt.Errorf("rebase was true in event seq:%d,host:%s", evt.Seq, host.Host)
		}

		if host.ID != u.PDS && u.PDS != 0 {
			bgs.log.Warn("received event for repo from different pds than expected", "repo", evt.Repo, "expPds", u.PDS, "gotPds", host.Host)
			// Flush any cached DID documents for this user
			bgs.didr.FlushCacheFor(env.RepoCommit.Repo)

			subj, err := bgs.createExternalUser(ctx, evt.Repo)
			if err != nil {
				repoCommitsResultCounter.WithLabelValues(host.Host, "uerr2").Inc()
				return err
			}

			if subj.PDS != host.ID {
				repoCommitsResultCounter.WithLabelValues(host.Host, "noauth").Inc()
				return fmt.Errorf("event from non-authoritative pds")
			}
		}

		if u.GetTombstoned() {
			span.SetAttributes(attribute.Bool("tombstoned", true))
			// we've checked the authority of the user's PDS, so reinstate the account
			if err := bgs.db.Model(&User{}).Where("id = ?", u.ID).UpdateColumn("tombstoned", false).Error; err != nil {
				repoCommitsResultCounter.WithLabelValues(host.Host, "tomb").Inc()
				return fmt.Errorf("failed to un-tombstone a user: %w", err)
			}
			u.SetTombstoned(false)

			ai, err := bgs.Index.LookupUser(ctx, u.ID)
			if err != nil {
				repoCommitsResultCounter.WithLabelValues(host.Host, "nou2").Inc()
				return fmt.Errorf("failed to look up user (tombstone recover): %w", err)
			}

			// Now a simple re-crawl should suffice to bring the user back online
			repoCommitsResultCounter.WithLabelValues(host.Host, "catchupt").Inc()
			return bgs.Index.Crawler.AddToCatchupQueue(ctx, host, ai, evt)
		}

		// skip the fast path if the user is already in the slow path
		if bgs.Index.Crawler.RepoInSlowPath(ctx, u.ID) {
			rebasesCounter.WithLabelValues(host.Host).Add(1)
			ai, err := bgs.Index.LookupUser(ctx, u.ID)
			if err != nil {
				repoCommitsResultCounter.WithLabelValues(host.Host, "nou3").Inc()
				return fmt.Errorf("failed to look up user (slow path): %w", err)
			}

			// TODO: we currently do not handle events that get queued up
			// behind an already 'in progress' slow path event.
			// this is strictly less efficient than it could be, and while it
			// does 'work' (due to falling back to resyncing the repo), it's
			// technically incorrect. Now that we have the parallel event
			// processor coming off of the pds stream, we should investigate
			// whether or not we even need this 'slow path' logic, as it makes
			// accounting for which events have been processed much harder
			repoCommitsResultCounter.WithLabelValues(host.Host, "catchup").Inc()
			return bgs.Index.Crawler.AddToCatchupQueue(ctx, host, ai, evt)
		}

		if err := bgs.repoman.HandleExternalUserEvent(ctx, host.ID, u.ID, u.Did, evt.Since, evt.Rev, evt.Blocks, evt.Ops); err != nil {
			if errors.Is(err, carstore.ErrRepoBaseMismatch) || ipld.IsNotFound(err) {
				ai, lerr := bgs.Index.LookupUser(ctx, u.ID)
				if lerr != nil {
					bgs.log.Warn("failed handling event, no user", "err", err, "pdsHost", host.Host, "seq", evt.Seq, "repo", u.Did, "commit", evt.Commit.String())
					repoCommitsResultCounter.WithLabelValues(host.Host, "nou4").Inc()
					return fmt.Errorf("failed to look up user %s (%d) (err case: %s): %w", u.Did, u.ID, err, lerr)
				}

				span.SetAttributes(attribute.Bool("catchup_queue", true))

				bgs.log.Info("failed handling event, catchup", "err", err, "pdsHost", host.Host, "seq", evt.Seq, "repo", u.Did, "commit", evt.Commit.String())
				repoCommitsResultCounter.WithLabelValues(host.Host, "catchup2").Inc()
				return bgs.Index.Crawler.AddToCatchupQueue(ctx, host, ai, evt)
			}

			bgs.log.Warn("failed handling event", "err", err, "pdsHost", host.Host, "seq", evt.Seq, "repo", u.Did, "commit", evt.Commit.String())
			repoCommitsResultCounter.WithLabelValues(host.Host, "err").Inc()
			return fmt.Errorf("handle user event failed: %w", err)
		}

		repoCommitsResultCounter.WithLabelValues(host.Host, "ok").Inc()
		return nil
	case env.RepoIdentity != nil:
		bgs.log.Info("bgs got identity event", "did", env.RepoIdentity.Did)
		// Flush any cached DID documents for this user
		bgs.didr.FlushCacheFor(env.RepoIdentity.Did)

		// Refetch the DID doc and update our cached keys and handle etc.
		_, err := bgs.createExternalUser(ctx, env.RepoIdentity.Did)
		if err != nil {
			return err
		}

		// Broadcast the identity event to all consumers
		err = bgs.events.AddEvent(ctx, &events.XRPCStreamEvent{
			RepoIdentity: &comatproto.SyncSubscribeRepos_Identity{
				Did:    env.RepoIdentity.Did,
				Seq:    env.RepoIdentity.Seq,
				Time:   env.RepoIdentity.Time,
				Handle: env.RepoIdentity.Handle,
			},
		})
		if err != nil {
			bgs.log.Error("failed to broadcast Identity event", "error", err, "did", env.RepoIdentity.Did)
			return fmt.Errorf("failed to broadcast Identity event: %w", err)
		}

		return nil
	case env.RepoAccount != nil:
		span.SetAttributes(
			attribute.String("did", env.RepoAccount.Did),
			attribute.Int64("seq", env.RepoAccount.Seq),
			attribute.Bool("active", env.RepoAccount.Active),
		)

		if env.RepoAccount.Status != nil {
			span.SetAttributes(attribute.String("repo_status", *env.RepoAccount.Status))
		}

		bgs.log.Info("bgs got account event", "did", env.RepoAccount.Did)
		// Flush any cached DID documents for this user
		bgs.didr.FlushCacheFor(env.RepoAccount.Did)

		// Refetch the DID doc to make sure the PDS is still authoritative
		ai, err := bgs.createExternalUser(ctx, env.RepoAccount.Did)
		if err != nil {
			span.RecordError(err)
			return err
		}

		// Check if the PDS is still authoritative;
		// if not, we don't want to be propagating this account event
		if ai.PDS != host.ID {
			bgs.log.Error("account event from non-authoritative pds",
				"seq", env.RepoAccount.Seq,
				"did", env.RepoAccount.Did,
				"event_from", host.Host,
				"did_doc_declared_pds", ai.PDS,
				"account_evt", env.RepoAccount,
			)
			return fmt.Errorf("event from non-authoritative pds")
		}

		// Process the account status change
		repoStatus := events.AccountStatusActive
		if !env.RepoAccount.Active && env.RepoAccount.Status != nil {
			repoStatus = *env.RepoAccount.Status
		}

		err = bgs.UpdateAccountStatus(ctx, env.RepoAccount.Did, repoStatus)
		if err != nil {
			span.RecordError(err)
			return fmt.Errorf("failed to update account status: %w", err)
		}

		shouldBeActive := env.RepoAccount.Active
		status := env.RepoAccount.Status
		u, err := bgs.lookupUserByDid(ctx, env.RepoAccount.Did)
		if err != nil {
			return fmt.Errorf("failed to look up user by did: %w", err)
		}

		if u.GetTakenDown() {
			shouldBeActive = false
			status = &events.AccountStatusTakendown
		}

		// Broadcast the account event to all consumers
		err = bgs.events.AddEvent(ctx, &events.XRPCStreamEvent{
			RepoAccount: &comatproto.SyncSubscribeRepos_Account{
				Did:    env.RepoAccount.Did,
				Seq:    env.RepoAccount.Seq,
				Time:   env.RepoAccount.Time,
				Active: shouldBeActive,
				Status: status,
			},
		})
		if err != nil {
			bgs.log.Error("failed to broadcast Account event", "error", err, "did", env.RepoAccount.Did)
			return fmt.Errorf("failed to broadcast Account event: %w", err)
		}

		return nil
	default:
		return fmt.Errorf("invalid fed event")
	}
}

// TODO: rename? This also updates users, and 'external' is an old phrasing
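// createExternalUser resolves a DID to its PDS (creating a PDS record if we
// have never seen that host before), validates the claimed handle, and
// creates or updates the local User and ActorInfo records for that DID.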
func (s *BGS) createExternalUser(ctx context.Context, did string) (*models.ActorInfo, error) {
	ctx, span := tracer.Start(ctx, "createExternalUser")
	defer span.End()

	externalUserCreationAttempts.Inc()

	s.log.Debug("create external user", "did", did)
	doc, err := s.didr.GetDocument(ctx, did)
	if err != nil {
		return nil, fmt.Errorf("could not locate DID document for followed user (%s): %w", did, err)
	}

	if len(doc.Service) == 0 {
		return nil, fmt.Errorf("external followed user %s had no services in did document", did)
	}

	svc := doc.Service[0]
	durl, err := url.Parse(svc.ServiceEndpoint)
	if err != nil {
		return nil, err
	}

	if strings.HasPrefix(durl.Host, "localhost:") {
		durl.Scheme = "http"
	}

	// TODO: the PDS's DID should also be in the service, we could use that to look up?
	var peering models.PDS
	if err := s.db.Find(&peering, "host = ?", durl.Host).Error; err != nil {
		s.log.Error("failed to find pds", "host", durl.Host)
		return nil, err
	}

	ban, err := s.domainIsBanned(ctx, durl.Host)
	if err != nil {
		return nil, fmt.Errorf("failed to check pds ban status: %w", err)
	}

	if ban {
		return nil, fmt.Errorf("cannot create user on pds with banned domain")
	}

	c := &xrpc.Client{Host: durl.String()}
	s.Index.ApplyPDSClientSettings(c)

	if peering.ID == 0 {
		// TODO: the case of handling a new user on a new PDS probably requires more thought
		cfg, err := atproto.ServerDescribeServer(ctx, c)
		if err != nil {
			// TODO: failing this shouldn't halt our indexing
			return nil, fmt.Errorf("failed to check unrecognized pds: %w", err)
		}

		// since handles can be anything, checking against this list doesn't matter...
		_ = cfg

		// TODO: could check other things, a valid response is good enough for now
		peering.Host = durl.Host
		peering.SSL = (durl.Scheme == "https")
		peering.CrawlRateLimit = float64(s.slurper.DefaultCrawlLimit)
		peering.RateLimit = float64(s.slurper.DefaultPerSecondLimit)
		peering.HourlyEventLimit = s.slurper.DefaultPerHourLimit
		peering.DailyEventLimit = s.slurper.DefaultPerDayLimit
		peering.RepoLimit = s.slurper.DefaultRepoLimit

		if s.ssl && !peering.SSL {
			return nil, fmt.Errorf("did references non-ssl PDS, this is disallowed in prod: %q %q", did, svc.ServiceEndpoint)
		}

		if err := s.db.Create(&peering).Error; err != nil {
			return nil, err
		}
	}

	if peering.ID == 0 {
		panic("somehow failed to create a pds entry?")
	}

	if peering.Blocked {
		return nil, fmt.Errorf("refusing to create user with blocked PDS")
	}

	if peering.RepoCount >= peering.RepoLimit {
		return nil, fmt.Errorf("refusing to create user on PDS at max repo limit for pds %q", peering.Host)
	}

	// Increment the repo count for the PDS
	res := s.db.Model(&models.PDS{}).Where("id = ? AND repo_count < repo_limit", peering.ID).Update("repo_count", gorm.Expr("repo_count + 1"))
	if res.Error != nil {
		return nil, fmt.Errorf("failed to increment repo count for pds %q: %w", peering.Host, res.Error)
	}

	if res.RowsAffected == 0 {
		return nil, fmt.Errorf("refusing to create user on PDS at max repo limit for pds %q", peering.Host)
	}

	successfullyCreated := false

	// Release the count if we fail to create the user
	defer func() {
		if !successfullyCreated {
			if err := s.db.Model(&models.PDS{}).Where("id = ?", peering.ID).Update("repo_count", gorm.Expr("repo_count - 1")).Error; err != nil {
				s.log.Error("failed to decrement repo count for pds", "err", err)
			}
		}
	}()

	if len(doc.AlsoKnownAs) == 0 {
		return nil, fmt.Errorf("user has no 'known as' field in their DID document")
	}

	hurl, err := url.Parse(doc.AlsoKnownAs[0])
	if err != nil {
		return nil, err
	}

	s.log.Debug("creating external user", "did", did, "handle", hurl.Host, "pds", peering.ID)

	handle := hurl.Host

	validHandle := true

	resdid, err := s.hr.ResolveHandleToDid(ctx, handle)
	if err != nil {
		s.log.Error("failed to resolve user's claimed handle on pds", "handle", handle, "err", err)
		validHandle = false
	} else if resdid != did {
		s.log.Error("claimed handle did not match server's response", "resdid", resdid, "did", did)
		validHandle = false
	}

	s.extUserLk.Lock()
	defer s.extUserLk.Unlock()

	exu, err := s.Index.LookupUserByDid(ctx, did)
	if err == nil {
		s.log.Debug("lost the race to create a new user", "did", did, "handle", handle, "existing_hand", exu.Handle)
		if exu.PDS != peering.ID {
			// User is now on a different PDS, update
			if err := s.db.Model(User{}).Where("id = ?", exu.Uid).Update("pds", peering.ID).Error; err != nil {
				return nil, fmt.Errorf("failed to update user's pds: %w", err)
			}

			if err := s.db.Model(models.ActorInfo{}).Where("uid = ?", exu.Uid).Update("pds", peering.ID).Error; err != nil {
				return nil, fmt.Errorf("failed to update user's pds on actorInfo: %w", err)
			}

			exu.PDS = peering.ID
		}

		if exu.Handle.String != handle {
			// User's handle has changed, update
			if err := s.db.Model(User{}).Where("id = ?", exu.Uid).Update("handle", handle).Error; err != nil {
				return nil, fmt.Errorf("failed to update user's handle: %w", err)
			}

			// Update ActorInfos
			if err := s.db.Model(models.ActorInfo{}).Where("uid = ?", exu.Uid).Update("handle", handle).Error; err != nil {
				return nil, fmt.Errorf("failed to update actorInfo's handle: %w", err)
			}

			exu.Handle = sql.NullString{String: handle, Valid: true}
		}
		return exu, nil
	}

	if !errors.Is(err, gorm.ErrRecordNotFound) {
		return nil, err
	}

	// TODO: request this user's info from their server to fill out our data...
	u := User{
		Did:         did,
		PDS:         peering.ID,
		ValidHandle: validHandle,
	}
	if validHandle {
		u.Handle = sql.NullString{String: handle, Valid: true}
	}

	if err := s.db.Create(&u).Error; err != nil {
		// If the new user's handle conflicts with an existing user,
		// since we just validated the handle for this user, we'll assume
		// the existing user no longer has control of the handle
		if errors.Is(err, gorm.ErrDuplicatedKey) {
			// Get the UID of the existing user
			var existingUser User
			if err := s.db.Find(&existingUser, "handle = ?", handle).Error; err != nil {
				return nil, fmt.Errorf("failed to find existing user: %w", err)
			}

			// Set the existing user's handle to NULL and set the valid_handle flag to false
			if err := s.db.Model(User{}).Where("id = ?", existingUser.ID).Update("handle", nil).Update("valid_handle", false).Error; err != nil {
				return nil, fmt.Errorf("failed to update outdated user's handle: %w", err)
			}

			// Do the same thing for the ActorInfo if it exists
			if err := s.db.Model(models.ActorInfo{}).Where("uid = ?", existingUser.ID).Update("handle", nil).Update("valid_handle", false).Error; err != nil {
				if !errors.Is(err, gorm.ErrRecordNotFound) {
					return nil, fmt.Errorf("failed to update outdated actorInfo's handle: %w", err)
				}
			}

			// Create the new user
			if err := s.db.Create(&u).Error; err != nil {
				return nil, fmt.Errorf("failed to create user after handle conflict: %w", err)
			}

			s.userCache.Remove(did)
		} else {
			return nil, fmt.Errorf("failed to create other pds user: %w", err)
		}
	}

	// okay cool, it's a user on a server we are peered with,
	// let's make a local record of that user for the future
	subj := &models.ActorInfo{
		Uid:         u.ID,
		DisplayName: "", //*profile.DisplayName,
		Did:         did,
		Type:        "",
		PDS:         peering.ID,
		ValidHandle: validHandle,
	}
	if validHandle {
		subj.Handle = sql.NullString{String: handle, Valid: true}
	}
	if err := s.db.Create(subj).Error; err != nil {
		return nil, err
	}

	successfullyCreated = true

	return subj, nil
}

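// UpdateAccountStatus records the account status reported by the upstream PDS
// on the local User row. Deleted accounts are additionally tombstoned, have
// their handles cleared, and have their repo data removed from the carstore.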
func (bgs *BGS) UpdateAccountStatus(ctx context.Context, did string, status string) error {
	ctx, span := tracer.Start(ctx, "UpdateAccountStatus")
	defer span.End()

	span.SetAttributes(
		attribute.String("did", did),
		attribute.String("status", status),
	)

	u, err := bgs.lookupUserByDid(ctx, did)
	if err != nil {
		return err
	}

	switch status {
	case events.AccountStatusActive:
		// Unset the PDS-specific status flags
		if err := bgs.db.Model(User{}).Where("id = ?", u.ID).Update("upstream_status", events.AccountStatusActive).Error; err != nil {
			return fmt.Errorf("failed to set user active status: %w", err)
		}
		u.SetUpstreamStatus(events.AccountStatusActive)
	case events.AccountStatusDeactivated:
		if err := bgs.db.Model(User{}).Where("id = ?", u.ID).Update("upstream_status", events.AccountStatusDeactivated).Error; err != nil {
			return fmt.Errorf("failed to set user deactivation status: %w", err)
		}
		u.SetUpstreamStatus(events.AccountStatusDeactivated)
	case events.AccountStatusSuspended:
		if err := bgs.db.Model(User{}).Where("id = ?", u.ID).Update("upstream_status", events.AccountStatusSuspended).Error; err != nil {
			return fmt.Errorf("failed to set user suspension status: %w", err)
		}
		u.SetUpstreamStatus(events.AccountStatusSuspended)
	case events.AccountStatusTakendown:
		if err := bgs.db.Model(User{}).Where("id = ?", u.ID).Update("upstream_status", events.AccountStatusTakendown).Error; err != nil {
			return fmt.Errorf("failed to set user taken down status: %w", err)
		}
		u.SetUpstreamStatus(events.AccountStatusTakendown)

		if err := bgs.db.Model(&models.ActorInfo{}).Where("uid = ?", u.ID).UpdateColumns(map[string]any{
			"handle": nil,
		}).Error; err != nil {
			return err
		}
	case events.AccountStatusDeleted:
		if err := bgs.db.Model(&User{}).Where("id = ?", u.ID).UpdateColumns(map[string]any{
			"tombstoned":      true,
			"handle":          nil,
			"upstream_status": events.AccountStatusDeleted,
		}).Error; err != nil {
			return err
		}
		u.SetUpstreamStatus(events.AccountStatusDeleted)

		if err := bgs.db.Model(&models.ActorInfo{}).Where("uid = ?", u.ID).UpdateColumns(map[string]any{
			"handle": nil,
		}).Error; err != nil {
			return err
		}

		// delete data from carstore
		if err := bgs.repoman.TakeDownRepo(ctx, u.ID); err != nil {
			// don't let a failure here prevent us from propagating this event
			bgs.log.Error("failed to delete user data from carstore", "err", err)
		}
	}

	return nil
}

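// TakeDownRepo marks a repo as taken down by the relay admin: the flag is
// persisted on the User row, the repo data is removed from the carstore via
// the repo manager, and the takedown is propagated to the event manager.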
func (bgs *BGS) TakeDownRepo(ctx context.Context, did string) error {
	u, err := bgs.lookupUserByDid(ctx, did)
	if err != nil {
		return err
	}

	if err := bgs.db.Model(User{}).Where("id = ?", u.ID).Update("taken_down", true).Error; err != nil {
		return err
	}
	u.SetTakenDown(true)

	if err := bgs.repoman.TakeDownRepo(ctx, u.ID); err != nil {
		return err
	}

	if err := bgs.events.TakeDownRepo(ctx, u.ID); err != nil {
		return err
	}

	return nil
}

func (bgs *BGS) ReverseTakedown(ctx context.Context, did string) error {
	u, err := bgs.lookupUserByDid(ctx, did)
	if err != nil {
		return err
	}

	if err := bgs.db.Model(User{}).Where("id = ?", u.ID).Update("taken_down", false).Error; err != nil {
		return err
	}
	u.SetTakenDown(false)

	return nil
}

type revCheckResult struct {
	ai  *models.ActorInfo
	err error
}

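// LoadOrStoreResync returns the in-progress resync for a PDS if one exists;
// otherwise it records a new one in the "started" state. The boolean reports
// whether a resync was already in progress.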
func (bgs *BGS) LoadOrStoreResync(pds models.PDS) (PDSResync, bool) {
	bgs.pdsResyncsLk.Lock()
	defer bgs.pdsResyncsLk.Unlock()

	if r, ok := bgs.pdsResyncs[pds.ID]; ok && r != nil {
		return *r, true
	}

	r := PDSResync{
		PDS:             pds,
		Status:          "started",
		StatusChangedAt: time.Now(),
	}

	bgs.pdsResyncs[pds.ID] = &r

	return r, false
}

func (bgs *BGS) GetResync(pds models.PDS) (PDSResync, bool) {
	bgs.pdsResyncsLk.RLock()
	defer bgs.pdsResyncsLk.RUnlock()

	if r, ok := bgs.pdsResyncs[pds.ID]; ok {
		return *r, true
	}

	return PDSResync{}, false
}

func (bgs *BGS) UpdateResync(resync PDSResync) {
	bgs.pdsResyncsLk.Lock()
	defer bgs.pdsResyncsLk.Unlock()

	bgs.pdsResyncs[resync.PDS.ID] = &resync
}

func (bgs *BGS) SetResyncStatus(id uint, status string) PDSResync {
	bgs.pdsResyncsLk.Lock()
	defer bgs.pdsResyncsLk.Unlock()

	if r, ok := bgs.pdsResyncs[id]; ok {
		r.Status = status
		r.StatusChangedAt = time.Now()
		return *r
	}

	// no resync is tracked for this PDS; return the zero value rather than
	// dereferencing a nil map entry
	return PDSResync{}
}

func (bgs *BGS) CompleteResync(resync PDSResync) {
	bgs.pdsResyncsLk.Lock()
	defer bgs.pdsResyncsLk.Unlock()

	delete(bgs.pdsResyncs, resync.PDS.ID)
}

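// ResyncPDS lists every repo hosted on a PDS and compares each remote rev
// against our local copy, enqueueing a crawl for any repo that is missing or
// out of date. Only one resync per PDS may run at a time.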
func (bgs *BGS) ResyncPDS(ctx context.Context, pds models.PDS) error {
	ctx, span := tracer.Start(ctx, "ResyncPDS")
	defer span.End()
	log := bgs.log.With("pds", pds.Host, "source", "resync_pds")
	resync, found := bgs.LoadOrStoreResync(pds)
	if found {
		return fmt.Errorf("resync already in progress")
	}
	defer bgs.CompleteResync(resync)

	start := time.Now()

	log.Warn("starting PDS resync")

	host := "http://"
	if pds.SSL {
		host = "https://"
	}
	host += pds.Host

	xrpcc := xrpc.Client{Host: host}
	bgs.Index.ApplyPDSClientSettings(&xrpcc)

	limiter := rate.NewLimiter(rate.Limit(50), 1)
	cursor := ""
	limit := int64(500)

	repos := []comatproto.SyncListRepos_Repo{}

	pages := 0

	resync = bgs.SetResyncStatus(pds.ID, "listing repos")
	for {
		pages++
		if pages%10 == 0 {
			log.Warn("fetching PDS page during resync", "pages", pages, "total_repos", len(repos))
			resync.NumRepoPages = pages
			resync.NumRepos = len(repos)
			bgs.UpdateResync(resync)
		}
		if err := limiter.Wait(ctx); err != nil {
			log.Error("failed to wait for rate limiter", "error", err)
			return fmt.Errorf("failed to wait for rate limiter: %w", err)
		}
		repoList, err := comatproto.SyncListRepos(ctx, &xrpcc, cursor, limit)
		if err != nil {
			log.Error("failed to list repos", "error", err)
			return fmt.Errorf("failed to list repos: %w", err)
		}

		for _, r := range repoList.Repos {
			if r != nil {
				repos = append(repos, *r)
			}
		}

		if repoList.Cursor == nil || *repoList.Cursor == "" {
			break
		}
		cursor = *repoList.Cursor
	}

	resync.NumRepoPages = pages
	resync.NumRepos = len(repos)
	bgs.UpdateResync(resync)

	repolistDone := time.Now()

	log.Warn("listed all repos, checking roots", "num_repos", len(repos), "took", repolistDone.Sub(start))
	resync = bgs.SetResyncStatus(pds.ID, "checking revs")

	// run loop over repos with some concurrency
	sem := semaphore.NewWeighted(40)

	// Check repo revs against our local copy and enqueue crawls for any that are out of date
	for i, r := range repos {
		if err := sem.Acquire(ctx, 1); err != nil {
			log.Error("failed to acquire semaphore", "error", err)
			continue
		}
		go func(r comatproto.SyncListRepos_Repo) {
			defer sem.Release(1)
			log := bgs.log.With("did", r.Did, "remote_rev", r.Rev)
			// Fetches the user if we have it, otherwise automatically enqueues it for crawling
			ai, err := bgs.Index.GetUserOrMissing(ctx, r.Did)
			if err != nil {
				log.Error("failed to get user while resyncing PDS, we can't recrawl it", "error", err)
				return
			}

			rev, err := bgs.repoman.GetRepoRev(ctx, ai.Uid)
			if err != nil {
				log.Warn("recrawling because we failed to get the local repo root", "err", err, "uid", ai.Uid)
				err := bgs.Index.Crawler.Crawl(ctx, ai)
				if err != nil {
					log.Error("failed to enqueue crawl for repo during resync", "error", err, "uid", ai.Uid, "did", ai.Did)
				}
				return
			}

			if rev == "" || rev < r.Rev {
				log.Warn("recrawling because the repo rev from the PDS is newer than our local repo rev", "local_rev", rev)
				err := bgs.Index.Crawler.Crawl(ctx, ai)
				if err != nil {
					log.Error("failed to enqueue crawl for repo during resync", "error", err, "uid", ai.Uid, "did", ai.Did)
				}
				return
			}
		}(r)
		if i%100 == 0 {
			if i%10_000 == 0 {
				log.Warn("checked revs during resync", "num_repos_checked", i, "num_repos_to_crawl", -1, "took", time.Since(resync.StatusChangedAt))
			}
			resync.NumReposChecked = i
			bgs.UpdateResync(resync)
		}
	}

	resync.NumReposChecked = len(repos)
	bgs.UpdateResync(resync)

	bgs.log.Warn("enqueued all crawls, exiting resync", "took", time.Since(start), "num_repos_to_crawl", -1)

	return nil
}