Live video on the AT Protocol
at eli/further-gitlab-ci 627 lines 19 kB view raw
1package api 2 3import ( 4 "bufio" 5 "context" 6 "encoding/base64" 7 "encoding/json" 8 "fmt" 9 "io" 10 "log/slog" 11 "net/http" 12 "net/http/pprof" 13 "os" 14 "path/filepath" 15 "regexp" 16 "runtime" 17 rtpprof "runtime/pprof" 18 "strconv" 19 "strings" 20 "time" 21 22 "github.com/julienschmidt/httprouter" 23 "github.com/prometheus/client_golang/prometheus/promhttp" 24 sloghttp "github.com/samber/slog-http" 25 "golang.org/x/sync/errgroup" 26 "stream.place/streamplace/pkg/errors" 27 "stream.place/streamplace/pkg/log" 28 "stream.place/streamplace/pkg/media" 29 "stream.place/streamplace/pkg/mist/mistconfig" 30 "stream.place/streamplace/pkg/mist/misttriggers" 31 "stream.place/streamplace/pkg/model" 32 notificationpkg "stream.place/streamplace/pkg/notifications" 33 "stream.place/streamplace/pkg/renditions" 34 v0 "stream.place/streamplace/pkg/schema/v0" 35) 36 37func (a *StreamplaceAPI) ServeInternalHTTP(ctx context.Context) error { 38 handler, err := a.InternalHandler(ctx) 39 if err != nil { 40 return err 41 } 42 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error { 43 s.Addr = a.CLI.HTTPInternalAddr 44 log.Log(ctx, "http server starting", "addr", s.Addr) 45 return s.ListenAndServe() 46 }) 47} 48 49// lightweight way to authenticate push requests to ourself 50var mkvRE *regexp.Regexp 51 52func init() { 53 mkvRE = regexp.MustCompile(`^\d+\.mkv$`) 54} 55 56func (a *StreamplaceAPI) InternalHandler(ctx context.Context) (http.Handler, error) { 57 router := httprouter.New() 58 broker := misttriggers.NewTriggerBroker() 59 60 broker.OnPushRewrite(func(ctx context.Context, payload *misttriggers.PushRewritePayload) (string, error) { 61 log.Log(ctx, "got push out start", "streamName", payload.StreamName, "url", payload.URL.String()) 62 // Extract the last part of the URL path 63 urlPath := payload.URL.Path 64 parts := strings.Split(urlPath, "/") 65 lastPart := "" 66 if len(parts) > 0 { 67 lastPart = parts[len(parts)-1] 68 } 69 mediaSigner, err := a.MakeMediaSigner(ctx, lastPart) 70 if err != nil { 71 return "", err 72 } 73 74 ms := time.Now().UnixMilli() 75 out := fmt.Sprintf("%s+%s_%d", mistconfig.StreamName, mediaSigner.Streamer(), ms) 76 a.SignerCacheMu.Lock() 77 a.SignerCache[mediaSigner.Streamer()] = mediaSigner 78 a.SignerCacheMu.Unlock() 79 log.Log(ctx, "added key to cache", "mist-stream", out, "streamer", mediaSigner.Streamer()) 80 81 return out, nil 82 }) 83 triggerCollection := misttriggers.NewMistCallbackHandlersCollection(a.CLI, broker) 84 router.POST("/mist-trigger", triggerCollection.Trigger()) 85 router.HandlerFunc("GET", "/healthz", a.HandleHealthz(ctx)) 86 87 // Add pprof handlers 88 router.HandlerFunc("GET", "/debug/pprof/", pprof.Index) 89 router.HandlerFunc("GET", "/debug/pprof/cmdline", pprof.Cmdline) 90 router.HandlerFunc("GET", "/debug/pprof/profile", pprof.Profile) 91 router.HandlerFunc("GET", "/debug/pprof/symbol", pprof.Symbol) 92 router.HandlerFunc("GET", "/debug/pprof/trace", pprof.Trace) 93 router.Handler("GET", "/debug/pprof/goroutine", pprof.Handler("goroutine")) 94 router.Handler("GET", "/debug/pprof/heap", pprof.Handler("heap")) 95 router.Handler("GET", "/debug/pprof/threadcreate", pprof.Handler("threadcreate")) 96 router.Handler("GET", "/debug/pprof/block", pprof.Handler("block")) 97 router.Handler("GET", "/debug/pprof/allocs", pprof.Handler("allocs")) 98 router.Handler("GET", "/debug/pprof/mutex", pprof.Handler("mutex")) 99 100 router.POST("/gc", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 101 runtime.GC() 102 w.WriteHeader(204) 103 }) 104 105 router.Handler("GET", "/metrics", promhttp.Handler()) 106 107 router.GET("/playback/:user/:rendition/concat", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 108 user := p.ByName("user") 109 if user == "" { 110 errors.WriteHTTPBadRequest(w, "user required", nil) 111 return 112 } 113 rendition := p.ByName("rendition") 114 if rendition == "" { 115 errors.WriteHTTPBadRequest(w, "rendition required", nil) 116 return 117 } 118 user, err := a.NormalizeUser(ctx, user) 119 if err != nil { 120 errors.WriteHTTPBadRequest(w, "invalid user", err) 121 return 122 } 123 w.Header().Set("content-type", "text/plain") 124 fmt.Fprintf(w, "ffconcat version 1.0\n") 125 // intermittent reports that you need two here to make things work properly? shouldn't matter. 126 for i := 0; i < 2; i += 1 { 127 fmt.Fprintf(w, "file '%s/playback/%s/%s/latest.mp4'\n", a.CLI.OwnInternalURL(), user, rendition) 128 } 129 }) 130 131 router.GET("/playback/:user/:rendition/latest.mp4", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 132 user := p.ByName("user") 133 if user == "" { 134 errors.WriteHTTPBadRequest(w, "user required", nil) 135 return 136 } 137 user, err := a.NormalizeUser(ctx, user) 138 if err != nil { 139 errors.WriteHTTPBadRequest(w, "invalid user", err) 140 return 141 } 142 rendition := p.ByName("rendition") 143 if rendition == "" { 144 errors.WriteHTTPBadRequest(w, "rendition required", nil) 145 return 146 } 147 seg := <-a.MediaManager.SubscribeSegment(ctx, user, rendition) 148 base := filepath.Base(seg.Filepath) 149 w.Header().Set("Location", fmt.Sprintf("%s/playback/%s/%s/segment/%s\n", a.CLI.OwnInternalURL(), user, rendition, base)) 150 w.WriteHeader(301) 151 }) 152 153 router.GET("/playback/:user/:rendition/segment/:file", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 154 user := p.ByName("user") 155 if user == "" { 156 errors.WriteHTTPBadRequest(w, "user required", nil) 157 return 158 } 159 user, err := a.NormalizeUser(ctx, user) 160 if err != nil { 161 errors.WriteHTTPBadRequest(w, "invalid user", err) 162 return 163 } 164 file := p.ByName("file") 165 if file == "" { 166 errors.WriteHTTPBadRequest(w, "file required", nil) 167 return 168 } 169 fullpath, err := a.CLI.SegmentFilePath(user, file) 170 if err != nil { 171 errors.WriteHTTPBadRequest(w, "badly formatted request", err) 172 return 173 } 174 http.ServeFile(w, r, fullpath) 175 }) 176 177 router.GET("/playback/:user/:rendition/stream.mp4", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 178 user := p.ByName("user") 179 if user == "" { 180 errors.WriteHTTPBadRequest(w, "user required", nil) 181 return 182 } 183 rendition := p.ByName("rendition") 184 if rendition == "" { 185 errors.WriteHTTPBadRequest(w, "rendition required", nil) 186 return 187 } 188 user, err := a.NormalizeUser(ctx, user) 189 if err != nil { 190 errors.WriteHTTPBadRequest(w, "invalid user", err) 191 return 192 } 193 var delayMS int64 = 1000 194 userDelay := r.URL.Query().Get("delayms") 195 if userDelay != "" { 196 var err error 197 delayMS, err = strconv.ParseInt(userDelay, 10, 64) 198 if err != nil { 199 errors.WriteHTTPBadRequest(w, "error parsing delay", err) 200 return 201 } 202 if delayMS > 10000 { 203 errors.WriteHTTPBadRequest(w, "delay too large, maximum 10000", nil) 204 return 205 } 206 } 207 w.Header().Set("Content-Type", "video/mp4") 208 w.WriteHeader(200) 209 g, ctx := errgroup.WithContext(ctx) 210 pr, pw := io.Pipe() 211 bufw := bufio.NewWriter(pw) 212 g.Go(func() error { 213 return a.MediaManager.SegmentToMP4(ctx, user, rendition, bufw) 214 }) 215 g.Go(func() error { 216 time.Sleep(time.Duration(delayMS) * time.Millisecond) 217 _, err := io.Copy(w, pr) 218 return err 219 }) 220 if err := g.Wait(); err != nil { 221 errors.WriteHTTPBadRequest(w, "request failed", err) 222 } 223 }) 224 225 router.HEAD("/playback/:user/:rendition/stream.mkv", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 226 user := p.ByName("user") 227 if user == "" { 228 errors.WriteHTTPBadRequest(w, "user required", nil) 229 return 230 } 231 w.Header().Set("Content-Type", "video/x-matroska") 232 w.Header().Set("Transfer-Encoding", "chunked") 233 w.WriteHeader(200) 234 }) 235 236 router.POST("/http-pipe/:uuid", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 237 uu := p.ByName("uuid") 238 if uu == "" { 239 errors.WriteHTTPBadRequest(w, "uuid required", nil) 240 return 241 } 242 pr := a.MediaManager.GetHTTPPipeWriter(uu) 243 if pr == nil { 244 errors.WriteHTTPNotFound(w, "http-pipe not found", nil) 245 return 246 } 247 if _, err := io.Copy(pr, r.Body); err != nil { 248 errors.WriteHTTPInternalServerError(w, "failed to copy response", nil) 249 } 250 }) 251 252 // self-destruct code, useful for dumping goroutines on windows 253 router.POST("/abort", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 254 if err := rtpprof.Lookup("goroutine").WriteTo(os.Stderr, 2); err != nil { 255 log.Log(ctx, "error writing rtpprof", "error", err) 256 } 257 log.Log(ctx, "got POST /abort, self-destructing") 258 os.Exit(1) 259 }) 260 261 handleIncomingStream := func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 262 key := p.ByName("key") 263 log.Log(ctx, "stream start") 264 265 var mediaSigner media.MediaSigner 266 var ok bool 267 var err error 268 parts := strings.Split(key, "_") 269 270 if len(parts) == 2 { 271 a.SignerCacheMu.Lock() 272 mediaSigner, ok = a.SignerCache[parts[0]] 273 a.SignerCacheMu.Unlock() 274 if !ok { 275 log.Error(ctx, "couldn't find key in cache", "part", parts[0], "key", key) 276 errors.WriteHTTPUnauthorized(w, "invalid authorization key", nil) 277 return 278 } 279 } else { 280 mediaSigner, err = a.MakeMediaSigner(ctx, key) 281 if err != nil { 282 errors.WriteHTTPUnauthorized(w, "invalid authorization key", err) 283 return 284 } 285 } 286 287 err = a.MediaManager.MKVIngest(ctx, r.Body, mediaSigner) 288 289 if err != nil { 290 log.Log(ctx, "stream error", "error", err) 291 errors.WriteHTTPInternalServerError(w, "stream error", err) 292 return 293 } 294 log.Log(ctx, "stream success", "url", r.URL.String()) 295 } 296 297 // route to accept an incoming mkv stream from OBS, segment it, and push the segments back to this HTTP handler 298 router.POST("/live/:key", handleIncomingStream) 299 router.PUT("/live/:key", handleIncomingStream) 300 301 router.GET("/player-report/:id", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 302 id := p.ByName("id") 303 if id == "" { 304 errors.WriteHTTPBadRequest(w, "id required", nil) 305 return 306 } 307 events, err := a.Model.PlayerReport(id) 308 if err != nil { 309 errors.WriteHTTPBadRequest(w, err.Error(), err) 310 return 311 } 312 bs, err := json.Marshal(events) 313 if err != nil { 314 errors.WriteHTTPInternalServerError(w, "unable to marhsal json", err) 315 return 316 } 317 if _, err := w.Write(bs); err != nil { 318 log.Error(ctx, "error writing response", "error", err) 319 } 320 }) 321 322 router.GET("/segment/:id", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 323 id := p.ByName("id") 324 if id == "" { 325 errors.WriteHTTPBadRequest(w, "id required", nil) 326 return 327 } 328 segment, err := a.Model.GetSegment(id) 329 if err != nil { 330 errors.WriteHTTPBadRequest(w, err.Error(), err) 331 return 332 } 333 if segment == nil { 334 errors.WriteHTTPNotFound(w, "segment not found", nil) 335 return 336 } 337 spSeg, err := segment.ToStreamplaceSegment() 338 if err != nil { 339 errors.WriteHTTPInternalServerError(w, "unable to convert segment to streamplace segment", err) 340 return 341 } 342 bs, err := json.Marshal(spSeg) 343 if err != nil { 344 errors.WriteHTTPInternalServerError(w, "unable to marhsal json", err) 345 return 346 } 347 if _, err := w.Write(bs); err != nil { 348 log.Error(ctx, "error writing response", "error", err) 349 } 350 }) 351 352 router.DELETE("/player-events", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 353 err := a.Model.ClearPlayerEvents() 354 if err != nil { 355 errors.WriteHTTPInternalServerError(w, "unable to delete player events", err) 356 return 357 } 358 w.WriteHeader(204) 359 }) 360 361 router.GET("/settings", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 362 w.Header().Set("Access-Control-Allow-Origin", "*") 363 w.Header().Set("Access-Control-Allow-Methods", "GET") 364 w.Header().Set("Access-Control-Allow-Headers", "Content-Type") 365 366 id := a.Signer.Hex() 367 368 ident, err := a.Model.GetIdentity(id) 369 if err != nil { 370 errors.WriteHTTPInternalServerError(w, "unable to get settings", err) 371 return 372 } 373 374 bs, err := json.Marshal(ident) 375 if err != nil { 376 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err) 377 return 378 } 379 if _, err := w.Write(bs); err != nil { 380 log.Error(ctx, "error writing response", "error", err) 381 } 382 }) 383 384 router.GET("/followers/:user", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 385 user := p.ByName("user") 386 if user == "" { 387 errors.WriteHTTPBadRequest(w, "user required", nil) 388 return 389 } 390 391 followers, err := a.Model.GetUserFollowers(ctx, user) 392 if err != nil { 393 errors.WriteHTTPInternalServerError(w, "unable to get followers", err) 394 return 395 } 396 bs, err := json.Marshal(followers) 397 if err != nil { 398 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err) 399 return 400 } 401 if _, err := w.Write(bs); err != nil { 402 log.Error(ctx, "error writing response", "error", err) 403 } 404 }) 405 406 router.GET("/following/:user", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 407 user := p.ByName("user") 408 if user == "" { 409 errors.WriteHTTPBadRequest(w, "user required", nil) 410 return 411 } 412 413 followers, err := a.Model.GetUserFollowing(ctx, user) 414 if err != nil { 415 errors.WriteHTTPInternalServerError(w, "unable to get followers", err) 416 return 417 } 418 bs, err := json.Marshal(followers) 419 if err != nil { 420 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err) 421 return 422 } 423 if _, err := w.Write(bs); err != nil { 424 log.Error(ctx, "error writing response", "error", err) 425 } 426 }) 427 428 router.GET("/notifications", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 429 notifications, err := a.Model.ListNotifications() 430 if err != nil { 431 errors.WriteHTTPInternalServerError(w, "unable to get notifications", err) 432 return 433 } 434 bs, err := json.Marshal(notifications) 435 if err != nil { 436 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err) 437 return 438 } 439 if _, err := w.Write(bs); err != nil { 440 log.Error(ctx, "error writing response", "error", err) 441 } 442 }) 443 444 router.GET("/chat-posts", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 445 posts, err := a.Model.ListFeedPosts() 446 if err != nil { 447 errors.WriteHTTPInternalServerError(w, "unable to get chat posts", err) 448 return 449 } 450 bs, err := json.Marshal(posts) 451 if err != nil { 452 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err) 453 return 454 } 455 if _, err := w.Write(bs); err != nil { 456 log.Error(ctx, "error writing response", "error", err) 457 } 458 }) 459 460 router.GET("/chat/:cid", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 461 cid := p.ByName("cid") 462 if cid == "" { 463 errors.WriteHTTPBadRequest(w, "cid required", nil) 464 return 465 } 466 msg, err := a.Model.GetChatMessage(cid) 467 if err != nil { 468 errors.WriteHTTPInternalServerError(w, "unable to get chat posts", err) 469 return 470 } 471 spmsg, err := msg.ToStreamplaceMessageView() 472 if err != nil { 473 errors.WriteHTTPInternalServerError(w, "unable to convert chat message to streamplace message view", err) 474 return 475 } 476 bs, err := json.Marshal(spmsg) 477 if err != nil { 478 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err) 479 return 480 } 481 if _, err := w.Write(bs); err != nil { 482 log.Error(ctx, "error writing response", "error", err) 483 } 484 }) 485 486 router.GET("/oauth-sessions", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 487 sessions, err := a.Model.ListOAuthSessions() 488 if err != nil { 489 errors.WriteHTTPInternalServerError(w, "unable to get oauth sessions", err) 490 return 491 } 492 bs, err := json.Marshal(sessions) 493 if err != nil { 494 errors.WriteHTTPInternalServerError(w, "unable to marshal oauth sessions", err) 495 return 496 } 497 if _, err := w.Write(bs); err != nil { 498 log.Error(ctx, "error writing response", "error", err) 499 } 500 }) 501 502 router.POST("/notification-blast", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 503 var payload notificationpkg.NotificationBlast 504 if err := json.NewDecoder(r.Body).Decode(&payload); err != nil { 505 errors.WriteHTTPBadRequest(w, "invalid request body", err) 506 return 507 } 508 notifications, err := a.Model.ListNotifications() 509 if err != nil { 510 errors.WriteHTTPInternalServerError(w, "unable to get notifications", err) 511 return 512 } 513 if a.FirebaseNotifier == nil { 514 errors.WriteHTTPInternalServerError(w, "firebase notifier not initialized", nil) 515 return 516 } 517 tokens := []string{} 518 for _, not := range notifications { 519 tokens = append(tokens, not.Token) 520 } 521 err = a.FirebaseNotifier.Blast(ctx, tokens, &payload) 522 if err != nil { 523 errors.WriteHTTPInternalServerError(w, "unable to blast notifications", err) 524 return 525 } 526 w.WriteHeader(http.StatusNoContent) 527 }) 528 529 router.PUT("/settings/:id", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 530 w.Header().Set("Access-Control-Allow-Origin", "*") 531 w.Header().Set("Access-Control-Allow-Methods", "PUT") 532 w.Header().Set("Access-Control-Allow-Headers", "Content-Type") 533 534 id := p.ByName("id") 535 if id == "" { 536 errors.WriteHTTPBadRequest(w, "id required", nil) 537 return 538 } 539 540 var ident model.Identity 541 if err := json.NewDecoder(r.Body).Decode(&ident); err != nil { 542 errors.WriteHTTPBadRequest(w, "invalid request body", err) 543 return 544 } 545 ident.ID = id 546 547 if err := a.Model.UpdateIdentity(&ident); err != nil { 548 errors.WriteHTTPInternalServerError(w, "unable to update settings", err) 549 return 550 } 551 552 w.WriteHeader(http.StatusNoContent) 553 }) 554 555 router.POST("/livepeer-auth-webhook-url", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 556 var payload struct { 557 URL string `json:"url"` 558 } 559 // urls look like http://127.0.0.1:9999/live/did:plc:dkh4rwafdcda4ko7lewe43ml-uucbv40mdkcfat50/47.mp4 560 if err := json.NewDecoder(r.Body).Decode(&payload); err != nil { 561 errors.WriteHTTPBadRequest(w, "invalid request body (could not decode)", err) 562 return 563 } 564 parts := strings.Split(payload.URL, "/") 565 if len(parts) < 5 { 566 errors.WriteHTTPBadRequest(w, "invalid request body (too few parts)", nil) 567 return 568 } 569 didSession := parts[4] 570 idParts := strings.Split(didSession, "-") 571 if len(idParts) != 2 { 572 errors.WriteHTTPBadRequest(w, "invalid request body (invalid did session)", nil) 573 return 574 } 575 did := idParts[0] 576 // sessionID := idParts[1] 577 seg, err := a.Model.LatestSegmentForUser(did) 578 if err != nil { 579 errors.WriteHTTPInternalServerError(w, "unable to get latest segment", err) 580 return 581 } 582 spseg, err := seg.ToStreamplaceSegment() 583 if err != nil { 584 errors.WriteHTTPInternalServerError(w, "unable to convert segment to streamplace segment", err) 585 return 586 } 587 renditions, err := renditions.GenerateRenditions(spseg) 588 if err != nil { 589 errors.WriteHTTPInternalServerError(w, "unable to generate renditions", err) 590 return 591 } 592 out := map[string]any{ 593 "manifestID": didSession, 594 "profiles": renditions.ToLivepeerProfiles(), 595 } 596 bs, err := json.Marshal(out) 597 if err != nil { 598 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err) 599 return 600 } 601 if _, err := w.Write(bs); err != nil { 602 log.Error(ctx, "error writing response", "error", err) 603 } 604 }) 605 606 handler := sloghttp.Recovery(router) 607 if log.Level(4) { 608 handler = sloghttp.New(slog.Default())(handler) 609 } 610 return handler, nil 611} 612 613func (a *StreamplaceAPI) keyToUser(ctx context.Context, key string) (string, error) { 614 payload, err := base64.URLEncoding.DecodeString(key) 615 if err != nil { 616 return "", err 617 } 618 signed, err := a.Signer.Verify(payload) 619 if err != nil { 620 return "", err 621 } 622 _, ok := signed.Data().(*v0.StreamKey) 623 if !ok { 624 return "", fmt.Errorf("got signed data but it wasn't a stream key") 625 } 626 return strings.ToLower(signed.Signer()), nil 627}