forked from hailey.at/cocoon
An atproto PDS written in Go
1package server 2 3import ( 4 "fmt" 5 "strconv" 6 7 "github.com/bluesky-social/indigo/atproto/atdata" 8 "github.com/haileyok/cocoon/internal/helpers" 9 "github.com/haileyok/cocoon/models" 10 "github.com/ipfs/go-cid" 11 "github.com/labstack/echo/v4" 12) 13 14type ComAtprotoRepoListMissingBlobsResponse struct { 15 Cursor *string `json:"cursor,omitempty"` 16 Blobs []ComAtprotoRepoListMissingBlobsRecordBlob `json:"blobs"` 17} 18 19type ComAtprotoRepoListMissingBlobsRecordBlob struct { 20 Cid string `json:"cid"` 21 RecordUri string `json:"recordUri"` 22} 23 24func (s *Server) handleListMissingBlobs(e echo.Context) error { 25 ctx := e.Request().Context() 26 logger := s.logger.With("name", "handleListMissingBlos") 27 28 urepo := e.Get("repo").(*models.RepoActor) 29 30 limitStr := e.QueryParam("limit") 31 cursor := e.QueryParam("cursor") 32 33 limit := 500 34 if limitStr != "" { 35 if l, err := strconv.Atoi(limitStr); err == nil && l > 0 && l <= 1000 { 36 limit = l 37 } 38 } 39 40 var records []models.Record 41 if err := s.db.Raw(ctx, "SELECT * FROM records WHERE did = ?", nil, urepo.Repo.Did).Scan(&records).Error; err != nil { 42 logger.Error("failed to get records for listMissingBlobs", "error", err) 43 return helpers.ServerError(e, nil) 44 } 45 46 type blobRef struct { 47 cid cid.Cid 48 recordUri string 49 } 50 var allBlobRefs []blobRef 51 52 for _, rec := range records { 53 blobs := getBlobsFromRecord(rec.Value) 54 recordUri := fmt.Sprintf("at://%s/%s/%s", urepo.Repo.Did, rec.Nsid, rec.Rkey) 55 for _, b := range blobs { 56 allBlobRefs = append(allBlobRefs, blobRef{cid: cid.Cid(b.Ref), recordUri: recordUri}) 57 } 58 } 59 60 missingBlobs := make([]ComAtprotoRepoListMissingBlobsRecordBlob, 0) 61 seenCids := make(map[string]bool) 62 63 for _, ref := range allBlobRefs { 64 cidStr := ref.cid.String() 65 66 if seenCids[cidStr] { 67 continue 68 } 69 70 if cursor != "" && cidStr <= cursor { 71 continue 72 } 73 74 var count int64 75 if err := s.db.Raw(ctx, "SELECT COUNT(*) FROM blobs WHERE did = ? AND cid = ?", nil, urepo.Repo.Did, ref.cid.Bytes()).Scan(&count).Error; err != nil { 76 continue 77 } 78 79 if count == 0 { 80 missingBlobs = append(missingBlobs, ComAtprotoRepoListMissingBlobsRecordBlob{ 81 Cid: cidStr, 82 RecordUri: ref.recordUri, 83 }) 84 seenCids[cidStr] = true 85 86 if len(missingBlobs) >= limit { 87 break 88 } 89 } 90 } 91 92 var nextCursor *string 93 if len(missingBlobs) > 0 && len(missingBlobs) >= limit { 94 lastCid := missingBlobs[len(missingBlobs)-1].Cid 95 nextCursor = &lastCid 96 } 97 98 return e.JSON(200, ComAtprotoRepoListMissingBlobsResponse{ 99 Cursor: nextCursor, 100 Blobs: missingBlobs, 101 }) 102} 103 104func getBlobsFromRecord(data []byte) []atdata.Blob { 105 if len(data) == 0 { 106 return nil 107 } 108 109 decoded, err := atdata.UnmarshalCBOR(data) 110 if err != nil { 111 return nil 112 } 113 114 return atdata.ExtractBlobs(decoded) 115}