An atproto PDS written in Go
at main 2.9 kB view raw
1package server 2 3import ( 4 "bytes" 5 "context" 6 "io" 7 "slices" 8 "strings" 9 10 "github.com/bluesky-social/indigo/atproto/syntax" 11 "github.com/bluesky-social/indigo/repo" 12 "github.com/haileyok/cocoon/internal/helpers" 13 "github.com/haileyok/cocoon/models" 14 blocks "github.com/ipfs/go-block-format" 15 "github.com/ipfs/go-cid" 16 "github.com/ipld/go-car" 17 "github.com/labstack/echo/v4" 18) 19 20func (s *Server) handleRepoImportRepo(e echo.Context) error { 21 ctx := e.Request().Context() 22 logger := s.logger.With("name", "handleImportRepo") 23 24 urepo := e.Get("repo").(*models.RepoActor) 25 26 b, err := io.ReadAll(e.Request().Body) 27 if err != nil { 28 logger.Error("could not read bytes in import request", "error", err) 29 return helpers.ServerError(e, nil) 30 } 31 32 bs := s.getBlockstore(urepo.Repo.Did) 33 34 cs, err := car.NewCarReader(bytes.NewReader(b)) 35 if err != nil { 36 logger.Error("could not read car in import request", "error", err) 37 return helpers.ServerError(e, nil) 38 } 39 40 orderedBlocks := []blocks.Block{} 41 currBlock, err := cs.Next() 42 if err != nil { 43 logger.Error("could not get first block from car", "error", err) 44 return helpers.ServerError(e, nil) 45 } 46 currBlockCt := 1 47 48 for currBlock != nil { 49 logger.Info("someone is importing their repo", "block", currBlockCt) 50 orderedBlocks = append(orderedBlocks, currBlock) 51 next, _ := cs.Next() 52 currBlock = next 53 currBlockCt++ 54 } 55 56 slices.Reverse(orderedBlocks) 57 58 if err := bs.PutMany(context.TODO(), orderedBlocks); err != nil { 59 logger.Error("could not insert blocks", "error", err) 60 return helpers.ServerError(e, nil) 61 } 62 63 r, err := repo.OpenRepo(context.TODO(), bs, cs.Header.Roots[0]) 64 if err != nil { 65 logger.Error("could not open repo", "error", err) 66 return helpers.ServerError(e, nil) 67 } 68 69 tx := s.db.BeginDangerously(ctx) 70 71 clock := syntax.NewTIDClock(0) 72 73 if err := r.ForEach(context.TODO(), "", func(key string, cid cid.Cid) error { 74 pts := strings.Split(key, "/") 75 nsid := pts[0] 76 rkey := pts[1] 77 cidStr := cid.String() 78 b, err := bs.Get(context.TODO(), cid) 79 if err != nil { 80 logger.Error("record bytes don't exist in blockstore", "error", err) 81 return helpers.ServerError(e, nil) 82 } 83 84 rec := models.Record{ 85 Did: urepo.Repo.Did, 86 CreatedAt: clock.Next().String(), 87 Nsid: nsid, 88 Rkey: rkey, 89 Cid: cidStr, 90 Value: b.RawData(), 91 } 92 93 if err := tx.Save(rec).Error; err != nil { 94 return err 95 } 96 97 return nil 98 }); err != nil { 99 tx.Rollback() 100 logger.Error("record bytes don't exist in blockstore", "error", err) 101 return helpers.ServerError(e, nil) 102 } 103 104 tx.Commit() 105 106 root, rev, err := r.Commit(context.TODO(), urepo.SignFor) 107 if err != nil { 108 logger.Error("error committing", "error", err) 109 return helpers.ServerError(e, nil) 110 } 111 112 if err := s.UpdateRepo(context.TODO(), urepo.Repo.Did, root, rev); err != nil { 113 logger.Error("error updating repo after commit", "error", err) 114 return helpers.ServerError(e, nil) 115 } 116 117 return nil 118}