porting all github actions from bluesky-social/indigo to tangled CI
at ci 18 kB view raw
1package pds 2 3import ( 4 "context" 5 "database/sql" 6 "errors" 7 "fmt" 8 "log/slog" 9 "net" 10 "net/http" 11 "net/mail" 12 "net/url" 13 "strings" 14 "time" 15 16 "github.com/bluesky-social/indigo/api/atproto" 17 comatproto "github.com/bluesky-social/indigo/api/atproto" 18 "github.com/bluesky-social/indigo/carstore" 19 "github.com/bluesky-social/indigo/events" 20 "github.com/bluesky-social/indigo/indexer" 21 lexutil "github.com/bluesky-social/indigo/lex/util" 22 "github.com/bluesky-social/indigo/models" 23 pdsdata "github.com/bluesky-social/indigo/pds/data" 24 "github.com/bluesky-social/indigo/plc" 25 "github.com/bluesky-social/indigo/repomgr" 26 "github.com/bluesky-social/indigo/util" 27 "github.com/bluesky-social/indigo/xrpc" 28 gojwt "github.com/golang-jwt/jwt" 29 "github.com/gorilla/websocket" 30 "github.com/labstack/echo/v4" 31 "github.com/labstack/echo/v4/middleware" 32 "github.com/lestrrat-go/jwx/v2/jwt" 33 "github.com/whyrusleeping/go-did" 34 "gorm.io/gorm" 35) 36 37type Server struct { 38 db *gorm.DB 39 cs carstore.CarStore 40 repoman *repomgr.RepoManager 41 indexer *indexer.Indexer 42 events *events.EventManager 43 signingKey *did.PrivKey 44 echo *echo.Echo 45 jwtSigningKey []byte 46 enforcePeering bool 47 48 handleSuffix string 49 serviceUrl string 50 51 plc plc.PLCClient 52 53 log *slog.Logger 54} 55 56// serverListenerBootTimeout is how long to wait for the requested server socket 57// to become available for use. This is an arbitrary timeout that should be safe 58// on any platform, but there's no great way to weave this timeout without 59// adding another parameter to the (at time of writing) long signature of 60// NewServer. 61const serverListenerBootTimeout = 5 * time.Second 62 63func NewServer(db *gorm.DB, cs carstore.CarStore, serkey *did.PrivKey, handleSuffix, serviceUrl string, didr plc.PLCClient, jwtkey []byte) (*Server, error) { 64 db.AutoMigrate(&User{}) 65 db.AutoMigrate(&Peering{}) 66 67 evtman := events.NewEventManager(events.NewMemPersister()) 68 69 kmgr := indexer.NewKeyManager(didr, serkey) 70 71 repoman := repomgr.NewRepoManager(cs, kmgr) 72 73 rf := indexer.NewRepoFetcher(db, repoman, 10) 74 75 ix, err := indexer.NewIndexer(db, evtman, didr, rf, false) 76 if err != nil { 77 return nil, err 78 } 79 80 s := &Server{ 81 signingKey: serkey, 82 db: db, 83 cs: cs, 84 indexer: ix, 85 plc: didr, 86 events: evtman, 87 repoman: repoman, 88 handleSuffix: handleSuffix, 89 serviceUrl: serviceUrl, 90 jwtSigningKey: jwtkey, 91 enforcePeering: false, 92 93 log: slog.Default().With("system", "pds"), 94 } 95 96 repoman.SetEventHandler(func(ctx context.Context, evt *repomgr.RepoEvent) { 97 if err := ix.HandleRepoEvent(ctx, evt); err != nil { 98 s.log.Error("handle repo event failed", "user", evt.User, "err", err) 99 } 100 }, true) 101 102 //ix.SendRemoteFollow = s.sendRemoteFollow 103 ix.CreateExternalUser = s.createExternalUser 104 105 return s, nil 106} 107 108func (s *Server) Shutdown(ctx context.Context) error { 109 return s.echo.Shutdown(ctx) 110} 111 112func (s *Server) createExternalUser(ctx context.Context, did string) (*models.ActorInfo, error) { 113 doc, err := s.plc.GetDocument(ctx, did) 114 if err != nil { 115 return nil, fmt.Errorf("could not locate DID document for followed user: %s", err) 116 } 117 118 if len(doc.Service) == 0 { 119 return nil, fmt.Errorf("external followed user %s had no services in did document", did) 120 } 121 122 svc := doc.Service[0] 123 durl, err := url.Parse(svc.ServiceEndpoint) 124 if err != nil { 125 return nil, err 126 } 127 128 // TODO: the PDS's DID should also be in the service, we could use that to look up? 129 var peering Peering 130 if err := s.db.Find(&peering, "host = ?", durl.Host).Error; err != nil { 131 return nil, err 132 } 133 134 c := &xrpc.Client{Host: svc.ServiceEndpoint} 135 136 if peering.ID == 0 { 137 cfg, err := atproto.ServerDescribeServer(ctx, c) 138 if err != nil { 139 // TODO: failing this should not halt our indexing 140 return nil, fmt.Errorf("failed to check unrecognized pds: %w", err) 141 } 142 143 // since handles can be anything, checking against this list does not matter... 144 _ = cfg 145 146 // TODO: could check other things, a valid response is good enough for now 147 peering.Host = svc.ServiceEndpoint 148 149 if err := s.db.Create(&peering).Error; err != nil { 150 return nil, err 151 } 152 } 153 154 var handle string 155 if len(doc.AlsoKnownAs) > 0 { 156 hurl, err := url.Parse(doc.AlsoKnownAs[0]) 157 if err != nil { 158 return nil, err 159 } 160 161 handle = hurl.Host 162 } 163 164 // TODO: request this users info from their server to fill out our data... 165 u := User{ 166 Handle: handle, 167 Did: did, 168 PDS: peering.ID, 169 } 170 171 if err := s.db.Create(&u).Error; err != nil { 172 return nil, fmt.Errorf("failed to create other pds user: %w", err) 173 } 174 175 // okay cool, its a user on a server we are peered with 176 // lets make a local record of that user for the future 177 subj := &models.ActorInfo{ 178 Uid: u.ID, 179 Handle: sql.NullString{String: handle, Valid: true}, 180 DisplayName: "missing display name", 181 Did: did, 182 Type: "", 183 PDS: peering.ID, 184 } 185 if err := s.db.Create(subj).Error; err != nil { 186 return nil, err 187 } 188 189 return subj, nil 190} 191 192func (s *Server) RunAPI(addr string) error { 193 var lc net.ListenConfig 194 ctx, cancel := context.WithTimeout(context.Background(), serverListenerBootTimeout) 195 defer cancel() 196 197 li, err := lc.Listen(ctx, "tcp", addr) 198 if err != nil { 199 return err 200 } 201 return s.RunAPIWithListener(li) 202} 203 204func (s *Server) RunAPIWithListener(listen net.Listener) error { 205 e := echo.New() 206 s.echo = e 207 e.HideBanner = true 208 e.Use(middleware.LoggerWithConfig(middleware.LoggerConfig{ 209 Format: "method=${method}, uri=${uri}, status=${status} latency=${latency_human}\n", 210 })) 211 212 cfg := middleware.JWTConfig{ 213 Skipper: func(c echo.Context) bool { 214 switch c.Path() { 215 case "/xrpc/_health": 216 return true 217 case "/xrpc/com.atproto.sync.subscribeRepos": 218 return true 219 case "/xrpc/com.atproto.account.create": 220 return true 221 case "/xrpc/com.atproto.identity.resolveHandle": 222 return true 223 case "/xrpc/com.atproto.server.createAccount": 224 return true 225 case "/xrpc/com.atproto.server.createSession": 226 return true 227 case "/xrpc/com.atproto.server.describeServer": 228 return true 229 case "/xrpc/com.atproto.sync.getRepo": 230 fmt.Println("TODO: currently not requiring auth on get repo endpoint") 231 return true 232 case "/xrpc/com.atproto.peering.follow", "/events": 233 auth := c.Request().Header.Get("Authorization") 234 235 did := c.Request().Header.Get("DID") 236 ctx := c.Request().Context() 237 ctx = context.WithValue(ctx, "did", did) 238 ctx = context.WithValue(ctx, "auth", auth) 239 c.SetRequest(c.Request().WithContext(ctx)) 240 return true 241 case "/.well-known/atproto-did": 242 return true 243 case "/takedownRepo": 244 return true 245 case "/suspendRepo": 246 return true 247 case "/deactivateRepo": 248 return true 249 case "/reactivateRepo": 250 return true 251 default: 252 return false 253 } 254 }, 255 SigningKey: s.jwtSigningKey, 256 } 257 258 e.HTTPErrorHandler = func(err error, ctx echo.Context) { 259 fmt.Printf("PDS HANDLER ERROR: (%s) %s\n", ctx.Path(), err) 260 261 // TODO: need to properly figure out where http error codes for error 262 // types get decided. This spot is reasonable, but maybe a bit weird. 263 // reviewers, please advise 264 if errors.Is(err, ErrNoSuchUser) { 265 ctx.Response().WriteHeader(404) 266 return 267 } 268 269 ctx.Response().WriteHeader(500) 270 } 271 272 e.GET("/takedownRepo", func(c echo.Context) error { 273 ctx := c.Request().Context() 274 did := c.QueryParam("did") 275 if did == "" { 276 return fmt.Errorf("missing did") 277 } 278 279 if err := s.TakedownRepo(ctx, did); err != nil { 280 return err 281 } 282 283 return c.String(200, "ok") 284 }) 285 286 e.GET("/suspendRepo", func(c echo.Context) error { 287 ctx := c.Request().Context() 288 did := c.QueryParam("did") 289 if did == "" { 290 return fmt.Errorf("missing did") 291 } 292 293 if err := s.SuspendRepo(ctx, did); err != nil { 294 return err 295 } 296 297 return c.String(200, "ok") 298 }) 299 300 e.GET("/deactivateRepo", func(c echo.Context) error { 301 ctx := c.Request().Context() 302 did := c.QueryParam("did") 303 if did == "" { 304 return fmt.Errorf("missing did") 305 } 306 307 if err := s.DeactivateRepo(ctx, did); err != nil { 308 return err 309 } 310 311 return c.String(200, "ok") 312 }) 313 314 e.GET("/reactivateRepo", func(c echo.Context) error { 315 ctx := c.Request().Context() 316 did := c.QueryParam("did") 317 if did == "" { 318 return fmt.Errorf("missing did") 319 } 320 321 if err := s.ReactivateRepo(ctx, did); err != nil { 322 return err 323 } 324 325 return c.String(200, "ok") 326 }) 327 328 e.Use(middleware.JWTWithConfig(cfg), s.userCheckMiddleware) 329 s.RegisterHandlersComAtproto(e) 330 331 e.GET("/xrpc/com.atproto.sync.subscribeRepos", s.EventsHandler) 332 e.GET("/xrpc/_health", s.HandleHealthCheck) 333 e.GET("/.well-known/atproto-did", s.HandleResolveDid) 334 335 // In order to support booting on random ports in tests, we need to tell the 336 // Echo instance it's already got a port, and then use its StartServer 337 // method to re-use that listener. 338 e.Listener = listen 339 srv := &http.Server{} 340 return e.StartServer(srv) 341} 342 343type HealthStatus struct { 344 Status string `json:"status"` 345 Message string `json:"msg,omitempty"` 346} 347 348func (s *Server) HandleHealthCheck(c echo.Context) error { 349 if err := s.db.Exec("SELECT 1").Error; err != nil { 350 s.log.Error("healthcheck can't connect to database", "err", err) 351 return c.JSON(500, HealthStatus{Status: "error", Message: "can't connect to database"}) 352 } else { 353 return c.JSON(200, HealthStatus{Status: "ok"}) 354 } 355} 356 357func (s *Server) HandleResolveDid(c echo.Context) error { 358 ctx := c.Request().Context() 359 360 handle := c.Request().Host 361 if hh := c.Request().Header.Get("Host"); hh != "" { 362 handle = hh 363 } 364 365 u, err := s.lookupUserByHandle(ctx, handle) 366 if err != nil { 367 return fmt.Errorf("resolving %q: %w", handle, err) 368 } 369 370 return c.String(200, u.Did) 371} 372 373type User = pdsdata.User 374 375type RefreshToken struct { 376 gorm.Model 377 Token string 378} 379 380func toTime(i interface{}) (time.Time, error) { 381 ival, ok := i.(float64) 382 if !ok { 383 return time.Time{}, fmt.Errorf("invalid type for timestamp: %T", i) 384 } 385 386 return time.Unix(int64(ival), 0), nil 387} 388 389func (s *Server) checkTokenValidity(user *gojwt.Token) (string, string, error) { 390 claims, ok := user.Claims.(gojwt.MapClaims) 391 if !ok { 392 return "", "", fmt.Errorf("invalid token claims map") 393 } 394 395 iat, ok := claims["iat"] 396 if !ok { 397 return "", "", fmt.Errorf("iat not set") 398 } 399 400 tiat, err := toTime(iat) 401 if err != nil { 402 return "", "", err 403 } 404 405 if tiat.After(time.Now()) { 406 return "", "", fmt.Errorf("iat cannot be in the future") 407 } 408 409 exp, ok := claims["exp"] 410 if !ok { 411 return "", "", fmt.Errorf("exp not set") 412 } 413 414 texp, err := toTime(exp) 415 if err != nil { 416 return "", "", err 417 } 418 419 if texp.Before(time.Now()) { 420 return "", "", fmt.Errorf("token expired") 421 } 422 423 did, ok := claims["sub"] 424 if !ok { 425 return "", "", fmt.Errorf("expected user did in subject") 426 } 427 428 didstr, ok := did.(string) 429 if !ok { 430 return "", "", fmt.Errorf("expected subject to be a string") 431 } 432 433 scope, ok := claims["scope"] 434 if !ok { 435 return "", "", fmt.Errorf("expected scope to be set") 436 } 437 438 scopestr, ok := scope.(string) 439 if !ok { 440 return "", "", fmt.Errorf("expected scope to be a string") 441 } 442 443 return scopestr, didstr, nil 444} 445 446func (s *Server) lookupUser(ctx context.Context, didorhandle string) (*User, error) { 447 if strings.HasPrefix(didorhandle, "did:") { 448 return s.lookupUserByDid(ctx, didorhandle) 449 } 450 451 return s.lookupUserByHandle(ctx, didorhandle) 452} 453 454func (s *Server) lookupUserByDid(ctx context.Context, did string) (*User, error) { 455 var u User 456 if err := s.db.First(&u, "did = ?", did).Error; err != nil { 457 return nil, err 458 } 459 460 return &u, nil 461} 462 463var ErrNoSuchUser = fmt.Errorf("no such user") 464 465func (s *Server) lookupUserByHandle(ctx context.Context, handle string) (*User, error) { 466 var u User 467 if err := s.db.Find(&u, "handle = ?", handle).Error; err != nil { 468 return nil, err 469 } 470 if u.ID == 0 { 471 return nil, ErrNoSuchUser 472 } 473 474 return &u, nil 475} 476 477func (s *Server) userCheckMiddleware(next echo.HandlerFunc) echo.HandlerFunc { 478 return func(c echo.Context) error { 479 ctx := c.Request().Context() 480 481 user, ok := c.Get("user").(*gojwt.Token) 482 if !ok { 483 return next(c) 484 } 485 ctx = context.WithValue(ctx, "token", user) 486 487 scope, did, err := s.checkTokenValidity(user) 488 if err != nil { 489 return fmt.Errorf("invalid token: %w", err) 490 } 491 492 u, err := s.lookupUser(ctx, did) 493 if err != nil { 494 return err 495 } 496 497 ctx = context.WithValue(ctx, "authScope", scope) 498 ctx = context.WithValue(ctx, "user", u) 499 ctx = context.WithValue(ctx, "did", did) 500 501 c.SetRequest(c.Request().WithContext(ctx)) 502 return next(c) 503 } 504} 505 506func (s *Server) getUser(ctx context.Context) (*User, error) { 507 u, ok := ctx.Value("user").(*User) 508 if !ok { 509 return nil, fmt.Errorf("auth required") 510 } 511 512 //u.Did = ctx.Value("did").(string) 513 514 return u, nil 515} 516 517func validateEmail(email string) error { 518 _, err := mail.ParseAddress(email) 519 if err != nil { 520 return err 521 } 522 523 return nil 524} 525 526func (s *Server) validateHandle(handle string) error { 527 if !strings.HasSuffix(handle, s.handleSuffix) { 528 return fmt.Errorf("invalid handle") 529 } 530 531 if strings.Contains(strings.TrimSuffix(handle, s.handleSuffix), ".") { 532 return fmt.Errorf("invalid handle") 533 } 534 535 return nil 536} 537 538func (s *Server) invalidateToken(ctx context.Context, u *User, tok *jwt.Token) error { 539 panic("nyi") 540} 541 542type Peering = pdsdata.Peering 543 544func (s *Server) EventsHandler(c echo.Context) error { 545 conn, err := websocket.Upgrade(c.Response().Writer, c.Request(), c.Response().Header(), 1<<10, 1<<10) 546 if err != nil { 547 return err 548 } 549 550 var peering *Peering 551 if s.enforcePeering { 552 did := c.Request().Header.Get("DID") 553 if did != "" { 554 if err := s.db.First(peering, "did = ?", did).Error; err != nil { 555 return err 556 } 557 } 558 } 559 560 ctx := c.Request().Context() 561 562 ident := c.RealIP() + "-" + c.Request().UserAgent() 563 564 evts, cancel, err := s.events.Subscribe(ctx, ident, func(evt *events.XRPCStreamEvent) bool { 565 if !s.enforcePeering { 566 return true 567 } 568 if peering.ID == 0 { 569 return true 570 } 571 572 for _, pid := range evt.PrivRelevantPds { 573 if pid == peering.ID { 574 return true 575 } 576 } 577 578 return false 579 }, nil) 580 if err != nil { 581 return err 582 } 583 defer cancel() 584 585 header := events.EventHeader{Op: events.EvtKindMessage} 586 for evt := range evts { 587 wc, err := conn.NextWriter(websocket.BinaryMessage) 588 if err != nil { 589 return err 590 } 591 592 var obj lexutil.CBOR 593 594 switch { 595 case evt.Error != nil: 596 header.Op = events.EvtKindErrorFrame 597 obj = evt.Error 598 case evt.RepoCommit != nil: 599 header.MsgType = "#commit" 600 obj = evt.RepoCommit 601 case evt.RepoSync != nil: 602 header.MsgType = "#sync" 603 obj = evt.RepoSync 604 case evt.RepoIdentity != nil: 605 header.MsgType = "#identity" 606 obj = evt.RepoIdentity 607 case evt.RepoAccount != nil: 608 header.MsgType = "#account" 609 obj = evt.RepoAccount 610 case evt.RepoInfo != nil: 611 header.MsgType = "#info" 612 obj = evt.RepoInfo 613 default: 614 return fmt.Errorf("unrecognized event kind") 615 } 616 617 if err := header.MarshalCBOR(wc); err != nil { 618 return fmt.Errorf("failed to write header: %w", err) 619 } 620 621 if err := obj.MarshalCBOR(wc); err != nil { 622 return fmt.Errorf("failed to write event: %w", err) 623 } 624 625 if err := wc.Close(); err != nil { 626 return fmt.Errorf("failed to flush-close our event write: %w", err) 627 } 628 } 629 630 return nil 631} 632 633func (s *Server) UpdateUserHandle(ctx context.Context, u *User, handle string) error { 634 if u.Handle == handle { 635 // no change? move on 636 s.log.Warn("attempted to change handle to current handle", "did", u.Did, "handle", handle) 637 return nil 638 } 639 640 _, err := s.indexer.LookupUserByHandle(ctx, handle) 641 if err == nil { 642 return fmt.Errorf("handle %q is already in use", handle) 643 } 644 645 if err := s.plc.UpdateUserHandle(ctx, u.Did, handle); err != nil { 646 return fmt.Errorf("failed to update users handle on plc: %w", err) 647 } 648 649 if err := s.db.Model(models.ActorInfo{}).Where("uid = ?", u.ID).UpdateColumn("handle", handle).Error; err != nil { 650 return fmt.Errorf("failed to update handle: %w", err) 651 } 652 653 if err := s.db.Model(User{}).Where("id = ?", u.ID).UpdateColumn("handle", handle).Error; err != nil { 654 return fmt.Errorf("failed to update handle: %w", err) 655 } 656 657 // Push an Identity event 658 if err := s.events.AddEvent(ctx, &events.XRPCStreamEvent{ 659 RepoIdentity: &comatproto.SyncSubscribeRepos_Identity{ 660 Did: u.Did, 661 Time: time.Now().Format(util.ISO8601), 662 }, 663 }); err != nil { 664 return fmt.Errorf("failed to push event: %s", err) 665 } 666 667 return nil 668} 669 670func (s *Server) TakedownRepo(ctx context.Context, did string) error { 671 // Push an Account event 672 if err := s.events.AddEvent(ctx, &events.XRPCStreamEvent{ 673 RepoAccount: &comatproto.SyncSubscribeRepos_Account{ 674 Did: did, 675 Active: false, 676 Status: &events.AccountStatusTakendown, 677 Time: time.Now().Format(util.ISO8601), 678 }, 679 }); err != nil { 680 return fmt.Errorf("failed to push event: %s", err) 681 } 682 683 return nil 684} 685 686func (s *Server) SuspendRepo(ctx context.Context, did string) error { 687 // Push an Account event 688 if err := s.events.AddEvent(ctx, &events.XRPCStreamEvent{ 689 RepoAccount: &comatproto.SyncSubscribeRepos_Account{ 690 Did: did, 691 Active: false, 692 Status: &events.AccountStatusSuspended, 693 Time: time.Now().Format(util.ISO8601), 694 }, 695 }); err != nil { 696 return fmt.Errorf("failed to push event: %s", err) 697 } 698 699 return nil 700} 701 702func (s *Server) DeactivateRepo(ctx context.Context, did string) error { 703 // Push an Account event 704 if err := s.events.AddEvent(ctx, &events.XRPCStreamEvent{ 705 RepoAccount: &comatproto.SyncSubscribeRepos_Account{ 706 Did: did, 707 Active: false, 708 Status: &events.AccountStatusDeactivated, 709 Time: time.Now().Format(util.ISO8601), 710 }, 711 }); err != nil { 712 return fmt.Errorf("failed to push event: %s", err) 713 } 714 715 return nil 716} 717 718func (s *Server) ReactivateRepo(ctx context.Context, did string) error { 719 // Push an Account event 720 if err := s.events.AddEvent(ctx, &events.XRPCStreamEvent{ 721 RepoAccount: &comatproto.SyncSubscribeRepos_Account{ 722 Did: did, 723 Active: true, 724 Status: &events.AccountStatusActive, 725 Time: time.Now().Format(util.ISO8601), 726 }, 727 }); err != nil { 728 return fmt.Errorf("failed to push event: %s", err) 729 } 730 731 return nil 732} 733 734func (s *Server) Repoman() *repomgr.RepoManager { 735 return s.repoman 736}