package server

import (
	"bytes"
	"fmt"
	"io"

	"github.com/Azure/go-autorest/autorest/to"
	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/credentials"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3"
	"github.com/haileyok/cocoon/internal/helpers"
	"github.com/haileyok/cocoon/models"
	"github.com/ipfs/go-cid"
	"github.com/labstack/echo/v4"
)

func (s *Server) handleSyncGetBlob(e echo.Context) error {
	ctx := e.Request().Context()
	logger := s.logger.With("name", "handleSyncGetBlob")

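	// did and cid are required query params: the owner's DID and the blob's CID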
	did := e.QueryParam("did")
	if did == "" {
		return helpers.InputError(e, nil)
	}

	cstr := e.QueryParam("cid")
	if cstr == "" {
		return helpers.InputError(e, nil)
	}

	c, err := cid.Parse(cstr)
	if err != nil {
		return helpers.InputError(e, nil)
	}

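	// resolve the actor so we can check the repo's status and build the storage key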
	urepo, err := s.getRepoActorByDid(ctx, did)
	if err != nil {
		logger.Error("could not find user for requested blob", "error", err)
		return helpers.InputError(e, nil)
	}

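	// deactivated repos must not serve blobs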
	status := urepo.Status()
	if status != nil && *status == "deactivated" {
		return helpers.InputError(e, to.StringPtr("RepoDeactivated"))
	}

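	// look up the blob record to find which storage backend holds the bytes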
	var blob models.Blob
	if err := s.db.Raw(ctx, "SELECT * FROM blobs WHERE did = ? AND cid = ?", nil, did, c.Bytes()).Scan(&blob).Error; err != nil {
		logger.Error("error looking up blob", "error", err)
		return helpers.ServerError(e, nil)
	}

	buf := new(bytes.Buffer)

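	// blobs stored in sqlite are split into ordered parts; reassemble them in order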
	if blob.Storage == "sqlite" {
		var parts []models.BlobPart
		if err := s.db.Raw(ctx, "SELECT * FROM blob_parts WHERE blob_id = ? ORDER BY idx", nil, blob.ID).Scan(&parts).Error; err != nil {
			logger.Error("error getting blob parts", "error", err)
			return helpers.ServerError(e, nil)
		}

		// TODO: we can just stream this, don't need to make a buffer
		for _, p := range parts {
			buf.Write(p.Data)
		}
	} else if blob.Storage == "s3" {
		if s.s3Config == nil || !s.s3Config.BlobstoreEnabled {
			logger.Error("s3 storage disabled")
			return helpers.ServerError(e, nil)
		}

		blobKey := fmt.Sprintf("blobs/%s/%s", urepo.Repo.Did, c.String())

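		// if a CDN fronts the bucket, redirect the client instead of proxying the bytes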
		if s.s3Config.CDNUrl != "" {
			redirectUrl := fmt.Sprintf("%s/%s", s.s3Config.CDNUrl, blobKey)
			return e.Redirect(302, redirectUrl)
		}

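		// otherwise fetch the object directly from S3 (or an S3-compatible endpoint)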
		config := &aws.Config{
			Region:      aws.String(s.s3Config.Region),
			Credentials: credentials.NewStaticCredentials(s.s3Config.AccessKey, s.s3Config.SecretKey, ""),
		}

		if s.s3Config.Endpoint != "" {
			config.Endpoint = aws.String(s.s3Config.Endpoint)
			config.S3ForcePathStyle = aws.Bool(true)
		}

		sess, err := session.NewSession(config)
		if err != nil {
			logger.Error("error creating aws session", "error", err)
			return helpers.ServerError(e, nil)
		}

		svc := s3.New(sess)

		result, err := svc.GetObject(&s3.GetObjectInput{
			Bucket: aws.String(s.s3Config.Bucket),
			Key:    aws.String(blobKey),
		})
		if err != nil {
			logger.Error("error getting blob from s3", "error", err)
			return helpers.ServerError(e, nil)
		}
		// close the response body so the underlying connection can be reused
		defer result.Body.Close()

		// TODO: we can just stream this, don't need to make a buffer
		if _, err := io.Copy(buf, result.Body); err != nil {
			logger.Error("error reading blob", "error", err)
			return helpers.ServerError(e, nil)
		}
	} else {
		logger.Error("unknown storage", "storage", blob.Storage)
		return helpers.ServerError(e, nil)
	}

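	// serve the blob as a download named after its CID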
	e.Response().Header().Set(echo.HeaderContentDisposition, "attachment; filename="+c.String())

	return e.Stream(200, "application/octet-stream", buf)
}