An atproto PDS written in Go

Option to store blob content in S3 (#32)

authored by Ed Costello and committed by GitHub (commits 28876f4e, cfbcc310)

Changed files
+139 -28
cmd
cocoon
models
server
+11 -6
cmd/cocoon/main.go
··· 107 107 Name: "s3-backups-enabled", 108 108 EnvVars: []string{"COCOON_S3_BACKUPS_ENABLED"}, 109 109 }, 110 + &cli.BoolFlag{ 111 + Name: "s3-blobstore-enabled", 112 + EnvVars: []string{"COCOON_S3_BLOBSTORE_ENABLED"}, 113 + }, 110 114 &cli.StringFlag{ 111 115 Name: "s3-region", 112 116 EnvVars: []string{"COCOON_S3_REGION"}, ··· 181 185 SmtpEmail: cmd.String("smtp-email"), 182 186 SmtpName: cmd.String("smtp-name"), 183 187 S3Config: &server.S3Config{ 184 - BackupsEnabled: cmd.Bool("s3-backups-enabled"), 185 - Region: cmd.String("s3-region"), 186 - Bucket: cmd.String("s3-bucket"), 187 - Endpoint: cmd.String("s3-endpoint"), 188 - AccessKey: cmd.String("s3-access-key"), 189 - SecretKey: cmd.String("s3-secret-key"), 188 + BackupsEnabled: cmd.Bool("s3-backups-enabled"), 189 + BlobstoreEnabled: cmd.Bool("s3-blobstore-enabled"), 190 + Region: cmd.String("s3-region"), 191 + Bucket: cmd.String("s3-bucket"), 192 + Endpoint: cmd.String("s3-endpoint"), 193 + AccessKey: cmd.String("s3-access-key"), 194 + SecretKey: cmd.String("s3-secret-key"), 190 195 }, 191 196 SessionSecret: cmd.String("session-secret"), 192 197 BlockstoreVariant: server.MustReturnBlockstoreVariant(cmd.String("blockstore-variant")),
+1
models/models.go
··· 106 106 Did string `gorm:"index;index:idx_blob_did_cid"` 107 107 Cid []byte `gorm:"index;index:idx_blob_did_cid"` 108 108 RefCount int 109 + Storage string `gorm:"default:sqlite"` 109 110 } 110 111 111 112 type BlobPart struct {
+50 -8
server/handle_repo_upload_blob.go
··· 2 2 3 3 import ( 4 4 "bytes" 5 + "fmt" 5 6 "io" 6 7 8 + "github.com/aws/aws-sdk-go/aws" 9 + "github.com/aws/aws-sdk-go/aws/credentials" 10 + "github.com/aws/aws-sdk-go/aws/session" 11 + "github.com/aws/aws-sdk-go/service/s3" 7 12 "github.com/haileyok/cocoon/internal/helpers" 8 13 "github.com/haileyok/cocoon/models" 9 14 "github.com/ipfs/go-cid" ··· 34 39 mime = "application/octet-stream" 35 40 } 36 41 42 + storage := "sqlite" 43 + s3Upload := s.s3Config != nil && s.s3Config.BlobstoreEnabled 44 + if s3Upload { 45 + storage = "s3" 46 + } 37 47 blob := models.Blob{ 38 48 Did: urepo.Repo.Did, 39 49 RefCount: 0, 40 50 CreatedAt: s.repoman.clock.Next().String(), 51 + Storage: storage, 41 52 } 42 53 43 54 if err := s.db.Create(&blob, nil).Error; err != nil { ··· 66 77 read += n 67 78 fulldata.Write(data) 68 79 69 - blobPart := models.BlobPart{ 70 - BlobID: blob.ID, 71 - Idx: part, 72 - Data: data, 73 - } 80 + if !s3Upload { 81 + blobPart := models.BlobPart{ 82 + BlobID: blob.ID, 83 + Idx: part, 84 + Data: data, 85 + } 74 86 75 - if err := s.db.Create(&blobPart, nil).Error; err != nil { 76 - s.logger.Error("error adding blob part to db", "error", err) 77 - return helpers.ServerError(e, nil) 87 + if err := s.db.Create(&blobPart, nil).Error; err != nil { 88 + s.logger.Error("error adding blob part to db", "error", err) 89 + return helpers.ServerError(e, nil) 90 + } 78 91 } 79 92 part++ 80 93 ··· 87 100 if err != nil { 88 101 s.logger.Error("error creating cid prefix", "error", err) 89 102 return helpers.ServerError(e, nil) 103 + } 104 + 105 + if s3Upload { 106 + config := &aws.Config{ 107 + Region: aws.String(s.s3Config.Region), 108 + Credentials: credentials.NewStaticCredentials(s.s3Config.AccessKey, s.s3Config.SecretKey, ""), 109 + } 110 + 111 + if s.s3Config.Endpoint != "" { 112 + config.Endpoint = aws.String(s.s3Config.Endpoint) 113 + config.S3ForcePathStyle = aws.Bool(true) 114 + } 115 + 116 + sess, err := session.NewSession(config) 117 + if err != nil { 118 + s.logger.Error("error creating aws session", "error", err) 119 + return helpers.ServerError(e, nil) 120 + } 121 + 122 + svc := s3.New(sess) 123 + 124 + if _, err := svc.PutObject(&s3.PutObjectInput{ 125 + Bucket: aws.String(s.s3Config.Bucket), 126 + Key: aws.String(fmt.Sprintf("blobs/%s/%s", urepo.Repo.Did, c.String())), 127 + Body: bytes.NewReader(fulldata.Bytes()), 128 + }); err != nil { 129 + s.logger.Error("error uploading blob to s3", "error", err) 130 + return helpers.ServerError(e, nil) 131 + } 90 132 } 91 133 92 134 if err := s.db.Exec("UPDATE blobs SET cid = ? WHERE id = ?", nil, c.Bytes(), blob.ID).Error; err != nil {
+70 -8
server/handle_sync_get_blob.go
··· 2 2 3 3 import ( 4 4 "bytes" 5 + "fmt" 6 + "io" 5 7 6 8 "github.com/Azure/go-autorest/autorest/to" 9 + "github.com/aws/aws-sdk-go/aws" 10 + "github.com/aws/aws-sdk-go/aws/credentials" 11 + "github.com/aws/aws-sdk-go/aws/session" 12 + "github.com/aws/aws-sdk-go/service/s3" 7 13 "github.com/haileyok/cocoon/internal/helpers" 8 14 "github.com/haileyok/cocoon/models" 9 15 "github.com/ipfs/go-cid" ··· 47 53 48 54 buf := new(bytes.Buffer) 49 55 50 - var parts []models.BlobPart 51 - if err := s.db.Raw("SELECT * FROM blob_parts WHERE blob_id = ? ORDER BY idx", nil, blob.ID).Scan(&parts).Error; err != nil { 52 - s.logger.Error("error getting blob parts", "error", err) 53 - return helpers.ServerError(e, nil) 54 - } 56 + if blob.Storage == "sqlite" { 57 + var parts []models.BlobPart 58 + if err := s.db.Raw("SELECT * FROM blob_parts WHERE blob_id = ? ORDER BY idx", nil, blob.ID).Scan(&parts).Error; err != nil { 59 + s.logger.Error("error getting blob parts", "error", err) 60 + return helpers.ServerError(e, nil) 61 + } 62 + 63 + // TODO: we can just stream this, don't need to make a buffer 64 + for _, p := range parts { 65 + buf.Write(p.Data) 66 + } 67 + } else if blob.Storage == "s3" { 68 + if !(s.s3Config != nil && s.s3Config.BlobstoreEnabled) { 69 + s.logger.Error("s3 storage disabled") 70 + return helpers.ServerError(e, nil) 71 + } 72 + 73 + config := &aws.Config{ 74 + Region: aws.String(s.s3Config.Region), 75 + Credentials: credentials.NewStaticCredentials(s.s3Config.AccessKey, s.s3Config.SecretKey, ""), 76 + } 55 77 56 - // TODO: we can just stream this, don't need to make a buffer 57 - for _, p := range parts { 58 - buf.Write(p.Data) 78 + if s.s3Config.Endpoint != "" { 79 + config.Endpoint = aws.String(s.s3Config.Endpoint) 80 + config.S3ForcePathStyle = aws.Bool(true) 81 + } 82 + 83 + sess, err := session.NewSession(config) 84 + if err != nil { 85 + s.logger.Error("error creating aws session", "error", err) 86 + return helpers.ServerError(e, nil) 87 + } 88 + 89 + svc := s3.New(sess) 90 + if result, err := svc.GetObject(&s3.GetObjectInput{ 91 + Bucket: aws.String(s.s3Config.Bucket), 92 + Key: aws.String(fmt.Sprintf("blobs/%s/%s", urepo.Repo.Did, c.String())), 93 + }); err != nil { 94 + s.logger.Error("error getting blob from s3", "error", err) 95 + return helpers.ServerError(e, nil) 96 + } else { 97 + read := 0 98 + part := 0 99 + partBuf := make([]byte, 0x10000) 100 + 101 + for { 102 + n, err := io.ReadFull(result.Body, partBuf) 103 + if err == io.ErrUnexpectedEOF || err == io.EOF { 104 + if n == 0 { 105 + break 106 + } 107 + } else if err != nil && err != io.ErrUnexpectedEOF { 108 + s.logger.Error("error reading blob", "error", err) 109 + return helpers.ServerError(e, nil) 110 + } 111 + 112 + data := partBuf[:n] 113 + read += n 114 + buf.Write(data) 115 + part++ 116 + } 117 + } 118 + } else { 119 + s.logger.Error("unknown storage", "storage", blob.Storage) 120 + return helpers.ServerError(e, nil) 59 121 } 60 122 61 123 e.Response().Header().Set(echo.HeaderContentDisposition, "attachment; filename="+c.String())
+7 -6
server/server.go
··· 52 52 ) 53 53 54 54 type S3Config struct { 55 - BackupsEnabled bool 56 - Endpoint string 57 - Region string 58 - Bucket string 59 - AccessKey string 60 - SecretKey string 55 + BackupsEnabled bool 56 + BlobstoreEnabled bool 57 + Endpoint string 58 + Region string 59 + Bucket string 60 + AccessKey string 61 + SecretKey string 61 62 } 62 63 63 64 type Server struct {