Live video on the AT Protocol
1package api
2
3import (
4 "context"
5 "encoding/json"
6 "errors"
7 "fmt"
8 "io"
9 "log/slog"
10 "net"
11 "net/http"
12 "net/http/httputil"
13 "net/url"
14 "os"
15 "strings"
16 "sync"
17 "time"
18
19 "github.com/NYTimes/gziphandler"
20 "github.com/bluesky-social/indigo/api/bsky"
21 "github.com/julienschmidt/httprouter"
22 "github.com/rs/cors"
23 sloghttp "github.com/samber/slog-http"
24 "golang.org/x/time/rate"
25
26 "stream.place/streamplace/js/app"
27 "stream.place/streamplace/pkg/atproto"
28 "stream.place/streamplace/pkg/bus"
29 "stream.place/streamplace/pkg/config"
30 "stream.place/streamplace/pkg/crypto/signers/eip712"
31 "stream.place/streamplace/pkg/director"
32 apierrors "stream.place/streamplace/pkg/errors"
33 "stream.place/streamplace/pkg/linking"
34 "stream.place/streamplace/pkg/log"
35 "stream.place/streamplace/pkg/media"
36 "stream.place/streamplace/pkg/mist/mistconfig"
37 "stream.place/streamplace/pkg/model"
38 "stream.place/streamplace/pkg/notifications"
39 "stream.place/streamplace/pkg/oproxy"
40 "stream.place/streamplace/pkg/spmetrics"
41 "stream.place/streamplace/pkg/spxrpc"
42 "stream.place/streamplace/pkg/streamplace"
43)
44
45type StreamplaceAPI struct {
46 CLI *config.CLI
47 Model model.Model
48 Updater *Updater
49 Signer *eip712.EIP712Signer
50 Mimes map[string]string
51 FirebaseNotifier notifications.FirebaseNotifier
52 MediaManager *media.MediaManager
53 MediaSigner media.MediaSigner
54 // not thread-safe yet
55 Aliases map[string]string
56 Bus *bus.Bus
57 ATSync *atproto.ATProtoSynchronizer
58 Director *director.Director
59
60 connTracker *WebsocketTracker
61
62 limiters map[string]*rate.Limiter
63 limitersMu sync.Mutex
64}
65
// WebsocketTracker keeps a count of connections per IP together with the
// maximum count a single IP may reach. AddConnection and RemoveConnection take
// mu before touching connections.
type WebsocketTracker struct {
	connections   map[string]int
	maxConnsPerIP int
	mu            sync.RWMutex
}
71
72func MakeStreamplaceAPI(cli *config.CLI, mod model.Model, signer *eip712.EIP712Signer, noter notifications.FirebaseNotifier, mm *media.MediaManager, ms media.MediaSigner, bus *bus.Bus, atsync *atproto.ATProtoSynchronizer, d *director.Director) (*StreamplaceAPI, error) {
73 updater, err := PrepareUpdater(cli)
74 if err != nil {
75 return nil, err
76 }
77 a := &StreamplaceAPI{CLI: cli,
78 Model: mod,
79 Updater: updater,
80 Signer: signer,
81 FirebaseNotifier: noter,
82 MediaManager: mm,
83 MediaSigner: ms,
84 Aliases: map[string]string{},
85 Bus: bus,
86 ATSync: atsync,
87 Director: d,
88 connTracker: NewWebsocketTracker(cli.RateLimitWebsocket),
89 limiters: make(map[string]*rate.Limiter),
90 }
91 a.Mimes, err = updater.GetMimes()
92 if err != nil {
93 return nil, err
94 }
95 return a, nil
96}
97
// AppHostingFS wraps an http.FileSystem and reports missing files with the
// ErrorIndex sentinel instead of the underlying "not exist" error.
type AppHostingFS struct {
	http.FileSystem
}

// ErrorIndex is returned by AppHostingFS.Open when the requested file does not
// exist, signalling that index.html should be used instead.
var ErrorIndex = errors.New("not found, use index.html")

