Live video on the AT Protocol
1package api
2
3import (
4 "context"
5 "encoding/json"
6 "errors"
7 "fmt"
8 "io"
9 "log/slog"
10 "net"
11 "net/http"
12 "net/http/httputil"
13 "net/url"
14 "os"
15 "strings"
16 "sync"
17 "time"
18
19 "github.com/NYTimes/gziphandler"
20 "github.com/bluesky-social/indigo/api/bsky"
21 "github.com/google/uuid"
22 "github.com/julienschmidt/httprouter"
23 "github.com/rs/cors"
24 sloghttp "github.com/samber/slog-http"
25 "golang.org/x/time/rate"
26
27 "github.com/streamplace/oatproxy/pkg/oatproxy"
28 "stream.place/streamplace/js/app"
29 "stream.place/streamplace/pkg/atproto"
30 "stream.place/streamplace/pkg/bus"
31 "stream.place/streamplace/pkg/config"
32 "stream.place/streamplace/pkg/crypto/signers/eip712"
33 "stream.place/streamplace/pkg/director"
34 apierrors "stream.place/streamplace/pkg/errors"
35 "stream.place/streamplace/pkg/linking"
36 "stream.place/streamplace/pkg/log"
37 "stream.place/streamplace/pkg/media"
38 "stream.place/streamplace/pkg/mist/mistconfig"
39 "stream.place/streamplace/pkg/model"
40 "stream.place/streamplace/pkg/notifications"
41 "stream.place/streamplace/pkg/spmetrics"
42 "stream.place/streamplace/pkg/spxrpc"
43 "stream.place/streamplace/pkg/streamplace"
44
45 metrics "github.com/slok/go-http-metrics/metrics/prometheus"
46 "github.com/slok/go-http-metrics/middleware"
47 echomiddleware "github.com/slok/go-http-metrics/middleware/echo"
48 httproutermiddleware "github.com/slok/go-http-metrics/middleware/httprouter"
49 middlewarestd "github.com/slok/go-http-metrics/middleware/std"
50)
51
52type StreamplaceAPI struct {
53 CLI *config.CLI
54 Model model.Model
55 Updater *Updater
56 Signer *eip712.EIP712Signer
57 Mimes map[string]string
58 FirebaseNotifier notifications.FirebaseNotifier
59 MediaManager *media.MediaManager
60 MediaSigner media.MediaSigner
61 // not thread-safe yet
62 Aliases map[string]string
63 Bus *bus.Bus
64 ATSync *atproto.ATProtoSynchronizer
65 Director *director.Director
66
67 connTracker *WebsocketTracker
68
69 limiters map[string]*rate.Limiter
70 limitersMu sync.Mutex
71 SignerCache map[string]media.MediaSigner
72 SignerCacheMu sync.Mutex
73 op *oatproxy.OATProxy
74}
75
// WebsocketTracker counts open connections per client IP so that
// AddConnection can refuse a client that has reached the cap.
type WebsocketTracker struct {
	// connections maps an IP to its current number of tracked connections.
	connections map[string]int
	// maxConnsPerIP is the cap enforced by AddConnection.
	maxConnsPerIP int
	// mu is held by AddConnection and RemoveConnection while they touch
	// connections.
	mu sync.RWMutex
}
81
82func MakeStreamplaceAPI(cli *config.CLI, mod model.Model, signer *eip712.EIP712Signer, noter notifications.FirebaseNotifier, mm *media.MediaManager, ms media.MediaSigner, bus *bus.Bus, atsync *atproto.ATProtoSynchronizer, d *director.Director, op *oatproxy.OATProxy) (*StreamplaceAPI, error) {
83 updater, err := PrepareUpdater(cli)
84 if err != nil {
85 return nil, err
86 }
87 a := &StreamplaceAPI{CLI: cli,
88 Model: mod,
89 Updater: updater,
90 Signer: signer,
91 FirebaseNotifier: noter,
92 MediaManager: mm,
93 MediaSigner: ms,
94 Aliases: map[string]string{},
95 Bus: bus,
96 ATSync: atsync,
97 Director: d,
98 connTracker: NewWebsocketTracker(cli.RateLimitWebsocket),
99 limiters: make(map[string]*rate.Limiter),
100 SignerCache: make(map[string]media.MediaSigner),
101 op: op,
102 }
103 a.Mimes, err = updater.GetMimes()
104 if err != nil {
105 return nil, err
106 }
107 return a, nil
108}
109
// AppHostingFS wraps an http.FileSystem so that a missing file is reported
// with the ErrorIndex sentinel instead of the underlying not-exist error.
type AppHostingFS struct {
	http.FileSystem
}

