tidy: cleanup write records #1

merged
opened by hailey.at targeting main from hailey/cleanup-write-records

went through and did a bunch of things

  • propagated contexts and got rid of a bunch of context.TODO()
  • identified a few things that are missing or rather gross atm and we should go back and fix later
  • cleaned up a few other places that i didnt like...
+1 -1
go.mod
··· 31 github.com/whyrusleeping/cbor-gen v0.2.1-0.20241030202151-b7a6831be65e 32 gitlab.com/yawning/secp256k1-voi v0.0.0-20230925100816-f2616030848b 33 golang.org/x/crypto v0.38.0 34 gorm.io/driver/sqlite v1.5.7 35 gorm.io/gorm v1.25.12 36 ) ··· 130 google.golang.org/protobuf v1.36.6 // indirect 131 gopkg.in/go-playground/assert.v1 v1.2.1 // indirect 132 gopkg.in/inf.v0 v0.9.1 // indirect 133 - gorm.io/driver/postgres v1.5.7 // indirect 134 lukechampine.com/blake3 v1.2.1 // indirect 135 )
··· 31 github.com/whyrusleeping/cbor-gen v0.2.1-0.20241030202151-b7a6831be65e 32 gitlab.com/yawning/secp256k1-voi v0.0.0-20230925100816-f2616030848b 33 golang.org/x/crypto v0.38.0 34 + gorm.io/driver/postgres v1.5.7 35 gorm.io/driver/sqlite v1.5.7 36 gorm.io/gorm v1.25.12 37 ) ··· 131 google.golang.org/protobuf v1.36.6 // indirect 132 gopkg.in/go-playground/assert.v1 v1.2.1 // indirect 133 gopkg.in/inf.v0 v0.9.1 // indirect 134 lukechampine.com/blake3 v1.2.1 // indirect 135 )
+9 -7
server/handle_repo_apply_writes.go
··· 6 "github.com/labstack/echo/v4" 7 ) 8 9 - type ComAtprotoRepoApplyWritesRequest struct { 10 Repo string `json:"repo" validate:"required,atproto-did"` 11 Validate *bool `json:"bool,omitempty"` 12 Writes []ComAtprotoRepoApplyWritesItem `json:"writes"` ··· 20 Value *MarshalableMap `json:"value,omitempty"` 21 } 22 23 - type ComAtprotoRepoApplyWritesResponse struct { 24 Commit RepoCommit `json:"commit"` 25 Results []ApplyWriteResult `json:"results"` 26 } 27 28 func (s *Server) handleApplyWrites(e echo.Context) error { 29 - repo := e.Get("repo").(*models.RepoActor) 30 31 - var req ComAtprotoRepoApplyWritesRequest 32 if err := e.Bind(&req); err != nil { 33 s.logger.Error("error binding", "error", err) 34 return helpers.ServerError(e, nil) ··· 39 return helpers.InputError(e, nil) 40 } 41 42 if repo.Repo.Did != req.Repo { 43 s.logger.Warn("mismatched repo/auth") 44 return helpers.InputError(e, nil) 45 } 46 47 - ops := []Op{} 48 for _, item := range req.Writes { 49 ops = append(ops, Op{ 50 Type: OpType(item.Type), ··· 54 }) 55 } 56 57 - results, err := s.repoman.applyWrites(repo.Repo, ops, req.SwapCommit) 58 if err != nil { 59 s.logger.Error("error applying writes", "error", err) 60 return helpers.ServerError(e, nil) ··· 66 results[i].Commit = nil 67 } 68 69 - return e.JSON(200, ComAtprotoRepoApplyWritesResponse{ 70 Commit: commit, 71 Results: results, 72 })
··· 6 "github.com/labstack/echo/v4" 7 ) 8 9 + type ComAtprotoRepoApplyWritesInput struct { 10 Repo string `json:"repo" validate:"required,atproto-did"` 11 Validate *bool `json:"bool,omitempty"` 12 Writes []ComAtprotoRepoApplyWritesItem `json:"writes"` ··· 20 Value *MarshalableMap `json:"value,omitempty"` 21 } 22 23 + type ComAtprotoRepoApplyWritesOutput struct { 24 Commit RepoCommit `json:"commit"` 25 Results []ApplyWriteResult `json:"results"` 26 } 27 28 func (s *Server) handleApplyWrites(e echo.Context) error { 29 + ctx := e.Request().Context() 30 31 + var req ComAtprotoRepoApplyWritesInput 32 if err := e.Bind(&req); err != nil { 33 s.logger.Error("error binding", "error", err) 34 return helpers.ServerError(e, nil) ··· 39 return helpers.InputError(e, nil) 40 } 41 42 + repo := e.Get("repo").(*models.RepoActor) 43 + 44 if repo.Repo.Did != req.Repo { 45 s.logger.Warn("mismatched repo/auth") 46 return helpers.InputError(e, nil) 47 } 48 49 + ops := make([]Op, 0, len(req.Writes)) 50 for _, item := range req.Writes { 51 ops = append(ops, Op{ 52 Type: OpType(item.Type), ··· 56 }) 57 } 58 59 + results, err := s.repoman.applyWrites(ctx, repo.Repo, ops, req.SwapCommit) 60 if err != nil { 61 s.logger.Error("error applying writes", "error", err) 62 return helpers.ServerError(e, nil) ··· 68 results[i].Commit = nil 69 } 70 71 + return e.JSON(200, ComAtprotoRepoApplyWritesOutput{ 72 Commit: commit, 73 Results: results, 74 })
+5 -3
server/handle_repo_create_record.go
··· 6 "github.com/labstack/echo/v4" 7 ) 8 9 - type ComAtprotoRepoCreateRecordRequest struct { 10 Repo string `json:"repo" validate:"required,atproto-did"` 11 Collection string `json:"collection" validate:"required,atproto-nsid"` 12 Rkey *string `json:"rkey,omitempty"` ··· 17 } 18 19 func (s *Server) handleCreateRecord(e echo.Context) error { 20 repo := e.Get("repo").(*models.RepoActor) 21 22 - var req ComAtprotoRepoCreateRecordRequest 23 if err := e.Bind(&req); err != nil { 24 s.logger.Error("error binding", "error", err) 25 return helpers.ServerError(e, nil) ··· 40 optype = OpTypeUpdate 41 } 42 43 - results, err := s.repoman.applyWrites(repo.Repo, []Op{ 44 { 45 Type: optype, 46 Collection: req.Collection,
··· 6 "github.com/labstack/echo/v4" 7 ) 8 9 + type ComAtprotoRepoCreateRecordInput struct { 10 Repo string `json:"repo" validate:"required,atproto-did"` 11 Collection string `json:"collection" validate:"required,atproto-nsid"` 12 Rkey *string `json:"rkey,omitempty"` ··· 17 } 18 19 func (s *Server) handleCreateRecord(e echo.Context) error { 20 + ctx := e.Request().Context() 21 + 22 repo := e.Get("repo").(*models.RepoActor) 23 24 + var req ComAtprotoRepoCreateRecordInput 25 if err := e.Bind(&req); err != nil { 26 s.logger.Error("error binding", "error", err) 27 return helpers.ServerError(e, nil) ··· 42 optype = OpTypeUpdate 43 } 44 45 + results, err := s.repoman.applyWrites(ctx, repo.Repo, []Op{ 46 { 47 Type: optype, 48 Collection: req.Collection,
+5 -3
server/handle_repo_delete_record.go
··· 6 "github.com/labstack/echo/v4" 7 ) 8 9 - type ComAtprotoRepoDeleteRecordRequest struct { 10 Repo string `json:"repo" validate:"required,atproto-did"` 11 Collection string `json:"collection" validate:"required,atproto-nsid"` 12 Rkey string `json:"rkey" validate:"required,atproto-rkey"` ··· 15 } 16 17 func (s *Server) handleDeleteRecord(e echo.Context) error { 18 repo := e.Get("repo").(*models.RepoActor) 19 20 - var req ComAtprotoRepoDeleteRecordRequest 21 if err := e.Bind(&req); err != nil { 22 s.logger.Error("error binding", "error", err) 23 return helpers.ServerError(e, nil) ··· 33 return helpers.InputError(e, nil) 34 } 35 36 - results, err := s.repoman.applyWrites(repo.Repo, []Op{ 37 { 38 Type: OpTypeDelete, 39 Collection: req.Collection,
··· 6 "github.com/labstack/echo/v4" 7 ) 8 9 + type ComAtprotoRepoDeleteRecordInput struct { 10 Repo string `json:"repo" validate:"required,atproto-did"` 11 Collection string `json:"collection" validate:"required,atproto-nsid"` 12 Rkey string `json:"rkey" validate:"required,atproto-rkey"` ··· 15 } 16 17 func (s *Server) handleDeleteRecord(e echo.Context) error { 18 + ctx := e.Request().Context() 19 + 20 repo := e.Get("repo").(*models.RepoActor) 21 22 + var req ComAtprotoRepoDeleteRecordInput 23 if err := e.Bind(&req); err != nil { 24 s.logger.Error("error binding", "error", err) 25 return helpers.ServerError(e, nil) ··· 35 return helpers.InputError(e, nil) 36 } 37 38 + results, err := s.repoman.applyWrites(ctx, repo.Repo, []Op{ 39 { 40 Type: OpTypeDelete, 41 Collection: req.Collection,
+5 -3
server/handle_repo_put_record.go
··· 6 "github.com/labstack/echo/v4" 7 ) 8 9 - type ComAtprotoRepoPutRecordRequest struct { 10 Repo string `json:"repo" validate:"required,atproto-did"` 11 Collection string `json:"collection" validate:"required,atproto-nsid"` 12 Rkey string `json:"rkey" validate:"required,atproto-rkey"` ··· 17 } 18 19 func (s *Server) handlePutRecord(e echo.Context) error { 20 repo := e.Get("repo").(*models.RepoActor) 21 22 - var req ComAtprotoRepoPutRecordRequest 23 if err := e.Bind(&req); err != nil { 24 s.logger.Error("error binding", "error", err) 25 return helpers.ServerError(e, nil) ··· 40 optype = OpTypeUpdate 41 } 42 43 - results, err := s.repoman.applyWrites(repo.Repo, []Op{ 44 { 45 Type: optype, 46 Collection: req.Collection,
··· 6 "github.com/labstack/echo/v4" 7 ) 8 9 + type ComAtprotoRepoPutRecordInput struct { 10 Repo string `json:"repo" validate:"required,atproto-did"` 11 Collection string `json:"collection" validate:"required,atproto-nsid"` 12 Rkey string `json:"rkey" validate:"required,atproto-rkey"` ··· 17 } 18 19 func (s *Server) handlePutRecord(e echo.Context) error { 20 + ctx := e.Request().Context() 21 + 22 repo := e.Get("repo").(*models.RepoActor) 23 24 + var req ComAtprotoRepoPutRecordInput 25 if err := e.Bind(&req); err != nil { 26 s.logger.Error("error binding", "error", err) 27 return helpers.ServerError(e, nil) ··· 42 optype = OpTypeUpdate 43 } 44 45 + results, err := s.repoman.applyWrites(ctx, repo.Repo, []Op{ 46 { 47 Type: optype, 48 Collection: req.Collection,
+3 -1
server/handle_sync_get_record.go
··· 13 ) 14 15 func (s *Server) handleSyncGetRecord(e echo.Context) error { 16 did := e.QueryParam("did") 17 collection := e.QueryParam("collection") 18 rkey := e.QueryParam("rkey") ··· 23 return helpers.ServerError(e, nil) 24 } 25 26 - root, blocks, err := s.repoman.getRecordProof(urepo, collection, rkey) 27 if err != nil { 28 return err 29 }
··· 13 ) 14 15 func (s *Server) handleSyncGetRecord(e echo.Context) error { 16 + ctx := e.Request().Context() 17 + 18 did := e.QueryParam("did") 19 collection := e.QueryParam("collection") 20 rkey := e.QueryParam("rkey") ··· 25 return helpers.ServerError(e, nil) 26 } 27 28 + root, blocks, err := s.repoman.getRecordProof(ctx, urepo, collection, rkey) 29 if err != nil { 30 return err 31 }
+67 -21
server/repo.go
··· 96 } 97 98 // TODO make use of swap commit 99 - func (rm *RepoMan) applyWrites(urepo models.Repo, writes []Op, swapCommit *string) ([]ApplyWriteResult, error) { 100 rootcid, err := cid.Cast(urepo.Root) 101 if err != nil { 102 return nil, err ··· 104 105 dbs := rm.s.getBlockstore(urepo.Did) 106 bs := recording_blockstore.New(dbs) 107 - r, err := repo.OpenRepo(context.TODO(), bs, rootcid) 108 109 - entries := []models.Record{} 110 var results []ApplyWriteResult 111 112 for i, op := range writes { 113 if op.Type != OpTypeCreate && op.Rkey == nil { 114 return nil, fmt.Errorf("invalid rkey") 115 } else if op.Type == OpTypeCreate && op.Rkey != nil { 116 - _, _, err := r.GetRecord(context.TODO(), op.Collection+"/"+*op.Rkey) 117 if err == nil { 118 op.Type = OpTypeUpdate 119 } 120 } else if op.Rkey == nil { 121 op.Rkey = to.StringPtr(rm.clock.Next().String()) 122 writes[i].Rkey = op.Rkey 123 } 124 125 _, err := syntax.ParseRecordKey(*op.Rkey) 126 if err != nil { 127 return nil, err ··· 129 130 switch op.Type { 131 case OpTypeCreate: 132 - j, err := json.Marshal(*op.Record) 133 if err != nil { 134 return nil, err 135 } 136 - out, err := atdata.UnmarshalJSON(j) 137 if err != nil { 138 return nil, err 139 } 140 mm := MarshalableMap(out) 141 142 // HACK: if a record doesn't contain a $type, we can manually set it here based on the op's collection 143 if mm["$type"] == "" { 144 mm["$type"] = op.Collection 145 } 146 147 - nc, err := r.PutRecord(context.TODO(), op.Collection+"/"+*op.Rkey, &mm) 148 if err != nil { 149 return nil, err 150 } 151 d, err := atdata.MarshalCBOR(mm) 152 if err != nil { 153 return nil, err 154 } 155 entries = append(entries, models.Record{ 156 Did: urepo.Did, 157 CreatedAt: rm.clock.Next().String(), ··· 160 Cid: nc.String(), 161 Value: d, 162 }) 163 results = append(results, ApplyWriteResult{ 164 Type: to.StringPtr(OpTypeCreate.String()), 165 Uri: to.StringPtr("at://" + urepo.Did + "/" + op.Collection + "/" + *op.Rkey), ··· 167 ValidationStatus: to.StringPtr("valid"), // TODO: obviously this might not be true atm lol 168 }) 169 case OpTypeDelete: 170 var old models.Record 171 if err := rm.db.Raw("SELECT value FROM records WHERE did = ? AND nsid = ? AND rkey = ?", nil, urepo.Did, op.Collection, op.Rkey).Scan(&old).Error; err != nil { 172 return nil, err 173 } 174 entries = append(entries, models.Record{ 175 Did: urepo.Did, 176 Nsid: op.Collection, 177 Rkey: *op.Rkey, 178 Value: old.Value, 179 }) 180 - err := r.DeleteRecord(context.TODO(), op.Collection+"/"+*op.Rkey) 181 if err != nil { 182 return nil, err 183 } 184 results = append(results, ApplyWriteResult{ 185 Type: to.StringPtr(OpTypeDelete.String()), 186 }) 187 case OpTypeUpdate: 188 - j, err := json.Marshal(*op.Record) 189 if err != nil { 190 return nil, err 191 } 192 - out, err := atdata.UnmarshalJSON(j) 193 if err != nil { 194 return nil, err 195 } 196 mm := MarshalableMap(out) 197 - nc, err := r.UpdateRecord(context.TODO(), op.Collection+"/"+*op.Rkey, &mm) 198 if err != nil { 199 return nil, err 200 } 201 d, err := atdata.MarshalCBOR(mm) 202 if err != nil { 203 return nil, err 204 } 205 entries = append(entries, models.Record{ 206 Did: urepo.Did, 207 CreatedAt: rm.clock.Next().String(), ··· 210 Cid: nc.String(), 211 Value: d, 212 }) 213 results = append(results, ApplyWriteResult{ 214 Type: to.StringPtr(OpTypeUpdate.String()), 215 Uri: to.StringPtr("at://" + urepo.Did + "/" + op.Collection + "/" + *op.Rkey), ··· 219 } 220 } 221 222 - newroot, rev, err := r.Commit(context.TODO(), urepo.SignFor) 223 if err != nil { 224 return nil, err 225 } 226 227 buf := new(bytes.Buffer) 228 229 hb, err := cbor.DumpObject(&car.CarHeader{ 230 Roots: []cid.Cid{newroot}, 231 Version: 1, 232 }) 233 - 234 if _, err := carstore.LdWrite(buf, hb); err != nil { 235 return nil, err 236 } 237 238 - diffops, err := r.DiffSince(context.TODO(), rootcid) 239 if err != nil { 240 return nil, err 241 } 242 243 ops := make([]*atproto.SyncSubscribeRepos_RepoOp, 0, len(diffops)) 244 - 245 for _, op := range diffops { 246 var c cid.Cid 247 switch op.Op { ··· 270 }) 271 } 272 273 - blk, err := dbs.Get(context.TODO(), c) 274 if err != nil { 275 return nil, err 276 } 277 278 if _, err := carstore.LdWrite(buf, blk.Cid().Bytes(), blk.RawData()); err != nil { 279 return nil, err 280 } 281 } 282 283 for _, op := range bs.GetWriteLog() { 284 if _, err := carstore.LdWrite(buf, op.Cid().Bytes(), op.RawData()); err != nil { 285 return nil, err 286 } 287 } 288 289 var blobs []lexutil.LexLink 290 for _, entry := range entries { 291 var cids []cid.Cid 292 if entry.Cid != "" { 293 if err := rm.s.db.Create(&entry, []clause.Expression{clause.OnConflict{ 294 Columns: []clause.Column{{Name: "did"}, {Name: "nsid"}, {Name: "rkey"}}, ··· 297 return nil, err 298 } 299 300 cids, err = rm.incrementBlobRefs(urepo, entry.Value) 301 if err != nil { 302 return nil, err 303 } 304 } else { 305 if err := rm.s.db.Delete(&entry, nil).Error; err != nil { 306 return nil, err 307 } 308 cids, err = rm.decrementBlobRefs(urepo, entry.Value) 309 if err != nil { 310 return nil, err 311 } 312 } 313 314 for _, c := range cids { 315 blobs = append(blobs, lexutil.LexLink(c)) 316 } 317 } 318 319 - rm.s.evtman.AddEvent(context.TODO(), &events.XRPCStreamEvent{ 320 RepoCommit: &atproto.SyncSubscribeRepos_Commit{ 321 Repo: urepo.Did, 322 Blocks: buf.Bytes(), ··· 330 }, 331 }) 332 333 - if err := rm.s.UpdateRepo(context.TODO(), urepo.Did, newroot, rev); err != nil { 334 return nil, err 335 } 336 ··· 345 return results, nil 346 } 347 348 - func (rm *RepoMan) getRecordProof(urepo models.Repo, collection, rkey string) (cid.Cid, []blocks.Block, error) { 349 c, err := cid.Cast(urepo.Root) 350 if err != nil { 351 return cid.Undef, nil, err ··· 354 dbs := rm.s.getBlockstore(urepo.Did) 355 bs := recording_blockstore.New(dbs) 356 357 - r, err := repo.OpenRepo(context.TODO(), bs, c) 358 if err != nil { 359 return cid.Undef, nil, err 360 } 361 362 - _, _, err = r.GetRecordBytes(context.TODO(), collection+"/"+rkey) 363 if err != nil { 364 return cid.Undef, nil, err 365 } ··· 397 return nil, err 398 } 399 400 if res.Count == 0 { 401 if err := rm.db.Exec("DELETE FROM blobs WHERE id = ?", nil, res.ID).Error; err != nil { 402 return nil, err
··· 96 } 97 98 // TODO make use of swap commit 99 + func (rm *RepoMan) applyWrites(ctx context.Context, urepo models.Repo, writes []Op, swapCommit *string) ([]ApplyWriteResult, error) { 100 rootcid, err := cid.Cast(urepo.Root) 101 if err != nil { 102 return nil, err ··· 104 105 dbs := rm.s.getBlockstore(urepo.Did) 106 bs := recording_blockstore.New(dbs) 107 + r, err := repo.OpenRepo(ctx, bs, rootcid) 108 109 var results []ApplyWriteResult 110 111 + entries := make([]models.Record, 0, len(writes)) 112 for i, op := range writes { 113 + // updates or deletes must supply an rkey 114 if op.Type != OpTypeCreate && op.Rkey == nil { 115 return nil, fmt.Errorf("invalid rkey") 116 } else if op.Type == OpTypeCreate && op.Rkey != nil { 117 + // we should conver this op to an update if the rkey already exists 118 + _, _, err := r.GetRecord(ctx, fmt.Sprintf("%s/%s", op.Collection, *op.Rkey)) 119 if err == nil { 120 op.Type = OpTypeUpdate 121 } 122 } else if op.Rkey == nil { 123 + // creates that don't supply an rkey will have one generated for them 124 op.Rkey = to.StringPtr(rm.clock.Next().String()) 125 writes[i].Rkey = op.Rkey 126 } 127 128 + // validate the record key is actually valid 129 _, err := syntax.ParseRecordKey(*op.Rkey) 130 if err != nil { 131 return nil, err ··· 133 134 switch op.Type { 135 case OpTypeCreate: 136 + // HACK: this fixes some type conversions, mainly around integers 137 + // first we convert to json bytes 138 + b, err := json.Marshal(*op.Record) 139 if err != nil { 140 return nil, err 141 } 142 + // then we use atdata.UnmarshalJSON to convert it back to a map 143 + out, err := atdata.UnmarshalJSON(b) 144 if err != nil { 145 return nil, err 146 } 147 + // finally we can cast to a MarshalableMap 148 mm := MarshalableMap(out) 149 150 // HACK: if a record doesn't contain a $type, we can manually set it here based on the op's collection 151 + // i forget why this is actually necessary? 152 if mm["$type"] == "" { 153 mm["$type"] = op.Collection 154 } 155 156 + nc, err := r.PutRecord(ctx, fmt.Sprintf("%s/%s", op.Collection, *op.Rkey), &mm) 157 if err != nil { 158 return nil, err 159 } 160 + 161 d, err := atdata.MarshalCBOR(mm) 162 if err != nil { 163 return nil, err 164 } 165 + 166 entries = append(entries, models.Record{ 167 Did: urepo.Did, 168 CreatedAt: rm.clock.Next().String(), ··· 171 Cid: nc.String(), 172 Value: d, 173 }) 174 + 175 results = append(results, ApplyWriteResult{ 176 Type: to.StringPtr(OpTypeCreate.String()), 177 Uri: to.StringPtr("at://" + urepo.Did + "/" + op.Collection + "/" + *op.Rkey), ··· 179 ValidationStatus: to.StringPtr("valid"), // TODO: obviously this might not be true atm lol 180 }) 181 case OpTypeDelete: 182 + // try to find the old record in the database 183 var old models.Record 184 if err := rm.db.Raw("SELECT value FROM records WHERE did = ? AND nsid = ? AND rkey = ?", nil, urepo.Did, op.Collection, op.Rkey).Scan(&old).Error; err != nil { 185 return nil, err 186 } 187 + 188 + // TODO: this is really confusing, and looking at it i have no idea why i did this. below when we are doing deletes, we 189 + // check if `cid` here is nil to indicate if we should delete. that really doesn't make much sense and its super illogical 190 + // when reading this code. i dont feel like fixing right now though so 191 entries = append(entries, models.Record{ 192 Did: urepo.Did, 193 Nsid: op.Collection, 194 Rkey: *op.Rkey, 195 Value: old.Value, 196 }) 197 + 198 + // delete the record from the repo 199 + err := r.DeleteRecord(ctx, fmt.Sprintf("%s/%s", op.Collection, *op.Rkey)) 200 if err != nil { 201 return nil, err 202 } 203 + 204 + // add a result for the delete 205 results = append(results, ApplyWriteResult{ 206 Type: to.StringPtr(OpTypeDelete.String()), 207 }) 208 case OpTypeUpdate: 209 + // HACK: same hack as above for type fixes 210 + b, err := json.Marshal(*op.Record) 211 if err != nil { 212 return nil, err 213 } 214 + out, err := atdata.UnmarshalJSON(b) 215 if err != nil { 216 return nil, err 217 } 218 mm := MarshalableMap(out) 219 + 220 + nc, err := r.UpdateRecord(ctx, fmt.Sprintf("%s/%s", op.Collection, *op.Rkey), &mm) 221 if err != nil { 222 return nil, err 223 } 224 + 225 d, err := atdata.MarshalCBOR(mm) 226 if err != nil { 227 return nil, err 228 } 229 + 230 entries = append(entries, models.Record{ 231 Did: urepo.Did, 232 CreatedAt: rm.clock.Next().String(), ··· 235 Cid: nc.String(), 236 Value: d, 237 }) 238 + 239 results = append(results, ApplyWriteResult{ 240 Type: to.StringPtr(OpTypeUpdate.String()), 241 Uri: to.StringPtr("at://" + urepo.Did + "/" + op.Collection + "/" + *op.Rkey), ··· 245 } 246 } 247 248 + // commit and get the new root 249 + newroot, rev, err := r.Commit(ctx, urepo.SignFor) 250 if err != nil { 251 return nil, err 252 } 253 254 + // create a buffer for dumping our new cbor into 255 buf := new(bytes.Buffer) 256 257 + // first write the car header to the buffer 258 hb, err := cbor.DumpObject(&car.CarHeader{ 259 Roots: []cid.Cid{newroot}, 260 Version: 1, 261 }) 262 if _, err := carstore.LdWrite(buf, hb); err != nil { 263 return nil, err 264 } 265 266 + // get a diff of the changes to the repo 267 + diffops, err := r.DiffSince(ctx, rootcid) 268 if err != nil { 269 return nil, err 270 } 271 272 + // create the repo ops for the given diff 273 ops := make([]*atproto.SyncSubscribeRepos_RepoOp, 0, len(diffops)) 274 for _, op := range diffops { 275 var c cid.Cid 276 switch op.Op { ··· 299 }) 300 } 301 302 + blk, err := dbs.Get(ctx, c) 303 if err != nil { 304 return nil, err 305 } 306 307 + // write the block to the buffer 308 if _, err := carstore.LdWrite(buf, blk.Cid().Bytes(), blk.RawData()); err != nil { 309 return nil, err 310 } 311 } 312 313 + // write the writelog to the buffer 314 for _, op := range bs.GetWriteLog() { 315 if _, err := carstore.LdWrite(buf, op.Cid().Bytes(), op.RawData()); err != nil { 316 return nil, err 317 } 318 } 319 320 + // blob blob blob blob blob :3 321 var blobs []lexutil.LexLink 322 for _, entry := range entries { 323 var cids []cid.Cid 324 + // whenever there is cid present, we know it's a create (dumb) 325 if entry.Cid != "" { 326 if err := rm.s.db.Create(&entry, []clause.Expression{clause.OnConflict{ 327 Columns: []clause.Column{{Name: "did"}, {Name: "nsid"}, {Name: "rkey"}}, ··· 330 return nil, err 331 } 332 333 + // increment the given blob refs, yay 334 cids, err = rm.incrementBlobRefs(urepo, entry.Value) 335 if err != nil { 336 return nil, err 337 } 338 } else { 339 + // as i noted above this is dumb. but we delete whenever the cid is nil. it works solely becaue the pkey 340 + // is did + collection + rkey. i still really want to separate that out, or use a different type to make 341 + // this less confusing/easy to read. alas, its 2 am and yea no 342 if err := rm.s.db.Delete(&entry, nil).Error; err != nil { 343 return nil, err 344 } 345 + 346 + // TODO: 347 cids, err = rm.decrementBlobRefs(urepo, entry.Value) 348 if err != nil { 349 return nil, err 350 } 351 } 352 353 + // add all the relevant blobs to the blobs list of blobs. blob ^.^ 354 for _, c := range cids { 355 blobs = append(blobs, lexutil.LexLink(c)) 356 } 357 } 358 359 + // NOTE: using the request ctx seems a bit suss here, so using a background context. i'm not sure if this 360 + // runs sync or not 361 + rm.s.evtman.AddEvent(context.Background(), &events.XRPCStreamEvent{ 362 RepoCommit: &atproto.SyncSubscribeRepos_Commit{ 363 Repo: urepo.Did, 364 Blocks: buf.Bytes(), ··· 372 }, 373 }) 374 375 + if err := rm.s.UpdateRepo(ctx, urepo.Did, newroot, rev); err != nil { 376 return nil, err 377 } 378 ··· 387 return results, nil 388 } 389 390 + // this is a fun little guy. to get a proof, we need to read the record out of the blockstore and record how we actually 391 + // got to the guy. we'll wrap a new blockstore in a recording blockstore, then return the log for proof 392 + func (rm *RepoMan) getRecordProof(ctx context.Context, urepo models.Repo, collection, rkey string) (cid.Cid, []blocks.Block, error) { 393 c, err := cid.Cast(urepo.Root) 394 if err != nil { 395 return cid.Undef, nil, err ··· 398 dbs := rm.s.getBlockstore(urepo.Did) 399 bs := recording_blockstore.New(dbs) 400 401 + r, err := repo.OpenRepo(ctx, bs, c) 402 if err != nil { 403 return cid.Undef, nil, err 404 } 405 406 + _, _, err = r.GetRecordBytes(ctx, fmt.Sprintf("%s/%s", collection, rkey)) 407 if err != nil { 408 return cid.Undef, nil, err 409 } ··· 441 return nil, err 442 } 443 444 + // TODO: this does _not_ handle deletions of blobs that are on s3 storage!!!! we need to get the blob, see what 445 + // storage it is in, and clean up s3!!!! 446 if res.Count == 0 { 447 if err := rm.db.Exec("DELETE FROM blobs WHERE id = ?", nil, res.ID).Error; err != nil { 448 return nil, err