Live video on the AT Protocol
at eli/node-22 816 lines 25 kB view raw
1package api 2 3import ( 4 "context" 5 "encoding/json" 6 "errors" 7 "fmt" 8 "io" 9 "log/slog" 10 "net" 11 "net/http" 12 "net/http/httputil" 13 "net/url" 14 "os" 15 "strings" 16 "sync" 17 "time" 18 19 "github.com/NYTimes/gziphandler" 20 "github.com/bluesky-social/indigo/api/bsky" 21 "github.com/julienschmidt/httprouter" 22 "github.com/rs/cors" 23 sloghttp "github.com/samber/slog-http" 24 "golang.org/x/time/rate" 25 26 "stream.place/streamplace/js/app" 27 "stream.place/streamplace/pkg/atproto" 28 "stream.place/streamplace/pkg/bus" 29 "stream.place/streamplace/pkg/config" 30 "stream.place/streamplace/pkg/crypto/signers/eip712" 31 "stream.place/streamplace/pkg/director" 32 apierrors "stream.place/streamplace/pkg/errors" 33 "stream.place/streamplace/pkg/linking" 34 "stream.place/streamplace/pkg/log" 35 "stream.place/streamplace/pkg/media" 36 "stream.place/streamplace/pkg/mist/mistconfig" 37 "stream.place/streamplace/pkg/model" 38 "stream.place/streamplace/pkg/notifications" 39 "stream.place/streamplace/pkg/oproxy" 40 "stream.place/streamplace/pkg/spmetrics" 41 "stream.place/streamplace/pkg/spxrpc" 42 "stream.place/streamplace/pkg/streamplace" 43) 44 45type StreamplaceAPI struct { 46 CLI *config.CLI 47 Model model.Model 48 Updater *Updater 49 Signer *eip712.EIP712Signer 50 Mimes map[string]string 51 FirebaseNotifier notifications.FirebaseNotifier 52 MediaManager *media.MediaManager 53 MediaSigner media.MediaSigner 54 // not thread-safe yet 55 Aliases map[string]string 56 Bus *bus.Bus 57 ATSync *atproto.ATProtoSynchronizer 58 Director *director.Director 59 60 connTracker *WebsocketTracker 61 62 limiters map[string]*rate.Limiter 63 limitersMu sync.Mutex 64} 65 66type WebsocketTracker struct { 67 connections map[string]int 68 maxConnsPerIP int 69 mu sync.RWMutex 70} 71 72func MakeStreamplaceAPI(cli *config.CLI, mod model.Model, signer *eip712.EIP712Signer, noter notifications.FirebaseNotifier, mm *media.MediaManager, ms media.MediaSigner, bus *bus.Bus, atsync *atproto.ATProtoSynchronizer, d *director.Director) (*StreamplaceAPI, error) { 73 updater, err := PrepareUpdater(cli) 74 if err != nil { 75 return nil, err 76 } 77 a := &StreamplaceAPI{CLI: cli, 78 Model: mod, 79 Updater: updater, 80 Signer: signer, 81 FirebaseNotifier: noter, 82 MediaManager: mm, 83 MediaSigner: ms, 84 Aliases: map[string]string{}, 85 Bus: bus, 86 ATSync: atsync, 87 Director: d, 88 connTracker: NewWebsocketTracker(cli.RateLimitWebsocket), 89 limiters: make(map[string]*rate.Limiter), 90 } 91 a.Mimes, err = updater.GetMimes() 92 if err != nil { 93 return nil, err 94 } 95 return a, nil 96} 97 98type AppHostingFS struct { 99 http.FileSystem 100} 101 102var ErrorIndex = errors.New("not found, use index.html") 103 104func (fs AppHostingFS) Open(name string) (http.File, error) { 105 file, err1 := fs.FileSystem.Open(name) 106 if err1 == nil { 107 return file, nil 108 } 109 if !errors.Is(err1, os.ErrNotExist) { 110 return nil, err1 111 } 112 113 return nil, ErrorIndex 114} 115 116// api/playback/iame.li/webrtc?rendition=source 117// api/playback/iame.li/stream.mp4?rendition=source 118// api/playback/iame.li/stream.webm?rendition=source 119// api/playback/iame.li/hls/index.m3u8 120// api/playback/iame.li/hls/source/stream.m3u8 121// api/playback/iame.li/hls/source/000000000000.ts 122 123func (a *StreamplaceAPI) Handler(ctx context.Context) (http.Handler, error) { 124 var xrpc http.Handler 125 xrpc, err := spxrpc.NewServer(a.CLI, a.Model) 126 if err != nil { 127 return nil, err 128 } 129 op := oproxy.New(&oproxy.Config{ 130 Host: a.CLI.PublicHost, 131 CreateOAuthSession: a.Model.CreateOAuthSession, 132 UpdateOAuthSession: a.Model.UpdateOAuthSession, 133 LoadOAuthSession: a.Model.LoadOAuthSession, 134 Scope: "atproto transition:generic", 135 UpstreamJWK: a.CLI.JWK, 136 DownstreamJWK: a.CLI.AccessJWK, 137 }) 138 139 xrpc = op.OAuthMiddleware(xrpc) 140 router := httprouter.New() 141 router.Handler("GET", "/oauth/*anything", op.Handler()) 142 router.Handler("POST", "/oauth/*anything", op.Handler()) 143 router.Handler("GET", "/.well-known/oauth-authorization-server", op.Handler()) 144 router.Handler("GET", "/.well-known/oauth-protected-resource", op.Handler()) 145 apiRouter := httprouter.New() 146 apiRouter.HandlerFunc("POST", "/api/notification", a.HandleNotification(ctx)) 147 // old clients 148 router.HandlerFunc("GET", "/app-updates", a.HandleAppUpdates(ctx)) 149 150 // new ones 151 apiRouter.HandlerFunc("GET", "/api/manifest", a.HandleAppUpdates(ctx)) 152 apiRouter.GET("/api/desktop-updates/:platform/:architecture/:version/:buildTime/:file", a.HandleDesktopUpdates(ctx)) 153 apiRouter.POST("/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s")) 154 apiRouter.OPTIONS("/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s")) 155 apiRouter.DELETE("/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s")) 156 apiRouter.Handler("POST", "/api/segment", a.HandleSegment(ctx)) 157 apiRouter.HandlerFunc("GET", "/api/healthz", a.HandleHealthz(ctx)) 158 apiRouter.GET("/api/playback/:user/hls/*file", a.HandleHLSPlayback(ctx)) 159 apiRouter.GET("/api/playback/:user/stream.mp4", a.HandleMP4Playback(ctx)) 160 apiRouter.GET("/api/playback/:user/stream.webm", a.HandleMKVPlayback(ctx)) 161 // they're, uh, not jpegs. but we used this once and i don't wanna break backwards compatibility 162 apiRouter.GET("/api/playback/:user/stream.jpg", a.HandleThumbnailPlayback(ctx)) 163 // this one is not a lie 164 apiRouter.GET("/api/playback/:user/stream.png", a.HandleThumbnailPlayback(ctx)) 165 apiRouter.GET("/api/app-return/*anything", a.HandleAppReturn(ctx)) 166 apiRouter.POST("/api/playback/:user/webrtc", a.HandleWebRTCPlayback(ctx)) 167 apiRouter.POST("/api/ingest/webrtc", a.HandleWebRTCIngest(ctx)) 168 apiRouter.POST("/api/ingest/webrtc/:key", a.HandleWebRTCIngest(ctx)) 169 apiRouter.POST("/api/player-event", a.HandlePlayerEvent(ctx)) 170 apiRouter.GET("/api/chat/:repoDID", a.HandleChat(ctx)) 171 apiRouter.GET("/api/websocket/:repoDID", a.HandleWebsocket(ctx)) 172 apiRouter.GET("/api/livestream/:repoDID", a.HandleLivestream(ctx)) 173 apiRouter.GET("/api/segment/recent", a.HandleRecentSegments(ctx)) 174 apiRouter.GET("/api/segment/recent/:repoDID", a.HandleUserRecentSegments(ctx)) 175 apiRouter.GET("/api/bluesky/resolve/:handle", a.HandleBlueskyResolve(ctx)) 176 apiRouter.GET("/api/live-users", a.HandleLiveUsers(ctx)) 177 apiRouter.GET("/api/view-count/:user", a.HandleViewCount(ctx)) 178 apiRouter.NotFound = a.HandleAPI404(ctx) 179 apiRouterHandler := a.RateLimitMiddleware(ctx)(apiRouter) 180 xrpcHandler := a.RateLimitMiddleware(ctx)(xrpc) 181 router.Handler("GET", "/api/*resource", apiRouterHandler) 182 router.Handler("POST", "/api/*resource", apiRouterHandler) 183 router.Handler("PUT", "/api/*resource", apiRouterHandler) 184 router.Handler("PATCH", "/api/*resource", apiRouterHandler) 185 router.Handler("DELETE", "/api/*resource", apiRouterHandler) 186 router.Handler("GET", "/xrpc/*resource", xrpcHandler) 187 router.Handler("POST", "/xrpc/*resource", xrpcHandler) 188 router.Handler("PUT", "/xrpc/*resource", xrpcHandler) 189 router.Handler("PATCH", "/xrpc/*resource", xrpcHandler) 190 router.Handler("DELETE", "/xrpc/*resource", xrpcHandler) 191 router.GET("/.well-known/did.json", a.HandleDidJson(ctx)) 192 router.GET("/dl/*params", a.HandleAppDownload(ctx)) 193 router.POST("/", a.HandleWebRTCIngest(ctx)) 194 for _, redirect := range a.CLI.Redirects { 195 parts := strings.Split(redirect, ":") 196 if len(parts) != 2 { 197 log.Error(ctx, "invalid redirect", "redirect", redirect) 198 return nil, fmt.Errorf("invalid redirect: %s", redirect) 199 } 200 router.Handle("GET", parts[0], func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) { 201 http.Redirect(w, r, parts[1], http.StatusTemporaryRedirect) 202 }) 203 } 204 if a.CLI.FrontendProxy != "" { 205 u, err := url.Parse(a.CLI.FrontendProxy) 206 if err != nil { 207 return nil, err 208 } 209 log.Warn(ctx, "using frontend proxy instead of bundled frontend", "destination", a.CLI.FrontendProxy) 210 router.NotFound = &httputil.ReverseProxy{ 211 Rewrite: func(r *httputil.ProxyRequest) { 212 // workaround for Expo disliking serving requests from 127.0.0.1 instead of localhost 213 // we need to use 127.0.0.1 because the atproto oauth client requires it 214 r.Out.Header.Set("Origin", u.String()) 215 r.SetURL(u) 216 }, 217 } 218 } else { 219 files, err := app.Files() 220 if err != nil { 221 return nil, err 222 } 223 index, err := files.Open("index.html") 224 if err != nil { 225 return nil, err 226 } 227 bs, err := io.ReadAll(index) 228 if err != nil { 229 return nil, err 230 } 231 linker, err := linking.NewLinker(ctx, bs) 232 if err != nil { 233 return nil, err 234 } 235 linkingHandler, err := a.NotFoundLinkingHandler(ctx, linker) 236 if err != nil { 237 return nil, err 238 } 239 router.NotFound = linkingHandler 240 } 241 // needed because the WebRTC handler issues 405s from / otherwise 242 router.GET("/", func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) { 243 router.NotFound.ServeHTTP(w, r) 244 }) 245 handler := sloghttp.Recovery(router) 246 handler = cors.AllowAll().Handler(handler) 247 handler = sloghttp.New(slog.Default())(handler) 248 handler = a.RateLimitMiddleware(ctx)(handler) 249 250 return handler, nil 251} 252 253func copyHeader(dst, src http.Header) { 254 for k, vv := range src { 255 // we'll handle CORS ourselves, thanks 256 if strings.HasPrefix(k, "Access-Control") { 257 continue 258 } 259 for _, v := range vv { 260 dst.Add(k, v) 261 } 262 } 263} 264 265// handler that takes care of static files and otherwise returns the index.html with the correct link card data 266func (a *StreamplaceAPI) NotFoundLinkingHandler(ctx context.Context, linker *linking.Linker) (http.HandlerFunc, error) { 267 files, err := app.Files() 268 if err != nil { 269 return nil, err 270 } 271 fs := AppHostingFS{http.FS(files)} 272 273 fileHandler := a.FileHandler(ctx, http.FileServer(fs)) 274 defaultHandler := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { 275 f := strings.TrimPrefix(req.URL.Path, "/") 276 // under docs we need the index.html suffix due to astro rendering 277 if strings.HasPrefix(req.URL.Path, "/docs") && strings.HasSuffix(req.URL.Path, "/") { 278 f += "index.html" 279 } 280 _, err := fs.Open(f) 281 if err == nil { 282 fileHandler.ServeHTTP(w, req) 283 return 284 } 285 if errors.Is(err, ErrorIndex) || f == "" { 286 bs, err := linker.GenerateDefaultCard(ctx, req.URL) 287 if err != nil { 288 log.Error(ctx, "error generating default card", "error", err) 289 } 290 w.Header().Set("Content-Type", "text/html") 291 w.Write(bs) 292 } else { 293 log.Warn(ctx, "error opening file", "error", err) 294 apierrors.WriteHTTPInternalServerError(w, "file not found", err) 295 } 296 }) 297 return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { 298 proto := "http" 299 if req.TLS != nil { 300 proto = "https" 301 } 302 fwProto := req.Header.Get("x-forwarded-proto") 303 if fwProto != "" { 304 proto = fwProto 305 } 306 req.URL.Host = req.Host 307 req.URL.Scheme = proto 308 maybeHandle := strings.TrimPrefix(req.URL.Path, "/") 309 repo, err := a.Model.GetRepoByHandleOrDID(maybeHandle) 310 if err != nil || repo == nil { 311 log.Error(ctx, "no repo found", "maybeHandle", maybeHandle) 312 defaultHandler.ServeHTTP(w, req) 313 return 314 } 315 ls, err := a.Model.GetLatestLivestreamForRepo(repo.DID) 316 if err != nil || ls == nil { 317 log.Error(ctx, "no livestream found", "repoDID", repo.DID) 318 defaultHandler.ServeHTTP(w, req) 319 return 320 } 321 lsv, err := ls.ToLivestreamView() 322 if err != nil || lsv == nil { 323 log.Error(ctx, "no livestream view found", "repoDID", repo.DID) 324 defaultHandler.ServeHTTP(w, req) 325 return 326 } 327 bs, err := linker.GenerateStreamerCard(ctx, req.URL, lsv) 328 if err != nil { 329 log.Error(ctx, "error generating html", "error", err) 330 defaultHandler.ServeHTTP(w, req) 331 return 332 } 333 w.Header().Set("Content-Type", "text/html") 334 w.Write(bs) 335 }), nil 336} 337 338func (a *StreamplaceAPI) MistProxyHandler(ctx context.Context, tmpl string) httprouter.Handle { 339 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 340 if !a.CLI.HasMist() { 341 apierrors.WriteHTTPNotImplemented(w, "Playback only on the Linux version for now", nil) 342 return 343 } 344 stream := params.ByName("stream") 345 if stream == "" { 346 apierrors.WriteHTTPBadRequest(w, "missing stream in request", nil) 347 return 348 } 349 350 fullstream := fmt.Sprintf("%s+%s", mistconfig.STREAM_NAME, stream) 351 prefix := fmt.Sprintf(tmpl, fullstream) 352 resource := params.ByName("resource") 353 354 // path := strings.TrimPrefix(req.URL.EscapedPath(), "/api") 355 356 client := &http.Client{} 357 req.URL = &url.URL{ 358 Scheme: "http", 359 Host: fmt.Sprintf("127.0.0.1:%d", a.CLI.MistHTTPPort), 360 Path: fmt.Sprintf("%s%s", prefix, resource), 361 RawQuery: req.URL.RawQuery, 362 } 363 364 //http: Request.RequestURI can't be set in client requests. 365 //http://golang.org/src/pkg/net/http/client.go 366 req.RequestURI = "" 367 368 resp, err := client.Do(req) 369 if err != nil { 370 apierrors.WriteHTTPInternalServerError(w, "error connecting to mist", err) 371 return 372 } 373 defer resp.Body.Close() 374 375 copyHeader(w.Header(), resp.Header) 376 w.WriteHeader(resp.StatusCode) 377 io.Copy(w, resp.Body) 378 } 379} 380 381func (a *StreamplaceAPI) FileHandler(ctx context.Context, fs http.Handler) http.HandlerFunc { 382 return func(w http.ResponseWriter, req *http.Request) { 383 noslash := req.URL.Path[1:] 384 ct, ok := a.Mimes[noslash] 385 if ok { 386 w.Header().Set("content-type", ct) 387 } 388 fs.ServeHTTP(w, req) 389 } 390} 391 392func (a *StreamplaceAPI) RedirectHandler(ctx context.Context) (http.Handler, error) { 393 _, tlsPort, err := net.SplitHostPort(a.CLI.HttpsAddr) 394 if err != nil { 395 return nil, err 396 } 397 handleRedirect := func(w http.ResponseWriter, req *http.Request) { 398 host, _, err := net.SplitHostPort(req.Host) 399 if err != nil { 400 host = req.Host 401 } 402 u := req.URL 403 if tlsPort == "443" { 404 u.Host = host 405 } else { 406 u.Host = net.JoinHostPort(host, tlsPort) 407 } 408 u.Scheme = "https" 409 http.Redirect(w, req, u.String(), http.StatusTemporaryRedirect) 410 } 411 mux := http.NewServeMux() 412 mux.HandleFunc("/", handleRedirect) 413 return mux, nil 414} 415 416type NotificationPayload struct { 417 Token string `json:"token"` 418 RepoDID string `json:"repoDID"` 419} 420 421func (a *StreamplaceAPI) HandleAPI404(ctx context.Context) http.HandlerFunc { 422 return func(w http.ResponseWriter, req *http.Request) { 423 w.WriteHeader(404) 424 } 425} 426 427func (a *StreamplaceAPI) HandleNotification(ctx context.Context) http.HandlerFunc { 428 return func(w http.ResponseWriter, req *http.Request) { 429 payload, err := io.ReadAll(req.Body) 430 if err != nil { 431 log.Log(ctx, "error reading notification create", "error", err) 432 w.WriteHeader(400) 433 return 434 } 435 n := NotificationPayload{} 436 err = json.Unmarshal(payload, &n) 437 if err != nil { 438 log.Log(ctx, "error unmarshalling notification create", "error", err) 439 w.WriteHeader(400) 440 return 441 } 442 err = a.Model.CreateNotification(n.Token, n.RepoDID) 443 if err != nil { 444 log.Log(ctx, "error creating notification", "error", err) 445 w.WriteHeader(400) 446 return 447 } 448 log.Log(ctx, "successfully created notification", "token", n.Token) 449 w.WriteHeader(200) 450 if n.RepoDID != "" { 451 go func() { 452 _, err := a.ATSync.SyncBlueskyRepo(ctx, n.RepoDID, a.Model) 453 if err != nil { 454 log.Error(ctx, "error syncing bluesky repo after notification creation", "error", err) 455 } 456 }() 457 } 458 } 459} 460 461func (a *StreamplaceAPI) HandleSegment(ctx context.Context) http.HandlerFunc { 462 return func(w http.ResponseWriter, req *http.Request) { 463 err := a.MediaManager.ValidateMP4(ctx, req.Body) 464 if err != nil { 465 apierrors.WriteHTTPBadRequest(w, "could not ingest segment", err) 466 return 467 } 468 w.WriteHeader(200) 469 } 470} 471 472func (a *StreamplaceAPI) HandlePlayerEvent(ctx context.Context) httprouter.Handle { 473 return func(w http.ResponseWriter, req *http.Request, p httprouter.Params) { 474 var event model.PlayerEventAPI 475 if err := json.NewDecoder(req.Body).Decode(&event); err != nil { 476 apierrors.WriteHTTPBadRequest(w, "could not decode JSON body", err) 477 return 478 } 479 err := a.Model.CreatePlayerEvent(event) 480 if err != nil { 481 apierrors.WriteHTTPBadRequest(w, "could not create player event", err) 482 return 483 } 484 w.WriteHeader(201) 485 } 486} 487 488func (a *StreamplaceAPI) HandleRecentSegments(ctx context.Context) httprouter.Handle { 489 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 490 segs, err := a.Model.MostRecentSegments() 491 if err != nil { 492 apierrors.WriteHTTPInternalServerError(w, "could not get segments", err) 493 return 494 } 495 bs, err := json.Marshal(segs) 496 if err != nil { 497 apierrors.WriteHTTPInternalServerError(w, "could not marshal segments", err) 498 return 499 } 500 w.Header().Add("Content-Type", "application/json") 501 w.Write(bs) 502 } 503} 504 505func (a *StreamplaceAPI) HandleUserRecentSegments(ctx context.Context) httprouter.Handle { 506 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 507 user := params.ByName("repoDID") 508 if user == "" { 509 apierrors.WriteHTTPBadRequest(w, "user required", nil) 510 return 511 } 512 user, err := a.NormalizeUser(ctx, user) 513 if err != nil { 514 apierrors.WriteHTTPNotFound(w, "user not found", err) 515 return 516 } 517 seg, err := a.Model.LatestSegmentForUser(user) 518 if err != nil { 519 apierrors.WriteHTTPInternalServerError(w, "could not get segments", err) 520 return 521 } 522 streamplaceSeg, err := seg.ToStreamplaceSegment() 523 if err != nil { 524 apierrors.WriteHTTPInternalServerError(w, "could not convert segment to streamplace segment", err) 525 return 526 } 527 bs, err := json.Marshal(streamplaceSeg) 528 if err != nil { 529 apierrors.WriteHTTPInternalServerError(w, "could not marshal segments", err) 530 return 531 } 532 w.Header().Add("Content-Type", "application/json") 533 w.Write(bs) 534 } 535} 536 537type LiveUsersResponse struct { 538 model.Segment 539 Viewers int `json:"viewers"` 540} 541 542func (a *StreamplaceAPI) HandleLiveUsers(ctx context.Context) httprouter.Handle { 543 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 544 repos, err := a.Model.MostRecentSegments() 545 if err != nil { 546 apierrors.WriteHTTPInternalServerError(w, "could not get live users", err) 547 return 548 } 549 liveUsers := []LiveUsersResponse{} 550 for _, repo := range repos { 551 viewers := spmetrics.GetViewCount(repo.RepoDID) 552 liveUsers = append(liveUsers, LiveUsersResponse{ 553 Segment: repo, 554 Viewers: viewers, 555 }) 556 } 557 bs, err := json.Marshal(liveUsers) 558 if err != nil { 559 apierrors.WriteHTTPInternalServerError(w, "could not marshal live users", err) 560 return 561 } 562 w.Write(bs) 563 } 564} 565 566func (a *StreamplaceAPI) HandleViewCount(ctx context.Context) httprouter.Handle { 567 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 568 user := params.ByName("user") 569 if user == "" { 570 apierrors.WriteHTTPBadRequest(w, "user required", nil) 571 return 572 } 573 user, err := a.NormalizeUser(ctx, user) 574 if err != nil { 575 apierrors.WriteHTTPNotFound(w, "user not found", err) 576 return 577 } 578 count := spmetrics.GetViewCount(user) 579 bs, err := json.Marshal(streamplace.Livestream_ViewerCount{Count: int64(count), LexiconTypeID: "place.stream.livestream#viewerCount"}) 580 if err != nil { 581 apierrors.WriteHTTPInternalServerError(w, "could not marshal view count", err) 582 return 583 } 584 w.Write(bs) 585 } 586} 587 588func (a *StreamplaceAPI) HandleBlueskyResolve(ctx context.Context) httprouter.Handle { 589 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 590 log.Log(ctx, "got bluesky notification", "params", params) 591 key, err := a.ATSync.SyncBlueskyRepo(ctx, params.ByName("handle"), a.Model) 592 if err != nil { 593 apierrors.WriteHTTPInternalServerError(w, "could not resolve streamplace key", err) 594 return 595 } 596 signingKeys, err := a.Model.GetSigningKeysForRepo(key.DID) 597 if err != nil { 598 apierrors.WriteHTTPInternalServerError(w, "could not get signing keys", err) 599 return 600 } 601 bs, err := json.Marshal(signingKeys) 602 if err != nil { 603 apierrors.WriteHTTPInternalServerError(w, "could not marshal signing keys", err) 604 return 605 } 606 w.Write(bs) 607 } 608} 609 610type ChatResponse struct { 611 Post *bsky.FeedPost `json:"post"` 612 Repo *model.Repo `json:"repo"` 613 CID string `json:"cid"` 614} 615 616func (a *StreamplaceAPI) HandleChat(ctx context.Context) httprouter.Handle { 617 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 618 user := params.ByName("repoDID") 619 if user == "" { 620 apierrors.WriteHTTPBadRequest(w, "user required", nil) 621 return 622 } 623 repoDID, err := a.NormalizeUser(ctx, user) 624 if err != nil { 625 apierrors.WriteHTTPNotFound(w, "user not found", err) 626 return 627 } 628 replies, err := a.Model.GetReplies(repoDID) 629 if err != nil { 630 apierrors.WriteHTTPInternalServerError(w, "could not get replies", err) 631 return 632 } 633 bs, err := json.Marshal(replies) 634 if err != nil { 635 apierrors.WriteHTTPInternalServerError(w, "could not marshal replies", err) 636 return 637 } 638 w.Header().Set("Content-Type", "application/json") 639 w.Write(bs) 640 } 641} 642 643func (a *StreamplaceAPI) HandleLivestream(ctx context.Context) httprouter.Handle { 644 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 645 user := params.ByName("repoDID") 646 if user == "" { 647 apierrors.WriteHTTPBadRequest(w, "user required", nil) 648 return 649 } 650 repoDID, err := a.NormalizeUser(ctx, user) 651 if err != nil { 652 apierrors.WriteHTTPNotFound(w, "user not found", err) 653 return 654 } 655 livestream, err := a.Model.GetLatestLivestreamForRepo(repoDID) 656 if err != nil { 657 apierrors.WriteHTTPInternalServerError(w, "could not get livestream", err) 658 return 659 } 660 661 doc, err := livestream.ToLivestreamView() 662 if err != nil { 663 apierrors.WriteHTTPInternalServerError(w, "could not marshal livestream", err) 664 return 665 } 666 667 if livestream == nil { 668 apierrors.WriteHTTPNotFound(w, "no livestream found", nil) 669 return 670 } 671 672 bs, err := json.Marshal(doc) 673 if err != nil { 674 apierrors.WriteHTTPInternalServerError(w, "could not marshal livestream", err) 675 return 676 } 677 w.Header().Set("Content-Type", "application/json") 678 w.Write(bs) 679 } 680} 681 682func (a *StreamplaceAPI) RateLimitMiddleware(ctx context.Context) func(http.Handler) http.Handler { 683 return func(next http.Handler) http.Handler { 684 return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { 685 ip, _, err := net.SplitHostPort(req.RemoteAddr) 686 if err != nil { 687 ip = req.RemoteAddr 688 } 689 690 if a.CLI.RateLimitPerSecond > 0 { 691 limiter := a.getLimiter(ip) 692 693 if !limiter.Allow() { 694 log.Warn(ctx, "rate limit exceeded", "ip", ip, "path", req.URL.Path) 695 apierrors.WriteHTTPTooManyRequests(w, "rate limit exceeded") 696 return 697 } 698 } 699 700 next.ServeHTTP(w, req) 701 }) 702 } 703} 704 705func (a *StreamplaceAPI) ServeHTTP(ctx context.Context) error { 706 handler, err := a.Handler(ctx) 707 if err != nil { 708 return err 709 } 710 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error { 711 s.Addr = a.CLI.HttpAddr 712 log.Log(ctx, "http server starting", "addr", s.Addr) 713 return s.ListenAndServe() 714 }) 715} 716 717func (a *StreamplaceAPI) ServeHTTPRedirect(ctx context.Context) error { 718 handler, err := a.RedirectHandler(ctx) 719 if err != nil { 720 return err 721 } 722 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error { 723 s.Addr = a.CLI.HttpAddr 724 log.Log(ctx, "http tls redirecct server starting", "addr", s.Addr) 725 return s.ListenAndServe() 726 }) 727} 728 729func (a *StreamplaceAPI) ServeHTTPS(ctx context.Context) error { 730 handler, err := a.Handler(ctx) 731 if err != nil { 732 return err 733 } 734 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error { 735 s.Addr = a.CLI.HttpsAddr 736 log.Log(ctx, "https server starting", 737 "addr", s.Addr, 738 "certPath", a.CLI.TLSCertPath, 739 "keyPath", a.CLI.TLSKeyPath, 740 ) 741 return s.ListenAndServeTLS(a.CLI.TLSCertPath, a.CLI.TLSKeyPath) 742 }) 743} 744 745func (a *StreamplaceAPI) ServerWithShutdown(ctx context.Context, handler http.Handler, serve func(*http.Server) error) error { 746 ctx, cancel := context.WithCancel(ctx) 747 handler = gziphandler.GzipHandler(handler) 748 server := http.Server{Handler: handler} 749 var serveErr error 750 go func() { 751 serveErr = serve(&server) 752 cancel() 753 }() 754 <-ctx.Done() 755 if serveErr != nil { 756 return fmt.Errorf("error in http server: %w", serveErr) 757 } 758 759 ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second) 760 defer cancel() 761 return server.Shutdown(ctx) 762} 763 764func (a *StreamplaceAPI) HandleHealthz(ctx context.Context) http.HandlerFunc { 765 return func(w http.ResponseWriter, req *http.Request) { 766 w.WriteHeader(200) 767 } 768} 769 770func (a *StreamplaceAPI) getLimiter(ip string) *rate.Limiter { 771 a.limitersMu.Lock() 772 defer a.limitersMu.Unlock() 773 774 limiter, exists := a.limiters[ip] 775 if !exists { 776 limiter = rate.NewLimiter(rate.Limit(a.CLI.RateLimitPerSecond), a.CLI.RateLimitBurst) 777 a.limiters[ip] = limiter 778 } 779 780 return limiter 781} 782 783func NewWebsocketTracker(maxConns int) *WebsocketTracker { 784 return &WebsocketTracker{ 785 connections: make(map[string]int), 786 maxConnsPerIP: maxConns, 787 } 788} 789 790func (t *WebsocketTracker) AddConnection(ip string) bool { 791 t.mu.Lock() 792 defer t.mu.Unlock() 793 794 count := t.connections[ip] 795 796 if count >= t.maxConnsPerIP { 797 return false 798 } 799 800 t.connections[ip] = count + 1 801 return true 802} 803 804func (t *WebsocketTracker) RemoveConnection(ip string) { 805 t.mu.Lock() 806 defer t.mu.Unlock() 807 808 count := t.connections[ip] 809 if count > 0 { 810 t.connections[ip] = count - 1 811 } 812 813 if t.connections[ip] == 0 { 814 delete(t.connections, ip) 815 } 816}