// ErrorIndex is returned by AppHostingFS.Open when the requested file does not
// exist; callers use it as the signal to serve index.html instead.
var ErrorIndex = errors.New("not found, use index.html")

// Open opens name on the wrapped file system. A not-exist error is replaced by
// ErrorIndex; any other error is passed through untouched.
func (fs AppHostingFS) Open(name string) (http.File, error) {
	opened, err := fs.FileSystem.Open(name)
	switch {
	case err == nil:
		return opened, nil
	case errors.Is(err, os.ErrNotExist):
		return nil, ErrorIndex
	default:
		return nil, err
	}
}
127
128// api/playback/iame.li/webrtc?rendition=source
129// api/playback/iame.li/stream.mp4?rendition=source
130// api/playback/iame.li/stream.webm?rendition=source
131// api/playback/iame.li/hls/index.m3u8
132// api/playback/iame.li/hls/source/stream.m3u8
133// api/playback/iame.li/hls/source/000000000000.ts
134
135func (a *StreamplaceAPI) Handler(ctx context.Context) (http.Handler, error) {
136
137 mdlw := middleware.New(middleware.Config{
138 Recorder: metrics.NewRecorder(metrics.Config{}),
139 })
140 var xrpc http.Handler
141 xrpc, err := spxrpc.NewServer(ctx, a.CLI, a.Model, a.op, mdlw)
142 if err != nil {
143 return nil, err
144 }
145 router := httprouter.New()
146
147 // Create our middleware factory with the default settings.
148
149 a.op.Echo.Use(echomiddleware.Handler("", mdlw))
150
151 // r.GET("/test/:id", httproutermiddleware.Handler("/test/:id", h1, mdlw))
152
153 addHandle := func(router *httprouter.Router, method, path string, handler httprouter.Handle) {
154 router.Handle(method, path, httproutermiddleware.Handler(path, handler, mdlw))
155 }
156 addFunc := func(router *httprouter.Router, method, path string, handler http.HandlerFunc) {
157 router.Handler(method, path, middlewarestd.Handler(path, mdlw, handler))
158 }
159
160 router.Handler("GET", "/oauth/*anything", a.op.Handler())
161 router.Handler("POST", "/oauth/*anything", a.op.Handler())
162 router.Handler("GET", "/.well-known/oauth-authorization-server", a.op.Handler())
163 router.Handler("GET", "/.well-known/oauth-protected-resource", a.op.Handler())
164 router.Handler("GET", "/.well-known/apple-app-site-association", a.HandleAppleAppSiteAssociation(ctx))
165 router.Handler("GET", "/.well-known/assetlinks.json", a.HandleAndroidAssetLinks(ctx))
166 apiRouter := httprouter.New()
167 addFunc(apiRouter, "POST", "/api/notification", a.HandleNotification(ctx))
168 // old clients
169 addFunc(router, "GET", "/app-updates", a.HandleAppUpdates(ctx))
170 // new ones
171 addFunc(apiRouter, "GET", "/api/manifest", a.HandleAppUpdates(ctx))
172 addHandle(apiRouter, "GET", "/api/desktop-updates/:platform/:architecture/:version/:buildTime/:file", a.HandleDesktopUpdates(ctx))
173 addHandle(apiRouter, "POST", "/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s"))
174 addHandle(apiRouter, "OPTIONS", "/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s"))
175 addHandle(apiRouter, "DELETE", "/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s"))
176 addFunc(apiRouter, "POST", "/api/segment", a.HandleSegment(ctx))
177 addFunc(apiRouter, "GET", "/api/healthz", a.HandleHealthz(ctx))
178 addHandle(apiRouter, "GET", "/api/playback/:user/hls/*file", a.HandleHLSPlayback(ctx))
179 addHandle(apiRouter, "GET", "/api/playback/:user/stream.mp4", a.HandleMP4Playback(ctx))
180 addHandle(apiRouter, "GET", "/api/playback/:user/stream.webm", a.HandleMKVPlayback(ctx))
181 // they're jpegs now
182 addHandle(apiRouter, "GET", "/api/playback/:user/stream.jpg", a.HandleThumbnailPlayback(ctx))
183 // this one is actually a jpeg (used previously and shouldn't remove for historical reasons)
184 addHandle(apiRouter, "GET", "/api/playback/:user/stream.png", a.HandleThumbnailPlayback(ctx))
185 addHandle(apiRouter, "GET", "/api/app-return/*anything", a.HandleAppReturn(ctx))
186 addHandle(apiRouter, "POST", "/api/playback/:user/webrtc", a.HandleWebRTCPlayback(ctx))
187 addHandle(apiRouter, "POST", "/api/ingest/webrtc", a.HandleWebRTCIngest(ctx))
188 addHandle(apiRouter, "POST", "/api/ingest/webrtc/:key", a.HandleWebRTCIngest(ctx))
189 addHandle(apiRouter, "POST", "/api/player-event", a.HandlePlayerEvent(ctx))
190 addHandle(apiRouter, "GET", "/api/chat/:repoDID", a.HandleChat(ctx))
191 addHandle(apiRouter, "GET", "/api/websocket/:repoDID", a.HandleWebsocket(ctx))
192 addHandle(apiRouter, "GET", "/api/livestream/:repoDID", a.HandleLivestream(ctx))
193 addHandle(apiRouter, "GET", "/api/segment/recent", a.HandleRecentSegments(ctx))
194 addHandle(apiRouter, "GET", "/api/segment/recent/:repoDID", a.HandleUserRecentSegments(ctx))
195 addHandle(apiRouter, "GET", "/api/bluesky/resolve/:handle", a.HandleBlueskyResolve(ctx))
196 addHandle(apiRouter, "GET", "/api/view-count/:user", a.HandleViewCount(ctx))
197 apiRouter.NotFound = a.HandleAPI404(ctx)
198 apiRouterHandler := a.RateLimitMiddleware(ctx)(apiRouter)
199 xrpcHandler := a.RateLimitMiddleware(ctx)(xrpc)
200 router.Handler("GET", "/api/*resource", apiRouterHandler)
201 router.Handler("POST", "/api/*resource", apiRouterHandler)
202 router.Handler("PUT", "/api/*resource", apiRouterHandler)
203 router.Handler("PATCH", "/api/*resource", apiRouterHandler)
204 router.Handler("DELETE", "/api/*resource", apiRouterHandler)
205 router.Handler("GET", "/xrpc/*resource", xrpcHandler)
206 router.Handler("POST", "/xrpc/*resource", xrpcHandler)
207 router.Handler("PUT", "/xrpc/*resource", xrpcHandler)
208 router.Handler("PATCH", "/xrpc/*resource", xrpcHandler)
209 router.Handler("DELETE", "/xrpc/*resource", xrpcHandler)
210 router.GET("/.well-known/did.json", a.HandleDidJSON(ctx))
211 router.GET("/dl/*params", a.HandleAppDownload(ctx))
212 router.POST("/", a.HandleWebRTCIngest(ctx))
213 for _, redirect := range a.CLI.Redirects {
214 parts := strings.Split(redirect, ":")
215 if len(parts) != 2 {
216 log.Error(ctx, "invalid redirect", "redirect", redirect)
217 return nil, fmt.Errorf("invalid redirect: %s", redirect)
218 }
219 router.Handle("GET", parts[0], func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
220 http.Redirect(w, r, parts[1], http.StatusTemporaryRedirect)
221 })
222 }
223 if a.CLI.FrontendProxy != "" {
224 u, err := url.Parse(a.CLI.FrontendProxy)
225 if err != nil {
226 return nil, err
227 }
228 log.Warn(ctx, "using frontend proxy instead of bundled frontend", "destination", a.CLI.FrontendProxy)
229 router.NotFound = &httputil.ReverseProxy{
230 Rewrite: func(r *httputil.ProxyRequest) {
231 // workaround for Expo disliking serving requests from 127.0.0.1 instead of localhost
232 // we need to use 127.0.0.1 because the atproto oauth client requires it
233 r.Out.Header.Set("Origin", u.String())
234 r.SetURL(u)
235 },
236 }
237 } else {
238 files, err := app.Files()
239 if err != nil {
240 return nil, err
241 }
242 index, err := files.Open("index.html")
243 if err != nil {
244 return nil, err
245 }
246 bs, err := io.ReadAll(index)
247 if err != nil {
248 return nil, err
249 }
250 linker, err := linking.NewLinker(ctx, bs)
251 if err != nil {
252 return nil, err
253 }
254 linkingHandler, err := a.NotFoundLinkingHandler(ctx, linker)
255 if err != nil {
256 return nil, err
257 }
258 router.NotFound = middlewarestd.Handler("/*static", mdlw, linkingHandler)
259 }
260 // needed because the WebRTC handler issues 405s from / otherwise
261 router.GET("/", func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
262 router.NotFound.ServeHTTP(w, r)
263 })
264 handler := sloghttp.Recovery(router)
265 handler = cors.AllowAll().Handler(handler)
266 handler = sloghttp.New(slog.Default())(handler)
267 handler = a.RateLimitMiddleware(ctx)(handler)
268
269 // this needs to be LAST so nothing else clobbers the context
270 handler = a.ContextMiddleware(ctx)(handler)
271
272 return handler, nil
273}
274func (a *StreamplaceAPI) ContextMiddleware(ctx context.Context) func(next http.Handler) http.Handler {
275 return func(next http.Handler) http.Handler {
276 return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
277 uuid := uuid.New().String()
278 ctx = log.WithLogValues(ctx, "requestID", uuid, "method", r.Method, "path", r.URL.Path)
279 r = r.WithContext(ctx)
280 next.ServeHTTP(w, r)
281 })
282 }
283}
// copyHeader appends every value in src to dst, except for headers whose key
// starts with "Access-Control": CORS headers are set by this server itself, so
// upstream ones are dropped.
func copyHeader(dst, src http.Header) {
	for name, values := range src {
		if !strings.HasPrefix(name, "Access-Control") {
			for i := range values {
				dst.Add(name, values[i])
			}
		}
	}
}
295
296// handler that takes care of static files and otherwise returns the index.html with the correct link card data
297func (a *StreamplaceAPI) NotFoundLinkingHandler(ctx context.Context, linker *linking.Linker) (http.HandlerFunc, error) {
298 files, err := app.Files()
299 if err != nil {
300 return nil, err
301 }
302 fs := AppHostingFS{http.FS(files)}
303
304 fileHandler := a.FileHandler(ctx, http.FileServer(fs))
305 defaultHandler := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
306 f := strings.TrimPrefix(req.URL.Path, "/")
307 // under docs we need the index.html suffix due to astro rendering
308 if strings.HasPrefix(req.URL.Path, "/docs") && strings.HasSuffix(req.URL.Path, "/") {
309 f += "index.html"
310 }
311 _, err := fs.Open(f)
312 if err == nil {
313 fileHandler.ServeHTTP(w, req)
314 return
315 }
316 if errors.Is(err, ErrorIndex) || f == "" {
317 bs, err := linker.GenerateDefaultCard(ctx, req.URL)
318 if err != nil {
319 log.Error(ctx, "error generating default card", "error", err)
320 }
321 w.Header().Set("Content-Type", "text/html")
322 if _, err := w.Write(bs); err != nil {
323 log.Error(ctx, "error writing response", "error", err)
324 }
325 } else {
326 log.Warn(ctx, "error opening file", "error", err)
327 apierrors.WriteHTTPInternalServerError(w, "file not found", err)
328 }
329 })
330 return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
331 proto := "http"
332 if req.TLS != nil {
333 proto = "https"
334 }
335 fwProto := req.Header.Get("x-forwarded-proto")
336 if fwProto != "" {
337 proto = fwProto
338 }
339 req.URL.Host = req.Host
340 req.URL.Scheme = proto
341 maybeHandle := strings.TrimPrefix(req.URL.Path, "/")
342 repo, err := a.Model.GetRepoByHandleOrDID(maybeHandle)
343 if err != nil || repo == nil {
344 log.Error(ctx, "no repo found", "maybeHandle", maybeHandle)
345 defaultHandler.ServeHTTP(w, req)
346 return
347 }
348 ls, err := a.Model.GetLatestLivestreamForRepo(repo.DID)
349 if err != nil || ls == nil {
350 log.Error(ctx, "no livestream found", "repoDID", repo.DID)
351 defaultHandler.ServeHTTP(w, req)
352 return
353 }
354 lsv, err := ls.ToLivestreamView()
355 if err != nil || lsv == nil {
356 log.Error(ctx, "no livestream view found", "repoDID", repo.DID)
357 defaultHandler.ServeHTTP(w, req)
358 return
359 }
360 bs, err := linker.GenerateStreamerCard(ctx, req.URL, lsv)
361 if err != nil {
362 log.Error(ctx, "error generating html", "error", err)
363 defaultHandler.ServeHTTP(w, req)
364 return
365 }
366 w.Header().Set("Content-Type", "text/html")
367 if _, err := w.Write(bs); err != nil {
368 log.Error(ctx, "error writing response", "error", err)
369 }
370 }), nil
371}
372
373func (a *StreamplaceAPI) MistProxyHandler(ctx context.Context, tmpl string) httprouter.Handle {
374 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
375 if !a.CLI.HasMist() {
376 apierrors.WriteHTTPNotImplemented(w, "Playback only on the Linux version for now", nil)
377 return
378 }
379 stream := params.ByName("stream")
380 if stream == "" {
381 apierrors.WriteHTTPBadRequest(w, "missing stream in request", nil)
382 return
383 }
384
385 fullstream := fmt.Sprintf("%s+%s", mistconfig.StreamName, stream)
386 prefix := fmt.Sprintf(tmpl, fullstream)
387 resource := params.ByName("resource")
388
389 // path := strings.TrimPrefix(req.URL.EscapedPath(), "/api")
390
391 client := &http.Client{}
392 req.URL = &url.URL{
393 Scheme: "http",
394 Host: fmt.Sprintf("127.0.0.1:%d", a.CLI.MistHTTPPort),
395 Path: fmt.Sprintf("%s%s", prefix, resource),
396 RawQuery: req.URL.RawQuery,
397 }
398
399 //http: Request.RequestURI can't be set in client requests.
400 //http://golang.org/src/pkg/net/http/client.go
401 req.RequestURI = ""
402
403 resp, err := client.Do(req)
404 if err != nil {
405 apierrors.WriteHTTPInternalServerError(w, "error connecting to mist", err)
406 return
407 }
408 defer resp.Body.Close()
409
410 copyHeader(w.Header(), resp.Header)
411 w.WriteHeader(resp.StatusCode)
412 if _, err := io.Copy(w, resp.Body); err != nil {
413 log.Error(ctx, "error writing response", "error", err)
414 }
415 }
416}
417
418func (a *StreamplaceAPI) FileHandler(ctx context.Context, fs http.Handler) http.HandlerFunc {
419 return func(w http.ResponseWriter, req *http.Request) {
420 noslash := req.URL.Path[1:]
421 ct, ok := a.Mimes[noslash]
422 if ok {
423 w.Header().Set("content-type", ct)
424 }
425 fs.ServeHTTP(w, req)
426 }
427}
428
429func (a *StreamplaceAPI) RedirectHandler(ctx context.Context) (http.Handler, error) {
430 _, tlsPort, err := net.SplitHostPort(a.CLI.HTTPSAddr)
431 if err != nil {
432 return nil, err
433 }
434 handleRedirect := func(w http.ResponseWriter, req *http.Request) {
435 host, _, err := net.SplitHostPort(req.Host)
436 if err != nil {
437 host = req.Host
438 }
439 u := req.URL
440 if tlsPort == "443" {
441 u.Host = host
442 } else {
443 u.Host = net.JoinHostPort(host, tlsPort)
444 }
445 u.Scheme = "https"
446 http.Redirect(w, req, u.String(), http.StatusTemporaryRedirect)
447 }
448 mux := http.NewServeMux()
449 mux.HandleFunc("/", handleRedirect)
450 return mux, nil
451}
452
// NotificationPayload is the JSON body accepted by HandleNotification.
type NotificationPayload struct {
	// Token is stored through Model.CreateNotification.
	Token string `json:"token"`
	// RepoDID is optional; when set, HandleNotification also triggers a repo
	// sync for it.
	RepoDID string `json:"repoDID"`
}
457
458func (a *StreamplaceAPI) HandleAPI404(ctx context.Context) http.HandlerFunc {
459 return func(w http.ResponseWriter, req *http.Request) {
460 w.WriteHeader(404)
461 }
462}
463
464func (a *StreamplaceAPI) HandleNotification(ctx context.Context) http.HandlerFunc {
465 return func(w http.ResponseWriter, req *http.Request) {
466 payload, err := io.ReadAll(req.Body)
467 if err != nil {
468 log.Log(ctx, "error reading notification create", "error", err)
469 w.WriteHeader(400)
470 return
471 }
472 n := NotificationPayload{}
473 err = json.Unmarshal(payload, &n)
474 if err != nil {
475 log.Log(ctx, "error unmarshalling notification create", "error", err)
476 w.WriteHeader(400)
477 return
478 }
479 err = a.Model.CreateNotification(n.Token, n.RepoDID)
480 if err != nil {
481 log.Log(ctx, "error creating notification", "error", err)
482 w.WriteHeader(400)
483 return
484 }
485 log.Log(ctx, "successfully created notification", "token", n.Token)
486 w.WriteHeader(200)
487 if n.RepoDID != "" {
488 go func() {
489 _, err := a.ATSync.SyncBlueskyRepo(ctx, n.RepoDID, a.Model)
490 if err != nil {
491 log.Error(ctx, "error syncing bluesky repo after notification creation", "error", err)
492 }
493 }()
494 }
495 }
496}
497
498func (a *StreamplaceAPI) HandleSegment(ctx context.Context) http.HandlerFunc {
499 return func(w http.ResponseWriter, req *http.Request) {
500 err := a.MediaManager.ValidateMP4(ctx, req.Body)
501 if err != nil {
502 apierrors.WriteHTTPBadRequest(w, "could not ingest segment", err)
503 return
504 }
505 w.WriteHeader(200)
506 }
507}
508
509func (a *StreamplaceAPI) HandlePlayerEvent(ctx context.Context) httprouter.Handle {
510 return func(w http.ResponseWriter, req *http.Request, p httprouter.Params) {
511 var event model.PlayerEventAPI
512 if err := json.NewDecoder(req.Body).Decode(&event); err != nil {
513 apierrors.WriteHTTPBadRequest(w, "could not decode JSON body", err)
514 return
515 }
516 err := a.Model.CreatePlayerEvent(event)
517 if err != nil {
518 apierrors.WriteHTTPBadRequest(w, "could not create player event", err)
519 return
520 }
521 w.WriteHeader(201)
522 }
523}
524
525func (a *StreamplaceAPI) HandleRecentSegments(ctx context.Context) httprouter.Handle {
526 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
527 segs, err := a.Model.MostRecentSegments()
528 if err != nil {
529 apierrors.WriteHTTPInternalServerError(w, "could not get segments", err)
530 return
531 }
532 bs, err := json.Marshal(segs)
533 if err != nil {
534 apierrors.WriteHTTPInternalServerError(w, "could not marshal segments", err)
535 return
536 }
537 w.Header().Add("Content-Type", "application/json")
538 if _, err := w.Write(bs); err != nil {
539 log.Error(ctx, "error writing response", "error", err)
540 }
541 }
542}
543
544func (a *StreamplaceAPI) HandleUserRecentSegments(ctx context.Context) httprouter.Handle {
545 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
546 user := params.ByName("repoDID")
547 if user == "" {
548 apierrors.WriteHTTPBadRequest(w, "user required", nil)
549 return
550 }
551 user, err := a.NormalizeUser(ctx, user)
552 if err != nil {
553 apierrors.WriteHTTPNotFound(w, "user not found", err)
554 return
555 }
556 seg, err := a.Model.LatestSegmentForUser(user)
557 if err != nil {
558 apierrors.WriteHTTPInternalServerError(w, "could not get segments", err)
559 return
560 }
561 streamplaceSeg, err := seg.ToStreamplaceSegment()
562 if err != nil {
563 apierrors.WriteHTTPInternalServerError(w, "could not convert segment to streamplace segment", err)
564 return
565 }
566 bs, err := json.Marshal(streamplaceSeg)
567 if err != nil {
568 apierrors.WriteHTTPInternalServerError(w, "could not marshal segments", err)
569 return
570 }
571 w.Header().Add("Content-Type", "application/json")
572 if _, err := w.Write(bs); err != nil {
573 log.Error(ctx, "error writing response", "error", err)
574 }
575 }
576}
577
578func (a *StreamplaceAPI) HandleViewCount(ctx context.Context) httprouter.Handle {
579 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
580 user := params.ByName("user")
581 if user == "" {
582 apierrors.WriteHTTPBadRequest(w, "user required", nil)
583 return
584 }
585 user, err := a.NormalizeUser(ctx, user)
586 if err != nil {
587 apierrors.WriteHTTPNotFound(w, "user not found", err)
588 return
589 }
590 count := spmetrics.GetViewCount(user)
591 bs, err := json.Marshal(streamplace.Livestream_ViewerCount{Count: int64(count), LexiconTypeID: "place.stream.livestream#viewerCount"})
592 if err != nil {
593 apierrors.WriteHTTPInternalServerError(w, "could not marshal view count", err)
594 return
595 }
596 if _, err := w.Write(bs); err != nil {
597 log.Error(ctx, "error writing response", "error", err)
598 }
599 }
600}
601
602func (a *StreamplaceAPI) HandleBlueskyResolve(ctx context.Context) httprouter.Handle {
603 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
604 log.Log(ctx, "got bluesky notification", "params", params)
605 key, err := a.ATSync.SyncBlueskyRepo(ctx, params.ByName("handle"), a.Model)
606 if err != nil {
607 apierrors.WriteHTTPInternalServerError(w, "could not resolve streamplace key", err)
608 return
609 }
610 signingKeys, err := a.Model.GetSigningKeysForRepo(key.DID)
611 if err != nil {
612 apierrors.WriteHTTPInternalServerError(w, "could not get signing keys", err)
613 return
614 }
615 bs, err := json.Marshal(signingKeys)
616 if err != nil {
617 apierrors.WriteHTTPInternalServerError(w, "could not marshal signing keys", err)
618 return
619 }
620 if _, err := w.Write(bs); err != nil {
621 log.Error(ctx, "error writing response", "error", err)
622 }
623 }
624}
625
626type ChatResponse struct {
627 Post *bsky.FeedPost `json:"post"`
628 Repo *model.Repo `json:"repo"`
629 CID string `json:"cid"`
630}
631
632func (a *StreamplaceAPI) HandleChat(ctx context.Context) httprouter.Handle {
633 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
634 user := params.ByName("repoDID")
635 if user == "" {
636 apierrors.WriteHTTPBadRequest(w, "user required", nil)
637 return
638 }
639 repoDID, err := a.NormalizeUser(ctx, user)
640 if err != nil {
641 apierrors.WriteHTTPNotFound(w, "user not found", err)
642 return
643 }
644 replies, err := a.Model.GetReplies(repoDID)
645 if err != nil {
646 apierrors.WriteHTTPInternalServerError(w, "could not get replies", err)
647 return
648 }
649 bs, err := json.Marshal(replies)
650 if err != nil {
651 apierrors.WriteHTTPInternalServerError(w, "could not marshal replies", err)
652 return
653 }
654 w.Header().Set("Content-Type", "application/json")
655 if _, err := w.Write(bs); err != nil {
656 log.Error(ctx, "error writing response", "error", err)
657 }
658 }
659}
660
661func (a *StreamplaceAPI) HandleLivestream(ctx context.Context) httprouter.Handle {
662 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
663 user := params.ByName("repoDID")
664 if user == "" {
665 apierrors.WriteHTTPBadRequest(w, "user required", nil)
666 return
667 }
668 repoDID, err := a.NormalizeUser(ctx, user)
669 if err != nil {
670 apierrors.WriteHTTPNotFound(w, "user not found", err)
671 return
672 }
673 livestream, err := a.Model.GetLatestLivestreamForRepo(repoDID)
674 if err != nil {
675 apierrors.WriteHTTPInternalServerError(w, "could not get livestream", err)
676 return
677 }
678
679 doc, err := livestream.ToLivestreamView()
680 if err != nil {
681 apierrors.WriteHTTPInternalServerError(w, "could not marshal livestream", err)
682 return
683 }
684
685 if livestream == nil {
686 apierrors.WriteHTTPNotFound(w, "no livestream found", nil)
687 return
688 }
689
690 bs, err := json.Marshal(doc)
691 if err != nil {
692 apierrors.WriteHTTPInternalServerError(w, "could not marshal livestream", err)
693 return
694 }
695 w.Header().Set("Content-Type", "application/json")
696 if _, err := w.Write(bs); err != nil {
697 log.Error(ctx, "error writing response", "error", err)
698 }
699 }
700}
701
702func (a *StreamplaceAPI) RateLimitMiddleware(ctx context.Context) func(http.Handler) http.Handler {
703 return func(next http.Handler) http.Handler {
704 return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
705 ip, _, err := net.SplitHostPort(req.RemoteAddr)
706 if err != nil {
707 ip = req.RemoteAddr
708 }
709
710 if a.CLI.RateLimitPerSecond > 0 {
711 limiter := a.getLimiter(ip)
712
713 if !limiter.Allow() {
714 log.Warn(ctx, "rate limit exceeded", "ip", ip, "path", req.URL.Path)
715 apierrors.WriteHTTPTooManyRequests(w, "rate limit exceeded")
716 return
717 }
718 }
719
720 next.ServeHTTP(w, req)
721 })
722 }
723}
724
725func (a *StreamplaceAPI) ServeHTTP(ctx context.Context) error {
726 handler, err := a.Handler(ctx)
727 if err != nil {
728 return err
729 }
730 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error {
731 s.Addr = a.CLI.HTTPAddr
732 log.Log(ctx, "http server starting", "addr", s.Addr)
733 return s.ListenAndServe()
734 })
735}
736
737func (a *StreamplaceAPI) ServeHTTPRedirect(ctx context.Context) error {
738 handler, err := a.RedirectHandler(ctx)
739 if err != nil {
740 return err
741 }
742 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error {
743 s.Addr = a.CLI.HTTPAddr
744 log.Log(ctx, "http tls redirecct server starting", "addr", s.Addr)
745 return s.ListenAndServe()
746 })
747}
748
749func (a *StreamplaceAPI) ServeHTTPS(ctx context.Context) error {
750 handler, err := a.Handler(ctx)
751 if err != nil {
752 return err
753 }
754 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error {
755 s.Addr = a.CLI.HTTPSAddr
756 log.Log(ctx, "https server starting",
757 "addr", s.Addr,
758 "certPath", a.CLI.TLSCertPath,
759 "keyPath", a.CLI.TLSKeyPath,
760 )
761 return s.ListenAndServeTLS(a.CLI.TLSCertPath, a.CLI.TLSKeyPath)
762 })
763}
764
765func (a *StreamplaceAPI) ServerWithShutdown(ctx context.Context, handler http.Handler, serve func(*http.Server) error) error {
766 ctx, cancel := context.WithCancel(ctx)
767 handler = gziphandler.GzipHandler(handler)
768 server := http.Server{Handler: handler}
769 var serveErr error
770 go func() {
771 serveErr = serve(&server)
772 cancel()
773 }()
774 <-ctx.Done()
775 if serveErr != nil {
776 return fmt.Errorf("error in http server: %w", serveErr)
777 }
778
779 ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
780 defer cancel()
781 return server.Shutdown(ctx)
782}
783
784func (a *StreamplaceAPI) HandleHealthz(ctx context.Context) http.HandlerFunc {
785 return func(w http.ResponseWriter, req *http.Request) {
786 w.WriteHeader(200)
787 }
788}
789
790func (a *StreamplaceAPI) getLimiter(ip string) *rate.Limiter {
791 a.limitersMu.Lock()
792 defer a.limitersMu.Unlock()
793
794 limiter, exists := a.limiters[ip]
795 if !exists {
796 limiter = rate.NewLimiter(rate.Limit(a.CLI.RateLimitPerSecond), a.CLI.RateLimitBurst)
797 a.limiters[ip] = limiter
798 }
799
800 return limiter
801}
802
803func NewWebsocketTracker(maxConns int) *WebsocketTracker {
804 return &WebsocketTracker{
805 connections: make(map[string]int),
806 maxConnsPerIP: maxConns,
807 }
808}
809
810func (t *WebsocketTracker) AddConnection(ip string) bool {
811 t.mu.Lock()
812 defer t.mu.Unlock()
813
814 count := t.connections[ip]
815
816 if count >= t.maxConnsPerIP {
817 return false
818 }
819
820 t.connections[ip] = count + 1
821 return true
822}
823
824func (t *WebsocketTracker) RemoveConnection(ip string) {
825 t.mu.Lock()
826 defer t.mu.Unlock()
827
828 count := t.connections[ip]
829 if count > 0 {
830 t.connections[ip] = count - 1
831 }
832
833 if t.connections[ip] == 0 {
834 delete(t.connections, ip)
835 }
836}