porting all github actions from bluesky-social/indigo to tangled CI

Merge branch 'main' into hailey/codegen-2

+3 -3
api/agnostic/actorgetPreferences.go
··· 7 7 import ( 8 8 "context" 9 9 10 - "github.com/bluesky-social/indigo/xrpc" 10 + "github.com/bluesky-social/indigo/lex/util" 11 11 ) 12 12 13 13 // ActorGetPreferences_Output is the output of a app.bsky.actor.getPreferences call. ··· 16 16 } 17 17 18 18 // ActorGetPreferences calls the XRPC method "app.bsky.actor.getPreferences". 19 - func ActorGetPreferences(ctx context.Context, c *xrpc.Client) (*ActorGetPreferences_Output, error) { 19 + func ActorGetPreferences(ctx context.Context, c util.LexClient) (*ActorGetPreferences_Output, error) { 20 20 var out ActorGetPreferences_Output 21 21 22 22 params := map[string]interface{}{} 23 - if err := c.Do(ctx, xrpc.Query, "", "app.bsky.actor.getPreferences", params, nil, &out); err != nil { 23 + if err := c.LexDo(ctx, util.Query, "", "app.bsky.actor.getPreferences", params, nil, &out); err != nil { 24 24 return nil, err 25 25 } 26 26
+3 -3
api/agnostic/actorputPreferences.go
··· 7 7 import ( 8 8 "context" 9 9 10 - "github.com/bluesky-social/indigo/xrpc" 10 + "github.com/bluesky-social/indigo/lex/util" 11 11 ) 12 12 13 13 // ActorPutPreferences_Input is the input argument to a app.bsky.actor.putPreferences call. ··· 16 16 } 17 17 18 18 // ActorPutPreferences calls the XRPC method "app.bsky.actor.putPreferences". 19 - func ActorPutPreferences(ctx context.Context, c *xrpc.Client, input *ActorPutPreferences_Input) error { 20 - if err := c.Do(ctx, xrpc.Procedure, "application/json", "app.bsky.actor.putPreferences", nil, input, nil); err != nil { 19 + func ActorPutPreferences(ctx context.Context, c util.LexClient, input *ActorPutPreferences_Input) error { 20 + if err := c.LexDo(ctx, util.Procedure, "application/json", "app.bsky.actor.putPreferences", nil, input, nil); err != nil { 21 21 return err 22 22 } 23 23
+3 -3
api/agnostic/identitygetRecommendedDidCredentials.go
··· 8 8 "context" 9 9 "encoding/json" 10 10 11 - "github.com/bluesky-social/indigo/xrpc" 11 + "github.com/bluesky-social/indigo/lex/util" 12 12 ) 13 13 14 14 // IdentityGetRecommendedDidCredentials calls the XRPC method "com.atproto.identity.getRecommendedDidCredentials". 15 - func IdentityGetRecommendedDidCredentials(ctx context.Context, c *xrpc.Client) (*json.RawMessage, error) { 15 + func IdentityGetRecommendedDidCredentials(ctx context.Context, c util.LexClient) (*json.RawMessage, error) { 16 16 var out json.RawMessage 17 17 18 - if err := c.Do(ctx, xrpc.Query, "", "com.atproto.identity.getRecommendedDidCredentials", nil, nil, &out); err != nil { 18 + if err := c.LexDo(ctx, util.Query, "", "com.atproto.identity.getRecommendedDidCredentials", nil, nil, &out); err != nil { 19 19 return nil, err 20 20 } 21 21
+3 -3
api/agnostic/identitysignPlcOperation.go
··· 8 8 "context" 9 9 "encoding/json" 10 10 11 - "github.com/bluesky-social/indigo/xrpc" 11 + "github.com/bluesky-social/indigo/lex/util" 12 12 ) 13 13 14 14 // IdentitySignPlcOperation_Input is the input argument to a com.atproto.identity.signPlcOperation call. ··· 28 28 } 29 29 30 30 // IdentitySignPlcOperation calls the XRPC method "com.atproto.identity.signPlcOperation". 31 - func IdentitySignPlcOperation(ctx context.Context, c *xrpc.Client, input *IdentitySignPlcOperation_Input) (*IdentitySignPlcOperation_Output, error) { 31 + func IdentitySignPlcOperation(ctx context.Context, c util.LexClient, input *IdentitySignPlcOperation_Input) (*IdentitySignPlcOperation_Output, error) { 32 32 var out IdentitySignPlcOperation_Output 33 - if err := c.Do(ctx, xrpc.Procedure, "application/json", "com.atproto.identity.signPlcOperation", nil, input, &out); err != nil { 33 + if err := c.LexDo(ctx, util.Procedure, "application/json", "com.atproto.identity.signPlcOperation", nil, input, &out); err != nil { 34 34 return nil, err 35 35 } 36 36
+3 -3
api/agnostic/identitysubmitPlcOperation.go
··· 8 8 "context" 9 9 "encoding/json" 10 10 11 - "github.com/bluesky-social/indigo/xrpc" 11 + "github.com/bluesky-social/indigo/lex/util" 12 12 ) 13 13 14 14 // IdentitySubmitPlcOperation_Input is the input argument to a com.atproto.identity.submitPlcOperation call. ··· 17 17 } 18 18 19 19 // IdentitySubmitPlcOperation calls the XRPC method "com.atproto.identity.submitPlcOperation". 20 - func IdentitySubmitPlcOperation(ctx context.Context, c *xrpc.Client, input *IdentitySubmitPlcOperation_Input) error { 21 - if err := c.Do(ctx, xrpc.Procedure, "application/json", "com.atproto.identity.submitPlcOperation", nil, input, nil); err != nil { 20 + func IdentitySubmitPlcOperation(ctx context.Context, c util.LexClient, input *IdentitySubmitPlcOperation_Input) error { 21 + if err := c.LexDo(ctx, util.Procedure, "application/json", "com.atproto.identity.submitPlcOperation", nil, input, nil); err != nil { 22 22 return err 23 23 } 24 24
+2 -3
api/agnostic/repoapplyWrites.go
··· 10 10 "fmt" 11 11 12 12 "github.com/bluesky-social/indigo/lex/util" 13 - "github.com/bluesky-social/indigo/xrpc" 14 13 ) 15 14 16 15 // RepoApplyWrites_Create is a "create" in the com.atproto.repo.applyWrites schema. ··· 179 178 } 180 179 181 180 // RepoApplyWrites calls the XRPC method "com.atproto.repo.applyWrites". 182 - func RepoApplyWrites(ctx context.Context, c *xrpc.Client, input *RepoApplyWrites_Input) (*RepoApplyWrites_Output, error) { 181 + func RepoApplyWrites(ctx context.Context, c util.LexClient, input *RepoApplyWrites_Input) (*RepoApplyWrites_Output, error) { 183 182 var out RepoApplyWrites_Output 184 - if err := c.Do(ctx, xrpc.Procedure, "application/json", "com.atproto.repo.applyWrites", nil, input, &out); err != nil { 183 + if err := c.LexDo(ctx, util.Procedure, "application/json", "com.atproto.repo.applyWrites", nil, input, &out); err != nil { 185 184 return nil, err 186 185 } 187 186
+3 -3
api/agnostic/repocreateRecord.go
··· 7 7 import ( 8 8 "context" 9 9 10 - "github.com/bluesky-social/indigo/xrpc" 10 + "github.com/bluesky-social/indigo/lex/util" 11 11 ) 12 12 13 13 // RepoDefs_CommitMeta is a "commitMeta" in the com.atproto.repo.defs schema. ··· 41 41 } 42 42 43 43 // RepoCreateRecord calls the XRPC method "com.atproto.repo.createRecord". 44 - func RepoCreateRecord(ctx context.Context, c *xrpc.Client, input *RepoCreateRecord_Input) (*RepoCreateRecord_Output, error) { 44 + func RepoCreateRecord(ctx context.Context, c util.LexClient, input *RepoCreateRecord_Input) (*RepoCreateRecord_Output, error) { 45 45 var out RepoCreateRecord_Output 46 - if err := c.Do(ctx, xrpc.Procedure, "application/json", "com.atproto.repo.createRecord", nil, input, &out); err != nil { 46 + if err := c.LexDo(ctx, util.Procedure, "application/json", "com.atproto.repo.createRecord", nil, input, &out); err != nil { 47 47 return nil, err 48 48 } 49 49
+3 -3
api/agnostic/repogetRecord.go
··· 8 8 "context" 9 9 "encoding/json" 10 10 11 - "github.com/bluesky-social/indigo/xrpc" 11 + "github.com/bluesky-social/indigo/lex/util" 12 12 ) 13 13 14 14 // RepoGetRecord_Output is the output of a com.atproto.repo.getRecord call. ··· 25 25 // collection: The NSID of the record collection. 26 26 // repo: The handle or DID of the repo. 27 27 // rkey: The Record Key. 28 - func RepoGetRecord(ctx context.Context, c *xrpc.Client, cid string, collection string, repo string, rkey string) (*RepoGetRecord_Output, error) { 28 + func RepoGetRecord(ctx context.Context, c util.LexClient, cid string, collection string, repo string, rkey string) (*RepoGetRecord_Output, error) { 29 29 var out RepoGetRecord_Output 30 30 31 31 params := map[string]interface{}{ ··· 34 34 "repo": repo, 35 35 "rkey": rkey, 36 36 } 37 - if err := c.Do(ctx, xrpc.Query, "", "com.atproto.repo.getRecord", params, nil, &out); err != nil { 37 + if err := c.LexDo(ctx, util.Query, "", "com.atproto.repo.getRecord", params, nil, &out); err != nil { 38 38 return nil, err 39 39 } 40 40
+3 -3
api/agnostic/repolistRecords.go
··· 8 8 "context" 9 9 "encoding/json" 10 10 11 - "github.com/bluesky-social/indigo/xrpc" 11 + "github.com/bluesky-social/indigo/lex/util" 12 12 ) 13 13 14 14 // RepoListRecords_Output is the output of a com.atproto.repo.listRecords call. ··· 31 31 // limit: The number of records to return. 32 32 // repo: The handle or DID of the repo. 33 33 // reverse: Flag to reverse the order of the returned records. 34 - func RepoListRecords(ctx context.Context, c *xrpc.Client, collection string, cursor string, limit int64, repo string, reverse bool) (*RepoListRecords_Output, error) { 34 + func RepoListRecords(ctx context.Context, c util.LexClient, collection string, cursor string, limit int64, repo string, reverse bool) (*RepoListRecords_Output, error) { 35 35 var out RepoListRecords_Output 36 36 37 37 params := map[string]interface{}{ ··· 41 41 "repo": repo, 42 42 "reverse": reverse, 43 43 } 44 - if err := c.Do(ctx, xrpc.Query, "", "com.atproto.repo.listRecords", params, nil, &out); err != nil { 44 + if err := c.LexDo(ctx, util.Query, "", "com.atproto.repo.listRecords", params, nil, &out); err != nil { 45 45 return nil, err 46 46 } 47 47
+3 -3
api/agnostic/repoputRecord.go
··· 7 7 import ( 8 8 "context" 9 9 10 - "github.com/bluesky-social/indigo/xrpc" 10 + "github.com/bluesky-social/indigo/lex/util" 11 11 ) 12 12 13 13 // RepoPutRecord_Input is the input argument to a com.atproto.repo.putRecord call. ··· 37 37 } 38 38 39 39 // RepoPutRecord calls the XRPC method "com.atproto.repo.putRecord". 40 - func RepoPutRecord(ctx context.Context, c *xrpc.Client, input *RepoPutRecord_Input) (*RepoPutRecord_Output, error) { 40 + func RepoPutRecord(ctx context.Context, c util.LexClient, input *RepoPutRecord_Input) (*RepoPutRecord_Output, error) { 41 41 var out RepoPutRecord_Output 42 - if err := c.Do(ctx, xrpc.Procedure, "application/json", "com.atproto.repo.putRecord", nil, input, &out); err != nil { 42 + if err := c.LexDo(ctx, util.Procedure, "application/json", "com.atproto.repo.putRecord", nil, input, &out); err != nil { 43 43 return nil, err 44 44 } 45 45
+2
api/bsky/feeddefs.go
··· 211 211 type FeedDefs_ReasonRepost struct { 212 212 LexiconTypeID string `json:"$type,const=app.bsky.feed.defs#reasonRepost" cborgen:"$type,const=app.bsky.feed.defs#reasonRepost"` 213 213 By *ActorDefs_ProfileViewBasic `json:"by" cborgen:"by"` 214 + Cid *string `json:"cid,omitempty" cborgen:"cid,omitempty"` 214 215 IndexedAt string `json:"indexedAt" cborgen:"indexedAt"` 216 + Uri *string `json:"uri,omitempty" cborgen:"uri,omitempty"` 215 217 } 216 218 217 219 // FeedDefs_ReplyRef is a "replyRef" in the app.bsky.feed.defs schema.
+1
api/bsky/feedlike.go
··· 17 17 LexiconTypeID string `json:"$type,const=app.bsky.feed.like" cborgen:"$type,const=app.bsky.feed.like"` 18 18 CreatedAt string `json:"createdAt" cborgen:"createdAt"` 19 19 Subject *comatprototypes.RepoStrongRef `json:"subject" cborgen:"subject"` 20 + Via *comatprototypes.RepoStrongRef `json:"via,omitempty" cborgen:"via,omitempty"` 20 21 }
+1
api/bsky/feedrepost.go
··· 17 17 LexiconTypeID string `json:"$type,const=app.bsky.feed.repost" cborgen:"$type,const=app.bsky.feed.repost"` 18 18 CreatedAt string `json:"createdAt" cborgen:"createdAt"` 19 19 Subject *comatprototypes.RepoStrongRef `json:"subject" cborgen:"subject"` 20 + Via *comatprototypes.RepoStrongRef `json:"via,omitempty" cborgen:"via,omitempty"` 20 21 }
+1 -1
api/bsky/notificationlistNotifications.go
··· 19 19 IndexedAt string `json:"indexedAt" cborgen:"indexedAt"` 20 20 IsRead bool `json:"isRead" cborgen:"isRead"` 21 21 Labels []*comatprototypes.LabelDefs_Label `json:"labels,omitempty" cborgen:"labels,omitempty"` 22 - // reason: Expected values are 'like', 'repost', 'follow', 'mention', 'reply', 'quote', 'starterpack-joined', 'verified', and 'unverified'. 22 + // reason: The reason why this notification was delivered - e.g. your post was liked, or you received a new follower. 23 23 Reason string `json:"reason" cborgen:"reason"` 24 24 ReasonSubject *string `json:"reasonSubject,omitempty" cborgen:"reasonSubject,omitempty"` 25 25 Record *util.LexiconTypeDecoder `json:"record" cborgen:"record"`
+124
atproto/crypto/jwk.go
··· 1 + package crypto 2 + 3 + import ( 4 + "crypto/ecdsa" 5 + "crypto/elliptic" 6 + "encoding/base64" 7 + "encoding/json" 8 + "fmt" 9 + "math/big" 10 + 11 + secp256k1 "gitlab.com/yawning/secp256k1-voi" 12 + secp256k1secec "gitlab.com/yawning/secp256k1-voi/secec" 13 + ) 14 + 15 + // Representation of a JSON Web Key (JWK), as relevant to the keys supported by this package. 16 + // 17 + // Expected to be marshalled/unmarshalled as JSON. 18 + type JWK struct { 19 + KeyType string `json:"kty"` 20 + Curve string `json:"crv"` 21 + X string `json:"x"` // base64url, no padding 22 + Y string `json:"y"` // base64url, no padding 23 + Use string `json:"use,omitempty"` 24 + KeyID *string `json:"kid,omitempty"` 25 + } 26 + 27 + // Loads a [PublicKey] from JWK (serialized as JSON bytes) 28 + func ParsePublicJWKBytes(jwkBytes []byte) (PublicKey, error) { 29 + var jwk JWK 30 + if err := json.Unmarshal(jwkBytes, &jwk); err != nil { 31 + return nil, fmt.Errorf("parsing JWK JSON: %w", err) 32 + } 33 + return ParsePublicJWK(jwk) 34 + } 35 + 36 + // Loads a [PublicKey] from JWK struct. 37 + func ParsePublicJWK(jwk JWK) (PublicKey, error) { 38 + 39 + if jwk.KeyType != "EC" { 40 + return nil, fmt.Errorf("unsupported JWK key type: %s", jwk.KeyType) 41 + } 42 + 43 + // base64url with no encoding 44 + xbuf, err := base64.RawURLEncoding.DecodeString(jwk.X) 45 + if err != nil { 46 + return nil, fmt.Errorf("invalid JWK base64 encoding: %w", err) 47 + } 48 + ybuf, err := base64.RawURLEncoding.DecodeString(jwk.Y) 49 + if err != nil { 50 + return nil, fmt.Errorf("invalid JWK base64 encoding: %w", err) 51 + } 52 + 53 + switch jwk.Curve { 54 + case "P-256": 55 + curve := elliptic.P256() 56 + 57 + var x, y big.Int 58 + x.SetBytes(xbuf) 59 + y.SetBytes(ybuf) 60 + 61 + if !curve.Params().IsOnCurve(&x, &y) { 62 + return nil, fmt.Errorf("invalid P-256 public key (not on curve)") 63 + } 64 + pubECDSA := &ecdsa.PublicKey{ 65 + Curve: curve, 66 + X: &x, 67 + Y: &y, 68 + } 69 + pub := PublicKeyP256{pubP256: *pubECDSA} 70 + err := pub.checkCurve() 71 + if err != nil { 72 + return nil, err 73 + } 74 + return &pub, nil 75 + case "secp256k1": // K-256 76 + if len(xbuf) != 32 || len(ybuf) != 32 { 77 + return nil, fmt.Errorf("invalid K-256 coordinates") 78 + } 79 + xarr := ([32]byte)(xbuf[:32]) 80 + yarr := ([32]byte)(ybuf[:32]) 81 + p, err := secp256k1.NewPointFromCoords(&xarr, &yarr) 82 + if err != nil { 83 + return nil, fmt.Errorf("invalid K-256 coordinates: %w", err) 84 + } 85 + pubK, err := secp256k1secec.NewPublicKeyFromPoint(p) 86 + if err != nil { 87 + return nil, fmt.Errorf("invalid K-256/secp256k1 public key: %w", err) 88 + } 89 + pub := PublicKeyK256{pubK256: pubK} 90 + err = pub.ensureBytes() 91 + if err != nil { 92 + return nil, err 93 + } 94 + return &pub, nil 95 + default: 96 + return nil, fmt.Errorf("unsupported JWK cryptography: %s", jwk.Curve) 97 + } 98 + } 99 + 100 + func (k *PublicKeyP256) JWK() (*JWK, error) { 101 + jwk := JWK{ 102 + KeyType: "EC", 103 + Curve: "P-256", 104 + X: base64.RawURLEncoding.EncodeToString(k.pubP256.X.Bytes()), 105 + Y: base64.RawURLEncoding.EncodeToString(k.pubP256.Y.Bytes()), 106 + } 107 + return &jwk, nil 108 + } 109 + 110 + func (k *PublicKeyK256) JWK() (*JWK, error) { 111 + raw := k.UncompressedBytes() 112 + if len(raw) != 65 { 113 + return nil, fmt.Errorf("unexpected K-256 bytes size") 114 + } 115 + xbytes := raw[1:33] 116 + ybytes := raw[33:65] 117 + jwk := JWK{ 118 + KeyType: "EC", 119 + Curve: "secp256k1", 120 + X: base64.RawURLEncoding.EncodeToString(xbytes), 121 + Y: base64.RawURLEncoding.EncodeToString(ybytes), 122 + } 123 + return &jwk, nil 124 + }
+98
atproto/crypto/jwk_test.go
··· 1 + package crypto 2 + 3 + import ( 4 + "testing" 5 + 6 + "github.com/stretchr/testify/assert" 7 + ) 8 + 9 + func TestParseJWK(t *testing.T) { 10 + assert := assert.New(t) 11 + 12 + jwkTestFixtures := []string{ 13 + // https://datatracker.ietf.org/doc/html/rfc7517#appendix-A.1 14 + `{ 15 + "kty":"EC", 16 + "crv":"P-256", 17 + "x":"MKBCTNIcKUSDii11ySs3526iDZ8AiTo7Tu6KPAqv7D4", 18 + "y":"4Etl6SRW2YiLUrN5vfvVHuhp7x8PxltmWWlbbM4IFyM", 19 + "d":"870MB6gfuTJ4HtUnUvYMyJpr5eUZNP4Bk43bVdj3eAE", 20 + "use":"enc", 21 + "kid":"1" 22 + }`, 23 + // https://openid.net/specs/draft-jones-json-web-key-03.html; with kty in addition to alg 24 + `{ 25 + "alg":"EC", 26 + "kty":"EC", 27 + "crv":"P-256", 28 + "x":"MKBCTNIcKUSDii11ySs3526iDZ8AiTo7Tu6KPAqv7D4", 29 + "y":"4Etl6SRW2YiLUrN5vfvVHuhp7x8PxltmWWlbbM4IFyM", 30 + "use":"enc", 31 + "kid":"1" 32 + }`, 33 + // https://w3c-ccg.github.io/lds-ecdsa-secp256k1-2019/; with kty in addition to alg 34 + `{ 35 + "alg": "EC", 36 + "kty": "EC", 37 + "crv": "secp256k1", 38 + "kid": "JUvpllMEYUZ2joO59UNui_XYDqxVqiFLLAJ8klWuPBw", 39 + "x": "dWCvM4fTdeM0KmloF57zxtBPXTOythHPMm1HCLrdd3A", 40 + "y": "36uMVGM7hnw-N6GnjFcihWE3SkrhMLzzLCdPMXPEXlA" 41 + }`, 42 + } 43 + 44 + for _, jwkBytes := range jwkTestFixtures { 45 + _, err := ParsePublicJWKBytes([]byte(jwkBytes)) 46 + assert.NoError(err) 47 + } 48 + } 49 + 50 + func TestP256GenJWK(t *testing.T) { 51 + assert := assert.New(t) 52 + 53 + priv, err := GeneratePrivateKeyP256() 54 + if err != nil { 55 + t.Fatal(err) 56 + } 57 + pub, err := priv.PublicKey() 58 + if err != nil { 59 + t.Fatal(err) 60 + } 61 + 62 + pk, ok := pub.(*PublicKeyP256) 63 + if !ok { 64 + t.Fatal() 65 + } 66 + jwk, err := pk.JWK() 67 + if err != nil { 68 + t.Fatal(err) 69 + } 70 + 71 + _, err = ParsePublicJWK(*jwk) 72 + assert.NoError(err) 73 + } 74 + 75 + func TestK256GenJWK(t *testing.T) { 76 + assert := assert.New(t) 77 + 78 + priv, err := GeneratePrivateKeyK256() 79 + if err != nil { 80 + t.Fatal(err) 81 + } 82 + pub, err := priv.PublicKey() 83 + if err != nil { 84 + t.Fatal(err) 85 + } 86 + 87 + pk, ok := pub.(*PublicKeyK256) 88 + if !ok { 89 + t.Fatal() 90 + } 91 + jwk, err := pk.JWK() 92 + if err != nil { 93 + t.Fatal(err) 94 + } 95 + 96 + _, err = ParsePublicJWK(*jwk) 97 + assert.NoError(err) 98 + }
+3
atproto/crypto/keys.go
··· 64 64 // For systems with no compressed/uncompressed distinction, returns the same 65 65 // value as Bytes(). 66 66 UncompressedBytes() []byte 67 + 68 + // Serialization as JWK struct (which can be marshalled to JSON) 69 + JWK() (*JWK, error) 67 70 } 68 71 69 72 var ErrInvalidSignature = errors.New("crytographic signature invalid")
+1 -1
atproto/data/parse.go
··· 11 11 12 12 func parseFloat(f float64) (int64, error) { 13 13 if f != float64(int64(f)) { 14 - return 0, fmt.Errorf("number was is not a safe integer: %f", f) 14 + return 0, fmt.Errorf("number is not a safe integer: %f", f) 15 15 } 16 16 return int64(f), nil 17 17 }
+65 -48
cmd/collectiondir/serve.go
··· 5 5 "context" 6 6 "encoding/csv" 7 7 "encoding/json" 8 + "errors" 8 9 "fmt" 9 10 "log/slog" 10 11 "net" 11 12 "net/http" 13 + _ "net/http/pprof" 12 14 "net/url" 13 15 "os" 14 16 "os/signal" ··· 193 195 if err != nil { 194 196 return fmt.Errorf("lru init, %w", err) 195 197 } 196 - cs.wg.Add(1) 197 - go cs.ingestReceiver() 198 198 cs.log = log 199 199 cs.ctx = cctx.Context 200 200 cs.AdminToken = cctx.String("admin-token") 201 201 cs.ExepctedAuthHeader = "Bearer " + cs.AdminToken 202 + cs.wg.Add(1) 203 + go cs.ingestReceiver() 202 204 pebblePath := cctx.String("pebble") 203 205 cs.pcd = &PebbleCollectionDirectory{ 204 206 log: cs.log, ··· 215 217 } 216 218 } 217 219 cs.statsCacheFresh.L = &cs.statsCacheLock 218 - errchan := make(chan error, 3) 219 - apiAddr := cctx.String("api-listen") 220 + 221 + apiServerEcho, err := cs.createApiServer(cctx.Context, cctx.String("api-listen")) 222 + if err != nil { 223 + return err 224 + } 220 225 cs.wg.Add(1) 221 - go func() { 222 - errchan <- cs.StartApiServer(cctx.Context, apiAddr) 223 - }() 224 - metricsAddr := cctx.String("metrics-listen") 226 + go func() { cs.StartApiServer(cctx.Context, apiServerEcho) }() 227 + 228 + cs.createMetricsServer(cctx.String("metrics-listen")) 225 229 cs.wg.Add(1) 226 - go func() { 227 - errchan <- cs.StartMetricsServer(cctx.Context, metricsAddr) 228 - }() 230 + go func() { cs.StartMetricsServer(cctx.Context) }() 229 231 230 232 upstream := cctx.String("upstream") 231 233 if upstream != "" { ··· 247 249 go cs.handleFirehose(fhevents) 248 250 } 249 251 250 - select { 251 - case <-signals: 252 - log.Info("received shutdown signal") 253 - go errchanlog(cs.log, "server error", errchan) 254 - return cs.Shutdown() 255 - case err := <-errchan: 256 - if err != nil { 257 - log.Error("server error", "err", err) 258 - go errchanlog(cs.log, "server error", errchan) 259 - return cs.Shutdown() 260 - } 261 - } 262 - return nil 252 + <-signals 253 + log.Info("received shutdown signal") 254 + return cs.Shutdown() 263 255 } 264 256 265 257 func (cs *collectionServer) openDau() error { ··· 282 274 return nil 283 275 } 284 276 285 - func errchanlog(log *slog.Logger, msg string, errchan <-chan error) { 286 - for err := range errchan { 287 - log.Error(msg, "err", err) 288 - } 289 - } 290 - 291 277 func (cs *collectionServer) Shutdown() error { 292 278 close(cs.shutdown) 293 - go func() { 279 + 280 + func() { 281 + ctx, cancel := context.WithTimeout(context.Background(), time.Second) 282 + defer cancel() 283 + 294 284 cs.log.Info("metrics shutdown start") 295 - sherr := cs.metricsServer.Shutdown(context.Background()) 285 + sherr := cs.metricsServer.Shutdown(ctx) 296 286 cs.log.Info("metrics shutdown", "err", sherr) 297 287 }() 298 - cs.log.Info("api shutdown start...") 299 - err := cs.apiServer.Shutdown(context.Background()) 300 - cs.log.Info("api shutdown, thread wait...", "err", err) 301 - cs.wg.Wait() 288 + 289 + func() { 290 + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) 291 + defer cancel() 292 + 293 + cs.log.Info("api shutdown start...") 294 + err := cs.apiServer.Shutdown(ctx) 295 + cs.log.Info("api shutdown, thread wait...", "err", err) 296 + }() 297 + 302 298 cs.log.Info("threads done, db close...") 303 - ee := cs.pcd.Close() 304 - if ee != nil { 305 - cs.log.Error("failed to shutdown pebble", "err", ee) 299 + err := cs.pcd.Close() 300 + if err != nil { 301 + cs.log.Error("failed to shutdown pebble", "err", err) 306 302 } 307 303 cs.log.Info("db done. done.") 304 + cs.wg.Wait() 308 305 return err 309 306 } 310 307 ··· 384 381 } 385 382 } 386 383 387 - func (cs *collectionServer) StartMetricsServer(ctx context.Context, addr string) error { 388 - defer cs.wg.Done() 389 - defer cs.log.Info("metrics server exit") 384 + func (cs *collectionServer) createMetricsServer(addr string) { 385 + e := echo.New() 386 + e.GET("/metrics", echo.WrapHandler(promhttp.Handler())) 387 + e.Any("/debug/pprof/*", echo.WrapHandler(http.DefaultServeMux)) 388 + 390 389 cs.metricsServer = &http.Server{ 391 390 Addr: addr, 392 - Handler: promhttp.Handler(), 391 + Handler: e, 393 392 } 394 - return cs.metricsServer.ListenAndServe() 395 393 } 396 394 397 - func (cs *collectionServer) StartApiServer(ctx context.Context, addr string) error { 395 + func (cs *collectionServer) StartMetricsServer(ctx context.Context) { 398 396 defer cs.wg.Done() 399 - defer cs.log.Info("api server exit") 397 + defer cs.log.Info("metrics server exit") 398 + 399 + err := cs.metricsServer.ListenAndServe() 400 + if err != nil && !errors.Is(err, http.ErrServerClosed) { 401 + slog.Error("error in metrics server", "err", err) 402 + os.Exit(1) 403 + } 404 + } 405 + 406 + func (cs *collectionServer) createApiServer(ctx context.Context, addr string) (*echo.Echo, error) { 400 407 var lc net.ListenConfig 401 408 li, err := lc.Listen(ctx, "tcp", addr) 402 409 if err != nil { 403 - return err 410 + return nil, err 404 411 } 405 412 e := echo.New() 406 413 e.HideBanner = true ··· 430 437 Handler: e, 431 438 } 432 439 cs.apiServer = srv 433 - return srv.Serve(li) 440 + return e, nil 441 + } 442 + 443 + func (cs *collectionServer) StartApiServer(ctx context.Context, e *echo.Echo) { 444 + defer cs.wg.Done() 445 + defer cs.log.Info("api server exit") 446 + err := cs.apiServer.Serve(e.Listener) 447 + if err != nil && !errors.Is(err, http.ErrServerClosed) { 448 + slog.Error("error in api server", "err", err) 449 + os.Exit(1) 450 + } 434 451 } 435 452 436 453 const statsCacheDuration = time.Second * 300
+54
events/dbpersist/dbpersist.go
··· 171 171 switch { 172 172 case e.RepoCommit != nil: 173 173 e.RepoCommit.Seq = int64(item.Seq) 174 + case e.RepoSync != nil: 175 + e.RepoSync.Seq = int64(item.Seq) 174 176 case e.RepoIdentity != nil: 175 177 e.RepoIdentity.Seq = int64(item.Seq) 176 178 case e.RepoAccount != nil: ··· 211 213 switch { 212 214 case e.RepoCommit != nil: 213 215 rer, err = p.RecordFromRepoCommit(ctx, e.RepoCommit) 216 + if err != nil { 217 + return err 218 + } 219 + case e.RepoSync != nil: 220 + rer, err = p.RecordFromRepoSync(ctx, e.RepoSync) 214 221 if err != nil { 215 222 return err 216 223 } ··· 320 327 return &rer, nil 321 328 } 322 329 330 + func (p *DbPersistence) RecordFromRepoSync(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Sync) (*RepoEventRecord, error) { 331 + 332 + uid, err := p.uidForDid(ctx, evt.Did) 333 + if err != nil { 334 + return nil, err 335 + } 336 + 337 + t, err := time.Parse(util.ISO8601, evt.Time) 338 + if err != nil { 339 + return nil, err 340 + } 341 + 342 + rer := RepoEventRecord{ 343 + Repo: uid, 344 + Type: "repo_sync", 345 + Time: t, 346 + Rev: evt.Rev, 347 + } 348 + 349 + return &rer, nil 350 + } 351 + 323 352 func (p *DbPersistence) Playback(ctx context.Context, since int64, cb func(*events.XRPCStreamEvent) error) error { 324 353 pageSize := 1000 325 354 ··· 398 427 switch { 399 428 case record.Commit != nil: 400 429 streamEvent, err = p.hydrateCommit(ctx, record) 430 + case record.Type == "repo_sync": 431 + streamEvent, err = p.hydrateSyncEvent(ctx, record) 401 432 case record.Type == "repo_identity": 402 433 streamEvent, err = p.hydrateIdentityEvent(ctx, record) 403 434 case record.Type == "repo_account": ··· 549 580 } 550 581 551 582 return &events.XRPCStreamEvent{RepoCommit: out}, nil 583 + } 584 + 585 + func (p *DbPersistence) hydrateSyncEvent(ctx context.Context, rer *RepoEventRecord) (*events.XRPCStreamEvent, error) { 586 + 587 + did, err := p.didForUid(ctx, rer.Repo) 588 + if err != nil { 589 + return nil, err 590 + } 591 + 592 + evt := &comatproto.SyncSubscribeRepos_Sync{ 593 + Seq: int64(rer.Seq), 594 + Did: did, 595 + Time: rer.Time.Format(util.ISO8601), 596 + Rev: rer.Rev, 597 + } 598 + 599 + cs, err := p.readCarSlice(ctx, rer) 600 + if err != nil { 601 + return nil, fmt.Errorf("read car slice: %w", err) 602 + } 603 + evt.Blocks = cs 604 + 605 + return &events.XRPCStreamEvent{RepoSync: evt}, nil 552 606 } 553 607 554 608 func (p *DbPersistence) readCarSlice(ctx context.Context, rer *RepoEventRecord) ([]byte, error) {
+18
events/diskpersist/diskpersist.go
··· 282 282 evtKindTombstone = 3 283 283 evtKindIdentity = 4 284 284 evtKindAccount = 5 285 + evtKindSync = 6 285 286 ) 286 287 287 288 var emptyHeader = make([]byte, headerSize) ··· 455 456 switch { 456 457 case e.RepoCommit != nil: 457 458 e.RepoCommit.Seq = seq 459 + case e.RepoSync != nil: 460 + e.RepoSync.Seq = seq 458 461 case e.RepoIdentity != nil: 459 462 e.RepoIdentity.Seq = seq 460 463 case e.RepoAccount != nil: ··· 503 506 evtKind = evtKindCommit 504 507 did = e.RepoCommit.Repo 505 508 if err := e.RepoCommit.MarshalCBOR(cw); err != nil { 509 + return fmt.Errorf("failed to marshal: %w", err) 510 + } 511 + case e.RepoSync != nil: 512 + evtKind = evtKindSync 513 + did = e.RepoSync.Did 514 + if err := e.RepoSync.MarshalCBOR(cw); err != nil { 506 515 return fmt.Errorf("failed to marshal: %w", err) 507 516 } 508 517 case e.RepoIdentity != nil: ··· 727 736 } 728 737 evt.Seq = h.Seq 729 738 if err := cb(&events.XRPCStreamEvent{RepoCommit: &evt}); err != nil { 739 + return nil, err 740 + } 741 + case evtKindSync: 742 + var evt atproto.SyncSubscribeRepos_Sync 743 + if err := evt.UnmarshalCBOR(io.LimitReader(bufr, h.Len64())); err != nil { 744 + return nil, err 745 + } 746 + evt.Seq = h.Seq 747 + if err := cb(&events.XRPCStreamEvent{RepoSync: &evt}); err != nil { 730 748 return nil, err 731 749 } 732 750 case evtKindIdentity:
+2
events/yolopersist/yolopersist.go
··· 28 28 switch { 29 29 case e.RepoCommit != nil: 30 30 e.RepoCommit.Seq = yp.seq 31 + case e.RepoSync != nil: 32 + e.RepoSync.Seq = yp.seq 31 33 case e.RepoIdentity != nil: 32 34 e.RepoIdentity.Seq = yp.seq 33 35 case e.RepoAccount != nil:
-1
lex/gen.go
··· 135 135 pf("\t\"fmt\"\n") 136 136 pf("\t\"encoding/json\"\n") 137 137 pf("\tcbg \"github.com/whyrusleeping/cbor-gen\"\n") 138 - pf("\t\"github.com/bluesky-social/indigo/xrpc\"\n") 139 138 pf("\t\"github.com/bluesky-social/indigo/lex/util\"\n") 140 139 for _, xpkg := range packages { 141 140 if xpkg.Prefix != pkg.Prefix {
+4 -4
lex/type_schema.go
··· 54 54 pf := printerf(w) 55 55 fname := typename 56 56 57 - params := "ctx context.Context, c *xrpc.Client" 57 + params := "ctx context.Context, c util.LexClient" 58 58 inpvar := "nil" 59 59 inpenc := "" 60 60 ··· 161 161 var reqtype string 162 162 switch s.Type { 163 163 case "procedure": 164 - reqtype = "xrpc.Procedure" 164 + reqtype = "util.Procedure" 165 165 case "query": 166 - reqtype = "xrpc.Query" 166 + reqtype = "util.Query" 167 167 default: 168 168 return fmt.Errorf("can only generate RPC for Query or Procedure (got %s)", s.Type) 169 169 } 170 170 171 - pf("\tif err := c.Do(ctx, %s, %q, \"%s\", %s, %s, %s); err != nil {\n", reqtype, inpenc, s.id, queryparams, inpvar, outvar) 171 + pf("\tif err := c.LexDo(ctx, %s, %q, \"%s\", %s, %s, %s); err != nil {\n", reqtype, inpenc, s.id, queryparams, inpvar, outvar) 172 172 pf("\t\treturn %s\n", errRet) 173 173 pf("\t}\n\n") 174 174 pf("\treturn %s\n", outRet)
+18
lex/util/client.go
··· 1 + package util 2 + 3 + import ( 4 + "context" 5 + "net/http" 6 + ) 7 + 8 + const ( 9 + Query = http.MethodGet 10 + Procedure = http.MethodPost 11 + ) 12 + 13 + // API client interface used in lexgen. 14 + // 15 + // 'method' is the HTTP method type. 'inputEncoding' is the Content-Type for bodyData in Procedure calls. 'params' are query parameters. 'bodyData' should be either 'nil', an [io.Reader], or a type which can be marshalled to JSON. 'out' is optional; if not nil it should be a pointer to a type which can be un-Marshaled as JSON, for the response body. 16 + type LexClient interface { 17 + LexDo(ctx context.Context, method string, inputEncoding string, endpoint string, params map[string]any, bodyData any, out any) error 18 + }
+10 -8
xrpc/xrpc.go
··· 34 34 return c.Client 35 35 } 36 36 37 - type XRPCRequestType int 37 + var ( 38 + Query = http.MethodGet 39 + Procedure = http.MethodPost 40 + ) 38 41 39 42 type AuthInfo struct { 40 43 AccessJwt string `json:"accessJwt"` ··· 110 113 Reset time.Time 111 114 } 112 115 113 - const ( 114 - Query = XRPCRequestType(iota) 115 - Procedure 116 - ) 117 - 118 116 // makeParams converts a map of string keys and any values into a URL-encoded string. 119 117 // If a value is a slice of strings, it will be joined with commas. 120 118 // Generally the values will be strings, numbers, booleans, or slices of strings ··· 133 131 return params.Encode() 134 132 } 135 133 136 - func (c *Client) Do(ctx context.Context, kind XRPCRequestType, inpenc string, method string, params map[string]interface{}, bodyobj interface{}, out interface{}) error { 134 + func (c *Client) Do(ctx context.Context, kind string, inpenc string, method string, params map[string]interface{}, bodyobj interface{}, out interface{}) error { 137 135 var body io.Reader 138 136 if bodyobj != nil { 139 137 if rr, ok := bodyobj.(io.Reader); ok { ··· 155 153 case Procedure: 156 154 m = "POST" 157 155 default: 158 - return fmt.Errorf("unsupported request kind: %d", kind) 156 + return fmt.Errorf("unsupported request kind: %s", kind) 159 157 } 160 158 161 159 var paramStr string ··· 227 225 228 226 return nil 229 227 } 228 + 229 + func (c *Client) LexDo(ctx context.Context, method string, inputEncoding string, endpoint string, params map[string]any, bodyData any, out any) error { 230 + return c.Do(ctx, method, inputEncoding, endpoint, params, bodyData, out) 231 + }