Live video on the AT Protocol
1package api
2
3import (
4 "context"
5 "encoding/json"
6 "errors"
7 "fmt"
8 "io"
9 "log/slog"
10 "net"
11 "net/http"
12 "net/http/httputil"
13 "net/url"
14 "os"
15 "regexp"
16 "strconv"
17 "strings"
18 "sync"
19 "time"
20
21 "github.com/NYTimes/gziphandler"
22 "github.com/bluesky-social/indigo/api/bsky"
23 "github.com/google/uuid"
24 "github.com/julienschmidt/httprouter"
25 "github.com/labstack/echo/v4"
26 "github.com/rs/cors"
27 sloghttp "github.com/samber/slog-http"
28 "golang.org/x/time/rate"
29
30 "github.com/streamplace/oatproxy/pkg/oatproxy"
31 "stream.place/streamplace/js/app"
32 "stream.place/streamplace/pkg/atproto"
33 "stream.place/streamplace/pkg/bus"
34 "stream.place/streamplace/pkg/config"
35 "stream.place/streamplace/pkg/crypto/signers/eip712"
36 "stream.place/streamplace/pkg/director"
37 apierrors "stream.place/streamplace/pkg/errors"
38 "stream.place/streamplace/pkg/linking"
39 "stream.place/streamplace/pkg/localdb"
40 "stream.place/streamplace/pkg/log"
41 "stream.place/streamplace/pkg/media"
42 "stream.place/streamplace/pkg/mist/mistconfig"
43 "stream.place/streamplace/pkg/model"
44 "stream.place/streamplace/pkg/notifications"
45 "stream.place/streamplace/pkg/spxrpc"
46 "stream.place/streamplace/pkg/statedb"
47 "stream.place/streamplace/pkg/streamplace"
48
49 metrics "github.com/slok/go-http-metrics/metrics/prometheus"
50 "github.com/slok/go-http-metrics/middleware"
51 echomiddleware "github.com/slok/go-http-metrics/middleware/echo"
52 httproutermiddleware "github.com/slok/go-http-metrics/middleware/httprouter"
53 middlewarestd "github.com/slok/go-http-metrics/middleware/std"
54)
55
// StreamplaceAPI bundles the dependencies and in-memory state shared by the
// HTTP handlers defined on it. Instances are built by MakeStreamplaceAPI.
//
// Mimes is loaded from the Updater at construction time and consulted by
// FileHandler; XRPCServer is populated when Handler is called.
type StreamplaceAPI struct {
	CLI              *config.CLI
	Model            model.Model
	StatefulDB       *statedb.StatefulDB
	LocalDB          localdb.LocalDB
	Updater          *Updater
	Signer           *eip712.EIP712Signer
	Mimes            map[string]string
	FirebaseNotifier notifications.FirebaseNotifier
	MediaManager     *media.MediaManager
	MediaSigner      media.MediaSigner
	XRPCServer       *spxrpc.Server
	// not thread-safe yet
	Aliases  map[string]string
	Bus      *bus.Bus
	ATSync   *atproto.ATProtoSynchronizer
	Director *director.Director

	// connTracker is created with cli.RateLimitWebsocket as its per-IP cap.
	connTracker *WebsocketTracker

	// limiters holds one rate limiter per client IP; getLimiter accesses it
	// under limitersMu.
	// NOTE(review): SignerCache/SignerCacheMu are not used in the visible part
	// of this file — presumably a signer cache guarded by its mutex; confirm.
	limiters      map[string]*rate.Limiter
	limitersMu    sync.Mutex
	SignerCache   map[string]media.MediaSigner
	SignerCacheMu sync.Mutex
	op            *oatproxy.OATProxy

	// override tls port for http redirect server if we're using systemd file descriptors
	HTTPRedirectTLSPort *int
	sessions            map[string]map[string]time.Time
	sessionsLock        sync.RWMutex

	// NOTE(review): the rtmp* fields are only initialized in this part of the
	// file; their users live elsewhere.
	rtmpSessions             map[string]*media.RTMPSession
	rtmpSessionsLock         sync.Mutex
	rtmpInternalPlaybackAddr string
}
91
// WebsocketTracker counts open connections per client IP and enforces an
// upper bound on them. Use NewWebsocketTracker to create one so the map is
// initialized.
type WebsocketTracker struct {
	// connections maps an IP string to its current connection count.
	connections map[string]int
	// maxConnsPerIP is the limit checked by AddConnection.
	maxConnsPerIP int
	// mu guards connections; AddConnection and RemoveConnection take it exclusively.
	mu sync.RWMutex
}
97
98func MakeStreamplaceAPI(cli *config.CLI, mod model.Model, statefulDB *statedb.StatefulDB, noter notifications.FirebaseNotifier, mm *media.MediaManager, ms media.MediaSigner, bus *bus.Bus, atsync *atproto.ATProtoSynchronizer, d *director.Director, op *oatproxy.OATProxy, ldb localdb.LocalDB) (*StreamplaceAPI, error) {
99 updater, err := PrepareUpdater(cli)
100 if err != nil {
101 return nil, err
102 }
103 a := &StreamplaceAPI{CLI: cli,
104 Model: mod,
105 StatefulDB: statefulDB,
106 Updater: updater,
107 FirebaseNotifier: noter,
108 MediaManager: mm,
109 MediaSigner: ms,
110 Aliases: map[string]string{},
111 Bus: bus,
112 ATSync: atsync,
113 Director: d,
114 connTracker: NewWebsocketTracker(cli.RateLimitWebsocket),
115 limiters: make(map[string]*rate.Limiter),
116 SignerCache: make(map[string]media.MediaSigner),
117 op: op,
118 sessions: make(map[string]map[string]time.Time),
119 sessionsLock: sync.RWMutex{},
120 rtmpSessions: make(map[string]*media.RTMPSession),
121 rtmpSessionsLock: sync.Mutex{},
122 LocalDB: ldb,
123 }
124 a.Mimes, err = updater.GetMimes()
125 if err != nil {
126 return nil, err
127 }
128 return a, nil
129}
130
// AppHostingFS wraps an http.FileSystem so that every failed Open is reported
// as ErrorIndex, regardless of the underlying cause.
type AppHostingFS struct {
	http.FileSystem
}

