From a8fa5c561e77a71b21b6acd17fd51eaff6a601de Mon Sep 17 00:00:00 2001 From: Hailey Date: Tue, 30 Dec 2025 02:23:48 -0800 Subject: [PATCH] cleanup write records --- go.mod | 2 +- server/handle_repo_apply_writes.go | 16 +++--- server/handle_repo_create_record.go | 8 +-- server/handle_repo_delete_record.go | 8 +-- server/handle_repo_put_record.go | 8 +-- server/handle_sync_get_record.go | 4 +- server/repo.go | 82 ++++++++++++++++++++++------- 7 files changed, 92 insertions(+), 36 deletions(-) diff --git a/go.mod b/go.mod index 72ab840..e52d758 100644 --- a/go.mod +++ b/go.mod @@ -31,6 +31,7 @@ require ( github.com/whyrusleeping/cbor-gen v0.2.1-0.20241030202151-b7a6831be65e gitlab.com/yawning/secp256k1-voi v0.0.0-20230925100816-f2616030848b golang.org/x/crypto v0.38.0 + gorm.io/driver/postgres v1.5.7 gorm.io/driver/sqlite v1.5.7 gorm.io/gorm v1.25.12 ) @@ -130,6 +131,5 @@ require ( google.golang.org/protobuf v1.36.6 // indirect gopkg.in/go-playground/assert.v1 v1.2.1 // indirect gopkg.in/inf.v0 v0.9.1 // indirect - gorm.io/driver/postgres v1.5.7 // indirect lukechampine.com/blake3 v1.2.1 // indirect ) diff --git a/server/handle_repo_apply_writes.go b/server/handle_repo_apply_writes.go index fdcab73..db8f760 100644 --- a/server/handle_repo_apply_writes.go +++ b/server/handle_repo_apply_writes.go @@ -6,7 +6,7 @@ import ( "github.com/labstack/echo/v4" ) -type ComAtprotoRepoApplyWritesRequest struct { +type ComAtprotoRepoApplyWritesInput struct { Repo string `json:"repo" validate:"required,atproto-did"` Validate *bool `json:"bool,omitempty"` Writes []ComAtprotoRepoApplyWritesItem `json:"writes"` @@ -20,15 +20,15 @@ type ComAtprotoRepoApplyWritesItem struct { Value *MarshalableMap `json:"value,omitempty"` } -type ComAtprotoRepoApplyWritesResponse struct { +type ComAtprotoRepoApplyWritesOutput struct { Commit RepoCommit `json:"commit"` Results []ApplyWriteResult `json:"results"` } func (s *Server) handleApplyWrites(e echo.Context) error { - repo := e.Get("repo").(*models.RepoActor) + ctx := e.Request().Context() - var req ComAtprotoRepoApplyWritesRequest + var req ComAtprotoRepoApplyWritesInput if err := e.Bind(&req); err != nil { s.logger.Error("error binding", "error", err) return helpers.ServerError(e, nil) @@ -39,12 +39,14 @@ func (s *Server) handleApplyWrites(e echo.Context) error { return helpers.InputError(e, nil) } + repo := e.Get("repo").(*models.RepoActor) + if repo.Repo.Did != req.Repo { s.logger.Warn("mismatched repo/auth") return helpers.InputError(e, nil) } - ops := []Op{} + ops := make([]Op, 0, len(req.Writes)) for _, item := range req.Writes { ops = append(ops, Op{ Type: OpType(item.Type), @@ -54,7 +56,7 @@ func (s *Server) handleApplyWrites(e echo.Context) error { }) } - results, err := s.repoman.applyWrites(repo.Repo, ops, req.SwapCommit) + results, err := s.repoman.applyWrites(ctx, repo.Repo, ops, req.SwapCommit) if err != nil { s.logger.Error("error applying writes", "error", err) return helpers.ServerError(e, nil) @@ -66,7 +68,7 @@ func (s *Server) handleApplyWrites(e echo.Context) error { results[i].Commit = nil } - return e.JSON(200, ComAtprotoRepoApplyWritesResponse{ + return e.JSON(200, ComAtprotoRepoApplyWritesOutput{ Commit: commit, Results: results, }) diff --git a/server/handle_repo_create_record.go b/server/handle_repo_create_record.go index 7726435..5c86c50 100644 --- a/server/handle_repo_create_record.go +++ b/server/handle_repo_create_record.go @@ -6,7 +6,7 @@ import ( "github.com/labstack/echo/v4" ) -type ComAtprotoRepoCreateRecordRequest struct { +type ComAtprotoRepoCreateRecordInput struct { Repo string `json:"repo" validate:"required,atproto-did"` Collection string `json:"collection" validate:"required,atproto-nsid"` Rkey *string `json:"rkey,omitempty"` @@ -17,9 +17,11 @@ type ComAtprotoRepoCreateRecordRequest struct { } func (s *Server) handleCreateRecord(e echo.Context) error { + ctx := e.Request().Context() + repo := e.Get("repo").(*models.RepoActor) - var req ComAtprotoRepoCreateRecordRequest + var req ComAtprotoRepoCreateRecordInput if err := e.Bind(&req); err != nil { s.logger.Error("error binding", "error", err) return helpers.ServerError(e, nil) @@ -40,7 +42,7 @@ func (s *Server) handleCreateRecord(e echo.Context) error { optype = OpTypeUpdate } - results, err := s.repoman.applyWrites(repo.Repo, []Op{ + results, err := s.repoman.applyWrites(ctx, repo.Repo, []Op{ { Type: optype, Collection: req.Collection, diff --git a/server/handle_repo_delete_record.go b/server/handle_repo_delete_record.go index e6b22b8..ecc22fc 100644 --- a/server/handle_repo_delete_record.go +++ b/server/handle_repo_delete_record.go @@ -6,7 +6,7 @@ import ( "github.com/labstack/echo/v4" ) -type ComAtprotoRepoDeleteRecordRequest struct { +type ComAtprotoRepoDeleteRecordInput struct { Repo string `json:"repo" validate:"required,atproto-did"` Collection string `json:"collection" validate:"required,atproto-nsid"` Rkey string `json:"rkey" validate:"required,atproto-rkey"` @@ -15,9 +15,11 @@ type ComAtprotoRepoDeleteRecordRequest struct { } func (s *Server) handleDeleteRecord(e echo.Context) error { + ctx := e.Request().Context() + repo := e.Get("repo").(*models.RepoActor) - var req ComAtprotoRepoDeleteRecordRequest + var req ComAtprotoRepoDeleteRecordInput if err := e.Bind(&req); err != nil { s.logger.Error("error binding", "error", err) return helpers.ServerError(e, nil) @@ -33,7 +35,7 @@ func (s *Server) handleDeleteRecord(e echo.Context) error { return helpers.InputError(e, nil) } - results, err := s.repoman.applyWrites(repo.Repo, []Op{ + results, err := s.repoman.applyWrites(ctx, repo.Repo, []Op{ { Type: OpTypeDelete, Collection: req.Collection, diff --git a/server/handle_repo_put_record.go b/server/handle_repo_put_record.go index f5138f3..12ec7ad 100644 --- a/server/handle_repo_put_record.go +++ b/server/handle_repo_put_record.go @@ -6,7 +6,7 @@ import ( "github.com/labstack/echo/v4" ) -type ComAtprotoRepoPutRecordRequest struct { +type ComAtprotoRepoPutRecordInput struct { Repo string `json:"repo" validate:"required,atproto-did"` Collection string `json:"collection" validate:"required,atproto-nsid"` Rkey string `json:"rkey" validate:"required,atproto-rkey"` @@ -17,9 +17,11 @@ type ComAtprotoRepoPutRecordRequest struct { } func (s *Server) handlePutRecord(e echo.Context) error { + ctx := e.Request().Context() + repo := e.Get("repo").(*models.RepoActor) - var req ComAtprotoRepoPutRecordRequest + var req ComAtprotoRepoPutRecordInput if err := e.Bind(&req); err != nil { s.logger.Error("error binding", "error", err) return helpers.ServerError(e, nil) @@ -40,7 +42,7 @@ func (s *Server) handlePutRecord(e echo.Context) error { optype = OpTypeUpdate } - results, err := s.repoman.applyWrites(repo.Repo, []Op{ + results, err := s.repoman.applyWrites(ctx, repo.Repo, []Op{ { Type: optype, Collection: req.Collection, diff --git a/server/handle_sync_get_record.go b/server/handle_sync_get_record.go index 13621ad..c64e62b 100644 --- a/server/handle_sync_get_record.go +++ b/server/handle_sync_get_record.go @@ -13,6 +13,8 @@ import ( ) func (s *Server) handleSyncGetRecord(e echo.Context) error { + ctx := e.Request().Context() + did := e.QueryParam("did") collection := e.QueryParam("collection") rkey := e.QueryParam("rkey") @@ -23,7 +25,7 @@ func (s *Server) handleSyncGetRecord(e echo.Context) error { return helpers.ServerError(e, nil) } - root, blocks, err := s.repoman.getRecordProof(urepo, collection, rkey) + root, blocks, err := s.repoman.getRecordProof(ctx, urepo, collection, rkey) if err != nil { return err } diff --git a/server/repo.go b/server/repo.go index 2fb8578..e9eb9f8 100644 --- a/server/repo.go +++ b/server/repo.go @@ -96,7 +96,7 @@ type RepoCommit struct { } // TODO make use of swap commit -func (rm *RepoMan) applyWrites(urepo models.Repo, writes []Op, swapCommit *string) ([]ApplyWriteResult, error) { +func (rm *RepoMan) applyWrites(ctx context.Context, urepo models.Repo, writes []Op, swapCommit *string) ([]ApplyWriteResult, error) { rootcid, err := cid.Cast(urepo.Root) if err != nil { return nil, err @@ -106,22 +106,26 @@ func (rm *RepoMan) applyWrites(urepo models.Repo, writes []Op, swapCommit *strin bs := recording_blockstore.New(dbs) r, err := repo.OpenRepo(context.TODO(), bs, rootcid) - entries := []models.Record{} var results []ApplyWriteResult + entries := make([]models.Record, 0, len(writes)) for i, op := range writes { + // updates or deletes must supply an rkey if op.Type != OpTypeCreate && op.Rkey == nil { return nil, fmt.Errorf("invalid rkey") } else if op.Type == OpTypeCreate && op.Rkey != nil { - _, _, err := r.GetRecord(context.TODO(), op.Collection+"/"+*op.Rkey) + // we should conver this op to an update if the rkey already exists + _, _, err := r.GetRecord(ctx, fmt.Sprintf("%s/%s", op.Collection, *op.Rkey)) if err == nil { op.Type = OpTypeUpdate } } else if op.Rkey == nil { + // creates that don't supply an rkey will have one generated for them op.Rkey = to.StringPtr(rm.clock.Next().String()) writes[i].Rkey = op.Rkey } + // validate the record key is actually valid _, err := syntax.ParseRecordKey(*op.Rkey) if err != nil { return nil, err @@ -129,29 +133,36 @@ func (rm *RepoMan) applyWrites(urepo models.Repo, writes []Op, swapCommit *strin switch op.Type { case OpTypeCreate: - j, err := json.Marshal(*op.Record) + // HACK: this fixes some type conversions, mainly around integers + // first we convert to json bytes + b, err := json.Marshal(*op.Record) if err != nil { return nil, err } - out, err := atdata.UnmarshalJSON(j) + // then we use atdata.UnmarshalJSON to convert it back to a map + out, err := atdata.UnmarshalJSON(b) if err != nil { return nil, err } + // finally we can cast to a MarshalableMap mm := MarshalableMap(out) // HACK: if a record doesn't contain a $type, we can manually set it here based on the op's collection + // i forget why this is actually necessary? if mm["$type"] == "" { mm["$type"] = op.Collection } - nc, err := r.PutRecord(context.TODO(), op.Collection+"/"+*op.Rkey, &mm) + nc, err := r.PutRecord(ctx, fmt.Sprintf("%s/%s", op.Collection, *op.Rkey), &mm) if err != nil { return nil, err } + d, err := atdata.MarshalCBOR(mm) if err != nil { return nil, err } + entries = append(entries, models.Record{ Did: urepo.Did, CreatedAt: rm.clock.Next().String(), @@ -160,6 +171,7 @@ func (rm *RepoMan) applyWrites(urepo models.Repo, writes []Op, swapCommit *strin Cid: nc.String(), Value: d, }) + results = append(results, ApplyWriteResult{ Type: to.StringPtr(OpTypeCreate.String()), Uri: to.StringPtr("at://" + urepo.Did + "/" + op.Collection + "/" + *op.Rkey), @@ -167,41 +179,54 @@ func (rm *RepoMan) applyWrites(urepo models.Repo, writes []Op, swapCommit *strin ValidationStatus: to.StringPtr("valid"), // TODO: obviously this might not be true atm lol }) case OpTypeDelete: + // try to find the old record in the database var old models.Record if err := rm.db.Raw("SELECT value FROM records WHERE did = ? AND nsid = ? AND rkey = ?", nil, urepo.Did, op.Collection, op.Rkey).Scan(&old).Error; err != nil { return nil, err } + + // TODO: this is really confusing, and looking at it i have no idea why i did this. below when we are doing deletes, we + // check if `cid` here is nil to indicate if we should delete. that really doesn't make much sense and its super illogical + // when reading this code. i dont feel like fixing right now though so entries = append(entries, models.Record{ Did: urepo.Did, Nsid: op.Collection, Rkey: *op.Rkey, Value: old.Value, }) + + // delete the record from the repo err := r.DeleteRecord(context.TODO(), op.Collection+"/"+*op.Rkey) if err != nil { return nil, err } + + // add a result for the delete results = append(results, ApplyWriteResult{ Type: to.StringPtr(OpTypeDelete.String()), }) case OpTypeUpdate: - j, err := json.Marshal(*op.Record) + // HACK: same hack as above for type fixes + b, err := json.Marshal(*op.Record) if err != nil { return nil, err } - out, err := atdata.UnmarshalJSON(j) + out, err := atdata.UnmarshalJSON(b) if err != nil { return nil, err } mm := MarshalableMap(out) - nc, err := r.UpdateRecord(context.TODO(), op.Collection+"/"+*op.Rkey, &mm) + + nc, err := r.UpdateRecord(ctx, fmt.Sprintf("%s/%s", op.Collection, *op.Rkey), &mm) if err != nil { return nil, err } + d, err := atdata.MarshalCBOR(mm) if err != nil { return nil, err } + entries = append(entries, models.Record{ Did: urepo.Did, CreatedAt: rm.clock.Next().String(), @@ -210,6 +235,7 @@ func (rm *RepoMan) applyWrites(urepo models.Repo, writes []Op, swapCommit *strin Cid: nc.String(), Value: d, }) + results = append(results, ApplyWriteResult{ Type: to.StringPtr(OpTypeUpdate.String()), Uri: to.StringPtr("at://" + urepo.Did + "/" + op.Collection + "/" + *op.Rkey), @@ -219,29 +245,32 @@ func (rm *RepoMan) applyWrites(urepo models.Repo, writes []Op, swapCommit *strin } } - newroot, rev, err := r.Commit(context.TODO(), urepo.SignFor) + // commit and get the new root + newroot, rev, err := r.Commit(ctx, urepo.SignFor) if err != nil { return nil, err } + // create a buffer for dumping our new cbor into buf := new(bytes.Buffer) + // first write the car header to the buffer hb, err := cbor.DumpObject(&car.CarHeader{ Roots: []cid.Cid{newroot}, Version: 1, }) - if _, err := carstore.LdWrite(buf, hb); err != nil { return nil, err } + // get a diff of the changes to the repo diffops, err := r.DiffSince(context.TODO(), rootcid) if err != nil { return nil, err } + // create the repo ops for the given diff ops := make([]*atproto.SyncSubscribeRepos_RepoOp, 0, len(diffops)) - for _, op := range diffops { var c cid.Cid switch op.Op { @@ -270,25 +299,29 @@ func (rm *RepoMan) applyWrites(urepo models.Repo, writes []Op, swapCommit *strin }) } - blk, err := dbs.Get(context.TODO(), c) + blk, err := dbs.Get(ctx, c) if err != nil { return nil, err } + // write the block to the buffer if _, err := carstore.LdWrite(buf, blk.Cid().Bytes(), blk.RawData()); err != nil { return nil, err } } + // write the writelog to the buffer for _, op := range bs.GetWriteLog() { if _, err := carstore.LdWrite(buf, op.Cid().Bytes(), op.RawData()); err != nil { return nil, err } } + // blob blob blob blob blob :3 var blobs []lexutil.LexLink for _, entry := range entries { var cids []cid.Cid + // whenever there is cid present, we know it's a create (dumb) if entry.Cid != "" { if err := rm.s.db.Create(&entry, []clause.Expression{clause.OnConflict{ Columns: []clause.Column{{Name: "did"}, {Name: "nsid"}, {Name: "rkey"}}, @@ -297,26 +330,35 @@ func (rm *RepoMan) applyWrites(urepo models.Repo, writes []Op, swapCommit *strin return nil, err } + // increment the given blob refs, yay cids, err = rm.incrementBlobRefs(urepo, entry.Value) if err != nil { return nil, err } } else { + // as i noted above this is dumb. but we delete whenever the cid is nil. it works solely becaue the pkey + // is did + collection + rkey. i still really want to separate that out, or use a different type to make + // this less confusing/easy to read. alas, its 2 am and yea no if err := rm.s.db.Delete(&entry, nil).Error; err != nil { return nil, err } + + // TODO: cids, err = rm.decrementBlobRefs(urepo, entry.Value) if err != nil { return nil, err } } + // add all the relevant blobs to the blobs list of blobs. blob ^.^ for _, c := range cids { blobs = append(blobs, lexutil.LexLink(c)) } } - rm.s.evtman.AddEvent(context.TODO(), &events.XRPCStreamEvent{ + // NOTE: using the request ctx seems a bit suss here, so using a background context. i'm not sure if this + // runs sync or not + rm.s.evtman.AddEvent(context.Background(), &events.XRPCStreamEvent{ RepoCommit: &atproto.SyncSubscribeRepos_Commit{ Repo: urepo.Did, Blocks: buf.Bytes(), @@ -330,7 +372,7 @@ func (rm *RepoMan) applyWrites(urepo models.Repo, writes []Op, swapCommit *strin }, }) - if err := rm.s.UpdateRepo(context.TODO(), urepo.Did, newroot, rev); err != nil { + if err := rm.s.UpdateRepo(ctx, urepo.Did, newroot, rev); err != nil { return nil, err } @@ -345,7 +387,9 @@ func (rm *RepoMan) applyWrites(urepo models.Repo, writes []Op, swapCommit *strin return results, nil } -func (rm *RepoMan) getRecordProof(urepo models.Repo, collection, rkey string) (cid.Cid, []blocks.Block, error) { +// this is a fun little guy. to get a proof, we need to read the record out of the blockstore and record how we actually +// got to the guy. we'll wrap a new blockstore in a recording blockstore, then return the log for proof +func (rm *RepoMan) getRecordProof(ctx context.Context, urepo models.Repo, collection, rkey string) (cid.Cid, []blocks.Block, error) { c, err := cid.Cast(urepo.Root) if err != nil { return cid.Undef, nil, err @@ -354,12 +398,12 @@ func (rm *RepoMan) getRecordProof(urepo models.Repo, collection, rkey string) (c dbs := rm.s.getBlockstore(urepo.Did) bs := recording_blockstore.New(dbs) - r, err := repo.OpenRepo(context.TODO(), bs, c) + r, err := repo.OpenRepo(ctx, bs, c) if err != nil { return cid.Undef, nil, err } - _, _, err = r.GetRecordBytes(context.TODO(), collection+"/"+rkey) + _, _, err = r.GetRecordBytes(ctx, fmt.Sprintf("%s/%s", collection, rkey)) if err != nil { return cid.Undef, nil, err } @@ -397,6 +441,8 @@ func (rm *RepoMan) decrementBlobRefs(urepo models.Repo, cbor []byte) ([]cid.Cid, return nil, err } + // TODO: this does _not_ handle deletions of blobs that are on s3 storage!!!! we need to get the blob, see what + // storage it is in, and clean up s3!!!! if res.Count == 0 { if err := rm.db.Exec("DELETE FROM blobs WHERE id = ?", nil, res.ID).Error; err != nil { return nil, err -- 2.43.0 From d48a06b2140fb5e446d8be485b0006e5d1709239 Mon Sep 17 00:00:00 2001 From: Hailey Date: Tue, 30 Dec 2025 02:28:17 -0800 Subject: [PATCH] nits --- server/repo.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/server/repo.go b/server/repo.go index e9eb9f8..5d8f7a4 100644 --- a/server/repo.go +++ b/server/repo.go @@ -104,7 +104,7 @@ func (rm *RepoMan) applyWrites(ctx context.Context, urepo models.Repo, writes [] dbs := rm.s.getBlockstore(urepo.Did) bs := recording_blockstore.New(dbs) - r, err := repo.OpenRepo(context.TODO(), bs, rootcid) + r, err := repo.OpenRepo(ctx, bs, rootcid) var results []ApplyWriteResult @@ -196,7 +196,7 @@ func (rm *RepoMan) applyWrites(ctx context.Context, urepo models.Repo, writes [] }) // delete the record from the repo - err := r.DeleteRecord(context.TODO(), op.Collection+"/"+*op.Rkey) + err := r.DeleteRecord(ctx, fmt.Sprintf("%s/%s", op.Collection, *op.Rkey)) if err != nil { return nil, err } @@ -264,7 +264,7 @@ func (rm *RepoMan) applyWrites(ctx context.Context, urepo models.Repo, writes [] } // get a diff of the changes to the repo - diffops, err := r.DiffSince(context.TODO(), rootcid) + diffops, err := r.DiffSince(ctx, rootcid) if err != nil { return nil, err } -- 2.43.0