An atproto PDS written in Go
103
fork

Configure Feed

Select the types of activity you want to include in your feed.

at 8c7f03f9c80e688fc25a64ab838c78d4e0ef56d2 115 lines 2.8 kB view raw
1package server 2 3import ( 4 "fmt" 5 "strconv" 6 7 "github.com/bluesky-social/indigo/atproto/atdata" 8 "github.com/haileyok/cocoon/internal/helpers" 9 "github.com/haileyok/cocoon/models" 10 "github.com/ipfs/go-cid" 11 "github.com/labstack/echo/v4" 12) 13 14type ComAtprotoRepoListMissingBlobsResponse struct { 15 Cursor *string `json:"cursor,omitempty"` 16 Blobs []ComAtprotoRepoListMissingBlobsRecordBlob `json:"blobs"` 17} 18 19type ComAtprotoRepoListMissingBlobsRecordBlob struct { 20 Cid string `json:"cid"` 21 RecordUri string `json:"recordUri"` 22} 23 24func (s *Server) handleListMissingBlobs(e echo.Context) error { 25 ctx := e.Request().Context() 26 logger := s.logger.With("name", "handleListMissingBlos") 27 28 urepo := e.Get("repo").(*models.RepoActor) 29 30 limitStr := e.QueryParam("limit") 31 cursor := e.QueryParam("cursor") 32 33 limit := 500 34 if limitStr != "" { 35 if l, err := strconv.Atoi(limitStr); err == nil && l > 0 && l <= 1000 { 36 limit = l 37 } 38 } 39 40 var records []models.Record 41 if err := s.db.Raw(ctx, "SELECT * FROM records WHERE did = ?", nil, urepo.Repo.Did).Scan(&records).Error; err != nil { 42 logger.Error("failed to get records for listMissingBlobs", "error", err) 43 return helpers.ServerError(e, nil) 44 } 45 46 type blobRef struct { 47 cid cid.Cid 48 recordUri string 49 } 50 var allBlobRefs []blobRef 51 52 for _, rec := range records { 53 blobs := getBlobsFromRecord(rec.Value) 54 recordUri := fmt.Sprintf("at://%s/%s/%s", urepo.Repo.Did, rec.Nsid, rec.Rkey) 55 for _, b := range blobs { 56 allBlobRefs = append(allBlobRefs, blobRef{cid: cid.Cid(b.Ref), recordUri: recordUri}) 57 } 58 } 59 60 missingBlobs := make([]ComAtprotoRepoListMissingBlobsRecordBlob, 0) 61 seenCids := make(map[string]bool) 62 63 for _, ref := range allBlobRefs { 64 cidStr := ref.cid.String() 65 66 if seenCids[cidStr] { 67 continue 68 } 69 70 if cursor != "" && cidStr <= cursor { 71 continue 72 } 73 74 var count int64 75 if err := s.db.Raw(ctx, "SELECT COUNT(*) FROM blobs WHERE did = ? AND cid = ?", nil, urepo.Repo.Did, ref.cid.Bytes()).Scan(&count).Error; err != nil { 76 continue 77 } 78 79 if count == 0 { 80 missingBlobs = append(missingBlobs, ComAtprotoRepoListMissingBlobsRecordBlob{ 81 Cid: cidStr, 82 RecordUri: ref.recordUri, 83 }) 84 seenCids[cidStr] = true 85 86 if len(missingBlobs) >= limit { 87 break 88 } 89 } 90 } 91 92 var nextCursor *string 93 if len(missingBlobs) > 0 && len(missingBlobs) >= limit { 94 lastCid := missingBlobs[len(missingBlobs)-1].Cid 95 nextCursor = &lastCid 96 } 97 98 return e.JSON(200, ComAtprotoRepoListMissingBlobsResponse{ 99 Cursor: nextCursor, 100 Blobs: missingBlobs, 101 }) 102} 103 104func getBlobsFromRecord(data []byte) []atdata.Blob { 105 if len(data) == 0 { 106 return nil 107 } 108 109 decoded, err := atdata.UnmarshalCBOR(data) 110 if err != nil { 111 return nil 112 } 113 114 return atdata.ExtractBlobs(decoded) 115}