1package carstore
2
3import (
4 "bytes"
5 "context"
6 "database/sql"
7 "errors"
8 "fmt"
9 "go.opentelemetry.io/otel/attribute"
10 "io"
11 "log/slog"
12 "os"
13 "path/filepath"
14
15 "github.com/bluesky-social/indigo/models"
16 blockformat "github.com/ipfs/go-block-format"
17 "github.com/ipfs/go-cid"
18 "github.com/ipfs/go-libipfs/blocks"
19 "github.com/ipld/go-car"
20 _ "github.com/mattn/go-sqlite3"
21 "github.com/prometheus/client_golang/prometheus"
22 "github.com/prometheus/client_golang/prometheus/promauto"
23 "go.opentelemetry.io/otel"
24)
25
26// var log = logging.Logger("sqstore")
27
28type SQLiteStore struct {
29 dbPath string
30 db *sql.DB
31
32 log *slog.Logger
33
34 lastShardCache lastShardCache
35}
36
// ensureDir makes sure path exists and is a directory.
//
// A missing path is created (including parents) with mode 0755. An existing
// non-directory yields an error; any other stat failure is returned as is.
func ensureDir(path string) error {
	info, statErr := os.Stat(path)
	switch {
	case statErr == nil && info.IsDir():
		return nil
	case statErr == nil:
		return fmt.Errorf("%s exists but is not a directory", path)
	case os.IsNotExist(statErr):
		return os.MkdirAll(path, 0755)
	default:
		return statErr
	}
}
50
51func NewSqliteStore(csdir string) (*SQLiteStore, error) {
52 if err := ensureDir(csdir); err != nil {
53 return nil, err
54 }
55 dbpath := filepath.Join(csdir, "db.sqlite3")
56 out := new(SQLiteStore)
57 err := out.Open(dbpath)
58 if err != nil {
59 return nil, err
60 }
61 return out, nil
62}
63
64func (sqs *SQLiteStore) Open(path string) error {
65 if sqs.log == nil {
66 sqs.log = slog.Default()
67 }
68 sqs.log.Debug("open db", "path", path)
69 db, err := sql.Open("sqlite3", path)
70 if err != nil {
71 return fmt.Errorf("%s: sqlite could not open, %w", path, err)
72 }
73 sqs.db = db
74 sqs.dbPath = path
75 err = sqs.createTables()
76 if err != nil {
77 return fmt.Errorf("%s: sqlite could not create tables, %w", path, err)
78 }
79 sqs.lastShardCache.source = sqs
80 sqs.lastShardCache.Init()
81 return nil
82}
83
84func (sqs *SQLiteStore) createTables() error {
85 tx, err := sqs.db.Begin()
86 if err != nil {
87 return err
88 }
89 defer tx.Rollback()
90 _, err = tx.Exec("CREATE TABLE IF NOT EXISTS blocks (uid int, cid blob, rev varchar, root blob, block blob, PRIMARY KEY(uid,cid));")
91 if err != nil {
92 return fmt.Errorf("%s: create table blocks..., %w", sqs.dbPath, err)
93 }
94 _, err = tx.Exec("CREATE INDEX IF NOT EXISTS blocx_by_rev ON blocks (uid, rev DESC)")
95 if err != nil {
96 return fmt.Errorf("%s: create blocks by rev index, %w", sqs.dbPath, err)
97 }
98 return tx.Commit()
99}
100
101// writeNewShard needed for DeltaSession.CloseWithRoot
102func (sqs *SQLiteStore) writeNewShard(ctx context.Context, root cid.Cid, rev string, user models.Uid, seq int, blks map[cid.Cid]blockformat.Block, rmcids map[cid.Cid]bool) ([]byte, error) {
103 sqWriteNewShard.Inc()
104 sqs.log.Debug("write shard", "uid", user, "root", root, "rev", rev, "nblocks", len(blks))
105 ctx, span := otel.Tracer("carstore").Start(ctx, "writeNewShard")
106 defer span.End()
107 // this is "write many blocks", "write one block" is above in putBlock(). keep them in sync.
108 buf := new(bytes.Buffer)
109 hnw, err := WriteCarHeader(buf, root)
110 if err != nil {
111 return nil, fmt.Errorf("failed to write car header: %w", err)
112 }
113 offset := hnw
114
115 tx, err := sqs.db.BeginTx(ctx, nil)
116 if err != nil {
117 return nil, fmt.Errorf("bad block insert tx, %w", err)
118 }
119 defer tx.Rollback()
120 insertStatement, err := tx.PrepareContext(ctx, "INSERT INTO blocks (uid, cid, rev, root, block) VALUES (?, ?, ?, ?, ?) ON CONFLICT (uid,cid) DO UPDATE SET rev=excluded.rev, root=excluded.root, block=excluded.block")
121 if err != nil {
122 return nil, fmt.Errorf("bad block insert sql, %w", err)
123 }
124 defer insertStatement.Close()
125
126 dbroot := models.DbCID{CID: root}
127
128 span.SetAttributes(attribute.Int("blocks", len(blks)))
129
130 for bcid, block := range blks {
131 // build shard for output firehose
132 nw, err := LdWrite(buf, bcid.Bytes(), block.RawData())
133 if err != nil {
134 return nil, fmt.Errorf("failed to write block: %w", err)
135 }
136 offset += nw
137
138 // TODO: better databases have an insert-many option for a prepared statement
139 dbcid := models.DbCID{CID: bcid}
140 blockbytes := block.RawData()
141 _, err = insertStatement.ExecContext(ctx, user, dbcid, rev, dbroot, blockbytes)
142 if err != nil {
143 return nil, fmt.Errorf("(uid,cid) block store failed, %w", err)
144 }
145 sqs.log.Debug("put block", "uid", user, "cid", bcid, "size", len(blockbytes))
146 }
147 err = tx.Commit()
148 if err != nil {
149 return nil, fmt.Errorf("bad block insert commit, %w", err)
150 }
151
152 shard := CarShard{
153 Root: models.DbCID{CID: root},
154 DataStart: hnw,
155 Seq: seq,
156 Usr: user,
157 Rev: rev,
158 }
159
160 sqs.lastShardCache.put(&shard)
161
162 return buf.Bytes(), nil
163}
164
// ErrNothingThere is returned by getBlock when no row matches the requested
// (uid, cid) pair.
var ErrNothingThere = errors.New("nothing to read")
166
167// GetLastShard nedeed for NewDeltaSession indirectly through lastShardCache
168// What we actually seem to need from this: last {Rev, Root.CID}
169func (sqs *SQLiteStore) GetLastShard(ctx context.Context, uid models.Uid) (*CarShard, error) {
170 sqGetLastShard.Inc()
171 tx, err := sqs.db.BeginTx(ctx, &txReadOnly)
172 if err != nil {
173 return nil, fmt.Errorf("bad last shard tx, %w", err)
174 }
175 defer tx.Rollback()
176 qstmt, err := tx.PrepareContext(ctx, "SELECT rev, root FROM blocks WHERE uid = ? ORDER BY rev DESC LIMIT 1")
177 if err != nil {
178 return nil, fmt.Errorf("bad last shard sql, %w", err)
179 }
180 rows, err := qstmt.QueryContext(ctx, uid)
181 if err != nil {
182 return nil, fmt.Errorf("last shard err, %w", err)
183 }
184 if rows.Next() {
185 var rev string
186 var rootb models.DbCID
187 err = rows.Scan(&rev, &rootb)
188 if err != nil {
189 return nil, fmt.Errorf("last shard bad scan, %w", err)
190 }
191 return &CarShard{
192 Root: rootb,
193 Rev: rev,
194 }, nil
195 }
196 return nil, nil
197}
198
199func (sqs *SQLiteStore) CompactUserShards(ctx context.Context, user models.Uid, skipBigShards bool) (*CompactionStats, error) {
200 sqs.log.Warn("TODO: don't call compaction")
201 return nil, nil
202}
203
204func (sqs *SQLiteStore) GetCompactionTargets(ctx context.Context, shardCount int) ([]CompactionTarget, error) {
205 sqs.log.Warn("TODO: don't call compaction targets")
206 return nil, nil
207}
208
209func (sqs *SQLiteStore) GetUserRepoHead(ctx context.Context, user models.Uid) (cid.Cid, error) {
210 // TODO: same as FileCarStore; re-unify
211 lastShard, err := sqs.lastShardCache.get(ctx, user)
212 if err != nil {
213 return cid.Undef, err
214 }
215 if lastShard == nil {
216 return cid.Undef, nil
217 }
218 if lastShard.ID == 0 {
219 return cid.Undef, nil
220 }
221
222 return lastShard.Root.CID, nil
223}
224
225func (sqs *SQLiteStore) GetUserRepoRev(ctx context.Context, user models.Uid) (string, error) {
226 // TODO: same as FileCarStore; re-unify
227 lastShard, err := sqs.lastShardCache.get(ctx, user)
228 if err != nil {
229 return "", err
230 }
231 if lastShard == nil {
232 return "", nil
233 }
234 if lastShard.ID == 0 {
235 return "", nil
236 }
237
238 return lastShard.Rev, nil
239}
240
241func (sqs *SQLiteStore) ImportSlice(ctx context.Context, uid models.Uid, since *string, carslice []byte) (cid.Cid, *DeltaSession, error) {
242 // TODO: same as FileCarStore, re-unify
243 ctx, span := otel.Tracer("carstore").Start(ctx, "ImportSlice")
244 defer span.End()
245
246 carr, err := car.NewCarReader(bytes.NewReader(carslice))
247 if err != nil {
248 return cid.Undef, nil, err
249 }
250
251 if len(carr.Header.Roots) != 1 {
252 return cid.Undef, nil, fmt.Errorf("invalid car file, header must have a single root (has %d)", len(carr.Header.Roots))
253 }
254
255 ds, err := sqs.NewDeltaSession(ctx, uid, since)
256 if err != nil {
257 return cid.Undef, nil, fmt.Errorf("new delta session failed: %w", err)
258 }
259
260 var cids []cid.Cid
261 for {
262 blk, err := carr.Next()
263 if err != nil {
264 if err == io.EOF {
265 break
266 }
267 return cid.Undef, nil, err
268 }
269
270 cids = append(cids, blk.Cid())
271
272 if err := ds.Put(ctx, blk); err != nil {
273 return cid.Undef, nil, err
274 }
275 }
276
277 return carr.Header.Roots[0], ds, nil
278}
279
280var zeroShard CarShard
281
282func (sqs *SQLiteStore) NewDeltaSession(ctx context.Context, user models.Uid, since *string) (*DeltaSession, error) {
283 ctx, span := otel.Tracer("carstore").Start(ctx, "NewSession")
284 defer span.End()
285
286 // TODO: ensure that we don't write updates on top of the wrong head
287 // this needs to be a compare and swap type operation
288 lastShard, err := sqs.lastShardCache.get(ctx, user)
289 if err != nil {
290 return nil, fmt.Errorf("NewDeltaSession, lsc, %w", err)
291 }
292
293 if lastShard == nil {
294 lastShard = &zeroShard
295 }
296
297 if since != nil && *since != lastShard.Rev {
298 return nil, fmt.Errorf("revision mismatch: %s != %s: %w", *since, lastShard.Rev, ErrRepoBaseMismatch)
299 }
300
301 return &DeltaSession{
302 blks: make(map[cid.Cid]blockformat.Block),
303 base: &sqliteUserView{
304 uid: user,
305 sqs: sqs,
306 },
307 user: user,
308 baseCid: lastShard.Root.CID,
309 cs: sqs,
310 seq: lastShard.Seq + 1,
311 lastRev: lastShard.Rev,
312 }, nil
313}
314
315func (sqs *SQLiteStore) ReadOnlySession(user models.Uid) (*DeltaSession, error) {
316 return &DeltaSession{
317 base: &sqliteUserView{
318 uid: user,
319 sqs: sqs,
320 },
321 readonly: true,
322 user: user,
323 cs: sqs,
324 }, nil
325}
326
327type cartmp struct {
328 xcid cid.Cid
329 rev string
330 root string
331 block []byte
332}
333
334// ReadUserCar
335// incremental is only ever called true
336func (sqs *SQLiteStore) ReadUserCar(ctx context.Context, user models.Uid, sinceRev string, incremental bool, shardOut io.Writer) error {
337 sqGetCar.Inc()
338 ctx, span := otel.Tracer("carstore").Start(ctx, "ReadUserCar")
339 defer span.End()
340
341 tx, err := sqs.db.BeginTx(ctx, &txReadOnly)
342 if err != nil {
343 return fmt.Errorf("rcar tx, %w", err)
344 }
345 defer tx.Rollback()
346 qstmt, err := tx.PrepareContext(ctx, "SELECT cid,rev,root,block FROM blocks WHERE uid = ? AND rev > ? ORDER BY rev DESC")
347 if err != nil {
348 return fmt.Errorf("rcar sql, %w", err)
349 }
350 defer qstmt.Close()
351 rows, err := qstmt.QueryContext(ctx, user, sinceRev)
352 if err != nil {
353 return fmt.Errorf("rcar err, %w", err)
354 }
355 nblocks := 0
356 first := true
357 for rows.Next() {
358 var xcid models.DbCID
359 var xrev string
360 var xroot models.DbCID
361 var xblock []byte
362 err = rows.Scan(&xcid, &xrev, &xroot, &xblock)
363 if err != nil {
364 return fmt.Errorf("rcar bad scan, %w", err)
365 }
366 if first {
367 if err := car.WriteHeader(&car.CarHeader{
368 Roots: []cid.Cid{xroot.CID},
369 Version: 1,
370 }, shardOut); err != nil {
371 return fmt.Errorf("rcar bad header, %w", err)
372 }
373 first = false
374 }
375 nblocks++
376 _, err := LdWrite(shardOut, xcid.CID.Bytes(), xblock)
377 if err != nil {
378 return fmt.Errorf("rcar bad write, %w", err)
379 }
380 }
381 sqs.log.Debug("read car", "nblocks", nblocks, "since", sinceRev)
382 return nil
383}
384
385// Stat is only used in a debugging admin handler
386// don't bother implementing it (for now?)
387func (sqs *SQLiteStore) Stat(ctx context.Context, usr models.Uid) ([]UserStat, error) {
388 sqs.log.Warn("Stat debugging method not implemented for sqlite store")
389 return nil, nil
390}
391
392func (sqs *SQLiteStore) WipeUserData(ctx context.Context, user models.Uid) error {
393 ctx, span := otel.Tracer("carstore").Start(ctx, "WipeUserData")
394 defer span.End()
395 tx, err := sqs.db.BeginTx(ctx, nil)
396 if err != nil {
397 return fmt.Errorf("wipe tx, %w", err)
398 }
399 defer tx.Rollback()
400 deleteResult, err := tx.ExecContext(ctx, "DELETE FROM blocks WHERE uid = ?", user)
401 nrows, ierr := deleteResult.RowsAffected()
402 if ierr == nil {
403 sqRowsDeleted.Add(float64(nrows))
404 }
405 if err == nil {
406 err = ierr
407 }
408 if err == nil {
409 err = tx.Commit()
410 }
411 return err
412}
413
// txReadOnly is the shared option set for the read-only transactions opened
// by the query methods in this file: default isolation level, ReadOnly set.
var txReadOnly = sql.TxOptions{
	Isolation: sql.LevelDefault,
	ReadOnly:  true,
}
415
416// HasUidCid needed for NewDeltaSession userView
417func (sqs *SQLiteStore) HasUidCid(ctx context.Context, user models.Uid, bcid cid.Cid) (bool, error) {
418 // TODO: this is pretty cacheable? invalidate (uid,*) on WipeUserData
419 sqHas.Inc()
420 tx, err := sqs.db.BeginTx(ctx, &txReadOnly)
421 if err != nil {
422 return false, fmt.Errorf("hasUC tx, %w", err)
423 }
424 defer tx.Rollback()
425 qstmt, err := tx.PrepareContext(ctx, "SELECT rev, root FROM blocks WHERE uid = ? AND cid = ? LIMIT 1")
426 if err != nil {
427 return false, fmt.Errorf("hasUC sql, %w", err)
428 }
429 defer qstmt.Close()
430 rows, err := qstmt.QueryContext(ctx, user, models.DbCID{CID: bcid})
431 if err != nil {
432 return false, fmt.Errorf("hasUC err, %w", err)
433 }
434 if rows.Next() {
435 var rev string
436 var rootb models.DbCID
437 err = rows.Scan(&rev, &rootb)
438 if err != nil {
439 return false, fmt.Errorf("hasUC bad scan, %w", err)
440 }
441 return true, nil
442 }
443 return false, nil
444}
445
446func (sqs *SQLiteStore) CarStore() CarStore {
447 return sqs
448}
449
450func (sqs *SQLiteStore) Close() error {
451 return sqs.db.Close()
452}
453
454func (sqs *SQLiteStore) getBlock(ctx context.Context, user models.Uid, bcid cid.Cid) (blockformat.Block, error) {
455 // TODO: this is pretty cacheable? invalidate (uid,*) on WipeUserData
456 sqGetBlock.Inc()
457 tx, err := sqs.db.BeginTx(ctx, &txReadOnly)
458 if err != nil {
459 return nil, fmt.Errorf("getb tx, %w", err)
460 }
461 defer tx.Rollback()
462 qstmt, err := tx.PrepareContext(ctx, "SELECT block FROM blocks WHERE uid = ? AND cid = ? LIMIT 1")
463 if err != nil {
464 return nil, fmt.Errorf("getb sql, %w", err)
465 }
466 defer qstmt.Close()
467 rows, err := qstmt.QueryContext(ctx, user, models.DbCID{CID: bcid})
468 if err != nil {
469 return nil, fmt.Errorf("getb err, %w", err)
470 }
471 if rows.Next() {
472 //var rev string
473 //var rootb models.DbCID
474 var blockb []byte
475 err = rows.Scan(&blockb)
476 if err != nil {
477 return nil, fmt.Errorf("getb bad scan, %w", err)
478 }
479 return blocks.NewBlock(blockb), nil
480 }
481 return nil, ErrNothingThere
482}
483
484func (sqs *SQLiteStore) getBlockSize(ctx context.Context, user models.Uid, bcid cid.Cid) (int64, error) {
485 // TODO: this is pretty cacheable? invalidate (uid,*) on WipeUserData
486 sqGetBlockSize.Inc()
487 tx, err := sqs.db.BeginTx(ctx, &txReadOnly)
488 if err != nil {
489 return 0, fmt.Errorf("getbs tx, %w", err)
490 }
491 defer tx.Rollback()
492 qstmt, err := tx.PrepareContext(ctx, "SELECT length(block) FROM blocks WHERE uid = ? AND cid = ? LIMIT 1")
493 if err != nil {
494 return 0, fmt.Errorf("getbs sql, %w", err)
495 }
496 defer qstmt.Close()
497 rows, err := qstmt.QueryContext(ctx, user, models.DbCID{CID: bcid})
498 if err != nil {
499 return 0, fmt.Errorf("getbs err, %w", err)
500 }
501 if rows.Next() {
502 var out int64
503 err = rows.Scan(&out)
504 if err != nil {
505 return 0, fmt.Errorf("getbs bad scan, %w", err)
506 }
507 return out, nil
508 }
509 return 0, nil
510}
511
512type sqliteUserViewInner interface {
513 HasUidCid(ctx context.Context, user models.Uid, bcid cid.Cid) (bool, error)
514 getBlock(ctx context.Context, user models.Uid, bcid cid.Cid) (blockformat.Block, error)
515 getBlockSize(ctx context.Context, user models.Uid, bcid cid.Cid) (int64, error)
516}
517
518// TODO: rename, used by both sqlite and scylla
519type sqliteUserView struct {
520 sqs sqliteUserViewInner
521 uid models.Uid
522}
523
524func (s sqliteUserView) Has(ctx context.Context, c cid.Cid) (bool, error) {
525 // TODO: cache block metadata?
526 return s.sqs.HasUidCid(ctx, s.uid, c)
527}
528
529func (s sqliteUserView) Get(ctx context.Context, c cid.Cid) (blockformat.Block, error) {
530 // TODO: cache blocks?
531 return s.sqs.getBlock(ctx, s.uid, c)
532}
533
534func (s sqliteUserView) GetSize(ctx context.Context, c cid.Cid) (int, error) {
535 // TODO: cache block metadata?
536 bigsize, err := s.sqs.getBlockSize(ctx, s.uid, c)
537 return int(bigsize), err
538}
539
540// ensure we implement the interface
541var _ minBlockstore = (*sqliteUserView)(nil)
542
543var sqRowsDeleted = promauto.NewCounter(prometheus.CounterOpts{
544 Name: "bgs_sq_rows_deleted",
545 Help: "User rows deleted in sqlite backend",
546})
547
548var sqGetBlock = promauto.NewCounter(prometheus.CounterOpts{
549 Name: "bgs_sq_get_block",
550 Help: "get block sqlite backend",
551})
552
553var sqGetBlockSize = promauto.NewCounter(prometheus.CounterOpts{
554 Name: "bgs_sq_get_block_size",
555 Help: "get block size sqlite backend",
556})
557
558var sqGetCar = promauto.NewCounter(prometheus.CounterOpts{
559 Name: "bgs_sq_get_car",
560 Help: "get block sqlite backend",
561})
562
563var sqHas = promauto.NewCounter(prometheus.CounterOpts{
564 Name: "bgs_sq_has",
565 Help: "check block presence sqlite backend",
566})
567
568var sqGetLastShard = promauto.NewCounter(prometheus.CounterOpts{
569 Name: "bgs_sq_get_last_shard",
570 Help: "get last shard sqlite backend",
571})
572
573var sqWriteNewShard = promauto.NewCounter(prometheus.CounterOpts{
574 Name: "bgs_sq_write_shard",
575 Help: "write shard blocks sqlite backend",
576})