Live video on the AT Protocol
at natb/patch-rn-webrtc-fork 659 lines 20 kB view raw
1package api 2 3import ( 4 "bufio" 5 "context" 6 "encoding/base64" 7 "encoding/json" 8 "fmt" 9 "io" 10 "log/slog" 11 "net/http" 12 "net/http/pprof" 13 "os" 14 "path/filepath" 15 "regexp" 16 "runtime" 17 rtpprof "runtime/pprof" 18 "strconv" 19 "strings" 20 "time" 21 22 "github.com/julienschmidt/httprouter" 23 "github.com/pion/webrtc/v4" 24 "github.com/prometheus/client_golang/prometheus/promhttp" 25 sloghttp "github.com/samber/slog-http" 26 "golang.org/x/sync/errgroup" 27 "stream.place/streamplace/pkg/errors" 28 "stream.place/streamplace/pkg/log" 29 "stream.place/streamplace/pkg/media" 30 "stream.place/streamplace/pkg/mist/mistconfig" 31 "stream.place/streamplace/pkg/mist/misttriggers" 32 "stream.place/streamplace/pkg/model" 33 notificationpkg "stream.place/streamplace/pkg/notifications" 34 "stream.place/streamplace/pkg/renditions" 35 "stream.place/streamplace/pkg/rtcrec" 36 v0 "stream.place/streamplace/pkg/schema/v0" 37) 38 39func (a *StreamplaceAPI) ServeInternalHTTP(ctx context.Context) error { 40 handler, err := a.InternalHandler(ctx) 41 if err != nil { 42 return err 43 } 44 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error { 45 s.Addr = a.CLI.HTTPInternalAddr 46 log.Log(ctx, "http server starting", "addr", s.Addr) 47 return s.ListenAndServe() 48 }) 49} 50 51// lightweight way to authenticate push requests to ourself 52var mkvRE *regexp.Regexp 53 54func init() { 55 mkvRE = regexp.MustCompile(`^\d+\.mkv$`) 56} 57 58func (a *StreamplaceAPI) InternalHandler(ctx context.Context) (http.Handler, error) { 59 router := httprouter.New() 60 broker := misttriggers.NewTriggerBroker() 61 62 broker.OnPushRewrite(func(ctx context.Context, payload *misttriggers.PushRewritePayload) (string, error) { 63 log.Log(ctx, "got push out start", "streamName", payload.StreamName, "url", payload.URL.String()) 64 // Extract the last part of the URL path 65 urlPath := payload.URL.Path 66 parts := strings.Split(urlPath, "/") 67 lastPart := "" 68 if len(parts) > 0 { 69 lastPart = parts[len(parts)-1] 70 } 71 mediaSigner, err := a.MakeMediaSigner(ctx, lastPart) 72 if err != nil { 73 return "", err 74 } 75 76 ms := time.Now().UnixMilli() 77 out := fmt.Sprintf("%s+%s_%d", mistconfig.StreamName, mediaSigner.Streamer(), ms) 78 a.SignerCacheMu.Lock() 79 a.SignerCache[mediaSigner.Streamer()] = mediaSigner 80 a.SignerCacheMu.Unlock() 81 log.Log(ctx, "added key to cache", "mist-stream", out, "streamer", mediaSigner.Streamer()) 82 83 return out, nil 84 }) 85 triggerCollection := misttriggers.NewMistCallbackHandlersCollection(a.CLI, broker) 86 router.POST("/mist-trigger", triggerCollection.Trigger()) 87 router.HandlerFunc("GET", "/healthz", a.HandleHealthz(ctx)) 88 89 // Add pprof handlers 90 router.HandlerFunc("GET", "/debug/pprof/", pprof.Index) 91 router.HandlerFunc("GET", "/debug/pprof/cmdline", pprof.Cmdline) 92 router.HandlerFunc("GET", "/debug/pprof/profile", pprof.Profile) 93 router.HandlerFunc("GET", "/debug/pprof/symbol", pprof.Symbol) 94 router.HandlerFunc("GET", "/debug/pprof/trace", pprof.Trace) 95 router.Handler("GET", "/debug/pprof/goroutine", pprof.Handler("goroutine")) 96 router.Handler("GET", "/debug/pprof/heap", pprof.Handler("heap")) 97 router.Handler("GET", "/debug/pprof/threadcreate", pprof.Handler("threadcreate")) 98 router.Handler("GET", "/debug/pprof/block", pprof.Handler("block")) 99 router.Handler("GET", "/debug/pprof/allocs", pprof.Handler("allocs")) 100 router.Handler("GET", "/debug/pprof/mutex", pprof.Handler("mutex")) 101 102 router.POST("/gc", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 103 runtime.GC() 104 w.WriteHeader(204) 105 }) 106 107 router.Handler("GET", "/metrics", promhttp.Handler()) 108 109 router.GET("/playback/:user/:rendition/concat", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 110 user := p.ByName("user") 111 if user == "" { 112 errors.WriteHTTPBadRequest(w, "user required", nil) 113 return 114 } 115 rendition := p.ByName("rendition") 116 if rendition == "" { 117 errors.WriteHTTPBadRequest(w, "rendition required", nil) 118 return 119 } 120 user, err := a.NormalizeUser(ctx, user) 121 if err != nil { 122 errors.WriteHTTPBadRequest(w, "invalid user", err) 123 return 124 } 125 w.Header().Set("content-type", "text/plain") 126 fmt.Fprintf(w, "ffconcat version 1.0\n") 127 // intermittent reports that you need two here to make things work properly? shouldn't matter. 128 for i := 0; i < 2; i += 1 { 129 fmt.Fprintf(w, "file '%s/playback/%s/%s/latest.mp4'\n", a.CLI.OwnInternalURL(), user, rendition) 130 } 131 }) 132 133 router.GET("/playback/:user/:rendition/latest.mp4", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 134 user := p.ByName("user") 135 if user == "" { 136 errors.WriteHTTPBadRequest(w, "user required", nil) 137 return 138 } 139 user, err := a.NormalizeUser(ctx, user) 140 if err != nil { 141 errors.WriteHTTPBadRequest(w, "invalid user", err) 142 return 143 } 144 rendition := p.ByName("rendition") 145 if rendition == "" { 146 errors.WriteHTTPBadRequest(w, "rendition required", nil) 147 return 148 } 149 segChan := a.Bus.SubscribeSegment(ctx, user, rendition) 150 defer a.Bus.UnsubscribeSegment(ctx, user, rendition, segChan) 151 seg := <-segChan.C 152 base := filepath.Base(seg.Filepath) 153 w.Header().Set("Location", fmt.Sprintf("%s/playback/%s/%s/segment/%s\n", a.CLI.OwnInternalURL(), user, rendition, base)) 154 w.WriteHeader(301) 155 }) 156 157 router.GET("/playback/:user/:rendition/segment/:file", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 158 user := p.ByName("user") 159 if user == "" { 160 errors.WriteHTTPBadRequest(w, "user required", nil) 161 return 162 } 163 user, err := a.NormalizeUser(ctx, user) 164 if err != nil { 165 errors.WriteHTTPBadRequest(w, "invalid user", err) 166 return 167 } 168 file := p.ByName("file") 169 if file == "" { 170 errors.WriteHTTPBadRequest(w, "file required", nil) 171 return 172 } 173 fullpath, err := a.CLI.SegmentFilePath(user, file) 174 if err != nil { 175 errors.WriteHTTPBadRequest(w, "badly formatted request", err) 176 return 177 } 178 http.ServeFile(w, r, fullpath) 179 }) 180 181 router.GET("/playback/:user/:rendition/stream.mp4", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 182 user := p.ByName("user") 183 if user == "" { 184 errors.WriteHTTPBadRequest(w, "user required", nil) 185 return 186 } 187 rendition := p.ByName("rendition") 188 if rendition == "" { 189 errors.WriteHTTPBadRequest(w, "rendition required", nil) 190 return 191 } 192 user, err := a.NormalizeUser(ctx, user) 193 if err != nil { 194 errors.WriteHTTPBadRequest(w, "invalid user", err) 195 return 196 } 197 var delayMS int64 = 1000 198 userDelay := r.URL.Query().Get("delayms") 199 if userDelay != "" { 200 var err error 201 delayMS, err = strconv.ParseInt(userDelay, 10, 64) 202 if err != nil { 203 errors.WriteHTTPBadRequest(w, "error parsing delay", err) 204 return 205 } 206 if delayMS > 10000 { 207 errors.WriteHTTPBadRequest(w, "delay too large, maximum 10000", nil) 208 return 209 } 210 } 211 w.Header().Set("Content-Type", "video/mp4") 212 w.WriteHeader(200) 213 g, ctx := errgroup.WithContext(ctx) 214 pr, pw := io.Pipe() 215 bufw := bufio.NewWriter(pw) 216 g.Go(func() error { 217 return a.MediaManager.SegmentToMP4(ctx, user, rendition, bufw) 218 }) 219 g.Go(func() error { 220 time.Sleep(time.Duration(delayMS) * time.Millisecond) 221 _, err := io.Copy(w, pr) 222 return err 223 }) 224 if err := g.Wait(); err != nil { 225 errors.WriteHTTPBadRequest(w, "request failed", err) 226 } 227 }) 228 229 router.HEAD("/playback/:user/:rendition/stream.mkv", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 230 user := p.ByName("user") 231 if user == "" { 232 errors.WriteHTTPBadRequest(w, "user required", nil) 233 return 234 } 235 w.Header().Set("Content-Type", "video/x-matroska") 236 w.Header().Set("Transfer-Encoding", "chunked") 237 w.WriteHeader(200) 238 }) 239 240 router.POST("/http-pipe/:uuid", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 241 uu := p.ByName("uuid") 242 if uu == "" { 243 errors.WriteHTTPBadRequest(w, "uuid required", nil) 244 return 245 } 246 pr := a.MediaManager.GetHTTPPipeWriter(uu) 247 if pr == nil { 248 errors.WriteHTTPNotFound(w, "http-pipe not found", nil) 249 return 250 } 251 if _, err := io.Copy(pr, r.Body); err != nil { 252 errors.WriteHTTPInternalServerError(w, "failed to copy response", nil) 253 } 254 }) 255 256 // self-destruct code, useful for dumping goroutines on windows 257 router.POST("/abort", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 258 if err := rtpprof.Lookup("goroutine").WriteTo(os.Stderr, 2); err != nil { 259 log.Log(ctx, "error writing rtpprof", "error", err) 260 } 261 log.Log(ctx, "got POST /abort, self-destructing") 262 os.Exit(1) 263 }) 264 265 handleIncomingStream := func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 266 key := p.ByName("key") 267 log.Log(ctx, "stream start") 268 269 var mediaSigner media.MediaSigner 270 var ok bool 271 var err error 272 parts := strings.Split(key, "_") 273 274 if len(parts) == 2 { 275 a.SignerCacheMu.Lock() 276 mediaSigner, ok = a.SignerCache[parts[0]] 277 a.SignerCacheMu.Unlock() 278 if !ok { 279 log.Error(ctx, "couldn't find key in cache", "part", parts[0], "key", key) 280 errors.WriteHTTPUnauthorized(w, "invalid authorization key", nil) 281 return 282 } 283 } else { 284 mediaSigner, err = a.MakeMediaSigner(ctx, key) 285 if err != nil { 286 errors.WriteHTTPUnauthorized(w, "invalid authorization key", err) 287 return 288 } 289 } 290 291 err = a.MediaManager.MKVIngest(ctx, r.Body, mediaSigner) 292 293 if err != nil { 294 log.Log(ctx, "stream error", "error", err) 295 errors.WriteHTTPInternalServerError(w, "stream error", err) 296 return 297 } 298 log.Log(ctx, "stream success", "url", r.URL.String()) 299 } 300 301 // route to accept an incoming mkv stream from OBS, segment it, and push the segments back to this HTTP handler 302 router.POST("/live/:key", handleIncomingStream) 303 router.PUT("/live/:key", handleIncomingStream) 304 305 router.GET("/player-report/:id", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 306 id := p.ByName("id") 307 if id == "" { 308 errors.WriteHTTPBadRequest(w, "id required", nil) 309 return 310 } 311 events, err := a.Model.PlayerReport(id) 312 if err != nil { 313 errors.WriteHTTPBadRequest(w, err.Error(), err) 314 return 315 } 316 bs, err := json.Marshal(events) 317 if err != nil { 318 errors.WriteHTTPInternalServerError(w, "unable to marhsal json", err) 319 return 320 } 321 if _, err := w.Write(bs); err != nil { 322 log.Error(ctx, "error writing response", "error", err) 323 } 324 }) 325 326 router.GET("/segment/:id", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 327 id := p.ByName("id") 328 if id == "" { 329 errors.WriteHTTPBadRequest(w, "id required", nil) 330 return 331 } 332 segment, err := a.Model.GetSegment(id) 333 if err != nil { 334 errors.WriteHTTPBadRequest(w, err.Error(), err) 335 return 336 } 337 if segment == nil { 338 errors.WriteHTTPNotFound(w, "segment not found", nil) 339 return 340 } 341 spSeg, err := segment.ToStreamplaceSegment() 342 if err != nil { 343 errors.WriteHTTPInternalServerError(w, "unable to convert segment to streamplace segment", err) 344 return 345 } 346 bs, err := json.Marshal(spSeg) 347 if err != nil { 348 errors.WriteHTTPInternalServerError(w, "unable to marhsal json", err) 349 return 350 } 351 if _, err := w.Write(bs); err != nil { 352 log.Error(ctx, "error writing response", "error", err) 353 } 354 }) 355 356 router.DELETE("/player-events", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 357 err := a.Model.ClearPlayerEvents() 358 if err != nil { 359 errors.WriteHTTPInternalServerError(w, "unable to delete player events", err) 360 return 361 } 362 w.WriteHeader(204) 363 }) 364 365 router.GET("/settings", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 366 w.Header().Set("Access-Control-Allow-Origin", "*") 367 w.Header().Set("Access-Control-Allow-Methods", "GET") 368 w.Header().Set("Access-Control-Allow-Headers", "Content-Type") 369 370 id := a.Signer.Hex() 371 372 ident, err := a.Model.GetIdentity(id) 373 if err != nil { 374 errors.WriteHTTPInternalServerError(w, "unable to get settings", err) 375 return 376 } 377 378 bs, err := json.Marshal(ident) 379 if err != nil { 380 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err) 381 return 382 } 383 if _, err := w.Write(bs); err != nil { 384 log.Error(ctx, "error writing response", "error", err) 385 } 386 }) 387 388 router.GET("/followers/:user", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 389 user := p.ByName("user") 390 if user == "" { 391 errors.WriteHTTPBadRequest(w, "user required", nil) 392 return 393 } 394 395 followers, err := a.Model.GetUserFollowers(ctx, user) 396 if err != nil { 397 errors.WriteHTTPInternalServerError(w, "unable to get followers", err) 398 return 399 } 400 bs, err := json.Marshal(followers) 401 if err != nil { 402 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err) 403 return 404 } 405 if _, err := w.Write(bs); err != nil { 406 log.Error(ctx, "error writing response", "error", err) 407 } 408 }) 409 410 router.GET("/following/:user", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 411 user := p.ByName("user") 412 if user == "" { 413 errors.WriteHTTPBadRequest(w, "user required", nil) 414 return 415 } 416 417 followers, err := a.Model.GetUserFollowing(ctx, user) 418 if err != nil { 419 errors.WriteHTTPInternalServerError(w, "unable to get followers", err) 420 return 421 } 422 bs, err := json.Marshal(followers) 423 if err != nil { 424 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err) 425 return 426 } 427 if _, err := w.Write(bs); err != nil { 428 log.Error(ctx, "error writing response", "error", err) 429 } 430 }) 431 432 router.GET("/notifications", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 433 notifications, err := a.Model.ListNotifications() 434 if err != nil { 435 errors.WriteHTTPInternalServerError(w, "unable to get notifications", err) 436 return 437 } 438 bs, err := json.Marshal(notifications) 439 if err != nil { 440 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err) 441 return 442 } 443 if _, err := w.Write(bs); err != nil { 444 log.Error(ctx, "error writing response", "error", err) 445 } 446 }) 447 448 router.GET("/chat-posts", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 449 posts, err := a.Model.ListFeedPosts() 450 if err != nil { 451 errors.WriteHTTPInternalServerError(w, "unable to get chat posts", err) 452 return 453 } 454 bs, err := json.Marshal(posts) 455 if err != nil { 456 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err) 457 return 458 } 459 if _, err := w.Write(bs); err != nil { 460 log.Error(ctx, "error writing response", "error", err) 461 } 462 }) 463 464 router.GET("/chat/:cid", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 465 cid := p.ByName("cid") 466 if cid == "" { 467 errors.WriteHTTPBadRequest(w, "cid required", nil) 468 return 469 } 470 msg, err := a.Model.GetChatMessage(cid) 471 if err != nil { 472 errors.WriteHTTPInternalServerError(w, "unable to get chat posts", err) 473 return 474 } 475 spmsg, err := msg.ToStreamplaceMessageView() 476 if err != nil { 477 errors.WriteHTTPInternalServerError(w, "unable to convert chat message to streamplace message view", err) 478 return 479 } 480 bs, err := json.Marshal(spmsg) 481 if err != nil { 482 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err) 483 return 484 } 485 if _, err := w.Write(bs); err != nil { 486 log.Error(ctx, "error writing response", "error", err) 487 } 488 }) 489 490 router.GET("/oauth-sessions", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 491 sessions, err := a.Model.ListOAuthSessions() 492 if err != nil { 493 errors.WriteHTTPInternalServerError(w, "unable to get oauth sessions", err) 494 return 495 } 496 bs, err := json.Marshal(sessions) 497 if err != nil { 498 errors.WriteHTTPInternalServerError(w, "unable to marshal oauth sessions", err) 499 return 500 } 501 if _, err := w.Write(bs); err != nil { 502 log.Error(ctx, "error writing response", "error", err) 503 } 504 }) 505 506 router.POST("/notification-blast", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 507 var payload notificationpkg.NotificationBlast 508 if err := json.NewDecoder(r.Body).Decode(&payload); err != nil { 509 errors.WriteHTTPBadRequest(w, "invalid request body", err) 510 return 511 } 512 notifications, err := a.Model.ListNotifications() 513 if err != nil { 514 errors.WriteHTTPInternalServerError(w, "unable to get notifications", err) 515 return 516 } 517 if a.FirebaseNotifier == nil { 518 errors.WriteHTTPInternalServerError(w, "firebase notifier not initialized", nil) 519 return 520 } 521 tokens := []string{} 522 for _, not := range notifications { 523 tokens = append(tokens, not.Token) 524 } 525 err = a.FirebaseNotifier.Blast(ctx, tokens, &payload) 526 if err != nil { 527 errors.WriteHTTPInternalServerError(w, "unable to blast notifications", err) 528 return 529 } 530 w.WriteHeader(http.StatusNoContent) 531 }) 532 533 router.PUT("/settings/:id", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 534 w.Header().Set("Access-Control-Allow-Origin", "*") 535 w.Header().Set("Access-Control-Allow-Methods", "PUT") 536 w.Header().Set("Access-Control-Allow-Headers", "Content-Type") 537 538 id := p.ByName("id") 539 if id == "" { 540 errors.WriteHTTPBadRequest(w, "id required", nil) 541 return 542 } 543 544 var ident model.Identity 545 if err := json.NewDecoder(r.Body).Decode(&ident); err != nil { 546 errors.WriteHTTPBadRequest(w, "invalid request body", err) 547 return 548 } 549 ident.ID = id 550 551 if err := a.Model.UpdateIdentity(&ident); err != nil { 552 errors.WriteHTTPInternalServerError(w, "unable to update settings", err) 553 return 554 } 555 556 w.WriteHeader(http.StatusNoContent) 557 }) 558 559 router.POST("/livepeer-auth-webhook-url", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 560 var payload struct { 561 URL string `json:"url"` 562 } 563 // urls look like http://127.0.0.1:9999/live/did:plc:dkh4rwafdcda4ko7lewe43ml-uucbv40mdkcfat50/47.mp4 564 if err := json.NewDecoder(r.Body).Decode(&payload); err != nil { 565 errors.WriteHTTPBadRequest(w, "invalid request body (could not decode)", err) 566 return 567 } 568 parts := strings.Split(payload.URL, "/") 569 if len(parts) < 5 { 570 errors.WriteHTTPBadRequest(w, "invalid request body (too few parts)", nil) 571 return 572 } 573 didSession := parts[4] 574 idParts := strings.Split(didSession, "-") 575 if len(idParts) != 2 { 576 errors.WriteHTTPBadRequest(w, "invalid request body (invalid did session)", nil) 577 return 578 } 579 did := idParts[0] 580 // sessionID := idParts[1] 581 seg, err := a.Model.LatestSegmentForUser(did) 582 if err != nil { 583 errors.WriteHTTPInternalServerError(w, "unable to get latest segment", err) 584 return 585 } 586 spseg, err := seg.ToStreamplaceSegment() 587 if err != nil { 588 errors.WriteHTTPInternalServerError(w, "unable to convert segment to streamplace segment", err) 589 return 590 } 591 renditions, err := renditions.GenerateRenditions(spseg) 592 if err != nil { 593 errors.WriteHTTPInternalServerError(w, "unable to generate renditions", err) 594 return 595 } 596 out := map[string]any{ 597 "manifestID": didSession, 598 "profiles": renditions.ToLivepeerProfiles(), 599 } 600 bs, err := json.Marshal(out) 601 if err != nil { 602 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err) 603 return 604 } 605 if _, err := w.Write(bs); err != nil { 606 log.Error(ctx, "error writing response", "error", err) 607 } 608 }) 609 610 router.POST("/replay/:streamKey", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 611 key := p.ByName("streamKey") 612 if key == "" { 613 errors.WriteHTTPBadRequest(w, "streamKey required", nil) 614 return 615 } 616 mediaSigner, err := a.MakeMediaSigner(ctx, key) 617 if err != nil { 618 errors.WriteHTTPUnauthorized(w, "invalid authorization key", err) 619 return 620 } 621 pc, err := rtcrec.NewReplayPeerConnection(ctx, r.Body) 622 if err != nil { 623 errors.WriteHTTPInternalServerError(w, "unable to create replay peer connection", err) 624 return 625 } 626 answer, err := a.MediaManager.WebRTCIngest(ctx, &webrtc.SessionDescription{SDP: "placeholder"}, mediaSigner, pc, make(chan struct{})) 627 if err != nil { 628 errors.WriteHTTPInternalServerError(w, "unable to ingest web rtc", err) 629 return 630 } 631 w.WriteHeader(200) 632 if _, err := w.Write([]byte(answer.SDP)); err != nil { 633 errors.WriteHTTPInternalServerError(w, "unable to write response", err) 634 log.Error(ctx, "error writing response", "error", err) 635 } 636 }) 637 638 handler := sloghttp.Recovery(router) 639 if log.Level(4) { 640 handler = sloghttp.New(slog.Default())(handler) 641 } 642 return handler, nil 643} 644 645func (a *StreamplaceAPI) keyToUser(ctx context.Context, key string) (string, error) { 646 payload, err := base64.URLEncoding.DecodeString(key) 647 if err != nil { 648 return "", err 649 } 650 signed, err := a.Signer.Verify(payload) 651 if err != nil { 652 return "", err 653 } 654 _, ok := signed.Data().(*v0.StreamKey) 655 if !ok { 656 return "", fmt.Errorf("got signed data but it wasn't a stream key") 657 } 658 return strings.ToLower(signed.Signer()), nil 659}