Live video on the AT Protocol
1package api
2
3import (
4 "context"
5 "encoding/json"
6 "errors"
7 "fmt"
8 "io"
9 "log/slog"
10 "net"
11 "net/http"
12 "net/http/httputil"
13 "net/url"
14 "os"
15 "slices"
16 "strings"
17 "sync"
18 "time"
19
20 "github.com/NYTimes/gziphandler"
21 "github.com/bluesky-social/indigo/api/bsky"
22 "github.com/julienschmidt/httprouter"
23 "github.com/rs/cors"
24 sloghttp "github.com/samber/slog-http"
25 "golang.org/x/time/rate"
26
27 "stream.place/streamplace/js/app"
28 "stream.place/streamplace/pkg/atproto"
29 "stream.place/streamplace/pkg/bus"
30 "stream.place/streamplace/pkg/config"
31 "stream.place/streamplace/pkg/crypto/signers/eip712"
32 "stream.place/streamplace/pkg/director"
33 apierrors "stream.place/streamplace/pkg/errors"
34 "stream.place/streamplace/pkg/linking"
35 "stream.place/streamplace/pkg/log"
36 "stream.place/streamplace/pkg/media"
37 "stream.place/streamplace/pkg/mist/mistconfig"
38 "stream.place/streamplace/pkg/model"
39 "stream.place/streamplace/pkg/notifications"
40 "stream.place/streamplace/pkg/spmetrics"
41 "stream.place/streamplace/pkg/spxrpc"
42 "stream.place/streamplace/pkg/streamplace"
43)
44
45type StreamplaceAPI struct {
46 CLI *config.CLI
47 Model model.Model
48 Updater *Updater
49 Signer *eip712.EIP712Signer
50 Mimes map[string]string
51 FirebaseNotifier notifications.FirebaseNotifier
52 MediaManager *media.MediaManager
53 MediaSigner media.MediaSigner
54 // not thread-safe yet
55 Aliases map[string]string
56 Bus *bus.Bus
57 ATSync *atproto.ATProtoSynchronizer
58 Director *director.Director
59
60 connTracker *WebsocketTracker
61
62 limiters map[string]*rate.Limiter
63 limitersMu sync.Mutex
64}
65
// WebsocketTracker keeps a per-IP count of open connections so that a maximum
// number of connections per IP can be enforced. All access to connections goes
// through mu.
type WebsocketTracker struct {
	// connections maps a client IP to its current connection count.
	connections map[string]int
	// maxConnsPerIP is the limit AddConnection enforces for a single IP.
	maxConnsPerIP int
	mu            sync.RWMutex
}
71
72func MakeStreamplaceAPI(cli *config.CLI, mod model.Model, signer *eip712.EIP712Signer, noter notifications.FirebaseNotifier, mm *media.MediaManager, ms media.MediaSigner, bus *bus.Bus, atsync *atproto.ATProtoSynchronizer, d *director.Director) (*StreamplaceAPI, error) {
73 updater, err := PrepareUpdater(cli)
74 if err != nil {
75 return nil, err
76 }
77 a := &StreamplaceAPI{CLI: cli,
78 Model: mod,
79 Updater: updater,
80 Signer: signer,
81 FirebaseNotifier: noter,
82 MediaManager: mm,
83 MediaSigner: ms,
84 Aliases: map[string]string{},
85 Bus: bus,
86 ATSync: atsync,
87 Director: d,
88 connTracker: NewWebsocketTracker(5),
89 limiters: make(map[string]*rate.Limiter),
90 }
91 a.Mimes, err = updater.GetMimes()
92 if err != nil {
93 return nil, err
94 }
95 return a, nil
96}
97
// AppHostingFS wraps an http.FileSystem and replaces "file does not exist"
// errors with ErrorIndex, so callers can tell "serve index.html instead" apart
// from other failures.
type AppHostingFS struct {
	http.FileSystem
}

// ErrorIndex is returned by AppHostingFS.Open when the requested file does not
// exist in the wrapped file system.
var ErrorIndex = errors.New("not found, use index.html")

