Live video on the AT Protocol
at eli/standalone-oproxy 795 lines 25 kB view raw
1package api 2 3import ( 4 "context" 5 "encoding/json" 6 "errors" 7 "fmt" 8 "io" 9 "log/slog" 10 "net" 11 "net/http" 12 "net/http/httputil" 13 "net/url" 14 "os" 15 "strings" 16 "sync" 17 "time" 18 19 "github.com/NYTimes/gziphandler" 20 "github.com/bluesky-social/indigo/api/bsky" 21 "github.com/google/uuid" 22 "github.com/julienschmidt/httprouter" 23 "github.com/rs/cors" 24 sloghttp "github.com/samber/slog-http" 25 "golang.org/x/time/rate" 26 27 "github.com/streamplace/oatproxy/pkg/oatproxy" 28 "stream.place/streamplace/js/app" 29 "stream.place/streamplace/pkg/atproto" 30 "stream.place/streamplace/pkg/bus" 31 "stream.place/streamplace/pkg/config" 32 "stream.place/streamplace/pkg/crypto/signers/eip712" 33 "stream.place/streamplace/pkg/director" 34 apierrors "stream.place/streamplace/pkg/errors" 35 "stream.place/streamplace/pkg/linking" 36 "stream.place/streamplace/pkg/log" 37 "stream.place/streamplace/pkg/media" 38 "stream.place/streamplace/pkg/mist/mistconfig" 39 "stream.place/streamplace/pkg/model" 40 "stream.place/streamplace/pkg/notifications" 41 "stream.place/streamplace/pkg/spmetrics" 42 "stream.place/streamplace/pkg/spxrpc" 43 "stream.place/streamplace/pkg/streamplace" 44) 45 46type StreamplaceAPI struct { 47 CLI *config.CLI 48 Model model.Model 49 Updater *Updater 50 Signer *eip712.EIP712Signer 51 Mimes map[string]string 52 FirebaseNotifier notifications.FirebaseNotifier 53 MediaManager *media.MediaManager 54 MediaSigner media.MediaSigner 55 // not thread-safe yet 56 Aliases map[string]string 57 Bus *bus.Bus 58 ATSync *atproto.ATProtoSynchronizer 59 Director *director.Director 60 61 connTracker *WebsocketTracker 62 63 limiters map[string]*rate.Limiter 64 limitersMu sync.Mutex 65 SignerCache map[string]media.MediaSigner 66 SignerCacheMu sync.Mutex 67 op *oatproxy.OATProxy 68} 69 70type WebsocketTracker struct { 71 connections map[string]int 72 maxConnsPerIP int 73 mu sync.RWMutex 74} 75 76func MakeStreamplaceAPI(cli *config.CLI, mod model.Model, signer *eip712.EIP712Signer, noter notifications.FirebaseNotifier, mm *media.MediaManager, ms media.MediaSigner, bus *bus.Bus, atsync *atproto.ATProtoSynchronizer, d *director.Director, op *oatproxy.OATProxy) (*StreamplaceAPI, error) { 77 updater, err := PrepareUpdater(cli) 78 if err != nil { 79 return nil, err 80 } 81 a := &StreamplaceAPI{CLI: cli, 82 Model: mod, 83 Updater: updater, 84 Signer: signer, 85 FirebaseNotifier: noter, 86 MediaManager: mm, 87 MediaSigner: ms, 88 Aliases: map[string]string{}, 89 Bus: bus, 90 ATSync: atsync, 91 Director: d, 92 connTracker: NewWebsocketTracker(cli.RateLimitWebsocket), 93 limiters: make(map[string]*rate.Limiter), 94 SignerCache: make(map[string]media.MediaSigner), 95 op: op, 96 } 97 a.Mimes, err = updater.GetMimes() 98 if err != nil { 99 return nil, err 100 } 101 return a, nil 102} 103 104type AppHostingFS struct { 105 http.FileSystem 106} 107 108var ErrorIndex = errors.New("not found, use index.html") 109 110func (fs AppHostingFS) Open(name string) (http.File, error) { 111 file, err1 := fs.FileSystem.Open(name) 112 if err1 == nil { 113 return file, nil 114 } 115 if !errors.Is(err1, os.ErrNotExist) { 116 return nil, err1 117 } 118 119 return nil, ErrorIndex 120} 121 122// api/playback/iame.li/webrtc?rendition=source 123// api/playback/iame.li/stream.mp4?rendition=source 124// api/playback/iame.li/stream.webm?rendition=source 125// api/playback/iame.li/hls/index.m3u8 126// api/playback/iame.li/hls/source/stream.m3u8 127// api/playback/iame.li/hls/source/000000000000.ts 128 129func (a *StreamplaceAPI) Handler(ctx context.Context) (http.Handler, error) { 130 131 var xrpc http.Handler 132 xrpc, err := spxrpc.NewServer(ctx, a.CLI, a.Model, a.op) 133 if err != nil { 134 return nil, err 135 } 136 router := httprouter.New() 137 138 router.Handler("GET", "/oauth/*anything", a.op.Handler()) 139 router.Handler("POST", "/oauth/*anything", a.op.Handler()) 140 router.Handler("GET", "/.well-known/oauth-authorization-server", a.op.Handler()) 141 router.Handler("GET", "/.well-known/oauth-protected-resource", a.op.Handler()) 142 apiRouter := httprouter.New() 143 apiRouter.HandlerFunc("POST", "/api/notification", a.HandleNotification(ctx)) 144 // old clients 145 router.HandlerFunc("GET", "/app-updates", a.HandleAppUpdates(ctx)) 146 147 // new ones 148 apiRouter.HandlerFunc("GET", "/api/manifest", a.HandleAppUpdates(ctx)) 149 apiRouter.GET("/api/desktop-updates/:platform/:architecture/:version/:buildTime/:file", a.HandleDesktopUpdates(ctx)) 150 apiRouter.POST("/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s")) 151 apiRouter.OPTIONS("/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s")) 152 apiRouter.DELETE("/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s")) 153 apiRouter.Handler("POST", "/api/segment", a.HandleSegment(ctx)) 154 apiRouter.HandlerFunc("GET", "/api/healthz", a.HandleHealthz(ctx)) 155 apiRouter.GET("/api/playback/:user/hls/*file", a.HandleHLSPlayback(ctx)) 156 apiRouter.GET("/api/playback/:user/stream.mp4", a.HandleMP4Playback(ctx)) 157 apiRouter.GET("/api/playback/:user/stream.webm", a.HandleMKVPlayback(ctx)) 158 // they're, uh, not jpegs. but we used this once and i don't wanna break backwards compatibility 159 apiRouter.GET("/api/playback/:user/stream.jpg", a.HandleThumbnailPlayback(ctx)) 160 // this one is not a lie 161 apiRouter.GET("/api/playback/:user/stream.png", a.HandleThumbnailPlayback(ctx)) 162 apiRouter.GET("/api/app-return/*anything", a.HandleAppReturn(ctx)) 163 apiRouter.POST("/api/playback/:user/webrtc", a.HandleWebRTCPlayback(ctx)) 164 apiRouter.POST("/api/ingest/webrtc", a.HandleWebRTCIngest(ctx)) 165 apiRouter.POST("/api/ingest/webrtc/:key", a.HandleWebRTCIngest(ctx)) 166 apiRouter.POST("/api/player-event", a.HandlePlayerEvent(ctx)) 167 apiRouter.GET("/api/chat/:repoDID", a.HandleChat(ctx)) 168 apiRouter.GET("/api/websocket/:repoDID", a.HandleWebsocket(ctx)) 169 apiRouter.GET("/api/livestream/:repoDID", a.HandleLivestream(ctx)) 170 apiRouter.GET("/api/segment/recent", a.HandleRecentSegments(ctx)) 171 apiRouter.GET("/api/segment/recent/:repoDID", a.HandleUserRecentSegments(ctx)) 172 apiRouter.GET("/api/bluesky/resolve/:handle", a.HandleBlueskyResolve(ctx)) 173 apiRouter.GET("/api/view-count/:user", a.HandleViewCount(ctx)) 174 apiRouter.NotFound = a.HandleAPI404(ctx) 175 apiRouterHandler := a.RateLimitMiddleware(ctx)(apiRouter) 176 xrpcHandler := a.RateLimitMiddleware(ctx)(xrpc) 177 router.Handler("GET", "/api/*resource", apiRouterHandler) 178 router.Handler("POST", "/api/*resource", apiRouterHandler) 179 router.Handler("PUT", "/api/*resource", apiRouterHandler) 180 router.Handler("PATCH", "/api/*resource", apiRouterHandler) 181 router.Handler("DELETE", "/api/*resource", apiRouterHandler) 182 router.Handler("GET", "/xrpc/*resource", xrpcHandler) 183 router.Handler("POST", "/xrpc/*resource", xrpcHandler) 184 router.Handler("PUT", "/xrpc/*resource", xrpcHandler) 185 router.Handler("PATCH", "/xrpc/*resource", xrpcHandler) 186 router.Handler("DELETE", "/xrpc/*resource", xrpcHandler) 187 router.GET("/.well-known/did.json", a.HandleDidJson(ctx)) 188 router.GET("/dl/*params", a.HandleAppDownload(ctx)) 189 router.POST("/", a.HandleWebRTCIngest(ctx)) 190 for _, redirect := range a.CLI.Redirects { 191 parts := strings.Split(redirect, ":") 192 if len(parts) != 2 { 193 log.Error(ctx, "invalid redirect", "redirect", redirect) 194 return nil, fmt.Errorf("invalid redirect: %s", redirect) 195 } 196 router.Handle("GET", parts[0], func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) { 197 http.Redirect(w, r, parts[1], http.StatusTemporaryRedirect) 198 }) 199 } 200 if a.CLI.FrontendProxy != "" { 201 u, err := url.Parse(a.CLI.FrontendProxy) 202 if err != nil { 203 return nil, err 204 } 205 log.Warn(ctx, "using frontend proxy instead of bundled frontend", "destination", a.CLI.FrontendProxy) 206 router.NotFound = &httputil.ReverseProxy{ 207 Rewrite: func(r *httputil.ProxyRequest) { 208 // workaround for Expo disliking serving requests from 127.0.0.1 instead of localhost 209 // we need to use 127.0.0.1 because the atproto oauth client requires it 210 r.Out.Header.Set("Origin", u.String()) 211 r.SetURL(u) 212 }, 213 } 214 } else { 215 files, err := app.Files() 216 if err != nil { 217 return nil, err 218 } 219 index, err := files.Open("index.html") 220 if err != nil { 221 return nil, err 222 } 223 bs, err := io.ReadAll(index) 224 if err != nil { 225 return nil, err 226 } 227 linker, err := linking.NewLinker(ctx, bs) 228 if err != nil { 229 return nil, err 230 } 231 linkingHandler, err := a.NotFoundLinkingHandler(ctx, linker) 232 if err != nil { 233 return nil, err 234 } 235 router.NotFound = linkingHandler 236 } 237 // needed because the WebRTC handler issues 405s from / otherwise 238 router.GET("/", func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) { 239 router.NotFound.ServeHTTP(w, r) 240 }) 241 handler := sloghttp.Recovery(router) 242 handler = cors.AllowAll().Handler(handler) 243 handler = sloghttp.New(slog.Default())(handler) 244 handler = a.RateLimitMiddleware(ctx)(handler) 245 246 // this needs to be LAST so nothing else clobbers the context 247 handler = a.ContextMiddleware(ctx)(handler) 248 249 return handler, nil 250} 251func (a *StreamplaceAPI) ContextMiddleware(ctx context.Context) func(next http.Handler) http.Handler { 252 return func(next http.Handler) http.Handler { 253 return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 254 uuid := uuid.New().String() 255 ctx = log.WithLogValues(ctx, "requestID", uuid, "method", r.Method, "path", r.URL.Path) 256 r = r.WithContext(ctx) 257 next.ServeHTTP(w, r) 258 }) 259 } 260} 261func copyHeader(dst, src http.Header) { 262 for k, vv := range src { 263 // we'll handle CORS ourselves, thanks 264 if strings.HasPrefix(k, "Access-Control") { 265 continue 266 } 267 for _, v := range vv { 268 dst.Add(k, v) 269 } 270 } 271} 272 273// handler that takes care of static files and otherwise returns the index.html with the correct link card data 274func (a *StreamplaceAPI) NotFoundLinkingHandler(ctx context.Context, linker *linking.Linker) (http.HandlerFunc, error) { 275 files, err := app.Files() 276 if err != nil { 277 return nil, err 278 } 279 fs := AppHostingFS{http.FS(files)} 280 281 fileHandler := a.FileHandler(ctx, http.FileServer(fs)) 282 defaultHandler := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { 283 f := strings.TrimPrefix(req.URL.Path, "/") 284 // under docs we need the index.html suffix due to astro rendering 285 if strings.HasPrefix(req.URL.Path, "/docs") && strings.HasSuffix(req.URL.Path, "/") { 286 f += "index.html" 287 } 288 _, err := fs.Open(f) 289 if err == nil { 290 fileHandler.ServeHTTP(w, req) 291 return 292 } 293 if errors.Is(err, ErrorIndex) || f == "" { 294 bs, err := linker.GenerateDefaultCard(ctx, req.URL) 295 if err != nil { 296 log.Error(ctx, "error generating default card", "error", err) 297 } 298 w.Header().Set("Content-Type", "text/html") 299 w.Write(bs) 300 } else { 301 log.Warn(ctx, "error opening file", "error", err) 302 apierrors.WriteHTTPInternalServerError(w, "file not found", err) 303 } 304 }) 305 return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { 306 proto := "http" 307 if req.TLS != nil { 308 proto = "https" 309 } 310 fwProto := req.Header.Get("x-forwarded-proto") 311 if fwProto != "" { 312 proto = fwProto 313 } 314 req.URL.Host = req.Host 315 req.URL.Scheme = proto 316 maybeHandle := strings.TrimPrefix(req.URL.Path, "/") 317 repo, err := a.Model.GetRepoByHandleOrDID(maybeHandle) 318 if err != nil || repo == nil { 319 log.Error(ctx, "no repo found", "maybeHandle", maybeHandle) 320 defaultHandler.ServeHTTP(w, req) 321 return 322 } 323 ls, err := a.Model.GetLatestLivestreamForRepo(repo.DID) 324 if err != nil || ls == nil { 325 log.Error(ctx, "no livestream found", "repoDID", repo.DID) 326 defaultHandler.ServeHTTP(w, req) 327 return 328 } 329 lsv, err := ls.ToLivestreamView() 330 if err != nil || lsv == nil { 331 log.Error(ctx, "no livestream view found", "repoDID", repo.DID) 332 defaultHandler.ServeHTTP(w, req) 333 return 334 } 335 bs, err := linker.GenerateStreamerCard(ctx, req.URL, lsv) 336 if err != nil { 337 log.Error(ctx, "error generating html", "error", err) 338 defaultHandler.ServeHTTP(w, req) 339 return 340 } 341 w.Header().Set("Content-Type", "text/html") 342 w.Write(bs) 343 }), nil 344} 345 346func (a *StreamplaceAPI) MistProxyHandler(ctx context.Context, tmpl string) httprouter.Handle { 347 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 348 if !a.CLI.HasMist() { 349 apierrors.WriteHTTPNotImplemented(w, "Playback only on the Linux version for now", nil) 350 return 351 } 352 stream := params.ByName("stream") 353 if stream == "" { 354 apierrors.WriteHTTPBadRequest(w, "missing stream in request", nil) 355 return 356 } 357 358 fullstream := fmt.Sprintf("%s+%s", mistconfig.STREAM_NAME, stream) 359 prefix := fmt.Sprintf(tmpl, fullstream) 360 resource := params.ByName("resource") 361 362 // path := strings.TrimPrefix(req.URL.EscapedPath(), "/api") 363 364 client := &http.Client{} 365 req.URL = &url.URL{ 366 Scheme: "http", 367 Host: fmt.Sprintf("127.0.0.1:%d", a.CLI.MistHTTPPort), 368 Path: fmt.Sprintf("%s%s", prefix, resource), 369 RawQuery: req.URL.RawQuery, 370 } 371 372 //http: Request.RequestURI can't be set in client requests. 373 //http://golang.org/src/pkg/net/http/client.go 374 req.RequestURI = "" 375 376 resp, err := client.Do(req) 377 if err != nil { 378 apierrors.WriteHTTPInternalServerError(w, "error connecting to mist", err) 379 return 380 } 381 defer resp.Body.Close() 382 383 copyHeader(w.Header(), resp.Header) 384 w.WriteHeader(resp.StatusCode) 385 io.Copy(w, resp.Body) 386 } 387} 388 389func (a *StreamplaceAPI) FileHandler(ctx context.Context, fs http.Handler) http.HandlerFunc { 390 return func(w http.ResponseWriter, req *http.Request) { 391 noslash := req.URL.Path[1:] 392 ct, ok := a.Mimes[noslash] 393 if ok { 394 w.Header().Set("content-type", ct) 395 } 396 fs.ServeHTTP(w, req) 397 } 398} 399 400func (a *StreamplaceAPI) RedirectHandler(ctx context.Context) (http.Handler, error) { 401 _, tlsPort, err := net.SplitHostPort(a.CLI.HttpsAddr) 402 if err != nil { 403 return nil, err 404 } 405 handleRedirect := func(w http.ResponseWriter, req *http.Request) { 406 host, _, err := net.SplitHostPort(req.Host) 407 if err != nil { 408 host = req.Host 409 } 410 u := req.URL 411 if tlsPort == "443" { 412 u.Host = host 413 } else { 414 u.Host = net.JoinHostPort(host, tlsPort) 415 } 416 u.Scheme = "https" 417 http.Redirect(w, req, u.String(), http.StatusTemporaryRedirect) 418 } 419 mux := http.NewServeMux() 420 mux.HandleFunc("/", handleRedirect) 421 return mux, nil 422} 423 424type NotificationPayload struct { 425 Token string `json:"token"` 426 RepoDID string `json:"repoDID"` 427} 428 429func (a *StreamplaceAPI) HandleAPI404(ctx context.Context) http.HandlerFunc { 430 return func(w http.ResponseWriter, req *http.Request) { 431 w.WriteHeader(404) 432 } 433} 434 435func (a *StreamplaceAPI) HandleNotification(ctx context.Context) http.HandlerFunc { 436 return func(w http.ResponseWriter, req *http.Request) { 437 payload, err := io.ReadAll(req.Body) 438 if err != nil { 439 log.Log(ctx, "error reading notification create", "error", err) 440 w.WriteHeader(400) 441 return 442 } 443 n := NotificationPayload{} 444 err = json.Unmarshal(payload, &n) 445 if err != nil { 446 log.Log(ctx, "error unmarshalling notification create", "error", err) 447 w.WriteHeader(400) 448 return 449 } 450 err = a.Model.CreateNotification(n.Token, n.RepoDID) 451 if err != nil { 452 log.Log(ctx, "error creating notification", "error", err) 453 w.WriteHeader(400) 454 return 455 } 456 log.Log(ctx, "successfully created notification", "token", n.Token) 457 w.WriteHeader(200) 458 if n.RepoDID != "" { 459 go func() { 460 _, err := a.ATSync.SyncBlueskyRepo(ctx, n.RepoDID, a.Model) 461 if err != nil { 462 log.Error(ctx, "error syncing bluesky repo after notification creation", "error", err) 463 } 464 }() 465 } 466 } 467} 468 469func (a *StreamplaceAPI) HandleSegment(ctx context.Context) http.HandlerFunc { 470 return func(w http.ResponseWriter, req *http.Request) { 471 err := a.MediaManager.ValidateMP4(ctx, req.Body) 472 if err != nil { 473 apierrors.WriteHTTPBadRequest(w, "could not ingest segment", err) 474 return 475 } 476 w.WriteHeader(200) 477 } 478} 479 480func (a *StreamplaceAPI) HandlePlayerEvent(ctx context.Context) httprouter.Handle { 481 return func(w http.ResponseWriter, req *http.Request, p httprouter.Params) { 482 var event model.PlayerEventAPI 483 if err := json.NewDecoder(req.Body).Decode(&event); err != nil { 484 apierrors.WriteHTTPBadRequest(w, "could not decode JSON body", err) 485 return 486 } 487 err := a.Model.CreatePlayerEvent(event) 488 if err != nil { 489 apierrors.WriteHTTPBadRequest(w, "could not create player event", err) 490 return 491 } 492 w.WriteHeader(201) 493 } 494} 495 496func (a *StreamplaceAPI) HandleRecentSegments(ctx context.Context) httprouter.Handle { 497 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 498 segs, err := a.Model.MostRecentSegments() 499 if err != nil { 500 apierrors.WriteHTTPInternalServerError(w, "could not get segments", err) 501 return 502 } 503 bs, err := json.Marshal(segs) 504 if err != nil { 505 apierrors.WriteHTTPInternalServerError(w, "could not marshal segments", err) 506 return 507 } 508 w.Header().Add("Content-Type", "application/json") 509 w.Write(bs) 510 } 511} 512 513func (a *StreamplaceAPI) HandleUserRecentSegments(ctx context.Context) httprouter.Handle { 514 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 515 user := params.ByName("repoDID") 516 if user == "" { 517 apierrors.WriteHTTPBadRequest(w, "user required", nil) 518 return 519 } 520 user, err := a.NormalizeUser(ctx, user) 521 if err != nil { 522 apierrors.WriteHTTPNotFound(w, "user not found", err) 523 return 524 } 525 seg, err := a.Model.LatestSegmentForUser(user) 526 if err != nil { 527 apierrors.WriteHTTPInternalServerError(w, "could not get segments", err) 528 return 529 } 530 streamplaceSeg, err := seg.ToStreamplaceSegment() 531 if err != nil { 532 apierrors.WriteHTTPInternalServerError(w, "could not convert segment to streamplace segment", err) 533 return 534 } 535 bs, err := json.Marshal(streamplaceSeg) 536 if err != nil { 537 apierrors.WriteHTTPInternalServerError(w, "could not marshal segments", err) 538 return 539 } 540 w.Header().Add("Content-Type", "application/json") 541 w.Write(bs) 542 } 543} 544 545func (a *StreamplaceAPI) HandleViewCount(ctx context.Context) httprouter.Handle { 546 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 547 user := params.ByName("user") 548 if user == "" { 549 apierrors.WriteHTTPBadRequest(w, "user required", nil) 550 return 551 } 552 user, err := a.NormalizeUser(ctx, user) 553 if err != nil { 554 apierrors.WriteHTTPNotFound(w, "user not found", err) 555 return 556 } 557 count := spmetrics.GetViewCount(user) 558 bs, err := json.Marshal(streamplace.Livestream_ViewerCount{Count: int64(count), LexiconTypeID: "place.stream.livestream#viewerCount"}) 559 if err != nil { 560 apierrors.WriteHTTPInternalServerError(w, "could not marshal view count", err) 561 return 562 } 563 w.Write(bs) 564 } 565} 566 567func (a *StreamplaceAPI) HandleBlueskyResolve(ctx context.Context) httprouter.Handle { 568 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 569 log.Log(ctx, "got bluesky notification", "params", params) 570 key, err := a.ATSync.SyncBlueskyRepo(ctx, params.ByName("handle"), a.Model) 571 if err != nil { 572 apierrors.WriteHTTPInternalServerError(w, "could not resolve streamplace key", err) 573 return 574 } 575 signingKeys, err := a.Model.GetSigningKeysForRepo(key.DID) 576 if err != nil { 577 apierrors.WriteHTTPInternalServerError(w, "could not get signing keys", err) 578 return 579 } 580 bs, err := json.Marshal(signingKeys) 581 if err != nil { 582 apierrors.WriteHTTPInternalServerError(w, "could not marshal signing keys", err) 583 return 584 } 585 w.Write(bs) 586 } 587} 588 589type ChatResponse struct { 590 Post *bsky.FeedPost `json:"post"` 591 Repo *model.Repo `json:"repo"` 592 CID string `json:"cid"` 593} 594 595func (a *StreamplaceAPI) HandleChat(ctx context.Context) httprouter.Handle { 596 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 597 user := params.ByName("repoDID") 598 if user == "" { 599 apierrors.WriteHTTPBadRequest(w, "user required", nil) 600 return 601 } 602 repoDID, err := a.NormalizeUser(ctx, user) 603 if err != nil { 604 apierrors.WriteHTTPNotFound(w, "user not found", err) 605 return 606 } 607 replies, err := a.Model.GetReplies(repoDID) 608 if err != nil { 609 apierrors.WriteHTTPInternalServerError(w, "could not get replies", err) 610 return 611 } 612 bs, err := json.Marshal(replies) 613 if err != nil { 614 apierrors.WriteHTTPInternalServerError(w, "could not marshal replies", err) 615 return 616 } 617 w.Header().Set("Content-Type", "application/json") 618 w.Write(bs) 619 } 620} 621 622func (a *StreamplaceAPI) HandleLivestream(ctx context.Context) httprouter.Handle { 623 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 624 user := params.ByName("repoDID") 625 if user == "" { 626 apierrors.WriteHTTPBadRequest(w, "user required", nil) 627 return 628 } 629 repoDID, err := a.NormalizeUser(ctx, user) 630 if err != nil { 631 apierrors.WriteHTTPNotFound(w, "user not found", err) 632 return 633 } 634 livestream, err := a.Model.GetLatestLivestreamForRepo(repoDID) 635 if err != nil { 636 apierrors.WriteHTTPInternalServerError(w, "could not get livestream", err) 637 return 638 } 639 640 doc, err := livestream.ToLivestreamView() 641 if err != nil { 642 apierrors.WriteHTTPInternalServerError(w, "could not marshal livestream", err) 643 return 644 } 645 646 if livestream == nil { 647 apierrors.WriteHTTPNotFound(w, "no livestream found", nil) 648 return 649 } 650 651 bs, err := json.Marshal(doc) 652 if err != nil { 653 apierrors.WriteHTTPInternalServerError(w, "could not marshal livestream", err) 654 return 655 } 656 w.Header().Set("Content-Type", "application/json") 657 w.Write(bs) 658 } 659} 660 661func (a *StreamplaceAPI) RateLimitMiddleware(ctx context.Context) func(http.Handler) http.Handler { 662 return func(next http.Handler) http.Handler { 663 return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { 664 ip, _, err := net.SplitHostPort(req.RemoteAddr) 665 if err != nil { 666 ip = req.RemoteAddr 667 } 668 669 if a.CLI.RateLimitPerSecond > 0 { 670 limiter := a.getLimiter(ip) 671 672 if !limiter.Allow() { 673 log.Warn(ctx, "rate limit exceeded", "ip", ip, "path", req.URL.Path) 674 apierrors.WriteHTTPTooManyRequests(w, "rate limit exceeded") 675 return 676 } 677 } 678 679 next.ServeHTTP(w, req) 680 }) 681 } 682} 683 684func (a *StreamplaceAPI) ServeHTTP(ctx context.Context) error { 685 handler, err := a.Handler(ctx) 686 if err != nil { 687 return err 688 } 689 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error { 690 s.Addr = a.CLI.HttpAddr 691 log.Log(ctx, "http server starting", "addr", s.Addr) 692 return s.ListenAndServe() 693 }) 694} 695 696func (a *StreamplaceAPI) ServeHTTPRedirect(ctx context.Context) error { 697 handler, err := a.RedirectHandler(ctx) 698 if err != nil { 699 return err 700 } 701 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error { 702 s.Addr = a.CLI.HttpAddr 703 log.Log(ctx, "http tls redirecct server starting", "addr", s.Addr) 704 return s.ListenAndServe() 705 }) 706} 707 708func (a *StreamplaceAPI) ServeHTTPS(ctx context.Context) error { 709 handler, err := a.Handler(ctx) 710 if err != nil { 711 return err 712 } 713 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error { 714 s.Addr = a.CLI.HttpsAddr 715 log.Log(ctx, "https server starting", 716 "addr", s.Addr, 717 "certPath", a.CLI.TLSCertPath, 718 "keyPath", a.CLI.TLSKeyPath, 719 ) 720 return s.ListenAndServeTLS(a.CLI.TLSCertPath, a.CLI.TLSKeyPath) 721 }) 722} 723 724func (a *StreamplaceAPI) ServerWithShutdown(ctx context.Context, handler http.Handler, serve func(*http.Server) error) error { 725 ctx, cancel := context.WithCancel(ctx) 726 handler = gziphandler.GzipHandler(handler) 727 server := http.Server{Handler: handler} 728 var serveErr error 729 go func() { 730 serveErr = serve(&server) 731 cancel() 732 }() 733 <-ctx.Done() 734 if serveErr != nil { 735 return fmt.Errorf("error in http server: %w", serveErr) 736 } 737 738 ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second) 739 defer cancel() 740 return server.Shutdown(ctx) 741} 742 743func (a *StreamplaceAPI) HandleHealthz(ctx context.Context) http.HandlerFunc { 744 return func(w http.ResponseWriter, req *http.Request) { 745 w.WriteHeader(200) 746 } 747} 748 749func (a *StreamplaceAPI) getLimiter(ip string) *rate.Limiter { 750 a.limitersMu.Lock() 751 defer a.limitersMu.Unlock() 752 753 limiter, exists := a.limiters[ip] 754 if !exists { 755 limiter = rate.NewLimiter(rate.Limit(a.CLI.RateLimitPerSecond), a.CLI.RateLimitBurst) 756 a.limiters[ip] = limiter 757 } 758 759 return limiter 760} 761 762func NewWebsocketTracker(maxConns int) *WebsocketTracker { 763 return &WebsocketTracker{ 764 connections: make(map[string]int), 765 maxConnsPerIP: maxConns, 766 } 767} 768 769func (t *WebsocketTracker) AddConnection(ip string) bool { 770 t.mu.Lock() 771 defer t.mu.Unlock() 772 773 count := t.connections[ip] 774 775 if count >= t.maxConnsPerIP { 776 return false 777 } 778 779 t.connections[ip] = count + 1 780 return true 781} 782 783func (t *WebsocketTracker) RemoveConnection(ip string) { 784 t.mu.Lock() 785 defer t.mu.Unlock() 786 787 count := t.connections[ip] 788 if count > 0 { 789 t.connections[ip] = count - 1 790 } 791 792 if t.connections[ip] == 0 { 793 delete(t.connections, ip) 794 } 795}