Live video on the AT Protocol
1package api
2
3import (
4 "context"
5 "encoding/json"
6 "errors"
7 "fmt"
8 "io"
9 "log/slog"
10 "net"
11 "net/http"
12 "net/http/httputil"
13 "net/url"
14 "os"
15 "strings"
16 "sync"
17 "time"
18
19 "github.com/NYTimes/gziphandler"
20 "github.com/bluesky-social/indigo/api/bsky"
21 "github.com/julienschmidt/httprouter"
22 "github.com/rs/cors"
23 sloghttp "github.com/samber/slog-http"
24 "golang.org/x/time/rate"
25
26 "stream.place/streamplace/js/app"
27 "stream.place/streamplace/pkg/atproto"
28 "stream.place/streamplace/pkg/bus"
29 "stream.place/streamplace/pkg/config"
30 "stream.place/streamplace/pkg/crypto/signers/eip712"
31 "stream.place/streamplace/pkg/director"
32 apierrors "stream.place/streamplace/pkg/errors"
33 "stream.place/streamplace/pkg/linking"
34 "stream.place/streamplace/pkg/log"
35 "stream.place/streamplace/pkg/media"
36 "stream.place/streamplace/pkg/mist/mistconfig"
37 "stream.place/streamplace/pkg/model"
38 "stream.place/streamplace/pkg/notifications"
39 "stream.place/streamplace/pkg/oproxy"
40 "stream.place/streamplace/pkg/spmetrics"
41 "stream.place/streamplace/pkg/spxrpc"
42 "stream.place/streamplace/pkg/streamplace"
43)
44
45type StreamplaceAPI struct {
46 CLI *config.CLI
47 Model model.Model
48 Updater *Updater
49 Signer *eip712.EIP712Signer
50 Mimes map[string]string
51 FirebaseNotifier notifications.FirebaseNotifier
52 MediaManager *media.MediaManager
53 MediaSigner media.MediaSigner
54 // not thread-safe yet
55 Aliases map[string]string
56 Bus *bus.Bus
57 ATSync *atproto.ATProtoSynchronizer
58 Director *director.Director
59
60 connTracker *WebsocketTracker
61
62 limiters map[string]*rate.Limiter
63 limitersMu sync.Mutex
64 SignerCache map[string]media.MediaSigner
65 SignerCacheMu sync.Mutex
66}
67
// WebsocketTracker counts open connections per client IP and enforces an
// upper bound on them. Use NewWebsocketTracker to obtain one with an
// initialised map.
type WebsocketTracker struct {
	// connections maps an IP to its current connection count.
	connections map[string]int
	// maxConnsPerIP is the count at which AddConnection starts refusing.
	maxConnsPerIP int
	// mu guards connections.
	mu sync.RWMutex
}
73
74func MakeStreamplaceAPI(cli *config.CLI, mod model.Model, signer *eip712.EIP712Signer, noter notifications.FirebaseNotifier, mm *media.MediaManager, ms media.MediaSigner, bus *bus.Bus, atsync *atproto.ATProtoSynchronizer, d *director.Director) (*StreamplaceAPI, error) {
75 updater, err := PrepareUpdater(cli)
76 if err != nil {
77 return nil, err
78 }
79 a := &StreamplaceAPI{CLI: cli,
80 Model: mod,
81 Updater: updater,
82 Signer: signer,
83 FirebaseNotifier: noter,
84 MediaManager: mm,
85 MediaSigner: ms,
86 Aliases: map[string]string{},
87 Bus: bus,
88 ATSync: atsync,
89 Director: d,
90 connTracker: NewWebsocketTracker(cli.RateLimitWebsocket),
91 limiters: make(map[string]*rate.Limiter),
92 SignerCache: make(map[string]media.MediaSigner),
93 }
94 a.Mimes, err = updater.GetMimes()
95 if err != nil {
96 return nil, err
97 }
98 return a, nil
99}
100
// AppHostingFS wraps an http.FileSystem so that missing files can be reported
// with the ErrorIndex sentinel (see its Open method).
type AppHostingFS struct {
	http.FileSystem
}
104
// ErrorIndex is returned by AppHostingFS.Open when the requested file does
// not exist, signalling that index.html should be served instead.
var ErrorIndex = errors.New("not found, use index.html")
106
107func (fs AppHostingFS) Open(name string) (http.File, error) {
108 file, err1 := fs.FileSystem.Open(name)
109 if err1 == nil {
110 return file, nil
111 }
112 if !errors.Is(err1, os.ErrNotExist) {
113 return nil, err1
114 }
115
116 return nil, ErrorIndex
117}
118
119// api/playback/iame.li/webrtc?rendition=source
120// api/playback/iame.li/stream.mp4?rendition=source
121// api/playback/iame.li/stream.webm?rendition=source
122// api/playback/iame.li/hls/index.m3u8
123// api/playback/iame.li/hls/source/stream.m3u8
124// api/playback/iame.li/hls/source/000000000000.ts
125
126func (a *StreamplaceAPI) Handler(ctx context.Context) (http.Handler, error) {
127 var xrpc http.Handler
128 xrpc, err := spxrpc.NewServer(a.CLI, a.Model)
129 if err != nil {
130 return nil, err
131 }
132 op := oproxy.New(&oproxy.Config{
133 Host: a.CLI.PublicHost,
134 CreateOAuthSession: a.Model.CreateOAuthSession,
135 UpdateOAuthSession: a.Model.UpdateOAuthSession,
136 LoadOAuthSession: a.Model.LoadOAuthSession,
137 Scope: "atproto transition:generic",
138 UpstreamJWK: a.CLI.JWK,
139 DownstreamJWK: a.CLI.AccessJWK,
140 })
141
142 xrpc = op.OAuthMiddleware(xrpc)
143 router := httprouter.New()
144 router.Handler("GET", "/oauth/*anything", op.Handler())
145 router.Handler("POST", "/oauth/*anything", op.Handler())
146 router.Handler("GET", "/.well-known/oauth-authorization-server", op.Handler())
147 router.Handler("GET", "/.well-known/oauth-protected-resource", op.Handler())
148 apiRouter := httprouter.New()
149 apiRouter.HandlerFunc("POST", "/api/notification", a.HandleNotification(ctx))
150 // old clients
151 router.HandlerFunc("GET", "/app-updates", a.HandleAppUpdates(ctx))
152
153 // new ones
154 apiRouter.HandlerFunc("GET", "/api/manifest", a.HandleAppUpdates(ctx))
155 apiRouter.GET("/api/desktop-updates/:platform/:architecture/:version/:buildTime/:file", a.HandleDesktopUpdates(ctx))
156 apiRouter.POST("/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s"))
157 apiRouter.OPTIONS("/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s"))
158 apiRouter.DELETE("/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s"))
159 apiRouter.Handler("POST", "/api/segment", a.HandleSegment(ctx))
160 apiRouter.HandlerFunc("GET", "/api/healthz", a.HandleHealthz(ctx))
161 apiRouter.GET("/api/playback/:user/hls/*file", a.HandleHLSPlayback(ctx))
162 apiRouter.GET("/api/playback/:user/stream.mp4", a.HandleMP4Playback(ctx))
163 apiRouter.GET("/api/playback/:user/stream.webm", a.HandleMKVPlayback(ctx))
164 // they're, uh, not jpegs. but we used this once and i don't wanna break backwards compatibility
165 apiRouter.GET("/api/playback/:user/stream.jpg", a.HandleThumbnailPlayback(ctx))
166 // this one is not a lie
167 apiRouter.GET("/api/playback/:user/stream.png", a.HandleThumbnailPlayback(ctx))
168 apiRouter.GET("/api/app-return/*anything", a.HandleAppReturn(ctx))
169 apiRouter.POST("/api/playback/:user/webrtc", a.HandleWebRTCPlayback(ctx))
170 apiRouter.POST("/api/ingest/webrtc", a.HandleWebRTCIngest(ctx))
171 apiRouter.POST("/api/ingest/webrtc/:key", a.HandleWebRTCIngest(ctx))
172 apiRouter.POST("/api/player-event", a.HandlePlayerEvent(ctx))
173 apiRouter.GET("/api/chat/:repoDID", a.HandleChat(ctx))
174 apiRouter.GET("/api/websocket/:repoDID", a.HandleWebsocket(ctx))
175 apiRouter.GET("/api/livestream/:repoDID", a.HandleLivestream(ctx))
176 apiRouter.GET("/api/segment/recent", a.HandleRecentSegments(ctx))
177 apiRouter.GET("/api/segment/recent/:repoDID", a.HandleUserRecentSegments(ctx))
178 apiRouter.GET("/api/bluesky/resolve/:handle", a.HandleBlueskyResolve(ctx))
179 apiRouter.GET("/api/live-users", a.HandleLiveUsers(ctx))
180 apiRouter.GET("/api/view-count/:user", a.HandleViewCount(ctx))
181 apiRouter.NotFound = a.HandleAPI404(ctx)
182 apiRouterHandler := a.RateLimitMiddleware(ctx)(apiRouter)
183 xrpcHandler := a.RateLimitMiddleware(ctx)(xrpc)
184 router.Handler("GET", "/api/*resource", apiRouterHandler)
185 router.Handler("POST", "/api/*resource", apiRouterHandler)
186 router.Handler("PUT", "/api/*resource", apiRouterHandler)
187 router.Handler("PATCH", "/api/*resource", apiRouterHandler)
188 router.Handler("DELETE", "/api/*resource", apiRouterHandler)
189 router.Handler("GET", "/xrpc/*resource", xrpcHandler)
190 router.Handler("POST", "/xrpc/*resource", xrpcHandler)
191 router.Handler("PUT", "/xrpc/*resource", xrpcHandler)
192 router.Handler("PATCH", "/xrpc/*resource", xrpcHandler)
193 router.Handler("DELETE", "/xrpc/*resource", xrpcHandler)
194 router.GET("/.well-known/did.json", a.HandleDidJson(ctx))
195 router.GET("/dl/*params", a.HandleAppDownload(ctx))
196 router.POST("/", a.HandleWebRTCIngest(ctx))
197 for _, redirect := range a.CLI.Redirects {
198 parts := strings.Split(redirect, ":")
199 if len(parts) != 2 {
200 log.Error(ctx, "invalid redirect", "redirect", redirect)
201 return nil, fmt.Errorf("invalid redirect: %s", redirect)
202 }
203 router.Handle("GET", parts[0], func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
204 http.Redirect(w, r, parts[1], http.StatusTemporaryRedirect)
205 })
206 }
207 if a.CLI.FrontendProxy != "" {
208 u, err := url.Parse(a.CLI.FrontendProxy)
209 if err != nil {
210 return nil, err
211 }
212 log.Warn(ctx, "using frontend proxy instead of bundled frontend", "destination", a.CLI.FrontendProxy)
213 router.NotFound = &httputil.ReverseProxy{
214 Rewrite: func(r *httputil.ProxyRequest) {
215 // workaround for Expo disliking serving requests from 127.0.0.1 instead of localhost
216 // we need to use 127.0.0.1 because the atproto oauth client requires it
217 r.Out.Header.Set("Origin", u.String())
218 r.SetURL(u)
219 },
220 }
221 } else {
222 files, err := app.Files()
223 if err != nil {
224 return nil, err
225 }
226 index, err := files.Open("index.html")
227 if err != nil {
228 return nil, err
229 }
230 bs, err := io.ReadAll(index)
231 if err != nil {
232 return nil, err
233 }
234 linker, err := linking.NewLinker(ctx, bs)
235 if err != nil {
236 return nil, err
237 }
238 linkingHandler, err := a.NotFoundLinkingHandler(ctx, linker)
239 if err != nil {
240 return nil, err
241 }
242 router.NotFound = linkingHandler
243 }
244 // needed because the WebRTC handler issues 405s from / otherwise
245 router.GET("/", func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
246 router.NotFound.ServeHTTP(w, r)
247 })
248 handler := sloghttp.Recovery(router)
249 handler = cors.AllowAll().Handler(handler)
250 handler = sloghttp.New(slog.Default())(handler)
251 handler = a.RateLimitMiddleware(ctx)(handler)
252
253 return handler, nil
254}
255
// copyHeader appends every header value in src to dst, except for headers
// whose name starts with "Access-Control" (CORS is handled separately).
// Values already present in dst are kept.
func copyHeader(dst, src http.Header) {
	for name, values := range src {
		// we'll handle CORS ourselves, thanks
		if !strings.HasPrefix(name, "Access-Control") {
			for i := range values {
				dst.Add(name, values[i])
			}
		}
	}
}
267
268// handler that takes care of static files and otherwise returns the index.html with the correct link card data
269func (a *StreamplaceAPI) NotFoundLinkingHandler(ctx context.Context, linker *linking.Linker) (http.HandlerFunc, error) {
270 files, err := app.Files()
271 if err != nil {
272 return nil, err
273 }
274 fs := AppHostingFS{http.FS(files)}
275
276 fileHandler := a.FileHandler(ctx, http.FileServer(fs))
277 defaultHandler := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
278 f := strings.TrimPrefix(req.URL.Path, "/")
279 // under docs we need the index.html suffix due to astro rendering
280 if strings.HasPrefix(req.URL.Path, "/docs") && strings.HasSuffix(req.URL.Path, "/") {
281 f += "index.html"
282 }
283 _, err := fs.Open(f)
284 if err == nil {
285 fileHandler.ServeHTTP(w, req)
286 return
287 }
288 if errors.Is(err, ErrorIndex) || f == "" {
289 bs, err := linker.GenerateDefaultCard(ctx, req.URL)
290 if err != nil {
291 log.Error(ctx, "error generating default card", "error", err)
292 }
293 w.Header().Set("Content-Type", "text/html")
294 w.Write(bs)
295 } else {
296 log.Warn(ctx, "error opening file", "error", err)
297 apierrors.WriteHTTPInternalServerError(w, "file not found", err)
298 }
299 })
300 return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
301 proto := "http"
302 if req.TLS != nil {
303 proto = "https"
304 }
305 fwProto := req.Header.Get("x-forwarded-proto")
306 if fwProto != "" {
307 proto = fwProto
308 }
309 req.URL.Host = req.Host
310 req.URL.Scheme = proto
311 maybeHandle := strings.TrimPrefix(req.URL.Path, "/")
312 repo, err := a.Model.GetRepoByHandleOrDID(maybeHandle)
313 if err != nil || repo == nil {
314 log.Error(ctx, "no repo found", "maybeHandle", maybeHandle)
315 defaultHandler.ServeHTTP(w, req)
316 return
317 }
318 ls, err := a.Model.GetLatestLivestreamForRepo(repo.DID)
319 if err != nil || ls == nil {
320 log.Error(ctx, "no livestream found", "repoDID", repo.DID)
321 defaultHandler.ServeHTTP(w, req)
322 return
323 }
324 lsv, err := ls.ToLivestreamView()
325 if err != nil || lsv == nil {
326 log.Error(ctx, "no livestream view found", "repoDID", repo.DID)
327 defaultHandler.ServeHTTP(w, req)
328 return
329 }
330 bs, err := linker.GenerateStreamerCard(ctx, req.URL, lsv)
331 if err != nil {
332 log.Error(ctx, "error generating html", "error", err)
333 defaultHandler.ServeHTTP(w, req)
334 return
335 }
336 w.Header().Set("Content-Type", "text/html")
337 w.Write(bs)
338 }), nil
339}
340
341func (a *StreamplaceAPI) MistProxyHandler(ctx context.Context, tmpl string) httprouter.Handle {
342 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
343 if !a.CLI.HasMist() {
344 apierrors.WriteHTTPNotImplemented(w, "Playback only on the Linux version for now", nil)
345 return
346 }
347 stream := params.ByName("stream")
348 if stream == "" {
349 apierrors.WriteHTTPBadRequest(w, "missing stream in request", nil)
350 return
351 }
352
353 fullstream := fmt.Sprintf("%s+%s", mistconfig.STREAM_NAME, stream)
354 prefix := fmt.Sprintf(tmpl, fullstream)
355 resource := params.ByName("resource")
356
357 // path := strings.TrimPrefix(req.URL.EscapedPath(), "/api")
358
359 client := &http.Client{}
360 req.URL = &url.URL{
361 Scheme: "http",
362 Host: fmt.Sprintf("127.0.0.1:%d", a.CLI.MistHTTPPort),
363 Path: fmt.Sprintf("%s%s", prefix, resource),
364 RawQuery: req.URL.RawQuery,
365 }
366
367 //http: Request.RequestURI can't be set in client requests.
368 //http://golang.org/src/pkg/net/http/client.go
369 req.RequestURI = ""
370
371 resp, err := client.Do(req)
372 if err != nil {
373 apierrors.WriteHTTPInternalServerError(w, "error connecting to mist", err)
374 return
375 }
376 defer resp.Body.Close()
377
378 copyHeader(w.Header(), resp.Header)
379 w.WriteHeader(resp.StatusCode)
380 io.Copy(w, resp.Body)
381 }
382}
383
384func (a *StreamplaceAPI) FileHandler(ctx context.Context, fs http.Handler) http.HandlerFunc {
385 return func(w http.ResponseWriter, req *http.Request) {
386 noslash := req.URL.Path[1:]
387 ct, ok := a.Mimes[noslash]
388 if ok {
389 w.Header().Set("content-type", ct)
390 }
391 fs.ServeHTTP(w, req)
392 }
393}
394
395func (a *StreamplaceAPI) RedirectHandler(ctx context.Context) (http.Handler, error) {
396 _, tlsPort, err := net.SplitHostPort(a.CLI.HttpsAddr)
397 if err != nil {
398 return nil, err
399 }
400 handleRedirect := func(w http.ResponseWriter, req *http.Request) {
401 host, _, err := net.SplitHostPort(req.Host)
402 if err != nil {
403 host = req.Host
404 }
405 u := req.URL
406 if tlsPort == "443" {
407 u.Host = host
408 } else {
409 u.Host = net.JoinHostPort(host, tlsPort)
410 }
411 u.Scheme = "https"
412 http.Redirect(w, req, u.String(), http.StatusTemporaryRedirect)
413 }
414 mux := http.NewServeMux()
415 mux.HandleFunc("/", handleRedirect)
416 return mux, nil
417}
418
// NotificationPayload is the JSON body accepted by HandleNotification.
type NotificationPayload struct {
	// Token is stored through Model.CreateNotification.
	Token string `json:"token"`
	// RepoDID is optional; when non-empty, HandleNotification also triggers
	// a sync of that repo.
	RepoDID string `json:"repoDID"`
}
423
424func (a *StreamplaceAPI) HandleAPI404(ctx context.Context) http.HandlerFunc {
425 return func(w http.ResponseWriter, req *http.Request) {
426 w.WriteHeader(404)
427 }
428}
429
430func (a *StreamplaceAPI) HandleNotification(ctx context.Context) http.HandlerFunc {
431 return func(w http.ResponseWriter, req *http.Request) {
432 payload, err := io.ReadAll(req.Body)
433 if err != nil {
434 log.Log(ctx, "error reading notification create", "error", err)
435 w.WriteHeader(400)
436 return
437 }
438 n := NotificationPayload{}
439 err = json.Unmarshal(payload, &n)
440 if err != nil {
441 log.Log(ctx, "error unmarshalling notification create", "error", err)
442 w.WriteHeader(400)
443 return
444 }
445 err = a.Model.CreateNotification(n.Token, n.RepoDID)
446 if err != nil {
447 log.Log(ctx, "error creating notification", "error", err)
448 w.WriteHeader(400)
449 return
450 }
451 log.Log(ctx, "successfully created notification", "token", n.Token)
452 w.WriteHeader(200)
453 if n.RepoDID != "" {
454 go func() {
455 _, err := a.ATSync.SyncBlueskyRepo(ctx, n.RepoDID, a.Model)
456 if err != nil {
457 log.Error(ctx, "error syncing bluesky repo after notification creation", "error", err)
458 }
459 }()
460 }
461 }
462}
463
464func (a *StreamplaceAPI) HandleSegment(ctx context.Context) http.HandlerFunc {
465 return func(w http.ResponseWriter, req *http.Request) {
466 err := a.MediaManager.ValidateMP4(ctx, req.Body)
467 if err != nil {
468 apierrors.WriteHTTPBadRequest(w, "could not ingest segment", err)
469 return
470 }
471 w.WriteHeader(200)
472 }
473}
474
475func (a *StreamplaceAPI) HandlePlayerEvent(ctx context.Context) httprouter.Handle {
476 return func(w http.ResponseWriter, req *http.Request, p httprouter.Params) {
477 var event model.PlayerEventAPI
478 if err := json.NewDecoder(req.Body).Decode(&event); err != nil {
479 apierrors.WriteHTTPBadRequest(w, "could not decode JSON body", err)
480 return
481 }
482 err := a.Model.CreatePlayerEvent(event)
483 if err != nil {
484 apierrors.WriteHTTPBadRequest(w, "could not create player event", err)
485 return
486 }
487 w.WriteHeader(201)
488 }
489}
490
491func (a *StreamplaceAPI) HandleRecentSegments(ctx context.Context) httprouter.Handle {
492 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
493 segs, err := a.Model.MostRecentSegments()
494 if err != nil {
495 apierrors.WriteHTTPInternalServerError(w, "could not get segments", err)
496 return
497 }
498 bs, err := json.Marshal(segs)
499 if err != nil {
500 apierrors.WriteHTTPInternalServerError(w, "could not marshal segments", err)
501 return
502 }
503 w.Header().Add("Content-Type", "application/json")
504 w.Write(bs)
505 }
506}
507
508func (a *StreamplaceAPI) HandleUserRecentSegments(ctx context.Context) httprouter.Handle {
509 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
510 user := params.ByName("repoDID")
511 if user == "" {
512 apierrors.WriteHTTPBadRequest(w, "user required", nil)
513 return
514 }
515 user, err := a.NormalizeUser(ctx, user)
516 if err != nil {
517 apierrors.WriteHTTPNotFound(w, "user not found", err)
518 return
519 }
520 seg, err := a.Model.LatestSegmentForUser(user)
521 if err != nil {
522 apierrors.WriteHTTPInternalServerError(w, "could not get segments", err)
523 return
524 }
525 streamplaceSeg, err := seg.ToStreamplaceSegment()
526 if err != nil {
527 apierrors.WriteHTTPInternalServerError(w, "could not convert segment to streamplace segment", err)
528 return
529 }
530 bs, err := json.Marshal(streamplaceSeg)
531 if err != nil {
532 apierrors.WriteHTTPInternalServerError(w, "could not marshal segments", err)
533 return
534 }
535 w.Header().Add("Content-Type", "application/json")
536 w.Write(bs)
537 }
538}
539
540type LiveUsersResponse struct {
541 model.Segment
542 Viewers int `json:"viewers"`
543}
544
545func (a *StreamplaceAPI) HandleLiveUsers(ctx context.Context) httprouter.Handle {
546 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
547 repos, err := a.Model.MostRecentSegments()
548 if err != nil {
549 apierrors.WriteHTTPInternalServerError(w, "could not get live users", err)
550 return
551 }
552 liveUsers := []LiveUsersResponse{}
553 for _, repo := range repos {
554 viewers := spmetrics.GetViewCount(repo.RepoDID)
555 liveUsers = append(liveUsers, LiveUsersResponse{
556 Segment: repo,
557 Viewers: viewers,
558 })
559 }
560 bs, err := json.Marshal(liveUsers)
561 if err != nil {
562 apierrors.WriteHTTPInternalServerError(w, "could not marshal live users", err)
563 return
564 }
565 w.Write(bs)
566 }
567}
568
569func (a *StreamplaceAPI) HandleViewCount(ctx context.Context) httprouter.Handle {
570 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
571 user := params.ByName("user")
572 if user == "" {
573 apierrors.WriteHTTPBadRequest(w, "user required", nil)
574 return
575 }
576 user, err := a.NormalizeUser(ctx, user)
577 if err != nil {
578 apierrors.WriteHTTPNotFound(w, "user not found", err)
579 return
580 }
581 count := spmetrics.GetViewCount(user)
582 bs, err := json.Marshal(streamplace.Livestream_ViewerCount{Count: int64(count), LexiconTypeID: "place.stream.livestream#viewerCount"})
583 if err != nil {
584 apierrors.WriteHTTPInternalServerError(w, "could not marshal view count", err)
585 return
586 }
587 w.Write(bs)
588 }
589}
590
591func (a *StreamplaceAPI) HandleBlueskyResolve(ctx context.Context) httprouter.Handle {
592 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
593 log.Log(ctx, "got bluesky notification", "params", params)
594 key, err := a.ATSync.SyncBlueskyRepo(ctx, params.ByName("handle"), a.Model)
595 if err != nil {
596 apierrors.WriteHTTPInternalServerError(w, "could not resolve streamplace key", err)
597 return
598 }
599 signingKeys, err := a.Model.GetSigningKeysForRepo(key.DID)
600 if err != nil {
601 apierrors.WriteHTTPInternalServerError(w, "could not get signing keys", err)
602 return
603 }
604 bs, err := json.Marshal(signingKeys)
605 if err != nil {
606 apierrors.WriteHTTPInternalServerError(w, "could not marshal signing keys", err)
607 return
608 }
609 w.Write(bs)
610 }
611}
612
613type ChatResponse struct {
614 Post *bsky.FeedPost `json:"post"`
615 Repo *model.Repo `json:"repo"`
616 CID string `json:"cid"`
617}
618
619func (a *StreamplaceAPI) HandleChat(ctx context.Context) httprouter.Handle {
620 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
621 user := params.ByName("repoDID")
622 if user == "" {
623 apierrors.WriteHTTPBadRequest(w, "user required", nil)
624 return
625 }
626 repoDID, err := a.NormalizeUser(ctx, user)
627 if err != nil {
628 apierrors.WriteHTTPNotFound(w, "user not found", err)
629 return
630 }
631 replies, err := a.Model.GetReplies(repoDID)
632 if err != nil {
633 apierrors.WriteHTTPInternalServerError(w, "could not get replies", err)
634 return
635 }
636 bs, err := json.Marshal(replies)
637 if err != nil {
638 apierrors.WriteHTTPInternalServerError(w, "could not marshal replies", err)
639 return
640 }
641 w.Header().Set("Content-Type", "application/json")
642 w.Write(bs)
643 }
644}
645
646func (a *StreamplaceAPI) HandleLivestream(ctx context.Context) httprouter.Handle {
647 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
648 user := params.ByName("repoDID")
649 if user == "" {
650 apierrors.WriteHTTPBadRequest(w, "user required", nil)
651 return
652 }
653 repoDID, err := a.NormalizeUser(ctx, user)
654 if err != nil {
655 apierrors.WriteHTTPNotFound(w, "user not found", err)
656 return
657 }
658 livestream, err := a.Model.GetLatestLivestreamForRepo(repoDID)
659 if err != nil {
660 apierrors.WriteHTTPInternalServerError(w, "could not get livestream", err)
661 return
662 }
663
664 doc, err := livestream.ToLivestreamView()
665 if err != nil {
666 apierrors.WriteHTTPInternalServerError(w, "could not marshal livestream", err)
667 return
668 }
669
670 if livestream == nil {
671 apierrors.WriteHTTPNotFound(w, "no livestream found", nil)
672 return
673 }
674
675 bs, err := json.Marshal(doc)
676 if err != nil {
677 apierrors.WriteHTTPInternalServerError(w, "could not marshal livestream", err)
678 return
679 }
680 w.Header().Set("Content-Type", "application/json")
681 w.Write(bs)
682 }
683}
684
685func (a *StreamplaceAPI) RateLimitMiddleware(ctx context.Context) func(http.Handler) http.Handler {
686 return func(next http.Handler) http.Handler {
687 return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
688 ip, _, err := net.SplitHostPort(req.RemoteAddr)
689 if err != nil {
690 ip = req.RemoteAddr
691 }
692
693 if a.CLI.RateLimitPerSecond > 0 {
694 limiter := a.getLimiter(ip)
695
696 if !limiter.Allow() {
697 log.Warn(ctx, "rate limit exceeded", "ip", ip, "path", req.URL.Path)
698 apierrors.WriteHTTPTooManyRequests(w, "rate limit exceeded")
699 return
700 }
701 }
702
703 next.ServeHTTP(w, req)
704 })
705 }
706}
707
708func (a *StreamplaceAPI) ServeHTTP(ctx context.Context) error {
709 handler, err := a.Handler(ctx)
710 if err != nil {
711 return err
712 }
713 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error {
714 s.Addr = a.CLI.HttpAddr
715 log.Log(ctx, "http server starting", "addr", s.Addr)
716 return s.ListenAndServe()
717 })
718}
719
720func (a *StreamplaceAPI) ServeHTTPRedirect(ctx context.Context) error {
721 handler, err := a.RedirectHandler(ctx)
722 if err != nil {
723 return err
724 }
725 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error {
726 s.Addr = a.CLI.HttpAddr
727 log.Log(ctx, "http tls redirecct server starting", "addr", s.Addr)
728 return s.ListenAndServe()
729 })
730}
731
732func (a *StreamplaceAPI) ServeHTTPS(ctx context.Context) error {
733 handler, err := a.Handler(ctx)
734 if err != nil {
735 return err
736 }
737 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error {
738 s.Addr = a.CLI.HttpsAddr
739 log.Log(ctx, "https server starting",
740 "addr", s.Addr,
741 "certPath", a.CLI.TLSCertPath,
742 "keyPath", a.CLI.TLSKeyPath,
743 )
744 return s.ListenAndServeTLS(a.CLI.TLSCertPath, a.CLI.TLSKeyPath)
745 })
746}
747
748func (a *StreamplaceAPI) ServerWithShutdown(ctx context.Context, handler http.Handler, serve func(*http.Server) error) error {
749 ctx, cancel := context.WithCancel(ctx)
750 handler = gziphandler.GzipHandler(handler)
751 server := http.Server{Handler: handler}
752 var serveErr error
753 go func() {
754 serveErr = serve(&server)
755 cancel()
756 }()
757 <-ctx.Done()
758 if serveErr != nil {
759 return fmt.Errorf("error in http server: %w", serveErr)
760 }
761
762 ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
763 defer cancel()
764 return server.Shutdown(ctx)
765}
766
767func (a *StreamplaceAPI) HandleHealthz(ctx context.Context) http.HandlerFunc {
768 return func(w http.ResponseWriter, req *http.Request) {
769 w.WriteHeader(200)
770 }
771}
772
773func (a *StreamplaceAPI) getLimiter(ip string) *rate.Limiter {
774 a.limitersMu.Lock()
775 defer a.limitersMu.Unlock()
776
777 limiter, exists := a.limiters[ip]
778 if !exists {
779 limiter = rate.NewLimiter(rate.Limit(a.CLI.RateLimitPerSecond), a.CLI.RateLimitBurst)
780 a.limiters[ip] = limiter
781 }
782
783 return limiter
784}
785
786func NewWebsocketTracker(maxConns int) *WebsocketTracker {
787 return &WebsocketTracker{
788 connections: make(map[string]int),
789 maxConnsPerIP: maxConns,
790 }
791}
792
793func (t *WebsocketTracker) AddConnection(ip string) bool {
794 t.mu.Lock()
795 defer t.mu.Unlock()
796
797 count := t.connections[ip]
798
799 if count >= t.maxConnsPerIP {
800 return false
801 }
802
803 t.connections[ip] = count + 1
804 return true
805}
806
807func (t *WebsocketTracker) RemoveConnection(ip string) {
808 t.mu.Lock()
809 defer t.mu.Unlock()
810
811 count := t.connections[ip]
812 if count > 0 {
813 t.connections[ip] = count - 1
814 }
815
816 if t.connections[ip] == 0 {
817 delete(t.connections, ip)
818 }
819}