Live video on the AT Protocol
1package api
2
3import (
4 "context"
5 "encoding/json"
6 "errors"
7 "fmt"
8 "io"
9 "log/slog"
10 "net"
11 "net/http"
12 "net/http/httputil"
13 "net/url"
14 "os"
15 "strconv"
16 "strings"
17 "sync"
18 "time"
19
20 "github.com/NYTimes/gziphandler"
21 "github.com/bluesky-social/indigo/api/bsky"
22 "github.com/google/uuid"
23 "github.com/julienschmidt/httprouter"
24 "github.com/labstack/echo/v4"
25 "github.com/rs/cors"
26 sloghttp "github.com/samber/slog-http"
27 "golang.org/x/time/rate"
28
29 "github.com/streamplace/oatproxy/pkg/oatproxy"
30 "stream.place/streamplace/js/app"
31 "stream.place/streamplace/pkg/atproto"
32 "stream.place/streamplace/pkg/bus"
33 "stream.place/streamplace/pkg/config"
34 "stream.place/streamplace/pkg/crypto/signers/eip712"
35 "stream.place/streamplace/pkg/director"
36 apierrors "stream.place/streamplace/pkg/errors"
37 "stream.place/streamplace/pkg/linking"
38 "stream.place/streamplace/pkg/log"
39 "stream.place/streamplace/pkg/media"
40 "stream.place/streamplace/pkg/mist/mistconfig"
41 "stream.place/streamplace/pkg/model"
42 "stream.place/streamplace/pkg/notifications"
43 "stream.place/streamplace/pkg/spmetrics"
44 "stream.place/streamplace/pkg/spxrpc"
45 "stream.place/streamplace/pkg/statedb"
46 "stream.place/streamplace/pkg/streamplace"
47
48 metrics "github.com/slok/go-http-metrics/metrics/prometheus"
49 "github.com/slok/go-http-metrics/middleware"
50 echomiddleware "github.com/slok/go-http-metrics/middleware/echo"
51 httproutermiddleware "github.com/slok/go-http-metrics/middleware/httprouter"
52 middlewarestd "github.com/slok/go-http-metrics/middleware/std"
53)
54
55type StreamplaceAPI struct {
56 CLI *config.CLI
57 Model model.Model
58 StatefulDB *statedb.StatefulDB
59 Updater *Updater
60 Signer *eip712.EIP712Signer
61 Mimes map[string]string
62 FirebaseNotifier notifications.FirebaseNotifier
63 MediaManager *media.MediaManager
64 MediaSigner media.MediaSigner
65 XRPCServer *spxrpc.Server
66 // not thread-safe yet
67 Aliases map[string]string
68 Bus *bus.Bus
69 ATSync *atproto.ATProtoSynchronizer
70 Director *director.Director
71
72 connTracker *WebsocketTracker
73
74 limiters map[string]*rate.Limiter
75 limitersMu sync.Mutex
76 SignerCache map[string]media.MediaSigner
77 SignerCacheMu sync.Mutex
78 op *oatproxy.OATProxy
79
80 // override tls port for http redirect server if we're using systemd file descriptors
81 HTTPRedirectTLSPort *int
82 sessions map[string]map[string]time.Time
83 sessionsLock sync.RWMutex
84
85 rtmpSessions map[string]*media.RTMPSession
86 rtmpSessionsLock sync.Mutex
87 rtmpInternalPlaybackAddr string
88}
89
90type WebsocketTracker struct {
91 connections map[string]int
92 maxConnsPerIP int
93 mu sync.RWMutex
94}
95
96func MakeStreamplaceAPI(cli *config.CLI, mod model.Model, statefulDB *statedb.StatefulDB, noter notifications.FirebaseNotifier, mm *media.MediaManager, ms media.MediaSigner, bus *bus.Bus, atsync *atproto.ATProtoSynchronizer, d *director.Director, op *oatproxy.OATProxy) (*StreamplaceAPI, error) {
97 updater, err := PrepareUpdater(cli)
98 if err != nil {
99 return nil, err
100 }
101 a := &StreamplaceAPI{CLI: cli,
102 Model: mod,
103 StatefulDB: statefulDB,
104 Updater: updater,
105 FirebaseNotifier: noter,
106 MediaManager: mm,
107 MediaSigner: ms,
108 Aliases: map[string]string{},
109 Bus: bus,
110 ATSync: atsync,
111 Director: d,
112 connTracker: NewWebsocketTracker(cli.RateLimitWebsocket),
113 limiters: make(map[string]*rate.Limiter),
114 SignerCache: make(map[string]media.MediaSigner),
115 op: op,
116 sessions: make(map[string]map[string]time.Time),
117 sessionsLock: sync.RWMutex{},
118 rtmpSessions: make(map[string]*media.RTMPSession),
119 rtmpSessionsLock: sync.Mutex{},
120 }
121 a.Mimes, err = updater.GetMimes()
122 if err != nil {
123 return nil, err
124 }
125 return a, nil
126}
127
128type AppHostingFS struct {
129 http.FileSystem
130}
131
132var ErrorIndex = errors.New("not found, use index.html")
133
134func (fs AppHostingFS) Open(name string) (http.File, error) {
135 file, err1 := fs.FileSystem.Open(name)
136 if err1 == nil {
137 return file, nil
138 }
139 return nil, ErrorIndex
140}
141
142// api/playback/iame.li/webrtc?rendition=source
143// api/playback/iame.li/stream.mp4?rendition=source
144// api/playback/iame.li/stream.webm?rendition=source
145// api/playback/iame.li/hls/index.m3u8
146// api/playback/iame.li/hls/source/stream.m3u8
147// api/playback/iame.li/hls/source/000000000000.ts
148
149func (a *StreamplaceAPI) Handler(ctx context.Context) (http.Handler, error) {
150
151 mdlw := middleware.New(middleware.Config{
152 Recorder: metrics.NewRecorder(metrics.Config{}),
153 })
154 var xrpc http.Handler
155 xrpc, err := spxrpc.NewServer(ctx, a.CLI, a.Model, a.StatefulDB, a.op, mdlw, a.ATSync, a.Bus)
156 if err != nil {
157 return nil, err
158 }
159 a.XRPCServer = xrpc.(*spxrpc.Server)
160 router := httprouter.New()
161
162 // Create our middleware factory with the default settings.
163
164 a.op.Echo.Use(echomiddleware.Handler("", mdlw))
165
166 // r.GET("/test/:id", httproutermiddleware.Handler("/test/:id", h1, mdlw))
167
168 addHandle := func(router *httprouter.Router, method, path string, handler httprouter.Handle) {
169 router.Handle(method, path, httproutermiddleware.Handler(path, handler, mdlw))
170 }
171 addFunc := func(router *httprouter.Router, method, path string, handler http.HandlerFunc) {
172 router.Handler(method, path, middlewarestd.Handler(path, mdlw, handler))
173 }
174
175 router.Handler("GET", "/oauth/*anything", a.op.Handler())
176 router.Handler("POST", "/oauth/*anything", a.op.Handler())
177 router.Handler("GET", "/.well-known/oauth-authorization-server", a.op.Handler())
178 router.Handler("GET", "/.well-known/oauth-protected-resource", a.op.Handler())
179 router.Handler("GET", "/.well-known/apple-app-site-association", a.HandleAppleAppSiteAssociation(ctx))
180 router.Handler("GET", "/.well-known/assetlinks.json", a.HandleAndroidAssetLinks(ctx))
181 apiRouter := httprouter.New()
182 addFunc(apiRouter, "POST", "/api/notification", a.HandleNotification(ctx))
183 // old clients
184 addFunc(router, "GET", "/app-updates", a.HandleAppUpdates(ctx))
185 // new ones
186 addFunc(apiRouter, "GET", "/api/manifest", a.HandleAppUpdates(ctx))
187 addHandle(apiRouter, "GET", "/api/desktop-updates/:platform/:architecture/:version/:buildTime/:file", a.HandleDesktopUpdates(ctx))
188 addHandle(apiRouter, "POST", "/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s"))
189 addHandle(apiRouter, "OPTIONS", "/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s"))
190 addHandle(apiRouter, "DELETE", "/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s"))
191 addFunc(apiRouter, "POST", "/api/segment", a.HandleSegment(ctx))
192 addFunc(apiRouter, "GET", "/api/healthz", a.HandleHealthz(ctx))
193 addHandle(apiRouter, "GET", "/api/playback/:user/hls/*file", a.HandleHLSPlayback(ctx))
194 // they're jpegs now
195 addHandle(apiRouter, "GET", "/api/playback/:user/stream.jpg", a.HandleThumbnailPlayback(ctx))
196 // this one is actually a jpeg (used previously and shouldn't remove for historical reasons)
197 addHandle(apiRouter, "GET", "/api/playback/:user/stream.png", a.HandleThumbnailPlayback(ctx))
198 addHandle(apiRouter, "GET", "/api/app-return/*anything", a.HandleAppReturn(ctx))
199 addHandle(apiRouter, "POST", "/api/playback/:user/webrtc", a.HandleWebRTCPlayback(ctx))
200 addHandle(apiRouter, "POST", "/api/ingest/webrtc", a.HandleWebRTCIngest(ctx))
201 addHandle(apiRouter, "POST", "/api/ingest/webrtc/:key", a.HandleWebRTCIngest(ctx))
202 addHandle(apiRouter, "POST", "/api/player-event", a.HandlePlayerEvent(ctx))
203 addHandle(apiRouter, "GET", "/api/chat/:repoDID", a.HandleChat(ctx))
204 addHandle(apiRouter, "GET", "/api/websocket/:repoDID", a.HandleWebsocket(ctx))
205 addHandle(apiRouter, "GET", "/api/livestream/:repoDID", a.HandleLivestream(ctx))
206 addHandle(apiRouter, "GET", "/api/segment/recent", a.HandleRecentSegments(ctx))
207 addHandle(apiRouter, "GET", "/api/segment/recent/:repoDID", a.HandleUserRecentSegments(ctx))
208 addHandle(apiRouter, "GET", "/api/bluesky/resolve/:handle", a.HandleBlueskyResolve(ctx))
209 addHandle(apiRouter, "GET", "/api/view-count/:user", a.HandleViewCount(ctx))
210 addHandle(apiRouter, "GET", "/api/clip/:user/:file", a.HandleClip(ctx))
211 apiRouter.NotFound = a.HandleAPI404(ctx)
212 apiRouterHandler := a.RateLimitMiddleware(ctx)(apiRouter)
213 xrpcHandler := a.RateLimitMiddleware(ctx)(xrpc)
214 router.Handler("GET", "/api/*resource", apiRouterHandler)
215 router.Handler("POST", "/api/*resource", apiRouterHandler)
216 router.Handler("PUT", "/api/*resource", apiRouterHandler)
217 router.Handler("PATCH", "/api/*resource", apiRouterHandler)
218 router.Handler("DELETE", "/api/*resource", apiRouterHandler)
219 router.Handler("GET", "/xrpc/*resource", xrpcHandler)
220 router.Handler("POST", "/xrpc/*resource", xrpcHandler)
221 router.Handler("PUT", "/xrpc/*resource", xrpcHandler)
222 router.Handler("PATCH", "/xrpc/*resource", xrpcHandler)
223 router.Handler("DELETE", "/xrpc/*resource", xrpcHandler)
224 // i wonder if there's a better way to do this?
225 router.GET("/favicon.ico", func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
226 err := a.XRPCServer.HandleFaviconICO(echo.New().NewContext(r, w))
227 if err != nil {
228 log.Error(ctx, "error handling favicon.ico", "error", err)
229 w.WriteHeader(500)
230 return
231 }
232 })
233 router.GET("/.well-known/did.json", a.HandleDidJSON(ctx))
234 router.GET("/.well-known/atproto-did", a.HandleAtprotoDID(ctx))
235 router.GET("/dl/*params", a.HandleAppDownload(ctx))
236 router.POST("/", a.HandleWebRTCIngest(ctx))
237 for _, redirect := range a.CLI.Redirects {
238 parts := strings.Split(redirect, ":")
239 if len(parts) != 2 {
240 log.Error(ctx, "invalid redirect", "redirect", redirect)
241 return nil, fmt.Errorf("invalid redirect: %s", redirect)
242 }
243 router.Handle("GET", parts[0], func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
244 http.Redirect(w, r, parts[1], http.StatusTemporaryRedirect)
245 })
246 }
247 if a.CLI.FrontendProxy != "" {
248 u, err := url.Parse(a.CLI.FrontendProxy)
249 if err != nil {
250 return nil, err
251 }
252 log.Warn(ctx, "using frontend proxy instead of bundled frontend", "destination", a.CLI.FrontendProxy)
253 router.NotFound = &httputil.ReverseProxy{
254 Rewrite: func(r *httputil.ProxyRequest) {
255 // workaround for Expo disliking serving requests from 127.0.0.1 instead of localhost
256 // we need to use 127.0.0.1 because the atproto oauth client requires it
257 r.Out.Header.Set("Origin", u.String())
258 r.SetURL(u)
259 },
260 }
261 } else {
262 files, err := app.Files()
263 if err != nil {
264 return nil, err
265 }
266 index, err := files.Open("index.html")
267 if err != nil {
268 return nil, err
269 }
270 bs, err := io.ReadAll(index)
271 if err != nil {
272 return nil, err
273 }
274 linker, err := linking.NewLinker(ctx, bs)
275 if err != nil {
276 return nil, err
277 }
278 linkingHandler, err := a.NotFoundLinkingHandler(ctx, linker)
279 if err != nil {
280 return nil, err
281 }
282 router.NotFound = middlewarestd.Handler("/*static", mdlw, linkingHandler)
283 }
284 // needed because the WebRTC handler issues 405s from / otherwise
285 router.GET("/", func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
286 router.NotFound.ServeHTTP(w, r)
287 })
288 handler := sloghttp.Recovery(router)
289 handler = cors.AllowAll().Handler(handler)
290 handler = sloghttp.New(slog.Default())(handler)
291 handler = a.RateLimitMiddleware(ctx)(handler)
292
293 // this needs to be LAST so nothing else clobbers the context
294 handler = a.ContextMiddleware(ctx)(handler)
295
296 return handler, nil
297}
298func (a *StreamplaceAPI) ContextMiddleware(parentContext context.Context) func(next http.Handler) http.Handler {
299 return func(next http.Handler) http.Handler {
300 return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
301 uuid := uuid.New().String()
302 ctx := log.WithLogValues(parentContext, "requestID", uuid, "method", r.Method, "path", r.URL.Path)
303 r = r.WithContext(ctx)
304 next.ServeHTTP(w, r)
305 })
306 }
307}
308func copyHeader(dst, src http.Header) {
309 for k, vv := range src {
310 // we'll handle CORS ourselves, thanks
311 if strings.HasPrefix(k, "Access-Control") {
312 continue
313 }
314 for _, v := range vv {
315 dst.Add(k, v)
316 }
317 }
318}
319
320// handler that takes care of static files and otherwise returns the index.html with the correct link card data
321func (a *StreamplaceAPI) NotFoundLinkingHandler(ctx context.Context, linker *linking.Linker) (http.HandlerFunc, error) {
322 files, err := app.Files()
323 if err != nil {
324 return nil, err
325 }
326 fs := AppHostingFS{http.FS(files)}
327
328 fileHandler := a.FileHandler(ctx, http.FileServer(fs))
329 defaultHandler := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
330 f := strings.TrimPrefix(req.URL.Path, "/")
331 // under docs we need the index.html suffix due to astro rendering
332 if strings.HasPrefix(req.URL.Path, "/docs") && strings.HasSuffix(req.URL.Path, "/") {
333 f += "index.html"
334 }
335 _, err := fs.Open(f)
336 if err == nil {
337 fileHandler.ServeHTTP(w, req)
338 return
339 }
340 if errors.Is(err, ErrorIndex) || f == "" {
341 bs, err := linker.GenerateDefaultCard(ctx, req.URL, a.CLI.SentryDSN)
342 if err != nil {
343 log.Error(ctx, "error generating default card", "error", err)
344 }
345 w.Header().Set("Content-Type", "text/html")
346 if _, err := w.Write(bs); err != nil {
347 log.Error(ctx, "error writing response", "error", err)
348 }
349 } else {
350 log.Warn(ctx, "error opening file", "error", err)
351 apierrors.WriteHTTPInternalServerError(w, "file not found", err)
352 }
353 })
354 return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
355 proto := "http"
356 if req.TLS != nil {
357 proto = "https"
358 }
359 fwProto := req.Header.Get("x-forwarded-proto")
360 if fwProto != "" {
361 proto = fwProto
362 }
363 req.URL.Host = req.Host
364 req.URL.Scheme = proto
365 maybeHandle := strings.TrimPrefix(req.URL.Path, "/")
366 // quick check for things that aren't valid handles/dids
367 if strings.ContainsAny(maybeHandle, "/_") {
368 defaultHandler.ServeHTTP(w, req)
369 return
370 }
371 repo, err := a.Model.GetRepoByHandleOrDID(maybeHandle)
372 if err != nil || repo == nil {
373 log.Error(ctx, "no repo found", "maybeHandle", maybeHandle)
374 defaultHandler.ServeHTTP(w, req)
375 return
376 }
377 ls, err := a.Model.GetLatestLivestreamForRepo(repo.DID)
378 if err != nil || ls == nil {
379 log.Error(ctx, "no livestream found", "repoDID", repo.DID)
380 defaultHandler.ServeHTTP(w, req)
381 return
382 }
383 lsv, err := ls.ToLivestreamView()
384 if err != nil || lsv == nil {
385 log.Error(ctx, "no livestream view found", "repoDID", repo.DID)
386 defaultHandler.ServeHTTP(w, req)
387 return
388 }
389 bs, err := linker.GenerateStreamerCard(ctx, req.URL, lsv, a.CLI.SentryDSN)
390 if err != nil {
391 log.Error(ctx, "error generating html", "error", err)
392 defaultHandler.ServeHTTP(w, req)
393 return
394 }
395 w.Header().Set("Content-Type", "text/html")
396 if _, err := w.Write(bs); err != nil {
397 log.Error(ctx, "error writing response", "error", err)
398 }
399 }), nil
400}
401
402func (a *StreamplaceAPI) MistProxyHandler(ctx context.Context, tmpl string) httprouter.Handle {
403 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
404 if !a.CLI.HasMist() {
405 apierrors.WriteHTTPNotImplemented(w, "Playback only on the Linux version for now", nil)
406 return
407 }
408 stream := params.ByName("stream")
409 if stream == "" {
410 apierrors.WriteHTTPBadRequest(w, "missing stream in request", nil)
411 return
412 }
413
414 fullstream := fmt.Sprintf("%s+%s", mistconfig.StreamName, stream)
415 prefix := fmt.Sprintf(tmpl, fullstream)
416 resource := params.ByName("resource")
417
418 // path := strings.TrimPrefix(req.URL.EscapedPath(), "/api")
419
420 client := &http.Client{}
421 req.URL = &url.URL{
422 Scheme: "http",
423 Host: fmt.Sprintf("127.0.0.1:%d", a.CLI.MistHTTPPort),
424 Path: fmt.Sprintf("%s%s", prefix, resource),
425 RawQuery: req.URL.RawQuery,
426 }
427
428 //http: Request.RequestURI can't be set in client requests.
429 //http://golang.org/src/pkg/net/http/client.go
430 req.RequestURI = ""
431
432 resp, err := client.Do(req)
433 if err != nil {
434 apierrors.WriteHTTPInternalServerError(w, "error connecting to mist", err)
435 return
436 }
437 defer resp.Body.Close()
438
439 copyHeader(w.Header(), resp.Header)
440 w.WriteHeader(resp.StatusCode)
441 if _, err := io.Copy(w, resp.Body); err != nil {
442 log.Error(ctx, "error writing response", "error", err)
443 }
444 }
445}
446
447func (a *StreamplaceAPI) FileHandler(ctx context.Context, fs http.Handler) http.HandlerFunc {
448 return func(w http.ResponseWriter, req *http.Request) {
449 noslash := req.URL.Path[1:]
450 ct, ok := a.Mimes[noslash]
451 if ok {
452 w.Header().Set("content-type", ct)
453 }
454 fs.ServeHTTP(w, req)
455 }
456}
457
458func (a *StreamplaceAPI) RedirectHandler(ctx context.Context) (http.Handler, error) {
459 var tlsPort string
460 var err error
461 if a.HTTPRedirectTLSPort != nil {
462 tlsPort = fmt.Sprintf("%d", *a.HTTPRedirectTLSPort)
463 } else {
464 _, tlsPort, err = net.SplitHostPort(a.CLI.HTTPSAddr)
465 if err != nil {
466 return nil, err
467 }
468 }
469 handleRedirect := func(w http.ResponseWriter, req *http.Request) {
470 host, _, err := net.SplitHostPort(req.Host)
471 if err != nil {
472 host = req.Host
473 }
474 u := req.URL
475 if tlsPort == "443" {
476 u.Host = host
477 } else {
478 u.Host = net.JoinHostPort(host, tlsPort)
479 }
480 u.Scheme = "https"
481 http.Redirect(w, req, u.String(), http.StatusTemporaryRedirect)
482 }
483 mux := http.NewServeMux()
484 mux.HandleFunc("/", handleRedirect)
485 return mux, nil
486}
487
488type NotificationPayload struct {
489 Token string `json:"token"`
490 RepoDID string `json:"repoDID"`
491}
492
493func (a *StreamplaceAPI) HandleAPI404(ctx context.Context) http.HandlerFunc {
494 return func(w http.ResponseWriter, req *http.Request) {
495 w.WriteHeader(404)
496 }
497}
498
499func (a *StreamplaceAPI) HandleNotification(ctx context.Context) http.HandlerFunc {
500 return func(w http.ResponseWriter, req *http.Request) {
501 payload, err := io.ReadAll(req.Body)
502 if err != nil {
503 log.Log(ctx, "error reading notification create", "error", err)
504 w.WriteHeader(400)
505 return
506 }
507 n := NotificationPayload{}
508 err = json.Unmarshal(payload, &n)
509 if err != nil {
510 log.Log(ctx, "error unmarshalling notification create", "error", err)
511 w.WriteHeader(400)
512 return
513 }
514 err = a.StatefulDB.CreateNotification(n.Token, n.RepoDID)
515 if err != nil {
516 log.Log(ctx, "error creating notification", "error", err)
517 w.WriteHeader(400)
518 return
519 }
520 log.Log(ctx, "successfully created notification", "token", n.Token)
521 w.WriteHeader(200)
522 if n.RepoDID != "" {
523 go func() {
524 _, err := a.ATSync.SyncBlueskyRepo(ctx, n.RepoDID, a.Model)
525 if err != nil {
526 log.Error(ctx, "error syncing bluesky repo after notification creation", "error", err)
527 }
528 }()
529 }
530 }
531}
532
533func (a *StreamplaceAPI) HandleSegment(ctx context.Context) http.HandlerFunc {
534 return func(w http.ResponseWriter, req *http.Request) {
535 err := a.MediaManager.ValidateMP4(ctx, req.Body, false)
536 if err != nil {
537 apierrors.WriteHTTPBadRequest(w, "could not ingest segment", err)
538 return
539 }
540 w.WriteHeader(200)
541 }
542}
543
544func (a *StreamplaceAPI) HandlePlayerEvent(ctx context.Context) httprouter.Handle {
545 return func(w http.ResponseWriter, req *http.Request, p httprouter.Params) {
546 if !a.CLI.PlayerTelemetry {
547 w.WriteHeader(200)
548 return
549 }
550 var event model.PlayerEventAPI
551 if err := json.NewDecoder(req.Body).Decode(&event); err != nil {
552 apierrors.WriteHTTPBadRequest(w, "could not decode JSON body", err)
553 return
554 }
555 err := a.Model.CreatePlayerEvent(event)
556 if err != nil {
557 apierrors.WriteHTTPBadRequest(w, "could not create player event", err)
558 return
559 }
560 w.WriteHeader(201)
561 }
562}
563
564func (a *StreamplaceAPI) HandleRecentSegments(ctx context.Context) httprouter.Handle {
565 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
566 segs, err := a.Model.MostRecentSegments()
567 if err != nil {
568 apierrors.WriteHTTPInternalServerError(w, "could not get segments", err)
569 return
570 }
571 bs, err := json.Marshal(segs)
572 if err != nil {
573 apierrors.WriteHTTPInternalServerError(w, "could not marshal segments", err)
574 return
575 }
576 w.Header().Add("Content-Type", "application/json")
577 if _, err := w.Write(bs); err != nil {
578 log.Error(ctx, "error writing response", "error", err)
579 }
580 }
581}
582
583func (a *StreamplaceAPI) HandleUserRecentSegments(ctx context.Context) httprouter.Handle {
584 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
585 user := params.ByName("repoDID")
586 if user == "" {
587 apierrors.WriteHTTPBadRequest(w, "user required", nil)
588 return
589 }
590 user, err := a.NormalizeUser(ctx, user)
591 if err != nil {
592 apierrors.WriteHTTPNotFound(w, "user not found", err)
593 return
594 }
595 seg, err := a.Model.LatestSegmentForUser(user)
596 if err != nil {
597 apierrors.WriteHTTPInternalServerError(w, "could not get segments", err)
598 return
599 }
600 streamplaceSeg, err := seg.ToStreamplaceSegment()
601 if err != nil {
602 apierrors.WriteHTTPInternalServerError(w, "could not convert segment to streamplace segment", err)
603 return
604 }
605 bs, err := json.Marshal(streamplaceSeg)
606 if err != nil {
607 apierrors.WriteHTTPInternalServerError(w, "could not marshal segments", err)
608 return
609 }
610 w.Header().Add("Content-Type", "application/json")
611 if _, err := w.Write(bs); err != nil {
612 log.Error(ctx, "error writing response", "error", err)
613 }
614 }
615}
616
617func (a *StreamplaceAPI) HandleViewCount(ctx context.Context) httprouter.Handle {
618 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
619 user := params.ByName("user")
620 if user == "" {
621 apierrors.WriteHTTPBadRequest(w, "user required", nil)
622 return
623 }
624 user, err := a.NormalizeUser(ctx, user)
625 if err != nil {
626 apierrors.WriteHTTPNotFound(w, "user not found", err)
627 return
628 }
629 count := spmetrics.GetViewCount(user)
630 bs, err := json.Marshal(streamplace.Livestream_ViewerCount{Count: int64(count), LexiconTypeID: "place.stream.livestream#viewerCount"})
631 if err != nil {
632 apierrors.WriteHTTPInternalServerError(w, "could not marshal view count", err)
633 return
634 }
635 if _, err := w.Write(bs); err != nil {
636 log.Error(ctx, "error writing response", "error", err)
637 }
638 }
639}
640
641func (a *StreamplaceAPI) HandleBlueskyResolve(ctx context.Context) httprouter.Handle {
642 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
643 log.Log(ctx, "got bluesky notification", "params", params)
644 key, err := a.ATSync.SyncBlueskyRepo(ctx, params.ByName("handle"), a.Model)
645 if err != nil {
646 apierrors.WriteHTTPInternalServerError(w, "could not resolve streamplace key", err)
647 return
648 }
649 signingKeys, err := a.Model.GetSigningKeysForRepo(key.DID)
650 if err != nil {
651 apierrors.WriteHTTPInternalServerError(w, "could not get signing keys", err)
652 return
653 }
654 bs, err := json.Marshal(signingKeys)
655 if err != nil {
656 apierrors.WriteHTTPInternalServerError(w, "could not marshal signing keys", err)
657 return
658 }
659 if _, err := w.Write(bs); err != nil {
660 log.Error(ctx, "error writing response", "error", err)
661 }
662 }
663}
664
665type ChatResponse struct {
666 Post *bsky.FeedPost `json:"post"`
667 Repo *model.Repo `json:"repo"`
668 CID string `json:"cid"`
669}
670
671func (a *StreamplaceAPI) HandleChat(ctx context.Context) httprouter.Handle {
672 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
673 user := params.ByName("repoDID")
674 if user == "" {
675 apierrors.WriteHTTPBadRequest(w, "user required", nil)
676 return
677 }
678 repoDID, err := a.NormalizeUser(ctx, user)
679 if err != nil {
680 apierrors.WriteHTTPNotFound(w, "user not found", err)
681 return
682 }
683 replies, err := a.Model.GetReplies(repoDID)
684 if err != nil {
685 apierrors.WriteHTTPInternalServerError(w, "could not get replies", err)
686 return
687 }
688 bs, err := json.Marshal(replies)
689 if err != nil {
690 apierrors.WriteHTTPInternalServerError(w, "could not marshal replies", err)
691 return
692 }
693 w.Header().Set("Content-Type", "application/json")
694 if _, err := w.Write(bs); err != nil {
695 log.Error(ctx, "error writing response", "error", err)
696 }
697 }
698}
699
700func (a *StreamplaceAPI) HandleLivestream(ctx context.Context) httprouter.Handle {
701 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
702 user := params.ByName("repoDID")
703 if user == "" {
704 apierrors.WriteHTTPBadRequest(w, "user required", nil)
705 return
706 }
707 repoDID, err := a.NormalizeUser(ctx, user)
708 if err != nil {
709 apierrors.WriteHTTPNotFound(w, "user not found", err)
710 return
711 }
712 livestream, err := a.Model.GetLatestLivestreamForRepo(repoDID)
713 if err != nil {
714 apierrors.WriteHTTPInternalServerError(w, "could not get livestream", err)
715 return
716 }
717
718 doc, err := livestream.ToLivestreamView()
719 if err != nil {
720 apierrors.WriteHTTPInternalServerError(w, "could not marshal livestream", err)
721 return
722 }
723
724 if livestream == nil {
725 apierrors.WriteHTTPNotFound(w, "no livestream found", nil)
726 return
727 }
728
729 bs, err := json.Marshal(doc)
730 if err != nil {
731 apierrors.WriteHTTPInternalServerError(w, "could not marshal livestream", err)
732 return
733 }
734 w.Header().Set("Content-Type", "application/json")
735 if _, err := w.Write(bs); err != nil {
736 log.Error(ctx, "error writing response", "error", err)
737 }
738 }
739}
740
741func (a *StreamplaceAPI) RateLimitMiddleware(ctx context.Context) func(http.Handler) http.Handler {
742 return func(next http.Handler) http.Handler {
743 return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
744 ip, _, err := net.SplitHostPort(req.RemoteAddr)
745 if err != nil {
746 ip = req.RemoteAddr
747 }
748
749 if a.CLI.RateLimitPerSecond > 0 {
750 limiter := a.getLimiter(ip)
751
752 if !limiter.Allow() {
753 log.Warn(ctx, "rate limit exceeded", "ip", ip, "path", req.URL.Path)
754 apierrors.WriteHTTPTooManyRequests(w, "rate limit exceeded")
755 return
756 }
757 }
758
759 next.ServeHTTP(w, req)
760 })
761 }
762}
763
764// helper for getting a listener from a systemd file descriptor
765func getListenerFromFD(fdName string) (net.Listener, error) {
766 log.Debug(context.TODO(), "getting listener from fd", "fdName", fdName, "LISTEN_PID", os.Getenv("LISTEN_PID"), "LISTEN_FDNAMES", os.Getenv("LISTEN_FDNAMES"))
767 if os.Getenv("LISTEN_PID") == strconv.Itoa(os.Getpid()) {
768 names := strings.Split(os.Getenv("LISTEN_FDNAMES"), ":")
769 for i, name := range names {
770 if name == fdName {
771 log.Warn(context.TODO(), "using systemd file descriptor", "fdName", fdName, "fdIndex", i+3)
772 f1 := os.NewFile(uintptr(i+3), fdName)
773 return net.FileListener(f1)
774 }
775 }
776 }
777 return nil, nil
778}
779
780func (a *StreamplaceAPI) ServeHTTP(ctx context.Context) error {
781 handler, err := a.Handler(ctx)
782 if err != nil {
783 return err
784 }
785 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error {
786 ln, err := getListenerFromFD("http")
787 if err != nil {
788 return err
789 }
790 if ln == nil {
791 ln, err = net.Listen("tcp", a.CLI.HTTPAddr)
792 if err != nil {
793 return err
794 }
795 } else {
796 log.Warn(ctx, "api server listening for http over systemd socket", "addr", ln.Addr())
797 }
798 log.Log(ctx, "http server starting", "addr", ln.Addr())
799 return s.Serve(ln)
800 })
801}
802
803func (a *StreamplaceAPI) ServeHTTPRedirect(ctx context.Context) error {
804 handler, err := a.RedirectHandler(ctx)
805 if err != nil {
806 return err
807 }
808 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error {
809 ln, err := getListenerFromFD("http")
810 if err != nil {
811 return err
812 }
813 if ln == nil {
814 ln, err = net.Listen("tcp", a.CLI.HTTPAddr)
815 if err != nil {
816 return err
817 }
818 } else {
819 log.Warn(ctx, "http tls redirect server listening for http over systemd socket", "addr", ln.Addr())
820 }
821 log.Log(ctx, "http tls redirect server starting", "addr", ln.Addr())
822 return s.Serve(ln)
823 })
824}
825
826func (a *StreamplaceAPI) ServeHTTPS(ctx context.Context) error {
827 handler, err := a.Handler(ctx)
828 if err != nil {
829 return err
830 }
831 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error {
832 ln, err := getListenerFromFD("https")
833 if err != nil {
834 return err
835 }
836 if ln == nil {
837 ln, err = net.Listen("tcp", a.CLI.HTTPSAddr)
838 if err != nil {
839 return err
840 }
841 } else {
842 // tell the redirect handler we're using systemd and they should go to 443
843 port443 := 443
844 a.HTTPRedirectTLSPort = &port443
845 log.Warn(ctx, "https server listening for https over systemd socket", "addr", ln.Addr())
846 }
847 log.Log(ctx, "https server starting",
848 "addr", ln.Addr(),
849 "certPath", a.CLI.TLSCertPath,
850 "keyPath", a.CLI.TLSKeyPath,
851 )
852 return s.ServeTLS(ln, a.CLI.TLSCertPath, a.CLI.TLSKeyPath)
853 })
854}
855
856func (a *StreamplaceAPI) ServerWithShutdown(ctx context.Context, handler http.Handler, serve func(*http.Server) error) error {
857 ctx, cancel := context.WithCancel(ctx)
858 handler = gziphandler.GzipHandler(handler)
859 server := http.Server{Handler: handler}
860 var serveErr error
861 go func() {
862 serveErr = serve(&server)
863 cancel()
864 }()
865 <-ctx.Done()
866 if serveErr != nil {
867 return fmt.Errorf("error in http server: %w", serveErr)
868 }
869
870 ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
871 defer cancel()
872 return server.Shutdown(ctx)
873}
874
875func (a *StreamplaceAPI) HandleHealthz(ctx context.Context) http.HandlerFunc {
876 return func(w http.ResponseWriter, req *http.Request) {
877 w.WriteHeader(200)
878 }
879}
880
881func (a *StreamplaceAPI) getLimiter(ip string) *rate.Limiter {
882 a.limitersMu.Lock()
883 defer a.limitersMu.Unlock()
884
885 limiter, exists := a.limiters[ip]
886 if !exists {
887 limiter = rate.NewLimiter(rate.Limit(a.CLI.RateLimitPerSecond), a.CLI.RateLimitBurst)
888 a.limiters[ip] = limiter
889 }
890
891 return limiter
892}
893
894func (a *StreamplaceAPI) HandleClip(ctx context.Context) httprouter.Handle {
895 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
896 user := params.ByName("user")
897 file := params.ByName("file")
898 if user == "" || file == "" {
899 apierrors.WriteHTTPBadRequest(w, "user and file required", nil)
900 return
901 }
902 user, err := a.NormalizeUser(ctx, user)
903 if err != nil {
904 apierrors.WriteHTTPNotFound(w, "user not found", err)
905 return
906 }
907 fPath := []string{user, "clips", file}
908 exists, err := a.CLI.DataFileExists(fPath)
909 if err != nil {
910 apierrors.WriteHTTPInternalServerError(w, "could not check if file exists", err)
911 return
912 }
913 if !exists {
914 apierrors.WriteHTTPNotFound(w, "file not found", nil)
915 return
916 }
917 fd, err := os.Open(a.CLI.DataFilePath(fPath))
918 if err != nil {
919 apierrors.WriteHTTPInternalServerError(w, "could not open file", err)
920 return
921 }
922 defer fd.Close()
923 w.Header().Set("Content-Type", "video/mp4")
924 if _, err := io.Copy(w, fd); err != nil {
925 log.Error(ctx, "error writing response", "error", err)
926 }
927 }
928}
929
930func NewWebsocketTracker(maxConns int) *WebsocketTracker {
931 return &WebsocketTracker{
932 connections: make(map[string]int),
933 maxConnsPerIP: maxConns,
934 }
935}
936
937func (t *WebsocketTracker) AddConnection(ip string) bool {
938 t.mu.Lock()
939 defer t.mu.Unlock()
940
941 count := t.connections[ip]
942
943 if count >= t.maxConnsPerIP {
944 return false
945 }
946
947 t.connections[ip] = count + 1
948 return true
949}
950
951func (t *WebsocketTracker) RemoveConnection(ip string) {
952 t.mu.Lock()
953 defer t.mu.Unlock()
954
955 count := t.connections[ip]
956 if count > 0 {
957 t.connections[ip] = count - 1
958 }
959
960 if t.connections[ip] == 0 {
961 delete(t.connections, ip)
962 }
963}