Live video on the AT Protocol
1package api
2
3import (
4 "context"
5 "encoding/json"
6 "errors"
7 "fmt"
8 "io"
9 "log/slog"
10 "net"
11 "net/http"
12 "net/http/httputil"
13 "net/url"
14 "os"
15 "strconv"
16 "strings"
17 "sync"
18 "time"
19
20 "github.com/NYTimes/gziphandler"
21 "github.com/bluesky-social/indigo/api/bsky"
22 "github.com/google/uuid"
23 "github.com/julienschmidt/httprouter"
24 "github.com/rs/cors"
25 sloghttp "github.com/samber/slog-http"
26 "golang.org/x/time/rate"
27
28 "github.com/streamplace/oatproxy/pkg/oatproxy"
29 "stream.place/streamplace/js/app"
30 "stream.place/streamplace/pkg/atproto"
31 "stream.place/streamplace/pkg/bus"
32 "stream.place/streamplace/pkg/config"
33 "stream.place/streamplace/pkg/crypto/signers/eip712"
34 "stream.place/streamplace/pkg/director"
35 apierrors "stream.place/streamplace/pkg/errors"
36 "stream.place/streamplace/pkg/linking"
37 "stream.place/streamplace/pkg/log"
38 "stream.place/streamplace/pkg/media"
39 "stream.place/streamplace/pkg/mist/mistconfig"
40 "stream.place/streamplace/pkg/model"
41 "stream.place/streamplace/pkg/notifications"
42 "stream.place/streamplace/pkg/spmetrics"
43 "stream.place/streamplace/pkg/spxrpc"
44 "stream.place/streamplace/pkg/streamplace"
45
46 metrics "github.com/slok/go-http-metrics/metrics/prometheus"
47 "github.com/slok/go-http-metrics/middleware"
48 echomiddleware "github.com/slok/go-http-metrics/middleware/echo"
49 httproutermiddleware "github.com/slok/go-http-metrics/middleware/httprouter"
50 middlewarestd "github.com/slok/go-http-metrics/middleware/std"
51)
52
53type StreamplaceAPI struct {
54 CLI *config.CLI
55 Model model.Model
56 Updater *Updater
57 Signer *eip712.EIP712Signer
58 Mimes map[string]string
59 FirebaseNotifier notifications.FirebaseNotifier
60 MediaManager *media.MediaManager
61 MediaSigner media.MediaSigner
62 // not thread-safe yet
63 Aliases map[string]string
64 Bus *bus.Bus
65 ATSync *atproto.ATProtoSynchronizer
66 Director *director.Director
67
68 connTracker *WebsocketTracker
69
70 limiters map[string]*rate.Limiter
71 limitersMu sync.Mutex
72 SignerCache map[string]media.MediaSigner
73 SignerCacheMu sync.Mutex
74 op *oatproxy.OATProxy
75}
76
// WebsocketTracker counts open websocket connections per client IP so that
// AddConnection can refuse new ones once maxConnsPerIP is reached.
type WebsocketTracker struct {
	// connections maps a client IP to its current connection count.
	connections map[string]int
	// maxConnsPerIP is the cap compared against in AddConnection.
	maxConnsPerIP int
	// mu guards connections.
	mu sync.RWMutex
}
82
83func MakeStreamplaceAPI(cli *config.CLI, mod model.Model, signer *eip712.EIP712Signer, noter notifications.FirebaseNotifier, mm *media.MediaManager, ms media.MediaSigner, bus *bus.Bus, atsync *atproto.ATProtoSynchronizer, d *director.Director, op *oatproxy.OATProxy) (*StreamplaceAPI, error) {
84 updater, err := PrepareUpdater(cli)
85 if err != nil {
86 return nil, err
87 }
88 a := &StreamplaceAPI{CLI: cli,
89 Model: mod,
90 Updater: updater,
91 Signer: signer,
92 FirebaseNotifier: noter,
93 MediaManager: mm,
94 MediaSigner: ms,
95 Aliases: map[string]string{},
96 Bus: bus,
97 ATSync: atsync,
98 Director: d,
99 connTracker: NewWebsocketTracker(cli.RateLimitWebsocket),
100 limiters: make(map[string]*rate.Limiter),
101 SignerCache: make(map[string]media.MediaSigner),
102 op: op,
103 }
104 a.Mimes, err = updater.GetMimes()
105 if err != nil {
106 return nil, err
107 }
108 return a, nil
109}
110
// AppHostingFS wraps an http.FileSystem and replaces every Open failure with
// ErrorIndex, so callers can tell "serve index.html instead" apart from a
// successful open.
type AppHostingFS struct {
	http.FileSystem
}

