Live video on the AT Protocol
at natb/tost 595 lines 18 kB view raw
1package api 2 3import ( 4 "context" 5 "encoding/base64" 6 "encoding/json" 7 "fmt" 8 "io" 9 "log/slog" 10 "net/http" 11 "net/http/pprof" 12 "os" 13 "path/filepath" 14 "regexp" 15 "runtime" 16 rtpprof "runtime/pprof" 17 "strconv" 18 "strings" 19 "time" 20 21 "github.com/julienschmidt/httprouter" 22 "github.com/pion/webrtc/v4" 23 "github.com/prometheus/client_golang/prometheus/promhttp" 24 sloghttp "github.com/samber/slog-http" 25 "stream.place/streamplace/pkg/errors" 26 "stream.place/streamplace/pkg/log" 27 "stream.place/streamplace/pkg/media" 28 "stream.place/streamplace/pkg/mist/mistconfig" 29 "stream.place/streamplace/pkg/mist/misttriggers" 30 "stream.place/streamplace/pkg/model" 31 notificationpkg "stream.place/streamplace/pkg/notifications" 32 "stream.place/streamplace/pkg/rtcrec" 33 v0 "stream.place/streamplace/pkg/schema/v0" 34) 35 36func (a *StreamplaceAPI) ServeInternalHTTP(ctx context.Context) error { 37 handler, err := a.InternalHandler(ctx) 38 if err != nil { 39 return err 40 } 41 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error { 42 s.Addr = a.CLI.HTTPInternalAddr 43 log.Log(ctx, "http server starting", "addr", s.Addr) 44 return s.ListenAndServe() 45 }) 46} 47 48// lightweight way to authenticate push requests to ourself 49var mkvRE *regexp.Regexp 50 51func init() { 52 mkvRE = regexp.MustCompile(`^\d+\.mkv$`) 53} 54 55func (a *StreamplaceAPI) InternalHandler(ctx context.Context) (http.Handler, error) { 56 router := httprouter.New() 57 broker := misttriggers.NewTriggerBroker() 58 59 broker.OnPushRewrite(func(ctx context.Context, payload *misttriggers.PushRewritePayload) (string, error) { 60 log.Log(ctx, "got push out start", "streamName", payload.StreamName, "url", payload.URL.String()) 61 // Extract the last part of the URL path 62 urlPath := payload.URL.Path 63 parts := strings.Split(urlPath, "/") 64 lastPart := "" 65 if len(parts) > 0 { 66 lastPart = parts[len(parts)-1] 67 } 68 mediaSigner, err := a.MakeMediaSigner(ctx, lastPart) 69 if err != nil { 70 return "", err 71 } 72 73 ms := time.Now().UnixMilli() 74 out := fmt.Sprintf("%s+%s_%d", mistconfig.StreamName, mediaSigner.Streamer(), ms) 75 a.SignerCacheMu.Lock() 76 a.SignerCache[mediaSigner.Streamer()] = mediaSigner 77 a.SignerCacheMu.Unlock() 78 log.Log(ctx, "added key to cache", "mist-stream", out, "streamer", mediaSigner.Streamer()) 79 80 return out, nil 81 }) 82 triggerCollection := misttriggers.NewMistCallbackHandlersCollection(a.CLI, broker) 83 router.POST("/mist-trigger", triggerCollection.Trigger()) 84 router.HandlerFunc("GET", "/healthz", a.HandleHealthz(ctx)) 85 86 // Add pprof handlers 87 router.HandlerFunc("GET", "/debug/pprof/", pprof.Index) 88 router.HandlerFunc("GET", "/debug/pprof/cmdline", pprof.Cmdline) 89 router.HandlerFunc("GET", "/debug/pprof/profile", pprof.Profile) 90 router.HandlerFunc("GET", "/debug/pprof/symbol", pprof.Symbol) 91 router.HandlerFunc("GET", "/debug/pprof/trace", pprof.Trace) 92 router.Handler("GET", "/debug/pprof/goroutine", pprof.Handler("goroutine")) 93 router.Handler("GET", "/debug/pprof/heap", pprof.Handler("heap")) 94 router.Handler("GET", "/debug/pprof/threadcreate", pprof.Handler("threadcreate")) 95 router.Handler("GET", "/debug/pprof/block", pprof.Handler("block")) 96 router.Handler("GET", "/debug/pprof/allocs", pprof.Handler("allocs")) 97 router.Handler("GET", "/debug/pprof/mutex", pprof.Handler("mutex")) 98 99 router.POST("/gc", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 100 runtime.GC() 101 w.WriteHeader(204) 102 }) 103 104 router.Handler("GET", "/metrics", promhttp.Handler()) 105 106 router.GET("/playback/:user/:rendition/concat", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 107 user := p.ByName("user") 108 if user == "" { 109 errors.WriteHTTPBadRequest(w, "user required", nil) 110 return 111 } 112 rendition := p.ByName("rendition") 113 if rendition == "" { 114 errors.WriteHTTPBadRequest(w, "rendition required", nil) 115 return 116 } 117 user, err := a.NormalizeUser(ctx, user) 118 if err != nil { 119 errors.WriteHTTPBadRequest(w, "invalid user", err) 120 return 121 } 122 w.Header().Set("content-type", "text/plain") 123 fmt.Fprintf(w, "ffconcat version 1.0\n") 124 // intermittent reports that you need two here to make things work properly? shouldn't matter. 125 for i := 0; i < 2; i += 1 { 126 fmt.Fprintf(w, "file '%s/playback/%s/%s/latest.mp4'\n", a.CLI.OwnInternalURL(), user, rendition) 127 } 128 }) 129 130 router.GET("/playback/:user/:rendition/latest.mp4", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 131 user := p.ByName("user") 132 if user == "" { 133 errors.WriteHTTPBadRequest(w, "user required", nil) 134 return 135 } 136 user, err := a.NormalizeUser(ctx, user) 137 if err != nil { 138 errors.WriteHTTPBadRequest(w, "invalid user", err) 139 return 140 } 141 rendition := p.ByName("rendition") 142 if rendition == "" { 143 errors.WriteHTTPBadRequest(w, "rendition required", nil) 144 return 145 } 146 segChan := a.Bus.SubscribeSegment(ctx, user, rendition) 147 defer a.Bus.UnsubscribeSegment(ctx, user, rendition, segChan) 148 seg := <-segChan.C 149 base := filepath.Base(seg.Filepath) 150 w.Header().Set("Location", fmt.Sprintf("%s/playback/%s/%s/segment/%s\n", a.CLI.OwnInternalURL(), user, rendition, base)) 151 w.WriteHeader(301) 152 }) 153 154 router.GET("/playback/:user/:rendition/segment/:file", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 155 user := p.ByName("user") 156 if user == "" { 157 errors.WriteHTTPBadRequest(w, "user required", nil) 158 return 159 } 160 user, err := a.NormalizeUser(ctx, user) 161 if err != nil { 162 errors.WriteHTTPBadRequest(w, "invalid user", err) 163 return 164 } 165 file := p.ByName("file") 166 if file == "" { 167 errors.WriteHTTPBadRequest(w, "file required", nil) 168 return 169 } 170 fullpath, err := a.CLI.SegmentFilePath(user, file) 171 if err != nil { 172 errors.WriteHTTPBadRequest(w, "badly formatted request", err) 173 return 174 } 175 http.ServeFile(w, r, fullpath) 176 }) 177 178 router.HEAD("/playback/:user/:rendition/stream.mkv", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 179 user := p.ByName("user") 180 if user == "" { 181 errors.WriteHTTPBadRequest(w, "user required", nil) 182 return 183 } 184 w.Header().Set("Content-Type", "video/x-matroska") 185 w.Header().Set("Transfer-Encoding", "chunked") 186 w.WriteHeader(200) 187 }) 188 189 router.POST("/http-pipe/:uuid", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 190 uu := p.ByName("uuid") 191 if uu == "" { 192 errors.WriteHTTPBadRequest(w, "uuid required", nil) 193 return 194 } 195 pr := a.MediaManager.GetHTTPPipeWriter(uu) 196 if pr == nil { 197 errors.WriteHTTPNotFound(w, "http-pipe not found", nil) 198 return 199 } 200 if _, err := io.Copy(pr, r.Body); err != nil { 201 errors.WriteHTTPInternalServerError(w, "failed to copy response", nil) 202 } 203 }) 204 205 // self-destruct code, useful for dumping goroutines on windows 206 router.POST("/abort", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 207 if err := rtpprof.Lookup("goroutine").WriteTo(os.Stderr, 2); err != nil { 208 log.Log(ctx, "error writing rtpprof", "error", err) 209 } 210 log.Log(ctx, "got POST /abort, self-destructing") 211 os.Exit(1) 212 }) 213 214 handleIncomingStream := func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 215 key := p.ByName("key") 216 log.Log(ctx, "stream start") 217 218 var mediaSigner media.MediaSigner 219 var ok bool 220 var err error 221 parts := strings.Split(key, "_") 222 223 if len(parts) == 2 { 224 a.SignerCacheMu.Lock() 225 mediaSigner, ok = a.SignerCache[parts[0]] 226 a.SignerCacheMu.Unlock() 227 if !ok { 228 log.Error(ctx, "couldn't find key in cache", "part", parts[0], "key", key) 229 errors.WriteHTTPUnauthorized(w, "invalid authorization key", nil) 230 return 231 } 232 } else { 233 mediaSigner, err = a.MakeMediaSigner(ctx, key) 234 if err != nil { 235 errors.WriteHTTPUnauthorized(w, "invalid authorization key", err) 236 return 237 } 238 } 239 240 ctx = log.WithLogValues(ctx, "streamer", mediaSigner.Streamer()) 241 242 err = a.checkBanned(ctx, mediaSigner.Streamer()) 243 if err != nil { 244 errors.WriteHTTPUnauthorized(w, err.Error(), err) 245 return 246 } 247 248 err = a.MediaManager.MKVIngest(ctx, r.Body, mediaSigner) 249 250 if err != nil { 251 log.Log(ctx, "stream error", "error", err) 252 errors.WriteHTTPInternalServerError(w, "stream error", err) 253 return 254 } 255 log.Log(ctx, "stream success", "url", r.URL.String()) 256 } 257 258 // route to accept an incoming mkv stream from OBS, segment it, and push the segments back to this HTTP handler 259 router.POST("/live/:key", handleIncomingStream) 260 router.PUT("/live/:key", handleIncomingStream) 261 262 router.GET("/player-report/:id", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 263 id := p.ByName("id") 264 if id == "" { 265 errors.WriteHTTPBadRequest(w, "id required", nil) 266 return 267 } 268 events, err := a.Model.PlayerReport(id) 269 if err != nil { 270 errors.WriteHTTPBadRequest(w, err.Error(), err) 271 return 272 } 273 bs, err := json.Marshal(events) 274 if err != nil { 275 errors.WriteHTTPInternalServerError(w, "unable to marhsal json", err) 276 return 277 } 278 if _, err := w.Write(bs); err != nil { 279 log.Error(ctx, "error writing response", "error", err) 280 } 281 }) 282 283 router.GET("/segment/:id", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 284 id := p.ByName("id") 285 if id == "" { 286 errors.WriteHTTPBadRequest(w, "id required", nil) 287 return 288 } 289 segment, err := a.Model.GetSegment(id) 290 if err != nil { 291 errors.WriteHTTPBadRequest(w, err.Error(), err) 292 return 293 } 294 if segment == nil { 295 errors.WriteHTTPNotFound(w, "segment not found", nil) 296 return 297 } 298 spSeg, err := segment.ToStreamplaceSegment() 299 if err != nil { 300 errors.WriteHTTPInternalServerError(w, "unable to convert segment to streamplace segment", err) 301 return 302 } 303 bs, err := json.Marshal(spSeg) 304 if err != nil { 305 errors.WriteHTTPInternalServerError(w, "unable to marhsal json", err) 306 return 307 } 308 if _, err := w.Write(bs); err != nil { 309 log.Error(ctx, "error writing response", "error", err) 310 } 311 }) 312 313 router.DELETE("/player-events", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 314 err := a.Model.ClearPlayerEvents() 315 if err != nil { 316 errors.WriteHTTPInternalServerError(w, "unable to delete player events", err) 317 return 318 } 319 w.WriteHeader(204) 320 }) 321 322 router.GET("/settings", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 323 w.Header().Set("Access-Control-Allow-Origin", "*") 324 w.Header().Set("Access-Control-Allow-Methods", "GET") 325 w.Header().Set("Access-Control-Allow-Headers", "Content-Type") 326 327 id := a.Signer.Hex() 328 329 ident, err := a.Model.GetIdentity(id) 330 if err != nil { 331 errors.WriteHTTPInternalServerError(w, "unable to get settings", err) 332 return 333 } 334 335 bs, err := json.Marshal(ident) 336 if err != nil { 337 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err) 338 return 339 } 340 if _, err := w.Write(bs); err != nil { 341 log.Error(ctx, "error writing response", "error", err) 342 } 343 }) 344 345 router.GET("/followers/:user", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 346 user := p.ByName("user") 347 if user == "" { 348 errors.WriteHTTPBadRequest(w, "user required", nil) 349 return 350 } 351 352 followers, err := a.Model.GetUserFollowers(ctx, user) 353 if err != nil { 354 errors.WriteHTTPInternalServerError(w, "unable to get followers", err) 355 return 356 } 357 bs, err := json.Marshal(followers) 358 if err != nil { 359 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err) 360 return 361 } 362 if _, err := w.Write(bs); err != nil { 363 log.Error(ctx, "error writing response", "error", err) 364 } 365 }) 366 367 router.GET("/following/:user", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 368 user := p.ByName("user") 369 if user == "" { 370 errors.WriteHTTPBadRequest(w, "user required", nil) 371 return 372 } 373 374 followers, err := a.Model.GetUserFollowing(ctx, user) 375 if err != nil { 376 errors.WriteHTTPInternalServerError(w, "unable to get followers", err) 377 return 378 } 379 bs, err := json.Marshal(followers) 380 if err != nil { 381 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err) 382 return 383 } 384 if _, err := w.Write(bs); err != nil { 385 log.Error(ctx, "error writing response", "error", err) 386 } 387 }) 388 389 router.GET("/notifications", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 390 notifications, err := a.StatefulDB.ListNotifications() 391 if err != nil { 392 errors.WriteHTTPInternalServerError(w, "unable to get notifications", err) 393 return 394 } 395 bs, err := json.Marshal(notifications) 396 if err != nil { 397 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err) 398 return 399 } 400 if _, err := w.Write(bs); err != nil { 401 log.Error(ctx, "error writing response", "error", err) 402 } 403 }) 404 405 router.GET("/chat-posts", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 406 posts, err := a.Model.ListFeedPosts() 407 if err != nil { 408 errors.WriteHTTPInternalServerError(w, "unable to get chat posts", err) 409 return 410 } 411 bs, err := json.Marshal(posts) 412 if err != nil { 413 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err) 414 return 415 } 416 if _, err := w.Write(bs); err != nil { 417 log.Error(ctx, "error writing response", "error", err) 418 } 419 }) 420 421 router.GET("/chat/:uri", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 422 uri := p.ByName("uri") 423 if uri == "" { 424 errors.WriteHTTPBadRequest(w, "uri required", nil) 425 return 426 } 427 msg, err := a.Model.GetChatMessage(uri) 428 if err != nil { 429 errors.WriteHTTPInternalServerError(w, "unable to get chat posts", err) 430 return 431 } 432 spmsg, err := msg.ToStreamplaceMessageView() 433 if err != nil { 434 errors.WriteHTTPInternalServerError(w, "unable to convert chat message to streamplace message view", err) 435 return 436 } 437 bs, err := json.Marshal(spmsg) 438 if err != nil { 439 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err) 440 return 441 } 442 if _, err := w.Write(bs); err != nil { 443 log.Error(ctx, "error writing response", "error", err) 444 } 445 }) 446 447 router.GET("/oauth-sessions", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 448 sessions, err := a.StatefulDB.ListOAuthSessions() 449 if err != nil { 450 errors.WriteHTTPInternalServerError(w, "unable to get oauth sessions", err) 451 return 452 } 453 bs, err := json.Marshal(sessions) 454 if err != nil { 455 errors.WriteHTTPInternalServerError(w, "unable to marshal oauth sessions", err) 456 return 457 } 458 if _, err := w.Write(bs); err != nil { 459 log.Error(ctx, "error writing response", "error", err) 460 } 461 }) 462 463 router.POST("/notification-blast", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 464 var payload notificationpkg.NotificationBlast 465 if err := json.NewDecoder(r.Body).Decode(&payload); err != nil { 466 errors.WriteHTTPBadRequest(w, "invalid request body", err) 467 return 468 } 469 notifications, err := a.StatefulDB.ListNotifications() 470 if err != nil { 471 errors.WriteHTTPInternalServerError(w, "unable to get notifications", err) 472 return 473 } 474 if a.FirebaseNotifier == nil { 475 errors.WriteHTTPInternalServerError(w, "firebase notifier not initialized", nil) 476 return 477 } 478 tokens := []string{} 479 for _, not := range notifications { 480 tokens = append(tokens, not.Token) 481 } 482 err = a.FirebaseNotifier.Blast(ctx, tokens, &payload) 483 if err != nil { 484 errors.WriteHTTPInternalServerError(w, "unable to blast notifications", err) 485 return 486 } 487 w.WriteHeader(http.StatusNoContent) 488 }) 489 490 router.PUT("/settings/:id", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 491 w.Header().Set("Access-Control-Allow-Origin", "*") 492 w.Header().Set("Access-Control-Allow-Methods", "PUT") 493 w.Header().Set("Access-Control-Allow-Headers", "Content-Type") 494 495 id := p.ByName("id") 496 if id == "" { 497 errors.WriteHTTPBadRequest(w, "id required", nil) 498 return 499 } 500 501 var ident model.Identity 502 if err := json.NewDecoder(r.Body).Decode(&ident); err != nil { 503 errors.WriteHTTPBadRequest(w, "invalid request body", err) 504 return 505 } 506 ident.ID = id 507 508 if err := a.Model.UpdateIdentity(&ident); err != nil { 509 errors.WriteHTTPInternalServerError(w, "unable to update settings", err) 510 return 511 } 512 513 w.WriteHeader(http.StatusNoContent) 514 }) 515 516 router.POST("/replay/:streamKey", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 517 key := p.ByName("streamKey") 518 if key == "" { 519 errors.WriteHTTPBadRequest(w, "streamKey required", nil) 520 return 521 } 522 mediaSigner, err := a.MakeMediaSigner(ctx, key) 523 if err != nil { 524 errors.WriteHTTPUnauthorized(w, "invalid authorization key", err) 525 return 526 } 527 pc, err := rtcrec.NewReplayPeerConnection(ctx, r.Body) 528 if err != nil { 529 errors.WriteHTTPInternalServerError(w, "unable to create replay peer connection", err) 530 return 531 } 532 answer, err := a.MediaManager.WebRTCIngest(ctx, &webrtc.SessionDescription{SDP: "placeholder"}, mediaSigner, pc, make(chan struct{})) 533 if err != nil { 534 errors.WriteHTTPInternalServerError(w, "unable to ingest web rtc", err) 535 return 536 } 537 w.WriteHeader(200) 538 if _, err := w.Write([]byte(answer.SDP)); err != nil { 539 errors.WriteHTTPInternalServerError(w, "unable to write response", err) 540 log.Error(ctx, "error writing response", "error", err) 541 } 542 }) 543 544 router.GET("/clip/:did/clip.mp4", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 545 did := p.ByName("did") 546 if did == "" { 547 errors.WriteHTTPBadRequest(w, "did required", nil) 548 return 549 } 550 user, err := a.NormalizeUser(ctx, did) 551 if err != nil { 552 errors.WriteHTTPBadRequest(w, "invalid user", err) 553 return 554 } 555 secsStr := r.URL.Query().Get("secs") 556 secs := 60 // Default to 60 seconds 557 if secsStr != "" { 558 parsedSecs, err := strconv.Atoi(secsStr) 559 if err != nil { 560 errors.WriteHTTPBadRequest(w, "invalid secs parameter", err) 561 return 562 } 563 secs = parsedSecs 564 } 565 after := time.Now().Add(-time.Duration(secs) * time.Second) 566 w.Header().Set("Content-Type", "video/mp4") 567 err = media.ClipUser(ctx, a.Model, a.CLI, user, w, nil, &after) 568 if err != nil { 569 errors.WriteHTTPInternalServerError(w, "unable to clip user", err) 570 return 571 } 572 }) 573 574 handler := sloghttp.Recovery(router) 575 if log.Level(4) { 576 handler = sloghttp.New(slog.Default())(handler) 577 } 578 return handler, nil 579} 580 581func (a *StreamplaceAPI) keyToUser(ctx context.Context, key string) (string, error) { 582 payload, err := base64.URLEncoding.DecodeString(key) 583 if err != nil { 584 return "", err 585 } 586 signed, err := a.Signer.Verify(payload) 587 if err != nil { 588 return "", err 589 } 590 _, ok := signed.Data().(*v0.StreamKey) 591 if !ok { 592 return "", fmt.Errorf("got signed data but it wasn't a stream key") 593 } 594 return strings.ToLower(signed.Signer()), nil 595}