// Live video on the AT Protocol
1package api
2
3import (
4 "context"
5 "encoding/json"
6 "errors"
7 "fmt"
8 "io"
9 "log/slog"
10 "net"
11 "net/http"
12 "net/http/httputil"
13 "net/url"
14 "os"
15 "strconv"
16 "strings"
17 "sync"
18 "time"
19
20 "github.com/NYTimes/gziphandler"
21 "github.com/bluesky-social/indigo/api/bsky"
22 "github.com/google/uuid"
23 "github.com/julienschmidt/httprouter"
24 "github.com/rs/cors"
25 sloghttp "github.com/samber/slog-http"
26 "golang.org/x/time/rate"
27
28 "github.com/streamplace/oatproxy/pkg/oatproxy"
29 "stream.place/streamplace/js/app"
30 "stream.place/streamplace/pkg/atproto"
31 "stream.place/streamplace/pkg/bus"
32 "stream.place/streamplace/pkg/config"
33 "stream.place/streamplace/pkg/crypto/signers/eip712"
34 "stream.place/streamplace/pkg/director"
35 apierrors "stream.place/streamplace/pkg/errors"
36 "stream.place/streamplace/pkg/linking"
37 "stream.place/streamplace/pkg/log"
38 "stream.place/streamplace/pkg/media"
39 "stream.place/streamplace/pkg/mist/mistconfig"
40 "stream.place/streamplace/pkg/model"
41 "stream.place/streamplace/pkg/notifications"
42 "stream.place/streamplace/pkg/spmetrics"
43 "stream.place/streamplace/pkg/spxrpc"
44 "stream.place/streamplace/pkg/statedb"
45 "stream.place/streamplace/pkg/streamplace"
46
47 metrics "github.com/slok/go-http-metrics/metrics/prometheus"
48 "github.com/slok/go-http-metrics/middleware"
49 echomiddleware "github.com/slok/go-http-metrics/middleware/echo"
50 httproutermiddleware "github.com/slok/go-http-metrics/middleware/httprouter"
51 middlewarestd "github.com/slok/go-http-metrics/middleware/std"
52)
53
54type StreamplaceAPI struct {
55 CLI *config.CLI
56 Model model.Model
57 StatefulDB *statedb.StatefulDB
58 Updater *Updater
59 Signer *eip712.EIP712Signer
60 Mimes map[string]string
61 FirebaseNotifier notifications.FirebaseNotifier
62 MediaManager *media.MediaManager
63 MediaSigner media.MediaSigner
64 // not thread-safe yet
65 Aliases map[string]string
66 Bus *bus.Bus
67 ATSync *atproto.ATProtoSynchronizer
68 Director *director.Director
69
70 connTracker *WebsocketTracker
71
72 limiters map[string]*rate.Limiter
73 limitersMu sync.Mutex
74 SignerCache map[string]media.MediaSigner
75 SignerCacheMu sync.Mutex
76 op *oatproxy.OATProxy
77
78 // override tls port for http redirect server if we're using systemd file descriptors
79 HTTPRedirectTLSPort *int
80 sessions map[string]map[string]time.Time
81 sessionsLock sync.RWMutex
82}
83
// WebsocketTracker counts open websocket connections per client IP so that a
// per-IP ceiling can be enforced. All access to connections goes through mu.
type WebsocketTracker struct {
	// connections maps a client IP to its current number of tracked connections.
	connections map[string]int
	// maxConnsPerIP is the ceiling checked by AddConnection.
	maxConnsPerIP int
	mu            sync.RWMutex
}
89
90func MakeStreamplaceAPI(cli *config.CLI, mod model.Model, statefulDB *statedb.StatefulDB, noter notifications.FirebaseNotifier, mm *media.MediaManager, ms media.MediaSigner, bus *bus.Bus, atsync *atproto.ATProtoSynchronizer, d *director.Director, op *oatproxy.OATProxy) (*StreamplaceAPI, error) {
91 updater, err := PrepareUpdater(cli)
92 if err != nil {
93 return nil, err
94 }
95 a := &StreamplaceAPI{CLI: cli,
96 Model: mod,
97 StatefulDB: statefulDB,
98 Updater: updater,
99 FirebaseNotifier: noter,
100 MediaManager: mm,
101 MediaSigner: ms,
102 Aliases: map[string]string{},
103 Bus: bus,
104 ATSync: atsync,
105 Director: d,
106 connTracker: NewWebsocketTracker(cli.RateLimitWebsocket),
107 limiters: make(map[string]*rate.Limiter),
108 SignerCache: make(map[string]media.MediaSigner),
109 op: op,
110 sessions: make(map[string]map[string]time.Time),
111 sessionsLock: sync.RWMutex{},
112 }
113 a.Mimes, err = updater.GetMimes()
114 if err != nil {
115 return nil, err
116 }
117 return a, nil
118}
119
// AppHostingFS wraps an http.FileSystem and replaces every Open failure with
// the ErrorIndex sentinel, so callers can fall back to serving index.html.
type AppHostingFS struct {
	http.FileSystem
}

