Live video on the AT Protocol
at eli/lexicon-dev 952 lines 30 kB view raw
1package api 2 3import ( 4 "context" 5 "encoding/json" 6 "errors" 7 "fmt" 8 "io" 9 "log/slog" 10 "net" 11 "net/http" 12 "net/http/httputil" 13 "net/url" 14 "os" 15 "regexp" 16 "strconv" 17 "strings" 18 "sync" 19 "time" 20 21 "github.com/NYTimes/gziphandler" 22 "github.com/bluesky-social/indigo/api/bsky" 23 "github.com/google/uuid" 24 "github.com/julienschmidt/httprouter" 25 "github.com/labstack/echo/v4" 26 "github.com/rs/cors" 27 sloghttp "github.com/samber/slog-http" 28 "golang.org/x/time/rate" 29 30 "github.com/streamplace/oatproxy/pkg/oatproxy" 31 "stream.place/streamplace/js/app" 32 "stream.place/streamplace/pkg/atproto" 33 "stream.place/streamplace/pkg/bus" 34 "stream.place/streamplace/pkg/config" 35 "stream.place/streamplace/pkg/crypto/signers/eip712" 36 "stream.place/streamplace/pkg/director" 37 apierrors "stream.place/streamplace/pkg/errors" 38 "stream.place/streamplace/pkg/linking" 39 "stream.place/streamplace/pkg/localdb" 40 "stream.place/streamplace/pkg/log" 41 "stream.place/streamplace/pkg/media" 42 "stream.place/streamplace/pkg/mist/mistconfig" 43 "stream.place/streamplace/pkg/model" 44 "stream.place/streamplace/pkg/notifications" 45 "stream.place/streamplace/pkg/spxrpc" 46 "stream.place/streamplace/pkg/statedb" 47 "stream.place/streamplace/pkg/streamplace" 48 49 metrics "github.com/slok/go-http-metrics/metrics/prometheus" 50 "github.com/slok/go-http-metrics/middleware" 51 echomiddleware "github.com/slok/go-http-metrics/middleware/echo" 52 httproutermiddleware "github.com/slok/go-http-metrics/middleware/httprouter" 53 middlewarestd "github.com/slok/go-http-metrics/middleware/std" 54) 55 56type StreamplaceAPI struct { 57 CLI *config.CLI 58 Model model.Model 59 StatefulDB *statedb.StatefulDB 60 LocalDB localdb.LocalDB 61 Updater *Updater 62 Signer *eip712.EIP712Signer 63 Mimes map[string]string 64 FirebaseNotifier notifications.FirebaseNotifier 65 MediaManager *media.MediaManager 66 MediaSigner media.MediaSigner 67 XRPCServer *spxrpc.Server 68 // not thread-safe yet 69 Aliases map[string]string 70 Bus *bus.Bus 71 ATSync *atproto.ATProtoSynchronizer 72 Director *director.Director 73 74 connTracker *WebsocketTracker 75 76 limiters map[string]*rate.Limiter 77 limitersMu sync.Mutex 78 SignerCache map[string]media.MediaSigner 79 SignerCacheMu sync.Mutex 80 op *oatproxy.OATProxy 81 82 // override tls port for http redirect server if we're using systemd file descriptors 83 HTTPRedirectTLSPort *int 84 sessions map[string]map[string]time.Time 85 sessionsLock sync.RWMutex 86 87 rtmpSessions map[string]*media.RTMPSession 88 rtmpSessionsLock sync.Mutex 89 rtmpInternalPlaybackAddr string 90} 91 92type WebsocketTracker struct { 93 connections map[string]int 94 maxConnsPerIP int 95 mu sync.RWMutex 96} 97 98func MakeStreamplaceAPI(cli *config.CLI, mod model.Model, statefulDB *statedb.StatefulDB, noter notifications.FirebaseNotifier, mm *media.MediaManager, ms media.MediaSigner, bus *bus.Bus, atsync *atproto.ATProtoSynchronizer, d *director.Director, op *oatproxy.OATProxy, ldb localdb.LocalDB) (*StreamplaceAPI, error) { 99 updater, err := PrepareUpdater(cli) 100 if err != nil { 101 return nil, err 102 } 103 a := &StreamplaceAPI{CLI: cli, 104 Model: mod, 105 StatefulDB: statefulDB, 106 Updater: updater, 107 FirebaseNotifier: noter, 108 MediaManager: mm, 109 MediaSigner: ms, 110 Aliases: map[string]string{}, 111 Bus: bus, 112 ATSync: atsync, 113 Director: d, 114 connTracker: NewWebsocketTracker(cli.RateLimitWebsocket), 115 limiters: make(map[string]*rate.Limiter), 116 SignerCache: make(map[string]media.MediaSigner), 117 op: op, 118 sessions: make(map[string]map[string]time.Time), 119 sessionsLock: sync.RWMutex{}, 120 rtmpSessions: make(map[string]*media.RTMPSession), 121 rtmpSessionsLock: sync.Mutex{}, 122 LocalDB: ldb, 123 } 124 a.Mimes, err = updater.GetMimes() 125 if err != nil { 126 return nil, err 127 } 128 return a, nil 129} 130 131type AppHostingFS struct { 132 http.FileSystem 133} 134 135var ErrorIndex = errors.New("not found, use index.html") 136 137func (fs AppHostingFS) Open(name string) (http.File, error) { 138 file, err1 := fs.FileSystem.Open(name) 139 if err1 == nil { 140 return file, nil 141 } 142 return nil, ErrorIndex 143} 144 145// api/playback/iame.li/webrtc?rendition=source 146// api/playback/iame.li/stream.mp4?rendition=source 147// api/playback/iame.li/stream.webm?rendition=source 148// api/playback/iame.li/hls/index.m3u8 149// api/playback/iame.li/hls/source/stream.m3u8 150// api/playback/iame.li/hls/source/000000000000.ts 151 152func (a *StreamplaceAPI) Handler(ctx context.Context) (http.Handler, error) { 153 154 mdlw := middleware.New(middleware.Config{ 155 Recorder: metrics.NewRecorder(metrics.Config{}), 156 }) 157 var xrpc http.Handler 158 xrpc, err := spxrpc.NewServer(ctx, a.CLI, a.Model, a.StatefulDB, a.op, mdlw, a.ATSync, a.Bus, a.LocalDB, a.MediaManager, a.Aliases) 159 if err != nil { 160 return nil, err 161 } 162 a.XRPCServer = xrpc.(*spxrpc.Server) 163 router := httprouter.New() 164 165 // Create our middleware factory with the default settings. 166 167 a.op.Echo.Use(echomiddleware.Handler("", mdlw)) 168 169 // r.GET("/test/:id", httproutermiddleware.Handler("/test/:id", h1, mdlw)) 170 171 addHandle := func(router *httprouter.Router, method, path string, handler httprouter.Handle) { 172 router.Handle(method, path, httproutermiddleware.Handler(path, handler, mdlw)) 173 } 174 addFunc := func(router *httprouter.Router, method, path string, handler http.HandlerFunc) { 175 router.Handler(method, path, middlewarestd.Handler(path, mdlw, handler)) 176 } 177 178 router.Handler("GET", "/oauth/*anything", a.op.Handler()) 179 router.Handler("POST", "/oauth/*anything", a.op.Handler()) 180 router.Handler("GET", "/.well-known/oauth-authorization-server", a.op.Handler()) 181 router.Handler("GET", "/.well-known/oauth-protected-resource", a.op.Handler()) 182 router.Handler("GET", "/.well-known/apple-app-site-association", a.HandleAppleAppSiteAssociation(ctx)) 183 router.Handler("GET", "/.well-known/assetlinks.json", a.HandleAndroidAssetLinks(ctx)) 184 apiRouter := httprouter.New() 185 addFunc(apiRouter, "POST", "/api/notification", a.HandleNotification(ctx)) 186 // old clients 187 addFunc(router, "GET", "/app-updates", a.HandleAppUpdates(ctx)) 188 // new ones 189 addFunc(apiRouter, "GET", "/api/manifest", a.HandleAppUpdates(ctx)) 190 addHandle(apiRouter, "GET", "/api/desktop-updates/:platform/:architecture/:version/:buildTime/:file", a.HandleDesktopUpdates(ctx)) 191 addHandle(apiRouter, "POST", "/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s")) 192 addHandle(apiRouter, "OPTIONS", "/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s")) 193 addHandle(apiRouter, "DELETE", "/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s")) 194 addFunc(apiRouter, "POST", "/api/segment", a.HandleSegment(ctx)) 195 addFunc(apiRouter, "GET", "/api/healthz", a.HandleHealthz(ctx)) 196 addHandle(apiRouter, "GET", "/api/playback/:user/hls/*file", a.HandleHLSPlayback(ctx)) 197 // they're jpegs now 198 addHandle(apiRouter, "GET", "/api/playback/:user/stream.jpg", a.HandleThumbnailPlayback(ctx)) 199 // this one is actually a jpeg (used previously and shouldn't remove for historical reasons) 200 addHandle(apiRouter, "GET", "/api/playback/:user/stream.png", a.HandleThumbnailPlayback(ctx)) 201 addHandle(apiRouter, "GET", "/api/app-return/*anything", a.HandleAppReturn(ctx)) 202 addHandle(apiRouter, "POST", "/api/playback/:user/webrtc", a.HandleWebRTCPlayback(ctx)) 203 addHandle(apiRouter, "POST", "/api/ingest/webrtc", a.HandleWebRTCIngest(ctx)) 204 addHandle(apiRouter, "POST", "/api/ingest/webrtc/:key", a.HandleWebRTCIngest(ctx)) 205 addHandle(apiRouter, "POST", "/api/player-event", a.HandlePlayerEvent(ctx)) 206 addHandle(apiRouter, "GET", "/api/chat/:repoDID", a.HandleChat(ctx)) 207 addHandle(apiRouter, "GET", "/api/websocket/:repoDID", a.HandleWebsocket(ctx)) 208 addHandle(apiRouter, "GET", "/api/livestream/:repoDID", a.HandleLivestream(ctx)) 209 addHandle(apiRouter, "GET", "/api/bluesky/resolve/:handle", a.HandleBlueskyResolve(ctx)) 210 addHandle(apiRouter, "GET", "/api/view-count/:user", a.HandleViewCount(ctx)) 211 addHandle(apiRouter, "GET", "/api/clip/:user/:file", a.HandleClip(ctx)) 212 apiRouter.NotFound = a.HandleAPI404(ctx) 213 apiRouterHandler := a.RateLimitMiddleware(ctx)(apiRouter) 214 xrpcHandler := a.RateLimitMiddleware(ctx)(xrpc) 215 router.Handler("GET", "/api/*resource", apiRouterHandler) 216 router.Handler("POST", "/api/*resource", apiRouterHandler) 217 router.Handler("PUT", "/api/*resource", apiRouterHandler) 218 router.Handler("PATCH", "/api/*resource", apiRouterHandler) 219 router.Handler("DELETE", "/api/*resource", apiRouterHandler) 220 router.Handler("GET", "/xrpc/*resource", xrpcHandler) 221 router.Handler("POST", "/xrpc/*resource", xrpcHandler) 222 router.Handler("PUT", "/xrpc/*resource", xrpcHandler) 223 router.Handler("PATCH", "/xrpc/*resource", xrpcHandler) 224 router.Handler("DELETE", "/xrpc/*resource", xrpcHandler) 225 // i wonder if there's a better way to do this? 226 router.GET("/favicon.ico", func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) { 227 err := a.XRPCServer.HandleFaviconICO(echo.New().NewContext(r, w)) 228 if err != nil { 229 log.Error(ctx, "error handling favicon.ico", "error", err) 230 w.WriteHeader(500) 231 return 232 } 233 }) 234 router.GET("/.well-known/did.json", a.HandleDidJSON(ctx)) 235 router.GET("/.well-known/atproto-did", a.HandleAtprotoDID(ctx)) 236 router.GET("/dl/*params", a.HandleAppDownload(ctx)) 237 router.POST("/", a.HandleWebRTCIngest(ctx)) 238 if a.CLI.FrontendProxy != "" { 239 u, err := url.Parse(a.CLI.FrontendProxy) 240 if err != nil { 241 return nil, err 242 } 243 log.Warn(ctx, "using frontend proxy instead of bundled frontend", "destination", a.CLI.FrontendProxy) 244 router.NotFound = &httputil.ReverseProxy{ 245 Rewrite: func(r *httputil.ProxyRequest) { 246 // workaround for Expo disliking serving requests from 127.0.0.1 instead of localhost 247 // we need to use 127.0.0.1 because the atproto oauth client requires it 248 r.Out.Header.Set("Origin", u.String()) 249 r.SetURL(u) 250 }, 251 } 252 } else { 253 files, err := app.Files() 254 if err != nil { 255 return nil, err 256 } 257 index, err := files.Open("index.html") 258 if err != nil { 259 return nil, err 260 } 261 bs, err := io.ReadAll(index) 262 if err != nil { 263 return nil, err 264 } 265 linker, err := linking.NewLinker(ctx, bs, a.StatefulDB, a.CLI) 266 if err != nil { 267 return nil, err 268 } 269 linkingHandler, err := a.NotFoundLinkingHandler(ctx, linker) 270 if err != nil { 271 return nil, err 272 } 273 router.NotFound = middlewarestd.Handler("/*static", mdlw, linkingHandler) 274 } 275 // needed because the WebRTC handler issues 405s from / otherwise 276 router.GET("/", func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) { 277 router.NotFound.ServeHTTP(w, r) 278 }) 279 handler := sloghttp.Recovery(router) 280 handler = cors.AllowAll().Handler(handler) 281 handler = sloghttp.New(slog.Default())(handler) 282 handler = a.RateLimitMiddleware(ctx)(handler) 283 redirectMiddleware, err := a.RedirectMiddleware() 284 if err != nil { 285 return nil, err 286 } 287 handler = redirectMiddleware(handler) 288 289 // this needs to be LAST so nothing else clobbers the context 290 handler = a.ContextMiddleware(ctx)(handler) 291 292 return handler, nil 293} 294func (a *StreamplaceAPI) ContextMiddleware(parentContext context.Context) func(next http.Handler) http.Handler { 295 return func(next http.Handler) http.Handler { 296 return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 297 uuid := uuid.New().String() 298 ctx := log.WithLogValues(parentContext, "requestID", uuid, "method", r.Method, "path", r.URL.Path) 299 r = r.WithContext(ctx) 300 next.ServeHTTP(w, r) 301 }) 302 } 303} 304func copyHeader(dst, src http.Header) { 305 for k, vv := range src { 306 // we'll handle CORS ourselves, thanks 307 if strings.HasPrefix(k, "Access-Control") { 308 continue 309 } 310 for _, v := range vv { 311 dst.Add(k, v) 312 } 313 } 314} 315 316// handler that takes care of static files and otherwise returns the index.html with the correct link card data 317func (a *StreamplaceAPI) NotFoundLinkingHandler(ctx context.Context, linker *linking.Linker) (http.HandlerFunc, error) { 318 files, err := app.Files() 319 if err != nil { 320 return nil, err 321 } 322 fs := AppHostingFS{http.FS(files)} 323 324 fileHandler := a.FileHandler(ctx, http.FileServer(fs)) 325 defaultHandler := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { 326 f := strings.TrimPrefix(req.URL.Path, "/") 327 // under docs we need the index.html suffix due to astro rendering 328 if strings.HasPrefix(req.URL.Path, "/docs") && strings.HasSuffix(req.URL.Path, "/") { 329 f += "index.html" 330 } 331 _, err := fs.Open(f) 332 if err == nil { 333 fileHandler.ServeHTTP(w, req) 334 return 335 } 336 if errors.Is(err, ErrorIndex) || f == "" { 337 bs, err := linker.GenerateDefaultCard(ctx, req.URL, a.CLI.SentryDSN) 338 if err != nil { 339 log.Error(ctx, "error generating default card", "error", err) 340 } 341 w.Header().Set("Content-Type", "text/html") 342 if _, err := w.Write(bs); err != nil { 343 log.Error(ctx, "error writing response", "error", err) 344 } 345 } else { 346 log.Warn(ctx, "error opening file", "error", err) 347 apierrors.WriteHTTPInternalServerError(w, "file not found", err) 348 } 349 }) 350 return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { 351 proto := "http" 352 if req.TLS != nil { 353 proto = "https" 354 } 355 fwProto := req.Header.Get("x-forwarded-proto") 356 if fwProto != "" { 357 proto = fwProto 358 } 359 req.URL.Host = req.Host 360 req.URL.Scheme = proto 361 maybeHandle := strings.TrimPrefix(req.URL.Path, "/") 362 // quick check for things that aren't valid handles/dids 363 if strings.ContainsAny(maybeHandle, "/_") { 364 defaultHandler.ServeHTTP(w, req) 365 return 366 } 367 repo, err := a.Model.GetRepoByHandleOrDID(maybeHandle) 368 if err != nil || repo == nil { 369 log.Error(ctx, "no repo found", "maybeHandle", maybeHandle) 370 defaultHandler.ServeHTTP(w, req) 371 return 372 } 373 ls, err := a.Model.GetLatestLivestreamForRepo(repo.DID) 374 if err != nil || ls == nil { 375 log.Error(ctx, "no livestream found", "repoDID", repo.DID) 376 defaultHandler.ServeHTTP(w, req) 377 return 378 } 379 lsv, err := ls.ToLivestreamView() 380 if err != nil || lsv == nil { 381 log.Error(ctx, "no livestream view found", "repoDID", repo.DID) 382 defaultHandler.ServeHTTP(w, req) 383 return 384 } 385 bs, err := linker.GenerateStreamerCard(ctx, req.URL, lsv, a.CLI.SentryDSN) 386 if err != nil { 387 log.Error(ctx, "error generating html", "error", err) 388 defaultHandler.ServeHTTP(w, req) 389 return 390 } 391 w.Header().Set("Content-Type", "text/html") 392 if _, err := w.Write(bs); err != nil { 393 log.Error(ctx, "error writing response", "error", err) 394 } 395 }), nil 396} 397 398func (a *StreamplaceAPI) MistProxyHandler(ctx context.Context, tmpl string) httprouter.Handle { 399 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 400 if !a.CLI.HasMist() { 401 apierrors.WriteHTTPNotImplemented(w, "Playback only on the Linux version for now", nil) 402 return 403 } 404 stream := params.ByName("stream") 405 if stream == "" { 406 apierrors.WriteHTTPBadRequest(w, "missing stream in request", nil) 407 return 408 } 409 410 fullstream := fmt.Sprintf("%s+%s", mistconfig.StreamName, stream) 411 prefix := fmt.Sprintf(tmpl, fullstream) 412 resource := params.ByName("resource") 413 414 // path := strings.TrimPrefix(req.URL.EscapedPath(), "/api") 415 416 client := &http.Client{} 417 req.URL = &url.URL{ 418 Scheme: "http", 419 Host: fmt.Sprintf("127.0.0.1:%d", a.CLI.MistHTTPPort), 420 Path: fmt.Sprintf("%s%s", prefix, resource), 421 RawQuery: req.URL.RawQuery, 422 } 423 424 //http: Request.RequestURI can't be set in client requests. 425 //http://golang.org/src/pkg/net/http/client.go 426 req.RequestURI = "" 427 428 resp, err := client.Do(req) 429 if err != nil { 430 apierrors.WriteHTTPInternalServerError(w, "error connecting to mist", err) 431 return 432 } 433 defer resp.Body.Close() 434 435 copyHeader(w.Header(), resp.Header) 436 w.WriteHeader(resp.StatusCode) 437 if _, err := io.Copy(w, resp.Body); err != nil { 438 log.Error(ctx, "error writing response", "error", err) 439 } 440 } 441} 442 443func (a *StreamplaceAPI) FileHandler(ctx context.Context, fs http.Handler) http.HandlerFunc { 444 return func(w http.ResponseWriter, req *http.Request) { 445 noslash := req.URL.Path[1:] 446 ct, ok := a.Mimes[noslash] 447 if ok { 448 w.Header().Set("content-type", ct) 449 } 450 fs.ServeHTTP(w, req) 451 } 452} 453 454func (a *StreamplaceAPI) RedirectHandler(ctx context.Context) (http.Handler, error) { 455 var tlsPort string 456 var err error 457 if a.HTTPRedirectTLSPort != nil { 458 tlsPort = fmt.Sprintf("%d", *a.HTTPRedirectTLSPort) 459 } else { 460 _, tlsPort, err = net.SplitHostPort(a.CLI.HTTPSAddr) 461 if err != nil { 462 return nil, err 463 } 464 } 465 handleRedirect := func(w http.ResponseWriter, req *http.Request) { 466 host, _, err := net.SplitHostPort(req.Host) 467 if err != nil { 468 host = req.Host 469 } 470 u := req.URL 471 if tlsPort == "443" { 472 u.Host = host 473 } else { 474 u.Host = net.JoinHostPort(host, tlsPort) 475 } 476 u.Scheme = "https" 477 http.Redirect(w, req, u.String(), http.StatusTemporaryRedirect) 478 } 479 mux := http.NewServeMux() 480 mux.HandleFunc("/", handleRedirect) 481 return mux, nil 482} 483 484type NotificationPayload struct { 485 Token string `json:"token"` 486 RepoDID string `json:"repoDID"` 487} 488 489func (a *StreamplaceAPI) HandleAPI404(ctx context.Context) http.HandlerFunc { 490 return func(w http.ResponseWriter, req *http.Request) { 491 w.WriteHeader(404) 492 } 493} 494 495func (a *StreamplaceAPI) HandleNotification(ctx context.Context) http.HandlerFunc { 496 return func(w http.ResponseWriter, req *http.Request) { 497 payload, err := io.ReadAll(req.Body) 498 if err != nil { 499 log.Log(ctx, "error reading notification create", "error", err) 500 w.WriteHeader(400) 501 return 502 } 503 n := NotificationPayload{} 504 err = json.Unmarshal(payload, &n) 505 if err != nil { 506 log.Log(ctx, "error unmarshalling notification create", "error", err) 507 w.WriteHeader(400) 508 return 509 } 510 err = a.StatefulDB.CreateNotification(n.Token, n.RepoDID) 511 if err != nil { 512 log.Log(ctx, "error creating notification", "error", err) 513 w.WriteHeader(400) 514 return 515 } 516 log.Log(ctx, "successfully created notification", "token", n.Token) 517 w.WriteHeader(200) 518 if n.RepoDID != "" { 519 go func() { 520 _, err := a.ATSync.SyncBlueskyRepo(ctx, n.RepoDID, a.Model) 521 if err != nil { 522 log.Error(ctx, "error syncing bluesky repo after notification creation", "error", err) 523 } 524 }() 525 } 526 } 527} 528 529func (a *StreamplaceAPI) HandleSegment(ctx context.Context) http.HandlerFunc { 530 return func(w http.ResponseWriter, req *http.Request) { 531 err := a.MediaManager.ValidateMP4(ctx, req.Body, false) 532 if err != nil { 533 apierrors.WriteHTTPBadRequest(w, "could not ingest segment", err) 534 return 535 } 536 w.WriteHeader(200) 537 } 538} 539 540func (a *StreamplaceAPI) HandlePlayerEvent(ctx context.Context) httprouter.Handle { 541 return func(w http.ResponseWriter, req *http.Request, p httprouter.Params) { 542 if !a.CLI.PlayerTelemetry { 543 w.WriteHeader(200) 544 return 545 } 546 var event model.PlayerEventAPI 547 if err := json.NewDecoder(req.Body).Decode(&event); err != nil { 548 apierrors.WriteHTTPBadRequest(w, "could not decode JSON body", err) 549 return 550 } 551 err := a.Model.CreatePlayerEvent(event) 552 if err != nil { 553 apierrors.WriteHTTPBadRequest(w, "could not create player event", err) 554 return 555 } 556 w.WriteHeader(201) 557 } 558} 559 560func (a *StreamplaceAPI) HandleViewCount(ctx context.Context) httprouter.Handle { 561 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 562 user := params.ByName("user") 563 if user == "" { 564 apierrors.WriteHTTPBadRequest(w, "user required", nil) 565 return 566 } 567 user, err := a.NormalizeUser(ctx, user) 568 if err != nil { 569 apierrors.WriteHTTPNotFound(w, "user not found", err) 570 return 571 } 572 count := a.Bus.GetViewerCount(user) 573 bs, err := json.Marshal(streamplace.Livestream_ViewerCount{Count: int64(count), LexiconTypeID: "place.stream.livestream#viewerCount"}) 574 if err != nil { 575 apierrors.WriteHTTPInternalServerError(w, "could not marshal view count", err) 576 return 577 } 578 if _, err := w.Write(bs); err != nil { 579 log.Error(ctx, "error writing response", "error", err) 580 } 581 } 582} 583 584func (a *StreamplaceAPI) HandleBlueskyResolve(ctx context.Context) httprouter.Handle { 585 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 586 log.Log(ctx, "got bluesky notification", "params", params) 587 key, err := a.ATSync.SyncBlueskyRepo(ctx, params.ByName("handle"), a.Model) 588 if err != nil { 589 apierrors.WriteHTTPInternalServerError(w, "could not resolve streamplace key", err) 590 return 591 } 592 signingKeys, err := a.Model.GetSigningKeysForRepo(key.DID) 593 if err != nil { 594 apierrors.WriteHTTPInternalServerError(w, "could not get signing keys", err) 595 return 596 } 597 bs, err := json.Marshal(signingKeys) 598 if err != nil { 599 apierrors.WriteHTTPInternalServerError(w, "could not marshal signing keys", err) 600 return 601 } 602 if _, err := w.Write(bs); err != nil { 603 log.Error(ctx, "error writing response", "error", err) 604 } 605 } 606} 607 608type ChatResponse struct { 609 Post *bsky.FeedPost `json:"post"` 610 Repo *model.Repo `json:"repo"` 611 CID string `json:"cid"` 612} 613 614func (a *StreamplaceAPI) HandleChat(ctx context.Context) httprouter.Handle { 615 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 616 user := params.ByName("repoDID") 617 if user == "" { 618 apierrors.WriteHTTPBadRequest(w, "user required", nil) 619 return 620 } 621 repoDID, err := a.NormalizeUser(ctx, user) 622 if err != nil { 623 apierrors.WriteHTTPNotFound(w, "user not found", err) 624 return 625 } 626 replies, err := a.Model.GetReplies(repoDID) 627 if err != nil { 628 apierrors.WriteHTTPInternalServerError(w, "could not get replies", err) 629 return 630 } 631 bs, err := json.Marshal(replies) 632 if err != nil { 633 apierrors.WriteHTTPInternalServerError(w, "could not marshal replies", err) 634 return 635 } 636 w.Header().Set("Content-Type", "application/json") 637 if _, err := w.Write(bs); err != nil { 638 log.Error(ctx, "error writing response", "error", err) 639 } 640 } 641} 642 643func (a *StreamplaceAPI) HandleLivestream(ctx context.Context) httprouter.Handle { 644 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 645 user := params.ByName("repoDID") 646 if user == "" { 647 apierrors.WriteHTTPBadRequest(w, "user required", nil) 648 return 649 } 650 repoDID, err := a.NormalizeUser(ctx, user) 651 if err != nil { 652 apierrors.WriteHTTPNotFound(w, "user not found", err) 653 return 654 } 655 livestream, err := a.Model.GetLatestLivestreamForRepo(repoDID) 656 if err != nil { 657 apierrors.WriteHTTPInternalServerError(w, "could not get livestream", err) 658 return 659 } 660 if livestream == nil { 661 apierrors.WriteHTTPNotFound(w, "no livestream found", nil) 662 return 663 } 664 665 doc, err := livestream.ToLivestreamView() 666 if err != nil { 667 apierrors.WriteHTTPInternalServerError(w, "could not marshal livestream", err) 668 return 669 } 670 671 if livestream == nil { 672 apierrors.WriteHTTPNotFound(w, "no livestream found", nil) 673 return 674 } 675 676 bs, err := json.Marshal(doc) 677 if err != nil { 678 apierrors.WriteHTTPInternalServerError(w, "could not marshal livestream", err) 679 return 680 } 681 w.Header().Set("Content-Type", "application/json") 682 if _, err := w.Write(bs); err != nil { 683 log.Error(ctx, "error writing response", "error", err) 684 } 685 } 686} 687 688func (a *StreamplaceAPI) RateLimitMiddleware(ctx context.Context) func(http.Handler) http.Handler { 689 return func(next http.Handler) http.Handler { 690 return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { 691 ip, _, err := net.SplitHostPort(req.RemoteAddr) 692 if err != nil { 693 ip = req.RemoteAddr 694 } 695 696 if a.CLI.RateLimitPerSecond > 0 { 697 limiter := a.getLimiter(ip) 698 699 if !limiter.Allow() { 700 log.Warn(ctx, "rate limit exceeded", "ip", ip, "path", req.URL.Path) 701 apierrors.WriteHTTPTooManyRequests(w, "rate limit exceeded") 702 return 703 } 704 } 705 706 next.ServeHTTP(w, req) 707 }) 708 } 709} 710 711type redirectRule struct { 712 re *regexp.Regexp 713 toURL *url.URL 714 rawTo string 715} 716 717// RedirectMiddleware returns a middleware that handles path redirects according to CLI.Redirects 718func (a *StreamplaceAPI) RedirectMiddleware() (func(http.Handler) http.Handler, error) { 719 var redirectRules []redirectRule 720 for from, to := range a.CLI.Redirects { 721 re, err := regexp.Compile(from) 722 if err != nil { 723 return nil, fmt.Errorf("invalid redirect pattern: %s (regex error: %w)", from, err) 724 } 725 toBase, err := url.Parse(to) 726 if err != nil { 727 return nil, fmt.Errorf("invalid redirect destination: %s", to) 728 } 729 redirectRules = append(redirectRules, redirectRule{re: re, toURL: toBase, rawTo: to}) 730 } 731 732 return func(next http.Handler) http.Handler { 733 return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 734 // Only match redirections for GET requests 735 if r.Method == http.MethodGet { 736 for _, rule := range redirectRules { 737 if rule.re.MatchString(r.URL.Path) { 738 // Make new URL by copying base and setting url query param 739 redirectURL := *rule.toURL 740 q := redirectURL.Query() 741 q.Set("url", r.URL.String()) 742 redirectURL.RawQuery = q.Encode() 743 http.Redirect(w, r, redirectURL.String(), http.StatusTemporaryRedirect) 744 return 745 } 746 } 747 } 748 next.ServeHTTP(w, r) 749 }) 750 }, nil 751} 752 753// helper for getting a listener from a systemd file descriptor 754func getListenerFromFD(fdName string) (net.Listener, error) { 755 log.Debug(context.TODO(), "getting listener from fd", "fdName", fdName, "LISTEN_PID", os.Getenv("LISTEN_PID"), "LISTEN_FDNAMES", os.Getenv("LISTEN_FDNAMES")) 756 if os.Getenv("LISTEN_PID") == strconv.Itoa(os.Getpid()) { 757 names := strings.Split(os.Getenv("LISTEN_FDNAMES"), ":") 758 for i, name := range names { 759 if name == fdName { 760 log.Warn(context.TODO(), "using systemd file descriptor", "fdName", fdName, "fdIndex", i+3) 761 f1 := os.NewFile(uintptr(i+3), fdName) 762 return net.FileListener(f1) 763 } 764 } 765 } 766 return nil, nil 767} 768 769func (a *StreamplaceAPI) ServeHTTP(ctx context.Context) error { 770 handler, err := a.Handler(ctx) 771 if err != nil { 772 return err 773 } 774 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error { 775 ln, err := getListenerFromFD("http") 776 if err != nil { 777 return err 778 } 779 if ln == nil { 780 ln, err = net.Listen("tcp", a.CLI.HTTPAddr) 781 if err != nil { 782 return err 783 } 784 } else { 785 log.Warn(ctx, "api server listening for http over systemd socket", "addr", ln.Addr()) 786 } 787 log.Log(ctx, "http server starting", "addr", ln.Addr()) 788 return s.Serve(ln) 789 }) 790} 791 792func (a *StreamplaceAPI) ServeHTTPRedirect(ctx context.Context) error { 793 handler, err := a.RedirectHandler(ctx) 794 if err != nil { 795 return err 796 } 797 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error { 798 ln, err := getListenerFromFD("http") 799 if err != nil { 800 return err 801 } 802 if ln == nil { 803 ln, err = net.Listen("tcp", a.CLI.HTTPAddr) 804 if err != nil { 805 return err 806 } 807 } else { 808 log.Warn(ctx, "http tls redirect server listening for http over systemd socket", "addr", ln.Addr()) 809 } 810 log.Log(ctx, "http tls redirect server starting", "addr", ln.Addr()) 811 return s.Serve(ln) 812 }) 813} 814 815func (a *StreamplaceAPI) ServeHTTPS(ctx context.Context) error { 816 handler, err := a.Handler(ctx) 817 if err != nil { 818 return err 819 } 820 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error { 821 ln, err := getListenerFromFD("https") 822 if err != nil { 823 return err 824 } 825 if ln == nil { 826 ln, err = net.Listen("tcp", a.CLI.HTTPSAddr) 827 if err != nil { 828 return err 829 } 830 } else { 831 // tell the redirect handler we're using systemd and they should go to 443 832 port443 := 443 833 a.HTTPRedirectTLSPort = &port443 834 log.Warn(ctx, "https server listening for https over systemd socket", "addr", ln.Addr()) 835 } 836 log.Log(ctx, "https server starting", 837 "addr", ln.Addr(), 838 "certPath", a.CLI.TLSCertPath, 839 "keyPath", a.CLI.TLSKeyPath, 840 ) 841 return s.ServeTLS(ln, a.CLI.TLSCertPath, a.CLI.TLSKeyPath) 842 }) 843} 844 845func (a *StreamplaceAPI) ServerWithShutdown(ctx context.Context, handler http.Handler, serve func(*http.Server) error) error { 846 ctx, cancel := context.WithCancel(ctx) 847 handler = gziphandler.GzipHandler(handler) 848 server := http.Server{Handler: handler} 849 var serveErr error 850 go func() { 851 serveErr = serve(&server) 852 cancel() 853 }() 854 <-ctx.Done() 855 if serveErr != nil { 856 return fmt.Errorf("error in http server: %w", serveErr) 857 } 858 859 ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second) 860 defer cancel() 861 return server.Shutdown(ctx) 862} 863 864func (a *StreamplaceAPI) HandleHealthz(ctx context.Context) http.HandlerFunc { 865 return func(w http.ResponseWriter, req *http.Request) { 866 w.WriteHeader(200) 867 } 868} 869 870func (a *StreamplaceAPI) getLimiter(ip string) *rate.Limiter { 871 a.limitersMu.Lock() 872 defer a.limitersMu.Unlock() 873 874 limiter, exists := a.limiters[ip] 875 if !exists { 876 limiter = rate.NewLimiter(rate.Limit(a.CLI.RateLimitPerSecond), a.CLI.RateLimitBurst) 877 a.limiters[ip] = limiter 878 } 879 880 return limiter 881} 882 883func (a *StreamplaceAPI) HandleClip(ctx context.Context) httprouter.Handle { 884 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 885 user := params.ByName("user") 886 file := params.ByName("file") 887 if user == "" || file == "" { 888 apierrors.WriteHTTPBadRequest(w, "user and file required", nil) 889 return 890 } 891 user, err := a.NormalizeUser(ctx, user) 892 if err != nil { 893 apierrors.WriteHTTPNotFound(w, "user not found", err) 894 return 895 } 896 fPath := []string{user, "clips", file} 897 exists, err := a.CLI.DataFileExists(fPath) 898 if err != nil { 899 apierrors.WriteHTTPInternalServerError(w, "could not check if file exists", err) 900 return 901 } 902 if !exists { 903 apierrors.WriteHTTPNotFound(w, "file not found", nil) 904 return 905 } 906 fd, err := os.Open(a.CLI.DataFilePath(fPath)) 907 if err != nil { 908 apierrors.WriteHTTPInternalServerError(w, "could not open file", err) 909 return 910 } 911 defer fd.Close() 912 w.Header().Set("Content-Type", "video/mp4") 913 if _, err := io.Copy(w, fd); err != nil { 914 log.Error(ctx, "error writing response", "error", err) 915 } 916 } 917} 918 919func NewWebsocketTracker(maxConns int) *WebsocketTracker { 920 return &WebsocketTracker{ 921 connections: make(map[string]int), 922 maxConnsPerIP: maxConns, 923 } 924} 925 926func (t *WebsocketTracker) AddConnection(ip string) bool { 927 t.mu.Lock() 928 defer t.mu.Unlock() 929 930 count := t.connections[ip] 931 932 if count >= t.maxConnsPerIP { 933 return false 934 } 935 936 t.connections[ip] = count + 1 937 return true 938} 939 940func (t *WebsocketTracker) RemoveConnection(ip string) { 941 t.mu.Lock() 942 defer t.mu.Unlock() 943 944 count := t.connections[ip] 945 if count > 0 { 946 t.connections[ip] = count - 1 947 } 948 949 if t.connections[ip] == 0 { 950 delete(t.connections, ip) 951 } 952}