Live video on the AT Protocol
1package api
2
3import (
4 "bufio"
5 "context"
6 "encoding/base64"
7 "encoding/json"
8 "fmt"
9 "io"
10 "log/slog"
11 "net/http"
12 "net/http/pprof"
13 "os"
14 "path/filepath"
15 "regexp"
16 "runtime"
17 rtpprof "runtime/pprof"
18 "strconv"
19 "strings"
20 "time"
21
22 "github.com/julienschmidt/httprouter"
23 "github.com/pion/webrtc/v4"
24 "github.com/prometheus/client_golang/prometheus/promhttp"
25 sloghttp "github.com/samber/slog-http"
26 "golang.org/x/sync/errgroup"
27 "stream.place/streamplace/pkg/errors"
28 "stream.place/streamplace/pkg/log"
29 "stream.place/streamplace/pkg/media"
30 "stream.place/streamplace/pkg/mist/mistconfig"
31 "stream.place/streamplace/pkg/mist/misttriggers"
32 "stream.place/streamplace/pkg/model"
33 notificationpkg "stream.place/streamplace/pkg/notifications"
34 "stream.place/streamplace/pkg/renditions"
35 "stream.place/streamplace/pkg/rtcrec"
36 v0 "stream.place/streamplace/pkg/schema/v0"
37)
38
39func (a *StreamplaceAPI) ServeInternalHTTP(ctx context.Context) error {
40 handler, err := a.InternalHandler(ctx)
41 if err != nil {
42 return err
43 }
44 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error {
45 s.Addr = a.CLI.HTTPInternalAddr
46 log.Log(ctx, "http server starting", "addr", s.Addr)
47 return s.ListenAndServe()
48 })
49}
50
// mkvRE matches names made up solely of one or more decimal digits followed
// by a ".mkv" extension, e.g. "47.mkv".
//
// NOTE(review): an earlier comment here described this as a "lightweight way
// to authenticate push requests to ourself"; no use of the pattern is visible
// in this file, so confirm that against its callers.
var mkvRE = regexp.MustCompile(`^\d+\.mkv$`)
57
58func (a *StreamplaceAPI) InternalHandler(ctx context.Context) (http.Handler, error) {
59 router := httprouter.New()
60 broker := misttriggers.NewTriggerBroker()
61
62 broker.OnPushRewrite(func(ctx context.Context, payload *misttriggers.PushRewritePayload) (string, error) {
63 log.Log(ctx, "got push out start", "streamName", payload.StreamName, "url", payload.URL.String())
64 // Extract the last part of the URL path
65 urlPath := payload.URL.Path
66 parts := strings.Split(urlPath, "/")
67 lastPart := ""
68 if len(parts) > 0 {
69 lastPart = parts[len(parts)-1]
70 }
71 mediaSigner, err := a.MakeMediaSigner(ctx, lastPart)
72 if err != nil {
73 return "", err
74 }
75
76 ms := time.Now().UnixMilli()
77 out := fmt.Sprintf("%s+%s_%d", mistconfig.StreamName, mediaSigner.Streamer(), ms)
78 a.SignerCacheMu.Lock()
79 a.SignerCache[mediaSigner.Streamer()] = mediaSigner
80 a.SignerCacheMu.Unlock()
81 log.Log(ctx, "added key to cache", "mist-stream", out, "streamer", mediaSigner.Streamer())
82
83 return out, nil
84 })
85 triggerCollection := misttriggers.NewMistCallbackHandlersCollection(a.CLI, broker)
86 router.POST("/mist-trigger", triggerCollection.Trigger())
87 router.HandlerFunc("GET", "/healthz", a.HandleHealthz(ctx))
88
89 // Add pprof handlers
90 router.HandlerFunc("GET", "/debug/pprof/", pprof.Index)
91 router.HandlerFunc("GET", "/debug/pprof/cmdline", pprof.Cmdline)
92 router.HandlerFunc("GET", "/debug/pprof/profile", pprof.Profile)
93 router.HandlerFunc("GET", "/debug/pprof/symbol", pprof.Symbol)
94 router.HandlerFunc("GET", "/debug/pprof/trace", pprof.Trace)
95 router.Handler("GET", "/debug/pprof/goroutine", pprof.Handler("goroutine"))
96 router.Handler("GET", "/debug/pprof/heap", pprof.Handler("heap"))
97 router.Handler("GET", "/debug/pprof/threadcreate", pprof.Handler("threadcreate"))
98 router.Handler("GET", "/debug/pprof/block", pprof.Handler("block"))
99 router.Handler("GET", "/debug/pprof/allocs", pprof.Handler("allocs"))
100 router.Handler("GET", "/debug/pprof/mutex", pprof.Handler("mutex"))
101
102 router.POST("/gc", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
103 runtime.GC()
104 w.WriteHeader(204)
105 })
106
107 router.Handler("GET", "/metrics", promhttp.Handler())
108
109 router.GET("/playback/:user/:rendition/concat", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
110 user := p.ByName("user")
111 if user == "" {
112 errors.WriteHTTPBadRequest(w, "user required", nil)
113 return
114 }
115 rendition := p.ByName("rendition")
116 if rendition == "" {
117 errors.WriteHTTPBadRequest(w, "rendition required", nil)
118 return
119 }
120 user, err := a.NormalizeUser(ctx, user)
121 if err != nil {
122 errors.WriteHTTPBadRequest(w, "invalid user", err)
123 return
124 }
125 w.Header().Set("content-type", "text/plain")
126 fmt.Fprintf(w, "ffconcat version 1.0\n")
127 // intermittent reports that you need two here to make things work properly? shouldn't matter.
128 for i := 0; i < 2; i += 1 {
129 fmt.Fprintf(w, "file '%s/playback/%s/%s/latest.mp4'\n", a.CLI.OwnInternalURL(), user, rendition)
130 }
131 })
132
133 router.GET("/playback/:user/:rendition/latest.mp4", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
134 user := p.ByName("user")
135 if user == "" {
136 errors.WriteHTTPBadRequest(w, "user required", nil)
137 return
138 }
139 user, err := a.NormalizeUser(ctx, user)
140 if err != nil {
141 errors.WriteHTTPBadRequest(w, "invalid user", err)
142 return
143 }
144 rendition := p.ByName("rendition")
145 if rendition == "" {
146 errors.WriteHTTPBadRequest(w, "rendition required", nil)
147 return
148 }
149 segChan := a.Bus.SubscribeSegment(ctx, user, rendition)
150 defer a.Bus.UnsubscribeSegment(ctx, user, rendition, segChan)
151 seg := <-segChan.C
152 base := filepath.Base(seg.Filepath)
153 w.Header().Set("Location", fmt.Sprintf("%s/playback/%s/%s/segment/%s\n", a.CLI.OwnInternalURL(), user, rendition, base))
154 w.WriteHeader(301)
155 })
156
157 router.GET("/playback/:user/:rendition/segment/:file", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
158 user := p.ByName("user")
159 if user == "" {
160 errors.WriteHTTPBadRequest(w, "user required", nil)
161 return
162 }
163 user, err := a.NormalizeUser(ctx, user)
164 if err != nil {
165 errors.WriteHTTPBadRequest(w, "invalid user", err)
166 return
167 }
168 file := p.ByName("file")
169 if file == "" {
170 errors.WriteHTTPBadRequest(w, "file required", nil)
171 return
172 }
173 fullpath, err := a.CLI.SegmentFilePath(user, file)
174 if err != nil {
175 errors.WriteHTTPBadRequest(w, "badly formatted request", err)
176 return
177 }
178 http.ServeFile(w, r, fullpath)
179 })
180
181 router.GET("/playback/:user/:rendition/stream.mp4", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
182 user := p.ByName("user")
183 if user == "" {
184 errors.WriteHTTPBadRequest(w, "user required", nil)
185 return
186 }
187 rendition := p.ByName("rendition")
188 if rendition == "" {
189 errors.WriteHTTPBadRequest(w, "rendition required", nil)
190 return
191 }
192 user, err := a.NormalizeUser(ctx, user)
193 if err != nil {
194 errors.WriteHTTPBadRequest(w, "invalid user", err)
195 return
196 }
197 var delayMS int64 = 1000
198 userDelay := r.URL.Query().Get("delayms")
199 if userDelay != "" {
200 var err error
201 delayMS, err = strconv.ParseInt(userDelay, 10, 64)
202 if err != nil {
203 errors.WriteHTTPBadRequest(w, "error parsing delay", err)
204 return
205 }
206 if delayMS > 10000 {
207 errors.WriteHTTPBadRequest(w, "delay too large, maximum 10000", nil)
208 return
209 }
210 }
211 w.Header().Set("Content-Type", "video/mp4")
212 w.WriteHeader(200)
213 g, ctx := errgroup.WithContext(ctx)
214 pr, pw := io.Pipe()
215 bufw := bufio.NewWriter(pw)
216 g.Go(func() error {
217 return a.MediaManager.SegmentToMP4(ctx, user, rendition, bufw)
218 })
219 g.Go(func() error {
220 time.Sleep(time.Duration(delayMS) * time.Millisecond)
221 _, err := io.Copy(w, pr)
222 return err
223 })
224 if err := g.Wait(); err != nil {
225 errors.WriteHTTPBadRequest(w, "request failed", err)
226 }
227 })
228
229 router.HEAD("/playback/:user/:rendition/stream.mkv", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
230 user := p.ByName("user")
231 if user == "" {
232 errors.WriteHTTPBadRequest(w, "user required", nil)
233 return
234 }
235 w.Header().Set("Content-Type", "video/x-matroska")
236 w.Header().Set("Transfer-Encoding", "chunked")
237 w.WriteHeader(200)
238 })
239
240 router.POST("/http-pipe/:uuid", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
241 uu := p.ByName("uuid")
242 if uu == "" {
243 errors.WriteHTTPBadRequest(w, "uuid required", nil)
244 return
245 }
246 pr := a.MediaManager.GetHTTPPipeWriter(uu)
247 if pr == nil {
248 errors.WriteHTTPNotFound(w, "http-pipe not found", nil)
249 return
250 }
251 if _, err := io.Copy(pr, r.Body); err != nil {
252 errors.WriteHTTPInternalServerError(w, "failed to copy response", nil)
253 }
254 })
255
256 // self-destruct code, useful for dumping goroutines on windows
257 router.POST("/abort", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
258 if err := rtpprof.Lookup("goroutine").WriteTo(os.Stderr, 2); err != nil {
259 log.Log(ctx, "error writing rtpprof", "error", err)
260 }
261 log.Log(ctx, "got POST /abort, self-destructing")
262 os.Exit(1)
263 })
264
265 handleIncomingStream := func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
266 key := p.ByName("key")
267 log.Log(ctx, "stream start")
268
269 var mediaSigner media.MediaSigner
270 var ok bool
271 var err error
272 parts := strings.Split(key, "_")
273
274 if len(parts) == 2 {
275 a.SignerCacheMu.Lock()
276 mediaSigner, ok = a.SignerCache[parts[0]]
277 a.SignerCacheMu.Unlock()
278 if !ok {
279 log.Error(ctx, "couldn't find key in cache", "part", parts[0], "key", key)
280 errors.WriteHTTPUnauthorized(w, "invalid authorization key", nil)
281 return
282 }
283 } else {
284 mediaSigner, err = a.MakeMediaSigner(ctx, key)
285 if err != nil {
286 errors.WriteHTTPUnauthorized(w, "invalid authorization key", err)
287 return
288 }
289 }
290
291 err = a.MediaManager.MKVIngest(ctx, r.Body, mediaSigner)
292
293 if err != nil {
294 log.Log(ctx, "stream error", "error", err)
295 errors.WriteHTTPInternalServerError(w, "stream error", err)
296 return
297 }
298 log.Log(ctx, "stream success", "url", r.URL.String())
299 }
300
301 // route to accept an incoming mkv stream from OBS, segment it, and push the segments back to this HTTP handler
302 router.POST("/live/:key", handleIncomingStream)
303 router.PUT("/live/:key", handleIncomingStream)
304
305 router.GET("/player-report/:id", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
306 id := p.ByName("id")
307 if id == "" {
308 errors.WriteHTTPBadRequest(w, "id required", nil)
309 return
310 }
311 events, err := a.Model.PlayerReport(id)
312 if err != nil {
313 errors.WriteHTTPBadRequest(w, err.Error(), err)
314 return
315 }
316 bs, err := json.Marshal(events)
317 if err != nil {
318 errors.WriteHTTPInternalServerError(w, "unable to marhsal json", err)
319 return
320 }
321 if _, err := w.Write(bs); err != nil {
322 log.Error(ctx, "error writing response", "error", err)
323 }
324 })
325
326 router.GET("/segment/:id", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
327 id := p.ByName("id")
328 if id == "" {
329 errors.WriteHTTPBadRequest(w, "id required", nil)
330 return
331 }
332 segment, err := a.Model.GetSegment(id)
333 if err != nil {
334 errors.WriteHTTPBadRequest(w, err.Error(), err)
335 return
336 }
337 if segment == nil {
338 errors.WriteHTTPNotFound(w, "segment not found", nil)
339 return
340 }
341 spSeg, err := segment.ToStreamplaceSegment()
342 if err != nil {
343 errors.WriteHTTPInternalServerError(w, "unable to convert segment to streamplace segment", err)
344 return
345 }
346 bs, err := json.Marshal(spSeg)
347 if err != nil {
348 errors.WriteHTTPInternalServerError(w, "unable to marhsal json", err)
349 return
350 }
351 if _, err := w.Write(bs); err != nil {
352 log.Error(ctx, "error writing response", "error", err)
353 }
354 })
355
356 router.DELETE("/player-events", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
357 err := a.Model.ClearPlayerEvents()
358 if err != nil {
359 errors.WriteHTTPInternalServerError(w, "unable to delete player events", err)
360 return
361 }
362 w.WriteHeader(204)
363 })
364
365 router.GET("/settings", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
366 w.Header().Set("Access-Control-Allow-Origin", "*")
367 w.Header().Set("Access-Control-Allow-Methods", "GET")
368 w.Header().Set("Access-Control-Allow-Headers", "Content-Type")
369
370 id := a.Signer.Hex()
371
372 ident, err := a.Model.GetIdentity(id)
373 if err != nil {
374 errors.WriteHTTPInternalServerError(w, "unable to get settings", err)
375 return
376 }
377
378 bs, err := json.Marshal(ident)
379 if err != nil {
380 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err)
381 return
382 }
383 if _, err := w.Write(bs); err != nil {
384 log.Error(ctx, "error writing response", "error", err)
385 }
386 })
387
388 router.GET("/followers/:user", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
389 user := p.ByName("user")
390 if user == "" {
391 errors.WriteHTTPBadRequest(w, "user required", nil)
392 return
393 }
394
395 followers, err := a.Model.GetUserFollowers(ctx, user)
396 if err != nil {
397 errors.WriteHTTPInternalServerError(w, "unable to get followers", err)
398 return
399 }
400 bs, err := json.Marshal(followers)
401 if err != nil {
402 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err)
403 return
404 }
405 if _, err := w.Write(bs); err != nil {
406 log.Error(ctx, "error writing response", "error", err)
407 }
408 })
409
410 router.GET("/following/:user", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
411 user := p.ByName("user")
412 if user == "" {
413 errors.WriteHTTPBadRequest(w, "user required", nil)
414 return
415 }
416
417 followers, err := a.Model.GetUserFollowing(ctx, user)
418 if err != nil {
419 errors.WriteHTTPInternalServerError(w, "unable to get followers", err)
420 return
421 }
422 bs, err := json.Marshal(followers)
423 if err != nil {
424 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err)
425 return
426 }
427 if _, err := w.Write(bs); err != nil {
428 log.Error(ctx, "error writing response", "error", err)
429 }
430 })
431
432 router.GET("/notifications", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
433 notifications, err := a.Model.ListNotifications()
434 if err != nil {
435 errors.WriteHTTPInternalServerError(w, "unable to get notifications", err)
436 return
437 }
438 bs, err := json.Marshal(notifications)
439 if err != nil {
440 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err)
441 return
442 }
443 if _, err := w.Write(bs); err != nil {
444 log.Error(ctx, "error writing response", "error", err)
445 }
446 })
447
448 router.GET("/chat-posts", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
449 posts, err := a.Model.ListFeedPosts()
450 if err != nil {
451 errors.WriteHTTPInternalServerError(w, "unable to get chat posts", err)
452 return
453 }
454 bs, err := json.Marshal(posts)
455 if err != nil {
456 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err)
457 return
458 }
459 if _, err := w.Write(bs); err != nil {
460 log.Error(ctx, "error writing response", "error", err)
461 }
462 })
463
464 router.GET("/chat/:cid", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
465 cid := p.ByName("cid")
466 if cid == "" {
467 errors.WriteHTTPBadRequest(w, "cid required", nil)
468 return
469 }
470 msg, err := a.Model.GetChatMessage(cid)
471 if err != nil {
472 errors.WriteHTTPInternalServerError(w, "unable to get chat posts", err)
473 return
474 }
475 spmsg, err := msg.ToStreamplaceMessageView()
476 if err != nil {
477 errors.WriteHTTPInternalServerError(w, "unable to convert chat message to streamplace message view", err)
478 return
479 }
480 bs, err := json.Marshal(spmsg)
481 if err != nil {
482 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err)
483 return
484 }
485 if _, err := w.Write(bs); err != nil {
486 log.Error(ctx, "error writing response", "error", err)
487 }
488 })
489
490 router.GET("/oauth-sessions", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
491 sessions, err := a.Model.ListOAuthSessions()
492 if err != nil {
493 errors.WriteHTTPInternalServerError(w, "unable to get oauth sessions", err)
494 return
495 }
496 bs, err := json.Marshal(sessions)
497 if err != nil {
498 errors.WriteHTTPInternalServerError(w, "unable to marshal oauth sessions", err)
499 return
500 }
501 if _, err := w.Write(bs); err != nil {
502 log.Error(ctx, "error writing response", "error", err)
503 }
504 })
505
506 router.POST("/notification-blast", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
507 var payload notificationpkg.NotificationBlast
508 if err := json.NewDecoder(r.Body).Decode(&payload); err != nil {
509 errors.WriteHTTPBadRequest(w, "invalid request body", err)
510 return
511 }
512 notifications, err := a.Model.ListNotifications()
513 if err != nil {
514 errors.WriteHTTPInternalServerError(w, "unable to get notifications", err)
515 return
516 }
517 if a.FirebaseNotifier == nil {
518 errors.WriteHTTPInternalServerError(w, "firebase notifier not initialized", nil)
519 return
520 }
521 tokens := []string{}
522 for _, not := range notifications {
523 tokens = append(tokens, not.Token)
524 }
525 err = a.FirebaseNotifier.Blast(ctx, tokens, &payload)
526 if err != nil {
527 errors.WriteHTTPInternalServerError(w, "unable to blast notifications", err)
528 return
529 }
530 w.WriteHeader(http.StatusNoContent)
531 })
532
533 router.PUT("/settings/:id", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
534 w.Header().Set("Access-Control-Allow-Origin", "*")
535 w.Header().Set("Access-Control-Allow-Methods", "PUT")
536 w.Header().Set("Access-Control-Allow-Headers", "Content-Type")
537
538 id := p.ByName("id")
539 if id == "" {
540 errors.WriteHTTPBadRequest(w, "id required", nil)
541 return
542 }
543
544 var ident model.Identity
545 if err := json.NewDecoder(r.Body).Decode(&ident); err != nil {
546 errors.WriteHTTPBadRequest(w, "invalid request body", err)
547 return
548 }
549 ident.ID = id
550
551 if err := a.Model.UpdateIdentity(&ident); err != nil {
552 errors.WriteHTTPInternalServerError(w, "unable to update settings", err)
553 return
554 }
555
556 w.WriteHeader(http.StatusNoContent)
557 })
558
559 router.POST("/livepeer-auth-webhook-url", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
560 var payload struct {
561 URL string `json:"url"`
562 }
563 // urls look like http://127.0.0.1:9999/live/did:plc:dkh4rwafdcda4ko7lewe43ml-uucbv40mdkcfat50/47.mp4
564 if err := json.NewDecoder(r.Body).Decode(&payload); err != nil {
565 errors.WriteHTTPBadRequest(w, "invalid request body (could not decode)", err)
566 return
567 }
568 parts := strings.Split(payload.URL, "/")
569 if len(parts) < 5 {
570 errors.WriteHTTPBadRequest(w, "invalid request body (too few parts)", nil)
571 return
572 }
573 didSession := parts[4]
574 idParts := strings.Split(didSession, "-")
575 if len(idParts) != 2 {
576 errors.WriteHTTPBadRequest(w, "invalid request body (invalid did session)", nil)
577 return
578 }
579 did := idParts[0]
580 // sessionID := idParts[1]
581 seg, err := a.Model.LatestSegmentForUser(did)
582 if err != nil {
583 errors.WriteHTTPInternalServerError(w, "unable to get latest segment", err)
584 return
585 }
586 spseg, err := seg.ToStreamplaceSegment()
587 if err != nil {
588 errors.WriteHTTPInternalServerError(w, "unable to convert segment to streamplace segment", err)
589 return
590 }
591 renditions, err := renditions.GenerateRenditions(spseg)
592 if err != nil {
593 errors.WriteHTTPInternalServerError(w, "unable to generate renditions", err)
594 return
595 }
596 out := map[string]any{
597 "manifestID": didSession,
598 "profiles": renditions.ToLivepeerProfiles(),
599 }
600 bs, err := json.Marshal(out)
601 if err != nil {
602 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err)
603 return
604 }
605 if _, err := w.Write(bs); err != nil {
606 log.Error(ctx, "error writing response", "error", err)
607 }
608 })
609
610 router.POST("/replay/:streamKey", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
611 key := p.ByName("streamKey")
612 if key == "" {
613 errors.WriteHTTPBadRequest(w, "streamKey required", nil)
614 return
615 }
616 mediaSigner, err := a.MakeMediaSigner(ctx, key)
617 if err != nil {
618 errors.WriteHTTPUnauthorized(w, "invalid authorization key", err)
619 return
620 }
621 pc, err := rtcrec.NewReplayPeerConnection(ctx, r.Body)
622 if err != nil {
623 errors.WriteHTTPInternalServerError(w, "unable to create replay peer connection", err)
624 return
625 }
626 answer, err := a.MediaManager.WebRTCIngest(ctx, &webrtc.SessionDescription{SDP: "placeholder"}, mediaSigner, pc, make(chan struct{}))
627 if err != nil {
628 errors.WriteHTTPInternalServerError(w, "unable to ingest web rtc", err)
629 return
630 }
631 w.WriteHeader(200)
632 if _, err := w.Write([]byte(answer.SDP)); err != nil {
633 errors.WriteHTTPInternalServerError(w, "unable to write response", err)
634 log.Error(ctx, "error writing response", "error", err)
635 }
636 })
637
638 handler := sloghttp.Recovery(router)
639 if log.Level(4) {
640 handler = sloghttp.New(slog.Default())(handler)
641 }
642 return handler, nil
643}
644
645func (a *StreamplaceAPI) keyToUser(ctx context.Context, key string) (string, error) {
646 payload, err := base64.URLEncoding.DecodeString(key)
647 if err != nil {
648 return "", err
649 }
650 signed, err := a.Signer.Verify(payload)
651 if err != nil {
652 return "", err
653 }
654 _, ok := signed.Data().(*v0.StreamKey)
655 if !ok {
656 return "", fmt.Errorf("got signed data but it wasn't a stream key")
657 }
658 return strings.ToLower(signed.Signer()), nil
659}