Live video on the AT Protocol
at natb/analytics 1030 lines 33 kB view raw
1package api 2 3import ( 4 "context" 5 "encoding/json" 6 "errors" 7 "fmt" 8 "io" 9 "log/slog" 10 "net" 11 "net/http" 12 "net/http/httputil" 13 "net/url" 14 "os" 15 "strconv" 16 "strings" 17 "sync" 18 "time" 19 20 "github.com/NYTimes/gziphandler" 21 "github.com/bluesky-social/indigo/api/bsky" 22 "github.com/google/uuid" 23 "github.com/julienschmidt/httprouter" 24 "github.com/labstack/echo/v4" 25 "github.com/rs/cors" 26 sloghttp "github.com/samber/slog-http" 27 "golang.org/x/time/rate" 28 29 "github.com/streamplace/oatproxy/pkg/oatproxy" 30 "stream.place/streamplace/js/app" 31 "stream.place/streamplace/pkg/analytics" 32 "stream.place/streamplace/pkg/atproto" 33 "stream.place/streamplace/pkg/bus" 34 "stream.place/streamplace/pkg/config" 35 "stream.place/streamplace/pkg/crypto/signers/eip712" 36 "stream.place/streamplace/pkg/director" 37 apierrors "stream.place/streamplace/pkg/errors" 38 "stream.place/streamplace/pkg/linking" 39 "stream.place/streamplace/pkg/log" 40 "stream.place/streamplace/pkg/media" 41 "stream.place/streamplace/pkg/mist/mistconfig" 42 "stream.place/streamplace/pkg/model" 43 "stream.place/streamplace/pkg/notifications" 44 "stream.place/streamplace/pkg/spmetrics" 45 "stream.place/streamplace/pkg/spxrpc" 46 "stream.place/streamplace/pkg/statedb" 47 "stream.place/streamplace/pkg/streamplace" 48 49 metrics "github.com/slok/go-http-metrics/metrics/prometheus" 50 "github.com/slok/go-http-metrics/middleware" 51 echomiddleware "github.com/slok/go-http-metrics/middleware/echo" 52 httproutermiddleware "github.com/slok/go-http-metrics/middleware/httprouter" 53 middlewarestd "github.com/slok/go-http-metrics/middleware/std" 54) 55 56type StreamplaceAPI struct { 57 CLI *config.CLI 58 Model model.Model 59 StatefulDB *statedb.StatefulDB 60 Updater *Updater 61 Signer *eip712.EIP712Signer 62 Mimes map[string]string 63 FirebaseNotifier notifications.FirebaseNotifier 64 AnalyticsClient analytics.Client 65 MediaManager *media.MediaManager 66 MediaSigner media.MediaSigner 67 XRPCServer *spxrpc.Server 68 // not thread-safe yet 69 Aliases map[string]string 70 Bus *bus.Bus 71 ATSync *atproto.ATProtoSynchronizer 72 Director *director.Director 73 74 connTracker *WebsocketTracker 75 76 limiters map[string]*rate.Limiter 77 limitersMu sync.Mutex 78 SignerCache map[string]media.MediaSigner 79 SignerCacheMu sync.Mutex 80 op *oatproxy.OATProxy 81 82 // override tls port for http redirect server if we're using systemd file descriptors 83 HTTPRedirectTLSPort *int 84 sessions map[string]map[string]time.Time 85 sessionsLock sync.RWMutex 86 87 rtmpSessions map[string]*media.RTMPSession 88 rtmpSessionsLock sync.Mutex 89 rtmpInternalPlaybackAddr string 90} 91 92type WebsocketTracker struct { 93 connections map[string]int 94 maxConnsPerIP int 95 mu sync.RWMutex 96} 97 98func MakeStreamplaceAPI(cli *config.CLI, mod model.Model, statefulDB *statedb.StatefulDB, noter notifications.FirebaseNotifier, analyticsClient analytics.Client, mm *media.MediaManager, ms media.MediaSigner, bus *bus.Bus, atsync *atproto.ATProtoSynchronizer, d *director.Director, op *oatproxy.OATProxy) (*StreamplaceAPI, error) { 99 updater, err := PrepareUpdater(cli) 100 if err != nil { 101 return nil, err 102 } 103 a := &StreamplaceAPI{CLI: cli, 104 Model: mod, 105 StatefulDB: statefulDB, 106 Updater: updater, 107 FirebaseNotifier: noter, 108 AnalyticsClient: analyticsClient, 109 MediaManager: mm, 110 MediaSigner: ms, 111 Aliases: map[string]string{}, 112 Bus: bus, 113 ATSync: atsync, 114 Director: d, 115 connTracker: NewWebsocketTracker(cli.RateLimitWebsocket), 116 limiters: make(map[string]*rate.Limiter), 117 SignerCache: make(map[string]media.MediaSigner), 118 op: op, 119 sessions: make(map[string]map[string]time.Time), 120 sessionsLock: sync.RWMutex{}, 121 rtmpSessions: make(map[string]*media.RTMPSession), 122 rtmpSessionsLock: sync.Mutex{}, 123 } 124 a.Mimes, err = updater.GetMimes() 125 if err != nil { 126 return nil, err 127 } 128 return a, nil 129} 130 131type AppHostingFS struct { 132 http.FileSystem 133} 134 135var ErrorIndex = errors.New("not found, use index.html") 136 137func (fs AppHostingFS) Open(name string) (http.File, error) { 138 file, err1 := fs.FileSystem.Open(name) 139 if err1 == nil { 140 return file, nil 141 } 142 return nil, ErrorIndex 143} 144 145// api/playback/iame.li/webrtc?rendition=source 146// api/playback/iame.li/stream.mp4?rendition=source 147// api/playback/iame.li/stream.webm?rendition=source 148// api/playback/iame.li/hls/index.m3u8 149// api/playback/iame.li/hls/source/stream.m3u8 150// api/playback/iame.li/hls/source/000000000000.ts 151 152func (a *StreamplaceAPI) Handler(ctx context.Context) (http.Handler, error) { 153 154 mdlw := middleware.New(middleware.Config{ 155 Recorder: metrics.NewRecorder(metrics.Config{}), 156 }) 157 var xrpc http.Handler 158 xrpc, err := spxrpc.NewServer(ctx, a.CLI, a.Model, a.StatefulDB, a.op, mdlw, a.ATSync, a.Bus) 159 if err != nil { 160 return nil, err 161 } 162 a.XRPCServer = xrpc.(*spxrpc.Server) 163 router := httprouter.New() 164 165 // Create our middleware factory with the default settings. 166 167 a.op.Echo.Use(echomiddleware.Handler("", mdlw)) 168 169 // r.GET("/test/:id", httproutermiddleware.Handler("/test/:id", h1, mdlw)) 170 171 addHandle := func(router *httprouter.Router, method, path string, handler httprouter.Handle) { 172 router.Handle(method, path, httproutermiddleware.Handler(path, handler, mdlw)) 173 } 174 addFunc := func(router *httprouter.Router, method, path string, handler http.HandlerFunc) { 175 router.Handler(method, path, middlewarestd.Handler(path, mdlw, handler)) 176 } 177 178 router.Handler("GET", "/oauth/*anything", a.op.Handler()) 179 router.Handler("POST", "/oauth/*anything", a.op.Handler()) 180 router.Handler("GET", "/.well-known/oauth-authorization-server", a.op.Handler()) 181 router.Handler("GET", "/.well-known/oauth-protected-resource", a.op.Handler()) 182 router.Handler("GET", "/.well-known/apple-app-site-association", a.HandleAppleAppSiteAssociation(ctx)) 183 router.Handler("GET", "/.well-known/assetlinks.json", a.HandleAndroidAssetLinks(ctx)) 184 apiRouter := httprouter.New() 185 addFunc(apiRouter, "POST", "/api/notification", a.HandleNotification(ctx)) 186 // old clients 187 addFunc(router, "GET", "/app-updates", a.HandleAppUpdates(ctx)) 188 // new ones 189 addFunc(apiRouter, "GET", "/api/manifest", a.HandleAppUpdates(ctx)) 190 addHandle(apiRouter, "GET", "/api/desktop-updates/:platform/:architecture/:version/:buildTime/:file", a.HandleDesktopUpdates(ctx)) 191 addHandle(apiRouter, "POST", "/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s")) 192 addHandle(apiRouter, "OPTIONS", "/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s")) 193 addHandle(apiRouter, "DELETE", "/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s")) 194 addFunc(apiRouter, "POST", "/api/segment", a.HandleSegment(ctx)) 195 addFunc(apiRouter, "GET", "/api/healthz", a.HandleHealthz(ctx)) 196 addHandle(apiRouter, "GET", "/api/playback/:user/hls/*file", a.HandleHLSPlayback(ctx)) 197 // they're jpegs now 198 addHandle(apiRouter, "GET", "/api/playback/:user/stream.jpg", a.HandleThumbnailPlayback(ctx)) 199 // this one is actually a jpeg (used previously and shouldn't remove for historical reasons) 200 addHandle(apiRouter, "GET", "/api/playback/:user/stream.png", a.HandleThumbnailPlayback(ctx)) 201 addHandle(apiRouter, "GET", "/api/app-return/*anything", a.HandleAppReturn(ctx)) 202 addHandle(apiRouter, "POST", "/api/playback/:user/webrtc", a.HandleWebRTCPlayback(ctx)) 203 addHandle(apiRouter, "POST", "/api/ingest/webrtc", a.HandleWebRTCIngest(ctx)) 204 addHandle(apiRouter, "POST", "/api/ingest/webrtc/:key", a.HandleWebRTCIngest(ctx)) 205 addHandle(apiRouter, "POST", "/api/player-event", a.HandlePlayerEvent(ctx)) 206 addHandle(apiRouter, "GET", "/api/chat/:repoDID", a.HandleChat(ctx)) 207 addHandle(apiRouter, "GET", "/api/websocket/:repoDID", a.HandleWebsocket(ctx)) 208 addHandle(apiRouter, "GET", "/api/livestream/:repoDID", a.HandleLivestream(ctx)) 209 addHandle(apiRouter, "GET", "/api/segment/recent", a.HandleRecentSegments(ctx)) 210 addHandle(apiRouter, "GET", "/api/segment/recent/:repoDID", a.HandleUserRecentSegments(ctx)) 211 addHandle(apiRouter, "GET", "/api/bluesky/resolve/:handle", a.HandleBlueskyResolve(ctx)) 212 addHandle(apiRouter, "GET", "/api/view-count/:user", a.HandleViewCount(ctx)) 213 addHandle(apiRouter, "GET", "/api/clip/:user/:file", a.HandleClip(ctx)) 214 215 // Analytics routes (optional, only if analytics is configured) 216 if a.AnalyticsClient != nil { 217 addFunc(apiRouter, "GET", "/api/analytics/realtime", analytics.HandleRealtimeStats(a.AnalyticsClient)) 218 addHandle(apiRouter, "GET", "/api/analytics/streamer/:did", analytics.HandleStreamerStats(a.AnalyticsClient)) 219 addHandle(apiRouter, "GET", "/api/analytics/viewer/:did", analytics.HandleViewerHistory(a.AnalyticsClient)) 220 log.Log(ctx, "registered analytics HTTP endpoints") 221 } else { 222 log.Debug(ctx, "analytics not configured, skipping analytics endpoints") 223 } 224 225 apiRouter.NotFound = a.HandleAPI404(ctx) 226 apiRouterHandler := a.RateLimitMiddleware(ctx)(apiRouter) 227 xrpcHandler := a.RateLimitMiddleware(ctx)(xrpc) 228 router.Handler("GET", "/api/*resource", apiRouterHandler) 229 router.Handler("POST", "/api/*resource", apiRouterHandler) 230 router.Handler("PUT", "/api/*resource", apiRouterHandler) 231 router.Handler("PATCH", "/api/*resource", apiRouterHandler) 232 router.Handler("DELETE", "/api/*resource", apiRouterHandler) 233 router.Handler("GET", "/xrpc/*resource", xrpcHandler) 234 router.Handler("POST", "/xrpc/*resource", xrpcHandler) 235 router.Handler("PUT", "/xrpc/*resource", xrpcHandler) 236 router.Handler("PATCH", "/xrpc/*resource", xrpcHandler) 237 router.Handler("DELETE", "/xrpc/*resource", xrpcHandler) 238 // i wonder if there's a better way to do this? 239 router.GET("/favicon.ico", func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) { 240 err := a.XRPCServer.HandleFaviconICO(echo.New().NewContext(r, w)) 241 if err != nil { 242 log.Error(ctx, "error handling favicon.ico", "error", err) 243 w.WriteHeader(500) 244 return 245 } 246 }) 247 router.GET("/.well-known/did.json", a.HandleDidJSON(ctx)) 248 router.GET("/.well-known/atproto-did", a.HandleAtprotoDID(ctx)) 249 router.GET("/dl/*params", a.HandleAppDownload(ctx)) 250 router.POST("/", a.HandleWebRTCIngest(ctx)) 251 for _, redirect := range a.CLI.Redirects { 252 parts := strings.Split(redirect, ":") 253 if len(parts) != 2 { 254 log.Error(ctx, "invalid redirect", "redirect", redirect) 255 return nil, fmt.Errorf("invalid redirect: %s", redirect) 256 } 257 router.Handle("GET", parts[0], func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) { 258 http.Redirect(w, r, parts[1], http.StatusTemporaryRedirect) 259 }) 260 } 261 if a.CLI.FrontendProxy != "" { 262 u, err := url.Parse(a.CLI.FrontendProxy) 263 if err != nil { 264 return nil, err 265 } 266 log.Warn(ctx, "using frontend proxy instead of bundled frontend", "destination", a.CLI.FrontendProxy) 267 router.NotFound = &httputil.ReverseProxy{ 268 Rewrite: func(r *httputil.ProxyRequest) { 269 // workaround for Expo disliking serving requests from 127.0.0.1 instead of localhost 270 // we need to use 127.0.0.1 because the atproto oauth client requires it 271 r.Out.Header.Set("Origin", u.String()) 272 r.SetURL(u) 273 }, 274 } 275 } else { 276 files, err := app.Files() 277 if err != nil { 278 return nil, err 279 } 280 index, err := files.Open("index.html") 281 if err != nil { 282 return nil, err 283 } 284 bs, err := io.ReadAll(index) 285 if err != nil { 286 return nil, err 287 } 288 linker, err := linking.NewLinker(ctx, bs) 289 if err != nil { 290 return nil, err 291 } 292 linkingHandler, err := a.NotFoundLinkingHandler(ctx, linker) 293 if err != nil { 294 return nil, err 295 } 296 router.NotFound = middlewarestd.Handler("/*static", mdlw, linkingHandler) 297 } 298 // needed because the WebRTC handler issues 405s from / otherwise 299 router.GET("/", func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) { 300 router.NotFound.ServeHTTP(w, r) 301 }) 302 handler := sloghttp.Recovery(router) 303 handler = cors.AllowAll().Handler(handler) 304 handler = sloghttp.New(slog.Default())(handler) 305 handler = a.RateLimitMiddleware(ctx)(handler) 306 307 // this needs to be LAST so nothing else clobbers the context 308 handler = a.ContextMiddleware(ctx)(handler) 309 310 return handler, nil 311} 312func (a *StreamplaceAPI) ContextMiddleware(parentContext context.Context) func(next http.Handler) http.Handler { 313 return func(next http.Handler) http.Handler { 314 return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 315 uuid := uuid.New().String() 316 ctx := log.WithLogValues(parentContext, "requestID", uuid, "method", r.Method, "path", r.URL.Path) 317 r = r.WithContext(ctx) 318 next.ServeHTTP(w, r) 319 }) 320 } 321} 322func copyHeader(dst, src http.Header) { 323 for k, vv := range src { 324 // we'll handle CORS ourselves, thanks 325 if strings.HasPrefix(k, "Access-Control") { 326 continue 327 } 328 for _, v := range vv { 329 dst.Add(k, v) 330 } 331 } 332} 333 334// handler that takes care of static files and otherwise returns the index.html with the correct link card data 335func (a *StreamplaceAPI) NotFoundLinkingHandler(ctx context.Context, linker *linking.Linker) (http.HandlerFunc, error) { 336 files, err := app.Files() 337 if err != nil { 338 return nil, err 339 } 340 fs := AppHostingFS{http.FS(files)} 341 342 fileHandler := a.FileHandler(ctx, http.FileServer(fs)) 343 defaultHandler := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { 344 f := strings.TrimPrefix(req.URL.Path, "/") 345 // under docs we need the index.html suffix due to astro rendering 346 if strings.HasPrefix(req.URL.Path, "/docs") && strings.HasSuffix(req.URL.Path, "/") { 347 f += "index.html" 348 } 349 _, err := fs.Open(f) 350 if err == nil { 351 fileHandler.ServeHTTP(w, req) 352 return 353 } 354 if errors.Is(err, ErrorIndex) || f == "" { 355 bs, err := linker.GenerateDefaultCard(ctx, req.URL, a.CLI.SentryDSN) 356 if err != nil { 357 log.Error(ctx, "error generating default card", "error", err) 358 } 359 w.Header().Set("Content-Type", "text/html") 360 if _, err := w.Write(bs); err != nil { 361 log.Error(ctx, "error writing response", "error", err) 362 } 363 } else { 364 log.Warn(ctx, "error opening file", "error", err) 365 apierrors.WriteHTTPInternalServerError(w, "file not found", err) 366 } 367 }) 368 return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { 369 proto := "http" 370 if req.TLS != nil { 371 proto = "https" 372 } 373 fwProto := req.Header.Get("x-forwarded-proto") 374 if fwProto != "" { 375 proto = fwProto 376 } 377 req.URL.Host = req.Host 378 req.URL.Scheme = proto 379 maybeHandle := strings.TrimPrefix(req.URL.Path, "/") 380 // quick check for things that aren't valid handles/dids 381 if strings.ContainsAny(maybeHandle, "/_") { 382 defaultHandler.ServeHTTP(w, req) 383 return 384 } 385 repo, err := a.Model.GetRepoByHandleOrDID(maybeHandle) 386 if err != nil || repo == nil { 387 log.Error(ctx, "no repo found", "maybeHandle", maybeHandle) 388 defaultHandler.ServeHTTP(w, req) 389 return 390 } 391 ls, err := a.Model.GetLatestLivestreamForRepo(repo.DID) 392 if err != nil || ls == nil { 393 log.Error(ctx, "no livestream found", "repoDID", repo.DID) 394 defaultHandler.ServeHTTP(w, req) 395 return 396 } 397 lsv, err := ls.ToLivestreamView() 398 if err != nil || lsv == nil { 399 log.Error(ctx, "no livestream view found", "repoDID", repo.DID) 400 defaultHandler.ServeHTTP(w, req) 401 return 402 } 403 bs, err := linker.GenerateStreamerCard(ctx, req.URL, lsv, a.CLI.SentryDSN) 404 if err != nil { 405 log.Error(ctx, "error generating html", "error", err) 406 defaultHandler.ServeHTTP(w, req) 407 return 408 } 409 w.Header().Set("Content-Type", "text/html") 410 if _, err := w.Write(bs); err != nil { 411 log.Error(ctx, "error writing response", "error", err) 412 } 413 }), nil 414} 415 416func (a *StreamplaceAPI) MistProxyHandler(ctx context.Context, tmpl string) httprouter.Handle { 417 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 418 if !a.CLI.HasMist() { 419 apierrors.WriteHTTPNotImplemented(w, "Playback only on the Linux version for now", nil) 420 return 421 } 422 stream := params.ByName("stream") 423 if stream == "" { 424 apierrors.WriteHTTPBadRequest(w, "missing stream in request", nil) 425 return 426 } 427 428 fullstream := fmt.Sprintf("%s+%s", mistconfig.StreamName, stream) 429 prefix := fmt.Sprintf(tmpl, fullstream) 430 resource := params.ByName("resource") 431 432 // path := strings.TrimPrefix(req.URL.EscapedPath(), "/api") 433 434 client := &http.Client{} 435 req.URL = &url.URL{ 436 Scheme: "http", 437 Host: fmt.Sprintf("127.0.0.1:%d", a.CLI.MistHTTPPort), 438 Path: fmt.Sprintf("%s%s", prefix, resource), 439 RawQuery: req.URL.RawQuery, 440 } 441 442 //http: Request.RequestURI can't be set in client requests. 443 //http://golang.org/src/pkg/net/http/client.go 444 req.RequestURI = "" 445 446 resp, err := client.Do(req) 447 if err != nil { 448 apierrors.WriteHTTPInternalServerError(w, "error connecting to mist", err) 449 return 450 } 451 defer resp.Body.Close() 452 453 copyHeader(w.Header(), resp.Header) 454 w.WriteHeader(resp.StatusCode) 455 if _, err := io.Copy(w, resp.Body); err != nil { 456 log.Error(ctx, "error writing response", "error", err) 457 } 458 } 459} 460 461func (a *StreamplaceAPI) FileHandler(ctx context.Context, fs http.Handler) http.HandlerFunc { 462 return func(w http.ResponseWriter, req *http.Request) { 463 noslash := req.URL.Path[1:] 464 ct, ok := a.Mimes[noslash] 465 if ok { 466 w.Header().Set("content-type", ct) 467 } 468 fs.ServeHTTP(w, req) 469 } 470} 471 472func (a *StreamplaceAPI) RedirectHandler(ctx context.Context) (http.Handler, error) { 473 var tlsPort string 474 var err error 475 if a.HTTPRedirectTLSPort != nil { 476 tlsPort = fmt.Sprintf("%d", *a.HTTPRedirectTLSPort) 477 } else { 478 _, tlsPort, err = net.SplitHostPort(a.CLI.HTTPSAddr) 479 if err != nil { 480 return nil, err 481 } 482 } 483 handleRedirect := func(w http.ResponseWriter, req *http.Request) { 484 host, _, err := net.SplitHostPort(req.Host) 485 if err != nil { 486 host = req.Host 487 } 488 u := req.URL 489 if tlsPort == "443" { 490 u.Host = host 491 } else { 492 u.Host = net.JoinHostPort(host, tlsPort) 493 } 494 u.Scheme = "https" 495 http.Redirect(w, req, u.String(), http.StatusTemporaryRedirect) 496 } 497 mux := http.NewServeMux() 498 mux.HandleFunc("/", handleRedirect) 499 return mux, nil 500} 501 502type NotificationPayload struct { 503 Token string `json:"token"` 504 RepoDID string `json:"repoDID"` 505} 506 507func (a *StreamplaceAPI) HandleAPI404(ctx context.Context) http.HandlerFunc { 508 return func(w http.ResponseWriter, req *http.Request) { 509 w.WriteHeader(404) 510 } 511} 512 513func (a *StreamplaceAPI) HandleNotification(ctx context.Context) http.HandlerFunc { 514 return func(w http.ResponseWriter, req *http.Request) { 515 payload, err := io.ReadAll(req.Body) 516 if err != nil { 517 log.Log(ctx, "error reading notification create", "error", err) 518 w.WriteHeader(400) 519 return 520 } 521 n := NotificationPayload{} 522 err = json.Unmarshal(payload, &n) 523 if err != nil { 524 log.Log(ctx, "error unmarshalling notification create", "error", err) 525 w.WriteHeader(400) 526 return 527 } 528 err = a.StatefulDB.CreateNotification(n.Token, n.RepoDID) 529 if err != nil { 530 log.Log(ctx, "error creating notification", "error", err) 531 w.WriteHeader(400) 532 return 533 } 534 log.Log(ctx, "successfully created notification", "token", n.Token) 535 w.WriteHeader(200) 536 if n.RepoDID != "" { 537 go func() { 538 _, err := a.ATSync.SyncBlueskyRepo(ctx, n.RepoDID, a.Model) 539 if err != nil { 540 log.Error(ctx, "error syncing bluesky repo after notification creation", "error", err) 541 } 542 }() 543 } 544 } 545} 546 547func (a *StreamplaceAPI) HandleSegment(ctx context.Context) http.HandlerFunc { 548 return func(w http.ResponseWriter, req *http.Request) { 549 err := a.MediaManager.ValidateMP4(ctx, req.Body, false) 550 if err != nil { 551 apierrors.WriteHTTPBadRequest(w, "could not ingest segment", err) 552 return 553 } 554 w.WriteHeader(200) 555 } 556} 557 558func (a *StreamplaceAPI) HandlePlayerEvent(ctx context.Context) httprouter.Handle { 559 return func(w http.ResponseWriter, req *http.Request, p httprouter.Params) { 560 var event model.PlayerEventAPI 561 if err := json.NewDecoder(req.Body).Decode(&event); err != nil { 562 apierrors.WriteHTTPBadRequest(w, "could not decode JSON body", err) 563 return 564 } 565 566 // Generate server-side ID 567 uu, err := uuid.NewV7() 568 if err != nil { 569 apierrors.WriteHTTPInternalServerError(w, "could not generate event ID", err) 570 return 571 } 572 event.ID = uu.String() 573 574 err = a.Model.CreatePlayerEvent(event) 575 if err != nil { 576 apierrors.WriteHTTPBadRequest(w, "could not create player event", err) 577 return 578 } 579 580 // Forward relevant events to analytics service if configured 581 if a.AnalyticsClient != nil { 582 go a.forwardPlayerEventToAnalytics(req.Context(), event) 583 } else { 584 log.Log(req.Context(), "no analytics client configured, skipping forwarding player event") 585 } 586 587 w.WriteHeader(201) 588 } 589} 590 591func (a *StreamplaceAPI) forwardPlayerEventToAnalytics(ctx context.Context, event model.PlayerEventAPI) { 592 // Only forward watch/playback events 593 if event.EventType != "aq-played" && event.EventType != "watch" { 594 return 595 } 596 597 // Parse metadata into typed struct 598 var meta model.PlayerEventMeta 599 metaBytes, _ := json.Marshal(event.Meta) 600 if err := json.Unmarshal(metaBytes, &meta); err != nil { 601 log.Log(ctx, "failed to parse event metadata", "error", err) 602 return 603 } 604 605 // Validate required fields 606 if err := meta.Validate(); err != nil { 607 log.Log(ctx, "invalid event metadata", "error", err) 608 return 609 } 610 611 analyticsEvent := &analytics.Event{ 612 EventID: event.ID, 613 EventType: event.EventType, 614 DeviceID: meta.DeviceID, 615 DID: meta.DID, 616 SessionID: meta.SessionID, 617 TimestampMs: event.Time.UnixMilli(), 618 StreamerDID: meta.StreamerDID, 619 StreamID: meta.StreamID, 620 PropertiesJSON: string(metaBytes), 621 SchemaVersion: 1, 622 ClientVersion: meta.ClientVersion, 623 Platform: meta.Platform, 624 } 625 626 if err := a.AnalyticsClient.IngestEvents(ctx, []*analytics.Event{analyticsEvent}); err != nil { 627 log.Log(ctx, "failed to ingest analytics event", "error", err) 628 } 629} 630 631func (a *StreamplaceAPI) HandleRecentSegments(ctx context.Context) httprouter.Handle { 632 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 633 segs, err := a.Model.MostRecentSegments() 634 if err != nil { 635 apierrors.WriteHTTPInternalServerError(w, "could not get segments", err) 636 return 637 } 638 bs, err := json.Marshal(segs) 639 if err != nil { 640 apierrors.WriteHTTPInternalServerError(w, "could not marshal segments", err) 641 return 642 } 643 w.Header().Add("Content-Type", "application/json") 644 if _, err := w.Write(bs); err != nil { 645 log.Error(ctx, "error writing response", "error", err) 646 } 647 } 648} 649 650func (a *StreamplaceAPI) HandleUserRecentSegments(ctx context.Context) httprouter.Handle { 651 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 652 user := params.ByName("repoDID") 653 if user == "" { 654 apierrors.WriteHTTPBadRequest(w, "user required", nil) 655 return 656 } 657 user, err := a.NormalizeUser(ctx, user) 658 if err != nil { 659 apierrors.WriteHTTPNotFound(w, "user not found", err) 660 return 661 } 662 seg, err := a.Model.LatestSegmentForUser(user) 663 if err != nil { 664 apierrors.WriteHTTPInternalServerError(w, "could not get segments", err) 665 return 666 } 667 streamplaceSeg, err := seg.ToStreamplaceSegment() 668 if err != nil { 669 apierrors.WriteHTTPInternalServerError(w, "could not convert segment to streamplace segment", err) 670 return 671 } 672 bs, err := json.Marshal(streamplaceSeg) 673 if err != nil { 674 apierrors.WriteHTTPInternalServerError(w, "could not marshal segments", err) 675 return 676 } 677 w.Header().Add("Content-Type", "application/json") 678 if _, err := w.Write(bs); err != nil { 679 log.Error(ctx, "error writing response", "error", err) 680 } 681 } 682} 683 684func (a *StreamplaceAPI) HandleViewCount(ctx context.Context) httprouter.Handle { 685 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 686 user := params.ByName("user") 687 if user == "" { 688 apierrors.WriteHTTPBadRequest(w, "user required", nil) 689 return 690 } 691 user, err := a.NormalizeUser(ctx, user) 692 if err != nil { 693 apierrors.WriteHTTPNotFound(w, "user not found", err) 694 return 695 } 696 count := spmetrics.GetViewCount(user) 697 bs, err := json.Marshal(streamplace.Livestream_ViewerCount{Count: int64(count), LexiconTypeID: "place.stream.livestream#viewerCount"}) 698 if err != nil { 699 apierrors.WriteHTTPInternalServerError(w, "could not marshal view count", err) 700 return 701 } 702 if _, err := w.Write(bs); err != nil { 703 log.Error(ctx, "error writing response", "error", err) 704 } 705 } 706} 707 708func (a *StreamplaceAPI) HandleBlueskyResolve(ctx context.Context) httprouter.Handle { 709 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 710 log.Log(ctx, "got bluesky notification", "params", params) 711 key, err := a.ATSync.SyncBlueskyRepo(ctx, params.ByName("handle"), a.Model) 712 if err != nil { 713 apierrors.WriteHTTPInternalServerError(w, "could not resolve streamplace key", err) 714 return 715 } 716 signingKeys, err := a.Model.GetSigningKeysForRepo(key.DID) 717 if err != nil { 718 apierrors.WriteHTTPInternalServerError(w, "could not get signing keys", err) 719 return 720 } 721 bs, err := json.Marshal(signingKeys) 722 if err != nil { 723 apierrors.WriteHTTPInternalServerError(w, "could not marshal signing keys", err) 724 return 725 } 726 if _, err := w.Write(bs); err != nil { 727 log.Error(ctx, "error writing response", "error", err) 728 } 729 } 730} 731 732type ChatResponse struct { 733 Post *bsky.FeedPost `json:"post"` 734 Repo *model.Repo `json:"repo"` 735 CID string `json:"cid"` 736} 737 738func (a *StreamplaceAPI) HandleChat(ctx context.Context) httprouter.Handle { 739 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 740 user := params.ByName("repoDID") 741 if user == "" { 742 apierrors.WriteHTTPBadRequest(w, "user required", nil) 743 return 744 } 745 repoDID, err := a.NormalizeUser(ctx, user) 746 if err != nil { 747 apierrors.WriteHTTPNotFound(w, "user not found", err) 748 return 749 } 750 replies, err := a.Model.GetReplies(repoDID) 751 if err != nil { 752 apierrors.WriteHTTPInternalServerError(w, "could not get replies", err) 753 return 754 } 755 bs, err := json.Marshal(replies) 756 if err != nil { 757 apierrors.WriteHTTPInternalServerError(w, "could not marshal replies", err) 758 return 759 } 760 w.Header().Set("Content-Type", "application/json") 761 if _, err := w.Write(bs); err != nil { 762 log.Error(ctx, "error writing response", "error", err) 763 } 764 } 765} 766 767func (a *StreamplaceAPI) HandleLivestream(ctx context.Context) httprouter.Handle { 768 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 769 user := params.ByName("repoDID") 770 if user == "" { 771 apierrors.WriteHTTPBadRequest(w, "user required", nil) 772 return 773 } 774 repoDID, err := a.NormalizeUser(ctx, user) 775 if err != nil { 776 apierrors.WriteHTTPNotFound(w, "user not found", err) 777 return 778 } 779 livestream, err := a.Model.GetLatestLivestreamForRepo(repoDID) 780 if err != nil { 781 apierrors.WriteHTTPInternalServerError(w, "could not get livestream", err) 782 return 783 } 784 785 doc, err := livestream.ToLivestreamView() 786 if err != nil { 787 apierrors.WriteHTTPInternalServerError(w, "could not marshal livestream", err) 788 return 789 } 790 791 if livestream == nil { 792 apierrors.WriteHTTPNotFound(w, "no livestream found", nil) 793 return 794 } 795 796 bs, err := json.Marshal(doc) 797 if err != nil { 798 apierrors.WriteHTTPInternalServerError(w, "could not marshal livestream", err) 799 return 800 } 801 w.Header().Set("Content-Type", "application/json") 802 if _, err := w.Write(bs); err != nil { 803 log.Error(ctx, "error writing response", "error", err) 804 } 805 } 806} 807 808func (a *StreamplaceAPI) RateLimitMiddleware(ctx context.Context) func(http.Handler) http.Handler { 809 return func(next http.Handler) http.Handler { 810 return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { 811 ip, _, err := net.SplitHostPort(req.RemoteAddr) 812 if err != nil { 813 ip = req.RemoteAddr 814 } 815 816 if a.CLI.RateLimitPerSecond > 0 { 817 limiter := a.getLimiter(ip) 818 819 if !limiter.Allow() { 820 log.Warn(ctx, "rate limit exceeded", "ip", ip, "path", req.URL.Path) 821 apierrors.WriteHTTPTooManyRequests(w, "rate limit exceeded") 822 return 823 } 824 } 825 826 next.ServeHTTP(w, req) 827 }) 828 } 829} 830 831// helper for getting a listener from a systemd file descriptor 832func getListenerFromFD(fdName string) (net.Listener, error) { 833 log.Debug(context.TODO(), "getting listener from fd", "fdName", fdName, "LISTEN_PID", os.Getenv("LISTEN_PID"), "LISTEN_FDNAMES", os.Getenv("LISTEN_FDNAMES")) 834 if os.Getenv("LISTEN_PID") == strconv.Itoa(os.Getpid()) { 835 names := strings.Split(os.Getenv("LISTEN_FDNAMES"), ":") 836 for i, name := range names { 837 if name == fdName { 838 log.Warn(context.TODO(), "using systemd file descriptor", "fdName", fdName, "fdIndex", i+3) 839 f1 := os.NewFile(uintptr(i+3), fdName) 840 return net.FileListener(f1) 841 } 842 } 843 } 844 return nil, nil 845} 846 847func (a *StreamplaceAPI) ServeHTTP(ctx context.Context) error { 848 handler, err := a.Handler(ctx) 849 if err != nil { 850 return err 851 } 852 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error { 853 ln, err := getListenerFromFD("http") 854 if err != nil { 855 return err 856 } 857 if ln == nil { 858 ln, err = net.Listen("tcp", a.CLI.HTTPAddr) 859 if err != nil { 860 return err 861 } 862 } else { 863 log.Warn(ctx, "api server listening for http over systemd socket", "addr", ln.Addr()) 864 } 865 log.Log(ctx, "http server starting", "addr", ln.Addr()) 866 return s.Serve(ln) 867 }) 868} 869 870func (a *StreamplaceAPI) ServeHTTPRedirect(ctx context.Context) error { 871 handler, err := a.RedirectHandler(ctx) 872 if err != nil { 873 return err 874 } 875 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error { 876 ln, err := getListenerFromFD("http") 877 if err != nil { 878 return err 879 } 880 if ln == nil { 881 ln, err = net.Listen("tcp", a.CLI.HTTPAddr) 882 if err != nil { 883 return err 884 } 885 } else { 886 log.Warn(ctx, "http tls redirect server listening for http over systemd socket", "addr", ln.Addr()) 887 } 888 log.Log(ctx, "http tls redirect server starting", "addr", ln.Addr()) 889 return s.Serve(ln) 890 }) 891} 892 893func (a *StreamplaceAPI) ServeHTTPS(ctx context.Context) error { 894 handler, err := a.Handler(ctx) 895 if err != nil { 896 return err 897 } 898 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error { 899 ln, err := getListenerFromFD("https") 900 if err != nil { 901 return err 902 } 903 if ln == nil { 904 ln, err = net.Listen("tcp", a.CLI.HTTPSAddr) 905 if err != nil { 906 return err 907 } 908 } else { 909 // tell the redirect handler we're using systemd and they should go to 443 910 port443 := 443 911 a.HTTPRedirectTLSPort = &port443 912 log.Warn(ctx, "https server listening for https over systemd socket", "addr", ln.Addr()) 913 } 914 log.Log(ctx, "https server starting", 915 "addr", ln.Addr(), 916 "certPath", a.CLI.TLSCertPath, 917 "keyPath", a.CLI.TLSKeyPath, 918 ) 919 return s.ServeTLS(ln, a.CLI.TLSCertPath, a.CLI.TLSKeyPath) 920 }) 921} 922 923func (a *StreamplaceAPI) ServerWithShutdown(ctx context.Context, handler http.Handler, serve func(*http.Server) error) error { 924 ctx, cancel := context.WithCancel(ctx) 925 handler = gziphandler.GzipHandler(handler) 926 server := http.Server{Handler: handler} 927 var serveErr error 928 go func() { 929 serveErr = serve(&server) 930 cancel() 931 }() 932 <-ctx.Done() 933 if serveErr != nil { 934 return fmt.Errorf("error in http server: %w", serveErr) 935 } 936 937 ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second) 938 defer cancel() 939 return server.Shutdown(ctx) 940} 941 942func (a *StreamplaceAPI) HandleHealthz(ctx context.Context) http.HandlerFunc { 943 return func(w http.ResponseWriter, req *http.Request) { 944 w.WriteHeader(200) 945 } 946} 947 948func (a *StreamplaceAPI) getLimiter(ip string) *rate.Limiter { 949 a.limitersMu.Lock() 950 defer a.limitersMu.Unlock() 951 952 limiter, exists := a.limiters[ip] 953 if !exists { 954 limiter = rate.NewLimiter(rate.Limit(a.CLI.RateLimitPerSecond), a.CLI.RateLimitBurst) 955 a.limiters[ip] = limiter 956 } 957 958 return limiter 959} 960 961func (a *StreamplaceAPI) HandleClip(ctx context.Context) httprouter.Handle { 962 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 963 user := params.ByName("user") 964 file := params.ByName("file") 965 if user == "" || file == "" { 966 apierrors.WriteHTTPBadRequest(w, "user and file required", nil) 967 return 968 } 969 user, err := a.NormalizeUser(ctx, user) 970 if err != nil { 971 apierrors.WriteHTTPNotFound(w, "user not found", err) 972 return 973 } 974 fPath := []string{user, "clips", file} 975 exists, err := a.CLI.DataFileExists(fPath) 976 if err != nil { 977 apierrors.WriteHTTPInternalServerError(w, "could not check if file exists", err) 978 return 979 } 980 if !exists { 981 apierrors.WriteHTTPNotFound(w, "file not found", nil) 982 return 983 } 984 fd, err := os.Open(a.CLI.DataFilePath(fPath)) 985 if err != nil { 986 apierrors.WriteHTTPInternalServerError(w, "could not open file", err) 987 return 988 } 989 defer fd.Close() 990 w.Header().Set("Content-Type", "video/mp4") 991 if _, err := io.Copy(w, fd); err != nil { 992 log.Error(ctx, "error writing response", "error", err) 993 } 994 } 995} 996 997func NewWebsocketTracker(maxConns int) *WebsocketTracker { 998 return &WebsocketTracker{ 999 connections: make(map[string]int), 1000 maxConnsPerIP: maxConns, 1001 } 1002} 1003 1004func (t *WebsocketTracker) AddConnection(ip string) bool { 1005 t.mu.Lock() 1006 defer t.mu.Unlock() 1007 1008 count := t.connections[ip] 1009 1010 if count >= t.maxConnsPerIP { 1011 return false 1012 } 1013 1014 t.connections[ip] = count + 1 1015 return true 1016} 1017 1018func (t *WebsocketTracker) RemoveConnection(ip string) { 1019 t.mu.Lock() 1020 defer t.mu.Unlock() 1021 1022 count := t.connections[ip] 1023 if count > 0 { 1024 t.connections[ip] = count - 1 1025 } 1026 1027 if t.connections[ip] == 0 { 1028 delete(t.connections, ip) 1029 } 1030}