Live video on the AT Protocol
1package api
2
3import (
4 "bufio"
5 "context"
6 "encoding/base64"
7 "encoding/json"
8 "fmt"
9 "io"
10 "log/slog"
11 "net/http"
12 "net/http/pprof"
13 "os"
14 "path/filepath"
15 "regexp"
16 "runtime"
17 rtpprof "runtime/pprof"
18 "strconv"
19 "strings"
20 "time"
21
22 "github.com/julienschmidt/httprouter"
23 "github.com/prometheus/client_golang/prometheus/promhttp"
24 sloghttp "github.com/samber/slog-http"
25 "golang.org/x/sync/errgroup"
26 "stream.place/streamplace/pkg/errors"
27 "stream.place/streamplace/pkg/log"
28 "stream.place/streamplace/pkg/media"
29 "stream.place/streamplace/pkg/mist/mistconfig"
30 "stream.place/streamplace/pkg/mist/misttriggers"
31 "stream.place/streamplace/pkg/model"
32 notificationpkg "stream.place/streamplace/pkg/notifications"
33 "stream.place/streamplace/pkg/renditions"
34 v0 "stream.place/streamplace/pkg/schema/v0"
35)
36
37func (a *StreamplaceAPI) ServeInternalHTTP(ctx context.Context) error {
38 handler, err := a.InternalHandler(ctx)
39 if err != nil {
40 return err
41 }
42 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error {
43 s.Addr = a.CLI.HTTPInternalAddr
44 log.Log(ctx, "http server starting", "addr", s.Addr)
45 return s.ListenAndServe()
46 })
47}
48
49// lightweight way to authenticate push requests to ourself
50var mkvRE *regexp.Regexp
51
52func init() {
53 mkvRE = regexp.MustCompile(`^\d+\.mkv$`)
54}
55
56func (a *StreamplaceAPI) InternalHandler(ctx context.Context) (http.Handler, error) {
57 router := httprouter.New()
58 broker := misttriggers.NewTriggerBroker()
59
60 broker.OnPushRewrite(func(ctx context.Context, payload *misttriggers.PushRewritePayload) (string, error) {
61 log.Log(ctx, "got push out start", "streamName", payload.StreamName, "url", payload.URL.String())
62 // Extract the last part of the URL path
63 urlPath := payload.URL.Path
64 parts := strings.Split(urlPath, "/")
65 lastPart := ""
66 if len(parts) > 0 {
67 lastPart = parts[len(parts)-1]
68 }
69 mediaSigner, err := a.MakeMediaSigner(ctx, lastPart)
70 if err != nil {
71 return "", err
72 }
73
74 ms := time.Now().UnixMilli()
75 out := fmt.Sprintf("%s+%s_%d", mistconfig.StreamName, mediaSigner.Streamer(), ms)
76 a.SignerCacheMu.Lock()
77 a.SignerCache[mediaSigner.Streamer()] = mediaSigner
78 a.SignerCacheMu.Unlock()
79 log.Log(ctx, "added key to cache", "mist-stream", out, "streamer", mediaSigner.Streamer())
80
81 return out, nil
82 })
83 triggerCollection := misttriggers.NewMistCallbackHandlersCollection(a.CLI, broker)
84 router.POST("/mist-trigger", triggerCollection.Trigger())
85 router.HandlerFunc("GET", "/healthz", a.HandleHealthz(ctx))
86
87 // Add pprof handlers
88 router.HandlerFunc("GET", "/debug/pprof/", pprof.Index)
89 router.HandlerFunc("GET", "/debug/pprof/cmdline", pprof.Cmdline)
90 router.HandlerFunc("GET", "/debug/pprof/profile", pprof.Profile)
91 router.HandlerFunc("GET", "/debug/pprof/symbol", pprof.Symbol)
92 router.HandlerFunc("GET", "/debug/pprof/trace", pprof.Trace)
93 router.Handler("GET", "/debug/pprof/goroutine", pprof.Handler("goroutine"))
94 router.Handler("GET", "/debug/pprof/heap", pprof.Handler("heap"))
95 router.Handler("GET", "/debug/pprof/threadcreate", pprof.Handler("threadcreate"))
96 router.Handler("GET", "/debug/pprof/block", pprof.Handler("block"))
97 router.Handler("GET", "/debug/pprof/allocs", pprof.Handler("allocs"))
98 router.Handler("GET", "/debug/pprof/mutex", pprof.Handler("mutex"))
99
100 router.POST("/gc", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
101 runtime.GC()
102 w.WriteHeader(204)
103 })
104
105 router.Handler("GET", "/metrics", promhttp.Handler())
106
107 router.GET("/playback/:user/:rendition/concat", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
108 user := p.ByName("user")
109 if user == "" {
110 errors.WriteHTTPBadRequest(w, "user required", nil)
111 return
112 }
113 rendition := p.ByName("rendition")
114 if rendition == "" {
115 errors.WriteHTTPBadRequest(w, "rendition required", nil)
116 return
117 }
118 user, err := a.NormalizeUser(ctx, user)
119 if err != nil {
120 errors.WriteHTTPBadRequest(w, "invalid user", err)
121 return
122 }
123 w.Header().Set("content-type", "text/plain")
124 fmt.Fprintf(w, "ffconcat version 1.0\n")
125 // intermittent reports that you need two here to make things work properly? shouldn't matter.
126 for i := 0; i < 2; i += 1 {
127 fmt.Fprintf(w, "file '%s/playback/%s/%s/latest.mp4'\n", a.CLI.OwnInternalURL(), user, rendition)
128 }
129 })
130
131 router.GET("/playback/:user/:rendition/latest.mp4", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
132 user := p.ByName("user")
133 if user == "" {
134 errors.WriteHTTPBadRequest(w, "user required", nil)
135 return
136 }
137 user, err := a.NormalizeUser(ctx, user)
138 if err != nil {
139 errors.WriteHTTPBadRequest(w, "invalid user", err)
140 return
141 }
142 rendition := p.ByName("rendition")
143 if rendition == "" {
144 errors.WriteHTTPBadRequest(w, "rendition required", nil)
145 return
146 }
147 seg := <-a.MediaManager.SubscribeSegment(ctx, user, rendition)
148 base := filepath.Base(seg.Filepath)
149 w.Header().Set("Location", fmt.Sprintf("%s/playback/%s/%s/segment/%s\n", a.CLI.OwnInternalURL(), user, rendition, base))
150 w.WriteHeader(301)
151 })
152
153 router.GET("/playback/:user/:rendition/segment/:file", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
154 user := p.ByName("user")
155 if user == "" {
156 errors.WriteHTTPBadRequest(w, "user required", nil)
157 return
158 }
159 user, err := a.NormalizeUser(ctx, user)
160 if err != nil {
161 errors.WriteHTTPBadRequest(w, "invalid user", err)
162 return
163 }
164 file := p.ByName("file")
165 if file == "" {
166 errors.WriteHTTPBadRequest(w, "file required", nil)
167 return
168 }
169 fullpath, err := a.CLI.SegmentFilePath(user, file)
170 if err != nil {
171 errors.WriteHTTPBadRequest(w, "badly formatted request", err)
172 return
173 }
174 http.ServeFile(w, r, fullpath)
175 })
176
177 router.GET("/playback/:user/:rendition/stream.mp4", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
178 user := p.ByName("user")
179 if user == "" {
180 errors.WriteHTTPBadRequest(w, "user required", nil)
181 return
182 }
183 rendition := p.ByName("rendition")
184 if rendition == "" {
185 errors.WriteHTTPBadRequest(w, "rendition required", nil)
186 return
187 }
188 user, err := a.NormalizeUser(ctx, user)
189 if err != nil {
190 errors.WriteHTTPBadRequest(w, "invalid user", err)
191 return
192 }
193 var delayMS int64 = 1000
194 userDelay := r.URL.Query().Get("delayms")
195 if userDelay != "" {
196 var err error
197 delayMS, err = strconv.ParseInt(userDelay, 10, 64)
198 if err != nil {
199 errors.WriteHTTPBadRequest(w, "error parsing delay", err)
200 return
201 }
202 if delayMS > 10000 {
203 errors.WriteHTTPBadRequest(w, "delay too large, maximum 10000", nil)
204 return
205 }
206 }
207 w.Header().Set("Content-Type", "video/mp4")
208 w.WriteHeader(200)
209 g, ctx := errgroup.WithContext(ctx)
210 pr, pw := io.Pipe()
211 bufw := bufio.NewWriter(pw)
212 g.Go(func() error {
213 return a.MediaManager.SegmentToMP4(ctx, user, rendition, bufw)
214 })
215 g.Go(func() error {
216 time.Sleep(time.Duration(delayMS) * time.Millisecond)
217 _, err := io.Copy(w, pr)
218 return err
219 })
220 if err := g.Wait(); err != nil {
221 errors.WriteHTTPBadRequest(w, "request failed", err)
222 }
223 })
224
225 router.HEAD("/playback/:user/:rendition/stream.mkv", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
226 user := p.ByName("user")
227 if user == "" {
228 errors.WriteHTTPBadRequest(w, "user required", nil)
229 return
230 }
231 w.Header().Set("Content-Type", "video/x-matroska")
232 w.Header().Set("Transfer-Encoding", "chunked")
233 w.WriteHeader(200)
234 })
235
236 router.POST("/http-pipe/:uuid", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
237 uu := p.ByName("uuid")
238 if uu == "" {
239 errors.WriteHTTPBadRequest(w, "uuid required", nil)
240 return
241 }
242 pr := a.MediaManager.GetHTTPPipeWriter(uu)
243 if pr == nil {
244 errors.WriteHTTPNotFound(w, "http-pipe not found", nil)
245 return
246 }
247 if _, err := io.Copy(pr, r.Body); err != nil {
248 errors.WriteHTTPInternalServerError(w, "failed to copy response", nil)
249 }
250 })
251
252 // self-destruct code, useful for dumping goroutines on windows
253 router.POST("/abort", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
254 if err := rtpprof.Lookup("goroutine").WriteTo(os.Stderr, 2); err != nil {
255 log.Log(ctx, "error writing rtpprof", "error", err)
256 }
257 log.Log(ctx, "got POST /abort, self-destructing")
258 os.Exit(1)
259 })
260
261 handleIncomingStream := func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
262 key := p.ByName("key")
263 log.Log(ctx, "stream start")
264
265 var mediaSigner media.MediaSigner
266 var ok bool
267 var err error
268 parts := strings.Split(key, "_")
269
270 if len(parts) == 2 {
271 a.SignerCacheMu.Lock()
272 mediaSigner, ok = a.SignerCache[parts[0]]
273 a.SignerCacheMu.Unlock()
274 if !ok {
275 log.Error(ctx, "couldn't find key in cache", "part", parts[0], "key", key)
276 errors.WriteHTTPUnauthorized(w, "invalid authorization key", nil)
277 return
278 }
279 } else {
280 mediaSigner, err = a.MakeMediaSigner(ctx, key)
281 if err != nil {
282 errors.WriteHTTPUnauthorized(w, "invalid authorization key", err)
283 return
284 }
285 }
286
287 err = a.MediaManager.MKVIngest(ctx, r.Body, mediaSigner)
288
289 if err != nil {
290 log.Log(ctx, "stream error", "error", err)
291 errors.WriteHTTPInternalServerError(w, "stream error", err)
292 return
293 }
294 log.Log(ctx, "stream success", "url", r.URL.String())
295 }
296
297 // route to accept an incoming mkv stream from OBS, segment it, and push the segments back to this HTTP handler
298 router.POST("/live/:key", handleIncomingStream)
299 router.PUT("/live/:key", handleIncomingStream)
300
301 router.GET("/player-report/:id", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
302 id := p.ByName("id")
303 if id == "" {
304 errors.WriteHTTPBadRequest(w, "id required", nil)
305 return
306 }
307 events, err := a.Model.PlayerReport(id)
308 if err != nil {
309 errors.WriteHTTPBadRequest(w, err.Error(), err)
310 return
311 }
312 bs, err := json.Marshal(events)
313 if err != nil {
314 errors.WriteHTTPInternalServerError(w, "unable to marhsal json", err)
315 return
316 }
317 if _, err := w.Write(bs); err != nil {
318 log.Error(ctx, "error writing response", "error", err)
319 }
320 })
321
322 router.GET("/segment/:id", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
323 id := p.ByName("id")
324 if id == "" {
325 errors.WriteHTTPBadRequest(w, "id required", nil)
326 return
327 }
328 segment, err := a.Model.GetSegment(id)
329 if err != nil {
330 errors.WriteHTTPBadRequest(w, err.Error(), err)
331 return
332 }
333 if segment == nil {
334 errors.WriteHTTPNotFound(w, "segment not found", nil)
335 return
336 }
337 spSeg, err := segment.ToStreamplaceSegment()
338 if err != nil {
339 errors.WriteHTTPInternalServerError(w, "unable to convert segment to streamplace segment", err)
340 return
341 }
342 bs, err := json.Marshal(spSeg)
343 if err != nil {
344 errors.WriteHTTPInternalServerError(w, "unable to marhsal json", err)
345 return
346 }
347 if _, err := w.Write(bs); err != nil {
348 log.Error(ctx, "error writing response", "error", err)
349 }
350 })
351
352 router.DELETE("/player-events", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
353 err := a.Model.ClearPlayerEvents()
354 if err != nil {
355 errors.WriteHTTPInternalServerError(w, "unable to delete player events", err)
356 return
357 }
358 w.WriteHeader(204)
359 })
360
361 router.GET("/settings", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
362 w.Header().Set("Access-Control-Allow-Origin", "*")
363 w.Header().Set("Access-Control-Allow-Methods", "GET")
364 w.Header().Set("Access-Control-Allow-Headers", "Content-Type")
365
366 id := a.Signer.Hex()
367
368 ident, err := a.Model.GetIdentity(id)
369 if err != nil {
370 errors.WriteHTTPInternalServerError(w, "unable to get settings", err)
371 return
372 }
373
374 bs, err := json.Marshal(ident)
375 if err != nil {
376 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err)
377 return
378 }
379 if _, err := w.Write(bs); err != nil {
380 log.Error(ctx, "error writing response", "error", err)
381 }
382 })
383
384 router.GET("/followers/:user", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
385 user := p.ByName("user")
386 if user == "" {
387 errors.WriteHTTPBadRequest(w, "user required", nil)
388 return
389 }
390
391 followers, err := a.Model.GetUserFollowers(ctx, user)
392 if err != nil {
393 errors.WriteHTTPInternalServerError(w, "unable to get followers", err)
394 return
395 }
396 bs, err := json.Marshal(followers)
397 if err != nil {
398 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err)
399 return
400 }
401 if _, err := w.Write(bs); err != nil {
402 log.Error(ctx, "error writing response", "error", err)
403 }
404 })
405
406 router.GET("/following/:user", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
407 user := p.ByName("user")
408 if user == "" {
409 errors.WriteHTTPBadRequest(w, "user required", nil)
410 return
411 }
412
413 followers, err := a.Model.GetUserFollowing(ctx, user)
414 if err != nil {
415 errors.WriteHTTPInternalServerError(w, "unable to get followers", err)
416 return
417 }
418 bs, err := json.Marshal(followers)
419 if err != nil {
420 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err)
421 return
422 }
423 if _, err := w.Write(bs); err != nil {
424 log.Error(ctx, "error writing response", "error", err)
425 }
426 })
427
428 router.GET("/notifications", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
429 notifications, err := a.Model.ListNotifications()
430 if err != nil {
431 errors.WriteHTTPInternalServerError(w, "unable to get notifications", err)
432 return
433 }
434 bs, err := json.Marshal(notifications)
435 if err != nil {
436 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err)
437 return
438 }
439 if _, err := w.Write(bs); err != nil {
440 log.Error(ctx, "error writing response", "error", err)
441 }
442 })
443
444 router.GET("/chat-posts", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
445 posts, err := a.Model.ListFeedPosts()
446 if err != nil {
447 errors.WriteHTTPInternalServerError(w, "unable to get chat posts", err)
448 return
449 }
450 bs, err := json.Marshal(posts)
451 if err != nil {
452 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err)
453 return
454 }
455 if _, err := w.Write(bs); err != nil {
456 log.Error(ctx, "error writing response", "error", err)
457 }
458 })
459
460 router.GET("/chat/:cid", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
461 cid := p.ByName("cid")
462 if cid == "" {
463 errors.WriteHTTPBadRequest(w, "cid required", nil)
464 return
465 }
466 msg, err := a.Model.GetChatMessage(cid)
467 if err != nil {
468 errors.WriteHTTPInternalServerError(w, "unable to get chat posts", err)
469 return
470 }
471 spmsg, err := msg.ToStreamplaceMessageView()
472 if err != nil {
473 errors.WriteHTTPInternalServerError(w, "unable to convert chat message to streamplace message view", err)
474 return
475 }
476 bs, err := json.Marshal(spmsg)
477 if err != nil {
478 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err)
479 return
480 }
481 if _, err := w.Write(bs); err != nil {
482 log.Error(ctx, "error writing response", "error", err)
483 }
484 })
485
486 router.GET("/oauth-sessions", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
487 sessions, err := a.Model.ListOAuthSessions()
488 if err != nil {
489 errors.WriteHTTPInternalServerError(w, "unable to get oauth sessions", err)
490 return
491 }
492 bs, err := json.Marshal(sessions)
493 if err != nil {
494 errors.WriteHTTPInternalServerError(w, "unable to marshal oauth sessions", err)
495 return
496 }
497 if _, err := w.Write(bs); err != nil {
498 log.Error(ctx, "error writing response", "error", err)
499 }
500 })
501
502 router.POST("/notification-blast", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
503 var payload notificationpkg.NotificationBlast
504 if err := json.NewDecoder(r.Body).Decode(&payload); err != nil {
505 errors.WriteHTTPBadRequest(w, "invalid request body", err)
506 return
507 }
508 notifications, err := a.Model.ListNotifications()
509 if err != nil {
510 errors.WriteHTTPInternalServerError(w, "unable to get notifications", err)
511 return
512 }
513 if a.FirebaseNotifier == nil {
514 errors.WriteHTTPInternalServerError(w, "firebase notifier not initialized", nil)
515 return
516 }
517 tokens := []string{}
518 for _, not := range notifications {
519 tokens = append(tokens, not.Token)
520 }
521 err = a.FirebaseNotifier.Blast(ctx, tokens, &payload)
522 if err != nil {
523 errors.WriteHTTPInternalServerError(w, "unable to blast notifications", err)
524 return
525 }
526 w.WriteHeader(http.StatusNoContent)
527 })
528
529 router.PUT("/settings/:id", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
530 w.Header().Set("Access-Control-Allow-Origin", "*")
531 w.Header().Set("Access-Control-Allow-Methods", "PUT")
532 w.Header().Set("Access-Control-Allow-Headers", "Content-Type")
533
534 id := p.ByName("id")
535 if id == "" {
536 errors.WriteHTTPBadRequest(w, "id required", nil)
537 return
538 }
539
540 var ident model.Identity
541 if err := json.NewDecoder(r.Body).Decode(&ident); err != nil {
542 errors.WriteHTTPBadRequest(w, "invalid request body", err)
543 return
544 }
545 ident.ID = id
546
547 if err := a.Model.UpdateIdentity(&ident); err != nil {
548 errors.WriteHTTPInternalServerError(w, "unable to update settings", err)
549 return
550 }
551
552 w.WriteHeader(http.StatusNoContent)
553 })
554
555 router.POST("/livepeer-auth-webhook-url", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
556 var payload struct {
557 URL string `json:"url"`
558 }
559 // urls look like http://127.0.0.1:9999/live/did:plc:dkh4rwafdcda4ko7lewe43ml-uucbv40mdkcfat50/47.mp4
560 if err := json.NewDecoder(r.Body).Decode(&payload); err != nil {
561 errors.WriteHTTPBadRequest(w, "invalid request body (could not decode)", err)
562 return
563 }
564 parts := strings.Split(payload.URL, "/")
565 if len(parts) < 5 {
566 errors.WriteHTTPBadRequest(w, "invalid request body (too few parts)", nil)
567 return
568 }
569 didSession := parts[4]
570 idParts := strings.Split(didSession, "-")
571 if len(idParts) != 2 {
572 errors.WriteHTTPBadRequest(w, "invalid request body (invalid did session)", nil)
573 return
574 }
575 did := idParts[0]
576 // sessionID := idParts[1]
577 seg, err := a.Model.LatestSegmentForUser(did)
578 if err != nil {
579 errors.WriteHTTPInternalServerError(w, "unable to get latest segment", err)
580 return
581 }
582 spseg, err := seg.ToStreamplaceSegment()
583 if err != nil {
584 errors.WriteHTTPInternalServerError(w, "unable to convert segment to streamplace segment", err)
585 return
586 }
587 renditions, err := renditions.GenerateRenditions(spseg)
588 if err != nil {
589 errors.WriteHTTPInternalServerError(w, "unable to generate renditions", err)
590 return
591 }
592 out := map[string]any{
593 "manifestID": didSession,
594 "profiles": renditions.ToLivepeerProfiles(),
595 }
596 bs, err := json.Marshal(out)
597 if err != nil {
598 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err)
599 return
600 }
601 if _, err := w.Write(bs); err != nil {
602 log.Error(ctx, "error writing response", "error", err)
603 }
604 })
605
606 handler := sloghttp.Recovery(router)
607 if log.Level(4) {
608 handler = sloghttp.New(slog.Default())(handler)
609 }
610 return handler, nil
611}
612
613func (a *StreamplaceAPI) keyToUser(ctx context.Context, key string) (string, error) {
614 payload, err := base64.URLEncoding.DecodeString(key)
615 if err != nil {
616 return "", err
617 }
618 signed, err := a.Signer.Verify(payload)
619 if err != nil {
620 return "", err
621 }
622 _, ok := signed.Data().(*v0.StreamKey)
623 if !ok {
624 return "", fmt.Errorf("got signed data but it wasn't a stream key")
625 }
626 return strings.ToLower(signed.Signer()), nil
627}