Live video on the AT Protocol
1package api
2
3import (
4 "context"
5 "encoding/json"
6 "errors"
7 "fmt"
8 "io"
9 "log/slog"
10 "net"
11 "net/http"
12 "net/http/httputil"
13 "net/url"
14 "os"
15 "strconv"
16 "strings"
17 "sync"
18 "time"
19
20 "github.com/NYTimes/gziphandler"
21 "github.com/bluesky-social/indigo/api/bsky"
22 "github.com/google/uuid"
23 "github.com/julienschmidt/httprouter"
24 "github.com/labstack/echo/v4"
25 "github.com/rs/cors"
26 sloghttp "github.com/samber/slog-http"
27 "golang.org/x/time/rate"
28
29 "github.com/streamplace/oatproxy/pkg/oatproxy"
30 "stream.place/streamplace/js/app"
31 "stream.place/streamplace/pkg/analytics"
32 "stream.place/streamplace/pkg/atproto"
33 "stream.place/streamplace/pkg/bus"
34 "stream.place/streamplace/pkg/config"
35 "stream.place/streamplace/pkg/crypto/signers/eip712"
36 "stream.place/streamplace/pkg/director"
37 apierrors "stream.place/streamplace/pkg/errors"
38 "stream.place/streamplace/pkg/linking"
39 "stream.place/streamplace/pkg/log"
40 "stream.place/streamplace/pkg/media"
41 "stream.place/streamplace/pkg/mist/mistconfig"
42 "stream.place/streamplace/pkg/model"
43 "stream.place/streamplace/pkg/notifications"
44 "stream.place/streamplace/pkg/spmetrics"
45 "stream.place/streamplace/pkg/spxrpc"
46 "stream.place/streamplace/pkg/statedb"
47 "stream.place/streamplace/pkg/streamplace"
48
49 metrics "github.com/slok/go-http-metrics/metrics/prometheus"
50 "github.com/slok/go-http-metrics/middleware"
51 echomiddleware "github.com/slok/go-http-metrics/middleware/echo"
52 httproutermiddleware "github.com/slok/go-http-metrics/middleware/httprouter"
53 middlewarestd "github.com/slok/go-http-metrics/middleware/std"
54)
55
56type StreamplaceAPI struct {
57 CLI *config.CLI
58 Model model.Model
59 StatefulDB *statedb.StatefulDB
60 Updater *Updater
61 Signer *eip712.EIP712Signer
62 Mimes map[string]string
63 FirebaseNotifier notifications.FirebaseNotifier
64 AnalyticsClient analytics.Client
65 MediaManager *media.MediaManager
66 MediaSigner media.MediaSigner
67 XRPCServer *spxrpc.Server
68 // not thread-safe yet
69 Aliases map[string]string
70 Bus *bus.Bus
71 ATSync *atproto.ATProtoSynchronizer
72 Director *director.Director
73
74 connTracker *WebsocketTracker
75
76 limiters map[string]*rate.Limiter
77 limitersMu sync.Mutex
78 SignerCache map[string]media.MediaSigner
79 SignerCacheMu sync.Mutex
80 op *oatproxy.OATProxy
81
82 // override tls port for http redirect server if we're using systemd file descriptors
83 HTTPRedirectTLSPort *int
84 sessions map[string]map[string]time.Time
85 sessionsLock sync.RWMutex
86
87 rtmpSessions map[string]*media.RTMPSession
88 rtmpSessionsLock sync.Mutex
89 rtmpInternalPlaybackAddr string
90}
91
// WebsocketTracker keeps a per-IP count of active connections together with
// the per-IP cap that AddConnection enforces. mu guards the connections map.
type WebsocketTracker struct {
	connections   map[string]int
	maxConnsPerIP int
	mu            sync.RWMutex
}
97
98func MakeStreamplaceAPI(cli *config.CLI, mod model.Model, statefulDB *statedb.StatefulDB, noter notifications.FirebaseNotifier, analyticsClient analytics.Client, mm *media.MediaManager, ms media.MediaSigner, bus *bus.Bus, atsync *atproto.ATProtoSynchronizer, d *director.Director, op *oatproxy.OATProxy) (*StreamplaceAPI, error) {
99 updater, err := PrepareUpdater(cli)
100 if err != nil {
101 return nil, err
102 }
103 a := &StreamplaceAPI{CLI: cli,
104 Model: mod,
105 StatefulDB: statefulDB,
106 Updater: updater,
107 FirebaseNotifier: noter,
108 AnalyticsClient: analyticsClient,
109 MediaManager: mm,
110 MediaSigner: ms,
111 Aliases: map[string]string{},
112 Bus: bus,
113 ATSync: atsync,
114 Director: d,
115 connTracker: NewWebsocketTracker(cli.RateLimitWebsocket),
116 limiters: make(map[string]*rate.Limiter),
117 SignerCache: make(map[string]media.MediaSigner),
118 op: op,
119 sessions: make(map[string]map[string]time.Time),
120 sessionsLock: sync.RWMutex{},
121 rtmpSessions: make(map[string]*media.RTMPSession),
122 rtmpSessionsLock: sync.Mutex{},
123 }
124 a.Mimes, err = updater.GetMimes()
125 if err != nil {
126 return nil, err
127 }
128 return a, nil
129}
130
// AppHostingFS wraps an http.FileSystem so that any failure to open a name is
// reported as ErrorIndex instead of the underlying error.
type AppHostingFS struct {
	http.FileSystem
}

