fork of indigo with slightly nicer lexgen
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

at fdf4607cbe47b0432ea2fae243fe7613c28a827a 279 lines 7.2 kB view raw
1package carstore 2 3import ( 4 "bytes" 5 "context" 6 "fmt" 7 ipld "github.com/ipfs/go-ipld-format" 8 "io" 9 "log/slog" 10 "sync" 11 12 "github.com/bluesky-social/indigo/models" 13 blockformat "github.com/ipfs/go-block-format" 14 "github.com/ipfs/go-cid" 15 car "github.com/ipld/go-car" 16 "go.opentelemetry.io/otel" 17 "gorm.io/gorm" 18 "gorm.io/gorm/clause" 19) 20 21type NonArchivalCarstore struct { 22 db *gorm.DB 23 24 lk sync.Mutex 25 lastCommitCache map[models.Uid]*commitRefInfo 26 27 log *slog.Logger 28} 29 30func NewNonArchivalCarstore(db *gorm.DB) (*NonArchivalCarstore, error) { 31 if err := db.AutoMigrate(&commitRefInfo{}); err != nil { 32 return nil, err 33 } 34 35 return &NonArchivalCarstore{ 36 db: db, 37 lastCommitCache: make(map[models.Uid]*commitRefInfo), 38 log: slog.Default().With("system", "carstorena"), 39 }, nil 40} 41 42type commitRefInfo struct { 43 ID uint `gorm:"primarykey"` 44 Uid models.Uid `gorm:"uniqueIndex"` 45 Rev string 46 Root models.DbCID 47} 48 49func (cs *NonArchivalCarstore) checkLastShardCache(user models.Uid) *commitRefInfo { 50 cs.lk.Lock() 51 defer cs.lk.Unlock() 52 53 ls, ok := cs.lastCommitCache[user] 54 if ok { 55 return ls 56 } 57 58 return nil 59} 60 61func (cs *NonArchivalCarstore) removeLastShardCache(user models.Uid) { 62 cs.lk.Lock() 63 defer cs.lk.Unlock() 64 65 delete(cs.lastCommitCache, user) 66} 67 68func (cs *NonArchivalCarstore) putLastShardCache(ls *commitRefInfo) { 69 cs.lk.Lock() 70 defer cs.lk.Unlock() 71 72 cs.lastCommitCache[ls.Uid] = ls 73} 74 75func (cs *NonArchivalCarstore) loadCommitRefInfo(ctx context.Context, user models.Uid) (*commitRefInfo, error) { 76 var out commitRefInfo 77 wat := cs.db.Find(&out, "uid = ?", user) 78 if wat.Error != nil { 79 return nil, wat.Error 80 } 81 if wat.RowsAffected == 0 { 82 return nil, nil 83 } 84 return &out, nil 85} 86 87func (cs *NonArchivalCarstore) getCommitRefInfo(ctx context.Context, user models.Uid) (*commitRefInfo, error) { 88 ctx, span := otel.Tracer("carstore").Start(ctx, "getCommitRefInfo") 89 defer span.End() 90 91 maybeLs := cs.checkLastShardCache(user) 92 if maybeLs != nil { 93 return maybeLs, nil 94 } 95 96 lastShard, err := cs.loadCommitRefInfo(ctx, user) 97 if err != nil { 98 return nil, err 99 } 100 if lastShard == nil { 101 return nil, nil 102 } 103 104 cs.putLastShardCache(lastShard) 105 return lastShard, nil 106} 107 108func (cs *NonArchivalCarstore) updateLastCommit(ctx context.Context, uid models.Uid, rev string, cid cid.Cid) error { 109 cri := &commitRefInfo{ 110 Uid: uid, 111 Rev: rev, 112 Root: models.DbCID{CID: cid}, 113 } 114 115 if err := cs.db.Clauses(clause.OnConflict{ 116 Columns: []clause.Column{{Name: "uid"}}, 117 UpdateAll: true, 118 }).Create(cri).Error; err != nil { 119 return fmt.Errorf("update or set last commit info: %w", err) 120 } 121 122 cs.putLastShardCache(cri) 123 124 return nil 125} 126 127var commitRefZero = commitRefInfo{} 128 129func (cs *NonArchivalCarstore) NewDeltaSession(ctx context.Context, user models.Uid, since *string) (*DeltaSession, error) { 130 ctx, span := otel.Tracer("carstore").Start(ctx, "NewSession") 131 defer span.End() 132 133 // TODO: ensure that we don't write updates on top of the wrong head 134 // this needs to be a compare and swap type operation 135 lastShard, err := cs.getCommitRefInfo(ctx, user) 136 if err != nil { 137 return nil, err 138 } 139 140 if lastShard == nil { 141 // ok, no previous user state to refer to 142 lastShard = &commitRefZero 143 } else if since != nil && *since != lastShard.Rev { 144 cs.log.Warn("revision mismatch", "commitSince", since, "lastRev", lastShard.Rev, "err", ErrRepoBaseMismatch) 145 } 146 147 return &DeltaSession{ 148 blks: make(map[cid.Cid]blockformat.Block), 149 base: &userView{ 150 user: user, 151 cs: cs, 152 prefetch: true, 153 cache: make(map[cid.Cid]blockformat.Block), 154 }, 155 user: user, 156 baseCid: lastShard.Root.CID, 157 cs: cs, 158 seq: 0, 159 lastRev: lastShard.Rev, 160 }, nil 161} 162 163func (cs *NonArchivalCarstore) ReadOnlySession(user models.Uid) (*DeltaSession, error) { 164 return &DeltaSession{ 165 base: &userView{ 166 user: user, 167 cs: cs, 168 prefetch: false, 169 cache: make(map[cid.Cid]blockformat.Block), 170 }, 171 readonly: true, 172 user: user, 173 cs: cs, 174 }, nil 175} 176 177// TODO: incremental is only ever called true, remove the param 178func (cs *NonArchivalCarstore) ReadUserCar(ctx context.Context, user models.Uid, sinceRev string, incremental bool, w io.Writer) error { 179 return fmt.Errorf("not supported in non-archival mode") 180} 181 182func (cs *NonArchivalCarstore) ImportSlice(ctx context.Context, uid models.Uid, since *string, carslice []byte) (cid.Cid, *DeltaSession, error) { 183 ctx, span := otel.Tracer("carstore").Start(ctx, "ImportSlice") 184 defer span.End() 185 186 carr, err := car.NewCarReader(bytes.NewReader(carslice)) 187 if err != nil { 188 return cid.Undef, nil, err 189 } 190 191 if len(carr.Header.Roots) != 1 { 192 return cid.Undef, nil, fmt.Errorf("invalid car file, header must have a single root (has %d)", len(carr.Header.Roots)) 193 } 194 195 ds, err := cs.NewDeltaSession(ctx, uid, since) 196 if err != nil { 197 return cid.Undef, nil, fmt.Errorf("new delta session failed: %w", err) 198 } 199 200 var cids []cid.Cid 201 for { 202 blk, err := carr.Next() 203 if err != nil { 204 if err == io.EOF { 205 break 206 } 207 return cid.Undef, nil, err 208 } 209 210 cids = append(cids, blk.Cid()) 211 212 if err := ds.Put(ctx, blk); err != nil { 213 return cid.Undef, nil, err 214 } 215 } 216 217 return carr.Header.Roots[0], ds, nil 218} 219 220func (cs *NonArchivalCarstore) GetUserRepoHead(ctx context.Context, user models.Uid) (cid.Cid, error) { 221 lastShard, err := cs.getCommitRefInfo(ctx, user) 222 if err != nil { 223 return cid.Undef, err 224 } 225 if lastShard == nil || lastShard.ID == 0 { 226 return cid.Undef, nil 227 } 228 229 return lastShard.Root.CID, nil 230} 231 232func (cs *NonArchivalCarstore) GetUserRepoRev(ctx context.Context, user models.Uid) (string, error) { 233 lastShard, err := cs.getCommitRefInfo(ctx, user) 234 if err != nil { 235 return "", err 236 } 237 if lastShard == nil || lastShard.ID == 0 { 238 return "", nil 239 } 240 241 return lastShard.Rev, nil 242} 243 244func (cs *NonArchivalCarstore) Stat(ctx context.Context, usr models.Uid) ([]UserStat, error) { 245 return nil, nil 246} 247 248func (cs *NonArchivalCarstore) WipeUserData(ctx context.Context, user models.Uid) error { 249 if err := cs.db.Raw("DELETE from commit_ref_infos WHERE uid = ?", user).Error; err != nil { 250 return err 251 } 252 253 cs.removeLastShardCache(user) 254 return nil 255} 256 257func (cs *NonArchivalCarstore) GetCompactionTargets(ctx context.Context, shardCount int) ([]CompactionTarget, error) { 258 return nil, fmt.Errorf("compaction not supported on non-archival") 259} 260 261func (cs *NonArchivalCarstore) CompactUserShards(ctx context.Context, user models.Uid, skipBigShards bool) (*CompactionStats, error) { 262 return nil, fmt.Errorf("compaction not supported in non-archival") 263} 264 265func (cs *NonArchivalCarstore) HasUidCid(ctx context.Context, user models.Uid, k cid.Cid) (bool, error) { 266 return false, nil 267} 268 269func (cs *NonArchivalCarstore) LookupBlockRef(ctx context.Context, k cid.Cid) (path string, offset int64, user models.Uid, err error) { 270 return "", 0, 0, ipld.ErrNotFound{Cid: k} 271} 272 273func (cs *NonArchivalCarstore) writeNewShard(ctx context.Context, root cid.Cid, rev string, user models.Uid, seq int, blks map[cid.Cid]blockformat.Block, rmcids map[cid.Cid]bool) ([]byte, error) { 274 slice, err := blocksToCar(ctx, root, rev, blks) 275 if err != nil { 276 return nil, err 277 } 278 return slice, cs.updateLastCommit(ctx, user, rev, root) 279}