// Live video on the AT Protocol
1package api
2
3import (
4 "context"
5 "encoding/json"
6 "errors"
7 "fmt"
8 "io"
9 "log/slog"
10 "net"
11 "net/http"
12 "net/http/httputil"
13 "net/url"
14 "os"
15 "strconv"
16 "strings"
17 "sync"
18 "time"
19
20 "github.com/NYTimes/gziphandler"
21 "github.com/bluesky-social/indigo/api/bsky"
22 "github.com/google/uuid"
23 "github.com/julienschmidt/httprouter"
24 "github.com/labstack/echo/v4"
25 "github.com/rs/cors"
26 sloghttp "github.com/samber/slog-http"
27 "golang.org/x/time/rate"
28
29 "github.com/streamplace/oatproxy/pkg/oatproxy"
30 "stream.place/streamplace/js/app"
31 "stream.place/streamplace/pkg/atproto"
32 "stream.place/streamplace/pkg/bus"
33 "stream.place/streamplace/pkg/config"
34 "stream.place/streamplace/pkg/crypto/signers/eip712"
35 "stream.place/streamplace/pkg/director"
36 apierrors "stream.place/streamplace/pkg/errors"
37 "stream.place/streamplace/pkg/linking"
38 "stream.place/streamplace/pkg/localdb"
39 "stream.place/streamplace/pkg/log"
40 "stream.place/streamplace/pkg/media"
41 "stream.place/streamplace/pkg/mist/mistconfig"
42 "stream.place/streamplace/pkg/model"
43 "stream.place/streamplace/pkg/notifications"
44 "stream.place/streamplace/pkg/spmetrics"
45 "stream.place/streamplace/pkg/spxrpc"
46 "stream.place/streamplace/pkg/statedb"
47 "stream.place/streamplace/pkg/streamplace"
48
49 metrics "github.com/slok/go-http-metrics/metrics/prometheus"
50 "github.com/slok/go-http-metrics/middleware"
51 echomiddleware "github.com/slok/go-http-metrics/middleware/echo"
52 httproutermiddleware "github.com/slok/go-http-metrics/middleware/httprouter"
53 middlewarestd "github.com/slok/go-http-metrics/middleware/std"
54)
55
56type StreamplaceAPI struct {
57 CLI *config.CLI
58 Model model.Model
59 StatefulDB *statedb.StatefulDB
60 LocalDB localdb.LocalDB
61 Updater *Updater
62 Signer *eip712.EIP712Signer
63 Mimes map[string]string
64 FirebaseNotifier notifications.FirebaseNotifier
65 MediaManager *media.MediaManager
66 MediaSigner media.MediaSigner
67 XRPCServer *spxrpc.Server
68 // not thread-safe yet
69 Aliases map[string]string
70 Bus *bus.Bus
71 ATSync *atproto.ATProtoSynchronizer
72 Director *director.Director
73
74 connTracker *WebsocketTracker
75
76 limiters map[string]*rate.Limiter
77 limitersMu sync.Mutex
78 SignerCache map[string]media.MediaSigner
79 SignerCacheMu sync.Mutex
80 op *oatproxy.OATProxy
81
82 // override tls port for http redirect server if we're using systemd file descriptors
83 HTTPRedirectTLSPort *int
84 sessions map[string]map[string]time.Time
85 sessionsLock sync.RWMutex
86
87 rtmpSessions map[string]*media.RTMPSession
88 rtmpSessionsLock sync.Mutex
89 rtmpInternalPlaybackAddr string
90}
91
// WebsocketTracker keeps a per-IP count of open connections together with
// the maximum number each IP is allowed to hold.
type WebsocketTracker struct {
	// connections maps an IP address to its current connection count.
	connections map[string]int
	// maxConnsPerIP is the count at which AddConnection starts refusing.
	maxConnsPerIP int
	// mu guards connections.
	mu sync.RWMutex
}
97
98func MakeStreamplaceAPI(cli *config.CLI, mod model.Model, statefulDB *statedb.StatefulDB, noter notifications.FirebaseNotifier, mm *media.MediaManager, ms media.MediaSigner, bus *bus.Bus, atsync *atproto.ATProtoSynchronizer, d *director.Director, op *oatproxy.OATProxy, ldb localdb.LocalDB) (*StreamplaceAPI, error) {
99 updater, err := PrepareUpdater(cli)
100 if err != nil {
101 return nil, err
102 }
103 a := &StreamplaceAPI{CLI: cli,
104 Model: mod,
105 StatefulDB: statefulDB,
106 Updater: updater,
107 FirebaseNotifier: noter,
108 MediaManager: mm,
109 MediaSigner: ms,
110 Aliases: map[string]string{},
111 Bus: bus,
112 ATSync: atsync,
113 Director: d,
114 connTracker: NewWebsocketTracker(cli.RateLimitWebsocket),
115 limiters: make(map[string]*rate.Limiter),
116 SignerCache: make(map[string]media.MediaSigner),
117 op: op,
118 sessions: make(map[string]map[string]time.Time),
119 sessionsLock: sync.RWMutex{},
120 rtmpSessions: make(map[string]*media.RTMPSession),
121 rtmpSessionsLock: sync.Mutex{},
122 LocalDB: ldb,
123 }
124 a.Mimes, err = updater.GetMimes()
125 if err != nil {
126 return nil, err
127 }
128 return a, nil
129}
130
// AppHostingFS wraps an http.FileSystem so that every failed Open is
// reported with the single sentinel ErrorIndex.
type AppHostingFS struct {
	http.FileSystem
}

