Live video on the AT Protocol
at eli/sync-tangled 821 lines 25 kB view raw
1package api 2 3import ( 4 "context" 5 "encoding/json" 6 "errors" 7 "fmt" 8 "io" 9 "log/slog" 10 "net" 11 "net/http" 12 "net/http/httputil" 13 "net/url" 14 "os" 15 "slices" 16 "strings" 17 "sync" 18 "time" 19 20 "github.com/NYTimes/gziphandler" 21 "github.com/bluesky-social/indigo/api/bsky" 22 "github.com/julienschmidt/httprouter" 23 "github.com/rs/cors" 24 sloghttp "github.com/samber/slog-http" 25 "golang.org/x/time/rate" 26 27 "stream.place/streamplace/js/app" 28 "stream.place/streamplace/pkg/atproto" 29 "stream.place/streamplace/pkg/bus" 30 "stream.place/streamplace/pkg/config" 31 "stream.place/streamplace/pkg/crypto/signers/eip712" 32 "stream.place/streamplace/pkg/director" 33 apierrors "stream.place/streamplace/pkg/errors" 34 "stream.place/streamplace/pkg/linking" 35 "stream.place/streamplace/pkg/log" 36 "stream.place/streamplace/pkg/media" 37 "stream.place/streamplace/pkg/mist/mistconfig" 38 "stream.place/streamplace/pkg/model" 39 "stream.place/streamplace/pkg/notifications" 40 "stream.place/streamplace/pkg/spmetrics" 41 "stream.place/streamplace/pkg/spxrpc" 42 "stream.place/streamplace/pkg/streamplace" 43) 44 45type StreamplaceAPI struct { 46 CLI *config.CLI 47 Model model.Model 48 Updater *Updater 49 Signer *eip712.EIP712Signer 50 Mimes map[string]string 51 FirebaseNotifier notifications.FirebaseNotifier 52 MediaManager *media.MediaManager 53 MediaSigner media.MediaSigner 54 // not thread-safe yet 55 Aliases map[string]string 56 Bus *bus.Bus 57 ATSync *atproto.ATProtoSynchronizer 58 Director *director.Director 59 60 connTracker *WebsocketTracker 61 62 limiters map[string]*rate.Limiter 63 limitersMu sync.Mutex 64} 65 66type WebsocketTracker struct { 67 connections map[string]int 68 maxConnsPerIP int 69 mu sync.RWMutex 70} 71 72func MakeStreamplaceAPI(cli *config.CLI, mod model.Model, signer *eip712.EIP712Signer, noter notifications.FirebaseNotifier, mm *media.MediaManager, ms media.MediaSigner, bus *bus.Bus, atsync *atproto.ATProtoSynchronizer, d *director.Director) (*StreamplaceAPI, error) { 73 updater, err := PrepareUpdater(cli) 74 if err != nil { 75 return nil, err 76 } 77 a := &StreamplaceAPI{CLI: cli, 78 Model: mod, 79 Updater: updater, 80 Signer: signer, 81 FirebaseNotifier: noter, 82 MediaManager: mm, 83 MediaSigner: ms, 84 Aliases: map[string]string{}, 85 Bus: bus, 86 ATSync: atsync, 87 Director: d, 88 connTracker: NewWebsocketTracker(5), 89 limiters: make(map[string]*rate.Limiter), 90 } 91 a.Mimes, err = updater.GetMimes() 92 if err != nil { 93 return nil, err 94 } 95 return a, nil 96} 97 98type AppHostingFS struct { 99 http.FileSystem 100} 101 102var ErrorIndex = errors.New("not found, use index.html") 103 104func (fs AppHostingFS) Open(name string) (http.File, error) { 105 file, err1 := fs.FileSystem.Open(name) 106 if err1 == nil { 107 return file, nil 108 } 109 if !errors.Is(err1, os.ErrNotExist) { 110 return nil, err1 111 } 112 113 return nil, ErrorIndex 114} 115 116// api/playback/iame.li/webrtc?rendition=source 117// api/playback/iame.li/stream.mp4?rendition=source 118// api/playback/iame.li/stream.webm?rendition=source 119// api/playback/iame.li/hls/index.m3u8 120// api/playback/iame.li/hls/source/stream.m3u8 121// api/playback/iame.li/hls/source/000000000000.ts 122 123func (a *StreamplaceAPI) Handler(ctx context.Context) (http.Handler, error) { 124 xrpc, err := spxrpc.NewServer(a.CLI, a.Model) 125 if err != nil { 126 return nil, err 127 } 128 router := httprouter.New() 129 apiRouter := httprouter.New() 130 apiRouter.HandlerFunc("POST", "/api/notification", a.HandleNotification(ctx)) 131 // old clients 132 router.HandlerFunc("GET", "/app-updates", a.HandleAppUpdates(ctx)) 133 // new ones 134 apiRouter.HandlerFunc("GET", "/api/manifest", a.HandleAppUpdates(ctx)) 135 apiRouter.GET("/api/desktop-updates/:platform/:architecture/:version/:buildTime/:file", a.HandleDesktopUpdates(ctx)) 136 apiRouter.POST("/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s")) 137 apiRouter.OPTIONS("/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s")) 138 apiRouter.DELETE("/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s")) 139 apiRouter.Handler("POST", "/api/segment", a.HandleSegment(ctx)) 140 apiRouter.HandlerFunc("GET", "/api/healthz", a.HandleHealthz(ctx)) 141 apiRouter.GET("/api/playback/:user/hls/*file", a.HandleHLSPlayback(ctx)) 142 apiRouter.GET("/api/playback/:user/stream.mp4", a.HandleMP4Playback(ctx)) 143 apiRouter.GET("/api/playback/:user/stream.webm", a.HandleMKVPlayback(ctx)) 144 // they're, uh, not jpegs. but we used this once and i don't wanna break backwards compatibility 145 apiRouter.GET("/api/playback/:user/stream.jpg", a.HandleThumbnailPlayback(ctx)) 146 // this one is not a lie 147 apiRouter.GET("/api/playback/:user/stream.png", a.HandleThumbnailPlayback(ctx)) 148 apiRouter.GET("/api/app-return/*anything", a.HandleAppReturn(ctx)) 149 apiRouter.POST("/api/playback/:user/webrtc", a.HandleWebRTCPlayback(ctx)) 150 apiRouter.POST("/api/ingest/webrtc", a.HandleWebRTCIngest(ctx)) 151 apiRouter.POST("/api/ingest/webrtc/:key", a.HandleWebRTCIngest(ctx)) 152 apiRouter.POST("/api/player-event", a.HandlePlayerEvent(ctx)) 153 apiRouter.GET("/api/chat/:repoDID", a.HandleChat(ctx)) 154 apiRouter.GET("/api/websocket/:repoDID", a.HandleWebsocket(ctx)) 155 apiRouter.GET("/api/livestream/:repoDID", a.HandleLivestream(ctx)) 156 apiRouter.GET("/api/segment/recent", a.HandleRecentSegments(ctx)) 157 apiRouter.GET("/api/segment/recent/:repoDID", a.HandleUserRecentSegments(ctx)) 158 apiRouter.GET("/api/bluesky/resolve/:handle", a.HandleBlueskyResolve(ctx)) 159 for _, platform := range atproto.AllowedPlatforms { 160 apiRouter.GET(fmt.Sprintf("/api/atproto-oauth/%s", platform), a.HandleATProtoOAuth(ctx, platform)) 161 } 162 apiRouter.GET("/api/live-users", a.HandleLiveUsers(ctx)) 163 apiRouter.GET("/api/view-count/:user", a.HandleViewCount(ctx)) 164 apiRouter.NotFound = a.HandleAPI404(ctx) 165 router.Handler("GET", "/api/*resource", apiRouter) 166 router.Handler("POST", "/api/*resource", apiRouter) 167 router.Handler("PUT", "/api/*resource", apiRouter) 168 router.Handler("PATCH", "/api/*resource", apiRouter) 169 router.Handler("DELETE", "/api/*resource", apiRouter) 170 router.Handler("GET", "/xrpc/*resource", xrpc) 171 router.Handler("POST", "/xrpc/*resource", xrpc) 172 router.Handler("PUT", "/xrpc/*resource", xrpc) 173 router.Handler("PATCH", "/xrpc/*resource", xrpc) 174 router.Handler("DELETE", "/xrpc/*resource", xrpc) 175 router.GET("/.well-known/did.json", a.HandleDidJson(ctx)) 176 router.GET("/dl/*params", a.HandleAppDownload(ctx)) 177 router.POST("/", a.HandleWebRTCIngest(ctx)) 178 for _, redirect := range a.CLI.Redirects { 179 parts := strings.Split(redirect, ":") 180 if len(parts) != 2 { 181 log.Error(ctx, "invalid redirect", "redirect", redirect) 182 return nil, fmt.Errorf("invalid redirect: %s", redirect) 183 } 184 router.Handle("GET", parts[0], func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) { 185 http.Redirect(w, r, parts[1], http.StatusTemporaryRedirect) 186 }) 187 } 188 if a.CLI.FrontendProxy != "" { 189 u, err := url.Parse(a.CLI.FrontendProxy) 190 if err != nil { 191 return nil, err 192 } 193 log.Warn(ctx, "using frontend proxy instead of bundled frontend", "destination", a.CLI.FrontendProxy) 194 router.NotFound = &httputil.ReverseProxy{ 195 Rewrite: func(r *httputil.ProxyRequest) { 196 // workaround for Expo disliking serving requests from 127.0.0.1 instead of localhost 197 // we need to use 127.0.0.1 because the atproto oauth client requires it 198 r.Out.Header.Set("Origin", u.String()) 199 r.SetURL(u) 200 }, 201 } 202 } else { 203 files, err := app.Files() 204 if err != nil { 205 return nil, err 206 } 207 index, err := files.Open("index.html") 208 if err != nil { 209 return nil, err 210 } 211 bs, err := io.ReadAll(index) 212 if err != nil { 213 return nil, err 214 } 215 linker, err := linking.NewLinker(ctx, bs) 216 if err != nil { 217 return nil, err 218 } 219 linkingHandler, err := a.NotFoundLinkingHandler(ctx, linker) 220 if err != nil { 221 return nil, err 222 } 223 router.NotFound = linkingHandler 224 } 225 // needed because the WebRTC handler issues 405s from / otherwise 226 router.GET("/", func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) { 227 router.NotFound.ServeHTTP(w, r) 228 }) 229 handler := sloghttp.Recovery(router) 230 handler = cors.AllowAll().Handler(handler) 231 handler = sloghttp.New(slog.Default())(handler) 232 handler = a.RateLimitMiddleware(ctx)(handler) 233 234 return handler, nil 235} 236 237func copyHeader(dst, src http.Header) { 238 for k, vv := range src { 239 // we'll handle CORS ourselves, thanks 240 if strings.HasPrefix(k, "Access-Control") { 241 continue 242 } 243 for _, v := range vv { 244 dst.Add(k, v) 245 } 246 } 247} 248 249// handler that takes care of static files and otherwise returns the index.html with the correct link card data 250func (a *StreamplaceAPI) NotFoundLinkingHandler(ctx context.Context, linker *linking.Linker) (http.HandlerFunc, error) { 251 files, err := app.Files() 252 if err != nil { 253 return nil, err 254 } 255 fs := AppHostingFS{http.FS(files)} 256 257 fileHandler := a.FileHandler(ctx, http.FileServer(fs)) 258 defaultHandler := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { 259 f := strings.TrimPrefix(req.URL.Path, "/") 260 // under docs we need the index.html suffix due to astro rendering 261 if strings.HasPrefix(req.URL.Path, "/docs") && strings.HasSuffix(req.URL.Path, "/") { 262 f += "index.html" 263 } 264 _, err := fs.Open(f) 265 if err == nil { 266 fileHandler.ServeHTTP(w, req) 267 return 268 } 269 if errors.Is(err, ErrorIndex) || f == "" { 270 bs, err := linker.GenerateDefaultCard(ctx, req.URL) 271 if err != nil { 272 log.Error(ctx, "error generating default card", "error", err) 273 } 274 w.Header().Set("Content-Type", "text/html") 275 w.Write(bs) 276 } else { 277 log.Warn(ctx, "error opening file", "error", err) 278 apierrors.WriteHTTPInternalServerError(w, "file not found", err) 279 } 280 }) 281 return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { 282 proto := "http" 283 if req.TLS != nil { 284 proto = "https" 285 } 286 fwProto := req.Header.Get("x-forwarded-proto") 287 if fwProto != "" { 288 proto = fwProto 289 } 290 req.URL.Host = req.Host 291 req.URL.Scheme = proto 292 maybeHandle := strings.TrimPrefix(req.URL.Path, "/") 293 repo, err := a.Model.GetRepoByHandleOrDID(maybeHandle) 294 if err != nil || repo == nil { 295 log.Error(ctx, "no repo found", "maybeHandle", maybeHandle) 296 defaultHandler.ServeHTTP(w, req) 297 return 298 } 299 ls, err := a.Model.GetLatestLivestreamForRepo(repo.DID) 300 if err != nil || ls == nil { 301 log.Error(ctx, "no livestream found", "repoDID", repo.DID) 302 defaultHandler.ServeHTTP(w, req) 303 return 304 } 305 lsv, err := ls.ToLivestreamView() 306 if err != nil || lsv == nil { 307 log.Error(ctx, "no livestream view found", "repoDID", repo.DID) 308 defaultHandler.ServeHTTP(w, req) 309 return 310 } 311 bs, err := linker.GenerateStreamerCard(ctx, req.URL, lsv) 312 if err != nil { 313 log.Error(ctx, "error generating html", "error", err) 314 defaultHandler.ServeHTTP(w, req) 315 return 316 } 317 w.Header().Set("Content-Type", "text/html") 318 w.Write(bs) 319 }), nil 320} 321 322func (a *StreamplaceAPI) MistProxyHandler(ctx context.Context, tmpl string) httprouter.Handle { 323 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 324 if !a.CLI.HasMist() { 325 apierrors.WriteHTTPNotImplemented(w, "Playback only on the Linux version for now", nil) 326 return 327 } 328 stream := params.ByName("stream") 329 if stream == "" { 330 apierrors.WriteHTTPBadRequest(w, "missing stream in request", nil) 331 return 332 } 333 334 fullstream := fmt.Sprintf("%s+%s", mistconfig.STREAM_NAME, stream) 335 prefix := fmt.Sprintf(tmpl, fullstream) 336 resource := params.ByName("resource") 337 338 // path := strings.TrimPrefix(req.URL.EscapedPath(), "/api") 339 340 client := &http.Client{} 341 req.URL = &url.URL{ 342 Scheme: "http", 343 Host: fmt.Sprintf("127.0.0.1:%d", a.CLI.MistHTTPPort), 344 Path: fmt.Sprintf("%s%s", prefix, resource), 345 RawQuery: req.URL.RawQuery, 346 } 347 348 //http: Request.RequestURI can't be set in client requests. 349 //http://golang.org/src/pkg/net/http/client.go 350 req.RequestURI = "" 351 352 resp, err := client.Do(req) 353 if err != nil { 354 apierrors.WriteHTTPInternalServerError(w, "error connecting to mist", err) 355 return 356 } 357 defer resp.Body.Close() 358 359 copyHeader(w.Header(), resp.Header) 360 w.WriteHeader(resp.StatusCode) 361 io.Copy(w, resp.Body) 362 } 363} 364 365func (a *StreamplaceAPI) FileHandler(ctx context.Context, fs http.Handler) http.HandlerFunc { 366 return func(w http.ResponseWriter, req *http.Request) { 367 noslash := req.URL.Path[1:] 368 ct, ok := a.Mimes[noslash] 369 if ok { 370 w.Header().Set("content-type", ct) 371 } 372 fs.ServeHTTP(w, req) 373 } 374} 375 376func (a *StreamplaceAPI) RedirectHandler(ctx context.Context) (http.Handler, error) { 377 _, tlsPort, err := net.SplitHostPort(a.CLI.HttpsAddr) 378 if err != nil { 379 return nil, err 380 } 381 handleRedirect := func(w http.ResponseWriter, req *http.Request) { 382 host, _, err := net.SplitHostPort(req.Host) 383 if err != nil { 384 host = req.Host 385 } 386 u := req.URL 387 if tlsPort == "443" { 388 u.Host = host 389 } else { 390 u.Host = net.JoinHostPort(host, tlsPort) 391 } 392 u.Scheme = "https" 393 http.Redirect(w, req, u.String(), http.StatusTemporaryRedirect) 394 } 395 mux := http.NewServeMux() 396 mux.HandleFunc("/", handleRedirect) 397 return mux, nil 398} 399 400type NotificationPayload struct { 401 Token string `json:"token"` 402 RepoDID string `json:"repoDID"` 403} 404 405func (a *StreamplaceAPI) HandleAPI404(ctx context.Context) http.HandlerFunc { 406 return func(w http.ResponseWriter, req *http.Request) { 407 w.WriteHeader(404) 408 } 409} 410 411func (a *StreamplaceAPI) HandleNotification(ctx context.Context) http.HandlerFunc { 412 return func(w http.ResponseWriter, req *http.Request) { 413 payload, err := io.ReadAll(req.Body) 414 if err != nil { 415 log.Log(ctx, "error reading notification create", "error", err) 416 w.WriteHeader(400) 417 return 418 } 419 n := NotificationPayload{} 420 err = json.Unmarshal(payload, &n) 421 if err != nil { 422 log.Log(ctx, "error unmarshalling notification create", "error", err) 423 w.WriteHeader(400) 424 return 425 } 426 err = a.Model.CreateNotification(n.Token, n.RepoDID) 427 if err != nil { 428 log.Log(ctx, "error creating notification", "error", err) 429 w.WriteHeader(400) 430 return 431 } 432 log.Log(ctx, "successfully created notification", "token", n.Token) 433 w.WriteHeader(200) 434 if n.RepoDID != "" { 435 go func() { 436 _, err := a.ATSync.SyncBlueskyRepo(ctx, n.RepoDID, a.Model) 437 if err != nil { 438 log.Error(ctx, "error syncing bluesky repo after notification creation", "error", err) 439 } 440 }() 441 } 442 } 443} 444 445func (a *StreamplaceAPI) HandleSegment(ctx context.Context) http.HandlerFunc { 446 return func(w http.ResponseWriter, req *http.Request) { 447 err := a.MediaManager.ValidateMP4(ctx, req.Body) 448 if err != nil { 449 apierrors.WriteHTTPBadRequest(w, "could not ingest segment", err) 450 return 451 } 452 w.WriteHeader(200) 453 } 454} 455 456func (a *StreamplaceAPI) HandlePlayerEvent(ctx context.Context) httprouter.Handle { 457 return func(w http.ResponseWriter, req *http.Request, p httprouter.Params) { 458 var event model.PlayerEventAPI 459 if err := json.NewDecoder(req.Body).Decode(&event); err != nil { 460 apierrors.WriteHTTPBadRequest(w, "could not decode JSON body", err) 461 return 462 } 463 err := a.Model.CreatePlayerEvent(event) 464 if err != nil { 465 apierrors.WriteHTTPBadRequest(w, "could not create player event", err) 466 return 467 } 468 w.WriteHeader(201) 469 } 470} 471 472func (a *StreamplaceAPI) HandleRecentSegments(ctx context.Context) httprouter.Handle { 473 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 474 segs, err := a.Model.MostRecentSegments() 475 if err != nil { 476 apierrors.WriteHTTPInternalServerError(w, "could not get segments", err) 477 return 478 } 479 bs, err := json.Marshal(segs) 480 if err != nil { 481 apierrors.WriteHTTPInternalServerError(w, "could not marshal segments", err) 482 return 483 } 484 w.Header().Add("Content-Type", "application/json") 485 w.Write(bs) 486 } 487} 488 489func (a *StreamplaceAPI) HandleUserRecentSegments(ctx context.Context) httprouter.Handle { 490 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 491 user := params.ByName("repoDID") 492 if user == "" { 493 apierrors.WriteHTTPBadRequest(w, "user required", nil) 494 return 495 } 496 user, err := a.NormalizeUser(ctx, user) 497 if err != nil { 498 apierrors.WriteHTTPNotFound(w, "user not found", err) 499 return 500 } 501 seg, err := a.Model.LatestSegmentForUser(user) 502 if err != nil { 503 apierrors.WriteHTTPInternalServerError(w, "could not get segments", err) 504 return 505 } 506 streamplaceSeg, err := seg.ToStreamplaceSegment() 507 if err != nil { 508 apierrors.WriteHTTPInternalServerError(w, "could not convert segment to streamplace segment", err) 509 return 510 } 511 bs, err := json.Marshal(streamplaceSeg) 512 if err != nil { 513 apierrors.WriteHTTPInternalServerError(w, "could not marshal segments", err) 514 return 515 } 516 w.Header().Add("Content-Type", "application/json") 517 w.Write(bs) 518 } 519} 520 521type LiveUsersResponse struct { 522 model.Segment 523 Viewers int `json:"viewers"` 524} 525 526func (a *StreamplaceAPI) HandleLiveUsers(ctx context.Context) httprouter.Handle { 527 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 528 repos, err := a.Model.MostRecentSegments() 529 if err != nil { 530 apierrors.WriteHTTPInternalServerError(w, "could not get live users", err) 531 return 532 } 533 liveUsers := []LiveUsersResponse{} 534 for _, repo := range repos { 535 viewers := spmetrics.GetViewCount(repo.RepoDID) 536 liveUsers = append(liveUsers, LiveUsersResponse{ 537 Segment: repo, 538 Viewers: viewers, 539 }) 540 } 541 bs, err := json.Marshal(liveUsers) 542 if err != nil { 543 apierrors.WriteHTTPInternalServerError(w, "could not marshal live users", err) 544 return 545 } 546 w.Write(bs) 547 } 548} 549 550func (a *StreamplaceAPI) HandleViewCount(ctx context.Context) httprouter.Handle { 551 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 552 user := params.ByName("user") 553 if user == "" { 554 apierrors.WriteHTTPBadRequest(w, "user required", nil) 555 return 556 } 557 user, err := a.NormalizeUser(ctx, user) 558 if err != nil { 559 apierrors.WriteHTTPNotFound(w, "user not found", err) 560 return 561 } 562 count := spmetrics.GetViewCount(user) 563 bs, err := json.Marshal(streamplace.Livestream_ViewerCount{Count: int64(count), LexiconTypeID: "place.stream.livestream#viewerCount"}) 564 if err != nil { 565 apierrors.WriteHTTPInternalServerError(w, "could not marshal view count", err) 566 return 567 } 568 w.Write(bs) 569 } 570} 571 572func (a *StreamplaceAPI) HandleBlueskyResolve(ctx context.Context) httprouter.Handle { 573 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 574 log.Log(ctx, "got bluesky notification", "params", params) 575 key, err := a.ATSync.SyncBlueskyRepo(ctx, params.ByName("handle"), a.Model) 576 if err != nil { 577 apierrors.WriteHTTPInternalServerError(w, "could not resolve streamplace key", err) 578 return 579 } 580 signingKeys, err := a.Model.GetSigningKeysForRepo(key.DID) 581 if err != nil { 582 apierrors.WriteHTTPInternalServerError(w, "could not get signing keys", err) 583 return 584 } 585 bs, err := json.Marshal(signingKeys) 586 if err != nil { 587 apierrors.WriteHTTPInternalServerError(w, "could not marshal signing keys", err) 588 return 589 } 590 w.Write(bs) 591 } 592} 593 594func (a *StreamplaceAPI) HandleATProtoOAuth(ctx context.Context, platform string) httprouter.Handle { 595 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 596 host, _, err := net.SplitHostPort(req.Host) 597 if err != nil { 598 host = req.Host 599 } 600 if !slices.Contains(atproto.AllowedPlatforms, platform) { 601 apierrors.WriteHTTPBadRequest(w, "unsupported platform", nil) 602 return 603 } 604 605 meta := atproto.GetMetadata(host, platform, a.CLI.AppBundleID) 606 bs, err := json.Marshal(meta) 607 if err != nil { 608 apierrors.WriteHTTPInternalServerError(w, "could not marshal metadata", err) 609 return 610 } 611 w.Header().Set("Content-Type", "application/json") 612 w.Write(bs) 613 } 614} 615 616type ChatResponse struct { 617 Post *bsky.FeedPost `json:"post"` 618 Repo *model.Repo `json:"repo"` 619 CID string `json:"cid"` 620} 621 622func (a *StreamplaceAPI) HandleChat(ctx context.Context) httprouter.Handle { 623 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 624 user := params.ByName("repoDID") 625 if user == "" { 626 apierrors.WriteHTTPBadRequest(w, "user required", nil) 627 return 628 } 629 repoDID, err := a.NormalizeUser(ctx, user) 630 if err != nil { 631 apierrors.WriteHTTPNotFound(w, "user not found", err) 632 return 633 } 634 replies, err := a.Model.GetReplies(repoDID) 635 if err != nil { 636 apierrors.WriteHTTPInternalServerError(w, "could not get replies", err) 637 return 638 } 639 bs, err := json.Marshal(replies) 640 if err != nil { 641 apierrors.WriteHTTPInternalServerError(w, "could not marshal replies", err) 642 return 643 } 644 w.Header().Set("Content-Type", "application/json") 645 w.Write(bs) 646 } 647} 648 649func (a *StreamplaceAPI) HandleLivestream(ctx context.Context) httprouter.Handle { 650 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 651 user := params.ByName("repoDID") 652 if user == "" { 653 apierrors.WriteHTTPBadRequest(w, "user required", nil) 654 return 655 } 656 repoDID, err := a.NormalizeUser(ctx, user) 657 if err != nil { 658 apierrors.WriteHTTPNotFound(w, "user not found", err) 659 return 660 } 661 livestream, err := a.Model.GetLatestLivestreamForRepo(repoDID) 662 if err != nil { 663 apierrors.WriteHTTPInternalServerError(w, "could not get livestream", err) 664 return 665 } 666 667 doc, err := livestream.ToLivestreamView() 668 if err != nil { 669 apierrors.WriteHTTPInternalServerError(w, "could not marshal livestream", err) 670 return 671 } 672 673 if livestream == nil { 674 apierrors.WriteHTTPNotFound(w, "no livestream found", nil) 675 return 676 } 677 678 bs, err := json.Marshal(doc) 679 if err != nil { 680 apierrors.WriteHTTPInternalServerError(w, "could not marshal livestream", err) 681 return 682 } 683 w.Header().Set("Content-Type", "application/json") 684 w.Write(bs) 685 } 686} 687 688func (a *StreamplaceAPI) RateLimitMiddleware(ctx context.Context) func(http.Handler) http.Handler { 689 return func(next http.Handler) http.Handler { 690 return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { 691 ip, _, err := net.SplitHostPort(req.RemoteAddr) 692 if err != nil { 693 ip = req.RemoteAddr 694 } 695 696 limiter := a.getLimiter(ip) 697 698 if !limiter.Allow() { 699 log.Warn(ctx, "rate limit exceeded", "ip", ip, "path", req.URL.Path) 700 apierrors.WriteHTTPTooManyRequests(w, "rate limit exceeded") 701 return 702 } 703 704 next.ServeHTTP(w, req) 705 }) 706 } 707} 708 709func (a *StreamplaceAPI) ServeHTTP(ctx context.Context) error { 710 handler, err := a.Handler(ctx) 711 if err != nil { 712 return err 713 } 714 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error { 715 s.Addr = a.CLI.HttpAddr 716 log.Log(ctx, "http server starting", "addr", s.Addr) 717 return s.ListenAndServe() 718 }) 719} 720 721func (a *StreamplaceAPI) ServeHTTPRedirect(ctx context.Context) error { 722 handler, err := a.RedirectHandler(ctx) 723 if err != nil { 724 return err 725 } 726 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error { 727 s.Addr = a.CLI.HttpAddr 728 log.Log(ctx, "http tls redirecct server starting", "addr", s.Addr) 729 return s.ListenAndServe() 730 }) 731} 732 733func (a *StreamplaceAPI) ServeHTTPS(ctx context.Context) error { 734 handler, err := a.Handler(ctx) 735 if err != nil { 736 return err 737 } 738 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error { 739 s.Addr = a.CLI.HttpsAddr 740 log.Log(ctx, "https server starting", 741 "addr", s.Addr, 742 "certPath", a.CLI.TLSCertPath, 743 "keyPath", a.CLI.TLSKeyPath, 744 ) 745 return s.ListenAndServeTLS(a.CLI.TLSCertPath, a.CLI.TLSKeyPath) 746 }) 747} 748 749func (a *StreamplaceAPI) ServerWithShutdown(ctx context.Context, handler http.Handler, serve func(*http.Server) error) error { 750 ctx, cancel := context.WithCancel(ctx) 751 handler = gziphandler.GzipHandler(handler) 752 server := http.Server{Handler: handler} 753 var serveErr error 754 go func() { 755 serveErr = serve(&server) 756 cancel() 757 }() 758 <-ctx.Done() 759 if serveErr != nil { 760 return fmt.Errorf("error in http server: %w", serveErr) 761 } 762 763 ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second) 764 defer cancel() 765 return server.Shutdown(ctx) 766} 767 768func (a *StreamplaceAPI) HandleHealthz(ctx context.Context) http.HandlerFunc { 769 return func(w http.ResponseWriter, req *http.Request) { 770 w.WriteHeader(200) 771 } 772} 773 774func (a *StreamplaceAPI) getLimiter(ip string) *rate.Limiter { 775 a.limitersMu.Lock() 776 defer a.limitersMu.Unlock() 777 778 limiter, exists := a.limiters[ip] 779 if !exists { 780 // 5 actions per second with a burst of 3 781 limiter = rate.NewLimiter(rate.Limit(10.0), 8) 782 a.limiters[ip] = limiter 783 } 784 785 return limiter 786} 787 788func NewWebsocketTracker(maxConns int) *WebsocketTracker { 789 return &WebsocketTracker{ 790 connections: make(map[string]int), 791 maxConnsPerIP: maxConns, 792 } 793} 794 795func (t *WebsocketTracker) AddConnection(ip string) bool { 796 t.mu.Lock() 797 defer t.mu.Unlock() 798 799 count := t.connections[ip] 800 801 if count >= t.maxConnsPerIP { 802 return false 803 } 804 805 t.connections[ip] = count + 1 806 return true 807} 808 809func (t *WebsocketTracker) RemoveConnection(ip string) { 810 t.mu.Lock() 811 defer t.mu.Unlock() 812 813 count := t.connections[ip] 814 if count > 0 { 815 t.connections[ip] = count - 1 816 } 817 818 if t.connections[ip] == 0 { 819 delete(t.connections, ip) 820 } 821}