package carstore

import (
	"bufio"
	"bytes"
	"context"
	"fmt"
	"io"
	"log/slog"
	"os"
	"path/filepath"
	"sort"
	"sync/atomic"
	"time"

	"github.com/bluesky-social/indigo/models"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"

	blockformat "github.com/ipfs/go-block-format"
	"github.com/ipfs/go-cid"
	blockstore "github.com/ipfs/go-ipfs-blockstore"
	cbor "github.com/ipfs/go-ipld-cbor"
	ipld "github.com/ipfs/go-ipld-format"
	"github.com/ipfs/go-libipfs/blocks"
	car "github.com/ipld/go-car"
	carutil "github.com/ipld/go-car/util"
	cbg "github.com/whyrusleeping/cbor-gen"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	"gorm.io/gorm"
)

var blockGetTotalCounter = promauto.NewCounterVec(prometheus.CounterOpts{
	Name: "carstore_block_get_total",
	Help: "carstore get queries",
}, []string{"usrskip", "cache"})

var blockGetTotalCounterUsrskip = blockGetTotalCounter.WithLabelValues("true", "miss")
var blockGetTotalCounterCached = blockGetTotalCounter.WithLabelValues("false", "hit")
var blockGetTotalCounterNormal = blockGetTotalCounter.WithLabelValues("false", "miss")

// MaxSliceLength is the maximum allowed CAR slice size (2 MiB).
const MaxSliceLength = 2 << 20

// BigShardThreshold is the shard file size (2 MiB) above which compaction may
// skip a shard (see CompactUserShards).
const BigShardThreshold = 2 << 20

type CarStore interface {
	// TODO: not really part of general interface
	CompactUserShards(ctx context.Context, user models.Uid, skipBigShards bool) (*CompactionStats, error)
	// TODO: not really part of general interface
	GetCompactionTargets(ctx context.Context, shardCount int) ([]CompactionTarget, error)

	GetUserRepoHead(ctx context.Context, user models.Uid) (cid.Cid, error)
	GetUserRepoRev(ctx context.Context, user models.Uid) (string, error)
	ImportSlice(ctx context.Context, uid models.Uid, since *string, carslice []byte) (cid.Cid, *DeltaSession, error)
	NewDeltaSession(ctx context.Context, user models.Uid, since *string) (*DeltaSession, error)
	ReadOnlySession(user models.Uid) (*DeltaSession, error)
	ReadUserCar(ctx context.Context, user models.Uid, sinceRev string, incremental bool, w io.Writer) error
	Stat(ctx context.Context, usr models.Uid) ([]UserStat, error)
	WipeUserData(ctx context.Context, user models.Uid) error
}
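
// Hedged usage sketch (illustrative, not part of the package API): a typical
// write path applies a CAR slice from the firehose, computes which old blocks
// it obsoletes, and persists the delta as a new shard. The function name and
// parameters here are assumptions for illustration.
func exampleApplyCarSlice(ctx context.Context, store CarStore, uid models.Uid, sinceRev *string, newRev string, slice []byte) (cid.Cid, error) {
	root, ds, err := store.ImportSlice(ctx, uid, sinceRev, slice)
	if err != nil {
		return cid.Undef, err
	}
	// mark blocks in the old tree that the new tree no longer references
	if err := ds.CalcDiff(ctx, nil); err != nil {
		return cid.Undef, err
	}
	// write the new shard file and its metadata; the returned CAR bytes can
	// be fanned out to subscribers
	if _, err := ds.CloseWithRoot(ctx, root, newRev); err != nil {
		return cid.Undef, err
	}
	return root, nil
}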

type FileCarStore struct {
	meta     *CarStoreGormMeta
	rootDirs []string

	lastShardCache lastShardCache

	log *slog.Logger
}

// NewCarStore creates the shard root directories if needed, migrates the
// metadata tables, and returns a file-backed CarStore.
func NewCarStore(meta *gorm.DB, roots []string) (CarStore, error) {
	for _, root := range roots {
		if _, err := os.Stat(root); err != nil {
			if !os.IsNotExist(err) {
				return nil, err
			}

			if err := os.Mkdir(root, 0775); err != nil {
				return nil, err
			}
		}
	}
	if err := meta.AutoMigrate(&CarShard{}, &blockRef{}); err != nil {
		return nil, err
	}
	if err := meta.AutoMigrate(&staleRef{}); err != nil {
		return nil, err
	}

	gormMeta := &CarStoreGormMeta{meta: meta}
	out := &FileCarStore{
		meta:     gormMeta,
		rootDirs: roots,
		lastShardCache: lastShardCache{
			source: gormMeta,
		},
		log: slog.Default().With("system", "carstore"),
	}
	out.lastShardCache.Init()
	return out, nil
}
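
// Hedged wiring sketch: the gorm.DB is assumed to be opened by the caller
// (driver choice is out of scope here), and the directory paths are
// illustrative placeholders.
func exampleNewCarStore(db *gorm.DB) (CarStore, error) {
	return NewCarStore(db, []string{"/var/lib/carstore/a", "/var/lib/carstore/b"})
}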

// userViewSource is what a userView needs in order to reach the underlying
// block store; implemented by CarStoreGormMeta.
type userViewSource interface {
	HasUidCid(ctx context.Context, user models.Uid, k cid.Cid) (bool, error)
	LookupBlockRef(ctx context.Context, k cid.Cid) (path string, offset int64, user models.Uid, err error)
}

// userView wraps the underlying block store and keeps track of which user we
// are working on behalf of.
type userView struct {
	cs   userViewSource
	user models.Uid

	cache    map[cid.Cid]blockformat.Block
	prefetch bool
}

var _ blockstore.Blockstore = (*userView)(nil)

func (uv *userView) HashOnRead(hor bool) {
	// noop
}

func (uv *userView) Has(ctx context.Context, k cid.Cid) (bool, error) {
	_, have := uv.cache[k]
	if have {
		return have, nil
	}
	return uv.cs.HasUidCid(ctx, uv.user, k)
}

var CacheHits int64
var CacheMiss int64

func (uv *userView) Get(ctx context.Context, k cid.Cid) (blockformat.Block, error) {
	if !k.Defined() {
		return nil, fmt.Errorf("attempted to 'get' undefined cid")
	}
	if uv.cache != nil {
		blk, ok := uv.cache[k]
		if ok {
			blockGetTotalCounterCached.Add(1)
			atomic.AddInt64(&CacheHits, 1)

			return blk, nil
		}
	}
	atomic.AddInt64(&CacheMiss, 1)

	path, offset, user, err := uv.cs.LookupBlockRef(ctx, k)
	if err != nil {
		return nil, err
	}
	if path == "" {
		return nil, ipld.ErrNotFound{Cid: k}
	}

	prefetch := uv.prefetch
	if user != uv.user {
		blockGetTotalCounterUsrskip.Add(1)
		prefetch = false
	} else {
		blockGetTotalCounterNormal.Add(1)
	}

	if prefetch {
		return uv.prefetchRead(ctx, k, path, offset)
	} else {
		return uv.singleRead(ctx, k, path, offset)
	}
}

// shard files larger than this are read a single block at a time rather than
// being prefetched whole into the cache
const prefetchThreshold = 512 << 10

func (uv *userView) prefetchRead(ctx context.Context, k cid.Cid, path string, offset int64) (blockformat.Block, error) {
	ctx, span := otel.Tracer("carstore").Start(ctx, "prefetchRead")
	defer span.End()

	fi, err := os.Open(path)
	if err != nil {
		return nil, err
	}
	defer fi.Close()

	st, err := fi.Stat()
	if err != nil {
		return nil, fmt.Errorf("stat file for prefetch: %w", err)
	}

	span.SetAttributes(attribute.Int64("shard_size", st.Size()))

	// large shards are not worth slurping into the cache; fall back to a
	// single-block read at the known offset
	if st.Size() > prefetchThreshold {
		span.SetAttributes(attribute.Bool("no_prefetch", true))
		return doBlockRead(fi, k, offset)
	}

	cr, err := car.NewCarReader(fi)
	if err != nil {
		return nil, err
	}

	for {
		blk, err := cr.Next()
		if err != nil {
			if err == io.EOF {
				break
			}
			return nil, err
		}

		uv.cache[blk.Cid()] = blk
	}

	outblk, ok := uv.cache[k]
	if !ok {
		return nil, fmt.Errorf("requested block was not found in car slice")
	}

	return outblk, nil
}

func (uv *userView) singleRead(ctx context.Context, k cid.Cid, path string, offset int64) (blockformat.Block, error) {
	fi, err := os.Open(path)
	if err != nil {
		return nil, err
	}
	defer fi.Close()

	return doBlockRead(fi, k, offset)
}

// doBlockRead reads a single length-delimited block record at the given
// offset and verifies that its CID matches the one requested.
func doBlockRead(fi *os.File, k cid.Cid, offset int64) (blockformat.Block, error) {
	seeked, err := fi.Seek(offset, io.SeekStart)
	if err != nil {
		return nil, err
	}

	if seeked != offset {
		return nil, fmt.Errorf("failed to seek to offset (%d != %d)", seeked, offset)
	}

	bufr := bufio.NewReader(fi)
	rcid, data, err := carutil.ReadNode(bufr)
	if err != nil {
		return nil, err
	}

	if rcid != k {
		return nil, fmt.Errorf("mismatch in cid on disk: %s != %s", rcid, k)
	}

	return blocks.NewBlockWithCid(data, rcid)
}
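
// ldFramingExample is a hedged, self-contained illustration of the on-disk
// record framing that doBlockRead depends on: LdWrite emits a uvarint length
// followed by the CID bytes and block data, and carutil.ReadNode reverses it.
func ldFramingExample(blk blockformat.Block) error {
	buf := new(bytes.Buffer)
	if _, err := LdWrite(buf, blk.Cid().Bytes(), blk.RawData()); err != nil {
		return err
	}
	rcid, data, err := carutil.ReadNode(bufio.NewReader(buf))
	if err != nil {
		return err
	}
	if rcid != blk.Cid() || !bytes.Equal(data, blk.RawData()) {
		return fmt.Errorf("round trip mismatch: %s", rcid)
	}
	return nil
}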

func (uv *userView) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
	return nil, fmt.Errorf("not implemented")
}

func (uv *userView) Put(ctx context.Context, blk blockformat.Block) error {
	return fmt.Errorf("puts not supported on car view blockstores")
}

func (uv *userView) PutMany(ctx context.Context, blks []blockformat.Block) error {
	return fmt.Errorf("puts not supported on car view blockstores")
}

func (uv *userView) DeleteBlock(ctx context.Context, k cid.Cid) error {
	return fmt.Errorf("deletes not supported on car view blockstores")
}

func (uv *userView) GetSize(ctx context.Context, k cid.Cid) (int, error) {
	// TODO: maybe block size should be in the database record...
	blk, err := uv.Get(ctx, k)
	if err != nil {
		return 0, err
	}

	return len(blk.RawData()), nil
}

// minBlockstore is the subset of blockstore.Blockstore that we actually use here
type minBlockstore interface {
	Get(ctx context.Context, bcid cid.Cid) (blockformat.Block, error)
	Has(ctx context.Context, bcid cid.Cid) (bool, error)
	GetSize(ctx context.Context, bcid cid.Cid) (int, error)
}

type DeltaSession struct {
	blks     map[cid.Cid]blockformat.Block
	rmcids   map[cid.Cid]bool
	base     minBlockstore
	user     models.Uid
	baseCid  cid.Cid
	seq      int
	readonly bool
	cs       shardWriter
	lastRev  string
}

func (cs *FileCarStore) checkLastShardCache(user models.Uid) *CarShard {
	return cs.lastShardCache.check(user)
}

func (cs *FileCarStore) removeLastShardCache(user models.Uid) {
	cs.lastShardCache.remove(user)
}

func (cs *FileCarStore) putLastShardCache(ls *CarShard) {
	cs.lastShardCache.put(ls)
}

func (cs *FileCarStore) getLastShard(ctx context.Context, user models.Uid) (*CarShard, error) {
	return cs.lastShardCache.get(ctx, user)
}

var ErrRepoBaseMismatch = fmt.Errorf("attempted a delta session on top of the wrong previous head")

func (cs *FileCarStore) NewDeltaSession(ctx context.Context, user models.Uid, since *string) (*DeltaSession, error) {
	ctx, span := otel.Tracer("carstore").Start(ctx, "NewSession")
	defer span.End()

	// TODO: ensure that we don't write updates on top of the wrong head
	// this needs to be a compare and swap type operation
	lastShard, err := cs.getLastShard(ctx, user)
	if err != nil {
		return nil, err
	}

	if since != nil && *since != lastShard.Rev {
		return nil, fmt.Errorf("revision mismatch: %s != %s: %w", *since, lastShard.Rev, ErrRepoBaseMismatch)
	}

	return &DeltaSession{
		blks: make(map[cid.Cid]blockformat.Block),
		base: &userView{
			user:     user,
			cs:       cs.meta,
			prefetch: true,
			cache:    make(map[cid.Cid]blockformat.Block),
		},
		user:    user,
		baseCid: lastShard.Root.CID,
		cs:      cs,
		seq:     lastShard.Seq + 1,
		lastRev: lastShard.Rev,
	}, nil
}

func (cs *FileCarStore) ReadOnlySession(user models.Uid) (*DeltaSession, error) {
	return &DeltaSession{
		base: &userView{
			user:     user,
			cs:       cs.meta,
			prefetch: false,
			cache:    make(map[cid.Cid]blockformat.Block),
		},
		readonly: true,
		user:     user,
		cs:       cs,
	}, nil
}
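
// Hedged read-path sketch: fetch a single block for a user through a
// read-only session. The record CID is assumed to come from elsewhere
// (e.g. an MST lookup); names here are illustrative.
func exampleReadBlock(ctx context.Context, store CarStore, uid models.Uid, rec cid.Cid) ([]byte, error) {
	ds, err := store.ReadOnlySession(uid)
	if err != nil {
		return nil, err
	}
	blk, err := ds.Get(ctx, rec)
	if err != nil {
		return nil, err
	}
	return blk.RawData(), nil
}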

// TODO: incremental is only ever called true, remove the param
func (cs *FileCarStore) ReadUserCar(ctx context.Context, user models.Uid, sinceRev string, incremental bool, shardOut io.Writer) error {
	ctx, span := otel.Tracer("carstore").Start(ctx, "ReadUserCar")
	defer span.End()

	var earlySeq int
	if sinceRev != "" {
		var err error
		earlySeq, err = cs.meta.SeqForRev(ctx, user, sinceRev)
		if err != nil {
			return err
		}
	}

	shards, err := cs.meta.GetUserShardsDesc(ctx, user, earlySeq)
	if err != nil {
		return err
	}

	// TODO: incremental is only ever called true, so this is fine and we can remove the error check
	if !incremental && earlySeq > 0 {
		// have to do it the ugly way
		return fmt.Errorf("non-incremental reads not yet implemented")
	}

	if len(shards) == 0 {
		return fmt.Errorf("no data found for user %d", user)
	}

	// fast path!
	if err := car.WriteHeader(&car.CarHeader{
		Roots:   []cid.Cid{shards[0].Root.CID},
		Version: 1,
	}, shardOut); err != nil {
		return err
	}

	for _, sh := range shards {
		if err := cs.writeShardBlocks(ctx, &sh, shardOut); err != nil {
			return err
		}
	}

	return nil
}
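
// Hedged export sketch: stream a user's repo since a known revision into a
// buffer. Passing an empty sinceRev exports all of the user's shards.
func exampleExportCar(ctx context.Context, store CarStore, uid models.Uid, sinceRev string) ([]byte, error) {
	buf := new(bytes.Buffer)
	if err := store.ReadUserCar(ctx, uid, sinceRev, true, buf); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}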

// writeShardBlocks is the inner loop of ReadUserCar: it copies a shard's
// block records from disk to the writer, skipping the shard's own CAR header.
func (cs *FileCarStore) writeShardBlocks(ctx context.Context, sh *CarShard, shardOut io.Writer) error {
	ctx, span := otel.Tracer("carstore").Start(ctx, "writeShardBlocks")
	defer span.End()

	fi, err := os.Open(sh.Path)
	if err != nil {
		return err
	}
	defer fi.Close()

	_, err = fi.Seek(sh.DataStart, io.SeekStart)
	if err != nil {
		return err
	}

	_, err = io.Copy(shardOut, fi)
	if err != nil {
		return err
	}

	return nil
}

// iterateShardBlocks is the inner loop of compactBucket: it invokes cb for
// every block in the shard's CAR file.
func (cs *FileCarStore) iterateShardBlocks(ctx context.Context, sh *CarShard, cb func(blk blockformat.Block) error) error {
	fi, err := os.Open(sh.Path)
	if err != nil {
		return err
	}
	defer fi.Close()

	rr, err := car.NewCarReader(fi)
	if err != nil {
		return fmt.Errorf("opening shard car: %w", err)
	}

	for {
		blk, err := rr.Next()
		if err != nil {
			if err == io.EOF {
				return nil
			}
			return err
		}

		if err := cb(blk); err != nil {
			return err
		}
	}
}

var _ blockstore.Blockstore = (*DeltaSession)(nil)

func (ds *DeltaSession) BaseCid() cid.Cid {
	return ds.baseCid
}

func (ds *DeltaSession) Put(ctx context.Context, b blockformat.Block) error {
	if ds.readonly {
		return fmt.Errorf("cannot write to readonly deltaSession")
	}
	ds.blks[b.Cid()] = b
	return nil
}

func (ds *DeltaSession) PutMany(ctx context.Context, bs []blockformat.Block) error {
	if ds.readonly {
		return fmt.Errorf("cannot write to readonly deltaSession")
	}

	for _, b := range bs {
		ds.blks[b.Cid()] = b
	}
	return nil
}

func (ds *DeltaSession) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
	return nil, fmt.Errorf("AllKeysChan not implemented")
}

func (ds *DeltaSession) DeleteBlock(ctx context.Context, c cid.Cid) error {
	if ds.readonly {
		return fmt.Errorf("cannot write to readonly deltaSession")
	}

	if _, ok := ds.blks[c]; !ok {
		return ipld.ErrNotFound{Cid: c}
	}

	delete(ds.blks, c)
	return nil
}

func (ds *DeltaSession) Get(ctx context.Context, c cid.Cid) (blockformat.Block, error) {
	b, ok := ds.blks[c]
	if ok {
		return b, nil
	}

	return ds.base.Get(ctx, c)
}

func (ds *DeltaSession) Has(ctx context.Context, c cid.Cid) (bool, error) {
	_, ok := ds.blks[c]
	if ok {
		return true, nil
	}

	return ds.base.Has(ctx, c)
}

func (ds *DeltaSession) HashOnRead(hor bool) {
	// noop?
}

func (ds *DeltaSession) GetSize(ctx context.Context, c cid.Cid) (int, error) {
	b, ok := ds.blks[c]
	if ok {
		return len(b.RawData()), nil
	}

	return ds.base.GetSize(ctx, c)
}

// fnameForShard returns the base filename for a shard, e.g. "sh-1234-5".
func fnameForShard(user models.Uid, seq int) string {
	return fmt.Sprintf("sh-%d-%d", user, seq)
}

// dirForUser spreads users across the configured root directories by uid.
func (cs *FileCarStore) dirForUser(user models.Uid) string {
	return cs.rootDirs[int(user)%len(cs.rootDirs)]
}

func (cs *FileCarStore) openNewShardFile(ctx context.Context, user models.Uid, seq int) (*os.File, string, error) {
	// TODO: some overwrite protections
	fname := filepath.Join(cs.dirForUser(user), fnameForShard(user, seq))
	fi, err := os.Create(fname)
	if err != nil {
		return nil, "", err
	}

	return fi, fname, nil
}

func (cs *FileCarStore) writeNewShardFile(ctx context.Context, user models.Uid, seq int, data []byte) (string, error) {
	_, span := otel.Tracer("carstore").Start(ctx, "writeNewShardFile")
	defer span.End()

	// TODO: some overwrite protections
	fname := filepath.Join(cs.dirForUser(user), fnameForShard(user, seq))
	if err := os.WriteFile(fname, data, 0664); err != nil {
		return "", err
	}

	return fname, nil
}

func (cs *FileCarStore) deleteShardFile(ctx context.Context, sh *CarShard) error {
	return os.Remove(sh.Path)
}

// CloseWithRoot seals the delta session: it persists all new blocks as a new
// shard whose CAR header names the given cid as root, and returns the
// resulting CAR bytes.
func (ds *DeltaSession) CloseWithRoot(ctx context.Context, root cid.Cid, rev string) ([]byte, error) {
	ctx, span := otel.Tracer("carstore").Start(ctx, "CloseWithRoot")
	defer span.End()

	if ds.readonly {
		return nil, fmt.Errorf("cannot write to readonly deltaSession")
	}

	return ds.cs.writeNewShard(ctx, root, rev, ds.user, ds.seq, ds.blks, ds.rmcids)
}

// WriteCarHeader writes a CARv1 header with a single root and returns the
// number of bytes written.
func WriteCarHeader(w io.Writer, root cid.Cid) (int64, error) {
	h := &car.CarHeader{
		Roots:   []cid.Cid{root},
		Version: 1,
	}
	hb, err := cbor.DumpObject(h)
	if err != nil {
		return 0, err
	}

	hnw, err := LdWrite(w, hb)
	if err != nil {
		return 0, err
	}

	return hnw, nil
}

// shardWriter.writeNewShard is called from inside DeltaSession.CloseWithRoot
type shardWriter interface {
	// writeNewShard stores the blocks in `blks` as a new shard and returns
	// the CAR bytes to propagate out to our firehose
	writeNewShard(ctx context.Context, root cid.Cid, rev string, user models.Uid, seq int, blks map[cid.Cid]blockformat.Block, rmcids map[cid.Cid]bool) ([]byte, error)
}

func blocksToCar(ctx context.Context, root cid.Cid, rev string, blks map[cid.Cid]blockformat.Block) ([]byte, error) {
	buf := new(bytes.Buffer)
	_, err := WriteCarHeader(buf, root)
	if err != nil {
		return nil, fmt.Errorf("failed to write car header: %w", err)
	}

	for k, blk := range blks {
		_, err := LdWrite(buf, k.Bytes(), blk.RawData())
		if err != nil {
			return nil, fmt.Errorf("failed to write block: %w", err)
		}
	}

	return buf.Bytes(), nil
}

func (cs *FileCarStore) writeNewShard(ctx context.Context, root cid.Cid, rev string, user models.Uid, seq int, blks map[cid.Cid]blockformat.Block, rmcids map[cid.Cid]bool) ([]byte, error) {
	buf := new(bytes.Buffer)
	hnw, err := WriteCarHeader(buf, root)
	if err != nil {
		return nil, fmt.Errorf("failed to write car header: %w", err)
	}

	// TODO: writing these blocks in map traversal order is bad, I believe the
	// optimal ordering will be something like reverse-write-order, but random
	// is definitely not it

	offset := hnw
	brefs := make([]map[string]interface{}, 0, len(blks))
	for k, blk := range blks {
		nw, err := LdWrite(buf, k.Bytes(), blk.RawData())
		if err != nil {
			return nil, fmt.Errorf("failed to write block: %w", err)
		}

		// inserting rows as maps is the only way to get gorm to omit the
		// 'returning' clause, which costs a lot of time
		brefs = append(brefs, map[string]interface{}{
			"cid":    models.DbCID{CID: k},
			"offset": offset,
		})

		offset += nw
	}

	start := time.Now()
	path, err := cs.writeNewShardFile(ctx, user, seq, buf.Bytes())
	if err != nil {
		return nil, fmt.Errorf("failed to write shard file: %w", err)
	}
	writeShardFileDuration.Observe(time.Since(start).Seconds())

	shard := CarShard{
		Root:      models.DbCID{CID: root},
		DataStart: hnw,
		Seq:       seq,
		Path:      path,
		Usr:       user,
		Rev:       rev,
	}

	start = time.Now()
	if err := cs.putShard(ctx, &shard, brefs, rmcids, false); err != nil {
		return nil, err
	}
	writeShardMetadataDuration.Observe(time.Since(start).Seconds())

	return buf.Bytes(), nil
}

func (cs *FileCarStore) putShard(ctx context.Context, shard *CarShard, brefs []map[string]any, rmcids map[cid.Cid]bool, nocache bool) error {
	ctx, span := otel.Tracer("carstore").Start(ctx, "putShard")
	defer span.End()

	err := cs.meta.PutShardAndRefs(ctx, shard, brefs, rmcids)
	if err != nil {
		return err
	}

	if !nocache {
		cs.putLastShardCache(shard)
	}

	return nil
}
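
// BlockDiff walks the staged update and the old tree rooted at oldroot, and
// returns the set of cids that the update leaves unreferenced ("stale").
//
// Worked illustration (assumed toy data): if the old tree is
// oldroot -> {A, B} and an update writes newcids = {oldroot', A'}, where
// oldroot' links to {A', B}, then the keep set is {oldroot', A', B}; walking
// the old tree from oldroot then yields the drop set {oldroot, A}. B survives
// because the new tree still references it.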
func BlockDiff(ctx context.Context, bs blockstore.Blockstore, oldroot cid.Cid, newcids map[cid.Cid]blockformat.Block, skipcids map[cid.Cid]bool) (map[cid.Cid]bool, error) {
	ctx, span := otel.Tracer("repo").Start(ctx, "BlockDiff")
	defer span.End()

	if !oldroot.Defined() {
		return map[cid.Cid]bool{}, nil
	}

	// walk the entire 'new' portion of the tree, marking all referenced cids as 'keep'
	keepset := make(map[cid.Cid]bool)
	for c := range newcids {
		keepset[c] = true
		oblk, err := bs.Get(ctx, c)
		if err != nil {
			return nil, fmt.Errorf("get failed in new tree: %w", err)
		}

		if err := cbg.ScanForLinks(bytes.NewReader(oblk.RawData()), func(lnk cid.Cid) {
			keepset[lnk] = true
		}); err != nil {
			return nil, err
		}
	}

	if keepset[oldroot] {
		// this should probably never happen, but is technically correct
		return nil, nil
	}

	// next, walk the old tree from the root, recursing on cids *not* in the keepset.
	dropset := make(map[cid.Cid]bool)
	dropset[oldroot] = true
	queue := []cid.Cid{oldroot}

	for len(queue) > 0 {
		c := queue[0]
		queue = queue[1:]

		if skipcids != nil && skipcids[c] {
			continue
		}

		oblk, err := bs.Get(ctx, c)
		if err != nil {
			return nil, fmt.Errorf("get failed in old tree: %w", err)
		}

		if err := cbg.ScanForLinks(bytes.NewReader(oblk.RawData()), func(lnk cid.Cid) {
			if lnk.Prefix().Codec != cid.DagCBOR {
				return
			}

			if !keepset[lnk] {
				dropset[lnk] = true
				queue = append(queue, lnk)
			}
		}); err != nil {
			return nil, err
		}
	}

	return dropset, nil
}

// ImportSlice parses a CAR slice, opens a delta session on top of `since`,
// and stages every block from the slice into that session. It returns the
// slice's root cid along with the open session.
func (cs *FileCarStore) ImportSlice(ctx context.Context, uid models.Uid, since *string, carslice []byte) (cid.Cid, *DeltaSession, error) {
	ctx, span := otel.Tracer("carstore").Start(ctx, "ImportSlice")
	defer span.End()

	carr, err := car.NewCarReader(bytes.NewReader(carslice))
	if err != nil {
		return cid.Undef, nil, err
	}

	if len(carr.Header.Roots) != 1 {
		return cid.Undef, nil, fmt.Errorf("invalid car file, header must have a single root (has %d)", len(carr.Header.Roots))
	}

	ds, err := cs.NewDeltaSession(ctx, uid, since)
	if err != nil {
		return cid.Undef, nil, fmt.Errorf("new delta session failed: %w", err)
	}

	for {
		blk, err := carr.Next()
		if err != nil {
			if err == io.EOF {
				break
			}
			return cid.Undef, nil, err
		}

		if err := ds.Put(ctx, blk); err != nil {
			return cid.Undef, nil, err
		}
	}

	return carr.Header.Roots[0], ds, nil
}

// CalcDiff computes which blocks from the session's base repo are made stale
// by the staged update and records them on the session for later cleanup.
func (ds *DeltaSession) CalcDiff(ctx context.Context, skipcids map[cid.Cid]bool) error {
	rmcids, err := BlockDiff(ctx, ds, ds.baseCid, ds.blks, skipcids)
	if err != nil {
		return fmt.Errorf("block diff failed (base=%s,rev=%s): %w", ds.baseCid, ds.lastRev, err)
	}

	ds.rmcids = rmcids
	return nil
}

func (cs *FileCarStore) GetUserRepoHead(ctx context.Context, user models.Uid) (cid.Cid, error) {
	lastShard, err := cs.getLastShard(ctx, user)
	if err != nil {
		return cid.Undef, err
	}
	if lastShard.ID == 0 {
		return cid.Undef, nil
	}

	return lastShard.Root.CID, nil
}

func (cs *FileCarStore) GetUserRepoRev(ctx context.Context, user models.Uid) (string, error) {
	lastShard, err := cs.getLastShard(ctx, user)
	if err != nil {
		return "", err
	}
	if lastShard.ID == 0 {
		return "", nil
	}

	return lastShard.Rev, nil
}

type UserStat struct {
	Seq     int
	Root    string
	Created time.Time
}

func (cs *FileCarStore) Stat(ctx context.Context, usr models.Uid) ([]UserStat, error) {
	shards, err := cs.meta.GetUserShards(ctx, usr)
	if err != nil {
		return nil, err
	}

	var out []UserStat
	for _, s := range shards {
		out = append(out, UserStat{
			Seq:     s.Seq,
			Root:    s.Root.CID.String(),
			Created: s.CreatedAt,
		})
	}

	return out, nil
}

// WipeUserData removes all of a user's shard files and metadata.
func (cs *FileCarStore) WipeUserData(ctx context.Context, user models.Uid) error {
	shards, err := cs.meta.GetUserShards(ctx, user)
	if err != nil {
		return err
	}

	if err := cs.deleteShards(ctx, shards); err != nil {
		if !os.IsNotExist(err) {
			return err
		}
	}

	cs.removeLastShardCache(user)

	return nil
}

func (cs *FileCarStore) deleteShards(ctx context.Context, shs []CarShard) error {
	ctx, span := otel.Tracer("carstore").Start(ctx, "deleteShards")
	defer span.End()

	deleteSlice := func(ctx context.Context, subs []CarShard) error {
		ids := make([]uint, len(subs))
		for i, sh := range subs {
			ids[i] = sh.ID
		}

		err := cs.meta.DeleteShardsAndRefs(ctx, ids)
		if err != nil {
			return err
		}

		for _, sh := range subs {
			if err := cs.deleteShardFile(ctx, &sh); err != nil {
				if !os.IsNotExist(err) {
					return err
				}
				cs.log.Warn("shard file we tried to delete did not exist", "shard", sh.ID, "path", sh.Path)
			}
		}

		return nil
	}

	// delete in chunks to keep each metadata transaction bounded
	chunkSize := 2000
	for i := 0; i < len(shs); i += chunkSize {
		sl := shs[i:]
		if len(sl) > chunkSize {
			sl = sl[:chunkSize]
		}

		if err := deleteSlice(ctx, sl); err != nil {
			return err
		}
	}

	return nil
}

type shardStat struct {
	ID    uint
	Dirty int
	Seq   int
	Total int

	refs []blockRef
}

func (s shardStat) dirtyFrac() float64 {
	return float64(s.Dirty) / float64(s.Total)
}

// aggrRefs groups block refs by the shard they live in, counting how many of
// each shard's blocks are stale, and returns the stats ordered by shard seq.
func aggrRefs(brefs []blockRef, shards map[uint]CarShard, staleCids map[cid.Cid]bool) []shardStat {
	byId := make(map[uint]*shardStat)

	for _, br := range brefs {
		s, ok := byId[br.Shard]
		if !ok {
			s = &shardStat{
				ID:  br.Shard,
				Seq: shards[br.Shard].Seq,
			}
			byId[br.Shard] = s
		}

		s.Total++
		if staleCids[br.Cid.CID] {
			s.Dirty++
		}

		s.refs = append(s.refs, br)
	}

	var out []shardStat
	for _, s := range byId {
		out = append(out, *s)
	}

	sort.Slice(out, func(i, j int) bool {
		return out[i].Seq < out[j].Seq
	})

	return out
}

type compBucket struct {
	shards []shardStat

	cleanBlocks int
	expSize     int
}

// shouldCompact decides whether a bucket is worth rewriting: many shards are
// always worth merging, and fewer shards qualify as their average dirty
// fraction grows.
func (cb *compBucket) shouldCompact() bool {
	if len(cb.shards) == 0 {
		return false
	}

	if len(cb.shards) > 5 {
		return true
	}

	var frac float64
	for _, s := range cb.shards {
		frac += s.dirtyFrac()
	}
	frac /= float64(len(cb.shards))

	if len(cb.shards) > 3 && frac > 0.2 {
		return true
	}

	return frac > 0.4
}

func (cb *compBucket) addShardStat(ss shardStat) {
	cb.cleanBlocks += (ss.Total - ss.Dirty)
	cb.shards = append(cb.shards, ss)
}

func (cb *compBucket) isEmpty() bool {
	return len(cb.shards) == 0
}

func (cs *FileCarStore) openNewCompactedShardFile(ctx context.Context, user models.Uid, seq int) (*os.File, string, error) {
	// TODO: some overwrite protections
	// NOTE: os.CreateTemp is used here only to create a non-colliding file;
	// the file is kept, not deleted, so don't think of it as "temporary".
	// It produces names of the form "sh-%d-%d" plus a random suffix.
	fi, err := os.CreateTemp(cs.dirForUser(user), fnameForShard(user, seq))
	if err != nil {
		return nil, "", err
	}

	return fi, fi.Name(), nil
}

type CompactionTarget struct {
	Usr       models.Uid
	NumShards int
}

func (cs *FileCarStore) GetCompactionTargets(ctx context.Context, shardCount int) ([]CompactionTarget, error) {
	ctx, span := otel.Tracer("carstore").Start(ctx, "GetCompactionTargets")
	defer span.End()

	return cs.meta.GetCompactionTargets(ctx, shardCount)
}
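
// Hedged driver sketch: how a background job might tie the compaction entry
// points together. The shard-count cutoff of 50 is an illustrative assumption.
func exampleCompactionPass(ctx context.Context, store CarStore) error {
	targets, err := store.GetCompactionTargets(ctx, 50)
	if err != nil {
		return err
	}
	for _, t := range targets {
		// pass skipBigShards=true to bound the cost of a single pass
		if _, err := store.CompactUserShards(ctx, t.Usr, true); err != nil {
			return err
		}
	}
	return nil
}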

// getBlockRefsForShards is a prep function for CompactUserShards
func (cs *FileCarStore) getBlockRefsForShards(ctx context.Context, shardIds []uint) ([]blockRef, error) {
	ctx, span := otel.Tracer("carstore").Start(ctx, "getBlockRefsForShards")
	defer span.End()

	span.SetAttributes(attribute.Int("shards", len(shardIds)))

	out, err := cs.meta.GetBlockRefsForShards(ctx, shardIds)
	if err != nil {
		return nil, err
	}

	span.SetAttributes(attribute.Int("refs", len(out)))

	return out, nil
}

func shardSize(sh *CarShard) (int64, error) {
	st, err := os.Stat(sh.Path)
	if err != nil {
		if os.IsNotExist(err) {
			slog.Warn("missing shard, returning size of zero", "path", sh.Path, "shard", sh.ID, "system", "carstore")
			return 0, nil
		}
		return 0, fmt.Errorf("stat %q: %w", sh.Path, err)
	}

	return st.Size(), nil
}

type CompactionStats struct {
	TotalRefs     int `json:"totalRefs"`
	StartShards   int `json:"startShards"`
	NewShards     int `json:"newShards"`
	SkippedShards int `json:"skippedShards"`
	ShardsDeleted int `json:"shardsDeleted"`
	DupeCount     int `json:"dupeCount"`
}

func (cs *FileCarStore) CompactUserShards(ctx context.Context, user models.Uid, skipBigShards bool) (*CompactionStats, error) {
	ctx, span := otel.Tracer("carstore").Start(ctx, "CompactUserShards")
	defer span.End()

	span.SetAttributes(attribute.Int64("user", int64(user)))

	shards, err := cs.meta.GetUserShards(ctx, user)
	if err != nil {
		return nil, err
	}

	if skipBigShards {
		// Shards generally start big and get smaller, we want to avoid
		// compacting non-adjacent shards together, and we want to avoid
		// stat'ing every single shard file (which can be expensive for repos
		// that haven't been compacted in a while). So we only skip a *prefix*
		// of shard files that are over the threshold. This can fail to skip
		// an over-threshold shard that appears after a below-threshold one,
		// but since this is a heuristic and imperfect optimization, that is
		// acceptable.
		var skip int
		for i, sh := range shards {
			size, err := shardSize(&sh)
			if err != nil {
				return nil, fmt.Errorf("could not check size of shard file: %w", err)
			}

			if size > BigShardThreshold {
				skip = i + 1
			} else {
				break
			}
		}
		shards = shards[skip:]
	}

	span.SetAttributes(attribute.Int("shards", len(shards)))

	var shardIds []uint
	for _, s := range shards {
		shardIds = append(shardIds, s.ID)
	}

	shardsById := make(map[uint]CarShard)
	for _, s := range shards {
		shardsById[s.ID] = s
	}

	brefs, err := cs.getBlockRefsForShards(ctx, shardIds)
	if err != nil {
		return nil, fmt.Errorf("getting block refs failed: %w", err)
	}

	span.SetAttributes(attribute.Int("blockRefs", len(brefs)))

	staleRefs, err := cs.meta.GetUserStaleRefs(ctx, user)
	if err != nil {
		return nil, err
	}

	span.SetAttributes(attribute.Int("staleRefs", len(staleRefs)))

	stale := make(map[cid.Cid]bool)
	for _, br := range staleRefs {
		cids, err := br.getCids()
		if err != nil {
			return nil, fmt.Errorf("failed to unpack cids from staleRefs record (%d): %w", br.ID, err)
		}
		for _, c := range cids {
			stale[c] = true
		}
	}

	// if a stale cid is referenced by multiple blockRefs, we consider that
	// block a 'dirty duplicate' and pull it out of the stale set; see the
	// comment below
	var dupes []cid.Cid
	seenBlocks := make(map[cid.Cid]bool)
	for _, br := range brefs {
		if seenBlocks[br.Cid.CID] {
			dupes = append(dupes, br.Cid.CID)
			delete(stale, br.Cid.CID)
		} else {
			seenBlocks[br.Cid.CID] = true
		}
	}

	if len(dupes) > 0 {
		// If we had no duplicates, the keep set would simply be all the
		// 'clean' blockRefs. With duplicate dirty references, computing the
		// true keep set would require walking the entire repo to check
		// whether anything still references the dirty block in question.
		//
		// Instead we just add the duplicates to the keep set (done below) and
		// focus on compacting everything else. That leaves *some* dirty
		// blocks around, but compaction is not a perfect process anyway.
		cs.log.Debug("repo has dirty dupes", "count", len(dupes), "uid", user, "staleRefs", len(staleRefs), "blockRefs", len(brefs))
	}

	keep := make(map[cid.Cid]bool)
	for _, br := range brefs {
		if !stale[br.Cid.CID] {
			keep[br.Cid.CID] = true
		}
	}

	for _, dupe := range dupes {
		keep[dupe] = true
	}

	results := aggrRefs(brefs, shardsById, stale)

	lowBound := 20
	N := 10
	// we want to *aim* for N shards per user
	// the last several should be left small to allow easy loading from disk
	// for updates (since recent blocks are most likely needed for edits)
	// the beginning of the list should be some sort of exponential fall-off
	// with the area under the curve targeted by the total number of blocks we
	// have
	var threshs []int
	tot := len(brefs)
	for i := 0; i < N; i++ {
		v := tot / 2
		if v < lowBound {
			v = lowBound
		}
		tot = tot / 2
		threshs = append(threshs, v)
	}
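	// Worked example (assumed tot = 1000 block refs): the loop above yields
	// thresholds [500 250 125 62 31 20 20 20 20 20], an exponential fall-off
	// clamped at lowBound so the most recent buckets stay small.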

	thresholdForPosition := func(i int) int {
		if i >= len(threshs) {
			return lowBound
		}
		return threshs[i]
	}

	cur := new(compBucket)
	cur.expSize = thresholdForPosition(0)
	var compactionQueue []*compBucket
	for i, r := range results {
		cur.addShardStat(r)

		if cur.cleanBlocks > cur.expSize || i > len(results)-3 {
			compactionQueue = append(compactionQueue, cur)
			cur = &compBucket{
				expSize: thresholdForPosition(len(compactionQueue)),
			}
		}
	}
	if !cur.isEmpty() {
		compactionQueue = append(compactionQueue, cur)
	}

	stats := &CompactionStats{
		StartShards: len(shards),
		TotalRefs:   len(brefs),
	}

	removedShards := make(map[uint]bool)
	for _, b := range compactionQueue {
		if !b.shouldCompact() {
			stats.SkippedShards += len(b.shards)
			continue
		}

		if err := cs.compactBucket(ctx, user, b, shardsById, keep); err != nil {
			return nil, fmt.Errorf("compact bucket: %w", err)
		}

		stats.NewShards++

		todelete := make([]CarShard, 0, len(b.shards))
		for _, s := range b.shards {
			removedShards[s.ID] = true
			sh, ok := shardsById[s.ID]
			if !ok {
				return nil, fmt.Errorf("missing shard to delete")
			}

			todelete = append(todelete, sh)
		}

		stats.ShardsDeleted += len(todelete)
		if err := cs.deleteShards(ctx, todelete); err != nil {
			return nil, fmt.Errorf("deleting shards: %w", err)
		}
	}

	// now we need to delete the staleRefs we successfully cleaned up
	// we can safely delete a staleRef if all the shards that have blockRefs with matching stale refs were processed
	if err := cs.deleteStaleRefs(ctx, user, brefs, staleRefs, removedShards); err != nil {
		return nil, fmt.Errorf("delete stale refs: %w", err)
	}

	stats.DupeCount = len(dupes)

	return stats, nil
}

func (cs *FileCarStore) deleteStaleRefs(ctx context.Context, uid models.Uid, brefs []blockRef, staleRefs []staleRef, removedShards map[uint]bool) error {
	ctx, span := otel.Tracer("carstore").Start(ctx, "deleteStaleRefs")
	defer span.End()

	brByCid := make(map[cid.Cid][]blockRef)
	for _, br := range brefs {
		brByCid[br.Cid.CID] = append(brByCid[br.Cid.CID], br)
	}

	var staleToKeep []cid.Cid
	for _, sr := range staleRefs {
		cids, err := sr.getCids()
		if err != nil {
			return fmt.Errorf("getCids on staleRef failed (%d): %w", sr.ID, err)
		}

		for _, c := range cids {
			brs := brByCid[c]
			del := true
			for _, br := range brs {
				if !removedShards[br.Shard] {
					del = false
					break
				}
			}

			if !del {
				staleToKeep = append(staleToKeep, c)
			}
		}
	}

	return cs.meta.SetStaleRef(ctx, uid, staleToKeep)
}

func (cs *FileCarStore) compactBucket(ctx context.Context, user models.Uid, b *compBucket, shardsById map[uint]CarShard, keep map[cid.Cid]bool) error {
	ctx, span := otel.Tracer("carstore").Start(ctx, "compactBucket")
	defer span.End()

	span.SetAttributes(attribute.Int("shards", len(b.shards)))

	last := b.shards[len(b.shards)-1]
	lastsh := shardsById[last.ID]
	fi, path, err := cs.openNewCompactedShardFile(ctx, user, last.Seq)
	if err != nil {
		return fmt.Errorf("opening new file: %w", err)
	}

	defer fi.Close()
	root := lastsh.Root.CID

	hnw, err := WriteCarHeader(fi, root)
	if err != nil {
		return err
	}

	offset := hnw
	var nbrefs []map[string]any
	written := make(map[cid.Cid]bool)
	for _, s := range b.shards {
		sh := shardsById[s.ID]
		if err := cs.iterateShardBlocks(ctx, &sh, func(blk blockformat.Block) error {
			if written[blk.Cid()] {
				return nil
			}

			if keep[blk.Cid()] {
				nw, err := LdWrite(fi, blk.Cid().Bytes(), blk.RawData())
				if err != nil {
					return fmt.Errorf("failed to write block: %w", err)
				}

				nbrefs = append(nbrefs, map[string]interface{}{
					"cid":    models.DbCID{CID: blk.Cid()},
					"offset": offset,
				})

				offset += nw
				written[blk.Cid()] = true
			}
			return nil
		}); err != nil {
			// if we ever fail to iterate a shard file because it's
			// corrupted, just log an error and skip the shard
			cs.log.Error("iterating blocks in shard", "shard", s.ID, "err", err, "uid", user)
		}
	}

	shard := CarShard{
		Root:      models.DbCID{CID: root},
		DataStart: hnw,
		Seq:       lastsh.Seq,
		Path:      path,
		Usr:       user,
		Rev:       lastsh.Rev,
	}

	if err := cs.putShard(ctx, &shard, nbrefs, nil, true); err != nil {
		// if writing the shard metadata fails, we should also delete the file
		_ = fi.Close()

		if err2 := os.Remove(fi.Name()); err2 != nil {
			cs.log.Error("failed to remove shard file after failed db transaction", "path", fi.Name(), "err", err2)
		}

		return err
	}
	return nil
}