Live video on the AT Protocol
1package api
2
3import (
4 "context"
5 "encoding/json"
6 "errors"
7 "fmt"
8 "io"
9 "log/slog"
10 "net"
11 "net/http"
12 "net/http/httputil"
13 "net/url"
14 "os"
15 "strconv"
16 "strings"
17 "sync"
18 "time"
19
20 "github.com/NYTimes/gziphandler"
21 "github.com/bluesky-social/indigo/api/bsky"
22 "github.com/google/uuid"
23 "github.com/julienschmidt/httprouter"
24 "github.com/rs/cors"
25 sloghttp "github.com/samber/slog-http"
26 "golang.org/x/time/rate"
27
28 "github.com/streamplace/oatproxy/pkg/oatproxy"
29 "stream.place/streamplace/js/app"
30 "stream.place/streamplace/pkg/atproto"
31 "stream.place/streamplace/pkg/bus"
32 "stream.place/streamplace/pkg/config"
33 "stream.place/streamplace/pkg/crypto/signers/eip712"
34 "stream.place/streamplace/pkg/director"
35 apierrors "stream.place/streamplace/pkg/errors"
36 "stream.place/streamplace/pkg/linking"
37 "stream.place/streamplace/pkg/log"
38 "stream.place/streamplace/pkg/media"
39 "stream.place/streamplace/pkg/mist/mistconfig"
40 "stream.place/streamplace/pkg/model"
41 "stream.place/streamplace/pkg/notifications"
42 "stream.place/streamplace/pkg/spmetrics"
43 "stream.place/streamplace/pkg/spxrpc"
44 "stream.place/streamplace/pkg/statedb"
45 "stream.place/streamplace/pkg/streamplace"
46
47 metrics "github.com/slok/go-http-metrics/metrics/prometheus"
48 "github.com/slok/go-http-metrics/middleware"
49 echomiddleware "github.com/slok/go-http-metrics/middleware/echo"
50 httproutermiddleware "github.com/slok/go-http-metrics/middleware/httprouter"
51 middlewarestd "github.com/slok/go-http-metrics/middleware/std"
52)
53
54type StreamplaceAPI struct {
55 CLI *config.CLI
56 Model model.Model
57 StatefulDB *statedb.StatefulDB
58 Updater *Updater
59 Signer *eip712.EIP712Signer
60 Mimes map[string]string
61 FirebaseNotifier notifications.FirebaseNotifier
62 MediaManager *media.MediaManager
63 MediaSigner media.MediaSigner
64 // not thread-safe yet
65 Aliases map[string]string
66 Bus *bus.Bus
67 ATSync *atproto.ATProtoSynchronizer
68 Director *director.Director
69
70 connTracker *WebsocketTracker
71
72 limiters map[string]*rate.Limiter
73 limitersMu sync.Mutex
74 SignerCache map[string]media.MediaSigner
75 SignerCacheMu sync.Mutex
76 op *oatproxy.OATProxy
77
78 // override tls port for http redirect server if we're using systemd file descriptors
79 HTTPRedirectTLSPort *int
80 sessions map[string]map[string]time.Time
81 sessionsLock sync.RWMutex
82}
83
84type WebsocketTracker struct {
85 connections map[string]int
86 maxConnsPerIP int
87 mu sync.RWMutex
88}
89
90func MakeStreamplaceAPI(cli *config.CLI, mod model.Model, statefulDB *statedb.StatefulDB, signer *eip712.EIP712Signer, noter notifications.FirebaseNotifier, mm *media.MediaManager, ms media.MediaSigner, bus *bus.Bus, atsync *atproto.ATProtoSynchronizer, d *director.Director, op *oatproxy.OATProxy) (*StreamplaceAPI, error) {
91 updater, err := PrepareUpdater(cli)
92 if err != nil {
93 return nil, err
94 }
95 a := &StreamplaceAPI{CLI: cli,
96 Model: mod,
97 StatefulDB: statefulDB,
98 Updater: updater,
99 Signer: signer,
100 FirebaseNotifier: noter,
101 MediaManager: mm,
102 MediaSigner: ms,
103 Aliases: map[string]string{},
104 Bus: bus,
105 ATSync: atsync,
106 Director: d,
107 connTracker: NewWebsocketTracker(cli.RateLimitWebsocket),
108 limiters: make(map[string]*rate.Limiter),
109 SignerCache: make(map[string]media.MediaSigner),
110 op: op,
111 sessions: make(map[string]map[string]time.Time),
112 sessionsLock: sync.RWMutex{},
113 }
114 a.Mimes, err = updater.GetMimes()
115 if err != nil {
116 return nil, err
117 }
118 return a, nil
119}
120
121type AppHostingFS struct {
122 http.FileSystem
123}
124
125var ErrorIndex = errors.New("not found, use index.html")
126
127func (fs AppHostingFS) Open(name string) (http.File, error) {
128 file, err1 := fs.FileSystem.Open(name)
129 if err1 == nil {
130 return file, nil
131 }
132 return nil, ErrorIndex
133}
134
135// api/playback/iame.li/webrtc?rendition=source
136// api/playback/iame.li/stream.mp4?rendition=source
137// api/playback/iame.li/stream.webm?rendition=source
138// api/playback/iame.li/hls/index.m3u8
139// api/playback/iame.li/hls/source/stream.m3u8
140// api/playback/iame.li/hls/source/000000000000.ts
141
142func (a *StreamplaceAPI) Handler(ctx context.Context) (http.Handler, error) {
143
144 mdlw := middleware.New(middleware.Config{
145 Recorder: metrics.NewRecorder(metrics.Config{}),
146 })
147 var xrpc http.Handler
148 xrpc, err := spxrpc.NewServer(ctx, a.CLI, a.Model, a.StatefulDB, a.op, mdlw, a.ATSync)
149 if err != nil {
150 return nil, err
151 }
152 router := httprouter.New()
153
154 // Create our middleware factory with the default settings.
155
156 a.op.Echo.Use(echomiddleware.Handler("", mdlw))
157
158 // r.GET("/test/:id", httproutermiddleware.Handler("/test/:id", h1, mdlw))
159
160 addHandle := func(router *httprouter.Router, method, path string, handler httprouter.Handle) {
161 router.Handle(method, path, httproutermiddleware.Handler(path, handler, mdlw))
162 }
163 addFunc := func(router *httprouter.Router, method, path string, handler http.HandlerFunc) {
164 router.Handler(method, path, middlewarestd.Handler(path, mdlw, handler))
165 }
166
167 router.Handler("GET", "/oauth/*anything", a.op.Handler())
168 router.Handler("POST", "/oauth/*anything", a.op.Handler())
169 router.Handler("GET", "/.well-known/oauth-authorization-server", a.op.Handler())
170 router.Handler("GET", "/.well-known/oauth-protected-resource", a.op.Handler())
171 router.Handler("GET", "/.well-known/apple-app-site-association", a.HandleAppleAppSiteAssociation(ctx))
172 router.Handler("GET", "/.well-known/assetlinks.json", a.HandleAndroidAssetLinks(ctx))
173 apiRouter := httprouter.New()
174 addFunc(apiRouter, "POST", "/api/notification", a.HandleNotification(ctx))
175 // old clients
176 addFunc(router, "GET", "/app-updates", a.HandleAppUpdates(ctx))
177 // new ones
178 addFunc(apiRouter, "GET", "/api/manifest", a.HandleAppUpdates(ctx))
179 addHandle(apiRouter, "GET", "/api/desktop-updates/:platform/:architecture/:version/:buildTime/:file", a.HandleDesktopUpdates(ctx))
180 addHandle(apiRouter, "POST", "/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s"))
181 addHandle(apiRouter, "OPTIONS", "/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s"))
182 addHandle(apiRouter, "DELETE", "/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s"))
183 addFunc(apiRouter, "POST", "/api/segment", a.HandleSegment(ctx))
184 addFunc(apiRouter, "GET", "/api/healthz", a.HandleHealthz(ctx))
185 addHandle(apiRouter, "GET", "/api/playback/:user/hls/*file", a.HandleHLSPlayback(ctx))
186 // they're jpegs now
187 addHandle(apiRouter, "GET", "/api/playback/:user/stream.jpg", a.HandleThumbnailPlayback(ctx))
188 // this one is actually a jpeg (used previously and shouldn't remove for historical reasons)
189 addHandle(apiRouter, "GET", "/api/playback/:user/stream.png", a.HandleThumbnailPlayback(ctx))
190 addHandle(apiRouter, "GET", "/api/app-return/*anything", a.HandleAppReturn(ctx))
191 addHandle(apiRouter, "POST", "/api/playback/:user/webrtc", a.HandleWebRTCPlayback(ctx))
192 addHandle(apiRouter, "POST", "/api/ingest/webrtc", a.HandleWebRTCIngest(ctx))
193 addHandle(apiRouter, "POST", "/api/ingest/webrtc/:key", a.HandleWebRTCIngest(ctx))
194 addHandle(apiRouter, "POST", "/api/player-event", a.HandlePlayerEvent(ctx))
195 addHandle(apiRouter, "GET", "/api/chat/:repoDID", a.HandleChat(ctx))
196 addHandle(apiRouter, "GET", "/api/websocket/:repoDID", a.HandleWebsocket(ctx))
197 addHandle(apiRouter, "GET", "/api/livestream/:repoDID", a.HandleLivestream(ctx))
198 addHandle(apiRouter, "GET", "/api/segment/recent", a.HandleRecentSegments(ctx))
199 addHandle(apiRouter, "GET", "/api/segment/recent/:repoDID", a.HandleUserRecentSegments(ctx))
200 addHandle(apiRouter, "GET", "/api/bluesky/resolve/:handle", a.HandleBlueskyResolve(ctx))
201 addHandle(apiRouter, "GET", "/api/view-count/:user", a.HandleViewCount(ctx))
202 addHandle(apiRouter, "GET", "/api/clip/:user/:file", a.HandleClip(ctx))
203 apiRouter.NotFound = a.HandleAPI404(ctx)
204 apiRouterHandler := a.RateLimitMiddleware(ctx)(apiRouter)
205 xrpcHandler := a.RateLimitMiddleware(ctx)(xrpc)
206 router.Handler("GET", "/api/*resource", apiRouterHandler)
207 router.Handler("POST", "/api/*resource", apiRouterHandler)
208 router.Handler("PUT", "/api/*resource", apiRouterHandler)
209 router.Handler("PATCH", "/api/*resource", apiRouterHandler)
210 router.Handler("DELETE", "/api/*resource", apiRouterHandler)
211 router.Handler("GET", "/xrpc/*resource", xrpcHandler)
212 router.Handler("POST", "/xrpc/*resource", xrpcHandler)
213 router.Handler("PUT", "/xrpc/*resource", xrpcHandler)
214 router.Handler("PATCH", "/xrpc/*resource", xrpcHandler)
215 router.Handler("DELETE", "/xrpc/*resource", xrpcHandler)
216 router.GET("/.well-known/did.json", a.HandleDidJSON(ctx))
217 router.GET("/.well-known/atproto-did", a.HandleAtprotoDID(ctx))
218 router.GET("/dl/*params", a.HandleAppDownload(ctx))
219 router.POST("/", a.HandleWebRTCIngest(ctx))
220 for _, redirect := range a.CLI.Redirects {
221 parts := strings.Split(redirect, ":")
222 if len(parts) != 2 {
223 log.Error(ctx, "invalid redirect", "redirect", redirect)
224 return nil, fmt.Errorf("invalid redirect: %s", redirect)
225 }
226 router.Handle("GET", parts[0], func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
227 http.Redirect(w, r, parts[1], http.StatusTemporaryRedirect)
228 })
229 }
230 if a.CLI.FrontendProxy != "" {
231 u, err := url.Parse(a.CLI.FrontendProxy)
232 if err != nil {
233 return nil, err
234 }
235 log.Warn(ctx, "using frontend proxy instead of bundled frontend", "destination", a.CLI.FrontendProxy)
236 router.NotFound = &httputil.ReverseProxy{
237 Rewrite: func(r *httputil.ProxyRequest) {
238 // workaround for Expo disliking serving requests from 127.0.0.1 instead of localhost
239 // we need to use 127.0.0.1 because the atproto oauth client requires it
240 r.Out.Header.Set("Origin", u.String())
241 r.SetURL(u)
242 },
243 }
244 } else {
245 files, err := app.Files()
246 if err != nil {
247 return nil, err
248 }
249 index, err := files.Open("index.html")
250 if err != nil {
251 return nil, err
252 }
253 bs, err := io.ReadAll(index)
254 if err != nil {
255 return nil, err
256 }
257 linker, err := linking.NewLinker(ctx, bs)
258 if err != nil {
259 return nil, err
260 }
261 linkingHandler, err := a.NotFoundLinkingHandler(ctx, linker)
262 if err != nil {
263 return nil, err
264 }
265 router.NotFound = middlewarestd.Handler("/*static", mdlw, linkingHandler)
266 }
267 // needed because the WebRTC handler issues 405s from / otherwise
268 router.GET("/", func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
269 router.NotFound.ServeHTTP(w, r)
270 })
271 handler := sloghttp.Recovery(router)
272 handler = cors.AllowAll().Handler(handler)
273 handler = sloghttp.New(slog.Default())(handler)
274 handler = a.RateLimitMiddleware(ctx)(handler)
275
276 // this needs to be LAST so nothing else clobbers the context
277 handler = a.ContextMiddleware(ctx)(handler)
278
279 return handler, nil
280}
281func (a *StreamplaceAPI) ContextMiddleware(parentContext context.Context) func(next http.Handler) http.Handler {
282 return func(next http.Handler) http.Handler {
283 return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
284 uuid := uuid.New().String()
285 ctx := log.WithLogValues(parentContext, "requestID", uuid, "method", r.Method, "path", r.URL.Path)
286 r = r.WithContext(ctx)
287 next.ServeHTTP(w, r)
288 })
289 }
290}
291func copyHeader(dst, src http.Header) {
292 for k, vv := range src {
293 // we'll handle CORS ourselves, thanks
294 if strings.HasPrefix(k, "Access-Control") {
295 continue
296 }
297 for _, v := range vv {
298 dst.Add(k, v)
299 }
300 }
301}
302
303// handler that takes care of static files and otherwise returns the index.html with the correct link card data
304func (a *StreamplaceAPI) NotFoundLinkingHandler(ctx context.Context, linker *linking.Linker) (http.HandlerFunc, error) {
305 files, err := app.Files()
306 if err != nil {
307 return nil, err
308 }
309 fs := AppHostingFS{http.FS(files)}
310
311 fileHandler := a.FileHandler(ctx, http.FileServer(fs))
312 defaultHandler := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
313 f := strings.TrimPrefix(req.URL.Path, "/")
314 // under docs we need the index.html suffix due to astro rendering
315 if strings.HasPrefix(req.URL.Path, "/docs") && strings.HasSuffix(req.URL.Path, "/") {
316 f += "index.html"
317 }
318 _, err := fs.Open(f)
319 if err == nil {
320 fileHandler.ServeHTTP(w, req)
321 return
322 }
323 if errors.Is(err, ErrorIndex) || f == "" {
324 bs, err := linker.GenerateDefaultCard(ctx, req.URL, a.CLI.SentryDSN)
325 if err != nil {
326 log.Error(ctx, "error generating default card", "error", err)
327 }
328 w.Header().Set("Content-Type", "text/html")
329 if _, err := w.Write(bs); err != nil {
330 log.Error(ctx, "error writing response", "error", err)
331 }
332 } else {
333 log.Warn(ctx, "error opening file", "error", err)
334 apierrors.WriteHTTPInternalServerError(w, "file not found", err)
335 }
336 })
337 return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
338 proto := "http"
339 if req.TLS != nil {
340 proto = "https"
341 }
342 fwProto := req.Header.Get("x-forwarded-proto")
343 if fwProto != "" {
344 proto = fwProto
345 }
346 req.URL.Host = req.Host
347 req.URL.Scheme = proto
348 maybeHandle := strings.TrimPrefix(req.URL.Path, "/")
349 // quick check for things that aren't valid handles/dids
350 if strings.ContainsAny(maybeHandle, "/_") {
351 defaultHandler.ServeHTTP(w, req)
352 return
353 }
354 repo, err := a.Model.GetRepoByHandleOrDID(maybeHandle)
355 if err != nil || repo == nil {
356 log.Error(ctx, "no repo found", "maybeHandle", maybeHandle)
357 defaultHandler.ServeHTTP(w, req)
358 return
359 }
360 ls, err := a.Model.GetLatestLivestreamForRepo(repo.DID)
361 if err != nil || ls == nil {
362 log.Error(ctx, "no livestream found", "repoDID", repo.DID)
363 defaultHandler.ServeHTTP(w, req)
364 return
365 }
366 lsv, err := ls.ToLivestreamView()
367 if err != nil || lsv == nil {
368 log.Error(ctx, "no livestream view found", "repoDID", repo.DID)
369 defaultHandler.ServeHTTP(w, req)
370 return
371 }
372 bs, err := linker.GenerateStreamerCard(ctx, req.URL, lsv, a.CLI.SentryDSN)
373 if err != nil {
374 log.Error(ctx, "error generating html", "error", err)
375 defaultHandler.ServeHTTP(w, req)
376 return
377 }
378 w.Header().Set("Content-Type", "text/html")
379 if _, err := w.Write(bs); err != nil {
380 log.Error(ctx, "error writing response", "error", err)
381 }
382 }), nil
383}
384
385func (a *StreamplaceAPI) MistProxyHandler(ctx context.Context, tmpl string) httprouter.Handle {
386 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
387 if !a.CLI.HasMist() {
388 apierrors.WriteHTTPNotImplemented(w, "Playback only on the Linux version for now", nil)
389 return
390 }
391 stream := params.ByName("stream")
392 if stream == "" {
393 apierrors.WriteHTTPBadRequest(w, "missing stream in request", nil)
394 return
395 }
396
397 fullstream := fmt.Sprintf("%s+%s", mistconfig.StreamName, stream)
398 prefix := fmt.Sprintf(tmpl, fullstream)
399 resource := params.ByName("resource")
400
401 // path := strings.TrimPrefix(req.URL.EscapedPath(), "/api")
402
403 client := &http.Client{}
404 req.URL = &url.URL{
405 Scheme: "http",
406 Host: fmt.Sprintf("127.0.0.1:%d", a.CLI.MistHTTPPort),
407 Path: fmt.Sprintf("%s%s", prefix, resource),
408 RawQuery: req.URL.RawQuery,
409 }
410
411 //http: Request.RequestURI can't be set in client requests.
412 //http://golang.org/src/pkg/net/http/client.go
413 req.RequestURI = ""
414
415 resp, err := client.Do(req)
416 if err != nil {
417 apierrors.WriteHTTPInternalServerError(w, "error connecting to mist", err)
418 return
419 }
420 defer resp.Body.Close()
421
422 copyHeader(w.Header(), resp.Header)
423 w.WriteHeader(resp.StatusCode)
424 if _, err := io.Copy(w, resp.Body); err != nil {
425 log.Error(ctx, "error writing response", "error", err)
426 }
427 }
428}
429
430func (a *StreamplaceAPI) FileHandler(ctx context.Context, fs http.Handler) http.HandlerFunc {
431 return func(w http.ResponseWriter, req *http.Request) {
432 noslash := req.URL.Path[1:]
433 ct, ok := a.Mimes[noslash]
434 if ok {
435 w.Header().Set("content-type", ct)
436 }
437 fs.ServeHTTP(w, req)
438 }
439}
440
441func (a *StreamplaceAPI) RedirectHandler(ctx context.Context) (http.Handler, error) {
442 var tlsPort string
443 var err error
444 if a.HTTPRedirectTLSPort != nil {
445 tlsPort = fmt.Sprintf("%d", *a.HTTPRedirectTLSPort)
446 } else {
447 _, tlsPort, err = net.SplitHostPort(a.CLI.HTTPSAddr)
448 if err != nil {
449 return nil, err
450 }
451 }
452 handleRedirect := func(w http.ResponseWriter, req *http.Request) {
453 host, _, err := net.SplitHostPort(req.Host)
454 if err != nil {
455 host = req.Host
456 }
457 u := req.URL
458 if tlsPort == "443" {
459 u.Host = host
460 } else {
461 u.Host = net.JoinHostPort(host, tlsPort)
462 }
463 u.Scheme = "https"
464 http.Redirect(w, req, u.String(), http.StatusTemporaryRedirect)
465 }
466 mux := http.NewServeMux()
467 mux.HandleFunc("/", handleRedirect)
468 return mux, nil
469}
470
471type NotificationPayload struct {
472 Token string `json:"token"`
473 RepoDID string `json:"repoDID"`
474}
475
476func (a *StreamplaceAPI) HandleAPI404(ctx context.Context) http.HandlerFunc {
477 return func(w http.ResponseWriter, req *http.Request) {
478 w.WriteHeader(404)
479 }
480}
481
482func (a *StreamplaceAPI) HandleNotification(ctx context.Context) http.HandlerFunc {
483 return func(w http.ResponseWriter, req *http.Request) {
484 payload, err := io.ReadAll(req.Body)
485 if err != nil {
486 log.Log(ctx, "error reading notification create", "error", err)
487 w.WriteHeader(400)
488 return
489 }
490 n := NotificationPayload{}
491 err = json.Unmarshal(payload, &n)
492 if err != nil {
493 log.Log(ctx, "error unmarshalling notification create", "error", err)
494 w.WriteHeader(400)
495 return
496 }
497 err = a.StatefulDB.CreateNotification(n.Token, n.RepoDID)
498 if err != nil {
499 log.Log(ctx, "error creating notification", "error", err)
500 w.WriteHeader(400)
501 return
502 }
503 log.Log(ctx, "successfully created notification", "token", n.Token)
504 w.WriteHeader(200)
505 if n.RepoDID != "" {
506 go func() {
507 _, err := a.ATSync.SyncBlueskyRepo(ctx, n.RepoDID, a.Model)
508 if err != nil {
509 log.Error(ctx, "error syncing bluesky repo after notification creation", "error", err)
510 }
511 }()
512 }
513 }
514}
515
516func (a *StreamplaceAPI) HandleSegment(ctx context.Context) http.HandlerFunc {
517 return func(w http.ResponseWriter, req *http.Request) {
518 err := a.MediaManager.ValidateMP4(ctx, req.Body, false)
519 if err != nil {
520 apierrors.WriteHTTPBadRequest(w, "could not ingest segment", err)
521 return
522 }
523 w.WriteHeader(200)
524 }
525}
526
527func (a *StreamplaceAPI) HandlePlayerEvent(ctx context.Context) httprouter.Handle {
528 return func(w http.ResponseWriter, req *http.Request, p httprouter.Params) {
529 var event model.PlayerEventAPI
530 if err := json.NewDecoder(req.Body).Decode(&event); err != nil {
531 apierrors.WriteHTTPBadRequest(w, "could not decode JSON body", err)
532 return
533 }
534 err := a.Model.CreatePlayerEvent(event)
535 if err != nil {
536 apierrors.WriteHTTPBadRequest(w, "could not create player event", err)
537 return
538 }
539 w.WriteHeader(201)
540 }
541}
542
543func (a *StreamplaceAPI) HandleRecentSegments(ctx context.Context) httprouter.Handle {
544 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
545 segs, err := a.Model.MostRecentSegments()
546 if err != nil {
547 apierrors.WriteHTTPInternalServerError(w, "could not get segments", err)
548 return
549 }
550 bs, err := json.Marshal(segs)
551 if err != nil {
552 apierrors.WriteHTTPInternalServerError(w, "could not marshal segments", err)
553 return
554 }
555 w.Header().Add("Content-Type", "application/json")
556 if _, err := w.Write(bs); err != nil {
557 log.Error(ctx, "error writing response", "error", err)
558 }
559 }
560}
561
562func (a *StreamplaceAPI) HandleUserRecentSegments(ctx context.Context) httprouter.Handle {
563 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
564 user := params.ByName("repoDID")
565 if user == "" {
566 apierrors.WriteHTTPBadRequest(w, "user required", nil)
567 return
568 }
569 user, err := a.NormalizeUser(ctx, user)
570 if err != nil {
571 apierrors.WriteHTTPNotFound(w, "user not found", err)
572 return
573 }
574 seg, err := a.Model.LatestSegmentForUser(user)
575 if err != nil {
576 apierrors.WriteHTTPInternalServerError(w, "could not get segments", err)
577 return
578 }
579 streamplaceSeg, err := seg.ToStreamplaceSegment()
580 if err != nil {
581 apierrors.WriteHTTPInternalServerError(w, "could not convert segment to streamplace segment", err)
582 return
583 }
584 bs, err := json.Marshal(streamplaceSeg)
585 if err != nil {
586 apierrors.WriteHTTPInternalServerError(w, "could not marshal segments", err)
587 return
588 }
589 w.Header().Add("Content-Type", "application/json")
590 if _, err := w.Write(bs); err != nil {
591 log.Error(ctx, "error writing response", "error", err)
592 }
593 }
594}
595
596func (a *StreamplaceAPI) HandleViewCount(ctx context.Context) httprouter.Handle {
597 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
598 user := params.ByName("user")
599 if user == "" {
600 apierrors.WriteHTTPBadRequest(w, "user required", nil)
601 return
602 }
603 user, err := a.NormalizeUser(ctx, user)
604 if err != nil {
605 apierrors.WriteHTTPNotFound(w, "user not found", err)
606 return
607 }
608 count := spmetrics.GetViewCount(user)
609 bs, err := json.Marshal(streamplace.Livestream_ViewerCount{Count: int64(count), LexiconTypeID: "place.stream.livestream#viewerCount"})
610 if err != nil {
611 apierrors.WriteHTTPInternalServerError(w, "could not marshal view count", err)
612 return
613 }
614 if _, err := w.Write(bs); err != nil {
615 log.Error(ctx, "error writing response", "error", err)
616 }
617 }
618}
619
620func (a *StreamplaceAPI) HandleBlueskyResolve(ctx context.Context) httprouter.Handle {
621 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
622 log.Log(ctx, "got bluesky notification", "params", params)
623 key, err := a.ATSync.SyncBlueskyRepo(ctx, params.ByName("handle"), a.Model)
624 if err != nil {
625 apierrors.WriteHTTPInternalServerError(w, "could not resolve streamplace key", err)
626 return
627 }
628 signingKeys, err := a.Model.GetSigningKeysForRepo(key.DID)
629 if err != nil {
630 apierrors.WriteHTTPInternalServerError(w, "could not get signing keys", err)
631 return
632 }
633 bs, err := json.Marshal(signingKeys)
634 if err != nil {
635 apierrors.WriteHTTPInternalServerError(w, "could not marshal signing keys", err)
636 return
637 }
638 if _, err := w.Write(bs); err != nil {
639 log.Error(ctx, "error writing response", "error", err)
640 }
641 }
642}
643
644type ChatResponse struct {
645 Post *bsky.FeedPost `json:"post"`
646 Repo *model.Repo `json:"repo"`
647 CID string `json:"cid"`
648}
649
650func (a *StreamplaceAPI) HandleChat(ctx context.Context) httprouter.Handle {
651 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
652 user := params.ByName("repoDID")
653 if user == "" {
654 apierrors.WriteHTTPBadRequest(w, "user required", nil)
655 return
656 }
657 repoDID, err := a.NormalizeUser(ctx, user)
658 if err != nil {
659 apierrors.WriteHTTPNotFound(w, "user not found", err)
660 return
661 }
662 replies, err := a.Model.GetReplies(repoDID)
663 if err != nil {
664 apierrors.WriteHTTPInternalServerError(w, "could not get replies", err)
665 return
666 }
667 bs, err := json.Marshal(replies)
668 if err != nil {
669 apierrors.WriteHTTPInternalServerError(w, "could not marshal replies", err)
670 return
671 }
672 w.Header().Set("Content-Type", "application/json")
673 if _, err := w.Write(bs); err != nil {
674 log.Error(ctx, "error writing response", "error", err)
675 }
676 }
677}
678
679func (a *StreamplaceAPI) HandleLivestream(ctx context.Context) httprouter.Handle {
680 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
681 user := params.ByName("repoDID")
682 if user == "" {
683 apierrors.WriteHTTPBadRequest(w, "user required", nil)
684 return
685 }
686 repoDID, err := a.NormalizeUser(ctx, user)
687 if err != nil {
688 apierrors.WriteHTTPNotFound(w, "user not found", err)
689 return
690 }
691 livestream, err := a.Model.GetLatestLivestreamForRepo(repoDID)
692 if err != nil {
693 apierrors.WriteHTTPInternalServerError(w, "could not get livestream", err)
694 return
695 }
696
697 doc, err := livestream.ToLivestreamView()
698 if err != nil {
699 apierrors.WriteHTTPInternalServerError(w, "could not marshal livestream", err)
700 return
701 }
702
703 if livestream == nil {
704 apierrors.WriteHTTPNotFound(w, "no livestream found", nil)
705 return
706 }
707
708 bs, err := json.Marshal(doc)
709 if err != nil {
710 apierrors.WriteHTTPInternalServerError(w, "could not marshal livestream", err)
711 return
712 }
713 w.Header().Set("Content-Type", "application/json")
714 if _, err := w.Write(bs); err != nil {
715 log.Error(ctx, "error writing response", "error", err)
716 }
717 }
718}
719
720func (a *StreamplaceAPI) RateLimitMiddleware(ctx context.Context) func(http.Handler) http.Handler {
721 return func(next http.Handler) http.Handler {
722 return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
723 ip, _, err := net.SplitHostPort(req.RemoteAddr)
724 if err != nil {
725 ip = req.RemoteAddr
726 }
727
728 if a.CLI.RateLimitPerSecond > 0 {
729 limiter := a.getLimiter(ip)
730
731 if !limiter.Allow() {
732 log.Warn(ctx, "rate limit exceeded", "ip", ip, "path", req.URL.Path)
733 apierrors.WriteHTTPTooManyRequests(w, "rate limit exceeded")
734 return
735 }
736 }
737
738 next.ServeHTTP(w, req)
739 })
740 }
741}
742
743// helper for getting a listener from a systemd file descriptor
744func getListenerFromFD(fdName string) (net.Listener, error) {
745 log.Debug(context.TODO(), "getting listener from fd", "fdName", fdName, "LISTEN_PID", os.Getenv("LISTEN_PID"), "LISTEN_FDNAMES", os.Getenv("LISTEN_FDNAMES"))
746 if os.Getenv("LISTEN_PID") == strconv.Itoa(os.Getpid()) {
747 names := strings.Split(os.Getenv("LISTEN_FDNAMES"), ":")
748 for i, name := range names {
749 if name == fdName {
750 log.Warn(context.TODO(), "using systemd file descriptor", "fdName", fdName, "fdIndex", i+3)
751 f1 := os.NewFile(uintptr(i+3), fdName)
752 return net.FileListener(f1)
753 }
754 }
755 }
756 return nil, nil
757}
758
759func (a *StreamplaceAPI) ServeHTTP(ctx context.Context) error {
760 handler, err := a.Handler(ctx)
761 if err != nil {
762 return err
763 }
764 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error {
765 ln, err := getListenerFromFD("http")
766 if err != nil {
767 return err
768 }
769 if ln == nil {
770 ln, err = net.Listen("tcp", a.CLI.HTTPAddr)
771 if err != nil {
772 return err
773 }
774 } else {
775 log.Warn(ctx, "api server listening for http over systemd socket", "addr", ln.Addr())
776 }
777 log.Log(ctx, "http server starting", "addr", ln.Addr())
778 return s.Serve(ln)
779 })
780}
781
782func (a *StreamplaceAPI) ServeHTTPRedirect(ctx context.Context) error {
783 handler, err := a.RedirectHandler(ctx)
784 if err != nil {
785 return err
786 }
787 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error {
788 ln, err := getListenerFromFD("http")
789 if err != nil {
790 return err
791 }
792 if ln == nil {
793 ln, err = net.Listen("tcp", a.CLI.HTTPAddr)
794 if err != nil {
795 return err
796 }
797 } else {
798 log.Warn(ctx, "http tls redirect server listening for http over systemd socket", "addr", ln.Addr())
799 }
800 log.Log(ctx, "http tls redirect server starting", "addr", ln.Addr())
801 return s.Serve(ln)
802 })
803}
804
805func (a *StreamplaceAPI) ServeHTTPS(ctx context.Context) error {
806 handler, err := a.Handler(ctx)
807 if err != nil {
808 return err
809 }
810 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error {
811 ln, err := getListenerFromFD("https")
812 if err != nil {
813 return err
814 }
815 if ln == nil {
816 ln, err = net.Listen("tcp", a.CLI.HTTPSAddr)
817 if err != nil {
818 return err
819 }
820 } else {
821 // tell the redirect handler we're using systemd and they should go to 443
822 port443 := 443
823 a.HTTPRedirectTLSPort = &port443
824 log.Warn(ctx, "https server listening for https over systemd socket", "addr", ln.Addr())
825 }
826 log.Log(ctx, "https server starting",
827 "addr", ln.Addr(),
828 "certPath", a.CLI.TLSCertPath,
829 "keyPath", a.CLI.TLSKeyPath,
830 )
831 return s.ServeTLS(ln, a.CLI.TLSCertPath, a.CLI.TLSKeyPath)
832 })
833}
834
835func (a *StreamplaceAPI) ServerWithShutdown(ctx context.Context, handler http.Handler, serve func(*http.Server) error) error {
836 ctx, cancel := context.WithCancel(ctx)
837 handler = gziphandler.GzipHandler(handler)
838 server := http.Server{Handler: handler}
839 var serveErr error
840 go func() {
841 serveErr = serve(&server)
842 cancel()
843 }()
844 <-ctx.Done()
845 if serveErr != nil {
846 return fmt.Errorf("error in http server: %w", serveErr)
847 }
848
849 ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
850 defer cancel()
851 return server.Shutdown(ctx)
852}
853
854func (a *StreamplaceAPI) HandleHealthz(ctx context.Context) http.HandlerFunc {
855 return func(w http.ResponseWriter, req *http.Request) {
856 w.WriteHeader(200)
857 }
858}
859
860func (a *StreamplaceAPI) getLimiter(ip string) *rate.Limiter {
861 a.limitersMu.Lock()
862 defer a.limitersMu.Unlock()
863
864 limiter, exists := a.limiters[ip]
865 if !exists {
866 limiter = rate.NewLimiter(rate.Limit(a.CLI.RateLimitPerSecond), a.CLI.RateLimitBurst)
867 a.limiters[ip] = limiter
868 }
869
870 return limiter
871}
872
873func (a *StreamplaceAPI) HandleClip(ctx context.Context) httprouter.Handle {
874 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
875 user := params.ByName("user")
876 file := params.ByName("file")
877 if user == "" || file == "" {
878 apierrors.WriteHTTPBadRequest(w, "user and file required", nil)
879 return
880 }
881 user, err := a.NormalizeUser(ctx, user)
882 if err != nil {
883 apierrors.WriteHTTPNotFound(w, "user not found", err)
884 return
885 }
886 fPath := []string{user, "clips", file}
887 exists, err := a.CLI.DataFileExists(fPath)
888 if err != nil {
889 apierrors.WriteHTTPInternalServerError(w, "could not check if file exists", err)
890 return
891 }
892 if !exists {
893 apierrors.WriteHTTPNotFound(w, "file not found", nil)
894 return
895 }
896 fd, err := os.Open(a.CLI.DataFilePath(fPath))
897 if err != nil {
898 apierrors.WriteHTTPInternalServerError(w, "could not open file", err)
899 return
900 }
901 defer fd.Close()
902 w.Header().Set("Content-Type", "video/mp4")
903 if _, err := io.Copy(w, fd); err != nil {
904 log.Error(ctx, "error writing response", "error", err)
905 }
906 }
907}
908
909func NewWebsocketTracker(maxConns int) *WebsocketTracker {
910 return &WebsocketTracker{
911 connections: make(map[string]int),
912 maxConnsPerIP: maxConns,
913 }
914}
915
916func (t *WebsocketTracker) AddConnection(ip string) bool {
917 t.mu.Lock()
918 defer t.mu.Unlock()
919
920 count := t.connections[ip]
921
922 if count >= t.maxConnsPerIP {
923 return false
924 }
925
926 t.connections[ip] = count + 1
927 return true
928}
929
930func (t *WebsocketTracker) RemoveConnection(ip string) {
931 t.mu.Lock()
932 defer t.mu.Unlock()
933
934 count := t.connections[ip]
935 if count > 0 {
936 t.connections[ip] = count - 1
937 }
938
939 if t.connections[ip] == 0 {
940 delete(t.connections, ip)
941 }
942}