Live video on the AT Protocol
1package api
2
3import (
4 "context"
5 "encoding/base64"
6 "encoding/json"
7 "fmt"
8 "io"
9 "log/slog"
10 "net/http"
11 "net/http/pprof"
12 "os"
13 "path/filepath"
14 "regexp"
15 "runtime"
16 rtpprof "runtime/pprof"
17 "strconv"
18 "strings"
19 "time"
20
21 "github.com/juju/ratelimit"
22 "github.com/julienschmidt/httprouter"
23 "github.com/pion/webrtc/v4"
24 "github.com/prometheus/client_golang/prometheus/promhttp"
25 sloghttp "github.com/samber/slog-http"
26 "stream.place/streamplace/pkg/errors"
27 "stream.place/streamplace/pkg/log"
28 "stream.place/streamplace/pkg/media"
29 "stream.place/streamplace/pkg/mist/mistconfig"
30 "stream.place/streamplace/pkg/mist/misttriggers"
31 "stream.place/streamplace/pkg/model"
32 notificationpkg "stream.place/streamplace/pkg/notifications"
33 "stream.place/streamplace/pkg/rtcrec"
34 v0 "stream.place/streamplace/pkg/schema/v0"
35)
36
37func (a *StreamplaceAPI) ServeInternalHTTP(ctx context.Context) error {
38 handler, err := a.InternalHandler(ctx)
39 if err != nil {
40 return err
41 }
42 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error {
43 s.Addr = a.CLI.HTTPInternalAddr
44 log.Log(ctx, "http server starting", "addr", s.Addr)
45 return s.ListenAndServe()
46 })
47}
48
49// lightweight way to authenticate push requests to ourself
50var mkvRE *regexp.Regexp
51
52func init() {
53 mkvRE = regexp.MustCompile(`^\d+\.mkv$`)
54}
55
56func (a *StreamplaceAPI) InternalHandler(ctx context.Context) (http.Handler, error) {
57 router := httprouter.New()
58 broker := misttriggers.NewTriggerBroker()
59
60 broker.OnPushRewrite(func(ctx context.Context, payload *misttriggers.PushRewritePayload) (string, error) {
61 log.Log(ctx, "got push out start", "streamName", payload.StreamName, "url", payload.URL.String())
62 // Extract the last part of the URL path
63 urlPath := payload.URL.Path
64 parts := strings.Split(urlPath, "/")
65 lastPart := ""
66 if len(parts) > 0 {
67 lastPart = parts[len(parts)-1]
68 }
69 mediaSigner, err := a.MakeMediaSigner(ctx, lastPart)
70 if err != nil {
71 return "", err
72 }
73
74 ms := time.Now().UnixMilli()
75 out := fmt.Sprintf("%s+%s_%d", mistconfig.StreamName, mediaSigner.Streamer(), ms)
76 a.SignerCacheMu.Lock()
77 a.SignerCache[mediaSigner.Streamer()] = mediaSigner
78 a.SignerCacheMu.Unlock()
79 log.Log(ctx, "added key to cache", "mist-stream", out, "streamer", mediaSigner.Streamer())
80
81 return out, nil
82 })
83 triggerCollection := misttriggers.NewMistCallbackHandlersCollection(a.CLI, broker)
84 router.POST("/mist-trigger", triggerCollection.Trigger())
85 router.HandlerFunc("GET", "/healthz", a.HandleHealthz(ctx))
86
87 // Add pprof handlers
88 router.HandlerFunc("GET", "/debug/pprof/", pprof.Index)
89 router.HandlerFunc("GET", "/debug/pprof/cmdline", pprof.Cmdline)
90 router.HandlerFunc("GET", "/debug/pprof/profile", pprof.Profile)
91 router.HandlerFunc("GET", "/debug/pprof/symbol", pprof.Symbol)
92 router.HandlerFunc("GET", "/debug/pprof/trace", pprof.Trace)
93 router.Handler("GET", "/debug/pprof/goroutine", pprof.Handler("goroutine"))
94 router.Handler("GET", "/debug/pprof/heap", pprof.Handler("heap"))
95 router.Handler("GET", "/debug/pprof/threadcreate", pprof.Handler("threadcreate"))
96 router.Handler("GET", "/debug/pprof/block", pprof.Handler("block"))
97 router.Handler("GET", "/debug/pprof/allocs", pprof.Handler("allocs"))
98 router.Handler("GET", "/debug/pprof/mutex", pprof.Handler("mutex"))
99
100 router.POST("/gc", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
101 runtime.GC()
102 w.WriteHeader(204)
103 })
104
105 router.Handler("GET", "/metrics", promhttp.Handler())
106
107 router.GET("/playback/:user/:rendition/concat", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
108 user := p.ByName("user")
109 if user == "" {
110 errors.WriteHTTPBadRequest(w, "user required", nil)
111 return
112 }
113 rendition := p.ByName("rendition")
114 if rendition == "" {
115 errors.WriteHTTPBadRequest(w, "rendition required", nil)
116 return
117 }
118 user, err := a.NormalizeUser(ctx, user)
119 if err != nil {
120 errors.WriteHTTPBadRequest(w, "invalid user", err)
121 return
122 }
123 w.Header().Set("content-type", "text/plain")
124 fmt.Fprintf(w, "ffconcat version 1.0\n")
125 // intermittent reports that you need two here to make things work properly? shouldn't matter.
126 for i := 0; i < 2; i += 1 {
127 fmt.Fprintf(w, "file '%s/playback/%s/%s/latest.mp4'\n", a.CLI.OwnInternalURL(), user, rendition)
128 }
129 })
130
131 router.GET("/playback/:user/:rendition/latest.mp4", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
132 user := p.ByName("user")
133 if user == "" {
134 errors.WriteHTTPBadRequest(w, "user required", nil)
135 return
136 }
137 user, err := a.NormalizeUser(ctx, user)
138 if err != nil {
139 errors.WriteHTTPBadRequest(w, "invalid user", err)
140 return
141 }
142 rendition := p.ByName("rendition")
143 if rendition == "" {
144 errors.WriteHTTPBadRequest(w, "rendition required", nil)
145 return
146 }
147 segChan := a.Bus.SubscribeSegment(ctx, user, rendition)
148 defer a.Bus.UnsubscribeSegment(ctx, user, rendition, segChan)
149 seg := <-segChan.C
150 base := filepath.Base(seg.Filepath)
151 w.Header().Set("Location", fmt.Sprintf("%s/playback/%s/%s/segment/%s\n", a.CLI.OwnInternalURL(), user, rendition, base))
152 w.WriteHeader(301)
153 })
154
155 router.GET("/playback/:user/:rendition/segment/:file", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
156 user := p.ByName("user")
157 if user == "" {
158 errors.WriteHTTPBadRequest(w, "user required", nil)
159 return
160 }
161 user, err := a.NormalizeUser(ctx, user)
162 if err != nil {
163 errors.WriteHTTPBadRequest(w, "invalid user", err)
164 return
165 }
166 file := p.ByName("file")
167 if file == "" {
168 errors.WriteHTTPBadRequest(w, "file required", nil)
169 return
170 }
171 fullpath, err := a.CLI.SegmentFilePath(user, file)
172 if err != nil {
173 errors.WriteHTTPBadRequest(w, "badly formatted request", err)
174 return
175 }
176 http.ServeFile(w, r, fullpath)
177 })
178
179 router.HEAD("/playback/:user/:rendition/stream.mkv", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
180 user := p.ByName("user")
181 if user == "" {
182 errors.WriteHTTPBadRequest(w, "user required", nil)
183 return
184 }
185 w.Header().Set("Content-Type", "video/x-matroska")
186 w.Header().Set("Transfer-Encoding", "chunked")
187 w.WriteHeader(200)
188 })
189
190 router.POST("/http-pipe/:uuid", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
191 uu := p.ByName("uuid")
192 if uu == "" {
193 errors.WriteHTTPBadRequest(w, "uuid required", nil)
194 return
195 }
196 pr := a.MediaManager.GetHTTPPipeWriter(uu)
197 if pr == nil {
198 errors.WriteHTTPNotFound(w, "http-pipe not found", nil)
199 return
200 }
201 if _, err := io.Copy(pr, r.Body); err != nil {
202 errors.WriteHTTPInternalServerError(w, "failed to copy response", nil)
203 }
204 })
205
206 // self-destruct code, useful for dumping goroutines on windows
207 router.POST("/abort", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
208 if err := rtpprof.Lookup("goroutine").WriteTo(os.Stderr, 2); err != nil {
209 log.Log(ctx, "error writing rtpprof", "error", err)
210 }
211 log.Log(ctx, "got POST /abort, self-destructing")
212 os.Exit(1)
213 })
214
215 handleIncomingStream := func(w http.ResponseWriter, httpReq *http.Request, p httprouter.Params) {
216 var r io.Reader = httpReq.Body
217 key := p.ByName("key")
218 limitStr := httpReq.URL.Query().Get("ratelimit")
219 if limitStr != "" {
220 limit, err := strconv.Atoi(limitStr)
221 if err != nil {
222 errors.WriteHTTPBadRequest(w, "invalid ratelimit", err)
223 return
224 }
225 bucket := ratelimit.NewBucketWithRate(float64(limit), int64(limit)) // 2 Mbps
226 r = ratelimit.Reader(r, bucket)
227 }
228 log.Log(ctx, "stream start")
229
230 var mediaSigner media.MediaSigner
231 var ok bool
232 var err error
233 parts := strings.Split(key, "_")
234
235 if len(parts) == 2 {
236 a.SignerCacheMu.Lock()
237 mediaSigner, ok = a.SignerCache[parts[0]]
238 a.SignerCacheMu.Unlock()
239 if !ok {
240 log.Error(ctx, "couldn't find key in cache", "part", parts[0], "key", key)
241 errors.WriteHTTPUnauthorized(w, "invalid authorization key", nil)
242 return
243 }
244 } else {
245 mediaSigner, err = a.MakeMediaSigner(ctx, key)
246 if err != nil {
247 errors.WriteHTTPUnauthorized(w, "invalid authorization key", err)
248 return
249 }
250 }
251
252 ctx = log.WithLogValues(ctx, "streamer", mediaSigner.Streamer())
253
254 err = a.checkBanned(ctx, mediaSigner.Streamer())
255 if err != nil {
256 errors.WriteHTTPUnauthorized(w, err.Error(), err)
257 return
258 }
259
260 err = a.MediaManager.MKVIngest(context.Background(), r, mediaSigner)
261
262 if err != nil {
263 log.Log(ctx, "stream error", "error", err)
264 errors.WriteHTTPInternalServerError(w, "stream error", err)
265 return
266 }
267 log.Log(ctx, "stream success", "url", httpReq.URL.String())
268 }
269
270 // route to accept an incoming mkv stream from OBS, segment it, and push the segments back to this HTTP handler
271 router.POST("/live/:key", handleIncomingStream)
272 router.PUT("/live/:key", handleIncomingStream)
273
274 router.GET("/player-report/:id", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
275 id := p.ByName("id")
276 if id == "" {
277 errors.WriteHTTPBadRequest(w, "id required", nil)
278 return
279 }
280 events, err := a.Model.PlayerReport(id)
281 if err != nil {
282 errors.WriteHTTPBadRequest(w, err.Error(), err)
283 return
284 }
285 bs, err := json.Marshal(events)
286 if err != nil {
287 errors.WriteHTTPInternalServerError(w, "unable to marhsal json", err)
288 return
289 }
290 if _, err := w.Write(bs); err != nil {
291 log.Error(ctx, "error writing response", "error", err)
292 }
293 })
294
295 router.GET("/segment/:id", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
296 id := p.ByName("id")
297 if id == "" {
298 errors.WriteHTTPBadRequest(w, "id required", nil)
299 return
300 }
301 segment, err := a.LocalDB.GetSegment(id)
302 if err != nil {
303 errors.WriteHTTPBadRequest(w, err.Error(), err)
304 return
305 }
306 if segment == nil {
307 errors.WriteHTTPNotFound(w, "segment not found", nil)
308 return
309 }
310 spSeg, err := segment.ToStreamplaceSegment()
311 if err != nil {
312 errors.WriteHTTPInternalServerError(w, "unable to convert segment to streamplace segment", err)
313 return
314 }
315 bs, err := json.Marshal(spSeg)
316 if err != nil {
317 errors.WriteHTTPInternalServerError(w, "unable to marhsal json", err)
318 return
319 }
320 if _, err := w.Write(bs); err != nil {
321 log.Error(ctx, "error writing response", "error", err)
322 }
323 })
324
325 router.DELETE("/player-events", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
326 err := a.Model.ClearPlayerEvents()
327 if err != nil {
328 errors.WriteHTTPInternalServerError(w, "unable to delete player events", err)
329 return
330 }
331 w.WriteHeader(204)
332 })
333
334 router.GET("/followers/:user", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
335 user := p.ByName("user")
336 if user == "" {
337 errors.WriteHTTPBadRequest(w, "user required", nil)
338 return
339 }
340
341 followers, err := a.Model.GetUserFollowers(ctx, user)
342 if err != nil {
343 errors.WriteHTTPInternalServerError(w, "unable to get followers", err)
344 return
345 }
346 bs, err := json.Marshal(followers)
347 if err != nil {
348 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err)
349 return
350 }
351 if _, err := w.Write(bs); err != nil {
352 log.Error(ctx, "error writing response", "error", err)
353 }
354 })
355
356 router.GET("/following/:user", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
357 user := p.ByName("user")
358 if user == "" {
359 errors.WriteHTTPBadRequest(w, "user required", nil)
360 return
361 }
362
363 followers, err := a.Model.GetUserFollowing(ctx, user)
364 if err != nil {
365 errors.WriteHTTPInternalServerError(w, "unable to get followers", err)
366 return
367 }
368 bs, err := json.Marshal(followers)
369 if err != nil {
370 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err)
371 return
372 }
373 if _, err := w.Write(bs); err != nil {
374 log.Error(ctx, "error writing response", "error", err)
375 }
376 })
377
378 router.GET("/notifications", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
379 notifications, err := a.StatefulDB.ListNotifications()
380 if err != nil {
381 errors.WriteHTTPInternalServerError(w, "unable to get notifications", err)
382 return
383 }
384 bs, err := json.Marshal(notifications)
385 if err != nil {
386 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err)
387 return
388 }
389 if _, err := w.Write(bs); err != nil {
390 log.Error(ctx, "error writing response", "error", err)
391 }
392 })
393
394 router.GET("/chat-posts", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
395 posts, err := a.Model.ListFeedPosts()
396 if err != nil {
397 errors.WriteHTTPInternalServerError(w, "unable to get chat posts", err)
398 return
399 }
400 bs, err := json.Marshal(posts)
401 if err != nil {
402 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err)
403 return
404 }
405 if _, err := w.Write(bs); err != nil {
406 log.Error(ctx, "error writing response", "error", err)
407 }
408 })
409
410 router.GET("/chat/:uri", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
411 uri := p.ByName("uri")
412 if uri == "" {
413 errors.WriteHTTPBadRequest(w, "uri required", nil)
414 return
415 }
416 msg, err := a.Model.GetChatMessage(uri)
417 if err != nil {
418 errors.WriteHTTPInternalServerError(w, "unable to get chat posts", err)
419 return
420 }
421 spmsg, err := msg.ToStreamplaceMessageView()
422 if err != nil {
423 errors.WriteHTTPInternalServerError(w, "unable to convert chat message to streamplace message view", err)
424 return
425 }
426 bs, err := json.Marshal(spmsg)
427 if err != nil {
428 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err)
429 return
430 }
431 if _, err := w.Write(bs); err != nil {
432 log.Error(ctx, "error writing response", "error", err)
433 }
434 })
435
436 router.GET("/oauth-sessions", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
437 sessions, err := a.StatefulDB.ListOAuthSessions()
438 if err != nil {
439 errors.WriteHTTPInternalServerError(w, "unable to get oauth sessions", err)
440 return
441 }
442 bs, err := json.Marshal(sessions)
443 if err != nil {
444 errors.WriteHTTPInternalServerError(w, "unable to marshal oauth sessions", err)
445 return
446 }
447 if _, err := w.Write(bs); err != nil {
448 log.Error(ctx, "error writing response", "error", err)
449 }
450 })
451
452 router.POST("/notification-blast", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
453 var payload notificationpkg.NotificationBlast
454 if err := json.NewDecoder(r.Body).Decode(&payload); err != nil {
455 errors.WriteHTTPBadRequest(w, "invalid request body", err)
456 return
457 }
458 notifications, err := a.StatefulDB.ListNotifications()
459 if err != nil {
460 errors.WriteHTTPInternalServerError(w, "unable to get notifications", err)
461 return
462 }
463 if a.FirebaseNotifier == nil {
464 errors.WriteHTTPInternalServerError(w, "firebase notifier not initialized", nil)
465 return
466 }
467 tokens := []string{}
468 for _, not := range notifications {
469 tokens = append(tokens, not.Token)
470 }
471 err = a.FirebaseNotifier.Blast(ctx, tokens, &payload)
472 if err != nil {
473 errors.WriteHTTPInternalServerError(w, "unable to blast notifications", err)
474 return
475 }
476 w.WriteHeader(http.StatusNoContent)
477 })
478
479 router.PUT("/settings/:id", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
480 w.Header().Set("Access-Control-Allow-Origin", "*")
481 w.Header().Set("Access-Control-Allow-Methods", "PUT")
482 w.Header().Set("Access-Control-Allow-Headers", "Content-Type")
483
484 id := p.ByName("id")
485 if id == "" {
486 errors.WriteHTTPBadRequest(w, "id required", nil)
487 return
488 }
489
490 var ident model.Identity
491 if err := json.NewDecoder(r.Body).Decode(&ident); err != nil {
492 errors.WriteHTTPBadRequest(w, "invalid request body", err)
493 return
494 }
495 ident.ID = id
496
497 if err := a.Model.UpdateIdentity(&ident); err != nil {
498 errors.WriteHTTPInternalServerError(w, "unable to update settings", err)
499 return
500 }
501
502 w.WriteHeader(http.StatusNoContent)
503 })
504
505 router.POST("/replay/:streamKey", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
506 key := p.ByName("streamKey")
507 if key == "" {
508 errors.WriteHTTPBadRequest(w, "streamKey required", nil)
509 return
510 }
511 mediaSigner, err := a.MakeMediaSigner(ctx, key)
512 if err != nil {
513 errors.WriteHTTPUnauthorized(w, "invalid authorization key", err)
514 return
515 }
516 pc, err := rtcrec.NewReplayPeerConnection(ctx, r.Body)
517 if err != nil {
518 errors.WriteHTTPInternalServerError(w, "unable to create replay peer connection", err)
519 return
520 }
521 answer, err := a.MediaManager.WebRTCIngest(ctx, &webrtc.SessionDescription{SDP: "placeholder"}, mediaSigner, pc, make(chan error, 1))
522 if err != nil {
523 errors.WriteHTTPInternalServerError(w, "unable to ingest web rtc", err)
524 return
525 }
526 w.WriteHeader(200)
527 if _, err := w.Write([]byte(answer.SDP)); err != nil {
528 errors.WriteHTTPInternalServerError(w, "unable to write response", err)
529 log.Error(ctx, "error writing response", "error", err)
530 }
531 })
532
533 router.GET("/clip/:did/clip.mp4", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
534 did := p.ByName("did")
535 if did == "" {
536 errors.WriteHTTPBadRequest(w, "did required", nil)
537 return
538 }
539 user, err := a.NormalizeUser(ctx, did)
540 if err != nil {
541 errors.WriteHTTPBadRequest(w, "invalid user", err)
542 return
543 }
544 secsStr := r.URL.Query().Get("secs")
545 secs := 60 // Default to 60 seconds
546 if secsStr != "" {
547 parsedSecs, err := strconv.Atoi(secsStr)
548 if err != nil {
549 errors.WriteHTTPBadRequest(w, "invalid secs parameter", err)
550 return
551 }
552 secs = parsedSecs
553 }
554 after := time.Now().Add(-time.Duration(secs) * time.Second)
555 w.Header().Set("Content-Type", "video/mp4")
556 err = media.ClipUser(ctx, a.LocalDB, a.CLI, user, w, nil, &after)
557 if err != nil {
558 errors.WriteHTTPInternalServerError(w, "unable to clip user", err)
559 return
560 }
561 })
562
563 handler := sloghttp.Recovery(router)
564 if log.Level(4) {
565 handler = sloghttp.New(slog.Default())(handler)
566 }
567 return handler, nil
568}
569
570func (a *StreamplaceAPI) keyToUser(ctx context.Context, key string) (string, error) {
571 payload, err := base64.URLEncoding.DecodeString(key)
572 if err != nil {
573 return "", err
574 }
575 signed, err := a.Signer.Verify(payload)
576 if err != nil {
577 return "", err
578 }
579 _, ok := signed.Data().(*v0.StreamKey)
580 if !ok {
581 return "", fmt.Errorf("got signed data but it wasn't a stream key")
582 }
583 return strings.ToLower(signed.Signer()), nil
584}