Live video on the AT Protocol
1package spxrpc
2
import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"net/http"
	"net/url"
	"time"

	"github.com/labstack/echo/v4"
	"github.com/patrickmn/go-cache"
	"github.com/slok/go-http-metrics/middleware"
	echomiddleware "github.com/slok/go-http-metrics/middleware/echo"
	"github.com/streamplace/oatproxy/pkg/oatproxy"
	"stream.place/streamplace/pkg/aqhttp"
	"stream.place/streamplace/pkg/atproto"
	"stream.place/streamplace/pkg/config"
	"stream.place/streamplace/pkg/log"
	"stream.place/streamplace/pkg/model"
	"stream.place/streamplace/pkg/statedb"
)
24
25type Server struct {
26 e *echo.Echo
27 cli *config.CLI
28 model model.Model
29 OGImageCache *cache.Cache
30 ATSync *atproto.ATProtoSynchronizer
31 statefulDB *statedb.StatefulDB
32}
33
34func NewServer(ctx context.Context, cli *config.CLI, model model.Model, statefulDB *statedb.StatefulDB, op *oatproxy.OATProxy, mdlw middleware.Middleware, atsync *atproto.ATProtoSynchronizer) (*Server, error) {
35 e := echo.New()
36 s := &Server{
37 e: e,
38 cli: cli,
39 model: model,
40 OGImageCache: cache.New(5*time.Minute, 10*time.Minute), // 5min TTL, 10min cleanup
41 ATSync: atsync,
42 statefulDB: statefulDB,
43 }
44 e.Use(s.ErrorHandlingMiddleware())
45 e.Use(s.ContextPreservingMiddleware())
46 e.Use(echomiddleware.Handler("", mdlw))
47 e.Use(op.OAuthMiddleware)
48 err := s.RegisterHandlersPlaceStream(e)
49 if err != nil {
50 return nil, err
51 }
52 err = s.RegisterHandlersAppBsky(e)
53 if err != nil {
54 return nil, err
55 }
56 err = s.RegisterHandlersComAtproto(e)
57 if err != nil {
58 return nil, err
59 }
60 e.GET("/xrpc/_health", func(c echo.Context) error {
61 return c.JSON(http.StatusOK, map[string]string{"version": cli.Build.Version})
62 })
63 e.GET("/xrpc/com.atproto.sync.subscribeRepos", s.handleComAtprotoSyncSubscribeRepos)
64 e.GET("/xrpc/*", s.HandleWildcard)
65 e.POST("/xrpc/*", s.HandleWildcard)
66 return s, nil
67}
68
69func (s *Server) isLocalPDS(ctx context.Context, repo string) (bool, string, error) {
70 did, svc, _, err := resolveRepoService(ctx, repo)
71 if err != nil {
72 return false, "", fmt.Errorf("resolveRepoService: %w", err)
73 }
74 if did == s.cli.MyDID() {
75 return true, svc, nil
76 }
77 return false, svc, nil
78}
79
80func makeUnauthenticatedRequest(ctx context.Context, service, method string, params map[string]interface{}, out interface{}) error {
81 ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
82 defer cancel()
83 u, err := url.Parse(fmt.Sprintf("%s/xrpc/%s", service, method))
84 if err != nil {
85 return fmt.Errorf("failed to parse URL: %w", err)
86 }
87
88 // add query parameters
89 query := u.Query()
90 for k, v := range params {
91 query.Set(k, fmt.Sprintf("%v", v))
92 }
93 u.RawQuery = query.Encode()
94
95 log.Error(ctx, "making unauthenticated request", "url", u.String())
96
97 req, err := http.NewRequestWithContext(ctx, "GET", u.String(), nil)
98 if err != nil {
99 return fmt.Errorf("failed to create request: %w", err)
100 }
101
102 resp, err := aqhttp.Client.Do(req)
103 if err != nil {
104 return fmt.Errorf("failed to make request: %w", err)
105 }
106 defer resp.Body.Close()
107
108 if resp.StatusCode != http.StatusOK {
109 return fmt.Errorf("upstream request failed with status %d", resp.StatusCode)
110 }
111
112 body, err := io.ReadAll(resp.Body)
113 if err != nil {
114 return fmt.Errorf("failed to read response body: %w", err)
115 }
116
117 if err := json.Unmarshal(body, out); err != nil {
118 return fmt.Errorf("failed to unmarshal response: %w", err)
119 }
120
121 return nil
122}
123
124func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
125 s.e.ServeHTTP(w, r)
126}
127
128func (s *Server) ErrorHandlingMiddleware() echo.MiddlewareFunc {
129 return func(next echo.HandlerFunc) echo.HandlerFunc {
130 return func(c echo.Context) error {
131 err := next(c)
132 if err == nil {
133 return nil
134 }
135 httpError, ok := err.(*echo.HTTPError)
136 if ok {
137 log.Error(c.Request().Context(), "http error", "code", httpError.Code, "message", httpError.Message, "internal", httpError.Internal)
138 return err
139 }
140 log.Error(c.Request().Context(), "unhandled error", "error", err)
141 return echo.NewHTTPError(http.StatusInternalServerError, err.Error())
142 }
143 }
144}
145
// echoContextKeyType is a dedicated unexported type for the context key
// below, so a key created anywhere else can never compare equal to it.
type echoContextKeyType struct{}

// echoContextKey is the context key under which ContextPreservingMiddleware
// stores the echo.Context of the current request.
var echoContextKey echoContextKeyType
151
152func (s *Server) ContextPreservingMiddleware() echo.MiddlewareFunc {
153 return func(next echo.HandlerFunc) echo.HandlerFunc {
154 return func(c echo.Context) error {
155 ctx := c.Request().Context()
156 if ctx == nil {
157 ctx = context.Background()
158 }
159 ctx = context.WithValue(ctx, echoContextKey, c)
160 c.SetRequest(c.Request().WithContext(ctx))
161 return next(c)
162 }
163 }
164}