// Live video on the AT Protocol
// at eli/oauth-in-dev-stinks, 827 lines, 26 kB (page header captured along with the source; not part of the program)
1package api 2 3import ( 4 "context" 5 "encoding/json" 6 "errors" 7 "fmt" 8 "io" 9 "log/slog" 10 "net" 11 "net/http" 12 "net/http/httputil" 13 "net/url" 14 "os" 15 "strings" 16 "sync" 17 "time" 18 19 "github.com/NYTimes/gziphandler" 20 "github.com/bluesky-social/indigo/api/bsky" 21 "github.com/julienschmidt/httprouter" 22 "github.com/rs/cors" 23 sloghttp "github.com/samber/slog-http" 24 "golang.org/x/time/rate" 25 26 "stream.place/streamplace/js/app" 27 "stream.place/streamplace/pkg/atproto" 28 "stream.place/streamplace/pkg/bus" 29 "stream.place/streamplace/pkg/config" 30 "stream.place/streamplace/pkg/crypto/signers/eip712" 31 "stream.place/streamplace/pkg/director" 32 apierrors "stream.place/streamplace/pkg/errors" 33 "stream.place/streamplace/pkg/linking" 34 "stream.place/streamplace/pkg/log" 35 "stream.place/streamplace/pkg/media" 36 "stream.place/streamplace/pkg/mist/mistconfig" 37 "stream.place/streamplace/pkg/model" 38 "stream.place/streamplace/pkg/notifications" 39 "stream.place/streamplace/pkg/oproxy" 40 "stream.place/streamplace/pkg/spmetrics" 41 "stream.place/streamplace/pkg/spxrpc" 42 "stream.place/streamplace/pkg/streamplace" 43) 44 45type StreamplaceAPI struct { 46 CLI *config.CLI 47 Model model.Model 48 Updater *Updater 49 Signer *eip712.EIP712Signer 50 Mimes map[string]string 51 FirebaseNotifier notifications.FirebaseNotifier 52 MediaManager *media.MediaManager 53 MediaSigner media.MediaSigner 54 // not thread-safe yet 55 Aliases map[string]string 56 Bus *bus.Bus 57 ATSync *atproto.ATProtoSynchronizer 58 Director *director.Director 59 60 connTracker *WebsocketTracker 61 62 limiters map[string]*rate.Limiter 63 limitersMu sync.Mutex 64} 65 66type WebsocketTracker struct { 67 connections map[string]int 68 maxConnsPerIP int 69 mu sync.RWMutex 70} 71 72func MakeStreamplaceAPI(cli *config.CLI, mod model.Model, signer *eip712.EIP712Signer, noter notifications.FirebaseNotifier, mm *media.MediaManager, ms media.MediaSigner, bus *bus.Bus, atsync 
*atproto.ATProtoSynchronizer, d *director.Director) (*StreamplaceAPI, error) { 73 updater, err := PrepareUpdater(cli) 74 if err != nil { 75 return nil, err 76 } 77 a := &StreamplaceAPI{CLI: cli, 78 Model: mod, 79 Updater: updater, 80 Signer: signer, 81 FirebaseNotifier: noter, 82 MediaManager: mm, 83 MediaSigner: ms, 84 Aliases: map[string]string{}, 85 Bus: bus, 86 ATSync: atsync, 87 Director: d, 88 connTracker: NewWebsocketTracker(cli.RateLimitWebsocket), 89 limiters: make(map[string]*rate.Limiter), 90 } 91 a.Mimes, err = updater.GetMimes() 92 if err != nil { 93 return nil, err 94 } 95 return a, nil 96} 97 98type AppHostingFS struct { 99 http.FileSystem 100} 101 102var ErrorIndex = errors.New("not found, use index.html") 103 104func (fs AppHostingFS) Open(name string) (http.File, error) { 105 file, err1 := fs.FileSystem.Open(name) 106 if err1 == nil { 107 return file, nil 108 } 109 if !errors.Is(err1, os.ErrNotExist) { 110 return nil, err1 111 } 112 113 return nil, ErrorIndex 114} 115 116// api/playback/iame.li/webrtc?rendition=source 117// api/playback/iame.li/stream.mp4?rendition=source 118// api/playback/iame.li/stream.webm?rendition=source 119// api/playback/iame.li/hls/index.m3u8 120// api/playback/iame.li/hls/source/stream.m3u8 121// api/playback/iame.li/hls/source/000000000000.ts 122 123func (a *StreamplaceAPI) Handler(ctx context.Context) (http.Handler, error) { 124 var xrpc http.Handler 125 xrpc, err := spxrpc.NewServer(a.CLI, a.Model) 126 if err != nil { 127 return nil, err 128 } 129 130 clientMetadata := &oproxy.OAuthClientMetadata{ 131 ClientName: "Streamplace", 132 ClientURI: "https://stream.place", 133 LogoURI: "https://stream.place/logo.png", 134 RedirectURIs: []string{"https://fairway.iameli.link/login", "https://fairway.iameli.link/api/app-return"}, 135 } 136 137 // RedirectURIs: []string{"http://127.0.0.1:38080/login", "https://%s/api/app-return"}, 138 op := oproxy.New(&oproxy.Config{ 139 Host: a.CLI.PublicHost, 140 CreateOAuthSession: 
a.Model.CreateOAuthSession, 141 UpdateOAuthSession: a.Model.UpdateOAuthSession, 142 LoadOAuthSession: a.Model.LoadOAuthSession, 143 Scope: "atproto transition:generic", 144 UpstreamJWK: a.CLI.JWK, 145 DownstreamJWK: a.CLI.AccessJWK, 146 ClientMetadata: clientMetadata, 147 AdditionalRedirectURIs: []string{"http://127.0.0.1:38080/login"}, 148 }) 149 150 xrpc = op.OAuthMiddleware(xrpc) 151 router := httprouter.New() 152 router.Handler("GET", "/oauth/*anything", op.Handler()) 153 router.Handler("POST", "/oauth/*anything", op.Handler()) 154 router.Handler("GET", "/.well-known/oauth-authorization-server", op.Handler()) 155 router.Handler("GET", "/.well-known/oauth-protected-resource", op.Handler()) 156 apiRouter := httprouter.New() 157 apiRouter.HandlerFunc("POST", "/api/notification", a.HandleNotification(ctx)) 158 // old clients 159 router.HandlerFunc("GET", "/app-updates", a.HandleAppUpdates(ctx)) 160 161 // new ones 162 apiRouter.HandlerFunc("GET", "/api/manifest", a.HandleAppUpdates(ctx)) 163 apiRouter.GET("/api/desktop-updates/:platform/:architecture/:version/:buildTime/:file", a.HandleDesktopUpdates(ctx)) 164 apiRouter.POST("/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s")) 165 apiRouter.OPTIONS("/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s")) 166 apiRouter.DELETE("/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s")) 167 apiRouter.Handler("POST", "/api/segment", a.HandleSegment(ctx)) 168 apiRouter.HandlerFunc("GET", "/api/healthz", a.HandleHealthz(ctx)) 169 apiRouter.GET("/api/playback/:user/hls/*file", a.HandleHLSPlayback(ctx)) 170 apiRouter.GET("/api/playback/:user/stream.mp4", a.HandleMP4Playback(ctx)) 171 apiRouter.GET("/api/playback/:user/stream.webm", a.HandleMKVPlayback(ctx)) 172 // they're, uh, not jpegs. 
but we used this once and i don't wanna break backwards compatibility 173 apiRouter.GET("/api/playback/:user/stream.jpg", a.HandleThumbnailPlayback(ctx)) 174 // this one is not a lie 175 apiRouter.GET("/api/playback/:user/stream.png", a.HandleThumbnailPlayback(ctx)) 176 apiRouter.GET("/api/app-return/*anything", a.HandleAppReturn(ctx)) 177 apiRouter.POST("/api/playback/:user/webrtc", a.HandleWebRTCPlayback(ctx)) 178 apiRouter.POST("/api/ingest/webrtc", a.HandleWebRTCIngest(ctx)) 179 apiRouter.POST("/api/ingest/webrtc/:key", a.HandleWebRTCIngest(ctx)) 180 apiRouter.POST("/api/player-event", a.HandlePlayerEvent(ctx)) 181 apiRouter.GET("/api/chat/:repoDID", a.HandleChat(ctx)) 182 apiRouter.GET("/api/websocket/:repoDID", a.HandleWebsocket(ctx)) 183 apiRouter.GET("/api/livestream/:repoDID", a.HandleLivestream(ctx)) 184 apiRouter.GET("/api/segment/recent", a.HandleRecentSegments(ctx)) 185 apiRouter.GET("/api/segment/recent/:repoDID", a.HandleUserRecentSegments(ctx)) 186 apiRouter.GET("/api/bluesky/resolve/:handle", a.HandleBlueskyResolve(ctx)) 187 apiRouter.GET("/api/live-users", a.HandleLiveUsers(ctx)) 188 apiRouter.GET("/api/view-count/:user", a.HandleViewCount(ctx)) 189 apiRouter.NotFound = a.HandleAPI404(ctx) 190 apiRouterHandler := a.RateLimitMiddleware(ctx)(apiRouter) 191 xrpcHandler := a.RateLimitMiddleware(ctx)(xrpc) 192 router.Handler("GET", "/api/*resource", apiRouterHandler) 193 router.Handler("POST", "/api/*resource", apiRouterHandler) 194 router.Handler("PUT", "/api/*resource", apiRouterHandler) 195 router.Handler("PATCH", "/api/*resource", apiRouterHandler) 196 router.Handler("DELETE", "/api/*resource", apiRouterHandler) 197 router.Handler("GET", "/xrpc/*resource", xrpcHandler) 198 router.Handler("POST", "/xrpc/*resource", xrpcHandler) 199 router.Handler("PUT", "/xrpc/*resource", xrpcHandler) 200 router.Handler("PATCH", "/xrpc/*resource", xrpcHandler) 201 router.Handler("DELETE", "/xrpc/*resource", xrpcHandler) 202 router.GET("/.well-known/did.json", 
a.HandleDidJson(ctx)) 203 router.GET("/dl/*params", a.HandleAppDownload(ctx)) 204 router.POST("/", a.HandleWebRTCIngest(ctx)) 205 for _, redirect := range a.CLI.Redirects { 206 parts := strings.Split(redirect, ":") 207 if len(parts) != 2 { 208 log.Error(ctx, "invalid redirect", "redirect", redirect) 209 return nil, fmt.Errorf("invalid redirect: %s", redirect) 210 } 211 router.Handle("GET", parts[0], func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) { 212 http.Redirect(w, r, parts[1], http.StatusTemporaryRedirect) 213 }) 214 } 215 if a.CLI.FrontendProxy != "" { 216 u, err := url.Parse(a.CLI.FrontendProxy) 217 if err != nil { 218 return nil, err 219 } 220 log.Warn(ctx, "using frontend proxy instead of bundled frontend", "destination", a.CLI.FrontendProxy) 221 router.NotFound = &httputil.ReverseProxy{ 222 Rewrite: func(r *httputil.ProxyRequest) { 223 // workaround for Expo disliking serving requests from 127.0.0.1 instead of localhost 224 // we need to use 127.0.0.1 because the atproto oauth client requires it 225 r.Out.Header.Set("Origin", u.String()) 226 r.SetURL(u) 227 }, 228 } 229 } else { 230 files, err := app.Files() 231 if err != nil { 232 return nil, err 233 } 234 index, err := files.Open("index.html") 235 if err != nil { 236 return nil, err 237 } 238 bs, err := io.ReadAll(index) 239 if err != nil { 240 return nil, err 241 } 242 linker, err := linking.NewLinker(ctx, bs) 243 if err != nil { 244 return nil, err 245 } 246 linkingHandler, err := a.NotFoundLinkingHandler(ctx, linker) 247 if err != nil { 248 return nil, err 249 } 250 router.NotFound = linkingHandler 251 } 252 // needed because the WebRTC handler issues 405s from / otherwise 253 router.GET("/", func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) { 254 router.NotFound.ServeHTTP(w, r) 255 }) 256 handler := sloghttp.Recovery(router) 257 handler = cors.AllowAll().Handler(handler) 258 handler = sloghttp.New(slog.Default())(handler) 259 handler = 
a.RateLimitMiddleware(ctx)(handler) 260 261 return handler, nil 262} 263 264func copyHeader(dst, src http.Header) { 265 for k, vv := range src { 266 // we'll handle CORS ourselves, thanks 267 if strings.HasPrefix(k, "Access-Control") { 268 continue 269 } 270 for _, v := range vv { 271 dst.Add(k, v) 272 } 273 } 274} 275 276// handler that takes care of static files and otherwise returns the index.html with the correct link card data 277func (a *StreamplaceAPI) NotFoundLinkingHandler(ctx context.Context, linker *linking.Linker) (http.HandlerFunc, error) { 278 files, err := app.Files() 279 if err != nil { 280 return nil, err 281 } 282 fs := AppHostingFS{http.FS(files)} 283 284 fileHandler := a.FileHandler(ctx, http.FileServer(fs)) 285 defaultHandler := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { 286 f := strings.TrimPrefix(req.URL.Path, "/") 287 // under docs we need the index.html suffix due to astro rendering 288 if strings.HasPrefix(req.URL.Path, "/docs") && strings.HasSuffix(req.URL.Path, "/") { 289 f += "index.html" 290 } 291 _, err := fs.Open(f) 292 if err == nil { 293 fileHandler.ServeHTTP(w, req) 294 return 295 } 296 if errors.Is(err, ErrorIndex) || f == "" { 297 bs, err := linker.GenerateDefaultCard(ctx, req.URL) 298 if err != nil { 299 log.Error(ctx, "error generating default card", "error", err) 300 } 301 w.Header().Set("Content-Type", "text/html") 302 w.Write(bs) 303 } else { 304 log.Warn(ctx, "error opening file", "error", err) 305 apierrors.WriteHTTPInternalServerError(w, "file not found", err) 306 } 307 }) 308 return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { 309 proto := "http" 310 if req.TLS != nil { 311 proto = "https" 312 } 313 fwProto := req.Header.Get("x-forwarded-proto") 314 if fwProto != "" { 315 proto = fwProto 316 } 317 req.URL.Host = req.Host 318 req.URL.Scheme = proto 319 maybeHandle := strings.TrimPrefix(req.URL.Path, "/") 320 repo, err := a.Model.GetRepoByHandleOrDID(maybeHandle) 321 if err != 
nil || repo == nil { 322 log.Error(ctx, "no repo found", "maybeHandle", maybeHandle) 323 defaultHandler.ServeHTTP(w, req) 324 return 325 } 326 ls, err := a.Model.GetLatestLivestreamForRepo(repo.DID) 327 if err != nil || ls == nil { 328 log.Error(ctx, "no livestream found", "repoDID", repo.DID) 329 defaultHandler.ServeHTTP(w, req) 330 return 331 } 332 lsv, err := ls.ToLivestreamView() 333 if err != nil || lsv == nil { 334 log.Error(ctx, "no livestream view found", "repoDID", repo.DID) 335 defaultHandler.ServeHTTP(w, req) 336 return 337 } 338 bs, err := linker.GenerateStreamerCard(ctx, req.URL, lsv) 339 if err != nil { 340 log.Error(ctx, "error generating html", "error", err) 341 defaultHandler.ServeHTTP(w, req) 342 return 343 } 344 w.Header().Set("Content-Type", "text/html") 345 w.Write(bs) 346 }), nil 347} 348 349func (a *StreamplaceAPI) MistProxyHandler(ctx context.Context, tmpl string) httprouter.Handle { 350 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 351 if !a.CLI.HasMist() { 352 apierrors.WriteHTTPNotImplemented(w, "Playback only on the Linux version for now", nil) 353 return 354 } 355 stream := params.ByName("stream") 356 if stream == "" { 357 apierrors.WriteHTTPBadRequest(w, "missing stream in request", nil) 358 return 359 } 360 361 fullstream := fmt.Sprintf("%s+%s", mistconfig.STREAM_NAME, stream) 362 prefix := fmt.Sprintf(tmpl, fullstream) 363 resource := params.ByName("resource") 364 365 // path := strings.TrimPrefix(req.URL.EscapedPath(), "/api") 366 367 client := &http.Client{} 368 req.URL = &url.URL{ 369 Scheme: "http", 370 Host: fmt.Sprintf("127.0.0.1:%d", a.CLI.MistHTTPPort), 371 Path: fmt.Sprintf("%s%s", prefix, resource), 372 RawQuery: req.URL.RawQuery, 373 } 374 375 //http: Request.RequestURI can't be set in client requests. 
376 //http://golang.org/src/pkg/net/http/client.go 377 req.RequestURI = "" 378 379 resp, err := client.Do(req) 380 if err != nil { 381 apierrors.WriteHTTPInternalServerError(w, "error connecting to mist", err) 382 return 383 } 384 defer resp.Body.Close() 385 386 copyHeader(w.Header(), resp.Header) 387 w.WriteHeader(resp.StatusCode) 388 io.Copy(w, resp.Body) 389 } 390} 391 392func (a *StreamplaceAPI) FileHandler(ctx context.Context, fs http.Handler) http.HandlerFunc { 393 return func(w http.ResponseWriter, req *http.Request) { 394 noslash := req.URL.Path[1:] 395 ct, ok := a.Mimes[noslash] 396 if ok { 397 w.Header().Set("content-type", ct) 398 } 399 fs.ServeHTTP(w, req) 400 } 401} 402 403func (a *StreamplaceAPI) RedirectHandler(ctx context.Context) (http.Handler, error) { 404 _, tlsPort, err := net.SplitHostPort(a.CLI.HttpsAddr) 405 if err != nil { 406 return nil, err 407 } 408 handleRedirect := func(w http.ResponseWriter, req *http.Request) { 409 host, _, err := net.SplitHostPort(req.Host) 410 if err != nil { 411 host = req.Host 412 } 413 u := req.URL 414 if tlsPort == "443" { 415 u.Host = host 416 } else { 417 u.Host = net.JoinHostPort(host, tlsPort) 418 } 419 u.Scheme = "https" 420 http.Redirect(w, req, u.String(), http.StatusTemporaryRedirect) 421 } 422 mux := http.NewServeMux() 423 mux.HandleFunc("/", handleRedirect) 424 return mux, nil 425} 426 427type NotificationPayload struct { 428 Token string `json:"token"` 429 RepoDID string `json:"repoDID"` 430} 431 432func (a *StreamplaceAPI) HandleAPI404(ctx context.Context) http.HandlerFunc { 433 return func(w http.ResponseWriter, req *http.Request) { 434 w.WriteHeader(404) 435 } 436} 437 438func (a *StreamplaceAPI) HandleNotification(ctx context.Context) http.HandlerFunc { 439 return func(w http.ResponseWriter, req *http.Request) { 440 payload, err := io.ReadAll(req.Body) 441 if err != nil { 442 log.Log(ctx, "error reading notification create", "error", err) 443 w.WriteHeader(400) 444 return 445 } 446 n := 
NotificationPayload{} 447 err = json.Unmarshal(payload, &n) 448 if err != nil { 449 log.Log(ctx, "error unmarshalling notification create", "error", err) 450 w.WriteHeader(400) 451 return 452 } 453 err = a.Model.CreateNotification(n.Token, n.RepoDID) 454 if err != nil { 455 log.Log(ctx, "error creating notification", "error", err) 456 w.WriteHeader(400) 457 return 458 } 459 log.Log(ctx, "successfully created notification", "token", n.Token) 460 w.WriteHeader(200) 461 if n.RepoDID != "" { 462 go func() { 463 _, err := a.ATSync.SyncBlueskyRepo(ctx, n.RepoDID, a.Model) 464 if err != nil { 465 log.Error(ctx, "error syncing bluesky repo after notification creation", "error", err) 466 } 467 }() 468 } 469 } 470} 471 472func (a *StreamplaceAPI) HandleSegment(ctx context.Context) http.HandlerFunc { 473 return func(w http.ResponseWriter, req *http.Request) { 474 err := a.MediaManager.ValidateMP4(ctx, req.Body) 475 if err != nil { 476 apierrors.WriteHTTPBadRequest(w, "could not ingest segment", err) 477 return 478 } 479 w.WriteHeader(200) 480 } 481} 482 483func (a *StreamplaceAPI) HandlePlayerEvent(ctx context.Context) httprouter.Handle { 484 return func(w http.ResponseWriter, req *http.Request, p httprouter.Params) { 485 var event model.PlayerEventAPI 486 if err := json.NewDecoder(req.Body).Decode(&event); err != nil { 487 apierrors.WriteHTTPBadRequest(w, "could not decode JSON body", err) 488 return 489 } 490 err := a.Model.CreatePlayerEvent(event) 491 if err != nil { 492 apierrors.WriteHTTPBadRequest(w, "could not create player event", err) 493 return 494 } 495 w.WriteHeader(201) 496 } 497} 498 499func (a *StreamplaceAPI) HandleRecentSegments(ctx context.Context) httprouter.Handle { 500 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 501 segs, err := a.Model.MostRecentSegments() 502 if err != nil { 503 apierrors.WriteHTTPInternalServerError(w, "could not get segments", err) 504 return 505 } 506 bs, err := json.Marshal(segs) 507 if err != 
nil { 508 apierrors.WriteHTTPInternalServerError(w, "could not marshal segments", err) 509 return 510 } 511 w.Header().Add("Content-Type", "application/json") 512 w.Write(bs) 513 } 514} 515 516func (a *StreamplaceAPI) HandleUserRecentSegments(ctx context.Context) httprouter.Handle { 517 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 518 user := params.ByName("repoDID") 519 if user == "" { 520 apierrors.WriteHTTPBadRequest(w, "user required", nil) 521 return 522 } 523 user, err := a.NormalizeUser(ctx, user) 524 if err != nil { 525 apierrors.WriteHTTPNotFound(w, "user not found", err) 526 return 527 } 528 seg, err := a.Model.LatestSegmentForUser(user) 529 if err != nil { 530 apierrors.WriteHTTPInternalServerError(w, "could not get segments", err) 531 return 532 } 533 streamplaceSeg, err := seg.ToStreamplaceSegment() 534 if err != nil { 535 apierrors.WriteHTTPInternalServerError(w, "could not convert segment to streamplace segment", err) 536 return 537 } 538 bs, err := json.Marshal(streamplaceSeg) 539 if err != nil { 540 apierrors.WriteHTTPInternalServerError(w, "could not marshal segments", err) 541 return 542 } 543 w.Header().Add("Content-Type", "application/json") 544 w.Write(bs) 545 } 546} 547 548type LiveUsersResponse struct { 549 model.Segment 550 Viewers int `json:"viewers"` 551} 552 553func (a *StreamplaceAPI) HandleLiveUsers(ctx context.Context) httprouter.Handle { 554 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 555 repos, err := a.Model.MostRecentSegments() 556 if err != nil { 557 apierrors.WriteHTTPInternalServerError(w, "could not get live users", err) 558 return 559 } 560 liveUsers := []LiveUsersResponse{} 561 for _, repo := range repos { 562 viewers := spmetrics.GetViewCount(repo.RepoDID) 563 liveUsers = append(liveUsers, LiveUsersResponse{ 564 Segment: repo, 565 Viewers: viewers, 566 }) 567 } 568 bs, err := json.Marshal(liveUsers) 569 if err != nil { 570 
apierrors.WriteHTTPInternalServerError(w, "could not marshal live users", err) 571 return 572 } 573 w.Write(bs) 574 } 575} 576 577func (a *StreamplaceAPI) HandleViewCount(ctx context.Context) httprouter.Handle { 578 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 579 user := params.ByName("user") 580 if user == "" { 581 apierrors.WriteHTTPBadRequest(w, "user required", nil) 582 return 583 } 584 user, err := a.NormalizeUser(ctx, user) 585 if err != nil { 586 apierrors.WriteHTTPNotFound(w, "user not found", err) 587 return 588 } 589 count := spmetrics.GetViewCount(user) 590 bs, err := json.Marshal(streamplace.Livestream_ViewerCount{Count: int64(count), LexiconTypeID: "place.stream.livestream#viewerCount"}) 591 if err != nil { 592 apierrors.WriteHTTPInternalServerError(w, "could not marshal view count", err) 593 return 594 } 595 w.Write(bs) 596 } 597} 598 599func (a *StreamplaceAPI) HandleBlueskyResolve(ctx context.Context) httprouter.Handle { 600 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 601 log.Log(ctx, "got bluesky notification", "params", params) 602 key, err := a.ATSync.SyncBlueskyRepo(ctx, params.ByName("handle"), a.Model) 603 if err != nil { 604 apierrors.WriteHTTPInternalServerError(w, "could not resolve streamplace key", err) 605 return 606 } 607 signingKeys, err := a.Model.GetSigningKeysForRepo(key.DID) 608 if err != nil { 609 apierrors.WriteHTTPInternalServerError(w, "could not get signing keys", err) 610 return 611 } 612 bs, err := json.Marshal(signingKeys) 613 if err != nil { 614 apierrors.WriteHTTPInternalServerError(w, "could not marshal signing keys", err) 615 return 616 } 617 w.Write(bs) 618 } 619} 620 621type ChatResponse struct { 622 Post *bsky.FeedPost `json:"post"` 623 Repo *model.Repo `json:"repo"` 624 CID string `json:"cid"` 625} 626 627func (a *StreamplaceAPI) HandleChat(ctx context.Context) httprouter.Handle { 628 return func(w http.ResponseWriter, req *http.Request, 
params httprouter.Params) { 629 user := params.ByName("repoDID") 630 if user == "" { 631 apierrors.WriteHTTPBadRequest(w, "user required", nil) 632 return 633 } 634 repoDID, err := a.NormalizeUser(ctx, user) 635 if err != nil { 636 apierrors.WriteHTTPNotFound(w, "user not found", err) 637 return 638 } 639 replies, err := a.Model.GetReplies(repoDID) 640 if err != nil { 641 apierrors.WriteHTTPInternalServerError(w, "could not get replies", err) 642 return 643 } 644 bs, err := json.Marshal(replies) 645 if err != nil { 646 apierrors.WriteHTTPInternalServerError(w, "could not marshal replies", err) 647 return 648 } 649 w.Header().Set("Content-Type", "application/json") 650 w.Write(bs) 651 } 652} 653 654func (a *StreamplaceAPI) HandleLivestream(ctx context.Context) httprouter.Handle { 655 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 656 user := params.ByName("repoDID") 657 if user == "" { 658 apierrors.WriteHTTPBadRequest(w, "user required", nil) 659 return 660 } 661 repoDID, err := a.NormalizeUser(ctx, user) 662 if err != nil { 663 apierrors.WriteHTTPNotFound(w, "user not found", err) 664 return 665 } 666 livestream, err := a.Model.GetLatestLivestreamForRepo(repoDID) 667 if err != nil { 668 apierrors.WriteHTTPInternalServerError(w, "could not get livestream", err) 669 return 670 } 671 672 doc, err := livestream.ToLivestreamView() 673 if err != nil { 674 apierrors.WriteHTTPInternalServerError(w, "could not marshal livestream", err) 675 return 676 } 677 678 if livestream == nil { 679 apierrors.WriteHTTPNotFound(w, "no livestream found", nil) 680 return 681 } 682 683 bs, err := json.Marshal(doc) 684 if err != nil { 685 apierrors.WriteHTTPInternalServerError(w, "could not marshal livestream", err) 686 return 687 } 688 w.Header().Set("Content-Type", "application/json") 689 w.Write(bs) 690 } 691} 692 693func (a *StreamplaceAPI) RateLimitMiddleware(ctx context.Context) func(http.Handler) http.Handler { 694 return func(next http.Handler) 
http.Handler { 695 return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { 696 ip, _, err := net.SplitHostPort(req.RemoteAddr) 697 if err != nil { 698 ip = req.RemoteAddr 699 } 700 701 if a.CLI.RateLimitPerSecond > 0 { 702 limiter := a.getLimiter(ip) 703 704 if !limiter.Allow() { 705 log.Warn(ctx, "rate limit exceeded", "ip", ip, "path", req.URL.Path) 706 apierrors.WriteHTTPTooManyRequests(w, "rate limit exceeded") 707 return 708 } 709 } 710 711 next.ServeHTTP(w, req) 712 }) 713 } 714} 715 716func (a *StreamplaceAPI) ServeHTTP(ctx context.Context) error { 717 handler, err := a.Handler(ctx) 718 if err != nil { 719 return err 720 } 721 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error { 722 s.Addr = a.CLI.HttpAddr 723 log.Log(ctx, "http server starting", "addr", s.Addr) 724 return s.ListenAndServe() 725 }) 726} 727 728func (a *StreamplaceAPI) ServeHTTPRedirect(ctx context.Context) error { 729 handler, err := a.RedirectHandler(ctx) 730 if err != nil { 731 return err 732 } 733 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error { 734 s.Addr = a.CLI.HttpAddr 735 log.Log(ctx, "http tls redirecct server starting", "addr", s.Addr) 736 return s.ListenAndServe() 737 }) 738} 739 740func (a *StreamplaceAPI) ServeHTTPS(ctx context.Context) error { 741 handler, err := a.Handler(ctx) 742 if err != nil { 743 return err 744 } 745 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error { 746 s.Addr = a.CLI.HttpsAddr 747 log.Log(ctx, "https server starting", 748 "addr", s.Addr, 749 "certPath", a.CLI.TLSCertPath, 750 "keyPath", a.CLI.TLSKeyPath, 751 ) 752 return s.ListenAndServeTLS(a.CLI.TLSCertPath, a.CLI.TLSKeyPath) 753 }) 754} 755 756func (a *StreamplaceAPI) ServerWithShutdown(ctx context.Context, handler http.Handler, serve func(*http.Server) error) error { 757 ctx, cancel := context.WithCancel(ctx) 758 handler = gziphandler.GzipHandler(handler) 759 server := http.Server{Handler: handler} 760 var serveErr error 761 
go func() { 762 serveErr = serve(&server) 763 cancel() 764 }() 765 <-ctx.Done() 766 if serveErr != nil { 767 return fmt.Errorf("error in http server: %w", serveErr) 768 } 769 770 ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second) 771 defer cancel() 772 return server.Shutdown(ctx) 773} 774 775func (a *StreamplaceAPI) HandleHealthz(ctx context.Context) http.HandlerFunc { 776 return func(w http.ResponseWriter, req *http.Request) { 777 w.WriteHeader(200) 778 } 779} 780 781func (a *StreamplaceAPI) getLimiter(ip string) *rate.Limiter { 782 a.limitersMu.Lock() 783 defer a.limitersMu.Unlock() 784 785 limiter, exists := a.limiters[ip] 786 if !exists { 787 limiter = rate.NewLimiter(rate.Limit(a.CLI.RateLimitPerSecond), a.CLI.RateLimitBurst) 788 a.limiters[ip] = limiter 789 } 790 791 return limiter 792} 793 794func NewWebsocketTracker(maxConns int) *WebsocketTracker { 795 return &WebsocketTracker{ 796 connections: make(map[string]int), 797 maxConnsPerIP: maxConns, 798 } 799} 800 801func (t *WebsocketTracker) AddConnection(ip string) bool { 802 t.mu.Lock() 803 defer t.mu.Unlock() 804 805 count := t.connections[ip] 806 807 if count >= t.maxConnsPerIP { 808 return false 809 } 810 811 t.connections[ip] = count + 1 812 return true 813} 814 815func (t *WebsocketTracker) RemoveConnection(ip string) { 816 t.mu.Lock() 817 defer t.mu.Unlock() 818 819 count := t.connections[ip] 820 if count > 0 { 821 t.connections[ip] = count - 1 822 } 823 824 if t.connections[ip] == 0 { 825 delete(t.connections, ip) 826 } 827}