An atproto PDS written in Go
103
fork

Configure Feed

Select the types of activity you want to include in your feed.

at 0.4.4 115 lines 2.9 kB view raw
1package server 2 3import ( 4 "bytes" 5 "context" 6 "io" 7 "slices" 8 "strings" 9 10 "github.com/bluesky-social/indigo/atproto/syntax" 11 "github.com/bluesky-social/indigo/repo" 12 "github.com/haileyok/cocoon/internal/helpers" 13 "github.com/haileyok/cocoon/models" 14 blocks "github.com/ipfs/go-block-format" 15 "github.com/ipfs/go-cid" 16 "github.com/ipld/go-car" 17 "github.com/labstack/echo/v4" 18) 19 20func (s *Server) handleRepoImportRepo(e echo.Context) error { 21 urepo := e.Get("repo").(*models.RepoActor) 22 23 b, err := io.ReadAll(e.Request().Body) 24 if err != nil { 25 s.logger.Error("could not read bytes in import request", "error", err) 26 return helpers.ServerError(e, nil) 27 } 28 29 bs := s.getBlockstore(urepo.Repo.Did) 30 31 cs, err := car.NewCarReader(bytes.NewReader(b)) 32 if err != nil { 33 s.logger.Error("could not read car in import request", "error", err) 34 return helpers.ServerError(e, nil) 35 } 36 37 orderedBlocks := []blocks.Block{} 38 currBlock, err := cs.Next() 39 if err != nil { 40 s.logger.Error("could not get first block from car", "error", err) 41 return helpers.ServerError(e, nil) 42 } 43 currBlockCt := 1 44 45 for currBlock != nil { 46 s.logger.Info("someone is importing their repo", "block", currBlockCt) 47 orderedBlocks = append(orderedBlocks, currBlock) 48 next, _ := cs.Next() 49 currBlock = next 50 currBlockCt++ 51 } 52 53 slices.Reverse(orderedBlocks) 54 55 if err := bs.PutMany(context.TODO(), orderedBlocks); err != nil { 56 s.logger.Error("could not insert blocks", "error", err) 57 return helpers.ServerError(e, nil) 58 } 59 60 r, err := repo.OpenRepo(context.TODO(), bs, cs.Header.Roots[0]) 61 if err != nil { 62 s.logger.Error("could not open repo", "error", err) 63 return helpers.ServerError(e, nil) 64 } 65 66 tx := s.db.BeginDangerously() 67 68 clock := syntax.NewTIDClock(0) 69 70 if err := r.ForEach(context.TODO(), "", func(key string, cid cid.Cid) error { 71 pts := strings.Split(key, "/") 72 nsid := pts[0] 73 rkey := pts[1] 74 cidStr := cid.String() 75 b, err := bs.Get(context.TODO(), cid) 76 if err != nil { 77 s.logger.Error("record bytes don't exist in blockstore", "error", err) 78 return helpers.ServerError(e, nil) 79 } 80 81 rec := models.Record{ 82 Did: urepo.Repo.Did, 83 CreatedAt: clock.Next().String(), 84 Nsid: nsid, 85 Rkey: rkey, 86 Cid: cidStr, 87 Value: b.RawData(), 88 } 89 90 if err := tx.Save(rec).Error; err != nil { 91 return err 92 } 93 94 return nil 95 }); err != nil { 96 tx.Rollback() 97 s.logger.Error("record bytes don't exist in blockstore", "error", err) 98 return helpers.ServerError(e, nil) 99 } 100 101 tx.Commit() 102 103 root, rev, err := r.Commit(context.TODO(), urepo.SignFor) 104 if err != nil { 105 s.logger.Error("error committing", "error", err) 106 return helpers.ServerError(e, nil) 107 } 108 109 if err := s.UpdateRepo(context.TODO(), urepo.Repo.Did, root, rev); err != nil { 110 s.logger.Error("error updating repo after commit", "error", err) 111 return helpers.ServerError(e, nil) 112 } 113 114 return nil 115}