fork of indigo with slightly nicer lexgen
at main 16 kB view raw
1package carstore 2 3import ( 4 "bytes" 5 "context" 6 "database/sql" 7 "errors" 8 "fmt" 9 "go.opentelemetry.io/otel/attribute" 10 "io" 11 "log/slog" 12 "os" 13 "path/filepath" 14 15 "github.com/bluesky-social/indigo/models" 16 blockformat "github.com/ipfs/go-block-format" 17 "github.com/ipfs/go-cid" 18 "github.com/ipfs/go-libipfs/blocks" 19 "github.com/ipld/go-car" 20 _ "github.com/mattn/go-sqlite3" 21 "github.com/prometheus/client_golang/prometheus" 22 "github.com/prometheus/client_golang/prometheus/promauto" 23 "go.opentelemetry.io/otel" 24) 25 26// var log = logging.Logger("sqstore") 27 28type SQLiteStore struct { 29 dbPath string 30 db *sql.DB 31 32 log *slog.Logger 33 34 lastShardCache lastShardCache 35} 36 37func ensureDir(path string) error { 38 fi, err := os.Stat(path) 39 if err != nil { 40 if os.IsNotExist(err) { 41 return os.MkdirAll(path, 0755) 42 } 43 return err 44 } 45 if fi.IsDir() { 46 return nil 47 } 48 return fmt.Errorf("%s exists but is not a directory", path) 49} 50 51func NewSqliteStore(csdir string) (*SQLiteStore, error) { 52 if err := ensureDir(csdir); err != nil { 53 return nil, err 54 } 55 dbpath := filepath.Join(csdir, "db.sqlite3") 56 out := new(SQLiteStore) 57 err := out.Open(dbpath) 58 if err != nil { 59 return nil, err 60 } 61 return out, nil 62} 63 64func (sqs *SQLiteStore) Open(path string) error { 65 if sqs.log == nil { 66 sqs.log = slog.Default() 67 } 68 sqs.log.Debug("open db", "path", path) 69 db, err := sql.Open("sqlite3", path) 70 if err != nil { 71 return fmt.Errorf("%s: sqlite could not open, %w", path, err) 72 } 73 sqs.db = db 74 sqs.dbPath = path 75 err = sqs.createTables() 76 if err != nil { 77 return fmt.Errorf("%s: sqlite could not create tables, %w", path, err) 78 } 79 sqs.lastShardCache.source = sqs 80 sqs.lastShardCache.Init() 81 return nil 82} 83 84func (sqs *SQLiteStore) createTables() error { 85 tx, err := sqs.db.Begin() 86 if err != nil { 87 return err 88 } 89 defer tx.Rollback() 90 _, err = tx.Exec("CREATE TABLE IF NOT EXISTS blocks (uid int, cid blob, rev varchar, root blob, block blob, PRIMARY KEY(uid,cid));") 91 if err != nil { 92 return fmt.Errorf("%s: create table blocks..., %w", sqs.dbPath, err) 93 } 94 _, err = tx.Exec("CREATE INDEX IF NOT EXISTS blocx_by_rev ON blocks (uid, rev DESC)") 95 if err != nil { 96 return fmt.Errorf("%s: create blocks by rev index, %w", sqs.dbPath, err) 97 } 98 return tx.Commit() 99} 100 101// writeNewShard needed for DeltaSession.CloseWithRoot 102func (sqs *SQLiteStore) writeNewShard(ctx context.Context, root cid.Cid, rev string, user models.Uid, seq int, blks map[cid.Cid]blockformat.Block, rmcids map[cid.Cid]bool) ([]byte, error) { 103 sqWriteNewShard.Inc() 104 sqs.log.Debug("write shard", "uid", user, "root", root, "rev", rev, "nblocks", len(blks)) 105 ctx, span := otel.Tracer("carstore").Start(ctx, "writeNewShard") 106 defer span.End() 107 // this is "write many blocks", "write one block" is above in putBlock(). keep them in sync. 108 buf := new(bytes.Buffer) 109 hnw, err := WriteCarHeader(buf, root) 110 if err != nil { 111 return nil, fmt.Errorf("failed to write car header: %w", err) 112 } 113 offset := hnw 114 115 tx, err := sqs.db.BeginTx(ctx, nil) 116 if err != nil { 117 return nil, fmt.Errorf("bad block insert tx, %w", err) 118 } 119 defer tx.Rollback() 120 insertStatement, err := tx.PrepareContext(ctx, "INSERT INTO blocks (uid, cid, rev, root, block) VALUES (?, ?, ?, ?, ?) ON CONFLICT (uid,cid) DO UPDATE SET rev=excluded.rev, root=excluded.root, block=excluded.block") 121 if err != nil { 122 return nil, fmt.Errorf("bad block insert sql, %w", err) 123 } 124 defer insertStatement.Close() 125 126 dbroot := models.DbCID{CID: root} 127 128 span.SetAttributes(attribute.Int("blocks", len(blks))) 129 130 for bcid, block := range blks { 131 // build shard for output firehose 132 nw, err := LdWrite(buf, bcid.Bytes(), block.RawData()) 133 if err != nil { 134 return nil, fmt.Errorf("failed to write block: %w", err) 135 } 136 offset += nw 137 138 // TODO: better databases have an insert-many option for a prepared statement 139 dbcid := models.DbCID{CID: bcid} 140 blockbytes := block.RawData() 141 _, err = insertStatement.ExecContext(ctx, user, dbcid, rev, dbroot, blockbytes) 142 if err != nil { 143 return nil, fmt.Errorf("(uid,cid) block store failed, %w", err) 144 } 145 sqs.log.Debug("put block", "uid", user, "cid", bcid, "size", len(blockbytes)) 146 } 147 err = tx.Commit() 148 if err != nil { 149 return nil, fmt.Errorf("bad block insert commit, %w", err) 150 } 151 152 shard := CarShard{ 153 Root: models.DbCID{CID: root}, 154 DataStart: hnw, 155 Seq: seq, 156 Usr: user, 157 Rev: rev, 158 } 159 160 sqs.lastShardCache.put(&shard) 161 162 return buf.Bytes(), nil 163} 164 165var ErrNothingThere = errors.New("nothing to read)") 166 167// GetLastShard nedeed for NewDeltaSession indirectly through lastShardCache 168// What we actually seem to need from this: last {Rev, Root.CID} 169func (sqs *SQLiteStore) GetLastShard(ctx context.Context, uid models.Uid) (*CarShard, error) { 170 sqGetLastShard.Inc() 171 tx, err := sqs.db.BeginTx(ctx, &txReadOnly) 172 if err != nil { 173 return nil, fmt.Errorf("bad last shard tx, %w", err) 174 } 175 defer tx.Rollback() 176 qstmt, err := tx.PrepareContext(ctx, "SELECT rev, root FROM blocks WHERE uid = ? ORDER BY rev DESC LIMIT 1") 177 if err != nil { 178 return nil, fmt.Errorf("bad last shard sql, %w", err) 179 } 180 rows, err := qstmt.QueryContext(ctx, uid) 181 if err != nil { 182 return nil, fmt.Errorf("last shard err, %w", err) 183 } 184 if rows.Next() { 185 var rev string 186 var rootb models.DbCID 187 err = rows.Scan(&rev, &rootb) 188 if err != nil { 189 return nil, fmt.Errorf("last shard bad scan, %w", err) 190 } 191 return &CarShard{ 192 Root: rootb, 193 Rev: rev, 194 }, nil 195 } 196 return nil, nil 197} 198 199func (sqs *SQLiteStore) CompactUserShards(ctx context.Context, user models.Uid, skipBigShards bool) (*CompactionStats, error) { 200 sqs.log.Warn("TODO: don't call compaction") 201 return nil, nil 202} 203 204func (sqs *SQLiteStore) GetCompactionTargets(ctx context.Context, shardCount int) ([]CompactionTarget, error) { 205 sqs.log.Warn("TODO: don't call compaction targets") 206 return nil, nil 207} 208 209func (sqs *SQLiteStore) GetUserRepoHead(ctx context.Context, user models.Uid) (cid.Cid, error) { 210 // TODO: same as FileCarStore; re-unify 211 lastShard, err := sqs.lastShardCache.get(ctx, user) 212 if err != nil { 213 return cid.Undef, err 214 } 215 if lastShard == nil { 216 return cid.Undef, nil 217 } 218 if lastShard.ID == 0 { 219 return cid.Undef, nil 220 } 221 222 return lastShard.Root.CID, nil 223} 224 225func (sqs *SQLiteStore) GetUserRepoRev(ctx context.Context, user models.Uid) (string, error) { 226 // TODO: same as FileCarStore; re-unify 227 lastShard, err := sqs.lastShardCache.get(ctx, user) 228 if err != nil { 229 return "", err 230 } 231 if lastShard == nil { 232 return "", nil 233 } 234 if lastShard.ID == 0 { 235 return "", nil 236 } 237 238 return lastShard.Rev, nil 239} 240 241func (sqs *SQLiteStore) ImportSlice(ctx context.Context, uid models.Uid, since *string, carslice []byte) (cid.Cid, *DeltaSession, error) { 242 // TODO: same as FileCarStore, re-unify 243 ctx, span := otel.Tracer("carstore").Start(ctx, "ImportSlice") 244 defer span.End() 245 246 carr, err := car.NewCarReader(bytes.NewReader(carslice)) 247 if err != nil { 248 return cid.Undef, nil, err 249 } 250 251 if len(carr.Header.Roots) != 1 { 252 return cid.Undef, nil, fmt.Errorf("invalid car file, header must have a single root (has %d)", len(carr.Header.Roots)) 253 } 254 255 ds, err := sqs.NewDeltaSession(ctx, uid, since) 256 if err != nil { 257 return cid.Undef, nil, fmt.Errorf("new delta session failed: %w", err) 258 } 259 260 var cids []cid.Cid 261 for { 262 blk, err := carr.Next() 263 if err != nil { 264 if err == io.EOF { 265 break 266 } 267 return cid.Undef, nil, err 268 } 269 270 cids = append(cids, blk.Cid()) 271 272 if err := ds.Put(ctx, blk); err != nil { 273 return cid.Undef, nil, err 274 } 275 } 276 277 return carr.Header.Roots[0], ds, nil 278} 279 280var zeroShard CarShard 281 282func (sqs *SQLiteStore) NewDeltaSession(ctx context.Context, user models.Uid, since *string) (*DeltaSession, error) { 283 ctx, span := otel.Tracer("carstore").Start(ctx, "NewSession") 284 defer span.End() 285 286 // TODO: ensure that we don't write updates on top of the wrong head 287 // this needs to be a compare and swap type operation 288 lastShard, err := sqs.lastShardCache.get(ctx, user) 289 if err != nil { 290 return nil, fmt.Errorf("NewDeltaSession, lsc, %w", err) 291 } 292 293 if lastShard == nil { 294 lastShard = &zeroShard 295 } 296 297 if since != nil && *since != lastShard.Rev { 298 return nil, fmt.Errorf("revision mismatch: %s != %s: %w", *since, lastShard.Rev, ErrRepoBaseMismatch) 299 } 300 301 return &DeltaSession{ 302 blks: make(map[cid.Cid]blockformat.Block), 303 base: &sqliteUserView{ 304 uid: user, 305 sqs: sqs, 306 }, 307 user: user, 308 baseCid: lastShard.Root.CID, 309 cs: sqs, 310 seq: lastShard.Seq + 1, 311 lastRev: lastShard.Rev, 312 }, nil 313} 314 315func (sqs *SQLiteStore) ReadOnlySession(user models.Uid) (*DeltaSession, error) { 316 return &DeltaSession{ 317 base: &sqliteUserView{ 318 uid: user, 319 sqs: sqs, 320 }, 321 readonly: true, 322 user: user, 323 cs: sqs, 324 }, nil 325} 326 327type cartmp struct { 328 xcid cid.Cid 329 rev string 330 root string 331 block []byte 332} 333 334// ReadUserCar 335// incremental is only ever called true 336func (sqs *SQLiteStore) ReadUserCar(ctx context.Context, user models.Uid, sinceRev string, incremental bool, shardOut io.Writer) error { 337 sqGetCar.Inc() 338 ctx, span := otel.Tracer("carstore").Start(ctx, "ReadUserCar") 339 defer span.End() 340 341 tx, err := sqs.db.BeginTx(ctx, &txReadOnly) 342 if err != nil { 343 return fmt.Errorf("rcar tx, %w", err) 344 } 345 defer tx.Rollback() 346 qstmt, err := tx.PrepareContext(ctx, "SELECT cid,rev,root,block FROM blocks WHERE uid = ? AND rev > ? ORDER BY rev DESC") 347 if err != nil { 348 return fmt.Errorf("rcar sql, %w", err) 349 } 350 defer qstmt.Close() 351 rows, err := qstmt.QueryContext(ctx, user, sinceRev) 352 if err != nil { 353 return fmt.Errorf("rcar err, %w", err) 354 } 355 nblocks := 0 356 first := true 357 for rows.Next() { 358 var xcid models.DbCID 359 var xrev string 360 var xroot models.DbCID 361 var xblock []byte 362 err = rows.Scan(&xcid, &xrev, &xroot, &xblock) 363 if err != nil { 364 return fmt.Errorf("rcar bad scan, %w", err) 365 } 366 if first { 367 if err := car.WriteHeader(&car.CarHeader{ 368 Roots: []cid.Cid{xroot.CID}, 369 Version: 1, 370 }, shardOut); err != nil { 371 return fmt.Errorf("rcar bad header, %w", err) 372 } 373 first = false 374 } 375 nblocks++ 376 _, err := LdWrite(shardOut, xcid.CID.Bytes(), xblock) 377 if err != nil { 378 return fmt.Errorf("rcar bad write, %w", err) 379 } 380 } 381 sqs.log.Debug("read car", "nblocks", nblocks, "since", sinceRev) 382 return nil 383} 384 385// Stat is only used in a debugging admin handler 386// don't bother implementing it (for now?) 387func (sqs *SQLiteStore) Stat(ctx context.Context, usr models.Uid) ([]UserStat, error) { 388 sqs.log.Warn("Stat debugging method not implemented for sqlite store") 389 return nil, nil 390} 391 392func (sqs *SQLiteStore) WipeUserData(ctx context.Context, user models.Uid) error { 393 ctx, span := otel.Tracer("carstore").Start(ctx, "WipeUserData") 394 defer span.End() 395 tx, err := sqs.db.BeginTx(ctx, nil) 396 if err != nil { 397 return fmt.Errorf("wipe tx, %w", err) 398 } 399 defer tx.Rollback() 400 deleteResult, err := tx.ExecContext(ctx, "DELETE FROM blocks WHERE uid = ?", user) 401 nrows, ierr := deleteResult.RowsAffected() 402 if ierr == nil { 403 sqRowsDeleted.Add(float64(nrows)) 404 } 405 if err == nil { 406 err = ierr 407 } 408 if err == nil { 409 err = tx.Commit() 410 } 411 return err 412} 413 414var txReadOnly = sql.TxOptions{ReadOnly: true} 415 416// HasUidCid needed for NewDeltaSession userView 417func (sqs *SQLiteStore) HasUidCid(ctx context.Context, user models.Uid, bcid cid.Cid) (bool, error) { 418 // TODO: this is pretty cacheable? invalidate (uid,*) on WipeUserData 419 sqHas.Inc() 420 tx, err := sqs.db.BeginTx(ctx, &txReadOnly) 421 if err != nil { 422 return false, fmt.Errorf("hasUC tx, %w", err) 423 } 424 defer tx.Rollback() 425 qstmt, err := tx.PrepareContext(ctx, "SELECT rev, root FROM blocks WHERE uid = ? AND cid = ? LIMIT 1") 426 if err != nil { 427 return false, fmt.Errorf("hasUC sql, %w", err) 428 } 429 defer qstmt.Close() 430 rows, err := qstmt.QueryContext(ctx, user, models.DbCID{CID: bcid}) 431 if err != nil { 432 return false, fmt.Errorf("hasUC err, %w", err) 433 } 434 if rows.Next() { 435 var rev string 436 var rootb models.DbCID 437 err = rows.Scan(&rev, &rootb) 438 if err != nil { 439 return false, fmt.Errorf("hasUC bad scan, %w", err) 440 } 441 return true, nil 442 } 443 return false, nil 444} 445 446func (sqs *SQLiteStore) CarStore() CarStore { 447 return sqs 448} 449 450func (sqs *SQLiteStore) Close() error { 451 return sqs.db.Close() 452} 453 454func (sqs *SQLiteStore) getBlock(ctx context.Context, user models.Uid, bcid cid.Cid) (blockformat.Block, error) { 455 // TODO: this is pretty cacheable? invalidate (uid,*) on WipeUserData 456 sqGetBlock.Inc() 457 tx, err := sqs.db.BeginTx(ctx, &txReadOnly) 458 if err != nil { 459 return nil, fmt.Errorf("getb tx, %w", err) 460 } 461 defer tx.Rollback() 462 qstmt, err := tx.PrepareContext(ctx, "SELECT block FROM blocks WHERE uid = ? AND cid = ? LIMIT 1") 463 if err != nil { 464 return nil, fmt.Errorf("getb sql, %w", err) 465 } 466 defer qstmt.Close() 467 rows, err := qstmt.QueryContext(ctx, user, models.DbCID{CID: bcid}) 468 if err != nil { 469 return nil, fmt.Errorf("getb err, %w", err) 470 } 471 if rows.Next() { 472 //var rev string 473 //var rootb models.DbCID 474 var blockb []byte 475 err = rows.Scan(&blockb) 476 if err != nil { 477 return nil, fmt.Errorf("getb bad scan, %w", err) 478 } 479 return blocks.NewBlock(blockb), nil 480 } 481 return nil, ErrNothingThere 482} 483 484func (sqs *SQLiteStore) getBlockSize(ctx context.Context, user models.Uid, bcid cid.Cid) (int64, error) { 485 // TODO: this is pretty cacheable? invalidate (uid,*) on WipeUserData 486 sqGetBlockSize.Inc() 487 tx, err := sqs.db.BeginTx(ctx, &txReadOnly) 488 if err != nil { 489 return 0, fmt.Errorf("getbs tx, %w", err) 490 } 491 defer tx.Rollback() 492 qstmt, err := tx.PrepareContext(ctx, "SELECT length(block) FROM blocks WHERE uid = ? AND cid = ? LIMIT 1") 493 if err != nil { 494 return 0, fmt.Errorf("getbs sql, %w", err) 495 } 496 defer qstmt.Close() 497 rows, err := qstmt.QueryContext(ctx, user, models.DbCID{CID: bcid}) 498 if err != nil { 499 return 0, fmt.Errorf("getbs err, %w", err) 500 } 501 if rows.Next() { 502 var out int64 503 err = rows.Scan(&out) 504 if err != nil { 505 return 0, fmt.Errorf("getbs bad scan, %w", err) 506 } 507 return out, nil 508 } 509 return 0, nil 510} 511 512type sqliteUserViewInner interface { 513 HasUidCid(ctx context.Context, user models.Uid, bcid cid.Cid) (bool, error) 514 getBlock(ctx context.Context, user models.Uid, bcid cid.Cid) (blockformat.Block, error) 515 getBlockSize(ctx context.Context, user models.Uid, bcid cid.Cid) (int64, error) 516} 517 518// TODO: rename, used by both sqlite and scylla 519type sqliteUserView struct { 520 sqs sqliteUserViewInner 521 uid models.Uid 522} 523 524func (s sqliteUserView) Has(ctx context.Context, c cid.Cid) (bool, error) { 525 // TODO: cache block metadata? 526 return s.sqs.HasUidCid(ctx, s.uid, c) 527} 528 529func (s sqliteUserView) Get(ctx context.Context, c cid.Cid) (blockformat.Block, error) { 530 // TODO: cache blocks? 531 return s.sqs.getBlock(ctx, s.uid, c) 532} 533 534func (s sqliteUserView) GetSize(ctx context.Context, c cid.Cid) (int, error) { 535 // TODO: cache block metadata? 536 bigsize, err := s.sqs.getBlockSize(ctx, s.uid, c) 537 return int(bigsize), err 538} 539 540// ensure we implement the interface 541var _ minBlockstore = (*sqliteUserView)(nil) 542 543var sqRowsDeleted = promauto.NewCounter(prometheus.CounterOpts{ 544 Name: "bgs_sq_rows_deleted", 545 Help: "User rows deleted in sqlite backend", 546}) 547 548var sqGetBlock = promauto.NewCounter(prometheus.CounterOpts{ 549 Name: "bgs_sq_get_block", 550 Help: "get block sqlite backend", 551}) 552 553var sqGetBlockSize = promauto.NewCounter(prometheus.CounterOpts{ 554 Name: "bgs_sq_get_block_size", 555 Help: "get block size sqlite backend", 556}) 557 558var sqGetCar = promauto.NewCounter(prometheus.CounterOpts{ 559 Name: "bgs_sq_get_car", 560 Help: "get block sqlite backend", 561}) 562 563var sqHas = promauto.NewCounter(prometheus.CounterOpts{ 564 Name: "bgs_sq_has", 565 Help: "check block presence sqlite backend", 566}) 567 568var sqGetLastShard = promauto.NewCounter(prometheus.CounterOpts{ 569 Name: "bgs_sq_get_last_shard", 570 Help: "get last shard sqlite backend", 571}) 572 573var sqWriteNewShard = promauto.NewCounter(prometheus.CounterOpts{ 574 Name: "bgs_sq_write_shard", 575 Help: "write shard blocks sqlite backend", 576})