Live video on the AT Protocol
1package api
2
3import (
4 "context"
5 "encoding/json"
6 "errors"
7 "fmt"
8 "io"
9 "log/slog"
10 "net"
11 "net/http"
12 "net/http/httputil"
13 "net/url"
14 "os"
15 "strconv"
16 "strings"
17 "sync"
18 "time"
19
20 "github.com/NYTimes/gziphandler"
21 "github.com/bluesky-social/indigo/api/bsky"
22 "github.com/google/uuid"
23 "github.com/julienschmidt/httprouter"
24 "github.com/rs/cors"
25 sloghttp "github.com/samber/slog-http"
26 "golang.org/x/time/rate"
27
28 "github.com/streamplace/oatproxy/pkg/oatproxy"
29 "stream.place/streamplace/js/app"
30 "stream.place/streamplace/pkg/atproto"
31 "stream.place/streamplace/pkg/bus"
32 "stream.place/streamplace/pkg/config"
33 "stream.place/streamplace/pkg/crypto/signers/eip712"
34 "stream.place/streamplace/pkg/director"
35 apierrors "stream.place/streamplace/pkg/errors"
36 "stream.place/streamplace/pkg/linking"
37 "stream.place/streamplace/pkg/log"
38 "stream.place/streamplace/pkg/media"
39 "stream.place/streamplace/pkg/mist/mistconfig"
40 "stream.place/streamplace/pkg/model"
41 "stream.place/streamplace/pkg/notifications"
42 "stream.place/streamplace/pkg/spmetrics"
43 "stream.place/streamplace/pkg/spxrpc"
44 "stream.place/streamplace/pkg/statedb"
45 "stream.place/streamplace/pkg/streamplace"
46
47 metrics "github.com/slok/go-http-metrics/metrics/prometheus"
48 "github.com/slok/go-http-metrics/middleware"
49 echomiddleware "github.com/slok/go-http-metrics/middleware/echo"
50 httproutermiddleware "github.com/slok/go-http-metrics/middleware/httprouter"
51 middlewarestd "github.com/slok/go-http-metrics/middleware/std"
52)
53
54type StreamplaceAPI struct {
55 CLI *config.CLI
56 Model model.Model
57 StatefulDB *statedb.StatefulDB
58 Updater *Updater
59 Signer *eip712.EIP712Signer
60 Mimes map[string]string
61 FirebaseNotifier notifications.FirebaseNotifier
62 MediaManager *media.MediaManager
63 MediaSigner media.MediaSigner
64 // not thread-safe yet
65 Aliases map[string]string
66 Bus *bus.Bus
67 ATSync *atproto.ATProtoSynchronizer
68 Director *director.Director
69
70 connTracker *WebsocketTracker
71
72 limiters map[string]*rate.Limiter
73 limitersMu sync.Mutex
74 SignerCache map[string]media.MediaSigner
75 SignerCacheMu sync.Mutex
76 op *oatproxy.OATProxy
77
78 // override tls port for http redirect server if we're using systemd file descriptors
79 HTTPRedirectTLSPort *int
80 sessions map[string]map[string]time.Time
81 sessionsLock sync.RWMutex
82
83 rtmpSessions map[string]*media.RTMPSession
84 rtmpSessionsLock sync.Mutex
85 rtmpInternalPlaybackAddr string
86}
87
// WebsocketTracker keeps a per-IP count of open websocket connections and
// the maximum number each IP is allowed to hold at once.
type WebsocketTracker struct {
	// connections maps a client IP to its current connection count.
	connections map[string]int
	// maxConnsPerIP is the limit enforced by AddConnection.
	maxConnsPerIP int
	// mu guards connections.
	mu sync.RWMutex
}
93
94func MakeStreamplaceAPI(cli *config.CLI, mod model.Model, statefulDB *statedb.StatefulDB, noter notifications.FirebaseNotifier, mm *media.MediaManager, ms media.MediaSigner, bus *bus.Bus, atsync *atproto.ATProtoSynchronizer, d *director.Director, op *oatproxy.OATProxy) (*StreamplaceAPI, error) {
95 updater, err := PrepareUpdater(cli)
96 if err != nil {
97 return nil, err
98 }
99 a := &StreamplaceAPI{CLI: cli,
100 Model: mod,
101 StatefulDB: statefulDB,
102 Updater: updater,
103 FirebaseNotifier: noter,
104 MediaManager: mm,
105 MediaSigner: ms,
106 Aliases: map[string]string{},
107 Bus: bus,
108 ATSync: atsync,
109 Director: d,
110 connTracker: NewWebsocketTracker(cli.RateLimitWebsocket),
111 limiters: make(map[string]*rate.Limiter),
112 SignerCache: make(map[string]media.MediaSigner),
113 op: op,
114 sessions: make(map[string]map[string]time.Time),
115 sessionsLock: sync.RWMutex{},
116 rtmpSessions: make(map[string]*media.RTMPSession),
117 rtmpSessionsLock: sync.Mutex{},
118 }
119 a.Mimes, err = updater.GetMimes()
120 if err != nil {
121 return nil, err
122 }
123 return a, nil
124}
125
// AppHostingFS wraps an http.FileSystem so that any failure to open a file
// is reported as ErrorIndex, signalling callers to serve index.html instead.
type AppHostingFS struct {
	http.FileSystem
}

