Live video on the AT Protocol
at natb/s3-ui 947 lines 30 kB view raw
1package api 2 3import ( 4 "context" 5 "encoding/json" 6 "errors" 7 "fmt" 8 "io" 9 "log/slog" 10 "net" 11 "net/http" 12 "net/http/httputil" 13 "net/url" 14 "os" 15 "strconv" 16 "strings" 17 "sync" 18 "time" 19 20 "github.com/NYTimes/gziphandler" 21 "github.com/bluesky-social/indigo/api/bsky" 22 "github.com/google/uuid" 23 "github.com/julienschmidt/httprouter" 24 "github.com/rs/cors" 25 sloghttp "github.com/samber/slog-http" 26 "golang.org/x/time/rate" 27 28 "github.com/streamplace/oatproxy/pkg/oatproxy" 29 "stream.place/streamplace/js/app" 30 "stream.place/streamplace/pkg/atproto" 31 "stream.place/streamplace/pkg/bus" 32 "stream.place/streamplace/pkg/config" 33 "stream.place/streamplace/pkg/crypto/signers/eip712" 34 "stream.place/streamplace/pkg/director" 35 apierrors "stream.place/streamplace/pkg/errors" 36 "stream.place/streamplace/pkg/linking" 37 "stream.place/streamplace/pkg/log" 38 "stream.place/streamplace/pkg/media" 39 "stream.place/streamplace/pkg/mist/mistconfig" 40 "stream.place/streamplace/pkg/model" 41 "stream.place/streamplace/pkg/notifications" 42 "stream.place/streamplace/pkg/spmetrics" 43 "stream.place/streamplace/pkg/spxrpc" 44 "stream.place/streamplace/pkg/statedb" 45 "stream.place/streamplace/pkg/streamplace" 46 47 metrics "github.com/slok/go-http-metrics/metrics/prometheus" 48 "github.com/slok/go-http-metrics/middleware" 49 echomiddleware "github.com/slok/go-http-metrics/middleware/echo" 50 httproutermiddleware "github.com/slok/go-http-metrics/middleware/httprouter" 51 middlewarestd "github.com/slok/go-http-metrics/middleware/std" 52) 53 54type StreamplaceAPI struct { 55 CLI *config.CLI 56 Model model.Model 57 StatefulDB *statedb.StatefulDB 58 Updater *Updater 59 Signer *eip712.EIP712Signer 60 Mimes map[string]string 61 FirebaseNotifier notifications.FirebaseNotifier 62 MediaManager *media.MediaManager 63 MediaSigner media.MediaSigner 64 // not thread-safe yet 65 Aliases map[string]string 66 Bus *bus.Bus 67 ATSync *atproto.ATProtoSynchronizer 68 Director *director.Director 69 70 connTracker *WebsocketTracker 71 72 limiters map[string]*rate.Limiter 73 limitersMu sync.Mutex 74 SignerCache map[string]media.MediaSigner 75 SignerCacheMu sync.Mutex 76 op *oatproxy.OATProxy 77 78 // override tls port for http redirect server if we're using systemd file descriptors 79 HTTPRedirectTLSPort *int 80 sessions map[string]map[string]time.Time 81 sessionsLock sync.RWMutex 82 83 rtmpSessions map[string]*media.RTMPSession 84 rtmpSessionsLock sync.Mutex 85 rtmpInternalPlaybackAddr string 86} 87 88type WebsocketTracker struct { 89 connections map[string]int 90 maxConnsPerIP int 91 mu sync.RWMutex 92} 93 94func MakeStreamplaceAPI(cli *config.CLI, mod model.Model, statefulDB *statedb.StatefulDB, noter notifications.FirebaseNotifier, mm *media.MediaManager, ms media.MediaSigner, bus *bus.Bus, atsync *atproto.ATProtoSynchronizer, d *director.Director, op *oatproxy.OATProxy) (*StreamplaceAPI, error) { 95 updater, err := PrepareUpdater(cli) 96 if err != nil { 97 return nil, err 98 } 99 a := &StreamplaceAPI{CLI: cli, 100 Model: mod, 101 StatefulDB: statefulDB, 102 Updater: updater, 103 FirebaseNotifier: noter, 104 MediaManager: mm, 105 MediaSigner: ms, 106 Aliases: map[string]string{}, 107 Bus: bus, 108 ATSync: atsync, 109 Director: d, 110 connTracker: NewWebsocketTracker(cli.RateLimitWebsocket), 111 limiters: make(map[string]*rate.Limiter), 112 SignerCache: make(map[string]media.MediaSigner), 113 op: op, 114 sessions: make(map[string]map[string]time.Time), 115 sessionsLock: sync.RWMutex{}, 116 rtmpSessions: make(map[string]*media.RTMPSession), 117 rtmpSessionsLock: sync.Mutex{}, 118 } 119 a.Mimes, err = updater.GetMimes() 120 if err != nil { 121 return nil, err 122 } 123 return a, nil 124} 125 126type AppHostingFS struct { 127 http.FileSystem 128} 129 130var ErrorIndex = errors.New("not found, use index.html") 131 132func (fs AppHostingFS) Open(name string) (http.File, error) { 133 file, err1 := fs.FileSystem.Open(name) 134 if err1 == nil { 135 return file, nil 136 } 137 return nil, ErrorIndex 138} 139 140// api/playback/iame.li/webrtc?rendition=source 141// api/playback/iame.li/stream.mp4?rendition=source 142// api/playback/iame.li/stream.webm?rendition=source 143// api/playback/iame.li/hls/index.m3u8 144// api/playback/iame.li/hls/source/stream.m3u8 145// api/playback/iame.li/hls/source/000000000000.ts 146 147func (a *StreamplaceAPI) Handler(ctx context.Context) (http.Handler, error) { 148 149 mdlw := middleware.New(middleware.Config{ 150 Recorder: metrics.NewRecorder(metrics.Config{}), 151 }) 152 var xrpc http.Handler 153 xrpc, err := spxrpc.NewServer(ctx, a.CLI, a.Model, a.StatefulDB, a.op, mdlw, a.ATSync, a.Bus) 154 if err != nil { 155 return nil, err 156 } 157 router := httprouter.New() 158 159 // Create our middleware factory with the default settings. 160 161 a.op.Echo.Use(echomiddleware.Handler("", mdlw)) 162 163 // r.GET("/test/:id", httproutermiddleware.Handler("/test/:id", h1, mdlw)) 164 165 addHandle := func(router *httprouter.Router, method, path string, handler httprouter.Handle) { 166 router.Handle(method, path, httproutermiddleware.Handler(path, handler, mdlw)) 167 } 168 addFunc := func(router *httprouter.Router, method, path string, handler http.HandlerFunc) { 169 router.Handler(method, path, middlewarestd.Handler(path, mdlw, handler)) 170 } 171 172 router.Handler("GET", "/oauth/*anything", a.op.Handler()) 173 router.Handler("POST", "/oauth/*anything", a.op.Handler()) 174 router.Handler("GET", "/.well-known/oauth-authorization-server", a.op.Handler()) 175 router.Handler("GET", "/.well-known/oauth-protected-resource", a.op.Handler()) 176 router.Handler("GET", "/.well-known/apple-app-site-association", a.HandleAppleAppSiteAssociation(ctx)) 177 router.Handler("GET", "/.well-known/assetlinks.json", a.HandleAndroidAssetLinks(ctx)) 178 apiRouter := httprouter.New() 179 addFunc(apiRouter, "POST", "/api/notification", a.HandleNotification(ctx)) 180 // old clients 181 addFunc(router, "GET", "/app-updates", a.HandleAppUpdates(ctx)) 182 // new ones 183 addFunc(apiRouter, "GET", "/api/manifest", a.HandleAppUpdates(ctx)) 184 addHandle(apiRouter, "GET", "/api/desktop-updates/:platform/:architecture/:version/:buildTime/:file", a.HandleDesktopUpdates(ctx)) 185 addHandle(apiRouter, "POST", "/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s")) 186 addHandle(apiRouter, "OPTIONS", "/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s")) 187 addHandle(apiRouter, "DELETE", "/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s")) 188 addFunc(apiRouter, "POST", "/api/segment", a.HandleSegment(ctx)) 189 addFunc(apiRouter, "GET", "/api/healthz", a.HandleHealthz(ctx)) 190 addHandle(apiRouter, "GET", "/api/playback/:user/hls/*file", a.HandleHLSPlayback(ctx)) 191 // they're jpegs now 192 addHandle(apiRouter, "GET", "/api/playback/:user/stream.jpg", a.HandleThumbnailPlayback(ctx)) 193 // this one is actually a jpeg (used previously and shouldn't remove for historical reasons) 194 addHandle(apiRouter, "GET", "/api/playback/:user/stream.png", a.HandleThumbnailPlayback(ctx)) 195 addHandle(apiRouter, "GET", "/api/app-return/*anything", a.HandleAppReturn(ctx)) 196 addHandle(apiRouter, "POST", "/api/playback/:user/webrtc", a.HandleWebRTCPlayback(ctx)) 197 addHandle(apiRouter, "POST", "/api/ingest/webrtc", a.HandleWebRTCIngest(ctx)) 198 addHandle(apiRouter, "POST", "/api/ingest/webrtc/:key", a.HandleWebRTCIngest(ctx)) 199 addHandle(apiRouter, "POST", "/api/player-event", a.HandlePlayerEvent(ctx)) 200 addHandle(apiRouter, "GET", "/api/chat/:repoDID", a.HandleChat(ctx)) 201 addHandle(apiRouter, "GET", "/api/websocket/:repoDID", a.HandleWebsocket(ctx)) 202 addHandle(apiRouter, "GET", "/api/livestream/:repoDID", a.HandleLivestream(ctx)) 203 addHandle(apiRouter, "GET", "/api/segment/recent", a.HandleRecentSegments(ctx)) 204 addHandle(apiRouter, "GET", "/api/segment/recent/:repoDID", a.HandleUserRecentSegments(ctx)) 205 addHandle(apiRouter, "GET", "/api/bluesky/resolve/:handle", a.HandleBlueskyResolve(ctx)) 206 addHandle(apiRouter, "GET", "/api/view-count/:user", a.HandleViewCount(ctx)) 207 addHandle(apiRouter, "GET", "/api/clip/:user/:file", a.HandleClip(ctx)) 208 apiRouter.NotFound = a.HandleAPI404(ctx) 209 apiRouterHandler := a.RateLimitMiddleware(ctx)(apiRouter) 210 xrpcHandler := a.RateLimitMiddleware(ctx)(xrpc) 211 router.Handler("GET", "/api/*resource", apiRouterHandler) 212 router.Handler("POST", "/api/*resource", apiRouterHandler) 213 router.Handler("PUT", "/api/*resource", apiRouterHandler) 214 router.Handler("PATCH", "/api/*resource", apiRouterHandler) 215 router.Handler("DELETE", "/api/*resource", apiRouterHandler) 216 router.Handler("GET", "/xrpc/*resource", xrpcHandler) 217 router.Handler("POST", "/xrpc/*resource", xrpcHandler) 218 router.Handler("PUT", "/xrpc/*resource", xrpcHandler) 219 router.Handler("PATCH", "/xrpc/*resource", xrpcHandler) 220 router.Handler("DELETE", "/xrpc/*resource", xrpcHandler) 221 router.GET("/.well-known/did.json", a.HandleDidJSON(ctx)) 222 router.GET("/.well-known/atproto-did", a.HandleAtprotoDID(ctx)) 223 router.GET("/dl/*params", a.HandleAppDownload(ctx)) 224 router.POST("/", a.HandleWebRTCIngest(ctx)) 225 for _, redirect := range a.CLI.Redirects { 226 parts := strings.Split(redirect, ":") 227 if len(parts) != 2 { 228 log.Error(ctx, "invalid redirect", "redirect", redirect) 229 return nil, fmt.Errorf("invalid redirect: %s", redirect) 230 } 231 router.Handle("GET", parts[0], func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) { 232 http.Redirect(w, r, parts[1], http.StatusTemporaryRedirect) 233 }) 234 } 235 if a.CLI.FrontendProxy != "" { 236 u, err := url.Parse(a.CLI.FrontendProxy) 237 if err != nil { 238 return nil, err 239 } 240 log.Warn(ctx, "using frontend proxy instead of bundled frontend", "destination", a.CLI.FrontendProxy) 241 router.NotFound = &httputil.ReverseProxy{ 242 Rewrite: func(r *httputil.ProxyRequest) { 243 // workaround for Expo disliking serving requests from 127.0.0.1 instead of localhost 244 // we need to use 127.0.0.1 because the atproto oauth client requires it 245 r.Out.Header.Set("Origin", u.String()) 246 r.SetURL(u) 247 }, 248 } 249 } else { 250 files, err := app.Files() 251 if err != nil { 252 return nil, err 253 } 254 index, err := files.Open("index.html") 255 if err != nil { 256 return nil, err 257 } 258 bs, err := io.ReadAll(index) 259 if err != nil { 260 return nil, err 261 } 262 linker, err := linking.NewLinker(ctx, bs) 263 if err != nil { 264 return nil, err 265 } 266 linkingHandler, err := a.NotFoundLinkingHandler(ctx, linker) 267 if err != nil { 268 return nil, err 269 } 270 router.NotFound = middlewarestd.Handler("/*static", mdlw, linkingHandler) 271 } 272 // needed because the WebRTC handler issues 405s from / otherwise 273 router.GET("/", func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) { 274 router.NotFound.ServeHTTP(w, r) 275 }) 276 handler := sloghttp.Recovery(router) 277 handler = cors.AllowAll().Handler(handler) 278 handler = sloghttp.New(slog.Default())(handler) 279 handler = a.RateLimitMiddleware(ctx)(handler) 280 281 // this needs to be LAST so nothing else clobbers the context 282 handler = a.ContextMiddleware(ctx)(handler) 283 284 return handler, nil 285} 286func (a *StreamplaceAPI) ContextMiddleware(parentContext context.Context) func(next http.Handler) http.Handler { 287 return func(next http.Handler) http.Handler { 288 return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 289 uuid := uuid.New().String() 290 ctx := log.WithLogValues(parentContext, "requestID", uuid, "method", r.Method, "path", r.URL.Path) 291 r = r.WithContext(ctx) 292 next.ServeHTTP(w, r) 293 }) 294 } 295} 296func copyHeader(dst, src http.Header) { 297 for k, vv := range src { 298 // we'll handle CORS ourselves, thanks 299 if strings.HasPrefix(k, "Access-Control") { 300 continue 301 } 302 for _, v := range vv { 303 dst.Add(k, v) 304 } 305 } 306} 307 308// handler that takes care of static files and otherwise returns the index.html with the correct link card data 309func (a *StreamplaceAPI) NotFoundLinkingHandler(ctx context.Context, linker *linking.Linker) (http.HandlerFunc, error) { 310 files, err := app.Files() 311 if err != nil { 312 return nil, err 313 } 314 fs := AppHostingFS{http.FS(files)} 315 316 fileHandler := a.FileHandler(ctx, http.FileServer(fs)) 317 defaultHandler := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { 318 f := strings.TrimPrefix(req.URL.Path, "/") 319 // under docs we need the index.html suffix due to astro rendering 320 if strings.HasPrefix(req.URL.Path, "/docs") && strings.HasSuffix(req.URL.Path, "/") { 321 f += "index.html" 322 } 323 _, err := fs.Open(f) 324 if err == nil { 325 fileHandler.ServeHTTP(w, req) 326 return 327 } 328 if errors.Is(err, ErrorIndex) || f == "" { 329 bs, err := linker.GenerateDefaultCard(ctx, req.URL, a.CLI.SentryDSN) 330 if err != nil { 331 log.Error(ctx, "error generating default card", "error", err) 332 } 333 w.Header().Set("Content-Type", "text/html") 334 if _, err := w.Write(bs); err != nil { 335 log.Error(ctx, "error writing response", "error", err) 336 } 337 } else { 338 log.Warn(ctx, "error opening file", "error", err) 339 apierrors.WriteHTTPInternalServerError(w, "file not found", err) 340 } 341 }) 342 return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { 343 proto := "http" 344 if req.TLS != nil { 345 proto = "https" 346 } 347 fwProto := req.Header.Get("x-forwarded-proto") 348 if fwProto != "" { 349 proto = fwProto 350 } 351 req.URL.Host = req.Host 352 req.URL.Scheme = proto 353 maybeHandle := strings.TrimPrefix(req.URL.Path, "/") 354 // quick check for things that aren't valid handles/dids 355 if strings.ContainsAny(maybeHandle, "/_") { 356 defaultHandler.ServeHTTP(w, req) 357 return 358 } 359 repo, err := a.Model.GetRepoByHandleOrDID(maybeHandle) 360 if err != nil || repo == nil { 361 log.Error(ctx, "no repo found", "maybeHandle", maybeHandle) 362 defaultHandler.ServeHTTP(w, req) 363 return 364 } 365 ls, err := a.Model.GetLatestLivestreamForRepo(repo.DID) 366 if err != nil || ls == nil { 367 log.Error(ctx, "no livestream found", "repoDID", repo.DID) 368 defaultHandler.ServeHTTP(w, req) 369 return 370 } 371 lsv, err := ls.ToLivestreamView() 372 if err != nil || lsv == nil { 373 log.Error(ctx, "no livestream view found", "repoDID", repo.DID) 374 defaultHandler.ServeHTTP(w, req) 375 return 376 } 377 bs, err := linker.GenerateStreamerCard(ctx, req.URL, lsv, a.CLI.SentryDSN) 378 if err != nil { 379 log.Error(ctx, "error generating html", "error", err) 380 defaultHandler.ServeHTTP(w, req) 381 return 382 } 383 w.Header().Set("Content-Type", "text/html") 384 if _, err := w.Write(bs); err != nil { 385 log.Error(ctx, "error writing response", "error", err) 386 } 387 }), nil 388} 389 390func (a *StreamplaceAPI) MistProxyHandler(ctx context.Context, tmpl string) httprouter.Handle { 391 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 392 if !a.CLI.HasMist() { 393 apierrors.WriteHTTPNotImplemented(w, "Playback only on the Linux version for now", nil) 394 return 395 } 396 stream := params.ByName("stream") 397 if stream == "" { 398 apierrors.WriteHTTPBadRequest(w, "missing stream in request", nil) 399 return 400 } 401 402 fullstream := fmt.Sprintf("%s+%s", mistconfig.StreamName, stream) 403 prefix := fmt.Sprintf(tmpl, fullstream) 404 resource := params.ByName("resource") 405 406 // path := strings.TrimPrefix(req.URL.EscapedPath(), "/api") 407 408 client := &http.Client{} 409 req.URL = &url.URL{ 410 Scheme: "http", 411 Host: fmt.Sprintf("127.0.0.1:%d", a.CLI.MistHTTPPort), 412 Path: fmt.Sprintf("%s%s", prefix, resource), 413 RawQuery: req.URL.RawQuery, 414 } 415 416 //http: Request.RequestURI can't be set in client requests. 417 //http://golang.org/src/pkg/net/http/client.go 418 req.RequestURI = "" 419 420 resp, err := client.Do(req) 421 if err != nil { 422 apierrors.WriteHTTPInternalServerError(w, "error connecting to mist", err) 423 return 424 } 425 defer resp.Body.Close() 426 427 copyHeader(w.Header(), resp.Header) 428 w.WriteHeader(resp.StatusCode) 429 if _, err := io.Copy(w, resp.Body); err != nil { 430 log.Error(ctx, "error writing response", "error", err) 431 } 432 } 433} 434 435func (a *StreamplaceAPI) FileHandler(ctx context.Context, fs http.Handler) http.HandlerFunc { 436 return func(w http.ResponseWriter, req *http.Request) { 437 noslash := req.URL.Path[1:] 438 ct, ok := a.Mimes[noslash] 439 if ok { 440 w.Header().Set("content-type", ct) 441 } 442 fs.ServeHTTP(w, req) 443 } 444} 445 446func (a *StreamplaceAPI) RedirectHandler(ctx context.Context) (http.Handler, error) { 447 var tlsPort string 448 var err error 449 if a.HTTPRedirectTLSPort != nil { 450 tlsPort = fmt.Sprintf("%d", *a.HTTPRedirectTLSPort) 451 } else { 452 _, tlsPort, err = net.SplitHostPort(a.CLI.HTTPSAddr) 453 if err != nil { 454 return nil, err 455 } 456 } 457 handleRedirect := func(w http.ResponseWriter, req *http.Request) { 458 host, _, err := net.SplitHostPort(req.Host) 459 if err != nil { 460 host = req.Host 461 } 462 u := req.URL 463 if tlsPort == "443" { 464 u.Host = host 465 } else { 466 u.Host = net.JoinHostPort(host, tlsPort) 467 } 468 u.Scheme = "https" 469 http.Redirect(w, req, u.String(), http.StatusTemporaryRedirect) 470 } 471 mux := http.NewServeMux() 472 mux.HandleFunc("/", handleRedirect) 473 return mux, nil 474} 475 476type NotificationPayload struct { 477 Token string `json:"token"` 478 RepoDID string `json:"repoDID"` 479} 480 481func (a *StreamplaceAPI) HandleAPI404(ctx context.Context) http.HandlerFunc { 482 return func(w http.ResponseWriter, req *http.Request) { 483 w.WriteHeader(404) 484 } 485} 486 487func (a *StreamplaceAPI) HandleNotification(ctx context.Context) http.HandlerFunc { 488 return func(w http.ResponseWriter, req *http.Request) { 489 payload, err := io.ReadAll(req.Body) 490 if err != nil { 491 log.Log(ctx, "error reading notification create", "error", err) 492 w.WriteHeader(400) 493 return 494 } 495 n := NotificationPayload{} 496 err = json.Unmarshal(payload, &n) 497 if err != nil { 498 log.Log(ctx, "error unmarshalling notification create", "error", err) 499 w.WriteHeader(400) 500 return 501 } 502 err = a.StatefulDB.CreateNotification(n.Token, n.RepoDID) 503 if err != nil { 504 log.Log(ctx, "error creating notification", "error", err) 505 w.WriteHeader(400) 506 return 507 } 508 log.Log(ctx, "successfully created notification", "token", n.Token) 509 w.WriteHeader(200) 510 if n.RepoDID != "" { 511 go func() { 512 _, err := a.ATSync.SyncBlueskyRepo(ctx, n.RepoDID, a.Model) 513 if err != nil { 514 log.Error(ctx, "error syncing bluesky repo after notification creation", "error", err) 515 } 516 }() 517 } 518 } 519} 520 521func (a *StreamplaceAPI) HandleSegment(ctx context.Context) http.HandlerFunc { 522 return func(w http.ResponseWriter, req *http.Request) { 523 err := a.MediaManager.ValidateMP4(ctx, req.Body, false) 524 if err != nil { 525 apierrors.WriteHTTPBadRequest(w, "could not ingest segment", err) 526 return 527 } 528 w.WriteHeader(200) 529 } 530} 531 532func (a *StreamplaceAPI) HandlePlayerEvent(ctx context.Context) httprouter.Handle { 533 return func(w http.ResponseWriter, req *http.Request, p httprouter.Params) { 534 var event model.PlayerEventAPI 535 if err := json.NewDecoder(req.Body).Decode(&event); err != nil { 536 apierrors.WriteHTTPBadRequest(w, "could not decode JSON body", err) 537 return 538 } 539 err := a.Model.CreatePlayerEvent(event) 540 if err != nil { 541 apierrors.WriteHTTPBadRequest(w, "could not create player event", err) 542 return 543 } 544 w.WriteHeader(201) 545 } 546} 547 548func (a *StreamplaceAPI) HandleRecentSegments(ctx context.Context) httprouter.Handle { 549 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 550 segs, err := a.Model.MostRecentSegments() 551 if err != nil { 552 apierrors.WriteHTTPInternalServerError(w, "could not get segments", err) 553 return 554 } 555 bs, err := json.Marshal(segs) 556 if err != nil { 557 apierrors.WriteHTTPInternalServerError(w, "could not marshal segments", err) 558 return 559 } 560 w.Header().Add("Content-Type", "application/json") 561 if _, err := w.Write(bs); err != nil { 562 log.Error(ctx, "error writing response", "error", err) 563 } 564 } 565} 566 567func (a *StreamplaceAPI) HandleUserRecentSegments(ctx context.Context) httprouter.Handle { 568 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 569 user := params.ByName("repoDID") 570 if user == "" { 571 apierrors.WriteHTTPBadRequest(w, "user required", nil) 572 return 573 } 574 user, err := a.NormalizeUser(ctx, user) 575 if err != nil { 576 apierrors.WriteHTTPNotFound(w, "user not found", err) 577 return 578 } 579 seg, err := a.Model.LatestSegmentForUser(user) 580 if err != nil { 581 apierrors.WriteHTTPInternalServerError(w, "could not get segments", err) 582 return 583 } 584 streamplaceSeg, err := seg.ToStreamplaceSegment() 585 if err != nil { 586 apierrors.WriteHTTPInternalServerError(w, "could not convert segment to streamplace segment", err) 587 return 588 } 589 bs, err := json.Marshal(streamplaceSeg) 590 if err != nil { 591 apierrors.WriteHTTPInternalServerError(w, "could not marshal segments", err) 592 return 593 } 594 w.Header().Add("Content-Type", "application/json") 595 if _, err := w.Write(bs); err != nil { 596 log.Error(ctx, "error writing response", "error", err) 597 } 598 } 599} 600 601func (a *StreamplaceAPI) HandleViewCount(ctx context.Context) httprouter.Handle { 602 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 603 user := params.ByName("user") 604 if user == "" { 605 apierrors.WriteHTTPBadRequest(w, "user required", nil) 606 return 607 } 608 user, err := a.NormalizeUser(ctx, user) 609 if err != nil { 610 apierrors.WriteHTTPNotFound(w, "user not found", err) 611 return 612 } 613 count := spmetrics.GetViewCount(user) 614 bs, err := json.Marshal(streamplace.Livestream_ViewerCount{Count: int64(count), LexiconTypeID: "place.stream.livestream#viewerCount"}) 615 if err != nil { 616 apierrors.WriteHTTPInternalServerError(w, "could not marshal view count", err) 617 return 618 } 619 if _, err := w.Write(bs); err != nil { 620 log.Error(ctx, "error writing response", "error", err) 621 } 622 } 623} 624 625func (a *StreamplaceAPI) HandleBlueskyResolve(ctx context.Context) httprouter.Handle { 626 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 627 log.Log(ctx, "got bluesky notification", "params", params) 628 key, err := a.ATSync.SyncBlueskyRepo(ctx, params.ByName("handle"), a.Model) 629 if err != nil { 630 apierrors.WriteHTTPInternalServerError(w, "could not resolve streamplace key", err) 631 return 632 } 633 signingKeys, err := a.Model.GetSigningKeysForRepo(key.DID) 634 if err != nil { 635 apierrors.WriteHTTPInternalServerError(w, "could not get signing keys", err) 636 return 637 } 638 bs, err := json.Marshal(signingKeys) 639 if err != nil { 640 apierrors.WriteHTTPInternalServerError(w, "could not marshal signing keys", err) 641 return 642 } 643 if _, err := w.Write(bs); err != nil { 644 log.Error(ctx, "error writing response", "error", err) 645 } 646 } 647} 648 649type ChatResponse struct { 650 Post *bsky.FeedPost `json:"post"` 651 Repo *model.Repo `json:"repo"` 652 CID string `json:"cid"` 653} 654 655func (a *StreamplaceAPI) HandleChat(ctx context.Context) httprouter.Handle { 656 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 657 user := params.ByName("repoDID") 658 if user == "" { 659 apierrors.WriteHTTPBadRequest(w, "user required", nil) 660 return 661 } 662 repoDID, err := a.NormalizeUser(ctx, user) 663 if err != nil { 664 apierrors.WriteHTTPNotFound(w, "user not found", err) 665 return 666 } 667 replies, err := a.Model.GetReplies(repoDID) 668 if err != nil { 669 apierrors.WriteHTTPInternalServerError(w, "could not get replies", err) 670 return 671 } 672 bs, err := json.Marshal(replies) 673 if err != nil { 674 apierrors.WriteHTTPInternalServerError(w, "could not marshal replies", err) 675 return 676 } 677 w.Header().Set("Content-Type", "application/json") 678 if _, err := w.Write(bs); err != nil { 679 log.Error(ctx, "error writing response", "error", err) 680 } 681 } 682} 683 684func (a *StreamplaceAPI) HandleLivestream(ctx context.Context) httprouter.Handle { 685 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 686 user := params.ByName("repoDID") 687 if user == "" { 688 apierrors.WriteHTTPBadRequest(w, "user required", nil) 689 return 690 } 691 repoDID, err := a.NormalizeUser(ctx, user) 692 if err != nil { 693 apierrors.WriteHTTPNotFound(w, "user not found", err) 694 return 695 } 696 livestream, err := a.Model.GetLatestLivestreamForRepo(repoDID) 697 if err != nil { 698 apierrors.WriteHTTPInternalServerError(w, "could not get livestream", err) 699 return 700 } 701 702 doc, err := livestream.ToLivestreamView() 703 if err != nil { 704 apierrors.WriteHTTPInternalServerError(w, "could not marshal livestream", err) 705 return 706 } 707 708 if livestream == nil { 709 apierrors.WriteHTTPNotFound(w, "no livestream found", nil) 710 return 711 } 712 713 bs, err := json.Marshal(doc) 714 if err != nil { 715 apierrors.WriteHTTPInternalServerError(w, "could not marshal livestream", err) 716 return 717 } 718 w.Header().Set("Content-Type", "application/json") 719 if _, err := w.Write(bs); err != nil { 720 log.Error(ctx, "error writing response", "error", err) 721 } 722 } 723} 724 725func (a *StreamplaceAPI) RateLimitMiddleware(ctx context.Context) func(http.Handler) http.Handler { 726 return func(next http.Handler) http.Handler { 727 return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { 728 ip, _, err := net.SplitHostPort(req.RemoteAddr) 729 if err != nil { 730 ip = req.RemoteAddr 731 } 732 733 if a.CLI.RateLimitPerSecond > 0 { 734 limiter := a.getLimiter(ip) 735 736 if !limiter.Allow() { 737 log.Warn(ctx, "rate limit exceeded", "ip", ip, "path", req.URL.Path) 738 apierrors.WriteHTTPTooManyRequests(w, "rate limit exceeded") 739 return 740 } 741 } 742 743 next.ServeHTTP(w, req) 744 }) 745 } 746} 747 748// helper for getting a listener from a systemd file descriptor 749func getListenerFromFD(fdName string) (net.Listener, error) { 750 log.Debug(context.TODO(), "getting listener from fd", "fdName", fdName, "LISTEN_PID", os.Getenv("LISTEN_PID"), "LISTEN_FDNAMES", os.Getenv("LISTEN_FDNAMES")) 751 if os.Getenv("LISTEN_PID") == strconv.Itoa(os.Getpid()) { 752 names := strings.Split(os.Getenv("LISTEN_FDNAMES"), ":") 753 for i, name := range names { 754 if name == fdName { 755 log.Warn(context.TODO(), "using systemd file descriptor", "fdName", fdName, "fdIndex", i+3) 756 f1 := os.NewFile(uintptr(i+3), fdName) 757 return net.FileListener(f1) 758 } 759 } 760 } 761 return nil, nil 762} 763 764func (a *StreamplaceAPI) ServeHTTP(ctx context.Context) error { 765 handler, err := a.Handler(ctx) 766 if err != nil { 767 return err 768 } 769 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error { 770 ln, err := getListenerFromFD("http") 771 if err != nil { 772 return err 773 } 774 if ln == nil { 775 ln, err = net.Listen("tcp", a.CLI.HTTPAddr) 776 if err != nil { 777 return err 778 } 779 } else { 780 log.Warn(ctx, "api server listening for http over systemd socket", "addr", ln.Addr()) 781 } 782 log.Log(ctx, "http server starting", "addr", ln.Addr()) 783 return s.Serve(ln) 784 }) 785} 786 787func (a *StreamplaceAPI) ServeHTTPRedirect(ctx context.Context) error { 788 handler, err := a.RedirectHandler(ctx) 789 if err != nil { 790 return err 791 } 792 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error { 793 ln, err := getListenerFromFD("http") 794 if err != nil { 795 return err 796 } 797 if ln == nil { 798 ln, err = net.Listen("tcp", a.CLI.HTTPAddr) 799 if err != nil { 800 return err 801 } 802 } else { 803 log.Warn(ctx, "http tls redirect server listening for http over systemd socket", "addr", ln.Addr()) 804 } 805 log.Log(ctx, "http tls redirect server starting", "addr", ln.Addr()) 806 return s.Serve(ln) 807 }) 808} 809 810func (a *StreamplaceAPI) ServeHTTPS(ctx context.Context) error { 811 handler, err := a.Handler(ctx) 812 if err != nil { 813 return err 814 } 815 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error { 816 ln, err := getListenerFromFD("https") 817 if err != nil { 818 return err 819 } 820 if ln == nil { 821 ln, err = net.Listen("tcp", a.CLI.HTTPSAddr) 822 if err != nil { 823 return err 824 } 825 } else { 826 // tell the redirect handler we're using systemd and they should go to 443 827 port443 := 443 828 a.HTTPRedirectTLSPort = &port443 829 log.Warn(ctx, "https server listening for https over systemd socket", "addr", ln.Addr()) 830 } 831 log.Log(ctx, "https server starting", 832 "addr", ln.Addr(), 833 "certPath", a.CLI.TLSCertPath, 834 "keyPath", a.CLI.TLSKeyPath, 835 ) 836 return s.ServeTLS(ln, a.CLI.TLSCertPath, a.CLI.TLSKeyPath) 837 }) 838} 839 840func (a *StreamplaceAPI) ServerWithShutdown(ctx context.Context, handler http.Handler, serve func(*http.Server) error) error { 841 ctx, cancel := context.WithCancel(ctx) 842 handler = gziphandler.GzipHandler(handler) 843 server := http.Server{Handler: handler} 844 var serveErr error 845 go func() { 846 serveErr = serve(&server) 847 cancel() 848 }() 849 <-ctx.Done() 850 if serveErr != nil { 851 return fmt.Errorf("error in http server: %w", serveErr) 852 } 853 854 ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second) 855 defer cancel() 856 return server.Shutdown(ctx) 857} 858 859func (a *StreamplaceAPI) HandleHealthz(ctx context.Context) http.HandlerFunc { 860 return func(w http.ResponseWriter, req *http.Request) { 861 w.WriteHeader(200) 862 } 863} 864 865func (a *StreamplaceAPI) getLimiter(ip string) *rate.Limiter { 866 a.limitersMu.Lock() 867 defer a.limitersMu.Unlock() 868 869 limiter, exists := a.limiters[ip] 870 if !exists { 871 limiter = rate.NewLimiter(rate.Limit(a.CLI.RateLimitPerSecond), a.CLI.RateLimitBurst) 872 a.limiters[ip] = limiter 873 } 874 875 return limiter 876} 877 878func (a *StreamplaceAPI) HandleClip(ctx context.Context) httprouter.Handle { 879 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 880 user := params.ByName("user") 881 file := params.ByName("file") 882 if user == "" || file == "" { 883 apierrors.WriteHTTPBadRequest(w, "user and file required", nil) 884 return 885 } 886 user, err := a.NormalizeUser(ctx, user) 887 if err != nil { 888 apierrors.WriteHTTPNotFound(w, "user not found", err) 889 return 890 } 891 fPath := []string{user, "clips", file} 892 exists, err := a.CLI.DataFileExists(fPath) 893 if err != nil { 894 apierrors.WriteHTTPInternalServerError(w, "could not check if file exists", err) 895 return 896 } 897 if !exists { 898 apierrors.WriteHTTPNotFound(w, "file not found", nil) 899 return 900 } 901 fd, err := os.Open(a.CLI.DataFilePath(fPath)) 902 if err != nil { 903 apierrors.WriteHTTPInternalServerError(w, "could not open file", err) 904 return 905 } 906 defer fd.Close() 907 w.Header().Set("Content-Type", "video/mp4") 908 if _, err := io.Copy(w, fd); err != nil { 909 log.Error(ctx, "error writing response", "error", err) 910 } 911 } 912} 913 914func NewWebsocketTracker(maxConns int) *WebsocketTracker { 915 return &WebsocketTracker{ 916 connections: make(map[string]int), 917 maxConnsPerIP: maxConns, 918 } 919} 920 921func (t *WebsocketTracker) AddConnection(ip string) bool { 922 t.mu.Lock() 923 defer t.mu.Unlock() 924 925 count := t.connections[ip] 926 927 if count >= t.maxConnsPerIP { 928 return false 929 } 930 931 t.connections[ip] = count + 1 932 return true 933} 934 935func (t *WebsocketTracker) RemoveConnection(ip string) { 936 t.mu.Lock() 937 defer t.mu.Unlock() 938 939 count := t.connections[ip] 940 if count > 0 { 941 t.connections[ip] = count - 1 942 } 943 944 if t.connections[ip] == 0 { 945 delete(t.connections, ip) 946 } 947}