Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

at eli/concat-script 164 lines 4.6 kB view raw
1package spxrpc 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "io" 8 "net/http" 9 "net/url" 10 "time" 11 12 "github.com/labstack/echo/v4" 13 "github.com/patrickmn/go-cache" 14 "github.com/slok/go-http-metrics/middleware" 15 echomiddleware "github.com/slok/go-http-metrics/middleware/echo" 16 "github.com/streamplace/oatproxy/pkg/oatproxy" 17 "stream.place/streamplace/pkg/aqhttp" 18 "stream.place/streamplace/pkg/atproto" 19 "stream.place/streamplace/pkg/config" 20 "stream.place/streamplace/pkg/log" 21 "stream.place/streamplace/pkg/model" 22 "stream.place/streamplace/pkg/statedb" 23) 24 25type Server struct { 26 e *echo.Echo 27 cli *config.CLI 28 model model.Model 29 OGImageCache *cache.Cache 30 ATSync *atproto.ATProtoSynchronizer 31 statefulDB *statedb.StatefulDB 32} 33 34func NewServer(ctx context.Context, cli *config.CLI, model model.Model, statefulDB *statedb.StatefulDB, op *oatproxy.OATProxy, mdlw middleware.Middleware, atsync *atproto.ATProtoSynchronizer) (*Server, error) { 35 e := echo.New() 36 s := &Server{ 37 e: e, 38 cli: cli, 39 model: model, 40 OGImageCache: cache.New(5*time.Minute, 10*time.Minute), // 5min TTL, 10min cleanup 41 ATSync: atsync, 42 statefulDB: statefulDB, 43 } 44 e.Use(s.ErrorHandlingMiddleware()) 45 e.Use(s.ContextPreservingMiddleware()) 46 e.Use(echomiddleware.Handler("", mdlw)) 47 e.Use(op.OAuthMiddleware) 48 err := s.RegisterHandlersPlaceStream(e) 49 if err != nil { 50 return nil, err 51 } 52 err = s.RegisterHandlersAppBsky(e) 53 if err != nil { 54 return nil, err 55 } 56 err = s.RegisterHandlersComAtproto(e) 57 if err != nil { 58 return nil, err 59 } 60 e.GET("/xrpc/_health", func(c echo.Context) error { 61 return c.JSON(http.StatusOK, map[string]string{"version": cli.Build.Version}) 62 }) 63 e.GET("/xrpc/com.atproto.sync.subscribeRepos", s.handleComAtprotoSyncSubscribeRepos) 64 e.GET("/xrpc/*", s.HandleWildcard) 65 e.POST("/xrpc/*", s.HandleWildcard) 66 return s, nil 67} 68 69func (s *Server) isLocalPDS(ctx context.Context, repo string) (bool, string, error) { 70 did, svc, _, err := resolveRepoService(ctx, repo) 71 if err != nil { 72 return false, "", fmt.Errorf("resolveRepoService: %w", err) 73 } 74 if did == s.cli.MyDID() { 75 return true, svc, nil 76 } 77 return false, svc, nil 78} 79 80func makeUnauthenticatedRequest(ctx context.Context, service, method string, params map[string]interface{}, out interface{}) error { 81 ctx, cancel := context.WithTimeout(ctx, 10*time.Second) 82 defer cancel() 83 u, err := url.Parse(fmt.Sprintf("%s/xrpc/%s", service, method)) 84 if err != nil { 85 return fmt.Errorf("failed to parse URL: %w", err) 86 } 87 88 // add query parameters 89 query := u.Query() 90 for k, v := range params { 91 query.Set(k, fmt.Sprintf("%v", v)) 92 } 93 u.RawQuery = query.Encode() 94 95 log.Error(ctx, "making unauthenticated request", "url", u.String()) 96 97 req, err := http.NewRequestWithContext(ctx, "GET", u.String(), nil) 98 if err != nil { 99 return fmt.Errorf("failed to create request: %w", err) 100 } 101 102 resp, err := aqhttp.Client.Do(req) 103 if err != nil { 104 return fmt.Errorf("failed to make request: %w", err) 105 } 106 defer resp.Body.Close() 107 108 if resp.StatusCode != http.StatusOK { 109 return fmt.Errorf("upstream request failed with status %d", resp.StatusCode) 110 } 111 112 body, err := io.ReadAll(resp.Body) 113 if err != nil { 114 return fmt.Errorf("failed to read response body: %w", err) 115 } 116 117 if err := json.Unmarshal(body, out); err != nil { 118 return fmt.Errorf("failed to unmarshal response: %w", err) 119 } 120 121 return nil 122} 123 124func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { 125 s.e.ServeHTTP(w, r) 126} 127 128func (s *Server) ErrorHandlingMiddleware() echo.MiddlewareFunc { 129 return func(next echo.HandlerFunc) echo.HandlerFunc { 130 return func(c echo.Context) error { 131 err := next(c) 132 if err == nil { 133 return nil 134 } 135 httpError, ok := err.(*echo.HTTPError) 136 if ok { 137 log.Error(c.Request().Context(), "http error", "code", httpError.Code, "message", httpError.Message, "internal", httpError.Internal) 138 return err 139 } 140 log.Error(c.Request().Context(), "unhandled error", "error", err) 141 return echo.NewHTTPError(http.StatusInternalServerError, err.Error()) 142 } 143 } 144} 145 146// unique type to prevent assignment. 147type echoContextKeyType struct{} 148 149// singleton value to identify our logging metadata in context 150var echoContextKey = echoContextKeyType{} 151 152func (s *Server) ContextPreservingMiddleware() echo.MiddlewareFunc { 153 return func(next echo.HandlerFunc) echo.HandlerFunc { 154 return func(c echo.Context) error { 155 ctx := c.Request().Context() 156 if ctx == nil { 157 ctx = context.Background() 158 } 159 ctx = context.WithValue(ctx, echoContextKey, c) 160 c.SetRequest(c.Request().WithContext(ctx)) 161 return next(c) 162 } 163 } 164}