// ErrorIndex is returned by AppHostingFS.Open whenever the wrapped file
// system cannot open the requested name.
var ErrorIndex = errors.New("not found, use index.html")

// Open opens name on the wrapped file system. Every underlying error,
// whatever its cause, is replaced by ErrorIndex.
func (fs AppHostingFS) Open(name string) (http.File, error) {
	if opened, err := fs.FileSystem.Open(name); err == nil {
		return opened, nil
	}
	return nil, ErrorIndex
}
139
140// api/playback/iame.li/webrtc?rendition=source
141// api/playback/iame.li/stream.mp4?rendition=source
142// api/playback/iame.li/stream.webm?rendition=source
143// api/playback/iame.li/hls/index.m3u8
144// api/playback/iame.li/hls/source/stream.m3u8
145// api/playback/iame.li/hls/source/000000000000.ts
146
147func (a *StreamplaceAPI) Handler(ctx context.Context) (http.Handler, error) {
148
149 mdlw := middleware.New(middleware.Config{
150 Recorder: metrics.NewRecorder(metrics.Config{}),
151 })
152 var xrpc http.Handler
153 xrpc, err := spxrpc.NewServer(ctx, a.CLI, a.Model, a.StatefulDB, a.op, mdlw, a.ATSync, a.Bus)
154 if err != nil {
155 return nil, err
156 }
157 router := httprouter.New()
158
159 // Create our middleware factory with the default settings.
160
161 a.op.Echo.Use(echomiddleware.Handler("", mdlw))
162
163 // r.GET("/test/:id", httproutermiddleware.Handler("/test/:id", h1, mdlw))
164
165 addHandle := func(router *httprouter.Router, method, path string, handler httprouter.Handle) {
166 router.Handle(method, path, httproutermiddleware.Handler(path, handler, mdlw))
167 }
168 addFunc := func(router *httprouter.Router, method, path string, handler http.HandlerFunc) {
169 router.Handler(method, path, middlewarestd.Handler(path, mdlw, handler))
170 }
171
172 router.Handler("GET", "/oauth/*anything", a.op.Handler())
173 router.Handler("POST", "/oauth/*anything", a.op.Handler())
174 router.Handler("GET", "/.well-known/oauth-authorization-server", a.op.Handler())
175 router.Handler("GET", "/.well-known/oauth-protected-resource", a.op.Handler())
176 router.Handler("GET", "/.well-known/apple-app-site-association", a.HandleAppleAppSiteAssociation(ctx))
177 router.Handler("GET", "/.well-known/assetlinks.json", a.HandleAndroidAssetLinks(ctx))
178 apiRouter := httprouter.New()
179 addFunc(apiRouter, "POST", "/api/notification", a.HandleNotification(ctx))
180 // old clients
181 addFunc(router, "GET", "/app-updates", a.HandleAppUpdates(ctx))
182 // new ones
183 addFunc(apiRouter, "GET", "/api/manifest", a.HandleAppUpdates(ctx))
184 addHandle(apiRouter, "GET", "/api/desktop-updates/:platform/:architecture/:version/:buildTime/:file", a.HandleDesktopUpdates(ctx))
185 addHandle(apiRouter, "POST", "/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s"))
186 addHandle(apiRouter, "OPTIONS", "/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s"))
187 addHandle(apiRouter, "DELETE", "/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s"))
188 addFunc(apiRouter, "POST", "/api/segment", a.HandleSegment(ctx))
189 addFunc(apiRouter, "GET", "/api/healthz", a.HandleHealthz(ctx))
190 addHandle(apiRouter, "GET", "/api/playback/:user/hls/*file", a.HandleHLSPlayback(ctx))
191 // they're jpegs now
192 addHandle(apiRouter, "GET", "/api/playback/:user/stream.jpg", a.HandleThumbnailPlayback(ctx))
193 // this one is actually a jpeg (used previously and shouldn't remove for historical reasons)
194 addHandle(apiRouter, "GET", "/api/playback/:user/stream.png", a.HandleThumbnailPlayback(ctx))
195 addHandle(apiRouter, "GET", "/api/app-return/*anything", a.HandleAppReturn(ctx))
196 addHandle(apiRouter, "POST", "/api/playback/:user/webrtc", a.HandleWebRTCPlayback(ctx))
197 addHandle(apiRouter, "POST", "/api/ingest/webrtc", a.HandleWebRTCIngest(ctx))
198 addHandle(apiRouter, "POST", "/api/ingest/webrtc/:key", a.HandleWebRTCIngest(ctx))
199 addHandle(apiRouter, "POST", "/api/player-event", a.HandlePlayerEvent(ctx))
200 addHandle(apiRouter, "GET", "/api/chat/:repoDID", a.HandleChat(ctx))
201 addHandle(apiRouter, "GET", "/api/websocket/:repoDID", a.HandleWebsocket(ctx))
202 addHandle(apiRouter, "GET", "/api/livestream/:repoDID", a.HandleLivestream(ctx))
203 addHandle(apiRouter, "GET", "/api/segment/recent", a.HandleRecentSegments(ctx))
204 addHandle(apiRouter, "GET", "/api/segment/recent/:repoDID", a.HandleUserRecentSegments(ctx))
205 addHandle(apiRouter, "GET", "/api/bluesky/resolve/:handle", a.HandleBlueskyResolve(ctx))
206 addHandle(apiRouter, "GET", "/api/view-count/:user", a.HandleViewCount(ctx))
207 addHandle(apiRouter, "GET", "/api/clip/:user/:file", a.HandleClip(ctx))
208 apiRouter.NotFound = a.HandleAPI404(ctx)
209 apiRouterHandler := a.RateLimitMiddleware(ctx)(apiRouter)
210 xrpcHandler := a.RateLimitMiddleware(ctx)(xrpc)
211 router.Handler("GET", "/api/*resource", apiRouterHandler)
212 router.Handler("POST", "/api/*resource", apiRouterHandler)
213 router.Handler("PUT", "/api/*resource", apiRouterHandler)
214 router.Handler("PATCH", "/api/*resource", apiRouterHandler)
215 router.Handler("DELETE", "/api/*resource", apiRouterHandler)
216 router.Handler("GET", "/xrpc/*resource", xrpcHandler)
217 router.Handler("POST", "/xrpc/*resource", xrpcHandler)
218 router.Handler("PUT", "/xrpc/*resource", xrpcHandler)
219 router.Handler("PATCH", "/xrpc/*resource", xrpcHandler)
220 router.Handler("DELETE", "/xrpc/*resource", xrpcHandler)
221 router.GET("/.well-known/did.json", a.HandleDidJSON(ctx))
222 router.GET("/.well-known/atproto-did", a.HandleAtprotoDID(ctx))
223 router.GET("/dl/*params", a.HandleAppDownload(ctx))
224 router.POST("/", a.HandleWebRTCIngest(ctx))
225 for _, redirect := range a.CLI.Redirects {
226 parts := strings.Split(redirect, ":")
227 if len(parts) != 2 {
228 log.Error(ctx, "invalid redirect", "redirect", redirect)
229 return nil, fmt.Errorf("invalid redirect: %s", redirect)
230 }
231 router.Handle("GET", parts[0], func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
232 http.Redirect(w, r, parts[1], http.StatusTemporaryRedirect)
233 })
234 }
235 if a.CLI.FrontendProxy != "" {
236 u, err := url.Parse(a.CLI.FrontendProxy)
237 if err != nil {
238 return nil, err
239 }
240 log.Warn(ctx, "using frontend proxy instead of bundled frontend", "destination", a.CLI.FrontendProxy)
241 router.NotFound = &httputil.ReverseProxy{
242 Rewrite: func(r *httputil.ProxyRequest) {
243 // workaround for Expo disliking serving requests from 127.0.0.1 instead of localhost
244 // we need to use 127.0.0.1 because the atproto oauth client requires it
245 r.Out.Header.Set("Origin", u.String())
246 r.SetURL(u)
247 },
248 }
249 } else {
250 files, err := app.Files()
251 if err != nil {
252 return nil, err
253 }
254 index, err := files.Open("index.html")
255 if err != nil {
256 return nil, err
257 }
258 bs, err := io.ReadAll(index)
259 if err != nil {
260 return nil, err
261 }
262 linker, err := linking.NewLinker(ctx, bs)
263 if err != nil {
264 return nil, err
265 }
266 linkingHandler, err := a.NotFoundLinkingHandler(ctx, linker)
267 if err != nil {
268 return nil, err
269 }
270 router.NotFound = middlewarestd.Handler("/*static", mdlw, linkingHandler)
271 }
272 // needed because the WebRTC handler issues 405s from / otherwise
273 router.GET("/", func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
274 router.NotFound.ServeHTTP(w, r)
275 })
276 handler := sloghttp.Recovery(router)
277 handler = cors.AllowAll().Handler(handler)
278 handler = sloghttp.New(slog.Default())(handler)
279 handler = a.RateLimitMiddleware(ctx)(handler)
280
281 // this needs to be LAST so nothing else clobbers the context
282 handler = a.ContextMiddleware(ctx)(handler)
283
284 return handler, nil
285}
286func (a *StreamplaceAPI) ContextMiddleware(parentContext context.Context) func(next http.Handler) http.Handler {
287 return func(next http.Handler) http.Handler {
288 return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
289 uuid := uuid.New().String()
290 ctx := log.WithLogValues(parentContext, "requestID", uuid, "method", r.Method, "path", r.URL.Path)
291 r = r.WithContext(ctx)
292 next.ServeHTTP(w, r)
293 })
294 }
295}
// copyHeader appends every value of every header in src to dst, skipping
// headers whose name starts with "Access-Control" because CORS headers are
// set by this server itself.
func copyHeader(dst, src http.Header) {
	for name, values := range src {
		// we'll handle CORS ourselves, thanks
		if !strings.HasPrefix(name, "Access-Control") {
			for i := range values {
				dst.Add(name, values[i])
			}
		}
	}
}
307
308// handler that takes care of static files and otherwise returns the index.html with the correct link card data
309func (a *StreamplaceAPI) NotFoundLinkingHandler(ctx context.Context, linker *linking.Linker) (http.HandlerFunc, error) {
310 files, err := app.Files()
311 if err != nil {
312 return nil, err
313 }
314 fs := AppHostingFS{http.FS(files)}
315
316 fileHandler := a.FileHandler(ctx, http.FileServer(fs))
317 defaultHandler := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
318 f := strings.TrimPrefix(req.URL.Path, "/")
319 // under docs we need the index.html suffix due to astro rendering
320 if strings.HasPrefix(req.URL.Path, "/docs") && strings.HasSuffix(req.URL.Path, "/") {
321 f += "index.html"
322 }
323 _, err := fs.Open(f)
324 if err == nil {
325 fileHandler.ServeHTTP(w, req)
326 return
327 }
328 if errors.Is(err, ErrorIndex) || f == "" {
329 bs, err := linker.GenerateDefaultCard(ctx, req.URL, a.CLI.SentryDSN)
330 if err != nil {
331 log.Error(ctx, "error generating default card", "error", err)
332 }
333 w.Header().Set("Content-Type", "text/html")
334 if _, err := w.Write(bs); err != nil {
335 log.Error(ctx, "error writing response", "error", err)
336 }
337 } else {
338 log.Warn(ctx, "error opening file", "error", err)
339 apierrors.WriteHTTPInternalServerError(w, "file not found", err)
340 }
341 })
342 return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
343 proto := "http"
344 if req.TLS != nil {
345 proto = "https"
346 }
347 fwProto := req.Header.Get("x-forwarded-proto")
348 if fwProto != "" {
349 proto = fwProto
350 }
351 req.URL.Host = req.Host
352 req.URL.Scheme = proto
353 maybeHandle := strings.TrimPrefix(req.URL.Path, "/")
354 // quick check for things that aren't valid handles/dids
355 if strings.ContainsAny(maybeHandle, "/_") {
356 defaultHandler.ServeHTTP(w, req)
357 return
358 }
359 repo, err := a.Model.GetRepoByHandleOrDID(maybeHandle)
360 if err != nil || repo == nil {
361 log.Error(ctx, "no repo found", "maybeHandle", maybeHandle)
362 defaultHandler.ServeHTTP(w, req)
363 return
364 }
365 ls, err := a.Model.GetLatestLivestreamForRepo(repo.DID)
366 if err != nil || ls == nil {
367 log.Error(ctx, "no livestream found", "repoDID", repo.DID)
368 defaultHandler.ServeHTTP(w, req)
369 return
370 }
371 lsv, err := ls.ToLivestreamView()
372 if err != nil || lsv == nil {
373 log.Error(ctx, "no livestream view found", "repoDID", repo.DID)
374 defaultHandler.ServeHTTP(w, req)
375 return
376 }
377 bs, err := linker.GenerateStreamerCard(ctx, req.URL, lsv, a.CLI.SentryDSN)
378 if err != nil {
379 log.Error(ctx, "error generating html", "error", err)
380 defaultHandler.ServeHTTP(w, req)
381 return
382 }
383 w.Header().Set("Content-Type", "text/html")
384 if _, err := w.Write(bs); err != nil {
385 log.Error(ctx, "error writing response", "error", err)
386 }
387 }), nil
388}
389
390func (a *StreamplaceAPI) MistProxyHandler(ctx context.Context, tmpl string) httprouter.Handle {
391 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
392 if !a.CLI.HasMist() {
393 apierrors.WriteHTTPNotImplemented(w, "Playback only on the Linux version for now", nil)
394 return
395 }
396 stream := params.ByName("stream")
397 if stream == "" {
398 apierrors.WriteHTTPBadRequest(w, "missing stream in request", nil)
399 return
400 }
401
402 fullstream := fmt.Sprintf("%s+%s", mistconfig.StreamName, stream)
403 prefix := fmt.Sprintf(tmpl, fullstream)
404 resource := params.ByName("resource")
405
406 // path := strings.TrimPrefix(req.URL.EscapedPath(), "/api")
407
408 client := &http.Client{}
409 req.URL = &url.URL{
410 Scheme: "http",
411 Host: fmt.Sprintf("127.0.0.1:%d", a.CLI.MistHTTPPort),
412 Path: fmt.Sprintf("%s%s", prefix, resource),
413 RawQuery: req.URL.RawQuery,
414 }
415
416 //http: Request.RequestURI can't be set in client requests.
417 //http://golang.org/src/pkg/net/http/client.go
418 req.RequestURI = ""
419
420 resp, err := client.Do(req)
421 if err != nil {
422 apierrors.WriteHTTPInternalServerError(w, "error connecting to mist", err)
423 return
424 }
425 defer resp.Body.Close()
426
427 copyHeader(w.Header(), resp.Header)
428 w.WriteHeader(resp.StatusCode)
429 if _, err := io.Copy(w, resp.Body); err != nil {
430 log.Error(ctx, "error writing response", "error", err)
431 }
432 }
433}
434
435func (a *StreamplaceAPI) FileHandler(ctx context.Context, fs http.Handler) http.HandlerFunc {
436 return func(w http.ResponseWriter, req *http.Request) {
437 noslash := req.URL.Path[1:]
438 ct, ok := a.Mimes[noslash]
439 if ok {
440 w.Header().Set("content-type", ct)
441 }
442 fs.ServeHTTP(w, req)
443 }
444}
445
446func (a *StreamplaceAPI) RedirectHandler(ctx context.Context) (http.Handler, error) {
447 var tlsPort string
448 var err error
449 if a.HTTPRedirectTLSPort != nil {
450 tlsPort = fmt.Sprintf("%d", *a.HTTPRedirectTLSPort)
451 } else {
452 _, tlsPort, err = net.SplitHostPort(a.CLI.HTTPSAddr)
453 if err != nil {
454 return nil, err
455 }
456 }
457 handleRedirect := func(w http.ResponseWriter, req *http.Request) {
458 host, _, err := net.SplitHostPort(req.Host)
459 if err != nil {
460 host = req.Host
461 }
462 u := req.URL
463 if tlsPort == "443" {
464 u.Host = host
465 } else {
466 u.Host = net.JoinHostPort(host, tlsPort)
467 }
468 u.Scheme = "https"
469 http.Redirect(w, req, u.String(), http.StatusTemporaryRedirect)
470 }
471 mux := http.NewServeMux()
472 mux.HandleFunc("/", handleRedirect)
473 return mux, nil
474}
475
// NotificationPayload is the JSON body accepted by HandleNotification.
type NotificationPayload struct {
	// Token is stored alongside RepoDID when the notification is created.
	Token string `json:"token"`
	// RepoDID is optional; when non-empty the handler also syncs that repo.
	RepoDID string `json:"repoDID"`
}
480
481func (a *StreamplaceAPI) HandleAPI404(ctx context.Context) http.HandlerFunc {
482 return func(w http.ResponseWriter, req *http.Request) {
483 w.WriteHeader(404)
484 }
485}
486
487func (a *StreamplaceAPI) HandleNotification(ctx context.Context) http.HandlerFunc {
488 return func(w http.ResponseWriter, req *http.Request) {
489 payload, err := io.ReadAll(req.Body)
490 if err != nil {
491 log.Log(ctx, "error reading notification create", "error", err)
492 w.WriteHeader(400)
493 return
494 }
495 n := NotificationPayload{}
496 err = json.Unmarshal(payload, &n)
497 if err != nil {
498 log.Log(ctx, "error unmarshalling notification create", "error", err)
499 w.WriteHeader(400)
500 return
501 }
502 err = a.StatefulDB.CreateNotification(n.Token, n.RepoDID)
503 if err != nil {
504 log.Log(ctx, "error creating notification", "error", err)
505 w.WriteHeader(400)
506 return
507 }
508 log.Log(ctx, "successfully created notification", "token", n.Token)
509 w.WriteHeader(200)
510 if n.RepoDID != "" {
511 go func() {
512 _, err := a.ATSync.SyncBlueskyRepo(ctx, n.RepoDID, a.Model)
513 if err != nil {
514 log.Error(ctx, "error syncing bluesky repo after notification creation", "error", err)
515 }
516 }()
517 }
518 }
519}
520
521func (a *StreamplaceAPI) HandleSegment(ctx context.Context) http.HandlerFunc {
522 return func(w http.ResponseWriter, req *http.Request) {
523 err := a.MediaManager.ValidateMP4(ctx, req.Body, false)
524 if err != nil {
525 apierrors.WriteHTTPBadRequest(w, "could not ingest segment", err)
526 return
527 }
528 w.WriteHeader(200)
529 }
530}
531
532func (a *StreamplaceAPI) HandlePlayerEvent(ctx context.Context) httprouter.Handle {
533 return func(w http.ResponseWriter, req *http.Request, p httprouter.Params) {
534 var event model.PlayerEventAPI
535 if err := json.NewDecoder(req.Body).Decode(&event); err != nil {
536 apierrors.WriteHTTPBadRequest(w, "could not decode JSON body", err)
537 return
538 }
539 err := a.Model.CreatePlayerEvent(event)
540 if err != nil {
541 apierrors.WriteHTTPBadRequest(w, "could not create player event", err)
542 return
543 }
544 w.WriteHeader(201)
545 }
546}
547
548func (a *StreamplaceAPI) HandleRecentSegments(ctx context.Context) httprouter.Handle {
549 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
550 segs, err := a.Model.MostRecentSegments()
551 if err != nil {
552 apierrors.WriteHTTPInternalServerError(w, "could not get segments", err)
553 return
554 }
555 bs, err := json.Marshal(segs)
556 if err != nil {
557 apierrors.WriteHTTPInternalServerError(w, "could not marshal segments", err)
558 return
559 }
560 w.Header().Add("Content-Type", "application/json")
561 if _, err := w.Write(bs); err != nil {
562 log.Error(ctx, "error writing response", "error", err)
563 }
564 }
565}
566
567func (a *StreamplaceAPI) HandleUserRecentSegments(ctx context.Context) httprouter.Handle {
568 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
569 user := params.ByName("repoDID")
570 if user == "" {
571 apierrors.WriteHTTPBadRequest(w, "user required", nil)
572 return
573 }
574 user, err := a.NormalizeUser(ctx, user)
575 if err != nil {
576 apierrors.WriteHTTPNotFound(w, "user not found", err)
577 return
578 }
579 seg, err := a.Model.LatestSegmentForUser(user)
580 if err != nil {
581 apierrors.WriteHTTPInternalServerError(w, "could not get segments", err)
582 return
583 }
584 streamplaceSeg, err := seg.ToStreamplaceSegment()
585 if err != nil {
586 apierrors.WriteHTTPInternalServerError(w, "could not convert segment to streamplace segment", err)
587 return
588 }
589 bs, err := json.Marshal(streamplaceSeg)
590 if err != nil {
591 apierrors.WriteHTTPInternalServerError(w, "could not marshal segments", err)
592 return
593 }
594 w.Header().Add("Content-Type", "application/json")
595 if _, err := w.Write(bs); err != nil {
596 log.Error(ctx, "error writing response", "error", err)
597 }
598 }
599}
600
601func (a *StreamplaceAPI) HandleViewCount(ctx context.Context) httprouter.Handle {
602 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
603 user := params.ByName("user")
604 if user == "" {
605 apierrors.WriteHTTPBadRequest(w, "user required", nil)
606 return
607 }
608 user, err := a.NormalizeUser(ctx, user)
609 if err != nil {
610 apierrors.WriteHTTPNotFound(w, "user not found", err)
611 return
612 }
613 count := spmetrics.GetViewCount(user)
614 bs, err := json.Marshal(streamplace.Livestream_ViewerCount{Count: int64(count), LexiconTypeID: "place.stream.livestream#viewerCount"})
615 if err != nil {
616 apierrors.WriteHTTPInternalServerError(w, "could not marshal view count", err)
617 return
618 }
619 if _, err := w.Write(bs); err != nil {
620 log.Error(ctx, "error writing response", "error", err)
621 }
622 }
623}
624
625func (a *StreamplaceAPI) HandleBlueskyResolve(ctx context.Context) httprouter.Handle {
626 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
627 log.Log(ctx, "got bluesky notification", "params", params)
628 key, err := a.ATSync.SyncBlueskyRepo(ctx, params.ByName("handle"), a.Model)
629 if err != nil {
630 apierrors.WriteHTTPInternalServerError(w, "could not resolve streamplace key", err)
631 return
632 }
633 signingKeys, err := a.Model.GetSigningKeysForRepo(key.DID)
634 if err != nil {
635 apierrors.WriteHTTPInternalServerError(w, "could not get signing keys", err)
636 return
637 }
638 bs, err := json.Marshal(signingKeys)
639 if err != nil {
640 apierrors.WriteHTTPInternalServerError(w, "could not marshal signing keys", err)
641 return
642 }
643 if _, err := w.Write(bs); err != nil {
644 log.Error(ctx, "error writing response", "error", err)
645 }
646 }
647}
648
649type ChatResponse struct {
650 Post *bsky.FeedPost `json:"post"`
651 Repo *model.Repo `json:"repo"`
652 CID string `json:"cid"`
653}
654
655func (a *StreamplaceAPI) HandleChat(ctx context.Context) httprouter.Handle {
656 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
657 user := params.ByName("repoDID")
658 if user == "" {
659 apierrors.WriteHTTPBadRequest(w, "user required", nil)
660 return
661 }
662 repoDID, err := a.NormalizeUser(ctx, user)
663 if err != nil {
664 apierrors.WriteHTTPNotFound(w, "user not found", err)
665 return
666 }
667 replies, err := a.Model.GetReplies(repoDID)
668 if err != nil {
669 apierrors.WriteHTTPInternalServerError(w, "could not get replies", err)
670 return
671 }
672 bs, err := json.Marshal(replies)
673 if err != nil {
674 apierrors.WriteHTTPInternalServerError(w, "could not marshal replies", err)
675 return
676 }
677 w.Header().Set("Content-Type", "application/json")
678 if _, err := w.Write(bs); err != nil {
679 log.Error(ctx, "error writing response", "error", err)
680 }
681 }
682}
683
684func (a *StreamplaceAPI) HandleLivestream(ctx context.Context) httprouter.Handle {
685 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
686 user := params.ByName("repoDID")
687 if user == "" {
688 apierrors.WriteHTTPBadRequest(w, "user required", nil)
689 return
690 }
691 repoDID, err := a.NormalizeUser(ctx, user)
692 if err != nil {
693 apierrors.WriteHTTPNotFound(w, "user not found", err)
694 return
695 }
696 livestream, err := a.Model.GetLatestLivestreamForRepo(repoDID)
697 if err != nil {
698 apierrors.WriteHTTPInternalServerError(w, "could not get livestream", err)
699 return
700 }
701
702 doc, err := livestream.ToLivestreamView()
703 if err != nil {
704 apierrors.WriteHTTPInternalServerError(w, "could not marshal livestream", err)
705 return
706 }
707
708 if livestream == nil {
709 apierrors.WriteHTTPNotFound(w, "no livestream found", nil)
710 return
711 }
712
713 bs, err := json.Marshal(doc)
714 if err != nil {
715 apierrors.WriteHTTPInternalServerError(w, "could not marshal livestream", err)
716 return
717 }
718 w.Header().Set("Content-Type", "application/json")
719 if _, err := w.Write(bs); err != nil {
720 log.Error(ctx, "error writing response", "error", err)
721 }
722 }
723}
724
725func (a *StreamplaceAPI) RateLimitMiddleware(ctx context.Context) func(http.Handler) http.Handler {
726 return func(next http.Handler) http.Handler {
727 return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
728 ip, _, err := net.SplitHostPort(req.RemoteAddr)
729 if err != nil {
730 ip = req.RemoteAddr
731 }
732
733 if a.CLI.RateLimitPerSecond > 0 {
734 limiter := a.getLimiter(ip)
735
736 if !limiter.Allow() {
737 log.Warn(ctx, "rate limit exceeded", "ip", ip, "path", req.URL.Path)
738 apierrors.WriteHTTPTooManyRequests(w, "rate limit exceeded")
739 return
740 }
741 }
742
743 next.ServeHTTP(w, req)
744 })
745 }
746}
747
748// helper for getting a listener from a systemd file descriptor
749func getListenerFromFD(fdName string) (net.Listener, error) {
750 log.Debug(context.TODO(), "getting listener from fd", "fdName", fdName, "LISTEN_PID", os.Getenv("LISTEN_PID"), "LISTEN_FDNAMES", os.Getenv("LISTEN_FDNAMES"))
751 if os.Getenv("LISTEN_PID") == strconv.Itoa(os.Getpid()) {
752 names := strings.Split(os.Getenv("LISTEN_FDNAMES"), ":")
753 for i, name := range names {
754 if name == fdName {
755 log.Warn(context.TODO(), "using systemd file descriptor", "fdName", fdName, "fdIndex", i+3)
756 f1 := os.NewFile(uintptr(i+3), fdName)
757 return net.FileListener(f1)
758 }
759 }
760 }
761 return nil, nil
762}
763
764func (a *StreamplaceAPI) ServeHTTP(ctx context.Context) error {
765 handler, err := a.Handler(ctx)
766 if err != nil {
767 return err
768 }
769 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error {
770 ln, err := getListenerFromFD("http")
771 if err != nil {
772 return err
773 }
774 if ln == nil {
775 ln, err = net.Listen("tcp", a.CLI.HTTPAddr)
776 if err != nil {
777 return err
778 }
779 } else {
780 log.Warn(ctx, "api server listening for http over systemd socket", "addr", ln.Addr())
781 }
782 log.Log(ctx, "http server starting", "addr", ln.Addr())
783 return s.Serve(ln)
784 })
785}
786
787func (a *StreamplaceAPI) ServeHTTPRedirect(ctx context.Context) error {
788 handler, err := a.RedirectHandler(ctx)
789 if err != nil {
790 return err
791 }
792 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error {
793 ln, err := getListenerFromFD("http")
794 if err != nil {
795 return err
796 }
797 if ln == nil {
798 ln, err = net.Listen("tcp", a.CLI.HTTPAddr)
799 if err != nil {
800 return err
801 }
802 } else {
803 log.Warn(ctx, "http tls redirect server listening for http over systemd socket", "addr", ln.Addr())
804 }
805 log.Log(ctx, "http tls redirect server starting", "addr", ln.Addr())
806 return s.Serve(ln)
807 })
808}
809
810func (a *StreamplaceAPI) ServeHTTPS(ctx context.Context) error {
811 handler, err := a.Handler(ctx)
812 if err != nil {
813 return err
814 }
815 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error {
816 ln, err := getListenerFromFD("https")
817 if err != nil {
818 return err
819 }
820 if ln == nil {
821 ln, err = net.Listen("tcp", a.CLI.HTTPSAddr)
822 if err != nil {
823 return err
824 }
825 } else {
826 // tell the redirect handler we're using systemd and they should go to 443
827 port443 := 443
828 a.HTTPRedirectTLSPort = &port443
829 log.Warn(ctx, "https server listening for https over systemd socket", "addr", ln.Addr())
830 }
831 log.Log(ctx, "https server starting",
832 "addr", ln.Addr(),
833 "certPath", a.CLI.TLSCertPath,
834 "keyPath", a.CLI.TLSKeyPath,
835 )
836 return s.ServeTLS(ln, a.CLI.TLSCertPath, a.CLI.TLSKeyPath)
837 })
838}
839
840func (a *StreamplaceAPI) ServerWithShutdown(ctx context.Context, handler http.Handler, serve func(*http.Server) error) error {
841 ctx, cancel := context.WithCancel(ctx)
842 handler = gziphandler.GzipHandler(handler)
843 server := http.Server{Handler: handler}
844 var serveErr error
845 go func() {
846 serveErr = serve(&server)
847 cancel()
848 }()
849 <-ctx.Done()
850 if serveErr != nil {
851 return fmt.Errorf("error in http server: %w", serveErr)
852 }
853
854 ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
855 defer cancel()
856 return server.Shutdown(ctx)
857}
858
859func (a *StreamplaceAPI) HandleHealthz(ctx context.Context) http.HandlerFunc {
860 return func(w http.ResponseWriter, req *http.Request) {
861 w.WriteHeader(200)
862 }
863}
864
865func (a *StreamplaceAPI) getLimiter(ip string) *rate.Limiter {
866 a.limitersMu.Lock()
867 defer a.limitersMu.Unlock()
868
869 limiter, exists := a.limiters[ip]
870 if !exists {
871 limiter = rate.NewLimiter(rate.Limit(a.CLI.RateLimitPerSecond), a.CLI.RateLimitBurst)
872 a.limiters[ip] = limiter
873 }
874
875 return limiter
876}
877
878func (a *StreamplaceAPI) HandleClip(ctx context.Context) httprouter.Handle {
879 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
880 user := params.ByName("user")
881 file := params.ByName("file")
882 if user == "" || file == "" {
883 apierrors.WriteHTTPBadRequest(w, "user and file required", nil)
884 return
885 }
886 user, err := a.NormalizeUser(ctx, user)
887 if err != nil {
888 apierrors.WriteHTTPNotFound(w, "user not found", err)
889 return
890 }
891 fPath := []string{user, "clips", file}
892 exists, err := a.CLI.DataFileExists(fPath)
893 if err != nil {
894 apierrors.WriteHTTPInternalServerError(w, "could not check if file exists", err)
895 return
896 }
897 if !exists {
898 apierrors.WriteHTTPNotFound(w, "file not found", nil)
899 return
900 }
901 fd, err := os.Open(a.CLI.DataFilePath(fPath))
902 if err != nil {
903 apierrors.WriteHTTPInternalServerError(w, "could not open file", err)
904 return
905 }
906 defer fd.Close()
907 w.Header().Set("Content-Type", "video/mp4")
908 if _, err := io.Copy(w, fd); err != nil {
909 log.Error(ctx, "error writing response", "error", err)
910 }
911 }
912}
913
914func NewWebsocketTracker(maxConns int) *WebsocketTracker {
915 return &WebsocketTracker{
916 connections: make(map[string]int),
917 maxConnsPerIP: maxConns,
918 }
919}
920
921func (t *WebsocketTracker) AddConnection(ip string) bool {
922 t.mu.Lock()
923 defer t.mu.Unlock()
924
925 count := t.connections[ip]
926
927 if count >= t.maxConnsPerIP {
928 return false
929 }
930
931 t.connections[ip] = count + 1
932 return true
933}
934
935func (t *WebsocketTracker) RemoveConnection(ip string) {
936 t.mu.Lock()
937 defer t.mu.Unlock()
938
939 count := t.connections[ip]
940 if count > 0 {
941 t.connections[ip] = count - 1
942 }
943
944 if t.connections[ip] == 0 {
945 delete(t.connections, ip)
946 }
947}