AT Protocol IPLD-CAR Repository toolkit (CLI)
1package repo
2
3import (
4 "context"
5 "fmt"
6 "io"
7
8 cbor2 "github.com/fxamacker/cbor/v2"
9
10 "github.com/bluesky-social/indigo/mst"
11 "github.com/bluesky-social/indigo/util"
12 blockstore "github.com/ipfs/boxo/blockstore"
13 "github.com/ipfs/go-cid"
14 "github.com/ipfs/go-datastore"
15 cbor "github.com/ipfs/go-ipld-cbor"
16 "github.com/ipld/go-car/v2"
17)
18
19// current version of repo currently implemented
20const ATP_REPO_VERSION int64 = 3
21
22type SignedCommit struct {
23 Did string `cborgen:"did"`
24 Version int64 `cborgen:"version"`
25 Prev *cid.Cid `cborgen:"prev"`
26 Data cid.Cid `cborgen:"data"`
27 Sig []byte `cborgen:"sig"`
28}
29
30type UnsignedCommit struct {
31 Did string `cborgen:"did"`
32 Version int64 `cborgen:"version"`
33 Prev *cid.Cid `cborgen:"prev"`
34 Data cid.Cid `cborgen:"data"`
35}
36
37type Repo struct {
38 Blocks int
39
40 sc SignedCommit
41 cst cbor.IpldStore
42 bs blockstore.Blockstore
43
44 repoCid cid.Cid
45
46 mst *mst.MerkleSearchTree
47
48 dirty bool
49}
50
51// Returns a copy of commit without the Sig field. Helpful when verifying signature.
52func (sc *SignedCommit) Unsigned() *UnsignedCommit {
53 return &UnsignedCommit{
54 Did: sc.Did,
55 Version: sc.Version,
56 Prev: sc.Prev,
57 Data: sc.Data,
58 }
59}
60
61// returns bytes of the DAG-CBOR representation of object. This is what gets
62// signed; the `go-did` library will take the SHA-256 of the bytes and sign
63// that.
64/*func (uc *UnsignedCommit) BytesForSigning() ([]byte, error) {
65 buf := new(bytes.Buffer)
66 /if err := uc.MarshalCBOR(buf); err != nil {
67 return []byte{}, err
68 }
69 return buf.Bytes(), nil
70}*/
71
72func IngestRepo(ctx context.Context, bs blockstore.Blockstore, r io.Reader) (cid.Cid, int, error) {
73 br, err := car.NewBlockReader(r)
74 if err != nil {
75 return cid.Undef, 0, err
76 }
77
78 size := 0
79 for {
80 blk, err := br.Next()
81 if err != nil {
82 if err == io.EOF {
83 break
84 }
85 return cid.Undef, size, err
86 }
87
88 if err := bs.Put(ctx, blk); err != nil {
89 return cid.Undef, size, err
90 }
91 size++
92 }
93
94 return br.Roots[0], size, nil
95}
96
97func ReadRepoFromCar(ctx context.Context, r io.Reader) (*Repo, error) {
98 bs := blockstore.NewBlockstore(datastore.NewMapDatastore())
99 root, size, err := IngestRepo(ctx, bs, r)
100 if err != nil {
101 return nil, err
102 }
103
104 return OpenRepo(ctx, bs, root, size)
105}
106
107func OpenRepo(ctx context.Context, bs blockstore.Blockstore, root cid.Cid, size int) (*Repo, error) {
108 cst := util.CborStore(bs)
109
110 var sc SignedCommit
111 if err := cst.Get(ctx, root, &sc); err != nil {
112 return nil, fmt.Errorf("loading root from blockstore: %w", err)
113 }
114
115 if sc.Version > ATP_REPO_VERSION {
116 return nil, fmt.Errorf("unsupported repo version: %d", sc.Version)
117 }
118
119 return &Repo{
120 sc: sc,
121 bs: bs,
122 cst: cst,
123 repoCid: root,
124 Blocks: size,
125 }, nil
126}
127
128func (r *Repo) GetCommitsPath(len int) ([]cid.Cid, error) {
129 path := []cid.Cid{}
130 path = append(path, r.repoCid)
131 if r.sc.Prev != nil {
132 getParentCommits(r, *r.sc.Prev, &path, len-1)
133 }
134 return path, nil
135}
136
137func getParentCommits(r *Repo, c cid.Cid, p *[]cid.Cid, len int) ([]cid.Cid, error) {
138 var sc SignedCommit
139 ctx := context.TODO()
140 if err := r.cst.Get(ctx, c, &sc); err != nil {
141 return nil, fmt.Errorf("loading root from blockstore: %w", err)
142 }
143 *p = append(*p, c)
144 if len == 0 {
145 return nil, nil
146 } else {
147 len--
148 }
149 if sc.Prev != nil {
150 return getParentCommits(r, *sc.Prev, p, len)
151 }
152 return nil, nil
153}
154
155func (r *Repo) Head() cid.Cid {
156 return r.repoCid
157}
158
159func (r *Repo) SignedCommit() SignedCommit {
160 return r.sc
161}
162
163func (r *Repo) MerkleSearchTree() *mst.MerkleSearchTree {
164 return r.mst
165}
166
167func (r *Repo) BlockStore() blockstore.Blockstore {
168 return r.bs
169}
170
171func (r *Repo) getMst(ctx context.Context) (*mst.MerkleSearchTree, error) {
172 if r.mst != nil {
173 return r.mst, nil
174 }
175
176 t := mst.LoadMST(r.cst, r.sc.Data)
177 r.mst = t
178 return t, nil
179}
180
181func (r *Repo) MST() *mst.MerkleSearchTree {
182 mst, _ := r.getMst(context.TODO())
183 return mst
184}
185
186var ErrDoneIterating = fmt.Errorf("done iterating")
187
188func (r *Repo) ForEach(ctx context.Context, prefix string, cb func(k string, v cid.Cid) error) error {
189 t, _ := r.getMst(ctx)
190
191 if err := t.WalkLeavesFrom(ctx, prefix, cb); err != nil {
192 if err != ErrDoneIterating {
193 return err
194 }
195 }
196
197 return nil
198}
199
200func (r *Repo) GetRecord(ctx context.Context, rpath string) (cid.Cid, map[string]interface{}, error) {
201 mst, err := r.getMst(ctx)
202 if err != nil {
203 return cid.Undef, nil, fmt.Errorf("getting repo mst: %w", err)
204 }
205
206 cc, err := mst.Get(ctx, rpath)
207 if err != nil {
208 return cid.Undef, nil, fmt.Errorf("resolving rpath within mst: %w", err)
209 }
210
211 blk, err := r.bs.Get(ctx, cc)
212 if err != nil {
213 return cid.Undef, nil, err
214 }
215 var v map[string]interface{}
216 cbor2.Unmarshal(blk.RawData(), &v)
217
218 return cc, v, nil
219}