Live video on the AT Protocol
1package spxrpc
2
import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"net/http"
	"net/url"
	"time"

	"github.com/labstack/echo/v4"
	"github.com/patrickmn/go-cache"
	"github.com/slok/go-http-metrics/middleware"
	echomiddleware "github.com/slok/go-http-metrics/middleware/echo"
	"github.com/streamplace/oatproxy/pkg/oatproxy"
	"stream.place/streamplace/pkg/aqhttp"
	"stream.place/streamplace/pkg/atproto"
	"stream.place/streamplace/pkg/bus"
	"stream.place/streamplace/pkg/config"
	"stream.place/streamplace/pkg/log"
	"stream.place/streamplace/pkg/model"
	"stream.place/streamplace/pkg/statedb"
)
25
26type Server struct {
27 e *echo.Echo
28 cli *config.CLI
29 model model.Model
30 OGImageCache *cache.Cache
31 ATSync *atproto.ATProtoSynchronizer
32 statefulDB *statedb.StatefulDB
33 bus *bus.Bus
34}
35
36func NewServer(ctx context.Context, cli *config.CLI, model model.Model, statefulDB *statedb.StatefulDB, op *oatproxy.OATProxy, mdlw middleware.Middleware, atsync *atproto.ATProtoSynchronizer, bus *bus.Bus) (*Server, error) {
37 e := echo.New()
38 s := &Server{
39 e: e,
40 cli: cli,
41 model: model,
42 OGImageCache: cache.New(5*time.Minute, 10*time.Minute), // 5min TTL, 10min cleanup
43 ATSync: atsync,
44 statefulDB: statefulDB,
45 bus: bus,
46 }
47 e.Use(s.ErrorHandlingMiddleware())
48 e.Use(s.ContextPreservingMiddleware())
49 e.Use(echomiddleware.Handler("", mdlw))
50 e.Use(op.OAuthMiddleware)
51 err := s.RegisterHandlersPlaceStream(e)
52 if err != nil {
53 return nil, err
54 }
55 err = s.RegisterHandlersAppBsky(e)
56 if err != nil {
57 return nil, err
58 }
59 err = s.RegisterHandlersComAtproto(e)
60 if err != nil {
61 return nil, err
62 }
63 e.GET("/xrpc/_health", func(c echo.Context) error {
64 return c.JSON(http.StatusOK, map[string]string{"version": cli.Build.Version})
65 })
66 e.GET("/xrpc/com.atproto.sync.subscribeRepos", s.handleComAtprotoSyncSubscribeRepos)
67 e.GET("/xrpc/place.stream.live.subscribeSegments", s.handlePlaceStreamLiveSubscribeSegments)
68 e.GET("/xrpc/*", s.HandleWildcard)
69 e.POST("/xrpc/*", s.HandleWildcard)
70 return s, nil
71}
72
73func (s *Server) isLocalPDS(ctx context.Context, repo string) (bool, string, error) {
74 did, svc, _, err := resolveRepoService(ctx, repo)
75 if err != nil {
76 return false, "", fmt.Errorf("resolveRepoService: %w", err)
77 }
78 if did == s.cli.MyDID() {
79 return true, svc, nil
80 }
81 return false, svc, nil
82}
83
84func makeUnauthenticatedRequest(ctx context.Context, service, method string, params map[string]interface{}, out interface{}) error {
85 ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
86 defer cancel()
87 u, err := url.Parse(fmt.Sprintf("%s/xrpc/%s", service, method))
88 if err != nil {
89 return fmt.Errorf("failed to parse URL: %w", err)
90 }
91
92 // add query parameters
93 query := u.Query()
94 for k, v := range params {
95 query.Set(k, fmt.Sprintf("%v", v))
96 }
97 u.RawQuery = query.Encode()
98
99 log.Error(ctx, "making unauthenticated request", "url", u.String())
100
101 req, err := http.NewRequestWithContext(ctx, "GET", u.String(), nil)
102 if err != nil {
103 return fmt.Errorf("failed to create request: %w", err)
104 }
105
106 resp, err := aqhttp.Client.Do(req)
107 if err != nil {
108 return fmt.Errorf("failed to make request: %w", err)
109 }
110 defer resp.Body.Close()
111
112 if resp.StatusCode != http.StatusOK {
113 return fmt.Errorf("upstream request failed with status %d", resp.StatusCode)
114 }
115
116 body, err := io.ReadAll(resp.Body)
117 if err != nil {
118 return fmt.Errorf("failed to read response body: %w", err)
119 }
120
121 if err := json.Unmarshal(body, out); err != nil {
122 return fmt.Errorf("failed to unmarshal response: %w", err)
123 }
124
125 return nil
126}
127
128func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
129 s.e.ServeHTTP(w, r)
130}
131
132func (s *Server) ErrorHandlingMiddleware() echo.MiddlewareFunc {
133 return func(next echo.HandlerFunc) echo.HandlerFunc {
134 return func(c echo.Context) error {
135 err := next(c)
136 if err == nil {
137 return nil
138 }
139 httpError, ok := err.(*echo.HTTPError)
140 if ok {
141 log.Error(c.Request().Context(), "http error", "code", httpError.Code, "message", httpError.Message, "internal", httpError.Internal)
142 return err
143 }
144 log.Error(c.Request().Context(), "unhandled error", "error", err)
145 return echo.NewHTTPError(http.StatusInternalServerError, err.Error())
146 }
147 }
148}
149
// echoContextKeyType is a dedicated unexported type used only as a context
// key, so that no value of another type can collide with it.
type echoContextKeyType struct{}

// echoContextKey is the single key value under which
// ContextPreservingMiddleware stores the request's echo.Context in the
// request context.
var echoContextKey echoContextKeyType
155
156func (s *Server) ContextPreservingMiddleware() echo.MiddlewareFunc {
157 return func(next echo.HandlerFunc) echo.HandlerFunc {
158 return func(c echo.Context) error {
159 ctx := c.Request().Context()
160 if ctx == nil {
161 ctx = context.Background()
162 }
163 ctx = context.WithValue(ctx, echoContextKey, c)
164 c.SetRequest(c.Request().WithContext(ctx))
165 return next(c)
166 }
167 }
168}