+7
cmd/cocoon/main.go
+7
cmd/cocoon/main.go
···
9
"os"
10
"time"
11
12
"github.com/bluesky-social/indigo/atproto/atcrypto"
13
"github.com/bluesky-social/indigo/atproto/syntax"
14
"github.com/haileyok/cocoon/internal/helpers"
···
154
Name: "fallback-proxy",
155
EnvVars: []string{"COCOON_FALLBACK_PROXY"},
156
},
157
},
158
Commands: []*cli.Command{
159
runServe,
···
177
Flags: []cli.Flag{},
178
Action: func(cmd *cli.Context) error {
179
180
s, err := server.New(&server.Args{
181
Addr: cmd.String("addr"),
182
DbName: cmd.String("db-name"),
183
DbType: cmd.String("db-type"),
···
9
"os"
10
"time"
11
12
+
"github.com/bluesky-social/go-util/pkg/telemetry"
13
"github.com/bluesky-social/indigo/atproto/atcrypto"
14
"github.com/bluesky-social/indigo/atproto/syntax"
15
"github.com/haileyok/cocoon/internal/helpers"
···
155
Name: "fallback-proxy",
156
EnvVars: []string{"COCOON_FALLBACK_PROXY"},
157
},
158
+
telemetry.CLIFlagDebug,
159
+
telemetry.CLIFlagMetricsListenAddress,
160
},
161
Commands: []*cli.Command{
162
runServe,
···
180
Flags: []cli.Flag{},
181
Action: func(cmd *cli.Context) error {
182
183
+
logger := telemetry.StartLogger(cmd)
184
+
telemetry.StartMetrics(cmd)
185
+
186
s, err := server.New(&server.Args{
187
+
Logger: logger,
188
Addr: cmd.String("addr"),
189
DbName: cmd.String("db-name"),
190
DbType: cmd.String("db-type"),
+30
metrics/metrics.go
+30
metrics/metrics.go
···
···
1
+
package metrics
2
+
3
+
import (
4
+
"github.com/prometheus/client_golang/prometheus"
5
+
"github.com/prometheus/client_golang/prometheus/promauto"
6
+
)
7
+
8
+
const (
9
+
NAMESPACE = "cocoon"
10
+
)
11
+
12
+
var (
13
+
RelaysConnected = promauto.NewGaugeVec(prometheus.GaugeOpts{
14
+
Namespace: NAMESPACE,
15
+
Name: "relays_connected",
16
+
Help: "number of connected relays, by host",
17
+
}, []string{"host"})
18
+
19
+
RelaySends = promauto.NewCounterVec(prometheus.CounterOpts{
20
+
Namespace: NAMESPACE,
21
+
Name: "relay_sends",
22
+
Help: "number of events sent to a relay, by host",
23
+
}, []string{"host"})
24
+
25
+
RepoOperations = promauto.NewCounterVec(prometheus.CounterOpts{
26
+
Namespace: NAMESPACE,
27
+
Name: "repo_operations",
28
+
Help: "number of operations made against repos",
29
+
}, []string{"kind"})
30
+
)
+1
-1
oauth/dpop/jti_cache.go
+1
-1
oauth/dpop/jti_cache.go
+2
-1
server/handle_account.go
+2
-1
server/handle_account.go
···
12
13
func (s *Server) handleAccount(e echo.Context) error {
14
ctx := e.Request().Context()
15
16
repo, sess, err := s.getSessionRepoOrErr(e)
17
if err != nil {
···
22
23
var tokens []provider.OauthToken
24
if err := s.db.Raw(ctx, "SELECT * FROM oauth_tokens WHERE sub = ? AND created_at < ? ORDER BY created_at ASC", nil, repo.Repo.Did, oldestPossibleSession).Scan(&tokens).Error; err != nil {
25
-
s.logger.Error("couldnt fetch oauth sessions for account", "did", repo.Repo.Did, "error", err)
26
sess.AddFlash("Unable to fetch sessions. See server logs for more details.", "error")
27
sess.Save(e.Request(), e.Response())
28
return e.Render(200, "account.html", map[string]any{
···
12
13
func (s *Server) handleAccount(e echo.Context) error {
14
ctx := e.Request().Context()
15
+
logger := s.logger.With("name", "handleAuth")
16
17
repo, sess, err := s.getSessionRepoOrErr(e)
18
if err != nil {
···
23
24
var tokens []provider.OauthToken
25
if err := s.db.Raw(ctx, "SELECT * FROM oauth_tokens WHERE sub = ? AND created_at < ? ORDER BY created_at ASC", nil, repo.Repo.Did, oldestPossibleSession).Scan(&tokens).Error; err != nil {
26
+
logger.Error("couldnt fetch oauth sessions for account", "did", repo.Repo.Did, "error", err)
27
sess.AddFlash("Unable to fetch sessions. See server logs for more details.", "error")
28
sess.Save(e.Request(), e.Response())
29
return e.Render(200, "account.html", map[string]any{
+3
-2
server/handle_account_revoke.go
+3
-2
server/handle_account_revoke.go
···
11
12
func (s *Server) handleAccountRevoke(e echo.Context) error {
13
ctx := e.Request().Context()
14
15
var req AccountRevokeInput
16
if err := e.Bind(&req); err != nil {
17
-
s.logger.Error("could not bind account revoke request", "error", err)
18
return helpers.ServerError(e, nil)
19
}
20
···
24
}
25
26
if err := s.db.Exec(ctx, "DELETE FROM oauth_tokens WHERE sub = ? AND token = ?", nil, repo.Repo.Did, req.Token).Error; err != nil {
27
-
s.logger.Error("couldnt delete oauth session for account", "did", repo.Repo.Did, "token", req.Token, "error", err)
28
sess.AddFlash("Unable to revoke session. See server logs for more details.", "error")
29
sess.Save(e.Request(), e.Response())
30
return e.Redirect(303, "/account")
···
11
12
func (s *Server) handleAccountRevoke(e echo.Context) error {
13
ctx := e.Request().Context()
14
+
logger := s.logger.With("name", "handleAcocuntRevoke")
15
16
var req AccountRevokeInput
17
if err := e.Bind(&req); err != nil {
18
+
logger.Error("could not bind account revoke request", "error", err)
19
return helpers.ServerError(e, nil)
20
}
21
···
25
}
26
27
if err := s.db.Exec(ctx, "DELETE FROM oauth_tokens WHERE sub = ? AND token = ?", nil, repo.Repo.Did, req.Token).Error; err != nil {
28
+
logger.Error("couldnt delete oauth session for account", "did", repo.Repo.Did, "token", req.Token, "error", err)
29
sess.AddFlash("Unable to revoke session. See server logs for more details.", "error")
30
sess.Save(e.Request(), e.Response())
31
return e.Redirect(303, "/account")
+2
-1
server/handle_account_signin.go
+2
-1
server/handle_account_signin.go
···
63
64
func (s *Server) handleAccountSigninPost(e echo.Context) error {
65
ctx := e.Request().Context()
66
+
logger := s.logger.With("name", "handleAccountSigninPost")
67
68
var req OauthSigninInput
69
if err := e.Bind(&req); err != nil {
70
+
logger.Error("error binding sign in req", "error", err)
71
return helpers.ServerError(e, nil)
72
}
73
+4
-2
server/handle_identity_get_recommended_did_credentials.go
+4
-2
server/handle_identity_get_recommended_did_credentials.go
···
8
)
9
10
func (s *Server) handleGetRecommendedDidCredentials(e echo.Context) error {
11
repo := e.Get("repo").(*models.RepoActor)
12
k, err := atcrypto.ParsePrivateBytesK256(repo.SigningKey)
13
if err != nil {
14
-
s.logger.Error("error parsing key", "error", err)
15
return helpers.ServerError(e, nil)
16
}
17
creds, err := s.plcClient.CreateDidCredentials(k, "", repo.Actor.Handle)
18
if err != nil {
19
-
s.logger.Error("error crating did credentials", "error", err)
20
return helpers.ServerError(e, nil)
21
}
22
···
8
)
9
10
func (s *Server) handleGetRecommendedDidCredentials(e echo.Context) error {
11
+
logger := s.logger.With("name", "handleIdentityGetRecommendedDidCredentials")
12
+
13
repo := e.Get("repo").(*models.RepoActor)
14
k, err := atcrypto.ParsePrivateBytesK256(repo.SigningKey)
15
if err != nil {
16
+
logger.Error("error parsing key", "error", err)
17
return helpers.ServerError(e, nil)
18
}
19
creds, err := s.plcClient.CreateDidCredentials(k, "", repo.Actor.Handle)
20
if err != nil {
21
+
logger.Error("error crating did credentials", "error", err)
22
return helpers.ServerError(e, nil)
23
}
24
+3
-2
server/handle_identity_request_plc_operation.go
+3
-2
server/handle_identity_request_plc_operation.go
···
11
12
func (s *Server) handleIdentityRequestPlcOperationSignature(e echo.Context) error {
13
ctx := e.Request().Context()
14
15
urepo := e.Get("repo").(*models.RepoActor)
16
···
18
eat := time.Now().Add(10 * time.Minute).UTC()
19
20
if err := s.db.Exec(ctx, "UPDATE repos SET plc_operation_code = ?, plc_operation_code_expires_at = ? WHERE did = ?", nil, code, eat, urepo.Repo.Did).Error; err != nil {
21
-
s.logger.Error("error updating user", "error", err)
22
return helpers.ServerError(e, nil)
23
}
24
25
if err := s.sendPlcTokenReset(urepo.Email, urepo.Handle, code); err != nil {
26
-
s.logger.Error("error sending mail", "error", err)
27
return helpers.ServerError(e, nil)
28
}
29
···
11
12
func (s *Server) handleIdentityRequestPlcOperationSignature(e echo.Context) error {
13
ctx := e.Request().Context()
14
+
logger := s.logger.With("name", "handleIdentityRequestPlcOperationSignature")
15
16
urepo := e.Get("repo").(*models.RepoActor)
17
···
19
eat := time.Now().Add(10 * time.Minute).UTC()
20
21
if err := s.db.Exec(ctx, "UPDATE repos SET plc_operation_code = ?, plc_operation_code_expires_at = ? WHERE did = ?", nil, code, eat, urepo.Repo.Did).Error; err != nil {
22
+
logger.Error("error updating user", "error", err)
23
return helpers.ServerError(e, nil)
24
}
25
26
if err := s.sendPlcTokenReset(urepo.Email, urepo.Handle, code); err != nil {
27
+
logger.Error("error sending mail", "error", err)
28
return helpers.ServerError(e, nil)
29
}
30
+7
-5
server/handle_identity_sign_plc_operation.go
+7
-5
server/handle_identity_sign_plc_operation.go
···
27
}
28
29
func (s *Server) handleSignPlcOperation(e echo.Context) error {
30
repo := e.Get("repo").(*models.RepoActor)
31
32
var req ComAtprotoSignPlcOperationRequest
33
if err := e.Bind(&req); err != nil {
34
-
s.logger.Error("error binding", "error", err)
35
return helpers.ServerError(e, nil)
36
}
37
···
54
ctx := context.WithValue(e.Request().Context(), "skip-cache", true)
55
log, err := identity.FetchDidAuditLog(ctx, nil, repo.Repo.Did)
56
if err != nil {
57
-
s.logger.Error("error fetching doc", "error", err)
58
return helpers.ServerError(e, nil)
59
}
60
···
83
84
k, err := atcrypto.ParsePrivateBytesK256(repo.SigningKey)
85
if err != nil {
86
-
s.logger.Error("error parsing signing key", "error", err)
87
return helpers.ServerError(e, nil)
88
}
89
90
if err := s.plcClient.SignOp(k, &op); err != nil {
91
-
s.logger.Error("error signing plc operation", "error", err)
92
return helpers.ServerError(e, nil)
93
}
94
95
if err := s.db.Exec(ctx, "UPDATE repos SET plc_operation_code = NULL, plc_operation_code_expires_at = NULL WHERE did = ?", nil, repo.Repo.Did).Error; err != nil {
96
-
s.logger.Error("error updating repo", "error", err)
97
return helpers.ServerError(e, nil)
98
}
99
···
27
}
28
29
func (s *Server) handleSignPlcOperation(e echo.Context) error {
30
+
logger := s.logger.With("name", "handleSignPlcOperation")
31
+
32
repo := e.Get("repo").(*models.RepoActor)
33
34
var req ComAtprotoSignPlcOperationRequest
35
if err := e.Bind(&req); err != nil {
36
+
logger.Error("error binding", "error", err)
37
return helpers.ServerError(e, nil)
38
}
39
···
56
ctx := context.WithValue(e.Request().Context(), "skip-cache", true)
57
log, err := identity.FetchDidAuditLog(ctx, nil, repo.Repo.Did)
58
if err != nil {
59
+
logger.Error("error fetching doc", "error", err)
60
return helpers.ServerError(e, nil)
61
}
62
···
85
86
k, err := atcrypto.ParsePrivateBytesK256(repo.SigningKey)
87
if err != nil {
88
+
logger.Error("error parsing signing key", "error", err)
89
return helpers.ServerError(e, nil)
90
}
91
92
if err := s.plcClient.SignOp(k, &op); err != nil {
93
+
logger.Error("error signing plc operation", "error", err)
94
return helpers.ServerError(e, nil)
95
}
96
97
if err := s.db.Exec(ctx, "UPDATE repos SET plc_operation_code = NULL, plc_operation_code_expires_at = NULL WHERE did = ?", nil, repo.Repo.Did).Error; err != nil {
98
+
logger.Error("error updating repo", "error", err)
99
return helpers.ServerError(e, nil)
100
}
101
+6
-4
server/handle_identity_submit_plc_operation.go
+6
-4
server/handle_identity_submit_plc_operation.go
···
21
}
22
23
func (s *Server) handleSubmitPlcOperation(e echo.Context) error {
24
repo := e.Get("repo").(*models.RepoActor)
25
26
var req ComAtprotoSubmitPlcOperationRequest
27
if err := e.Bind(&req); err != nil {
28
-
s.logger.Error("error binding", "error", err)
29
return helpers.ServerError(e, nil)
30
}
31
···
40
41
k, err := atcrypto.ParsePrivateBytesK256(repo.SigningKey)
42
if err != nil {
43
-
s.logger.Error("error parsing key", "error", err)
44
return helpers.ServerError(e, nil)
45
}
46
required, err := s.plcClient.CreateDidCredentials(k, "", repo.Actor.Handle)
47
if err != nil {
48
-
s.logger.Error("error crating did credentials", "error", err)
49
return helpers.ServerError(e, nil)
50
}
51
···
72
}
73
74
if err := s.passport.BustDoc(context.TODO(), repo.Repo.Did); err != nil {
75
-
s.logger.Warn("error busting did doc", "error", err)
76
}
77
78
s.evtman.AddEvent(context.TODO(), &events.XRPCStreamEvent{
···
21
}
22
23
func (s *Server) handleSubmitPlcOperation(e echo.Context) error {
24
+
logger := s.logger.With("name", "handleIdentitySubmitPlcOperation")
25
+
26
repo := e.Get("repo").(*models.RepoActor)
27
28
var req ComAtprotoSubmitPlcOperationRequest
29
if err := e.Bind(&req); err != nil {
30
+
logger.Error("error binding", "error", err)
31
return helpers.ServerError(e, nil)
32
}
33
···
42
43
k, err := atcrypto.ParsePrivateBytesK256(repo.SigningKey)
44
if err != nil {
45
+
logger.Error("error parsing key", "error", err)
46
return helpers.ServerError(e, nil)
47
}
48
required, err := s.plcClient.CreateDidCredentials(k, "", repo.Actor.Handle)
49
if err != nil {
50
+
logger.Error("error crating did credentials", "error", err)
51
return helpers.ServerError(e, nil)
52
}
53
···
74
}
75
76
if err := s.passport.BustDoc(context.TODO(), repo.Repo.Did); err != nil {
77
+
logger.Warn("error busting did doc", "error", err)
78
}
79
80
s.evtman.AddEvent(context.TODO(), &events.XRPCStreamEvent{
+7
-5
server/handle_identity_update_handle.go
+7
-5
server/handle_identity_update_handle.go
···
22
}
23
24
func (s *Server) handleIdentityUpdateHandle(e echo.Context) error {
25
repo := e.Get("repo").(*models.RepoActor)
26
27
var req ComAtprotoIdentityUpdateHandleRequest
28
if err := e.Bind(&req); err != nil {
29
-
s.logger.Error("error binding", "error", err)
30
return helpers.ServerError(e, nil)
31
}
32
···
41
if strings.HasPrefix(repo.Repo.Did, "did:plc:") {
42
log, err := identity.FetchDidAuditLog(ctx, nil, repo.Repo.Did)
43
if err != nil {
44
-
s.logger.Error("error fetching doc", "error", err)
45
return helpers.ServerError(e, nil)
46
}
47
···
68
69
k, err := atcrypto.ParsePrivateBytesK256(repo.SigningKey)
70
if err != nil {
71
-
s.logger.Error("error parsing signing key", "error", err)
72
return helpers.ServerError(e, nil)
73
}
74
···
82
}
83
84
if err := s.passport.BustDoc(context.TODO(), repo.Repo.Did); err != nil {
85
-
s.logger.Warn("error busting did doc", "error", err)
86
}
87
88
s.evtman.AddEvent(context.TODO(), &events.XRPCStreamEvent{
···
95
})
96
97
if err := s.db.Exec(ctx, "UPDATE actors SET handle = ? WHERE did = ?", nil, req.Handle, repo.Repo.Did).Error; err != nil {
98
-
s.logger.Error("error updating handle in db", "error", err)
99
return helpers.ServerError(e, nil)
100
}
101
···
22
}
23
24
func (s *Server) handleIdentityUpdateHandle(e echo.Context) error {
25
+
logger := s.logger.With("name", "handleIdentityUpdateHandle")
26
+
27
repo := e.Get("repo").(*models.RepoActor)
28
29
var req ComAtprotoIdentityUpdateHandleRequest
30
if err := e.Bind(&req); err != nil {
31
+
logger.Error("error binding", "error", err)
32
return helpers.ServerError(e, nil)
33
}
34
···
43
if strings.HasPrefix(repo.Repo.Did, "did:plc:") {
44
log, err := identity.FetchDidAuditLog(ctx, nil, repo.Repo.Did)
45
if err != nil {
46
+
logger.Error("error fetching doc", "error", err)
47
return helpers.ServerError(e, nil)
48
}
49
···
70
71
k, err := atcrypto.ParsePrivateBytesK256(repo.SigningKey)
72
if err != nil {
73
+
logger.Error("error parsing signing key", "error", err)
74
return helpers.ServerError(e, nil)
75
}
76
···
84
}
85
86
if err := s.passport.BustDoc(context.TODO(), repo.Repo.Did); err != nil {
87
+
logger.Warn("error busting did doc", "error", err)
88
}
89
90
s.evtman.AddEvent(context.TODO(), &events.XRPCStreamEvent{
···
97
})
98
99
if err := s.db.Exec(ctx, "UPDATE actors SET handle = ? WHERE did = ?", nil, req.Handle, repo.Repo.Did).Error; err != nil {
100
+
logger.Error("error updating handle in db", "error", err)
101
return helpers.ServerError(e, nil)
102
}
103
+11
-10
server/handle_import_repo.go
+11
-10
server/handle_import_repo.go
···
19
20
func (s *Server) handleRepoImportRepo(e echo.Context) error {
21
ctx := e.Request().Context()
22
23
urepo := e.Get("repo").(*models.RepoActor)
24
25
b, err := io.ReadAll(e.Request().Body)
26
if err != nil {
27
-
s.logger.Error("could not read bytes in import request", "error", err)
28
return helpers.ServerError(e, nil)
29
}
30
···
32
33
cs, err := car.NewCarReader(bytes.NewReader(b))
34
if err != nil {
35
-
s.logger.Error("could not read car in import request", "error", err)
36
return helpers.ServerError(e, nil)
37
}
38
39
orderedBlocks := []blocks.Block{}
40
currBlock, err := cs.Next()
41
if err != nil {
42
-
s.logger.Error("could not get first block from car", "error", err)
43
return helpers.ServerError(e, nil)
44
}
45
currBlockCt := 1
46
47
for currBlock != nil {
48
-
s.logger.Info("someone is importing their repo", "block", currBlockCt)
49
orderedBlocks = append(orderedBlocks, currBlock)
50
next, _ := cs.Next()
51
currBlock = next
···
55
slices.Reverse(orderedBlocks)
56
57
if err := bs.PutMany(context.TODO(), orderedBlocks); err != nil {
58
-
s.logger.Error("could not insert blocks", "error", err)
59
return helpers.ServerError(e, nil)
60
}
61
62
r, err := repo.OpenRepo(context.TODO(), bs, cs.Header.Roots[0])
63
if err != nil {
64
-
s.logger.Error("could not open repo", "error", err)
65
return helpers.ServerError(e, nil)
66
}
67
···
76
cidStr := cid.String()
77
b, err := bs.Get(context.TODO(), cid)
78
if err != nil {
79
-
s.logger.Error("record bytes don't exist in blockstore", "error", err)
80
return helpers.ServerError(e, nil)
81
}
82
···
96
return nil
97
}); err != nil {
98
tx.Rollback()
99
-
s.logger.Error("record bytes don't exist in blockstore", "error", err)
100
return helpers.ServerError(e, nil)
101
}
102
···
104
105
root, rev, err := r.Commit(context.TODO(), urepo.SignFor)
106
if err != nil {
107
-
s.logger.Error("error committing", "error", err)
108
return helpers.ServerError(e, nil)
109
}
110
111
if err := s.UpdateRepo(context.TODO(), urepo.Repo.Did, root, rev); err != nil {
112
-
s.logger.Error("error updating repo after commit", "error", err)
113
return helpers.ServerError(e, nil)
114
}
115
···
19
20
func (s *Server) handleRepoImportRepo(e echo.Context) error {
21
ctx := e.Request().Context()
22
+
logger := s.logger.With("name", "handleImportRepo")
23
24
urepo := e.Get("repo").(*models.RepoActor)
25
26
b, err := io.ReadAll(e.Request().Body)
27
if err != nil {
28
+
logger.Error("could not read bytes in import request", "error", err)
29
return helpers.ServerError(e, nil)
30
}
31
···
33
34
cs, err := car.NewCarReader(bytes.NewReader(b))
35
if err != nil {
36
+
logger.Error("could not read car in import request", "error", err)
37
return helpers.ServerError(e, nil)
38
}
39
40
orderedBlocks := []blocks.Block{}
41
currBlock, err := cs.Next()
42
if err != nil {
43
+
logger.Error("could not get first block from car", "error", err)
44
return helpers.ServerError(e, nil)
45
}
46
currBlockCt := 1
47
48
for currBlock != nil {
49
+
logger.Info("someone is importing their repo", "block", currBlockCt)
50
orderedBlocks = append(orderedBlocks, currBlock)
51
next, _ := cs.Next()
52
currBlock = next
···
56
slices.Reverse(orderedBlocks)
57
58
if err := bs.PutMany(context.TODO(), orderedBlocks); err != nil {
59
+
logger.Error("could not insert blocks", "error", err)
60
return helpers.ServerError(e, nil)
61
}
62
63
r, err := repo.OpenRepo(context.TODO(), bs, cs.Header.Roots[0])
64
if err != nil {
65
+
logger.Error("could not open repo", "error", err)
66
return helpers.ServerError(e, nil)
67
}
68
···
77
cidStr := cid.String()
78
b, err := bs.Get(context.TODO(), cid)
79
if err != nil {
80
+
logger.Error("record bytes don't exist in blockstore", "error", err)
81
return helpers.ServerError(e, nil)
82
}
83
···
97
return nil
98
}); err != nil {
99
tx.Rollback()
100
+
logger.Error("record bytes don't exist in blockstore", "error", err)
101
return helpers.ServerError(e, nil)
102
}
103
···
105
106
root, rev, err := r.Commit(context.TODO(), urepo.SignFor)
107
if err != nil {
108
+
logger.Error("error committing", "error", err)
109
return helpers.ServerError(e, nil)
110
}
111
112
if err := s.UpdateRepo(context.TODO(), urepo.Repo.Did, root, rev); err != nil {
113
+
logger.Error("error updating repo after commit", "error", err)
114
return helpers.ServerError(e, nil)
115
}
116
+8
-7
server/handle_oauth_par.go
+8
-7
server/handle_oauth_par.go
···
20
21
func (s *Server) handleOauthPar(e echo.Context) error {
22
ctx := e.Request().Context()
23
24
var parRequest provider.ParRequest
25
if err := e.Bind(&parRequest); err != nil {
26
-
s.logger.Error("error binding for par request", "error", err)
27
return helpers.ServerError(e, nil)
28
}
29
30
if err := e.Validate(parRequest); err != nil {
31
-
s.logger.Error("missing parameters for par request", "error", err)
32
return helpers.InputError(e, nil)
33
}
34
···
45
"error": "use_dpop_nonce",
46
})
47
}
48
-
s.logger.Error("error getting dpop proof", "error", err)
49
return helpers.InputError(e, nil)
50
}
51
···
55
AllowMissingDpopProof: true,
56
})
57
if err != nil {
58
-
s.logger.Error("error authenticating client", "client_id", parRequest.ClientID, "error", err)
59
return helpers.InputError(e, to.StringPtr(err.Error()))
60
}
61
···
66
} else {
67
if !client.Metadata.DpopBoundAccessTokens {
68
msg := "dpop bound access tokens are not enabled for this client"
69
-
s.logger.Error(msg)
70
return helpers.InputError(e, &msg)
71
}
72
73
if dpopProof.JKT != *parRequest.DpopJkt {
74
msg := "supplied dpop jkt does not match header dpop jkt"
75
-
s.logger.Error(msg)
76
return helpers.InputError(e, &msg)
77
}
78
}
···
89
}
90
91
if err := s.db.Create(ctx, authRequest, nil).Error; err != nil {
92
-
s.logger.Error("error creating auth request in db", "error", err)
93
return helpers.ServerError(e, nil)
94
}
95
···
20
21
func (s *Server) handleOauthPar(e echo.Context) error {
22
ctx := e.Request().Context()
23
+
logger := s.logger.With("name", "handleOauthPar")
24
25
var parRequest provider.ParRequest
26
if err := e.Bind(&parRequest); err != nil {
27
+
logger.Error("error binding for par request", "error", err)
28
return helpers.ServerError(e, nil)
29
}
30
31
if err := e.Validate(parRequest); err != nil {
32
+
logger.Error("missing parameters for par request", "error", err)
33
return helpers.InputError(e, nil)
34
}
35
···
46
"error": "use_dpop_nonce",
47
})
48
}
49
+
logger.Error("error getting dpop proof", "error", err)
50
return helpers.InputError(e, nil)
51
}
52
···
56
AllowMissingDpopProof: true,
57
})
58
if err != nil {
59
+
logger.Error("error authenticating client", "client_id", parRequest.ClientID, "error", err)
60
return helpers.InputError(e, to.StringPtr(err.Error()))
61
}
62
···
67
} else {
68
if !client.Metadata.DpopBoundAccessTokens {
69
msg := "dpop bound access tokens are not enabled for this client"
70
+
logger.Error(msg)
71
return helpers.InputError(e, &msg)
72
}
73
74
if dpopProof.JKT != *parRequest.DpopJkt {
75
msg := "supplied dpop jkt does not match header dpop jkt"
76
+
logger.Error(msg)
77
return helpers.InputError(e, &msg)
78
}
79
}
···
90
}
91
92
if err := s.db.Create(ctx, authRequest, nil).Error; err != nil {
93
+
logger.Error("error creating auth request in db", "error", err)
94
return helpers.ServerError(e, nil)
95
}
96
+9
-8
server/handle_oauth_token.go
+9
-8
server/handle_oauth_token.go
···
39
40
func (s *Server) handleOauthToken(e echo.Context) error {
41
ctx := e.Request().Context()
42
43
var req OauthTokenRequest
44
if err := e.Bind(&req); err != nil {
45
-
s.logger.Error("error binding token request", "error", err)
46
return helpers.ServerError(e, nil)
47
}
48
···
58
"error": "use_dpop_nonce",
59
})
60
}
61
-
s.logger.Error("error getting dpop proof", "error", err)
62
return helpers.InputError(e, nil)
63
}
64
···
66
AllowMissingDpopProof: true,
67
})
68
if err != nil {
69
-
s.logger.Error("error authenticating client", "client_id", req.ClientID, "error", err)
70
return helpers.InputError(e, to.StringPtr(err.Error()))
71
}
72
···
87
var authReq provider.OauthAuthorizationRequest
88
// get the lil guy and delete him
89
if err := s.db.Raw(ctx, "DELETE FROM oauth_authorization_requests WHERE code = ? RETURNING *", nil, *req.Code).Scan(&authReq).Error; err != nil {
90
-
s.logger.Error("error finding authorization request", "error", err)
91
return helpers.ServerError(e, nil)
92
}
93
···
112
case "S256":
113
inputChal, err := base64.RawURLEncoding.DecodeString(*authReq.Parameters.CodeChallenge)
114
if err != nil {
115
-
s.logger.Error("error decoding code challenge", "error", err)
116
return helpers.ServerError(e, nil)
117
}
118
···
173
RefreshToken: refreshToken,
174
Ip: authReq.Ip,
175
}, nil).Error; err != nil {
176
-
s.logger.Error("error creating token in db", "error", err)
177
return helpers.ServerError(e, nil)
178
}
179
···
202
203
var oauthToken provider.OauthToken
204
if err := s.db.Raw(ctx, "SELECT * FROM oauth_tokens WHERE refresh_token = ?", nil, req.RefreshToken).Scan(&oauthToken).Error; err != nil {
205
-
s.logger.Error("error finding oauth token by refresh token", "error", err, "refresh_token", req.RefreshToken)
206
return helpers.ServerError(e, nil)
207
}
208
···
260
}
261
262
if err := s.db.Exec(ctx, "UPDATE oauth_tokens SET token = ?, refresh_token = ?, expires_at = ?, updated_at = ? WHERE refresh_token = ?", nil, accessString, nextRefreshToken, eat, now, *req.RefreshToken).Error; err != nil {
263
-
s.logger.Error("error updating token", "error", err)
264
return helpers.ServerError(e, nil)
265
}
266
···
39
40
func (s *Server) handleOauthToken(e echo.Context) error {
41
ctx := e.Request().Context()
42
+
logger := s.logger.With("name", "handleOauthToken")
43
44
var req OauthTokenRequest
45
if err := e.Bind(&req); err != nil {
46
+
logger.Error("error binding token request", "error", err)
47
return helpers.ServerError(e, nil)
48
}
49
···
59
"error": "use_dpop_nonce",
60
})
61
}
62
+
logger.Error("error getting dpop proof", "error", err)
63
return helpers.InputError(e, nil)
64
}
65
···
67
AllowMissingDpopProof: true,
68
})
69
if err != nil {
70
+
logger.Error("error authenticating client", "client_id", req.ClientID, "error", err)
71
return helpers.InputError(e, to.StringPtr(err.Error()))
72
}
73
···
88
var authReq provider.OauthAuthorizationRequest
89
// get the lil guy and delete him
90
if err := s.db.Raw(ctx, "DELETE FROM oauth_authorization_requests WHERE code = ? RETURNING *", nil, *req.Code).Scan(&authReq).Error; err != nil {
91
+
logger.Error("error finding authorization request", "error", err)
92
return helpers.ServerError(e, nil)
93
}
94
···
113
case "S256":
114
inputChal, err := base64.RawURLEncoding.DecodeString(*authReq.Parameters.CodeChallenge)
115
if err != nil {
116
+
logger.Error("error decoding code challenge", "error", err)
117
return helpers.ServerError(e, nil)
118
}
119
···
174
RefreshToken: refreshToken,
175
Ip: authReq.Ip,
176
}, nil).Error; err != nil {
177
+
logger.Error("error creating token in db", "error", err)
178
return helpers.ServerError(e, nil)
179
}
180
···
203
204
var oauthToken provider.OauthToken
205
if err := s.db.Raw(ctx, "SELECT * FROM oauth_tokens WHERE refresh_token = ?", nil, req.RefreshToken).Scan(&oauthToken).Error; err != nil {
206
+
logger.Error("error finding oauth token by refresh token", "error", err, "refresh_token", req.RefreshToken)
207
return helpers.ServerError(e, nil)
208
}
209
···
261
}
262
263
if err := s.db.Exec(ctx, "UPDATE oauth_tokens SET token = ?, refresh_token = ?, expires_at = ?, updated_at = ? WHERE refresh_token = ?", nil, accessString, nextRefreshToken, eat, now, *req.RefreshToken).Error; err != nil {
264
+
logger.Error("error updating token", "error", err)
265
return helpers.ServerError(e, nil)
266
}
267
+6
-6
server/handle_proxy.go
+6
-6
server/handle_proxy.go
···
47
}
48
49
func (s *Server) handleProxy(e echo.Context) error {
50
-
lgr := s.logger.With("handler", "handleProxy")
51
52
repo, isAuthed := e.Get("repo").(*models.RepoActor)
53
···
58
59
endpoint, svcDid, err := s.getAtprotoProxyEndpointFromRequest(e)
60
if err != nil {
61
-
lgr.Error("could not get atproto proxy", "error", err)
62
return helpers.ServerError(e, nil)
63
}
64
···
90
}
91
hj, err := json.Marshal(header)
92
if err != nil {
93
-
lgr.Error("error marshaling header", "error", err)
94
return helpers.ServerError(e, nil)
95
}
96
···
118
}
119
pj, err := json.Marshal(payload)
120
if err != nil {
121
-
lgr.Error("error marashaling payload", "error", err)
122
return helpers.ServerError(e, nil)
123
}
124
···
129
130
sk, err := secp256k1secec.NewPrivateKey(repo.SigningKey)
131
if err != nil {
132
-
lgr.Error("can't load private key", "error", err)
133
return err
134
}
135
136
R, S, _, err := sk.SignRaw(rand.Reader, hash[:])
137
if err != nil {
138
-
lgr.Error("error signing", "error", err)
139
}
140
141
rBytes := R.Bytes()
···
47
}
48
49
func (s *Server) handleProxy(e echo.Context) error {
50
+
logger := s.logger.With("handler", "handleProxy")
51
52
repo, isAuthed := e.Get("repo").(*models.RepoActor)
53
···
58
59
endpoint, svcDid, err := s.getAtprotoProxyEndpointFromRequest(e)
60
if err != nil {
61
+
logger.Error("could not get atproto proxy", "error", err)
62
return helpers.ServerError(e, nil)
63
}
64
···
90
}
91
hj, err := json.Marshal(header)
92
if err != nil {
93
+
logger.Error("error marshaling header", "error", err)
94
return helpers.ServerError(e, nil)
95
}
96
···
118
}
119
pj, err := json.Marshal(payload)
120
if err != nil {
121
+
logger.Error("error marashaling payload", "error", err)
122
return helpers.ServerError(e, nil)
123
}
124
···
129
130
sk, err := secp256k1secec.NewPrivateKey(repo.SigningKey)
131
if err != nil {
132
+
logger.Error("can't load private key", "error", err)
133
return err
134
}
135
136
R, S, _, err := sk.SignRaw(rand.Reader, hash[:])
137
if err != nil {
138
+
logger.Error("error signing", "error", err)
139
}
140
141
rBytes := R.Bytes()
+5
-4
server/handle_repo_apply_writes.go
+5
-4
server/handle_repo_apply_writes.go
···
27
28
func (s *Server) handleApplyWrites(e echo.Context) error {
29
ctx := e.Request().Context()
30
31
var req ComAtprotoRepoApplyWritesInput
32
if err := e.Bind(&req); err != nil {
33
-
s.logger.Error("error binding", "error", err)
34
return helpers.ServerError(e, nil)
35
}
36
37
if err := e.Validate(req); err != nil {
38
-
s.logger.Error("error validating", "error", err)
39
return helpers.InputError(e, nil)
40
}
41
42
repo := e.Get("repo").(*models.RepoActor)
43
44
if repo.Repo.Did != req.Repo {
45
-
s.logger.Warn("mismatched repo/auth")
46
return helpers.InputError(e, nil)
47
}
48
···
58
59
results, err := s.repoman.applyWrites(ctx, repo.Repo, ops, req.SwapCommit)
60
if err != nil {
61
-
s.logger.Error("error applying writes", "error", err)
62
return helpers.ServerError(e, nil)
63
}
64
···
27
28
func (s *Server) handleApplyWrites(e echo.Context) error {
29
ctx := e.Request().Context()
30
+
logger := s.logger.With("name", "handleRepoApplyWrites")
31
32
var req ComAtprotoRepoApplyWritesInput
33
if err := e.Bind(&req); err != nil {
34
+
logger.Error("error binding", "error", err)
35
return helpers.ServerError(e, nil)
36
}
37
38
if err := e.Validate(req); err != nil {
39
+
logger.Error("error validating", "error", err)
40
return helpers.InputError(e, nil)
41
}
42
43
repo := e.Get("repo").(*models.RepoActor)
44
45
if repo.Repo.Did != req.Repo {
46
+
logger.Warn("mismatched repo/auth")
47
return helpers.InputError(e, nil)
48
}
49
···
59
60
results, err := s.repoman.applyWrites(ctx, repo.Repo, ops, req.SwapCommit)
61
if err != nil {
62
+
logger.Error("error applying writes", "error", err)
63
return helpers.ServerError(e, nil)
64
}
65
+5
-4
server/handle_repo_create_record.go
+5
-4
server/handle_repo_create_record.go
···
18
19
func (s *Server) handleCreateRecord(e echo.Context) error {
20
ctx := e.Request().Context()
21
22
repo := e.Get("repo").(*models.RepoActor)
23
24
var req ComAtprotoRepoCreateRecordInput
25
if err := e.Bind(&req); err != nil {
26
-
s.logger.Error("error binding", "error", err)
27
return helpers.ServerError(e, nil)
28
}
29
30
if err := e.Validate(req); err != nil {
31
-
s.logger.Error("error validating", "error", err)
32
return helpers.InputError(e, nil)
33
}
34
35
if repo.Repo.Did != req.Repo {
36
-
s.logger.Warn("mismatched repo/auth")
37
return helpers.InputError(e, nil)
38
}
39
···
53
},
54
}, req.SwapCommit)
55
if err != nil {
56
-
s.logger.Error("error applying writes", "error", err)
57
return helpers.ServerError(e, nil)
58
}
59
···
18
19
func (s *Server) handleCreateRecord(e echo.Context) error {
20
ctx := e.Request().Context()
21
+
logger := s.logger.With("name", "handleCreateRecord")
22
23
repo := e.Get("repo").(*models.RepoActor)
24
25
var req ComAtprotoRepoCreateRecordInput
26
if err := e.Bind(&req); err != nil {
27
+
logger.Error("error binding", "error", err)
28
return helpers.ServerError(e, nil)
29
}
30
31
if err := e.Validate(req); err != nil {
32
+
logger.Error("error validating", "error", err)
33
return helpers.InputError(e, nil)
34
}
35
36
if repo.Repo.Did != req.Repo {
37
+
logger.Warn("mismatched repo/auth")
38
return helpers.InputError(e, nil)
39
}
40
···
54
},
55
}, req.SwapCommit)
56
if err != nil {
57
+
logger.Error("error applying writes", "error", err)
58
return helpers.ServerError(e, nil)
59
}
60
+5
-4
server/handle_repo_delete_record.go
+5
-4
server/handle_repo_delete_record.go
···
16
17
func (s *Server) handleDeleteRecord(e echo.Context) error {
18
ctx := e.Request().Context()
19
20
repo := e.Get("repo").(*models.RepoActor)
21
22
var req ComAtprotoRepoDeleteRecordInput
23
if err := e.Bind(&req); err != nil {
24
-
s.logger.Error("error binding", "error", err)
25
return helpers.ServerError(e, nil)
26
}
27
28
if err := e.Validate(req); err != nil {
29
-
s.logger.Error("error validating", "error", err)
30
return helpers.InputError(e, nil)
31
}
32
33
if repo.Repo.Did != req.Repo {
34
-
s.logger.Warn("mismatched repo/auth")
35
return helpers.InputError(e, nil)
36
}
37
···
44
},
45
}, req.SwapCommit)
46
if err != nil {
47
-
s.logger.Error("error applying writes", "error", err)
48
return helpers.ServerError(e, nil)
49
}
50
···
16
17
func (s *Server) handleDeleteRecord(e echo.Context) error {
18
ctx := e.Request().Context()
19
+
logger := s.logger.With("name", "handleDeleteRecord")
20
21
repo := e.Get("repo").(*models.RepoActor)
22
23
var req ComAtprotoRepoDeleteRecordInput
24
if err := e.Bind(&req); err != nil {
25
+
logger.Error("error binding", "error", err)
26
return helpers.ServerError(e, nil)
27
}
28
29
if err := e.Validate(req); err != nil {
30
+
logger.Error("error validating", "error", err)
31
return helpers.InputError(e, nil)
32
}
33
34
if repo.Repo.Did != req.Repo {
35
+
logger.Warn("mismatched repo/auth")
36
return helpers.InputError(e, nil)
37
}
38
···
45
},
46
}, req.SwapCommit)
47
if err != nil {
48
+
logger.Error("error applying writes", "error", err)
49
return helpers.ServerError(e, nil)
50
}
51
+4
-3
server/handle_repo_describe_repo.go
+4
-3
server/handle_repo_describe_repo.go
···
21
22
func (s *Server) handleDescribeRepo(e echo.Context) error {
23
ctx := e.Request().Context()
24
25
did := e.QueryParam("repo")
26
repo, err := s.getRepoActorByDid(ctx, did)
···
29
return helpers.InputError(e, to.StringPtr("RepoNotFound"))
30
}
31
32
-
s.logger.Error("error looking up repo", "error", err)
33
return helpers.ServerError(e, nil)
34
}
35
···
37
38
diddoc, err := s.passport.FetchDoc(e.Request().Context(), repo.Repo.Did)
39
if err != nil {
40
-
s.logger.Error("error fetching diddoc", "error", err)
41
return helpers.ServerError(e, nil)
42
}
43
···
67
68
var records []models.Record
69
if err := s.db.Raw(ctx, "SELECT DISTINCT(nsid) FROM records WHERE did = ?", nil, repo.Repo.Did).Scan(&records).Error; err != nil {
70
-
s.logger.Error("error getting collections", "error", err)
71
return helpers.ServerError(e, nil)
72
}
73
···
21
22
func (s *Server) handleDescribeRepo(e echo.Context) error {
23
ctx := e.Request().Context()
24
+
logger := s.logger.With("name", "handleDescribeRepo")
25
26
did := e.QueryParam("repo")
27
repo, err := s.getRepoActorByDid(ctx, did)
···
30
return helpers.InputError(e, to.StringPtr("RepoNotFound"))
31
}
32
33
+
logger.Error("error looking up repo", "error", err)
34
return helpers.ServerError(e, nil)
35
}
36
···
38
39
diddoc, err := s.passport.FetchDoc(e.Request().Context(), repo.Repo.Did)
40
if err != nil {
41
+
logger.Error("error fetching diddoc", "error", err)
42
return helpers.ServerError(e, nil)
43
}
44
···
68
69
var records []models.Record
70
if err := s.db.Raw(ctx, "SELECT DISTINCT(nsid) FROM records WHERE did = ?", nil, repo.Repo.Did).Scan(&records).Error; err != nil {
71
+
logger.Error("error getting collections", "error", err)
72
return helpers.ServerError(e, nil)
73
}
74
+2
-1
server/handle_repo_list_missing_blobs.go
+2
-1
server/handle_repo_list_missing_blobs.go
···
23
24
func (s *Server) handleListMissingBlobs(e echo.Context) error {
25
ctx := e.Request().Context()
26
27
urepo := e.Get("repo").(*models.RepoActor)
28
···
38
39
var records []models.Record
40
if err := s.db.Raw(ctx, "SELECT * FROM records WHERE did = ?", nil, urepo.Repo.Did).Scan(&records).Error; err != nil {
41
-
s.logger.Error("failed to get records for listMissingBlobs", "error", err)
42
return helpers.ServerError(e, nil)
43
}
44
···
23
24
func (s *Server) handleListMissingBlobs(e echo.Context) error {
25
ctx := e.Request().Context()
26
+
logger := s.logger.With("name", "handleListMissingBlos")
27
28
urepo := e.Get("repo").(*models.RepoActor)
29
···
39
40
var records []models.Record
41
if err := s.db.Raw(ctx, "SELECT * FROM records WHERE did = ?", nil, urepo.Repo.Did).Scan(&records).Error; err != nil {
42
+
logger.Error("failed to get records for listMissingBlobs", "error", err)
43
return helpers.ServerError(e, nil)
44
}
45
+3
-2
server/handle_repo_list_records.go
+3
-2
server/handle_repo_list_records.go
···
47
48
func (s *Server) handleListRecords(e echo.Context) error {
49
ctx := e.Request().Context()
50
51
var req ComAtprotoRepoListRecordsRequest
52
if err := e.Bind(&req); err != nil {
53
-
s.logger.Error("could not bind list records request", "error", err)
54
return helpers.ServerError(e, nil)
55
}
56
···
96
97
var records []models.Record
98
if err := s.db.Raw(ctx, "SELECT * FROM records WHERE did = ? AND nsid = ? "+cursorquery+" ORDER BY created_at "+sort+" limit ?", nil, params...).Scan(&records).Error; err != nil {
99
-
s.logger.Error("error getting records", "error", err)
100
return helpers.ServerError(e, nil)
101
}
102
···
47
48
func (s *Server) handleListRecords(e echo.Context) error {
49
ctx := e.Request().Context()
50
+
logger := s.logger.With("name", "handleListRecords")
51
52
var req ComAtprotoRepoListRecordsRequest
53
if err := e.Bind(&req); err != nil {
54
+
logger.Error("could not bind list records request", "error", err)
55
return helpers.ServerError(e, nil)
56
}
57
···
97
98
var records []models.Record
99
if err := s.db.Raw(ctx, "SELECT * FROM records WHERE did = ? AND nsid = ? "+cursorquery+" ORDER BY created_at "+sort+" limit ?", nil, params...).Scan(&records).Error; err != nil {
100
+
logger.Error("error getting records", "error", err)
101
return helpers.ServerError(e, nil)
102
}
103
+5
-4
server/handle_repo_put_record.go
+5
-4
server/handle_repo_put_record.go
···
18
19
func (s *Server) handlePutRecord(e echo.Context) error {
20
ctx := e.Request().Context()
21
22
repo := e.Get("repo").(*models.RepoActor)
23
24
var req ComAtprotoRepoPutRecordInput
25
if err := e.Bind(&req); err != nil {
26
-
s.logger.Error("error binding", "error", err)
27
return helpers.ServerError(e, nil)
28
}
29
30
if err := e.Validate(req); err != nil {
31
-
s.logger.Error("error validating", "error", err)
32
return helpers.InputError(e, nil)
33
}
34
35
if repo.Repo.Did != req.Repo {
36
-
s.logger.Warn("mismatched repo/auth")
37
return helpers.InputError(e, nil)
38
}
39
···
53
},
54
}, req.SwapCommit)
55
if err != nil {
56
-
s.logger.Error("error applying writes", "error", err)
57
return helpers.ServerError(e, nil)
58
}
59
···
18
19
func (s *Server) handlePutRecord(e echo.Context) error {
20
ctx := e.Request().Context()
21
+
logger := s.logger.With("name", "handlePutRecord")
22
23
repo := e.Get("repo").(*models.RepoActor)
24
25
var req ComAtprotoRepoPutRecordInput
26
if err := e.Bind(&req); err != nil {
27
+
logger.Error("error binding", "error", err)
28
return helpers.ServerError(e, nil)
29
}
30
31
if err := e.Validate(req); err != nil {
32
+
logger.Error("error validating", "error", err)
33
return helpers.InputError(e, nil)
34
}
35
36
if repo.Repo.Did != req.Repo {
37
+
logger.Warn("mismatched repo/auth")
38
return helpers.InputError(e, nil)
39
}
40
···
54
},
55
}, req.SwapCommit)
56
if err != nil {
57
+
logger.Error("error applying writes", "error", err)
58
return helpers.ServerError(e, nil)
59
}
60
+8
-7
server/handle_repo_upload_blob.go
+8
-7
server/handle_repo_upload_blob.go
···
33
34
func (s *Server) handleRepoUploadBlob(e echo.Context) error {
35
ctx := e.Request().Context()
36
37
urepo := e.Get("repo").(*models.RepoActor)
38
···
54
}
55
56
if err := s.db.Create(ctx, &blob, nil).Error; err != nil {
57
-
s.logger.Error("error creating new blob in db", "error", err)
58
return helpers.ServerError(e, nil)
59
}
60
···
71
break
72
}
73
} else if err != nil && err != io.ErrUnexpectedEOF {
74
-
s.logger.Error("error reading blob", "error", err)
75
return helpers.ServerError(e, nil)
76
}
77
···
87
}
88
89
if err := s.db.Create(ctx, &blobPart, nil).Error; err != nil {
90
-
s.logger.Error("error adding blob part to db", "error", err)
91
return helpers.ServerError(e, nil)
92
}
93
}
···
100
101
c, err := cid.NewPrefixV1(cid.Raw, multihash.SHA2_256).Sum(fulldata.Bytes())
102
if err != nil {
103
-
s.logger.Error("error creating cid prefix", "error", err)
104
return helpers.ServerError(e, nil)
105
}
106
···
117
118
sess, err := session.NewSession(config)
119
if err != nil {
120
-
s.logger.Error("error creating aws session", "error", err)
121
return helpers.ServerError(e, nil)
122
}
123
···
128
Key: aws.String(fmt.Sprintf("blobs/%s/%s", urepo.Repo.Did, c.String())),
129
Body: bytes.NewReader(fulldata.Bytes()),
130
}); err != nil {
131
-
s.logger.Error("error uploading blob to s3", "error", err)
132
return helpers.ServerError(e, nil)
133
}
134
}
135
136
if err := s.db.Exec(ctx, "UPDATE blobs SET cid = ? WHERE id = ?", nil, c.Bytes(), blob.ID).Error; err != nil {
137
// there should probably be somme handling here if this fails...
138
-
s.logger.Error("error updating blob", "error", err)
139
return helpers.ServerError(e, nil)
140
}
141
···
33
34
func (s *Server) handleRepoUploadBlob(e echo.Context) error {
35
ctx := e.Request().Context()
36
+
logger := s.logger.With("name", "handleRepoUploadBlob")
37
38
urepo := e.Get("repo").(*models.RepoActor)
39
···
55
}
56
57
if err := s.db.Create(ctx, &blob, nil).Error; err != nil {
58
+
logger.Error("error creating new blob in db", "error", err)
59
return helpers.ServerError(e, nil)
60
}
61
···
72
break
73
}
74
} else if err != nil && err != io.ErrUnexpectedEOF {
75
+
logger.Error("error reading blob", "error", err)
76
return helpers.ServerError(e, nil)
77
}
78
···
88
}
89
90
if err := s.db.Create(ctx, &blobPart, nil).Error; err != nil {
91
+
logger.Error("error adding blob part to db", "error", err)
92
return helpers.ServerError(e, nil)
93
}
94
}
···
101
102
c, err := cid.NewPrefixV1(cid.Raw, multihash.SHA2_256).Sum(fulldata.Bytes())
103
if err != nil {
104
+
logger.Error("error creating cid prefix", "error", err)
105
return helpers.ServerError(e, nil)
106
}
107
···
118
119
sess, err := session.NewSession(config)
120
if err != nil {
121
+
logger.Error("error creating aws session", "error", err)
122
return helpers.ServerError(e, nil)
123
}
124
···
129
Key: aws.String(fmt.Sprintf("blobs/%s/%s", urepo.Repo.Did, c.String())),
130
Body: bytes.NewReader(fulldata.Bytes()),
131
}); err != nil {
132
+
logger.Error("error uploading blob to s3", "error", err)
133
return helpers.ServerError(e, nil)
134
}
135
}
136
137
if err := s.db.Exec(ctx, "UPDATE blobs SET cid = ? WHERE id = ?", nil, c.Bytes(), blob.ID).Error; err != nil {
138
// there should probably be somme handling here if this fails...
139
+
logger.Error("error updating blob", "error", err)
140
return helpers.ServerError(e, nil)
141
}
142
+3
-2
server/handle_server_activate_account.go
+3
-2
server/handle_server_activate_account.go
···
19
20
func (s *Server) handleServerActivateAccount(e echo.Context) error {
21
ctx := e.Request().Context()
22
23
var req ComAtprotoServerDeactivateAccountRequest
24
if err := e.Bind(&req); err != nil {
25
-
s.logger.Error("error binding", "error", err)
26
return helpers.ServerError(e, nil)
27
}
28
29
urepo := e.Get("repo").(*models.RepoActor)
30
31
if err := s.db.Exec(ctx, "UPDATE repos SET deactivated = ? WHERE did = ?", nil, false, urepo.Repo.Did).Error; err != nil {
32
-
s.logger.Error("error updating account status to deactivated", "error", err)
33
return helpers.ServerError(e, nil)
34
}
35
···
19
20
func (s *Server) handleServerActivateAccount(e echo.Context) error {
21
ctx := e.Request().Context()
22
+
logger := s.logger.With("name", "handleServerActivateAccount")
23
24
var req ComAtprotoServerDeactivateAccountRequest
25
if err := e.Bind(&req); err != nil {
26
+
logger.Error("error binding", "error", err)
27
return helpers.ServerError(e, nil)
28
}
29
30
urepo := e.Get("repo").(*models.RepoActor)
31
32
if err := s.db.Exec(ctx, "UPDATE repos SET deactivated = ? WHERE did = ?", nil, false, urepo.Repo.Did).Error; err != nil {
33
+
logger.Error("error updating account status to deactivated", "error", err)
34
return helpers.ServerError(e, nil)
35
}
36
+5
-4
server/handle_server_check_account_status.go
+5
-4
server/handle_server_check_account_status.go
···
21
22
func (s *Server) handleServerCheckAccountStatus(e echo.Context) error {
23
ctx := e.Request().Context()
24
25
urepo := e.Get("repo").(*models.RepoActor)
26
···
33
34
rootcid, err := cid.Cast(urepo.Root)
35
if err != nil {
36
-
s.logger.Error("error casting cid", "error", err)
37
return helpers.ServerError(e, nil)
38
}
39
resp.RepoCommit = rootcid.String()
···
44
45
var blockCtResp CountResp
46
if err := s.db.Raw(ctx, "SELECT COUNT(*) AS ct FROM blocks WHERE did = ?", nil, urepo.Repo.Did).Scan(&blockCtResp).Error; err != nil {
47
-
s.logger.Error("error getting block count", "error", err)
48
return helpers.ServerError(e, nil)
49
}
50
resp.RepoBlocks = blockCtResp.Ct
51
52
var recCtResp CountResp
53
if err := s.db.Raw(ctx, "SELECT COUNT(*) AS ct FROM records WHERE did = ?", nil, urepo.Repo.Did).Scan(&recCtResp).Error; err != nil {
54
-
s.logger.Error("error getting record count", "error", err)
55
return helpers.ServerError(e, nil)
56
}
57
resp.IndexedRecords = recCtResp.Ct
58
59
var blobCtResp CountResp
60
if err := s.db.Raw(ctx, "SELECT COUNT(*) AS ct FROM blobs WHERE did = ?", nil, urepo.Repo.Did).Scan(&blobCtResp).Error; err != nil {
61
-
s.logger.Error("error getting record count", "error", err)
62
return helpers.ServerError(e, nil)
63
}
64
resp.ExpectedBlobs = blobCtResp.Ct
···
21
22
func (s *Server) handleServerCheckAccountStatus(e echo.Context) error {
23
ctx := e.Request().Context()
24
+
logger := s.logger.With("name", "handleServerCheckAccountStatus")
25
26
urepo := e.Get("repo").(*models.RepoActor)
27
···
34
35
rootcid, err := cid.Cast(urepo.Root)
36
if err != nil {
37
+
logger.Error("error casting cid", "error", err)
38
return helpers.ServerError(e, nil)
39
}
40
resp.RepoCommit = rootcid.String()
···
45
46
var blockCtResp CountResp
47
if err := s.db.Raw(ctx, "SELECT COUNT(*) AS ct FROM blocks WHERE did = ?", nil, urepo.Repo.Did).Scan(&blockCtResp).Error; err != nil {
48
+
logger.Error("error getting block count", "error", err)
49
return helpers.ServerError(e, nil)
50
}
51
resp.RepoBlocks = blockCtResp.Ct
52
53
var recCtResp CountResp
54
if err := s.db.Raw(ctx, "SELECT COUNT(*) AS ct FROM records WHERE did = ?", nil, urepo.Repo.Did).Scan(&recCtResp).Error; err != nil {
55
+
logger.Error("error getting record count", "error", err)
56
return helpers.ServerError(e, nil)
57
}
58
resp.IndexedRecords = recCtResp.Ct
59
60
var blobCtResp CountResp
61
if err := s.db.Raw(ctx, "SELECT COUNT(*) AS ct FROM blobs WHERE did = ?", nil, urepo.Repo.Did).Scan(&blobCtResp).Error; err != nil {
62
+
logger.Error("error getting record count", "error", err)
63
return helpers.ServerError(e, nil)
64
}
65
resp.ExpectedBlobs = blobCtResp.Ct
+3
-2
server/handle_server_confirm_email.go
+3
-2
server/handle_server_confirm_email.go
···
16
17
func (s *Server) handleServerConfirmEmail(e echo.Context) error {
18
ctx := e.Request().Context()
19
20
urepo := e.Get("repo").(*models.RepoActor)
21
22
var req ComAtprotoServerConfirmEmailRequest
23
if err := e.Bind(&req); err != nil {
24
-
s.logger.Error("error binding", "error", err)
25
return helpers.ServerError(e, nil)
26
}
27
···
44
now := time.Now().UTC()
45
46
if err := s.db.Exec(ctx, "UPDATE repos SET email_verification_code = NULL, email_verification_code_expires_at = NULL, email_confirmed_at = ? WHERE did = ?", nil, now, urepo.Repo.Did).Error; err != nil {
47
-
s.logger.Error("error updating user", "error", err)
48
return helpers.ServerError(e, nil)
49
}
50
···
16
17
func (s *Server) handleServerConfirmEmail(e echo.Context) error {
18
ctx := e.Request().Context()
19
+
logger := s.logger.With("name", "handleServerConfirmEmail")
20
21
urepo := e.Get("repo").(*models.RepoActor)
22
23
var req ComAtprotoServerConfirmEmailRequest
24
if err := e.Bind(&req); err != nil {
25
+
logger.Error("error binding", "error", err)
26
return helpers.ServerError(e, nil)
27
}
28
···
45
now := time.Now().UTC()
46
47
if err := s.db.Exec(ctx, "UPDATE repos SET email_verification_code = NULL, email_verification_code_expires_at = NULL, email_confirmed_at = ? WHERE did = ?", nil, now, urepo.Repo.Did).Error; err != nil {
48
+
logger.Error("error updating user", "error", err)
49
return helpers.ServerError(e, nil)
50
}
51
+23
-22
server/handle_server_create_account.go
+23
-22
server/handle_server_create_account.go
···
37
38
func (s *Server) handleCreateAccount(e echo.Context) error {
39
ctx := e.Request().Context()
40
41
var request ComAtprotoServerCreateAccountRequest
42
43
if err := e.Bind(&request); err != nil {
44
-
s.logger.Error("error receiving request", "endpoint", "com.atproto.server.createAccount", "error", err)
45
return helpers.ServerError(e, nil)
46
}
47
48
request.Handle = strings.ToLower(request.Handle)
49
50
if err := e.Validate(request); err != nil {
51
-
s.logger.Error("error validating request", "endpoint", "com.atproto.server.createAccount", "error", err)
52
53
var verr ValidationError
54
if errors.As(err, &verr) {
···
82
authDid, err := s.validateServiceAuth(e.Request().Context(), token, "com.atproto.server.createAccount")
83
84
if err != nil {
85
-
s.logger.Warn("error validating authorization token", "endpoint", "com.atproto.server.createAccount", "error", err)
86
return helpers.UnauthorizedError(e, to.StringPtr("invalid authorization token"))
87
}
88
···
94
// see if the handle is already taken
95
actor, err := s.getActorByHandle(ctx, request.Handle)
96
if err != nil && err != gorm.ErrRecordNotFound {
97
-
s.logger.Error("error looking up handle in db", "endpoint", "com.atproto.server.createAccount", "error", err)
98
return helpers.ServerError(e, nil)
99
}
100
if err == nil && actor.Did != signupDid {
···
115
if err == gorm.ErrRecordNotFound {
116
return helpers.InputError(e, to.StringPtr("InvalidInviteCode"))
117
}
118
-
s.logger.Error("error getting invite code from db", "error", err)
119
return helpers.ServerError(e, nil)
120
}
121
···
127
// see if the email is already taken
128
existingRepo, err := s.getRepoByEmail(ctx, request.Email)
129
if err != nil && err != gorm.ErrRecordNotFound {
130
-
s.logger.Error("error looking up email in db", "endpoint", "com.atproto.server.createAccount", "error", err)
131
return helpers.ServerError(e, nil)
132
}
133
if err == nil && existingRepo.Did != signupDid {
···
141
if signupDid != "" {
142
reservedKey, err := s.getReservedKey(ctx, signupDid)
143
if err != nil {
144
-
s.logger.Error("error looking up reserved key", "error", err)
145
}
146
if reservedKey != nil {
147
k, err = atcrypto.ParsePrivateBytesK256(reservedKey.PrivateKey)
148
if err != nil {
149
-
s.logger.Error("error parsing reserved key", "error", err)
150
k = nil
151
} else {
152
defer func() {
153
if delErr := s.deleteReservedKey(ctx, reservedKey.KeyDid, reservedKey.Did); delErr != nil {
154
-
s.logger.Error("error deleting reserved key", "error", delErr)
155
}
156
}()
157
}
···
161
if k == nil {
162
k, err = atcrypto.GeneratePrivateKeyK256()
163
if err != nil {
164
-
s.logger.Error("error creating signing key", "endpoint", "com.atproto.server.createAccount", "error", err)
165
return helpers.ServerError(e, nil)
166
}
167
}
···
169
if signupDid == "" {
170
did, op, err := s.plcClient.CreateDID(k, "", request.Handle)
171
if err != nil {
172
-
s.logger.Error("error creating operation", "endpoint", "com.atproto.server.createAccount", "error", err)
173
return helpers.ServerError(e, nil)
174
}
175
176
if err := s.plcClient.SendOperation(e.Request().Context(), did, op); err != nil {
177
-
s.logger.Error("error sending plc op", "endpoint", "com.atproto.server.createAccount", "error", err)
178
return helpers.ServerError(e, nil)
179
}
180
signupDid = did
···
182
183
hashed, err := bcrypt.GenerateFromPassword([]byte(request.Password), 10)
184
if err != nil {
185
-
s.logger.Error("error hashing password", "error", err)
186
return helpers.ServerError(e, nil)
187
}
188
···
202
}
203
204
if err := s.db.Create(ctx, &urepo, nil).Error; err != nil {
205
-
s.logger.Error("error inserting new repo", "error", err)
206
return helpers.ServerError(e, nil)
207
}
208
209
if err := s.db.Create(ctx, &actor, nil).Error; err != nil {
210
-
s.logger.Error("error inserting new actor", "error", err)
211
return helpers.ServerError(e, nil)
212
}
213
} else {
214
if err := s.db.Save(ctx, &actor, nil).Error; err != nil {
215
-
s.logger.Error("error inserting new actor", "error", err)
216
return helpers.ServerError(e, nil)
217
}
218
}
···
223
224
root, rev, err := r.Commit(context.TODO(), urepo.SignFor)
225
if err != nil {
226
-
s.logger.Error("error committing", "error", err)
227
return helpers.ServerError(e, nil)
228
}
229
230
if err := s.UpdateRepo(context.TODO(), urepo.Did, root, rev); err != nil {
231
-
s.logger.Error("error updating repo after commit", "error", err)
232
return helpers.ServerError(e, nil)
233
}
234
···
244
245
if s.config.RequireInvite {
246
if err := s.db.Raw(ctx, "UPDATE invite_codes SET remaining_use_count = remaining_use_count - 1 WHERE code = ?", nil, request.InviteCode).Scan(&ic).Error; err != nil {
247
-
s.logger.Error("error decrementing use count", "error", err)
248
return helpers.ServerError(e, nil)
249
}
250
}
251
252
sess, err := s.createSession(ctx, &urepo)
253
if err != nil {
254
-
s.logger.Error("error creating new session", "error", err)
255
return helpers.ServerError(e, nil)
256
}
257
258
go func() {
259
if err := s.sendEmailVerification(urepo.Email, actor.Handle, *urepo.EmailVerificationCode); err != nil {
260
-
s.logger.Error("error sending email verification email", "error", err)
261
}
262
if err := s.sendWelcomeMail(urepo.Email, actor.Handle); err != nil {
263
-
s.logger.Error("error sending welcome email", "error", err)
264
}
265
}()
266
···
37
38
func (s *Server) handleCreateAccount(e echo.Context) error {
39
ctx := e.Request().Context()
40
+
logger := s.logger.With("name", "handleServerCreateAccount")
41
42
var request ComAtprotoServerCreateAccountRequest
43
44
if err := e.Bind(&request); err != nil {
45
+
logger.Error("error receiving request", "endpoint", "com.atproto.server.createAccount", "error", err)
46
return helpers.ServerError(e, nil)
47
}
48
49
request.Handle = strings.ToLower(request.Handle)
50
51
if err := e.Validate(request); err != nil {
52
+
logger.Error("error validating request", "endpoint", "com.atproto.server.createAccount", "error", err)
53
54
var verr ValidationError
55
if errors.As(err, &verr) {
···
83
authDid, err := s.validateServiceAuth(e.Request().Context(), token, "com.atproto.server.createAccount")
84
85
if err != nil {
86
+
logger.Warn("error validating authorization token", "endpoint", "com.atproto.server.createAccount", "error", err)
87
return helpers.UnauthorizedError(e, to.StringPtr("invalid authorization token"))
88
}
89
···
95
// see if the handle is already taken
96
actor, err := s.getActorByHandle(ctx, request.Handle)
97
if err != nil && err != gorm.ErrRecordNotFound {
98
+
logger.Error("error looking up handle in db", "endpoint", "com.atproto.server.createAccount", "error", err)
99
return helpers.ServerError(e, nil)
100
}
101
if err == nil && actor.Did != signupDid {
···
116
if err == gorm.ErrRecordNotFound {
117
return helpers.InputError(e, to.StringPtr("InvalidInviteCode"))
118
}
119
+
logger.Error("error getting invite code from db", "error", err)
120
return helpers.ServerError(e, nil)
121
}
122
···
128
// see if the email is already taken
129
existingRepo, err := s.getRepoByEmail(ctx, request.Email)
130
if err != nil && err != gorm.ErrRecordNotFound {
131
+
logger.Error("error looking up email in db", "endpoint", "com.atproto.server.createAccount", "error", err)
132
return helpers.ServerError(e, nil)
133
}
134
if err == nil && existingRepo.Did != signupDid {
···
142
if signupDid != "" {
143
reservedKey, err := s.getReservedKey(ctx, signupDid)
144
if err != nil {
145
+
logger.Error("error looking up reserved key", "error", err)
146
}
147
if reservedKey != nil {
148
k, err = atcrypto.ParsePrivateBytesK256(reservedKey.PrivateKey)
149
if err != nil {
150
+
logger.Error("error parsing reserved key", "error", err)
151
k = nil
152
} else {
153
defer func() {
154
if delErr := s.deleteReservedKey(ctx, reservedKey.KeyDid, reservedKey.Did); delErr != nil {
155
+
logger.Error("error deleting reserved key", "error", delErr)
156
}
157
}()
158
}
···
162
if k == nil {
163
k, err = atcrypto.GeneratePrivateKeyK256()
164
if err != nil {
165
+
logger.Error("error creating signing key", "endpoint", "com.atproto.server.createAccount", "error", err)
166
return helpers.ServerError(e, nil)
167
}
168
}
···
170
if signupDid == "" {
171
did, op, err := s.plcClient.CreateDID(k, "", request.Handle)
172
if err != nil {
173
+
logger.Error("error creating operation", "endpoint", "com.atproto.server.createAccount", "error", err)
174
return helpers.ServerError(e, nil)
175
}
176
177
if err := s.plcClient.SendOperation(e.Request().Context(), did, op); err != nil {
178
+
logger.Error("error sending plc op", "endpoint", "com.atproto.server.createAccount", "error", err)
179
return helpers.ServerError(e, nil)
180
}
181
signupDid = did
···
183
184
hashed, err := bcrypt.GenerateFromPassword([]byte(request.Password), 10)
185
if err != nil {
186
+
logger.Error("error hashing password", "error", err)
187
return helpers.ServerError(e, nil)
188
}
189
···
203
}
204
205
if err := s.db.Create(ctx, &urepo, nil).Error; err != nil {
206
+
logger.Error("error inserting new repo", "error", err)
207
return helpers.ServerError(e, nil)
208
}
209
210
if err := s.db.Create(ctx, &actor, nil).Error; err != nil {
211
+
logger.Error("error inserting new actor", "error", err)
212
return helpers.ServerError(e, nil)
213
}
214
} else {
215
if err := s.db.Save(ctx, &actor, nil).Error; err != nil {
216
+
logger.Error("error inserting new actor", "error", err)
217
return helpers.ServerError(e, nil)
218
}
219
}
···
224
225
root, rev, err := r.Commit(context.TODO(), urepo.SignFor)
226
if err != nil {
227
+
logger.Error("error committing", "error", err)
228
return helpers.ServerError(e, nil)
229
}
230
231
if err := s.UpdateRepo(context.TODO(), urepo.Did, root, rev); err != nil {
232
+
logger.Error("error updating repo after commit", "error", err)
233
return helpers.ServerError(e, nil)
234
}
235
···
245
246
if s.config.RequireInvite {
247
if err := s.db.Raw(ctx, "UPDATE invite_codes SET remaining_use_count = remaining_use_count - 1 WHERE code = ?", nil, request.InviteCode).Scan(&ic).Error; err != nil {
248
+
logger.Error("error decrementing use count", "error", err)
249
return helpers.ServerError(e, nil)
250
}
251
}
252
253
sess, err := s.createSession(ctx, &urepo)
254
if err != nil {
255
+
logger.Error("error creating new session", "error", err)
256
return helpers.ServerError(e, nil)
257
}
258
259
go func() {
260
if err := s.sendEmailVerification(urepo.Email, actor.Handle, *urepo.EmailVerificationCode); err != nil {
261
+
logger.Error("error sending email verification email", "error", err)
262
}
263
if err := s.sendWelcomeMail(urepo.Email, actor.Handle); err != nil {
264
+
logger.Error("error sending welcome email", "error", err)
265
}
266
}()
267
+4
-3
server/handle_server_create_invite_code.go
+4
-3
server/handle_server_create_invite_code.go
···
18
19
func (s *Server) handleCreateInviteCode(e echo.Context) error {
20
ctx := e.Request().Context()
21
22
var req ComAtprotoServerCreateInviteCodeRequest
23
if err := e.Bind(&req); err != nil {
24
-
s.logger.Error("error binding", "error", err)
25
return helpers.ServerError(e, nil)
26
}
27
28
if err := e.Validate(req); err != nil {
29
-
s.logger.Error("error validating", "error", err)
30
return helpers.InputError(e, nil)
31
}
32
···
44
Did: acc,
45
RemainingUseCount: req.UseCount,
46
}, nil).Error; err != nil {
47
-
s.logger.Error("error creating invite code", "error", err)
48
return helpers.ServerError(e, nil)
49
}
50
···
18
19
func (s *Server) handleCreateInviteCode(e echo.Context) error {
20
ctx := e.Request().Context()
21
+
logger := s.logger.With("name", "handleServerCreateInviteCode")
22
23
var req ComAtprotoServerCreateInviteCodeRequest
24
if err := e.Bind(&req); err != nil {
25
+
logger.Error("error binding", "error", err)
26
return helpers.ServerError(e, nil)
27
}
28
29
if err := e.Validate(req); err != nil {
30
+
logger.Error("error validating", "error", err)
31
return helpers.InputError(e, nil)
32
}
33
···
45
Did: acc,
46
RemainingUseCount: req.UseCount,
47
}, nil).Error; err != nil {
48
+
logger.Error("error creating invite code", "error", err)
49
return helpers.ServerError(e, nil)
50
}
51
+4
-3
server/handle_server_create_invite_codes.go
+4
-3
server/handle_server_create_invite_codes.go
···
23
24
func (s *Server) handleCreateInviteCodes(e echo.Context) error {
25
ctx := e.Request().Context()
26
27
var req ComAtprotoServerCreateInviteCodesRequest
28
if err := e.Bind(&req); err != nil {
29
-
s.logger.Error("error binding", "error", err)
30
return helpers.ServerError(e, nil)
31
}
32
33
if err := e.Validate(req); err != nil {
34
-
s.logger.Error("error validating", "error", err)
35
return helpers.InputError(e, nil)
36
}
37
···
57
Did: did,
58
RemainingUseCount: req.UseCount,
59
}, nil).Error; err != nil {
60
-
s.logger.Error("error creating invite code", "error", err)
61
return helpers.ServerError(e, nil)
62
}
63
}
···
23
24
func (s *Server) handleCreateInviteCodes(e echo.Context) error {
25
ctx := e.Request().Context()
26
+
logger := s.logger.With("name", "handleServerCreateInviteCodes")
27
28
var req ComAtprotoServerCreateInviteCodesRequest
29
if err := e.Bind(&req); err != nil {
30
+
logger.Error("error binding", "error", err)
31
return helpers.ServerError(e, nil)
32
}
33
34
if err := e.Validate(req); err != nil {
35
+
logger.Error("error validating", "error", err)
36
return helpers.InputError(e, nil)
37
}
38
···
58
Did: did,
59
RemainingUseCount: req.UseCount,
60
}, nil).Error; err != nil {
61
+
logger.Error("error creating invite code", "error", err)
62
return helpers.ServerError(e, nil)
63
}
64
}
+5
-4
server/handle_server_create_session.go
+5
-4
server/handle_server_create_session.go
···
33
34
func (s *Server) handleCreateSession(e echo.Context) error {
35
ctx := e.Request().Context()
36
37
var req ComAtprotoServerCreateSessionRequest
38
if err := e.Bind(&req); err != nil {
39
-
s.logger.Error("error binding request", "endpoint", "com.atproto.server.serverCreateSession", "error", err)
40
return helpers.ServerError(e, nil)
41
}
42
···
79
return helpers.InputError(e, to.StringPtr("InvalidRequest"))
80
}
81
82
-
s.logger.Error("erorr looking up repo", "endpoint", "com.atproto.server.createSession", "error", err)
83
return helpers.ServerError(e, nil)
84
}
85
86
if err := bcrypt.CompareHashAndPassword([]byte(repo.Password), []byte(req.Password)); err != nil {
87
if err != bcrypt.ErrMismatchedHashAndPassword {
88
-
s.logger.Error("erorr comparing hash and password", "error", err)
89
}
90
return helpers.InputError(e, to.StringPtr("InvalidRequest"))
91
}
92
93
sess, err := s.createSession(ctx, &repo.Repo)
94
if err != nil {
95
-
s.logger.Error("error creating session", "error", err)
96
return helpers.ServerError(e, nil)
97
}
98
···
33
34
func (s *Server) handleCreateSession(e echo.Context) error {
35
ctx := e.Request().Context()
36
+
logger := s.logger.With("name", "handleServerCreateSession")
37
38
var req ComAtprotoServerCreateSessionRequest
39
if err := e.Bind(&req); err != nil {
40
+
logger.Error("error binding request", "endpoint", "com.atproto.server.serverCreateSession", "error", err)
41
return helpers.ServerError(e, nil)
42
}
43
···
80
return helpers.InputError(e, to.StringPtr("InvalidRequest"))
81
}
82
83
+
logger.Error("erorr looking up repo", "endpoint", "com.atproto.server.createSession", "error", err)
84
return helpers.ServerError(e, nil)
85
}
86
87
if err := bcrypt.CompareHashAndPassword([]byte(repo.Password), []byte(req.Password)); err != nil {
88
if err != bcrypt.ErrMismatchedHashAndPassword {
89
+
logger.Error("erorr comparing hash and password", "error", err)
90
}
91
return helpers.InputError(e, to.StringPtr("InvalidRequest"))
92
}
93
94
sess, err := s.createSession(ctx, &repo.Repo)
95
if err != nil {
96
+
logger.Error("error creating session", "error", err)
97
return helpers.ServerError(e, nil)
98
}
99
+3
-2
server/handle_server_deactivate_account.go
+3
-2
server/handle_server_deactivate_account.go
···
20
21
func (s *Server) handleServerDeactivateAccount(e echo.Context) error {
22
ctx := e.Request().Context()
23
24
var req ComAtprotoServerDeactivateAccountRequest
25
if err := e.Bind(&req); err != nil {
26
-
s.logger.Error("error binding", "error", err)
27
return helpers.ServerError(e, nil)
28
}
29
30
urepo := e.Get("repo").(*models.RepoActor)
31
32
if err := s.db.Exec(ctx, "UPDATE repos SET deactivated = ? WHERE did = ?", nil, true, urepo.Repo.Did).Error; err != nil {
33
-
s.logger.Error("error updating account status to deactivated", "error", err)
34
return helpers.ServerError(e, nil)
35
}
36
···
20
21
func (s *Server) handleServerDeactivateAccount(e echo.Context) error {
22
ctx := e.Request().Context()
23
+
logger := s.logger.With("name", "handleServerDeactivateAccount")
24
25
var req ComAtprotoServerDeactivateAccountRequest
26
if err := e.Bind(&req); err != nil {
27
+
logger.Error("error binding", "error", err)
28
return helpers.ServerError(e, nil)
29
}
30
31
urepo := e.Get("repo").(*models.RepoActor)
32
33
if err := s.db.Exec(ctx, "UPDATE repos SET deactivated = ? WHERE did = ?", nil, true, urepo.Repo.Did).Error; err != nil {
34
+
logger.Error("error updating account status to deactivated", "error", err)
35
return helpers.ServerError(e, nil)
36
}
37
+30
-27
server/handle_server_delete_account.go
+30
-27
server/handle_server_delete_account.go
···
21
22
func (s *Server) handleServerDeleteAccount(e echo.Context) error {
23
ctx := e.Request().Context()
24
25
var req ComAtprotoServerDeleteAccountRequest
26
if err := e.Bind(&req); err != nil {
27
-
s.logger.Error("error binding", "error", err)
28
return helpers.ServerError(e, nil)
29
}
30
31
if err := e.Validate(&req); err != nil {
32
-
s.logger.Error("error validating", "error", err)
33
return helpers.ServerError(e, nil)
34
}
35
36
urepo, err := s.getRepoActorByDid(ctx, req.Did)
37
if err != nil {
38
-
s.logger.Error("error getting repo", "error", err)
39
return echo.NewHTTPError(400, "account not found")
40
}
41
42
if err := bcrypt.CompareHashAndPassword([]byte(urepo.Repo.Password), []byte(req.Password)); err != nil {
43
-
s.logger.Error("password mismatch", "error", err)
44
return echo.NewHTTPError(401, "Invalid did or password")
45
}
46
47
if urepo.Repo.AccountDeleteCode == nil || urepo.Repo.AccountDeleteCodeExpiresAt == nil {
48
-
s.logger.Error("no deletion token found for account")
49
return echo.NewHTTPError(400, map[string]interface{}{
50
"error": "InvalidToken",
51
"message": "Token is invalid",
···
53
}
54
55
if *urepo.Repo.AccountDeleteCode != req.Token {
56
-
s.logger.Error("deletion token mismatch")
57
return echo.NewHTTPError(400, map[string]interface{}{
58
"error": "InvalidToken",
59
"message": "Token is invalid",
···
61
}
62
63
if time.Now().UTC().After(*urepo.Repo.AccountDeleteCodeExpiresAt) {
64
-
s.logger.Error("deletion token expired")
65
return echo.NewHTTPError(400, map[string]interface{}{
66
"error": "ExpiredToken",
67
"message": "Token is expired",
···
70
71
tx := s.db.BeginDangerously(ctx)
72
if tx.Error != nil {
73
-
s.logger.Error("error starting transaction", "error", tx.Error)
74
return helpers.ServerError(e, nil)
75
}
76
77
if err := tx.Exec("DELETE FROM blocks WHERE did = ?", nil, req.Did).Error; err != nil {
78
-
tx.Rollback()
79
-
s.logger.Error("error deleting blocks", "error", err)
80
return helpers.ServerError(e, nil)
81
}
82
83
if err := tx.Exec("DELETE FROM records WHERE did = ?", nil, req.Did).Error; err != nil {
84
-
tx.Rollback()
85
-
s.logger.Error("error deleting records", "error", err)
86
return helpers.ServerError(e, nil)
87
}
88
89
if err := tx.Exec("DELETE FROM blobs WHERE did = ?", nil, req.Did).Error; err != nil {
90
-
tx.Rollback()
91
-
s.logger.Error("error deleting blobs", "error", err)
92
return helpers.ServerError(e, nil)
93
}
94
95
if err := tx.Exec("DELETE FROM tokens WHERE did = ?", nil, req.Did).Error; err != nil {
96
-
tx.Rollback()
97
-
s.logger.Error("error deleting tokens", "error", err)
98
return helpers.ServerError(e, nil)
99
}
100
101
if err := tx.Exec("DELETE FROM refresh_tokens WHERE did = ?", nil, req.Did).Error; err != nil {
102
-
tx.Rollback()
103
-
s.logger.Error("error deleting refresh tokens", "error", err)
104
return helpers.ServerError(e, nil)
105
}
106
107
if err := tx.Exec("DELETE FROM reserved_keys WHERE did = ?", nil, req.Did).Error; err != nil {
108
-
tx.Rollback()
109
-
s.logger.Error("error deleting reserved keys", "error", err)
110
return helpers.ServerError(e, nil)
111
}
112
113
if err := tx.Exec("DELETE FROM invite_codes WHERE did = ?", nil, req.Did).Error; err != nil {
114
-
tx.Rollback()
115
-
s.logger.Error("error deleting invite codes", "error", err)
116
return helpers.ServerError(e, nil)
117
}
118
119
if err := tx.Exec("DELETE FROM actors WHERE did = ?", nil, req.Did).Error; err != nil {
120
-
tx.Rollback()
121
-
s.logger.Error("error deleting actor", "error", err)
122
return helpers.ServerError(e, nil)
123
}
124
125
if err := tx.Exec("DELETE FROM repos WHERE did = ?", nil, req.Did).Error; err != nil {
126
-
tx.Rollback()
127
-
s.logger.Error("error deleting repo", "error", err)
128
return helpers.ServerError(e, nil)
129
}
130
131
if err := tx.Commit().Error; err != nil {
132
-
s.logger.Error("error committing transaction", "error", err)
133
return helpers.ServerError(e, nil)
134
}
135
···
21
22
func (s *Server) handleServerDeleteAccount(e echo.Context) error {
23
ctx := e.Request().Context()
24
+
logger := s.logger.With("name", "handleServerDeleteAccount")
25
26
var req ComAtprotoServerDeleteAccountRequest
27
if err := e.Bind(&req); err != nil {
28
+
logger.Error("error binding", "error", err)
29
return helpers.ServerError(e, nil)
30
}
31
32
if err := e.Validate(&req); err != nil {
33
+
logger.Error("error validating", "error", err)
34
return helpers.ServerError(e, nil)
35
}
36
37
urepo, err := s.getRepoActorByDid(ctx, req.Did)
38
if err != nil {
39
+
logger.Error("error getting repo", "error", err)
40
return echo.NewHTTPError(400, "account not found")
41
}
42
43
if err := bcrypt.CompareHashAndPassword([]byte(urepo.Repo.Password), []byte(req.Password)); err != nil {
44
+
logger.Error("password mismatch", "error", err)
45
return echo.NewHTTPError(401, "Invalid did or password")
46
}
47
48
if urepo.Repo.AccountDeleteCode == nil || urepo.Repo.AccountDeleteCodeExpiresAt == nil {
49
+
logger.Error("no deletion token found for account")
50
return echo.NewHTTPError(400, map[string]interface{}{
51
"error": "InvalidToken",
52
"message": "Token is invalid",
···
54
}
55
56
if *urepo.Repo.AccountDeleteCode != req.Token {
57
+
logger.Error("deletion token mismatch")
58
return echo.NewHTTPError(400, map[string]interface{}{
59
"error": "InvalidToken",
60
"message": "Token is invalid",
···
62
}
63
64
if time.Now().UTC().After(*urepo.Repo.AccountDeleteCodeExpiresAt) {
65
+
logger.Error("deletion token expired")
66
return echo.NewHTTPError(400, map[string]interface{}{
67
"error": "ExpiredToken",
68
"message": "Token is expired",
···
71
72
tx := s.db.BeginDangerously(ctx)
73
if tx.Error != nil {
74
+
logger.Error("error starting transaction", "error", tx.Error)
75
return helpers.ServerError(e, nil)
76
}
77
78
+
status := "error"
79
+
func() {
80
+
if status == "error" {
81
+
if err := tx.Rollback().Error; err != nil {
82
+
logger.Error("error rolling back after delete failure", "err", err)
83
+
}
84
+
}
85
+
}()
86
+
87
if err := tx.Exec("DELETE FROM blocks WHERE did = ?", nil, req.Did).Error; err != nil {
88
+
logger.Error("error deleting blocks", "error", err)
89
return helpers.ServerError(e, nil)
90
}
91
92
if err := tx.Exec("DELETE FROM records WHERE did = ?", nil, req.Did).Error; err != nil {
93
+
logger.Error("error deleting records", "error", err)
94
return helpers.ServerError(e, nil)
95
}
96
97
if err := tx.Exec("DELETE FROM blobs WHERE did = ?", nil, req.Did).Error; err != nil {
98
+
logger.Error("error deleting blobs", "error", err)
99
return helpers.ServerError(e, nil)
100
}
101
102
if err := tx.Exec("DELETE FROM tokens WHERE did = ?", nil, req.Did).Error; err != nil {
103
+
logger.Error("error deleting tokens", "error", err)
104
return helpers.ServerError(e, nil)
105
}
106
107
if err := tx.Exec("DELETE FROM refresh_tokens WHERE did = ?", nil, req.Did).Error; err != nil {
108
+
logger.Error("error deleting refresh tokens", "error", err)
109
return helpers.ServerError(e, nil)
110
}
111
112
if err := tx.Exec("DELETE FROM reserved_keys WHERE did = ?", nil, req.Did).Error; err != nil {
113
+
logger.Error("error deleting reserved keys", "error", err)
114
return helpers.ServerError(e, nil)
115
}
116
117
if err := tx.Exec("DELETE FROM invite_codes WHERE did = ?", nil, req.Did).Error; err != nil {
118
+
logger.Error("error deleting invite codes", "error", err)
119
return helpers.ServerError(e, nil)
120
}
121
122
if err := tx.Exec("DELETE FROM actors WHERE did = ?", nil, req.Did).Error; err != nil {
123
+
logger.Error("error deleting actor", "error", err)
124
return helpers.ServerError(e, nil)
125
}
126
127
if err := tx.Exec("DELETE FROM repos WHERE did = ?", nil, req.Did).Error; err != nil {
128
+
logger.Error("error deleting repo", "error", err)
129
return helpers.ServerError(e, nil)
130
}
131
132
+
status = "ok"
133
+
134
if err := tx.Commit().Error; err != nil {
135
+
logger.Error("error committing transaction", "error", err)
136
return helpers.ServerError(e, nil)
137
}
138
+7
-5
server/handle_server_get_service_auth.go
+7
-5
server/handle_server_get_service_auth.go
···
25
}
26
27
func (s *Server) handleServerGetServiceAuth(e echo.Context) error {
28
var req ServerGetServiceAuthRequest
29
if err := e.Bind(&req); err != nil {
30
-
s.logger.Error("could not bind service auth request", "error", err)
31
return helpers.ServerError(e, nil)
32
}
33
···
64
}
65
hj, err := json.Marshal(header)
66
if err != nil {
67
-
s.logger.Error("error marshaling header", "error", err)
68
return helpers.ServerError(e, nil)
69
}
70
···
82
}
83
pj, err := json.Marshal(payload)
84
if err != nil {
85
-
s.logger.Error("error marashaling payload", "error", err)
86
return helpers.ServerError(e, nil)
87
}
88
···
93
94
sk, err := secp256k1secec.NewPrivateKey(repo.SigningKey)
95
if err != nil {
96
-
s.logger.Error("can't load private key", "error", err)
97
return err
98
}
99
100
R, S, _, err := sk.SignRaw(rand.Reader, hash[:])
101
if err != nil {
102
-
s.logger.Error("error signing", "error", err)
103
return helpers.ServerError(e, nil)
104
}
105
···
25
}
26
27
func (s *Server) handleServerGetServiceAuth(e echo.Context) error {
28
+
logger := s.logger.With("name", "handleServerGetServiceAuth")
29
+
30
var req ServerGetServiceAuthRequest
31
if err := e.Bind(&req); err != nil {
32
+
logger.Error("could not bind service auth request", "error", err)
33
return helpers.ServerError(e, nil)
34
}
35
···
66
}
67
hj, err := json.Marshal(header)
68
if err != nil {
69
+
logger.Error("error marshaling header", "error", err)
70
return helpers.ServerError(e, nil)
71
}
72
···
84
}
85
pj, err := json.Marshal(payload)
86
if err != nil {
87
+
logger.Error("error marashaling payload", "error", err)
88
return helpers.ServerError(e, nil)
89
}
90
···
95
96
sk, err := secp256k1secec.NewPrivateKey(repo.SigningKey)
97
if err != nil {
98
+
logger.Error("can't load private key", "error", err)
99
return err
100
}
101
102
R, S, _, err := sk.SignRaw(rand.Reader, hash[:])
103
if err != nil {
104
+
logger.Error("error signing", "error", err)
105
return helpers.ServerError(e, nil)
106
}
107
+4
-3
server/handle_server_refresh_session.go
+4
-3
server/handle_server_refresh_session.go
···
17
18
func (s *Server) handleRefreshSession(e echo.Context) error {
19
ctx := e.Request().Context()
20
21
token := e.Get("token").(string)
22
repo := e.Get("repo").(*models.RepoActor)
23
24
if err := s.db.Exec(ctx, "DELETE FROM refresh_tokens WHERE token = ?", nil, token).Error; err != nil {
25
-
s.logger.Error("error getting refresh token from db", "error", err)
26
return helpers.ServerError(e, nil)
27
}
28
29
if err := s.db.Exec(ctx, "DELETE FROM tokens WHERE refresh_token = ?", nil, token).Error; err != nil {
30
-
s.logger.Error("error deleting access token from db", "error", err)
31
return helpers.ServerError(e, nil)
32
}
33
34
sess, err := s.createSession(ctx, &repo.Repo)
35
if err != nil {
36
-
s.logger.Error("error creating new session for refresh", "error", err)
37
return helpers.ServerError(e, nil)
38
}
39
···
17
18
func (s *Server) handleRefreshSession(e echo.Context) error {
19
ctx := e.Request().Context()
20
+
logger := s.logger.With("name", "handleServerRefreshSession")
21
22
token := e.Get("token").(string)
23
repo := e.Get("repo").(*models.RepoActor)
24
25
if err := s.db.Exec(ctx, "DELETE FROM refresh_tokens WHERE token = ?", nil, token).Error; err != nil {
26
+
logger.Error("error getting refresh token from db", "error", err)
27
return helpers.ServerError(e, nil)
28
}
29
30
if err := s.db.Exec(ctx, "DELETE FROM tokens WHERE refresh_token = ?", nil, token).Error; err != nil {
31
+
logger.Error("error deleting access token from db", "error", err)
32
return helpers.ServerError(e, nil)
33
}
34
35
sess, err := s.createSession(ctx, &repo.Repo)
36
if err != nil {
37
+
logger.Error("error creating new session for refresh", "error", err)
38
return helpers.ServerError(e, nil)
39
}
40
+3
-2
server/handle_server_request_account_delete.go
+3
-2
server/handle_server_request_account_delete.go
···
11
12
func (s *Server) handleServerRequestAccountDelete(e echo.Context) error {
13
ctx := e.Request().Context()
14
15
urepo := e.Get("repo").(*models.RepoActor)
16
···
18
expiresAt := time.Now().UTC().Add(15 * time.Minute)
19
20
if err := s.db.Exec(ctx, "UPDATE repos SET account_delete_code = ?, account_delete_code_expires_at = ? WHERE did = ?", nil, token, expiresAt, urepo.Repo.Did).Error; err != nil {
21
-
s.logger.Error("error setting deletion token", "error", err)
22
return helpers.ServerError(e, nil)
23
}
24
25
if urepo.Email != "" {
26
if err := s.sendAccountDeleteEmail(urepo.Email, urepo.Actor.Handle, token); err != nil {
27
-
s.logger.Error("error sending account deletion email", "error", err)
28
}
29
}
30
···
11
12
func (s *Server) handleServerRequestAccountDelete(e echo.Context) error {
13
ctx := e.Request().Context()
14
+
logger := s.logger.With("name", "handleServerRequestAccountDelete")
15
16
urepo := e.Get("repo").(*models.RepoActor)
17
···
19
expiresAt := time.Now().UTC().Add(15 * time.Minute)
20
21
if err := s.db.Exec(ctx, "UPDATE repos SET account_delete_code = ?, account_delete_code_expires_at = ? WHERE did = ?", nil, token, expiresAt, urepo.Repo.Did).Error; err != nil {
22
+
logger.Error("error setting deletion token", "error", err)
23
return helpers.ServerError(e, nil)
24
}
25
26
if urepo.Email != "" {
27
if err := s.sendAccountDeleteEmail(urepo.Email, urepo.Actor.Handle, token); err != nil {
28
+
logger.Error("error sending account deletion email", "error", err)
29
}
30
}
31
+3
-2
server/handle_server_request_email_confirmation.go
+3
-2
server/handle_server_request_email_confirmation.go
···
12
13
func (s *Server) handleServerRequestEmailConfirmation(e echo.Context) error {
14
ctx := e.Request().Context()
15
16
urepo := e.Get("repo").(*models.RepoActor)
17
···
23
eat := time.Now().Add(10 * time.Minute).UTC()
24
25
if err := s.db.Exec(ctx, "UPDATE repos SET email_verification_code = ?, email_verification_code_expires_at = ? WHERE did = ?", nil, code, eat, urepo.Repo.Did).Error; err != nil {
26
-
s.logger.Error("error updating user", "error", err)
27
return helpers.ServerError(e, nil)
28
}
29
30
if err := s.sendEmailVerification(urepo.Email, urepo.Handle, code); err != nil {
31
-
s.logger.Error("error sending mail", "error", err)
32
return helpers.ServerError(e, nil)
33
}
34
···
12
13
func (s *Server) handleServerRequestEmailConfirmation(e echo.Context) error {
14
ctx := e.Request().Context()
15
+
logger := s.logger.With("name", "handleServerRequestEmailConfirm")
16
17
urepo := e.Get("repo").(*models.RepoActor)
18
···
24
eat := time.Now().Add(10 * time.Minute).UTC()
25
26
if err := s.db.Exec(ctx, "UPDATE repos SET email_verification_code = ?, email_verification_code_expires_at = ? WHERE did = ?", nil, code, eat, urepo.Repo.Did).Error; err != nil {
27
+
logger.Error("error updating user", "error", err)
28
return helpers.ServerError(e, nil)
29
}
30
31
if err := s.sendEmailVerification(urepo.Email, urepo.Handle, code); err != nil {
32
+
logger.Error("error sending mail", "error", err)
33
return helpers.ServerError(e, nil)
34
}
35
+3
-2
server/handle_server_request_email_update.go
+3
-2
server/handle_server_request_email_update.go
···
15
16
func (s *Server) handleServerRequestEmailUpdate(e echo.Context) error {
17
ctx := e.Request().Context()
18
19
urepo := e.Get("repo").(*models.RepoActor)
20
···
23
eat := time.Now().Add(10 * time.Minute).UTC()
24
25
if err := s.db.Exec(ctx, "UPDATE repos SET email_update_code = ?, email_update_code_expires_at = ? WHERE did = ?", nil, code, eat, urepo.Repo.Did).Error; err != nil {
26
-
s.logger.Error("error updating repo", "error", err)
27
return helpers.ServerError(e, nil)
28
}
29
30
if err := s.sendEmailUpdate(urepo.Email, urepo.Handle, code); err != nil {
31
-
s.logger.Error("error sending email", "error", err)
32
return helpers.ServerError(e, nil)
33
}
34
}
···
15
16
func (s *Server) handleServerRequestEmailUpdate(e echo.Context) error {
17
ctx := e.Request().Context()
18
+
logger := s.logger.With("name", "handleServerRequestEmailUpdate")
19
20
urepo := e.Get("repo").(*models.RepoActor)
21
···
24
eat := time.Now().Add(10 * time.Minute).UTC()
25
26
if err := s.db.Exec(ctx, "UPDATE repos SET email_update_code = ?, email_update_code_expires_at = ? WHERE did = ?", nil, code, eat, urepo.Repo.Did).Error; err != nil {
27
+
logger.Error("error updating repo", "error", err)
28
return helpers.ServerError(e, nil)
29
}
30
31
if err := s.sendEmailUpdate(urepo.Email, urepo.Handle, code); err != nil {
32
+
logger.Error("error sending email", "error", err)
33
return helpers.ServerError(e, nil)
34
}
35
}
+3
-2
server/handle_server_request_password_reset.go
+3
-2
server/handle_server_request_password_reset.go
···
15
16
func (s *Server) handleServerRequestPasswordReset(e echo.Context) error {
17
ctx := e.Request().Context()
18
19
urepo, ok := e.Get("repo").(*models.RepoActor)
20
if !ok {
···
39
eat := time.Now().Add(10 * time.Minute).UTC()
40
41
if err := s.db.Exec(ctx, "UPDATE repos SET password_reset_code = ?, password_reset_code_expires_at = ? WHERE did = ?", nil, code, eat, urepo.Repo.Did).Error; err != nil {
42
-
s.logger.Error("error updating repo", "error", err)
43
return helpers.ServerError(e, nil)
44
}
45
46
if err := s.sendPasswordReset(urepo.Email, urepo.Handle, code); err != nil {
47
-
s.logger.Error("error sending email", "error", err)
48
return helpers.ServerError(e, nil)
49
}
50
···
15
16
func (s *Server) handleServerRequestPasswordReset(e echo.Context) error {
17
ctx := e.Request().Context()
18
+
logger := s.logger.With("name", "handleServerRequestPasswordReset")
19
20
urepo, ok := e.Get("repo").(*models.RepoActor)
21
if !ok {
···
40
eat := time.Now().Add(10 * time.Minute).UTC()
41
42
if err := s.db.Exec(ctx, "UPDATE repos SET password_reset_code = ?, password_reset_code_expires_at = ? WHERE did = ?", nil, code, eat, urepo.Repo.Did).Error; err != nil {
43
+
logger.Error("error updating repo", "error", err)
44
return helpers.ServerError(e, nil)
45
}
46
47
if err := s.sendPasswordReset(urepo.Email, urepo.Handle, code); err != nil {
48
+
logger.Error("error sending email", "error", err)
49
return helpers.ServerError(e, nil)
50
}
51
+6
-5
server/handle_server_reserve_signing_key.go
+6
-5
server/handle_server_reserve_signing_key.go
···
20
21
func (s *Server) handleServerReserveSigningKey(e echo.Context) error {
22
ctx := e.Request().Context()
23
24
var req ServerReserveSigningKeyRequest
25
if err := e.Bind(&req); err != nil {
26
-
s.logger.Error("could not bind reserve signing key request", "error", err)
27
return helpers.ServerError(e, nil)
28
}
29
···
38
39
k, err := atcrypto.GeneratePrivateKeyK256()
40
if err != nil {
41
-
s.logger.Error("error creating signing key", "endpoint", "com.atproto.server.reserveSigningKey", "error", err)
42
return helpers.ServerError(e, nil)
43
}
44
45
pubKey, err := k.PublicKey()
46
if err != nil {
47
-
s.logger.Error("error getting public key", "endpoint", "com.atproto.server.reserveSigningKey", "error", err)
48
return helpers.ServerError(e, nil)
49
}
50
···
58
}
59
60
if err := s.db.Create(ctx, &reservedKey, nil).Error; err != nil {
61
-
s.logger.Error("error storing reserved key", "endpoint", "com.atproto.server.reserveSigningKey", "error", err)
62
return helpers.ServerError(e, nil)
63
}
64
65
-
s.logger.Info("reserved signing key", "keyDid", keyDid, "forDid", req.Did)
66
67
return e.JSON(200, ServerReserveSigningKeyResponse{
68
SigningKey: keyDid,
···
20
21
func (s *Server) handleServerReserveSigningKey(e echo.Context) error {
22
ctx := e.Request().Context()
23
+
logger := s.logger.With("name", "handleServerReserveSigningKey")
24
25
var req ServerReserveSigningKeyRequest
26
if err := e.Bind(&req); err != nil {
27
+
logger.Error("could not bind reserve signing key request", "error", err)
28
return helpers.ServerError(e, nil)
29
}
30
···
39
40
k, err := atcrypto.GeneratePrivateKeyK256()
41
if err != nil {
42
+
logger.Error("error creating signing key", "endpoint", "com.atproto.server.reserveSigningKey", "error", err)
43
return helpers.ServerError(e, nil)
44
}
45
46
pubKey, err := k.PublicKey()
47
if err != nil {
48
+
logger.Error("error getting public key", "endpoint", "com.atproto.server.reserveSigningKey", "error", err)
49
return helpers.ServerError(e, nil)
50
}
51
···
59
}
60
61
if err := s.db.Create(ctx, &reservedKey, nil).Error; err != nil {
62
+
logger.Error("error storing reserved key", "endpoint", "com.atproto.server.reserveSigningKey", "error", err)
63
return helpers.ServerError(e, nil)
64
}
65
66
+
logger.Info("reserved signing key", "keyDid", keyDid, "forDid", req.Did)
67
68
return e.JSON(200, ServerReserveSigningKeyResponse{
69
SigningKey: keyDid,
+4
-3
server/handle_server_reset_password.go
+4
-3
server/handle_server_reset_password.go
···
17
18
func (s *Server) handleServerResetPassword(e echo.Context) error {
19
ctx := e.Request().Context()
20
21
urepo := e.Get("repo").(*models.RepoActor)
22
23
var req ComAtprotoServerResetPasswordRequest
24
if err := e.Bind(&req); err != nil {
25
-
s.logger.Error("error binding", "error", err)
26
return helpers.ServerError(e, nil)
27
}
28
···
44
45
hash, err := bcrypt.GenerateFromPassword([]byte(req.Password), 10)
46
if err != nil {
47
-
s.logger.Error("error creating hash", "error", err)
48
return helpers.ServerError(e, nil)
49
}
50
51
if err := s.db.Exec(ctx, "UPDATE repos SET password_reset_code = NULL, password_reset_code_expires_at = NULL, password = ? WHERE did = ?", nil, hash, urepo.Repo.Did).Error; err != nil {
52
-
s.logger.Error("error updating repo", "error", err)
53
return helpers.ServerError(e, nil)
54
}
55
···
17
18
func (s *Server) handleServerResetPassword(e echo.Context) error {
19
ctx := e.Request().Context()
20
+
logger := s.logger.With("name", "handleServerResetPassword")
21
22
urepo := e.Get("repo").(*models.RepoActor)
23
24
var req ComAtprotoServerResetPasswordRequest
25
if err := e.Bind(&req); err != nil {
26
+
logger.Error("error binding", "error", err)
27
return helpers.ServerError(e, nil)
28
}
29
···
45
46
hash, err := bcrypt.GenerateFromPassword([]byte(req.Password), 10)
47
if err != nil {
48
+
logger.Error("error creating hash", "error", err)
49
return helpers.ServerError(e, nil)
50
}
51
52
if err := s.db.Exec(ctx, "UPDATE repos SET password_reset_code = NULL, password_reset_code_expires_at = NULL, password = ? WHERE did = ?", nil, hash, urepo.Repo.Did).Error; err != nil {
53
+
logger.Error("error updating repo", "error", err)
54
return helpers.ServerError(e, nil)
55
}
56
+3
-1
server/handle_server_resolve_handle.go
+3
-1
server/handle_server_resolve_handle.go
···
10
)
11
12
func (s *Server) handleResolveHandle(e echo.Context) error {
13
type Resp struct {
14
Did string `json:"did"`
15
}
···
28
ctx := context.WithValue(e.Request().Context(), "skip-cache", true)
29
did, err := s.passport.ResolveHandle(ctx, parsed.String())
30
if err != nil {
31
-
s.logger.Error("error resolving handle", "error", err)
32
return helpers.ServerError(e, nil)
33
}
34
···
10
)
11
12
func (s *Server) handleResolveHandle(e echo.Context) error {
13
+
logger := s.logger.With("name", "handleServerResolveHandle")
14
+
15
type Resp struct {
16
Did string `json:"did"`
17
}
···
30
ctx := context.WithValue(e.Request().Context(), "skip-cache", true)
31
did, err := s.passport.ResolveHandle(ctx, parsed.String())
32
if err != nil {
33
+
logger.Error("error resolving handle", "error", err)
34
return helpers.ServerError(e, nil)
35
}
36
+3
-2
server/handle_server_update_email.go
+3
-2
server/handle_server_update_email.go
···
16
17
func (s *Server) handleServerUpdateEmail(e echo.Context) error {
18
ctx := e.Request().Context()
19
20
urepo := e.Get("repo").(*models.RepoActor)
21
22
var req ComAtprotoServerUpdateEmailRequest
23
if err := e.Bind(&req); err != nil {
24
-
s.logger.Error("error binding", "error", err)
25
return helpers.ServerError(e, nil)
26
}
27
···
42
}
43
44
if err := s.db.Exec(ctx, "UPDATE repos SET email_update_code = NULL, email_update_code_expires_at = NULL, email_confirmed_at = NULL, email = ? WHERE did = ?", nil, req.Email, urepo.Repo.Did).Error; err != nil {
45
-
s.logger.Error("error updating repo", "error", err)
46
return helpers.ServerError(e, nil)
47
}
48
···
16
17
func (s *Server) handleServerUpdateEmail(e echo.Context) error {
18
ctx := e.Request().Context()
19
+
logger := s.logger.With("name", "handleServerUpdateEmail")
20
21
urepo := e.Get("repo").(*models.RepoActor)
22
23
var req ComAtprotoServerUpdateEmailRequest
24
if err := e.Bind(&req); err != nil {
25
+
logger.Error("error binding", "error", err)
26
return helpers.ServerError(e, nil)
27
}
28
···
43
}
44
45
if err := s.db.Exec(ctx, "UPDATE repos SET email_update_code = NULL, email_update_code_expires_at = NULL, email_confirmed_at = NULL, email = ? WHERE did = ?", nil, req.Email, urepo.Repo.Did).Error; err != nil {
46
+
logger.Error("error updating repo", "error", err)
47
return helpers.ServerError(e, nil)
48
}
49
+9
-8
server/handle_sync_get_blob.go
+9
-8
server/handle_sync_get_blob.go
···
18
19
func (s *Server) handleSyncGetBlob(e echo.Context) error {
20
ctx := e.Request().Context()
21
22
did := e.QueryParam("did")
23
if did == "" {
···
36
37
urepo, err := s.getRepoActorByDid(ctx, did)
38
if err != nil {
39
-
s.logger.Error("could not find user for requested blob", "error", err)
40
return helpers.InputError(e, nil)
41
}
42
···
49
50
var blob models.Blob
51
if err := s.db.Raw(ctx, "SELECT * FROM blobs WHERE did = ? AND cid = ?", nil, did, c.Bytes()).Scan(&blob).Error; err != nil {
52
-
s.logger.Error("error looking up blob", "error", err)
53
return helpers.ServerError(e, nil)
54
}
55
···
58
if blob.Storage == "sqlite" {
59
var parts []models.BlobPart
60
if err := s.db.Raw(ctx, "SELECT * FROM blob_parts WHERE blob_id = ? ORDER BY idx", nil, blob.ID).Scan(&parts).Error; err != nil {
61
-
s.logger.Error("error getting blob parts", "error", err)
62
return helpers.ServerError(e, nil)
63
}
64
···
68
}
69
} else if blob.Storage == "s3" {
70
if !(s.s3Config != nil && s.s3Config.BlobstoreEnabled) {
71
-
s.logger.Error("s3 storage disabled")
72
return helpers.ServerError(e, nil)
73
}
74
···
91
92
sess, err := session.NewSession(config)
93
if err != nil {
94
-
s.logger.Error("error creating aws session", "error", err)
95
return helpers.ServerError(e, nil)
96
}
97
···
100
Bucket: aws.String(s.s3Config.Bucket),
101
Key: aws.String(blobKey),
102
}); err != nil {
103
-
s.logger.Error("error getting blob from s3", "error", err)
104
return helpers.ServerError(e, nil)
105
} else {
106
read := 0
···
114
break
115
}
116
} else if err != nil && err != io.ErrUnexpectedEOF {
117
-
s.logger.Error("error reading blob", "error", err)
118
return helpers.ServerError(e, nil)
119
}
120
···
125
}
126
}
127
} else {
128
-
s.logger.Error("unknown storage", "storage", blob.Storage)
129
return helpers.ServerError(e, nil)
130
}
131
···
18
19
func (s *Server) handleSyncGetBlob(e echo.Context) error {
20
ctx := e.Request().Context()
21
+
logger := s.logger.With("name", "handleSyncGetBlob")
22
23
did := e.QueryParam("did")
24
if did == "" {
···
37
38
urepo, err := s.getRepoActorByDid(ctx, did)
39
if err != nil {
40
+
logger.Error("could not find user for requested blob", "error", err)
41
return helpers.InputError(e, nil)
42
}
43
···
50
51
var blob models.Blob
52
if err := s.db.Raw(ctx, "SELECT * FROM blobs WHERE did = ? AND cid = ?", nil, did, c.Bytes()).Scan(&blob).Error; err != nil {
53
+
logger.Error("error looking up blob", "error", err)
54
return helpers.ServerError(e, nil)
55
}
56
···
59
if blob.Storage == "sqlite" {
60
var parts []models.BlobPart
61
if err := s.db.Raw(ctx, "SELECT * FROM blob_parts WHERE blob_id = ? ORDER BY idx", nil, blob.ID).Scan(&parts).Error; err != nil {
62
+
logger.Error("error getting blob parts", "error", err)
63
return helpers.ServerError(e, nil)
64
}
65
···
69
}
70
} else if blob.Storage == "s3" {
71
if !(s.s3Config != nil && s.s3Config.BlobstoreEnabled) {
72
+
logger.Error("s3 storage disabled")
73
return helpers.ServerError(e, nil)
74
}
75
···
92
93
sess, err := session.NewSession(config)
94
if err != nil {
95
+
logger.Error("error creating aws session", "error", err)
96
return helpers.ServerError(e, nil)
97
}
98
···
101
Bucket: aws.String(s.s3Config.Bucket),
102
Key: aws.String(blobKey),
103
}); err != nil {
104
+
logger.Error("error getting blob from s3", "error", err)
105
return helpers.ServerError(e, nil)
106
} else {
107
read := 0
···
115
break
116
}
117
} else if err != nil && err != io.ErrUnexpectedEOF {
118
+
logger.Error("error reading blob", "error", err)
119
return helpers.ServerError(e, nil)
120
}
121
···
126
}
127
}
128
} else {
129
+
logger.Error("unknown storage", "storage", blob.Storage)
130
return helpers.ServerError(e, nil)
131
}
132
+2
-1
server/handle_sync_get_blocks.go
+2
-1
server/handle_sync_get_blocks.go
···
18
19
func (s *Server) handleGetBlocks(e echo.Context) error {
20
ctx := e.Request().Context()
21
22
var req ComAtprotoSyncGetBlocksRequest
23
if err := e.Bind(&req); err != nil {
···
52
})
53
54
if _, err := carstore.LdWrite(buf, hb); err != nil {
55
-
s.logger.Error("error writing to car", "error", err)
56
return helpers.ServerError(e, nil)
57
}
58
···
18
19
func (s *Server) handleGetBlocks(e echo.Context) error {
20
ctx := e.Request().Context()
21
+
logger := s.logger.With("name", "handleSyncGetBlocks")
22
23
var req ComAtprotoSyncGetBlocksRequest
24
if err := e.Bind(&req); err != nil {
···
53
})
54
55
if _, err := carstore.LdWrite(buf, hb); err != nil {
56
+
logger.Error("error writing to car", "error", err)
57
return helpers.ServerError(e, nil)
58
}
59
+4
-3
server/handle_sync_get_record.go
+4
-3
server/handle_sync_get_record.go
···
14
15
func (s *Server) handleSyncGetRecord(e echo.Context) error {
16
ctx := e.Request().Context()
17
18
did := e.QueryParam("did")
19
collection := e.QueryParam("collection")
···
21
22
var urepo models.Repo
23
if err := s.db.Raw(ctx, "SELECT * FROM repos WHERE did = ?", nil, did).Scan(&urepo).Error; err != nil {
24
-
s.logger.Error("error getting repo", "error", err)
25
return helpers.ServerError(e, nil)
26
}
27
···
38
})
39
40
if _, err := carstore.LdWrite(buf, hb); err != nil {
41
-
s.logger.Error("error writing to car", "error", err)
42
return helpers.ServerError(e, nil)
43
}
44
45
for _, blk := range blocks {
46
if _, err := carstore.LdWrite(buf, blk.Cid().Bytes(), blk.RawData()); err != nil {
47
-
s.logger.Error("error writing to car", "error", err)
48
return helpers.ServerError(e, nil)
49
}
50
}
···
14
15
func (s *Server) handleSyncGetRecord(e echo.Context) error {
16
ctx := e.Request().Context()
17
+
logger := s.logger.With("name", "handleSyncGetRecord")
18
19
did := e.QueryParam("did")
20
collection := e.QueryParam("collection")
···
22
23
var urepo models.Repo
24
if err := s.db.Raw(ctx, "SELECT * FROM repos WHERE did = ?", nil, did).Scan(&urepo).Error; err != nil {
25
+
logger.Error("error getting repo", "error", err)
26
return helpers.ServerError(e, nil)
27
}
28
···
39
})
40
41
if _, err := carstore.LdWrite(buf, hb); err != nil {
42
+
logger.Error("error writing to car", "error", err)
43
return helpers.ServerError(e, nil)
44
}
45
46
for _, blk := range blocks {
47
if _, err := carstore.LdWrite(buf, blk.Cid().Bytes(), blk.RawData()); err != nil {
48
+
logger.Error("error writing to car", "error", err)
49
return helpers.ServerError(e, nil)
50
}
51
}
+2
-1
server/handle_sync_get_repo.go
+2
-1
server/handle_sync_get_repo.go
···
14
15
func (s *Server) handleSyncGetRepo(e echo.Context) error {
16
ctx := e.Request().Context()
17
18
did := e.QueryParam("did")
19
if did == "" {
···
38
buf := new(bytes.Buffer)
39
40
if _, err := carstore.LdWrite(buf, hb); err != nil {
41
-
s.logger.Error("error writing to car", "error", err)
42
return helpers.ServerError(e, nil)
43
}
44
···
14
15
func (s *Server) handleSyncGetRepo(e echo.Context) error {
16
ctx := e.Request().Context()
17
+
logger := s.logger.With("name", "handleSyncGetRepo")
18
19
did := e.QueryParam("did")
20
if did == "" {
···
39
buf := new(bytes.Buffer)
40
41
if _, err := carstore.LdWrite(buf, hb); err != nil {
42
+
logger.Error("error writing to car", "error", err)
43
return helpers.ServerError(e, nil)
44
}
45
+4
-3
server/handle_sync_list_blobs.go
+4
-3
server/handle_sync_list_blobs.go
···
15
16
func (s *Server) handleSyncListBlobs(e echo.Context) error {
17
ctx := e.Request().Context()
18
19
did := e.QueryParam("did")
20
if did == "" {
···
39
40
urepo, err := s.getRepoActorByDid(ctx, did)
41
if err != nil {
42
-
s.logger.Error("could not find user for requested blobs", "error", err)
43
return helpers.InputError(e, nil)
44
}
45
···
52
53
var blobs []models.Blob
54
if err := s.db.Raw(ctx, "SELECT * FROM blobs WHERE did = ? "+cursorquery+" ORDER BY created_at DESC LIMIT ?", nil, params...).Scan(&blobs).Error; err != nil {
55
-
s.logger.Error("error getting records", "error", err)
56
return helpers.ServerError(e, nil)
57
}
58
···
60
for _, b := range blobs {
61
c, err := cid.Cast(b.Cid)
62
if err != nil {
63
-
s.logger.Error("error casting cid", "error", err)
64
return helpers.ServerError(e, nil)
65
}
66
cstrs = append(cstrs, c.String())
···
15
16
func (s *Server) handleSyncListBlobs(e echo.Context) error {
17
ctx := e.Request().Context()
18
+
logger := s.logger.With("name", "handleSyncListBlobs")
19
20
did := e.QueryParam("did")
21
if did == "" {
···
40
41
urepo, err := s.getRepoActorByDid(ctx, did)
42
if err != nil {
43
+
logger.Error("could not find user for requested blobs", "error", err)
44
return helpers.InputError(e, nil)
45
}
46
···
53
54
var blobs []models.Blob
55
if err := s.db.Raw(ctx, "SELECT * FROM blobs WHERE did = ? "+cursorquery+" ORDER BY created_at DESC LIMIT ?", nil, params...).Scan(&blobs).Error; err != nil {
56
+
logger.Error("error getting records", "error", err)
57
return helpers.ServerError(e, nil)
58
}
59
···
61
for _, b := range blobs {
62
c, err := cid.Cast(b.Cid)
63
if err != nil {
64
+
logger.Error("error casting cid", "error", err)
65
return helpers.ServerError(e, nil)
66
}
67
cstrs = append(cstrs, c.String())
+54
-42
server/handle_sync_subscribe_repos.go
+54
-42
server/handle_sync_subscribe_repos.go
···
7
"github.com/bluesky-social/indigo/events"
8
"github.com/bluesky-social/indigo/lex/util"
9
"github.com/btcsuite/websocket"
10
"github.com/labstack/echo/v4"
11
)
12
···
24
logger = logger.With("ident", ident)
25
logger.Info("new connection established")
26
27
evts, cancel, err := s.evtman.Subscribe(ctx, ident, func(evt *events.XRPCStreamEvent) bool {
28
return true
29
}, nil)
···
34
35
header := events.EventHeader{Op: events.EvtKindMessage}
36
for evt := range evts {
37
-
wc, err := conn.NextWriter(websocket.BinaryMessage)
38
-
if err != nil {
39
-
logger.Error("error writing message to relay", "err", err)
40
-
break
41
-
}
42
43
-
if ctx.Err() != nil {
44
-
logger.Error("context error", "err", err)
45
-
break
46
-
}
47
48
-
var obj util.CBOR
49
-
switch {
50
-
case evt.Error != nil:
51
-
header.Op = events.EvtKindErrorFrame
52
-
obj = evt.Error
53
-
case evt.RepoCommit != nil:
54
-
header.MsgType = "#commit"
55
-
obj = evt.RepoCommit
56
-
case evt.RepoIdentity != nil:
57
-
header.MsgType = "#identity"
58
-
obj = evt.RepoIdentity
59
-
case evt.RepoAccount != nil:
60
-
header.MsgType = "#account"
61
-
obj = evt.RepoAccount
62
-
case evt.RepoInfo != nil:
63
-
header.MsgType = "#info"
64
-
obj = evt.RepoInfo
65
-
default:
66
-
logger.Warn("unrecognized event kind")
67
-
return nil
68
-
}
69
70
-
if err := header.MarshalCBOR(wc); err != nil {
71
-
logger.Error("failed to write header to relay", "err", err)
72
-
break
73
-
}
74
75
-
if err := obj.MarshalCBOR(wc); err != nil {
76
-
logger.Error("failed to write event to relay", "err", err)
77
-
break
78
-
}
79
80
-
if err := wc.Close(); err != nil {
81
-
logger.Error("failed to flush-close our event write", "err", err)
82
-
break
83
-
}
84
}
85
86
// we should tell the relay to request a new crawl at this point if we got disconnected
···
7
"github.com/bluesky-social/indigo/events"
8
"github.com/bluesky-social/indigo/lex/util"
9
"github.com/btcsuite/websocket"
10
+
"github.com/haileyok/cocoon/metrics"
11
"github.com/labstack/echo/v4"
12
)
13
···
25
logger = logger.With("ident", ident)
26
logger.Info("new connection established")
27
28
+
metrics.RelaysConnected.WithLabelValues(ident).Inc()
29
+
defer func() {
30
+
metrics.RelaysConnected.WithLabelValues(ident).Dec()
31
+
}()
32
+
33
evts, cancel, err := s.evtman.Subscribe(ctx, ident, func(evt *events.XRPCStreamEvent) bool {
34
return true
35
}, nil)
···
40
41
header := events.EventHeader{Op: events.EvtKindMessage}
42
for evt := range evts {
43
+
func() {
44
+
defer func() {
45
+
metrics.RelaySends.WithLabelValues(header.MsgType).Inc()
46
+
}()
47
48
+
wc, err := conn.NextWriter(websocket.BinaryMessage)
49
+
if err != nil {
50
+
logger.Error("error writing message to relay", "err", err)
51
+
return
52
+
}
53
54
+
if ctx.Err() != nil {
55
+
logger.Error("context error", "err", err)
56
+
return
57
+
}
58
59
+
var obj util.CBOR
60
+
switch {
61
+
case evt.Error != nil:
62
+
header.Op = events.EvtKindErrorFrame
63
+
obj = evt.Error
64
+
case evt.RepoCommit != nil:
65
+
header.MsgType = "#commit"
66
+
obj = evt.RepoCommit
67
+
case evt.RepoIdentity != nil:
68
+
header.MsgType = "#identity"
69
+
obj = evt.RepoIdentity
70
+
case evt.RepoAccount != nil:
71
+
header.MsgType = "#account"
72
+
obj = evt.RepoAccount
73
+
case evt.RepoInfo != nil:
74
+
header.MsgType = "#info"
75
+
obj = evt.RepoInfo
76
+
default:
77
+
logger.Warn("unrecognized event kind")
78
+
return
79
+
}
80
81
+
if err := header.MarshalCBOR(wc); err != nil {
82
+
logger.Error("failed to write header to relay", "err", err)
83
+
return
84
+
}
85
86
+
if err := obj.MarshalCBOR(wc); err != nil {
87
+
logger.Error("failed to write event to relay", "err", err)
88
+
return
89
+
}
90
+
91
+
if err := wc.Close(); err != nil {
92
+
logger.Error("failed to flush-close our event write", "err", err)
93
+
return
94
+
}
95
+
}()
96
}
97
98
// we should tell the relay to request a new crawl at this point if we got disconnected
+2
-1
server/handle_well_known.go
+2
-1
server/handle_well_known.go
···
68
69
func (s *Server) handleAtprotoDid(e echo.Context) error {
70
ctx := e.Request().Context()
71
72
host := e.Request().Host
73
if host == "" {
···
91
if err == gorm.ErrRecordNotFound {
92
return e.NoContent(404)
93
}
94
-
s.logger.Error("error looking up actor by handle", "error", err)
95
return helpers.ServerError(e, nil)
96
}
97
···
68
69
func (s *Server) handleAtprotoDid(e echo.Context) error {
70
ctx := e.Request().Context()
71
+
logger := s.logger.With("name", "handleAtprotoDid")
72
73
host := e.Request().Host
74
if host == "" {
···
92
if err == gorm.ErrRecordNotFound {
93
return e.NoContent(404)
94
}
95
+
logger.Error("error looking up actor by handle", "error", err)
96
return helpers.ServerError(e, nil)
97
}
98
+18
-16
server/middleware.go
+18
-16
server/middleware.go
···
38
func (s *Server) handleLegacySessionMiddleware(next echo.HandlerFunc) echo.HandlerFunc {
39
return func(e echo.Context) error {
40
ctx := e.Request().Context()
41
42
authheader := e.Request().Header.Get("authorization")
43
if authheader == "" {
···
69
if hasLxm {
70
pts := strings.Split(e.Request().URL.String(), "/")
71
if lxm != pts[len(pts)-1] {
72
-
s.logger.Error("service auth lxm incorrect", "lxm", lxm, "expected", pts[len(pts)-1], "error", err)
73
return helpers.InputError(e, nil)
74
}
75
76
maybeDid, ok := claims["iss"].(string)
77
if !ok {
78
-
s.logger.Error("no iss in service auth token", "error", err)
79
return helpers.InputError(e, nil)
80
}
81
did = maybeDid
82
83
maybeRepo, err := s.getRepoActorByDid(ctx, did)
84
if err != nil {
85
-
s.logger.Error("error fetching repo", "error", err)
86
return helpers.ServerError(e, nil)
87
}
88
repo = maybeRepo
···
96
return s.privateKey.Public(), nil
97
})
98
if err != nil {
99
-
s.logger.Error("error parsing jwt", "error", err)
100
return helpers.ExpiredTokenError(e)
101
}
102
···
109
hash := sha256.Sum256([]byte(signingInput))
110
sigBytes, err := base64.RawURLEncoding.DecodeString(kpts[2])
111
if err != nil {
112
-
s.logger.Error("error decoding signature bytes", "error", err)
113
return helpers.ServerError(e, nil)
114
}
115
116
if len(sigBytes) != 64 {
117
-
s.logger.Error("incorrect sigbytes length", "length", len(sigBytes))
118
return helpers.ServerError(e, nil)
119
}
120
···
140
141
sk, err := secp256k1secec.NewPrivateKey(repo.SigningKey)
142
if err != nil {
143
-
s.logger.Error("can't load private key", "error", err)
144
return err
145
}
146
147
pubKey, ok := sk.Public().(*secp256k1secec.PublicKey)
148
if !ok {
149
-
s.logger.Error("error getting public key from sk")
150
return helpers.ServerError(e, nil)
151
}
152
153
verified := pubKey.VerifyRaw(hash[:], rr, ss)
154
if !verified {
155
-
s.logger.Error("error verifying", "error", err)
156
return helpers.ServerError(e, nil)
157
}
158
}
···
181
return helpers.InvalidTokenError(e)
182
}
183
184
-
s.logger.Error("error getting token from db", "error", err)
185
return helpers.ServerError(e, nil)
186
}
187
···
192
193
exp, ok := claims["exp"].(float64)
194
if !ok {
195
-
s.logger.Error("error getting iat from token")
196
return helpers.ServerError(e, nil)
197
}
198
···
203
if repo == nil {
204
maybeRepo, err := s.getRepoActorByDid(ctx, claims["sub"].(string))
205
if err != nil {
206
-
s.logger.Error("error fetching repo", "error", err)
207
return helpers.ServerError(e, nil)
208
}
209
repo = maybeRepo
···
225
func (s *Server) handleOauthSessionMiddleware(next echo.HandlerFunc) echo.HandlerFunc {
226
return func(e echo.Context) error {
227
ctx := e.Request().Context()
228
229
authheader := e.Request().Header.Get("authorization")
230
if authheader == "" {
···
257
"error": "use_dpop_nonce",
258
})
259
}
260
-
s.logger.Error("invalid dpop proof", "error", err)
261
return helpers.InputError(e, nil)
262
}
263
264
var oauthToken provider.OauthToken
265
if err := s.db.Raw(ctx, "SELECT * FROM oauth_tokens WHERE token = ?", nil, accessToken).Scan(&oauthToken).Error; err != nil {
266
-
s.logger.Error("error finding access token in db", "error", err)
267
return helpers.InputError(e, nil)
268
}
269
···
272
}
273
274
if *oauthToken.Parameters.DpopJkt != proof.JKT {
275
-
s.logger.Error("jkt mismatch", "token", oauthToken.Parameters.DpopJkt, "proof", proof.JKT)
276
return helpers.InputError(e, to.StringPtr("dpop jkt mismatch"))
277
}
278
···
287
288
repo, err := s.getRepoActorByDid(ctx, oauthToken.Sub)
289
if err != nil {
290
-
s.logger.Error("could not find actor in db", "error", err)
291
return helpers.ServerError(e, nil)
292
}
293
···
38
func (s *Server) handleLegacySessionMiddleware(next echo.HandlerFunc) echo.HandlerFunc {
39
return func(e echo.Context) error {
40
ctx := e.Request().Context()
41
+
logger := s.logger.With("name", "handleLegacySessionMiddleware")
42
43
authheader := e.Request().Header.Get("authorization")
44
if authheader == "" {
···
70
if hasLxm {
71
pts := strings.Split(e.Request().URL.String(), "/")
72
if lxm != pts[len(pts)-1] {
73
+
logger.Error("service auth lxm incorrect", "lxm", lxm, "expected", pts[len(pts)-1], "error", err)
74
return helpers.InputError(e, nil)
75
}
76
77
maybeDid, ok := claims["iss"].(string)
78
if !ok {
79
+
logger.Error("no iss in service auth token", "error", err)
80
return helpers.InputError(e, nil)
81
}
82
did = maybeDid
83
84
maybeRepo, err := s.getRepoActorByDid(ctx, did)
85
if err != nil {
86
+
logger.Error("error fetching repo", "error", err)
87
return helpers.ServerError(e, nil)
88
}
89
repo = maybeRepo
···
97
return s.privateKey.Public(), nil
98
})
99
if err != nil {
100
+
logger.Error("error parsing jwt", "error", err)
101
return helpers.ExpiredTokenError(e)
102
}
103
···
110
hash := sha256.Sum256([]byte(signingInput))
111
sigBytes, err := base64.RawURLEncoding.DecodeString(kpts[2])
112
if err != nil {
113
+
logger.Error("error decoding signature bytes", "error", err)
114
return helpers.ServerError(e, nil)
115
}
116
117
if len(sigBytes) != 64 {
118
+
logger.Error("incorrect sigbytes length", "length", len(sigBytes))
119
return helpers.ServerError(e, nil)
120
}
121
···
141
142
sk, err := secp256k1secec.NewPrivateKey(repo.SigningKey)
143
if err != nil {
144
+
logger.Error("can't load private key", "error", err)
145
return err
146
}
147
148
pubKey, ok := sk.Public().(*secp256k1secec.PublicKey)
149
if !ok {
150
+
logger.Error("error getting public key from sk")
151
return helpers.ServerError(e, nil)
152
}
153
154
verified := pubKey.VerifyRaw(hash[:], rr, ss)
155
if !verified {
156
+
logger.Error("error verifying", "error", err)
157
return helpers.ServerError(e, nil)
158
}
159
}
···
182
return helpers.InvalidTokenError(e)
183
}
184
185
+
logger.Error("error getting token from db", "error", err)
186
return helpers.ServerError(e, nil)
187
}
188
···
193
194
exp, ok := claims["exp"].(float64)
195
if !ok {
196
+
logger.Error("error getting iat from token")
197
return helpers.ServerError(e, nil)
198
}
199
···
204
if repo == nil {
205
maybeRepo, err := s.getRepoActorByDid(ctx, claims["sub"].(string))
206
if err != nil {
207
+
logger.Error("error fetching repo", "error", err)
208
return helpers.ServerError(e, nil)
209
}
210
repo = maybeRepo
···
226
func (s *Server) handleOauthSessionMiddleware(next echo.HandlerFunc) echo.HandlerFunc {
227
return func(e echo.Context) error {
228
ctx := e.Request().Context()
229
+
logger := s.logger.With("name", "handleOauthSessionMiddleware")
230
231
authheader := e.Request().Header.Get("authorization")
232
if authheader == "" {
···
259
"error": "use_dpop_nonce",
260
})
261
}
262
+
logger.Error("invalid dpop proof", "error", err)
263
return helpers.InputError(e, nil)
264
}
265
266
var oauthToken provider.OauthToken
267
if err := s.db.Raw(ctx, "SELECT * FROM oauth_tokens WHERE token = ?", nil, accessToken).Scan(&oauthToken).Error; err != nil {
268
+
logger.Error("error finding access token in db", "error", err)
269
return helpers.InputError(e, nil)
270
}
271
···
274
}
275
276
if *oauthToken.Parameters.DpopJkt != proof.JKT {
277
+
logger.Error("jkt mismatch", "token", oauthToken.Parameters.DpopJkt, "proof", proof.JKT)
278
return helpers.InputError(e, to.StringPtr("dpop jkt mismatch"))
279
}
280
···
289
290
repo, err := s.getRepoActorByDid(ctx, oauthToken.Sub)
291
if err != nil {
292
+
logger.Error("could not find actor in db", "error", err)
293
return helpers.ServerError(e, nil)
294
}
295
+7
server/repo.go
+7
server/repo.go
···
17
lexutil "github.com/bluesky-social/indigo/lex/util"
18
"github.com/bluesky-social/indigo/repo"
19
"github.com/haileyok/cocoon/internal/db"
20
"github.com/haileyok/cocoon/models"
21
"github.com/haileyok/cocoon/recording_blockstore"
22
blocks "github.com/ipfs/go-block-format"
···
249
newroot, rev, err := r.Commit(ctx, urepo.SignFor)
250
if err != nil {
251
return nil, err
252
}
253
254
// create a buffer for dumping our new cbor into
···
17
lexutil "github.com/bluesky-social/indigo/lex/util"
18
"github.com/bluesky-social/indigo/repo"
19
"github.com/haileyok/cocoon/internal/db"
20
+
"github.com/haileyok/cocoon/metrics"
21
"github.com/haileyok/cocoon/models"
22
"github.com/haileyok/cocoon/recording_blockstore"
23
blocks "github.com/ipfs/go-block-format"
···
250
newroot, rev, err := r.Commit(ctx, urepo.SignFor)
251
if err != nil {
252
return nil, err
253
+
}
254
+
255
+
for _, result := range results {
256
+
if result.Type != nil {
257
+
metrics.RepoOperations.WithLabelValues(*result.Type).Inc()
258
+
}
259
}
260
261
// create a buffer for dumping our new cbor into
+38
-27
server/server.go
+38
-27
server/server.go
···
39
"github.com/haileyok/cocoon/oauth/provider"
40
"github.com/haileyok/cocoon/plc"
41
"github.com/ipfs/go-cid"
42
echo_session "github.com/labstack/echo-contrib/session"
43
"github.com/labstack/echo/v4"
44
"github.com/labstack/echo/v4/middleware"
···
89
}
90
91
type Args struct {
92
Addr string
93
DbName string
94
DbType string
95
DatabaseURL string
96
-
Logger *slog.Logger
97
Version string
98
Did string
99
Hostname string
···
209
}
210
211
func New(args *Args) (*Server, error) {
212
if args.Addr == "" {
213
return nil, fmt.Errorf("addr must be set")
214
}
···
237
return nil, fmt.Errorf("admin password must be set")
238
}
239
240
-
if args.Logger == nil {
241
-
args.Logger = slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{}))
242
-
}
243
-
244
if args.SessionSecret == "" {
245
panic("SESSION SECRET WAS NOT SET. THIS IS REQUIRED. ")
246
}
···
248
e := echo.New()
249
250
e.Pre(middleware.RemoveTrailingSlash())
251
-
e.Pre(slogecho.New(args.Logger))
252
e.Use(echo_session.Middleware(sessions.NewCookieStore([]byte(args.SessionSecret))))
253
e.Use(middleware.CORSWithConfig(middleware.CORSConfig{
254
AllowOrigins: []string{"*"},
255
AllowHeaders: []string{"*"},
···
311
if err != nil {
312
return nil, fmt.Errorf("failed to connect to postgres: %w", err)
313
}
314
-
args.Logger.Info("connected to PostgreSQL database")
315
default:
316
gdb, err = gorm.Open(sqlite.Open(args.DbName), &gorm.Config{})
317
if err != nil {
318
return nil, fmt.Errorf("failed to open sqlite database: %w", err)
319
}
320
-
args.Logger.Info("connected to SQLite database", "path", args.DbName)
321
}
322
dbw := db.NewDB(gdb)
323
···
360
var nonceSecret []byte
361
maybeSecret, err := os.ReadFile("nonce.secret")
362
if err != nil && !os.IsNotExist(err) {
363
-
args.Logger.Error("error attempting to read nonce secret", "error", err)
364
} else {
365
nonceSecret = maybeSecret
366
}
···
398
Hostname: args.Hostname,
399
ClientManagerArgs: client.ManagerArgs{
400
Cli: oauthCli,
401
-
Logger: args.Logger,
402
},
403
DpopManagerArgs: dpop.ManagerArgs{
404
NonceSecret: nonceSecret,
405
NonceRotationInterval: constants.NonceMaxRotationInterval / 3,
406
OnNonceSecretCreated: func(newNonce []byte) {
407
if err := os.WriteFile("nonce.secret", newNonce, 0644); err != nil {
408
-
args.Logger.Error("error writing new nonce secret", "error", err)
409
}
410
},
411
-
Logger: args.Logger,
412
Hostname: args.Hostname,
413
},
414
}),
···
535
}
536
537
func (s *Server) Serve(ctx context.Context) error {
538
s.addRoutes()
539
540
-
s.logger.Info("migrating...")
541
542
s.db.AutoMigrate(
543
&models.Actor{},
···
554
&provider.OauthAuthorizationRequest{},
555
)
556
557
-
s.logger.Info("starting cocoon")
558
559
go func() {
560
if err := s.httpd.ListenAndServe(); err != nil {
···
566
567
go func() {
568
if err := s.requestCrawl(ctx); err != nil {
569
-
s.logger.Error("error requesting crawls", "err", err)
570
}
571
}()
572
···
584
585
logger.Info("requesting crawl with configured relays")
586
587
-
if time.Now().Sub(s.lastRequestCrawl) <= 1*time.Minute {
588
return fmt.Errorf("a crawl request has already been made within the last minute")
589
}
590
···
607
}
608
609
func (s *Server) doBackup() {
610
if s.dbType == "postgres" {
611
-
s.logger.Info("skipping S3 backup - PostgreSQL backups should be handled externally (pg_dump, managed database backups, etc.)")
612
return
613
}
614
615
start := time.Now()
616
617
-
s.logger.Info("beginning backup to s3...")
618
619
var buf bytes.Buffer
620
if err := func() error {
621
-
s.logger.Info("reading database bytes...")
622
s.db.Lock()
623
defer s.db.Unlock()
624
···
634
635
return nil
636
}(); err != nil {
637
-
s.logger.Error("error backing up database", "error", err)
638
return
639
}
640
641
if err := func() error {
642
-
s.logger.Info("sending to s3...")
643
644
currTime := time.Now().Format("2006-01-02_15-04-05")
645
key := "cocoon-backup-" + currTime + ".db"
···
669
return fmt.Errorf("error uploading file to s3: %w", err)
670
}
671
672
-
s.logger.Info("finished uploading backup to s3", "key", key, "duration", time.Now().Sub(start).Seconds())
673
674
return nil
675
}(); err != nil {
676
-
s.logger.Error("error uploading database backup", "error", err)
677
return
678
}
679
···
681
}
682
683
func (s *Server) backupRoutine() {
684
if s.s3Config == nil || !s.s3Config.BackupsEnabled {
685
return
686
}
687
688
if s.s3Config.Region == "" {
689
-
s.logger.Warn("no s3 region configured but backups are enabled. backups will not run.")
690
return
691
}
692
693
if s.s3Config.Bucket == "" {
694
-
s.logger.Warn("no s3 bucket configured but backups are enabled. backups will not run.")
695
return
696
}
697
698
if s.s3Config.AccessKey == "" {
699
-
s.logger.Warn("no s3 access key configured but backups are enabled. backups will not run.")
700
return
701
}
702
703
if s.s3Config.SecretKey == "" {
704
-
s.logger.Warn("no s3 secret key configured but backups are enabled. backups will not run.")
705
return
706
}
707
···
39
"github.com/haileyok/cocoon/oauth/provider"
40
"github.com/haileyok/cocoon/plc"
41
"github.com/ipfs/go-cid"
42
+
"github.com/labstack/echo-contrib/echoprometheus"
43
echo_session "github.com/labstack/echo-contrib/session"
44
"github.com/labstack/echo/v4"
45
"github.com/labstack/echo/v4/middleware"
···
90
}
91
92
type Args struct {
93
+
Logger *slog.Logger
94
+
95
Addr string
96
DbName string
97
DbType string
98
DatabaseURL string
99
Version string
100
Did string
101
Hostname string
···
211
}
212
213
func New(args *Args) (*Server, error) {
214
+
if args.Logger == nil {
215
+
args.Logger = slog.Default()
216
+
}
217
+
218
+
logger := args.Logger.With("name", "New")
219
+
220
if args.Addr == "" {
221
return nil, fmt.Errorf("addr must be set")
222
}
···
245
return nil, fmt.Errorf("admin password must be set")
246
}
247
248
if args.SessionSecret == "" {
249
panic("SESSION SECRET WAS NOT SET. THIS IS REQUIRED. ")
250
}
···
252
e := echo.New()
253
254
e.Pre(middleware.RemoveTrailingSlash())
255
+
e.Pre(slogecho.New(args.Logger.With("component", "slogecho")))
256
e.Use(echo_session.Middleware(sessions.NewCookieStore([]byte(args.SessionSecret))))
257
+
e.Use(echoprometheus.NewMiddleware("cocoon"))
258
e.Use(middleware.CORSWithConfig(middleware.CORSConfig{
259
AllowOrigins: []string{"*"},
260
AllowHeaders: []string{"*"},
···
316
if err != nil {
317
return nil, fmt.Errorf("failed to connect to postgres: %w", err)
318
}
319
+
logger.Info("connected to PostgreSQL database")
320
default:
321
gdb, err = gorm.Open(sqlite.Open(args.DbName), &gorm.Config{})
322
if err != nil {
323
return nil, fmt.Errorf("failed to open sqlite database: %w", err)
324
}
325
+
logger.Info("connected to SQLite database", "path", args.DbName)
326
}
327
dbw := db.NewDB(gdb)
328
···
365
var nonceSecret []byte
366
maybeSecret, err := os.ReadFile("nonce.secret")
367
if err != nil && !os.IsNotExist(err) {
368
+
logger.Error("error attempting to read nonce secret", "error", err)
369
} else {
370
nonceSecret = maybeSecret
371
}
···
403
Hostname: args.Hostname,
404
ClientManagerArgs: client.ManagerArgs{
405
Cli: oauthCli,
406
+
Logger: args.Logger.With("component", "oauth-client-manager"),
407
},
408
DpopManagerArgs: dpop.ManagerArgs{
409
NonceSecret: nonceSecret,
410
NonceRotationInterval: constants.NonceMaxRotationInterval / 3,
411
OnNonceSecretCreated: func(newNonce []byte) {
412
if err := os.WriteFile("nonce.secret", newNonce, 0644); err != nil {
413
+
logger.Error("error writing new nonce secret", "error", err)
414
}
415
},
416
+
Logger: args.Logger.With("component", "dpop-manager"),
417
Hostname: args.Hostname,
418
},
419
}),
···
540
}
541
542
func (s *Server) Serve(ctx context.Context) error {
543
+
logger := s.logger.With("name", "Serve")
544
+
545
s.addRoutes()
546
547
+
logger.Info("migrating...")
548
549
s.db.AutoMigrate(
550
&models.Actor{},
···
561
&provider.OauthAuthorizationRequest{},
562
)
563
564
+
logger.Info("starting cocoon")
565
566
go func() {
567
if err := s.httpd.ListenAndServe(); err != nil {
···
573
574
go func() {
575
if err := s.requestCrawl(ctx); err != nil {
576
+
logger.Error("error requesting crawls", "err", err)
577
}
578
}()
579
···
591
592
logger.Info("requesting crawl with configured relays")
593
594
+
if time.Since(s.lastRequestCrawl) <= 1*time.Minute {
595
return fmt.Errorf("a crawl request has already been made within the last minute")
596
}
597
···
614
}
615
616
func (s *Server) doBackup() {
617
+
logger := s.logger.With("name", "doBackup")
618
+
619
if s.dbType == "postgres" {
620
+
logger.Info("skipping S3 backup - PostgreSQL backups should be handled externally (pg_dump, managed database backups, etc.)")
621
return
622
}
623
624
start := time.Now()
625
626
+
logger.Info("beginning backup to s3...")
627
628
var buf bytes.Buffer
629
if err := func() error {
630
+
logger.Info("reading database bytes...")
631
s.db.Lock()
632
defer s.db.Unlock()
633
···
643
644
return nil
645
}(); err != nil {
646
+
logger.Error("error backing up database", "error", err)
647
return
648
}
649
650
if err := func() error {
651
+
logger.Info("sending to s3...")
652
653
currTime := time.Now().Format("2006-01-02_15-04-05")
654
key := "cocoon-backup-" + currTime + ".db"
···
678
return fmt.Errorf("error uploading file to s3: %w", err)
679
}
680
681
+
logger.Info("finished uploading backup to s3", "key", key, "duration", time.Now().Sub(start).Seconds())
682
683
return nil
684
}(); err != nil {
685
+
logger.Error("error uploading database backup", "error", err)
686
return
687
}
688
···
690
}
691
692
func (s *Server) backupRoutine() {
693
+
logger := s.logger.With("name", "backupRoutine")
694
+
695
if s.s3Config == nil || !s.s3Config.BackupsEnabled {
696
return
697
}
698
699
if s.s3Config.Region == "" {
700
+
logger.Warn("no s3 region configured but backups are enabled. backups will not run.")
701
return
702
}
703
704
if s.s3Config.Bucket == "" {
705
+
logger.Warn("no s3 bucket configured but backups are enabled. backups will not run.")
706
return
707
}
708
709
if s.s3Config.AccessKey == "" {
710
+
logger.Warn("no s3 access key configured but backups are enabled. backups will not run.")
711
return
712
}
713
714
if s.s3Config.SecretKey == "" {
715
+
logger.Warn("no s3 secret key configured but backups are enabled. backups will not run.")
716
return
717
}
718
+1
-1
test.go
+1
-1
test.go