Live video on the AT Protocol
1package api
2
3import (
4 "context"
5 "encoding/json"
6 "errors"
7 "fmt"
8 "io"
9 "log/slog"
10 "net"
11 "net/http"
12 "net/http/httputil"
13 "net/url"
14 "strings"
15 "sync"
16 "time"
17
18 "github.com/NYTimes/gziphandler"
19 "github.com/bluesky-social/indigo/api/bsky"
20 "github.com/google/uuid"
21 "github.com/julienschmidt/httprouter"
22 "github.com/rs/cors"
23 sloghttp "github.com/samber/slog-http"
24 "golang.org/x/time/rate"
25
26 "github.com/streamplace/oatproxy/pkg/oatproxy"
27 "stream.place/streamplace/js/app"
28 "stream.place/streamplace/pkg/atproto"
29 "stream.place/streamplace/pkg/bus"
30 "stream.place/streamplace/pkg/config"
31 "stream.place/streamplace/pkg/crypto/signers/eip712"
32 "stream.place/streamplace/pkg/director"
33 apierrors "stream.place/streamplace/pkg/errors"
34 "stream.place/streamplace/pkg/linking"
35 "stream.place/streamplace/pkg/log"
36 "stream.place/streamplace/pkg/media"
37 "stream.place/streamplace/pkg/mist/mistconfig"
38 "stream.place/streamplace/pkg/model"
39 "stream.place/streamplace/pkg/notifications"
40 "stream.place/streamplace/pkg/spmetrics"
41 "stream.place/streamplace/pkg/spxrpc"
42 "stream.place/streamplace/pkg/streamplace"
43
44 metrics "github.com/slok/go-http-metrics/metrics/prometheus"
45 "github.com/slok/go-http-metrics/middleware"
46 echomiddleware "github.com/slok/go-http-metrics/middleware/echo"
47 httproutermiddleware "github.com/slok/go-http-metrics/middleware/httprouter"
48 middlewarestd "github.com/slok/go-http-metrics/middleware/std"
49)
50
// StreamplaceAPI holds the dependencies shared by the HTTP handlers in this
// file. Instances are built by MakeStreamplaceAPI.
type StreamplaceAPI struct {
	CLI     *config.CLI
	Model   model.Model
	Updater *Updater
	Signer  *eip712.EIP712Signer
	// Mimes is consulted by FileHandler to choose a content-type, keyed by the
	// request path without its leading slash. Populated from Updater.GetMimes.
	Mimes            map[string]string
	FirebaseNotifier notifications.FirebaseNotifier
	MediaManager     *media.MediaManager
	MediaSigner      media.MediaSigner
	// not thread-safe yet
	Aliases  map[string]string
	Bus      *bus.Bus
	ATSync   *atproto.ATProtoSynchronizer
	Director *director.Director

	// connTracker is created with cli.RateLimitWebsocket as its per-IP limit.
	connTracker *WebsocketTracker

	// limiters holds one rate limiter per client IP; getLimiter accesses it
	// while holding limitersMu.
	limiters   map[string]*rate.Limiter
	limitersMu sync.Mutex
	// NOTE(review): SignerCacheMu presumably guards SignerCache; neither is
	// used in the visible part of this file — confirm against callers.
	SignerCache   map[string]media.MediaSigner
	SignerCacheMu sync.Mutex
	op            *oatproxy.OATProxy
}
74
// WebsocketTracker counts open connections per IP string and enforces a
// per-IP maximum. AddConnection and RemoveConnection take mu while touching
// connections.
type WebsocketTracker struct {
	// connections maps an IP to its current connection count; entries are
	// deleted by RemoveConnection once the count reaches zero.
	connections map[string]int
	// maxConnsPerIP is the count at which AddConnection starts refusing.
	maxConnsPerIP int
	mu            sync.RWMutex
}
80
81func MakeStreamplaceAPI(cli *config.CLI, mod model.Model, signer *eip712.EIP712Signer, noter notifications.FirebaseNotifier, mm *media.MediaManager, ms media.MediaSigner, bus *bus.Bus, atsync *atproto.ATProtoSynchronizer, d *director.Director, op *oatproxy.OATProxy) (*StreamplaceAPI, error) {
82 updater, err := PrepareUpdater(cli)
83 if err != nil {
84 return nil, err
85 }
86 a := &StreamplaceAPI{CLI: cli,
87 Model: mod,
88 Updater: updater,
89 Signer: signer,
90 FirebaseNotifier: noter,
91 MediaManager: mm,
92 MediaSigner: ms,
93 Aliases: map[string]string{},
94 Bus: bus,
95 ATSync: atsync,
96 Director: d,
97 connTracker: NewWebsocketTracker(cli.RateLimitWebsocket),
98 limiters: make(map[string]*rate.Limiter),
99 SignerCache: make(map[string]media.MediaSigner),
100 op: op,
101 }
102 a.Mimes, err = updater.GetMimes()
103 if err != nil {
104 return nil, err
105 }
106 return a, nil
107}
108
// AppHostingFS wraps an http.FileSystem so that every failed Open is reported
// as ErrorIndex, regardless of the underlying error.
type AppHostingFS struct {
	http.FileSystem
}

