Live video on the AT Protocol
1package api 2 3import ( 4 "bufio" 5 "context" 6 "encoding/base64" 7 "encoding/json" 8 "fmt" 9 "io" 10 "log/slog" 11 "net/http" 12 "net/http/pprof" 13 "os" 14 "path/filepath" 15 "regexp" 16 "runtime" 17 rtpprof "runtime/pprof" 18 "strconv" 19 "strings" 20 "time" 21 22 "github.com/julienschmidt/httprouter" 23 "github.com/pion/webrtc/v4" 24 "github.com/prometheus/client_golang/prometheus/promhttp" 25 sloghttp "github.com/samber/slog-http" 26 "golang.org/x/sync/errgroup" 27 "stream.place/streamplace/pkg/errors" 28 "stream.place/streamplace/pkg/log" 29 "stream.place/streamplace/pkg/media" 30 "stream.place/streamplace/pkg/mist/mistconfig" 31 "stream.place/streamplace/pkg/mist/misttriggers" 32 "stream.place/streamplace/pkg/model" 33 notificationpkg "stream.place/streamplace/pkg/notifications" 34 "stream.place/streamplace/pkg/renditions" 35 "stream.place/streamplace/pkg/rtcrec" 36 v0 "stream.place/streamplace/pkg/schema/v0" 37) 38 39func (a *StreamplaceAPI) ServeInternalHTTP(ctx context.Context) error { 40 handler, err := a.InternalHandler(ctx) 41 if err != nil { 42 return err 43 } 44 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error { 45 s.Addr = a.CLI.HTTPInternalAddr 46 log.Log(ctx, "http server starting", "addr", s.Addr) 47 return s.ListenAndServe() 48 }) 49} 50 51// lightweight way to authenticate push requests to ourself 52var mkvRE *regexp.Regexp 53 54func init() { 55 mkvRE = regexp.MustCompile(`^\d+\.mkv$`) 56} 57 58func (a *StreamplaceAPI) InternalHandler(ctx context.Context) (http.Handler, error) { 59 router := httprouter.New() 60 broker := misttriggers.NewTriggerBroker() 61 62 broker.OnPushRewrite(func(ctx context.Context, payload *misttriggers.PushRewritePayload) (string, error) { 63 log.Log(ctx, "got push out start", "streamName", payload.StreamName, "url", payload.URL.String()) 64 // Extract the last part of the URL path 65 urlPath := payload.URL.Path 66 parts := strings.Split(urlPath, "/") 67 lastPart := "" 68 if len(parts) > 0 { 69 lastPart = parts[len(parts)-1] 70 } 71 mediaSigner, err := a.MakeMediaSigner(ctx, lastPart) 72 if err != nil { 73 return "", err 74 } 75 76 ms := time.Now().UnixMilli() 77 out := fmt.Sprintf("%s+%s_%d", mistconfig.StreamName, mediaSigner.Streamer(), ms) 78 a.SignerCacheMu.Lock() 79 a.SignerCache[mediaSigner.Streamer()] = mediaSigner 80 a.SignerCacheMu.Unlock() 81 log.Log(ctx, "added key to cache", "mist-stream", out, "streamer", mediaSigner.Streamer()) 82 83 return out, nil 84 }) 85 triggerCollection := misttriggers.NewMistCallbackHandlersCollection(a.CLI, broker) 86 router.POST("/mist-trigger", triggerCollection.Trigger()) 87 router.HandlerFunc("GET", "/healthz", a.HandleHealthz(ctx)) 88 89 // Add pprof handlers 90 router.HandlerFunc("GET", "/debug/pprof/", pprof.Index) 91 router.HandlerFunc("GET", "/debug/pprof/cmdline", pprof.Cmdline) 92 router.HandlerFunc("GET", "/debug/pprof/profile", pprof.Profile) 93 router.HandlerFunc("GET", "/debug/pprof/symbol", pprof.Symbol) 94 router.HandlerFunc("GET", "/debug/pprof/trace", pprof.Trace) 95 router.Handler("GET", "/debug/pprof/goroutine", pprof.Handler("goroutine")) 96 router.Handler("GET", "/debug/pprof/heap", pprof.Handler("heap")) 97 router.Handler("GET", "/debug/pprof/threadcreate", pprof.Handler("threadcreate")) 98 router.Handler("GET", "/debug/pprof/block", pprof.Handler("block")) 99 router.Handler("GET", "/debug/pprof/allocs", pprof.Handler("allocs")) 100 router.Handler("GET", "/debug/pprof/mutex", pprof.Handler("mutex")) 101 102 router.POST("/gc", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 103 runtime.GC() 104 w.WriteHeader(204) 105 }) 106 107 router.Handler("GET", "/metrics", promhttp.Handler()) 108 109 router.GET("/playback/:user/:rendition/concat", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 110 user := p.ByName("user") 111 if user == "" { 112 errors.WriteHTTPBadRequest(w, "user required", nil) 113 return 114 } 115 rendition := p.ByName("rendition") 116 if rendition == "" { 117 errors.WriteHTTPBadRequest(w, "rendition required", nil) 118 return 119 } 120 user, err := a.NormalizeUser(ctx, user) 121 if err != nil { 122 errors.WriteHTTPBadRequest(w, "invalid user", err) 123 return 124 } 125 w.Header().Set("content-type", "text/plain") 126 fmt.Fprintf(w, "ffconcat version 1.0\n") 127 // intermittent reports that you need two here to make things work properly? shouldn't matter. 128 for i := 0; i < 2; i += 1 { 129 fmt.Fprintf(w, "file '%s/playback/%s/%s/latest.mp4'\n", a.CLI.OwnInternalURL(), user, rendition) 130 } 131 }) 132 133 router.GET("/playback/:user/:rendition/latest.mp4", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 134 user := p.ByName("user") 135 if user == "" { 136 errors.WriteHTTPBadRequest(w, "user required", nil) 137 return 138 } 139 user, err := a.NormalizeUser(ctx, user) 140 if err != nil { 141 errors.WriteHTTPBadRequest(w, "invalid user", err) 142 return 143 } 144 rendition := p.ByName("rendition") 145 if rendition == "" { 146 errors.WriteHTTPBadRequest(w, "rendition required", nil) 147 return 148 } 149 seg := <-a.MediaManager.SubscribeSegment(ctx, user, rendition) 150 base := filepath.Base(seg.Filepath) 151 w.Header().Set("Location", fmt.Sprintf("%s/playback/%s/%s/segment/%s\n", a.CLI.OwnInternalURL(), user, rendition, base)) 152 w.WriteHeader(301) 153 }) 154 155 router.GET("/playback/:user/:rendition/segment/:file", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 156 user := p.ByName("user") 157 if user == "" { 158 errors.WriteHTTPBadRequest(w, "user required", nil) 159 return 160 } 161 user, err := a.NormalizeUser(ctx, user) 162 if err != nil { 163 errors.WriteHTTPBadRequest(w, "invalid user", err) 164 return 165 } 166 file := p.ByName("file") 167 if file == "" { 168 errors.WriteHTTPBadRequest(w, "file required", nil) 169 return 170 } 171 fullpath, err := a.CLI.SegmentFilePath(user, file) 172 if err != nil { 173 errors.WriteHTTPBadRequest(w, "badly formatted request", err) 174 return 175 } 176 http.ServeFile(w, r, fullpath) 177 }) 178 179 router.GET("/playback/:user/:rendition/stream.mp4", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 180 user := p.ByName("user") 181 if user == "" { 182 errors.WriteHTTPBadRequest(w, "user required", nil) 183 return 184 } 185 rendition := p.ByName("rendition") 186 if rendition == "" { 187 errors.WriteHTTPBadRequest(w, "rendition required", nil) 188 return 189 } 190 user, err := a.NormalizeUser(ctx, user) 191 if err != nil { 192 errors.WriteHTTPBadRequest(w, "invalid user", err) 193 return 194 } 195 var delayMS int64 = 1000 196 userDelay := r.URL.Query().Get("delayms") 197 if userDelay != "" { 198 var err error 199 delayMS, err = strconv.ParseInt(userDelay, 10, 64) 200 if err != nil { 201 errors.WriteHTTPBadRequest(w, "error parsing delay", err) 202 return 203 } 204 if delayMS > 10000 { 205 errors.WriteHTTPBadRequest(w, "delay too large, maximum 10000", nil) 206 return 207 } 208 } 209 w.Header().Set("Content-Type", "video/mp4") 210 w.WriteHeader(200) 211 g, ctx := errgroup.WithContext(ctx) 212 pr, pw := io.Pipe() 213 bufw := bufio.NewWriter(pw) 214 g.Go(func() error { 215 return a.MediaManager.SegmentToMP4(ctx, user, rendition, bufw) 216 }) 217 g.Go(func() error { 218 time.Sleep(time.Duration(delayMS) * time.Millisecond) 219 _, err := io.Copy(w, pr) 220 return err 221 }) 222 if err := g.Wait(); err != nil { 223 errors.WriteHTTPBadRequest(w, "request failed", err) 224 } 225 }) 226 227 router.HEAD("/playback/:user/:rendition/stream.mkv", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 228 user := p.ByName("user") 229 if user == "" { 230 errors.WriteHTTPBadRequest(w, "user required", nil) 231 return 232 } 233 w.Header().Set("Content-Type", "video/x-matroska") 234 w.Header().Set("Transfer-Encoding", "chunked") 235 w.WriteHeader(200) 236 }) 237 238 router.POST("/http-pipe/:uuid", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 239 uu := p.ByName("uuid") 240 if uu == "" { 241 errors.WriteHTTPBadRequest(w, "uuid required", nil) 242 return 243 } 244 pr := a.MediaManager.GetHTTPPipeWriter(uu) 245 if pr == nil { 246 errors.WriteHTTPNotFound(w, "http-pipe not found", nil) 247 return 248 } 249 if _, err := io.Copy(pr, r.Body); err != nil { 250 errors.WriteHTTPInternalServerError(w, "failed to copy response", nil) 251 } 252 }) 253 254 // self-destruct code, useful for dumping goroutines on windows 255 router.POST("/abort", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 256 if err := rtpprof.Lookup("goroutine").WriteTo(os.Stderr, 2); err != nil { 257 log.Log(ctx, "error writing rtpprof", "error", err) 258 } 259 log.Log(ctx, "got POST /abort, self-destructing") 260 os.Exit(1) 261 }) 262 263 handleIncomingStream := func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 264 key := p.ByName("key") 265 log.Log(ctx, "stream start") 266 267 var mediaSigner media.MediaSigner 268 var ok bool 269 var err error 270 parts := strings.Split(key, "_") 271 272 if len(parts) == 2 { 273 a.SignerCacheMu.Lock() 274 mediaSigner, ok = a.SignerCache[parts[0]] 275 a.SignerCacheMu.Unlock() 276 if !ok { 277 log.Error(ctx, "couldn't find key in cache", "part", parts[0], "key", key) 278 errors.WriteHTTPUnauthorized(w, "invalid authorization key", nil) 279 return 280 } 281 } else { 282 mediaSigner, err = a.MakeMediaSigner(ctx, key) 283 if err != nil { 284 errors.WriteHTTPUnauthorized(w, "invalid authorization key", err) 285 return 286 } 287 } 288 289 err = a.MediaManager.MKVIngest(ctx, r.Body, mediaSigner) 290 291 if err != nil { 292 log.Log(ctx, "stream error", "error", err) 293 errors.WriteHTTPInternalServerError(w, "stream error", err) 294 return 295 } 296 log.Log(ctx, "stream success", "url", r.URL.String()) 297 } 298 299 // route to accept an incoming mkv stream from OBS, segment it, and push the segments back to this HTTP handler 300 router.POST("/live/:key", handleIncomingStream) 301 router.PUT("/live/:key", handleIncomingStream) 302 303 router.GET("/player-report/:id", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 304 id := p.ByName("id") 305 if id == "" { 306 errors.WriteHTTPBadRequest(w, "id required", nil) 307 return 308 } 309 events, err := a.Model.PlayerReport(id) 310 if err != nil { 311 errors.WriteHTTPBadRequest(w, err.Error(), err) 312 return 313 } 314 bs, err := json.Marshal(events) 315 if err != nil { 316 errors.WriteHTTPInternalServerError(w, "unable to marhsal json", err) 317 return 318 } 319 if _, err := w.Write(bs); err != nil { 320 log.Error(ctx, "error writing response", "error", err) 321 } 322 }) 323 324 router.GET("/segment/:id", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 325 id := p.ByName("id") 326 if id == "" { 327 errors.WriteHTTPBadRequest(w, "id required", nil) 328 return 329 } 330 segment, err := a.Model.GetSegment(id) 331 if err != nil { 332 errors.WriteHTTPBadRequest(w, err.Error(), err) 333 return 334 } 335 if segment == nil { 336 errors.WriteHTTPNotFound(w, "segment not found", nil) 337 return 338 } 339 spSeg, err := segment.ToStreamplaceSegment() 340 if err != nil { 341 errors.WriteHTTPInternalServerError(w, "unable to convert segment to streamplace segment", err) 342 return 343 } 344 bs, err := json.Marshal(spSeg) 345 if err != nil { 346 errors.WriteHTTPInternalServerError(w, "unable to marhsal json", err) 347 return 348 } 349 if _, err := w.Write(bs); err != nil { 350 log.Error(ctx, "error writing response", "error", err) 351 } 352 }) 353 354 router.DELETE("/player-events", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 355 err := a.Model.ClearPlayerEvents() 356 if err != nil { 357 errors.WriteHTTPInternalServerError(w, "unable to delete player events", err) 358 return 359 } 360 w.WriteHeader(204) 361 }) 362 363 router.GET("/settings", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 364 w.Header().Set("Access-Control-Allow-Origin", "*") 365 w.Header().Set("Access-Control-Allow-Methods", "GET") 366 w.Header().Set("Access-Control-Allow-Headers", "Content-Type") 367 368 id := a.Signer.Hex() 369 370 ident, err := a.Model.GetIdentity(id) 371 if err != nil { 372 errors.WriteHTTPInternalServerError(w, "unable to get settings", err) 373 return 374 } 375 376 bs, err := json.Marshal(ident) 377 if err != nil { 378 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err) 379 return 380 } 381 if _, err := w.Write(bs); err != nil { 382 log.Error(ctx, "error writing response", "error", err) 383 } 384 }) 385 386 router.GET("/followers/:user", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 387 user := p.ByName("user") 388 if user == "" { 389 errors.WriteHTTPBadRequest(w, "user required", nil) 390 return 391 } 392 393 followers, err := a.Model.GetUserFollowers(ctx, user) 394 if err != nil { 395 errors.WriteHTTPInternalServerError(w, "unable to get followers", err) 396 return 397 } 398 bs, err := json.Marshal(followers) 399 if err != nil { 400 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err) 401 return 402 } 403 if _, err := w.Write(bs); err != nil { 404 log.Error(ctx, "error writing response", "error", err) 405 } 406 }) 407 408 router.GET("/following/:user", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 409 user := p.ByName("user") 410 if user == "" { 411 errors.WriteHTTPBadRequest(w, "user required", nil) 412 return 413 } 414 415 followers, err := a.Model.GetUserFollowing(ctx, user) 416 if err != nil { 417 errors.WriteHTTPInternalServerError(w, "unable to get followers", err) 418 return 419 } 420 bs, err := json.Marshal(followers) 421 if err != nil { 422 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err) 423 return 424 } 425 if _, err := w.Write(bs); err != nil { 426 log.Error(ctx, "error writing response", "error", err) 427 } 428 }) 429 430 router.GET("/notifications", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 431 notifications, err := a.Model.ListNotifications() 432 if err != nil { 433 errors.WriteHTTPInternalServerError(w, "unable to get notifications", err) 434 return 435 } 436 bs, err := json.Marshal(notifications) 437 if err != nil { 438 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err) 439 return 440 } 441 if _, err := w.Write(bs); err != nil { 442 log.Error(ctx, "error writing response", "error", err) 443 } 444 }) 445 446 router.GET("/chat-posts", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 447 posts, err := a.Model.ListFeedPosts() 448 if err != nil { 449 errors.WriteHTTPInternalServerError(w, "unable to get chat posts", err) 450 return 451 } 452 bs, err := json.Marshal(posts) 453 if err != nil { 454 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err) 455 return 456 } 457 if _, err := w.Write(bs); err != nil { 458 log.Error(ctx, "error writing response", "error", err) 459 } 460 }) 461 462 router.GET("/chat/:cid", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 463 cid := p.ByName("cid") 464 if cid == "" { 465 errors.WriteHTTPBadRequest(w, "cid required", nil) 466 return 467 } 468 msg, err := a.Model.GetChatMessage(cid) 469 if err != nil { 470 errors.WriteHTTPInternalServerError(w, "unable to get chat posts", err) 471 return 472 } 473 spmsg, err := msg.ToStreamplaceMessageView() 474 if err != nil { 475 errors.WriteHTTPInternalServerError(w, "unable to convert chat message to streamplace message view", err) 476 return 477 } 478 bs, err := json.Marshal(spmsg) 479 if err != nil { 480 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err) 481 return 482 } 483 if _, err := w.Write(bs); err != nil { 484 log.Error(ctx, "error writing response", "error", err) 485 } 486 }) 487 488 router.GET("/oauth-sessions", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 489 sessions, err := a.Model.ListOAuthSessions() 490 if err != nil { 491 errors.WriteHTTPInternalServerError(w, "unable to get oauth sessions", err) 492 return 493 } 494 bs, err := json.Marshal(sessions) 495 if err != nil { 496 errors.WriteHTTPInternalServerError(w, "unable to marshal oauth sessions", err) 497 return 498 } 499 if _, err := w.Write(bs); err != nil { 500 log.Error(ctx, "error writing response", "error", err) 501 } 502 }) 503 504 router.POST("/notification-blast", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 505 var payload notificationpkg.NotificationBlast 506 if err := json.NewDecoder(r.Body).Decode(&payload); err != nil { 507 errors.WriteHTTPBadRequest(w, "invalid request body", err) 508 return 509 } 510 notifications, err := a.Model.ListNotifications() 511 if err != nil { 512 errors.WriteHTTPInternalServerError(w, "unable to get notifications", err) 513 return 514 } 515 if a.FirebaseNotifier == nil { 516 errors.WriteHTTPInternalServerError(w, "firebase notifier not initialized", nil) 517 return 518 } 519 tokens := []string{} 520 for _, not := range notifications { 521 tokens = append(tokens, not.Token) 522 } 523 err = a.FirebaseNotifier.Blast(ctx, tokens, &payload) 524 if err != nil { 525 errors.WriteHTTPInternalServerError(w, "unable to blast notifications", err) 526 return 527 } 528 w.WriteHeader(http.StatusNoContent) 529 }) 530 531 router.PUT("/settings/:id", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 532 w.Header().Set("Access-Control-Allow-Origin", "*") 533 w.Header().Set("Access-Control-Allow-Methods", "PUT") 534 w.Header().Set("Access-Control-Allow-Headers", "Content-Type") 535 536 id := p.ByName("id") 537 if id == "" { 538 errors.WriteHTTPBadRequest(w, "id required", nil) 539 return 540 } 541 542 var ident model.Identity 543 if err := json.NewDecoder(r.Body).Decode(&ident); err != nil { 544 errors.WriteHTTPBadRequest(w, "invalid request body", err) 545 return 546 } 547 ident.ID = id 548 549 if err := a.Model.UpdateIdentity(&ident); err != nil { 550 errors.WriteHTTPInternalServerError(w, "unable to update settings", err) 551 return 552 } 553 554 w.WriteHeader(http.StatusNoContent) 555 }) 556 557 router.POST("/livepeer-auth-webhook-url", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 558 var payload struct { 559 URL string `json:"url"` 560 } 561 // urls look like http://127.0.0.1:9999/live/did:plc:dkh4rwafdcda4ko7lewe43ml-uucbv40mdkcfat50/47.mp4 562 if err := json.NewDecoder(r.Body).Decode(&payload); err != nil { 563 errors.WriteHTTPBadRequest(w, "invalid request body (could not decode)", err) 564 return 565 } 566 parts := strings.Split(payload.URL, "/") 567 if len(parts) < 5 { 568 errors.WriteHTTPBadRequest(w, "invalid request body (too few parts)", nil) 569 return 570 } 571 didSession := parts[4] 572 idParts := strings.Split(didSession, "-") 573 if len(idParts) != 2 { 574 errors.WriteHTTPBadRequest(w, "invalid request body (invalid did session)", nil) 575 return 576 } 577 did := idParts[0] 578 // sessionID := idParts[1] 579 seg, err := a.Model.LatestSegmentForUser(did) 580 if err != nil { 581 errors.WriteHTTPInternalServerError(w, "unable to get latest segment", err) 582 return 583 } 584 spseg, err := seg.ToStreamplaceSegment() 585 if err != nil { 586 errors.WriteHTTPInternalServerError(w, "unable to convert segment to streamplace segment", err) 587 return 588 } 589 renditions, err := renditions.GenerateRenditions(spseg) 590 if err != nil { 591 errors.WriteHTTPInternalServerError(w, "unable to generate renditions", err) 592 return 593 } 594 out := map[string]any{ 595 "manifestID": didSession, 596 "profiles": renditions.ToLivepeerProfiles(), 597 } 598 bs, err := json.Marshal(out) 599 if err != nil { 600 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err) 601 return 602 } 603 if _, err := w.Write(bs); err != nil { 604 log.Error(ctx, "error writing response", "error", err) 605 } 606 }) 607 608 router.POST("/replay/:streamKey", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 609 key := p.ByName("streamKey") 610 if key == "" { 611 errors.WriteHTTPBadRequest(w, "streamKey required", nil) 612 return 613 } 614 mediaSigner, err := a.MakeMediaSigner(ctx, key) 615 if err != nil { 616 errors.WriteHTTPUnauthorized(w, "invalid authorization key", err) 617 return 618 } 619 pc, err := rtcrec.NewReplayPeerConnection(ctx, r.Body) 620 if err != nil { 621 errors.WriteHTTPInternalServerError(w, "unable to create replay peer connection", err) 622 return 623 } 624 answer, err := a.MediaManager.WebRTCIngest(ctx, &webrtc.SessionDescription{SDP: "placeholder"}, mediaSigner, pc) 625 if err != nil { 626 errors.WriteHTTPInternalServerError(w, "unable to ingest web rtc", err) 627 return 628 } 629 w.WriteHeader(200) 630 if _, err := w.Write([]byte(answer.SDP)); err != nil { 631 errors.WriteHTTPInternalServerError(w, "unable to write response", err) 632 log.Error(ctx, "error writing response", "error", err) 633 } 634 }) 635 636 handler := sloghttp.Recovery(router) 637 if log.Level(4) { 638 handler = sloghttp.New(slog.Default())(handler) 639 } 640 return handler, nil 641} 642 643func (a *StreamplaceAPI) keyToUser(ctx context.Context, key string) (string, error) { 644 payload, err := base64.URLEncoding.DecodeString(key) 645 if err != nil { 646 return "", err 647 } 648 signed, err := a.Signer.Verify(payload) 649 if err != nil { 650 return "", err 651 } 652 _, ok := signed.Data().(*v0.StreamKey) 653 if !ok { 654 return "", fmt.Errorf("got signed data but it wasn't a stream key") 655 } 656 return strings.ToLower(signed.Signer()), nil 657}