An atproto PDS written in Go
103
fork

Configure Feed

Select the types of activity you want to include in your feed.

at 0.5.0 147 lines 3.6 kB view raw
1package server 2 3import ( 4 "bytes" 5 "fmt" 6 "io" 7 8 "github.com/aws/aws-sdk-go/aws" 9 "github.com/aws/aws-sdk-go/aws/credentials" 10 "github.com/aws/aws-sdk-go/aws/session" 11 "github.com/aws/aws-sdk-go/service/s3" 12 "github.com/haileyok/cocoon/internal/helpers" 13 "github.com/haileyok/cocoon/models" 14 "github.com/ipfs/go-cid" 15 "github.com/labstack/echo/v4" 16 "github.com/multiformats/go-multihash" 17) 18 19const ( 20 blockSize = 0x10000 21) 22 23type ComAtprotoRepoUploadBlobResponse struct { 24 Blob struct { 25 Type string `json:"$type"` 26 Ref struct { 27 Link string `json:"$link"` 28 } `json:"ref"` 29 MimeType string `json:"mimeType"` 30 Size int `json:"size"` 31 } `json:"blob"` 32} 33 34func (s *Server) handleRepoUploadBlob(e echo.Context) error { 35 urepo := e.Get("repo").(*models.RepoActor) 36 37 mime := e.Request().Header.Get("content-type") 38 if mime == "" { 39 mime = "application/octet-stream" 40 } 41 42 storage := "sqlite" 43 s3Upload := s.s3Config != nil && s.s3Config.BlobstoreEnabled 44 if s3Upload { 45 storage = "s3" 46 } 47 blob := models.Blob{ 48 Did: urepo.Repo.Did, 49 RefCount: 0, 50 CreatedAt: s.repoman.clock.Next().String(), 51 Storage: storage, 52 } 53 54 if err := s.db.Create(&blob, nil).Error; err != nil { 55 s.logger.Error("error creating new blob in db", "error", err) 56 return helpers.ServerError(e, nil) 57 } 58 59 read := 0 60 part := 0 61 62 buf := make([]byte, 0x10000) 63 fulldata := new(bytes.Buffer) 64 65 for { 66 n, err := io.ReadFull(e.Request().Body, buf) 67 if err == io.ErrUnexpectedEOF || err == io.EOF { 68 if n == 0 { 69 break 70 } 71 } else if err != nil && err != io.ErrUnexpectedEOF { 72 s.logger.Error("error reading blob", "error", err) 73 return helpers.ServerError(e, nil) 74 } 75 76 data := buf[:n] 77 read += n 78 fulldata.Write(data) 79 80 if !s3Upload { 81 blobPart := models.BlobPart{ 82 BlobID: blob.ID, 83 Idx: part, 84 Data: data, 85 } 86 87 if err := s.db.Create(&blobPart, nil).Error; err != nil { 88 s.logger.Error("error adding blob part to db", "error", err) 89 return helpers.ServerError(e, nil) 90 } 91 } 92 part++ 93 94 if n < blockSize { 95 break 96 } 97 } 98 99 c, err := cid.NewPrefixV1(cid.Raw, multihash.SHA2_256).Sum(fulldata.Bytes()) 100 if err != nil { 101 s.logger.Error("error creating cid prefix", "error", err) 102 return helpers.ServerError(e, nil) 103 } 104 105 if s3Upload { 106 config := &aws.Config{ 107 Region: aws.String(s.s3Config.Region), 108 Credentials: credentials.NewStaticCredentials(s.s3Config.AccessKey, s.s3Config.SecretKey, ""), 109 } 110 111 if s.s3Config.Endpoint != "" { 112 config.Endpoint = aws.String(s.s3Config.Endpoint) 113 config.S3ForcePathStyle = aws.Bool(true) 114 } 115 116 sess, err := session.NewSession(config) 117 if err != nil { 118 s.logger.Error("error creating aws session", "error", err) 119 return helpers.ServerError(e, nil) 120 } 121 122 svc := s3.New(sess) 123 124 if _, err := svc.PutObject(&s3.PutObjectInput{ 125 Bucket: aws.String(s.s3Config.Bucket), 126 Key: aws.String(fmt.Sprintf("blobs/%s/%s", urepo.Repo.Did, c.String())), 127 Body: bytes.NewReader(fulldata.Bytes()), 128 }); err != nil { 129 s.logger.Error("error uploading blob to s3", "error", err) 130 return helpers.ServerError(e, nil) 131 } 132 } 133 134 if err := s.db.Exec("UPDATE blobs SET cid = ? WHERE id = ?", nil, c.Bytes(), blob.ID).Error; err != nil { 135 // there should probably be somme handling here if this fails... 136 s.logger.Error("error updating blob", "error", err) 137 return helpers.ServerError(e, nil) 138 } 139 140 resp := ComAtprotoRepoUploadBlobResponse{} 141 resp.Blob.Type = "blob" 142 resp.Blob.Ref.Link = c.String() 143 resp.Blob.MimeType = mime 144 resp.Blob.Size = read 145 146 return e.JSON(200, resp) 147}