fork of indigo with slightly nicer lexgen
package bgs

import (
	"context"
	"database/sql"
	"errors"
	"fmt"
	"log/slog"
	"net"
	"net/http"
	_ "net/http/pprof"
	"net/url"
	"strconv"
	"strings"
	"sync"
	"time"

	atproto "github.com/bluesky-social/indigo/api/atproto"
	comatproto "github.com/bluesky-social/indigo/api/atproto"
	"github.com/bluesky-social/indigo/carstore"
	"github.com/bluesky-social/indigo/did"
	"github.com/bluesky-social/indigo/events"
	"github.com/bluesky-social/indigo/handles"
	"github.com/bluesky-social/indigo/indexer"
	"github.com/bluesky-social/indigo/models"
	"github.com/bluesky-social/indigo/repomgr"
	"github.com/bluesky-social/indigo/util/svcutil"
	"github.com/bluesky-social/indigo/xrpc"
	lru "github.com/hashicorp/golang-lru/v2"
	"golang.org/x/sync/semaphore"
	"golang.org/x/time/rate"

	"github.com/gorilla/websocket"
	ipld "github.com/ipfs/go-ipld-format"
	"github.com/labstack/echo/v4"
	"github.com/labstack/echo/v4/middleware"
	promclient "github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
	dto "github.com/prometheus/client_model/go"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	"gorm.io/gorm"
)

var tracer = otel.Tracer("bgs")

// serverListenerBootTimeout is how long to wait for the requested server socket
// to become available for use. This is an arbitrary timeout that should be safe
// on any platform, but there's no great way to weave this timeout without
// adding another parameter to the (at time of writing) long signature of
// NewServer.
const serverListenerBootTimeout = 5 * time.Second

type BGS struct {
	Index       *indexer.Indexer
	db          *gorm.DB
	slurper     *Slurper
	events      *events.EventManager
	didr        did.Resolver
	repoFetcher *indexer.RepoFetcher

	hr handles.HandleResolver

	// TODO: work on doing away with this flag in favor of more pluggable
	// pieces that abstract the need for explicit ssl checks
	ssl bool

	crawlOnly bool

	// TODO: at some point we will want to lock specific DIDs, this lock as is
	// is overly broad, but i dont expect it to be a bottleneck for now
	extUserLk sync.Mutex

	repoman *repomgr.RepoManager

	// Management of Socket Consumers
	consumersLk    sync.RWMutex
	nextConsumerID uint64
	consumers      map[uint64]*SocketConsumer

	// Management of Resyncs
	pdsResyncsLk sync.RWMutex
	pdsResyncs   map[uint]*PDSResync

	// Management of Compaction
	compactor *Compactor

	// User cache
	userCache *lru.Cache[string, *User]

	// nextCrawlers gets forwarded POST /xrpc/com.atproto.sync.requestCrawl
	nextCrawlers []*url.URL
	httpClient   http.Client

	log *slog.Logger
}

type PDSResync struct {
	PDS              models.PDS `json:"pds"`
	NumRepoPages     int        `json:"numRepoPages"`
	NumRepos         int        `json:"numRepos"`
	NumReposChecked  int        `json:"numReposChecked"`
	NumReposToResync int        `json:"numReposToResync"`
	Status           string     `json:"status"`
	StatusChangedAt  time.Time  `json:"statusChangedAt"`
}

type SocketConsumer struct {
	UserAgent   string
	RemoteAddr  string
	ConnectedAt time.Time
	EventsSent  promclient.Counter
}

type BGSConfig struct {
	SSL                  bool
	CompactInterval      time.Duration
	DefaultRepoLimit     int64
	ConcurrencyPerPDS    int64
	MaxQueuePerPDS       int64
	NumCompactionWorkers int

	// NextCrawlers gets forwarded POST /xrpc/com.atproto.sync.requestCrawl
	NextCrawlers []*url.URL
}

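// DefaultBGSConfig returns the configuration used when NewBGS is called with
// a nil config: SSL on, a 4h compaction interval, a 100-repo default limit
// per PDS, and 2 compaction workers. Callers typically tweak individual
// fields rather than building a BGSConfig from scratch.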
func DefaultBGSConfig() *BGSConfig {
	return &BGSConfig{
		SSL:                  true,
		CompactInterval:      4 * time.Hour,
		DefaultRepoLimit:     100,
		ConcurrencyPerPDS:    100,
		MaxQueuePerPDS:       1_000,
		NumCompactionWorkers: 2,
	}
}

func NewBGS(db *gorm.DB, ix *indexer.Indexer, repoman *repomgr.RepoManager, evtman *events.EventManager, didr did.Resolver, rf *indexer.RepoFetcher, hr handles.HandleResolver, config *BGSConfig) (*BGS, error) {
	if config == nil {
		config = DefaultBGSConfig()
	}
	db.AutoMigrate(User{})
	db.AutoMigrate(AuthToken{})
	db.AutoMigrate(models.PDS{})
	db.AutoMigrate(models.DomainBan{})

	uc, _ := lru.New[string, *User](1_000_000)

	bgs := &BGS{
		Index:       ix,
		db:          db,
		repoFetcher: rf,

		hr:      hr,
		repoman: repoman,
		events:  evtman,
		didr:    didr,
		ssl:     config.SSL,

		consumersLk: sync.RWMutex{},
		consumers:   make(map[uint64]*SocketConsumer),

		pdsResyncs: make(map[uint]*PDSResync),

		userCache: uc,

		log: slog.Default().With("system", "bgs"),
	}

	ix.CreateExternalUser = bgs.createExternalUser
	slOpts := DefaultSlurperOptions()
	slOpts.SSL = config.SSL
	slOpts.DefaultRepoLimit = config.DefaultRepoLimit
	slOpts.ConcurrencyPerPDS = config.ConcurrencyPerPDS
	slOpts.MaxQueuePerPDS = config.MaxQueuePerPDS
	s, err := NewSlurper(db, bgs.handleFedEvent, slOpts)
	if err != nil {
		return nil, err
	}

	bgs.slurper = s

	if err := bgs.slurper.RestartAll(); err != nil {
		return nil, err
	}

	cOpts := DefaultCompactorOptions()
	cOpts.NumWorkers = config.NumCompactionWorkers
	compactor := NewCompactor(cOpts)
	compactor.requeueInterval = config.CompactInterval
	compactor.Start(bgs)
	bgs.compactor = compactor

	bgs.nextCrawlers = config.NextCrawlers
	bgs.httpClient.Timeout = time.Second * 5

	return bgs, nil
}

func (bgs *BGS) StartMetrics(listen string) error {
	http.Handle("/metrics", promhttp.Handler())
	return http.ListenAndServe(listen, nil)
}

func (bgs *BGS) Start(addr string) error {
	var lc net.ListenConfig
	ctx, cancel := context.WithTimeout(context.Background(), serverListenerBootTimeout)
	defer cancel()

	li, err := lc.Listen(ctx, "tcp", addr)
	if err != nil {
		return err
	}
	return bgs.StartWithListener(li)
}

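// StartWithListener wires up the HTTP routes (public XRPC sync endpoints,
// the admin API, and the dashboard static files) on an Echo instance and
// serves them on the provided listener. Start uses it for normal operation;
// tests can pass in a listener bound to a random port.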
func (bgs *BGS) StartWithListener(listen net.Listener) error {
	e := echo.New()
	e.HideBanner = true

	e.Use(middleware.CORSWithConfig(middleware.CORSConfig{
		AllowOrigins: []string{"*"},
		AllowHeaders: []string{echo.HeaderOrigin, echo.HeaderContentType, echo.HeaderAccept, echo.HeaderAuthorization},
	}))

	if !bgs.ssl {
		e.Use(middleware.LoggerWithConfig(middleware.LoggerConfig{
			Format: "method=${method}, uri=${uri}, status=${status} latency=${latency_human}\n",
		}))
	} else {
		e.Use(middleware.LoggerWithConfig(middleware.DefaultLoggerConfig))
	}

	// React uses a virtual router, so we need to serve the index.html for all
	// routes that aren't otherwise handled or in the /assets directory.
	e.File("/dash", "public/index.html")
	e.File("/dash/*", "public/index.html")
	e.Static("/assets", "public/assets")

	e.Use(svcutil.MetricsMiddleware)

	e.HTTPErrorHandler = func(err error, ctx echo.Context) {
		switch err := err.(type) {
		case *echo.HTTPError:
			if err2 := ctx.JSON(err.Code, map[string]any{
				"error": err.Message,
			}); err2 != nil {
				bgs.log.Error("Failed to write http error", "err", err2)
			}
		default:
			sendHeader := true
			if ctx.Path() == "/xrpc/com.atproto.sync.subscribeRepos" {
				sendHeader = false
			}

			bgs.log.Warn("handler error", "path", ctx.Path(), "err", err)

			if strings.HasPrefix(ctx.Path(), "/admin/") {
				ctx.JSON(500, map[string]any{
					"error": err.Error(),
				})
				return
			}

			if sendHeader {
				ctx.Response().WriteHeader(500)
			}
		}
	}

	// TODO: this API is temporary until we formalize what we want here

	e.GET("/xrpc/com.atproto.sync.subscribeRepos", bgs.EventsHandler)
	e.GET("/xrpc/com.atproto.sync.getRecord", bgs.HandleComAtprotoSyncGetRecord)
	e.GET("/xrpc/com.atproto.sync.getRepo", bgs.HandleComAtprotoSyncGetRepo)
	e.GET("/xrpc/com.atproto.sync.getBlocks", bgs.HandleComAtprotoSyncGetBlocks)
	e.GET("/xrpc/com.atproto.sync.requestCrawl", bgs.HandleComAtprotoSyncRequestCrawl)
	e.POST("/xrpc/com.atproto.sync.requestCrawl", bgs.HandleComAtprotoSyncRequestCrawl)
	e.GET("/xrpc/com.atproto.sync.listRepos", bgs.HandleComAtprotoSyncListRepos)
	e.GET("/xrpc/com.atproto.sync.getLatestCommit", bgs.HandleComAtprotoSyncGetLatestCommit)
	e.GET("/xrpc/com.atproto.sync.notifyOfUpdate", bgs.HandleComAtprotoSyncNotifyOfUpdate)
	e.GET("/xrpc/_health", bgs.HandleHealthCheck)
	e.GET("/_health", bgs.HandleHealthCheck)
	e.GET("/", bgs.HandleHomeMessage)

	admin := e.Group("/admin", bgs.checkAdminAuth)

	// Slurper-related Admin API
	admin.GET("/subs/getUpstreamConns", bgs.handleAdminGetUpstreamConns)
	admin.GET("/subs/getEnabled", bgs.handleAdminGetSubsEnabled)
	admin.GET("/subs/perDayLimit", bgs.handleAdminGetNewPDSPerDayRateLimit)
	admin.POST("/subs/setEnabled", bgs.handleAdminSetSubsEnabled)
	admin.POST("/subs/killUpstream", bgs.handleAdminKillUpstreamConn)
	admin.POST("/subs/setPerDayLimit", bgs.handleAdminSetNewPDSPerDayRateLimit)

	// Domain-related Admin API
	admin.GET("/subs/listDomainBans", bgs.handleAdminListDomainBans)
	admin.POST("/subs/banDomain", bgs.handleAdminBanDomain)
	admin.POST("/subs/unbanDomain", bgs.handleAdminUnbanDomain)

	// Repo-related Admin API
	admin.POST("/repo/takeDown", bgs.handleAdminTakeDownRepo)
	admin.POST("/repo/reverseTakedown", bgs.handleAdminReverseTakedown)
	admin.GET("/repo/takedowns", bgs.handleAdminListRepoTakeDowns)
	admin.POST("/repo/compact", bgs.handleAdminCompactRepo)
	admin.POST("/repo/compactAll", bgs.handleAdminCompactAllRepos)
	admin.POST("/repo/reset", bgs.handleAdminResetRepo)
	admin.POST("/repo/verify", bgs.handleAdminVerifyRepo)

	// PDS-related Admin API
	admin.POST("/pds/requestCrawl", bgs.handleAdminRequestCrawl)
	admin.GET("/pds/list", bgs.handleListPDSs)
	admin.POST("/pds/resync", bgs.handleAdminPostResyncPDS)
	admin.GET("/pds/resync", bgs.handleAdminGetResyncPDS)
	admin.POST("/pds/changeLimits", bgs.handleAdminChangePDSRateLimits)
	admin.POST("/pds/block", bgs.handleBlockPDS)
	admin.POST("/pds/unblock", bgs.handleUnblockPDS)
	admin.POST("/pds/addTrustedDomain", bgs.handleAdminAddTrustedDomain)

	// Consumer-related Admin API
	admin.GET("/consumers/list", bgs.handleAdminListConsumers)

	// In order to support booting on random ports in tests, we need to tell the
	// Echo instance it's already got a port, and then use its StartServer
	// method to re-use that listener.
	e.Listener = listen
	srv := &http.Server{}
	return e.StartServer(srv)
}

func (bgs *BGS) Shutdown() []error {
	errs := bgs.slurper.Shutdown()

	if err := bgs.events.Shutdown(context.TODO()); err != nil {
		errs = append(errs, err)
	}

	bgs.compactor.Shutdown()

	return errs
}

type HealthStatus struct {
	Status  string `json:"status"`
	Message string `json:"msg,omitempty"`
}

func (bgs *BGS) HandleHealthCheck(c echo.Context) error {
	if err := bgs.db.Exec("SELECT 1").Error; err != nil {
		bgs.log.Error("healthcheck can't connect to database", "err", err)
		return c.JSON(500, HealthStatus{Status: "error", Message: "can't connect to database"})
	} else {
		return c.JSON(200, HealthStatus{Status: "ok"})
	}
}

var homeMessage string = `
d8888b. d888888b  d888b  .d8888. db   dD db    db
88  '8D   '88'   88' Y8b 88'  YP 88 ,8P' '8b  d8'
88oooY'    88    88      '8bo.   88,8P    '8bd8'
88~~~b.    88    88  ooo   'Y8b. 88'8b      88
88   8D   .88.   88. ~8~ db   8D 88 '88.    88
Y8888P' Y888888P  Y888P  '8888Y' YP   YD    YP

This is an atproto [https://atproto.com] relay instance, running the 'bigsky' codebase [https://github.com/bluesky-social/indigo]

The firehose WebSocket path is at: /xrpc/com.atproto.sync.subscribeRepos
`

func (bgs *BGS) HandleHomeMessage(c echo.Context) error {
	return c.String(http.StatusOK, homeMessage)
}

type AuthToken struct {
	gorm.Model
	Token string `gorm:"index"`
}

func (bgs *BGS) lookupAdminToken(tok string) (bool, error) {
	var at AuthToken
	if err := bgs.db.Find(&at, "token = ?", tok).Error; err != nil {
		return false, err
	}

	if at.ID == 0 {
		return false, nil
	}

	return true, nil
}

func (bgs *BGS) CreateAdminToken(tok string) error {
	exists, err := bgs.lookupAdminToken(tok)
	if err != nil {
		return err
	}

	if exists {
		return nil
	}

	return bgs.db.Create(&AuthToken{
		Token: tok,
	}).Error
}

func (bgs *BGS) checkAdminAuth(next echo.HandlerFunc) echo.HandlerFunc {
	return func(e echo.Context) error {
		ctx, span := tracer.Start(e.Request().Context(), "checkAdminAuth")
		defer span.End()

		e.SetRequest(e.Request().WithContext(ctx))

		authheader := e.Request().Header.Get("Authorization")
		pref := "Bearer "
		if !strings.HasPrefix(authheader, pref) {
			return echo.ErrForbidden
		}

		token := authheader[len(pref):]

		exists, err := bgs.lookupAdminToken(token)
		if err != nil {
			return err
		}

		if !exists {
			return echo.ErrForbidden
		}

		return next(e)
	}
}

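// User is the relay's local record of an account: its DID, most recently
// validated handle, home PDS, and moderation/upstream status flags. The
// embedded mutex guards the mutable status fields; use the Set*/Get*
// accessors below rather than touching them directly.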
type User struct {
	ID          models.Uid `gorm:"primarykey;index:idx_user_id_active,where:taken_down = false AND tombstoned = false"`
	CreatedAt   time.Time
	UpdatedAt   time.Time
	DeletedAt   gorm.DeletedAt `gorm:"index"`
	Handle      sql.NullString `gorm:"index"`
	Did         string         `gorm:"uniqueIndex"`
	PDS         uint
	ValidHandle bool `gorm:"default:true"`

	// TakenDown is set to true if the user in question has been taken down.
	// A user in this state will have all future events related to it dropped
	// and no data about this user will be served.
	TakenDown  bool
	Tombstoned bool

	// UpstreamStatus is the state of the user as reported by the upstream PDS
	UpstreamStatus string `gorm:"index"`

	lk sync.Mutex
}

func (u *User) SetTakenDown(v bool) {
	u.lk.Lock()
	defer u.lk.Unlock()
	u.TakenDown = v
}

func (u *User) GetTakenDown() bool {
	u.lk.Lock()
	defer u.lk.Unlock()
	return u.TakenDown
}

func (u *User) SetTombstoned(v bool) {
	u.lk.Lock()
	defer u.lk.Unlock()
	u.Tombstoned = v
}

func (u *User) GetTombstoned() bool {
	u.lk.Lock()
	defer u.lk.Unlock()
	return u.Tombstoned
}

func (u *User) SetUpstreamStatus(v string) {
	u.lk.Lock()
	defer u.lk.Unlock()
	u.UpstreamStatus = v
}

func (u *User) GetUpstreamStatus() string {
	u.lk.Lock()
	defer u.lk.Unlock()
	return u.UpstreamStatus
}

type addTargetBody struct {
	Host string `json:"host"`
}

func (bgs *BGS) registerConsumer(c *SocketConsumer) uint64 {
	bgs.consumersLk.Lock()
	defer bgs.consumersLk.Unlock()

	id := bgs.nextConsumerID
	bgs.nextConsumerID++

	bgs.consumers[id] = c

	return id
}

func (bgs *BGS) cleanupConsumer(id uint64) {
	bgs.consumersLk.Lock()
	defer bgs.consumersLk.Unlock()

	c := bgs.consumers[id]

	var m = &dto.Metric{}
	if err := c.EventsSent.Write(m); err != nil {
		bgs.log.Error("failed to get sent counter", "err", err)
	}

	bgs.log.Info("consumer disconnected",
		"consumer_id", id,
		"remote_addr", c.RemoteAddr,
		"user_agent", c.UserAgent,
		"events_sent", m.Counter.GetValue())

	delete(bgs.consumers, id)
}

func (bgs *BGS) EventsHandler(c echo.Context) error {
	var since *int64
	if sinceVal := c.QueryParam("cursor"); sinceVal != "" {
		sval, err := strconv.ParseInt(sinceVal, 10, 64)
		if err != nil {
			return err
		}
		since = &sval
	}

	ctx, cancel := context.WithCancel(c.Request().Context())
	defer cancel()

	// TODO: authhhh
	conn, err := websocket.Upgrade(c.Response(), c.Request(), c.Response().Header(), 10<<10, 10<<10)
	if err != nil {
		return fmt.Errorf("upgrading websocket: %w", err)
	}

	defer conn.Close()

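	// lastWrite records the time of the most recent successful event write;
	// the keepalive goroutine below only sends a ping once the stream has
	// been idle for a full 30-second tick.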
	lastWriteLk := sync.Mutex{}
	lastWrite := time.Now()

	// Start a goroutine to ping the client every 30 seconds to check if it's
	// still alive. If the client doesn't respond to a ping within 5 seconds,
	// we'll close the connection and teardown the consumer.
	go func() {
		ticker := time.NewTicker(30 * time.Second)
		defer ticker.Stop()

		for {
			select {
			case <-ticker.C:
				lastWriteLk.Lock()
				lw := lastWrite
				lastWriteLk.Unlock()

				if time.Since(lw) < 30*time.Second {
					continue
				}

				if err := conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(5*time.Second)); err != nil {
					bgs.log.Warn("failed to ping client", "err", err)
					cancel()
					return
				}
			case <-ctx.Done():
				return
			}
		}
	}()

	conn.SetPingHandler(func(message string) error {
		err := conn.WriteControl(websocket.PongMessage, []byte(message), time.Now().Add(time.Second*60))
		if err == websocket.ErrCloseSent {
			return nil
		} else if e, ok := err.(net.Error); ok && e.Temporary() {
			return nil
		}
		return err
	})

	// Start a goroutine to read messages from the client and discard them.
	go func() {
		for {
			_, _, err := conn.ReadMessage()
			if err != nil {
				bgs.log.Warn("failed to read message from client", "err", err)
				cancel()
				return
			}
		}
	}()

	ident := c.RealIP() + "-" + c.Request().UserAgent()

	evts, cleanup, err := bgs.events.Subscribe(ctx, ident, func(evt *events.XRPCStreamEvent) bool { return true }, since)
	if err != nil {
		return err
	}
	defer cleanup()

	// Keep track of the consumer for metrics and admin endpoints
	consumer := SocketConsumer{
		RemoteAddr:  c.RealIP(),
		UserAgent:   c.Request().UserAgent(),
		ConnectedAt: time.Now(),
	}
	sentCounter := eventsSentCounter.WithLabelValues(consumer.RemoteAddr, consumer.UserAgent)
	consumer.EventsSent = sentCounter

	consumerID := bgs.registerConsumer(&consumer)
	defer bgs.cleanupConsumer(consumerID)

	logger := bgs.log.With(
		"consumer_id", consumerID,
		"remote_addr", consumer.RemoteAddr,
		"user_agent", consumer.UserAgent,
	)

	logger.Info("new consumer", "cursor", since)

	for {
		select {
		case evt, ok := <-evts:
			if !ok {
				logger.Error("event stream closed unexpectedly")
				return nil
			}

			wc, err := conn.NextWriter(websocket.BinaryMessage)
			if err != nil {
				logger.Error("failed to get next writer", "err", err)
				return err
			}

			if evt.Preserialized != nil {
				_, err = wc.Write(evt.Preserialized)
			} else {
				err = evt.Serialize(wc)
			}
			if err != nil {
				return fmt.Errorf("failed to write event: %w", err)
			}

			if err := wc.Close(); err != nil {
				logger.Warn("failed to flush-close our event write", "err", err)
				return nil
			}

			lastWriteLk.Lock()
			lastWrite = time.Now()
			lastWriteLk.Unlock()
			sentCounter.Inc()
		case <-ctx.Done():
			return nil
		}
	}
}

// domainIsBanned checks if the given host is banned, starting with the host
// itself, then checking every parent domain up to the tld
func (s *BGS) domainIsBanned(ctx context.Context, host string) (bool, error) {
	// ignore ports when checking for ban status
	hostport := strings.Split(host, ":")

	segments := strings.Split(hostport[0], ".")

	// TODO: use normalize method once that merges
	var cleaned []string
	for _, s := range segments {
		if s == "" {
			continue
		}
		s = strings.ToLower(s)

		cleaned = append(cleaned, s)
	}
	segments = cleaned

	for i := 0; i < len(segments)-1; i++ {
		dchk := strings.Join(segments[i:], ".")
		found, err := s.findDomainBan(ctx, dchk)
		if err != nil {
			return false, err
		}

		if found {
			return true, nil
		}
	}
	return false, nil
}

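// findDomainBan reports whether an exact ban record exists for the given
// domain; domainIsBanned drives it once per parent-domain suffix.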
func (s *BGS) findDomainBan(ctx context.Context, host string) (bool, error) {
	var db models.DomainBan
	if err := s.db.Find(&db, "domain = ?", host).Error; err != nil {
		return false, err
	}

	if db.ID == 0 {
		return false, nil
	}

	return true, nil
}

func (bgs *BGS) lookupUserByDid(ctx context.Context, did string) (*User, error) {
	ctx, span := tracer.Start(ctx, "lookupUserByDid")
	defer span.End()

	cu, ok := bgs.userCache.Get(did)
	if ok {
		return cu, nil
	}

	var u User
	if err := bgs.db.Find(&u, "did = ?", did).Error; err != nil {
		return nil, err
	}

	if u.ID == 0 {
		return nil, gorm.ErrRecordNotFound
	}

	bgs.userCache.Add(did, &u)

	return &u, nil
}

func (bgs *BGS) lookupUserByUID(ctx context.Context, uid models.Uid) (*User, error) {
	ctx, span := tracer.Start(ctx, "lookupUserByUID")
	defer span.End()

	var u User
	if err := bgs.db.Find(&u, "id = ?", uid).Error; err != nil {
		return nil, err
	}

	if u.ID == 0 {
		return nil, gorm.ErrRecordNotFound
	}

	return &u, nil
}

func (bgs *BGS) handleFedEvent(ctx context.Context, host *models.PDS, env *events.XRPCStreamEvent) error {
	ctx, span := tracer.Start(ctx, "handleFedEvent")
	defer span.End()

	start := time.Now()
	defer func() {
		eventsHandleDuration.WithLabelValues(host.Host).Observe(time.Since(start).Seconds())
	}()

	eventsReceivedCounter.WithLabelValues(host.Host).Add(1)

	switch {
	case env.RepoCommit != nil:
		repoCommitsReceivedCounter.WithLabelValues(host.Host).Add(1)
		evt := env.RepoCommit
		bgs.log.Debug("bgs got repo append event", "seq", evt.Seq, "pdsHost", host.Host, "repo", evt.Repo)

		s := time.Now()
		u, err := bgs.lookupUserByDid(ctx, evt.Repo)
		userLookupDuration.Observe(time.Since(s).Seconds())
		if err != nil {
			if !errors.Is(err, gorm.ErrRecordNotFound) {
				repoCommitsResultCounter.WithLabelValues(host.Host, "nou").Inc()
				return fmt.Errorf("looking up event user: %w", err)
			}

			newUsersDiscovered.Inc()
			start := time.Now()
			subj, err := bgs.createExternalUser(ctx, evt.Repo)
			newUserDiscoveryDuration.Observe(time.Since(start).Seconds())
			if err != nil {
				repoCommitsResultCounter.WithLabelValues(host.Host, "uerr").Inc()
				return fmt.Errorf("fed event create external user: %w", err)
			}

			u = new(User)
			u.ID = subj.Uid
			u.Did = evt.Repo
		}

		ustatus := u.GetUpstreamStatus()
		span.SetAttributes(attribute.String("upstream_status", ustatus))

		if u.GetTakenDown() || ustatus == events.AccountStatusTakendown {
			span.SetAttributes(attribute.Bool("taken_down_by_relay_admin", u.GetTakenDown()))
			bgs.log.Debug("dropping commit event from taken down user", "did", evt.Repo, "seq", evt.Seq, "pdsHost", host.Host)
			repoCommitsResultCounter.WithLabelValues(host.Host, "tdu").Inc()
			return nil
		}

		if ustatus == events.AccountStatusSuspended {
			bgs.log.Debug("dropping commit event from suspended user", "did", evt.Repo, "seq", evt.Seq, "pdsHost", host.Host)
			repoCommitsResultCounter.WithLabelValues(host.Host, "susu").Inc()
			return nil
		}

		if ustatus == events.AccountStatusDeactivated {
			bgs.log.Debug("dropping commit event from deactivated user", "did", evt.Repo, "seq", evt.Seq, "pdsHost", host.Host)
			repoCommitsResultCounter.WithLabelValues(host.Host, "du").Inc()
			return nil
		}

		if evt.Rebase {
			repoCommitsResultCounter.WithLabelValues(host.Host, "rebase").Inc()
			return fmt.Errorf("rebase was true in event seq:%d,host:%s", evt.Seq, host.Host)
		}

		if host.ID != u.PDS && u.PDS != 0 {
			bgs.log.Warn("received event for repo from different pds than expected", "repo", evt.Repo, "expPds", u.PDS, "gotPds", host.Host)
			// Flush any cached DID documents for this user
			bgs.didr.FlushCacheFor(env.RepoCommit.Repo)

			subj, err := bgs.createExternalUser(ctx, evt.Repo)
			if err != nil {
				repoCommitsResultCounter.WithLabelValues(host.Host, "uerr2").Inc()
				return err
			}

			if subj.PDS != host.ID {
				repoCommitsResultCounter.WithLabelValues(host.Host, "noauth").Inc()
				return fmt.Errorf("event from non-authoritative pds")
			}
		}

		if u.GetTombstoned() {
			span.SetAttributes(attribute.Bool("tombstoned", true))
			// we've checked the authority of the users PDS, so reinstate the account
			if err := bgs.db.Model(&User{}).Where("id = ?", u.ID).UpdateColumn("tombstoned", false).Error; err != nil {
				repoCommitsResultCounter.WithLabelValues(host.Host, "tomb").Inc()
				return fmt.Errorf("failed to un-tombstone a user: %w", err)
			}
			u.SetTombstoned(false)

			ai, err := bgs.Index.LookupUser(ctx, u.ID)
			if err != nil {
				repoCommitsResultCounter.WithLabelValues(host.Host, "nou2").Inc()
				return fmt.Errorf("failed to look up user (tombstone recover): %w", err)
			}

			// Now a simple re-crawl should suffice to bring the user back online
			repoCommitsResultCounter.WithLabelValues(host.Host, "catchupt").Inc()
			return bgs.Index.Crawler.AddToCatchupQueue(ctx, host, ai, evt)
		}

		// skip the fast path for rebases or if the user is already in the slow path
		if bgs.Index.Crawler.RepoInSlowPath(ctx, u.ID) {
			rebasesCounter.WithLabelValues(host.Host).Add(1)
			ai, err := bgs.Index.LookupUser(ctx, u.ID)
			if err != nil {
				repoCommitsResultCounter.WithLabelValues(host.Host, "nou3").Inc()
				return fmt.Errorf("failed to look up user (slow path): %w", err)
			}

			// TODO: we currently do not handle events that get queued up
			// behind an already 'in progress' slow path event.
			// this is strictly less efficient than it could be, and while it
			// does 'work' (due to falling back to resyncing the repo), it's
			// technically incorrect. Now that we have the parallel event
			// processor coming off of the pds stream, we should investigate
			// whether or not we even need this 'slow path' logic, as it makes
			// accounting for which events have been processed much harder
			repoCommitsResultCounter.WithLabelValues(host.Host, "catchup").Inc()
			return bgs.Index.Crawler.AddToCatchupQueue(ctx, host, ai, evt)
		}

		if err := bgs.repoman.HandleExternalUserEvent(ctx, host.ID, u.ID, u.Did, evt.Since, evt.Rev, evt.Blocks, evt.Ops); err != nil {

			if errors.Is(err, carstore.ErrRepoBaseMismatch) || ipld.IsNotFound(err) {
				ai, lerr := bgs.Index.LookupUser(ctx, u.ID)
				if lerr != nil {
					bgs.log.Warn("failed handling event, no user", "err", err, "pdsHost", host.Host, "seq", evt.Seq, "repo", u.Did, "commit", evt.Commit.String())
					repoCommitsResultCounter.WithLabelValues(host.Host, "nou4").Inc()
					return fmt.Errorf("failed to look up user %s (%d) (err case: %s): %w", u.Did, u.ID, err, lerr)
				}

				span.SetAttributes(attribute.Bool("catchup_queue", true))

				bgs.log.Info("failed handling event, catchup", "err", err, "pdsHost", host.Host, "seq", evt.Seq, "repo", u.Did, "commit", evt.Commit.String())
				repoCommitsResultCounter.WithLabelValues(host.Host, "catchup2").Inc()
				return bgs.Index.Crawler.AddToCatchupQueue(ctx, host, ai, evt)
			}

			bgs.log.Warn("failed handling event", "err", err, "pdsHost", host.Host, "seq", evt.Seq, "repo", u.Did, "commit", evt.Commit.String())
			repoCommitsResultCounter.WithLabelValues(host.Host, "err").Inc()
			return fmt.Errorf("handle user event failed: %w", err)
		}

		repoCommitsResultCounter.WithLabelValues(host.Host, "ok").Inc()
		return nil
	case env.RepoIdentity != nil:
		bgs.log.Info("bgs got identity event", "did", env.RepoIdentity.Did)
		// Flush any cached DID documents for this user
		bgs.didr.FlushCacheFor(env.RepoIdentity.Did)

		// Refetch the DID doc and update our cached keys and handle etc.
		_, err := bgs.createExternalUser(ctx, env.RepoIdentity.Did)
		if err != nil {
			return err
		}

		// Broadcast the identity event to all consumers
		err = bgs.events.AddEvent(ctx, &events.XRPCStreamEvent{
			RepoIdentity: &comatproto.SyncSubscribeRepos_Identity{
				Did:    env.RepoIdentity.Did,
				Seq:    env.RepoIdentity.Seq,
				Time:   env.RepoIdentity.Time,
				Handle: env.RepoIdentity.Handle,
			},
		})
		if err != nil {
			bgs.log.Error("failed to broadcast Identity event", "error", err, "did", env.RepoIdentity.Did)
			return fmt.Errorf("failed to broadcast Identity event: %w", err)
		}

		return nil
	case env.RepoAccount != nil:
		span.SetAttributes(
			attribute.String("did", env.RepoAccount.Did),
			attribute.Int64("seq", env.RepoAccount.Seq),
			attribute.Bool("active", env.RepoAccount.Active),
		)

		if env.RepoAccount.Status != nil {
			span.SetAttributes(attribute.String("repo_status", *env.RepoAccount.Status))
		}

		bgs.log.Info("bgs got account event", "did", env.RepoAccount.Did)
		// Flush any cached DID documents for this user
		bgs.didr.FlushCacheFor(env.RepoAccount.Did)

		// Refetch the DID doc to make sure the PDS is still authoritative
		ai, err := bgs.createExternalUser(ctx, env.RepoAccount.Did)
		if err != nil {
			span.RecordError(err)
			return err
		}

		// Check if the PDS is still authoritative
		// if not we don't want to be propagating this account event
		if ai.PDS != host.ID {
			bgs.log.Error("account event from non-authoritative pds",
				"seq", env.RepoAccount.Seq,
				"did", env.RepoAccount.Did,
				"event_from", host.Host,
				"did_doc_declared_pds", ai.PDS,
				"account_evt", env.RepoAccount,
			)
			return fmt.Errorf("event from non-authoritative pds")
		}

		// Process the account status change
		repoStatus := events.AccountStatusActive
		if !env.RepoAccount.Active && env.RepoAccount.Status != nil {
			repoStatus = *env.RepoAccount.Status
		}

		err = bgs.UpdateAccountStatus(ctx, env.RepoAccount.Did, repoStatus)
		if err != nil {
			span.RecordError(err)
			return fmt.Errorf("failed to update account status: %w", err)
		}

		shouldBeActive := env.RepoAccount.Active
		status := env.RepoAccount.Status
		u, err := bgs.lookupUserByDid(ctx, env.RepoAccount.Did)
		if err != nil {
			return fmt.Errorf("failed to look up user by did: %w", err)
		}

		if u.GetTakenDown() {
			shouldBeActive = false
			status = &events.AccountStatusTakendown
		}

		// Broadcast the account event to all consumers
		err = bgs.events.AddEvent(ctx, &events.XRPCStreamEvent{
			RepoAccount: &comatproto.SyncSubscribeRepos_Account{
				Did:    env.RepoAccount.Did,
				Seq:    env.RepoAccount.Seq,
				Time:   env.RepoAccount.Time,
				Active: shouldBeActive,
				Status: status,
			},
		})
		if err != nil {
			bgs.log.Error("failed to broadcast Account event", "error", err, "did", env.RepoAccount.Did)
			return fmt.Errorf("failed to broadcast Account event: %w", err)
		}

		return nil
	default:
		return fmt.Errorf("invalid fed event")
	}
}

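// createExternalUser resolves the DID document for an account, upserts the
// PDS record for its host, validates the claimed handle, and creates or
// updates the local User and ActorInfo rows. Despite the name, it is also
// the refresh path for users we already know about.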
// TODO: rename? This also updates users, and 'external' is an old phrasing
func (s *BGS) createExternalUser(ctx context.Context, did string) (*models.ActorInfo, error) {
	ctx, span := tracer.Start(ctx, "createExternalUser")
	defer span.End()

	externalUserCreationAttempts.Inc()

	s.log.Debug("create external user", "did", did)
	doc, err := s.didr.GetDocument(ctx, did)
	if err != nil {
		return nil, fmt.Errorf("could not locate DID document for followed user (%s): %w", did, err)
	}

	if len(doc.Service) == 0 {
		return nil, fmt.Errorf("external followed user %s had no services in did document", did)
	}

	svc := doc.Service[0]
	durl, err := url.Parse(svc.ServiceEndpoint)
	if err != nil {
		return nil, err
	}

	if strings.HasPrefix(durl.Host, "localhost:") {
		durl.Scheme = "http"
	}

	// TODO: the PDS's DID should also be in the service, we could use that to look up?
	var peering models.PDS
	if err := s.db.Find(&peering, "host = ?", durl.Host).Error; err != nil {
		s.log.Error("failed to find pds", "host", durl.Host)
		return nil, err
	}

	ban, err := s.domainIsBanned(ctx, durl.Host)
	if err != nil {
		return nil, fmt.Errorf("failed to check pds ban status: %w", err)
	}

	if ban {
		return nil, fmt.Errorf("cannot create user on pds with banned domain")
	}

	c := &xrpc.Client{Host: durl.String()}
	s.Index.ApplyPDSClientSettings(c)

	if peering.ID == 0 {
		// TODO: the case of handling a new user on a new PDS probably requires more thought
		cfg, err := atproto.ServerDescribeServer(ctx, c)
		if err != nil {
			// TODO: failing this shouldn't halt our indexing
			return nil, fmt.Errorf("failed to check unrecognized pds: %w", err)
		}

		// since handles can be anything, checking against this list doesn't matter...
		_ = cfg

		// TODO: could check other things, a valid response is good enough for now
		peering.Host = durl.Host
		peering.SSL = (durl.Scheme == "https")
		peering.CrawlRateLimit = float64(s.slurper.DefaultCrawlLimit)
		peering.RateLimit = float64(s.slurper.DefaultPerSecondLimit)
		peering.HourlyEventLimit = s.slurper.DefaultPerHourLimit
		peering.DailyEventLimit = s.slurper.DefaultPerDayLimit
		peering.RepoLimit = s.slurper.DefaultRepoLimit

		if s.ssl && !peering.SSL {
			return nil, fmt.Errorf("did references non-ssl PDS, this is disallowed in prod: %q %q", did, svc.ServiceEndpoint)
		}

		if err := s.db.Create(&peering).Error; err != nil {
			return nil, err
		}
	}

	if peering.ID == 0 {
		panic("somehow failed to create a pds entry?")
	}

	if peering.Blocked {
		return nil, fmt.Errorf("refusing to create user with blocked PDS")
	}

	if peering.RepoCount >= peering.RepoLimit {
		return nil, fmt.Errorf("refusing to create user on PDS at max repo limit for pds %q", peering.Host)
	}

	// Increment the repo count for the PDS
	res := s.db.Model(&models.PDS{}).Where("id = ? AND repo_count < repo_limit", peering.ID).Update("repo_count", gorm.Expr("repo_count + 1"))
	if res.Error != nil {
		return nil, fmt.Errorf("failed to increment repo count for pds %q: %w", peering.Host, res.Error)
	}

	if res.RowsAffected == 0 {
		return nil, fmt.Errorf("refusing to create user on PDS at max repo limit for pds %q", peering.Host)
	}

	successfullyCreated := false

	// Release the count if we fail to create the user
	defer func() {
		if !successfullyCreated {
			if err := s.db.Model(&models.PDS{}).Where("id = ?", peering.ID).Update("repo_count", gorm.Expr("repo_count - 1")).Error; err != nil {
				s.log.Error("failed to decrement repo count for pds", "err", err)
			}
		}
	}()

	if len(doc.AlsoKnownAs) == 0 {
		return nil, fmt.Errorf("user has no 'known as' field in their DID document")
	}

	hurl, err := url.Parse(doc.AlsoKnownAs[0])
	if err != nil {
		return nil, err
	}

	s.log.Debug("creating external user", "did", did, "handle", hurl.Host, "pds", peering.ID)

	handle := hurl.Host

	validHandle := true

	resdid, err := s.hr.ResolveHandleToDid(ctx, handle)
	if err != nil {
		s.log.Error("failed to resolve users claimed handle on pds", "handle", handle, "err", err)
		validHandle = false
	} else if resdid != did {
		s.log.Error("claimed handle did not match servers response", "resdid", resdid, "did", did)
		validHandle = false
	}

	s.extUserLk.Lock()
	defer s.extUserLk.Unlock()

	exu, err := s.Index.LookupUserByDid(ctx, did)
	if err == nil {
		s.log.Debug("lost the race to create a new user", "did", did, "handle", handle, "existing_hand", exu.Handle)
		if exu.PDS != peering.ID {
			// User is now on a different PDS, update
			if err := s.db.Model(User{}).Where("id = ?", exu.Uid).Update("pds", peering.ID).Error; err != nil {
				return nil, fmt.Errorf("failed to update users pds: %w", err)
			}

			if err := s.db.Model(models.ActorInfo{}).Where("uid = ?", exu.Uid).Update("pds", peering.ID).Error; err != nil {
				return nil, fmt.Errorf("failed to update users pds on actorInfo: %w", err)
			}

			exu.PDS = peering.ID
		}

		if exu.Handle.String != handle {
			// Users handle has changed, update
			if err := s.db.Model(User{}).Where("id = ?", exu.Uid).Update("handle", handle).Error; err != nil {
				return nil, fmt.Errorf("failed to update users handle: %w", err)
			}

			// Update ActorInfos
			if err := s.db.Model(models.ActorInfo{}).Where("uid = ?", exu.Uid).Update("handle", handle).Error; err != nil {
				return nil, fmt.Errorf("failed to update actorInfos handle: %w", err)
			}

			exu.Handle = sql.NullString{String: handle, Valid: true}
		}
		return exu, nil
	}

	if !errors.Is(err, gorm.ErrRecordNotFound) {
		return nil, err
	}

	// TODO: request this users info from their server to fill out our data...
	u := User{
		Did:         did,
		PDS:         peering.ID,
		ValidHandle: validHandle,
	}
	if validHandle {
		u.Handle = sql.NullString{String: handle, Valid: true}
	}

	if err := s.db.Create(&u).Error; err != nil {
		// If the new user's handle conflicts with an existing user,
		// since we just validated the handle for this user, we'll assume
		// the existing user no longer has control of the handle
		if errors.Is(err, gorm.ErrDuplicatedKey) {
			// Get the UID of the existing user
			var existingUser User
			if err := s.db.Find(&existingUser, "handle = ?", handle).Error; err != nil {
				return nil, fmt.Errorf("failed to find existing user: %w", err)
			}

			// Set the existing user's handle to NULL and set the valid_handle flag to false
			if err := s.db.Model(User{}).Where("id = ?", existingUser.ID).Update("handle", nil).Update("valid_handle", false).Error; err != nil {
				return nil, fmt.Errorf("failed to update outdated user's handle: %w", err)
			}

			// Do the same thing for the ActorInfo if it exists
			if err := s.db.Model(models.ActorInfo{}).Where("uid = ?", existingUser.ID).Update("handle", nil).Update("valid_handle", false).Error; err != nil {
				if !errors.Is(err, gorm.ErrRecordNotFound) {
					return nil, fmt.Errorf("failed to update outdated actorInfo's handle: %w", err)
				}
			}

			// Create the new user
			if err := s.db.Create(&u).Error; err != nil {
				return nil, fmt.Errorf("failed to create user after handle conflict: %w", err)
			}

			s.userCache.Remove(did)
		} else {
			return nil, fmt.Errorf("failed to create other pds user: %w", err)
		}
	}

	// okay cool, it's a user on a server we are peered with
	// lets make a local record of that user for the future
	subj := &models.ActorInfo{
		Uid:         u.ID,
		DisplayName: "", //*profile.DisplayName,
		Did:         did,
		Type:        "",
		PDS:         peering.ID,
		ValidHandle: validHandle,
	}
	if validHandle {
		subj.Handle = sql.NullString{String: handle, Valid: true}
	}
	if err := s.db.Create(subj).Error; err != nil {
		return nil, err
	}

	successfullyCreated = true

	return subj, nil
}

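// UpdateAccountStatus applies an account status reported by the upstream PDS
// (active, deactivated, suspended, takendown, deleted) to the local User row,
// clearing handles and tombstoning or purging repo data where required.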
func (bgs *BGS) UpdateAccountStatus(ctx context.Context, did string, status string) error {
	ctx, span := tracer.Start(ctx, "UpdateAccountStatus")
	defer span.End()

	span.SetAttributes(
		attribute.String("did", did),
		attribute.String("status", status),
	)

	u, err := bgs.lookupUserByDid(ctx, did)
	if err != nil {
		return err
	}

	switch status {
	case events.AccountStatusActive:
		// Unset the PDS-specific status flags
		if err := bgs.db.Model(User{}).Where("id = ?", u.ID).Update("upstream_status", events.AccountStatusActive).Error; err != nil {
			return fmt.Errorf("failed to set user active status: %w", err)
		}
		u.SetUpstreamStatus(events.AccountStatusActive)
	case events.AccountStatusDeactivated:
		if err := bgs.db.Model(User{}).Where("id = ?", u.ID).Update("upstream_status", events.AccountStatusDeactivated).Error; err != nil {
			return fmt.Errorf("failed to set user deactivation status: %w", err)
		}
		u.SetUpstreamStatus(events.AccountStatusDeactivated)
	case events.AccountStatusSuspended:
		if err := bgs.db.Model(User{}).Where("id = ?", u.ID).Update("upstream_status", events.AccountStatusSuspended).Error; err != nil {
			return fmt.Errorf("failed to set user suspension status: %w", err)
		}
		u.SetUpstreamStatus(events.AccountStatusSuspended)
	case events.AccountStatusTakendown:
		if err := bgs.db.Model(User{}).Where("id = ?", u.ID).Update("upstream_status", events.AccountStatusTakendown).Error; err != nil {
			return fmt.Errorf("failed to set user taken down status: %w", err)
		}
		u.SetUpstreamStatus(events.AccountStatusTakendown)

		if err := bgs.db.Model(&models.ActorInfo{}).Where("uid = ?", u.ID).UpdateColumns(map[string]any{
			"handle": nil,
		}).Error; err != nil {
			return err
		}
	case events.AccountStatusDeleted:
		if err := bgs.db.Model(&User{}).Where("id = ?", u.ID).UpdateColumns(map[string]any{
			"tombstoned":      true,
			"handle":          nil,
			"upstream_status": events.AccountStatusDeleted,
		}).Error; err != nil {
			return err
		}
		u.SetUpstreamStatus(events.AccountStatusDeleted)

		if err := bgs.db.Model(&models.ActorInfo{}).Where("uid = ?", u.ID).UpdateColumns(map[string]any{
			"handle": nil,
		}).Error; err != nil {
			return err
		}

		// delete data from carstore
		if err := bgs.repoman.TakeDownRepo(ctx, u.ID); err != nil {
			// don't let a failure here prevent us from propagating this event
			bgs.log.Error("failed to delete user data from carstore", "err", err)
		}
	}

	return nil
}

func (bgs *BGS) TakeDownRepo(ctx context.Context, did string) error {
	u, err := bgs.lookupUserByDid(ctx, did)
	if err != nil {
		return err
	}

	if err := bgs.db.Model(User{}).Where("id = ?", u.ID).Update("taken_down", true).Error; err != nil {
		return err
	}
	u.SetTakenDown(true)

	if err := bgs.repoman.TakeDownRepo(ctx, u.ID); err != nil {
		return err
	}

	if err := bgs.events.TakeDownRepo(ctx, u.ID); err != nil {
		return err
	}

	return nil
}

func (bgs *BGS) ReverseTakedown(ctx context.Context, did string) error {
	u, err := bgs.lookupUserByDid(ctx, did)
	if err != nil {
		return err
	}

	if err := bgs.db.Model(User{}).Where("id = ?", u.ID).Update("taken_down", false).Error; err != nil {
		return err
	}
	u.SetTakenDown(false)

	return nil
}

type revCheckResult struct {
	ai  *models.ActorInfo
	err error
}

func (bgs *BGS) LoadOrStoreResync(pds models.PDS) (PDSResync, bool) {
	bgs.pdsResyncsLk.Lock()
	defer bgs.pdsResyncsLk.Unlock()

	if r, ok := bgs.pdsResyncs[pds.ID]; ok && r != nil {
		return *r, true
	}

	r := PDSResync{
		PDS:             pds,
		Status:          "started",
		StatusChangedAt: time.Now(),
	}

	bgs.pdsResyncs[pds.ID] = &r

	return r, false
}

func (bgs *BGS) GetResync(pds models.PDS) (PDSResync, bool) {
	bgs.pdsResyncsLk.RLock()
	defer bgs.pdsResyncsLk.RUnlock()

	if r, ok := bgs.pdsResyncs[pds.ID]; ok {
		return *r, true
	}

	return PDSResync{}, false
}

func (bgs *BGS) UpdateResync(resync PDSResync) {
	bgs.pdsResyncsLk.Lock()
	defer bgs.pdsResyncsLk.Unlock()

	bgs.pdsResyncs[resync.PDS.ID] = &resync
}

func (bgs *BGS) SetResyncStatus(id uint, status string) PDSResync {
	bgs.pdsResyncsLk.Lock()
	defer bgs.pdsResyncsLk.Unlock()

	if r, ok := bgs.pdsResyncs[id]; ok {
		r.Status = status
		r.StatusChangedAt = time.Now()
	}

	return *bgs.pdsResyncs[id]
}

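// CompleteResync drops the in-memory resync state for a PDS once ResyncPDS
// finishes (or bails out), allowing a new resync to be started later.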
func (bgs *BGS) CompleteResync(resync PDSResync) {
	bgs.pdsResyncsLk.Lock()
	defer bgs.pdsResyncsLk.Unlock()

	delete(bgs.pdsResyncs, resync.PDS.ID)
}

func (bgs *BGS) ResyncPDS(ctx context.Context, pds models.PDS) error {
	ctx, span := tracer.Start(ctx, "ResyncPDS")
	defer span.End()
	log := bgs.log.With("pds", pds.Host, "source", "resync_pds")
	resync, found := bgs.LoadOrStoreResync(pds)
	if found {
		return fmt.Errorf("resync already in progress")
	}
	defer bgs.CompleteResync(resync)

	start := time.Now()

	log.Warn("starting PDS resync")

	host := "http://"
	if pds.SSL {
		host = "https://"
	}
	host += pds.Host

	xrpcc := xrpc.Client{Host: host}
	bgs.Index.ApplyPDSClientSettings(&xrpcc)

	limiter := rate.NewLimiter(rate.Limit(50), 1)
	cursor := ""
	limit := int64(500)

	repos := []comatproto.SyncListRepos_Repo{}

	pages := 0

	resync = bgs.SetResyncStatus(pds.ID, "listing repos")
	for {
		pages++
		if pages%10 == 0 {
			log.Warn("fetching PDS page during resync", "pages", pages, "total_repos", len(repos))
			resync.NumRepoPages = pages
			resync.NumRepos = len(repos)
			bgs.UpdateResync(resync)
		}
		if err := limiter.Wait(ctx); err != nil {
			log.Error("failed to wait for rate limiter", "error", err)
			return fmt.Errorf("failed to wait for rate limiter: %w", err)
		}
		repoList, err := comatproto.SyncListRepos(ctx, &xrpcc, cursor, limit)
		if err != nil {
			log.Error("failed to list repos", "error", err)
			return fmt.Errorf("failed to list repos: %w", err)
		}

		for _, r := range repoList.Repos {
			if r != nil {
				repos = append(repos, *r)
			}
		}

		if repoList.Cursor == nil || *repoList.Cursor == "" {
			break
		}
		cursor = *repoList.Cursor
	}

	resync.NumRepoPages = pages
	resync.NumRepos = len(repos)
	bgs.UpdateResync(resync)

	repolistDone := time.Now()

	log.Warn("listed all repos, checking roots", "num_repos", len(repos), "took", repolistDone.Sub(start))
	resync = bgs.SetResyncStatus(pds.ID, "checking revs")

	// run loop over repos with some concurrency
	sem := semaphore.NewWeighted(40)

	// Check repo revs against our local copy and enqueue crawls for any that are out of date
	for i, r := range repos {
		if err := sem.Acquire(ctx, 1); err != nil {
			log.Error("failed to acquire semaphore", "error", err)
			continue
		}
		go func(r comatproto.SyncListRepos_Repo) {
			defer sem.Release(1)
			log := bgs.log.With("did", r.Did, "remote_rev", r.Rev)
			// Fetches the user if we have it, otherwise automatically enqueues it for crawling
			ai, err := bgs.Index.GetUserOrMissing(ctx, r.Did)
			if err != nil {
				log.Error("failed to get user while resyncing PDS, we can't recrawl it", "error", err)
				return
			}

			rev, err := bgs.repoman.GetRepoRev(ctx, ai.Uid)
			if err != nil {
				log.Warn("recrawling because we failed to get the local repo root", "err", err, "uid", ai.Uid)
				err := bgs.Index.Crawler.Crawl(ctx, ai)
				if err != nil {
					log.Error("failed to enqueue crawl for repo during resync", "error", err, "uid", ai.Uid, "did", ai.Did)
				}
				return
			}

			if rev == "" || rev < r.Rev {
				log.Warn("recrawling because the repo rev from the PDS is newer than our local repo rev", "local_rev", rev)
				err := bgs.Index.Crawler.Crawl(ctx, ai)
				if err != nil {
					log.Error("failed to enqueue crawl for repo during resync", "error", err, "uid", ai.Uid, "did", ai.Did)
				}
				return
			}
		}(r)
		if i%100 == 0 {
			if i%10_000 == 0 {
				log.Warn("checked revs during resync", "num_repos_checked", i, "num_repos_to_crawl", -1, "took", time.Since(resync.StatusChangedAt))
			}
			resync.NumReposChecked = i
			bgs.UpdateResync(resync)
		}
	}

	resync.NumReposChecked = len(repos)
	bgs.UpdateResync(resync)

	bgs.log.Warn("enqueued all crawls, exiting resync", "took", time.Since(start), "num_repos_to_crawl", -1)

	return nil
}
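
// Example wiring (a minimal sketch, not the canonical entrypoint; bigsky's
// main constructs the db, indexer, repo manager, event manager, DID resolver,
// repo fetcher, and handle resolver before calling NewBGS, and the listen
// addresses here are placeholders):
//
//	bgs, err := NewBGS(db, ix, repoman, evtman, didr, rf, hr, DefaultBGSConfig())
//	if err != nil {
//		panic(err)
//	}
//	go bgs.StartMetrics(":2471") // Prometheus /metrics
//	panic(bgs.Start(":2470"))    // public XRPC + admin API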