An atproto PDS written in Go

cleanup context todo

+1
server/handle_account.go
··· 12 12 13 13 func (s *Server) handleAccount(e echo.Context) error { 14 14 ctx := e.Request().Context() 15 + 15 16 repo, sess, err := s.getSessionRepoOrErr(e) 16 17 if err != nil { 17 18 return e.Redirect(303, "/account/signin")
+8 -7
server/handle_import_repo.go
··· 2 2 3 3 import ( 4 4 "bytes" 5 - "context" 6 5 "io" 7 6 "slices" 8 7 "strings" ··· 18 17 ) 19 18 20 19 func (s *Server) handleRepoImportRepo(e echo.Context) error { 20 + ctx := e.Request().Context() 21 + 21 22 urepo := e.Get("repo").(*models.RepoActor) 22 23 23 24 b, err := io.ReadAll(e.Request().Body) ··· 52 53 53 54 slices.Reverse(orderedBlocks) 54 55 55 - if err := bs.PutMany(context.TODO(), orderedBlocks); err != nil { 56 + if err := bs.PutMany(ctx, orderedBlocks); err != nil { 56 57 s.logger.Error("could not insert blocks", "error", err) 57 58 return helpers.ServerError(e, nil) 58 59 } 59 60 60 - r, err := repo.OpenRepo(context.TODO(), bs, cs.Header.Roots[0]) 61 + r, err := repo.OpenRepo(ctx, bs, cs.Header.Roots[0]) 61 62 if err != nil { 62 63 s.logger.Error("could not open repo", "error", err) 63 64 return helpers.ServerError(e, nil) ··· 67 68 68 69 clock := syntax.NewTIDClock(0) 69 70 70 - if err := r.ForEach(context.TODO(), "", func(key string, cid cid.Cid) error { 71 + if err := r.ForEach(ctx, "", func(key string, cid cid.Cid) error { 71 72 pts := strings.Split(key, "/") 72 73 nsid := pts[0] 73 74 rkey := pts[1] 74 75 cidStr := cid.String() 75 - b, err := bs.Get(context.TODO(), cid) 76 + b, err := bs.Get(ctx, cid) 76 77 if err != nil { 77 78 s.logger.Error("record bytes don't exist in blockstore", "error", err) 78 79 return helpers.ServerError(e, nil) ··· 100 101 101 102 tx.Commit() 102 103 103 - root, rev, err := r.Commit(context.TODO(), urepo.SignFor) 104 + root, rev, err := r.Commit(ctx, urepo.SignFor) 104 105 if err != nil { 105 106 s.logger.Error("error committing", "error", err) 106 107 return helpers.ServerError(e, nil) 107 108 } 108 109 109 - if err := s.UpdateRepo(context.TODO(), urepo.Repo.Did, root, rev); err != nil { 110 + if err := s.UpdateRepo(ctx, urepo.Repo.Did, root, rev); err != nil { 110 111 s.logger.Error("error updating repo after commit", "error", err) 111 112 return helpers.ServerError(e, nil) 112 113 }
+3 -1
server/handle_repo_apply_writes.go
··· 26 26 } 27 27 28 28 func (s *Server) handleApplyWrites(e echo.Context) error { 29 + ctx := e.Request().Context() 30 + 29 31 repo := e.Get("repo").(*models.RepoActor) 30 32 31 33 var req ComAtprotoRepoApplyWritesRequest ··· 54 56 }) 55 57 } 56 58 57 - results, err := s.repoman.applyWrites(repo.Repo, ops, req.SwapCommit) 59 + results, err := s.repoman.applyWrites(ctx, repo.Repo, ops, req.SwapCommit) 58 60 if err != nil { 59 61 s.logger.Error("error applying writes", "error", err) 60 62 return helpers.ServerError(e, nil)
+3 -1
server/handle_repo_create_record.go
··· 17 17 } 18 18 19 19 func (s *Server) handleCreateRecord(e echo.Context) error { 20 + ctx := e.Request().Context() 21 + 20 22 repo := e.Get("repo").(*models.RepoActor) 21 23 22 24 var req ComAtprotoRepoCreateRecordRequest ··· 40 42 optype = OpTypeUpdate 41 43 } 42 44 43 - results, err := s.repoman.applyWrites(repo.Repo, []Op{ 45 + results, err := s.repoman.applyWrites(ctx, repo.Repo, []Op{ 44 46 { 45 47 Type: optype, 46 48 Collection: req.Collection,
+3 -1
server/handle_repo_delete_record.go
··· 15 15 } 16 16 17 17 func (s *Server) handleDeleteRecord(e echo.Context) error { 18 + ctx := e.Request().Context() 19 + 18 20 repo := e.Get("repo").(*models.RepoActor) 19 21 20 22 var req ComAtprotoRepoDeleteRecordRequest ··· 33 35 return helpers.InputError(e, nil) 34 36 } 35 37 36 - results, err := s.repoman.applyWrites(repo.Repo, []Op{ 38 + results, err := s.repoman.applyWrites(ctx, repo.Repo, []Op{ 37 39 { 38 40 Type: OpTypeDelete, 39 41 Collection: req.Collection,
+3 -1
server/handle_repo_put_record.go
··· 17 17 } 18 18 19 19 func (s *Server) handlePutRecord(e echo.Context) error { 20 + ctx := e.Request().Context() 21 + 20 22 repo := e.Get("repo").(*models.RepoActor) 21 23 22 24 var req ComAtprotoRepoPutRecordRequest ··· 40 42 optype = OpTypeUpdate 41 43 } 42 44 43 - results, err := s.repoman.applyWrites(repo.Repo, []Op{ 45 + results, err := s.repoman.applyWrites(ctx, repo.Repo, []Op{ 44 46 { 45 47 Type: optype, 46 48 Collection: req.Collection,
+3 -1
server/handle_sync_get_record.go
··· 13 13 ) 14 14 15 15 func (s *Server) handleSyncGetRecord(e echo.Context) error { 16 + ctx := e.Request().Context() 17 + 16 18 did := e.QueryParam("did") 17 19 collection := e.QueryParam("collection") 18 20 rkey := e.QueryParam("rkey") ··· 23 25 return helpers.ServerError(e, nil) 24 26 } 25 27 26 - root, blocks, err := s.repoman.getRecordProof(urepo, collection, rkey) 28 + root, blocks, err := s.repoman.getRecordProof(ctx, urepo, collection, rkey) 27 29 if err != nil { 28 30 return err 29 31 }
+19 -16
server/repo.go
··· 96 96 } 97 97 98 98 // TODO make use of swap commit 99 - func (rm *RepoMan) applyWrites(urepo models.Repo, writes []Op, swapCommit *string) ([]ApplyWriteResult, error) { 99 + func (rm *RepoMan) applyWrites(ctx context.Context, urepo models.Repo, writes []Op, swapCommit *string) ([]ApplyWriteResult, error) { 100 100 rootcid, err := cid.Cast(urepo.Root) 101 101 if err != nil { 102 102 return nil, err ··· 104 104 105 105 dbs := rm.s.getBlockstore(urepo.Did) 106 106 bs := recording_blockstore.New(dbs) 107 - r, err := repo.OpenRepo(context.TODO(), bs, rootcid) 107 + r, err := repo.OpenRepo(ctx, bs, rootcid) 108 108 109 - entries := []models.Record{} 110 - var results []ApplyWriteResult 109 + entries := make([]models.Record, 0, len(writes)) 110 + results := make([]ApplyWriteResult, 0, len(writes)) 111 111 112 112 for i, op := range writes { 113 113 if op.Type != OpTypeCreate && op.Rkey == nil { 114 114 return nil, fmt.Errorf("invalid rkey") 115 115 } else if op.Type == OpTypeCreate && op.Rkey != nil { 116 - _, _, err := r.GetRecord(context.TODO(), op.Collection+"/"+*op.Rkey) 116 + _, _, err := r.GetRecord(ctx, op.Collection+"/"+*op.Rkey) 117 117 if err == nil { 118 118 op.Type = OpTypeUpdate 119 119 } ··· 144 144 mm["$type"] = op.Collection 145 145 } 146 146 147 - nc, err := r.PutRecord(context.TODO(), op.Collection+"/"+*op.Rkey, &mm) 147 + nc, err := r.PutRecord(ctx, op.Collection+"/"+*op.Rkey, &mm) 148 148 if err != nil { 149 149 return nil, err 150 150 } ··· 177 177 Rkey: *op.Rkey, 178 178 Value: old.Value, 179 179 }) 180 - err := r.DeleteRecord(context.TODO(), op.Collection+"/"+*op.Rkey) 180 + err := r.DeleteRecord(ctx, op.Collection+"/"+*op.Rkey) 181 181 if err != nil { 182 182 return nil, err 183 183 } ··· 194 194 return nil, err 195 195 } 196 196 mm := MarshalableMap(out) 197 - nc, err := r.UpdateRecord(context.TODO(), op.Collection+"/"+*op.Rkey, &mm) 197 + nc, err := r.UpdateRecord(ctx, op.Collection+"/"+*op.Rkey, &mm) 198 198 if err != nil { 199 199 return nil, err 200 200 } ··· 219 219 } 220 220 } 221 221 222 - newroot, rev, err := r.Commit(context.TODO(), urepo.SignFor) 222 + newroot, rev, err := r.Commit(ctx, urepo.SignFor) 223 223 if err != nil { 224 224 return nil, err 225 225 } ··· 230 230 Roots: []cid.Cid{newroot}, 231 231 Version: 1, 232 232 }) 233 + if err != nil { 234 + return nil, err 235 + } 233 236 234 237 if _, err := carstore.LdWrite(buf, hb); err != nil { 235 238 return nil, err 236 239 } 237 240 238 - diffops, err := r.DiffSince(context.TODO(), rootcid) 241 + diffops, err := r.DiffSince(ctx, rootcid) 239 242 if err != nil { 240 243 return nil, err 241 244 } ··· 270 273 }) 271 274 } 272 275 273 - blk, err := dbs.Get(context.TODO(), c) 276 + blk, err := dbs.Get(ctx, c) 274 277 if err != nil { 275 278 return nil, err 276 279 } ··· 316 319 } 317 320 } 318 321 319 - rm.s.evtman.AddEvent(context.TODO(), &events.XRPCStreamEvent{ 322 + rm.s.evtman.AddEvent(ctx, &events.XRPCStreamEvent{ 320 323 RepoCommit: &atproto.SyncSubscribeRepos_Commit{ 321 324 Repo: urepo.Did, 322 325 Blocks: buf.Bytes(), ··· 330 333 }, 331 334 }) 332 335 333 - if err := rm.s.UpdateRepo(context.TODO(), urepo.Did, newroot, rev); err != nil { 336 + if err := rm.s.UpdateRepo(ctx, urepo.Did, newroot, rev); err != nil { 334 337 return nil, err 335 338 } 336 339 ··· 345 348 return results, nil 346 349 } 347 350 348 - func (rm *RepoMan) getRecordProof(urepo models.Repo, collection, rkey string) (cid.Cid, []blocks.Block, error) { 351 + func (rm *RepoMan) getRecordProof(ctx context.Context, urepo models.Repo, collection, rkey string) (cid.Cid, []blocks.Block, error) { 349 352 c, err := cid.Cast(urepo.Root) 350 353 if err != nil { 351 354 return cid.Undef, nil, err ··· 354 357 dbs := rm.s.getBlockstore(urepo.Did) 355 358 bs := recording_blockstore.New(dbs) 356 359 357 - r, err := repo.OpenRepo(context.TODO(), bs, c) 360 + r, err := repo.OpenRepo(ctx, bs, c) 358 361 if err != nil { 359 362 return cid.Undef, nil, err 360 363 } 361 364 362 - _, _, err = r.GetRecordBytes(context.TODO(), collection+"/"+rkey) 365 + _, _, err = r.GetRecordBytes(ctx, collection+"/"+rkey) 363 366 if err != nil { 364 367 return cid.Undef, nil, err 365 368 }