Live video on the AT Protocol
at v0.9.9 584 lines 18 kB view raw
1package api 2 3import ( 4 "context" 5 "encoding/base64" 6 "encoding/json" 7 "fmt" 8 "io" 9 "log/slog" 10 "net/http" 11 "net/http/pprof" 12 "os" 13 "path/filepath" 14 "regexp" 15 "runtime" 16 rtpprof "runtime/pprof" 17 "strconv" 18 "strings" 19 "time" 20 21 "github.com/juju/ratelimit" 22 "github.com/julienschmidt/httprouter" 23 "github.com/pion/webrtc/v4" 24 "github.com/prometheus/client_golang/prometheus/promhttp" 25 sloghttp "github.com/samber/slog-http" 26 "stream.place/streamplace/pkg/errors" 27 "stream.place/streamplace/pkg/log" 28 "stream.place/streamplace/pkg/media" 29 "stream.place/streamplace/pkg/mist/mistconfig" 30 "stream.place/streamplace/pkg/mist/misttriggers" 31 "stream.place/streamplace/pkg/model" 32 notificationpkg "stream.place/streamplace/pkg/notifications" 33 "stream.place/streamplace/pkg/rtcrec" 34 v0 "stream.place/streamplace/pkg/schema/v0" 35) 36 37func (a *StreamplaceAPI) ServeInternalHTTP(ctx context.Context) error { 38 handler, err := a.InternalHandler(ctx) 39 if err != nil { 40 return err 41 } 42 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error { 43 s.Addr = a.CLI.HTTPInternalAddr 44 log.Log(ctx, "http server starting", "addr", s.Addr) 45 return s.ListenAndServe() 46 }) 47} 48 49// lightweight way to authenticate push requests to ourself 50var mkvRE *regexp.Regexp 51 52func init() { 53 mkvRE = regexp.MustCompile(`^\d+\.mkv$`) 54} 55 56func (a *StreamplaceAPI) InternalHandler(ctx context.Context) (http.Handler, error) { 57 router := httprouter.New() 58 broker := misttriggers.NewTriggerBroker() 59 60 broker.OnPushRewrite(func(ctx context.Context, payload *misttriggers.PushRewritePayload) (string, error) { 61 log.Log(ctx, "got push out start", "streamName", payload.StreamName, "url", payload.URL.String()) 62 // Extract the last part of the URL path 63 urlPath := payload.URL.Path 64 parts := strings.Split(urlPath, "/") 65 lastPart := "" 66 if len(parts) > 0 { 67 lastPart = parts[len(parts)-1] 68 } 69 mediaSigner, err := a.MakeMediaSigner(ctx, lastPart) 70 if err != nil { 71 return "", err 72 } 73 74 ms := time.Now().UnixMilli() 75 out := fmt.Sprintf("%s+%s_%d", mistconfig.StreamName, mediaSigner.Streamer(), ms) 76 a.SignerCacheMu.Lock() 77 a.SignerCache[mediaSigner.Streamer()] = mediaSigner 78 a.SignerCacheMu.Unlock() 79 log.Log(ctx, "added key to cache", "mist-stream", out, "streamer", mediaSigner.Streamer()) 80 81 return out, nil 82 }) 83 triggerCollection := misttriggers.NewMistCallbackHandlersCollection(a.CLI, broker) 84 router.POST("/mist-trigger", triggerCollection.Trigger()) 85 router.HandlerFunc("GET", "/healthz", a.HandleHealthz(ctx)) 86 87 // Add pprof handlers 88 router.HandlerFunc("GET", "/debug/pprof/", pprof.Index) 89 router.HandlerFunc("GET", "/debug/pprof/cmdline", pprof.Cmdline) 90 router.HandlerFunc("GET", "/debug/pprof/profile", pprof.Profile) 91 router.HandlerFunc("GET", "/debug/pprof/symbol", pprof.Symbol) 92 router.HandlerFunc("GET", "/debug/pprof/trace", pprof.Trace) 93 router.Handler("GET", "/debug/pprof/goroutine", pprof.Handler("goroutine")) 94 router.Handler("GET", "/debug/pprof/heap", pprof.Handler("heap")) 95 router.Handler("GET", "/debug/pprof/threadcreate", pprof.Handler("threadcreate")) 96 router.Handler("GET", "/debug/pprof/block", pprof.Handler("block")) 97 router.Handler("GET", "/debug/pprof/allocs", pprof.Handler("allocs")) 98 router.Handler("GET", "/debug/pprof/mutex", pprof.Handler("mutex")) 99 100 router.POST("/gc", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 101 runtime.GC() 102 w.WriteHeader(204) 103 }) 104 105 router.Handler("GET", "/metrics", promhttp.Handler()) 106 107 router.GET("/playback/:user/:rendition/concat", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 108 user := p.ByName("user") 109 if user == "" { 110 errors.WriteHTTPBadRequest(w, "user required", nil) 111 return 112 } 113 rendition := p.ByName("rendition") 114 if rendition == "" { 115 errors.WriteHTTPBadRequest(w, "rendition required", nil) 116 return 117 } 118 user, err := a.NormalizeUser(ctx, user) 119 if err != nil { 120 errors.WriteHTTPBadRequest(w, "invalid user", err) 121 return 122 } 123 w.Header().Set("content-type", "text/plain") 124 fmt.Fprintf(w, "ffconcat version 1.0\n") 125 // intermittent reports that you need two here to make things work properly? shouldn't matter. 126 for i := 0; i < 2; i += 1 { 127 fmt.Fprintf(w, "file '%s/playback/%s/%s/latest.mp4'\n", a.CLI.OwnInternalURL(), user, rendition) 128 } 129 }) 130 131 router.GET("/playback/:user/:rendition/latest.mp4", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 132 user := p.ByName("user") 133 if user == "" { 134 errors.WriteHTTPBadRequest(w, "user required", nil) 135 return 136 } 137 user, err := a.NormalizeUser(ctx, user) 138 if err != nil { 139 errors.WriteHTTPBadRequest(w, "invalid user", err) 140 return 141 } 142 rendition := p.ByName("rendition") 143 if rendition == "" { 144 errors.WriteHTTPBadRequest(w, "rendition required", nil) 145 return 146 } 147 segChan := a.Bus.SubscribeSegment(ctx, user, rendition) 148 defer a.Bus.UnsubscribeSegment(ctx, user, rendition, segChan) 149 seg := <-segChan.C 150 base := filepath.Base(seg.Filepath) 151 w.Header().Set("Location", fmt.Sprintf("%s/playback/%s/%s/segment/%s\n", a.CLI.OwnInternalURL(), user, rendition, base)) 152 w.WriteHeader(301) 153 }) 154 155 router.GET("/playback/:user/:rendition/segment/:file", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 156 user := p.ByName("user") 157 if user == "" { 158 errors.WriteHTTPBadRequest(w, "user required", nil) 159 return 160 } 161 user, err := a.NormalizeUser(ctx, user) 162 if err != nil { 163 errors.WriteHTTPBadRequest(w, "invalid user", err) 164 return 165 } 166 file := p.ByName("file") 167 if file == "" { 168 errors.WriteHTTPBadRequest(w, "file required", nil) 169 return 170 } 171 fullpath, err := a.CLI.SegmentFilePath(user, file) 172 if err != nil { 173 errors.WriteHTTPBadRequest(w, "badly formatted request", err) 174 return 175 } 176 http.ServeFile(w, r, fullpath) 177 }) 178 179 router.HEAD("/playback/:user/:rendition/stream.mkv", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 180 user := p.ByName("user") 181 if user == "" { 182 errors.WriteHTTPBadRequest(w, "user required", nil) 183 return 184 } 185 w.Header().Set("Content-Type", "video/x-matroska") 186 w.Header().Set("Transfer-Encoding", "chunked") 187 w.WriteHeader(200) 188 }) 189 190 router.POST("/http-pipe/:uuid", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 191 uu := p.ByName("uuid") 192 if uu == "" { 193 errors.WriteHTTPBadRequest(w, "uuid required", nil) 194 return 195 } 196 pr := a.MediaManager.GetHTTPPipeWriter(uu) 197 if pr == nil { 198 errors.WriteHTTPNotFound(w, "http-pipe not found", nil) 199 return 200 } 201 if _, err := io.Copy(pr, r.Body); err != nil { 202 errors.WriteHTTPInternalServerError(w, "failed to copy response", nil) 203 } 204 }) 205 206 // self-destruct code, useful for dumping goroutines on windows 207 router.POST("/abort", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 208 if err := rtpprof.Lookup("goroutine").WriteTo(os.Stderr, 2); err != nil { 209 log.Log(ctx, "error writing rtpprof", "error", err) 210 } 211 log.Log(ctx, "got POST /abort, self-destructing") 212 os.Exit(1) 213 }) 214 215 handleIncomingStream := func(w http.ResponseWriter, httpReq *http.Request, p httprouter.Params) { 216 var r io.Reader = httpReq.Body 217 key := p.ByName("key") 218 limitStr := httpReq.URL.Query().Get("ratelimit") 219 if limitStr != "" { 220 limit, err := strconv.Atoi(limitStr) 221 if err != nil { 222 errors.WriteHTTPBadRequest(w, "invalid ratelimit", err) 223 return 224 } 225 bucket := ratelimit.NewBucketWithRate(float64(limit), int64(limit)) // 2 Mbps 226 r = ratelimit.Reader(r, bucket) 227 } 228 log.Log(ctx, "stream start") 229 230 var mediaSigner media.MediaSigner 231 var ok bool 232 var err error 233 parts := strings.Split(key, "_") 234 235 if len(parts) == 2 { 236 a.SignerCacheMu.Lock() 237 mediaSigner, ok = a.SignerCache[parts[0]] 238 a.SignerCacheMu.Unlock() 239 if !ok { 240 log.Error(ctx, "couldn't find key in cache", "part", parts[0], "key", key) 241 errors.WriteHTTPUnauthorized(w, "invalid authorization key", nil) 242 return 243 } 244 } else { 245 mediaSigner, err = a.MakeMediaSigner(ctx, key) 246 if err != nil { 247 errors.WriteHTTPUnauthorized(w, "invalid authorization key", err) 248 return 249 } 250 } 251 252 ctx = log.WithLogValues(ctx, "streamer", mediaSigner.Streamer()) 253 254 err = a.checkBanned(ctx, mediaSigner.Streamer()) 255 if err != nil { 256 errors.WriteHTTPUnauthorized(w, err.Error(), err) 257 return 258 } 259 260 err = a.MediaManager.MKVIngest(context.Background(), r, mediaSigner) 261 262 if err != nil { 263 log.Log(ctx, "stream error", "error", err) 264 errors.WriteHTTPInternalServerError(w, "stream error", err) 265 return 266 } 267 log.Log(ctx, "stream success", "url", httpReq.URL.String()) 268 } 269 270 // route to accept an incoming mkv stream from OBS, segment it, and push the segments back to this HTTP handler 271 router.POST("/live/:key", handleIncomingStream) 272 router.PUT("/live/:key", handleIncomingStream) 273 274 router.GET("/player-report/:id", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 275 id := p.ByName("id") 276 if id == "" { 277 errors.WriteHTTPBadRequest(w, "id required", nil) 278 return 279 } 280 events, err := a.Model.PlayerReport(id) 281 if err != nil { 282 errors.WriteHTTPBadRequest(w, err.Error(), err) 283 return 284 } 285 bs, err := json.Marshal(events) 286 if err != nil { 287 errors.WriteHTTPInternalServerError(w, "unable to marhsal json", err) 288 return 289 } 290 if _, err := w.Write(bs); err != nil { 291 log.Error(ctx, "error writing response", "error", err) 292 } 293 }) 294 295 router.GET("/segment/:id", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 296 id := p.ByName("id") 297 if id == "" { 298 errors.WriteHTTPBadRequest(w, "id required", nil) 299 return 300 } 301 segment, err := a.LocalDB.GetSegment(id) 302 if err != nil { 303 errors.WriteHTTPBadRequest(w, err.Error(), err) 304 return 305 } 306 if segment == nil { 307 errors.WriteHTTPNotFound(w, "segment not found", nil) 308 return 309 } 310 spSeg, err := segment.ToStreamplaceSegment() 311 if err != nil { 312 errors.WriteHTTPInternalServerError(w, "unable to convert segment to streamplace segment", err) 313 return 314 } 315 bs, err := json.Marshal(spSeg) 316 if err != nil { 317 errors.WriteHTTPInternalServerError(w, "unable to marhsal json", err) 318 return 319 } 320 if _, err := w.Write(bs); err != nil { 321 log.Error(ctx, "error writing response", "error", err) 322 } 323 }) 324 325 router.DELETE("/player-events", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 326 err := a.Model.ClearPlayerEvents() 327 if err != nil { 328 errors.WriteHTTPInternalServerError(w, "unable to delete player events", err) 329 return 330 } 331 w.WriteHeader(204) 332 }) 333 334 router.GET("/followers/:user", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 335 user := p.ByName("user") 336 if user == "" { 337 errors.WriteHTTPBadRequest(w, "user required", nil) 338 return 339 } 340 341 followers, err := a.Model.GetUserFollowers(ctx, user) 342 if err != nil { 343 errors.WriteHTTPInternalServerError(w, "unable to get followers", err) 344 return 345 } 346 bs, err := json.Marshal(followers) 347 if err != nil { 348 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err) 349 return 350 } 351 if _, err := w.Write(bs); err != nil { 352 log.Error(ctx, "error writing response", "error", err) 353 } 354 }) 355 356 router.GET("/following/:user", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 357 user := p.ByName("user") 358 if user == "" { 359 errors.WriteHTTPBadRequest(w, "user required", nil) 360 return 361 } 362 363 followers, err := a.Model.GetUserFollowing(ctx, user) 364 if err != nil { 365 errors.WriteHTTPInternalServerError(w, "unable to get followers", err) 366 return 367 } 368 bs, err := json.Marshal(followers) 369 if err != nil { 370 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err) 371 return 372 } 373 if _, err := w.Write(bs); err != nil { 374 log.Error(ctx, "error writing response", "error", err) 375 } 376 }) 377 378 router.GET("/notifications", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 379 notifications, err := a.StatefulDB.ListNotifications() 380 if err != nil { 381 errors.WriteHTTPInternalServerError(w, "unable to get notifications", err) 382 return 383 } 384 bs, err := json.Marshal(notifications) 385 if err != nil { 386 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err) 387 return 388 } 389 if _, err := w.Write(bs); err != nil { 390 log.Error(ctx, "error writing response", "error", err) 391 } 392 }) 393 394 router.GET("/chat-posts", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 395 posts, err := a.Model.ListFeedPosts() 396 if err != nil { 397 errors.WriteHTTPInternalServerError(w, "unable to get chat posts", err) 398 return 399 } 400 bs, err := json.Marshal(posts) 401 if err != nil { 402 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err) 403 return 404 } 405 if _, err := w.Write(bs); err != nil { 406 log.Error(ctx, "error writing response", "error", err) 407 } 408 }) 409 410 router.GET("/chat/:uri", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 411 uri := p.ByName("uri") 412 if uri == "" { 413 errors.WriteHTTPBadRequest(w, "uri required", nil) 414 return 415 } 416 msg, err := a.Model.GetChatMessage(uri) 417 if err != nil { 418 errors.WriteHTTPInternalServerError(w, "unable to get chat posts", err) 419 return 420 } 421 spmsg, err := msg.ToStreamplaceMessageView() 422 if err != nil { 423 errors.WriteHTTPInternalServerError(w, "unable to convert chat message to streamplace message view", err) 424 return 425 } 426 bs, err := json.Marshal(spmsg) 427 if err != nil { 428 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err) 429 return 430 } 431 if _, err := w.Write(bs); err != nil { 432 log.Error(ctx, "error writing response", "error", err) 433 } 434 }) 435 436 router.GET("/oauth-sessions", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 437 sessions, err := a.StatefulDB.ListOAuthSessions() 438 if err != nil { 439 errors.WriteHTTPInternalServerError(w, "unable to get oauth sessions", err) 440 return 441 } 442 bs, err := json.Marshal(sessions) 443 if err != nil { 444 errors.WriteHTTPInternalServerError(w, "unable to marshal oauth sessions", err) 445 return 446 } 447 if _, err := w.Write(bs); err != nil { 448 log.Error(ctx, "error writing response", "error", err) 449 } 450 }) 451 452 router.POST("/notification-blast", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 453 var payload notificationpkg.NotificationBlast 454 if err := json.NewDecoder(r.Body).Decode(&payload); err != nil { 455 errors.WriteHTTPBadRequest(w, "invalid request body", err) 456 return 457 } 458 notifications, err := a.StatefulDB.ListNotifications() 459 if err != nil { 460 errors.WriteHTTPInternalServerError(w, "unable to get notifications", err) 461 return 462 } 463 if a.FirebaseNotifier == nil { 464 errors.WriteHTTPInternalServerError(w, "firebase notifier not initialized", nil) 465 return 466 } 467 tokens := []string{} 468 for _, not := range notifications { 469 tokens = append(tokens, not.Token) 470 } 471 err = a.FirebaseNotifier.Blast(ctx, tokens, &payload) 472 if err != nil { 473 errors.WriteHTTPInternalServerError(w, "unable to blast notifications", err) 474 return 475 } 476 w.WriteHeader(http.StatusNoContent) 477 }) 478 479 router.PUT("/settings/:id", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 480 w.Header().Set("Access-Control-Allow-Origin", "*") 481 w.Header().Set("Access-Control-Allow-Methods", "PUT") 482 w.Header().Set("Access-Control-Allow-Headers", "Content-Type") 483 484 id := p.ByName("id") 485 if id == "" { 486 errors.WriteHTTPBadRequest(w, "id required", nil) 487 return 488 } 489 490 var ident model.Identity 491 if err := json.NewDecoder(r.Body).Decode(&ident); err != nil { 492 errors.WriteHTTPBadRequest(w, "invalid request body", err) 493 return 494 } 495 ident.ID = id 496 497 if err := a.Model.UpdateIdentity(&ident); err != nil { 498 errors.WriteHTTPInternalServerError(w, "unable to update settings", err) 499 return 500 } 501 502 w.WriteHeader(http.StatusNoContent) 503 }) 504 505 router.POST("/replay/:streamKey", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 506 key := p.ByName("streamKey") 507 if key == "" { 508 errors.WriteHTTPBadRequest(w, "streamKey required", nil) 509 return 510 } 511 mediaSigner, err := a.MakeMediaSigner(ctx, key) 512 if err != nil { 513 errors.WriteHTTPUnauthorized(w, "invalid authorization key", err) 514 return 515 } 516 pc, err := rtcrec.NewReplayPeerConnection(ctx, r.Body) 517 if err != nil { 518 errors.WriteHTTPInternalServerError(w, "unable to create replay peer connection", err) 519 return 520 } 521 answer, err := a.MediaManager.WebRTCIngest(ctx, &webrtc.SessionDescription{SDP: "placeholder"}, mediaSigner, pc, make(chan error, 1)) 522 if err != nil { 523 errors.WriteHTTPInternalServerError(w, "unable to ingest web rtc", err) 524 return 525 } 526 w.WriteHeader(200) 527 if _, err := w.Write([]byte(answer.SDP)); err != nil { 528 errors.WriteHTTPInternalServerError(w, "unable to write response", err) 529 log.Error(ctx, "error writing response", "error", err) 530 } 531 }) 532 533 router.GET("/clip/:did/clip.mp4", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 534 did := p.ByName("did") 535 if did == "" { 536 errors.WriteHTTPBadRequest(w, "did required", nil) 537 return 538 } 539 user, err := a.NormalizeUser(ctx, did) 540 if err != nil { 541 errors.WriteHTTPBadRequest(w, "invalid user", err) 542 return 543 } 544 secsStr := r.URL.Query().Get("secs") 545 secs := 60 // Default to 60 seconds 546 if secsStr != "" { 547 parsedSecs, err := strconv.Atoi(secsStr) 548 if err != nil { 549 errors.WriteHTTPBadRequest(w, "invalid secs parameter", err) 550 return 551 } 552 secs = parsedSecs 553 } 554 after := time.Now().Add(-time.Duration(secs) * time.Second) 555 w.Header().Set("Content-Type", "video/mp4") 556 err = media.ClipUser(ctx, a.LocalDB, a.CLI, user, w, nil, &after) 557 if err != nil { 558 errors.WriteHTTPInternalServerError(w, "unable to clip user", err) 559 return 560 } 561 }) 562 563 handler := sloghttp.Recovery(router) 564 if log.Level(4) { 565 handler = sloghttp.New(slog.Default())(handler) 566 } 567 return handler, nil 568} 569 570func (a *StreamplaceAPI) keyToUser(ctx context.Context, key string) (string, error) { 571 payload, err := base64.URLEncoding.DecodeString(key) 572 if err != nil { 573 return "", err 574 } 575 signed, err := a.Signer.Verify(payload) 576 if err != nil { 577 return "", err 578 } 579 _, ok := signed.Data().(*v0.StreamKey) 580 if !ok { 581 return "", fmt.Errorf("got signed data but it wasn't a stream key") 582 } 583 return strings.ToLower(signed.Signer()), nil 584}