// ErrorIndex is the error AppHostingFS.Open returns whenever the wrapped
// file system could not open the requested name.
var ErrorIndex = errors.New("not found, use index.html")

// Open returns the file from the wrapped file system when it can be opened.
// Any failure, whatever its cause, is replaced by ErrorIndex.
func (fs AppHostingFS) Open(name string) (http.File, error) {
	if opened, err := fs.FileSystem.Open(name); err == nil {
		return opened, nil
	}
	return nil, ErrorIndex
}
144
145// api/playback/iame.li/webrtc?rendition=source
146// api/playback/iame.li/stream.mp4?rendition=source
147// api/playback/iame.li/stream.webm?rendition=source
148// api/playback/iame.li/hls/index.m3u8
149// api/playback/iame.li/hls/source/stream.m3u8
150// api/playback/iame.li/hls/source/000000000000.ts
151
152func (a *StreamplaceAPI) Handler(ctx context.Context) (http.Handler, error) {
153
154 mdlw := middleware.New(middleware.Config{
155 Recorder: metrics.NewRecorder(metrics.Config{}),
156 })
157 var xrpc http.Handler
158 xrpc, err := spxrpc.NewServer(ctx, a.CLI, a.Model, a.StatefulDB, a.op, mdlw, a.ATSync, a.Bus, a.LocalDB)
159 if err != nil {
160 return nil, err
161 }
162 a.XRPCServer = xrpc.(*spxrpc.Server)
163 router := httprouter.New()
164
165 // Create our middleware factory with the default settings.
166
167 a.op.Echo.Use(echomiddleware.Handler("", mdlw))
168
169 // r.GET("/test/:id", httproutermiddleware.Handler("/test/:id", h1, mdlw))
170
171 addHandle := func(router *httprouter.Router, method, path string, handler httprouter.Handle) {
172 router.Handle(method, path, httproutermiddleware.Handler(path, handler, mdlw))
173 }
174 addFunc := func(router *httprouter.Router, method, path string, handler http.HandlerFunc) {
175 router.Handler(method, path, middlewarestd.Handler(path, mdlw, handler))
176 }
177
178 router.Handler("GET", "/oauth/*anything", a.op.Handler())
179 router.Handler("POST", "/oauth/*anything", a.op.Handler())
180 router.Handler("GET", "/.well-known/oauth-authorization-server", a.op.Handler())
181 router.Handler("GET", "/.well-known/oauth-protected-resource", a.op.Handler())
182 router.Handler("GET", "/.well-known/apple-app-site-association", a.HandleAppleAppSiteAssociation(ctx))
183 router.Handler("GET", "/.well-known/assetlinks.json", a.HandleAndroidAssetLinks(ctx))
184 apiRouter := httprouter.New()
185 addFunc(apiRouter, "POST", "/api/notification", a.HandleNotification(ctx))
186 // old clients
187 addFunc(router, "GET", "/app-updates", a.HandleAppUpdates(ctx))
188 // new ones
189 addFunc(apiRouter, "GET", "/api/manifest", a.HandleAppUpdates(ctx))
190 addHandle(apiRouter, "GET", "/api/desktop-updates/:platform/:architecture/:version/:buildTime/:file", a.HandleDesktopUpdates(ctx))
191 addHandle(apiRouter, "POST", "/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s"))
192 addHandle(apiRouter, "OPTIONS", "/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s"))
193 addHandle(apiRouter, "DELETE", "/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s"))
194 addFunc(apiRouter, "POST", "/api/segment", a.HandleSegment(ctx))
195 addFunc(apiRouter, "GET", "/api/healthz", a.HandleHealthz(ctx))
196 addHandle(apiRouter, "GET", "/api/playback/:user/hls/*file", a.HandleHLSPlayback(ctx))
197 // they're jpegs now
198 addHandle(apiRouter, "GET", "/api/playback/:user/stream.jpg", a.HandleThumbnailPlayback(ctx))
199 // this one is actually a jpeg (used previously and shouldn't remove for historical reasons)
200 addHandle(apiRouter, "GET", "/api/playback/:user/stream.png", a.HandleThumbnailPlayback(ctx))
201 addHandle(apiRouter, "GET", "/api/app-return/*anything", a.HandleAppReturn(ctx))
202 addHandle(apiRouter, "POST", "/api/playback/:user/webrtc", a.HandleWebRTCPlayback(ctx))
203 addHandle(apiRouter, "POST", "/api/ingest/webrtc", a.HandleWebRTCIngest(ctx))
204 addHandle(apiRouter, "POST", "/api/ingest/webrtc/:key", a.HandleWebRTCIngest(ctx))
205 addHandle(apiRouter, "POST", "/api/player-event", a.HandlePlayerEvent(ctx))
206 addHandle(apiRouter, "GET", "/api/chat/:repoDID", a.HandleChat(ctx))
207 addHandle(apiRouter, "GET", "/api/websocket/:repoDID", a.HandleWebsocket(ctx))
208 addHandle(apiRouter, "GET", "/api/livestream/:repoDID", a.HandleLivestream(ctx))
209 addHandle(apiRouter, "GET", "/api/bluesky/resolve/:handle", a.HandleBlueskyResolve(ctx))
210 addHandle(apiRouter, "GET", "/api/view-count/:user", a.HandleViewCount(ctx))
211 addHandle(apiRouter, "GET", "/api/clip/:user/:file", a.HandleClip(ctx))
212 apiRouter.NotFound = a.HandleAPI404(ctx)
213 apiRouterHandler := a.RateLimitMiddleware(ctx)(apiRouter)
214 xrpcHandler := a.RateLimitMiddleware(ctx)(xrpc)
215 router.Handler("GET", "/api/*resource", apiRouterHandler)
216 router.Handler("POST", "/api/*resource", apiRouterHandler)
217 router.Handler("PUT", "/api/*resource", apiRouterHandler)
218 router.Handler("PATCH", "/api/*resource", apiRouterHandler)
219 router.Handler("DELETE", "/api/*resource", apiRouterHandler)
220 router.Handler("GET", "/xrpc/*resource", xrpcHandler)
221 router.Handler("POST", "/xrpc/*resource", xrpcHandler)
222 router.Handler("PUT", "/xrpc/*resource", xrpcHandler)
223 router.Handler("PATCH", "/xrpc/*resource", xrpcHandler)
224 router.Handler("DELETE", "/xrpc/*resource", xrpcHandler)
225 // i wonder if there's a better way to do this?
226 router.GET("/favicon.ico", func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
227 err := a.XRPCServer.HandleFaviconICO(echo.New().NewContext(r, w))
228 if err != nil {
229 log.Error(ctx, "error handling favicon.ico", "error", err)
230 w.WriteHeader(500)
231 return
232 }
233 })
234 router.GET("/.well-known/did.json", a.HandleDidJSON(ctx))
235 router.GET("/.well-known/atproto-did", a.HandleAtprotoDID(ctx))
236 router.GET("/dl/*params", a.HandleAppDownload(ctx))
237 router.POST("/", a.HandleWebRTCIngest(ctx))
238 for _, redirect := range a.CLI.Redirects {
239 parts := strings.Split(redirect, ":")
240 if len(parts) != 2 {
241 log.Error(ctx, "invalid redirect", "redirect", redirect)
242 return nil, fmt.Errorf("invalid redirect: %s", redirect)
243 }
244 router.Handle("GET", parts[0], func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
245 http.Redirect(w, r, parts[1], http.StatusTemporaryRedirect)
246 })
247 }
248 if a.CLI.FrontendProxy != "" {
249 u, err := url.Parse(a.CLI.FrontendProxy)
250 if err != nil {
251 return nil, err
252 }
253 log.Warn(ctx, "using frontend proxy instead of bundled frontend", "destination", a.CLI.FrontendProxy)
254 router.NotFound = &httputil.ReverseProxy{
255 Rewrite: func(r *httputil.ProxyRequest) {
256 // workaround for Expo disliking serving requests from 127.0.0.1 instead of localhost
257 // we need to use 127.0.0.1 because the atproto oauth client requires it
258 r.Out.Header.Set("Origin", u.String())
259 r.SetURL(u)
260 },
261 }
262 } else {
263 files, err := app.Files()
264 if err != nil {
265 return nil, err
266 }
267 index, err := files.Open("index.html")
268 if err != nil {
269 return nil, err
270 }
271 bs, err := io.ReadAll(index)
272 if err != nil {
273 return nil, err
274 }
275 linker, err := linking.NewLinker(ctx, bs, a.StatefulDB, a.CLI)
276 if err != nil {
277 return nil, err
278 }
279 linkingHandler, err := a.NotFoundLinkingHandler(ctx, linker)
280 if err != nil {
281 return nil, err
282 }
283 router.NotFound = middlewarestd.Handler("/*static", mdlw, linkingHandler)
284 }
285 // needed because the WebRTC handler issues 405s from / otherwise
286 router.GET("/", func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
287 router.NotFound.ServeHTTP(w, r)
288 })
289 handler := sloghttp.Recovery(router)
290 handler = cors.AllowAll().Handler(handler)
291 handler = sloghttp.New(slog.Default())(handler)
292 handler = a.RateLimitMiddleware(ctx)(handler)
293
294 // this needs to be LAST so nothing else clobbers the context
295 handler = a.ContextMiddleware(ctx)(handler)
296
297 return handler, nil
298}
299func (a *StreamplaceAPI) ContextMiddleware(parentContext context.Context) func(next http.Handler) http.Handler {
300 return func(next http.Handler) http.Handler {
301 return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
302 uuid := uuid.New().String()
303 ctx := log.WithLogValues(parentContext, "requestID", uuid, "method", r.Method, "path", r.URL.Path)
304 r = r.WithContext(ctx)
305 next.ServeHTTP(w, r)
306 })
307 }
308}
// copyHeader appends every value of every header in src to dst, except for
// headers whose name starts with "Access-Control", which are left out.
// Values already present in dst are kept.
func copyHeader(dst, src http.Header) {
	for name, values := range src {
		// we'll handle CORS ourselves, thanks
		if !strings.HasPrefix(name, "Access-Control") {
			for _, value := range values {
				dst.Add(name, value)
			}
		}
	}
}
320
321// handler that takes care of static files and otherwise returns the index.html with the correct link card data
322func (a *StreamplaceAPI) NotFoundLinkingHandler(ctx context.Context, linker *linking.Linker) (http.HandlerFunc, error) {
323 files, err := app.Files()
324 if err != nil {
325 return nil, err
326 }
327 fs := AppHostingFS{http.FS(files)}
328
329 fileHandler := a.FileHandler(ctx, http.FileServer(fs))
330 defaultHandler := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
331 f := strings.TrimPrefix(req.URL.Path, "/")
332 // under docs we need the index.html suffix due to astro rendering
333 if strings.HasPrefix(req.URL.Path, "/docs") && strings.HasSuffix(req.URL.Path, "/") {
334 f += "index.html"
335 }
336 _, err := fs.Open(f)
337 if err == nil {
338 fileHandler.ServeHTTP(w, req)
339 return
340 }
341 if errors.Is(err, ErrorIndex) || f == "" {
342 bs, err := linker.GenerateDefaultCard(ctx, req.URL, a.CLI.SentryDSN)
343 if err != nil {
344 log.Error(ctx, "error generating default card", "error", err)
345 }
346 w.Header().Set("Content-Type", "text/html")
347 if _, err := w.Write(bs); err != nil {
348 log.Error(ctx, "error writing response", "error", err)
349 }
350 } else {
351 log.Warn(ctx, "error opening file", "error", err)
352 apierrors.WriteHTTPInternalServerError(w, "file not found", err)
353 }
354 })
355 return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
356 proto := "http"
357 if req.TLS != nil {
358 proto = "https"
359 }
360 fwProto := req.Header.Get("x-forwarded-proto")
361 if fwProto != "" {
362 proto = fwProto
363 }
364 req.URL.Host = req.Host
365 req.URL.Scheme = proto
366 maybeHandle := strings.TrimPrefix(req.URL.Path, "/")
367 // quick check for things that aren't valid handles/dids
368 if strings.ContainsAny(maybeHandle, "/_") {
369 defaultHandler.ServeHTTP(w, req)
370 return
371 }
372 repo, err := a.Model.GetRepoByHandleOrDID(maybeHandle)
373 if err != nil || repo == nil {
374 log.Error(ctx, "no repo found", "maybeHandle", maybeHandle)
375 defaultHandler.ServeHTTP(w, req)
376 return
377 }
378 ls, err := a.Model.GetLatestLivestreamForRepo(repo.DID)
379 if err != nil || ls == nil {
380 log.Error(ctx, "no livestream found", "repoDID", repo.DID)
381 defaultHandler.ServeHTTP(w, req)
382 return
383 }
384 lsv, err := ls.ToLivestreamView()
385 if err != nil || lsv == nil {
386 log.Error(ctx, "no livestream view found", "repoDID", repo.DID)
387 defaultHandler.ServeHTTP(w, req)
388 return
389 }
390 bs, err := linker.GenerateStreamerCard(ctx, req.URL, lsv, a.CLI.SentryDSN)
391 if err != nil {
392 log.Error(ctx, "error generating html", "error", err)
393 defaultHandler.ServeHTTP(w, req)
394 return
395 }
396 w.Header().Set("Content-Type", "text/html")
397 if _, err := w.Write(bs); err != nil {
398 log.Error(ctx, "error writing response", "error", err)
399 }
400 }), nil
401}
402
403func (a *StreamplaceAPI) MistProxyHandler(ctx context.Context, tmpl string) httprouter.Handle {
404 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
405 if !a.CLI.HasMist() {
406 apierrors.WriteHTTPNotImplemented(w, "Playback only on the Linux version for now", nil)
407 return
408 }
409 stream := params.ByName("stream")
410 if stream == "" {
411 apierrors.WriteHTTPBadRequest(w, "missing stream in request", nil)
412 return
413 }
414
415 fullstream := fmt.Sprintf("%s+%s", mistconfig.StreamName, stream)
416 prefix := fmt.Sprintf(tmpl, fullstream)
417 resource := params.ByName("resource")
418
419 // path := strings.TrimPrefix(req.URL.EscapedPath(), "/api")
420
421 client := &http.Client{}
422 req.URL = &url.URL{
423 Scheme: "http",
424 Host: fmt.Sprintf("127.0.0.1:%d", a.CLI.MistHTTPPort),
425 Path: fmt.Sprintf("%s%s", prefix, resource),
426 RawQuery: req.URL.RawQuery,
427 }
428
429 //http: Request.RequestURI can't be set in client requests.
430 //http://golang.org/src/pkg/net/http/client.go
431 req.RequestURI = ""
432
433 resp, err := client.Do(req)
434 if err != nil {
435 apierrors.WriteHTTPInternalServerError(w, "error connecting to mist", err)
436 return
437 }
438 defer resp.Body.Close()
439
440 copyHeader(w.Header(), resp.Header)
441 w.WriteHeader(resp.StatusCode)
442 if _, err := io.Copy(w, resp.Body); err != nil {
443 log.Error(ctx, "error writing response", "error", err)
444 }
445 }
446}
447
448func (a *StreamplaceAPI) FileHandler(ctx context.Context, fs http.Handler) http.HandlerFunc {
449 return func(w http.ResponseWriter, req *http.Request) {
450 noslash := req.URL.Path[1:]
451 ct, ok := a.Mimes[noslash]
452 if ok {
453 w.Header().Set("content-type", ct)
454 }
455 fs.ServeHTTP(w, req)
456 }
457}
458
459func (a *StreamplaceAPI) RedirectHandler(ctx context.Context) (http.Handler, error) {
460 var tlsPort string
461 var err error
462 if a.HTTPRedirectTLSPort != nil {
463 tlsPort = fmt.Sprintf("%d", *a.HTTPRedirectTLSPort)
464 } else {
465 _, tlsPort, err = net.SplitHostPort(a.CLI.HTTPSAddr)
466 if err != nil {
467 return nil, err
468 }
469 }
470 handleRedirect := func(w http.ResponseWriter, req *http.Request) {
471 host, _, err := net.SplitHostPort(req.Host)
472 if err != nil {
473 host = req.Host
474 }
475 u := req.URL
476 if tlsPort == "443" {
477 u.Host = host
478 } else {
479 u.Host = net.JoinHostPort(host, tlsPort)
480 }
481 u.Scheme = "https"
482 http.Redirect(w, req, u.String(), http.StatusTemporaryRedirect)
483 }
484 mux := http.NewServeMux()
485 mux.HandleFunc("/", handleRedirect)
486 return mux, nil
487}
488
// NotificationPayload is the JSON body decoded by HandleNotification.
type NotificationPayload struct {
	// Token is stored together with RepoDID when the notification is created.
	Token string `json:"token"`
	// RepoDID may be empty; when set, HandleNotification also triggers a
	// repo sync for it.
	RepoDID string `json:"repoDID"`
}
493
494func (a *StreamplaceAPI) HandleAPI404(ctx context.Context) http.HandlerFunc {
495 return func(w http.ResponseWriter, req *http.Request) {
496 w.WriteHeader(404)
497 }
498}
499
500func (a *StreamplaceAPI) HandleNotification(ctx context.Context) http.HandlerFunc {
501 return func(w http.ResponseWriter, req *http.Request) {
502 payload, err := io.ReadAll(req.Body)
503 if err != nil {
504 log.Log(ctx, "error reading notification create", "error", err)
505 w.WriteHeader(400)
506 return
507 }
508 n := NotificationPayload{}
509 err = json.Unmarshal(payload, &n)
510 if err != nil {
511 log.Log(ctx, "error unmarshalling notification create", "error", err)
512 w.WriteHeader(400)
513 return
514 }
515 err = a.StatefulDB.CreateNotification(n.Token, n.RepoDID)
516 if err != nil {
517 log.Log(ctx, "error creating notification", "error", err)
518 w.WriteHeader(400)
519 return
520 }
521 log.Log(ctx, "successfully created notification", "token", n.Token)
522 w.WriteHeader(200)
523 if n.RepoDID != "" {
524 go func() {
525 _, err := a.ATSync.SyncBlueskyRepo(ctx, n.RepoDID, a.Model)
526 if err != nil {
527 log.Error(ctx, "error syncing bluesky repo after notification creation", "error", err)
528 }
529 }()
530 }
531 }
532}
533
534func (a *StreamplaceAPI) HandleSegment(ctx context.Context) http.HandlerFunc {
535 return func(w http.ResponseWriter, req *http.Request) {
536 err := a.MediaManager.ValidateMP4(ctx, req.Body, false)
537 if err != nil {
538 apierrors.WriteHTTPBadRequest(w, "could not ingest segment", err)
539 return
540 }
541 w.WriteHeader(200)
542 }
543}
544
545func (a *StreamplaceAPI) HandlePlayerEvent(ctx context.Context) httprouter.Handle {
546 return func(w http.ResponseWriter, req *http.Request, p httprouter.Params) {
547 if !a.CLI.PlayerTelemetry {
548 w.WriteHeader(200)
549 return
550 }
551 var event model.PlayerEventAPI
552 if err := json.NewDecoder(req.Body).Decode(&event); err != nil {
553 apierrors.WriteHTTPBadRequest(w, "could not decode JSON body", err)
554 return
555 }
556 err := a.Model.CreatePlayerEvent(event)
557 if err != nil {
558 apierrors.WriteHTTPBadRequest(w, "could not create player event", err)
559 return
560 }
561 w.WriteHeader(201)
562 }
563}
564
565func (a *StreamplaceAPI) HandleViewCount(ctx context.Context) httprouter.Handle {
566 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
567 user := params.ByName("user")
568 if user == "" {
569 apierrors.WriteHTTPBadRequest(w, "user required", nil)
570 return
571 }
572 user, err := a.NormalizeUser(ctx, user)
573 if err != nil {
574 apierrors.WriteHTTPNotFound(w, "user not found", err)
575 return
576 }
577 count := spmetrics.GetViewCount(user)
578 bs, err := json.Marshal(streamplace.Livestream_ViewerCount{Count: int64(count), LexiconTypeID: "place.stream.livestream#viewerCount"})
579 if err != nil {
580 apierrors.WriteHTTPInternalServerError(w, "could not marshal view count", err)
581 return
582 }
583 if _, err := w.Write(bs); err != nil {
584 log.Error(ctx, "error writing response", "error", err)
585 }
586 }
587}
588
589func (a *StreamplaceAPI) HandleBlueskyResolve(ctx context.Context) httprouter.Handle {
590 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
591 log.Log(ctx, "got bluesky notification", "params", params)
592 key, err := a.ATSync.SyncBlueskyRepo(ctx, params.ByName("handle"), a.Model)
593 if err != nil {
594 apierrors.WriteHTTPInternalServerError(w, "could not resolve streamplace key", err)
595 return
596 }
597 signingKeys, err := a.Model.GetSigningKeysForRepo(key.DID)
598 if err != nil {
599 apierrors.WriteHTTPInternalServerError(w, "could not get signing keys", err)
600 return
601 }
602 bs, err := json.Marshal(signingKeys)
603 if err != nil {
604 apierrors.WriteHTTPInternalServerError(w, "could not marshal signing keys", err)
605 return
606 }
607 if _, err := w.Write(bs); err != nil {
608 log.Error(ctx, "error writing response", "error", err)
609 }
610 }
611}
612
613type ChatResponse struct {
614 Post *bsky.FeedPost `json:"post"`
615 Repo *model.Repo `json:"repo"`
616 CID string `json:"cid"`
617}
618
619func (a *StreamplaceAPI) HandleChat(ctx context.Context) httprouter.Handle {
620 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
621 user := params.ByName("repoDID")
622 if user == "" {
623 apierrors.WriteHTTPBadRequest(w, "user required", nil)
624 return
625 }
626 repoDID, err := a.NormalizeUser(ctx, user)
627 if err != nil {
628 apierrors.WriteHTTPNotFound(w, "user not found", err)
629 return
630 }
631 replies, err := a.Model.GetReplies(repoDID)
632 if err != nil {
633 apierrors.WriteHTTPInternalServerError(w, "could not get replies", err)
634 return
635 }
636 bs, err := json.Marshal(replies)
637 if err != nil {
638 apierrors.WriteHTTPInternalServerError(w, "could not marshal replies", err)
639 return
640 }
641 w.Header().Set("Content-Type", "application/json")
642 if _, err := w.Write(bs); err != nil {
643 log.Error(ctx, "error writing response", "error", err)
644 }
645 }
646}
647
648func (a *StreamplaceAPI) HandleLivestream(ctx context.Context) httprouter.Handle {
649 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
650 user := params.ByName("repoDID")
651 if user == "" {
652 apierrors.WriteHTTPBadRequest(w, "user required", nil)
653 return
654 }
655 repoDID, err := a.NormalizeUser(ctx, user)
656 if err != nil {
657 apierrors.WriteHTTPNotFound(w, "user not found", err)
658 return
659 }
660 livestream, err := a.Model.GetLatestLivestreamForRepo(repoDID)
661 if err != nil {
662 apierrors.WriteHTTPInternalServerError(w, "could not get livestream", err)
663 return
664 }
665
666 doc, err := livestream.ToLivestreamView()
667 if err != nil {
668 apierrors.WriteHTTPInternalServerError(w, "could not marshal livestream", err)
669 return
670 }
671
672 if livestream == nil {
673 apierrors.WriteHTTPNotFound(w, "no livestream found", nil)
674 return
675 }
676
677 bs, err := json.Marshal(doc)
678 if err != nil {
679 apierrors.WriteHTTPInternalServerError(w, "could not marshal livestream", err)
680 return
681 }
682 w.Header().Set("Content-Type", "application/json")
683 if _, err := w.Write(bs); err != nil {
684 log.Error(ctx, "error writing response", "error", err)
685 }
686 }
687}
688
689func (a *StreamplaceAPI) RateLimitMiddleware(ctx context.Context) func(http.Handler) http.Handler {
690 return func(next http.Handler) http.Handler {
691 return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
692 ip, _, err := net.SplitHostPort(req.RemoteAddr)
693 if err != nil {
694 ip = req.RemoteAddr
695 }
696
697 if a.CLI.RateLimitPerSecond > 0 {
698 limiter := a.getLimiter(ip)
699
700 if !limiter.Allow() {
701 log.Warn(ctx, "rate limit exceeded", "ip", ip, "path", req.URL.Path)
702 apierrors.WriteHTTPTooManyRequests(w, "rate limit exceeded")
703 return
704 }
705 }
706
707 next.ServeHTTP(w, req)
708 })
709 }
710}
711
712// helper for getting a listener from a systemd file descriptor
713func getListenerFromFD(fdName string) (net.Listener, error) {
714 log.Debug(context.TODO(), "getting listener from fd", "fdName", fdName, "LISTEN_PID", os.Getenv("LISTEN_PID"), "LISTEN_FDNAMES", os.Getenv("LISTEN_FDNAMES"))
715 if os.Getenv("LISTEN_PID") == strconv.Itoa(os.Getpid()) {
716 names := strings.Split(os.Getenv("LISTEN_FDNAMES"), ":")
717 for i, name := range names {
718 if name == fdName {
719 log.Warn(context.TODO(), "using systemd file descriptor", "fdName", fdName, "fdIndex", i+3)
720 f1 := os.NewFile(uintptr(i+3), fdName)
721 return net.FileListener(f1)
722 }
723 }
724 }
725 return nil, nil
726}
727
728func (a *StreamplaceAPI) ServeHTTP(ctx context.Context) error {
729 handler, err := a.Handler(ctx)
730 if err != nil {
731 return err
732 }
733 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error {
734 ln, err := getListenerFromFD("http")
735 if err != nil {
736 return err
737 }
738 if ln == nil {
739 ln, err = net.Listen("tcp", a.CLI.HTTPAddr)
740 if err != nil {
741 return err
742 }
743 } else {
744 log.Warn(ctx, "api server listening for http over systemd socket", "addr", ln.Addr())
745 }
746 log.Log(ctx, "http server starting", "addr", ln.Addr())
747 return s.Serve(ln)
748 })
749}
750
751func (a *StreamplaceAPI) ServeHTTPRedirect(ctx context.Context) error {
752 handler, err := a.RedirectHandler(ctx)
753 if err != nil {
754 return err
755 }
756 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error {
757 ln, err := getListenerFromFD("http")
758 if err != nil {
759 return err
760 }
761 if ln == nil {
762 ln, err = net.Listen("tcp", a.CLI.HTTPAddr)
763 if err != nil {
764 return err
765 }
766 } else {
767 log.Warn(ctx, "http tls redirect server listening for http over systemd socket", "addr", ln.Addr())
768 }
769 log.Log(ctx, "http tls redirect server starting", "addr", ln.Addr())
770 return s.Serve(ln)
771 })
772}
773
774func (a *StreamplaceAPI) ServeHTTPS(ctx context.Context) error {
775 handler, err := a.Handler(ctx)
776 if err != nil {
777 return err
778 }
779 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error {
780 ln, err := getListenerFromFD("https")
781 if err != nil {
782 return err
783 }
784 if ln == nil {
785 ln, err = net.Listen("tcp", a.CLI.HTTPSAddr)
786 if err != nil {
787 return err
788 }
789 } else {
790 // tell the redirect handler we're using systemd and they should go to 443
791 port443 := 443
792 a.HTTPRedirectTLSPort = &port443
793 log.Warn(ctx, "https server listening for https over systemd socket", "addr", ln.Addr())
794 }
795 log.Log(ctx, "https server starting",
796 "addr", ln.Addr(),
797 "certPath", a.CLI.TLSCertPath,
798 "keyPath", a.CLI.TLSKeyPath,
799 )
800 return s.ServeTLS(ln, a.CLI.TLSCertPath, a.CLI.TLSKeyPath)
801 })
802}
803
804func (a *StreamplaceAPI) ServerWithShutdown(ctx context.Context, handler http.Handler, serve func(*http.Server) error) error {
805 ctx, cancel := context.WithCancel(ctx)
806 handler = gziphandler.GzipHandler(handler)
807 server := http.Server{Handler: handler}
808 var serveErr error
809 go func() {
810 serveErr = serve(&server)
811 cancel()
812 }()
813 <-ctx.Done()
814 if serveErr != nil {
815 return fmt.Errorf("error in http server: %w", serveErr)
816 }
817
818 ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
819 defer cancel()
820 return server.Shutdown(ctx)
821}
822
823func (a *StreamplaceAPI) HandleHealthz(ctx context.Context) http.HandlerFunc {
824 return func(w http.ResponseWriter, req *http.Request) {
825 w.WriteHeader(200)
826 }
827}
828
829func (a *StreamplaceAPI) getLimiter(ip string) *rate.Limiter {
830 a.limitersMu.Lock()
831 defer a.limitersMu.Unlock()
832
833 limiter, exists := a.limiters[ip]
834 if !exists {
835 limiter = rate.NewLimiter(rate.Limit(a.CLI.RateLimitPerSecond), a.CLI.RateLimitBurst)
836 a.limiters[ip] = limiter
837 }
838
839 return limiter
840}
841
842func (a *StreamplaceAPI) HandleClip(ctx context.Context) httprouter.Handle {
843 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
844 user := params.ByName("user")
845 file := params.ByName("file")
846 if user == "" || file == "" {
847 apierrors.WriteHTTPBadRequest(w, "user and file required", nil)
848 return
849 }
850 user, err := a.NormalizeUser(ctx, user)
851 if err != nil {
852 apierrors.WriteHTTPNotFound(w, "user not found", err)
853 return
854 }
855 fPath := []string{user, "clips", file}
856 exists, err := a.CLI.DataFileExists(fPath)
857 if err != nil {
858 apierrors.WriteHTTPInternalServerError(w, "could not check if file exists", err)
859 return
860 }
861 if !exists {
862 apierrors.WriteHTTPNotFound(w, "file not found", nil)
863 return
864 }
865 fd, err := os.Open(a.CLI.DataFilePath(fPath))
866 if err != nil {
867 apierrors.WriteHTTPInternalServerError(w, "could not open file", err)
868 return
869 }
870 defer fd.Close()
871 w.Header().Set("Content-Type", "video/mp4")
872 if _, err := io.Copy(w, fd); err != nil {
873 log.Error(ctx, "error writing response", "error", err)
874 }
875 }
876}
877
878func NewWebsocketTracker(maxConns int) *WebsocketTracker {
879 return &WebsocketTracker{
880 connections: make(map[string]int),
881 maxConnsPerIP: maxConns,
882 }
883}
884
885func (t *WebsocketTracker) AddConnection(ip string) bool {
886 t.mu.Lock()
887 defer t.mu.Unlock()
888
889 count := t.connections[ip]
890
891 if count >= t.maxConnsPerIP {
892 return false
893 }
894
895 t.connections[ip] = count + 1
896 return true
897}
898
899func (t *WebsocketTracker) RemoveConnection(ip string) {
900 t.mu.Lock()
901 defer t.mu.Unlock()
902
903 count := t.connections[ip]
904 if count > 0 {
905 t.connections[ip] = count - 1
906 }
907
908 if t.connections[ip] == 0 {
909 delete(t.connections, ip)
910 }
911}