Live video on the AT Protocol
1package api
2
3import (
4 "context"
5 "encoding/json"
6 "errors"
7 "fmt"
8 "io"
9 "log/slog"
10 "net"
11 "net/http"
12 "net/http/httputil"
13 "net/url"
14 "os"
15 "strings"
16 "sync"
17 "time"
18
19 "github.com/NYTimes/gziphandler"
20 "github.com/bluesky-social/indigo/api/bsky"
21 "github.com/google/uuid"
22 "github.com/julienschmidt/httprouter"
23 "github.com/rs/cors"
24 sloghttp "github.com/samber/slog-http"
25 "golang.org/x/time/rate"
26
27 "github.com/streamplace/oatproxy/pkg/oatproxy"
28 "stream.place/streamplace/js/app"
29 "stream.place/streamplace/pkg/atproto"
30 "stream.place/streamplace/pkg/bus"
31 "stream.place/streamplace/pkg/config"
32 "stream.place/streamplace/pkg/crypto/signers/eip712"
33 "stream.place/streamplace/pkg/director"
34 apierrors "stream.place/streamplace/pkg/errors"
35 "stream.place/streamplace/pkg/linking"
36 "stream.place/streamplace/pkg/log"
37 "stream.place/streamplace/pkg/media"
38 "stream.place/streamplace/pkg/mist/mistconfig"
39 "stream.place/streamplace/pkg/model"
40 "stream.place/streamplace/pkg/notifications"
41 "stream.place/streamplace/pkg/spmetrics"
42 "stream.place/streamplace/pkg/spxrpc"
43 "stream.place/streamplace/pkg/streamplace"
44)
45
// StreamplaceAPI bundles the dependencies shared by the HTTP handlers defined
// on it. Instances are created by MakeStreamplaceAPI.
type StreamplaceAPI struct {
	// CLI is the node configuration the handlers read (listen addresses,
	// rate limits, redirects, frontend proxy, TLS paths, ...).
	CLI *config.CLI
	// Model is the data store queried for repos, livestreams, segments,
	// notifications and player events.
	Model model.Model
	// Updater is produced by PrepareUpdater; it is also the source of Mimes.
	Updater *Updater
	// Signer is injected by MakeStreamplaceAPI.
	Signer *eip712.EIP712Signer
	// Mimes maps a request path (without its leading slash) to the
	// Content-Type that FileHandler sets for it.
	Mimes map[string]string
	// FirebaseNotifier is injected by MakeStreamplaceAPI.
	FirebaseNotifier notifications.FirebaseNotifier
	// MediaManager validates uploaded segments (see HandleSegment).
	MediaManager *media.MediaManager
	// MediaSigner is injected by MakeStreamplaceAPI.
	MediaSigner media.MediaSigner
	// not thread-safe yet
	Aliases map[string]string
	// Bus is injected by MakeStreamplaceAPI.
	Bus *bus.Bus
	// ATSync synchronizes Bluesky repos (see HandleNotification and
	// HandleBlueskyResolve).
	ATSync *atproto.ATProtoSynchronizer
	// Director is injected by MakeStreamplaceAPI.
	Director *director.Director

	// connTracker is created with cli.RateLimitWebsocket as its per-IP cap.
	connTracker *WebsocketTracker

	// limiters holds one rate limiter per client IP; guarded by limitersMu
	// (see getLimiter).
	limiters   map[string]*rate.Limiter
	limitersMu sync.Mutex
	// SignerCache is initialised empty by MakeStreamplaceAPI.
	// NOTE(review): presumably guarded by SignerCacheMu — confirm at the use sites.
	SignerCache   map[string]media.MediaSigner
	SignerCacheMu sync.Mutex
	// op serves the OAuth routes and is passed to the XRPC server.
	op *oatproxy.OATProxy
}
69
// WebsocketTracker counts open connections per IP address and enforces a
// per-IP ceiling. All access to connections goes through mu.
type WebsocketTracker struct {
	// connections maps an IP to its current number of tracked connections.
	connections map[string]int
	// maxConnsPerIP is the count at which AddConnection starts refusing.
	maxConnsPerIP int
	mu            sync.RWMutex
}
75
76func MakeStreamplaceAPI(cli *config.CLI, mod model.Model, signer *eip712.EIP712Signer, noter notifications.FirebaseNotifier, mm *media.MediaManager, ms media.MediaSigner, bus *bus.Bus, atsync *atproto.ATProtoSynchronizer, d *director.Director, op *oatproxy.OATProxy) (*StreamplaceAPI, error) {
77 updater, err := PrepareUpdater(cli)
78 if err != nil {
79 return nil, err
80 }
81 a := &StreamplaceAPI{CLI: cli,
82 Model: mod,
83 Updater: updater,
84 Signer: signer,
85 FirebaseNotifier: noter,
86 MediaManager: mm,
87 MediaSigner: ms,
88 Aliases: map[string]string{},
89 Bus: bus,
90 ATSync: atsync,
91 Director: d,
92 connTracker: NewWebsocketTracker(cli.RateLimitWebsocket),
93 limiters: make(map[string]*rate.Limiter),
94 SignerCache: make(map[string]media.MediaSigner),
95 op: op,
96 }
97 a.Mimes, err = updater.GetMimes()
98 if err != nil {
99 return nil, err
100 }
101 return a, nil
102}
103
// AppHostingFS wraps an http.FileSystem so that a missing file is reported
// with the ErrorIndex sentinel instead of the underlying not-exist error.
type AppHostingFS struct {
	http.FileSystem
}

