Live video on the AT Protocol
1package api 2 3import ( 4 "context" 5 "encoding/json" 6 "errors" 7 "fmt" 8 "io" 9 "log/slog" 10 "net" 11 "net/http" 12 "net/http/httputil" 13 "net/url" 14 "os" 15 "strconv" 16 "strings" 17 "sync" 18 "time" 19 20 "github.com/NYTimes/gziphandler" 21 "github.com/bluesky-social/indigo/api/bsky" 22 "github.com/google/uuid" 23 "github.com/julienschmidt/httprouter" 24 "github.com/rs/cors" 25 sloghttp "github.com/samber/slog-http" 26 "golang.org/x/time/rate" 27 28 "github.com/streamplace/oatproxy/pkg/oatproxy" 29 "stream.place/streamplace/js/app" 30 "stream.place/streamplace/pkg/atproto" 31 "stream.place/streamplace/pkg/bus" 32 "stream.place/streamplace/pkg/config" 33 "stream.place/streamplace/pkg/crypto/signers/eip712" 34 "stream.place/streamplace/pkg/director" 35 apierrors "stream.place/streamplace/pkg/errors" 36 "stream.place/streamplace/pkg/linking" 37 "stream.place/streamplace/pkg/log" 38 "stream.place/streamplace/pkg/media" 39 "stream.place/streamplace/pkg/mist/mistconfig" 40 "stream.place/streamplace/pkg/model" 41 "stream.place/streamplace/pkg/notifications" 42 "stream.place/streamplace/pkg/spmetrics" 43 "stream.place/streamplace/pkg/spxrpc" 44 "stream.place/streamplace/pkg/streamplace" 45 46 metrics "github.com/slok/go-http-metrics/metrics/prometheus" 47 "github.com/slok/go-http-metrics/middleware" 48 echomiddleware "github.com/slok/go-http-metrics/middleware/echo" 49 httproutermiddleware "github.com/slok/go-http-metrics/middleware/httprouter" 50 middlewarestd "github.com/slok/go-http-metrics/middleware/std" 51) 52 53type StreamplaceAPI struct { 54 CLI *config.CLI 55 Model model.Model 56 Updater *Updater 57 Signer *eip712.EIP712Signer 58 Mimes map[string]string 59 FirebaseNotifier notifications.FirebaseNotifier 60 MediaManager *media.MediaManager 61 MediaSigner media.MediaSigner 62 // not thread-safe yet 63 Aliases map[string]string 64 Bus *bus.Bus 65 ATSync *atproto.ATProtoSynchronizer 66 Director *director.Director 67 68 connTracker *WebsocketTracker 69 70 limiters map[string]*rate.Limiter 71 limitersMu sync.Mutex 72 SignerCache map[string]media.MediaSigner 73 SignerCacheMu sync.Mutex 74 op *oatproxy.OATProxy 75} 76 77type WebsocketTracker struct { 78 connections map[string]int 79 maxConnsPerIP int 80 mu sync.RWMutex 81} 82 83func MakeStreamplaceAPI(cli *config.CLI, mod model.Model, signer *eip712.EIP712Signer, noter notifications.FirebaseNotifier, mm *media.MediaManager, ms media.MediaSigner, bus *bus.Bus, atsync *atproto.ATProtoSynchronizer, d *director.Director, op *oatproxy.OATProxy) (*StreamplaceAPI, error) { 84 updater, err := PrepareUpdater(cli) 85 if err != nil { 86 return nil, err 87 } 88 a := &StreamplaceAPI{CLI: cli, 89 Model: mod, 90 Updater: updater, 91 Signer: signer, 92 FirebaseNotifier: noter, 93 MediaManager: mm, 94 MediaSigner: ms, 95 Aliases: map[string]string{}, 96 Bus: bus, 97 ATSync: atsync, 98 Director: d, 99 connTracker: NewWebsocketTracker(cli.RateLimitWebsocket), 100 limiters: make(map[string]*rate.Limiter), 101 SignerCache: make(map[string]media.MediaSigner), 102 op: op, 103 } 104 a.Mimes, err = updater.GetMimes() 105 if err != nil { 106 return nil, err 107 } 108 return a, nil 109} 110 111type AppHostingFS struct { 112 http.FileSystem 113} 114 115var ErrorIndex = errors.New("not found, use index.html") 116 117func (fs AppHostingFS) Open(name string) (http.File, error) { 118 file, err1 := fs.FileSystem.Open(name) 119 if err1 == nil { 120 return file, nil 121 } 122 return nil, ErrorIndex 123} 124 125// api/playback/iame.li/webrtc?rendition=source 126// api/playback/iame.li/stream.mp4?rendition=source 127// api/playback/iame.li/stream.webm?rendition=source 128// api/playback/iame.li/hls/index.m3u8 129// api/playback/iame.li/hls/source/stream.m3u8 130// api/playback/iame.li/hls/source/000000000000.ts 131 132func (a *StreamplaceAPI) Handler(ctx context.Context) (http.Handler, error) { 133 134 mdlw := middleware.New(middleware.Config{ 135 Recorder: metrics.NewRecorder(metrics.Config{}), 136 }) 137 var xrpc http.Handler 138 xrpc, err := spxrpc.NewServer(ctx, a.CLI, a.Model, a.op, mdlw) 139 if err != nil { 140 return nil, err 141 } 142 router := httprouter.New() 143 144 // Create our middleware factory with the default settings. 145 146 a.op.Echo.Use(echomiddleware.Handler("", mdlw)) 147 148 // r.GET("/test/:id", httproutermiddleware.Handler("/test/:id", h1, mdlw)) 149 150 addHandle := func(router *httprouter.Router, method, path string, handler httprouter.Handle) { 151 router.Handle(method, path, httproutermiddleware.Handler(path, handler, mdlw)) 152 } 153 addFunc := func(router *httprouter.Router, method, path string, handler http.HandlerFunc) { 154 router.Handler(method, path, middlewarestd.Handler(path, mdlw, handler)) 155 } 156 157 router.Handler("GET", "/oauth/*anything", a.op.Handler()) 158 router.Handler("POST", "/oauth/*anything", a.op.Handler()) 159 router.Handler("GET", "/.well-known/oauth-authorization-server", a.op.Handler()) 160 router.Handler("GET", "/.well-known/oauth-protected-resource", a.op.Handler()) 161 router.Handler("GET", "/.well-known/apple-app-site-association", a.HandleAppleAppSiteAssociation(ctx)) 162 router.Handler("GET", "/.well-known/assetlinks.json", a.HandleAndroidAssetLinks(ctx)) 163 apiRouter := httprouter.New() 164 addFunc(apiRouter, "POST", "/api/notification", a.HandleNotification(ctx)) 165 // old clients 166 addFunc(router, "GET", "/app-updates", a.HandleAppUpdates(ctx)) 167 // new ones 168 addFunc(apiRouter, "GET", "/api/manifest", a.HandleAppUpdates(ctx)) 169 addHandle(apiRouter, "GET", "/api/desktop-updates/:platform/:architecture/:version/:buildTime/:file", a.HandleDesktopUpdates(ctx)) 170 addHandle(apiRouter, "POST", "/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s")) 171 addHandle(apiRouter, "OPTIONS", "/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s")) 172 addHandle(apiRouter, "DELETE", "/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s")) 173 addFunc(apiRouter, "POST", "/api/segment", a.HandleSegment(ctx)) 174 addFunc(apiRouter, "GET", "/api/healthz", a.HandleHealthz(ctx)) 175 addHandle(apiRouter, "GET", "/api/playback/:user/hls/*file", a.HandleHLSPlayback(ctx)) 176 // they're jpegs now 177 addHandle(apiRouter, "GET", "/api/playback/:user/stream.jpg", a.HandleThumbnailPlayback(ctx)) 178 // this one is actually a jpeg (used previously and shouldn't remove for historical reasons) 179 addHandle(apiRouter, "GET", "/api/playback/:user/stream.png", a.HandleThumbnailPlayback(ctx)) 180 addHandle(apiRouter, "GET", "/api/app-return/*anything", a.HandleAppReturn(ctx)) 181 addHandle(apiRouter, "POST", "/api/playback/:user/webrtc", a.HandleWebRTCPlayback(ctx)) 182 addHandle(apiRouter, "POST", "/api/ingest/webrtc", a.HandleWebRTCIngest(ctx)) 183 addHandle(apiRouter, "POST", "/api/ingest/webrtc/:key", a.HandleWebRTCIngest(ctx)) 184 addHandle(apiRouter, "POST", "/api/player-event", a.HandlePlayerEvent(ctx)) 185 addHandle(apiRouter, "GET", "/api/chat/:repoDID", a.HandleChat(ctx)) 186 addHandle(apiRouter, "GET", "/api/websocket/:repoDID", a.HandleWebsocket(ctx)) 187 addHandle(apiRouter, "GET", "/api/livestream/:repoDID", a.HandleLivestream(ctx)) 188 addHandle(apiRouter, "GET", "/api/segment/recent", a.HandleRecentSegments(ctx)) 189 addHandle(apiRouter, "GET", "/api/segment/recent/:repoDID", a.HandleUserRecentSegments(ctx)) 190 addHandle(apiRouter, "GET", "/api/bluesky/resolve/:handle", a.HandleBlueskyResolve(ctx)) 191 addHandle(apiRouter, "GET", "/api/view-count/:user", a.HandleViewCount(ctx)) 192 addHandle(apiRouter, "GET", "/api/clip/:user/:file", a.HandleClip(ctx)) 193 apiRouter.NotFound = a.HandleAPI404(ctx) 194 apiRouterHandler := a.RateLimitMiddleware(ctx)(apiRouter) 195 xrpcHandler := a.RateLimitMiddleware(ctx)(xrpc) 196 router.Handler("GET", "/api/*resource", apiRouterHandler) 197 router.Handler("POST", "/api/*resource", apiRouterHandler) 198 router.Handler("PUT", "/api/*resource", apiRouterHandler) 199 router.Handler("PATCH", "/api/*resource", apiRouterHandler) 200 router.Handler("DELETE", "/api/*resource", apiRouterHandler) 201 router.Handler("GET", "/xrpc/*resource", xrpcHandler) 202 router.Handler("POST", "/xrpc/*resource", xrpcHandler) 203 router.Handler("PUT", "/xrpc/*resource", xrpcHandler) 204 router.Handler("PATCH", "/xrpc/*resource", xrpcHandler) 205 router.Handler("DELETE", "/xrpc/*resource", xrpcHandler) 206 router.GET("/.well-known/did.json", a.HandleDidJSON(ctx)) 207 router.GET("/.well-known/atproto-did", a.HandleAtprotoDID(ctx)) 208 router.GET("/dl/*params", a.HandleAppDownload(ctx)) 209 router.POST("/", a.HandleWebRTCIngest(ctx)) 210 for _, redirect := range a.CLI.Redirects { 211 parts := strings.Split(redirect, ":") 212 if len(parts) != 2 { 213 log.Error(ctx, "invalid redirect", "redirect", redirect) 214 return nil, fmt.Errorf("invalid redirect: %s", redirect) 215 } 216 router.Handle("GET", parts[0], func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) { 217 http.Redirect(w, r, parts[1], http.StatusTemporaryRedirect) 218 }) 219 } 220 if a.CLI.FrontendProxy != "" { 221 u, err := url.Parse(a.CLI.FrontendProxy) 222 if err != nil { 223 return nil, err 224 } 225 log.Warn(ctx, "using frontend proxy instead of bundled frontend", "destination", a.CLI.FrontendProxy) 226 router.NotFound = &httputil.ReverseProxy{ 227 Rewrite: func(r *httputil.ProxyRequest) { 228 // workaround for Expo disliking serving requests from 127.0.0.1 instead of localhost 229 // we need to use 127.0.0.1 because the atproto oauth client requires it 230 r.Out.Header.Set("Origin", u.String()) 231 r.SetURL(u) 232 }, 233 } 234 } else { 235 files, err := app.Files() 236 if err != nil { 237 return nil, err 238 } 239 index, err := files.Open("index.html") 240 if err != nil { 241 return nil, err 242 } 243 bs, err := io.ReadAll(index) 244 if err != nil { 245 return nil, err 246 } 247 linker, err := linking.NewLinker(ctx, bs) 248 if err != nil { 249 return nil, err 250 } 251 linkingHandler, err := a.NotFoundLinkingHandler(ctx, linker) 252 if err != nil { 253 return nil, err 254 } 255 router.NotFound = middlewarestd.Handler("/*static", mdlw, linkingHandler) 256 } 257 // needed because the WebRTC handler issues 405s from / otherwise 258 router.GET("/", func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) { 259 router.NotFound.ServeHTTP(w, r) 260 }) 261 handler := sloghttp.Recovery(router) 262 handler = cors.AllowAll().Handler(handler) 263 handler = sloghttp.New(slog.Default())(handler) 264 handler = a.RateLimitMiddleware(ctx)(handler) 265 266 // this needs to be LAST so nothing else clobbers the context 267 handler = a.ContextMiddleware(ctx)(handler) 268 269 return handler, nil 270} 271func (a *StreamplaceAPI) ContextMiddleware(ctx context.Context) func(next http.Handler) http.Handler { 272 return func(next http.Handler) http.Handler { 273 return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 274 uuid := uuid.New().String() 275 ctx = log.WithLogValues(ctx, "requestID", uuid, "method", r.Method, "path", r.URL.Path) 276 r = r.WithContext(ctx) 277 next.ServeHTTP(w, r) 278 }) 279 } 280} 281func copyHeader(dst, src http.Header) { 282 for k, vv := range src { 283 // we'll handle CORS ourselves, thanks 284 if strings.HasPrefix(k, "Access-Control") { 285 continue 286 } 287 for _, v := range vv { 288 dst.Add(k, v) 289 } 290 } 291} 292 293// handler that takes care of static files and otherwise returns the index.html with the correct link card data 294func (a *StreamplaceAPI) NotFoundLinkingHandler(ctx context.Context, linker *linking.Linker) (http.HandlerFunc, error) { 295 files, err := app.Files() 296 if err != nil { 297 return nil, err 298 } 299 fs := AppHostingFS{http.FS(files)} 300 301 fileHandler := a.FileHandler(ctx, http.FileServer(fs)) 302 defaultHandler := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { 303 f := strings.TrimPrefix(req.URL.Path, "/") 304 // under docs we need the index.html suffix due to astro rendering 305 if strings.HasPrefix(req.URL.Path, "/docs") && strings.HasSuffix(req.URL.Path, "/") { 306 f += "index.html" 307 } 308 _, err := fs.Open(f) 309 if err == nil { 310 fileHandler.ServeHTTP(w, req) 311 return 312 } 313 if errors.Is(err, ErrorIndex) || f == "" { 314 bs, err := linker.GenerateDefaultCard(ctx, req.URL) 315 if err != nil { 316 log.Error(ctx, "error generating default card", "error", err) 317 } 318 w.Header().Set("Content-Type", "text/html") 319 if _, err := w.Write(bs); err != nil { 320 log.Error(ctx, "error writing response", "error", err) 321 } 322 } else { 323 log.Warn(ctx, "error opening file", "error", err) 324 apierrors.WriteHTTPInternalServerError(w, "file not found", err) 325 } 326 }) 327 return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { 328 proto := "http" 329 if req.TLS != nil { 330 proto = "https" 331 } 332 fwProto := req.Header.Get("x-forwarded-proto") 333 if fwProto != "" { 334 proto = fwProto 335 } 336 req.URL.Host = req.Host 337 req.URL.Scheme = proto 338 maybeHandle := strings.TrimPrefix(req.URL.Path, "/") 339 repo, err := a.Model.GetRepoByHandleOrDID(maybeHandle) 340 if err != nil || repo == nil { 341 log.Error(ctx, "no repo found", "maybeHandle", maybeHandle) 342 defaultHandler.ServeHTTP(w, req) 343 return 344 } 345 ls, err := a.Model.GetLatestLivestreamForRepo(repo.DID) 346 if err != nil || ls == nil { 347 log.Error(ctx, "no livestream found", "repoDID", repo.DID) 348 defaultHandler.ServeHTTP(w, req) 349 return 350 } 351 lsv, err := ls.ToLivestreamView() 352 if err != nil || lsv == nil { 353 log.Error(ctx, "no livestream view found", "repoDID", repo.DID) 354 defaultHandler.ServeHTTP(w, req) 355 return 356 } 357 bs, err := linker.GenerateStreamerCard(ctx, req.URL, lsv) 358 if err != nil { 359 log.Error(ctx, "error generating html", "error", err) 360 defaultHandler.ServeHTTP(w, req) 361 return 362 } 363 w.Header().Set("Content-Type", "text/html") 364 if _, err := w.Write(bs); err != nil { 365 log.Error(ctx, "error writing response", "error", err) 366 } 367 }), nil 368} 369 370func (a *StreamplaceAPI) MistProxyHandler(ctx context.Context, tmpl string) httprouter.Handle { 371 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 372 if !a.CLI.HasMist() { 373 apierrors.WriteHTTPNotImplemented(w, "Playback only on the Linux version for now", nil) 374 return 375 } 376 stream := params.ByName("stream") 377 if stream == "" { 378 apierrors.WriteHTTPBadRequest(w, "missing stream in request", nil) 379 return 380 } 381 382 fullstream := fmt.Sprintf("%s+%s", mistconfig.StreamName, stream) 383 prefix := fmt.Sprintf(tmpl, fullstream) 384 resource := params.ByName("resource") 385 386 // path := strings.TrimPrefix(req.URL.EscapedPath(), "/api") 387 388 client := &http.Client{} 389 req.URL = &url.URL{ 390 Scheme: "http", 391 Host: fmt.Sprintf("127.0.0.1:%d", a.CLI.MistHTTPPort), 392 Path: fmt.Sprintf("%s%s", prefix, resource), 393 RawQuery: req.URL.RawQuery, 394 } 395 396 //http: Request.RequestURI can't be set in client requests. 397 //http://golang.org/src/pkg/net/http/client.go 398 req.RequestURI = "" 399 400 resp, err := client.Do(req) 401 if err != nil { 402 apierrors.WriteHTTPInternalServerError(w, "error connecting to mist", err) 403 return 404 } 405 defer resp.Body.Close() 406 407 copyHeader(w.Header(), resp.Header) 408 w.WriteHeader(resp.StatusCode) 409 if _, err := io.Copy(w, resp.Body); err != nil { 410 log.Error(ctx, "error writing response", "error", err) 411 } 412 } 413} 414 415func (a *StreamplaceAPI) FileHandler(ctx context.Context, fs http.Handler) http.HandlerFunc { 416 return func(w http.ResponseWriter, req *http.Request) { 417 noslash := req.URL.Path[1:] 418 ct, ok := a.Mimes[noslash] 419 if ok { 420 w.Header().Set("content-type", ct) 421 } 422 fs.ServeHTTP(w, req) 423 } 424} 425 426func (a *StreamplaceAPI) RedirectHandler(ctx context.Context) (http.Handler, error) { 427 _, tlsPort, err := net.SplitHostPort(a.CLI.HTTPSAddr) 428 if err != nil { 429 return nil, err 430 } 431 handleRedirect := func(w http.ResponseWriter, req *http.Request) { 432 host, _, err := net.SplitHostPort(req.Host) 433 if err != nil { 434 host = req.Host 435 } 436 u := req.URL 437 if tlsPort == "443" { 438 u.Host = host 439 } else { 440 u.Host = net.JoinHostPort(host, tlsPort) 441 } 442 u.Scheme = "https" 443 http.Redirect(w, req, u.String(), http.StatusTemporaryRedirect) 444 } 445 mux := http.NewServeMux() 446 mux.HandleFunc("/", handleRedirect) 447 return mux, nil 448} 449 450type NotificationPayload struct { 451 Token string `json:"token"` 452 RepoDID string `json:"repoDID"` 453} 454 455func (a *StreamplaceAPI) HandleAPI404(ctx context.Context) http.HandlerFunc { 456 return func(w http.ResponseWriter, req *http.Request) { 457 w.WriteHeader(404) 458 } 459} 460 461func (a *StreamplaceAPI) HandleNotification(ctx context.Context) http.HandlerFunc { 462 return func(w http.ResponseWriter, req *http.Request) { 463 payload, err := io.ReadAll(req.Body) 464 if err != nil { 465 log.Log(ctx, "error reading notification create", "error", err) 466 w.WriteHeader(400) 467 return 468 } 469 n := NotificationPayload{} 470 err = json.Unmarshal(payload, &n) 471 if err != nil { 472 log.Log(ctx, "error unmarshalling notification create", "error", err) 473 w.WriteHeader(400) 474 return 475 } 476 err = a.Model.CreateNotification(n.Token, n.RepoDID) 477 if err != nil { 478 log.Log(ctx, "error creating notification", "error", err) 479 w.WriteHeader(400) 480 return 481 } 482 log.Log(ctx, "successfully created notification", "token", n.Token) 483 w.WriteHeader(200) 484 if n.RepoDID != "" { 485 go func() { 486 _, err := a.ATSync.SyncBlueskyRepo(ctx, n.RepoDID, a.Model) 487 if err != nil { 488 log.Error(ctx, "error syncing bluesky repo after notification creation", "error", err) 489 } 490 }() 491 } 492 } 493} 494 495func (a *StreamplaceAPI) HandleSegment(ctx context.Context) http.HandlerFunc { 496 return func(w http.ResponseWriter, req *http.Request) { 497 err := a.MediaManager.ValidateMP4(ctx, req.Body) 498 if err != nil { 499 apierrors.WriteHTTPBadRequest(w, "could not ingest segment", err) 500 return 501 } 502 w.WriteHeader(200) 503 } 504} 505 506func (a *StreamplaceAPI) HandlePlayerEvent(ctx context.Context) httprouter.Handle { 507 return func(w http.ResponseWriter, req *http.Request, p httprouter.Params) { 508 var event model.PlayerEventAPI 509 if err := json.NewDecoder(req.Body).Decode(&event); err != nil { 510 apierrors.WriteHTTPBadRequest(w, "could not decode JSON body", err) 511 return 512 } 513 err := a.Model.CreatePlayerEvent(event) 514 if err != nil { 515 apierrors.WriteHTTPBadRequest(w, "could not create player event", err) 516 return 517 } 518 w.WriteHeader(201) 519 } 520} 521 522func (a *StreamplaceAPI) HandleRecentSegments(ctx context.Context) httprouter.Handle { 523 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 524 segs, err := a.Model.MostRecentSegments() 525 if err != nil { 526 apierrors.WriteHTTPInternalServerError(w, "could not get segments", err) 527 return 528 } 529 bs, err := json.Marshal(segs) 530 if err != nil { 531 apierrors.WriteHTTPInternalServerError(w, "could not marshal segments", err) 532 return 533 } 534 w.Header().Add("Content-Type", "application/json") 535 if _, err := w.Write(bs); err != nil { 536 log.Error(ctx, "error writing response", "error", err) 537 } 538 } 539} 540 541func (a *StreamplaceAPI) HandleUserRecentSegments(ctx context.Context) httprouter.Handle { 542 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 543 user := params.ByName("repoDID") 544 if user == "" { 545 apierrors.WriteHTTPBadRequest(w, "user required", nil) 546 return 547 } 548 user, err := a.NormalizeUser(ctx, user) 549 if err != nil { 550 apierrors.WriteHTTPNotFound(w, "user not found", err) 551 return 552 } 553 seg, err := a.Model.LatestSegmentForUser(user) 554 if err != nil { 555 apierrors.WriteHTTPInternalServerError(w, "could not get segments", err) 556 return 557 } 558 streamplaceSeg, err := seg.ToStreamplaceSegment() 559 if err != nil { 560 apierrors.WriteHTTPInternalServerError(w, "could not convert segment to streamplace segment", err) 561 return 562 } 563 bs, err := json.Marshal(streamplaceSeg) 564 if err != nil { 565 apierrors.WriteHTTPInternalServerError(w, "could not marshal segments", err) 566 return 567 } 568 w.Header().Add("Content-Type", "application/json") 569 if _, err := w.Write(bs); err != nil { 570 log.Error(ctx, "error writing response", "error", err) 571 } 572 } 573} 574 575func (a *StreamplaceAPI) HandleViewCount(ctx context.Context) httprouter.Handle { 576 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 577 user := params.ByName("user") 578 if user == "" { 579 apierrors.WriteHTTPBadRequest(w, "user required", nil) 580 return 581 } 582 user, err := a.NormalizeUser(ctx, user) 583 if err != nil { 584 apierrors.WriteHTTPNotFound(w, "user not found", err) 585 return 586 } 587 count := spmetrics.GetViewCount(user) 588 bs, err := json.Marshal(streamplace.Livestream_ViewerCount{Count: int64(count), LexiconTypeID: "place.stream.livestream#viewerCount"}) 589 if err != nil { 590 apierrors.WriteHTTPInternalServerError(w, "could not marshal view count", err) 591 return 592 } 593 if _, err := w.Write(bs); err != nil { 594 log.Error(ctx, "error writing response", "error", err) 595 } 596 } 597} 598 599func (a *StreamplaceAPI) HandleBlueskyResolve(ctx context.Context) httprouter.Handle { 600 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 601 log.Log(ctx, "got bluesky notification", "params", params) 602 key, err := a.ATSync.SyncBlueskyRepo(ctx, params.ByName("handle"), a.Model) 603 if err != nil { 604 apierrors.WriteHTTPInternalServerError(w, "could not resolve streamplace key", err) 605 return 606 } 607 signingKeys, err := a.Model.GetSigningKeysForRepo(key.DID) 608 if err != nil { 609 apierrors.WriteHTTPInternalServerError(w, "could not get signing keys", err) 610 return 611 } 612 bs, err := json.Marshal(signingKeys) 613 if err != nil { 614 apierrors.WriteHTTPInternalServerError(w, "could not marshal signing keys", err) 615 return 616 } 617 if _, err := w.Write(bs); err != nil { 618 log.Error(ctx, "error writing response", "error", err) 619 } 620 } 621} 622 623type ChatResponse struct { 624 Post *bsky.FeedPost `json:"post"` 625 Repo *model.Repo `json:"repo"` 626 CID string `json:"cid"` 627} 628 629func (a *StreamplaceAPI) HandleChat(ctx context.Context) httprouter.Handle { 630 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 631 user := params.ByName("repoDID") 632 if user == "" { 633 apierrors.WriteHTTPBadRequest(w, "user required", nil) 634 return 635 } 636 repoDID, err := a.NormalizeUser(ctx, user) 637 if err != nil { 638 apierrors.WriteHTTPNotFound(w, "user not found", err) 639 return 640 } 641 replies, err := a.Model.GetReplies(repoDID) 642 if err != nil { 643 apierrors.WriteHTTPInternalServerError(w, "could not get replies", err) 644 return 645 } 646 bs, err := json.Marshal(replies) 647 if err != nil { 648 apierrors.WriteHTTPInternalServerError(w, "could not marshal replies", err) 649 return 650 } 651 w.Header().Set("Content-Type", "application/json") 652 if _, err := w.Write(bs); err != nil { 653 log.Error(ctx, "error writing response", "error", err) 654 } 655 } 656} 657 658func (a *StreamplaceAPI) HandleLivestream(ctx context.Context) httprouter.Handle { 659 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 660 user := params.ByName("repoDID") 661 if user == "" { 662 apierrors.WriteHTTPBadRequest(w, "user required", nil) 663 return 664 } 665 repoDID, err := a.NormalizeUser(ctx, user) 666 if err != nil { 667 apierrors.WriteHTTPNotFound(w, "user not found", err) 668 return 669 } 670 livestream, err := a.Model.GetLatestLivestreamForRepo(repoDID) 671 if err != nil { 672 apierrors.WriteHTTPInternalServerError(w, "could not get livestream", err) 673 return 674 } 675 676 doc, err := livestream.ToLivestreamView() 677 if err != nil { 678 apierrors.WriteHTTPInternalServerError(w, "could not marshal livestream", err) 679 return 680 } 681 682 if livestream == nil { 683 apierrors.WriteHTTPNotFound(w, "no livestream found", nil) 684 return 685 } 686 687 bs, err := json.Marshal(doc) 688 if err != nil { 689 apierrors.WriteHTTPInternalServerError(w, "could not marshal livestream", err) 690 return 691 } 692 w.Header().Set("Content-Type", "application/json") 693 if _, err := w.Write(bs); err != nil { 694 log.Error(ctx, "error writing response", "error", err) 695 } 696 } 697} 698 699func (a *StreamplaceAPI) RateLimitMiddleware(ctx context.Context) func(http.Handler) http.Handler { 700 return func(next http.Handler) http.Handler { 701 return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { 702 ip, _, err := net.SplitHostPort(req.RemoteAddr) 703 if err != nil { 704 ip = req.RemoteAddr 705 } 706 707 if a.CLI.RateLimitPerSecond > 0 { 708 limiter := a.getLimiter(ip) 709 710 if !limiter.Allow() { 711 log.Warn(ctx, "rate limit exceeded", "ip", ip, "path", req.URL.Path) 712 apierrors.WriteHTTPTooManyRequests(w, "rate limit exceeded") 713 return 714 } 715 } 716 717 next.ServeHTTP(w, req) 718 }) 719 } 720} 721 722// helper for getting a listener from a systemd file descriptor 723func getListenerFromFD(fdName string) (net.Listener, error) { 724 log.Log(context.TODO(), "getting listener from fd", "fdName", fdName, "LISTEN_PID", os.Getenv("LISTEN_PID"), "LISTEN_FDNAMES", os.Getenv("LISTEN_FDNAMES")) 725 if os.Getenv("LISTEN_PID") == strconv.Itoa(os.Getpid()) { 726 names := strings.Split(os.Getenv("LISTEN_FDNAMES"), ":") 727 for i, name := range names { 728 if name == fdName { 729 f1 := os.NewFile(uintptr(i+3), fdName) 730 return net.FileListener(f1) 731 } 732 } 733 } 734 return nil, nil 735} 736 737func (a *StreamplaceAPI) ServeHTTP(ctx context.Context) error { 738 handler, err := a.Handler(ctx) 739 if err != nil { 740 return err 741 } 742 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error { 743 ln, err := getListenerFromFD("http") 744 if err != nil { 745 return err 746 } 747 if ln == nil { 748 ln, err = net.Listen("tcp", a.CLI.HTTPAddr) 749 if err != nil { 750 return err 751 } 752 } else { 753 log.Warn(ctx, "api server listening for http over systemd socket", "addr", ln.Addr()) 754 } 755 log.Log(ctx, "http server starting", "addr", ln.Addr()) 756 return s.Serve(ln) 757 }) 758} 759 760func (a *StreamplaceAPI) ServeHTTPRedirect(ctx context.Context) error { 761 handler, err := a.RedirectHandler(ctx) 762 if err != nil { 763 return err 764 } 765 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error { 766 ln, err := getListenerFromFD("http") 767 if err != nil { 768 return err 769 } 770 if ln == nil { 771 ln, err = net.Listen("tcp", a.CLI.HTTPAddr) 772 if err != nil { 773 return err 774 } 775 } else { 776 log.Warn(ctx, "http tls redirect server listening for http over systemd socket", "addr", ln.Addr()) 777 } 778 log.Log(ctx, "http tls redirect server starting", "addr", ln.Addr()) 779 return s.Serve(ln) 780 }) 781} 782 783func (a *StreamplaceAPI) ServeHTTPS(ctx context.Context) error { 784 handler, err := a.Handler(ctx) 785 if err != nil { 786 return err 787 } 788 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error { 789 ln, err := getListenerFromFD("https") 790 if err != nil { 791 return err 792 } 793 if ln == nil { 794 ln, err = net.Listen("tcp", a.CLI.HTTPSAddr) 795 if err != nil { 796 return err 797 } 798 } else { 799 log.Warn(ctx, "https server listening for https over systemd socket", "addr", ln.Addr()) 800 } 801 log.Log(ctx, "https server starting", 802 "addr", ln.Addr(), 803 "certPath", a.CLI.TLSCertPath, 804 "keyPath", a.CLI.TLSKeyPath, 805 ) 806 return s.ServeTLS(ln, a.CLI.TLSCertPath, a.CLI.TLSKeyPath) 807 }) 808} 809 810func (a *StreamplaceAPI) ServerWithShutdown(ctx context.Context, handler http.Handler, serve func(*http.Server) error) error { 811 ctx, cancel := context.WithCancel(ctx) 812 handler = gziphandler.GzipHandler(handler) 813 server := http.Server{Handler: handler} 814 var serveErr error 815 go func() { 816 serveErr = serve(&server) 817 cancel() 818 }() 819 <-ctx.Done() 820 if serveErr != nil { 821 return fmt.Errorf("error in http server: %w", serveErr) 822 } 823 824 ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second) 825 defer cancel() 826 return server.Shutdown(ctx) 827} 828 829func (a *StreamplaceAPI) HandleHealthz(ctx context.Context) http.HandlerFunc { 830 return func(w http.ResponseWriter, req *http.Request) { 831 w.WriteHeader(200) 832 } 833} 834 835func (a *StreamplaceAPI) getLimiter(ip string) *rate.Limiter { 836 a.limitersMu.Lock() 837 defer a.limitersMu.Unlock() 838 839 limiter, exists := a.limiters[ip] 840 if !exists { 841 limiter = rate.NewLimiter(rate.Limit(a.CLI.RateLimitPerSecond), a.CLI.RateLimitBurst) 842 a.limiters[ip] = limiter 843 } 844 845 return limiter 846} 847 848func (a *StreamplaceAPI) HandleClip(ctx context.Context) httprouter.Handle { 849 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 850 user := params.ByName("user") 851 file := params.ByName("file") 852 if user == "" || file == "" { 853 apierrors.WriteHTTPBadRequest(w, "user and file required", nil) 854 return 855 } 856 user, err := a.NormalizeUser(ctx, user) 857 if err != nil { 858 apierrors.WriteHTTPNotFound(w, "user not found", err) 859 return 860 } 861 fPath := []string{user, "clips", file} 862 exists, err := a.CLI.DataFileExists(fPath) 863 if err != nil { 864 apierrors.WriteHTTPInternalServerError(w, "could not check if file exists", err) 865 return 866 } 867 if !exists { 868 apierrors.WriteHTTPNotFound(w, "file not found", nil) 869 return 870 } 871 fd, err := os.Open(a.CLI.DataFilePath(fPath)) 872 if err != nil { 873 apierrors.WriteHTTPInternalServerError(w, "could not open file", err) 874 return 875 } 876 defer fd.Close() 877 w.Header().Set("Content-Type", "video/mp4") 878 if _, err := io.Copy(w, fd); err != nil { 879 log.Error(ctx, "error writing response", "error", err) 880 } 881 } 882} 883 884func NewWebsocketTracker(maxConns int) *WebsocketTracker { 885 return &WebsocketTracker{ 886 connections: make(map[string]int), 887 maxConnsPerIP: maxConns, 888 } 889} 890 891func (t *WebsocketTracker) AddConnection(ip string) bool { 892 t.mu.Lock() 893 defer t.mu.Unlock() 894 895 count := t.connections[ip] 896 897 if count >= t.maxConnsPerIP { 898 return false 899 } 900 901 t.connections[ip] = count + 1 902 return true 903} 904 905func (t *WebsocketTracker) RemoveConnection(ip string) { 906 t.mu.Lock() 907 defer t.mu.Unlock() 908 909 count := t.connections[ip] 910 if count > 0 { 911 t.connections[ip] = count - 1 912 } 913 914 if t.connections[ip] == 0 { 915 delete(t.connections, ip) 916 } 917}