1package carstore
2
3import (
4 "bytes"
5 "context"
6 "fmt"
7 "io"
8 "strconv"
9 "strings"
10 "time"
11
12 "github.com/bluesky-social/indigo/models"
13 "github.com/ipfs/go-cid"
14 "go.opentelemetry.io/otel"
15 "gorm.io/driver/postgres"
16 "gorm.io/gorm"
17)
18
// CarStoreGormMeta is the gorm-backed metadata store for the carstore. Its
// methods read and write CarShard, blockRef and staleRef records.
type CarStoreGormMeta struct {
	// meta is the gorm database handle every query in this type runs against.
	meta *gorm.DB
}
22
23func (cs *CarStoreGormMeta) Init() error {
24 if err := cs.meta.AutoMigrate(&CarShard{}, &blockRef{}); err != nil {
25 return err
26 }
27 if err := cs.meta.AutoMigrate(&staleRef{}); err != nil {
28 return err
29 }
30 return nil
31}
32
33// Return true if any known record matches (Uid, Cid)
34func (cs *CarStoreGormMeta) HasUidCid(ctx context.Context, user models.Uid, k cid.Cid) (bool, error) {
35 var count int64
36 if err := cs.meta.
37 Model(blockRef{}).
38 Select("path, block_refs.offset").
39 Joins("left join car_shards on block_refs.shard = car_shards.id").
40 Where("usr = ? AND cid = ?", user, models.DbCID{CID: k}).
41 Count(&count).Error; err != nil {
42 return false, err
43 }
44
45 return count > 0, nil
46}
47
48// For some Cid, lookup the block ref.
49// Return the path of the file written, the offset within the file, and the user associated with the Cid.
50func (cs *CarStoreGormMeta) LookupBlockRef(ctx context.Context, k cid.Cid) (path string, offset int64, user models.Uid, err error) {
51 // TODO: for now, im using a join to ensure we only query blocks from the
52 // correct user. maybe it makes sense to put the user in the blockRef
53 // directly? tradeoff of time vs space
54 var info struct {
55 Path string
56 Offset int64
57 Usr models.Uid
58 }
59 if err := cs.meta.Raw(`SELECT
60 (select path from car_shards where id = block_refs.shard) as path,
61 block_refs.offset,
62 (select usr from car_shards where id = block_refs.shard) as usr
63FROM block_refs
64WHERE
65 block_refs.cid = ?
66LIMIT 1;`, models.DbCID{CID: k}).Scan(&info).Error; err != nil {
67 var defaultUser models.Uid
68 return "", -1, defaultUser, err
69 }
70 return info.Path, info.Offset, info.Usr, nil
71}
72
73func (cs *CarStoreGormMeta) GetLastShard(ctx context.Context, user models.Uid) (*CarShard, error) {
74 var lastShard CarShard
75 if err := cs.meta.WithContext(ctx).Model(CarShard{}).Limit(1).Order("seq desc").Find(&lastShard, "usr = ?", user).Error; err != nil {
76 return nil, err
77 }
78 return &lastShard, nil
79}
80
81// return all of a users's shards, ascending by Seq
82func (cs *CarStoreGormMeta) GetUserShards(ctx context.Context, usr models.Uid) ([]CarShard, error) {
83 var shards []CarShard
84 if err := cs.meta.Order("seq asc").Find(&shards, "usr = ?", usr).Error; err != nil {
85 return nil, err
86 }
87 return shards, nil
88}
89
90// return all of a users's shards, descending by Seq
91func (cs *CarStoreGormMeta) GetUserShardsDesc(ctx context.Context, usr models.Uid, minSeq int) ([]CarShard, error) {
92 var shards []CarShard
93 if err := cs.meta.Order("seq desc").Find(&shards, "usr = ? AND seq >= ?", usr, minSeq).Error; err != nil {
94 return nil, err
95 }
96 return shards, nil
97}
98
99func (cs *CarStoreGormMeta) GetUserStaleRefs(ctx context.Context, user models.Uid) ([]staleRef, error) {
100 var staleRefs []staleRef
101 if err := cs.meta.WithContext(ctx).Find(&staleRefs, "usr = ?", user).Error; err != nil {
102 return nil, err
103 }
104 return staleRefs, nil
105}
106
107func (cs *CarStoreGormMeta) SeqForRev(ctx context.Context, user models.Uid, sinceRev string) (int, error) {
108 var untilShard CarShard
109 if err := cs.meta.Where("rev >= ? AND usr = ?", sinceRev, user).Order("rev").First(&untilShard).Error; err != nil {
110 return 0, fmt.Errorf("finding early shard: %w", err)
111 }
112 return untilShard.Seq, nil
113}
114
115func (cs *CarStoreGormMeta) GetCompactionTargets(ctx context.Context, minShardCount int) ([]CompactionTarget, error) {
116 var targets []CompactionTarget
117 if err := cs.meta.Raw(`select usr, count(*) as num_shards from car_shards group by usr having count(*) > ? order by num_shards desc`, minShardCount).Scan(&targets).Error; err != nil {
118 return nil, err
119 }
120
121 return targets, nil
122}
123
124func (cs *CarStoreGormMeta) PutShardAndRefs(ctx context.Context, shard *CarShard, brefs []map[string]any, rmcids map[cid.Cid]bool) error {
125 // TODO: there should be a way to create the shard and block_refs that
126 // reference it in the same query, would save a lot of time
127 tx := cs.meta.WithContext(ctx).Begin()
128
129 if err := tx.WithContext(ctx).Create(shard).Error; err != nil {
130 return fmt.Errorf("failed to create shard in DB tx: %w", err)
131 }
132
133 for _, ref := range brefs {
134 ref["shard"] = shard.ID
135 }
136
137 if err := createBlockRefs(ctx, tx, brefs); err != nil {
138 return fmt.Errorf("failed to create block refs: %w", err)
139 }
140
141 if len(rmcids) > 0 {
142 cids := make([]cid.Cid, 0, len(rmcids))
143 for c := range rmcids {
144 cids = append(cids, c)
145 }
146
147 if err := tx.Create(&staleRef{
148 Cids: packCids(cids),
149 Usr: shard.Usr,
150 }).Error; err != nil {
151 return err
152 }
153 }
154
155 err := tx.WithContext(ctx).Commit().Error
156 if err != nil {
157 return fmt.Errorf("failed to commit shard DB transaction: %w", err)
158 }
159 return nil
160}
161
162func (cs *CarStoreGormMeta) DeleteShardsAndRefs(ctx context.Context, ids []uint) error {
163 txn := cs.meta.Begin()
164
165 if err := txn.Delete(&CarShard{}, "id in (?)", ids).Error; err != nil {
166 txn.Rollback()
167 return err
168 }
169
170 if err := txn.Delete(&blockRef{}, "shard in (?)", ids).Error; err != nil {
171 txn.Rollback()
172 return err
173 }
174
175 return txn.Commit().Error
176}
177
178func (cs *CarStoreGormMeta) GetBlockRefsForShards(ctx context.Context, shardIds []uint) ([]blockRef, error) {
179 chunkSize := 2000
180 out := make([]blockRef, 0, len(shardIds))
181 for i := 0; i < len(shardIds); i += chunkSize {
182 sl := shardIds[i:]
183 if len(sl) > chunkSize {
184 sl = sl[:chunkSize]
185 }
186
187 if err := blockRefsForShards(ctx, cs.meta, sl, &out); err != nil {
188 return nil, fmt.Errorf("getting block refs: %w", err)
189 }
190 }
191 return out, nil
192}
193
194// blockRefsForShards is an inner loop helper for GetBlockRefsForShards
195func blockRefsForShards(ctx context.Context, db *gorm.DB, shards []uint, obuf *[]blockRef) error {
196 // Check the database driver
197 switch db.Dialector.(type) {
198 case *postgres.Dialector:
199 sval := valuesStatementForShards(shards)
200 q := fmt.Sprintf(`SELECT block_refs.* FROM block_refs INNER JOIN (VALUES %s) AS vals(v) ON block_refs.shard = v`, sval)
201 return db.Raw(q).Scan(obuf).Error
202 default:
203 return db.Raw(`SELECT * FROM block_refs WHERE shard IN (?)`, shards).Scan(obuf).Error
204 }
205}
206
// valuesStatementForShards builds the row list of a postgres VALUES clause
// from shard IDs, e.g. "(1),(2),(3)". An empty input yields "".
//
// IDs are formatted as unsigned integers, so values above the maximum int are
// not rendered as negative literals.
func valuesStatementForShards(shards []uint) string {
	var sb strings.Builder
	for i, v := range shards {
		if i > 0 {
			sb.WriteByte(',')
		}
		sb.WriteByte('(')
		sb.WriteString(strconv.FormatUint(uint64(v), 10))
		sb.WriteByte(')')
	}
	return sb.String()
}
220
221func (cs *CarStoreGormMeta) SetStaleRef(ctx context.Context, uid models.Uid, staleToKeep []cid.Cid) error {
222 txn := cs.meta.Begin()
223
224 if err := txn.Delete(&staleRef{}, "usr = ?", uid).Error; err != nil {
225 return err
226 }
227
228 // now create a new staleRef with all the refs we couldn't clear out
229 if len(staleToKeep) > 0 {
230 if err := txn.Create(&staleRef{
231 Usr: uid,
232 Cids: packCids(staleToKeep),
233 }).Error; err != nil {
234 return err
235 }
236 }
237
238 if err := txn.Commit().Error; err != nil {
239 return fmt.Errorf("failed to commit staleRef updates: %w", err)
240 }
241 return nil
242}
243
// CarShard is the gorm model for one shard record; the raw queries in this file
// address its table as car_shards.
type CarShard struct {
	// ID is the primary key; blockRef.Shard refers to it.
	ID uint `gorm:"primarykey"`

	// NOTE(review): presumably filled in by gorm on insert — confirm.
	CreatedAt time.Time

	// Root is an indexed CID.
	// NOTE(review): presumably the root CID of the shard's contents — confirm.
	Root models.DbCID `gorm:"index"`

	// NOTE(review): presumably the byte offset at which block data starts in
	// the shard file — confirm against the code that writes shards.
	DataStart int64

	// Seq orders a user's shards; it is indexed on its own and as the second,
	// descending part of the (usr, seq) composite index.
	Seq int `gorm:"index:idx_car_shards_seq;index:idx_car_shards_usr_seq,priority:2,sort:desc"`

	// Path is the path of the shard file that was written.
	Path string

	// Usr is the user the shard belongs to; it is indexed on its own and as the
	// leading part of the (usr, seq) composite index.
	Usr models.Uid `gorm:"index:idx_car_shards_usr;index:idx_car_shards_usr_seq,priority:1"`

	// Rev is compared as a string by SeqForRev to locate a shard by revision.
	Rev string
}
255
// blockRef is the gorm model for one block reference; the raw queries in this
// file address its table as block_refs.
type blockRef struct {
	// ID is the primary key.
	ID uint `gorm:"primarykey"`

	// Cid is the indexed CID of the referenced block.
	Cid models.DbCID `gorm:"index"`

	// Shard is the indexed ID of the CarShard row this block belongs to.
	Shard uint `gorm:"index"`

	// Offset is the block's offset within the shard file.
	Offset int64
	//User   uint `gorm:"index"`
}
263
// staleRef is the gorm model recording CIDs of a user that are considered
// stale. Use getCids to read the CIDs regardless of which field holds them.
type staleRef struct {
	// ID is the primary key.
	ID uint `gorm:"primarykey"`

	// Cid, when non-nil, is the single CID held by this row and takes
	// precedence over Cids in getCids.
	// NOTE(review): nothing in this file writes Cid; presumably kept for rows
	// created before Cids existed — confirm.
	Cid *models.DbCID

	// Cids holds the binary forms of zero or more CIDs concatenated back to
	// back, as produced by packCids.
	Cids []byte

	// Usr is the indexed user the stale CIDs belong to.
	Usr models.Uid `gorm:"index"`
}
270
271func (sr *staleRef) getCids() ([]cid.Cid, error) {
272 if sr.Cid != nil {
273 return []cid.Cid{sr.Cid.CID}, nil
274 }
275
276 return unpackCids(sr.Cids)
277}
278
279func unpackCids(b []byte) ([]cid.Cid, error) {
280 br := bytes.NewReader(b)
281 var out []cid.Cid
282 for {
283 _, c, err := cid.CidFromReader(br)
284 if err != nil {
285 if err == io.EOF {
286 break
287 }
288 return nil, err
289 }
290
291 out = append(out, c)
292 }
293
294 return out, nil
295}
296
297func packCids(cids []cid.Cid) []byte {
298 buf := new(bytes.Buffer)
299 for _, c := range cids {
300 buf.Write(c.Bytes())
301 }
302
303 return buf.Bytes()
304}
305
306func createBlockRefs(ctx context.Context, tx *gorm.DB, brefs []map[string]any) error {
307 ctx, span := otel.Tracer("carstore").Start(ctx, "createBlockRefs")
308 defer span.End()
309
310 if err := createInBatches(ctx, tx, brefs, 2000); err != nil {
311 return err
312 }
313
314 return nil
315}
316
317// Function to create in batches
318func createInBatches(ctx context.Context, tx *gorm.DB, brefs []map[string]any, batchSize int) error {
319 for i := 0; i < len(brefs); i += batchSize {
320 batch := brefs[i:]
321 if len(batch) > batchSize {
322 batch = batch[:batchSize]
323 }
324
325 query, values := generateInsertQuery(batch)
326
327 if err := tx.WithContext(ctx).Exec(query, values...).Error; err != nil {
328 return err
329 }
330 }
331 return nil
332}
333
// generateInsertQuery builds a multi-row INSERT for block_refs together with
// its bind values.
//
// Each entry of brefs contributes one "(?, ?, ?)" group and three values taken
// from its "cid", "offset" and "shard" keys, in that order; a missing key
// contributes nil. For an empty brefs there is nothing to insert and the
// function returns "" and nil instead of panicking; callers must not execute
// the empty query.
func generateInsertQuery(brefs []map[string]any) (string, []any) {
	if len(brefs) == 0 {
		return "", nil
	}

	const (
		prefix = "INSERT INTO block_refs (\"cid\", \"offset\", \"shard\") VALUES "
		group  = "(?, ?, ?)"
	)

	var sb strings.Builder
	sb.Grow(len(prefix) + len(brefs)*(len(group)+1))
	sb.WriteString(prefix)

	values := make([]any, 0, 3*len(brefs))
	for i, entry := range brefs {
		if i > 0 {
			sb.WriteByte(',')
		}
		sb.WriteString(group)
		values = append(values, entry["cid"], entry["offset"], entry["shard"])
	}

	return sb.String(), values
}