Live video on the AT Protocol
at v0.7.26 920 lines 29 kB view raw
1package api 2 3import ( 4 "context" 5 "encoding/json" 6 "errors" 7 "fmt" 8 "io" 9 "log/slog" 10 "net" 11 "net/http" 12 "net/http/httputil" 13 "net/url" 14 "os" 15 "strconv" 16 "strings" 17 "sync" 18 "time" 19 20 "github.com/NYTimes/gziphandler" 21 "github.com/bluesky-social/indigo/api/bsky" 22 "github.com/google/uuid" 23 "github.com/julienschmidt/httprouter" 24 "github.com/rs/cors" 25 sloghttp "github.com/samber/slog-http" 26 "golang.org/x/time/rate" 27 28 "github.com/streamplace/oatproxy/pkg/oatproxy" 29 "stream.place/streamplace/js/app" 30 "stream.place/streamplace/pkg/atproto" 31 "stream.place/streamplace/pkg/bus" 32 "stream.place/streamplace/pkg/config" 33 "stream.place/streamplace/pkg/crypto/signers/eip712" 34 "stream.place/streamplace/pkg/director" 35 apierrors "stream.place/streamplace/pkg/errors" 36 "stream.place/streamplace/pkg/linking" 37 "stream.place/streamplace/pkg/log" 38 "stream.place/streamplace/pkg/media" 39 "stream.place/streamplace/pkg/mist/mistconfig" 40 "stream.place/streamplace/pkg/model" 41 "stream.place/streamplace/pkg/notifications" 42 "stream.place/streamplace/pkg/spmetrics" 43 "stream.place/streamplace/pkg/spxrpc" 44 "stream.place/streamplace/pkg/statedb" 45 "stream.place/streamplace/pkg/streamplace" 46 47 metrics "github.com/slok/go-http-metrics/metrics/prometheus" 48 "github.com/slok/go-http-metrics/middleware" 49 echomiddleware "github.com/slok/go-http-metrics/middleware/echo" 50 httproutermiddleware "github.com/slok/go-http-metrics/middleware/httprouter" 51 middlewarestd "github.com/slok/go-http-metrics/middleware/std" 52) 53 54type StreamplaceAPI struct { 55 CLI *config.CLI 56 Model model.Model 57 StatefulDB *statedb.StatefulDB 58 Updater *Updater 59 Signer *eip712.EIP712Signer 60 Mimes map[string]string 61 FirebaseNotifier notifications.FirebaseNotifier 62 MediaManager *media.MediaManager 63 MediaSigner media.MediaSigner 64 // not thread-safe yet 65 Aliases map[string]string 66 Bus *bus.Bus 67 ATSync *atproto.ATProtoSynchronizer 68 Director *director.Director 69 70 connTracker *WebsocketTracker 71 72 limiters map[string]*rate.Limiter 73 limitersMu sync.Mutex 74 SignerCache map[string]media.MediaSigner 75 SignerCacheMu sync.Mutex 76 op *oatproxy.OATProxy 77} 78 79type WebsocketTracker struct { 80 connections map[string]int 81 maxConnsPerIP int 82 mu sync.RWMutex 83} 84 85func MakeStreamplaceAPI(cli *config.CLI, mod model.Model, statefulDB *statedb.StatefulDB, signer *eip712.EIP712Signer, noter notifications.FirebaseNotifier, mm *media.MediaManager, ms media.MediaSigner, bus *bus.Bus, atsync *atproto.ATProtoSynchronizer, d *director.Director, op *oatproxy.OATProxy) (*StreamplaceAPI, error) { 86 updater, err := PrepareUpdater(cli) 87 if err != nil { 88 return nil, err 89 } 90 a := &StreamplaceAPI{CLI: cli, 91 Model: mod, 92 StatefulDB: statefulDB, 93 Updater: updater, 94 Signer: signer, 95 FirebaseNotifier: noter, 96 MediaManager: mm, 97 MediaSigner: ms, 98 Aliases: map[string]string{}, 99 Bus: bus, 100 ATSync: atsync, 101 Director: d, 102 connTracker: NewWebsocketTracker(cli.RateLimitWebsocket), 103 limiters: make(map[string]*rate.Limiter), 104 SignerCache: make(map[string]media.MediaSigner), 105 op: op, 106 } 107 a.Mimes, err = updater.GetMimes() 108 if err != nil { 109 return nil, err 110 } 111 return a, nil 112} 113 114type AppHostingFS struct { 115 http.FileSystem 116} 117 118var ErrorIndex = errors.New("not found, use index.html") 119 120func (fs AppHostingFS) Open(name string) (http.File, error) { 121 file, err1 := fs.FileSystem.Open(name) 122 if err1 == nil { 123 return file, nil 124 } 125 return nil, ErrorIndex 126} 127 128// api/playback/iame.li/webrtc?rendition=source 129// api/playback/iame.li/stream.mp4?rendition=source 130// api/playback/iame.li/stream.webm?rendition=source 131// api/playback/iame.li/hls/index.m3u8 132// api/playback/iame.li/hls/source/stream.m3u8 133// api/playback/iame.li/hls/source/000000000000.ts 134 135func (a *StreamplaceAPI) Handler(ctx context.Context) (http.Handler, error) { 136 137 mdlw := middleware.New(middleware.Config{ 138 Recorder: metrics.NewRecorder(metrics.Config{}), 139 }) 140 var xrpc http.Handler 141 xrpc, err := spxrpc.NewServer(ctx, a.CLI, a.Model, a.StatefulDB, a.op, mdlw, a.ATSync) 142 if err != nil { 143 return nil, err 144 } 145 router := httprouter.New() 146 147 // Create our middleware factory with the default settings. 148 149 a.op.Echo.Use(echomiddleware.Handler("", mdlw)) 150 151 // r.GET("/test/:id", httproutermiddleware.Handler("/test/:id", h1, mdlw)) 152 153 addHandle := func(router *httprouter.Router, method, path string, handler httprouter.Handle) { 154 router.Handle(method, path, httproutermiddleware.Handler(path, handler, mdlw)) 155 } 156 addFunc := func(router *httprouter.Router, method, path string, handler http.HandlerFunc) { 157 router.Handler(method, path, middlewarestd.Handler(path, mdlw, handler)) 158 } 159 160 router.Handler("GET", "/oauth/*anything", a.op.Handler()) 161 router.Handler("POST", "/oauth/*anything", a.op.Handler()) 162 router.Handler("GET", "/.well-known/oauth-authorization-server", a.op.Handler()) 163 router.Handler("GET", "/.well-known/oauth-protected-resource", a.op.Handler()) 164 router.Handler("GET", "/.well-known/apple-app-site-association", a.HandleAppleAppSiteAssociation(ctx)) 165 router.Handler("GET", "/.well-known/assetlinks.json", a.HandleAndroidAssetLinks(ctx)) 166 apiRouter := httprouter.New() 167 addFunc(apiRouter, "POST", "/api/notification", a.HandleNotification(ctx)) 168 // old clients 169 addFunc(router, "GET", "/app-updates", a.HandleAppUpdates(ctx)) 170 // new ones 171 addFunc(apiRouter, "GET", "/api/manifest", a.HandleAppUpdates(ctx)) 172 addHandle(apiRouter, "GET", "/api/desktop-updates/:platform/:architecture/:version/:buildTime/:file", a.HandleDesktopUpdates(ctx)) 173 addHandle(apiRouter, "POST", "/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s")) 174 addHandle(apiRouter, "OPTIONS", "/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s")) 175 addHandle(apiRouter, "DELETE", "/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s")) 176 addFunc(apiRouter, "POST", "/api/segment", a.HandleSegment(ctx)) 177 addFunc(apiRouter, "GET", "/api/healthz", a.HandleHealthz(ctx)) 178 addHandle(apiRouter, "GET", "/api/playback/:user/hls/*file", a.HandleHLSPlayback(ctx)) 179 // they're jpegs now 180 addHandle(apiRouter, "GET", "/api/playback/:user/stream.jpg", a.HandleThumbnailPlayback(ctx)) 181 // this one is actually a jpeg (used previously and shouldn't remove for historical reasons) 182 addHandle(apiRouter, "GET", "/api/playback/:user/stream.png", a.HandleThumbnailPlayback(ctx)) 183 addHandle(apiRouter, "GET", "/api/app-return/*anything", a.HandleAppReturn(ctx)) 184 addHandle(apiRouter, "POST", "/api/playback/:user/webrtc", a.HandleWebRTCPlayback(ctx)) 185 addHandle(apiRouter, "POST", "/api/ingest/webrtc", a.HandleWebRTCIngest(ctx)) 186 addHandle(apiRouter, "POST", "/api/ingest/webrtc/:key", a.HandleWebRTCIngest(ctx)) 187 addHandle(apiRouter, "POST", "/api/player-event", a.HandlePlayerEvent(ctx)) 188 addHandle(apiRouter, "GET", "/api/chat/:repoDID", a.HandleChat(ctx)) 189 addHandle(apiRouter, "GET", "/api/websocket/:repoDID", a.HandleWebsocket(ctx)) 190 addHandle(apiRouter, "GET", "/api/livestream/:repoDID", a.HandleLivestream(ctx)) 191 addHandle(apiRouter, "GET", "/api/segment/recent", a.HandleRecentSegments(ctx)) 192 addHandle(apiRouter, "GET", "/api/segment/recent/:repoDID", a.HandleUserRecentSegments(ctx)) 193 addHandle(apiRouter, "GET", "/api/bluesky/resolve/:handle", a.HandleBlueskyResolve(ctx)) 194 addHandle(apiRouter, "GET", "/api/view-count/:user", a.HandleViewCount(ctx)) 195 addHandle(apiRouter, "GET", "/api/clip/:user/:file", a.HandleClip(ctx)) 196 apiRouter.NotFound = a.HandleAPI404(ctx) 197 apiRouterHandler := a.RateLimitMiddleware(ctx)(apiRouter) 198 xrpcHandler := a.RateLimitMiddleware(ctx)(xrpc) 199 router.Handler("GET", "/api/*resource", apiRouterHandler) 200 router.Handler("POST", "/api/*resource", apiRouterHandler) 201 router.Handler("PUT", "/api/*resource", apiRouterHandler) 202 router.Handler("PATCH", "/api/*resource", apiRouterHandler) 203 router.Handler("DELETE", "/api/*resource", apiRouterHandler) 204 router.Handler("GET", "/xrpc/*resource", xrpcHandler) 205 router.Handler("POST", "/xrpc/*resource", xrpcHandler) 206 router.Handler("PUT", "/xrpc/*resource", xrpcHandler) 207 router.Handler("PATCH", "/xrpc/*resource", xrpcHandler) 208 router.Handler("DELETE", "/xrpc/*resource", xrpcHandler) 209 router.GET("/.well-known/did.json", a.HandleDidJSON(ctx)) 210 router.GET("/.well-known/atproto-did", a.HandleAtprotoDID(ctx)) 211 router.GET("/dl/*params", a.HandleAppDownload(ctx)) 212 router.POST("/", a.HandleWebRTCIngest(ctx)) 213 for _, redirect := range a.CLI.Redirects { 214 parts := strings.Split(redirect, ":") 215 if len(parts) != 2 { 216 log.Error(ctx, "invalid redirect", "redirect", redirect) 217 return nil, fmt.Errorf("invalid redirect: %s", redirect) 218 } 219 router.Handle("GET", parts[0], func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) { 220 http.Redirect(w, r, parts[1], http.StatusTemporaryRedirect) 221 }) 222 } 223 if a.CLI.FrontendProxy != "" { 224 u, err := url.Parse(a.CLI.FrontendProxy) 225 if err != nil { 226 return nil, err 227 } 228 log.Warn(ctx, "using frontend proxy instead of bundled frontend", "destination", a.CLI.FrontendProxy) 229 router.NotFound = &httputil.ReverseProxy{ 230 Rewrite: func(r *httputil.ProxyRequest) { 231 // workaround for Expo disliking serving requests from 127.0.0.1 instead of localhost 232 // we need to use 127.0.0.1 because the atproto oauth client requires it 233 r.Out.Header.Set("Origin", u.String()) 234 r.SetURL(u) 235 }, 236 } 237 } else { 238 files, err := app.Files() 239 if err != nil { 240 return nil, err 241 } 242 index, err := files.Open("index.html") 243 if err != nil { 244 return nil, err 245 } 246 bs, err := io.ReadAll(index) 247 if err != nil { 248 return nil, err 249 } 250 linker, err := linking.NewLinker(ctx, bs) 251 if err != nil { 252 return nil, err 253 } 254 linkingHandler, err := a.NotFoundLinkingHandler(ctx, linker) 255 if err != nil { 256 return nil, err 257 } 258 router.NotFound = middlewarestd.Handler("/*static", mdlw, linkingHandler) 259 } 260 // needed because the WebRTC handler issues 405s from / otherwise 261 router.GET("/", func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) { 262 router.NotFound.ServeHTTP(w, r) 263 }) 264 handler := sloghttp.Recovery(router) 265 handler = cors.AllowAll().Handler(handler) 266 handler = sloghttp.New(slog.Default())(handler) 267 handler = a.RateLimitMiddleware(ctx)(handler) 268 269 // this needs to be LAST so nothing else clobbers the context 270 handler = a.ContextMiddleware(ctx)(handler) 271 272 return handler, nil 273} 274func (a *StreamplaceAPI) ContextMiddleware(parentContext context.Context) func(next http.Handler) http.Handler { 275 return func(next http.Handler) http.Handler { 276 return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 277 uuid := uuid.New().String() 278 ctx := log.WithLogValues(parentContext, "requestID", uuid, "method", r.Method, "path", r.URL.Path) 279 r = r.WithContext(ctx) 280 next.ServeHTTP(w, r) 281 }) 282 } 283} 284func copyHeader(dst, src http.Header) { 285 for k, vv := range src { 286 // we'll handle CORS ourselves, thanks 287 if strings.HasPrefix(k, "Access-Control") { 288 continue 289 } 290 for _, v := range vv { 291 dst.Add(k, v) 292 } 293 } 294} 295 296// handler that takes care of static files and otherwise returns the index.html with the correct link card data 297func (a *StreamplaceAPI) NotFoundLinkingHandler(ctx context.Context, linker *linking.Linker) (http.HandlerFunc, error) { 298 files, err := app.Files() 299 if err != nil { 300 return nil, err 301 } 302 fs := AppHostingFS{http.FS(files)} 303 304 fileHandler := a.FileHandler(ctx, http.FileServer(fs)) 305 defaultHandler := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { 306 f := strings.TrimPrefix(req.URL.Path, "/") 307 // under docs we need the index.html suffix due to astro rendering 308 if strings.HasPrefix(req.URL.Path, "/docs") && strings.HasSuffix(req.URL.Path, "/") { 309 f += "index.html" 310 } 311 _, err := fs.Open(f) 312 if err == nil { 313 fileHandler.ServeHTTP(w, req) 314 return 315 } 316 if errors.Is(err, ErrorIndex) || f == "" { 317 bs, err := linker.GenerateDefaultCard(ctx, req.URL) 318 if err != nil { 319 log.Error(ctx, "error generating default card", "error", err) 320 } 321 w.Header().Set("Content-Type", "text/html") 322 if _, err := w.Write(bs); err != nil { 323 log.Error(ctx, "error writing response", "error", err) 324 } 325 } else { 326 log.Warn(ctx, "error opening file", "error", err) 327 apierrors.WriteHTTPInternalServerError(w, "file not found", err) 328 } 329 }) 330 return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { 331 proto := "http" 332 if req.TLS != nil { 333 proto = "https" 334 } 335 fwProto := req.Header.Get("x-forwarded-proto") 336 if fwProto != "" { 337 proto = fwProto 338 } 339 req.URL.Host = req.Host 340 req.URL.Scheme = proto 341 maybeHandle := strings.TrimPrefix(req.URL.Path, "/") 342 repo, err := a.Model.GetRepoByHandleOrDID(maybeHandle) 343 if err != nil || repo == nil { 344 log.Error(ctx, "no repo found", "maybeHandle", maybeHandle) 345 defaultHandler.ServeHTTP(w, req) 346 return 347 } 348 ls, err := a.Model.GetLatestLivestreamForRepo(repo.DID) 349 if err != nil || ls == nil { 350 log.Error(ctx, "no livestream found", "repoDID", repo.DID) 351 defaultHandler.ServeHTTP(w, req) 352 return 353 } 354 lsv, err := ls.ToLivestreamView() 355 if err != nil || lsv == nil { 356 log.Error(ctx, "no livestream view found", "repoDID", repo.DID) 357 defaultHandler.ServeHTTP(w, req) 358 return 359 } 360 bs, err := linker.GenerateStreamerCard(ctx, req.URL, lsv) 361 if err != nil { 362 log.Error(ctx, "error generating html", "error", err) 363 defaultHandler.ServeHTTP(w, req) 364 return 365 } 366 w.Header().Set("Content-Type", "text/html") 367 if _, err := w.Write(bs); err != nil { 368 log.Error(ctx, "error writing response", "error", err) 369 } 370 }), nil 371} 372 373func (a *StreamplaceAPI) MistProxyHandler(ctx context.Context, tmpl string) httprouter.Handle { 374 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 375 if !a.CLI.HasMist() { 376 apierrors.WriteHTTPNotImplemented(w, "Playback only on the Linux version for now", nil) 377 return 378 } 379 stream := params.ByName("stream") 380 if stream == "" { 381 apierrors.WriteHTTPBadRequest(w, "missing stream in request", nil) 382 return 383 } 384 385 fullstream := fmt.Sprintf("%s+%s", mistconfig.StreamName, stream) 386 prefix := fmt.Sprintf(tmpl, fullstream) 387 resource := params.ByName("resource") 388 389 // path := strings.TrimPrefix(req.URL.EscapedPath(), "/api") 390 391 client := &http.Client{} 392 req.URL = &url.URL{ 393 Scheme: "http", 394 Host: fmt.Sprintf("127.0.0.1:%d", a.CLI.MistHTTPPort), 395 Path: fmt.Sprintf("%s%s", prefix, resource), 396 RawQuery: req.URL.RawQuery, 397 } 398 399 //http: Request.RequestURI can't be set in client requests. 400 //http://golang.org/src/pkg/net/http/client.go 401 req.RequestURI = "" 402 403 resp, err := client.Do(req) 404 if err != nil { 405 apierrors.WriteHTTPInternalServerError(w, "error connecting to mist", err) 406 return 407 } 408 defer resp.Body.Close() 409 410 copyHeader(w.Header(), resp.Header) 411 w.WriteHeader(resp.StatusCode) 412 if _, err := io.Copy(w, resp.Body); err != nil { 413 log.Error(ctx, "error writing response", "error", err) 414 } 415 } 416} 417 418func (a *StreamplaceAPI) FileHandler(ctx context.Context, fs http.Handler) http.HandlerFunc { 419 return func(w http.ResponseWriter, req *http.Request) { 420 noslash := req.URL.Path[1:] 421 ct, ok := a.Mimes[noslash] 422 if ok { 423 w.Header().Set("content-type", ct) 424 } 425 fs.ServeHTTP(w, req) 426 } 427} 428 429func (a *StreamplaceAPI) RedirectHandler(ctx context.Context) (http.Handler, error) { 430 _, tlsPort, err := net.SplitHostPort(a.CLI.HTTPSAddr) 431 if err != nil { 432 return nil, err 433 } 434 handleRedirect := func(w http.ResponseWriter, req *http.Request) { 435 host, _, err := net.SplitHostPort(req.Host) 436 if err != nil { 437 host = req.Host 438 } 439 u := req.URL 440 if tlsPort == "443" { 441 u.Host = host 442 } else { 443 u.Host = net.JoinHostPort(host, tlsPort) 444 } 445 u.Scheme = "https" 446 http.Redirect(w, req, u.String(), http.StatusTemporaryRedirect) 447 } 448 mux := http.NewServeMux() 449 mux.HandleFunc("/", handleRedirect) 450 return mux, nil 451} 452 453type NotificationPayload struct { 454 Token string `json:"token"` 455 RepoDID string `json:"repoDID"` 456} 457 458func (a *StreamplaceAPI) HandleAPI404(ctx context.Context) http.HandlerFunc { 459 return func(w http.ResponseWriter, req *http.Request) { 460 w.WriteHeader(404) 461 } 462} 463 464func (a *StreamplaceAPI) HandleNotification(ctx context.Context) http.HandlerFunc { 465 return func(w http.ResponseWriter, req *http.Request) { 466 payload, err := io.ReadAll(req.Body) 467 if err != nil { 468 log.Log(ctx, "error reading notification create", "error", err) 469 w.WriteHeader(400) 470 return 471 } 472 n := NotificationPayload{} 473 err = json.Unmarshal(payload, &n) 474 if err != nil { 475 log.Log(ctx, "error unmarshalling notification create", "error", err) 476 w.WriteHeader(400) 477 return 478 } 479 err = a.StatefulDB.CreateNotification(n.Token, n.RepoDID) 480 if err != nil { 481 log.Log(ctx, "error creating notification", "error", err) 482 w.WriteHeader(400) 483 return 484 } 485 log.Log(ctx, "successfully created notification", "token", n.Token) 486 w.WriteHeader(200) 487 if n.RepoDID != "" { 488 go func() { 489 _, err := a.ATSync.SyncBlueskyRepo(ctx, n.RepoDID, a.Model) 490 if err != nil { 491 log.Error(ctx, "error syncing bluesky repo after notification creation", "error", err) 492 } 493 }() 494 } 495 } 496} 497 498func (a *StreamplaceAPI) HandleSegment(ctx context.Context) http.HandlerFunc { 499 return func(w http.ResponseWriter, req *http.Request) { 500 err := a.MediaManager.ValidateMP4(ctx, req.Body) 501 if err != nil { 502 apierrors.WriteHTTPBadRequest(w, "could not ingest segment", err) 503 return 504 } 505 w.WriteHeader(200) 506 } 507} 508 509func (a *StreamplaceAPI) HandlePlayerEvent(ctx context.Context) httprouter.Handle { 510 return func(w http.ResponseWriter, req *http.Request, p httprouter.Params) { 511 var event model.PlayerEventAPI 512 if err := json.NewDecoder(req.Body).Decode(&event); err != nil { 513 apierrors.WriteHTTPBadRequest(w, "could not decode JSON body", err) 514 return 515 } 516 err := a.Model.CreatePlayerEvent(event) 517 if err != nil { 518 apierrors.WriteHTTPBadRequest(w, "could not create player event", err) 519 return 520 } 521 w.WriteHeader(201) 522 } 523} 524 525func (a *StreamplaceAPI) HandleRecentSegments(ctx context.Context) httprouter.Handle { 526 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 527 segs, err := a.Model.MostRecentSegments() 528 if err != nil { 529 apierrors.WriteHTTPInternalServerError(w, "could not get segments", err) 530 return 531 } 532 bs, err := json.Marshal(segs) 533 if err != nil { 534 apierrors.WriteHTTPInternalServerError(w, "could not marshal segments", err) 535 return 536 } 537 w.Header().Add("Content-Type", "application/json") 538 if _, err := w.Write(bs); err != nil { 539 log.Error(ctx, "error writing response", "error", err) 540 } 541 } 542} 543 544func (a *StreamplaceAPI) HandleUserRecentSegments(ctx context.Context) httprouter.Handle { 545 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 546 user := params.ByName("repoDID") 547 if user == "" { 548 apierrors.WriteHTTPBadRequest(w, "user required", nil) 549 return 550 } 551 user, err := a.NormalizeUser(ctx, user) 552 if err != nil { 553 apierrors.WriteHTTPNotFound(w, "user not found", err) 554 return 555 } 556 seg, err := a.Model.LatestSegmentForUser(user) 557 if err != nil { 558 apierrors.WriteHTTPInternalServerError(w, "could not get segments", err) 559 return 560 } 561 streamplaceSeg, err := seg.ToStreamplaceSegment() 562 if err != nil { 563 apierrors.WriteHTTPInternalServerError(w, "could not convert segment to streamplace segment", err) 564 return 565 } 566 bs, err := json.Marshal(streamplaceSeg) 567 if err != nil { 568 apierrors.WriteHTTPInternalServerError(w, "could not marshal segments", err) 569 return 570 } 571 w.Header().Add("Content-Type", "application/json") 572 if _, err := w.Write(bs); err != nil { 573 log.Error(ctx, "error writing response", "error", err) 574 } 575 } 576} 577 578func (a *StreamplaceAPI) HandleViewCount(ctx context.Context) httprouter.Handle { 579 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 580 user := params.ByName("user") 581 if user == "" { 582 apierrors.WriteHTTPBadRequest(w, "user required", nil) 583 return 584 } 585 user, err := a.NormalizeUser(ctx, user) 586 if err != nil { 587 apierrors.WriteHTTPNotFound(w, "user not found", err) 588 return 589 } 590 count := spmetrics.GetViewCount(user) 591 bs, err := json.Marshal(streamplace.Livestream_ViewerCount{Count: int64(count), LexiconTypeID: "place.stream.livestream#viewerCount"}) 592 if err != nil { 593 apierrors.WriteHTTPInternalServerError(w, "could not marshal view count", err) 594 return 595 } 596 if _, err := w.Write(bs); err != nil { 597 log.Error(ctx, "error writing response", "error", err) 598 } 599 } 600} 601 602func (a *StreamplaceAPI) HandleBlueskyResolve(ctx context.Context) httprouter.Handle { 603 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 604 log.Log(ctx, "got bluesky notification", "params", params) 605 key, err := a.ATSync.SyncBlueskyRepo(ctx, params.ByName("handle"), a.Model) 606 if err != nil { 607 apierrors.WriteHTTPInternalServerError(w, "could not resolve streamplace key", err) 608 return 609 } 610 signingKeys, err := a.Model.GetSigningKeysForRepo(key.DID) 611 if err != nil { 612 apierrors.WriteHTTPInternalServerError(w, "could not get signing keys", err) 613 return 614 } 615 bs, err := json.Marshal(signingKeys) 616 if err != nil { 617 apierrors.WriteHTTPInternalServerError(w, "could not marshal signing keys", err) 618 return 619 } 620 if _, err := w.Write(bs); err != nil { 621 log.Error(ctx, "error writing response", "error", err) 622 } 623 } 624} 625 626type ChatResponse struct { 627 Post *bsky.FeedPost `json:"post"` 628 Repo *model.Repo `json:"repo"` 629 CID string `json:"cid"` 630} 631 632func (a *StreamplaceAPI) HandleChat(ctx context.Context) httprouter.Handle { 633 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 634 user := params.ByName("repoDID") 635 if user == "" { 636 apierrors.WriteHTTPBadRequest(w, "user required", nil) 637 return 638 } 639 repoDID, err := a.NormalizeUser(ctx, user) 640 if err != nil { 641 apierrors.WriteHTTPNotFound(w, "user not found", err) 642 return 643 } 644 replies, err := a.Model.GetReplies(repoDID) 645 if err != nil { 646 apierrors.WriteHTTPInternalServerError(w, "could not get replies", err) 647 return 648 } 649 bs, err := json.Marshal(replies) 650 if err != nil { 651 apierrors.WriteHTTPInternalServerError(w, "could not marshal replies", err) 652 return 653 } 654 w.Header().Set("Content-Type", "application/json") 655 if _, err := w.Write(bs); err != nil { 656 log.Error(ctx, "error writing response", "error", err) 657 } 658 } 659} 660 661func (a *StreamplaceAPI) HandleLivestream(ctx context.Context) httprouter.Handle { 662 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 663 user := params.ByName("repoDID") 664 if user == "" { 665 apierrors.WriteHTTPBadRequest(w, "user required", nil) 666 return 667 } 668 repoDID, err := a.NormalizeUser(ctx, user) 669 if err != nil { 670 apierrors.WriteHTTPNotFound(w, "user not found", err) 671 return 672 } 673 livestream, err := a.Model.GetLatestLivestreamForRepo(repoDID) 674 if err != nil { 675 apierrors.WriteHTTPInternalServerError(w, "could not get livestream", err) 676 return 677 } 678 679 doc, err := livestream.ToLivestreamView() 680 if err != nil { 681 apierrors.WriteHTTPInternalServerError(w, "could not marshal livestream", err) 682 return 683 } 684 685 if livestream == nil { 686 apierrors.WriteHTTPNotFound(w, "no livestream found", nil) 687 return 688 } 689 690 bs, err := json.Marshal(doc) 691 if err != nil { 692 apierrors.WriteHTTPInternalServerError(w, "could not marshal livestream", err) 693 return 694 } 695 w.Header().Set("Content-Type", "application/json") 696 if _, err := w.Write(bs); err != nil { 697 log.Error(ctx, "error writing response", "error", err) 698 } 699 } 700} 701 702func (a *StreamplaceAPI) RateLimitMiddleware(ctx context.Context) func(http.Handler) http.Handler { 703 return func(next http.Handler) http.Handler { 704 return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { 705 ip, _, err := net.SplitHostPort(req.RemoteAddr) 706 if err != nil { 707 ip = req.RemoteAddr 708 } 709 710 if a.CLI.RateLimitPerSecond > 0 { 711 limiter := a.getLimiter(ip) 712 713 if !limiter.Allow() { 714 log.Warn(ctx, "rate limit exceeded", "ip", ip, "path", req.URL.Path) 715 apierrors.WriteHTTPTooManyRequests(w, "rate limit exceeded") 716 return 717 } 718 } 719 720 next.ServeHTTP(w, req) 721 }) 722 } 723} 724 725// helper for getting a listener from a systemd file descriptor 726func getListenerFromFD(fdName string) (net.Listener, error) { 727 log.Log(context.TODO(), "getting listener from fd", "fdName", fdName, "LISTEN_PID", os.Getenv("LISTEN_PID"), "LISTEN_FDNAMES", os.Getenv("LISTEN_FDNAMES")) 728 if os.Getenv("LISTEN_PID") == strconv.Itoa(os.Getpid()) { 729 names := strings.Split(os.Getenv("LISTEN_FDNAMES"), ":") 730 for i, name := range names { 731 if name == fdName { 732 f1 := os.NewFile(uintptr(i+3), fdName) 733 return net.FileListener(f1) 734 } 735 } 736 } 737 return nil, nil 738} 739 740func (a *StreamplaceAPI) ServeHTTP(ctx context.Context) error { 741 handler, err := a.Handler(ctx) 742 if err != nil { 743 return err 744 } 745 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error { 746 ln, err := getListenerFromFD("http") 747 if err != nil { 748 return err 749 } 750 if ln == nil { 751 ln, err = net.Listen("tcp", a.CLI.HTTPAddr) 752 if err != nil { 753 return err 754 } 755 } else { 756 log.Warn(ctx, "api server listening for http over systemd socket", "addr", ln.Addr()) 757 } 758 log.Log(ctx, "http server starting", "addr", ln.Addr()) 759 return s.Serve(ln) 760 }) 761} 762 763func (a *StreamplaceAPI) ServeHTTPRedirect(ctx context.Context) error { 764 handler, err := a.RedirectHandler(ctx) 765 if err != nil { 766 return err 767 } 768 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error { 769 ln, err := getListenerFromFD("http") 770 if err != nil { 771 return err 772 } 773 if ln == nil { 774 ln, err = net.Listen("tcp", a.CLI.HTTPAddr) 775 if err != nil { 776 return err 777 } 778 } else { 779 log.Warn(ctx, "http tls redirect server listening for http over systemd socket", "addr", ln.Addr()) 780 } 781 log.Log(ctx, "http tls redirect server starting", "addr", ln.Addr()) 782 return s.Serve(ln) 783 }) 784} 785 786func (a *StreamplaceAPI) ServeHTTPS(ctx context.Context) error { 787 handler, err := a.Handler(ctx) 788 if err != nil { 789 return err 790 } 791 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error { 792 ln, err := getListenerFromFD("https") 793 if err != nil { 794 return err 795 } 796 if ln == nil { 797 ln, err = net.Listen("tcp", a.CLI.HTTPSAddr) 798 if err != nil { 799 return err 800 } 801 } else { 802 log.Warn(ctx, "https server listening for https over systemd socket", "addr", ln.Addr()) 803 } 804 log.Log(ctx, "https server starting", 805 "addr", ln.Addr(), 806 "certPath", a.CLI.TLSCertPath, 807 "keyPath", a.CLI.TLSKeyPath, 808 ) 809 return s.ServeTLS(ln, a.CLI.TLSCertPath, a.CLI.TLSKeyPath) 810 }) 811} 812 813func (a *StreamplaceAPI) ServerWithShutdown(ctx context.Context, handler http.Handler, serve func(*http.Server) error) error { 814 ctx, cancel := context.WithCancel(ctx) 815 handler = gziphandler.GzipHandler(handler) 816 server := http.Server{Handler: handler} 817 var serveErr error 818 go func() { 819 serveErr = serve(&server) 820 cancel() 821 }() 822 <-ctx.Done() 823 if serveErr != nil { 824 return fmt.Errorf("error in http server: %w", serveErr) 825 } 826 827 ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second) 828 defer cancel() 829 return server.Shutdown(ctx) 830} 831 832func (a *StreamplaceAPI) HandleHealthz(ctx context.Context) http.HandlerFunc { 833 return func(w http.ResponseWriter, req *http.Request) { 834 w.WriteHeader(200) 835 } 836} 837 838func (a *StreamplaceAPI) getLimiter(ip string) *rate.Limiter { 839 a.limitersMu.Lock() 840 defer a.limitersMu.Unlock() 841 842 limiter, exists := a.limiters[ip] 843 if !exists { 844 limiter = rate.NewLimiter(rate.Limit(a.CLI.RateLimitPerSecond), a.CLI.RateLimitBurst) 845 a.limiters[ip] = limiter 846 } 847 848 return limiter 849} 850 851func (a *StreamplaceAPI) HandleClip(ctx context.Context) httprouter.Handle { 852 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 853 user := params.ByName("user") 854 file := params.ByName("file") 855 if user == "" || file == "" { 856 apierrors.WriteHTTPBadRequest(w, "user and file required", nil) 857 return 858 } 859 user, err := a.NormalizeUser(ctx, user) 860 if err != nil { 861 apierrors.WriteHTTPNotFound(w, "user not found", err) 862 return 863 } 864 fPath := []string{user, "clips", file} 865 exists, err := a.CLI.DataFileExists(fPath) 866 if err != nil { 867 apierrors.WriteHTTPInternalServerError(w, "could not check if file exists", err) 868 return 869 } 870 if !exists { 871 apierrors.WriteHTTPNotFound(w, "file not found", nil) 872 return 873 } 874 fd, err := os.Open(a.CLI.DataFilePath(fPath)) 875 if err != nil { 876 apierrors.WriteHTTPInternalServerError(w, "could not open file", err) 877 return 878 } 879 defer fd.Close() 880 w.Header().Set("Content-Type", "video/mp4") 881 if _, err := io.Copy(w, fd); err != nil { 882 log.Error(ctx, "error writing response", "error", err) 883 } 884 } 885} 886 887func NewWebsocketTracker(maxConns int) *WebsocketTracker { 888 return &WebsocketTracker{ 889 connections: make(map[string]int), 890 maxConnsPerIP: maxConns, 891 } 892} 893 894func (t *WebsocketTracker) AddConnection(ip string) bool { 895 t.mu.Lock() 896 defer t.mu.Unlock() 897 898 count := t.connections[ip] 899 900 if count >= t.maxConnsPerIP { 901 return false 902 } 903 904 t.connections[ip] = count + 1 905 return true 906} 907 908func (t *WebsocketTracker) RemoveConnection(ip string) { 909 t.mu.Lock() 910 defer t.mu.Unlock() 911 912 count := t.connections[ip] 913 if count > 0 { 914 t.connections[ip] = count - 1 915 } 916 917 if t.connections[ip] == 0 { 918 delete(t.connections, ip) 919 } 920}