Live video on the AT Protocol
at eli/determinism-isk 572 lines 18 kB view raw
1package api 2 3import ( 4 "context" 5 "encoding/base64" 6 "encoding/json" 7 "fmt" 8 "io" 9 "log/slog" 10 "net/http" 11 "net/http/pprof" 12 "os" 13 "path/filepath" 14 "regexp" 15 "runtime" 16 rtpprof "runtime/pprof" 17 "strconv" 18 "strings" 19 "time" 20 21 "github.com/julienschmidt/httprouter" 22 "github.com/pion/webrtc/v4" 23 "github.com/prometheus/client_golang/prometheus/promhttp" 24 sloghttp "github.com/samber/slog-http" 25 "stream.place/streamplace/pkg/errors" 26 "stream.place/streamplace/pkg/log" 27 "stream.place/streamplace/pkg/media" 28 "stream.place/streamplace/pkg/mist/mistconfig" 29 "stream.place/streamplace/pkg/mist/misttriggers" 30 "stream.place/streamplace/pkg/model" 31 notificationpkg "stream.place/streamplace/pkg/notifications" 32 "stream.place/streamplace/pkg/rtcrec" 33 v0 "stream.place/streamplace/pkg/schema/v0" 34) 35 36func (a *StreamplaceAPI) ServeInternalHTTP(ctx context.Context) error { 37 handler, err := a.InternalHandler(ctx) 38 if err != nil { 39 return err 40 } 41 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error { 42 s.Addr = a.CLI.HTTPInternalAddr 43 log.Log(ctx, "http server starting", "addr", s.Addr) 44 return s.ListenAndServe() 45 }) 46} 47 48// lightweight way to authenticate push requests to ourself 49var mkvRE *regexp.Regexp 50 51func init() { 52 mkvRE = regexp.MustCompile(`^\d+\.mkv$`) 53} 54 55func (a *StreamplaceAPI) InternalHandler(ctx context.Context) (http.Handler, error) { 56 router := httprouter.New() 57 broker := misttriggers.NewTriggerBroker() 58 59 broker.OnPushRewrite(func(ctx context.Context, payload *misttriggers.PushRewritePayload) (string, error) { 60 log.Log(ctx, "got push out start", "streamName", payload.StreamName, "url", payload.URL.String()) 61 // Extract the last part of the URL path 62 urlPath := payload.URL.Path 63 parts := strings.Split(urlPath, "/") 64 lastPart := "" 65 if len(parts) > 0 { 66 lastPart = parts[len(parts)-1] 67 } 68 mediaSigner, err := a.MakeMediaSigner(ctx, lastPart) 69 if err != nil { 70 return "", err 71 } 72 73 ms := time.Now().UnixMilli() 74 out := fmt.Sprintf("%s+%s_%d", mistconfig.StreamName, mediaSigner.Streamer(), ms) 75 a.SignerCacheMu.Lock() 76 a.SignerCache[mediaSigner.Streamer()] = mediaSigner 77 a.SignerCacheMu.Unlock() 78 log.Log(ctx, "added key to cache", "mist-stream", out, "streamer", mediaSigner.Streamer()) 79 80 return out, nil 81 }) 82 triggerCollection := misttriggers.NewMistCallbackHandlersCollection(a.CLI, broker) 83 router.POST("/mist-trigger", triggerCollection.Trigger()) 84 router.HandlerFunc("GET", "/healthz", a.HandleHealthz(ctx)) 85 86 // Add pprof handlers 87 router.HandlerFunc("GET", "/debug/pprof/", pprof.Index) 88 router.HandlerFunc("GET", "/debug/pprof/cmdline", pprof.Cmdline) 89 router.HandlerFunc("GET", "/debug/pprof/profile", pprof.Profile) 90 router.HandlerFunc("GET", "/debug/pprof/symbol", pprof.Symbol) 91 router.HandlerFunc("GET", "/debug/pprof/trace", pprof.Trace) 92 router.Handler("GET", "/debug/pprof/goroutine", pprof.Handler("goroutine")) 93 router.Handler("GET", "/debug/pprof/heap", pprof.Handler("heap")) 94 router.Handler("GET", "/debug/pprof/threadcreate", pprof.Handler("threadcreate")) 95 router.Handler("GET", "/debug/pprof/block", pprof.Handler("block")) 96 router.Handler("GET", "/debug/pprof/allocs", pprof.Handler("allocs")) 97 router.Handler("GET", "/debug/pprof/mutex", pprof.Handler("mutex")) 98 99 router.POST("/gc", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 100 runtime.GC() 101 w.WriteHeader(204) 102 }) 103 104 router.Handler("GET", "/metrics", promhttp.Handler()) 105 106 router.GET("/playback/:user/:rendition/concat", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 107 user := p.ByName("user") 108 if user == "" { 109 errors.WriteHTTPBadRequest(w, "user required", nil) 110 return 111 } 112 rendition := p.ByName("rendition") 113 if rendition == "" { 114 errors.WriteHTTPBadRequest(w, "rendition required", nil) 115 return 116 } 117 user, err := a.NormalizeUser(ctx, user) 118 if err != nil { 119 errors.WriteHTTPBadRequest(w, "invalid user", err) 120 return 121 } 122 w.Header().Set("content-type", "text/plain") 123 fmt.Fprintf(w, "ffconcat version 1.0\n") 124 // intermittent reports that you need two here to make things work properly? shouldn't matter. 125 for i := 0; i < 2; i += 1 { 126 fmt.Fprintf(w, "file '%s/playback/%s/%s/latest.mp4'\n", a.CLI.OwnInternalURL(), user, rendition) 127 } 128 }) 129 130 router.GET("/playback/:user/:rendition/latest.mp4", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 131 user := p.ByName("user") 132 if user == "" { 133 errors.WriteHTTPBadRequest(w, "user required", nil) 134 return 135 } 136 user, err := a.NormalizeUser(ctx, user) 137 if err != nil { 138 errors.WriteHTTPBadRequest(w, "invalid user", err) 139 return 140 } 141 rendition := p.ByName("rendition") 142 if rendition == "" { 143 errors.WriteHTTPBadRequest(w, "rendition required", nil) 144 return 145 } 146 segChan := a.Bus.SubscribeSegment(ctx, user, rendition) 147 defer a.Bus.UnsubscribeSegment(ctx, user, rendition, segChan) 148 seg := <-segChan.C 149 base := filepath.Base(seg.Filepath) 150 w.Header().Set("Location", fmt.Sprintf("%s/playback/%s/%s/segment/%s\n", a.CLI.OwnInternalURL(), user, rendition, base)) 151 w.WriteHeader(301) 152 }) 153 154 router.GET("/playback/:user/:rendition/segment/:file", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 155 user := p.ByName("user") 156 if user == "" { 157 errors.WriteHTTPBadRequest(w, "user required", nil) 158 return 159 } 160 user, err := a.NormalizeUser(ctx, user) 161 if err != nil { 162 errors.WriteHTTPBadRequest(w, "invalid user", err) 163 return 164 } 165 file := p.ByName("file") 166 if file == "" { 167 errors.WriteHTTPBadRequest(w, "file required", nil) 168 return 169 } 170 fullpath, err := a.CLI.SegmentFilePath(user, file) 171 if err != nil { 172 errors.WriteHTTPBadRequest(w, "badly formatted request", err) 173 return 174 } 175 http.ServeFile(w, r, fullpath) 176 }) 177 178 router.HEAD("/playback/:user/:rendition/stream.mkv", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 179 user := p.ByName("user") 180 if user == "" { 181 errors.WriteHTTPBadRequest(w, "user required", nil) 182 return 183 } 184 w.Header().Set("Content-Type", "video/x-matroska") 185 w.Header().Set("Transfer-Encoding", "chunked") 186 w.WriteHeader(200) 187 }) 188 189 router.POST("/http-pipe/:uuid", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 190 uu := p.ByName("uuid") 191 if uu == "" { 192 errors.WriteHTTPBadRequest(w, "uuid required", nil) 193 return 194 } 195 pr := a.MediaManager.GetHTTPPipeWriter(uu) 196 if pr == nil { 197 errors.WriteHTTPNotFound(w, "http-pipe not found", nil) 198 return 199 } 200 if _, err := io.Copy(pr, r.Body); err != nil { 201 errors.WriteHTTPInternalServerError(w, "failed to copy response", nil) 202 } 203 }) 204 205 // self-destruct code, useful for dumping goroutines on windows 206 router.POST("/abort", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 207 if err := rtpprof.Lookup("goroutine").WriteTo(os.Stderr, 2); err != nil { 208 log.Log(ctx, "error writing rtpprof", "error", err) 209 } 210 log.Log(ctx, "got POST /abort, self-destructing") 211 os.Exit(1) 212 }) 213 214 handleIncomingStream := func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 215 key := p.ByName("key") 216 log.Log(ctx, "stream start") 217 218 var mediaSigner media.MediaSigner 219 var ok bool 220 var err error 221 parts := strings.Split(key, "_") 222 223 if len(parts) == 2 { 224 a.SignerCacheMu.Lock() 225 mediaSigner, ok = a.SignerCache[parts[0]] 226 a.SignerCacheMu.Unlock() 227 if !ok { 228 log.Error(ctx, "couldn't find key in cache", "part", parts[0], "key", key) 229 errors.WriteHTTPUnauthorized(w, "invalid authorization key", nil) 230 return 231 } 232 } else { 233 mediaSigner, err = a.MakeMediaSigner(ctx, key) 234 if err != nil { 235 errors.WriteHTTPUnauthorized(w, "invalid authorization key", err) 236 return 237 } 238 } 239 240 ctx = log.WithLogValues(ctx, "streamer", mediaSigner.Streamer()) 241 242 err = a.checkBanned(ctx, mediaSigner.Streamer()) 243 if err != nil { 244 errors.WriteHTTPUnauthorized(w, err.Error(), err) 245 return 246 } 247 248 err = a.MediaManager.MKVIngest(ctx, r.Body, mediaSigner) 249 250 if err != nil { 251 log.Log(ctx, "stream error", "error", err) 252 errors.WriteHTTPInternalServerError(w, "stream error", err) 253 return 254 } 255 log.Log(ctx, "stream success", "url", r.URL.String()) 256 } 257 258 // route to accept an incoming mkv stream from OBS, segment it, and push the segments back to this HTTP handler 259 router.POST("/live/:key", handleIncomingStream) 260 router.PUT("/live/:key", handleIncomingStream) 261 262 router.GET("/player-report/:id", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 263 id := p.ByName("id") 264 if id == "" { 265 errors.WriteHTTPBadRequest(w, "id required", nil) 266 return 267 } 268 events, err := a.Model.PlayerReport(id) 269 if err != nil { 270 errors.WriteHTTPBadRequest(w, err.Error(), err) 271 return 272 } 273 bs, err := json.Marshal(events) 274 if err != nil { 275 errors.WriteHTTPInternalServerError(w, "unable to marhsal json", err) 276 return 277 } 278 if _, err := w.Write(bs); err != nil { 279 log.Error(ctx, "error writing response", "error", err) 280 } 281 }) 282 283 router.GET("/segment/:id", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 284 id := p.ByName("id") 285 if id == "" { 286 errors.WriteHTTPBadRequest(w, "id required", nil) 287 return 288 } 289 segment, err := a.Model.GetSegment(id) 290 if err != nil { 291 errors.WriteHTTPBadRequest(w, err.Error(), err) 292 return 293 } 294 if segment == nil { 295 errors.WriteHTTPNotFound(w, "segment not found", nil) 296 return 297 } 298 spSeg, err := segment.ToStreamplaceSegment() 299 if err != nil { 300 errors.WriteHTTPInternalServerError(w, "unable to convert segment to streamplace segment", err) 301 return 302 } 303 bs, err := json.Marshal(spSeg) 304 if err != nil { 305 errors.WriteHTTPInternalServerError(w, "unable to marhsal json", err) 306 return 307 } 308 if _, err := w.Write(bs); err != nil { 309 log.Error(ctx, "error writing response", "error", err) 310 } 311 }) 312 313 router.DELETE("/player-events", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 314 err := a.Model.ClearPlayerEvents() 315 if err != nil { 316 errors.WriteHTTPInternalServerError(w, "unable to delete player events", err) 317 return 318 } 319 w.WriteHeader(204) 320 }) 321 322 router.GET("/followers/:user", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 323 user := p.ByName("user") 324 if user == "" { 325 errors.WriteHTTPBadRequest(w, "user required", nil) 326 return 327 } 328 329 followers, err := a.Model.GetUserFollowers(ctx, user) 330 if err != nil { 331 errors.WriteHTTPInternalServerError(w, "unable to get followers", err) 332 return 333 } 334 bs, err := json.Marshal(followers) 335 if err != nil { 336 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err) 337 return 338 } 339 if _, err := w.Write(bs); err != nil { 340 log.Error(ctx, "error writing response", "error", err) 341 } 342 }) 343 344 router.GET("/following/:user", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 345 user := p.ByName("user") 346 if user == "" { 347 errors.WriteHTTPBadRequest(w, "user required", nil) 348 return 349 } 350 351 followers, err := a.Model.GetUserFollowing(ctx, user) 352 if err != nil { 353 errors.WriteHTTPInternalServerError(w, "unable to get followers", err) 354 return 355 } 356 bs, err := json.Marshal(followers) 357 if err != nil { 358 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err) 359 return 360 } 361 if _, err := w.Write(bs); err != nil { 362 log.Error(ctx, "error writing response", "error", err) 363 } 364 }) 365 366 router.GET("/notifications", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 367 notifications, err := a.StatefulDB.ListNotifications() 368 if err != nil { 369 errors.WriteHTTPInternalServerError(w, "unable to get notifications", err) 370 return 371 } 372 bs, err := json.Marshal(notifications) 373 if err != nil { 374 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err) 375 return 376 } 377 if _, err := w.Write(bs); err != nil { 378 log.Error(ctx, "error writing response", "error", err) 379 } 380 }) 381 382 router.GET("/chat-posts", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 383 posts, err := a.Model.ListFeedPosts() 384 if err != nil { 385 errors.WriteHTTPInternalServerError(w, "unable to get chat posts", err) 386 return 387 } 388 bs, err := json.Marshal(posts) 389 if err != nil { 390 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err) 391 return 392 } 393 if _, err := w.Write(bs); err != nil { 394 log.Error(ctx, "error writing response", "error", err) 395 } 396 }) 397 398 router.GET("/chat/:uri", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 399 uri := p.ByName("uri") 400 if uri == "" { 401 errors.WriteHTTPBadRequest(w, "uri required", nil) 402 return 403 } 404 msg, err := a.Model.GetChatMessage(uri) 405 if err != nil { 406 errors.WriteHTTPInternalServerError(w, "unable to get chat posts", err) 407 return 408 } 409 spmsg, err := msg.ToStreamplaceMessageView() 410 if err != nil { 411 errors.WriteHTTPInternalServerError(w, "unable to convert chat message to streamplace message view", err) 412 return 413 } 414 bs, err := json.Marshal(spmsg) 415 if err != nil { 416 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err) 417 return 418 } 419 if _, err := w.Write(bs); err != nil { 420 log.Error(ctx, "error writing response", "error", err) 421 } 422 }) 423 424 router.GET("/oauth-sessions", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 425 sessions, err := a.StatefulDB.ListOAuthSessions() 426 if err != nil { 427 errors.WriteHTTPInternalServerError(w, "unable to get oauth sessions", err) 428 return 429 } 430 bs, err := json.Marshal(sessions) 431 if err != nil { 432 errors.WriteHTTPInternalServerError(w, "unable to marshal oauth sessions", err) 433 return 434 } 435 if _, err := w.Write(bs); err != nil { 436 log.Error(ctx, "error writing response", "error", err) 437 } 438 }) 439 440 router.POST("/notification-blast", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 441 var payload notificationpkg.NotificationBlast 442 if err := json.NewDecoder(r.Body).Decode(&payload); err != nil { 443 errors.WriteHTTPBadRequest(w, "invalid request body", err) 444 return 445 } 446 notifications, err := a.StatefulDB.ListNotifications() 447 if err != nil { 448 errors.WriteHTTPInternalServerError(w, "unable to get notifications", err) 449 return 450 } 451 if a.FirebaseNotifier == nil { 452 errors.WriteHTTPInternalServerError(w, "firebase notifier not initialized", nil) 453 return 454 } 455 tokens := []string{} 456 for _, not := range notifications { 457 tokens = append(tokens, not.Token) 458 } 459 err = a.FirebaseNotifier.Blast(ctx, tokens, &payload) 460 if err != nil { 461 errors.WriteHTTPInternalServerError(w, "unable to blast notifications", err) 462 return 463 } 464 w.WriteHeader(http.StatusNoContent) 465 }) 466 467 router.PUT("/settings/:id", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 468 w.Header().Set("Access-Control-Allow-Origin", "*") 469 w.Header().Set("Access-Control-Allow-Methods", "PUT") 470 w.Header().Set("Access-Control-Allow-Headers", "Content-Type") 471 472 id := p.ByName("id") 473 if id == "" { 474 errors.WriteHTTPBadRequest(w, "id required", nil) 475 return 476 } 477 478 var ident model.Identity 479 if err := json.NewDecoder(r.Body).Decode(&ident); err != nil { 480 errors.WriteHTTPBadRequest(w, "invalid request body", err) 481 return 482 } 483 ident.ID = id 484 485 if err := a.Model.UpdateIdentity(&ident); err != nil { 486 errors.WriteHTTPInternalServerError(w, "unable to update settings", err) 487 return 488 } 489 490 w.WriteHeader(http.StatusNoContent) 491 }) 492 493 router.POST("/replay/:streamKey", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 494 key := p.ByName("streamKey") 495 if key == "" { 496 errors.WriteHTTPBadRequest(w, "streamKey required", nil) 497 return 498 } 499 mediaSigner, err := a.MakeMediaSigner(ctx, key) 500 if err != nil { 501 errors.WriteHTTPUnauthorized(w, "invalid authorization key", err) 502 return 503 } 504 pc, err := rtcrec.NewReplayPeerConnection(ctx, r.Body) 505 if err != nil { 506 errors.WriteHTTPInternalServerError(w, "unable to create replay peer connection", err) 507 return 508 } 509 answer, err := a.MediaManager.WebRTCIngest(ctx, &webrtc.SessionDescription{SDP: "placeholder"}, mediaSigner, pc, make(chan struct{})) 510 if err != nil { 511 errors.WriteHTTPInternalServerError(w, "unable to ingest web rtc", err) 512 return 513 } 514 w.WriteHeader(200) 515 if _, err := w.Write([]byte(answer.SDP)); err != nil { 516 errors.WriteHTTPInternalServerError(w, "unable to write response", err) 517 log.Error(ctx, "error writing response", "error", err) 518 } 519 }) 520 521 router.GET("/clip/:did/clip.mp4", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 522 did := p.ByName("did") 523 if did == "" { 524 errors.WriteHTTPBadRequest(w, "did required", nil) 525 return 526 } 527 user, err := a.NormalizeUser(ctx, did) 528 if err != nil { 529 errors.WriteHTTPBadRequest(w, "invalid user", err) 530 return 531 } 532 secsStr := r.URL.Query().Get("secs") 533 secs := 60 // Default to 60 seconds 534 if secsStr != "" { 535 parsedSecs, err := strconv.Atoi(secsStr) 536 if err != nil { 537 errors.WriteHTTPBadRequest(w, "invalid secs parameter", err) 538 return 539 } 540 secs = parsedSecs 541 } 542 after := time.Now().Add(-time.Duration(secs) * time.Second) 543 w.Header().Set("Content-Type", "video/mp4") 544 err = media.ClipUser(ctx, a.Model, a.CLI, user, w, nil, &after) 545 if err != nil { 546 errors.WriteHTTPInternalServerError(w, "unable to clip user", err) 547 return 548 } 549 }) 550 551 handler := sloghttp.Recovery(router) 552 if log.Level(4) { 553 handler = sloghttp.New(slog.Default())(handler) 554 } 555 return handler, nil 556} 557 558func (a *StreamplaceAPI) keyToUser(ctx context.Context, key string) (string, error) { 559 payload, err := base64.URLEncoding.DecodeString(key) 560 if err != nil { 561 return "", err 562 } 563 signed, err := a.Signer.Verify(payload) 564 if err != nil { 565 return "", err 566 } 567 _, ok := signed.Data().(*v0.StreamKey) 568 if !ok { 569 return "", fmt.Errorf("got signed data but it wasn't a stream key") 570 } 571 return strings.ToLower(signed.Signer()), nil 572}