// Open opens name on the wrapped file system. A successful open is passed
// through, an os.ErrNotExist failure is translated to ErrorIndex, and any other
// error is returned as-is.
func (fs AppHostingFS) Open(name string) (http.File, error) {
	file, err := fs.FileSystem.Open(name)
	switch {
	case err == nil:
		return file, nil
	case errors.Is(err, os.ErrNotExist):
		return nil, ErrorIndex
	default:
		return nil, err
	}
}
115
116// api/playback/iame.li/webrtc?rendition=source
117// api/playback/iame.li/stream.mp4?rendition=source
118// api/playback/iame.li/stream.webm?rendition=source
119// api/playback/iame.li/hls/index.m3u8
120// api/playback/iame.li/hls/source/stream.m3u8
121// api/playback/iame.li/hls/source/000000000000.ts
122
123func (a *StreamplaceAPI) Handler(ctx context.Context) (http.Handler, error) {
124 var xrpc http.Handler
125 xrpc, err := spxrpc.NewServer(a.CLI, a.Model)
126 if err != nil {
127 return nil, err
128 }
129
130 clientMetadata := &oproxy.OAuthClientMetadata{
131 ClientName: "Streamplace",
132 ClientURI: "https://stream.place",
133 LogoURI: "https://stream.place/logo.png",
134 RedirectURIs: []string{"https://fairway.iameli.link/login", "https://fairway.iameli.link/api/app-return"},
135 }
136
137 // RedirectURIs: []string{"http://127.0.0.1:38080/login", "https://%s/api/app-return"},
138 op := oproxy.New(&oproxy.Config{
139 Host: a.CLI.PublicHost,
140 CreateOAuthSession: a.Model.CreateOAuthSession,
141 UpdateOAuthSession: a.Model.UpdateOAuthSession,
142 LoadOAuthSession: a.Model.LoadOAuthSession,
143 Scope: "atproto transition:generic",
144 UpstreamJWK: a.CLI.JWK,
145 DownstreamJWK: a.CLI.AccessJWK,
146 ClientMetadata: clientMetadata,
147 AdditionalRedirectURIs: []string{"http://127.0.0.1:38080/login"},
148 })
149
150 xrpc = op.OAuthMiddleware(xrpc)
151 router := httprouter.New()
152 router.Handler("GET", "/oauth/*anything", op.Handler())
153 router.Handler("POST", "/oauth/*anything", op.Handler())
154 router.Handler("GET", "/.well-known/oauth-authorization-server", op.Handler())
155 router.Handler("GET", "/.well-known/oauth-protected-resource", op.Handler())
156 apiRouter := httprouter.New()
157 apiRouter.HandlerFunc("POST", "/api/notification", a.HandleNotification(ctx))
158 // old clients
159 router.HandlerFunc("GET", "/app-updates", a.HandleAppUpdates(ctx))
160
161 // new ones
162 apiRouter.HandlerFunc("GET", "/api/manifest", a.HandleAppUpdates(ctx))
163 apiRouter.GET("/api/desktop-updates/:platform/:architecture/:version/:buildTime/:file", a.HandleDesktopUpdates(ctx))
164 apiRouter.POST("/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s"))
165 apiRouter.OPTIONS("/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s"))
166 apiRouter.DELETE("/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s"))
167 apiRouter.Handler("POST", "/api/segment", a.HandleSegment(ctx))
168 apiRouter.HandlerFunc("GET", "/api/healthz", a.HandleHealthz(ctx))
169 apiRouter.GET("/api/playback/:user/hls/*file", a.HandleHLSPlayback(ctx))
170 apiRouter.GET("/api/playback/:user/stream.mp4", a.HandleMP4Playback(ctx))
171 apiRouter.GET("/api/playback/:user/stream.webm", a.HandleMKVPlayback(ctx))
172 // they're, uh, not jpegs. but we used this once and i don't wanna break backwards compatibility
173 apiRouter.GET("/api/playback/:user/stream.jpg", a.HandleThumbnailPlayback(ctx))
174 // this one is not a lie
175 apiRouter.GET("/api/playback/:user/stream.png", a.HandleThumbnailPlayback(ctx))
176 apiRouter.GET("/api/app-return/*anything", a.HandleAppReturn(ctx))
177 apiRouter.POST("/api/playback/:user/webrtc", a.HandleWebRTCPlayback(ctx))
178 apiRouter.POST("/api/ingest/webrtc", a.HandleWebRTCIngest(ctx))
179 apiRouter.POST("/api/ingest/webrtc/:key", a.HandleWebRTCIngest(ctx))
180 apiRouter.POST("/api/player-event", a.HandlePlayerEvent(ctx))
181 apiRouter.GET("/api/chat/:repoDID", a.HandleChat(ctx))
182 apiRouter.GET("/api/websocket/:repoDID", a.HandleWebsocket(ctx))
183 apiRouter.GET("/api/livestream/:repoDID", a.HandleLivestream(ctx))
184 apiRouter.GET("/api/segment/recent", a.HandleRecentSegments(ctx))
185 apiRouter.GET("/api/segment/recent/:repoDID", a.HandleUserRecentSegments(ctx))
186 apiRouter.GET("/api/bluesky/resolve/:handle", a.HandleBlueskyResolve(ctx))
187 apiRouter.GET("/api/live-users", a.HandleLiveUsers(ctx))
188 apiRouter.GET("/api/view-count/:user", a.HandleViewCount(ctx))
189 apiRouter.NotFound = a.HandleAPI404(ctx)
190 apiRouterHandler := a.RateLimitMiddleware(ctx)(apiRouter)
191 xrpcHandler := a.RateLimitMiddleware(ctx)(xrpc)
192 router.Handler("GET", "/api/*resource", apiRouterHandler)
193 router.Handler("POST", "/api/*resource", apiRouterHandler)
194 router.Handler("PUT", "/api/*resource", apiRouterHandler)
195 router.Handler("PATCH", "/api/*resource", apiRouterHandler)
196 router.Handler("DELETE", "/api/*resource", apiRouterHandler)
197 router.Handler("GET", "/xrpc/*resource", xrpcHandler)
198 router.Handler("POST", "/xrpc/*resource", xrpcHandler)
199 router.Handler("PUT", "/xrpc/*resource", xrpcHandler)
200 router.Handler("PATCH", "/xrpc/*resource", xrpcHandler)
201 router.Handler("DELETE", "/xrpc/*resource", xrpcHandler)
202 router.GET("/.well-known/did.json", a.HandleDidJson(ctx))
203 router.GET("/dl/*params", a.HandleAppDownload(ctx))
204 router.POST("/", a.HandleWebRTCIngest(ctx))
205 for _, redirect := range a.CLI.Redirects {
206 parts := strings.Split(redirect, ":")
207 if len(parts) != 2 {
208 log.Error(ctx, "invalid redirect", "redirect", redirect)
209 return nil, fmt.Errorf("invalid redirect: %s", redirect)
210 }
211 router.Handle("GET", parts[0], func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
212 http.Redirect(w, r, parts[1], http.StatusTemporaryRedirect)
213 })
214 }
215 if a.CLI.FrontendProxy != "" {
216 u, err := url.Parse(a.CLI.FrontendProxy)
217 if err != nil {
218 return nil, err
219 }
220 log.Warn(ctx, "using frontend proxy instead of bundled frontend", "destination", a.CLI.FrontendProxy)
221 router.NotFound = &httputil.ReverseProxy{
222 Rewrite: func(r *httputil.ProxyRequest) {
223 // workaround for Expo disliking serving requests from 127.0.0.1 instead of localhost
224 // we need to use 127.0.0.1 because the atproto oauth client requires it
225 r.Out.Header.Set("Origin", u.String())
226 r.SetURL(u)
227 },
228 }
229 } else {
230 files, err := app.Files()
231 if err != nil {
232 return nil, err
233 }
234 index, err := files.Open("index.html")
235 if err != nil {
236 return nil, err
237 }
238 bs, err := io.ReadAll(index)
239 if err != nil {
240 return nil, err
241 }
242 linker, err := linking.NewLinker(ctx, bs)
243 if err != nil {
244 return nil, err
245 }
246 linkingHandler, err := a.NotFoundLinkingHandler(ctx, linker)
247 if err != nil {
248 return nil, err
249 }
250 router.NotFound = linkingHandler
251 }
252 // needed because the WebRTC handler issues 405s from / otherwise
253 router.GET("/", func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
254 router.NotFound.ServeHTTP(w, r)
255 })
256 handler := sloghttp.Recovery(router)
257 handler = cors.AllowAll().Handler(handler)
258 handler = sloghttp.New(slog.Default())(handler)
259 handler = a.RateLimitMiddleware(ctx)(handler)
260
261 return handler, nil
262}
263
// copyHeader appends every header value in src to dst, skipping any header
// whose name starts with "Access-Control" because CORS is handled by this
// server rather than by the upstream.
func copyHeader(dst, src http.Header) {
	for name, values := range src {
		if !strings.HasPrefix(name, "Access-Control") {
			for i := range values {
				dst.Add(name, values[i])
			}
		}
	}
}
275
276// handler that takes care of static files and otherwise returns the index.html with the correct link card data
277func (a *StreamplaceAPI) NotFoundLinkingHandler(ctx context.Context, linker *linking.Linker) (http.HandlerFunc, error) {
278 files, err := app.Files()
279 if err != nil {
280 return nil, err
281 }
282 fs := AppHostingFS{http.FS(files)}
283
284 fileHandler := a.FileHandler(ctx, http.FileServer(fs))
285 defaultHandler := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
286 f := strings.TrimPrefix(req.URL.Path, "/")
287 // under docs we need the index.html suffix due to astro rendering
288 if strings.HasPrefix(req.URL.Path, "/docs") && strings.HasSuffix(req.URL.Path, "/") {
289 f += "index.html"
290 }
291 _, err := fs.Open(f)
292 if err == nil {
293 fileHandler.ServeHTTP(w, req)
294 return
295 }
296 if errors.Is(err, ErrorIndex) || f == "" {
297 bs, err := linker.GenerateDefaultCard(ctx, req.URL)
298 if err != nil {
299 log.Error(ctx, "error generating default card", "error", err)
300 }
301 w.Header().Set("Content-Type", "text/html")
302 w.Write(bs)
303 } else {
304 log.Warn(ctx, "error opening file", "error", err)
305 apierrors.WriteHTTPInternalServerError(w, "file not found", err)
306 }
307 })
308 return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
309 proto := "http"
310 if req.TLS != nil {
311 proto = "https"
312 }
313 fwProto := req.Header.Get("x-forwarded-proto")
314 if fwProto != "" {
315 proto = fwProto
316 }
317 req.URL.Host = req.Host
318 req.URL.Scheme = proto
319 maybeHandle := strings.TrimPrefix(req.URL.Path, "/")
320 repo, err := a.Model.GetRepoByHandleOrDID(maybeHandle)
321 if err != nil || repo == nil {
322 log.Error(ctx, "no repo found", "maybeHandle", maybeHandle)
323 defaultHandler.ServeHTTP(w, req)
324 return
325 }
326 ls, err := a.Model.GetLatestLivestreamForRepo(repo.DID)
327 if err != nil || ls == nil {
328 log.Error(ctx, "no livestream found", "repoDID", repo.DID)
329 defaultHandler.ServeHTTP(w, req)
330 return
331 }
332 lsv, err := ls.ToLivestreamView()
333 if err != nil || lsv == nil {
334 log.Error(ctx, "no livestream view found", "repoDID", repo.DID)
335 defaultHandler.ServeHTTP(w, req)
336 return
337 }
338 bs, err := linker.GenerateStreamerCard(ctx, req.URL, lsv)
339 if err != nil {
340 log.Error(ctx, "error generating html", "error", err)
341 defaultHandler.ServeHTTP(w, req)
342 return
343 }
344 w.Header().Set("Content-Type", "text/html")
345 w.Write(bs)
346 }), nil
347}
348
349func (a *StreamplaceAPI) MistProxyHandler(ctx context.Context, tmpl string) httprouter.Handle {
350 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
351 if !a.CLI.HasMist() {
352 apierrors.WriteHTTPNotImplemented(w, "Playback only on the Linux version for now", nil)
353 return
354 }
355 stream := params.ByName("stream")
356 if stream == "" {
357 apierrors.WriteHTTPBadRequest(w, "missing stream in request", nil)
358 return
359 }
360
361 fullstream := fmt.Sprintf("%s+%s", mistconfig.STREAM_NAME, stream)
362 prefix := fmt.Sprintf(tmpl, fullstream)
363 resource := params.ByName("resource")
364
365 // path := strings.TrimPrefix(req.URL.EscapedPath(), "/api")
366
367 client := &http.Client{}
368 req.URL = &url.URL{
369 Scheme: "http",
370 Host: fmt.Sprintf("127.0.0.1:%d", a.CLI.MistHTTPPort),
371 Path: fmt.Sprintf("%s%s", prefix, resource),
372 RawQuery: req.URL.RawQuery,
373 }
374
375 //http: Request.RequestURI can't be set in client requests.
376 //http://golang.org/src/pkg/net/http/client.go
377 req.RequestURI = ""
378
379 resp, err := client.Do(req)
380 if err != nil {
381 apierrors.WriteHTTPInternalServerError(w, "error connecting to mist", err)
382 return
383 }
384 defer resp.Body.Close()
385
386 copyHeader(w.Header(), resp.Header)
387 w.WriteHeader(resp.StatusCode)
388 io.Copy(w, resp.Body)
389 }
390}
391
392func (a *StreamplaceAPI) FileHandler(ctx context.Context, fs http.Handler) http.HandlerFunc {
393 return func(w http.ResponseWriter, req *http.Request) {
394 noslash := req.URL.Path[1:]
395 ct, ok := a.Mimes[noslash]
396 if ok {
397 w.Header().Set("content-type", ct)
398 }
399 fs.ServeHTTP(w, req)
400 }
401}
402
403func (a *StreamplaceAPI) RedirectHandler(ctx context.Context) (http.Handler, error) {
404 _, tlsPort, err := net.SplitHostPort(a.CLI.HttpsAddr)
405 if err != nil {
406 return nil, err
407 }
408 handleRedirect := func(w http.ResponseWriter, req *http.Request) {
409 host, _, err := net.SplitHostPort(req.Host)
410 if err != nil {
411 host = req.Host
412 }
413 u := req.URL
414 if tlsPort == "443" {
415 u.Host = host
416 } else {
417 u.Host = net.JoinHostPort(host, tlsPort)
418 }
419 u.Scheme = "https"
420 http.Redirect(w, req, u.String(), http.StatusTemporaryRedirect)
421 }
422 mux := http.NewServeMux()
423 mux.HandleFunc("/", handleRedirect)
424 return mux, nil
425}
426
// NotificationPayload is the JSON body decoded by HandleNotification. Both
// fields are passed to Model.CreateNotification; a non-empty RepoDID also
// triggers a background repo sync.
type NotificationPayload struct {
	Token   string `json:"token"`
	RepoDID string `json:"repoDID"`
}
431
432func (a *StreamplaceAPI) HandleAPI404(ctx context.Context) http.HandlerFunc {
433 return func(w http.ResponseWriter, req *http.Request) {
434 w.WriteHeader(404)
435 }
436}
437
438func (a *StreamplaceAPI) HandleNotification(ctx context.Context) http.HandlerFunc {
439 return func(w http.ResponseWriter, req *http.Request) {
440 payload, err := io.ReadAll(req.Body)
441 if err != nil {
442 log.Log(ctx, "error reading notification create", "error", err)
443 w.WriteHeader(400)
444 return
445 }
446 n := NotificationPayload{}
447 err = json.Unmarshal(payload, &n)
448 if err != nil {
449 log.Log(ctx, "error unmarshalling notification create", "error", err)
450 w.WriteHeader(400)
451 return
452 }
453 err = a.Model.CreateNotification(n.Token, n.RepoDID)
454 if err != nil {
455 log.Log(ctx, "error creating notification", "error", err)
456 w.WriteHeader(400)
457 return
458 }
459 log.Log(ctx, "successfully created notification", "token", n.Token)
460 w.WriteHeader(200)
461 if n.RepoDID != "" {
462 go func() {
463 _, err := a.ATSync.SyncBlueskyRepo(ctx, n.RepoDID, a.Model)
464 if err != nil {
465 log.Error(ctx, "error syncing bluesky repo after notification creation", "error", err)
466 }
467 }()
468 }
469 }
470}
471
472func (a *StreamplaceAPI) HandleSegment(ctx context.Context) http.HandlerFunc {
473 return func(w http.ResponseWriter, req *http.Request) {
474 err := a.MediaManager.ValidateMP4(ctx, req.Body)
475 if err != nil {
476 apierrors.WriteHTTPBadRequest(w, "could not ingest segment", err)
477 return
478 }
479 w.WriteHeader(200)
480 }
481}
482
483func (a *StreamplaceAPI) HandlePlayerEvent(ctx context.Context) httprouter.Handle {
484 return func(w http.ResponseWriter, req *http.Request, p httprouter.Params) {
485 var event model.PlayerEventAPI
486 if err := json.NewDecoder(req.Body).Decode(&event); err != nil {
487 apierrors.WriteHTTPBadRequest(w, "could not decode JSON body", err)
488 return
489 }
490 err := a.Model.CreatePlayerEvent(event)
491 if err != nil {
492 apierrors.WriteHTTPBadRequest(w, "could not create player event", err)
493 return
494 }
495 w.WriteHeader(201)
496 }
497}
498
499func (a *StreamplaceAPI) HandleRecentSegments(ctx context.Context) httprouter.Handle {
500 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
501 segs, err := a.Model.MostRecentSegments()
502 if err != nil {
503 apierrors.WriteHTTPInternalServerError(w, "could not get segments", err)
504 return
505 }
506 bs, err := json.Marshal(segs)
507 if err != nil {
508 apierrors.WriteHTTPInternalServerError(w, "could not marshal segments", err)
509 return
510 }
511 w.Header().Add("Content-Type", "application/json")
512 w.Write(bs)
513 }
514}
515
516func (a *StreamplaceAPI) HandleUserRecentSegments(ctx context.Context) httprouter.Handle {
517 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
518 user := params.ByName("repoDID")
519 if user == "" {
520 apierrors.WriteHTTPBadRequest(w, "user required", nil)
521 return
522 }
523 user, err := a.NormalizeUser(ctx, user)
524 if err != nil {
525 apierrors.WriteHTTPNotFound(w, "user not found", err)
526 return
527 }
528 seg, err := a.Model.LatestSegmentForUser(user)
529 if err != nil {
530 apierrors.WriteHTTPInternalServerError(w, "could not get segments", err)
531 return
532 }
533 streamplaceSeg, err := seg.ToStreamplaceSegment()
534 if err != nil {
535 apierrors.WriteHTTPInternalServerError(w, "could not convert segment to streamplace segment", err)
536 return
537 }
538 bs, err := json.Marshal(streamplaceSeg)
539 if err != nil {
540 apierrors.WriteHTTPInternalServerError(w, "could not marshal segments", err)
541 return
542 }
543 w.Header().Add("Content-Type", "application/json")
544 w.Write(bs)
545 }
546}
547
548type LiveUsersResponse struct {
549 model.Segment
550 Viewers int `json:"viewers"`
551}
552
553func (a *StreamplaceAPI) HandleLiveUsers(ctx context.Context) httprouter.Handle {
554 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
555 repos, err := a.Model.MostRecentSegments()
556 if err != nil {
557 apierrors.WriteHTTPInternalServerError(w, "could not get live users", err)
558 return
559 }
560 liveUsers := []LiveUsersResponse{}
561 for _, repo := range repos {
562 viewers := spmetrics.GetViewCount(repo.RepoDID)
563 liveUsers = append(liveUsers, LiveUsersResponse{
564 Segment: repo,
565 Viewers: viewers,
566 })
567 }
568 bs, err := json.Marshal(liveUsers)
569 if err != nil {
570 apierrors.WriteHTTPInternalServerError(w, "could not marshal live users", err)
571 return
572 }
573 w.Write(bs)
574 }
575}
576
577func (a *StreamplaceAPI) HandleViewCount(ctx context.Context) httprouter.Handle {
578 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
579 user := params.ByName("user")
580 if user == "" {
581 apierrors.WriteHTTPBadRequest(w, "user required", nil)
582 return
583 }
584 user, err := a.NormalizeUser(ctx, user)
585 if err != nil {
586 apierrors.WriteHTTPNotFound(w, "user not found", err)
587 return
588 }
589 count := spmetrics.GetViewCount(user)
590 bs, err := json.Marshal(streamplace.Livestream_ViewerCount{Count: int64(count), LexiconTypeID: "place.stream.livestream#viewerCount"})
591 if err != nil {
592 apierrors.WriteHTTPInternalServerError(w, "could not marshal view count", err)
593 return
594 }
595 w.Write(bs)
596 }
597}
598
599func (a *StreamplaceAPI) HandleBlueskyResolve(ctx context.Context) httprouter.Handle {
600 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
601 log.Log(ctx, "got bluesky notification", "params", params)
602 key, err := a.ATSync.SyncBlueskyRepo(ctx, params.ByName("handle"), a.Model)
603 if err != nil {
604 apierrors.WriteHTTPInternalServerError(w, "could not resolve streamplace key", err)
605 return
606 }
607 signingKeys, err := a.Model.GetSigningKeysForRepo(key.DID)
608 if err != nil {
609 apierrors.WriteHTTPInternalServerError(w, "could not get signing keys", err)
610 return
611 }
612 bs, err := json.Marshal(signingKeys)
613 if err != nil {
614 apierrors.WriteHTTPInternalServerError(w, "could not marshal signing keys", err)
615 return
616 }
617 w.Write(bs)
618 }
619}
620
621type ChatResponse struct {
622 Post *bsky.FeedPost `json:"post"`
623 Repo *model.Repo `json:"repo"`
624 CID string `json:"cid"`
625}
626
627func (a *StreamplaceAPI) HandleChat(ctx context.Context) httprouter.Handle {
628 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
629 user := params.ByName("repoDID")
630 if user == "" {
631 apierrors.WriteHTTPBadRequest(w, "user required", nil)
632 return
633 }
634 repoDID, err := a.NormalizeUser(ctx, user)
635 if err != nil {
636 apierrors.WriteHTTPNotFound(w, "user not found", err)
637 return
638 }
639 replies, err := a.Model.GetReplies(repoDID)
640 if err != nil {
641 apierrors.WriteHTTPInternalServerError(w, "could not get replies", err)
642 return
643 }
644 bs, err := json.Marshal(replies)
645 if err != nil {
646 apierrors.WriteHTTPInternalServerError(w, "could not marshal replies", err)
647 return
648 }
649 w.Header().Set("Content-Type", "application/json")
650 w.Write(bs)
651 }
652}
653
654func (a *StreamplaceAPI) HandleLivestream(ctx context.Context) httprouter.Handle {
655 return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
656 user := params.ByName("repoDID")
657 if user == "" {
658 apierrors.WriteHTTPBadRequest(w, "user required", nil)
659 return
660 }
661 repoDID, err := a.NormalizeUser(ctx, user)
662 if err != nil {
663 apierrors.WriteHTTPNotFound(w, "user not found", err)
664 return
665 }
666 livestream, err := a.Model.GetLatestLivestreamForRepo(repoDID)
667 if err != nil {
668 apierrors.WriteHTTPInternalServerError(w, "could not get livestream", err)
669 return
670 }
671
672 doc, err := livestream.ToLivestreamView()
673 if err != nil {
674 apierrors.WriteHTTPInternalServerError(w, "could not marshal livestream", err)
675 return
676 }
677
678 if livestream == nil {
679 apierrors.WriteHTTPNotFound(w, "no livestream found", nil)
680 return
681 }
682
683 bs, err := json.Marshal(doc)
684 if err != nil {
685 apierrors.WriteHTTPInternalServerError(w, "could not marshal livestream", err)
686 return
687 }
688 w.Header().Set("Content-Type", "application/json")
689 w.Write(bs)
690 }
691}
692
693func (a *StreamplaceAPI) RateLimitMiddleware(ctx context.Context) func(http.Handler) http.Handler {
694 return func(next http.Handler) http.Handler {
695 return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
696 ip, _, err := net.SplitHostPort(req.RemoteAddr)
697 if err != nil {
698 ip = req.RemoteAddr
699 }
700
701 if a.CLI.RateLimitPerSecond > 0 {
702 limiter := a.getLimiter(ip)
703
704 if !limiter.Allow() {
705 log.Warn(ctx, "rate limit exceeded", "ip", ip, "path", req.URL.Path)
706 apierrors.WriteHTTPTooManyRequests(w, "rate limit exceeded")
707 return
708 }
709 }
710
711 next.ServeHTTP(w, req)
712 })
713 }
714}
715
716func (a *StreamplaceAPI) ServeHTTP(ctx context.Context) error {
717 handler, err := a.Handler(ctx)
718 if err != nil {
719 return err
720 }
721 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error {
722 s.Addr = a.CLI.HttpAddr
723 log.Log(ctx, "http server starting", "addr", s.Addr)
724 return s.ListenAndServe()
725 })
726}
727
728func (a *StreamplaceAPI) ServeHTTPRedirect(ctx context.Context) error {
729 handler, err := a.RedirectHandler(ctx)
730 if err != nil {
731 return err
732 }
733 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error {
734 s.Addr = a.CLI.HttpAddr
735 log.Log(ctx, "http tls redirecct server starting", "addr", s.Addr)
736 return s.ListenAndServe()
737 })
738}
739
740func (a *StreamplaceAPI) ServeHTTPS(ctx context.Context) error {
741 handler, err := a.Handler(ctx)
742 if err != nil {
743 return err
744 }
745 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error {
746 s.Addr = a.CLI.HttpsAddr
747 log.Log(ctx, "https server starting",
748 "addr", s.Addr,
749 "certPath", a.CLI.TLSCertPath,
750 "keyPath", a.CLI.TLSKeyPath,
751 )
752 return s.ListenAndServeTLS(a.CLI.TLSCertPath, a.CLI.TLSKeyPath)
753 })
754}
755
756func (a *StreamplaceAPI) ServerWithShutdown(ctx context.Context, handler http.Handler, serve func(*http.Server) error) error {
757 ctx, cancel := context.WithCancel(ctx)
758 handler = gziphandler.GzipHandler(handler)
759 server := http.Server{Handler: handler}
760 var serveErr error
761 go func() {
762 serveErr = serve(&server)
763 cancel()
764 }()
765 <-ctx.Done()
766 if serveErr != nil {
767 return fmt.Errorf("error in http server: %w", serveErr)
768 }
769
770 ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
771 defer cancel()
772 return server.Shutdown(ctx)
773}
774
775func (a *StreamplaceAPI) HandleHealthz(ctx context.Context) http.HandlerFunc {
776 return func(w http.ResponseWriter, req *http.Request) {
777 w.WriteHeader(200)
778 }
779}
780
781func (a *StreamplaceAPI) getLimiter(ip string) *rate.Limiter {
782 a.limitersMu.Lock()
783 defer a.limitersMu.Unlock()
784
785 limiter, exists := a.limiters[ip]
786 if !exists {
787 limiter = rate.NewLimiter(rate.Limit(a.CLI.RateLimitPerSecond), a.CLI.RateLimitBurst)
788 a.limiters[ip] = limiter
789 }
790
791 return limiter
792}
793
794func NewWebsocketTracker(maxConns int) *WebsocketTracker {
795 return &WebsocketTracker{
796 connections: make(map[string]int),
797 maxConnsPerIP: maxConns,
798 }
799}
800
801func (t *WebsocketTracker) AddConnection(ip string) bool {
802 t.mu.Lock()
803 defer t.mu.Unlock()
804
805 count := t.connections[ip]
806
807 if count >= t.maxConnsPerIP {
808 return false
809 }
810
811 t.connections[ip] = count + 1
812 return true
813}
814
815func (t *WebsocketTracker) RemoveConnection(ip string) {
816 t.mu.Lock()
817 defer t.mu.Unlock()
818
819 count := t.connections[ip]
820 if count > 0 {
821 t.connections[ip] = count - 1
822 }
823
824 if t.connections[ip] == 0 {
825 delete(t.connections, ip)
826 }
827}