Live video on the AT Protocol
at v0.9.6 963 lines 31 kB view raw
1package api 2 3import ( 4 "context" 5 "encoding/json" 6 "errors" 7 "fmt" 8 "io" 9 "log/slog" 10 "net" 11 "net/http" 12 "net/http/httputil" 13 "net/url" 14 "os" 15 "strconv" 16 "strings" 17 "sync" 18 "time" 19 20 "github.com/NYTimes/gziphandler" 21 "github.com/bluesky-social/indigo/api/bsky" 22 "github.com/google/uuid" 23 "github.com/julienschmidt/httprouter" 24 "github.com/labstack/echo/v4" 25 "github.com/rs/cors" 26 sloghttp "github.com/samber/slog-http" 27 "golang.org/x/time/rate" 28 29 "github.com/streamplace/oatproxy/pkg/oatproxy" 30 "stream.place/streamplace/js/app" 31 "stream.place/streamplace/pkg/atproto" 32 "stream.place/streamplace/pkg/bus" 33 "stream.place/streamplace/pkg/config" 34 "stream.place/streamplace/pkg/crypto/signers/eip712" 35 "stream.place/streamplace/pkg/director" 36 apierrors "stream.place/streamplace/pkg/errors" 37 "stream.place/streamplace/pkg/linking" 38 "stream.place/streamplace/pkg/log" 39 "stream.place/streamplace/pkg/media" 40 "stream.place/streamplace/pkg/mist/mistconfig" 41 "stream.place/streamplace/pkg/model" 42 "stream.place/streamplace/pkg/notifications" 43 "stream.place/streamplace/pkg/spmetrics" 44 "stream.place/streamplace/pkg/spxrpc" 45 "stream.place/streamplace/pkg/statedb" 46 "stream.place/streamplace/pkg/streamplace" 47 48 metrics "github.com/slok/go-http-metrics/metrics/prometheus" 49 "github.com/slok/go-http-metrics/middleware" 50 echomiddleware "github.com/slok/go-http-metrics/middleware/echo" 51 httproutermiddleware "github.com/slok/go-http-metrics/middleware/httprouter" 52 middlewarestd "github.com/slok/go-http-metrics/middleware/std" 53) 54 55type StreamplaceAPI struct { 56 CLI *config.CLI 57 Model model.Model 58 StatefulDB *statedb.StatefulDB 59 Updater *Updater 60 Signer *eip712.EIP712Signer 61 Mimes map[string]string 62 FirebaseNotifier notifications.FirebaseNotifier 63 MediaManager *media.MediaManager 64 MediaSigner media.MediaSigner 65 XRPCServer *spxrpc.Server 66 // not thread-safe yet 67 Aliases map[string]string 68 Bus *bus.Bus 69 ATSync *atproto.ATProtoSynchronizer 70 Director *director.Director 71 72 connTracker *WebsocketTracker 73 74 limiters map[string]*rate.Limiter 75 limitersMu sync.Mutex 76 SignerCache map[string]media.MediaSigner 77 SignerCacheMu sync.Mutex 78 op *oatproxy.OATProxy 79 80 // override tls port for http redirect server if we're using systemd file descriptors 81 HTTPRedirectTLSPort *int 82 sessions map[string]map[string]time.Time 83 sessionsLock sync.RWMutex 84 85 rtmpSessions map[string]*media.RTMPSession 86 rtmpSessionsLock sync.Mutex 87 rtmpInternalPlaybackAddr string 88} 89 90type WebsocketTracker struct { 91 connections map[string]int 92 maxConnsPerIP int 93 mu sync.RWMutex 94} 95 96func MakeStreamplaceAPI(cli *config.CLI, mod model.Model, statefulDB *statedb.StatefulDB, noter notifications.FirebaseNotifier, mm *media.MediaManager, ms media.MediaSigner, bus *bus.Bus, atsync *atproto.ATProtoSynchronizer, d *director.Director, op *oatproxy.OATProxy) (*StreamplaceAPI, error) { 97 updater, err := PrepareUpdater(cli) 98 if err != nil { 99 return nil, err 100 } 101 a := &StreamplaceAPI{CLI: cli, 102 Model: mod, 103 StatefulDB: statefulDB, 104 Updater: updater, 105 FirebaseNotifier: noter, 106 MediaManager: mm, 107 MediaSigner: ms, 108 Aliases: map[string]string{}, 109 Bus: bus, 110 ATSync: atsync, 111 Director: d, 112 connTracker: NewWebsocketTracker(cli.RateLimitWebsocket), 113 limiters: make(map[string]*rate.Limiter), 114 SignerCache: make(map[string]media.MediaSigner), 115 op: op, 116 sessions: make(map[string]map[string]time.Time), 117 sessionsLock: sync.RWMutex{}, 118 rtmpSessions: make(map[string]*media.RTMPSession), 119 rtmpSessionsLock: sync.Mutex{}, 120 } 121 a.Mimes, err = updater.GetMimes() 122 if err != nil { 123 return nil, err 124 } 125 return a, nil 126} 127 128type AppHostingFS struct { 129 http.FileSystem 130} 131 132var ErrorIndex = errors.New("not found, use index.html") 133 134func (fs AppHostingFS) Open(name string) (http.File, error) { 135 file, err1 := fs.FileSystem.Open(name) 136 if err1 == nil { 137 return file, nil 138 } 139 return nil, ErrorIndex 140} 141 142// api/playback/iame.li/webrtc?rendition=source 143// api/playback/iame.li/stream.mp4?rendition=source 144// api/playback/iame.li/stream.webm?rendition=source 145// api/playback/iame.li/hls/index.m3u8 146// api/playback/iame.li/hls/source/stream.m3u8 147// api/playback/iame.li/hls/source/000000000000.ts 148 149func (a *StreamplaceAPI) Handler(ctx context.Context) (http.Handler, error) { 150 151 mdlw := middleware.New(middleware.Config{ 152 Recorder: metrics.NewRecorder(metrics.Config{}), 153 }) 154 var xrpc http.Handler 155 xrpc, err := spxrpc.NewServer(ctx, a.CLI, a.Model, a.StatefulDB, a.op, mdlw, a.ATSync, a.Bus) 156 if err != nil { 157 return nil, err 158 } 159 a.XRPCServer = xrpc.(*spxrpc.Server) 160 router := httprouter.New() 161 162 // Create our middleware factory with the default settings. 163 164 a.op.Echo.Use(echomiddleware.Handler("", mdlw)) 165 166 // r.GET("/test/:id", httproutermiddleware.Handler("/test/:id", h1, mdlw)) 167 168 addHandle := func(router *httprouter.Router, method, path string, handler httprouter.Handle) { 169 router.Handle(method, path, httproutermiddleware.Handler(path, handler, mdlw)) 170 } 171 addFunc := func(router *httprouter.Router, method, path string, handler http.HandlerFunc) { 172 router.Handler(method, path, middlewarestd.Handler(path, mdlw, handler)) 173 } 174 175 router.Handler("GET", "/oauth/*anything", a.op.Handler()) 176 router.Handler("POST", "/oauth/*anything", a.op.Handler()) 177 router.Handler("GET", "/.well-known/oauth-authorization-server", a.op.Handler()) 178 router.Handler("GET", "/.well-known/oauth-protected-resource", a.op.Handler()) 179 router.Handler("GET", "/.well-known/apple-app-site-association", a.HandleAppleAppSiteAssociation(ctx)) 180 router.Handler("GET", "/.well-known/assetlinks.json", a.HandleAndroidAssetLinks(ctx)) 181 apiRouter := httprouter.New() 182 addFunc(apiRouter, "POST", "/api/notification", a.HandleNotification(ctx)) 183 // old clients 184 addFunc(router, "GET", "/app-updates", a.HandleAppUpdates(ctx)) 185 // new ones 186 addFunc(apiRouter, "GET", "/api/manifest", a.HandleAppUpdates(ctx)) 187 addHandle(apiRouter, "GET", "/api/desktop-updates/:platform/:architecture/:version/:buildTime/:file", a.HandleDesktopUpdates(ctx)) 188 addHandle(apiRouter, "POST", "/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s")) 189 addHandle(apiRouter, "OPTIONS", "/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s")) 190 addHandle(apiRouter, "DELETE", "/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s")) 191 addFunc(apiRouter, "POST", "/api/segment", a.HandleSegment(ctx)) 192 addFunc(apiRouter, "GET", "/api/healthz", a.HandleHealthz(ctx)) 193 addHandle(apiRouter, "GET", "/api/playback/:user/hls/*file", a.HandleHLSPlayback(ctx)) 194 // they're jpegs now 195 addHandle(apiRouter, "GET", "/api/playback/:user/stream.jpg", a.HandleThumbnailPlayback(ctx)) 196 // this one is actually a jpeg (used previously and shouldn't remove for historical reasons) 197 addHandle(apiRouter, "GET", "/api/playback/:user/stream.png", a.HandleThumbnailPlayback(ctx)) 198 addHandle(apiRouter, "GET", "/api/app-return/*anything", a.HandleAppReturn(ctx)) 199 addHandle(apiRouter, "POST", "/api/playback/:user/webrtc", a.HandleWebRTCPlayback(ctx)) 200 addHandle(apiRouter, "POST", "/api/ingest/webrtc", a.HandleWebRTCIngest(ctx)) 201 addHandle(apiRouter, "POST", "/api/ingest/webrtc/:key", a.HandleWebRTCIngest(ctx)) 202 addHandle(apiRouter, "POST", "/api/player-event", a.HandlePlayerEvent(ctx)) 203 addHandle(apiRouter, "GET", "/api/chat/:repoDID", a.HandleChat(ctx)) 204 addHandle(apiRouter, "GET", "/api/websocket/:repoDID", a.HandleWebsocket(ctx)) 205 addHandle(apiRouter, "GET", "/api/livestream/:repoDID", a.HandleLivestream(ctx)) 206 addHandle(apiRouter, "GET", "/api/segment/recent", a.HandleRecentSegments(ctx)) 207 addHandle(apiRouter, "GET", "/api/segment/recent/:repoDID", a.HandleUserRecentSegments(ctx)) 208 addHandle(apiRouter, "GET", "/api/bluesky/resolve/:handle", a.HandleBlueskyResolve(ctx)) 209 addHandle(apiRouter, "GET", "/api/view-count/:user", a.HandleViewCount(ctx)) 210 addHandle(apiRouter, "GET", "/api/clip/:user/:file", a.HandleClip(ctx)) 211 apiRouter.NotFound = a.HandleAPI404(ctx) 212 apiRouterHandler := a.RateLimitMiddleware(ctx)(apiRouter) 213 xrpcHandler := a.RateLimitMiddleware(ctx)(xrpc) 214 router.Handler("GET", "/api/*resource", apiRouterHandler) 215 router.Handler("POST", "/api/*resource", apiRouterHandler) 216 router.Handler("PUT", "/api/*resource", apiRouterHandler) 217 router.Handler("PATCH", "/api/*resource", apiRouterHandler) 218 router.Handler("DELETE", "/api/*resource", apiRouterHandler) 219 router.Handler("GET", "/xrpc/*resource", xrpcHandler) 220 router.Handler("POST", "/xrpc/*resource", xrpcHandler) 221 router.Handler("PUT", "/xrpc/*resource", xrpcHandler) 222 router.Handler("PATCH", "/xrpc/*resource", xrpcHandler) 223 router.Handler("DELETE", "/xrpc/*resource", xrpcHandler) 224 // i wonder if there's a better way to do this? 225 router.GET("/favicon.ico", func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) { 226 err := a.XRPCServer.HandleFaviconICO(echo.New().NewContext(r, w)) 227 if err != nil { 228 log.Error(ctx, "error handling favicon.ico", "error", err) 229 w.WriteHeader(500) 230 return 231 } 232 }) 233 router.GET("/.well-known/did.json", a.HandleDidJSON(ctx)) 234 router.GET("/.well-known/atproto-did", a.HandleAtprotoDID(ctx)) 235 router.GET("/dl/*params", a.HandleAppDownload(ctx)) 236 router.POST("/", a.HandleWebRTCIngest(ctx)) 237 for _, redirect := range a.CLI.Redirects { 238 parts := strings.Split(redirect, ":") 239 if len(parts) != 2 { 240 log.Error(ctx, "invalid redirect", "redirect", redirect) 241 return nil, fmt.Errorf("invalid redirect: %s", redirect) 242 } 243 router.Handle("GET", parts[0], func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) { 244 http.Redirect(w, r, parts[1], http.StatusTemporaryRedirect) 245 }) 246 } 247 if a.CLI.FrontendProxy != "" { 248 u, err := url.Parse(a.CLI.FrontendProxy) 249 if err != nil { 250 return nil, err 251 } 252 log.Warn(ctx, "using frontend proxy instead of bundled frontend", "destination", a.CLI.FrontendProxy) 253 router.NotFound = &httputil.ReverseProxy{ 254 Rewrite: func(r *httputil.ProxyRequest) { 255 // workaround for Expo disliking serving requests from 127.0.0.1 instead of localhost 256 // we need to use 127.0.0.1 because the atproto oauth client requires it 257 r.Out.Header.Set("Origin", u.String()) 258 r.SetURL(u) 259 }, 260 } 261 } else { 262 files, err := app.Files() 263 if err != nil { 264 return nil, err 265 } 266 index, err := files.Open("index.html") 267 if err != nil { 268 return nil, err 269 } 270 bs, err := io.ReadAll(index) 271 if err != nil { 272 return nil, err 273 } 274 linker, err := linking.NewLinker(ctx, bs) 275 if err != nil { 276 return nil, err 277 } 278 linkingHandler, err := a.NotFoundLinkingHandler(ctx, linker) 279 if err != nil { 280 return nil, err 281 } 282 router.NotFound = middlewarestd.Handler("/*static", mdlw, linkingHandler) 283 } 284 // needed because the WebRTC handler issues 405s from / otherwise 285 router.GET("/", func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) { 286 router.NotFound.ServeHTTP(w, r) 287 }) 288 handler := sloghttp.Recovery(router) 289 handler = cors.AllowAll().Handler(handler) 290 handler = sloghttp.New(slog.Default())(handler) 291 handler = a.RateLimitMiddleware(ctx)(handler) 292 293 // this needs to be LAST so nothing else clobbers the context 294 handler = a.ContextMiddleware(ctx)(handler) 295 296 return handler, nil 297} 298func (a *StreamplaceAPI) ContextMiddleware(parentContext context.Context) func(next http.Handler) http.Handler { 299 return func(next http.Handler) http.Handler { 300 return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 301 uuid := uuid.New().String() 302 ctx := log.WithLogValues(parentContext, "requestID", uuid, "method", r.Method, "path", r.URL.Path) 303 r = r.WithContext(ctx) 304 next.ServeHTTP(w, r) 305 }) 306 } 307} 308func copyHeader(dst, src http.Header) { 309 for k, vv := range src { 310 // we'll handle CORS ourselves, thanks 311 if strings.HasPrefix(k, "Access-Control") { 312 continue 313 } 314 for _, v := range vv { 315 dst.Add(k, v) 316 } 317 } 318} 319 320// handler that takes care of static files and otherwise returns the index.html with the correct link card data 321func (a *StreamplaceAPI) NotFoundLinkingHandler(ctx context.Context, linker *linking.Linker) (http.HandlerFunc, error) { 322 files, err := app.Files() 323 if err != nil { 324 return nil, err 325 } 326 fs := AppHostingFS{http.FS(files)} 327 328 fileHandler := a.FileHandler(ctx, http.FileServer(fs)) 329 defaultHandler := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { 330 f := strings.TrimPrefix(req.URL.Path, "/") 331 // under docs we need the index.html suffix due to astro rendering 332 if strings.HasPrefix(req.URL.Path, "/docs") && strings.HasSuffix(req.URL.Path, "/") { 333 f += "index.html" 334 } 335 _, err := fs.Open(f) 336 if err == nil { 337 fileHandler.ServeHTTP(w, req) 338 return 339 } 340 if errors.Is(err, ErrorIndex) || f == "" { 341 bs, err := linker.GenerateDefaultCard(ctx, req.URL, a.CLI.SentryDSN) 342 if err != nil { 343 log.Error(ctx, "error generating default card", "error", err) 344 } 345 w.Header().Set("Content-Type", "text/html") 346 if _, err := w.Write(bs); err != nil { 347 log.Error(ctx, "error writing response", "error", err) 348 } 349 } else { 350 log.Warn(ctx, "error opening file", "error", err) 351 apierrors.WriteHTTPInternalServerError(w, "file not found", err) 352 } 353 }) 354 return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { 355 proto := "http" 356 if req.TLS != nil { 357 proto = "https" 358 } 359 fwProto := req.Header.Get("x-forwarded-proto") 360 if fwProto != "" { 361 proto = fwProto 362 } 363 req.URL.Host = req.Host 364 req.URL.Scheme = proto 365 maybeHandle := strings.TrimPrefix(req.URL.Path, "/") 366 // quick check for things that aren't valid handles/dids 367 if strings.ContainsAny(maybeHandle, "/_") { 368 defaultHandler.ServeHTTP(w, req) 369 return 370 } 371 repo, err := a.Model.GetRepoByHandleOrDID(maybeHandle) 372 if err != nil || repo == nil { 373 log.Error(ctx, "no repo found", "maybeHandle", maybeHandle) 374 defaultHandler.ServeHTTP(w, req) 375 return 376 } 377 ls, err := a.Model.GetLatestLivestreamForRepo(repo.DID) 378 if err != nil || ls == nil { 379 log.Error(ctx, "no livestream found", "repoDID", repo.DID) 380 defaultHandler.ServeHTTP(w, req) 381 return 382 } 383 lsv, err := ls.ToLivestreamView() 384 if err != nil || lsv == nil { 385 log.Error(ctx, "no livestream view found", "repoDID", repo.DID) 386 defaultHandler.ServeHTTP(w, req) 387 return 388 } 389 bs, err := linker.GenerateStreamerCard(ctx, req.URL, lsv, a.CLI.SentryDSN) 390 if err != nil { 391 log.Error(ctx, "error generating html", "error", err) 392 defaultHandler.ServeHTTP(w, req) 393 return 394 } 395 w.Header().Set("Content-Type", "text/html") 396 if _, err := w.Write(bs); err != nil { 397 log.Error(ctx, "error writing response", "error", err) 398 } 399 }), nil 400} 401 402func (a *StreamplaceAPI) MistProxyHandler(ctx context.Context, tmpl string) httprouter.Handle { 403 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 404 if !a.CLI.HasMist() { 405 apierrors.WriteHTTPNotImplemented(w, "Playback only on the Linux version for now", nil) 406 return 407 } 408 stream := params.ByName("stream") 409 if stream == "" { 410 apierrors.WriteHTTPBadRequest(w, "missing stream in request", nil) 411 return 412 } 413 414 fullstream := fmt.Sprintf("%s+%s", mistconfig.StreamName, stream) 415 prefix := fmt.Sprintf(tmpl, fullstream) 416 resource := params.ByName("resource") 417 418 // path := strings.TrimPrefix(req.URL.EscapedPath(), "/api") 419 420 client := &http.Client{} 421 req.URL = &url.URL{ 422 Scheme: "http", 423 Host: fmt.Sprintf("127.0.0.1:%d", a.CLI.MistHTTPPort), 424 Path: fmt.Sprintf("%s%s", prefix, resource), 425 RawQuery: req.URL.RawQuery, 426 } 427 428 //http: Request.RequestURI can't be set in client requests. 429 //http://golang.org/src/pkg/net/http/client.go 430 req.RequestURI = "" 431 432 resp, err := client.Do(req) 433 if err != nil { 434 apierrors.WriteHTTPInternalServerError(w, "error connecting to mist", err) 435 return 436 } 437 defer resp.Body.Close() 438 439 copyHeader(w.Header(), resp.Header) 440 w.WriteHeader(resp.StatusCode) 441 if _, err := io.Copy(w, resp.Body); err != nil { 442 log.Error(ctx, "error writing response", "error", err) 443 } 444 } 445} 446 447func (a *StreamplaceAPI) FileHandler(ctx context.Context, fs http.Handler) http.HandlerFunc { 448 return func(w http.ResponseWriter, req *http.Request) { 449 noslash := req.URL.Path[1:] 450 ct, ok := a.Mimes[noslash] 451 if ok { 452 w.Header().Set("content-type", ct) 453 } 454 fs.ServeHTTP(w, req) 455 } 456} 457 458func (a *StreamplaceAPI) RedirectHandler(ctx context.Context) (http.Handler, error) { 459 var tlsPort string 460 var err error 461 if a.HTTPRedirectTLSPort != nil { 462 tlsPort = fmt.Sprintf("%d", *a.HTTPRedirectTLSPort) 463 } else { 464 _, tlsPort, err = net.SplitHostPort(a.CLI.HTTPSAddr) 465 if err != nil { 466 return nil, err 467 } 468 } 469 handleRedirect := func(w http.ResponseWriter, req *http.Request) { 470 host, _, err := net.SplitHostPort(req.Host) 471 if err != nil { 472 host = req.Host 473 } 474 u := req.URL 475 if tlsPort == "443" { 476 u.Host = host 477 } else { 478 u.Host = net.JoinHostPort(host, tlsPort) 479 } 480 u.Scheme = "https" 481 http.Redirect(w, req, u.String(), http.StatusTemporaryRedirect) 482 } 483 mux := http.NewServeMux() 484 mux.HandleFunc("/", handleRedirect) 485 return mux, nil 486} 487 488type NotificationPayload struct { 489 Token string `json:"token"` 490 RepoDID string `json:"repoDID"` 491} 492 493func (a *StreamplaceAPI) HandleAPI404(ctx context.Context) http.HandlerFunc { 494 return func(w http.ResponseWriter, req *http.Request) { 495 w.WriteHeader(404) 496 } 497} 498 499func (a *StreamplaceAPI) HandleNotification(ctx context.Context) http.HandlerFunc { 500 return func(w http.ResponseWriter, req *http.Request) { 501 payload, err := io.ReadAll(req.Body) 502 if err != nil { 503 log.Log(ctx, "error reading notification create", "error", err) 504 w.WriteHeader(400) 505 return 506 } 507 n := NotificationPayload{} 508 err = json.Unmarshal(payload, &n) 509 if err != nil { 510 log.Log(ctx, "error unmarshalling notification create", "error", err) 511 w.WriteHeader(400) 512 return 513 } 514 err = a.StatefulDB.CreateNotification(n.Token, n.RepoDID) 515 if err != nil { 516 log.Log(ctx, "error creating notification", "error", err) 517 w.WriteHeader(400) 518 return 519 } 520 log.Log(ctx, "successfully created notification", "token", n.Token) 521 w.WriteHeader(200) 522 if n.RepoDID != "" { 523 go func() { 524 _, err := a.ATSync.SyncBlueskyRepo(ctx, n.RepoDID, a.Model) 525 if err != nil { 526 log.Error(ctx, "error syncing bluesky repo after notification creation", "error", err) 527 } 528 }() 529 } 530 } 531} 532 533func (a *StreamplaceAPI) HandleSegment(ctx context.Context) http.HandlerFunc { 534 return func(w http.ResponseWriter, req *http.Request) { 535 err := a.MediaManager.ValidateMP4(ctx, req.Body, false) 536 if err != nil { 537 apierrors.WriteHTTPBadRequest(w, "could not ingest segment", err) 538 return 539 } 540 w.WriteHeader(200) 541 } 542} 543 544func (a *StreamplaceAPI) HandlePlayerEvent(ctx context.Context) httprouter.Handle { 545 return func(w http.ResponseWriter, req *http.Request, p httprouter.Params) { 546 if !a.CLI.PlayerTelemetry { 547 w.WriteHeader(200) 548 return 549 } 550 var event model.PlayerEventAPI 551 if err := json.NewDecoder(req.Body).Decode(&event); err != nil { 552 apierrors.WriteHTTPBadRequest(w, "could not decode JSON body", err) 553 return 554 } 555 err := a.Model.CreatePlayerEvent(event) 556 if err != nil { 557 apierrors.WriteHTTPBadRequest(w, "could not create player event", err) 558 return 559 } 560 w.WriteHeader(201) 561 } 562} 563 564func (a *StreamplaceAPI) HandleRecentSegments(ctx context.Context) httprouter.Handle { 565 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 566 segs, err := a.Model.MostRecentSegments() 567 if err != nil { 568 apierrors.WriteHTTPInternalServerError(w, "could not get segments", err) 569 return 570 } 571 bs, err := json.Marshal(segs) 572 if err != nil { 573 apierrors.WriteHTTPInternalServerError(w, "could not marshal segments", err) 574 return 575 } 576 w.Header().Add("Content-Type", "application/json") 577 if _, err := w.Write(bs); err != nil { 578 log.Error(ctx, "error writing response", "error", err) 579 } 580 } 581} 582 583func (a *StreamplaceAPI) HandleUserRecentSegments(ctx context.Context) httprouter.Handle { 584 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 585 user := params.ByName("repoDID") 586 if user == "" { 587 apierrors.WriteHTTPBadRequest(w, "user required", nil) 588 return 589 } 590 user, err := a.NormalizeUser(ctx, user) 591 if err != nil { 592 apierrors.WriteHTTPNotFound(w, "user not found", err) 593 return 594 } 595 seg, err := a.Model.LatestSegmentForUser(user) 596 if err != nil { 597 apierrors.WriteHTTPInternalServerError(w, "could not get segments", err) 598 return 599 } 600 streamplaceSeg, err := seg.ToStreamplaceSegment() 601 if err != nil { 602 apierrors.WriteHTTPInternalServerError(w, "could not convert segment to streamplace segment", err) 603 return 604 } 605 bs, err := json.Marshal(streamplaceSeg) 606 if err != nil { 607 apierrors.WriteHTTPInternalServerError(w, "could not marshal segments", err) 608 return 609 } 610 w.Header().Add("Content-Type", "application/json") 611 if _, err := w.Write(bs); err != nil { 612 log.Error(ctx, "error writing response", "error", err) 613 } 614 } 615} 616 617func (a *StreamplaceAPI) HandleViewCount(ctx context.Context) httprouter.Handle { 618 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 619 user := params.ByName("user") 620 if user == "" { 621 apierrors.WriteHTTPBadRequest(w, "user required", nil) 622 return 623 } 624 user, err := a.NormalizeUser(ctx, user) 625 if err != nil { 626 apierrors.WriteHTTPNotFound(w, "user not found", err) 627 return 628 } 629 count := spmetrics.GetViewCount(user) 630 bs, err := json.Marshal(streamplace.Livestream_ViewerCount{Count: int64(count), LexiconTypeID: "place.stream.livestream#viewerCount"}) 631 if err != nil { 632 apierrors.WriteHTTPInternalServerError(w, "could not marshal view count", err) 633 return 634 } 635 if _, err := w.Write(bs); err != nil { 636 log.Error(ctx, "error writing response", "error", err) 637 } 638 } 639} 640 641func (a *StreamplaceAPI) HandleBlueskyResolve(ctx context.Context) httprouter.Handle { 642 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 643 log.Log(ctx, "got bluesky notification", "params", params) 644 key, err := a.ATSync.SyncBlueskyRepo(ctx, params.ByName("handle"), a.Model) 645 if err != nil { 646 apierrors.WriteHTTPInternalServerError(w, "could not resolve streamplace key", err) 647 return 648 } 649 signingKeys, err := a.Model.GetSigningKeysForRepo(key.DID) 650 if err != nil { 651 apierrors.WriteHTTPInternalServerError(w, "could not get signing keys", err) 652 return 653 } 654 bs, err := json.Marshal(signingKeys) 655 if err != nil { 656 apierrors.WriteHTTPInternalServerError(w, "could not marshal signing keys", err) 657 return 658 } 659 if _, err := w.Write(bs); err != nil { 660 log.Error(ctx, "error writing response", "error", err) 661 } 662 } 663} 664 665type ChatResponse struct { 666 Post *bsky.FeedPost `json:"post"` 667 Repo *model.Repo `json:"repo"` 668 CID string `json:"cid"` 669} 670 671func (a *StreamplaceAPI) HandleChat(ctx context.Context) httprouter.Handle { 672 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 673 user := params.ByName("repoDID") 674 if user == "" { 675 apierrors.WriteHTTPBadRequest(w, "user required", nil) 676 return 677 } 678 repoDID, err := a.NormalizeUser(ctx, user) 679 if err != nil { 680 apierrors.WriteHTTPNotFound(w, "user not found", err) 681 return 682 } 683 replies, err := a.Model.GetReplies(repoDID) 684 if err != nil { 685 apierrors.WriteHTTPInternalServerError(w, "could not get replies", err) 686 return 687 } 688 bs, err := json.Marshal(replies) 689 if err != nil { 690 apierrors.WriteHTTPInternalServerError(w, "could not marshal replies", err) 691 return 692 } 693 w.Header().Set("Content-Type", "application/json") 694 if _, err := w.Write(bs); err != nil { 695 log.Error(ctx, "error writing response", "error", err) 696 } 697 } 698} 699 700func (a *StreamplaceAPI) HandleLivestream(ctx context.Context) httprouter.Handle { 701 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 702 user := params.ByName("repoDID") 703 if user == "" { 704 apierrors.WriteHTTPBadRequest(w, "user required", nil) 705 return 706 } 707 repoDID, err := a.NormalizeUser(ctx, user) 708 if err != nil { 709 apierrors.WriteHTTPNotFound(w, "user not found", err) 710 return 711 } 712 livestream, err := a.Model.GetLatestLivestreamForRepo(repoDID) 713 if err != nil { 714 apierrors.WriteHTTPInternalServerError(w, "could not get livestream", err) 715 return 716 } 717 718 doc, err := livestream.ToLivestreamView() 719 if err != nil { 720 apierrors.WriteHTTPInternalServerError(w, "could not marshal livestream", err) 721 return 722 } 723 724 if livestream == nil { 725 apierrors.WriteHTTPNotFound(w, "no livestream found", nil) 726 return 727 } 728 729 bs, err := json.Marshal(doc) 730 if err != nil { 731 apierrors.WriteHTTPInternalServerError(w, "could not marshal livestream", err) 732 return 733 } 734 w.Header().Set("Content-Type", "application/json") 735 if _, err := w.Write(bs); err != nil { 736 log.Error(ctx, "error writing response", "error", err) 737 } 738 } 739} 740 741func (a *StreamplaceAPI) RateLimitMiddleware(ctx context.Context) func(http.Handler) http.Handler { 742 return func(next http.Handler) http.Handler { 743 return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { 744 ip, _, err := net.SplitHostPort(req.RemoteAddr) 745 if err != nil { 746 ip = req.RemoteAddr 747 } 748 749 if a.CLI.RateLimitPerSecond > 0 { 750 limiter := a.getLimiter(ip) 751 752 if !limiter.Allow() { 753 log.Warn(ctx, "rate limit exceeded", "ip", ip, "path", req.URL.Path) 754 apierrors.WriteHTTPTooManyRequests(w, "rate limit exceeded") 755 return 756 } 757 } 758 759 next.ServeHTTP(w, req) 760 }) 761 } 762} 763 764// helper for getting a listener from a systemd file descriptor 765func getListenerFromFD(fdName string) (net.Listener, error) { 766 log.Debug(context.TODO(), "getting listener from fd", "fdName", fdName, "LISTEN_PID", os.Getenv("LISTEN_PID"), "LISTEN_FDNAMES", os.Getenv("LISTEN_FDNAMES")) 767 if os.Getenv("LISTEN_PID") == strconv.Itoa(os.Getpid()) { 768 names := strings.Split(os.Getenv("LISTEN_FDNAMES"), ":") 769 for i, name := range names { 770 if name == fdName { 771 log.Warn(context.TODO(), "using systemd file descriptor", "fdName", fdName, "fdIndex", i+3) 772 f1 := os.NewFile(uintptr(i+3), fdName) 773 return net.FileListener(f1) 774 } 775 } 776 } 777 return nil, nil 778} 779 780func (a *StreamplaceAPI) ServeHTTP(ctx context.Context) error { 781 handler, err := a.Handler(ctx) 782 if err != nil { 783 return err 784 } 785 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error { 786 ln, err := getListenerFromFD("http") 787 if err != nil { 788 return err 789 } 790 if ln == nil { 791 ln, err = net.Listen("tcp", a.CLI.HTTPAddr) 792 if err != nil { 793 return err 794 } 795 } else { 796 log.Warn(ctx, "api server listening for http over systemd socket", "addr", ln.Addr()) 797 } 798 log.Log(ctx, "http server starting", "addr", ln.Addr()) 799 return s.Serve(ln) 800 }) 801} 802 803func (a *StreamplaceAPI) ServeHTTPRedirect(ctx context.Context) error { 804 handler, err := a.RedirectHandler(ctx) 805 if err != nil { 806 return err 807 } 808 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error { 809 ln, err := getListenerFromFD("http") 810 if err != nil { 811 return err 812 } 813 if ln == nil { 814 ln, err = net.Listen("tcp", a.CLI.HTTPAddr) 815 if err != nil { 816 return err 817 } 818 } else { 819 log.Warn(ctx, "http tls redirect server listening for http over systemd socket", "addr", ln.Addr()) 820 } 821 log.Log(ctx, "http tls redirect server starting", "addr", ln.Addr()) 822 return s.Serve(ln) 823 }) 824} 825 826func (a *StreamplaceAPI) ServeHTTPS(ctx context.Context) error { 827 handler, err := a.Handler(ctx) 828 if err != nil { 829 return err 830 } 831 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error { 832 ln, err := getListenerFromFD("https") 833 if err != nil { 834 return err 835 } 836 if ln == nil { 837 ln, err = net.Listen("tcp", a.CLI.HTTPSAddr) 838 if err != nil { 839 return err 840 } 841 } else { 842 // tell the redirect handler we're using systemd and they should go to 443 843 port443 := 443 844 a.HTTPRedirectTLSPort = &port443 845 log.Warn(ctx, "https server listening for https over systemd socket", "addr", ln.Addr()) 846 } 847 log.Log(ctx, "https server starting", 848 "addr", ln.Addr(), 849 "certPath", a.CLI.TLSCertPath, 850 "keyPath", a.CLI.TLSKeyPath, 851 ) 852 return s.ServeTLS(ln, a.CLI.TLSCertPath, a.CLI.TLSKeyPath) 853 }) 854} 855 856func (a *StreamplaceAPI) ServerWithShutdown(ctx context.Context, handler http.Handler, serve func(*http.Server) error) error { 857 ctx, cancel := context.WithCancel(ctx) 858 handler = gziphandler.GzipHandler(handler) 859 server := http.Server{Handler: handler} 860 var serveErr error 861 go func() { 862 serveErr = serve(&server) 863 cancel() 864 }() 865 <-ctx.Done() 866 if serveErr != nil { 867 return fmt.Errorf("error in http server: %w", serveErr) 868 } 869 870 ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second) 871 defer cancel() 872 return server.Shutdown(ctx) 873} 874 875func (a *StreamplaceAPI) HandleHealthz(ctx context.Context) http.HandlerFunc { 876 return func(w http.ResponseWriter, req *http.Request) { 877 w.WriteHeader(200) 878 } 879} 880 881func (a *StreamplaceAPI) getLimiter(ip string) *rate.Limiter { 882 a.limitersMu.Lock() 883 defer a.limitersMu.Unlock() 884 885 limiter, exists := a.limiters[ip] 886 if !exists { 887 limiter = rate.NewLimiter(rate.Limit(a.CLI.RateLimitPerSecond), a.CLI.RateLimitBurst) 888 a.limiters[ip] = limiter 889 } 890 891 return limiter 892} 893 894func (a *StreamplaceAPI) HandleClip(ctx context.Context) httprouter.Handle { 895 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 896 user := params.ByName("user") 897 file := params.ByName("file") 898 if user == "" || file == "" { 899 apierrors.WriteHTTPBadRequest(w, "user and file required", nil) 900 return 901 } 902 user, err := a.NormalizeUser(ctx, user) 903 if err != nil { 904 apierrors.WriteHTTPNotFound(w, "user not found", err) 905 return 906 } 907 fPath := []string{user, "clips", file} 908 exists, err := a.CLI.DataFileExists(fPath) 909 if err != nil { 910 apierrors.WriteHTTPInternalServerError(w, "could not check if file exists", err) 911 return 912 } 913 if !exists { 914 apierrors.WriteHTTPNotFound(w, "file not found", nil) 915 return 916 } 917 fd, err := os.Open(a.CLI.DataFilePath(fPath)) 918 if err != nil { 919 apierrors.WriteHTTPInternalServerError(w, "could not open file", err) 920 return 921 } 922 defer fd.Close() 923 w.Header().Set("Content-Type", "video/mp4") 924 if _, err := io.Copy(w, fd); err != nil { 925 log.Error(ctx, "error writing response", "error", err) 926 } 927 } 928} 929 930func NewWebsocketTracker(maxConns int) *WebsocketTracker { 931 return &WebsocketTracker{ 932 connections: make(map[string]int), 933 maxConnsPerIP: maxConns, 934 } 935} 936 937func (t *WebsocketTracker) AddConnection(ip string) bool { 938 t.mu.Lock() 939 defer t.mu.Unlock() 940 941 count := t.connections[ip] 942 943 if count >= t.maxConnsPerIP { 944 return false 945 } 946 947 t.connections[ip] = count + 1 948 return true 949} 950 951func (t *WebsocketTracker) RemoveConnection(ip string) { 952 t.mu.Lock() 953 defer t.mu.Unlock() 954 955 count := t.connections[ip] 956 if count > 0 { 957 t.connections[ip] = count - 1 958 } 959 960 if t.connections[ip] == 0 { 961 delete(t.connections, ip) 962 } 963}