An atproto PDS written in Go

Compare changes

Choose any two refs to compare.

Changed files
+46 -74
internal
db
server
sqlite_blockstore
+3 -21
internal/db/db.go
··· 2 2 3 3 import ( 4 4 "context" 5 - "sync" 6 5 7 6 "gorm.io/gorm" 8 7 "gorm.io/gorm/clause" ··· 10 9 11 10 type DB struct { 12 11 cli *gorm.DB 13 - mu sync.Mutex 14 12 } 15 13 16 14 func NewDB(cli *gorm.DB) *DB { 17 15 return &DB{ 18 16 cli: cli, 19 - mu: sync.Mutex{}, 20 17 } 21 18 } 22 19 23 20 func (db *DB) Create(ctx context.Context, value any, clauses []clause.Expression) *gorm.DB { 24 - db.mu.Lock() 25 - defer db.mu.Unlock() 26 21 return db.cli.WithContext(ctx).Clauses(clauses...).Create(value) 27 22 } 28 23 29 24 func (db *DB) Save(ctx context.Context, value any, clauses []clause.Expression) *gorm.DB { 30 - db.mu.Lock() 31 - defer db.mu.Unlock() 32 25 return db.cli.WithContext(ctx).Clauses(clauses...).Save(value) 33 26 } 34 27 35 28 func (db *DB) Exec(ctx context.Context, sql string, clauses []clause.Expression, values ...any) *gorm.DB { 36 - db.mu.Lock() 37 - defer db.mu.Unlock() 38 29 return db.cli.WithContext(ctx).Clauses(clauses...).Exec(sql, values...) 39 30 } 40 31 ··· 47 38 } 48 39 49 40 func (db *DB) Delete(ctx context.Context, value any, clauses []clause.Expression) *gorm.DB { 50 - db.mu.Lock() 51 - defer db.mu.Unlock() 52 41 return db.cli.WithContext(ctx).Clauses(clauses...).Delete(value) 53 42 } 54 43 ··· 56 45 return db.cli.WithContext(ctx).First(dest, conds...) 57 46 } 58 47 59 - // TODO: this isn't actually good. we can commit even if the db is locked here. this is probably okay for the time being, but need to figure 60 - // out a better solution. right now we only do this whenever we're importing a repo though so i'm mostly not worried, but it's still bad. 61 - // e.g. 
when we do apply writes we should also be using a transaction but we don't right now 62 - func (db *DB) BeginDangerously(ctx context.Context) *gorm.DB { 48 + func (db *DB) Begin(ctx context.Context) *gorm.DB { 63 49 return db.cli.WithContext(ctx).Begin() 64 50 } 65 51 66 - func (db *DB) Lock() { 67 - db.mu.Lock() 68 - } 69 - 70 - func (db *DB) Unlock() { 71 - db.mu.Unlock() 52 + func (db *DB) Client() *gorm.DB { 53 + return db.cli 72 54 }
+1 -1
server/handle_import_repo.go
··· 66 66 return helpers.ServerError(e, nil) 67 67 } 68 68 69 - tx := s.db.BeginDangerously(ctx) 69 + tx := s.db.Begin(ctx) 70 70 71 71 clock := syntax.NewTIDClock(0) 72 72
+1 -1
server/handle_server_delete_account.go
··· 69 69 }) 70 70 } 71 71 72 - tx := s.db.BeginDangerously(ctx) 72 + tx := s.db.Begin(ctx) 73 73 if tx.Error != nil { 74 74 logger.Error("error starting transaction", "error", tx.Error) 75 75 return helpers.ServerError(e, nil)
+40 -50
server/server.go
··· 322 322 if err != nil { 323 323 return nil, fmt.Errorf("failed to open sqlite database: %w", err) 324 324 } 325 + gdb.Exec("PRAGMA journal_mode=WAL") 326 + gdb.Exec("PRAGMA synchronous=NORMAL") 327 + 325 328 logger.Info("connected to SQLite database", "path", args.DbName) 326 329 } 327 330 dbw := db.NewDB(gdb) ··· 625 628 626 629 logger.Info("beginning backup to s3...") 627 630 628 - var buf bytes.Buffer 629 - if err := func() error { 630 - logger.Info("reading database bytes...") 631 - s.db.Lock() 632 - defer s.db.Unlock() 633 - 634 - sf, err := os.Open(s.dbName) 635 - if err != nil { 636 - return fmt.Errorf("error opening database for backup: %w", err) 637 - } 638 - defer sf.Close() 639 - 640 - if _, err := io.Copy(&buf, sf); err != nil { 641 - return fmt.Errorf("error reading bytes of backup db: %w", err) 642 - } 631 + tmpFile := fmt.Sprintf("/tmp/cocoon-backup-%s.db", time.Now().Format(time.RFC3339Nano)) 632 + defer os.Remove(tmpFile) 643 633 644 - return nil 645 - }(); err != nil { 646 - logger.Error("error backing up database", "error", err) 634 + if err := s.db.Client().Exec(fmt.Sprintf("VACUUM INTO '%s'", tmpFile)).Error; err != nil { 635 + logger.Error("error creating tmp backup file", "err", err) 647 636 return 648 637 } 649 638 650 - if err := func() error { 651 - logger.Info("sending to s3...") 652 - 653 - currTime := time.Now().Format("2006-01-02_15-04-05") 654 - key := "cocoon-backup-" + currTime + ".db" 639 + backupData, err := os.ReadFile(tmpFile) 640 + if err != nil { 641 + logger.Error("error reading tmp backup file", "err", err) 642 + return 643 + } 655 644 656 - config := &aws.Config{ 657 - Region: aws.String(s.s3Config.Region), 658 - Credentials: credentials.NewStaticCredentials(s.s3Config.AccessKey, s.s3Config.SecretKey, ""), 659 - } 645 + logger.Info("sending to s3...") 660 646 661 - if s.s3Config.Endpoint != "" { 662 - config.Endpoint = aws.String(s.s3Config.Endpoint) 663 - config.S3ForcePathStyle = aws.Bool(true) 664 - } 647 + currTime 
:= time.Now().Format("2006-01-02_15-04-05") 648 + key := "cocoon-backup-" + currTime + ".db" 665 649 666 - sess, err := session.NewSession(config) 667 - if err != nil { 668 - return err 669 - } 650 + config := &aws.Config{ 651 + Region: aws.String(s.s3Config.Region), 652 + Credentials: credentials.NewStaticCredentials(s.s3Config.AccessKey, s.s3Config.SecretKey, ""), 653 + } 670 654 671 - svc := s3.New(sess) 655 + if s.s3Config.Endpoint != "" { 656 + config.Endpoint = aws.String(s.s3Config.Endpoint) 657 + config.S3ForcePathStyle = aws.Bool(true) 658 + } 672 659 673 - if _, err := svc.PutObject(&s3.PutObjectInput{ 674 - Bucket: aws.String(s.s3Config.Bucket), 675 - Key: aws.String(key), 676 - Body: bytes.NewReader(buf.Bytes()), 677 - }); err != nil { 678 - return fmt.Errorf("error uploading file to s3: %w", err) 679 - } 660 + sess, err := session.NewSession(config) 661 + if err != nil { 662 + logger.Error("error creating s3 session", "err", err) 663 + return 664 + } 680 665 681 - logger.Info("finished uploading backup to s3", "key", key, "duration", time.Now().Sub(start).Seconds()) 666 + svc := s3.New(sess) 682 667 683 - return nil 684 - }(); err != nil { 685 - logger.Error("error uploading database backup", "error", err) 668 + if _, err := svc.PutObject(&s3.PutObjectInput{ 669 + Bucket: aws.String(s.s3Config.Bucket), 670 + Key: aws.String(key), 671 + Body: bytes.NewReader(backupData), 672 + }); err != nil { 673 + logger.Error("error uploading file to s3", "err", err) 686 674 return 687 675 } 688 676 689 - os.WriteFile("last-backup.txt", []byte(time.Now().String()), 0644) 677 + logger.Info("finished uploading backup to s3", "key", key, "duration", time.Since(start).Seconds()) 678 + 679 + os.WriteFile("last-backup.txt", []byte(time.Now().Format(time.RFC3339Nano)), 0644) 690 680 } 691 681 692 682 func (s *Server) backupRoutine() { ··· 721 711 if err != nil { 722 712 shouldBackupNow = true 723 713 } else { 724 - lastBackup, err := time.Parse("2006-01-02 
15:04:05.999999999 -0700 MST", string(lastBackupStr)) 714 + lastBackup, err := time.Parse(time.RFC3339Nano, string(lastBackupStr)) 725 715 if err != nil { 726 716 shouldBackupNow = true 727 - } else if time.Now().Sub(lastBackup).Seconds() > 3600 { 717 + } else if time.Since(lastBackup).Seconds() > 3600 { 728 718 shouldBackupNow = true 729 719 } 730 720 }
+1 -1
sqlite_blockstore/sqlite_blockstore.go
··· 94 94 } 95 95 96 96 func (bs *SqliteBlockstore) PutMany(ctx context.Context, blocks []blocks.Block) error { 97 - tx := bs.db.BeginDangerously(ctx) 97 + tx := bs.db.Begin(ctx) 98 98 99 99 for _, block := range blocks { 100 100 bs.inserts[block.Cid()] = block