1package bgs
2
3import (
4 "bytes"
5 "context"
6 "encoding/json"
7 "errors"
8 "fmt"
9 "io"
10 "net/http"
11 "net/url"
12 "strings"
13
14 atproto "github.com/bluesky-social/indigo/api/atproto"
15 comatprototypes "github.com/bluesky-social/indigo/api/atproto"
16 "github.com/bluesky-social/indigo/carstore"
17 "github.com/bluesky-social/indigo/events"
18 "github.com/bluesky-social/indigo/mst"
19 "gorm.io/gorm"
20
21 "github.com/bluesky-social/indigo/xrpc"
22 "github.com/ipfs/go-cid"
23 cbor "github.com/ipfs/go-ipld-cbor"
24 "github.com/ipld/go-car"
25 "github.com/labstack/echo/v4"
26)
27
28func (s *BGS) handleComAtprotoSyncGetRecord(ctx context.Context, collection string, did string, rkey string) (io.Reader, error) {
29 u, err := s.lookupUserByDid(ctx, did)
30 if err != nil {
31 if errors.Is(err, gorm.ErrRecordNotFound) {
32 return nil, echo.NewHTTPError(http.StatusNotFound, "user not found")
33 }
34 log.Error("failed to lookup user", "err", err, "did", did)
35 return nil, echo.NewHTTPError(http.StatusInternalServerError, "failed to lookup user")
36 }
37
38 if u.GetTombstoned() {
39 return nil, fmt.Errorf("account was deleted")
40 }
41
42 if u.GetTakenDown() {
43 return nil, fmt.Errorf("account was taken down by the Relay")
44 }
45
46 ustatus := u.GetUpstreamStatus()
47 if ustatus == events.AccountStatusTakendown {
48 return nil, fmt.Errorf("account was taken down by its PDS")
49 }
50
51 if ustatus == events.AccountStatusDeactivated {
52 return nil, fmt.Errorf("account is temporarily deactivated")
53 }
54
55 if ustatus == events.AccountStatusSuspended {
56 return nil, fmt.Errorf("account is suspended by its PDS")
57 }
58
59 root, blocks, err := s.repoman.GetRecordProof(ctx, u.ID, collection, rkey)
60 if err != nil {
61 if errors.Is(err, mst.ErrNotFound) {
62 return nil, echo.NewHTTPError(http.StatusNotFound, "record not found in repo")
63 }
64 log.Error("failed to get record from repo", "err", err, "did", did, "collection", collection, "rkey", rkey)
65 return nil, echo.NewHTTPError(http.StatusInternalServerError, "failed to get record from repo")
66 }
67
68 buf := new(bytes.Buffer)
69 hb, err := cbor.DumpObject(&car.CarHeader{
70 Roots: []cid.Cid{root},
71 Version: 1,
72 })
73 if _, err := carstore.LdWrite(buf, hb); err != nil {
74 return nil, err
75 }
76
77 for _, blk := range blocks {
78 if _, err := carstore.LdWrite(buf, blk.Cid().Bytes(), blk.RawData()); err != nil {
79 return nil, err
80 }
81 }
82
83 return buf, nil
84}
85
86func (s *BGS) handleComAtprotoSyncGetRepo(ctx context.Context, did string, since string) (io.Reader, error) {
87 u, err := s.lookupUserByDid(ctx, did)
88 if err != nil {
89 if errors.Is(err, gorm.ErrRecordNotFound) {
90 return nil, echo.NewHTTPError(http.StatusNotFound, "user not found")
91 }
92 log.Error("failed to lookup user", "err", err, "did", did)
93 return nil, echo.NewHTTPError(http.StatusInternalServerError, "failed to lookup user")
94 }
95
96 if u.GetTombstoned() {
97 return nil, fmt.Errorf("account was deleted")
98 }
99
100 if u.GetTakenDown() {
101 return nil, fmt.Errorf("account was taken down by the Relay")
102 }
103
104 ustatus := u.GetUpstreamStatus()
105 if ustatus == events.AccountStatusTakendown {
106 return nil, fmt.Errorf("account was taken down by its PDS")
107 }
108
109 if ustatus == events.AccountStatusDeactivated {
110 return nil, fmt.Errorf("account is temporarily deactivated")
111 }
112
113 if ustatus == events.AccountStatusSuspended {
114 return nil, fmt.Errorf("account is suspended by its PDS")
115 }
116
117 // TODO: stream the response
118 buf := new(bytes.Buffer)
119 if err := s.repoman.ReadRepo(ctx, u.ID, since, buf); err != nil {
120 log.Error("failed to read repo into buffer", "err", err, "did", did)
121 return nil, echo.NewHTTPError(http.StatusInternalServerError, "failed to read repo into buffer")
122 }
123
124 return buf, nil
125}
126
127func (s *BGS) handleComAtprotoSyncGetBlocks(ctx context.Context, cids []string, did string) (io.Reader, error) {
128 return nil, fmt.Errorf("NYI")
129}
130
131func (s *BGS) handleComAtprotoSyncRequestCrawl(ctx context.Context, body *comatprototypes.SyncRequestCrawl_Input) error {
132 host := body.Hostname
133 if host == "" {
134 return echo.NewHTTPError(http.StatusBadRequest, "must pass hostname")
135 }
136
137 if !strings.HasPrefix(host, "http://") && !strings.HasPrefix(host, "https://") {
138 if s.ssl {
139 host = "https://" + host
140 } else {
141 host = "http://" + host
142 }
143 }
144
145 u, err := url.Parse(host)
146 if err != nil {
147 return echo.NewHTTPError(http.StatusBadRequest, "failed to parse hostname")
148 }
149
150 if u.Scheme == "http" && s.ssl {
151 return echo.NewHTTPError(http.StatusBadRequest, "this server requires https")
152 }
153
154 if u.Scheme == "https" && !s.ssl {
155 return echo.NewHTTPError(http.StatusBadRequest, "this server does not support https")
156 }
157
158 if u.Path != "" {
159 return echo.NewHTTPError(http.StatusBadRequest, "must pass hostname without path")
160 }
161
162 if u.Query().Encode() != "" {
163 return echo.NewHTTPError(http.StatusBadRequest, "must pass hostname without query")
164 }
165
166 host = u.Host // potentially hostname:port
167
168 banned, err := s.domainIsBanned(ctx, host)
169 if banned {
170 return echo.NewHTTPError(http.StatusUnauthorized, "domain is banned")
171 }
172
173 log.Warn("TODO: better host validation for crawl requests")
174
175 clientHost := fmt.Sprintf("%s://%s", u.Scheme, host)
176
177 c := &xrpc.Client{
178 Host: clientHost,
179 Client: http.DefaultClient, // not using the client that auto-retries
180 }
181
182 desc, err := atproto.ServerDescribeServer(ctx, c)
183 if err != nil {
184 errMsg := fmt.Sprintf("requested host (%s) failed to respond to describe request", clientHost)
185 return echo.NewHTTPError(http.StatusBadRequest, errMsg)
186 }
187
188 // Maybe we could do something with this response later
189 _ = desc
190
191 if len(s.nextCrawlers) != 0 {
192 blob, err := json.Marshal(body)
193 if err != nil {
194 log.Warn("could not forward requestCrawl, json err", "err", err)
195 } else {
196 go func(bodyBlob []byte) {
197 for _, rpu := range s.nextCrawlers {
198 pu := rpu.JoinPath("/xrpc/com.atproto.sync.requestCrawl")
199 response, err := s.httpClient.Post(pu.String(), "application/json", bytes.NewReader(bodyBlob))
200 if response != nil && response.Body != nil {
201 response.Body.Close()
202 }
203 if err != nil || response == nil {
204 log.Warn("requestCrawl forward failed", "host", rpu, "err", err)
205 } else if response.StatusCode != http.StatusOK {
206 log.Warn("requestCrawl forward failed", "host", rpu, "status", response.Status)
207 } else {
208 log.Info("requestCrawl forward successful", "host", rpu)
209 }
210 }
211 }(blob)
212 }
213 }
214
215 return s.slurper.SubscribeToPds(ctx, host, true, false, nil)
216}
217
218func (s *BGS) handleComAtprotoSyncNotifyOfUpdate(ctx context.Context, body *comatprototypes.SyncNotifyOfUpdate_Input) error {
219 // TODO:
220 return nil
221}
222
223func (s *BGS) handleComAtprotoSyncListRepos(ctx context.Context, cursor int64, limit int) (*comatprototypes.SyncListRepos_Output, error) {
224 // Filter out tombstoned, taken down, and deactivated accounts
225 q := fmt.Sprintf("id > ? AND NOT tombstoned AND NOT taken_down AND (upstream_status is NULL OR (upstream_status != '%s' AND upstream_status != '%s' AND upstream_status != '%s'))",
226 events.AccountStatusDeactivated, events.AccountStatusSuspended, events.AccountStatusTakendown)
227
228 // Load the users
229 users := []*User{}
230 if err := s.db.Model(&User{}).Where(q, cursor).Order("id").Limit(limit).Find(&users).Error; err != nil {
231 if err == gorm.ErrRecordNotFound {
232 return &comatprototypes.SyncListRepos_Output{}, nil
233 }
234 log.Error("failed to query users", "err", err)
235 return nil, echo.NewHTTPError(http.StatusInternalServerError, "failed to query users")
236 }
237
238 if len(users) == 0 {
239 // resp.Repos is an explicit empty array, not just 'nil'
240 return &comatprototypes.SyncListRepos_Output{
241 Repos: []*comatprototypes.SyncListRepos_Repo{},
242 }, nil
243 }
244
245 resp := &comatprototypes.SyncListRepos_Output{
246 Repos: make([]*comatprototypes.SyncListRepos_Repo, len(users)),
247 }
248
249 // Fetch the repo roots for each user
250 for i := range users {
251 user := users[i]
252
253 root, err := s.repoman.GetRepoRoot(ctx, user.ID)
254 if err != nil {
255 log.Error("failed to get repo root", "err", err, "did", user.Did)
256 return nil, echo.NewHTTPError(http.StatusInternalServerError, fmt.Sprintf("failed to get repo root for (%s): %v", user.Did, err.Error()))
257 }
258
259 resp.Repos[i] = &comatprototypes.SyncListRepos_Repo{
260 Did: user.Did,
261 Head: root.String(),
262 }
263 }
264
265 // If this is not the last page, set the cursor
266 if len(users) >= limit && len(users) > 1 {
267 nextCursor := fmt.Sprintf("%d", users[len(users)-1].ID)
268 resp.Cursor = &nextCursor
269 }
270
271 return resp, nil
272}
273
274func (s *BGS) handleComAtprotoSyncGetLatestCommit(ctx context.Context, did string) (*comatprototypes.SyncGetLatestCommit_Output, error) {
275 u, err := s.lookupUserByDid(ctx, did)
276 if err != nil {
277 if errors.Is(err, gorm.ErrRecordNotFound) {
278 return nil, echo.NewHTTPError(http.StatusNotFound, "user not found")
279 }
280 return nil, echo.NewHTTPError(http.StatusInternalServerError, "failed to lookup user")
281 }
282
283 if u.GetTombstoned() {
284 return nil, fmt.Errorf("account was deleted")
285 }
286
287 if u.GetTakenDown() {
288 return nil, fmt.Errorf("account was taken down by the Relay")
289 }
290
291 ustatus := u.GetUpstreamStatus()
292 if ustatus == events.AccountStatusTakendown {
293 return nil, fmt.Errorf("account was taken down by its PDS")
294 }
295
296 if ustatus == events.AccountStatusDeactivated {
297 return nil, fmt.Errorf("account is temporarily deactivated")
298 }
299
300 if ustatus == events.AccountStatusSuspended {
301 return nil, fmt.Errorf("account is suspended by its PDS")
302 }
303
304 root, err := s.repoman.GetRepoRoot(ctx, u.ID)
305 if err != nil {
306 log.Error("failed to get repo root", "err", err, "did", u.Did)
307 return nil, echo.NewHTTPError(http.StatusInternalServerError, "failed to get repo root")
308 }
309
310 rev, err := s.repoman.GetRepoRev(ctx, u.ID)
311 if err != nil {
312 log.Error("failed to get repo rev", "err", err, "did", u.Did)
313 return nil, echo.NewHTTPError(http.StatusInternalServerError, "failed to get repo rev")
314 }
315
316 return &comatprototypes.SyncGetLatestCommit_Output{
317 Cid: root.String(),
318 Rev: rev,
319 }, nil
320}