Live video on the AT Protocol
at eli/oauth-in-dev-stinks 563 lines 17 kB view raw
1package api 2 3import ( 4 "bufio" 5 "context" 6 "encoding/base64" 7 "encoding/json" 8 "fmt" 9 "io" 10 "log/slog" 11 "net/http" 12 "net/http/pprof" 13 "os" 14 "path/filepath" 15 "regexp" 16 "runtime" 17 rtpprof "runtime/pprof" 18 "strconv" 19 "strings" 20 "time" 21 22 "github.com/julienschmidt/httprouter" 23 "github.com/prometheus/client_golang/prometheus/promhttp" 24 sloghttp "github.com/samber/slog-http" 25 "golang.org/x/sync/errgroup" 26 "stream.place/streamplace/pkg/errors" 27 "stream.place/streamplace/pkg/log" 28 "stream.place/streamplace/pkg/mist/mistconfig" 29 "stream.place/streamplace/pkg/mist/misttriggers" 30 "stream.place/streamplace/pkg/model" 31 notificationpkg "stream.place/streamplace/pkg/notifications" 32 "stream.place/streamplace/pkg/renditions" 33 v0 "stream.place/streamplace/pkg/schema/v0" 34) 35 36func (a *StreamplaceAPI) ServeInternalHTTP(ctx context.Context) error { 37 handler, err := a.InternalHandler(ctx) 38 if err != nil { 39 return err 40 } 41 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error { 42 s.Addr = a.CLI.HttpInternalAddr 43 log.Log(ctx, "http server starting", "addr", s.Addr) 44 return s.ListenAndServe() 45 }) 46} 47 48// lightweight way to authenticate push requests to ourself 49var mkvRE *regexp.Regexp 50 51func init() { 52 mkvRE = regexp.MustCompile(`^\d+\.mkv$`) 53} 54 55func (a *StreamplaceAPI) InternalHandler(ctx context.Context) (http.Handler, error) { 56 router := httprouter.New() 57 broker := misttriggers.NewTriggerBroker() 58 broker.OnPushOutStart(func(ctx context.Context, payload *misttriggers.PushOutStartPayload) (string, error) { 59 return payload.URL, nil 60 }) 61 broker.OnPushRewrite(func(ctx context.Context, payload *misttriggers.PushRewritePayload) (string, error) { 62 log.Log(ctx, "got push out start", "streamName", payload.StreamName, "url", payload.URL.String()) 63 64 ms := time.Now().UnixMilli() 65 out := fmt.Sprintf("%s+%s_%d", mistconfig.STREAM_NAME, payload.StreamName, ms) 66 67 return out, nil 68 }) 69 triggerCollection := misttriggers.NewMistCallbackHandlersCollection(a.CLI, broker) 70 router.POST("/mist-trigger", triggerCollection.Trigger()) 71 router.HandlerFunc("GET", "/healthz", a.HandleHealthz(ctx)) 72 73 // Add pprof handlers 74 router.HandlerFunc("GET", "/debug/pprof/", pprof.Index) 75 router.HandlerFunc("GET", "/debug/pprof/cmdline", pprof.Cmdline) 76 router.HandlerFunc("GET", "/debug/pprof/profile", pprof.Profile) 77 router.HandlerFunc("GET", "/debug/pprof/symbol", pprof.Symbol) 78 router.HandlerFunc("GET", "/debug/pprof/trace", pprof.Trace) 79 router.Handler("GET", "/debug/pprof/goroutine", pprof.Handler("goroutine")) 80 router.Handler("GET", "/debug/pprof/heap", pprof.Handler("heap")) 81 router.Handler("GET", "/debug/pprof/threadcreate", pprof.Handler("threadcreate")) 82 router.Handler("GET", "/debug/pprof/block", pprof.Handler("block")) 83 router.Handler("GET", "/debug/pprof/allocs", pprof.Handler("allocs")) 84 router.Handler("GET", "/debug/pprof/mutex", pprof.Handler("mutex")) 85 86 router.POST("/gc", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 87 runtime.GC() 88 w.WriteHeader(204) 89 }) 90 91 router.Handler("GET", "/metrics", promhttp.Handler()) 92 93 router.GET("/playback/:user/:rendition/concat", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 94 user := p.ByName("user") 95 if user == "" { 96 errors.WriteHTTPBadRequest(w, "user required", nil) 97 return 98 } 99 rendition := p.ByName("rendition") 100 if rendition == "" { 101 errors.WriteHTTPBadRequest(w, "rendition required", nil) 102 return 103 } 104 user, err := a.NormalizeUser(ctx, user) 105 if err != nil { 106 errors.WriteHTTPBadRequest(w, "invalid user", err) 107 return 108 } 109 w.Header().Set("content-type", "text/plain") 110 fmt.Fprintf(w, "ffconcat version 1.0\n") 111 // intermittent reports that you need two here to make things work properly? shouldn't matter. 112 for i := 0; i < 2; i += 1 { 113 fmt.Fprintf(w, "file '%s/playback/%s/%s/latest.mp4'\n", a.CLI.OwnInternalURL(), user, rendition) 114 } 115 }) 116 117 router.GET("/playback/:user/:rendition/latest.mp4", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 118 user := p.ByName("user") 119 if user == "" { 120 errors.WriteHTTPBadRequest(w, "user required", nil) 121 return 122 } 123 user, err := a.NormalizeUser(ctx, user) 124 if err != nil { 125 errors.WriteHTTPBadRequest(w, "invalid user", err) 126 return 127 } 128 rendition := p.ByName("rendition") 129 if rendition == "" { 130 errors.WriteHTTPBadRequest(w, "rendition required", nil) 131 return 132 } 133 seg := <-a.MediaManager.SubscribeSegment(ctx, user, rendition) 134 base := filepath.Base(seg.Filepath) 135 w.Header().Set("Location", fmt.Sprintf("%s/playback/%s/%s/segment/%s\n", a.CLI.OwnInternalURL(), user, rendition, base)) 136 w.WriteHeader(301) 137 }) 138 139 router.GET("/playback/:user/:rendition/segment/:file", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 140 user := p.ByName("user") 141 if user == "" { 142 errors.WriteHTTPBadRequest(w, "user required", nil) 143 return 144 } 145 user, err := a.NormalizeUser(ctx, user) 146 if err != nil { 147 errors.WriteHTTPBadRequest(w, "invalid user", err) 148 return 149 } 150 file := p.ByName("file") 151 if file == "" { 152 errors.WriteHTTPBadRequest(w, "file required", nil) 153 return 154 } 155 fullpath, err := a.CLI.SegmentFilePath(user, file) 156 if err != nil { 157 errors.WriteHTTPBadRequest(w, "badly formatted request", err) 158 return 159 } 160 http.ServeFile(w, r, fullpath) 161 }) 162 163 router.GET("/playback/:user/:rendition/stream.mp4", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 164 user := p.ByName("user") 165 if user == "" { 166 errors.WriteHTTPBadRequest(w, "user required", nil) 167 return 168 } 169 rendition := p.ByName("rendition") 170 if rendition == "" { 171 errors.WriteHTTPBadRequest(w, "rendition required", nil) 172 return 173 } 174 user, err := a.NormalizeUser(ctx, user) 175 if err != nil { 176 errors.WriteHTTPBadRequest(w, "invalid user", err) 177 return 178 } 179 var delayMS int64 = 1000 180 userDelay := r.URL.Query().Get("delayms") 181 if userDelay != "" { 182 var err error 183 delayMS, err = strconv.ParseInt(userDelay, 10, 64) 184 if err != nil { 185 errors.WriteHTTPBadRequest(w, "error parsing delay", err) 186 return 187 } 188 if delayMS > 10000 { 189 errors.WriteHTTPBadRequest(w, "delay too large, maximum 10000", nil) 190 return 191 } 192 } 193 w.Header().Set("Content-Type", "video/mp4") 194 w.WriteHeader(200) 195 g, ctx := errgroup.WithContext(ctx) 196 pr, pw := io.Pipe() 197 bufw := bufio.NewWriter(pw) 198 g.Go(func() error { 199 return a.MediaManager.SegmentToMP4(ctx, user, rendition, bufw) 200 }) 201 g.Go(func() error { 202 time.Sleep(time.Duration(delayMS) * time.Millisecond) 203 _, err := io.Copy(w, pr) 204 return err 205 }) 206 g.Wait() 207 }) 208 209 router.HEAD("/playback/:user/:rendition/stream.mkv", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 210 user := p.ByName("user") 211 if user == "" { 212 errors.WriteHTTPBadRequest(w, "user required", nil) 213 return 214 } 215 w.Header().Set("Content-Type", "video/x-matroska") 216 w.Header().Set("Transfer-Encoding", "chunked") 217 w.WriteHeader(200) 218 }) 219 220 router.POST("/http-pipe/:uuid", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 221 uu := p.ByName("uuid") 222 if uu == "" { 223 errors.WriteHTTPBadRequest(w, "uuid required", nil) 224 return 225 } 226 pr := a.MediaManager.GetHTTPPipeWriter(uu) 227 if pr == nil { 228 errors.WriteHTTPNotFound(w, "http-pipe not found", nil) 229 return 230 } 231 io.Copy(pr, r.Body) 232 }) 233 234 // self-destruct code, useful for dumping goroutines on windows 235 router.POST("/abort", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 236 rtpprof.Lookup("goroutine").WriteTo(os.Stderr, 2) 237 log.Log(ctx, "got POST /abort, self-destructing") 238 os.Exit(1) 239 }) 240 241 handleIncomingStream := func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 242 log.Log(ctx, "stream start") 243 err := a.MediaManager.IngestStream(ctx, r.Body, a.MediaSigner) 244 245 if err != nil { 246 log.Log(ctx, "stream error", "error", err) 247 errors.WriteHTTPInternalServerError(w, "stream error", err) 248 return 249 } 250 log.Log(ctx, "stream success", "url", r.URL.String()) 251 } 252 253 // route to accept an incoming mkv stream from OBS, segment it, and push the segments back to this HTTP handler 254 router.POST("/stream/:key", handleIncomingStream) 255 router.PUT("/stream/:key", handleIncomingStream) 256 257 router.GET("/player-report/:id", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 258 id := p.ByName("id") 259 if id == "" { 260 errors.WriteHTTPBadRequest(w, "id required", nil) 261 return 262 } 263 events, err := a.Model.PlayerReport(id) 264 if err != nil { 265 errors.WriteHTTPBadRequest(w, err.Error(), err) 266 return 267 } 268 bs, err := json.Marshal(events) 269 if err != nil { 270 errors.WriteHTTPInternalServerError(w, "unable to marhsal json", err) 271 return 272 } 273 w.Write(bs) 274 }) 275 276 router.GET("/segment/:id", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 277 id := p.ByName("id") 278 if id == "" { 279 errors.WriteHTTPBadRequest(w, "id required", nil) 280 return 281 } 282 segment, err := a.Model.GetSegment(id) 283 if err != nil { 284 errors.WriteHTTPBadRequest(w, err.Error(), err) 285 return 286 } 287 if segment == nil { 288 errors.WriteHTTPNotFound(w, "segment not found", nil) 289 return 290 } 291 spSeg, err := segment.ToStreamplaceSegment() 292 if err != nil { 293 errors.WriteHTTPInternalServerError(w, "unable to convert segment to streamplace segment", err) 294 return 295 } 296 bs, err := json.Marshal(spSeg) 297 if err != nil { 298 errors.WriteHTTPInternalServerError(w, "unable to marhsal json", err) 299 return 300 } 301 w.Write(bs) 302 }) 303 304 router.DELETE("/player-events", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 305 err := a.Model.ClearPlayerEvents() 306 if err != nil { 307 errors.WriteHTTPInternalServerError(w, "unable to delete player events", err) 308 return 309 } 310 w.WriteHeader(204) 311 }) 312 313 router.GET("/settings", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 314 w.Header().Set("Access-Control-Allow-Origin", "*") 315 w.Header().Set("Access-Control-Allow-Methods", "GET") 316 w.Header().Set("Access-Control-Allow-Headers", "Content-Type") 317 318 id := a.Signer.Hex() 319 320 ident, err := a.Model.GetIdentity(id) 321 if err != nil { 322 errors.WriteHTTPInternalServerError(w, "unable to get settings", err) 323 return 324 } 325 326 bs, err := json.Marshal(ident) 327 if err != nil { 328 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err) 329 return 330 } 331 w.Write(bs) 332 }) 333 334 router.GET("/followers/:user", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 335 user := p.ByName("user") 336 if user == "" { 337 errors.WriteHTTPBadRequest(w, "user required", nil) 338 return 339 } 340 341 followers, err := a.Model.GetUserFollowers(ctx, user) 342 if err != nil { 343 errors.WriteHTTPInternalServerError(w, "unable to get followers", err) 344 return 345 } 346 bs, err := json.Marshal(followers) 347 if err != nil { 348 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err) 349 return 350 } 351 w.Write(bs) 352 }) 353 354 router.GET("/following/:user", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 355 user := p.ByName("user") 356 if user == "" { 357 errors.WriteHTTPBadRequest(w, "user required", nil) 358 return 359 } 360 361 followers, err := a.Model.GetUserFollowing(ctx, user) 362 if err != nil { 363 errors.WriteHTTPInternalServerError(w, "unable to get followers", err) 364 return 365 } 366 bs, err := json.Marshal(followers) 367 if err != nil { 368 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err) 369 return 370 } 371 w.Write(bs) 372 }) 373 374 router.GET("/notifications", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 375 notifications, err := a.Model.ListNotifications() 376 if err != nil { 377 errors.WriteHTTPInternalServerError(w, "unable to get notifications", err) 378 return 379 } 380 bs, err := json.Marshal(notifications) 381 if err != nil { 382 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err) 383 return 384 } 385 w.Write(bs) 386 }) 387 388 router.GET("/chat-posts", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 389 posts, err := a.Model.ListFeedPosts() 390 if err != nil { 391 errors.WriteHTTPInternalServerError(w, "unable to get chat posts", err) 392 return 393 } 394 bs, err := json.Marshal(posts) 395 if err != nil { 396 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err) 397 return 398 } 399 w.Write(bs) 400 }) 401 402 router.GET("/chat/:cid", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 403 cid := p.ByName("cid") 404 if cid == "" { 405 errors.WriteHTTPBadRequest(w, "cid required", nil) 406 return 407 } 408 msg, err := a.Model.GetChatMessage(cid) 409 if err != nil { 410 errors.WriteHTTPInternalServerError(w, "unable to get chat posts", err) 411 return 412 } 413 spmsg, err := msg.ToStreamplaceMessageView() 414 if err != nil { 415 errors.WriteHTTPInternalServerError(w, "unable to convert chat message to streamplace message view", err) 416 return 417 } 418 bs, err := json.Marshal(spmsg) 419 if err != nil { 420 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err) 421 return 422 } 423 w.Write(bs) 424 }) 425 426 router.GET("/oauth-sessions", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 427 sessions, err := a.Model.ListOAuthSessions() 428 if err != nil { 429 errors.WriteHTTPInternalServerError(w, "unable to get oauth sessions", err) 430 return 431 } 432 bs, err := json.Marshal(sessions) 433 if err != nil { 434 errors.WriteHTTPInternalServerError(w, "unable to marshal oauth sessions", err) 435 return 436 } 437 w.Write(bs) 438 }) 439 440 router.POST("/notification-blast", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 441 var payload notificationpkg.NotificationBlast 442 if err := json.NewDecoder(r.Body).Decode(&payload); err != nil { 443 errors.WriteHTTPBadRequest(w, "invalid request body", err) 444 return 445 } 446 notifications, err := a.Model.ListNotifications() 447 if err != nil { 448 errors.WriteHTTPInternalServerError(w, "unable to get notifications", err) 449 return 450 } 451 if a.FirebaseNotifier == nil { 452 errors.WriteHTTPInternalServerError(w, "firebase notifier not initialized", nil) 453 return 454 } 455 tokens := []string{} 456 for _, not := range notifications { 457 tokens = append(tokens, not.Token) 458 } 459 err = a.FirebaseNotifier.Blast(ctx, tokens, &payload) 460 if err != nil { 461 errors.WriteHTTPInternalServerError(w, "unable to blast notifications", err) 462 return 463 } 464 w.WriteHeader(http.StatusNoContent) 465 }) 466 467 router.PUT("/settings/:id", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 468 w.Header().Set("Access-Control-Allow-Origin", "*") 469 w.Header().Set("Access-Control-Allow-Methods", "PUT") 470 w.Header().Set("Access-Control-Allow-Headers", "Content-Type") 471 472 id := p.ByName("id") 473 if id == "" { 474 errors.WriteHTTPBadRequest(w, "id required", nil) 475 return 476 } 477 478 var ident model.Identity 479 if err := json.NewDecoder(r.Body).Decode(&ident); err != nil { 480 errors.WriteHTTPBadRequest(w, "invalid request body", err) 481 return 482 } 483 ident.ID = id 484 485 if err := a.Model.UpdateIdentity(&ident); err != nil { 486 errors.WriteHTTPInternalServerError(w, "unable to update settings", err) 487 return 488 } 489 490 w.WriteHeader(http.StatusNoContent) 491 }) 492 493 router.POST("/livepeer-auth-webhook-url", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 494 var payload struct { 495 URL string `json:"url"` 496 } 497 // urls look like http://127.0.0.1:9999/live/did:plc:dkh4rwafdcda4ko7lewe43ml-uucbv40mdkcfat50/47.mp4 498 if err := json.NewDecoder(r.Body).Decode(&payload); err != nil { 499 errors.WriteHTTPBadRequest(w, "invalid request body (could not decode)", err) 500 return 501 } 502 parts := strings.Split(payload.URL, "/") 503 if len(parts) < 5 { 504 errors.WriteHTTPBadRequest(w, "invalid request body (too few parts)", nil) 505 return 506 } 507 didSession := parts[4] 508 idParts := strings.Split(didSession, "-") 509 if len(idParts) != 2 { 510 errors.WriteHTTPBadRequest(w, "invalid request body (invalid did session)", nil) 511 return 512 } 513 did := idParts[0] 514 // sessionID := idParts[1] 515 seg, err := a.Model.LatestSegmentForUser(did) 516 if err != nil { 517 errors.WriteHTTPInternalServerError(w, "unable to get latest segment", err) 518 return 519 } 520 spseg, err := seg.ToStreamplaceSegment() 521 if err != nil { 522 errors.WriteHTTPInternalServerError(w, "unable to convert segment to streamplace segment", err) 523 return 524 } 525 renditions, err := renditions.GenerateRenditions(spseg) 526 if err != nil { 527 errors.WriteHTTPInternalServerError(w, "unable to generate renditions", err) 528 return 529 } 530 out := map[string]any{ 531 "manifestID": didSession, 532 "profiles": renditions.ToLivepeerProfiles(), 533 } 534 bs, err := json.Marshal(out) 535 if err != nil { 536 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err) 537 return 538 } 539 w.Write(bs) 540 }) 541 542 handler := sloghttp.Recovery(router) 543 if log.Level(4) { 544 handler = sloghttp.New(slog.Default())(handler) 545 } 546 return handler, nil 547} 548 549func (a *StreamplaceAPI) keyToUser(ctx context.Context, key string) (string, error) { 550 payload, err := base64.URLEncoding.DecodeString(key) 551 if err != nil { 552 return "", err 553 } 554 signed, err := a.Signer.Verify(payload) 555 if err != nil { 556 return "", err 557 } 558 _, ok := signed.Data().(*v0.StreamKey) 559 if !ok { 560 return "", fmt.Errorf("got signed data but it wasn't a stream key") 561 } 562 return strings.ToLower(signed.Signer()), nil 563}