Live video on the AT Protocol
1package api
2
3import (
4 "context"
5 "encoding/json"
6 "errors"
7 "fmt"
8 "io"
9 "log/slog"
10 "net"
11 "net/http"
12 "net/http/httputil"
13 "net/url"
14 "os"
15 "strings"
16 "sync"
17 "time"
18
19 "github.com/NYTimes/gziphandler"
20 "github.com/bluesky-social/indigo/api/bsky"
21 "github.com/julienschmidt/httprouter"
22 "github.com/rs/cors"
23 sloghttp "github.com/samber/slog-http"
24 "golang.org/x/time/rate"
25
26 "stream.place/streamplace/js/app"
27 "stream.place/streamplace/pkg/atproto"
28 "stream.place/streamplace/pkg/bus"
29 "stream.place/streamplace/pkg/config"
30 "stream.place/streamplace/pkg/crypto/signers/eip712"
31 "stream.place/streamplace/pkg/director"
32 apierrors "stream.place/streamplace/pkg/errors"
33 "stream.place/streamplace/pkg/linking"
34 "stream.place/streamplace/pkg/log"
35 "stream.place/streamplace/pkg/media"
36 "stream.place/streamplace/pkg/mist/mistconfig"
37 "stream.place/streamplace/pkg/model"
38 "stream.place/streamplace/pkg/notifications"
39 "stream.place/streamplace/pkg/oproxy"
40 "stream.place/streamplace/pkg/spmetrics"
41 "stream.place/streamplace/pkg/spxrpc"
42 "stream.place/streamplace/pkg/streamplace"
43)
44
45type StreamplaceAPI struct {
46 CLI *config.CLI
47 Model model.Model
48 Updater *Updater
49 Signer *eip712.EIP712Signer
50 Mimes map[string]string
51 FirebaseNotifier notifications.FirebaseNotifier
52 MediaManager *media.MediaManager
53 MediaSigner media.MediaSigner
54 // not thread-safe yet
55 Aliases map[string]string
56 Bus *bus.Bus
57 ATSync *atproto.ATProtoSynchronizer
58 Director *director.Director
59
60 connTracker *WebsocketTracker
61
62 limiters map[string]*rate.Limiter
63 limitersMu sync.Mutex
64 SignerCache map[string]media.MediaSigner
65 SignerCacheMu sync.Mutex
66 op *oproxy.OProxy
67}
68
69type WebsocketTracker struct {
70 connections map[string]int
71 maxConnsPerIP int
72 mu sync.RWMutex
73}
74
75func MakeStreamplaceAPI(cli *config.CLI, mod model.Model, signer *eip712.EIP712Signer, noter notifications.FirebaseNotifier, mm *media.MediaManager, ms media.MediaSigner, bus *bus.Bus, atsync *atproto.ATProtoSynchronizer, d *director.Director, op *oproxy.OProxy) (*StreamplaceAPI, error) {
76 updater, err := PrepareUpdater(cli)
77 if err != nil {
78 return nil, err
79 }
80 a := &StreamplaceAPI{CLI: cli,
81 Model: mod,
82 Updater: updater,
83 Signer: signer,
84 FirebaseNotifier: noter,
85 MediaManager: mm,
86 MediaSigner: ms,
87 Aliases: map[string]string{},
88 Bus: bus,
89 ATSync: atsync,
90 Director: d,
91 connTracker: NewWebsocketTracker(cli.RateLimitWebsocket),
92 limiters: make(map[string]*rate.Limiter),
93 SignerCache: make(map[string]media.MediaSigner),
94 op: op,
95 }
96 a.Mimes, err = updater.GetMimes()
97 if err != nil {
98 return nil, err
99 }
100 return a, nil
101}
102
103type AppHostingFS struct {
104 http.FileSystem
105}
106
107var ErrorIndex = errors.New("not found, use index.html")
108
109func (fs AppHostingFS) Open(name string) (http.File, error) {
110 file, err1 := fs.FileSystem.Open(name)
111 if err1 == nil {
112 return file, nil
113 }
114 if !errors.Is(err1, os.ErrNotExist) {
115 return nil, err1
116 }
117
118 return nil, ErrorIndex
119}
120
121// api/playback/iame.li/webrtc?rendition=source
122// api/playback/iame.li/stream.mp4?rendition=source
123// api/playback/iame.li/stream.webm?rendition=source
124// api/playback/iame.li/hls/index.m3u8
125// api/playback/iame.li/hls/source/stream.m3u8
126// api/playback/iame.li/hls/source/000000000000.ts
127
128func (a *StreamplaceAPI) Handler(ctx context.Context) (http.Handler, error) {
129 var xrpc http.Handler
130 xrpc, err := spxrpc.NewServer(a.CLI, a.Model)
131 if err != nil {
132 return nil, err
133 }
134
135 xrpc = a.op.OAuthMiddleware(xrpc)
136 router := httprouter.New()
137 router.Handler("GET", "/oauth/*anything", a.op.Handler())
138 router.Handler("POST", "/oauth/*anything", a.op.Handler())
139 router.Handler("GET", "/.well-known/oauth-authorization-server", a.op.Handler())
140 router.Handler("GET", "/.well-known/oauth-protected-resource", a.op.Handler())
141 apiRouter := httprouter.New()
142 apiRouter.HandlerFunc("POST", "/api/notification", a.HandleNotification(ctx))
143 // old clients
144 router.HandlerFunc("GET", "/app-updates", a.HandleAppUpdates(ctx))
145
146 // new ones
147 apiRouter.HandlerFunc("GET", "/api/manifest", a.HandleAppUpdates(ctx))
148 apiRouter.GET("/api/desktop-updates/:platform/:architecture/:version/:buildTime/:file", a.HandleDesktopUpdates(ctx))
149 apiRouter.POST("/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s"))
150 apiRouter.OPTIONS("/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s"))
151 apiRouter.DELETE("/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s"))
152 apiRouter.Handler("POST", "/api/segment", a.HandleSegment(ctx))
153 apiRouter.HandlerFunc("GET", "/api/healthz", a.HandleHealthz(ctx))
154 apiRouter.GET("/api/playback/:user/hls/*file", a.HandleHLSPlayback(ctx))
155 apiRouter.GET("/api/playback/:user/stream.mp4", a.HandleMP4Playback(ctx))
156 apiRouter.GET("/api/playback/:user/stream.webm", a.HandleMKVPlayback(ctx))
157 // they're, uh, not jpegs. but we used this once and i don't wanna break backwards compatibility
158 apiRouter.GET("/api/playback/:user/stream.jpg", a.HandleThumbnailPlayback(ctx))
159 // this one is not a lie
160 apiRouter.GET("/api/playback/:user/stream.png", a.HandleThumbnailPlayback(ctx))
161 apiRouter.GET("/api/app-return/*anything", a.HandleAppReturn(ctx))
162 apiRouter.POST("/api/playback/:user/webrtc", a.HandleWebRTCPlayback(ctx))
163 apiRouter.POST("/api/ingest/webrtc", a.HandleWebRTCIngest(ctx))
164 apiRouter.POST("/api/ingest/webrtc/:key", a.HandleWebRTCIngest(ctx))
165 apiRouter.POST("/api/player-event", a.HandlePlayerEvent(ctx))
166 apiRouter.GET("/api/chat/:repoDID", a.HandleChat(ctx))
167 apiRouter.GET("/api/websocket/:repoDID", a.HandleWebsocket(ctx))
168 apiRouter.GET("/api/livestream/:repoDID", a.HandleLivestream(ctx))
169 apiRouter.GET("/api/segment/recent", a.HandleRecentSegments(ctx))
170 apiRouter.GET("/api/segment/recent/:repoDID", a.HandleUserRecentSegments(ctx))
171 apiRouter.GET("/api/bluesky/resolve/:handle", a.HandleBlueskyResolve(ctx))
172 apiRouter.GET("/api/view-count/:user", a.HandleViewCount(ctx))
173 apiRouter.NotFound = a.HandleAPI404(ctx)
174 apiRouterHandler := a.RateLimitMiddleware(ctx)(apiRouter)
175 xrpcHandler := a.RateLimitMiddleware(ctx)(xrpc)
176 router.Handler("GET", "/api/*resource", apiRouterHandler)
177 router.Handler("POST", "/api/*resource", apiRouterHandler)
178 router.Handler("PUT", "/api/*resource", apiRouterHandler)
179 router.Handler("PATCH", "/api/*resource", apiRouterHandler)
180 router.Handler("DELETE", "/api/*resource", apiRouterHandler)
181 router.Handler("GET", "/xrpc/*resource", xrpcHandler)
182 router.Handler("POST", "/xrpc/*resource", xrpcHandler)
183 router.Handler("PUT", "/xrpc/*resource", xrpcHandler)
184 router.Handler("PATCH", "/xrpc/*resource", xrpcHandler)
185 router.Handler("DELETE", "/xrpc/*resource", xrpcHandler)
186 router.GET("/.well-known/did.json", a.HandleDidJson(ctx))
187 router.GET("/dl/*params", a.HandleAppDownload(ctx))
188 router.POST("/", a.HandleWebRTCIngest(ctx))
189 for _, redirect := range a.CLI.Redirects {
190 parts := strings.Split(redirect, ":")
191 if len(parts) != 2 {
192 log.Error(ctx, "invalid redirect", "redirect", redirect)
193 return nil, fmt.Errorf("invalid redirect: %s", redirect)
194 }
195 router.Handle("GET", parts[0], func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
196 http.Redirect(w, r, parts[1], http.StatusTemporaryRedirect)
197 })
198 }
199 if a.CLI.FrontendProxy != "" {
200 u, err := url.Parse(a.CLI.FrontendProxy)
201 if err != nil {
202 return nil, err
203 }
204 log.Warn(ctx, "using frontend proxy instead of bundled frontend", "destination", a.CLI.FrontendProxy)
205 router.NotFound = &httputil.ReverseProxy{
206 Rewrite: func(r *httputil.ProxyRequest) {
207 // workaround for Expo disliking serving requests from 127.0.0.1 instead of localhost
208 // we need to use 127.0.0.1 because the atproto oauth client requires it
209 r.Out.Header.Set("Origin", u.String())
210 r.SetURL(u)
211 },
212 }
213 } else {
214 files, err := app.Files()
215 if err != nil {
216 return nil, err
217 }
218 index, err := files.Open("index.html")
219 if err != nil {
220 return nil, err
221 }
222 bs, err := io.ReadAll(index)
223 if err != nil {
224 return nil, err
225 }
226 linker, err := linking.NewLinker(ctx, bs)
227 if err != nil {
228 return nil, err
229 }
230 linkingHandler, err := a.NotFoundLinkingHandler(ctx, linker)
231 if err != nil {
232 return nil, err
233 }
234 router.NotFound = linkingHandler
235 }
236 // needed because the WebRTC handler issues 405s from / otherwise
237 router.GET("/", func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
238 router.NotFound.ServeHTTP(w, r)
239 })
240 handler := sloghttp.Recovery(router)
241 handler = cors.AllowAll().Handler(handler)
242 handler = sloghttp.New(slog.Default())(handler)
243 handler = a.RateLimitMiddleware(ctx)(handler)
244
245 return handler, nil
246}
247
248func copyHeader(dst, src http.Header) {
249 for k, vv := range src {
250 // we'll handle CORS ourselves, thanks
251 if strings.HasPrefix(k, "Access-Control") {
252 continue
253 }
254 for _, v := range vv {
255 dst.Add(k, v)
256 }
257 }
258}
259
260// handler that takes care of static files and otherwise returns the index.html with the correct link card data
261func (a *StreamplaceAPI) NotFoundLinkingHandler(ctx context.Context, linker *linking.Linker) (http.HandlerFunc, error) {
262 files, err := app.Files()
263 if err != nil {
264 return nil, err
265 }
266 fs := AppHostingFS{http.FS(files)}
267
268 fileHandler := a.FileHandler(ctx, http.FileServer(fs))
269 defaultHandler := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
270 f := strings.TrimPrefix(req.URL.Path, "/")
271 // under docs we need the index.html suffix due to astro rendering
272 if strings.HasPrefix(req.URL.Path, "/docs") && strings.HasSuffix(req.URL.Path, "/") {
273 f += "index.html"
274 }
275 _, err := fs.Open(f)
276 if err == nil {
277 fileHandler.ServeHTTP(w, req)
278 return
279 }
280 if errors.Is(err, ErrorIndex) || f == "" {
281 bs, err := linker.GenerateDefaultCard(ctx, req.URL)
282 if err != nil {
283 log.Error(ctx, "error generating default card", "error", err)
284 }
285 w.Header().Set("Content-Type", "text/html")
286 w.Write(bs)
287 } else {
288 log.Warn(ctx, "error opening file", "error", err)
289 apierrors.WriteHTTPInternalServerError(w, "file not found", err)
290 }
291 })
292 return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
293 proto := "http"
294 if req.TLS != nil {
295 proto = "https"
296 }
297 fwProto := req.Header.Get("x-forwarded-proto")
298 if fwProto != "" {
299 proto = fwProto
300 }
301 req.URL.Host = req.Host
302 req.URL.Scheme = proto
303 maybeHandle := strings.TrimPrefix(req.URL.Path, "/")
304 repo, err := a.Model.GetRepoByHandleOrDID(maybeHandle)
305 if err != nil || repo == nil {
306 log.Error(ctx, "no repo found", "maybeHandle", maybeHandle)
307 defaultHandler.ServeHTTP(w, req)
308 return
309 }
310 ls, err := a.Model.GetLatestLivestreamForRepo(repo.DID)
311 if err != nil || ls == nil {
312 log.Error(ctx, "no livestream found", "repoDID", repo.DID)
313 defaultHandler.ServeHTTP(w, req)
314 return
315 }
316 lsv, err := ls.ToLivestreamView()
317 if err != nil || lsv == nil {
318 log.Error(ctx, "no livestream view found", "repoDID", repo.DID)
319 defaultHandler.ServeHTTP(w, req)
320 return
321 }
322 bs, err := linker.GenerateStreamerCard(ctx, req.URL, lsv)
323 if err != nil {
324 log.Error(ctx, "error generating html", "error", err)
325 defaultHandler.ServeHTTP(w, req)
326 return
327 }
328 w.Header().Set("Content-Type", "text/html")
329 w.Write(bs)
330 }), nil
331}
332
333func (a *StreamplaceAPI) MistProxyHandler(ctx context.Context, tmpl string) httprouter.Handle {
334 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
335 if !a.CLI.HasMist() {
336 apierrors.WriteHTTPNotImplemented(w, "Playback only on the Linux version for now", nil)
337 return
338 }
339 stream := params.ByName("stream")
340 if stream == "" {
341 apierrors.WriteHTTPBadRequest(w, "missing stream in request", nil)
342 return
343 }
344
345 fullstream := fmt.Sprintf("%s+%s", mistconfig.STREAM_NAME, stream)
346 prefix := fmt.Sprintf(tmpl, fullstream)
347 resource := params.ByName("resource")
348
349 // path := strings.TrimPrefix(req.URL.EscapedPath(), "/api")
350
351 client := &http.Client{}
352 req.URL = &url.URL{
353 Scheme: "http",
354 Host: fmt.Sprintf("127.0.0.1:%d", a.CLI.MistHTTPPort),
355 Path: fmt.Sprintf("%s%s", prefix, resource),
356 RawQuery: req.URL.RawQuery,
357 }
358
359 //http: Request.RequestURI can't be set in client requests.
360 //http://golang.org/src/pkg/net/http/client.go
361 req.RequestURI = ""
362
363 resp, err := client.Do(req)
364 if err != nil {
365 apierrors.WriteHTTPInternalServerError(w, "error connecting to mist", err)
366 return
367 }
368 defer resp.Body.Close()
369
370 copyHeader(w.Header(), resp.Header)
371 w.WriteHeader(resp.StatusCode)
372 io.Copy(w, resp.Body)
373 }
374}
375
376func (a *StreamplaceAPI) FileHandler(ctx context.Context, fs http.Handler) http.HandlerFunc {
377 return func(w http.ResponseWriter, req *http.Request) {
378 noslash := req.URL.Path[1:]
379 ct, ok := a.Mimes[noslash]
380 if ok {
381 w.Header().Set("content-type", ct)
382 }
383 fs.ServeHTTP(w, req)
384 }
385}
386
387func (a *StreamplaceAPI) RedirectHandler(ctx context.Context) (http.Handler, error) {
388 _, tlsPort, err := net.SplitHostPort(a.CLI.HttpsAddr)
389 if err != nil {
390 return nil, err
391 }
392 handleRedirect := func(w http.ResponseWriter, req *http.Request) {
393 host, _, err := net.SplitHostPort(req.Host)
394 if err != nil {
395 host = req.Host
396 }
397 u := req.URL
398 if tlsPort == "443" {
399 u.Host = host
400 } else {
401 u.Host = net.JoinHostPort(host, tlsPort)
402 }
403 u.Scheme = "https"
404 http.Redirect(w, req, u.String(), http.StatusTemporaryRedirect)
405 }
406 mux := http.NewServeMux()
407 mux.HandleFunc("/", handleRedirect)
408 return mux, nil
409}
410
411type NotificationPayload struct {
412 Token string `json:"token"`
413 RepoDID string `json:"repoDID"`
414}
415
416func (a *StreamplaceAPI) HandleAPI404(ctx context.Context) http.HandlerFunc {
417 return func(w http.ResponseWriter, req *http.Request) {
418 w.WriteHeader(404)
419 }
420}
421
422func (a *StreamplaceAPI) HandleNotification(ctx context.Context) http.HandlerFunc {
423 return func(w http.ResponseWriter, req *http.Request) {
424 payload, err := io.ReadAll(req.Body)
425 if err != nil {
426 log.Log(ctx, "error reading notification create", "error", err)
427 w.WriteHeader(400)
428 return
429 }
430 n := NotificationPayload{}
431 err = json.Unmarshal(payload, &n)
432 if err != nil {
433 log.Log(ctx, "error unmarshalling notification create", "error", err)
434 w.WriteHeader(400)
435 return
436 }
437 err = a.Model.CreateNotification(n.Token, n.RepoDID)
438 if err != nil {
439 log.Log(ctx, "error creating notification", "error", err)
440 w.WriteHeader(400)
441 return
442 }
443 log.Log(ctx, "successfully created notification", "token", n.Token)
444 w.WriteHeader(200)
445 if n.RepoDID != "" {
446 go func() {
447 _, err := a.ATSync.SyncBlueskyRepo(ctx, n.RepoDID, a.Model)
448 if err != nil {
449 log.Error(ctx, "error syncing bluesky repo after notification creation", "error", err)
450 }
451 }()
452 }
453 }
454}
455
456func (a *StreamplaceAPI) HandleSegment(ctx context.Context) http.HandlerFunc {
457 return func(w http.ResponseWriter, req *http.Request) {
458 err := a.MediaManager.ValidateMP4(ctx, req.Body)
459 if err != nil {
460 apierrors.WriteHTTPBadRequest(w, "could not ingest segment", err)
461 return
462 }
463 w.WriteHeader(200)
464 }
465}
466
467func (a *StreamplaceAPI) HandlePlayerEvent(ctx context.Context) httprouter.Handle {
468 return func(w http.ResponseWriter, req *http.Request, p httprouter.Params) {
469 var event model.PlayerEventAPI
470 if err := json.NewDecoder(req.Body).Decode(&event); err != nil {
471 apierrors.WriteHTTPBadRequest(w, "could not decode JSON body", err)
472 return
473 }
474 err := a.Model.CreatePlayerEvent(event)
475 if err != nil {
476 apierrors.WriteHTTPBadRequest(w, "could not create player event", err)
477 return
478 }
479 w.WriteHeader(201)
480 }
481}
482
483func (a *StreamplaceAPI) HandleRecentSegments(ctx context.Context) httprouter.Handle {
484 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
485 segs, err := a.Model.MostRecentSegments()
486 if err != nil {
487 apierrors.WriteHTTPInternalServerError(w, "could not get segments", err)
488 return
489 }
490 bs, err := json.Marshal(segs)
491 if err != nil {
492 apierrors.WriteHTTPInternalServerError(w, "could not marshal segments", err)
493 return
494 }
495 w.Header().Add("Content-Type", "application/json")
496 w.Write(bs)
497 }
498}
499
500func (a *StreamplaceAPI) HandleUserRecentSegments(ctx context.Context) httprouter.Handle {
501 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
502 user := params.ByName("repoDID")
503 if user == "" {
504 apierrors.WriteHTTPBadRequest(w, "user required", nil)
505 return
506 }
507 user, err := a.NormalizeUser(ctx, user)
508 if err != nil {
509 apierrors.WriteHTTPNotFound(w, "user not found", err)
510 return
511 }
512 seg, err := a.Model.LatestSegmentForUser(user)
513 if err != nil {
514 apierrors.WriteHTTPInternalServerError(w, "could not get segments", err)
515 return
516 }
517 streamplaceSeg, err := seg.ToStreamplaceSegment()
518 if err != nil {
519 apierrors.WriteHTTPInternalServerError(w, "could not convert segment to streamplace segment", err)
520 return
521 }
522 bs, err := json.Marshal(streamplaceSeg)
523 if err != nil {
524 apierrors.WriteHTTPInternalServerError(w, "could not marshal segments", err)
525 return
526 }
527 w.Header().Add("Content-Type", "application/json")
528 w.Write(bs)
529 }
530}
531
532func (a *StreamplaceAPI) HandleViewCount(ctx context.Context) httprouter.Handle {
533 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
534 user := params.ByName("user")
535 if user == "" {
536 apierrors.WriteHTTPBadRequest(w, "user required", nil)
537 return
538 }
539 user, err := a.NormalizeUser(ctx, user)
540 if err != nil {
541 apierrors.WriteHTTPNotFound(w, "user not found", err)
542 return
543 }
544 count := spmetrics.GetViewCount(user)
545 bs, err := json.Marshal(streamplace.Livestream_ViewerCount{Count: int64(count), LexiconTypeID: "place.stream.livestream#viewerCount"})
546 if err != nil {
547 apierrors.WriteHTTPInternalServerError(w, "could not marshal view count", err)
548 return
549 }
550 w.Write(bs)
551 }
552}
553
554func (a *StreamplaceAPI) HandleBlueskyResolve(ctx context.Context) httprouter.Handle {
555 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
556 log.Log(ctx, "got bluesky notification", "params", params)
557 key, err := a.ATSync.SyncBlueskyRepo(ctx, params.ByName("handle"), a.Model)
558 if err != nil {
559 apierrors.WriteHTTPInternalServerError(w, "could not resolve streamplace key", err)
560 return
561 }
562 signingKeys, err := a.Model.GetSigningKeysForRepo(key.DID)
563 if err != nil {
564 apierrors.WriteHTTPInternalServerError(w, "could not get signing keys", err)
565 return
566 }
567 bs, err := json.Marshal(signingKeys)
568 if err != nil {
569 apierrors.WriteHTTPInternalServerError(w, "could not marshal signing keys", err)
570 return
571 }
572 w.Write(bs)
573 }
574}
575
576type ChatResponse struct {
577 Post *bsky.FeedPost `json:"post"`
578 Repo *model.Repo `json:"repo"`
579 CID string `json:"cid"`
580}
581
582func (a *StreamplaceAPI) HandleChat(ctx context.Context) httprouter.Handle {
583 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
584 user := params.ByName("repoDID")
585 if user == "" {
586 apierrors.WriteHTTPBadRequest(w, "user required", nil)
587 return
588 }
589 repoDID, err := a.NormalizeUser(ctx, user)
590 if err != nil {
591 apierrors.WriteHTTPNotFound(w, "user not found", err)
592 return
593 }
594 replies, err := a.Model.GetReplies(repoDID)
595 if err != nil {
596 apierrors.WriteHTTPInternalServerError(w, "could not get replies", err)
597 return
598 }
599 bs, err := json.Marshal(replies)
600 if err != nil {
601 apierrors.WriteHTTPInternalServerError(w, "could not marshal replies", err)
602 return
603 }
604 w.Header().Set("Content-Type", "application/json")
605 w.Write(bs)
606 }
607}
608
609func (a *StreamplaceAPI) HandleLivestream(ctx context.Context) httprouter.Handle {
610 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
611 user := params.ByName("repoDID")
612 if user == "" {
613 apierrors.WriteHTTPBadRequest(w, "user required", nil)
614 return
615 }
616 repoDID, err := a.NormalizeUser(ctx, user)
617 if err != nil {
618 apierrors.WriteHTTPNotFound(w, "user not found", err)
619 return
620 }
621 livestream, err := a.Model.GetLatestLivestreamForRepo(repoDID)
622 if err != nil {
623 apierrors.WriteHTTPInternalServerError(w, "could not get livestream", err)
624 return
625 }
626
627 doc, err := livestream.ToLivestreamView()
628 if err != nil {
629 apierrors.WriteHTTPInternalServerError(w, "could not marshal livestream", err)
630 return
631 }
632
633 if livestream == nil {
634 apierrors.WriteHTTPNotFound(w, "no livestream found", nil)
635 return
636 }
637
638 bs, err := json.Marshal(doc)
639 if err != nil {
640 apierrors.WriteHTTPInternalServerError(w, "could not marshal livestream", err)
641 return
642 }
643 w.Header().Set("Content-Type", "application/json")
644 w.Write(bs)
645 }
646}
647
648func (a *StreamplaceAPI) RateLimitMiddleware(ctx context.Context) func(http.Handler) http.Handler {
649 return func(next http.Handler) http.Handler {
650 return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
651 ip, _, err := net.SplitHostPort(req.RemoteAddr)
652 if err != nil {
653 ip = req.RemoteAddr
654 }
655
656 if a.CLI.RateLimitPerSecond > 0 {
657 limiter := a.getLimiter(ip)
658
659 if !limiter.Allow() {
660 log.Warn(ctx, "rate limit exceeded", "ip", ip, "path", req.URL.Path)
661 apierrors.WriteHTTPTooManyRequests(w, "rate limit exceeded")
662 return
663 }
664 }
665
666 next.ServeHTTP(w, req)
667 })
668 }
669}
670
671func (a *StreamplaceAPI) ServeHTTP(ctx context.Context) error {
672 handler, err := a.Handler(ctx)
673 if err != nil {
674 return err
675 }
676 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error {
677 s.Addr = a.CLI.HttpAddr
678 log.Log(ctx, "http server starting", "addr", s.Addr)
679 return s.ListenAndServe()
680 })
681}
682
683func (a *StreamplaceAPI) ServeHTTPRedirect(ctx context.Context) error {
684 handler, err := a.RedirectHandler(ctx)
685 if err != nil {
686 return err
687 }
688 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error {
689 s.Addr = a.CLI.HttpAddr
690 log.Log(ctx, "http tls redirecct server starting", "addr", s.Addr)
691 return s.ListenAndServe()
692 })
693}
694
695func (a *StreamplaceAPI) ServeHTTPS(ctx context.Context) error {
696 handler, err := a.Handler(ctx)
697 if err != nil {
698 return err
699 }
700 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error {
701 s.Addr = a.CLI.HttpsAddr
702 log.Log(ctx, "https server starting",
703 "addr", s.Addr,
704 "certPath", a.CLI.TLSCertPath,
705 "keyPath", a.CLI.TLSKeyPath,
706 )
707 return s.ListenAndServeTLS(a.CLI.TLSCertPath, a.CLI.TLSKeyPath)
708 })
709}
710
711func (a *StreamplaceAPI) ServerWithShutdown(ctx context.Context, handler http.Handler, serve func(*http.Server) error) error {
712 ctx, cancel := context.WithCancel(ctx)
713 handler = gziphandler.GzipHandler(handler)
714 server := http.Server{Handler: handler}
715 var serveErr error
716 go func() {
717 serveErr = serve(&server)
718 cancel()
719 }()
720 <-ctx.Done()
721 if serveErr != nil {
722 return fmt.Errorf("error in http server: %w", serveErr)
723 }
724
725 ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
726 defer cancel()
727 return server.Shutdown(ctx)
728}
729
730func (a *StreamplaceAPI) HandleHealthz(ctx context.Context) http.HandlerFunc {
731 return func(w http.ResponseWriter, req *http.Request) {
732 w.WriteHeader(200)
733 }
734}
735
736func (a *StreamplaceAPI) getLimiter(ip string) *rate.Limiter {
737 a.limitersMu.Lock()
738 defer a.limitersMu.Unlock()
739
740 limiter, exists := a.limiters[ip]
741 if !exists {
742 limiter = rate.NewLimiter(rate.Limit(a.CLI.RateLimitPerSecond), a.CLI.RateLimitBurst)
743 a.limiters[ip] = limiter
744 }
745
746 return limiter
747}
748
749func NewWebsocketTracker(maxConns int) *WebsocketTracker {
750 return &WebsocketTracker{
751 connections: make(map[string]int),
752 maxConnsPerIP: maxConns,
753 }
754}
755
756func (t *WebsocketTracker) AddConnection(ip string) bool {
757 t.mu.Lock()
758 defer t.mu.Unlock()
759
760 count := t.connections[ip]
761
762 if count >= t.maxConnsPerIP {
763 return false
764 }
765
766 t.connections[ip] = count + 1
767 return true
768}
769
770func (t *WebsocketTracker) RemoveConnection(ip string) {
771 t.mu.Lock()
772 defer t.mu.Unlock()
773
774 count := t.connections[ip]
775 if count > 0 {
776 t.connections[ip] = count - 1
777 }
778
779 if t.connections[ip] == 0 {
780 delete(t.connections, ip)
781 }
782}