fork of indigo with slightly nicer lexgen
at main 9.6 kB view raw
1package carstore 2 3import ( 4 "bytes" 5 "context" 6 "fmt" 7 "io" 8 "strconv" 9 "strings" 10 "time" 11 12 "github.com/bluesky-social/indigo/models" 13 "github.com/ipfs/go-cid" 14 "go.opentelemetry.io/otel" 15 "gorm.io/driver/postgres" 16 "gorm.io/gorm" 17) 18 19type CarStoreGormMeta struct { 20 meta *gorm.DB 21} 22 23func (cs *CarStoreGormMeta) Init() error { 24 if err := cs.meta.AutoMigrate(&CarShard{}, &blockRef{}); err != nil { 25 return err 26 } 27 if err := cs.meta.AutoMigrate(&staleRef{}); err != nil { 28 return err 29 } 30 return nil 31} 32 33// Return true if any known record matches (Uid, Cid) 34func (cs *CarStoreGormMeta) HasUidCid(ctx context.Context, user models.Uid, k cid.Cid) (bool, error) { 35 var count int64 36 if err := cs.meta. 37 Model(blockRef{}). 38 Select("path, block_refs.offset"). 39 Joins("left join car_shards on block_refs.shard = car_shards.id"). 40 Where("usr = ? AND cid = ?", user, models.DbCID{CID: k}). 41 Count(&count).Error; err != nil { 42 return false, err 43 } 44 45 return count > 0, nil 46} 47 48// For some Cid, lookup the block ref. 49// Return the path of the file written, the offset within the file, and the user associated with the Cid. 50func (cs *CarStoreGormMeta) LookupBlockRef(ctx context.Context, k cid.Cid) (path string, offset int64, user models.Uid, err error) { 51 // TODO: for now, im using a join to ensure we only query blocks from the 52 // correct user. maybe it makes sense to put the user in the blockRef 53 // directly? tradeoff of time vs space 54 var info struct { 55 Path string 56 Offset int64 57 Usr models.Uid 58 } 59 if err := cs.meta.Raw(`SELECT 60 (select path from car_shards where id = block_refs.shard) as path, 61 block_refs.offset, 62 (select usr from car_shards where id = block_refs.shard) as usr 63FROM block_refs 64WHERE 65 block_refs.cid = ? 66LIMIT 1;`, models.DbCID{CID: k}).Scan(&info).Error; err != nil { 67 var defaultUser models.Uid 68 return "", -1, defaultUser, err 69 } 70 return info.Path, info.Offset, info.Usr, nil 71} 72 73func (cs *CarStoreGormMeta) GetLastShard(ctx context.Context, user models.Uid) (*CarShard, error) { 74 var lastShard CarShard 75 if err := cs.meta.WithContext(ctx).Model(CarShard{}).Limit(1).Order("seq desc").Find(&lastShard, "usr = ?", user).Error; err != nil { 76 return nil, err 77 } 78 return &lastShard, nil 79} 80 81// return all of a users's shards, ascending by Seq 82func (cs *CarStoreGormMeta) GetUserShards(ctx context.Context, usr models.Uid) ([]CarShard, error) { 83 var shards []CarShard 84 if err := cs.meta.Order("seq asc").Find(&shards, "usr = ?", usr).Error; err != nil { 85 return nil, err 86 } 87 return shards, nil 88} 89 90// return all of a users's shards, descending by Seq 91func (cs *CarStoreGormMeta) GetUserShardsDesc(ctx context.Context, usr models.Uid, minSeq int) ([]CarShard, error) { 92 var shards []CarShard 93 if err := cs.meta.Order("seq desc").Find(&shards, "usr = ? AND seq >= ?", usr, minSeq).Error; err != nil { 94 return nil, err 95 } 96 return shards, nil 97} 98 99func (cs *CarStoreGormMeta) GetUserStaleRefs(ctx context.Context, user models.Uid) ([]staleRef, error) { 100 var staleRefs []staleRef 101 if err := cs.meta.WithContext(ctx).Find(&staleRefs, "usr = ?", user).Error; err != nil { 102 return nil, err 103 } 104 return staleRefs, nil 105} 106 107func (cs *CarStoreGormMeta) SeqForRev(ctx context.Context, user models.Uid, sinceRev string) (int, error) { 108 var untilShard CarShard 109 if err := cs.meta.Where("rev >= ? AND usr = ?", sinceRev, user).Order("rev").First(&untilShard).Error; err != nil { 110 return 0, fmt.Errorf("finding early shard: %w", err) 111 } 112 return untilShard.Seq, nil 113} 114 115func (cs *CarStoreGormMeta) GetCompactionTargets(ctx context.Context, minShardCount int) ([]CompactionTarget, error) { 116 var targets []CompactionTarget 117 if err := cs.meta.Raw(`select usr, count(*) as num_shards from car_shards group by usr having count(*) > ? order by num_shards desc`, minShardCount).Scan(&targets).Error; err != nil { 118 return nil, err 119 } 120 121 return targets, nil 122} 123 124func (cs *CarStoreGormMeta) PutShardAndRefs(ctx context.Context, shard *CarShard, brefs []map[string]any, rmcids map[cid.Cid]bool) error { 125 // TODO: there should be a way to create the shard and block_refs that 126 // reference it in the same query, would save a lot of time 127 tx := cs.meta.WithContext(ctx).Begin() 128 129 if err := tx.WithContext(ctx).Create(shard).Error; err != nil { 130 return fmt.Errorf("failed to create shard in DB tx: %w", err) 131 } 132 133 for _, ref := range brefs { 134 ref["shard"] = shard.ID 135 } 136 137 if err := createBlockRefs(ctx, tx, brefs); err != nil { 138 return fmt.Errorf("failed to create block refs: %w", err) 139 } 140 141 if len(rmcids) > 0 { 142 cids := make([]cid.Cid, 0, len(rmcids)) 143 for c := range rmcids { 144 cids = append(cids, c) 145 } 146 147 if err := tx.Create(&staleRef{ 148 Cids: packCids(cids), 149 Usr: shard.Usr, 150 }).Error; err != nil { 151 return err 152 } 153 } 154 155 err := tx.WithContext(ctx).Commit().Error 156 if err != nil { 157 return fmt.Errorf("failed to commit shard DB transaction: %w", err) 158 } 159 return nil 160} 161 162func (cs *CarStoreGormMeta) DeleteShardsAndRefs(ctx context.Context, ids []uint) error { 163 txn := cs.meta.Begin() 164 165 if err := txn.Delete(&CarShard{}, "id in (?)", ids).Error; err != nil { 166 txn.Rollback() 167 return err 168 } 169 170 if err := txn.Delete(&blockRef{}, "shard in (?)", ids).Error; err != nil { 171 txn.Rollback() 172 return err 173 } 174 175 return txn.Commit().Error 176} 177 178func (cs *CarStoreGormMeta) GetBlockRefsForShards(ctx context.Context, shardIds []uint) ([]blockRef, error) { 179 chunkSize := 2000 180 out := make([]blockRef, 0, len(shardIds)) 181 for i := 0; i < len(shardIds); i += chunkSize { 182 sl := shardIds[i:] 183 if len(sl) > chunkSize { 184 sl = sl[:chunkSize] 185 } 186 187 if err := blockRefsForShards(ctx, cs.meta, sl, &out); err != nil { 188 return nil, fmt.Errorf("getting block refs: %w", err) 189 } 190 } 191 return out, nil 192} 193 194// blockRefsForShards is an inner loop helper for GetBlockRefsForShards 195func blockRefsForShards(ctx context.Context, db *gorm.DB, shards []uint, obuf *[]blockRef) error { 196 // Check the database driver 197 switch db.Dialector.(type) { 198 case *postgres.Dialector: 199 sval := valuesStatementForShards(shards) 200 q := fmt.Sprintf(`SELECT block_refs.* FROM block_refs INNER JOIN (VALUES %s) AS vals(v) ON block_refs.shard = v`, sval) 201 return db.Raw(q).Scan(obuf).Error 202 default: 203 return db.Raw(`SELECT * FROM block_refs WHERE shard IN (?)`, shards).Scan(obuf).Error 204 } 205} 206 207// valuesStatementForShards builds a postgres compatible statement string from int literals 208func valuesStatementForShards(shards []uint) string { 209 sb := new(strings.Builder) 210 for i, v := range shards { 211 sb.WriteByte('(') 212 sb.WriteString(strconv.Itoa(int(v))) 213 sb.WriteByte(')') 214 if i != len(shards)-1 { 215 sb.WriteByte(',') 216 } 217 } 218 return sb.String() 219} 220 221func (cs *CarStoreGormMeta) SetStaleRef(ctx context.Context, uid models.Uid, staleToKeep []cid.Cid) error { 222 txn := cs.meta.Begin() 223 224 if err := txn.Delete(&staleRef{}, "usr = ?", uid).Error; err != nil { 225 return err 226 } 227 228 // now create a new staleRef with all the refs we couldn't clear out 229 if len(staleToKeep) > 0 { 230 if err := txn.Create(&staleRef{ 231 Usr: uid, 232 Cids: packCids(staleToKeep), 233 }).Error; err != nil { 234 return err 235 } 236 } 237 238 if err := txn.Commit().Error; err != nil { 239 return fmt.Errorf("failed to commit staleRef updates: %w", err) 240 } 241 return nil 242} 243 244type CarShard struct { 245 ID uint `gorm:"primarykey"` 246 CreatedAt time.Time 247 248 Root models.DbCID `gorm:"index"` 249 DataStart int64 250 Seq int `gorm:"index:idx_car_shards_seq;index:idx_car_shards_usr_seq,priority:2,sort:desc"` 251 Path string 252 Usr models.Uid `gorm:"index:idx_car_shards_usr;index:idx_car_shards_usr_seq,priority:1"` 253 Rev string 254} 255 256type blockRef struct { 257 ID uint `gorm:"primarykey"` 258 Cid models.DbCID `gorm:"index"` 259 Shard uint `gorm:"index"` 260 Offset int64 261 //User uint `gorm:"index"` 262} 263 264type staleRef struct { 265 ID uint `gorm:"primarykey"` 266 Cid *models.DbCID 267 Cids []byte 268 Usr models.Uid `gorm:"index"` 269} 270 271func (sr *staleRef) getCids() ([]cid.Cid, error) { 272 if sr.Cid != nil { 273 return []cid.Cid{sr.Cid.CID}, nil 274 } 275 276 return unpackCids(sr.Cids) 277} 278 279func unpackCids(b []byte) ([]cid.Cid, error) { 280 br := bytes.NewReader(b) 281 var out []cid.Cid 282 for { 283 _, c, err := cid.CidFromReader(br) 284 if err != nil { 285 if err == io.EOF { 286 break 287 } 288 return nil, err 289 } 290 291 out = append(out, c) 292 } 293 294 return out, nil 295} 296 297func packCids(cids []cid.Cid) []byte { 298 buf := new(bytes.Buffer) 299 for _, c := range cids { 300 buf.Write(c.Bytes()) 301 } 302 303 return buf.Bytes() 304} 305 306func createBlockRefs(ctx context.Context, tx *gorm.DB, brefs []map[string]any) error { 307 ctx, span := otel.Tracer("carstore").Start(ctx, "createBlockRefs") 308 defer span.End() 309 310 if err := createInBatches(ctx, tx, brefs, 2000); err != nil { 311 return err 312 } 313 314 return nil 315} 316 317// Function to create in batches 318func createInBatches(ctx context.Context, tx *gorm.DB, brefs []map[string]any, batchSize int) error { 319 for i := 0; i < len(brefs); i += batchSize { 320 batch := brefs[i:] 321 if len(batch) > batchSize { 322 batch = batch[:batchSize] 323 } 324 325 query, values := generateInsertQuery(batch) 326 327 if err := tx.WithContext(ctx).Exec(query, values...).Error; err != nil { 328 return err 329 } 330 } 331 return nil 332} 333 334func generateInsertQuery(brefs []map[string]any) (string, []any) { 335 placeholders := strings.Repeat("(?, ?, ?),", len(brefs)) 336 placeholders = placeholders[:len(placeholders)-1] // trim trailing comma 337 338 query := "INSERT INTO block_refs (\"cid\", \"offset\", \"shard\") VALUES " + placeholders 339 340 values := make([]any, 0, 3*len(brefs)) 341 for _, entry := range brefs { 342 values = append(values, entry["cid"], entry["offset"], entry["shard"]) 343 } 344 345 return query, values 346}