Live video on the AT Protocol
at v0.8.8 942 lines 30 kB view raw
1package api 2 3import ( 4 "context" 5 "encoding/json" 6 "errors" 7 "fmt" 8 "io" 9 "log/slog" 10 "net" 11 "net/http" 12 "net/http/httputil" 13 "net/url" 14 "os" 15 "strconv" 16 "strings" 17 "sync" 18 "time" 19 20 "github.com/NYTimes/gziphandler" 21 "github.com/bluesky-social/indigo/api/bsky" 22 "github.com/google/uuid" 23 "github.com/julienschmidt/httprouter" 24 "github.com/rs/cors" 25 sloghttp "github.com/samber/slog-http" 26 "golang.org/x/time/rate" 27 28 "github.com/streamplace/oatproxy/pkg/oatproxy" 29 "stream.place/streamplace/js/app" 30 "stream.place/streamplace/pkg/atproto" 31 "stream.place/streamplace/pkg/bus" 32 "stream.place/streamplace/pkg/config" 33 "stream.place/streamplace/pkg/crypto/signers/eip712" 34 "stream.place/streamplace/pkg/director" 35 apierrors "stream.place/streamplace/pkg/errors" 36 "stream.place/streamplace/pkg/linking" 37 "stream.place/streamplace/pkg/log" 38 "stream.place/streamplace/pkg/media" 39 "stream.place/streamplace/pkg/mist/mistconfig" 40 "stream.place/streamplace/pkg/model" 41 "stream.place/streamplace/pkg/notifications" 42 "stream.place/streamplace/pkg/spmetrics" 43 "stream.place/streamplace/pkg/spxrpc" 44 "stream.place/streamplace/pkg/statedb" 45 "stream.place/streamplace/pkg/streamplace" 46 47 metrics "github.com/slok/go-http-metrics/metrics/prometheus" 48 "github.com/slok/go-http-metrics/middleware" 49 echomiddleware "github.com/slok/go-http-metrics/middleware/echo" 50 httproutermiddleware "github.com/slok/go-http-metrics/middleware/httprouter" 51 middlewarestd "github.com/slok/go-http-metrics/middleware/std" 52) 53 54type StreamplaceAPI struct { 55 CLI *config.CLI 56 Model model.Model 57 StatefulDB *statedb.StatefulDB 58 Updater *Updater 59 Signer *eip712.EIP712Signer 60 Mimes map[string]string 61 FirebaseNotifier notifications.FirebaseNotifier 62 MediaManager *media.MediaManager 63 MediaSigner media.MediaSigner 64 // not thread-safe yet 65 Aliases map[string]string 66 Bus *bus.Bus 67 ATSync *atproto.ATProtoSynchronizer 68 Director *director.Director 69 70 connTracker *WebsocketTracker 71 72 limiters map[string]*rate.Limiter 73 limitersMu sync.Mutex 74 SignerCache map[string]media.MediaSigner 75 SignerCacheMu sync.Mutex 76 op *oatproxy.OATProxy 77 78 // override tls port for http redirect server if we're using systemd file descriptors 79 HTTPRedirectTLSPort *int 80 sessions map[string]map[string]time.Time 81 sessionsLock sync.RWMutex 82} 83 84type WebsocketTracker struct { 85 connections map[string]int 86 maxConnsPerIP int 87 mu sync.RWMutex 88} 89 90func MakeStreamplaceAPI(cli *config.CLI, mod model.Model, statefulDB *statedb.StatefulDB, signer *eip712.EIP712Signer, noter notifications.FirebaseNotifier, mm *media.MediaManager, ms media.MediaSigner, bus *bus.Bus, atsync *atproto.ATProtoSynchronizer, d *director.Director, op *oatproxy.OATProxy) (*StreamplaceAPI, error) { 91 updater, err := PrepareUpdater(cli) 92 if err != nil { 93 return nil, err 94 } 95 a := &StreamplaceAPI{CLI: cli, 96 Model: mod, 97 StatefulDB: statefulDB, 98 Updater: updater, 99 Signer: signer, 100 FirebaseNotifier: noter, 101 MediaManager: mm, 102 MediaSigner: ms, 103 Aliases: map[string]string{}, 104 Bus: bus, 105 ATSync: atsync, 106 Director: d, 107 connTracker: NewWebsocketTracker(cli.RateLimitWebsocket), 108 limiters: make(map[string]*rate.Limiter), 109 SignerCache: make(map[string]media.MediaSigner), 110 op: op, 111 sessions: make(map[string]map[string]time.Time), 112 sessionsLock: sync.RWMutex{}, 113 } 114 a.Mimes, err = updater.GetMimes() 115 if err != nil { 116 return nil, err 117 } 118 return a, nil 119} 120 121type AppHostingFS struct { 122 http.FileSystem 123} 124 125var ErrorIndex = errors.New("not found, use index.html") 126 127func (fs AppHostingFS) Open(name string) (http.File, error) { 128 file, err1 := fs.FileSystem.Open(name) 129 if err1 == nil { 130 return file, nil 131 } 132 return nil, ErrorIndex 133} 134 135// api/playback/iame.li/webrtc?rendition=source 136// api/playback/iame.li/stream.mp4?rendition=source 137// api/playback/iame.li/stream.webm?rendition=source 138// api/playback/iame.li/hls/index.m3u8 139// api/playback/iame.li/hls/source/stream.m3u8 140// api/playback/iame.li/hls/source/000000000000.ts 141 142func (a *StreamplaceAPI) Handler(ctx context.Context) (http.Handler, error) { 143 144 mdlw := middleware.New(middleware.Config{ 145 Recorder: metrics.NewRecorder(metrics.Config{}), 146 }) 147 var xrpc http.Handler 148 xrpc, err := spxrpc.NewServer(ctx, a.CLI, a.Model, a.StatefulDB, a.op, mdlw, a.ATSync) 149 if err != nil { 150 return nil, err 151 } 152 router := httprouter.New() 153 154 // Create our middleware factory with the default settings. 155 156 a.op.Echo.Use(echomiddleware.Handler("", mdlw)) 157 158 // r.GET("/test/:id", httproutermiddleware.Handler("/test/:id", h1, mdlw)) 159 160 addHandle := func(router *httprouter.Router, method, path string, handler httprouter.Handle) { 161 router.Handle(method, path, httproutermiddleware.Handler(path, handler, mdlw)) 162 } 163 addFunc := func(router *httprouter.Router, method, path string, handler http.HandlerFunc) { 164 router.Handler(method, path, middlewarestd.Handler(path, mdlw, handler)) 165 } 166 167 router.Handler("GET", "/oauth/*anything", a.op.Handler()) 168 router.Handler("POST", "/oauth/*anything", a.op.Handler()) 169 router.Handler("GET", "/.well-known/oauth-authorization-server", a.op.Handler()) 170 router.Handler("GET", "/.well-known/oauth-protected-resource", a.op.Handler()) 171 router.Handler("GET", "/.well-known/apple-app-site-association", a.HandleAppleAppSiteAssociation(ctx)) 172 router.Handler("GET", "/.well-known/assetlinks.json", a.HandleAndroidAssetLinks(ctx)) 173 apiRouter := httprouter.New() 174 addFunc(apiRouter, "POST", "/api/notification", a.HandleNotification(ctx)) 175 // old clients 176 addFunc(router, "GET", "/app-updates", a.HandleAppUpdates(ctx)) 177 // new ones 178 addFunc(apiRouter, "GET", "/api/manifest", a.HandleAppUpdates(ctx)) 179 addHandle(apiRouter, "GET", "/api/desktop-updates/:platform/:architecture/:version/:buildTime/:file", a.HandleDesktopUpdates(ctx)) 180 addHandle(apiRouter, "POST", "/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s")) 181 addHandle(apiRouter, "OPTIONS", "/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s")) 182 addHandle(apiRouter, "DELETE", "/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s")) 183 addFunc(apiRouter, "POST", "/api/segment", a.HandleSegment(ctx)) 184 addFunc(apiRouter, "GET", "/api/healthz", a.HandleHealthz(ctx)) 185 addHandle(apiRouter, "GET", "/api/playback/:user/hls/*file", a.HandleHLSPlayback(ctx)) 186 // they're jpegs now 187 addHandle(apiRouter, "GET", "/api/playback/:user/stream.jpg", a.HandleThumbnailPlayback(ctx)) 188 // this one is actually a jpeg (used previously and shouldn't remove for historical reasons) 189 addHandle(apiRouter, "GET", "/api/playback/:user/stream.png", a.HandleThumbnailPlayback(ctx)) 190 addHandle(apiRouter, "GET", "/api/app-return/*anything", a.HandleAppReturn(ctx)) 191 addHandle(apiRouter, "POST", "/api/playback/:user/webrtc", a.HandleWebRTCPlayback(ctx)) 192 addHandle(apiRouter, "POST", "/api/ingest/webrtc", a.HandleWebRTCIngest(ctx)) 193 addHandle(apiRouter, "POST", "/api/ingest/webrtc/:key", a.HandleWebRTCIngest(ctx)) 194 addHandle(apiRouter, "POST", "/api/player-event", a.HandlePlayerEvent(ctx)) 195 addHandle(apiRouter, "GET", "/api/chat/:repoDID", a.HandleChat(ctx)) 196 addHandle(apiRouter, "GET", "/api/websocket/:repoDID", a.HandleWebsocket(ctx)) 197 addHandle(apiRouter, "GET", "/api/livestream/:repoDID", a.HandleLivestream(ctx)) 198 addHandle(apiRouter, "GET", "/api/segment/recent", a.HandleRecentSegments(ctx)) 199 addHandle(apiRouter, "GET", "/api/segment/recent/:repoDID", a.HandleUserRecentSegments(ctx)) 200 addHandle(apiRouter, "GET", "/api/bluesky/resolve/:handle", a.HandleBlueskyResolve(ctx)) 201 addHandle(apiRouter, "GET", "/api/view-count/:user", a.HandleViewCount(ctx)) 202 addHandle(apiRouter, "GET", "/api/clip/:user/:file", a.HandleClip(ctx)) 203 apiRouter.NotFound = a.HandleAPI404(ctx) 204 apiRouterHandler := a.RateLimitMiddleware(ctx)(apiRouter) 205 xrpcHandler := a.RateLimitMiddleware(ctx)(xrpc) 206 router.Handler("GET", "/api/*resource", apiRouterHandler) 207 router.Handler("POST", "/api/*resource", apiRouterHandler) 208 router.Handler("PUT", "/api/*resource", apiRouterHandler) 209 router.Handler("PATCH", "/api/*resource", apiRouterHandler) 210 router.Handler("DELETE", "/api/*resource", apiRouterHandler) 211 router.Handler("GET", "/xrpc/*resource", xrpcHandler) 212 router.Handler("POST", "/xrpc/*resource", xrpcHandler) 213 router.Handler("PUT", "/xrpc/*resource", xrpcHandler) 214 router.Handler("PATCH", "/xrpc/*resource", xrpcHandler) 215 router.Handler("DELETE", "/xrpc/*resource", xrpcHandler) 216 router.GET("/.well-known/did.json", a.HandleDidJSON(ctx)) 217 router.GET("/.well-known/atproto-did", a.HandleAtprotoDID(ctx)) 218 router.GET("/dl/*params", a.HandleAppDownload(ctx)) 219 router.POST("/", a.HandleWebRTCIngest(ctx)) 220 for _, redirect := range a.CLI.Redirects { 221 parts := strings.Split(redirect, ":") 222 if len(parts) != 2 { 223 log.Error(ctx, "invalid redirect", "redirect", redirect) 224 return nil, fmt.Errorf("invalid redirect: %s", redirect) 225 } 226 router.Handle("GET", parts[0], func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) { 227 http.Redirect(w, r, parts[1], http.StatusTemporaryRedirect) 228 }) 229 } 230 if a.CLI.FrontendProxy != "" { 231 u, err := url.Parse(a.CLI.FrontendProxy) 232 if err != nil { 233 return nil, err 234 } 235 log.Warn(ctx, "using frontend proxy instead of bundled frontend", "destination", a.CLI.FrontendProxy) 236 router.NotFound = &httputil.ReverseProxy{ 237 Rewrite: func(r *httputil.ProxyRequest) { 238 // workaround for Expo disliking serving requests from 127.0.0.1 instead of localhost 239 // we need to use 127.0.0.1 because the atproto oauth client requires it 240 r.Out.Header.Set("Origin", u.String()) 241 r.SetURL(u) 242 }, 243 } 244 } else { 245 files, err := app.Files() 246 if err != nil { 247 return nil, err 248 } 249 index, err := files.Open("index.html") 250 if err != nil { 251 return nil, err 252 } 253 bs, err := io.ReadAll(index) 254 if err != nil { 255 return nil, err 256 } 257 linker, err := linking.NewLinker(ctx, bs) 258 if err != nil { 259 return nil, err 260 } 261 linkingHandler, err := a.NotFoundLinkingHandler(ctx, linker) 262 if err != nil { 263 return nil, err 264 } 265 router.NotFound = middlewarestd.Handler("/*static", mdlw, linkingHandler) 266 } 267 // needed because the WebRTC handler issues 405s from / otherwise 268 router.GET("/", func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) { 269 router.NotFound.ServeHTTP(w, r) 270 }) 271 handler := sloghttp.Recovery(router) 272 handler = cors.AllowAll().Handler(handler) 273 handler = sloghttp.New(slog.Default())(handler) 274 handler = a.RateLimitMiddleware(ctx)(handler) 275 276 // this needs to be LAST so nothing else clobbers the context 277 handler = a.ContextMiddleware(ctx)(handler) 278 279 return handler, nil 280} 281func (a *StreamplaceAPI) ContextMiddleware(parentContext context.Context) func(next http.Handler) http.Handler { 282 return func(next http.Handler) http.Handler { 283 return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 284 uuid := uuid.New().String() 285 ctx := log.WithLogValues(parentContext, "requestID", uuid, "method", r.Method, "path", r.URL.Path) 286 r = r.WithContext(ctx) 287 next.ServeHTTP(w, r) 288 }) 289 } 290} 291func copyHeader(dst, src http.Header) { 292 for k, vv := range src { 293 // we'll handle CORS ourselves, thanks 294 if strings.HasPrefix(k, "Access-Control") { 295 continue 296 } 297 for _, v := range vv { 298 dst.Add(k, v) 299 } 300 } 301} 302 303// handler that takes care of static files and otherwise returns the index.html with the correct link card data 304func (a *StreamplaceAPI) NotFoundLinkingHandler(ctx context.Context, linker *linking.Linker) (http.HandlerFunc, error) { 305 files, err := app.Files() 306 if err != nil { 307 return nil, err 308 } 309 fs := AppHostingFS{http.FS(files)} 310 311 fileHandler := a.FileHandler(ctx, http.FileServer(fs)) 312 defaultHandler := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { 313 f := strings.TrimPrefix(req.URL.Path, "/") 314 // under docs we need the index.html suffix due to astro rendering 315 if strings.HasPrefix(req.URL.Path, "/docs") && strings.HasSuffix(req.URL.Path, "/") { 316 f += "index.html" 317 } 318 _, err := fs.Open(f) 319 if err == nil { 320 fileHandler.ServeHTTP(w, req) 321 return 322 } 323 if errors.Is(err, ErrorIndex) || f == "" { 324 bs, err := linker.GenerateDefaultCard(ctx, req.URL, a.CLI.SentryDSN) 325 if err != nil { 326 log.Error(ctx, "error generating default card", "error", err) 327 } 328 w.Header().Set("Content-Type", "text/html") 329 if _, err := w.Write(bs); err != nil { 330 log.Error(ctx, "error writing response", "error", err) 331 } 332 } else { 333 log.Warn(ctx, "error opening file", "error", err) 334 apierrors.WriteHTTPInternalServerError(w, "file not found", err) 335 } 336 }) 337 return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { 338 proto := "http" 339 if req.TLS != nil { 340 proto = "https" 341 } 342 fwProto := req.Header.Get("x-forwarded-proto") 343 if fwProto != "" { 344 proto = fwProto 345 } 346 req.URL.Host = req.Host 347 req.URL.Scheme = proto 348 maybeHandle := strings.TrimPrefix(req.URL.Path, "/") 349 // quick check for things that aren't valid handles/dids 350 if strings.ContainsAny(maybeHandle, "/_") { 351 defaultHandler.ServeHTTP(w, req) 352 return 353 } 354 repo, err := a.Model.GetRepoByHandleOrDID(maybeHandle) 355 if err != nil || repo == nil { 356 log.Error(ctx, "no repo found", "maybeHandle", maybeHandle) 357 defaultHandler.ServeHTTP(w, req) 358 return 359 } 360 ls, err := a.Model.GetLatestLivestreamForRepo(repo.DID) 361 if err != nil || ls == nil { 362 log.Error(ctx, "no livestream found", "repoDID", repo.DID) 363 defaultHandler.ServeHTTP(w, req) 364 return 365 } 366 lsv, err := ls.ToLivestreamView() 367 if err != nil || lsv == nil { 368 log.Error(ctx, "no livestream view found", "repoDID", repo.DID) 369 defaultHandler.ServeHTTP(w, req) 370 return 371 } 372 bs, err := linker.GenerateStreamerCard(ctx, req.URL, lsv, a.CLI.SentryDSN) 373 if err != nil { 374 log.Error(ctx, "error generating html", "error", err) 375 defaultHandler.ServeHTTP(w, req) 376 return 377 } 378 w.Header().Set("Content-Type", "text/html") 379 if _, err := w.Write(bs); err != nil { 380 log.Error(ctx, "error writing response", "error", err) 381 } 382 }), nil 383} 384 385func (a *StreamplaceAPI) MistProxyHandler(ctx context.Context, tmpl string) httprouter.Handle { 386 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 387 if !a.CLI.HasMist() { 388 apierrors.WriteHTTPNotImplemented(w, "Playback only on the Linux version for now", nil) 389 return 390 } 391 stream := params.ByName("stream") 392 if stream == "" { 393 apierrors.WriteHTTPBadRequest(w, "missing stream in request", nil) 394 return 395 } 396 397 fullstream := fmt.Sprintf("%s+%s", mistconfig.StreamName, stream) 398 prefix := fmt.Sprintf(tmpl, fullstream) 399 resource := params.ByName("resource") 400 401 // path := strings.TrimPrefix(req.URL.EscapedPath(), "/api") 402 403 client := &http.Client{} 404 req.URL = &url.URL{ 405 Scheme: "http", 406 Host: fmt.Sprintf("127.0.0.1:%d", a.CLI.MistHTTPPort), 407 Path: fmt.Sprintf("%s%s", prefix, resource), 408 RawQuery: req.URL.RawQuery, 409 } 410 411 //http: Request.RequestURI can't be set in client requests. 412 //http://golang.org/src/pkg/net/http/client.go 413 req.RequestURI = "" 414 415 resp, err := client.Do(req) 416 if err != nil { 417 apierrors.WriteHTTPInternalServerError(w, "error connecting to mist", err) 418 return 419 } 420 defer resp.Body.Close() 421 422 copyHeader(w.Header(), resp.Header) 423 w.WriteHeader(resp.StatusCode) 424 if _, err := io.Copy(w, resp.Body); err != nil { 425 log.Error(ctx, "error writing response", "error", err) 426 } 427 } 428} 429 430func (a *StreamplaceAPI) FileHandler(ctx context.Context, fs http.Handler) http.HandlerFunc { 431 return func(w http.ResponseWriter, req *http.Request) { 432 noslash := req.URL.Path[1:] 433 ct, ok := a.Mimes[noslash] 434 if ok { 435 w.Header().Set("content-type", ct) 436 } 437 fs.ServeHTTP(w, req) 438 } 439} 440 441func (a *StreamplaceAPI) RedirectHandler(ctx context.Context) (http.Handler, error) { 442 var tlsPort string 443 var err error 444 if a.HTTPRedirectTLSPort != nil { 445 tlsPort = fmt.Sprintf("%d", *a.HTTPRedirectTLSPort) 446 } else { 447 _, tlsPort, err = net.SplitHostPort(a.CLI.HTTPSAddr) 448 if err != nil { 449 return nil, err 450 } 451 } 452 handleRedirect := func(w http.ResponseWriter, req *http.Request) { 453 host, _, err := net.SplitHostPort(req.Host) 454 if err != nil { 455 host = req.Host 456 } 457 u := req.URL 458 if tlsPort == "443" { 459 u.Host = host 460 } else { 461 u.Host = net.JoinHostPort(host, tlsPort) 462 } 463 u.Scheme = "https" 464 http.Redirect(w, req, u.String(), http.StatusTemporaryRedirect) 465 } 466 mux := http.NewServeMux() 467 mux.HandleFunc("/", handleRedirect) 468 return mux, nil 469} 470 471type NotificationPayload struct { 472 Token string `json:"token"` 473 RepoDID string `json:"repoDID"` 474} 475 476func (a *StreamplaceAPI) HandleAPI404(ctx context.Context) http.HandlerFunc { 477 return func(w http.ResponseWriter, req *http.Request) { 478 w.WriteHeader(404) 479 } 480} 481 482func (a *StreamplaceAPI) HandleNotification(ctx context.Context) http.HandlerFunc { 483 return func(w http.ResponseWriter, req *http.Request) { 484 payload, err := io.ReadAll(req.Body) 485 if err != nil { 486 log.Log(ctx, "error reading notification create", "error", err) 487 w.WriteHeader(400) 488 return 489 } 490 n := NotificationPayload{} 491 err = json.Unmarshal(payload, &n) 492 if err != nil { 493 log.Log(ctx, "error unmarshalling notification create", "error", err) 494 w.WriteHeader(400) 495 return 496 } 497 err = a.StatefulDB.CreateNotification(n.Token, n.RepoDID) 498 if err != nil { 499 log.Log(ctx, "error creating notification", "error", err) 500 w.WriteHeader(400) 501 return 502 } 503 log.Log(ctx, "successfully created notification", "token", n.Token) 504 w.WriteHeader(200) 505 if n.RepoDID != "" { 506 go func() { 507 _, err := a.ATSync.SyncBlueskyRepo(ctx, n.RepoDID, a.Model) 508 if err != nil { 509 log.Error(ctx, "error syncing bluesky repo after notification creation", "error", err) 510 } 511 }() 512 } 513 } 514} 515 516func (a *StreamplaceAPI) HandleSegment(ctx context.Context) http.HandlerFunc { 517 return func(w http.ResponseWriter, req *http.Request) { 518 err := a.MediaManager.ValidateMP4(ctx, req.Body, false) 519 if err != nil { 520 apierrors.WriteHTTPBadRequest(w, "could not ingest segment", err) 521 return 522 } 523 w.WriteHeader(200) 524 } 525} 526 527func (a *StreamplaceAPI) HandlePlayerEvent(ctx context.Context) httprouter.Handle { 528 return func(w http.ResponseWriter, req *http.Request, p httprouter.Params) { 529 var event model.PlayerEventAPI 530 if err := json.NewDecoder(req.Body).Decode(&event); err != nil { 531 apierrors.WriteHTTPBadRequest(w, "could not decode JSON body", err) 532 return 533 } 534 err := a.Model.CreatePlayerEvent(event) 535 if err != nil { 536 apierrors.WriteHTTPBadRequest(w, "could not create player event", err) 537 return 538 } 539 w.WriteHeader(201) 540 } 541} 542 543func (a *StreamplaceAPI) HandleRecentSegments(ctx context.Context) httprouter.Handle { 544 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 545 segs, err := a.Model.MostRecentSegments() 546 if err != nil { 547 apierrors.WriteHTTPInternalServerError(w, "could not get segments", err) 548 return 549 } 550 bs, err := json.Marshal(segs) 551 if err != nil { 552 apierrors.WriteHTTPInternalServerError(w, "could not marshal segments", err) 553 return 554 } 555 w.Header().Add("Content-Type", "application/json") 556 if _, err := w.Write(bs); err != nil { 557 log.Error(ctx, "error writing response", "error", err) 558 } 559 } 560} 561 562func (a *StreamplaceAPI) HandleUserRecentSegments(ctx context.Context) httprouter.Handle { 563 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 564 user := params.ByName("repoDID") 565 if user == "" { 566 apierrors.WriteHTTPBadRequest(w, "user required", nil) 567 return 568 } 569 user, err := a.NormalizeUser(ctx, user) 570 if err != nil { 571 apierrors.WriteHTTPNotFound(w, "user not found", err) 572 return 573 } 574 seg, err := a.Model.LatestSegmentForUser(user) 575 if err != nil { 576 apierrors.WriteHTTPInternalServerError(w, "could not get segments", err) 577 return 578 } 579 streamplaceSeg, err := seg.ToStreamplaceSegment() 580 if err != nil { 581 apierrors.WriteHTTPInternalServerError(w, "could not convert segment to streamplace segment", err) 582 return 583 } 584 bs, err := json.Marshal(streamplaceSeg) 585 if err != nil { 586 apierrors.WriteHTTPInternalServerError(w, "could not marshal segments", err) 587 return 588 } 589 w.Header().Add("Content-Type", "application/json") 590 if _, err := w.Write(bs); err != nil { 591 log.Error(ctx, "error writing response", "error", err) 592 } 593 } 594} 595 596func (a *StreamplaceAPI) HandleViewCount(ctx context.Context) httprouter.Handle { 597 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 598 user := params.ByName("user") 599 if user == "" { 600 apierrors.WriteHTTPBadRequest(w, "user required", nil) 601 return 602 } 603 user, err := a.NormalizeUser(ctx, user) 604 if err != nil { 605 apierrors.WriteHTTPNotFound(w, "user not found", err) 606 return 607 } 608 count := spmetrics.GetViewCount(user) 609 bs, err := json.Marshal(streamplace.Livestream_ViewerCount{Count: int64(count), LexiconTypeID: "place.stream.livestream#viewerCount"}) 610 if err != nil { 611 apierrors.WriteHTTPInternalServerError(w, "could not marshal view count", err) 612 return 613 } 614 if _, err := w.Write(bs); err != nil { 615 log.Error(ctx, "error writing response", "error", err) 616 } 617 } 618} 619 620func (a *StreamplaceAPI) HandleBlueskyResolve(ctx context.Context) httprouter.Handle { 621 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 622 log.Log(ctx, "got bluesky notification", "params", params) 623 key, err := a.ATSync.SyncBlueskyRepo(ctx, params.ByName("handle"), a.Model) 624 if err != nil { 625 apierrors.WriteHTTPInternalServerError(w, "could not resolve streamplace key", err) 626 return 627 } 628 signingKeys, err := a.Model.GetSigningKeysForRepo(key.DID) 629 if err != nil { 630 apierrors.WriteHTTPInternalServerError(w, "could not get signing keys", err) 631 return 632 } 633 bs, err := json.Marshal(signingKeys) 634 if err != nil { 635 apierrors.WriteHTTPInternalServerError(w, "could not marshal signing keys", err) 636 return 637 } 638 if _, err := w.Write(bs); err != nil { 639 log.Error(ctx, "error writing response", "error", err) 640 } 641 } 642} 643 644type ChatResponse struct { 645 Post *bsky.FeedPost `json:"post"` 646 Repo *model.Repo `json:"repo"` 647 CID string `json:"cid"` 648} 649 650func (a *StreamplaceAPI) HandleChat(ctx context.Context) httprouter.Handle { 651 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 652 user := params.ByName("repoDID") 653 if user == "" { 654 apierrors.WriteHTTPBadRequest(w, "user required", nil) 655 return 656 } 657 repoDID, err := a.NormalizeUser(ctx, user) 658 if err != nil { 659 apierrors.WriteHTTPNotFound(w, "user not found", err) 660 return 661 } 662 replies, err := a.Model.GetReplies(repoDID) 663 if err != nil { 664 apierrors.WriteHTTPInternalServerError(w, "could not get replies", err) 665 return 666 } 667 bs, err := json.Marshal(replies) 668 if err != nil { 669 apierrors.WriteHTTPInternalServerError(w, "could not marshal replies", err) 670 return 671 } 672 w.Header().Set("Content-Type", "application/json") 673 if _, err := w.Write(bs); err != nil { 674 log.Error(ctx, "error writing response", "error", err) 675 } 676 } 677} 678 679func (a *StreamplaceAPI) HandleLivestream(ctx context.Context) httprouter.Handle { 680 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 681 user := params.ByName("repoDID") 682 if user == "" { 683 apierrors.WriteHTTPBadRequest(w, "user required", nil) 684 return 685 } 686 repoDID, err := a.NormalizeUser(ctx, user) 687 if err != nil { 688 apierrors.WriteHTTPNotFound(w, "user not found", err) 689 return 690 } 691 livestream, err := a.Model.GetLatestLivestreamForRepo(repoDID) 692 if err != nil { 693 apierrors.WriteHTTPInternalServerError(w, "could not get livestream", err) 694 return 695 } 696 697 doc, err := livestream.ToLivestreamView() 698 if err != nil { 699 apierrors.WriteHTTPInternalServerError(w, "could not marshal livestream", err) 700 return 701 } 702 703 if livestream == nil { 704 apierrors.WriteHTTPNotFound(w, "no livestream found", nil) 705 return 706 } 707 708 bs, err := json.Marshal(doc) 709 if err != nil { 710 apierrors.WriteHTTPInternalServerError(w, "could not marshal livestream", err) 711 return 712 } 713 w.Header().Set("Content-Type", "application/json") 714 if _, err := w.Write(bs); err != nil { 715 log.Error(ctx, "error writing response", "error", err) 716 } 717 } 718} 719 720func (a *StreamplaceAPI) RateLimitMiddleware(ctx context.Context) func(http.Handler) http.Handler { 721 return func(next http.Handler) http.Handler { 722 return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { 723 ip, _, err := net.SplitHostPort(req.RemoteAddr) 724 if err != nil { 725 ip = req.RemoteAddr 726 } 727 728 if a.CLI.RateLimitPerSecond > 0 { 729 limiter := a.getLimiter(ip) 730 731 if !limiter.Allow() { 732 log.Warn(ctx, "rate limit exceeded", "ip", ip, "path", req.URL.Path) 733 apierrors.WriteHTTPTooManyRequests(w, "rate limit exceeded") 734 return 735 } 736 } 737 738 next.ServeHTTP(w, req) 739 }) 740 } 741} 742 743// helper for getting a listener from a systemd file descriptor 744func getListenerFromFD(fdName string) (net.Listener, error) { 745 log.Debug(context.TODO(), "getting listener from fd", "fdName", fdName, "LISTEN_PID", os.Getenv("LISTEN_PID"), "LISTEN_FDNAMES", os.Getenv("LISTEN_FDNAMES")) 746 if os.Getenv("LISTEN_PID") == strconv.Itoa(os.Getpid()) { 747 names := strings.Split(os.Getenv("LISTEN_FDNAMES"), ":") 748 for i, name := range names { 749 if name == fdName { 750 log.Warn(context.TODO(), "using systemd file descriptor", "fdName", fdName, "fdIndex", i+3) 751 f1 := os.NewFile(uintptr(i+3), fdName) 752 return net.FileListener(f1) 753 } 754 } 755 } 756 return nil, nil 757} 758 759func (a *StreamplaceAPI) ServeHTTP(ctx context.Context) error { 760 handler, err := a.Handler(ctx) 761 if err != nil { 762 return err 763 } 764 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error { 765 ln, err := getListenerFromFD("http") 766 if err != nil { 767 return err 768 } 769 if ln == nil { 770 ln, err = net.Listen("tcp", a.CLI.HTTPAddr) 771 if err != nil { 772 return err 773 } 774 } else { 775 log.Warn(ctx, "api server listening for http over systemd socket", "addr", ln.Addr()) 776 } 777 log.Log(ctx, "http server starting", "addr", ln.Addr()) 778 return s.Serve(ln) 779 }) 780} 781 782func (a *StreamplaceAPI) ServeHTTPRedirect(ctx context.Context) error { 783 handler, err := a.RedirectHandler(ctx) 784 if err != nil { 785 return err 786 } 787 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error { 788 ln, err := getListenerFromFD("http") 789 if err != nil { 790 return err 791 } 792 if ln == nil { 793 ln, err = net.Listen("tcp", a.CLI.HTTPAddr) 794 if err != nil { 795 return err 796 } 797 } else { 798 log.Warn(ctx, "http tls redirect server listening for http over systemd socket", "addr", ln.Addr()) 799 } 800 log.Log(ctx, "http tls redirect server starting", "addr", ln.Addr()) 801 return s.Serve(ln) 802 }) 803} 804 805func (a *StreamplaceAPI) ServeHTTPS(ctx context.Context) error { 806 handler, err := a.Handler(ctx) 807 if err != nil { 808 return err 809 } 810 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error { 811 ln, err := getListenerFromFD("https") 812 if err != nil { 813 return err 814 } 815 if ln == nil { 816 ln, err = net.Listen("tcp", a.CLI.HTTPSAddr) 817 if err != nil { 818 return err 819 } 820 } else { 821 // tell the redirect handler we're using systemd and they should go to 443 822 port443 := 443 823 a.HTTPRedirectTLSPort = &port443 824 log.Warn(ctx, "https server listening for https over systemd socket", "addr", ln.Addr()) 825 } 826 log.Log(ctx, "https server starting", 827 "addr", ln.Addr(), 828 "certPath", a.CLI.TLSCertPath, 829 "keyPath", a.CLI.TLSKeyPath, 830 ) 831 return s.ServeTLS(ln, a.CLI.TLSCertPath, a.CLI.TLSKeyPath) 832 }) 833} 834 835func (a *StreamplaceAPI) ServerWithShutdown(ctx context.Context, handler http.Handler, serve func(*http.Server) error) error { 836 ctx, cancel := context.WithCancel(ctx) 837 handler = gziphandler.GzipHandler(handler) 838 server := http.Server{Handler: handler} 839 var serveErr error 840 go func() { 841 serveErr = serve(&server) 842 cancel() 843 }() 844 <-ctx.Done() 845 if serveErr != nil { 846 return fmt.Errorf("error in http server: %w", serveErr) 847 } 848 849 ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second) 850 defer cancel() 851 return server.Shutdown(ctx) 852} 853 854func (a *StreamplaceAPI) HandleHealthz(ctx context.Context) http.HandlerFunc { 855 return func(w http.ResponseWriter, req *http.Request) { 856 w.WriteHeader(200) 857 } 858} 859 860func (a *StreamplaceAPI) getLimiter(ip string) *rate.Limiter { 861 a.limitersMu.Lock() 862 defer a.limitersMu.Unlock() 863 864 limiter, exists := a.limiters[ip] 865 if !exists { 866 limiter = rate.NewLimiter(rate.Limit(a.CLI.RateLimitPerSecond), a.CLI.RateLimitBurst) 867 a.limiters[ip] = limiter 868 } 869 870 return limiter 871} 872 873func (a *StreamplaceAPI) HandleClip(ctx context.Context) httprouter.Handle { 874 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 875 user := params.ByName("user") 876 file := params.ByName("file") 877 if user == "" || file == "" { 878 apierrors.WriteHTTPBadRequest(w, "user and file required", nil) 879 return 880 } 881 user, err := a.NormalizeUser(ctx, user) 882 if err != nil { 883 apierrors.WriteHTTPNotFound(w, "user not found", err) 884 return 885 } 886 fPath := []string{user, "clips", file} 887 exists, err := a.CLI.DataFileExists(fPath) 888 if err != nil { 889 apierrors.WriteHTTPInternalServerError(w, "could not check if file exists", err) 890 return 891 } 892 if !exists { 893 apierrors.WriteHTTPNotFound(w, "file not found", nil) 894 return 895 } 896 fd, err := os.Open(a.CLI.DataFilePath(fPath)) 897 if err != nil { 898 apierrors.WriteHTTPInternalServerError(w, "could not open file", err) 899 return 900 } 901 defer fd.Close() 902 w.Header().Set("Content-Type", "video/mp4") 903 if _, err := io.Copy(w, fd); err != nil { 904 log.Error(ctx, "error writing response", "error", err) 905 } 906 } 907} 908 909func NewWebsocketTracker(maxConns int) *WebsocketTracker { 910 return &WebsocketTracker{ 911 connections: make(map[string]int), 912 maxConnsPerIP: maxConns, 913 } 914} 915 916func (t *WebsocketTracker) AddConnection(ip string) bool { 917 t.mu.Lock() 918 defer t.mu.Unlock() 919 920 count := t.connections[ip] 921 922 if count >= t.maxConnsPerIP { 923 return false 924 } 925 926 t.connections[ip] = count + 1 927 return true 928} 929 930func (t *WebsocketTracker) RemoveConnection(ip string) { 931 t.mu.Lock() 932 defer t.mu.Unlock() 933 934 count := t.connections[ip] 935 if count > 0 { 936 t.connections[ip] = count - 1 937 } 938 939 if t.connections[ip] == 0 { 940 delete(t.connections, ip) 941 } 942}