Live video on the AT Protocol
at natb/i18next-cli 941 lines 30 kB view raw
1package api 2 3import ( 4 "context" 5 "encoding/json" 6 "errors" 7 "fmt" 8 "io" 9 "log/slog" 10 "net" 11 "net/http" 12 "net/http/httputil" 13 "net/url" 14 "os" 15 "strconv" 16 "strings" 17 "sync" 18 "time" 19 20 "github.com/NYTimes/gziphandler" 21 "github.com/bluesky-social/indigo/api/bsky" 22 "github.com/google/uuid" 23 "github.com/julienschmidt/httprouter" 24 "github.com/rs/cors" 25 sloghttp "github.com/samber/slog-http" 26 "golang.org/x/time/rate" 27 28 "github.com/streamplace/oatproxy/pkg/oatproxy" 29 "stream.place/streamplace/js/app" 30 "stream.place/streamplace/pkg/atproto" 31 "stream.place/streamplace/pkg/bus" 32 "stream.place/streamplace/pkg/config" 33 "stream.place/streamplace/pkg/crypto/signers/eip712" 34 "stream.place/streamplace/pkg/director" 35 apierrors "stream.place/streamplace/pkg/errors" 36 "stream.place/streamplace/pkg/linking" 37 "stream.place/streamplace/pkg/log" 38 "stream.place/streamplace/pkg/media" 39 "stream.place/streamplace/pkg/mist/mistconfig" 40 "stream.place/streamplace/pkg/model" 41 "stream.place/streamplace/pkg/notifications" 42 "stream.place/streamplace/pkg/spmetrics" 43 "stream.place/streamplace/pkg/spxrpc" 44 "stream.place/streamplace/pkg/statedb" 45 "stream.place/streamplace/pkg/streamplace" 46 47 metrics "github.com/slok/go-http-metrics/metrics/prometheus" 48 "github.com/slok/go-http-metrics/middleware" 49 echomiddleware "github.com/slok/go-http-metrics/middleware/echo" 50 httproutermiddleware "github.com/slok/go-http-metrics/middleware/httprouter" 51 middlewarestd "github.com/slok/go-http-metrics/middleware/std" 52) 53 54type StreamplaceAPI struct { 55 CLI *config.CLI 56 Model model.Model 57 StatefulDB *statedb.StatefulDB 58 Updater *Updater 59 Signer *eip712.EIP712Signer 60 Mimes map[string]string 61 FirebaseNotifier notifications.FirebaseNotifier 62 MediaManager *media.MediaManager 63 MediaSigner media.MediaSigner 64 // not thread-safe yet 65 Aliases map[string]string 66 Bus *bus.Bus 67 ATSync *atproto.ATProtoSynchronizer 68 Director *director.Director 69 70 connTracker *WebsocketTracker 71 72 limiters map[string]*rate.Limiter 73 limitersMu sync.Mutex 74 SignerCache map[string]media.MediaSigner 75 SignerCacheMu sync.Mutex 76 op *oatproxy.OATProxy 77 78 // override tls port for http redirect server if we're using systemd file descriptors 79 HTTPRedirectTLSPort *int 80 sessions map[string]map[string]time.Time 81 sessionsLock sync.RWMutex 82} 83 84type WebsocketTracker struct { 85 connections map[string]int 86 maxConnsPerIP int 87 mu sync.RWMutex 88} 89 90func MakeStreamplaceAPI(cli *config.CLI, mod model.Model, statefulDB *statedb.StatefulDB, noter notifications.FirebaseNotifier, mm *media.MediaManager, ms media.MediaSigner, bus *bus.Bus, atsync *atproto.ATProtoSynchronizer, d *director.Director, op *oatproxy.OATProxy) (*StreamplaceAPI, error) { 91 updater, err := PrepareUpdater(cli) 92 if err != nil { 93 return nil, err 94 } 95 a := &StreamplaceAPI{CLI: cli, 96 Model: mod, 97 StatefulDB: statefulDB, 98 Updater: updater, 99 FirebaseNotifier: noter, 100 MediaManager: mm, 101 MediaSigner: ms, 102 Aliases: map[string]string{}, 103 Bus: bus, 104 ATSync: atsync, 105 Director: d, 106 connTracker: NewWebsocketTracker(cli.RateLimitWebsocket), 107 limiters: make(map[string]*rate.Limiter), 108 SignerCache: make(map[string]media.MediaSigner), 109 op: op, 110 sessions: make(map[string]map[string]time.Time), 111 sessionsLock: sync.RWMutex{}, 112 } 113 a.Mimes, err = updater.GetMimes() 114 if err != nil { 115 return nil, err 116 } 117 return a, nil 118} 119 120type AppHostingFS struct { 121 http.FileSystem 122} 123 124var ErrorIndex = errors.New("not found, use index.html") 125 126func (fs AppHostingFS) Open(name string) (http.File, error) { 127 file, err1 := fs.FileSystem.Open(name) 128 if err1 == nil { 129 return file, nil 130 } 131 return nil, ErrorIndex 132} 133 134// api/playback/iame.li/webrtc?rendition=source 135// api/playback/iame.li/stream.mp4?rendition=source 136// api/playback/iame.li/stream.webm?rendition=source 137// api/playback/iame.li/hls/index.m3u8 138// api/playback/iame.li/hls/source/stream.m3u8 139// api/playback/iame.li/hls/source/000000000000.ts 140 141func (a *StreamplaceAPI) Handler(ctx context.Context) (http.Handler, error) { 142 143 mdlw := middleware.New(middleware.Config{ 144 Recorder: metrics.NewRecorder(metrics.Config{}), 145 }) 146 var xrpc http.Handler 147 xrpc, err := spxrpc.NewServer(ctx, a.CLI, a.Model, a.StatefulDB, a.op, mdlw, a.ATSync, a.Bus) 148 if err != nil { 149 return nil, err 150 } 151 router := httprouter.New() 152 153 // Create our middleware factory with the default settings. 154 155 a.op.Echo.Use(echomiddleware.Handler("", mdlw)) 156 157 // r.GET("/test/:id", httproutermiddleware.Handler("/test/:id", h1, mdlw)) 158 159 addHandle := func(router *httprouter.Router, method, path string, handler httprouter.Handle) { 160 router.Handle(method, path, httproutermiddleware.Handler(path, handler, mdlw)) 161 } 162 addFunc := func(router *httprouter.Router, method, path string, handler http.HandlerFunc) { 163 router.Handler(method, path, middlewarestd.Handler(path, mdlw, handler)) 164 } 165 166 router.Handler("GET", "/oauth/*anything", a.op.Handler()) 167 router.Handler("POST", "/oauth/*anything", a.op.Handler()) 168 router.Handler("GET", "/.well-known/oauth-authorization-server", a.op.Handler()) 169 router.Handler("GET", "/.well-known/oauth-protected-resource", a.op.Handler()) 170 router.Handler("GET", "/.well-known/apple-app-site-association", a.HandleAppleAppSiteAssociation(ctx)) 171 router.Handler("GET", "/.well-known/assetlinks.json", a.HandleAndroidAssetLinks(ctx)) 172 apiRouter := httprouter.New() 173 addFunc(apiRouter, "POST", "/api/notification", a.HandleNotification(ctx)) 174 // old clients 175 addFunc(router, "GET", "/app-updates", a.HandleAppUpdates(ctx)) 176 // new ones 177 addFunc(apiRouter, "GET", "/api/manifest", a.HandleAppUpdates(ctx)) 178 addHandle(apiRouter, "GET", "/api/desktop-updates/:platform/:architecture/:version/:buildTime/:file", a.HandleDesktopUpdates(ctx)) 179 addHandle(apiRouter, "POST", "/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s")) 180 addHandle(apiRouter, "OPTIONS", "/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s")) 181 addHandle(apiRouter, "DELETE", "/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s")) 182 addFunc(apiRouter, "POST", "/api/segment", a.HandleSegment(ctx)) 183 addFunc(apiRouter, "GET", "/api/healthz", a.HandleHealthz(ctx)) 184 addHandle(apiRouter, "GET", "/api/playback/:user/hls/*file", a.HandleHLSPlayback(ctx)) 185 // they're jpegs now 186 addHandle(apiRouter, "GET", "/api/playback/:user/stream.jpg", a.HandleThumbnailPlayback(ctx)) 187 // this one is actually a jpeg (used previously and shouldn't remove for historical reasons) 188 addHandle(apiRouter, "GET", "/api/playback/:user/stream.png", a.HandleThumbnailPlayback(ctx)) 189 addHandle(apiRouter, "GET", "/api/app-return/*anything", a.HandleAppReturn(ctx)) 190 addHandle(apiRouter, "POST", "/api/playback/:user/webrtc", a.HandleWebRTCPlayback(ctx)) 191 addHandle(apiRouter, "POST", "/api/ingest/webrtc", a.HandleWebRTCIngest(ctx)) 192 addHandle(apiRouter, "POST", "/api/ingest/webrtc/:key", a.HandleWebRTCIngest(ctx)) 193 addHandle(apiRouter, "POST", "/api/player-event", a.HandlePlayerEvent(ctx)) 194 addHandle(apiRouter, "GET", "/api/chat/:repoDID", a.HandleChat(ctx)) 195 addHandle(apiRouter, "GET", "/api/websocket/:repoDID", a.HandleWebsocket(ctx)) 196 addHandle(apiRouter, "GET", "/api/livestream/:repoDID", a.HandleLivestream(ctx)) 197 addHandle(apiRouter, "GET", "/api/segment/recent", a.HandleRecentSegments(ctx)) 198 addHandle(apiRouter, "GET", "/api/segment/recent/:repoDID", a.HandleUserRecentSegments(ctx)) 199 addHandle(apiRouter, "GET", "/api/bluesky/resolve/:handle", a.HandleBlueskyResolve(ctx)) 200 addHandle(apiRouter, "GET", "/api/view-count/:user", a.HandleViewCount(ctx)) 201 addHandle(apiRouter, "GET", "/api/clip/:user/:file", a.HandleClip(ctx)) 202 apiRouter.NotFound = a.HandleAPI404(ctx) 203 apiRouterHandler := a.RateLimitMiddleware(ctx)(apiRouter) 204 xrpcHandler := a.RateLimitMiddleware(ctx)(xrpc) 205 router.Handler("GET", "/api/*resource", apiRouterHandler) 206 router.Handler("POST", "/api/*resource", apiRouterHandler) 207 router.Handler("PUT", "/api/*resource", apiRouterHandler) 208 router.Handler("PATCH", "/api/*resource", apiRouterHandler) 209 router.Handler("DELETE", "/api/*resource", apiRouterHandler) 210 router.Handler("GET", "/xrpc/*resource", xrpcHandler) 211 router.Handler("POST", "/xrpc/*resource", xrpcHandler) 212 router.Handler("PUT", "/xrpc/*resource", xrpcHandler) 213 router.Handler("PATCH", "/xrpc/*resource", xrpcHandler) 214 router.Handler("DELETE", "/xrpc/*resource", xrpcHandler) 215 router.GET("/.well-known/did.json", a.HandleDidJSON(ctx)) 216 router.GET("/.well-known/atproto-did", a.HandleAtprotoDID(ctx)) 217 router.GET("/dl/*params", a.HandleAppDownload(ctx)) 218 router.POST("/", a.HandleWebRTCIngest(ctx)) 219 for _, redirect := range a.CLI.Redirects { 220 parts := strings.Split(redirect, ":") 221 if len(parts) != 2 { 222 log.Error(ctx, "invalid redirect", "redirect", redirect) 223 return nil, fmt.Errorf("invalid redirect: %s", redirect) 224 } 225 router.Handle("GET", parts[0], func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) { 226 http.Redirect(w, r, parts[1], http.StatusTemporaryRedirect) 227 }) 228 } 229 if a.CLI.FrontendProxy != "" { 230 u, err := url.Parse(a.CLI.FrontendProxy) 231 if err != nil { 232 return nil, err 233 } 234 log.Warn(ctx, "using frontend proxy instead of bundled frontend", "destination", a.CLI.FrontendProxy) 235 router.NotFound = &httputil.ReverseProxy{ 236 Rewrite: func(r *httputil.ProxyRequest) { 237 // workaround for Expo disliking serving requests from 127.0.0.1 instead of localhost 238 // we need to use 127.0.0.1 because the atproto oauth client requires it 239 r.Out.Header.Set("Origin", u.String()) 240 r.SetURL(u) 241 }, 242 } 243 } else { 244 files, err := app.Files() 245 if err != nil { 246 return nil, err 247 } 248 index, err := files.Open("index.html") 249 if err != nil { 250 return nil, err 251 } 252 bs, err := io.ReadAll(index) 253 if err != nil { 254 return nil, err 255 } 256 linker, err := linking.NewLinker(ctx, bs) 257 if err != nil { 258 return nil, err 259 } 260 linkingHandler, err := a.NotFoundLinkingHandler(ctx, linker) 261 if err != nil { 262 return nil, err 263 } 264 router.NotFound = middlewarestd.Handler("/*static", mdlw, linkingHandler) 265 } 266 // needed because the WebRTC handler issues 405s from / otherwise 267 router.GET("/", func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) { 268 router.NotFound.ServeHTTP(w, r) 269 }) 270 handler := sloghttp.Recovery(router) 271 handler = cors.AllowAll().Handler(handler) 272 handler = sloghttp.New(slog.Default())(handler) 273 handler = a.RateLimitMiddleware(ctx)(handler) 274 275 // this needs to be LAST so nothing else clobbers the context 276 handler = a.ContextMiddleware(ctx)(handler) 277 278 return handler, nil 279} 280func (a *StreamplaceAPI) ContextMiddleware(parentContext context.Context) func(next http.Handler) http.Handler { 281 return func(next http.Handler) http.Handler { 282 return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 283 uuid := uuid.New().String() 284 ctx := log.WithLogValues(parentContext, "requestID", uuid, "method", r.Method, "path", r.URL.Path) 285 r = r.WithContext(ctx) 286 next.ServeHTTP(w, r) 287 }) 288 } 289} 290func copyHeader(dst, src http.Header) { 291 for k, vv := range src { 292 // we'll handle CORS ourselves, thanks 293 if strings.HasPrefix(k, "Access-Control") { 294 continue 295 } 296 for _, v := range vv { 297 dst.Add(k, v) 298 } 299 } 300} 301 302// handler that takes care of static files and otherwise returns the index.html with the correct link card data 303func (a *StreamplaceAPI) NotFoundLinkingHandler(ctx context.Context, linker *linking.Linker) (http.HandlerFunc, error) { 304 files, err := app.Files() 305 if err != nil { 306 return nil, err 307 } 308 fs := AppHostingFS{http.FS(files)} 309 310 fileHandler := a.FileHandler(ctx, http.FileServer(fs)) 311 defaultHandler := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { 312 f := strings.TrimPrefix(req.URL.Path, "/") 313 // under docs we need the index.html suffix due to astro rendering 314 if strings.HasPrefix(req.URL.Path, "/docs") && strings.HasSuffix(req.URL.Path, "/") { 315 f += "index.html" 316 } 317 _, err := fs.Open(f) 318 if err == nil { 319 fileHandler.ServeHTTP(w, req) 320 return 321 } 322 if errors.Is(err, ErrorIndex) || f == "" { 323 bs, err := linker.GenerateDefaultCard(ctx, req.URL, a.CLI.SentryDSN) 324 if err != nil { 325 log.Error(ctx, "error generating default card", "error", err) 326 } 327 w.Header().Set("Content-Type", "text/html") 328 if _, err := w.Write(bs); err != nil { 329 log.Error(ctx, "error writing response", "error", err) 330 } 331 } else { 332 log.Warn(ctx, "error opening file", "error", err) 333 apierrors.WriteHTTPInternalServerError(w, "file not found", err) 334 } 335 }) 336 return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { 337 proto := "http" 338 if req.TLS != nil { 339 proto = "https" 340 } 341 fwProto := req.Header.Get("x-forwarded-proto") 342 if fwProto != "" { 343 proto = fwProto 344 } 345 req.URL.Host = req.Host 346 req.URL.Scheme = proto 347 maybeHandle := strings.TrimPrefix(req.URL.Path, "/") 348 // quick check for things that aren't valid handles/dids 349 if strings.ContainsAny(maybeHandle, "/_") { 350 defaultHandler.ServeHTTP(w, req) 351 return 352 } 353 repo, err := a.Model.GetRepoByHandleOrDID(maybeHandle) 354 if err != nil || repo == nil { 355 log.Error(ctx, "no repo found", "maybeHandle", maybeHandle) 356 defaultHandler.ServeHTTP(w, req) 357 return 358 } 359 ls, err := a.Model.GetLatestLivestreamForRepo(repo.DID) 360 if err != nil || ls == nil { 361 log.Error(ctx, "no livestream found", "repoDID", repo.DID) 362 defaultHandler.ServeHTTP(w, req) 363 return 364 } 365 lsv, err := ls.ToLivestreamView() 366 if err != nil || lsv == nil { 367 log.Error(ctx, "no livestream view found", "repoDID", repo.DID) 368 defaultHandler.ServeHTTP(w, req) 369 return 370 } 371 bs, err := linker.GenerateStreamerCard(ctx, req.URL, lsv, a.CLI.SentryDSN) 372 if err != nil { 373 log.Error(ctx, "error generating html", "error", err) 374 defaultHandler.ServeHTTP(w, req) 375 return 376 } 377 w.Header().Set("Content-Type", "text/html") 378 if _, err := w.Write(bs); err != nil { 379 log.Error(ctx, "error writing response", "error", err) 380 } 381 }), nil 382} 383 384func (a *StreamplaceAPI) MistProxyHandler(ctx context.Context, tmpl string) httprouter.Handle { 385 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 386 if !a.CLI.HasMist() { 387 apierrors.WriteHTTPNotImplemented(w, "Playback only on the Linux version for now", nil) 388 return 389 } 390 stream := params.ByName("stream") 391 if stream == "" { 392 apierrors.WriteHTTPBadRequest(w, "missing stream in request", nil) 393 return 394 } 395 396 fullstream := fmt.Sprintf("%s+%s", mistconfig.StreamName, stream) 397 prefix := fmt.Sprintf(tmpl, fullstream) 398 resource := params.ByName("resource") 399 400 // path := strings.TrimPrefix(req.URL.EscapedPath(), "/api") 401 402 client := &http.Client{} 403 req.URL = &url.URL{ 404 Scheme: "http", 405 Host: fmt.Sprintf("127.0.0.1:%d", a.CLI.MistHTTPPort), 406 Path: fmt.Sprintf("%s%s", prefix, resource), 407 RawQuery: req.URL.RawQuery, 408 } 409 410 //http: Request.RequestURI can't be set in client requests. 411 //http://golang.org/src/pkg/net/http/client.go 412 req.RequestURI = "" 413 414 resp, err := client.Do(req) 415 if err != nil { 416 apierrors.WriteHTTPInternalServerError(w, "error connecting to mist", err) 417 return 418 } 419 defer resp.Body.Close() 420 421 copyHeader(w.Header(), resp.Header) 422 w.WriteHeader(resp.StatusCode) 423 if _, err := io.Copy(w, resp.Body); err != nil { 424 log.Error(ctx, "error writing response", "error", err) 425 } 426 } 427} 428 429func (a *StreamplaceAPI) FileHandler(ctx context.Context, fs http.Handler) http.HandlerFunc { 430 return func(w http.ResponseWriter, req *http.Request) { 431 noslash := req.URL.Path[1:] 432 ct, ok := a.Mimes[noslash] 433 if ok { 434 w.Header().Set("content-type", ct) 435 } 436 fs.ServeHTTP(w, req) 437 } 438} 439 440func (a *StreamplaceAPI) RedirectHandler(ctx context.Context) (http.Handler, error) { 441 var tlsPort string 442 var err error 443 if a.HTTPRedirectTLSPort != nil { 444 tlsPort = fmt.Sprintf("%d", *a.HTTPRedirectTLSPort) 445 } else { 446 _, tlsPort, err = net.SplitHostPort(a.CLI.HTTPSAddr) 447 if err != nil { 448 return nil, err 449 } 450 } 451 handleRedirect := func(w http.ResponseWriter, req *http.Request) { 452 host, _, err := net.SplitHostPort(req.Host) 453 if err != nil { 454 host = req.Host 455 } 456 u := req.URL 457 if tlsPort == "443" { 458 u.Host = host 459 } else { 460 u.Host = net.JoinHostPort(host, tlsPort) 461 } 462 u.Scheme = "https" 463 http.Redirect(w, req, u.String(), http.StatusTemporaryRedirect) 464 } 465 mux := http.NewServeMux() 466 mux.HandleFunc("/", handleRedirect) 467 return mux, nil 468} 469 470type NotificationPayload struct { 471 Token string `json:"token"` 472 RepoDID string `json:"repoDID"` 473} 474 475func (a *StreamplaceAPI) HandleAPI404(ctx context.Context) http.HandlerFunc { 476 return func(w http.ResponseWriter, req *http.Request) { 477 w.WriteHeader(404) 478 } 479} 480 481func (a *StreamplaceAPI) HandleNotification(ctx context.Context) http.HandlerFunc { 482 return func(w http.ResponseWriter, req *http.Request) { 483 payload, err := io.ReadAll(req.Body) 484 if err != nil { 485 log.Log(ctx, "error reading notification create", "error", err) 486 w.WriteHeader(400) 487 return 488 } 489 n := NotificationPayload{} 490 err = json.Unmarshal(payload, &n) 491 if err != nil { 492 log.Log(ctx, "error unmarshalling notification create", "error", err) 493 w.WriteHeader(400) 494 return 495 } 496 err = a.StatefulDB.CreateNotification(n.Token, n.RepoDID) 497 if err != nil { 498 log.Log(ctx, "error creating notification", "error", err) 499 w.WriteHeader(400) 500 return 501 } 502 log.Log(ctx, "successfully created notification", "token", n.Token) 503 w.WriteHeader(200) 504 if n.RepoDID != "" { 505 go func() { 506 _, err := a.ATSync.SyncBlueskyRepo(ctx, n.RepoDID, a.Model) 507 if err != nil { 508 log.Error(ctx, "error syncing bluesky repo after notification creation", "error", err) 509 } 510 }() 511 } 512 } 513} 514 515func (a *StreamplaceAPI) HandleSegment(ctx context.Context) http.HandlerFunc { 516 return func(w http.ResponseWriter, req *http.Request) { 517 err := a.MediaManager.ValidateMP4(ctx, req.Body, false) 518 if err != nil { 519 apierrors.WriteHTTPBadRequest(w, "could not ingest segment", err) 520 return 521 } 522 w.WriteHeader(200) 523 } 524} 525 526func (a *StreamplaceAPI) HandlePlayerEvent(ctx context.Context) httprouter.Handle { 527 return func(w http.ResponseWriter, req *http.Request, p httprouter.Params) { 528 var event model.PlayerEventAPI 529 if err := json.NewDecoder(req.Body).Decode(&event); err != nil { 530 apierrors.WriteHTTPBadRequest(w, "could not decode JSON body", err) 531 return 532 } 533 err := a.Model.CreatePlayerEvent(event) 534 if err != nil { 535 apierrors.WriteHTTPBadRequest(w, "could not create player event", err) 536 return 537 } 538 w.WriteHeader(201) 539 } 540} 541 542func (a *StreamplaceAPI) HandleRecentSegments(ctx context.Context) httprouter.Handle { 543 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 544 segs, err := a.Model.MostRecentSegments() 545 if err != nil { 546 apierrors.WriteHTTPInternalServerError(w, "could not get segments", err) 547 return 548 } 549 bs, err := json.Marshal(segs) 550 if err != nil { 551 apierrors.WriteHTTPInternalServerError(w, "could not marshal segments", err) 552 return 553 } 554 w.Header().Add("Content-Type", "application/json") 555 if _, err := w.Write(bs); err != nil { 556 log.Error(ctx, "error writing response", "error", err) 557 } 558 } 559} 560 561func (a *StreamplaceAPI) HandleUserRecentSegments(ctx context.Context) httprouter.Handle { 562 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 563 user := params.ByName("repoDID") 564 if user == "" { 565 apierrors.WriteHTTPBadRequest(w, "user required", nil) 566 return 567 } 568 user, err := a.NormalizeUser(ctx, user) 569 if err != nil { 570 apierrors.WriteHTTPNotFound(w, "user not found", err) 571 return 572 } 573 seg, err := a.Model.LatestSegmentForUser(user) 574 if err != nil { 575 apierrors.WriteHTTPInternalServerError(w, "could not get segments", err) 576 return 577 } 578 streamplaceSeg, err := seg.ToStreamplaceSegment() 579 if err != nil { 580 apierrors.WriteHTTPInternalServerError(w, "could not convert segment to streamplace segment", err) 581 return 582 } 583 bs, err := json.Marshal(streamplaceSeg) 584 if err != nil { 585 apierrors.WriteHTTPInternalServerError(w, "could not marshal segments", err) 586 return 587 } 588 w.Header().Add("Content-Type", "application/json") 589 if _, err := w.Write(bs); err != nil { 590 log.Error(ctx, "error writing response", "error", err) 591 } 592 } 593} 594 595func (a *StreamplaceAPI) HandleViewCount(ctx context.Context) httprouter.Handle { 596 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 597 user := params.ByName("user") 598 if user == "" { 599 apierrors.WriteHTTPBadRequest(w, "user required", nil) 600 return 601 } 602 user, err := a.NormalizeUser(ctx, user) 603 if err != nil { 604 apierrors.WriteHTTPNotFound(w, "user not found", err) 605 return 606 } 607 count := spmetrics.GetViewCount(user) 608 bs, err := json.Marshal(streamplace.Livestream_ViewerCount{Count: int64(count), LexiconTypeID: "place.stream.livestream#viewerCount"}) 609 if err != nil { 610 apierrors.WriteHTTPInternalServerError(w, "could not marshal view count", err) 611 return 612 } 613 if _, err := w.Write(bs); err != nil { 614 log.Error(ctx, "error writing response", "error", err) 615 } 616 } 617} 618 619func (a *StreamplaceAPI) HandleBlueskyResolve(ctx context.Context) httprouter.Handle { 620 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 621 log.Log(ctx, "got bluesky notification", "params", params) 622 key, err := a.ATSync.SyncBlueskyRepo(ctx, params.ByName("handle"), a.Model) 623 if err != nil { 624 apierrors.WriteHTTPInternalServerError(w, "could not resolve streamplace key", err) 625 return 626 } 627 signingKeys, err := a.Model.GetSigningKeysForRepo(key.DID) 628 if err != nil { 629 apierrors.WriteHTTPInternalServerError(w, "could not get signing keys", err) 630 return 631 } 632 bs, err := json.Marshal(signingKeys) 633 if err != nil { 634 apierrors.WriteHTTPInternalServerError(w, "could not marshal signing keys", err) 635 return 636 } 637 if _, err := w.Write(bs); err != nil { 638 log.Error(ctx, "error writing response", "error", err) 639 } 640 } 641} 642 643type ChatResponse struct { 644 Post *bsky.FeedPost `json:"post"` 645 Repo *model.Repo `json:"repo"` 646 CID string `json:"cid"` 647} 648 649func (a *StreamplaceAPI) HandleChat(ctx context.Context) httprouter.Handle { 650 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 651 user := params.ByName("repoDID") 652 if user == "" { 653 apierrors.WriteHTTPBadRequest(w, "user required", nil) 654 return 655 } 656 repoDID, err := a.NormalizeUser(ctx, user) 657 if err != nil { 658 apierrors.WriteHTTPNotFound(w, "user not found", err) 659 return 660 } 661 replies, err := a.Model.GetReplies(repoDID) 662 if err != nil { 663 apierrors.WriteHTTPInternalServerError(w, "could not get replies", err) 664 return 665 } 666 bs, err := json.Marshal(replies) 667 if err != nil { 668 apierrors.WriteHTTPInternalServerError(w, "could not marshal replies", err) 669 return 670 } 671 w.Header().Set("Content-Type", "application/json") 672 if _, err := w.Write(bs); err != nil { 673 log.Error(ctx, "error writing response", "error", err) 674 } 675 } 676} 677 678func (a *StreamplaceAPI) HandleLivestream(ctx context.Context) httprouter.Handle { 679 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 680 user := params.ByName("repoDID") 681 if user == "" { 682 apierrors.WriteHTTPBadRequest(w, "user required", nil) 683 return 684 } 685 repoDID, err := a.NormalizeUser(ctx, user) 686 if err != nil { 687 apierrors.WriteHTTPNotFound(w, "user not found", err) 688 return 689 } 690 livestream, err := a.Model.GetLatestLivestreamForRepo(repoDID) 691 if err != nil { 692 apierrors.WriteHTTPInternalServerError(w, "could not get livestream", err) 693 return 694 } 695 696 doc, err := livestream.ToLivestreamView() 697 if err != nil { 698 apierrors.WriteHTTPInternalServerError(w, "could not marshal livestream", err) 699 return 700 } 701 702 if livestream == nil { 703 apierrors.WriteHTTPNotFound(w, "no livestream found", nil) 704 return 705 } 706 707 bs, err := json.Marshal(doc) 708 if err != nil { 709 apierrors.WriteHTTPInternalServerError(w, "could not marshal livestream", err) 710 return 711 } 712 w.Header().Set("Content-Type", "application/json") 713 if _, err := w.Write(bs); err != nil { 714 log.Error(ctx, "error writing response", "error", err) 715 } 716 } 717} 718 719func (a *StreamplaceAPI) RateLimitMiddleware(ctx context.Context) func(http.Handler) http.Handler { 720 return func(next http.Handler) http.Handler { 721 return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { 722 ip, _, err := net.SplitHostPort(req.RemoteAddr) 723 if err != nil { 724 ip = req.RemoteAddr 725 } 726 727 if a.CLI.RateLimitPerSecond > 0 { 728 limiter := a.getLimiter(ip) 729 730 if !limiter.Allow() { 731 log.Warn(ctx, "rate limit exceeded", "ip", ip, "path", req.URL.Path) 732 apierrors.WriteHTTPTooManyRequests(w, "rate limit exceeded") 733 return 734 } 735 } 736 737 next.ServeHTTP(w, req) 738 }) 739 } 740} 741 742// helper for getting a listener from a systemd file descriptor 743func getListenerFromFD(fdName string) (net.Listener, error) { 744 log.Debug(context.TODO(), "getting listener from fd", "fdName", fdName, "LISTEN_PID", os.Getenv("LISTEN_PID"), "LISTEN_FDNAMES", os.Getenv("LISTEN_FDNAMES")) 745 if os.Getenv("LISTEN_PID") == strconv.Itoa(os.Getpid()) { 746 names := strings.Split(os.Getenv("LISTEN_FDNAMES"), ":") 747 for i, name := range names { 748 if name == fdName { 749 log.Warn(context.TODO(), "using systemd file descriptor", "fdName", fdName, "fdIndex", i+3) 750 f1 := os.NewFile(uintptr(i+3), fdName) 751 return net.FileListener(f1) 752 } 753 } 754 } 755 return nil, nil 756} 757 758func (a *StreamplaceAPI) ServeHTTP(ctx context.Context) error { 759 handler, err := a.Handler(ctx) 760 if err != nil { 761 return err 762 } 763 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error { 764 ln, err := getListenerFromFD("http") 765 if err != nil { 766 return err 767 } 768 if ln == nil { 769 ln, err = net.Listen("tcp", a.CLI.HTTPAddr) 770 if err != nil { 771 return err 772 } 773 } else { 774 log.Warn(ctx, "api server listening for http over systemd socket", "addr", ln.Addr()) 775 } 776 log.Log(ctx, "http server starting", "addr", ln.Addr()) 777 return s.Serve(ln) 778 }) 779} 780 781func (a *StreamplaceAPI) ServeHTTPRedirect(ctx context.Context) error { 782 handler, err := a.RedirectHandler(ctx) 783 if err != nil { 784 return err 785 } 786 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error { 787 ln, err := getListenerFromFD("http") 788 if err != nil { 789 return err 790 } 791 if ln == nil { 792 ln, err = net.Listen("tcp", a.CLI.HTTPAddr) 793 if err != nil { 794 return err 795 } 796 } else { 797 log.Warn(ctx, "http tls redirect server listening for http over systemd socket", "addr", ln.Addr()) 798 } 799 log.Log(ctx, "http tls redirect server starting", "addr", ln.Addr()) 800 return s.Serve(ln) 801 }) 802} 803 804func (a *StreamplaceAPI) ServeHTTPS(ctx context.Context) error { 805 handler, err := a.Handler(ctx) 806 if err != nil { 807 return err 808 } 809 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error { 810 ln, err := getListenerFromFD("https") 811 if err != nil { 812 return err 813 } 814 if ln == nil { 815 ln, err = net.Listen("tcp", a.CLI.HTTPSAddr) 816 if err != nil { 817 return err 818 } 819 } else { 820 // tell the redirect handler we're using systemd and they should go to 443 821 port443 := 443 822 a.HTTPRedirectTLSPort = &port443 823 log.Warn(ctx, "https server listening for https over systemd socket", "addr", ln.Addr()) 824 } 825 log.Log(ctx, "https server starting", 826 "addr", ln.Addr(), 827 "certPath", a.CLI.TLSCertPath, 828 "keyPath", a.CLI.TLSKeyPath, 829 ) 830 return s.ServeTLS(ln, a.CLI.TLSCertPath, a.CLI.TLSKeyPath) 831 }) 832} 833 834func (a *StreamplaceAPI) ServerWithShutdown(ctx context.Context, handler http.Handler, serve func(*http.Server) error) error { 835 ctx, cancel := context.WithCancel(ctx) 836 handler = gziphandler.GzipHandler(handler) 837 server := http.Server{Handler: handler} 838 var serveErr error 839 go func() { 840 serveErr = serve(&server) 841 cancel() 842 }() 843 <-ctx.Done() 844 if serveErr != nil { 845 return fmt.Errorf("error in http server: %w", serveErr) 846 } 847 848 ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second) 849 defer cancel() 850 return server.Shutdown(ctx) 851} 852 853func (a *StreamplaceAPI) HandleHealthz(ctx context.Context) http.HandlerFunc { 854 return func(w http.ResponseWriter, req *http.Request) { 855 w.WriteHeader(200) 856 } 857} 858 859func (a *StreamplaceAPI) getLimiter(ip string) *rate.Limiter { 860 a.limitersMu.Lock() 861 defer a.limitersMu.Unlock() 862 863 limiter, exists := a.limiters[ip] 864 if !exists { 865 limiter = rate.NewLimiter(rate.Limit(a.CLI.RateLimitPerSecond), a.CLI.RateLimitBurst) 866 a.limiters[ip] = limiter 867 } 868 869 return limiter 870} 871 872func (a *StreamplaceAPI) HandleClip(ctx context.Context) httprouter.Handle { 873 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 874 user := params.ByName("user") 875 file := params.ByName("file") 876 if user == "" || file == "" { 877 apierrors.WriteHTTPBadRequest(w, "user and file required", nil) 878 return 879 } 880 user, err := a.NormalizeUser(ctx, user) 881 if err != nil { 882 apierrors.WriteHTTPNotFound(w, "user not found", err) 883 return 884 } 885 fPath := []string{user, "clips", file} 886 exists, err := a.CLI.DataFileExists(fPath) 887 if err != nil { 888 apierrors.WriteHTTPInternalServerError(w, "could not check if file exists", err) 889 return 890 } 891 if !exists { 892 apierrors.WriteHTTPNotFound(w, "file not found", nil) 893 return 894 } 895 fd, err := os.Open(a.CLI.DataFilePath(fPath)) 896 if err != nil { 897 apierrors.WriteHTTPInternalServerError(w, "could not open file", err) 898 return 899 } 900 defer fd.Close() 901 w.Header().Set("Content-Type", "video/mp4") 902 if _, err := io.Copy(w, fd); err != nil { 903 log.Error(ctx, "error writing response", "error", err) 904 } 905 } 906} 907 908func NewWebsocketTracker(maxConns int) *WebsocketTracker { 909 return &WebsocketTracker{ 910 connections: make(map[string]int), 911 maxConnsPerIP: maxConns, 912 } 913} 914 915func (t *WebsocketTracker) AddConnection(ip string) bool { 916 t.mu.Lock() 917 defer t.mu.Unlock() 918 919 count := t.connections[ip] 920 921 if count >= t.maxConnsPerIP { 922 return false 923 } 924 925 t.connections[ip] = count + 1 926 return true 927} 928 929func (t *WebsocketTracker) RemoveConnection(ip string) { 930 t.mu.Lock() 931 defer t.mu.Unlock() 932 933 count := t.connections[ip] 934 if count > 0 { 935 t.connections[ip] = count - 1 936 } 937 938 if t.connections[ip] == 0 { 939 delete(t.connections, ip) 940 } 941}