porting all github actions from bluesky-social/indigo to tangled CI
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 515 lines 13 kB view raw
1package main 2 3import ( 4 "encoding/json" 5 "errors" 6 "fmt" 7 "net/http" 8 "strconv" 9 "strings" 10 "time" 11 12 comatproto "github.com/bluesky-social/indigo/api/atproto" 13 "github.com/bluesky-social/indigo/atproto/syntax" 14 "github.com/bluesky-social/indigo/cmd/relay/relay" 15 "github.com/bluesky-social/indigo/cmd/relay/relay/models" 16 17 "github.com/labstack/echo/v4" 18 dto "github.com/prometheus/client_model/go" 19) 20 21// This endpoint is basically the same as the regular com.atproto.sync.requestCrawl endpoint, except it sets a flag to bypass configuration checks. 22func (s *Service) handleAdminRequestCrawl(c echo.Context) error { 23 var body comatproto.SyncRequestCrawl_Input 24 if err := c.Bind(&body); err != nil { 25 return &echo.HTTPError{Code: http.StatusBadRequest, Message: fmt.Sprintf("invalid body: %s", err)} 26 } 27 28 return s.handleComAtprotoSyncRequestCrawl(c, &body, true) 29} 30 31func (s *Service) handleAdminSetSubsEnabled(c echo.Context) error { 32 enabled, err := strconv.ParseBool(c.QueryParam("enabled")) 33 if err != nil { 34 return &echo.HTTPError{Code: http.StatusBadRequest, Message: err.Error()} 35 } 36 s.config.DisableRequestCrawl = !enabled 37 return c.JSON(http.StatusOK, map[string]any{ 38 "success": "true", 39 }) 40} 41 42func (s *Service) handleAdminGetSubsEnabled(c echo.Context) error { 43 return c.JSON(http.StatusOK, map[string]bool{ 44 "enabled": !s.config.DisableRequestCrawl, 45 }) 46} 47 48func (s *Service) handleAdminGetNewHostPerDayRateLimit(c echo.Context) error { 49 return c.JSON(http.StatusOK, map[string]int64{ 50 "limit": s.relay.HostPerDayLimiter.Limit(), 51 }) 52} 53 54func (s *Service) handleAdminSetNewHostPerDayRateLimit(c echo.Context) error { 55 limit, err := strconv.ParseInt(c.QueryParam("limit"), 10, 64) 56 if err != nil { 57 return &echo.HTTPError{Code: http.StatusBadRequest, Message: fmt.Errorf("failed to parse limit: %w", err).Error()} 58 } 59 60 s.relay.HostPerDayLimiter.SetLimit(limit) 61 62 // NOTE: *not* forwarding to sibling instances 63 64 return c.JSON(http.StatusOK, map[string]any{ 65 "success": "true", 66 }) 67} 68 69func (s *Service) handleAdminTakeDownRepo(c echo.Context) error { 70 ctx := c.Request().Context() 71 72 var body map[string]string 73 if err := c.Bind(&body); err != nil { 74 return err 75 } 76 didField, ok := body["did"] 77 if !ok { 78 return &echo.HTTPError{ 79 Code: http.StatusBadRequest, 80 Message: "must specify DID parameter in body", 81 } 82 } 83 did, err := syntax.ParseDID(didField) 84 if err != nil { 85 return err 86 } 87 88 if err := s.relay.UpdateAccountLocalStatus(ctx, did, models.AccountStatusTakendown, true); err != nil { 89 if errors.Is(err, relay.ErrAccountNotFound) { 90 return &echo.HTTPError{ 91 Code: http.StatusNotFound, 92 Message: "account not found", 93 } 94 } 95 return &echo.HTTPError{ 96 Code: http.StatusInternalServerError, 97 Message: err.Error(), 98 } 99 } 100 101 // forward on to any sibling instances 102 b, err := json.Marshal(body) 103 if err != nil { 104 return err 105 } 106 go s.ForwardSiblingRequest(c, b) 107 108 return c.JSON(http.StatusOK, map[string]any{ 109 "success": "true", 110 }) 111} 112 113func (s *Service) handleAdminReverseTakedown(c echo.Context) error { 114 ctx := c.Request().Context() 115 116 var body map[string]string 117 if err := c.Bind(&body); err != nil { 118 return err 119 } 120 didField, ok := body["did"] 121 if !ok { 122 return &echo.HTTPError{ 123 Code: http.StatusBadRequest, 124 Message: "must specify DID parameter in body", 125 } 126 } 127 did, err := syntax.ParseDID(didField) 128 if err != nil { 129 return err 130 } 131 132 if err := s.relay.UpdateAccountLocalStatus(ctx, did, models.AccountStatusActive, true); err != nil { 133 if errors.Is(err, relay.ErrAccountNotFound) { 134 return &echo.HTTPError{ 135 Code: http.StatusNotFound, 136 Message: "repo not found", 137 } 138 } 139 return &echo.HTTPError{ 140 Code: http.StatusInternalServerError, 141 Message: err.Error(), 142 } 143 } 144 145 // forward on to any sibling instances 146 b, err := json.Marshal(body) 147 if err != nil { 148 return err 149 } 150 go s.ForwardSiblingRequest(c, b) 151 152 return c.JSON(http.StatusOK, map[string]any{ 153 "success": "true", 154 }) 155} 156 157type ListTakedownsResponse struct { 158 DIDs []string `json:"dids"` 159 Cursor int64 `json:"cursor,omitempty"` 160} 161 162func (s *Service) handleAdminListRepoTakeDowns(c echo.Context) error { 163 ctx := c.Request().Context() 164 var err error 165 166 limit := 500 167 cursor := int64(0) 168 cursorQuery := c.QueryParam("cursor") 169 if cursorQuery != "" { 170 cursor, err = strconv.ParseInt(cursorQuery, 10, 64) 171 if err != nil { 172 return &echo.HTTPError{Code: http.StatusBadRequest, Message: "invalid cursor param"} 173 } 174 } 175 176 accounts, err := s.relay.ListAccountTakedowns(ctx, cursor, limit) 177 if err != nil { 178 return &echo.HTTPError{Code: http.StatusInternalServerError, Message: "failed to list takedowns"} 179 } 180 181 out := ListTakedownsResponse{ 182 DIDs: make([]string, len(accounts)), 183 } 184 for i, acc := range accounts { 185 out.DIDs[i] = acc.DID 186 out.Cursor = int64(acc.UID) 187 } 188 if len(out.DIDs) < limit { 189 out.Cursor = 0 190 } 191 return c.JSON(http.StatusOK, out) 192} 193 194func (s *Service) handleAdminGetUpstreamConns(c echo.Context) error { 195 return c.JSON(http.StatusOK, s.relay.Slurper.GetActiveSubHostnames()) 196} 197 198type rateLimit struct { 199 Max float64 `json:"Max"` 200 WindowSeconds float64 `json:"Window"` 201} 202 203type hostInfo struct { 204 // fields from old models.PDS 205 ID uint64 206 CreatedAt time.Time 207 Host string 208 SSL bool 209 Cursor int64 210 Registered bool 211 Blocked bool 212 CrawlRateLimit float64 213 RepoCount int64 214 RepoLimit int64 215 216 HasActiveConnection bool `json:"HasActiveConnection"` 217 EventsSeenSinceStartup uint64 `json:"EventsSeenSinceStartup"` 218 PerSecondEventRate rateLimit `json:"PerSecondEventRate"` 219 PerHourEventRate rateLimit `json:"PerHourEventRate"` 220 PerDayEventRate rateLimit `json:"PerDayEventRate"` 221 UserCount int64 `json:"UserCount"` 222} 223 224func (s *Service) handleListHosts(c echo.Context) error { 225 ctx := c.Request().Context() 226 227 limit := 10_000 228 hosts, err := s.relay.ListHosts(ctx, 0, limit, false) 229 if err != nil { 230 return err 231 } 232 233 activeHostnames := s.relay.Slurper.GetActiveSubHostnames() 234 activeHosts := make(map[string]bool, len(activeHostnames)) 235 for _, hostname := range activeHostnames { 236 activeHosts[hostname] = true 237 } 238 239 hostInfos := make([]hostInfo, len(hosts)) 240 for i, host := range hosts { 241 _, isActive := activeHosts[host.Hostname] 242 hostInfos[i] = hostInfo{ 243 ID: host.ID, 244 CreatedAt: host.CreatedAt, 245 Host: host.Hostname, 246 SSL: !host.NoSSL, 247 Cursor: host.LastSeq, 248 Registered: host.Status == models.HostStatusActive, // is this right? 249 Blocked: host.Status == models.HostStatusBanned, 250 RepoCount: host.AccountCount, 251 RepoLimit: host.AccountLimit, 252 253 HasActiveConnection: isActive, 254 UserCount: host.AccountCount, 255 } 256 257 // fetch current rate limits 258 hostInfos[i].PerSecondEventRate = rateLimit{Max: -1.0, WindowSeconds: 1} 259 hostInfos[i].PerHourEventRate = rateLimit{Max: -1.0, WindowSeconds: 3600} 260 hostInfos[i].PerDayEventRate = rateLimit{Max: -1.0, WindowSeconds: 86400} 261 if isActive { 262 slc, err := s.relay.Slurper.GetLimits(host.Hostname) 263 if err != nil { 264 s.logger.Error("fetching subscribed host limits", "err", err) 265 } else { 266 hostInfos[i].PerSecondEventRate = rateLimit{ 267 Max: float64(slc.PerSecond), 268 WindowSeconds: 1, 269 } 270 hostInfos[i].PerHourEventRate = rateLimit{ 271 Max: float64(slc.PerHour), 272 WindowSeconds: 3600, 273 } 274 hostInfos[i].PerDayEventRate = rateLimit{ 275 Max: float64(slc.PerDay), 276 WindowSeconds: 86400, 277 } 278 } 279 } 280 281 // pull event counter metrics from prometheus 282 var m = &dto.Metric{} 283 if err := relay.EventsReceivedCounter.WithLabelValues(host.Hostname).Write(m); err != nil { 284 hostInfos[i].EventsSeenSinceStartup = 0 285 continue 286 } 287 hostInfos[i].EventsSeenSinceStartup = uint64(m.Counter.GetValue()) 288 } 289 290 return c.JSON(http.StatusOK, hostInfos) 291} 292 293func (s *Service) handleAdminListConsumers(c echo.Context) error { 294 return c.JSON(http.StatusOK, s.relay.ListConsumers()) 295} 296 297func (s *Service) handleAdminKillUpstreamConn(c echo.Context) error { 298 ctx := c.Request().Context() 299 300 queryHost := strings.TrimSpace(c.QueryParam("host")) 301 hostname, _, err := relay.ParseHostname(queryHost) 302 if err != nil { 303 return &echo.HTTPError{ 304 Code: http.StatusBadRequest, 305 Message: "must pass a valid host", 306 } 307 } 308 309 banHost := strings.ToLower(c.QueryParam("block")) == "true" 310 311 // TODO: move this method to relay (for updating the database) 312 if err := s.relay.Slurper.KillUpstreamConnection(ctx, hostname, banHost); err != nil { 313 if errors.Is(err, relay.ErrHostInactive) { 314 return &echo.HTTPError{ 315 Code: http.StatusBadRequest, 316 Message: "no active connection to given host", 317 } 318 } 319 return err 320 } 321 322 // forward on to any sibling instances 323 go s.ForwardSiblingRequest(c, nil) 324 325 return c.JSON(http.StatusOK, map[string]any{ 326 "success": "true", 327 }) 328} 329 330func (s *Service) handleBlockHost(c echo.Context) error { 331 ctx := c.Request().Context() 332 333 queryHost := strings.TrimSpace(c.QueryParam("host")) 334 hostname, _, err := relay.ParseHostname(queryHost) 335 if err != nil { 336 return &echo.HTTPError{ 337 Code: http.StatusBadRequest, 338 Message: "must pass a valid hostname", 339 } 340 } 341 342 host, err := s.relay.GetHost(ctx, hostname) 343 if err != nil { 344 return err 345 } 346 347 if host.Status != models.HostStatusBanned { 348 if err := s.relay.UpdateHostStatus(ctx, host.ID, models.HostStatusBanned); err != nil { 349 return err 350 } 351 } 352 353 // kill any active connection (there may not be one, so ignore error) 354 _ = s.relay.Slurper.KillUpstreamConnection(ctx, host.Hostname, false) 355 356 // forward on to any sibling instances 357 go s.ForwardSiblingRequest(c, nil) 358 359 return c.JSON(http.StatusOK, map[string]any{ 360 "success": "true", 361 }) 362} 363 364func (s *Service) handleUnblockHost(c echo.Context) error { 365 ctx := c.Request().Context() 366 367 queryHost := strings.TrimSpace(c.QueryParam("host")) 368 hostname, _, err := relay.ParseHostname(queryHost) 369 if err != nil { 370 return &echo.HTTPError{ 371 Code: http.StatusBadRequest, 372 Message: "must pass a valid hostname", 373 } 374 } 375 376 host, err := s.relay.GetHost(ctx, hostname) 377 if err != nil { 378 return err 379 } 380 381 if host.Status != models.HostStatusActive { 382 if err := s.relay.UpdateHostStatus(ctx, host.ID, models.HostStatusActive); err != nil { 383 return err 384 } 385 } 386 387 // forward on to any sibling instances 388 go s.ForwardSiblingRequest(c, nil) 389 390 return c.JSON(http.StatusOK, map[string]any{ 391 "success": "true", 392 }) 393} 394 395type bannedDomains struct { 396 BannedDomains []string `json:"banned_domains"` 397} 398 399func (s *Service) handleAdminListDomainBans(c echo.Context) error { 400 ctx := c.Request().Context() 401 402 bans, err := s.relay.ListDomainBans(ctx) 403 if err != nil { 404 return err 405 } 406 407 resp := bannedDomains{ 408 BannedDomains: make([]string, len(bans)), 409 } 410 411 for i, ban := range bans { 412 resp.BannedDomains[i] = ban.Domain 413 } 414 415 return c.JSON(http.StatusOK, resp) 416} 417 418type banDomainBody struct { 419 Domain string 420} 421 422func (s *Service) handleAdminBanDomain(c echo.Context) error { 423 ctx := c.Request().Context() 424 425 var body banDomainBody 426 if err := c.Bind(&body); err != nil { 427 return err 428 } 429 430 err := s.relay.CreateDomainBan(ctx, body.Domain) 431 if err != nil { 432 return err 433 } 434 435 // forward on to any sibling instances 436 b, err := json.Marshal(body) 437 if err != nil { 438 return err 439 } 440 go s.ForwardSiblingRequest(c, b) 441 442 return c.JSON(http.StatusOK, map[string]any{ 443 "success": "true", 444 }) 445} 446 447func (s *Service) handleAdminUnbanDomain(c echo.Context) error { 448 ctx := c.Request().Context() 449 450 var body banDomainBody 451 if err := c.Bind(&body); err != nil { 452 return err 453 } 454 455 err := s.relay.RemoveDomainBan(ctx, body.Domain) 456 if err != nil { 457 return err 458 } 459 460 // forward on to any sibling instances 461 b, err := json.Marshal(body) 462 if err != nil { 463 return err 464 } 465 go s.ForwardSiblingRequest(c, b) 466 467 return c.JSON(http.StatusOK, map[string]any{ 468 "success": "true", 469 }) 470} 471 472type RateLimitChangeRequest struct { 473 Hostname string `json:"host"` 474 RepoLimit *int64 `json:"repo_limit"` 475} 476 477func (s *Service) handleAdminChangeHostRateLimits(c echo.Context) error { 478 ctx := c.Request().Context() 479 480 var body RateLimitChangeRequest 481 if err := c.Bind(&body); err != nil { 482 return echo.NewHTTPError(http.StatusBadRequest, fmt.Sprintf("invalid body: %s", err)) 483 } 484 485 hostname, _, err := relay.ParseHostname(body.Hostname) 486 if err != nil { 487 return echo.NewHTTPError(http.StatusBadRequest, fmt.Sprintf("invalid hostname: %s", err)) 488 } 489 490 // catch empty/nil body 491 if body.RepoLimit == nil { 492 return echo.NewHTTPError(http.StatusBadRequest, "missing repo_limit parameter") 493 } 494 495 host, err := s.relay.GetHost(ctx, hostname) 496 if err != nil { 497 // TODO: technically, there could be a database error here or something 498 return echo.NewHTTPError(http.StatusNotFound, fmt.Sprintf("unknown hostname: %s", err)) 499 } 500 501 if err := s.relay.UpdateHostAccountLimit(ctx, host.ID, *body.RepoLimit); err != nil { 502 return echo.NewHTTPError(http.StatusInternalServerError, fmt.Sprintf("failed to update limits: %s", err)) 503 } 504 505 // forward on to any sibling instances 506 b, err := json.Marshal(body) 507 if err != nil { 508 return err 509 } 510 go s.ForwardSiblingRequest(c, b) 511 512 return c.JSON(http.StatusOK, map[string]any{ 513 "success": "true", 514 }) 515}