Live video on the AT Protocol
at natb/force-dark-chat 837 lines 27 kB view raw
1package api 2 3import ( 4 "context" 5 "encoding/json" 6 "errors" 7 "fmt" 8 "io" 9 "log/slog" 10 "net" 11 "net/http" 12 "net/http/httputil" 13 "net/url" 14 "os" 15 "strings" 16 "sync" 17 "time" 18 19 "github.com/NYTimes/gziphandler" 20 "github.com/bluesky-social/indigo/api/bsky" 21 "github.com/google/uuid" 22 "github.com/julienschmidt/httprouter" 23 "github.com/rs/cors" 24 sloghttp "github.com/samber/slog-http" 25 "golang.org/x/time/rate" 26 27 "github.com/streamplace/oatproxy/pkg/oatproxy" 28 "stream.place/streamplace/js/app" 29 "stream.place/streamplace/pkg/atproto" 30 "stream.place/streamplace/pkg/bus" 31 "stream.place/streamplace/pkg/config" 32 "stream.place/streamplace/pkg/crypto/signers/eip712" 33 "stream.place/streamplace/pkg/director" 34 apierrors "stream.place/streamplace/pkg/errors" 35 "stream.place/streamplace/pkg/linking" 36 "stream.place/streamplace/pkg/log" 37 "stream.place/streamplace/pkg/media" 38 "stream.place/streamplace/pkg/mist/mistconfig" 39 "stream.place/streamplace/pkg/model" 40 "stream.place/streamplace/pkg/notifications" 41 "stream.place/streamplace/pkg/spmetrics" 42 "stream.place/streamplace/pkg/spxrpc" 43 "stream.place/streamplace/pkg/streamplace" 44 45 metrics "github.com/slok/go-http-metrics/metrics/prometheus" 46 "github.com/slok/go-http-metrics/middleware" 47 echomiddleware "github.com/slok/go-http-metrics/middleware/echo" 48 httproutermiddleware "github.com/slok/go-http-metrics/middleware/httprouter" 49 middlewarestd "github.com/slok/go-http-metrics/middleware/std" 50) 51 52type StreamplaceAPI struct { 53 CLI *config.CLI 54 Model model.Model 55 Updater *Updater 56 Signer *eip712.EIP712Signer 57 Mimes map[string]string 58 FirebaseNotifier notifications.FirebaseNotifier 59 MediaManager *media.MediaManager 60 MediaSigner media.MediaSigner 61 // not thread-safe yet 62 Aliases map[string]string 63 Bus *bus.Bus 64 ATSync *atproto.ATProtoSynchronizer 65 Director *director.Director 66 67 connTracker *WebsocketTracker 68 69 limiters map[string]*rate.Limiter 70 limitersMu sync.Mutex 71 SignerCache map[string]media.MediaSigner 72 SignerCacheMu sync.Mutex 73 op *oatproxy.OATProxy 74} 75 76type WebsocketTracker struct { 77 connections map[string]int 78 maxConnsPerIP int 79 mu sync.RWMutex 80} 81 82func MakeStreamplaceAPI(cli *config.CLI, mod model.Model, signer *eip712.EIP712Signer, noter notifications.FirebaseNotifier, mm *media.MediaManager, ms media.MediaSigner, bus *bus.Bus, atsync *atproto.ATProtoSynchronizer, d *director.Director, op *oatproxy.OATProxy) (*StreamplaceAPI, error) { 83 updater, err := PrepareUpdater(cli) 84 if err != nil { 85 return nil, err 86 } 87 a := &StreamplaceAPI{CLI: cli, 88 Model: mod, 89 Updater: updater, 90 Signer: signer, 91 FirebaseNotifier: noter, 92 MediaManager: mm, 93 MediaSigner: ms, 94 Aliases: map[string]string{}, 95 Bus: bus, 96 ATSync: atsync, 97 Director: d, 98 connTracker: NewWebsocketTracker(cli.RateLimitWebsocket), 99 limiters: make(map[string]*rate.Limiter), 100 SignerCache: make(map[string]media.MediaSigner), 101 op: op, 102 } 103 a.Mimes, err = updater.GetMimes() 104 if err != nil { 105 return nil, err 106 } 107 return a, nil 108} 109 110type AppHostingFS struct { 111 http.FileSystem 112} 113 114var ErrorIndex = errors.New("not found, use index.html") 115 116func (fs AppHostingFS) Open(name string) (http.File, error) { 117 file, err1 := fs.FileSystem.Open(name) 118 if err1 == nil { 119 return file, nil 120 } 121 if !errors.Is(err1, os.ErrNotExist) { 122 return nil, err1 123 } 124 125 return nil, ErrorIndex 126} 127 128// api/playback/iame.li/webrtc?rendition=source 129// api/playback/iame.li/stream.mp4?rendition=source 130// api/playback/iame.li/stream.webm?rendition=source 131// api/playback/iame.li/hls/index.m3u8 132// api/playback/iame.li/hls/source/stream.m3u8 133// api/playback/iame.li/hls/source/000000000000.ts 134 135func (a *StreamplaceAPI) Handler(ctx context.Context) (http.Handler, error) { 136 137 mdlw := middleware.New(middleware.Config{ 138 Recorder: metrics.NewRecorder(metrics.Config{}), 139 }) 140 var xrpc http.Handler 141 xrpc, err := spxrpc.NewServer(ctx, a.CLI, a.Model, a.op, mdlw) 142 if err != nil { 143 return nil, err 144 } 145 router := httprouter.New() 146 147 // Create our middleware factory with the default settings. 148 149 a.op.Echo.Use(echomiddleware.Handler("", mdlw)) 150 151 // r.GET("/test/:id", httproutermiddleware.Handler("/test/:id", h1, mdlw)) 152 153 addHandle := func(router *httprouter.Router, method, path string, handler httprouter.Handle) { 154 router.Handle(method, path, httproutermiddleware.Handler(path, handler, mdlw)) 155 } 156 addFunc := func(router *httprouter.Router, method, path string, handler http.HandlerFunc) { 157 router.Handler(method, path, middlewarestd.Handler(path, mdlw, handler)) 158 } 159 160 router.Handler("GET", "/oauth/*anything", a.op.Handler()) 161 router.Handler("POST", "/oauth/*anything", a.op.Handler()) 162 router.Handler("GET", "/.well-known/oauth-authorization-server", a.op.Handler()) 163 router.Handler("GET", "/.well-known/oauth-protected-resource", a.op.Handler()) 164 router.Handler("GET", "/.well-known/apple-app-site-association", a.HandleAppleAppSiteAssociation(ctx)) 165 router.Handler("GET", "/.well-known/assetlinks.json", a.HandleAndroidAssetLinks(ctx)) 166 apiRouter := httprouter.New() 167 addFunc(apiRouter, "POST", "/api/notification", a.HandleNotification(ctx)) 168 // old clients 169 addFunc(router, "GET", "/app-updates", a.HandleAppUpdates(ctx)) 170 // new ones 171 addFunc(apiRouter, "GET", "/api/manifest", a.HandleAppUpdates(ctx)) 172 addHandle(apiRouter, "GET", "/api/desktop-updates/:platform/:architecture/:version/:buildTime/:file", a.HandleDesktopUpdates(ctx)) 173 addHandle(apiRouter, "POST", "/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s")) 174 addHandle(apiRouter, "OPTIONS", "/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s")) 175 addHandle(apiRouter, "DELETE", "/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s")) 176 addFunc(apiRouter, "POST", "/api/segment", a.HandleSegment(ctx)) 177 addFunc(apiRouter, "GET", "/api/healthz", a.HandleHealthz(ctx)) 178 addHandle(apiRouter, "GET", "/api/playback/:user/hls/*file", a.HandleHLSPlayback(ctx)) 179 addHandle(apiRouter, "GET", "/api/playback/:user/stream.mp4", a.HandleMP4Playback(ctx)) 180 addHandle(apiRouter, "GET", "/api/playback/:user/stream.webm", a.HandleMKVPlayback(ctx)) 181 // they're jpegs now 182 addHandle(apiRouter, "GET", "/api/playback/:user/stream.jpg", a.HandleThumbnailPlayback(ctx)) 183 // this one is actually a jpeg (used previously and shouldn't remove for historical reasons) 184 addHandle(apiRouter, "GET", "/api/playback/:user/stream.png", a.HandleThumbnailPlayback(ctx)) 185 addHandle(apiRouter, "GET", "/api/app-return/*anything", a.HandleAppReturn(ctx)) 186 addHandle(apiRouter, "POST", "/api/playback/:user/webrtc", a.HandleWebRTCPlayback(ctx)) 187 addHandle(apiRouter, "POST", "/api/ingest/webrtc", a.HandleWebRTCIngest(ctx)) 188 addHandle(apiRouter, "POST", "/api/ingest/webrtc/:key", a.HandleWebRTCIngest(ctx)) 189 addHandle(apiRouter, "POST", "/api/player-event", a.HandlePlayerEvent(ctx)) 190 addHandle(apiRouter, "GET", "/api/chat/:repoDID", a.HandleChat(ctx)) 191 addHandle(apiRouter, "GET", "/api/websocket/:repoDID", a.HandleWebsocket(ctx)) 192 addHandle(apiRouter, "GET", "/api/livestream/:repoDID", a.HandleLivestream(ctx)) 193 addHandle(apiRouter, "GET", "/api/segment/recent", a.HandleRecentSegments(ctx)) 194 addHandle(apiRouter, "GET", "/api/segment/recent/:repoDID", a.HandleUserRecentSegments(ctx)) 195 addHandle(apiRouter, "GET", "/api/bluesky/resolve/:handle", a.HandleBlueskyResolve(ctx)) 196 addHandle(apiRouter, "GET", "/api/view-count/:user", a.HandleViewCount(ctx)) 197 apiRouter.NotFound = a.HandleAPI404(ctx) 198 apiRouterHandler := a.RateLimitMiddleware(ctx)(apiRouter) 199 xrpcHandler := a.RateLimitMiddleware(ctx)(xrpc) 200 router.Handler("GET", "/api/*resource", apiRouterHandler) 201 router.Handler("POST", "/api/*resource", apiRouterHandler) 202 router.Handler("PUT", "/api/*resource", apiRouterHandler) 203 router.Handler("PATCH", "/api/*resource", apiRouterHandler) 204 router.Handler("DELETE", "/api/*resource", apiRouterHandler) 205 router.Handler("GET", "/xrpc/*resource", xrpcHandler) 206 router.Handler("POST", "/xrpc/*resource", xrpcHandler) 207 router.Handler("PUT", "/xrpc/*resource", xrpcHandler) 208 router.Handler("PATCH", "/xrpc/*resource", xrpcHandler) 209 router.Handler("DELETE", "/xrpc/*resource", xrpcHandler) 210 router.GET("/.well-known/did.json", a.HandleDidJSON(ctx)) 211 router.GET("/.well-known/atproto-did", a.HandleAtprotoDID(ctx)) 212 router.GET("/dl/*params", a.HandleAppDownload(ctx)) 213 router.POST("/", a.HandleWebRTCIngest(ctx)) 214 for _, redirect := range a.CLI.Redirects { 215 parts := strings.Split(redirect, ":") 216 if len(parts) != 2 { 217 log.Error(ctx, "invalid redirect", "redirect", redirect) 218 return nil, fmt.Errorf("invalid redirect: %s", redirect) 219 } 220 router.Handle("GET", parts[0], func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) { 221 http.Redirect(w, r, parts[1], http.StatusTemporaryRedirect) 222 }) 223 } 224 if a.CLI.FrontendProxy != "" { 225 u, err := url.Parse(a.CLI.FrontendProxy) 226 if err != nil { 227 return nil, err 228 } 229 log.Warn(ctx, "using frontend proxy instead of bundled frontend", "destination", a.CLI.FrontendProxy) 230 router.NotFound = &httputil.ReverseProxy{ 231 Rewrite: func(r *httputil.ProxyRequest) { 232 // workaround for Expo disliking serving requests from 127.0.0.1 instead of localhost 233 // we need to use 127.0.0.1 because the atproto oauth client requires it 234 r.Out.Header.Set("Origin", u.String()) 235 r.SetURL(u) 236 }, 237 } 238 } else { 239 files, err := app.Files() 240 if err != nil { 241 return nil, err 242 } 243 index, err := files.Open("index.html") 244 if err != nil { 245 return nil, err 246 } 247 bs, err := io.ReadAll(index) 248 if err != nil { 249 return nil, err 250 } 251 linker, err := linking.NewLinker(ctx, bs) 252 if err != nil { 253 return nil, err 254 } 255 linkingHandler, err := a.NotFoundLinkingHandler(ctx, linker) 256 if err != nil { 257 return nil, err 258 } 259 router.NotFound = middlewarestd.Handler("/*static", mdlw, linkingHandler) 260 } 261 // needed because the WebRTC handler issues 405s from / otherwise 262 router.GET("/", func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) { 263 router.NotFound.ServeHTTP(w, r) 264 }) 265 handler := sloghttp.Recovery(router) 266 handler = cors.AllowAll().Handler(handler) 267 handler = sloghttp.New(slog.Default())(handler) 268 handler = a.RateLimitMiddleware(ctx)(handler) 269 270 // this needs to be LAST so nothing else clobbers the context 271 handler = a.ContextMiddleware(ctx)(handler) 272 273 return handler, nil 274} 275func (a *StreamplaceAPI) ContextMiddleware(ctx context.Context) func(next http.Handler) http.Handler { 276 return func(next http.Handler) http.Handler { 277 return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 278 uuid := uuid.New().String() 279 ctx = log.WithLogValues(ctx, "requestID", uuid, "method", r.Method, "path", r.URL.Path) 280 r = r.WithContext(ctx) 281 next.ServeHTTP(w, r) 282 }) 283 } 284} 285func copyHeader(dst, src http.Header) { 286 for k, vv := range src { 287 // we'll handle CORS ourselves, thanks 288 if strings.HasPrefix(k, "Access-Control") { 289 continue 290 } 291 for _, v := range vv { 292 dst.Add(k, v) 293 } 294 } 295} 296 297// handler that takes care of static files and otherwise returns the index.html with the correct link card data 298func (a *StreamplaceAPI) NotFoundLinkingHandler(ctx context.Context, linker *linking.Linker) (http.HandlerFunc, error) { 299 files, err := app.Files() 300 if err != nil { 301 return nil, err 302 } 303 fs := AppHostingFS{http.FS(files)} 304 305 fileHandler := a.FileHandler(ctx, http.FileServer(fs)) 306 defaultHandler := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { 307 f := strings.TrimPrefix(req.URL.Path, "/") 308 // under docs we need the index.html suffix due to astro rendering 309 if strings.HasPrefix(req.URL.Path, "/docs") && strings.HasSuffix(req.URL.Path, "/") { 310 f += "index.html" 311 } 312 _, err := fs.Open(f) 313 if err == nil { 314 fileHandler.ServeHTTP(w, req) 315 return 316 } 317 if errors.Is(err, ErrorIndex) || f == "" { 318 bs, err := linker.GenerateDefaultCard(ctx, req.URL) 319 if err != nil { 320 log.Error(ctx, "error generating default card", "error", err) 321 } 322 w.Header().Set("Content-Type", "text/html") 323 if _, err := w.Write(bs); err != nil { 324 log.Error(ctx, "error writing response", "error", err) 325 } 326 } else { 327 log.Warn(ctx, "error opening file", "error", err) 328 apierrors.WriteHTTPInternalServerError(w, "file not found", err) 329 } 330 }) 331 return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { 332 proto := "http" 333 if req.TLS != nil { 334 proto = "https" 335 } 336 fwProto := req.Header.Get("x-forwarded-proto") 337 if fwProto != "" { 338 proto = fwProto 339 } 340 req.URL.Host = req.Host 341 req.URL.Scheme = proto 342 maybeHandle := strings.TrimPrefix(req.URL.Path, "/") 343 repo, err := a.Model.GetRepoByHandleOrDID(maybeHandle) 344 if err != nil || repo == nil { 345 log.Error(ctx, "no repo found", "maybeHandle", maybeHandle) 346 defaultHandler.ServeHTTP(w, req) 347 return 348 } 349 ls, err := a.Model.GetLatestLivestreamForRepo(repo.DID) 350 if err != nil || ls == nil { 351 log.Error(ctx, "no livestream found", "repoDID", repo.DID) 352 defaultHandler.ServeHTTP(w, req) 353 return 354 } 355 lsv, err := ls.ToLivestreamView() 356 if err != nil || lsv == nil { 357 log.Error(ctx, "no livestream view found", "repoDID", repo.DID) 358 defaultHandler.ServeHTTP(w, req) 359 return 360 } 361 bs, err := linker.GenerateStreamerCard(ctx, req.URL, lsv) 362 if err != nil { 363 log.Error(ctx, "error generating html", "error", err) 364 defaultHandler.ServeHTTP(w, req) 365 return 366 } 367 w.Header().Set("Content-Type", "text/html") 368 if _, err := w.Write(bs); err != nil { 369 log.Error(ctx, "error writing response", "error", err) 370 } 371 }), nil 372} 373 374func (a *StreamplaceAPI) MistProxyHandler(ctx context.Context, tmpl string) httprouter.Handle { 375 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 376 if !a.CLI.HasMist() { 377 apierrors.WriteHTTPNotImplemented(w, "Playback only on the Linux version for now", nil) 378 return 379 } 380 stream := params.ByName("stream") 381 if stream == "" { 382 apierrors.WriteHTTPBadRequest(w, "missing stream in request", nil) 383 return 384 } 385 386 fullstream := fmt.Sprintf("%s+%s", mistconfig.StreamName, stream) 387 prefix := fmt.Sprintf(tmpl, fullstream) 388 resource := params.ByName("resource") 389 390 // path := strings.TrimPrefix(req.URL.EscapedPath(), "/api") 391 392 client := &http.Client{} 393 req.URL = &url.URL{ 394 Scheme: "http", 395 Host: fmt.Sprintf("127.0.0.1:%d", a.CLI.MistHTTPPort), 396 Path: fmt.Sprintf("%s%s", prefix, resource), 397 RawQuery: req.URL.RawQuery, 398 } 399 400 //http: Request.RequestURI can't be set in client requests. 401 //http://golang.org/src/pkg/net/http/client.go 402 req.RequestURI = "" 403 404 resp, err := client.Do(req) 405 if err != nil { 406 apierrors.WriteHTTPInternalServerError(w, "error connecting to mist", err) 407 return 408 } 409 defer resp.Body.Close() 410 411 copyHeader(w.Header(), resp.Header) 412 w.WriteHeader(resp.StatusCode) 413 if _, err := io.Copy(w, resp.Body); err != nil { 414 log.Error(ctx, "error writing response", "error", err) 415 } 416 } 417} 418 419func (a *StreamplaceAPI) FileHandler(ctx context.Context, fs http.Handler) http.HandlerFunc { 420 return func(w http.ResponseWriter, req *http.Request) { 421 noslash := req.URL.Path[1:] 422 ct, ok := a.Mimes[noslash] 423 if ok { 424 w.Header().Set("content-type", ct) 425 } 426 fs.ServeHTTP(w, req) 427 } 428} 429 430func (a *StreamplaceAPI) RedirectHandler(ctx context.Context) (http.Handler, error) { 431 _, tlsPort, err := net.SplitHostPort(a.CLI.HTTPSAddr) 432 if err != nil { 433 return nil, err 434 } 435 handleRedirect := func(w http.ResponseWriter, req *http.Request) { 436 host, _, err := net.SplitHostPort(req.Host) 437 if err != nil { 438 host = req.Host 439 } 440 u := req.URL 441 if tlsPort == "443" { 442 u.Host = host 443 } else { 444 u.Host = net.JoinHostPort(host, tlsPort) 445 } 446 u.Scheme = "https" 447 http.Redirect(w, req, u.String(), http.StatusTemporaryRedirect) 448 } 449 mux := http.NewServeMux() 450 mux.HandleFunc("/", handleRedirect) 451 return mux, nil 452} 453 454type NotificationPayload struct { 455 Token string `json:"token"` 456 RepoDID string `json:"repoDID"` 457} 458 459func (a *StreamplaceAPI) HandleAPI404(ctx context.Context) http.HandlerFunc { 460 return func(w http.ResponseWriter, req *http.Request) { 461 w.WriteHeader(404) 462 } 463} 464 465func (a *StreamplaceAPI) HandleNotification(ctx context.Context) http.HandlerFunc { 466 return func(w http.ResponseWriter, req *http.Request) { 467 payload, err := io.ReadAll(req.Body) 468 if err != nil { 469 log.Log(ctx, "error reading notification create", "error", err) 470 w.WriteHeader(400) 471 return 472 } 473 n := NotificationPayload{} 474 err = json.Unmarshal(payload, &n) 475 if err != nil { 476 log.Log(ctx, "error unmarshalling notification create", "error", err) 477 w.WriteHeader(400) 478 return 479 } 480 err = a.Model.CreateNotification(n.Token, n.RepoDID) 481 if err != nil { 482 log.Log(ctx, "error creating notification", "error", err) 483 w.WriteHeader(400) 484 return 485 } 486 log.Log(ctx, "successfully created notification", "token", n.Token) 487 w.WriteHeader(200) 488 if n.RepoDID != "" { 489 go func() { 490 _, err := a.ATSync.SyncBlueskyRepo(ctx, n.RepoDID, a.Model) 491 if err != nil { 492 log.Error(ctx, "error syncing bluesky repo after notification creation", "error", err) 493 } 494 }() 495 } 496 } 497} 498 499func (a *StreamplaceAPI) HandleSegment(ctx context.Context) http.HandlerFunc { 500 return func(w http.ResponseWriter, req *http.Request) { 501 err := a.MediaManager.ValidateMP4(ctx, req.Body) 502 if err != nil { 503 apierrors.WriteHTTPBadRequest(w, "could not ingest segment", err) 504 return 505 } 506 w.WriteHeader(200) 507 } 508} 509 510func (a *StreamplaceAPI) HandlePlayerEvent(ctx context.Context) httprouter.Handle { 511 return func(w http.ResponseWriter, req *http.Request, p httprouter.Params) { 512 var event model.PlayerEventAPI 513 if err := json.NewDecoder(req.Body).Decode(&event); err != nil { 514 apierrors.WriteHTTPBadRequest(w, "could not decode JSON body", err) 515 return 516 } 517 err := a.Model.CreatePlayerEvent(event) 518 if err != nil { 519 apierrors.WriteHTTPBadRequest(w, "could not create player event", err) 520 return 521 } 522 w.WriteHeader(201) 523 } 524} 525 526func (a *StreamplaceAPI) HandleRecentSegments(ctx context.Context) httprouter.Handle { 527 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 528 segs, err := a.Model.MostRecentSegments() 529 if err != nil { 530 apierrors.WriteHTTPInternalServerError(w, "could not get segments", err) 531 return 532 } 533 bs, err := json.Marshal(segs) 534 if err != nil { 535 apierrors.WriteHTTPInternalServerError(w, "could not marshal segments", err) 536 return 537 } 538 w.Header().Add("Content-Type", "application/json") 539 if _, err := w.Write(bs); err != nil { 540 log.Error(ctx, "error writing response", "error", err) 541 } 542 } 543} 544 545func (a *StreamplaceAPI) HandleUserRecentSegments(ctx context.Context) httprouter.Handle { 546 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 547 user := params.ByName("repoDID") 548 if user == "" { 549 apierrors.WriteHTTPBadRequest(w, "user required", nil) 550 return 551 } 552 user, err := a.NormalizeUser(ctx, user) 553 if err != nil { 554 apierrors.WriteHTTPNotFound(w, "user not found", err) 555 return 556 } 557 seg, err := a.Model.LatestSegmentForUser(user) 558 if err != nil { 559 apierrors.WriteHTTPInternalServerError(w, "could not get segments", err) 560 return 561 } 562 streamplaceSeg, err := seg.ToStreamplaceSegment() 563 if err != nil { 564 apierrors.WriteHTTPInternalServerError(w, "could not convert segment to streamplace segment", err) 565 return 566 } 567 bs, err := json.Marshal(streamplaceSeg) 568 if err != nil { 569 apierrors.WriteHTTPInternalServerError(w, "could not marshal segments", err) 570 return 571 } 572 w.Header().Add("Content-Type", "application/json") 573 if _, err := w.Write(bs); err != nil { 574 log.Error(ctx, "error writing response", "error", err) 575 } 576 } 577} 578 579func (a *StreamplaceAPI) HandleViewCount(ctx context.Context) httprouter.Handle { 580 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 581 user := params.ByName("user") 582 if user == "" { 583 apierrors.WriteHTTPBadRequest(w, "user required", nil) 584 return 585 } 586 user, err := a.NormalizeUser(ctx, user) 587 if err != nil { 588 apierrors.WriteHTTPNotFound(w, "user not found", err) 589 return 590 } 591 count := spmetrics.GetViewCount(user) 592 bs, err := json.Marshal(streamplace.Livestream_ViewerCount{Count: int64(count), LexiconTypeID: "place.stream.livestream#viewerCount"}) 593 if err != nil { 594 apierrors.WriteHTTPInternalServerError(w, "could not marshal view count", err) 595 return 596 } 597 if _, err := w.Write(bs); err != nil { 598 log.Error(ctx, "error writing response", "error", err) 599 } 600 } 601} 602 603func (a *StreamplaceAPI) HandleBlueskyResolve(ctx context.Context) httprouter.Handle { 604 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 605 log.Log(ctx, "got bluesky notification", "params", params) 606 key, err := a.ATSync.SyncBlueskyRepo(ctx, params.ByName("handle"), a.Model) 607 if err != nil { 608 apierrors.WriteHTTPInternalServerError(w, "could not resolve streamplace key", err) 609 return 610 } 611 signingKeys, err := a.Model.GetSigningKeysForRepo(key.DID) 612 if err != nil { 613 apierrors.WriteHTTPInternalServerError(w, "could not get signing keys", err) 614 return 615 } 616 bs, err := json.Marshal(signingKeys) 617 if err != nil { 618 apierrors.WriteHTTPInternalServerError(w, "could not marshal signing keys", err) 619 return 620 } 621 if _, err := w.Write(bs); err != nil { 622 log.Error(ctx, "error writing response", "error", err) 623 } 624 } 625} 626 627type ChatResponse struct { 628 Post *bsky.FeedPost `json:"post"` 629 Repo *model.Repo `json:"repo"` 630 CID string `json:"cid"` 631} 632 633func (a *StreamplaceAPI) HandleChat(ctx context.Context) httprouter.Handle { 634 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 635 user := params.ByName("repoDID") 636 if user == "" { 637 apierrors.WriteHTTPBadRequest(w, "user required", nil) 638 return 639 } 640 repoDID, err := a.NormalizeUser(ctx, user) 641 if err != nil { 642 apierrors.WriteHTTPNotFound(w, "user not found", err) 643 return 644 } 645 replies, err := a.Model.GetReplies(repoDID) 646 if err != nil { 647 apierrors.WriteHTTPInternalServerError(w, "could not get replies", err) 648 return 649 } 650 bs, err := json.Marshal(replies) 651 if err != nil { 652 apierrors.WriteHTTPInternalServerError(w, "could not marshal replies", err) 653 return 654 } 655 w.Header().Set("Content-Type", "application/json") 656 if _, err := w.Write(bs); err != nil { 657 log.Error(ctx, "error writing response", "error", err) 658 } 659 } 660} 661 662func (a *StreamplaceAPI) HandleLivestream(ctx context.Context) httprouter.Handle { 663 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 664 user := params.ByName("repoDID") 665 if user == "" { 666 apierrors.WriteHTTPBadRequest(w, "user required", nil) 667 return 668 } 669 repoDID, err := a.NormalizeUser(ctx, user) 670 if err != nil { 671 apierrors.WriteHTTPNotFound(w, "user not found", err) 672 return 673 } 674 livestream, err := a.Model.GetLatestLivestreamForRepo(repoDID) 675 if err != nil { 676 apierrors.WriteHTTPInternalServerError(w, "could not get livestream", err) 677 return 678 } 679 680 doc, err := livestream.ToLivestreamView() 681 if err != nil { 682 apierrors.WriteHTTPInternalServerError(w, "could not marshal livestream", err) 683 return 684 } 685 686 if livestream == nil { 687 apierrors.WriteHTTPNotFound(w, "no livestream found", nil) 688 return 689 } 690 691 bs, err := json.Marshal(doc) 692 if err != nil { 693 apierrors.WriteHTTPInternalServerError(w, "could not marshal livestream", err) 694 return 695 } 696 w.Header().Set("Content-Type", "application/json") 697 if _, err := w.Write(bs); err != nil { 698 log.Error(ctx, "error writing response", "error", err) 699 } 700 } 701} 702 703func (a *StreamplaceAPI) RateLimitMiddleware(ctx context.Context) func(http.Handler) http.Handler { 704 return func(next http.Handler) http.Handler { 705 return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { 706 ip, _, err := net.SplitHostPort(req.RemoteAddr) 707 if err != nil { 708 ip = req.RemoteAddr 709 } 710 711 if a.CLI.RateLimitPerSecond > 0 { 712 limiter := a.getLimiter(ip) 713 714 if !limiter.Allow() { 715 log.Warn(ctx, "rate limit exceeded", "ip", ip, "path", req.URL.Path) 716 apierrors.WriteHTTPTooManyRequests(w, "rate limit exceeded") 717 return 718 } 719 } 720 721 next.ServeHTTP(w, req) 722 }) 723 } 724} 725 726func (a *StreamplaceAPI) ServeHTTP(ctx context.Context) error { 727 handler, err := a.Handler(ctx) 728 if err != nil { 729 return err 730 } 731 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error { 732 s.Addr = a.CLI.HTTPAddr 733 log.Log(ctx, "http server starting", "addr", s.Addr) 734 return s.ListenAndServe() 735 }) 736} 737 738func (a *StreamplaceAPI) ServeHTTPRedirect(ctx context.Context) error { 739 handler, err := a.RedirectHandler(ctx) 740 if err != nil { 741 return err 742 } 743 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error { 744 s.Addr = a.CLI.HTTPAddr 745 log.Log(ctx, "http tls redirecct server starting", "addr", s.Addr) 746 return s.ListenAndServe() 747 }) 748} 749 750func (a *StreamplaceAPI) ServeHTTPS(ctx context.Context) error { 751 handler, err := a.Handler(ctx) 752 if err != nil { 753 return err 754 } 755 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error { 756 s.Addr = a.CLI.HTTPSAddr 757 log.Log(ctx, "https server starting", 758 "addr", s.Addr, 759 "certPath", a.CLI.TLSCertPath, 760 "keyPath", a.CLI.TLSKeyPath, 761 ) 762 return s.ListenAndServeTLS(a.CLI.TLSCertPath, a.CLI.TLSKeyPath) 763 }) 764} 765 766func (a *StreamplaceAPI) ServerWithShutdown(ctx context.Context, handler http.Handler, serve func(*http.Server) error) error { 767 ctx, cancel := context.WithCancel(ctx) 768 handler = gziphandler.GzipHandler(handler) 769 server := http.Server{Handler: handler} 770 var serveErr error 771 go func() { 772 serveErr = serve(&server) 773 cancel() 774 }() 775 <-ctx.Done() 776 if serveErr != nil { 777 return fmt.Errorf("error in http server: %w", serveErr) 778 } 779 780 ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second) 781 defer cancel() 782 return server.Shutdown(ctx) 783} 784 785func (a *StreamplaceAPI) HandleHealthz(ctx context.Context) http.HandlerFunc { 786 return func(w http.ResponseWriter, req *http.Request) { 787 w.WriteHeader(200) 788 } 789} 790 791func (a *StreamplaceAPI) getLimiter(ip string) *rate.Limiter { 792 a.limitersMu.Lock() 793 defer a.limitersMu.Unlock() 794 795 limiter, exists := a.limiters[ip] 796 if !exists { 797 limiter = rate.NewLimiter(rate.Limit(a.CLI.RateLimitPerSecond), a.CLI.RateLimitBurst) 798 a.limiters[ip] = limiter 799 } 800 801 return limiter 802} 803 804func NewWebsocketTracker(maxConns int) *WebsocketTracker { 805 return &WebsocketTracker{ 806 connections: make(map[string]int), 807 maxConnsPerIP: maxConns, 808 } 809} 810 811func (t *WebsocketTracker) AddConnection(ip string) bool { 812 t.mu.Lock() 813 defer t.mu.Unlock() 814 815 count := t.connections[ip] 816 817 if count >= t.maxConnsPerIP { 818 return false 819 } 820 821 t.connections[ip] = count + 1 822 return true 823} 824 825func (t *WebsocketTracker) RemoveConnection(ip string) { 826 t.mu.Lock() 827 defer t.mu.Unlock() 828 829 count := t.connections[ip] 830 if count > 0 { 831 t.connections[ip] = count - 1 832 } 833 834 if t.connections[ip] == 0 { 835 delete(t.connections, ip) 836 } 837}