+10
-2
atkafka/api.go
+10
-2
atkafka/api.go
···
8
8
"github.com/bluesky-social/indigo/api/bsky"
9
9
"github.com/bluesky-social/indigo/xrpc"
10
10
lru "github.com/hashicorp/golang-lru/v2/expirable"
11
+
"golang.org/x/time/rate"
11
12
)
12
13
13
14
type ApiClient struct {
14
15
xrpcClient *xrpc.Client
15
16
16
-
profileCache *lru.LRU[string, *bsky.ActorDefs_ProfileViewDetailed]
17
+
profileCache *lru.LRU[string, *bsky.ActorDefs_ProfileViewDetailed]
18
+
profileLimiter *rate.Limiter
17
19
}
18
20
19
21
type ApiClientArgs struct {
···
30
32
profileCache: lru.NewLRU(100_000, func(key string, value *bsky.ActorDefs_ProfileViewDetailed) {
31
33
cacheSize.WithLabelValues("profile").Dec()
32
34
}, 10*time.Minute),
35
+
profileLimiter: rate.NewLimiter(rate.Limit(200), 100),
33
36
}
34
37
35
38
return &pc, nil
···
40
43
cached, ok := pc.profileCache.Get(did)
41
44
42
45
defer func() {
43
-
apiRequests.WithLabelValues("profile", status, fmt.Sprintf("%t", ok))
46
+
apiRequests.WithLabelValues("profile", status, fmt.Sprintf("%t", ok)).Inc()
44
47
}()
45
48
46
49
if ok {
47
50
status = "ok"
48
51
return cached, nil
52
+
}
53
+
54
+
if err := pc.profileLimiter.Wait(ctx); err != nil {
55
+
status = "limited"
56
+
return nil, fmt.Errorf("failed to get limit: %w", err)
49
57
}
50
58
51
59
resp, err := bsky.ActorGetProfile(ctx, pc.xrpcClient, did)