porting all github actions from bluesky-social/indigo to tangled CI
at ci 9.9 kB view raw
1package bgs 2 3import ( 4 "bytes" 5 "context" 6 "encoding/json" 7 "errors" 8 "fmt" 9 "io" 10 "net/http" 11 "net/url" 12 "strings" 13 14 atproto "github.com/bluesky-social/indigo/api/atproto" 15 comatprototypes "github.com/bluesky-social/indigo/api/atproto" 16 "github.com/bluesky-social/indigo/carstore" 17 "github.com/bluesky-social/indigo/events" 18 "github.com/bluesky-social/indigo/mst" 19 "gorm.io/gorm" 20 21 "github.com/bluesky-social/indigo/xrpc" 22 "github.com/ipfs/go-cid" 23 cbor "github.com/ipfs/go-ipld-cbor" 24 "github.com/ipld/go-car" 25 "github.com/labstack/echo/v4" 26) 27 28func (s *BGS) handleComAtprotoSyncGetRecord(ctx context.Context, collection string, did string, rkey string) (io.Reader, error) { 29 u, err := s.lookupUserByDid(ctx, did) 30 if err != nil { 31 if errors.Is(err, gorm.ErrRecordNotFound) { 32 return nil, echo.NewHTTPError(http.StatusNotFound, "user not found") 33 } 34 log.Error("failed to lookup user", "err", err, "did", did) 35 return nil, echo.NewHTTPError(http.StatusInternalServerError, "failed to lookup user") 36 } 37 38 if u.GetTombstoned() { 39 return nil, fmt.Errorf("account was deleted") 40 } 41 42 if u.GetTakenDown() { 43 return nil, fmt.Errorf("account was taken down by the Relay") 44 } 45 46 ustatus := u.GetUpstreamStatus() 47 if ustatus == events.AccountStatusTakendown { 48 return nil, fmt.Errorf("account was taken down by its PDS") 49 } 50 51 if ustatus == events.AccountStatusDeactivated { 52 return nil, fmt.Errorf("account is temporarily deactivated") 53 } 54 55 if ustatus == events.AccountStatusSuspended { 56 return nil, fmt.Errorf("account is suspended by its PDS") 57 } 58 59 root, blocks, err := s.repoman.GetRecordProof(ctx, u.ID, collection, rkey) 60 if err != nil { 61 if errors.Is(err, mst.ErrNotFound) { 62 return nil, echo.NewHTTPError(http.StatusNotFound, "record not found in repo") 63 } 64 log.Error("failed to get record from repo", "err", err, "did", did, "collection", collection, "rkey", rkey) 65 return nil, echo.NewHTTPError(http.StatusInternalServerError, "failed to get record from repo") 66 } 67 68 buf := new(bytes.Buffer) 69 hb, err := cbor.DumpObject(&car.CarHeader{ 70 Roots: []cid.Cid{root}, 71 Version: 1, 72 }) 73 if _, err := carstore.LdWrite(buf, hb); err != nil { 74 return nil, err 75 } 76 77 for _, blk := range blocks { 78 if _, err := carstore.LdWrite(buf, blk.Cid().Bytes(), blk.RawData()); err != nil { 79 return nil, err 80 } 81 } 82 83 return buf, nil 84} 85 86func (s *BGS) handleComAtprotoSyncGetRepo(ctx context.Context, did string, since string) (io.Reader, error) { 87 u, err := s.lookupUserByDid(ctx, did) 88 if err != nil { 89 if errors.Is(err, gorm.ErrRecordNotFound) { 90 return nil, echo.NewHTTPError(http.StatusNotFound, "user not found") 91 } 92 log.Error("failed to lookup user", "err", err, "did", did) 93 return nil, echo.NewHTTPError(http.StatusInternalServerError, "failed to lookup user") 94 } 95 96 if u.GetTombstoned() { 97 return nil, fmt.Errorf("account was deleted") 98 } 99 100 if u.GetTakenDown() { 101 return nil, fmt.Errorf("account was taken down by the Relay") 102 } 103 104 ustatus := u.GetUpstreamStatus() 105 if ustatus == events.AccountStatusTakendown { 106 return nil, fmt.Errorf("account was taken down by its PDS") 107 } 108 109 if ustatus == events.AccountStatusDeactivated { 110 return nil, fmt.Errorf("account is temporarily deactivated") 111 } 112 113 if ustatus == events.AccountStatusSuspended { 114 return nil, fmt.Errorf("account is suspended by its PDS") 115 } 116 117 // TODO: stream the response 118 buf := new(bytes.Buffer) 119 if err := s.repoman.ReadRepo(ctx, u.ID, since, buf); err != nil { 120 log.Error("failed to read repo into buffer", "err", err, "did", did) 121 return nil, echo.NewHTTPError(http.StatusInternalServerError, "failed to read repo into buffer") 122 } 123 124 return buf, nil 125} 126 127func (s *BGS) handleComAtprotoSyncGetBlocks(ctx context.Context, cids []string, did string) (io.Reader, error) { 128 return nil, fmt.Errorf("NYI") 129} 130 131func (s *BGS) handleComAtprotoSyncRequestCrawl(ctx context.Context, body *comatprototypes.SyncRequestCrawl_Input) error { 132 host := body.Hostname 133 if host == "" { 134 return echo.NewHTTPError(http.StatusBadRequest, "must pass hostname") 135 } 136 137 if !strings.HasPrefix(host, "http://") && !strings.HasPrefix(host, "https://") { 138 if s.ssl { 139 host = "https://" + host 140 } else { 141 host = "http://" + host 142 } 143 } 144 145 u, err := url.Parse(host) 146 if err != nil { 147 return echo.NewHTTPError(http.StatusBadRequest, "failed to parse hostname") 148 } 149 150 if u.Scheme == "http" && s.ssl { 151 return echo.NewHTTPError(http.StatusBadRequest, "this server requires https") 152 } 153 154 if u.Scheme == "https" && !s.ssl { 155 return echo.NewHTTPError(http.StatusBadRequest, "this server does not support https") 156 } 157 158 if u.Path != "" { 159 return echo.NewHTTPError(http.StatusBadRequest, "must pass hostname without path") 160 } 161 162 if u.Query().Encode() != "" { 163 return echo.NewHTTPError(http.StatusBadRequest, "must pass hostname without query") 164 } 165 166 host = u.Host // potentially hostname:port 167 168 banned, err := s.domainIsBanned(ctx, host) 169 if banned { 170 return echo.NewHTTPError(http.StatusUnauthorized, "domain is banned") 171 } 172 173 log.Warn("TODO: better host validation for crawl requests") 174 175 clientHost := fmt.Sprintf("%s://%s", u.Scheme, host) 176 177 c := &xrpc.Client{ 178 Host: clientHost, 179 Client: http.DefaultClient, // not using the client that auto-retries 180 } 181 182 desc, err := atproto.ServerDescribeServer(ctx, c) 183 if err != nil { 184 errMsg := fmt.Sprintf("requested host (%s) failed to respond to describe request", clientHost) 185 return echo.NewHTTPError(http.StatusBadRequest, errMsg) 186 } 187 188 // Maybe we could do something with this response later 189 _ = desc 190 191 if len(s.nextCrawlers) != 0 { 192 blob, err := json.Marshal(body) 193 if err != nil { 194 log.Warn("could not forward requestCrawl, json err", "err", err) 195 } else { 196 go func(bodyBlob []byte) { 197 for _, rpu := range s.nextCrawlers { 198 pu := rpu.JoinPath("/xrpc/com.atproto.sync.requestCrawl") 199 response, err := s.httpClient.Post(pu.String(), "application/json", bytes.NewReader(bodyBlob)) 200 if response != nil && response.Body != nil { 201 response.Body.Close() 202 } 203 if err != nil || response == nil { 204 log.Warn("requestCrawl forward failed", "host", rpu, "err", err) 205 } else if response.StatusCode != http.StatusOK { 206 log.Warn("requestCrawl forward failed", "host", rpu, "status", response.Status) 207 } else { 208 log.Info("requestCrawl forward successful", "host", rpu) 209 } 210 } 211 }(blob) 212 } 213 } 214 215 return s.slurper.SubscribeToPds(ctx, host, true, false, nil) 216} 217 218func (s *BGS) handleComAtprotoSyncNotifyOfUpdate(ctx context.Context, body *comatprototypes.SyncNotifyOfUpdate_Input) error { 219 // TODO: 220 return nil 221} 222 223func (s *BGS) handleComAtprotoSyncListRepos(ctx context.Context, cursor int64, limit int) (*comatprototypes.SyncListRepos_Output, error) { 224 // Filter out tombstoned, taken down, and deactivated accounts 225 q := fmt.Sprintf("id > ? AND NOT tombstoned AND NOT taken_down AND (upstream_status is NULL OR (upstream_status != '%s' AND upstream_status != '%s' AND upstream_status != '%s'))", 226 events.AccountStatusDeactivated, events.AccountStatusSuspended, events.AccountStatusTakendown) 227 228 // Load the users 229 users := []*User{} 230 if err := s.db.Model(&User{}).Where(q, cursor).Order("id").Limit(limit).Find(&users).Error; err != nil { 231 if err == gorm.ErrRecordNotFound { 232 return &comatprototypes.SyncListRepos_Output{}, nil 233 } 234 log.Error("failed to query users", "err", err) 235 return nil, echo.NewHTTPError(http.StatusInternalServerError, "failed to query users") 236 } 237 238 if len(users) == 0 { 239 // resp.Repos is an explicit empty array, not just 'nil' 240 return &comatprototypes.SyncListRepos_Output{ 241 Repos: []*comatprototypes.SyncListRepos_Repo{}, 242 }, nil 243 } 244 245 resp := &comatprototypes.SyncListRepos_Output{ 246 Repos: make([]*comatprototypes.SyncListRepos_Repo, len(users)), 247 } 248 249 // Fetch the repo roots for each user 250 for i := range users { 251 user := users[i] 252 253 root, err := s.repoman.GetRepoRoot(ctx, user.ID) 254 if err != nil { 255 log.Error("failed to get repo root", "err", err, "did", user.Did) 256 return nil, echo.NewHTTPError(http.StatusInternalServerError, fmt.Sprintf("failed to get repo root for (%s): %v", user.Did, err.Error())) 257 } 258 259 resp.Repos[i] = &comatprototypes.SyncListRepos_Repo{ 260 Did: user.Did, 261 Head: root.String(), 262 } 263 } 264 265 // If this is not the last page, set the cursor 266 if len(users) >= limit && len(users) > 1 { 267 nextCursor := fmt.Sprintf("%d", users[len(users)-1].ID) 268 resp.Cursor = &nextCursor 269 } 270 271 return resp, nil 272} 273 274func (s *BGS) handleComAtprotoSyncGetLatestCommit(ctx context.Context, did string) (*comatprototypes.SyncGetLatestCommit_Output, error) { 275 u, err := s.lookupUserByDid(ctx, did) 276 if err != nil { 277 if errors.Is(err, gorm.ErrRecordNotFound) { 278 return nil, echo.NewHTTPError(http.StatusNotFound, "user not found") 279 } 280 return nil, echo.NewHTTPError(http.StatusInternalServerError, "failed to lookup user") 281 } 282 283 if u.GetTombstoned() { 284 return nil, fmt.Errorf("account was deleted") 285 } 286 287 if u.GetTakenDown() { 288 return nil, fmt.Errorf("account was taken down by the Relay") 289 } 290 291 ustatus := u.GetUpstreamStatus() 292 if ustatus == events.AccountStatusTakendown { 293 return nil, fmt.Errorf("account was taken down by its PDS") 294 } 295 296 if ustatus == events.AccountStatusDeactivated { 297 return nil, fmt.Errorf("account is temporarily deactivated") 298 } 299 300 if ustatus == events.AccountStatusSuspended { 301 return nil, fmt.Errorf("account is suspended by its PDS") 302 } 303 304 root, err := s.repoman.GetRepoRoot(ctx, u.ID) 305 if err != nil { 306 log.Error("failed to get repo root", "err", err, "did", u.Did) 307 return nil, echo.NewHTTPError(http.StatusInternalServerError, "failed to get repo root") 308 } 309 310 rev, err := s.repoman.GetRepoRev(ctx, u.ID) 311 if err != nil { 312 log.Error("failed to get repo rev", "err", err, "did", u.Did) 313 return nil, echo.NewHTTPError(http.StatusInternalServerError, "failed to get repo rev") 314 } 315 316 return &comatprototypes.SyncGetLatestCommit_Output{ 317 Cid: root.String(), 318 Rev: rev, 319 }, nil 320}