carstore/bs.go  (+54, -346)
···
 	"bufio"
 	"bytes"
 	"context"
-	"encoding/binary"
 	"fmt"
 	"io"
 	"os"
 	"path/filepath"
 	"sort"
-	"strconv"
-	"strings"
 	"sync"
 	"sync/atomic"
 	"time"
···
 	cbg "github.com/whyrusleeping/cbor-gen"
 	"go.opentelemetry.io/otel"
 	"go.opentelemetry.io/otel/attribute"
-	"gorm.io/driver/postgres"
 	"gorm.io/gorm"
 )

···
 const BigShardThreshold = 2 << 20

 type CarStore struct {
-	meta    *gorm.DB
+	meta    *CarStoreGormMeta
 	rootDir string

 	lscLk sync.Mutex
···
 	}

 	return &CarStore{
-		meta: meta,
+		meta: &CarStoreGormMeta{meta: meta},
 		rootDir: root,
 		lastShardCache: make(map[models.Uid]*CarShard),
 	}, nil
 }

-type UserInfo struct {
-	gorm.Model
-	Head string
-}
-
-type CarShard struct {
-	ID uint `gorm:"primarykey"`
-	CreatedAt time.Time
-
-	Root models.DbCID `gorm:"index"`
-	DataStart int64
-	Seq int `gorm:"index:idx_car_shards_seq;index:idx_car_shards_usr_seq,priority:2,sort:desc"`
-	Path string
-	Usr models.Uid `gorm:"index:idx_car_shards_usr;index:idx_car_shards_usr_seq,priority:1"`
-	Rev string
-}
-
-type blockRef struct {
-	ID uint `gorm:"primarykey"`
-	Cid models.DbCID `gorm:"index"`
-	Shard uint `gorm:"index"`
-	Offset int64
-	//User uint `gorm:"index"`
-}
-
-type staleRef struct {
-	ID uint `gorm:"primarykey"`
-	Cid *models.DbCID
-	Cids []byte
-	Usr models.Uid `gorm:"index"`
-}
-
-func (sr *staleRef) getCids() ([]cid.Cid, error) {
-	if sr.Cid != nil {
-		return []cid.Cid{sr.Cid.CID}, nil
-	}
-
-	return unpackCids(sr.Cids)
-}
-
-func packCids(cids []cid.Cid) []byte {
-	buf := new(bytes.Buffer)
-	for _, c := range cids {
-		buf.Write(c.Bytes())
-	}
-
-	return buf.Bytes()
-}
-
-func unpackCids(b []byte) ([]cid.Cid, error) {
-	br := bytes.NewReader(b)
-	var out []cid.Cid
-	for {
-		_, c, err := cid.CidFromReader(br)
-		if err != nil {
-			if err == io.EOF {
-				break
-			}
-			return nil, err
-		}
-
-		out = append(out, c)
-	}
-
-	return out, nil
-}
-
 type userView struct {
 	cs   *CarStore
 	user models.Uid
···
 }

 func (uv *userView) Has(ctx context.Context, k cid.Cid) (bool, error) {
-	var count int64
-	if err := uv.cs.meta.
-		Model(blockRef{}).
-		Select("path, block_refs.offset").
-		Joins("left join car_shards on block_refs.shard = car_shards.id").
-		Where("usr = ? AND cid = ?", uv.user, models.DbCID{CID: k}).
-		Count(&count).Error; err != nil {
-		return false, err
-	}
-
-	return count > 0, nil
+	return uv.cs.meta.HasUidCid(ctx, uv.user, k)
 }

 var CacheHits int64
···
 	}
 	atomic.AddInt64(&CacheMiss, 1)

-	// TODO: for now, im using a join to ensure we only query blocks from the
-	// correct user. maybe it makes sense to put the user in the blockRef
-	// directly? tradeoff of time vs space
-	var info struct {
-		Path   string
-		Offset int64
-		Usr    models.Uid
-	}
-	if err := uv.cs.meta.Raw(`SELECT
-	(select path from car_shards where id = block_refs.shard) as path,
-	block_refs.offset,
-	(select usr from car_shards where id = block_refs.shard) as usr
-FROM block_refs
-WHERE
-	block_refs.cid = ?
-LIMIT 1;`, models.DbCID{CID: k}).Scan(&info).Error; err != nil {
+	path, offset, user, err := uv.cs.meta.LookupBlockRef(ctx, k)
+	if err != nil {
 		return nil, err
 	}
-	if info.Path == "" {
+	if path == "" {
 		return nil, ipld.ErrNotFound{Cid: k}
 	}

 	prefetch := uv.prefetch
-	if info.Usr != uv.user {
+	if user != uv.user {
 		blockGetTotalCounterUsrskip.Add(1)
 		prefetch = false
 	} else {
···
 	}

 	if prefetch {
-		return uv.prefetchRead(ctx, k, info.Path, info.Offset)
+		return uv.prefetchRead(ctx, k, path, offset)
 	} else {
-		return uv.singleRead(ctx, k, info.Path, info.Offset)
+		return uv.singleRead(ctx, k, path, offset)
 	}
 }

···
 		return maybeLs, nil
 	}

-	var lastShard CarShard
-	// this is often slow (which is why we're caching it) but could be sped up with an extra index:
-	// CREATE INDEX idx_car_shards_usr_id ON car_shards (usr, seq DESC);
-	if err := cs.meta.WithContext(ctx).Model(CarShard{}).Limit(1).Order("seq desc").Find(&lastShard, "usr = ?", user).Error; err != nil {
-		//if err := cs.meta.Model(CarShard{}).Where("user = ?", user).Last(&lastShard).Error; err != nil {
-		//if err != gorm.ErrRecordNotFound {
+	lastShard, err := cs.meta.GetLastShard(ctx, user)
+	if err != nil {
 		return nil, err
-		//}
 	}

-	cs.putLastShardCache(&lastShard)
-	return &lastShard, nil
+	cs.putLastShardCache(lastShard)
+	return lastShard, nil
 }

 var ErrRepoBaseMismatch = fmt.Errorf("attempted a delta session on top of the wrong previous head")
···
 	}, nil
 }

+// TODO: incremental is only ever called true, remove the param
 func (cs *CarStore) ReadUserCar(ctx context.Context, user models.Uid, sinceRev string, incremental bool, w io.Writer) error {
 	ctx, span := otel.Tracer("carstore").Start(ctx, "ReadUserCar")
 	defer span.End()

 	var earlySeq int
 	if sinceRev != "" {
-		var untilShard CarShard
-		if err := cs.meta.Where("rev >= ? AND usr = ?", sinceRev, user).Order("rev").First(&untilShard).Error; err != nil {
-			return fmt.Errorf("finding early shard: %w", err)
+		var err error
+		earlySeq, err = cs.meta.SeqForRev(ctx, user, sinceRev)
+		if err != nil {
+			return err
 		}
-		earlySeq = untilShard.Seq
 	}

-	var shards []CarShard
-	if err := cs.meta.Order("seq desc").Where("usr = ? AND seq >= ?", user, earlySeq).Find(&shards).Error; err != nil {
+	// TODO: Why does ReadUserCar want shards seq DESC but CompactUserShards wants seq ASC ?
+	shards, err := cs.meta.GetUserShardsDesc(ctx, user, earlySeq)
+	if err != nil {
 		return err
 	}

+	// TODO: incremental is only ever called true, so this is fine and we can remove the error check
 	if !incremental && earlySeq > 0 {
 		// have to do it the ugly way
 		return fmt.Errorf("nyi")
···
 	return nil
 }

+// inner loop part of ReadUserCar
+// copy shard blocks from disk to Writer
 func (cs *CarStore) writeShardBlocks(ctx context.Context, sh *CarShard, w io.Writer) error {
 	ctx, span := otel.Tracer("carstore").Start(ctx, "writeShardBlocks")
 	defer span.End()
···
 	return nil
 }

-func (cs *CarStore) writeBlockFromShard(ctx context.Context, sh *CarShard, w io.Writer, c cid.Cid) error {
-	fi, err := os.Open(sh.Path)
-	if err != nil {
-		return err
-	}
-	defer fi.Close()
-
-	rr, err := car.NewCarReader(fi)
-	if err != nil {
-		return err
-	}
-
-	for {
-		blk, err := rr.Next()
-		if err != nil {
-			return err
-		}
-
-		if blk.Cid() == c {
-			_, err := LdWrite(w, c.Bytes(), blk.RawData())
-			return err
-		}
-	}
-}
-
+// inner loop part of compactBucket
 func (cs *CarStore) iterateShardBlocks(ctx context.Context, sh *CarShard, cb func(blk blockformat.Block) error) error {
 	fi, err := os.Open(sh.Path)
 	if err != nil {
···
 	ctx, span := otel.Tracer("carstore").Start(ctx, "putShard")
 	defer span.End()

-	// TODO: there should be a way to create the shard and block_refs that
-	// reference it in the same query, would save a lot of time
-	tx := cs.meta.WithContext(ctx).Begin()
-
-	if err := tx.WithContext(ctx).Create(shard).Error; err != nil {
-		return fmt.Errorf("failed to create shard in DB tx: %w", err)
-	}
-
-	if !nocache {
-		cs.putLastShardCache(shard)
-	}
-
-	for _, ref := range brefs {
-		ref["shard"] = shard.ID
-	}
-
-	if err := createBlockRefs(ctx, tx, brefs); err != nil {
-		return fmt.Errorf("failed to create block refs: %w", err)
-	}
-
-	if len(rmcids) > 0 {
-		cids := make([]cid.Cid, 0, len(rmcids))
-		for c := range rmcids {
-			cids = append(cids, c)
-		}
-
-		if err := tx.Create(&staleRef{
-			Cids: packCids(cids),
-			Usr:  shard.Usr,
-		}).Error; err != nil {
-			return err
-		}
-	}
-
-	err := tx.WithContext(ctx).Commit().Error
+	err := cs.meta.PutShardAndRefs(ctx, shard, brefs, rmcids)
 	if err != nil {
-		return fmt.Errorf("failed to commit shard DB transaction: %w", err)
-	}
-
-	return nil
-}
-
-func createBlockRefs(ctx context.Context, tx *gorm.DB, brefs []map[string]any) error {
-	ctx, span := otel.Tracer("carstore").Start(ctx, "createBlockRefs")
-	defer span.End()
-
-	if err := createInBatches(ctx, tx, brefs, 2000); err != nil {
 		return err
 	}

-	return nil
-}
-
-func generateInsertQuery(data []map[string]any) (string, []any) {
-	placeholders := strings.Repeat("(?, ?, ?),", len(data))
-	placeholders = placeholders[:len(placeholders)-1] // trim trailing comma
-
-	query := "INSERT INTO block_refs (\"cid\", \"offset\", \"shard\") VALUES " + placeholders
-
-	values := make([]any, 0, 3*len(data))
-	for _, entry := range data {
-		values = append(values, entry["cid"], entry["offset"], entry["shard"])
+	if !nocache {
+		cs.putLastShardCache(shard)
 	}

-	return query, values
-}
-
-// Function to create in batches
-func createInBatches(ctx context.Context, tx *gorm.DB, data []map[string]any, batchSize int) error {
-	for i := 0; i < len(data); i += batchSize {
-		batch := data[i:]
-		if len(batch) > batchSize {
-			batch = batch[:batchSize]
-		}
-
-		query, values := generateInsertQuery(batch)
-
-		if err := tx.WithContext(ctx).Exec(query, values...).Error; err != nil {
-			return err
-		}
-	}
 	return nil
-}
-
-func LdWrite(w io.Writer, d ...[]byte) (int64, error) {
-	var sum uint64
-	for _, s := range d {
-		sum += uint64(len(s))
-	}
-
-	buf := make([]byte, 8)
-	n := binary.PutUvarint(buf, sum)
-	nw, err := w.Write(buf[:n])
-	if err != nil {
-		return 0, err
-	}
-
-	for _, s := range d {
-		onw, err := w.Write(s)
-		if err != nil {
-			return int64(nw), err
-		}
-		nw += onw
-	}
-
-	return int64(nw), nil
-}
-
-func setToSlice(s map[cid.Cid]bool) []cid.Cid {
-	out := make([]cid.Cid, 0, len(s))
-	for c := range s {
-		out = append(out, c)
-	}
-
-	return out
 }

 func BlockDiff(ctx context.Context, bs blockstore.Blockstore, oldroot cid.Cid, newcids map[cid.Cid]blockformat.Block, skipcids map[cid.Cid]bool) (map[cid.Cid]bool, error) {
···
 }

 func (cs *CarStore) Stat(ctx context.Context, usr models.Uid) ([]UserStat, error) {
-	var shards []CarShard
-	if err := cs.meta.Order("seq asc").Find(&shards, "usr = ?", usr).Error; err != nil {
+	shards, err := cs.meta.GetUserShards(ctx, usr)
+	if err != nil {
 		return nil, err
 	}

···
 }

 func (cs *CarStore) WipeUserData(ctx context.Context, user models.Uid) error {
-	var shards []*CarShard
-	if err := cs.meta.Find(&shards, "usr = ?", user).Error; err != nil {
+	shards, err := cs.meta.GetUserShards(ctx, user)
+	if err != nil {
 		return err
 	}

···
 	return nil
 }

-func (cs *CarStore) deleteShards(ctx context.Context, shs []*CarShard) error {
+func (cs *CarStore) deleteShards(ctx context.Context, shs []CarShard) error {
 	ctx, span := otel.Tracer("carstore").Start(ctx, "deleteShards")
 	defer span.End()

-	deleteSlice := func(ctx context.Context, subs []*CarShard) error {
-		var ids []uint
-		for _, sh := range subs {
-			ids = append(ids, sh.ID)
-		}
-
-		txn := cs.meta.Begin()
-
-		if err := txn.Delete(&CarShard{}, "id in (?)", ids).Error; err != nil {
-			return err
+	deleteSlice := func(ctx context.Context, subs []CarShard) error {
+		ids := make([]uint, len(subs))
+		for i, sh := range subs {
+			ids[i] = sh.ID
 		}

-		if err := txn.Delete(&blockRef{}, "shard in (?)", ids).Error; err != nil {
-			return err
-		}
-
-		if err := txn.Commit().Error; err != nil {
+		err := cs.meta.DeleteShardsAndRefs(ctx, ids)
+		if err != nil {
 			return err
 		}

 		for _, sh := range subs {
-			if err := cs.deleteShardFile(ctx, sh); err != nil {
+			if err := cs.deleteShardFile(ctx, &sh); err != nil {
 				if !os.IsNotExist(err) {
 					return err
 				}
···

 func (cs *CarStore) openNewCompactedShardFile(ctx context.Context, user models.Uid, seq int) (*os.File, string, error) {
 	// TODO: some overwrite protections
+	// NOTE CreateTemp is used for creating a non-colliding file, but we keep it and don't delete it so don't think of it as "temporary".
+	// This creates "sh-%d-%d%s" with some random stuff in the last position
 	fi, err := os.CreateTemp(cs.rootDir, fnameForShard(user, seq))
 	if err != nil {
 		return nil, "", err
···
 	ctx, span := otel.Tracer("carstore").Start(ctx, "GetCompactionTargets")
 	defer span.End()

-	var targets []CompactionTarget
-	if err := cs.meta.Raw(`select usr, count(*) as num_shards from car_shards group by usr having count(*) > ? order by num_shards desc`, shardCount).Scan(&targets).Error; err != nil {
-		return nil, err
-	}
-
-	return targets, nil
+	return cs.meta.GetCompactionTargets(ctx, shardCount)
 }

+// getBlockRefsForShards is a prep function for CompactUserShards
 func (cs *CarStore) getBlockRefsForShards(ctx context.Context, shardIds []uint) ([]blockRef, error) {
 	ctx, span := otel.Tracer("carstore").Start(ctx, "getBlockRefsForShards")
 	defer span.End()

 	span.SetAttributes(attribute.Int("shards", len(shardIds)))

-	chunkSize := 2000
-	out := make([]blockRef, 0, len(shardIds))
-	for i := 0; i < len(shardIds); i += chunkSize {
-		sl := shardIds[i:]
-		if len(sl) > chunkSize {
-			sl = sl[:chunkSize]
-		}
-
-		if err := blockRefsForShards(ctx, cs.meta, sl, &out); err != nil {
-			return nil, fmt.Errorf("getting block refs: %w", err)
-		}
+	out, err := cs.meta.GetBlockRefsForShards(ctx, shardIds)
+	if err != nil {
+		return nil, err
 	}

 	span.SetAttributes(attribute.Int("refs", len(out)))
···
 	return out, nil
 }

-func valuesStatementForShards(shards []uint) string {
-	sb := new(strings.Builder)
-	for i, v := range shards {
-		sb.WriteByte('(')
-		sb.WriteString(strconv.Itoa(int(v)))
-		sb.WriteByte(')')
-		if i != len(shards)-1 {
-			sb.WriteByte(',')
-		}
-	}
-	return sb.String()
-}
-
-func blockRefsForShards(ctx context.Context, db *gorm.DB, shards []uint, obuf *[]blockRef) error {
-	// Check the database driver
-	switch db.Dialector.(type) {
-	case *postgres.Dialector:
-		sval := valuesStatementForShards(shards)
-		q := fmt.Sprintf(`SELECT block_refs.* FROM block_refs INNER JOIN (VALUES %s) AS vals(v) ON block_refs.shard = v`, sval)
-		return db.Raw(q).Scan(obuf).Error
-	default:
-		return db.Raw(`SELECT * FROM block_refs WHERE shard IN (?)`, shards).Scan(obuf).Error
-	}
-}
-
 func shardSize(sh *CarShard) (int64, error) {
 	st, err := os.Stat(sh.Path)
 	if err != nil {
···

 	span.SetAttributes(attribute.Int64("user", int64(user)))

-	var shards []CarShard
-	if err := cs.meta.WithContext(ctx).Find(&shards, "usr = ?", user).Error; err != nil {
+	shards, err := cs.meta.GetUserShards(ctx, user)
+	if err != nil {
 		return nil, err
 	}
-
-	sort.Slice(shards, func(i, j int) bool {
-		return shards[i].Seq < shards[j].Seq
-	})

 	if skipBigShards {
 		// Since we generally expect shards to start bigger and get smaller,
···

 	span.SetAttributes(attribute.Int("blockRefs", len(brefs)))

-	var staleRefs []staleRef
-	if err := cs.meta.WithContext(ctx).Find(&staleRefs, "usr = ?", user).Error; err != nil {
+	staleRefs, err := cs.meta.GetUserStaleRefs(ctx, user)
+	if err != nil {
 		return nil, err
 	}

···

 	stats.NewShards++

-	var todelete []*CarShard
+	todelete := make([]CarShard, 0, len(b.shards))
 	for _, s := range b.shards {
 		removedShards[s.ID] = true
 		sh, ok := shardsById[s.ID]
···
 			return nil, fmt.Errorf("missing shard to delete")
 		}

-		todelete = append(todelete, &sh)
+		todelete = append(todelete, sh)
 	}

 	stats.ShardsDeleted += len(todelete)
···
 		}
 	}

-	txn := cs.meta.Begin()
-
-	if err := txn.Delete(&staleRef{}, "usr = ?", uid).Error; err != nil {
-		return err
-	}
-
-	// now create a new staleRef with all the refs we couldn't clear out
-	if len(staleToKeep) > 0 {
-		if err := txn.Create(&staleRef{
-			Usr:  uid,
-			Cids: packCids(staleToKeep),
-		}).Error; err != nil {
-			return err
-		}
-	}
-
-	if err := txn.Commit().Error; err != nil {
-		return fmt.Errorf("failed to commit staleRef updates: %w", err)
-	}
-
-	return nil
+	return cs.meta.SetStaleRef(ctx, uid, staleToKeep)
 }

 func (cs *CarStore) compactBucket(ctx context.Context, user models.Uid, b *compBucket, shardsById map[uint]CarShard, keep map[cid.Cid]bool) error {
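
Note: taken together, the bs.go changes above route all carstore metadata bookkeeping through the new CarStoreGormMeta type added below. The following is a minimal sketch of that surface, collected from this diff; the interface name is hypothetical (the PR stores the concrete *CarStoreGormMeta directly in CarStore.meta), but the method set and signatures are the ones the diff introduces.

// Hypothetical grouping of the metadata calls bs.go now makes; not part of the PR itself.
type carStoreMetaSketch interface {
	Init() error
	HasUidCid(ctx context.Context, user models.Uid, k cid.Cid) (bool, error)
	LookupBlockRef(ctx context.Context, k cid.Cid) (path string, offset int64, user models.Uid, err error)
	GetLastShard(ctx context.Context, user models.Uid) (*CarShard, error)
	GetUserShards(ctx context.Context, usr models.Uid) ([]CarShard, error)
	GetUserShardsDesc(ctx context.Context, usr models.Uid, minSeq int) ([]CarShard, error)
	GetUserStaleRefs(ctx context.Context, user models.Uid) ([]staleRef, error)
	SeqForRev(ctx context.Context, user models.Uid, sinceRev string) (int, error)
	GetCompactionTargets(ctx context.Context, minShardCount int) ([]CompactionTarget, error)
	GetBlockRefsForShards(ctx context.Context, shardIds []uint) ([]blockRef, error)
	PutShardAndRefs(ctx context.Context, shard *CarShard, brefs []map[string]any, rmcids map[cid.Cid]bool) error
	DeleteShardsAndRefs(ctx context.Context, ids []uint) error
	SetStaleRef(ctx context.Context, uid models.Uid, staleToKeep []cid.Cid) error
}

Declaring such an interface (and asserting *CarStoreGormMeta satisfies it) would be one way to allow a non-gorm metadata backend later; the diff itself stops short of that.
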
carstore/meta_gorm.go  (+346)
···
+package carstore
+
+import (
+	"bytes"
+	"context"
+	"fmt"
+	"io"
+	"strconv"
+	"strings"
+	"time"
+
+	"github.com/bluesky-social/indigo/models"
+	"github.com/ipfs/go-cid"
+	"go.opentelemetry.io/otel"
+	"gorm.io/driver/postgres"
+	"gorm.io/gorm"
+)
+
+type CarStoreGormMeta struct {
+	meta *gorm.DB
+}
+
+func (cs *CarStoreGormMeta) Init() error {
+	if err := cs.meta.AutoMigrate(&CarShard{}, &blockRef{}); err != nil {
+		return err
+	}
+	if err := cs.meta.AutoMigrate(&staleRef{}); err != nil {
+		return err
+	}
+	return nil
+}
+
+// Return true if any known record matches (Uid, Cid)
+func (cs *CarStoreGormMeta) HasUidCid(ctx context.Context, user models.Uid, k cid.Cid) (bool, error) {
+	var count int64
+	if err := cs.meta.
+		Model(blockRef{}).
+		Select("path, block_refs.offset").
+		Joins("left join car_shards on block_refs.shard = car_shards.id").
+		Where("usr = ? AND cid = ?", user, models.DbCID{CID: k}).
+		Count(&count).Error; err != nil {
+		return false, err
+	}
+
+	return count > 0, nil
+}
+
+// For some Cid, lookup the block ref.
+// Return the path of the file written, the offset within the file, and the user associated with the Cid.
+func (cs *CarStoreGormMeta) LookupBlockRef(ctx context.Context, k cid.Cid) (path string, offset int64, user models.Uid, err error) {
+	// TODO: for now, im using a join to ensure we only query blocks from the
+	// correct user. maybe it makes sense to put the user in the blockRef
+	// directly? tradeoff of time vs space
+	var info struct {
+		Path   string
+		Offset int64
+		Usr    models.Uid
+	}
+	if err := cs.meta.Raw(`SELECT
+	(select path from car_shards where id = block_refs.shard) as path,
+	block_refs.offset,
+	(select usr from car_shards where id = block_refs.shard) as usr
+FROM block_refs
+WHERE
+	block_refs.cid = ?
+LIMIT 1;`, models.DbCID{CID: k}).Scan(&info).Error; err != nil {
+		var defaultUser models.Uid
+		return "", -1, defaultUser, err
+	}
+	return info.Path, info.Offset, info.Usr, nil
+}
+
+func (cs *CarStoreGormMeta) GetLastShard(ctx context.Context, user models.Uid) (*CarShard, error) {
+	var lastShard CarShard
+	if err := cs.meta.WithContext(ctx).Model(CarShard{}).Limit(1).Order("seq desc").Find(&lastShard, "usr = ?", user).Error; err != nil {
+		return nil, err
+	}
+	return &lastShard, nil
+}
+
+// return all of a users's shards, ascending by Seq
+func (cs *CarStoreGormMeta) GetUserShards(ctx context.Context, usr models.Uid) ([]CarShard, error) {
+	var shards []CarShard
+	if err := cs.meta.Order("seq asc").Find(&shards, "usr = ?", usr).Error; err != nil {
+		return nil, err
+	}
+	return shards, nil
+}
+
+// return all of a users's shards, descending by Seq
+func (cs *CarStoreGormMeta) GetUserShardsDesc(ctx context.Context, usr models.Uid, minSeq int) ([]CarShard, error) {
+	var shards []CarShard
+	if err := cs.meta.Order("seq desc").Find(&shards, "usr = ? AND seq >= ?", usr, minSeq).Error; err != nil {
+		return nil, err
+	}
+	return shards, nil
+}
+
+func (cs *CarStoreGormMeta) GetUserStaleRefs(ctx context.Context, user models.Uid) ([]staleRef, error) {
+	var staleRefs []staleRef
+	if err := cs.meta.WithContext(ctx).Find(&staleRefs, "usr = ?", user).Error; err != nil {
+		return nil, err
+	}
+	return staleRefs, nil
+}
+
+func (cs *CarStoreGormMeta) SeqForRev(ctx context.Context, user models.Uid, sinceRev string) (int, error) {
+	var untilShard CarShard
+	if err := cs.meta.Where("rev >= ? AND usr = ?", sinceRev, user).Order("rev").First(&untilShard).Error; err != nil {
+		return 0, fmt.Errorf("finding early shard: %w", err)
+	}
+	return untilShard.Seq, nil
+}
+
+func (cs *CarStoreGormMeta) GetCompactionTargets(ctx context.Context, minShardCount int) ([]CompactionTarget, error) {
+	var targets []CompactionTarget
+	if err := cs.meta.Raw(`select usr, count(*) as num_shards from car_shards group by usr having count(*) > ? order by num_shards desc`, minShardCount).Scan(&targets).Error; err != nil {
+		return nil, err
+	}
+
+	return targets, nil
+}
+
+func (cs *CarStoreGormMeta) PutShardAndRefs(ctx context.Context, shard *CarShard, brefs []map[string]any, rmcids map[cid.Cid]bool) error {
+	// TODO: there should be a way to create the shard and block_refs that
+	// reference it in the same query, would save a lot of time
+	tx := cs.meta.WithContext(ctx).Begin()
+
+	if err := tx.WithContext(ctx).Create(shard).Error; err != nil {
+		return fmt.Errorf("failed to create shard in DB tx: %w", err)
+	}
+
+	for _, ref := range brefs {
+		ref["shard"] = shard.ID
+	}
+
+	if err := createBlockRefs(ctx, tx, brefs); err != nil {
+		return fmt.Errorf("failed to create block refs: %w", err)
+	}
+
+	if len(rmcids) > 0 {
+		cids := make([]cid.Cid, 0, len(rmcids))
+		for c := range rmcids {
+			cids = append(cids, c)
+		}
+
+		if err := tx.Create(&staleRef{
+			Cids: packCids(cids),
+			Usr:  shard.Usr,
+		}).Error; err != nil {
+			return err
+		}
+	}
+
+	err := tx.WithContext(ctx).Commit().Error
+	if err != nil {
+		return fmt.Errorf("failed to commit shard DB transaction: %w", err)
+	}
+	return nil
+}
+
+func (cs *CarStoreGormMeta) DeleteShardsAndRefs(ctx context.Context, ids []uint) error {
+	txn := cs.meta.Begin()
+
+	if err := txn.Delete(&CarShard{}, "id in (?)", ids).Error; err != nil {
+		txn.Rollback()
+		return err
+	}
+
+	if err := txn.Delete(&blockRef{}, "shard in (?)", ids).Error; err != nil {
+		txn.Rollback()
+		return err
+	}
+
+	return txn.Commit().Error
+}
+
+func (cs *CarStoreGormMeta) GetBlockRefsForShards(ctx context.Context, shardIds []uint) ([]blockRef, error) {
+	chunkSize := 2000
+	out := make([]blockRef, 0, len(shardIds))
+	for i := 0; i < len(shardIds); i += chunkSize {
+		sl := shardIds[i:]
+		if len(sl) > chunkSize {
+			sl = sl[:chunkSize]
+		}
+
+		if err := blockRefsForShards(ctx, cs.meta, sl, &out); err != nil {
+			return nil, fmt.Errorf("getting block refs: %w", err)
+		}
+	}
+	return out, nil
+}
+
+// blockRefsForShards is an inner loop helper for GetBlockRefsForShards
+func blockRefsForShards(ctx context.Context, db *gorm.DB, shards []uint, obuf *[]blockRef) error {
+	// Check the database driver
+	switch db.Dialector.(type) {
+	case *postgres.Dialector:
+		sval := valuesStatementForShards(shards)
+		q := fmt.Sprintf(`SELECT block_refs.* FROM block_refs INNER JOIN (VALUES %s) AS vals(v) ON block_refs.shard = v`, sval)
+		return db.Raw(q).Scan(obuf).Error
+	default:
+		return db.Raw(`SELECT * FROM block_refs WHERE shard IN (?)`, shards).Scan(obuf).Error
+	}
+}
+
+// valuesStatementForShards builds a postgres compatible statement string from int literals
+func valuesStatementForShards(shards []uint) string {
+	sb := new(strings.Builder)
+	for i, v := range shards {
+		sb.WriteByte('(')
+		sb.WriteString(strconv.Itoa(int(v)))
+		sb.WriteByte(')')
+		if i != len(shards)-1 {
+			sb.WriteByte(',')
+		}
+	}
+	return sb.String()
+}
+
+func (cs *CarStoreGormMeta) SetStaleRef(ctx context.Context, uid models.Uid, staleToKeep []cid.Cid) error {
+	txn := cs.meta.Begin()
+
+	if err := txn.Delete(&staleRef{}, "usr = ?", uid).Error; err != nil {
+		return err
+	}
+
+	// now create a new staleRef with all the refs we couldn't clear out
+	if len(staleToKeep) > 0 {
+		if err := txn.Create(&staleRef{
+			Usr:  uid,
+			Cids: packCids(staleToKeep),
+		}).Error; err != nil {
+			return err
+		}
+	}
+
+	if err := txn.Commit().Error; err != nil {
+		return fmt.Errorf("failed to commit staleRef updates: %w", err)
+	}
+	return nil
+}
+
+type CarShard struct {
+	ID        uint `gorm:"primarykey"`
+	CreatedAt time.Time
+
+	Root      models.DbCID `gorm:"index"`
+	DataStart int64
+	Seq       int `gorm:"index:idx_car_shards_seq;index:idx_car_shards_usr_seq,priority:2,sort:desc"`
+	Path      string
+	Usr       models.Uid `gorm:"index:idx_car_shards_usr;index:idx_car_shards_usr_seq,priority:1"`
+	Rev       string
+}
+
+type blockRef struct {
+	ID     uint         `gorm:"primarykey"`
+	Cid    models.DbCID `gorm:"index"`
+	Shard  uint         `gorm:"index"`
+	Offset int64
+	//User uint `gorm:"index"`
+}
+
+type staleRef struct {
+	ID   uint `gorm:"primarykey"`
+	Cid  *models.DbCID
+	Cids []byte
+	Usr  models.Uid `gorm:"index"`
+}
+
+func (sr *staleRef) getCids() ([]cid.Cid, error) {
+	if sr.Cid != nil {
+		return []cid.Cid{sr.Cid.CID}, nil
+	}
+
+	return unpackCids(sr.Cids)
+}
+
+func unpackCids(b []byte) ([]cid.Cid, error) {
+	br := bytes.NewReader(b)
+	var out []cid.Cid
+	for {
+		_, c, err := cid.CidFromReader(br)
+		if err != nil {
+			if err == io.EOF {
+				break
+			}
+			return nil, err
+		}
+
+		out = append(out, c)
+	}
+
+	return out, nil
+}
+
+func packCids(cids []cid.Cid) []byte {
+	buf := new(bytes.Buffer)
+	for _, c := range cids {
+		buf.Write(c.Bytes())
+	}
+
+	return buf.Bytes()
+}
+
+func createBlockRefs(ctx context.Context, tx *gorm.DB, brefs []map[string]any) error {
+	ctx, span := otel.Tracer("carstore").Start(ctx, "createBlockRefs")
+	defer span.End()
+
+	if err := createInBatches(ctx, tx, brefs, 2000); err != nil {
+		return err
+	}
+
+	return nil
+}
+
+// Function to create in batches
+func createInBatches(ctx context.Context, tx *gorm.DB, brefs []map[string]any, batchSize int) error {
+	for i := 0; i < len(brefs); i += batchSize {
+		batch := brefs[i:]
+		if len(batch) > batchSize {
+			batch = batch[:batchSize]
+		}
+
+		query, values := generateInsertQuery(batch)
+
+		if err := tx.WithContext(ctx).Exec(query, values...).Error; err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+func generateInsertQuery(brefs []map[string]any) (string, []any) {
+	placeholders := strings.Repeat("(?, ?, ?),", len(brefs))
+	placeholders = placeholders[:len(placeholders)-1] // trim trailing comma
+
+	query := "INSERT INTO block_refs (\"cid\", \"offset\", \"shard\") VALUES " + placeholders
+
+	values := make([]any, 0, 3*len(brefs))
+	for _, entry := range brefs {
+		values = append(values, entry["cid"], entry["offset"], entry["shard"])
+	}
+
+	return query, values
+}
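
Note: packCids and unpackCids above define the encoding used for staleRef.Cids: raw CID bytes concatenated back to back, parsed again with cid.CidFromReader until EOF. A small stand-alone round-trip sketch of that encoding; the CIDs are built from arbitrary example payloads, not values from this PR.

package main

import (
	"bytes"
	"fmt"

	"github.com/ipfs/go-cid"
	mh "github.com/multiformats/go-multihash"
)

func main() {
	// Build a couple of example CIDs from arbitrary payloads.
	var cids []cid.Cid
	for _, s := range []string{"block-a", "block-b"} {
		h, _ := mh.Sum([]byte(s), mh.SHA2_256, -1)
		cids = append(cids, cid.NewCidV1(cid.Raw, h))
	}

	// Pack: concatenate raw CID bytes, as packCids does.
	buf := new(bytes.Buffer)
	for _, c := range cids {
		buf.Write(c.Bytes())
	}

	// Unpack: re-parse until EOF, as unpackCids does.
	br := bytes.NewReader(buf.Bytes())
	for {
		_, c, err := cid.CidFromReader(br)
		if err != nil {
			break // io.EOF ends the packed stream
		}
		fmt.Println(c)
	}
}
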
carstore/util.go  (+32)
···
+package carstore
+
+import (
+	"encoding/binary"
+	"io"
+)
+
+// Length-delimited Write
+// Writer stream gets Uvarint length then concatenated data
+func LdWrite(w io.Writer, d ...[]byte) (int64, error) {
+	var sum uint64
+	for _, s := range d {
+		sum += uint64(len(s))
+	}
+
+	buf := make([]byte, 8)
+	n := binary.PutUvarint(buf, sum)
+	nw, err := w.Write(buf[:n])
+	if err != nil {
+		return 0, err
+	}
+
+	for _, s := range d {
+		onw, err := w.Write(s)
+		if err != nil {
+			return int64(nw), err
+		}
+		nw += onw
+	}
+
+	return int64(nw), nil
+}
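
Note: LdWrite is the length-delimited section framing used when streaming shard files into a CAR: one uvarint carrying the combined length of the pieces, followed by the pieces themselves (in this codebase the two pieces are typically a CID and the block's raw data). A stand-alone round-trip sketch using only the standard library; the payloads are arbitrary stand-ins.

package main

import (
	"bufio"
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

// ldWrite mirrors carstore's LdWrite: uvarint(total length), then each piece.
func ldWrite(w io.Writer, d ...[]byte) (int64, error) {
	var sum uint64
	for _, s := range d {
		sum += uint64(len(s))
	}
	buf := make([]byte, binary.MaxVarintLen64)
	n := binary.PutUvarint(buf, sum)
	nw, err := w.Write(buf[:n])
	if err != nil {
		return 0, err
	}
	for _, s := range d {
		onw, err := w.Write(s)
		if err != nil {
			return int64(nw), err
		}
		nw += onw
	}
	return int64(nw), nil
}

func main() {
	var out bytes.Buffer
	// In carstore the pieces are c.Bytes() and blk.RawData(); stand-ins here.
	if _, err := ldWrite(&out, []byte("cid-bytes"), []byte("block-data")); err != nil {
		panic(err)
	}

	// Reading a section back: uvarint length prefix, then that many bytes.
	br := bufio.NewReader(&out)
	ln, err := binary.ReadUvarint(br)
	if err != nil {
		panic(err)
	}
	payload := make([]byte, ln)
	if _, err := io.ReadFull(br, payload); err != nil {
		panic(err)
	}
	fmt.Printf("section length %d, payload %q\n", ln, payload)
}
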