Live video on the AT Protocol
1package api
2
3import (
4 "bufio"
5 "context"
6 "encoding/base64"
7 "encoding/json"
8 "fmt"
9 "io"
10 "log/slog"
11 "net/http"
12 "net/http/pprof"
13 "os"
14 "path/filepath"
15 "regexp"
16 "runtime"
17 rtpprof "runtime/pprof"
18 "strconv"
19 "strings"
20 "time"
21
22 "github.com/julienschmidt/httprouter"
23 "github.com/prometheus/client_golang/prometheus/promhttp"
24 sloghttp "github.com/samber/slog-http"
25 "golang.org/x/sync/errgroup"
26 "stream.place/streamplace/pkg/errors"
27 "stream.place/streamplace/pkg/log"
28 "stream.place/streamplace/pkg/mist/mistconfig"
29 "stream.place/streamplace/pkg/mist/misttriggers"
30 "stream.place/streamplace/pkg/model"
31 notificationpkg "stream.place/streamplace/pkg/notifications"
32 "stream.place/streamplace/pkg/renditions"
33 v0 "stream.place/streamplace/pkg/schema/v0"
34)
35
36func (a *StreamplaceAPI) ServeInternalHTTP(ctx context.Context) error {
37 handler, err := a.InternalHandler(ctx)
38 if err != nil {
39 return err
40 }
41 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error {
42 s.Addr = a.CLI.HttpInternalAddr
43 log.Log(ctx, "http server starting", "addr", s.Addr)
44 return s.ListenAndServe()
45 })
46}
47
// mkvRE matches names made up solely of one or more decimal digits followed
// by a literal ".mkv" extension (for example "42.mkv").
//
// NOTE(review): this was previously described as a "lightweight way to
// authenticate push requests to ourself"; no such use is visible in this
// file — confirm against the callers before relying on that.
var mkvRE = regexp.MustCompile(`^\d+\.mkv$`)
54
55func (a *StreamplaceAPI) InternalHandler(ctx context.Context) (http.Handler, error) {
56 router := httprouter.New()
57 broker := misttriggers.NewTriggerBroker()
58 broker.OnPushOutStart(func(ctx context.Context, payload *misttriggers.PushOutStartPayload) (string, error) {
59 return payload.URL, nil
60 })
61 broker.OnPushRewrite(func(ctx context.Context, payload *misttriggers.PushRewritePayload) (string, error) {
62 log.Log(ctx, "got push out start", "streamName", payload.StreamName, "url", payload.URL.String())
63
64 ms := time.Now().UnixMilli()
65 out := fmt.Sprintf("%s+%s_%d", mistconfig.STREAM_NAME, payload.StreamName, ms)
66
67 return out, nil
68 })
69 triggerCollection := misttriggers.NewMistCallbackHandlersCollection(a.CLI, broker)
70 router.POST("/mist-trigger", triggerCollection.Trigger())
71 router.HandlerFunc("GET", "/healthz", a.HandleHealthz(ctx))
72
73 // Add pprof handlers
74 router.HandlerFunc("GET", "/debug/pprof/", pprof.Index)
75 router.HandlerFunc("GET", "/debug/pprof/cmdline", pprof.Cmdline)
76 router.HandlerFunc("GET", "/debug/pprof/profile", pprof.Profile)
77 router.HandlerFunc("GET", "/debug/pprof/symbol", pprof.Symbol)
78 router.HandlerFunc("GET", "/debug/pprof/trace", pprof.Trace)
79 router.Handler("GET", "/debug/pprof/goroutine", pprof.Handler("goroutine"))
80 router.Handler("GET", "/debug/pprof/heap", pprof.Handler("heap"))
81 router.Handler("GET", "/debug/pprof/threadcreate", pprof.Handler("threadcreate"))
82 router.Handler("GET", "/debug/pprof/block", pprof.Handler("block"))
83 router.Handler("GET", "/debug/pprof/allocs", pprof.Handler("allocs"))
84 router.Handler("GET", "/debug/pprof/mutex", pprof.Handler("mutex"))
85
86 router.POST("/gc", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
87 runtime.GC()
88 w.WriteHeader(204)
89 })
90
91 router.Handler("GET", "/metrics", promhttp.Handler())
92
93 router.GET("/playback/:user/:rendition/concat", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
94 user := p.ByName("user")
95 if user == "" {
96 errors.WriteHTTPBadRequest(w, "user required", nil)
97 return
98 }
99 rendition := p.ByName("rendition")
100 if rendition == "" {
101 errors.WriteHTTPBadRequest(w, "rendition required", nil)
102 return
103 }
104 user, err := a.NormalizeUser(ctx, user)
105 if err != nil {
106 errors.WriteHTTPBadRequest(w, "invalid user", err)
107 return
108 }
109 w.Header().Set("content-type", "text/plain")
110 fmt.Fprintf(w, "ffconcat version 1.0\n")
111 // intermittent reports that you need two here to make things work properly? shouldn't matter.
112 for i := 0; i < 2; i += 1 {
113 fmt.Fprintf(w, "file '%s/playback/%s/%s/latest.mp4'\n", a.CLI.OwnInternalURL(), user, rendition)
114 }
115 })
116
117 router.GET("/playback/:user/:rendition/latest.mp4", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
118 user := p.ByName("user")
119 if user == "" {
120 errors.WriteHTTPBadRequest(w, "user required", nil)
121 return
122 }
123 user, err := a.NormalizeUser(ctx, user)
124 if err != nil {
125 errors.WriteHTTPBadRequest(w, "invalid user", err)
126 return
127 }
128 rendition := p.ByName("rendition")
129 if rendition == "" {
130 errors.WriteHTTPBadRequest(w, "rendition required", nil)
131 return
132 }
133 seg := <-a.MediaManager.SubscribeSegment(ctx, user, rendition)
134 base := filepath.Base(seg.Filepath)
135 w.Header().Set("Location", fmt.Sprintf("%s/playback/%s/%s/segment/%s\n", a.CLI.OwnInternalURL(), user, rendition, base))
136 w.WriteHeader(301)
137 })
138
139 router.GET("/playback/:user/:rendition/segment/:file", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
140 user := p.ByName("user")
141 if user == "" {
142 errors.WriteHTTPBadRequest(w, "user required", nil)
143 return
144 }
145 user, err := a.NormalizeUser(ctx, user)
146 if err != nil {
147 errors.WriteHTTPBadRequest(w, "invalid user", err)
148 return
149 }
150 file := p.ByName("file")
151 if file == "" {
152 errors.WriteHTTPBadRequest(w, "file required", nil)
153 return
154 }
155 fullpath, err := a.CLI.SegmentFilePath(user, file)
156 if err != nil {
157 errors.WriteHTTPBadRequest(w, "badly formatted request", err)
158 return
159 }
160 http.ServeFile(w, r, fullpath)
161 })
162
163 router.GET("/playback/:user/:rendition/stream.mp4", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
164 user := p.ByName("user")
165 if user == "" {
166 errors.WriteHTTPBadRequest(w, "user required", nil)
167 return
168 }
169 rendition := p.ByName("rendition")
170 if rendition == "" {
171 errors.WriteHTTPBadRequest(w, "rendition required", nil)
172 return
173 }
174 user, err := a.NormalizeUser(ctx, user)
175 if err != nil {
176 errors.WriteHTTPBadRequest(w, "invalid user", err)
177 return
178 }
179 var delayMS int64 = 1000
180 userDelay := r.URL.Query().Get("delayms")
181 if userDelay != "" {
182 var err error
183 delayMS, err = strconv.ParseInt(userDelay, 10, 64)
184 if err != nil {
185 errors.WriteHTTPBadRequest(w, "error parsing delay", err)
186 return
187 }
188 if delayMS > 10000 {
189 errors.WriteHTTPBadRequest(w, "delay too large, maximum 10000", nil)
190 return
191 }
192 }
193 w.Header().Set("Content-Type", "video/mp4")
194 w.WriteHeader(200)
195 g, ctx := errgroup.WithContext(ctx)
196 pr, pw := io.Pipe()
197 bufw := bufio.NewWriter(pw)
198 g.Go(func() error {
199 return a.MediaManager.SegmentToMP4(ctx, user, rendition, bufw)
200 })
201 g.Go(func() error {
202 time.Sleep(time.Duration(delayMS) * time.Millisecond)
203 _, err := io.Copy(w, pr)
204 return err
205 })
206 g.Wait()
207 })
208
209 router.HEAD("/playback/:user/:rendition/stream.mkv", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
210 user := p.ByName("user")
211 if user == "" {
212 errors.WriteHTTPBadRequest(w, "user required", nil)
213 return
214 }
215 w.Header().Set("Content-Type", "video/x-matroska")
216 w.Header().Set("Transfer-Encoding", "chunked")
217 w.WriteHeader(200)
218 })
219
220 router.POST("/http-pipe/:uuid", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
221 uu := p.ByName("uuid")
222 if uu == "" {
223 errors.WriteHTTPBadRequest(w, "uuid required", nil)
224 return
225 }
226 pr := a.MediaManager.GetHTTPPipeWriter(uu)
227 if pr == nil {
228 errors.WriteHTTPNotFound(w, "http-pipe not found", nil)
229 return
230 }
231 io.Copy(pr, r.Body)
232 })
233
234 // self-destruct code, useful for dumping goroutines on windows
235 router.POST("/abort", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
236 rtpprof.Lookup("goroutine").WriteTo(os.Stderr, 2)
237 log.Log(ctx, "got POST /abort, self-destructing")
238 os.Exit(1)
239 })
240
241 handleIncomingStream := func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
242 log.Log(ctx, "stream start")
243 err := a.MediaManager.IngestStream(ctx, r.Body, a.MediaSigner)
244
245 if err != nil {
246 log.Log(ctx, "stream error", "error", err)
247 errors.WriteHTTPInternalServerError(w, "stream error", err)
248 return
249 }
250 log.Log(ctx, "stream success", "url", r.URL.String())
251 }
252
253 // route to accept an incoming mkv stream from OBS, segment it, and push the segments back to this HTTP handler
254 router.POST("/stream/:key", handleIncomingStream)
255 router.PUT("/stream/:key", handleIncomingStream)
256
257 router.GET("/player-report/:id", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
258 id := p.ByName("id")
259 if id == "" {
260 errors.WriteHTTPBadRequest(w, "id required", nil)
261 return
262 }
263 events, err := a.Model.PlayerReport(id)
264 if err != nil {
265 errors.WriteHTTPBadRequest(w, err.Error(), err)
266 return
267 }
268 bs, err := json.Marshal(events)
269 if err != nil {
270 errors.WriteHTTPInternalServerError(w, "unable to marhsal json", err)
271 return
272 }
273 w.Write(bs)
274 })
275
276 router.GET("/segment/:id", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
277 id := p.ByName("id")
278 if id == "" {
279 errors.WriteHTTPBadRequest(w, "id required", nil)
280 return
281 }
282 segment, err := a.Model.GetSegment(id)
283 if err != nil {
284 errors.WriteHTTPBadRequest(w, err.Error(), err)
285 return
286 }
287 if segment == nil {
288 errors.WriteHTTPNotFound(w, "segment not found", nil)
289 return
290 }
291 spSeg, err := segment.ToStreamplaceSegment()
292 if err != nil {
293 errors.WriteHTTPInternalServerError(w, "unable to convert segment to streamplace segment", err)
294 return
295 }
296 bs, err := json.Marshal(spSeg)
297 if err != nil {
298 errors.WriteHTTPInternalServerError(w, "unable to marhsal json", err)
299 return
300 }
301 w.Write(bs)
302 })
303
304 router.DELETE("/player-events", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
305 err := a.Model.ClearPlayerEvents()
306 if err != nil {
307 errors.WriteHTTPInternalServerError(w, "unable to delete player events", err)
308 return
309 }
310 w.WriteHeader(204)
311 })
312
313 router.GET("/settings", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
314 w.Header().Set("Access-Control-Allow-Origin", "*")
315 w.Header().Set("Access-Control-Allow-Methods", "GET")
316 w.Header().Set("Access-Control-Allow-Headers", "Content-Type")
317
318 id := a.Signer.Hex()
319
320 ident, err := a.Model.GetIdentity(id)
321 if err != nil {
322 errors.WriteHTTPInternalServerError(w, "unable to get settings", err)
323 return
324 }
325
326 bs, err := json.Marshal(ident)
327 if err != nil {
328 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err)
329 return
330 }
331 w.Write(bs)
332 })
333
334 router.GET("/followers/:user", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
335 user := p.ByName("user")
336 if user == "" {
337 errors.WriteHTTPBadRequest(w, "user required", nil)
338 return
339 }
340
341 followers, err := a.Model.GetUserFollowers(ctx, user)
342 if err != nil {
343 errors.WriteHTTPInternalServerError(w, "unable to get followers", err)
344 return
345 }
346 bs, err := json.Marshal(followers)
347 if err != nil {
348 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err)
349 return
350 }
351 w.Write(bs)
352 })
353
354 router.GET("/following/:user", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
355 user := p.ByName("user")
356 if user == "" {
357 errors.WriteHTTPBadRequest(w, "user required", nil)
358 return
359 }
360
361 followers, err := a.Model.GetUserFollowing(ctx, user)
362 if err != nil {
363 errors.WriteHTTPInternalServerError(w, "unable to get followers", err)
364 return
365 }
366 bs, err := json.Marshal(followers)
367 if err != nil {
368 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err)
369 return
370 }
371 w.Write(bs)
372 })
373
374 router.GET("/notifications", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
375 notifications, err := a.Model.ListNotifications()
376 if err != nil {
377 errors.WriteHTTPInternalServerError(w, "unable to get notifications", err)
378 return
379 }
380 bs, err := json.Marshal(notifications)
381 if err != nil {
382 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err)
383 return
384 }
385 w.Write(bs)
386 })
387
388 router.GET("/chat-posts", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
389 posts, err := a.Model.ListFeedPosts()
390 if err != nil {
391 errors.WriteHTTPInternalServerError(w, "unable to get chat posts", err)
392 return
393 }
394 bs, err := json.Marshal(posts)
395 if err != nil {
396 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err)
397 return
398 }
399 w.Write(bs)
400 })
401
402 router.GET("/chat/:cid", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
403 cid := p.ByName("cid")
404 if cid == "" {
405 errors.WriteHTTPBadRequest(w, "cid required", nil)
406 return
407 }
408 msg, err := a.Model.GetChatMessage(cid)
409 if err != nil {
410 errors.WriteHTTPInternalServerError(w, "unable to get chat posts", err)
411 return
412 }
413 spmsg, err := msg.ToStreamplaceMessageView()
414 if err != nil {
415 errors.WriteHTTPInternalServerError(w, "unable to convert chat message to streamplace message view", err)
416 return
417 }
418 bs, err := json.Marshal(spmsg)
419 if err != nil {
420 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err)
421 return
422 }
423 w.Write(bs)
424 })
425
426 router.GET("/oauth-sessions", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
427 sessions, err := a.Model.ListOAuthSessions()
428 if err != nil {
429 errors.WriteHTTPInternalServerError(w, "unable to get oauth sessions", err)
430 return
431 }
432 bs, err := json.Marshal(sessions)
433 if err != nil {
434 errors.WriteHTTPInternalServerError(w, "unable to marshal oauth sessions", err)
435 return
436 }
437 w.Write(bs)
438 })
439
440 router.POST("/notification-blast", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
441 var payload notificationpkg.NotificationBlast
442 if err := json.NewDecoder(r.Body).Decode(&payload); err != nil {
443 errors.WriteHTTPBadRequest(w, "invalid request body", err)
444 return
445 }
446 notifications, err := a.Model.ListNotifications()
447 if err != nil {
448 errors.WriteHTTPInternalServerError(w, "unable to get notifications", err)
449 return
450 }
451 if a.FirebaseNotifier == nil {
452 errors.WriteHTTPInternalServerError(w, "firebase notifier not initialized", nil)
453 return
454 }
455 tokens := []string{}
456 for _, not := range notifications {
457 tokens = append(tokens, not.Token)
458 }
459 err = a.FirebaseNotifier.Blast(ctx, tokens, &payload)
460 if err != nil {
461 errors.WriteHTTPInternalServerError(w, "unable to blast notifications", err)
462 return
463 }
464 w.WriteHeader(http.StatusNoContent)
465 })
466
467 router.PUT("/settings/:id", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
468 w.Header().Set("Access-Control-Allow-Origin", "*")
469 w.Header().Set("Access-Control-Allow-Methods", "PUT")
470 w.Header().Set("Access-Control-Allow-Headers", "Content-Type")
471
472 id := p.ByName("id")
473 if id == "" {
474 errors.WriteHTTPBadRequest(w, "id required", nil)
475 return
476 }
477
478 var ident model.Identity
479 if err := json.NewDecoder(r.Body).Decode(&ident); err != nil {
480 errors.WriteHTTPBadRequest(w, "invalid request body", err)
481 return
482 }
483 ident.ID = id
484
485 if err := a.Model.UpdateIdentity(&ident); err != nil {
486 errors.WriteHTTPInternalServerError(w, "unable to update settings", err)
487 return
488 }
489
490 w.WriteHeader(http.StatusNoContent)
491 })
492
493 router.POST("/livepeer-auth-webhook-url", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
494 var payload struct {
495 URL string `json:"url"`
496 }
497 // urls look like http://127.0.0.1:9999/live/did:plc:dkh4rwafdcda4ko7lewe43ml-uucbv40mdkcfat50/47.mp4
498 if err := json.NewDecoder(r.Body).Decode(&payload); err != nil {
499 errors.WriteHTTPBadRequest(w, "invalid request body (could not decode)", err)
500 return
501 }
502 parts := strings.Split(payload.URL, "/")
503 if len(parts) < 5 {
504 errors.WriteHTTPBadRequest(w, "invalid request body (too few parts)", nil)
505 return
506 }
507 didSession := parts[4]
508 idParts := strings.Split(didSession, "-")
509 if len(idParts) != 2 {
510 errors.WriteHTTPBadRequest(w, "invalid request body (invalid did session)", nil)
511 return
512 }
513 did := idParts[0]
514 // sessionID := idParts[1]
515 seg, err := a.Model.LatestSegmentForUser(did)
516 if err != nil {
517 errors.WriteHTTPInternalServerError(w, "unable to get latest segment", err)
518 return
519 }
520 spseg, err := seg.ToStreamplaceSegment()
521 if err != nil {
522 errors.WriteHTTPInternalServerError(w, "unable to convert segment to streamplace segment", err)
523 return
524 }
525 renditions, err := renditions.GenerateRenditions(spseg)
526 if err != nil {
527 errors.WriteHTTPInternalServerError(w, "unable to generate renditions", err)
528 return
529 }
530 out := map[string]any{
531 "manifestID": didSession,
532 "profiles": renditions.ToLivepeerProfiles(),
533 }
534 bs, err := json.Marshal(out)
535 if err != nil {
536 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err)
537 return
538 }
539 w.Write(bs)
540 })
541
542 handler := sloghttp.Recovery(router)
543 if log.Level(4) {
544 handler = sloghttp.New(slog.Default())(handler)
545 }
546 return handler, nil
547}
548
549func (a *StreamplaceAPI) keyToUser(ctx context.Context, key string) (string, error) {
550 payload, err := base64.URLEncoding.DecodeString(key)
551 if err != nil {
552 return "", err
553 }
554 signed, err := a.Signer.Verify(payload)
555 if err != nil {
556 return "", err
557 }
558 _, ok := signed.Data().(*v0.StreamKey)
559 if !ok {
560 return "", fmt.Errorf("got signed data but it wasn't a stream key")
561 }
562 return strings.ToLower(signed.Signer()), nil
563}