porting all github actions from bluesky-social/indigo to tangled CI

bluepages: identity directory service (#940)

Caching identity service.

- [x] rebase on https://github.com/bluesky-social/indigo/pull/872
- [x] review the above, ensure it still makes sense for this application
- [x] actually use redis caching directory
- [x] client package (which implements `identity.Directory` interface)
- [x] config, rate-limit, or admin auth for refreshIdentity (?)
- [x] basic firehose consumer (for `#identity` events)

NOTE: this service was previously called `domesday`; it has been renamed to
`bluepages` to be a bit more legible.

authored by bnewbold.net and committed by GitHub a2e0aaff baecf1c7

+52
.github/workflows/container-bluepages-aws.yaml
# Builds the 'bluepages' container image from ./cmd/bluepages/Dockerfile and
# pushes it to the registry configured via the AWS_ECR_* secrets below.
name: container-bluepages-aws
on: [push]
env:
  REGISTRY: ${{ secrets.AWS_ECR_REGISTRY_USEAST2_PACKAGES_REGISTRY }}
  USERNAME: ${{ secrets.AWS_ECR_REGISTRY_USEAST2_PACKAGES_USERNAME }}
  PASSWORD: ${{ secrets.AWS_ECR_REGISTRY_USEAST2_PACKAGES_PASSWORD }}
  # image name within the registry; joined as REGISTRY/IMAGE_NAME in the metadata step
  IMAGE_NAME: bluepages

jobs:
  container-bluepages-aws:
    # only run in the upstream repository (not in forks)
    if: github.repository == 'bluesky-social/indigo'
    runs-on: ubuntu-latest
    permissions:
      contents: read
      packages: write
      id-token: write

    steps:
      - name: Checkout repository
        uses: actions/checkout@v3

      - name: Setup Docker buildx
        uses: docker/setup-buildx-action@v1

      - name: Log into registry ${{ env.REGISTRY }}
        uses: docker/login-action@v2
        with:
          registry: ${{ env.REGISTRY }}
          username: ${{ env.USERNAME }}
          password: ${{ env.PASSWORD }}

      - name: Extract Docker metadata
        id: meta
        uses: docker/metadata-action@v4
        with:
          images: |
            ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
          # tag images with the long-format commit SHA, with no prefix or suffix
          tags: |
            type=sha,enable=true,priority=100,prefix=,suffix=,format=long

      - name: Build and push Docker image
        id: build-and-push
        uses: docker/build-push-action@v4
        with:
          context: .
          file: ./cmd/bluepages/Dockerfile
          # NOTE(review): this workflow only triggers on 'push', so this
          # expression looks like it is always true here -- confirm
          push: ${{ github.event_name != 'pull_request' }}
          tags: ${{ steps.meta.outputs.tags }}
          labels: ${{ steps.meta.outputs.labels }}
          cache-from: type=gha
          cache-to: type=gha,mode=max
+1
HACKING.md
··· 14 14 - `cmd/sonar`: event stream monitoring tool 15 15 - `cmd/hepa`: auto-moderation rule engine service 16 16 - `cmd/rainbow`: firehose fanout service 17 + - `cmd/bluepages`: identity directory service 17 18 - `gen`: dev tool to run CBOR type codegen 18 19 19 20 Packages:
+4
Makefile
··· 89 89 GOLOG_LOG_LEVEL=info go run ./cmd/bigsky --admin-key localdev 90 90 # --crawl-insecure-ws 91 91 92 + .PHONY: run-dev-ident 93 + run-dev-ident: .env ## Runs 'bluepages' identity directory for local dev 94 + GOLOG_LOG_LEVEL=info go run ./cmd/bluepages serve 95 + 92 96 .PHONY: build-relay-image 93 97 build-relay-image: ## Builds 'bigsky' Relay docker image 94 98 docker build -t bigsky -f cmd/bigsky/Dockerfile .
+14
api/atproto/identitydefs.go
// Code generated by cmd/lexgen (see Makefile's lexgen); DO NOT EDIT.

package atproto

// schema: com.atproto.identity.defs

// IdentityDefs_IdentityInfo is a "identityInfo" in the com.atproto.identity.defs schema.
//
// It is the response shape shared by the resolveIdentity and refreshIdentity
// client functions in this package.
type IdentityDefs_IdentityInfo struct {
	// did: serialized under the "did" key.
	Did string `json:"did" cborgen:"did"`
	// didDoc: The complete DID document for the identity.
	DidDoc any `json:"didDoc" cborgen:"didDoc"`
	// handle: The validated handle of the account; or 'handle.invalid' if the handle did not bi-directionally match the DID document.
	Handle string `json:"handle" cborgen:"handle"`
}
+26
api/atproto/identityrefreshIdentity.go
··· 1 + // Code generated by cmd/lexgen (see Makefile's lexgen); DO NOT EDIT. 2 + 3 + package atproto 4 + 5 + // schema: com.atproto.identity.refreshIdentity 6 + 7 + import ( 8 + "context" 9 + 10 + "github.com/bluesky-social/indigo/xrpc" 11 + ) 12 + 13 + // IdentityRefreshIdentity_Input is the input argument to a com.atproto.identity.refreshIdentity call. 14 + type IdentityRefreshIdentity_Input struct { 15 + Identifier string `json:"identifier" cborgen:"identifier"` 16 + } 17 + 18 + // IdentityRefreshIdentity calls the XRPC method "com.atproto.identity.refreshIdentity". 19 + func IdentityRefreshIdentity(ctx context.Context, c *xrpc.Client, input *IdentityRefreshIdentity_Input) (*IdentityDefs_IdentityInfo, error) { 20 + var out IdentityDefs_IdentityInfo 21 + if err := c.Do(ctx, xrpc.Procedure, "application/json", "com.atproto.identity.refreshIdentity", nil, input, &out); err != nil { 22 + return nil, err 23 + } 24 + 25 + return &out, nil 26 + }
+33
api/atproto/identityresolveDid.go
··· 1 + // Code generated by cmd/lexgen (see Makefile's lexgen); DO NOT EDIT. 2 + 3 + package atproto 4 + 5 + // schema: com.atproto.identity.resolveDid 6 + 7 + import ( 8 + "context" 9 + 10 + "github.com/bluesky-social/indigo/xrpc" 11 + ) 12 + 13 + // IdentityResolveDid_Output is the output of a com.atproto.identity.resolveDid call. 14 + type IdentityResolveDid_Output struct { 15 + // didDoc: The complete DID document for the identity. 16 + DidDoc interface{} `json:"didDoc" cborgen:"didDoc"` 17 + } 18 + 19 + // IdentityResolveDid calls the XRPC method "com.atproto.identity.resolveDid". 20 + // 21 + // did: DID to resolve. 22 + func IdentityResolveDid(ctx context.Context, c *xrpc.Client, did string) (*IdentityResolveDid_Output, error) { 23 + var out IdentityResolveDid_Output 24 + 25 + params := map[string]interface{}{ 26 + "did": did, 27 + } 28 + if err := c.Do(ctx, xrpc.Query, "", "com.atproto.identity.resolveDid", params, nil, &out); err != nil { 29 + return nil, err 30 + } 31 + 32 + return &out, nil 33 + }
+27
api/atproto/identityresolveIdentity.go
··· 1 + // Code generated by cmd/lexgen (see Makefile's lexgen); DO NOT EDIT. 2 + 3 + package atproto 4 + 5 + // schema: com.atproto.identity.resolveIdentity 6 + 7 + import ( 8 + "context" 9 + 10 + "github.com/bluesky-social/indigo/xrpc" 11 + ) 12 + 13 + // IdentityResolveIdentity calls the XRPC method "com.atproto.identity.resolveIdentity". 14 + // 15 + // identifier: Handle or DID to resolve. 16 + func IdentityResolveIdentity(ctx context.Context, c *xrpc.Client, identifier string) (*IdentityDefs_IdentityInfo, error) { 17 + var out IdentityDefs_IdentityInfo 18 + 19 + params := map[string]interface{}{ 20 + "identifier": identifier, 21 + } 22 + if err := c.Do(ctx, xrpc.Query, "", "com.atproto.identity.resolveIdentity", params, nil, &out); err != nil { 23 + return nil, err 24 + } 25 + 26 + return &out, nil 27 + }
+229
atproto/identity/apidir/apidir.go
··· 1 + package apidir 2 + 3 + import ( 4 + "bytes" 5 + "context" 6 + "encoding/json" 7 + "fmt" 8 + "io" 9 + "net/http" 10 + "time" 11 + 12 + "github.com/bluesky-social/indigo/atproto/identity" 13 + "github.com/bluesky-social/indigo/atproto/syntax" 14 + 15 + "github.com/carlmjohnson/versioninfo" 16 + ) 17 + 18 + // Does HTTP requests to an identity server, using standard Lexicon endpoints 19 + type APIDirectory struct { 20 + Client *http.Client 21 + // API service to make queries to. Includes schema, hostname, and port, but no path or trailing slash. Eg: "http://localhost:6600" 22 + Host string 23 + UserAgent string 24 + } 25 + 26 + var _ identity.Directory = (*APIDirectory)(nil) 27 + var _ identity.Resolver = (*APIDirectory)(nil) 28 + 29 + type identityBody struct { 30 + DID syntax.DID `json:"did"` 31 + Handle syntax.Handle `json:"handle"` 32 + DIDDoc json.RawMessage `json:"didDoc"` 33 + } 34 + 35 + type didBody struct { 36 + DIDDoc json.RawMessage `json:"didDoc,omitempty"` 37 + } 38 + 39 + type handleBody struct { 40 + DID syntax.DID `json:"did"` 41 + } 42 + 43 + type errorBody struct { 44 + Name string `json:"error"` 45 + Message string `json:"message,omitempty"` 46 + } 47 + 48 + func NewAPIDirectory(host string) APIDirectory { 49 + return APIDirectory{ 50 + Client: &http.Client{ 51 + Timeout: time.Second * 10, 52 + Transport: &http.Transport{ 53 + IdleConnTimeout: time.Millisecond * 100, 54 + MaxIdleConns: 100, 55 + }, 56 + }, 57 + Host: host, 58 + UserAgent: "indigo-apidir/" + versioninfo.Short(), 59 + } 60 + } 61 + 62 + // body: struct pointer which can be `json.Unmarshal()` 63 + func (dir *APIDirectory) apiGet(ctx context.Context, u string, body any, errFail error, errNotFound error) error { 64 + 65 + req, err := http.NewRequestWithContext(ctx, "GET", u, nil) 66 + if err != nil { 67 + return fmt.Errorf("constructing HTTP request: %w", err) 68 + } 69 + if dir.UserAgent != "" { 70 + req.Header.Set("User-Agent", dir.UserAgent) 71 + } 72 + resp, err := 
dir.Client.Do(req) 73 + if err != nil { 74 + return fmt.Errorf("%w: identity service HTTP: %w", errFail, err) 75 + } 76 + defer resp.Body.Close() 77 + b, err := io.ReadAll(resp.Body) 78 + if err != nil { 79 + return fmt.Errorf("%w: identity service HTTP: %w", errFail, err) 80 + } 81 + 82 + if resp.StatusCode == http.StatusNotFound { 83 + return errNotFound 84 + } 85 + if resp.StatusCode != http.StatusOK { 86 + // TODO: parse error body, handle more error conditions 87 + return fmt.Errorf("%w: identity service HTTP: %d", errFail, resp.StatusCode) 88 + } 89 + 90 + if err := json.Unmarshal(b, body); err != nil { 91 + return fmt.Errorf("%w: identity service HTTP: %w", errFail, err) 92 + } 93 + return nil 94 + } 95 + 96 + // body: struct pointer which can be `json.Unmarshal()` 97 + func (dir *APIDirectory) apiPost(ctx context.Context, u string, reqBody []byte, body any, errFail error, errNotFound error) error { 98 + req, err := http.NewRequestWithContext(ctx, "POST", u, bytes.NewBuffer(reqBody)) 99 + if err != nil { 100 + return fmt.Errorf("constructing HTTP request: %w", err) 101 + } 102 + req.Header.Set("Content-Type", "application/json") 103 + if dir.UserAgent != "" { 104 + req.Header.Set("User-Agent", dir.UserAgent) 105 + } 106 + resp, err := dir.Client.Do(req) 107 + if err != nil { 108 + return fmt.Errorf("%w: identity service HTTP: %w", errFail, err) 109 + } 110 + defer resp.Body.Close() 111 + b, err := io.ReadAll(resp.Body) 112 + if err != nil { 113 + return fmt.Errorf("%w: identity service HTTP: %w", errFail, err) 114 + } 115 + 116 + if resp.StatusCode == http.StatusNotFound { 117 + return errNotFound 118 + } 119 + if resp.StatusCode != http.StatusOK { 120 + // TODO: parse error body, handle more error conditions 121 + return fmt.Errorf("%w: identity service HTTP: %d", errFail, resp.StatusCode) 122 + } 123 + 124 + if err := json.Unmarshal(b, body); err != nil { 125 + return fmt.Errorf("%w: identity service HTTP: %w", errFail, err) 126 + } 127 + return nil 128 + 
} 129 + 130 + func (dir *APIDirectory) ResolveDIDRaw(ctx context.Context, did syntax.DID) (json.RawMessage, error) { 131 + var body didBody 132 + u := dir.Host + "/xrpc/com.atproto.identity.resolveDid?did=" + did.String() 133 + 134 + start := time.Now() 135 + err := dir.apiGet(ctx, u, &body, identity.ErrDIDResolutionFailed, identity.ErrDIDNotFound) 136 + if err != nil { 137 + didResolution.WithLabelValues("apidir", "error").Inc() 138 + didResolutionDuration.WithLabelValues("apidir", "error").Observe(time.Since(start).Seconds()) 139 + return nil, err 140 + } 141 + didResolution.WithLabelValues("apidir", "success").Inc() 142 + didResolutionDuration.WithLabelValues("apidir", "success").Observe(time.Since(start).Seconds()) 143 + 144 + return body.DIDDoc, nil 145 + } 146 + 147 + func (dir *APIDirectory) ResolveDID(ctx context.Context, did syntax.DID) (*identity.DIDDocument, error) { 148 + raw, err := dir.ResolveDIDRaw(ctx, did) 149 + if err != nil { 150 + return nil, err 151 + } 152 + 153 + var doc identity.DIDDocument 154 + if err := json.Unmarshal(raw, &doc); err != nil { 155 + return nil, fmt.Errorf("%w: JSON DID document parse: %w", identity.ErrDIDResolutionFailed, err) 156 + } 157 + return &doc, nil 158 + } 159 + 160 + func (dir *APIDirectory) ResolveHandle(ctx context.Context, handle syntax.Handle) (syntax.DID, error) { 161 + var body handleBody 162 + u := dir.Host + "/xrpc/com.atproto.identity.resolveHandle?handle=" + handle.String() 163 + 164 + start := time.Now() 165 + err := dir.apiGet(ctx, u, &body, identity.ErrHandleResolutionFailed, identity.ErrHandleNotFound) 166 + if err != nil { 167 + handleResolution.WithLabelValues("apidir", "error").Inc() 168 + handleResolutionDuration.WithLabelValues("apidir", "error").Observe(time.Since(start).Seconds()) 169 + return "", err 170 + } 171 + handleResolution.WithLabelValues("apidir", "success").Inc() 172 + handleResolutionDuration.WithLabelValues("apidir", "success").Observe(time.Since(start).Seconds()) 173 + 174 + 
return body.DID, nil 175 + } 176 + 177 + func (dir *APIDirectory) Lookup(ctx context.Context, atid syntax.AtIdentifier) (*identity.Identity, error) { 178 + var body identityBody 179 + u := dir.Host + "/xrpc/com.atproto.identity.resolveIdentity?identifier=" + atid.String() 180 + 181 + // TODO: detect atid type, use that for errors? or just assume DID? 182 + start := time.Now() 183 + err := dir.apiGet(ctx, u, &body, identity.ErrDIDResolutionFailed, identity.ErrDIDNotFound) 184 + if err != nil { 185 + identityResolution.WithLabelValues("apidir", "error").Inc() 186 + identityResolutionDuration.WithLabelValues("apidir", "error").Observe(time.Since(start).Seconds()) 187 + return nil, err 188 + } 189 + identityResolution.WithLabelValues("apidir", "success").Inc() 190 + identityResolutionDuration.WithLabelValues("apidir", "success").Observe(time.Since(start).Seconds()) 191 + 192 + var doc identity.DIDDocument 193 + if err := json.Unmarshal(body.DIDDoc, &doc); err != nil { 194 + return nil, fmt.Errorf("%w: JSON DID document parse: %w", identity.ErrDIDResolutionFailed, err) 195 + } 196 + 197 + ident := identity.ParseIdentity(&doc) 198 + ident.Handle = body.Handle 199 + 200 + return &ident, nil 201 + } 202 + 203 + func (dir *APIDirectory) LookupHandle(ctx context.Context, handle syntax.Handle) (*identity.Identity, error) { 204 + return dir.Lookup(ctx, handle.AtIdentifier()) 205 + } 206 + 207 + func (dir *APIDirectory) LookupDID(ctx context.Context, did syntax.DID) (*identity.Identity, error) { 208 + return dir.Lookup(ctx, did.AtIdentifier()) 209 + } 210 + 211 + func (dir *APIDirectory) Purge(ctx context.Context, atid syntax.AtIdentifier) error { 212 + 213 + input := map[string]string{ 214 + "identifier": atid.String(), 215 + } 216 + reqBody, err := json.Marshal(input) 217 + if err != nil { 218 + return err 219 + } 220 + 221 + var body identityBody 222 + u := dir.Host + "/xrpc/com.atproto.identity.refreshIdentity" 223 + 224 + if err := dir.apiPost(ctx, u, reqBody, &body, 
identity.ErrDIDResolutionFailed, identity.ErrDIDNotFound); err != nil { 225 + return err 226 + } 227 + 228 + return nil 229 + }
+12
atproto/identity/apidir/doc.go
/*
Package apidir is an Identity Directory implementation which makes HTTP requests to a dedicated identity service.

Implements both `identity.Directory` (Lookup methods) and `identity.Resolver` (Resolve methods). You may want to wrap this with a small in-process cache, eg `identity.CacheDirectory`.

Makes use of standard Lexicons:

- com.atproto.identity.resolveHandle
- com.atproto.identity.resolveDid
- com.atproto.identity.resolveIdentity
- com.atproto.identity.refreshIdentity (used by Purge)
*/
package apidir
+41
atproto/identity/apidir/examples_test.go
··· 1 + package apidir 2 + 3 + import ( 4 + "context" 5 + "fmt" 6 + 7 + "github.com/bluesky-social/indigo/atproto/syntax" 8 + ) 9 + 10 + func ExampleAPIDirectory() { 11 + // don't run this as a CI test! 12 + //return 13 + 14 + ctx := context.Background() 15 + 16 + // will connect to the provided identity server (eg, a 'bluepages' instance) 17 + dir := NewAPIDirectory("http://localhost:6600") 18 + 19 + handle, _ := syntax.ParseHandle("atproto.com") 20 + did, _ := syntax.ParseDID("did:plc:ewvi7nxzyoun6zhxrhs64oiz") 21 + 22 + // low-level resolution of a handle (`identity.Resolver` interface) 23 + atprotoDID, _ := dir.ResolveHandle(ctx, handle) 24 + fmt.Println(atprotoDID) 25 + 26 + // low-level DID document resolution (`identity.Resolver` interface) 27 + doc, err := dir.ResolveDID(ctx, did) 28 + if err != nil { 29 + panic(err) 30 + } 31 + fmt.Println(doc.Service) 32 + 33 + // higher-level identity resolution with accessors (`identity.Directory` interface) 34 + ident, _ := dir.LookupHandle(ctx, handle) 35 + fmt.Println(ident.PDSEndpoint()) 36 + 37 + /// Output: 38 + // did:plc:ewvi7nxzyoun6zhxrhs64oiz 39 + // [{#atproto_pds AtprotoPersonalDataServer https://enoki.us-east.host.bsky.network}] 40 + // https://enoki.us-east.host.bsky.network 41 + }
+37
atproto/identity/apidir/live_test.go
··· 1 + package apidir 2 + 3 + import ( 4 + "context" 5 + "testing" 6 + 7 + "github.com/bluesky-social/indigo/atproto/syntax" 8 + 9 + "github.com/stretchr/testify/assert" 10 + ) 11 + 12 + func TestBasicLookups(t *testing.T) { 13 + t.Skip("skipping live network test") 14 + assert := assert.New(t) 15 + ctx := context.Background() 16 + var err error 17 + 18 + dir := NewAPIDirectory("http://localhost:6600") 19 + 20 + _, err = dir.LookupDID(ctx, syntax.DID("did:plc:ewvi7nxzyoun6zhxrhs64oiz")) 21 + assert.NoError(err) 22 + 23 + _, err = dir.ResolveDID(ctx, syntax.DID("did:plc:ewvi7nxzyoun6zhxrhs64oiz")) 24 + assert.NoError(err) 25 + 26 + _, err = dir.LookupHandle(ctx, syntax.Handle("atproto.com")) 27 + assert.NoError(err) 28 + 29 + _, err = dir.ResolveHandle(ctx, syntax.Handle("atproto.com")) 30 + assert.NoError(err) 31 + 32 + _, err = dir.LookupHandle(ctx, syntax.Handle("dummy-handle.atproto.com")) 33 + assert.Error(err) 34 + 35 + _, err = dir.ResolveHandle(ctx, syntax.Handle("dummy-handle.atproto.com")) 36 + assert.Error(err) 37 + }
+39
atproto/identity/apidir/metrics.go
··· 1 + package apidir 2 + 3 + import ( 4 + "github.com/prometheus/client_golang/prometheus" 5 + "github.com/prometheus/client_golang/prometheus/promauto" 6 + ) 7 + 8 + var handleResolution = promauto.NewCounterVec(prometheus.CounterOpts{ 9 + Name: "atproto_identity_apidir_resolve_handle", 10 + Help: "ATProto handle resolutions", 11 + }, []string{"directory", "status"}) 12 + 13 + var handleResolutionDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{ 14 + Name: "atproto_identity_apidir_resolve_handle_duration", 15 + Help: "Time to resolve a handle", 16 + Buckets: prometheus.ExponentialBucketsRange(0.001, 2, 15), 17 + }, []string{"directory", "status"}) 18 + 19 + var didResolution = promauto.NewCounterVec(prometheus.CounterOpts{ 20 + Name: "atproto_identity_apidir_resolve_did", 21 + Help: "ATProto DID resolutions", 22 + }, []string{"directory", "status"}) 23 + 24 + var didResolutionDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{ 25 + Name: "atproto_identity_apidir_resolve_did_duration", 26 + Help: "Time to resolve a DID", 27 + Buckets: prometheus.ExponentialBucketsRange(0.001, 2, 15), 28 + }, []string{"directory", "status"}) 29 + 30 + var identityResolution = promauto.NewCounterVec(prometheus.CounterOpts{ 31 + Name: "atproto_identity_apidir_resolve_identity", 32 + Help: "ATProto combined identity resolutions", 33 + }, []string{"directory", "status"}) 34 + 35 + var identityResolutionDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{ 36 + Name: "atproto_identity_apidir_resolve_identity_duration", 37 + Help: "Time to resolve a combined identity", 38 + Buckets: prometheus.ExponentialBucketsRange(0.001, 2, 15), 39 + }, []string{"directory", "status"})
+18
atproto/identity/cache_directory.go
··· 95 95 if h.IsInvalidHandle() { 96 96 return "", fmt.Errorf("can not resolve handle: %w", ErrInvalidHandle) 97 97 } 98 + start := time.Now() 98 99 entry, ok := d.handleCache.Get(h) 99 100 if ok && !d.IsHandleStale(&entry) { 100 101 handleCacheHits.Inc() 102 + handleResolution.WithLabelValues("lru", "cached").Inc() 103 + handleResolutionDuration.WithLabelValues("lru", "cached").Observe(time.Since(start).Seconds()) 101 104 return entry.DID, entry.Err 102 105 } 103 106 handleCacheMisses.Inc() ··· 107 110 val, loaded := d.handleLookupChans.LoadOrStore(h.String(), res) 108 111 if loaded { 109 112 handleRequestsCoalesced.Inc() 113 + handleResolution.WithLabelValues("lru", "coalesced").Inc() 114 + handleResolutionDuration.WithLabelValues("lru", "coalesced").Observe(time.Since(start).Seconds()) 110 115 // Wait for the result from the pending request 111 116 select { 112 117 case <-val.(chan struct{}): ··· 130 135 close(res) 131 136 132 137 if newEntry.Err != nil { 138 + handleResolution.WithLabelValues("lru", "error").Inc() 139 + handleResolutionDuration.WithLabelValues("lru", "error").Observe(time.Since(start).Seconds()) 133 140 return "", newEntry.Err 134 141 } 135 142 if newEntry.DID != "" { 143 + handleResolution.WithLabelValues("lru", "success").Inc() 144 + handleResolutionDuration.WithLabelValues("lru", "success").Observe(time.Since(start).Seconds()) 136 145 return newEntry.DID, nil 137 146 } 138 147 return "", fmt.Errorf("unexpected control-flow error") ··· 169 178 } 170 179 171 180 func (d *CacheDirectory) LookupDIDWithCacheState(ctx context.Context, did syntax.DID) (*Identity, bool, error) { 181 + start := time.Now() 172 182 entry, ok := d.identityCache.Get(did) 173 183 if ok && !d.IsIdentityStale(&entry) { 174 184 identityCacheHits.Inc() 185 + didResolution.WithLabelValues("lru", "cached").Inc() 186 + didResolutionDuration.WithLabelValues("lru", "cached").Observe(time.Since(start).Seconds()) 175 187 return entry.Identity, true, entry.Err 176 188 } 177 189 
identityCacheMisses.Inc() ··· 181 193 val, loaded := d.didLookupChans.LoadOrStore(did.String(), res) 182 194 if loaded { 183 195 identityRequestsCoalesced.Inc() 196 + didResolution.WithLabelValues("lru", "coalesced").Inc() 197 + didResolutionDuration.WithLabelValues("lru", "coalesced").Observe(time.Since(start).Seconds()) 184 198 // Wait for the result from the pending request 185 199 select { 186 200 case <-val.(chan struct{}): ··· 204 218 close(res) 205 219 206 220 if newEntry.Err != nil { 221 + didResolution.WithLabelValues("lru", "error").Inc() 222 + didResolutionDuration.WithLabelValues("lru", "error").Observe(time.Since(start).Seconds()) 207 223 return nil, false, newEntry.Err 208 224 } 209 225 if newEntry.Identity != nil { 226 + didResolution.WithLabelValues("lru", "success").Inc() 227 + didResolutionDuration.WithLabelValues("lru", "success").Observe(time.Since(start).Seconds()) 210 228 return newEntry.Identity, false, nil 211 229 } 212 230 return nil, false, fmt.Errorf("unexpected control-flow error")
+18 -26
atproto/identity/metrics.go
··· 5 5 "github.com/prometheus/client_golang/prometheus/promauto" 6 6 ) 7 7 8 - var handleCacheHits = promauto.NewCounter(prometheus.CounterOpts{ 9 - Name: "atproto_directory_handle_cache_hits", 10 - Help: "Number of cache hits for ATProto handle lookups", 11 - }) 12 - 13 - var handleCacheMisses = promauto.NewCounter(prometheus.CounterOpts{ 14 - Name: "atproto_directory_handle_cache_misses", 15 - Help: "Number of cache misses for ATProto handle lookups", 16 - }) 17 - 18 - var identityCacheHits = promauto.NewCounter(prometheus.CounterOpts{ 19 - Name: "atproto_directory_identity_cache_hits", 20 - Help: "Number of cache hits for ATProto identity lookups", 21 - }) 8 + var handleResolution = promauto.NewCounterVec(prometheus.CounterOpts{ 9 + Name: "atproto_identity_resolve_handle", 10 + Help: "ATProto handle resolutions", 11 + }, []string{"directory", "status"}) 22 12 23 - var identityCacheMisses = promauto.NewCounter(prometheus.CounterOpts{ 24 - Name: "atproto_directory_identity_cache_misses", 25 - Help: "Number of cache misses for ATProto identity lookups", 26 - }) 13 + var handleResolutionDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{ 14 + Name: "atproto_identity_resolve_handle_duration", 15 + Help: "Time to resolve a handle", 16 + Buckets: prometheus.ExponentialBucketsRange(0.001, 2, 15), 17 + }, []string{"directory", "status"}) 27 18 28 - var identityRequestsCoalesced = promauto.NewCounter(prometheus.CounterOpts{ 29 - Name: "atproto_directory_identity_requests_coalesced", 30 - Help: "Number of identity requests coalesced", 31 - }) 19 + var didResolution = promauto.NewCounterVec(prometheus.CounterOpts{ 20 + Name: "atproto_identity_resolve_did", 21 + Help: "ATProto DID resolutions", 22 + }, []string{"directory", "status"}) 32 23 33 - var handleRequestsCoalesced = promauto.NewCounter(prometheus.CounterOpts{ 34 - Name: "atproto_directory_handle_requests_coalesced", 35 - Help: "Number of handle requests coalesced", 36 - }) 24 + var didResolutionDuration = 
promauto.NewHistogramVec(prometheus.HistogramOpts{ 25 + Name: "atproto_identity_resolve_did_duration", 26 + Help: "Time to resolve a DID", 27 + Buckets: prometheus.ExponentialBucketsRange(0.001, 2, 15), 28 + }, []string{"directory", "status"})
+42
atproto/identity/metrics_legacy.go
··· 1 + package identity 2 + 3 + import ( 4 + "github.com/prometheus/client_golang/prometheus" 5 + "github.com/prometheus/client_golang/prometheus/promauto" 6 + ) 7 + 8 + // DEPRECATED 9 + var handleCacheHits = promauto.NewCounter(prometheus.CounterOpts{ 10 + Name: "atproto_directory_handle_cache_hits", 11 + Help: "Number of cache hits for ATProto handle lookups", 12 + }) 13 + 14 + // DEPRECATED 15 + var handleCacheMisses = promauto.NewCounter(prometheus.CounterOpts{ 16 + Name: "atproto_directory_handle_cache_misses", 17 + Help: "Number of cache misses for ATProto handle lookups", 18 + }) 19 + 20 + // DEPRECATED 21 + var identityCacheHits = promauto.NewCounter(prometheus.CounterOpts{ 22 + Name: "atproto_directory_identity_cache_hits", 23 + Help: "Number of cache hits for ATProto identity lookups", 24 + }) 25 + 26 + // DEPRECATED 27 + var identityCacheMisses = promauto.NewCounter(prometheus.CounterOpts{ 28 + Name: "atproto_directory_identity_cache_misses", 29 + Help: "Number of cache misses for ATProto identity lookups", 30 + }) 31 + 32 + // DEPRECATED 33 + var identityRequestsCoalesced = promauto.NewCounter(prometheus.CounterOpts{ 34 + Name: "atproto_directory_identity_requests_coalesced", 35 + Help: "Number of identity requests coalesced", 36 + }) 37 + 38 + // DEPRECATED 39 + var handleRequestsCoalesced = promauto.NewCounter(prometheus.CounterOpts{ 40 + Name: "atproto_directory_handle_requests_coalesced", 41 + Help: "Number of handle requests coalesced", 42 + })
+18 -26
atproto/identity/redisdir/metrics.go
··· 5 5 "github.com/prometheus/client_golang/prometheus/promauto" 6 6 ) 7 7 8 - var handleCacheHits = promauto.NewCounter(prometheus.CounterOpts{ 9 - Name: "atproto_redis_directory_handle_cache_hits", 10 - Help: "Number of cache hits for ATProto handle lookups", 11 - }) 12 - 13 - var handleCacheMisses = promauto.NewCounter(prometheus.CounterOpts{ 14 - Name: "atproto_redis_directory_handle_cache_misses", 15 - Help: "Number of cache misses for ATProto handle lookups", 16 - }) 17 - 18 - var identityCacheHits = promauto.NewCounter(prometheus.CounterOpts{ 19 - Name: "atproto_redis_directory_identity_cache_hits", 20 - Help: "Number of cache hits for ATProto identity lookups", 21 - }) 8 + var handleResolution = promauto.NewCounterVec(prometheus.CounterOpts{ 9 + Name: "atproto_identity_redisdir_resolve_handle", 10 + Help: "ATProto handle resolutions", 11 + }, []string{"directory", "status"}) 22 12 23 - var identityCacheMisses = promauto.NewCounter(prometheus.CounterOpts{ 24 - Name: "atproto_redis_directory_identity_cache_misses", 25 - Help: "Number of cache misses for ATProto identity lookups", 26 - }) 13 + var handleResolutionDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{ 14 + Name: "atproto_identity_redisdir_resolve_handle_duration", 15 + Help: "Time to resolve a handle", 16 + Buckets: prometheus.ExponentialBucketsRange(0.001, 2, 15), 17 + }, []string{"directory", "status"}) 27 18 28 - var identityRequestsCoalesced = promauto.NewCounter(prometheus.CounterOpts{ 29 - Name: "atproto_redis_directory_identity_requests_coalesced", 30 - Help: "Number of identity requests coalesced", 31 - }) 19 + var didResolution = promauto.NewCounterVec(prometheus.CounterOpts{ 20 + Name: "atproto_identity_redisdir_resolve_did", 21 + Help: "ATProto DID resolutions", 22 + }, []string{"directory", "status"}) 32 23 33 - var handleRequestsCoalesced = promauto.NewCounter(prometheus.CounterOpts{ 34 - Name: "atproto_redis_directory_handle_requests_coalesced", 35 - Help: "Number of 
handle requests coalesced", 36 - }) 24 + var didResolutionDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{ 25 + Name: "atproto_identity_redisdir_resolve_did_duration", 26 + Help: "Time to resolve a DID", 27 + Buckets: prometheus.ExponentialBucketsRange(0.001, 2, 15), 28 + }, []string{"directory", "status"})
+42
atproto/identity/redisdir/metrics_legacy.go
··· 1 + package redisdir 2 + 3 + import ( 4 + "github.com/prometheus/client_golang/prometheus" 5 + "github.com/prometheus/client_golang/prometheus/promauto" 6 + ) 7 + 8 + // DEPRECATED 9 + var handleCacheHits = promauto.NewCounter(prometheus.CounterOpts{ 10 + Name: "atproto_redis_directory_handle_cache_hits", 11 + Help: "Number of cache hits for ATProto handle lookups", 12 + }) 13 + 14 + // DEPRECATED 15 + var handleCacheMisses = promauto.NewCounter(prometheus.CounterOpts{ 16 + Name: "atproto_redis_directory_handle_cache_misses", 17 + Help: "Number of cache misses for ATProto handle lookups", 18 + }) 19 + 20 + // DEPRECATED 21 + var identityCacheHits = promauto.NewCounter(prometheus.CounterOpts{ 22 + Name: "atproto_redis_directory_identity_cache_hits", 23 + Help: "Number of cache hits for ATProto identity lookups", 24 + }) 25 + 26 + // DEPRECATED 27 + var identityCacheMisses = promauto.NewCounter(prometheus.CounterOpts{ 28 + Name: "atproto_redis_directory_identity_cache_misses", 29 + Help: "Number of cache misses for ATProto identity lookups", 30 + }) 31 + 32 + // DEPRECATED 33 + var identityRequestsCoalesced = promauto.NewCounter(prometheus.CounterOpts{ 34 + Name: "atproto_redis_directory_identity_requests_coalesced", 35 + Help: "Number of identity requests coalesced", 36 + }) 37 + 38 + // DEPRECATED 39 + var handleRequestsCoalesced = promauto.NewCounter(prometheus.CounterOpts{ 40 + Name: "atproto_redis_directory_handle_requests_coalesced", 41 + Help: "Number of handle requests coalesced", 42 + })
+22
atproto/identity/redisdir/redis_directory.go
··· 155 155 } 156 156 157 157 func (d *RedisDirectory) ResolveHandle(ctx context.Context, h syntax.Handle) (syntax.DID, error) { 158 + start := time.Now() 158 159 if h.IsInvalidHandle() { 159 160 return "", fmt.Errorf("can not resolve handle: %w", identity.ErrInvalidHandle) 160 161 } 161 162 var entry handleEntry 162 163 err := d.handleCache.Get(ctx, redisDirPrefix+h.String(), &entry) 163 164 if err != nil && err != cache.ErrCacheMiss { 165 + handleResolution.WithLabelValues("redisdir", "error").Inc() 166 + handleResolutionDuration.WithLabelValues("redisdir", "error").Observe(time.Since(start).Seconds()) 164 167 return "", fmt.Errorf("identity cache read failed: %w", err) 165 168 } 166 169 if err == nil && !d.isHandleStale(&entry) { // if no error... 167 170 handleCacheHits.Inc() 171 + handleResolution.WithLabelValues("redisdir", "cached").Inc() 172 + handleResolutionDuration.WithLabelValues("redisdir", "cached").Observe(time.Since(start).Seconds()) 168 173 if entry.Err != nil { 169 174 return "", entry.Err 170 175 } else if entry.DID != nil { ··· 180 185 val, loaded := d.handleLookupChans.LoadOrStore(h.String(), res) 181 186 if loaded { 182 187 handleRequestsCoalesced.Inc() 188 + handleResolution.WithLabelValues("redisdir", "coalesced").Inc() 189 + handleResolutionDuration.WithLabelValues("redisdir", "coalesced").Observe(time.Since(start).Seconds()) 183 190 // Wait for the result from the pending request 184 191 select { 185 192 case <-val.(chan struct{}): ··· 212 219 close(res) 213 220 214 221 if newEntry.Err != nil { 222 + handleResolution.WithLabelValues("redisdir", "error").Inc() 223 + handleResolutionDuration.WithLabelValues("redisdir", "error").Observe(time.Since(start).Seconds()) 215 224 return "", newEntry.Err 216 225 } 217 226 if newEntry.DID != nil { 227 + handleResolution.WithLabelValues("redisdir", "success").Inc() 228 + handleResolutionDuration.WithLabelValues("redisdir", "success").Observe(time.Since(start).Seconds()) 218 229 return *newEntry.DID, 
nil 219 230 } 220 231 return "", errors.New("unexpected control-flow error") ··· 267 278 } 268 279 269 280 func (d *RedisDirectory) LookupDIDWithCacheState(ctx context.Context, did syntax.DID) (*identity.Identity, bool, error) { 281 + start := time.Now() 270 282 var entry identityEntry 271 283 err := d.identityCache.Get(ctx, redisDirPrefix+did.String(), &entry) 272 284 if err != nil && err != cache.ErrCacheMiss { 285 + didResolution.WithLabelValues("redisdir", "error").Inc() 286 + didResolutionDuration.WithLabelValues("redisdir", "error").Observe(time.Since(start).Seconds()) 273 287 return nil, false, fmt.Errorf("identity cache read failed: %w", err) 274 288 } 275 289 if err == nil && !d.isIdentityStale(&entry) { // if no error... 276 290 identityCacheHits.Inc() 291 + didResolution.WithLabelValues("redisdir", "cached").Inc() 292 + didResolutionDuration.WithLabelValues("redisdir", "cached").Observe(time.Since(start).Seconds()) 277 293 return entry.Identity, true, entry.Err 278 294 } 279 295 identityCacheMisses.Inc() ··· 283 299 val, loaded := d.didLookupChans.LoadOrStore(did.String(), res) 284 300 if loaded { 285 301 identityRequestsCoalesced.Inc() 302 + didResolution.WithLabelValues("redisdir", "coalesced").Inc() 303 + didResolutionDuration.WithLabelValues("redisdir", "coalesced").Observe(time.Since(start).Seconds()) 286 304 // Wait for the result from the pending request 287 305 select { 288 306 case <-val.(chan struct{}): ··· 309 327 close(res) 310 328 311 329 if newEntry.Err != nil { 330 + didResolution.WithLabelValues("redisdir", "error").Inc() 331 + didResolutionDuration.WithLabelValues("redisdir", "error").Observe(time.Since(start).Seconds()) 312 332 return nil, false, newEntry.Err 313 333 } 314 334 if newEntry.Identity != nil { 335 + didResolution.WithLabelValues("redisdir", "success").Inc() 336 + didResolutionDuration.WithLabelValues("redisdir", "success").Observe(time.Since(start).Seconds()) 315 337 return newEntry.Identity, false, nil 316 338 } 317 339 
return nil, false, errors.New("unexpected control-flow error")
+37
cmd/bluepages/Dockerfile
# Run this dockerfile from the top level of the indigo git repository like:
#
#   podman build -f ./cmd/bluepages/Dockerfile -t bluepages .

### Compile stage
FROM golang:1.23-alpine3.20 AS build-env
RUN apk add --no-cache build-base make git

# COPY rather than ADD: this is a plain copy of the local build context, with
# no archive extraction or remote fetch needed
COPY . /dockerbuild
WORKDIR /dockerbuild

ENV GOEXPERIMENT=loopvar
# `-tags timetzdata` embeds timezone data in the binary, for alpine builds.
# (the previous `GIT_VERSION=$(git describe ...)` assignment was never passed
# to the build, so it has been dropped)
RUN go build -tags timetzdata -o /bluepages ./cmd/bluepages

### Run stage
FROM alpine:3.20

RUN apk add --no-cache --update dumb-init ca-certificates runit

WORKDIR /
RUN mkdir -p data/bluepages
COPY --from=build-env /bluepages /

# small things to make golang binaries work well under alpine
ENV GODEBUG=netdns=go
ENV TZ=Etc/UTC

EXPOSE 6600

ENTRYPOINT ["/usr/bin/dumb-init", "--"]
CMD ["/bluepages", "serve"]

LABEL org.opencontainers.image.source=https://github.com/bluesky-social/indigo
LABEL org.opencontainers.image.description="atproto identity directory (bluepages)"
LABEL org.opencontainers.image.licenses=MIT
+17
cmd/bluepages/README.md
··· 1 + 2 + bluepages: an atproto identity directory 3 + ======================================== 4 + 5 + This is a simple API server which caches atproto handle and DID resolution responses. It is useful when you have a bunch of services that do identity resolution, and you don't want duplicated caches. 6 + 7 + Available commands, flags, and config are documented in the usage (`--help`). 8 + 9 + Current features and design decisions: 10 + 11 + - all caches stored in Redis 12 + - consumes `#identity` events from the firehose and purges the affected cache entries (can be turned off with `--disable-firehose-consumer`) 13 + - Lexicon API endpoints: 14 + - `GET com.atproto.identity.resolveHandle` 15 + - `GET com.atproto.identity.resolveDid` 16 + - `GET com.atproto.identity.resolveIdentity` 17 + - `POST com.atproto.identity.refreshIdentity` (admin auth)
+152
cmd/bluepages/firehose.go
··· 1 + package main 2 + 3 + import ( 4 + "context" 5 + "fmt" 6 + "net/http" 7 + "net/url" 8 + "sync/atomic" 9 + "time" 10 + 11 + comatproto "github.com/bluesky-social/indigo/api/atproto" 12 + "github.com/bluesky-social/indigo/atproto/syntax" 13 + "github.com/bluesky-social/indigo/events/schedulers/parallel" 14 + 15 + "github.com/bluesky-social/indigo/events" 16 + "github.com/carlmjohnson/versioninfo" 17 + "github.com/gorilla/websocket" 18 + "github.com/redis/go-redis/v9" 19 + ) 20 + 21 + var firehoseCursorKey = "bluepages/firehoseSeq" 22 + 23 + func (srv *Server) RunFirehoseConsumer(ctx context.Context, host string, parallelism int) error { 24 + 25 + cur, err := srv.ReadLastCursor(ctx) 26 + if err != nil { 27 + return err 28 + } 29 + 30 + dialer := websocket.DefaultDialer 31 + u, err := url.Parse(host) 32 + if err != nil { 33 + return fmt.Errorf("invalid Host URI: %w", err) 34 + } 35 + u.Path = "xrpc/com.atproto.sync.subscribeRepos" 36 + if cur != 0 { 37 + u.RawQuery = fmt.Sprintf("cursor=%d", cur) 38 + } 39 + srv.logger.Info("subscribing to repo event stream", "upstream", host, "cursor", cur) 40 + con, _, err := dialer.Dial(u.String(), http.Header{ 41 + "User-Agent": []string{fmt.Sprintf("bluepages/%s", versioninfo.Short())}, 42 + }) 43 + if err != nil { 44 + return fmt.Errorf("subscribing to firehose failed (dialing): %w", err) 45 + } 46 + 47 + rsc := &events.RepoStreamCallbacks{ 48 + RepoIdentity: func(evt *comatproto.SyncSubscribeRepos_Identity) error { 49 + atomic.StoreInt64(&srv.lastSeq, evt.Seq) 50 + ctx := context.Background() 51 + srv.logger.Info("flushing cache due to #identity firehose event", "did", evt.Did, "handle", evt.Handle, "seq", evt.Seq, "err", err) 52 + 53 + did, err := syntax.ParseDID(evt.Did) 54 + if err != nil { 55 + srv.logger.Warn("invalid DID in #identity event", "did", evt.Did, "seq", evt.Seq, "err", err) 56 + return nil 57 + } 58 + if err := srv.dir.PurgeDID(ctx, did); err != nil { 59 + srv.logger.Error("failed to purge DID from 
cache", "did", evt.Did, "seq", evt.Seq, "err", err) 60 + return nil 61 + } 62 + if evt.Handle == nil { 63 + return nil 64 + } 65 + handle, err := syntax.ParseHandle(*evt.Handle) 66 + if err != nil { 67 + srv.logger.Warn("invalid handle in #identity event", "did", evt.Did, "handle", evt.Handle, "seq", evt.Seq, "err", err) 68 + return nil 69 + } 70 + if err := srv.dir.PurgeHandle(ctx, handle); err != nil { 71 + srv.logger.Error("failed to purge handle from cache", "did", evt.Did, "handle", evt.Handle, "seq", evt.Seq, "err", err) 72 + return nil 73 + } 74 + return nil 75 + }, 76 + } 77 + 78 + var scheduler events.Scheduler 79 + // use a fixed-parallelism scheduler if configured 80 + scheduler = parallel.NewScheduler( 81 + parallelism, 82 + 1000, 83 + host, 84 + rsc.EventHandler, 85 + ) 86 + srv.logger.Info("bluepages firehose scheduler configured", "scheduler", "parallel", "initial", parallelism) 87 + 88 + return events.HandleRepoStream(ctx, con, scheduler, srv.logger) 89 + } 90 + 91 + func (srv *Server) ReadLastCursor(ctx context.Context) (int64, error) { 92 + // if redis isn't configured, just skip 93 + if srv.redisClient == nil { 94 + srv.logger.Info("redis not configured, skipping cursor read") 95 + return 0, nil 96 + } 97 + 98 + val, err := srv.redisClient.Get(ctx, firehoseCursorKey).Int64() 99 + if err == redis.Nil { 100 + srv.logger.Info("no pre-existing cursor in redis") 101 + return 0, nil 102 + } else if err != nil { 103 + return 0, err 104 + } 105 + srv.logger.Info("successfully found prior subscription cursor seq in redis", "seq", val) 106 + return val, nil 107 + } 108 + 109 + func (srv *Server) PersistCursor(ctx context.Context) error { 110 + // if redis isn't configured, just skip 111 + if srv.redisClient == nil { 112 + return nil 113 + } 114 + lastSeq := atomic.LoadInt64(&srv.lastSeq) 115 + if lastSeq <= 0 { 116 + return nil 117 + } 118 + err := srv.redisClient.Set(ctx, firehoseCursorKey, lastSeq, 14*24*time.Hour).Err() 119 + return err 120 + } 121 + 
122 + // this method runs in a loop, persisting the current cursor state every 5 seconds 123 + func (srv *Server) RunPersistCursor(ctx context.Context) error { 124 + 125 + // if redis isn't configured, just skip 126 + if srv.redisClient == nil { 127 + return nil 128 + } 129 + ticker := time.NewTicker(5 * time.Second) 130 + for { 131 + select { 132 + case <-ctx.Done(): 133 + lastSeq := atomic.LoadInt64(&srv.lastSeq) 134 + if lastSeq >= 1 { 135 + srv.logger.Info("persisting final cursor seq value", "seq", lastSeq) 136 + err := srv.PersistCursor(ctx) 137 + if err != nil { 138 + srv.logger.Error("failed to persist cursor", "err", err, "seq", lastSeq) 139 + } 140 + } 141 + return nil 142 + case <-ticker.C: 143 + lastSeq := atomic.LoadInt64(&srv.lastSeq) 144 + if lastSeq >= 1 { 145 + err := srv.PersistCursor(ctx) 146 + if err != nil { 147 + srv.logger.Error("failed to persist cursor", "err", err, "seq", lastSeq) 148 + } 149 + } 150 + } 151 + } 152 + }
+258
cmd/bluepages/handlers.go
··· 1 + package main 2 + 3 + import ( 4 + "encoding/json" 5 + "errors" 6 + "fmt" 7 + 8 + comatproto "github.com/bluesky-social/indigo/api/atproto" 9 + "github.com/bluesky-social/indigo/atproto/identity" 10 + "github.com/bluesky-social/indigo/atproto/syntax" 11 + 12 + "github.com/labstack/echo/v4" 13 + ) 14 + 15 + // GET /xrpc/com.atproto.identity.resolveHandle 16 + func (srv *Server) ResolveHandle(c echo.Context) error { 17 + ctx := c.Request().Context() 18 + 19 + hdl, err := syntax.ParseHandle(c.QueryParam("handle")) 20 + if err != nil { 21 + return c.JSON(400, GenericError{ 22 + Error: "InvalidHandleSyntax", 23 + Message: err.Error(), 24 + }) 25 + } 26 + 27 + did, err := srv.dir.ResolveHandle(ctx, hdl) 28 + if err != nil && errors.Is(err, identity.ErrHandleNotFound) { 29 + return c.JSON(404, GenericError{ 30 + Error: "HandleNotFound", 31 + Message: err.Error(), 32 + }) 33 + } else if err != nil { 34 + return c.JSON(500, GenericError{ 35 + Error: "InternalError", 36 + Message: err.Error(), 37 + }) 38 + } 39 + return c.JSON(200, comatproto.IdentityResolveHandle_Output{ 40 + Did: did.String(), 41 + }) 42 + } 43 + 44 + // GET /xrpc/com.atproto.identity.resolveDid 45 + func (srv *Server) ResolveDid(c echo.Context) error { 46 + ctx := c.Request().Context() 47 + 48 + did, err := syntax.ParseDID(c.QueryParam("did")) 49 + if err != nil { 50 + return c.JSON(400, GenericError{ 51 + Error: "InvalidDidSyntax", 52 + Message: err.Error(), 53 + }) 54 + } 55 + 56 + rawDoc, err := srv.dir.ResolveDIDRaw(ctx, did) 57 + if err != nil && errors.Is(err, identity.ErrDIDNotFound) { 58 + return c.JSON(404, GenericError{ 59 + Error: "DidNotFound", 60 + Message: err.Error(), 61 + }) 62 + } else if err != nil { 63 + return c.JSON(500, GenericError{ 64 + Error: "InternalError", 65 + Message: err.Error(), 66 + }) 67 + } 68 + return c.JSON(200, comatproto.IdentityResolveDid_Output{ 69 + DidDoc: rawDoc, 70 + }) 71 + } 72 + 73 + // helper for resolveIdentity 74 + func (srv *Server) 
resolveIdentityFromHandle(c echo.Context, handle syntax.Handle) error { 75 + ctx := c.Request().Context() 76 + 77 + did, err := srv.dir.ResolveHandle(ctx, handle) 78 + if err != nil && errors.Is(err, identity.ErrHandleNotFound) { 79 + return c.JSON(404, GenericError{ 80 + Error: "HandleNotFound", 81 + Message: err.Error(), 82 + }) 83 + } else if err != nil { 84 + srv.logger.Warn("failed handle resolution", "err", err, "handle", handle) 85 + return c.JSON(502, GenericError{ 86 + Error: "HandleResolutionFailed", 87 + Message: err.Error(), 88 + }) 89 + } 90 + 91 + rawDoc, err := srv.dir.ResolveDIDRaw(ctx, did) 92 + if err != nil && errors.Is(err, identity.ErrDIDNotFound) { 93 + return c.JSON(404, GenericError{ 94 + Error: "DidNotFound", 95 + Message: err.Error(), 96 + }) 97 + } else if err != nil { 98 + return c.JSON(502, GenericError{ 99 + Error: "DIDResolutionFailed", 100 + Message: err.Error(), 101 + }) 102 + } 103 + 104 + var doc identity.DIDDocument 105 + if err := json.Unmarshal(rawDoc, &doc); err != nil { 106 + return c.JSON(400, GenericError{ 107 + Error: "InvalidDidDocument", 108 + Message: err.Error(), 109 + }) 110 + } 111 + 112 + ident := identity.ParseIdentity(&doc) 113 + declHandle, err := ident.DeclaredHandle() 114 + if err != nil || declHandle != handle { 115 + return c.JSON(400, GenericError{ 116 + Error: "HandleMismatch", 117 + Message: err.Error(), 118 + }) 119 + } 120 + 121 + return c.JSON(200, comatproto.IdentityDefs_IdentityInfo{ 122 + Did: ident.DID.String(), 123 + Handle: handle.String(), 124 + DidDoc: rawDoc, 125 + }) 126 + } 127 + 128 + // helper for resolveIdentity 129 + func (srv *Server) resolveIdentityFromDID(c echo.Context, did syntax.DID) error { 130 + ctx := c.Request().Context() 131 + 132 + rawDoc, err := srv.dir.ResolveDIDRaw(ctx, did) 133 + if err != nil && errors.Is(err, identity.ErrDIDNotFound) { 134 + return c.JSON(404, GenericError{ 135 + Error: "DidNotFound", 136 + Message: err.Error(), 137 + }) 138 + } else if err != nil { 139 
+ return c.JSON(502, GenericError{ 140 + Error: "DIDResolutionFailed", 141 + Message: err.Error(), 142 + }) 143 + } 144 + 145 + var doc identity.DIDDocument 146 + if err := json.Unmarshal(rawDoc, &doc); err != nil { 147 + return c.JSON(400, GenericError{ 148 + Error: "InvalidDidDocument", 149 + Message: err.Error(), 150 + }) 151 + } 152 + 153 + ident := identity.ParseIdentity(&doc) 154 + handle, err := ident.DeclaredHandle() 155 + if err != nil { 156 + // no handle declared, or invalid syntax 157 + handle = syntax.Handle("handle.invalid") 158 + } 159 + 160 + checkDID, err := srv.dir.ResolveHandle(ctx, handle) 161 + if err != nil || checkDID != did { 162 + handle = syntax.Handle("handle.invalid") 163 + } 164 + 165 + return c.JSON(200, comatproto.IdentityDefs_IdentityInfo{ 166 + Did: ident.DID.String(), 167 + Handle: handle.String(), 168 + DidDoc: rawDoc, 169 + }) 170 + } 171 + 172 + // GET /xrpc/com.atproto.identity.resolveIdentity 173 + func (srv *Server) ResolveIdentity(c echo.Context) error { 174 + // we partially re-implement the "Lookup()" logic here, but returning the full DID document, not `identity.Identity` 175 + atid, err := syntax.ParseAtIdentifier(c.QueryParam("identifier")) 176 + if err != nil { 177 + return c.JSON(400, GenericError{ 178 + Error: "InvalidIdentifierSyntax", 179 + Message: err.Error(), 180 + }) 181 + } 182 + 183 + handle, err := atid.AsHandle() 184 + if nil == err { 185 + return srv.resolveIdentityFromHandle(c, handle) 186 + } 187 + did, err := atid.AsDID() 188 + if nil == err { 189 + return srv.resolveIdentityFromDID(c, did) 190 + } 191 + return fmt.Errorf("unreachable code path") 192 + } 193 + 194 + // POST /xrpc/com.atproto.identity.refreshIdentity 195 + func (srv *Server) RefreshIdentity(c echo.Context) error { 196 + ctx := c.Request().Context() 197 + 198 + var body comatproto.IdentityRefreshIdentity_Input 199 + if err := c.Bind(&body); err != nil { 200 + return c.JSON(400, GenericError{ 201 + Error: "InvalidRequestBody", 202 + 
Message: err.Error(), 203 + }) 204 + } 205 + 206 + atid, err := syntax.ParseAtIdentifier(body.Identifier) 207 + if err != nil { 208 + return c.JSON(400, GenericError{ 209 + Error: "InvalidIdentifierSyntax", 210 + Message: err.Error(), 211 + }) 212 + } 213 + 214 + did, err := atid.AsDID() 215 + if nil == err { 216 + if err := srv.dir.PurgeDID(ctx, did); err != nil { 217 + return err 218 + } 219 + return srv.resolveIdentityFromDID(c, did) 220 + } 221 + handle, err := atid.AsHandle() 222 + if nil == err { 223 + if err := srv.dir.PurgeHandle(ctx, handle); err != nil { 224 + return err 225 + } 226 + return srv.resolveIdentityFromHandle(c, handle) 227 + } 228 + 229 + return fmt.Errorf("unreachable code path") 230 + } 231 + 232 + type GenericStatus struct { 233 + Daemon string `json:"daemon"` 234 + Status string `json:"status"` 235 + Message string `json:"msg,omitempty"` 236 + } 237 + 238 + func (s *Server) HandleHealthCheck(c echo.Context) error { 239 + return c.JSON(200, GenericStatus{Status: "ok", Daemon: "bluepages"}) 240 + } 241 + 242 + func (srv *Server) WebHome(c echo.Context) error { 243 + return c.String(200, ` 244 + eeeee e e e eeee eeeee eeeee eeeee eeee eeeee 245 + 8 8 8 8 8 8 8 8 8 8 8 8 8 8 " 246 + 8eee8e 8e 8e 8 8eee 8eee8 8eee8 8e 8eee 8eeee 247 + 88 8 88 88 8 88 88 88 8 88 "8 88 88 248 + 88eee8 88eee 88ee8 88ee 88 88 8 88ee8 88ee 8ee88 249 + 250 + This is an AT Protocol Identity Service 251 + 252 + Most API routes are under /xrpc/ 253 + 254 + Code: https://github.com/bluesky-social/indigo/tree/main/cmd/bluepages 255 + Protocol: https://atproto.com 256 + `) 257 + 258 + }
+340
cmd/bluepages/main.go
··· 1 + package main 2 + 3 + import ( 4 + "context" 5 + "encoding/json" 6 + "fmt" 7 + "io" 8 + "log/slog" 9 + _ "net/http/pprof" 10 + "os" 11 + "runtime" 12 + "strings" 13 + 14 + "github.com/bluesky-social/indigo/atproto/identity/apidir" 15 + "github.com/bluesky-social/indigo/atproto/syntax" 16 + 17 + "github.com/carlmjohnson/versioninfo" 18 + _ "github.com/joho/godotenv/autoload" 19 + "github.com/urfave/cli/v2" 20 + ) 21 + 22 + func main() { 23 + if err := run(os.Args); err != nil { 24 + slog.Error("exiting", "err", err) 25 + os.Exit(-1) 26 + } 27 + } 28 + 29 + func run(args []string) error { 30 + 31 + app := cli.App{ 32 + Name: "bluepages", 33 + Usage: "atproto identity directory", 34 + Version: versioninfo.Short(), 35 + Flags: []cli.Flag{ 36 + &cli.StringFlag{ 37 + Name: "atp-relay-host", 38 + Usage: "hostname and port of Relay to subscribe to", 39 + Value: "wss://bsky.network", 40 + EnvVars: []string{"ATP_RELAY_HOST", "ATP_BGS_HOST"}, 41 + }, 42 + &cli.StringFlag{ 43 + Name: "atp-plc-host", 44 + Usage: "method, hostname, and port of PLC registry", 45 + Value: "https://plc.directory", 46 + EnvVars: []string{"ATP_PLC_HOST"}, 47 + }, 48 + &cli.IntFlag{ 49 + Name: "plc-rate-limit", 50 + Usage: "max number of requests per second to PLC registry", 51 + Value: 300, 52 + EnvVars: []string{"BLUEPAGES_PLC_RATE_LIMIT"}, 53 + }, 54 + &cli.StringFlag{ 55 + Name: "redis-url", 56 + Usage: "redis connection URL: redis://<user>:<pass>@<hostname>:6379/<db>", 57 + Value: "redis://localhost:6379/0", 58 + EnvVars: []string{"BLUEPAGES_REDIS_URL"}, 59 + }, 60 + &cli.StringFlag{ 61 + Name: "log-level", 62 + Usage: "log verbosity level (eg: warn, info, debug)", 63 + EnvVars: []string{"BLUEPAGES_LOG_LEVEL", "GO_LOG_LEVEL", "LOG_LEVEL"}, 64 + }, 65 + }, 66 + Commands: []*cli.Command{ 67 + &cli.Command{ 68 + Name: "serve", 69 + Usage: "run the bluepages API daemon", 70 + Action: runServeCmd, 71 + Flags: []cli.Flag{ 72 + &cli.StringFlag{ 73 + Name: "bind", 74 + Usage: "Specify the local 
IP/port to bind to", 75 + Required: false, 76 + Value: ":6600", 77 + EnvVars: []string{"BLUEPAGES_BIND"}, 78 + }, 79 + &cli.StringFlag{ 80 + Name: "metrics-listen", 81 + Usage: "IP or address, and port, to listen on for metrics APIs", 82 + Value: ":3989", 83 + EnvVars: []string{"BLUEPAGES_METRICS_LISTEN"}, 84 + }, 85 + &cli.BoolFlag{ 86 + Name: "disable-firehose-consumer", 87 + Usage: "don't consume #identity events from firehose", 88 + EnvVars: []string{"BLUEPAGES_DISABLE_FIREHOSE_CONSUMER"}, 89 + }, 90 + &cli.BoolFlag{ 91 + Name: "disable-refresh", 92 + Usage: "disable the refreshIdentity API endpoint", 93 + EnvVars: []string{"BLUEPAGES_DISABLE_REFRESH"}, 94 + }, 95 + &cli.IntFlag{ 96 + Name: "firehose-parallelism", 97 + Usage: "number of concurrent firehose workers", 98 + Value: 4, 99 + EnvVars: []string{"BLUEPAGES_FIREHOSE_PARALLELISM"}, 100 + }, 101 + }, 102 + }, 103 + &cli.Command{ 104 + Name: "resolve-handle", 105 + ArgsUsage: `<handle>`, 106 + Usage: "query service for handle resoltion", 107 + Action: runResolveHandleCmd, 108 + Flags: []cli.Flag{ 109 + &cli.StringFlag{ 110 + Name: "host", 111 + Usage: "bluepages server to send request to", 112 + Value: "http://localhost:6600", 113 + EnvVars: []string{"BLUEPAGES_HOST"}, 114 + }, 115 + }, 116 + }, 117 + &cli.Command{ 118 + Name: "resolve-did", 119 + ArgsUsage: `<did>`, 120 + Usage: "query service for DID document resoltion", 121 + Action: runResolveDIDCmd, 122 + Flags: []cli.Flag{ 123 + &cli.StringFlag{ 124 + Name: "host", 125 + Usage: "bluepages server to send request to", 126 + Value: "http://localhost:6600", 127 + EnvVars: []string{"BLUEPAGES_HOST"}, 128 + }, 129 + }, 130 + }, 131 + &cli.Command{ 132 + Name: "lookup", 133 + ArgsUsage: `<at-identifier>`, 134 + Usage: "query service for identity resoltion", 135 + Action: runLookupCmd, 136 + Flags: []cli.Flag{ 137 + &cli.StringFlag{ 138 + Name: "host", 139 + Usage: "bluepages server to send request to", 140 + Value: "http://localhost:6600", 141 + EnvVars: 
[]string{"BLUEPAGES_HOST"}, 142 + }, 143 + }, 144 + }, 145 + &cli.Command{ 146 + Name: "refresh", 147 + ArgsUsage: `<at-identifier>`, 148 + Usage: "ask service to refresh identity", 149 + Action: runRefreshCmd, 150 + Flags: []cli.Flag{ 151 + &cli.StringFlag{ 152 + Name: "host", 153 + Usage: "bluepages server to send request to", 154 + Value: "http://localhost:6600", 155 + EnvVars: []string{"BLUEPAGES_HOST"}, 156 + }, 157 + }, 158 + }, 159 + }, 160 + } 161 + 162 + return app.Run(args) 163 + } 164 + 165 + func configLogger(cctx *cli.Context, writer io.Writer) *slog.Logger { 166 + var level slog.Level 167 + switch strings.ToLower(cctx.String("log-level")) { 168 + case "error": 169 + level = slog.LevelError 170 + case "warn": 171 + level = slog.LevelWarn 172 + case "info": 173 + level = slog.LevelInfo 174 + case "debug": 175 + level = slog.LevelDebug 176 + default: 177 + level = slog.LevelInfo 178 + } 179 + logger := slog.New(slog.NewJSONHandler(writer, &slog.HandlerOptions{ 180 + Level: level, 181 + })) 182 + slog.SetDefault(logger) 183 + return logger 184 + } 185 + 186 + func configClient(cctx *cli.Context) apidir.APIDirectory { 187 + return apidir.NewAPIDirectory(cctx.String("host")) 188 + } 189 + 190 + func runServeCmd(cctx *cli.Context) error { 191 + logger := configLogger(cctx, os.Stdout) 192 + ctx := context.Background() 193 + 194 + srv, err := NewServer( 195 + Config{ 196 + Logger: logger, 197 + Bind: cctx.String("bind"), 198 + RedisURL: cctx.String("redis-url"), 199 + PLCHost: cctx.String("atp-plc-host"), 200 + PLCRateLimit: cctx.Int("plc-rate-limit"), 201 + DisableRefresh: cctx.Bool("disable-refresh"), 202 + }, 203 + ) 204 + if err != nil { 205 + return fmt.Errorf("failed to construct server: %v", err) 206 + } 207 + 208 + if !cctx.Bool("disable-firehose-consumer") { 209 + go func() { 210 + firehoseHost := cctx.String("atp-relay-host") 211 + firehoseParallelism := cctx.Int("firehose-parallelism") 212 + if err := srv.RunFirehoseConsumer(ctx, firehoseHost, 
firehoseParallelism); err != nil { 213 + slog.Error("firehose consumer thread failed", "err", err) 214 + // NOTE: not crashing or halting process here 215 + } 216 + }() 217 + go func() { 218 + if err := srv.RunPersistCursor(ctx); err != nil { 219 + slog.Error("firehose persist thread failed", "err", err) 220 + // NOTE: not crashing or halting process here 221 + } 222 + }() 223 + } 224 + 225 + // prometheus HTTP endpoint: /metrics 226 + go func() { 227 + // TODO: what is this tuning for? just cargo-culted it 228 + runtime.SetBlockProfileRate(10) 229 + runtime.SetMutexProfileFraction(10) 230 + if err := srv.RunMetrics(cctx.String("metrics-listen")); err != nil { 231 + slog.Error("failed to start metrics endpoint", "error", err) 232 + // NOTE: not crashing or halting process here 233 + } 234 + }() 235 + 236 + return srv.RunAPI() 237 + } 238 + 239 + func runResolveHandleCmd(cctx *cli.Context) error { 240 + ctx := context.Background() 241 + dir := configClient(cctx) 242 + 243 + s := cctx.Args().First() 244 + if s == "" { 245 + return fmt.Errorf("need to provide identifier for resolution") 246 + } 247 + handle, err := syntax.ParseHandle(s) 248 + if err != nil { 249 + return err 250 + } 251 + 252 + did, err := dir.ResolveHandle(ctx, handle) 253 + if err != nil { 254 + return err 255 + } 256 + fmt.Println(did.String()) 257 + return nil 258 + } 259 + 260 + func runResolveDIDCmd(cctx *cli.Context) error { 261 + ctx := context.Background() 262 + dir := configClient(cctx) 263 + 264 + s := cctx.Args().First() 265 + if s == "" { 266 + return fmt.Errorf("need to provide identifier for resolution") 267 + } 268 + did, err := syntax.ParseDID(s) 269 + if err != nil { 270 + return err 271 + } 272 + 273 + raw, err := dir.ResolveDIDRaw(ctx, did) 274 + if err != nil { 275 + return err 276 + } 277 + b, err := json.MarshalIndent(raw, "", " ") 278 + if err != nil { 279 + return err 280 + } 281 + fmt.Println(string(b)) 282 + return nil 283 + } 284 + 285 + func runLookupCmd(cctx *cli.Context) 
error { 286 + ctx := context.Background() 287 + dir := configClient(cctx) 288 + 289 + s := cctx.Args().First() 290 + if s == "" { 291 + return fmt.Errorf("need to provide identifier for resolution") 292 + } 293 + atid, err := syntax.ParseAtIdentifier(s) 294 + if err != nil { 295 + return err 296 + } 297 + 298 + ident, err := dir.Lookup(ctx, *atid) 299 + if err != nil { 300 + return err 301 + } 302 + 303 + b, err := json.MarshalIndent(ident, "", " ") 304 + if err != nil { 305 + return err 306 + } 307 + fmt.Println(string(b)) 308 + return nil 309 + } 310 + 311 + func runRefreshCmd(cctx *cli.Context) error { 312 + ctx := context.Background() 313 + dir := configClient(cctx) 314 + 315 + s := cctx.Args().First() 316 + if s == "" { 317 + return fmt.Errorf("need to provide identifier for resolution") 318 + } 319 + atid, err := syntax.ParseAtIdentifier(s) 320 + if err != nil { 321 + return err 322 + } 323 + 324 + err = dir.Purge(ctx, *atid) 325 + if err != nil { 326 + return err 327 + } 328 + 329 + ident, err := dir.Lookup(ctx, *atid) 330 + if err != nil { 331 + return err 332 + } 333 + 334 + b, err := json.MarshalIndent(ident, "", " ") 335 + if err != nil { 336 + return err 337 + } 338 + fmt.Println(string(b)) 339 + return nil 340 + }
+28
cmd/bluepages/metrics.go
··· 1 + package main 2 + 3 + import ( 4 + "github.com/prometheus/client_golang/prometheus" 5 + "github.com/prometheus/client_golang/prometheus/promauto" 6 + ) 7 + 8 + var handleResolution = promauto.NewCounterVec(prometheus.CounterOpts{ 9 + Name: "atproto_identity_bluepages_resolve_handle", 10 + Help: "ATProto handle resolutions", 11 + }, []string{"directory", "status"}) 12 + 13 + var handleResolutionDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{ 14 + Name: "atproto_identity_bluepages_resolve_handle_duration", 15 + Help: "Time to resolve a handle", 16 + Buckets: prometheus.ExponentialBucketsRange(0.001, 2, 15), 17 + }, []string{"directory", "status"}) 18 + 19 + var didResolution = promauto.NewCounterVec(prometheus.CounterOpts{ 20 + Name: "atproto_identity_bluepages_resolve_did", 21 + Help: "ATProto DID resolutions", 22 + }, []string{"directory", "status"}) 23 + 24 + var didResolutionDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{ 25 + Name: "atproto_identity_bluepages_resolve_did_duration", 26 + Help: "Time to resolve a DID", 27 + Buckets: prometheus.ExponentialBucketsRange(0.001, 2, 15), 28 + }, []string{"directory", "status"})
+319
cmd/bluepages/resolver.go
··· 1 + package main 2 + 3 + import ( 4 + "context" 5 + "encoding/json" 6 + "errors" 7 + "fmt" 8 + "log/slog" 9 + "sync" 10 + "time" 11 + 12 + "github.com/bluesky-social/indigo/atproto/identity" 13 + "github.com/bluesky-social/indigo/atproto/syntax" 14 + 15 + "github.com/go-redis/cache/v9" 16 + "github.com/redis/go-redis/v9" 17 + ) 18 + 19 + // This file is a fork of indigo:atproto/identity/redisdir. It stores raw DID documents, not identities, and implements `identity.Resolver`. 20 + 21 + // Uses redis as a cache for identity lookups. 22 + // 23 + // Includes an in-process LRU cache as well (provided by the redis client library), for hot key (identities). 24 + type RedisResolver struct { 25 + Inner identity.Resolver 26 + ErrTTL time.Duration 27 + HitTTL time.Duration 28 + InvalidHandleTTL time.Duration 29 + Logger *slog.Logger 30 + 31 + handleCache *cache.Cache 32 + didCache *cache.Cache 33 + didResolveChans sync.Map 34 + handleResolveChans sync.Map 35 + } 36 + 37 + type handleEntry struct { 38 + Updated time.Time 39 + // needs to be pointer type, because unmarshalling empty string would be an error 40 + DID *syntax.DID 41 + Err error 42 + } 43 + 44 + type didEntry struct { 45 + Updated time.Time 46 + RawDoc json.RawMessage 47 + Err error 48 + } 49 + 50 + var _ identity.Resolver = (*RedisResolver)(nil) 51 + 52 + // Creates a new caching `identity.Resolver` wrapper around an existing directory, using Redis and in-process LRU for caching. 53 + // 54 + // `redisURL` contains all the redis connection config options. 55 + // `hitTTL` and `errTTL` define how long successful and errored identity metadata should be cached (respectively). errTTL is expected to be shorted than hitTTL. 56 + // `lruSize` is the size of the in-process cache, for each of the handle and identity caches. 10000 is a reasonable default. 57 + // 58 + // NOTE: Errors returned may be inconsistent with the base directory, or between calls. 
This is because cached errors are serialized/deserialized and that may break equality checks. 59 + func NewRedisResolver(inner identity.Resolver, redisURL string, hitTTL, errTTL, invalidHandleTTL time.Duration, lruSize int) (*RedisResolver, error) { 60 + opt, err := redis.ParseURL(redisURL) 61 + if err != nil { 62 + return nil, fmt.Errorf("could not configure redis identity cache: %w", err) 63 + } 64 + rdb := redis.NewClient(opt) 65 + // check redis connection 66 + _, err = rdb.Ping(context.TODO()).Result() 67 + if err != nil { 68 + return nil, fmt.Errorf("could not connect to redis identity cache: %w", err) 69 + } 70 + handleCache := cache.New(&cache.Options{ 71 + Redis: rdb, 72 + LocalCache: cache.NewTinyLFU(lruSize, hitTTL), 73 + }) 74 + didCache := cache.New(&cache.Options{ 75 + Redis: rdb, 76 + LocalCache: cache.NewTinyLFU(lruSize, hitTTL), 77 + }) 78 + return &RedisResolver{ 79 + Inner: inner, 80 + ErrTTL: errTTL, 81 + HitTTL: hitTTL, 82 + InvalidHandleTTL: invalidHandleTTL, 83 + handleCache: handleCache, 84 + didCache: didCache, 85 + }, nil 86 + } 87 + 88 + func (d *RedisResolver) isHandleStale(e *handleEntry) bool { 89 + if e.Err != nil && time.Since(e.Updated) > d.ErrTTL { 90 + return true 91 + } 92 + return false 93 + } 94 + 95 + func (d *RedisResolver) isDIDStale(e *didEntry) bool { 96 + if e.Err != nil && time.Since(e.Updated) > d.ErrTTL { 97 + return true 98 + } 99 + return false 100 + } 101 + 102 + func (d *RedisResolver) refreshHandle(ctx context.Context, h syntax.Handle) handleEntry { 103 + start := time.Now() 104 + did, err := d.Inner.ResolveHandle(ctx, h) 105 + duration := time.Since(start) 106 + 107 + if err != nil { 108 + d.Logger.Info("handle resolution failed", "handle", h, "duration", duration, "err", err) 109 + handleResolution.WithLabelValues("bluepages", "error").Inc() 110 + handleResolutionDuration.WithLabelValues("bluepages", "error").Observe(time.Since(start).Seconds()) 111 + } else { 112 + handleResolution.WithLabelValues("bluepages", 
"success").Inc() 113 + handleResolutionDuration.WithLabelValues("bluepages", "success").Observe(time.Since(start).Seconds()) 114 + } 115 + if duration.Seconds() > 5.0 { 116 + d.Logger.Info("slow handle resolution", "handle", h, "duration", duration) 117 + } 118 + 119 + he := handleEntry{ 120 + Updated: time.Now(), 121 + DID: &did, 122 + Err: err, 123 + } 124 + err = d.handleCache.Set(&cache.Item{ 125 + Ctx: ctx, 126 + Key: "bluepages/handle/" + h.String(), 127 + Value: he, 128 + TTL: d.ErrTTL, 129 + }) 130 + if err != nil { 131 + d.Logger.Error("identity cache write failed", "cache", "handle", "err", err) 132 + } 133 + return he 134 + } 135 + 136 + func (d *RedisResolver) refreshDID(ctx context.Context, did syntax.DID) didEntry { 137 + start := time.Now() 138 + rawDoc, err := d.Inner.ResolveDIDRaw(ctx, did) 139 + duration := time.Since(start) 140 + 141 + if err != nil { 142 + d.Logger.Info("DID resolution failed", "did", did, "duration", duration, "err", err) 143 + didResolution.WithLabelValues("bluepages", "error").Inc() 144 + didResolutionDuration.WithLabelValues("bluepages", "error").Observe(time.Since(start).Seconds()) 145 + } else { 146 + didResolution.WithLabelValues("bluepages", "success").Inc() 147 + didResolutionDuration.WithLabelValues("bluepages", "success").Observe(time.Since(start).Seconds()) 148 + } 149 + if duration.Seconds() > 5.0 { 150 + d.Logger.Info("slow DID resolution", "did", did, "duration", duration) 151 + } 152 + 153 + // persist the DID lookup error, instead of processing it immediately 154 + entry := didEntry{ 155 + Updated: time.Now(), 156 + RawDoc: rawDoc, 157 + Err: err, 158 + } 159 + 160 + err = d.didCache.Set(&cache.Item{ 161 + Ctx: ctx, 162 + Key: "bluepages/did/" + did.String(), 163 + Value: entry, 164 + TTL: d.HitTTL, 165 + }) 166 + if err != nil { 167 + d.Logger.Error("DID cache write failed", "cache", "did", "did", did, "err", err) 168 + } 169 + return entry 170 + } 171 + 172 + func (d *RedisResolver) ResolveHandle(ctx 
context.Context, h syntax.Handle) (syntax.DID, error) { 173 + if h.IsInvalidHandle() { 174 + return "", fmt.Errorf("can not resolve handle: %w", identity.ErrInvalidHandle) 175 + } 176 + h = h.Normalize() 177 + var entry handleEntry 178 + err := d.handleCache.Get(ctx, "bluepages/handle/"+h.String(), &entry) 179 + if err != nil && err != cache.ErrCacheMiss { 180 + return "", fmt.Errorf("identity cache read failed: %w", err) 181 + } 182 + if err == nil && !d.isHandleStale(&entry) { // if no error... 183 + handleResolution.WithLabelValues("bluepages", "cached").Inc() 184 + if entry.Err != nil { 185 + return "", entry.Err 186 + } else if entry.DID != nil { 187 + return *entry.DID, nil 188 + } else { 189 + return "", errors.New("code flow error in redis identity directory") 190 + } 191 + } 192 + 193 + // Coalesce multiple requests for the same Handle 194 + res := make(chan struct{}) 195 + val, loaded := d.handleResolveChans.LoadOrStore(h.String(), res) 196 + if loaded { 197 + handleResolution.WithLabelValues("bluepages", "coalesced").Inc() 198 + // Wait for the result from the pending request 199 + select { 200 + case <-val.(chan struct{}): 201 + // The result should now be in the cache 202 + err := d.handleCache.Get(ctx, "bluepages/handle/"+h.String(), entry) 203 + if err != nil && err != cache.ErrCacheMiss { 204 + return "", fmt.Errorf("identity cache read failed: %w", err) 205 + } 206 + if err == nil && !d.isHandleStale(&entry) { // if no error... 
207 + if entry.Err != nil { 208 + return "", entry.Err 209 + } else if entry.DID != nil { 210 + return *entry.DID, nil 211 + } else { 212 + return "", errors.New("code flow error in redis identity directory") 213 + } 214 + } 215 + return "", errors.New("identity not found in cache after coalesce returned") 216 + case <-ctx.Done(): 217 + return "", ctx.Err() 218 + } 219 + } 220 + 221 + // Update the Handle Entry from PLC and cache the result 222 + newEntry := d.refreshHandle(ctx, h) 223 + 224 + // Cleanup the coalesce map and close the results channel 225 + d.handleResolveChans.Delete(h.String()) 226 + // Callers waiting will now get the result from the cache 227 + close(res) 228 + 229 + if newEntry.Err != nil { 230 + return "", newEntry.Err 231 + } 232 + if newEntry.DID != nil { 233 + return *newEntry.DID, nil 234 + } 235 + return "", errors.New("unexpected control-flow error") 236 + } 237 + 238 + func (d *RedisResolver) ResolveDIDRaw(ctx context.Context, did syntax.DID) (json.RawMessage, error) { 239 + var entry didEntry 240 + err := d.didCache.Get(ctx, "bluepages/did/"+did.String(), &entry) 241 + if err != nil && err != cache.ErrCacheMiss { 242 + return nil, fmt.Errorf("DID cache read failed: %w", err) 243 + } 244 + if err == nil && !d.isDIDStale(&entry) { // if no error... 
245 + didResolution.WithLabelValues("bluepages", "cached").Inc() 246 + return entry.RawDoc, entry.Err 247 + } 248 + 249 + // Coalesce multiple requests for the same DID 250 + res := make(chan struct{}) 251 + val, loaded := d.didResolveChans.LoadOrStore(did.String(), res) 252 + if loaded { 253 + didResolution.WithLabelValues("bluepages", "coalesced").Inc() 254 + // Wait for the result from the pending request 255 + select { 256 + case <-val.(chan struct{}): 257 + // The result should now be in the cache 258 + err = d.didCache.Get(ctx, "bluepages/did/"+did.String(), &entry) 259 + if err != nil && err != cache.ErrCacheMiss { 260 + return nil, fmt.Errorf("DID cache read failed: %w", err) 261 + } 262 + if err == nil && !d.isDIDStale(&entry) { // if no error... 263 + return entry.RawDoc, entry.Err 264 + } 265 + return nil, errors.New("DID not found in cache after coalesce returned") 266 + case <-ctx.Done(): 267 + return nil, ctx.Err() 268 + } 269 + } 270 + 271 + // Update the DID Entry and cache the result 272 + newEntry := d.refreshDID(ctx, did) 273 + 274 + // Cleanup the coalesce map and close the results channel 275 + d.didResolveChans.Delete(did.String()) 276 + // Callers waiting will now get the result from the cache 277 + close(res) 278 + 279 + if newEntry.Err != nil { 280 + return nil, newEntry.Err 281 + } 282 + if newEntry.RawDoc != nil { 283 + return newEntry.RawDoc, nil 284 + } 285 + return nil, errors.New("unexpected control-flow error") 286 + } 287 + 288 + func (d *RedisResolver) ResolveDID(ctx context.Context, did syntax.DID) (*identity.DIDDocument, error) { 289 + b, err := d.ResolveDIDRaw(ctx, did) 290 + if err != nil { 291 + return nil, err 292 + } 293 + 294 + var doc identity.DIDDocument 295 + if err := json.Unmarshal(b, &doc); err != nil { 296 + return nil, fmt.Errorf("%w: JSON DID document parse: %w", identity.ErrDIDResolutionFailed, err) 297 + } 298 + if doc.DID != did { 299 + return nil, fmt.Errorf("document ID did not match DID") 300 + } 301 + return 
&doc, nil 302 + } 303 + 304 + func (d *RedisResolver) PurgeHandle(ctx context.Context, handle syntax.Handle) error { 305 + handle = handle.Normalize() 306 + err := d.handleCache.Delete(ctx, "bluepages/handle/"+handle.String()) 307 + if err == cache.ErrCacheMiss { 308 + return nil 309 + } 310 + return err 311 + } 312 + 313 + func (d *RedisResolver) PurgeDID(ctx context.Context, did syntax.DID) error { 314 + err := d.didCache.Delete(ctx, "bluepages/did/"+did.String()) 315 + if err == cache.ErrCacheMiss { 316 + return nil 317 + } 318 + return err 319 + }
+218
cmd/bluepages/server.go
··· 1 + package main 2 + 3 + import ( 4 + "context" 5 + "errors" 6 + "fmt" 7 + "log/slog" 8 + "net" 9 + "net/http" 10 + "os" 11 + "os/signal" 12 + "syscall" 13 + "time" 14 + 15 + "github.com/bluesky-social/indigo/atproto/identity" 16 + 17 + "github.com/labstack/echo/v4" 18 + "github.com/labstack/echo/v4/middleware" 19 + "github.com/prometheus/client_golang/prometheus/promhttp" 20 + "github.com/redis/go-redis/v9" 21 + slogecho "github.com/samber/slog-echo" 22 + "golang.org/x/time/rate" 23 + ) 24 + 25 + type Server struct { 26 + dir *RedisResolver 27 + echo *echo.Echo 28 + httpd *http.Server 29 + logger *slog.Logger 30 + 31 + // this redis client is used to store firehose offset 32 + redisClient *redis.Client 33 + 34 + // lastSeq is the most recent event sequence number we've received and begun to handle. 35 + // This number is periodically persisted to redis, if redis is present. 36 + // The value is best-effort (the stream handling itself is concurrent, so event numbers may not be monotonic), 37 + // but nonetheless, you must use atomics when updating or reading this (to avoid data races). 38 + lastSeq int64 39 + } 40 + 41 + type Config struct { 42 + Logger *slog.Logger 43 + PLCHost string 44 + PLCRateLimit int 45 + RedisURL string 46 + Bind string 47 + DisableRefresh bool 48 + } 49 + 50 + func NewServer(config Config) (*Server, error) { 51 + logger := config.Logger 52 + if logger == nil { 53 + logger = slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{ 54 + Level: slog.LevelInfo, 55 + })) 56 + } 57 + 58 + baseDir := identity.BaseDirectory{ 59 + PLCURL: config.PLCHost, 60 + HTTPClient: http.Client{ 61 + Timeout: time.Second * 10, 62 + Transport: &http.Transport{ 63 + // would want this around 100ms for services doing lots of handle resolution (to reduce number of idle connections). Impacts PLC connections as well, but not too bad. 
64 + IdleConnTimeout: time.Millisecond * 100, 65 + MaxIdleConns: 1000, 66 + }, 67 + }, 68 + Resolver: net.Resolver{ 69 + Dial: func(ctx context.Context, network, address string) (net.Conn, error) { 70 + d := net.Dialer{Timeout: time.Second * 3} 71 + return d.DialContext(ctx, network, address) 72 + }, 73 + }, 74 + PLCLimiter: rate.NewLimiter(rate.Limit(config.PLCRateLimit), 1), 75 + TryAuthoritativeDNS: true, 76 + SkipDNSDomainSuffixes: []string{".bsky.social", ".staging.bsky.dev"}, 77 + // TODO: UserAgent: "bluepages", 78 + } 79 + 80 + // TODO: config these timeouts 81 + redisDir, err := NewRedisResolver(&baseDir, config.RedisURL, time.Hour*24, time.Minute*2, time.Minute*5, 50_000) 82 + if err != nil { 83 + return nil, err 84 + } 85 + redisDir.Logger = logger 86 + 87 + // configure redis client (for firehose consumer) 88 + redisOpt, err := redis.ParseURL(config.RedisURL) 89 + if err != nil { 90 + return nil, fmt.Errorf("parsing redis URL: %v", err) 91 + } 92 + redisClient := redis.NewClient(redisOpt) 93 + // check redis connection 94 + _, err = redisClient.Ping(context.Background()).Result() 95 + if err != nil { 96 + return nil, fmt.Errorf("redis ping failed: %v", err) 97 + } 98 + 99 + e := echo.New() 100 + 101 + // httpd 102 + var ( 103 + httpTimeout = 1 * time.Minute 104 + httpMaxHeaderBytes = 1 * (1024 * 1024) 105 + ) 106 + 107 + srv := &Server{ 108 + echo: e, 109 + dir: redisDir, 110 + logger: logger, 111 + redisClient: redisClient, 112 + } 113 + 114 + srv.httpd = &http.Server{ 115 + Handler: srv, 116 + Addr: config.Bind, 117 + WriteTimeout: httpTimeout, 118 + ReadTimeout: httpTimeout, 119 + MaxHeaderBytes: httpMaxHeaderBytes, 120 + } 121 + 122 + e.HideBanner = true 123 + e.Use(slogecho.New(logger)) 124 + e.Use(middleware.Recover()) 125 + e.Use(middleware.BodyLimit("4M")) 126 + e.HTTPErrorHandler = srv.errorHandler 127 + e.Use(middleware.SecureWithConfig(middleware.SecureConfig{ 128 + ContentTypeNosniff: "nosniff", 129 + XFrameOptions: "SAMEORIGIN", 130 + 
HSTSMaxAge: 31536000, // 365 days 131 + // TODO: 132 + // ContentSecurityPolicy 133 + // XSSProtection 134 + })) 135 + 136 + e.GET("/", srv.WebHome) 137 + e.GET("/_health", srv.HandleHealthCheck) 138 + e.GET("/xrpc/com.atproto.identity.resolveHandle", srv.ResolveHandle) 139 + e.GET("/xrpc/com.atproto.identity.resolveDid", srv.ResolveDid) 140 + e.GET("/xrpc/com.atproto.identity.resolveIdentity", srv.ResolveIdentity) 141 + if !config.DisableRefresh { 142 + e.POST("/xrpc/com.atproto.identity.refreshIdentity", srv.RefreshIdentity) 143 + } 144 + 145 + return srv, nil 146 + } 147 + 148 + func (srv *Server) ServeHTTP(rw http.ResponseWriter, req *http.Request) { 149 + srv.echo.ServeHTTP(rw, req) 150 + } 151 + 152 + func (srv *Server) RunAPI() error { 153 + srv.logger.Info("starting server", "bind", srv.httpd.Addr) 154 + go func() { 155 + if err := srv.httpd.ListenAndServe(); err != nil { 156 + if !errors.Is(err, http.ErrServerClosed) { 157 + srv.logger.Error("HTTP server shutting down unexpectedly", "err", err) 158 + } 159 + } 160 + }() 161 + 162 + // Wait for a signal to exit. 163 + srv.logger.Info("registering OS exit signal handler") 164 + quit := make(chan struct{}) 165 + exitSignals := make(chan os.Signal, 1) 166 + signal.Notify(exitSignals, syscall.SIGINT, syscall.SIGTERM) 167 + go func() { 168 + sig := <-exitSignals 169 + srv.logger.Info("received OS exit signal", "signal", sig) 170 + 171 + // Shut down the HTTP server 172 + if err := srv.Shutdown(); err != nil { 173 + srv.logger.Error("HTTP server shutdown error", "err", err) 174 + } 175 + 176 + // Trigger the return that causes an exit. 
177 + close(quit) 178 + }() 179 + <-quit 180 + srv.logger.Info("graceful shutdown complete") 181 + return nil 182 + } 183 + 184 + func (srv *Server) RunMetrics(bind string) error { 185 + p := "/metrics" 186 + srv.logger.Info("starting metrics endpoint", "bind", bind, "path", p) 187 + http.Handle(p, promhttp.Handler()) 188 + return http.ListenAndServe(bind, nil) 189 + } 190 + 191 + func (srv *Server) Shutdown() error { 192 + srv.logger.Info("shutting down") 193 + 194 + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) 195 + defer cancel() 196 + 197 + return srv.httpd.Shutdown(ctx) 198 + } 199 + 200 + type GenericError struct { 201 + Error string `json:"error"` 202 + Message string `json:"message"` 203 + } 204 + 205 + func (srv *Server) errorHandler(err error, c echo.Context) { 206 + code := http.StatusInternalServerError 207 + var errorMessage string 208 + if he, ok := err.(*echo.HTTPError); ok { 209 + code = he.Code 210 + errorMessage = fmt.Sprintf("%s", he.Message) 211 + } 212 + if code >= 500 { 213 + srv.logger.Warn("bluepages-http-internal-error", "err", err) 214 + } 215 + if !c.Response().Committed { 216 + c.JSON(code, GenericError{Error: "InternalError", Message: errorMessage}) 217 + } 218 + }