Live video on the AT Protocol
at eli/custom-sources 796 lines 25 kB view raw
1package api 2 3import ( 4 "context" 5 "encoding/json" 6 "errors" 7 "fmt" 8 "io" 9 "log/slog" 10 "net" 11 "net/http" 12 "net/http/httputil" 13 "net/url" 14 "os" 15 "strings" 16 "sync" 17 "time" 18 19 "github.com/NYTimes/gziphandler" 20 "github.com/bluesky-social/indigo/api/bsky" 21 "github.com/google/uuid" 22 "github.com/julienschmidt/httprouter" 23 "github.com/rs/cors" 24 sloghttp "github.com/samber/slog-http" 25 "golang.org/x/time/rate" 26 27 "stream.place/streamplace/js/app" 28 "stream.place/streamplace/pkg/atproto" 29 "stream.place/streamplace/pkg/bus" 30 "stream.place/streamplace/pkg/config" 31 "stream.place/streamplace/pkg/crypto/signers/eip712" 32 "stream.place/streamplace/pkg/director" 33 apierrors "stream.place/streamplace/pkg/errors" 34 "stream.place/streamplace/pkg/linking" 35 "stream.place/streamplace/pkg/log" 36 "stream.place/streamplace/pkg/media" 37 "stream.place/streamplace/pkg/mist/mistconfig" 38 "stream.place/streamplace/pkg/model" 39 "stream.place/streamplace/pkg/notifications" 40 "stream.place/streamplace/pkg/oproxy" 41 "stream.place/streamplace/pkg/spmetrics" 42 "stream.place/streamplace/pkg/spxrpc" 43 "stream.place/streamplace/pkg/streamplace" 44) 45 46type StreamplaceAPI struct { 47 CLI *config.CLI 48 Model model.Model 49 Updater *Updater 50 Signer *eip712.EIP712Signer 51 Mimes map[string]string 52 FirebaseNotifier notifications.FirebaseNotifier 53 MediaManager *media.MediaManager 54 MediaSigner media.MediaSigner 55 // not thread-safe yet 56 Aliases map[string]string 57 Bus *bus.Bus 58 ATSync *atproto.ATProtoSynchronizer 59 Director *director.Director 60 61 connTracker *WebsocketTracker 62 63 limiters map[string]*rate.Limiter 64 limitersMu sync.Mutex 65 SignerCache map[string]media.MediaSigner 66 SignerCacheMu sync.Mutex 67 op *oproxy.OProxy 68} 69 70type WebsocketTracker struct { 71 connections map[string]int 72 maxConnsPerIP int 73 mu sync.RWMutex 74} 75 76func MakeStreamplaceAPI(cli *config.CLI, mod model.Model, signer *eip712.EIP712Signer, noter notifications.FirebaseNotifier, mm *media.MediaManager, ms media.MediaSigner, bus *bus.Bus, atsync *atproto.ATProtoSynchronizer, d *director.Director, op *oproxy.OProxy) (*StreamplaceAPI, error) { 77 updater, err := PrepareUpdater(cli) 78 if err != nil { 79 return nil, err 80 } 81 a := &StreamplaceAPI{CLI: cli, 82 Model: mod, 83 Updater: updater, 84 Signer: signer, 85 FirebaseNotifier: noter, 86 MediaManager: mm, 87 MediaSigner: ms, 88 Aliases: map[string]string{}, 89 Bus: bus, 90 ATSync: atsync, 91 Director: d, 92 connTracker: NewWebsocketTracker(cli.RateLimitWebsocket), 93 limiters: make(map[string]*rate.Limiter), 94 SignerCache: make(map[string]media.MediaSigner), 95 op: op, 96 } 97 a.Mimes, err = updater.GetMimes() 98 if err != nil { 99 return nil, err 100 } 101 return a, nil 102} 103 104type AppHostingFS struct { 105 http.FileSystem 106} 107 108var ErrorIndex = errors.New("not found, use index.html") 109 110func (fs AppHostingFS) Open(name string) (http.File, error) { 111 file, err1 := fs.FileSystem.Open(name) 112 if err1 == nil { 113 return file, nil 114 } 115 if !errors.Is(err1, os.ErrNotExist) { 116 return nil, err1 117 } 118 119 return nil, ErrorIndex 120} 121 122// api/playback/iame.li/webrtc?rendition=source 123// api/playback/iame.li/stream.mp4?rendition=source 124// api/playback/iame.li/stream.webm?rendition=source 125// api/playback/iame.li/hls/index.m3u8 126// api/playback/iame.li/hls/source/stream.m3u8 127// api/playback/iame.li/hls/source/000000000000.ts 128 129func (a *StreamplaceAPI) Handler(ctx context.Context) (http.Handler, error) { 130 131 var xrpc http.Handler 132 xrpc, err := spxrpc.NewServer(ctx, a.CLI, a.Model) 133 if err != nil { 134 return nil, err 135 } 136 xrpc = a.op.OAuthMiddleware(xrpc) 137 router := httprouter.New() 138 139 router.Handler("GET", "/oauth/*anything", a.op.Handler()) 140 router.Handler("POST", "/oauth/*anything", a.op.Handler()) 141 router.Handler("GET", "/.well-known/oauth-authorization-server", a.op.Handler()) 142 router.Handler("GET", "/.well-known/oauth-protected-resource", a.op.Handler()) 143 apiRouter := httprouter.New() 144 apiRouter.HandlerFunc("POST", "/api/notification", a.HandleNotification(ctx)) 145 // old clients 146 router.HandlerFunc("GET", "/app-updates", a.HandleAppUpdates(ctx)) 147 148 // new ones 149 apiRouter.HandlerFunc("GET", "/api/manifest", a.HandleAppUpdates(ctx)) 150 apiRouter.GET("/api/desktop-updates/:platform/:architecture/:version/:buildTime/:file", a.HandleDesktopUpdates(ctx)) 151 apiRouter.POST("/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s")) 152 apiRouter.OPTIONS("/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s")) 153 apiRouter.DELETE("/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s")) 154 apiRouter.Handler("POST", "/api/segment", a.HandleSegment(ctx)) 155 apiRouter.HandlerFunc("GET", "/api/healthz", a.HandleHealthz(ctx)) 156 apiRouter.GET("/api/playback/:user/hls/*file", a.HandleHLSPlayback(ctx)) 157 apiRouter.GET("/api/playback/:user/stream.mp4", a.HandleMP4Playback(ctx)) 158 apiRouter.GET("/api/playback/:user/stream.webm", a.HandleMKVPlayback(ctx)) 159 // they're, uh, not jpegs. but we used this once and i don't wanna break backwards compatibility 160 apiRouter.GET("/api/playback/:user/stream.jpg", a.HandleThumbnailPlayback(ctx)) 161 // this one is not a lie 162 apiRouter.GET("/api/playback/:user/stream.png", a.HandleThumbnailPlayback(ctx)) 163 apiRouter.GET("/api/app-return/*anything", a.HandleAppReturn(ctx)) 164 apiRouter.POST("/api/playback/:user/webrtc", a.HandleWebRTCPlayback(ctx)) 165 apiRouter.POST("/api/ingest/webrtc", a.HandleWebRTCIngest(ctx)) 166 apiRouter.POST("/api/ingest/webrtc/:key", a.HandleWebRTCIngest(ctx)) 167 apiRouter.POST("/api/player-event", a.HandlePlayerEvent(ctx)) 168 apiRouter.GET("/api/chat/:repoDID", a.HandleChat(ctx)) 169 apiRouter.GET("/api/websocket/:repoDID", a.HandleWebsocket(ctx)) 170 apiRouter.GET("/api/livestream/:repoDID", a.HandleLivestream(ctx)) 171 apiRouter.GET("/api/segment/recent", a.HandleRecentSegments(ctx)) 172 apiRouter.GET("/api/segment/recent/:repoDID", a.HandleUserRecentSegments(ctx)) 173 apiRouter.GET("/api/bluesky/resolve/:handle", a.HandleBlueskyResolve(ctx)) 174 apiRouter.GET("/api/view-count/:user", a.HandleViewCount(ctx)) 175 apiRouter.NotFound = a.HandleAPI404(ctx) 176 apiRouterHandler := a.RateLimitMiddleware(ctx)(apiRouter) 177 xrpcHandler := a.RateLimitMiddleware(ctx)(xrpc) 178 router.Handler("GET", "/api/*resource", apiRouterHandler) 179 router.Handler("POST", "/api/*resource", apiRouterHandler) 180 router.Handler("PUT", "/api/*resource", apiRouterHandler) 181 router.Handler("PATCH", "/api/*resource", apiRouterHandler) 182 router.Handler("DELETE", "/api/*resource", apiRouterHandler) 183 router.Handler("GET", "/xrpc/*resource", xrpcHandler) 184 router.Handler("POST", "/xrpc/*resource", xrpcHandler) 185 router.Handler("PUT", "/xrpc/*resource", xrpcHandler) 186 router.Handler("PATCH", "/xrpc/*resource", xrpcHandler) 187 router.Handler("DELETE", "/xrpc/*resource", xrpcHandler) 188 router.GET("/.well-known/did.json", a.HandleDidJson(ctx)) 189 router.GET("/dl/*params", a.HandleAppDownload(ctx)) 190 router.POST("/", a.HandleWebRTCIngest(ctx)) 191 for _, redirect := range a.CLI.Redirects { 192 parts := strings.Split(redirect, ":") 193 if len(parts) != 2 { 194 log.Error(ctx, "invalid redirect", "redirect", redirect) 195 return nil, fmt.Errorf("invalid redirect: %s", redirect) 196 } 197 router.Handle("GET", parts[0], func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) { 198 http.Redirect(w, r, parts[1], http.StatusTemporaryRedirect) 199 }) 200 } 201 if a.CLI.FrontendProxy != "" { 202 u, err := url.Parse(a.CLI.FrontendProxy) 203 if err != nil { 204 return nil, err 205 } 206 log.Warn(ctx, "using frontend proxy instead of bundled frontend", "destination", a.CLI.FrontendProxy) 207 router.NotFound = &httputil.ReverseProxy{ 208 Rewrite: func(r *httputil.ProxyRequest) { 209 // workaround for Expo disliking serving requests from 127.0.0.1 instead of localhost 210 // we need to use 127.0.0.1 because the atproto oauth client requires it 211 r.Out.Header.Set("Origin", u.String()) 212 r.SetURL(u) 213 }, 214 } 215 } else { 216 files, err := app.Files() 217 if err != nil { 218 return nil, err 219 } 220 index, err := files.Open("index.html") 221 if err != nil { 222 return nil, err 223 } 224 bs, err := io.ReadAll(index) 225 if err != nil { 226 return nil, err 227 } 228 linker, err := linking.NewLinker(ctx, bs) 229 if err != nil { 230 return nil, err 231 } 232 linkingHandler, err := a.NotFoundLinkingHandler(ctx, linker) 233 if err != nil { 234 return nil, err 235 } 236 router.NotFound = linkingHandler 237 } 238 // needed because the WebRTC handler issues 405s from / otherwise 239 router.GET("/", func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) { 240 router.NotFound.ServeHTTP(w, r) 241 }) 242 handler := sloghttp.Recovery(router) 243 handler = cors.AllowAll().Handler(handler) 244 handler = sloghttp.New(slog.Default())(handler) 245 handler = a.RateLimitMiddleware(ctx)(handler) 246 247 // this needs to be LAST so nothing else clobbers the context 248 handler = a.ContextMiddleware(ctx)(handler) 249 250 return handler, nil 251} 252func (a *StreamplaceAPI) ContextMiddleware(ctx context.Context) func(next http.Handler) http.Handler { 253 return func(next http.Handler) http.Handler { 254 return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 255 uuid := uuid.New().String() 256 ctx = log.WithLogValues(ctx, "requestID", uuid, "method", r.Method, "path", r.URL.Path) 257 r = r.WithContext(ctx) 258 next.ServeHTTP(w, r) 259 }) 260 } 261} 262func copyHeader(dst, src http.Header) { 263 for k, vv := range src { 264 // we'll handle CORS ourselves, thanks 265 if strings.HasPrefix(k, "Access-Control") { 266 continue 267 } 268 for _, v := range vv { 269 dst.Add(k, v) 270 } 271 } 272} 273 274// handler that takes care of static files and otherwise returns the index.html with the correct link card data 275func (a *StreamplaceAPI) NotFoundLinkingHandler(ctx context.Context, linker *linking.Linker) (http.HandlerFunc, error) { 276 files, err := app.Files() 277 if err != nil { 278 return nil, err 279 } 280 fs := AppHostingFS{http.FS(files)} 281 282 fileHandler := a.FileHandler(ctx, http.FileServer(fs)) 283 defaultHandler := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { 284 f := strings.TrimPrefix(req.URL.Path, "/") 285 // under docs we need the index.html suffix due to astro rendering 286 if strings.HasPrefix(req.URL.Path, "/docs") && strings.HasSuffix(req.URL.Path, "/") { 287 f += "index.html" 288 } 289 _, err := fs.Open(f) 290 if err == nil { 291 fileHandler.ServeHTTP(w, req) 292 return 293 } 294 if errors.Is(err, ErrorIndex) || f == "" { 295 bs, err := linker.GenerateDefaultCard(ctx, req.URL) 296 if err != nil { 297 log.Error(ctx, "error generating default card", "error", err) 298 } 299 w.Header().Set("Content-Type", "text/html") 300 w.Write(bs) 301 } else { 302 log.Warn(ctx, "error opening file", "error", err) 303 apierrors.WriteHTTPInternalServerError(w, "file not found", err) 304 } 305 }) 306 return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { 307 proto := "http" 308 if req.TLS != nil { 309 proto = "https" 310 } 311 fwProto := req.Header.Get("x-forwarded-proto") 312 if fwProto != "" { 313 proto = fwProto 314 } 315 req.URL.Host = req.Host 316 req.URL.Scheme = proto 317 maybeHandle := strings.TrimPrefix(req.URL.Path, "/") 318 repo, err := a.Model.GetRepoByHandleOrDID(maybeHandle) 319 if err != nil || repo == nil { 320 log.Error(ctx, "no repo found", "maybeHandle", maybeHandle) 321 defaultHandler.ServeHTTP(w, req) 322 return 323 } 324 ls, err := a.Model.GetLatestLivestreamForRepo(repo.DID) 325 if err != nil || ls == nil { 326 log.Error(ctx, "no livestream found", "repoDID", repo.DID) 327 defaultHandler.ServeHTTP(w, req) 328 return 329 } 330 lsv, err := ls.ToLivestreamView() 331 if err != nil || lsv == nil { 332 log.Error(ctx, "no livestream view found", "repoDID", repo.DID) 333 defaultHandler.ServeHTTP(w, req) 334 return 335 } 336 bs, err := linker.GenerateStreamerCard(ctx, req.URL, lsv) 337 if err != nil { 338 log.Error(ctx, "error generating html", "error", err) 339 defaultHandler.ServeHTTP(w, req) 340 return 341 } 342 w.Header().Set("Content-Type", "text/html") 343 w.Write(bs) 344 }), nil 345} 346 347func (a *StreamplaceAPI) MistProxyHandler(ctx context.Context, tmpl string) httprouter.Handle { 348 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 349 if !a.CLI.HasMist() { 350 apierrors.WriteHTTPNotImplemented(w, "Playback only on the Linux version for now", nil) 351 return 352 } 353 stream := params.ByName("stream") 354 if stream == "" { 355 apierrors.WriteHTTPBadRequest(w, "missing stream in request", nil) 356 return 357 } 358 359 fullstream := fmt.Sprintf("%s+%s", mistconfig.STREAM_NAME, stream) 360 prefix := fmt.Sprintf(tmpl, fullstream) 361 resource := params.ByName("resource") 362 363 // path := strings.TrimPrefix(req.URL.EscapedPath(), "/api") 364 365 client := &http.Client{} 366 req.URL = &url.URL{ 367 Scheme: "http", 368 Host: fmt.Sprintf("127.0.0.1:%d", a.CLI.MistHTTPPort), 369 Path: fmt.Sprintf("%s%s", prefix, resource), 370 RawQuery: req.URL.RawQuery, 371 } 372 373 //http: Request.RequestURI can't be set in client requests. 374 //http://golang.org/src/pkg/net/http/client.go 375 req.RequestURI = "" 376 377 resp, err := client.Do(req) 378 if err != nil { 379 apierrors.WriteHTTPInternalServerError(w, "error connecting to mist", err) 380 return 381 } 382 defer resp.Body.Close() 383 384 copyHeader(w.Header(), resp.Header) 385 w.WriteHeader(resp.StatusCode) 386 io.Copy(w, resp.Body) 387 } 388} 389 390func (a *StreamplaceAPI) FileHandler(ctx context.Context, fs http.Handler) http.HandlerFunc { 391 return func(w http.ResponseWriter, req *http.Request) { 392 noslash := req.URL.Path[1:] 393 ct, ok := a.Mimes[noslash] 394 if ok { 395 w.Header().Set("content-type", ct) 396 } 397 fs.ServeHTTP(w, req) 398 } 399} 400 401func (a *StreamplaceAPI) RedirectHandler(ctx context.Context) (http.Handler, error) { 402 _, tlsPort, err := net.SplitHostPort(a.CLI.HttpsAddr) 403 if err != nil { 404 return nil, err 405 } 406 handleRedirect := func(w http.ResponseWriter, req *http.Request) { 407 host, _, err := net.SplitHostPort(req.Host) 408 if err != nil { 409 host = req.Host 410 } 411 u := req.URL 412 if tlsPort == "443" { 413 u.Host = host 414 } else { 415 u.Host = net.JoinHostPort(host, tlsPort) 416 } 417 u.Scheme = "https" 418 http.Redirect(w, req, u.String(), http.StatusTemporaryRedirect) 419 } 420 mux := http.NewServeMux() 421 mux.HandleFunc("/", handleRedirect) 422 return mux, nil 423} 424 425type NotificationPayload struct { 426 Token string `json:"token"` 427 RepoDID string `json:"repoDID"` 428} 429 430func (a *StreamplaceAPI) HandleAPI404(ctx context.Context) http.HandlerFunc { 431 return func(w http.ResponseWriter, req *http.Request) { 432 w.WriteHeader(404) 433 } 434} 435 436func (a *StreamplaceAPI) HandleNotification(ctx context.Context) http.HandlerFunc { 437 return func(w http.ResponseWriter, req *http.Request) { 438 payload, err := io.ReadAll(req.Body) 439 if err != nil { 440 log.Log(ctx, "error reading notification create", "error", err) 441 w.WriteHeader(400) 442 return 443 } 444 n := NotificationPayload{} 445 err = json.Unmarshal(payload, &n) 446 if err != nil { 447 log.Log(ctx, "error unmarshalling notification create", "error", err) 448 w.WriteHeader(400) 449 return 450 } 451 err = a.Model.CreateNotification(n.Token, n.RepoDID) 452 if err != nil { 453 log.Log(ctx, "error creating notification", "error", err) 454 w.WriteHeader(400) 455 return 456 } 457 log.Log(ctx, "successfully created notification", "token", n.Token) 458 w.WriteHeader(200) 459 if n.RepoDID != "" { 460 go func() { 461 _, err := a.ATSync.SyncBlueskyRepo(ctx, n.RepoDID, a.Model) 462 if err != nil { 463 log.Error(ctx, "error syncing bluesky repo after notification creation", "error", err) 464 } 465 }() 466 } 467 } 468} 469 470func (a *StreamplaceAPI) HandleSegment(ctx context.Context) http.HandlerFunc { 471 return func(w http.ResponseWriter, req *http.Request) { 472 err := a.MediaManager.ValidateMP4(ctx, req.Body) 473 if err != nil { 474 apierrors.WriteHTTPBadRequest(w, "could not ingest segment", err) 475 return 476 } 477 w.WriteHeader(200) 478 } 479} 480 481func (a *StreamplaceAPI) HandlePlayerEvent(ctx context.Context) httprouter.Handle { 482 return func(w http.ResponseWriter, req *http.Request, p httprouter.Params) { 483 var event model.PlayerEventAPI 484 if err := json.NewDecoder(req.Body).Decode(&event); err != nil { 485 apierrors.WriteHTTPBadRequest(w, "could not decode JSON body", err) 486 return 487 } 488 err := a.Model.CreatePlayerEvent(event) 489 if err != nil { 490 apierrors.WriteHTTPBadRequest(w, "could not create player event", err) 491 return 492 } 493 w.WriteHeader(201) 494 } 495} 496 497func (a *StreamplaceAPI) HandleRecentSegments(ctx context.Context) httprouter.Handle { 498 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 499 segs, err := a.Model.MostRecentSegments() 500 if err != nil { 501 apierrors.WriteHTTPInternalServerError(w, "could not get segments", err) 502 return 503 } 504 bs, err := json.Marshal(segs) 505 if err != nil { 506 apierrors.WriteHTTPInternalServerError(w, "could not marshal segments", err) 507 return 508 } 509 w.Header().Add("Content-Type", "application/json") 510 w.Write(bs) 511 } 512} 513 514func (a *StreamplaceAPI) HandleUserRecentSegments(ctx context.Context) httprouter.Handle { 515 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 516 user := params.ByName("repoDID") 517 if user == "" { 518 apierrors.WriteHTTPBadRequest(w, "user required", nil) 519 return 520 } 521 user, err := a.NormalizeUser(ctx, user) 522 if err != nil { 523 apierrors.WriteHTTPNotFound(w, "user not found", err) 524 return 525 } 526 seg, err := a.Model.LatestSegmentForUser(user) 527 if err != nil { 528 apierrors.WriteHTTPInternalServerError(w, "could not get segments", err) 529 return 530 } 531 streamplaceSeg, err := seg.ToStreamplaceSegment() 532 if err != nil { 533 apierrors.WriteHTTPInternalServerError(w, "could not convert segment to streamplace segment", err) 534 return 535 } 536 bs, err := json.Marshal(streamplaceSeg) 537 if err != nil { 538 apierrors.WriteHTTPInternalServerError(w, "could not marshal segments", err) 539 return 540 } 541 w.Header().Add("Content-Type", "application/json") 542 w.Write(bs) 543 } 544} 545 546func (a *StreamplaceAPI) HandleViewCount(ctx context.Context) httprouter.Handle { 547 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 548 user := params.ByName("user") 549 if user == "" { 550 apierrors.WriteHTTPBadRequest(w, "user required", nil) 551 return 552 } 553 user, err := a.NormalizeUser(ctx, user) 554 if err != nil { 555 apierrors.WriteHTTPNotFound(w, "user not found", err) 556 return 557 } 558 count := spmetrics.GetViewCount(user) 559 bs, err := json.Marshal(streamplace.Livestream_ViewerCount{Count: int64(count), LexiconTypeID: "place.stream.livestream#viewerCount"}) 560 if err != nil { 561 apierrors.WriteHTTPInternalServerError(w, "could not marshal view count", err) 562 return 563 } 564 w.Write(bs) 565 } 566} 567 568func (a *StreamplaceAPI) HandleBlueskyResolve(ctx context.Context) httprouter.Handle { 569 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 570 log.Log(ctx, "got bluesky notification", "params", params) 571 key, err := a.ATSync.SyncBlueskyRepo(ctx, params.ByName("handle"), a.Model) 572 if err != nil { 573 apierrors.WriteHTTPInternalServerError(w, "could not resolve streamplace key", err) 574 return 575 } 576 signingKeys, err := a.Model.GetSigningKeysForRepo(key.DID) 577 if err != nil { 578 apierrors.WriteHTTPInternalServerError(w, "could not get signing keys", err) 579 return 580 } 581 bs, err := json.Marshal(signingKeys) 582 if err != nil { 583 apierrors.WriteHTTPInternalServerError(w, "could not marshal signing keys", err) 584 return 585 } 586 w.Write(bs) 587 } 588} 589 590type ChatResponse struct { 591 Post *bsky.FeedPost `json:"post"` 592 Repo *model.Repo `json:"repo"` 593 CID string `json:"cid"` 594} 595 596func (a *StreamplaceAPI) HandleChat(ctx context.Context) httprouter.Handle { 597 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 598 user := params.ByName("repoDID") 599 if user == "" { 600 apierrors.WriteHTTPBadRequest(w, "user required", nil) 601 return 602 } 603 repoDID, err := a.NormalizeUser(ctx, user) 604 if err != nil { 605 apierrors.WriteHTTPNotFound(w, "user not found", err) 606 return 607 } 608 replies, err := a.Model.GetReplies(repoDID) 609 if err != nil { 610 apierrors.WriteHTTPInternalServerError(w, "could not get replies", err) 611 return 612 } 613 bs, err := json.Marshal(replies) 614 if err != nil { 615 apierrors.WriteHTTPInternalServerError(w, "could not marshal replies", err) 616 return 617 } 618 w.Header().Set("Content-Type", "application/json") 619 w.Write(bs) 620 } 621} 622 623func (a *StreamplaceAPI) HandleLivestream(ctx context.Context) httprouter.Handle { 624 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 625 user := params.ByName("repoDID") 626 if user == "" { 627 apierrors.WriteHTTPBadRequest(w, "user required", nil) 628 return 629 } 630 repoDID, err := a.NormalizeUser(ctx, user) 631 if err != nil { 632 apierrors.WriteHTTPNotFound(w, "user not found", err) 633 return 634 } 635 livestream, err := a.Model.GetLatestLivestreamForRepo(repoDID) 636 if err != nil { 637 apierrors.WriteHTTPInternalServerError(w, "could not get livestream", err) 638 return 639 } 640 641 doc, err := livestream.ToLivestreamView() 642 if err != nil { 643 apierrors.WriteHTTPInternalServerError(w, "could not marshal livestream", err) 644 return 645 } 646 647 if livestream == nil { 648 apierrors.WriteHTTPNotFound(w, "no livestream found", nil) 649 return 650 } 651 652 bs, err := json.Marshal(doc) 653 if err != nil { 654 apierrors.WriteHTTPInternalServerError(w, "could not marshal livestream", err) 655 return 656 } 657 w.Header().Set("Content-Type", "application/json") 658 w.Write(bs) 659 } 660} 661 662func (a *StreamplaceAPI) RateLimitMiddleware(ctx context.Context) func(http.Handler) http.Handler { 663 return func(next http.Handler) http.Handler { 664 return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { 665 ip, _, err := net.SplitHostPort(req.RemoteAddr) 666 if err != nil { 667 ip = req.RemoteAddr 668 } 669 670 if a.CLI.RateLimitPerSecond > 0 { 671 limiter := a.getLimiter(ip) 672 673 if !limiter.Allow() { 674 log.Warn(ctx, "rate limit exceeded", "ip", ip, "path", req.URL.Path) 675 apierrors.WriteHTTPTooManyRequests(w, "rate limit exceeded") 676 return 677 } 678 } 679 680 next.ServeHTTP(w, req) 681 }) 682 } 683} 684 685func (a *StreamplaceAPI) ServeHTTP(ctx context.Context) error { 686 handler, err := a.Handler(ctx) 687 if err != nil { 688 return err 689 } 690 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error { 691 s.Addr = a.CLI.HttpAddr 692 log.Log(ctx, "http server starting", "addr", s.Addr) 693 return s.ListenAndServe() 694 }) 695} 696 697func (a *StreamplaceAPI) ServeHTTPRedirect(ctx context.Context) error { 698 handler, err := a.RedirectHandler(ctx) 699 if err != nil { 700 return err 701 } 702 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error { 703 s.Addr = a.CLI.HttpAddr 704 log.Log(ctx, "http tls redirecct server starting", "addr", s.Addr) 705 return s.ListenAndServe() 706 }) 707} 708 709func (a *StreamplaceAPI) ServeHTTPS(ctx context.Context) error { 710 handler, err := a.Handler(ctx) 711 if err != nil { 712 return err 713 } 714 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error { 715 s.Addr = a.CLI.HttpsAddr 716 log.Log(ctx, "https server starting", 717 "addr", s.Addr, 718 "certPath", a.CLI.TLSCertPath, 719 "keyPath", a.CLI.TLSKeyPath, 720 ) 721 return s.ListenAndServeTLS(a.CLI.TLSCertPath, a.CLI.TLSKeyPath) 722 }) 723} 724 725func (a *StreamplaceAPI) ServerWithShutdown(ctx context.Context, handler http.Handler, serve func(*http.Server) error) error { 726 ctx, cancel := context.WithCancel(ctx) 727 handler = gziphandler.GzipHandler(handler) 728 server := http.Server{Handler: handler} 729 var serveErr error 730 go func() { 731 serveErr = serve(&server) 732 cancel() 733 }() 734 <-ctx.Done() 735 if serveErr != nil { 736 return fmt.Errorf("error in http server: %w", serveErr) 737 } 738 739 ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second) 740 defer cancel() 741 return server.Shutdown(ctx) 742} 743 744func (a *StreamplaceAPI) HandleHealthz(ctx context.Context) http.HandlerFunc { 745 return func(w http.ResponseWriter, req *http.Request) { 746 w.WriteHeader(200) 747 } 748} 749 750func (a *StreamplaceAPI) getLimiter(ip string) *rate.Limiter { 751 a.limitersMu.Lock() 752 defer a.limitersMu.Unlock() 753 754 limiter, exists := a.limiters[ip] 755 if !exists { 756 limiter = rate.NewLimiter(rate.Limit(a.CLI.RateLimitPerSecond), a.CLI.RateLimitBurst) 757 a.limiters[ip] = limiter 758 } 759 760 return limiter 761} 762 763func NewWebsocketTracker(maxConns int) *WebsocketTracker { 764 return &WebsocketTracker{ 765 connections: make(map[string]int), 766 maxConnsPerIP: maxConns, 767 } 768} 769 770func (t *WebsocketTracker) AddConnection(ip string) bool { 771 t.mu.Lock() 772 defer t.mu.Unlock() 773 774 count := t.connections[ip] 775 776 if count >= t.maxConnsPerIP { 777 return false 778 } 779 780 t.connections[ip] = count + 1 781 return true 782} 783 784func (t *WebsocketTracker) RemoveConnection(ip string) { 785 t.mu.Lock() 786 defer t.mu.Unlock() 787 788 count := t.connections[ip] 789 if count > 0 { 790 t.connections[ip] = count - 1 791 } 792 793 if t.connections[ip] == 0 { 794 delete(t.connections, ip) 795 } 796}