···6565 base := BaseDirectory{
6666 PLCURL: DefaultPLCURL,
6767 HTTPClient: http.Client{
6868- Timeout: time.Second * 15,
6868+ Timeout: time.Second * 10,
6969+ Transport: &http.Transport{
7070+ // would want this around 100ms for services doing lots of handle resolution. Impacts PLC connections as well, but not too bad.
7171+ IdleConnTimeout: time.Millisecond * 1000,
7272+ MaxIdleConns: 100,
7373+ },
6974 },
7075 Resolver: net.Resolver{
7176 Dial: func(ctx context.Context, network, address string) (net.Conn, error) {
7272- d := net.Dialer{Timeout: time.Second * 5}
7777+ d := net.Dialer{Timeout: time.Second * 3}
7378 return d.DialContext(ctx, network, address)
7479 },
7580 },
+74-7
bgs/bgs.go
···2727 "github.com/bluesky-social/indigo/models"
2828 "github.com/bluesky-social/indigo/repomgr"
2929 "github.com/bluesky-social/indigo/xrpc"
3030+ lru "github.com/hashicorp/golang-lru/v2"
3031 "golang.org/x/sync/semaphore"
3132 "golang.org/x/time/rate"
3233···87888889 // Management of Compaction
8990 compactor *Compactor
9191+9292+ // User cache
9393+ userCache *lru.Cache[string, *User]
9094}
91959296type PDSResync struct {
···136140 db.AutoMigrate(models.PDS{})
137141 db.AutoMigrate(models.DomainBan{})
138142143143+ uc, _ := lru.New[string, *User](1_000_000)
144144+139145 bgs := &BGS{
140146 Index: ix,
141147 db: db,
···151157 consumers: make(map[uint64]*SocketConsumer),
152158153159 pdsResyncs: make(map[uint]*PDSResync),
160160+161161+ userCache: uc,
154162 }
155163156164 ix.CreateExternalUser = bgs.createExternalUser
···521529522530 // UpstreamStatus is the state of the user as reported by the upstream PDS
523531 UpstreamStatus string `gorm:"index"`
532532+533533+ lk sync.Mutex
534534+}
535535+536536+func (u *User) SetTakenDown(v bool) {
537537+ u.lk.Lock()
538538+ defer u.lk.Unlock()
539539+ u.TakenDown = v
540540+}
541541+542542+func (u *User) GetTakenDown() bool {
543543+ u.lk.Lock()
544544+ defer u.lk.Unlock()
545545+ return u.TakenDown
546546+}
547547+548548+func (u *User) SetTombstoned(v bool) {
549549+ u.lk.Lock()
550550+ defer u.lk.Unlock()
551551+ u.Tombstoned = v
552552+}
553553+554554+func (u *User) GetTombstoned() bool {
555555+ u.lk.Lock()
556556+ defer u.lk.Unlock()
557557+ return u.Tombstoned
558558+}
559559+560560+func (u *User) SetUpstreamStatus(v string) {
561561+ u.lk.Lock()
562562+ defer u.lk.Unlock()
563563+ u.UpstreamStatus = v
564564+}
565565+566566+func (u *User) GetUpstreamStatus() string {
567567+ u.lk.Lock()
568568+ defer u.lk.Unlock()
569569+ return u.UpstreamStatus
524570}
525571526572type addTargetBody struct {
···771817 ctx, span := tracer.Start(ctx, "lookupUserByDid")
772818 defer span.End()
773819820820+ cu, ok := bgs.userCache.Get(did)
821821+ if ok {
822822+ return cu, nil
823823+ }
824824+774825 var u User
775826 if err := bgs.db.Find(&u, "did = ?", did).Error; err != nil {
776827 return nil, err
···779830 if u.ID == 0 {
780831 return nil, gorm.ErrRecordNotFound
781832 }
833833+834834+ bgs.userCache.Add(did, &u)
782835783836 return &u, nil
784837}
···823876 repoCommitsReceivedCounter.WithLabelValues(host.Host).Add(1)
824877 evt := env.RepoCommit
825878 log.Debugw("bgs got repo append event", "seq", evt.Seq, "pdsHost", host.Host, "repo", evt.Repo)
879879+880880+ s := time.Now()
826881 u, err := bgs.lookupUserByDid(ctx, evt.Repo)
882882+ userLookupDuration.Observe(time.Since(s).Seconds())
827883 if err != nil {
828884 if !errors.Is(err, gorm.ErrRecordNotFound) {
829885 return fmt.Errorf("looking up event user: %w", err)
830886 }
831887832888 newUsersDiscovered.Inc()
889889+ start := time.Now()
833890 subj, err := bgs.createExternalUser(ctx, evt.Repo)
891891+ newUserDiscoveryDuration.Observe(time.Since(start).Seconds())
834892 if err != nil {
835893 return fmt.Errorf("fed event create external user: %w", err)
836894 }
···840898 u.Did = evt.Repo
841899 }
842900843843- span.SetAttributes(attribute.String("upstream_status", u.UpstreamStatus))
901901+ ustatus := u.GetUpstreamStatus()
902902+ span.SetAttributes(attribute.String("upstream_status", ustatus))
844903845845- if u.TakenDown || u.UpstreamStatus == events.AccountStatusTakendown {
846846- span.SetAttributes(attribute.Bool("taken_down_by_relay_admin", u.TakenDown))
904904+ if u.GetTakenDown() || ustatus == events.AccountStatusTakendown {
905905+ span.SetAttributes(attribute.Bool("taken_down_by_relay_admin", u.GetTakenDown()))
847906 log.Debugw("dropping commit event from taken down user", "did", evt.Repo, "seq", evt.Seq, "pdsHost", host.Host)
848907 return nil
849908 }
850909851851- if u.UpstreamStatus == events.AccountStatusSuspended {
910910+ if ustatus == events.AccountStatusSuspended {
852911 log.Debugw("dropping commit event from suspended user", "did", evt.Repo, "seq", evt.Seq, "pdsHost", host.Host)
853912 return nil
854913 }
855914856856- if u.UpstreamStatus == events.AccountStatusDeactivated {
915915+ if ustatus == events.AccountStatusDeactivated {
857916 log.Debugw("dropping commit event from deactivated user", "did", evt.Repo, "seq", evt.Seq, "pdsHost", host.Host)
858917 return nil
859918 }
···877936 }
878937 }
879938880880- if u.Tombstoned {
939939+ if u.GetTombstoned() {
881940 span.SetAttributes(attribute.Bool("tombstoned", true))
882941 // we've checked the authority of the users PDS, so reinstate the account
883942 if err := bgs.db.Model(&User{}).Where("id = ?", u.ID).UpdateColumn("tombstoned", false).Error; err != nil {
884943 return fmt.Errorf("failed to un-tombstone a user: %w", err)
885944 }
945945+ u.SetTombstoned(false)
886946887947 ai, err := bgs.Index.LookupUser(ctx, u.ID)
888948 if err != nil {
···10411101 return fmt.Errorf("failed to look up user by did: %w", err)
10421102 }
1043110310441044- if u.TakenDown {
11041104+ if u.GetTakenDown() {
10451105 shouldBeActive = false
10461106 status = &events.AccountStatusTakendown
10471107 }
···13701430 if err := bgs.db.Model(User{}).Where("id = ?", u.ID).Update("upstream_status", events.AccountStatusActive).Error; err != nil {
13711431 return fmt.Errorf("failed to set user active status: %w", err)
13721432 }
14331433+ u.SetUpstreamStatus(events.AccountStatusActive)
13731434 case events.AccountStatusDeactivated:
13741435 if err := bgs.db.Model(User{}).Where("id = ?", u.ID).Update("upstream_status", events.AccountStatusDeactivated).Error; err != nil {
13751436 return fmt.Errorf("failed to set user deactivation status: %w", err)
13761437 }
14381438+ u.SetUpstreamStatus(events.AccountStatusDeactivated)
13771439 case events.AccountStatusSuspended:
13781440 if err := bgs.db.Model(User{}).Where("id = ?", u.ID).Update("upstream_status", events.AccountStatusSuspended).Error; err != nil {
13791441 return fmt.Errorf("failed to set user suspension status: %w", err)
13801442 }
14431443+ u.SetUpstreamStatus(events.AccountStatusSuspended)
13811444 case events.AccountStatusTakendown:
13821445 if err := bgs.db.Model(User{}).Where("id = ?", u.ID).Update("upstream_status", events.AccountStatusTakendown).Error; err != nil {
13831446 return fmt.Errorf("failed to set user taken down status: %w", err)
13841447 }
14481448+ u.SetUpstreamStatus(events.AccountStatusTakendown)
1385144913861450 if err := bgs.db.Model(&models.ActorInfo{}).Where("uid = ?", u.ID).UpdateColumns(map[string]any{
13871451 "handle": nil,
···13961460 }).Error; err != nil {
13971461 return err
13981462 }
14631463+ u.SetUpstreamStatus(events.AccountStatusDeleted)
1399146414001465 if err := bgs.db.Model(&models.ActorInfo{}).Where("uid = ?", u.ID).UpdateColumns(map[string]any{
14011466 "handle": nil,
···14221487 if err := bgs.db.Model(User{}).Where("id = ?", u.ID).Update("taken_down", true).Error; err != nil {
14231488 return err
14241489 }
14901490+ u.SetTakenDown(true)
1425149114261492 if err := bgs.repoman.TakeDownRepo(ctx, u.ID); err != nil {
14271493 return err
···14431509 if err := bgs.db.Model(User{}).Where("id = ?", u.ID).Update("taken_down", false).Error; err != nil {
14441510 return err
14451511 }
15121512+ u.SetTakenDown(false)
1446151314471514 return nil
14481515}
+1-1
bgs/compactor.go
···349349 return state, nil
350350}
351351352352-func (c *Compactor) EnqueueRepo(ctx context.Context, user User, fast bool) {
352352+func (c *Compactor) EnqueueRepo(ctx context.Context, user *User, fast bool) {
353353 ctx, span := otel.Tracer("compactor").Start(ctx, "EnqueueRepo")
354354 defer span.End()
355355 log.Infow("enqueueing compaction for repo", "repo", user.Did, "uid", user.ID, "fast", fast)
+18-15
bgs/handlers.go
···3434 return nil, echo.NewHTTPError(http.StatusInternalServerError, "failed to lookup user")
3535 }
36363737- if u.Tombstoned {
3737+ if u.GetTombstoned() {
3838 return nil, fmt.Errorf("account was deleted")
3939 }
40404141- if u.TakenDown {
4141+ if u.GetTakenDown() {
4242 return nil, fmt.Errorf("account was taken down by the Relay")
4343 }
44444545- if u.UpstreamStatus == events.AccountStatusTakendown {
4545+ ustatus := u.GetUpstreamStatus()
4646+ if ustatus == events.AccountStatusTakendown {
4647 return nil, fmt.Errorf("account was taken down by its PDS")
4748 }
48494949- if u.UpstreamStatus == events.AccountStatusDeactivated {
5050+ if ustatus == events.AccountStatusDeactivated {
5051 return nil, fmt.Errorf("account is temporarily deactivated")
5152 }
52535353- if u.UpstreamStatus == events.AccountStatusSuspended {
5454+ if ustatus == events.AccountStatusSuspended {
5455 return nil, fmt.Errorf("account is suspended by its PDS")
5556 }
5657···9192 return nil, echo.NewHTTPError(http.StatusInternalServerError, "failed to lookup user")
9293 }
93949494- if u.Tombstoned {
9595+ if u.GetTombstoned() {
9596 return nil, fmt.Errorf("account was deleted")
9697 }
97989898- if u.TakenDown {
9999+ if u.GetTakenDown() {
99100 return nil, fmt.Errorf("account was taken down by the Relay")
100101 }
101102102102- if u.UpstreamStatus == events.AccountStatusTakendown {
103103+ ustatus := u.GetUpstreamStatus()
104104+ if ustatus == events.AccountStatusTakendown {
103105 return nil, fmt.Errorf("account was taken down by its PDS")
104106 }
105107106106- if u.UpstreamStatus == events.AccountStatusDeactivated {
108108+ if ustatus == events.AccountStatusDeactivated {
107109 return nil, fmt.Errorf("account is temporarily deactivated")
108110 }
109111110110- if u.UpstreamStatus == events.AccountStatusSuspended {
112112+ if ustatus == events.AccountStatusSuspended {
111113 return nil, fmt.Errorf("account is suspended by its PDS")
112114 }
113115···253255 return nil, echo.NewHTTPError(http.StatusInternalServerError, "failed to lookup user")
254256 }
255257256256- if u.Tombstoned {
258258+ if u.GetTombstoned() {
257259 return nil, fmt.Errorf("account was deleted")
258260 }
259261260260- if u.TakenDown {
262262+ if u.GetTakenDown() {
261263 return nil, fmt.Errorf("account was taken down by the Relay")
262264 }
263265264264- if u.UpstreamStatus == events.AccountStatusTakendown {
266266+ ustatus := u.GetUpstreamStatus()
267267+ if ustatus == events.AccountStatusTakendown {
265268 return nil, fmt.Errorf("account was taken down by its PDS")
266269 }
267270268268- if u.UpstreamStatus == events.AccountStatusDeactivated {
271271+ if ustatus == events.AccountStatusDeactivated {
269272 return nil, fmt.Errorf("account is temporarily deactivated")
270273 }
271274272272- if u.UpstreamStatus == events.AccountStatusSuspended {
275275+ if ustatus == events.AccountStatusSuspended {
273276 return nil, fmt.Errorf("account is suspended by its PDS")
274277 }
275278
+12
bgs/metrics.go
···8181 Buckets: prometheus.ExponentialBuckets(100, 10, 8),
8282}, []string{"code", "method", "path"})
83838484+var userLookupDuration = promauto.NewHistogram(prometheus.HistogramOpts{
8585+ Name: "relay_user_lookup_duration",
8686+ Help: "A histogram of user lookup latencies",
8787+ Buckets: prometheus.ExponentialBuckets(0.001, 2, 15),
8888+})
8989+9090+var newUserDiscoveryDuration = promauto.NewHistogram(prometheus.HistogramOpts{
9191+ Name: "relay_new_user_discovery_duration",
9292+ Help: "A histogram of new user discovery latencies",
9393+ Buckets: prometheus.ExponentialBuckets(0.001, 2, 15),
9494+})
9595+8496// MetricsMiddleware defines handler function for metrics middleware
8597func MetricsMiddleware(next echo.HandlerFunc) echo.HandlerFunc {
8698 return func(c echo.Context) error {
+24-13
carstore/bs.go
···6262}
63636464type FileCarStore struct {
6565- meta *CarStoreGormMeta
6666- rootDir string
6565+ meta *CarStoreGormMeta
6666+ rootDirs []string
67676868 lscLk sync.Mutex
6969 lastShardCache map[models.Uid]*CarShard
7070}
71717272-func NewCarStore(meta *gorm.DB, root string) (CarStore, error) {
7373- if _, err := os.Stat(root); err != nil {
7474- if !os.IsNotExist(err) {
7575- return nil, err
7676- }
7272+func NewCarStore(meta *gorm.DB, roots []string) (CarStore, error) {
7373+ for _, root := range roots {
7474+ if _, err := os.Stat(root); err != nil {
7575+ if !os.IsNotExist(err) {
7676+ return nil, err
7777+ }
77787878- if err := os.Mkdir(root, 0775); err != nil {
7979- return nil, err
7979+ if err := os.Mkdir(root, 0775); err != nil {
8080+ return nil, err
8181+ }
8082 }
8183 }
8284 if err := meta.AutoMigrate(&CarShard{}, &blockRef{}); err != nil {
···88908991 return &FileCarStore{
9092 meta: &CarStoreGormMeta{meta: meta},
9191- rootDir: root,
9393+ rootDirs: roots,
9294 lastShardCache: make(map[models.Uid]*CarShard),
9395 }, nil
9496}
···541543func fnameForShard(user models.Uid, seq int) string {
542544 return fmt.Sprintf("sh-%d-%d", user, seq)
543545}
546546+547547+func (cs *FileCarStore) dirForUser(user models.Uid) string {
548548+ return cs.rootDirs[int(user)%len(cs.rootDirs)]
549549+}
550550+544551func (cs *FileCarStore) openNewShardFile(ctx context.Context, user models.Uid, seq int) (*os.File, string, error) {
545552 // TODO: some overwrite protections
546546- fname := filepath.Join(cs.rootDir, fnameForShard(user, seq))
553553+ fname := filepath.Join(cs.dirForUser(user), fnameForShard(user, seq))
547554 fi, err := os.Create(fname)
548555 if err != nil {
549556 return nil, "", err
···557564 defer span.End()
558565559566 // TODO: some overwrite protections
560560- fname := filepath.Join(cs.rootDir, fnameForShard(user, seq))
567567+ fname := filepath.Join(cs.dirForUser(user), fnameForShard(user, seq))
561568 if err := os.WriteFile(fname, data, 0664); err != nil {
562569 return "", err
563570 }
···638645 offset += nw
639646 }
640647648648+ start := time.Now()
641649 path, err := cs.writeNewShardFile(ctx, user, seq, buf.Bytes())
642650 if err != nil {
643651 return nil, fmt.Errorf("failed to write shard file: %w", err)
644652 }
653653+ writeShardFileDuration.Observe(time.Since(start).Seconds())
645654646655 shard := CarShard{
647656 Root: models.DbCID{CID: root},
···652661 Rev: rev,
653662 }
654663664664+ start = time.Now()
655665 if err := cs.putShard(ctx, &shard, brefs, rmcids, false); err != nil {
656666 return nil, err
657667 }
668668+ writeShardMetadataDuration.Observe(time.Since(start).Seconds())
658669659670 return buf.Bytes(), nil
660671}
···982993 // TODO: some overwrite protections
983994 // NOTE CreateTemp is used for creating a non-colliding file, but we keep it and don't delete it so don't think of it as "temporary".
984995 // This creates "sh-%d-%d%s" with some random stuff in the last position
985985- fi, err := os.CreateTemp(cs.rootDir, fnameForShard(user, seq))
996996+ fi, err := os.CreateTemp(cs.dirForUser(user), fnameForShard(user, seq))
986997 if err != nil {
987998 return nil, "", err
988999 }