Live video on the AT Protocol
1package api
2
3import (
4 "context"
5 "encoding/json"
6 "errors"
7 "fmt"
8 "io"
9 "log/slog"
10 "net"
11 "net/http"
12 "net/http/httputil"
13 "net/url"
14 "os"
15 "strings"
16 "sync"
17 "time"
18
19 "github.com/NYTimes/gziphandler"
20 "github.com/bluesky-social/indigo/api/bsky"
21 "github.com/google/uuid"
22 "github.com/julienschmidt/httprouter"
23 "github.com/rs/cors"
24 sloghttp "github.com/samber/slog-http"
25 "golang.org/x/time/rate"
26
27 "github.com/streamplace/oatproxy/pkg/oatproxy"
28 "stream.place/streamplace/js/app"
29 "stream.place/streamplace/pkg/atproto"
30 "stream.place/streamplace/pkg/bus"
31 "stream.place/streamplace/pkg/config"
32 "stream.place/streamplace/pkg/crypto/signers/eip712"
33 "stream.place/streamplace/pkg/director"
34 apierrors "stream.place/streamplace/pkg/errors"
35 "stream.place/streamplace/pkg/linking"
36 "stream.place/streamplace/pkg/log"
37 "stream.place/streamplace/pkg/media"
38 "stream.place/streamplace/pkg/mist/mistconfig"
39 "stream.place/streamplace/pkg/model"
40 "stream.place/streamplace/pkg/notifications"
41 "stream.place/streamplace/pkg/spmetrics"
42 "stream.place/streamplace/pkg/spxrpc"
43 "stream.place/streamplace/pkg/streamplace"
44
45 metrics "github.com/slok/go-http-metrics/metrics/prometheus"
46 "github.com/slok/go-http-metrics/middleware"
47 echomiddleware "github.com/slok/go-http-metrics/middleware/echo"
48 httproutermiddleware "github.com/slok/go-http-metrics/middleware/httprouter"
49 middlewarestd "github.com/slok/go-http-metrics/middleware/std"
50)
51
52type StreamplaceAPI struct {
53 CLI *config.CLI
54 Model model.Model
55 Updater *Updater
56 Signer *eip712.EIP712Signer
57 Mimes map[string]string
58 FirebaseNotifier notifications.FirebaseNotifier
59 MediaManager *media.MediaManager
60 MediaSigner media.MediaSigner
61 // not thread-safe yet
62 Aliases map[string]string
63 Bus *bus.Bus
64 ATSync *atproto.ATProtoSynchronizer
65 Director *director.Director
66
67 connTracker *WebsocketTracker
68
69 limiters map[string]*rate.Limiter
70 limitersMu sync.Mutex
71 SignerCache map[string]media.MediaSigner
72 SignerCacheMu sync.Mutex
73 op *oatproxy.OATProxy
74}
75
// WebsocketTracker counts open connections per IP address and stores the
// per-IP ceiling that AddConnection enforces.
type WebsocketTracker struct {
	// connections maps an IP to its current connection count.
	connections map[string]int
	// maxConnsPerIP is the count at which AddConnection starts refusing.
	maxConnsPerIP int
	// mu is held by AddConnection and RemoveConnection around connections.
	mu sync.RWMutex
}
81
82func MakeStreamplaceAPI(cli *config.CLI, mod model.Model, signer *eip712.EIP712Signer, noter notifications.FirebaseNotifier, mm *media.MediaManager, ms media.MediaSigner, bus *bus.Bus, atsync *atproto.ATProtoSynchronizer, d *director.Director, op *oatproxy.OATProxy) (*StreamplaceAPI, error) {
83 updater, err := PrepareUpdater(cli)
84 if err != nil {
85 return nil, err
86 }
87 a := &StreamplaceAPI{CLI: cli,
88 Model: mod,
89 Updater: updater,
90 Signer: signer,
91 FirebaseNotifier: noter,
92 MediaManager: mm,
93 MediaSigner: ms,
94 Aliases: map[string]string{},
95 Bus: bus,
96 ATSync: atsync,
97 Director: d,
98 connTracker: NewWebsocketTracker(cli.RateLimitWebsocket),
99 limiters: make(map[string]*rate.Limiter),
100 SignerCache: make(map[string]media.MediaSigner),
101 op: op,
102 }
103 a.Mimes, err = updater.GetMimes()
104 if err != nil {
105 return nil, err
106 }
107 return a, nil
108}
109
110type AppHostingFS struct {
111 http.FileSystem
112}
113
114var ErrorIndex = errors.New("not found, use index.html")
115
116func (fs AppHostingFS) Open(name string) (http.File, error) {
117 file, err1 := fs.FileSystem.Open(name)
118 if err1 == nil {
119 return file, nil
120 }
121 if !errors.Is(err1, os.ErrNotExist) {
122 return nil, err1
123 }
124
125 return nil, ErrorIndex
126}
127
128// api/playback/iame.li/webrtc?rendition=source
129// api/playback/iame.li/stream.mp4?rendition=source
130// api/playback/iame.li/stream.webm?rendition=source
131// api/playback/iame.li/hls/index.m3u8
132// api/playback/iame.li/hls/source/stream.m3u8
133// api/playback/iame.li/hls/source/000000000000.ts
134
135func (a *StreamplaceAPI) Handler(ctx context.Context) (http.Handler, error) {
136
137 mdlw := middleware.New(middleware.Config{
138 Recorder: metrics.NewRecorder(metrics.Config{}),
139 })
140 var xrpc http.Handler
141 xrpc, err := spxrpc.NewServer(ctx, a.CLI, a.Model, a.op, mdlw)
142 if err != nil {
143 return nil, err
144 }
145 router := httprouter.New()
146
147 // Create our middleware factory with the default settings.
148
149 a.op.Echo.Use(echomiddleware.Handler("", mdlw))
150
151 // r.GET("/test/:id", httproutermiddleware.Handler("/test/:id", h1, mdlw))
152
153 addHandle := func(router *httprouter.Router, method, path string, handler httprouter.Handle) {
154 router.Handle(method, path, httproutermiddleware.Handler(path, handler, mdlw))
155 }
156 addFunc := func(router *httprouter.Router, method, path string, handler http.HandlerFunc) {
157 router.Handler(method, path, middlewarestd.Handler(path, mdlw, handler))
158 }
159
160 router.Handler("GET", "/oauth/*anything", a.op.Handler())
161 router.Handler("POST", "/oauth/*anything", a.op.Handler())
162 router.Handler("GET", "/.well-known/oauth-authorization-server", a.op.Handler())
163 router.Handler("GET", "/.well-known/oauth-protected-resource", a.op.Handler())
164 router.Handler("GET", "/.well-known/apple-app-site-association", a.HandleAppleAppSiteAssociation(ctx))
165 router.Handler("GET", "/.well-known/assetlinks.json", a.HandleAndroidAssetLinks(ctx))
166 apiRouter := httprouter.New()
167 addFunc(apiRouter, "POST", "/api/notification", a.HandleNotification(ctx))
168 // old clients
169 addFunc(router, "GET", "/app-updates", a.HandleAppUpdates(ctx))
170 // new ones
171 addFunc(apiRouter, "GET", "/api/manifest", a.HandleAppUpdates(ctx))
172 addHandle(apiRouter, "GET", "/api/desktop-updates/:platform/:architecture/:version/:buildTime/:file", a.HandleDesktopUpdates(ctx))
173 addHandle(apiRouter, "POST", "/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s"))
174 addHandle(apiRouter, "OPTIONS", "/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s"))
175 addHandle(apiRouter, "DELETE", "/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s"))
176 addFunc(apiRouter, "POST", "/api/segment", a.HandleSegment(ctx))
177 addFunc(apiRouter, "GET", "/api/healthz", a.HandleHealthz(ctx))
178 addHandle(apiRouter, "GET", "/api/playback/:user/hls/*file", a.HandleHLSPlayback(ctx))
179 addHandle(apiRouter, "GET", "/api/playback/:user/stream.mp4", a.HandleMP4Playback(ctx))
180 addHandle(apiRouter, "GET", "/api/playback/:user/stream.webm", a.HandleMKVPlayback(ctx))
181 // they're jpegs now
182 addHandle(apiRouter, "GET", "/api/playback/:user/stream.jpg", a.HandleThumbnailPlayback(ctx))
183 // this one is actually a jpeg (used previously and shouldn't remove for historical reasons)
184 addHandle(apiRouter, "GET", "/api/playback/:user/stream.png", a.HandleThumbnailPlayback(ctx))
185 addHandle(apiRouter, "GET", "/api/app-return/*anything", a.HandleAppReturn(ctx))
186 addHandle(apiRouter, "POST", "/api/playback/:user/webrtc", a.HandleWebRTCPlayback(ctx))
187 addHandle(apiRouter, "POST", "/api/ingest/webrtc", a.HandleWebRTCIngest(ctx))
188 addHandle(apiRouter, "POST", "/api/ingest/webrtc/:key", a.HandleWebRTCIngest(ctx))
189 addHandle(apiRouter, "POST", "/api/player-event", a.HandlePlayerEvent(ctx))
190 addHandle(apiRouter, "GET", "/api/chat/:repoDID", a.HandleChat(ctx))
191 addHandle(apiRouter, "GET", "/api/websocket/:repoDID", a.HandleWebsocket(ctx))
192 addHandle(apiRouter, "GET", "/api/livestream/:repoDID", a.HandleLivestream(ctx))
193 addHandle(apiRouter, "GET", "/api/segment/recent", a.HandleRecentSegments(ctx))
194 addHandle(apiRouter, "GET", "/api/segment/recent/:repoDID", a.HandleUserRecentSegments(ctx))
195 addHandle(apiRouter, "GET", "/api/bluesky/resolve/:handle", a.HandleBlueskyResolve(ctx))
196 addHandle(apiRouter, "GET", "/api/view-count/:user", a.HandleViewCount(ctx))
197 apiRouter.NotFound = a.HandleAPI404(ctx)
198 apiRouterHandler := a.RateLimitMiddleware(ctx)(apiRouter)
199 xrpcHandler := a.RateLimitMiddleware(ctx)(xrpc)
200 router.Handler("GET", "/api/*resource", apiRouterHandler)
201 router.Handler("POST", "/api/*resource", apiRouterHandler)
202 router.Handler("PUT", "/api/*resource", apiRouterHandler)
203 router.Handler("PATCH", "/api/*resource", apiRouterHandler)
204 router.Handler("DELETE", "/api/*resource", apiRouterHandler)
205 router.Handler("GET", "/xrpc/*resource", xrpcHandler)
206 router.Handler("POST", "/xrpc/*resource", xrpcHandler)
207 router.Handler("PUT", "/xrpc/*resource", xrpcHandler)
208 router.Handler("PATCH", "/xrpc/*resource", xrpcHandler)
209 router.Handler("DELETE", "/xrpc/*resource", xrpcHandler)
210 router.GET("/.well-known/did.json", a.HandleDidJSON(ctx))
211 router.GET("/.well-known/atproto-did", a.HandleAtprotoDID(ctx))
212 router.GET("/dl/*params", a.HandleAppDownload(ctx))
213 router.POST("/", a.HandleWebRTCIngest(ctx))
214 for _, redirect := range a.CLI.Redirects {
215 parts := strings.Split(redirect, ":")
216 if len(parts) != 2 {
217 log.Error(ctx, "invalid redirect", "redirect", redirect)
218 return nil, fmt.Errorf("invalid redirect: %s", redirect)
219 }
220 router.Handle("GET", parts[0], func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
221 http.Redirect(w, r, parts[1], http.StatusTemporaryRedirect)
222 })
223 }
224 if a.CLI.FrontendProxy != "" {
225 u, err := url.Parse(a.CLI.FrontendProxy)
226 if err != nil {
227 return nil, err
228 }
229 log.Warn(ctx, "using frontend proxy instead of bundled frontend", "destination", a.CLI.FrontendProxy)
230 router.NotFound = &httputil.ReverseProxy{
231 Rewrite: func(r *httputil.ProxyRequest) {
232 // workaround for Expo disliking serving requests from 127.0.0.1 instead of localhost
233 // we need to use 127.0.0.1 because the atproto oauth client requires it
234 r.Out.Header.Set("Origin", u.String())
235 r.SetURL(u)
236 },
237 }
238 } else {
239 files, err := app.Files()
240 if err != nil {
241 return nil, err
242 }
243 index, err := files.Open("index.html")
244 if err != nil {
245 return nil, err
246 }
247 bs, err := io.ReadAll(index)
248 if err != nil {
249 return nil, err
250 }
251 linker, err := linking.NewLinker(ctx, bs)
252 if err != nil {
253 return nil, err
254 }
255 linkingHandler, err := a.NotFoundLinkingHandler(ctx, linker)
256 if err != nil {
257 return nil, err
258 }
259 router.NotFound = middlewarestd.Handler("/*static", mdlw, linkingHandler)
260 }
261 // needed because the WebRTC handler issues 405s from / otherwise
262 router.GET("/", func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
263 router.NotFound.ServeHTTP(w, r)
264 })
265 handler := sloghttp.Recovery(router)
266 handler = cors.AllowAll().Handler(handler)
267 handler = sloghttp.New(slog.Default())(handler)
268 handler = a.RateLimitMiddleware(ctx)(handler)
269
270 // this needs to be LAST so nothing else clobbers the context
271 handler = a.ContextMiddleware(ctx)(handler)
272
273 return handler, nil
274}
275func (a *StreamplaceAPI) ContextMiddleware(ctx context.Context) func(next http.Handler) http.Handler {
276 return func(next http.Handler) http.Handler {
277 return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
278 uuid := uuid.New().String()
279 ctx = log.WithLogValues(ctx, "requestID", uuid, "method", r.Method, "path", r.URL.Path)
280 r = r.WithContext(ctx)
281 next.ServeHTTP(w, r)
282 })
283 }
284}
// copyHeader appends every value in src to dst under the same key, skipping
// any key that begins with "Access-Control" because CORS headers are set by
// this server itself. Values already present in dst are kept.
func copyHeader(dst, src http.Header) {
	for name, values := range src {
		// we'll handle CORS ourselves, thanks
		if !strings.HasPrefix(name, "Access-Control") {
			for i := range values {
				dst.Add(name, values[i])
			}
		}
	}
}
296
297// handler that takes care of static files and otherwise returns the index.html with the correct link card data
298func (a *StreamplaceAPI) NotFoundLinkingHandler(ctx context.Context, linker *linking.Linker) (http.HandlerFunc, error) {
299 files, err := app.Files()
300 if err != nil {
301 return nil, err
302 }
303 fs := AppHostingFS{http.FS(files)}
304
305 fileHandler := a.FileHandler(ctx, http.FileServer(fs))
306 defaultHandler := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
307 f := strings.TrimPrefix(req.URL.Path, "/")
308 // under docs we need the index.html suffix due to astro rendering
309 if strings.HasPrefix(req.URL.Path, "/docs") && strings.HasSuffix(req.URL.Path, "/") {
310 f += "index.html"
311 }
312 _, err := fs.Open(f)
313 if err == nil {
314 fileHandler.ServeHTTP(w, req)
315 return
316 }
317 if errors.Is(err, ErrorIndex) || f == "" {
318 bs, err := linker.GenerateDefaultCard(ctx, req.URL)
319 if err != nil {
320 log.Error(ctx, "error generating default card", "error", err)
321 }
322 w.Header().Set("Content-Type", "text/html")
323 if _, err := w.Write(bs); err != nil {
324 log.Error(ctx, "error writing response", "error", err)
325 }
326 } else {
327 log.Warn(ctx, "error opening file", "error", err)
328 apierrors.WriteHTTPInternalServerError(w, "file not found", err)
329 }
330 })
331 return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
332 proto := "http"
333 if req.TLS != nil {
334 proto = "https"
335 }
336 fwProto := req.Header.Get("x-forwarded-proto")
337 if fwProto != "" {
338 proto = fwProto
339 }
340 req.URL.Host = req.Host
341 req.URL.Scheme = proto
342 maybeHandle := strings.TrimPrefix(req.URL.Path, "/")
343 repo, err := a.Model.GetRepoByHandleOrDID(maybeHandle)
344 if err != nil || repo == nil {
345 log.Error(ctx, "no repo found", "maybeHandle", maybeHandle)
346 defaultHandler.ServeHTTP(w, req)
347 return
348 }
349 ls, err := a.Model.GetLatestLivestreamForRepo(repo.DID)
350 if err != nil || ls == nil {
351 log.Error(ctx, "no livestream found", "repoDID", repo.DID)
352 defaultHandler.ServeHTTP(w, req)
353 return
354 }
355 lsv, err := ls.ToLivestreamView()
356 if err != nil || lsv == nil {
357 log.Error(ctx, "no livestream view found", "repoDID", repo.DID)
358 defaultHandler.ServeHTTP(w, req)
359 return
360 }
361 bs, err := linker.GenerateStreamerCard(ctx, req.URL, lsv)
362 if err != nil {
363 log.Error(ctx, "error generating html", "error", err)
364 defaultHandler.ServeHTTP(w, req)
365 return
366 }
367 w.Header().Set("Content-Type", "text/html")
368 if _, err := w.Write(bs); err != nil {
369 log.Error(ctx, "error writing response", "error", err)
370 }
371 }), nil
372}
373
374func (a *StreamplaceAPI) MistProxyHandler(ctx context.Context, tmpl string) httprouter.Handle {
375 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
376 if !a.CLI.HasMist() {
377 apierrors.WriteHTTPNotImplemented(w, "Playback only on the Linux version for now", nil)
378 return
379 }
380 stream := params.ByName("stream")
381 if stream == "" {
382 apierrors.WriteHTTPBadRequest(w, "missing stream in request", nil)
383 return
384 }
385
386 fullstream := fmt.Sprintf("%s+%s", mistconfig.StreamName, stream)
387 prefix := fmt.Sprintf(tmpl, fullstream)
388 resource := params.ByName("resource")
389
390 // path := strings.TrimPrefix(req.URL.EscapedPath(), "/api")
391
392 client := &http.Client{}
393 req.URL = &url.URL{
394 Scheme: "http",
395 Host: fmt.Sprintf("127.0.0.1:%d", a.CLI.MistHTTPPort),
396 Path: fmt.Sprintf("%s%s", prefix, resource),
397 RawQuery: req.URL.RawQuery,
398 }
399
400 //http: Request.RequestURI can't be set in client requests.
401 //http://golang.org/src/pkg/net/http/client.go
402 req.RequestURI = ""
403
404 resp, err := client.Do(req)
405 if err != nil {
406 apierrors.WriteHTTPInternalServerError(w, "error connecting to mist", err)
407 return
408 }
409 defer resp.Body.Close()
410
411 copyHeader(w.Header(), resp.Header)
412 w.WriteHeader(resp.StatusCode)
413 if _, err := io.Copy(w, resp.Body); err != nil {
414 log.Error(ctx, "error writing response", "error", err)
415 }
416 }
417}
418
419func (a *StreamplaceAPI) FileHandler(ctx context.Context, fs http.Handler) http.HandlerFunc {
420 return func(w http.ResponseWriter, req *http.Request) {
421 noslash := req.URL.Path[1:]
422 ct, ok := a.Mimes[noslash]
423 if ok {
424 w.Header().Set("content-type", ct)
425 }
426 fs.ServeHTTP(w, req)
427 }
428}
429
430func (a *StreamplaceAPI) RedirectHandler(ctx context.Context) (http.Handler, error) {
431 _, tlsPort, err := net.SplitHostPort(a.CLI.HTTPSAddr)
432 if err != nil {
433 return nil, err
434 }
435 handleRedirect := func(w http.ResponseWriter, req *http.Request) {
436 host, _, err := net.SplitHostPort(req.Host)
437 if err != nil {
438 host = req.Host
439 }
440 u := req.URL
441 if tlsPort == "443" {
442 u.Host = host
443 } else {
444 u.Host = net.JoinHostPort(host, tlsPort)
445 }
446 u.Scheme = "https"
447 http.Redirect(w, req, u.String(), http.StatusTemporaryRedirect)
448 }
449 mux := http.NewServeMux()
450 mux.HandleFunc("/", handleRedirect)
451 return mux, nil
452}
453
// NotificationPayload is the JSON request body decoded by HandleNotification.
type NotificationPayload struct {
	// Token is passed to Model.CreateNotification and logged on success.
	Token string `json:"token"`
	// RepoDID, when non-empty, triggers a background repo sync after the
	// notification has been stored.
	RepoDID string `json:"repoDID"`
}
458
459func (a *StreamplaceAPI) HandleAPI404(ctx context.Context) http.HandlerFunc {
460 return func(w http.ResponseWriter, req *http.Request) {
461 w.WriteHeader(404)
462 }
463}
464
465func (a *StreamplaceAPI) HandleNotification(ctx context.Context) http.HandlerFunc {
466 return func(w http.ResponseWriter, req *http.Request) {
467 payload, err := io.ReadAll(req.Body)
468 if err != nil {
469 log.Log(ctx, "error reading notification create", "error", err)
470 w.WriteHeader(400)
471 return
472 }
473 n := NotificationPayload{}
474 err = json.Unmarshal(payload, &n)
475 if err != nil {
476 log.Log(ctx, "error unmarshalling notification create", "error", err)
477 w.WriteHeader(400)
478 return
479 }
480 err = a.Model.CreateNotification(n.Token, n.RepoDID)
481 if err != nil {
482 log.Log(ctx, "error creating notification", "error", err)
483 w.WriteHeader(400)
484 return
485 }
486 log.Log(ctx, "successfully created notification", "token", n.Token)
487 w.WriteHeader(200)
488 if n.RepoDID != "" {
489 go func() {
490 _, err := a.ATSync.SyncBlueskyRepo(ctx, n.RepoDID, a.Model)
491 if err != nil {
492 log.Error(ctx, "error syncing bluesky repo after notification creation", "error", err)
493 }
494 }()
495 }
496 }
497}
498
499func (a *StreamplaceAPI) HandleSegment(ctx context.Context) http.HandlerFunc {
500 return func(w http.ResponseWriter, req *http.Request) {
501 err := a.MediaManager.ValidateMP4(ctx, req.Body)
502 if err != nil {
503 apierrors.WriteHTTPBadRequest(w, "could not ingest segment", err)
504 return
505 }
506 w.WriteHeader(200)
507 }
508}
509
510func (a *StreamplaceAPI) HandlePlayerEvent(ctx context.Context) httprouter.Handle {
511 return func(w http.ResponseWriter, req *http.Request, p httprouter.Params) {
512 var event model.PlayerEventAPI
513 if err := json.NewDecoder(req.Body).Decode(&event); err != nil {
514 apierrors.WriteHTTPBadRequest(w, "could not decode JSON body", err)
515 return
516 }
517 err := a.Model.CreatePlayerEvent(event)
518 if err != nil {
519 apierrors.WriteHTTPBadRequest(w, "could not create player event", err)
520 return
521 }
522 w.WriteHeader(201)
523 }
524}
525
526func (a *StreamplaceAPI) HandleRecentSegments(ctx context.Context) httprouter.Handle {
527 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
528 segs, err := a.Model.MostRecentSegments()
529 if err != nil {
530 apierrors.WriteHTTPInternalServerError(w, "could not get segments", err)
531 return
532 }
533 bs, err := json.Marshal(segs)
534 if err != nil {
535 apierrors.WriteHTTPInternalServerError(w, "could not marshal segments", err)
536 return
537 }
538 w.Header().Add("Content-Type", "application/json")
539 if _, err := w.Write(bs); err != nil {
540 log.Error(ctx, "error writing response", "error", err)
541 }
542 }
543}
544
545func (a *StreamplaceAPI) HandleUserRecentSegments(ctx context.Context) httprouter.Handle {
546 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
547 user := params.ByName("repoDID")
548 if user == "" {
549 apierrors.WriteHTTPBadRequest(w, "user required", nil)
550 return
551 }
552 user, err := a.NormalizeUser(ctx, user)
553 if err != nil {
554 apierrors.WriteHTTPNotFound(w, "user not found", err)
555 return
556 }
557 seg, err := a.Model.LatestSegmentForUser(user)
558 if err != nil {
559 apierrors.WriteHTTPInternalServerError(w, "could not get segments", err)
560 return
561 }
562 streamplaceSeg, err := seg.ToStreamplaceSegment()
563 if err != nil {
564 apierrors.WriteHTTPInternalServerError(w, "could not convert segment to streamplace segment", err)
565 return
566 }
567 bs, err := json.Marshal(streamplaceSeg)
568 if err != nil {
569 apierrors.WriteHTTPInternalServerError(w, "could not marshal segments", err)
570 return
571 }
572 w.Header().Add("Content-Type", "application/json")
573 if _, err := w.Write(bs); err != nil {
574 log.Error(ctx, "error writing response", "error", err)
575 }
576 }
577}
578
579func (a *StreamplaceAPI) HandleViewCount(ctx context.Context) httprouter.Handle {
580 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
581 user := params.ByName("user")
582 if user == "" {
583 apierrors.WriteHTTPBadRequest(w, "user required", nil)
584 return
585 }
586 user, err := a.NormalizeUser(ctx, user)
587 if err != nil {
588 apierrors.WriteHTTPNotFound(w, "user not found", err)
589 return
590 }
591 count := spmetrics.GetViewCount(user)
592 bs, err := json.Marshal(streamplace.Livestream_ViewerCount{Count: int64(count), LexiconTypeID: "place.stream.livestream#viewerCount"})
593 if err != nil {
594 apierrors.WriteHTTPInternalServerError(w, "could not marshal view count", err)
595 return
596 }
597 if _, err := w.Write(bs); err != nil {
598 log.Error(ctx, "error writing response", "error", err)
599 }
600 }
601}
602
603func (a *StreamplaceAPI) HandleBlueskyResolve(ctx context.Context) httprouter.Handle {
604 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
605 log.Log(ctx, "got bluesky notification", "params", params)
606 key, err := a.ATSync.SyncBlueskyRepo(ctx, params.ByName("handle"), a.Model)
607 if err != nil {
608 apierrors.WriteHTTPInternalServerError(w, "could not resolve streamplace key", err)
609 return
610 }
611 signingKeys, err := a.Model.GetSigningKeysForRepo(key.DID)
612 if err != nil {
613 apierrors.WriteHTTPInternalServerError(w, "could not get signing keys", err)
614 return
615 }
616 bs, err := json.Marshal(signingKeys)
617 if err != nil {
618 apierrors.WriteHTTPInternalServerError(w, "could not marshal signing keys", err)
619 return
620 }
621 if _, err := w.Write(bs); err != nil {
622 log.Error(ctx, "error writing response", "error", err)
623 }
624 }
625}
626
627type ChatResponse struct {
628 Post *bsky.FeedPost `json:"post"`
629 Repo *model.Repo `json:"repo"`
630 CID string `json:"cid"`
631}
632
633func (a *StreamplaceAPI) HandleChat(ctx context.Context) httprouter.Handle {
634 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
635 user := params.ByName("repoDID")
636 if user == "" {
637 apierrors.WriteHTTPBadRequest(w, "user required", nil)
638 return
639 }
640 repoDID, err := a.NormalizeUser(ctx, user)
641 if err != nil {
642 apierrors.WriteHTTPNotFound(w, "user not found", err)
643 return
644 }
645 replies, err := a.Model.GetReplies(repoDID)
646 if err != nil {
647 apierrors.WriteHTTPInternalServerError(w, "could not get replies", err)
648 return
649 }
650 bs, err := json.Marshal(replies)
651 if err != nil {
652 apierrors.WriteHTTPInternalServerError(w, "could not marshal replies", err)
653 return
654 }
655 w.Header().Set("Content-Type", "application/json")
656 if _, err := w.Write(bs); err != nil {
657 log.Error(ctx, "error writing response", "error", err)
658 }
659 }
660}
661
662func (a *StreamplaceAPI) HandleLivestream(ctx context.Context) httprouter.Handle {
663 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
664 user := params.ByName("repoDID")
665 if user == "" {
666 apierrors.WriteHTTPBadRequest(w, "user required", nil)
667 return
668 }
669 repoDID, err := a.NormalizeUser(ctx, user)
670 if err != nil {
671 apierrors.WriteHTTPNotFound(w, "user not found", err)
672 return
673 }
674 livestream, err := a.Model.GetLatestLivestreamForRepo(repoDID)
675 if err != nil {
676 apierrors.WriteHTTPInternalServerError(w, "could not get livestream", err)
677 return
678 }
679
680 doc, err := livestream.ToLivestreamView()
681 if err != nil {
682 apierrors.WriteHTTPInternalServerError(w, "could not marshal livestream", err)
683 return
684 }
685
686 if livestream == nil {
687 apierrors.WriteHTTPNotFound(w, "no livestream found", nil)
688 return
689 }
690
691 bs, err := json.Marshal(doc)
692 if err != nil {
693 apierrors.WriteHTTPInternalServerError(w, "could not marshal livestream", err)
694 return
695 }
696 w.Header().Set("Content-Type", "application/json")
697 if _, err := w.Write(bs); err != nil {
698 log.Error(ctx, "error writing response", "error", err)
699 }
700 }
701}
702
703func (a *StreamplaceAPI) RateLimitMiddleware(ctx context.Context) func(http.Handler) http.Handler {
704 return func(next http.Handler) http.Handler {
705 return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
706 ip, _, err := net.SplitHostPort(req.RemoteAddr)
707 if err != nil {
708 ip = req.RemoteAddr
709 }
710
711 if a.CLI.RateLimitPerSecond > 0 {
712 limiter := a.getLimiter(ip)
713
714 if !limiter.Allow() {
715 log.Warn(ctx, "rate limit exceeded", "ip", ip, "path", req.URL.Path)
716 apierrors.WriteHTTPTooManyRequests(w, "rate limit exceeded")
717 return
718 }
719 }
720
721 next.ServeHTTP(w, req)
722 })
723 }
724}
725
726func (a *StreamplaceAPI) ServeHTTP(ctx context.Context) error {
727 handler, err := a.Handler(ctx)
728 if err != nil {
729 return err
730 }
731 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error {
732 s.Addr = a.CLI.HTTPAddr
733 log.Log(ctx, "http server starting", "addr", s.Addr)
734 return s.ListenAndServe()
735 })
736}
737
738func (a *StreamplaceAPI) ServeHTTPRedirect(ctx context.Context) error {
739 handler, err := a.RedirectHandler(ctx)
740 if err != nil {
741 return err
742 }
743 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error {
744 s.Addr = a.CLI.HTTPAddr
745 log.Log(ctx, "http tls redirecct server starting", "addr", s.Addr)
746 return s.ListenAndServe()
747 })
748}
749
750func (a *StreamplaceAPI) ServeHTTPS(ctx context.Context) error {
751 handler, err := a.Handler(ctx)
752 if err != nil {
753 return err
754 }
755 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error {
756 s.Addr = a.CLI.HTTPSAddr
757 log.Log(ctx, "https server starting",
758 "addr", s.Addr,
759 "certPath", a.CLI.TLSCertPath,
760 "keyPath", a.CLI.TLSKeyPath,
761 )
762 return s.ListenAndServeTLS(a.CLI.TLSCertPath, a.CLI.TLSKeyPath)
763 })
764}
765
766func (a *StreamplaceAPI) ServerWithShutdown(ctx context.Context, handler http.Handler, serve func(*http.Server) error) error {
767 ctx, cancel := context.WithCancel(ctx)
768 handler = gziphandler.GzipHandler(handler)
769 server := http.Server{Handler: handler}
770 var serveErr error
771 go func() {
772 serveErr = serve(&server)
773 cancel()
774 }()
775 <-ctx.Done()
776 if serveErr != nil {
777 return fmt.Errorf("error in http server: %w", serveErr)
778 }
779
780 ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
781 defer cancel()
782 return server.Shutdown(ctx)
783}
784
785func (a *StreamplaceAPI) HandleHealthz(ctx context.Context) http.HandlerFunc {
786 return func(w http.ResponseWriter, req *http.Request) {
787 w.WriteHeader(200)
788 }
789}
790
791func (a *StreamplaceAPI) getLimiter(ip string) *rate.Limiter {
792 a.limitersMu.Lock()
793 defer a.limitersMu.Unlock()
794
795 limiter, exists := a.limiters[ip]
796 if !exists {
797 limiter = rate.NewLimiter(rate.Limit(a.CLI.RateLimitPerSecond), a.CLI.RateLimitBurst)
798 a.limiters[ip] = limiter
799 }
800
801 return limiter
802}
803
804func NewWebsocketTracker(maxConns int) *WebsocketTracker {
805 return &WebsocketTracker{
806 connections: make(map[string]int),
807 maxConnsPerIP: maxConns,
808 }
809}
810
811func (t *WebsocketTracker) AddConnection(ip string) bool {
812 t.mu.Lock()
813 defer t.mu.Unlock()
814
815 count := t.connections[ip]
816
817 if count >= t.maxConnsPerIP {
818 return false
819 }
820
821 t.connections[ip] = count + 1
822 return true
823}
824
825func (t *WebsocketTracker) RemoveConnection(ip string) {
826 t.mu.Lock()
827 defer t.mu.Unlock()
828
829 count := t.connections[ip]
830 if count > 0 {
831 t.connections[ip] = count - 1
832 }
833
834 if t.connections[ip] == 0 {
835 delete(t.connections, ip)
836 }
837}