porting all github actions from bluesky-social/indigo to tangled CI
package carstore

import (
	"bufio"
	"bytes"
	"context"
	"fmt"
	"io"
	"log/slog"
	"os"
	"path/filepath"
	"sort"
	"sync/atomic"
	"time"

	"github.com/bluesky-social/indigo/models"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"

	blockformat "github.com/ipfs/go-block-format"
	"github.com/ipfs/go-cid"
	blockstore "github.com/ipfs/go-ipfs-blockstore"
	cbor "github.com/ipfs/go-ipld-cbor"
	ipld "github.com/ipfs/go-ipld-format"
	"github.com/ipfs/go-libipfs/blocks"
	car "github.com/ipld/go-car"
	carutil "github.com/ipld/go-car/util"
	cbg "github.com/whyrusleeping/cbor-gen"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	"gorm.io/gorm"
)

var blockGetTotalCounter = promauto.NewCounterVec(prometheus.CounterOpts{
	Name: "carstore_block_get_total",
	Help: "carstore get queries",
}, []string{"usrskip", "cache"})

var blockGetTotalCounterUsrskip = blockGetTotalCounter.WithLabelValues("true", "miss")
var blockGetTotalCounterCached = blockGetTotalCounter.WithLabelValues("false", "hit")
var blockGetTotalCounterNormal = blockGetTotalCounter.WithLabelValues("false", "miss")

const MaxSliceLength = 2 << 20

const BigShardThreshold = 2 << 20

type CarStore interface {
	// TODO: not really part of general interface
	CompactUserShards(ctx context.Context, user models.Uid, skipBigShards bool) (*CompactionStats, error)
	// TODO: not really part of general interface
	GetCompactionTargets(ctx context.Context, shardCount int) ([]CompactionTarget, error)

	GetUserRepoHead(ctx context.Context, user models.Uid) (cid.Cid, error)
	GetUserRepoRev(ctx context.Context, user models.Uid) (string, error)
	ImportSlice(ctx context.Context, uid models.Uid, since *string, carslice []byte) (cid.Cid, *DeltaSession, error)
	NewDeltaSession(ctx context.Context, user models.Uid, since *string) (*DeltaSession, error)
	ReadOnlySession(user models.Uid) (*DeltaSession, error)
	ReadUserCar(ctx context.Context, user models.Uid, sinceRev string, incremental bool, w io.Writer) error
	Stat(ctx context.Context, usr models.Uid) ([]UserStat, error)
	WipeUserData(ctx context.Context, user models.Uid) error
}

type FileCarStore struct {
	meta     *CarStoreGormMeta
	rootDirs []string

	lastShardCache lastShardCache

	log *slog.Logger
}

func NewCarStore(meta *gorm.DB, roots []string) (CarStore, error) {
	for _, root := range roots {
		if _, err := os.Stat(root); err != nil {
			if !os.IsNotExist(err) {
				return nil, err
			}

			if err := os.Mkdir(root, 0775); err != nil {
				return nil, err
			}
		}
	}
	if err := meta.AutoMigrate(&CarShard{}, &blockRef{}); err != nil {
		return nil, err
	}
	if err := meta.AutoMigrate(&staleRef{}); err != nil {
		return nil, err
	}

	gormMeta := &CarStoreGormMeta{meta: meta}
	out := &FileCarStore{
		meta:     gormMeta,
		rootDirs: roots,
		lastShardCache: lastShardCache{
			source: gormMeta,
		},
		log: slog.Default().With("system", "carstore"),
	}
	out.lastShardCache.Init()
	return out, nil
}
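
// Example (illustrative sketch, not part of this file): constructing a
// carstore. The sqlite dialector is an assumption; any gorm-supported
// database works for the metadata store.
//
//	db, err := gorm.Open(sqlite.Open("carstore.db"), &gorm.Config{})
//	if err != nil { /* handle */ }
//	cs, err := NewCarStore(db, []string{"/data/car-a", "/data/car-b"})
//	if err != nil { /* handle */ }
//	head, err := cs.GetUserRepoHead(ctx, uid)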

// userView needs these things to get into the underlying block store
// implemented by CarStoreGormMeta
type userViewSource interface {
	HasUidCid(ctx context.Context, user models.Uid, k cid.Cid) (bool, error)
	LookupBlockRef(ctx context.Context, k cid.Cid) (path string, offset int64, user models.Uid, err error)
}

// userView wraps a block store and keeps track of which user we are
// working on behalf of
type userView struct {
	cs   userViewSource
	user models.Uid

	cache    map[cid.Cid]blockformat.Block
	prefetch bool
}

var _ blockstore.Blockstore = (*userView)(nil)

func (uv *userView) HashOnRead(hor bool) {
	// noop
}

func (uv *userView) Has(ctx context.Context, k cid.Cid) (bool, error) {
	_, have := uv.cache[k]
	if have {
		return have, nil
	}
	return uv.cs.HasUidCid(ctx, uv.user, k)
}

var CacheHits int64
var CacheMiss int64

func (uv *userView) Get(ctx context.Context, k cid.Cid) (blockformat.Block, error) {
	if !k.Defined() {
		return nil, fmt.Errorf("attempted to 'get' undefined cid")
	}
	if uv.cache != nil {
		blk, ok := uv.cache[k]
		if ok {
			blockGetTotalCounterCached.Add(1)
			atomic.AddInt64(&CacheHits, 1)

			return blk, nil
		}
	}
	atomic.AddInt64(&CacheMiss, 1)

	path, offset, user, err := uv.cs.LookupBlockRef(ctx, k)
	if err != nil {
		return nil, err
	}
	if path == "" {
		return nil, ipld.ErrNotFound{Cid: k}
	}

	prefetch := uv.prefetch
	if user != uv.user {
		blockGetTotalCounterUsrskip.Add(1)
		prefetch = false
	} else {
		blockGetTotalCounterNormal.Add(1)
	}

	if prefetch {
		return uv.prefetchRead(ctx, k, path, offset)
	} else {
		return uv.singleRead(ctx, k, path, offset)
	}
}

const prefetchThreshold = 512 << 10

func (uv *userView) prefetchRead(ctx context.Context, k cid.Cid, path string, offset int64) (blockformat.Block, error) {
	ctx, span := otel.Tracer("carstore").Start(ctx, "getLastShard")
	defer span.End()

	fi, err := os.Open(path)
	if err != nil {
		return nil, err
	}
	defer fi.Close()

	st, err := fi.Stat()
	if err != nil {
		return nil, fmt.Errorf("stat file for prefetch: %w", err)
	}

	span.SetAttributes(attribute.Int64("shard_size", st.Size()))

	if st.Size() > prefetchThreshold {
		span.SetAttributes(attribute.Bool("no_prefetch", true))
		return doBlockRead(fi, k, offset)
	}

	cr, err := car.NewCarReader(fi)
	if err != nil {
		return nil, err
	}

	for {
		blk, err := cr.Next()
		if err != nil {
			if err == io.EOF {
				break
			}
			return nil, err
		}

		uv.cache[blk.Cid()] = blk
	}

	outblk, ok := uv.cache[k]
	if !ok {
		return nil, fmt.Errorf("requested block was not found in car slice")
	}

	return outblk, nil
}

func (uv *userView) singleRead(ctx context.Context, k cid.Cid, path string, offset int64) (blockformat.Block, error) {
	fi, err := os.Open(path)
	if err != nil {
		return nil, err
	}
	defer fi.Close()

	return doBlockRead(fi, k, offset)
}

func doBlockRead(fi *os.File, k cid.Cid, offset int64) (blockformat.Block, error) {
	seeked, err := fi.Seek(offset, io.SeekStart)
	if err != nil {
		return nil, err
	}

	if seeked != offset {
		return nil, fmt.Errorf("failed to seek to offset (%d != %d)", seeked, offset)
	}

	bufr := bufio.NewReader(fi)
	rcid, data, err := carutil.ReadNode(bufr)
	if err != nil {
		return nil, err
	}

	if rcid != k {
		return nil, fmt.Errorf("mismatch in cid on disk: %s != %s", rcid, k)
	}

	return blocks.NewBlockWithCid(data, rcid)
}
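
// The record doBlockRead parses follows standard CAR section framing: an
// unsigned varint of the section length, then the CID bytes, then the raw
// block data. A hand-rolled sketch of one read (carutil.ReadNode does the
// equivalent; shown only for illustration):
//
//	br := bufio.NewReader(fi)
//	l, _ := binary.ReadUvarint(br) // total length of CID + data
//	buf := make([]byte, l)
//	io.ReadFull(br, buf)
//	n, c, _ := cid.CidFromBytes(buf) // n bytes of CID prefix, giving cid c
//	data := buf[n:]                  // the block body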

func (uv *userView) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
	return nil, fmt.Errorf("not implemented")
}

func (uv *userView) Put(ctx context.Context, blk blockformat.Block) error {
	return fmt.Errorf("puts not supported to car view blockstores")
}

func (uv *userView) PutMany(ctx context.Context, blks []blockformat.Block) error {
	return fmt.Errorf("puts not supported to car view blockstores")
}

func (uv *userView) DeleteBlock(ctx context.Context, k cid.Cid) error {
	return fmt.Errorf("deletes not supported to car view blockstores")
}

func (uv *userView) GetSize(ctx context.Context, k cid.Cid) (int, error) {
	// TODO: maybe block size should be in the database record...
	blk, err := uv.Get(ctx, k)
	if err != nil {
		return 0, err
	}

	return len(blk.RawData()), nil
}

// minBlockstore is the subset of blockstore.Blockstore that we actually use here
type minBlockstore interface {
	Get(ctx context.Context, bcid cid.Cid) (blockformat.Block, error)
	Has(ctx context.Context, bcid cid.Cid) (bool, error)
	GetSize(ctx context.Context, bcid cid.Cid) (int, error)
}

type DeltaSession struct {
	blks     map[cid.Cid]blockformat.Block
	rmcids   map[cid.Cid]bool
	base     minBlockstore
	user     models.Uid
	baseCid  cid.Cid
	seq      int
	readonly bool
	cs       shardWriter
	lastRev  string
}

func (cs *FileCarStore) checkLastShardCache(user models.Uid) *CarShard {
	return cs.lastShardCache.check(user)
}

func (cs *FileCarStore) removeLastShardCache(user models.Uid) {
	cs.lastShardCache.remove(user)
}

func (cs *FileCarStore) putLastShardCache(ls *CarShard) {
	cs.lastShardCache.put(ls)
}

func (cs *FileCarStore) getLastShard(ctx context.Context, user models.Uid) (*CarShard, error) {
	return cs.lastShardCache.get(ctx, user)
}

var ErrRepoBaseMismatch = fmt.Errorf("attempted a delta session on top of the wrong previous head")

func (cs *FileCarStore) NewDeltaSession(ctx context.Context, user models.Uid, since *string) (*DeltaSession, error) {
	ctx, span := otel.Tracer("carstore").Start(ctx, "NewSession")
	defer span.End()

	// TODO: ensure that we don't write updates on top of the wrong head
	// this needs to be a compare and swap type operation
	lastShard, err := cs.getLastShard(ctx, user)
	if err != nil {
		return nil, err
	}

	if since != nil && *since != lastShard.Rev {
		return nil, fmt.Errorf("revision mismatch: %s != %s: %w", *since, lastShard.Rev, ErrRepoBaseMismatch)
	}

	return &DeltaSession{
		blks: make(map[cid.Cid]blockformat.Block),
		base: &userView{
			user:     user,
			cs:       cs.meta,
			prefetch: true,
			cache:    make(map[cid.Cid]blockformat.Block),
		},
		user:    user,
		baseCid: lastShard.Root.CID,
		cs:      cs,
		seq:     lastShard.Seq + 1,
		lastRev: lastShard.Rev,
	}, nil
}

func (cs *FileCarStore) ReadOnlySession(user models.Uid) (*DeltaSession, error) {
	return &DeltaSession{
		base: &userView{
			user:     user,
			cs:       cs.meta,
			prefetch: false,
			cache:    make(map[cid.Cid]blockformat.Block),
		},
		readonly: true,
		user:     user,
		cs:       cs,
	}, nil
}
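
// Typical write flow (illustrative sketch): open a delta session on top of
// the repo's current rev, stage the update's blocks, compute which old
// blocks the update obsoletes, then persist everything as a new shard.
// prevRev, newBlocks, newRoot and newRev are assumed inputs.
//
//	ds, err := cs.NewDeltaSession(ctx, uid, &prevRev)
//	for _, blk := range newBlocks {
//		if err := ds.Put(ctx, blk); err != nil { /* handle */ }
//	}
//	if err := ds.CalcDiff(ctx, nil); err != nil { /* handle */ }
//	carSlice, err := ds.CloseWithRoot(ctx, newRoot, newRev)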

// TODO: incremental is only ever called true, remove the param
func (cs *FileCarStore) ReadUserCar(ctx context.Context, user models.Uid, sinceRev string, incremental bool, shardOut io.Writer) error {
	ctx, span := otel.Tracer("carstore").Start(ctx, "ReadUserCar")
	defer span.End()

	var earlySeq int
	if sinceRev != "" {
		var err error
		earlySeq, err = cs.meta.SeqForRev(ctx, user, sinceRev)
		if err != nil {
			return err
		}
	}

	shards, err := cs.meta.GetUserShardsDesc(ctx, user, earlySeq)
	if err != nil {
		return err
	}

	// TODO: incremental is only ever called true, so this is fine and we can remove the error check
	if !incremental && earlySeq > 0 {
		// have to do it the ugly way
		return fmt.Errorf("nyi")
	}

	if len(shards) == 0 {
		return fmt.Errorf("no data found for user %d", user)
	}

	// fast path!
	if err := car.WriteHeader(&car.CarHeader{
		Roots:   []cid.Cid{shards[0].Root.CID},
		Version: 1,
	}, shardOut); err != nil {
		return err
	}

	for _, sh := range shards {
		if err := cs.writeShardBlocks(ctx, &sh, shardOut); err != nil {
			return err
		}
	}

	return nil
}
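
// Example (sketch): streaming a user's repo as a CAR since a known rev,
// e.g. into a buffer bound for an HTTP response. incremental is always
// true in practice, per the TODO above.
//
//	var buf bytes.Buffer
//	if err := cs.ReadUserCar(ctx, uid, sinceRev, true, &buf); err != nil {
//		/* handle */
//	}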

// writeShardBlocks is the inner loop of ReadUserCar: it copies one shard's
// blocks from disk to the writer
func (cs *FileCarStore) writeShardBlocks(ctx context.Context, sh *CarShard, shardOut io.Writer) error {
	ctx, span := otel.Tracer("carstore").Start(ctx, "writeShardBlocks")
	defer span.End()

	fi, err := os.Open(sh.Path)
	if err != nil {
		return err
	}
	defer fi.Close()

	_, err = fi.Seek(sh.DataStart, io.SeekStart)
	if err != nil {
		return err
	}

	_, err = io.Copy(shardOut, fi)
	if err != nil {
		return err
	}

	return nil
}

// iterateShardBlocks is the inner loop of compactBucket
func (cs *FileCarStore) iterateShardBlocks(ctx context.Context, sh *CarShard, cb func(blk blockformat.Block) error) error {
	fi, err := os.Open(sh.Path)
	if err != nil {
		return err
	}
	defer fi.Close()

	rr, err := car.NewCarReader(fi)
	if err != nil {
		return fmt.Errorf("opening shard car: %w", err)
	}

	for {
		blk, err := rr.Next()
		if err != nil {
			if err == io.EOF {
				return nil
			}
			return err
		}

		if err := cb(blk); err != nil {
			return err
		}
	}
}

var _ blockstore.Blockstore = (*DeltaSession)(nil)

func (ds *DeltaSession) BaseCid() cid.Cid {
	return ds.baseCid
}

func (ds *DeltaSession) Put(ctx context.Context, b blockformat.Block) error {
	if ds.readonly {
		return fmt.Errorf("cannot write to readonly deltaSession")
	}
	ds.blks[b.Cid()] = b
	return nil
}

func (ds *DeltaSession) PutMany(ctx context.Context, bs []blockformat.Block) error {
	if ds.readonly {
		return fmt.Errorf("cannot write to readonly deltaSession")
	}

	for _, b := range bs {
		ds.blks[b.Cid()] = b
	}
	return nil
}

func (ds *DeltaSession) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
	return nil, fmt.Errorf("AllKeysChan not implemented")
}

func (ds *DeltaSession) DeleteBlock(ctx context.Context, c cid.Cid) error {
	if ds.readonly {
		return fmt.Errorf("cannot write to readonly deltaSession")
	}

	if _, ok := ds.blks[c]; !ok {
		return ipld.ErrNotFound{Cid: c}
	}

	delete(ds.blks, c)
	return nil
}

func (ds *DeltaSession) Get(ctx context.Context, c cid.Cid) (blockformat.Block, error) {
	b, ok := ds.blks[c]
	if ok {
		return b, nil
	}

	return ds.base.Get(ctx, c)
}

func (ds *DeltaSession) Has(ctx context.Context, c cid.Cid) (bool, error) {
	_, ok := ds.blks[c]
	if ok {
		return true, nil
	}

	return ds.base.Has(ctx, c)
}

func (ds *DeltaSession) HashOnRead(hor bool) {
	// noop?
}

func (ds *DeltaSession) GetSize(ctx context.Context, c cid.Cid) (int, error) {
	b, ok := ds.blks[c]
	if ok {
		return len(b.RawData()), nil
	}

	return ds.base.GetSize(ctx, c)
}

func fnameForShard(user models.Uid, seq int) string {
	return fmt.Sprintf("sh-%d-%d", user, seq)
}

func (cs *FileCarStore) dirForUser(user models.Uid) string {
	return cs.rootDirs[int(user)%len(cs.rootDirs)]
}

func (cs *FileCarStore) openNewShardFile(ctx context.Context, user models.Uid, seq int) (*os.File, string, error) {
	// TODO: some overwrite protections
	fname := filepath.Join(cs.dirForUser(user), fnameForShard(user, seq))
	fi, err := os.Create(fname)
	if err != nil {
		return nil, "", err
	}

	return fi, fname, nil
}

func (cs *FileCarStore) writeNewShardFile(ctx context.Context, user models.Uid, seq int, data []byte) (string, error) {
	_, span := otel.Tracer("carstore").Start(ctx, "writeNewShardFile")
	defer span.End()

	// TODO: some overwrite protections
	fname := filepath.Join(cs.dirForUser(user), fnameForShard(user, seq))
	if err := os.WriteFile(fname, data, 0664); err != nil {
		return "", err
	}

	return fname, nil
}

func (cs *FileCarStore) deleteShardFile(ctx context.Context, sh *CarShard) error {
	return os.Remove(sh.Path)
}

// CloseWithRoot writes all new blocks in a car file to the writer with the
// given cid as the 'root'
func (ds *DeltaSession) CloseWithRoot(ctx context.Context, root cid.Cid, rev string) ([]byte, error) {
	ctx, span := otel.Tracer("carstore").Start(ctx, "CloseWithRoot")
	defer span.End()

	if ds.readonly {
		return nil, fmt.Errorf("cannot write to readonly deltaSession")
	}

	return ds.cs.writeNewShard(ctx, root, rev, ds.user, ds.seq, ds.blks, ds.rmcids)
}

func WriteCarHeader(w io.Writer, root cid.Cid) (int64, error) {
	h := &car.CarHeader{
		Roots:   []cid.Cid{root},
		Version: 1,
	}
	hb, err := cbor.DumpObject(h)
	if err != nil {
		return 0, err
	}

	hnw, err := LdWrite(w, hb)
	if err != nil {
		return 0, err
	}

	return hnw, nil
}
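
// LdWrite (defined elsewhere in this package) provides the length-delimited
// framing used for both the header above and each block: it writes an
// unsigned varint of the combined payload length, then the payload slices.
// Sketch of the resulting layout, assuming that definition:
//
//	LdWrite(w, hb)                  // uvarint(len(hb)) || hb
//	LdWrite(w, cidBytes, blockData) // uvarint(len(cid)+len(data)) || cid || data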

// shardWriter.writeNewShard is called from inside DeltaSession.CloseWithRoot
type shardWriter interface {
	// writeNewShard stores blocks in `blks` arg and creates a new shard to propagate out to our firehose
	writeNewShard(ctx context.Context, root cid.Cid, rev string, user models.Uid, seq int, blks map[cid.Cid]blockformat.Block, rmcids map[cid.Cid]bool) ([]byte, error)
}

func blocksToCar(ctx context.Context, root cid.Cid, rev string, blks map[cid.Cid]blockformat.Block) ([]byte, error) {
	buf := new(bytes.Buffer)
	_, err := WriteCarHeader(buf, root)
	if err != nil {
		return nil, fmt.Errorf("failed to write car header: %w", err)
	}

	for k, blk := range blks {
		_, err := LdWrite(buf, k.Bytes(), blk.RawData())
		if err != nil {
			return nil, fmt.Errorf("failed to write block: %w", err)
		}
	}

	return buf.Bytes(), nil
}

func (cs *FileCarStore) writeNewShard(ctx context.Context, root cid.Cid, rev string, user models.Uid, seq int, blks map[cid.Cid]blockformat.Block, rmcids map[cid.Cid]bool) ([]byte, error) {
	buf := new(bytes.Buffer)
	hnw, err := WriteCarHeader(buf, root)
	if err != nil {
		return nil, fmt.Errorf("failed to write car header: %w", err)
	}

	// TODO: writing these blocks in map traversal order is bad, I believe the
	// optimal ordering will be something like reverse-write-order, but random
	// is definitely not it

	offset := hnw
	//brefs := make([]*blockRef, 0, len(ds.blks))
	brefs := make([]map[string]interface{}, 0, len(blks))
	for k, blk := range blks {
		nw, err := LdWrite(buf, k.Bytes(), blk.RawData())
		if err != nil {
			return nil, fmt.Errorf("failed to write block: %w", err)
		}

		/*
			brefs = append(brefs, &blockRef{
				Cid:    k.String(),
				Offset: offset,
				Shard:  shard.ID,
			})
		*/
		// adding things to the db by map is the only way to get gorm to not
		// add the 'returning' clause, which costs a lot of time
		brefs = append(brefs, map[string]interface{}{
			"cid":    models.DbCID{CID: k},
			"offset": offset,
		})

		offset += nw
	}

	start := time.Now()
	path, err := cs.writeNewShardFile(ctx, user, seq, buf.Bytes())
	if err != nil {
		return nil, fmt.Errorf("failed to write shard file: %w", err)
	}
	writeShardFileDuration.Observe(time.Since(start).Seconds())

	shard := CarShard{
		Root:      models.DbCID{CID: root},
		DataStart: hnw,
		Seq:       seq,
		Path:      path,
		Usr:       user,
		Rev:       rev,
	}

	start = time.Now()
	if err := cs.putShard(ctx, &shard, brefs, rmcids, false); err != nil {
		return nil, err
	}
	writeShardMetadataDuration.Observe(time.Since(start).Seconds())

	return buf.Bytes(), nil
}

func (cs *FileCarStore) putShard(ctx context.Context, shard *CarShard, brefs []map[string]any, rmcids map[cid.Cid]bool, nocache bool) error {
	ctx, span := otel.Tracer("carstore").Start(ctx, "putShard")
	defer span.End()

	err := cs.meta.PutShardAndRefs(ctx, shard, brefs, rmcids)
	if err != nil {
		return err
	}

	if !nocache {
		cs.putLastShardCache(shard)
	}

	return nil
}
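
// BlockDiff (below) computes the set of blocks an update makes stale.
// Worked example (illustrative): if the old tree is root R linking to
// {A, B} and the update writes new blocks {R', C} where R' links to
// {A, C}, then the keep set after scanning links is {R', A, C}; walking
// the old tree from R then yields dropset = {R, B}, since A is kept.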

func BlockDiff(ctx context.Context, bs blockstore.Blockstore, oldroot cid.Cid, newcids map[cid.Cid]blockformat.Block, skipcids map[cid.Cid]bool) (map[cid.Cid]bool, error) {
	ctx, span := otel.Tracer("repo").Start(ctx, "BlockDiff")
	defer span.End()

	if !oldroot.Defined() {
		return map[cid.Cid]bool{}, nil
	}

	// walk the entire 'new' portion of the tree, marking all referenced cids as 'keep'
	keepset := make(map[cid.Cid]bool)
	for c := range newcids {
		keepset[c] = true
		oblk, err := bs.Get(ctx, c)
		if err != nil {
			return nil, fmt.Errorf("get failed in new tree: %w", err)
		}

		if err := cbg.ScanForLinks(bytes.NewReader(oblk.RawData()), func(lnk cid.Cid) {
			keepset[lnk] = true
		}); err != nil {
			return nil, err
		}
	}

	if keepset[oldroot] {
		// this should probably never happen, but is technically correct
		return nil, nil
	}

	// next, walk the old tree from the root, recursing on cids *not* in the keepset
	dropset := make(map[cid.Cid]bool)
	dropset[oldroot] = true
	queue := []cid.Cid{oldroot}

	for len(queue) > 0 {
		c := queue[0]
		queue = queue[1:]

		if skipcids != nil && skipcids[c] {
			continue
		}

		oblk, err := bs.Get(ctx, c)
		if err != nil {
			return nil, fmt.Errorf("get failed in old tree: %w", err)
		}

		if err := cbg.ScanForLinks(bytes.NewReader(oblk.RawData()), func(lnk cid.Cid) {
			if lnk.Prefix().Codec != cid.DagCBOR {
				return
			}

			if !keepset[lnk] {
				dropset[lnk] = true
				queue = append(queue, lnk)
			}
		}); err != nil {
			return nil, err
		}
	}

	return dropset, nil
}

func (cs *FileCarStore) ImportSlice(ctx context.Context, uid models.Uid, since *string, carslice []byte) (cid.Cid, *DeltaSession, error) {
	ctx, span := otel.Tracer("carstore").Start(ctx, "ImportSlice")
	defer span.End()

	carr, err := car.NewCarReader(bytes.NewReader(carslice))
	if err != nil {
		return cid.Undef, nil, err
	}

	if len(carr.Header.Roots) != 1 {
		return cid.Undef, nil, fmt.Errorf("invalid car file, header must have a single root (has %d)", len(carr.Header.Roots))
	}

	ds, err := cs.NewDeltaSession(ctx, uid, since)
	if err != nil {
		return cid.Undef, nil, fmt.Errorf("new delta session failed: %w", err)
	}

	var cids []cid.Cid
	for {
		blk, err := carr.Next()
		if err != nil {
			if err == io.EOF {
				break
			}
			return cid.Undef, nil, err
		}

		cids = append(cids, blk.Cid())

		if err := ds.Put(ctx, blk); err != nil {
			return cid.Undef, nil, err
		}
	}

	return carr.Header.Roots[0], ds, nil
}

func (ds *DeltaSession) CalcDiff(ctx context.Context, skipcids map[cid.Cid]bool) error {
	rmcids, err := BlockDiff(ctx, ds, ds.baseCid, ds.blks, skipcids)
	if err != nil {
		return fmt.Errorf("block diff failed (base=%s,rev=%s): %w", ds.baseCid, ds.lastRev, err)
	}

	ds.rmcids = rmcids
	return nil
}
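
// Example (sketch): ingesting a CAR slice received over the wire, e.g.
// from the firehose, then finalizing it as a new shard. carBytes, sinceRev
// and nextRev are assumed inputs.
//
//	root, ds, err := cs.ImportSlice(ctx, uid, &sinceRev, carBytes)
//	if err != nil { /* handle */ }
//	if err := ds.CalcDiff(ctx, nil); err != nil { /* handle */ }
//	slice, err := ds.CloseWithRoot(ctx, root, nextRev)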

func (cs *FileCarStore) GetUserRepoHead(ctx context.Context, user models.Uid) (cid.Cid, error) {
	lastShard, err := cs.getLastShard(ctx, user)
	if err != nil {
		return cid.Undef, err
	}
	if lastShard.ID == 0 {
		return cid.Undef, nil
	}

	return lastShard.Root.CID, nil
}

func (cs *FileCarStore) GetUserRepoRev(ctx context.Context, user models.Uid) (string, error) {
	lastShard, err := cs.getLastShard(ctx, user)
	if err != nil {
		return "", err
	}
	if lastShard.ID == 0 {
		return "", nil
	}

	return lastShard.Rev, nil
}

type UserStat struct {
	Seq     int
	Root    string
	Created time.Time
}

func (cs *FileCarStore) Stat(ctx context.Context, usr models.Uid) ([]UserStat, error) {
	shards, err := cs.meta.GetUserShards(ctx, usr)
	if err != nil {
		return nil, err
	}

	var out []UserStat
	for _, s := range shards {
		out = append(out, UserStat{
			Seq:     s.Seq,
			Root:    s.Root.CID.String(),
			Created: s.CreatedAt,
		})
	}

	return out, nil
}

func (cs *FileCarStore) WipeUserData(ctx context.Context, user models.Uid) error {
	shards, err := cs.meta.GetUserShards(ctx, user)
	if err != nil {
		return err
	}

	if err := cs.deleteShards(ctx, shards); err != nil {
		if !os.IsNotExist(err) {
			return err
		}
	}

	cs.removeLastShardCache(user)

	return nil
}

func (cs *FileCarStore) deleteShards(ctx context.Context, shs []CarShard) error {
	ctx, span := otel.Tracer("carstore").Start(ctx, "deleteShards")
	defer span.End()

	deleteSlice := func(ctx context.Context, subs []CarShard) error {
		ids := make([]uint, len(subs))
		for i, sh := range subs {
			ids[i] = sh.ID
		}

		err := cs.meta.DeleteShardsAndRefs(ctx, ids)
		if err != nil {
			return err
		}

		for _, sh := range subs {
			if err := cs.deleteShardFile(ctx, &sh); err != nil {
				if !os.IsNotExist(err) {
					return err
				}
				cs.log.Warn("shard file we tried to delete did not exist", "shard", sh.ID, "path", sh.Path)
			}
		}

		return nil
	}

	chunkSize := 2000
	for i := 0; i < len(shs); i += chunkSize {
		sl := shs[i:]
		if len(sl) > chunkSize {
			sl = sl[:chunkSize]
		}

		if err := deleteSlice(ctx, sl); err != nil {
			return err
		}
	}

	return nil
}

type shardStat struct {
	ID    uint
	Dirty int
	Seq   int
	Total int

	refs []blockRef
}

func (s shardStat) dirtyFrac() float64 {
	return float64(s.Dirty) / float64(s.Total)
}

func aggrRefs(brefs []blockRef, shards map[uint]CarShard, staleCids map[cid.Cid]bool) []shardStat {
	byId := make(map[uint]*shardStat)

	for _, br := range brefs {
		s, ok := byId[br.Shard]
		if !ok {
			s = &shardStat{
				ID:  br.Shard,
				Seq: shards[br.Shard].Seq,
			}
			byId[br.Shard] = s
		}

		s.Total++
		if staleCids[br.Cid.CID] {
			s.Dirty++
		}

		s.refs = append(s.refs, br)
	}

	var out []shardStat
	for _, s := range byId {
		out = append(out, *s)
	}

	sort.Slice(out, func(i, j int) bool {
		return out[i].Seq < out[j].Seq
	})

	return out
}

type compBucket struct {
	shards []shardStat

	cleanBlocks int
	expSize     int
}

func (cb *compBucket) shouldCompact() bool {
	if len(cb.shards) == 0 {
		return false
	}

	if len(cb.shards) > 5 {
		return true
	}

	var frac float64
	for _, s := range cb.shards {
		frac += s.dirtyFrac()
	}
	frac /= float64(len(cb.shards))

	if len(cb.shards) > 3 && frac > 0.2 {
		return true
	}

	return frac > 0.4
}

func (cb *compBucket) addShardStat(ss shardStat) {
	cb.cleanBlocks += (ss.Total - ss.Dirty)
	cb.shards = append(cb.shards, ss)
}

func (cb *compBucket) isEmpty() bool {
	return len(cb.shards) == 0
}
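
// Worked example of shouldCompact (illustrative): a bucket of 4 shards
// with dirty fractions {0.5, 0.3, 0.1, 0.1} averages 0.25, so the
// len > 3 && frac > 0.2 rule fires; the same average across only 2 shards
// would not compact, since buckets of 3 or fewer need frac > 0.4.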

func (cs *FileCarStore) openNewCompactedShardFile(ctx context.Context, user models.Uid, seq int) (*os.File, string, error) {
	// TODO: some overwrite protections
	// NOTE: CreateTemp is used to create a non-colliding file, but we keep it
	// and don't delete it, so don't think of it as "temporary".
	// This creates "sh-%d-%d%s" with some random stuff in the last position.
	fi, err := os.CreateTemp(cs.dirForUser(user), fnameForShard(user, seq))
	if err != nil {
		return nil, "", err
	}

	return fi, fi.Name(), nil
}

type CompactionTarget struct {
	Usr       models.Uid
	NumShards int
}

func (cs *FileCarStore) GetCompactionTargets(ctx context.Context, shardCount int) ([]CompactionTarget, error) {
	ctx, span := otel.Tracer("carstore").Start(ctx, "GetCompactionTargets")
	defer span.End()

	return cs.meta.GetCompactionTargets(ctx, shardCount)
}

// getBlockRefsForShards is a prep function for CompactUserShards
func (cs *FileCarStore) getBlockRefsForShards(ctx context.Context, shardIds []uint) ([]blockRef, error) {
	ctx, span := otel.Tracer("carstore").Start(ctx, "getBlockRefsForShards")
	defer span.End()

	span.SetAttributes(attribute.Int("shards", len(shardIds)))

	out, err := cs.meta.GetBlockRefsForShards(ctx, shardIds)
	if err != nil {
		return nil, err
	}

	span.SetAttributes(attribute.Int("refs", len(out)))

	return out, nil
}

func shardSize(sh *CarShard) (int64, error) {
	st, err := os.Stat(sh.Path)
	if err != nil {
		if os.IsNotExist(err) {
			slog.Warn("missing shard, returning size of zero", "path", sh.Path, "shard", sh.ID, "system", "carstore")
			return 0, nil
		}
		return 0, fmt.Errorf("stat %q: %w", sh.Path, err)
	}

	return st.Size(), nil
}

type CompactionStats struct {
	TotalRefs     int `json:"totalRefs"`
	StartShards   int `json:"startShards"`
	NewShards     int `json:"newShards"`
	SkippedShards int `json:"skippedShards"`
	ShardsDeleted int `json:"shardsDeleted"`
	DupeCount     int `json:"dupeCount"`
}

func (cs *FileCarStore) CompactUserShards(ctx context.Context, user models.Uid, skipBigShards bool) (*CompactionStats, error) {
	ctx, span := otel.Tracer("carstore").Start(ctx, "CompactUserShards")
	defer span.End()

	span.SetAttributes(attribute.Int64("user", int64(user)))

	shards, err := cs.meta.GetUserShards(ctx, user)
	if err != nil {
		return nil, err
	}

	if skipBigShards {
		// Since we generally expect shards to start bigger and get smaller,
		// and because we want to avoid compacting non-adjacent shards
		// together, and because we want to avoid running a stat on every
		// single shard (which can be expensive for repos that haven't been
		// compacted in a while), we only skip a prefix of shard files that
		// are over the threshold. This may end up not skipping some shards
		// that are over the threshold if a below-threshold shard occurs
		// before them, but since this is a heuristic and imperfect
		// optimization, that is acceptable.
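
		// Illustrative example (not from the source): with shard sizes
		// [8 MiB, 4 MiB, 1 MiB, 3 MiB] and the 2 MiB threshold, the loop
		// below skips the first two shards but keeps the trailing 3 MiB
		// shard, since only the leading over-threshold prefix is skipped.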
		var skip int
		for i, sh := range shards {
			size, err := shardSize(&sh)
			if err != nil {
				return nil, fmt.Errorf("could not check size of shard file: %w", err)
			}

			if size > BigShardThreshold {
				skip = i + 1
			} else {
				break
			}
		}
		shards = shards[skip:]
	}

	span.SetAttributes(attribute.Int("shards", len(shards)))

	var shardIds []uint
	for _, s := range shards {
		shardIds = append(shardIds, s.ID)
	}

	shardsById := make(map[uint]CarShard)
	for _, s := range shards {
		shardsById[s.ID] = s
	}

	brefs, err := cs.getBlockRefsForShards(ctx, shardIds)
	if err != nil {
		return nil, fmt.Errorf("getting block refs failed: %w", err)
	}

	span.SetAttributes(attribute.Int("blockRefs", len(brefs)))

	staleRefs, err := cs.meta.GetUserStaleRefs(ctx, user)
	if err != nil {
		return nil, err
	}

	span.SetAttributes(attribute.Int("staleRefs", len(staleRefs)))

	stale := make(map[cid.Cid]bool)
	for _, br := range staleRefs {
		cids, err := br.getCids()
		if err != nil {
			return nil, fmt.Errorf("failed to unpack cids from staleRefs record (%d): %w", br.ID, err)
		}
		for _, c := range cids {
			stale[c] = true
		}
	}

	// if we have a staleRef that references multiple blockRefs, we consider that block a 'dirty duplicate'
	var dupes []cid.Cid
	var hasDirtyDupes bool
	seenBlocks := make(map[cid.Cid]bool)
	for _, br := range brefs {
		if seenBlocks[br.Cid.CID] {
			dupes = append(dupes, br.Cid.CID)
			hasDirtyDupes = true
			delete(stale, br.Cid.CID)
		} else {
			seenBlocks[br.Cid.CID] = true
		}
	}

	for _, dupe := range dupes {
		delete(stale, dupe) // remove dupes from stale list, see comment below
	}

	if hasDirtyDupes {
		// if we have no duplicates, then the keep set is simply all the 'clean' blockRefs.
		// in the case we have duplicate dirty references, we have to compute
		// the keep set by walking the entire repo to check if anything is
		// still referencing the dirty block in question

		// we could also just add the duplicates to the keep set for now and
		// focus on compacting everything else. it leaves *some* dirty blocks
		// still around, but we're doing that anyways since compaction isn't a
		// perfect process

		cs.log.Debug("repo has dirty dupes", "count", len(dupes), "uid", user, "staleRefs", len(staleRefs), "blockRefs", len(brefs))

		//return nil, fmt.Errorf("WIP: not currently handling this case")
	}

	keep := make(map[cid.Cid]bool)
	for _, br := range brefs {
		if !stale[br.Cid.CID] {
			keep[br.Cid.CID] = true
		}
	}

	for _, dupe := range dupes {
		keep[dupe] = true
	}

	results := aggrRefs(brefs, shardsById, stale)
	var sum int
	for _, r := range results {
		sum += r.Total
	}

	lowBound := 20
	N := 10
	// we want to *aim* for N shards per user
	// the last several should be left small to allow easy loading from disk
	// for updates (since recent blocks are most likely needed for edits)
	// the beginning of the list should be some sort of exponential fall-off
	// with the area under the curve targeted by the total number of blocks we
	// have
	var threshs []int
	tot := len(brefs)
	for i := 0; i < N; i++ {
		v := tot / 2
		if v < lowBound {
			v = lowBound
		}
		tot = tot / 2
		threshs = append(threshs, v)
	}
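
	// Worked example (illustrative): with len(brefs) = 1000 and N = 10,
	// threshs = [500, 250, 125, 62, 31, 20, 20, 20, 20, 20]; each halving
	// that drops below lowBound is floored at lowBound.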
	thresholdForPosition := func(i int) int {
		if i >= len(threshs) {
			return lowBound
		}
		return threshs[i]
	}

	cur := new(compBucket)
	cur.expSize = thresholdForPosition(0)
	var compactionQueue []*compBucket
	for i, r := range results {
		cur.addShardStat(r)

		if cur.cleanBlocks > cur.expSize || i > len(results)-3 {
			compactionQueue = append(compactionQueue, cur)
			cur = &compBucket{
				expSize: thresholdForPosition(len(compactionQueue)),
			}
		}
	}
	if !cur.isEmpty() {
		compactionQueue = append(compactionQueue, cur)
	}

	stats := &CompactionStats{
		StartShards: len(shards),
		TotalRefs:   len(brefs),
	}

	removedShards := make(map[uint]bool)
	for _, b := range compactionQueue {
		if !b.shouldCompact() {
			stats.SkippedShards += len(b.shards)
			continue
		}

		if err := cs.compactBucket(ctx, user, b, shardsById, keep); err != nil {
			return nil, fmt.Errorf("compact bucket: %w", err)
		}

		stats.NewShards++

		todelete := make([]CarShard, 0, len(b.shards))
		for _, s := range b.shards {
			removedShards[s.ID] = true
			sh, ok := shardsById[s.ID]
			if !ok {
				return nil, fmt.Errorf("missing shard to delete")
			}

			todelete = append(todelete, sh)
		}

		stats.ShardsDeleted += len(todelete)
		if err := cs.deleteShards(ctx, todelete); err != nil {
			return nil, fmt.Errorf("deleting shards: %w", err)
		}
	}

	// now we need to delete the staleRefs we successfully cleaned up
	// we can safely delete a staleRef if all the shards that have blockRefs with matching stale refs were processed
	if err := cs.deleteStaleRefs(ctx, user, brefs, staleRefs, removedShards); err != nil {
		return nil, fmt.Errorf("delete stale refs: %w", err)
	}

	stats.DupeCount = len(dupes)

	return stats, nil
}
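
// Example (sketch): a background compaction pass driven by the targets
// query. The shard-count cutoff of 50 is an arbitrary assumption.
//
//	targets, err := cs.GetCompactionTargets(ctx, 50)
//	if err != nil { /* handle */ }
//	for _, t := range targets {
//		if stats, err := cs.CompactUserShards(ctx, t.Usr, true); err == nil {
//			fmt.Println("compacted", t.Usr, "new shards:", stats.NewShards)
//		}
//	}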

func (cs *FileCarStore) deleteStaleRefs(ctx context.Context, uid models.Uid, brefs []blockRef, staleRefs []staleRef, removedShards map[uint]bool) error {
	ctx, span := otel.Tracer("carstore").Start(ctx, "deleteStaleRefs")
	defer span.End()

	brByCid := make(map[cid.Cid][]blockRef)
	for _, br := range brefs {
		brByCid[br.Cid.CID] = append(brByCid[br.Cid.CID], br)
	}

	var staleToKeep []cid.Cid
	for _, sr := range staleRefs {
		cids, err := sr.getCids()
		if err != nil {
			return fmt.Errorf("getCids on staleRef failed (%d): %w", sr.ID, err)
		}

		for _, c := range cids {
			brs := brByCid[c]
			del := true
			for _, br := range brs {
				if !removedShards[br.Shard] {
					del = false
					break
				}
			}

			if !del {
				staleToKeep = append(staleToKeep, c)
			}
		}
	}

	return cs.meta.SetStaleRef(ctx, uid, staleToKeep)
}

func (cs *FileCarStore) compactBucket(ctx context.Context, user models.Uid, b *compBucket, shardsById map[uint]CarShard, keep map[cid.Cid]bool) error {
	ctx, span := otel.Tracer("carstore").Start(ctx, "compactBucket")
	defer span.End()

	span.SetAttributes(attribute.Int("shards", len(b.shards)))

	last := b.shards[len(b.shards)-1]
	lastsh := shardsById[last.ID]
	fi, path, err := cs.openNewCompactedShardFile(ctx, user, last.Seq)
	if err != nil {
		return fmt.Errorf("opening new file: %w", err)
	}

	defer fi.Close()
	root := lastsh.Root.CID

	hnw, err := WriteCarHeader(fi, root)
	if err != nil {
		return err
	}

	offset := hnw
	var nbrefs []map[string]any
	written := make(map[cid.Cid]bool)
	for _, s := range b.shards {
		sh := shardsById[s.ID]
		if err := cs.iterateShardBlocks(ctx, &sh, func(blk blockformat.Block) error {
			if written[blk.Cid()] {
				return nil
			}

			if keep[blk.Cid()] {
				nw, err := LdWrite(fi, blk.Cid().Bytes(), blk.RawData())
				if err != nil {
					return fmt.Errorf("failed to write block: %w", err)
				}

				nbrefs = append(nbrefs, map[string]interface{}{
					"cid":    models.DbCID{CID: blk.Cid()},
					"offset": offset,
				})

				offset += nw
				written[blk.Cid()] = true
			}
			return nil
		}); err != nil {
			// If we ever fail to iterate a shard file because it's
			// corrupted, just log an error and skip the shard
			cs.log.Error("iterating blocks in shard", "shard", s.ID, "err", err, "uid", user)
		}
	}

	shard := CarShard{
		Root:      models.DbCID{CID: root},
		DataStart: hnw,
		Seq:       lastsh.Seq,
		Path:      path,
		Usr:       user,
		Rev:       lastsh.Rev,
	}

	if err := cs.putShard(ctx, &shard, nbrefs, nil, true); err != nil {
		// if writing the shard fails, we should also delete the file
		_ = fi.Close()

		if err2 := os.Remove(fi.Name()); err2 != nil {
			cs.log.Error("failed to remove shard file after failed db transaction", "path", fi.Name(), "err", err2)
		}

		return err
	}
	return nil
}