1package server
2
import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"io"
	"slices"
	"strings"

	"github.com/bluesky-social/indigo/atproto/syntax"
	"github.com/bluesky-social/indigo/repo"
	"github.com/haileyok/cocoon/internal/helpers"
	"github.com/haileyok/cocoon/models"
	blocks "github.com/ipfs/go-block-format"
	"github.com/ipfs/go-cid"
	"github.com/ipld/go-car"
	"github.com/labstack/echo/v4"
)
19
20func (s *Server) handleRepoImportRepo(e echo.Context) error {
21 ctx := e.Request().Context()
22 logger := s.logger.With("name", "handleImportRepo")
23
24 urepo := e.Get("repo").(*models.RepoActor)
25
26 b, err := io.ReadAll(e.Request().Body)
27 if err != nil {
28 logger.Error("could not read bytes in import request", "error", err)
29 return helpers.ServerError(e, nil)
30 }
31
32 bs := s.getBlockstore(urepo.Repo.Did)
33
34 cs, err := car.NewCarReader(bytes.NewReader(b))
35 if err != nil {
36 logger.Error("could not read car in import request", "error", err)
37 return helpers.ServerError(e, nil)
38 }
39
40 orderedBlocks := []blocks.Block{}
41 currBlock, err := cs.Next()
42 if err != nil {
43 logger.Error("could not get first block from car", "error", err)
44 return helpers.ServerError(e, nil)
45 }
46 currBlockCt := 1
47
48 for currBlock != nil {
49 logger.Info("someone is importing their repo", "block", currBlockCt)
50 orderedBlocks = append(orderedBlocks, currBlock)
51 next, _ := cs.Next()
52 currBlock = next
53 currBlockCt++
54 }
55
56 slices.Reverse(orderedBlocks)
57
58 if err := bs.PutMany(context.TODO(), orderedBlocks); err != nil {
59 logger.Error("could not insert blocks", "error", err)
60 return helpers.ServerError(e, nil)
61 }
62
63 r, err := repo.OpenRepo(context.TODO(), bs, cs.Header.Roots[0])
64 if err != nil {
65 logger.Error("could not open repo", "error", err)
66 return helpers.ServerError(e, nil)
67 }
68
69 tx := s.db.BeginDangerously(ctx)
70
71 clock := syntax.NewTIDClock(0)
72
73 if err := r.ForEach(context.TODO(), "", func(key string, cid cid.Cid) error {
74 pts := strings.Split(key, "/")
75 nsid := pts[0]
76 rkey := pts[1]
77 cidStr := cid.String()
78 b, err := bs.Get(context.TODO(), cid)
79 if err != nil {
80 logger.Error("record bytes don't exist in blockstore", "error", err)
81 return helpers.ServerError(e, nil)
82 }
83
84 rec := models.Record{
85 Did: urepo.Repo.Did,
86 CreatedAt: clock.Next().String(),
87 Nsid: nsid,
88 Rkey: rkey,
89 Cid: cidStr,
90 Value: b.RawData(),
91 }
92
93 if err := tx.Save(rec).Error; err != nil {
94 return err
95 }
96
97 return nil
98 }); err != nil {
99 tx.Rollback()
100 logger.Error("record bytes don't exist in blockstore", "error", err)
101 return helpers.ServerError(e, nil)
102 }
103
104 tx.Commit()
105
106 root, rev, err := r.Commit(context.TODO(), urepo.SignFor)
107 if err != nil {
108 logger.Error("error committing", "error", err)
109 return helpers.ServerError(e, nil)
110 }
111
112 if err := s.UpdateRepo(context.TODO(), urepo.Repo.Did, root, rev); err != nil {
113 logger.Error("error updating repo after commit", "error", err)
114 return helpers.ServerError(e, nil)
115 }
116
117 return nil
118}