// ErrorIndex is returned by AppHostingFS.Open whenever the wrapped file system
// cannot open the requested name.
var ErrorIndex = errors.New("not found, use index.html")

// Open returns the file from the wrapped file system when it can be opened.
// Any failure, whatever its cause, is replaced by ErrorIndex.
func (fs AppHostingFS) Open(name string) (http.File, error) {
	if opened, err := fs.FileSystem.Open(name); err == nil {
		return opened, nil
	}
	return nil, ErrorIndex
}
122
123// api/playback/iame.li/webrtc?rendition=source
124// api/playback/iame.li/stream.mp4?rendition=source
125// api/playback/iame.li/stream.webm?rendition=source
126// api/playback/iame.li/hls/index.m3u8
127// api/playback/iame.li/hls/source/stream.m3u8
128// api/playback/iame.li/hls/source/000000000000.ts
129
130func (a *StreamplaceAPI) Handler(ctx context.Context) (http.Handler, error) {
131
132 mdlw := middleware.New(middleware.Config{
133 Recorder: metrics.NewRecorder(metrics.Config{}),
134 })
135 var xrpc http.Handler
136 xrpc, err := spxrpc.NewServer(ctx, a.CLI, a.Model, a.op, mdlw)
137 if err != nil {
138 return nil, err
139 }
140 router := httprouter.New()
141
142 // Create our middleware factory with the default settings.
143
144 a.op.Echo.Use(echomiddleware.Handler("", mdlw))
145
146 // r.GET("/test/:id", httproutermiddleware.Handler("/test/:id", h1, mdlw))
147
148 addHandle := func(router *httprouter.Router, method, path string, handler httprouter.Handle) {
149 router.Handle(method, path, httproutermiddleware.Handler(path, handler, mdlw))
150 }
151 addFunc := func(router *httprouter.Router, method, path string, handler http.HandlerFunc) {
152 router.Handler(method, path, middlewarestd.Handler(path, mdlw, handler))
153 }
154
155 router.Handler("GET", "/oauth/*anything", a.op.Handler())
156 router.Handler("POST", "/oauth/*anything", a.op.Handler())
157 router.Handler("GET", "/.well-known/oauth-authorization-server", a.op.Handler())
158 router.Handler("GET", "/.well-known/oauth-protected-resource", a.op.Handler())
159 router.Handler("GET", "/.well-known/apple-app-site-association", a.HandleAppleAppSiteAssociation(ctx))
160 router.Handler("GET", "/.well-known/assetlinks.json", a.HandleAndroidAssetLinks(ctx))
161 apiRouter := httprouter.New()
162 addFunc(apiRouter, "POST", "/api/notification", a.HandleNotification(ctx))
163 // old clients
164 addFunc(router, "GET", "/app-updates", a.HandleAppUpdates(ctx))
165 // new ones
166 addFunc(apiRouter, "GET", "/api/manifest", a.HandleAppUpdates(ctx))
167 addHandle(apiRouter, "GET", "/api/desktop-updates/:platform/:architecture/:version/:buildTime/:file", a.HandleDesktopUpdates(ctx))
168 addHandle(apiRouter, "POST", "/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s"))
169 addHandle(apiRouter, "OPTIONS", "/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s"))
170 addHandle(apiRouter, "DELETE", "/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s"))
171 addFunc(apiRouter, "POST", "/api/segment", a.HandleSegment(ctx))
172 addFunc(apiRouter, "GET", "/api/healthz", a.HandleHealthz(ctx))
173 addHandle(apiRouter, "GET", "/api/playback/:user/hls/*file", a.HandleHLSPlayback(ctx))
174 addHandle(apiRouter, "GET", "/api/playback/:user/stream.mp4", a.HandleMP4Playback(ctx))
175 addHandle(apiRouter, "GET", "/api/playback/:user/stream.webm", a.HandleMKVPlayback(ctx))
176 // they're jpegs now
177 addHandle(apiRouter, "GET", "/api/playback/:user/stream.jpg", a.HandleThumbnailPlayback(ctx))
178 // this one is actually a jpeg (used previously and shouldn't remove for historical reasons)
179 addHandle(apiRouter, "GET", "/api/playback/:user/stream.png", a.HandleThumbnailPlayback(ctx))
180 addHandle(apiRouter, "GET", "/api/app-return/*anything", a.HandleAppReturn(ctx))
181 addHandle(apiRouter, "POST", "/api/playback/:user/webrtc", a.HandleWebRTCPlayback(ctx))
182 addHandle(apiRouter, "POST", "/api/ingest/webrtc", a.HandleWebRTCIngest(ctx))
183 addHandle(apiRouter, "POST", "/api/ingest/webrtc/:key", a.HandleWebRTCIngest(ctx))
184 addHandle(apiRouter, "POST", "/api/player-event", a.HandlePlayerEvent(ctx))
185 addHandle(apiRouter, "GET", "/api/chat/:repoDID", a.HandleChat(ctx))
186 addHandle(apiRouter, "GET", "/api/websocket/:repoDID", a.HandleWebsocket(ctx))
187 addHandle(apiRouter, "GET", "/api/livestream/:repoDID", a.HandleLivestream(ctx))
188 addHandle(apiRouter, "GET", "/api/segment/recent", a.HandleRecentSegments(ctx))
189 addHandle(apiRouter, "GET", "/api/segment/recent/:repoDID", a.HandleUserRecentSegments(ctx))
190 addHandle(apiRouter, "GET", "/api/bluesky/resolve/:handle", a.HandleBlueskyResolve(ctx))
191 addHandle(apiRouter, "GET", "/api/view-count/:user", a.HandleViewCount(ctx))
192 apiRouter.NotFound = a.HandleAPI404(ctx)
193 apiRouterHandler := a.RateLimitMiddleware(ctx)(apiRouter)
194 xrpcHandler := a.RateLimitMiddleware(ctx)(xrpc)
195 router.Handler("GET", "/api/*resource", apiRouterHandler)
196 router.Handler("POST", "/api/*resource", apiRouterHandler)
197 router.Handler("PUT", "/api/*resource", apiRouterHandler)
198 router.Handler("PATCH", "/api/*resource", apiRouterHandler)
199 router.Handler("DELETE", "/api/*resource", apiRouterHandler)
200 router.Handler("GET", "/xrpc/*resource", xrpcHandler)
201 router.Handler("POST", "/xrpc/*resource", xrpcHandler)
202 router.Handler("PUT", "/xrpc/*resource", xrpcHandler)
203 router.Handler("PATCH", "/xrpc/*resource", xrpcHandler)
204 router.Handler("DELETE", "/xrpc/*resource", xrpcHandler)
205 router.GET("/.well-known/did.json", a.HandleDidJSON(ctx))
206 router.GET("/.well-known/atproto-did", a.HandleAtprotoDID(ctx))
207 router.GET("/dl/*params", a.HandleAppDownload(ctx))
208 router.POST("/", a.HandleWebRTCIngest(ctx))
209 for _, redirect := range a.CLI.Redirects {
210 parts := strings.Split(redirect, ":")
211 if len(parts) != 2 {
212 log.Error(ctx, "invalid redirect", "redirect", redirect)
213 return nil, fmt.Errorf("invalid redirect: %s", redirect)
214 }
215 router.Handle("GET", parts[0], func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
216 http.Redirect(w, r, parts[1], http.StatusTemporaryRedirect)
217 })
218 }
219 if a.CLI.FrontendProxy != "" {
220 u, err := url.Parse(a.CLI.FrontendProxy)
221 if err != nil {
222 return nil, err
223 }
224 log.Warn(ctx, "using frontend proxy instead of bundled frontend", "destination", a.CLI.FrontendProxy)
225 router.NotFound = &httputil.ReverseProxy{
226 Rewrite: func(r *httputil.ProxyRequest) {
227 // workaround for Expo disliking serving requests from 127.0.0.1 instead of localhost
228 // we need to use 127.0.0.1 because the atproto oauth client requires it
229 r.Out.Header.Set("Origin", u.String())
230 r.SetURL(u)
231 },
232 }
233 } else {
234 files, err := app.Files()
235 if err != nil {
236 return nil, err
237 }
238 index, err := files.Open("index.html")
239 if err != nil {
240 return nil, err
241 }
242 bs, err := io.ReadAll(index)
243 if err != nil {
244 return nil, err
245 }
246 linker, err := linking.NewLinker(ctx, bs)
247 if err != nil {
248 return nil, err
249 }
250 linkingHandler, err := a.NotFoundLinkingHandler(ctx, linker)
251 if err != nil {
252 return nil, err
253 }
254 router.NotFound = middlewarestd.Handler("/*static", mdlw, linkingHandler)
255 }
256 // needed because the WebRTC handler issues 405s from / otherwise
257 router.GET("/", func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
258 router.NotFound.ServeHTTP(w, r)
259 })
260 handler := sloghttp.Recovery(router)
261 handler = cors.AllowAll().Handler(handler)
262 handler = sloghttp.New(slog.Default())(handler)
263 handler = a.RateLimitMiddleware(ctx)(handler)
264
265 // this needs to be LAST so nothing else clobbers the context
266 handler = a.ContextMiddleware(ctx)(handler)
267
268 return handler, nil
269}
270func (a *StreamplaceAPI) ContextMiddleware(ctx context.Context) func(next http.Handler) http.Handler {
271 return func(next http.Handler) http.Handler {
272 return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
273 uuid := uuid.New().String()
274 ctx = log.WithLogValues(ctx, "requestID", uuid, "method", r.Method, "path", r.URL.Path)
275 r = r.WithContext(ctx)
276 next.ServeHTTP(w, r)
277 })
278 }
279}
// copyHeader appends every value of every header in src to dst, except for
// headers whose name starts with "Access-Control"; CORS headers are produced
// by this server itself rather than forwarded.
func copyHeader(dst, src http.Header) {
	for name, values := range src {
		// we'll handle CORS ourselves, thanks
		if !strings.HasPrefix(name, "Access-Control") {
			for i := range values {
				dst.Add(name, values[i])
			}
		}
	}
}
291
292// handler that takes care of static files and otherwise returns the index.html with the correct link card data
293func (a *StreamplaceAPI) NotFoundLinkingHandler(ctx context.Context, linker *linking.Linker) (http.HandlerFunc, error) {
294 files, err := app.Files()
295 if err != nil {
296 return nil, err
297 }
298 fs := AppHostingFS{http.FS(files)}
299
300 fileHandler := a.FileHandler(ctx, http.FileServer(fs))
301 defaultHandler := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
302 f := strings.TrimPrefix(req.URL.Path, "/")
303 // under docs we need the index.html suffix due to astro rendering
304 if strings.HasPrefix(req.URL.Path, "/docs") && strings.HasSuffix(req.URL.Path, "/") {
305 f += "index.html"
306 }
307 _, err := fs.Open(f)
308 if err == nil {
309 fileHandler.ServeHTTP(w, req)
310 return
311 }
312 if errors.Is(err, ErrorIndex) || f == "" {
313 bs, err := linker.GenerateDefaultCard(ctx, req.URL)
314 if err != nil {
315 log.Error(ctx, "error generating default card", "error", err)
316 }
317 w.Header().Set("Content-Type", "text/html")
318 if _, err := w.Write(bs); err != nil {
319 log.Error(ctx, "error writing response", "error", err)
320 }
321 } else {
322 log.Warn(ctx, "error opening file", "error", err)
323 apierrors.WriteHTTPInternalServerError(w, "file not found", err)
324 }
325 })
326 return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
327 proto := "http"
328 if req.TLS != nil {
329 proto = "https"
330 }
331 fwProto := req.Header.Get("x-forwarded-proto")
332 if fwProto != "" {
333 proto = fwProto
334 }
335 req.URL.Host = req.Host
336 req.URL.Scheme = proto
337 maybeHandle := strings.TrimPrefix(req.URL.Path, "/")
338 repo, err := a.Model.GetRepoByHandleOrDID(maybeHandle)
339 if err != nil || repo == nil {
340 log.Error(ctx, "no repo found", "maybeHandle", maybeHandle)
341 defaultHandler.ServeHTTP(w, req)
342 return
343 }
344 ls, err := a.Model.GetLatestLivestreamForRepo(repo.DID)
345 if err != nil || ls == nil {
346 log.Error(ctx, "no livestream found", "repoDID", repo.DID)
347 defaultHandler.ServeHTTP(w, req)
348 return
349 }
350 lsv, err := ls.ToLivestreamView()
351 if err != nil || lsv == nil {
352 log.Error(ctx, "no livestream view found", "repoDID", repo.DID)
353 defaultHandler.ServeHTTP(w, req)
354 return
355 }
356 bs, err := linker.GenerateStreamerCard(ctx, req.URL, lsv)
357 if err != nil {
358 log.Error(ctx, "error generating html", "error", err)
359 defaultHandler.ServeHTTP(w, req)
360 return
361 }
362 w.Header().Set("Content-Type", "text/html")
363 if _, err := w.Write(bs); err != nil {
364 log.Error(ctx, "error writing response", "error", err)
365 }
366 }), nil
367}
368
369func (a *StreamplaceAPI) MistProxyHandler(ctx context.Context, tmpl string) httprouter.Handle {
370 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
371 if !a.CLI.HasMist() {
372 apierrors.WriteHTTPNotImplemented(w, "Playback only on the Linux version for now", nil)
373 return
374 }
375 stream := params.ByName("stream")
376 if stream == "" {
377 apierrors.WriteHTTPBadRequest(w, "missing stream in request", nil)
378 return
379 }
380
381 fullstream := fmt.Sprintf("%s+%s", mistconfig.StreamName, stream)
382 prefix := fmt.Sprintf(tmpl, fullstream)
383 resource := params.ByName("resource")
384
385 // path := strings.TrimPrefix(req.URL.EscapedPath(), "/api")
386
387 client := &http.Client{}
388 req.URL = &url.URL{
389 Scheme: "http",
390 Host: fmt.Sprintf("127.0.0.1:%d", a.CLI.MistHTTPPort),
391 Path: fmt.Sprintf("%s%s", prefix, resource),
392 RawQuery: req.URL.RawQuery,
393 }
394
395 //http: Request.RequestURI can't be set in client requests.
396 //http://golang.org/src/pkg/net/http/client.go
397 req.RequestURI = ""
398
399 resp, err := client.Do(req)
400 if err != nil {
401 apierrors.WriteHTTPInternalServerError(w, "error connecting to mist", err)
402 return
403 }
404 defer resp.Body.Close()
405
406 copyHeader(w.Header(), resp.Header)
407 w.WriteHeader(resp.StatusCode)
408 if _, err := io.Copy(w, resp.Body); err != nil {
409 log.Error(ctx, "error writing response", "error", err)
410 }
411 }
412}
413
414func (a *StreamplaceAPI) FileHandler(ctx context.Context, fs http.Handler) http.HandlerFunc {
415 return func(w http.ResponseWriter, req *http.Request) {
416 noslash := req.URL.Path[1:]
417 ct, ok := a.Mimes[noslash]
418 if ok {
419 w.Header().Set("content-type", ct)
420 }
421 fs.ServeHTTP(w, req)
422 }
423}
424
425func (a *StreamplaceAPI) RedirectHandler(ctx context.Context) (http.Handler, error) {
426 _, tlsPort, err := net.SplitHostPort(a.CLI.HTTPSAddr)
427 if err != nil {
428 return nil, err
429 }
430 handleRedirect := func(w http.ResponseWriter, req *http.Request) {
431 host, _, err := net.SplitHostPort(req.Host)
432 if err != nil {
433 host = req.Host
434 }
435 u := req.URL
436 if tlsPort == "443" {
437 u.Host = host
438 } else {
439 u.Host = net.JoinHostPort(host, tlsPort)
440 }
441 u.Scheme = "https"
442 http.Redirect(w, req, u.String(), http.StatusTemporaryRedirect)
443 }
444 mux := http.NewServeMux()
445 mux.HandleFunc("/", handleRedirect)
446 return mux, nil
447}
448
// NotificationPayload is the JSON request body decoded by HandleNotification.
// Both fields are passed to Model.CreateNotification.
type NotificationPayload struct {
	Token string `json:"token"`
	// RepoDID may be empty; HandleNotification only triggers a repo sync when
	// it is set.
	RepoDID string `json:"repoDID"`
}
453
454func (a *StreamplaceAPI) HandleAPI404(ctx context.Context) http.HandlerFunc {
455 return func(w http.ResponseWriter, req *http.Request) {
456 w.WriteHeader(404)
457 }
458}
459
460func (a *StreamplaceAPI) HandleNotification(ctx context.Context) http.HandlerFunc {
461 return func(w http.ResponseWriter, req *http.Request) {
462 payload, err := io.ReadAll(req.Body)
463 if err != nil {
464 log.Log(ctx, "error reading notification create", "error", err)
465 w.WriteHeader(400)
466 return
467 }
468 n := NotificationPayload{}
469 err = json.Unmarshal(payload, &n)
470 if err != nil {
471 log.Log(ctx, "error unmarshalling notification create", "error", err)
472 w.WriteHeader(400)
473 return
474 }
475 err = a.Model.CreateNotification(n.Token, n.RepoDID)
476 if err != nil {
477 log.Log(ctx, "error creating notification", "error", err)
478 w.WriteHeader(400)
479 return
480 }
481 log.Log(ctx, "successfully created notification", "token", n.Token)
482 w.WriteHeader(200)
483 if n.RepoDID != "" {
484 go func() {
485 _, err := a.ATSync.SyncBlueskyRepo(ctx, n.RepoDID, a.Model)
486 if err != nil {
487 log.Error(ctx, "error syncing bluesky repo after notification creation", "error", err)
488 }
489 }()
490 }
491 }
492}
493
494func (a *StreamplaceAPI) HandleSegment(ctx context.Context) http.HandlerFunc {
495 return func(w http.ResponseWriter, req *http.Request) {
496 err := a.MediaManager.ValidateMP4(ctx, req.Body)
497 if err != nil {
498 apierrors.WriteHTTPBadRequest(w, "could not ingest segment", err)
499 return
500 }
501 w.WriteHeader(200)
502 }
503}
504
505func (a *StreamplaceAPI) HandlePlayerEvent(ctx context.Context) httprouter.Handle {
506 return func(w http.ResponseWriter, req *http.Request, p httprouter.Params) {
507 var event model.PlayerEventAPI
508 if err := json.NewDecoder(req.Body).Decode(&event); err != nil {
509 apierrors.WriteHTTPBadRequest(w, "could not decode JSON body", err)
510 return
511 }
512 err := a.Model.CreatePlayerEvent(event)
513 if err != nil {
514 apierrors.WriteHTTPBadRequest(w, "could not create player event", err)
515 return
516 }
517 w.WriteHeader(201)
518 }
519}
520
521func (a *StreamplaceAPI) HandleRecentSegments(ctx context.Context) httprouter.Handle {
522 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
523 segs, err := a.Model.MostRecentSegments()
524 if err != nil {
525 apierrors.WriteHTTPInternalServerError(w, "could not get segments", err)
526 return
527 }
528 bs, err := json.Marshal(segs)
529 if err != nil {
530 apierrors.WriteHTTPInternalServerError(w, "could not marshal segments", err)
531 return
532 }
533 w.Header().Add("Content-Type", "application/json")
534 if _, err := w.Write(bs); err != nil {
535 log.Error(ctx, "error writing response", "error", err)
536 }
537 }
538}
539
540func (a *StreamplaceAPI) HandleUserRecentSegments(ctx context.Context) httprouter.Handle {
541 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
542 user := params.ByName("repoDID")
543 if user == "" {
544 apierrors.WriteHTTPBadRequest(w, "user required", nil)
545 return
546 }
547 user, err := a.NormalizeUser(ctx, user)
548 if err != nil {
549 apierrors.WriteHTTPNotFound(w, "user not found", err)
550 return
551 }
552 seg, err := a.Model.LatestSegmentForUser(user)
553 if err != nil {
554 apierrors.WriteHTTPInternalServerError(w, "could not get segments", err)
555 return
556 }
557 streamplaceSeg, err := seg.ToStreamplaceSegment()
558 if err != nil {
559 apierrors.WriteHTTPInternalServerError(w, "could not convert segment to streamplace segment", err)
560 return
561 }
562 bs, err := json.Marshal(streamplaceSeg)
563 if err != nil {
564 apierrors.WriteHTTPInternalServerError(w, "could not marshal segments", err)
565 return
566 }
567 w.Header().Add("Content-Type", "application/json")
568 if _, err := w.Write(bs); err != nil {
569 log.Error(ctx, "error writing response", "error", err)
570 }
571 }
572}
573
574func (a *StreamplaceAPI) HandleViewCount(ctx context.Context) httprouter.Handle {
575 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
576 user := params.ByName("user")
577 if user == "" {
578 apierrors.WriteHTTPBadRequest(w, "user required", nil)
579 return
580 }
581 user, err := a.NormalizeUser(ctx, user)
582 if err != nil {
583 apierrors.WriteHTTPNotFound(w, "user not found", err)
584 return
585 }
586 count := spmetrics.GetViewCount(user)
587 bs, err := json.Marshal(streamplace.Livestream_ViewerCount{Count: int64(count), LexiconTypeID: "place.stream.livestream#viewerCount"})
588 if err != nil {
589 apierrors.WriteHTTPInternalServerError(w, "could not marshal view count", err)
590 return
591 }
592 if _, err := w.Write(bs); err != nil {
593 log.Error(ctx, "error writing response", "error", err)
594 }
595 }
596}
597
598func (a *StreamplaceAPI) HandleBlueskyResolve(ctx context.Context) httprouter.Handle {
599 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
600 log.Log(ctx, "got bluesky notification", "params", params)
601 key, err := a.ATSync.SyncBlueskyRepo(ctx, params.ByName("handle"), a.Model)
602 if err != nil {
603 apierrors.WriteHTTPInternalServerError(w, "could not resolve streamplace key", err)
604 return
605 }
606 signingKeys, err := a.Model.GetSigningKeysForRepo(key.DID)
607 if err != nil {
608 apierrors.WriteHTTPInternalServerError(w, "could not get signing keys", err)
609 return
610 }
611 bs, err := json.Marshal(signingKeys)
612 if err != nil {
613 apierrors.WriteHTTPInternalServerError(w, "could not marshal signing keys", err)
614 return
615 }
616 if _, err := w.Write(bs); err != nil {
617 log.Error(ctx, "error writing response", "error", err)
618 }
619 }
620}
621
// ChatResponse pairs a Bluesky feed post with a repo and a CID string for JSON
// serialization.
// NOTE(review): not referenced in the visible part of this file; presumably
// Repo is the post's author and CID identifies the post — confirm at the use
// site.
type ChatResponse struct {
	Post *bsky.FeedPost `json:"post"`
	Repo *model.Repo    `json:"repo"`
	CID  string         `json:"cid"`
}
627
628func (a *StreamplaceAPI) HandleChat(ctx context.Context) httprouter.Handle {
629 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
630 user := params.ByName("repoDID")
631 if user == "" {
632 apierrors.WriteHTTPBadRequest(w, "user required", nil)
633 return
634 }
635 repoDID, err := a.NormalizeUser(ctx, user)
636 if err != nil {
637 apierrors.WriteHTTPNotFound(w, "user not found", err)
638 return
639 }
640 replies, err := a.Model.GetReplies(repoDID)
641 if err != nil {
642 apierrors.WriteHTTPInternalServerError(w, "could not get replies", err)
643 return
644 }
645 bs, err := json.Marshal(replies)
646 if err != nil {
647 apierrors.WriteHTTPInternalServerError(w, "could not marshal replies", err)
648 return
649 }
650 w.Header().Set("Content-Type", "application/json")
651 if _, err := w.Write(bs); err != nil {
652 log.Error(ctx, "error writing response", "error", err)
653 }
654 }
655}
656
657func (a *StreamplaceAPI) HandleLivestream(ctx context.Context) httprouter.Handle {
658 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
659 user := params.ByName("repoDID")
660 if user == "" {
661 apierrors.WriteHTTPBadRequest(w, "user required", nil)
662 return
663 }
664 repoDID, err := a.NormalizeUser(ctx, user)
665 if err != nil {
666 apierrors.WriteHTTPNotFound(w, "user not found", err)
667 return
668 }
669 livestream, err := a.Model.GetLatestLivestreamForRepo(repoDID)
670 if err != nil {
671 apierrors.WriteHTTPInternalServerError(w, "could not get livestream", err)
672 return
673 }
674
675 doc, err := livestream.ToLivestreamView()
676 if err != nil {
677 apierrors.WriteHTTPInternalServerError(w, "could not marshal livestream", err)
678 return
679 }
680
681 if livestream == nil {
682 apierrors.WriteHTTPNotFound(w, "no livestream found", nil)
683 return
684 }
685
686 bs, err := json.Marshal(doc)
687 if err != nil {
688 apierrors.WriteHTTPInternalServerError(w, "could not marshal livestream", err)
689 return
690 }
691 w.Header().Set("Content-Type", "application/json")
692 if _, err := w.Write(bs); err != nil {
693 log.Error(ctx, "error writing response", "error", err)
694 }
695 }
696}
697
698func (a *StreamplaceAPI) RateLimitMiddleware(ctx context.Context) func(http.Handler) http.Handler {
699 return func(next http.Handler) http.Handler {
700 return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
701 ip, _, err := net.SplitHostPort(req.RemoteAddr)
702 if err != nil {
703 ip = req.RemoteAddr
704 }
705
706 if a.CLI.RateLimitPerSecond > 0 {
707 limiter := a.getLimiter(ip)
708
709 if !limiter.Allow() {
710 log.Warn(ctx, "rate limit exceeded", "ip", ip, "path", req.URL.Path)
711 apierrors.WriteHTTPTooManyRequests(w, "rate limit exceeded")
712 return
713 }
714 }
715
716 next.ServeHTTP(w, req)
717 })
718 }
719}
720
721func (a *StreamplaceAPI) ServeHTTP(ctx context.Context) error {
722 handler, err := a.Handler(ctx)
723 if err != nil {
724 return err
725 }
726 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error {
727 s.Addr = a.CLI.HTTPAddr
728 log.Log(ctx, "http server starting", "addr", s.Addr)
729 return s.ListenAndServe()
730 })
731}
732
733func (a *StreamplaceAPI) ServeHTTPRedirect(ctx context.Context) error {
734 handler, err := a.RedirectHandler(ctx)
735 if err != nil {
736 return err
737 }
738 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error {
739 s.Addr = a.CLI.HTTPAddr
740 log.Log(ctx, "http tls redirecct server starting", "addr", s.Addr)
741 return s.ListenAndServe()
742 })
743}
744
745func (a *StreamplaceAPI) ServeHTTPS(ctx context.Context) error {
746 handler, err := a.Handler(ctx)
747 if err != nil {
748 return err
749 }
750 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error {
751 s.Addr = a.CLI.HTTPSAddr
752 log.Log(ctx, "https server starting",
753 "addr", s.Addr,
754 "certPath", a.CLI.TLSCertPath,
755 "keyPath", a.CLI.TLSKeyPath,
756 )
757 return s.ListenAndServeTLS(a.CLI.TLSCertPath, a.CLI.TLSKeyPath)
758 })
759}
760
761func (a *StreamplaceAPI) ServerWithShutdown(ctx context.Context, handler http.Handler, serve func(*http.Server) error) error {
762 ctx, cancel := context.WithCancel(ctx)
763 handler = gziphandler.GzipHandler(handler)
764 server := http.Server{Handler: handler}
765 var serveErr error
766 go func() {
767 serveErr = serve(&server)
768 cancel()
769 }()
770 <-ctx.Done()
771 if serveErr != nil {
772 return fmt.Errorf("error in http server: %w", serveErr)
773 }
774
775 ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
776 defer cancel()
777 return server.Shutdown(ctx)
778}
779
780func (a *StreamplaceAPI) HandleHealthz(ctx context.Context) http.HandlerFunc {
781 return func(w http.ResponseWriter, req *http.Request) {
782 w.WriteHeader(200)
783 }
784}
785
786func (a *StreamplaceAPI) getLimiter(ip string) *rate.Limiter {
787 a.limitersMu.Lock()
788 defer a.limitersMu.Unlock()
789
790 limiter, exists := a.limiters[ip]
791 if !exists {
792 limiter = rate.NewLimiter(rate.Limit(a.CLI.RateLimitPerSecond), a.CLI.RateLimitBurst)
793 a.limiters[ip] = limiter
794 }
795
796 return limiter
797}
798
799func NewWebsocketTracker(maxConns int) *WebsocketTracker {
800 return &WebsocketTracker{
801 connections: make(map[string]int),
802 maxConnsPerIP: maxConns,
803 }
804}
805
806func (t *WebsocketTracker) AddConnection(ip string) bool {
807 t.mu.Lock()
808 defer t.mu.Unlock()
809
810 count := t.connections[ip]
811
812 if count >= t.maxConnsPerIP {
813 return false
814 }
815
816 t.connections[ip] = count + 1
817 return true
818}
819
820func (t *WebsocketTracker) RemoveConnection(ip string) {
821 t.mu.Lock()
822 defer t.mu.Unlock()
823
824 count := t.connections[ip]
825 if count > 0 {
826 t.connections[ip] = count - 1
827 }
828
829 if t.connections[ip] == 0 {
830 delete(t.connections, ip)
831 }
832}