Live video on the AT Protocol
at blog-link-in-docs 919 lines 29 kB view raw
1package api 2 3import ( 4 "context" 5 "encoding/json" 6 "errors" 7 "fmt" 8 "io" 9 "log/slog" 10 "net" 11 "net/http" 12 "net/http/httputil" 13 "net/url" 14 "os" 15 "strconv" 16 "strings" 17 "sync" 18 "time" 19 20 "github.com/NYTimes/gziphandler" 21 "github.com/bluesky-social/indigo/api/bsky" 22 "github.com/google/uuid" 23 "github.com/julienschmidt/httprouter" 24 "github.com/rs/cors" 25 sloghttp "github.com/samber/slog-http" 26 "golang.org/x/time/rate" 27 28 "github.com/streamplace/oatproxy/pkg/oatproxy" 29 "stream.place/streamplace/js/app" 30 "stream.place/streamplace/pkg/atproto" 31 "stream.place/streamplace/pkg/bus" 32 "stream.place/streamplace/pkg/config" 33 "stream.place/streamplace/pkg/crypto/signers/eip712" 34 "stream.place/streamplace/pkg/director" 35 apierrors "stream.place/streamplace/pkg/errors" 36 "stream.place/streamplace/pkg/linking" 37 "stream.place/streamplace/pkg/log" 38 "stream.place/streamplace/pkg/media" 39 "stream.place/streamplace/pkg/mist/mistconfig" 40 "stream.place/streamplace/pkg/model" 41 "stream.place/streamplace/pkg/notifications" 42 "stream.place/streamplace/pkg/spmetrics" 43 "stream.place/streamplace/pkg/spxrpc" 44 "stream.place/streamplace/pkg/streamplace" 45 46 metrics "github.com/slok/go-http-metrics/metrics/prometheus" 47 "github.com/slok/go-http-metrics/middleware" 48 echomiddleware "github.com/slok/go-http-metrics/middleware/echo" 49 httproutermiddleware "github.com/slok/go-http-metrics/middleware/httprouter" 50 middlewarestd "github.com/slok/go-http-metrics/middleware/std" 51) 52 53type StreamplaceAPI struct { 54 CLI *config.CLI 55 Model model.Model 56 Updater *Updater 57 Signer *eip712.EIP712Signer 58 Mimes map[string]string 59 FirebaseNotifier notifications.FirebaseNotifier 60 MediaManager *media.MediaManager 61 MediaSigner media.MediaSigner 62 // not thread-safe yet 63 Aliases map[string]string 64 Bus *bus.Bus 65 ATSync *atproto.ATProtoSynchronizer 66 Director *director.Director 67 68 connTracker *WebsocketTracker 69 70 limiters map[string]*rate.Limiter 71 limitersMu sync.Mutex 72 SignerCache map[string]media.MediaSigner 73 SignerCacheMu sync.Mutex 74 op *oatproxy.OATProxy 75} 76 77type WebsocketTracker struct { 78 connections map[string]int 79 maxConnsPerIP int 80 mu sync.RWMutex 81} 82 83func MakeStreamplaceAPI(cli *config.CLI, mod model.Model, signer *eip712.EIP712Signer, noter notifications.FirebaseNotifier, mm *media.MediaManager, ms media.MediaSigner, bus *bus.Bus, atsync *atproto.ATProtoSynchronizer, d *director.Director, op *oatproxy.OATProxy) (*StreamplaceAPI, error) { 84 updater, err := PrepareUpdater(cli) 85 if err != nil { 86 return nil, err 87 } 88 a := &StreamplaceAPI{CLI: cli, 89 Model: mod, 90 Updater: updater, 91 Signer: signer, 92 FirebaseNotifier: noter, 93 MediaManager: mm, 94 MediaSigner: ms, 95 Aliases: map[string]string{}, 96 Bus: bus, 97 ATSync: atsync, 98 Director: d, 99 connTracker: NewWebsocketTracker(cli.RateLimitWebsocket), 100 limiters: make(map[string]*rate.Limiter), 101 SignerCache: make(map[string]media.MediaSigner), 102 op: op, 103 } 104 a.Mimes, err = updater.GetMimes() 105 if err != nil { 106 return nil, err 107 } 108 return a, nil 109} 110 111type AppHostingFS struct { 112 http.FileSystem 113} 114 115var ErrorIndex = errors.New("not found, use index.html") 116 117func (fs AppHostingFS) Open(name string) (http.File, error) { 118 file, err1 := fs.FileSystem.Open(name) 119 if err1 == nil { 120 return file, nil 121 } 122 return nil, ErrorIndex 123} 124 125// api/playback/iame.li/webrtc?rendition=source 126// api/playback/iame.li/stream.mp4?rendition=source 127// api/playback/iame.li/stream.webm?rendition=source 128// api/playback/iame.li/hls/index.m3u8 129// api/playback/iame.li/hls/source/stream.m3u8 130// api/playback/iame.li/hls/source/000000000000.ts 131 132func (a *StreamplaceAPI) Handler(ctx context.Context) (http.Handler, error) { 133 134 mdlw := middleware.New(middleware.Config{ 135 Recorder: metrics.NewRecorder(metrics.Config{}), 136 }) 137 var xrpc http.Handler 138 xrpc, err := spxrpc.NewServer(ctx, a.CLI, a.Model, a.op, mdlw) 139 if err != nil { 140 return nil, err 141 } 142 router := httprouter.New() 143 144 // Create our middleware factory with the default settings. 145 146 a.op.Echo.Use(echomiddleware.Handler("", mdlw)) 147 148 // r.GET("/test/:id", httproutermiddleware.Handler("/test/:id", h1, mdlw)) 149 150 addHandle := func(router *httprouter.Router, method, path string, handler httprouter.Handle) { 151 router.Handle(method, path, httproutermiddleware.Handler(path, handler, mdlw)) 152 } 153 addFunc := func(router *httprouter.Router, method, path string, handler http.HandlerFunc) { 154 router.Handler(method, path, middlewarestd.Handler(path, mdlw, handler)) 155 } 156 157 router.Handler("GET", "/oauth/*anything", a.op.Handler()) 158 router.Handler("POST", "/oauth/*anything", a.op.Handler()) 159 router.Handler("GET", "/.well-known/oauth-authorization-server", a.op.Handler()) 160 router.Handler("GET", "/.well-known/oauth-protected-resource", a.op.Handler()) 161 router.Handler("GET", "/.well-known/apple-app-site-association", a.HandleAppleAppSiteAssociation(ctx)) 162 router.Handler("GET", "/.well-known/assetlinks.json", a.HandleAndroidAssetLinks(ctx)) 163 apiRouter := httprouter.New() 164 addFunc(apiRouter, "POST", "/api/notification", a.HandleNotification(ctx)) 165 // old clients 166 addFunc(router, "GET", "/app-updates", a.HandleAppUpdates(ctx)) 167 // new ones 168 addFunc(apiRouter, "GET", "/api/manifest", a.HandleAppUpdates(ctx)) 169 addHandle(apiRouter, "GET", "/api/desktop-updates/:platform/:architecture/:version/:buildTime/:file", a.HandleDesktopUpdates(ctx)) 170 addHandle(apiRouter, "POST", "/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s")) 171 addHandle(apiRouter, "OPTIONS", "/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s")) 172 addHandle(apiRouter, "DELETE", "/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s")) 173 addFunc(apiRouter, "POST", "/api/segment", a.HandleSegment(ctx)) 174 addFunc(apiRouter, "GET", "/api/healthz", a.HandleHealthz(ctx)) 175 addHandle(apiRouter, "GET", "/api/playback/:user/hls/*file", a.HandleHLSPlayback(ctx)) 176 addHandle(apiRouter, "GET", "/api/playback/:user/stream.mp4", a.HandleMP4Playback(ctx)) 177 addHandle(apiRouter, "GET", "/api/playback/:user/stream.webm", a.HandleMKVPlayback(ctx)) 178 // they're jpegs now 179 addHandle(apiRouter, "GET", "/api/playback/:user/stream.jpg", a.HandleThumbnailPlayback(ctx)) 180 // this one is actually a jpeg (used previously and shouldn't remove for historical reasons) 181 addHandle(apiRouter, "GET", "/api/playback/:user/stream.png", a.HandleThumbnailPlayback(ctx)) 182 addHandle(apiRouter, "GET", "/api/app-return/*anything", a.HandleAppReturn(ctx)) 183 addHandle(apiRouter, "POST", "/api/playback/:user/webrtc", a.HandleWebRTCPlayback(ctx)) 184 addHandle(apiRouter, "POST", "/api/ingest/webrtc", a.HandleWebRTCIngest(ctx)) 185 addHandle(apiRouter, "POST", "/api/ingest/webrtc/:key", a.HandleWebRTCIngest(ctx)) 186 addHandle(apiRouter, "POST", "/api/player-event", a.HandlePlayerEvent(ctx)) 187 addHandle(apiRouter, "GET", "/api/chat/:repoDID", a.HandleChat(ctx)) 188 addHandle(apiRouter, "GET", "/api/websocket/:repoDID", a.HandleWebsocket(ctx)) 189 addHandle(apiRouter, "GET", "/api/livestream/:repoDID", a.HandleLivestream(ctx)) 190 addHandle(apiRouter, "GET", "/api/segment/recent", a.HandleRecentSegments(ctx)) 191 addHandle(apiRouter, "GET", "/api/segment/recent/:repoDID", a.HandleUserRecentSegments(ctx)) 192 addHandle(apiRouter, "GET", "/api/bluesky/resolve/:handle", a.HandleBlueskyResolve(ctx)) 193 addHandle(apiRouter, "GET", "/api/view-count/:user", a.HandleViewCount(ctx)) 194 addHandle(apiRouter, "GET", "/api/clip/:user/:file", a.HandleClip(ctx)) 195 apiRouter.NotFound = a.HandleAPI404(ctx) 196 apiRouterHandler := a.RateLimitMiddleware(ctx)(apiRouter) 197 xrpcHandler := a.RateLimitMiddleware(ctx)(xrpc) 198 router.Handler("GET", "/api/*resource", apiRouterHandler) 199 router.Handler("POST", "/api/*resource", apiRouterHandler) 200 router.Handler("PUT", "/api/*resource", apiRouterHandler) 201 router.Handler("PATCH", "/api/*resource", apiRouterHandler) 202 router.Handler("DELETE", "/api/*resource", apiRouterHandler) 203 router.Handler("GET", "/xrpc/*resource", xrpcHandler) 204 router.Handler("POST", "/xrpc/*resource", xrpcHandler) 205 router.Handler("PUT", "/xrpc/*resource", xrpcHandler) 206 router.Handler("PATCH", "/xrpc/*resource", xrpcHandler) 207 router.Handler("DELETE", "/xrpc/*resource", xrpcHandler) 208 router.GET("/.well-known/did.json", a.HandleDidJSON(ctx)) 209 router.GET("/.well-known/atproto-did", a.HandleAtprotoDID(ctx)) 210 router.GET("/dl/*params", a.HandleAppDownload(ctx)) 211 router.POST("/", a.HandleWebRTCIngest(ctx)) 212 for _, redirect := range a.CLI.Redirects { 213 parts := strings.Split(redirect, ":") 214 if len(parts) != 2 { 215 log.Error(ctx, "invalid redirect", "redirect", redirect) 216 return nil, fmt.Errorf("invalid redirect: %s", redirect) 217 } 218 router.Handle("GET", parts[0], func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) { 219 http.Redirect(w, r, parts[1], http.StatusTemporaryRedirect) 220 }) 221 } 222 if a.CLI.FrontendProxy != "" { 223 u, err := url.Parse(a.CLI.FrontendProxy) 224 if err != nil { 225 return nil, err 226 } 227 log.Warn(ctx, "using frontend proxy instead of bundled frontend", "destination", a.CLI.FrontendProxy) 228 router.NotFound = &httputil.ReverseProxy{ 229 Rewrite: func(r *httputil.ProxyRequest) { 230 // workaround for Expo disliking serving requests from 127.0.0.1 instead of localhost 231 // we need to use 127.0.0.1 because the atproto oauth client requires it 232 r.Out.Header.Set("Origin", u.String()) 233 r.SetURL(u) 234 }, 235 } 236 } else { 237 files, err := app.Files() 238 if err != nil { 239 return nil, err 240 } 241 index, err := files.Open("index.html") 242 if err != nil { 243 return nil, err 244 } 245 bs, err := io.ReadAll(index) 246 if err != nil { 247 return nil, err 248 } 249 linker, err := linking.NewLinker(ctx, bs) 250 if err != nil { 251 return nil, err 252 } 253 linkingHandler, err := a.NotFoundLinkingHandler(ctx, linker) 254 if err != nil { 255 return nil, err 256 } 257 router.NotFound = middlewarestd.Handler("/*static", mdlw, linkingHandler) 258 } 259 // needed because the WebRTC handler issues 405s from / otherwise 260 router.GET("/", func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) { 261 router.NotFound.ServeHTTP(w, r) 262 }) 263 handler := sloghttp.Recovery(router) 264 handler = cors.AllowAll().Handler(handler) 265 handler = sloghttp.New(slog.Default())(handler) 266 handler = a.RateLimitMiddleware(ctx)(handler) 267 268 // this needs to be LAST so nothing else clobbers the context 269 handler = a.ContextMiddleware(ctx)(handler) 270 271 return handler, nil 272} 273func (a *StreamplaceAPI) ContextMiddleware(ctx context.Context) func(next http.Handler) http.Handler { 274 return func(next http.Handler) http.Handler { 275 return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 276 uuid := uuid.New().String() 277 ctx = log.WithLogValues(ctx, "requestID", uuid, "method", r.Method, "path", r.URL.Path) 278 r = r.WithContext(ctx) 279 next.ServeHTTP(w, r) 280 }) 281 } 282} 283func copyHeader(dst, src http.Header) { 284 for k, vv := range src { 285 // we'll handle CORS ourselves, thanks 286 if strings.HasPrefix(k, "Access-Control") { 287 continue 288 } 289 for _, v := range vv { 290 dst.Add(k, v) 291 } 292 } 293} 294 295// handler that takes care of static files and otherwise returns the index.html with the correct link card data 296func (a *StreamplaceAPI) NotFoundLinkingHandler(ctx context.Context, linker *linking.Linker) (http.HandlerFunc, error) { 297 files, err := app.Files() 298 if err != nil { 299 return nil, err 300 } 301 fs := AppHostingFS{http.FS(files)} 302 303 fileHandler := a.FileHandler(ctx, http.FileServer(fs)) 304 defaultHandler := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { 305 f := strings.TrimPrefix(req.URL.Path, "/") 306 // under docs we need the index.html suffix due to astro rendering 307 if strings.HasPrefix(req.URL.Path, "/docs") && strings.HasSuffix(req.URL.Path, "/") { 308 f += "index.html" 309 } 310 _, err := fs.Open(f) 311 if err == nil { 312 fileHandler.ServeHTTP(w, req) 313 return 314 } 315 if errors.Is(err, ErrorIndex) || f == "" { 316 bs, err := linker.GenerateDefaultCard(ctx, req.URL) 317 if err != nil { 318 log.Error(ctx, "error generating default card", "error", err) 319 } 320 w.Header().Set("Content-Type", "text/html") 321 if _, err := w.Write(bs); err != nil { 322 log.Error(ctx, "error writing response", "error", err) 323 } 324 } else { 325 log.Warn(ctx, "error opening file", "error", err) 326 apierrors.WriteHTTPInternalServerError(w, "file not found", err) 327 } 328 }) 329 return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { 330 proto := "http" 331 if req.TLS != nil { 332 proto = "https" 333 } 334 fwProto := req.Header.Get("x-forwarded-proto") 335 if fwProto != "" { 336 proto = fwProto 337 } 338 req.URL.Host = req.Host 339 req.URL.Scheme = proto 340 maybeHandle := strings.TrimPrefix(req.URL.Path, "/") 341 repo, err := a.Model.GetRepoByHandleOrDID(maybeHandle) 342 if err != nil || repo == nil { 343 log.Error(ctx, "no repo found", "maybeHandle", maybeHandle) 344 defaultHandler.ServeHTTP(w, req) 345 return 346 } 347 ls, err := a.Model.GetLatestLivestreamForRepo(repo.DID) 348 if err != nil || ls == nil { 349 log.Error(ctx, "no livestream found", "repoDID", repo.DID) 350 defaultHandler.ServeHTTP(w, req) 351 return 352 } 353 lsv, err := ls.ToLivestreamView() 354 if err != nil || lsv == nil { 355 log.Error(ctx, "no livestream view found", "repoDID", repo.DID) 356 defaultHandler.ServeHTTP(w, req) 357 return 358 } 359 bs, err := linker.GenerateStreamerCard(ctx, req.URL, lsv) 360 if err != nil { 361 log.Error(ctx, "error generating html", "error", err) 362 defaultHandler.ServeHTTP(w, req) 363 return 364 } 365 w.Header().Set("Content-Type", "text/html") 366 if _, err := w.Write(bs); err != nil { 367 log.Error(ctx, "error writing response", "error", err) 368 } 369 }), nil 370} 371 372func (a *StreamplaceAPI) MistProxyHandler(ctx context.Context, tmpl string) httprouter.Handle { 373 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 374 if !a.CLI.HasMist() { 375 apierrors.WriteHTTPNotImplemented(w, "Playback only on the Linux version for now", nil) 376 return 377 } 378 stream := params.ByName("stream") 379 if stream == "" { 380 apierrors.WriteHTTPBadRequest(w, "missing stream in request", nil) 381 return 382 } 383 384 fullstream := fmt.Sprintf("%s+%s", mistconfig.StreamName, stream) 385 prefix := fmt.Sprintf(tmpl, fullstream) 386 resource := params.ByName("resource") 387 388 // path := strings.TrimPrefix(req.URL.EscapedPath(), "/api") 389 390 client := &http.Client{} 391 req.URL = &url.URL{ 392 Scheme: "http", 393 Host: fmt.Sprintf("127.0.0.1:%d", a.CLI.MistHTTPPort), 394 Path: fmt.Sprintf("%s%s", prefix, resource), 395 RawQuery: req.URL.RawQuery, 396 } 397 398 //http: Request.RequestURI can't be set in client requests. 399 //http://golang.org/src/pkg/net/http/client.go 400 req.RequestURI = "" 401 402 resp, err := client.Do(req) 403 if err != nil { 404 apierrors.WriteHTTPInternalServerError(w, "error connecting to mist", err) 405 return 406 } 407 defer resp.Body.Close() 408 409 copyHeader(w.Header(), resp.Header) 410 w.WriteHeader(resp.StatusCode) 411 if _, err := io.Copy(w, resp.Body); err != nil { 412 log.Error(ctx, "error writing response", "error", err) 413 } 414 } 415} 416 417func (a *StreamplaceAPI) FileHandler(ctx context.Context, fs http.Handler) http.HandlerFunc { 418 return func(w http.ResponseWriter, req *http.Request) { 419 noslash := req.URL.Path[1:] 420 ct, ok := a.Mimes[noslash] 421 if ok { 422 w.Header().Set("content-type", ct) 423 } 424 fs.ServeHTTP(w, req) 425 } 426} 427 428func (a *StreamplaceAPI) RedirectHandler(ctx context.Context) (http.Handler, error) { 429 _, tlsPort, err := net.SplitHostPort(a.CLI.HTTPSAddr) 430 if err != nil { 431 return nil, err 432 } 433 handleRedirect := func(w http.ResponseWriter, req *http.Request) { 434 host, _, err := net.SplitHostPort(req.Host) 435 if err != nil { 436 host = req.Host 437 } 438 u := req.URL 439 if tlsPort == "443" { 440 u.Host = host 441 } else { 442 u.Host = net.JoinHostPort(host, tlsPort) 443 } 444 u.Scheme = "https" 445 http.Redirect(w, req, u.String(), http.StatusTemporaryRedirect) 446 } 447 mux := http.NewServeMux() 448 mux.HandleFunc("/", handleRedirect) 449 return mux, nil 450} 451 452type NotificationPayload struct { 453 Token string `json:"token"` 454 RepoDID string `json:"repoDID"` 455} 456 457func (a *StreamplaceAPI) HandleAPI404(ctx context.Context) http.HandlerFunc { 458 return func(w http.ResponseWriter, req *http.Request) { 459 w.WriteHeader(404) 460 } 461} 462 463func (a *StreamplaceAPI) HandleNotification(ctx context.Context) http.HandlerFunc { 464 return func(w http.ResponseWriter, req *http.Request) { 465 payload, err := io.ReadAll(req.Body) 466 if err != nil { 467 log.Log(ctx, "error reading notification create", "error", err) 468 w.WriteHeader(400) 469 return 470 } 471 n := NotificationPayload{} 472 err = json.Unmarshal(payload, &n) 473 if err != nil { 474 log.Log(ctx, "error unmarshalling notification create", "error", err) 475 w.WriteHeader(400) 476 return 477 } 478 err = a.Model.CreateNotification(n.Token, n.RepoDID) 479 if err != nil { 480 log.Log(ctx, "error creating notification", "error", err) 481 w.WriteHeader(400) 482 return 483 } 484 log.Log(ctx, "successfully created notification", "token", n.Token) 485 w.WriteHeader(200) 486 if n.RepoDID != "" { 487 go func() { 488 _, err := a.ATSync.SyncBlueskyRepo(ctx, n.RepoDID, a.Model) 489 if err != nil { 490 log.Error(ctx, "error syncing bluesky repo after notification creation", "error", err) 491 } 492 }() 493 } 494 } 495} 496 497func (a *StreamplaceAPI) HandleSegment(ctx context.Context) http.HandlerFunc { 498 return func(w http.ResponseWriter, req *http.Request) { 499 err := a.MediaManager.ValidateMP4(ctx, req.Body) 500 if err != nil { 501 apierrors.WriteHTTPBadRequest(w, "could not ingest segment", err) 502 return 503 } 504 w.WriteHeader(200) 505 } 506} 507 508func (a *StreamplaceAPI) HandlePlayerEvent(ctx context.Context) httprouter.Handle { 509 return func(w http.ResponseWriter, req *http.Request, p httprouter.Params) { 510 var event model.PlayerEventAPI 511 if err := json.NewDecoder(req.Body).Decode(&event); err != nil { 512 apierrors.WriteHTTPBadRequest(w, "could not decode JSON body", err) 513 return 514 } 515 err := a.Model.CreatePlayerEvent(event) 516 if err != nil { 517 apierrors.WriteHTTPBadRequest(w, "could not create player event", err) 518 return 519 } 520 w.WriteHeader(201) 521 } 522} 523 524func (a *StreamplaceAPI) HandleRecentSegments(ctx context.Context) httprouter.Handle { 525 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 526 segs, err := a.Model.MostRecentSegments() 527 if err != nil { 528 apierrors.WriteHTTPInternalServerError(w, "could not get segments", err) 529 return 530 } 531 bs, err := json.Marshal(segs) 532 if err != nil { 533 apierrors.WriteHTTPInternalServerError(w, "could not marshal segments", err) 534 return 535 } 536 w.Header().Add("Content-Type", "application/json") 537 if _, err := w.Write(bs); err != nil { 538 log.Error(ctx, "error writing response", "error", err) 539 } 540 } 541} 542 543func (a *StreamplaceAPI) HandleUserRecentSegments(ctx context.Context) httprouter.Handle { 544 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 545 user := params.ByName("repoDID") 546 if user == "" { 547 apierrors.WriteHTTPBadRequest(w, "user required", nil) 548 return 549 } 550 user, err := a.NormalizeUser(ctx, user) 551 if err != nil { 552 apierrors.WriteHTTPNotFound(w, "user not found", err) 553 return 554 } 555 seg, err := a.Model.LatestSegmentForUser(user) 556 if err != nil { 557 apierrors.WriteHTTPInternalServerError(w, "could not get segments", err) 558 return 559 } 560 streamplaceSeg, err := seg.ToStreamplaceSegment() 561 if err != nil { 562 apierrors.WriteHTTPInternalServerError(w, "could not convert segment to streamplace segment", err) 563 return 564 } 565 bs, err := json.Marshal(streamplaceSeg) 566 if err != nil { 567 apierrors.WriteHTTPInternalServerError(w, "could not marshal segments", err) 568 return 569 } 570 w.Header().Add("Content-Type", "application/json") 571 if _, err := w.Write(bs); err != nil { 572 log.Error(ctx, "error writing response", "error", err) 573 } 574 } 575} 576 577func (a *StreamplaceAPI) HandleViewCount(ctx context.Context) httprouter.Handle { 578 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 579 user := params.ByName("user") 580 if user == "" { 581 apierrors.WriteHTTPBadRequest(w, "user required", nil) 582 return 583 } 584 user, err := a.NormalizeUser(ctx, user) 585 if err != nil { 586 apierrors.WriteHTTPNotFound(w, "user not found", err) 587 return 588 } 589 count := spmetrics.GetViewCount(user) 590 bs, err := json.Marshal(streamplace.Livestream_ViewerCount{Count: int64(count), LexiconTypeID: "place.stream.livestream#viewerCount"}) 591 if err != nil { 592 apierrors.WriteHTTPInternalServerError(w, "could not marshal view count", err) 593 return 594 } 595 if _, err := w.Write(bs); err != nil { 596 log.Error(ctx, "error writing response", "error", err) 597 } 598 } 599} 600 601func (a *StreamplaceAPI) HandleBlueskyResolve(ctx context.Context) httprouter.Handle { 602 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 603 log.Log(ctx, "got bluesky notification", "params", params) 604 key, err := a.ATSync.SyncBlueskyRepo(ctx, params.ByName("handle"), a.Model) 605 if err != nil { 606 apierrors.WriteHTTPInternalServerError(w, "could not resolve streamplace key", err) 607 return 608 } 609 signingKeys, err := a.Model.GetSigningKeysForRepo(key.DID) 610 if err != nil { 611 apierrors.WriteHTTPInternalServerError(w, "could not get signing keys", err) 612 return 613 } 614 bs, err := json.Marshal(signingKeys) 615 if err != nil { 616 apierrors.WriteHTTPInternalServerError(w, "could not marshal signing keys", err) 617 return 618 } 619 if _, err := w.Write(bs); err != nil { 620 log.Error(ctx, "error writing response", "error", err) 621 } 622 } 623} 624 625type ChatResponse struct { 626 Post *bsky.FeedPost `json:"post"` 627 Repo *model.Repo `json:"repo"` 628 CID string `json:"cid"` 629} 630 631func (a *StreamplaceAPI) HandleChat(ctx context.Context) httprouter.Handle { 632 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 633 user := params.ByName("repoDID") 634 if user == "" { 635 apierrors.WriteHTTPBadRequest(w, "user required", nil) 636 return 637 } 638 repoDID, err := a.NormalizeUser(ctx, user) 639 if err != nil { 640 apierrors.WriteHTTPNotFound(w, "user not found", err) 641 return 642 } 643 replies, err := a.Model.GetReplies(repoDID) 644 if err != nil { 645 apierrors.WriteHTTPInternalServerError(w, "could not get replies", err) 646 return 647 } 648 bs, err := json.Marshal(replies) 649 if err != nil { 650 apierrors.WriteHTTPInternalServerError(w, "could not marshal replies", err) 651 return 652 } 653 w.Header().Set("Content-Type", "application/json") 654 if _, err := w.Write(bs); err != nil { 655 log.Error(ctx, "error writing response", "error", err) 656 } 657 } 658} 659 660func (a *StreamplaceAPI) HandleLivestream(ctx context.Context) httprouter.Handle { 661 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 662 user := params.ByName("repoDID") 663 if user == "" { 664 apierrors.WriteHTTPBadRequest(w, "user required", nil) 665 return 666 } 667 repoDID, err := a.NormalizeUser(ctx, user) 668 if err != nil { 669 apierrors.WriteHTTPNotFound(w, "user not found", err) 670 return 671 } 672 livestream, err := a.Model.GetLatestLivestreamForRepo(repoDID) 673 if err != nil { 674 apierrors.WriteHTTPInternalServerError(w, "could not get livestream", err) 675 return 676 } 677 678 doc, err := livestream.ToLivestreamView() 679 if err != nil { 680 apierrors.WriteHTTPInternalServerError(w, "could not marshal livestream", err) 681 return 682 } 683 684 if livestream == nil { 685 apierrors.WriteHTTPNotFound(w, "no livestream found", nil) 686 return 687 } 688 689 bs, err := json.Marshal(doc) 690 if err != nil { 691 apierrors.WriteHTTPInternalServerError(w, "could not marshal livestream", err) 692 return 693 } 694 w.Header().Set("Content-Type", "application/json") 695 if _, err := w.Write(bs); err != nil { 696 log.Error(ctx, "error writing response", "error", err) 697 } 698 } 699} 700 701func (a *StreamplaceAPI) RateLimitMiddleware(ctx context.Context) func(http.Handler) http.Handler { 702 return func(next http.Handler) http.Handler { 703 return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { 704 ip, _, err := net.SplitHostPort(req.RemoteAddr) 705 if err != nil { 706 ip = req.RemoteAddr 707 } 708 709 if a.CLI.RateLimitPerSecond > 0 { 710 limiter := a.getLimiter(ip) 711 712 if !limiter.Allow() { 713 log.Warn(ctx, "rate limit exceeded", "ip", ip, "path", req.URL.Path) 714 apierrors.WriteHTTPTooManyRequests(w, "rate limit exceeded") 715 return 716 } 717 } 718 719 next.ServeHTTP(w, req) 720 }) 721 } 722} 723 724// helper for getting a listener from a systemd file descriptor 725func getListenerFromFD(fdName string) (net.Listener, error) { 726 log.Log(context.TODO(), "getting listener from fd", "fdName", fdName, "LISTEN_PID", os.Getenv("LISTEN_PID"), "LISTEN_FDNAMES", os.Getenv("LISTEN_FDNAMES")) 727 if os.Getenv("LISTEN_PID") == strconv.Itoa(os.Getpid()) { 728 names := strings.Split(os.Getenv("LISTEN_FDNAMES"), ":") 729 for i, name := range names { 730 if name == fdName { 731 f1 := os.NewFile(uintptr(i+3), fdName) 732 return net.FileListener(f1) 733 } 734 } 735 } 736 return nil, nil 737} 738 739func (a *StreamplaceAPI) ServeHTTP(ctx context.Context) error { 740 handler, err := a.Handler(ctx) 741 if err != nil { 742 return err 743 } 744 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error { 745 ln, err := getListenerFromFD("http") 746 if err != nil { 747 return err 748 } 749 if ln == nil { 750 ln, err = net.Listen("tcp", a.CLI.HTTPAddr) 751 if err != nil { 752 return err 753 } 754 } else { 755 log.Warn(ctx, "api server listening for http over systemd socket", "addr", ln.Addr()) 756 } 757 log.Log(ctx, "http server starting", "addr", ln.Addr()) 758 return s.Serve(ln) 759 }) 760} 761 762func (a *StreamplaceAPI) ServeHTTPRedirect(ctx context.Context) error { 763 handler, err := a.RedirectHandler(ctx) 764 if err != nil { 765 return err 766 } 767 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error { 768 ln, err := getListenerFromFD("http") 769 if err != nil { 770 return err 771 } 772 if ln == nil { 773 ln, err = net.Listen("tcp", a.CLI.HTTPAddr) 774 if err != nil { 775 return err 776 } 777 } else { 778 log.Warn(ctx, "http tls redirect server listening for http over systemd socket", "addr", ln.Addr()) 779 } 780 log.Log(ctx, "http tls redirect server starting", "addr", ln.Addr()) 781 return s.Serve(ln) 782 }) 783} 784 785func (a *StreamplaceAPI) ServeHTTPS(ctx context.Context) error { 786 handler, err := a.Handler(ctx) 787 if err != nil { 788 return err 789 } 790 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error { 791 ln, err := getListenerFromFD("https") 792 if err != nil { 793 return err 794 } 795 if ln == nil { 796 ln, err = net.Listen("tcp", a.CLI.HTTPSAddr) 797 if err != nil { 798 return err 799 } 800 } else { 801 log.Warn(ctx, "https server listening for https over systemd socket", "addr", ln.Addr()) 802 } 803 log.Log(ctx, "https server starting", 804 "addr", ln.Addr(), 805 "certPath", a.CLI.TLSCertPath, 806 "keyPath", a.CLI.TLSKeyPath, 807 ) 808 return s.ServeTLS(ln, a.CLI.TLSCertPath, a.CLI.TLSKeyPath) 809 }) 810} 811 812func (a *StreamplaceAPI) ServerWithShutdown(ctx context.Context, handler http.Handler, serve func(*http.Server) error) error { 813 ctx, cancel := context.WithCancel(ctx) 814 handler = gziphandler.GzipHandler(handler) 815 server := http.Server{Handler: handler} 816 var serveErr error 817 go func() { 818 serveErr = serve(&server) 819 cancel() 820 }() 821 <-ctx.Done() 822 if serveErr != nil { 823 return fmt.Errorf("error in http server: %w", serveErr) 824 } 825 826 ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second) 827 defer cancel() 828 return server.Shutdown(ctx) 829} 830 831func (a *StreamplaceAPI) HandleHealthz(ctx context.Context) http.HandlerFunc { 832 return func(w http.ResponseWriter, req *http.Request) { 833 w.WriteHeader(200) 834 } 835} 836 837func (a *StreamplaceAPI) getLimiter(ip string) *rate.Limiter { 838 a.limitersMu.Lock() 839 defer a.limitersMu.Unlock() 840 841 limiter, exists := a.limiters[ip] 842 if !exists { 843 limiter = rate.NewLimiter(rate.Limit(a.CLI.RateLimitPerSecond), a.CLI.RateLimitBurst) 844 a.limiters[ip] = limiter 845 } 846 847 return limiter 848} 849 850func (a *StreamplaceAPI) HandleClip(ctx context.Context) httprouter.Handle { 851 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 852 user := params.ByName("user") 853 file := params.ByName("file") 854 if user == "" || file == "" { 855 apierrors.WriteHTTPBadRequest(w, "user and file required", nil) 856 return 857 } 858 user, err := a.NormalizeUser(ctx, user) 859 if err != nil { 860 apierrors.WriteHTTPNotFound(w, "user not found", err) 861 return 862 } 863 fPath := []string{user, "clips", file} 864 exists, err := a.CLI.DataFileExists(fPath) 865 if err != nil { 866 apierrors.WriteHTTPInternalServerError(w, "could not check if file exists", err) 867 return 868 } 869 if !exists { 870 apierrors.WriteHTTPNotFound(w, "file not found", nil) 871 return 872 } 873 fd, err := os.Open(a.CLI.DataFilePath(fPath)) 874 if err != nil { 875 apierrors.WriteHTTPInternalServerError(w, "could not open file", err) 876 return 877 } 878 defer fd.Close() 879 w.Header().Set("Content-Type", "video/mp4") 880 if _, err := io.Copy(w, fd); err != nil { 881 log.Error(ctx, "error writing response", "error", err) 882 } 883 } 884} 885 886func NewWebsocketTracker(maxConns int) *WebsocketTracker { 887 return &WebsocketTracker{ 888 connections: make(map[string]int), 889 maxConnsPerIP: maxConns, 890 } 891} 892 893func (t *WebsocketTracker) AddConnection(ip string) bool { 894 t.mu.Lock() 895 defer t.mu.Unlock() 896 897 count := t.connections[ip] 898 899 if count >= t.maxConnsPerIP { 900 return false 901 } 902 903 t.connections[ip] = count + 1 904 return true 905} 906 907func (t *WebsocketTracker) RemoveConnection(ip string) { 908 t.mu.Lock() 909 defer t.mu.Unlock() 910 911 count := t.connections[ip] 912 if count > 0 { 913 t.connections[ip] = count - 1 914 } 915 916 if t.connections[ip] == 0 { 917 delete(t.connections, ip) 918 } 919}