+34
-7
bgs/admin.go
···
356
356
})
357
357
}
358
358
359
+
type PDSRates struct {
360
+
PerSecond int64 `json:"per_second,omitempty"`
361
+
PerHour int64 `json:"per_hour,omitempty"`
362
+
PerDay int64 `json:"per_day,omitempty"`
363
+
CrawlRate int64 `json:"crawl_rate,omitempty"`
364
+
RepoLimit int64 `json:"repo_limit,omitempty"`
365
+
}
366
+
367
+
func (pr *PDSRates) FromSlurper(s *Slurper) {
368
+
if pr.PerSecond == 0 {
369
+
pr.PerSecond = s.DefaultPerSecondLimit
370
+
}
371
+
if pr.PerHour == 0 {
372
+
pr.PerHour = s.DefaultPerHourLimit
373
+
}
374
+
if pr.PerDay == 0 {
375
+
pr.PerDay = s.DefaultPerDayLimit
376
+
}
377
+
if pr.CrawlRate == 0 {
378
+
pr.CrawlRate = int64(s.DefaultCrawlLimit)
379
+
}
380
+
if pr.RepoLimit == 0 {
381
+
pr.RepoLimit = s.DefaultRepoLimit
382
+
}
383
+
}
384
+
359
385
type RateLimitChangeRequest struct {
360
-
Host string `json:"host"`
361
-
PerSecond int64 `json:"per_second"`
362
-
PerHour int64 `json:"per_hour"`
363
-
PerDay int64 `json:"per_day"`
364
-
CrawlRate int64 `json:"crawl_rate"`
365
-
RepoLimit int64 `json:"repo_limit"`
386
+
Host string `json:"host"`
387
+
PDSRates
366
388
}
367
389
368
390
func (bgs *BGS) handleAdminChangePDSRateLimits(e echo.Context) error {
···
595
617
596
618
type AdminRequestCrawlRequest struct {
597
619
Hostname string `json:"hostname"`
620
+
621
+
// optional:
622
+
PDSRates
598
623
}
599
624
600
625
func (bgs *BGS) handleAdminRequestCrawl(e echo.Context) error {
···
647
672
}
648
673
649
674
// Skip checking if the server is online for now
675
+
rateOverrides := body.PDSRates
676
+
rateOverrides.FromSlurper(bgs.slurper)
650
677
651
-
return bgs.slurper.SubscribeToPds(ctx, host, true, true) // Override Trusted Domain Check
678
+
return bgs.slurper.SubscribeToPds(ctx, host, true, true, &rateOverrides) // Override Trusted Domain Check
652
679
}
+8
-1
bgs/fedmgr.go
···
363
363
return !s.newSubsDisabled
364
364
}
365
365
366
-
func (s *Slurper) SubscribeToPds(ctx context.Context, host string, reg bool, adminOverride bool) error {
366
+
func (s *Slurper) SubscribeToPds(ctx context.Context, host string, reg bool, adminOverride bool, rateOverrides *PDSRates) error {
367
367
// TODO: for performance, lock on the hostname instead of global
368
368
s.lk.Lock()
369
369
defer s.lk.Unlock()
···
396
396
DailyEventLimit: s.DefaultPerDayLimit,
397
397
CrawlRateLimit: float64(s.DefaultCrawlLimit),
398
398
RepoLimit: s.DefaultRepoLimit,
399
+
}
400
+
if rateOverrides != nil {
401
+
npds.RateLimit = float64(rateOverrides.PerSecond)
402
+
npds.HourlyEventLimit = rateOverrides.PerHour
403
+
npds.DailyEventLimit = rateOverrides.PerDay
404
+
npds.CrawlRateLimit = float64(rateOverrides.CrawlRate)
405
+
npds.RepoLimit = rateOverrides.RepoLimit
399
406
}
400
407
if err := s.db.Create(&npds).Error; err != nil {
401
408
return err
+1
-1
bgs/handlers.go
···
212
212
}
213
213
}
214
214
215
-
return s.slurper.SubscribeToPds(ctx, host, true, false)
215
+
return s.slurper.SubscribeToPds(ctx, host, true, false, nil)
216
216
}
217
217
218
218
func (s *BGS) handleComAtprotoSyncNotifyOfUpdate(ctx context.Context, body *comatprototypes.SyncNotifyOfUpdate_Input) error {
+41
carstore/README.md
···
1
+
# Carstore
2
+
3
+
Store a zillion users' worth of PDS-like repos, with more limited operations (mainly: firehose in, firehose out).
4
+
5
+
## [ScyllaStore](scylla.go)
6
+
7
+
Blocks stored in ScyllaDB.
8
+
User and PDS metadata stored in gorm (PostgreSQL or sqlite3).
9
+
10
+
## [FileCarStore](bs.go)
11
+
12
+
Store 'car slices' from PDS source subscribeRepos firehose streams to the filesystem.
13
+
Store metadata to gorm postgresql (or sqlite3).
14
+
Periodic compaction of car slices into fewer larger car slices.
15
+
User and PDS metadata stored in gorm (PostgreSQL or sqlite3).
16
+
FileCarStore was the first production carstore and was used through at least 2024-11.
17
+
18
+
## [SQLiteStore](sqlite_store.go)
19
+
20
+
Experimental/demo.
21
+
Blocks stored in trivial local sqlite3 schema.
22
+
Minimal reference implementation from which fancy scalable/performant implementations may be derived.
23
+
24
+
```sql
25
+
CREATE TABLE IF NOT EXISTS blocks (uid int, cid blob, rev varchar, root blob, block blob, PRIMARY KEY(uid,cid))
26
+
CREATE INDEX IF NOT EXISTS blocx_by_rev ON blocks (uid, rev DESC)
27
+
28
+
INSERT INTO blocks (uid, cid, rev, root, block) VALUES (?, ?, ?, ?, ?) ON CONFLICT (uid,cid) DO UPDATE SET rev=excluded.rev, root=excluded.root, block=excluded.block
29
+
30
+
SELECT rev, root FROM blocks WHERE uid = ? ORDER BY rev DESC LIMIT 1
31
+
32
+
SELECT cid,rev,root,block FROM blocks WHERE uid = ? AND rev > ? ORDER BY rev DESC
33
+
34
+
DELETE FROM blocks WHERE uid = ?
35
+
36
+
SELECT rev, root FROM blocks WHERE uid = ? AND cid = ? LIMIT 1
37
+
38
+
SELECT block FROM blocks WHERE uid = ? AND cid = ? LIMIT 1
39
+
40
+
SELECT length(block) FROM blocks WHERE uid = ? AND cid = ? LIMIT 1
41
+
```
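A minimal usage sketch of driving a backend through this package, assuming the `NewSqliteStore` constructor and `DeltaSession` methods in this change; the uid, rev, path, and CAR bytes are placeholders and error handling is abbreviated:

```go
package carstoreexample

import (
	"bytes"
	"context"

	"github.com/bluesky-social/indigo/carstore"
)

// exampleRoundTrip ingests one CAR slice for a user and streams the repo back out.
// carSlice and rev would normally come from a subscribeRepos firehose event (placeholders here).
func exampleRoundTrip(carSlice []byte, rev string) error {
	ctx := context.Background()

	// Open the experimental sqlite-backed store (placeholder path);
	// the other backends are used the same way.
	cs, err := carstore.NewSqliteStore("/tmp/carstore-demo")
	if err != nil {
		return err
	}

	// Import the slice into a delta session for user 1.
	root, ds, err := cs.ImportSlice(ctx, 1, nil, carSlice)
	if err != nil {
		return err
	}
	if err := ds.CalcDiff(ctx, nil); err != nil {
		return err
	}
	// CloseWithRoot persists the blocks and returns the bytes to emit on the outbound firehose.
	if _, err := ds.CloseWithRoot(ctx, root, rev); err != nil {
		return err
	}

	// Stream the user's repo back out as a CAR.
	buf := new(bytes.Buffer)
	return cs.ReadUserCar(ctx, 1, "", true, buf)
}
```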
+54
-81
carstore/bs.go
···
10
10
"os"
11
11
"path/filepath"
12
12
"sort"
13
-
"sync"
14
13
"sync/atomic"
15
14
"time"
16
15
···
20
19
21
20
blockformat "github.com/ipfs/go-block-format"
22
21
"github.com/ipfs/go-cid"
23
-
"github.com/ipfs/go-datastore"
24
22
blockstore "github.com/ipfs/go-ipfs-blockstore"
25
23
cbor "github.com/ipfs/go-ipld-cbor"
26
24
ipld "github.com/ipfs/go-ipld-format"
···
47
45
const BigShardThreshold = 2 << 20
48
46
49
47
type CarStore interface {
48
+
// TODO: not really part of general interface
50
49
CompactUserShards(ctx context.Context, user models.Uid, skipBigShards bool) (*CompactionStats, error)
50
+
// TODO: not really part of general interface
51
51
GetCompactionTargets(ctx context.Context, shardCount int) ([]CompactionTarget, error)
52
+
52
53
GetUserRepoHead(ctx context.Context, user models.Uid) (cid.Cid, error)
53
54
GetUserRepoRev(ctx context.Context, user models.Uid) (string, error)
54
55
ImportSlice(ctx context.Context, uid models.Uid, since *string, carslice []byte) (cid.Cid, *DeltaSession, error)
···
63
64
meta *CarStoreGormMeta
64
65
rootDirs []string
65
66
66
-
lscLk sync.Mutex
67
-
lastShardCache map[models.Uid]*CarShard
67
+
lastShardCache lastShardCache
68
68
69
69
log *slog.Logger
70
70
}
···
88
88
return nil, err
89
89
}
90
90
91
-
return &FileCarStore{
92
-
meta: &CarStoreGormMeta{meta: meta},
93
-
rootDirs: roots,
94
-
lastShardCache: make(map[models.Uid]*CarShard),
95
-
log: slog.Default().With("system", "carstore"),
96
-
}, nil
91
+
gormMeta := &CarStoreGormMeta{meta: meta}
92
+
out := &FileCarStore{
93
+
meta: gormMeta,
94
+
rootDirs: roots,
95
+
lastShardCache: lastShardCache{
96
+
source: gormMeta,
97
+
},
98
+
log: slog.Default().With("system", "carstore"),
99
+
}
100
+
out.lastShardCache.Init()
101
+
return out, nil
97
102
}
98
103
104
+
// userView needs these things to get into the underlying block store
105
+
// implemented by CarStoreGormMeta
106
+
type userViewSource interface {
107
+
HasUidCid(ctx context.Context, user models.Uid, k cid.Cid) (bool, error)
108
+
LookupBlockRef(ctx context.Context, k cid.Cid) (path string, offset int64, user models.Uid, err error)
109
+
}
110
+
111
+
// wrapper into a block store that keeps track of which user we are working on behalf of
99
112
type userView struct {
100
-
cs CarStore
113
+
cs userViewSource
101
114
user models.Uid
102
115
103
116
cache map[cid.Cid]blockformat.Block
···
115
128
if have {
116
129
return have, nil
117
130
}
118
-
119
-
fcd, ok := uv.cs.(*FileCarStore)
120
-
if !ok {
121
-
return false, nil
122
-
}
123
-
124
-
return fcd.meta.HasUidCid(ctx, uv.user, k)
131
+
return uv.cs.HasUidCid(ctx, uv.user, k)
125
132
}
126
133
127
134
var CacheHits int64
···
143
150
}
144
151
atomic.AddInt64(&CacheMiss, 1)
145
152
146
-
fcd, ok := uv.cs.(*FileCarStore)
147
-
if !ok {
148
-
return nil, ipld.ErrNotFound{Cid: k}
149
-
}
150
-
151
-
path, offset, user, err := fcd.meta.LookupBlockRef(ctx, k)
153
+
path, offset, user, err := uv.cs.LookupBlockRef(ctx, k)
152
154
if err != nil {
153
155
return nil, err
154
156
}
···
279
281
return len(blk.RawData()), nil
280
282
}
281
283
284
+
// subset of blockstore.Blockstore that we actually use here
285
+
type minBlockstore interface {
286
+
Get(ctx context.Context, bcid cid.Cid) (blockformat.Block, error)
287
+
Has(ctx context.Context, bcid cid.Cid) (bool, error)
288
+
GetSize(ctx context.Context, bcid cid.Cid) (int, error)
289
+
}
290
+
282
291
type DeltaSession struct {
283
-
fresh blockstore.Blockstore
284
292
blks map[cid.Cid]blockformat.Block
285
293
rmcids map[cid.Cid]bool
286
-
base blockstore.Blockstore
294
+
base minBlockstore
287
295
user models.Uid
288
296
baseCid cid.Cid
289
297
seq int
290
298
readonly bool
291
-
cs CarStore
299
+
cs shardWriter
292
300
lastRev string
293
301
}
294
302
295
303
func (cs *FileCarStore) checkLastShardCache(user models.Uid) *CarShard {
296
-
cs.lscLk.Lock()
297
-
defer cs.lscLk.Unlock()
298
-
299
-
ls, ok := cs.lastShardCache[user]
300
-
if ok {
301
-
return ls
302
-
}
303
-
304
-
return nil
304
+
return cs.lastShardCache.check(user)
305
305
}
306
306
307
307
func (cs *FileCarStore) removeLastShardCache(user models.Uid) {
308
-
cs.lscLk.Lock()
309
-
defer cs.lscLk.Unlock()
310
-
311
-
delete(cs.lastShardCache, user)
308
+
cs.lastShardCache.remove(user)
312
309
}
313
310
314
311
func (cs *FileCarStore) putLastShardCache(ls *CarShard) {
315
-
cs.lscLk.Lock()
316
-
defer cs.lscLk.Unlock()
317
-
318
-
cs.lastShardCache[ls.Usr] = ls
312
+
cs.lastShardCache.put(ls)
319
313
}
320
314
321
315
func (cs *FileCarStore) getLastShard(ctx context.Context, user models.Uid) (*CarShard, error) {
322
-
ctx, span := otel.Tracer("carstore").Start(ctx, "getLastShard")
323
-
defer span.End()
324
-
325
-
maybeLs := cs.checkLastShardCache(user)
326
-
if maybeLs != nil {
327
-
return maybeLs, nil
328
-
}
329
-
330
-
lastShard, err := cs.meta.GetLastShard(ctx, user)
331
-
if err != nil {
332
-
return nil, err
333
-
}
334
-
335
-
cs.putLastShardCache(lastShard)
336
-
return lastShard, nil
316
+
return cs.lastShardCache.get(ctx, user)
337
317
}
338
318
339
319
var ErrRepoBaseMismatch = fmt.Errorf("attempted a delta session on top of the wrong previous head")
···
354
334
}
355
335
356
336
return &DeltaSession{
357
-
fresh: blockstore.NewBlockstore(datastore.NewMapDatastore()),
358
-
blks: make(map[cid.Cid]blockformat.Block),
337
+
blks: make(map[cid.Cid]blockformat.Block),
359
338
base: &userView{
360
339
user: user,
361
-
cs: cs,
340
+
cs: cs.meta,
362
341
prefetch: true,
363
342
cache: make(map[cid.Cid]blockformat.Block),
364
343
},
···
374
353
return &DeltaSession{
375
354
base: &userView{
376
355
user: user,
377
-
cs: cs,
356
+
cs: cs.meta,
378
357
prefetch: false,
379
358
cache: make(map[cid.Cid]blockformat.Block),
380
359
},
···
385
364
}
386
365
387
366
// TODO: incremental is only ever called true, remove the param
388
-
func (cs *FileCarStore) ReadUserCar(ctx context.Context, user models.Uid, sinceRev string, incremental bool, w io.Writer) error {
367
+
func (cs *FileCarStore) ReadUserCar(ctx context.Context, user models.Uid, sinceRev string, incremental bool, shardOut io.Writer) error {
389
368
ctx, span := otel.Tracer("carstore").Start(ctx, "ReadUserCar")
390
369
defer span.End()
391
370
···
398
377
}
399
378
}
400
379
401
-
// TODO: Why does ReadUserCar want shards seq DESC but CompactUserShards wants seq ASC ?
402
380
shards, err := cs.meta.GetUserShardsDesc(ctx, user, earlySeq)
403
381
if err != nil {
404
382
return err
···
418
396
if err := car.WriteHeader(&car.CarHeader{
419
397
Roots: []cid.Cid{shards[0].Root.CID},
420
398
Version: 1,
421
-
}, w); err != nil {
399
+
}, shardOut); err != nil {
422
400
return err
423
401
}
424
402
425
403
for _, sh := range shards {
426
-
if err := cs.writeShardBlocks(ctx, &sh, w); err != nil {
404
+
if err := cs.writeShardBlocks(ctx, &sh, shardOut); err != nil {
427
405
return err
428
406
}
429
407
}
···
433
411
434
412
// inner loop part of ReadUserCar
435
413
// copy shard blocks from disk to Writer
436
-
func (cs *FileCarStore) writeShardBlocks(ctx context.Context, sh *CarShard, w io.Writer) error {
414
+
func (cs *FileCarStore) writeShardBlocks(ctx context.Context, sh *CarShard, shardOut io.Writer) error {
437
415
ctx, span := otel.Tracer("carstore").Start(ctx, "writeShardBlocks")
438
416
defer span.End()
439
417
···
448
426
return err
449
427
}
450
428
451
-
_, err = io.Copy(w, fi)
429
+
_, err = io.Copy(shardOut, fi)
452
430
if err != nil {
453
431
return err
454
432
}
···
603
581
return nil, fmt.Errorf("cannot write to readonly deltaSession")
604
582
}
605
583
606
-
switch ocs := ds.cs.(type) {
607
-
case *FileCarStore:
608
-
return ocs.writeNewShard(ctx, root, rev, ds.user, ds.seq, ds.blks, ds.rmcids)
609
-
case *NonArchivalCarstore:
610
-
slice, err := blocksToCar(ctx, root, rev, ds.blks)
611
-
if err != nil {
612
-
return nil, err
613
-
}
614
-
return slice, ocs.updateLastCommit(ctx, ds.user, rev, root)
615
-
default:
616
-
return nil, fmt.Errorf("unsupported carstore type")
617
-
}
584
+
return ds.cs.writeNewShard(ctx, root, rev, ds.user, ds.seq, ds.blks, ds.rmcids)
618
585
}
619
586
620
587
func WriteCarHeader(w io.Writer, root cid.Cid) (int64, error) {
···
633
600
}
634
601
635
602
return hnw, nil
603
+
}
604
+
605
+
// shardWriter.writeNewShard called from inside DeltaSession.CloseWithRoot
606
+
type shardWriter interface {
607
+
// writeNewShard stores blocks in `blks` arg and creates a new shard to propagate out to our firehose
608
+
writeNewShard(ctx context.Context, root cid.Cid, rev string, user models.Uid, seq int, blks map[cid.Cid]blockformat.Block, rmcids map[cid.Cid]bool) ([]byte, error)
636
609
}
637
610
638
611
func blocksToCar(ctx context.Context, root cid.Cid, rev string, blks map[cid.Cid]blockformat.Block) ([]byte, error) {
+70
carstore/last_shard_cache.go
···
1
+
package carstore
2
+
3
+
import (
4
+
"context"
5
+
"github.com/bluesky-social/indigo/models"
6
+
"go.opentelemetry.io/otel"
7
+
"sync"
8
+
)
9
+
10
+
type LastShardSource interface {
11
+
GetLastShard(context.Context, models.Uid) (*CarShard, error)
12
+
}
13
+
14
+
type lastShardCache struct {
15
+
source LastShardSource
16
+
17
+
lscLk sync.Mutex
18
+
lastShardCache map[models.Uid]*CarShard
19
+
}
20
+
21
+
func (lsc *lastShardCache) Init() {
22
+
lsc.lastShardCache = make(map[models.Uid]*CarShard)
23
+
}
24
+
25
+
func (lsc *lastShardCache) check(user models.Uid) *CarShard {
26
+
lsc.lscLk.Lock()
27
+
defer lsc.lscLk.Unlock()
28
+
29
+
ls, ok := lsc.lastShardCache[user]
30
+
if ok {
31
+
return ls
32
+
}
33
+
34
+
return nil
35
+
}
36
+
37
+
func (lsc *lastShardCache) remove(user models.Uid) {
38
+
lsc.lscLk.Lock()
39
+
defer lsc.lscLk.Unlock()
40
+
41
+
delete(lsc.lastShardCache, user)
42
+
}
43
+
44
+
func (lsc *lastShardCache) put(ls *CarShard) {
45
+
if ls == nil {
46
+
return
47
+
}
48
+
lsc.lscLk.Lock()
49
+
defer lsc.lscLk.Unlock()
50
+
51
+
lsc.lastShardCache[ls.Usr] = ls
52
+
}
53
+
54
+
func (lsc *lastShardCache) get(ctx context.Context, user models.Uid) (*CarShard, error) {
55
+
ctx, span := otel.Tracer("carstore").Start(ctx, "getLastShard")
56
+
defer span.End()
57
+
58
+
maybeLs := lsc.check(user)
59
+
if maybeLs != nil {
60
+
return maybeLs, nil
61
+
}
62
+
63
+
lastShard, err := lsc.source.GetLastShard(ctx, user)
64
+
if err != nil {
65
+
return nil, err
66
+
}
67
+
68
+
lsc.put(lastShard)
69
+
return lastShard, nil
70
+
}
+18
-4
carstore/nonarchive.go
···
4
4
"bytes"
5
5
"context"
6
6
"fmt"
7
+
ipld "github.com/ipfs/go-ipld-format"
7
8
"io"
8
9
"log/slog"
9
10
"sync"
···
11
12
"github.com/bluesky-social/indigo/models"
12
13
blockformat "github.com/ipfs/go-block-format"
13
14
"github.com/ipfs/go-cid"
14
-
"github.com/ipfs/go-datastore"
15
-
blockstore "github.com/ipfs/go-ipfs-blockstore"
16
15
car "github.com/ipld/go-car"
17
16
"go.opentelemetry.io/otel"
18
17
"gorm.io/gorm"
···
135
134
}
136
135
137
136
return &DeltaSession{
138
-
fresh: blockstore.NewBlockstore(datastore.NewMapDatastore()),
139
-
blks: make(map[cid.Cid]blockformat.Block),
137
+
blks: make(map[cid.Cid]blockformat.Block),
140
138
base: &userView{
141
139
user: user,
142
140
cs: cs,
···
252
250
func (cs *NonArchivalCarstore) CompactUserShards(ctx context.Context, user models.Uid, skipBigShards bool) (*CompactionStats, error) {
253
251
return nil, fmt.Errorf("compaction not supported in non-archival")
254
252
}
253
+
254
+
func (cs *NonArchivalCarstore) HasUidCid(ctx context.Context, user models.Uid, k cid.Cid) (bool, error) {
255
+
return false, nil
256
+
}
257
+
258
+
func (cs *NonArchivalCarstore) LookupBlockRef(ctx context.Context, k cid.Cid) (path string, offset int64, user models.Uid, err error) {
259
+
return "", 0, 0, ipld.ErrNotFound{Cid: k}
260
+
}
261
+
262
+
func (cs *NonArchivalCarstore) writeNewShard(ctx context.Context, root cid.Cid, rev string, user models.Uid, seq int, blks map[cid.Cid]blockformat.Block, rmcids map[cid.Cid]bool) ([]byte, error) {
263
+
slice, err := blocksToCar(ctx, root, rev, blks)
264
+
if err != nil {
265
+
return nil, err
266
+
}
267
+
return slice, cs.updateLastCommit(ctx, user, rev, root)
268
+
}
+216
-163
carstore/repo_test.go
···
6
6
"errors"
7
7
"fmt"
8
8
"io"
9
+
"log/slog"
9
10
"os"
10
11
"path/filepath"
11
12
"testing"
···
24
25
"gorm.io/gorm"
25
26
)
26
27
27
-
func testCarStore() (CarStore, func(), error) {
28
+
func testCarStore(t testing.TB) (CarStore, func(), error) {
28
29
tempdir, err := os.MkdirTemp("", "msttest-")
29
30
if err != nil {
30
31
return nil, nil, err
···
60
61
}, nil
61
62
}
62
63
64
+
func testSqliteCarStore(t testing.TB) (CarStore, func(), error) {
65
+
sqs := &SQLiteStore{}
66
+
sqs.log = slogForTest(t)
67
+
err := sqs.Open(":memory:")
68
+
if err != nil {
69
+
return nil, nil, err
70
+
}
71
+
return sqs, func() {}, nil
72
+
}
73
+
74
+
type testFactory func(t testing.TB) (CarStore, func(), error)
75
+
76
+
var backends = map[string]testFactory{
77
+
"cartore": testCarStore,
78
+
"sqlite": testSqliteCarStore,
79
+
}
80
+
63
81
func testFlatfsBs() (blockstore.Blockstore, func(), error) {
64
82
tempdir, err := os.MkdirTemp("", "msttest-")
65
83
if err != nil {
···
78
96
}, nil
79
97
}
80
98
81
-
func TestBasicOperation(t *testing.T) {
99
+
func TestBasicOperation(ot *testing.T) {
82
100
ctx := context.TODO()
83
101
84
-
cs, cleanup, err := testCarStore()
85
-
if err != nil {
86
-
t.Fatal(err)
87
-
}
88
-
defer cleanup()
102
+
for fname, tf := range backends {
103
+
ot.Run(fname, func(t *testing.T) {
89
104
90
-
ds, err := cs.NewDeltaSession(ctx, 1, nil)
91
-
if err != nil {
92
-
t.Fatal(err)
93
-
}
105
+
cs, cleanup, err := tf(t)
106
+
if err != nil {
107
+
t.Fatal(err)
108
+
}
109
+
defer cleanup()
94
110
95
-
ncid, rev, err := setupRepo(ctx, ds, false)
96
-
if err != nil {
97
-
t.Fatal(err)
98
-
}
111
+
ds, err := cs.NewDeltaSession(ctx, 1, nil)
112
+
if err != nil {
113
+
t.Fatal(err)
114
+
}
99
115
100
-
if _, err := ds.CloseWithRoot(ctx, ncid, rev); err != nil {
101
-
t.Fatal(err)
102
-
}
116
+
ncid, rev, err := setupRepo(ctx, ds, false)
117
+
if err != nil {
118
+
t.Fatal(err)
119
+
}
103
120
104
-
var recs []cid.Cid
105
-
head := ncid
106
-
for i := 0; i < 10; i++ {
107
-
ds, err := cs.NewDeltaSession(ctx, 1, &rev)
108
-
if err != nil {
109
-
t.Fatal(err)
110
-
}
121
+
if _, err := ds.CloseWithRoot(ctx, ncid, rev); err != nil {
122
+
t.Fatal(err)
123
+
}
111
124
112
-
rr, err := repo.OpenRepo(ctx, ds, head)
113
-
if err != nil {
114
-
t.Fatal(err)
115
-
}
125
+
var recs []cid.Cid
126
+
head := ncid
127
+
for i := 0; i < 10; i++ {
128
+
ds, err := cs.NewDeltaSession(ctx, 1, &rev)
129
+
if err != nil {
130
+
t.Fatal(err)
131
+
}
116
132
117
-
rc, _, err := rr.CreateRecord(ctx, "app.bsky.feed.post", &appbsky.FeedPost{
118
-
Text: fmt.Sprintf("hey look its a tweet %d", time.Now().UnixNano()),
119
-
})
120
-
if err != nil {
121
-
t.Fatal(err)
122
-
}
133
+
rr, err := repo.OpenRepo(ctx, ds, head)
134
+
if err != nil {
135
+
t.Fatal(err)
136
+
}
123
137
124
-
recs = append(recs, rc)
138
+
rc, _, err := rr.CreateRecord(ctx, "app.bsky.feed.post", &appbsky.FeedPost{
139
+
Text: fmt.Sprintf("hey look its a tweet %d", time.Now().UnixNano()),
140
+
})
141
+
if err != nil {
142
+
t.Fatal(err)
143
+
}
125
144
126
-
kmgr := &util.FakeKeyManager{}
127
-
nroot, nrev, err := rr.Commit(ctx, kmgr.SignForUser)
128
-
if err != nil {
129
-
t.Fatal(err)
130
-
}
145
+
recs = append(recs, rc)
131
146
132
-
rev = nrev
147
+
kmgr := &util.FakeKeyManager{}
148
+
nroot, nrev, err := rr.Commit(ctx, kmgr.SignForUser)
149
+
if err != nil {
150
+
t.Fatal(err)
151
+
}
133
152
134
-
if err := ds.CalcDiff(ctx, nil); err != nil {
135
-
t.Fatal(err)
136
-
}
153
+
rev = nrev
137
154
138
-
if _, err := ds.CloseWithRoot(ctx, nroot, rev); err != nil {
139
-
t.Fatal(err)
140
-
}
155
+
if err := ds.CalcDiff(ctx, nil); err != nil {
156
+
t.Fatal(err)
157
+
}
141
158
142
-
head = nroot
143
-
}
159
+
if _, err := ds.CloseWithRoot(ctx, nroot, rev); err != nil {
160
+
t.Fatal(err)
161
+
}
144
162
145
-
buf := new(bytes.Buffer)
146
-
if err := cs.ReadUserCar(ctx, 1, "", true, buf); err != nil {
147
-
t.Fatal(err)
148
-
}
149
-
checkRepo(t, cs, buf, recs)
163
+
head = nroot
164
+
}
150
165
151
-
if _, err := cs.CompactUserShards(ctx, 1, false); err != nil {
152
-
t.Fatal(err)
153
-
}
166
+
buf := new(bytes.Buffer)
167
+
if err := cs.ReadUserCar(ctx, 1, "", true, buf); err != nil {
168
+
t.Fatal(err)
169
+
}
170
+
checkRepo(t, cs, buf, recs)
154
171
155
-
buf = new(bytes.Buffer)
156
-
if err := cs.ReadUserCar(ctx, 1, "", true, buf); err != nil {
157
-
t.Fatal(err)
172
+
if _, err := cs.CompactUserShards(ctx, 1, false); err != nil {
173
+
t.Fatal(err)
174
+
}
175
+
176
+
buf = new(bytes.Buffer)
177
+
if err := cs.ReadUserCar(ctx, 1, "", true, buf); err != nil {
178
+
t.Fatal(err)
179
+
}
180
+
checkRepo(t, cs, buf, recs)
181
+
})
158
182
}
159
-
checkRepo(t, cs, buf, recs)
160
183
}
161
184
162
185
func TestRepeatedCompactions(t *testing.T) {
163
186
ctx := context.TODO()
164
187
165
-
cs, cleanup, err := testCarStore()
188
+
cs, cleanup, err := testCarStore(t)
166
189
if err != nil {
167
190
t.Fatal(err)
168
191
}
···
323
346
func BenchmarkRepoWritesCarstore(b *testing.B) {
324
347
ctx := context.TODO()
325
348
326
-
cs, cleanup, err := testCarStore()
349
+
cs, cleanup, err := testCarStore(b)
350
+
innerBenchmarkRepoWritesCarstore(b, ctx, cs, cleanup, err)
351
+
}
352
+
func BenchmarkRepoWritesSqliteCarstore(b *testing.B) {
353
+
ctx := context.TODO()
354
+
355
+
cs, cleanup, err := testSqliteCarStore(b)
356
+
innerBenchmarkRepoWritesCarstore(b, ctx, cs, cleanup, err)
357
+
}
358
+
func innerBenchmarkRepoWritesCarstore(b *testing.B, ctx context.Context, cs CarStore, cleanup func(), err error) {
327
359
if err != nil {
328
360
b.Fatal(err)
329
361
}
···
458
490
}
459
491
}
460
492
461
-
func TestDuplicateBlockAcrossShards(t *testing.T) {
493
+
func TestDuplicateBlockAcrossShards(ot *testing.T) {
462
494
ctx := context.TODO()
463
495
464
-
cs, cleanup, err := testCarStore()
465
-
if err != nil {
466
-
t.Fatal(err)
467
-
}
468
-
defer cleanup()
496
+
for fname, tf := range backends {
497
+
ot.Run(fname, func(t *testing.T) {
498
+
499
+
cs, cleanup, err := tf(t)
500
+
if err != nil {
501
+
t.Fatal(err)
502
+
}
503
+
defer cleanup()
504
+
505
+
ds1, err := cs.NewDeltaSession(ctx, 1, nil)
506
+
if err != nil {
507
+
t.Fatal(err)
508
+
}
469
509
470
-
ds1, err := cs.NewDeltaSession(ctx, 1, nil)
471
-
if err != nil {
472
-
t.Fatal(err)
473
-
}
510
+
ds2, err := cs.NewDeltaSession(ctx, 2, nil)
511
+
if err != nil {
512
+
t.Fatal(err)
513
+
}
474
514
475
-
ds2, err := cs.NewDeltaSession(ctx, 2, nil)
476
-
if err != nil {
477
-
t.Fatal(err)
478
-
}
515
+
ds3, err := cs.NewDeltaSession(ctx, 3, nil)
516
+
if err != nil {
517
+
t.Fatal(err)
518
+
}
479
519
480
-
ds3, err := cs.NewDeltaSession(ctx, 3, nil)
481
-
if err != nil {
482
-
t.Fatal(err)
483
-
}
520
+
var cids []cid.Cid
521
+
var revs []string
522
+
for _, ds := range []*DeltaSession{ds1, ds2, ds3} {
523
+
ncid, rev, err := setupRepo(ctx, ds, true)
524
+
if err != nil {
525
+
t.Fatal(err)
526
+
}
484
527
485
-
var cids []cid.Cid
486
-
var revs []string
487
-
for _, ds := range []*DeltaSession{ds1, ds2, ds3} {
488
-
ncid, rev, err := setupRepo(ctx, ds, true)
489
-
if err != nil {
490
-
t.Fatal(err)
491
-
}
528
+
if _, err := ds.CloseWithRoot(ctx, ncid, rev); err != nil {
529
+
t.Fatal(err)
530
+
}
531
+
cids = append(cids, ncid)
532
+
revs = append(revs, rev)
533
+
}
492
534
493
-
if _, err := ds.CloseWithRoot(ctx, ncid, rev); err != nil {
494
-
t.Fatal(err)
495
-
}
496
-
cids = append(cids, ncid)
497
-
revs = append(revs, rev)
498
-
}
535
+
var recs []cid.Cid
536
+
head := cids[1]
537
+
rev := revs[1]
538
+
for i := 0; i < 10; i++ {
539
+
ds, err := cs.NewDeltaSession(ctx, 2, &rev)
540
+
if err != nil {
541
+
t.Fatal(err)
542
+
}
499
543
500
-
var recs []cid.Cid
501
-
head := cids[1]
502
-
rev := revs[1]
503
-
for i := 0; i < 10; i++ {
504
-
ds, err := cs.NewDeltaSession(ctx, 2, &rev)
505
-
if err != nil {
506
-
t.Fatal(err)
507
-
}
544
+
rr, err := repo.OpenRepo(ctx, ds, head)
545
+
if err != nil {
546
+
t.Fatal(err)
547
+
}
508
548
509
-
rr, err := repo.OpenRepo(ctx, ds, head)
510
-
if err != nil {
511
-
t.Fatal(err)
512
-
}
549
+
rc, _, err := rr.CreateRecord(ctx, "app.bsky.feed.post", &appbsky.FeedPost{
550
+
Text: fmt.Sprintf("hey look its a tweet %d", time.Now().UnixNano()),
551
+
})
552
+
if err != nil {
553
+
t.Fatal(err)
554
+
}
513
555
514
-
rc, _, err := rr.CreateRecord(ctx, "app.bsky.feed.post", &appbsky.FeedPost{
515
-
Text: fmt.Sprintf("hey look its a tweet %d", time.Now().UnixNano()),
516
-
})
517
-
if err != nil {
518
-
t.Fatal(err)
519
-
}
556
+
recs = append(recs, rc)
520
557
521
-
recs = append(recs, rc)
558
+
kmgr := &util.FakeKeyManager{}
559
+
nroot, nrev, err := rr.Commit(ctx, kmgr.SignForUser)
560
+
if err != nil {
561
+
t.Fatal(err)
562
+
}
522
563
523
-
kmgr := &util.FakeKeyManager{}
524
-
nroot, nrev, err := rr.Commit(ctx, kmgr.SignForUser)
525
-
if err != nil {
526
-
t.Fatal(err)
527
-
}
564
+
rev = nrev
528
565
529
-
rev = nrev
566
+
if err := ds.CalcDiff(ctx, nil); err != nil {
567
+
t.Fatal(err)
568
+
}
530
569
531
-
if err := ds.CalcDiff(ctx, nil); err != nil {
532
-
t.Fatal(err)
533
-
}
570
+
if _, err := ds.CloseWithRoot(ctx, nroot, rev); err != nil {
571
+
t.Fatal(err)
572
+
}
534
573
535
-
if _, err := ds.CloseWithRoot(ctx, nroot, rev); err != nil {
536
-
t.Fatal(err)
537
-
}
574
+
head = nroot
575
+
}
538
576
539
-
head = nroot
540
-
}
577
+
// explicitly update the profile object
578
+
{
579
+
ds, err := cs.NewDeltaSession(ctx, 2, &rev)
580
+
if err != nil {
581
+
t.Fatal(err)
582
+
}
541
583
542
-
// explicitly update the profile object
543
-
{
544
-
ds, err := cs.NewDeltaSession(ctx, 2, &rev)
545
-
if err != nil {
546
-
t.Fatal(err)
547
-
}
584
+
rr, err := repo.OpenRepo(ctx, ds, head)
585
+
if err != nil {
586
+
t.Fatal(err)
587
+
}
548
588
549
-
rr, err := repo.OpenRepo(ctx, ds, head)
550
-
if err != nil {
551
-
t.Fatal(err)
552
-
}
589
+
desc := "this is so unique"
590
+
rc, err := rr.UpdateRecord(ctx, "app.bsky.actor.profile/self", &appbsky.ActorProfile{
591
+
Description: &desc,
592
+
})
593
+
if err != nil {
594
+
t.Fatal(err)
595
+
}
553
596
554
-
desc := "this is so unique"
555
-
rc, err := rr.UpdateRecord(ctx, "app.bsky.actor.profile/self", &appbsky.ActorProfile{
556
-
Description: &desc,
557
-
})
558
-
if err != nil {
559
-
t.Fatal(err)
560
-
}
597
+
recs = append(recs, rc)
561
598
562
-
recs = append(recs, rc)
599
+
kmgr := &util.FakeKeyManager{}
600
+
nroot, nrev, err := rr.Commit(ctx, kmgr.SignForUser)
601
+
if err != nil {
602
+
t.Fatal(err)
603
+
}
563
604
564
-
kmgr := &util.FakeKeyManager{}
565
-
nroot, nrev, err := rr.Commit(ctx, kmgr.SignForUser)
566
-
if err != nil {
567
-
t.Fatal(err)
568
-
}
605
+
rev = nrev
569
606
570
-
rev = nrev
607
+
if err := ds.CalcDiff(ctx, nil); err != nil {
608
+
t.Fatal(err)
609
+
}
571
610
572
-
if err := ds.CalcDiff(ctx, nil); err != nil {
573
-
t.Fatal(err)
574
-
}
611
+
if _, err := ds.CloseWithRoot(ctx, nroot, rev); err != nil {
612
+
t.Fatal(err)
613
+
}
575
614
576
-
if _, err := ds.CloseWithRoot(ctx, nroot, rev); err != nil {
577
-
t.Fatal(err)
578
-
}
615
+
head = nroot
616
+
}
579
617
580
-
head = nroot
618
+
buf := new(bytes.Buffer)
619
+
if err := cs.ReadUserCar(ctx, 2, "", true, buf); err != nil {
620
+
t.Fatal(err)
621
+
}
622
+
checkRepo(t, cs, buf, recs)
623
+
})
581
624
}
625
+
}
582
626
583
-
buf := new(bytes.Buffer)
584
-
if err := cs.ReadUserCar(ctx, 2, "", true, buf); err != nil {
585
-
t.Fatal(err)
627
+
type testWriter struct {
628
+
t testing.TB
629
+
}
630
+
631
+
func (tw testWriter) Write(p []byte) (n int, err error) {
632
+
tw.t.Log(string(p))
633
+
return len(p), nil
634
+
}
635
+
636
+
func slogForTest(t testing.TB) *slog.Logger {
637
+
hopts := slog.HandlerOptions{
638
+
Level: slog.LevelDebug,
586
639
}
587
-
checkRepo(t, cs, buf, recs)
640
+
return slog.New(slog.NewTextHandler(&testWriter{t}, &hopts))
588
641
}
+636
carstore/scylla.go
···
1
+
package carstore
2
+
3
+
import (
4
+
"bytes"
5
+
"context"
6
+
"errors"
7
+
"fmt"
8
+
"github.com/bluesky-social/indigo/models"
9
+
"github.com/gocql/gocql"
10
+
blockformat "github.com/ipfs/go-block-format"
11
+
"github.com/ipfs/go-cid"
12
+
"github.com/ipfs/go-libipfs/blocks"
13
+
"github.com/ipld/go-car"
14
+
_ "github.com/mattn/go-sqlite3"
15
+
"github.com/prometheus/client_golang/prometheus"
16
+
"github.com/prometheus/client_golang/prometheus/promauto"
17
+
"go.opentelemetry.io/otel"
18
+
"go.opentelemetry.io/otel/attribute"
19
+
"io"
20
+
"log/slog"
21
+
"math"
22
+
"math/rand/v2"
23
+
"time"
24
+
)
25
+
26
+
type ScyllaStore struct {
27
+
WriteSession *gocql.Session
28
+
ReadSession *gocql.Session
29
+
30
+
// scylla servers
31
+
scyllaAddrs []string
32
+
// scylla namespace where we find our table
33
+
keyspace string
34
+
35
+
log *slog.Logger
36
+
37
+
lastShardCache lastShardCache
38
+
}
39
+
40
+
func NewScyllaStore(addrs []string, keyspace string) (*ScyllaStore, error) {
41
+
out := new(ScyllaStore)
42
+
out.scyllaAddrs = addrs
43
+
out.keyspace = keyspace
44
+
err := out.Open()
45
+
if err != nil {
46
+
return nil, err
47
+
}
48
+
return out, nil
49
+
}
50
+
51
+
func (sqs *ScyllaStore) Open() error {
52
+
if sqs.log == nil {
53
+
sqs.log = slog.Default()
54
+
}
55
+
sqs.log.Debug("scylla connect", "addrs", sqs.scyllaAddrs)
56
+
var err error
57
+
58
+
//
59
+
// Write session
60
+
//
61
+
var writeSession *gocql.Session
62
+
for retry := 0; ; retry++ {
63
+
writeCluster := gocql.NewCluster(sqs.scyllaAddrs...)
64
+
writeCluster.Keyspace = sqs.keyspace
65
+
// Default port, the client should automatically upgrade to shard-aware port
66
+
writeCluster.Port = 9042
67
+
writeCluster.Consistency = gocql.Quorum
68
+
writeCluster.RetryPolicy = &ExponentialBackoffRetryPolicy{NumRetries: 10, Min: 100 * time.Millisecond, Max: 10 * time.Second}
69
+
writeCluster.PoolConfig.HostSelectionPolicy = gocql.TokenAwareHostPolicy(gocql.RoundRobinHostPolicy())
70
+
writeSession, err = writeCluster.CreateSession()
71
+
if err != nil {
72
+
if retry > 200 {
73
+
return fmt.Errorf("failed to connect read session too many times: %w", err)
74
+
}
75
+
sqs.log.Error("failed to connect to ScyllaDB Read Session, retrying in 1s", "retry", retry, "err", err)
76
+
time.Sleep(delayForAttempt(retry))
77
+
continue
78
+
}
79
+
break
80
+
}
81
+
82
+
//
83
+
// Read session
84
+
//
85
+
var readSession *gocql.Session
86
+
for retry := 0; ; retry++ {
87
+
readCluster := gocql.NewCluster(sqs.scyllaAddrs...)
88
+
readCluster.Keyspace = sqs.keyspace
89
+
// Default port, the client should automatically upgrade to shard-aware port
90
+
readCluster.Port = 9042
91
+
readCluster.RetryPolicy = &ExponentialBackoffRetryPolicy{NumRetries: 5, Min: 10 * time.Millisecond, Max: 1 * time.Second}
92
+
readCluster.Consistency = gocql.One
93
+
readCluster.PoolConfig.HostSelectionPolicy = gocql.TokenAwareHostPolicy(gocql.RoundRobinHostPolicy())
94
+
readSession, err = readCluster.CreateSession()
95
+
if err != nil {
96
+
if retry > 200 {
97
+
return fmt.Errorf("failed to connect read session too many times: %w", err)
98
+
}
99
+
sqs.log.Error("failed to connect to ScyllaDB Read Session, retrying in 1s", "retry", retry, "err", err)
100
+
time.Sleep(delayForAttempt(retry))
101
+
continue
102
+
}
103
+
break
104
+
}
105
+
106
+
sqs.WriteSession = writeSession
107
+
sqs.ReadSession = readSession
108
+
109
+
err = sqs.createTables()
110
+
if err != nil {
111
+
return fmt.Errorf("scylla could not create tables, %w", err)
112
+
}
113
+
sqs.lastShardCache.source = sqs
114
+
sqs.lastShardCache.Init()
115
+
return nil
116
+
}
117
+
118
+
var createTableTexts = []string{
119
+
`CREATE TABLE IF NOT EXISTS blocks (uid bigint, cid blob, rev varchar, root blob, block blob, PRIMARY KEY((uid,cid)))`,
120
+
// This is the INDEX we would like to use, but scylla can't do it, so we use a MATERIALIZED VIEW instead
121
+
//`CREATE INDEX IF NOT EXISTS block_by_rev ON blocks (uid, rev)`,
122
+
`CREATE MATERIALIZED VIEW IF NOT EXISTS blocks_by_uidrev
123
+
AS SELECT uid, rev, cid, root
124
+
FROM blocks
125
+
WHERE uid IS NOT NULL AND rev IS NOT NULL AND cid IS NOT NULL
126
+
PRIMARY KEY ((uid), rev, cid) WITH CLUSTERING ORDER BY (rev DESC)`,
127
+
}
128
+
129
+
func (sqs *ScyllaStore) createTables() error {
130
+
for i, text := range createTableTexts {
131
+
err := sqs.WriteSession.Query(text).Exec()
132
+
if err != nil {
133
+
return fmt.Errorf("scylla create table statement [%d] %v: %w", i, text, err)
134
+
}
135
+
}
136
+
return nil
137
+
}
138
+
139
+
// writeNewShard needed for DeltaSession.CloseWithRoot
140
+
func (sqs *ScyllaStore) writeNewShard(ctx context.Context, root cid.Cid, rev string, user models.Uid, seq int, blks map[cid.Cid]blockformat.Block, rmcids map[cid.Cid]bool) ([]byte, error) {
141
+
scWriteNewShard.Inc()
142
+
sqs.log.Debug("write shard", "uid", user, "root", root, "rev", rev, "nblocks", len(blks))
143
+
start := time.Now()
144
+
ctx, span := otel.Tracer("carstore").Start(ctx, "writeNewShard")
145
+
defer span.End()
146
+
buf := new(bytes.Buffer)
147
+
hnw, err := WriteCarHeader(buf, root)
148
+
if err != nil {
149
+
return nil, fmt.Errorf("failed to write car header: %w", err)
150
+
}
151
+
offset := hnw
152
+
153
+
dbroot := root.Bytes()
154
+
155
+
span.SetAttributes(attribute.Int("blocks", len(blks)))
156
+
157
+
for bcid, block := range blks {
158
+
// build shard for output firehose
159
+
nw, err := LdWrite(buf, bcid.Bytes(), block.RawData())
160
+
if err != nil {
161
+
return nil, fmt.Errorf("failed to write block: %w", err)
162
+
}
163
+
offset += nw
164
+
165
+
// TODO: scylla BATCH doesn't apply if the batch crosses partition keys; BUT, we may be able to send many blocks concurrently?
166
+
dbcid := bcid.Bytes()
167
+
blockbytes := block.RawData()
168
+
// we're relying on cql auto-prepare, no 'PreparedStatement'
169
+
err = sqs.WriteSession.Query(
170
+
`INSERT INTO blocks (uid, cid, rev, root, block) VALUES (?, ?, ?, ?, ?)`,
171
+
user, dbcid, rev, dbroot, blockbytes,
172
+
).Idempotent(true).Exec()
173
+
if err != nil {
174
+
return nil, fmt.Errorf("(uid,cid) block store failed, %w", err)
175
+
}
176
+
sqs.log.Debug("put block", "uid", user, "cid", bcid, "size", len(blockbytes))
177
+
}
178
+
179
+
shard := CarShard{
180
+
Root: models.DbCID{CID: root},
181
+
DataStart: hnw,
182
+
Seq: seq,
183
+
Usr: user,
184
+
Rev: rev,
185
+
}
186
+
187
+
sqs.lastShardCache.put(&shard)
188
+
189
+
dt := time.Since(start).Seconds()
190
+
scWriteTimes.Observe(dt)
191
+
return buf.Bytes(), nil
192
+
}
193
+
194
+
// GetLastShard needed for NewDeltaSession indirectly through lastShardCache
195
+
// What we actually seem to need from this: last {Rev, Root.CID}
196
+
func (sqs *ScyllaStore) GetLastShard(ctx context.Context, uid models.Uid) (*CarShard, error) {
197
+
scGetLastShard.Inc()
198
+
var rev string
199
+
var rootb []byte
200
+
err := sqs.ReadSession.Query(`SELECT rev, root FROM blocks_by_uidrev WHERE uid = ? ORDER BY rev DESC LIMIT 1`, uid).Scan(&rev, &rootb)
201
+
if errors.Is(err, gocql.ErrNotFound) {
202
+
return nil, nil
203
+
}
204
+
if err != nil {
205
+
return nil, fmt.Errorf("last shard err, %w", err)
206
+
}
207
+
xcid, cidErr := cid.Cast(rootb)
208
+
if cidErr != nil {
209
+
return nil, fmt.Errorf("last shard bad cid, %w", cidErr)
210
+
}
211
+
return &CarShard{
212
+
Root: models.DbCID{CID: xcid},
213
+
Rev: rev,
214
+
}, nil
215
+
}
216
+
217
+
func (sqs *ScyllaStore) CompactUserShards(ctx context.Context, user models.Uid, skipBigShards bool) (*CompactionStats, error) {
218
+
sqs.log.Warn("TODO: don't call compaction")
219
+
return nil, nil
220
+
}
221
+
222
+
func (sqs *ScyllaStore) GetCompactionTargets(ctx context.Context, shardCount int) ([]CompactionTarget, error) {
223
+
sqs.log.Warn("TODO: don't call compaction targets")
224
+
return nil, nil
225
+
}
226
+
227
+
func (sqs *ScyllaStore) GetUserRepoHead(ctx context.Context, user models.Uid) (cid.Cid, error) {
228
+
// TODO: same as FileCarStore; re-unify
229
+
lastShard, err := sqs.lastShardCache.get(ctx, user)
230
+
if err != nil {
231
+
return cid.Undef, err
232
+
}
233
+
if lastShard == nil {
234
+
return cid.Undef, nil
235
+
}
236
+
if lastShard.ID == 0 {
237
+
return cid.Undef, nil
238
+
}
239
+
240
+
return lastShard.Root.CID, nil
241
+
}
242
+
243
+
func (sqs *ScyllaStore) GetUserRepoRev(ctx context.Context, user models.Uid) (string, error) {
244
+
// TODO: same as FileCarStore; re-unify
245
+
lastShard, err := sqs.lastShardCache.get(ctx, user)
246
+
if err != nil {
247
+
return "", err
248
+
}
249
+
if lastShard == nil {
250
+
return "", nil
251
+
}
252
+
if lastShard.ID == 0 {
253
+
return "", nil
254
+
}
255
+
256
+
return lastShard.Rev, nil
257
+
}
258
+
259
+
func (sqs *ScyllaStore) ImportSlice(ctx context.Context, uid models.Uid, since *string, carslice []byte) (cid.Cid, *DeltaSession, error) {
260
+
// TODO: same as FileCarStore, re-unify
261
+
ctx, span := otel.Tracer("carstore").Start(ctx, "ImportSlice")
262
+
defer span.End()
263
+
264
+
carr, err := car.NewCarReader(bytes.NewReader(carslice))
265
+
if err != nil {
266
+
return cid.Undef, nil, err
267
+
}
268
+
269
+
if len(carr.Header.Roots) != 1 {
270
+
return cid.Undef, nil, fmt.Errorf("invalid car file, header must have a single root (has %d)", len(carr.Header.Roots))
271
+
}
272
+
273
+
ds, err := sqs.NewDeltaSession(ctx, uid, since)
274
+
if err != nil {
275
+
return cid.Undef, nil, fmt.Errorf("new delta session failed: %w", err)
276
+
}
277
+
278
+
var cids []cid.Cid
279
+
for {
280
+
blk, err := carr.Next()
281
+
if err != nil {
282
+
if err == io.EOF {
283
+
break
284
+
}
285
+
return cid.Undef, nil, err
286
+
}
287
+
288
+
cids = append(cids, blk.Cid())
289
+
290
+
if err := ds.Put(ctx, blk); err != nil {
291
+
return cid.Undef, nil, err
292
+
}
293
+
}
294
+
295
+
return carr.Header.Roots[0], ds, nil
296
+
}
297
+
298
+
func (sqs *ScyllaStore) NewDeltaSession(ctx context.Context, user models.Uid, since *string) (*DeltaSession, error) {
299
+
ctx, span := otel.Tracer("carstore").Start(ctx, "NewSession")
300
+
defer span.End()
301
+
302
+
// TODO: ensure that we don't write updates on top of the wrong head
303
+
// this needs to be a compare and swap type operation
304
+
lastShard, err := sqs.lastShardCache.get(ctx, user)
305
+
if err != nil {
306
+
return nil, fmt.Errorf("NewDeltaSession, lsc, %w", err)
307
+
}
308
+
309
+
if lastShard == nil {
310
+
lastShard = &zeroShard
311
+
}
312
+
313
+
if since != nil && *since != lastShard.Rev {
314
+
return nil, fmt.Errorf("revision mismatch: %s != %s: %w", *since, lastShard.Rev, ErrRepoBaseMismatch)
315
+
}
316
+
317
+
return &DeltaSession{
318
+
blks: make(map[cid.Cid]blockformat.Block),
319
+
base: &sqliteUserView{
320
+
uid: user,
321
+
sqs: sqs,
322
+
},
323
+
user: user,
324
+
baseCid: lastShard.Root.CID,
325
+
cs: sqs,
326
+
seq: lastShard.Seq + 1,
327
+
lastRev: lastShard.Rev,
328
+
}, nil
329
+
}
330
+
331
+
func (sqs *ScyllaStore) ReadOnlySession(user models.Uid) (*DeltaSession, error) {
332
+
return &DeltaSession{
333
+
base: &sqliteUserView{
334
+
uid: user,
335
+
sqs: sqs,
336
+
},
337
+
readonly: true,
338
+
user: user,
339
+
cs: sqs,
340
+
}, nil
341
+
}
342
+
343
+
// ReadUserCar
344
+
// incremental is only ever called true
345
+
func (sqs *ScyllaStore) ReadUserCar(ctx context.Context, user models.Uid, sinceRev string, incremental bool, shardOut io.Writer) error {
346
+
scGetCar.Inc()
347
+
ctx, span := otel.Tracer("carstore").Start(ctx, "ReadUserCar")
348
+
defer span.End()
349
+
start := time.Now()
350
+
351
+
cidchan := make(chan cid.Cid, 100)
352
+
353
+
go func() {
354
+
defer close(cidchan)
355
+
cids := sqs.ReadSession.Query(`SELECT cid FROM blocks_by_uidrev WHERE uid = ? AND rev > ? ORDER BY rev DESC`, user, sinceRev).Iter()
356
+
defer cids.Close()
357
+
for {
358
+
var cidb []byte
359
+
ok := cids.Scan(&cidb)
360
+
if !ok {
361
+
break
362
+
}
363
+
xcid, cidErr := cid.Cast(cidb)
364
+
if cidErr != nil {
365
+
sqs.log.Warn("ReadUserCar bad cid", "err", cidErr)
366
+
continue
367
+
}
368
+
cidchan <- xcid
369
+
}
370
+
}()
371
+
nblocks := 0
372
+
first := true
373
+
for xcid := range cidchan {
374
+
var xrev string
375
+
var xroot []byte
376
+
var xblock []byte
377
+
err := sqs.ReadSession.Query("SELECT rev, root, block FROM blocks WHERE uid = ? AND cid = ? LIMIT 1", user, xcid.Bytes()).Scan(&xrev, &xroot, &xblock)
378
+
if err != nil {
379
+
return fmt.Errorf("rcar bad read, %w", err)
380
+
}
381
+
if first {
382
+
rootCid, cidErr := cid.Cast(xroot)
383
+
if cidErr != nil {
384
+
return fmt.Errorf("rcar bad rootcid, %w", err)
385
+
}
386
+
if err := car.WriteHeader(&car.CarHeader{
387
+
Roots: []cid.Cid{rootCid},
388
+
Version: 1,
389
+
}, shardOut); err != nil {
390
+
return fmt.Errorf("rcar bad header, %w", err)
391
+
}
392
+
first = false
393
+
}
394
+
nblocks++
395
+
_, err = LdWrite(shardOut, xcid.Bytes(), xblock)
396
+
if err != nil {
397
+
return fmt.Errorf("rcar bad write, %w", err)
398
+
}
399
+
}
400
+
span.SetAttributes(attribute.Int("blocks", nblocks))
401
+
sqs.log.Debug("read car", "nblocks", nblocks, "since", sinceRev)
402
+
scReadCarTimes.Observe(time.Since(start).Seconds())
403
+
return nil
404
+
}
405
+
406
+
// Stat is only used in a debugging admin handler
407
+
// don't bother implementing it (for now?)
408
+
func (sqs *ScyllaStore) Stat(ctx context.Context, usr models.Uid) ([]UserStat, error) {
409
+
sqs.log.Warn("Stat debugging method not implemented for sqlite store")
410
+
return nil, nil
411
+
}
412
+
413
+
func (sqs *ScyllaStore) WipeUserData(ctx context.Context, user models.Uid) error {
414
+
ctx, span := otel.Tracer("carstore").Start(ctx, "WipeUserData")
415
+
defer span.End()
416
+
417
+
// LOL, can't do this if primary key is (uid,cid) because that's hashed with no scan!
418
+
//err := sqs.WriteSession.Query("DELETE FROM blocks WHERE uid = ?", user).Exec()
419
+
420
+
cidchan := make(chan cid.Cid, 100)
421
+
422
+
go func() {
423
+
defer close(cidchan)
424
+
cids := sqs.ReadSession.Query(`SELECT cid FROM blocks_by_uidrev WHERE uid = ?`, user).Iter()
425
+
defer cids.Close()
426
+
for {
427
+
var cidb []byte
428
+
ok := cids.Scan(&cidb)
429
+
if !ok {
430
+
break
431
+
}
432
+
xcid, cidErr := cid.Cast(cidb)
433
+
if cidErr != nil {
434
+
sqs.log.Warn("ReadUserCar bad cid", "err", cidErr)
435
+
continue
436
+
}
437
+
cidchan <- xcid
438
+
}
439
+
}()
440
+
nblocks := 0
441
+
errcount := 0
442
+
for xcid := range cidchan {
443
+
err := sqs.ReadSession.Query("DELETE FROM blocks WHERE uid = ? AND cid = ?", user, xcid.Bytes()).Exec()
444
+
if err != nil {
445
+
sqs.log.Warn("ReadUserCar bad delete, %w", err)
446
+
errcount++
447
+
if errcount > 10 {
448
+
return err
449
+
}
450
+
}
451
+
nblocks++
452
+
}
453
+
scUsersWiped.Inc()
454
+
scBlocksDeleted.Add(float64(nblocks))
455
+
return nil
456
+
}
457
+
458
+
// HasUidCid needed for NewDeltaSession userView
459
+
func (sqs *ScyllaStore) HasUidCid(ctx context.Context, user models.Uid, bcid cid.Cid) (bool, error) {
460
+
// TODO: this is pretty cacheable? invalidate (uid,*) on WipeUserData
461
+
scHas.Inc()
462
+
var rev string
463
+
var rootb []byte
464
+
err := sqs.ReadSession.Query(`SELECT rev, root FROM blocks WHERE uid = ? AND cid = ? LIMIT 1`, user, bcid.Bytes()).Scan(&rev, &rootb)
465
+
if err != nil {
466
+
return false, fmt.Errorf("hasUC bad scan, %w", err)
467
+
}
468
+
return true, nil
469
+
}
470
+
471
+
func (sqs *ScyllaStore) CarStore() CarStore {
472
+
return sqs
473
+
}
474
+
475
+
func (sqs *ScyllaStore) Close() error {
476
+
sqs.WriteSession.Close()
477
+
sqs.ReadSession.Close()
478
+
return nil
479
+
}
480
+
481
+
func (sqs *ScyllaStore) getBlock(ctx context.Context, user models.Uid, bcid cid.Cid) (blockformat.Block, error) {
482
+
// TODO: this is pretty cacheable? invalidate (uid,*) on WipeUserData
483
+
scGetBlock.Inc()
484
+
start := time.Now()
485
+
var blockb []byte
486
+
err := sqs.ReadSession.Query("SELECT block FROM blocks WHERE uid = ? AND cid = ? LIMIT 1", user, bcid.Bytes()).Scan(&blockb)
487
+
if err != nil {
488
+
return nil, fmt.Errorf("getb err, %w", err)
489
+
}
490
+
dt := time.Since(start)
491
+
scGetTimes.Observe(dt.Seconds())
492
+
return blocks.NewBlock(blockb), nil
493
+
}
494
+
495
+
func (sqs *ScyllaStore) getBlockSize(ctx context.Context, user models.Uid, bcid cid.Cid) (int64, error) {
496
+
// TODO: this is pretty cacheable? invalidate (uid,*) on WipeUserData
497
+
scGetBlockSize.Inc()
498
+
var out int64
499
+
err := sqs.ReadSession.Query("SELECT length(block) FROM blocks WHERE uid = ? AND cid = ? LIMIT 1", user, bcid.Bytes()).Scan(&out)
500
+
if err != nil {
501
+
return 0, fmt.Errorf("getbs err, %w", err)
502
+
}
503
+
return out, nil
504
+
}
505
+
506
+
var scUsersWiped = promauto.NewCounter(prometheus.CounterOpts{
507
+
Name: "bgs_sc_users_wiped",
508
+
Help: "User rows deleted in scylla backend",
509
+
})
510
+
511
+
var scBlocksDeleted = promauto.NewCounter(prometheus.CounterOpts{
512
+
Name: "bgs_sc_blocks_deleted",
513
+
Help: "User blocks deleted in scylla backend",
514
+
})
515
+
516
+
var scGetBlock = promauto.NewCounter(prometheus.CounterOpts{
517
+
Name: "bgs_sc_get_block",
518
+
Help: "get block scylla backend",
519
+
})
520
+
521
+
var scGetBlockSize = promauto.NewCounter(prometheus.CounterOpts{
522
+
Name: "bgs_sc_get_block_size",
523
+
Help: "get block size scylla backend",
524
+
})
525
+
526
+
var scGetCar = promauto.NewCounter(prometheus.CounterOpts{
527
+
Name: "bgs_sc_get_car",
528
+
Help: "get block scylla backend",
529
+
})
530
+
531
+
var scHas = promauto.NewCounter(prometheus.CounterOpts{
532
+
Name: "bgs_sc_has",
533
+
Help: "check block presence scylla backend",
534
+
})
535
+
536
+
var scGetLastShard = promauto.NewCounter(prometheus.CounterOpts{
537
+
Name: "bgs_sc_get_last_shard",
538
+
Help: "get last shard scylla backend",
539
+
})
540
+
541
+
var scWriteNewShard = promauto.NewCounter(prometheus.CounterOpts{
542
+
Name: "bgs_sc_write_shard",
543
+
Help: "write shard blocks scylla backend",
544
+
})
545
+
546
+
var timeBuckets []float64
547
+
var scWriteTimes prometheus.Histogram
548
+
var scGetTimes prometheus.Histogram
549
+
var scReadCarTimes prometheus.Histogram
550
+
551
+
func init() {
552
+
timeBuckets = make([]float64, 1, 20)
553
+
timeBuckets[0] = 0.000_0100
554
+
i := 0
555
+
for timeBuckets[i] < 1 && len(timeBuckets) < 20 {
556
+
timeBuckets = append(timeBuckets, timeBuckets[i]*2)
557
+
i++
558
+
}
559
+
scWriteTimes = promauto.NewHistogram(prometheus.HistogramOpts{
560
+
Name: "bgs_sc_write_times",
561
+
Buckets: timeBuckets,
562
+
})
563
+
scGetTimes = promauto.NewHistogram(prometheus.HistogramOpts{
564
+
Name: "bgs_sc_get_times",
565
+
Buckets: timeBuckets,
566
+
})
567
+
scReadCarTimes = promauto.NewHistogram(prometheus.HistogramOpts{
568
+
Name: "bgs_sc_readcar_times",
569
+
Buckets: timeBuckets,
570
+
})
571
+
}
572
+
573
+
// TODO: copied from tango, re-unify?
574
+
// ExponentialBackoffRetryPolicy sleeps between attempts
575
+
type ExponentialBackoffRetryPolicy struct {
576
+
NumRetries int
577
+
Min, Max time.Duration
578
+
}
579
+
580
+
func (e *ExponentialBackoffRetryPolicy) napTime(attempts int) time.Duration {
581
+
return getExponentialTime(e.Min, e.Max, attempts)
582
+
}
583
+
584
+
func (e *ExponentialBackoffRetryPolicy) Attempt(q gocql.RetryableQuery) bool {
585
+
if q.Attempts() > e.NumRetries {
586
+
return false
587
+
}
588
+
time.Sleep(e.napTime(q.Attempts()))
589
+
return true
590
+
}
591
+
592
+
// used to calculate exponentially growing time
593
+
func getExponentialTime(min time.Duration, max time.Duration, attempts int) time.Duration {
594
+
if min <= 0 {
595
+
min = 100 * time.Millisecond
596
+
}
597
+
if max <= 0 {
598
+
max = 10 * time.Second
599
+
}
600
+
minFloat := float64(min)
601
+
napDuration := minFloat * math.Pow(2, float64(attempts-1))
602
+
// add some jitter
603
+
napDuration += rand.Float64()*minFloat - (minFloat / 2)
604
+
if napDuration > float64(max) {
605
+
return time.Duration(max)
606
+
}
607
+
return time.Duration(napDuration)
608
+
}
609
+
610
+
// GetRetryType returns the retry type for the given error
611
+
func (e *ExponentialBackoffRetryPolicy) GetRetryType(err error) gocql.RetryType {
612
+
// Retry timeouts and/or contention errors on the same host
613
+
if errors.Is(err, gocql.ErrTimeoutNoResponse) ||
614
+
errors.Is(err, gocql.ErrNoStreams) ||
615
+
errors.Is(err, gocql.ErrTooManyTimeouts) {
616
+
return gocql.Retry
617
+
}
618
+
619
+
// Retry next host on unavailable errors
620
+
if errors.Is(err, gocql.ErrUnavailable) ||
621
+
errors.Is(err, gocql.ErrConnectionClosed) ||
622
+
errors.Is(err, gocql.ErrSessionClosed) {
623
+
return gocql.RetryNextHost
624
+
}
625
+
626
+
// Otherwise don't retry
627
+
return gocql.Rethrow
628
+
}
629
+
630
+
func delayForAttempt(attempt int) time.Duration {
631
+
if attempt < 50 {
632
+
return time.Millisecond * 5
633
+
}
634
+
635
+
return time.Second
636
+
}
+576
carstore/sqlite_store.go
···
1
+
package carstore
2
+
3
+
import (
4
+
"bytes"
5
+
"context"
6
+
"database/sql"
7
+
"errors"
8
+
"fmt"
9
+
"go.opentelemetry.io/otel/attribute"
10
+
"io"
11
+
"log/slog"
12
+
"os"
13
+
"path/filepath"
14
+
15
+
"github.com/bluesky-social/indigo/models"
16
+
blockformat "github.com/ipfs/go-block-format"
17
+
"github.com/ipfs/go-cid"
18
+
"github.com/ipfs/go-libipfs/blocks"
19
+
"github.com/ipld/go-car"
20
+
_ "github.com/mattn/go-sqlite3"
21
+
"github.com/prometheus/client_golang/prometheus"
22
+
"github.com/prometheus/client_golang/prometheus/promauto"
23
+
"go.opentelemetry.io/otel"
24
+
)
25
+
26
+
// var log = logging.Logger("sqstore")
27
+
28
+
type SQLiteStore struct {
29
+
dbPath string
30
+
db *sql.DB
31
+
32
+
log *slog.Logger
33
+
34
+
lastShardCache lastShardCache
35
+
}
36
+
37
+
func ensureDir(path string) error {
38
+
fi, err := os.Stat(path)
39
+
if err != nil {
40
+
if os.IsNotExist(err) {
41
+
return os.MkdirAll(path, 0755)
42
+
}
43
+
return err
44
+
}
45
+
if fi.IsDir() {
46
+
return nil
47
+
}
48
+
return fmt.Errorf("%s exists but is not a directory", path)
49
+
}
50
+
51
+
func NewSqliteStore(csdir string) (*SQLiteStore, error) {
52
+
if err := ensureDir(csdir); err != nil {
53
+
return nil, err
54
+
}
55
+
dbpath := filepath.Join(csdir, "db.sqlite3")
56
+
out := new(SQLiteStore)
57
+
err := out.Open(dbpath)
58
+
if err != nil {
59
+
return nil, err
60
+
}
61
+
return out, nil
62
+
}
63
+
64
+
func (sqs *SQLiteStore) Open(path string) error {
65
+
if sqs.log == nil {
66
+
sqs.log = slog.Default()
67
+
}
68
+
sqs.log.Debug("open db", "path", path)
69
+
db, err := sql.Open("sqlite3", path)
70
+
if err != nil {
71
+
return fmt.Errorf("%s: sqlite could not open, %w", path, err)
72
+
}
73
+
sqs.db = db
74
+
sqs.dbPath = path
75
+
err = sqs.createTables()
76
+
if err != nil {
77
+
return fmt.Errorf("%s: sqlite could not create tables, %w", path, err)
78
+
}
79
+
sqs.lastShardCache.source = sqs
80
+
sqs.lastShardCache.Init()
81
+
return nil
82
+
}
83
+
84
+
func (sqs *SQLiteStore) createTables() error {
85
+
tx, err := sqs.db.Begin()
86
+
if err != nil {
87
+
return err
88
+
}
89
+
defer tx.Rollback()
90
+
_, err = tx.Exec("CREATE TABLE IF NOT EXISTS blocks (uid int, cid blob, rev varchar, root blob, block blob, PRIMARY KEY(uid,cid));")
91
+
if err != nil {
92
+
return fmt.Errorf("%s: create table blocks..., %w", sqs.dbPath, err)
93
+
}
94
+
_, err = tx.Exec("CREATE INDEX IF NOT EXISTS blocx_by_rev ON blocks (uid, rev DESC)")
95
+
if err != nil {
96
+
return fmt.Errorf("%s: create blocks by rev index, %w", sqs.dbPath, err)
97
+
}
98
+
return tx.Commit()
99
+
}
100
+
101
+
// writeNewShard needed for DeltaSession.CloseWithRoot
102
+
func (sqs *SQLiteStore) writeNewShard(ctx context.Context, root cid.Cid, rev string, user models.Uid, seq int, blks map[cid.Cid]blockformat.Block, rmcids map[cid.Cid]bool) ([]byte, error) {
103
+
sqWriteNewShard.Inc()
104
+
sqs.log.Debug("write shard", "uid", user, "root", root, "rev", rev, "nblocks", len(blks))
105
+
ctx, span := otel.Tracer("carstore").Start(ctx, "writeNewShard")
106
+
defer span.End()
107
+
// this is "write many blocks", "write one block" is above in putBlock(). keep them in sync.
108
+
buf := new(bytes.Buffer)
109
+
hnw, err := WriteCarHeader(buf, root)
110
+
if err != nil {
111
+
return nil, fmt.Errorf("failed to write car header: %w", err)
112
+
}
113
+
offset := hnw
114
+
115
+
tx, err := sqs.db.BeginTx(ctx, nil)
116
+
if err != nil {
117
+
return nil, fmt.Errorf("bad block insert tx, %w", err)
118
+
}
119
+
defer tx.Rollback()
120
+
insertStatement, err := tx.PrepareContext(ctx, "INSERT INTO blocks (uid, cid, rev, root, block) VALUES (?, ?, ?, ?, ?) ON CONFLICT (uid,cid) DO UPDATE SET rev=excluded.rev, root=excluded.root, block=excluded.block")
121
+
if err != nil {
122
+
return nil, fmt.Errorf("bad block insert sql, %w", err)
123
+
}
124
+
defer insertStatement.Close()
125
+
126
+
dbroot := models.DbCID{CID: root}
127
+
128
+
span.SetAttributes(attribute.Int("blocks", len(blks)))
129
+
130
+
for bcid, block := range blks {
131
+
// build shard for output firehose
132
+
nw, err := LdWrite(buf, bcid.Bytes(), block.RawData())
133
+
if err != nil {
134
+
return nil, fmt.Errorf("failed to write block: %w", err)
135
+
}
136
+
offset += nw
137
+
138
+
// TODO: better databases have an insert-many option for a prepared statement
139
+
dbcid := models.DbCID{CID: bcid}
140
+
blockbytes := block.RawData()
141
+
_, err = insertStatement.ExecContext(ctx, user, dbcid, rev, dbroot, blockbytes)
142
+
if err != nil {
143
+
return nil, fmt.Errorf("(uid,cid) block store failed, %w", err)
144
+
}
145
+
sqs.log.Debug("put block", "uid", user, "cid", bcid, "size", len(blockbytes))
146
+
}
147
+
err = tx.Commit()
148
+
if err != nil {
149
+
return nil, fmt.Errorf("bad block insert commit, %w", err)
150
+
}
151
+
152
+
shard := CarShard{
153
+
Root: models.DbCID{CID: root},
154
+
DataStart: hnw,
155
+
Seq: seq,
156
+
Usr: user,
157
+
Rev: rev,
158
+
}
159
+
160
+
sqs.lastShardCache.put(&shard)
161
+
162
+
return buf.Bytes(), nil
163
+
}
164
+
165
+
var ErrNothingThere = errors.New("nothing to read")
166
+
167
+
// GetLastShard needed for NewDeltaSession indirectly through lastShardCache
168
+
// What we actually seem to need from this: last {Rev, Root.CID}
169
+
func (sqs *SQLiteStore) GetLastShard(ctx context.Context, uid models.Uid) (*CarShard, error) {
170
+
sqGetLastShard.Inc()
171
+
tx, err := sqs.db.BeginTx(ctx, &txReadOnly)
172
+
if err != nil {
173
+
return nil, fmt.Errorf("bad last shard tx, %w", err)
174
+
}
175
+
defer tx.Rollback()
176
+
qstmt, err := tx.PrepareContext(ctx, "SELECT rev, root FROM blocks WHERE uid = ? ORDER BY rev DESC LIMIT 1")
177
+
if err != nil {
178
+
return nil, fmt.Errorf("bad last shard sql, %w", err)
179
+
}
180
+
rows, err := qstmt.QueryContext(ctx, uid)
181
+
if err != nil {
182
+
return nil, fmt.Errorf("last shard err, %w", err)
183
+
}
184
+
if rows.Next() {
185
+
var rev string
186
+
var rootb models.DbCID
187
+
err = rows.Scan(&rev, &rootb)
188
+
if err != nil {
189
+
return nil, fmt.Errorf("last shard bad scan, %w", err)
190
+
}
191
+
return &CarShard{
192
+
Root: rootb,
193
+
Rev: rev,
194
+
}, nil
195
+
}
196
+
return nil, nil
197
+
}
198
+
199
+
func (sqs *SQLiteStore) CompactUserShards(ctx context.Context, user models.Uid, skipBigShards bool) (*CompactionStats, error) {
200
+
sqs.log.Warn("TODO: don't call compaction")
201
+
return nil, nil
202
+
}
203
+
204
+
func (sqs *SQLiteStore) GetCompactionTargets(ctx context.Context, shardCount int) ([]CompactionTarget, error) {
205
+
sqs.log.Warn("TODO: don't call compaction targets")
206
+
return nil, nil
207
+
}
208
+
209
+
func (sqs *SQLiteStore) GetUserRepoHead(ctx context.Context, user models.Uid) (cid.Cid, error) {
210
+
// TODO: same as FileCarStore; re-unify
211
+
lastShard, err := sqs.lastShardCache.get(ctx, user)
212
+
if err != nil {
213
+
return cid.Undef, err
214
+
}
215
+
if lastShard == nil {
216
+
return cid.Undef, nil
217
+
}
218
+
if lastShard.ID == 0 {
219
+
return cid.Undef, nil
220
+
}
221
+
222
+
return lastShard.Root.CID, nil
223
+
}
224
+
225
+
func (sqs *SQLiteStore) GetUserRepoRev(ctx context.Context, user models.Uid) (string, error) {
226
+
// TODO: same as FileCarStore; re-unify
227
+
lastShard, err := sqs.lastShardCache.get(ctx, user)
228
+
if err != nil {
229
+
return "", err
230
+
}
231
+
if lastShard == nil {
232
+
return "", nil
233
+
}
234
+
if lastShard.ID == 0 {
235
+
return "", nil
236
+
}
237
+
238
+
return lastShard.Rev, nil
239
+
}
240
+
241
+
func (sqs *SQLiteStore) ImportSlice(ctx context.Context, uid models.Uid, since *string, carslice []byte) (cid.Cid, *DeltaSession, error) {
242
+
// TODO: same as FileCarStore, re-unify
243
+
ctx, span := otel.Tracer("carstore").Start(ctx, "ImportSlice")
244
+
defer span.End()
245
+
246
+
carr, err := car.NewCarReader(bytes.NewReader(carslice))
247
+
if err != nil {
248
+
return cid.Undef, nil, err
249
+
}
250
+
251
+
if len(carr.Header.Roots) != 1 {
252
+
return cid.Undef, nil, fmt.Errorf("invalid car file, header must have a single root (has %d)", len(carr.Header.Roots))
253
+
}
254
+
255
+
ds, err := sqs.NewDeltaSession(ctx, uid, since)
256
+
if err != nil {
257
+
return cid.Undef, nil, fmt.Errorf("new delta session failed: %w", err)
258
+
}
259
+
260
+
var cids []cid.Cid
261
+
for {
262
+
blk, err := carr.Next()
263
+
if err != nil {
264
+
if err == io.EOF {
265
+
break
266
+
}
267
+
return cid.Undef, nil, err
268
+
}
269
+
270
+
cids = append(cids, blk.Cid())
271
+
272
+
if err := ds.Put(ctx, blk); err != nil {
273
+
return cid.Undef, nil, err
274
+
}
275
+
}
276
+
277
+
return carr.Header.Roots[0], ds, nil
278
+
}
279
+
280
+
var zeroShard CarShard
281
+
282
+
func (sqs *SQLiteStore) NewDeltaSession(ctx context.Context, user models.Uid, since *string) (*DeltaSession, error) {
283
+
ctx, span := otel.Tracer("carstore").Start(ctx, "NewSession")
284
+
defer span.End()
285
+
286
+
// TODO: ensure that we don't write updates on top of the wrong head
287
+
// this needs to be a compare and swap type operation
288
+
lastShard, err := sqs.lastShardCache.get(ctx, user)
289
+
if err != nil {
290
+
return nil, fmt.Errorf("NewDeltaSession, lsc, %w", err)
291
+
}
292
+
293
+
if lastShard == nil {
294
+
lastShard = &zeroShard
295
+
}
296
+
297
+
if since != nil && *since != lastShard.Rev {
298
+
return nil, fmt.Errorf("revision mismatch: %s != %s: %w", *since, lastShard.Rev, ErrRepoBaseMismatch)
299
+
}
300
+
301
+
return &DeltaSession{
302
+
blks: make(map[cid.Cid]blockformat.Block),
303
+
base: &sqliteUserView{
304
+
uid: user,
305
+
sqs: sqs,
306
+
},
307
+
user: user,
308
+
baseCid: lastShard.Root.CID,
309
+
cs: sqs,
310
+
seq: lastShard.Seq + 1,
311
+
lastRev: lastShard.Rev,
312
+
}, nil
313
+
}
314
+
315
+
func (sqs *SQLiteStore) ReadOnlySession(user models.Uid) (*DeltaSession, error) {
316
+
return &DeltaSession{
317
+
base: &sqliteUserView{
318
+
uid: user,
319
+
sqs: sqs,
320
+
},
321
+
readonly: true,
322
+
user: user,
323
+
cs: sqs,
324
+
}, nil
325
+
}
326
+
327
+
type cartmp struct {
328
+
xcid cid.Cid
329
+
rev string
330
+
root string
331
+
block []byte
332
+
}
333
+
334
+
// ReadUserCar
335
+
// callers currently only ever pass incremental=true
336
+
func (sqs *SQLiteStore) ReadUserCar(ctx context.Context, user models.Uid, sinceRev string, incremental bool, shardOut io.Writer) error {
337
+
sqGetCar.Inc()
338
+
ctx, span := otel.Tracer("carstore").Start(ctx, "ReadUserCar")
339
+
defer span.End()
340
+
341
+
tx, err := sqs.db.BeginTx(ctx, &txReadOnly)
342
+
if err != nil {
343
+
return fmt.Errorf("rcar tx, %w", err)
344
+
}
345
+
defer tx.Rollback()
346
+
qstmt, err := tx.PrepareContext(ctx, "SELECT cid,rev,root,block FROM blocks WHERE uid = ? AND rev > ? ORDER BY rev DESC")
347
+
if err != nil {
348
+
return fmt.Errorf("rcar sql, %w", err)
349
+
}
350
+
defer qstmt.Close()
351
+
rows, err := qstmt.QueryContext(ctx, user, sinceRev)
352
+
if err != nil {
353
+
return fmt.Errorf("rcar err, %w", err)
354
+
}
355
+
nblocks := 0
356
+
first := true
357
+
for rows.Next() {
358
+
var xcid models.DbCID
359
+
var xrev string
360
+
var xroot models.DbCID
361
+
var xblock []byte
362
+
err = rows.Scan(&xcid, &xrev, &xroot, &xblock)
363
+
if err != nil {
364
+
return fmt.Errorf("rcar bad scan, %w", err)
365
+
}
366
+
if first {
367
+
if err := car.WriteHeader(&car.CarHeader{
368
+
Roots: []cid.Cid{xroot.CID},
369
+
Version: 1,
370
+
}, shardOut); err != nil {
371
+
return fmt.Errorf("rcar bad header, %w", err)
372
+
}
373
+
first = false
374
+
}
375
+
nblocks++
376
+
_, err := LdWrite(shardOut, xcid.CID.Bytes(), xblock)
377
+
if err != nil {
378
+
return fmt.Errorf("rcar bad write, %w", err)
379
+
}
380
+
}
381
+
sqs.log.Debug("read car", "nblocks", nblocks, "since", sinceRev)
382
+
return nil
383
+
}
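ReadUserCar streams a bare CAR: one header carrying the root recorded on the user's newest matching row, then each block framed by LdWrite. A consumption sketch (the helper name is hypothetical; the imports match what this file already uses):

```go
// Sketch: pull a user's blocks newer than sinceRev and walk the resulting CAR.
func dumpUserCar(ctx context.Context, sqs *SQLiteStore, uid models.Uid, sinceRev string) error {
	var buf bytes.Buffer
	if err := sqs.ReadUserCar(ctx, uid, sinceRev, true, &buf); err != nil {
		return err
	}
	cr, err := car.NewCarReader(&buf)
	if err != nil {
		// note: if no rows matched, no CAR header was written and this fails
		return err
	}
	for {
		blk, err := cr.Next()
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}
		fmt.Println(blk.Cid(), len(blk.RawData()))
	}
}
```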
384
+
385
+
// Stat is only used in a debugging admin handler
386
+
// don't bother implementing it (for now?)
387
+
func (sqs *SQLiteStore) Stat(ctx context.Context, usr models.Uid) ([]UserStat, error) {
388
+
sqs.log.Warn("Stat debugging method not implemented for sqlite store")
389
+
return nil, nil
390
+
}
391
+
392
+
func (sqs *SQLiteStore) WipeUserData(ctx context.Context, user models.Uid) error {
393
+
ctx, span := otel.Tracer("carstore").Start(ctx, "WipeUserData")
394
+
defer span.End()
395
+
tx, err := sqs.db.BeginTx(ctx, nil)
396
+
if err != nil {
397
+
return fmt.Errorf("wipe tx, %w", err)
398
+
}
399
+
defer tx.Rollback()
400
+
deleteResult, err := tx.ExecContext(ctx, "DELETE FROM blocks WHERE uid = ?", user)
if err != nil {
// bail out before touching deleteResult, which may be nil on error
return fmt.Errorf("wipe delete, %w", err)
}
401
+
nrows, ierr := deleteResult.RowsAffected()
402
+
if ierr == nil {
403
+
sqRowsDeleted.Add(float64(nrows))
404
+
}
405
+
if err == nil {
406
+
err = ierr
407
+
}
408
+
if err == nil {
409
+
err = tx.Commit()
410
+
}
411
+
return err
412
+
}
413
+
414
+
var txReadOnly = sql.TxOptions{ReadOnly: true}
415
+
416
+
// HasUidCid needed for NewDeltaSession userView
417
+
func (sqs *SQLiteStore) HasUidCid(ctx context.Context, user models.Uid, bcid cid.Cid) (bool, error) {
418
+
// TODO: this is pretty cacheable? invalidate (uid,*) on WipeUserData
419
+
sqHas.Inc()
420
+
tx, err := sqs.db.BeginTx(ctx, &txReadOnly)
421
+
if err != nil {
422
+
return false, fmt.Errorf("hasUC tx, %w", err)
423
+
}
424
+
defer tx.Rollback()
425
+
qstmt, err := tx.PrepareContext(ctx, "SELECT rev, root FROM blocks WHERE uid = ? AND cid = ? LIMIT 1")
426
+
if err != nil {
427
+
return false, fmt.Errorf("hasUC sql, %w", err)
428
+
}
429
+
defer qstmt.Close()
430
+
rows, err := qstmt.QueryContext(ctx, user, models.DbCID{CID: bcid})
431
+
if err != nil {
432
+
return false, fmt.Errorf("hasUC err, %w", err)
433
+
}
434
+
if rows.Next() {
435
+
var rev string
436
+
var rootb models.DbCID
437
+
err = rows.Scan(&rev, &rootb)
438
+
if err != nil {
439
+
return false, fmt.Errorf("hasUC bad scan, %w", err)
440
+
}
441
+
return true, nil
442
+
}
443
+
return false, nil
444
+
}
445
+
446
+
func (sqs *SQLiteStore) CarStore() CarStore {
447
+
return sqs
448
+
}
449
+
450
+
func (sqs *SQLiteStore) Close() error {
451
+
return sqs.db.Close()
452
+
}
453
+
454
+
func (sqs *SQLiteStore) getBlock(ctx context.Context, user models.Uid, bcid cid.Cid) (blockformat.Block, error) {
455
+
// TODO: this is pretty cacheable? invalidate (uid,*) on WipeUserData
456
+
sqGetBlock.Inc()
457
+
tx, err := sqs.db.BeginTx(ctx, &txReadOnly)
458
+
if err != nil {
459
+
return nil, fmt.Errorf("getb tx, %w", err)
460
+
}
461
+
defer tx.Rollback()
462
+
qstmt, err := tx.PrepareContext(ctx, "SELECT block FROM blocks WHERE uid = ? AND cid = ? LIMIT 1")
463
+
if err != nil {
464
+
return nil, fmt.Errorf("getb sql, %w", err)
465
+
}
466
+
defer qstmt.Close()
467
+
rows, err := qstmt.QueryContext(ctx, user, models.DbCID{CID: bcid})
468
+
if err != nil {
469
+
return nil, fmt.Errorf("getb err, %w", err)
470
+
}
471
+
if rows.Next() {
472
+
//var rev string
473
+
//var rootb models.DbCID
474
+
var blockb []byte
475
+
err = rows.Scan(&blockb)
476
+
if err != nil {
477
+
return nil, fmt.Errorf("getb bad scan, %w", err)
478
+
}
479
+
return blocks.NewBlock(blockb), nil
480
+
}
481
+
return nil, ErrNothingThere
482
+
}
483
+
484
+
func (sqs *SQLiteStore) getBlockSize(ctx context.Context, user models.Uid, bcid cid.Cid) (int64, error) {
485
+
// TODO: this is pretty cacheable? invalidate (uid,*) on WipeUserData
486
+
sqGetBlockSize.Inc()
487
+
tx, err := sqs.db.BeginTx(ctx, &txReadOnly)
488
+
if err != nil {
489
+
return 0, fmt.Errorf("getbs tx, %w", err)
490
+
}
491
+
defer tx.Rollback()
492
+
qstmt, err := tx.PrepareContext(ctx, "SELECT length(block) FROM blocks WHERE uid = ? AND cid = ? LIMIT 1")
493
+
if err != nil {
494
+
return 0, fmt.Errorf("getbs sql, %w", err)
495
+
}
496
+
defer qstmt.Close()
497
+
rows, err := qstmt.QueryContext(ctx, user, models.DbCID{CID: bcid})
498
+
if err != nil {
499
+
return 0, fmt.Errorf("getbs err, %w", err)
500
+
}
501
+
if rows.Next() {
502
+
var out int64
503
+
err = rows.Scan(&out)
504
+
if err != nil {
505
+
return 0, fmt.Errorf("getbs bad scan, %w", err)
506
+
}
507
+
return out, nil
508
+
}
509
+
return 0, nil
510
+
}
511
+
512
+
type sqliteUserViewInner interface {
513
+
HasUidCid(ctx context.Context, user models.Uid, bcid cid.Cid) (bool, error)
514
+
getBlock(ctx context.Context, user models.Uid, bcid cid.Cid) (blockformat.Block, error)
515
+
getBlockSize(ctx context.Context, user models.Uid, bcid cid.Cid) (int64, error)
516
+
}
517
+
518
+
// TODO: rename, used by both sqlite and scylla
519
+
type sqliteUserView struct {
520
+
sqs sqliteUserViewInner
521
+
uid models.Uid
522
+
}
523
+
524
+
func (s sqliteUserView) Has(ctx context.Context, c cid.Cid) (bool, error) {
525
+
// TODO: cache block metadata?
526
+
return s.sqs.HasUidCid(ctx, s.uid, c)
527
+
}
528
+
529
+
func (s sqliteUserView) Get(ctx context.Context, c cid.Cid) (blockformat.Block, error) {
530
+
// TODO: cache blocks?
531
+
return s.sqs.getBlock(ctx, s.uid, c)
532
+
}
533
+
534
+
func (s sqliteUserView) GetSize(ctx context.Context, c cid.Cid) (int, error) {
535
+
// TODO: cache block metadata?
536
+
bigsize, err := s.sqs.getBlockSize(ctx, s.uid, c)
537
+
return int(bigsize), err
538
+
}
539
+
540
+
// ensure we implement the interface
541
+
var _ minBlockstore = (*sqliteUserView)(nil)
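The assertion above is the only place the blockstore contract appears in this diff; judging from the three methods sqliteUserView implements, minBlockstore presumably looks roughly like the following (a sketch, not the canonical definition, which lives elsewhere in the carstore package):

```go
// Inferred shape of the minBlockstore contract; names and exact signatures
// are assumptions based on the methods implemented above.
type minBlockstoreSketch interface {
	Has(ctx context.Context, c cid.Cid) (bool, error)
	Get(ctx context.Context, c cid.Cid) (blockformat.Block, error)
	GetSize(ctx context.Context, c cid.Cid) (int, error)
}
```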
542
+
543
+
var sqRowsDeleted = promauto.NewCounter(prometheus.CounterOpts{
544
+
Name: "bgs_sq_rows_deleted",
545
+
Help: "User rows deleted in sqlite backend",
546
+
})
547
+
548
+
var sqGetBlock = promauto.NewCounter(prometheus.CounterOpts{
549
+
Name: "bgs_sq_get_block",
550
+
Help: "get block sqlite backend",
551
+
})
552
+
553
+
var sqGetBlockSize = promauto.NewCounter(prometheus.CounterOpts{
554
+
Name: "bgs_sq_get_block_size",
555
+
Help: "get block size sqlite backend",
556
+
})
557
+
558
+
var sqGetCar = promauto.NewCounter(prometheus.CounterOpts{
559
+
Name: "bgs_sq_get_car",
560
+
Help: "get block sqlite backend",
561
+
})
562
+
563
+
var sqHas = promauto.NewCounter(prometheus.CounterOpts{
564
+
Name: "bgs_sq_has",
565
+
Help: "check block presence sqlite backend",
566
+
})
567
+
568
+
var sqGetLastShard = promauto.NewCounter(prometheus.CounterOpts{
569
+
Name: "bgs_sq_get_last_shard",
570
+
Help: "get last shard sqlite backend",
571
+
})
572
+
573
+
var sqWriteNewShard = promauto.NewCounter(prometheus.CounterOpts{
574
+
Name: "bgs_sq_write_shard",
575
+
Help: "write shard blocks sqlite backend",
576
+
})
+54
-27
cmd/bigsky/main.go
···
217
217
EnvVars: []string{"RELAY_NEXT_CRAWLER"},
218
218
},
219
219
&cli.BoolFlag{
220
+
Name: "ex-sqlite-carstore",
221
+
Usage: "enable experimental sqlite carstore",
222
+
Value: false,
223
+
},
224
+
&cli.StringSliceFlag{
225
+
Name: "scylla-carstore",
226
+
Usage: "scylla server addresses for storage backend, comma separated",
227
+
Value: &cli.StringSlice{},
228
+
EnvVars: []string{"RELAY_SCYLLA_NODES"},
229
+
},
230
+
&cli.BoolFlag{
220
231
Name: "non-archival",
221
232
EnvVars: []string{"RELAY_NON_ARCHIVAL"},
222
233
Value: false,
···
316
327
return err
317
328
}
318
329
319
-
slog.Info("setting up main database")
320
330
dburl := cctx.String("db-url")
331
+
slog.Info("setting up main database", "url", dburl)
321
332
db, err := cliutil.SetupDatabase(dburl, cctx.Int("max-metadb-connections"))
322
333
if err != nil {
323
334
return err
324
335
}
325
-
326
-
slog.Info("setting up carstore database")
327
-
csdburl := cctx.String("carstore-db-url")
328
-
csdb, err := cliutil.SetupDatabase(csdburl, cctx.Int("max-carstore-connections"))
329
-
if err != nil {
330
-
return err
331
-
}
332
-
333
336
if cctx.Bool("db-tracing") {
334
337
if err := db.Use(tracing.NewPlugin()); err != nil {
335
338
return err
336
339
}
337
-
if err := csdb.Use(tracing.NewPlugin()); err != nil {
338
-
return err
339
-
}
340
340
}
341
341
342
-
csdirs := []string{csdir}
343
-
if paramDirs := cctx.StringSlice("carstore-shard-dirs"); len(paramDirs) > 0 {
344
-
csdirs = paramDirs
345
-
}
346
-
347
-
for _, csd := range csdirs {
348
-
if err := os.MkdirAll(filepath.Dir(csd), os.ModePerm); err != nil {
342
+
var cstore carstore.CarStore
343
+
scyllaAddrs := cctx.StringSlice("scylla-carstore")
344
+
sqliteStore := cctx.Bool("ex-sqlite-carstore")
345
+
if len(scyllaAddrs) != 0 {
346
+
slog.Info("starting scylla carstore", "addrs", scyllaAddrs)
347
+
cstore, err = carstore.NewScyllaStore(scyllaAddrs, "cs")
348
+
} else if sqliteStore {
349
+
slog.Info("starting sqlite carstore", "dir", csdir)
350
+
cstore, err = carstore.NewSqliteStore(csdir)
351
+
} else if cctx.Bool("non-archival") {
352
+
csdburl := cctx.String("carstore-db-url")
353
+
slog.Info("setting up non-archival carstore database", "url", csdburl)
354
+
csdb, err := cliutil.SetupDatabase(csdburl, cctx.Int("max-carstore-connections"))
355
+
if err != nil {
349
356
return err
350
357
}
351
-
}
352
-
353
-
var cstore carstore.CarStore
354
-
355
-
if cctx.Bool("non-archival") {
358
+
if cctx.Bool("db-tracing") {
359
+
if err := csdb.Use(tracing.NewPlugin()); err != nil {
360
+
return err
361
+
}
362
+
}
356
363
cs, err := carstore.NewNonArchivalCarstore(csdb)
357
364
if err != nil {
358
365
return err
359
366
}
360
-
361
367
cstore = cs
362
368
} else {
363
-
cs, err := carstore.NewCarStore(csdb, csdirs)
369
+
// make standard FileCarStore
370
+
csdburl := cctx.String("carstore-db-url")
371
+
slog.Info("setting up carstore database", "url", csdburl)
372
+
csdb, err := cliutil.SetupDatabase(csdburl, cctx.Int("max-carstore-connections"))
364
373
if err != nil {
365
374
return err
366
375
}
376
+
if cctx.Bool("db-tracing") {
377
+
if err := csdb.Use(tracing.NewPlugin()); err != nil {
378
+
return err
379
+
}
380
+
}
381
+
csdirs := []string{csdir}
382
+
if paramDirs := cctx.StringSlice("carstore-shard-dirs"); len(paramDirs) > 0 {
383
+
csdirs = paramDirs
384
+
}
367
385
368
-
cstore = cs
386
+
for _, csd := range csdirs {
387
+
if err := os.MkdirAll(filepath.Dir(csd), os.ModePerm); err != nil {
388
+
return err
389
+
}
390
+
}
391
+
cstore, err = carstore.NewCarStore(csdb, csdirs)
// err here is the variable declared by the csdb := above (it shadows the outer
// err), so check it locally; the shared check below only sees the outer one.
if err != nil {
return err
}
392
+
}
393
+
394
+
if err != nil {
395
+
return err
369
396
}
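Net effect of the rewiring above: a non-empty --scylla-carstore (or RELAY_SCYLLA_NODES) takes precedence, then --ex-sqlite-carstore, then --non-archival, and only the last, default FileCarStore branch still creates the shard directories and honors --carstore-shard-dirs; the carstore metadata database is only opened for the non-archival and FileCarStore backends.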
370
397
371
398
// DID RESOLUTION
+2
-2
cmd/gosky/debug.go
···
885
885
886
886
rep1, err = repo.ReadRepoFromCar(ctx, bytes.NewReader(repo1bytes))
887
887
if err != nil {
888
-
logger.Error("reading repo", "err", err)
888
+
logger.Error("reading repo", "err", err, "bytes", len(repo1bytes))
889
889
os.Exit(1)
890
890
return
891
891
}
···
904
904
905
905
rep2, err = repo.ReadRepoFromCar(ctx, bytes.NewReader(repo2bytes))
906
906
if err != nil {
907
-
logger.Error("reading repo", "err", err)
907
+
logger.Error("reading repo", "err", err, "bytes", len(repo2bytes))
908
908
os.Exit(1)
909
909
return
910
910
}
+6
-1
go.mod
···
16
16
github.com/flosch/pongo2/v6 v6.0.0
17
17
github.com/go-redis/cache/v9 v9.0.0
18
18
github.com/goccy/go-json v0.10.2
19
+
github.com/gocql/gocql v0.0.0-00010101000000-000000000000
19
20
github.com/golang-jwt/jwt v3.2.2+incompatible
20
21
github.com/gorilla/websocket v1.5.1
21
22
github.com/hashicorp/go-retryablehttp v0.7.5
···
90
91
github.com/getsentry/sentry-go v0.27.0 // indirect
91
92
github.com/go-redis/redis v6.15.9+incompatible // indirect
92
93
github.com/golang/snappy v0.0.4 // indirect
94
+
github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed // indirect
93
95
github.com/hashicorp/golang-lru v1.0.2 // indirect
94
96
github.com/ipfs/go-log v1.0.5 // indirect
95
97
github.com/jackc/puddle/v2 v2.2.1 // indirect
···
106
108
github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect
107
109
github.com/whyrusleeping/cbor v0.0.0-20171005072247-63513f603b11 // indirect
108
110
golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa // indirect
111
+
gopkg.in/inf.v0 v0.9.1 // indirect
109
112
)
110
113
111
114
require (
···
152
155
github.com/lestrrat-go/option v1.0.1 // indirect
153
156
github.com/mattn/go-colorable v0.1.13 // indirect
154
157
github.com/mattn/go-isatty v0.0.20 // indirect
155
-
github.com/mattn/go-sqlite3 v1.14.22 // indirect
158
+
github.com/mattn/go-sqlite3 v1.14.22
156
159
github.com/multiformats/go-base32 v0.1.0 // indirect
157
160
github.com/multiformats/go-base36 v0.2.0 // indirect
158
161
github.com/multiformats/go-multibase v0.2.0 // indirect
···
188
191
gopkg.in/yaml.v3 v3.0.1 // indirect
189
192
lukechampine.com/blake3 v1.2.1 // indirect
190
193
)
194
+
195
+
replace github.com/gocql/gocql => github.com/scylladb/gocql v1.14.4
+14
go.sum
···
73
73
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
74
74
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
75
75
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
76
+
github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932 h1:mXoPYz/Ul5HYEDvkta6I8/rnYM5gSdSV2tJ6XbZuEtY=
77
+
github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932/go.mod h1:NOuUCSz6Q9T7+igc/hlvDOUdtWKryOrtFyIVABv/p7k=
78
+
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4YnC6+E63dPcxHo2sUxDIu8g3QgEJdRY=
79
+
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4=
76
80
github.com/bradfitz/gomemcache v0.0.0-20230905024940-24af94b03874 h1:N7oVaKyGp8bttX0bfZGmcGkjz7DLQXhAn3DNd3T0ous=
77
81
github.com/bradfitz/gomemcache v0.0.0-20230905024940-24af94b03874/go.mod h1:r5xuitiExdLAJ09PR7vBVENGvp4ZuTBeWTGtxuX3K+c=
78
82
github.com/brianvoe/gofakeit/v6 v6.25.0 h1:ZpFjktOpLZUeF8q223o0rUuXtA+m5qW5srjvVi+JkXk=
···
211
215
github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
212
216
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
213
217
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
218
+
github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
214
219
github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
215
220
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
216
221
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
···
255
260
github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZHL0uE0tcaY=
256
261
github.com/grpc-ecosystem/grpc-gateway/v2 v2.18.1 h1:6UKoz5ujsI55KNpsJH3UwCq3T8kKbZwNZBNPuTTje8U=
257
262
github.com/grpc-ecosystem/grpc-gateway/v2 v2.18.1/go.mod h1:YvJ2f6MplWDhfxiUC3KpyTy76kYUZA4W3pTv/wdKQ9Y=
263
+
github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed h1:5upAirOpQc1Q53c0bnx2ufif5kANL7bfZWcc6VJWJd8=
264
+
github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed/go.mod h1:tMWxXQ9wFIaZeTI9F+hmhFiGpFmhOHzyShyFUhRm0H4=
258
265
github.com/hashicorp/go-cleanhttp v0.5.2 h1:035FKYIWjmULyFRBKPs8TBQoi0x6d9G4xc9neXJWAZQ=
259
266
github.com/hashicorp/go-cleanhttp v0.5.2/go.mod h1:kO/YDlP8L1346E6Sodw+PrpBSV4/SoxCXGY6BqNFT48=
260
267
github.com/hashicorp/go-hclog v0.9.2 h1:CG6TE5H9/JXsFWJCfoIVpKFIkFe6ysEuHirp4DxCsHI=
···
592
599
github.com/samber/lo v1.38.1/go.mod h1:+m/ZKRl6ClXCE2Lgf3MsQlWfh4bn1bz6CXEOxnEXnEA=
593
600
github.com/samber/slog-echo v1.8.0 h1:DQQRtAliSvQw+ScEdu5gv3jbHu9cCTzvHuTD8GDv7zI=
594
601
github.com/samber/slog-echo v1.8.0/go.mod h1:0ab2AwcciQXNAXEcjkHwD9okOh9vEHEYn8xP97ocuhM=
602
+
github.com/scylladb/gocql v1.14.4 h1:MhevwCfyAraQ6RvZYFO3pF4Lt0YhvQlfg8Eo2HEqVQA=
603
+
github.com/scylladb/gocql v1.14.4/go.mod h1:ZLEJ0EVE5JhmtxIW2stgHq/v1P4fWap0qyyXSKyV8K0=
595
604
github.com/scylladb/termtables v0.0.0-20191203121021-c4c0b6d42ff4/go.mod h1:C1a7PQSMz9NShzorzCiG2fk9+xuCgLkPeCvMHYR2OWg=
596
605
github.com/segmentio/asm v1.2.0 h1:9BQrFxC+YOHJlTlHGkTrFWf59nbL3XnCoFLTwDCI7ys=
597
606
github.com/segmentio/asm v1.2.0/go.mod h1:BqMnlJP91P8d+4ibuonYZw9mfnzI9HfxselHZr5aAcs=
···
808
817
golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
809
818
golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
810
819
golang.org/x/net v0.0.0-20220425223048-2871e0cb64e4/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
820
+
golang.org/x/net v0.0.0-20220526153639-5463443f8c37/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
811
821
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
812
822
golang.org/x/net v0.1.0/go.mod h1:Cx3nUiGt4eDBEyega/BKRp+/AlGL8hYe7U9odMt2Cco=
813
823
golang.org/x/net v0.2.0/go.mod h1:KqCZLdyyvdV855qA2rE3GC2aiw5xGR5TEjj8smXukLY=
···
1092
1102
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
1093
1103
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
1094
1104
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
1105
+
gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc=
1106
+
gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw=
1095
1107
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
1096
1108
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
1097
1109
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
···
1126
1138
rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
1127
1139
rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0=
1128
1140
rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA=
1141
+
sigs.k8s.io/yaml v1.3.0 h1:a2VclLzOGrwOHDiV8EfBGhvjHvP46CtW5j6POvhYGGo=
1142
+
sigs.k8s.io/yaml v1.3.0/go.mod h1:GeOyir5tyXNByN85N/dRIT9es5UQNerPYEKK56eTBm8=
+14
-6
indexer/crawler.go
···
14
14
)
15
15
16
16
type CrawlDispatcher struct {
17
+
// from Crawl()
17
18
ingest chan *models.ActorInfo
18
19
20
+
// from AddToCatchupQueue()
21
+
catchup chan *crawlWork
22
+
23
+
// from main loop to fetchWorker()
19
24
repoSync chan *crawlWork
20
25
21
-
catchup chan *crawlWork
22
-
23
26
complete chan models.Uid
24
27
25
28
maplk sync.Mutex
26
29
todo map[models.Uid]*crawlWork
27
30
inProgress map[models.Uid]*crawlWork
28
31
29
-
doRepoCrawl func(context.Context, *crawlWork) error
32
+
repoFetcher CrawlRepoFetcher
30
33
31
34
concurrency int
32
35
···
35
38
done chan struct{}
36
39
}
37
40
38
-
func NewCrawlDispatcher(repoFn func(context.Context, *crawlWork) error, concurrency int, log *slog.Logger) (*CrawlDispatcher, error) {
41
+
// CrawlRepoFetcher is the subset of RepoFetcher behavior that CrawlDispatcher needs.
42
+
type CrawlRepoFetcher interface {
43
+
FetchAndIndexRepo(ctx context.Context, job *crawlWork) error
44
+
}
45
+
46
+
func NewCrawlDispatcher(repoFetcher CrawlRepoFetcher, concurrency int, log *slog.Logger) (*CrawlDispatcher, error) {
39
47
if concurrency < 1 {
40
48
return nil, fmt.Errorf("must specify a non-zero positive integer for crawl dispatcher concurrency")
41
49
}
···
45
53
repoSync: make(chan *crawlWork),
46
54
complete: make(chan models.Uid),
47
55
catchup: make(chan *crawlWork),
48
-
doRepoCrawl: repoFn,
56
+
repoFetcher: repoFetcher,
49
57
concurrency: concurrency,
50
58
todo: make(map[models.Uid]*crawlWork),
51
59
inProgress: make(map[models.Uid]*crawlWork),
···
221
229
for {
222
230
select {
223
231
case job := <-c.repoSync:
224
-
if err := c.doRepoCrawl(context.TODO(), job); err != nil {
232
+
if err := c.repoFetcher.FetchAndIndexRepo(context.TODO(), job); err != nil {
225
233
c.log.Error("failed to perform repo crawl", "did", job.act.Did, "err", err)
226
234
}
227
235
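Because crawlWork is unexported, anything satisfying the new CrawlRepoFetcher interface has to live inside the indexer package; in production that is presumably the existing RepoFetcher, whose FetchAndIndexRepo is touched in repofetch.go below. A minimal in-package stub, e.g. for tests, might look like this (names are hypothetical):

```go
// Hypothetical stub satisfying CrawlRepoFetcher for tests inside package indexer.
type nopFetcher struct{}

func (nopFetcher) FetchAndIndexRepo(ctx context.Context, job *crawlWork) error {
	// pretend the repo identified by job was fetched and indexed
	return nil
}
```

It would be wired up the same way the real fetcher is, e.g. `NewCrawlDispatcher(nopFetcher{}, 4, slog.Default())`.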
+1
-1
indexer/indexer.go
+3
-1
indexer/repofetch.go
···
141
141
}
142
142
}
143
143
144
+
revp := &rev
144
145
if rev == "" {
145
146
span.SetAttributes(attribute.Bool("full", true))
147
+
revp = nil
146
148
}
147
149
148
150
c := models.ClientForPds(&pds)
···
153
155
return err
154
156
}
155
157
156
-
if err := rf.repoman.ImportNewRepo(ctx, ai.Uid, ai.Did, bytes.NewReader(repo), &rev); err != nil {
158
+
if err := rf.repoman.ImportNewRepo(ctx, ai.Uid, ai.Did, bytes.NewReader(repo), revp); err != nil {
157
159
span.RecordError(err)
158
160
159
161
if ipld.IsNotFound(err) || errors.Is(err, io.EOF) || errors.Is(err, fs.ErrNotExist) {
+13
models/dbcid.go
···
4
4
"database/sql/driver"
5
5
"encoding/json"
6
6
"fmt"
7
+
"github.com/gocql/gocql"
7
8
8
9
"github.com/ipfs/go-cid"
9
10
)
···
62
63
func (dbc *DbCID) GormDataType() string {
63
64
return "bytes"
64
65
}
66
+
67
+
func (dbc *DbCID) MarshalCQL(info gocql.TypeInfo) ([]byte, error) {
68
+
return dbc.CID.Bytes(), nil
69
+
}
70
+
func (dbc *DbCID) UnmarshalCQL(info gocql.TypeInfo, data []byte) error {
71
+
xcid, err := cid.Cast(data)
72
+
if err != nil {
73
+
return err
74
+
}
75
+
dbc.CID = xcid
76
+
return nil
77
+
}
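A quick round-trip sketch for the new CQL hooks (test-style; assumes the go-multihash package for building a CID, and passes nil for the gocql.TypeInfo parameter, which these methods ignore):

```go
// Sketch: DbCID -> CQL bytes -> DbCID round trip.
func exampleDbCIDRoundTrip() error {
	mh, err := multihash.Sum([]byte("example block"), multihash.SHA2_256, -1)
	if err != nil {
		return err
	}
	orig := DbCID{CID: cid.NewCidV1(cid.Raw, mh)}
	raw, err := orig.MarshalCQL(nil) // TypeInfo is unused above, so nil is fine here
	if err != nil {
		return err
	}
	var back DbCID
	if err := back.UnmarshalCQL(nil, raw); err != nil {
		return err
	}
	if !back.CID.Equals(orig.CID) {
		return fmt.Errorf("round trip mismatch: %s != %s", back.CID, orig.CID)
	}
	return nil
}
```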
+4
-4
repo/repo.go
···
80
80
81
81
br, err := car.NewBlockReader(r)
82
82
if err != nil {
83
-
return cid.Undef, err
83
+
return cid.Undef, fmt.Errorf("IngestRepo:NewBlockReader: %w", err)
84
84
}
85
85
86
86
for {
···
89
89
if err == io.EOF {
90
90
break
91
91
}
92
-
return cid.Undef, err
92
+
return cid.Undef, fmt.Errorf("IngestRepo:Next: %w", err)
93
93
}
94
94
95
95
if err := bs.Put(ctx, blk); err != nil {
96
-
return cid.Undef, err
96
+
return cid.Undef, fmt.Errorf("IngestRepo:Put: %w", err)
97
97
}
98
98
}
99
99
···
104
104
bs := blockstore.NewBlockstore(datastore.NewMapDatastore())
105
105
root, err := IngestRepo(ctx, bs, r)
106
106
if err != nil {
107
-
return nil, err
107
+
return nil, fmt.Errorf("ReadRepoFromCar:IngestRepo: %w", err)
108
108
}
109
109
110
110
return OpenRepo(ctx, bs, root)
+3
repomgr/repomgr.go
+8
-6
testing/utils.go
···
210
210
}
211
211
212
212
limReqBody := bgs.RateLimitChangeRequest{
213
-
Host: u.Host,
214
-
PerSecond: 5_000,
215
-
PerHour: 100_000,
216
-
PerDay: 1_000_000,
217
-
RepoLimit: 500_000,
218
-
CrawlRate: 50_000,
213
+
Host: u.Host,
214
+
PDSRates: bgs.PDSRates{
215
+
PerSecond: 5_000,
216
+
PerHour: 100_000,
217
+
PerDay: 1_000_000,
218
+
RepoLimit: 500_000,
219
+
CrawlRate: 50_000,
220
+
},
219
221
}
220
222
221
223
// JSON encode the request body
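One detail worth noting here (plain encoding/json behavior, nothing specific to this change): because PDSRates is embedded without a field name or tag, its fields marshal at the top level of the request object, so the body produced by the JSON-encoding step that follows keeps the same flat shape the admin rate-limit endpoint accepted before this refactor.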