Live video on the AT Protocol
1package api
2
3import (
4 "bufio"
5 "context"
6 "encoding/base64"
7 "encoding/json"
8 "fmt"
9 "io"
10 "log/slog"
11 "net/http"
12 "net/http/pprof"
13 "os"
14 "path/filepath"
15 "regexp"
16 "runtime"
17 rtpprof "runtime/pprof"
18 "strconv"
19 "strings"
20 "time"
21
22 "github.com/julienschmidt/httprouter"
23 "github.com/pion/webrtc/v4"
24 "github.com/prometheus/client_golang/prometheus/promhttp"
25 sloghttp "github.com/samber/slog-http"
26 "golang.org/x/sync/errgroup"
27 "stream.place/streamplace/pkg/errors"
28 "stream.place/streamplace/pkg/log"
29 "stream.place/streamplace/pkg/media"
30 "stream.place/streamplace/pkg/mist/mistconfig"
31 "stream.place/streamplace/pkg/mist/misttriggers"
32 "stream.place/streamplace/pkg/model"
33 notificationpkg "stream.place/streamplace/pkg/notifications"
34 "stream.place/streamplace/pkg/renditions"
35 "stream.place/streamplace/pkg/rtcrec"
36 v0 "stream.place/streamplace/pkg/schema/v0"
37)
38
39func (a *StreamplaceAPI) ServeInternalHTTP(ctx context.Context) error {
40 handler, err := a.InternalHandler(ctx)
41 if err != nil {
42 return err
43 }
44 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error {
45 s.Addr = a.CLI.HTTPInternalAddr
46 log.Log(ctx, "http server starting", "addr", s.Addr)
47 return s.ListenAndServe()
48 })
49}
50
// mkvRE matches names made up of one or more decimal digits followed by
// ".mkv" and nothing else (for example "47.mkv"). It is compiled once at
// package initialization.
//
// NOTE(review): the comment previously attached to this variable read
// "lightweight way to authenticate push requests to ourself"; no use of mkvRE
// is visible in this part of the file, so confirm how it relates to push
// authentication.
var mkvRE = regexp.MustCompile(`^\d+\.mkv$`)
57
58func (a *StreamplaceAPI) InternalHandler(ctx context.Context) (http.Handler, error) {
59 router := httprouter.New()
60 broker := misttriggers.NewTriggerBroker()
61
62 broker.OnPushRewrite(func(ctx context.Context, payload *misttriggers.PushRewritePayload) (string, error) {
63 log.Log(ctx, "got push out start", "streamName", payload.StreamName, "url", payload.URL.String())
64 // Extract the last part of the URL path
65 urlPath := payload.URL.Path
66 parts := strings.Split(urlPath, "/")
67 lastPart := ""
68 if len(parts) > 0 {
69 lastPart = parts[len(parts)-1]
70 }
71 mediaSigner, err := a.MakeMediaSigner(ctx, lastPart)
72 if err != nil {
73 return "", err
74 }
75
76 ms := time.Now().UnixMilli()
77 out := fmt.Sprintf("%s+%s_%d", mistconfig.StreamName, mediaSigner.Streamer(), ms)
78 a.SignerCacheMu.Lock()
79 a.SignerCache[mediaSigner.Streamer()] = mediaSigner
80 a.SignerCacheMu.Unlock()
81 log.Log(ctx, "added key to cache", "mist-stream", out, "streamer", mediaSigner.Streamer())
82
83 return out, nil
84 })
85 triggerCollection := misttriggers.NewMistCallbackHandlersCollection(a.CLI, broker)
86 router.POST("/mist-trigger", triggerCollection.Trigger())
87 router.HandlerFunc("GET", "/healthz", a.HandleHealthz(ctx))
88
89 // Add pprof handlers
90 router.HandlerFunc("GET", "/debug/pprof/", pprof.Index)
91 router.HandlerFunc("GET", "/debug/pprof/cmdline", pprof.Cmdline)
92 router.HandlerFunc("GET", "/debug/pprof/profile", pprof.Profile)
93 router.HandlerFunc("GET", "/debug/pprof/symbol", pprof.Symbol)
94 router.HandlerFunc("GET", "/debug/pprof/trace", pprof.Trace)
95 router.Handler("GET", "/debug/pprof/goroutine", pprof.Handler("goroutine"))
96 router.Handler("GET", "/debug/pprof/heap", pprof.Handler("heap"))
97 router.Handler("GET", "/debug/pprof/threadcreate", pprof.Handler("threadcreate"))
98 router.Handler("GET", "/debug/pprof/block", pprof.Handler("block"))
99 router.Handler("GET", "/debug/pprof/allocs", pprof.Handler("allocs"))
100 router.Handler("GET", "/debug/pprof/mutex", pprof.Handler("mutex"))
101
102 router.POST("/gc", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
103 runtime.GC()
104 w.WriteHeader(204)
105 })
106
107 router.Handler("GET", "/metrics", promhttp.Handler())
108
109 router.GET("/playback/:user/:rendition/concat", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
110 user := p.ByName("user")
111 if user == "" {
112 errors.WriteHTTPBadRequest(w, "user required", nil)
113 return
114 }
115 rendition := p.ByName("rendition")
116 if rendition == "" {
117 errors.WriteHTTPBadRequest(w, "rendition required", nil)
118 return
119 }
120 user, err := a.NormalizeUser(ctx, user)
121 if err != nil {
122 errors.WriteHTTPBadRequest(w, "invalid user", err)
123 return
124 }
125 w.Header().Set("content-type", "text/plain")
126 fmt.Fprintf(w, "ffconcat version 1.0\n")
127 // intermittent reports that you need two here to make things work properly? shouldn't matter.
128 for i := 0; i < 2; i += 1 {
129 fmt.Fprintf(w, "file '%s/playback/%s/%s/latest.mp4'\n", a.CLI.OwnInternalURL(), user, rendition)
130 }
131 })
132
133 router.GET("/playback/:user/:rendition/latest.mp4", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
134 user := p.ByName("user")
135 if user == "" {
136 errors.WriteHTTPBadRequest(w, "user required", nil)
137 return
138 }
139 user, err := a.NormalizeUser(ctx, user)
140 if err != nil {
141 errors.WriteHTTPBadRequest(w, "invalid user", err)
142 return
143 }
144 rendition := p.ByName("rendition")
145 if rendition == "" {
146 errors.WriteHTTPBadRequest(w, "rendition required", nil)
147 return
148 }
149 seg := <-a.MediaManager.SubscribeSegment(ctx, user, rendition)
150 base := filepath.Base(seg.Filepath)
151 w.Header().Set("Location", fmt.Sprintf("%s/playback/%s/%s/segment/%s\n", a.CLI.OwnInternalURL(), user, rendition, base))
152 w.WriteHeader(301)
153 })
154
155 router.GET("/playback/:user/:rendition/segment/:file", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
156 user := p.ByName("user")
157 if user == "" {
158 errors.WriteHTTPBadRequest(w, "user required", nil)
159 return
160 }
161 user, err := a.NormalizeUser(ctx, user)
162 if err != nil {
163 errors.WriteHTTPBadRequest(w, "invalid user", err)
164 return
165 }
166 file := p.ByName("file")
167 if file == "" {
168 errors.WriteHTTPBadRequest(w, "file required", nil)
169 return
170 }
171 fullpath, err := a.CLI.SegmentFilePath(user, file)
172 if err != nil {
173 errors.WriteHTTPBadRequest(w, "badly formatted request", err)
174 return
175 }
176 http.ServeFile(w, r, fullpath)
177 })
178
179 router.GET("/playback/:user/:rendition/stream.mp4", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
180 user := p.ByName("user")
181 if user == "" {
182 errors.WriteHTTPBadRequest(w, "user required", nil)
183 return
184 }
185 rendition := p.ByName("rendition")
186 if rendition == "" {
187 errors.WriteHTTPBadRequest(w, "rendition required", nil)
188 return
189 }
190 user, err := a.NormalizeUser(ctx, user)
191 if err != nil {
192 errors.WriteHTTPBadRequest(w, "invalid user", err)
193 return
194 }
195 var delayMS int64 = 1000
196 userDelay := r.URL.Query().Get("delayms")
197 if userDelay != "" {
198 var err error
199 delayMS, err = strconv.ParseInt(userDelay, 10, 64)
200 if err != nil {
201 errors.WriteHTTPBadRequest(w, "error parsing delay", err)
202 return
203 }
204 if delayMS > 10000 {
205 errors.WriteHTTPBadRequest(w, "delay too large, maximum 10000", nil)
206 return
207 }
208 }
209 w.Header().Set("Content-Type", "video/mp4")
210 w.WriteHeader(200)
211 g, ctx := errgroup.WithContext(ctx)
212 pr, pw := io.Pipe()
213 bufw := bufio.NewWriter(pw)
214 g.Go(func() error {
215 return a.MediaManager.SegmentToMP4(ctx, user, rendition, bufw)
216 })
217 g.Go(func() error {
218 time.Sleep(time.Duration(delayMS) * time.Millisecond)
219 _, err := io.Copy(w, pr)
220 return err
221 })
222 if err := g.Wait(); err != nil {
223 errors.WriteHTTPBadRequest(w, "request failed", err)
224 }
225 })
226
227 router.HEAD("/playback/:user/:rendition/stream.mkv", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
228 user := p.ByName("user")
229 if user == "" {
230 errors.WriteHTTPBadRequest(w, "user required", nil)
231 return
232 }
233 w.Header().Set("Content-Type", "video/x-matroska")
234 w.Header().Set("Transfer-Encoding", "chunked")
235 w.WriteHeader(200)
236 })
237
238 router.POST("/http-pipe/:uuid", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
239 uu := p.ByName("uuid")
240 if uu == "" {
241 errors.WriteHTTPBadRequest(w, "uuid required", nil)
242 return
243 }
244 pr := a.MediaManager.GetHTTPPipeWriter(uu)
245 if pr == nil {
246 errors.WriteHTTPNotFound(w, "http-pipe not found", nil)
247 return
248 }
249 if _, err := io.Copy(pr, r.Body); err != nil {
250 errors.WriteHTTPInternalServerError(w, "failed to copy response", nil)
251 }
252 })
253
254 // self-destruct code, useful for dumping goroutines on windows
255 router.POST("/abort", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
256 if err := rtpprof.Lookup("goroutine").WriteTo(os.Stderr, 2); err != nil {
257 log.Log(ctx, "error writing rtpprof", "error", err)
258 }
259 log.Log(ctx, "got POST /abort, self-destructing")
260 os.Exit(1)
261 })
262
263 handleIncomingStream := func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
264 key := p.ByName("key")
265 log.Log(ctx, "stream start")
266
267 var mediaSigner media.MediaSigner
268 var ok bool
269 var err error
270 parts := strings.Split(key, "_")
271
272 if len(parts) == 2 {
273 a.SignerCacheMu.Lock()
274 mediaSigner, ok = a.SignerCache[parts[0]]
275 a.SignerCacheMu.Unlock()
276 if !ok {
277 log.Error(ctx, "couldn't find key in cache", "part", parts[0], "key", key)
278 errors.WriteHTTPUnauthorized(w, "invalid authorization key", nil)
279 return
280 }
281 } else {
282 mediaSigner, err = a.MakeMediaSigner(ctx, key)
283 if err != nil {
284 errors.WriteHTTPUnauthorized(w, "invalid authorization key", err)
285 return
286 }
287 }
288
289 err = a.MediaManager.MKVIngest(ctx, r.Body, mediaSigner)
290
291 if err != nil {
292 log.Log(ctx, "stream error", "error", err)
293 errors.WriteHTTPInternalServerError(w, "stream error", err)
294 return
295 }
296 log.Log(ctx, "stream success", "url", r.URL.String())
297 }
298
299 // route to accept an incoming mkv stream from OBS, segment it, and push the segments back to this HTTP handler
300 router.POST("/live/:key", handleIncomingStream)
301 router.PUT("/live/:key", handleIncomingStream)
302
303 router.GET("/player-report/:id", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
304 id := p.ByName("id")
305 if id == "" {
306 errors.WriteHTTPBadRequest(w, "id required", nil)
307 return
308 }
309 events, err := a.Model.PlayerReport(id)
310 if err != nil {
311 errors.WriteHTTPBadRequest(w, err.Error(), err)
312 return
313 }
314 bs, err := json.Marshal(events)
315 if err != nil {
316 errors.WriteHTTPInternalServerError(w, "unable to marhsal json", err)
317 return
318 }
319 if _, err := w.Write(bs); err != nil {
320 log.Error(ctx, "error writing response", "error", err)
321 }
322 })
323
324 router.GET("/segment/:id", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
325 id := p.ByName("id")
326 if id == "" {
327 errors.WriteHTTPBadRequest(w, "id required", nil)
328 return
329 }
330 segment, err := a.Model.GetSegment(id)
331 if err != nil {
332 errors.WriteHTTPBadRequest(w, err.Error(), err)
333 return
334 }
335 if segment == nil {
336 errors.WriteHTTPNotFound(w, "segment not found", nil)
337 return
338 }
339 spSeg, err := segment.ToStreamplaceSegment()
340 if err != nil {
341 errors.WriteHTTPInternalServerError(w, "unable to convert segment to streamplace segment", err)
342 return
343 }
344 bs, err := json.Marshal(spSeg)
345 if err != nil {
346 errors.WriteHTTPInternalServerError(w, "unable to marhsal json", err)
347 return
348 }
349 if _, err := w.Write(bs); err != nil {
350 log.Error(ctx, "error writing response", "error", err)
351 }
352 })
353
354 router.DELETE("/player-events", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
355 err := a.Model.ClearPlayerEvents()
356 if err != nil {
357 errors.WriteHTTPInternalServerError(w, "unable to delete player events", err)
358 return
359 }
360 w.WriteHeader(204)
361 })
362
363 router.GET("/settings", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
364 w.Header().Set("Access-Control-Allow-Origin", "*")
365 w.Header().Set("Access-Control-Allow-Methods", "GET")
366 w.Header().Set("Access-Control-Allow-Headers", "Content-Type")
367
368 id := a.Signer.Hex()
369
370 ident, err := a.Model.GetIdentity(id)
371 if err != nil {
372 errors.WriteHTTPInternalServerError(w, "unable to get settings", err)
373 return
374 }
375
376 bs, err := json.Marshal(ident)
377 if err != nil {
378 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err)
379 return
380 }
381 if _, err := w.Write(bs); err != nil {
382 log.Error(ctx, "error writing response", "error", err)
383 }
384 })
385
386 router.GET("/followers/:user", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
387 user := p.ByName("user")
388 if user == "" {
389 errors.WriteHTTPBadRequest(w, "user required", nil)
390 return
391 }
392
393 followers, err := a.Model.GetUserFollowers(ctx, user)
394 if err != nil {
395 errors.WriteHTTPInternalServerError(w, "unable to get followers", err)
396 return
397 }
398 bs, err := json.Marshal(followers)
399 if err != nil {
400 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err)
401 return
402 }
403 if _, err := w.Write(bs); err != nil {
404 log.Error(ctx, "error writing response", "error", err)
405 }
406 })
407
408 router.GET("/following/:user", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
409 user := p.ByName("user")
410 if user == "" {
411 errors.WriteHTTPBadRequest(w, "user required", nil)
412 return
413 }
414
415 followers, err := a.Model.GetUserFollowing(ctx, user)
416 if err != nil {
417 errors.WriteHTTPInternalServerError(w, "unable to get followers", err)
418 return
419 }
420 bs, err := json.Marshal(followers)
421 if err != nil {
422 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err)
423 return
424 }
425 if _, err := w.Write(bs); err != nil {
426 log.Error(ctx, "error writing response", "error", err)
427 }
428 })
429
430 router.GET("/notifications", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
431 notifications, err := a.Model.ListNotifications()
432 if err != nil {
433 errors.WriteHTTPInternalServerError(w, "unable to get notifications", err)
434 return
435 }
436 bs, err := json.Marshal(notifications)
437 if err != nil {
438 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err)
439 return
440 }
441 if _, err := w.Write(bs); err != nil {
442 log.Error(ctx, "error writing response", "error", err)
443 }
444 })
445
446 router.GET("/chat-posts", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
447 posts, err := a.Model.ListFeedPosts()
448 if err != nil {
449 errors.WriteHTTPInternalServerError(w, "unable to get chat posts", err)
450 return
451 }
452 bs, err := json.Marshal(posts)
453 if err != nil {
454 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err)
455 return
456 }
457 if _, err := w.Write(bs); err != nil {
458 log.Error(ctx, "error writing response", "error", err)
459 }
460 })
461
462 router.GET("/chat/:cid", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
463 cid := p.ByName("cid")
464 if cid == "" {
465 errors.WriteHTTPBadRequest(w, "cid required", nil)
466 return
467 }
468 msg, err := a.Model.GetChatMessage(cid)
469 if err != nil {
470 errors.WriteHTTPInternalServerError(w, "unable to get chat posts", err)
471 return
472 }
473 spmsg, err := msg.ToStreamplaceMessageView()
474 if err != nil {
475 errors.WriteHTTPInternalServerError(w, "unable to convert chat message to streamplace message view", err)
476 return
477 }
478 bs, err := json.Marshal(spmsg)
479 if err != nil {
480 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err)
481 return
482 }
483 if _, err := w.Write(bs); err != nil {
484 log.Error(ctx, "error writing response", "error", err)
485 }
486 })
487
488 router.GET("/oauth-sessions", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
489 sessions, err := a.Model.ListOAuthSessions()
490 if err != nil {
491 errors.WriteHTTPInternalServerError(w, "unable to get oauth sessions", err)
492 return
493 }
494 bs, err := json.Marshal(sessions)
495 if err != nil {
496 errors.WriteHTTPInternalServerError(w, "unable to marshal oauth sessions", err)
497 return
498 }
499 if _, err := w.Write(bs); err != nil {
500 log.Error(ctx, "error writing response", "error", err)
501 }
502 })
503
504 router.POST("/notification-blast", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
505 var payload notificationpkg.NotificationBlast
506 if err := json.NewDecoder(r.Body).Decode(&payload); err != nil {
507 errors.WriteHTTPBadRequest(w, "invalid request body", err)
508 return
509 }
510 notifications, err := a.Model.ListNotifications()
511 if err != nil {
512 errors.WriteHTTPInternalServerError(w, "unable to get notifications", err)
513 return
514 }
515 if a.FirebaseNotifier == nil {
516 errors.WriteHTTPInternalServerError(w, "firebase notifier not initialized", nil)
517 return
518 }
519 tokens := []string{}
520 for _, not := range notifications {
521 tokens = append(tokens, not.Token)
522 }
523 err = a.FirebaseNotifier.Blast(ctx, tokens, &payload)
524 if err != nil {
525 errors.WriteHTTPInternalServerError(w, "unable to blast notifications", err)
526 return
527 }
528 w.WriteHeader(http.StatusNoContent)
529 })
530
531 router.PUT("/settings/:id", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
532 w.Header().Set("Access-Control-Allow-Origin", "*")
533 w.Header().Set("Access-Control-Allow-Methods", "PUT")
534 w.Header().Set("Access-Control-Allow-Headers", "Content-Type")
535
536 id := p.ByName("id")
537 if id == "" {
538 errors.WriteHTTPBadRequest(w, "id required", nil)
539 return
540 }
541
542 var ident model.Identity
543 if err := json.NewDecoder(r.Body).Decode(&ident); err != nil {
544 errors.WriteHTTPBadRequest(w, "invalid request body", err)
545 return
546 }
547 ident.ID = id
548
549 if err := a.Model.UpdateIdentity(&ident); err != nil {
550 errors.WriteHTTPInternalServerError(w, "unable to update settings", err)
551 return
552 }
553
554 w.WriteHeader(http.StatusNoContent)
555 })
556
557 router.POST("/livepeer-auth-webhook-url", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
558 var payload struct {
559 URL string `json:"url"`
560 }
561 // urls look like http://127.0.0.1:9999/live/did:plc:dkh4rwafdcda4ko7lewe43ml-uucbv40mdkcfat50/47.mp4
562 if err := json.NewDecoder(r.Body).Decode(&payload); err != nil {
563 errors.WriteHTTPBadRequest(w, "invalid request body (could not decode)", err)
564 return
565 }
566 parts := strings.Split(payload.URL, "/")
567 if len(parts) < 5 {
568 errors.WriteHTTPBadRequest(w, "invalid request body (too few parts)", nil)
569 return
570 }
571 didSession := parts[4]
572 idParts := strings.Split(didSession, "-")
573 if len(idParts) != 2 {
574 errors.WriteHTTPBadRequest(w, "invalid request body (invalid did session)", nil)
575 return
576 }
577 did := idParts[0]
578 // sessionID := idParts[1]
579 seg, err := a.Model.LatestSegmentForUser(did)
580 if err != nil {
581 errors.WriteHTTPInternalServerError(w, "unable to get latest segment", err)
582 return
583 }
584 spseg, err := seg.ToStreamplaceSegment()
585 if err != nil {
586 errors.WriteHTTPInternalServerError(w, "unable to convert segment to streamplace segment", err)
587 return
588 }
589 renditions, err := renditions.GenerateRenditions(spseg)
590 if err != nil {
591 errors.WriteHTTPInternalServerError(w, "unable to generate renditions", err)
592 return
593 }
594 out := map[string]any{
595 "manifestID": didSession,
596 "profiles": renditions.ToLivepeerProfiles(),
597 }
598 bs, err := json.Marshal(out)
599 if err != nil {
600 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err)
601 return
602 }
603 if _, err := w.Write(bs); err != nil {
604 log.Error(ctx, "error writing response", "error", err)
605 }
606 })
607
608 router.POST("/replay/:streamKey", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
609 key := p.ByName("streamKey")
610 if key == "" {
611 errors.WriteHTTPBadRequest(w, "streamKey required", nil)
612 return
613 }
614 mediaSigner, err := a.MakeMediaSigner(ctx, key)
615 if err != nil {
616 errors.WriteHTTPUnauthorized(w, "invalid authorization key", err)
617 return
618 }
619 pc, err := rtcrec.NewReplayPeerConnection(ctx, r.Body)
620 if err != nil {
621 errors.WriteHTTPInternalServerError(w, "unable to create replay peer connection", err)
622 return
623 }
624 answer, err := a.MediaManager.WebRTCIngest(ctx, &webrtc.SessionDescription{SDP: "placeholder"}, mediaSigner, pc)
625 if err != nil {
626 errors.WriteHTTPInternalServerError(w, "unable to ingest web rtc", err)
627 return
628 }
629 w.WriteHeader(200)
630 if _, err := w.Write([]byte(answer.SDP)); err != nil {
631 errors.WriteHTTPInternalServerError(w, "unable to write response", err)
632 log.Error(ctx, "error writing response", "error", err)
633 }
634 })
635
636 handler := sloghttp.Recovery(router)
637 if log.Level(4) {
638 handler = sloghttp.New(slog.Default())(handler)
639 }
640 return handler, nil
641}
642
643func (a *StreamplaceAPI) keyToUser(ctx context.Context, key string) (string, error) {
644 payload, err := base64.URLEncoding.DecodeString(key)
645 if err != nil {
646 return "", err
647 }
648 signed, err := a.Signer.Verify(payload)
649 if err != nil {
650 return "", err
651 }
652 _, ok := signed.Data().(*v0.StreamKey)
653 if !ok {
654 return "", fmt.Errorf("got signed data but it wasn't a stream key")
655 }
656 return strings.ToLower(signed.Signer()), nil
657}