// ErrorIndex is the error AppHostingFS.Open returns whenever the wrapped
// filesystem cannot open the requested name.
var ErrorIndex = errors.New("not found, use index.html")

// Open opens name on the wrapped filesystem. On success the file is returned
// as-is; on any failure the underlying error is discarded and ErrorIndex is
// returned instead.
func (fs AppHostingFS) Open(name string) (http.File, error) {
	opened, err := fs.FileSystem.Open(name)
	if err != nil {
		return nil, ErrorIndex
	}
	return opened, nil
}
133
134// api/playback/iame.li/webrtc?rendition=source
135// api/playback/iame.li/stream.mp4?rendition=source
136// api/playback/iame.li/stream.webm?rendition=source
137// api/playback/iame.li/hls/index.m3u8
138// api/playback/iame.li/hls/source/stream.m3u8
139// api/playback/iame.li/hls/source/000000000000.ts
140
141func (a *StreamplaceAPI) Handler(ctx context.Context) (http.Handler, error) {
142
143 mdlw := middleware.New(middleware.Config{
144 Recorder: metrics.NewRecorder(metrics.Config{}),
145 })
146 var xrpc http.Handler
147 xrpc, err := spxrpc.NewServer(ctx, a.CLI, a.Model, a.StatefulDB, a.op, mdlw, a.ATSync, a.Bus)
148 if err != nil {
149 return nil, err
150 }
151 router := httprouter.New()
152
153 // Create our middleware factory with the default settings.
154
155 a.op.Echo.Use(echomiddleware.Handler("", mdlw))
156
157 // r.GET("/test/:id", httproutermiddleware.Handler("/test/:id", h1, mdlw))
158
159 addHandle := func(router *httprouter.Router, method, path string, handler httprouter.Handle) {
160 router.Handle(method, path, httproutermiddleware.Handler(path, handler, mdlw))
161 }
162 addFunc := func(router *httprouter.Router, method, path string, handler http.HandlerFunc) {
163 router.Handler(method, path, middlewarestd.Handler(path, mdlw, handler))
164 }
165
166 router.Handler("GET", "/oauth/*anything", a.op.Handler())
167 router.Handler("POST", "/oauth/*anything", a.op.Handler())
168 router.Handler("GET", "/.well-known/oauth-authorization-server", a.op.Handler())
169 router.Handler("GET", "/.well-known/oauth-protected-resource", a.op.Handler())
170 router.Handler("GET", "/.well-known/apple-app-site-association", a.HandleAppleAppSiteAssociation(ctx))
171 router.Handler("GET", "/.well-known/assetlinks.json", a.HandleAndroidAssetLinks(ctx))
172 apiRouter := httprouter.New()
173 addFunc(apiRouter, "POST", "/api/notification", a.HandleNotification(ctx))
174 // old clients
175 addFunc(router, "GET", "/app-updates", a.HandleAppUpdates(ctx))
176 // new ones
177 addFunc(apiRouter, "GET", "/api/manifest", a.HandleAppUpdates(ctx))
178 addHandle(apiRouter, "GET", "/api/desktop-updates/:platform/:architecture/:version/:buildTime/:file", a.HandleDesktopUpdates(ctx))
179 addHandle(apiRouter, "POST", "/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s"))
180 addHandle(apiRouter, "OPTIONS", "/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s"))
181 addHandle(apiRouter, "DELETE", "/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s"))
182 addFunc(apiRouter, "POST", "/api/segment", a.HandleSegment(ctx))
183 addFunc(apiRouter, "GET", "/api/healthz", a.HandleHealthz(ctx))
184 addHandle(apiRouter, "GET", "/api/playback/:user/hls/*file", a.HandleHLSPlayback(ctx))
185 // they're jpegs now
186 addHandle(apiRouter, "GET", "/api/playback/:user/stream.jpg", a.HandleThumbnailPlayback(ctx))
187 // this one is actually a jpeg (used previously and shouldn't remove for historical reasons)
188 addHandle(apiRouter, "GET", "/api/playback/:user/stream.png", a.HandleThumbnailPlayback(ctx))
189 addHandle(apiRouter, "GET", "/api/app-return/*anything", a.HandleAppReturn(ctx))
190 addHandle(apiRouter, "POST", "/api/playback/:user/webrtc", a.HandleWebRTCPlayback(ctx))
191 addHandle(apiRouter, "POST", "/api/ingest/webrtc", a.HandleWebRTCIngest(ctx))
192 addHandle(apiRouter, "POST", "/api/ingest/webrtc/:key", a.HandleWebRTCIngest(ctx))
193 addHandle(apiRouter, "POST", "/api/player-event", a.HandlePlayerEvent(ctx))
194 addHandle(apiRouter, "GET", "/api/chat/:repoDID", a.HandleChat(ctx))
195 addHandle(apiRouter, "GET", "/api/websocket/:repoDID", a.HandleWebsocket(ctx))
196 addHandle(apiRouter, "GET", "/api/livestream/:repoDID", a.HandleLivestream(ctx))
197 addHandle(apiRouter, "GET", "/api/segment/recent", a.HandleRecentSegments(ctx))
198 addHandle(apiRouter, "GET", "/api/segment/recent/:repoDID", a.HandleUserRecentSegments(ctx))
199 addHandle(apiRouter, "GET", "/api/bluesky/resolve/:handle", a.HandleBlueskyResolve(ctx))
200 addHandle(apiRouter, "GET", "/api/view-count/:user", a.HandleViewCount(ctx))
201 addHandle(apiRouter, "GET", "/api/clip/:user/:file", a.HandleClip(ctx))
202 apiRouter.NotFound = a.HandleAPI404(ctx)
203 apiRouterHandler := a.RateLimitMiddleware(ctx)(apiRouter)
204 xrpcHandler := a.RateLimitMiddleware(ctx)(xrpc)
205 router.Handler("GET", "/api/*resource", apiRouterHandler)
206 router.Handler("POST", "/api/*resource", apiRouterHandler)
207 router.Handler("PUT", "/api/*resource", apiRouterHandler)
208 router.Handler("PATCH", "/api/*resource", apiRouterHandler)
209 router.Handler("DELETE", "/api/*resource", apiRouterHandler)
210 router.Handler("GET", "/xrpc/*resource", xrpcHandler)
211 router.Handler("POST", "/xrpc/*resource", xrpcHandler)
212 router.Handler("PUT", "/xrpc/*resource", xrpcHandler)
213 router.Handler("PATCH", "/xrpc/*resource", xrpcHandler)
214 router.Handler("DELETE", "/xrpc/*resource", xrpcHandler)
215 router.GET("/.well-known/did.json", a.HandleDidJSON(ctx))
216 router.GET("/.well-known/atproto-did", a.HandleAtprotoDID(ctx))
217 router.GET("/dl/*params", a.HandleAppDownload(ctx))
218 router.POST("/", a.HandleWebRTCIngest(ctx))
219 for _, redirect := range a.CLI.Redirects {
220 parts := strings.Split(redirect, ":")
221 if len(parts) != 2 {
222 log.Error(ctx, "invalid redirect", "redirect", redirect)
223 return nil, fmt.Errorf("invalid redirect: %s", redirect)
224 }
225 router.Handle("GET", parts[0], func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
226 http.Redirect(w, r, parts[1], http.StatusTemporaryRedirect)
227 })
228 }
229 if a.CLI.FrontendProxy != "" {
230 u, err := url.Parse(a.CLI.FrontendProxy)
231 if err != nil {
232 return nil, err
233 }
234 log.Warn(ctx, "using frontend proxy instead of bundled frontend", "destination", a.CLI.FrontendProxy)
235 router.NotFound = &httputil.ReverseProxy{
236 Rewrite: func(r *httputil.ProxyRequest) {
237 // workaround for Expo disliking serving requests from 127.0.0.1 instead of localhost
238 // we need to use 127.0.0.1 because the atproto oauth client requires it
239 r.Out.Header.Set("Origin", u.String())
240 r.SetURL(u)
241 },
242 }
243 } else {
244 files, err := app.Files()
245 if err != nil {
246 return nil, err
247 }
248 index, err := files.Open("index.html")
249 if err != nil {
250 return nil, err
251 }
252 bs, err := io.ReadAll(index)
253 if err != nil {
254 return nil, err
255 }
256 linker, err := linking.NewLinker(ctx, bs)
257 if err != nil {
258 return nil, err
259 }
260 linkingHandler, err := a.NotFoundLinkingHandler(ctx, linker)
261 if err != nil {
262 return nil, err
263 }
264 router.NotFound = middlewarestd.Handler("/*static", mdlw, linkingHandler)
265 }
266 // needed because the WebRTC handler issues 405s from / otherwise
267 router.GET("/", func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
268 router.NotFound.ServeHTTP(w, r)
269 })
270 handler := sloghttp.Recovery(router)
271 handler = cors.AllowAll().Handler(handler)
272 handler = sloghttp.New(slog.Default())(handler)
273 handler = a.RateLimitMiddleware(ctx)(handler)
274
275 // this needs to be LAST so nothing else clobbers the context
276 handler = a.ContextMiddleware(ctx)(handler)
277
278 return handler, nil
279}
280func (a *StreamplaceAPI) ContextMiddleware(parentContext context.Context) func(next http.Handler) http.Handler {
281 return func(next http.Handler) http.Handler {
282 return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
283 uuid := uuid.New().String()
284 ctx := log.WithLogValues(parentContext, "requestID", uuid, "method", r.Method, "path", r.URL.Path)
285 r = r.WithContext(ctx)
286 next.ServeHTTP(w, r)
287 })
288 }
289}
// copyHeader appends every value of every header in src to dst, except for
// headers whose name starts with "Access-Control": CORS is handled by this
// server itself, so upstream CORS headers are dropped.
func copyHeader(dst, src http.Header) {
	for name, values := range src {
		if !strings.HasPrefix(name, "Access-Control") {
			for i := range values {
				dst.Add(name, values[i])
			}
		}
	}
}
301
302// handler that takes care of static files and otherwise returns the index.html with the correct link card data
303func (a *StreamplaceAPI) NotFoundLinkingHandler(ctx context.Context, linker *linking.Linker) (http.HandlerFunc, error) {
304 files, err := app.Files()
305 if err != nil {
306 return nil, err
307 }
308 fs := AppHostingFS{http.FS(files)}
309
310 fileHandler := a.FileHandler(ctx, http.FileServer(fs))
311 defaultHandler := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
312 f := strings.TrimPrefix(req.URL.Path, "/")
313 // under docs we need the index.html suffix due to astro rendering
314 if strings.HasPrefix(req.URL.Path, "/docs") && strings.HasSuffix(req.URL.Path, "/") {
315 f += "index.html"
316 }
317 _, err := fs.Open(f)
318 if err == nil {
319 fileHandler.ServeHTTP(w, req)
320 return
321 }
322 if errors.Is(err, ErrorIndex) || f == "" {
323 bs, err := linker.GenerateDefaultCard(ctx, req.URL, a.CLI.SentryDSN)
324 if err != nil {
325 log.Error(ctx, "error generating default card", "error", err)
326 }
327 w.Header().Set("Content-Type", "text/html")
328 if _, err := w.Write(bs); err != nil {
329 log.Error(ctx, "error writing response", "error", err)
330 }
331 } else {
332 log.Warn(ctx, "error opening file", "error", err)
333 apierrors.WriteHTTPInternalServerError(w, "file not found", err)
334 }
335 })
336 return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
337 proto := "http"
338 if req.TLS != nil {
339 proto = "https"
340 }
341 fwProto := req.Header.Get("x-forwarded-proto")
342 if fwProto != "" {
343 proto = fwProto
344 }
345 req.URL.Host = req.Host
346 req.URL.Scheme = proto
347 maybeHandle := strings.TrimPrefix(req.URL.Path, "/")
348 // quick check for things that aren't valid handles/dids
349 if strings.ContainsAny(maybeHandle, "/_") {
350 defaultHandler.ServeHTTP(w, req)
351 return
352 }
353 repo, err := a.Model.GetRepoByHandleOrDID(maybeHandle)
354 if err != nil || repo == nil {
355 log.Error(ctx, "no repo found", "maybeHandle", maybeHandle)
356 defaultHandler.ServeHTTP(w, req)
357 return
358 }
359 ls, err := a.Model.GetLatestLivestreamForRepo(repo.DID)
360 if err != nil || ls == nil {
361 log.Error(ctx, "no livestream found", "repoDID", repo.DID)
362 defaultHandler.ServeHTTP(w, req)
363 return
364 }
365 lsv, err := ls.ToLivestreamView()
366 if err != nil || lsv == nil {
367 log.Error(ctx, "no livestream view found", "repoDID", repo.DID)
368 defaultHandler.ServeHTTP(w, req)
369 return
370 }
371 bs, err := linker.GenerateStreamerCard(ctx, req.URL, lsv, a.CLI.SentryDSN)
372 if err != nil {
373 log.Error(ctx, "error generating html", "error", err)
374 defaultHandler.ServeHTTP(w, req)
375 return
376 }
377 w.Header().Set("Content-Type", "text/html")
378 if _, err := w.Write(bs); err != nil {
379 log.Error(ctx, "error writing response", "error", err)
380 }
381 }), nil
382}
383
384func (a *StreamplaceAPI) MistProxyHandler(ctx context.Context, tmpl string) httprouter.Handle {
385 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
386 if !a.CLI.HasMist() {
387 apierrors.WriteHTTPNotImplemented(w, "Playback only on the Linux version for now", nil)
388 return
389 }
390 stream := params.ByName("stream")
391 if stream == "" {
392 apierrors.WriteHTTPBadRequest(w, "missing stream in request", nil)
393 return
394 }
395
396 fullstream := fmt.Sprintf("%s+%s", mistconfig.StreamName, stream)
397 prefix := fmt.Sprintf(tmpl, fullstream)
398 resource := params.ByName("resource")
399
400 // path := strings.TrimPrefix(req.URL.EscapedPath(), "/api")
401
402 client := &http.Client{}
403 req.URL = &url.URL{
404 Scheme: "http",
405 Host: fmt.Sprintf("127.0.0.1:%d", a.CLI.MistHTTPPort),
406 Path: fmt.Sprintf("%s%s", prefix, resource),
407 RawQuery: req.URL.RawQuery,
408 }
409
410 //http: Request.RequestURI can't be set in client requests.
411 //http://golang.org/src/pkg/net/http/client.go
412 req.RequestURI = ""
413
414 resp, err := client.Do(req)
415 if err != nil {
416 apierrors.WriteHTTPInternalServerError(w, "error connecting to mist", err)
417 return
418 }
419 defer resp.Body.Close()
420
421 copyHeader(w.Header(), resp.Header)
422 w.WriteHeader(resp.StatusCode)
423 if _, err := io.Copy(w, resp.Body); err != nil {
424 log.Error(ctx, "error writing response", "error", err)
425 }
426 }
427}
428
429func (a *StreamplaceAPI) FileHandler(ctx context.Context, fs http.Handler) http.HandlerFunc {
430 return func(w http.ResponseWriter, req *http.Request) {
431 noslash := req.URL.Path[1:]
432 ct, ok := a.Mimes[noslash]
433 if ok {
434 w.Header().Set("content-type", ct)
435 }
436 fs.ServeHTTP(w, req)
437 }
438}
439
440func (a *StreamplaceAPI) RedirectHandler(ctx context.Context) (http.Handler, error) {
441 var tlsPort string
442 var err error
443 if a.HTTPRedirectTLSPort != nil {
444 tlsPort = fmt.Sprintf("%d", *a.HTTPRedirectTLSPort)
445 } else {
446 _, tlsPort, err = net.SplitHostPort(a.CLI.HTTPSAddr)
447 if err != nil {
448 return nil, err
449 }
450 }
451 handleRedirect := func(w http.ResponseWriter, req *http.Request) {
452 host, _, err := net.SplitHostPort(req.Host)
453 if err != nil {
454 host = req.Host
455 }
456 u := req.URL
457 if tlsPort == "443" {
458 u.Host = host
459 } else {
460 u.Host = net.JoinHostPort(host, tlsPort)
461 }
462 u.Scheme = "https"
463 http.Redirect(w, req, u.String(), http.StatusTemporaryRedirect)
464 }
465 mux := http.NewServeMux()
466 mux.HandleFunc("/", handleRedirect)
467 return mux, nil
468}
469
// NotificationPayload is the JSON body accepted by HandleNotification.
type NotificationPayload struct {
	// Token is stored together with RepoDID via StatefulDB.CreateNotification.
	Token string `json:"token"`
	// RepoDID is optional; when non-empty a repo sync is started for it.
	RepoDID string `json:"repoDID"`
}
474
475func (a *StreamplaceAPI) HandleAPI404(ctx context.Context) http.HandlerFunc {
476 return func(w http.ResponseWriter, req *http.Request) {
477 w.WriteHeader(404)
478 }
479}
480
481func (a *StreamplaceAPI) HandleNotification(ctx context.Context) http.HandlerFunc {
482 return func(w http.ResponseWriter, req *http.Request) {
483 payload, err := io.ReadAll(req.Body)
484 if err != nil {
485 log.Log(ctx, "error reading notification create", "error", err)
486 w.WriteHeader(400)
487 return
488 }
489 n := NotificationPayload{}
490 err = json.Unmarshal(payload, &n)
491 if err != nil {
492 log.Log(ctx, "error unmarshalling notification create", "error", err)
493 w.WriteHeader(400)
494 return
495 }
496 err = a.StatefulDB.CreateNotification(n.Token, n.RepoDID)
497 if err != nil {
498 log.Log(ctx, "error creating notification", "error", err)
499 w.WriteHeader(400)
500 return
501 }
502 log.Log(ctx, "successfully created notification", "token", n.Token)
503 w.WriteHeader(200)
504 if n.RepoDID != "" {
505 go func() {
506 _, err := a.ATSync.SyncBlueskyRepo(ctx, n.RepoDID, a.Model)
507 if err != nil {
508 log.Error(ctx, "error syncing bluesky repo after notification creation", "error", err)
509 }
510 }()
511 }
512 }
513}
514
515func (a *StreamplaceAPI) HandleSegment(ctx context.Context) http.HandlerFunc {
516 return func(w http.ResponseWriter, req *http.Request) {
517 err := a.MediaManager.ValidateMP4(ctx, req.Body, false)
518 if err != nil {
519 apierrors.WriteHTTPBadRequest(w, "could not ingest segment", err)
520 return
521 }
522 w.WriteHeader(200)
523 }
524}
525
526func (a *StreamplaceAPI) HandlePlayerEvent(ctx context.Context) httprouter.Handle {
527 return func(w http.ResponseWriter, req *http.Request, p httprouter.Params) {
528 var event model.PlayerEventAPI
529 if err := json.NewDecoder(req.Body).Decode(&event); err != nil {
530 apierrors.WriteHTTPBadRequest(w, "could not decode JSON body", err)
531 return
532 }
533 err := a.Model.CreatePlayerEvent(event)
534 if err != nil {
535 apierrors.WriteHTTPBadRequest(w, "could not create player event", err)
536 return
537 }
538 w.WriteHeader(201)
539 }
540}
541
542func (a *StreamplaceAPI) HandleRecentSegments(ctx context.Context) httprouter.Handle {
543 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
544 segs, err := a.Model.MostRecentSegments()
545 if err != nil {
546 apierrors.WriteHTTPInternalServerError(w, "could not get segments", err)
547 return
548 }
549 bs, err := json.Marshal(segs)
550 if err != nil {
551 apierrors.WriteHTTPInternalServerError(w, "could not marshal segments", err)
552 return
553 }
554 w.Header().Add("Content-Type", "application/json")
555 if _, err := w.Write(bs); err != nil {
556 log.Error(ctx, "error writing response", "error", err)
557 }
558 }
559}
560
561func (a *StreamplaceAPI) HandleUserRecentSegments(ctx context.Context) httprouter.Handle {
562 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
563 user := params.ByName("repoDID")
564 if user == "" {
565 apierrors.WriteHTTPBadRequest(w, "user required", nil)
566 return
567 }
568 user, err := a.NormalizeUser(ctx, user)
569 if err != nil {
570 apierrors.WriteHTTPNotFound(w, "user not found", err)
571 return
572 }
573 seg, err := a.Model.LatestSegmentForUser(user)
574 if err != nil {
575 apierrors.WriteHTTPInternalServerError(w, "could not get segments", err)
576 return
577 }
578 streamplaceSeg, err := seg.ToStreamplaceSegment()
579 if err != nil {
580 apierrors.WriteHTTPInternalServerError(w, "could not convert segment to streamplace segment", err)
581 return
582 }
583 bs, err := json.Marshal(streamplaceSeg)
584 if err != nil {
585 apierrors.WriteHTTPInternalServerError(w, "could not marshal segments", err)
586 return
587 }
588 w.Header().Add("Content-Type", "application/json")
589 if _, err := w.Write(bs); err != nil {
590 log.Error(ctx, "error writing response", "error", err)
591 }
592 }
593}
594
595func (a *StreamplaceAPI) HandleViewCount(ctx context.Context) httprouter.Handle {
596 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
597 user := params.ByName("user")
598 if user == "" {
599 apierrors.WriteHTTPBadRequest(w, "user required", nil)
600 return
601 }
602 user, err := a.NormalizeUser(ctx, user)
603 if err != nil {
604 apierrors.WriteHTTPNotFound(w, "user not found", err)
605 return
606 }
607 count := spmetrics.GetViewCount(user)
608 bs, err := json.Marshal(streamplace.Livestream_ViewerCount{Count: int64(count), LexiconTypeID: "place.stream.livestream#viewerCount"})
609 if err != nil {
610 apierrors.WriteHTTPInternalServerError(w, "could not marshal view count", err)
611 return
612 }
613 if _, err := w.Write(bs); err != nil {
614 log.Error(ctx, "error writing response", "error", err)
615 }
616 }
617}
618
619func (a *StreamplaceAPI) HandleBlueskyResolve(ctx context.Context) httprouter.Handle {
620 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
621 log.Log(ctx, "got bluesky notification", "params", params)
622 key, err := a.ATSync.SyncBlueskyRepo(ctx, params.ByName("handle"), a.Model)
623 if err != nil {
624 apierrors.WriteHTTPInternalServerError(w, "could not resolve streamplace key", err)
625 return
626 }
627 signingKeys, err := a.Model.GetSigningKeysForRepo(key.DID)
628 if err != nil {
629 apierrors.WriteHTTPInternalServerError(w, "could not get signing keys", err)
630 return
631 }
632 bs, err := json.Marshal(signingKeys)
633 if err != nil {
634 apierrors.WriteHTTPInternalServerError(w, "could not marshal signing keys", err)
635 return
636 }
637 if _, err := w.Write(bs); err != nil {
638 log.Error(ctx, "error writing response", "error", err)
639 }
640 }
641}
642
643type ChatResponse struct {
644 Post *bsky.FeedPost `json:"post"`
645 Repo *model.Repo `json:"repo"`
646 CID string `json:"cid"`
647}
648
649func (a *StreamplaceAPI) HandleChat(ctx context.Context) httprouter.Handle {
650 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
651 user := params.ByName("repoDID")
652 if user == "" {
653 apierrors.WriteHTTPBadRequest(w, "user required", nil)
654 return
655 }
656 repoDID, err := a.NormalizeUser(ctx, user)
657 if err != nil {
658 apierrors.WriteHTTPNotFound(w, "user not found", err)
659 return
660 }
661 replies, err := a.Model.GetReplies(repoDID)
662 if err != nil {
663 apierrors.WriteHTTPInternalServerError(w, "could not get replies", err)
664 return
665 }
666 bs, err := json.Marshal(replies)
667 if err != nil {
668 apierrors.WriteHTTPInternalServerError(w, "could not marshal replies", err)
669 return
670 }
671 w.Header().Set("Content-Type", "application/json")
672 if _, err := w.Write(bs); err != nil {
673 log.Error(ctx, "error writing response", "error", err)
674 }
675 }
676}
677
678func (a *StreamplaceAPI) HandleLivestream(ctx context.Context) httprouter.Handle {
679 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
680 user := params.ByName("repoDID")
681 if user == "" {
682 apierrors.WriteHTTPBadRequest(w, "user required", nil)
683 return
684 }
685 repoDID, err := a.NormalizeUser(ctx, user)
686 if err != nil {
687 apierrors.WriteHTTPNotFound(w, "user not found", err)
688 return
689 }
690 livestream, err := a.Model.GetLatestLivestreamForRepo(repoDID)
691 if err != nil {
692 apierrors.WriteHTTPInternalServerError(w, "could not get livestream", err)
693 return
694 }
695
696 doc, err := livestream.ToLivestreamView()
697 if err != nil {
698 apierrors.WriteHTTPInternalServerError(w, "could not marshal livestream", err)
699 return
700 }
701
702 if livestream == nil {
703 apierrors.WriteHTTPNotFound(w, "no livestream found", nil)
704 return
705 }
706
707 bs, err := json.Marshal(doc)
708 if err != nil {
709 apierrors.WriteHTTPInternalServerError(w, "could not marshal livestream", err)
710 return
711 }
712 w.Header().Set("Content-Type", "application/json")
713 if _, err := w.Write(bs); err != nil {
714 log.Error(ctx, "error writing response", "error", err)
715 }
716 }
717}
718
719func (a *StreamplaceAPI) RateLimitMiddleware(ctx context.Context) func(http.Handler) http.Handler {
720 return func(next http.Handler) http.Handler {
721 return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
722 ip, _, err := net.SplitHostPort(req.RemoteAddr)
723 if err != nil {
724 ip = req.RemoteAddr
725 }
726
727 if a.CLI.RateLimitPerSecond > 0 {
728 limiter := a.getLimiter(ip)
729
730 if !limiter.Allow() {
731 log.Warn(ctx, "rate limit exceeded", "ip", ip, "path", req.URL.Path)
732 apierrors.WriteHTTPTooManyRequests(w, "rate limit exceeded")
733 return
734 }
735 }
736
737 next.ServeHTTP(w, req)
738 })
739 }
740}
741
742// helper for getting a listener from a systemd file descriptor
743func getListenerFromFD(fdName string) (net.Listener, error) {
744 log.Debug(context.TODO(), "getting listener from fd", "fdName", fdName, "LISTEN_PID", os.Getenv("LISTEN_PID"), "LISTEN_FDNAMES", os.Getenv("LISTEN_FDNAMES"))
745 if os.Getenv("LISTEN_PID") == strconv.Itoa(os.Getpid()) {
746 names := strings.Split(os.Getenv("LISTEN_FDNAMES"), ":")
747 for i, name := range names {
748 if name == fdName {
749 log.Warn(context.TODO(), "using systemd file descriptor", "fdName", fdName, "fdIndex", i+3)
750 f1 := os.NewFile(uintptr(i+3), fdName)
751 return net.FileListener(f1)
752 }
753 }
754 }
755 return nil, nil
756}
757
758func (a *StreamplaceAPI) ServeHTTP(ctx context.Context) error {
759 handler, err := a.Handler(ctx)
760 if err != nil {
761 return err
762 }
763 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error {
764 ln, err := getListenerFromFD("http")
765 if err != nil {
766 return err
767 }
768 if ln == nil {
769 ln, err = net.Listen("tcp", a.CLI.HTTPAddr)
770 if err != nil {
771 return err
772 }
773 } else {
774 log.Warn(ctx, "api server listening for http over systemd socket", "addr", ln.Addr())
775 }
776 log.Log(ctx, "http server starting", "addr", ln.Addr())
777 return s.Serve(ln)
778 })
779}
780
781func (a *StreamplaceAPI) ServeHTTPRedirect(ctx context.Context) error {
782 handler, err := a.RedirectHandler(ctx)
783 if err != nil {
784 return err
785 }
786 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error {
787 ln, err := getListenerFromFD("http")
788 if err != nil {
789 return err
790 }
791 if ln == nil {
792 ln, err = net.Listen("tcp", a.CLI.HTTPAddr)
793 if err != nil {
794 return err
795 }
796 } else {
797 log.Warn(ctx, "http tls redirect server listening for http over systemd socket", "addr", ln.Addr())
798 }
799 log.Log(ctx, "http tls redirect server starting", "addr", ln.Addr())
800 return s.Serve(ln)
801 })
802}
803
804func (a *StreamplaceAPI) ServeHTTPS(ctx context.Context) error {
805 handler, err := a.Handler(ctx)
806 if err != nil {
807 return err
808 }
809 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error {
810 ln, err := getListenerFromFD("https")
811 if err != nil {
812 return err
813 }
814 if ln == nil {
815 ln, err = net.Listen("tcp", a.CLI.HTTPSAddr)
816 if err != nil {
817 return err
818 }
819 } else {
820 // tell the redirect handler we're using systemd and they should go to 443
821 port443 := 443
822 a.HTTPRedirectTLSPort = &port443
823 log.Warn(ctx, "https server listening for https over systemd socket", "addr", ln.Addr())
824 }
825 log.Log(ctx, "https server starting",
826 "addr", ln.Addr(),
827 "certPath", a.CLI.TLSCertPath,
828 "keyPath", a.CLI.TLSKeyPath,
829 )
830 return s.ServeTLS(ln, a.CLI.TLSCertPath, a.CLI.TLSKeyPath)
831 })
832}
833
834func (a *StreamplaceAPI) ServerWithShutdown(ctx context.Context, handler http.Handler, serve func(*http.Server) error) error {
835 ctx, cancel := context.WithCancel(ctx)
836 handler = gziphandler.GzipHandler(handler)
837 server := http.Server{Handler: handler}
838 var serveErr error
839 go func() {
840 serveErr = serve(&server)
841 cancel()
842 }()
843 <-ctx.Done()
844 if serveErr != nil {
845 return fmt.Errorf("error in http server: %w", serveErr)
846 }
847
848 ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
849 defer cancel()
850 return server.Shutdown(ctx)
851}
852
853func (a *StreamplaceAPI) HandleHealthz(ctx context.Context) http.HandlerFunc {
854 return func(w http.ResponseWriter, req *http.Request) {
855 w.WriteHeader(200)
856 }
857}
858
859func (a *StreamplaceAPI) getLimiter(ip string) *rate.Limiter {
860 a.limitersMu.Lock()
861 defer a.limitersMu.Unlock()
862
863 limiter, exists := a.limiters[ip]
864 if !exists {
865 limiter = rate.NewLimiter(rate.Limit(a.CLI.RateLimitPerSecond), a.CLI.RateLimitBurst)
866 a.limiters[ip] = limiter
867 }
868
869 return limiter
870}
871
872func (a *StreamplaceAPI) HandleClip(ctx context.Context) httprouter.Handle {
873 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
874 user := params.ByName("user")
875 file := params.ByName("file")
876 if user == "" || file == "" {
877 apierrors.WriteHTTPBadRequest(w, "user and file required", nil)
878 return
879 }
880 user, err := a.NormalizeUser(ctx, user)
881 if err != nil {
882 apierrors.WriteHTTPNotFound(w, "user not found", err)
883 return
884 }
885 fPath := []string{user, "clips", file}
886 exists, err := a.CLI.DataFileExists(fPath)
887 if err != nil {
888 apierrors.WriteHTTPInternalServerError(w, "could not check if file exists", err)
889 return
890 }
891 if !exists {
892 apierrors.WriteHTTPNotFound(w, "file not found", nil)
893 return
894 }
895 fd, err := os.Open(a.CLI.DataFilePath(fPath))
896 if err != nil {
897 apierrors.WriteHTTPInternalServerError(w, "could not open file", err)
898 return
899 }
900 defer fd.Close()
901 w.Header().Set("Content-Type", "video/mp4")
902 if _, err := io.Copy(w, fd); err != nil {
903 log.Error(ctx, "error writing response", "error", err)
904 }
905 }
906}
907
908func NewWebsocketTracker(maxConns int) *WebsocketTracker {
909 return &WebsocketTracker{
910 connections: make(map[string]int),
911 maxConnsPerIP: maxConns,
912 }
913}
914
915func (t *WebsocketTracker) AddConnection(ip string) bool {
916 t.mu.Lock()
917 defer t.mu.Unlock()
918
919 count := t.connections[ip]
920
921 if count >= t.maxConnsPerIP {
922 return false
923 }
924
925 t.connections[ip] = count + 1
926 return true
927}
928
929func (t *WebsocketTracker) RemoveConnection(ip string) {
930 t.mu.Lock()
931 defer t.mu.Unlock()
932
933 count := t.connections[ip]
934 if count > 0 {
935 t.connections[ip] = count - 1
936 }
937
938 if t.connections[ip] == 0 {
939 delete(t.connections, ip)
940 }
941}