1package identity
2
3import (
4 "context"
5 "fmt"
6 "sync"
7 "time"
8
9 "github.com/bluesky-social/indigo/atproto/syntax"
10
11 "github.com/hashicorp/golang-lru/v2/expirable"
12)
13
14// CacheDirectory is an implementation of identity.Directory with local cache of Handle and DID
15type CacheDirectory struct {
16 Inner Directory
17 ErrTTL time.Duration
18 InvalidHandleTTL time.Duration
19 handleCache *expirable.LRU[syntax.Handle, handleEntry]
20 identityCache *expirable.LRU[syntax.DID, identityEntry]
21 didLookupChans sync.Map
22 handleLookupChans sync.Map
23}
24
25type handleEntry struct {
26 Updated time.Time
27 DID syntax.DID
28 Err error
29}
30
31type identityEntry struct {
32 Updated time.Time
33 Identity *Identity
34 Err error
35}
36
37var _ Directory = (*CacheDirectory)(nil)
38
39// Capacity of zero means unlimited size. Similarly, ttl of zero means unlimited duration.
40func NewCacheDirectory(inner Directory, capacity int, hitTTL, errTTL, invalidHandleTTL time.Duration) CacheDirectory {
41 return CacheDirectory{
42 ErrTTL: errTTL,
43 InvalidHandleTTL: invalidHandleTTL,
44 Inner: inner,
45 handleCache: expirable.NewLRU[syntax.Handle, handleEntry](capacity, nil, hitTTL),
46 identityCache: expirable.NewLRU[syntax.DID, identityEntry](capacity, nil, hitTTL),
47 }
48}
49
50func (d *CacheDirectory) isHandleStale(e *handleEntry) bool {
51 if e.Err != nil && time.Since(e.Updated) > d.ErrTTL {
52 return true
53 }
54 return false
55}
56
57func (d *CacheDirectory) isIdentityStale(e *identityEntry) bool {
58 if e.Err != nil && time.Since(e.Updated) > d.ErrTTL {
59 return true
60 }
61 if e.Identity != nil && e.Identity.Handle.IsInvalidHandle() && time.Since(e.Updated) > d.InvalidHandleTTL {
62 return true
63 }
64 return false
65}
66
67func (d *CacheDirectory) updateHandle(ctx context.Context, h syntax.Handle) handleEntry {
68 ident, err := d.Inner.LookupHandle(ctx, h)
69 if err != nil {
70 he := handleEntry{
71 Updated: time.Now(),
72 DID: "",
73 Err: err,
74 }
75 d.handleCache.Add(h, he)
76 return he
77 }
78
79 entry := identityEntry{
80 Updated: time.Now(),
81 Identity: ident,
82 Err: nil,
83 }
84 he := handleEntry{
85 Updated: time.Now(),
86 DID: ident.DID,
87 Err: nil,
88 }
89
90 d.identityCache.Add(ident.DID, entry)
91 d.handleCache.Add(ident.Handle, he)
92 return he
93}
94
95func (d *CacheDirectory) ResolveHandle(ctx context.Context, h syntax.Handle) (syntax.DID, error) {
96 h = h.Normalize()
97 if h.IsInvalidHandle() {
98 return "", fmt.Errorf("can not resolve handle: %w", ErrInvalidHandle)
99 }
100 start := time.Now()
101 entry, ok := d.handleCache.Get(h)
102 if ok && !d.isHandleStale(&entry) {
103 handleCacheHits.Inc()
104 handleResolution.WithLabelValues("lru", "cached").Inc()
105 handleResolutionDuration.WithLabelValues("lru", "cached").Observe(time.Since(start).Seconds())
106 return entry.DID, entry.Err
107 }
108 handleCacheMisses.Inc()
109
110 // Coalesce multiple requests for the same Handle
111 res := make(chan struct{})
112 val, loaded := d.handleLookupChans.LoadOrStore(h.String(), res)
113 if loaded {
114 handleRequestsCoalesced.Inc()
115 handleResolution.WithLabelValues("lru", "coalesced").Inc()
116 handleResolutionDuration.WithLabelValues("lru", "coalesced").Observe(time.Since(start).Seconds())
117 // Wait for the result from the pending request
118 select {
119 case <-val.(chan struct{}):
120 // The result should now be in the cache
121 entry, ok := d.handleCache.Get(h)
122 if ok && !d.isHandleStale(&entry) {
123 return entry.DID, entry.Err
124 }
125 return "", fmt.Errorf("identity not found in cache after coalesce returned")
126 case <-ctx.Done():
127 return "", ctx.Err()
128 }
129 }
130
131 // Update the Handle Entry from PLC and cache the result
132 newEntry := d.updateHandle(ctx, h)
133
134 // Cleanup the coalesce map and close the results channel
135 d.handleLookupChans.Delete(h.String())
136 // Callers waiting will now get the result from the cache
137 close(res)
138
139 if newEntry.Err != nil {
140 handleResolution.WithLabelValues("lru", "error").Inc()
141 handleResolutionDuration.WithLabelValues("lru", "error").Observe(time.Since(start).Seconds())
142 return "", newEntry.Err
143 }
144 if newEntry.DID != "" {
145 handleResolution.WithLabelValues("lru", "success").Inc()
146 handleResolutionDuration.WithLabelValues("lru", "success").Observe(time.Since(start).Seconds())
147 return newEntry.DID, nil
148 }
149 return "", fmt.Errorf("unexpected control-flow error")
150}
151
152func (d *CacheDirectory) updateDID(ctx context.Context, did syntax.DID) identityEntry {
153 ident, err := d.Inner.LookupDID(ctx, did)
154 // persist the identity lookup error, instead of processing it immediately
155 entry := identityEntry{
156 Updated: time.Now(),
157 Identity: ident,
158 Err: err,
159 }
160 var he *handleEntry
161 // if *not* an error, then also update the handle cache
162 if nil == err && !ident.Handle.IsInvalidHandle() {
163 he = &handleEntry{
164 Updated: time.Now(),
165 DID: did,
166 Err: nil,
167 }
168 }
169
170 d.identityCache.Add(did, entry)
171 if he != nil {
172 d.handleCache.Add(ident.Handle, *he)
173 }
174 return entry
175}
176
177func (d *CacheDirectory) LookupDID(ctx context.Context, did syntax.DID) (*Identity, error) {
178 id, _, err := d.LookupDIDWithCacheState(ctx, did)
179 return id, err
180}
181
182func (d *CacheDirectory) LookupDIDWithCacheState(ctx context.Context, did syntax.DID) (*Identity, bool, error) {
183 start := time.Now()
184 entry, ok := d.identityCache.Get(did)
185 if ok && !d.isIdentityStale(&entry) {
186 identityCacheHits.Inc()
187 didResolution.WithLabelValues("lru", "cached").Inc()
188 didResolutionDuration.WithLabelValues("lru", "cached").Observe(time.Since(start).Seconds())
189 return entry.Identity, true, entry.Err
190 }
191 identityCacheMisses.Inc()
192
193 // Coalesce multiple requests for the same DID
194 res := make(chan struct{})
195 val, loaded := d.didLookupChans.LoadOrStore(did.String(), res)
196 if loaded {
197 identityRequestsCoalesced.Inc()
198 didResolution.WithLabelValues("lru", "coalesced").Inc()
199 didResolutionDuration.WithLabelValues("lru", "coalesced").Observe(time.Since(start).Seconds())
200 // Wait for the result from the pending request
201 select {
202 case <-val.(chan struct{}):
203 // The result should now be in the cache
204 entry, ok := d.identityCache.Get(did)
205 if ok && !d.isIdentityStale(&entry) {
206 return entry.Identity, false, entry.Err
207 }
208 return nil, false, fmt.Errorf("identity not found in cache after coalesce returned")
209 case <-ctx.Done():
210 return nil, false, ctx.Err()
211 }
212 }
213
214 // Update the Identity Entry from PLC and cache the result
215 newEntry := d.updateDID(ctx, did)
216
217 // Cleanup the coalesce map and close the results channel
218 d.didLookupChans.Delete(did.String())
219 // Callers waiting will now get the result from the cache
220 close(res)
221
222 if newEntry.Err != nil {
223 didResolution.WithLabelValues("lru", "error").Inc()
224 didResolutionDuration.WithLabelValues("lru", "error").Observe(time.Since(start).Seconds())
225 return nil, false, newEntry.Err
226 }
227 if newEntry.Identity != nil {
228 didResolution.WithLabelValues("lru", "success").Inc()
229 didResolutionDuration.WithLabelValues("lru", "success").Observe(time.Since(start).Seconds())
230 return newEntry.Identity, false, nil
231 }
232 return nil, false, fmt.Errorf("unexpected control-flow error")
233}
234
235func (d *CacheDirectory) LookupHandle(ctx context.Context, h syntax.Handle) (*Identity, error) {
236 ident, _, err := d.LookupHandleWithCacheState(ctx, h)
237 return ident, err
238}
239
240func (d *CacheDirectory) LookupHandleWithCacheState(ctx context.Context, h syntax.Handle) (*Identity, bool, error) {
241 h = h.Normalize()
242 did, err := d.ResolveHandle(ctx, h)
243 if err != nil {
244 return nil, false, err
245 }
246 ident, hit, err := d.LookupDIDWithCacheState(ctx, did)
247 if err != nil {
248 return nil, hit, err
249 }
250
251 declared, err := ident.DeclaredHandle()
252 if err != nil {
253 return nil, hit, fmt.Errorf("could not verify handle/DID mapping: %w", err)
254 }
255 // NOTE: DeclaredHandle() returns a normalized handle, and we already normalized 'h' above
256 if declared != h {
257 return nil, hit, fmt.Errorf("%w: %s != %s", ErrHandleMismatch, declared, h)
258 }
259 return ident, hit, nil
260}
261
262func (d *CacheDirectory) Lookup(ctx context.Context, a syntax.AtIdentifier) (*Identity, error) {
263 handle, err := a.AsHandle()
264 if nil == err { // if not an error, is a handle
265 return d.LookupHandle(ctx, handle)
266 }
267 did, err := a.AsDID()
268 if nil == err { // if not an error, is a DID
269 return d.LookupDID(ctx, did)
270 }
271 return nil, fmt.Errorf("at-identifier neither a Handle nor a DID")
272}
273
274func (d *CacheDirectory) Purge(ctx context.Context, atid syntax.AtIdentifier) error {
275 handle, err := atid.AsHandle()
276 if nil == err { // if not an error, is a handle
277 handle = handle.Normalize()
278 d.handleCache.Remove(handle)
279 return nil
280 }
281 did, err := atid.AsDID()
282 if nil == err { // if not an error, is a DID
283 d.identityCache.Remove(did)
284 return nil
285 }
286 return fmt.Errorf("at-identifier neither a Handle nor a DID")
287}