forked from hailey.at/cocoon
An atproto PDS written in Go
at main 3.6 kB view raw
1package server 2 3import ( 4 "bytes" 5 "fmt" 6 "io" 7 8 "github.com/Azure/go-autorest/autorest/to" 9 "github.com/aws/aws-sdk-go/aws" 10 "github.com/aws/aws-sdk-go/aws/credentials" 11 "github.com/aws/aws-sdk-go/aws/session" 12 "github.com/aws/aws-sdk-go/service/s3" 13 "github.com/haileyok/cocoon/internal/helpers" 14 "github.com/haileyok/cocoon/models" 15 "github.com/ipfs/go-cid" 16 "github.com/labstack/echo/v4" 17) 18 19func (s *Server) handleSyncGetBlob(e echo.Context) error { 20 ctx := e.Request().Context() 21 logger := s.logger.With("name", "handleSyncGetBlob") 22 23 did := e.QueryParam("did") 24 if did == "" { 25 return helpers.InputError(e, nil) 26 } 27 28 cstr := e.QueryParam("cid") 29 if cstr == "" { 30 return helpers.InputError(e, nil) 31 } 32 33 c, err := cid.Parse(cstr) 34 if err != nil { 35 return helpers.InputError(e, nil) 36 } 37 38 urepo, err := s.getRepoActorByDid(ctx, did) 39 if err != nil { 40 logger.Error("could not find user for requested blob", "error", err) 41 return helpers.InputError(e, nil) 42 } 43 44 status := urepo.Status() 45 if status != nil { 46 if *status == "deactivated" { 47 return helpers.InputError(e, to.StringPtr("RepoDeactivated")) 48 } 49 } 50 51 var blob models.Blob 52 if err := s.db.Raw(ctx, "SELECT * FROM blobs WHERE did = ? AND cid = ?", nil, did, c.Bytes()).Scan(&blob).Error; err != nil { 53 logger.Error("error looking up blob", "error", err) 54 return helpers.ServerError(e, nil) 55 } 56 57 buf := new(bytes.Buffer) 58 59 if blob.Storage == "sqlite" { 60 var parts []models.BlobPart 61 if err := s.db.Raw(ctx, "SELECT * FROM blob_parts WHERE blob_id = ? ORDER BY idx", nil, blob.ID).Scan(&parts).Error; err != nil { 62 logger.Error("error getting blob parts", "error", err) 63 return helpers.ServerError(e, nil) 64 } 65 66 // TODO: we can just stream this, don't need to make a buffer 67 for _, p := range parts { 68 buf.Write(p.Data) 69 } 70 } else if blob.Storage == "s3" { 71 if !(s.s3Config != nil && s.s3Config.BlobstoreEnabled) { 72 logger.Error("s3 storage disabled") 73 return helpers.ServerError(e, nil) 74 } 75 76 blobKey := fmt.Sprintf("blobs/%s/%s", urepo.Repo.Did, c.String()) 77 78 if s.s3Config.CDNUrl != "" { 79 redirectUrl := fmt.Sprintf("%s/%s", s.s3Config.CDNUrl, blobKey) 80 return e.Redirect(302, redirectUrl) 81 } 82 83 config := &aws.Config{ 84 Region: aws.String(s.s3Config.Region), 85 Credentials: credentials.NewStaticCredentials(s.s3Config.AccessKey, s.s3Config.SecretKey, ""), 86 } 87 88 if s.s3Config.Endpoint != "" { 89 config.Endpoint = aws.String(s.s3Config.Endpoint) 90 config.S3ForcePathStyle = aws.Bool(true) 91 } 92 93 sess, err := session.NewSession(config) 94 if err != nil { 95 logger.Error("error creating aws session", "error", err) 96 return helpers.ServerError(e, nil) 97 } 98 99 svc := s3.New(sess) 100 if result, err := svc.GetObject(&s3.GetObjectInput{ 101 Bucket: aws.String(s.s3Config.Bucket), 102 Key: aws.String(blobKey), 103 }); err != nil { 104 logger.Error("error getting blob from s3", "error", err) 105 return helpers.ServerError(e, nil) 106 } else { 107 read := 0 108 part := 0 109 partBuf := make([]byte, 0x10000) 110 111 for { 112 n, err := io.ReadFull(result.Body, partBuf) 113 if err == io.ErrUnexpectedEOF || err == io.EOF { 114 if n == 0 { 115 break 116 } 117 } else if err != nil && err != io.ErrUnexpectedEOF { 118 logger.Error("error reading blob", "error", err) 119 return helpers.ServerError(e, nil) 120 } 121 122 data := partBuf[:n] 123 read += n 124 buf.Write(data) 125 part++ 126 } 127 } 128 } else { 129 logger.Error("unknown storage", "storage", blob.Storage) 130 return helpers.ServerError(e, nil) 131 } 132 133 e.Response().Header().Set(echo.HeaderContentDisposition, "attachment; filename="+c.String()) 134 135 return e.Stream(200, "application/octet-stream", buf) 136}