// Open opens name in the wrapped file system. A missing file yields
// ErrorIndex; any other error is passed through unchanged.
func (fs AppHostingFS) Open(name string) (http.File, error) {
	file, err := fs.FileSystem.Open(name)
	switch {
	case err == nil:
		return file, nil
	case errors.Is(err, os.ErrNotExist):
		return nil, ErrorIndex
	default:
		return nil, err
	}
}
115
116// api/playback/iame.li/webrtc?rendition=source
117// api/playback/iame.li/stream.mp4?rendition=source
118// api/playback/iame.li/stream.webm?rendition=source
119// api/playback/iame.li/hls/index.m3u8
120// api/playback/iame.li/hls/source/stream.m3u8
121// api/playback/iame.li/hls/source/000000000000.ts
122
123func (a *StreamplaceAPI) Handler(ctx context.Context) (http.Handler, error) {
124 xrpc, err := spxrpc.NewServer(a.CLI, a.Model)
125 if err != nil {
126 return nil, err
127 }
128 router := httprouter.New()
129 apiRouter := httprouter.New()
130 apiRouter.HandlerFunc("POST", "/api/notification", a.HandleNotification(ctx))
131 // old clients
132 router.HandlerFunc("GET", "/app-updates", a.HandleAppUpdates(ctx))
133 // new ones
134 apiRouter.HandlerFunc("GET", "/api/manifest", a.HandleAppUpdates(ctx))
135 apiRouter.GET("/api/desktop-updates/:platform/:architecture/:version/:buildTime/:file", a.HandleDesktopUpdates(ctx))
136 apiRouter.POST("/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s"))
137 apiRouter.OPTIONS("/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s"))
138 apiRouter.DELETE("/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s"))
139 apiRouter.Handler("POST", "/api/segment", a.HandleSegment(ctx))
140 apiRouter.HandlerFunc("GET", "/api/healthz", a.HandleHealthz(ctx))
141 apiRouter.GET("/api/playback/:user/hls/*file", a.HandleHLSPlayback(ctx))
142 apiRouter.GET("/api/playback/:user/stream.mp4", a.HandleMP4Playback(ctx))
143 apiRouter.GET("/api/playback/:user/stream.webm", a.HandleMKVPlayback(ctx))
144 // they're, uh, not jpegs. but we used this once and i don't wanna break backwards compatibility
145 apiRouter.GET("/api/playback/:user/stream.jpg", a.HandleThumbnailPlayback(ctx))
146 // this one is not a lie
147 apiRouter.GET("/api/playback/:user/stream.png", a.HandleThumbnailPlayback(ctx))
148 apiRouter.GET("/api/app-return/*anything", a.HandleAppReturn(ctx))
149 apiRouter.POST("/api/playback/:user/webrtc", a.HandleWebRTCPlayback(ctx))
150 apiRouter.POST("/api/ingest/webrtc", a.HandleWebRTCIngest(ctx))
151 apiRouter.POST("/api/ingest/webrtc/:key", a.HandleWebRTCIngest(ctx))
152 apiRouter.POST("/api/player-event", a.HandlePlayerEvent(ctx))
153 apiRouter.GET("/api/chat/:repoDID", a.HandleChat(ctx))
154 apiRouter.GET("/api/websocket/:repoDID", a.HandleWebsocket(ctx))
155 apiRouter.GET("/api/livestream/:repoDID", a.HandleLivestream(ctx))
156 apiRouter.GET("/api/segment/recent", a.HandleRecentSegments(ctx))
157 apiRouter.GET("/api/segment/recent/:repoDID", a.HandleUserRecentSegments(ctx))
158 apiRouter.GET("/api/bluesky/resolve/:handle", a.HandleBlueskyResolve(ctx))
159 for _, platform := range atproto.AllowedPlatforms {
160 apiRouter.GET(fmt.Sprintf("/api/atproto-oauth/%s", platform), a.HandleATProtoOAuth(ctx, platform))
161 }
162 apiRouter.GET("/api/live-users", a.HandleLiveUsers(ctx))
163 apiRouter.GET("/api/view-count/:user", a.HandleViewCount(ctx))
164 apiRouter.NotFound = a.HandleAPI404(ctx)
165 router.Handler("GET", "/api/*resource", apiRouter)
166 router.Handler("POST", "/api/*resource", apiRouter)
167 router.Handler("PUT", "/api/*resource", apiRouter)
168 router.Handler("PATCH", "/api/*resource", apiRouter)
169 router.Handler("DELETE", "/api/*resource", apiRouter)
170 router.Handler("GET", "/xrpc/*resource", xrpc)
171 router.Handler("POST", "/xrpc/*resource", xrpc)
172 router.Handler("PUT", "/xrpc/*resource", xrpc)
173 router.Handler("PATCH", "/xrpc/*resource", xrpc)
174 router.Handler("DELETE", "/xrpc/*resource", xrpc)
175 router.GET("/.well-known/did.json", a.HandleDidJson(ctx))
176 router.GET("/dl/*params", a.HandleAppDownload(ctx))
177 router.POST("/", a.HandleWebRTCIngest(ctx))
178 for _, redirect := range a.CLI.Redirects {
179 parts := strings.Split(redirect, ":")
180 if len(parts) != 2 {
181 log.Error(ctx, "invalid redirect", "redirect", redirect)
182 return nil, fmt.Errorf("invalid redirect: %s", redirect)
183 }
184 router.Handle("GET", parts[0], func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
185 http.Redirect(w, r, parts[1], http.StatusTemporaryRedirect)
186 })
187 }
188 if a.CLI.FrontendProxy != "" {
189 u, err := url.Parse(a.CLI.FrontendProxy)
190 if err != nil {
191 return nil, err
192 }
193 log.Warn(ctx, "using frontend proxy instead of bundled frontend", "destination", a.CLI.FrontendProxy)
194 router.NotFound = &httputil.ReverseProxy{
195 Rewrite: func(r *httputil.ProxyRequest) {
196 // workaround for Expo disliking serving requests from 127.0.0.1 instead of localhost
197 // we need to use 127.0.0.1 because the atproto oauth client requires it
198 r.Out.Header.Set("Origin", u.String())
199 r.SetURL(u)
200 },
201 }
202 } else {
203 files, err := app.Files()
204 if err != nil {
205 return nil, err
206 }
207 index, err := files.Open("index.html")
208 if err != nil {
209 return nil, err
210 }
211 bs, err := io.ReadAll(index)
212 if err != nil {
213 return nil, err
214 }
215 linker, err := linking.NewLinker(ctx, bs)
216 if err != nil {
217 return nil, err
218 }
219 linkingHandler, err := a.NotFoundLinkingHandler(ctx, linker)
220 if err != nil {
221 return nil, err
222 }
223 router.NotFound = linkingHandler
224 }
225 // needed because the WebRTC handler issues 405s from / otherwise
226 router.GET("/", func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
227 router.NotFound.ServeHTTP(w, r)
228 })
229 handler := sloghttp.Recovery(router)
230 handler = cors.AllowAll().Handler(handler)
231 handler = sloghttp.New(slog.Default())(handler)
232 handler = a.RateLimitMiddleware(ctx)(handler)
233
234 return handler, nil
235}
236
// copyHeader appends every header value in src to dst, except headers whose
// name starts with "Access-Control": CORS headers are produced by our own
// middleware and must not be duplicated from the upstream response.
//
// The prefix test is done on the canonical form of the key, so entries stored
// under a non-canonical key (e.g. "access-control-allow-origin" assigned
// directly into the map) are filtered as well.
func copyHeader(dst, src http.Header) {
	for k, vv := range src {
		// we'll handle CORS ourselves, thanks
		if strings.HasPrefix(http.CanonicalHeaderKey(k), "Access-Control") {
			continue
		}
		for _, v := range vv {
			dst.Add(k, v)
		}
	}
}
248
249// handler that takes care of static files and otherwise returns the index.html with the correct link card data
250func (a *StreamplaceAPI) NotFoundLinkingHandler(ctx context.Context, linker *linking.Linker) (http.HandlerFunc, error) {
251 files, err := app.Files()
252 if err != nil {
253 return nil, err
254 }
255 fs := AppHostingFS{http.FS(files)}
256
257 fileHandler := a.FileHandler(ctx, http.FileServer(fs))
258 defaultHandler := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
259 f := strings.TrimPrefix(req.URL.Path, "/")
260 // under docs we need the index.html suffix due to astro rendering
261 if strings.HasPrefix(req.URL.Path, "/docs") && strings.HasSuffix(req.URL.Path, "/") {
262 f += "index.html"
263 }
264 _, err := fs.Open(f)
265 if err == nil {
266 fileHandler.ServeHTTP(w, req)
267 return
268 }
269 if errors.Is(err, ErrorIndex) || f == "" {
270 bs, err := linker.GenerateDefaultCard(ctx, req.URL)
271 if err != nil {
272 log.Error(ctx, "error generating default card", "error", err)
273 }
274 w.Header().Set("Content-Type", "text/html")
275 w.Write(bs)
276 } else {
277 log.Warn(ctx, "error opening file", "error", err)
278 apierrors.WriteHTTPInternalServerError(w, "file not found", err)
279 }
280 })
281 return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
282 proto := "http"
283 if req.TLS != nil {
284 proto = "https"
285 }
286 fwProto := req.Header.Get("x-forwarded-proto")
287 if fwProto != "" {
288 proto = fwProto
289 }
290 req.URL.Host = req.Host
291 req.URL.Scheme = proto
292 maybeHandle := strings.TrimPrefix(req.URL.Path, "/")
293 repo, err := a.Model.GetRepoByHandleOrDID(maybeHandle)
294 if err != nil || repo == nil {
295 log.Error(ctx, "no repo found", "maybeHandle", maybeHandle)
296 defaultHandler.ServeHTTP(w, req)
297 return
298 }
299 ls, err := a.Model.GetLatestLivestreamForRepo(repo.DID)
300 if err != nil || ls == nil {
301 log.Error(ctx, "no livestream found", "repoDID", repo.DID)
302 defaultHandler.ServeHTTP(w, req)
303 return
304 }
305 lsv, err := ls.ToLivestreamView()
306 if err != nil || lsv == nil {
307 log.Error(ctx, "no livestream view found", "repoDID", repo.DID)
308 defaultHandler.ServeHTTP(w, req)
309 return
310 }
311 bs, err := linker.GenerateStreamerCard(ctx, req.URL, lsv)
312 if err != nil {
313 log.Error(ctx, "error generating html", "error", err)
314 defaultHandler.ServeHTTP(w, req)
315 return
316 }
317 w.Header().Set("Content-Type", "text/html")
318 w.Write(bs)
319 }), nil
320}
321
322func (a *StreamplaceAPI) MistProxyHandler(ctx context.Context, tmpl string) httprouter.Handle {
323 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
324 if !a.CLI.HasMist() {
325 apierrors.WriteHTTPNotImplemented(w, "Playback only on the Linux version for now", nil)
326 return
327 }
328 stream := params.ByName("stream")
329 if stream == "" {
330 apierrors.WriteHTTPBadRequest(w, "missing stream in request", nil)
331 return
332 }
333
334 fullstream := fmt.Sprintf("%s+%s", mistconfig.STREAM_NAME, stream)
335 prefix := fmt.Sprintf(tmpl, fullstream)
336 resource := params.ByName("resource")
337
338 // path := strings.TrimPrefix(req.URL.EscapedPath(), "/api")
339
340 client := &http.Client{}
341 req.URL = &url.URL{
342 Scheme: "http",
343 Host: fmt.Sprintf("127.0.0.1:%d", a.CLI.MistHTTPPort),
344 Path: fmt.Sprintf("%s%s", prefix, resource),
345 RawQuery: req.URL.RawQuery,
346 }
347
348 //http: Request.RequestURI can't be set in client requests.
349 //http://golang.org/src/pkg/net/http/client.go
350 req.RequestURI = ""
351
352 resp, err := client.Do(req)
353 if err != nil {
354 apierrors.WriteHTTPInternalServerError(w, "error connecting to mist", err)
355 return
356 }
357 defer resp.Body.Close()
358
359 copyHeader(w.Header(), resp.Header)
360 w.WriteHeader(resp.StatusCode)
361 io.Copy(w, resp.Body)
362 }
363}
364
365func (a *StreamplaceAPI) FileHandler(ctx context.Context, fs http.Handler) http.HandlerFunc {
366 return func(w http.ResponseWriter, req *http.Request) {
367 noslash := req.URL.Path[1:]
368 ct, ok := a.Mimes[noslash]
369 if ok {
370 w.Header().Set("content-type", ct)
371 }
372 fs.ServeHTTP(w, req)
373 }
374}
375
376func (a *StreamplaceAPI) RedirectHandler(ctx context.Context) (http.Handler, error) {
377 _, tlsPort, err := net.SplitHostPort(a.CLI.HttpsAddr)
378 if err != nil {
379 return nil, err
380 }
381 handleRedirect := func(w http.ResponseWriter, req *http.Request) {
382 host, _, err := net.SplitHostPort(req.Host)
383 if err != nil {
384 host = req.Host
385 }
386 u := req.URL
387 if tlsPort == "443" {
388 u.Host = host
389 } else {
390 u.Host = net.JoinHostPort(host, tlsPort)
391 }
392 u.Scheme = "https"
393 http.Redirect(w, req, u.String(), http.StatusTemporaryRedirect)
394 }
395 mux := http.NewServeMux()
396 mux.HandleFunc("/", handleRedirect)
397 return mux, nil
398}
399
// NotificationPayload is the JSON body accepted by HandleNotification.
type NotificationPayload struct {
	// Token is passed to Model.CreateNotification together with RepoDID.
	Token string `json:"token"`
	// RepoDID may be empty; when set, HandleNotification also triggers a
	// background sync of that repo.
	RepoDID string `json:"repoDID"`
}
404
405func (a *StreamplaceAPI) HandleAPI404(ctx context.Context) http.HandlerFunc {
406 return func(w http.ResponseWriter, req *http.Request) {
407 w.WriteHeader(404)
408 }
409}
410
411func (a *StreamplaceAPI) HandleNotification(ctx context.Context) http.HandlerFunc {
412 return func(w http.ResponseWriter, req *http.Request) {
413 payload, err := io.ReadAll(req.Body)
414 if err != nil {
415 log.Log(ctx, "error reading notification create", "error", err)
416 w.WriteHeader(400)
417 return
418 }
419 n := NotificationPayload{}
420 err = json.Unmarshal(payload, &n)
421 if err != nil {
422 log.Log(ctx, "error unmarshalling notification create", "error", err)
423 w.WriteHeader(400)
424 return
425 }
426 err = a.Model.CreateNotification(n.Token, n.RepoDID)
427 if err != nil {
428 log.Log(ctx, "error creating notification", "error", err)
429 w.WriteHeader(400)
430 return
431 }
432 log.Log(ctx, "successfully created notification", "token", n.Token)
433 w.WriteHeader(200)
434 if n.RepoDID != "" {
435 go func() {
436 _, err := a.ATSync.SyncBlueskyRepo(ctx, n.RepoDID, a.Model)
437 if err != nil {
438 log.Error(ctx, "error syncing bluesky repo after notification creation", "error", err)
439 }
440 }()
441 }
442 }
443}
444
445func (a *StreamplaceAPI) HandleSegment(ctx context.Context) http.HandlerFunc {
446 return func(w http.ResponseWriter, req *http.Request) {
447 err := a.MediaManager.ValidateMP4(ctx, req.Body)
448 if err != nil {
449 apierrors.WriteHTTPBadRequest(w, "could not ingest segment", err)
450 return
451 }
452 w.WriteHeader(200)
453 }
454}
455
456func (a *StreamplaceAPI) HandlePlayerEvent(ctx context.Context) httprouter.Handle {
457 return func(w http.ResponseWriter, req *http.Request, p httprouter.Params) {
458 var event model.PlayerEventAPI
459 if err := json.NewDecoder(req.Body).Decode(&event); err != nil {
460 apierrors.WriteHTTPBadRequest(w, "could not decode JSON body", err)
461 return
462 }
463 err := a.Model.CreatePlayerEvent(event)
464 if err != nil {
465 apierrors.WriteHTTPBadRequest(w, "could not create player event", err)
466 return
467 }
468 w.WriteHeader(201)
469 }
470}
471
472func (a *StreamplaceAPI) HandleRecentSegments(ctx context.Context) httprouter.Handle {
473 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
474 segs, err := a.Model.MostRecentSegments()
475 if err != nil {
476 apierrors.WriteHTTPInternalServerError(w, "could not get segments", err)
477 return
478 }
479 bs, err := json.Marshal(segs)
480 if err != nil {
481 apierrors.WriteHTTPInternalServerError(w, "could not marshal segments", err)
482 return
483 }
484 w.Header().Add("Content-Type", "application/json")
485 w.Write(bs)
486 }
487}
488
489func (a *StreamplaceAPI) HandleUserRecentSegments(ctx context.Context) httprouter.Handle {
490 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
491 user := params.ByName("repoDID")
492 if user == "" {
493 apierrors.WriteHTTPBadRequest(w, "user required", nil)
494 return
495 }
496 user, err := a.NormalizeUser(ctx, user)
497 if err != nil {
498 apierrors.WriteHTTPNotFound(w, "user not found", err)
499 return
500 }
501 seg, err := a.Model.LatestSegmentForUser(user)
502 if err != nil {
503 apierrors.WriteHTTPInternalServerError(w, "could not get segments", err)
504 return
505 }
506 streamplaceSeg, err := seg.ToStreamplaceSegment()
507 if err != nil {
508 apierrors.WriteHTTPInternalServerError(w, "could not convert segment to streamplace segment", err)
509 return
510 }
511 bs, err := json.Marshal(streamplaceSeg)
512 if err != nil {
513 apierrors.WriteHTTPInternalServerError(w, "could not marshal segments", err)
514 return
515 }
516 w.Header().Add("Content-Type", "application/json")
517 w.Write(bs)
518 }
519}
520
521type LiveUsersResponse struct {
522 model.Segment
523 Viewers int `json:"viewers"`
524}
525
526func (a *StreamplaceAPI) HandleLiveUsers(ctx context.Context) httprouter.Handle {
527 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
528 repos, err := a.Model.MostRecentSegments()
529 if err != nil {
530 apierrors.WriteHTTPInternalServerError(w, "could not get live users", err)
531 return
532 }
533 liveUsers := []LiveUsersResponse{}
534 for _, repo := range repos {
535 viewers := spmetrics.GetViewCount(repo.RepoDID)
536 liveUsers = append(liveUsers, LiveUsersResponse{
537 Segment: repo,
538 Viewers: viewers,
539 })
540 }
541 bs, err := json.Marshal(liveUsers)
542 if err != nil {
543 apierrors.WriteHTTPInternalServerError(w, "could not marshal live users", err)
544 return
545 }
546 w.Write(bs)
547 }
548}
549
550func (a *StreamplaceAPI) HandleViewCount(ctx context.Context) httprouter.Handle {
551 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
552 user := params.ByName("user")
553 if user == "" {
554 apierrors.WriteHTTPBadRequest(w, "user required", nil)
555 return
556 }
557 user, err := a.NormalizeUser(ctx, user)
558 if err != nil {
559 apierrors.WriteHTTPNotFound(w, "user not found", err)
560 return
561 }
562 count := spmetrics.GetViewCount(user)
563 bs, err := json.Marshal(streamplace.Livestream_ViewerCount{Count: int64(count), LexiconTypeID: "place.stream.livestream#viewerCount"})
564 if err != nil {
565 apierrors.WriteHTTPInternalServerError(w, "could not marshal view count", err)
566 return
567 }
568 w.Write(bs)
569 }
570}
571
572func (a *StreamplaceAPI) HandleBlueskyResolve(ctx context.Context) httprouter.Handle {
573 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
574 log.Log(ctx, "got bluesky notification", "params", params)
575 key, err := a.ATSync.SyncBlueskyRepo(ctx, params.ByName("handle"), a.Model)
576 if err != nil {
577 apierrors.WriteHTTPInternalServerError(w, "could not resolve streamplace key", err)
578 return
579 }
580 signingKeys, err := a.Model.GetSigningKeysForRepo(key.DID)
581 if err != nil {
582 apierrors.WriteHTTPInternalServerError(w, "could not get signing keys", err)
583 return
584 }
585 bs, err := json.Marshal(signingKeys)
586 if err != nil {
587 apierrors.WriteHTTPInternalServerError(w, "could not marshal signing keys", err)
588 return
589 }
590 w.Write(bs)
591 }
592}
593
594func (a *StreamplaceAPI) HandleATProtoOAuth(ctx context.Context, platform string) httprouter.Handle {
595 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
596 host, _, err := net.SplitHostPort(req.Host)
597 if err != nil {
598 host = req.Host
599 }
600 if !slices.Contains(atproto.AllowedPlatforms, platform) {
601 apierrors.WriteHTTPBadRequest(w, "unsupported platform", nil)
602 return
603 }
604
605 meta := atproto.GetMetadata(host, platform, a.CLI.AppBundleID)
606 bs, err := json.Marshal(meta)
607 if err != nil {
608 apierrors.WriteHTTPInternalServerError(w, "could not marshal metadata", err)
609 return
610 }
611 w.Header().Set("Content-Type", "application/json")
612 w.Write(bs)
613 }
614}
615
616type ChatResponse struct {
617 Post *bsky.FeedPost `json:"post"`
618 Repo *model.Repo `json:"repo"`
619 CID string `json:"cid"`
620}
621
622func (a *StreamplaceAPI) HandleChat(ctx context.Context) httprouter.Handle {
623 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
624 user := params.ByName("repoDID")
625 if user == "" {
626 apierrors.WriteHTTPBadRequest(w, "user required", nil)
627 return
628 }
629 repoDID, err := a.NormalizeUser(ctx, user)
630 if err != nil {
631 apierrors.WriteHTTPNotFound(w, "user not found", err)
632 return
633 }
634 replies, err := a.Model.GetReplies(repoDID)
635 if err != nil {
636 apierrors.WriteHTTPInternalServerError(w, "could not get replies", err)
637 return
638 }
639 bs, err := json.Marshal(replies)
640 if err != nil {
641 apierrors.WriteHTTPInternalServerError(w, "could not marshal replies", err)
642 return
643 }
644 w.Header().Set("Content-Type", "application/json")
645 w.Write(bs)
646 }
647}
648
649func (a *StreamplaceAPI) HandleLivestream(ctx context.Context) httprouter.Handle {
650 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
651 user := params.ByName("repoDID")
652 if user == "" {
653 apierrors.WriteHTTPBadRequest(w, "user required", nil)
654 return
655 }
656 repoDID, err := a.NormalizeUser(ctx, user)
657 if err != nil {
658 apierrors.WriteHTTPNotFound(w, "user not found", err)
659 return
660 }
661 livestream, err := a.Model.GetLatestLivestreamForRepo(repoDID)
662 if err != nil {
663 apierrors.WriteHTTPInternalServerError(w, "could not get livestream", err)
664 return
665 }
666
667 doc, err := livestream.ToLivestreamView()
668 if err != nil {
669 apierrors.WriteHTTPInternalServerError(w, "could not marshal livestream", err)
670 return
671 }
672
673 if livestream == nil {
674 apierrors.WriteHTTPNotFound(w, "no livestream found", nil)
675 return
676 }
677
678 bs, err := json.Marshal(doc)
679 if err != nil {
680 apierrors.WriteHTTPInternalServerError(w, "could not marshal livestream", err)
681 return
682 }
683 w.Header().Set("Content-Type", "application/json")
684 w.Write(bs)
685 }
686}
687
688func (a *StreamplaceAPI) RateLimitMiddleware(ctx context.Context) func(http.Handler) http.Handler {
689 return func(next http.Handler) http.Handler {
690 return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
691 ip, _, err := net.SplitHostPort(req.RemoteAddr)
692 if err != nil {
693 ip = req.RemoteAddr
694 }
695
696 limiter := a.getLimiter(ip)
697
698 if !limiter.Allow() {
699 log.Warn(ctx, "rate limit exceeded", "ip", ip, "path", req.URL.Path)
700 apierrors.WriteHTTPTooManyRequests(w, "rate limit exceeded")
701 return
702 }
703
704 next.ServeHTTP(w, req)
705 })
706 }
707}
708
709func (a *StreamplaceAPI) ServeHTTP(ctx context.Context) error {
710 handler, err := a.Handler(ctx)
711 if err != nil {
712 return err
713 }
714 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error {
715 s.Addr = a.CLI.HttpAddr
716 log.Log(ctx, "http server starting", "addr", s.Addr)
717 return s.ListenAndServe()
718 })
719}
720
721func (a *StreamplaceAPI) ServeHTTPRedirect(ctx context.Context) error {
722 handler, err := a.RedirectHandler(ctx)
723 if err != nil {
724 return err
725 }
726 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error {
727 s.Addr = a.CLI.HttpAddr
728 log.Log(ctx, "http tls redirecct server starting", "addr", s.Addr)
729 return s.ListenAndServe()
730 })
731}
732
733func (a *StreamplaceAPI) ServeHTTPS(ctx context.Context) error {
734 handler, err := a.Handler(ctx)
735 if err != nil {
736 return err
737 }
738 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error {
739 s.Addr = a.CLI.HttpsAddr
740 log.Log(ctx, "https server starting",
741 "addr", s.Addr,
742 "certPath", a.CLI.TLSCertPath,
743 "keyPath", a.CLI.TLSKeyPath,
744 )
745 return s.ListenAndServeTLS(a.CLI.TLSCertPath, a.CLI.TLSKeyPath)
746 })
747}
748
749func (a *StreamplaceAPI) ServerWithShutdown(ctx context.Context, handler http.Handler, serve func(*http.Server) error) error {
750 ctx, cancel := context.WithCancel(ctx)
751 handler = gziphandler.GzipHandler(handler)
752 server := http.Server{Handler: handler}
753 var serveErr error
754 go func() {
755 serveErr = serve(&server)
756 cancel()
757 }()
758 <-ctx.Done()
759 if serveErr != nil {
760 return fmt.Errorf("error in http server: %w", serveErr)
761 }
762
763 ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
764 defer cancel()
765 return server.Shutdown(ctx)
766}
767
768func (a *StreamplaceAPI) HandleHealthz(ctx context.Context) http.HandlerFunc {
769 return func(w http.ResponseWriter, req *http.Request) {
770 w.WriteHeader(200)
771 }
772}
773
774func (a *StreamplaceAPI) getLimiter(ip string) *rate.Limiter {
775 a.limitersMu.Lock()
776 defer a.limitersMu.Unlock()
777
778 limiter, exists := a.limiters[ip]
779 if !exists {
780 // 5 actions per second with a burst of 3
781 limiter = rate.NewLimiter(rate.Limit(10.0), 8)
782 a.limiters[ip] = limiter
783 }
784
785 return limiter
786}
787
788func NewWebsocketTracker(maxConns int) *WebsocketTracker {
789 return &WebsocketTracker{
790 connections: make(map[string]int),
791 maxConnsPerIP: maxConns,
792 }
793}
794
795func (t *WebsocketTracker) AddConnection(ip string) bool {
796 t.mu.Lock()
797 defer t.mu.Unlock()
798
799 count := t.connections[ip]
800
801 if count >= t.maxConnsPerIP {
802 return false
803 }
804
805 t.connections[ip] = count + 1
806 return true
807}
808
809func (t *WebsocketTracker) RemoveConnection(ip string) {
810 t.mu.Lock()
811 defer t.mu.Unlock()
812
813 count := t.connections[ip]
814 if count > 0 {
815 t.connections[ip] = count - 1
816 }
817
818 if t.connections[ip] == 0 {
819 delete(t.connections, ip)
820 }
821}