---
# Builds per-arch Docker images on native runners, then stitches them into a
# multi-arch manifest and generates a build-provenance attestation.
name: Docker image

on:
  workflow_dispatch:
  push:
    branches:
      - main
    tags:
      - 'v*'

env:
  REGISTRY: ghcr.io
  IMAGE_NAME: ${{ github.repository }}

jobs:
  build-and-push-image:
    strategy:
      matrix:
        include:
          - arch: amd64
            runner: ubuntu-latest
          - arch: arm64
            runner: ubuntu-24.04-arm
    runs-on: ${{ matrix.runner }}
    # Sets the permissions granted to the `GITHUB_TOKEN` for the actions in this job.
    permissions:
      contents: read
      packages: write
      attestations: write
      id-token: write
    # Both matrix legs write these outputs, but GitHub drops empty-string
    # outputs, so each leg only publishes the digest for its own arch.
    outputs:
      digest-amd64: ${{ matrix.arch == 'amd64' && steps.push.outputs.digest || '' }}
      digest-arm64: ${{ matrix.arch == 'arm64' && steps.push.outputs.digest || '' }}
    steps:
      - name: Checkout repository
        uses: actions/checkout@v4

      # Log in to the container registry with the workflow token; once
      # published, the packages are scoped to this account.
      - name: Log in to the Container registry
        uses: docker/login-action@v3
        with:
          registry: ${{ env.REGISTRY }}
          username: ${{ github.actor }}
          password: ${{ secrets.GITHUB_TOKEN }}

      # docker/metadata-action extracts the tags and labels applied to the
      # image; the `meta` id lets later steps reference its outputs. Every
      # tag carries an arch suffix so the per-arch pushes do not clobber
      # each other's tags.
      - name: Extract metadata (tags, labels) for Docker
        id: meta
        uses: docker/metadata-action@v5
        with:
          images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
          tags: |
            type=raw,value=latest,enable={{is_default_branch}},suffix=-${{ matrix.arch }}
            type=sha,suffix=-${{ matrix.arch }}
            type=sha,format=long,suffix=-${{ matrix.arch }}
            type=semver,pattern={{version}},suffix=-${{ matrix.arch }}
            type=semver,pattern={{major}}.{{minor}},suffix=-${{ matrix.arch }}

      # Build from the repository's Dockerfile (context `.`) and push,
      # tagging and labelling with the output of the "meta" step.
      - name: Build and push Docker image
        id: push
        uses: docker/build-push-action@v6
        with:
          context: .
          push: true
          tags: ${{ steps.meta.outputs.tags }}
          labels: ${{ steps.meta.outputs.labels }}

  publish-manifest:
    needs: build-and-push-image
    runs-on: ubuntu-latest
    permissions:
      packages: write
      attestations: write
      id-token: write
    steps:
      - name: Log in to the Container registry
        uses: docker/login-action@v3
        with:
          registry: ${{ env.REGISTRY }}
          username: ${{ github.actor }}
          password: ${{ secrets.GITHUB_TOKEN }}

      # Re-derive the tag list without arch suffixes; these are the names
      # the multi-arch manifest is published under.
      - name: Extract metadata (tags, labels) for Docker
        id: meta
        uses: docker/metadata-action@v5
        with:
          images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
          tags: |
            type=raw,value=latest,enable={{is_default_branch}}
            type=sha
            type=sha,format=long
            type=semver,pattern={{version}}
            type=semver,pattern={{major}}.{{minor}}

      - name: Create and push manifest
        run: |
          # Split the newline-separated tag list into an array
          readarray -t tags <<< "${{ steps.meta.outputs.tags }}"

          # Create and push a multi-arch manifest for each tag, referencing
          # the per-arch image digests produced by the build job
          for tag in "${tags[@]}"; do
            docker buildx imagetools create -t "$tag" \
              "${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}@${{ needs.build-and-push-image.outputs.digest-amd64 }}" \
              "${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}@${{ needs.build-and-push-image.outputs.digest-arm64 }}"
          done

      # Generates an artifact attestation for the image — an unforgeable
      # statement about where and how it was built — to improve supply-chain
      # security for consumers of the image.
      # NOTE(review): only the amd64 digest is attested here; the arm64
      # image (and the manifest list itself) carries no attestation.
      - name: Generate artifact attestation
        uses: actions/attest-build-provenance@v1
        with:
          subject-name: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
          subject-digest: ${{ needs.build-and-push-image.outputs.digest-amd64 }}
          push-to-registry: true
