tidy: cleanup write records #1

merged
opened by hailey.at targeting main from hailey/cleanup-write-records

went through and did a bunch of things

  • propagated contexts and got rid of a bunch of context.TODO()
  • identified a few things that are missing or rather gross atm and we should go back and fix later
  • cleaned up a few other places that i didnt like...
+1 -1
go.mod
··· 31 31 github.com/whyrusleeping/cbor-gen v0.2.1-0.20241030202151-b7a6831be65e 32 32 gitlab.com/yawning/secp256k1-voi v0.0.0-20230925100816-f2616030848b 33 33 golang.org/x/crypto v0.38.0 34 + gorm.io/driver/postgres v1.5.7 34 35 gorm.io/driver/sqlite v1.5.7 35 36 gorm.io/gorm v1.25.12 36 37 ) ··· 130 131 google.golang.org/protobuf v1.36.6 // indirect 131 132 gopkg.in/go-playground/assert.v1 v1.2.1 // indirect 132 133 gopkg.in/inf.v0 v0.9.1 // indirect 133 - gorm.io/driver/postgres v1.5.7 // indirect 134 134 lukechampine.com/blake3 v1.2.1 // indirect 135 135 )
+9 -7
server/handle_repo_apply_writes.go
··· 6 6 "github.com/labstack/echo/v4" 7 7 ) 8 8 9 - type ComAtprotoRepoApplyWritesRequest struct { 9 + type ComAtprotoRepoApplyWritesInput struct { 10 10 Repo string `json:"repo" validate:"required,atproto-did"` 11 11 Validate *bool `json:"bool,omitempty"` 12 12 Writes []ComAtprotoRepoApplyWritesItem `json:"writes"` ··· 20 20 Value *MarshalableMap `json:"value,omitempty"` 21 21 } 22 22 23 - type ComAtprotoRepoApplyWritesResponse struct { 23 + type ComAtprotoRepoApplyWritesOutput struct { 24 24 Commit RepoCommit `json:"commit"` 25 25 Results []ApplyWriteResult `json:"results"` 26 26 } 27 27 28 28 func (s *Server) handleApplyWrites(e echo.Context) error { 29 - repo := e.Get("repo").(*models.RepoActor) 29 + ctx := e.Request().Context() 30 30 31 - var req ComAtprotoRepoApplyWritesRequest 31 + var req ComAtprotoRepoApplyWritesInput 32 32 if err := e.Bind(&req); err != nil { 33 33 s.logger.Error("error binding", "error", err) 34 34 return helpers.ServerError(e, nil) ··· 39 39 return helpers.InputError(e, nil) 40 40 } 41 41 42 + repo := e.Get("repo").(*models.RepoActor) 43 + 42 44 if repo.Repo.Did != req.Repo { 43 45 s.logger.Warn("mismatched repo/auth") 44 46 return helpers.InputError(e, nil) 45 47 } 46 48 47 - ops := []Op{} 49 + ops := make([]Op, 0, len(req.Writes)) 48 50 for _, item := range req.Writes { 49 51 ops = append(ops, Op{ 50 52 Type: OpType(item.Type), ··· 54 56 }) 55 57 } 56 58 57 - results, err := s.repoman.applyWrites(repo.Repo, ops, req.SwapCommit) 59 + results, err := s.repoman.applyWrites(ctx, repo.Repo, ops, req.SwapCommit) 58 60 if err != nil { 59 61 s.logger.Error("error applying writes", "error", err) 60 62 return helpers.ServerError(e, nil) ··· 66 68 results[i].Commit = nil 67 69 } 68 70 69 - return e.JSON(200, ComAtprotoRepoApplyWritesResponse{ 71 + return e.JSON(200, ComAtprotoRepoApplyWritesOutput{ 70 72 Commit: commit, 71 73 Results: results, 72 74 })
+5 -3
server/handle_repo_create_record.go
··· 6 6 "github.com/labstack/echo/v4" 7 7 ) 8 8 9 - type ComAtprotoRepoCreateRecordRequest struct { 9 + type ComAtprotoRepoCreateRecordInput struct { 10 10 Repo string `json:"repo" validate:"required,atproto-did"` 11 11 Collection string `json:"collection" validate:"required,atproto-nsid"` 12 12 Rkey *string `json:"rkey,omitempty"` ··· 17 17 } 18 18 19 19 func (s *Server) handleCreateRecord(e echo.Context) error { 20 + ctx := e.Request().Context() 21 + 20 22 repo := e.Get("repo").(*models.RepoActor) 21 23 22 - var req ComAtprotoRepoCreateRecordRequest 24 + var req ComAtprotoRepoCreateRecordInput 23 25 if err := e.Bind(&req); err != nil { 24 26 s.logger.Error("error binding", "error", err) 25 27 return helpers.ServerError(e, nil) ··· 40 42 optype = OpTypeUpdate 41 43 } 42 44 43 - results, err := s.repoman.applyWrites(repo.Repo, []Op{ 45 + results, err := s.repoman.applyWrites(ctx, repo.Repo, []Op{ 44 46 { 45 47 Type: optype, 46 48 Collection: req.Collection,
+5 -3
server/handle_repo_delete_record.go
··· 6 6 "github.com/labstack/echo/v4" 7 7 ) 8 8 9 - type ComAtprotoRepoDeleteRecordRequest struct { 9 + type ComAtprotoRepoDeleteRecordInput struct { 10 10 Repo string `json:"repo" validate:"required,atproto-did"` 11 11 Collection string `json:"collection" validate:"required,atproto-nsid"` 12 12 Rkey string `json:"rkey" validate:"required,atproto-rkey"` ··· 15 15 } 16 16 17 17 func (s *Server) handleDeleteRecord(e echo.Context) error { 18 + ctx := e.Request().Context() 19 + 18 20 repo := e.Get("repo").(*models.RepoActor) 19 21 20 - var req ComAtprotoRepoDeleteRecordRequest 22 + var req ComAtprotoRepoDeleteRecordInput 21 23 if err := e.Bind(&req); err != nil { 22 24 s.logger.Error("error binding", "error", err) 23 25 return helpers.ServerError(e, nil) ··· 33 35 return helpers.InputError(e, nil) 34 36 } 35 37 36 - results, err := s.repoman.applyWrites(repo.Repo, []Op{ 38 + results, err := s.repoman.applyWrites(ctx, repo.Repo, []Op{ 37 39 { 38 40 Type: OpTypeDelete, 39 41 Collection: req.Collection,
+5 -3
server/handle_repo_put_record.go
··· 6 6 "github.com/labstack/echo/v4" 7 7 ) 8 8 9 - type ComAtprotoRepoPutRecordRequest struct { 9 + type ComAtprotoRepoPutRecordInput struct { 10 10 Repo string `json:"repo" validate:"required,atproto-did"` 11 11 Collection string `json:"collection" validate:"required,atproto-nsid"` 12 12 Rkey string `json:"rkey" validate:"required,atproto-rkey"` ··· 17 17 } 18 18 19 19 func (s *Server) handlePutRecord(e echo.Context) error { 20 + ctx := e.Request().Context() 21 + 20 22 repo := e.Get("repo").(*models.RepoActor) 21 23 22 - var req ComAtprotoRepoPutRecordRequest 24 + var req ComAtprotoRepoPutRecordInput 23 25 if err := e.Bind(&req); err != nil { 24 26 s.logger.Error("error binding", "error", err) 25 27 return helpers.ServerError(e, nil) ··· 40 42 optype = OpTypeUpdate 41 43 } 42 44 43 - results, err := s.repoman.applyWrites(repo.Repo, []Op{ 45 + results, err := s.repoman.applyWrites(ctx, repo.Repo, []Op{ 44 46 { 45 47 Type: optype, 46 48 Collection: req.Collection,
+3 -1
server/handle_sync_get_record.go
··· 13 13 ) 14 14 15 15 func (s *Server) handleSyncGetRecord(e echo.Context) error { 16 + ctx := e.Request().Context() 17 + 16 18 did := e.QueryParam("did") 17 19 collection := e.QueryParam("collection") 18 20 rkey := e.QueryParam("rkey") ··· 23 25 return helpers.ServerError(e, nil) 24 26 } 25 27 26 - root, blocks, err := s.repoman.getRecordProof(urepo, collection, rkey) 28 + root, blocks, err := s.repoman.getRecordProof(ctx, urepo, collection, rkey) 27 29 if err != nil { 28 30 return err 29 31 }
+67 -21
server/repo.go
··· 96 96 } 97 97 98 98 // TODO make use of swap commit 99 - func (rm *RepoMan) applyWrites(urepo models.Repo, writes []Op, swapCommit *string) ([]ApplyWriteResult, error) { 99 + func (rm *RepoMan) applyWrites(ctx context.Context, urepo models.Repo, writes []Op, swapCommit *string) ([]ApplyWriteResult, error) { 100 100 rootcid, err := cid.Cast(urepo.Root) 101 101 if err != nil { 102 102 return nil, err ··· 104 104 105 105 dbs := rm.s.getBlockstore(urepo.Did) 106 106 bs := recording_blockstore.New(dbs) 107 - r, err := repo.OpenRepo(context.TODO(), bs, rootcid) 107 + r, err := repo.OpenRepo(ctx, bs, rootcid) 108 108 109 - entries := []models.Record{} 110 109 var results []ApplyWriteResult 111 110 111 + entries := make([]models.Record, 0, len(writes)) 112 112 for i, op := range writes { 113 + // updates or deletes must supply an rkey 113 114 if op.Type != OpTypeCreate && op.Rkey == nil { 114 115 return nil, fmt.Errorf("invalid rkey") 115 116 } else if op.Type == OpTypeCreate && op.Rkey != nil { 116 - _, _, err := r.GetRecord(context.TODO(), op.Collection+"/"+*op.Rkey) 117 + // we should conver this op to an update if the rkey already exists 118 + _, _, err := r.GetRecord(ctx, fmt.Sprintf("%s/%s", op.Collection, *op.Rkey)) 117 119 if err == nil { 118 120 op.Type = OpTypeUpdate 119 121 } 120 122 } else if op.Rkey == nil { 123 + // creates that don't supply an rkey will have one generated for them 121 124 op.Rkey = to.StringPtr(rm.clock.Next().String()) 122 125 writes[i].Rkey = op.Rkey 123 126 } 124 127 128 + // validate the record key is actually valid 125 129 _, err := syntax.ParseRecordKey(*op.Rkey) 126 130 if err != nil { 127 131 return nil, err ··· 129 133 130 134 switch op.Type { 131 135 case OpTypeCreate: 132 - j, err := json.Marshal(*op.Record) 136 + // HACK: this fixes some type conversions, mainly around integers 137 + // first we convert to json bytes 138 + b, err := json.Marshal(*op.Record) 133 139 if err != nil { 134 140 return nil, err 135 141 } 136 - out, err := atdata.UnmarshalJSON(j) 142 + // then we use atdata.UnmarshalJSON to convert it back to a map 143 + out, err := atdata.UnmarshalJSON(b) 137 144 if err != nil { 138 145 return nil, err 139 146 } 147 + // finally we can cast to a MarshalableMap 140 148 mm := MarshalableMap(out) 141 149 142 150 // HACK: if a record doesn't contain a $type, we can manually set it here based on the op's collection 151 + // i forget why this is actually necessary? 143 152 if mm["$type"] == "" { 144 153 mm["$type"] = op.Collection 145 154 } 146 155 147 - nc, err := r.PutRecord(context.TODO(), op.Collection+"/"+*op.Rkey, &mm) 156 + nc, err := r.PutRecord(ctx, fmt.Sprintf("%s/%s", op.Collection, *op.Rkey), &mm) 148 157 if err != nil { 149 158 return nil, err 150 159 } 160 + 151 161 d, err := atdata.MarshalCBOR(mm) 152 162 if err != nil { 153 163 return nil, err 154 164 } 165 + 155 166 entries = append(entries, models.Record{ 156 167 Did: urepo.Did, 157 168 CreatedAt: rm.clock.Next().String(), ··· 160 171 Cid: nc.String(), 161 172 Value: d, 162 173 }) 174 + 163 175 results = append(results, ApplyWriteResult{ 164 176 Type: to.StringPtr(OpTypeCreate.String()), 165 177 Uri: to.StringPtr("at://" + urepo.Did + "/" + op.Collection + "/" + *op.Rkey), ··· 167 179 ValidationStatus: to.StringPtr("valid"), // TODO: obviously this might not be true atm lol 168 180 }) 169 181 case OpTypeDelete: 182 + // try to find the old record in the database 170 183 var old models.Record 171 184 if err := rm.db.Raw("SELECT value FROM records WHERE did = ? AND nsid = ? AND rkey = ?", nil, urepo.Did, op.Collection, op.Rkey).Scan(&old).Error; err != nil { 172 185 return nil, err 173 186 } 187 + 188 + // TODO: this is really confusing, and looking at it i have no idea why i did this. below when we are doing deletes, we 189 + // check if `cid` here is nil to indicate if we should delete. that really doesn't make much sense and its super illogical 190 + // when reading this code. i dont feel like fixing right now though so 174 191 entries = append(entries, models.Record{ 175 192 Did: urepo.Did, 176 193 Nsid: op.Collection, 177 194 Rkey: *op.Rkey, 178 195 Value: old.Value, 179 196 }) 180 - err := r.DeleteRecord(context.TODO(), op.Collection+"/"+*op.Rkey) 197 + 198 + // delete the record from the repo 199 + err := r.DeleteRecord(ctx, fmt.Sprintf("%s/%s", op.Collection, *op.Rkey)) 181 200 if err != nil { 182 201 return nil, err 183 202 } 203 + 204 + // add a result for the delete 184 205 results = append(results, ApplyWriteResult{ 185 206 Type: to.StringPtr(OpTypeDelete.String()), 186 207 }) 187 208 case OpTypeUpdate: 188 - j, err := json.Marshal(*op.Record) 209 + // HACK: same hack as above for type fixes 210 + b, err := json.Marshal(*op.Record) 189 211 if err != nil { 190 212 return nil, err 191 213 } 192 - out, err := atdata.UnmarshalJSON(j) 214 + out, err := atdata.UnmarshalJSON(b) 193 215 if err != nil { 194 216 return nil, err 195 217 } 196 218 mm := MarshalableMap(out) 197 - nc, err := r.UpdateRecord(context.TODO(), op.Collection+"/"+*op.Rkey, &mm) 219 + 220 + nc, err := r.UpdateRecord(ctx, fmt.Sprintf("%s/%s", op.Collection, *op.Rkey), &mm) 198 221 if err != nil { 199 222 return nil, err 200 223 } 224 + 201 225 d, err := atdata.MarshalCBOR(mm) 202 226 if err != nil { 203 227 return nil, err 204 228 } 229 + 205 230 entries = append(entries, models.Record{ 206 231 Did: urepo.Did, 207 232 CreatedAt: rm.clock.Next().String(), ··· 210 235 Cid: nc.String(), 211 236 Value: d, 212 237 }) 238 + 213 239 results = append(results, ApplyWriteResult{ 214 240 Type: to.StringPtr(OpTypeUpdate.String()), 215 241 Uri: to.StringPtr("at://" + urepo.Did + "/" + op.Collection + "/" + *op.Rkey), ··· 219 245 } 220 246 } 221 247 222 - newroot, rev, err := r.Commit(context.TODO(), urepo.SignFor) 248 + // commit and get the new root 249 + newroot, rev, err := r.Commit(ctx, urepo.SignFor) 223 250 if err != nil { 224 251 return nil, err 225 252 } 226 253 254 + // create a buffer for dumping our new cbor into 227 255 buf := new(bytes.Buffer) 228 256 257 + // first write the car header to the buffer 229 258 hb, err := cbor.DumpObject(&car.CarHeader{ 230 259 Roots: []cid.Cid{newroot}, 231 260 Version: 1, 232 261 }) 233 - 234 262 if _, err := carstore.LdWrite(buf, hb); err != nil { 235 263 return nil, err 236 264 } 237 265 238 - diffops, err := r.DiffSince(context.TODO(), rootcid) 266 + // get a diff of the changes to the repo 267 + diffops, err := r.DiffSince(ctx, rootcid) 239 268 if err != nil { 240 269 return nil, err 241 270 } 242 271 272 + // create the repo ops for the given diff 243 273 ops := make([]*atproto.SyncSubscribeRepos_RepoOp, 0, len(diffops)) 244 - 245 274 for _, op := range diffops { 246 275 var c cid.Cid 247 276 switch op.Op { ··· 270 299 }) 271 300 } 272 301 273 - blk, err := dbs.Get(context.TODO(), c) 302 + blk, err := dbs.Get(ctx, c) 274 303 if err != nil { 275 304 return nil, err 276 305 } 277 306 307 + // write the block to the buffer 278 308 if _, err := carstore.LdWrite(buf, blk.Cid().Bytes(), blk.RawData()); err != nil { 279 309 return nil, err 280 310 } 281 311 } 282 312 313 + // write the writelog to the buffer 283 314 for _, op := range bs.GetWriteLog() { 284 315 if _, err := carstore.LdWrite(buf, op.Cid().Bytes(), op.RawData()); err != nil { 285 316 return nil, err 286 317 } 287 318 } 288 319 320 + // blob blob blob blob blob :3 289 321 var blobs []lexutil.LexLink 290 322 for _, entry := range entries { 291 323 var cids []cid.Cid 324 + // whenever there is cid present, we know it's a create (dumb) 292 325 if entry.Cid != "" { 293 326 if err := rm.s.db.Create(&entry, []clause.Expression{clause.OnConflict{ 294 327 Columns: []clause.Column{{Name: "did"}, {Name: "nsid"}, {Name: "rkey"}}, ··· 297 330 return nil, err 298 331 } 299 332 333 + // increment the given blob refs, yay 300 334 cids, err = rm.incrementBlobRefs(urepo, entry.Value) 301 335 if err != nil { 302 336 return nil, err 303 337 } 304 338 } else { 339 + // as i noted above this is dumb. but we delete whenever the cid is nil. it works solely becaue the pkey 340 + // is did + collection + rkey. i still really want to separate that out, or use a different type to make 341 + // this less confusing/easy to read. alas, its 2 am and yea no 305 342 if err := rm.s.db.Delete(&entry, nil).Error; err != nil { 306 343 return nil, err 307 344 } 345 + 346 + // TODO: 308 347 cids, err = rm.decrementBlobRefs(urepo, entry.Value) 309 348 if err != nil { 310 349 return nil, err 311 350 } 312 351 } 313 352 353 + // add all the relevant blobs to the blobs list of blobs. blob ^.^ 314 354 for _, c := range cids { 315 355 blobs = append(blobs, lexutil.LexLink(c)) 316 356 } 317 357 } 318 358 319 - rm.s.evtman.AddEvent(context.TODO(), &events.XRPCStreamEvent{ 359 + // NOTE: using the request ctx seems a bit suss here, so using a background context. i'm not sure if this 360 + // runs sync or not 361 + rm.s.evtman.AddEvent(context.Background(), &events.XRPCStreamEvent{ 320 362 RepoCommit: &atproto.SyncSubscribeRepos_Commit{ 321 363 Repo: urepo.Did, 322 364 Blocks: buf.Bytes(), ··· 330 372 }, 331 373 }) 332 374 333 - if err := rm.s.UpdateRepo(context.TODO(), urepo.Did, newroot, rev); err != nil { 375 + if err := rm.s.UpdateRepo(ctx, urepo.Did, newroot, rev); err != nil { 334 376 return nil, err 335 377 } 336 378 ··· 345 387 return results, nil 346 388 } 347 389 348 - func (rm *RepoMan) getRecordProof(urepo models.Repo, collection, rkey string) (cid.Cid, []blocks.Block, error) { 390 + // this is a fun little guy. to get a proof, we need to read the record out of the blockstore and record how we actually 391 + // got to the guy. we'll wrap a new blockstore in a recording blockstore, then return the log for proof 392 + func (rm *RepoMan) getRecordProof(ctx context.Context, urepo models.Repo, collection, rkey string) (cid.Cid, []blocks.Block, error) { 349 393 c, err := cid.Cast(urepo.Root) 350 394 if err != nil { 351 395 return cid.Undef, nil, err ··· 354 398 dbs := rm.s.getBlockstore(urepo.Did) 355 399 bs := recording_blockstore.New(dbs) 356 400 357 - r, err := repo.OpenRepo(context.TODO(), bs, c) 401 + r, err := repo.OpenRepo(ctx, bs, c) 358 402 if err != nil { 359 403 return cid.Undef, nil, err 360 404 } 361 405 362 - _, _, err = r.GetRecordBytes(context.TODO(), collection+"/"+rkey) 406 + _, _, err = r.GetRecordBytes(ctx, fmt.Sprintf("%s/%s", collection, rkey)) 363 407 if err != nil { 364 408 return cid.Undef, nil, err 365 409 } ··· 397 441 return nil, err 398 442 } 399 443 444 + // TODO: this does _not_ handle deletions of blobs that are on s3 storage!!!! we need to get the blob, see what 445 + // storage it is in, and clean up s3!!!! 400 446 if res.Count == 0 { 401 447 if err := rm.db.Exec("DELETE FROM blobs WHERE id = ?", nil, res.ID).Error; err != nil { 402 448 return nil, err