// ErrorIndex is the sentinel returned by AppHostingFS.Open when the wrapped
// file system could not open the requested name; callers use it as the signal
// to fall back to index.html.
var ErrorIndex = errors.New("not found, use index.html")

// Open returns the file from the wrapped file system when it can be opened.
// Every failure, whatever its cause, is collapsed into ErrorIndex.
func (fs AppHostingFS) Open(name string) (http.File, error) {
	if opened, err := fs.FileSystem.Open(name); err == nil {
		return opened, nil
	}
	return nil, ErrorIndex
}
144
145// api/playback/iame.li/webrtc?rendition=source
146// api/playback/iame.li/stream.mp4?rendition=source
147// api/playback/iame.li/stream.webm?rendition=source
148// api/playback/iame.li/hls/index.m3u8
149// api/playback/iame.li/hls/source/stream.m3u8
150// api/playback/iame.li/hls/source/000000000000.ts
151
152func (a *StreamplaceAPI) Handler(ctx context.Context) (http.Handler, error) {
153
154 mdlw := middleware.New(middleware.Config{
155 Recorder: metrics.NewRecorder(metrics.Config{}),
156 })
157 var xrpc http.Handler
158 xrpc, err := spxrpc.NewServer(ctx, a.CLI, a.Model, a.StatefulDB, a.op, mdlw, a.ATSync, a.Bus)
159 if err != nil {
160 return nil, err
161 }
162 a.XRPCServer = xrpc.(*spxrpc.Server)
163 router := httprouter.New()
164
165 // Create our middleware factory with the default settings.
166
167 a.op.Echo.Use(echomiddleware.Handler("", mdlw))
168
169 // r.GET("/test/:id", httproutermiddleware.Handler("/test/:id", h1, mdlw))
170
171 addHandle := func(router *httprouter.Router, method, path string, handler httprouter.Handle) {
172 router.Handle(method, path, httproutermiddleware.Handler(path, handler, mdlw))
173 }
174 addFunc := func(router *httprouter.Router, method, path string, handler http.HandlerFunc) {
175 router.Handler(method, path, middlewarestd.Handler(path, mdlw, handler))
176 }
177
178 router.Handler("GET", "/oauth/*anything", a.op.Handler())
179 router.Handler("POST", "/oauth/*anything", a.op.Handler())
180 router.Handler("GET", "/.well-known/oauth-authorization-server", a.op.Handler())
181 router.Handler("GET", "/.well-known/oauth-protected-resource", a.op.Handler())
182 router.Handler("GET", "/.well-known/apple-app-site-association", a.HandleAppleAppSiteAssociation(ctx))
183 router.Handler("GET", "/.well-known/assetlinks.json", a.HandleAndroidAssetLinks(ctx))
184 apiRouter := httprouter.New()
185 addFunc(apiRouter, "POST", "/api/notification", a.HandleNotification(ctx))
186 // old clients
187 addFunc(router, "GET", "/app-updates", a.HandleAppUpdates(ctx))
188 // new ones
189 addFunc(apiRouter, "GET", "/api/manifest", a.HandleAppUpdates(ctx))
190 addHandle(apiRouter, "GET", "/api/desktop-updates/:platform/:architecture/:version/:buildTime/:file", a.HandleDesktopUpdates(ctx))
191 addHandle(apiRouter, "POST", "/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s"))
192 addHandle(apiRouter, "OPTIONS", "/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s"))
193 addHandle(apiRouter, "DELETE", "/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s"))
194 addFunc(apiRouter, "POST", "/api/segment", a.HandleSegment(ctx))
195 addFunc(apiRouter, "GET", "/api/healthz", a.HandleHealthz(ctx))
196 addHandle(apiRouter, "GET", "/api/playback/:user/hls/*file", a.HandleHLSPlayback(ctx))
197 // they're jpegs now
198 addHandle(apiRouter, "GET", "/api/playback/:user/stream.jpg", a.HandleThumbnailPlayback(ctx))
199 // this one is actually a jpeg (used previously and shouldn't remove for historical reasons)
200 addHandle(apiRouter, "GET", "/api/playback/:user/stream.png", a.HandleThumbnailPlayback(ctx))
201 addHandle(apiRouter, "GET", "/api/app-return/*anything", a.HandleAppReturn(ctx))
202 addHandle(apiRouter, "POST", "/api/playback/:user/webrtc", a.HandleWebRTCPlayback(ctx))
203 addHandle(apiRouter, "POST", "/api/ingest/webrtc", a.HandleWebRTCIngest(ctx))
204 addHandle(apiRouter, "POST", "/api/ingest/webrtc/:key", a.HandleWebRTCIngest(ctx))
205 addHandle(apiRouter, "POST", "/api/player-event", a.HandlePlayerEvent(ctx))
206 addHandle(apiRouter, "GET", "/api/chat/:repoDID", a.HandleChat(ctx))
207 addHandle(apiRouter, "GET", "/api/websocket/:repoDID", a.HandleWebsocket(ctx))
208 addHandle(apiRouter, "GET", "/api/livestream/:repoDID", a.HandleLivestream(ctx))
209 addHandle(apiRouter, "GET", "/api/segment/recent", a.HandleRecentSegments(ctx))
210 addHandle(apiRouter, "GET", "/api/segment/recent/:repoDID", a.HandleUserRecentSegments(ctx))
211 addHandle(apiRouter, "GET", "/api/bluesky/resolve/:handle", a.HandleBlueskyResolve(ctx))
212 addHandle(apiRouter, "GET", "/api/view-count/:user", a.HandleViewCount(ctx))
213 addHandle(apiRouter, "GET", "/api/clip/:user/:file", a.HandleClip(ctx))
214
215 // Analytics routes (optional, only if analytics is configured)
216 if a.AnalyticsClient != nil {
217 addFunc(apiRouter, "GET", "/api/analytics/realtime", analytics.HandleRealtimeStats(a.AnalyticsClient))
218 addHandle(apiRouter, "GET", "/api/analytics/streamer/:did", analytics.HandleStreamerStats(a.AnalyticsClient))
219 addHandle(apiRouter, "GET", "/api/analytics/viewer/:did", analytics.HandleViewerHistory(a.AnalyticsClient))
220 log.Log(ctx, "registered analytics HTTP endpoints")
221 } else {
222 log.Debug(ctx, "analytics not configured, skipping analytics endpoints")
223 }
224
225 apiRouter.NotFound = a.HandleAPI404(ctx)
226 apiRouterHandler := a.RateLimitMiddleware(ctx)(apiRouter)
227 xrpcHandler := a.RateLimitMiddleware(ctx)(xrpc)
228 router.Handler("GET", "/api/*resource", apiRouterHandler)
229 router.Handler("POST", "/api/*resource", apiRouterHandler)
230 router.Handler("PUT", "/api/*resource", apiRouterHandler)
231 router.Handler("PATCH", "/api/*resource", apiRouterHandler)
232 router.Handler("DELETE", "/api/*resource", apiRouterHandler)
233 router.Handler("GET", "/xrpc/*resource", xrpcHandler)
234 router.Handler("POST", "/xrpc/*resource", xrpcHandler)
235 router.Handler("PUT", "/xrpc/*resource", xrpcHandler)
236 router.Handler("PATCH", "/xrpc/*resource", xrpcHandler)
237 router.Handler("DELETE", "/xrpc/*resource", xrpcHandler)
238 // i wonder if there's a better way to do this?
239 router.GET("/favicon.ico", func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
240 err := a.XRPCServer.HandleFaviconICO(echo.New().NewContext(r, w))
241 if err != nil {
242 log.Error(ctx, "error handling favicon.ico", "error", err)
243 w.WriteHeader(500)
244 return
245 }
246 })
247 router.GET("/.well-known/did.json", a.HandleDidJSON(ctx))
248 router.GET("/.well-known/atproto-did", a.HandleAtprotoDID(ctx))
249 router.GET("/dl/*params", a.HandleAppDownload(ctx))
250 router.POST("/", a.HandleWebRTCIngest(ctx))
251 for _, redirect := range a.CLI.Redirects {
252 parts := strings.Split(redirect, ":")
253 if len(parts) != 2 {
254 log.Error(ctx, "invalid redirect", "redirect", redirect)
255 return nil, fmt.Errorf("invalid redirect: %s", redirect)
256 }
257 router.Handle("GET", parts[0], func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
258 http.Redirect(w, r, parts[1], http.StatusTemporaryRedirect)
259 })
260 }
261 if a.CLI.FrontendProxy != "" {
262 u, err := url.Parse(a.CLI.FrontendProxy)
263 if err != nil {
264 return nil, err
265 }
266 log.Warn(ctx, "using frontend proxy instead of bundled frontend", "destination", a.CLI.FrontendProxy)
267 router.NotFound = &httputil.ReverseProxy{
268 Rewrite: func(r *httputil.ProxyRequest) {
269 // workaround for Expo disliking serving requests from 127.0.0.1 instead of localhost
270 // we need to use 127.0.0.1 because the atproto oauth client requires it
271 r.Out.Header.Set("Origin", u.String())
272 r.SetURL(u)
273 },
274 }
275 } else {
276 files, err := app.Files()
277 if err != nil {
278 return nil, err
279 }
280 index, err := files.Open("index.html")
281 if err != nil {
282 return nil, err
283 }
284 bs, err := io.ReadAll(index)
285 if err != nil {
286 return nil, err
287 }
288 linker, err := linking.NewLinker(ctx, bs)
289 if err != nil {
290 return nil, err
291 }
292 linkingHandler, err := a.NotFoundLinkingHandler(ctx, linker)
293 if err != nil {
294 return nil, err
295 }
296 router.NotFound = middlewarestd.Handler("/*static", mdlw, linkingHandler)
297 }
298 // needed because the WebRTC handler issues 405s from / otherwise
299 router.GET("/", func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
300 router.NotFound.ServeHTTP(w, r)
301 })
302 handler := sloghttp.Recovery(router)
303 handler = cors.AllowAll().Handler(handler)
304 handler = sloghttp.New(slog.Default())(handler)
305 handler = a.RateLimitMiddleware(ctx)(handler)
306
307 // this needs to be LAST so nothing else clobbers the context
308 handler = a.ContextMiddleware(ctx)(handler)
309
310 return handler, nil
311}
312func (a *StreamplaceAPI) ContextMiddleware(parentContext context.Context) func(next http.Handler) http.Handler {
313 return func(next http.Handler) http.Handler {
314 return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
315 uuid := uuid.New().String()
316 ctx := log.WithLogValues(parentContext, "requestID", uuid, "method", r.Method, "path", r.URL.Path)
317 r = r.WithContext(ctx)
318 next.ServeHTTP(w, r)
319 })
320 }
321}
// copyHeader appends every value of every header in src to dst, except for
// headers whose name starts with "Access-Control": CORS is handled by our own
// middleware, so those are dropped.
func copyHeader(dst, src http.Header) {
	for name, values := range src {
		if !strings.HasPrefix(name, "Access-Control") {
			for i := range values {
				dst.Add(name, values[i])
			}
		}
	}
}
333
334// handler that takes care of static files and otherwise returns the index.html with the correct link card data
335func (a *StreamplaceAPI) NotFoundLinkingHandler(ctx context.Context, linker *linking.Linker) (http.HandlerFunc, error) {
336 files, err := app.Files()
337 if err != nil {
338 return nil, err
339 }
340 fs := AppHostingFS{http.FS(files)}
341
342 fileHandler := a.FileHandler(ctx, http.FileServer(fs))
343 defaultHandler := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
344 f := strings.TrimPrefix(req.URL.Path, "/")
345 // under docs we need the index.html suffix due to astro rendering
346 if strings.HasPrefix(req.URL.Path, "/docs") && strings.HasSuffix(req.URL.Path, "/") {
347 f += "index.html"
348 }
349 _, err := fs.Open(f)
350 if err == nil {
351 fileHandler.ServeHTTP(w, req)
352 return
353 }
354 if errors.Is(err, ErrorIndex) || f == "" {
355 bs, err := linker.GenerateDefaultCard(ctx, req.URL, a.CLI.SentryDSN)
356 if err != nil {
357 log.Error(ctx, "error generating default card", "error", err)
358 }
359 w.Header().Set("Content-Type", "text/html")
360 if _, err := w.Write(bs); err != nil {
361 log.Error(ctx, "error writing response", "error", err)
362 }
363 } else {
364 log.Warn(ctx, "error opening file", "error", err)
365 apierrors.WriteHTTPInternalServerError(w, "file not found", err)
366 }
367 })
368 return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
369 proto := "http"
370 if req.TLS != nil {
371 proto = "https"
372 }
373 fwProto := req.Header.Get("x-forwarded-proto")
374 if fwProto != "" {
375 proto = fwProto
376 }
377 req.URL.Host = req.Host
378 req.URL.Scheme = proto
379 maybeHandle := strings.TrimPrefix(req.URL.Path, "/")
380 // quick check for things that aren't valid handles/dids
381 if strings.ContainsAny(maybeHandle, "/_") {
382 defaultHandler.ServeHTTP(w, req)
383 return
384 }
385 repo, err := a.Model.GetRepoByHandleOrDID(maybeHandle)
386 if err != nil || repo == nil {
387 log.Error(ctx, "no repo found", "maybeHandle", maybeHandle)
388 defaultHandler.ServeHTTP(w, req)
389 return
390 }
391 ls, err := a.Model.GetLatestLivestreamForRepo(repo.DID)
392 if err != nil || ls == nil {
393 log.Error(ctx, "no livestream found", "repoDID", repo.DID)
394 defaultHandler.ServeHTTP(w, req)
395 return
396 }
397 lsv, err := ls.ToLivestreamView()
398 if err != nil || lsv == nil {
399 log.Error(ctx, "no livestream view found", "repoDID", repo.DID)
400 defaultHandler.ServeHTTP(w, req)
401 return
402 }
403 bs, err := linker.GenerateStreamerCard(ctx, req.URL, lsv, a.CLI.SentryDSN)
404 if err != nil {
405 log.Error(ctx, "error generating html", "error", err)
406 defaultHandler.ServeHTTP(w, req)
407 return
408 }
409 w.Header().Set("Content-Type", "text/html")
410 if _, err := w.Write(bs); err != nil {
411 log.Error(ctx, "error writing response", "error", err)
412 }
413 }), nil
414}
415
416func (a *StreamplaceAPI) MistProxyHandler(ctx context.Context, tmpl string) httprouter.Handle {
417 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
418 if !a.CLI.HasMist() {
419 apierrors.WriteHTTPNotImplemented(w, "Playback only on the Linux version for now", nil)
420 return
421 }
422 stream := params.ByName("stream")
423 if stream == "" {
424 apierrors.WriteHTTPBadRequest(w, "missing stream in request", nil)
425 return
426 }
427
428 fullstream := fmt.Sprintf("%s+%s", mistconfig.StreamName, stream)
429 prefix := fmt.Sprintf(tmpl, fullstream)
430 resource := params.ByName("resource")
431
432 // path := strings.TrimPrefix(req.URL.EscapedPath(), "/api")
433
434 client := &http.Client{}
435 req.URL = &url.URL{
436 Scheme: "http",
437 Host: fmt.Sprintf("127.0.0.1:%d", a.CLI.MistHTTPPort),
438 Path: fmt.Sprintf("%s%s", prefix, resource),
439 RawQuery: req.URL.RawQuery,
440 }
441
442 //http: Request.RequestURI can't be set in client requests.
443 //http://golang.org/src/pkg/net/http/client.go
444 req.RequestURI = ""
445
446 resp, err := client.Do(req)
447 if err != nil {
448 apierrors.WriteHTTPInternalServerError(w, "error connecting to mist", err)
449 return
450 }
451 defer resp.Body.Close()
452
453 copyHeader(w.Header(), resp.Header)
454 w.WriteHeader(resp.StatusCode)
455 if _, err := io.Copy(w, resp.Body); err != nil {
456 log.Error(ctx, "error writing response", "error", err)
457 }
458 }
459}
460
461func (a *StreamplaceAPI) FileHandler(ctx context.Context, fs http.Handler) http.HandlerFunc {
462 return func(w http.ResponseWriter, req *http.Request) {
463 noslash := req.URL.Path[1:]
464 ct, ok := a.Mimes[noslash]
465 if ok {
466 w.Header().Set("content-type", ct)
467 }
468 fs.ServeHTTP(w, req)
469 }
470}
471
472func (a *StreamplaceAPI) RedirectHandler(ctx context.Context) (http.Handler, error) {
473 var tlsPort string
474 var err error
475 if a.HTTPRedirectTLSPort != nil {
476 tlsPort = fmt.Sprintf("%d", *a.HTTPRedirectTLSPort)
477 } else {
478 _, tlsPort, err = net.SplitHostPort(a.CLI.HTTPSAddr)
479 if err != nil {
480 return nil, err
481 }
482 }
483 handleRedirect := func(w http.ResponseWriter, req *http.Request) {
484 host, _, err := net.SplitHostPort(req.Host)
485 if err != nil {
486 host = req.Host
487 }
488 u := req.URL
489 if tlsPort == "443" {
490 u.Host = host
491 } else {
492 u.Host = net.JoinHostPort(host, tlsPort)
493 }
494 u.Scheme = "https"
495 http.Redirect(w, req, u.String(), http.StatusTemporaryRedirect)
496 }
497 mux := http.NewServeMux()
498 mux.HandleFunc("/", handleRedirect)
499 return mux, nil
500}
501
// NotificationPayload is the JSON body accepted by HandleNotification.
type NotificationPayload struct {
	// Token is stored together with RepoDID via StatefulDB.CreateNotification.
	Token string `json:"token"`
	// RepoDID, when non-empty, also triggers a background repo sync.
	RepoDID string `json:"repoDID"`
}
506
507func (a *StreamplaceAPI) HandleAPI404(ctx context.Context) http.HandlerFunc {
508 return func(w http.ResponseWriter, req *http.Request) {
509 w.WriteHeader(404)
510 }
511}
512
513func (a *StreamplaceAPI) HandleNotification(ctx context.Context) http.HandlerFunc {
514 return func(w http.ResponseWriter, req *http.Request) {
515 payload, err := io.ReadAll(req.Body)
516 if err != nil {
517 log.Log(ctx, "error reading notification create", "error", err)
518 w.WriteHeader(400)
519 return
520 }
521 n := NotificationPayload{}
522 err = json.Unmarshal(payload, &n)
523 if err != nil {
524 log.Log(ctx, "error unmarshalling notification create", "error", err)
525 w.WriteHeader(400)
526 return
527 }
528 err = a.StatefulDB.CreateNotification(n.Token, n.RepoDID)
529 if err != nil {
530 log.Log(ctx, "error creating notification", "error", err)
531 w.WriteHeader(400)
532 return
533 }
534 log.Log(ctx, "successfully created notification", "token", n.Token)
535 w.WriteHeader(200)
536 if n.RepoDID != "" {
537 go func() {
538 _, err := a.ATSync.SyncBlueskyRepo(ctx, n.RepoDID, a.Model)
539 if err != nil {
540 log.Error(ctx, "error syncing bluesky repo after notification creation", "error", err)
541 }
542 }()
543 }
544 }
545}
546
547func (a *StreamplaceAPI) HandleSegment(ctx context.Context) http.HandlerFunc {
548 return func(w http.ResponseWriter, req *http.Request) {
549 err := a.MediaManager.ValidateMP4(ctx, req.Body, false)
550 if err != nil {
551 apierrors.WriteHTTPBadRequest(w, "could not ingest segment", err)
552 return
553 }
554 w.WriteHeader(200)
555 }
556}
557
558func (a *StreamplaceAPI) HandlePlayerEvent(ctx context.Context) httprouter.Handle {
559 return func(w http.ResponseWriter, req *http.Request, p httprouter.Params) {
560 var event model.PlayerEventAPI
561 if err := json.NewDecoder(req.Body).Decode(&event); err != nil {
562 apierrors.WriteHTTPBadRequest(w, "could not decode JSON body", err)
563 return
564 }
565
566 // Generate server-side ID
567 uu, err := uuid.NewV7()
568 if err != nil {
569 apierrors.WriteHTTPInternalServerError(w, "could not generate event ID", err)
570 return
571 }
572 event.ID = uu.String()
573
574 err = a.Model.CreatePlayerEvent(event)
575 if err != nil {
576 apierrors.WriteHTTPBadRequest(w, "could not create player event", err)
577 return
578 }
579
580 // Forward relevant events to analytics service if configured
581 if a.AnalyticsClient != nil {
582 go a.forwardPlayerEventToAnalytics(req.Context(), event)
583 } else {
584 log.Log(req.Context(), "no analytics client configured, skipping forwarding player event")
585 }
586
587 w.WriteHeader(201)
588 }
589}
590
591func (a *StreamplaceAPI) forwardPlayerEventToAnalytics(ctx context.Context, event model.PlayerEventAPI) {
592 // Only forward watch/playback events
593 if event.EventType != "aq-played" && event.EventType != "watch" {
594 return
595 }
596
597 // Parse metadata into typed struct
598 var meta model.PlayerEventMeta
599 metaBytes, _ := json.Marshal(event.Meta)
600 if err := json.Unmarshal(metaBytes, &meta); err != nil {
601 log.Log(ctx, "failed to parse event metadata", "error", err)
602 return
603 }
604
605 // Validate required fields
606 if err := meta.Validate(); err != nil {
607 log.Log(ctx, "invalid event metadata", "error", err)
608 return
609 }
610
611 analyticsEvent := &analytics.Event{
612 EventID: event.ID,
613 EventType: event.EventType,
614 DeviceID: meta.DeviceID,
615 DID: meta.DID,
616 SessionID: meta.SessionID,
617 TimestampMs: event.Time.UnixMilli(),
618 StreamerDID: meta.StreamerDID,
619 StreamID: meta.StreamID,
620 PropertiesJSON: string(metaBytes),
621 SchemaVersion: 1,
622 ClientVersion: meta.ClientVersion,
623 Platform: meta.Platform,
624 }
625
626 if err := a.AnalyticsClient.IngestEvents(ctx, []*analytics.Event{analyticsEvent}); err != nil {
627 log.Log(ctx, "failed to ingest analytics event", "error", err)
628 }
629}
630
631func (a *StreamplaceAPI) HandleRecentSegments(ctx context.Context) httprouter.Handle {
632 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
633 segs, err := a.Model.MostRecentSegments()
634 if err != nil {
635 apierrors.WriteHTTPInternalServerError(w, "could not get segments", err)
636 return
637 }
638 bs, err := json.Marshal(segs)
639 if err != nil {
640 apierrors.WriteHTTPInternalServerError(w, "could not marshal segments", err)
641 return
642 }
643 w.Header().Add("Content-Type", "application/json")
644 if _, err := w.Write(bs); err != nil {
645 log.Error(ctx, "error writing response", "error", err)
646 }
647 }
648}
649
650func (a *StreamplaceAPI) HandleUserRecentSegments(ctx context.Context) httprouter.Handle {
651 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
652 user := params.ByName("repoDID")
653 if user == "" {
654 apierrors.WriteHTTPBadRequest(w, "user required", nil)
655 return
656 }
657 user, err := a.NormalizeUser(ctx, user)
658 if err != nil {
659 apierrors.WriteHTTPNotFound(w, "user not found", err)
660 return
661 }
662 seg, err := a.Model.LatestSegmentForUser(user)
663 if err != nil {
664 apierrors.WriteHTTPInternalServerError(w, "could not get segments", err)
665 return
666 }
667 streamplaceSeg, err := seg.ToStreamplaceSegment()
668 if err != nil {
669 apierrors.WriteHTTPInternalServerError(w, "could not convert segment to streamplace segment", err)
670 return
671 }
672 bs, err := json.Marshal(streamplaceSeg)
673 if err != nil {
674 apierrors.WriteHTTPInternalServerError(w, "could not marshal segments", err)
675 return
676 }
677 w.Header().Add("Content-Type", "application/json")
678 if _, err := w.Write(bs); err != nil {
679 log.Error(ctx, "error writing response", "error", err)
680 }
681 }
682}
683
684func (a *StreamplaceAPI) HandleViewCount(ctx context.Context) httprouter.Handle {
685 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
686 user := params.ByName("user")
687 if user == "" {
688 apierrors.WriteHTTPBadRequest(w, "user required", nil)
689 return
690 }
691 user, err := a.NormalizeUser(ctx, user)
692 if err != nil {
693 apierrors.WriteHTTPNotFound(w, "user not found", err)
694 return
695 }
696 count := spmetrics.GetViewCount(user)
697 bs, err := json.Marshal(streamplace.Livestream_ViewerCount{Count: int64(count), LexiconTypeID: "place.stream.livestream#viewerCount"})
698 if err != nil {
699 apierrors.WriteHTTPInternalServerError(w, "could not marshal view count", err)
700 return
701 }
702 if _, err := w.Write(bs); err != nil {
703 log.Error(ctx, "error writing response", "error", err)
704 }
705 }
706}
707
708func (a *StreamplaceAPI) HandleBlueskyResolve(ctx context.Context) httprouter.Handle {
709 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
710 log.Log(ctx, "got bluesky notification", "params", params)
711 key, err := a.ATSync.SyncBlueskyRepo(ctx, params.ByName("handle"), a.Model)
712 if err != nil {
713 apierrors.WriteHTTPInternalServerError(w, "could not resolve streamplace key", err)
714 return
715 }
716 signingKeys, err := a.Model.GetSigningKeysForRepo(key.DID)
717 if err != nil {
718 apierrors.WriteHTTPInternalServerError(w, "could not get signing keys", err)
719 return
720 }
721 bs, err := json.Marshal(signingKeys)
722 if err != nil {
723 apierrors.WriteHTTPInternalServerError(w, "could not marshal signing keys", err)
724 return
725 }
726 if _, err := w.Write(bs); err != nil {
727 log.Error(ctx, "error writing response", "error", err)
728 }
729 }
730}
731
732type ChatResponse struct {
733 Post *bsky.FeedPost `json:"post"`
734 Repo *model.Repo `json:"repo"`
735 CID string `json:"cid"`
736}
737
738func (a *StreamplaceAPI) HandleChat(ctx context.Context) httprouter.Handle {
739 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
740 user := params.ByName("repoDID")
741 if user == "" {
742 apierrors.WriteHTTPBadRequest(w, "user required", nil)
743 return
744 }
745 repoDID, err := a.NormalizeUser(ctx, user)
746 if err != nil {
747 apierrors.WriteHTTPNotFound(w, "user not found", err)
748 return
749 }
750 replies, err := a.Model.GetReplies(repoDID)
751 if err != nil {
752 apierrors.WriteHTTPInternalServerError(w, "could not get replies", err)
753 return
754 }
755 bs, err := json.Marshal(replies)
756 if err != nil {
757 apierrors.WriteHTTPInternalServerError(w, "could not marshal replies", err)
758 return
759 }
760 w.Header().Set("Content-Type", "application/json")
761 if _, err := w.Write(bs); err != nil {
762 log.Error(ctx, "error writing response", "error", err)
763 }
764 }
765}
766
767func (a *StreamplaceAPI) HandleLivestream(ctx context.Context) httprouter.Handle {
768 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
769 user := params.ByName("repoDID")
770 if user == "" {
771 apierrors.WriteHTTPBadRequest(w, "user required", nil)
772 return
773 }
774 repoDID, err := a.NormalizeUser(ctx, user)
775 if err != nil {
776 apierrors.WriteHTTPNotFound(w, "user not found", err)
777 return
778 }
779 livestream, err := a.Model.GetLatestLivestreamForRepo(repoDID)
780 if err != nil {
781 apierrors.WriteHTTPInternalServerError(w, "could not get livestream", err)
782 return
783 }
784
785 doc, err := livestream.ToLivestreamView()
786 if err != nil {
787 apierrors.WriteHTTPInternalServerError(w, "could not marshal livestream", err)
788 return
789 }
790
791 if livestream == nil {
792 apierrors.WriteHTTPNotFound(w, "no livestream found", nil)
793 return
794 }
795
796 bs, err := json.Marshal(doc)
797 if err != nil {
798 apierrors.WriteHTTPInternalServerError(w, "could not marshal livestream", err)
799 return
800 }
801 w.Header().Set("Content-Type", "application/json")
802 if _, err := w.Write(bs); err != nil {
803 log.Error(ctx, "error writing response", "error", err)
804 }
805 }
806}
807
808func (a *StreamplaceAPI) RateLimitMiddleware(ctx context.Context) func(http.Handler) http.Handler {
809 return func(next http.Handler) http.Handler {
810 return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
811 ip, _, err := net.SplitHostPort(req.RemoteAddr)
812 if err != nil {
813 ip = req.RemoteAddr
814 }
815
816 if a.CLI.RateLimitPerSecond > 0 {
817 limiter := a.getLimiter(ip)
818
819 if !limiter.Allow() {
820 log.Warn(ctx, "rate limit exceeded", "ip", ip, "path", req.URL.Path)
821 apierrors.WriteHTTPTooManyRequests(w, "rate limit exceeded")
822 return
823 }
824 }
825
826 next.ServeHTTP(w, req)
827 })
828 }
829}
830
831// helper for getting a listener from a systemd file descriptor
832func getListenerFromFD(fdName string) (net.Listener, error) {
833 log.Debug(context.TODO(), "getting listener from fd", "fdName", fdName, "LISTEN_PID", os.Getenv("LISTEN_PID"), "LISTEN_FDNAMES", os.Getenv("LISTEN_FDNAMES"))
834 if os.Getenv("LISTEN_PID") == strconv.Itoa(os.Getpid()) {
835 names := strings.Split(os.Getenv("LISTEN_FDNAMES"), ":")
836 for i, name := range names {
837 if name == fdName {
838 log.Warn(context.TODO(), "using systemd file descriptor", "fdName", fdName, "fdIndex", i+3)
839 f1 := os.NewFile(uintptr(i+3), fdName)
840 return net.FileListener(f1)
841 }
842 }
843 }
844 return nil, nil
845}
846
847func (a *StreamplaceAPI) ServeHTTP(ctx context.Context) error {
848 handler, err := a.Handler(ctx)
849 if err != nil {
850 return err
851 }
852 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error {
853 ln, err := getListenerFromFD("http")
854 if err != nil {
855 return err
856 }
857 if ln == nil {
858 ln, err = net.Listen("tcp", a.CLI.HTTPAddr)
859 if err != nil {
860 return err
861 }
862 } else {
863 log.Warn(ctx, "api server listening for http over systemd socket", "addr", ln.Addr())
864 }
865 log.Log(ctx, "http server starting", "addr", ln.Addr())
866 return s.Serve(ln)
867 })
868}
869
870func (a *StreamplaceAPI) ServeHTTPRedirect(ctx context.Context) error {
871 handler, err := a.RedirectHandler(ctx)
872 if err != nil {
873 return err
874 }
875 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error {
876 ln, err := getListenerFromFD("http")
877 if err != nil {
878 return err
879 }
880 if ln == nil {
881 ln, err = net.Listen("tcp", a.CLI.HTTPAddr)
882 if err != nil {
883 return err
884 }
885 } else {
886 log.Warn(ctx, "http tls redirect server listening for http over systemd socket", "addr", ln.Addr())
887 }
888 log.Log(ctx, "http tls redirect server starting", "addr", ln.Addr())
889 return s.Serve(ln)
890 })
891}
892
893func (a *StreamplaceAPI) ServeHTTPS(ctx context.Context) error {
894 handler, err := a.Handler(ctx)
895 if err != nil {
896 return err
897 }
898 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error {
899 ln, err := getListenerFromFD("https")
900 if err != nil {
901 return err
902 }
903 if ln == nil {
904 ln, err = net.Listen("tcp", a.CLI.HTTPSAddr)
905 if err != nil {
906 return err
907 }
908 } else {
909 // tell the redirect handler we're using systemd and they should go to 443
910 port443 := 443
911 a.HTTPRedirectTLSPort = &port443
912 log.Warn(ctx, "https server listening for https over systemd socket", "addr", ln.Addr())
913 }
914 log.Log(ctx, "https server starting",
915 "addr", ln.Addr(),
916 "certPath", a.CLI.TLSCertPath,
917 "keyPath", a.CLI.TLSKeyPath,
918 )
919 return s.ServeTLS(ln, a.CLI.TLSCertPath, a.CLI.TLSKeyPath)
920 })
921}
922
923func (a *StreamplaceAPI) ServerWithShutdown(ctx context.Context, handler http.Handler, serve func(*http.Server) error) error {
924 ctx, cancel := context.WithCancel(ctx)
925 handler = gziphandler.GzipHandler(handler)
926 server := http.Server{Handler: handler}
927 var serveErr error
928 go func() {
929 serveErr = serve(&server)
930 cancel()
931 }()
932 <-ctx.Done()
933 if serveErr != nil {
934 return fmt.Errorf("error in http server: %w", serveErr)
935 }
936
937 ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
938 defer cancel()
939 return server.Shutdown(ctx)
940}
941
942func (a *StreamplaceAPI) HandleHealthz(ctx context.Context) http.HandlerFunc {
943 return func(w http.ResponseWriter, req *http.Request) {
944 w.WriteHeader(200)
945 }
946}
947
948func (a *StreamplaceAPI) getLimiter(ip string) *rate.Limiter {
949 a.limitersMu.Lock()
950 defer a.limitersMu.Unlock()
951
952 limiter, exists := a.limiters[ip]
953 if !exists {
954 limiter = rate.NewLimiter(rate.Limit(a.CLI.RateLimitPerSecond), a.CLI.RateLimitBurst)
955 a.limiters[ip] = limiter
956 }
957
958 return limiter
959}
960
961func (a *StreamplaceAPI) HandleClip(ctx context.Context) httprouter.Handle {
962 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
963 user := params.ByName("user")
964 file := params.ByName("file")
965 if user == "" || file == "" {
966 apierrors.WriteHTTPBadRequest(w, "user and file required", nil)
967 return
968 }
969 user, err := a.NormalizeUser(ctx, user)
970 if err != nil {
971 apierrors.WriteHTTPNotFound(w, "user not found", err)
972 return
973 }
974 fPath := []string{user, "clips", file}
975 exists, err := a.CLI.DataFileExists(fPath)
976 if err != nil {
977 apierrors.WriteHTTPInternalServerError(w, "could not check if file exists", err)
978 return
979 }
980 if !exists {
981 apierrors.WriteHTTPNotFound(w, "file not found", nil)
982 return
983 }
984 fd, err := os.Open(a.CLI.DataFilePath(fPath))
985 if err != nil {
986 apierrors.WriteHTTPInternalServerError(w, "could not open file", err)
987 return
988 }
989 defer fd.Close()
990 w.Header().Set("Content-Type", "video/mp4")
991 if _, err := io.Copy(w, fd); err != nil {
992 log.Error(ctx, "error writing response", "error", err)
993 }
994 }
995}
996
997func NewWebsocketTracker(maxConns int) *WebsocketTracker {
998 return &WebsocketTracker{
999 connections: make(map[string]int),
1000 maxConnsPerIP: maxConns,
1001 }
1002}
1003
1004func (t *WebsocketTracker) AddConnection(ip string) bool {
1005 t.mu.Lock()
1006 defer t.mu.Unlock()
1007
1008 count := t.connections[ip]
1009
1010 if count >= t.maxConnsPerIP {
1011 return false
1012 }
1013
1014 t.connections[ip] = count + 1
1015 return true
1016}
1017
1018func (t *WebsocketTracker) RemoveConnection(ip string) {
1019 t.mu.Lock()
1020 defer t.mu.Unlock()
1021
1022 count := t.connections[ip]
1023 if count > 0 {
1024 t.connections[ip] = count - 1
1025 }
1026
1027 if t.connections[ip] == 0 {
1028 delete(t.connections, ip)
1029 }
1030}