porting all github actions from bluesky-social/indigo to tangled CI
at main 10 kB view raw
1package main 2 3import ( 4 "context" 5 "encoding/json" 6 "errors" 7 "fmt" 8 "log/slog" 9 "sync" 10 "time" 11 12 "github.com/bluesky-social/indigo/atproto/identity" 13 "github.com/bluesky-social/indigo/atproto/syntax" 14 15 "github.com/go-redis/cache/v9" 16 "github.com/redis/go-redis/v9" 17) 18 19// This file is a fork of indigo:atproto/identity/redisdir. It stores raw DID documents, not identities, and implements `identity.Resolver`. 20 21// Uses redis as a cache for identity lookups. 22// 23// Includes an in-process LRU cache as well (provided by the redis client library), for hot key (identities). 24type RedisResolver struct { 25 Inner identity.Resolver 26 ErrTTL time.Duration 27 HitTTL time.Duration 28 InvalidHandleTTL time.Duration 29 Logger *slog.Logger 30 31 handleCache *cache.Cache 32 didCache *cache.Cache 33 didResolveChans sync.Map 34 handleResolveChans sync.Map 35} 36 37type handleEntry struct { 38 Updated time.Time 39 // needs to be pointer type, because unmarshalling empty string would be an error 40 DID *syntax.DID 41 Err error 42} 43 44type didEntry struct { 45 Updated time.Time 46 RawDoc json.RawMessage 47 Err error 48} 49 50var _ identity.Resolver = (*RedisResolver)(nil) 51 52// Creates a new caching `identity.Resolver` wrapper around an existing directory, using Redis and in-process LRU for caching. 53// 54// `redisURL` contains all the redis connection config options. 55// `hitTTL` and `errTTL` define how long successful and errored identity metadata should be cached (respectively). errTTL is expected to be shorted than hitTTL. 56// `lruSize` is the size of the in-process cache, for each of the handle and identity caches. 10000 is a reasonable default. 57// 58// NOTE: Errors returned may be inconsistent with the base directory, or between calls. This is because cached errors are serialized/deserialized and that may break equality checks. 59func NewRedisResolver(inner identity.Resolver, redisURL string, hitTTL, errTTL, invalidHandleTTL time.Duration, lruSize int) (*RedisResolver, error) { 60 opt, err := redis.ParseURL(redisURL) 61 if err != nil { 62 return nil, fmt.Errorf("could not configure redis identity cache: %w", err) 63 } 64 rdb := redis.NewClient(opt) 65 // check redis connection 66 _, err = rdb.Ping(context.TODO()).Result() 67 if err != nil { 68 return nil, fmt.Errorf("could not connect to redis identity cache: %w", err) 69 } 70 handleCache := cache.New(&cache.Options{ 71 Redis: rdb, 72 LocalCache: cache.NewTinyLFU(lruSize, hitTTL), 73 }) 74 didCache := cache.New(&cache.Options{ 75 Redis: rdb, 76 LocalCache: cache.NewTinyLFU(lruSize, hitTTL), 77 }) 78 return &RedisResolver{ 79 Inner: inner, 80 ErrTTL: errTTL, 81 HitTTL: hitTTL, 82 InvalidHandleTTL: invalidHandleTTL, 83 handleCache: handleCache, 84 didCache: didCache, 85 }, nil 86} 87 88func (d *RedisResolver) isHandleStale(e *handleEntry) bool { 89 if e.Err != nil && time.Since(e.Updated) > d.ErrTTL { 90 return true 91 } 92 return false 93} 94 95func (d *RedisResolver) isDIDStale(e *didEntry) bool { 96 if e.Err != nil && time.Since(e.Updated) > d.ErrTTL { 97 return true 98 } 99 return false 100} 101 102func (d *RedisResolver) refreshHandle(ctx context.Context, h syntax.Handle) handleEntry { 103 start := time.Now() 104 did, err := d.Inner.ResolveHandle(ctx, h) 105 duration := time.Since(start) 106 107 if err != nil { 108 d.Logger.Info("handle resolution failed", "handle", h, "duration", duration, "err", err) 109 handleResolution.WithLabelValues("bluepages", "error").Inc() 110 handleResolutionDuration.WithLabelValues("bluepages", "error").Observe(time.Since(start).Seconds()) 111 } else { 112 handleResolution.WithLabelValues("bluepages", "success").Inc() 113 handleResolutionDuration.WithLabelValues("bluepages", "success").Observe(time.Since(start).Seconds()) 114 } 115 if duration.Seconds() > 5.0 { 116 d.Logger.Info("slow handle resolution", "handle", h, "duration", duration) 117 } 118 119 he := handleEntry{ 120 Updated: time.Now(), 121 DID: &did, 122 Err: err, 123 } 124 err = d.handleCache.Set(&cache.Item{ 125 Ctx: ctx, 126 Key: "bluepages/handle/" + h.String(), 127 Value: he, 128 TTL: d.ErrTTL, 129 }) 130 if err != nil { 131 d.Logger.Error("identity cache write failed", "cache", "handle", "err", err) 132 } 133 return he 134} 135 136func (d *RedisResolver) refreshDID(ctx context.Context, did syntax.DID) didEntry { 137 start := time.Now() 138 rawDoc, err := d.Inner.ResolveDIDRaw(ctx, did) 139 duration := time.Since(start) 140 141 if err != nil { 142 d.Logger.Info("DID resolution failed", "did", did, "duration", duration, "err", err) 143 didResolution.WithLabelValues("bluepages", "error").Inc() 144 didResolutionDuration.WithLabelValues("bluepages", "error").Observe(time.Since(start).Seconds()) 145 } else { 146 didResolution.WithLabelValues("bluepages", "success").Inc() 147 didResolutionDuration.WithLabelValues("bluepages", "success").Observe(time.Since(start).Seconds()) 148 } 149 if duration.Seconds() > 5.0 { 150 d.Logger.Info("slow DID resolution", "did", did, "duration", duration) 151 } 152 153 // persist the DID lookup error, instead of processing it immediately 154 entry := didEntry{ 155 Updated: time.Now(), 156 RawDoc: rawDoc, 157 Err: err, 158 } 159 160 err = d.didCache.Set(&cache.Item{ 161 Ctx: ctx, 162 Key: "bluepages/did/" + did.String(), 163 Value: entry, 164 TTL: d.HitTTL, 165 }) 166 if err != nil { 167 d.Logger.Error("DID cache write failed", "cache", "did", "did", did, "err", err) 168 } 169 return entry 170} 171 172func (d *RedisResolver) ResolveHandle(ctx context.Context, h syntax.Handle) (syntax.DID, error) { 173 if h.IsInvalidHandle() { 174 return "", fmt.Errorf("can not resolve handle: %w", identity.ErrInvalidHandle) 175 } 176 h = h.Normalize() 177 var entry handleEntry 178 err := d.handleCache.Get(ctx, "bluepages/handle/"+h.String(), &entry) 179 if err != nil && err != cache.ErrCacheMiss { 180 return "", fmt.Errorf("identity cache read failed: %w", err) 181 } 182 if err == nil && !d.isHandleStale(&entry) { // if no error... 183 handleResolution.WithLabelValues("bluepages", "cached").Inc() 184 if entry.Err != nil { 185 return "", entry.Err 186 } else if entry.DID != nil { 187 return *entry.DID, nil 188 } else { 189 return "", errors.New("code flow error in redis identity directory") 190 } 191 } 192 193 // Coalesce multiple requests for the same Handle 194 res := make(chan struct{}) 195 val, loaded := d.handleResolveChans.LoadOrStore(h.String(), res) 196 if loaded { 197 handleResolution.WithLabelValues("bluepages", "coalesced").Inc() 198 // Wait for the result from the pending request 199 select { 200 case <-val.(chan struct{}): 201 // The result should now be in the cache 202 err := d.handleCache.Get(ctx, "bluepages/handle/"+h.String(), entry) 203 if err != nil && err != cache.ErrCacheMiss { 204 return "", fmt.Errorf("identity cache read failed: %w", err) 205 } 206 if err == nil && !d.isHandleStale(&entry) { // if no error... 207 if entry.Err != nil { 208 return "", entry.Err 209 } else if entry.DID != nil { 210 return *entry.DID, nil 211 } else { 212 return "", errors.New("code flow error in redis identity directory") 213 } 214 } 215 return "", errors.New("identity not found in cache after coalesce returned") 216 case <-ctx.Done(): 217 return "", ctx.Err() 218 } 219 } 220 221 // Update the Handle Entry from PLC and cache the result 222 newEntry := d.refreshHandle(ctx, h) 223 224 // Cleanup the coalesce map and close the results channel 225 d.handleResolveChans.Delete(h.String()) 226 // Callers waiting will now get the result from the cache 227 close(res) 228 229 if newEntry.Err != nil { 230 return "", newEntry.Err 231 } 232 if newEntry.DID != nil { 233 return *newEntry.DID, nil 234 } 235 return "", errors.New("unexpected control-flow error") 236} 237 238func (d *RedisResolver) ResolveDIDRaw(ctx context.Context, did syntax.DID) (json.RawMessage, error) { 239 var entry didEntry 240 err := d.didCache.Get(ctx, "bluepages/did/"+did.String(), &entry) 241 if err != nil && err != cache.ErrCacheMiss { 242 return nil, fmt.Errorf("DID cache read failed: %w", err) 243 } 244 if err == nil && !d.isDIDStale(&entry) { // if no error... 245 didResolution.WithLabelValues("bluepages", "cached").Inc() 246 return entry.RawDoc, entry.Err 247 } 248 249 // Coalesce multiple requests for the same DID 250 res := make(chan struct{}) 251 val, loaded := d.didResolveChans.LoadOrStore(did.String(), res) 252 if loaded { 253 didResolution.WithLabelValues("bluepages", "coalesced").Inc() 254 // Wait for the result from the pending request 255 select { 256 case <-val.(chan struct{}): 257 // The result should now be in the cache 258 err = d.didCache.Get(ctx, "bluepages/did/"+did.String(), &entry) 259 if err != nil && err != cache.ErrCacheMiss { 260 return nil, fmt.Errorf("DID cache read failed: %w", err) 261 } 262 if err == nil && !d.isDIDStale(&entry) { // if no error... 263 return entry.RawDoc, entry.Err 264 } 265 return nil, errors.New("DID not found in cache after coalesce returned") 266 case <-ctx.Done(): 267 return nil, ctx.Err() 268 } 269 } 270 271 // Update the DID Entry and cache the result 272 newEntry := d.refreshDID(ctx, did) 273 274 // Cleanup the coalesce map and close the results channel 275 d.didResolveChans.Delete(did.String()) 276 // Callers waiting will now get the result from the cache 277 close(res) 278 279 if newEntry.Err != nil { 280 return nil, newEntry.Err 281 } 282 if newEntry.RawDoc != nil { 283 return newEntry.RawDoc, nil 284 } 285 return nil, errors.New("unexpected control-flow error") 286} 287 288func (d *RedisResolver) ResolveDID(ctx context.Context, did syntax.DID) (*identity.DIDDocument, error) { 289 b, err := d.ResolveDIDRaw(ctx, did) 290 if err != nil { 291 return nil, err 292 } 293 294 var doc identity.DIDDocument 295 if err := json.Unmarshal(b, &doc); err != nil { 296 return nil, fmt.Errorf("%w: JSON DID document parse: %w", identity.ErrDIDResolutionFailed, err) 297 } 298 if doc.DID != did { 299 return nil, fmt.Errorf("document ID did not match DID") 300 } 301 return &doc, nil 302} 303 304func (d *RedisResolver) PurgeHandle(ctx context.Context, handle syntax.Handle) error { 305 handle = handle.Normalize() 306 err := d.handleCache.Delete(ctx, "bluepages/handle/"+handle.String()) 307 if err == cache.ErrCacheMiss { 308 return nil 309 } 310 return err 311} 312 313func (d *RedisResolver) PurgeDID(ctx context.Context, did syntax.DID) error { 314 err := d.didCache.Delete(ctx, "bluepages/did/"+did.String()) 315 if err == cache.ErrCacheMiss { 316 return nil 317 } 318 return err 319}