Live video on the AT Protocol
at blog-link-in-docs 637 lines 20 kB view raw
1package api 2 3import ( 4 "bufio" 5 "context" 6 "encoding/base64" 7 "encoding/json" 8 "fmt" 9 "io" 10 "log/slog" 11 "net/http" 12 "net/http/pprof" 13 "os" 14 "path/filepath" 15 "regexp" 16 "runtime" 17 rtpprof "runtime/pprof" 18 "strconv" 19 "strings" 20 "time" 21 22 "github.com/julienschmidt/httprouter" 23 "github.com/pion/webrtc/v4" 24 "github.com/prometheus/client_golang/prometheus/promhttp" 25 sloghttp "github.com/samber/slog-http" 26 "golang.org/x/sync/errgroup" 27 "stream.place/streamplace/pkg/errors" 28 "stream.place/streamplace/pkg/log" 29 "stream.place/streamplace/pkg/media" 30 "stream.place/streamplace/pkg/mist/mistconfig" 31 "stream.place/streamplace/pkg/mist/misttriggers" 32 "stream.place/streamplace/pkg/model" 33 notificationpkg "stream.place/streamplace/pkg/notifications" 34 "stream.place/streamplace/pkg/rtcrec" 35 v0 "stream.place/streamplace/pkg/schema/v0" 36) 37 38func (a *StreamplaceAPI) ServeInternalHTTP(ctx context.Context) error { 39 handler, err := a.InternalHandler(ctx) 40 if err != nil { 41 return err 42 } 43 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error { 44 s.Addr = a.CLI.HTTPInternalAddr 45 log.Log(ctx, "http server starting", "addr", s.Addr) 46 return s.ListenAndServe() 47 }) 48} 49 50// lightweight way to authenticate push requests to ourself 51var mkvRE *regexp.Regexp 52 53func init() { 54 mkvRE = regexp.MustCompile(`^\d+\.mkv$`) 55} 56 57func (a *StreamplaceAPI) InternalHandler(ctx context.Context) (http.Handler, error) { 58 router := httprouter.New() 59 broker := misttriggers.NewTriggerBroker() 60 61 broker.OnPushRewrite(func(ctx context.Context, payload *misttriggers.PushRewritePayload) (string, error) { 62 log.Log(ctx, "got push out start", "streamName", payload.StreamName, "url", payload.URL.String()) 63 // Extract the last part of the URL path 64 urlPath := payload.URL.Path 65 parts := strings.Split(urlPath, "/") 66 lastPart := "" 67 if len(parts) > 0 { 68 lastPart = parts[len(parts)-1] 69 } 70 mediaSigner, err := a.MakeMediaSigner(ctx, lastPart) 71 if err != nil { 72 return "", err 73 } 74 75 ms := time.Now().UnixMilli() 76 out := fmt.Sprintf("%s+%s_%d", mistconfig.StreamName, mediaSigner.Streamer(), ms) 77 a.SignerCacheMu.Lock() 78 a.SignerCache[mediaSigner.Streamer()] = mediaSigner 79 a.SignerCacheMu.Unlock() 80 log.Log(ctx, "added key to cache", "mist-stream", out, "streamer", mediaSigner.Streamer()) 81 82 return out, nil 83 }) 84 triggerCollection := misttriggers.NewMistCallbackHandlersCollection(a.CLI, broker) 85 router.POST("/mist-trigger", triggerCollection.Trigger()) 86 router.HandlerFunc("GET", "/healthz", a.HandleHealthz(ctx)) 87 88 // Add pprof handlers 89 router.HandlerFunc("GET", "/debug/pprof/", pprof.Index) 90 router.HandlerFunc("GET", "/debug/pprof/cmdline", pprof.Cmdline) 91 router.HandlerFunc("GET", "/debug/pprof/profile", pprof.Profile) 92 router.HandlerFunc("GET", "/debug/pprof/symbol", pprof.Symbol) 93 router.HandlerFunc("GET", "/debug/pprof/trace", pprof.Trace) 94 router.Handler("GET", "/debug/pprof/goroutine", pprof.Handler("goroutine")) 95 router.Handler("GET", "/debug/pprof/heap", pprof.Handler("heap")) 96 router.Handler("GET", "/debug/pprof/threadcreate", pprof.Handler("threadcreate")) 97 router.Handler("GET", "/debug/pprof/block", pprof.Handler("block")) 98 router.Handler("GET", "/debug/pprof/allocs", pprof.Handler("allocs")) 99 router.Handler("GET", "/debug/pprof/mutex", pprof.Handler("mutex")) 100 101 router.POST("/gc", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 102 runtime.GC() 103 w.WriteHeader(204) 104 }) 105 106 router.Handler("GET", "/metrics", promhttp.Handler()) 107 108 router.GET("/playback/:user/:rendition/concat", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 109 user := p.ByName("user") 110 if user == "" { 111 errors.WriteHTTPBadRequest(w, "user required", nil) 112 return 113 } 114 rendition := p.ByName("rendition") 115 if rendition == "" { 116 errors.WriteHTTPBadRequest(w, "rendition required", nil) 117 return 118 } 119 user, err := a.NormalizeUser(ctx, user) 120 if err != nil { 121 errors.WriteHTTPBadRequest(w, "invalid user", err) 122 return 123 } 124 w.Header().Set("content-type", "text/plain") 125 fmt.Fprintf(w, "ffconcat version 1.0\n") 126 // intermittent reports that you need two here to make things work properly? shouldn't matter. 127 for i := 0; i < 2; i += 1 { 128 fmt.Fprintf(w, "file '%s/playback/%s/%s/latest.mp4'\n", a.CLI.OwnInternalURL(), user, rendition) 129 } 130 }) 131 132 router.GET("/playback/:user/:rendition/latest.mp4", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 133 user := p.ByName("user") 134 if user == "" { 135 errors.WriteHTTPBadRequest(w, "user required", nil) 136 return 137 } 138 user, err := a.NormalizeUser(ctx, user) 139 if err != nil { 140 errors.WriteHTTPBadRequest(w, "invalid user", err) 141 return 142 } 143 rendition := p.ByName("rendition") 144 if rendition == "" { 145 errors.WriteHTTPBadRequest(w, "rendition required", nil) 146 return 147 } 148 segChan := a.Bus.SubscribeSegment(ctx, user, rendition) 149 defer a.Bus.UnsubscribeSegment(ctx, user, rendition, segChan) 150 seg := <-segChan.C 151 base := filepath.Base(seg.Filepath) 152 w.Header().Set("Location", fmt.Sprintf("%s/playback/%s/%s/segment/%s\n", a.CLI.OwnInternalURL(), user, rendition, base)) 153 w.WriteHeader(301) 154 }) 155 156 router.GET("/playback/:user/:rendition/segment/:file", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 157 user := p.ByName("user") 158 if user == "" { 159 errors.WriteHTTPBadRequest(w, "user required", nil) 160 return 161 } 162 user, err := a.NormalizeUser(ctx, user) 163 if err != nil { 164 errors.WriteHTTPBadRequest(w, "invalid user", err) 165 return 166 } 167 file := p.ByName("file") 168 if file == "" { 169 errors.WriteHTTPBadRequest(w, "file required", nil) 170 return 171 } 172 fullpath, err := a.CLI.SegmentFilePath(user, file) 173 if err != nil { 174 errors.WriteHTTPBadRequest(w, "badly formatted request", err) 175 return 176 } 177 http.ServeFile(w, r, fullpath) 178 }) 179 180 router.GET("/playback/:user/:rendition/stream.mp4", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 181 user := p.ByName("user") 182 if user == "" { 183 errors.WriteHTTPBadRequest(w, "user required", nil) 184 return 185 } 186 rendition := p.ByName("rendition") 187 if rendition == "" { 188 errors.WriteHTTPBadRequest(w, "rendition required", nil) 189 return 190 } 191 user, err := a.NormalizeUser(ctx, user) 192 if err != nil { 193 errors.WriteHTTPBadRequest(w, "invalid user", err) 194 return 195 } 196 var delayMS int64 = 1000 197 userDelay := r.URL.Query().Get("delayms") 198 if userDelay != "" { 199 var err error 200 delayMS, err = strconv.ParseInt(userDelay, 10, 64) 201 if err != nil { 202 errors.WriteHTTPBadRequest(w, "error parsing delay", err) 203 return 204 } 205 if delayMS > 10000 { 206 errors.WriteHTTPBadRequest(w, "delay too large, maximum 10000", nil) 207 return 208 } 209 } 210 w.Header().Set("Content-Type", "video/mp4") 211 w.WriteHeader(200) 212 g, ctx := errgroup.WithContext(ctx) 213 pr, pw := io.Pipe() 214 bufw := bufio.NewWriter(pw) 215 g.Go(func() error { 216 return a.MediaManager.SegmentToMP4(ctx, user, rendition, bufw) 217 }) 218 g.Go(func() error { 219 time.Sleep(time.Duration(delayMS) * time.Millisecond) 220 _, err := io.Copy(w, pr) 221 return err 222 }) 223 if err := g.Wait(); err != nil { 224 errors.WriteHTTPBadRequest(w, "request failed", err) 225 } 226 }) 227 228 router.HEAD("/playback/:user/:rendition/stream.mkv", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 229 user := p.ByName("user") 230 if user == "" { 231 errors.WriteHTTPBadRequest(w, "user required", nil) 232 return 233 } 234 w.Header().Set("Content-Type", "video/x-matroska") 235 w.Header().Set("Transfer-Encoding", "chunked") 236 w.WriteHeader(200) 237 }) 238 239 router.POST("/http-pipe/:uuid", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 240 uu := p.ByName("uuid") 241 if uu == "" { 242 errors.WriteHTTPBadRequest(w, "uuid required", nil) 243 return 244 } 245 pr := a.MediaManager.GetHTTPPipeWriter(uu) 246 if pr == nil { 247 errors.WriteHTTPNotFound(w, "http-pipe not found", nil) 248 return 249 } 250 if _, err := io.Copy(pr, r.Body); err != nil { 251 errors.WriteHTTPInternalServerError(w, "failed to copy response", nil) 252 } 253 }) 254 255 // self-destruct code, useful for dumping goroutines on windows 256 router.POST("/abort", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 257 if err := rtpprof.Lookup("goroutine").WriteTo(os.Stderr, 2); err != nil { 258 log.Log(ctx, "error writing rtpprof", "error", err) 259 } 260 log.Log(ctx, "got POST /abort, self-destructing") 261 os.Exit(1) 262 }) 263 264 handleIncomingStream := func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 265 key := p.ByName("key") 266 log.Log(ctx, "stream start") 267 268 var mediaSigner media.MediaSigner 269 var ok bool 270 var err error 271 parts := strings.Split(key, "_") 272 273 if len(parts) == 2 { 274 a.SignerCacheMu.Lock() 275 mediaSigner, ok = a.SignerCache[parts[0]] 276 a.SignerCacheMu.Unlock() 277 if !ok { 278 log.Error(ctx, "couldn't find key in cache", "part", parts[0], "key", key) 279 errors.WriteHTTPUnauthorized(w, "invalid authorization key", nil) 280 return 281 } 282 } else { 283 mediaSigner, err = a.MakeMediaSigner(ctx, key) 284 if err != nil { 285 errors.WriteHTTPUnauthorized(w, "invalid authorization key", err) 286 return 287 } 288 } 289 290 err = a.MediaManager.MKVIngest(ctx, r.Body, mediaSigner) 291 292 if err != nil { 293 log.Log(ctx, "stream error", "error", err) 294 errors.WriteHTTPInternalServerError(w, "stream error", err) 295 return 296 } 297 log.Log(ctx, "stream success", "url", r.URL.String()) 298 } 299 300 // route to accept an incoming mkv stream from OBS, segment it, and push the segments back to this HTTP handler 301 router.POST("/live/:key", handleIncomingStream) 302 router.PUT("/live/:key", handleIncomingStream) 303 304 router.GET("/player-report/:id", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 305 id := p.ByName("id") 306 if id == "" { 307 errors.WriteHTTPBadRequest(w, "id required", nil) 308 return 309 } 310 events, err := a.Model.PlayerReport(id) 311 if err != nil { 312 errors.WriteHTTPBadRequest(w, err.Error(), err) 313 return 314 } 315 bs, err := json.Marshal(events) 316 if err != nil { 317 errors.WriteHTTPInternalServerError(w, "unable to marhsal json", err) 318 return 319 } 320 if _, err := w.Write(bs); err != nil { 321 log.Error(ctx, "error writing response", "error", err) 322 } 323 }) 324 325 router.GET("/segment/:id", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 326 id := p.ByName("id") 327 if id == "" { 328 errors.WriteHTTPBadRequest(w, "id required", nil) 329 return 330 } 331 segment, err := a.Model.GetSegment(id) 332 if err != nil { 333 errors.WriteHTTPBadRequest(w, err.Error(), err) 334 return 335 } 336 if segment == nil { 337 errors.WriteHTTPNotFound(w, "segment not found", nil) 338 return 339 } 340 spSeg, err := segment.ToStreamplaceSegment() 341 if err != nil { 342 errors.WriteHTTPInternalServerError(w, "unable to convert segment to streamplace segment", err) 343 return 344 } 345 bs, err := json.Marshal(spSeg) 346 if err != nil { 347 errors.WriteHTTPInternalServerError(w, "unable to marhsal json", err) 348 return 349 } 350 if _, err := w.Write(bs); err != nil { 351 log.Error(ctx, "error writing response", "error", err) 352 } 353 }) 354 355 router.DELETE("/player-events", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 356 err := a.Model.ClearPlayerEvents() 357 if err != nil { 358 errors.WriteHTTPInternalServerError(w, "unable to delete player events", err) 359 return 360 } 361 w.WriteHeader(204) 362 }) 363 364 router.GET("/settings", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 365 w.Header().Set("Access-Control-Allow-Origin", "*") 366 w.Header().Set("Access-Control-Allow-Methods", "GET") 367 w.Header().Set("Access-Control-Allow-Headers", "Content-Type") 368 369 id := a.Signer.Hex() 370 371 ident, err := a.Model.GetIdentity(id) 372 if err != nil { 373 errors.WriteHTTPInternalServerError(w, "unable to get settings", err) 374 return 375 } 376 377 bs, err := json.Marshal(ident) 378 if err != nil { 379 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err) 380 return 381 } 382 if _, err := w.Write(bs); err != nil { 383 log.Error(ctx, "error writing response", "error", err) 384 } 385 }) 386 387 router.GET("/followers/:user", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 388 user := p.ByName("user") 389 if user == "" { 390 errors.WriteHTTPBadRequest(w, "user required", nil) 391 return 392 } 393 394 followers, err := a.Model.GetUserFollowers(ctx, user) 395 if err != nil { 396 errors.WriteHTTPInternalServerError(w, "unable to get followers", err) 397 return 398 } 399 bs, err := json.Marshal(followers) 400 if err != nil { 401 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err) 402 return 403 } 404 if _, err := w.Write(bs); err != nil { 405 log.Error(ctx, "error writing response", "error", err) 406 } 407 }) 408 409 router.GET("/following/:user", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 410 user := p.ByName("user") 411 if user == "" { 412 errors.WriteHTTPBadRequest(w, "user required", nil) 413 return 414 } 415 416 followers, err := a.Model.GetUserFollowing(ctx, user) 417 if err != nil { 418 errors.WriteHTTPInternalServerError(w, "unable to get followers", err) 419 return 420 } 421 bs, err := json.Marshal(followers) 422 if err != nil { 423 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err) 424 return 425 } 426 if _, err := w.Write(bs); err != nil { 427 log.Error(ctx, "error writing response", "error", err) 428 } 429 }) 430 431 router.GET("/notifications", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 432 notifications, err := a.Model.ListNotifications() 433 if err != nil { 434 errors.WriteHTTPInternalServerError(w, "unable to get notifications", err) 435 return 436 } 437 bs, err := json.Marshal(notifications) 438 if err != nil { 439 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err) 440 return 441 } 442 if _, err := w.Write(bs); err != nil { 443 log.Error(ctx, "error writing response", "error", err) 444 } 445 }) 446 447 router.GET("/chat-posts", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 448 posts, err := a.Model.ListFeedPosts() 449 if err != nil { 450 errors.WriteHTTPInternalServerError(w, "unable to get chat posts", err) 451 return 452 } 453 bs, err := json.Marshal(posts) 454 if err != nil { 455 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err) 456 return 457 } 458 if _, err := w.Write(bs); err != nil { 459 log.Error(ctx, "error writing response", "error", err) 460 } 461 }) 462 463 router.GET("/chat/:cid", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 464 cid := p.ByName("cid") 465 if cid == "" { 466 errors.WriteHTTPBadRequest(w, "cid required", nil) 467 return 468 } 469 msg, err := a.Model.GetChatMessage(cid) 470 if err != nil { 471 errors.WriteHTTPInternalServerError(w, "unable to get chat posts", err) 472 return 473 } 474 spmsg, err := msg.ToStreamplaceMessageView() 475 if err != nil { 476 errors.WriteHTTPInternalServerError(w, "unable to convert chat message to streamplace message view", err) 477 return 478 } 479 bs, err := json.Marshal(spmsg) 480 if err != nil { 481 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err) 482 return 483 } 484 if _, err := w.Write(bs); err != nil { 485 log.Error(ctx, "error writing response", "error", err) 486 } 487 }) 488 489 router.GET("/oauth-sessions", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 490 sessions, err := a.Model.ListOAuthSessions() 491 if err != nil { 492 errors.WriteHTTPInternalServerError(w, "unable to get oauth sessions", err) 493 return 494 } 495 bs, err := json.Marshal(sessions) 496 if err != nil { 497 errors.WriteHTTPInternalServerError(w, "unable to marshal oauth sessions", err) 498 return 499 } 500 if _, err := w.Write(bs); err != nil { 501 log.Error(ctx, "error writing response", "error", err) 502 } 503 }) 504 505 router.POST("/notification-blast", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 506 var payload notificationpkg.NotificationBlast 507 if err := json.NewDecoder(r.Body).Decode(&payload); err != nil { 508 errors.WriteHTTPBadRequest(w, "invalid request body", err) 509 return 510 } 511 notifications, err := a.Model.ListNotifications() 512 if err != nil { 513 errors.WriteHTTPInternalServerError(w, "unable to get notifications", err) 514 return 515 } 516 if a.FirebaseNotifier == nil { 517 errors.WriteHTTPInternalServerError(w, "firebase notifier not initialized", nil) 518 return 519 } 520 tokens := []string{} 521 for _, not := range notifications { 522 tokens = append(tokens, not.Token) 523 } 524 err = a.FirebaseNotifier.Blast(ctx, tokens, &payload) 525 if err != nil { 526 errors.WriteHTTPInternalServerError(w, "unable to blast notifications", err) 527 return 528 } 529 w.WriteHeader(http.StatusNoContent) 530 }) 531 532 router.PUT("/settings/:id", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 533 w.Header().Set("Access-Control-Allow-Origin", "*") 534 w.Header().Set("Access-Control-Allow-Methods", "PUT") 535 w.Header().Set("Access-Control-Allow-Headers", "Content-Type") 536 537 id := p.ByName("id") 538 if id == "" { 539 errors.WriteHTTPBadRequest(w, "id required", nil) 540 return 541 } 542 543 var ident model.Identity 544 if err := json.NewDecoder(r.Body).Decode(&ident); err != nil { 545 errors.WriteHTTPBadRequest(w, "invalid request body", err) 546 return 547 } 548 ident.ID = id 549 550 if err := a.Model.UpdateIdentity(&ident); err != nil { 551 errors.WriteHTTPInternalServerError(w, "unable to update settings", err) 552 return 553 } 554 555 w.WriteHeader(http.StatusNoContent) 556 }) 557 558 router.POST("/replay/:streamKey", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 559 key := p.ByName("streamKey") 560 if key == "" { 561 errors.WriteHTTPBadRequest(w, "streamKey required", nil) 562 return 563 } 564 mediaSigner, err := a.MakeMediaSigner(ctx, key) 565 if err != nil { 566 errors.WriteHTTPUnauthorized(w, "invalid authorization key", err) 567 return 568 } 569 pc, err := rtcrec.NewReplayPeerConnection(ctx, r.Body) 570 if err != nil { 571 errors.WriteHTTPInternalServerError(w, "unable to create replay peer connection", err) 572 return 573 } 574 answer, err := a.MediaManager.WebRTCIngest(ctx, &webrtc.SessionDescription{SDP: "placeholder"}, mediaSigner, pc, make(chan struct{})) 575 if err != nil { 576 errors.WriteHTTPInternalServerError(w, "unable to ingest web rtc", err) 577 return 578 } 579 w.WriteHeader(200) 580 if _, err := w.Write([]byte(answer.SDP)); err != nil { 581 errors.WriteHTTPInternalServerError(w, "unable to write response", err) 582 log.Error(ctx, "error writing response", "error", err) 583 } 584 }) 585 586 router.GET("/clip/:did/clip.mp4", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 587 did := p.ByName("did") 588 if did == "" { 589 errors.WriteHTTPBadRequest(w, "did required", nil) 590 return 591 } 592 user, err := a.NormalizeUser(ctx, did) 593 if err != nil { 594 errors.WriteHTTPBadRequest(w, "invalid user", err) 595 return 596 } 597 secsStr := r.URL.Query().Get("secs") 598 secs := 60 // Default to 60 seconds 599 if secsStr != "" { 600 parsedSecs, err := strconv.Atoi(secsStr) 601 if err != nil { 602 errors.WriteHTTPBadRequest(w, "invalid secs parameter", err) 603 return 604 } 605 secs = parsedSecs 606 } 607 after := time.Now().Add(-time.Duration(secs) * time.Second) 608 w.Header().Set("Content-Type", "video/mp4") 609 err = media.ClipUser(ctx, a.Model, a.CLI, user, w, nil, &after) 610 if err != nil { 611 errors.WriteHTTPInternalServerError(w, "unable to clip user", err) 612 return 613 } 614 }) 615 616 handler := sloghttp.Recovery(router) 617 if log.Level(4) { 618 handler = sloghttp.New(slog.Default())(handler) 619 } 620 return handler, nil 621} 622 623func (a *StreamplaceAPI) keyToUser(ctx context.Context, key string) (string, error) { 624 payload, err := base64.URLEncoding.DecodeString(key) 625 if err != nil { 626 return "", err 627 } 628 signed, err := a.Signer.Verify(payload) 629 if err != nil { 630 return "", err 631 } 632 _, ok := signed.Data().(*v0.StreamKey) 633 if !ok { 634 return "", fmt.Errorf("got signed data but it wasn't a stream key") 635 } 636 return strings.ToLower(signed.Signer()), nil 637}