Live video on the AT Protocol
1package api
2
3import (
4 "context"
5 "encoding/base64"
6 "encoding/json"
7 "fmt"
8 "io"
9 "log/slog"
10 "net/http"
11 "net/http/pprof"
12 "os"
13 "path/filepath"
14 "regexp"
15 "runtime"
16 rtpprof "runtime/pprof"
17 "strconv"
18 "strings"
19 "time"
20
21 "github.com/julienschmidt/httprouter"
22 "github.com/pion/webrtc/v4"
23 "github.com/prometheus/client_golang/prometheus/promhttp"
24 sloghttp "github.com/samber/slog-http"
25 "stream.place/streamplace/pkg/errors"
26 "stream.place/streamplace/pkg/log"
27 "stream.place/streamplace/pkg/media"
28 "stream.place/streamplace/pkg/mist/mistconfig"
29 "stream.place/streamplace/pkg/mist/misttriggers"
30 "stream.place/streamplace/pkg/model"
31 notificationpkg "stream.place/streamplace/pkg/notifications"
32 "stream.place/streamplace/pkg/rtcrec"
33 v0 "stream.place/streamplace/pkg/schema/v0"
34)
35
36func (a *StreamplaceAPI) ServeInternalHTTP(ctx context.Context) error {
37 handler, err := a.InternalHandler(ctx)
38 if err != nil {
39 return err
40 }
41 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error {
42 s.Addr = a.CLI.HTTPInternalAddr
43 log.Log(ctx, "http server starting", "addr", s.Addr)
44 return s.ListenAndServe()
45 })
46}
47
// mkvRE matches a bare file name consisting of one or more decimal digits
// followed by the literal extension ".mkv" (for example "1700000000000.mkv").
// It is compiled once at package initialization.
//
// NOTE(review): this variable was previously commented as a "lightweight way
// to authenticate push requests to ourself"; how it is used for that is not
// visible here — confirm against the callers.
var mkvRE = regexp.MustCompile(`^\d+\.mkv$`)
54
55func (a *StreamplaceAPI) InternalHandler(ctx context.Context) (http.Handler, error) {
56 router := httprouter.New()
57 broker := misttriggers.NewTriggerBroker()
58
59 broker.OnPushRewrite(func(ctx context.Context, payload *misttriggers.PushRewritePayload) (string, error) {
60 log.Log(ctx, "got push out start", "streamName", payload.StreamName, "url", payload.URL.String())
61 // Extract the last part of the URL path
62 urlPath := payload.URL.Path
63 parts := strings.Split(urlPath, "/")
64 lastPart := ""
65 if len(parts) > 0 {
66 lastPart = parts[len(parts)-1]
67 }
68 mediaSigner, err := a.MakeMediaSigner(ctx, lastPart)
69 if err != nil {
70 return "", err
71 }
72
73 ms := time.Now().UnixMilli()
74 out := fmt.Sprintf("%s+%s_%d", mistconfig.StreamName, mediaSigner.Streamer(), ms)
75 a.SignerCacheMu.Lock()
76 a.SignerCache[mediaSigner.Streamer()] = mediaSigner
77 a.SignerCacheMu.Unlock()
78 log.Log(ctx, "added key to cache", "mist-stream", out, "streamer", mediaSigner.Streamer())
79
80 return out, nil
81 })
82 triggerCollection := misttriggers.NewMistCallbackHandlersCollection(a.CLI, broker)
83 router.POST("/mist-trigger", triggerCollection.Trigger())
84 router.HandlerFunc("GET", "/healthz", a.HandleHealthz(ctx))
85
86 // Add pprof handlers
87 router.HandlerFunc("GET", "/debug/pprof/", pprof.Index)
88 router.HandlerFunc("GET", "/debug/pprof/cmdline", pprof.Cmdline)
89 router.HandlerFunc("GET", "/debug/pprof/profile", pprof.Profile)
90 router.HandlerFunc("GET", "/debug/pprof/symbol", pprof.Symbol)
91 router.HandlerFunc("GET", "/debug/pprof/trace", pprof.Trace)
92 router.Handler("GET", "/debug/pprof/goroutine", pprof.Handler("goroutine"))
93 router.Handler("GET", "/debug/pprof/heap", pprof.Handler("heap"))
94 router.Handler("GET", "/debug/pprof/threadcreate", pprof.Handler("threadcreate"))
95 router.Handler("GET", "/debug/pprof/block", pprof.Handler("block"))
96 router.Handler("GET", "/debug/pprof/allocs", pprof.Handler("allocs"))
97 router.Handler("GET", "/debug/pprof/mutex", pprof.Handler("mutex"))
98
99 router.POST("/gc", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
100 runtime.GC()
101 w.WriteHeader(204)
102 })
103
104 router.Handler("GET", "/metrics", promhttp.Handler())
105
106 router.GET("/playback/:user/:rendition/concat", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
107 user := p.ByName("user")
108 if user == "" {
109 errors.WriteHTTPBadRequest(w, "user required", nil)
110 return
111 }
112 rendition := p.ByName("rendition")
113 if rendition == "" {
114 errors.WriteHTTPBadRequest(w, "rendition required", nil)
115 return
116 }
117 user, err := a.NormalizeUser(ctx, user)
118 if err != nil {
119 errors.WriteHTTPBadRequest(w, "invalid user", err)
120 return
121 }
122 w.Header().Set("content-type", "text/plain")
123 fmt.Fprintf(w, "ffconcat version 1.0\n")
124 // intermittent reports that you need two here to make things work properly? shouldn't matter.
125 for i := 0; i < 2; i += 1 {
126 fmt.Fprintf(w, "file '%s/playback/%s/%s/latest.mp4'\n", a.CLI.OwnInternalURL(), user, rendition)
127 }
128 })
129
130 router.GET("/playback/:user/:rendition/latest.mp4", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
131 user := p.ByName("user")
132 if user == "" {
133 errors.WriteHTTPBadRequest(w, "user required", nil)
134 return
135 }
136 user, err := a.NormalizeUser(ctx, user)
137 if err != nil {
138 errors.WriteHTTPBadRequest(w, "invalid user", err)
139 return
140 }
141 rendition := p.ByName("rendition")
142 if rendition == "" {
143 errors.WriteHTTPBadRequest(w, "rendition required", nil)
144 return
145 }
146 segChan := a.Bus.SubscribeSegment(ctx, user, rendition)
147 defer a.Bus.UnsubscribeSegment(ctx, user, rendition, segChan)
148 seg := <-segChan.C
149 base := filepath.Base(seg.Filepath)
150 w.Header().Set("Location", fmt.Sprintf("%s/playback/%s/%s/segment/%s\n", a.CLI.OwnInternalURL(), user, rendition, base))
151 w.WriteHeader(301)
152 })
153
154 router.GET("/playback/:user/:rendition/segment/:file", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
155 user := p.ByName("user")
156 if user == "" {
157 errors.WriteHTTPBadRequest(w, "user required", nil)
158 return
159 }
160 user, err := a.NormalizeUser(ctx, user)
161 if err != nil {
162 errors.WriteHTTPBadRequest(w, "invalid user", err)
163 return
164 }
165 file := p.ByName("file")
166 if file == "" {
167 errors.WriteHTTPBadRequest(w, "file required", nil)
168 return
169 }
170 fullpath, err := a.CLI.SegmentFilePath(user, file)
171 if err != nil {
172 errors.WriteHTTPBadRequest(w, "badly formatted request", err)
173 return
174 }
175 http.ServeFile(w, r, fullpath)
176 })
177
178 router.HEAD("/playback/:user/:rendition/stream.mkv", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
179 user := p.ByName("user")
180 if user == "" {
181 errors.WriteHTTPBadRequest(w, "user required", nil)
182 return
183 }
184 w.Header().Set("Content-Type", "video/x-matroska")
185 w.Header().Set("Transfer-Encoding", "chunked")
186 w.WriteHeader(200)
187 })
188
189 router.POST("/http-pipe/:uuid", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
190 uu := p.ByName("uuid")
191 if uu == "" {
192 errors.WriteHTTPBadRequest(w, "uuid required", nil)
193 return
194 }
195 pr := a.MediaManager.GetHTTPPipeWriter(uu)
196 if pr == nil {
197 errors.WriteHTTPNotFound(w, "http-pipe not found", nil)
198 return
199 }
200 if _, err := io.Copy(pr, r.Body); err != nil {
201 errors.WriteHTTPInternalServerError(w, "failed to copy response", nil)
202 }
203 })
204
205 // self-destruct code, useful for dumping goroutines on windows
206 router.POST("/abort", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
207 if err := rtpprof.Lookup("goroutine").WriteTo(os.Stderr, 2); err != nil {
208 log.Log(ctx, "error writing rtpprof", "error", err)
209 }
210 log.Log(ctx, "got POST /abort, self-destructing")
211 os.Exit(1)
212 })
213
214 handleIncomingStream := func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
215 key := p.ByName("key")
216 log.Log(ctx, "stream start")
217
218 var mediaSigner media.MediaSigner
219 var ok bool
220 var err error
221 parts := strings.Split(key, "_")
222
223 if len(parts) == 2 {
224 a.SignerCacheMu.Lock()
225 mediaSigner, ok = a.SignerCache[parts[0]]
226 a.SignerCacheMu.Unlock()
227 if !ok {
228 log.Error(ctx, "couldn't find key in cache", "part", parts[0], "key", key)
229 errors.WriteHTTPUnauthorized(w, "invalid authorization key", nil)
230 return
231 }
232 } else {
233 mediaSigner, err = a.MakeMediaSigner(ctx, key)
234 if err != nil {
235 errors.WriteHTTPUnauthorized(w, "invalid authorization key", err)
236 return
237 }
238 }
239
240 ctx = log.WithLogValues(ctx, "streamer", mediaSigner.Streamer())
241
242 err = a.checkBanned(ctx, mediaSigner.Streamer())
243 if err != nil {
244 errors.WriteHTTPUnauthorized(w, err.Error(), err)
245 return
246 }
247
248 err = a.MediaManager.MKVIngest(ctx, r.Body, mediaSigner)
249
250 if err != nil {
251 log.Log(ctx, "stream error", "error", err)
252 errors.WriteHTTPInternalServerError(w, "stream error", err)
253 return
254 }
255 log.Log(ctx, "stream success", "url", r.URL.String())
256 }
257
258 // route to accept an incoming mkv stream from OBS, segment it, and push the segments back to this HTTP handler
259 router.POST("/live/:key", handleIncomingStream)
260 router.PUT("/live/:key", handleIncomingStream)
261
262 router.GET("/player-report/:id", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
263 id := p.ByName("id")
264 if id == "" {
265 errors.WriteHTTPBadRequest(w, "id required", nil)
266 return
267 }
268 events, err := a.Model.PlayerReport(id)
269 if err != nil {
270 errors.WriteHTTPBadRequest(w, err.Error(), err)
271 return
272 }
273 bs, err := json.Marshal(events)
274 if err != nil {
275 errors.WriteHTTPInternalServerError(w, "unable to marhsal json", err)
276 return
277 }
278 if _, err := w.Write(bs); err != nil {
279 log.Error(ctx, "error writing response", "error", err)
280 }
281 })
282
283 router.GET("/segment/:id", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
284 id := p.ByName("id")
285 if id == "" {
286 errors.WriteHTTPBadRequest(w, "id required", nil)
287 return
288 }
289 segment, err := a.Model.GetSegment(id)
290 if err != nil {
291 errors.WriteHTTPBadRequest(w, err.Error(), err)
292 return
293 }
294 if segment == nil {
295 errors.WriteHTTPNotFound(w, "segment not found", nil)
296 return
297 }
298 spSeg, err := segment.ToStreamplaceSegment()
299 if err != nil {
300 errors.WriteHTTPInternalServerError(w, "unable to convert segment to streamplace segment", err)
301 return
302 }
303 bs, err := json.Marshal(spSeg)
304 if err != nil {
305 errors.WriteHTTPInternalServerError(w, "unable to marhsal json", err)
306 return
307 }
308 if _, err := w.Write(bs); err != nil {
309 log.Error(ctx, "error writing response", "error", err)
310 }
311 })
312
313 router.DELETE("/player-events", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
314 err := a.Model.ClearPlayerEvents()
315 if err != nil {
316 errors.WriteHTTPInternalServerError(w, "unable to delete player events", err)
317 return
318 }
319 w.WriteHeader(204)
320 })
321
322 router.GET("/settings", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
323 w.Header().Set("Access-Control-Allow-Origin", "*")
324 w.Header().Set("Access-Control-Allow-Methods", "GET")
325 w.Header().Set("Access-Control-Allow-Headers", "Content-Type")
326
327 id := a.Signer.Hex()
328
329 ident, err := a.Model.GetIdentity(id)
330 if err != nil {
331 errors.WriteHTTPInternalServerError(w, "unable to get settings", err)
332 return
333 }
334
335 bs, err := json.Marshal(ident)
336 if err != nil {
337 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err)
338 return
339 }
340 if _, err := w.Write(bs); err != nil {
341 log.Error(ctx, "error writing response", "error", err)
342 }
343 })
344
345 router.GET("/followers/:user", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
346 user := p.ByName("user")
347 if user == "" {
348 errors.WriteHTTPBadRequest(w, "user required", nil)
349 return
350 }
351
352 followers, err := a.Model.GetUserFollowers(ctx, user)
353 if err != nil {
354 errors.WriteHTTPInternalServerError(w, "unable to get followers", err)
355 return
356 }
357 bs, err := json.Marshal(followers)
358 if err != nil {
359 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err)
360 return
361 }
362 if _, err := w.Write(bs); err != nil {
363 log.Error(ctx, "error writing response", "error", err)
364 }
365 })
366
367 router.GET("/following/:user", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
368 user := p.ByName("user")
369 if user == "" {
370 errors.WriteHTTPBadRequest(w, "user required", nil)
371 return
372 }
373
374 followers, err := a.Model.GetUserFollowing(ctx, user)
375 if err != nil {
376 errors.WriteHTTPInternalServerError(w, "unable to get followers", err)
377 return
378 }
379 bs, err := json.Marshal(followers)
380 if err != nil {
381 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err)
382 return
383 }
384 if _, err := w.Write(bs); err != nil {
385 log.Error(ctx, "error writing response", "error", err)
386 }
387 })
388
389 router.GET("/notifications", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
390 notifications, err := a.StatefulDB.ListNotifications()
391 if err != nil {
392 errors.WriteHTTPInternalServerError(w, "unable to get notifications", err)
393 return
394 }
395 bs, err := json.Marshal(notifications)
396 if err != nil {
397 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err)
398 return
399 }
400 if _, err := w.Write(bs); err != nil {
401 log.Error(ctx, "error writing response", "error", err)
402 }
403 })
404
405 router.GET("/chat-posts", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
406 posts, err := a.Model.ListFeedPosts()
407 if err != nil {
408 errors.WriteHTTPInternalServerError(w, "unable to get chat posts", err)
409 return
410 }
411 bs, err := json.Marshal(posts)
412 if err != nil {
413 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err)
414 return
415 }
416 if _, err := w.Write(bs); err != nil {
417 log.Error(ctx, "error writing response", "error", err)
418 }
419 })
420
421 router.GET("/chat/:uri", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
422 uri := p.ByName("uri")
423 if uri == "" {
424 errors.WriteHTTPBadRequest(w, "uri required", nil)
425 return
426 }
427 msg, err := a.Model.GetChatMessage(uri)
428 if err != nil {
429 errors.WriteHTTPInternalServerError(w, "unable to get chat posts", err)
430 return
431 }
432 spmsg, err := msg.ToStreamplaceMessageView()
433 if err != nil {
434 errors.WriteHTTPInternalServerError(w, "unable to convert chat message to streamplace message view", err)
435 return
436 }
437 bs, err := json.Marshal(spmsg)
438 if err != nil {
439 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err)
440 return
441 }
442 if _, err := w.Write(bs); err != nil {
443 log.Error(ctx, "error writing response", "error", err)
444 }
445 })
446
447 router.GET("/oauth-sessions", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
448 sessions, err := a.StatefulDB.ListOAuthSessions()
449 if err != nil {
450 errors.WriteHTTPInternalServerError(w, "unable to get oauth sessions", err)
451 return
452 }
453 bs, err := json.Marshal(sessions)
454 if err != nil {
455 errors.WriteHTTPInternalServerError(w, "unable to marshal oauth sessions", err)
456 return
457 }
458 if _, err := w.Write(bs); err != nil {
459 log.Error(ctx, "error writing response", "error", err)
460 }
461 })
462
463 router.POST("/notification-blast", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
464 var payload notificationpkg.NotificationBlast
465 if err := json.NewDecoder(r.Body).Decode(&payload); err != nil {
466 errors.WriteHTTPBadRequest(w, "invalid request body", err)
467 return
468 }
469 notifications, err := a.StatefulDB.ListNotifications()
470 if err != nil {
471 errors.WriteHTTPInternalServerError(w, "unable to get notifications", err)
472 return
473 }
474 if a.FirebaseNotifier == nil {
475 errors.WriteHTTPInternalServerError(w, "firebase notifier not initialized", nil)
476 return
477 }
478 tokens := []string{}
479 for _, not := range notifications {
480 tokens = append(tokens, not.Token)
481 }
482 err = a.FirebaseNotifier.Blast(ctx, tokens, &payload)
483 if err != nil {
484 errors.WriteHTTPInternalServerError(w, "unable to blast notifications", err)
485 return
486 }
487 w.WriteHeader(http.StatusNoContent)
488 })
489
490 router.PUT("/settings/:id", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
491 w.Header().Set("Access-Control-Allow-Origin", "*")
492 w.Header().Set("Access-Control-Allow-Methods", "PUT")
493 w.Header().Set("Access-Control-Allow-Headers", "Content-Type")
494
495 id := p.ByName("id")
496 if id == "" {
497 errors.WriteHTTPBadRequest(w, "id required", nil)
498 return
499 }
500
501 var ident model.Identity
502 if err := json.NewDecoder(r.Body).Decode(&ident); err != nil {
503 errors.WriteHTTPBadRequest(w, "invalid request body", err)
504 return
505 }
506 ident.ID = id
507
508 if err := a.Model.UpdateIdentity(&ident); err != nil {
509 errors.WriteHTTPInternalServerError(w, "unable to update settings", err)
510 return
511 }
512
513 w.WriteHeader(http.StatusNoContent)
514 })
515
516 router.POST("/replay/:streamKey", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
517 key := p.ByName("streamKey")
518 if key == "" {
519 errors.WriteHTTPBadRequest(w, "streamKey required", nil)
520 return
521 }
522 mediaSigner, err := a.MakeMediaSigner(ctx, key)
523 if err != nil {
524 errors.WriteHTTPUnauthorized(w, "invalid authorization key", err)
525 return
526 }
527 pc, err := rtcrec.NewReplayPeerConnection(ctx, r.Body)
528 if err != nil {
529 errors.WriteHTTPInternalServerError(w, "unable to create replay peer connection", err)
530 return
531 }
532 answer, err := a.MediaManager.WebRTCIngest(ctx, &webrtc.SessionDescription{SDP: "placeholder"}, mediaSigner, pc, make(chan struct{}))
533 if err != nil {
534 errors.WriteHTTPInternalServerError(w, "unable to ingest web rtc", err)
535 return
536 }
537 w.WriteHeader(200)
538 if _, err := w.Write([]byte(answer.SDP)); err != nil {
539 errors.WriteHTTPInternalServerError(w, "unable to write response", err)
540 log.Error(ctx, "error writing response", "error", err)
541 }
542 })
543
544 router.GET("/clip/:did/clip.mp4", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
545 did := p.ByName("did")
546 if did == "" {
547 errors.WriteHTTPBadRequest(w, "did required", nil)
548 return
549 }
550 user, err := a.NormalizeUser(ctx, did)
551 if err != nil {
552 errors.WriteHTTPBadRequest(w, "invalid user", err)
553 return
554 }
555 secsStr := r.URL.Query().Get("secs")
556 secs := 60 // Default to 60 seconds
557 if secsStr != "" {
558 parsedSecs, err := strconv.Atoi(secsStr)
559 if err != nil {
560 errors.WriteHTTPBadRequest(w, "invalid secs parameter", err)
561 return
562 }
563 secs = parsedSecs
564 }
565 after := time.Now().Add(-time.Duration(secs) * time.Second)
566 w.Header().Set("Content-Type", "video/mp4")
567 err = media.ClipUser(ctx, a.Model, a.CLI, user, w, nil, &after)
568 if err != nil {
569 errors.WriteHTTPInternalServerError(w, "unable to clip user", err)
570 return
571 }
572 })
573
574 handler := sloghttp.Recovery(router)
575 if log.Level(4) {
576 handler = sloghttp.New(slog.Default())(handler)
577 }
578 return handler, nil
579}
580
581func (a *StreamplaceAPI) keyToUser(ctx context.Context, key string) (string, error) {
582 payload, err := base64.URLEncoding.DecodeString(key)
583 if err != nil {
584 return "", err
585 }
586 signed, err := a.Signer.Verify(payload)
587 if err != nil {
588 return "", err
589 }
590 _, ok := signed.Data().(*v0.StreamKey)
591 if !ok {
592 return "", fmt.Errorf("got signed data but it wasn't a stream key")
593 }
594 return strings.ToLower(signed.Signer()), nil
595}