1package main
2
3import (
4 "context"
5 "encoding/json"
6 "errors"
7 "fmt"
8 "log/slog"
9 "sync"
10 "time"
11
12 "github.com/bluesky-social/indigo/atproto/identity"
13 "github.com/bluesky-social/indigo/atproto/syntax"
14
15 "github.com/go-redis/cache/v9"
16 "github.com/redis/go-redis/v9"
17)
18
19// This file is a fork of indigo:atproto/identity/redisdir. It stores raw DID documents, not identities, and implements `identity.Resolver`.
20
// RedisResolver is an `identity.Resolver` that uses redis as a cache for handle and DID lookups.
//
// Each cache also has an in-process layer (provided by the redis cache library) for hot keys.
// Concurrent lookups of the same handle or DID are coalesced: one caller queries Inner while the
// others wait and then read the result back from the cache.
type RedisResolver struct {
	// Inner is the resolver consulted when the cache has no usable entry.
	Inner identity.Resolver
	// ErrTTL bounds how long a cached failure is served before the lookup is retried.
	ErrTTL time.Duration
	// HitTTL is the expiry applied to cached DID documents.
	HitTTL time.Duration
	// InvalidHandleTTL is populated by the constructor; nothing visible in this file reads it.
	InvalidHandleTTL time.Duration
	// Logger receives resolution failures, slow lookups, and cache write errors.
	// The refresh paths call it without a nil check, so it must be non-nil when lookups run.
	Logger *slog.Logger

	// handleCache stores handleEntry values under "bluepages/handle/<handle>".
	handleCache *cache.Cache
	// didCache stores didEntry values under "bluepages/did/<did>".
	didCache *cache.Cache
	// didResolveChans maps a DID string to the `chan struct{}` closed when its in-flight resolution finishes.
	didResolveChans sync.Map
	// handleResolveChans is the handle counterpart of didResolveChans, keyed by normalized handle string.
	handleResolveChans sync.Map
}
36
// handleEntry is the value cached for a handle lookup: the resolved DID or the resolution error.
type handleEntry struct {
	// Updated is when the lookup was performed; it is used to age out cached errors.
	Updated time.Time
	// needs to be pointer type, because unmarshalling empty string would be an error
	DID *syntax.DID
	// Err is the resolution failure, if any. Readers check it before DID.
	Err error
}
43
// didEntry is the value cached for a DID lookup: the raw DID document or the resolution error.
type didEntry struct {
	// Updated is when the lookup was performed; it is used to age out cached errors.
	Updated time.Time
	// RawDoc is the unparsed DID document as returned by the inner resolver.
	RawDoc json.RawMessage
	// Err is the resolution failure, if any.
	Err error
}
49
// Compile-time assertion that *RedisResolver satisfies identity.Resolver.
var _ identity.Resolver = (*RedisResolver)(nil)
51
52// Creates a new caching `identity.Resolver` wrapper around an existing directory, using Redis and in-process LRU for caching.
53//
54// `redisURL` contains all the redis connection config options.
55// `hitTTL` and `errTTL` define how long successful and errored identity metadata should be cached (respectively). errTTL is expected to be shorted than hitTTL.
56// `lruSize` is the size of the in-process cache, for each of the handle and identity caches. 10000 is a reasonable default.
57//
58// NOTE: Errors returned may be inconsistent with the base directory, or between calls. This is because cached errors are serialized/deserialized and that may break equality checks.
59func NewRedisResolver(inner identity.Resolver, redisURL string, hitTTL, errTTL, invalidHandleTTL time.Duration, lruSize int) (*RedisResolver, error) {
60 opt, err := redis.ParseURL(redisURL)
61 if err != nil {
62 return nil, fmt.Errorf("could not configure redis identity cache: %w", err)
63 }
64 rdb := redis.NewClient(opt)
65 // check redis connection
66 _, err = rdb.Ping(context.TODO()).Result()
67 if err != nil {
68 return nil, fmt.Errorf("could not connect to redis identity cache: %w", err)
69 }
70 handleCache := cache.New(&cache.Options{
71 Redis: rdb,
72 LocalCache: cache.NewTinyLFU(lruSize, hitTTL),
73 })
74 didCache := cache.New(&cache.Options{
75 Redis: rdb,
76 LocalCache: cache.NewTinyLFU(lruSize, hitTTL),
77 })
78 return &RedisResolver{
79 Inner: inner,
80 ErrTTL: errTTL,
81 HitTTL: hitTTL,
82 InvalidHandleTTL: invalidHandleTTL,
83 handleCache: handleCache,
84 didCache: didCache,
85 }, nil
86}
87
88func (d *RedisResolver) isHandleStale(e *handleEntry) bool {
89 if e.Err != nil && time.Since(e.Updated) > d.ErrTTL {
90 return true
91 }
92 return false
93}
94
95func (d *RedisResolver) isDIDStale(e *didEntry) bool {
96 if e.Err != nil && time.Since(e.Updated) > d.ErrTTL {
97 return true
98 }
99 return false
100}
101
102func (d *RedisResolver) refreshHandle(ctx context.Context, h syntax.Handle) handleEntry {
103 start := time.Now()
104 did, err := d.Inner.ResolveHandle(ctx, h)
105 duration := time.Since(start)
106
107 if err != nil {
108 d.Logger.Info("handle resolution failed", "handle", h, "duration", duration, "err", err)
109 handleResolution.WithLabelValues("bluepages", "error").Inc()
110 handleResolutionDuration.WithLabelValues("bluepages", "error").Observe(time.Since(start).Seconds())
111 } else {
112 handleResolution.WithLabelValues("bluepages", "success").Inc()
113 handleResolutionDuration.WithLabelValues("bluepages", "success").Observe(time.Since(start).Seconds())
114 }
115 if duration.Seconds() > 5.0 {
116 d.Logger.Info("slow handle resolution", "handle", h, "duration", duration)
117 }
118
119 he := handleEntry{
120 Updated: time.Now(),
121 DID: &did,
122 Err: err,
123 }
124 err = d.handleCache.Set(&cache.Item{
125 Ctx: ctx,
126 Key: "bluepages/handle/" + h.String(),
127 Value: he,
128 TTL: d.ErrTTL,
129 })
130 if err != nil {
131 d.Logger.Error("identity cache write failed", "cache", "handle", "err", err)
132 }
133 return he
134}
135
136func (d *RedisResolver) refreshDID(ctx context.Context, did syntax.DID) didEntry {
137 start := time.Now()
138 rawDoc, err := d.Inner.ResolveDIDRaw(ctx, did)
139 duration := time.Since(start)
140
141 if err != nil {
142 d.Logger.Info("DID resolution failed", "did", did, "duration", duration, "err", err)
143 didResolution.WithLabelValues("bluepages", "error").Inc()
144 didResolutionDuration.WithLabelValues("bluepages", "error").Observe(time.Since(start).Seconds())
145 } else {
146 didResolution.WithLabelValues("bluepages", "success").Inc()
147 didResolutionDuration.WithLabelValues("bluepages", "success").Observe(time.Since(start).Seconds())
148 }
149 if duration.Seconds() > 5.0 {
150 d.Logger.Info("slow DID resolution", "did", did, "duration", duration)
151 }
152
153 // persist the DID lookup error, instead of processing it immediately
154 entry := didEntry{
155 Updated: time.Now(),
156 RawDoc: rawDoc,
157 Err: err,
158 }
159
160 err = d.didCache.Set(&cache.Item{
161 Ctx: ctx,
162 Key: "bluepages/did/" + did.String(),
163 Value: entry,
164 TTL: d.HitTTL,
165 })
166 if err != nil {
167 d.Logger.Error("DID cache write failed", "cache", "did", "did", did, "err", err)
168 }
169 return entry
170}
171
172func (d *RedisResolver) ResolveHandle(ctx context.Context, h syntax.Handle) (syntax.DID, error) {
173 if h.IsInvalidHandle() {
174 return "", fmt.Errorf("can not resolve handle: %w", identity.ErrInvalidHandle)
175 }
176 h = h.Normalize()
177 var entry handleEntry
178 err := d.handleCache.Get(ctx, "bluepages/handle/"+h.String(), &entry)
179 if err != nil && err != cache.ErrCacheMiss {
180 return "", fmt.Errorf("identity cache read failed: %w", err)
181 }
182 if err == nil && !d.isHandleStale(&entry) { // if no error...
183 handleResolution.WithLabelValues("bluepages", "cached").Inc()
184 if entry.Err != nil {
185 return "", entry.Err
186 } else if entry.DID != nil {
187 return *entry.DID, nil
188 } else {
189 return "", errors.New("code flow error in redis identity directory")
190 }
191 }
192
193 // Coalesce multiple requests for the same Handle
194 res := make(chan struct{})
195 val, loaded := d.handleResolveChans.LoadOrStore(h.String(), res)
196 if loaded {
197 handleResolution.WithLabelValues("bluepages", "coalesced").Inc()
198 // Wait for the result from the pending request
199 select {
200 case <-val.(chan struct{}):
201 // The result should now be in the cache
202 err := d.handleCache.Get(ctx, "bluepages/handle/"+h.String(), entry)
203 if err != nil && err != cache.ErrCacheMiss {
204 return "", fmt.Errorf("identity cache read failed: %w", err)
205 }
206 if err == nil && !d.isHandleStale(&entry) { // if no error...
207 if entry.Err != nil {
208 return "", entry.Err
209 } else if entry.DID != nil {
210 return *entry.DID, nil
211 } else {
212 return "", errors.New("code flow error in redis identity directory")
213 }
214 }
215 return "", errors.New("identity not found in cache after coalesce returned")
216 case <-ctx.Done():
217 return "", ctx.Err()
218 }
219 }
220
221 // Update the Handle Entry from PLC and cache the result
222 newEntry := d.refreshHandle(ctx, h)
223
224 // Cleanup the coalesce map and close the results channel
225 d.handleResolveChans.Delete(h.String())
226 // Callers waiting will now get the result from the cache
227 close(res)
228
229 if newEntry.Err != nil {
230 return "", newEntry.Err
231 }
232 if newEntry.DID != nil {
233 return *newEntry.DID, nil
234 }
235 return "", errors.New("unexpected control-flow error")
236}
237
238func (d *RedisResolver) ResolveDIDRaw(ctx context.Context, did syntax.DID) (json.RawMessage, error) {
239 var entry didEntry
240 err := d.didCache.Get(ctx, "bluepages/did/"+did.String(), &entry)
241 if err != nil && err != cache.ErrCacheMiss {
242 return nil, fmt.Errorf("DID cache read failed: %w", err)
243 }
244 if err == nil && !d.isDIDStale(&entry) { // if no error...
245 didResolution.WithLabelValues("bluepages", "cached").Inc()
246 return entry.RawDoc, entry.Err
247 }
248
249 // Coalesce multiple requests for the same DID
250 res := make(chan struct{})
251 val, loaded := d.didResolveChans.LoadOrStore(did.String(), res)
252 if loaded {
253 didResolution.WithLabelValues("bluepages", "coalesced").Inc()
254 // Wait for the result from the pending request
255 select {
256 case <-val.(chan struct{}):
257 // The result should now be in the cache
258 err = d.didCache.Get(ctx, "bluepages/did/"+did.String(), &entry)
259 if err != nil && err != cache.ErrCacheMiss {
260 return nil, fmt.Errorf("DID cache read failed: %w", err)
261 }
262 if err == nil && !d.isDIDStale(&entry) { // if no error...
263 return entry.RawDoc, entry.Err
264 }
265 return nil, errors.New("DID not found in cache after coalesce returned")
266 case <-ctx.Done():
267 return nil, ctx.Err()
268 }
269 }
270
271 // Update the DID Entry and cache the result
272 newEntry := d.refreshDID(ctx, did)
273
274 // Cleanup the coalesce map and close the results channel
275 d.didResolveChans.Delete(did.String())
276 // Callers waiting will now get the result from the cache
277 close(res)
278
279 if newEntry.Err != nil {
280 return nil, newEntry.Err
281 }
282 if newEntry.RawDoc != nil {
283 return newEntry.RawDoc, nil
284 }
285 return nil, errors.New("unexpected control-flow error")
286}
287
288func (d *RedisResolver) ResolveDID(ctx context.Context, did syntax.DID) (*identity.DIDDocument, error) {
289 b, err := d.ResolveDIDRaw(ctx, did)
290 if err != nil {
291 return nil, err
292 }
293
294 var doc identity.DIDDocument
295 if err := json.Unmarshal(b, &doc); err != nil {
296 return nil, fmt.Errorf("%w: JSON DID document parse: %w", identity.ErrDIDResolutionFailed, err)
297 }
298 if doc.DID != did {
299 return nil, fmt.Errorf("document ID did not match DID")
300 }
301 return &doc, nil
302}
303
304func (d *RedisResolver) PurgeHandle(ctx context.Context, handle syntax.Handle) error {
305 handle = handle.Normalize()
306 err := d.handleCache.Delete(ctx, "bluepages/handle/"+handle.String())
307 if err == cache.ErrCacheMiss {
308 return nil
309 }
310 return err
311}
312
313func (d *RedisResolver) PurgeDID(ctx context.Context, did syntax.DID) error {
314 err := d.didCache.Delete(ctx, "bluepages/did/"+did.String())
315 if err == cache.ErrCacheMiss {
316 return nil
317 }
318 return err
319}