Live video on the AT Protocol
1package api
2
3import (
4 "context"
5 "encoding/json"
6 "errors"
7 "fmt"
8 "io"
9 "log/slog"
10 "net"
11 "net/http"
12 "net/http/httputil"
13 "net/url"
14 "os"
15 "strings"
16 "sync"
17 "time"
18
19 "github.com/NYTimes/gziphandler"
20 "github.com/bluesky-social/indigo/api/bsky"
21 "github.com/julienschmidt/httprouter"
22 "github.com/rs/cors"
23 sloghttp "github.com/samber/slog-http"
24 "golang.org/x/time/rate"
25
26 "stream.place/streamplace/js/app"
27 "stream.place/streamplace/pkg/atproto"
28 "stream.place/streamplace/pkg/bus"
29 "stream.place/streamplace/pkg/config"
30 "stream.place/streamplace/pkg/crypto/signers/eip712"
31 "stream.place/streamplace/pkg/director"
32 apierrors "stream.place/streamplace/pkg/errors"
33 "stream.place/streamplace/pkg/linking"
34 "stream.place/streamplace/pkg/log"
35 "stream.place/streamplace/pkg/media"
36 "stream.place/streamplace/pkg/mist/mistconfig"
37 "stream.place/streamplace/pkg/model"
38 "stream.place/streamplace/pkg/notifications"
39 "stream.place/streamplace/pkg/oproxy"
40 "stream.place/streamplace/pkg/spmetrics"
41 "stream.place/streamplace/pkg/spxrpc"
42 "stream.place/streamplace/pkg/streamplace"
43)
44
45type StreamplaceAPI struct {
46 CLI *config.CLI
47 Model model.Model
48 Updater *Updater
49 Signer *eip712.EIP712Signer
50 Mimes map[string]string
51 FirebaseNotifier notifications.FirebaseNotifier
52 MediaManager *media.MediaManager
53 MediaSigner media.MediaSigner
54 // not thread-safe yet
55 Aliases map[string]string
56 Bus *bus.Bus
57 ATSync *atproto.ATProtoSynchronizer
58 Director *director.Director
59
60 connTracker *WebsocketTracker
61
62 limiters map[string]*rate.Limiter
63 limitersMu sync.Mutex
64}
65
// WebsocketTracker counts open connections per IP address and enforces an
// upper bound on how many a single IP may hold at once.
type WebsocketTracker struct {
	// connections maps an IP to its current number of tracked connections.
	connections map[string]int
	// maxConnsPerIP is the limit checked by AddConnection.
	maxConnsPerIP int
	// mu guards connections.
	mu sync.RWMutex
}
71
72func MakeStreamplaceAPI(cli *config.CLI, mod model.Model, signer *eip712.EIP712Signer, noter notifications.FirebaseNotifier, mm *media.MediaManager, ms media.MediaSigner, bus *bus.Bus, atsync *atproto.ATProtoSynchronizer, d *director.Director) (*StreamplaceAPI, error) {
73 updater, err := PrepareUpdater(cli)
74 if err != nil {
75 return nil, err
76 }
77 a := &StreamplaceAPI{CLI: cli,
78 Model: mod,
79 Updater: updater,
80 Signer: signer,
81 FirebaseNotifier: noter,
82 MediaManager: mm,
83 MediaSigner: ms,
84 Aliases: map[string]string{},
85 Bus: bus,
86 ATSync: atsync,
87 Director: d,
88 connTracker: NewWebsocketTracker(cli.RateLimitWebsocket),
89 limiters: make(map[string]*rate.Limiter),
90 }
91 a.Mimes, err = updater.GetMimes()
92 if err != nil {
93 return nil, err
94 }
95 return a, nil
96}
97
// AppHostingFS wraps an http.FileSystem so that a missing file is reported
// with the ErrorIndex sentinel instead of the underlying not-exist error.
type AppHostingFS struct {
	http.FileSystem
}

// ErrorIndex is returned by AppHostingFS.Open when the requested file does not
// exist, signalling that index.html should be served instead.
var ErrorIndex = errors.New("not found, use index.html")

