Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

at natb/i18next-cli 168 lines 4.8 kB view raw
1package spxrpc 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "io" 8 "net/http" 9 "net/url" 10 "time" 11 12 "github.com/labstack/echo/v4" 13 "github.com/patrickmn/go-cache" 14 "github.com/slok/go-http-metrics/middleware" 15 echomiddleware "github.com/slok/go-http-metrics/middleware/echo" 16 "github.com/streamplace/oatproxy/pkg/oatproxy" 17 "stream.place/streamplace/pkg/aqhttp" 18 "stream.place/streamplace/pkg/atproto" 19 "stream.place/streamplace/pkg/bus" 20 "stream.place/streamplace/pkg/config" 21 "stream.place/streamplace/pkg/log" 22 "stream.place/streamplace/pkg/model" 23 "stream.place/streamplace/pkg/statedb" 24) 25 26type Server struct { 27 e *echo.Echo 28 cli *config.CLI 29 model model.Model 30 OGImageCache *cache.Cache 31 ATSync *atproto.ATProtoSynchronizer 32 statefulDB *statedb.StatefulDB 33 bus *bus.Bus 34} 35 36func NewServer(ctx context.Context, cli *config.CLI, model model.Model, statefulDB *statedb.StatefulDB, op *oatproxy.OATProxy, mdlw middleware.Middleware, atsync *atproto.ATProtoSynchronizer, bus *bus.Bus) (*Server, error) { 37 e := echo.New() 38 s := &Server{ 39 e: e, 40 cli: cli, 41 model: model, 42 OGImageCache: cache.New(5*time.Minute, 10*time.Minute), // 5min TTL, 10min cleanup 43 ATSync: atsync, 44 statefulDB: statefulDB, 45 bus: bus, 46 } 47 e.Use(s.ErrorHandlingMiddleware()) 48 e.Use(s.ContextPreservingMiddleware()) 49 e.Use(echomiddleware.Handler("", mdlw)) 50 e.Use(op.OAuthMiddleware) 51 err := s.RegisterHandlersPlaceStream(e) 52 if err != nil { 53 return nil, err 54 } 55 err = s.RegisterHandlersAppBsky(e) 56 if err != nil { 57 return nil, err 58 } 59 err = s.RegisterHandlersComAtproto(e) 60 if err != nil { 61 return nil, err 62 } 63 e.GET("/xrpc/_health", func(c echo.Context) error { 64 return c.JSON(http.StatusOK, map[string]string{"version": cli.Build.Version}) 65 }) 66 e.GET("/xrpc/com.atproto.sync.subscribeRepos", s.handleComAtprotoSyncSubscribeRepos) 67 e.GET("/xrpc/place.stream.live.subscribeSegments", s.handlePlaceStreamLiveSubscribeSegments) 68 e.GET("/xrpc/*", s.HandleWildcard) 69 e.POST("/xrpc/*", s.HandleWildcard) 70 return s, nil 71} 72 73func (s *Server) isLocalPDS(ctx context.Context, repo string) (bool, string, error) { 74 did, svc, _, err := resolveRepoService(ctx, repo) 75 if err != nil { 76 return false, "", fmt.Errorf("resolveRepoService: %w", err) 77 } 78 if did == s.cli.MyDID() { 79 return true, svc, nil 80 } 81 return false, svc, nil 82} 83 84func makeUnauthenticatedRequest(ctx context.Context, service, method string, params map[string]interface{}, out interface{}) error { 85 ctx, cancel := context.WithTimeout(ctx, 10*time.Second) 86 defer cancel() 87 u, err := url.Parse(fmt.Sprintf("%s/xrpc/%s", service, method)) 88 if err != nil { 89 return fmt.Errorf("failed to parse URL: %w", err) 90 } 91 92 // add query parameters 93 query := u.Query() 94 for k, v := range params { 95 query.Set(k, fmt.Sprintf("%v", v)) 96 } 97 u.RawQuery = query.Encode() 98 99 log.Error(ctx, "making unauthenticated request", "url", u.String()) 100 101 req, err := http.NewRequestWithContext(ctx, "GET", u.String(), nil) 102 if err != nil { 103 return fmt.Errorf("failed to create request: %w", err) 104 } 105 106 resp, err := aqhttp.Client.Do(req) 107 if err != nil { 108 return fmt.Errorf("failed to make request: %w", err) 109 } 110 defer resp.Body.Close() 111 112 if resp.StatusCode != http.StatusOK { 113 return fmt.Errorf("upstream request failed with status %d", resp.StatusCode) 114 } 115 116 body, err := io.ReadAll(resp.Body) 117 if err != nil { 118 return fmt.Errorf("failed to read response body: %w", err) 119 } 120 121 if err := json.Unmarshal(body, out); err != nil { 122 return fmt.Errorf("failed to unmarshal response: %w", err) 123 } 124 125 return nil 126} 127 128func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { 129 s.e.ServeHTTP(w, r) 130} 131 132func (s *Server) ErrorHandlingMiddleware() echo.MiddlewareFunc { 133 return func(next echo.HandlerFunc) echo.HandlerFunc { 134 return func(c echo.Context) error { 135 err := next(c) 136 if err == nil { 137 return nil 138 } 139 httpError, ok := err.(*echo.HTTPError) 140 if ok { 141 log.Error(c.Request().Context(), "http error", "code", httpError.Code, "message", httpError.Message, "internal", httpError.Internal) 142 return err 143 } 144 log.Error(c.Request().Context(), "unhandled error", "error", err) 145 return echo.NewHTTPError(http.StatusInternalServerError, err.Error()) 146 } 147 } 148} 149 150// unique type to prevent assignment. 151type echoContextKeyType struct{} 152 153// singleton value to identify our logging metadata in context 154var echoContextKey = echoContextKeyType{} 155 156func (s *Server) ContextPreservingMiddleware() echo.MiddlewareFunc { 157 return func(next echo.HandlerFunc) echo.HandlerFunc { 158 return func(c echo.Context) error { 159 ctx := c.Request().Context() 160 if ctx == nil { 161 ctx = context.Background() 162 } 163 ctx = context.WithValue(ctx, echoContextKey, c) 164 c.SetRequest(c.Request().WithContext(ctx)) 165 return next(c) 166 } 167 } 168}