Live video on the AT Protocol
at eli/get-segments 601 lines 18 kB view raw
1package api 2 3import ( 4 "bufio" 5 "context" 6 "encoding/base64" 7 "encoding/json" 8 "fmt" 9 "io" 10 "log/slog" 11 "net/http" 12 "net/http/pprof" 13 "os" 14 "path/filepath" 15 "regexp" 16 "runtime" 17 rtpprof "runtime/pprof" 18 "strconv" 19 "strings" 20 "time" 21 22 "github.com/julienschmidt/httprouter" 23 "github.com/prometheus/client_golang/prometheus/promhttp" 24 sloghttp "github.com/samber/slog-http" 25 "golang.org/x/sync/errgroup" 26 "stream.place/streamplace/pkg/errors" 27 "stream.place/streamplace/pkg/log" 28 "stream.place/streamplace/pkg/media" 29 "stream.place/streamplace/pkg/mist/mistconfig" 30 "stream.place/streamplace/pkg/mist/misttriggers" 31 "stream.place/streamplace/pkg/model" 32 notificationpkg "stream.place/streamplace/pkg/notifications" 33 "stream.place/streamplace/pkg/renditions" 34 v0 "stream.place/streamplace/pkg/schema/v0" 35) 36 37func (a *StreamplaceAPI) ServeInternalHTTP(ctx context.Context) error { 38 handler, err := a.InternalHandler(ctx) 39 if err != nil { 40 return err 41 } 42 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error { 43 s.Addr = a.CLI.HttpInternalAddr 44 log.Log(ctx, "http server starting", "addr", s.Addr) 45 return s.ListenAndServe() 46 }) 47} 48 49// lightweight way to authenticate push requests to ourself 50var mkvRE *regexp.Regexp 51 52func init() { 53 mkvRE = regexp.MustCompile(`^\d+\.mkv$`) 54} 55 56func (a *StreamplaceAPI) InternalHandler(ctx context.Context) (http.Handler, error) { 57 router := httprouter.New() 58 broker := misttriggers.NewTriggerBroker() 59 60 broker.OnPushRewrite(func(ctx context.Context, payload *misttriggers.PushRewritePayload) (string, error) { 61 log.Log(ctx, "got push out start", "streamName", payload.StreamName, "url", payload.URL.String()) 62 // Extract the last part of the URL path 63 urlPath := payload.URL.Path 64 parts := strings.Split(urlPath, "/") 65 lastPart := "" 66 if len(parts) > 0 { 67 lastPart = parts[len(parts)-1] 68 } 69 mediaSigner, err := a.MakeMediaSigner(ctx, lastPart) 70 if err != nil { 71 return "", err 72 } 73 74 ms := time.Now().UnixMilli() 75 out := fmt.Sprintf("%s+%s_%d", mistconfig.STREAM_NAME, mediaSigner.Streamer(), ms) 76 a.SignerCacheMu.Lock() 77 a.SignerCache[mediaSigner.Streamer()] = mediaSigner 78 a.SignerCacheMu.Unlock() 79 log.Log(ctx, "added key to cache", "mist-stream", out, "streamer", mediaSigner.Streamer()) 80 81 return out, nil 82 }) 83 triggerCollection := misttriggers.NewMistCallbackHandlersCollection(a.CLI, broker) 84 router.POST("/mist-trigger", triggerCollection.Trigger()) 85 router.HandlerFunc("GET", "/healthz", a.HandleHealthz(ctx)) 86 87 // Add pprof handlers 88 router.HandlerFunc("GET", "/debug/pprof/", pprof.Index) 89 router.HandlerFunc("GET", "/debug/pprof/cmdline", pprof.Cmdline) 90 router.HandlerFunc("GET", "/debug/pprof/profile", pprof.Profile) 91 router.HandlerFunc("GET", "/debug/pprof/symbol", pprof.Symbol) 92 router.HandlerFunc("GET", "/debug/pprof/trace", pprof.Trace) 93 router.Handler("GET", "/debug/pprof/goroutine", pprof.Handler("goroutine")) 94 router.Handler("GET", "/debug/pprof/heap", pprof.Handler("heap")) 95 router.Handler("GET", "/debug/pprof/threadcreate", pprof.Handler("threadcreate")) 96 router.Handler("GET", "/debug/pprof/block", pprof.Handler("block")) 97 router.Handler("GET", "/debug/pprof/allocs", pprof.Handler("allocs")) 98 router.Handler("GET", "/debug/pprof/mutex", pprof.Handler("mutex")) 99 100 router.POST("/gc", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 101 runtime.GC() 102 w.WriteHeader(204) 103 }) 104 105 router.Handler("GET", "/metrics", promhttp.Handler()) 106 107 router.GET("/playback/:user/:rendition/concat", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 108 user := p.ByName("user") 109 if user == "" { 110 errors.WriteHTTPBadRequest(w, "user required", nil) 111 return 112 } 113 rendition := p.ByName("rendition") 114 if rendition == "" { 115 errors.WriteHTTPBadRequest(w, "rendition required", nil) 116 return 117 } 118 user, err := a.NormalizeUser(ctx, user) 119 if err != nil { 120 errors.WriteHTTPBadRequest(w, "invalid user", err) 121 return 122 } 123 w.Header().Set("content-type", "text/plain") 124 fmt.Fprintf(w, "ffconcat version 1.0\n") 125 // intermittent reports that you need two here to make things work properly? shouldn't matter. 126 for i := 0; i < 2; i += 1 { 127 fmt.Fprintf(w, "file '%s/playback/%s/%s/latest.mp4'\n", a.CLI.OwnInternalURL(), user, rendition) 128 } 129 }) 130 131 router.GET("/playback/:user/:rendition/latest.mp4", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 132 user := p.ByName("user") 133 if user == "" { 134 errors.WriteHTTPBadRequest(w, "user required", nil) 135 return 136 } 137 user, err := a.NormalizeUser(ctx, user) 138 if err != nil { 139 errors.WriteHTTPBadRequest(w, "invalid user", err) 140 return 141 } 142 rendition := p.ByName("rendition") 143 if rendition == "" { 144 errors.WriteHTTPBadRequest(w, "rendition required", nil) 145 return 146 } 147 seg := <-a.MediaManager.SubscribeSegment(ctx, user, rendition) 148 base := filepath.Base(seg.Filepath) 149 w.Header().Set("Location", fmt.Sprintf("%s/playback/%s/%s/segment/%s\n", a.CLI.OwnInternalURL(), user, rendition, base)) 150 w.WriteHeader(301) 151 }) 152 153 router.GET("/playback/:user/:rendition/segment/:file", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 154 user := p.ByName("user") 155 if user == "" { 156 errors.WriteHTTPBadRequest(w, "user required", nil) 157 return 158 } 159 user, err := a.NormalizeUser(ctx, user) 160 if err != nil { 161 errors.WriteHTTPBadRequest(w, "invalid user", err) 162 return 163 } 164 file := p.ByName("file") 165 if file == "" { 166 errors.WriteHTTPBadRequest(w, "file required", nil) 167 return 168 } 169 fullpath, err := a.CLI.SegmentFilePath(user, file) 170 if err != nil { 171 errors.WriteHTTPBadRequest(w, "badly formatted request", err) 172 return 173 } 174 http.ServeFile(w, r, fullpath) 175 }) 176 177 router.GET("/playback/:user/:rendition/stream.mp4", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 178 user := p.ByName("user") 179 if user == "" { 180 errors.WriteHTTPBadRequest(w, "user required", nil) 181 return 182 } 183 rendition := p.ByName("rendition") 184 if rendition == "" { 185 errors.WriteHTTPBadRequest(w, "rendition required", nil) 186 return 187 } 188 user, err := a.NormalizeUser(ctx, user) 189 if err != nil { 190 errors.WriteHTTPBadRequest(w, "invalid user", err) 191 return 192 } 193 var delayMS int64 = 1000 194 userDelay := r.URL.Query().Get("delayms") 195 if userDelay != "" { 196 var err error 197 delayMS, err = strconv.ParseInt(userDelay, 10, 64) 198 if err != nil { 199 errors.WriteHTTPBadRequest(w, "error parsing delay", err) 200 return 201 } 202 if delayMS > 10000 { 203 errors.WriteHTTPBadRequest(w, "delay too large, maximum 10000", nil) 204 return 205 } 206 } 207 w.Header().Set("Content-Type", "video/mp4") 208 w.WriteHeader(200) 209 g, ctx := errgroup.WithContext(ctx) 210 pr, pw := io.Pipe() 211 bufw := bufio.NewWriter(pw) 212 g.Go(func() error { 213 return a.MediaManager.SegmentToMP4(ctx, user, rendition, bufw) 214 }) 215 g.Go(func() error { 216 time.Sleep(time.Duration(delayMS) * time.Millisecond) 217 _, err := io.Copy(w, pr) 218 return err 219 }) 220 g.Wait() 221 }) 222 223 router.HEAD("/playback/:user/:rendition/stream.mkv", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 224 user := p.ByName("user") 225 if user == "" { 226 errors.WriteHTTPBadRequest(w, "user required", nil) 227 return 228 } 229 w.Header().Set("Content-Type", "video/x-matroska") 230 w.Header().Set("Transfer-Encoding", "chunked") 231 w.WriteHeader(200) 232 }) 233 234 router.POST("/http-pipe/:uuid", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 235 uu := p.ByName("uuid") 236 if uu == "" { 237 errors.WriteHTTPBadRequest(w, "uuid required", nil) 238 return 239 } 240 pr := a.MediaManager.GetHTTPPipeWriter(uu) 241 if pr == nil { 242 errors.WriteHTTPNotFound(w, "http-pipe not found", nil) 243 return 244 } 245 io.Copy(pr, r.Body) 246 }) 247 248 // self-destruct code, useful for dumping goroutines on windows 249 router.POST("/abort", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 250 rtpprof.Lookup("goroutine").WriteTo(os.Stderr, 2) 251 log.Log(ctx, "got POST /abort, self-destructing") 252 os.Exit(1) 253 }) 254 255 handleIncomingStream := func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 256 key := p.ByName("key") 257 log.Log(ctx, "stream start") 258 259 var mediaSigner media.MediaSigner 260 var ok bool 261 var err error 262 parts := strings.Split(key, "_") 263 264 if len(parts) == 2 { 265 a.SignerCacheMu.Lock() 266 mediaSigner, ok = a.SignerCache[parts[0]] 267 a.SignerCacheMu.Unlock() 268 if !ok { 269 log.Error(ctx, "couldn't find key in cache", "part", parts[0], "key", key) 270 errors.WriteHTTPUnauthorized(w, "invalid authorization key", nil) 271 return 272 } 273 } else { 274 mediaSigner, err = a.MakeMediaSigner(ctx, key) 275 if err != nil { 276 errors.WriteHTTPUnauthorized(w, "invalid authorization key", err) 277 return 278 } 279 } 280 281 err = a.MediaManager.MKVIngest(ctx, r.Body, mediaSigner) 282 283 if err != nil { 284 log.Log(ctx, "stream error", "error", err) 285 errors.WriteHTTPInternalServerError(w, "stream error", err) 286 return 287 } 288 log.Log(ctx, "stream success", "url", r.URL.String()) 289 } 290 291 // route to accept an incoming mkv stream from OBS, segment it, and push the segments back to this HTTP handler 292 router.POST("/live/:key", handleIncomingStream) 293 router.PUT("/live/:key", handleIncomingStream) 294 295 router.GET("/player-report/:id", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 296 id := p.ByName("id") 297 if id == "" { 298 errors.WriteHTTPBadRequest(w, "id required", nil) 299 return 300 } 301 events, err := a.Model.PlayerReport(id) 302 if err != nil { 303 errors.WriteHTTPBadRequest(w, err.Error(), err) 304 return 305 } 306 bs, err := json.Marshal(events) 307 if err != nil { 308 errors.WriteHTTPInternalServerError(w, "unable to marhsal json", err) 309 return 310 } 311 w.Write(bs) 312 }) 313 314 router.GET("/segment/:id", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 315 id := p.ByName("id") 316 if id == "" { 317 errors.WriteHTTPBadRequest(w, "id required", nil) 318 return 319 } 320 segment, err := a.Model.GetSegment(id) 321 if err != nil { 322 errors.WriteHTTPBadRequest(w, err.Error(), err) 323 return 324 } 325 if segment == nil { 326 errors.WriteHTTPNotFound(w, "segment not found", nil) 327 return 328 } 329 spSeg, err := segment.ToStreamplaceSegment() 330 if err != nil { 331 errors.WriteHTTPInternalServerError(w, "unable to convert segment to streamplace segment", err) 332 return 333 } 334 bs, err := json.Marshal(spSeg) 335 if err != nil { 336 errors.WriteHTTPInternalServerError(w, "unable to marhsal json", err) 337 return 338 } 339 w.Write(bs) 340 }) 341 342 router.DELETE("/player-events", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 343 err := a.Model.ClearPlayerEvents() 344 if err != nil { 345 errors.WriteHTTPInternalServerError(w, "unable to delete player events", err) 346 return 347 } 348 w.WriteHeader(204) 349 }) 350 351 router.GET("/settings", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 352 w.Header().Set("Access-Control-Allow-Origin", "*") 353 w.Header().Set("Access-Control-Allow-Methods", "GET") 354 w.Header().Set("Access-Control-Allow-Headers", "Content-Type") 355 356 id := a.Signer.Hex() 357 358 ident, err := a.Model.GetIdentity(id) 359 if err != nil { 360 errors.WriteHTTPInternalServerError(w, "unable to get settings", err) 361 return 362 } 363 364 bs, err := json.Marshal(ident) 365 if err != nil { 366 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err) 367 return 368 } 369 w.Write(bs) 370 }) 371 372 router.GET("/followers/:user", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 373 user := p.ByName("user") 374 if user == "" { 375 errors.WriteHTTPBadRequest(w, "user required", nil) 376 return 377 } 378 379 followers, err := a.Model.GetUserFollowers(ctx, user) 380 if err != nil { 381 errors.WriteHTTPInternalServerError(w, "unable to get followers", err) 382 return 383 } 384 bs, err := json.Marshal(followers) 385 if err != nil { 386 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err) 387 return 388 } 389 w.Write(bs) 390 }) 391 392 router.GET("/following/:user", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 393 user := p.ByName("user") 394 if user == "" { 395 errors.WriteHTTPBadRequest(w, "user required", nil) 396 return 397 } 398 399 followers, err := a.Model.GetUserFollowing(ctx, user) 400 if err != nil { 401 errors.WriteHTTPInternalServerError(w, "unable to get followers", err) 402 return 403 } 404 bs, err := json.Marshal(followers) 405 if err != nil { 406 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err) 407 return 408 } 409 w.Write(bs) 410 }) 411 412 router.GET("/notifications", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 413 notifications, err := a.Model.ListNotifications() 414 if err != nil { 415 errors.WriteHTTPInternalServerError(w, "unable to get notifications", err) 416 return 417 } 418 bs, err := json.Marshal(notifications) 419 if err != nil { 420 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err) 421 return 422 } 423 w.Write(bs) 424 }) 425 426 router.GET("/chat-posts", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 427 posts, err := a.Model.ListFeedPosts() 428 if err != nil { 429 errors.WriteHTTPInternalServerError(w, "unable to get chat posts", err) 430 return 431 } 432 bs, err := json.Marshal(posts) 433 if err != nil { 434 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err) 435 return 436 } 437 w.Write(bs) 438 }) 439 440 router.GET("/chat/:cid", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 441 cid := p.ByName("cid") 442 if cid == "" { 443 errors.WriteHTTPBadRequest(w, "cid required", nil) 444 return 445 } 446 msg, err := a.Model.GetChatMessage(cid) 447 if err != nil { 448 errors.WriteHTTPInternalServerError(w, "unable to get chat posts", err) 449 return 450 } 451 spmsg, err := msg.ToStreamplaceMessageView() 452 if err != nil { 453 errors.WriteHTTPInternalServerError(w, "unable to convert chat message to streamplace message view", err) 454 return 455 } 456 bs, err := json.Marshal(spmsg) 457 if err != nil { 458 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err) 459 return 460 } 461 w.Write(bs) 462 }) 463 464 router.GET("/oauth-sessions", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 465 sessions, err := a.Model.ListOAuthSessions() 466 if err != nil { 467 errors.WriteHTTPInternalServerError(w, "unable to get oauth sessions", err) 468 return 469 } 470 bs, err := json.Marshal(sessions) 471 if err != nil { 472 errors.WriteHTTPInternalServerError(w, "unable to marshal oauth sessions", err) 473 return 474 } 475 w.Write(bs) 476 }) 477 478 router.POST("/notification-blast", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 479 var payload notificationpkg.NotificationBlast 480 if err := json.NewDecoder(r.Body).Decode(&payload); err != nil { 481 errors.WriteHTTPBadRequest(w, "invalid request body", err) 482 return 483 } 484 notifications, err := a.Model.ListNotifications() 485 if err != nil { 486 errors.WriteHTTPInternalServerError(w, "unable to get notifications", err) 487 return 488 } 489 if a.FirebaseNotifier == nil { 490 errors.WriteHTTPInternalServerError(w, "firebase notifier not initialized", nil) 491 return 492 } 493 tokens := []string{} 494 for _, not := range notifications { 495 tokens = append(tokens, not.Token) 496 } 497 err = a.FirebaseNotifier.Blast(ctx, tokens, &payload) 498 if err != nil { 499 errors.WriteHTTPInternalServerError(w, "unable to blast notifications", err) 500 return 501 } 502 w.WriteHeader(http.StatusNoContent) 503 }) 504 505 router.PUT("/settings/:id", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 506 w.Header().Set("Access-Control-Allow-Origin", "*") 507 w.Header().Set("Access-Control-Allow-Methods", "PUT") 508 w.Header().Set("Access-Control-Allow-Headers", "Content-Type") 509 510 id := p.ByName("id") 511 if id == "" { 512 errors.WriteHTTPBadRequest(w, "id required", nil) 513 return 514 } 515 516 var ident model.Identity 517 if err := json.NewDecoder(r.Body).Decode(&ident); err != nil { 518 errors.WriteHTTPBadRequest(w, "invalid request body", err) 519 return 520 } 521 ident.ID = id 522 523 if err := a.Model.UpdateIdentity(&ident); err != nil { 524 errors.WriteHTTPInternalServerError(w, "unable to update settings", err) 525 return 526 } 527 528 w.WriteHeader(http.StatusNoContent) 529 }) 530 531 router.POST("/livepeer-auth-webhook-url", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 532 var payload struct { 533 URL string `json:"url"` 534 } 535 // urls look like http://127.0.0.1:9999/live/did:plc:dkh4rwafdcda4ko7lewe43ml-uucbv40mdkcfat50/47.mp4 536 if err := json.NewDecoder(r.Body).Decode(&payload); err != nil { 537 errors.WriteHTTPBadRequest(w, "invalid request body (could not decode)", err) 538 return 539 } 540 parts := strings.Split(payload.URL, "/") 541 if len(parts) < 5 { 542 errors.WriteHTTPBadRequest(w, "invalid request body (too few parts)", nil) 543 return 544 } 545 didSession := parts[4] 546 idParts := strings.Split(didSession, "-") 547 if len(idParts) != 2 { 548 errors.WriteHTTPBadRequest(w, "invalid request body (invalid did session)", nil) 549 return 550 } 551 did := idParts[0] 552 // sessionID := idParts[1] 553 seg, err := a.Model.LatestSegmentForUser(did) 554 if err != nil { 555 errors.WriteHTTPInternalServerError(w, "unable to get latest segment", err) 556 return 557 } 558 spseg, err := seg.ToStreamplaceSegment() 559 if err != nil { 560 errors.WriteHTTPInternalServerError(w, "unable to convert segment to streamplace segment", err) 561 return 562 } 563 renditions, err := renditions.GenerateRenditions(spseg) 564 if err != nil { 565 errors.WriteHTTPInternalServerError(w, "unable to generate renditions", err) 566 return 567 } 568 out := map[string]any{ 569 "manifestID": didSession, 570 "profiles": renditions.ToLivepeerProfiles(), 571 } 572 bs, err := json.Marshal(out) 573 if err != nil { 574 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err) 575 return 576 } 577 w.Write(bs) 578 }) 579 580 handler := sloghttp.Recovery(router) 581 if log.Level(4) { 582 handler = sloghttp.New(slog.Default())(handler) 583 } 584 return handler, nil 585} 586 587func (a *StreamplaceAPI) keyToUser(ctx context.Context, key string) (string, error) { 588 payload, err := base64.URLEncoding.DecodeString(key) 589 if err != nil { 590 return "", err 591 } 592 signed, err := a.Signer.Verify(payload) 593 if err != nil { 594 return "", err 595 } 596 _, ok := signed.Data().(*v0.StreamKey) 597 if !ok { 598 return "", fmt.Errorf("got signed data but it wasn't a stream key") 599 } 600 return strings.ToLower(signed.Signer()), nil 601}