this repo has no description
1package bgs
2
3import (
4 "context"
5 "errors"
6 "fmt"
7 "net/http"
8 "net/url"
9 "slices"
10 "strconv"
11 "strings"
12 "time"
13
14 "github.com/bluesky-social/indigo/models"
15 "github.com/labstack/echo/v4"
16 dto "github.com/prometheus/client_model/go"
17 "go.opentelemetry.io/otel"
18 "golang.org/x/time/rate"
19 "gorm.io/gorm"
20)
21
22func (bgs *BGS) handleAdminSetSubsEnabled(e echo.Context) error {
23 enabled, err := strconv.ParseBool(e.QueryParam("enabled"))
24 if err != nil {
25 return &echo.HTTPError{
26 Code: 400,
27 Message: err.Error(),
28 }
29 }
30
31 return bgs.slurper.SetNewSubsDisabled(!enabled)
32}
33
34func (bgs *BGS) handleAdminGetSubsEnabled(e echo.Context) error {
35 return e.JSON(200, map[string]bool{
36 "enabled": !bgs.slurper.GetNewSubsDisabledState(),
37 })
38}
39
40func (bgs *BGS) handleAdminGetNewPDSPerDayRateLimit(e echo.Context) error {
41 limit := bgs.slurper.GetNewPDSPerDayLimit()
42 return e.JSON(200, map[string]int64{
43 "limit": limit,
44 })
45}
46
47func (bgs *BGS) handleAdminSetNewPDSPerDayRateLimit(e echo.Context) error {
48 limit, err := strconv.ParseInt(e.QueryParam("limit"), 10, 64)
49 if err != nil {
50 return &echo.HTTPError{
51 Code: 400,
52 Message: fmt.Errorf("failed to parse limit: %w", err).Error(),
53 }
54 }
55
56 err = bgs.slurper.SetNewPDSPerDayLimit(limit)
57 if err != nil {
58 return &echo.HTTPError{
59 Code: 500,
60 Message: fmt.Errorf("failed to set new PDS per day rate limit: %w", err).Error(),
61 }
62 }
63
64 return nil
65}
66
67func (bgs *BGS) handleAdminTakeDownRepo(e echo.Context) error {
68 ctx := e.Request().Context()
69
70 var body map[string]string
71 if err := e.Bind(&body); err != nil {
72 return err
73 }
74 did, ok := body["did"]
75 if !ok {
76 return &echo.HTTPError{
77 Code: 400,
78 Message: "must specify did parameter in body",
79 }
80 }
81
82 err := bgs.TakeDownRepo(ctx, did)
83 if err != nil {
84 if errors.Is(err, gorm.ErrRecordNotFound) {
85 return &echo.HTTPError{
86 Code: http.StatusNotFound,
87 Message: "repo not found",
88 }
89 }
90 return &echo.HTTPError{
91 Code: http.StatusInternalServerError,
92 Message: err.Error(),
93 }
94 }
95 return nil
96}
97
98func (bgs *BGS) handleAdminReverseTakedown(e echo.Context) error {
99 did := e.QueryParam("did")
100 ctx := e.Request().Context()
101 err := bgs.ReverseTakedown(ctx, did)
102
103 if err != nil {
104 if errors.Is(err, gorm.ErrRecordNotFound) {
105 return &echo.HTTPError{
106 Code: http.StatusNotFound,
107 Message: "repo not found",
108 }
109 }
110 return &echo.HTTPError{
111 Code: http.StatusInternalServerError,
112 Message: err.Error(),
113 }
114 }
115
116 return nil
117}
118
119type ListTakedownsResponse struct {
120 Dids []string `json:"dids"`
121 Cursor int64 `json:"cursor,omitempty"`
122}
123
124func (bgs *BGS) handleAdminListRepoTakeDowns(e echo.Context) error {
125 ctx := e.Request().Context()
126 haveMinId := false
127 minId := int64(-1)
128 qmin := e.QueryParam("cursor")
129 if qmin != "" {
130 tmin, err := strconv.ParseInt(qmin, 10, 64)
131 if err != nil {
132 return &echo.HTTPError{Code: 400, Message: "bad cursor"}
133 }
134 minId = tmin
135 haveMinId = true
136 }
137 limit := 1000
138 wat := bgs.db.Model(User{}).WithContext(ctx).Select("id", "did").Where("taken_down = TRUE")
139 if haveMinId {
140 wat = wat.Where("id > ?", minId)
141 }
142 //var users []User
143 rows, err := wat.Order("id").Limit(limit).Rows()
144 if err != nil {
145 return echo.NewHTTPError(http.StatusInternalServerError, "oops").WithInternal(err)
146 }
147 var out ListTakedownsResponse
148 for rows.Next() {
149 var id int64
150 var did string
151 err := rows.Scan(&id, &did)
152 if err != nil {
153 return echo.NewHTTPError(http.StatusInternalServerError, "oops").WithInternal(err)
154 }
155 out.Dids = append(out.Dids, did)
156 out.Cursor = id
157 }
158 if len(out.Dids) < limit {
159 out.Cursor = 0
160 }
161 return e.JSON(200, out)
162}
163
164func (bgs *BGS) handleAdminGetUpstreamConns(e echo.Context) error {
165 return e.JSON(200, bgs.slurper.GetActiveList())
166}
167
168type rateLimit struct {
169 Max float64 `json:"Max"`
170 WindowSeconds float64 `json:"Window"`
171}
172
173type enrichedPDS struct {
174 models.PDS
175 HasActiveConnection bool `json:"HasActiveConnection"`
176 EventsSeenSinceStartup uint64 `json:"EventsSeenSinceStartup"`
177 PerSecondEventRate rateLimit `json:"PerSecondEventRate"`
178 PerHourEventRate rateLimit `json:"PerHourEventRate"`
179 PerDayEventRate rateLimit `json:"PerDayEventRate"`
180 CrawlRate rateLimit `json:"CrawlRate"`
181 UserCount int64 `json:"UserCount"`
182}
183
184type UserCount struct {
185 PDSID uint `gorm:"column:pds"`
186 UserCount int64 `gorm:"column:user_count"`
187}
188
189func (bgs *BGS) handleListPDSs(e echo.Context) error {
190 var pds []models.PDS
191 if err := bgs.db.Find(&pds).Error; err != nil {
192 return err
193 }
194
195 enrichedPDSs := make([]enrichedPDS, len(pds))
196
197 activePDSHosts := bgs.slurper.GetActiveList()
198
199 for i, p := range pds {
200 enrichedPDSs[i].PDS = p
201 enrichedPDSs[i].HasActiveConnection = false
202 for _, host := range activePDSHosts {
203 if strings.ToLower(host) == strings.ToLower(p.Host) {
204 enrichedPDSs[i].HasActiveConnection = true
205 break
206 }
207 }
208 var m = &dto.Metric{}
209 if err := eventsReceivedCounter.WithLabelValues(p.Host).Write(m); err != nil {
210 enrichedPDSs[i].EventsSeenSinceStartup = 0
211 continue
212 }
213 enrichedPDSs[i].EventsSeenSinceStartup = uint64(m.Counter.GetValue())
214
215 enrichedPDSs[i].PerSecondEventRate = rateLimit{
216 Max: p.RateLimit,
217 WindowSeconds: 1,
218 }
219
220 enrichedPDSs[i].PerHourEventRate = rateLimit{
221 Max: float64(p.HourlyEventLimit),
222 WindowSeconds: 3600,
223 }
224
225 enrichedPDSs[i].PerDayEventRate = rateLimit{
226 Max: float64(p.DailyEventLimit),
227 WindowSeconds: 86400,
228 }
229
230 // Get the crawl rate limit for this PDS
231 crawlRate := rateLimit{
232 Max: p.CrawlRateLimit,
233 WindowSeconds: 1,
234 }
235
236 enrichedPDSs[i].CrawlRate = crawlRate
237 }
238
239 return e.JSON(200, enrichedPDSs)
240}
241
242type consumer struct {
243 ID uint64 `json:"id"`
244 RemoteAddr string `json:"remote_addr"`
245 UserAgent string `json:"user_agent"`
246 EventsConsumed uint64 `json:"events_consumed"`
247 ConnectedAt time.Time `json:"connected_at"`
248}
249
250func (bgs *BGS) handleAdminListConsumers(e echo.Context) error {
251 bgs.consumersLk.RLock()
252 defer bgs.consumersLk.RUnlock()
253
254 consumers := make([]consumer, 0, len(bgs.consumers))
255 for id, c := range bgs.consumers {
256 var m = &dto.Metric{}
257 if err := c.EventsSent.Write(m); err != nil {
258 continue
259 }
260 consumers = append(consumers, consumer{
261 ID: id,
262 RemoteAddr: c.RemoteAddr,
263 UserAgent: c.UserAgent,
264 EventsConsumed: uint64(m.Counter.GetValue()),
265 ConnectedAt: c.ConnectedAt,
266 })
267 }
268
269 return e.JSON(200, consumers)
270}
271
272func (bgs *BGS) handleAdminKillUpstreamConn(e echo.Context) error {
273 host := strings.TrimSpace(e.QueryParam("host"))
274 if host == "" {
275 return &echo.HTTPError{
276 Code: 400,
277 Message: "must pass a valid host",
278 }
279 }
280
281 block := strings.ToLower(e.QueryParam("block")) == "true"
282
283 if err := bgs.slurper.KillUpstreamConnection(host, block); err != nil {
284 if errors.Is(err, ErrNoActiveConnection) {
285 return &echo.HTTPError{
286 Code: 400,
287 Message: "no active connection to given host",
288 }
289 }
290 return err
291 }
292
293 return e.JSON(200, map[string]any{
294 "success": "true",
295 })
296}
297
298func (bgs *BGS) handleBlockPDS(e echo.Context) error {
299 host := strings.TrimSpace(e.QueryParam("host"))
300 if host == "" {
301 return &echo.HTTPError{
302 Code: 400,
303 Message: "must pass a valid host",
304 }
305 }
306
307 // Set the block flag to true in the DB
308 if err := bgs.db.Model(&models.PDS{}).Where("host = ?", host).Update("blocked", true).Error; err != nil {
309 return err
310 }
311
312 // don't care if this errors, but we should try to disconnect something we just blocked
313 _ = bgs.slurper.KillUpstreamConnection(host, false)
314
315 return e.JSON(200, map[string]any{
316 "success": "true",
317 })
318}
319
320func (bgs *BGS) handleUnblockPDS(e echo.Context) error {
321 host := strings.TrimSpace(e.QueryParam("host"))
322 if host == "" {
323 return &echo.HTTPError{
324 Code: 400,
325 Message: "must pass a valid host",
326 }
327 }
328
329 // Set the block flag to false in the DB
330 if err := bgs.db.Model(&models.PDS{}).Where("host = ?", host).Update("blocked", false).Error; err != nil {
331 return err
332 }
333
334 return e.JSON(200, map[string]any{
335 "success": "true",
336 })
337}
338
339type bannedDomains struct {
340 BannedDomains []string `json:"banned_domains"`
341}
342
343func (bgs *BGS) handleAdminListDomainBans(c echo.Context) error {
344 var all []models.DomainBan
345 if err := bgs.db.Find(&all).Error; err != nil {
346 return err
347 }
348
349 resp := bannedDomains{
350 BannedDomains: []string{},
351 }
352 for _, b := range all {
353 resp.BannedDomains = append(resp.BannedDomains, b.Domain)
354 }
355
356 return c.JSON(200, resp)
357}
358
359type banDomainBody struct {
360 Domain string
361}
362
363func (bgs *BGS) handleAdminBanDomain(c echo.Context) error {
364 var body banDomainBody
365 if err := c.Bind(&body); err != nil {
366 return err
367 }
368
369 // Check if the domain is already banned
370 var existing models.DomainBan
371 if err := bgs.db.Where("domain = ?", body.Domain).First(&existing).Error; err == nil {
372 return &echo.HTTPError{
373 Code: 400,
374 Message: "domain is already banned",
375 }
376 }
377
378 if err := bgs.db.Create(&models.DomainBan{
379 Domain: body.Domain,
380 }).Error; err != nil {
381 return err
382 }
383
384 return c.JSON(200, map[string]any{
385 "success": "true",
386 })
387}
388
389func (bgs *BGS) handleAdminUnbanDomain(c echo.Context) error {
390 var body banDomainBody
391 if err := c.Bind(&body); err != nil {
392 return err
393 }
394
395 if err := bgs.db.Where("domain = ?", body.Domain).Delete(&models.DomainBan{}).Error; err != nil {
396 return err
397 }
398
399 return c.JSON(200, map[string]any{
400 "success": "true",
401 })
402}
403
404type PDSRates struct {
405 PerSecond int64 `json:"per_second,omitempty"`
406 PerHour int64 `json:"per_hour,omitempty"`
407 PerDay int64 `json:"per_day,omitempty"`
408 CrawlRate int64 `json:"crawl_rate,omitempty"`
409 RepoLimit int64 `json:"repo_limit,omitempty"`
410}
411
412func (pr *PDSRates) FromSlurper(s *Slurper) {
413 if pr.PerSecond == 0 {
414 pr.PerHour = s.DefaultPerSecondLimit
415 }
416 if pr.PerHour == 0 {
417 pr.PerHour = s.DefaultPerHourLimit
418 }
419 if pr.PerDay == 0 {
420 pr.PerDay = s.DefaultPerDayLimit
421 }
422 if pr.CrawlRate == 0 {
423 pr.CrawlRate = int64(s.DefaultCrawlLimit)
424 }
425 if pr.RepoLimit == 0 {
426 pr.RepoLimit = s.DefaultRepoLimit
427 }
428}
429
430type RateLimitChangeRequest struct {
431 Host string `json:"host"`
432 PDSRates
433}
434
435func (bgs *BGS) handleAdminChangePDSRateLimits(e echo.Context) error {
436 var body RateLimitChangeRequest
437 if err := e.Bind(&body); err != nil {
438 return echo.NewHTTPError(http.StatusBadRequest, fmt.Sprintf("invalid body: %s", err))
439 }
440
441 // Get the PDS from the DB
442 var pds models.PDS
443 if err := bgs.db.Where("host = ?", body.Host).First(&pds).Error; err != nil {
444 return err
445 }
446
447 // Update the rate limits in the DB
448 pds.RateLimit = float64(body.PerSecond)
449 pds.HourlyEventLimit = body.PerHour
450 pds.DailyEventLimit = body.PerDay
451 pds.CrawlRateLimit = float64(body.CrawlRate)
452 pds.RepoLimit = body.RepoLimit
453
454 if err := bgs.db.Save(&pds).Error; err != nil {
455 return echo.NewHTTPError(http.StatusInternalServerError, fmt.Errorf("failed to save rate limit changes: %w", err))
456 }
457
458 // Update the rate limit in the limiter
459 limits := bgs.slurper.GetOrCreateLimiters(pds.ID, body.PerSecond, body.PerHour, body.PerDay)
460 limits.PerSecond.SetLimit(body.PerSecond)
461 limits.PerHour.SetLimit(body.PerHour)
462 limits.PerDay.SetLimit(body.PerDay)
463
464 // Set the crawl rate limit
465 bgs.repoFetcher.GetOrCreateLimiter(pds.ID, float64(body.CrawlRate)).SetLimit(rate.Limit(body.CrawlRate))
466
467 return e.JSON(200, map[string]any{
468 "success": "true",
469 })
470}
471
472func (bgs *BGS) handleAdminCompactRepo(e echo.Context) error {
473 ctx, span := otel.Tracer("bgs").Start(context.Background(), "adminCompactRepo")
474 defer span.End()
475
476 did := e.QueryParam("did")
477 if did == "" {
478 return fmt.Errorf("must pass a did")
479 }
480
481 var fast bool
482 if strings.ToLower(e.QueryParam("fast")) == "true" {
483 fast = true
484 }
485
486 u, err := bgs.lookupUserByDid(ctx, did)
487 if err != nil {
488 return fmt.Errorf("no such user: %w", err)
489 }
490
491 stats, err := bgs.repoman.CarStore().CompactUserShards(ctx, u.ID, fast)
492 if err != nil {
493 return fmt.Errorf("compaction failed: %w", err)
494 }
495
496 return e.JSON(200, map[string]any{
497 "success": "true",
498 "stats": stats,
499 })
500}
501
502func (bgs *BGS) handleAdminCompactAllRepos(e echo.Context) error {
503 ctx, span := otel.Tracer("bgs").Start(context.Background(), "adminCompactAllRepos")
504 defer span.End()
505
506 var fast bool
507 if strings.ToLower(e.QueryParam("fast")) == "true" {
508 fast = true
509 }
510
511 lim := 50
512 if limstr := e.QueryParam("limit"); limstr != "" {
513 v, err := strconv.Atoi(limstr)
514 if err != nil {
515 return err
516 }
517
518 lim = v
519 }
520
521 shardThresh := 20
522 if threshstr := e.QueryParam("threshold"); threshstr != "" {
523 v, err := strconv.Atoi(threshstr)
524 if err != nil {
525 return err
526 }
527
528 shardThresh = v
529 }
530
531 err := bgs.compactor.EnqueueAllRepos(ctx, bgs, lim, shardThresh, fast)
532 if err != nil {
533 return echo.NewHTTPError(http.StatusInternalServerError, fmt.Errorf("failed to enqueue all repos: %w", err))
534 }
535
536 return e.JSON(200, map[string]any{
537 "success": "true",
538 })
539}
540
541func (bgs *BGS) handleAdminPostResyncPDS(e echo.Context) error {
542 host := strings.TrimSpace(e.QueryParam("host"))
543 if host == "" {
544 return fmt.Errorf("must pass a host")
545 }
546
547 // Get the PDS from the DB
548 var pds models.PDS
549 if err := bgs.db.Where("host = ?", host).First(&pds).Error; err != nil {
550 return err
551 }
552
553 go func() {
554 ctx := context.Background()
555 err := bgs.ResyncPDS(ctx, pds)
556 if err != nil {
557 log.Error("failed to resync PDS", "err", err, "pds", pds.Host)
558 }
559 }()
560
561 return e.JSON(200, map[string]any{
562 "message": "resync started...",
563 })
564}
565
566func (bgs *BGS) handleAdminGetResyncPDS(e echo.Context) error {
567 host := strings.TrimSpace(e.QueryParam("host"))
568 if host == "" {
569 return fmt.Errorf("must pass a host")
570 }
571
572 // Get the PDS from the DB
573 var pds models.PDS
574 if err := bgs.db.Where("host = ?", host).First(&pds).Error; err != nil {
575 return err
576 }
577
578 resync, found := bgs.GetResync(pds)
579 if !found {
580 return &echo.HTTPError{
581 Code: 404,
582 Message: "no resync found for given PDS",
583 }
584 }
585
586 return e.JSON(200, map[string]any{
587 "resync": resync,
588 })
589}
590
591func (bgs *BGS) handleAdminResetRepo(e echo.Context) error {
592 ctx := e.Request().Context()
593
594 did := e.QueryParam("did")
595 if did == "" {
596 return fmt.Errorf("must pass a did")
597 }
598
599 ai, err := bgs.Index.LookupUserByDid(ctx, did)
600 if err != nil {
601 return fmt.Errorf("no such user: %w", err)
602 }
603
604 if err := bgs.repoman.ResetRepo(ctx, ai.Uid); err != nil {
605 return err
606 }
607
608 if err := bgs.Index.Crawler.Crawl(ctx, ai); err != nil {
609 return err
610 }
611
612 return e.JSON(200, map[string]any{
613 "success": true,
614 })
615}
616
617func (bgs *BGS) handleAdminVerifyRepo(e echo.Context) error {
618 ctx := e.Request().Context()
619
620 did := e.QueryParam("did")
621 if did == "" {
622 return fmt.Errorf("must pass a did")
623 }
624
625 ai, err := bgs.Index.LookupUserByDid(ctx, did)
626 if err != nil {
627 return fmt.Errorf("no such user: %w", err)
628 }
629
630 if err := bgs.repoman.VerifyRepo(ctx, ai.Uid); err != nil {
631 return err
632 }
633
634 return e.JSON(200, map[string]any{
635 "success": true,
636 })
637}
638
639func (bgs *BGS) handleAdminAddTrustedDomain(e echo.Context) error {
640 domain := e.QueryParam("domain")
641 if domain == "" {
642 return fmt.Errorf("must specify domain in query parameter")
643 }
644
645 // Check if the domain is already trusted
646 trustedDomains := bgs.slurper.GetTrustedDomains()
647 if slices.Contains(trustedDomains, domain) {
648 return &echo.HTTPError{
649 Code: 400,
650 Message: "domain is already trusted",
651 }
652 }
653
654 if err := bgs.slurper.AddTrustedDomain(domain); err != nil {
655 return err
656 }
657
658 return e.JSON(200, map[string]any{
659 "success": true,
660 })
661}
662
663type AdminRequestCrawlRequest struct {
664 Hostname string `json:"hostname"`
665
666 // optional:
667 PDSRates
668}
669
670func (bgs *BGS) handleAdminRequestCrawl(e echo.Context) error {
671 ctx := e.Request().Context()
672
673 var body AdminRequestCrawlRequest
674 if err := e.Bind(&body); err != nil {
675 return echo.NewHTTPError(http.StatusBadRequest, fmt.Sprintf("invalid body: %s", err))
676 }
677
678 host := body.Hostname
679 if host == "" {
680 return echo.NewHTTPError(http.StatusBadRequest, "must pass hostname")
681 }
682
683 if !strings.HasPrefix(host, "http://") && !strings.HasPrefix(host, "https://") {
684 if bgs.ssl {
685 host = "https://" + host
686 } else {
687 host = "http://" + host
688 }
689 }
690
691 u, err := url.Parse(host)
692 if err != nil {
693 return echo.NewHTTPError(http.StatusBadRequest, "failed to parse hostname")
694 }
695
696 if u.Scheme == "http" && bgs.ssl {
697 return echo.NewHTTPError(http.StatusBadRequest, "this server requires https")
698 }
699
700 if u.Scheme == "https" && !bgs.ssl {
701 return echo.NewHTTPError(http.StatusBadRequest, "this server does not support https")
702 }
703
704 if u.Path != "" {
705 return echo.NewHTTPError(http.StatusBadRequest, "must pass hostname without path")
706 }
707
708 if u.Query().Encode() != "" {
709 return echo.NewHTTPError(http.StatusBadRequest, "must pass hostname without query")
710 }
711
712 host = u.Host // potentially hostname:port
713
714 banned, err := bgs.domainIsBanned(ctx, host)
715 if banned {
716 return echo.NewHTTPError(http.StatusUnauthorized, "domain is banned")
717 }
718
719 // Skip checking if the server is online for now
720 rateOverrides := body.PDSRates
721 rateOverrides.FromSlurper(bgs.slurper)
722
723 return bgs.slurper.SubscribeToPds(ctx, host, true, true, &rateOverrides) // Override Trusted Domain Check
724}