Live video on the AT Protocol
at eli/server-restart-test 832 lines 27 kB view raw
1package api 2 3import ( 4 "context" 5 "encoding/json" 6 "errors" 7 "fmt" 8 "io" 9 "log/slog" 10 "net" 11 "net/http" 12 "net/http/httputil" 13 "net/url" 14 "strings" 15 "sync" 16 "time" 17 18 "github.com/NYTimes/gziphandler" 19 "github.com/bluesky-social/indigo/api/bsky" 20 "github.com/google/uuid" 21 "github.com/julienschmidt/httprouter" 22 "github.com/rs/cors" 23 sloghttp "github.com/samber/slog-http" 24 "golang.org/x/time/rate" 25 26 "github.com/streamplace/oatproxy/pkg/oatproxy" 27 "stream.place/streamplace/js/app" 28 "stream.place/streamplace/pkg/atproto" 29 "stream.place/streamplace/pkg/bus" 30 "stream.place/streamplace/pkg/config" 31 "stream.place/streamplace/pkg/crypto/signers/eip712" 32 "stream.place/streamplace/pkg/director" 33 apierrors "stream.place/streamplace/pkg/errors" 34 "stream.place/streamplace/pkg/linking" 35 "stream.place/streamplace/pkg/log" 36 "stream.place/streamplace/pkg/media" 37 "stream.place/streamplace/pkg/mist/mistconfig" 38 "stream.place/streamplace/pkg/model" 39 "stream.place/streamplace/pkg/notifications" 40 "stream.place/streamplace/pkg/spmetrics" 41 "stream.place/streamplace/pkg/spxrpc" 42 "stream.place/streamplace/pkg/streamplace" 43 44 metrics "github.com/slok/go-http-metrics/metrics/prometheus" 45 "github.com/slok/go-http-metrics/middleware" 46 echomiddleware "github.com/slok/go-http-metrics/middleware/echo" 47 httproutermiddleware "github.com/slok/go-http-metrics/middleware/httprouter" 48 middlewarestd "github.com/slok/go-http-metrics/middleware/std" 49) 50 51type StreamplaceAPI struct { 52 CLI *config.CLI 53 Model model.Model 54 Updater *Updater 55 Signer *eip712.EIP712Signer 56 Mimes map[string]string 57 FirebaseNotifier notifications.FirebaseNotifier 58 MediaManager *media.MediaManager 59 MediaSigner media.MediaSigner 60 // not thread-safe yet 61 Aliases map[string]string 62 Bus *bus.Bus 63 ATSync *atproto.ATProtoSynchronizer 64 Director *director.Director 65 66 connTracker *WebsocketTracker 67 68 limiters map[string]*rate.Limiter 69 limitersMu sync.Mutex 70 SignerCache map[string]media.MediaSigner 71 SignerCacheMu sync.Mutex 72 op *oatproxy.OATProxy 73} 74 75type WebsocketTracker struct { 76 connections map[string]int 77 maxConnsPerIP int 78 mu sync.RWMutex 79} 80 81func MakeStreamplaceAPI(cli *config.CLI, mod model.Model, signer *eip712.EIP712Signer, noter notifications.FirebaseNotifier, mm *media.MediaManager, ms media.MediaSigner, bus *bus.Bus, atsync *atproto.ATProtoSynchronizer, d *director.Director, op *oatproxy.OATProxy) (*StreamplaceAPI, error) { 82 updater, err := PrepareUpdater(cli) 83 if err != nil { 84 return nil, err 85 } 86 a := &StreamplaceAPI{CLI: cli, 87 Model: mod, 88 Updater: updater, 89 Signer: signer, 90 FirebaseNotifier: noter, 91 MediaManager: mm, 92 MediaSigner: ms, 93 Aliases: map[string]string{}, 94 Bus: bus, 95 ATSync: atsync, 96 Director: d, 97 connTracker: NewWebsocketTracker(cli.RateLimitWebsocket), 98 limiters: make(map[string]*rate.Limiter), 99 SignerCache: make(map[string]media.MediaSigner), 100 op: op, 101 } 102 a.Mimes, err = updater.GetMimes() 103 if err != nil { 104 return nil, err 105 } 106 return a, nil 107} 108 109type AppHostingFS struct { 110 http.FileSystem 111} 112 113var ErrorIndex = errors.New("not found, use index.html") 114 115func (fs AppHostingFS) Open(name string) (http.File, error) { 116 file, err1 := fs.FileSystem.Open(name) 117 if err1 == nil { 118 return file, nil 119 } 120 return nil, ErrorIndex 121} 122 123// api/playback/iame.li/webrtc?rendition=source 124// api/playback/iame.li/stream.mp4?rendition=source 125// api/playback/iame.li/stream.webm?rendition=source 126// api/playback/iame.li/hls/index.m3u8 127// api/playback/iame.li/hls/source/stream.m3u8 128// api/playback/iame.li/hls/source/000000000000.ts 129 130func (a *StreamplaceAPI) Handler(ctx context.Context) (http.Handler, error) { 131 132 mdlw := middleware.New(middleware.Config{ 133 Recorder: metrics.NewRecorder(metrics.Config{}), 134 }) 135 var xrpc http.Handler 136 xrpc, err := spxrpc.NewServer(ctx, a.CLI, a.Model, a.op, mdlw) 137 if err != nil { 138 return nil, err 139 } 140 router := httprouter.New() 141 142 // Create our middleware factory with the default settings. 143 144 a.op.Echo.Use(echomiddleware.Handler("", mdlw)) 145 146 // r.GET("/test/:id", httproutermiddleware.Handler("/test/:id", h1, mdlw)) 147 148 addHandle := func(router *httprouter.Router, method, path string, handler httprouter.Handle) { 149 router.Handle(method, path, httproutermiddleware.Handler(path, handler, mdlw)) 150 } 151 addFunc := func(router *httprouter.Router, method, path string, handler http.HandlerFunc) { 152 router.Handler(method, path, middlewarestd.Handler(path, mdlw, handler)) 153 } 154 155 router.Handler("GET", "/oauth/*anything", a.op.Handler()) 156 router.Handler("POST", "/oauth/*anything", a.op.Handler()) 157 router.Handler("GET", "/.well-known/oauth-authorization-server", a.op.Handler()) 158 router.Handler("GET", "/.well-known/oauth-protected-resource", a.op.Handler()) 159 router.Handler("GET", "/.well-known/apple-app-site-association", a.HandleAppleAppSiteAssociation(ctx)) 160 router.Handler("GET", "/.well-known/assetlinks.json", a.HandleAndroidAssetLinks(ctx)) 161 apiRouter := httprouter.New() 162 addFunc(apiRouter, "POST", "/api/notification", a.HandleNotification(ctx)) 163 // old clients 164 addFunc(router, "GET", "/app-updates", a.HandleAppUpdates(ctx)) 165 // new ones 166 addFunc(apiRouter, "GET", "/api/manifest", a.HandleAppUpdates(ctx)) 167 addHandle(apiRouter, "GET", "/api/desktop-updates/:platform/:architecture/:version/:buildTime/:file", a.HandleDesktopUpdates(ctx)) 168 addHandle(apiRouter, "POST", "/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s")) 169 addHandle(apiRouter, "OPTIONS", "/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s")) 170 addHandle(apiRouter, "DELETE", "/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s")) 171 addFunc(apiRouter, "POST", "/api/segment", a.HandleSegment(ctx)) 172 addFunc(apiRouter, "GET", "/api/healthz", a.HandleHealthz(ctx)) 173 addHandle(apiRouter, "GET", "/api/playback/:user/hls/*file", a.HandleHLSPlayback(ctx)) 174 addHandle(apiRouter, "GET", "/api/playback/:user/stream.mp4", a.HandleMP4Playback(ctx)) 175 addHandle(apiRouter, "GET", "/api/playback/:user/stream.webm", a.HandleMKVPlayback(ctx)) 176 // they're jpegs now 177 addHandle(apiRouter, "GET", "/api/playback/:user/stream.jpg", a.HandleThumbnailPlayback(ctx)) 178 // this one is actually a jpeg (used previously and shouldn't remove for historical reasons) 179 addHandle(apiRouter, "GET", "/api/playback/:user/stream.png", a.HandleThumbnailPlayback(ctx)) 180 addHandle(apiRouter, "GET", "/api/app-return/*anything", a.HandleAppReturn(ctx)) 181 addHandle(apiRouter, "POST", "/api/playback/:user/webrtc", a.HandleWebRTCPlayback(ctx)) 182 addHandle(apiRouter, "POST", "/api/ingest/webrtc", a.HandleWebRTCIngest(ctx)) 183 addHandle(apiRouter, "POST", "/api/ingest/webrtc/:key", a.HandleWebRTCIngest(ctx)) 184 addHandle(apiRouter, "POST", "/api/player-event", a.HandlePlayerEvent(ctx)) 185 addHandle(apiRouter, "GET", "/api/chat/:repoDID", a.HandleChat(ctx)) 186 addHandle(apiRouter, "GET", "/api/websocket/:repoDID", a.HandleWebsocket(ctx)) 187 addHandle(apiRouter, "GET", "/api/livestream/:repoDID", a.HandleLivestream(ctx)) 188 addHandle(apiRouter, "GET", "/api/segment/recent", a.HandleRecentSegments(ctx)) 189 addHandle(apiRouter, "GET", "/api/segment/recent/:repoDID", a.HandleUserRecentSegments(ctx)) 190 addHandle(apiRouter, "GET", "/api/bluesky/resolve/:handle", a.HandleBlueskyResolve(ctx)) 191 addHandle(apiRouter, "GET", "/api/view-count/:user", a.HandleViewCount(ctx)) 192 apiRouter.NotFound = a.HandleAPI404(ctx) 193 apiRouterHandler := a.RateLimitMiddleware(ctx)(apiRouter) 194 xrpcHandler := a.RateLimitMiddleware(ctx)(xrpc) 195 router.Handler("GET", "/api/*resource", apiRouterHandler) 196 router.Handler("POST", "/api/*resource", apiRouterHandler) 197 router.Handler("PUT", "/api/*resource", apiRouterHandler) 198 router.Handler("PATCH", "/api/*resource", apiRouterHandler) 199 router.Handler("DELETE", "/api/*resource", apiRouterHandler) 200 router.Handler("GET", "/xrpc/*resource", xrpcHandler) 201 router.Handler("POST", "/xrpc/*resource", xrpcHandler) 202 router.Handler("PUT", "/xrpc/*resource", xrpcHandler) 203 router.Handler("PATCH", "/xrpc/*resource", xrpcHandler) 204 router.Handler("DELETE", "/xrpc/*resource", xrpcHandler) 205 router.GET("/.well-known/did.json", a.HandleDidJSON(ctx)) 206 router.GET("/.well-known/atproto-did", a.HandleAtprotoDID(ctx)) 207 router.GET("/dl/*params", a.HandleAppDownload(ctx)) 208 router.POST("/", a.HandleWebRTCIngest(ctx)) 209 for _, redirect := range a.CLI.Redirects { 210 parts := strings.Split(redirect, ":") 211 if len(parts) != 2 { 212 log.Error(ctx, "invalid redirect", "redirect", redirect) 213 return nil, fmt.Errorf("invalid redirect: %s", redirect) 214 } 215 router.Handle("GET", parts[0], func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) { 216 http.Redirect(w, r, parts[1], http.StatusTemporaryRedirect) 217 }) 218 } 219 if a.CLI.FrontendProxy != "" { 220 u, err := url.Parse(a.CLI.FrontendProxy) 221 if err != nil { 222 return nil, err 223 } 224 log.Warn(ctx, "using frontend proxy instead of bundled frontend", "destination", a.CLI.FrontendProxy) 225 router.NotFound = &httputil.ReverseProxy{ 226 Rewrite: func(r *httputil.ProxyRequest) { 227 // workaround for Expo disliking serving requests from 127.0.0.1 instead of localhost 228 // we need to use 127.0.0.1 because the atproto oauth client requires it 229 r.Out.Header.Set("Origin", u.String()) 230 r.SetURL(u) 231 }, 232 } 233 } else { 234 files, err := app.Files() 235 if err != nil { 236 return nil, err 237 } 238 index, err := files.Open("index.html") 239 if err != nil { 240 return nil, err 241 } 242 bs, err := io.ReadAll(index) 243 if err != nil { 244 return nil, err 245 } 246 linker, err := linking.NewLinker(ctx, bs) 247 if err != nil { 248 return nil, err 249 } 250 linkingHandler, err := a.NotFoundLinkingHandler(ctx, linker) 251 if err != nil { 252 return nil, err 253 } 254 router.NotFound = middlewarestd.Handler("/*static", mdlw, linkingHandler) 255 } 256 // needed because the WebRTC handler issues 405s from / otherwise 257 router.GET("/", func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) { 258 router.NotFound.ServeHTTP(w, r) 259 }) 260 handler := sloghttp.Recovery(router) 261 handler = cors.AllowAll().Handler(handler) 262 handler = sloghttp.New(slog.Default())(handler) 263 handler = a.RateLimitMiddleware(ctx)(handler) 264 265 // this needs to be LAST so nothing else clobbers the context 266 handler = a.ContextMiddleware(ctx)(handler) 267 268 return handler, nil 269} 270func (a *StreamplaceAPI) ContextMiddleware(ctx context.Context) func(next http.Handler) http.Handler { 271 return func(next http.Handler) http.Handler { 272 return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 273 uuid := uuid.New().String() 274 ctx = log.WithLogValues(ctx, "requestID", uuid, "method", r.Method, "path", r.URL.Path) 275 r = r.WithContext(ctx) 276 next.ServeHTTP(w, r) 277 }) 278 } 279} 280func copyHeader(dst, src http.Header) { 281 for k, vv := range src { 282 // we'll handle CORS ourselves, thanks 283 if strings.HasPrefix(k, "Access-Control") { 284 continue 285 } 286 for _, v := range vv { 287 dst.Add(k, v) 288 } 289 } 290} 291 292// handler that takes care of static files and otherwise returns the index.html with the correct link card data 293func (a *StreamplaceAPI) NotFoundLinkingHandler(ctx context.Context, linker *linking.Linker) (http.HandlerFunc, error) { 294 files, err := app.Files() 295 if err != nil { 296 return nil, err 297 } 298 fs := AppHostingFS{http.FS(files)} 299 300 fileHandler := a.FileHandler(ctx, http.FileServer(fs)) 301 defaultHandler := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { 302 f := strings.TrimPrefix(req.URL.Path, "/") 303 // under docs we need the index.html suffix due to astro rendering 304 if strings.HasPrefix(req.URL.Path, "/docs") && strings.HasSuffix(req.URL.Path, "/") { 305 f += "index.html" 306 } 307 _, err := fs.Open(f) 308 if err == nil { 309 fileHandler.ServeHTTP(w, req) 310 return 311 } 312 if errors.Is(err, ErrorIndex) || f == "" { 313 bs, err := linker.GenerateDefaultCard(ctx, req.URL) 314 if err != nil { 315 log.Error(ctx, "error generating default card", "error", err) 316 } 317 w.Header().Set("Content-Type", "text/html") 318 if _, err := w.Write(bs); err != nil { 319 log.Error(ctx, "error writing response", "error", err) 320 } 321 } else { 322 log.Warn(ctx, "error opening file", "error", err) 323 apierrors.WriteHTTPInternalServerError(w, "file not found", err) 324 } 325 }) 326 return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { 327 proto := "http" 328 if req.TLS != nil { 329 proto = "https" 330 } 331 fwProto := req.Header.Get("x-forwarded-proto") 332 if fwProto != "" { 333 proto = fwProto 334 } 335 req.URL.Host = req.Host 336 req.URL.Scheme = proto 337 maybeHandle := strings.TrimPrefix(req.URL.Path, "/") 338 repo, err := a.Model.GetRepoByHandleOrDID(maybeHandle) 339 if err != nil || repo == nil { 340 log.Error(ctx, "no repo found", "maybeHandle", maybeHandle) 341 defaultHandler.ServeHTTP(w, req) 342 return 343 } 344 ls, err := a.Model.GetLatestLivestreamForRepo(repo.DID) 345 if err != nil || ls == nil { 346 log.Error(ctx, "no livestream found", "repoDID", repo.DID) 347 defaultHandler.ServeHTTP(w, req) 348 return 349 } 350 lsv, err := ls.ToLivestreamView() 351 if err != nil || lsv == nil { 352 log.Error(ctx, "no livestream view found", "repoDID", repo.DID) 353 defaultHandler.ServeHTTP(w, req) 354 return 355 } 356 bs, err := linker.GenerateStreamerCard(ctx, req.URL, lsv) 357 if err != nil { 358 log.Error(ctx, "error generating html", "error", err) 359 defaultHandler.ServeHTTP(w, req) 360 return 361 } 362 w.Header().Set("Content-Type", "text/html") 363 if _, err := w.Write(bs); err != nil { 364 log.Error(ctx, "error writing response", "error", err) 365 } 366 }), nil 367} 368 369func (a *StreamplaceAPI) MistProxyHandler(ctx context.Context, tmpl string) httprouter.Handle { 370 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 371 if !a.CLI.HasMist() { 372 apierrors.WriteHTTPNotImplemented(w, "Playback only on the Linux version for now", nil) 373 return 374 } 375 stream := params.ByName("stream") 376 if stream == "" { 377 apierrors.WriteHTTPBadRequest(w, "missing stream in request", nil) 378 return 379 } 380 381 fullstream := fmt.Sprintf("%s+%s", mistconfig.StreamName, stream) 382 prefix := fmt.Sprintf(tmpl, fullstream) 383 resource := params.ByName("resource") 384 385 // path := strings.TrimPrefix(req.URL.EscapedPath(), "/api") 386 387 client := &http.Client{} 388 req.URL = &url.URL{ 389 Scheme: "http", 390 Host: fmt.Sprintf("127.0.0.1:%d", a.CLI.MistHTTPPort), 391 Path: fmt.Sprintf("%s%s", prefix, resource), 392 RawQuery: req.URL.RawQuery, 393 } 394 395 //http: Request.RequestURI can't be set in client requests. 396 //http://golang.org/src/pkg/net/http/client.go 397 req.RequestURI = "" 398 399 resp, err := client.Do(req) 400 if err != nil { 401 apierrors.WriteHTTPInternalServerError(w, "error connecting to mist", err) 402 return 403 } 404 defer resp.Body.Close() 405 406 copyHeader(w.Header(), resp.Header) 407 w.WriteHeader(resp.StatusCode) 408 if _, err := io.Copy(w, resp.Body); err != nil { 409 log.Error(ctx, "error writing response", "error", err) 410 } 411 } 412} 413 414func (a *StreamplaceAPI) FileHandler(ctx context.Context, fs http.Handler) http.HandlerFunc { 415 return func(w http.ResponseWriter, req *http.Request) { 416 noslash := req.URL.Path[1:] 417 ct, ok := a.Mimes[noslash] 418 if ok { 419 w.Header().Set("content-type", ct) 420 } 421 fs.ServeHTTP(w, req) 422 } 423} 424 425func (a *StreamplaceAPI) RedirectHandler(ctx context.Context) (http.Handler, error) { 426 _, tlsPort, err := net.SplitHostPort(a.CLI.HTTPSAddr) 427 if err != nil { 428 return nil, err 429 } 430 handleRedirect := func(w http.ResponseWriter, req *http.Request) { 431 host, _, err := net.SplitHostPort(req.Host) 432 if err != nil { 433 host = req.Host 434 } 435 u := req.URL 436 if tlsPort == "443" { 437 u.Host = host 438 } else { 439 u.Host = net.JoinHostPort(host, tlsPort) 440 } 441 u.Scheme = "https" 442 http.Redirect(w, req, u.String(), http.StatusTemporaryRedirect) 443 } 444 mux := http.NewServeMux() 445 mux.HandleFunc("/", handleRedirect) 446 return mux, nil 447} 448 449type NotificationPayload struct { 450 Token string `json:"token"` 451 RepoDID string `json:"repoDID"` 452} 453 454func (a *StreamplaceAPI) HandleAPI404(ctx context.Context) http.HandlerFunc { 455 return func(w http.ResponseWriter, req *http.Request) { 456 w.WriteHeader(404) 457 } 458} 459 460func (a *StreamplaceAPI) HandleNotification(ctx context.Context) http.HandlerFunc { 461 return func(w http.ResponseWriter, req *http.Request) { 462 payload, err := io.ReadAll(req.Body) 463 if err != nil { 464 log.Log(ctx, "error reading notification create", "error", err) 465 w.WriteHeader(400) 466 return 467 } 468 n := NotificationPayload{} 469 err = json.Unmarshal(payload, &n) 470 if err != nil { 471 log.Log(ctx, "error unmarshalling notification create", "error", err) 472 w.WriteHeader(400) 473 return 474 } 475 err = a.Model.CreateNotification(n.Token, n.RepoDID) 476 if err != nil { 477 log.Log(ctx, "error creating notification", "error", err) 478 w.WriteHeader(400) 479 return 480 } 481 log.Log(ctx, "successfully created notification", "token", n.Token) 482 w.WriteHeader(200) 483 if n.RepoDID != "" { 484 go func() { 485 _, err := a.ATSync.SyncBlueskyRepo(ctx, n.RepoDID, a.Model) 486 if err != nil { 487 log.Error(ctx, "error syncing bluesky repo after notification creation", "error", err) 488 } 489 }() 490 } 491 } 492} 493 494func (a *StreamplaceAPI) HandleSegment(ctx context.Context) http.HandlerFunc { 495 return func(w http.ResponseWriter, req *http.Request) { 496 err := a.MediaManager.ValidateMP4(ctx, req.Body) 497 if err != nil { 498 apierrors.WriteHTTPBadRequest(w, "could not ingest segment", err) 499 return 500 } 501 w.WriteHeader(200) 502 } 503} 504 505func (a *StreamplaceAPI) HandlePlayerEvent(ctx context.Context) httprouter.Handle { 506 return func(w http.ResponseWriter, req *http.Request, p httprouter.Params) { 507 var event model.PlayerEventAPI 508 if err := json.NewDecoder(req.Body).Decode(&event); err != nil { 509 apierrors.WriteHTTPBadRequest(w, "could not decode JSON body", err) 510 return 511 } 512 err := a.Model.CreatePlayerEvent(event) 513 if err != nil { 514 apierrors.WriteHTTPBadRequest(w, "could not create player event", err) 515 return 516 } 517 w.WriteHeader(201) 518 } 519} 520 521func (a *StreamplaceAPI) HandleRecentSegments(ctx context.Context) httprouter.Handle { 522 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 523 segs, err := a.Model.MostRecentSegments() 524 if err != nil { 525 apierrors.WriteHTTPInternalServerError(w, "could not get segments", err) 526 return 527 } 528 bs, err := json.Marshal(segs) 529 if err != nil { 530 apierrors.WriteHTTPInternalServerError(w, "could not marshal segments", err) 531 return 532 } 533 w.Header().Add("Content-Type", "application/json") 534 if _, err := w.Write(bs); err != nil { 535 log.Error(ctx, "error writing response", "error", err) 536 } 537 } 538} 539 540func (a *StreamplaceAPI) HandleUserRecentSegments(ctx context.Context) httprouter.Handle { 541 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 542 user := params.ByName("repoDID") 543 if user == "" { 544 apierrors.WriteHTTPBadRequest(w, "user required", nil) 545 return 546 } 547 user, err := a.NormalizeUser(ctx, user) 548 if err != nil { 549 apierrors.WriteHTTPNotFound(w, "user not found", err) 550 return 551 } 552 seg, err := a.Model.LatestSegmentForUser(user) 553 if err != nil { 554 apierrors.WriteHTTPInternalServerError(w, "could not get segments", err) 555 return 556 } 557 streamplaceSeg, err := seg.ToStreamplaceSegment() 558 if err != nil { 559 apierrors.WriteHTTPInternalServerError(w, "could not convert segment to streamplace segment", err) 560 return 561 } 562 bs, err := json.Marshal(streamplaceSeg) 563 if err != nil { 564 apierrors.WriteHTTPInternalServerError(w, "could not marshal segments", err) 565 return 566 } 567 w.Header().Add("Content-Type", "application/json") 568 if _, err := w.Write(bs); err != nil { 569 log.Error(ctx, "error writing response", "error", err) 570 } 571 } 572} 573 574func (a *StreamplaceAPI) HandleViewCount(ctx context.Context) httprouter.Handle { 575 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 576 user := params.ByName("user") 577 if user == "" { 578 apierrors.WriteHTTPBadRequest(w, "user required", nil) 579 return 580 } 581 user, err := a.NormalizeUser(ctx, user) 582 if err != nil { 583 apierrors.WriteHTTPNotFound(w, "user not found", err) 584 return 585 } 586 count := spmetrics.GetViewCount(user) 587 bs, err := json.Marshal(streamplace.Livestream_ViewerCount{Count: int64(count), LexiconTypeID: "place.stream.livestream#viewerCount"}) 588 if err != nil { 589 apierrors.WriteHTTPInternalServerError(w, "could not marshal view count", err) 590 return 591 } 592 if _, err := w.Write(bs); err != nil { 593 log.Error(ctx, "error writing response", "error", err) 594 } 595 } 596} 597 598func (a *StreamplaceAPI) HandleBlueskyResolve(ctx context.Context) httprouter.Handle { 599 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 600 log.Log(ctx, "got bluesky notification", "params", params) 601 key, err := a.ATSync.SyncBlueskyRepo(ctx, params.ByName("handle"), a.Model) 602 if err != nil { 603 apierrors.WriteHTTPInternalServerError(w, "could not resolve streamplace key", err) 604 return 605 } 606 signingKeys, err := a.Model.GetSigningKeysForRepo(key.DID) 607 if err != nil { 608 apierrors.WriteHTTPInternalServerError(w, "could not get signing keys", err) 609 return 610 } 611 bs, err := json.Marshal(signingKeys) 612 if err != nil { 613 apierrors.WriteHTTPInternalServerError(w, "could not marshal signing keys", err) 614 return 615 } 616 if _, err := w.Write(bs); err != nil { 617 log.Error(ctx, "error writing response", "error", err) 618 } 619 } 620} 621 622type ChatResponse struct { 623 Post *bsky.FeedPost `json:"post"` 624 Repo *model.Repo `json:"repo"` 625 CID string `json:"cid"` 626} 627 628func (a *StreamplaceAPI) HandleChat(ctx context.Context) httprouter.Handle { 629 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 630 user := params.ByName("repoDID") 631 if user == "" { 632 apierrors.WriteHTTPBadRequest(w, "user required", nil) 633 return 634 } 635 repoDID, err := a.NormalizeUser(ctx, user) 636 if err != nil { 637 apierrors.WriteHTTPNotFound(w, "user not found", err) 638 return 639 } 640 replies, err := a.Model.GetReplies(repoDID) 641 if err != nil { 642 apierrors.WriteHTTPInternalServerError(w, "could not get replies", err) 643 return 644 } 645 bs, err := json.Marshal(replies) 646 if err != nil { 647 apierrors.WriteHTTPInternalServerError(w, "could not marshal replies", err) 648 return 649 } 650 w.Header().Set("Content-Type", "application/json") 651 if _, err := w.Write(bs); err != nil { 652 log.Error(ctx, "error writing response", "error", err) 653 } 654 } 655} 656 657func (a *StreamplaceAPI) HandleLivestream(ctx context.Context) httprouter.Handle { 658 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 659 user := params.ByName("repoDID") 660 if user == "" { 661 apierrors.WriteHTTPBadRequest(w, "user required", nil) 662 return 663 } 664 repoDID, err := a.NormalizeUser(ctx, user) 665 if err != nil { 666 apierrors.WriteHTTPNotFound(w, "user not found", err) 667 return 668 } 669 livestream, err := a.Model.GetLatestLivestreamForRepo(repoDID) 670 if err != nil { 671 apierrors.WriteHTTPInternalServerError(w, "could not get livestream", err) 672 return 673 } 674 675 doc, err := livestream.ToLivestreamView() 676 if err != nil { 677 apierrors.WriteHTTPInternalServerError(w, "could not marshal livestream", err) 678 return 679 } 680 681 if livestream == nil { 682 apierrors.WriteHTTPNotFound(w, "no livestream found", nil) 683 return 684 } 685 686 bs, err := json.Marshal(doc) 687 if err != nil { 688 apierrors.WriteHTTPInternalServerError(w, "could not marshal livestream", err) 689 return 690 } 691 w.Header().Set("Content-Type", "application/json") 692 if _, err := w.Write(bs); err != nil { 693 log.Error(ctx, "error writing response", "error", err) 694 } 695 } 696} 697 698func (a *StreamplaceAPI) RateLimitMiddleware(ctx context.Context) func(http.Handler) http.Handler { 699 return func(next http.Handler) http.Handler { 700 return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { 701 ip, _, err := net.SplitHostPort(req.RemoteAddr) 702 if err != nil { 703 ip = req.RemoteAddr 704 } 705 706 if a.CLI.RateLimitPerSecond > 0 { 707 limiter := a.getLimiter(ip) 708 709 if !limiter.Allow() { 710 log.Warn(ctx, "rate limit exceeded", "ip", ip, "path", req.URL.Path) 711 apierrors.WriteHTTPTooManyRequests(w, "rate limit exceeded") 712 return 713 } 714 } 715 716 next.ServeHTTP(w, req) 717 }) 718 } 719} 720 721func (a *StreamplaceAPI) ServeHTTP(ctx context.Context) error { 722 handler, err := a.Handler(ctx) 723 if err != nil { 724 return err 725 } 726 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error { 727 s.Addr = a.CLI.HTTPAddr 728 log.Log(ctx, "http server starting", "addr", s.Addr) 729 return s.ListenAndServe() 730 }) 731} 732 733func (a *StreamplaceAPI) ServeHTTPRedirect(ctx context.Context) error { 734 handler, err := a.RedirectHandler(ctx) 735 if err != nil { 736 return err 737 } 738 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error { 739 s.Addr = a.CLI.HTTPAddr 740 log.Log(ctx, "http tls redirecct server starting", "addr", s.Addr) 741 return s.ListenAndServe() 742 }) 743} 744 745func (a *StreamplaceAPI) ServeHTTPS(ctx context.Context) error { 746 handler, err := a.Handler(ctx) 747 if err != nil { 748 return err 749 } 750 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error { 751 s.Addr = a.CLI.HTTPSAddr 752 log.Log(ctx, "https server starting", 753 "addr", s.Addr, 754 "certPath", a.CLI.TLSCertPath, 755 "keyPath", a.CLI.TLSKeyPath, 756 ) 757 return s.ListenAndServeTLS(a.CLI.TLSCertPath, a.CLI.TLSKeyPath) 758 }) 759} 760 761func (a *StreamplaceAPI) ServerWithShutdown(ctx context.Context, handler http.Handler, serve func(*http.Server) error) error { 762 ctx, cancel := context.WithCancel(ctx) 763 handler = gziphandler.GzipHandler(handler) 764 server := http.Server{Handler: handler} 765 var serveErr error 766 go func() { 767 serveErr = serve(&server) 768 cancel() 769 }() 770 <-ctx.Done() 771 if serveErr != nil { 772 return fmt.Errorf("error in http server: %w", serveErr) 773 } 774 775 ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second) 776 defer cancel() 777 return server.Shutdown(ctx) 778} 779 780func (a *StreamplaceAPI) HandleHealthz(ctx context.Context) http.HandlerFunc { 781 return func(w http.ResponseWriter, req *http.Request) { 782 w.WriteHeader(200) 783 } 784} 785 786func (a *StreamplaceAPI) getLimiter(ip string) *rate.Limiter { 787 a.limitersMu.Lock() 788 defer a.limitersMu.Unlock() 789 790 limiter, exists := a.limiters[ip] 791 if !exists { 792 limiter = rate.NewLimiter(rate.Limit(a.CLI.RateLimitPerSecond), a.CLI.RateLimitBurst) 793 a.limiters[ip] = limiter 794 } 795 796 return limiter 797} 798 799func NewWebsocketTracker(maxConns int) *WebsocketTracker { 800 return &WebsocketTracker{ 801 connections: make(map[string]int), 802 maxConnsPerIP: maxConns, 803 } 804} 805 806func (t *WebsocketTracker) AddConnection(ip string) bool { 807 t.mu.Lock() 808 defer t.mu.Unlock() 809 810 count := t.connections[ip] 811 812 if count >= t.maxConnsPerIP { 813 return false 814 } 815 816 t.connections[ip] = count + 1 817 return true 818} 819 820func (t *WebsocketTracker) RemoveConnection(ip string) { 821 t.mu.Lock() 822 defer t.mu.Unlock() 823 824 count := t.connections[ip] 825 if count > 0 { 826 t.connections[ip] = count - 1 827 } 828 829 if t.connections[ip] == 0 { 830 delete(t.connections, ip) 831 } 832}