An atproto PDS written in Go
at main 3.6 kB view raw
1package server 2 3import ( 4 "bytes" 5 "fmt" 6 "io" 7 8 "github.com/aws/aws-sdk-go/aws" 9 "github.com/aws/aws-sdk-go/aws/credentials" 10 "github.com/aws/aws-sdk-go/aws/session" 11 "github.com/aws/aws-sdk-go/service/s3" 12 "github.com/haileyok/cocoon/internal/helpers" 13 "github.com/haileyok/cocoon/models" 14 "github.com/ipfs/go-cid" 15 "github.com/labstack/echo/v4" 16 "github.com/multiformats/go-multihash" 17) 18 19const ( 20 blockSize = 0x10000 21) 22 23type ComAtprotoRepoUploadBlobResponse struct { 24 Blob struct { 25 Type string `json:"$type"` 26 Ref struct { 27 Link string `json:"$link"` 28 } `json:"ref"` 29 MimeType string `json:"mimeType"` 30 Size int `json:"size"` 31 } `json:"blob"` 32} 33 34func (s *Server) handleRepoUploadBlob(e echo.Context) error { 35 ctx := e.Request().Context() 36 logger := s.logger.With("name", "handleRepoUploadBlob") 37 38 urepo := e.Get("repo").(*models.RepoActor) 39 40 mime := e.Request().Header.Get("content-type") 41 if mime == "" { 42 mime = "application/octet-stream" 43 } 44 45 storage := "sqlite" 46 s3Upload := s.s3Config != nil && s.s3Config.BlobstoreEnabled 47 if s3Upload { 48 storage = "s3" 49 } 50 blob := models.Blob{ 51 Did: urepo.Repo.Did, 52 RefCount: 0, 53 CreatedAt: s.repoman.clock.Next().String(), 54 Storage: storage, 55 } 56 57 if err := s.db.Create(ctx, &blob, nil).Error; err != nil { 58 logger.Error("error creating new blob in db", "error", err) 59 return helpers.ServerError(e, nil) 60 } 61 62 read := 0 63 part := 0 64 65 buf := make([]byte, 0x10000) 66 fulldata := new(bytes.Buffer) 67 68 for { 69 n, err := io.ReadFull(e.Request().Body, buf) 70 if err == io.ErrUnexpectedEOF || err == io.EOF { 71 if n == 0 { 72 break 73 } 74 } else if err != nil && err != io.ErrUnexpectedEOF { 75 logger.Error("error reading blob", "error", err) 76 return helpers.ServerError(e, nil) 77 } 78 79 data := buf[:n] 80 read += n 81 fulldata.Write(data) 82 83 if !s3Upload { 84 blobPart := models.BlobPart{ 85 BlobID: blob.ID, 86 Idx: part, 87 Data: data, 88 } 89 90 if err := s.db.Create(ctx, &blobPart, nil).Error; err != nil { 91 logger.Error("error adding blob part to db", "error", err) 92 return helpers.ServerError(e, nil) 93 } 94 } 95 part++ 96 97 if n < blockSize { 98 break 99 } 100 } 101 102 c, err := cid.NewPrefixV1(cid.Raw, multihash.SHA2_256).Sum(fulldata.Bytes()) 103 if err != nil { 104 logger.Error("error creating cid prefix", "error", err) 105 return helpers.ServerError(e, nil) 106 } 107 108 if s3Upload { 109 config := &aws.Config{ 110 Region: aws.String(s.s3Config.Region), 111 Credentials: credentials.NewStaticCredentials(s.s3Config.AccessKey, s.s3Config.SecretKey, ""), 112 } 113 114 if s.s3Config.Endpoint != "" { 115 config.Endpoint = aws.String(s.s3Config.Endpoint) 116 config.S3ForcePathStyle = aws.Bool(true) 117 } 118 119 sess, err := session.NewSession(config) 120 if err != nil { 121 logger.Error("error creating aws session", "error", err) 122 return helpers.ServerError(e, nil) 123 } 124 125 svc := s3.New(sess) 126 127 if _, err := svc.PutObject(&s3.PutObjectInput{ 128 Bucket: aws.String(s.s3Config.Bucket), 129 Key: aws.String(fmt.Sprintf("blobs/%s/%s", urepo.Repo.Did, c.String())), 130 Body: bytes.NewReader(fulldata.Bytes()), 131 }); err != nil { 132 logger.Error("error uploading blob to s3", "error", err) 133 return helpers.ServerError(e, nil) 134 } 135 } 136 137 if err := s.db.Exec(ctx, "UPDATE blobs SET cid = ? WHERE id = ?", nil, c.Bytes(), blob.ID).Error; err != nil { 138 // there should probably be somme handling here if this fails... 139 logger.Error("error updating blob", "error", err) 140 return helpers.ServerError(e, nil) 141 } 142 143 resp := ComAtprotoRepoUploadBlobResponse{} 144 resp.Blob.Type = "blob" 145 resp.Blob.Ref.Link = c.String() 146 resp.Blob.MimeType = mime 147 resp.Blob.Size = read 148 149 return e.JSON(200, resp) 150}