// ErrorIndex is returned by AppHostingFS.Open when the requested file does not
// exist; callers use it as the signal to serve index.html instead.
var ErrorIndex = errors.New("not found, use index.html")

// Open opens name on the wrapped file system. A not-exist failure is replaced
// by ErrorIndex; every other error is passed through untouched.
func (fs AppHostingFS) Open(name string) (http.File, error) {
	opened, err := fs.FileSystem.Open(name)
	switch {
	case err == nil:
		return opened, nil
	case errors.Is(err, os.ErrNotExist):
		return nil, ErrorIndex
	default:
		return nil, err
	}
}
121
122// api/playback/iame.li/webrtc?rendition=source
123// api/playback/iame.li/stream.mp4?rendition=source
124// api/playback/iame.li/stream.webm?rendition=source
125// api/playback/iame.li/hls/index.m3u8
126// api/playback/iame.li/hls/source/stream.m3u8
127// api/playback/iame.li/hls/source/000000000000.ts
128
129func (a *StreamplaceAPI) Handler(ctx context.Context) (http.Handler, error) {
130
131 var xrpc http.Handler
132 xrpc, err := spxrpc.NewServer(ctx, a.CLI, a.Model, a.op)
133 if err != nil {
134 return nil, err
135 }
136 router := httprouter.New()
137
138 router.Handler("GET", "/oauth/*anything", a.op.Handler())
139 router.Handler("POST", "/oauth/*anything", a.op.Handler())
140 router.Handler("GET", "/.well-known/oauth-authorization-server", a.op.Handler())
141 router.Handler("GET", "/.well-known/oauth-protected-resource", a.op.Handler())
142 apiRouter := httprouter.New()
143 apiRouter.HandlerFunc("POST", "/api/notification", a.HandleNotification(ctx))
144 // old clients
145 router.HandlerFunc("GET", "/app-updates", a.HandleAppUpdates(ctx))
146
147 // new ones
148 apiRouter.HandlerFunc("GET", "/api/manifest", a.HandleAppUpdates(ctx))
149 apiRouter.GET("/api/desktop-updates/:platform/:architecture/:version/:buildTime/:file", a.HandleDesktopUpdates(ctx))
150 apiRouter.POST("/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s"))
151 apiRouter.OPTIONS("/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s"))
152 apiRouter.DELETE("/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s"))
153 apiRouter.Handler("POST", "/api/segment", a.HandleSegment(ctx))
154 apiRouter.HandlerFunc("GET", "/api/healthz", a.HandleHealthz(ctx))
155 apiRouter.GET("/api/playback/:user/hls/*file", a.HandleHLSPlayback(ctx))
156 apiRouter.GET("/api/playback/:user/stream.mp4", a.HandleMP4Playback(ctx))
157 apiRouter.GET("/api/playback/:user/stream.webm", a.HandleMKVPlayback(ctx))
158 // they're, uh, not jpegs. but we used this once and i don't wanna break backwards compatibility
159 apiRouter.GET("/api/playback/:user/stream.jpg", a.HandleThumbnailPlayback(ctx))
160 // this one is not a lie
161 apiRouter.GET("/api/playback/:user/stream.png", a.HandleThumbnailPlayback(ctx))
162 apiRouter.GET("/api/app-return/*anything", a.HandleAppReturn(ctx))
163 apiRouter.POST("/api/playback/:user/webrtc", a.HandleWebRTCPlayback(ctx))
164 apiRouter.POST("/api/ingest/webrtc", a.HandleWebRTCIngest(ctx))
165 apiRouter.POST("/api/ingest/webrtc/:key", a.HandleWebRTCIngest(ctx))
166 apiRouter.POST("/api/player-event", a.HandlePlayerEvent(ctx))
167 apiRouter.GET("/api/chat/:repoDID", a.HandleChat(ctx))
168 apiRouter.GET("/api/websocket/:repoDID", a.HandleWebsocket(ctx))
169 apiRouter.GET("/api/livestream/:repoDID", a.HandleLivestream(ctx))
170 apiRouter.GET("/api/segment/recent", a.HandleRecentSegments(ctx))
171 apiRouter.GET("/api/segment/recent/:repoDID", a.HandleUserRecentSegments(ctx))
172 apiRouter.GET("/api/bluesky/resolve/:handle", a.HandleBlueskyResolve(ctx))
173 apiRouter.GET("/api/view-count/:user", a.HandleViewCount(ctx))
174 apiRouter.NotFound = a.HandleAPI404(ctx)
175 apiRouterHandler := a.RateLimitMiddleware(ctx)(apiRouter)
176 xrpcHandler := a.RateLimitMiddleware(ctx)(xrpc)
177 router.Handler("GET", "/api/*resource", apiRouterHandler)
178 router.Handler("POST", "/api/*resource", apiRouterHandler)
179 router.Handler("PUT", "/api/*resource", apiRouterHandler)
180 router.Handler("PATCH", "/api/*resource", apiRouterHandler)
181 router.Handler("DELETE", "/api/*resource", apiRouterHandler)
182 router.Handler("GET", "/xrpc/*resource", xrpcHandler)
183 router.Handler("POST", "/xrpc/*resource", xrpcHandler)
184 router.Handler("PUT", "/xrpc/*resource", xrpcHandler)
185 router.Handler("PATCH", "/xrpc/*resource", xrpcHandler)
186 router.Handler("DELETE", "/xrpc/*resource", xrpcHandler)
187 router.GET("/.well-known/did.json", a.HandleDidJSON(ctx))
188 router.GET("/dl/*params", a.HandleAppDownload(ctx))
189 router.POST("/", a.HandleWebRTCIngest(ctx))
190 for _, redirect := range a.CLI.Redirects {
191 parts := strings.Split(redirect, ":")
192 if len(parts) != 2 {
193 log.Error(ctx, "invalid redirect", "redirect", redirect)
194 return nil, fmt.Errorf("invalid redirect: %s", redirect)
195 }
196 router.Handle("GET", parts[0], func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
197 http.Redirect(w, r, parts[1], http.StatusTemporaryRedirect)
198 })
199 }
200 if a.CLI.FrontendProxy != "" {
201 u, err := url.Parse(a.CLI.FrontendProxy)
202 if err != nil {
203 return nil, err
204 }
205 log.Warn(ctx, "using frontend proxy instead of bundled frontend", "destination", a.CLI.FrontendProxy)
206 router.NotFound = &httputil.ReverseProxy{
207 Rewrite: func(r *httputil.ProxyRequest) {
208 // workaround for Expo disliking serving requests from 127.0.0.1 instead of localhost
209 // we need to use 127.0.0.1 because the atproto oauth client requires it
210 r.Out.Header.Set("Origin", u.String())
211 r.SetURL(u)
212 },
213 }
214 } else {
215 files, err := app.Files()
216 if err != nil {
217 return nil, err
218 }
219 index, err := files.Open("index.html")
220 if err != nil {
221 return nil, err
222 }
223 bs, err := io.ReadAll(index)
224 if err != nil {
225 return nil, err
226 }
227 linker, err := linking.NewLinker(ctx, bs)
228 if err != nil {
229 return nil, err
230 }
231 linkingHandler, err := a.NotFoundLinkingHandler(ctx, linker)
232 if err != nil {
233 return nil, err
234 }
235 router.NotFound = linkingHandler
236 }
237 // needed because the WebRTC handler issues 405s from / otherwise
238 router.GET("/", func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
239 router.NotFound.ServeHTTP(w, r)
240 })
241 handler := sloghttp.Recovery(router)
242 handler = cors.AllowAll().Handler(handler)
243 handler = sloghttp.New(slog.Default())(handler)
244 handler = a.RateLimitMiddleware(ctx)(handler)
245
246 // this needs to be LAST so nothing else clobbers the context
247 handler = a.ContextMiddleware(ctx)(handler)
248
249 return handler, nil
250}
251func (a *StreamplaceAPI) ContextMiddleware(ctx context.Context) func(next http.Handler) http.Handler {
252 return func(next http.Handler) http.Handler {
253 return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
254 uuid := uuid.New().String()
255 ctx = log.WithLogValues(ctx, "requestID", uuid, "method", r.Method, "path", r.URL.Path)
256 r = r.WithContext(ctx)
257 next.ServeHTTP(w, r)
258 })
259 }
260}
// copyHeader appends every value in src to dst, skipping any header whose
// name starts with "Access-Control" because CORS is handled by this server.
func copyHeader(dst, src http.Header) {
	for name, values := range src {
		// we'll handle CORS ourselves, thanks
		if !strings.HasPrefix(name, "Access-Control") {
			for i := range values {
				dst.Add(name, values[i])
			}
		}
	}
}
272
273// handler that takes care of static files and otherwise returns the index.html with the correct link card data
274func (a *StreamplaceAPI) NotFoundLinkingHandler(ctx context.Context, linker *linking.Linker) (http.HandlerFunc, error) {
275 files, err := app.Files()
276 if err != nil {
277 return nil, err
278 }
279 fs := AppHostingFS{http.FS(files)}
280
281 fileHandler := a.FileHandler(ctx, http.FileServer(fs))
282 defaultHandler := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
283 f := strings.TrimPrefix(req.URL.Path, "/")
284 // under docs we need the index.html suffix due to astro rendering
285 if strings.HasPrefix(req.URL.Path, "/docs") && strings.HasSuffix(req.URL.Path, "/") {
286 f += "index.html"
287 }
288 _, err := fs.Open(f)
289 if err == nil {
290 fileHandler.ServeHTTP(w, req)
291 return
292 }
293 if errors.Is(err, ErrorIndex) || f == "" {
294 bs, err := linker.GenerateDefaultCard(ctx, req.URL)
295 if err != nil {
296 log.Error(ctx, "error generating default card", "error", err)
297 }
298 w.Header().Set("Content-Type", "text/html")
299 if _, err := w.Write(bs); err != nil {
300 log.Error(ctx, "error writing response", "error", err)
301 }
302 } else {
303 log.Warn(ctx, "error opening file", "error", err)
304 apierrors.WriteHTTPInternalServerError(w, "file not found", err)
305 }
306 })
307 return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
308 proto := "http"
309 if req.TLS != nil {
310 proto = "https"
311 }
312 fwProto := req.Header.Get("x-forwarded-proto")
313 if fwProto != "" {
314 proto = fwProto
315 }
316 req.URL.Host = req.Host
317 req.URL.Scheme = proto
318 maybeHandle := strings.TrimPrefix(req.URL.Path, "/")
319 repo, err := a.Model.GetRepoByHandleOrDID(maybeHandle)
320 if err != nil || repo == nil {
321 log.Error(ctx, "no repo found", "maybeHandle", maybeHandle)
322 defaultHandler.ServeHTTP(w, req)
323 return
324 }
325 ls, err := a.Model.GetLatestLivestreamForRepo(repo.DID)
326 if err != nil || ls == nil {
327 log.Error(ctx, "no livestream found", "repoDID", repo.DID)
328 defaultHandler.ServeHTTP(w, req)
329 return
330 }
331 lsv, err := ls.ToLivestreamView()
332 if err != nil || lsv == nil {
333 log.Error(ctx, "no livestream view found", "repoDID", repo.DID)
334 defaultHandler.ServeHTTP(w, req)
335 return
336 }
337 bs, err := linker.GenerateStreamerCard(ctx, req.URL, lsv)
338 if err != nil {
339 log.Error(ctx, "error generating html", "error", err)
340 defaultHandler.ServeHTTP(w, req)
341 return
342 }
343 w.Header().Set("Content-Type", "text/html")
344 if _, err := w.Write(bs); err != nil {
345 log.Error(ctx, "error writing response", "error", err)
346 }
347 }), nil
348}
349
350func (a *StreamplaceAPI) MistProxyHandler(ctx context.Context, tmpl string) httprouter.Handle {
351 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
352 if !a.CLI.HasMist() {
353 apierrors.WriteHTTPNotImplemented(w, "Playback only on the Linux version for now", nil)
354 return
355 }
356 stream := params.ByName("stream")
357 if stream == "" {
358 apierrors.WriteHTTPBadRequest(w, "missing stream in request", nil)
359 return
360 }
361
362 fullstream := fmt.Sprintf("%s+%s", mistconfig.StreamName, stream)
363 prefix := fmt.Sprintf(tmpl, fullstream)
364 resource := params.ByName("resource")
365
366 // path := strings.TrimPrefix(req.URL.EscapedPath(), "/api")
367
368 client := &http.Client{}
369 req.URL = &url.URL{
370 Scheme: "http",
371 Host: fmt.Sprintf("127.0.0.1:%d", a.CLI.MistHTTPPort),
372 Path: fmt.Sprintf("%s%s", prefix, resource),
373 RawQuery: req.URL.RawQuery,
374 }
375
376 //http: Request.RequestURI can't be set in client requests.
377 //http://golang.org/src/pkg/net/http/client.go
378 req.RequestURI = ""
379
380 resp, err := client.Do(req)
381 if err != nil {
382 apierrors.WriteHTTPInternalServerError(w, "error connecting to mist", err)
383 return
384 }
385 defer resp.Body.Close()
386
387 copyHeader(w.Header(), resp.Header)
388 w.WriteHeader(resp.StatusCode)
389 if _, err := io.Copy(w, resp.Body); err != nil {
390 log.Error(ctx, "error writing response", "error", err)
391 }
392 }
393}
394
395func (a *StreamplaceAPI) FileHandler(ctx context.Context, fs http.Handler) http.HandlerFunc {
396 return func(w http.ResponseWriter, req *http.Request) {
397 noslash := req.URL.Path[1:]
398 ct, ok := a.Mimes[noslash]
399 if ok {
400 w.Header().Set("content-type", ct)
401 }
402 fs.ServeHTTP(w, req)
403 }
404}
405
406func (a *StreamplaceAPI) RedirectHandler(ctx context.Context) (http.Handler, error) {
407 _, tlsPort, err := net.SplitHostPort(a.CLI.HTTPSAddr)
408 if err != nil {
409 return nil, err
410 }
411 handleRedirect := func(w http.ResponseWriter, req *http.Request) {
412 host, _, err := net.SplitHostPort(req.Host)
413 if err != nil {
414 host = req.Host
415 }
416 u := req.URL
417 if tlsPort == "443" {
418 u.Host = host
419 } else {
420 u.Host = net.JoinHostPort(host, tlsPort)
421 }
422 u.Scheme = "https"
423 http.Redirect(w, req, u.String(), http.StatusTemporaryRedirect)
424 }
425 mux := http.NewServeMux()
426 mux.HandleFunc("/", handleRedirect)
427 return mux, nil
428}
429
// NotificationPayload is the JSON body accepted by HandleNotification.
type NotificationPayload struct {
	// Token is stored via Model.CreateNotification together with RepoDID.
	Token string `json:"token"`
	// RepoDID may be empty; when set, HandleNotification also triggers a
	// background sync of that repo.
	RepoDID string `json:"repoDID"`
}
434
435func (a *StreamplaceAPI) HandleAPI404(ctx context.Context) http.HandlerFunc {
436 return func(w http.ResponseWriter, req *http.Request) {
437 w.WriteHeader(404)
438 }
439}
440
441func (a *StreamplaceAPI) HandleNotification(ctx context.Context) http.HandlerFunc {
442 return func(w http.ResponseWriter, req *http.Request) {
443 payload, err := io.ReadAll(req.Body)
444 if err != nil {
445 log.Log(ctx, "error reading notification create", "error", err)
446 w.WriteHeader(400)
447 return
448 }
449 n := NotificationPayload{}
450 err = json.Unmarshal(payload, &n)
451 if err != nil {
452 log.Log(ctx, "error unmarshalling notification create", "error", err)
453 w.WriteHeader(400)
454 return
455 }
456 err = a.Model.CreateNotification(n.Token, n.RepoDID)
457 if err != nil {
458 log.Log(ctx, "error creating notification", "error", err)
459 w.WriteHeader(400)
460 return
461 }
462 log.Log(ctx, "successfully created notification", "token", n.Token)
463 w.WriteHeader(200)
464 if n.RepoDID != "" {
465 go func() {
466 _, err := a.ATSync.SyncBlueskyRepo(ctx, n.RepoDID, a.Model)
467 if err != nil {
468 log.Error(ctx, "error syncing bluesky repo after notification creation", "error", err)
469 }
470 }()
471 }
472 }
473}
474
475func (a *StreamplaceAPI) HandleSegment(ctx context.Context) http.HandlerFunc {
476 return func(w http.ResponseWriter, req *http.Request) {
477 err := a.MediaManager.ValidateMP4(ctx, req.Body)
478 if err != nil {
479 apierrors.WriteHTTPBadRequest(w, "could not ingest segment", err)
480 return
481 }
482 w.WriteHeader(200)
483 }
484}
485
486func (a *StreamplaceAPI) HandlePlayerEvent(ctx context.Context) httprouter.Handle {
487 return func(w http.ResponseWriter, req *http.Request, p httprouter.Params) {
488 var event model.PlayerEventAPI
489 if err := json.NewDecoder(req.Body).Decode(&event); err != nil {
490 apierrors.WriteHTTPBadRequest(w, "could not decode JSON body", err)
491 return
492 }
493 err := a.Model.CreatePlayerEvent(event)
494 if err != nil {
495 apierrors.WriteHTTPBadRequest(w, "could not create player event", err)
496 return
497 }
498 w.WriteHeader(201)
499 }
500}
501
502func (a *StreamplaceAPI) HandleRecentSegments(ctx context.Context) httprouter.Handle {
503 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
504 segs, err := a.Model.MostRecentSegments()
505 if err != nil {
506 apierrors.WriteHTTPInternalServerError(w, "could not get segments", err)
507 return
508 }
509 bs, err := json.Marshal(segs)
510 if err != nil {
511 apierrors.WriteHTTPInternalServerError(w, "could not marshal segments", err)
512 return
513 }
514 w.Header().Add("Content-Type", "application/json")
515 if _, err := w.Write(bs); err != nil {
516 log.Error(ctx, "error writing response", "error", err)
517 }
518 }
519}
520
521func (a *StreamplaceAPI) HandleUserRecentSegments(ctx context.Context) httprouter.Handle {
522 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
523 user := params.ByName("repoDID")
524 if user == "" {
525 apierrors.WriteHTTPBadRequest(w, "user required", nil)
526 return
527 }
528 user, err := a.NormalizeUser(ctx, user)
529 if err != nil {
530 apierrors.WriteHTTPNotFound(w, "user not found", err)
531 return
532 }
533 seg, err := a.Model.LatestSegmentForUser(user)
534 if err != nil {
535 apierrors.WriteHTTPInternalServerError(w, "could not get segments", err)
536 return
537 }
538 streamplaceSeg, err := seg.ToStreamplaceSegment()
539 if err != nil {
540 apierrors.WriteHTTPInternalServerError(w, "could not convert segment to streamplace segment", err)
541 return
542 }
543 bs, err := json.Marshal(streamplaceSeg)
544 if err != nil {
545 apierrors.WriteHTTPInternalServerError(w, "could not marshal segments", err)
546 return
547 }
548 w.Header().Add("Content-Type", "application/json")
549 if _, err := w.Write(bs); err != nil {
550 log.Error(ctx, "error writing response", "error", err)
551 }
552 }
553}
554
555func (a *StreamplaceAPI) HandleViewCount(ctx context.Context) httprouter.Handle {
556 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
557 user := params.ByName("user")
558 if user == "" {
559 apierrors.WriteHTTPBadRequest(w, "user required", nil)
560 return
561 }
562 user, err := a.NormalizeUser(ctx, user)
563 if err != nil {
564 apierrors.WriteHTTPNotFound(w, "user not found", err)
565 return
566 }
567 count := spmetrics.GetViewCount(user)
568 bs, err := json.Marshal(streamplace.Livestream_ViewerCount{Count: int64(count), LexiconTypeID: "place.stream.livestream#viewerCount"})
569 if err != nil {
570 apierrors.WriteHTTPInternalServerError(w, "could not marshal view count", err)
571 return
572 }
573 if _, err := w.Write(bs); err != nil {
574 log.Error(ctx, "error writing response", "error", err)
575 }
576 }
577}
578
579func (a *StreamplaceAPI) HandleBlueskyResolve(ctx context.Context) httprouter.Handle {
580 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
581 log.Log(ctx, "got bluesky notification", "params", params)
582 key, err := a.ATSync.SyncBlueskyRepo(ctx, params.ByName("handle"), a.Model)
583 if err != nil {
584 apierrors.WriteHTTPInternalServerError(w, "could not resolve streamplace key", err)
585 return
586 }
587 signingKeys, err := a.Model.GetSigningKeysForRepo(key.DID)
588 if err != nil {
589 apierrors.WriteHTTPInternalServerError(w, "could not get signing keys", err)
590 return
591 }
592 bs, err := json.Marshal(signingKeys)
593 if err != nil {
594 apierrors.WriteHTTPInternalServerError(w, "could not marshal signing keys", err)
595 return
596 }
597 if _, err := w.Write(bs); err != nil {
598 log.Error(ctx, "error writing response", "error", err)
599 }
600 }
601}
602
// ChatResponse pairs a Bluesky feed post with the repo it belongs to and the
// post's CID, serialized as JSON.
// NOTE(review): nothing in this part of the file constructs it — presumably
// it is the element shape of the chat endpoints; confirm against the callers.
type ChatResponse struct {
	Post *bsky.FeedPost `json:"post"`
	Repo *model.Repo    `json:"repo"`
	CID  string         `json:"cid"`
}
608
609func (a *StreamplaceAPI) HandleChat(ctx context.Context) httprouter.Handle {
610 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
611 user := params.ByName("repoDID")
612 if user == "" {
613 apierrors.WriteHTTPBadRequest(w, "user required", nil)
614 return
615 }
616 repoDID, err := a.NormalizeUser(ctx, user)
617 if err != nil {
618 apierrors.WriteHTTPNotFound(w, "user not found", err)
619 return
620 }
621 replies, err := a.Model.GetReplies(repoDID)
622 if err != nil {
623 apierrors.WriteHTTPInternalServerError(w, "could not get replies", err)
624 return
625 }
626 bs, err := json.Marshal(replies)
627 if err != nil {
628 apierrors.WriteHTTPInternalServerError(w, "could not marshal replies", err)
629 return
630 }
631 w.Header().Set("Content-Type", "application/json")
632 if _, err := w.Write(bs); err != nil {
633 log.Error(ctx, "error writing response", "error", err)
634 }
635 }
636}
637
638func (a *StreamplaceAPI) HandleLivestream(ctx context.Context) httprouter.Handle {
639 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
640 user := params.ByName("repoDID")
641 if user == "" {
642 apierrors.WriteHTTPBadRequest(w, "user required", nil)
643 return
644 }
645 repoDID, err := a.NormalizeUser(ctx, user)
646 if err != nil {
647 apierrors.WriteHTTPNotFound(w, "user not found", err)
648 return
649 }
650 livestream, err := a.Model.GetLatestLivestreamForRepo(repoDID)
651 if err != nil {
652 apierrors.WriteHTTPInternalServerError(w, "could not get livestream", err)
653 return
654 }
655
656 doc, err := livestream.ToLivestreamView()
657 if err != nil {
658 apierrors.WriteHTTPInternalServerError(w, "could not marshal livestream", err)
659 return
660 }
661
662 if livestream == nil {
663 apierrors.WriteHTTPNotFound(w, "no livestream found", nil)
664 return
665 }
666
667 bs, err := json.Marshal(doc)
668 if err != nil {
669 apierrors.WriteHTTPInternalServerError(w, "could not marshal livestream", err)
670 return
671 }
672 w.Header().Set("Content-Type", "application/json")
673 if _, err := w.Write(bs); err != nil {
674 log.Error(ctx, "error writing response", "error", err)
675 }
676 }
677}
678
679func (a *StreamplaceAPI) RateLimitMiddleware(ctx context.Context) func(http.Handler) http.Handler {
680 return func(next http.Handler) http.Handler {
681 return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
682 ip, _, err := net.SplitHostPort(req.RemoteAddr)
683 if err != nil {
684 ip = req.RemoteAddr
685 }
686
687 if a.CLI.RateLimitPerSecond > 0 {
688 limiter := a.getLimiter(ip)
689
690 if !limiter.Allow() {
691 log.Warn(ctx, "rate limit exceeded", "ip", ip, "path", req.URL.Path)
692 apierrors.WriteHTTPTooManyRequests(w, "rate limit exceeded")
693 return
694 }
695 }
696
697 next.ServeHTTP(w, req)
698 })
699 }
700}
701
702func (a *StreamplaceAPI) ServeHTTP(ctx context.Context) error {
703 handler, err := a.Handler(ctx)
704 if err != nil {
705 return err
706 }
707 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error {
708 s.Addr = a.CLI.HTTPAddr
709 log.Log(ctx, "http server starting", "addr", s.Addr)
710 return s.ListenAndServe()
711 })
712}
713
714func (a *StreamplaceAPI) ServeHTTPRedirect(ctx context.Context) error {
715 handler, err := a.RedirectHandler(ctx)
716 if err != nil {
717 return err
718 }
719 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error {
720 s.Addr = a.CLI.HTTPAddr
721 log.Log(ctx, "http tls redirecct server starting", "addr", s.Addr)
722 return s.ListenAndServe()
723 })
724}
725
726func (a *StreamplaceAPI) ServeHTTPS(ctx context.Context) error {
727 handler, err := a.Handler(ctx)
728 if err != nil {
729 return err
730 }
731 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error {
732 s.Addr = a.CLI.HTTPSAddr
733 log.Log(ctx, "https server starting",
734 "addr", s.Addr,
735 "certPath", a.CLI.TLSCertPath,
736 "keyPath", a.CLI.TLSKeyPath,
737 )
738 return s.ListenAndServeTLS(a.CLI.TLSCertPath, a.CLI.TLSKeyPath)
739 })
740}
741
742func (a *StreamplaceAPI) ServerWithShutdown(ctx context.Context, handler http.Handler, serve func(*http.Server) error) error {
743 ctx, cancel := context.WithCancel(ctx)
744 handler = gziphandler.GzipHandler(handler)
745 server := http.Server{Handler: handler}
746 var serveErr error
747 go func() {
748 serveErr = serve(&server)
749 cancel()
750 }()
751 <-ctx.Done()
752 if serveErr != nil {
753 return fmt.Errorf("error in http server: %w", serveErr)
754 }
755
756 ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
757 defer cancel()
758 return server.Shutdown(ctx)
759}
760
761func (a *StreamplaceAPI) HandleHealthz(ctx context.Context) http.HandlerFunc {
762 return func(w http.ResponseWriter, req *http.Request) {
763 w.WriteHeader(200)
764 }
765}
766
767func (a *StreamplaceAPI) getLimiter(ip string) *rate.Limiter {
768 a.limitersMu.Lock()
769 defer a.limitersMu.Unlock()
770
771 limiter, exists := a.limiters[ip]
772 if !exists {
773 limiter = rate.NewLimiter(rate.Limit(a.CLI.RateLimitPerSecond), a.CLI.RateLimitBurst)
774 a.limiters[ip] = limiter
775 }
776
777 return limiter
778}
779
780func NewWebsocketTracker(maxConns int) *WebsocketTracker {
781 return &WebsocketTracker{
782 connections: make(map[string]int),
783 maxConnsPerIP: maxConns,
784 }
785}
786
787func (t *WebsocketTracker) AddConnection(ip string) bool {
788 t.mu.Lock()
789 defer t.mu.Unlock()
790
791 count := t.connections[ip]
792
793 if count >= t.maxConnsPerIP {
794 return false
795 }
796
797 t.connections[ip] = count + 1
798 return true
799}
800
801func (t *WebsocketTracker) RemoveConnection(ip string) {
802 t.mu.Lock()
803 defer t.mu.Unlock()
804
805 count := t.connections[ip]
806 if count > 0 {
807 t.connections[ip] = count - 1
808 }
809
810 if t.connections[ip] == 0 {
811 delete(t.connections, ip)
812 }
813}