An atproto PDS written in Go
103
fork

Configure Feed

Select the types of activity you want to include in your feed.

at v0.8.4 118 lines 2.9 kB view raw
1package server 2 3import ( 4 "bytes" 5 "context" 6 "io" 7 "slices" 8 "strings" 9 10 "github.com/bluesky-social/indigo/atproto/syntax" 11 "github.com/bluesky-social/indigo/repo" 12 "github.com/haileyok/cocoon/internal/helpers" 13 "github.com/haileyok/cocoon/models" 14 blocks "github.com/ipfs/go-block-format" 15 "github.com/ipfs/go-cid" 16 "github.com/ipld/go-car" 17 "github.com/labstack/echo/v4" 18) 19 20func (s *Server) handleRepoImportRepo(e echo.Context) error { 21 ctx := e.Request().Context() 22 logger := s.logger.With("name", "handleImportRepo") 23 24 urepo := e.Get("repo").(*models.RepoActor) 25 26 b, err := io.ReadAll(e.Request().Body) 27 if err != nil { 28 logger.Error("could not read bytes in import request", "error", err) 29 return helpers.ServerError(e, nil) 30 } 31 32 bs := s.getBlockstore(urepo.Repo.Did) 33 34 cs, err := car.NewCarReader(bytes.NewReader(b)) 35 if err != nil { 36 logger.Error("could not read car in import request", "error", err) 37 return helpers.ServerError(e, nil) 38 } 39 40 orderedBlocks := []blocks.Block{} 41 currBlock, err := cs.Next() 42 if err != nil { 43 logger.Error("could not get first block from car", "error", err) 44 return helpers.ServerError(e, nil) 45 } 46 currBlockCt := 1 47 48 for currBlock != nil { 49 logger.Info("someone is importing their repo", "block", currBlockCt) 50 orderedBlocks = append(orderedBlocks, currBlock) 51 next, _ := cs.Next() 52 currBlock = next 53 currBlockCt++ 54 } 55 56 slices.Reverse(orderedBlocks) 57 58 if err := bs.PutMany(context.TODO(), orderedBlocks); err != nil { 59 logger.Error("could not insert blocks", "error", err) 60 return helpers.ServerError(e, nil) 61 } 62 63 r, err := repo.OpenRepo(context.TODO(), bs, cs.Header.Roots[0]) 64 if err != nil { 65 logger.Error("could not open repo", "error", err) 66 return helpers.ServerError(e, nil) 67 } 68 69 tx := s.db.Begin(ctx) 70 71 clock := syntax.NewTIDClock(0) 72 73 if err := r.ForEach(context.TODO(), "", func(key string, cid cid.Cid) error { 74 pts := strings.Split(key, "/") 75 nsid := pts[0] 76 rkey := pts[1] 77 cidStr := cid.String() 78 b, err := bs.Get(context.TODO(), cid) 79 if err != nil { 80 logger.Error("record bytes don't exist in blockstore", "error", err) 81 return helpers.ServerError(e, nil) 82 } 83 84 rec := models.Record{ 85 Did: urepo.Repo.Did, 86 CreatedAt: clock.Next().String(), 87 Nsid: nsid, 88 Rkey: rkey, 89 Cid: cidStr, 90 Value: b.RawData(), 91 } 92 93 if err := tx.Save(rec).Error; err != nil { 94 return err 95 } 96 97 return nil 98 }); err != nil { 99 tx.Rollback() 100 logger.Error("record bytes don't exist in blockstore", "error", err) 101 return helpers.ServerError(e, nil) 102 } 103 104 tx.Commit() 105 106 root, rev, err := r.Commit(context.TODO(), urepo.SignFor) 107 if err != nil { 108 logger.Error("error committing", "error", err) 109 return helpers.ServerError(e, nil) 110 } 111 112 if err := s.UpdateRepo(context.TODO(), urepo.Repo.Did, root, rev); err != nil { 113 logger.Error("error updating repo after commit", "error", err) 114 return helpers.ServerError(e, nil) 115 } 116 117 return nil 118}