Live video on the AT Protocol
1package api
2
3import (
4 "bufio"
5 "context"
6 "encoding/base64"
7 "encoding/json"
8 "fmt"
9 "io"
10 "log/slog"
11 "net/http"
12 "net/http/pprof"
13 "os"
14 "path/filepath"
15 "regexp"
16 "runtime"
17 rtpprof "runtime/pprof"
18 "strconv"
19 "strings"
20 "time"
21
22 "github.com/julienschmidt/httprouter"
23 "github.com/pion/webrtc/v4"
24 "github.com/prometheus/client_golang/prometheus/promhttp"
25 sloghttp "github.com/samber/slog-http"
26 "golang.org/x/sync/errgroup"
27 "stream.place/streamplace/pkg/errors"
28 "stream.place/streamplace/pkg/log"
29 "stream.place/streamplace/pkg/media"
30 "stream.place/streamplace/pkg/mist/mistconfig"
31 "stream.place/streamplace/pkg/mist/misttriggers"
32 "stream.place/streamplace/pkg/model"
33 notificationpkg "stream.place/streamplace/pkg/notifications"
34 "stream.place/streamplace/pkg/rtcrec"
35 v0 "stream.place/streamplace/pkg/schema/v0"
36)
37
38func (a *StreamplaceAPI) ServeInternalHTTP(ctx context.Context) error {
39 handler, err := a.InternalHandler(ctx)
40 if err != nil {
41 return err
42 }
43 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error {
44 s.Addr = a.CLI.HTTPInternalAddr
45 log.Log(ctx, "http server starting", "addr", s.Addr)
46 return s.ListenAndServe()
47 })
48}
49
// mkvRE matches names that consist solely of one or more decimal digits
// followed by ".mkv" (for example "1700000000000.mkv").
//
// NOTE(review): the previous comment described this as a "lightweight way to
// authenticate push requests to ourself"; no use of the pattern is visible in
// this part of the file, so confirm that against its callers.
//
// The pattern is compiled in the variable initializer rather than in init():
// package-level variables are initialized before any init function runs, so
// the value is available at least as early as before.
var mkvRE = regexp.MustCompile(`^\d+\.mkv$`)
56
57func (a *StreamplaceAPI) InternalHandler(ctx context.Context) (http.Handler, error) {
58 router := httprouter.New()
59 broker := misttriggers.NewTriggerBroker()
60
61 broker.OnPushRewrite(func(ctx context.Context, payload *misttriggers.PushRewritePayload) (string, error) {
62 log.Log(ctx, "got push out start", "streamName", payload.StreamName, "url", payload.URL.String())
63 // Extract the last part of the URL path
64 urlPath := payload.URL.Path
65 parts := strings.Split(urlPath, "/")
66 lastPart := ""
67 if len(parts) > 0 {
68 lastPart = parts[len(parts)-1]
69 }
70 mediaSigner, err := a.MakeMediaSigner(ctx, lastPart)
71 if err != nil {
72 return "", err
73 }
74
75 ms := time.Now().UnixMilli()
76 out := fmt.Sprintf("%s+%s_%d", mistconfig.StreamName, mediaSigner.Streamer(), ms)
77 a.SignerCacheMu.Lock()
78 a.SignerCache[mediaSigner.Streamer()] = mediaSigner
79 a.SignerCacheMu.Unlock()
80 log.Log(ctx, "added key to cache", "mist-stream", out, "streamer", mediaSigner.Streamer())
81
82 return out, nil
83 })
84 triggerCollection := misttriggers.NewMistCallbackHandlersCollection(a.CLI, broker)
85 router.POST("/mist-trigger", triggerCollection.Trigger())
86 router.HandlerFunc("GET", "/healthz", a.HandleHealthz(ctx))
87
88 // Add pprof handlers
89 router.HandlerFunc("GET", "/debug/pprof/", pprof.Index)
90 router.HandlerFunc("GET", "/debug/pprof/cmdline", pprof.Cmdline)
91 router.HandlerFunc("GET", "/debug/pprof/profile", pprof.Profile)
92 router.HandlerFunc("GET", "/debug/pprof/symbol", pprof.Symbol)
93 router.HandlerFunc("GET", "/debug/pprof/trace", pprof.Trace)
94 router.Handler("GET", "/debug/pprof/goroutine", pprof.Handler("goroutine"))
95 router.Handler("GET", "/debug/pprof/heap", pprof.Handler("heap"))
96 router.Handler("GET", "/debug/pprof/threadcreate", pprof.Handler("threadcreate"))
97 router.Handler("GET", "/debug/pprof/block", pprof.Handler("block"))
98 router.Handler("GET", "/debug/pprof/allocs", pprof.Handler("allocs"))
99 router.Handler("GET", "/debug/pprof/mutex", pprof.Handler("mutex"))
100
101 router.POST("/gc", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
102 runtime.GC()
103 w.WriteHeader(204)
104 })
105
106 router.Handler("GET", "/metrics", promhttp.Handler())
107
108 router.GET("/playback/:user/:rendition/concat", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
109 user := p.ByName("user")
110 if user == "" {
111 errors.WriteHTTPBadRequest(w, "user required", nil)
112 return
113 }
114 rendition := p.ByName("rendition")
115 if rendition == "" {
116 errors.WriteHTTPBadRequest(w, "rendition required", nil)
117 return
118 }
119 user, err := a.NormalizeUser(ctx, user)
120 if err != nil {
121 errors.WriteHTTPBadRequest(w, "invalid user", err)
122 return
123 }
124 w.Header().Set("content-type", "text/plain")
125 fmt.Fprintf(w, "ffconcat version 1.0\n")
126 // intermittent reports that you need two here to make things work properly? shouldn't matter.
127 for i := 0; i < 2; i += 1 {
128 fmt.Fprintf(w, "file '%s/playback/%s/%s/latest.mp4'\n", a.CLI.OwnInternalURL(), user, rendition)
129 }
130 })
131
132 router.GET("/playback/:user/:rendition/latest.mp4", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
133 user := p.ByName("user")
134 if user == "" {
135 errors.WriteHTTPBadRequest(w, "user required", nil)
136 return
137 }
138 user, err := a.NormalizeUser(ctx, user)
139 if err != nil {
140 errors.WriteHTTPBadRequest(w, "invalid user", err)
141 return
142 }
143 rendition := p.ByName("rendition")
144 if rendition == "" {
145 errors.WriteHTTPBadRequest(w, "rendition required", nil)
146 return
147 }
148 segChan := a.Bus.SubscribeSegment(ctx, user, rendition)
149 defer a.Bus.UnsubscribeSegment(ctx, user, rendition, segChan)
150 seg := <-segChan.C
151 base := filepath.Base(seg.Filepath)
152 w.Header().Set("Location", fmt.Sprintf("%s/playback/%s/%s/segment/%s\n", a.CLI.OwnInternalURL(), user, rendition, base))
153 w.WriteHeader(301)
154 })
155
156 router.GET("/playback/:user/:rendition/segment/:file", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
157 user := p.ByName("user")
158 if user == "" {
159 errors.WriteHTTPBadRequest(w, "user required", nil)
160 return
161 }
162 user, err := a.NormalizeUser(ctx, user)
163 if err != nil {
164 errors.WriteHTTPBadRequest(w, "invalid user", err)
165 return
166 }
167 file := p.ByName("file")
168 if file == "" {
169 errors.WriteHTTPBadRequest(w, "file required", nil)
170 return
171 }
172 fullpath, err := a.CLI.SegmentFilePath(user, file)
173 if err != nil {
174 errors.WriteHTTPBadRequest(w, "badly formatted request", err)
175 return
176 }
177 http.ServeFile(w, r, fullpath)
178 })
179
180 router.GET("/playback/:user/:rendition/stream.mp4", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
181 user := p.ByName("user")
182 if user == "" {
183 errors.WriteHTTPBadRequest(w, "user required", nil)
184 return
185 }
186 rendition := p.ByName("rendition")
187 if rendition == "" {
188 errors.WriteHTTPBadRequest(w, "rendition required", nil)
189 return
190 }
191 user, err := a.NormalizeUser(ctx, user)
192 if err != nil {
193 errors.WriteHTTPBadRequest(w, "invalid user", err)
194 return
195 }
196 var delayMS int64 = 1000
197 userDelay := r.URL.Query().Get("delayms")
198 if userDelay != "" {
199 var err error
200 delayMS, err = strconv.ParseInt(userDelay, 10, 64)
201 if err != nil {
202 errors.WriteHTTPBadRequest(w, "error parsing delay", err)
203 return
204 }
205 if delayMS > 10000 {
206 errors.WriteHTTPBadRequest(w, "delay too large, maximum 10000", nil)
207 return
208 }
209 }
210 w.Header().Set("Content-Type", "video/mp4")
211 w.WriteHeader(200)
212 g, ctx := errgroup.WithContext(ctx)
213 pr, pw := io.Pipe()
214 bufw := bufio.NewWriter(pw)
215 g.Go(func() error {
216 return a.MediaManager.SegmentToMP4(ctx, user, rendition, bufw)
217 })
218 g.Go(func() error {
219 time.Sleep(time.Duration(delayMS) * time.Millisecond)
220 _, err := io.Copy(w, pr)
221 return err
222 })
223 if err := g.Wait(); err != nil {
224 errors.WriteHTTPBadRequest(w, "request failed", err)
225 }
226 })
227
228 router.HEAD("/playback/:user/:rendition/stream.mkv", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
229 user := p.ByName("user")
230 if user == "" {
231 errors.WriteHTTPBadRequest(w, "user required", nil)
232 return
233 }
234 w.Header().Set("Content-Type", "video/x-matroska")
235 w.Header().Set("Transfer-Encoding", "chunked")
236 w.WriteHeader(200)
237 })
238
239 router.POST("/http-pipe/:uuid", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
240 uu := p.ByName("uuid")
241 if uu == "" {
242 errors.WriteHTTPBadRequest(w, "uuid required", nil)
243 return
244 }
245 pr := a.MediaManager.GetHTTPPipeWriter(uu)
246 if pr == nil {
247 errors.WriteHTTPNotFound(w, "http-pipe not found", nil)
248 return
249 }
250 if _, err := io.Copy(pr, r.Body); err != nil {
251 errors.WriteHTTPInternalServerError(w, "failed to copy response", nil)
252 }
253 })
254
255 // self-destruct code, useful for dumping goroutines on windows
256 router.POST("/abort", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
257 if err := rtpprof.Lookup("goroutine").WriteTo(os.Stderr, 2); err != nil {
258 log.Log(ctx, "error writing rtpprof", "error", err)
259 }
260 log.Log(ctx, "got POST /abort, self-destructing")
261 os.Exit(1)
262 })
263
264 handleIncomingStream := func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
265 key := p.ByName("key")
266 log.Log(ctx, "stream start")
267
268 var mediaSigner media.MediaSigner
269 var ok bool
270 var err error
271 parts := strings.Split(key, "_")
272
273 if len(parts) == 2 {
274 a.SignerCacheMu.Lock()
275 mediaSigner, ok = a.SignerCache[parts[0]]
276 a.SignerCacheMu.Unlock()
277 if !ok {
278 log.Error(ctx, "couldn't find key in cache", "part", parts[0], "key", key)
279 errors.WriteHTTPUnauthorized(w, "invalid authorization key", nil)
280 return
281 }
282 } else {
283 mediaSigner, err = a.MakeMediaSigner(ctx, key)
284 if err != nil {
285 errors.WriteHTTPUnauthorized(w, "invalid authorization key", err)
286 return
287 }
288 }
289
290 err = a.MediaManager.MKVIngest(ctx, r.Body, mediaSigner)
291
292 if err != nil {
293 log.Log(ctx, "stream error", "error", err)
294 errors.WriteHTTPInternalServerError(w, "stream error", err)
295 return
296 }
297 log.Log(ctx, "stream success", "url", r.URL.String())
298 }
299
300 // route to accept an incoming mkv stream from OBS, segment it, and push the segments back to this HTTP handler
301 router.POST("/live/:key", handleIncomingStream)
302 router.PUT("/live/:key", handleIncomingStream)
303
304 router.GET("/player-report/:id", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
305 id := p.ByName("id")
306 if id == "" {
307 errors.WriteHTTPBadRequest(w, "id required", nil)
308 return
309 }
310 events, err := a.Model.PlayerReport(id)
311 if err != nil {
312 errors.WriteHTTPBadRequest(w, err.Error(), err)
313 return
314 }
315 bs, err := json.Marshal(events)
316 if err != nil {
317 errors.WriteHTTPInternalServerError(w, "unable to marhsal json", err)
318 return
319 }
320 if _, err := w.Write(bs); err != nil {
321 log.Error(ctx, "error writing response", "error", err)
322 }
323 })
324
325 router.GET("/segment/:id", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
326 id := p.ByName("id")
327 if id == "" {
328 errors.WriteHTTPBadRequest(w, "id required", nil)
329 return
330 }
331 segment, err := a.Model.GetSegment(id)
332 if err != nil {
333 errors.WriteHTTPBadRequest(w, err.Error(), err)
334 return
335 }
336 if segment == nil {
337 errors.WriteHTTPNotFound(w, "segment not found", nil)
338 return
339 }
340 spSeg, err := segment.ToStreamplaceSegment()
341 if err != nil {
342 errors.WriteHTTPInternalServerError(w, "unable to convert segment to streamplace segment", err)
343 return
344 }
345 bs, err := json.Marshal(spSeg)
346 if err != nil {
347 errors.WriteHTTPInternalServerError(w, "unable to marhsal json", err)
348 return
349 }
350 if _, err := w.Write(bs); err != nil {
351 log.Error(ctx, "error writing response", "error", err)
352 }
353 })
354
355 router.DELETE("/player-events", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
356 err := a.Model.ClearPlayerEvents()
357 if err != nil {
358 errors.WriteHTTPInternalServerError(w, "unable to delete player events", err)
359 return
360 }
361 w.WriteHeader(204)
362 })
363
364 router.GET("/settings", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
365 w.Header().Set("Access-Control-Allow-Origin", "*")
366 w.Header().Set("Access-Control-Allow-Methods", "GET")
367 w.Header().Set("Access-Control-Allow-Headers", "Content-Type")
368
369 id := a.Signer.Hex()
370
371 ident, err := a.Model.GetIdentity(id)
372 if err != nil {
373 errors.WriteHTTPInternalServerError(w, "unable to get settings", err)
374 return
375 }
376
377 bs, err := json.Marshal(ident)
378 if err != nil {
379 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err)
380 return
381 }
382 if _, err := w.Write(bs); err != nil {
383 log.Error(ctx, "error writing response", "error", err)
384 }
385 })
386
387 router.GET("/followers/:user", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
388 user := p.ByName("user")
389 if user == "" {
390 errors.WriteHTTPBadRequest(w, "user required", nil)
391 return
392 }
393
394 followers, err := a.Model.GetUserFollowers(ctx, user)
395 if err != nil {
396 errors.WriteHTTPInternalServerError(w, "unable to get followers", err)
397 return
398 }
399 bs, err := json.Marshal(followers)
400 if err != nil {
401 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err)
402 return
403 }
404 if _, err := w.Write(bs); err != nil {
405 log.Error(ctx, "error writing response", "error", err)
406 }
407 })
408
409 router.GET("/following/:user", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
410 user := p.ByName("user")
411 if user == "" {
412 errors.WriteHTTPBadRequest(w, "user required", nil)
413 return
414 }
415
416 followers, err := a.Model.GetUserFollowing(ctx, user)
417 if err != nil {
418 errors.WriteHTTPInternalServerError(w, "unable to get followers", err)
419 return
420 }
421 bs, err := json.Marshal(followers)
422 if err != nil {
423 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err)
424 return
425 }
426 if _, err := w.Write(bs); err != nil {
427 log.Error(ctx, "error writing response", "error", err)
428 }
429 })
430
431 router.GET("/notifications", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
432 notifications, err := a.Model.ListNotifications()
433 if err != nil {
434 errors.WriteHTTPInternalServerError(w, "unable to get notifications", err)
435 return
436 }
437 bs, err := json.Marshal(notifications)
438 if err != nil {
439 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err)
440 return
441 }
442 if _, err := w.Write(bs); err != nil {
443 log.Error(ctx, "error writing response", "error", err)
444 }
445 })
446
447 router.GET("/chat-posts", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
448 posts, err := a.Model.ListFeedPosts()
449 if err != nil {
450 errors.WriteHTTPInternalServerError(w, "unable to get chat posts", err)
451 return
452 }
453 bs, err := json.Marshal(posts)
454 if err != nil {
455 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err)
456 return
457 }
458 if _, err := w.Write(bs); err != nil {
459 log.Error(ctx, "error writing response", "error", err)
460 }
461 })
462
463 router.GET("/chat/:cid", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
464 cid := p.ByName("cid")
465 if cid == "" {
466 errors.WriteHTTPBadRequest(w, "cid required", nil)
467 return
468 }
469 msg, err := a.Model.GetChatMessage(cid)
470 if err != nil {
471 errors.WriteHTTPInternalServerError(w, "unable to get chat posts", err)
472 return
473 }
474 spmsg, err := msg.ToStreamplaceMessageView()
475 if err != nil {
476 errors.WriteHTTPInternalServerError(w, "unable to convert chat message to streamplace message view", err)
477 return
478 }
479 bs, err := json.Marshal(spmsg)
480 if err != nil {
481 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err)
482 return
483 }
484 if _, err := w.Write(bs); err != nil {
485 log.Error(ctx, "error writing response", "error", err)
486 }
487 })
488
489 router.GET("/oauth-sessions", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
490 sessions, err := a.Model.ListOAuthSessions()
491 if err != nil {
492 errors.WriteHTTPInternalServerError(w, "unable to get oauth sessions", err)
493 return
494 }
495 bs, err := json.Marshal(sessions)
496 if err != nil {
497 errors.WriteHTTPInternalServerError(w, "unable to marshal oauth sessions", err)
498 return
499 }
500 if _, err := w.Write(bs); err != nil {
501 log.Error(ctx, "error writing response", "error", err)
502 }
503 })
504
505 router.POST("/notification-blast", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
506 var payload notificationpkg.NotificationBlast
507 if err := json.NewDecoder(r.Body).Decode(&payload); err != nil {
508 errors.WriteHTTPBadRequest(w, "invalid request body", err)
509 return
510 }
511 notifications, err := a.Model.ListNotifications()
512 if err != nil {
513 errors.WriteHTTPInternalServerError(w, "unable to get notifications", err)
514 return
515 }
516 if a.FirebaseNotifier == nil {
517 errors.WriteHTTPInternalServerError(w, "firebase notifier not initialized", nil)
518 return
519 }
520 tokens := []string{}
521 for _, not := range notifications {
522 tokens = append(tokens, not.Token)
523 }
524 err = a.FirebaseNotifier.Blast(ctx, tokens, &payload)
525 if err != nil {
526 errors.WriteHTTPInternalServerError(w, "unable to blast notifications", err)
527 return
528 }
529 w.WriteHeader(http.StatusNoContent)
530 })
531
532 router.PUT("/settings/:id", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
533 w.Header().Set("Access-Control-Allow-Origin", "*")
534 w.Header().Set("Access-Control-Allow-Methods", "PUT")
535 w.Header().Set("Access-Control-Allow-Headers", "Content-Type")
536
537 id := p.ByName("id")
538 if id == "" {
539 errors.WriteHTTPBadRequest(w, "id required", nil)
540 return
541 }
542
543 var ident model.Identity
544 if err := json.NewDecoder(r.Body).Decode(&ident); err != nil {
545 errors.WriteHTTPBadRequest(w, "invalid request body", err)
546 return
547 }
548 ident.ID = id
549
550 if err := a.Model.UpdateIdentity(&ident); err != nil {
551 errors.WriteHTTPInternalServerError(w, "unable to update settings", err)
552 return
553 }
554
555 w.WriteHeader(http.StatusNoContent)
556 })
557
558 router.POST("/replay/:streamKey", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
559 key := p.ByName("streamKey")
560 if key == "" {
561 errors.WriteHTTPBadRequest(w, "streamKey required", nil)
562 return
563 }
564 mediaSigner, err := a.MakeMediaSigner(ctx, key)
565 if err != nil {
566 errors.WriteHTTPUnauthorized(w, "invalid authorization key", err)
567 return
568 }
569 pc, err := rtcrec.NewReplayPeerConnection(ctx, r.Body)
570 if err != nil {
571 errors.WriteHTTPInternalServerError(w, "unable to create replay peer connection", err)
572 return
573 }
574 answer, err := a.MediaManager.WebRTCIngest(ctx, &webrtc.SessionDescription{SDP: "placeholder"}, mediaSigner, pc, make(chan struct{}))
575 if err != nil {
576 errors.WriteHTTPInternalServerError(w, "unable to ingest web rtc", err)
577 return
578 }
579 w.WriteHeader(200)
580 if _, err := w.Write([]byte(answer.SDP)); err != nil {
581 errors.WriteHTTPInternalServerError(w, "unable to write response", err)
582 log.Error(ctx, "error writing response", "error", err)
583 }
584 })
585
586 handler := sloghttp.Recovery(router)
587 if log.Level(4) {
588 handler = sloghttp.New(slog.Default())(handler)
589 }
590 return handler, nil
591}
592
593func (a *StreamplaceAPI) keyToUser(ctx context.Context, key string) (string, error) {
594 payload, err := base64.URLEncoding.DecodeString(key)
595 if err != nil {
596 return "", err
597 }
598 signed, err := a.Signer.Verify(payload)
599 if err != nil {
600 return "", err
601 }
602 _, ok := signed.Data().(*v0.StreamKey)
603 if !ok {
604 return "", fmt.Errorf("got signed data but it wasn't a stream key")
605 }
606 return strings.ToLower(signed.Signer()), nil
607}