Live video on the AT Protocol
at natb/fix-fullscreen 819 lines 25 kB view raw
1package api 2 3import ( 4 "context" 5 "encoding/json" 6 "errors" 7 "fmt" 8 "io" 9 "log/slog" 10 "net" 11 "net/http" 12 "net/http/httputil" 13 "net/url" 14 "os" 15 "strings" 16 "sync" 17 "time" 18 19 "github.com/NYTimes/gziphandler" 20 "github.com/bluesky-social/indigo/api/bsky" 21 "github.com/julienschmidt/httprouter" 22 "github.com/rs/cors" 23 sloghttp "github.com/samber/slog-http" 24 "golang.org/x/time/rate" 25 26 "stream.place/streamplace/js/app" 27 "stream.place/streamplace/pkg/atproto" 28 "stream.place/streamplace/pkg/bus" 29 "stream.place/streamplace/pkg/config" 30 "stream.place/streamplace/pkg/crypto/signers/eip712" 31 "stream.place/streamplace/pkg/director" 32 apierrors "stream.place/streamplace/pkg/errors" 33 "stream.place/streamplace/pkg/linking" 34 "stream.place/streamplace/pkg/log" 35 "stream.place/streamplace/pkg/media" 36 "stream.place/streamplace/pkg/mist/mistconfig" 37 "stream.place/streamplace/pkg/model" 38 "stream.place/streamplace/pkg/notifications" 39 "stream.place/streamplace/pkg/oproxy" 40 "stream.place/streamplace/pkg/spmetrics" 41 "stream.place/streamplace/pkg/spxrpc" 42 "stream.place/streamplace/pkg/streamplace" 43) 44 45type StreamplaceAPI struct { 46 CLI *config.CLI 47 Model model.Model 48 Updater *Updater 49 Signer *eip712.EIP712Signer 50 Mimes map[string]string 51 FirebaseNotifier notifications.FirebaseNotifier 52 MediaManager *media.MediaManager 53 MediaSigner media.MediaSigner 54 // not thread-safe yet 55 Aliases map[string]string 56 Bus *bus.Bus 57 ATSync *atproto.ATProtoSynchronizer 58 Director *director.Director 59 60 connTracker *WebsocketTracker 61 62 limiters map[string]*rate.Limiter 63 limitersMu sync.Mutex 64 SignerCache map[string]media.MediaSigner 65 SignerCacheMu sync.Mutex 66} 67 68type WebsocketTracker struct { 69 connections map[string]int 70 maxConnsPerIP int 71 mu sync.RWMutex 72} 73 74func MakeStreamplaceAPI(cli *config.CLI, mod model.Model, signer *eip712.EIP712Signer, noter notifications.FirebaseNotifier, mm *media.MediaManager, ms media.MediaSigner, bus *bus.Bus, atsync *atproto.ATProtoSynchronizer, d *director.Director) (*StreamplaceAPI, error) { 75 updater, err := PrepareUpdater(cli) 76 if err != nil { 77 return nil, err 78 } 79 a := &StreamplaceAPI{CLI: cli, 80 Model: mod, 81 Updater: updater, 82 Signer: signer, 83 FirebaseNotifier: noter, 84 MediaManager: mm, 85 MediaSigner: ms, 86 Aliases: map[string]string{}, 87 Bus: bus, 88 ATSync: atsync, 89 Director: d, 90 connTracker: NewWebsocketTracker(cli.RateLimitWebsocket), 91 limiters: make(map[string]*rate.Limiter), 92 SignerCache: make(map[string]media.MediaSigner), 93 } 94 a.Mimes, err = updater.GetMimes() 95 if err != nil { 96 return nil, err 97 } 98 return a, nil 99} 100 101type AppHostingFS struct { 102 http.FileSystem 103} 104 105var ErrorIndex = errors.New("not found, use index.html") 106 107func (fs AppHostingFS) Open(name string) (http.File, error) { 108 file, err1 := fs.FileSystem.Open(name) 109 if err1 == nil { 110 return file, nil 111 } 112 if !errors.Is(err1, os.ErrNotExist) { 113 return nil, err1 114 } 115 116 return nil, ErrorIndex 117} 118 119// api/playback/iame.li/webrtc?rendition=source 120// api/playback/iame.li/stream.mp4?rendition=source 121// api/playback/iame.li/stream.webm?rendition=source 122// api/playback/iame.li/hls/index.m3u8 123// api/playback/iame.li/hls/source/stream.m3u8 124// api/playback/iame.li/hls/source/000000000000.ts 125 126func (a *StreamplaceAPI) Handler(ctx context.Context) (http.Handler, error) { 127 var xrpc http.Handler 128 xrpc, err := spxrpc.NewServer(a.CLI, a.Model) 129 if err != nil { 130 return nil, err 131 } 132 op := oproxy.New(&oproxy.Config{ 133 Host: a.CLI.PublicHost, 134 CreateOAuthSession: a.Model.CreateOAuthSession, 135 UpdateOAuthSession: a.Model.UpdateOAuthSession, 136 LoadOAuthSession: a.Model.LoadOAuthSession, 137 Scope: "atproto transition:generic", 138 UpstreamJWK: a.CLI.JWK, 139 DownstreamJWK: a.CLI.AccessJWK, 140 }) 141 142 xrpc = op.OAuthMiddleware(xrpc) 143 router := httprouter.New() 144 router.Handler("GET", "/oauth/*anything", op.Handler()) 145 router.Handler("POST", "/oauth/*anything", op.Handler()) 146 router.Handler("GET", "/.well-known/oauth-authorization-server", op.Handler()) 147 router.Handler("GET", "/.well-known/oauth-protected-resource", op.Handler()) 148 apiRouter := httprouter.New() 149 apiRouter.HandlerFunc("POST", "/api/notification", a.HandleNotification(ctx)) 150 // old clients 151 router.HandlerFunc("GET", "/app-updates", a.HandleAppUpdates(ctx)) 152 153 // new ones 154 apiRouter.HandlerFunc("GET", "/api/manifest", a.HandleAppUpdates(ctx)) 155 apiRouter.GET("/api/desktop-updates/:platform/:architecture/:version/:buildTime/:file", a.HandleDesktopUpdates(ctx)) 156 apiRouter.POST("/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s")) 157 apiRouter.OPTIONS("/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s")) 158 apiRouter.DELETE("/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s")) 159 apiRouter.Handler("POST", "/api/segment", a.HandleSegment(ctx)) 160 apiRouter.HandlerFunc("GET", "/api/healthz", a.HandleHealthz(ctx)) 161 apiRouter.GET("/api/playback/:user/hls/*file", a.HandleHLSPlayback(ctx)) 162 apiRouter.GET("/api/playback/:user/stream.mp4", a.HandleMP4Playback(ctx)) 163 apiRouter.GET("/api/playback/:user/stream.webm", a.HandleMKVPlayback(ctx)) 164 // they're, uh, not jpegs. but we used this once and i don't wanna break backwards compatibility 165 apiRouter.GET("/api/playback/:user/stream.jpg", a.HandleThumbnailPlayback(ctx)) 166 // this one is not a lie 167 apiRouter.GET("/api/playback/:user/stream.png", a.HandleThumbnailPlayback(ctx)) 168 apiRouter.GET("/api/app-return/*anything", a.HandleAppReturn(ctx)) 169 apiRouter.POST("/api/playback/:user/webrtc", a.HandleWebRTCPlayback(ctx)) 170 apiRouter.POST("/api/ingest/webrtc", a.HandleWebRTCIngest(ctx)) 171 apiRouter.POST("/api/ingest/webrtc/:key", a.HandleWebRTCIngest(ctx)) 172 apiRouter.POST("/api/player-event", a.HandlePlayerEvent(ctx)) 173 apiRouter.GET("/api/chat/:repoDID", a.HandleChat(ctx)) 174 apiRouter.GET("/api/websocket/:repoDID", a.HandleWebsocket(ctx)) 175 apiRouter.GET("/api/livestream/:repoDID", a.HandleLivestream(ctx)) 176 apiRouter.GET("/api/segment/recent", a.HandleRecentSegments(ctx)) 177 apiRouter.GET("/api/segment/recent/:repoDID", a.HandleUserRecentSegments(ctx)) 178 apiRouter.GET("/api/bluesky/resolve/:handle", a.HandleBlueskyResolve(ctx)) 179 apiRouter.GET("/api/live-users", a.HandleLiveUsers(ctx)) 180 apiRouter.GET("/api/view-count/:user", a.HandleViewCount(ctx)) 181 apiRouter.NotFound = a.HandleAPI404(ctx) 182 apiRouterHandler := a.RateLimitMiddleware(ctx)(apiRouter) 183 xrpcHandler := a.RateLimitMiddleware(ctx)(xrpc) 184 router.Handler("GET", "/api/*resource", apiRouterHandler) 185 router.Handler("POST", "/api/*resource", apiRouterHandler) 186 router.Handler("PUT", "/api/*resource", apiRouterHandler) 187 router.Handler("PATCH", "/api/*resource", apiRouterHandler) 188 router.Handler("DELETE", "/api/*resource", apiRouterHandler) 189 router.Handler("GET", "/xrpc/*resource", xrpcHandler) 190 router.Handler("POST", "/xrpc/*resource", xrpcHandler) 191 router.Handler("PUT", "/xrpc/*resource", xrpcHandler) 192 router.Handler("PATCH", "/xrpc/*resource", xrpcHandler) 193 router.Handler("DELETE", "/xrpc/*resource", xrpcHandler) 194 router.GET("/.well-known/did.json", a.HandleDidJson(ctx)) 195 router.GET("/dl/*params", a.HandleAppDownload(ctx)) 196 router.POST("/", a.HandleWebRTCIngest(ctx)) 197 for _, redirect := range a.CLI.Redirects { 198 parts := strings.Split(redirect, ":") 199 if len(parts) != 2 { 200 log.Error(ctx, "invalid redirect", "redirect", redirect) 201 return nil, fmt.Errorf("invalid redirect: %s", redirect) 202 } 203 router.Handle("GET", parts[0], func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) { 204 http.Redirect(w, r, parts[1], http.StatusTemporaryRedirect) 205 }) 206 } 207 if a.CLI.FrontendProxy != "" { 208 u, err := url.Parse(a.CLI.FrontendProxy) 209 if err != nil { 210 return nil, err 211 } 212 log.Warn(ctx, "using frontend proxy instead of bundled frontend", "destination", a.CLI.FrontendProxy) 213 router.NotFound = &httputil.ReverseProxy{ 214 Rewrite: func(r *httputil.ProxyRequest) { 215 // workaround for Expo disliking serving requests from 127.0.0.1 instead of localhost 216 // we need to use 127.0.0.1 because the atproto oauth client requires it 217 r.Out.Header.Set("Origin", u.String()) 218 r.SetURL(u) 219 }, 220 } 221 } else { 222 files, err := app.Files() 223 if err != nil { 224 return nil, err 225 } 226 index, err := files.Open("index.html") 227 if err != nil { 228 return nil, err 229 } 230 bs, err := io.ReadAll(index) 231 if err != nil { 232 return nil, err 233 } 234 linker, err := linking.NewLinker(ctx, bs) 235 if err != nil { 236 return nil, err 237 } 238 linkingHandler, err := a.NotFoundLinkingHandler(ctx, linker) 239 if err != nil { 240 return nil, err 241 } 242 router.NotFound = linkingHandler 243 } 244 // needed because the WebRTC handler issues 405s from / otherwise 245 router.GET("/", func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) { 246 router.NotFound.ServeHTTP(w, r) 247 }) 248 handler := sloghttp.Recovery(router) 249 handler = cors.AllowAll().Handler(handler) 250 handler = sloghttp.New(slog.Default())(handler) 251 handler = a.RateLimitMiddleware(ctx)(handler) 252 253 return handler, nil 254} 255 256func copyHeader(dst, src http.Header) { 257 for k, vv := range src { 258 // we'll handle CORS ourselves, thanks 259 if strings.HasPrefix(k, "Access-Control") { 260 continue 261 } 262 for _, v := range vv { 263 dst.Add(k, v) 264 } 265 } 266} 267 268// handler that takes care of static files and otherwise returns the index.html with the correct link card data 269func (a *StreamplaceAPI) NotFoundLinkingHandler(ctx context.Context, linker *linking.Linker) (http.HandlerFunc, error) { 270 files, err := app.Files() 271 if err != nil { 272 return nil, err 273 } 274 fs := AppHostingFS{http.FS(files)} 275 276 fileHandler := a.FileHandler(ctx, http.FileServer(fs)) 277 defaultHandler := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { 278 f := strings.TrimPrefix(req.URL.Path, "/") 279 // under docs we need the index.html suffix due to astro rendering 280 if strings.HasPrefix(req.URL.Path, "/docs") && strings.HasSuffix(req.URL.Path, "/") { 281 f += "index.html" 282 } 283 _, err := fs.Open(f) 284 if err == nil { 285 fileHandler.ServeHTTP(w, req) 286 return 287 } 288 if errors.Is(err, ErrorIndex) || f == "" { 289 bs, err := linker.GenerateDefaultCard(ctx, req.URL) 290 if err != nil { 291 log.Error(ctx, "error generating default card", "error", err) 292 } 293 w.Header().Set("Content-Type", "text/html") 294 w.Write(bs) 295 } else { 296 log.Warn(ctx, "error opening file", "error", err) 297 apierrors.WriteHTTPInternalServerError(w, "file not found", err) 298 } 299 }) 300 return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { 301 proto := "http" 302 if req.TLS != nil { 303 proto = "https" 304 } 305 fwProto := req.Header.Get("x-forwarded-proto") 306 if fwProto != "" { 307 proto = fwProto 308 } 309 req.URL.Host = req.Host 310 req.URL.Scheme = proto 311 maybeHandle := strings.TrimPrefix(req.URL.Path, "/") 312 repo, err := a.Model.GetRepoByHandleOrDID(maybeHandle) 313 if err != nil || repo == nil { 314 log.Error(ctx, "no repo found", "maybeHandle", maybeHandle) 315 defaultHandler.ServeHTTP(w, req) 316 return 317 } 318 ls, err := a.Model.GetLatestLivestreamForRepo(repo.DID) 319 if err != nil || ls == nil { 320 log.Error(ctx, "no livestream found", "repoDID", repo.DID) 321 defaultHandler.ServeHTTP(w, req) 322 return 323 } 324 lsv, err := ls.ToLivestreamView() 325 if err != nil || lsv == nil { 326 log.Error(ctx, "no livestream view found", "repoDID", repo.DID) 327 defaultHandler.ServeHTTP(w, req) 328 return 329 } 330 bs, err := linker.GenerateStreamerCard(ctx, req.URL, lsv) 331 if err != nil { 332 log.Error(ctx, "error generating html", "error", err) 333 defaultHandler.ServeHTTP(w, req) 334 return 335 } 336 w.Header().Set("Content-Type", "text/html") 337 w.Write(bs) 338 }), nil 339} 340 341func (a *StreamplaceAPI) MistProxyHandler(ctx context.Context, tmpl string) httprouter.Handle { 342 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 343 if !a.CLI.HasMist() { 344 apierrors.WriteHTTPNotImplemented(w, "Playback only on the Linux version for now", nil) 345 return 346 } 347 stream := params.ByName("stream") 348 if stream == "" { 349 apierrors.WriteHTTPBadRequest(w, "missing stream in request", nil) 350 return 351 } 352 353 fullstream := fmt.Sprintf("%s+%s", mistconfig.STREAM_NAME, stream) 354 prefix := fmt.Sprintf(tmpl, fullstream) 355 resource := params.ByName("resource") 356 357 // path := strings.TrimPrefix(req.URL.EscapedPath(), "/api") 358 359 client := &http.Client{} 360 req.URL = &url.URL{ 361 Scheme: "http", 362 Host: fmt.Sprintf("127.0.0.1:%d", a.CLI.MistHTTPPort), 363 Path: fmt.Sprintf("%s%s", prefix, resource), 364 RawQuery: req.URL.RawQuery, 365 } 366 367 //http: Request.RequestURI can't be set in client requests. 368 //http://golang.org/src/pkg/net/http/client.go 369 req.RequestURI = "" 370 371 resp, err := client.Do(req) 372 if err != nil { 373 apierrors.WriteHTTPInternalServerError(w, "error connecting to mist", err) 374 return 375 } 376 defer resp.Body.Close() 377 378 copyHeader(w.Header(), resp.Header) 379 w.WriteHeader(resp.StatusCode) 380 io.Copy(w, resp.Body) 381 } 382} 383 384func (a *StreamplaceAPI) FileHandler(ctx context.Context, fs http.Handler) http.HandlerFunc { 385 return func(w http.ResponseWriter, req *http.Request) { 386 noslash := req.URL.Path[1:] 387 ct, ok := a.Mimes[noslash] 388 if ok { 389 w.Header().Set("content-type", ct) 390 } 391 fs.ServeHTTP(w, req) 392 } 393} 394 395func (a *StreamplaceAPI) RedirectHandler(ctx context.Context) (http.Handler, error) { 396 _, tlsPort, err := net.SplitHostPort(a.CLI.HttpsAddr) 397 if err != nil { 398 return nil, err 399 } 400 handleRedirect := func(w http.ResponseWriter, req *http.Request) { 401 host, _, err := net.SplitHostPort(req.Host) 402 if err != nil { 403 host = req.Host 404 } 405 u := req.URL 406 if tlsPort == "443" { 407 u.Host = host 408 } else { 409 u.Host = net.JoinHostPort(host, tlsPort) 410 } 411 u.Scheme = "https" 412 http.Redirect(w, req, u.String(), http.StatusTemporaryRedirect) 413 } 414 mux := http.NewServeMux() 415 mux.HandleFunc("/", handleRedirect) 416 return mux, nil 417} 418 419type NotificationPayload struct { 420 Token string `json:"token"` 421 RepoDID string `json:"repoDID"` 422} 423 424func (a *StreamplaceAPI) HandleAPI404(ctx context.Context) http.HandlerFunc { 425 return func(w http.ResponseWriter, req *http.Request) { 426 w.WriteHeader(404) 427 } 428} 429 430func (a *StreamplaceAPI) HandleNotification(ctx context.Context) http.HandlerFunc { 431 return func(w http.ResponseWriter, req *http.Request) { 432 payload, err := io.ReadAll(req.Body) 433 if err != nil { 434 log.Log(ctx, "error reading notification create", "error", err) 435 w.WriteHeader(400) 436 return 437 } 438 n := NotificationPayload{} 439 err = json.Unmarshal(payload, &n) 440 if err != nil { 441 log.Log(ctx, "error unmarshalling notification create", "error", err) 442 w.WriteHeader(400) 443 return 444 } 445 err = a.Model.CreateNotification(n.Token, n.RepoDID) 446 if err != nil { 447 log.Log(ctx, "error creating notification", "error", err) 448 w.WriteHeader(400) 449 return 450 } 451 log.Log(ctx, "successfully created notification", "token", n.Token) 452 w.WriteHeader(200) 453 if n.RepoDID != "" { 454 go func() { 455 _, err := a.ATSync.SyncBlueskyRepo(ctx, n.RepoDID, a.Model) 456 if err != nil { 457 log.Error(ctx, "error syncing bluesky repo after notification creation", "error", err) 458 } 459 }() 460 } 461 } 462} 463 464func (a *StreamplaceAPI) HandleSegment(ctx context.Context) http.HandlerFunc { 465 return func(w http.ResponseWriter, req *http.Request) { 466 err := a.MediaManager.ValidateMP4(ctx, req.Body) 467 if err != nil { 468 apierrors.WriteHTTPBadRequest(w, "could not ingest segment", err) 469 return 470 } 471 w.WriteHeader(200) 472 } 473} 474 475func (a *StreamplaceAPI) HandlePlayerEvent(ctx context.Context) httprouter.Handle { 476 return func(w http.ResponseWriter, req *http.Request, p httprouter.Params) { 477 var event model.PlayerEventAPI 478 if err := json.NewDecoder(req.Body).Decode(&event); err != nil { 479 apierrors.WriteHTTPBadRequest(w, "could not decode JSON body", err) 480 return 481 } 482 err := a.Model.CreatePlayerEvent(event) 483 if err != nil { 484 apierrors.WriteHTTPBadRequest(w, "could not create player event", err) 485 return 486 } 487 w.WriteHeader(201) 488 } 489} 490 491func (a *StreamplaceAPI) HandleRecentSegments(ctx context.Context) httprouter.Handle { 492 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 493 segs, err := a.Model.MostRecentSegments() 494 if err != nil { 495 apierrors.WriteHTTPInternalServerError(w, "could not get segments", err) 496 return 497 } 498 bs, err := json.Marshal(segs) 499 if err != nil { 500 apierrors.WriteHTTPInternalServerError(w, "could not marshal segments", err) 501 return 502 } 503 w.Header().Add("Content-Type", "application/json") 504 w.Write(bs) 505 } 506} 507 508func (a *StreamplaceAPI) HandleUserRecentSegments(ctx context.Context) httprouter.Handle { 509 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 510 user := params.ByName("repoDID") 511 if user == "" { 512 apierrors.WriteHTTPBadRequest(w, "user required", nil) 513 return 514 } 515 user, err := a.NormalizeUser(ctx, user) 516 if err != nil { 517 apierrors.WriteHTTPNotFound(w, "user not found", err) 518 return 519 } 520 seg, err := a.Model.LatestSegmentForUser(user) 521 if err != nil { 522 apierrors.WriteHTTPInternalServerError(w, "could not get segments", err) 523 return 524 } 525 streamplaceSeg, err := seg.ToStreamplaceSegment() 526 if err != nil { 527 apierrors.WriteHTTPInternalServerError(w, "could not convert segment to streamplace segment", err) 528 return 529 } 530 bs, err := json.Marshal(streamplaceSeg) 531 if err != nil { 532 apierrors.WriteHTTPInternalServerError(w, "could not marshal segments", err) 533 return 534 } 535 w.Header().Add("Content-Type", "application/json") 536 w.Write(bs) 537 } 538} 539 540type LiveUsersResponse struct { 541 model.Segment 542 Viewers int `json:"viewers"` 543} 544 545func (a *StreamplaceAPI) HandleLiveUsers(ctx context.Context) httprouter.Handle { 546 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 547 repos, err := a.Model.MostRecentSegments() 548 if err != nil { 549 apierrors.WriteHTTPInternalServerError(w, "could not get live users", err) 550 return 551 } 552 liveUsers := []LiveUsersResponse{} 553 for _, repo := range repos { 554 viewers := spmetrics.GetViewCount(repo.RepoDID) 555 liveUsers = append(liveUsers, LiveUsersResponse{ 556 Segment: repo, 557 Viewers: viewers, 558 }) 559 } 560 bs, err := json.Marshal(liveUsers) 561 if err != nil { 562 apierrors.WriteHTTPInternalServerError(w, "could not marshal live users", err) 563 return 564 } 565 w.Write(bs) 566 } 567} 568 569func (a *StreamplaceAPI) HandleViewCount(ctx context.Context) httprouter.Handle { 570 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 571 user := params.ByName("user") 572 if user == "" { 573 apierrors.WriteHTTPBadRequest(w, "user required", nil) 574 return 575 } 576 user, err := a.NormalizeUser(ctx, user) 577 if err != nil { 578 apierrors.WriteHTTPNotFound(w, "user not found", err) 579 return 580 } 581 count := spmetrics.GetViewCount(user) 582 bs, err := json.Marshal(streamplace.Livestream_ViewerCount{Count: int64(count), LexiconTypeID: "place.stream.livestream#viewerCount"}) 583 if err != nil { 584 apierrors.WriteHTTPInternalServerError(w, "could not marshal view count", err) 585 return 586 } 587 w.Write(bs) 588 } 589} 590 591func (a *StreamplaceAPI) HandleBlueskyResolve(ctx context.Context) httprouter.Handle { 592 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 593 log.Log(ctx, "got bluesky notification", "params", params) 594 key, err := a.ATSync.SyncBlueskyRepo(ctx, params.ByName("handle"), a.Model) 595 if err != nil { 596 apierrors.WriteHTTPInternalServerError(w, "could not resolve streamplace key", err) 597 return 598 } 599 signingKeys, err := a.Model.GetSigningKeysForRepo(key.DID) 600 if err != nil { 601 apierrors.WriteHTTPInternalServerError(w, "could not get signing keys", err) 602 return 603 } 604 bs, err := json.Marshal(signingKeys) 605 if err != nil { 606 apierrors.WriteHTTPInternalServerError(w, "could not marshal signing keys", err) 607 return 608 } 609 w.Write(bs) 610 } 611} 612 613type ChatResponse struct { 614 Post *bsky.FeedPost `json:"post"` 615 Repo *model.Repo `json:"repo"` 616 CID string `json:"cid"` 617} 618 619func (a *StreamplaceAPI) HandleChat(ctx context.Context) httprouter.Handle { 620 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 621 user := params.ByName("repoDID") 622 if user == "" { 623 apierrors.WriteHTTPBadRequest(w, "user required", nil) 624 return 625 } 626 repoDID, err := a.NormalizeUser(ctx, user) 627 if err != nil { 628 apierrors.WriteHTTPNotFound(w, "user not found", err) 629 return 630 } 631 replies, err := a.Model.GetReplies(repoDID) 632 if err != nil { 633 apierrors.WriteHTTPInternalServerError(w, "could not get replies", err) 634 return 635 } 636 bs, err := json.Marshal(replies) 637 if err != nil { 638 apierrors.WriteHTTPInternalServerError(w, "could not marshal replies", err) 639 return 640 } 641 w.Header().Set("Content-Type", "application/json") 642 w.Write(bs) 643 } 644} 645 646func (a *StreamplaceAPI) HandleLivestream(ctx context.Context) httprouter.Handle { 647 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 648 user := params.ByName("repoDID") 649 if user == "" { 650 apierrors.WriteHTTPBadRequest(w, "user required", nil) 651 return 652 } 653 repoDID, err := a.NormalizeUser(ctx, user) 654 if err != nil { 655 apierrors.WriteHTTPNotFound(w, "user not found", err) 656 return 657 } 658 livestream, err := a.Model.GetLatestLivestreamForRepo(repoDID) 659 if err != nil { 660 apierrors.WriteHTTPInternalServerError(w, "could not get livestream", err) 661 return 662 } 663 664 doc, err := livestream.ToLivestreamView() 665 if err != nil { 666 apierrors.WriteHTTPInternalServerError(w, "could not marshal livestream", err) 667 return 668 } 669 670 if livestream == nil { 671 apierrors.WriteHTTPNotFound(w, "no livestream found", nil) 672 return 673 } 674 675 bs, err := json.Marshal(doc) 676 if err != nil { 677 apierrors.WriteHTTPInternalServerError(w, "could not marshal livestream", err) 678 return 679 } 680 w.Header().Set("Content-Type", "application/json") 681 w.Write(bs) 682 } 683} 684 685func (a *StreamplaceAPI) RateLimitMiddleware(ctx context.Context) func(http.Handler) http.Handler { 686 return func(next http.Handler) http.Handler { 687 return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { 688 ip, _, err := net.SplitHostPort(req.RemoteAddr) 689 if err != nil { 690 ip = req.RemoteAddr 691 } 692 693 if a.CLI.RateLimitPerSecond > 0 { 694 limiter := a.getLimiter(ip) 695 696 if !limiter.Allow() { 697 log.Warn(ctx, "rate limit exceeded", "ip", ip, "path", req.URL.Path) 698 apierrors.WriteHTTPTooManyRequests(w, "rate limit exceeded") 699 return 700 } 701 } 702 703 next.ServeHTTP(w, req) 704 }) 705 } 706} 707 708func (a *StreamplaceAPI) ServeHTTP(ctx context.Context) error { 709 handler, err := a.Handler(ctx) 710 if err != nil { 711 return err 712 } 713 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error { 714 s.Addr = a.CLI.HttpAddr 715 log.Log(ctx, "http server starting", "addr", s.Addr) 716 return s.ListenAndServe() 717 }) 718} 719 720func (a *StreamplaceAPI) ServeHTTPRedirect(ctx context.Context) error { 721 handler, err := a.RedirectHandler(ctx) 722 if err != nil { 723 return err 724 } 725 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error { 726 s.Addr = a.CLI.HttpAddr 727 log.Log(ctx, "http tls redirecct server starting", "addr", s.Addr) 728 return s.ListenAndServe() 729 }) 730} 731 732func (a *StreamplaceAPI) ServeHTTPS(ctx context.Context) error { 733 handler, err := a.Handler(ctx) 734 if err != nil { 735 return err 736 } 737 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error { 738 s.Addr = a.CLI.HttpsAddr 739 log.Log(ctx, "https server starting", 740 "addr", s.Addr, 741 "certPath", a.CLI.TLSCertPath, 742 "keyPath", a.CLI.TLSKeyPath, 743 ) 744 return s.ListenAndServeTLS(a.CLI.TLSCertPath, a.CLI.TLSKeyPath) 745 }) 746} 747 748func (a *StreamplaceAPI) ServerWithShutdown(ctx context.Context, handler http.Handler, serve func(*http.Server) error) error { 749 ctx, cancel := context.WithCancel(ctx) 750 handler = gziphandler.GzipHandler(handler) 751 server := http.Server{Handler: handler} 752 var serveErr error 753 go func() { 754 serveErr = serve(&server) 755 cancel() 756 }() 757 <-ctx.Done() 758 if serveErr != nil { 759 return fmt.Errorf("error in http server: %w", serveErr) 760 } 761 762 ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second) 763 defer cancel() 764 return server.Shutdown(ctx) 765} 766 767func (a *StreamplaceAPI) HandleHealthz(ctx context.Context) http.HandlerFunc { 768 return func(w http.ResponseWriter, req *http.Request) { 769 w.WriteHeader(200) 770 } 771} 772 773func (a *StreamplaceAPI) getLimiter(ip string) *rate.Limiter { 774 a.limitersMu.Lock() 775 defer a.limitersMu.Unlock() 776 777 limiter, exists := a.limiters[ip] 778 if !exists { 779 limiter = rate.NewLimiter(rate.Limit(a.CLI.RateLimitPerSecond), a.CLI.RateLimitBurst) 780 a.limiters[ip] = limiter 781 } 782 783 return limiter 784} 785 786func NewWebsocketTracker(maxConns int) *WebsocketTracker { 787 return &WebsocketTracker{ 788 connections: make(map[string]int), 789 maxConnsPerIP: maxConns, 790 } 791} 792 793func (t *WebsocketTracker) AddConnection(ip string) bool { 794 t.mu.Lock() 795 defer t.mu.Unlock() 796 797 count := t.connections[ip] 798 799 if count >= t.maxConnsPerIP { 800 return false 801 } 802 803 t.connections[ip] = count + 1 804 return true 805} 806 807func (t *WebsocketTracker) RemoveConnection(ip string) { 808 t.mu.Lock() 809 defer t.mu.Unlock() 810 811 count := t.connections[ip] 812 if count > 0 { 813 t.connections[ip] = count - 1 814 } 815 816 if t.connections[ip] == 0 { 817 delete(t.connections, ip) 818 } 819}