package server

import (
	"bytes"
	"fmt"
	"io"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/credentials"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3"
	"github.com/haileyok/cocoon/internal/helpers"
	"github.com/haileyok/cocoon/models"
	"github.com/ipfs/go-cid"
	"github.com/labstack/echo/v4"
	"github.com/multiformats/go-multihash"
)

const (
	blockSize = 0x10000
)

type ComAtprotoRepoUploadBlobResponse struct {
	Blob struct {
		Type string `json:"$type"`
		Ref  struct {
			Link string `json:"$link"`
		} `json:"ref"`
		MimeType string `json:"mimeType"`
		Size     int    `json:"size"`
	} `json:"blob"`
}

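// handleRepoUploadBlob implements the com.atproto.repo.uploadBlob XRPC endpoint.
// It streams the request body in fixed-size chunks, persists the blob either as
// sqlite blob parts or as an S3 object, computes the blob's CID, and returns a
// blob ref to the caller.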
func (s *Server) handleRepoUploadBlob(e echo.Context) error {
	ctx := e.Request().Context()
	logger := s.logger.With("name", "handleRepoUploadBlob")

	urepo := e.Get("repo").(*models.RepoActor)

	mime := e.Request().Header.Get("content-type")
	if mime == "" {
		mime = "application/octet-stream"
	}

	storage := "sqlite"
	s3Upload := s.s3Config != nil && s.s3Config.BlobstoreEnabled
	if s3Upload {
		storage = "s3"
	}
	blob := models.Blob{
		Did:       urepo.Repo.Did,
		RefCount:  0,
		CreatedAt: s.repoman.clock.Next().String(),
		Storage:   storage,
	}

	if err := s.db.Create(ctx, &blob, nil).Error; err != nil {
		logger.Error("error creating new blob in db", "error", err)
		return helpers.ServerError(e, nil)
	}

	read := 0
	part := 0

	buf := make([]byte, blockSize)
	fulldata := new(bytes.Buffer)

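	// Read the body blockSize bytes at a time. With sqlite storage each chunk is
	// persisted as a BlobPart row; the full payload is also buffered in memory for
	// CID computation and the optional S3 upload.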
	for {
		n, err := io.ReadFull(e.Request().Body, buf)
		if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
			logger.Error("error reading blob", "error", err)
			return helpers.ServerError(e, nil)
		}
		if n == 0 {
			// io.EOF: the body has been fully consumed.
			break
		}

		data := buf[:n]
		read += n
		fulldata.Write(data)

		if !s3Upload {
			blobPart := models.BlobPart{
				BlobID: blob.ID,
				Idx:    part,
				Data:   data,
			}

			if err := s.db.Create(ctx, &blobPart, nil).Error; err != nil {
				logger.Error("error adding blob part to db", "error", err)
				return helpers.ServerError(e, nil)
			}
		}
		part++

		// A short read (io.ErrUnexpectedEOF) means this was the final chunk.
		if n < blockSize {
			break
		}
	}

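	// Compute the blob's CID (CIDv1, raw codec, sha2-256) over the full contents.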
	c, err := cid.NewPrefixV1(cid.Raw, multihash.SHA2_256).Sum(fulldata.Bytes())
	if err != nil {
		logger.Error("error computing cid", "error", err)
		return helpers.ServerError(e, nil)
	}

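	// With the S3 blobstore enabled, the whole blob is written to
	// blobs/<did>/<cid> in the configured bucket.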
	if s3Upload {
		config := &aws.Config{
			Region:      aws.String(s.s3Config.Region),
			Credentials: credentials.NewStaticCredentials(s.s3Config.AccessKey, s.s3Config.SecretKey, ""),
		}

		// A custom endpoint (e.g. an S3-compatible provider) needs path-style addressing.
		if s.s3Config.Endpoint != "" {
			config.Endpoint = aws.String(s.s3Config.Endpoint)
			config.S3ForcePathStyle = aws.Bool(true)
		}

		sess, err := session.NewSession(config)
		if err != nil {
			logger.Error("error creating aws session", "error", err)
			return helpers.ServerError(e, nil)
		}

		svc := s3.New(sess)

		if _, err := svc.PutObject(&s3.PutObjectInput{
			Bucket: aws.String(s.s3Config.Bucket),
			Key:    aws.String(fmt.Sprintf("blobs/%s/%s", urepo.Repo.Did, c.String())),
			Body:   bytes.NewReader(fulldata.Bytes()),
		}); err != nil {
			logger.Error("error uploading blob to s3", "error", err)
			return helpers.ServerError(e, nil)
		}
	}

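	// Record the computed CID on the blob row created above.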
	if err := s.db.Exec(ctx, "UPDATE blobs SET cid = ? WHERE id = ?", nil, c.Bytes(), blob.ID).Error; err != nil {
		// there should probably be some handling here if this fails...
		logger.Error("error updating blob", "error", err)
		return helpers.ServerError(e, nil)
	}

	resp := ComAtprotoRepoUploadBlobResponse{}
	resp.Blob.Type = "blob"
	resp.Blob.Ref.Link = c.String()
	resp.Blob.MimeType = mime
	resp.Blob.Size = read

	return e.JSON(200, resp)
}