this repo has no description
at ozoneEventUpdate 724 lines 17 kB view raw
1package bgs 2 3import ( 4 "context" 5 "errors" 6 "fmt" 7 "net/http" 8 "net/url" 9 "slices" 10 "strconv" 11 "strings" 12 "time" 13 14 "github.com/bluesky-social/indigo/models" 15 "github.com/labstack/echo/v4" 16 dto "github.com/prometheus/client_model/go" 17 "go.opentelemetry.io/otel" 18 "golang.org/x/time/rate" 19 "gorm.io/gorm" 20) 21 22func (bgs *BGS) handleAdminSetSubsEnabled(e echo.Context) error { 23 enabled, err := strconv.ParseBool(e.QueryParam("enabled")) 24 if err != nil { 25 return &echo.HTTPError{ 26 Code: 400, 27 Message: err.Error(), 28 } 29 } 30 31 return bgs.slurper.SetNewSubsDisabled(!enabled) 32} 33 34func (bgs *BGS) handleAdminGetSubsEnabled(e echo.Context) error { 35 return e.JSON(200, map[string]bool{ 36 "enabled": !bgs.slurper.GetNewSubsDisabledState(), 37 }) 38} 39 40func (bgs *BGS) handleAdminGetNewPDSPerDayRateLimit(e echo.Context) error { 41 limit := bgs.slurper.GetNewPDSPerDayLimit() 42 return e.JSON(200, map[string]int64{ 43 "limit": limit, 44 }) 45} 46 47func (bgs *BGS) handleAdminSetNewPDSPerDayRateLimit(e echo.Context) error { 48 limit, err := strconv.ParseInt(e.QueryParam("limit"), 10, 64) 49 if err != nil { 50 return &echo.HTTPError{ 51 Code: 400, 52 Message: fmt.Errorf("failed to parse limit: %w", err).Error(), 53 } 54 } 55 56 err = bgs.slurper.SetNewPDSPerDayLimit(limit) 57 if err != nil { 58 return &echo.HTTPError{ 59 Code: 500, 60 Message: fmt.Errorf("failed to set new PDS per day rate limit: %w", err).Error(), 61 } 62 } 63 64 return nil 65} 66 67func (bgs *BGS) handleAdminTakeDownRepo(e echo.Context) error { 68 ctx := e.Request().Context() 69 70 var body map[string]string 71 if err := e.Bind(&body); err != nil { 72 return err 73 } 74 did, ok := body["did"] 75 if !ok { 76 return &echo.HTTPError{ 77 Code: 400, 78 Message: "must specify did parameter in body", 79 } 80 } 81 82 err := bgs.TakeDownRepo(ctx, did) 83 if err != nil { 84 if errors.Is(err, gorm.ErrRecordNotFound) { 85 return &echo.HTTPError{ 86 Code: http.StatusNotFound, 87 Message: "repo not found", 88 } 89 } 90 return &echo.HTTPError{ 91 Code: http.StatusInternalServerError, 92 Message: err.Error(), 93 } 94 } 95 return nil 96} 97 98func (bgs *BGS) handleAdminReverseTakedown(e echo.Context) error { 99 did := e.QueryParam("did") 100 ctx := e.Request().Context() 101 err := bgs.ReverseTakedown(ctx, did) 102 103 if err != nil { 104 if errors.Is(err, gorm.ErrRecordNotFound) { 105 return &echo.HTTPError{ 106 Code: http.StatusNotFound, 107 Message: "repo not found", 108 } 109 } 110 return &echo.HTTPError{ 111 Code: http.StatusInternalServerError, 112 Message: err.Error(), 113 } 114 } 115 116 return nil 117} 118 119type ListTakedownsResponse struct { 120 Dids []string `json:"dids"` 121 Cursor int64 `json:"cursor,omitempty"` 122} 123 124func (bgs *BGS) handleAdminListRepoTakeDowns(e echo.Context) error { 125 ctx := e.Request().Context() 126 haveMinId := false 127 minId := int64(-1) 128 qmin := e.QueryParam("cursor") 129 if qmin != "" { 130 tmin, err := strconv.ParseInt(qmin, 10, 64) 131 if err != nil { 132 return &echo.HTTPError{Code: 400, Message: "bad cursor"} 133 } 134 minId = tmin 135 haveMinId = true 136 } 137 limit := 1000 138 wat := bgs.db.Model(User{}).WithContext(ctx).Select("id", "did").Where("taken_down = TRUE") 139 if haveMinId { 140 wat = wat.Where("id > ?", minId) 141 } 142 //var users []User 143 rows, err := wat.Order("id").Limit(limit).Rows() 144 if err != nil { 145 return echo.NewHTTPError(http.StatusInternalServerError, "oops").WithInternal(err) 146 } 147 var out ListTakedownsResponse 148 for rows.Next() { 149 var id int64 150 var did string 151 err := rows.Scan(&id, &did) 152 if err != nil { 153 return echo.NewHTTPError(http.StatusInternalServerError, "oops").WithInternal(err) 154 } 155 out.Dids = append(out.Dids, did) 156 out.Cursor = id 157 } 158 if len(out.Dids) < limit { 159 out.Cursor = 0 160 } 161 return e.JSON(200, out) 162} 163 164func (bgs *BGS) handleAdminGetUpstreamConns(e echo.Context) error { 165 return e.JSON(200, bgs.slurper.GetActiveList()) 166} 167 168type rateLimit struct { 169 Max float64 `json:"Max"` 170 WindowSeconds float64 `json:"Window"` 171} 172 173type enrichedPDS struct { 174 models.PDS 175 HasActiveConnection bool `json:"HasActiveConnection"` 176 EventsSeenSinceStartup uint64 `json:"EventsSeenSinceStartup"` 177 PerSecondEventRate rateLimit `json:"PerSecondEventRate"` 178 PerHourEventRate rateLimit `json:"PerHourEventRate"` 179 PerDayEventRate rateLimit `json:"PerDayEventRate"` 180 CrawlRate rateLimit `json:"CrawlRate"` 181 UserCount int64 `json:"UserCount"` 182} 183 184type UserCount struct { 185 PDSID uint `gorm:"column:pds"` 186 UserCount int64 `gorm:"column:user_count"` 187} 188 189func (bgs *BGS) handleListPDSs(e echo.Context) error { 190 var pds []models.PDS 191 if err := bgs.db.Find(&pds).Error; err != nil { 192 return err 193 } 194 195 enrichedPDSs := make([]enrichedPDS, len(pds)) 196 197 activePDSHosts := bgs.slurper.GetActiveList() 198 199 for i, p := range pds { 200 enrichedPDSs[i].PDS = p 201 enrichedPDSs[i].HasActiveConnection = false 202 for _, host := range activePDSHosts { 203 if strings.ToLower(host) == strings.ToLower(p.Host) { 204 enrichedPDSs[i].HasActiveConnection = true 205 break 206 } 207 } 208 var m = &dto.Metric{} 209 if err := eventsReceivedCounter.WithLabelValues(p.Host).Write(m); err != nil { 210 enrichedPDSs[i].EventsSeenSinceStartup = 0 211 continue 212 } 213 enrichedPDSs[i].EventsSeenSinceStartup = uint64(m.Counter.GetValue()) 214 215 enrichedPDSs[i].PerSecondEventRate = rateLimit{ 216 Max: p.RateLimit, 217 WindowSeconds: 1, 218 } 219 220 enrichedPDSs[i].PerHourEventRate = rateLimit{ 221 Max: float64(p.HourlyEventLimit), 222 WindowSeconds: 3600, 223 } 224 225 enrichedPDSs[i].PerDayEventRate = rateLimit{ 226 Max: float64(p.DailyEventLimit), 227 WindowSeconds: 86400, 228 } 229 230 // Get the crawl rate limit for this PDS 231 crawlRate := rateLimit{ 232 Max: p.CrawlRateLimit, 233 WindowSeconds: 1, 234 } 235 236 enrichedPDSs[i].CrawlRate = crawlRate 237 } 238 239 return e.JSON(200, enrichedPDSs) 240} 241 242type consumer struct { 243 ID uint64 `json:"id"` 244 RemoteAddr string `json:"remote_addr"` 245 UserAgent string `json:"user_agent"` 246 EventsConsumed uint64 `json:"events_consumed"` 247 ConnectedAt time.Time `json:"connected_at"` 248} 249 250func (bgs *BGS) handleAdminListConsumers(e echo.Context) error { 251 bgs.consumersLk.RLock() 252 defer bgs.consumersLk.RUnlock() 253 254 consumers := make([]consumer, 0, len(bgs.consumers)) 255 for id, c := range bgs.consumers { 256 var m = &dto.Metric{} 257 if err := c.EventsSent.Write(m); err != nil { 258 continue 259 } 260 consumers = append(consumers, consumer{ 261 ID: id, 262 RemoteAddr: c.RemoteAddr, 263 UserAgent: c.UserAgent, 264 EventsConsumed: uint64(m.Counter.GetValue()), 265 ConnectedAt: c.ConnectedAt, 266 }) 267 } 268 269 return e.JSON(200, consumers) 270} 271 272func (bgs *BGS) handleAdminKillUpstreamConn(e echo.Context) error { 273 host := strings.TrimSpace(e.QueryParam("host")) 274 if host == "" { 275 return &echo.HTTPError{ 276 Code: 400, 277 Message: "must pass a valid host", 278 } 279 } 280 281 block := strings.ToLower(e.QueryParam("block")) == "true" 282 283 if err := bgs.slurper.KillUpstreamConnection(host, block); err != nil { 284 if errors.Is(err, ErrNoActiveConnection) { 285 return &echo.HTTPError{ 286 Code: 400, 287 Message: "no active connection to given host", 288 } 289 } 290 return err 291 } 292 293 return e.JSON(200, map[string]any{ 294 "success": "true", 295 }) 296} 297 298func (bgs *BGS) handleBlockPDS(e echo.Context) error { 299 host := strings.TrimSpace(e.QueryParam("host")) 300 if host == "" { 301 return &echo.HTTPError{ 302 Code: 400, 303 Message: "must pass a valid host", 304 } 305 } 306 307 // Set the block flag to true in the DB 308 if err := bgs.db.Model(&models.PDS{}).Where("host = ?", host).Update("blocked", true).Error; err != nil { 309 return err 310 } 311 312 // don't care if this errors, but we should try to disconnect something we just blocked 313 _ = bgs.slurper.KillUpstreamConnection(host, false) 314 315 return e.JSON(200, map[string]any{ 316 "success": "true", 317 }) 318} 319 320func (bgs *BGS) handleUnblockPDS(e echo.Context) error { 321 host := strings.TrimSpace(e.QueryParam("host")) 322 if host == "" { 323 return &echo.HTTPError{ 324 Code: 400, 325 Message: "must pass a valid host", 326 } 327 } 328 329 // Set the block flag to false in the DB 330 if err := bgs.db.Model(&models.PDS{}).Where("host = ?", host).Update("blocked", false).Error; err != nil { 331 return err 332 } 333 334 return e.JSON(200, map[string]any{ 335 "success": "true", 336 }) 337} 338 339type bannedDomains struct { 340 BannedDomains []string `json:"banned_domains"` 341} 342 343func (bgs *BGS) handleAdminListDomainBans(c echo.Context) error { 344 var all []models.DomainBan 345 if err := bgs.db.Find(&all).Error; err != nil { 346 return err 347 } 348 349 resp := bannedDomains{ 350 BannedDomains: []string{}, 351 } 352 for _, b := range all { 353 resp.BannedDomains = append(resp.BannedDomains, b.Domain) 354 } 355 356 return c.JSON(200, resp) 357} 358 359type banDomainBody struct { 360 Domain string 361} 362 363func (bgs *BGS) handleAdminBanDomain(c echo.Context) error { 364 var body banDomainBody 365 if err := c.Bind(&body); err != nil { 366 return err 367 } 368 369 // Check if the domain is already banned 370 var existing models.DomainBan 371 if err := bgs.db.Where("domain = ?", body.Domain).First(&existing).Error; err == nil { 372 return &echo.HTTPError{ 373 Code: 400, 374 Message: "domain is already banned", 375 } 376 } 377 378 if err := bgs.db.Create(&models.DomainBan{ 379 Domain: body.Domain, 380 }).Error; err != nil { 381 return err 382 } 383 384 return c.JSON(200, map[string]any{ 385 "success": "true", 386 }) 387} 388 389func (bgs *BGS) handleAdminUnbanDomain(c echo.Context) error { 390 var body banDomainBody 391 if err := c.Bind(&body); err != nil { 392 return err 393 } 394 395 if err := bgs.db.Where("domain = ?", body.Domain).Delete(&models.DomainBan{}).Error; err != nil { 396 return err 397 } 398 399 return c.JSON(200, map[string]any{ 400 "success": "true", 401 }) 402} 403 404type PDSRates struct { 405 PerSecond int64 `json:"per_second,omitempty"` 406 PerHour int64 `json:"per_hour,omitempty"` 407 PerDay int64 `json:"per_day,omitempty"` 408 CrawlRate int64 `json:"crawl_rate,omitempty"` 409 RepoLimit int64 `json:"repo_limit,omitempty"` 410} 411 412func (pr *PDSRates) FromSlurper(s *Slurper) { 413 if pr.PerSecond == 0 { 414 pr.PerHour = s.DefaultPerSecondLimit 415 } 416 if pr.PerHour == 0 { 417 pr.PerHour = s.DefaultPerHourLimit 418 } 419 if pr.PerDay == 0 { 420 pr.PerDay = s.DefaultPerDayLimit 421 } 422 if pr.CrawlRate == 0 { 423 pr.CrawlRate = int64(s.DefaultCrawlLimit) 424 } 425 if pr.RepoLimit == 0 { 426 pr.RepoLimit = s.DefaultRepoLimit 427 } 428} 429 430type RateLimitChangeRequest struct { 431 Host string `json:"host"` 432 PDSRates 433} 434 435func (bgs *BGS) handleAdminChangePDSRateLimits(e echo.Context) error { 436 var body RateLimitChangeRequest 437 if err := e.Bind(&body); err != nil { 438 return echo.NewHTTPError(http.StatusBadRequest, fmt.Sprintf("invalid body: %s", err)) 439 } 440 441 // Get the PDS from the DB 442 var pds models.PDS 443 if err := bgs.db.Where("host = ?", body.Host).First(&pds).Error; err != nil { 444 return err 445 } 446 447 // Update the rate limits in the DB 448 pds.RateLimit = float64(body.PerSecond) 449 pds.HourlyEventLimit = body.PerHour 450 pds.DailyEventLimit = body.PerDay 451 pds.CrawlRateLimit = float64(body.CrawlRate) 452 pds.RepoLimit = body.RepoLimit 453 454 if err := bgs.db.Save(&pds).Error; err != nil { 455 return echo.NewHTTPError(http.StatusInternalServerError, fmt.Errorf("failed to save rate limit changes: %w", err)) 456 } 457 458 // Update the rate limit in the limiter 459 limits := bgs.slurper.GetOrCreateLimiters(pds.ID, body.PerSecond, body.PerHour, body.PerDay) 460 limits.PerSecond.SetLimit(body.PerSecond) 461 limits.PerHour.SetLimit(body.PerHour) 462 limits.PerDay.SetLimit(body.PerDay) 463 464 // Set the crawl rate limit 465 bgs.repoFetcher.GetOrCreateLimiter(pds.ID, float64(body.CrawlRate)).SetLimit(rate.Limit(body.CrawlRate)) 466 467 return e.JSON(200, map[string]any{ 468 "success": "true", 469 }) 470} 471 472func (bgs *BGS) handleAdminCompactRepo(e echo.Context) error { 473 ctx, span := otel.Tracer("bgs").Start(context.Background(), "adminCompactRepo") 474 defer span.End() 475 476 did := e.QueryParam("did") 477 if did == "" { 478 return fmt.Errorf("must pass a did") 479 } 480 481 var fast bool 482 if strings.ToLower(e.QueryParam("fast")) == "true" { 483 fast = true 484 } 485 486 u, err := bgs.lookupUserByDid(ctx, did) 487 if err != nil { 488 return fmt.Errorf("no such user: %w", err) 489 } 490 491 stats, err := bgs.repoman.CarStore().CompactUserShards(ctx, u.ID, fast) 492 if err != nil { 493 return fmt.Errorf("compaction failed: %w", err) 494 } 495 496 return e.JSON(200, map[string]any{ 497 "success": "true", 498 "stats": stats, 499 }) 500} 501 502func (bgs *BGS) handleAdminCompactAllRepos(e echo.Context) error { 503 ctx, span := otel.Tracer("bgs").Start(context.Background(), "adminCompactAllRepos") 504 defer span.End() 505 506 var fast bool 507 if strings.ToLower(e.QueryParam("fast")) == "true" { 508 fast = true 509 } 510 511 lim := 50 512 if limstr := e.QueryParam("limit"); limstr != "" { 513 v, err := strconv.Atoi(limstr) 514 if err != nil { 515 return err 516 } 517 518 lim = v 519 } 520 521 shardThresh := 20 522 if threshstr := e.QueryParam("threshold"); threshstr != "" { 523 v, err := strconv.Atoi(threshstr) 524 if err != nil { 525 return err 526 } 527 528 shardThresh = v 529 } 530 531 err := bgs.compactor.EnqueueAllRepos(ctx, bgs, lim, shardThresh, fast) 532 if err != nil { 533 return echo.NewHTTPError(http.StatusInternalServerError, fmt.Errorf("failed to enqueue all repos: %w", err)) 534 } 535 536 return e.JSON(200, map[string]any{ 537 "success": "true", 538 }) 539} 540 541func (bgs *BGS) handleAdminPostResyncPDS(e echo.Context) error { 542 host := strings.TrimSpace(e.QueryParam("host")) 543 if host == "" { 544 return fmt.Errorf("must pass a host") 545 } 546 547 // Get the PDS from the DB 548 var pds models.PDS 549 if err := bgs.db.Where("host = ?", host).First(&pds).Error; err != nil { 550 return err 551 } 552 553 go func() { 554 ctx := context.Background() 555 err := bgs.ResyncPDS(ctx, pds) 556 if err != nil { 557 log.Error("failed to resync PDS", "err", err, "pds", pds.Host) 558 } 559 }() 560 561 return e.JSON(200, map[string]any{ 562 "message": "resync started...", 563 }) 564} 565 566func (bgs *BGS) handleAdminGetResyncPDS(e echo.Context) error { 567 host := strings.TrimSpace(e.QueryParam("host")) 568 if host == "" { 569 return fmt.Errorf("must pass a host") 570 } 571 572 // Get the PDS from the DB 573 var pds models.PDS 574 if err := bgs.db.Where("host = ?", host).First(&pds).Error; err != nil { 575 return err 576 } 577 578 resync, found := bgs.GetResync(pds) 579 if !found { 580 return &echo.HTTPError{ 581 Code: 404, 582 Message: "no resync found for given PDS", 583 } 584 } 585 586 return e.JSON(200, map[string]any{ 587 "resync": resync, 588 }) 589} 590 591func (bgs *BGS) handleAdminResetRepo(e echo.Context) error { 592 ctx := e.Request().Context() 593 594 did := e.QueryParam("did") 595 if did == "" { 596 return fmt.Errorf("must pass a did") 597 } 598 599 ai, err := bgs.Index.LookupUserByDid(ctx, did) 600 if err != nil { 601 return fmt.Errorf("no such user: %w", err) 602 } 603 604 if err := bgs.repoman.ResetRepo(ctx, ai.Uid); err != nil { 605 return err 606 } 607 608 if err := bgs.Index.Crawler.Crawl(ctx, ai); err != nil { 609 return err 610 } 611 612 return e.JSON(200, map[string]any{ 613 "success": true, 614 }) 615} 616 617func (bgs *BGS) handleAdminVerifyRepo(e echo.Context) error { 618 ctx := e.Request().Context() 619 620 did := e.QueryParam("did") 621 if did == "" { 622 return fmt.Errorf("must pass a did") 623 } 624 625 ai, err := bgs.Index.LookupUserByDid(ctx, did) 626 if err != nil { 627 return fmt.Errorf("no such user: %w", err) 628 } 629 630 if err := bgs.repoman.VerifyRepo(ctx, ai.Uid); err != nil { 631 return err 632 } 633 634 return e.JSON(200, map[string]any{ 635 "success": true, 636 }) 637} 638 639func (bgs *BGS) handleAdminAddTrustedDomain(e echo.Context) error { 640 domain := e.QueryParam("domain") 641 if domain == "" { 642 return fmt.Errorf("must specify domain in query parameter") 643 } 644 645 // Check if the domain is already trusted 646 trustedDomains := bgs.slurper.GetTrustedDomains() 647 if slices.Contains(trustedDomains, domain) { 648 return &echo.HTTPError{ 649 Code: 400, 650 Message: "domain is already trusted", 651 } 652 } 653 654 if err := bgs.slurper.AddTrustedDomain(domain); err != nil { 655 return err 656 } 657 658 return e.JSON(200, map[string]any{ 659 "success": true, 660 }) 661} 662 663type AdminRequestCrawlRequest struct { 664 Hostname string `json:"hostname"` 665 666 // optional: 667 PDSRates 668} 669 670func (bgs *BGS) handleAdminRequestCrawl(e echo.Context) error { 671 ctx := e.Request().Context() 672 673 var body AdminRequestCrawlRequest 674 if err := e.Bind(&body); err != nil { 675 return echo.NewHTTPError(http.StatusBadRequest, fmt.Sprintf("invalid body: %s", err)) 676 } 677 678 host := body.Hostname 679 if host == "" { 680 return echo.NewHTTPError(http.StatusBadRequest, "must pass hostname") 681 } 682 683 if !strings.HasPrefix(host, "http://") && !strings.HasPrefix(host, "https://") { 684 if bgs.ssl { 685 host = "https://" + host 686 } else { 687 host = "http://" + host 688 } 689 } 690 691 u, err := url.Parse(host) 692 if err != nil { 693 return echo.NewHTTPError(http.StatusBadRequest, "failed to parse hostname") 694 } 695 696 if u.Scheme == "http" && bgs.ssl { 697 return echo.NewHTTPError(http.StatusBadRequest, "this server requires https") 698 } 699 700 if u.Scheme == "https" && !bgs.ssl { 701 return echo.NewHTTPError(http.StatusBadRequest, "this server does not support https") 702 } 703 704 if u.Path != "" { 705 return echo.NewHTTPError(http.StatusBadRequest, "must pass hostname without path") 706 } 707 708 if u.Query().Encode() != "" { 709 return echo.NewHTTPError(http.StatusBadRequest, "must pass hostname without query") 710 } 711 712 host = u.Host // potentially hostname:port 713 714 banned, err := bgs.domainIsBanned(ctx, host) 715 if banned { 716 return echo.NewHTTPError(http.StatusUnauthorized, "domain is banned") 717 } 718 719 // Skip checking if the server is online for now 720 rateOverrides := body.PDSRates 721 rateOverrides.FromSlurper(bgs.slurper) 722 723 return bgs.slurper.SubscribeToPds(ctx, host, true, true, &rateOverrides) // Override Trusted Domain Check 724}