···1package server
23import (
004 "github.com/haileyok/cocoon/models"
5)
67-func (s *Server) getActorByHandle(handle string) (*models.Actor, error) {
8 var actor models.Actor
9- if err := s.db.First(&actor, models.Actor{Handle: handle}).Error; err != nil {
10 return nil, err
11 }
12 return &actor, nil
13}
1415-func (s *Server) getRepoByEmail(email string) (*models.Repo, error) {
16 var repo models.Repo
17- if err := s.db.First(&repo, models.Repo{Email: email}).Error; err != nil {
18 return nil, err
19 }
20 return &repo, nil
21}
2223-func (s *Server) getRepoActorByEmail(email string) (*models.RepoActor, error) {
24 var repo models.RepoActor
25- if err := s.db.Raw("SELECT r.*, a.* FROM repos r LEFT JOIN actors a ON r.did = a.did WHERE r.email= ?", nil, email).Scan(&repo).Error; err != nil {
26 return nil, err
27 }
28 return &repo, nil
29}
3031-func (s *Server) getRepoActorByDid(did string) (*models.RepoActor, error) {
32 var repo models.RepoActor
33- if err := s.db.Raw("SELECT r.*, a.* FROM repos r LEFT JOIN actors a ON r.did = a.did WHERE r.did = ?", nil, did).Scan(&repo).Error; err != nil {
34 return nil, err
35 }
36 return &repo, nil
···1package server
23import (
4+ "context"
5+6 "github.com/haileyok/cocoon/models"
7)
89+func (s *Server) getActorByHandle(ctx context.Context, handle string) (*models.Actor, error) {
10 var actor models.Actor
11+ if err := s.db.First(ctx, &actor, models.Actor{Handle: handle}).Error; err != nil {
12 return nil, err
13 }
14 return &actor, nil
15}
1617+func (s *Server) getRepoByEmail(ctx context.Context, email string) (*models.Repo, error) {
18 var repo models.Repo
19+ if err := s.db.First(ctx, &repo, models.Repo{Email: email}).Error; err != nil {
20 return nil, err
21 }
22 return &repo, nil
23}
2425+func (s *Server) getRepoActorByEmail(ctx context.Context, email string) (*models.RepoActor, error) {
26 var repo models.RepoActor
27+ if err := s.db.Raw(ctx, "SELECT r.*, a.* FROM repos r LEFT JOIN actors a ON r.did = a.did WHERE r.email= ?", nil, email).Scan(&repo).Error; err != nil {
28 return nil, err
29 }
30 return &repo, nil
31}
3233+func (s *Server) getRepoActorByDid(ctx context.Context, did string) (*models.RepoActor, error) {
34 var repo models.RepoActor
35+ if err := s.db.Raw(ctx, "SELECT r.*, a.* FROM repos r LEFT JOIN actors a ON r.did = a.did WHERE r.did = ?", nil, did).Scan(&repo).Error; err != nil {
36 return nil, err
37 }
38 return &repo, nil
+4-2
server/handle_account.go
···1213func (s *Server) handleAccount(e echo.Context) error {
14 ctx := e.Request().Context()
0015 repo, sess, err := s.getSessionRepoOrErr(e)
16 if err != nil {
17 return e.Redirect(303, "/account/signin")
···20 oldestPossibleSession := time.Now().Add(constants.ConfidentialClientSessionLifetime)
2122 var tokens []provider.OauthToken
23- if err := s.db.Raw("SELECT * FROM oauth_tokens WHERE sub = ? AND created_at < ? ORDER BY created_at ASC", nil, repo.Repo.Did, oldestPossibleSession).Scan(&tokens).Error; err != nil {
24- s.logger.Error("couldnt fetch oauth sessions for account", "did", repo.Repo.Did, "error", err)
25 sess.AddFlash("Unable to fetch sessions. See server logs for more details.", "error")
26 sess.Save(e.Request(), e.Response())
27 return e.Render(200, "account.html", map[string]any{
···1213func (s *Server) handleAccount(e echo.Context) error {
14 ctx := e.Request().Context()
15+ logger := s.logger.With("name", "handleAuth")
16+17 repo, sess, err := s.getSessionRepoOrErr(e)
18 if err != nil {
19 return e.Redirect(303, "/account/signin")
···22 oldestPossibleSession := time.Now().Add(constants.ConfidentialClientSessionLifetime)
2324 var tokens []provider.OauthToken
25+ if err := s.db.Raw(ctx, "SELECT * FROM oauth_tokens WHERE sub = ? AND created_at < ? ORDER BY created_at ASC", nil, repo.Repo.Did, oldestPossibleSession).Scan(&tokens).Error; err != nil {
26+ logger.Error("couldnt fetch oauth sessions for account", "did", repo.Repo.Did, "error", err)
27 sess.AddFlash("Unable to fetch sessions. See server logs for more details.", "error")
28 sess.Save(e.Request(), e.Response())
29 return e.Render(200, "account.html", map[string]any{
+8-5
server/handle_account_revoke.go
···5 "github.com/labstack/echo/v4"
6)
78-type AccountRevokeRequest struct {
9 Token string `form:"token"`
10}
1112func (s *Server) handleAccountRevoke(e echo.Context) error {
13- var req AccountRevokeRequest
00014 if err := e.Bind(&req); err != nil {
15- s.logger.Error("could not bind account revoke request", "error", err)
16 return helpers.ServerError(e, nil)
17 }
18···21 return e.Redirect(303, "/account/signin")
22 }
2324- if err := s.db.Exec("DELETE FROM oauth_tokens WHERE sub = ? AND token = ?", nil, repo.Repo.Did, req.Token).Error; err != nil {
25- s.logger.Error("couldnt delete oauth session for account", "did", repo.Repo.Did, "token", req.Token, "error", err)
26 sess.AddFlash("Unable to revoke session. See server logs for more details.", "error")
27 sess.Save(e.Request(), e.Response())
28 return e.Redirect(303, "/account")
···5 "github.com/labstack/echo/v4"
6)
78+type AccountRevokeInput struct {
9 Token string `form:"token"`
10}
1112func (s *Server) handleAccountRevoke(e echo.Context) error {
13+ ctx := e.Request().Context()
14+ logger := s.logger.With("name", "handleAcocuntRevoke")
15+16+ var req AccountRevokeInput
17 if err := e.Bind(&req); err != nil {
18+ logger.Error("could not bind account revoke request", "error", err)
19 return helpers.ServerError(e, nil)
20 }
21···24 return e.Redirect(303, "/account/signin")
25 }
2627+ if err := s.db.Exec(ctx, "DELETE FROM oauth_tokens WHERE sub = ? AND token = ?", nil, repo.Repo.Did, req.Token).Error; err != nil {
28+ logger.Error("couldnt delete oauth session for account", "did", repo.Repo.Did, "token", req.Token, "error", err)
29 sess.AddFlash("Unable to revoke session. See server logs for more details.", "error")
30 sess.Save(e.Request(), e.Response())
31 return e.Redirect(303, "/account")
+68-16
server/handle_account_signin.go
···23import (
4 "errors"
05 "strings"
067 "github.com/bluesky-social/indigo/atproto/syntax"
8 "github.com/gorilla/sessions"
···14 "gorm.io/gorm"
15)
1617-type OauthSigninRequest struct {
18- Username string `form:"username"`
19- Password string `form:"password"`
20- QueryParams string `form:"query_params"`
021}
2223func (s *Server) getSessionRepoOrErr(e echo.Context) (*models.RepoActor, *sessions.Session, error) {
0024 sess, err := session.Get("session", e)
25 if err != nil {
26 return nil, nil, err
···31 return nil, sess, errors.New("did was not set in session")
32 }
3334- repo, err := s.getRepoActorByDid(did)
35 if err != nil {
36 return nil, sess, err
37 }
···42func getFlashesFromSession(e echo.Context, sess *sessions.Session) map[string]any {
43 defer sess.Save(e.Request(), e.Response())
44 return map[string]any{
45- "errors": sess.Flashes("error"),
46- "successes": sess.Flashes("success"),
047 }
48}
49···60}
6162func (s *Server) handleAccountSigninPost(e echo.Context) error {
63- var req OauthSigninRequest
00064 if err := e.Bind(&req); err != nil {
65- s.logger.Error("error binding sign in req", "error", err)
66 return helpers.ServerError(e, nil)
67 }
68···76 idtype = "handle"
77 } else {
78 idtype = "email"
0000079 }
8081 // TODO: we should make this a helper since we do it for the base create_session as well
···83 var err error
84 switch idtype {
85 case "did":
86- err = s.db.Raw("SELECT r.*, a.* FROM repos r LEFT JOIN actors a ON r.did = a.did WHERE r.did = ?", nil, req.Username).Scan(&repo).Error
87 case "handle":
88- err = s.db.Raw("SELECT r.*, a.* FROM actors a LEFT JOIN repos r ON a.did = r.did WHERE a.handle = ?", nil, req.Username).Scan(&repo).Error
89 case "email":
90- err = s.db.Raw("SELECT r.*, a.* FROM repos r LEFT JOIN actors a ON r.did = a.did WHERE r.email = ?", nil, req.Username).Scan(&repo).Error
91 }
92 if err != nil {
93 if err == gorm.ErrRecordNotFound {
···96 sess.AddFlash("Something went wrong!", "error")
97 }
98 sess.Save(e.Request(), e.Response())
99- return e.Redirect(303, "/account/signin")
100 }
101102 if err := bcrypt.CompareHashAndPassword([]byte(repo.Password), []byte(req.Password)); err != nil {
···106 sess.AddFlash("Something went wrong!", "error")
107 }
108 sess.Save(e.Request(), e.Response())
109- return e.Redirect(303, "/account/signin")
00000000000000000000000000000000000000110 }
111112 sess.Options = &sessions.Options{
···122 return err
123 }
124125- if req.QueryParams != "" {
126- return e.Redirect(303, "/oauth/authorize?"+req.QueryParams)
127 } else {
128 return e.Redirect(303, "/account")
129 }
···23import (
4 "errors"
5+ "fmt"
6 "strings"
7+ "time"
89 "github.com/bluesky-social/indigo/atproto/syntax"
10 "github.com/gorilla/sessions"
···16 "gorm.io/gorm"
17)
1819+type OauthSigninInput struct {
20+ Username string `form:"username"`
21+ Password string `form:"password"`
22+ AuthFactorToken string `form:"token"`
23+ QueryParams string `form:"query_params"`
24}
2526func (s *Server) getSessionRepoOrErr(e echo.Context) (*models.RepoActor, *sessions.Session, error) {
27+ ctx := e.Request().Context()
28+29 sess, err := session.Get("session", e)
30 if err != nil {
31 return nil, nil, err
···36 return nil, sess, errors.New("did was not set in session")
37 }
3839+ repo, err := s.getRepoActorByDid(ctx, did)
40 if err != nil {
41 return nil, sess, err
42 }
···47func getFlashesFromSession(e echo.Context, sess *sessions.Session) map[string]any {
48 defer sess.Save(e.Request(), e.Response())
49 return map[string]any{
50+ "errors": sess.Flashes("error"),
51+ "successes": sess.Flashes("success"),
52+ "tokenrequired": sess.Flashes("tokenrequired"),
53 }
54}
55···66}
6768func (s *Server) handleAccountSigninPost(e echo.Context) error {
69+ ctx := e.Request().Context()
70+ logger := s.logger.With("name", "handleAccountSigninPost")
71+72+ var req OauthSigninInput
73 if err := e.Bind(&req); err != nil {
74+ logger.Error("error binding sign in req", "error", err)
75 return helpers.ServerError(e, nil)
76 }
77···85 idtype = "handle"
86 } else {
87 idtype = "email"
88+ }
89+90+ queryParams := ""
91+ if req.QueryParams != "" {
92+ queryParams = fmt.Sprintf("?%s", req.QueryParams)
93 }
9495 // TODO: we should make this a helper since we do it for the base create_session as well
···97 var err error
98 switch idtype {
99 case "did":
100+ err = s.db.Raw(ctx, "SELECT r.*, a.* FROM repos r LEFT JOIN actors a ON r.did = a.did WHERE r.did = ?", nil, req.Username).Scan(&repo).Error
101 case "handle":
102+ err = s.db.Raw(ctx, "SELECT r.*, a.* FROM actors a LEFT JOIN repos r ON a.did = r.did WHERE a.handle = ?", nil, req.Username).Scan(&repo).Error
103 case "email":
104+ err = s.db.Raw(ctx, "SELECT r.*, a.* FROM repos r LEFT JOIN actors a ON r.did = a.did WHERE r.email = ?", nil, req.Username).Scan(&repo).Error
105 }
106 if err != nil {
107 if err == gorm.ErrRecordNotFound {
···110 sess.AddFlash("Something went wrong!", "error")
111 }
112 sess.Save(e.Request(), e.Response())
113+ return e.Redirect(303, "/account/signin"+queryParams)
114 }
115116 if err := bcrypt.CompareHashAndPassword([]byte(repo.Password), []byte(req.Password)); err != nil {
···120 sess.AddFlash("Something went wrong!", "error")
121 }
122 sess.Save(e.Request(), e.Response())
123+ return e.Redirect(303, "/account/signin"+queryParams)
124+ }
125+126+ // if repo requires 2FA token and one hasn't been provided, return error prompting for one
127+ if repo.TwoFactorType != models.TwoFactorTypeNone && req.AuthFactorToken == "" {
128+ err = s.createAndSendTwoFactorCode(ctx, repo)
129+ if err != nil {
130+ sess.AddFlash("Something went wrong!", "error")
131+ sess.Save(e.Request(), e.Response())
132+ return e.Redirect(303, "/account/signin"+queryParams)
133+ }
134+135+ sess.AddFlash("requires 2FA token", "tokenrequired")
136+ sess.Save(e.Request(), e.Response())
137+ return e.Redirect(303, "/account/signin"+queryParams)
138+ }
139+140+ // if 2FAis required, now check that the one provided is valid
141+ if repo.TwoFactorType != models.TwoFactorTypeNone {
142+ if repo.TwoFactorCode == nil || repo.TwoFactorCodeExpiresAt == nil {
143+ err = s.createAndSendTwoFactorCode(ctx, repo)
144+ if err != nil {
145+ sess.AddFlash("Something went wrong!", "error")
146+ sess.Save(e.Request(), e.Response())
147+ return e.Redirect(303, "/account/signin"+queryParams)
148+ }
149+150+ sess.AddFlash("requires 2FA token", "tokenrequired")
151+ sess.Save(e.Request(), e.Response())
152+ return e.Redirect(303, "/account/signin"+queryParams)
153+ }
154+155+ if *repo.TwoFactorCode != req.AuthFactorToken {
156+ return helpers.InvalidTokenError(e)
157+ }
158+159+ if time.Now().UTC().After(*repo.TwoFactorCodeExpiresAt) {
160+ return helpers.ExpiredTokenError(e)
161+ }
162 }
163164 sess.Options = &sessions.Options{
···174 return err
175 }
176177+ if queryParams != "" {
178+ return e.Redirect(303, "/oauth/authorize"+queryParams)
179 } else {
180 return e.Redirect(303, "/account")
181 }
+3-1
server/handle_actor_put_preferences.go
···10// This is kinda lame. Not great to implement app.bsky in the pds, but alas
1112func (s *Server) handleActorPutPreferences(e echo.Context) error {
0013 repo := e.Get("repo").(*models.RepoActor)
1415 var prefs map[string]any
···22 return err
23 }
2425- if err := s.db.Exec("UPDATE repos SET preferences = ? WHERE did = ?", nil, b, repo.Repo.Did).Error; err != nil {
26 return err
27 }
28
···10// This is kinda lame. Not great to implement app.bsky in the pds, but alas
1112func (s *Server) handleActorPutPreferences(e echo.Context) error {
13+ ctx := e.Request().Context()
14+15 repo := e.Get("repo").(*models.RepoActor)
1617 var prefs map[string]any
···24 return err
25 }
2627+ if err := s.db.Exec(ctx, "UPDATE repos SET preferences = ? WHERE did = ?", nil, b, repo.Repo.Did).Error; err != nil {
28 return err
29 }
30
···20}
2122func (s *Server) handleDescribeRepo(e echo.Context) error {
00023 did := e.QueryParam("repo")
24- repo, err := s.getRepoActorByDid(did)
25 if err != nil {
26 if err == gorm.ErrRecordNotFound {
27 return helpers.InputError(e, to.StringPtr("RepoNotFound"))
28 }
2930- s.logger.Error("error looking up repo", "error", err)
31 return helpers.ServerError(e, nil)
32 }
33···3536 diddoc, err := s.passport.FetchDoc(e.Request().Context(), repo.Repo.Did)
37 if err != nil {
38- s.logger.Error("error fetching diddoc", "error", err)
39 return helpers.ServerError(e, nil)
40 }
41···64 }
6566 var records []models.Record
67- if err := s.db.Raw("SELECT DISTINCT(nsid) FROM records WHERE did = ?", nil, repo.Repo.Did).Scan(&records).Error; err != nil {
68- s.logger.Error("error getting collections", "error", err)
69 return helpers.ServerError(e, nil)
70 }
71
···20}
2122func (s *Server) handleDescribeRepo(e echo.Context) error {
23+ ctx := e.Request().Context()
24+ logger := s.logger.With("name", "handleDescribeRepo")
25+26 did := e.QueryParam("repo")
27+ repo, err := s.getRepoActorByDid(ctx, did)
28 if err != nil {
29 if err == gorm.ErrRecordNotFound {
30 return helpers.InputError(e, to.StringPtr("RepoNotFound"))
31 }
3233+ logger.Error("error looking up repo", "error", err)
34 return helpers.ServerError(e, nil)
35 }
36···3839 diddoc, err := s.passport.FetchDoc(e.Request().Context(), repo.Repo.Did)
40 if err != nil {
41+ logger.Error("error fetching diddoc", "error", err)
42 return helpers.ServerError(e, nil)
43 }
44···67 }
6869 var records []models.Record
70+ if err := s.db.Raw(ctx, "SELECT DISTINCT(nsid) FROM records WHERE did = ?", nil, repo.Repo.Did).Scan(&records).Error; err != nil {
71+ logger.Error("error getting collections", "error", err)
72 return helpers.ServerError(e, nil)
73 }
74
+3-1
server/handle_repo_get_record.go
···14}
1516func (s *Server) handleRepoGetRecord(e echo.Context) error {
0017 repo := e.QueryParam("repo")
18 collection := e.QueryParam("collection")
19 rkey := e.QueryParam("rkey")
···32 }
3334 var record models.Record
35- if err := s.db.Raw("SELECT * FROM records WHERE did = ? AND nsid = ? AND rkey = ?"+cidquery, nil, params...).Scan(&record).Error; err != nil {
36 // TODO: handle error nicely
37 return err
38 }
···14}
1516func (s *Server) handleRepoGetRecord(e echo.Context) error {
17+ ctx := e.Request().Context()
18+19 repo := e.QueryParam("repo")
20 collection := e.QueryParam("collection")
21 rkey := e.QueryParam("rkey")
···34 }
3536 var record models.Record
37+ if err := s.db.Raw(ctx, "SELECT * FROM records WHERE did = ? AND nsid = ? AND rkey = ?"+cidquery, nil, params...).Scan(&record).Error; err != nil {
38 // TODO: handle error nicely
39 return err
40 }
+6-3
server/handle_repo_list_missing_blobs.go
···22}
2324func (s *Server) handleListMissingBlobs(e echo.Context) error {
00025 urepo := e.Get("repo").(*models.RepoActor)
2627 limitStr := e.QueryParam("limit")
···35 }
3637 var records []models.Record
38- if err := s.db.Raw("SELECT * FROM records WHERE did = ?", nil, urepo.Repo.Did).Scan(&records).Error; err != nil {
39- s.logger.Error("failed to get records for listMissingBlobs", "error", err)
40 return helpers.ServerError(e, nil)
41 }
42···69 }
7071 var count int64
72- if err := s.db.Raw("SELECT COUNT(*) FROM blobs WHERE did = ? AND cid = ?", nil, urepo.Repo.Did, ref.cid.Bytes()).Scan(&count).Error; err != nil {
73 continue
74 }
75
···22}
2324func (s *Server) handleListMissingBlobs(e echo.Context) error {
25+ ctx := e.Request().Context()
26+ logger := s.logger.With("name", "handleListMissingBlos")
27+28 urepo := e.Get("repo").(*models.RepoActor)
2930 limitStr := e.QueryParam("limit")
···38 }
3940 var records []models.Record
41+ if err := s.db.Raw(ctx, "SELECT * FROM records WHERE did = ?", nil, urepo.Repo.Did).Scan(&records).Error; err != nil {
42+ logger.Error("failed to get records for listMissingBlobs", "error", err)
43 return helpers.ServerError(e, nil)
44 }
45···72 }
7374 var count int64
75+ if err := s.db.Raw(ctx, "SELECT COUNT(*) FROM blobs WHERE did = ? AND cid = ?", nil, urepo.Repo.Did, ref.cid.Bytes()).Scan(&count).Error; err != nil {
76 continue
77 }
78
+7-4
server/handle_repo_list_records.go
···46}
4748func (s *Server) handleListRecords(e echo.Context) error {
00049 var req ComAtprotoRepoListRecordsRequest
50 if err := e.Bind(&req); err != nil {
51- s.logger.Error("could not bind list records request", "error", err)
52 return helpers.ServerError(e, nil)
53 }
54···7879 did := req.Repo
80 if _, err := syntax.ParseDID(did); err != nil {
81- actor, err := s.getActorByHandle(req.Repo)
82 if err != nil {
83 return helpers.InputError(e, to.StringPtr("RepoNotFound"))
84 }
···93 params = append(params, limit)
9495 var records []models.Record
96- if err := s.db.Raw("SELECT * FROM records WHERE did = ? AND nsid = ? "+cursorquery+" ORDER BY created_at "+sort+" limit ?", nil, params...).Scan(&records).Error; err != nil {
97- s.logger.Error("error getting records", "error", err)
98 return helpers.ServerError(e, nil)
99 }
100
···46}
4748func (s *Server) handleListRecords(e echo.Context) error {
49+ ctx := e.Request().Context()
50+ logger := s.logger.With("name", "handleListRecords")
51+52 var req ComAtprotoRepoListRecordsRequest
53 if err := e.Bind(&req); err != nil {
54+ logger.Error("could not bind list records request", "error", err)
55 return helpers.ServerError(e, nil)
56 }
57···8182 did := req.Repo
83 if _, err := syntax.ParseDID(did); err != nil {
84+ actor, err := s.getActorByHandle(ctx, req.Repo)
85 if err != nil {
86 return helpers.InputError(e, to.StringPtr("RepoNotFound"))
87 }
···96 params = append(params, limit)
9798 var records []models.Record
99+ if err := s.db.Raw(ctx, "SELECT * FROM records WHERE did = ? AND nsid = ? "+cursorquery+" ORDER BY created_at "+sort+" limit ?", nil, params...).Scan(&records).Error; err != nil {
100+ logger.Error("error getting records", "error", err)
101 return helpers.ServerError(e, nil)
102 }
103
+3-1
server/handle_repo_list_repos.go
···2122// TODO: paginate this bitch
23func (s *Server) handleListRepos(e echo.Context) error {
0024 var repos []models.Repo
25- if err := s.db.Raw("SELECT * FROM repos ORDER BY created_at DESC LIMIT 500", nil).Scan(&repos).Error; err != nil {
26 return err
27 }
28
···2122// TODO: paginate this bitch
23func (s *Server) handleListRepos(e echo.Context) error {
24+ ctx := e.Request().Context()
25+26 var repos []models.Repo
27+ if err := s.db.Raw(ctx, "SELECT * FROM repos ORDER BY created_at DESC LIMIT 500", nil).Scan(&repos).Error; err != nil {
28 return err
29 }
30
···12}
1314func (s *Server) handleSyncGetLatestCommit(e echo.Context) error {
0015 did := e.QueryParam("did")
16 if did == "" {
17 return helpers.InputError(e, nil)
18 }
1920- urepo, err := s.getRepoActorByDid(did)
21 if err != nil {
22 return err
23 }
···12}
1314func (s *Server) handleSyncGetLatestCommit(e echo.Context) error {
15+ ctx := e.Request().Context()
16+17 did := e.QueryParam("did")
18 if did == "" {
19 return helpers.InputError(e, nil)
20 }
2122+ urepo, err := s.getRepoActorByDid(ctx, did)
23 if err != nil {
24 return err
25 }
+5-4
server/handle_sync_get_record.go
···1415func (s *Server) handleSyncGetRecord(e echo.Context) error {
16 ctx := e.Request().Context()
01718 did := e.QueryParam("did")
19 collection := e.QueryParam("collection")
20 rkey := e.QueryParam("rkey")
2122 var urepo models.Repo
23- if err := s.db.Raw("SELECT * FROM repos WHERE did = ?", nil, did).Scan(&urepo).Error; err != nil {
24- s.logger.Error("error getting repo", "error", err)
25 return helpers.ServerError(e, nil)
26 }
27···38 })
3940 if _, err := carstore.LdWrite(buf, hb); err != nil {
41- s.logger.Error("error writing to car", "error", err)
42 return helpers.ServerError(e, nil)
43 }
4445 for _, blk := range blocks {
46 if _, err := carstore.LdWrite(buf, blk.Cid().Bytes(), blk.RawData()); err != nil {
47- s.logger.Error("error writing to car", "error", err)
48 return helpers.ServerError(e, nil)
49 }
50 }
···1415func (s *Server) handleSyncGetRecord(e echo.Context) error {
16 ctx := e.Request().Context()
17+ logger := s.logger.With("name", "handleSyncGetRecord")
1819 did := e.QueryParam("did")
20 collection := e.QueryParam("collection")
21 rkey := e.QueryParam("rkey")
2223 var urepo models.Repo
24+ if err := s.db.Raw(ctx, "SELECT * FROM repos WHERE did = ?", nil, did).Scan(&urepo).Error; err != nil {
25+ logger.Error("error getting repo", "error", err)
26 return helpers.ServerError(e, nil)
27 }
28···39 })
4041 if _, err := carstore.LdWrite(buf, hb); err != nil {
42+ logger.Error("error writing to car", "error", err)
43 return helpers.ServerError(e, nil)
44 }
4546 for _, blk := range blocks {
47 if _, err := carstore.LdWrite(buf, blk.Cid().Bytes(), blk.RawData()); err != nil {
48+ logger.Error("error writing to car", "error", err)
49 return helpers.ServerError(e, nil)
50 }
51 }
+6-3
server/handle_sync_get_repo.go
···13)
1415func (s *Server) handleSyncGetRepo(e echo.Context) error {
00016 did := e.QueryParam("did")
17 if did == "" {
18 return helpers.InputError(e, nil)
19 }
2021- urepo, err := s.getRepoActorByDid(did)
22 if err != nil {
23 return err
24 }
···36 buf := new(bytes.Buffer)
3738 if _, err := carstore.LdWrite(buf, hb); err != nil {
39- s.logger.Error("error writing to car", "error", err)
40 return helpers.ServerError(e, nil)
41 }
4243 var blocks []models.Block
44- if err := s.db.Raw("SELECT * FROM blocks WHERE did = ? ORDER BY rev ASC", nil, urepo.Repo.Did).Scan(&blocks).Error; err != nil {
45 return err
46 }
47
···13)
1415func (s *Server) handleSyncGetRepo(e echo.Context) error {
16+ ctx := e.Request().Context()
17+ logger := s.logger.With("name", "handleSyncGetRepo")
18+19 did := e.QueryParam("did")
20 if did == "" {
21 return helpers.InputError(e, nil)
22 }
2324+ urepo, err := s.getRepoActorByDid(ctx, did)
25 if err != nil {
26 return err
27 }
···39 buf := new(bytes.Buffer)
4041 if _, err := carstore.LdWrite(buf, hb); err != nil {
42+ logger.Error("error writing to car", "error", err)
43 return helpers.ServerError(e, nil)
44 }
4546 var blocks []models.Block
47+ if err := s.db.Raw(ctx, "SELECT * FROM blocks WHERE did = ? ORDER BY rev ASC", nil, urepo.Repo.Did).Scan(&blocks).Error; err != nil {
48 return err
49 }
50
+3-1
server/handle_sync_get_repo_status.go
···1415// TODO: make this actually do the right thing
16func (s *Server) handleSyncGetRepoStatus(e echo.Context) error {
0017 did := e.QueryParam("did")
18 if did == "" {
19 return helpers.InputError(e, nil)
20 }
2122- urepo, err := s.getRepoActorByDid(did)
23 if err != nil {
24 return err
25 }
···1415// TODO: make this actually do the right thing
16func (s *Server) handleSyncGetRepoStatus(e echo.Context) error {
17+ ctx := e.Request().Context()
18+19 did := e.QueryParam("did")
20 if did == "" {
21 return helpers.InputError(e, nil)
22 }
2324+ urepo, err := s.getRepoActorByDid(ctx, did)
25 if err != nil {
26 return err
27 }
+8-5
server/handle_sync_list_blobs.go
···14}
1516func (s *Server) handleSyncListBlobs(e echo.Context) error {
00017 did := e.QueryParam("did")
18 if did == "" {
19 return helpers.InputError(e, nil)
···35 }
36 params = append(params, limit)
3738- urepo, err := s.getRepoActorByDid(did)
39 if err != nil {
40- s.logger.Error("could not find user for requested blobs", "error", err)
41 return helpers.InputError(e, nil)
42 }
43···49 }
5051 var blobs []models.Blob
52- if err := s.db.Raw("SELECT * FROM blobs WHERE did = ? "+cursorquery+" ORDER BY created_at DESC LIMIT ?", nil, params...).Scan(&blobs).Error; err != nil {
53- s.logger.Error("error getting records", "error", err)
54 return helpers.ServerError(e, nil)
55 }
56···58 for _, b := range blobs {
59 c, err := cid.Cast(b.Cid)
60 if err != nil {
61- s.logger.Error("error casting cid", "error", err)
62 return helpers.ServerError(e, nil)
63 }
64 cstrs = append(cstrs, c.String())
···14}
1516func (s *Server) handleSyncListBlobs(e echo.Context) error {
17+ ctx := e.Request().Context()
18+ logger := s.logger.With("name", "handleSyncListBlobs")
19+20 did := e.QueryParam("did")
21 if did == "" {
22 return helpers.InputError(e, nil)
···38 }
39 params = append(params, limit)
4041+ urepo, err := s.getRepoActorByDid(ctx, did)
42 if err != nil {
43+ logger.Error("could not find user for requested blobs", "error", err)
44 return helpers.InputError(e, nil)
45 }
46···52 }
5354 var blobs []models.Blob
55+ if err := s.db.Raw(ctx, "SELECT * FROM blobs WHERE did = ? "+cursorquery+" ORDER BY created_at DESC LIMIT ?", nil, params...).Scan(&blobs).Error; err != nil {
56+ logger.Error("error getting records", "error", err)
57 return helpers.ServerError(e, nil)
58 }
59···61 for _, b := range blobs {
62 c, err := cid.Cast(b.Cid)
63 if err != nil {
64+ logger.Error("error casting cid", "error", err)
65 return helpers.ServerError(e, nil)
66 }
67 cstrs = append(cstrs, c.String())
+82-50
server/handle_sync_subscribe_repos.go
···7 "github.com/bluesky-social/indigo/events"
8 "github.com/bluesky-social/indigo/lex/util"
9 "github.com/btcsuite/websocket"
010 "github.com/labstack/echo/v4"
11)
1213func (s *Server) handleSyncSubscribeRepos(e echo.Context) error {
14- ctx := e.Request().Context()
0015 logger := s.logger.With("component", "subscribe-repos-websocket")
1617 conn, err := websocket.Upgrade(e.Response().Writer, e.Request(), e.Response().Header(), 1<<10, 1<<10)
···24 logger = logger.With("ident", ident)
25 logger.Info("new connection established")
2627- evts, cancel, err := s.evtman.Subscribe(ctx, ident, func(evt *events.XRPCStreamEvent) bool {
0000028 return true
29 }, nil)
30 if err != nil {
31 return err
32 }
33- defer cancel()
00000000000000003435 header := events.EventHeader{Op: events.EvtKindMessage}
36 for evt := range evts {
37- wc, err := conn.NextWriter(websocket.BinaryMessage)
38- if err != nil {
39- logger.Error("error writing message to relay", "err", err)
40- break
41- }
4243- if ctx.Err() != nil {
44- logger.Error("context error", "err", err)
45- break
46- }
04748- var obj util.CBOR
49- switch {
50- case evt.Error != nil:
51- header.Op = events.EvtKindErrorFrame
52- obj = evt.Error
53- case evt.RepoCommit != nil:
54- header.MsgType = "#commit"
55- obj = evt.RepoCommit
56- case evt.RepoIdentity != nil:
57- header.MsgType = "#identity"
58- obj = evt.RepoIdentity
59- case evt.RepoAccount != nil:
60- header.MsgType = "#account"
61- obj = evt.RepoAccount
62- case evt.RepoInfo != nil:
63- header.MsgType = "#info"
64- obj = evt.RepoInfo
65- default:
66- logger.Warn("unrecognized event kind")
67- return nil
68- }
6970- if err := header.MarshalCBOR(wc); err != nil {
71- logger.Error("failed to write header to relay", "err", err)
72- break
73- }
000000000000000007475- if err := obj.MarshalCBOR(wc); err != nil {
76- logger.Error("failed to write event to relay", "err", err)
77- break
78- }
000007980- if err := wc.Close(); err != nil {
81- logger.Error("failed to flush-close our event write", "err", err)
82- break
83- }
084 }
8586 // we should tell the relay to request a new crawl at this point if we got disconnected
87 // use a new context since the old one might be cancelled at this point
88- ctx, cancel = context.WithTimeout(context.Background(), 10*time.Second)
89- defer cancel()
90- if err := s.requestCrawl(ctx); err != nil {
91- logger.Error("error requesting crawls", "err", err)
92- }
009394 return nil
95}
···7 "github.com/bluesky-social/indigo/events"
8 "github.com/bluesky-social/indigo/lex/util"
9 "github.com/btcsuite/websocket"
10+ "github.com/haileyok/cocoon/metrics"
11 "github.com/labstack/echo/v4"
12)
1314func (s *Server) handleSyncSubscribeRepos(e echo.Context) error {
15+ ctx, cancel := context.WithCancel(e.Request().Context())
16+ defer cancel()
17+18 logger := s.logger.With("component", "subscribe-repos-websocket")
1920 conn, err := websocket.Upgrade(e.Response().Writer, e.Request(), e.Response().Header(), 1<<10, 1<<10)
···27 logger = logger.With("ident", ident)
28 logger.Info("new connection established")
2930+ metrics.RelaysConnected.WithLabelValues(ident).Inc()
31+ defer func() {
32+ metrics.RelaysConnected.WithLabelValues(ident).Dec()
33+ }()
34+35+ evts, evtManCancel, err := s.evtman.Subscribe(ctx, ident, func(evt *events.XRPCStreamEvent) bool {
36 return true
37 }, nil)
38 if err != nil {
39 return err
40 }
41+ defer evtManCancel()
42+43+ // drop the connection whenever a subscriber disconnects from the socket, we should get errors
44+ go func() {
45+ for {
46+ select {
47+ case <-ctx.Done():
48+ return
49+ default:
50+ if _, _, err := conn.ReadMessage(); err != nil {
51+ logger.Warn("websocket error", "err", err)
52+ cancel()
53+ return
54+ }
55+ }
56+ }
57+ }()
5859 header := events.EventHeader{Op: events.EvtKindMessage}
60 for evt := range evts {
61+ func() {
62+ defer func() {
63+ metrics.RelaySends.WithLabelValues(ident, header.MsgType).Inc()
64+ }()
06566+ wc, err := conn.NextWriter(websocket.BinaryMessage)
67+ if err != nil {
68+ logger.Error("error writing message to relay", "err", err)
69+ return
70+ }
7172+ if ctx.Err() != nil {
73+ logger.Error("context error", "err", err)
74+ return
75+ }
000000000000000007677+ var obj util.CBOR
78+ switch {
79+ case evt.Error != nil:
80+ header.Op = events.EvtKindErrorFrame
81+ obj = evt.Error
82+ case evt.RepoCommit != nil:
83+ header.MsgType = "#commit"
84+ obj = evt.RepoCommit
85+ case evt.RepoIdentity != nil:
86+ header.MsgType = "#identity"
87+ obj = evt.RepoIdentity
88+ case evt.RepoAccount != nil:
89+ header.MsgType = "#account"
90+ obj = evt.RepoAccount
91+ case evt.RepoInfo != nil:
92+ header.MsgType = "#info"
93+ obj = evt.RepoInfo
94+ default:
95+ logger.Warn("unrecognized event kind")
96+ return
97+ }
9899+ if err := header.MarshalCBOR(wc); err != nil {
100+ logger.Error("failed to write header to relay", "err", err)
101+ return
102+ }
103+104+ if err := obj.MarshalCBOR(wc); err != nil {
105+ logger.Error("failed to write event to relay", "err", err)
106+ return
107+ }
108109+ if err := wc.Close(); err != nil {
110+ logger.Error("failed to flush-close our event write", "err", err)
111+ return
112+ }
113+ }()
114 }
115116 // we should tell the relay to request a new crawl at this point if we got disconnected
117 // use a new context since the old one might be cancelled at this point
118+ go func() {
119+ retryCtx, retryCancel := context.WithTimeout(context.Background(), 10*time.Second)
120+ defer retryCancel()
121+ if err := s.requestCrawl(retryCtx); err != nil {
122+ logger.Error("error requesting crawls", "err", err)
123+ }
124+ }()
125126 return nil
127}
+5-2
server/handle_well_known.go
···67}
6869func (s *Server) handleAtprotoDid(e echo.Context) error {
00070 host := e.Request().Host
71 if host == "" {
72 return helpers.InputError(e, to.StringPtr("Invalid handle."))
···84 return e.NoContent(404)
85 }
8687- actor, err := s.getActorByHandle(host)
88 if err != nil {
89 if err == gorm.ErrRecordNotFound {
90 return e.NoContent(404)
91 }
92- s.logger.Error("error looking up actor by handle", "error", err)
93 return helpers.ServerError(e, nil)
94 }
95
···17 lexutil "github.com/bluesky-social/indigo/lex/util"
18 "github.com/bluesky-social/indigo/repo"
19 "github.com/haileyok/cocoon/internal/db"
020 "github.com/haileyok/cocoon/models"
21 "github.com/haileyok/cocoon/recording_blockstore"
22 blocks "github.com/ipfs/go-block-format"
···181 case OpTypeDelete:
182 // try to find the old record in the database
183 var old models.Record
184- if err := rm.db.Raw("SELECT value FROM records WHERE did = ? AND nsid = ? AND rkey = ?", nil, urepo.Did, op.Collection, op.Rkey).Scan(&old).Error; err != nil {
185 return nil, err
186 }
187···251 return nil, err
252 }
253000000254 // create a buffer for dumping our new cbor into
255 buf := new(bytes.Buffer)
256···323 var cids []cid.Cid
324 // whenever there is cid present, we know it's a create (dumb)
325 if entry.Cid != "" {
326- if err := rm.s.db.Create(&entry, []clause.Expression{clause.OnConflict{
327 Columns: []clause.Column{{Name: "did"}, {Name: "nsid"}, {Name: "rkey"}},
328 UpdateAll: true,
329 }}).Error; err != nil {
···331 }
332333 // increment the given blob refs, yay
334- cids, err = rm.incrementBlobRefs(urepo, entry.Value)
335 if err != nil {
336 return nil, err
337 }
···339 // as i noted above this is dumb. but we delete whenever the cid is nil. it works solely becaue the pkey
340 // is did + collection + rkey. i still really want to separate that out, or use a different type to make
341 // this less confusing/easy to read. alas, its 2 am and yea no
342- if err := rm.s.db.Delete(&entry, nil).Error; err != nil {
343 return nil, err
344 }
345346 // TODO:
347- cids, err = rm.decrementBlobRefs(urepo, entry.Value)
348 if err != nil {
349 return nil, err
350 }
···411 return c, bs.GetReadLog(), nil
412}
413414-func (rm *RepoMan) incrementBlobRefs(urepo models.Repo, cbor []byte) ([]cid.Cid, error) {
415 cids, err := getBlobCidsFromCbor(cbor)
416 if err != nil {
417 return nil, err
418 }
419420 for _, c := range cids {
421- if err := rm.db.Exec("UPDATE blobs SET ref_count = ref_count + 1 WHERE did = ? AND cid = ?", nil, urepo.Did, c.Bytes()).Error; err != nil {
422 return nil, err
423 }
424 }
···426 return cids, nil
427}
428429-func (rm *RepoMan) decrementBlobRefs(urepo models.Repo, cbor []byte) ([]cid.Cid, error) {
430 cids, err := getBlobCidsFromCbor(cbor)
431 if err != nil {
432 return nil, err
···437 ID uint
438 Count int
439 }
440- if err := rm.db.Raw("UPDATE blobs SET ref_count = ref_count - 1 WHERE did = ? AND cid = ? RETURNING id, ref_count", nil, urepo.Did, c.Bytes()).Scan(&res).Error; err != nil {
441 return nil, err
442 }
443444 // TODO: this does _not_ handle deletions of blobs that are on s3 storage!!!! we need to get the blob, see what
445 // storage it is in, and clean up s3!!!!
446 if res.Count == 0 {
447- if err := rm.db.Exec("DELETE FROM blobs WHERE id = ?", nil, res.ID).Error; err != nil {
448 return nil, err
449 }
450- if err := rm.db.Exec("DELETE FROM blob_parts WHERE blob_id = ?", nil, res.ID).Error; err != nil {
451 return nil, err
452 }
453 }
···17 lexutil "github.com/bluesky-social/indigo/lex/util"
18 "github.com/bluesky-social/indigo/repo"
19 "github.com/haileyok/cocoon/internal/db"
20+ "github.com/haileyok/cocoon/metrics"
21 "github.com/haileyok/cocoon/models"
22 "github.com/haileyok/cocoon/recording_blockstore"
23 blocks "github.com/ipfs/go-block-format"
···182 case OpTypeDelete:
183 // try to find the old record in the database
184 var old models.Record
185+ if err := rm.db.Raw(ctx, "SELECT value FROM records WHERE did = ? AND nsid = ? AND rkey = ?", nil, urepo.Did, op.Collection, op.Rkey).Scan(&old).Error; err != nil {
186 return nil, err
187 }
188···252 return nil, err
253 }
254255+ for _, result := range results {
256+ if result.Type != nil {
257+ metrics.RepoOperations.WithLabelValues(*result.Type).Inc()
258+ }
259+ }
260+261 // create a buffer for dumping our new cbor into
262 buf := new(bytes.Buffer)
263···330 var cids []cid.Cid
331 // whenever there is cid present, we know it's a create (dumb)
332 if entry.Cid != "" {
333+ if err := rm.s.db.Create(ctx, &entry, []clause.Expression{clause.OnConflict{
334 Columns: []clause.Column{{Name: "did"}, {Name: "nsid"}, {Name: "rkey"}},
335 UpdateAll: true,
336 }}).Error; err != nil {
···338 }
339340 // increment the given blob refs, yay
341+ cids, err = rm.incrementBlobRefs(ctx, urepo, entry.Value)
342 if err != nil {
343 return nil, err
344 }
···346 // as i noted above this is dumb. but we delete whenever the cid is nil. it works solely becaue the pkey
347 // is did + collection + rkey. i still really want to separate that out, or use a different type to make
348 // this less confusing/easy to read. alas, its 2 am and yea no
349+ if err := rm.s.db.Delete(ctx, &entry, nil).Error; err != nil {
350 return nil, err
351 }
352353 // TODO:
354+ cids, err = rm.decrementBlobRefs(ctx, urepo, entry.Value)
355 if err != nil {
356 return nil, err
357 }
···418 return c, bs.GetReadLog(), nil
419}
420421+func (rm *RepoMan) incrementBlobRefs(ctx context.Context, urepo models.Repo, cbor []byte) ([]cid.Cid, error) {
422 cids, err := getBlobCidsFromCbor(cbor)
423 if err != nil {
424 return nil, err
425 }
426427 for _, c := range cids {
428+ if err := rm.db.Exec(ctx, "UPDATE blobs SET ref_count = ref_count + 1 WHERE did = ? AND cid = ?", nil, urepo.Did, c.Bytes()).Error; err != nil {
429 return nil, err
430 }
431 }
···433 return cids, nil
434}
435436+func (rm *RepoMan) decrementBlobRefs(ctx context.Context, urepo models.Repo, cbor []byte) ([]cid.Cid, error) {
437 cids, err := getBlobCidsFromCbor(cbor)
438 if err != nil {
439 return nil, err
···444 ID uint
445 Count int
446 }
447+ if err := rm.db.Raw(ctx, "UPDATE blobs SET ref_count = ref_count - 1 WHERE did = ? AND cid = ? RETURNING id, ref_count", nil, urepo.Did, c.Bytes()).Scan(&res).Error; err != nil {
448 return nil, err
449 }
450451 // TODO: this does _not_ handle deletions of blobs that are on s3 storage!!!! we need to get the blob, see what
452 // storage it is in, and clean up s3!!!!
453 if res.Count == 0 {
454+ if err := rm.db.Exec(ctx, "DELETE FROM blobs WHERE id = ?", nil, res.ID).Error; err != nil {
455 return nil, err
456 }
457+ if err := rm.db.Exec(ctx, "DELETE FROM blob_parts WHERE blob_id = ?", nil, res.ID).Error; err != nil {
458 return nil, err
459 }
460 }
+75-74
server/server.go
···39 "github.com/haileyok/cocoon/oauth/provider"
40 "github.com/haileyok/cocoon/plc"
41 "github.com/ipfs/go-cid"
042 echo_session "github.com/labstack/echo-contrib/session"
43 "github.com/labstack/echo/v4"
44 "github.com/labstack/echo/v4/middleware"
···89}
9091type Args struct {
0092 Addr string
93 DbName string
94 DbType string
95 DatabaseURL string
96- Logger *slog.Logger
97 Version string
98 Did string
99 Hostname string
···209}
210211func New(args *Args) (*Server, error) {
000000212 if args.Addr == "" {
213 return nil, fmt.Errorf("addr must be set")
214 }
···237 return nil, fmt.Errorf("admin password must be set")
238 }
239240- if args.Logger == nil {
241- args.Logger = slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{}))
242- }
243-244 if args.SessionSecret == "" {
245 panic("SESSION SECRET WAS NOT SET. THIS IS REQUIRED. ")
246 }
···248 e := echo.New()
249250 e.Pre(middleware.RemoveTrailingSlash())
251- e.Pre(slogecho.New(args.Logger))
252 e.Use(echo_session.Middleware(sessions.NewCookieStore([]byte(args.SessionSecret))))
0253 e.Use(middleware.CORSWithConfig(middleware.CORSConfig{
254 AllowOrigins: []string{"*"},
255 AllowHeaders: []string{"*"},
···311 if err != nil {
312 return nil, fmt.Errorf("failed to connect to postgres: %w", err)
313 }
314- args.Logger.Info("connected to PostgreSQL database")
315 default:
316 gdb, err = gorm.Open(sqlite.Open(args.DbName), &gorm.Config{})
317 if err != nil {
318 return nil, fmt.Errorf("failed to open sqlite database: %w", err)
319 }
320- args.Logger.Info("connected to SQLite database", "path", args.DbName)
000321 }
322 dbw := db.NewDB(gdb)
323···360 var nonceSecret []byte
361 maybeSecret, err := os.ReadFile("nonce.secret")
362 if err != nil && !os.IsNotExist(err) {
363- args.Logger.Error("error attempting to read nonce secret", "error", err)
364 } else {
365 nonceSecret = maybeSecret
366 }
···398 Hostname: args.Hostname,
399 ClientManagerArgs: client.ManagerArgs{
400 Cli: oauthCli,
401- Logger: args.Logger,
402 },
403 DpopManagerArgs: dpop.ManagerArgs{
404 NonceSecret: nonceSecret,
405 NonceRotationInterval: constants.NonceMaxRotationInterval / 3,
406 OnNonceSecretCreated: func(newNonce []byte) {
407 if err := os.WriteFile("nonce.secret", newNonce, 0644); err != nil {
408- args.Logger.Error("error writing new nonce secret", "error", err)
409 }
410 },
411- Logger: args.Logger,
412 Hostname: args.Hostname,
413 },
414 }),
···524 s.echo.GET("/xrpc/app.bsky.actor.getPreferences", s.handleActorGetPreferences, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
525 s.echo.POST("/xrpc/app.bsky.actor.putPreferences", s.handleActorPutPreferences, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
526 s.echo.GET("/xrpc/app.bsky.feed.getFeed", s.handleProxyBskyFeedGetFeed, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
527-528 // admin routes
529 s.echo.POST("/xrpc/com.atproto.server.createInviteCode", s.handleCreateInviteCode, s.handleAdminMiddleware)
530 s.echo.POST("/xrpc/com.atproto.server.createInviteCodes", s.handleCreateInviteCodes, s.handleAdminMiddleware)
···535}
536537func (s *Server) Serve(ctx context.Context) error {
00538 s.addRoutes()
539540- s.logger.Info("migrating...")
541542 s.db.AutoMigrate(
543 &models.Actor{},
···554 &provider.OauthAuthorizationRequest{},
555 )
556557- s.logger.Info("starting cocoon")
558559 go func() {
560 if err := s.httpd.ListenAndServe(); err != nil {
···566567 go func() {
568 if err := s.requestCrawl(ctx); err != nil {
569- s.logger.Error("error requesting crawls", "err", err)
570 }
571 }()
572···584585 logger.Info("requesting crawl with configured relays")
586587- if time.Now().Sub(s.lastRequestCrawl) <= 1*time.Minute {
588 return fmt.Errorf("a crawl request has already been made within the last minute")
589 }
590···607}
608609func (s *Server) doBackup() {
00610 if s.dbType == "postgres" {
611- s.logger.Info("skipping S3 backup - PostgreSQL backups should be handled externally (pg_dump, managed database backups, etc.)")
612 return
613 }
614615 start := time.Now()
616617- s.logger.Info("beginning backup to s3...")
618619- var buf bytes.Buffer
620- if err := func() error {
621- s.logger.Info("reading database bytes...")
622- s.db.Lock()
623- defer s.db.Unlock()
624625- sf, err := os.Open(s.dbName)
626- if err != nil {
627- return fmt.Errorf("error opening database for backup: %w", err)
628- }
629- defer sf.Close()
630-631- if _, err := io.Copy(&buf, sf); err != nil {
632- return fmt.Errorf("error reading bytes of backup db: %w", err)
633- }
634635- return nil
636- }(); err != nil {
637- s.logger.Error("error backing up database", "error", err)
638 return
639 }
640641- if err := func() error {
642- s.logger.Info("sending to s3...")
643644- currTime := time.Now().Format("2006-01-02_15-04-05")
645- key := "cocoon-backup-" + currTime + ".db"
646647- config := &aws.Config{
648- Region: aws.String(s.s3Config.Region),
649- Credentials: credentials.NewStaticCredentials(s.s3Config.AccessKey, s.s3Config.SecretKey, ""),
650- }
651652- if s.s3Config.Endpoint != "" {
653- config.Endpoint = aws.String(s.s3Config.Endpoint)
654- config.S3ForcePathStyle = aws.Bool(true)
655- }
656-657- sess, err := session.NewSession(config)
658- if err != nil {
659- return err
660- }
661-662- svc := s3.New(sess)
663664- if _, err := svc.PutObject(&s3.PutObjectInput{
665- Bucket: aws.String(s.s3Config.Bucket),
666- Key: aws.String(key),
667- Body: bytes.NewReader(buf.Bytes()),
668- }); err != nil {
669- return fmt.Errorf("error uploading file to s3: %w", err)
670- }
671672- s.logger.Info("finished uploading backup to s3", "key", key, "duration", time.Now().Sub(start).Seconds())
673674- return nil
675- }(); err != nil {
676- s.logger.Error("error uploading database backup", "error", err)
000677 return
678 }
679680- os.WriteFile("last-backup.txt", []byte(time.Now().String()), 0644)
00681}
682683func (s *Server) backupRoutine() {
00684 if s.s3Config == nil || !s.s3Config.BackupsEnabled {
685 return
686 }
687688 if s.s3Config.Region == "" {
689- s.logger.Warn("no s3 region configured but backups are enabled. backups will not run.")
690 return
691 }
692693 if s.s3Config.Bucket == "" {
694- s.logger.Warn("no s3 bucket configured but backups are enabled. backups will not run.")
695 return
696 }
697698 if s.s3Config.AccessKey == "" {
699- s.logger.Warn("no s3 access key configured but backups are enabled. backups will not run.")
700 return
701 }
702703 if s.s3Config.SecretKey == "" {
704- s.logger.Warn("no s3 secret key configured but backups are enabled. backups will not run.")
705 return
706 }
707···710 if err != nil {
711 shouldBackupNow = true
712 } else {
713- lastBackup, err := time.Parse("2006-01-02 15:04:05.999999999 -0700 MST", string(lastBackupStr))
714 if err != nil {
715 shouldBackupNow = true
716- } else if time.Now().Sub(lastBackup).Seconds() > 3600 {
717 shouldBackupNow = true
718 }
719 }
···729}
730731func (s *Server) UpdateRepo(ctx context.Context, did string, root cid.Cid, rev string) error {
732- if err := s.db.Exec("UPDATE repos SET root = ?, rev = ? WHERE did = ?", nil, root.Bytes(), rev, did).Error; err != nil {
733 return err
734 }
735
···39 "github.com/haileyok/cocoon/oauth/provider"
40 "github.com/haileyok/cocoon/plc"
41 "github.com/ipfs/go-cid"
42+ "github.com/labstack/echo-contrib/echoprometheus"
43 echo_session "github.com/labstack/echo-contrib/session"
44 "github.com/labstack/echo/v4"
45 "github.com/labstack/echo/v4/middleware"
···90}
9192type Args struct {
93+ Logger *slog.Logger
94+95 Addr string
96 DbName string
97 DbType string
98 DatabaseURL string
099 Version string
100 Did string
101 Hostname string
···211}
212213func New(args *Args) (*Server, error) {
214+ if args.Logger == nil {
215+ args.Logger = slog.Default()
216+ }
217+218+ logger := args.Logger.With("name", "New")
219+220 if args.Addr == "" {
221 return nil, fmt.Errorf("addr must be set")
222 }
···245 return nil, fmt.Errorf("admin password must be set")
246 }
2470000248 if args.SessionSecret == "" {
249 panic("SESSION SECRET WAS NOT SET. THIS IS REQUIRED. ")
250 }
···252 e := echo.New()
253254 e.Pre(middleware.RemoveTrailingSlash())
255+ e.Pre(slogecho.New(args.Logger.With("component", "slogecho")))
256 e.Use(echo_session.Middleware(sessions.NewCookieStore([]byte(args.SessionSecret))))
257+ e.Use(echoprometheus.NewMiddleware("cocoon"))
258 e.Use(middleware.CORSWithConfig(middleware.CORSConfig{
259 AllowOrigins: []string{"*"},
260 AllowHeaders: []string{"*"},
···316 if err != nil {
317 return nil, fmt.Errorf("failed to connect to postgres: %w", err)
318 }
319+ logger.Info("connected to PostgreSQL database")
320 default:
321 gdb, err = gorm.Open(sqlite.Open(args.DbName), &gorm.Config{})
322 if err != nil {
323 return nil, fmt.Errorf("failed to open sqlite database: %w", err)
324 }
325+ gdb.Exec("PRAGMA journal_mode=WAL")
326+ gdb.Exec("PRAGMA synchronous=NORMAL")
327+328+ logger.Info("connected to SQLite database", "path", args.DbName)
329 }
330 dbw := db.NewDB(gdb)
331···368 var nonceSecret []byte
369 maybeSecret, err := os.ReadFile("nonce.secret")
370 if err != nil && !os.IsNotExist(err) {
371+ logger.Error("error attempting to read nonce secret", "error", err)
372 } else {
373 nonceSecret = maybeSecret
374 }
···406 Hostname: args.Hostname,
407 ClientManagerArgs: client.ManagerArgs{
408 Cli: oauthCli,
409+ Logger: args.Logger.With("component", "oauth-client-manager"),
410 },
411 DpopManagerArgs: dpop.ManagerArgs{
412 NonceSecret: nonceSecret,
413 NonceRotationInterval: constants.NonceMaxRotationInterval / 3,
414 OnNonceSecretCreated: func(newNonce []byte) {
415 if err := os.WriteFile("nonce.secret", newNonce, 0644); err != nil {
416+ logger.Error("error writing new nonce secret", "error", err)
417 }
418 },
419+ Logger: args.Logger.With("component", "dpop-manager"),
420 Hostname: args.Hostname,
421 },
422 }),
···532 s.echo.GET("/xrpc/app.bsky.actor.getPreferences", s.handleActorGetPreferences, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
533 s.echo.POST("/xrpc/app.bsky.actor.putPreferences", s.handleActorPutPreferences, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
534 s.echo.GET("/xrpc/app.bsky.feed.getFeed", s.handleProxyBskyFeedGetFeed, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
535+ s.echo.GET("/xrpc/app.bsky.ageassurance.getState", s.handleAgeAssurance, s.handleLegacySessionMiddleware, s.handleOauthSessionMiddleware)
536 // admin routes
537 s.echo.POST("/xrpc/com.atproto.server.createInviteCode", s.handleCreateInviteCode, s.handleAdminMiddleware)
538 s.echo.POST("/xrpc/com.atproto.server.createInviteCodes", s.handleCreateInviteCodes, s.handleAdminMiddleware)
···543}
544545func (s *Server) Serve(ctx context.Context) error {
546+ logger := s.logger.With("name", "Serve")
547+548 s.addRoutes()
549550+ logger.Info("migrating...")
551552 s.db.AutoMigrate(
553 &models.Actor{},
···564 &provider.OauthAuthorizationRequest{},
565 )
566567+ logger.Info("starting cocoon")
568569 go func() {
570 if err := s.httpd.ListenAndServe(); err != nil {
···576577 go func() {
578 if err := s.requestCrawl(ctx); err != nil {
579+ logger.Error("error requesting crawls", "err", err)
580 }
581 }()
582···594595 logger.Info("requesting crawl with configured relays")
596597+ if time.Since(s.lastRequestCrawl) <= 1*time.Minute {
598 return fmt.Errorf("a crawl request has already been made within the last minute")
599 }
600···617}
618619func (s *Server) doBackup() {
620+ logger := s.logger.With("name", "doBackup")
621+622 if s.dbType == "postgres" {
623+ logger.Info("skipping S3 backup - PostgreSQL backups should be handled externally (pg_dump, managed database backups, etc.)")
624 return
625 }
626627 start := time.Now()
628629+ logger.Info("beginning backup to s3...")
630631+ tmpFile := fmt.Sprintf("/tmp/cocoon-backup-%s.db", time.Now().Format(time.RFC3339Nano))
632+ defer os.Remove(tmpFile)
000633634+ if err := s.db.Client().Exec(fmt.Sprintf("VACUUM INTO '%s'", tmpFile)).Error; err != nil {
635+ logger.Error("error creating tmp backup file", "err", err)
636+ return
637+ }
00000638639+ backupData, err := os.ReadFile(tmpFile)
640+ if err != nil {
641+ logger.Error("error reading tmp backup file", "err", err)
642 return
643 }
644645+ logger.Info("sending to s3...")
0646647+ currTime := time.Now().Format("2006-01-02_15-04-05")
648+ key := "cocoon-backup-" + currTime + ".db"
649650+ config := &aws.Config{
651+ Region: aws.String(s.s3Config.Region),
652+ Credentials: credentials.NewStaticCredentials(s.s3Config.AccessKey, s.s3Config.SecretKey, ""),
653+ }
654655+ if s.s3Config.Endpoint != "" {
656+ config.Endpoint = aws.String(s.s3Config.Endpoint)
657+ config.S3ForcePathStyle = aws.Bool(true)
658+ }
0000000659660+ sess, err := session.NewSession(config)
661+ if err != nil {
662+ logger.Error("error creating s3 session", "err", err)
663+ return
664+ }
00665666+ svc := s3.New(sess)
667668+ if _, err := svc.PutObject(&s3.PutObjectInput{
669+ Bucket: aws.String(s.s3Config.Bucket),
670+ Key: aws.String(key),
671+ Body: bytes.NewReader(backupData),
672+ }); err != nil {
673+ logger.Error("error uploading file to s3", "err", err)
674 return
675 }
676677+ logger.Info("finished uploading backup to s3", "key", key, "duration", time.Since(start).Seconds())
678+679+ os.WriteFile("last-backup.txt", []byte(time.Now().Format(time.RFC3339Nano)), 0644)
680}
681682func (s *Server) backupRoutine() {
683+ logger := s.logger.With("name", "backupRoutine")
684+685 if s.s3Config == nil || !s.s3Config.BackupsEnabled {
686 return
687 }
688689 if s.s3Config.Region == "" {
690+ logger.Warn("no s3 region configured but backups are enabled. backups will not run.")
691 return
692 }
693694 if s.s3Config.Bucket == "" {
695+ logger.Warn("no s3 bucket configured but backups are enabled. backups will not run.")
696 return
697 }
698699 if s.s3Config.AccessKey == "" {
700+ logger.Warn("no s3 access key configured but backups are enabled. backups will not run.")
701 return
702 }
703704 if s.s3Config.SecretKey == "" {
705+ logger.Warn("no s3 secret key configured but backups are enabled. backups will not run.")
706 return
707 }
708···711 if err != nil {
712 shouldBackupNow = true
713 } else {
714+ lastBackup, err := time.Parse(time.RFC3339Nano, string(lastBackupStr))
715 if err != nil {
716 shouldBackupNow = true
717+ } else if time.Since(lastBackup).Seconds() > 3600 {
718 shouldBackupNow = true
719 }
720 }
···730}
731732func (s *Server) UpdateRepo(ctx context.Context, did string, root cid.Cid, rev string) error {
733+ if err := s.db.Exec(ctx, "UPDATE repos SET root = ?, rev = ? WHERE did = ?", nil, root.Bytes(), rev, did).Error; err != nil {
734 return err
735 }
736