fork of indigo with slightly nicer lexgen
1package carstore
2
3import (
4 "bytes"
5 "context"
6 "fmt"
7 ipld "github.com/ipfs/go-ipld-format"
8 "io"
9 "log/slog"
10 "sync"
11
12 "github.com/bluesky-social/indigo/models"
13 blockformat "github.com/ipfs/go-block-format"
14 "github.com/ipfs/go-cid"
15 car "github.com/ipld/go-car"
16 "go.opentelemetry.io/otel"
17 "gorm.io/gorm"
18 "gorm.io/gorm/clause"
19)
20
21type NonArchivalCarstore struct {
22 db *gorm.DB
23
24 lk sync.Mutex
25 lastCommitCache map[models.Uid]*commitRefInfo
26
27 log *slog.Logger
28}
29
30func NewNonArchivalCarstore(db *gorm.DB) (*NonArchivalCarstore, error) {
31 if err := db.AutoMigrate(&commitRefInfo{}); err != nil {
32 return nil, err
33 }
34
35 return &NonArchivalCarstore{
36 db: db,
37 lastCommitCache: make(map[models.Uid]*commitRefInfo),
38 log: slog.Default().With("system", "carstorena"),
39 }, nil
40}
41
42type commitRefInfo struct {
43 ID uint `gorm:"primarykey"`
44 Uid models.Uid `gorm:"uniqueIndex"`
45 Rev string
46 Root models.DbCID
47}
48
49func (cs *NonArchivalCarstore) checkLastShardCache(user models.Uid) *commitRefInfo {
50 cs.lk.Lock()
51 defer cs.lk.Unlock()
52
53 ls, ok := cs.lastCommitCache[user]
54 if ok {
55 return ls
56 }
57
58 return nil
59}
60
61func (cs *NonArchivalCarstore) removeLastShardCache(user models.Uid) {
62 cs.lk.Lock()
63 defer cs.lk.Unlock()
64
65 delete(cs.lastCommitCache, user)
66}
67
68func (cs *NonArchivalCarstore) putLastShardCache(ls *commitRefInfo) {
69 cs.lk.Lock()
70 defer cs.lk.Unlock()
71
72 cs.lastCommitCache[ls.Uid] = ls
73}
74
75func (cs *NonArchivalCarstore) loadCommitRefInfo(ctx context.Context, user models.Uid) (*commitRefInfo, error) {
76 var out commitRefInfo
77 wat := cs.db.Find(&out, "uid = ?", user)
78 if wat.Error != nil {
79 return nil, wat.Error
80 }
81 if wat.RowsAffected == 0 {
82 return nil, nil
83 }
84 return &out, nil
85}
86
87func (cs *NonArchivalCarstore) getCommitRefInfo(ctx context.Context, user models.Uid) (*commitRefInfo, error) {
88 ctx, span := otel.Tracer("carstore").Start(ctx, "getCommitRefInfo")
89 defer span.End()
90
91 maybeLs := cs.checkLastShardCache(user)
92 if maybeLs != nil {
93 return maybeLs, nil
94 }
95
96 lastShard, err := cs.loadCommitRefInfo(ctx, user)
97 if err != nil {
98 return nil, err
99 }
100 if lastShard == nil {
101 return nil, nil
102 }
103
104 cs.putLastShardCache(lastShard)
105 return lastShard, nil
106}
107
108func (cs *NonArchivalCarstore) updateLastCommit(ctx context.Context, uid models.Uid, rev string, cid cid.Cid) error {
109 cri := &commitRefInfo{
110 Uid: uid,
111 Rev: rev,
112 Root: models.DbCID{CID: cid},
113 }
114
115 if err := cs.db.Clauses(clause.OnConflict{
116 Columns: []clause.Column{{Name: "uid"}},
117 UpdateAll: true,
118 }).Create(cri).Error; err != nil {
119 return fmt.Errorf("update or set last commit info: %w", err)
120 }
121
122 cs.putLastShardCache(cri)
123
124 return nil
125}
126
127var commitRefZero = commitRefInfo{}
128
129func (cs *NonArchivalCarstore) NewDeltaSession(ctx context.Context, user models.Uid, since *string) (*DeltaSession, error) {
130 ctx, span := otel.Tracer("carstore").Start(ctx, "NewSession")
131 defer span.End()
132
133 // TODO: ensure that we don't write updates on top of the wrong head
134 // this needs to be a compare and swap type operation
135 lastShard, err := cs.getCommitRefInfo(ctx, user)
136 if err != nil {
137 return nil, err
138 }
139
140 if lastShard == nil {
141 // ok, no previous user state to refer to
142 lastShard = &commitRefZero
143 } else if since != nil && *since != lastShard.Rev {
144 cs.log.Warn("revision mismatch", "commitSince", since, "lastRev", lastShard.Rev, "err", ErrRepoBaseMismatch)
145 }
146
147 return &DeltaSession{
148 blks: make(map[cid.Cid]blockformat.Block),
149 base: &userView{
150 user: user,
151 cs: cs,
152 prefetch: true,
153 cache: make(map[cid.Cid]blockformat.Block),
154 },
155 user: user,
156 baseCid: lastShard.Root.CID,
157 cs: cs,
158 seq: 0,
159 lastRev: lastShard.Rev,
160 }, nil
161}
162
163func (cs *NonArchivalCarstore) ReadOnlySession(user models.Uid) (*DeltaSession, error) {
164 return &DeltaSession{
165 base: &userView{
166 user: user,
167 cs: cs,
168 prefetch: false,
169 cache: make(map[cid.Cid]blockformat.Block),
170 },
171 readonly: true,
172 user: user,
173 cs: cs,
174 }, nil
175}
176
177// TODO: incremental is only ever called true, remove the param
178func (cs *NonArchivalCarstore) ReadUserCar(ctx context.Context, user models.Uid, sinceRev string, incremental bool, w io.Writer) error {
179 return fmt.Errorf("not supported in non-archival mode")
180}
181
182func (cs *NonArchivalCarstore) ImportSlice(ctx context.Context, uid models.Uid, since *string, carslice []byte) (cid.Cid, *DeltaSession, error) {
183 ctx, span := otel.Tracer("carstore").Start(ctx, "ImportSlice")
184 defer span.End()
185
186 carr, err := car.NewCarReader(bytes.NewReader(carslice))
187 if err != nil {
188 return cid.Undef, nil, err
189 }
190
191 if len(carr.Header.Roots) != 1 {
192 return cid.Undef, nil, fmt.Errorf("invalid car file, header must have a single root (has %d)", len(carr.Header.Roots))
193 }
194
195 ds, err := cs.NewDeltaSession(ctx, uid, since)
196 if err != nil {
197 return cid.Undef, nil, fmt.Errorf("new delta session failed: %w", err)
198 }
199
200 var cids []cid.Cid
201 for {
202 blk, err := carr.Next()
203 if err != nil {
204 if err == io.EOF {
205 break
206 }
207 return cid.Undef, nil, err
208 }
209
210 cids = append(cids, blk.Cid())
211
212 if err := ds.Put(ctx, blk); err != nil {
213 return cid.Undef, nil, err
214 }
215 }
216
217 return carr.Header.Roots[0], ds, nil
218}
219
220func (cs *NonArchivalCarstore) GetUserRepoHead(ctx context.Context, user models.Uid) (cid.Cid, error) {
221 lastShard, err := cs.getCommitRefInfo(ctx, user)
222 if err != nil {
223 return cid.Undef, err
224 }
225 if lastShard == nil || lastShard.ID == 0 {
226 return cid.Undef, nil
227 }
228
229 return lastShard.Root.CID, nil
230}
231
232func (cs *NonArchivalCarstore) GetUserRepoRev(ctx context.Context, user models.Uid) (string, error) {
233 lastShard, err := cs.getCommitRefInfo(ctx, user)
234 if err != nil {
235 return "", err
236 }
237 if lastShard == nil || lastShard.ID == 0 {
238 return "", nil
239 }
240
241 return lastShard.Rev, nil
242}
243
244func (cs *NonArchivalCarstore) Stat(ctx context.Context, usr models.Uid) ([]UserStat, error) {
245 return nil, nil
246}
247
248func (cs *NonArchivalCarstore) WipeUserData(ctx context.Context, user models.Uid) error {
249 if err := cs.db.Raw("DELETE from commit_ref_infos WHERE uid = ?", user).Error; err != nil {
250 return err
251 }
252
253 cs.removeLastShardCache(user)
254 return nil
255}
256
257func (cs *NonArchivalCarstore) GetCompactionTargets(ctx context.Context, shardCount int) ([]CompactionTarget, error) {
258 return nil, fmt.Errorf("compaction not supported on non-archival")
259}
260
261func (cs *NonArchivalCarstore) CompactUserShards(ctx context.Context, user models.Uid, skipBigShards bool) (*CompactionStats, error) {
262 return nil, fmt.Errorf("compaction not supported in non-archival")
263}
264
265func (cs *NonArchivalCarstore) HasUidCid(ctx context.Context, user models.Uid, k cid.Cid) (bool, error) {
266 return false, nil
267}
268
269func (cs *NonArchivalCarstore) LookupBlockRef(ctx context.Context, k cid.Cid) (path string, offset int64, user models.Uid, err error) {
270 return "", 0, 0, ipld.ErrNotFound{Cid: k}
271}
272
273func (cs *NonArchivalCarstore) writeNewShard(ctx context.Context, root cid.Cid, rev string, user models.Uid, seq int, blks map[cid.Cid]blockformat.Block, rmcids map[cid.Cid]bool) ([]byte, error) {
274 slice, err := blocksToCar(ctx, root, rev, blks)
275 if err != nil {
276 return nil, err
277 }
278 return slice, cs.updateLastCommit(ctx, user, rev, root)
279}