Live video on the AT Protocol
1package api
2
3import (
4 "context"
5 "encoding/json"
6 "errors"
7 "fmt"
8 "io"
9 "log/slog"
10 "net"
11 "net/http"
12 "net/http/httputil"
13 "net/url"
14 "os"
15 "strings"
16 "sync"
17 "time"
18
19 "github.com/NYTimes/gziphandler"
20 "github.com/bluesky-social/indigo/api/bsky"
21 "github.com/google/uuid"
22 "github.com/julienschmidt/httprouter"
23 "github.com/rs/cors"
24 sloghttp "github.com/samber/slog-http"
25 "golang.org/x/time/rate"
26
27 "stream.place/streamplace/js/app"
28 "stream.place/streamplace/pkg/atproto"
29 "stream.place/streamplace/pkg/bus"
30 "stream.place/streamplace/pkg/config"
31 "stream.place/streamplace/pkg/crypto/signers/eip712"
32 "stream.place/streamplace/pkg/director"
33 apierrors "stream.place/streamplace/pkg/errors"
34 "stream.place/streamplace/pkg/linking"
35 "stream.place/streamplace/pkg/log"
36 "stream.place/streamplace/pkg/media"
37 "stream.place/streamplace/pkg/mist/mistconfig"
38 "stream.place/streamplace/pkg/model"
39 "stream.place/streamplace/pkg/notifications"
40 "stream.place/streamplace/pkg/oproxy"
41 "stream.place/streamplace/pkg/spmetrics"
42 "stream.place/streamplace/pkg/spxrpc"
43 "stream.place/streamplace/pkg/streamplace"
44)
45
46type StreamplaceAPI struct {
47 CLI *config.CLI
48 Model model.Model
49 Updater *Updater
50 Signer *eip712.EIP712Signer
51 Mimes map[string]string
52 FirebaseNotifier notifications.FirebaseNotifier
53 MediaManager *media.MediaManager
54 MediaSigner media.MediaSigner
55 // not thread-safe yet
56 Aliases map[string]string
57 Bus *bus.Bus
58 ATSync *atproto.ATProtoSynchronizer
59 Director *director.Director
60
61 connTracker *WebsocketTracker
62
63 limiters map[string]*rate.Limiter
64 limitersMu sync.Mutex
65 SignerCache map[string]media.MediaSigner
66 SignerCacheMu sync.Mutex
67 op *oproxy.OProxy
68}
69
70type WebsocketTracker struct {
71 connections map[string]int
72 maxConnsPerIP int
73 mu sync.RWMutex
74}
75
76func MakeStreamplaceAPI(cli *config.CLI, mod model.Model, signer *eip712.EIP712Signer, noter notifications.FirebaseNotifier, mm *media.MediaManager, ms media.MediaSigner, bus *bus.Bus, atsync *atproto.ATProtoSynchronizer, d *director.Director, op *oproxy.OProxy) (*StreamplaceAPI, error) {
77 updater, err := PrepareUpdater(cli)
78 if err != nil {
79 return nil, err
80 }
81 a := &StreamplaceAPI{CLI: cli,
82 Model: mod,
83 Updater: updater,
84 Signer: signer,
85 FirebaseNotifier: noter,
86 MediaManager: mm,
87 MediaSigner: ms,
88 Aliases: map[string]string{},
89 Bus: bus,
90 ATSync: atsync,
91 Director: d,
92 connTracker: NewWebsocketTracker(cli.RateLimitWebsocket),
93 limiters: make(map[string]*rate.Limiter),
94 SignerCache: make(map[string]media.MediaSigner),
95 op: op,
96 }
97 a.Mimes, err = updater.GetMimes()
98 if err != nil {
99 return nil, err
100 }
101 return a, nil
102}
103
104type AppHostingFS struct {
105 http.FileSystem
106}
107
108var ErrorIndex = errors.New("not found, use index.html")
109
110func (fs AppHostingFS) Open(name string) (http.File, error) {
111 file, err1 := fs.FileSystem.Open(name)
112 if err1 == nil {
113 return file, nil
114 }
115 if !errors.Is(err1, os.ErrNotExist) {
116 return nil, err1
117 }
118
119 return nil, ErrorIndex
120}
121
122// api/playback/iame.li/webrtc?rendition=source
123// api/playback/iame.li/stream.mp4?rendition=source
124// api/playback/iame.li/stream.webm?rendition=source
125// api/playback/iame.li/hls/index.m3u8
126// api/playback/iame.li/hls/source/stream.m3u8
127// api/playback/iame.li/hls/source/000000000000.ts
128
129func (a *StreamplaceAPI) Handler(ctx context.Context) (http.Handler, error) {
130
131 var xrpc http.Handler
132 xrpc, err := spxrpc.NewServer(ctx, a.CLI, a.Model)
133 if err != nil {
134 return nil, err
135 }
136 xrpc = a.op.OAuthMiddleware(xrpc)
137 router := httprouter.New()
138
139 router.Handler("GET", "/oauth/*anything", a.op.Handler())
140 router.Handler("POST", "/oauth/*anything", a.op.Handler())
141 router.Handler("GET", "/.well-known/oauth-authorization-server", a.op.Handler())
142 router.Handler("GET", "/.well-known/oauth-protected-resource", a.op.Handler())
143 apiRouter := httprouter.New()
144 apiRouter.HandlerFunc("POST", "/api/notification", a.HandleNotification(ctx))
145 // old clients
146 router.HandlerFunc("GET", "/app-updates", a.HandleAppUpdates(ctx))
147
148 // new ones
149 apiRouter.HandlerFunc("GET", "/api/manifest", a.HandleAppUpdates(ctx))
150 apiRouter.GET("/api/desktop-updates/:platform/:architecture/:version/:buildTime/:file", a.HandleDesktopUpdates(ctx))
151 apiRouter.POST("/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s"))
152 apiRouter.OPTIONS("/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s"))
153 apiRouter.DELETE("/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s"))
154 apiRouter.Handler("POST", "/api/segment", a.HandleSegment(ctx))
155 apiRouter.HandlerFunc("GET", "/api/healthz", a.HandleHealthz(ctx))
156 apiRouter.GET("/api/playback/:user/hls/*file", a.HandleHLSPlayback(ctx))
157 apiRouter.GET("/api/playback/:user/stream.mp4", a.HandleMP4Playback(ctx))
158 apiRouter.GET("/api/playback/:user/stream.webm", a.HandleMKVPlayback(ctx))
159 // they're, uh, not jpegs. but we used this once and i don't wanna break backwards compatibility
160 apiRouter.GET("/api/playback/:user/stream.jpg", a.HandleThumbnailPlayback(ctx))
161 // this one is not a lie
162 apiRouter.GET("/api/playback/:user/stream.png", a.HandleThumbnailPlayback(ctx))
163 apiRouter.GET("/api/app-return/*anything", a.HandleAppReturn(ctx))
164 apiRouter.POST("/api/playback/:user/webrtc", a.HandleWebRTCPlayback(ctx))
165 apiRouter.POST("/api/ingest/webrtc", a.HandleWebRTCIngest(ctx))
166 apiRouter.POST("/api/ingest/webrtc/:key", a.HandleWebRTCIngest(ctx))
167 apiRouter.POST("/api/player-event", a.HandlePlayerEvent(ctx))
168 apiRouter.GET("/api/chat/:repoDID", a.HandleChat(ctx))
169 apiRouter.GET("/api/websocket/:repoDID", a.HandleWebsocket(ctx))
170 apiRouter.GET("/api/livestream/:repoDID", a.HandleLivestream(ctx))
171 apiRouter.GET("/api/segment/recent", a.HandleRecentSegments(ctx))
172 apiRouter.GET("/api/segment/recent/:repoDID", a.HandleUserRecentSegments(ctx))
173 apiRouter.GET("/api/bluesky/resolve/:handle", a.HandleBlueskyResolve(ctx))
174 apiRouter.GET("/api/live-users", a.HandleLiveUsers(ctx))
175 apiRouter.GET("/api/view-count/:user", a.HandleViewCount(ctx))
176 apiRouter.NotFound = a.HandleAPI404(ctx)
177 apiRouterHandler := a.RateLimitMiddleware(ctx)(apiRouter)
178 xrpcHandler := a.RateLimitMiddleware(ctx)(xrpc)
179 router.Handler("GET", "/api/*resource", apiRouterHandler)
180 router.Handler("POST", "/api/*resource", apiRouterHandler)
181 router.Handler("PUT", "/api/*resource", apiRouterHandler)
182 router.Handler("PATCH", "/api/*resource", apiRouterHandler)
183 router.Handler("DELETE", "/api/*resource", apiRouterHandler)
184 router.Handler("GET", "/xrpc/*resource", xrpcHandler)
185 router.Handler("POST", "/xrpc/*resource", xrpcHandler)
186 router.Handler("PUT", "/xrpc/*resource", xrpcHandler)
187 router.Handler("PATCH", "/xrpc/*resource", xrpcHandler)
188 router.Handler("DELETE", "/xrpc/*resource", xrpcHandler)
189 router.GET("/.well-known/did.json", a.HandleDidJson(ctx))
190 router.GET("/dl/*params", a.HandleAppDownload(ctx))
191 router.POST("/", a.HandleWebRTCIngest(ctx))
192 for _, redirect := range a.CLI.Redirects {
193 parts := strings.Split(redirect, ":")
194 if len(parts) != 2 {
195 log.Error(ctx, "invalid redirect", "redirect", redirect)
196 return nil, fmt.Errorf("invalid redirect: %s", redirect)
197 }
198 router.Handle("GET", parts[0], func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
199 http.Redirect(w, r, parts[1], http.StatusTemporaryRedirect)
200 })
201 }
202 if a.CLI.FrontendProxy != "" {
203 u, err := url.Parse(a.CLI.FrontendProxy)
204 if err != nil {
205 return nil, err
206 }
207 log.Warn(ctx, "using frontend proxy instead of bundled frontend", "destination", a.CLI.FrontendProxy)
208 router.NotFound = &httputil.ReverseProxy{
209 Rewrite: func(r *httputil.ProxyRequest) {
210 // workaround for Expo disliking serving requests from 127.0.0.1 instead of localhost
211 // we need to use 127.0.0.1 because the atproto oauth client requires it
212 r.Out.Header.Set("Origin", u.String())
213 r.SetURL(u)
214 },
215 }
216 } else {
217 files, err := app.Files()
218 if err != nil {
219 return nil, err
220 }
221 index, err := files.Open("index.html")
222 if err != nil {
223 return nil, err
224 }
225 bs, err := io.ReadAll(index)
226 if err != nil {
227 return nil, err
228 }
229 linker, err := linking.NewLinker(ctx, bs)
230 if err != nil {
231 return nil, err
232 }
233 linkingHandler, err := a.NotFoundLinkingHandler(ctx, linker)
234 if err != nil {
235 return nil, err
236 }
237 router.NotFound = linkingHandler
238 }
239 // needed because the WebRTC handler issues 405s from / otherwise
240 router.GET("/", func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
241 router.NotFound.ServeHTTP(w, r)
242 })
243 handler := sloghttp.Recovery(router)
244 handler = cors.AllowAll().Handler(handler)
245 handler = sloghttp.New(slog.Default())(handler)
246 handler = a.RateLimitMiddleware(ctx)(handler)
247
248 // this needs to be LAST so nothing else clobbers the context
249 handler = a.ContextMiddleware(ctx)(handler)
250
251 return handler, nil
252}
253func (a *StreamplaceAPI) ContextMiddleware(ctx context.Context) func(next http.Handler) http.Handler {
254 return func(next http.Handler) http.Handler {
255 return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
256 uuid := uuid.New().String()
257 ctx = log.WithLogValues(ctx, "requestID", uuid, "method", r.Method, "path", r.URL.Path)
258 r = r.WithContext(ctx)
259 next.ServeHTTP(w, r)
260 })
261 }
262}
263func copyHeader(dst, src http.Header) {
264 for k, vv := range src {
265 // we'll handle CORS ourselves, thanks
266 if strings.HasPrefix(k, "Access-Control") {
267 continue
268 }
269 for _, v := range vv {
270 dst.Add(k, v)
271 }
272 }
273}
274
275// handler that takes care of static files and otherwise returns the index.html with the correct link card data
276func (a *StreamplaceAPI) NotFoundLinkingHandler(ctx context.Context, linker *linking.Linker) (http.HandlerFunc, error) {
277 files, err := app.Files()
278 if err != nil {
279 return nil, err
280 }
281 fs := AppHostingFS{http.FS(files)}
282
283 fileHandler := a.FileHandler(ctx, http.FileServer(fs))
284 defaultHandler := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
285 f := strings.TrimPrefix(req.URL.Path, "/")
286 // under docs we need the index.html suffix due to astro rendering
287 if strings.HasPrefix(req.URL.Path, "/docs") && strings.HasSuffix(req.URL.Path, "/") {
288 f += "index.html"
289 }
290 _, err := fs.Open(f)
291 if err == nil {
292 fileHandler.ServeHTTP(w, req)
293 return
294 }
295 if errors.Is(err, ErrorIndex) || f == "" {
296 bs, err := linker.GenerateDefaultCard(ctx, req.URL)
297 if err != nil {
298 log.Error(ctx, "error generating default card", "error", err)
299 }
300 w.Header().Set("Content-Type", "text/html")
301 w.Write(bs)
302 } else {
303 log.Warn(ctx, "error opening file", "error", err)
304 apierrors.WriteHTTPInternalServerError(w, "file not found", err)
305 }
306 })
307 return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
308 proto := "http"
309 if req.TLS != nil {
310 proto = "https"
311 }
312 fwProto := req.Header.Get("x-forwarded-proto")
313 if fwProto != "" {
314 proto = fwProto
315 }
316 req.URL.Host = req.Host
317 req.URL.Scheme = proto
318 maybeHandle := strings.TrimPrefix(req.URL.Path, "/")
319 repo, err := a.Model.GetRepoByHandleOrDID(maybeHandle)
320 if err != nil || repo == nil {
321 log.Error(ctx, "no repo found", "maybeHandle", maybeHandle)
322 defaultHandler.ServeHTTP(w, req)
323 return
324 }
325 ls, err := a.Model.GetLatestLivestreamForRepo(repo.DID)
326 if err != nil || ls == nil {
327 log.Error(ctx, "no livestream found", "repoDID", repo.DID)
328 defaultHandler.ServeHTTP(w, req)
329 return
330 }
331 lsv, err := ls.ToLivestreamView()
332 if err != nil || lsv == nil {
333 log.Error(ctx, "no livestream view found", "repoDID", repo.DID)
334 defaultHandler.ServeHTTP(w, req)
335 return
336 }
337 bs, err := linker.GenerateStreamerCard(ctx, req.URL, lsv)
338 if err != nil {
339 log.Error(ctx, "error generating html", "error", err)
340 defaultHandler.ServeHTTP(w, req)
341 return
342 }
343 w.Header().Set("Content-Type", "text/html")
344 w.Write(bs)
345 }), nil
346}
347
348func (a *StreamplaceAPI) MistProxyHandler(ctx context.Context, tmpl string) httprouter.Handle {
349 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
350 if !a.CLI.HasMist() {
351 apierrors.WriteHTTPNotImplemented(w, "Playback only on the Linux version for now", nil)
352 return
353 }
354 stream := params.ByName("stream")
355 if stream == "" {
356 apierrors.WriteHTTPBadRequest(w, "missing stream in request", nil)
357 return
358 }
359
360 fullstream := fmt.Sprintf("%s+%s", mistconfig.STREAM_NAME, stream)
361 prefix := fmt.Sprintf(tmpl, fullstream)
362 resource := params.ByName("resource")
363
364 // path := strings.TrimPrefix(req.URL.EscapedPath(), "/api")
365
366 client := &http.Client{}
367 req.URL = &url.URL{
368 Scheme: "http",
369 Host: fmt.Sprintf("127.0.0.1:%d", a.CLI.MistHTTPPort),
370 Path: fmt.Sprintf("%s%s", prefix, resource),
371 RawQuery: req.URL.RawQuery,
372 }
373
374 //http: Request.RequestURI can't be set in client requests.
375 //http://golang.org/src/pkg/net/http/client.go
376 req.RequestURI = ""
377
378 resp, err := client.Do(req)
379 if err != nil {
380 apierrors.WriteHTTPInternalServerError(w, "error connecting to mist", err)
381 return
382 }
383 defer resp.Body.Close()
384
385 copyHeader(w.Header(), resp.Header)
386 w.WriteHeader(resp.StatusCode)
387 io.Copy(w, resp.Body)
388 }
389}
390
391func (a *StreamplaceAPI) FileHandler(ctx context.Context, fs http.Handler) http.HandlerFunc {
392 return func(w http.ResponseWriter, req *http.Request) {
393 noslash := req.URL.Path[1:]
394 ct, ok := a.Mimes[noslash]
395 if ok {
396 w.Header().Set("content-type", ct)
397 }
398 fs.ServeHTTP(w, req)
399 }
400}
401
402func (a *StreamplaceAPI) RedirectHandler(ctx context.Context) (http.Handler, error) {
403 _, tlsPort, err := net.SplitHostPort(a.CLI.HttpsAddr)
404 if err != nil {
405 return nil, err
406 }
407 handleRedirect := func(w http.ResponseWriter, req *http.Request) {
408 host, _, err := net.SplitHostPort(req.Host)
409 if err != nil {
410 host = req.Host
411 }
412 u := req.URL
413 if tlsPort == "443" {
414 u.Host = host
415 } else {
416 u.Host = net.JoinHostPort(host, tlsPort)
417 }
418 u.Scheme = "https"
419 http.Redirect(w, req, u.String(), http.StatusTemporaryRedirect)
420 }
421 mux := http.NewServeMux()
422 mux.HandleFunc("/", handleRedirect)
423 return mux, nil
424}
425
426type NotificationPayload struct {
427 Token string `json:"token"`
428 RepoDID string `json:"repoDID"`
429}
430
431func (a *StreamplaceAPI) HandleAPI404(ctx context.Context) http.HandlerFunc {
432 return func(w http.ResponseWriter, req *http.Request) {
433 w.WriteHeader(404)
434 }
435}
436
437func (a *StreamplaceAPI) HandleNotification(ctx context.Context) http.HandlerFunc {
438 return func(w http.ResponseWriter, req *http.Request) {
439 payload, err := io.ReadAll(req.Body)
440 if err != nil {
441 log.Log(ctx, "error reading notification create", "error", err)
442 w.WriteHeader(400)
443 return
444 }
445 n := NotificationPayload{}
446 err = json.Unmarshal(payload, &n)
447 if err != nil {
448 log.Log(ctx, "error unmarshalling notification create", "error", err)
449 w.WriteHeader(400)
450 return
451 }
452 err = a.Model.CreateNotification(n.Token, n.RepoDID)
453 if err != nil {
454 log.Log(ctx, "error creating notification", "error", err)
455 w.WriteHeader(400)
456 return
457 }
458 log.Log(ctx, "successfully created notification", "token", n.Token)
459 w.WriteHeader(200)
460 if n.RepoDID != "" {
461 go func() {
462 _, err := a.ATSync.SyncBlueskyRepo(ctx, n.RepoDID, a.Model)
463 if err != nil {
464 log.Error(ctx, "error syncing bluesky repo after notification creation", "error", err)
465 }
466 }()
467 }
468 }
469}
470
471func (a *StreamplaceAPI) HandleSegment(ctx context.Context) http.HandlerFunc {
472 return func(w http.ResponseWriter, req *http.Request) {
473 err := a.MediaManager.ValidateMP4(ctx, req.Body)
474 if err != nil {
475 apierrors.WriteHTTPBadRequest(w, "could not ingest segment", err)
476 return
477 }
478 w.WriteHeader(200)
479 }
480}
481
482func (a *StreamplaceAPI) HandlePlayerEvent(ctx context.Context) httprouter.Handle {
483 return func(w http.ResponseWriter, req *http.Request, p httprouter.Params) {
484 var event model.PlayerEventAPI
485 if err := json.NewDecoder(req.Body).Decode(&event); err != nil {
486 apierrors.WriteHTTPBadRequest(w, "could not decode JSON body", err)
487 return
488 }
489 err := a.Model.CreatePlayerEvent(event)
490 if err != nil {
491 apierrors.WriteHTTPBadRequest(w, "could not create player event", err)
492 return
493 }
494 w.WriteHeader(201)
495 }
496}
497
498func (a *StreamplaceAPI) HandleRecentSegments(ctx context.Context) httprouter.Handle {
499 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
500 segs, err := a.Model.MostRecentSegments()
501 if err != nil {
502 apierrors.WriteHTTPInternalServerError(w, "could not get segments", err)
503 return
504 }
505 bs, err := json.Marshal(segs)
506 if err != nil {
507 apierrors.WriteHTTPInternalServerError(w, "could not marshal segments", err)
508 return
509 }
510 w.Header().Add("Content-Type", "application/json")
511 w.Write(bs)
512 }
513}
514
515func (a *StreamplaceAPI) HandleUserRecentSegments(ctx context.Context) httprouter.Handle {
516 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
517 user := params.ByName("repoDID")
518 if user == "" {
519 apierrors.WriteHTTPBadRequest(w, "user required", nil)
520 return
521 }
522 user, err := a.NormalizeUser(ctx, user)
523 if err != nil {
524 apierrors.WriteHTTPNotFound(w, "user not found", err)
525 return
526 }
527 seg, err := a.Model.LatestSegmentForUser(user)
528 if err != nil {
529 apierrors.WriteHTTPInternalServerError(w, "could not get segments", err)
530 return
531 }
532 streamplaceSeg, err := seg.ToStreamplaceSegment()
533 if err != nil {
534 apierrors.WriteHTTPInternalServerError(w, "could not convert segment to streamplace segment", err)
535 return
536 }
537 bs, err := json.Marshal(streamplaceSeg)
538 if err != nil {
539 apierrors.WriteHTTPInternalServerError(w, "could not marshal segments", err)
540 return
541 }
542 w.Header().Add("Content-Type", "application/json")
543 w.Write(bs)
544 }
545}
546
547type LiveUsersResponse struct {
548 model.Segment
549 Viewers int `json:"viewers"`
550}
551
552func (a *StreamplaceAPI) HandleLiveUsers(ctx context.Context) httprouter.Handle {
553 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
554 repos, err := a.Model.MostRecentSegments()
555 if err != nil {
556 apierrors.WriteHTTPInternalServerError(w, "could not get live users", err)
557 return
558 }
559 liveUsers := []LiveUsersResponse{}
560 for _, repo := range repos {
561 viewers := spmetrics.GetViewCount(repo.RepoDID)
562 liveUsers = append(liveUsers, LiveUsersResponse{
563 Segment: repo,
564 Viewers: viewers,
565 })
566 }
567 bs, err := json.Marshal(liveUsers)
568 if err != nil {
569 apierrors.WriteHTTPInternalServerError(w, "could not marshal live users", err)
570 return
571 }
572 w.Write(bs)
573 }
574}
575
576func (a *StreamplaceAPI) HandleViewCount(ctx context.Context) httprouter.Handle {
577 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
578 user := params.ByName("user")
579 if user == "" {
580 apierrors.WriteHTTPBadRequest(w, "user required", nil)
581 return
582 }
583 user, err := a.NormalizeUser(ctx, user)
584 if err != nil {
585 apierrors.WriteHTTPNotFound(w, "user not found", err)
586 return
587 }
588 count := spmetrics.GetViewCount(user)
589 bs, err := json.Marshal(streamplace.Livestream_ViewerCount{Count: int64(count), LexiconTypeID: "place.stream.livestream#viewerCount"})
590 if err != nil {
591 apierrors.WriteHTTPInternalServerError(w, "could not marshal view count", err)
592 return
593 }
594 w.Write(bs)
595 }
596}
597
598func (a *StreamplaceAPI) HandleBlueskyResolve(ctx context.Context) httprouter.Handle {
599 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
600 log.Log(ctx, "got bluesky notification", "params", params)
601 key, err := a.ATSync.SyncBlueskyRepo(ctx, params.ByName("handle"), a.Model)
602 if err != nil {
603 apierrors.WriteHTTPInternalServerError(w, "could not resolve streamplace key", err)
604 return
605 }
606 signingKeys, err := a.Model.GetSigningKeysForRepo(key.DID)
607 if err != nil {
608 apierrors.WriteHTTPInternalServerError(w, "could not get signing keys", err)
609 return
610 }
611 bs, err := json.Marshal(signingKeys)
612 if err != nil {
613 apierrors.WriteHTTPInternalServerError(w, "could not marshal signing keys", err)
614 return
615 }
616 w.Write(bs)
617 }
618}
619
620type ChatResponse struct {
621 Post *bsky.FeedPost `json:"post"`
622 Repo *model.Repo `json:"repo"`
623 CID string `json:"cid"`
624}
625
626func (a *StreamplaceAPI) HandleChat(ctx context.Context) httprouter.Handle {
627 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
628 user := params.ByName("repoDID")
629 if user == "" {
630 apierrors.WriteHTTPBadRequest(w, "user required", nil)
631 return
632 }
633 repoDID, err := a.NormalizeUser(ctx, user)
634 if err != nil {
635 apierrors.WriteHTTPNotFound(w, "user not found", err)
636 return
637 }
638 replies, err := a.Model.GetReplies(repoDID)
639 if err != nil {
640 apierrors.WriteHTTPInternalServerError(w, "could not get replies", err)
641 return
642 }
643 bs, err := json.Marshal(replies)
644 if err != nil {
645 apierrors.WriteHTTPInternalServerError(w, "could not marshal replies", err)
646 return
647 }
648 w.Header().Set("Content-Type", "application/json")
649 w.Write(bs)
650 }
651}
652
653func (a *StreamplaceAPI) HandleLivestream(ctx context.Context) httprouter.Handle {
654 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
655 user := params.ByName("repoDID")
656 if user == "" {
657 apierrors.WriteHTTPBadRequest(w, "user required", nil)
658 return
659 }
660 repoDID, err := a.NormalizeUser(ctx, user)
661 if err != nil {
662 apierrors.WriteHTTPNotFound(w, "user not found", err)
663 return
664 }
665 livestream, err := a.Model.GetLatestLivestreamForRepo(repoDID)
666 if err != nil {
667 apierrors.WriteHTTPInternalServerError(w, "could not get livestream", err)
668 return
669 }
670
671 doc, err := livestream.ToLivestreamView()
672 if err != nil {
673 apierrors.WriteHTTPInternalServerError(w, "could not marshal livestream", err)
674 return
675 }
676
677 if livestream == nil {
678 apierrors.WriteHTTPNotFound(w, "no livestream found", nil)
679 return
680 }
681
682 bs, err := json.Marshal(doc)
683 if err != nil {
684 apierrors.WriteHTTPInternalServerError(w, "could not marshal livestream", err)
685 return
686 }
687 w.Header().Set("Content-Type", "application/json")
688 w.Write(bs)
689 }
690}
691
692func (a *StreamplaceAPI) RateLimitMiddleware(ctx context.Context) func(http.Handler) http.Handler {
693 return func(next http.Handler) http.Handler {
694 return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
695 ip, _, err := net.SplitHostPort(req.RemoteAddr)
696 if err != nil {
697 ip = req.RemoteAddr
698 }
699
700 if a.CLI.RateLimitPerSecond > 0 {
701 limiter := a.getLimiter(ip)
702
703 if !limiter.Allow() {
704 log.Warn(ctx, "rate limit exceeded", "ip", ip, "path", req.URL.Path)
705 apierrors.WriteHTTPTooManyRequests(w, "rate limit exceeded")
706 return
707 }
708 }
709
710 next.ServeHTTP(w, req)
711 })
712 }
713}
714
715func (a *StreamplaceAPI) ServeHTTP(ctx context.Context) error {
716 handler, err := a.Handler(ctx)
717 if err != nil {
718 return err
719 }
720 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error {
721 s.Addr = a.CLI.HttpAddr
722 log.Log(ctx, "http server starting", "addr", s.Addr)
723 return s.ListenAndServe()
724 })
725}
726
727func (a *StreamplaceAPI) ServeHTTPRedirect(ctx context.Context) error {
728 handler, err := a.RedirectHandler(ctx)
729 if err != nil {
730 return err
731 }
732 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error {
733 s.Addr = a.CLI.HttpAddr
734 log.Log(ctx, "http tls redirecct server starting", "addr", s.Addr)
735 return s.ListenAndServe()
736 })
737}
738
739func (a *StreamplaceAPI) ServeHTTPS(ctx context.Context) error {
740 handler, err := a.Handler(ctx)
741 if err != nil {
742 return err
743 }
744 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error {
745 s.Addr = a.CLI.HttpsAddr
746 log.Log(ctx, "https server starting",
747 "addr", s.Addr,
748 "certPath", a.CLI.TLSCertPath,
749 "keyPath", a.CLI.TLSKeyPath,
750 )
751 return s.ListenAndServeTLS(a.CLI.TLSCertPath, a.CLI.TLSKeyPath)
752 })
753}
754
755func (a *StreamplaceAPI) ServerWithShutdown(ctx context.Context, handler http.Handler, serve func(*http.Server) error) error {
756 ctx, cancel := context.WithCancel(ctx)
757 handler = gziphandler.GzipHandler(handler)
758 server := http.Server{Handler: handler}
759 var serveErr error
760 go func() {
761 serveErr = serve(&server)
762 cancel()
763 }()
764 <-ctx.Done()
765 if serveErr != nil {
766 return fmt.Errorf("error in http server: %w", serveErr)
767 }
768
769 ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
770 defer cancel()
771 return server.Shutdown(ctx)
772}
773
774func (a *StreamplaceAPI) HandleHealthz(ctx context.Context) http.HandlerFunc {
775 return func(w http.ResponseWriter, req *http.Request) {
776 w.WriteHeader(200)
777 }
778}
779
780func (a *StreamplaceAPI) getLimiter(ip string) *rate.Limiter {
781 a.limitersMu.Lock()
782 defer a.limitersMu.Unlock()
783
784 limiter, exists := a.limiters[ip]
785 if !exists {
786 limiter = rate.NewLimiter(rate.Limit(a.CLI.RateLimitPerSecond), a.CLI.RateLimitBurst)
787 a.limiters[ip] = limiter
788 }
789
790 return limiter
791}
792
793func NewWebsocketTracker(maxConns int) *WebsocketTracker {
794 return &WebsocketTracker{
795 connections: make(map[string]int),
796 maxConnsPerIP: maxConns,
797 }
798}
799
800func (t *WebsocketTracker) AddConnection(ip string) bool {
801 t.mu.Lock()
802 defer t.mu.Unlock()
803
804 count := t.connections[ip]
805
806 if count >= t.maxConnsPerIP {
807 return false
808 }
809
810 t.connections[ip] = count + 1
811 return true
812}
813
814func (t *WebsocketTracker) RemoveConnection(ip string) {
815 t.mu.Lock()
816 defer t.mu.Unlock()
817
818 count := t.connections[ip]
819 if count > 0 {
820 t.connections[ip] = count - 1
821 }
822
823 if t.connections[ip] == 0 {
824 delete(t.connections, ip)
825 }
826}