1package pds
2
3import (
4 "context"
5 "database/sql"
6 "errors"
7 "fmt"
8 "log/slog"
9 "net"
10 "net/http"
11 "net/mail"
12 "net/url"
13 "strings"
14 "time"
15
16 "github.com/bluesky-social/indigo/api/atproto"
17 comatproto "github.com/bluesky-social/indigo/api/atproto"
18 "github.com/bluesky-social/indigo/carstore"
19 "github.com/bluesky-social/indigo/events"
20 "github.com/bluesky-social/indigo/indexer"
21 lexutil "github.com/bluesky-social/indigo/lex/util"
22 "github.com/bluesky-social/indigo/models"
23 pdsdata "github.com/bluesky-social/indigo/pds/data"
24 "github.com/bluesky-social/indigo/plc"
25 "github.com/bluesky-social/indigo/repomgr"
26 "github.com/bluesky-social/indigo/util"
27 "github.com/bluesky-social/indigo/xrpc"
28 gojwt "github.com/golang-jwt/jwt"
29 "github.com/gorilla/websocket"
30 "github.com/labstack/echo/v4"
31 "github.com/labstack/echo/v4/middleware"
32 "github.com/lestrrat-go/jwx/v2/jwt"
33 "github.com/whyrusleeping/go-did"
34 "gorm.io/gorm"
35)
36
// Server is the PDS service instance. It bundles the storage handles, the
// repo/indexing/event machinery wired together by NewServer, and the HTTP
// state created when the API is started.
type Server struct {
	db      *gorm.DB             // database handle used for user, peering and actor-info rows
	cs      carstore.CarStore    // car store handed to the repo manager in NewServer
	repoman *repomgr.RepoManager // repo manager; exposed through Repoman()
	indexer *indexer.Indexer     // receives repo events from repoman (see NewServer)
	events  *events.EventManager // event stream that EventsHandler subscribes to
	// signingKey is the key passed to indexer.NewKeyManager in NewServer.
	signingKey *did.PrivKey
	// echo is only set once RunAPIWithListener has been called.
	echo *echo.Echo
	// jwtSigningKey is handed to the JWT middleware as its signing key.
	jwtSigningKey []byte
	// enforcePeering, when true, makes EventsHandler filter events by the
	// subscriber's peering record. NewServer initializes it to false.
	enforcePeering bool

	// handleSuffix is the suffix validateHandle requires on every handle.
	handleSuffix string
	// serviceUrl is stored as given to NewServer.
	serviceUrl string

	// plc is used to fetch DID documents and to update handles.
	plc plc.PLCClient

	// log is the default slog logger tagged with system=pds.
	log *slog.Logger
}
55
// serverListenerBootTimeout bounds how long RunAPI waits for the requested
// server socket to become available for use. The value is arbitrary but should
// be safe on any platform. It is a package-level constant because there is no
// great way to thread this timeout through without adding another parameter to
// the (at time of writing) long signature of NewServer.
const serverListenerBootTimeout = 5 * time.Second
62
63func NewServer(db *gorm.DB, cs carstore.CarStore, serkey *did.PrivKey, handleSuffix, serviceUrl string, didr plc.PLCClient, jwtkey []byte) (*Server, error) {
64 db.AutoMigrate(&User{})
65 db.AutoMigrate(&Peering{})
66
67 evtman := events.NewEventManager(events.NewMemPersister())
68
69 kmgr := indexer.NewKeyManager(didr, serkey)
70
71 repoman := repomgr.NewRepoManager(cs, kmgr)
72
73 rf := indexer.NewRepoFetcher(db, repoman, 10)
74
75 ix, err := indexer.NewIndexer(db, evtman, didr, rf, false)
76 if err != nil {
77 return nil, err
78 }
79
80 s := &Server{
81 signingKey: serkey,
82 db: db,
83 cs: cs,
84 indexer: ix,
85 plc: didr,
86 events: evtman,
87 repoman: repoman,
88 handleSuffix: handleSuffix,
89 serviceUrl: serviceUrl,
90 jwtSigningKey: jwtkey,
91 enforcePeering: false,
92
93 log: slog.Default().With("system", "pds"),
94 }
95
96 repoman.SetEventHandler(func(ctx context.Context, evt *repomgr.RepoEvent) {
97 if err := ix.HandleRepoEvent(ctx, evt); err != nil {
98 s.log.Error("handle repo event failed", "user", evt.User, "err", err)
99 }
100 }, true)
101
102 //ix.SendRemoteFollow = s.sendRemoteFollow
103 ix.CreateExternalUser = s.createExternalUser
104
105 return s, nil
106}
107
108func (s *Server) Shutdown(ctx context.Context) error {
109 return s.echo.Shutdown(ctx)
110}
111
112func (s *Server) createExternalUser(ctx context.Context, did string) (*models.ActorInfo, error) {
113 doc, err := s.plc.GetDocument(ctx, did)
114 if err != nil {
115 return nil, fmt.Errorf("could not locate DID document for followed user: %s", err)
116 }
117
118 if len(doc.Service) == 0 {
119 return nil, fmt.Errorf("external followed user %s had no services in did document", did)
120 }
121
122 svc := doc.Service[0]
123 durl, err := url.Parse(svc.ServiceEndpoint)
124 if err != nil {
125 return nil, err
126 }
127
128 // TODO: the PDS's DID should also be in the service, we could use that to look up?
129 var peering Peering
130 if err := s.db.Find(&peering, "host = ?", durl.Host).Error; err != nil {
131 return nil, err
132 }
133
134 c := &xrpc.Client{Host: svc.ServiceEndpoint}
135
136 if peering.ID == 0 {
137 cfg, err := atproto.ServerDescribeServer(ctx, c)
138 if err != nil {
139 // TODO: failing this should not halt our indexing
140 return nil, fmt.Errorf("failed to check unrecognized pds: %w", err)
141 }
142
143 // since handles can be anything, checking against this list does not matter...
144 _ = cfg
145
146 // TODO: could check other things, a valid response is good enough for now
147 peering.Host = svc.ServiceEndpoint
148
149 if err := s.db.Create(&peering).Error; err != nil {
150 return nil, err
151 }
152 }
153
154 var handle string
155 if len(doc.AlsoKnownAs) > 0 {
156 hurl, err := url.Parse(doc.AlsoKnownAs[0])
157 if err != nil {
158 return nil, err
159 }
160
161 handle = hurl.Host
162 }
163
164 // TODO: request this users info from their server to fill out our data...
165 u := User{
166 Handle: handle,
167 Did: did,
168 PDS: peering.ID,
169 }
170
171 if err := s.db.Create(&u).Error; err != nil {
172 return nil, fmt.Errorf("failed to create other pds user: %w", err)
173 }
174
175 // okay cool, its a user on a server we are peered with
176 // lets make a local record of that user for the future
177 subj := &models.ActorInfo{
178 Uid: u.ID,
179 Handle: sql.NullString{String: handle, Valid: true},
180 DisplayName: "missing display name",
181 Did: did,
182 Type: "",
183 PDS: peering.ID,
184 }
185 if err := s.db.Create(subj).Error; err != nil {
186 return nil, err
187 }
188
189 return subj, nil
190}
191
192func (s *Server) RunAPI(addr string) error {
193 var lc net.ListenConfig
194 ctx, cancel := context.WithTimeout(context.Background(), serverListenerBootTimeout)
195 defer cancel()
196
197 li, err := lc.Listen(ctx, "tcp", addr)
198 if err != nil {
199 return err
200 }
201 return s.RunAPIWithListener(li)
202}
203
204func (s *Server) RunAPIWithListener(listen net.Listener) error {
205 e := echo.New()
206 s.echo = e
207 e.HideBanner = true
208 e.Use(middleware.LoggerWithConfig(middleware.LoggerConfig{
209 Format: "method=${method}, uri=${uri}, status=${status} latency=${latency_human}\n",
210 }))
211
212 cfg := middleware.JWTConfig{
213 Skipper: func(c echo.Context) bool {
214 switch c.Path() {
215 case "/xrpc/_health":
216 return true
217 case "/xrpc/com.atproto.sync.subscribeRepos":
218 return true
219 case "/xrpc/com.atproto.account.create":
220 return true
221 case "/xrpc/com.atproto.identity.resolveHandle":
222 return true
223 case "/xrpc/com.atproto.server.createAccount":
224 return true
225 case "/xrpc/com.atproto.server.createSession":
226 return true
227 case "/xrpc/com.atproto.server.describeServer":
228 return true
229 case "/xrpc/com.atproto.sync.getRepo":
230 fmt.Println("TODO: currently not requiring auth on get repo endpoint")
231 return true
232 case "/xrpc/com.atproto.peering.follow", "/events":
233 auth := c.Request().Header.Get("Authorization")
234
235 did := c.Request().Header.Get("DID")
236 ctx := c.Request().Context()
237 ctx = context.WithValue(ctx, "did", did)
238 ctx = context.WithValue(ctx, "auth", auth)
239 c.SetRequest(c.Request().WithContext(ctx))
240 return true
241 case "/.well-known/atproto-did":
242 return true
243 case "/takedownRepo":
244 return true
245 case "/suspendRepo":
246 return true
247 case "/deactivateRepo":
248 return true
249 case "/reactivateRepo":
250 return true
251 default:
252 return false
253 }
254 },
255 SigningKey: s.jwtSigningKey,
256 }
257
258 e.HTTPErrorHandler = func(err error, ctx echo.Context) {
259 fmt.Printf("PDS HANDLER ERROR: (%s) %s\n", ctx.Path(), err)
260
261 // TODO: need to properly figure out where http error codes for error
262 // types get decided. This spot is reasonable, but maybe a bit weird.
263 // reviewers, please advise
264 if errors.Is(err, ErrNoSuchUser) {
265 ctx.Response().WriteHeader(404)
266 return
267 }
268
269 ctx.Response().WriteHeader(500)
270 }
271
272 e.GET("/takedownRepo", func(c echo.Context) error {
273 ctx := c.Request().Context()
274 did := c.QueryParam("did")
275 if did == "" {
276 return fmt.Errorf("missing did")
277 }
278
279 if err := s.TakedownRepo(ctx, did); err != nil {
280 return err
281 }
282
283 return c.String(200, "ok")
284 })
285
286 e.GET("/suspendRepo", func(c echo.Context) error {
287 ctx := c.Request().Context()
288 did := c.QueryParam("did")
289 if did == "" {
290 return fmt.Errorf("missing did")
291 }
292
293 if err := s.SuspendRepo(ctx, did); err != nil {
294 return err
295 }
296
297 return c.String(200, "ok")
298 })
299
300 e.GET("/deactivateRepo", func(c echo.Context) error {
301 ctx := c.Request().Context()
302 did := c.QueryParam("did")
303 if did == "" {
304 return fmt.Errorf("missing did")
305 }
306
307 if err := s.DeactivateRepo(ctx, did); err != nil {
308 return err
309 }
310
311 return c.String(200, "ok")
312 })
313
314 e.GET("/reactivateRepo", func(c echo.Context) error {
315 ctx := c.Request().Context()
316 did := c.QueryParam("did")
317 if did == "" {
318 return fmt.Errorf("missing did")
319 }
320
321 if err := s.ReactivateRepo(ctx, did); err != nil {
322 return err
323 }
324
325 return c.String(200, "ok")
326 })
327
328 e.Use(middleware.JWTWithConfig(cfg), s.userCheckMiddleware)
329 s.RegisterHandlersComAtproto(e)
330
331 e.GET("/xrpc/com.atproto.sync.subscribeRepos", s.EventsHandler)
332 e.GET("/xrpc/_health", s.HandleHealthCheck)
333 e.GET("/.well-known/atproto-did", s.HandleResolveDid)
334
335 // In order to support booting on random ports in tests, we need to tell the
336 // Echo instance it's already got a port, and then use its StartServer
337 // method to re-use that listener.
338 e.Listener = listen
339 srv := &http.Server{}
340 return e.StartServer(srv)
341}
342
// HealthStatus is the JSON body returned by HandleHealthCheck.
type HealthStatus struct {
	// Status is "ok" or "error".
	Status string `json:"status"`
	// Message describes the failure; it is omitted from the JSON when empty.
	Message string `json:"msg,omitempty"`
}
347
348func (s *Server) HandleHealthCheck(c echo.Context) error {
349 if err := s.db.Exec("SELECT 1").Error; err != nil {
350 s.log.Error("healthcheck can't connect to database", "err", err)
351 return c.JSON(500, HealthStatus{Status: "error", Message: "can't connect to database"})
352 } else {
353 return c.JSON(200, HealthStatus{Status: "ok"})
354 }
355}
356
357func (s *Server) HandleResolveDid(c echo.Context) error {
358 ctx := c.Request().Context()
359
360 handle := c.Request().Host
361 if hh := c.Request().Header.Get("Host"); hh != "" {
362 handle = hh
363 }
364
365 u, err := s.lookupUserByHandle(ctx, handle)
366 if err != nil {
367 return fmt.Errorf("resolving %q: %w", handle, err)
368 }
369
370 return c.String(200, u.Did)
371}
372
// User is an alias for pdsdata.User so the rest of this package can refer to
// the user row type without the package qualifier.
type User = pdsdata.User
374
// RefreshToken is a gorm model that holds a single token string.
type RefreshToken struct {
	gorm.Model
	Token string
}
379
// toTime converts a numeric claim value, expressed as seconds since the Unix
// epoch, into a time.Time. Only float64 values are accepted; any fractional
// part is truncated. Every other dynamic type yields an error naming the type.
func toTime(i interface{}) (time.Time, error) {
	switch secs := i.(type) {
	case float64:
		return time.Unix(int64(secs), 0), nil
	default:
		return time.Time{}, fmt.Errorf("invalid type for timestamp: %T", i)
	}
}
388
389func (s *Server) checkTokenValidity(user *gojwt.Token) (string, string, error) {
390 claims, ok := user.Claims.(gojwt.MapClaims)
391 if !ok {
392 return "", "", fmt.Errorf("invalid token claims map")
393 }
394
395 iat, ok := claims["iat"]
396 if !ok {
397 return "", "", fmt.Errorf("iat not set")
398 }
399
400 tiat, err := toTime(iat)
401 if err != nil {
402 return "", "", err
403 }
404
405 if tiat.After(time.Now()) {
406 return "", "", fmt.Errorf("iat cannot be in the future")
407 }
408
409 exp, ok := claims["exp"]
410 if !ok {
411 return "", "", fmt.Errorf("exp not set")
412 }
413
414 texp, err := toTime(exp)
415 if err != nil {
416 return "", "", err
417 }
418
419 if texp.Before(time.Now()) {
420 return "", "", fmt.Errorf("token expired")
421 }
422
423 did, ok := claims["sub"]
424 if !ok {
425 return "", "", fmt.Errorf("expected user did in subject")
426 }
427
428 didstr, ok := did.(string)
429 if !ok {
430 return "", "", fmt.Errorf("expected subject to be a string")
431 }
432
433 scope, ok := claims["scope"]
434 if !ok {
435 return "", "", fmt.Errorf("expected scope to be set")
436 }
437
438 scopestr, ok := scope.(string)
439 if !ok {
440 return "", "", fmt.Errorf("expected scope to be a string")
441 }
442
443 return scopestr, didstr, nil
444}
445
446func (s *Server) lookupUser(ctx context.Context, didorhandle string) (*User, error) {
447 if strings.HasPrefix(didorhandle, "did:") {
448 return s.lookupUserByDid(ctx, didorhandle)
449 }
450
451 return s.lookupUserByHandle(ctx, didorhandle)
452}
453
454func (s *Server) lookupUserByDid(ctx context.Context, did string) (*User, error) {
455 var u User
456 if err := s.db.First(&u, "did = ?", did).Error; err != nil {
457 return nil, err
458 }
459
460 return &u, nil
461}
462
// ErrNoSuchUser is returned when a user lookup finds no matching row. Compare
// against it with errors.Is.
var ErrNoSuchUser = errors.New("no such user")
464
465func (s *Server) lookupUserByHandle(ctx context.Context, handle string) (*User, error) {
466 var u User
467 if err := s.db.Find(&u, "handle = ?", handle).Error; err != nil {
468 return nil, err
469 }
470 if u.ID == 0 {
471 return nil, ErrNoSuchUser
472 }
473
474 return &u, nil
475}
476
477func (s *Server) userCheckMiddleware(next echo.HandlerFunc) echo.HandlerFunc {
478 return func(c echo.Context) error {
479 ctx := c.Request().Context()
480
481 user, ok := c.Get("user").(*gojwt.Token)
482 if !ok {
483 return next(c)
484 }
485 ctx = context.WithValue(ctx, "token", user)
486
487 scope, did, err := s.checkTokenValidity(user)
488 if err != nil {
489 return fmt.Errorf("invalid token: %w", err)
490 }
491
492 u, err := s.lookupUser(ctx, did)
493 if err != nil {
494 return err
495 }
496
497 ctx = context.WithValue(ctx, "authScope", scope)
498 ctx = context.WithValue(ctx, "user", u)
499 ctx = context.WithValue(ctx, "did", did)
500
501 c.SetRequest(c.Request().WithContext(ctx))
502 return next(c)
503 }
504}
505
506func (s *Server) getUser(ctx context.Context) (*User, error) {
507 u, ok := ctx.Value("user").(*User)
508 if !ok {
509 return nil, fmt.Errorf("auth required")
510 }
511
512 //u.Did = ctx.Value("did").(string)
513
514 return u, nil
515}
516
// validateEmail reports whether email parses as a mail address according to
// net/mail. It returns nil when the address parses and the parse error
// otherwise.
func validateEmail(email string) error {
	_, parseErr := mail.ParseAddress(email)
	return parseErr
}
525
526func (s *Server) validateHandle(handle string) error {
527 if !strings.HasSuffix(handle, s.handleSuffix) {
528 return fmt.Errorf("invalid handle")
529 }
530
531 if strings.Contains(strings.TrimSuffix(handle, s.handleSuffix), ".") {
532 return fmt.Errorf("invalid handle")
533 }
534
535 return nil
536}
537
538func (s *Server) invalidateToken(ctx context.Context, u *User, tok *jwt.Token) error {
539 panic("nyi")
540}
541
// Peering is an alias for pdsdata.Peering so the rest of this package can
// refer to the peering row type without the package qualifier.
type Peering = pdsdata.Peering
543
544func (s *Server) EventsHandler(c echo.Context) error {
545 conn, err := websocket.Upgrade(c.Response().Writer, c.Request(), c.Response().Header(), 1<<10, 1<<10)
546 if err != nil {
547 return err
548 }
549
550 var peering *Peering
551 if s.enforcePeering {
552 did := c.Request().Header.Get("DID")
553 if did != "" {
554 if err := s.db.First(peering, "did = ?", did).Error; err != nil {
555 return err
556 }
557 }
558 }
559
560 ctx := c.Request().Context()
561
562 ident := c.RealIP() + "-" + c.Request().UserAgent()
563
564 evts, cancel, err := s.events.Subscribe(ctx, ident, func(evt *events.XRPCStreamEvent) bool {
565 if !s.enforcePeering {
566 return true
567 }
568 if peering.ID == 0 {
569 return true
570 }
571
572 for _, pid := range evt.PrivRelevantPds {
573 if pid == peering.ID {
574 return true
575 }
576 }
577
578 return false
579 }, nil)
580 if err != nil {
581 return err
582 }
583 defer cancel()
584
585 header := events.EventHeader{Op: events.EvtKindMessage}
586 for evt := range evts {
587 wc, err := conn.NextWriter(websocket.BinaryMessage)
588 if err != nil {
589 return err
590 }
591
592 var obj lexutil.CBOR
593
594 switch {
595 case evt.Error != nil:
596 header.Op = events.EvtKindErrorFrame
597 obj = evt.Error
598 case evt.RepoCommit != nil:
599 header.MsgType = "#commit"
600 obj = evt.RepoCommit
601 case evt.RepoSync != nil:
602 header.MsgType = "#sync"
603 obj = evt.RepoSync
604 case evt.RepoIdentity != nil:
605 header.MsgType = "#identity"
606 obj = evt.RepoIdentity
607 case evt.RepoAccount != nil:
608 header.MsgType = "#account"
609 obj = evt.RepoAccount
610 case evt.RepoInfo != nil:
611 header.MsgType = "#info"
612 obj = evt.RepoInfo
613 default:
614 return fmt.Errorf("unrecognized event kind")
615 }
616
617 if err := header.MarshalCBOR(wc); err != nil {
618 return fmt.Errorf("failed to write header: %w", err)
619 }
620
621 if err := obj.MarshalCBOR(wc); err != nil {
622 return fmt.Errorf("failed to write event: %w", err)
623 }
624
625 if err := wc.Close(); err != nil {
626 return fmt.Errorf("failed to flush-close our event write: %w", err)
627 }
628 }
629
630 return nil
631}
632
633func (s *Server) UpdateUserHandle(ctx context.Context, u *User, handle string) error {
634 if u.Handle == handle {
635 // no change? move on
636 s.log.Warn("attempted to change handle to current handle", "did", u.Did, "handle", handle)
637 return nil
638 }
639
640 _, err := s.indexer.LookupUserByHandle(ctx, handle)
641 if err == nil {
642 return fmt.Errorf("handle %q is already in use", handle)
643 }
644
645 if err := s.plc.UpdateUserHandle(ctx, u.Did, handle); err != nil {
646 return fmt.Errorf("failed to update users handle on plc: %w", err)
647 }
648
649 if err := s.db.Model(models.ActorInfo{}).Where("uid = ?", u.ID).UpdateColumn("handle", handle).Error; err != nil {
650 return fmt.Errorf("failed to update handle: %w", err)
651 }
652
653 if err := s.db.Model(User{}).Where("id = ?", u.ID).UpdateColumn("handle", handle).Error; err != nil {
654 return fmt.Errorf("failed to update handle: %w", err)
655 }
656
657 // Push an Identity event
658 if err := s.events.AddEvent(ctx, &events.XRPCStreamEvent{
659 RepoIdentity: &comatproto.SyncSubscribeRepos_Identity{
660 Did: u.Did,
661 Time: time.Now().Format(util.ISO8601),
662 },
663 }); err != nil {
664 return fmt.Errorf("failed to push event: %s", err)
665 }
666
667 return nil
668}
669
670func (s *Server) TakedownRepo(ctx context.Context, did string) error {
671 // Push an Account event
672 if err := s.events.AddEvent(ctx, &events.XRPCStreamEvent{
673 RepoAccount: &comatproto.SyncSubscribeRepos_Account{
674 Did: did,
675 Active: false,
676 Status: &events.AccountStatusTakendown,
677 Time: time.Now().Format(util.ISO8601),
678 },
679 }); err != nil {
680 return fmt.Errorf("failed to push event: %s", err)
681 }
682
683 return nil
684}
685
686func (s *Server) SuspendRepo(ctx context.Context, did string) error {
687 // Push an Account event
688 if err := s.events.AddEvent(ctx, &events.XRPCStreamEvent{
689 RepoAccount: &comatproto.SyncSubscribeRepos_Account{
690 Did: did,
691 Active: false,
692 Status: &events.AccountStatusSuspended,
693 Time: time.Now().Format(util.ISO8601),
694 },
695 }); err != nil {
696 return fmt.Errorf("failed to push event: %s", err)
697 }
698
699 return nil
700}
701
702func (s *Server) DeactivateRepo(ctx context.Context, did string) error {
703 // Push an Account event
704 if err := s.events.AddEvent(ctx, &events.XRPCStreamEvent{
705 RepoAccount: &comatproto.SyncSubscribeRepos_Account{
706 Did: did,
707 Active: false,
708 Status: &events.AccountStatusDeactivated,
709 Time: time.Now().Format(util.ISO8601),
710 },
711 }); err != nil {
712 return fmt.Errorf("failed to push event: %s", err)
713 }
714
715 return nil
716}
717
718func (s *Server) ReactivateRepo(ctx context.Context, did string) error {
719 // Push an Account event
720 if err := s.events.AddEvent(ctx, &events.XRPCStreamEvent{
721 RepoAccount: &comatproto.SyncSubscribeRepos_Account{
722 Did: did,
723 Active: true,
724 Status: &events.AccountStatusActive,
725 Time: time.Now().Format(util.ISO8601),
726 },
727 }); err != nil {
728 return fmt.Errorf("failed to push event: %s", err)
729 }
730
731 return nil
732}
733
// Repoman returns the repo manager that NewServer created for this server.
func (s *Server) Repoman() *repomgr.RepoManager {
	return s.repoman
}