···4GIT_COMMIT := $(shell git rev-parse --short=9 HEAD)
5VERSION := $(if $(GIT_TAG),$(GIT_TAG),dev-$(GIT_COMMIT))
6000000000000000007.PHONY: help
8help: ## Print info about all commands
9 @echo "Commands:"
···14build: ## Build all executables
15 go build -ldflags "-X main.Version=$(VERSION)" -o cocoon ./cmd/cocoon
16000000000000000000017.PHONY: run
18run:
19 go build -ldflags "-X main.Version=dev-local" -o cocoon ./cmd/cocoon && ./cocoon run
···4041.env:
42 if [ ! -f ".env" ]; then cp example.dev.env .env; fi
0000
···4GIT_COMMIT := $(shell git rev-parse --short=9 HEAD)
5VERSION := $(if $(GIT_TAG),$(GIT_TAG),dev-$(GIT_COMMIT))
67+# Build output directory
8+BUILD_DIR := dist
9+10+# Platforms to build for
11+PLATFORMS := \
12+ linux/amd64 \
13+ linux/arm64 \
14+ linux/arm \
15+ darwin/amd64 \
16+ darwin/arm64 \
17+ windows/amd64 \
18+ windows/arm64 \
19+ freebsd/amd64 \
20+ freebsd/arm64 \
21+ openbsd/amd64 \
22+ openbsd/arm64
23+24.PHONY: help
25help: ## Print info about all commands
26 @echo "Commands:"
···31build: ## Build all executables
32 go build -ldflags "-X main.Version=$(VERSION)" -o cocoon ./cmd/cocoon
3334+.PHONY: build-release
35+build-all: ## Build binaries for all architectures
36+ @echo "Building for all architectures..."
37+ @mkdir -p $(BUILD_DIR)
38+ @$(foreach platform,$(PLATFORMS), \
39+ $(eval OS := $(word 1,$(subst /, ,$(platform)))) \
40+ $(eval ARCH := $(word 2,$(subst /, ,$(platform)))) \
41+ $(eval EXT := $(if $(filter windows,$(OS)),.exe,)) \
42+ $(eval OUTPUT := $(BUILD_DIR)/cocoon-$(VERSION)-$(OS)-$(ARCH)$(EXT)) \
43+ echo "Building $(OS)/$(ARCH)..."; \
44+ GOOS=$(OS) GOARCH=$(ARCH) go build -ldflags "-X main.Version=$(VERSION)" -o $(OUTPUT) ./cmd/cocoon && \
45+ echo " โ $(OUTPUT)" || echo " โ Failed: $(OS)/$(ARCH)"; \
46+ )
47+ @echo "Done! Binaries are in $(BUILD_DIR)/"
48+49+.PHONY: clean-dist
50+clean-dist: ## Remove all built binaries
51+ rm -rf $(BUILD_DIR)
52+53.PHONY: run
54run:
55 go build -ldflags "-X main.Version=dev-local" -o cocoon ./cmd/cocoon && ./cocoon run
···7677.env:
78 if [ ! -f ".env" ]; then cp example.dev.env .env; fi
79+80+.PHONY: docker-build
81+docker-build:
82+ docker build -t cocoon .
+198-14
README.md
···1# Cocoon
23> [!WARNING]
4-You should not use this PDS. You should not rely on this code as a reference for a PDS implementation. You should not trust this code. Using this PDS implementation may result in data loss, corruption, etc.
56Cocoon is a PDS implementation in Go. It is highly experimental, and is not ready for any production use.
700000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000008## Implemented Endpoints
910> [!NOTE]
···1213### Identity
1415-- [ ] `com.atproto.identity.getRecommendedDidCredentials`
16-- [ ] `com.atproto.identity.requestPlcOperationSignature`
17- [x] `com.atproto.identity.resolveHandle`
18-- [ ] `com.atproto.identity.signPlcOperation`
19-- [ ] `com.atproto.identity.submitPlcOperation`
20- [x] `com.atproto.identity.updateHandle`
2122### Repo
···27- [x] `com.atproto.repo.deleteRecord`
28- [x] `com.atproto.repo.describeRepo`
29- [x] `com.atproto.repo.getRecord`
30-- [x] `com.atproto.repo.importRepo` (Works "okay". You still have to handle PLC operations on your own when migrating. Use with extreme caution.)
31- [x] `com.atproto.repo.listRecords`
32-- [ ] `com.atproto.repo.listMissingBlobs`
3334### Server
3536-- [ ] `com.atproto.server.activateAccount`
37- [x] `com.atproto.server.checkAccountStatus`
38- [x] `com.atproto.server.confirmEmail`
39- [x] `com.atproto.server.createAccount`
40- [x] `com.atproto.server.createInviteCode`
41- [x] `com.atproto.server.createInviteCodes`
42-- [ ] `com.atproto.server.deactivateAccount`
43-- [ ] `com.atproto.server.deleteAccount`
44- [x] `com.atproto.server.deleteSession`
45- [x] `com.atproto.server.describeServer`
46- [ ] `com.atproto.server.getAccountInviteCodes`
47-- [ ] `com.atproto.server.getServiceAuth`
48- ~~[ ] `com.atproto.server.listAppPasswords`~~ - not going to add app passwords
49- [x] `com.atproto.server.refreshSession`
50-- [ ] `com.atproto.server.requestAccountDelete`
51- [x] `com.atproto.server.requestEmailConfirmation`
52- [x] `com.atproto.server.requestEmailUpdate`
53- [x] `com.atproto.server.requestPasswordReset`
54-- [ ] `com.atproto.server.reserveSigningKey`
55- [x] `com.atproto.server.resetPassword`
56- ~~[] `com.atproto.server.revokeAppPassword`~~ - not going to add app passwords
57- [x] `com.atproto.server.updateEmail`
···7273### Other
7475-- [ ] `com.atproto.label.queryLabels`
76- [x] `com.atproto.moderation.createReport` (Note: this should be handled by proxying, not actually implemented in the PDS)
77- [x] `app.bsky.actor.getPreferences`
78- [x] `app.bsky.actor.putPreferences`
···1# Cocoon
23> [!WARNING]
4+I migrated and have been running my main account on this PDS for months now without issue, however, I am still not responsible if things go awry, particularly during account migration. Please use caution.
56Cocoon is a PDS implementation in Go. It is highly experimental, and is not ready for any production use.
78+## Quick Start with Docker Compose
9+10+### Prerequisites
11+12+- Docker and Docker Compose installed
13+- A domain name pointing to your server (for automatic HTTPS)
- Ports 80 and 443 open in your firewall (e.g. UFW)
15+16+### Installation
17+18+1. **Clone the repository**
19+ ```bash
20+ git clone https://github.com/haileyok/cocoon.git
21+ cd cocoon
22+ ```
23+24+2. **Create your configuration file**
25+ ```bash
26+ cp .env.example .env
27+ ```
28+29+3. **Edit `.env` with your settings**
30+31+ Required settings:
32+ ```bash
33+ COCOON_DID="did:web:your-domain.com"
34+ COCOON_HOSTNAME="your-domain.com"
35+ COCOON_CONTACT_EMAIL="you@example.com"
36+ COCOON_RELAYS="https://bsky.network"
37+38+ # Generate with: openssl rand -hex 16
39+ COCOON_ADMIN_PASSWORD="your-secure-password"
40+41+ # Generate with: openssl rand -hex 32
42+ COCOON_SESSION_SECRET="your-session-secret"
43+ ```
44+45+4. **Start the services**
46+ ```bash
47+ # Pull pre-built image from GitHub Container Registry
48+ docker-compose pull
49+ docker-compose up -d
50+ ```
51+52+ Or build locally:
53+ ```bash
54+ docker-compose build
55+ docker-compose up -d
56+ ```
57+58+ **For PostgreSQL deployment:**
59+ ```bash
60+ # Add POSTGRES_PASSWORD to your .env file first!
61+ docker-compose -f docker-compose.postgres.yaml up -d
62+ ```
63+64+5. **Get your invite code**
65+66+ On first run, an invite code is automatically created. View it with:
67+ ```bash
68+ docker-compose logs create-invite
69+ ```
70+71+ Or check the saved file:
72+ ```bash
73+ cat keys/initial-invite-code.txt
74+ ```
75+76+ **IMPORTANT**: Save this invite code! You'll need it to create your first account.
77+78+6. **Monitor the services**
79+ ```bash
80+ docker-compose logs -f
81+ ```
82+83+### What Gets Set Up
84+85+The Docker Compose setup includes:
86+87+- **init-keys**: Automatically generates cryptographic keys (rotation key and JWK) on first run
88+- **cocoon**: The main PDS service running on port 8080
89+- **create-invite**: Automatically creates an initial invite code after Cocoon starts (first run only)
90+- **caddy**: Reverse proxy with automatic HTTPS via Let's Encrypt
91+92+### Data Persistence
93+94+The following directories will be created automatically:
95+96+- `./keys/` - Cryptographic keys (generated automatically)
97+ - `rotation.key` - PDS rotation key
98+ - `jwk.key` - JWK private key
99+ - `initial-invite-code.txt` - Your first invite code (first run only)
100+- `./data/` - SQLite database and blockstore
101+- Docker volumes for Caddy configuration and certificates
102+103+### Optional Configuration
104+105+#### Database Configuration
106+107+By default, Cocoon uses SQLite which requires no additional setup. For production deployments with higher traffic, you can use PostgreSQL:
108+109+```bash
110+# Database type: sqlite (default) or postgres
111+COCOON_DB_TYPE="postgres"
112+113+# PostgreSQL connection string (required if db-type is postgres)
114+# Format: postgres://user:password@host:port/database?sslmode=disable
115+COCOON_DATABASE_URL="postgres://cocoon:password@localhost:5432/cocoon?sslmode=disable"
116+117+# Or use the standard DATABASE_URL environment variable
118+DATABASE_URL="postgres://cocoon:password@localhost:5432/cocoon?sslmode=disable"
119+```
120+121+For SQLite (default):
122+```bash
123+COCOON_DB_TYPE="sqlite"
124+COCOON_DB_NAME="/data/cocoon/cocoon.db"
125+```
126+127+> **Note**: When using PostgreSQL, database backups to S3 are not handled by Cocoon. Use `pg_dump` or your database provider's backup solution instead.
128+129+#### SMTP Email Settings
130+```bash
131+COCOON_SMTP_USER="your-smtp-username"
132+COCOON_SMTP_PASS="your-smtp-password"
133+COCOON_SMTP_HOST="smtp.example.com"
134+COCOON_SMTP_PORT="587"
135+COCOON_SMTP_EMAIL="noreply@example.com"
136+COCOON_SMTP_NAME="Cocoon PDS"
137+```
138+139+#### S3 Storage
140+141+Cocoon supports S3-compatible storage for both database backups (SQLite only) and blob storage (images, videos, etc.):
142+143+```bash
144+# Enable S3 backups (SQLite databases only - hourly backups)
145+COCOON_S3_BACKUPS_ENABLED=true
146+147+# Enable S3 for blob storage (images, videos, etc.)
148+# When enabled, blobs are stored in S3 instead of the database
149+COCOON_S3_BLOBSTORE_ENABLED=true
150+151+# S3 configuration (works with AWS S3, MinIO, Cloudflare R2, etc.)
152+COCOON_S3_REGION="us-east-1"
153+COCOON_S3_BUCKET="your-bucket"
154+COCOON_S3_ENDPOINT="https://s3.amazonaws.com"
155+COCOON_S3_ACCESS_KEY="your-access-key"
156+COCOON_S3_SECRET_KEY="your-secret-key"
157+158+# Optional: CDN/public URL for blob redirects
159+# When set, com.atproto.sync.getBlob redirects to this URL instead of proxying
160+COCOON_S3_CDN_URL="https://cdn.example.com"
161+```
162+163+**Blob Storage Options:**
164+- `COCOON_S3_BLOBSTORE_ENABLED=false` (default): Blobs stored in the database
165+- `COCOON_S3_BLOBSTORE_ENABLED=true`: Blobs stored in S3 bucket under `blobs/{did}/{cid}`
166+167+**Blob Serving Options:**
168+- Without `COCOON_S3_CDN_URL`: Blobs are proxied through the PDS server
169+- With `COCOON_S3_CDN_URL`: `getBlob` returns a 302 redirect to `{CDN_URL}/blobs/{did}/{cid}`
170+171+> **Tip**: For Cloudflare R2, you can use the public bucket URL as the CDN URL. For AWS S3, you can use CloudFront or the S3 bucket URL directly if public access is enabled.
172+173+### Management Commands
174+175+Create an invite code:
176+```bash
177+docker exec cocoon-pds /cocoon create-invite-code --uses 1
178+```
179+180+Reset a user's password:
181+```bash
182+docker exec cocoon-pds /cocoon reset-password --did "did:plc:xxx"
183+```
184+185+### Updating
186+187+```bash
188+docker-compose pull
189+docker-compose up -d
190+```
191+192## Implemented Endpoints
193194> [!NOTE]
···196197### Identity
198199+- [x] `com.atproto.identity.getRecommendedDidCredentials`
200+- [x] `com.atproto.identity.requestPlcOperationSignature`
201- [x] `com.atproto.identity.resolveHandle`
202+- [x] `com.atproto.identity.signPlcOperation`
203+- [x] `com.atproto.identity.submitPlcOperation`
204- [x] `com.atproto.identity.updateHandle`
205206### Repo
···211- [x] `com.atproto.repo.deleteRecord`
212- [x] `com.atproto.repo.describeRepo`
213- [x] `com.atproto.repo.getRecord`
214+- [x] `com.atproto.repo.importRepo` (Works "okay". Use with extreme caution.)
215- [x] `com.atproto.repo.listRecords`
216+- [x] `com.atproto.repo.listMissingBlobs`
217218### Server
219220+- [x] `com.atproto.server.activateAccount`
221- [x] `com.atproto.server.checkAccountStatus`
222- [x] `com.atproto.server.confirmEmail`
223- [x] `com.atproto.server.createAccount`
224- [x] `com.atproto.server.createInviteCode`
225- [x] `com.atproto.server.createInviteCodes`
226+- [x] `com.atproto.server.deactivateAccount`
227+- [x] `com.atproto.server.deleteAccount`
228- [x] `com.atproto.server.deleteSession`
229- [x] `com.atproto.server.describeServer`
230- [ ] `com.atproto.server.getAccountInviteCodes`
231+- [x] `com.atproto.server.getServiceAuth`
232- ~~[ ] `com.atproto.server.listAppPasswords`~~ - not going to add app passwords
233- [x] `com.atproto.server.refreshSession`
234+- [x] `com.atproto.server.requestAccountDelete`
235- [x] `com.atproto.server.requestEmailConfirmation`
236- [x] `com.atproto.server.requestEmailUpdate`
237- [x] `com.atproto.server.requestPasswordReset`
238+- [x] `com.atproto.server.reserveSigningKey`
239- [x] `com.atproto.server.resetPassword`
- ~~[ ] `com.atproto.server.revokeAppPassword`~~ - not going to add app passwords
241- [x] `com.atproto.server.updateEmail`
···256257### Other
258259+- [x] `com.atproto.label.queryLabels`
260- [x] `com.atproto.moderation.createReport` (Note: this should be handled by proxying, not actually implemented in the PDS)
261- [x] `app.bsky.actor.getPreferences`
262- [x] `app.bsky.actor.putPreferences`
···1package db
23import (
4- "sync"
56 "gorm.io/gorm"
7 "gorm.io/gorm/clause"
···910type DB struct {
11 cli *gorm.DB
12- mu sync.Mutex
13}
1415func NewDB(cli *gorm.DB) *DB {
16 return &DB{
17 cli: cli,
18- mu: sync.Mutex{},
19 }
20}
2122-func (db *DB) Create(value any, clauses []clause.Expression) *gorm.DB {
23- db.mu.Lock()
24- defer db.mu.Unlock()
25- return db.cli.Clauses(clauses...).Create(value)
0026}
2728-func (db *DB) Exec(sql string, clauses []clause.Expression, values ...any) *gorm.DB {
29- db.mu.Lock()
30- defer db.mu.Unlock()
31- return db.cli.Clauses(clauses...).Exec(sql, values...)
32}
3334-func (db *DB) Raw(sql string, clauses []clause.Expression, values ...any) *gorm.DB {
35- return db.cli.Clauses(clauses...).Raw(sql, values...)
36}
3738func (db *DB) AutoMigrate(models ...any) error {
39 return db.cli.AutoMigrate(models...)
40}
4142-func (db *DB) Delete(value any, clauses []clause.Expression) *gorm.DB {
43- db.mu.Lock()
44- defer db.mu.Unlock()
45- return db.cli.Clauses(clauses...).Delete(value)
46-}
47-48-func (db *DB) First(dest any, conds ...any) *gorm.DB {
49- return db.cli.First(dest, conds...)
50}
5152-// TODO: this isn't actually good. we can commit even if the db is locked here. this is probably okay for the time being, but need to figure
53-// out a better solution. right now we only do this whenever we're importing a repo though so i'm mostly not worried, but it's still bad.
54-// e.g. when we do apply writes we should also be using a transcation but we don't right now
55-func (db *DB) BeginDangerously() *gorm.DB {
56- return db.cli.Begin()
57}
5859-func (db *DB) Lock() {
60- db.mu.Lock()
61}
6263-func (db *DB) Unlock() {
64- db.mu.Unlock()
65}
···10// This is kinda lame. Not great to implement app.bsky in the pds, but alas
1112func (s *Server) handleActorPutPreferences(e echo.Context) error {
0013 repo := e.Get("repo").(*models.RepoActor)
1415 var prefs map[string]any
···22 return err
23 }
2425- if err := s.db.Exec("UPDATE repos SET preferences = ? WHERE did = ?", nil, b, repo.Repo.Did).Error; err != nil {
26 return err
27 }
28
···10// This is kinda lame. Not great to implement app.bsky in the pds, but alas
1112func (s *Server) handleActorPutPreferences(e echo.Context) error {
13+ ctx := e.Request().Context()
14+15 repo := e.Get("repo").(*models.RepoActor)
1617 var prefs map[string]any
···24 return err
25 }
2627+ if err := s.db.Exec(ctx, "UPDATE repos SET preferences = ? WHERE did = ?", nil, b, repo.Repo.Did).Error; err != nil {
28 return err
29 }
30
···1011 "github.com/Azure/go-autorest/autorest/to"
12 "github.com/bluesky-social/indigo/api/atproto"
13+ "github.com/bluesky-social/indigo/atproto/atdata"
14 "github.com/bluesky-social/indigo/atproto/syntax"
15 "github.com/bluesky-social/indigo/carstore"
16 "github.com/bluesky-social/indigo/events"
17 lexutil "github.com/bluesky-social/indigo/lex/util"
18 "github.com/bluesky-social/indigo/repo"
0019 "github.com/haileyok/cocoon/internal/db"
20+ "github.com/haileyok/cocoon/metrics"
21 "github.com/haileyok/cocoon/models"
22+ "github.com/haileyok/cocoon/recording_blockstore"
23 blocks "github.com/ipfs/go-block-format"
24 "github.com/ipfs/go-cid"
25 cbor "github.com/ipfs/go-ipld-cbor"
···73}
7475func (mm *MarshalableMap) MarshalCBOR(w io.Writer) error {
76+ data, err := atdata.MarshalCBOR(*mm)
77 if err != nil {
78 return err
79 }
···97}
9899// TODO make use of swap commit
100+func (rm *RepoMan) applyWrites(ctx context.Context, urepo models.Repo, writes []Op, swapCommit *string) ([]ApplyWriteResult, error) {
101 rootcid, err := cid.Cast(urepo.Root)
102 if err != nil {
103 return nil, err
104 }
105106+ dbs := rm.s.getBlockstore(urepo.Did)
107+ bs := recording_blockstore.New(dbs)
108+ r, err := repo.OpenRepo(ctx, bs, rootcid)
1090110 var results []ApplyWriteResult
111112+ entries := make([]models.Record, 0, len(writes))
113 for i, op := range writes {
114+ // updates or deletes must supply an rkey
115 if op.Type != OpTypeCreate && op.Rkey == nil {
116 return nil, fmt.Errorf("invalid rkey")
117 } else if op.Type == OpTypeCreate && op.Rkey != nil {
118+ // we should conver this op to an update if the rkey already exists
119+ _, _, err := r.GetRecord(ctx, fmt.Sprintf("%s/%s", op.Collection, *op.Rkey))
120 if err == nil {
121 op.Type = OpTypeUpdate
122 }
123 } else if op.Rkey == nil {
124+ // creates that don't supply an rkey will have one generated for them
125 op.Rkey = to.StringPtr(rm.clock.Next().String())
126 writes[i].Rkey = op.Rkey
127 }
128129+ // validate the record key is actually valid
130 _, err := syntax.ParseRecordKey(*op.Rkey)
131 if err != nil {
132 return nil, err
···134135 switch op.Type {
136 case OpTypeCreate:
137+ // HACK: this fixes some type conversions, mainly around integers
138+ // first we convert to json bytes
139+ b, err := json.Marshal(*op.Record)
140 if err != nil {
141 return nil, err
142 }
143+ // then we use atdata.UnmarshalJSON to convert it back to a map
144+ out, err := atdata.UnmarshalJSON(b)
145 if err != nil {
146 return nil, err
147 }
148+ // finally we can cast to a MarshalableMap
149 mm := MarshalableMap(out)
150+151+ // HACK: if a record doesn't contain a $type, we can manually set it here based on the op's collection
152+ // i forget why this is actually necessary?
153+ if mm["$type"] == "" {
154+ mm["$type"] = op.Collection
155+ }
156+157+ nc, err := r.PutRecord(ctx, fmt.Sprintf("%s/%s", op.Collection, *op.Rkey), &mm)
158 if err != nil {
159 return nil, err
160 }
161+162+ d, err := atdata.MarshalCBOR(mm)
163 if err != nil {
164 return nil, err
165 }
166+167 entries = append(entries, models.Record{
168 Did: urepo.Did,
169 CreatedAt: rm.clock.Next().String(),
···172 Cid: nc.String(),
173 Value: d,
174 })
175+176 results = append(results, ApplyWriteResult{
177 Type: to.StringPtr(OpTypeCreate.String()),
178 Uri: to.StringPtr("at://" + urepo.Did + "/" + op.Collection + "/" + *op.Rkey),
···180 ValidationStatus: to.StringPtr("valid"), // TODO: obviously this might not be true atm lol
181 })
182 case OpTypeDelete:
183+ // try to find the old record in the database
184 var old models.Record
185+ if err := rm.db.Raw(ctx, "SELECT value FROM records WHERE did = ? AND nsid = ? AND rkey = ?", nil, urepo.Did, op.Collection, op.Rkey).Scan(&old).Error; err != nil {
186 return nil, err
187 }
188+189+ // TODO: this is really confusing, and looking at it i have no idea why i did this. below when we are doing deletes, we
190+ // check if `cid` here is nil to indicate if we should delete. that really doesn't make much sense and its super illogical
191+ // when reading this code. i dont feel like fixing right now though so
192 entries = append(entries, models.Record{
193 Did: urepo.Did,
194 Nsid: op.Collection,
195 Rkey: *op.Rkey,
196 Value: old.Value,
197 })
198+199+ // delete the record from the repo
200+ err := r.DeleteRecord(ctx, fmt.Sprintf("%s/%s", op.Collection, *op.Rkey))
201 if err != nil {
202 return nil, err
203 }
204+205+ // add a result for the delete
206 results = append(results, ApplyWriteResult{
207 Type: to.StringPtr(OpTypeDelete.String()),
208 })
209 case OpTypeUpdate:
210+ // HACK: same hack as above for type fixes
211+ b, err := json.Marshal(*op.Record)
212 if err != nil {
213 return nil, err
214 }
215+ out, err := atdata.UnmarshalJSON(b)
216 if err != nil {
217 return nil, err
218 }
219 mm := MarshalableMap(out)
220+221+ nc, err := r.UpdateRecord(ctx, fmt.Sprintf("%s/%s", op.Collection, *op.Rkey), &mm)
222 if err != nil {
223 return nil, err
224 }
225+226+ d, err := atdata.MarshalCBOR(mm)
227 if err != nil {
228 return nil, err
229 }
230+231 entries = append(entries, models.Record{
232 Did: urepo.Did,
233 CreatedAt: rm.clock.Next().String(),
···236 Cid: nc.String(),
237 Value: d,
238 })
239+240 results = append(results, ApplyWriteResult{
241 Type: to.StringPtr(OpTypeUpdate.String()),
242 Uri: to.StringPtr("at://" + urepo.Did + "/" + op.Collection + "/" + *op.Rkey),
···246 }
247 }
248249+ // commit and get the new root
250+ newroot, rev, err := r.Commit(ctx, urepo.SignFor)
251 if err != nil {
252 return nil, err
253 }
254255+ for _, result := range results {
256+ if result.Type != nil {
257+ metrics.RepoOperations.WithLabelValues(*result.Type).Inc()
258+ }
259+ }
260+261+ // create a buffer for dumping our new cbor into
262 buf := new(bytes.Buffer)
263264+ // first write the car header to the buffer
265 hb, err := cbor.DumpObject(&car.CarHeader{
266 Roots: []cid.Cid{newroot},
267 Version: 1,
268 })
0269 if _, err := carstore.LdWrite(buf, hb); err != nil {
270 return nil, err
271 }
272273+ // get a diff of the changes to the repo
274+ diffops, err := r.DiffSince(ctx, rootcid)
275 if err != nil {
276 return nil, err
277 }
278279+ // create the repo ops for the given diff
280 ops := make([]*atproto.SyncSubscribeRepos_RepoOp, 0, len(diffops))
0281 for _, op := range diffops {
282 var c cid.Cid
283 switch op.Op {
···306 })
307 }
308309+ blk, err := dbs.Get(ctx, c)
310 if err != nil {
311 return nil, err
312 }
313314+ // write the block to the buffer
315 if _, err := carstore.LdWrite(buf, blk.Cid().Bytes(), blk.RawData()); err != nil {
316 return nil, err
317 }
318 }
319320+ // write the writelog to the buffer
321+ for _, op := range bs.GetWriteLog() {
322 if _, err := carstore.LdWrite(buf, op.Cid().Bytes(), op.RawData()); err != nil {
323 return nil, err
324 }
325 }
326327+ // blob blob blob blob blob :3
328 var blobs []lexutil.LexLink
329 for _, entry := range entries {
330 var cids []cid.Cid
331+ // whenever there is cid present, we know it's a create (dumb)
332 if entry.Cid != "" {
333+ if err := rm.s.db.Create(ctx, &entry, []clause.Expression{clause.OnConflict{
334 Columns: []clause.Column{{Name: "did"}, {Name: "nsid"}, {Name: "rkey"}},
335 UpdateAll: true,
336 }}).Error; err != nil {
337 return nil, err
338 }
339340+ // increment the given blob refs, yay
341+ cids, err = rm.incrementBlobRefs(ctx, urepo, entry.Value)
342 if err != nil {
343 return nil, err
344 }
345 } else {
346+ // as i noted above this is dumb. but we delete whenever the cid is nil. it works solely becaue the pkey
347+ // is did + collection + rkey. i still really want to separate that out, or use a different type to make
348+ // this less confusing/easy to read. alas, its 2 am and yea no
349+ if err := rm.s.db.Delete(ctx, &entry, nil).Error; err != nil {
350 return nil, err
351 }
352+353+ // TODO:
354+ cids, err = rm.decrementBlobRefs(ctx, urepo, entry.Value)
355 if err != nil {
356 return nil, err
357 }
358 }
359360+ // add all the relevant blobs to the blobs list of blobs. blob ^.^
361 for _, c := range cids {
362 blobs = append(blobs, lexutil.LexLink(c))
363 }
364 }
365366+ // NOTE: using the request ctx seems a bit suss here, so using a background context. i'm not sure if this
367+ // runs sync or not
368+ rm.s.evtman.AddEvent(context.Background(), &events.XRPCStreamEvent{
369 RepoCommit: &atproto.SyncSubscribeRepos_Commit{
370 Repo: urepo.Did,
371 Blocks: buf.Bytes(),
···379 },
380 })
381382+ if err := rm.s.UpdateRepo(ctx, urepo.Did, newroot, rev); err != nil {
383 return nil, err
384 }
385···394 return results, nil
395}
396397+// this is a fun little guy. to get a proof, we need to read the record out of the blockstore and record how we actually
398+// got to the guy. we'll wrap a new blockstore in a recording blockstore, then return the log for proof
399+func (rm *RepoMan) getRecordProof(ctx context.Context, urepo models.Repo, collection, rkey string) (cid.Cid, []blocks.Block, error) {
400 c, err := cid.Cast(urepo.Root)
401 if err != nil {
402 return cid.Undef, nil, err
403 }
404405+ dbs := rm.s.getBlockstore(urepo.Did)
406+ bs := recording_blockstore.New(dbs)
407408+ r, err := repo.OpenRepo(ctx, bs, c)
409 if err != nil {
410 return cid.Undef, nil, err
411 }
412413+ _, _, err = r.GetRecordBytes(ctx, fmt.Sprintf("%s/%s", collection, rkey))
414 if err != nil {
415 return cid.Undef, nil, err
416 }
417418+ return c, bs.GetReadLog(), nil
419}
420421+func (rm *RepoMan) incrementBlobRefs(ctx context.Context, urepo models.Repo, cbor []byte) ([]cid.Cid, error) {
422 cids, err := getBlobCidsFromCbor(cbor)
423 if err != nil {
424 return nil, err
425 }
426427 for _, c := range cids {
428+ if err := rm.db.Exec(ctx, "UPDATE blobs SET ref_count = ref_count + 1 WHERE did = ? AND cid = ?", nil, urepo.Did, c.Bytes()).Error; err != nil {
429 return nil, err
430 }
431 }
···433 return cids, nil
434}
435436+func (rm *RepoMan) decrementBlobRefs(ctx context.Context, urepo models.Repo, cbor []byte) ([]cid.Cid, error) {
437 cids, err := getBlobCidsFromCbor(cbor)
438 if err != nil {
439 return nil, err
···444 ID uint
445 Count int
446 }
447+ if err := rm.db.Raw(ctx, "UPDATE blobs SET ref_count = ref_count - 1 WHERE did = ? AND cid = ? RETURNING id, ref_count", nil, urepo.Did, c.Bytes()).Scan(&res).Error; err != nil {
448 return nil, err
449 }
450451+ // TODO: this does _not_ handle deletions of blobs that are on s3 storage!!!! we need to get the blob, see what
452+ // storage it is in, and clean up s3!!!!
453 if res.Count == 0 {
454+ if err := rm.db.Exec(ctx, "DELETE FROM blobs WHERE id = ?", nil, res.ID).Error; err != nil {
455 return nil, err
456 }
457+ if err := rm.db.Exec(ctx, "DELETE FROM blob_parts WHERE blob_id = ?", nil, res.ID).Error; err != nil {
458 return nil, err
459 }
460 }
···468func getBlobCidsFromCbor(cbor []byte) ([]cid.Cid, error) {
469 var cids []cid.Cid
470471+ decoded, err := atdata.UnmarshalCBOR(cbor)
472 if err != nil {
473 return nil, fmt.Errorf("error unmarshaling cbor: %w", err)
474 }
475476+ var deepiter func(any) error
477+ deepiter = func(item any) error {
478 switch val := item.(type) {
479+ case map[string]any:
480 if val["$type"] == "blob" {
481 if ref, ok := val["ref"].(string); ok {
482 c, err := cid.Parse(ref)
···489 return deepiter(v)
490 }
491 }
492+ case []any:
493 for _, v := range val {
494 deepiter(v)
495 }