1package server
2
import (
	"fmt"
	"sort"
	"strconv"

	"github.com/bluesky-social/indigo/atproto/atdata"
	"github.com/haileyok/cocoon/internal/helpers"
	"github.com/haileyok/cocoon/models"
	"github.com/ipfs/go-cid"
	"github.com/labstack/echo/v4"
)
13
// ComAtprotoRepoListMissingBlobsResponse is the JSON body written by
// handleListMissingBlobs.
type ComAtprotoRepoListMissingBlobsResponse struct {
	// Cursor is left out of the JSON entirely when nil (omitempty). The handler
	// sets it to the CID of the last returned blob only when the page is full.
	Cursor *string `json:"cursor,omitempty"`
	// Blobs is the page of missing blob references. The handler always passes a
	// non-nil slice, so an empty page encodes as [] rather than null.
	Blobs []ComAtprotoRepoListMissingBlobsRecordBlob `json:"blobs"`
}
18
// ComAtprotoRepoListMissingBlobsRecordBlob pairs a blob CID with the URI of a
// record that references it.
type ComAtprotoRepoListMissingBlobsRecordBlob struct {
	// Cid is the string form of the blob's CID.
	Cid string `json:"cid"`
	// RecordUri is an at:// URI of the form at://<did>/<nsid>/<rkey>, as built by
	// handleListMissingBlobs.
	RecordUri string `json:"recordUri"`
}
23
24func (s *Server) handleListMissingBlobs(e echo.Context) error {
25 ctx := e.Request().Context()
26 logger := s.logger.With("name", "handleListMissingBlos")
27
28 urepo := e.Get("repo").(*models.RepoActor)
29
30 limitStr := e.QueryParam("limit")
31 cursor := e.QueryParam("cursor")
32
33 limit := 500
34 if limitStr != "" {
35 if l, err := strconv.Atoi(limitStr); err == nil && l > 0 && l <= 1000 {
36 limit = l
37 }
38 }
39
40 var records []models.Record
41 if err := s.db.Raw(ctx, "SELECT * FROM records WHERE did = ?", nil, urepo.Repo.Did).Scan(&records).Error; err != nil {
42 logger.Error("failed to get records for listMissingBlobs", "error", err)
43 return helpers.ServerError(e, nil)
44 }
45
46 type blobRef struct {
47 cid cid.Cid
48 recordUri string
49 }
50 var allBlobRefs []blobRef
51
52 for _, rec := range records {
53 blobs := getBlobsFromRecord(rec.Value)
54 recordUri := fmt.Sprintf("at://%s/%s/%s", urepo.Repo.Did, rec.Nsid, rec.Rkey)
55 for _, b := range blobs {
56 allBlobRefs = append(allBlobRefs, blobRef{cid: cid.Cid(b.Ref), recordUri: recordUri})
57 }
58 }
59
60 missingBlobs := make([]ComAtprotoRepoListMissingBlobsRecordBlob, 0)
61 seenCids := make(map[string]bool)
62
63 for _, ref := range allBlobRefs {
64 cidStr := ref.cid.String()
65
66 if seenCids[cidStr] {
67 continue
68 }
69
70 if cursor != "" && cidStr <= cursor {
71 continue
72 }
73
74 var count int64
75 if err := s.db.Raw(ctx, "SELECT COUNT(*) FROM blobs WHERE did = ? AND cid = ?", nil, urepo.Repo.Did, ref.cid.Bytes()).Scan(&count).Error; err != nil {
76 continue
77 }
78
79 if count == 0 {
80 missingBlobs = append(missingBlobs, ComAtprotoRepoListMissingBlobsRecordBlob{
81 Cid: cidStr,
82 RecordUri: ref.recordUri,
83 })
84 seenCids[cidStr] = true
85
86 if len(missingBlobs) >= limit {
87 break
88 }
89 }
90 }
91
92 var nextCursor *string
93 if len(missingBlobs) > 0 && len(missingBlobs) >= limit {
94 lastCid := missingBlobs[len(missingBlobs)-1].Cid
95 nextCursor = &lastCid
96 }
97
98 return e.JSON(200, ComAtprotoRepoListMissingBlobsResponse{
99 Cursor: nextCursor,
100 Blobs: missingBlobs,
101 })
102}
103
104func getBlobsFromRecord(data []byte) []atdata.Blob {
105 if len(data) == 0 {
106 return nil
107 }
108
109 decoded, err := atdata.UnmarshalCBOR(data)
110 if err != nil {
111 return nil
112 }
113
114 return atdata.ExtractBlobs(decoded)
115}