// ErrorIndex is returned by AppHostingFS.Open whenever the wrapped file system
// cannot open the requested name.
var ErrorIndex = errors.New("not found, use index.html")

// Open opens name on the wrapped file system. Any error from the underlying
// Open is discarded and reported as ErrorIndex.
func (fs AppHostingFS) Open(name string) (http.File, error) {
	opened, openErr := fs.FileSystem.Open(name)
	if openErr != nil {
		return nil, ErrorIndex
	}
	return opened, nil
}
124
// Example playback URLs served by the routes registered in Handler:
// api/playback/iame.li/webrtc?rendition=source
// api/playback/iame.li/stream.mp4?rendition=source
// api/playback/iame.li/stream.webm?rendition=source
// api/playback/iame.li/hls/index.m3u8
// api/playback/iame.li/hls/source/stream.m3u8
// api/playback/iame.li/hls/source/000000000000.ts
131
132func (a *StreamplaceAPI) Handler(ctx context.Context) (http.Handler, error) {
133
134 mdlw := middleware.New(middleware.Config{
135 Recorder: metrics.NewRecorder(metrics.Config{}),
136 })
137 var xrpc http.Handler
138 xrpc, err := spxrpc.NewServer(ctx, a.CLI, a.Model, a.op, mdlw)
139 if err != nil {
140 return nil, err
141 }
142 router := httprouter.New()
143
144 // Create our middleware factory with the default settings.
145
146 a.op.Echo.Use(echomiddleware.Handler("", mdlw))
147
148 // r.GET("/test/:id", httproutermiddleware.Handler("/test/:id", h1, mdlw))
149
150 addHandle := func(router *httprouter.Router, method, path string, handler httprouter.Handle) {
151 router.Handle(method, path, httproutermiddleware.Handler(path, handler, mdlw))
152 }
153 addFunc := func(router *httprouter.Router, method, path string, handler http.HandlerFunc) {
154 router.Handler(method, path, middlewarestd.Handler(path, mdlw, handler))
155 }
156
157 router.Handler("GET", "/oauth/*anything", a.op.Handler())
158 router.Handler("POST", "/oauth/*anything", a.op.Handler())
159 router.Handler("GET", "/.well-known/oauth-authorization-server", a.op.Handler())
160 router.Handler("GET", "/.well-known/oauth-protected-resource", a.op.Handler())
161 router.Handler("GET", "/.well-known/apple-app-site-association", a.HandleAppleAppSiteAssociation(ctx))
162 router.Handler("GET", "/.well-known/assetlinks.json", a.HandleAndroidAssetLinks(ctx))
163 apiRouter := httprouter.New()
164 addFunc(apiRouter, "POST", "/api/notification", a.HandleNotification(ctx))
165 // old clients
166 addFunc(router, "GET", "/app-updates", a.HandleAppUpdates(ctx))
167 // new ones
168 addFunc(apiRouter, "GET", "/api/manifest", a.HandleAppUpdates(ctx))
169 addHandle(apiRouter, "GET", "/api/desktop-updates/:platform/:architecture/:version/:buildTime/:file", a.HandleDesktopUpdates(ctx))
170 addHandle(apiRouter, "POST", "/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s"))
171 addHandle(apiRouter, "OPTIONS", "/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s"))
172 addHandle(apiRouter, "DELETE", "/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s"))
173 addFunc(apiRouter, "POST", "/api/segment", a.HandleSegment(ctx))
174 addFunc(apiRouter, "GET", "/api/healthz", a.HandleHealthz(ctx))
175 addHandle(apiRouter, "GET", "/api/playback/:user/hls/*file", a.HandleHLSPlayback(ctx))
176 addHandle(apiRouter, "GET", "/api/playback/:user/stream.mp4", a.HandleMP4Playback(ctx))
177 addHandle(apiRouter, "GET", "/api/playback/:user/stream.webm", a.HandleMKVPlayback(ctx))
178 // they're jpegs now
179 addHandle(apiRouter, "GET", "/api/playback/:user/stream.jpg", a.HandleThumbnailPlayback(ctx))
180 // this one is actually a jpeg (used previously and shouldn't remove for historical reasons)
181 addHandle(apiRouter, "GET", "/api/playback/:user/stream.png", a.HandleThumbnailPlayback(ctx))
182 addHandle(apiRouter, "GET", "/api/app-return/*anything", a.HandleAppReturn(ctx))
183 addHandle(apiRouter, "POST", "/api/playback/:user/webrtc", a.HandleWebRTCPlayback(ctx))
184 addHandle(apiRouter, "POST", "/api/ingest/webrtc", a.HandleWebRTCIngest(ctx))
185 addHandle(apiRouter, "POST", "/api/ingest/webrtc/:key", a.HandleWebRTCIngest(ctx))
186 addHandle(apiRouter, "POST", "/api/player-event", a.HandlePlayerEvent(ctx))
187 addHandle(apiRouter, "GET", "/api/chat/:repoDID", a.HandleChat(ctx))
188 addHandle(apiRouter, "GET", "/api/websocket/:repoDID", a.HandleWebsocket(ctx))
189 addHandle(apiRouter, "GET", "/api/livestream/:repoDID", a.HandleLivestream(ctx))
190 addHandle(apiRouter, "GET", "/api/segment/recent", a.HandleRecentSegments(ctx))
191 addHandle(apiRouter, "GET", "/api/segment/recent/:repoDID", a.HandleUserRecentSegments(ctx))
192 addHandle(apiRouter, "GET", "/api/bluesky/resolve/:handle", a.HandleBlueskyResolve(ctx))
193 addHandle(apiRouter, "GET", "/api/view-count/:user", a.HandleViewCount(ctx))
194 addHandle(apiRouter, "GET", "/api/clip/:user/:file", a.HandleClip(ctx))
195 apiRouter.NotFound = a.HandleAPI404(ctx)
196 apiRouterHandler := a.RateLimitMiddleware(ctx)(apiRouter)
197 xrpcHandler := a.RateLimitMiddleware(ctx)(xrpc)
198 router.Handler("GET", "/api/*resource", apiRouterHandler)
199 router.Handler("POST", "/api/*resource", apiRouterHandler)
200 router.Handler("PUT", "/api/*resource", apiRouterHandler)
201 router.Handler("PATCH", "/api/*resource", apiRouterHandler)
202 router.Handler("DELETE", "/api/*resource", apiRouterHandler)
203 router.Handler("GET", "/xrpc/*resource", xrpcHandler)
204 router.Handler("POST", "/xrpc/*resource", xrpcHandler)
205 router.Handler("PUT", "/xrpc/*resource", xrpcHandler)
206 router.Handler("PATCH", "/xrpc/*resource", xrpcHandler)
207 router.Handler("DELETE", "/xrpc/*resource", xrpcHandler)
208 router.GET("/.well-known/did.json", a.HandleDidJSON(ctx))
209 router.GET("/.well-known/atproto-did", a.HandleAtprotoDID(ctx))
210 router.GET("/dl/*params", a.HandleAppDownload(ctx))
211 router.POST("/", a.HandleWebRTCIngest(ctx))
212 for _, redirect := range a.CLI.Redirects {
213 parts := strings.Split(redirect, ":")
214 if len(parts) != 2 {
215 log.Error(ctx, "invalid redirect", "redirect", redirect)
216 return nil, fmt.Errorf("invalid redirect: %s", redirect)
217 }
218 router.Handle("GET", parts[0], func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
219 http.Redirect(w, r, parts[1], http.StatusTemporaryRedirect)
220 })
221 }
222 if a.CLI.FrontendProxy != "" {
223 u, err := url.Parse(a.CLI.FrontendProxy)
224 if err != nil {
225 return nil, err
226 }
227 log.Warn(ctx, "using frontend proxy instead of bundled frontend", "destination", a.CLI.FrontendProxy)
228 router.NotFound = &httputil.ReverseProxy{
229 Rewrite: func(r *httputil.ProxyRequest) {
230 // workaround for Expo disliking serving requests from 127.0.0.1 instead of localhost
231 // we need to use 127.0.0.1 because the atproto oauth client requires it
232 r.Out.Header.Set("Origin", u.String())
233 r.SetURL(u)
234 },
235 }
236 } else {
237 files, err := app.Files()
238 if err != nil {
239 return nil, err
240 }
241 index, err := files.Open("index.html")
242 if err != nil {
243 return nil, err
244 }
245 bs, err := io.ReadAll(index)
246 if err != nil {
247 return nil, err
248 }
249 linker, err := linking.NewLinker(ctx, bs)
250 if err != nil {
251 return nil, err
252 }
253 linkingHandler, err := a.NotFoundLinkingHandler(ctx, linker)
254 if err != nil {
255 return nil, err
256 }
257 router.NotFound = middlewarestd.Handler("/*static", mdlw, linkingHandler)
258 }
259 // needed because the WebRTC handler issues 405s from / otherwise
260 router.GET("/", func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
261 router.NotFound.ServeHTTP(w, r)
262 })
263 handler := sloghttp.Recovery(router)
264 handler = cors.AllowAll().Handler(handler)
265 handler = sloghttp.New(slog.Default())(handler)
266 handler = a.RateLimitMiddleware(ctx)(handler)
267
268 // this needs to be LAST so nothing else clobbers the context
269 handler = a.ContextMiddleware(ctx)(handler)
270
271 return handler, nil
272}
273func (a *StreamplaceAPI) ContextMiddleware(ctx context.Context) func(next http.Handler) http.Handler {
274 return func(next http.Handler) http.Handler {
275 return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
276 uuid := uuid.New().String()
277 ctx = log.WithLogValues(ctx, "requestID", uuid, "method", r.Method, "path", r.URL.Path)
278 r = r.WithContext(ctx)
279 next.ServeHTTP(w, r)
280 })
281 }
282}
// copyHeader appends every value in src to dst under the same key, skipping
// keys that start with "Access-Control".
func copyHeader(dst, src http.Header) {
	for name, values := range src {
		// we'll handle CORS ourselves, thanks
		if !strings.HasPrefix(name, "Access-Control") {
			for _, value := range values {
				dst.Add(name, value)
			}
		}
	}
}
294
295// handler that takes care of static files and otherwise returns the index.html with the correct link card data
296func (a *StreamplaceAPI) NotFoundLinkingHandler(ctx context.Context, linker *linking.Linker) (http.HandlerFunc, error) {
297 files, err := app.Files()
298 if err != nil {
299 return nil, err
300 }
301 fs := AppHostingFS{http.FS(files)}
302
303 fileHandler := a.FileHandler(ctx, http.FileServer(fs))
304 defaultHandler := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
305 f := strings.TrimPrefix(req.URL.Path, "/")
306 // under docs we need the index.html suffix due to astro rendering
307 if strings.HasPrefix(req.URL.Path, "/docs") && strings.HasSuffix(req.URL.Path, "/") {
308 f += "index.html"
309 }
310 _, err := fs.Open(f)
311 if err == nil {
312 fileHandler.ServeHTTP(w, req)
313 return
314 }
315 if errors.Is(err, ErrorIndex) || f == "" {
316 bs, err := linker.GenerateDefaultCard(ctx, req.URL)
317 if err != nil {
318 log.Error(ctx, "error generating default card", "error", err)
319 }
320 w.Header().Set("Content-Type", "text/html")
321 if _, err := w.Write(bs); err != nil {
322 log.Error(ctx, "error writing response", "error", err)
323 }
324 } else {
325 log.Warn(ctx, "error opening file", "error", err)
326 apierrors.WriteHTTPInternalServerError(w, "file not found", err)
327 }
328 })
329 return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
330 proto := "http"
331 if req.TLS != nil {
332 proto = "https"
333 }
334 fwProto := req.Header.Get("x-forwarded-proto")
335 if fwProto != "" {
336 proto = fwProto
337 }
338 req.URL.Host = req.Host
339 req.URL.Scheme = proto
340 maybeHandle := strings.TrimPrefix(req.URL.Path, "/")
341 repo, err := a.Model.GetRepoByHandleOrDID(maybeHandle)
342 if err != nil || repo == nil {
343 log.Error(ctx, "no repo found", "maybeHandle", maybeHandle)
344 defaultHandler.ServeHTTP(w, req)
345 return
346 }
347 ls, err := a.Model.GetLatestLivestreamForRepo(repo.DID)
348 if err != nil || ls == nil {
349 log.Error(ctx, "no livestream found", "repoDID", repo.DID)
350 defaultHandler.ServeHTTP(w, req)
351 return
352 }
353 lsv, err := ls.ToLivestreamView()
354 if err != nil || lsv == nil {
355 log.Error(ctx, "no livestream view found", "repoDID", repo.DID)
356 defaultHandler.ServeHTTP(w, req)
357 return
358 }
359 bs, err := linker.GenerateStreamerCard(ctx, req.URL, lsv)
360 if err != nil {
361 log.Error(ctx, "error generating html", "error", err)
362 defaultHandler.ServeHTTP(w, req)
363 return
364 }
365 w.Header().Set("Content-Type", "text/html")
366 if _, err := w.Write(bs); err != nil {
367 log.Error(ctx, "error writing response", "error", err)
368 }
369 }), nil
370}
371
372func (a *StreamplaceAPI) MistProxyHandler(ctx context.Context, tmpl string) httprouter.Handle {
373 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
374 if !a.CLI.HasMist() {
375 apierrors.WriteHTTPNotImplemented(w, "Playback only on the Linux version for now", nil)
376 return
377 }
378 stream := params.ByName("stream")
379 if stream == "" {
380 apierrors.WriteHTTPBadRequest(w, "missing stream in request", nil)
381 return
382 }
383
384 fullstream := fmt.Sprintf("%s+%s", mistconfig.StreamName, stream)
385 prefix := fmt.Sprintf(tmpl, fullstream)
386 resource := params.ByName("resource")
387
388 // path := strings.TrimPrefix(req.URL.EscapedPath(), "/api")
389
390 client := &http.Client{}
391 req.URL = &url.URL{
392 Scheme: "http",
393 Host: fmt.Sprintf("127.0.0.1:%d", a.CLI.MistHTTPPort),
394 Path: fmt.Sprintf("%s%s", prefix, resource),
395 RawQuery: req.URL.RawQuery,
396 }
397
398 //http: Request.RequestURI can't be set in client requests.
399 //http://golang.org/src/pkg/net/http/client.go
400 req.RequestURI = ""
401
402 resp, err := client.Do(req)
403 if err != nil {
404 apierrors.WriteHTTPInternalServerError(w, "error connecting to mist", err)
405 return
406 }
407 defer resp.Body.Close()
408
409 copyHeader(w.Header(), resp.Header)
410 w.WriteHeader(resp.StatusCode)
411 if _, err := io.Copy(w, resp.Body); err != nil {
412 log.Error(ctx, "error writing response", "error", err)
413 }
414 }
415}
416
417func (a *StreamplaceAPI) FileHandler(ctx context.Context, fs http.Handler) http.HandlerFunc {
418 return func(w http.ResponseWriter, req *http.Request) {
419 noslash := req.URL.Path[1:]
420 ct, ok := a.Mimes[noslash]
421 if ok {
422 w.Header().Set("content-type", ct)
423 }
424 fs.ServeHTTP(w, req)
425 }
426}
427
428func (a *StreamplaceAPI) RedirectHandler(ctx context.Context) (http.Handler, error) {
429 _, tlsPort, err := net.SplitHostPort(a.CLI.HTTPSAddr)
430 if err != nil {
431 return nil, err
432 }
433 handleRedirect := func(w http.ResponseWriter, req *http.Request) {
434 host, _, err := net.SplitHostPort(req.Host)
435 if err != nil {
436 host = req.Host
437 }
438 u := req.URL
439 if tlsPort == "443" {
440 u.Host = host
441 } else {
442 u.Host = net.JoinHostPort(host, tlsPort)
443 }
444 u.Scheme = "https"
445 http.Redirect(w, req, u.String(), http.StatusTemporaryRedirect)
446 }
447 mux := http.NewServeMux()
448 mux.HandleFunc("/", handleRedirect)
449 return mux, nil
450}
451
// NotificationPayload is the JSON body accepted by HandleNotification.
type NotificationPayload struct {
	// Token is stored together with RepoDID via Model.CreateNotification.
	Token string `json:"token"`
	// RepoDID is optional; when set, HandleNotification also triggers a repo
	// sync for it.
	RepoDID string `json:"repoDID"`
}
456
457func (a *StreamplaceAPI) HandleAPI404(ctx context.Context) http.HandlerFunc {
458 return func(w http.ResponseWriter, req *http.Request) {
459 w.WriteHeader(404)
460 }
461}
462
463func (a *StreamplaceAPI) HandleNotification(ctx context.Context) http.HandlerFunc {
464 return func(w http.ResponseWriter, req *http.Request) {
465 payload, err := io.ReadAll(req.Body)
466 if err != nil {
467 log.Log(ctx, "error reading notification create", "error", err)
468 w.WriteHeader(400)
469 return
470 }
471 n := NotificationPayload{}
472 err = json.Unmarshal(payload, &n)
473 if err != nil {
474 log.Log(ctx, "error unmarshalling notification create", "error", err)
475 w.WriteHeader(400)
476 return
477 }
478 err = a.Model.CreateNotification(n.Token, n.RepoDID)
479 if err != nil {
480 log.Log(ctx, "error creating notification", "error", err)
481 w.WriteHeader(400)
482 return
483 }
484 log.Log(ctx, "successfully created notification", "token", n.Token)
485 w.WriteHeader(200)
486 if n.RepoDID != "" {
487 go func() {
488 _, err := a.ATSync.SyncBlueskyRepo(ctx, n.RepoDID, a.Model)
489 if err != nil {
490 log.Error(ctx, "error syncing bluesky repo after notification creation", "error", err)
491 }
492 }()
493 }
494 }
495}
496
497func (a *StreamplaceAPI) HandleSegment(ctx context.Context) http.HandlerFunc {
498 return func(w http.ResponseWriter, req *http.Request) {
499 err := a.MediaManager.ValidateMP4(ctx, req.Body)
500 if err != nil {
501 apierrors.WriteHTTPBadRequest(w, "could not ingest segment", err)
502 return
503 }
504 w.WriteHeader(200)
505 }
506}
507
508func (a *StreamplaceAPI) HandlePlayerEvent(ctx context.Context) httprouter.Handle {
509 return func(w http.ResponseWriter, req *http.Request, p httprouter.Params) {
510 var event model.PlayerEventAPI
511 if err := json.NewDecoder(req.Body).Decode(&event); err != nil {
512 apierrors.WriteHTTPBadRequest(w, "could not decode JSON body", err)
513 return
514 }
515 err := a.Model.CreatePlayerEvent(event)
516 if err != nil {
517 apierrors.WriteHTTPBadRequest(w, "could not create player event", err)
518 return
519 }
520 w.WriteHeader(201)
521 }
522}
523
524func (a *StreamplaceAPI) HandleRecentSegments(ctx context.Context) httprouter.Handle {
525 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
526 segs, err := a.Model.MostRecentSegments()
527 if err != nil {
528 apierrors.WriteHTTPInternalServerError(w, "could not get segments", err)
529 return
530 }
531 bs, err := json.Marshal(segs)
532 if err != nil {
533 apierrors.WriteHTTPInternalServerError(w, "could not marshal segments", err)
534 return
535 }
536 w.Header().Add("Content-Type", "application/json")
537 if _, err := w.Write(bs); err != nil {
538 log.Error(ctx, "error writing response", "error", err)
539 }
540 }
541}
542
543func (a *StreamplaceAPI) HandleUserRecentSegments(ctx context.Context) httprouter.Handle {
544 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
545 user := params.ByName("repoDID")
546 if user == "" {
547 apierrors.WriteHTTPBadRequest(w, "user required", nil)
548 return
549 }
550 user, err := a.NormalizeUser(ctx, user)
551 if err != nil {
552 apierrors.WriteHTTPNotFound(w, "user not found", err)
553 return
554 }
555 seg, err := a.Model.LatestSegmentForUser(user)
556 if err != nil {
557 apierrors.WriteHTTPInternalServerError(w, "could not get segments", err)
558 return
559 }
560 streamplaceSeg, err := seg.ToStreamplaceSegment()
561 if err != nil {
562 apierrors.WriteHTTPInternalServerError(w, "could not convert segment to streamplace segment", err)
563 return
564 }
565 bs, err := json.Marshal(streamplaceSeg)
566 if err != nil {
567 apierrors.WriteHTTPInternalServerError(w, "could not marshal segments", err)
568 return
569 }
570 w.Header().Add("Content-Type", "application/json")
571 if _, err := w.Write(bs); err != nil {
572 log.Error(ctx, "error writing response", "error", err)
573 }
574 }
575}
576
577func (a *StreamplaceAPI) HandleViewCount(ctx context.Context) httprouter.Handle {
578 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
579 user := params.ByName("user")
580 if user == "" {
581 apierrors.WriteHTTPBadRequest(w, "user required", nil)
582 return
583 }
584 user, err := a.NormalizeUser(ctx, user)
585 if err != nil {
586 apierrors.WriteHTTPNotFound(w, "user not found", err)
587 return
588 }
589 count := spmetrics.GetViewCount(user)
590 bs, err := json.Marshal(streamplace.Livestream_ViewerCount{Count: int64(count), LexiconTypeID: "place.stream.livestream#viewerCount"})
591 if err != nil {
592 apierrors.WriteHTTPInternalServerError(w, "could not marshal view count", err)
593 return
594 }
595 if _, err := w.Write(bs); err != nil {
596 log.Error(ctx, "error writing response", "error", err)
597 }
598 }
599}
600
601func (a *StreamplaceAPI) HandleBlueskyResolve(ctx context.Context) httprouter.Handle {
602 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
603 log.Log(ctx, "got bluesky notification", "params", params)
604 key, err := a.ATSync.SyncBlueskyRepo(ctx, params.ByName("handle"), a.Model)
605 if err != nil {
606 apierrors.WriteHTTPInternalServerError(w, "could not resolve streamplace key", err)
607 return
608 }
609 signingKeys, err := a.Model.GetSigningKeysForRepo(key.DID)
610 if err != nil {
611 apierrors.WriteHTTPInternalServerError(w, "could not get signing keys", err)
612 return
613 }
614 bs, err := json.Marshal(signingKeys)
615 if err != nil {
616 apierrors.WriteHTTPInternalServerError(w, "could not marshal signing keys", err)
617 return
618 }
619 if _, err := w.Write(bs); err != nil {
620 log.Error(ctx, "error writing response", "error", err)
621 }
622 }
623}
624
625type ChatResponse struct {
626 Post *bsky.FeedPost `json:"post"`
627 Repo *model.Repo `json:"repo"`
628 CID string `json:"cid"`
629}
630
631func (a *StreamplaceAPI) HandleChat(ctx context.Context) httprouter.Handle {
632 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
633 user := params.ByName("repoDID")
634 if user == "" {
635 apierrors.WriteHTTPBadRequest(w, "user required", nil)
636 return
637 }
638 repoDID, err := a.NormalizeUser(ctx, user)
639 if err != nil {
640 apierrors.WriteHTTPNotFound(w, "user not found", err)
641 return
642 }
643 replies, err := a.Model.GetReplies(repoDID)
644 if err != nil {
645 apierrors.WriteHTTPInternalServerError(w, "could not get replies", err)
646 return
647 }
648 bs, err := json.Marshal(replies)
649 if err != nil {
650 apierrors.WriteHTTPInternalServerError(w, "could not marshal replies", err)
651 return
652 }
653 w.Header().Set("Content-Type", "application/json")
654 if _, err := w.Write(bs); err != nil {
655 log.Error(ctx, "error writing response", "error", err)
656 }
657 }
658}
659
660func (a *StreamplaceAPI) HandleLivestream(ctx context.Context) httprouter.Handle {
661 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
662 user := params.ByName("repoDID")
663 if user == "" {
664 apierrors.WriteHTTPBadRequest(w, "user required", nil)
665 return
666 }
667 repoDID, err := a.NormalizeUser(ctx, user)
668 if err != nil {
669 apierrors.WriteHTTPNotFound(w, "user not found", err)
670 return
671 }
672 livestream, err := a.Model.GetLatestLivestreamForRepo(repoDID)
673 if err != nil {
674 apierrors.WriteHTTPInternalServerError(w, "could not get livestream", err)
675 return
676 }
677
678 doc, err := livestream.ToLivestreamView()
679 if err != nil {
680 apierrors.WriteHTTPInternalServerError(w, "could not marshal livestream", err)
681 return
682 }
683
684 if livestream == nil {
685 apierrors.WriteHTTPNotFound(w, "no livestream found", nil)
686 return
687 }
688
689 bs, err := json.Marshal(doc)
690 if err != nil {
691 apierrors.WriteHTTPInternalServerError(w, "could not marshal livestream", err)
692 return
693 }
694 w.Header().Set("Content-Type", "application/json")
695 if _, err := w.Write(bs); err != nil {
696 log.Error(ctx, "error writing response", "error", err)
697 }
698 }
699}
700
701func (a *StreamplaceAPI) RateLimitMiddleware(ctx context.Context) func(http.Handler) http.Handler {
702 return func(next http.Handler) http.Handler {
703 return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
704 ip, _, err := net.SplitHostPort(req.RemoteAddr)
705 if err != nil {
706 ip = req.RemoteAddr
707 }
708
709 if a.CLI.RateLimitPerSecond > 0 {
710 limiter := a.getLimiter(ip)
711
712 if !limiter.Allow() {
713 log.Warn(ctx, "rate limit exceeded", "ip", ip, "path", req.URL.Path)
714 apierrors.WriteHTTPTooManyRequests(w, "rate limit exceeded")
715 return
716 }
717 }
718
719 next.ServeHTTP(w, req)
720 })
721 }
722}
723
724// helper for getting a listener from a systemd file descriptor
725func getListenerFromFD(fdName string) (net.Listener, error) {
726 log.Log(context.TODO(), "getting listener from fd", "fdName", fdName, "LISTEN_PID", os.Getenv("LISTEN_PID"), "LISTEN_FDNAMES", os.Getenv("LISTEN_FDNAMES"))
727 if os.Getenv("LISTEN_PID") == strconv.Itoa(os.Getpid()) {
728 names := strings.Split(os.Getenv("LISTEN_FDNAMES"), ":")
729 for i, name := range names {
730 if name == fdName {
731 f1 := os.NewFile(uintptr(i+3), fdName)
732 return net.FileListener(f1)
733 }
734 }
735 }
736 return nil, nil
737}
738
739func (a *StreamplaceAPI) ServeHTTP(ctx context.Context) error {
740 handler, err := a.Handler(ctx)
741 if err != nil {
742 return err
743 }
744 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error {
745 ln, err := getListenerFromFD("http")
746 if err != nil {
747 return err
748 }
749 if ln == nil {
750 ln, err = net.Listen("tcp", a.CLI.HTTPAddr)
751 if err != nil {
752 return err
753 }
754 } else {
755 log.Warn(ctx, "api server listening for http over systemd socket", "addr", ln.Addr())
756 }
757 log.Log(ctx, "http server starting", "addr", ln.Addr())
758 return s.Serve(ln)
759 })
760}
761
762func (a *StreamplaceAPI) ServeHTTPRedirect(ctx context.Context) error {
763 handler, err := a.RedirectHandler(ctx)
764 if err != nil {
765 return err
766 }
767 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error {
768 ln, err := getListenerFromFD("http")
769 if err != nil {
770 return err
771 }
772 if ln == nil {
773 ln, err = net.Listen("tcp", a.CLI.HTTPAddr)
774 if err != nil {
775 return err
776 }
777 } else {
778 log.Warn(ctx, "http tls redirect server listening for http over systemd socket", "addr", ln.Addr())
779 }
780 log.Log(ctx, "http tls redirect server starting", "addr", ln.Addr())
781 return s.Serve(ln)
782 })
783}
784
785func (a *StreamplaceAPI) ServeHTTPS(ctx context.Context) error {
786 handler, err := a.Handler(ctx)
787 if err != nil {
788 return err
789 }
790 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error {
791 ln, err := getListenerFromFD("https")
792 if err != nil {
793 return err
794 }
795 if ln == nil {
796 ln, err = net.Listen("tcp", a.CLI.HTTPSAddr)
797 if err != nil {
798 return err
799 }
800 } else {
801 log.Warn(ctx, "https server listening for https over systemd socket", "addr", ln.Addr())
802 }
803 log.Log(ctx, "https server starting",
804 "addr", ln.Addr(),
805 "certPath", a.CLI.TLSCertPath,
806 "keyPath", a.CLI.TLSKeyPath,
807 )
808 return s.ServeTLS(ln, a.CLI.TLSCertPath, a.CLI.TLSKeyPath)
809 })
810}
811
812func (a *StreamplaceAPI) ServerWithShutdown(ctx context.Context, handler http.Handler, serve func(*http.Server) error) error {
813 ctx, cancel := context.WithCancel(ctx)
814 handler = gziphandler.GzipHandler(handler)
815 server := http.Server{Handler: handler}
816 var serveErr error
817 go func() {
818 serveErr = serve(&server)
819 cancel()
820 }()
821 <-ctx.Done()
822 if serveErr != nil {
823 return fmt.Errorf("error in http server: %w", serveErr)
824 }
825
826 ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
827 defer cancel()
828 return server.Shutdown(ctx)
829}
830
831func (a *StreamplaceAPI) HandleHealthz(ctx context.Context) http.HandlerFunc {
832 return func(w http.ResponseWriter, req *http.Request) {
833 w.WriteHeader(200)
834 }
835}
836
837func (a *StreamplaceAPI) getLimiter(ip string) *rate.Limiter {
838 a.limitersMu.Lock()
839 defer a.limitersMu.Unlock()
840
841 limiter, exists := a.limiters[ip]
842 if !exists {
843 limiter = rate.NewLimiter(rate.Limit(a.CLI.RateLimitPerSecond), a.CLI.RateLimitBurst)
844 a.limiters[ip] = limiter
845 }
846
847 return limiter
848}
849
850func (a *StreamplaceAPI) HandleClip(ctx context.Context) httprouter.Handle {
851 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
852 user := params.ByName("user")
853 file := params.ByName("file")
854 if user == "" || file == "" {
855 apierrors.WriteHTTPBadRequest(w, "user and file required", nil)
856 return
857 }
858 user, err := a.NormalizeUser(ctx, user)
859 if err != nil {
860 apierrors.WriteHTTPNotFound(w, "user not found", err)
861 return
862 }
863 fPath := []string{user, "clips", file}
864 exists, err := a.CLI.DataFileExists(fPath)
865 if err != nil {
866 apierrors.WriteHTTPInternalServerError(w, "could not check if file exists", err)
867 return
868 }
869 if !exists {
870 apierrors.WriteHTTPNotFound(w, "file not found", nil)
871 return
872 }
873 fd, err := os.Open(a.CLI.DataFilePath(fPath))
874 if err != nil {
875 apierrors.WriteHTTPInternalServerError(w, "could not open file", err)
876 return
877 }
878 defer fd.Close()
879 w.Header().Set("Content-Type", "video/mp4")
880 if _, err := io.Copy(w, fd); err != nil {
881 log.Error(ctx, "error writing response", "error", err)
882 }
883 }
884}
885
886func NewWebsocketTracker(maxConns int) *WebsocketTracker {
887 return &WebsocketTracker{
888 connections: make(map[string]int),
889 maxConnsPerIP: maxConns,
890 }
891}
892
893func (t *WebsocketTracker) AddConnection(ip string) bool {
894 t.mu.Lock()
895 defer t.mu.Unlock()
896
897 count := t.connections[ip]
898
899 if count >= t.maxConnsPerIP {
900 return false
901 }
902
903 t.connections[ip] = count + 1
904 return true
905}
906
907func (t *WebsocketTracker) RemoveConnection(ip string) {
908 t.mu.Lock()
909 defer t.mu.Unlock()
910
911 count := t.connections[ip]
912 if count > 0 {
913 t.connections[ip] = count - 1
914 }
915
916 if t.connections[ip] == 0 {
917 delete(t.connections, ip)
918 }
919}