fork of indigo with slightly nicer lexgen
at main 18 kB view raw
1//go:build scylla 2 3package carstore 4 5import ( 6 "bytes" 7 "context" 8 "errors" 9 "fmt" 10 "github.com/bluesky-social/indigo/models" 11 "github.com/gocql/gocql" 12 blockformat "github.com/ipfs/go-block-format" 13 "github.com/ipfs/go-cid" 14 "github.com/ipfs/go-libipfs/blocks" 15 "github.com/ipld/go-car" 16 _ "github.com/mattn/go-sqlite3" 17 "github.com/prometheus/client_golang/prometheus" 18 "github.com/prometheus/client_golang/prometheus/promauto" 19 "go.opentelemetry.io/otel" 20 "go.opentelemetry.io/otel/attribute" 21 "io" 22 "log/slog" 23 "math" 24 "math/rand/v2" 25 "time" 26) 27 28type ScyllaStore struct { 29 WriteSession *gocql.Session 30 ReadSession *gocql.Session 31 32 // scylla servers 33 scyllaAddrs []string 34 // scylla namespace where we find our table 35 keyspace string 36 37 log *slog.Logger 38 39 lastShardCache lastShardCache 40} 41 42func NewScyllaStore(addrs []string, keyspace string) (*ScyllaStore, error) { 43 out := new(ScyllaStore) 44 out.scyllaAddrs = addrs 45 out.keyspace = keyspace 46 err := out.Open() 47 if err != nil { 48 return nil, err 49 } 50 return out, nil 51} 52 53func (sqs *ScyllaStore) Open() error { 54 if sqs.log == nil { 55 sqs.log = slog.Default() 56 } 57 sqs.log.Debug("scylla connect", "addrs", sqs.scyllaAddrs) 58 var err error 59 60 // 61 // Write session 62 // 63 var writeSession *gocql.Session 64 for retry := 0; ; retry++ { 65 writeCluster := gocql.NewCluster(sqs.scyllaAddrs...) 66 writeCluster.Keyspace = sqs.keyspace 67 // Default port, the client should automatically upgrade to shard-aware port 68 writeCluster.Port = 9042 69 writeCluster.Consistency = gocql.Quorum 70 writeCluster.RetryPolicy = &ExponentialBackoffRetryPolicy{NumRetries: 10, Min: 100 * time.Millisecond, Max: 10 * time.Second} 71 writeCluster.PoolConfig.HostSelectionPolicy = gocql.TokenAwareHostPolicy(gocql.RoundRobinHostPolicy()) 72 writeSession, err = writeCluster.CreateSession() 73 if err != nil { 74 if retry > 200 { 75 return fmt.Errorf("failed to connect read session too many times: %w", err) 76 } 77 sqs.log.Error("failed to connect to ScyllaDB Read Session, retrying in 1s", "retry", retry, "err", err) 78 time.Sleep(delayForAttempt(retry)) 79 continue 80 } 81 break 82 } 83 84 // 85 // Read session 86 // 87 var readSession *gocql.Session 88 for retry := 0; ; retry++ { 89 readCluster := gocql.NewCluster(sqs.scyllaAddrs...) 90 readCluster.Keyspace = sqs.keyspace 91 // Default port, the client should automatically upgrade to shard-aware port 92 readCluster.Port = 9042 93 readCluster.RetryPolicy = &ExponentialBackoffRetryPolicy{NumRetries: 5, Min: 10 * time.Millisecond, Max: 1 * time.Second} 94 readCluster.Consistency = gocql.One 95 readCluster.PoolConfig.HostSelectionPolicy = gocql.TokenAwareHostPolicy(gocql.RoundRobinHostPolicy()) 96 readSession, err = readCluster.CreateSession() 97 if err != nil { 98 if retry > 200 { 99 return fmt.Errorf("failed to connect read session too many times: %w", err) 100 } 101 sqs.log.Error("failed to connect to ScyllaDB Read Session, retrying in 1s", "retry", retry, "err", err) 102 time.Sleep(delayForAttempt(retry)) 103 continue 104 } 105 break 106 } 107 108 sqs.WriteSession = writeSession 109 sqs.ReadSession = readSession 110 111 err = sqs.createTables() 112 if err != nil { 113 return fmt.Errorf("scylla could not create tables, %w", err) 114 } 115 sqs.lastShardCache.source = sqs 116 sqs.lastShardCache.Init() 117 return nil 118} 119 120var createTableTexts = []string{ 121 `CREATE TABLE IF NOT EXISTS blocks (uid bigint, cid blob, rev varchar, root blob, block blob, PRIMARY KEY((uid,cid)))`, 122 // This is the INDEX I wish we could use, but scylla can't do it so we MATERIALIZED VIEW instead 123 //`CREATE INDEX IF NOT EXISTS block_by_rev ON blocks (uid, rev)`, 124 `CREATE MATERIALIZED VIEW IF NOT EXISTS blocks_by_uidrev 125AS SELECT uid, rev, cid, root 126FROM blocks 127WHERE uid IS NOT NULL AND rev IS NOT NULL AND cid IS NOT NULL 128PRIMARY KEY ((uid), rev, cid) WITH CLUSTERING ORDER BY (rev DESC)`, 129} 130 131func (sqs *ScyllaStore) createTables() error { 132 for i, text := range createTableTexts { 133 err := sqs.WriteSession.Query(text).Exec() 134 if err != nil { 135 return fmt.Errorf("scylla create table statement [%d] %v: %w", i, text, err) 136 } 137 } 138 return nil 139} 140 141// writeNewShard needed for DeltaSession.CloseWithRoot 142func (sqs *ScyllaStore) writeNewShard(ctx context.Context, root cid.Cid, rev string, user models.Uid, seq int, blks map[cid.Cid]blockformat.Block, rmcids map[cid.Cid]bool) ([]byte, error) { 143 scWriteNewShard.Inc() 144 sqs.log.Debug("write shard", "uid", user, "root", root, "rev", rev, "nblocks", len(blks)) 145 start := time.Now() 146 ctx, span := otel.Tracer("carstore").Start(ctx, "writeNewShard") 147 defer span.End() 148 buf := new(bytes.Buffer) 149 hnw, err := WriteCarHeader(buf, root) 150 if err != nil { 151 return nil, fmt.Errorf("failed to write car header: %w", err) 152 } 153 offset := hnw 154 155 dbroot := root.Bytes() 156 157 span.SetAttributes(attribute.Int("blocks", len(blks))) 158 159 for bcid, block := range blks { 160 // build shard for output firehose 161 nw, err := LdWrite(buf, bcid.Bytes(), block.RawData()) 162 if err != nil { 163 return nil, fmt.Errorf("failed to write block: %w", err) 164 } 165 offset += nw 166 167 // TODO: scylla BATCH doesn't apply if the batch crosses partition keys; BUT, we may be able to send many blocks concurrently? 168 dbcid := bcid.Bytes() 169 blockbytes := block.RawData() 170 // we're relying on cql auto-prepare, no 'PreparedStatement' 171 err = sqs.WriteSession.Query( 172 `INSERT INTO blocks (uid, cid, rev, root, block) VALUES (?, ?, ?, ?, ?)`, 173 user, dbcid, rev, dbroot, blockbytes, 174 ).Idempotent(true).Exec() 175 if err != nil { 176 return nil, fmt.Errorf("(uid,cid) block store failed, %w", err) 177 } 178 sqs.log.Debug("put block", "uid", user, "cid", bcid, "size", len(blockbytes)) 179 } 180 181 shard := CarShard{ 182 Root: models.DbCID{CID: root}, 183 DataStart: hnw, 184 Seq: seq, 185 Usr: user, 186 Rev: rev, 187 } 188 189 sqs.lastShardCache.put(&shard) 190 191 dt := time.Since(start).Seconds() 192 scWriteTimes.Observe(dt) 193 return buf.Bytes(), nil 194} 195 196// GetLastShard nedeed for NewDeltaSession indirectly through lastShardCache 197// What we actually seem to need from this: last {Rev, Root.CID} 198func (sqs *ScyllaStore) GetLastShard(ctx context.Context, uid models.Uid) (*CarShard, error) { 199 scGetLastShard.Inc() 200 var rev string 201 var rootb []byte 202 err := sqs.ReadSession.Query(`SELECT rev, root FROM blocks_by_uidrev WHERE uid = ? ORDER BY rev DESC LIMIT 1`, uid).Scan(&rev, &rootb) 203 if errors.Is(err, gocql.ErrNotFound) { 204 return nil, nil 205 } 206 if err != nil { 207 return nil, fmt.Errorf("last shard err, %w", err) 208 } 209 xcid, cidErr := cid.Cast(rootb) 210 if cidErr != nil { 211 return nil, fmt.Errorf("last shard bad cid, %w", cidErr) 212 } 213 return &CarShard{ 214 Root: models.DbCID{CID: xcid}, 215 Rev: rev, 216 }, nil 217} 218 219func (sqs *ScyllaStore) CompactUserShards(ctx context.Context, user models.Uid, skipBigShards bool) (*CompactionStats, error) { 220 sqs.log.Warn("TODO: don't call compaction") 221 return nil, nil 222} 223 224func (sqs *ScyllaStore) GetCompactionTargets(ctx context.Context, shardCount int) ([]CompactionTarget, error) { 225 sqs.log.Warn("TODO: don't call compaction targets") 226 return nil, nil 227} 228 229func (sqs *ScyllaStore) GetUserRepoHead(ctx context.Context, user models.Uid) (cid.Cid, error) { 230 // TODO: same as FileCarStore; re-unify 231 lastShard, err := sqs.lastShardCache.get(ctx, user) 232 if err != nil { 233 return cid.Undef, err 234 } 235 if lastShard == nil { 236 return cid.Undef, nil 237 } 238 if lastShard.ID == 0 { 239 return cid.Undef, nil 240 } 241 242 return lastShard.Root.CID, nil 243} 244 245func (sqs *ScyllaStore) GetUserRepoRev(ctx context.Context, user models.Uid) (string, error) { 246 // TODO: same as FileCarStore; re-unify 247 lastShard, err := sqs.lastShardCache.get(ctx, user) 248 if err != nil { 249 return "", err 250 } 251 if lastShard == nil { 252 return "", nil 253 } 254 if lastShard.ID == 0 { 255 return "", nil 256 } 257 258 return lastShard.Rev, nil 259} 260 261func (sqs *ScyllaStore) ImportSlice(ctx context.Context, uid models.Uid, since *string, carslice []byte) (cid.Cid, *DeltaSession, error) { 262 // TODO: same as FileCarStore, re-unify 263 ctx, span := otel.Tracer("carstore").Start(ctx, "ImportSlice") 264 defer span.End() 265 266 carr, err := car.NewCarReader(bytes.NewReader(carslice)) 267 if err != nil { 268 return cid.Undef, nil, err 269 } 270 271 if len(carr.Header.Roots) != 1 { 272 return cid.Undef, nil, fmt.Errorf("invalid car file, header must have a single root (has %d)", len(carr.Header.Roots)) 273 } 274 275 ds, err := sqs.NewDeltaSession(ctx, uid, since) 276 if err != nil { 277 return cid.Undef, nil, fmt.Errorf("new delta session failed: %w", err) 278 } 279 280 var cids []cid.Cid 281 for { 282 blk, err := carr.Next() 283 if err != nil { 284 if err == io.EOF { 285 break 286 } 287 return cid.Undef, nil, err 288 } 289 290 cids = append(cids, blk.Cid()) 291 292 if err := ds.Put(ctx, blk); err != nil { 293 return cid.Undef, nil, err 294 } 295 } 296 297 return carr.Header.Roots[0], ds, nil 298} 299 300func (sqs *ScyllaStore) NewDeltaSession(ctx context.Context, user models.Uid, since *string) (*DeltaSession, error) { 301 ctx, span := otel.Tracer("carstore").Start(ctx, "NewSession") 302 defer span.End() 303 304 // TODO: ensure that we don't write updates on top of the wrong head 305 // this needs to be a compare and swap type operation 306 lastShard, err := sqs.lastShardCache.get(ctx, user) 307 if err != nil { 308 return nil, fmt.Errorf("NewDeltaSession, lsc, %w", err) 309 } 310 311 if lastShard == nil { 312 lastShard = &zeroShard 313 } 314 315 if since != nil && *since != lastShard.Rev { 316 return nil, fmt.Errorf("revision mismatch: %s != %s: %w", *since, lastShard.Rev, ErrRepoBaseMismatch) 317 } 318 319 return &DeltaSession{ 320 blks: make(map[cid.Cid]blockformat.Block), 321 base: &sqliteUserView{ 322 uid: user, 323 sqs: sqs, 324 }, 325 user: user, 326 baseCid: lastShard.Root.CID, 327 cs: sqs, 328 seq: lastShard.Seq + 1, 329 lastRev: lastShard.Rev, 330 }, nil 331} 332 333func (sqs *ScyllaStore) ReadOnlySession(user models.Uid) (*DeltaSession, error) { 334 return &DeltaSession{ 335 base: &sqliteUserView{ 336 uid: user, 337 sqs: sqs, 338 }, 339 readonly: true, 340 user: user, 341 cs: sqs, 342 }, nil 343} 344 345// ReadUserCar 346// incremental is only ever called true 347func (sqs *ScyllaStore) ReadUserCar(ctx context.Context, user models.Uid, sinceRev string, incremental bool, shardOut io.Writer) error { 348 scGetCar.Inc() 349 ctx, span := otel.Tracer("carstore").Start(ctx, "ReadUserCar") 350 defer span.End() 351 start := time.Now() 352 353 cidchan := make(chan cid.Cid, 100) 354 355 go func() { 356 defer close(cidchan) 357 cids := sqs.ReadSession.Query(`SELECT cid FROM blocks_by_uidrev WHERE uid = ? AND rev > ? ORDER BY rev DESC`, user, sinceRev).Iter() 358 defer cids.Close() 359 for { 360 var cidb []byte 361 ok := cids.Scan(&cidb) 362 if !ok { 363 break 364 } 365 xcid, cidErr := cid.Cast(cidb) 366 if cidErr != nil { 367 sqs.log.Warn("ReadUserCar bad cid", "err", cidErr) 368 continue 369 } 370 cidchan <- xcid 371 } 372 }() 373 nblocks := 0 374 first := true 375 for xcid := range cidchan { 376 var xrev string 377 var xroot []byte 378 var xblock []byte 379 err := sqs.ReadSession.Query("SELECT rev, root, block FROM blocks WHERE uid = ? AND cid = ? LIMIT 1", user, xcid.Bytes()).Scan(&xrev, &xroot, &xblock) 380 if err != nil { 381 return fmt.Errorf("rcar bad read, %w", err) 382 } 383 if first { 384 rootCid, cidErr := cid.Cast(xroot) 385 if cidErr != nil { 386 return fmt.Errorf("rcar bad rootcid, %w", err) 387 } 388 if err := car.WriteHeader(&car.CarHeader{ 389 Roots: []cid.Cid{rootCid}, 390 Version: 1, 391 }, shardOut); err != nil { 392 return fmt.Errorf("rcar bad header, %w", err) 393 } 394 first = false 395 } 396 nblocks++ 397 _, err = LdWrite(shardOut, xcid.Bytes(), xblock) 398 if err != nil { 399 return fmt.Errorf("rcar bad write, %w", err) 400 } 401 } 402 span.SetAttributes(attribute.Int("blocks", nblocks)) 403 sqs.log.Debug("read car", "nblocks", nblocks, "since", sinceRev) 404 scReadCarTimes.Observe(time.Since(start).Seconds()) 405 return nil 406} 407 408// Stat is only used in a debugging admin handler 409// don't bother implementing it (for now?) 410func (sqs *ScyllaStore) Stat(ctx context.Context, usr models.Uid) ([]UserStat, error) { 411 sqs.log.Warn("Stat debugging method not implemented for sqlite store") 412 return nil, nil 413} 414 415func (sqs *ScyllaStore) WipeUserData(ctx context.Context, user models.Uid) error { 416 ctx, span := otel.Tracer("carstore").Start(ctx, "WipeUserData") 417 defer span.End() 418 419 // LOL, can't do this if primary key is (uid,cid) because that's hashed with no scan! 420 //err := sqs.WriteSession.Query("DELETE FROM blocks WHERE uid = ?", user).Exec() 421 422 cidchan := make(chan cid.Cid, 100) 423 424 go func() { 425 defer close(cidchan) 426 cids := sqs.ReadSession.Query(`SELECT cid FROM blocks_by_uidrev WHERE uid = ?`, user).Iter() 427 defer cids.Close() 428 for { 429 var cidb []byte 430 ok := cids.Scan(&cidb) 431 if !ok { 432 break 433 } 434 xcid, cidErr := cid.Cast(cidb) 435 if cidErr != nil { 436 sqs.log.Warn("ReadUserCar bad cid", "err", cidErr) 437 continue 438 } 439 cidchan <- xcid 440 } 441 }() 442 nblocks := 0 443 errcount := 0 444 for xcid := range cidchan { 445 err := sqs.ReadSession.Query("DELETE FROM blocks WHERE uid = ? AND cid = ?", user, xcid.Bytes()).Exec() 446 if err != nil { 447 sqs.log.Warn("ReadUserCar bad delete", "err", err) 448 errcount++ 449 if errcount > 10 { 450 return fmt.Errorf("ReadUserCar bad delete: %w", err) 451 } 452 } 453 nblocks++ 454 } 455 scUsersWiped.Inc() 456 scBlocksDeleted.Add(float64(nblocks)) 457 return nil 458} 459 460// HasUidCid needed for NewDeltaSession userView 461func (sqs *ScyllaStore) HasUidCid(ctx context.Context, user models.Uid, bcid cid.Cid) (bool, error) { 462 // TODO: this is pretty cacheable? invalidate (uid,*) on WipeUserData 463 scHas.Inc() 464 var rev string 465 var rootb []byte 466 err := sqs.ReadSession.Query(`SELECT rev, root FROM blocks WHERE uid = ? AND cid = ? LIMIT 1`, user, bcid.Bytes()).Scan(&rev, &rootb) 467 if err != nil { 468 return false, fmt.Errorf("hasUC bad scan, %w", err) 469 } 470 return true, nil 471} 472 473func (sqs *ScyllaStore) CarStore() CarStore { 474 return sqs 475} 476 477func (sqs *ScyllaStore) Close() error { 478 sqs.WriteSession.Close() 479 sqs.ReadSession.Close() 480 return nil 481} 482 483func (sqs *ScyllaStore) getBlock(ctx context.Context, user models.Uid, bcid cid.Cid) (blockformat.Block, error) { 484 // TODO: this is pretty cacheable? invalidate (uid,*) on WipeUserData 485 scGetBlock.Inc() 486 start := time.Now() 487 var blockb []byte 488 err := sqs.ReadSession.Query("SELECT block FROM blocks WHERE uid = ? AND cid = ? LIMIT 1", user, bcid.Bytes()).Scan(&blockb) 489 if err != nil { 490 return nil, fmt.Errorf("getb err, %w", err) 491 } 492 dt := time.Since(start) 493 scGetTimes.Observe(dt.Seconds()) 494 return blocks.NewBlock(blockb), nil 495} 496 497func (sqs *ScyllaStore) getBlockSize(ctx context.Context, user models.Uid, bcid cid.Cid) (int64, error) { 498 // TODO: this is pretty cacheable? invalidate (uid,*) on WipeUserData 499 scGetBlockSize.Inc() 500 var out int64 501 err := sqs.ReadSession.Query("SELECT length(block) FROM blocks WHERE uid = ? AND cid = ? LIMIT 1", user, bcid.Bytes()).Scan(&out) 502 if err != nil { 503 return 0, fmt.Errorf("getbs err, %w", err) 504 } 505 return out, nil 506} 507 508var scUsersWiped = promauto.NewCounter(prometheus.CounterOpts{ 509 Name: "bgs_sc_users_wiped", 510 Help: "User rows deleted in scylla backend", 511}) 512 513var scBlocksDeleted = promauto.NewCounter(prometheus.CounterOpts{ 514 Name: "bgs_sc_blocks_deleted", 515 Help: "User blocks deleted in scylla backend", 516}) 517 518var scGetBlock = promauto.NewCounter(prometheus.CounterOpts{ 519 Name: "bgs_sc_get_block", 520 Help: "get block scylla backend", 521}) 522 523var scGetBlockSize = promauto.NewCounter(prometheus.CounterOpts{ 524 Name: "bgs_sc_get_block_size", 525 Help: "get block size scylla backend", 526}) 527 528var scGetCar = promauto.NewCounter(prometheus.CounterOpts{ 529 Name: "bgs_sc_get_car", 530 Help: "get block scylla backend", 531}) 532 533var scHas = promauto.NewCounter(prometheus.CounterOpts{ 534 Name: "bgs_sc_has", 535 Help: "check block presence scylla backend", 536}) 537 538var scGetLastShard = promauto.NewCounter(prometheus.CounterOpts{ 539 Name: "bgs_sc_get_last_shard", 540 Help: "get last shard scylla backend", 541}) 542 543var scWriteNewShard = promauto.NewCounter(prometheus.CounterOpts{ 544 Name: "bgs_sc_write_shard", 545 Help: "write shard blocks scylla backend", 546}) 547 548var timeBuckets []float64 549var scWriteTimes prometheus.Histogram 550var scGetTimes prometheus.Histogram 551var scReadCarTimes prometheus.Histogram 552 553func init() { 554 timeBuckets = make([]float64, 1, 20) 555 timeBuckets[0] = 0.000_0100 556 i := 0 557 for timeBuckets[i] < 1 && len(timeBuckets) < 20 { 558 timeBuckets = append(timeBuckets, timeBuckets[i]*2) 559 i++ 560 } 561 scWriteTimes = promauto.NewHistogram(prometheus.HistogramOpts{ 562 Name: "bgs_sc_write_times", 563 Buckets: timeBuckets, 564 }) 565 scGetTimes = promauto.NewHistogram(prometheus.HistogramOpts{ 566 Name: "bgs_sc_get_times", 567 Buckets: timeBuckets, 568 }) 569 scReadCarTimes = promauto.NewHistogram(prometheus.HistogramOpts{ 570 Name: "bgs_sc_readcar_times", 571 Buckets: timeBuckets, 572 }) 573} 574 575// TODO: copied from tango, re-unify? 576// ExponentialBackoffRetryPolicy sleeps between attempts 577type ExponentialBackoffRetryPolicy struct { 578 NumRetries int 579 Min, Max time.Duration 580} 581 582func (e *ExponentialBackoffRetryPolicy) napTime(attempts int) time.Duration { 583 return getExponentialTime(e.Min, e.Max, attempts) 584} 585 586func (e *ExponentialBackoffRetryPolicy) Attempt(q gocql.RetryableQuery) bool { 587 if q.Attempts() > e.NumRetries { 588 return false 589 } 590 time.Sleep(e.napTime(q.Attempts())) 591 return true 592} 593 594// used to calculate exponentially growing time 595func getExponentialTime(min time.Duration, max time.Duration, attempts int) time.Duration { 596 if min <= 0 { 597 min = 100 * time.Millisecond 598 } 599 if max <= 0 { 600 max = 10 * time.Second 601 } 602 minFloat := float64(min) 603 napDuration := minFloat * math.Pow(2, float64(attempts-1)) 604 // add some jitter 605 napDuration += rand.Float64()*minFloat - (minFloat / 2) 606 if napDuration > float64(max) { 607 return time.Duration(max) 608 } 609 return time.Duration(napDuration) 610} 611 612// GetRetryType returns the retry type for the given error 613func (e *ExponentialBackoffRetryPolicy) GetRetryType(err error) gocql.RetryType { 614 // Retry timeouts and/or contention errors on the same host 615 if errors.Is(err, gocql.ErrTimeoutNoResponse) || 616 errors.Is(err, gocql.ErrNoStreams) || 617 errors.Is(err, gocql.ErrTooManyTimeouts) { 618 return gocql.Retry 619 } 620 621 // Retry next host on unavailable errors 622 if errors.Is(err, gocql.ErrUnavailable) || 623 errors.Is(err, gocql.ErrConnectionClosed) || 624 errors.Is(err, gocql.ErrSessionClosed) { 625 return gocql.RetryNextHost 626 } 627 628 // Otherwise don't retry 629 return gocql.Rethrow 630} 631 632func delayForAttempt(attempt int) time.Duration { 633 if attempt < 50 { 634 return time.Millisecond * 5 635 } 636 637 return time.Second 638}