fork of indigo with slightly nicer lexgen
at main 8.7 kB view raw
1package identity 2 3import ( 4 "context" 5 "fmt" 6 "sync" 7 "time" 8 9 "github.com/bluesky-social/indigo/atproto/syntax" 10 11 "github.com/hashicorp/golang-lru/v2/expirable" 12) 13 14// CacheDirectory is an implementation of identity.Directory with local cache of Handle and DID 15type CacheDirectory struct { 16 Inner Directory 17 ErrTTL time.Duration 18 InvalidHandleTTL time.Duration 19 handleCache *expirable.LRU[syntax.Handle, handleEntry] 20 identityCache *expirable.LRU[syntax.DID, identityEntry] 21 didLookupChans sync.Map 22 handleLookupChans sync.Map 23} 24 25type handleEntry struct { 26 Updated time.Time 27 DID syntax.DID 28 Err error 29} 30 31type identityEntry struct { 32 Updated time.Time 33 Identity *Identity 34 Err error 35} 36 37var _ Directory = (*CacheDirectory)(nil) 38 39// Capacity of zero means unlimited size. Similarly, ttl of zero means unlimited duration. 40func NewCacheDirectory(inner Directory, capacity int, hitTTL, errTTL, invalidHandleTTL time.Duration) CacheDirectory { 41 return CacheDirectory{ 42 ErrTTL: errTTL, 43 InvalidHandleTTL: invalidHandleTTL, 44 Inner: inner, 45 handleCache: expirable.NewLRU[syntax.Handle, handleEntry](capacity, nil, hitTTL), 46 identityCache: expirable.NewLRU[syntax.DID, identityEntry](capacity, nil, hitTTL), 47 } 48} 49 50func (d *CacheDirectory) isHandleStale(e *handleEntry) bool { 51 if e.Err != nil && time.Since(e.Updated) > d.ErrTTL { 52 return true 53 } 54 return false 55} 56 57func (d *CacheDirectory) isIdentityStale(e *identityEntry) bool { 58 if e.Err != nil && time.Since(e.Updated) > d.ErrTTL { 59 return true 60 } 61 if e.Identity != nil && e.Identity.Handle.IsInvalidHandle() && time.Since(e.Updated) > d.InvalidHandleTTL { 62 return true 63 } 64 return false 65} 66 67func (d *CacheDirectory) updateHandle(ctx context.Context, h syntax.Handle) handleEntry { 68 ident, err := d.Inner.LookupHandle(ctx, h) 69 if err != nil { 70 he := handleEntry{ 71 Updated: time.Now(), 72 DID: "", 73 Err: err, 74 } 75 d.handleCache.Add(h, he) 76 return he 77 } 78 79 entry := identityEntry{ 80 Updated: time.Now(), 81 Identity: ident, 82 Err: nil, 83 } 84 he := handleEntry{ 85 Updated: time.Now(), 86 DID: ident.DID, 87 Err: nil, 88 } 89 90 d.identityCache.Add(ident.DID, entry) 91 d.handleCache.Add(ident.Handle, he) 92 return he 93} 94 95func (d *CacheDirectory) ResolveHandle(ctx context.Context, h syntax.Handle) (syntax.DID, error) { 96 h = h.Normalize() 97 if h.IsInvalidHandle() { 98 return "", fmt.Errorf("can not resolve handle: %w", ErrInvalidHandle) 99 } 100 start := time.Now() 101 entry, ok := d.handleCache.Get(h) 102 if ok && !d.isHandleStale(&entry) { 103 handleCacheHits.Inc() 104 handleResolution.WithLabelValues("lru", "cached").Inc() 105 handleResolutionDuration.WithLabelValues("lru", "cached").Observe(time.Since(start).Seconds()) 106 return entry.DID, entry.Err 107 } 108 handleCacheMisses.Inc() 109 110 // Coalesce multiple requests for the same Handle 111 res := make(chan struct{}) 112 val, loaded := d.handleLookupChans.LoadOrStore(h.String(), res) 113 if loaded { 114 handleRequestsCoalesced.Inc() 115 handleResolution.WithLabelValues("lru", "coalesced").Inc() 116 handleResolutionDuration.WithLabelValues("lru", "coalesced").Observe(time.Since(start).Seconds()) 117 // Wait for the result from the pending request 118 select { 119 case <-val.(chan struct{}): 120 // The result should now be in the cache 121 entry, ok := d.handleCache.Get(h) 122 if ok && !d.isHandleStale(&entry) { 123 return entry.DID, entry.Err 124 } 125 return "", fmt.Errorf("identity not found in cache after coalesce returned") 126 case <-ctx.Done(): 127 return "", ctx.Err() 128 } 129 } 130 131 // Update the Handle Entry from PLC and cache the result 132 newEntry := d.updateHandle(ctx, h) 133 134 // Cleanup the coalesce map and close the results channel 135 d.handleLookupChans.Delete(h.String()) 136 // Callers waiting will now get the result from the cache 137 close(res) 138 139 if newEntry.Err != nil { 140 handleResolution.WithLabelValues("lru", "error").Inc() 141 handleResolutionDuration.WithLabelValues("lru", "error").Observe(time.Since(start).Seconds()) 142 return "", newEntry.Err 143 } 144 if newEntry.DID != "" { 145 handleResolution.WithLabelValues("lru", "success").Inc() 146 handleResolutionDuration.WithLabelValues("lru", "success").Observe(time.Since(start).Seconds()) 147 return newEntry.DID, nil 148 } 149 return "", fmt.Errorf("unexpected control-flow error") 150} 151 152func (d *CacheDirectory) updateDID(ctx context.Context, did syntax.DID) identityEntry { 153 ident, err := d.Inner.LookupDID(ctx, did) 154 // persist the identity lookup error, instead of processing it immediately 155 entry := identityEntry{ 156 Updated: time.Now(), 157 Identity: ident, 158 Err: err, 159 } 160 var he *handleEntry 161 // if *not* an error, then also update the handle cache 162 if nil == err && !ident.Handle.IsInvalidHandle() { 163 he = &handleEntry{ 164 Updated: time.Now(), 165 DID: did, 166 Err: nil, 167 } 168 } 169 170 d.identityCache.Add(did, entry) 171 if he != nil { 172 d.handleCache.Add(ident.Handle, *he) 173 } 174 return entry 175} 176 177func (d *CacheDirectory) LookupDID(ctx context.Context, did syntax.DID) (*Identity, error) { 178 id, _, err := d.LookupDIDWithCacheState(ctx, did) 179 return id, err 180} 181 182func (d *CacheDirectory) LookupDIDWithCacheState(ctx context.Context, did syntax.DID) (*Identity, bool, error) { 183 start := time.Now() 184 entry, ok := d.identityCache.Get(did) 185 if ok && !d.isIdentityStale(&entry) { 186 identityCacheHits.Inc() 187 didResolution.WithLabelValues("lru", "cached").Inc() 188 didResolutionDuration.WithLabelValues("lru", "cached").Observe(time.Since(start).Seconds()) 189 return entry.Identity, true, entry.Err 190 } 191 identityCacheMisses.Inc() 192 193 // Coalesce multiple requests for the same DID 194 res := make(chan struct{}) 195 val, loaded := d.didLookupChans.LoadOrStore(did.String(), res) 196 if loaded { 197 identityRequestsCoalesced.Inc() 198 didResolution.WithLabelValues("lru", "coalesced").Inc() 199 didResolutionDuration.WithLabelValues("lru", "coalesced").Observe(time.Since(start).Seconds()) 200 // Wait for the result from the pending request 201 select { 202 case <-val.(chan struct{}): 203 // The result should now be in the cache 204 entry, ok := d.identityCache.Get(did) 205 if ok && !d.isIdentityStale(&entry) { 206 return entry.Identity, false, entry.Err 207 } 208 return nil, false, fmt.Errorf("identity not found in cache after coalesce returned") 209 case <-ctx.Done(): 210 return nil, false, ctx.Err() 211 } 212 } 213 214 // Update the Identity Entry from PLC and cache the result 215 newEntry := d.updateDID(ctx, did) 216 217 // Cleanup the coalesce map and close the results channel 218 d.didLookupChans.Delete(did.String()) 219 // Callers waiting will now get the result from the cache 220 close(res) 221 222 if newEntry.Err != nil { 223 didResolution.WithLabelValues("lru", "error").Inc() 224 didResolutionDuration.WithLabelValues("lru", "error").Observe(time.Since(start).Seconds()) 225 return nil, false, newEntry.Err 226 } 227 if newEntry.Identity != nil { 228 didResolution.WithLabelValues("lru", "success").Inc() 229 didResolutionDuration.WithLabelValues("lru", "success").Observe(time.Since(start).Seconds()) 230 return newEntry.Identity, false, nil 231 } 232 return nil, false, fmt.Errorf("unexpected control-flow error") 233} 234 235func (d *CacheDirectory) LookupHandle(ctx context.Context, h syntax.Handle) (*Identity, error) { 236 ident, _, err := d.LookupHandleWithCacheState(ctx, h) 237 return ident, err 238} 239 240func (d *CacheDirectory) LookupHandleWithCacheState(ctx context.Context, h syntax.Handle) (*Identity, bool, error) { 241 h = h.Normalize() 242 did, err := d.ResolveHandle(ctx, h) 243 if err != nil { 244 return nil, false, err 245 } 246 ident, hit, err := d.LookupDIDWithCacheState(ctx, did) 247 if err != nil { 248 return nil, hit, err 249 } 250 251 declared, err := ident.DeclaredHandle() 252 if err != nil { 253 return nil, hit, fmt.Errorf("could not verify handle/DID mapping: %w", err) 254 } 255 // NOTE: DeclaredHandle() returns a normalized handle, and we already normalized 'h' above 256 if declared != h { 257 return nil, hit, fmt.Errorf("%w: %s != %s", ErrHandleMismatch, declared, h) 258 } 259 return ident, hit, nil 260} 261 262func (d *CacheDirectory) Lookup(ctx context.Context, a syntax.AtIdentifier) (*Identity, error) { 263 handle, err := a.AsHandle() 264 if nil == err { // if not an error, is a handle 265 return d.LookupHandle(ctx, handle) 266 } 267 did, err := a.AsDID() 268 if nil == err { // if not an error, is a DID 269 return d.LookupDID(ctx, did) 270 } 271 return nil, fmt.Errorf("at-identifier neither a Handle nor a DID") 272} 273 274func (d *CacheDirectory) Purge(ctx context.Context, atid syntax.AtIdentifier) error { 275 handle, err := atid.AsHandle() 276 if nil == err { // if not an error, is a handle 277 handle = handle.Normalize() 278 d.handleCache.Remove(handle) 279 return nil 280 } 281 did, err := atid.AsDID() 282 if nil == err { // if not an error, is a DID 283 d.identityCache.Remove(did) 284 return nil 285 } 286 return fmt.Errorf("at-identifier neither a Handle nor a DID") 287}