···5 push:
6 branches:
7 - main
9 env:
10 REGISTRY: ghcr.io
···1213jobs:
14 build-and-push-image:
15- runs-on: ubuntu-latest
16 # Sets the permissions granted to the `GITHUB_TOKEN` for the actions in this job.
17 permissions:
18 contents: read
19 packages: write
20 attestations: write
21 id-token: write
22- #
23 steps:
24 - name: Checkout repository
25 uses: actions/checkout@v4
26 # Uses the `docker/login-action` action to log in to the Container registry using the account and password that will publish the packages. Once published, the packages are scoped to the account defined here.
27 - name: Log in to the Container registry
28- uses: docker/login-action@65b78e6e13532edd9afa3aa52ac7964289d1a9c1
29 with:
30 registry: ${{ env.REGISTRY }}
31 username: ${{ github.actor }}
32 password: ${{ secrets.GITHUB_TOKEN }}
33 # This step uses [docker/metadata-action](https://github.com/docker/metadata-action#about) to extract tags and labels that will be applied to the specified image. The `id` "meta" allows the output of this step to be referenced in a subsequent step. The `images` value provides the base name for the tags and labels.
34 - name: Extract metadata (tags, labels) for Docker
35 id: meta
···37 with:
38 images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
39 tags: |
40- type=sha
41- type=sha,format=long
42 # This step uses the `docker/build-push-action` action to build the image, based on your repository's `Dockerfile`. If the build succeeds, it pushes the image to GitHub Packages.
43 # It uses the `context` parameter to define the build's context as the set of files located in the specified path. For more information, see "[Usage](https://github.com/docker/build-push-action#usage)" in the README of the `docker/build-push-action` repository.
44 # It uses the `tags` and `labels` parameters to tag and label the image with the output from the "meta" step.
45 - name: Build and push Docker image
46 id: push
47- uses: docker/build-push-action@v5
48 with:
49 context: .
50 push: true
51 tags: ${{ steps.meta.outputs.tags }}
52 labels: ${{ steps.meta.outputs.labels }}
54 # This step generates an artifact attestation for the image, which is an unforgeable statement about where and how it was built. It increases supply chain security for people who consume the image. For more information, see "[AUTOTITLE](/actions/security-guides/using-artifact-attestations-to-establish-provenance-for-builds)."
55 - name: Generate artifact attestation
56 uses: actions/attest-build-provenance@v1
57 with:
58 subject-name: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
59- subject-digest: ${{ steps.push.outputs.digest }}
60 push-to-registry: true
···5 push:
6 branches:
7 - main
8+ tags:
9+ - 'v*'
1011env:
12 REGISTRY: ghcr.io
···1415jobs:
16 build-and-push-image:
17+ strategy:
18+ matrix:
19+ include:
20+ - arch: amd64
21+ runner: ubuntu-latest
22+ - arch: arm64
23+ runner: ubuntu-24.04-arm
24+ runs-on: ${{ matrix.runner }}
25 # Sets the permissions granted to the `GITHUB_TOKEN` for the actions in this job.
26 permissions:
27 contents: read
28 packages: write
29 attestations: write
30 id-token: write
31+ outputs:
32+ digest-amd64: ${{ matrix.arch == 'amd64' && steps.push.outputs.digest || '' }}
33+ digest-arm64: ${{ matrix.arch == 'arm64' && steps.push.outputs.digest || '' }}
34 steps:
35 - name: Checkout repository
36 uses: actions/checkout@v4
37+38 # Uses the `docker/login-action` action to log in to the Container registry using the account and password that will publish the packages. Once published, the packages are scoped to the account defined here.
39 - name: Log in to the Container registry
40+ uses: docker/login-action@v3
41 with:
42 registry: ${{ env.REGISTRY }}
43 username: ${{ github.actor }}
44 password: ${{ secrets.GITHUB_TOKEN }}
45+46 # This step uses [docker/metadata-action](https://github.com/docker/metadata-action#about) to extract tags and labels that will be applied to the specified image. The `id` "meta" allows the output of this step to be referenced in a subsequent step. The `images` value provides the base name for the tags and labels.
47 - name: Extract metadata (tags, labels) for Docker
48 id: meta
···50 with:
51 images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
52 tags: |
53+ type=raw,value=latest,enable={{is_default_branch}},suffix=-${{ matrix.arch }}
54+ type=sha,suffix=-${{ matrix.arch }}
55+ type=sha,format=long,suffix=-${{ matrix.arch }}
56+ type=semver,pattern={{version}},suffix=-${{ matrix.arch }}
57+ type=semver,pattern={{major}}.{{minor}},suffix=-${{ matrix.arch }}
58+59 # This step uses the `docker/build-push-action` action to build the image, based on your repository's `Dockerfile`. If the build succeeds, it pushes the image to GitHub Packages.
60 # It uses the `context` parameter to define the build's context as the set of files located in the specified path. For more information, see "[Usage](https://github.com/docker/build-push-action#usage)" in the README of the `docker/build-push-action` repository.
61 # It uses the `tags` and `labels` parameters to tag and label the image with the output from the "meta" step.
62 - name: Build and push Docker image
63 id: push
64+ uses: docker/build-push-action@v6
65 with:
66 context: .
67 push: true
68 tags: ${{ steps.meta.outputs.tags }}
69 labels: ${{ steps.meta.outputs.labels }}
7071+ publish-manifest:
72+ needs: build-and-push-image
73+ runs-on: ubuntu-latest
74+ permissions:
75+ packages: write
76+ attestations: write
77+ id-token: write
78+ steps:
79+ - name: Log in to the Container registry
80+ uses: docker/login-action@v3
81+ with:
82+ registry: ${{ env.REGISTRY }}
83+ username: ${{ github.actor }}
84+ password: ${{ secrets.GITHUB_TOKEN }}
85+86+ - name: Extract metadata (tags, labels) for Docker
87+ id: meta
88+ uses: docker/metadata-action@v5
89+ with:
90+ images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
91+ tags: |
92+ type=raw,value=latest,enable={{is_default_branch}}
93+ type=sha
94+ type=sha,format=long
95+ type=semver,pattern={{version}}
96+ type=semver,pattern={{major}}.{{minor}}
97+98+ - name: Create and push manifest
99+ run: |
100+ # Split tags into an array
101+ readarray -t tags <<< "${{ steps.meta.outputs.tags }}"
102+103+ # Create and push manifest for each tag
104+ for tag in "${tags[@]}"; do
105+ docker buildx imagetools create -t "$tag" \
106+ "${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}@${{ needs.build-and-push-image.outputs.digest-amd64 }}" \
107+ "${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}@${{ needs.build-and-push-image.outputs.digest-arm64 }}"
108+ done
109+110 # This step generates an artifact attestation for the image, which is an unforgeable statement about where and how it was built. It increases supply chain security for people who consume the image. For more information, see "[AUTOTITLE](/actions/security-guides/using-artifact-attestations-to-establish-provenance-for-builds)."
111 - name: Generate artifact attestation
112 uses: actions/attest-build-provenance@v1
113 with:
114 subject-name: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
115+ subject-digest: ${{ needs.build-and-push-image.outputs.digest-amd64 }}
116 push-to-registry: true
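
To sanity-check the result after a run, you can inspect the combined manifest with the same `docker buildx imagetools` tooling the workflow uses. This is a minimal sketch; `ghcr.io/OWNER/IMAGE` is a placeholder for your `REGISTRY`/`IMAGE_NAME` values.

```bash
# Minimal sketch: confirm the pushed manifest list covers both architectures.
# "ghcr.io/OWNER/IMAGE" is a placeholder for your REGISTRY/IMAGE_NAME.
docker buildx imagetools inspect ghcr.io/OWNER/IMAGE:latest
# The output should list one manifest entry per platform,
# e.g. linux/amd64 and linux/arm64.
```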
···55 docker-compose up -d
56 ```
58 5. **Get your invite code**
5960 On first run, an invite code is automatically created. View it with:
···9697### Optional Configuration
99 #### SMTP Email Settings
100```bash
101COCOON_SMTP_USER="your-smtp-username"
···107```
108109#### S3 Storage
110 ```bash
111 COCOON_S3_BACKUPS_ENABLED=true
112 COCOON_S3_BLOBSTORE_ENABLED=true
113 COCOON_S3_REGION="us-east-1"
114COCOON_S3_BUCKET="your-bucket"
115COCOON_S3_ENDPOINT="https://s3.amazonaws.com"
116COCOON_S3_ACCESS_KEY="your-access-key"
117COCOON_S3_SECRET_KEY="your-secret-key"
118 ```
120 ### Management Commands
121122Create an invite code:
···160- [x] `com.atproto.repo.getRecord`
161- [x] `com.atproto.repo.importRepo` (Works "okay". Use with extreme caution.)
162- [x] `com.atproto.repo.listRecords`
163-- [ ] `com.atproto.repo.listMissingBlobs`
164165### Server
166···171- [x] `com.atproto.server.createInviteCode`
172- [x] `com.atproto.server.createInviteCodes`
173- [x] `com.atproto.server.deactivateAccount`
174-- [ ] `com.atproto.server.deleteAccount`
175- [x] `com.atproto.server.deleteSession`
176- [x] `com.atproto.server.describeServer`
177- [ ] `com.atproto.server.getAccountInviteCodes`
178-- [ ] `com.atproto.server.getServiceAuth`
179- ~~[ ] `com.atproto.server.listAppPasswords`~~ - not going to add app passwords
180- [x] `com.atproto.server.refreshSession`
181-- [ ] `com.atproto.server.requestAccountDelete`
182- [x] `com.atproto.server.requestEmailConfirmation`
183- [x] `com.atproto.server.requestEmailUpdate`
184- [x] `com.atproto.server.requestPasswordReset`
185-- [ ] `com.atproto.server.reserveSigningKey`
186- [x] `com.atproto.server.resetPassword`
187- ~~[ ] `com.atproto.server.revokeAppPassword`~~ - not going to add app passwords
188- [x] `com.atproto.server.updateEmail`
···203204### Other
205206-- [ ] `com.atproto.label.queryLabels`
207- [x] `com.atproto.moderation.createReport` (Note: this should be handled by proxying, not actually implemented in the PDS)
208- [x] `app.bsky.actor.getPreferences`
209- [x] `app.bsky.actor.putPreferences`
···55 docker-compose up -d
56 ```
5758+ **For PostgreSQL deployment:**
59+ ```bash
60+ # Add POSTGRES_PASSWORD to your .env file first!
61+ docker-compose -f docker-compose.postgres.yaml up -d
62+ ```
63+645. **Get your invite code**
6566 On first run, an invite code is automatically created. View it with:
···102103### Optional Configuration
104105+#### Database Configuration
106+107+By default, Cocoon uses SQLite, which requires no additional setup. For production deployments with higher traffic, you can use PostgreSQL:
108+109+```bash
110+# Database type: sqlite (default) or postgres
111+COCOON_DB_TYPE="postgres"
112+113+# PostgreSQL connection string (required if db-type is postgres)
114+# Format: postgres://user:password@host:port/database?sslmode=disable
115+COCOON_DATABASE_URL="postgres://cocoon:password@localhost:5432/cocoon?sslmode=disable"
116+117+# Or use the standard DATABASE_URL environment variable
118+DATABASE_URL="postgres://cocoon:password@localhost:5432/cocoon?sslmode=disable"
119+```
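
Before starting Cocoon, you can verify the connection string is reachable. A quick sketch using the same example values (adjust host, user, and password for your setup):

```bash
# Assumed example values from the docs above; adjust for your deployment.
psql "postgres://cocoon:password@localhost:5432/cocoon?sslmode=disable" -c 'SELECT 1;'
```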
120+121+For SQLite (default):
122+```bash
123+COCOON_DB_TYPE="sqlite"
124+COCOON_DB_NAME="/data/cocoon/cocoon.db"
125+```
126+127+> **Note**: When using PostgreSQL, database backups to S3 are not handled by Cocoon. Use `pg_dump` or your database provider's backup solution instead.
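
For example, a minimal manual backup might look like the following; the connection string mirrors the sample values above, so substitute your own credentials and destination:

```bash
# Hypothetical backup command; connection values mirror the example above.
pg_dump "postgres://cocoon:password@localhost:5432/cocoon?sslmode=disable" \
  | gzip > "cocoon-$(date +%F).sql.gz"
```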
128+129#### SMTP Email Settings
130```bash
131COCOON_SMTP_USER="your-smtp-username"
···137```
138139#### S3 Storage
140+141+Cocoon supports S3-compatible storage for both database backups (SQLite only) and blob storage (images, videos, etc.):
142+143```bash
144+# Enable S3 backups (SQLite databases only - hourly backups)
145COCOON_S3_BACKUPS_ENABLED=true
146+147+# Enable S3 for blob storage (images, videos, etc.)
148+# When enabled, blobs are stored in S3 instead of the database
149COCOON_S3_BLOBSTORE_ENABLED=true
150+151+# S3 configuration (works with AWS S3, MinIO, Cloudflare R2, etc.)
152COCOON_S3_REGION="us-east-1"
153COCOON_S3_BUCKET="your-bucket"
154COCOON_S3_ENDPOINT="https://s3.amazonaws.com"
155COCOON_S3_ACCESS_KEY="your-access-key"
156COCOON_S3_SECRET_KEY="your-secret-key"
157+158+# Optional: CDN/public URL for blob redirects
159+# When set, com.atproto.sync.getBlob redirects to this URL instead of proxying
160+COCOON_S3_CDN_URL="https://cdn.example.com"
161```
162163+**Blob Storage Options:**
164+- `COCOON_S3_BLOBSTORE_ENABLED=false` (default): Blobs stored in the database
165+- `COCOON_S3_BLOBSTORE_ENABLED=true`: Blobs stored in S3 bucket under `blobs/{did}/{cid}`
166+167+**Blob Serving Options:**
168+- Without `COCOON_S3_CDN_URL`: Blobs are proxied through the PDS server
169+- With `COCOON_S3_CDN_URL`: `getBlob` returns a 302 redirect to `{CDN_URL}/blobs/{did}/{cid}`
170+171+> **Tip**: For Cloudflare R2, you can use the public bucket URL as the CDN URL. For AWS S3, you can use CloudFront or the S3 bucket URL directly if public access is enabled.
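
As a quick check of the redirect behavior described above, something like the following should show the `Location` header; the PDS hostname, DID, and blob CID here are placeholders:

```bash
# Placeholders: replace the PDS hostname, DID, and blob CID with real values.
curl -sI "https://pds.example.com/xrpc/com.atproto.sync.getBlob?did=did:plc:abc123&cid=bafkreiexample" \
  | grep -i '^location:'
# Expected (per the behavior described above):
# location: https://cdn.example.com/blobs/did:plc:abc123/bafkreiexample
```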
172+173### Management Commands
174175Create an invite code:
···213- [x] `com.atproto.repo.getRecord`
214- [x] `com.atproto.repo.importRepo` (Works "okay". Use with extreme caution.)
215- [x] `com.atproto.repo.listRecords`
216+- [x] `com.atproto.repo.listMissingBlobs`
217218### Server
219···224- [x] `com.atproto.server.createInviteCode`
225- [x] `com.atproto.server.createInviteCodes`
226- [x] `com.atproto.server.deactivateAccount`
227+- [x] `com.atproto.server.deleteAccount`
228- [x] `com.atproto.server.deleteSession`
229- [x] `com.atproto.server.describeServer`
230- [ ] `com.atproto.server.getAccountInviteCodes`
231+- [x] `com.atproto.server.getServiceAuth`
232- ~~[ ] `com.atproto.server.listAppPasswords`~~ - not going to add app passwords
233- [x] `com.atproto.server.refreshSession`
234+- [x] `com.atproto.server.requestAccountDelete`
235- [x] `com.atproto.server.requestEmailConfirmation`
236- [x] `com.atproto.server.requestEmailUpdate`
237- [x] `com.atproto.server.requestPasswordReset`
238+- [x] `com.atproto.server.reserveSigningKey`
239- [x] `com.atproto.server.resetPassword`
240- ~~[ ] `com.atproto.server.revokeAppPassword`~~ - not going to add app passwords
241- [x] `com.atproto.server.updateEmail`
···256257### Other
258259+- [x] `com.atproto.label.queryLabels`
260- [x] `com.atproto.moderation.createReport` (Note: this should be handled by proxying, not actually implemented in the PDS)
261- [x] `app.bsky.actor.getPreferences`
262- [x] `app.bsky.actor.putPreferences`
···1package server
23import (
4 "github.com/haileyok/cocoon/models"
5)
67-func (s *Server) getActorByHandle(handle string) (*models.Actor, error) {
8 var actor models.Actor
9- if err := s.db.First(&actor, models.Actor{Handle: handle}).Error; err != nil {
10 return nil, err
11 }
12 return &actor, nil
13}
1415-func (s *Server) getRepoByEmail(email string) (*models.Repo, error) {
16 var repo models.Repo
17- if err := s.db.First(&repo, models.Repo{Email: email}).Error; err != nil {
18 return nil, err
19 }
20 return &repo, nil
21}
2223-func (s *Server) getRepoActorByEmail(email string) (*models.RepoActor, error) {
24 var repo models.RepoActor
25- if err := s.db.Raw("SELECT r.*, a.* FROM repos r LEFT JOIN actors a ON r.did = a.did WHERE r.email = ?", nil, email).Scan(&repo).Error; err != nil {
26 return nil, err
27 }
28 return &repo, nil
29}
3031-func (s *Server) getRepoActorByDid(did string) (*models.RepoActor, error) {
32 var repo models.RepoActor
33- if err := s.db.Raw("SELECT r.*, a.* FROM repos r LEFT JOIN actors a ON r.did = a.did WHERE r.did = ?", nil, did).Scan(&repo).Error; err != nil {
34 return nil, err
35 }
36 return &repo, nil
···1package server
23import (
4+ "context"
5+6 "github.com/haileyok/cocoon/models"
7)
89+func (s *Server) getActorByHandle(ctx context.Context, handle string) (*models.Actor, error) {
10 var actor models.Actor
11+ if err := s.db.First(ctx, &actor, models.Actor{Handle: handle}).Error; err != nil {
12 return nil, err
13 }
14 return &actor, nil
15}
1617+func (s *Server) getRepoByEmail(ctx context.Context, email string) (*models.Repo, error) {
18 var repo models.Repo
19+ if err := s.db.First(ctx, &repo, models.Repo{Email: email}).Error; err != nil {
20 return nil, err
21 }
22 return &repo, nil
23}
2425+func (s *Server) getRepoActorByEmail(ctx context.Context, email string) (*models.RepoActor, error) {
26 var repo models.RepoActor
27+ if err := s.db.Raw(ctx, "SELECT r.*, a.* FROM repos r LEFT JOIN actors a ON r.did = a.did WHERE r.email = ?", nil, email).Scan(&repo).Error; err != nil {
28 return nil, err
29 }
30 return &repo, nil
31}
3233+func (s *Server) getRepoActorByDid(ctx context.Context, did string) (*models.RepoActor, error) {
34 var repo models.RepoActor
35+ if err := s.db.Raw(ctx, "SELECT r.*, a.* FROM repos r LEFT JOIN actors a ON r.did = a.did WHERE r.did = ?", nil, did).Scan(&repo).Error; err != nil {
36 return nil, err
37 }
38 return &repo, nil
+4-2
server/handle_account.go
···1213func (s *Server) handleAccount(e echo.Context) error {
14 ctx := e.Request().Context()
15 repo, sess, err := s.getSessionRepoOrErr(e)
16 if err != nil {
17 return e.Redirect(303, "/account/signin")
···20 oldestPossibleSession := time.Now().Add(constants.ConfidentialClientSessionLifetime)
2122 var tokens []provider.OauthToken
23- if err := s.db.Raw("SELECT * FROM oauth_tokens WHERE sub = ? AND created_at < ? ORDER BY created_at ASC", nil, repo.Repo.Did, oldestPossibleSession).Scan(&tokens).Error; err != nil {
24- s.logger.Error("couldnt fetch oauth sessions for account", "did", repo.Repo.Did, "error", err)
25 sess.AddFlash("Unable to fetch sessions. See server logs for more details.", "error")
26 sess.Save(e.Request(), e.Response())
27 return e.Render(200, "account.html", map[string]any{
···1213func (s *Server) handleAccount(e echo.Context) error {
14 ctx := e.Request().Context()
15+ logger := s.logger.With("name", "handleAccount")
16+17 repo, sess, err := s.getSessionRepoOrErr(e)
18 if err != nil {
19 return e.Redirect(303, "/account/signin")
···22 oldestPossibleSession := time.Now().Add(constants.ConfidentialClientSessionLifetime)
2324 var tokens []provider.OauthToken
25+ if err := s.db.Raw(ctx, "SELECT * FROM oauth_tokens WHERE sub = ? AND created_at < ? ORDER BY created_at ASC", nil, repo.Repo.Did, oldestPossibleSession).Scan(&tokens).Error; err != nil {
26+ logger.Error("couldnt fetch oauth sessions for account", "did", repo.Repo.Did, "error", err)
27 sess.AddFlash("Unable to fetch sessions. See server logs for more details.", "error")
28 sess.Save(e.Request(), e.Response())
29 return e.Render(200, "account.html", map[string]any{
+8-5
server/handle_account_revoke.go
···5 "github.com/labstack/echo/v4"
6)
78-type AccountRevokeRequest struct {
9 Token string `form:"token"`
10}
1112func (s *Server) handleAccountRevoke(e echo.Context) error {
13- var req AccountRevokeRequest
14 if err := e.Bind(&req); err != nil {
15- s.logger.Error("could not bind account revoke request", "error", err)
16 return helpers.ServerError(e, nil)
17 }
18···21 return e.Redirect(303, "/account/signin")
22 }
2324- if err := s.db.Exec("DELETE FROM oauth_tokens WHERE sub = ? AND token = ?", nil, repo.Repo.Did, req.Token).Error; err != nil {
25- s.logger.Error("couldnt delete oauth session for account", "did", repo.Repo.Did, "token", req.Token, "error", err)
26 sess.AddFlash("Unable to revoke session. See server logs for more details.", "error")
27 sess.Save(e.Request(), e.Response())
28 return e.Redirect(303, "/account")
···5 "github.com/labstack/echo/v4"
6)
78+type AccountRevokeInput struct {
9 Token string `form:"token"`
10}
1112func (s *Server) handleAccountRevoke(e echo.Context) error {
13+ ctx := e.Request().Context()
14+ logger := s.logger.With("name", "handleAccountRevoke")
15+16+ var req AccountRevokeInput
17 if err := e.Bind(&req); err != nil {
18+ logger.Error("could not bind account revoke request", "error", err)
19 return helpers.ServerError(e, nil)
20 }
21···24 return e.Redirect(303, "/account/signin")
25 }
2627+ if err := s.db.Exec(ctx, "DELETE FROM oauth_tokens WHERE sub = ? AND token = ?", nil, repo.Repo.Did, req.Token).Error; err != nil {
28+ logger.Error("couldnt delete oauth session for account", "did", repo.Repo.Did, "token", req.Token, "error", err)
29 sess.AddFlash("Unable to revoke session. See server logs for more details.", "error")
30 sess.Save(e.Request(), e.Response())
31 return e.Redirect(303, "/account")
+68-16
server/handle_account_signin.go
···23import (
4 "errors"
5 "strings"
7 "github.com/bluesky-social/indigo/atproto/syntax"
8 "github.com/gorilla/sessions"
···14 "gorm.io/gorm"
15)
1617-type OauthSigninRequest struct {
18- Username string `form:"username"`
19- Password string `form:"password"`
20- QueryParams string `form:"query_params"`
21}
2223func (s *Server) getSessionRepoOrErr(e echo.Context) (*models.RepoActor, *sessions.Session, error) {
24 sess, err := session.Get("session", e)
25 if err != nil {
26 return nil, nil, err
···31 return nil, sess, errors.New("did was not set in session")
32 }
3334- repo, err := s.getRepoActorByDid(did)
35 if err != nil {
36 return nil, sess, err
37 }
···42func getFlashesFromSession(e echo.Context, sess *sessions.Session) map[string]any {
43 defer sess.Save(e.Request(), e.Response())
44 return map[string]any{
45- "errors": sess.Flashes("error"),
46- "successes": sess.Flashes("success"),
47 }
48}
49···60}
6162func (s *Server) handleAccountSigninPost(e echo.Context) error {
63- var req OauthSigninRequest
64 if err := e.Bind(&req); err != nil {
65- s.logger.Error("error binding sign in req", "error", err)
66 return helpers.ServerError(e, nil)
67 }
68···76 idtype = "handle"
77 } else {
78 idtype = "email"
79 }
8081 // TODO: we should make this a helper since we do it for the base create_session as well
···83 var err error
84 switch idtype {
85 case "did":
86- err = s.db.Raw("SELECT r.*, a.* FROM repos r LEFT JOIN actors a ON r.did = a.did WHERE r.did = ?", nil, req.Username).Scan(&repo).Error
87 case "handle":
88- err = s.db.Raw("SELECT r.*, a.* FROM actors a LEFT JOIN repos r ON a.did = r.did WHERE a.handle = ?", nil, req.Username).Scan(&repo).Error
89 case "email":
90- err = s.db.Raw("SELECT r.*, a.* FROM repos r LEFT JOIN actors a ON r.did = a.did WHERE r.email = ?", nil, req.Username).Scan(&repo).Error
91 }
92 if err != nil {
93 if err == gorm.ErrRecordNotFound {
···96 sess.AddFlash("Something went wrong!", "error")
97 }
98 sess.Save(e.Request(), e.Response())
99- return e.Redirect(303, "/account/signin")
100 }
101102 if err := bcrypt.CompareHashAndPassword([]byte(repo.Password), []byte(req.Password)); err != nil {
···106 sess.AddFlash("Something went wrong!", "error")
107 }
108 sess.Save(e.Request(), e.Response())
109- return e.Redirect(303, "/account/signin")
110 }
111112 sess.Options = &sessions.Options{
···122 return err
123 }
124125- if req.QueryParams != "" {
126- return e.Redirect(303, "/oauth/authorize?"+req.QueryParams)
127 } else {
128 return e.Redirect(303, "/account")
129 }
···23import (
4 "errors"
5+ "fmt"
6 "strings"
7+ "time"
89 "github.com/bluesky-social/indigo/atproto/syntax"
10 "github.com/gorilla/sessions"
···16 "gorm.io/gorm"
17)
1819+type OauthSigninInput struct {
20+ Username string `form:"username"`
21+ Password string `form:"password"`
22+ AuthFactorToken string `form:"token"`
23+ QueryParams string `form:"query_params"`
24}
2526func (s *Server) getSessionRepoOrErr(e echo.Context) (*models.RepoActor, *sessions.Session, error) {
27+ ctx := e.Request().Context()
28+29 sess, err := session.Get("session", e)
30 if err != nil {
31 return nil, nil, err
···36 return nil, sess, errors.New("did was not set in session")
37 }
3839+ repo, err := s.getRepoActorByDid(ctx, did)
40 if err != nil {
41 return nil, sess, err
42 }
···47func getFlashesFromSession(e echo.Context, sess *sessions.Session) map[string]any {
48 defer sess.Save(e.Request(), e.Response())
49 return map[string]any{
50+ "errors": sess.Flashes("error"),
51+ "successes": sess.Flashes("success"),
52+ "tokenrequired": sess.Flashes("tokenrequired"),
53 }
54}
55···66}
6768func (s *Server) handleAccountSigninPost(e echo.Context) error {
69+ ctx := e.Request().Context()
70+ logger := s.logger.With("name", "handleAccountSigninPost")
71+72+ var req OauthSigninInput
73 if err := e.Bind(&req); err != nil {
74+ logger.Error("error binding sign in req", "error", err)
75 return helpers.ServerError(e, nil)
76 }
77···85 idtype = "handle"
86 } else {
87 idtype = "email"
88+ }
89+90+ queryParams := ""
91+ if req.QueryParams != "" {
92+ queryParams = fmt.Sprintf("?%s", req.QueryParams)
93 }
9495 // TODO: we should make this a helper since we do it for the base create_session as well
···97 var err error
98 switch idtype {
99 case "did":
100+ err = s.db.Raw(ctx, "SELECT r.*, a.* FROM repos r LEFT JOIN actors a ON r.did = a.did WHERE r.did = ?", nil, req.Username).Scan(&repo).Error
101 case "handle":
102+ err = s.db.Raw(ctx, "SELECT r.*, a.* FROM actors a LEFT JOIN repos r ON a.did = r.did WHERE a.handle = ?", nil, req.Username).Scan(&repo).Error
103 case "email":
104+ err = s.db.Raw(ctx, "SELECT r.*, a.* FROM repos r LEFT JOIN actors a ON r.did = a.did WHERE r.email = ?", nil, req.Username).Scan(&repo).Error
105 }
106 if err != nil {
107 if err == gorm.ErrRecordNotFound {
···110 sess.AddFlash("Something went wrong!", "error")
111 }
112 sess.Save(e.Request(), e.Response())
113+ return e.Redirect(303, "/account/signin"+queryParams)
114 }
115116 if err := bcrypt.CompareHashAndPassword([]byte(repo.Password), []byte(req.Password)); err != nil {
···120 sess.AddFlash("Something went wrong!", "error")
121 }
122 sess.Save(e.Request(), e.Response())
123+ return e.Redirect(303, "/account/signin"+queryParams)
124+ }
125+126+ // if repo requires 2FA token and one hasn't been provided, return error prompting for one
127+ if repo.TwoFactorType != models.TwoFactorTypeNone && req.AuthFactorToken == "" {
128+ err = s.createAndSendTwoFactorCode(ctx, repo)
129+ if err != nil {
130+ sess.AddFlash("Something went wrong!", "error")
131+ sess.Save(e.Request(), e.Response())
132+ return e.Redirect(303, "/account/signin"+queryParams)
133+ }
134+135+ sess.AddFlash("requires 2FA token", "tokenrequired")
136+ sess.Save(e.Request(), e.Response())
137+ return e.Redirect(303, "/account/signin"+queryParams)
138+ }
140+ // if 2FA is required, now check that the one provided is valid
141+ if repo.TwoFactorType != models.TwoFactorTypeNone {
142+ if repo.TwoFactorCode == nil || repo.TwoFactorCodeExpiresAt == nil {
143+ err = s.createAndSendTwoFactorCode(ctx, repo)
144+ if err != nil {
145+ sess.AddFlash("Something went wrong!", "error")
146+ sess.Save(e.Request(), e.Response())
147+ return e.Redirect(303, "/account/signin"+queryParams)
148+ }
149+150+ sess.AddFlash("requires 2FA token", "tokenrequired")
151+ sess.Save(e.Request(), e.Response())
152+ return e.Redirect(303, "/account/signin"+queryParams)
153+ }
154+155+ if *repo.TwoFactorCode != req.AuthFactorToken {
156+ return helpers.InvalidTokenError(e)
157+ }
158+159+ if time.Now().UTC().After(*repo.TwoFactorCodeExpiresAt) {
160+ return helpers.ExpiredTokenError(e)
161+ }
162 }
163164 sess.Options = &sessions.Options{
···174 return err
175 }
176177+ if queryParams != "" {
178+ return e.Redirect(303, "/oauth/authorize"+queryParams)
179 } else {
180 return e.Redirect(303, "/account")
181 }
+3-1
server/handle_actor_put_preferences.go
···10// This is kinda lame. Not great to implement app.bsky in the pds, but alas
1112func (s *Server) handleActorPutPreferences(e echo.Context) error {
13 repo := e.Get("repo").(*models.RepoActor)
1415 var prefs map[string]any
···22 return err
23 }
2425- if err := s.db.Exec("UPDATE repos SET preferences = ? WHERE did = ?", nil, b, repo.Repo.Did).Error; err != nil {
26 return err
27 }
28
···10// This is kinda lame. Not great to implement app.bsky in the pds, but alas
1112func (s *Server) handleActorPutPreferences(e echo.Context) error {
13+ ctx := e.Request().Context()
14+15 repo := e.Get("repo").(*models.RepoActor)
1617 var prefs map[string]any
···24 return err
25 }
2627+ if err := s.db.Exec(ctx, "UPDATE repos SET preferences = ? WHERE did = ?", nil, b, repo.Repo.Did).Error; err != nil {
28 return err
29 }
30
···13)
1415func (s *Server) handleSyncGetRepo(e echo.Context) error {
16 did := e.QueryParam("did")
17 if did == "" {
18 return helpers.InputError(e, nil)
19 }
2021- urepo, err := s.getRepoActorByDid(did)
22 if err != nil {
23 return err
24 }
···36 buf := new(bytes.Buffer)
3738 if _, err := carstore.LdWrite(buf, hb); err != nil {
39- s.logger.Error("error writing to car", "error", err)
40 return helpers.ServerError(e, nil)
41 }
4243 var blocks []models.Block
44- if err := s.db.Raw("SELECT * FROM blocks WHERE did = ? ORDER BY rev ASC", nil, urepo.Repo.Did).Scan(&blocks).Error; err != nil {
45 return err
46 }
47
···13)
1415func (s *Server) handleSyncGetRepo(e echo.Context) error {
16+ ctx := e.Request().Context()
17+ logger := s.logger.With("name", "handleSyncGetRepo")
18+19 did := e.QueryParam("did")
20 if did == "" {
21 return helpers.InputError(e, nil)
22 }
2324+ urepo, err := s.getRepoActorByDid(ctx, did)
25 if err != nil {
26 return err
27 }
···39 buf := new(bytes.Buffer)
4041 if _, err := carstore.LdWrite(buf, hb); err != nil {
42+ logger.Error("error writing to car", "error", err)
43 return helpers.ServerError(e, nil)
44 }
4546 var blocks []models.Block
47+ if err := s.db.Raw(ctx, "SELECT * FROM blocks WHERE did = ? ORDER BY rev ASC", nil, urepo.Repo.Did).Scan(&blocks).Error; err != nil {
48 return err
49 }
50
+3-1
server/handle_sync_get_repo_status.go
···1415// TODO: make this actually do the right thing
16func (s *Server) handleSyncGetRepoStatus(e echo.Context) error {
17 did := e.QueryParam("did")
18 if did == "" {
19 return helpers.InputError(e, nil)
20 }
2122- urepo, err := s.getRepoActorByDid(did)
23 if err != nil {
24 return err
25 }
···1415// TODO: make this actually do the right thing
16func (s *Server) handleSyncGetRepoStatus(e echo.Context) error {
17+ ctx := e.Request().Context()
18+19 did := e.QueryParam("did")
20 if did == "" {
21 return helpers.InputError(e, nil)
22 }
2324+ urepo, err := s.getRepoActorByDid(ctx, did)
25 if err != nil {
26 return err
27 }
+8-5
server/handle_sync_list_blobs.go
···14}
1516func (s *Server) handleSyncListBlobs(e echo.Context) error {
17 did := e.QueryParam("did")
18 if did == "" {
19 return helpers.InputError(e, nil)
···35 }
36 params = append(params, limit)
3738- urepo, err := s.getRepoActorByDid(did)
39 if err != nil {
40- s.logger.Error("could not find user for requested blobs", "error", err)
41 return helpers.InputError(e, nil)
42 }
43···49 }
5051 var blobs []models.Blob
52- if err := s.db.Raw("SELECT * FROM blobs WHERE did = ? "+cursorquery+" ORDER BY created_at DESC LIMIT ?", nil, params...).Scan(&blobs).Error; err != nil {
53- s.logger.Error("error getting records", "error", err)
54 return helpers.ServerError(e, nil)
55 }
56···58 for _, b := range blobs {
59 c, err := cid.Cast(b.Cid)
60 if err != nil {
61- s.logger.Error("error casting cid", "error", err)
62 return helpers.ServerError(e, nil)
63 }
64 cstrs = append(cstrs, c.String())
···14}
1516func (s *Server) handleSyncListBlobs(e echo.Context) error {
17+ ctx := e.Request().Context()
18+ logger := s.logger.With("name", "handleSyncListBlobs")
19+20 did := e.QueryParam("did")
21 if did == "" {
22 return helpers.InputError(e, nil)
···38 }
39 params = append(params, limit)
4041+ urepo, err := s.getRepoActorByDid(ctx, did)
42 if err != nil {
43+ logger.Error("could not find user for requested blobs", "error", err)
44 return helpers.InputError(e, nil)
45 }
46···52 }
5354 var blobs []models.Blob
55+ if err := s.db.Raw(ctx, "SELECT * FROM blobs WHERE did = ? "+cursorquery+" ORDER BY created_at DESC LIMIT ?", nil, params...).Scan(&blobs).Error; err != nil {
56+ logger.Error("error getting records", "error", err)
57 return helpers.ServerError(e, nil)
58 }
59···61 for _, b := range blobs {
62 c, err := cid.Cast(b.Cid)
63 if err != nil {
64+ logger.Error("error casting cid", "error", err)
65 return helpers.ServerError(e, nil)
66 }
67 cstrs = append(cstrs, c.String())
+82-50
server/handle_sync_subscribe_repos.go
···7 "github.com/bluesky-social/indigo/events"
8 "github.com/bluesky-social/indigo/lex/util"
9 "github.com/btcsuite/websocket"
10 "github.com/labstack/echo/v4"
11)
1213func (s *Server) handleSyncSubscribeRepos(e echo.Context) error {
14- ctx := e.Request().Context()
15 logger := s.logger.With("component", "subscribe-repos-websocket")
1617 conn, err := websocket.Upgrade(e.Response().Writer, e.Request(), e.Response().Header(), 1<<10, 1<<10)
···24 logger = logger.With("ident", ident)
25 logger.Info("new connection established")
2627- evts, cancel, err := s.evtman.Subscribe(ctx, ident, func(evt *events.XRPCStreamEvent) bool {
28 return true
29 }, nil)
30 if err != nil {
31 return err
32 }
33- defer cancel()
35 header := events.EventHeader{Op: events.EvtKindMessage}
36 for evt := range evts {
37- wc, err := conn.NextWriter(websocket.BinaryMessage)
38- if err != nil {
39- logger.Error("error writing message to relay", "err", err)
40- break
41- }
4243- if ctx.Err() != nil {
44- logger.Error("context error", "err", err)
45- break
46- }
48- var obj util.CBOR
49- switch {
50- case evt.Error != nil:
51- header.Op = events.EvtKindErrorFrame
52- obj = evt.Error
53- case evt.RepoCommit != nil:
54- header.MsgType = "#commit"
55- obj = evt.RepoCommit
56- case evt.RepoIdentity != nil:
57- header.MsgType = "#identity"
58- obj = evt.RepoIdentity
59- case evt.RepoAccount != nil:
60- header.MsgType = "#account"
61- obj = evt.RepoAccount
62- case evt.RepoInfo != nil:
63- header.MsgType = "#info"
64- obj = evt.RepoInfo
65- default:
66- logger.Warn("unrecognized event kind")
67- return nil
68- }
6970- if err := header.MarshalCBOR(wc); err != nil {
71- logger.Error("failed to write header to relay", "err", err)
72- break
73- }
75- if err := obj.MarshalCBOR(wc); err != nil {
76- logger.Error("failed to write event to relay", "err", err)
77- break
78- }
80- if err := wc.Close(); err != nil {
81- logger.Error("failed to flush-close our event write", "err", err)
82- break
83- }
84 }
8586 // we should tell the relay to request a new crawl at this point if we got disconnected
87 // use a new context since the old one might be cancelled at this point
88- ctx, cancel = context.WithTimeout(context.Background(), 10*time.Second)
89- defer cancel()
90- if err := s.requestCrawl(ctx); err != nil {
91- logger.Error("error requesting crawls", "err", err)
92- }
94 return nil
95}
···7 "github.com/bluesky-social/indigo/events"
8 "github.com/bluesky-social/indigo/lex/util"
9 "github.com/btcsuite/websocket"
10+ "github.com/haileyok/cocoon/metrics"
11 "github.com/labstack/echo/v4"
12)
1314func (s *Server) handleSyncSubscribeRepos(e echo.Context) error {
15+ ctx, cancel := context.WithCancel(e.Request().Context())
16+ defer cancel()
17+18 logger := s.logger.With("component", "subscribe-repos-websocket")
1920 conn, err := websocket.Upgrade(e.Response().Writer, e.Request(), e.Response().Header(), 1<<10, 1<<10)
···27 logger = logger.With("ident", ident)
28 logger.Info("new connection established")
2930+ metrics.RelaysConnected.WithLabelValues(ident).Inc()
31+ defer func() {
32+ metrics.RelaysConnected.WithLabelValues(ident).Dec()
33+ }()
34+35+ evts, evtManCancel, err := s.evtman.Subscribe(ctx, ident, func(evt *events.XRPCStreamEvent) bool {
36 return true
37 }, nil)
38 if err != nil {
39 return err
40 }
41+ defer evtManCancel()
43+ // drop the connection whenever a subscriber disconnects from the socket; we should see read errors when that happens
44+ go func() {
45+ for {
46+ select {
47+ case <-ctx.Done():
48+ return
49+ default:
50+ if _, _, err := conn.ReadMessage(); err != nil {
51+ logger.Warn("websocket error", "err", err)
52+ cancel()
53+ return
54+ }
55+ }
56+ }
57+ }()
5859 header := events.EventHeader{Op: events.EvtKindMessage}
60 for evt := range evts {
61+ func() {
62+ defer func() {
63+ metrics.RelaySends.WithLabelValues(ident, header.MsgType).Inc()
64+ }()
66+ wc, err := conn.NextWriter(websocket.BinaryMessage)
67+ if err != nil {
68+ logger.Error("error writing message to relay", "err", err)
69+ return
70+ }
7172+ if ctx.Err() != nil {
73+ logger.Error("context error", "err", err)
74+ return
75+ }
77+ var obj util.CBOR
78+ switch {
79+ case evt.Error != nil:
80+ header.Op = events.EvtKindErrorFrame
81+ obj = evt.Error
82+ case evt.RepoCommit != nil:
83+ header.MsgType = "#commit"
84+ obj = evt.RepoCommit
85+ case evt.RepoIdentity != nil:
86+ header.MsgType = "#identity"
87+ obj = evt.RepoIdentity
88+ case evt.RepoAccount != nil:
89+ header.MsgType = "#account"
90+ obj = evt.RepoAccount
91+ case evt.RepoInfo != nil:
92+ header.MsgType = "#info"
93+ obj = evt.RepoInfo
94+ default:
95+ logger.Warn("unrecognized event kind")
96+ return
97+ }
9899+ if err := header.MarshalCBOR(wc); err != nil {
100+ logger.Error("failed to write header to relay", "err", err)
101+ return
102+ }
103+104+ if err := obj.MarshalCBOR(wc); err != nil {
105+ logger.Error("failed to write event to relay", "err", err)
106+ return
107+ }
108109+ if err := wc.Close(); err != nil {
110+ logger.Error("failed to flush-close our event write", "err", err)
111+ return
112+ }
113+ }()
114 }
115116 // we should tell the relay to request a new crawl at this point if we got disconnected
117 // use a new context since the old one might be cancelled at this point
118+ go func() {
119+ retryCtx, retryCancel := context.WithTimeout(context.Background(), 10*time.Second)
120+ defer retryCancel()
121+ if err := s.requestCrawl(retryCtx); err != nil {
122+ logger.Error("error requesting crawls", "err", err)
123+ }
124+ }()
125126 return nil
127}
···17 lexutil "github.com/bluesky-social/indigo/lex/util"
18 "github.com/bluesky-social/indigo/repo"
19 "github.com/haileyok/cocoon/internal/db"
20+ "github.com/haileyok/cocoon/metrics"
21 "github.com/haileyok/cocoon/models"
22 "github.com/haileyok/cocoon/recording_blockstore"
23 blocks "github.com/ipfs/go-block-format"
···97}
9899// TODO make use of swap commit
100+func (rm *RepoMan) applyWrites(ctx context.Context, urepo models.Repo, writes []Op, swapCommit *string) ([]ApplyWriteResult, error) {
101 rootcid, err := cid.Cast(urepo.Root)
102 if err != nil {
103 return nil, err
···105106 dbs := rm.s.getBlockstore(urepo.Did)
107 bs := recording_blockstore.New(dbs)
108+ r, err := repo.OpenRepo(ctx, bs, rootcid)
110 var results []ApplyWriteResult
111112+ entries := make([]models.Record, 0, len(writes))
113 for i, op := range writes {
114+ // updates or deletes must supply an rkey
115 if op.Type != OpTypeCreate && op.Rkey == nil {
116 return nil, fmt.Errorf("invalid rkey")
117 } else if op.Type == OpTypeCreate && op.Rkey != nil {
118+ // we should convert this op to an update if the rkey already exists
119+ _, _, err := r.GetRecord(ctx, fmt.Sprintf("%s/%s", op.Collection, *op.Rkey))
120 if err == nil {
121 op.Type = OpTypeUpdate
122 }
123 } else if op.Rkey == nil {
124+ // creates that don't supply an rkey will have one generated for them
125 op.Rkey = to.StringPtr(rm.clock.Next().String())
126 writes[i].Rkey = op.Rkey
127 }
128129+ // validate the record key is actually valid
130 _, err := syntax.ParseRecordKey(*op.Rkey)
131 if err != nil {
132 return nil, err
···134135 switch op.Type {
136 case OpTypeCreate:
137+ // HACK: this fixes some type conversions, mainly around integers
138+ // first we convert to json bytes
139+ b, err := json.Marshal(*op.Record)
140 if err != nil {
141 return nil, err
142 }
143+ // then we use atdata.UnmarshalJSON to convert it back to a map
144+ out, err := atdata.UnmarshalJSON(b)
145 if err != nil {
146 return nil, err
147 }
148+ // finally we can cast to a MarshalableMap
149 mm := MarshalableMap(out)
150151 // HACK: if a record doesn't contain a $type, we can manually set it here based on the op's collection
152+ // i forget why this is actually necessary?
153 if mm["$type"] == "" {
154 mm["$type"] = op.Collection
155 }
156157+ nc, err := r.PutRecord(ctx, fmt.Sprintf("%s/%s", op.Collection, *op.Rkey), &mm)
158 if err != nil {
159 return nil, err
160 }
161+162 d, err := atdata.MarshalCBOR(mm)
163 if err != nil {
164 return nil, err
165 }
166+167 entries = append(entries, models.Record{
168 Did: urepo.Did,
169 CreatedAt: rm.clock.Next().String(),
···172 Cid: nc.String(),
173 Value: d,
174 })
175+176 results = append(results, ApplyWriteResult{
177 Type: to.StringPtr(OpTypeCreate.String()),
178 Uri: to.StringPtr("at://" + urepo.Did + "/" + op.Collection + "/" + *op.Rkey),
···180 ValidationStatus: to.StringPtr("valid"), // TODO: obviously this might not be true atm lol
181 })
182 case OpTypeDelete:
183+ // try to find the old record in the database
184 var old models.Record
185+ if err := rm.db.Raw(ctx, "SELECT value FROM records WHERE did = ? AND nsid = ? AND rkey = ?", nil, urepo.Did, op.Collection, op.Rkey).Scan(&old).Error; err != nil {
186 return nil, err
187 }
188+189+ // TODO: this is really confusing, and looking at it i have no idea why i did this. below when we are doing deletes, we
190+ // check if `cid` here is nil to indicate if we should delete. that really doesn't make much sense and its super illogical
191+ // when reading this code. i dont feel like fixing right now though so
192 entries = append(entries, models.Record{
193 Did: urepo.Did,
194 Nsid: op.Collection,
195 Rkey: *op.Rkey,
196 Value: old.Value,
197 })
198+199+ // delete the record from the repo
200+ err := r.DeleteRecord(ctx, fmt.Sprintf("%s/%s", op.Collection, *op.Rkey))
201 if err != nil {
202 return nil, err
203 }
204+205+ // add a result for the delete
206 results = append(results, ApplyWriteResult{
207 Type: to.StringPtr(OpTypeDelete.String()),
208 })
209 case OpTypeUpdate:
210+ // HACK: same hack as above for type fixes
211+ b, err := json.Marshal(*op.Record)
212 if err != nil {
213 return nil, err
214 }
215+ out, err := atdata.UnmarshalJSON(b)
216 if err != nil {
217 return nil, err
218 }
219 mm := MarshalableMap(out)
220+221+ nc, err := r.UpdateRecord(ctx, fmt.Sprintf("%s/%s", op.Collection, *op.Rkey), &mm)
222 if err != nil {
223 return nil, err
224 }
225+226 d, err := atdata.MarshalCBOR(mm)
227 if err != nil {
228 return nil, err
229 }
230+231 entries = append(entries, models.Record{
232 Did: urepo.Did,
233 CreatedAt: rm.clock.Next().String(),
···236 Cid: nc.String(),
237 Value: d,
238 })
239+240 results = append(results, ApplyWriteResult{
241 Type: to.StringPtr(OpTypeUpdate.String()),
242 Uri: to.StringPtr("at://" + urepo.Did + "/" + op.Collection + "/" + *op.Rkey),
···246 }
247 }
248249+ // commit and get the new root
250+ newroot, rev, err := r.Commit(ctx, urepo.SignFor)
251 if err != nil {
252 return nil, err
253 }
254255+ for _, result := range results {
256+ if result.Type != nil {
257+ metrics.RepoOperations.WithLabelValues(*result.Type).Inc()
258+ }
259+ }
260+261+ // create a buffer for dumping our new cbor into
262 buf := new(bytes.Buffer)
263264+ // first write the car header to the buffer
265 hb, err := cbor.DumpObject(&car.CarHeader{
266 Roots: []cid.Cid{newroot},
267 Version: 1,
268 })
269 if _, err := carstore.LdWrite(buf, hb); err != nil {
270 return nil, err
271 }
272273+ // get a diff of the changes to the repo
274+ diffops, err := r.DiffSince(ctx, rootcid)
275 if err != nil {
276 return nil, err
277 }
278279+ // create the repo ops for the given diff
280 ops := make([]*atproto.SyncSubscribeRepos_RepoOp, 0, len(diffops))
281 for _, op := range diffops {
282 var c cid.Cid
283 switch op.Op {
···306 })
307 }
308309+ blk, err := dbs.Get(ctx, c)
310 if err != nil {
311 return nil, err
312 }
313314+ // write the block to the buffer
315 if _, err := carstore.LdWrite(buf, blk.Cid().Bytes(), blk.RawData()); err != nil {
316 return nil, err
317 }
318 }
319320+ // write the writelog to the buffer
321 for _, op := range bs.GetWriteLog() {
322 if _, err := carstore.LdWrite(buf, op.Cid().Bytes(), op.RawData()); err != nil {
323 return nil, err
324 }
325 }
326327+ // blob blob blob blob blob :3
328 var blobs []lexutil.LexLink
329 for _, entry := range entries {
330 var cids []cid.Cid
331+ // whenever there is a cid present, we know it's a create (dumb)
332 if entry.Cid != "" {
333+ if err := rm.s.db.Create(ctx, &entry, []clause.Expression{clause.OnConflict{
334 Columns: []clause.Column{{Name: "did"}, {Name: "nsid"}, {Name: "rkey"}},
335 UpdateAll: true,
336 }}).Error; err != nil {
337 return nil, err
338 }
339340+ // increment the given blob refs, yay
341+ cids, err = rm.incrementBlobRefs(ctx, urepo, entry.Value)
342 if err != nil {
343 return nil, err
344 }
345 } else {
346+ // as i noted above this is dumb. but we delete whenever the cid is nil. it works solely because the pkey
347+ // is did + collection + rkey. i still really want to separate that out, or use a different type to make
348+ // this less confusing/easy to read. alas, its 2 am and yea no
349+ if err := rm.s.db.Delete(ctx, &entry, nil).Error; err != nil {
350 return nil, err
351 }
352+353+ // TODO:
354+ cids, err = rm.decrementBlobRefs(ctx, urepo, entry.Value)
355 if err != nil {
356 return nil, err
357 }
358 }
359360+ // add all the relevant blobs to the blobs list of blobs. blob ^.^
361 for _, c := range cids {
362 blobs = append(blobs, lexutil.LexLink(c))
363 }
364 }
365366+ // NOTE: using the request ctx seems a bit suss here, so using a background context. i'm not sure if this
367+ // runs sync or not
368+ rm.s.evtman.AddEvent(context.Background(), &events.XRPCStreamEvent{
369 RepoCommit: &atproto.SyncSubscribeRepos_Commit{
370 Repo: urepo.Did,
371 Blocks: buf.Bytes(),
···379 },
380 })
381382+ if err := rm.s.UpdateRepo(ctx, urepo.Did, newroot, rev); err != nil {
383 return nil, err
384 }
385···394 return results, nil
395}
396397+// this is a fun little guy. to get a proof, we need to read the record out of the blockstore and record how we actually
398+// got to the guy. we'll wrap a new blockstore in a recording blockstore, then return the log for proof
399+func (rm *RepoMan) getRecordProof(ctx context.Context, urepo models.Repo, collection, rkey string) (cid.Cid, []blocks.Block, error) {
400 c, err := cid.Cast(urepo.Root)
401 if err != nil {
402 return cid.Undef, nil, err
···405 dbs := rm.s.getBlockstore(urepo.Did)
406 bs := recording_blockstore.New(dbs)
407408+ r, err := repo.OpenRepo(ctx, bs, c)
409 if err != nil {
410 return cid.Undef, nil, err
411 }
412413+ _, _, err = r.GetRecordBytes(ctx, fmt.Sprintf("%s/%s", collection, rkey))
414 if err != nil {
415 return cid.Undef, nil, err
416 }
···418 return c, bs.GetReadLog(), nil
419}
420421+func (rm *RepoMan) incrementBlobRefs(ctx context.Context, urepo models.Repo, cbor []byte) ([]cid.Cid, error) {
422 cids, err := getBlobCidsFromCbor(cbor)
423 if err != nil {
424 return nil, err
425 }
426427 for _, c := range cids {
428+ if err := rm.db.Exec(ctx, "UPDATE blobs SET ref_count = ref_count + 1 WHERE did = ? AND cid = ?", nil, urepo.Did, c.Bytes()).Error; err != nil {
429 return nil, err
430 }
431 }
···433 return cids, nil
434}
435436+func (rm *RepoMan) decrementBlobRefs(ctx context.Context, urepo models.Repo, cbor []byte) ([]cid.Cid, error) {
437 cids, err := getBlobCidsFromCbor(cbor)
438 if err != nil {
439 return nil, err
···444 ID uint
445 Count int
446 }
447+ if err := rm.db.Raw(ctx, "UPDATE blobs SET ref_count = ref_count - 1 WHERE did = ? AND cid = ? RETURNING id, ref_count", nil, urepo.Did, c.Bytes()).Scan(&res).Error; err != nil {
448 return nil, err
449 }
450451+ // TODO: this does _not_ handle deletions of blobs that are on s3 storage!!!! we need to get the blob, see what
452+ // storage it is in, and clean up s3!!!!
453 if res.Count == 0 {
454+ if err := rm.db.Exec(ctx, "DELETE FROM blobs WHERE id = ?", nil, res.ID).Error; err != nil {
455 return nil, err
456 }
457+ if err := rm.db.Exec(ctx, "DELETE FROM blob_parts WHERE blob_id = ?", nil, res.ID).Error; err != nil {
458 return nil, err
459 }
460 }