// ErrorIndex is returned by AppHostingFS.Open when the requested file could
// not be opened; callers use it as the cue to serve index.html instead.
var ErrorIndex = errors.New("not found, use index.html")

// Open returns the named file from the wrapped file system, or ErrorIndex if
// the wrapped file system fails to open it for any reason.
func (fs AppHostingFS) Open(name string) (http.File, error) {
	if opened, err := fs.FileSystem.Open(name); err == nil {
		return opened, nil
	}
	return nil, ErrorIndex
}
144
145// api/playback/iame.li/webrtc?rendition=source
146// api/playback/iame.li/stream.mp4?rendition=source
147// api/playback/iame.li/stream.webm?rendition=source
148// api/playback/iame.li/hls/index.m3u8
149// api/playback/iame.li/hls/source/stream.m3u8
150// api/playback/iame.li/hls/source/000000000000.ts
151
152func (a *StreamplaceAPI) Handler(ctx context.Context) (http.Handler, error) {
153
154 mdlw := middleware.New(middleware.Config{
155 Recorder: metrics.NewRecorder(metrics.Config{}),
156 })
157 var xrpc http.Handler
158 xrpc, err := spxrpc.NewServer(ctx, a.CLI, a.Model, a.StatefulDB, a.op, mdlw, a.ATSync, a.Bus, a.LocalDB, a.MediaManager, a.Aliases)
159 if err != nil {
160 return nil, err
161 }
162 a.XRPCServer = xrpc.(*spxrpc.Server)
163 router := httprouter.New()
164
165 // Create our middleware factory with the default settings.
166
167 a.op.Echo.Use(echomiddleware.Handler("", mdlw))
168
169 // r.GET("/test/:id", httproutermiddleware.Handler("/test/:id", h1, mdlw))
170
171 addHandle := func(router *httprouter.Router, method, path string, handler httprouter.Handle) {
172 router.Handle(method, path, httproutermiddleware.Handler(path, handler, mdlw))
173 }
174 addFunc := func(router *httprouter.Router, method, path string, handler http.HandlerFunc) {
175 router.Handler(method, path, middlewarestd.Handler(path, mdlw, handler))
176 }
177
178 router.Handler("GET", "/oauth/*anything", a.op.Handler())
179 router.Handler("POST", "/oauth/*anything", a.op.Handler())
180 router.Handler("GET", "/.well-known/oauth-authorization-server", a.op.Handler())
181 router.Handler("GET", "/.well-known/oauth-protected-resource", a.op.Handler())
182 router.Handler("GET", "/.well-known/apple-app-site-association", a.HandleAppleAppSiteAssociation(ctx))
183 router.Handler("GET", "/.well-known/assetlinks.json", a.HandleAndroidAssetLinks(ctx))
184 apiRouter := httprouter.New()
185 addFunc(apiRouter, "POST", "/api/notification", a.HandleNotification(ctx))
186 // old clients
187 addFunc(router, "GET", "/app-updates", a.HandleAppUpdates(ctx))
188 // new ones
189 addFunc(apiRouter, "GET", "/api/manifest", a.HandleAppUpdates(ctx))
190 addHandle(apiRouter, "GET", "/api/desktop-updates/:platform/:architecture/:version/:buildTime/:file", a.HandleDesktopUpdates(ctx))
191 addHandle(apiRouter, "POST", "/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s"))
192 addHandle(apiRouter, "OPTIONS", "/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s"))
193 addHandle(apiRouter, "DELETE", "/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s"))
194 addFunc(apiRouter, "POST", "/api/segment", a.HandleSegment(ctx))
195 addFunc(apiRouter, "GET", "/api/healthz", a.HandleHealthz(ctx))
196 addHandle(apiRouter, "GET", "/api/playback/:user/hls/*file", a.HandleHLSPlayback(ctx))
197 // they're jpegs now
198 addHandle(apiRouter, "GET", "/api/playback/:user/stream.jpg", a.HandleThumbnailPlayback(ctx))
199 // this one is actually a jpeg (used previously and shouldn't remove for historical reasons)
200 addHandle(apiRouter, "GET", "/api/playback/:user/stream.png", a.HandleThumbnailPlayback(ctx))
201 addHandle(apiRouter, "GET", "/api/app-return/*anything", a.HandleAppReturn(ctx))
202 addHandle(apiRouter, "POST", "/api/playback/:user/webrtc", a.HandleWebRTCPlayback(ctx))
203 addHandle(apiRouter, "POST", "/api/ingest/webrtc", a.HandleWebRTCIngest(ctx))
204 addHandle(apiRouter, "POST", "/api/ingest/webrtc/:key", a.HandleWebRTCIngest(ctx))
205 addHandle(apiRouter, "POST", "/api/player-event", a.HandlePlayerEvent(ctx))
206 addHandle(apiRouter, "GET", "/api/chat/:repoDID", a.HandleChat(ctx))
207 addHandle(apiRouter, "GET", "/api/websocket/:repoDID", a.HandleWebsocket(ctx))
208 addHandle(apiRouter, "GET", "/api/livestream/:repoDID", a.HandleLivestream(ctx))
209 addHandle(apiRouter, "GET", "/api/bluesky/resolve/:handle", a.HandleBlueskyResolve(ctx))
210 addHandle(apiRouter, "GET", "/api/view-count/:user", a.HandleViewCount(ctx))
211 addHandle(apiRouter, "GET", "/api/clip/:user/:file", a.HandleClip(ctx))
212 apiRouter.NotFound = a.HandleAPI404(ctx)
213 apiRouterHandler := a.RateLimitMiddleware(ctx)(apiRouter)
214 xrpcHandler := a.RateLimitMiddleware(ctx)(xrpc)
215 router.Handler("GET", "/api/*resource", apiRouterHandler)
216 router.Handler("POST", "/api/*resource", apiRouterHandler)
217 router.Handler("PUT", "/api/*resource", apiRouterHandler)
218 router.Handler("PATCH", "/api/*resource", apiRouterHandler)
219 router.Handler("DELETE", "/api/*resource", apiRouterHandler)
220 router.Handler("GET", "/xrpc/*resource", xrpcHandler)
221 router.Handler("POST", "/xrpc/*resource", xrpcHandler)
222 router.Handler("PUT", "/xrpc/*resource", xrpcHandler)
223 router.Handler("PATCH", "/xrpc/*resource", xrpcHandler)
224 router.Handler("DELETE", "/xrpc/*resource", xrpcHandler)
225 // i wonder if there's a better way to do this?
226 router.GET("/favicon.ico", func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
227 err := a.XRPCServer.HandleFaviconICO(echo.New().NewContext(r, w))
228 if err != nil {
229 log.Error(ctx, "error handling favicon.ico", "error", err)
230 w.WriteHeader(500)
231 return
232 }
233 })
234 router.GET("/.well-known/did.json", a.HandleDidJSON(ctx))
235 router.GET("/.well-known/atproto-did", a.HandleAtprotoDID(ctx))
236 router.GET("/dl/*params", a.HandleAppDownload(ctx))
237 router.POST("/", a.HandleWebRTCIngest(ctx))
238 if a.CLI.FrontendProxy != "" {
239 u, err := url.Parse(a.CLI.FrontendProxy)
240 if err != nil {
241 return nil, err
242 }
243 log.Warn(ctx, "using frontend proxy instead of bundled frontend", "destination", a.CLI.FrontendProxy)
244 router.NotFound = &httputil.ReverseProxy{
245 Rewrite: func(r *httputil.ProxyRequest) {
246 // workaround for Expo disliking serving requests from 127.0.0.1 instead of localhost
247 // we need to use 127.0.0.1 because the atproto oauth client requires it
248 r.Out.Header.Set("Origin", u.String())
249 r.SetURL(u)
250 },
251 }
252 } else {
253 files, err := app.Files()
254 if err != nil {
255 return nil, err
256 }
257 index, err := files.Open("index.html")
258 if err != nil {
259 return nil, err
260 }
261 bs, err := io.ReadAll(index)
262 if err != nil {
263 return nil, err
264 }
265 linker, err := linking.NewLinker(ctx, bs, a.StatefulDB, a.CLI)
266 if err != nil {
267 return nil, err
268 }
269 linkingHandler, err := a.NotFoundLinkingHandler(ctx, linker)
270 if err != nil {
271 return nil, err
272 }
273 router.NotFound = middlewarestd.Handler("/*static", mdlw, linkingHandler)
274 }
275 // needed because the WebRTC handler issues 405s from / otherwise
276 router.GET("/", func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
277 router.NotFound.ServeHTTP(w, r)
278 })
279 handler := sloghttp.Recovery(router)
280 handler = cors.AllowAll().Handler(handler)
281 handler = sloghttp.New(slog.Default())(handler)
282 handler = a.RateLimitMiddleware(ctx)(handler)
283 redirectMiddleware, err := a.RedirectMiddleware()
284 if err != nil {
285 return nil, err
286 }
287 handler = redirectMiddleware(handler)
288
289 // this needs to be LAST so nothing else clobbers the context
290 handler = a.ContextMiddleware(ctx)(handler)
291
292 return handler, nil
293}
294func (a *StreamplaceAPI) ContextMiddleware(parentContext context.Context) func(next http.Handler) http.Handler {
295 return func(next http.Handler) http.Handler {
296 return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
297 uuid := uuid.New().String()
298 ctx := log.WithLogValues(parentContext, "requestID", uuid, "method", r.Method, "path", r.URL.Path)
299 r = r.WithContext(ctx)
300 next.ServeHTTP(w, r)
301 })
302 }
303}
// copyHeader appends every value of every header in src to dst, skipping
// headers whose name starts with "Access-Control" because CORS headers are
// set by this server itself. Existing values in dst are kept.
func copyHeader(dst, src http.Header) {
	for name, values := range src {
		if !strings.HasPrefix(name, "Access-Control") {
			for i := range values {
				dst.Add(name, values[i])
			}
		}
	}
}
315
316// handler that takes care of static files and otherwise returns the index.html with the correct link card data
317func (a *StreamplaceAPI) NotFoundLinkingHandler(ctx context.Context, linker *linking.Linker) (http.HandlerFunc, error) {
318 files, err := app.Files()
319 if err != nil {
320 return nil, err
321 }
322 fs := AppHostingFS{http.FS(files)}
323
324 fileHandler := a.FileHandler(ctx, http.FileServer(fs))
325 defaultHandler := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
326 f := strings.TrimPrefix(req.URL.Path, "/")
327 // under docs we need the index.html suffix due to astro rendering
328 if strings.HasPrefix(req.URL.Path, "/docs") && strings.HasSuffix(req.URL.Path, "/") {
329 f += "index.html"
330 }
331 _, err := fs.Open(f)
332 if err == nil {
333 fileHandler.ServeHTTP(w, req)
334 return
335 }
336 if errors.Is(err, ErrorIndex) || f == "" {
337 bs, err := linker.GenerateDefaultCard(ctx, req.URL, a.CLI.SentryDSN)
338 if err != nil {
339 log.Error(ctx, "error generating default card", "error", err)
340 }
341 w.Header().Set("Content-Type", "text/html")
342 if _, err := w.Write(bs); err != nil {
343 log.Error(ctx, "error writing response", "error", err)
344 }
345 } else {
346 log.Warn(ctx, "error opening file", "error", err)
347 apierrors.WriteHTTPInternalServerError(w, "file not found", err)
348 }
349 })
350 return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
351 proto := "http"
352 if req.TLS != nil {
353 proto = "https"
354 }
355 fwProto := req.Header.Get("x-forwarded-proto")
356 if fwProto != "" {
357 proto = fwProto
358 }
359 req.URL.Host = req.Host
360 req.URL.Scheme = proto
361 maybeHandle := strings.TrimPrefix(req.URL.Path, "/")
362 // quick check for things that aren't valid handles/dids
363 if strings.ContainsAny(maybeHandle, "/_") {
364 defaultHandler.ServeHTTP(w, req)
365 return
366 }
367 repo, err := a.Model.GetRepoByHandleOrDID(maybeHandle)
368 if err != nil || repo == nil {
369 log.Error(ctx, "no repo found", "maybeHandle", maybeHandle)
370 defaultHandler.ServeHTTP(w, req)
371 return
372 }
373 ls, err := a.Model.GetLatestLivestreamForRepo(repo.DID)
374 if err != nil || ls == nil {
375 log.Error(ctx, "no livestream found", "repoDID", repo.DID)
376 defaultHandler.ServeHTTP(w, req)
377 return
378 }
379 lsv, err := ls.ToLivestreamView()
380 if err != nil || lsv == nil {
381 log.Error(ctx, "no livestream view found", "repoDID", repo.DID)
382 defaultHandler.ServeHTTP(w, req)
383 return
384 }
385 bs, err := linker.GenerateStreamerCard(ctx, req.URL, lsv, a.CLI.SentryDSN)
386 if err != nil {
387 log.Error(ctx, "error generating html", "error", err)
388 defaultHandler.ServeHTTP(w, req)
389 return
390 }
391 w.Header().Set("Content-Type", "text/html")
392 if _, err := w.Write(bs); err != nil {
393 log.Error(ctx, "error writing response", "error", err)
394 }
395 }), nil
396}
397
398func (a *StreamplaceAPI) MistProxyHandler(ctx context.Context, tmpl string) httprouter.Handle {
399 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
400 if !a.CLI.HasMist() {
401 apierrors.WriteHTTPNotImplemented(w, "Playback only on the Linux version for now", nil)
402 return
403 }
404 stream := params.ByName("stream")
405 if stream == "" {
406 apierrors.WriteHTTPBadRequest(w, "missing stream in request", nil)
407 return
408 }
409
410 fullstream := fmt.Sprintf("%s+%s", mistconfig.StreamName, stream)
411 prefix := fmt.Sprintf(tmpl, fullstream)
412 resource := params.ByName("resource")
413
414 // path := strings.TrimPrefix(req.URL.EscapedPath(), "/api")
415
416 client := &http.Client{}
417 req.URL = &url.URL{
418 Scheme: "http",
419 Host: fmt.Sprintf("127.0.0.1:%d", a.CLI.MistHTTPPort),
420 Path: fmt.Sprintf("%s%s", prefix, resource),
421 RawQuery: req.URL.RawQuery,
422 }
423
424 //http: Request.RequestURI can't be set in client requests.
425 //http://golang.org/src/pkg/net/http/client.go
426 req.RequestURI = ""
427
428 resp, err := client.Do(req)
429 if err != nil {
430 apierrors.WriteHTTPInternalServerError(w, "error connecting to mist", err)
431 return
432 }
433 defer resp.Body.Close()
434
435 copyHeader(w.Header(), resp.Header)
436 w.WriteHeader(resp.StatusCode)
437 if _, err := io.Copy(w, resp.Body); err != nil {
438 log.Error(ctx, "error writing response", "error", err)
439 }
440 }
441}
442
443func (a *StreamplaceAPI) FileHandler(ctx context.Context, fs http.Handler) http.HandlerFunc {
444 return func(w http.ResponseWriter, req *http.Request) {
445 noslash := req.URL.Path[1:]
446 ct, ok := a.Mimes[noslash]
447 if ok {
448 w.Header().Set("content-type", ct)
449 }
450 fs.ServeHTTP(w, req)
451 }
452}
453
454func (a *StreamplaceAPI) RedirectHandler(ctx context.Context) (http.Handler, error) {
455 var tlsPort string
456 var err error
457 if a.HTTPRedirectTLSPort != nil {
458 tlsPort = fmt.Sprintf("%d", *a.HTTPRedirectTLSPort)
459 } else {
460 _, tlsPort, err = net.SplitHostPort(a.CLI.HTTPSAddr)
461 if err != nil {
462 return nil, err
463 }
464 }
465 handleRedirect := func(w http.ResponseWriter, req *http.Request) {
466 host, _, err := net.SplitHostPort(req.Host)
467 if err != nil {
468 host = req.Host
469 }
470 u := req.URL
471 if tlsPort == "443" {
472 u.Host = host
473 } else {
474 u.Host = net.JoinHostPort(host, tlsPort)
475 }
476 u.Scheme = "https"
477 http.Redirect(w, req, u.String(), http.StatusTemporaryRedirect)
478 }
479 mux := http.NewServeMux()
480 mux.HandleFunc("/", handleRedirect)
481 return mux, nil
482}
483
// NotificationPayload is the JSON request body decoded by HandleNotification.
// Both fields are passed to StatefulDB.CreateNotification; a non-empty RepoDID
// additionally triggers a background repo sync.
type NotificationPayload struct {
	Token   string `json:"token"`
	RepoDID string `json:"repoDID"`
}
488
489func (a *StreamplaceAPI) HandleAPI404(ctx context.Context) http.HandlerFunc {
490 return func(w http.ResponseWriter, req *http.Request) {
491 w.WriteHeader(404)
492 }
493}
494
495func (a *StreamplaceAPI) HandleNotification(ctx context.Context) http.HandlerFunc {
496 return func(w http.ResponseWriter, req *http.Request) {
497 payload, err := io.ReadAll(req.Body)
498 if err != nil {
499 log.Log(ctx, "error reading notification create", "error", err)
500 w.WriteHeader(400)
501 return
502 }
503 n := NotificationPayload{}
504 err = json.Unmarshal(payload, &n)
505 if err != nil {
506 log.Log(ctx, "error unmarshalling notification create", "error", err)
507 w.WriteHeader(400)
508 return
509 }
510 err = a.StatefulDB.CreateNotification(n.Token, n.RepoDID)
511 if err != nil {
512 log.Log(ctx, "error creating notification", "error", err)
513 w.WriteHeader(400)
514 return
515 }
516 log.Log(ctx, "successfully created notification", "token", n.Token)
517 w.WriteHeader(200)
518 if n.RepoDID != "" {
519 go func() {
520 _, err := a.ATSync.SyncBlueskyRepo(ctx, n.RepoDID, a.Model)
521 if err != nil {
522 log.Error(ctx, "error syncing bluesky repo after notification creation", "error", err)
523 }
524 }()
525 }
526 }
527}
528
529func (a *StreamplaceAPI) HandleSegment(ctx context.Context) http.HandlerFunc {
530 return func(w http.ResponseWriter, req *http.Request) {
531 err := a.MediaManager.ValidateMP4(ctx, req.Body, false)
532 if err != nil {
533 apierrors.WriteHTTPBadRequest(w, "could not ingest segment", err)
534 return
535 }
536 w.WriteHeader(200)
537 }
538}
539
540func (a *StreamplaceAPI) HandlePlayerEvent(ctx context.Context) httprouter.Handle {
541 return func(w http.ResponseWriter, req *http.Request, p httprouter.Params) {
542 if !a.CLI.PlayerTelemetry {
543 w.WriteHeader(200)
544 return
545 }
546 var event model.PlayerEventAPI
547 if err := json.NewDecoder(req.Body).Decode(&event); err != nil {
548 apierrors.WriteHTTPBadRequest(w, "could not decode JSON body", err)
549 return
550 }
551 err := a.Model.CreatePlayerEvent(event)
552 if err != nil {
553 apierrors.WriteHTTPBadRequest(w, "could not create player event", err)
554 return
555 }
556 w.WriteHeader(201)
557 }
558}
559
560func (a *StreamplaceAPI) HandleViewCount(ctx context.Context) httprouter.Handle {
561 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
562 user := params.ByName("user")
563 if user == "" {
564 apierrors.WriteHTTPBadRequest(w, "user required", nil)
565 return
566 }
567 user, err := a.NormalizeUser(ctx, user)
568 if err != nil {
569 apierrors.WriteHTTPNotFound(w, "user not found", err)
570 return
571 }
572 count := a.Bus.GetViewerCount(user)
573 bs, err := json.Marshal(streamplace.Livestream_ViewerCount{Count: int64(count), LexiconTypeID: "place.stream.livestream#viewerCount"})
574 if err != nil {
575 apierrors.WriteHTTPInternalServerError(w, "could not marshal view count", err)
576 return
577 }
578 if _, err := w.Write(bs); err != nil {
579 log.Error(ctx, "error writing response", "error", err)
580 }
581 }
582}
583
584func (a *StreamplaceAPI) HandleBlueskyResolve(ctx context.Context) httprouter.Handle {
585 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
586 log.Log(ctx, "got bluesky notification", "params", params)
587 key, err := a.ATSync.SyncBlueskyRepo(ctx, params.ByName("handle"), a.Model)
588 if err != nil {
589 apierrors.WriteHTTPInternalServerError(w, "could not resolve streamplace key", err)
590 return
591 }
592 signingKeys, err := a.Model.GetSigningKeysForRepo(key.DID)
593 if err != nil {
594 apierrors.WriteHTTPInternalServerError(w, "could not get signing keys", err)
595 return
596 }
597 bs, err := json.Marshal(signingKeys)
598 if err != nil {
599 apierrors.WriteHTTPInternalServerError(w, "could not marshal signing keys", err)
600 return
601 }
602 if _, err := w.Write(bs); err != nil {
603 log.Error(ctx, "error writing response", "error", err)
604 }
605 }
606}
607
// ChatResponse pairs a Bluesky feed post with the repo it belongs to and the
// post's CID, for JSON serialization.
// NOTE(review): not referenced in the visible part of this file — presumably
// used by the chat/websocket handlers defined elsewhere; confirm.
type ChatResponse struct {
	Post *bsky.FeedPost `json:"post"`
	Repo *model.Repo    `json:"repo"`
	CID  string         `json:"cid"`
}
613
614func (a *StreamplaceAPI) HandleChat(ctx context.Context) httprouter.Handle {
615 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
616 user := params.ByName("repoDID")
617 if user == "" {
618 apierrors.WriteHTTPBadRequest(w, "user required", nil)
619 return
620 }
621 repoDID, err := a.NormalizeUser(ctx, user)
622 if err != nil {
623 apierrors.WriteHTTPNotFound(w, "user not found", err)
624 return
625 }
626 replies, err := a.Model.GetReplies(repoDID)
627 if err != nil {
628 apierrors.WriteHTTPInternalServerError(w, "could not get replies", err)
629 return
630 }
631 bs, err := json.Marshal(replies)
632 if err != nil {
633 apierrors.WriteHTTPInternalServerError(w, "could not marshal replies", err)
634 return
635 }
636 w.Header().Set("Content-Type", "application/json")
637 if _, err := w.Write(bs); err != nil {
638 log.Error(ctx, "error writing response", "error", err)
639 }
640 }
641}
642
643func (a *StreamplaceAPI) HandleLivestream(ctx context.Context) httprouter.Handle {
644 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
645 user := params.ByName("repoDID")
646 if user == "" {
647 apierrors.WriteHTTPBadRequest(w, "user required", nil)
648 return
649 }
650 repoDID, err := a.NormalizeUser(ctx, user)
651 if err != nil {
652 apierrors.WriteHTTPNotFound(w, "user not found", err)
653 return
654 }
655 livestream, err := a.Model.GetLatestLivestreamForRepo(repoDID)
656 if err != nil {
657 apierrors.WriteHTTPInternalServerError(w, "could not get livestream", err)
658 return
659 }
660 if livestream == nil {
661 apierrors.WriteHTTPNotFound(w, "no livestream found", nil)
662 return
663 }
664
665 doc, err := livestream.ToLivestreamView()
666 if err != nil {
667 apierrors.WriteHTTPInternalServerError(w, "could not marshal livestream", err)
668 return
669 }
670
671 if livestream == nil {
672 apierrors.WriteHTTPNotFound(w, "no livestream found", nil)
673 return
674 }
675
676 bs, err := json.Marshal(doc)
677 if err != nil {
678 apierrors.WriteHTTPInternalServerError(w, "could not marshal livestream", err)
679 return
680 }
681 w.Header().Set("Content-Type", "application/json")
682 if _, err := w.Write(bs); err != nil {
683 log.Error(ctx, "error writing response", "error", err)
684 }
685 }
686}
687
688func (a *StreamplaceAPI) RateLimitMiddleware(ctx context.Context) func(http.Handler) http.Handler {
689 return func(next http.Handler) http.Handler {
690 return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
691 ip, _, err := net.SplitHostPort(req.RemoteAddr)
692 if err != nil {
693 ip = req.RemoteAddr
694 }
695
696 if a.CLI.RateLimitPerSecond > 0 {
697 limiter := a.getLimiter(ip)
698
699 if !limiter.Allow() {
700 log.Warn(ctx, "rate limit exceeded", "ip", ip, "path", req.URL.Path)
701 apierrors.WriteHTTPTooManyRequests(w, "rate limit exceeded")
702 return
703 }
704 }
705
706 next.ServeHTTP(w, req)
707 })
708 }
709}
710
// redirectRule is one compiled entry of CLI.Redirects as prepared by
// RedirectMiddleware.
type redirectRule struct {
	// re is matched against the request path.
	re *regexp.Regexp
	// toURL is the parsed redirect destination.
	toURL *url.URL
	// rawTo is the destination exactly as configured, before parsing.
	rawTo string
}
716
717// RedirectMiddleware returns a middleware that handles path redirects according to CLI.Redirects
718func (a *StreamplaceAPI) RedirectMiddleware() (func(http.Handler) http.Handler, error) {
719 var redirectRules []redirectRule
720 for from, to := range a.CLI.Redirects {
721 re, err := regexp.Compile(from)
722 if err != nil {
723 return nil, fmt.Errorf("invalid redirect pattern: %s (regex error: %w)", from, err)
724 }
725 toBase, err := url.Parse(to)
726 if err != nil {
727 return nil, fmt.Errorf("invalid redirect destination: %s", to)
728 }
729 redirectRules = append(redirectRules, redirectRule{re: re, toURL: toBase, rawTo: to})
730 }
731
732 return func(next http.Handler) http.Handler {
733 return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
734 // Only match redirections for GET requests
735 if r.Method == http.MethodGet {
736 for _, rule := range redirectRules {
737 if rule.re.MatchString(r.URL.Path) {
738 // Make new URL by copying base and setting url query param
739 redirectURL := *rule.toURL
740 q := redirectURL.Query()
741 q.Set("url", r.URL.String())
742 redirectURL.RawQuery = q.Encode()
743 http.Redirect(w, r, redirectURL.String(), http.StatusTemporaryRedirect)
744 return
745 }
746 }
747 }
748 next.ServeHTTP(w, r)
749 })
750 }, nil
751}
752
753// helper for getting a listener from a systemd file descriptor
754func getListenerFromFD(fdName string) (net.Listener, error) {
755 log.Debug(context.TODO(), "getting listener from fd", "fdName", fdName, "LISTEN_PID", os.Getenv("LISTEN_PID"), "LISTEN_FDNAMES", os.Getenv("LISTEN_FDNAMES"))
756 if os.Getenv("LISTEN_PID") == strconv.Itoa(os.Getpid()) {
757 names := strings.Split(os.Getenv("LISTEN_FDNAMES"), ":")
758 for i, name := range names {
759 if name == fdName {
760 log.Warn(context.TODO(), "using systemd file descriptor", "fdName", fdName, "fdIndex", i+3)
761 f1 := os.NewFile(uintptr(i+3), fdName)
762 return net.FileListener(f1)
763 }
764 }
765 }
766 return nil, nil
767}
768
769func (a *StreamplaceAPI) ServeHTTP(ctx context.Context) error {
770 handler, err := a.Handler(ctx)
771 if err != nil {
772 return err
773 }
774 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error {
775 ln, err := getListenerFromFD("http")
776 if err != nil {
777 return err
778 }
779 if ln == nil {
780 ln, err = net.Listen("tcp", a.CLI.HTTPAddr)
781 if err != nil {
782 return err
783 }
784 } else {
785 log.Warn(ctx, "api server listening for http over systemd socket", "addr", ln.Addr())
786 }
787 log.Log(ctx, "http server starting", "addr", ln.Addr())
788 return s.Serve(ln)
789 })
790}
791
792func (a *StreamplaceAPI) ServeHTTPRedirect(ctx context.Context) error {
793 handler, err := a.RedirectHandler(ctx)
794 if err != nil {
795 return err
796 }
797 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error {
798 ln, err := getListenerFromFD("http")
799 if err != nil {
800 return err
801 }
802 if ln == nil {
803 ln, err = net.Listen("tcp", a.CLI.HTTPAddr)
804 if err != nil {
805 return err
806 }
807 } else {
808 log.Warn(ctx, "http tls redirect server listening for http over systemd socket", "addr", ln.Addr())
809 }
810 log.Log(ctx, "http tls redirect server starting", "addr", ln.Addr())
811 return s.Serve(ln)
812 })
813}
814
815func (a *StreamplaceAPI) ServeHTTPS(ctx context.Context) error {
816 handler, err := a.Handler(ctx)
817 if err != nil {
818 return err
819 }
820 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error {
821 ln, err := getListenerFromFD("https")
822 if err != nil {
823 return err
824 }
825 if ln == nil {
826 ln, err = net.Listen("tcp", a.CLI.HTTPSAddr)
827 if err != nil {
828 return err
829 }
830 } else {
831 // tell the redirect handler we're using systemd and they should go to 443
832 port443 := 443
833 a.HTTPRedirectTLSPort = &port443
834 log.Warn(ctx, "https server listening for https over systemd socket", "addr", ln.Addr())
835 }
836 log.Log(ctx, "https server starting",
837 "addr", ln.Addr(),
838 "certPath", a.CLI.TLSCertPath,
839 "keyPath", a.CLI.TLSKeyPath,
840 )
841 return s.ServeTLS(ln, a.CLI.TLSCertPath, a.CLI.TLSKeyPath)
842 })
843}
844
845func (a *StreamplaceAPI) ServerWithShutdown(ctx context.Context, handler http.Handler, serve func(*http.Server) error) error {
846 ctx, cancel := context.WithCancel(ctx)
847 handler = gziphandler.GzipHandler(handler)
848 server := http.Server{Handler: handler}
849 var serveErr error
850 go func() {
851 serveErr = serve(&server)
852 cancel()
853 }()
854 <-ctx.Done()
855 if serveErr != nil {
856 return fmt.Errorf("error in http server: %w", serveErr)
857 }
858
859 ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
860 defer cancel()
861 return server.Shutdown(ctx)
862}
863
864func (a *StreamplaceAPI) HandleHealthz(ctx context.Context) http.HandlerFunc {
865 return func(w http.ResponseWriter, req *http.Request) {
866 w.WriteHeader(200)
867 }
868}
869
870func (a *StreamplaceAPI) getLimiter(ip string) *rate.Limiter {
871 a.limitersMu.Lock()
872 defer a.limitersMu.Unlock()
873
874 limiter, exists := a.limiters[ip]
875 if !exists {
876 limiter = rate.NewLimiter(rate.Limit(a.CLI.RateLimitPerSecond), a.CLI.RateLimitBurst)
877 a.limiters[ip] = limiter
878 }
879
880 return limiter
881}
882
883func (a *StreamplaceAPI) HandleClip(ctx context.Context) httprouter.Handle {
884 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
885 user := params.ByName("user")
886 file := params.ByName("file")
887 if user == "" || file == "" {
888 apierrors.WriteHTTPBadRequest(w, "user and file required", nil)
889 return
890 }
891 user, err := a.NormalizeUser(ctx, user)
892 if err != nil {
893 apierrors.WriteHTTPNotFound(w, "user not found", err)
894 return
895 }
896 fPath := []string{user, "clips", file}
897 exists, err := a.CLI.DataFileExists(fPath)
898 if err != nil {
899 apierrors.WriteHTTPInternalServerError(w, "could not check if file exists", err)
900 return
901 }
902 if !exists {
903 apierrors.WriteHTTPNotFound(w, "file not found", nil)
904 return
905 }
906 fd, err := os.Open(a.CLI.DataFilePath(fPath))
907 if err != nil {
908 apierrors.WriteHTTPInternalServerError(w, "could not open file", err)
909 return
910 }
911 defer fd.Close()
912 w.Header().Set("Content-Type", "video/mp4")
913 if _, err := io.Copy(w, fd); err != nil {
914 log.Error(ctx, "error writing response", "error", err)
915 }
916 }
917}
918
919func NewWebsocketTracker(maxConns int) *WebsocketTracker {
920 return &WebsocketTracker{
921 connections: make(map[string]int),
922 maxConnsPerIP: maxConns,
923 }
924}
925
926func (t *WebsocketTracker) AddConnection(ip string) bool {
927 t.mu.Lock()
928 defer t.mu.Unlock()
929
930 count := t.connections[ip]
931
932 if count >= t.maxConnsPerIP {
933 return false
934 }
935
936 t.connections[ip] = count + 1
937 return true
938}
939
940func (t *WebsocketTracker) RemoveConnection(ip string) {
941 t.mu.Lock()
942 defer t.mu.Unlock()
943
944 count := t.connections[ip]
945 if count > 0 {
946 t.connections[ip] = count - 1
947 }
948
949 if t.connections[ip] == 0 {
950 delete(t.connections, ip)
951 }
952}