porting all github actions from bluesky-social/indigo to tangled CI
1package main
2
3import (
4 "encoding/json"
5 "errors"
6 "fmt"
7 "net/http"
8 "strconv"
9 "strings"
10 "time"
11
12 comatproto "github.com/bluesky-social/indigo/api/atproto"
13 "github.com/bluesky-social/indigo/atproto/syntax"
14 "github.com/bluesky-social/indigo/cmd/relay/relay"
15 "github.com/bluesky-social/indigo/cmd/relay/relay/models"
16
17 "github.com/labstack/echo/v4"
18 dto "github.com/prometheus/client_model/go"
19)
20
21// This endpoint is basically the same as the regular com.atproto.sync.requestCrawl endpoint, except it sets a flag to bypass configuration checks.
22func (s *Service) handleAdminRequestCrawl(c echo.Context) error {
23 var body comatproto.SyncRequestCrawl_Input
24 if err := c.Bind(&body); err != nil {
25 return &echo.HTTPError{Code: http.StatusBadRequest, Message: fmt.Sprintf("invalid body: %s", err)}
26 }
27
28 return s.handleComAtprotoSyncRequestCrawl(c, &body, true)
29}
30
31func (s *Service) handleAdminSetSubsEnabled(c echo.Context) error {
32 enabled, err := strconv.ParseBool(c.QueryParam("enabled"))
33 if err != nil {
34 return &echo.HTTPError{Code: http.StatusBadRequest, Message: err.Error()}
35 }
36 s.config.DisableRequestCrawl = !enabled
37 return c.JSON(http.StatusOK, map[string]any{
38 "success": "true",
39 })
40}
41
42func (s *Service) handleAdminGetSubsEnabled(c echo.Context) error {
43 return c.JSON(http.StatusOK, map[string]bool{
44 "enabled": !s.config.DisableRequestCrawl,
45 })
46}
47
48func (s *Service) handleAdminGetNewHostPerDayRateLimit(c echo.Context) error {
49 return c.JSON(http.StatusOK, map[string]int64{
50 "limit": s.relay.HostPerDayLimiter.Limit(),
51 })
52}
53
54func (s *Service) handleAdminSetNewHostPerDayRateLimit(c echo.Context) error {
55 limit, err := strconv.ParseInt(c.QueryParam("limit"), 10, 64)
56 if err != nil {
57 return &echo.HTTPError{Code: http.StatusBadRequest, Message: fmt.Errorf("failed to parse limit: %w", err).Error()}
58 }
59
60 s.relay.HostPerDayLimiter.SetLimit(limit)
61
62 // NOTE: *not* forwarding to sibling instances
63
64 return c.JSON(http.StatusOK, map[string]any{
65 "success": "true",
66 })
67}
68
69func (s *Service) handleAdminTakeDownRepo(c echo.Context) error {
70 ctx := c.Request().Context()
71
72 var body map[string]string
73 if err := c.Bind(&body); err != nil {
74 return err
75 }
76 didField, ok := body["did"]
77 if !ok {
78 return &echo.HTTPError{
79 Code: http.StatusBadRequest,
80 Message: "must specify DID parameter in body",
81 }
82 }
83 did, err := syntax.ParseDID(didField)
84 if err != nil {
85 return err
86 }
87
88 if err := s.relay.UpdateAccountLocalStatus(ctx, did, models.AccountStatusTakendown, true); err != nil {
89 if errors.Is(err, relay.ErrAccountNotFound) {
90 return &echo.HTTPError{
91 Code: http.StatusNotFound,
92 Message: "account not found",
93 }
94 }
95 return &echo.HTTPError{
96 Code: http.StatusInternalServerError,
97 Message: err.Error(),
98 }
99 }
100
101 // forward on to any sibling instances
102 b, err := json.Marshal(body)
103 if err != nil {
104 return err
105 }
106 go s.ForwardSiblingRequest(c, b)
107
108 return c.JSON(http.StatusOK, map[string]any{
109 "success": "true",
110 })
111}
112
113func (s *Service) handleAdminReverseTakedown(c echo.Context) error {
114 ctx := c.Request().Context()
115
116 var body map[string]string
117 if err := c.Bind(&body); err != nil {
118 return err
119 }
120 didField, ok := body["did"]
121 if !ok {
122 return &echo.HTTPError{
123 Code: http.StatusBadRequest,
124 Message: "must specify DID parameter in body",
125 }
126 }
127 did, err := syntax.ParseDID(didField)
128 if err != nil {
129 return err
130 }
131
132 if err := s.relay.UpdateAccountLocalStatus(ctx, did, models.AccountStatusActive, true); err != nil {
133 if errors.Is(err, relay.ErrAccountNotFound) {
134 return &echo.HTTPError{
135 Code: http.StatusNotFound,
136 Message: "repo not found",
137 }
138 }
139 return &echo.HTTPError{
140 Code: http.StatusInternalServerError,
141 Message: err.Error(),
142 }
143 }
144
145 // forward on to any sibling instances
146 b, err := json.Marshal(body)
147 if err != nil {
148 return err
149 }
150 go s.ForwardSiblingRequest(c, b)
151
152 return c.JSON(http.StatusOK, map[string]any{
153 "success": "true",
154 })
155}
156
157type ListTakedownsResponse struct {
158 DIDs []string `json:"dids"`
159 Cursor int64 `json:"cursor,omitempty"`
160}
161
162func (s *Service) handleAdminListRepoTakeDowns(c echo.Context) error {
163 ctx := c.Request().Context()
164 var err error
165
166 limit := 500
167 cursor := int64(0)
168 cursorQuery := c.QueryParam("cursor")
169 if cursorQuery != "" {
170 cursor, err = strconv.ParseInt(cursorQuery, 10, 64)
171 if err != nil {
172 return &echo.HTTPError{Code: http.StatusBadRequest, Message: "invalid cursor param"}
173 }
174 }
175
176 accounts, err := s.relay.ListAccountTakedowns(ctx, cursor, limit)
177 if err != nil {
178 return &echo.HTTPError{Code: http.StatusInternalServerError, Message: "failed to list takedowns"}
179 }
180
181 out := ListTakedownsResponse{
182 DIDs: make([]string, len(accounts)),
183 }
184 for i, acc := range accounts {
185 out.DIDs[i] = acc.DID
186 out.Cursor = int64(acc.UID)
187 }
188 if len(out.DIDs) < limit {
189 out.Cursor = 0
190 }
191 return c.JSON(http.StatusOK, out)
192}
193
194func (s *Service) handleAdminGetUpstreamConns(c echo.Context) error {
195 return c.JSON(http.StatusOK, s.relay.Slurper.GetActiveSubHostnames())
196}
197
198type rateLimit struct {
199 Max float64 `json:"Max"`
200 WindowSeconds float64 `json:"Window"`
201}
202
203type hostInfo struct {
204 // fields from old models.PDS
205 ID uint64
206 CreatedAt time.Time
207 Host string
208 SSL bool
209 Cursor int64
210 Registered bool
211 Blocked bool
212 CrawlRateLimit float64
213 RepoCount int64
214 RepoLimit int64
215
216 HasActiveConnection bool `json:"HasActiveConnection"`
217 EventsSeenSinceStartup uint64 `json:"EventsSeenSinceStartup"`
218 PerSecondEventRate rateLimit `json:"PerSecondEventRate"`
219 PerHourEventRate rateLimit `json:"PerHourEventRate"`
220 PerDayEventRate rateLimit `json:"PerDayEventRate"`
221 UserCount int64 `json:"UserCount"`
222}
223
224func (s *Service) handleListHosts(c echo.Context) error {
225 ctx := c.Request().Context()
226
227 limit := 10_000
228 hosts, err := s.relay.ListHosts(ctx, 0, limit, false)
229 if err != nil {
230 return err
231 }
232
233 activeHostnames := s.relay.Slurper.GetActiveSubHostnames()
234 activeHosts := make(map[string]bool, len(activeHostnames))
235 for _, hostname := range activeHostnames {
236 activeHosts[hostname] = true
237 }
238
239 hostInfos := make([]hostInfo, len(hosts))
240 for i, host := range hosts {
241 _, isActive := activeHosts[host.Hostname]
242 hostInfos[i] = hostInfo{
243 ID: host.ID,
244 CreatedAt: host.CreatedAt,
245 Host: host.Hostname,
246 SSL: !host.NoSSL,
247 Cursor: host.LastSeq,
248 Registered: host.Status == models.HostStatusActive, // is this right?
249 Blocked: host.Status == models.HostStatusBanned,
250 RepoCount: host.AccountCount,
251 RepoLimit: host.AccountLimit,
252
253 HasActiveConnection: isActive,
254 UserCount: host.AccountCount,
255 }
256
257 // fetch current rate limits
258 hostInfos[i].PerSecondEventRate = rateLimit{Max: -1.0, WindowSeconds: 1}
259 hostInfos[i].PerHourEventRate = rateLimit{Max: -1.0, WindowSeconds: 3600}
260 hostInfos[i].PerDayEventRate = rateLimit{Max: -1.0, WindowSeconds: 86400}
261 if isActive {
262 slc, err := s.relay.Slurper.GetLimits(host.Hostname)
263 if err != nil {
264 s.logger.Error("fetching subscribed host limits", "err", err)
265 } else {
266 hostInfos[i].PerSecondEventRate = rateLimit{
267 Max: float64(slc.PerSecond),
268 WindowSeconds: 1,
269 }
270 hostInfos[i].PerHourEventRate = rateLimit{
271 Max: float64(slc.PerHour),
272 WindowSeconds: 3600,
273 }
274 hostInfos[i].PerDayEventRate = rateLimit{
275 Max: float64(slc.PerDay),
276 WindowSeconds: 86400,
277 }
278 }
279 }
280
281 // pull event counter metrics from prometheus
282 var m = &dto.Metric{}
283 if err := relay.EventsReceivedCounter.WithLabelValues(host.Hostname).Write(m); err != nil {
284 hostInfos[i].EventsSeenSinceStartup = 0
285 continue
286 }
287 hostInfos[i].EventsSeenSinceStartup = uint64(m.Counter.GetValue())
288 }
289
290 return c.JSON(http.StatusOK, hostInfos)
291}
292
293func (s *Service) handleAdminListConsumers(c echo.Context) error {
294 return c.JSON(http.StatusOK, s.relay.ListConsumers())
295}
296
297func (s *Service) handleAdminKillUpstreamConn(c echo.Context) error {
298 ctx := c.Request().Context()
299
300 queryHost := strings.TrimSpace(c.QueryParam("host"))
301 hostname, _, err := relay.ParseHostname(queryHost)
302 if err != nil {
303 return &echo.HTTPError{
304 Code: http.StatusBadRequest,
305 Message: "must pass a valid host",
306 }
307 }
308
309 banHost := strings.ToLower(c.QueryParam("block")) == "true"
310
311 // TODO: move this method to relay (for updating the database)
312 if err := s.relay.Slurper.KillUpstreamConnection(ctx, hostname, banHost); err != nil {
313 if errors.Is(err, relay.ErrHostInactive) {
314 return &echo.HTTPError{
315 Code: http.StatusBadRequest,
316 Message: "no active connection to given host",
317 }
318 }
319 return err
320 }
321
322 // forward on to any sibling instances
323 go s.ForwardSiblingRequest(c, nil)
324
325 return c.JSON(http.StatusOK, map[string]any{
326 "success": "true",
327 })
328}
329
330func (s *Service) handleBlockHost(c echo.Context) error {
331 ctx := c.Request().Context()
332
333 queryHost := strings.TrimSpace(c.QueryParam("host"))
334 hostname, _, err := relay.ParseHostname(queryHost)
335 if err != nil {
336 return &echo.HTTPError{
337 Code: http.StatusBadRequest,
338 Message: "must pass a valid hostname",
339 }
340 }
341
342 host, err := s.relay.GetHost(ctx, hostname)
343 if err != nil {
344 return err
345 }
346
347 if host.Status != models.HostStatusBanned {
348 if err := s.relay.UpdateHostStatus(ctx, host.ID, models.HostStatusBanned); err != nil {
349 return err
350 }
351 }
352
353 // kill any active connection (there may not be one, so ignore error)
354 _ = s.relay.Slurper.KillUpstreamConnection(ctx, host.Hostname, false)
355
356 // forward on to any sibling instances
357 go s.ForwardSiblingRequest(c, nil)
358
359 return c.JSON(http.StatusOK, map[string]any{
360 "success": "true",
361 })
362}
363
364func (s *Service) handleUnblockHost(c echo.Context) error {
365 ctx := c.Request().Context()
366
367 queryHost := strings.TrimSpace(c.QueryParam("host"))
368 hostname, _, err := relay.ParseHostname(queryHost)
369 if err != nil {
370 return &echo.HTTPError{
371 Code: http.StatusBadRequest,
372 Message: "must pass a valid hostname",
373 }
374 }
375
376 host, err := s.relay.GetHost(ctx, hostname)
377 if err != nil {
378 return err
379 }
380
381 if host.Status != models.HostStatusActive {
382 if err := s.relay.UpdateHostStatus(ctx, host.ID, models.HostStatusActive); err != nil {
383 return err
384 }
385 }
386
387 // forward on to any sibling instances
388 go s.ForwardSiblingRequest(c, nil)
389
390 return c.JSON(http.StatusOK, map[string]any{
391 "success": "true",
392 })
393}
394
395type bannedDomains struct {
396 BannedDomains []string `json:"banned_domains"`
397}
398
399func (s *Service) handleAdminListDomainBans(c echo.Context) error {
400 ctx := c.Request().Context()
401
402 bans, err := s.relay.ListDomainBans(ctx)
403 if err != nil {
404 return err
405 }
406
407 resp := bannedDomains{
408 BannedDomains: make([]string, len(bans)),
409 }
410
411 for i, ban := range bans {
412 resp.BannedDomains[i] = ban.Domain
413 }
414
415 return c.JSON(http.StatusOK, resp)
416}
417
418type banDomainBody struct {
419 Domain string
420}
421
422func (s *Service) handleAdminBanDomain(c echo.Context) error {
423 ctx := c.Request().Context()
424
425 var body banDomainBody
426 if err := c.Bind(&body); err != nil {
427 return err
428 }
429
430 err := s.relay.CreateDomainBan(ctx, body.Domain)
431 if err != nil {
432 return err
433 }
434
435 // forward on to any sibling instances
436 b, err := json.Marshal(body)
437 if err != nil {
438 return err
439 }
440 go s.ForwardSiblingRequest(c, b)
441
442 return c.JSON(http.StatusOK, map[string]any{
443 "success": "true",
444 })
445}
446
447func (s *Service) handleAdminUnbanDomain(c echo.Context) error {
448 ctx := c.Request().Context()
449
450 var body banDomainBody
451 if err := c.Bind(&body); err != nil {
452 return err
453 }
454
455 err := s.relay.RemoveDomainBan(ctx, body.Domain)
456 if err != nil {
457 return err
458 }
459
460 // forward on to any sibling instances
461 b, err := json.Marshal(body)
462 if err != nil {
463 return err
464 }
465 go s.ForwardSiblingRequest(c, b)
466
467 return c.JSON(http.StatusOK, map[string]any{
468 "success": "true",
469 })
470}
471
472type RateLimitChangeRequest struct {
473 Hostname string `json:"host"`
474 RepoLimit *int64 `json:"repo_limit"`
475}
476
477func (s *Service) handleAdminChangeHostRateLimits(c echo.Context) error {
478 ctx := c.Request().Context()
479
480 var body RateLimitChangeRequest
481 if err := c.Bind(&body); err != nil {
482 return echo.NewHTTPError(http.StatusBadRequest, fmt.Sprintf("invalid body: %s", err))
483 }
484
485 hostname, _, err := relay.ParseHostname(body.Hostname)
486 if err != nil {
487 return echo.NewHTTPError(http.StatusBadRequest, fmt.Sprintf("invalid hostname: %s", err))
488 }
489
490 // catch empty/nil body
491 if body.RepoLimit == nil {
492 return echo.NewHTTPError(http.StatusBadRequest, "missing repo_limit parameter")
493 }
494
495 host, err := s.relay.GetHost(ctx, hostname)
496 if err != nil {
497 // TODO: technically, there could be a database error here or something
498 return echo.NewHTTPError(http.StatusNotFound, fmt.Sprintf("unknown hostname: %s", err))
499 }
500
501 if err := s.relay.UpdateHostAccountLimit(ctx, host.ID, *body.RepoLimit); err != nil {
502 return echo.NewHTTPError(http.StatusInternalServerError, fmt.Sprintf("failed to update limits: %s", err))
503 }
504
505 // forward on to any sibling instances
506 b, err := json.Marshal(body)
507 if err != nil {
508 return err
509 }
510 go s.ForwardSiblingRequest(c, b)
511
512 return c.JSON(http.StatusOK, map[string]any{
513 "success": "true",
514 })
515}