Live video on the AT Protocol
1package api
2
3import (
4 "context"
5 "encoding/json"
6 "errors"
7 "fmt"
8 "io"
9 "log/slog"
10 "net"
11 "net/http"
12 "net/http/httputil"
13 "net/url"
14 "os"
15 "strings"
16 "sync"
17 "time"
18
19 "github.com/NYTimes/gziphandler"
20 "github.com/bluesky-social/indigo/api/bsky"
21 "github.com/julienschmidt/httprouter"
22 "github.com/rs/cors"
23 sloghttp "github.com/samber/slog-http"
24 "golang.org/x/time/rate"
25
26 "stream.place/streamplace/js/app"
27 "stream.place/streamplace/pkg/atproto"
28 "stream.place/streamplace/pkg/bus"
29 "stream.place/streamplace/pkg/config"
30 "stream.place/streamplace/pkg/crypto/signers/eip712"
31 "stream.place/streamplace/pkg/director"
32 apierrors "stream.place/streamplace/pkg/errors"
33 "stream.place/streamplace/pkg/linking"
34 "stream.place/streamplace/pkg/log"
35 "stream.place/streamplace/pkg/media"
36 "stream.place/streamplace/pkg/mist/mistconfig"
37 "stream.place/streamplace/pkg/model"
38 "stream.place/streamplace/pkg/notifications"
39 "stream.place/streamplace/pkg/oproxy"
40 "stream.place/streamplace/pkg/spmetrics"
41 "stream.place/streamplace/pkg/spxrpc"
42 "stream.place/streamplace/pkg/streamplace"
43)
44
45type StreamplaceAPI struct {
46 CLI *config.CLI
47 Model model.Model
48 Updater *Updater
49 Signer *eip712.EIP712Signer
50 Mimes map[string]string
51 FirebaseNotifier notifications.FirebaseNotifier
52 MediaManager *media.MediaManager
53 MediaSigner media.MediaSigner
54 // not thread-safe yet
55 Aliases map[string]string
56 Bus *bus.Bus
57 ATSync *atproto.ATProtoSynchronizer
58 Director *director.Director
59
60 connTracker *WebsocketTracker
61
62 limiters map[string]*rate.Limiter
63 limitersMu sync.Mutex
64 SignerCache map[string]media.MediaSigner
65 SignerCacheMu sync.Mutex
66 op *oproxy.OProxy
67}
68
69type WebsocketTracker struct {
70 connections map[string]int
71 maxConnsPerIP int
72 mu sync.RWMutex
73}
74
75func MakeStreamplaceAPI(cli *config.CLI, mod model.Model, signer *eip712.EIP712Signer, noter notifications.FirebaseNotifier, mm *media.MediaManager, ms media.MediaSigner, bus *bus.Bus, atsync *atproto.ATProtoSynchronizer, d *director.Director, op *oproxy.OProxy) (*StreamplaceAPI, error) {
76 updater, err := PrepareUpdater(cli)
77 if err != nil {
78 return nil, err
79 }
80 a := &StreamplaceAPI{CLI: cli,
81 Model: mod,
82 Updater: updater,
83 Signer: signer,
84 FirebaseNotifier: noter,
85 MediaManager: mm,
86 MediaSigner: ms,
87 Aliases: map[string]string{},
88 Bus: bus,
89 ATSync: atsync,
90 Director: d,
91 connTracker: NewWebsocketTracker(cli.RateLimitWebsocket),
92 limiters: make(map[string]*rate.Limiter),
93 SignerCache: make(map[string]media.MediaSigner),
94 op: op,
95 }
96 a.Mimes, err = updater.GetMimes()
97 if err != nil {
98 return nil, err
99 }
100 return a, nil
101}
102
103type AppHostingFS struct {
104 http.FileSystem
105}
106
107var ErrorIndex = errors.New("not found, use index.html")
108
109func (fs AppHostingFS) Open(name string) (http.File, error) {
110 file, err1 := fs.FileSystem.Open(name)
111 if err1 == nil {
112 return file, nil
113 }
114 if !errors.Is(err1, os.ErrNotExist) {
115 return nil, err1
116 }
117
118 return nil, ErrorIndex
119}
120
121// api/playback/iame.li/webrtc?rendition=source
122// api/playback/iame.li/stream.mp4?rendition=source
123// api/playback/iame.li/stream.webm?rendition=source
124// api/playback/iame.li/hls/index.m3u8
125// api/playback/iame.li/hls/source/stream.m3u8
126// api/playback/iame.li/hls/source/000000000000.ts
127
128func (a *StreamplaceAPI) Handler(ctx context.Context) (http.Handler, error) {
129 var xrpc http.Handler
130 xrpc, err := spxrpc.NewServer(a.CLI, a.Model)
131 if err != nil {
132 return nil, err
133 }
134
135 xrpc = a.op.OAuthMiddleware(xrpc)
136 router := httprouter.New()
137 router.Handler("GET", "/oauth/*anything", a.op.Handler())
138 router.Handler("POST", "/oauth/*anything", a.op.Handler())
139 router.Handler("GET", "/.well-known/oauth-authorization-server", a.op.Handler())
140 router.Handler("GET", "/.well-known/oauth-protected-resource", a.op.Handler())
141 apiRouter := httprouter.New()
142 apiRouter.HandlerFunc("POST", "/api/notification", a.HandleNotification(ctx))
143 // old clients
144 router.HandlerFunc("GET", "/app-updates", a.HandleAppUpdates(ctx))
145
146 // new ones
147 apiRouter.HandlerFunc("GET", "/api/manifest", a.HandleAppUpdates(ctx))
148 apiRouter.GET("/api/desktop-updates/:platform/:architecture/:version/:buildTime/:file", a.HandleDesktopUpdates(ctx))
149 apiRouter.POST("/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s"))
150 apiRouter.OPTIONS("/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s"))
151 apiRouter.DELETE("/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s"))
152 apiRouter.Handler("POST", "/api/segment", a.HandleSegment(ctx))
153 apiRouter.HandlerFunc("GET", "/api/healthz", a.HandleHealthz(ctx))
154 apiRouter.GET("/api/playback/:user/hls/*file", a.HandleHLSPlayback(ctx))
155 apiRouter.GET("/api/playback/:user/stream.mp4", a.HandleMP4Playback(ctx))
156 apiRouter.GET("/api/playback/:user/stream.webm", a.HandleMKVPlayback(ctx))
157 // they're, uh, not jpegs. but we used this once and i don't wanna break backwards compatibility
158 apiRouter.GET("/api/playback/:user/stream.jpg", a.HandleThumbnailPlayback(ctx))
159 // this one is not a lie
160 apiRouter.GET("/api/playback/:user/stream.png", a.HandleThumbnailPlayback(ctx))
161 apiRouter.GET("/api/app-return/*anything", a.HandleAppReturn(ctx))
162 apiRouter.POST("/api/playback/:user/webrtc", a.HandleWebRTCPlayback(ctx))
163 apiRouter.POST("/api/ingest/webrtc", a.HandleWebRTCIngest(ctx))
164 apiRouter.POST("/api/ingest/webrtc/:key", a.HandleWebRTCIngest(ctx))
165 apiRouter.POST("/api/player-event", a.HandlePlayerEvent(ctx))
166 apiRouter.GET("/api/chat/:repoDID", a.HandleChat(ctx))
167 apiRouter.GET("/api/websocket/:repoDID", a.HandleWebsocket(ctx))
168 apiRouter.GET("/api/livestream/:repoDID", a.HandleLivestream(ctx))
169 apiRouter.GET("/api/segment/recent", a.HandleRecentSegments(ctx))
170 apiRouter.GET("/api/segment/recent/:repoDID", a.HandleUserRecentSegments(ctx))
171 apiRouter.GET("/api/bluesky/resolve/:handle", a.HandleBlueskyResolve(ctx))
172 apiRouter.GET("/api/live-users", a.HandleLiveUsers(ctx))
173 apiRouter.GET("/api/view-count/:user", a.HandleViewCount(ctx))
174 apiRouter.NotFound = a.HandleAPI404(ctx)
175 apiRouterHandler := a.RateLimitMiddleware(ctx)(apiRouter)
176 xrpcHandler := a.RateLimitMiddleware(ctx)(xrpc)
177 router.Handler("GET", "/api/*resource", apiRouterHandler)
178 router.Handler("POST", "/api/*resource", apiRouterHandler)
179 router.Handler("PUT", "/api/*resource", apiRouterHandler)
180 router.Handler("PATCH", "/api/*resource", apiRouterHandler)
181 router.Handler("DELETE", "/api/*resource", apiRouterHandler)
182 router.Handler("GET", "/xrpc/*resource", xrpcHandler)
183 router.Handler("POST", "/xrpc/*resource", xrpcHandler)
184 router.Handler("PUT", "/xrpc/*resource", xrpcHandler)
185 router.Handler("PATCH", "/xrpc/*resource", xrpcHandler)
186 router.Handler("DELETE", "/xrpc/*resource", xrpcHandler)
187 router.GET("/.well-known/did.json", a.HandleDidJson(ctx))
188 router.GET("/dl/*params", a.HandleAppDownload(ctx))
189 router.POST("/", a.HandleWebRTCIngest(ctx))
190 for _, redirect := range a.CLI.Redirects {
191 parts := strings.Split(redirect, ":")
192 if len(parts) != 2 {
193 log.Error(ctx, "invalid redirect", "redirect", redirect)
194 return nil, fmt.Errorf("invalid redirect: %s", redirect)
195 }
196 router.Handle("GET", parts[0], func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
197 http.Redirect(w, r, parts[1], http.StatusTemporaryRedirect)
198 })
199 }
200 if a.CLI.FrontendProxy != "" {
201 u, err := url.Parse(a.CLI.FrontendProxy)
202 if err != nil {
203 return nil, err
204 }
205 log.Warn(ctx, "using frontend proxy instead of bundled frontend", "destination", a.CLI.FrontendProxy)
206 router.NotFound = &httputil.ReverseProxy{
207 Rewrite: func(r *httputil.ProxyRequest) {
208 // workaround for Expo disliking serving requests from 127.0.0.1 instead of localhost
209 // we need to use 127.0.0.1 because the atproto oauth client requires it
210 r.Out.Header.Set("Origin", u.String())
211 r.SetURL(u)
212 },
213 }
214 } else {
215 files, err := app.Files()
216 if err != nil {
217 return nil, err
218 }
219 index, err := files.Open("index.html")
220 if err != nil {
221 return nil, err
222 }
223 bs, err := io.ReadAll(index)
224 if err != nil {
225 return nil, err
226 }
227 linker, err := linking.NewLinker(ctx, bs)
228 if err != nil {
229 return nil, err
230 }
231 linkingHandler, err := a.NotFoundLinkingHandler(ctx, linker)
232 if err != nil {
233 return nil, err
234 }
235 router.NotFound = linkingHandler
236 }
237 // needed because the WebRTC handler issues 405s from / otherwise
238 router.GET("/", func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
239 router.NotFound.ServeHTTP(w, r)
240 })
241 handler := sloghttp.Recovery(router)
242 handler = cors.AllowAll().Handler(handler)
243 handler = sloghttp.New(slog.Default())(handler)
244 handler = a.RateLimitMiddleware(ctx)(handler)
245
246 return handler, nil
247}
248
249func copyHeader(dst, src http.Header) {
250 for k, vv := range src {
251 // we'll handle CORS ourselves, thanks
252 if strings.HasPrefix(k, "Access-Control") {
253 continue
254 }
255 for _, v := range vv {
256 dst.Add(k, v)
257 }
258 }
259}
260
261// handler that takes care of static files and otherwise returns the index.html with the correct link card data
262func (a *StreamplaceAPI) NotFoundLinkingHandler(ctx context.Context, linker *linking.Linker) (http.HandlerFunc, error) {
263 files, err := app.Files()
264 if err != nil {
265 return nil, err
266 }
267 fs := AppHostingFS{http.FS(files)}
268
269 fileHandler := a.FileHandler(ctx, http.FileServer(fs))
270 defaultHandler := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
271 f := strings.TrimPrefix(req.URL.Path, "/")
272 // under docs we need the index.html suffix due to astro rendering
273 if strings.HasPrefix(req.URL.Path, "/docs") && strings.HasSuffix(req.URL.Path, "/") {
274 f += "index.html"
275 }
276 _, err := fs.Open(f)
277 if err == nil {
278 fileHandler.ServeHTTP(w, req)
279 return
280 }
281 if errors.Is(err, ErrorIndex) || f == "" {
282 bs, err := linker.GenerateDefaultCard(ctx, req.URL)
283 if err != nil {
284 log.Error(ctx, "error generating default card", "error", err)
285 }
286 w.Header().Set("Content-Type", "text/html")
287 w.Write(bs)
288 } else {
289 log.Warn(ctx, "error opening file", "error", err)
290 apierrors.WriteHTTPInternalServerError(w, "file not found", err)
291 }
292 })
293 return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
294 proto := "http"
295 if req.TLS != nil {
296 proto = "https"
297 }
298 fwProto := req.Header.Get("x-forwarded-proto")
299 if fwProto != "" {
300 proto = fwProto
301 }
302 req.URL.Host = req.Host
303 req.URL.Scheme = proto
304 maybeHandle := strings.TrimPrefix(req.URL.Path, "/")
305 repo, err := a.Model.GetRepoByHandleOrDID(maybeHandle)
306 if err != nil || repo == nil {
307 log.Error(ctx, "no repo found", "maybeHandle", maybeHandle)
308 defaultHandler.ServeHTTP(w, req)
309 return
310 }
311 ls, err := a.Model.GetLatestLivestreamForRepo(repo.DID)
312 if err != nil || ls == nil {
313 log.Error(ctx, "no livestream found", "repoDID", repo.DID)
314 defaultHandler.ServeHTTP(w, req)
315 return
316 }
317 lsv, err := ls.ToLivestreamView()
318 if err != nil || lsv == nil {
319 log.Error(ctx, "no livestream view found", "repoDID", repo.DID)
320 defaultHandler.ServeHTTP(w, req)
321 return
322 }
323 bs, err := linker.GenerateStreamerCard(ctx, req.URL, lsv)
324 if err != nil {
325 log.Error(ctx, "error generating html", "error", err)
326 defaultHandler.ServeHTTP(w, req)
327 return
328 }
329 w.Header().Set("Content-Type", "text/html")
330 w.Write(bs)
331 }), nil
332}
333
334func (a *StreamplaceAPI) MistProxyHandler(ctx context.Context, tmpl string) httprouter.Handle {
335 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
336 if !a.CLI.HasMist() {
337 apierrors.WriteHTTPNotImplemented(w, "Playback only on the Linux version for now", nil)
338 return
339 }
340 stream := params.ByName("stream")
341 if stream == "" {
342 apierrors.WriteHTTPBadRequest(w, "missing stream in request", nil)
343 return
344 }
345
346 fullstream := fmt.Sprintf("%s+%s", mistconfig.STREAM_NAME, stream)
347 prefix := fmt.Sprintf(tmpl, fullstream)
348 resource := params.ByName("resource")
349
350 // path := strings.TrimPrefix(req.URL.EscapedPath(), "/api")
351
352 client := &http.Client{}
353 req.URL = &url.URL{
354 Scheme: "http",
355 Host: fmt.Sprintf("127.0.0.1:%d", a.CLI.MistHTTPPort),
356 Path: fmt.Sprintf("%s%s", prefix, resource),
357 RawQuery: req.URL.RawQuery,
358 }
359
360 //http: Request.RequestURI can't be set in client requests.
361 //http://golang.org/src/pkg/net/http/client.go
362 req.RequestURI = ""
363
364 resp, err := client.Do(req)
365 if err != nil {
366 apierrors.WriteHTTPInternalServerError(w, "error connecting to mist", err)
367 return
368 }
369 defer resp.Body.Close()
370
371 copyHeader(w.Header(), resp.Header)
372 w.WriteHeader(resp.StatusCode)
373 io.Copy(w, resp.Body)
374 }
375}
376
377func (a *StreamplaceAPI) FileHandler(ctx context.Context, fs http.Handler) http.HandlerFunc {
378 return func(w http.ResponseWriter, req *http.Request) {
379 noslash := req.URL.Path[1:]
380 ct, ok := a.Mimes[noslash]
381 if ok {
382 w.Header().Set("content-type", ct)
383 }
384 fs.ServeHTTP(w, req)
385 }
386}
387
388func (a *StreamplaceAPI) RedirectHandler(ctx context.Context) (http.Handler, error) {
389 _, tlsPort, err := net.SplitHostPort(a.CLI.HttpsAddr)
390 if err != nil {
391 return nil, err
392 }
393 handleRedirect := func(w http.ResponseWriter, req *http.Request) {
394 host, _, err := net.SplitHostPort(req.Host)
395 if err != nil {
396 host = req.Host
397 }
398 u := req.URL
399 if tlsPort == "443" {
400 u.Host = host
401 } else {
402 u.Host = net.JoinHostPort(host, tlsPort)
403 }
404 u.Scheme = "https"
405 http.Redirect(w, req, u.String(), http.StatusTemporaryRedirect)
406 }
407 mux := http.NewServeMux()
408 mux.HandleFunc("/", handleRedirect)
409 return mux, nil
410}
411
412type NotificationPayload struct {
413 Token string `json:"token"`
414 RepoDID string `json:"repoDID"`
415}
416
417func (a *StreamplaceAPI) HandleAPI404(ctx context.Context) http.HandlerFunc {
418 return func(w http.ResponseWriter, req *http.Request) {
419 w.WriteHeader(404)
420 }
421}
422
423func (a *StreamplaceAPI) HandleNotification(ctx context.Context) http.HandlerFunc {
424 return func(w http.ResponseWriter, req *http.Request) {
425 payload, err := io.ReadAll(req.Body)
426 if err != nil {
427 log.Log(ctx, "error reading notification create", "error", err)
428 w.WriteHeader(400)
429 return
430 }
431 n := NotificationPayload{}
432 err = json.Unmarshal(payload, &n)
433 if err != nil {
434 log.Log(ctx, "error unmarshalling notification create", "error", err)
435 w.WriteHeader(400)
436 return
437 }
438 err = a.Model.CreateNotification(n.Token, n.RepoDID)
439 if err != nil {
440 log.Log(ctx, "error creating notification", "error", err)
441 w.WriteHeader(400)
442 return
443 }
444 log.Log(ctx, "successfully created notification", "token", n.Token)
445 w.WriteHeader(200)
446 if n.RepoDID != "" {
447 go func() {
448 _, err := a.ATSync.SyncBlueskyRepo(ctx, n.RepoDID, a.Model)
449 if err != nil {
450 log.Error(ctx, "error syncing bluesky repo after notification creation", "error", err)
451 }
452 }()
453 }
454 }
455}
456
457func (a *StreamplaceAPI) HandleSegment(ctx context.Context) http.HandlerFunc {
458 return func(w http.ResponseWriter, req *http.Request) {
459 err := a.MediaManager.ValidateMP4(ctx, req.Body)
460 if err != nil {
461 apierrors.WriteHTTPBadRequest(w, "could not ingest segment", err)
462 return
463 }
464 w.WriteHeader(200)
465 }
466}
467
468func (a *StreamplaceAPI) HandlePlayerEvent(ctx context.Context) httprouter.Handle {
469 return func(w http.ResponseWriter, req *http.Request, p httprouter.Params) {
470 var event model.PlayerEventAPI
471 if err := json.NewDecoder(req.Body).Decode(&event); err != nil {
472 apierrors.WriteHTTPBadRequest(w, "could not decode JSON body", err)
473 return
474 }
475 err := a.Model.CreatePlayerEvent(event)
476 if err != nil {
477 apierrors.WriteHTTPBadRequest(w, "could not create player event", err)
478 return
479 }
480 w.WriteHeader(201)
481 }
482}
483
484func (a *StreamplaceAPI) HandleRecentSegments(ctx context.Context) httprouter.Handle {
485 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
486 segs, err := a.Model.MostRecentSegments()
487 if err != nil {
488 apierrors.WriteHTTPInternalServerError(w, "could not get segments", err)
489 return
490 }
491 bs, err := json.Marshal(segs)
492 if err != nil {
493 apierrors.WriteHTTPInternalServerError(w, "could not marshal segments", err)
494 return
495 }
496 w.Header().Add("Content-Type", "application/json")
497 w.Write(bs)
498 }
499}
500
501func (a *StreamplaceAPI) HandleUserRecentSegments(ctx context.Context) httprouter.Handle {
502 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
503 user := params.ByName("repoDID")
504 if user == "" {
505 apierrors.WriteHTTPBadRequest(w, "user required", nil)
506 return
507 }
508 user, err := a.NormalizeUser(ctx, user)
509 if err != nil {
510 apierrors.WriteHTTPNotFound(w, "user not found", err)
511 return
512 }
513 seg, err := a.Model.LatestSegmentForUser(user)
514 if err != nil {
515 apierrors.WriteHTTPInternalServerError(w, "could not get segments", err)
516 return
517 }
518 streamplaceSeg, err := seg.ToStreamplaceSegment()
519 if err != nil {
520 apierrors.WriteHTTPInternalServerError(w, "could not convert segment to streamplace segment", err)
521 return
522 }
523 bs, err := json.Marshal(streamplaceSeg)
524 if err != nil {
525 apierrors.WriteHTTPInternalServerError(w, "could not marshal segments", err)
526 return
527 }
528 w.Header().Add("Content-Type", "application/json")
529 w.Write(bs)
530 }
531}
532
533type LiveUsersResponse struct {
534 model.Segment
535 Viewers int `json:"viewers"`
536}
537
538func (a *StreamplaceAPI) HandleLiveUsers(ctx context.Context) httprouter.Handle {
539 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
540 repos, err := a.Model.MostRecentSegments()
541 if err != nil {
542 apierrors.WriteHTTPInternalServerError(w, "could not get live users", err)
543 return
544 }
545 liveUsers := []LiveUsersResponse{}
546 for _, repo := range repos {
547 viewers := spmetrics.GetViewCount(repo.RepoDID)
548 liveUsers = append(liveUsers, LiveUsersResponse{
549 Segment: repo,
550 Viewers: viewers,
551 })
552 }
553 bs, err := json.Marshal(liveUsers)
554 if err != nil {
555 apierrors.WriteHTTPInternalServerError(w, "could not marshal live users", err)
556 return
557 }
558 w.Write(bs)
559 }
560}
561
562func (a *StreamplaceAPI) HandleViewCount(ctx context.Context) httprouter.Handle {
563 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
564 user := params.ByName("user")
565 if user == "" {
566 apierrors.WriteHTTPBadRequest(w, "user required", nil)
567 return
568 }
569 user, err := a.NormalizeUser(ctx, user)
570 if err != nil {
571 apierrors.WriteHTTPNotFound(w, "user not found", err)
572 return
573 }
574 count := spmetrics.GetViewCount(user)
575 bs, err := json.Marshal(streamplace.Livestream_ViewerCount{Count: int64(count), LexiconTypeID: "place.stream.livestream#viewerCount"})
576 if err != nil {
577 apierrors.WriteHTTPInternalServerError(w, "could not marshal view count", err)
578 return
579 }
580 w.Write(bs)
581 }
582}
583
584func (a *StreamplaceAPI) HandleBlueskyResolve(ctx context.Context) httprouter.Handle {
585 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
586 log.Log(ctx, "got bluesky notification", "params", params)
587 key, err := a.ATSync.SyncBlueskyRepo(ctx, params.ByName("handle"), a.Model)
588 if err != nil {
589 apierrors.WriteHTTPInternalServerError(w, "could not resolve streamplace key", err)
590 return
591 }
592 signingKeys, err := a.Model.GetSigningKeysForRepo(key.DID)
593 if err != nil {
594 apierrors.WriteHTTPInternalServerError(w, "could not get signing keys", err)
595 return
596 }
597 bs, err := json.Marshal(signingKeys)
598 if err != nil {
599 apierrors.WriteHTTPInternalServerError(w, "could not marshal signing keys", err)
600 return
601 }
602 w.Write(bs)
603 }
604}
605
606type ChatResponse struct {
607 Post *bsky.FeedPost `json:"post"`
608 Repo *model.Repo `json:"repo"`
609 CID string `json:"cid"`
610}
611
612func (a *StreamplaceAPI) HandleChat(ctx context.Context) httprouter.Handle {
613 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
614 user := params.ByName("repoDID")
615 if user == "" {
616 apierrors.WriteHTTPBadRequest(w, "user required", nil)
617 return
618 }
619 repoDID, err := a.NormalizeUser(ctx, user)
620 if err != nil {
621 apierrors.WriteHTTPNotFound(w, "user not found", err)
622 return
623 }
624 replies, err := a.Model.GetReplies(repoDID)
625 if err != nil {
626 apierrors.WriteHTTPInternalServerError(w, "could not get replies", err)
627 return
628 }
629 bs, err := json.Marshal(replies)
630 if err != nil {
631 apierrors.WriteHTTPInternalServerError(w, "could not marshal replies", err)
632 return
633 }
634 w.Header().Set("Content-Type", "application/json")
635 w.Write(bs)
636 }
637}
638
639func (a *StreamplaceAPI) HandleLivestream(ctx context.Context) httprouter.Handle {
640 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
641 user := params.ByName("repoDID")
642 if user == "" {
643 apierrors.WriteHTTPBadRequest(w, "user required", nil)
644 return
645 }
646 repoDID, err := a.NormalizeUser(ctx, user)
647 if err != nil {
648 apierrors.WriteHTTPNotFound(w, "user not found", err)
649 return
650 }
651 livestream, err := a.Model.GetLatestLivestreamForRepo(repoDID)
652 if err != nil {
653 apierrors.WriteHTTPInternalServerError(w, "could not get livestream", err)
654 return
655 }
656
657 doc, err := livestream.ToLivestreamView()
658 if err != nil {
659 apierrors.WriteHTTPInternalServerError(w, "could not marshal livestream", err)
660 return
661 }
662
663 if livestream == nil {
664 apierrors.WriteHTTPNotFound(w, "no livestream found", nil)
665 return
666 }
667
668 bs, err := json.Marshal(doc)
669 if err != nil {
670 apierrors.WriteHTTPInternalServerError(w, "could not marshal livestream", err)
671 return
672 }
673 w.Header().Set("Content-Type", "application/json")
674 w.Write(bs)
675 }
676}
677
678func (a *StreamplaceAPI) RateLimitMiddleware(ctx context.Context) func(http.Handler) http.Handler {
679 return func(next http.Handler) http.Handler {
680 return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
681 ip, _, err := net.SplitHostPort(req.RemoteAddr)
682 if err != nil {
683 ip = req.RemoteAddr
684 }
685
686 if a.CLI.RateLimitPerSecond > 0 {
687 limiter := a.getLimiter(ip)
688
689 if !limiter.Allow() {
690 log.Warn(ctx, "rate limit exceeded", "ip", ip, "path", req.URL.Path)
691 apierrors.WriteHTTPTooManyRequests(w, "rate limit exceeded")
692 return
693 }
694 }
695
696 next.ServeHTTP(w, req)
697 })
698 }
699}
700
701func (a *StreamplaceAPI) ServeHTTP(ctx context.Context) error {
702 handler, err := a.Handler(ctx)
703 if err != nil {
704 return err
705 }
706 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error {
707 s.Addr = a.CLI.HttpAddr
708 log.Log(ctx, "http server starting", "addr", s.Addr)
709 return s.ListenAndServe()
710 })
711}
712
713func (a *StreamplaceAPI) ServeHTTPRedirect(ctx context.Context) error {
714 handler, err := a.RedirectHandler(ctx)
715 if err != nil {
716 return err
717 }
718 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error {
719 s.Addr = a.CLI.HttpAddr
720 log.Log(ctx, "http tls redirecct server starting", "addr", s.Addr)
721 return s.ListenAndServe()
722 })
723}
724
725func (a *StreamplaceAPI) ServeHTTPS(ctx context.Context) error {
726 handler, err := a.Handler(ctx)
727 if err != nil {
728 return err
729 }
730 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error {
731 s.Addr = a.CLI.HttpsAddr
732 log.Log(ctx, "https server starting",
733 "addr", s.Addr,
734 "certPath", a.CLI.TLSCertPath,
735 "keyPath", a.CLI.TLSKeyPath,
736 )
737 return s.ListenAndServeTLS(a.CLI.TLSCertPath, a.CLI.TLSKeyPath)
738 })
739}
740
741func (a *StreamplaceAPI) ServerWithShutdown(ctx context.Context, handler http.Handler, serve func(*http.Server) error) error {
742 ctx, cancel := context.WithCancel(ctx)
743 handler = gziphandler.GzipHandler(handler)
744 server := http.Server{Handler: handler}
745 var serveErr error
746 go func() {
747 serveErr = serve(&server)
748 cancel()
749 }()
750 <-ctx.Done()
751 if serveErr != nil {
752 return fmt.Errorf("error in http server: %w", serveErr)
753 }
754
755 ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
756 defer cancel()
757 return server.Shutdown(ctx)
758}
759
760func (a *StreamplaceAPI) HandleHealthz(ctx context.Context) http.HandlerFunc {
761 return func(w http.ResponseWriter, req *http.Request) {
762 w.WriteHeader(200)
763 }
764}
765
766func (a *StreamplaceAPI) getLimiter(ip string) *rate.Limiter {
767 a.limitersMu.Lock()
768 defer a.limitersMu.Unlock()
769
770 limiter, exists := a.limiters[ip]
771 if !exists {
772 limiter = rate.NewLimiter(rate.Limit(a.CLI.RateLimitPerSecond), a.CLI.RateLimitBurst)
773 a.limiters[ip] = limiter
774 }
775
776 return limiter
777}
778
779func NewWebsocketTracker(maxConns int) *WebsocketTracker {
780 return &WebsocketTracker{
781 connections: make(map[string]int),
782 maxConnsPerIP: maxConns,
783 }
784}
785
786func (t *WebsocketTracker) AddConnection(ip string) bool {
787 t.mu.Lock()
788 defer t.mu.Unlock()
789
790 count := t.connections[ip]
791
792 if count >= t.maxConnsPerIP {
793 return false
794 }
795
796 t.connections[ip] = count + 1
797 return true
798}
799
800func (t *WebsocketTracker) RemoveConnection(ip string) {
801 t.mu.Lock()
802 defer t.mu.Unlock()
803
804 count := t.connections[ip]
805 if count > 0 {
806 t.connections[ip] = count - 1
807 }
808
809 if t.connections[ip] == 0 {
810 delete(t.connections, ip)
811 }
812}