Live video on the AT Protocol
at eli/fix-stuck-packetize 587 lines 18 kB view raw
1package api 2 3import ( 4 "context" 5 "encoding/base64" 6 "encoding/json" 7 "fmt" 8 "io" 9 "log/slog" 10 "net/http" 11 "net/http/pprof" 12 "os" 13 "path/filepath" 14 "regexp" 15 "runtime" 16 rtpprof "runtime/pprof" 17 "strconv" 18 "strings" 19 "time" 20 21 "github.com/julienschmidt/httprouter" 22 "github.com/pion/webrtc/v4" 23 "github.com/prometheus/client_golang/prometheus/promhttp" 24 sloghttp "github.com/samber/slog-http" 25 "stream.place/streamplace/pkg/errors" 26 "stream.place/streamplace/pkg/log" 27 "stream.place/streamplace/pkg/media" 28 "stream.place/streamplace/pkg/mist/mistconfig" 29 "stream.place/streamplace/pkg/mist/misttriggers" 30 "stream.place/streamplace/pkg/model" 31 notificationpkg "stream.place/streamplace/pkg/notifications" 32 "stream.place/streamplace/pkg/rtcrec" 33 v0 "stream.place/streamplace/pkg/schema/v0" 34) 35 36func (a *StreamplaceAPI) ServeInternalHTTP(ctx context.Context) error { 37 handler, err := a.InternalHandler(ctx) 38 if err != nil { 39 return err 40 } 41 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error { 42 s.Addr = a.CLI.HTTPInternalAddr 43 log.Log(ctx, "http server starting", "addr", s.Addr) 44 return s.ListenAndServe() 45 }) 46} 47 48// lightweight way to authenticate push requests to ourself 49var mkvRE *regexp.Regexp 50 51func init() { 52 mkvRE = regexp.MustCompile(`^\d+\.mkv$`) 53} 54 55func (a *StreamplaceAPI) InternalHandler(ctx context.Context) (http.Handler, error) { 56 router := httprouter.New() 57 broker := misttriggers.NewTriggerBroker() 58 59 broker.OnPushRewrite(func(ctx context.Context, payload *misttriggers.PushRewritePayload) (string, error) { 60 log.Log(ctx, "got push out start", "streamName", payload.StreamName, "url", payload.URL.String()) 61 // Extract the last part of the URL path 62 urlPath := payload.URL.Path 63 parts := strings.Split(urlPath, "/") 64 lastPart := "" 65 if len(parts) > 0 { 66 lastPart = parts[len(parts)-1] 67 } 68 mediaSigner, err := a.MakeMediaSigner(ctx, lastPart) 69 if err != nil { 70 return "", err 71 } 72 73 ms := time.Now().UnixMilli() 74 out := fmt.Sprintf("%s+%s_%d", mistconfig.StreamName, mediaSigner.Streamer(), ms) 75 a.SignerCacheMu.Lock() 76 a.SignerCache[mediaSigner.Streamer()] = mediaSigner 77 a.SignerCacheMu.Unlock() 78 log.Log(ctx, "added key to cache", "mist-stream", out, "streamer", mediaSigner.Streamer()) 79 80 return out, nil 81 }) 82 triggerCollection := misttriggers.NewMistCallbackHandlersCollection(a.CLI, broker) 83 router.POST("/mist-trigger", triggerCollection.Trigger()) 84 router.HandlerFunc("GET", "/healthz", a.HandleHealthz(ctx)) 85 86 // Add pprof handlers 87 router.HandlerFunc("GET", "/debug/pprof/", pprof.Index) 88 router.HandlerFunc("GET", "/debug/pprof/cmdline", pprof.Cmdline) 89 router.HandlerFunc("GET", "/debug/pprof/profile", pprof.Profile) 90 router.HandlerFunc("GET", "/debug/pprof/symbol", pprof.Symbol) 91 router.HandlerFunc("GET", "/debug/pprof/trace", pprof.Trace) 92 router.Handler("GET", "/debug/pprof/goroutine", pprof.Handler("goroutine")) 93 router.Handler("GET", "/debug/pprof/heap", pprof.Handler("heap")) 94 router.Handler("GET", "/debug/pprof/threadcreate", pprof.Handler("threadcreate")) 95 router.Handler("GET", "/debug/pprof/block", pprof.Handler("block")) 96 router.Handler("GET", "/debug/pprof/allocs", pprof.Handler("allocs")) 97 router.Handler("GET", "/debug/pprof/mutex", pprof.Handler("mutex")) 98 99 router.POST("/gc", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 100 runtime.GC() 101 w.WriteHeader(204) 102 }) 103 104 router.Handler("GET", "/metrics", promhttp.Handler()) 105 106 router.GET("/playback/:user/:rendition/concat", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 107 user := p.ByName("user") 108 if user == "" { 109 errors.WriteHTTPBadRequest(w, "user required", nil) 110 return 111 } 112 rendition := p.ByName("rendition") 113 if rendition == "" { 114 errors.WriteHTTPBadRequest(w, "rendition required", nil) 115 return 116 } 117 user, err := a.NormalizeUser(ctx, user) 118 if err != nil { 119 errors.WriteHTTPBadRequest(w, "invalid user", err) 120 return 121 } 122 w.Header().Set("content-type", "text/plain") 123 fmt.Fprintf(w, "ffconcat version 1.0\n") 124 // intermittent reports that you need two here to make things work properly? shouldn't matter. 125 for i := 0; i < 2; i += 1 { 126 fmt.Fprintf(w, "file '%s/playback/%s/%s/latest.mp4'\n", a.CLI.OwnInternalURL(), user, rendition) 127 } 128 }) 129 130 router.GET("/playback/:user/:rendition/latest.mp4", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 131 user := p.ByName("user") 132 if user == "" { 133 errors.WriteHTTPBadRequest(w, "user required", nil) 134 return 135 } 136 user, err := a.NormalizeUser(ctx, user) 137 if err != nil { 138 errors.WriteHTTPBadRequest(w, "invalid user", err) 139 return 140 } 141 rendition := p.ByName("rendition") 142 if rendition == "" { 143 errors.WriteHTTPBadRequest(w, "rendition required", nil) 144 return 145 } 146 segChan := a.Bus.SubscribeSegment(ctx, user, rendition) 147 defer a.Bus.UnsubscribeSegment(ctx, user, rendition, segChan) 148 seg := <-segChan.C 149 base := filepath.Base(seg.Filepath) 150 w.Header().Set("Location", fmt.Sprintf("%s/playback/%s/%s/segment/%s\n", a.CLI.OwnInternalURL(), user, rendition, base)) 151 w.WriteHeader(301) 152 }) 153 154 router.GET("/playback/:user/:rendition/segment/:file", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 155 user := p.ByName("user") 156 if user == "" { 157 errors.WriteHTTPBadRequest(w, "user required", nil) 158 return 159 } 160 user, err := a.NormalizeUser(ctx, user) 161 if err != nil { 162 errors.WriteHTTPBadRequest(w, "invalid user", err) 163 return 164 } 165 file := p.ByName("file") 166 if file == "" { 167 errors.WriteHTTPBadRequest(w, "file required", nil) 168 return 169 } 170 fullpath, err := a.CLI.SegmentFilePath(user, file) 171 if err != nil { 172 errors.WriteHTTPBadRequest(w, "badly formatted request", err) 173 return 174 } 175 http.ServeFile(w, r, fullpath) 176 }) 177 178 router.HEAD("/playback/:user/:rendition/stream.mkv", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 179 user := p.ByName("user") 180 if user == "" { 181 errors.WriteHTTPBadRequest(w, "user required", nil) 182 return 183 } 184 w.Header().Set("Content-Type", "video/x-matroska") 185 w.Header().Set("Transfer-Encoding", "chunked") 186 w.WriteHeader(200) 187 }) 188 189 router.POST("/http-pipe/:uuid", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 190 uu := p.ByName("uuid") 191 if uu == "" { 192 errors.WriteHTTPBadRequest(w, "uuid required", nil) 193 return 194 } 195 pr := a.MediaManager.GetHTTPPipeWriter(uu) 196 if pr == nil { 197 errors.WriteHTTPNotFound(w, "http-pipe not found", nil) 198 return 199 } 200 if _, err := io.Copy(pr, r.Body); err != nil { 201 errors.WriteHTTPInternalServerError(w, "failed to copy response", nil) 202 } 203 }) 204 205 // self-destruct code, useful for dumping goroutines on windows 206 router.POST("/abort", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 207 if err := rtpprof.Lookup("goroutine").WriteTo(os.Stderr, 2); err != nil { 208 log.Log(ctx, "error writing rtpprof", "error", err) 209 } 210 log.Log(ctx, "got POST /abort, self-destructing") 211 os.Exit(1) 212 }) 213 214 handleIncomingStream := func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 215 key := p.ByName("key") 216 log.Log(ctx, "stream start") 217 218 var mediaSigner media.MediaSigner 219 var ok bool 220 var err error 221 parts := strings.Split(key, "_") 222 223 if len(parts) == 2 { 224 a.SignerCacheMu.Lock() 225 mediaSigner, ok = a.SignerCache[parts[0]] 226 a.SignerCacheMu.Unlock() 227 if !ok { 228 log.Error(ctx, "couldn't find key in cache", "part", parts[0], "key", key) 229 errors.WriteHTTPUnauthorized(w, "invalid authorization key", nil) 230 return 231 } 232 } else { 233 mediaSigner, err = a.MakeMediaSigner(ctx, key) 234 if err != nil { 235 errors.WriteHTTPUnauthorized(w, "invalid authorization key", err) 236 return 237 } 238 } 239 240 err = a.MediaManager.MKVIngest(ctx, r.Body, mediaSigner) 241 242 if err != nil { 243 log.Log(ctx, "stream error", "error", err) 244 errors.WriteHTTPInternalServerError(w, "stream error", err) 245 return 246 } 247 log.Log(ctx, "stream success", "url", r.URL.String()) 248 } 249 250 // route to accept an incoming mkv stream from OBS, segment it, and push the segments back to this HTTP handler 251 router.POST("/live/:key", handleIncomingStream) 252 router.PUT("/live/:key", handleIncomingStream) 253 254 router.GET("/player-report/:id", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 255 id := p.ByName("id") 256 if id == "" { 257 errors.WriteHTTPBadRequest(w, "id required", nil) 258 return 259 } 260 events, err := a.Model.PlayerReport(id) 261 if err != nil { 262 errors.WriteHTTPBadRequest(w, err.Error(), err) 263 return 264 } 265 bs, err := json.Marshal(events) 266 if err != nil { 267 errors.WriteHTTPInternalServerError(w, "unable to marhsal json", err) 268 return 269 } 270 if _, err := w.Write(bs); err != nil { 271 log.Error(ctx, "error writing response", "error", err) 272 } 273 }) 274 275 router.GET("/segment/:id", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 276 id := p.ByName("id") 277 if id == "" { 278 errors.WriteHTTPBadRequest(w, "id required", nil) 279 return 280 } 281 segment, err := a.Model.GetSegment(id) 282 if err != nil { 283 errors.WriteHTTPBadRequest(w, err.Error(), err) 284 return 285 } 286 if segment == nil { 287 errors.WriteHTTPNotFound(w, "segment not found", nil) 288 return 289 } 290 spSeg, err := segment.ToStreamplaceSegment() 291 if err != nil { 292 errors.WriteHTTPInternalServerError(w, "unable to convert segment to streamplace segment", err) 293 return 294 } 295 bs, err := json.Marshal(spSeg) 296 if err != nil { 297 errors.WriteHTTPInternalServerError(w, "unable to marhsal json", err) 298 return 299 } 300 if _, err := w.Write(bs); err != nil { 301 log.Error(ctx, "error writing response", "error", err) 302 } 303 }) 304 305 router.DELETE("/player-events", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 306 err := a.Model.ClearPlayerEvents() 307 if err != nil { 308 errors.WriteHTTPInternalServerError(w, "unable to delete player events", err) 309 return 310 } 311 w.WriteHeader(204) 312 }) 313 314 router.GET("/settings", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 315 w.Header().Set("Access-Control-Allow-Origin", "*") 316 w.Header().Set("Access-Control-Allow-Methods", "GET") 317 w.Header().Set("Access-Control-Allow-Headers", "Content-Type") 318 319 id := a.Signer.Hex() 320 321 ident, err := a.Model.GetIdentity(id) 322 if err != nil { 323 errors.WriteHTTPInternalServerError(w, "unable to get settings", err) 324 return 325 } 326 327 bs, err := json.Marshal(ident) 328 if err != nil { 329 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err) 330 return 331 } 332 if _, err := w.Write(bs); err != nil { 333 log.Error(ctx, "error writing response", "error", err) 334 } 335 }) 336 337 router.GET("/followers/:user", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 338 user := p.ByName("user") 339 if user == "" { 340 errors.WriteHTTPBadRequest(w, "user required", nil) 341 return 342 } 343 344 followers, err := a.Model.GetUserFollowers(ctx, user) 345 if err != nil { 346 errors.WriteHTTPInternalServerError(w, "unable to get followers", err) 347 return 348 } 349 bs, err := json.Marshal(followers) 350 if err != nil { 351 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err) 352 return 353 } 354 if _, err := w.Write(bs); err != nil { 355 log.Error(ctx, "error writing response", "error", err) 356 } 357 }) 358 359 router.GET("/following/:user", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 360 user := p.ByName("user") 361 if user == "" { 362 errors.WriteHTTPBadRequest(w, "user required", nil) 363 return 364 } 365 366 followers, err := a.Model.GetUserFollowing(ctx, user) 367 if err != nil { 368 errors.WriteHTTPInternalServerError(w, "unable to get followers", err) 369 return 370 } 371 bs, err := json.Marshal(followers) 372 if err != nil { 373 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err) 374 return 375 } 376 if _, err := w.Write(bs); err != nil { 377 log.Error(ctx, "error writing response", "error", err) 378 } 379 }) 380 381 router.GET("/notifications", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 382 notifications, err := a.StatefulDB.ListNotifications() 383 if err != nil { 384 errors.WriteHTTPInternalServerError(w, "unable to get notifications", err) 385 return 386 } 387 bs, err := json.Marshal(notifications) 388 if err != nil { 389 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err) 390 return 391 } 392 if _, err := w.Write(bs); err != nil { 393 log.Error(ctx, "error writing response", "error", err) 394 } 395 }) 396 397 router.GET("/chat-posts", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 398 posts, err := a.Model.ListFeedPosts() 399 if err != nil { 400 errors.WriteHTTPInternalServerError(w, "unable to get chat posts", err) 401 return 402 } 403 bs, err := json.Marshal(posts) 404 if err != nil { 405 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err) 406 return 407 } 408 if _, err := w.Write(bs); err != nil { 409 log.Error(ctx, "error writing response", "error", err) 410 } 411 }) 412 413 router.GET("/chat/:uri", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 414 uri := p.ByName("uri") 415 if uri == "" { 416 errors.WriteHTTPBadRequest(w, "uri required", nil) 417 return 418 } 419 msg, err := a.Model.GetChatMessage(uri) 420 if err != nil { 421 errors.WriteHTTPInternalServerError(w, "unable to get chat posts", err) 422 return 423 } 424 spmsg, err := msg.ToStreamplaceMessageView() 425 if err != nil { 426 errors.WriteHTTPInternalServerError(w, "unable to convert chat message to streamplace message view", err) 427 return 428 } 429 bs, err := json.Marshal(spmsg) 430 if err != nil { 431 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err) 432 return 433 } 434 if _, err := w.Write(bs); err != nil { 435 log.Error(ctx, "error writing response", "error", err) 436 } 437 }) 438 439 router.GET("/oauth-sessions", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 440 sessions, err := a.StatefulDB.ListOAuthSessions() 441 if err != nil { 442 errors.WriteHTTPInternalServerError(w, "unable to get oauth sessions", err) 443 return 444 } 445 bs, err := json.Marshal(sessions) 446 if err != nil { 447 errors.WriteHTTPInternalServerError(w, "unable to marshal oauth sessions", err) 448 return 449 } 450 if _, err := w.Write(bs); err != nil { 451 log.Error(ctx, "error writing response", "error", err) 452 } 453 }) 454 455 router.POST("/notification-blast", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 456 var payload notificationpkg.NotificationBlast 457 if err := json.NewDecoder(r.Body).Decode(&payload); err != nil { 458 errors.WriteHTTPBadRequest(w, "invalid request body", err) 459 return 460 } 461 notifications, err := a.StatefulDB.ListNotifications() 462 if err != nil { 463 errors.WriteHTTPInternalServerError(w, "unable to get notifications", err) 464 return 465 } 466 if a.FirebaseNotifier == nil { 467 errors.WriteHTTPInternalServerError(w, "firebase notifier not initialized", nil) 468 return 469 } 470 tokens := []string{} 471 for _, not := range notifications { 472 tokens = append(tokens, not.Token) 473 } 474 err = a.FirebaseNotifier.Blast(ctx, tokens, &payload) 475 if err != nil { 476 errors.WriteHTTPInternalServerError(w, "unable to blast notifications", err) 477 return 478 } 479 w.WriteHeader(http.StatusNoContent) 480 }) 481 482 router.PUT("/settings/:id", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 483 w.Header().Set("Access-Control-Allow-Origin", "*") 484 w.Header().Set("Access-Control-Allow-Methods", "PUT") 485 w.Header().Set("Access-Control-Allow-Headers", "Content-Type") 486 487 id := p.ByName("id") 488 if id == "" { 489 errors.WriteHTTPBadRequest(w, "id required", nil) 490 return 491 } 492 493 var ident model.Identity 494 if err := json.NewDecoder(r.Body).Decode(&ident); err != nil { 495 errors.WriteHTTPBadRequest(w, "invalid request body", err) 496 return 497 } 498 ident.ID = id 499 500 if err := a.Model.UpdateIdentity(&ident); err != nil { 501 errors.WriteHTTPInternalServerError(w, "unable to update settings", err) 502 return 503 } 504 505 w.WriteHeader(http.StatusNoContent) 506 }) 507 508 router.POST("/replay/:streamKey", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 509 key := p.ByName("streamKey") 510 if key == "" { 511 errors.WriteHTTPBadRequest(w, "streamKey required", nil) 512 return 513 } 514 mediaSigner, err := a.MakeMediaSigner(ctx, key) 515 if err != nil { 516 errors.WriteHTTPUnauthorized(w, "invalid authorization key", err) 517 return 518 } 519 pc, err := rtcrec.NewReplayPeerConnection(ctx, r.Body) 520 if err != nil { 521 errors.WriteHTTPInternalServerError(w, "unable to create replay peer connection", err) 522 return 523 } 524 answer, err := a.MediaManager.WebRTCIngest(ctx, &webrtc.SessionDescription{SDP: "placeholder"}, mediaSigner, pc, make(chan struct{})) 525 if err != nil { 526 errors.WriteHTTPInternalServerError(w, "unable to ingest web rtc", err) 527 return 528 } 529 w.WriteHeader(200) 530 if _, err := w.Write([]byte(answer.SDP)); err != nil { 531 errors.WriteHTTPInternalServerError(w, "unable to write response", err) 532 log.Error(ctx, "error writing response", "error", err) 533 } 534 }) 535 536 router.GET("/clip/:did/clip.mp4", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 537 did := p.ByName("did") 538 if did == "" { 539 errors.WriteHTTPBadRequest(w, "did required", nil) 540 return 541 } 542 user, err := a.NormalizeUser(ctx, did) 543 if err != nil { 544 errors.WriteHTTPBadRequest(w, "invalid user", err) 545 return 546 } 547 secsStr := r.URL.Query().Get("secs") 548 secs := 60 // Default to 60 seconds 549 if secsStr != "" { 550 parsedSecs, err := strconv.Atoi(secsStr) 551 if err != nil { 552 errors.WriteHTTPBadRequest(w, "invalid secs parameter", err) 553 return 554 } 555 secs = parsedSecs 556 } 557 after := time.Now().Add(-time.Duration(secs) * time.Second) 558 w.Header().Set("Content-Type", "video/mp4") 559 err = media.ClipUser(ctx, a.Model, a.CLI, user, w, nil, &after) 560 if err != nil { 561 errors.WriteHTTPInternalServerError(w, "unable to clip user", err) 562 return 563 } 564 }) 565 566 handler := sloghttp.Recovery(router) 567 if log.Level(4) { 568 handler = sloghttp.New(slog.Default())(handler) 569 } 570 return handler, nil 571} 572 573func (a *StreamplaceAPI) keyToUser(ctx context.Context, key string) (string, error) { 574 payload, err := base64.URLEncoding.DecodeString(key) 575 if err != nil { 576 return "", err 577 } 578 signed, err := a.Signer.Verify(payload) 579 if err != nil { 580 return "", err 581 } 582 _, ok := signed.Data().(*v0.StreamKey) 583 if !ok { 584 return "", fmt.Errorf("got signed data but it wasn't a stream key") 585 } 586 return strings.ToLower(signed.Signer()), nil 587}