Live video on the AT Protocol
at eli/actor-status 812 lines 25 kB view raw
1package api 2 3import ( 4 "context" 5 "encoding/json" 6 "errors" 7 "fmt" 8 "io" 9 "log/slog" 10 "net" 11 "net/http" 12 "net/http/httputil" 13 "net/url" 14 "os" 15 "strings" 16 "sync" 17 "time" 18 19 "github.com/NYTimes/gziphandler" 20 "github.com/bluesky-social/indigo/api/bsky" 21 "github.com/julienschmidt/httprouter" 22 "github.com/rs/cors" 23 sloghttp "github.com/samber/slog-http" 24 "golang.org/x/time/rate" 25 26 "stream.place/streamplace/js/app" 27 "stream.place/streamplace/pkg/atproto" 28 "stream.place/streamplace/pkg/bus" 29 "stream.place/streamplace/pkg/config" 30 "stream.place/streamplace/pkg/crypto/signers/eip712" 31 "stream.place/streamplace/pkg/director" 32 apierrors "stream.place/streamplace/pkg/errors" 33 "stream.place/streamplace/pkg/linking" 34 "stream.place/streamplace/pkg/log" 35 "stream.place/streamplace/pkg/media" 36 "stream.place/streamplace/pkg/mist/mistconfig" 37 "stream.place/streamplace/pkg/model" 38 "stream.place/streamplace/pkg/notifications" 39 "stream.place/streamplace/pkg/oproxy" 40 "stream.place/streamplace/pkg/spmetrics" 41 "stream.place/streamplace/pkg/spxrpc" 42 "stream.place/streamplace/pkg/streamplace" 43) 44 45type StreamplaceAPI struct { 46 CLI *config.CLI 47 Model model.Model 48 Updater *Updater 49 Signer *eip712.EIP712Signer 50 Mimes map[string]string 51 FirebaseNotifier notifications.FirebaseNotifier 52 MediaManager *media.MediaManager 53 MediaSigner media.MediaSigner 54 // not thread-safe yet 55 Aliases map[string]string 56 Bus *bus.Bus 57 ATSync *atproto.ATProtoSynchronizer 58 Director *director.Director 59 60 connTracker *WebsocketTracker 61 62 limiters map[string]*rate.Limiter 63 limitersMu sync.Mutex 64 SignerCache map[string]media.MediaSigner 65 SignerCacheMu sync.Mutex 66 op *oproxy.OProxy 67} 68 69type WebsocketTracker struct { 70 connections map[string]int 71 maxConnsPerIP int 72 mu sync.RWMutex 73} 74 75func MakeStreamplaceAPI(cli *config.CLI, mod model.Model, signer *eip712.EIP712Signer, noter notifications.FirebaseNotifier, mm *media.MediaManager, ms media.MediaSigner, bus *bus.Bus, atsync *atproto.ATProtoSynchronizer, d *director.Director, op *oproxy.OProxy) (*StreamplaceAPI, error) { 76 updater, err := PrepareUpdater(cli) 77 if err != nil { 78 return nil, err 79 } 80 a := &StreamplaceAPI{CLI: cli, 81 Model: mod, 82 Updater: updater, 83 Signer: signer, 84 FirebaseNotifier: noter, 85 MediaManager: mm, 86 MediaSigner: ms, 87 Aliases: map[string]string{}, 88 Bus: bus, 89 ATSync: atsync, 90 Director: d, 91 connTracker: NewWebsocketTracker(cli.RateLimitWebsocket), 92 limiters: make(map[string]*rate.Limiter), 93 SignerCache: make(map[string]media.MediaSigner), 94 op: op, 95 } 96 a.Mimes, err = updater.GetMimes() 97 if err != nil { 98 return nil, err 99 } 100 return a, nil 101} 102 103type AppHostingFS struct { 104 http.FileSystem 105} 106 107var ErrorIndex = errors.New("not found, use index.html") 108 109func (fs AppHostingFS) Open(name string) (http.File, error) { 110 file, err1 := fs.FileSystem.Open(name) 111 if err1 == nil { 112 return file, nil 113 } 114 if !errors.Is(err1, os.ErrNotExist) { 115 return nil, err1 116 } 117 118 return nil, ErrorIndex 119} 120 121// api/playback/iame.li/webrtc?rendition=source 122// api/playback/iame.li/stream.mp4?rendition=source 123// api/playback/iame.li/stream.webm?rendition=source 124// api/playback/iame.li/hls/index.m3u8 125// api/playback/iame.li/hls/source/stream.m3u8 126// api/playback/iame.li/hls/source/000000000000.ts 127 128func (a *StreamplaceAPI) Handler(ctx context.Context) (http.Handler, error) { 129 var xrpc http.Handler 130 xrpc, err := spxrpc.NewServer(a.CLI, a.Model) 131 if err != nil { 132 return nil, err 133 } 134 135 xrpc = a.op.OAuthMiddleware(xrpc) 136 router := httprouter.New() 137 router.Handler("GET", "/oauth/*anything", a.op.Handler()) 138 router.Handler("POST", "/oauth/*anything", a.op.Handler()) 139 router.Handler("GET", "/.well-known/oauth-authorization-server", a.op.Handler()) 140 router.Handler("GET", "/.well-known/oauth-protected-resource", a.op.Handler()) 141 apiRouter := httprouter.New() 142 apiRouter.HandlerFunc("POST", "/api/notification", a.HandleNotification(ctx)) 143 // old clients 144 router.HandlerFunc("GET", "/app-updates", a.HandleAppUpdates(ctx)) 145 146 // new ones 147 apiRouter.HandlerFunc("GET", "/api/manifest", a.HandleAppUpdates(ctx)) 148 apiRouter.GET("/api/desktop-updates/:platform/:architecture/:version/:buildTime/:file", a.HandleDesktopUpdates(ctx)) 149 apiRouter.POST("/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s")) 150 apiRouter.OPTIONS("/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s")) 151 apiRouter.DELETE("/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s")) 152 apiRouter.Handler("POST", "/api/segment", a.HandleSegment(ctx)) 153 apiRouter.HandlerFunc("GET", "/api/healthz", a.HandleHealthz(ctx)) 154 apiRouter.GET("/api/playback/:user/hls/*file", a.HandleHLSPlayback(ctx)) 155 apiRouter.GET("/api/playback/:user/stream.mp4", a.HandleMP4Playback(ctx)) 156 apiRouter.GET("/api/playback/:user/stream.webm", a.HandleMKVPlayback(ctx)) 157 // they're, uh, not jpegs. but we used this once and i don't wanna break backwards compatibility 158 apiRouter.GET("/api/playback/:user/stream.jpg", a.HandleThumbnailPlayback(ctx)) 159 // this one is not a lie 160 apiRouter.GET("/api/playback/:user/stream.png", a.HandleThumbnailPlayback(ctx)) 161 apiRouter.GET("/api/app-return/*anything", a.HandleAppReturn(ctx)) 162 apiRouter.POST("/api/playback/:user/webrtc", a.HandleWebRTCPlayback(ctx)) 163 apiRouter.POST("/api/ingest/webrtc", a.HandleWebRTCIngest(ctx)) 164 apiRouter.POST("/api/ingest/webrtc/:key", a.HandleWebRTCIngest(ctx)) 165 apiRouter.POST("/api/player-event", a.HandlePlayerEvent(ctx)) 166 apiRouter.GET("/api/chat/:repoDID", a.HandleChat(ctx)) 167 apiRouter.GET("/api/websocket/:repoDID", a.HandleWebsocket(ctx)) 168 apiRouter.GET("/api/livestream/:repoDID", a.HandleLivestream(ctx)) 169 apiRouter.GET("/api/segment/recent", a.HandleRecentSegments(ctx)) 170 apiRouter.GET("/api/segment/recent/:repoDID", a.HandleUserRecentSegments(ctx)) 171 apiRouter.GET("/api/bluesky/resolve/:handle", a.HandleBlueskyResolve(ctx)) 172 apiRouter.GET("/api/live-users", a.HandleLiveUsers(ctx)) 173 apiRouter.GET("/api/view-count/:user", a.HandleViewCount(ctx)) 174 apiRouter.NotFound = a.HandleAPI404(ctx) 175 apiRouterHandler := a.RateLimitMiddleware(ctx)(apiRouter) 176 xrpcHandler := a.RateLimitMiddleware(ctx)(xrpc) 177 router.Handler("GET", "/api/*resource", apiRouterHandler) 178 router.Handler("POST", "/api/*resource", apiRouterHandler) 179 router.Handler("PUT", "/api/*resource", apiRouterHandler) 180 router.Handler("PATCH", "/api/*resource", apiRouterHandler) 181 router.Handler("DELETE", "/api/*resource", apiRouterHandler) 182 router.Handler("GET", "/xrpc/*resource", xrpcHandler) 183 router.Handler("POST", "/xrpc/*resource", xrpcHandler) 184 router.Handler("PUT", "/xrpc/*resource", xrpcHandler) 185 router.Handler("PATCH", "/xrpc/*resource", xrpcHandler) 186 router.Handler("DELETE", "/xrpc/*resource", xrpcHandler) 187 router.GET("/.well-known/did.json", a.HandleDidJson(ctx)) 188 router.GET("/dl/*params", a.HandleAppDownload(ctx)) 189 router.POST("/", a.HandleWebRTCIngest(ctx)) 190 for _, redirect := range a.CLI.Redirects { 191 parts := strings.Split(redirect, ":") 192 if len(parts) != 2 { 193 log.Error(ctx, "invalid redirect", "redirect", redirect) 194 return nil, fmt.Errorf("invalid redirect: %s", redirect) 195 } 196 router.Handle("GET", parts[0], func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) { 197 http.Redirect(w, r, parts[1], http.StatusTemporaryRedirect) 198 }) 199 } 200 if a.CLI.FrontendProxy != "" { 201 u, err := url.Parse(a.CLI.FrontendProxy) 202 if err != nil { 203 return nil, err 204 } 205 log.Warn(ctx, "using frontend proxy instead of bundled frontend", "destination", a.CLI.FrontendProxy) 206 router.NotFound = &httputil.ReverseProxy{ 207 Rewrite: func(r *httputil.ProxyRequest) { 208 // workaround for Expo disliking serving requests from 127.0.0.1 instead of localhost 209 // we need to use 127.0.0.1 because the atproto oauth client requires it 210 r.Out.Header.Set("Origin", u.String()) 211 r.SetURL(u) 212 }, 213 } 214 } else { 215 files, err := app.Files() 216 if err != nil { 217 return nil, err 218 } 219 index, err := files.Open("index.html") 220 if err != nil { 221 return nil, err 222 } 223 bs, err := io.ReadAll(index) 224 if err != nil { 225 return nil, err 226 } 227 linker, err := linking.NewLinker(ctx, bs) 228 if err != nil { 229 return nil, err 230 } 231 linkingHandler, err := a.NotFoundLinkingHandler(ctx, linker) 232 if err != nil { 233 return nil, err 234 } 235 router.NotFound = linkingHandler 236 } 237 // needed because the WebRTC handler issues 405s from / otherwise 238 router.GET("/", func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) { 239 router.NotFound.ServeHTTP(w, r) 240 }) 241 handler := sloghttp.Recovery(router) 242 handler = cors.AllowAll().Handler(handler) 243 handler = sloghttp.New(slog.Default())(handler) 244 handler = a.RateLimitMiddleware(ctx)(handler) 245 246 return handler, nil 247} 248 249func copyHeader(dst, src http.Header) { 250 for k, vv := range src { 251 // we'll handle CORS ourselves, thanks 252 if strings.HasPrefix(k, "Access-Control") { 253 continue 254 } 255 for _, v := range vv { 256 dst.Add(k, v) 257 } 258 } 259} 260 261// handler that takes care of static files and otherwise returns the index.html with the correct link card data 262func (a *StreamplaceAPI) NotFoundLinkingHandler(ctx context.Context, linker *linking.Linker) (http.HandlerFunc, error) { 263 files, err := app.Files() 264 if err != nil { 265 return nil, err 266 } 267 fs := AppHostingFS{http.FS(files)} 268 269 fileHandler := a.FileHandler(ctx, http.FileServer(fs)) 270 defaultHandler := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { 271 f := strings.TrimPrefix(req.URL.Path, "/") 272 // under docs we need the index.html suffix due to astro rendering 273 if strings.HasPrefix(req.URL.Path, "/docs") && strings.HasSuffix(req.URL.Path, "/") { 274 f += "index.html" 275 } 276 _, err := fs.Open(f) 277 if err == nil { 278 fileHandler.ServeHTTP(w, req) 279 return 280 } 281 if errors.Is(err, ErrorIndex) || f == "" { 282 bs, err := linker.GenerateDefaultCard(ctx, req.URL) 283 if err != nil { 284 log.Error(ctx, "error generating default card", "error", err) 285 } 286 w.Header().Set("Content-Type", "text/html") 287 w.Write(bs) 288 } else { 289 log.Warn(ctx, "error opening file", "error", err) 290 apierrors.WriteHTTPInternalServerError(w, "file not found", err) 291 } 292 }) 293 return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { 294 proto := "http" 295 if req.TLS != nil { 296 proto = "https" 297 } 298 fwProto := req.Header.Get("x-forwarded-proto") 299 if fwProto != "" { 300 proto = fwProto 301 } 302 req.URL.Host = req.Host 303 req.URL.Scheme = proto 304 maybeHandle := strings.TrimPrefix(req.URL.Path, "/") 305 repo, err := a.Model.GetRepoByHandleOrDID(maybeHandle) 306 if err != nil || repo == nil { 307 log.Error(ctx, "no repo found", "maybeHandle", maybeHandle) 308 defaultHandler.ServeHTTP(w, req) 309 return 310 } 311 ls, err := a.Model.GetLatestLivestreamForRepo(repo.DID) 312 if err != nil || ls == nil { 313 log.Error(ctx, "no livestream found", "repoDID", repo.DID) 314 defaultHandler.ServeHTTP(w, req) 315 return 316 } 317 lsv, err := ls.ToLivestreamView() 318 if err != nil || lsv == nil { 319 log.Error(ctx, "no livestream view found", "repoDID", repo.DID) 320 defaultHandler.ServeHTTP(w, req) 321 return 322 } 323 bs, err := linker.GenerateStreamerCard(ctx, req.URL, lsv) 324 if err != nil { 325 log.Error(ctx, "error generating html", "error", err) 326 defaultHandler.ServeHTTP(w, req) 327 return 328 } 329 w.Header().Set("Content-Type", "text/html") 330 w.Write(bs) 331 }), nil 332} 333 334func (a *StreamplaceAPI) MistProxyHandler(ctx context.Context, tmpl string) httprouter.Handle { 335 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 336 if !a.CLI.HasMist() { 337 apierrors.WriteHTTPNotImplemented(w, "Playback only on the Linux version for now", nil) 338 return 339 } 340 stream := params.ByName("stream") 341 if stream == "" { 342 apierrors.WriteHTTPBadRequest(w, "missing stream in request", nil) 343 return 344 } 345 346 fullstream := fmt.Sprintf("%s+%s", mistconfig.STREAM_NAME, stream) 347 prefix := fmt.Sprintf(tmpl, fullstream) 348 resource := params.ByName("resource") 349 350 // path := strings.TrimPrefix(req.URL.EscapedPath(), "/api") 351 352 client := &http.Client{} 353 req.URL = &url.URL{ 354 Scheme: "http", 355 Host: fmt.Sprintf("127.0.0.1:%d", a.CLI.MistHTTPPort), 356 Path: fmt.Sprintf("%s%s", prefix, resource), 357 RawQuery: req.URL.RawQuery, 358 } 359 360 //http: Request.RequestURI can't be set in client requests. 361 //http://golang.org/src/pkg/net/http/client.go 362 req.RequestURI = "" 363 364 resp, err := client.Do(req) 365 if err != nil { 366 apierrors.WriteHTTPInternalServerError(w, "error connecting to mist", err) 367 return 368 } 369 defer resp.Body.Close() 370 371 copyHeader(w.Header(), resp.Header) 372 w.WriteHeader(resp.StatusCode) 373 io.Copy(w, resp.Body) 374 } 375} 376 377func (a *StreamplaceAPI) FileHandler(ctx context.Context, fs http.Handler) http.HandlerFunc { 378 return func(w http.ResponseWriter, req *http.Request) { 379 noslash := req.URL.Path[1:] 380 ct, ok := a.Mimes[noslash] 381 if ok { 382 w.Header().Set("content-type", ct) 383 } 384 fs.ServeHTTP(w, req) 385 } 386} 387 388func (a *StreamplaceAPI) RedirectHandler(ctx context.Context) (http.Handler, error) { 389 _, tlsPort, err := net.SplitHostPort(a.CLI.HttpsAddr) 390 if err != nil { 391 return nil, err 392 } 393 handleRedirect := func(w http.ResponseWriter, req *http.Request) { 394 host, _, err := net.SplitHostPort(req.Host) 395 if err != nil { 396 host = req.Host 397 } 398 u := req.URL 399 if tlsPort == "443" { 400 u.Host = host 401 } else { 402 u.Host = net.JoinHostPort(host, tlsPort) 403 } 404 u.Scheme = "https" 405 http.Redirect(w, req, u.String(), http.StatusTemporaryRedirect) 406 } 407 mux := http.NewServeMux() 408 mux.HandleFunc("/", handleRedirect) 409 return mux, nil 410} 411 412type NotificationPayload struct { 413 Token string `json:"token"` 414 RepoDID string `json:"repoDID"` 415} 416 417func (a *StreamplaceAPI) HandleAPI404(ctx context.Context) http.HandlerFunc { 418 return func(w http.ResponseWriter, req *http.Request) { 419 w.WriteHeader(404) 420 } 421} 422 423func (a *StreamplaceAPI) HandleNotification(ctx context.Context) http.HandlerFunc { 424 return func(w http.ResponseWriter, req *http.Request) { 425 payload, err := io.ReadAll(req.Body) 426 if err != nil { 427 log.Log(ctx, "error reading notification create", "error", err) 428 w.WriteHeader(400) 429 return 430 } 431 n := NotificationPayload{} 432 err = json.Unmarshal(payload, &n) 433 if err != nil { 434 log.Log(ctx, "error unmarshalling notification create", "error", err) 435 w.WriteHeader(400) 436 return 437 } 438 err = a.Model.CreateNotification(n.Token, n.RepoDID) 439 if err != nil { 440 log.Log(ctx, "error creating notification", "error", err) 441 w.WriteHeader(400) 442 return 443 } 444 log.Log(ctx, "successfully created notification", "token", n.Token) 445 w.WriteHeader(200) 446 if n.RepoDID != "" { 447 go func() { 448 _, err := a.ATSync.SyncBlueskyRepo(ctx, n.RepoDID, a.Model) 449 if err != nil { 450 log.Error(ctx, "error syncing bluesky repo after notification creation", "error", err) 451 } 452 }() 453 } 454 } 455} 456 457func (a *StreamplaceAPI) HandleSegment(ctx context.Context) http.HandlerFunc { 458 return func(w http.ResponseWriter, req *http.Request) { 459 err := a.MediaManager.ValidateMP4(ctx, req.Body) 460 if err != nil { 461 apierrors.WriteHTTPBadRequest(w, "could not ingest segment", err) 462 return 463 } 464 w.WriteHeader(200) 465 } 466} 467 468func (a *StreamplaceAPI) HandlePlayerEvent(ctx context.Context) httprouter.Handle { 469 return func(w http.ResponseWriter, req *http.Request, p httprouter.Params) { 470 var event model.PlayerEventAPI 471 if err := json.NewDecoder(req.Body).Decode(&event); err != nil { 472 apierrors.WriteHTTPBadRequest(w, "could not decode JSON body", err) 473 return 474 } 475 err := a.Model.CreatePlayerEvent(event) 476 if err != nil { 477 apierrors.WriteHTTPBadRequest(w, "could not create player event", err) 478 return 479 } 480 w.WriteHeader(201) 481 } 482} 483 484func (a *StreamplaceAPI) HandleRecentSegments(ctx context.Context) httprouter.Handle { 485 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 486 segs, err := a.Model.MostRecentSegments() 487 if err != nil { 488 apierrors.WriteHTTPInternalServerError(w, "could not get segments", err) 489 return 490 } 491 bs, err := json.Marshal(segs) 492 if err != nil { 493 apierrors.WriteHTTPInternalServerError(w, "could not marshal segments", err) 494 return 495 } 496 w.Header().Add("Content-Type", "application/json") 497 w.Write(bs) 498 } 499} 500 501func (a *StreamplaceAPI) HandleUserRecentSegments(ctx context.Context) httprouter.Handle { 502 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 503 user := params.ByName("repoDID") 504 if user == "" { 505 apierrors.WriteHTTPBadRequest(w, "user required", nil) 506 return 507 } 508 user, err := a.NormalizeUser(ctx, user) 509 if err != nil { 510 apierrors.WriteHTTPNotFound(w, "user not found", err) 511 return 512 } 513 seg, err := a.Model.LatestSegmentForUser(user) 514 if err != nil { 515 apierrors.WriteHTTPInternalServerError(w, "could not get segments", err) 516 return 517 } 518 streamplaceSeg, err := seg.ToStreamplaceSegment() 519 if err != nil { 520 apierrors.WriteHTTPInternalServerError(w, "could not convert segment to streamplace segment", err) 521 return 522 } 523 bs, err := json.Marshal(streamplaceSeg) 524 if err != nil { 525 apierrors.WriteHTTPInternalServerError(w, "could not marshal segments", err) 526 return 527 } 528 w.Header().Add("Content-Type", "application/json") 529 w.Write(bs) 530 } 531} 532 533type LiveUsersResponse struct { 534 model.Segment 535 Viewers int `json:"viewers"` 536} 537 538func (a *StreamplaceAPI) HandleLiveUsers(ctx context.Context) httprouter.Handle { 539 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 540 repos, err := a.Model.MostRecentSegments() 541 if err != nil { 542 apierrors.WriteHTTPInternalServerError(w, "could not get live users", err) 543 return 544 } 545 liveUsers := []LiveUsersResponse{} 546 for _, repo := range repos { 547 viewers := spmetrics.GetViewCount(repo.RepoDID) 548 liveUsers = append(liveUsers, LiveUsersResponse{ 549 Segment: repo, 550 Viewers: viewers, 551 }) 552 } 553 bs, err := json.Marshal(liveUsers) 554 if err != nil { 555 apierrors.WriteHTTPInternalServerError(w, "could not marshal live users", err) 556 return 557 } 558 w.Write(bs) 559 } 560} 561 562func (a *StreamplaceAPI) HandleViewCount(ctx context.Context) httprouter.Handle { 563 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 564 user := params.ByName("user") 565 if user == "" { 566 apierrors.WriteHTTPBadRequest(w, "user required", nil) 567 return 568 } 569 user, err := a.NormalizeUser(ctx, user) 570 if err != nil { 571 apierrors.WriteHTTPNotFound(w, "user not found", err) 572 return 573 } 574 count := spmetrics.GetViewCount(user) 575 bs, err := json.Marshal(streamplace.Livestream_ViewerCount{Count: int64(count), LexiconTypeID: "place.stream.livestream#viewerCount"}) 576 if err != nil { 577 apierrors.WriteHTTPInternalServerError(w, "could not marshal view count", err) 578 return 579 } 580 w.Write(bs) 581 } 582} 583 584func (a *StreamplaceAPI) HandleBlueskyResolve(ctx context.Context) httprouter.Handle { 585 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 586 log.Log(ctx, "got bluesky notification", "params", params) 587 key, err := a.ATSync.SyncBlueskyRepo(ctx, params.ByName("handle"), a.Model) 588 if err != nil { 589 apierrors.WriteHTTPInternalServerError(w, "could not resolve streamplace key", err) 590 return 591 } 592 signingKeys, err := a.Model.GetSigningKeysForRepo(key.DID) 593 if err != nil { 594 apierrors.WriteHTTPInternalServerError(w, "could not get signing keys", err) 595 return 596 } 597 bs, err := json.Marshal(signingKeys) 598 if err != nil { 599 apierrors.WriteHTTPInternalServerError(w, "could not marshal signing keys", err) 600 return 601 } 602 w.Write(bs) 603 } 604} 605 606type ChatResponse struct { 607 Post *bsky.FeedPost `json:"post"` 608 Repo *model.Repo `json:"repo"` 609 CID string `json:"cid"` 610} 611 612func (a *StreamplaceAPI) HandleChat(ctx context.Context) httprouter.Handle { 613 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 614 user := params.ByName("repoDID") 615 if user == "" { 616 apierrors.WriteHTTPBadRequest(w, "user required", nil) 617 return 618 } 619 repoDID, err := a.NormalizeUser(ctx, user) 620 if err != nil { 621 apierrors.WriteHTTPNotFound(w, "user not found", err) 622 return 623 } 624 replies, err := a.Model.GetReplies(repoDID) 625 if err != nil { 626 apierrors.WriteHTTPInternalServerError(w, "could not get replies", err) 627 return 628 } 629 bs, err := json.Marshal(replies) 630 if err != nil { 631 apierrors.WriteHTTPInternalServerError(w, "could not marshal replies", err) 632 return 633 } 634 w.Header().Set("Content-Type", "application/json") 635 w.Write(bs) 636 } 637} 638 639func (a *StreamplaceAPI) HandleLivestream(ctx context.Context) httprouter.Handle { 640 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 641 user := params.ByName("repoDID") 642 if user == "" { 643 apierrors.WriteHTTPBadRequest(w, "user required", nil) 644 return 645 } 646 repoDID, err := a.NormalizeUser(ctx, user) 647 if err != nil { 648 apierrors.WriteHTTPNotFound(w, "user not found", err) 649 return 650 } 651 livestream, err := a.Model.GetLatestLivestreamForRepo(repoDID) 652 if err != nil { 653 apierrors.WriteHTTPInternalServerError(w, "could not get livestream", err) 654 return 655 } 656 657 doc, err := livestream.ToLivestreamView() 658 if err != nil { 659 apierrors.WriteHTTPInternalServerError(w, "could not marshal livestream", err) 660 return 661 } 662 663 if livestream == nil { 664 apierrors.WriteHTTPNotFound(w, "no livestream found", nil) 665 return 666 } 667 668 bs, err := json.Marshal(doc) 669 if err != nil { 670 apierrors.WriteHTTPInternalServerError(w, "could not marshal livestream", err) 671 return 672 } 673 w.Header().Set("Content-Type", "application/json") 674 w.Write(bs) 675 } 676} 677 678func (a *StreamplaceAPI) RateLimitMiddleware(ctx context.Context) func(http.Handler) http.Handler { 679 return func(next http.Handler) http.Handler { 680 return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { 681 ip, _, err := net.SplitHostPort(req.RemoteAddr) 682 if err != nil { 683 ip = req.RemoteAddr 684 } 685 686 if a.CLI.RateLimitPerSecond > 0 { 687 limiter := a.getLimiter(ip) 688 689 if !limiter.Allow() { 690 log.Warn(ctx, "rate limit exceeded", "ip", ip, "path", req.URL.Path) 691 apierrors.WriteHTTPTooManyRequests(w, "rate limit exceeded") 692 return 693 } 694 } 695 696 next.ServeHTTP(w, req) 697 }) 698 } 699} 700 701func (a *StreamplaceAPI) ServeHTTP(ctx context.Context) error { 702 handler, err := a.Handler(ctx) 703 if err != nil { 704 return err 705 } 706 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error { 707 s.Addr = a.CLI.HttpAddr 708 log.Log(ctx, "http server starting", "addr", s.Addr) 709 return s.ListenAndServe() 710 }) 711} 712 713func (a *StreamplaceAPI) ServeHTTPRedirect(ctx context.Context) error { 714 handler, err := a.RedirectHandler(ctx) 715 if err != nil { 716 return err 717 } 718 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error { 719 s.Addr = a.CLI.HttpAddr 720 log.Log(ctx, "http tls redirecct server starting", "addr", s.Addr) 721 return s.ListenAndServe() 722 }) 723} 724 725func (a *StreamplaceAPI) ServeHTTPS(ctx context.Context) error { 726 handler, err := a.Handler(ctx) 727 if err != nil { 728 return err 729 } 730 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error { 731 s.Addr = a.CLI.HttpsAddr 732 log.Log(ctx, "https server starting", 733 "addr", s.Addr, 734 "certPath", a.CLI.TLSCertPath, 735 "keyPath", a.CLI.TLSKeyPath, 736 ) 737 return s.ListenAndServeTLS(a.CLI.TLSCertPath, a.CLI.TLSKeyPath) 738 }) 739} 740 741func (a *StreamplaceAPI) ServerWithShutdown(ctx context.Context, handler http.Handler, serve func(*http.Server) error) error { 742 ctx, cancel := context.WithCancel(ctx) 743 handler = gziphandler.GzipHandler(handler) 744 server := http.Server{Handler: handler} 745 var serveErr error 746 go func() { 747 serveErr = serve(&server) 748 cancel() 749 }() 750 <-ctx.Done() 751 if serveErr != nil { 752 return fmt.Errorf("error in http server: %w", serveErr) 753 } 754 755 ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second) 756 defer cancel() 757 return server.Shutdown(ctx) 758} 759 760func (a *StreamplaceAPI) HandleHealthz(ctx context.Context) http.HandlerFunc { 761 return func(w http.ResponseWriter, req *http.Request) { 762 w.WriteHeader(200) 763 } 764} 765 766func (a *StreamplaceAPI) getLimiter(ip string) *rate.Limiter { 767 a.limitersMu.Lock() 768 defer a.limitersMu.Unlock() 769 770 limiter, exists := a.limiters[ip] 771 if !exists { 772 limiter = rate.NewLimiter(rate.Limit(a.CLI.RateLimitPerSecond), a.CLI.RateLimitBurst) 773 a.limiters[ip] = limiter 774 } 775 776 return limiter 777} 778 779func NewWebsocketTracker(maxConns int) *WebsocketTracker { 780 return &WebsocketTracker{ 781 connections: make(map[string]int), 782 maxConnsPerIP: maxConns, 783 } 784} 785 786func (t *WebsocketTracker) AddConnection(ip string) bool { 787 t.mu.Lock() 788 defer t.mu.Unlock() 789 790 count := t.connections[ip] 791 792 if count >= t.maxConnsPerIP { 793 return false 794 } 795 796 t.connections[ip] = count + 1 797 return true 798} 799 800func (t *WebsocketTracker) RemoveConnection(ip string) { 801 t.mu.Lock() 802 defer t.mu.Unlock() 803 804 count := t.connections[ip] 805 if count > 0 { 806 t.connections[ip] = count - 1 807 } 808 809 if t.connections[ip] == 0 { 810 delete(t.connections, ip) 811 } 812}