Live video on the AT Protocol
1package api
2
3import (
4 "context"
5 "encoding/json"
6 "errors"
7 "fmt"
8 "io"
9 "log/slog"
10 "net"
11 "net/http"
12 "net/http/httputil"
13 "net/url"
14 "os"
15 "strings"
16 "sync"
17 "time"
18
19 "github.com/NYTimes/gziphandler"
20 "github.com/bluesky-social/indigo/api/bsky"
21 "github.com/google/uuid"
22 "github.com/julienschmidt/httprouter"
23 "github.com/rs/cors"
24 sloghttp "github.com/samber/slog-http"
25 "golang.org/x/time/rate"
26
27 "github.com/streamplace/oatproxy/pkg/oatproxy"
28 "stream.place/streamplace/js/app"
29 "stream.place/streamplace/pkg/atproto"
30 "stream.place/streamplace/pkg/bus"
31 "stream.place/streamplace/pkg/config"
32 "stream.place/streamplace/pkg/crypto/signers/eip712"
33 "stream.place/streamplace/pkg/director"
34 apierrors "stream.place/streamplace/pkg/errors"
35 "stream.place/streamplace/pkg/linking"
36 "stream.place/streamplace/pkg/log"
37 "stream.place/streamplace/pkg/media"
38 "stream.place/streamplace/pkg/mist/mistconfig"
39 "stream.place/streamplace/pkg/model"
40 "stream.place/streamplace/pkg/notifications"
41 "stream.place/streamplace/pkg/spmetrics"
42 "stream.place/streamplace/pkg/spxrpc"
43 "stream.place/streamplace/pkg/streamplace"
44)
45
46type StreamplaceAPI struct {
47 CLI *config.CLI
48 Model model.Model
49 Updater *Updater
50 Signer *eip712.EIP712Signer
51 Mimes map[string]string
52 FirebaseNotifier notifications.FirebaseNotifier
53 MediaManager *media.MediaManager
54 MediaSigner media.MediaSigner
55 // not thread-safe yet
56 Aliases map[string]string
57 Bus *bus.Bus
58 ATSync *atproto.ATProtoSynchronizer
59 Director *director.Director
60
61 connTracker *WebsocketTracker
62
63 limiters map[string]*rate.Limiter
64 limitersMu sync.Mutex
65 SignerCache map[string]media.MediaSigner
66 SignerCacheMu sync.Mutex
67 op *oatproxy.OATProxy
68}
69
70type WebsocketTracker struct {
71 connections map[string]int
72 maxConnsPerIP int
73 mu sync.RWMutex
74}
75
76func MakeStreamplaceAPI(cli *config.CLI, mod model.Model, signer *eip712.EIP712Signer, noter notifications.FirebaseNotifier, mm *media.MediaManager, ms media.MediaSigner, bus *bus.Bus, atsync *atproto.ATProtoSynchronizer, d *director.Director, op *oatproxy.OATProxy) (*StreamplaceAPI, error) {
77 updater, err := PrepareUpdater(cli)
78 if err != nil {
79 return nil, err
80 }
81 a := &StreamplaceAPI{CLI: cli,
82 Model: mod,
83 Updater: updater,
84 Signer: signer,
85 FirebaseNotifier: noter,
86 MediaManager: mm,
87 MediaSigner: ms,
88 Aliases: map[string]string{},
89 Bus: bus,
90 ATSync: atsync,
91 Director: d,
92 connTracker: NewWebsocketTracker(cli.RateLimitWebsocket),
93 limiters: make(map[string]*rate.Limiter),
94 SignerCache: make(map[string]media.MediaSigner),
95 op: op,
96 }
97 a.Mimes, err = updater.GetMimes()
98 if err != nil {
99 return nil, err
100 }
101 return a, nil
102}
103
104type AppHostingFS struct {
105 http.FileSystem
106}
107
108var ErrorIndex = errors.New("not found, use index.html")
109
110func (fs AppHostingFS) Open(name string) (http.File, error) {
111 file, err1 := fs.FileSystem.Open(name)
112 if err1 == nil {
113 return file, nil
114 }
115 if !errors.Is(err1, os.ErrNotExist) {
116 return nil, err1
117 }
118
119 return nil, ErrorIndex
120}
121
122// api/playback/iame.li/webrtc?rendition=source
123// api/playback/iame.li/stream.mp4?rendition=source
124// api/playback/iame.li/stream.webm?rendition=source
125// api/playback/iame.li/hls/index.m3u8
126// api/playback/iame.li/hls/source/stream.m3u8
127// api/playback/iame.li/hls/source/000000000000.ts
128
129func (a *StreamplaceAPI) Handler(ctx context.Context) (http.Handler, error) {
130
131 var xrpc http.Handler
132 xrpc, err := spxrpc.NewServer(ctx, a.CLI, a.Model, a.op)
133 if err != nil {
134 return nil, err
135 }
136 router := httprouter.New()
137
138 router.Handler("GET", "/oauth/*anything", a.op.Handler())
139 router.Handler("POST", "/oauth/*anything", a.op.Handler())
140 router.Handler("GET", "/.well-known/oauth-authorization-server", a.op.Handler())
141 router.Handler("GET", "/.well-known/oauth-protected-resource", a.op.Handler())
142 apiRouter := httprouter.New()
143 apiRouter.HandlerFunc("POST", "/api/notification", a.HandleNotification(ctx))
144 // old clients
145 router.HandlerFunc("GET", "/app-updates", a.HandleAppUpdates(ctx))
146
147 // new ones
148 apiRouter.HandlerFunc("GET", "/api/manifest", a.HandleAppUpdates(ctx))
149 apiRouter.GET("/api/desktop-updates/:platform/:architecture/:version/:buildTime/:file", a.HandleDesktopUpdates(ctx))
150 apiRouter.POST("/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s"))
151 apiRouter.OPTIONS("/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s"))
152 apiRouter.DELETE("/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s"))
153 apiRouter.Handler("POST", "/api/segment", a.HandleSegment(ctx))
154 apiRouter.HandlerFunc("GET", "/api/healthz", a.HandleHealthz(ctx))
155 apiRouter.GET("/api/playback/:user/hls/*file", a.HandleHLSPlayback(ctx))
156 apiRouter.GET("/api/playback/:user/stream.mp4", a.HandleMP4Playback(ctx))
157 apiRouter.GET("/api/playback/:user/stream.webm", a.HandleMKVPlayback(ctx))
158 // they're, uh, not jpegs. but we used this once and i don't wanna break backwards compatibility
159 apiRouter.GET("/api/playback/:user/stream.jpg", a.HandleThumbnailPlayback(ctx))
160 // this one is not a lie
161 apiRouter.GET("/api/playback/:user/stream.png", a.HandleThumbnailPlayback(ctx))
162 apiRouter.GET("/api/app-return/*anything", a.HandleAppReturn(ctx))
163 apiRouter.POST("/api/playback/:user/webrtc", a.HandleWebRTCPlayback(ctx))
164 apiRouter.POST("/api/ingest/webrtc", a.HandleWebRTCIngest(ctx))
165 apiRouter.POST("/api/ingest/webrtc/:key", a.HandleWebRTCIngest(ctx))
166 apiRouter.POST("/api/player-event", a.HandlePlayerEvent(ctx))
167 apiRouter.GET("/api/chat/:repoDID", a.HandleChat(ctx))
168 apiRouter.GET("/api/websocket/:repoDID", a.HandleWebsocket(ctx))
169 apiRouter.GET("/api/livestream/:repoDID", a.HandleLivestream(ctx))
170 apiRouter.GET("/api/segment/recent", a.HandleRecentSegments(ctx))
171 apiRouter.GET("/api/segment/recent/:repoDID", a.HandleUserRecentSegments(ctx))
172 apiRouter.GET("/api/bluesky/resolve/:handle", a.HandleBlueskyResolve(ctx))
173 apiRouter.GET("/api/view-count/:user", a.HandleViewCount(ctx))
174 apiRouter.NotFound = a.HandleAPI404(ctx)
175 apiRouterHandler := a.RateLimitMiddleware(ctx)(apiRouter)
176 xrpcHandler := a.RateLimitMiddleware(ctx)(xrpc)
177 router.Handler("GET", "/api/*resource", apiRouterHandler)
178 router.Handler("POST", "/api/*resource", apiRouterHandler)
179 router.Handler("PUT", "/api/*resource", apiRouterHandler)
180 router.Handler("PATCH", "/api/*resource", apiRouterHandler)
181 router.Handler("DELETE", "/api/*resource", apiRouterHandler)
182 router.Handler("GET", "/xrpc/*resource", xrpcHandler)
183 router.Handler("POST", "/xrpc/*resource", xrpcHandler)
184 router.Handler("PUT", "/xrpc/*resource", xrpcHandler)
185 router.Handler("PATCH", "/xrpc/*resource", xrpcHandler)
186 router.Handler("DELETE", "/xrpc/*resource", xrpcHandler)
187 router.GET("/.well-known/did.json", a.HandleDidJson(ctx))
188 router.GET("/dl/*params", a.HandleAppDownload(ctx))
189 router.POST("/", a.HandleWebRTCIngest(ctx))
190 for _, redirect := range a.CLI.Redirects {
191 parts := strings.Split(redirect, ":")
192 if len(parts) != 2 {
193 log.Error(ctx, "invalid redirect", "redirect", redirect)
194 return nil, fmt.Errorf("invalid redirect: %s", redirect)
195 }
196 router.Handle("GET", parts[0], func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
197 http.Redirect(w, r, parts[1], http.StatusTemporaryRedirect)
198 })
199 }
200 if a.CLI.FrontendProxy != "" {
201 u, err := url.Parse(a.CLI.FrontendProxy)
202 if err != nil {
203 return nil, err
204 }
205 log.Warn(ctx, "using frontend proxy instead of bundled frontend", "destination", a.CLI.FrontendProxy)
206 router.NotFound = &httputil.ReverseProxy{
207 Rewrite: func(r *httputil.ProxyRequest) {
208 // workaround for Expo disliking serving requests from 127.0.0.1 instead of localhost
209 // we need to use 127.0.0.1 because the atproto oauth client requires it
210 r.Out.Header.Set("Origin", u.String())
211 r.SetURL(u)
212 },
213 }
214 } else {
215 files, err := app.Files()
216 if err != nil {
217 return nil, err
218 }
219 index, err := files.Open("index.html")
220 if err != nil {
221 return nil, err
222 }
223 bs, err := io.ReadAll(index)
224 if err != nil {
225 return nil, err
226 }
227 linker, err := linking.NewLinker(ctx, bs)
228 if err != nil {
229 return nil, err
230 }
231 linkingHandler, err := a.NotFoundLinkingHandler(ctx, linker)
232 if err != nil {
233 return nil, err
234 }
235 router.NotFound = linkingHandler
236 }
237 // needed because the WebRTC handler issues 405s from / otherwise
238 router.GET("/", func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
239 router.NotFound.ServeHTTP(w, r)
240 })
241 handler := sloghttp.Recovery(router)
242 handler = cors.AllowAll().Handler(handler)
243 handler = sloghttp.New(slog.Default())(handler)
244 handler = a.RateLimitMiddleware(ctx)(handler)
245
246 // this needs to be LAST so nothing else clobbers the context
247 handler = a.ContextMiddleware(ctx)(handler)
248
249 return handler, nil
250}
251func (a *StreamplaceAPI) ContextMiddleware(ctx context.Context) func(next http.Handler) http.Handler {
252 return func(next http.Handler) http.Handler {
253 return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
254 uuid := uuid.New().String()
255 ctx = log.WithLogValues(ctx, "requestID", uuid, "method", r.Method, "path", r.URL.Path)
256 r = r.WithContext(ctx)
257 next.ServeHTTP(w, r)
258 })
259 }
260}
261func copyHeader(dst, src http.Header) {
262 for k, vv := range src {
263 // we'll handle CORS ourselves, thanks
264 if strings.HasPrefix(k, "Access-Control") {
265 continue
266 }
267 for _, v := range vv {
268 dst.Add(k, v)
269 }
270 }
271}
272
273// handler that takes care of static files and otherwise returns the index.html with the correct link card data
274func (a *StreamplaceAPI) NotFoundLinkingHandler(ctx context.Context, linker *linking.Linker) (http.HandlerFunc, error) {
275 files, err := app.Files()
276 if err != nil {
277 return nil, err
278 }
279 fs := AppHostingFS{http.FS(files)}
280
281 fileHandler := a.FileHandler(ctx, http.FileServer(fs))
282 defaultHandler := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
283 f := strings.TrimPrefix(req.URL.Path, "/")
284 // under docs we need the index.html suffix due to astro rendering
285 if strings.HasPrefix(req.URL.Path, "/docs") && strings.HasSuffix(req.URL.Path, "/") {
286 f += "index.html"
287 }
288 _, err := fs.Open(f)
289 if err == nil {
290 fileHandler.ServeHTTP(w, req)
291 return
292 }
293 if errors.Is(err, ErrorIndex) || f == "" {
294 bs, err := linker.GenerateDefaultCard(ctx, req.URL)
295 if err != nil {
296 log.Error(ctx, "error generating default card", "error", err)
297 }
298 w.Header().Set("Content-Type", "text/html")
299 w.Write(bs)
300 } else {
301 log.Warn(ctx, "error opening file", "error", err)
302 apierrors.WriteHTTPInternalServerError(w, "file not found", err)
303 }
304 })
305 return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
306 proto := "http"
307 if req.TLS != nil {
308 proto = "https"
309 }
310 fwProto := req.Header.Get("x-forwarded-proto")
311 if fwProto != "" {
312 proto = fwProto
313 }
314 req.URL.Host = req.Host
315 req.URL.Scheme = proto
316 maybeHandle := strings.TrimPrefix(req.URL.Path, "/")
317 repo, err := a.Model.GetRepoByHandleOrDID(maybeHandle)
318 if err != nil || repo == nil {
319 log.Error(ctx, "no repo found", "maybeHandle", maybeHandle)
320 defaultHandler.ServeHTTP(w, req)
321 return
322 }
323 ls, err := a.Model.GetLatestLivestreamForRepo(repo.DID)
324 if err != nil || ls == nil {
325 log.Error(ctx, "no livestream found", "repoDID", repo.DID)
326 defaultHandler.ServeHTTP(w, req)
327 return
328 }
329 lsv, err := ls.ToLivestreamView()
330 if err != nil || lsv == nil {
331 log.Error(ctx, "no livestream view found", "repoDID", repo.DID)
332 defaultHandler.ServeHTTP(w, req)
333 return
334 }
335 bs, err := linker.GenerateStreamerCard(ctx, req.URL, lsv)
336 if err != nil {
337 log.Error(ctx, "error generating html", "error", err)
338 defaultHandler.ServeHTTP(w, req)
339 return
340 }
341 w.Header().Set("Content-Type", "text/html")
342 w.Write(bs)
343 }), nil
344}
345
346func (a *StreamplaceAPI) MistProxyHandler(ctx context.Context, tmpl string) httprouter.Handle {
347 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
348 if !a.CLI.HasMist() {
349 apierrors.WriteHTTPNotImplemented(w, "Playback only on the Linux version for now", nil)
350 return
351 }
352 stream := params.ByName("stream")
353 if stream == "" {
354 apierrors.WriteHTTPBadRequest(w, "missing stream in request", nil)
355 return
356 }
357
358 fullstream := fmt.Sprintf("%s+%s", mistconfig.STREAM_NAME, stream)
359 prefix := fmt.Sprintf(tmpl, fullstream)
360 resource := params.ByName("resource")
361
362 // path := strings.TrimPrefix(req.URL.EscapedPath(), "/api")
363
364 client := &http.Client{}
365 req.URL = &url.URL{
366 Scheme: "http",
367 Host: fmt.Sprintf("127.0.0.1:%d", a.CLI.MistHTTPPort),
368 Path: fmt.Sprintf("%s%s", prefix, resource),
369 RawQuery: req.URL.RawQuery,
370 }
371
372 //http: Request.RequestURI can't be set in client requests.
373 //http://golang.org/src/pkg/net/http/client.go
374 req.RequestURI = ""
375
376 resp, err := client.Do(req)
377 if err != nil {
378 apierrors.WriteHTTPInternalServerError(w, "error connecting to mist", err)
379 return
380 }
381 defer resp.Body.Close()
382
383 copyHeader(w.Header(), resp.Header)
384 w.WriteHeader(resp.StatusCode)
385 io.Copy(w, resp.Body)
386 }
387}
388
389func (a *StreamplaceAPI) FileHandler(ctx context.Context, fs http.Handler) http.HandlerFunc {
390 return func(w http.ResponseWriter, req *http.Request) {
391 noslash := req.URL.Path[1:]
392 ct, ok := a.Mimes[noslash]
393 if ok {
394 w.Header().Set("content-type", ct)
395 }
396 fs.ServeHTTP(w, req)
397 }
398}
399
400func (a *StreamplaceAPI) RedirectHandler(ctx context.Context) (http.Handler, error) {
401 _, tlsPort, err := net.SplitHostPort(a.CLI.HttpsAddr)
402 if err != nil {
403 return nil, err
404 }
405 handleRedirect := func(w http.ResponseWriter, req *http.Request) {
406 host, _, err := net.SplitHostPort(req.Host)
407 if err != nil {
408 host = req.Host
409 }
410 u := req.URL
411 if tlsPort == "443" {
412 u.Host = host
413 } else {
414 u.Host = net.JoinHostPort(host, tlsPort)
415 }
416 u.Scheme = "https"
417 http.Redirect(w, req, u.String(), http.StatusTemporaryRedirect)
418 }
419 mux := http.NewServeMux()
420 mux.HandleFunc("/", handleRedirect)
421 return mux, nil
422}
423
424type NotificationPayload struct {
425 Token string `json:"token"`
426 RepoDID string `json:"repoDID"`
427}
428
429func (a *StreamplaceAPI) HandleAPI404(ctx context.Context) http.HandlerFunc {
430 return func(w http.ResponseWriter, req *http.Request) {
431 w.WriteHeader(404)
432 }
433}
434
435func (a *StreamplaceAPI) HandleNotification(ctx context.Context) http.HandlerFunc {
436 return func(w http.ResponseWriter, req *http.Request) {
437 payload, err := io.ReadAll(req.Body)
438 if err != nil {
439 log.Log(ctx, "error reading notification create", "error", err)
440 w.WriteHeader(400)
441 return
442 }
443 n := NotificationPayload{}
444 err = json.Unmarshal(payload, &n)
445 if err != nil {
446 log.Log(ctx, "error unmarshalling notification create", "error", err)
447 w.WriteHeader(400)
448 return
449 }
450 err = a.Model.CreateNotification(n.Token, n.RepoDID)
451 if err != nil {
452 log.Log(ctx, "error creating notification", "error", err)
453 w.WriteHeader(400)
454 return
455 }
456 log.Log(ctx, "successfully created notification", "token", n.Token)
457 w.WriteHeader(200)
458 if n.RepoDID != "" {
459 go func() {
460 _, err := a.ATSync.SyncBlueskyRepo(ctx, n.RepoDID, a.Model)
461 if err != nil {
462 log.Error(ctx, "error syncing bluesky repo after notification creation", "error", err)
463 }
464 }()
465 }
466 }
467}
468
469func (a *StreamplaceAPI) HandleSegment(ctx context.Context) http.HandlerFunc {
470 return func(w http.ResponseWriter, req *http.Request) {
471 err := a.MediaManager.ValidateMP4(ctx, req.Body)
472 if err != nil {
473 apierrors.WriteHTTPBadRequest(w, "could not ingest segment", err)
474 return
475 }
476 w.WriteHeader(200)
477 }
478}
479
480func (a *StreamplaceAPI) HandlePlayerEvent(ctx context.Context) httprouter.Handle {
481 return func(w http.ResponseWriter, req *http.Request, p httprouter.Params) {
482 var event model.PlayerEventAPI
483 if err := json.NewDecoder(req.Body).Decode(&event); err != nil {
484 apierrors.WriteHTTPBadRequest(w, "could not decode JSON body", err)
485 return
486 }
487 err := a.Model.CreatePlayerEvent(event)
488 if err != nil {
489 apierrors.WriteHTTPBadRequest(w, "could not create player event", err)
490 return
491 }
492 w.WriteHeader(201)
493 }
494}
495
496func (a *StreamplaceAPI) HandleRecentSegments(ctx context.Context) httprouter.Handle {
497 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
498 segs, err := a.Model.MostRecentSegments()
499 if err != nil {
500 apierrors.WriteHTTPInternalServerError(w, "could not get segments", err)
501 return
502 }
503 bs, err := json.Marshal(segs)
504 if err != nil {
505 apierrors.WriteHTTPInternalServerError(w, "could not marshal segments", err)
506 return
507 }
508 w.Header().Add("Content-Type", "application/json")
509 w.Write(bs)
510 }
511}
512
513func (a *StreamplaceAPI) HandleUserRecentSegments(ctx context.Context) httprouter.Handle {
514 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
515 user := params.ByName("repoDID")
516 if user == "" {
517 apierrors.WriteHTTPBadRequest(w, "user required", nil)
518 return
519 }
520 user, err := a.NormalizeUser(ctx, user)
521 if err != nil {
522 apierrors.WriteHTTPNotFound(w, "user not found", err)
523 return
524 }
525 seg, err := a.Model.LatestSegmentForUser(user)
526 if err != nil {
527 apierrors.WriteHTTPInternalServerError(w, "could not get segments", err)
528 return
529 }
530 streamplaceSeg, err := seg.ToStreamplaceSegment()
531 if err != nil {
532 apierrors.WriteHTTPInternalServerError(w, "could not convert segment to streamplace segment", err)
533 return
534 }
535 bs, err := json.Marshal(streamplaceSeg)
536 if err != nil {
537 apierrors.WriteHTTPInternalServerError(w, "could not marshal segments", err)
538 return
539 }
540 w.Header().Add("Content-Type", "application/json")
541 w.Write(bs)
542 }
543}
544
545func (a *StreamplaceAPI) HandleViewCount(ctx context.Context) httprouter.Handle {
546 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
547 user := params.ByName("user")
548 if user == "" {
549 apierrors.WriteHTTPBadRequest(w, "user required", nil)
550 return
551 }
552 user, err := a.NormalizeUser(ctx, user)
553 if err != nil {
554 apierrors.WriteHTTPNotFound(w, "user not found", err)
555 return
556 }
557 count := spmetrics.GetViewCount(user)
558 bs, err := json.Marshal(streamplace.Livestream_ViewerCount{Count: int64(count), LexiconTypeID: "place.stream.livestream#viewerCount"})
559 if err != nil {
560 apierrors.WriteHTTPInternalServerError(w, "could not marshal view count", err)
561 return
562 }
563 w.Write(bs)
564 }
565}
566
567func (a *StreamplaceAPI) HandleBlueskyResolve(ctx context.Context) httprouter.Handle {
568 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
569 log.Log(ctx, "got bluesky notification", "params", params)
570 key, err := a.ATSync.SyncBlueskyRepo(ctx, params.ByName("handle"), a.Model)
571 if err != nil {
572 apierrors.WriteHTTPInternalServerError(w, "could not resolve streamplace key", err)
573 return
574 }
575 signingKeys, err := a.Model.GetSigningKeysForRepo(key.DID)
576 if err != nil {
577 apierrors.WriteHTTPInternalServerError(w, "could not get signing keys", err)
578 return
579 }
580 bs, err := json.Marshal(signingKeys)
581 if err != nil {
582 apierrors.WriteHTTPInternalServerError(w, "could not marshal signing keys", err)
583 return
584 }
585 w.Write(bs)
586 }
587}
588
589type ChatResponse struct {
590 Post *bsky.FeedPost `json:"post"`
591 Repo *model.Repo `json:"repo"`
592 CID string `json:"cid"`
593}
594
595func (a *StreamplaceAPI) HandleChat(ctx context.Context) httprouter.Handle {
596 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
597 user := params.ByName("repoDID")
598 if user == "" {
599 apierrors.WriteHTTPBadRequest(w, "user required", nil)
600 return
601 }
602 repoDID, err := a.NormalizeUser(ctx, user)
603 if err != nil {
604 apierrors.WriteHTTPNotFound(w, "user not found", err)
605 return
606 }
607 replies, err := a.Model.GetReplies(repoDID)
608 if err != nil {
609 apierrors.WriteHTTPInternalServerError(w, "could not get replies", err)
610 return
611 }
612 bs, err := json.Marshal(replies)
613 if err != nil {
614 apierrors.WriteHTTPInternalServerError(w, "could not marshal replies", err)
615 return
616 }
617 w.Header().Set("Content-Type", "application/json")
618 w.Write(bs)
619 }
620}
621
622func (a *StreamplaceAPI) HandleLivestream(ctx context.Context) httprouter.Handle {
623 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
624 user := params.ByName("repoDID")
625 if user == "" {
626 apierrors.WriteHTTPBadRequest(w, "user required", nil)
627 return
628 }
629 repoDID, err := a.NormalizeUser(ctx, user)
630 if err != nil {
631 apierrors.WriteHTTPNotFound(w, "user not found", err)
632 return
633 }
634 livestream, err := a.Model.GetLatestLivestreamForRepo(repoDID)
635 if err != nil {
636 apierrors.WriteHTTPInternalServerError(w, "could not get livestream", err)
637 return
638 }
639
640 doc, err := livestream.ToLivestreamView()
641 if err != nil {
642 apierrors.WriteHTTPInternalServerError(w, "could not marshal livestream", err)
643 return
644 }
645
646 if livestream == nil {
647 apierrors.WriteHTTPNotFound(w, "no livestream found", nil)
648 return
649 }
650
651 bs, err := json.Marshal(doc)
652 if err != nil {
653 apierrors.WriteHTTPInternalServerError(w, "could not marshal livestream", err)
654 return
655 }
656 w.Header().Set("Content-Type", "application/json")
657 w.Write(bs)
658 }
659}
660
661func (a *StreamplaceAPI) RateLimitMiddleware(ctx context.Context) func(http.Handler) http.Handler {
662 return func(next http.Handler) http.Handler {
663 return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
664 ip, _, err := net.SplitHostPort(req.RemoteAddr)
665 if err != nil {
666 ip = req.RemoteAddr
667 }
668
669 if a.CLI.RateLimitPerSecond > 0 {
670 limiter := a.getLimiter(ip)
671
672 if !limiter.Allow() {
673 log.Warn(ctx, "rate limit exceeded", "ip", ip, "path", req.URL.Path)
674 apierrors.WriteHTTPTooManyRequests(w, "rate limit exceeded")
675 return
676 }
677 }
678
679 next.ServeHTTP(w, req)
680 })
681 }
682}
683
684func (a *StreamplaceAPI) ServeHTTP(ctx context.Context) error {
685 handler, err := a.Handler(ctx)
686 if err != nil {
687 return err
688 }
689 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error {
690 s.Addr = a.CLI.HttpAddr
691 log.Log(ctx, "http server starting", "addr", s.Addr)
692 return s.ListenAndServe()
693 })
694}
695
696func (a *StreamplaceAPI) ServeHTTPRedirect(ctx context.Context) error {
697 handler, err := a.RedirectHandler(ctx)
698 if err != nil {
699 return err
700 }
701 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error {
702 s.Addr = a.CLI.HttpAddr
703 log.Log(ctx, "http tls redirecct server starting", "addr", s.Addr)
704 return s.ListenAndServe()
705 })
706}
707
708func (a *StreamplaceAPI) ServeHTTPS(ctx context.Context) error {
709 handler, err := a.Handler(ctx)
710 if err != nil {
711 return err
712 }
713 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error {
714 s.Addr = a.CLI.HttpsAddr
715 log.Log(ctx, "https server starting",
716 "addr", s.Addr,
717 "certPath", a.CLI.TLSCertPath,
718 "keyPath", a.CLI.TLSKeyPath,
719 )
720 return s.ListenAndServeTLS(a.CLI.TLSCertPath, a.CLI.TLSKeyPath)
721 })
722}
723
724func (a *StreamplaceAPI) ServerWithShutdown(ctx context.Context, handler http.Handler, serve func(*http.Server) error) error {
725 ctx, cancel := context.WithCancel(ctx)
726 handler = gziphandler.GzipHandler(handler)
727 server := http.Server{Handler: handler}
728 var serveErr error
729 go func() {
730 serveErr = serve(&server)
731 cancel()
732 }()
733 <-ctx.Done()
734 if serveErr != nil {
735 return fmt.Errorf("error in http server: %w", serveErr)
736 }
737
738 ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
739 defer cancel()
740 return server.Shutdown(ctx)
741}
742
743func (a *StreamplaceAPI) HandleHealthz(ctx context.Context) http.HandlerFunc {
744 return func(w http.ResponseWriter, req *http.Request) {
745 w.WriteHeader(200)
746 }
747}
748
749func (a *StreamplaceAPI) getLimiter(ip string) *rate.Limiter {
750 a.limitersMu.Lock()
751 defer a.limitersMu.Unlock()
752
753 limiter, exists := a.limiters[ip]
754 if !exists {
755 limiter = rate.NewLimiter(rate.Limit(a.CLI.RateLimitPerSecond), a.CLI.RateLimitBurst)
756 a.limiters[ip] = limiter
757 }
758
759 return limiter
760}
761
762func NewWebsocketTracker(maxConns int) *WebsocketTracker {
763 return &WebsocketTracker{
764 connections: make(map[string]int),
765 maxConnsPerIP: maxConns,
766 }
767}
768
769func (t *WebsocketTracker) AddConnection(ip string) bool {
770 t.mu.Lock()
771 defer t.mu.Unlock()
772
773 count := t.connections[ip]
774
775 if count >= t.maxConnsPerIP {
776 return false
777 }
778
779 t.connections[ip] = count + 1
780 return true
781}
782
783func (t *WebsocketTracker) RemoveConnection(ip string) {
784 t.mu.Lock()
785 defer t.mu.Unlock()
786
787 count := t.connections[ip]
788 if count > 0 {
789 t.connections[ip] = count - 1
790 }
791
792 if t.connections[ip] == 0 {
793 delete(t.connections, ip)
794 }
795}