// Open opens name in the wrapped file system. A not-exist error is translated
// to ErrorIndex; any other error is passed through unchanged.
func (fs AppHostingFS) Open(name string) (http.File, error) {
	opened, err := fs.FileSystem.Open(name)
	switch {
	case err == nil:
		return opened, nil
	case errors.Is(err, os.ErrNotExist):
		return nil, ErrorIndex
	default:
		return nil, err
	}
}
115
// Example playback URLs served by the routes registered in Handler:
// api/playback/iame.li/webrtc?rendition=source
// api/playback/iame.li/stream.mp4?rendition=source
// api/playback/iame.li/stream.webm?rendition=source
// api/playback/iame.li/hls/index.m3u8
// api/playback/iame.li/hls/source/stream.m3u8
// api/playback/iame.li/hls/source/000000000000.ts
122
123func (a *StreamplaceAPI) Handler(ctx context.Context) (http.Handler, error) {
124 var xrpc http.Handler
125 xrpc, err := spxrpc.NewServer(a.CLI, a.Model)
126 if err != nil {
127 return nil, err
128 }
129 op := oproxy.New(&oproxy.Config{
130 Host: a.CLI.PublicHost,
131 CreateOAuthSession: a.Model.CreateOAuthSession,
132 UpdateOAuthSession: a.Model.UpdateOAuthSession,
133 LoadOAuthSession: a.Model.LoadOAuthSession,
134 Scope: "atproto transition:generic",
135 UpstreamJWK: a.CLI.JWK,
136 DownstreamJWK: a.CLI.AccessJWK,
137 })
138
139 xrpc = op.OAuthMiddleware(xrpc)
140 router := httprouter.New()
141 router.Handler("GET", "/oauth/*anything", op.Handler())
142 router.Handler("POST", "/oauth/*anything", op.Handler())
143 router.Handler("GET", "/.well-known/oauth-authorization-server", op.Handler())
144 router.Handler("GET", "/.well-known/oauth-protected-resource", op.Handler())
145 apiRouter := httprouter.New()
146 apiRouter.HandlerFunc("POST", "/api/notification", a.HandleNotification(ctx))
147 // old clients
148 router.HandlerFunc("GET", "/app-updates", a.HandleAppUpdates(ctx))
149
150 // new ones
151 apiRouter.HandlerFunc("GET", "/api/manifest", a.HandleAppUpdates(ctx))
152 apiRouter.GET("/api/desktop-updates/:platform/:architecture/:version/:buildTime/:file", a.HandleDesktopUpdates(ctx))
153 apiRouter.POST("/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s"))
154 apiRouter.OPTIONS("/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s"))
155 apiRouter.DELETE("/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s"))
156 apiRouter.Handler("POST", "/api/segment", a.HandleSegment(ctx))
157 apiRouter.HandlerFunc("GET", "/api/healthz", a.HandleHealthz(ctx))
158 apiRouter.GET("/api/playback/:user/hls/*file", a.HandleHLSPlayback(ctx))
159 apiRouter.GET("/api/playback/:user/stream.mp4", a.HandleMP4Playback(ctx))
160 apiRouter.GET("/api/playback/:user/stream.webm", a.HandleMKVPlayback(ctx))
161 // they're, uh, not jpegs. but we used this once and i don't wanna break backwards compatibility
162 apiRouter.GET("/api/playback/:user/stream.jpg", a.HandleThumbnailPlayback(ctx))
163 // this one is not a lie
164 apiRouter.GET("/api/playback/:user/stream.png", a.HandleThumbnailPlayback(ctx))
165 apiRouter.GET("/api/app-return/*anything", a.HandleAppReturn(ctx))
166 apiRouter.POST("/api/playback/:user/webrtc", a.HandleWebRTCPlayback(ctx))
167 apiRouter.POST("/api/ingest/webrtc", a.HandleWebRTCIngest(ctx))
168 apiRouter.POST("/api/ingest/webrtc/:key", a.HandleWebRTCIngest(ctx))
169 apiRouter.POST("/api/player-event", a.HandlePlayerEvent(ctx))
170 apiRouter.GET("/api/chat/:repoDID", a.HandleChat(ctx))
171 apiRouter.GET("/api/websocket/:repoDID", a.HandleWebsocket(ctx))
172 apiRouter.GET("/api/livestream/:repoDID", a.HandleLivestream(ctx))
173 apiRouter.GET("/api/segment/recent", a.HandleRecentSegments(ctx))
174 apiRouter.GET("/api/segment/recent/:repoDID", a.HandleUserRecentSegments(ctx))
175 apiRouter.GET("/api/bluesky/resolve/:handle", a.HandleBlueskyResolve(ctx))
176 apiRouter.GET("/api/live-users", a.HandleLiveUsers(ctx))
177 apiRouter.GET("/api/view-count/:user", a.HandleViewCount(ctx))
178 apiRouter.NotFound = a.HandleAPI404(ctx)
179 apiRouterHandler := a.RateLimitMiddleware(ctx)(apiRouter)
180 xrpcHandler := a.RateLimitMiddleware(ctx)(xrpc)
181 router.Handler("GET", "/api/*resource", apiRouterHandler)
182 router.Handler("POST", "/api/*resource", apiRouterHandler)
183 router.Handler("PUT", "/api/*resource", apiRouterHandler)
184 router.Handler("PATCH", "/api/*resource", apiRouterHandler)
185 router.Handler("DELETE", "/api/*resource", apiRouterHandler)
186 router.Handler("GET", "/xrpc/*resource", xrpcHandler)
187 router.Handler("POST", "/xrpc/*resource", xrpcHandler)
188 router.Handler("PUT", "/xrpc/*resource", xrpcHandler)
189 router.Handler("PATCH", "/xrpc/*resource", xrpcHandler)
190 router.Handler("DELETE", "/xrpc/*resource", xrpcHandler)
191 router.GET("/.well-known/did.json", a.HandleDidJson(ctx))
192 router.GET("/dl/*params", a.HandleAppDownload(ctx))
193 router.POST("/", a.HandleWebRTCIngest(ctx))
194 for _, redirect := range a.CLI.Redirects {
195 parts := strings.Split(redirect, ":")
196 if len(parts) != 2 {
197 log.Error(ctx, "invalid redirect", "redirect", redirect)
198 return nil, fmt.Errorf("invalid redirect: %s", redirect)
199 }
200 router.Handle("GET", parts[0], func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
201 http.Redirect(w, r, parts[1], http.StatusTemporaryRedirect)
202 })
203 }
204 if a.CLI.FrontendProxy != "" {
205 u, err := url.Parse(a.CLI.FrontendProxy)
206 if err != nil {
207 return nil, err
208 }
209 log.Warn(ctx, "using frontend proxy instead of bundled frontend", "destination", a.CLI.FrontendProxy)
210 router.NotFound = &httputil.ReverseProxy{
211 Rewrite: func(r *httputil.ProxyRequest) {
212 // workaround for Expo disliking serving requests from 127.0.0.1 instead of localhost
213 // we need to use 127.0.0.1 because the atproto oauth client requires it
214 r.Out.Header.Set("Origin", u.String())
215 r.SetURL(u)
216 },
217 }
218 } else {
219 files, err := app.Files()
220 if err != nil {
221 return nil, err
222 }
223 index, err := files.Open("index.html")
224 if err != nil {
225 return nil, err
226 }
227 bs, err := io.ReadAll(index)
228 if err != nil {
229 return nil, err
230 }
231 linker, err := linking.NewLinker(ctx, bs)
232 if err != nil {
233 return nil, err
234 }
235 linkingHandler, err := a.NotFoundLinkingHandler(ctx, linker)
236 if err != nil {
237 return nil, err
238 }
239 router.NotFound = linkingHandler
240 }
241 // needed because the WebRTC handler issues 405s from / otherwise
242 router.GET("/", func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
243 router.NotFound.ServeHTTP(w, r)
244 })
245 handler := sloghttp.Recovery(router)
246 handler = cors.AllowAll().Handler(handler)
247 handler = sloghttp.New(slog.Default())(handler)
248 handler = a.RateLimitMiddleware(ctx)(handler)
249
250 return handler, nil
251}
252
// copyHeader appends every value in src to dst, skipping keys that start with
// "Access-Control" because CORS headers are set by this server itself.
func copyHeader(dst, src http.Header) {
	for name, values := range src {
		if !strings.HasPrefix(name, "Access-Control") {
			for i := range values {
				dst.Add(name, values[i])
			}
		}
	}
}
264
265// handler that takes care of static files and otherwise returns the index.html with the correct link card data
266func (a *StreamplaceAPI) NotFoundLinkingHandler(ctx context.Context, linker *linking.Linker) (http.HandlerFunc, error) {
267 files, err := app.Files()
268 if err != nil {
269 return nil, err
270 }
271 fs := AppHostingFS{http.FS(files)}
272
273 fileHandler := a.FileHandler(ctx, http.FileServer(fs))
274 defaultHandler := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
275 f := strings.TrimPrefix(req.URL.Path, "/")
276 // under docs we need the index.html suffix due to astro rendering
277 if strings.HasPrefix(req.URL.Path, "/docs") && strings.HasSuffix(req.URL.Path, "/") {
278 f += "index.html"
279 }
280 _, err := fs.Open(f)
281 if err == nil {
282 fileHandler.ServeHTTP(w, req)
283 return
284 }
285 if errors.Is(err, ErrorIndex) || f == "" {
286 bs, err := linker.GenerateDefaultCard(ctx, req.URL)
287 if err != nil {
288 log.Error(ctx, "error generating default card", "error", err)
289 }
290 w.Header().Set("Content-Type", "text/html")
291 w.Write(bs)
292 } else {
293 log.Warn(ctx, "error opening file", "error", err)
294 apierrors.WriteHTTPInternalServerError(w, "file not found", err)
295 }
296 })
297 return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
298 proto := "http"
299 if req.TLS != nil {
300 proto = "https"
301 }
302 fwProto := req.Header.Get("x-forwarded-proto")
303 if fwProto != "" {
304 proto = fwProto
305 }
306 req.URL.Host = req.Host
307 req.URL.Scheme = proto
308 maybeHandle := strings.TrimPrefix(req.URL.Path, "/")
309 repo, err := a.Model.GetRepoByHandleOrDID(maybeHandle)
310 if err != nil || repo == nil {
311 log.Error(ctx, "no repo found", "maybeHandle", maybeHandle)
312 defaultHandler.ServeHTTP(w, req)
313 return
314 }
315 ls, err := a.Model.GetLatestLivestreamForRepo(repo.DID)
316 if err != nil || ls == nil {
317 log.Error(ctx, "no livestream found", "repoDID", repo.DID)
318 defaultHandler.ServeHTTP(w, req)
319 return
320 }
321 lsv, err := ls.ToLivestreamView()
322 if err != nil || lsv == nil {
323 log.Error(ctx, "no livestream view found", "repoDID", repo.DID)
324 defaultHandler.ServeHTTP(w, req)
325 return
326 }
327 bs, err := linker.GenerateStreamerCard(ctx, req.URL, lsv)
328 if err != nil {
329 log.Error(ctx, "error generating html", "error", err)
330 defaultHandler.ServeHTTP(w, req)
331 return
332 }
333 w.Header().Set("Content-Type", "text/html")
334 w.Write(bs)
335 }), nil
336}
337
338func (a *StreamplaceAPI) MistProxyHandler(ctx context.Context, tmpl string) httprouter.Handle {
339 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
340 if !a.CLI.HasMist() {
341 apierrors.WriteHTTPNotImplemented(w, "Playback only on the Linux version for now", nil)
342 return
343 }
344 stream := params.ByName("stream")
345 if stream == "" {
346 apierrors.WriteHTTPBadRequest(w, "missing stream in request", nil)
347 return
348 }
349
350 fullstream := fmt.Sprintf("%s+%s", mistconfig.STREAM_NAME, stream)
351 prefix := fmt.Sprintf(tmpl, fullstream)
352 resource := params.ByName("resource")
353
354 // path := strings.TrimPrefix(req.URL.EscapedPath(), "/api")
355
356 client := &http.Client{}
357 req.URL = &url.URL{
358 Scheme: "http",
359 Host: fmt.Sprintf("127.0.0.1:%d", a.CLI.MistHTTPPort),
360 Path: fmt.Sprintf("%s%s", prefix, resource),
361 RawQuery: req.URL.RawQuery,
362 }
363
364 //http: Request.RequestURI can't be set in client requests.
365 //http://golang.org/src/pkg/net/http/client.go
366 req.RequestURI = ""
367
368 resp, err := client.Do(req)
369 if err != nil {
370 apierrors.WriteHTTPInternalServerError(w, "error connecting to mist", err)
371 return
372 }
373 defer resp.Body.Close()
374
375 copyHeader(w.Header(), resp.Header)
376 w.WriteHeader(resp.StatusCode)
377 io.Copy(w, resp.Body)
378 }
379}
380
381func (a *StreamplaceAPI) FileHandler(ctx context.Context, fs http.Handler) http.HandlerFunc {
382 return func(w http.ResponseWriter, req *http.Request) {
383 noslash := req.URL.Path[1:]
384 ct, ok := a.Mimes[noslash]
385 if ok {
386 w.Header().Set("content-type", ct)
387 }
388 fs.ServeHTTP(w, req)
389 }
390}
391
392func (a *StreamplaceAPI) RedirectHandler(ctx context.Context) (http.Handler, error) {
393 _, tlsPort, err := net.SplitHostPort(a.CLI.HttpsAddr)
394 if err != nil {
395 return nil, err
396 }
397 handleRedirect := func(w http.ResponseWriter, req *http.Request) {
398 host, _, err := net.SplitHostPort(req.Host)
399 if err != nil {
400 host = req.Host
401 }
402 u := req.URL
403 if tlsPort == "443" {
404 u.Host = host
405 } else {
406 u.Host = net.JoinHostPort(host, tlsPort)
407 }
408 u.Scheme = "https"
409 http.Redirect(w, req, u.String(), http.StatusTemporaryRedirect)
410 }
411 mux := http.NewServeMux()
412 mux.HandleFunc("/", handleRedirect)
413 return mux, nil
414}
415
// NotificationPayload is the JSON request body decoded by HandleNotification.
type NotificationPayload struct {
	// Token is sent under the JSON key "token".
	Token string `json:"token"`
	// RepoDID is sent under the JSON key "repoDID"; it may be empty.
	RepoDID string `json:"repoDID"`
}
420
421func (a *StreamplaceAPI) HandleAPI404(ctx context.Context) http.HandlerFunc {
422 return func(w http.ResponseWriter, req *http.Request) {
423 w.WriteHeader(404)
424 }
425}
426
427func (a *StreamplaceAPI) HandleNotification(ctx context.Context) http.HandlerFunc {
428 return func(w http.ResponseWriter, req *http.Request) {
429 payload, err := io.ReadAll(req.Body)
430 if err != nil {
431 log.Log(ctx, "error reading notification create", "error", err)
432 w.WriteHeader(400)
433 return
434 }
435 n := NotificationPayload{}
436 err = json.Unmarshal(payload, &n)
437 if err != nil {
438 log.Log(ctx, "error unmarshalling notification create", "error", err)
439 w.WriteHeader(400)
440 return
441 }
442 err = a.Model.CreateNotification(n.Token, n.RepoDID)
443 if err != nil {
444 log.Log(ctx, "error creating notification", "error", err)
445 w.WriteHeader(400)
446 return
447 }
448 log.Log(ctx, "successfully created notification", "token", n.Token)
449 w.WriteHeader(200)
450 if n.RepoDID != "" {
451 go func() {
452 _, err := a.ATSync.SyncBlueskyRepo(ctx, n.RepoDID, a.Model)
453 if err != nil {
454 log.Error(ctx, "error syncing bluesky repo after notification creation", "error", err)
455 }
456 }()
457 }
458 }
459}
460
461func (a *StreamplaceAPI) HandleSegment(ctx context.Context) http.HandlerFunc {
462 return func(w http.ResponseWriter, req *http.Request) {
463 err := a.MediaManager.ValidateMP4(ctx, req.Body)
464 if err != nil {
465 apierrors.WriteHTTPBadRequest(w, "could not ingest segment", err)
466 return
467 }
468 w.WriteHeader(200)
469 }
470}
471
472func (a *StreamplaceAPI) HandlePlayerEvent(ctx context.Context) httprouter.Handle {
473 return func(w http.ResponseWriter, req *http.Request, p httprouter.Params) {
474 var event model.PlayerEventAPI
475 if err := json.NewDecoder(req.Body).Decode(&event); err != nil {
476 apierrors.WriteHTTPBadRequest(w, "could not decode JSON body", err)
477 return
478 }
479 err := a.Model.CreatePlayerEvent(event)
480 if err != nil {
481 apierrors.WriteHTTPBadRequest(w, "could not create player event", err)
482 return
483 }
484 w.WriteHeader(201)
485 }
486}
487
488func (a *StreamplaceAPI) HandleRecentSegments(ctx context.Context) httprouter.Handle {
489 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
490 segs, err := a.Model.MostRecentSegments()
491 if err != nil {
492 apierrors.WriteHTTPInternalServerError(w, "could not get segments", err)
493 return
494 }
495 bs, err := json.Marshal(segs)
496 if err != nil {
497 apierrors.WriteHTTPInternalServerError(w, "could not marshal segments", err)
498 return
499 }
500 w.Header().Add("Content-Type", "application/json")
501 w.Write(bs)
502 }
503}
504
505func (a *StreamplaceAPI) HandleUserRecentSegments(ctx context.Context) httprouter.Handle {
506 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
507 user := params.ByName("repoDID")
508 if user == "" {
509 apierrors.WriteHTTPBadRequest(w, "user required", nil)
510 return
511 }
512 user, err := a.NormalizeUser(ctx, user)
513 if err != nil {
514 apierrors.WriteHTTPNotFound(w, "user not found", err)
515 return
516 }
517 seg, err := a.Model.LatestSegmentForUser(user)
518 if err != nil {
519 apierrors.WriteHTTPInternalServerError(w, "could not get segments", err)
520 return
521 }
522 streamplaceSeg, err := seg.ToStreamplaceSegment()
523 if err != nil {
524 apierrors.WriteHTTPInternalServerError(w, "could not convert segment to streamplace segment", err)
525 return
526 }
527 bs, err := json.Marshal(streamplaceSeg)
528 if err != nil {
529 apierrors.WriteHTTPInternalServerError(w, "could not marshal segments", err)
530 return
531 }
532 w.Header().Add("Content-Type", "application/json")
533 w.Write(bs)
534 }
535}
536
537type LiveUsersResponse struct {
538 model.Segment
539 Viewers int `json:"viewers"`
540}
541
542func (a *StreamplaceAPI) HandleLiveUsers(ctx context.Context) httprouter.Handle {
543 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
544 repos, err := a.Model.MostRecentSegments()
545 if err != nil {
546 apierrors.WriteHTTPInternalServerError(w, "could not get live users", err)
547 return
548 }
549 liveUsers := []LiveUsersResponse{}
550 for _, repo := range repos {
551 viewers := spmetrics.GetViewCount(repo.RepoDID)
552 liveUsers = append(liveUsers, LiveUsersResponse{
553 Segment: repo,
554 Viewers: viewers,
555 })
556 }
557 bs, err := json.Marshal(liveUsers)
558 if err != nil {
559 apierrors.WriteHTTPInternalServerError(w, "could not marshal live users", err)
560 return
561 }
562 w.Write(bs)
563 }
564}
565
566func (a *StreamplaceAPI) HandleViewCount(ctx context.Context) httprouter.Handle {
567 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
568 user := params.ByName("user")
569 if user == "" {
570 apierrors.WriteHTTPBadRequest(w, "user required", nil)
571 return
572 }
573 user, err := a.NormalizeUser(ctx, user)
574 if err != nil {
575 apierrors.WriteHTTPNotFound(w, "user not found", err)
576 return
577 }
578 count := spmetrics.GetViewCount(user)
579 bs, err := json.Marshal(streamplace.Livestream_ViewerCount{Count: int64(count), LexiconTypeID: "place.stream.livestream#viewerCount"})
580 if err != nil {
581 apierrors.WriteHTTPInternalServerError(w, "could not marshal view count", err)
582 return
583 }
584 w.Write(bs)
585 }
586}
587
588func (a *StreamplaceAPI) HandleBlueskyResolve(ctx context.Context) httprouter.Handle {
589 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
590 log.Log(ctx, "got bluesky notification", "params", params)
591 key, err := a.ATSync.SyncBlueskyRepo(ctx, params.ByName("handle"), a.Model)
592 if err != nil {
593 apierrors.WriteHTTPInternalServerError(w, "could not resolve streamplace key", err)
594 return
595 }
596 signingKeys, err := a.Model.GetSigningKeysForRepo(key.DID)
597 if err != nil {
598 apierrors.WriteHTTPInternalServerError(w, "could not get signing keys", err)
599 return
600 }
601 bs, err := json.Marshal(signingKeys)
602 if err != nil {
603 apierrors.WriteHTTPInternalServerError(w, "could not marshal signing keys", err)
604 return
605 }
606 w.Write(bs)
607 }
608}
609
610type ChatResponse struct {
611 Post *bsky.FeedPost `json:"post"`
612 Repo *model.Repo `json:"repo"`
613 CID string `json:"cid"`
614}
615
616func (a *StreamplaceAPI) HandleChat(ctx context.Context) httprouter.Handle {
617 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
618 user := params.ByName("repoDID")
619 if user == "" {
620 apierrors.WriteHTTPBadRequest(w, "user required", nil)
621 return
622 }
623 repoDID, err := a.NormalizeUser(ctx, user)
624 if err != nil {
625 apierrors.WriteHTTPNotFound(w, "user not found", err)
626 return
627 }
628 replies, err := a.Model.GetReplies(repoDID)
629 if err != nil {
630 apierrors.WriteHTTPInternalServerError(w, "could not get replies", err)
631 return
632 }
633 bs, err := json.Marshal(replies)
634 if err != nil {
635 apierrors.WriteHTTPInternalServerError(w, "could not marshal replies", err)
636 return
637 }
638 w.Header().Set("Content-Type", "application/json")
639 w.Write(bs)
640 }
641}
642
643func (a *StreamplaceAPI) HandleLivestream(ctx context.Context) httprouter.Handle {
644 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
645 user := params.ByName("repoDID")
646 if user == "" {
647 apierrors.WriteHTTPBadRequest(w, "user required", nil)
648 return
649 }
650 repoDID, err := a.NormalizeUser(ctx, user)
651 if err != nil {
652 apierrors.WriteHTTPNotFound(w, "user not found", err)
653 return
654 }
655 livestream, err := a.Model.GetLatestLivestreamForRepo(repoDID)
656 if err != nil {
657 apierrors.WriteHTTPInternalServerError(w, "could not get livestream", err)
658 return
659 }
660
661 doc, err := livestream.ToLivestreamView()
662 if err != nil {
663 apierrors.WriteHTTPInternalServerError(w, "could not marshal livestream", err)
664 return
665 }
666
667 if livestream == nil {
668 apierrors.WriteHTTPNotFound(w, "no livestream found", nil)
669 return
670 }
671
672 bs, err := json.Marshal(doc)
673 if err != nil {
674 apierrors.WriteHTTPInternalServerError(w, "could not marshal livestream", err)
675 return
676 }
677 w.Header().Set("Content-Type", "application/json")
678 w.Write(bs)
679 }
680}
681
682func (a *StreamplaceAPI) RateLimitMiddleware(ctx context.Context) func(http.Handler) http.Handler {
683 return func(next http.Handler) http.Handler {
684 return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
685 ip, _, err := net.SplitHostPort(req.RemoteAddr)
686 if err != nil {
687 ip = req.RemoteAddr
688 }
689
690 if a.CLI.RateLimitPerSecond > 0 {
691 limiter := a.getLimiter(ip)
692
693 if !limiter.Allow() {
694 log.Warn(ctx, "rate limit exceeded", "ip", ip, "path", req.URL.Path)
695 apierrors.WriteHTTPTooManyRequests(w, "rate limit exceeded")
696 return
697 }
698 }
699
700 next.ServeHTTP(w, req)
701 })
702 }
703}
704
705func (a *StreamplaceAPI) ServeHTTP(ctx context.Context) error {
706 handler, err := a.Handler(ctx)
707 if err != nil {
708 return err
709 }
710 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error {
711 s.Addr = a.CLI.HttpAddr
712 log.Log(ctx, "http server starting", "addr", s.Addr)
713 return s.ListenAndServe()
714 })
715}
716
717func (a *StreamplaceAPI) ServeHTTPRedirect(ctx context.Context) error {
718 handler, err := a.RedirectHandler(ctx)
719 if err != nil {
720 return err
721 }
722 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error {
723 s.Addr = a.CLI.HttpAddr
724 log.Log(ctx, "http tls redirecct server starting", "addr", s.Addr)
725 return s.ListenAndServe()
726 })
727}
728
729func (a *StreamplaceAPI) ServeHTTPS(ctx context.Context) error {
730 handler, err := a.Handler(ctx)
731 if err != nil {
732 return err
733 }
734 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error {
735 s.Addr = a.CLI.HttpsAddr
736 log.Log(ctx, "https server starting",
737 "addr", s.Addr,
738 "certPath", a.CLI.TLSCertPath,
739 "keyPath", a.CLI.TLSKeyPath,
740 )
741 return s.ListenAndServeTLS(a.CLI.TLSCertPath, a.CLI.TLSKeyPath)
742 })
743}
744
745func (a *StreamplaceAPI) ServerWithShutdown(ctx context.Context, handler http.Handler, serve func(*http.Server) error) error {
746 ctx, cancel := context.WithCancel(ctx)
747 handler = gziphandler.GzipHandler(handler)
748 server := http.Server{Handler: handler}
749 var serveErr error
750 go func() {
751 serveErr = serve(&server)
752 cancel()
753 }()
754 <-ctx.Done()
755 if serveErr != nil {
756 return fmt.Errorf("error in http server: %w", serveErr)
757 }
758
759 ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
760 defer cancel()
761 return server.Shutdown(ctx)
762}
763
764func (a *StreamplaceAPI) HandleHealthz(ctx context.Context) http.HandlerFunc {
765 return func(w http.ResponseWriter, req *http.Request) {
766 w.WriteHeader(200)
767 }
768}
769
770func (a *StreamplaceAPI) getLimiter(ip string) *rate.Limiter {
771 a.limitersMu.Lock()
772 defer a.limitersMu.Unlock()
773
774 limiter, exists := a.limiters[ip]
775 if !exists {
776 limiter = rate.NewLimiter(rate.Limit(a.CLI.RateLimitPerSecond), a.CLI.RateLimitBurst)
777 a.limiters[ip] = limiter
778 }
779
780 return limiter
781}
782
783func NewWebsocketTracker(maxConns int) *WebsocketTracker {
784 return &WebsocketTracker{
785 connections: make(map[string]int),
786 maxConnsPerIP: maxConns,
787 }
788}
789
790func (t *WebsocketTracker) AddConnection(ip string) bool {
791 t.mu.Lock()
792 defer t.mu.Unlock()
793
794 count := t.connections[ip]
795
796 if count >= t.maxConnsPerIP {
797 return false
798 }
799
800 t.connections[ip] = count + 1
801 return true
802}
803
804func (t *WebsocketTracker) RemoveConnection(ip string) {
805 t.mu.Lock()
806 defer t.mu.Unlock()
807
808 count := t.connections[ip]
809 if count > 0 {
810 t.connections[ip] = count - 1
811 }
812
813 if t.connections[ip] == 0 {
814 delete(t.connections, ip)
815 }
816}