Live video on the AT Protocol
1package api
2
3import (
4 "bufio"
5 "context"
6 "encoding/base64"
7 "encoding/json"
8 "fmt"
9 "io"
10 "log/slog"
11 "net/http"
12 "net/http/pprof"
13 "os"
14 "path/filepath"
15 "regexp"
16 "runtime"
17 rtpprof "runtime/pprof"
18 "strconv"
19 "strings"
20 "time"
21
22 "github.com/julienschmidt/httprouter"
23 "github.com/prometheus/client_golang/prometheus/promhttp"
24 sloghttp "github.com/samber/slog-http"
25 "golang.org/x/sync/errgroup"
26 "stream.place/streamplace/pkg/errors"
27 "stream.place/streamplace/pkg/log"
28 "stream.place/streamplace/pkg/media"
29 "stream.place/streamplace/pkg/mist/mistconfig"
30 "stream.place/streamplace/pkg/mist/misttriggers"
31 "stream.place/streamplace/pkg/model"
32 notificationpkg "stream.place/streamplace/pkg/notifications"
33 "stream.place/streamplace/pkg/renditions"
34 v0 "stream.place/streamplace/pkg/schema/v0"
35)
36
37func (a *StreamplaceAPI) ServeInternalHTTP(ctx context.Context) error {
38 handler, err := a.InternalHandler(ctx)
39 if err != nil {
40 return err
41 }
42 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error {
43 s.Addr = a.CLI.HttpInternalAddr
44 log.Log(ctx, "http server starting", "addr", s.Addr)
45 return s.ListenAndServe()
46 })
47}
48
// mkvRE matches names that consist solely of one or more decimal digits
// followed by the literal ".mkv" extension, for example "42.mkv".
//
// It is compiled in the package-level initializer rather than in init() so it
// is already set when any init function in the package runs.
//
// NOTE(review): the comment previously attached here read "lightweight way to
// authenticate push requests to ourself"; no use of mkvRE is visible in this
// file, so confirm against its callers whether that description still applies.
var mkvRE = regexp.MustCompile(`^\d+\.mkv$`)
55
56func (a *StreamplaceAPI) InternalHandler(ctx context.Context) (http.Handler, error) {
57 router := httprouter.New()
58 broker := misttriggers.NewTriggerBroker()
59
60 broker.OnPushRewrite(func(ctx context.Context, payload *misttriggers.PushRewritePayload) (string, error) {
61 log.Log(ctx, "got push out start", "streamName", payload.StreamName, "url", payload.URL.String())
62 // Extract the last part of the URL path
63 urlPath := payload.URL.Path
64 parts := strings.Split(urlPath, "/")
65 lastPart := ""
66 if len(parts) > 0 {
67 lastPart = parts[len(parts)-1]
68 }
69 mediaSigner, err := a.MakeMediaSigner(ctx, lastPart)
70 if err != nil {
71 return "", err
72 }
73
74 ms := time.Now().UnixMilli()
75 out := fmt.Sprintf("%s+%s_%d", mistconfig.STREAM_NAME, mediaSigner.Streamer(), ms)
76 a.SignerCacheMu.Lock()
77 a.SignerCache[mediaSigner.Streamer()] = mediaSigner
78 a.SignerCacheMu.Unlock()
79 log.Log(ctx, "added key to cache", "mist-stream", out, "streamer", mediaSigner.Streamer())
80
81 return out, nil
82 })
83 triggerCollection := misttriggers.NewMistCallbackHandlersCollection(a.CLI, broker)
84 router.POST("/mist-trigger", triggerCollection.Trigger())
85 router.HandlerFunc("GET", "/healthz", a.HandleHealthz(ctx))
86
87 // Add pprof handlers
88 router.HandlerFunc("GET", "/debug/pprof/", pprof.Index)
89 router.HandlerFunc("GET", "/debug/pprof/cmdline", pprof.Cmdline)
90 router.HandlerFunc("GET", "/debug/pprof/profile", pprof.Profile)
91 router.HandlerFunc("GET", "/debug/pprof/symbol", pprof.Symbol)
92 router.HandlerFunc("GET", "/debug/pprof/trace", pprof.Trace)
93 router.Handler("GET", "/debug/pprof/goroutine", pprof.Handler("goroutine"))
94 router.Handler("GET", "/debug/pprof/heap", pprof.Handler("heap"))
95 router.Handler("GET", "/debug/pprof/threadcreate", pprof.Handler("threadcreate"))
96 router.Handler("GET", "/debug/pprof/block", pprof.Handler("block"))
97 router.Handler("GET", "/debug/pprof/allocs", pprof.Handler("allocs"))
98 router.Handler("GET", "/debug/pprof/mutex", pprof.Handler("mutex"))
99
100 router.POST("/gc", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
101 runtime.GC()
102 w.WriteHeader(204)
103 })
104
105 router.Handler("GET", "/metrics", promhttp.Handler())
106
107 router.GET("/playback/:user/:rendition/concat", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
108 user := p.ByName("user")
109 if user == "" {
110 errors.WriteHTTPBadRequest(w, "user required", nil)
111 return
112 }
113 rendition := p.ByName("rendition")
114 if rendition == "" {
115 errors.WriteHTTPBadRequest(w, "rendition required", nil)
116 return
117 }
118 user, err := a.NormalizeUser(ctx, user)
119 if err != nil {
120 errors.WriteHTTPBadRequest(w, "invalid user", err)
121 return
122 }
123 w.Header().Set("content-type", "text/plain")
124 fmt.Fprintf(w, "ffconcat version 1.0\n")
125 // intermittent reports that you need two here to make things work properly? shouldn't matter.
126 for i := 0; i < 2; i += 1 {
127 fmt.Fprintf(w, "file '%s/playback/%s/%s/latest.mp4'\n", a.CLI.OwnInternalURL(), user, rendition)
128 }
129 })
130
131 router.GET("/playback/:user/:rendition/latest.mp4", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
132 user := p.ByName("user")
133 if user == "" {
134 errors.WriteHTTPBadRequest(w, "user required", nil)
135 return
136 }
137 user, err := a.NormalizeUser(ctx, user)
138 if err != nil {
139 errors.WriteHTTPBadRequest(w, "invalid user", err)
140 return
141 }
142 rendition := p.ByName("rendition")
143 if rendition == "" {
144 errors.WriteHTTPBadRequest(w, "rendition required", nil)
145 return
146 }
147 seg := <-a.MediaManager.SubscribeSegment(ctx, user, rendition)
148 base := filepath.Base(seg.Filepath)
149 w.Header().Set("Location", fmt.Sprintf("%s/playback/%s/%s/segment/%s\n", a.CLI.OwnInternalURL(), user, rendition, base))
150 w.WriteHeader(301)
151 })
152
153 router.GET("/playback/:user/:rendition/segment/:file", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
154 user := p.ByName("user")
155 if user == "" {
156 errors.WriteHTTPBadRequest(w, "user required", nil)
157 return
158 }
159 user, err := a.NormalizeUser(ctx, user)
160 if err != nil {
161 errors.WriteHTTPBadRequest(w, "invalid user", err)
162 return
163 }
164 file := p.ByName("file")
165 if file == "" {
166 errors.WriteHTTPBadRequest(w, "file required", nil)
167 return
168 }
169 fullpath, err := a.CLI.SegmentFilePath(user, file)
170 if err != nil {
171 errors.WriteHTTPBadRequest(w, "badly formatted request", err)
172 return
173 }
174 http.ServeFile(w, r, fullpath)
175 })
176
177 router.GET("/playback/:user/:rendition/stream.mp4", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
178 user := p.ByName("user")
179 if user == "" {
180 errors.WriteHTTPBadRequest(w, "user required", nil)
181 return
182 }
183 rendition := p.ByName("rendition")
184 if rendition == "" {
185 errors.WriteHTTPBadRequest(w, "rendition required", nil)
186 return
187 }
188 user, err := a.NormalizeUser(ctx, user)
189 if err != nil {
190 errors.WriteHTTPBadRequest(w, "invalid user", err)
191 return
192 }
193 var delayMS int64 = 1000
194 userDelay := r.URL.Query().Get("delayms")
195 if userDelay != "" {
196 var err error
197 delayMS, err = strconv.ParseInt(userDelay, 10, 64)
198 if err != nil {
199 errors.WriteHTTPBadRequest(w, "error parsing delay", err)
200 return
201 }
202 if delayMS > 10000 {
203 errors.WriteHTTPBadRequest(w, "delay too large, maximum 10000", nil)
204 return
205 }
206 }
207 w.Header().Set("Content-Type", "video/mp4")
208 w.WriteHeader(200)
209 g, ctx := errgroup.WithContext(ctx)
210 pr, pw := io.Pipe()
211 bufw := bufio.NewWriter(pw)
212 g.Go(func() error {
213 return a.MediaManager.SegmentToMP4(ctx, user, rendition, bufw)
214 })
215 g.Go(func() error {
216 time.Sleep(time.Duration(delayMS) * time.Millisecond)
217 _, err := io.Copy(w, pr)
218 return err
219 })
220 g.Wait()
221 })
222
223 router.HEAD("/playback/:user/:rendition/stream.mkv", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
224 user := p.ByName("user")
225 if user == "" {
226 errors.WriteHTTPBadRequest(w, "user required", nil)
227 return
228 }
229 w.Header().Set("Content-Type", "video/x-matroska")
230 w.Header().Set("Transfer-Encoding", "chunked")
231 w.WriteHeader(200)
232 })
233
234 router.POST("/http-pipe/:uuid", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
235 uu := p.ByName("uuid")
236 if uu == "" {
237 errors.WriteHTTPBadRequest(w, "uuid required", nil)
238 return
239 }
240 pr := a.MediaManager.GetHTTPPipeWriter(uu)
241 if pr == nil {
242 errors.WriteHTTPNotFound(w, "http-pipe not found", nil)
243 return
244 }
245 io.Copy(pr, r.Body)
246 })
247
248 // self-destruct code, useful for dumping goroutines on windows
249 router.POST("/abort", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
250 rtpprof.Lookup("goroutine").WriteTo(os.Stderr, 2)
251 log.Log(ctx, "got POST /abort, self-destructing")
252 os.Exit(1)
253 })
254
255 handleIncomingStream := func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
256 key := p.ByName("key")
257 log.Log(ctx, "stream start")
258
259 var mediaSigner media.MediaSigner
260 var ok bool
261 var err error
262 parts := strings.Split(key, "_")
263
264 if len(parts) == 2 {
265 a.SignerCacheMu.Lock()
266 mediaSigner, ok = a.SignerCache[parts[0]]
267 a.SignerCacheMu.Unlock()
268 if !ok {
269 log.Error(ctx, "couldn't find key in cache", "part", parts[0], "key", key)
270 errors.WriteHTTPUnauthorized(w, "invalid authorization key", nil)
271 return
272 }
273 } else {
274 mediaSigner, err = a.MakeMediaSigner(ctx, key)
275 if err != nil {
276 errors.WriteHTTPUnauthorized(w, "invalid authorization key", err)
277 return
278 }
279 }
280
281 err = a.MediaManager.MKVIngest(ctx, r.Body, mediaSigner)
282
283 if err != nil {
284 log.Log(ctx, "stream error", "error", err)
285 errors.WriteHTTPInternalServerError(w, "stream error", err)
286 return
287 }
288 log.Log(ctx, "stream success", "url", r.URL.String())
289 }
290
291 // route to accept an incoming mkv stream from OBS, segment it, and push the segments back to this HTTP handler
292 router.POST("/live/:key", handleIncomingStream)
293 router.PUT("/live/:key", handleIncomingStream)
294
295 router.GET("/player-report/:id", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
296 id := p.ByName("id")
297 if id == "" {
298 errors.WriteHTTPBadRequest(w, "id required", nil)
299 return
300 }
301 events, err := a.Model.PlayerReport(id)
302 if err != nil {
303 errors.WriteHTTPBadRequest(w, err.Error(), err)
304 return
305 }
306 bs, err := json.Marshal(events)
307 if err != nil {
308 errors.WriteHTTPInternalServerError(w, "unable to marhsal json", err)
309 return
310 }
311 w.Write(bs)
312 })
313
314 router.GET("/segment/:id", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
315 id := p.ByName("id")
316 if id == "" {
317 errors.WriteHTTPBadRequest(w, "id required", nil)
318 return
319 }
320 segment, err := a.Model.GetSegment(id)
321 if err != nil {
322 errors.WriteHTTPBadRequest(w, err.Error(), err)
323 return
324 }
325 if segment == nil {
326 errors.WriteHTTPNotFound(w, "segment not found", nil)
327 return
328 }
329 spSeg, err := segment.ToStreamplaceSegment()
330 if err != nil {
331 errors.WriteHTTPInternalServerError(w, "unable to convert segment to streamplace segment", err)
332 return
333 }
334 bs, err := json.Marshal(spSeg)
335 if err != nil {
336 errors.WriteHTTPInternalServerError(w, "unable to marhsal json", err)
337 return
338 }
339 w.Write(bs)
340 })
341
342 router.DELETE("/player-events", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
343 err := a.Model.ClearPlayerEvents()
344 if err != nil {
345 errors.WriteHTTPInternalServerError(w, "unable to delete player events", err)
346 return
347 }
348 w.WriteHeader(204)
349 })
350
351 router.GET("/settings", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
352 w.Header().Set("Access-Control-Allow-Origin", "*")
353 w.Header().Set("Access-Control-Allow-Methods", "GET")
354 w.Header().Set("Access-Control-Allow-Headers", "Content-Type")
355
356 id := a.Signer.Hex()
357
358 ident, err := a.Model.GetIdentity(id)
359 if err != nil {
360 errors.WriteHTTPInternalServerError(w, "unable to get settings", err)
361 return
362 }
363
364 bs, err := json.Marshal(ident)
365 if err != nil {
366 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err)
367 return
368 }
369 w.Write(bs)
370 })
371
372 router.GET("/followers/:user", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
373 user := p.ByName("user")
374 if user == "" {
375 errors.WriteHTTPBadRequest(w, "user required", nil)
376 return
377 }
378
379 followers, err := a.Model.GetUserFollowers(ctx, user)
380 if err != nil {
381 errors.WriteHTTPInternalServerError(w, "unable to get followers", err)
382 return
383 }
384 bs, err := json.Marshal(followers)
385 if err != nil {
386 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err)
387 return
388 }
389 w.Write(bs)
390 })
391
392 router.GET("/following/:user", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
393 user := p.ByName("user")
394 if user == "" {
395 errors.WriteHTTPBadRequest(w, "user required", nil)
396 return
397 }
398
399 followers, err := a.Model.GetUserFollowing(ctx, user)
400 if err != nil {
401 errors.WriteHTTPInternalServerError(w, "unable to get followers", err)
402 return
403 }
404 bs, err := json.Marshal(followers)
405 if err != nil {
406 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err)
407 return
408 }
409 w.Write(bs)
410 })
411
412 router.GET("/notifications", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
413 notifications, err := a.Model.ListNotifications()
414 if err != nil {
415 errors.WriteHTTPInternalServerError(w, "unable to get notifications", err)
416 return
417 }
418 bs, err := json.Marshal(notifications)
419 if err != nil {
420 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err)
421 return
422 }
423 w.Write(bs)
424 })
425
426 router.GET("/chat-posts", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
427 posts, err := a.Model.ListFeedPosts()
428 if err != nil {
429 errors.WriteHTTPInternalServerError(w, "unable to get chat posts", err)
430 return
431 }
432 bs, err := json.Marshal(posts)
433 if err != nil {
434 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err)
435 return
436 }
437 w.Write(bs)
438 })
439
440 router.GET("/chat/:cid", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
441 cid := p.ByName("cid")
442 if cid == "" {
443 errors.WriteHTTPBadRequest(w, "cid required", nil)
444 return
445 }
446 msg, err := a.Model.GetChatMessage(cid)
447 if err != nil {
448 errors.WriteHTTPInternalServerError(w, "unable to get chat posts", err)
449 return
450 }
451 spmsg, err := msg.ToStreamplaceMessageView()
452 if err != nil {
453 errors.WriteHTTPInternalServerError(w, "unable to convert chat message to streamplace message view", err)
454 return
455 }
456 bs, err := json.Marshal(spmsg)
457 if err != nil {
458 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err)
459 return
460 }
461 w.Write(bs)
462 })
463
464 router.GET("/oauth-sessions", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
465 sessions, err := a.Model.ListOAuthSessions()
466 if err != nil {
467 errors.WriteHTTPInternalServerError(w, "unable to get oauth sessions", err)
468 return
469 }
470 bs, err := json.Marshal(sessions)
471 if err != nil {
472 errors.WriteHTTPInternalServerError(w, "unable to marshal oauth sessions", err)
473 return
474 }
475 w.Write(bs)
476 })
477
478 router.POST("/notification-blast", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
479 var payload notificationpkg.NotificationBlast
480 if err := json.NewDecoder(r.Body).Decode(&payload); err != nil {
481 errors.WriteHTTPBadRequest(w, "invalid request body", err)
482 return
483 }
484 notifications, err := a.Model.ListNotifications()
485 if err != nil {
486 errors.WriteHTTPInternalServerError(w, "unable to get notifications", err)
487 return
488 }
489 if a.FirebaseNotifier == nil {
490 errors.WriteHTTPInternalServerError(w, "firebase notifier not initialized", nil)
491 return
492 }
493 tokens := []string{}
494 for _, not := range notifications {
495 tokens = append(tokens, not.Token)
496 }
497 err = a.FirebaseNotifier.Blast(ctx, tokens, &payload)
498 if err != nil {
499 errors.WriteHTTPInternalServerError(w, "unable to blast notifications", err)
500 return
501 }
502 w.WriteHeader(http.StatusNoContent)
503 })
504
505 router.PUT("/settings/:id", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
506 w.Header().Set("Access-Control-Allow-Origin", "*")
507 w.Header().Set("Access-Control-Allow-Methods", "PUT")
508 w.Header().Set("Access-Control-Allow-Headers", "Content-Type")
509
510 id := p.ByName("id")
511 if id == "" {
512 errors.WriteHTTPBadRequest(w, "id required", nil)
513 return
514 }
515
516 var ident model.Identity
517 if err := json.NewDecoder(r.Body).Decode(&ident); err != nil {
518 errors.WriteHTTPBadRequest(w, "invalid request body", err)
519 return
520 }
521 ident.ID = id
522
523 if err := a.Model.UpdateIdentity(&ident); err != nil {
524 errors.WriteHTTPInternalServerError(w, "unable to update settings", err)
525 return
526 }
527
528 w.WriteHeader(http.StatusNoContent)
529 })
530
531 router.POST("/livepeer-auth-webhook-url", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
532 var payload struct {
533 URL string `json:"url"`
534 }
535 // urls look like http://127.0.0.1:9999/live/did:plc:dkh4rwafdcda4ko7lewe43ml-uucbv40mdkcfat50/47.mp4
536 if err := json.NewDecoder(r.Body).Decode(&payload); err != nil {
537 errors.WriteHTTPBadRequest(w, "invalid request body (could not decode)", err)
538 return
539 }
540 parts := strings.Split(payload.URL, "/")
541 if len(parts) < 5 {
542 errors.WriteHTTPBadRequest(w, "invalid request body (too few parts)", nil)
543 return
544 }
545 didSession := parts[4]
546 idParts := strings.Split(didSession, "-")
547 if len(idParts) != 2 {
548 errors.WriteHTTPBadRequest(w, "invalid request body (invalid did session)", nil)
549 return
550 }
551 did := idParts[0]
552 // sessionID := idParts[1]
553 seg, err := a.Model.LatestSegmentForUser(did)
554 if err != nil {
555 errors.WriteHTTPInternalServerError(w, "unable to get latest segment", err)
556 return
557 }
558 spseg, err := seg.ToStreamplaceSegment()
559 if err != nil {
560 errors.WriteHTTPInternalServerError(w, "unable to convert segment to streamplace segment", err)
561 return
562 }
563 renditions, err := renditions.GenerateRenditions(spseg)
564 if err != nil {
565 errors.WriteHTTPInternalServerError(w, "unable to generate renditions", err)
566 return
567 }
568 out := map[string]any{
569 "manifestID": didSession,
570 "profiles": renditions.ToLivepeerProfiles(),
571 }
572 bs, err := json.Marshal(out)
573 if err != nil {
574 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err)
575 return
576 }
577 w.Write(bs)
578 })
579
580 handler := sloghttp.Recovery(router)
581 if log.Level(4) {
582 handler = sloghttp.New(slog.Default())(handler)
583 }
584 return handler, nil
585}
586
587func (a *StreamplaceAPI) keyToUser(ctx context.Context, key string) (string, error) {
588 payload, err := base64.URLEncoding.DecodeString(key)
589 if err != nil {
590 return "", err
591 }
592 signed, err := a.Signer.Verify(payload)
593 if err != nil {
594 return "", err
595 }
596 _, ok := signed.Data().(*v0.StreamKey)
597 if !ok {
598 return "", fmt.Errorf("got signed data but it wasn't a stream key")
599 }
600 return strings.ToLower(signed.Signer()), nil
601}