An atproto PDS written in Go

Compare changes

Choose any two refs to compare.

Changed files
+46 -74
internal
db
server
sqlite_blockstore
+3 -21
internal/db/db.go
··· 2 3 import ( 4 "context" 5 - "sync" 6 7 "gorm.io/gorm" 8 "gorm.io/gorm/clause" ··· 10 11 type DB struct { 12 cli *gorm.DB 13 - mu sync.Mutex 14 } 15 16 func NewDB(cli *gorm.DB) *DB { 17 return &DB{ 18 cli: cli, 19 - mu: sync.Mutex{}, 20 } 21 } 22 23 func (db *DB) Create(ctx context.Context, value any, clauses []clause.Expression) *gorm.DB { 24 - db.mu.Lock() 25 - defer db.mu.Unlock() 26 return db.cli.WithContext(ctx).Clauses(clauses...).Create(value) 27 } 28 29 func (db *DB) Save(ctx context.Context, value any, clauses []clause.Expression) *gorm.DB { 30 - db.mu.Lock() 31 - defer db.mu.Unlock() 32 return db.cli.WithContext(ctx).Clauses(clauses...).Save(value) 33 } 34 35 func (db *DB) Exec(ctx context.Context, sql string, clauses []clause.Expression, values ...any) *gorm.DB { 36 - db.mu.Lock() 37 - defer db.mu.Unlock() 38 return db.cli.WithContext(ctx).Clauses(clauses...).Exec(sql, values...) 39 } 40 ··· 47 } 48 49 func (db *DB) Delete(ctx context.Context, value any, clauses []clause.Expression) *gorm.DB { 50 - db.mu.Lock() 51 - defer db.mu.Unlock() 52 return db.cli.WithContext(ctx).Clauses(clauses...).Delete(value) 53 } 54 ··· 56 return db.cli.WithContext(ctx).First(dest, conds...) 57 } 58 59 - // TODO: this isn't actually good. we can commit even if the db is locked here. this is probably okay for the time being, but need to figure 60 - // out a better solution. right now we only do this whenever we're importing a repo though so i'm mostly not worried, but it's still bad. 61 - // e.g. when we do apply writes we should also be using a transcation but we don't right now 62 - func (db *DB) BeginDangerously(ctx context.Context) *gorm.DB { 63 return db.cli.WithContext(ctx).Begin() 64 } 65 66 - func (db *DB) Lock() { 67 - db.mu.Lock() 68 - } 69 - 70 - func (db *DB) Unlock() { 71 - db.mu.Unlock() 72 }
··· 2 3 import ( 4 "context" 5 6 "gorm.io/gorm" 7 "gorm.io/gorm/clause" ··· 9 10 type DB struct { 11 cli *gorm.DB 12 } 13 14 func NewDB(cli *gorm.DB) *DB { 15 return &DB{ 16 cli: cli, 17 } 18 } 19 20 func (db *DB) Create(ctx context.Context, value any, clauses []clause.Expression) *gorm.DB { 21 return db.cli.WithContext(ctx).Clauses(clauses...).Create(value) 22 } 23 24 func (db *DB) Save(ctx context.Context, value any, clauses []clause.Expression) *gorm.DB { 25 return db.cli.WithContext(ctx).Clauses(clauses...).Save(value) 26 } 27 28 func (db *DB) Exec(ctx context.Context, sql string, clauses []clause.Expression, values ...any) *gorm.DB { 29 return db.cli.WithContext(ctx).Clauses(clauses...).Exec(sql, values...) 30 } 31 ··· 38 } 39 40 func (db *DB) Delete(ctx context.Context, value any, clauses []clause.Expression) *gorm.DB { 41 return db.cli.WithContext(ctx).Clauses(clauses...).Delete(value) 42 } 43 ··· 45 return db.cli.WithContext(ctx).First(dest, conds...) 46 } 47 48 + func (db *DB) Begin(ctx context.Context) *gorm.DB { 49 return db.cli.WithContext(ctx).Begin() 50 } 51 52 + func (db *DB) Client() *gorm.DB { 53 + return db.cli 54 }
+1 -1
server/handle_import_repo.go
··· 66 return helpers.ServerError(e, nil) 67 } 68 69 - tx := s.db.BeginDangerously(ctx) 70 71 clock := syntax.NewTIDClock(0) 72
··· 66 return helpers.ServerError(e, nil) 67 } 68 69 + tx := s.db.Begin(ctx) 70 71 clock := syntax.NewTIDClock(0) 72
+1 -1
server/handle_server_delete_account.go
··· 69 }) 70 } 71 72 - tx := s.db.BeginDangerously(ctx) 73 if tx.Error != nil { 74 logger.Error("error starting transaction", "error", tx.Error) 75 return helpers.ServerError(e, nil)
··· 69 }) 70 } 71 72 + tx := s.db.Begin(ctx) 73 if tx.Error != nil { 74 logger.Error("error starting transaction", "error", tx.Error) 75 return helpers.ServerError(e, nil)
+40 -50
server/server.go
··· 322 if err != nil { 323 return nil, fmt.Errorf("failed to open sqlite database: %w", err) 324 } 325 logger.Info("connected to SQLite database", "path", args.DbName) 326 } 327 dbw := db.NewDB(gdb) ··· 625 626 logger.Info("beginning backup to s3...") 627 628 - var buf bytes.Buffer 629 - if err := func() error { 630 - logger.Info("reading database bytes...") 631 - s.db.Lock() 632 - defer s.db.Unlock() 633 - 634 - sf, err := os.Open(s.dbName) 635 - if err != nil { 636 - return fmt.Errorf("error opening database for backup: %w", err) 637 - } 638 - defer sf.Close() 639 - 640 - if _, err := io.Copy(&buf, sf); err != nil { 641 - return fmt.Errorf("error reading bytes of backup db: %w", err) 642 - } 643 644 - return nil 645 - }(); err != nil { 646 - logger.Error("error backing up database", "error", err) 647 return 648 } 649 650 - if err := func() error { 651 - logger.Info("sending to s3...") 652 - 653 - currTime := time.Now().Format("2006-01-02_15-04-05") 654 - key := "cocoon-backup-" + currTime + ".db" 655 656 - config := &aws.Config{ 657 - Region: aws.String(s.s3Config.Region), 658 - Credentials: credentials.NewStaticCredentials(s.s3Config.AccessKey, s.s3Config.SecretKey, ""), 659 - } 660 661 - if s.s3Config.Endpoint != "" { 662 - config.Endpoint = aws.String(s.s3Config.Endpoint) 663 - config.S3ForcePathStyle = aws.Bool(true) 664 - } 665 666 - sess, err := session.NewSession(config) 667 - if err != nil { 668 - return err 669 - } 670 671 - svc := s3.New(sess) 672 673 - if _, err := svc.PutObject(&s3.PutObjectInput{ 674 - Bucket: aws.String(s.s3Config.Bucket), 675 - Key: aws.String(key), 676 - Body: bytes.NewReader(buf.Bytes()), 677 - }); err != nil { 678 - return fmt.Errorf("error uploading file to s3: %w", err) 679 - } 680 681 - logger.Info("finished uploading backup to s3", "key", key, "duration", time.Now().Sub(start).Seconds()) 682 683 - return nil 684 - }(); err != nil { 685 - logger.Error("error uploading database backup", "error", err) 686 return 687 } 688 689 - os.WriteFile("last-backup.txt", []byte(time.Now().String()), 0644) 690 } 691 692 func (s *Server) backupRoutine() { ··· 721 if err != nil { 722 shouldBackupNow = true 723 } else { 724 - lastBackup, err := time.Parse("2006-01-02 15:04:05.999999999 -0700 MST", string(lastBackupStr)) 725 if err != nil { 726 shouldBackupNow = true 727 - } else if time.Now().Sub(lastBackup).Seconds() > 3600 { 728 shouldBackupNow = true 729 } 730 }
··· 322 if err != nil { 323 return nil, fmt.Errorf("failed to open sqlite database: %w", err) 324 } 325 + gdb.Exec("PRAGMA journal_mode=WAL") 326 + gdb.Exec("PRAGMA synchronous=NORMAL") 327 + 328 logger.Info("connected to SQLite database", "path", args.DbName) 329 } 330 dbw := db.NewDB(gdb) ··· 628 629 logger.Info("beginning backup to s3...") 630 631 + tmpFile := fmt.Sprintf("/tmp/cocoon-backup-%s.db", time.Now().Format(time.RFC3339Nano)) 632 + defer os.Remove(tmpFile) 633 634 + if err := s.db.Client().Exec(fmt.Sprintf("VACUUM INTO '%s'", tmpFile)).Error; err != nil { 635 + logger.Error("error creating tmp backup file", "err", err) 636 return 637 } 638 639 + backupData, err := os.ReadFile(tmpFile) 640 + if err != nil { 641 + logger.Error("error reading tmp backup file", "err", err) 642 + return 643 + } 644 645 + logger.Info("sending to s3...") 646 647 + currTime := time.Now().Format("2006-01-02_15-04-05") 648 + key := "cocoon-backup-" + currTime + ".db" 649 650 + config := &aws.Config{ 651 + Region: aws.String(s.s3Config.Region), 652 + Credentials: credentials.NewStaticCredentials(s.s3Config.AccessKey, s.s3Config.SecretKey, ""), 653 + } 654 655 + if s.s3Config.Endpoint != "" { 656 + config.Endpoint = aws.String(s.s3Config.Endpoint) 657 + config.S3ForcePathStyle = aws.Bool(true) 658 + } 659 660 + sess, err := session.NewSession(config) 661 + if err != nil { 662 + logger.Error("error creating s3 session", "err", err) 663 + return 664 + } 665 666 + svc := s3.New(sess) 667 668 + if _, err := svc.PutObject(&s3.PutObjectInput{ 669 + Bucket: aws.String(s.s3Config.Bucket), 670 + Key: aws.String(key), 671 + Body: bytes.NewReader(backupData), 672 + }); err != nil { 673 + logger.Error("error uploading file to s3", "err", err) 674 return 675 } 676 677 + logger.Info("finished uploading backup to s3", "key", key, "duration", time.Since(start).Seconds()) 678 + 679 + os.WriteFile("last-backup.txt", []byte(time.Now().Format(time.RFC3339Nano)), 0644) 680 } 681 682 func (s *Server) backupRoutine() { ··· 711 if err != nil { 712 shouldBackupNow = true 713 } else { 714 + lastBackup, err := time.Parse(time.RFC3339Nano, string(lastBackupStr)) 715 if err != nil { 716 shouldBackupNow = true 717 + } else if time.Since(lastBackup).Seconds() > 3600 { 718 shouldBackupNow = true 719 } 720 }
+1 -1
sqlite_blockstore/sqlite_blockstore.go
··· 94 } 95 96 func (bs *SqliteBlockstore) PutMany(ctx context.Context, blocks []blocks.Block) error { 97 - tx := bs.db.BeginDangerously(ctx) 98 99 for _, block := range blocks { 100 bs.inserts[block.Cid()] = block
··· 94 } 95 96 func (bs *SqliteBlockstore) PutMany(ctx context.Context, blocks []blocks.Block) error { 97 + tx := bs.db.Begin(ctx) 98 99 for _, block := range blocks { 100 bs.inserts[block.Cid()] = block