Live video on the AT Protocol
1package api
2
3import (
4 "context"
5 "encoding/base64"
6 "encoding/json"
7 "fmt"
8 "io"
9 "log/slog"
10 "net/http"
11 "net/http/pprof"
12 "os"
13 "path/filepath"
14 "regexp"
15 "runtime"
16 rtpprof "runtime/pprof"
17 "strconv"
18 "strings"
19 "time"
20
21 "github.com/julienschmidt/httprouter"
22 "github.com/pion/webrtc/v4"
23 "github.com/prometheus/client_golang/prometheus/promhttp"
24 sloghttp "github.com/samber/slog-http"
25 "stream.place/streamplace/pkg/errors"
26 "stream.place/streamplace/pkg/log"
27 "stream.place/streamplace/pkg/media"
28 "stream.place/streamplace/pkg/mist/mistconfig"
29 "stream.place/streamplace/pkg/mist/misttriggers"
30 "stream.place/streamplace/pkg/model"
31 notificationpkg "stream.place/streamplace/pkg/notifications"
32 "stream.place/streamplace/pkg/rtcrec"
33 v0 "stream.place/streamplace/pkg/schema/v0"
34)
35
36func (a *StreamplaceAPI) ServeInternalHTTP(ctx context.Context) error {
37 handler, err := a.InternalHandler(ctx)
38 if err != nil {
39 return err
40 }
41 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error {
42 s.Addr = a.CLI.HTTPInternalAddr
43 log.Log(ctx, "http server starting", "addr", s.Addr)
44 return s.ListenAndServe()
45 })
46}
47
// mkvRE matches a file name made up solely of one or more decimal digits
// followed by the ".mkv" extension (for example "1700000000000.mkv").
//
// NOTE(review): the original comment on this variable described it as a
// "lightweight way to authenticate push requests to ourself"; it is not
// referenced in this part of the file, so confirm that against its callers.
//
// The pattern is compiled once at package initialisation; MustCompile panics
// only if the literal pattern is invalid.
var mkvRE = regexp.MustCompile(`^\d+\.mkv$`)
54
55func (a *StreamplaceAPI) InternalHandler(ctx context.Context) (http.Handler, error) {
56 router := httprouter.New()
57 broker := misttriggers.NewTriggerBroker()
58
59 broker.OnPushRewrite(func(ctx context.Context, payload *misttriggers.PushRewritePayload) (string, error) {
60 log.Log(ctx, "got push out start", "streamName", payload.StreamName, "url", payload.URL.String())
61 // Extract the last part of the URL path
62 urlPath := payload.URL.Path
63 parts := strings.Split(urlPath, "/")
64 lastPart := ""
65 if len(parts) > 0 {
66 lastPart = parts[len(parts)-1]
67 }
68 mediaSigner, err := a.MakeMediaSigner(ctx, lastPart)
69 if err != nil {
70 return "", err
71 }
72
73 ms := time.Now().UnixMilli()
74 out := fmt.Sprintf("%s+%s_%d", mistconfig.StreamName, mediaSigner.Streamer(), ms)
75 a.SignerCacheMu.Lock()
76 a.SignerCache[mediaSigner.Streamer()] = mediaSigner
77 a.SignerCacheMu.Unlock()
78 log.Log(ctx, "added key to cache", "mist-stream", out, "streamer", mediaSigner.Streamer())
79
80 return out, nil
81 })
82 triggerCollection := misttriggers.NewMistCallbackHandlersCollection(a.CLI, broker)
83 router.POST("/mist-trigger", triggerCollection.Trigger())
84 router.HandlerFunc("GET", "/healthz", a.HandleHealthz(ctx))
85
86 // Add pprof handlers
87 router.HandlerFunc("GET", "/debug/pprof/", pprof.Index)
88 router.HandlerFunc("GET", "/debug/pprof/cmdline", pprof.Cmdline)
89 router.HandlerFunc("GET", "/debug/pprof/profile", pprof.Profile)
90 router.HandlerFunc("GET", "/debug/pprof/symbol", pprof.Symbol)
91 router.HandlerFunc("GET", "/debug/pprof/trace", pprof.Trace)
92 router.Handler("GET", "/debug/pprof/goroutine", pprof.Handler("goroutine"))
93 router.Handler("GET", "/debug/pprof/heap", pprof.Handler("heap"))
94 router.Handler("GET", "/debug/pprof/threadcreate", pprof.Handler("threadcreate"))
95 router.Handler("GET", "/debug/pprof/block", pprof.Handler("block"))
96 router.Handler("GET", "/debug/pprof/allocs", pprof.Handler("allocs"))
97 router.Handler("GET", "/debug/pprof/mutex", pprof.Handler("mutex"))
98
99 router.POST("/gc", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
100 runtime.GC()
101 w.WriteHeader(204)
102 })
103
104 router.Handler("GET", "/metrics", promhttp.Handler())
105
106 router.GET("/playback/:user/:rendition/concat", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
107 user := p.ByName("user")
108 if user == "" {
109 errors.WriteHTTPBadRequest(w, "user required", nil)
110 return
111 }
112 rendition := p.ByName("rendition")
113 if rendition == "" {
114 errors.WriteHTTPBadRequest(w, "rendition required", nil)
115 return
116 }
117 user, err := a.NormalizeUser(ctx, user)
118 if err != nil {
119 errors.WriteHTTPBadRequest(w, "invalid user", err)
120 return
121 }
122 w.Header().Set("content-type", "text/plain")
123 fmt.Fprintf(w, "ffconcat version 1.0\n")
124 // intermittent reports that you need two here to make things work properly? shouldn't matter.
125 for i := 0; i < 2; i += 1 {
126 fmt.Fprintf(w, "file '%s/playback/%s/%s/latest.mp4'\n", a.CLI.OwnInternalURL(), user, rendition)
127 }
128 })
129
130 router.GET("/playback/:user/:rendition/latest.mp4", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
131 user := p.ByName("user")
132 if user == "" {
133 errors.WriteHTTPBadRequest(w, "user required", nil)
134 return
135 }
136 user, err := a.NormalizeUser(ctx, user)
137 if err != nil {
138 errors.WriteHTTPBadRequest(w, "invalid user", err)
139 return
140 }
141 rendition := p.ByName("rendition")
142 if rendition == "" {
143 errors.WriteHTTPBadRequest(w, "rendition required", nil)
144 return
145 }
146 segChan := a.Bus.SubscribeSegment(ctx, user, rendition)
147 defer a.Bus.UnsubscribeSegment(ctx, user, rendition, segChan)
148 seg := <-segChan.C
149 base := filepath.Base(seg.Filepath)
150 w.Header().Set("Location", fmt.Sprintf("%s/playback/%s/%s/segment/%s\n", a.CLI.OwnInternalURL(), user, rendition, base))
151 w.WriteHeader(301)
152 })
153
154 router.GET("/playback/:user/:rendition/segment/:file", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
155 user := p.ByName("user")
156 if user == "" {
157 errors.WriteHTTPBadRequest(w, "user required", nil)
158 return
159 }
160 user, err := a.NormalizeUser(ctx, user)
161 if err != nil {
162 errors.WriteHTTPBadRequest(w, "invalid user", err)
163 return
164 }
165 file := p.ByName("file")
166 if file == "" {
167 errors.WriteHTTPBadRequest(w, "file required", nil)
168 return
169 }
170 fullpath, err := a.CLI.SegmentFilePath(user, file)
171 if err != nil {
172 errors.WriteHTTPBadRequest(w, "badly formatted request", err)
173 return
174 }
175 http.ServeFile(w, r, fullpath)
176 })
177
178 router.HEAD("/playback/:user/:rendition/stream.mkv", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
179 user := p.ByName("user")
180 if user == "" {
181 errors.WriteHTTPBadRequest(w, "user required", nil)
182 return
183 }
184 w.Header().Set("Content-Type", "video/x-matroska")
185 w.Header().Set("Transfer-Encoding", "chunked")
186 w.WriteHeader(200)
187 })
188
189 router.POST("/http-pipe/:uuid", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
190 uu := p.ByName("uuid")
191 if uu == "" {
192 errors.WriteHTTPBadRequest(w, "uuid required", nil)
193 return
194 }
195 pr := a.MediaManager.GetHTTPPipeWriter(uu)
196 if pr == nil {
197 errors.WriteHTTPNotFound(w, "http-pipe not found", nil)
198 return
199 }
200 if _, err := io.Copy(pr, r.Body); err != nil {
201 errors.WriteHTTPInternalServerError(w, "failed to copy response", nil)
202 }
203 })
204
205 // self-destruct code, useful for dumping goroutines on windows
206 router.POST("/abort", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
207 if err := rtpprof.Lookup("goroutine").WriteTo(os.Stderr, 2); err != nil {
208 log.Log(ctx, "error writing rtpprof", "error", err)
209 }
210 log.Log(ctx, "got POST /abort, self-destructing")
211 os.Exit(1)
212 })
213
214 handleIncomingStream := func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
215 key := p.ByName("key")
216 log.Log(ctx, "stream start")
217
218 var mediaSigner media.MediaSigner
219 var ok bool
220 var err error
221 parts := strings.Split(key, "_")
222
223 if len(parts) == 2 {
224 a.SignerCacheMu.Lock()
225 mediaSigner, ok = a.SignerCache[parts[0]]
226 a.SignerCacheMu.Unlock()
227 if !ok {
228 log.Error(ctx, "couldn't find key in cache", "part", parts[0], "key", key)
229 errors.WriteHTTPUnauthorized(w, "invalid authorization key", nil)
230 return
231 }
232 } else {
233 mediaSigner, err = a.MakeMediaSigner(ctx, key)
234 if err != nil {
235 errors.WriteHTTPUnauthorized(w, "invalid authorization key", err)
236 return
237 }
238 }
239
240 ctx = log.WithLogValues(ctx, "streamer", mediaSigner.Streamer())
241
242 err = a.checkBanned(ctx, mediaSigner.Streamer())
243 if err != nil {
244 errors.WriteHTTPUnauthorized(w, err.Error(), err)
245 return
246 }
247
248 err = a.MediaManager.MKVIngest(ctx, r.Body, mediaSigner)
249
250 if err != nil {
251 log.Log(ctx, "stream error", "error", err)
252 errors.WriteHTTPInternalServerError(w, "stream error", err)
253 return
254 }
255 log.Log(ctx, "stream success", "url", r.URL.String())
256 }
257
258 // route to accept an incoming mkv stream from OBS, segment it, and push the segments back to this HTTP handler
259 router.POST("/live/:key", handleIncomingStream)
260 router.PUT("/live/:key", handleIncomingStream)
261
262 router.GET("/player-report/:id", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
263 id := p.ByName("id")
264 if id == "" {
265 errors.WriteHTTPBadRequest(w, "id required", nil)
266 return
267 }
268 events, err := a.Model.PlayerReport(id)
269 if err != nil {
270 errors.WriteHTTPBadRequest(w, err.Error(), err)
271 return
272 }
273 bs, err := json.Marshal(events)
274 if err != nil {
275 errors.WriteHTTPInternalServerError(w, "unable to marhsal json", err)
276 return
277 }
278 if _, err := w.Write(bs); err != nil {
279 log.Error(ctx, "error writing response", "error", err)
280 }
281 })
282
283 router.GET("/segment/:id", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
284 id := p.ByName("id")
285 if id == "" {
286 errors.WriteHTTPBadRequest(w, "id required", nil)
287 return
288 }
289 segment, err := a.Model.GetSegment(id)
290 if err != nil {
291 errors.WriteHTTPBadRequest(w, err.Error(), err)
292 return
293 }
294 if segment == nil {
295 errors.WriteHTTPNotFound(w, "segment not found", nil)
296 return
297 }
298 spSeg, err := segment.ToStreamplaceSegment()
299 if err != nil {
300 errors.WriteHTTPInternalServerError(w, "unable to convert segment to streamplace segment", err)
301 return
302 }
303 bs, err := json.Marshal(spSeg)
304 if err != nil {
305 errors.WriteHTTPInternalServerError(w, "unable to marhsal json", err)
306 return
307 }
308 if _, err := w.Write(bs); err != nil {
309 log.Error(ctx, "error writing response", "error", err)
310 }
311 })
312
313 router.DELETE("/player-events", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
314 err := a.Model.ClearPlayerEvents()
315 if err != nil {
316 errors.WriteHTTPInternalServerError(w, "unable to delete player events", err)
317 return
318 }
319 w.WriteHeader(204)
320 })
321
322 router.GET("/followers/:user", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
323 user := p.ByName("user")
324 if user == "" {
325 errors.WriteHTTPBadRequest(w, "user required", nil)
326 return
327 }
328
329 followers, err := a.Model.GetUserFollowers(ctx, user)
330 if err != nil {
331 errors.WriteHTTPInternalServerError(w, "unable to get followers", err)
332 return
333 }
334 bs, err := json.Marshal(followers)
335 if err != nil {
336 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err)
337 return
338 }
339 if _, err := w.Write(bs); err != nil {
340 log.Error(ctx, "error writing response", "error", err)
341 }
342 })
343
344 router.GET("/following/:user", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
345 user := p.ByName("user")
346 if user == "" {
347 errors.WriteHTTPBadRequest(w, "user required", nil)
348 return
349 }
350
351 followers, err := a.Model.GetUserFollowing(ctx, user)
352 if err != nil {
353 errors.WriteHTTPInternalServerError(w, "unable to get followers", err)
354 return
355 }
356 bs, err := json.Marshal(followers)
357 if err != nil {
358 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err)
359 return
360 }
361 if _, err := w.Write(bs); err != nil {
362 log.Error(ctx, "error writing response", "error", err)
363 }
364 })
365
366 router.GET("/notifications", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
367 notifications, err := a.StatefulDB.ListNotifications()
368 if err != nil {
369 errors.WriteHTTPInternalServerError(w, "unable to get notifications", err)
370 return
371 }
372 bs, err := json.Marshal(notifications)
373 if err != nil {
374 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err)
375 return
376 }
377 if _, err := w.Write(bs); err != nil {
378 log.Error(ctx, "error writing response", "error", err)
379 }
380 })
381
382 router.GET("/chat-posts", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
383 posts, err := a.Model.ListFeedPosts()
384 if err != nil {
385 errors.WriteHTTPInternalServerError(w, "unable to get chat posts", err)
386 return
387 }
388 bs, err := json.Marshal(posts)
389 if err != nil {
390 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err)
391 return
392 }
393 if _, err := w.Write(bs); err != nil {
394 log.Error(ctx, "error writing response", "error", err)
395 }
396 })
397
398 router.GET("/chat/:uri", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
399 uri := p.ByName("uri")
400 if uri == "" {
401 errors.WriteHTTPBadRequest(w, "uri required", nil)
402 return
403 }
404 msg, err := a.Model.GetChatMessage(uri)
405 if err != nil {
406 errors.WriteHTTPInternalServerError(w, "unable to get chat posts", err)
407 return
408 }
409 spmsg, err := msg.ToStreamplaceMessageView()
410 if err != nil {
411 errors.WriteHTTPInternalServerError(w, "unable to convert chat message to streamplace message view", err)
412 return
413 }
414 bs, err := json.Marshal(spmsg)
415 if err != nil {
416 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err)
417 return
418 }
419 if _, err := w.Write(bs); err != nil {
420 log.Error(ctx, "error writing response", "error", err)
421 }
422 })
423
424 router.GET("/oauth-sessions", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
425 sessions, err := a.StatefulDB.ListOAuthSessions()
426 if err != nil {
427 errors.WriteHTTPInternalServerError(w, "unable to get oauth sessions", err)
428 return
429 }
430 bs, err := json.Marshal(sessions)
431 if err != nil {
432 errors.WriteHTTPInternalServerError(w, "unable to marshal oauth sessions", err)
433 return
434 }
435 if _, err := w.Write(bs); err != nil {
436 log.Error(ctx, "error writing response", "error", err)
437 }
438 })
439
440 router.POST("/notification-blast", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
441 var payload notificationpkg.NotificationBlast
442 if err := json.NewDecoder(r.Body).Decode(&payload); err != nil {
443 errors.WriteHTTPBadRequest(w, "invalid request body", err)
444 return
445 }
446 notifications, err := a.StatefulDB.ListNotifications()
447 if err != nil {
448 errors.WriteHTTPInternalServerError(w, "unable to get notifications", err)
449 return
450 }
451 if a.FirebaseNotifier == nil {
452 errors.WriteHTTPInternalServerError(w, "firebase notifier not initialized", nil)
453 return
454 }
455 tokens := []string{}
456 for _, not := range notifications {
457 tokens = append(tokens, not.Token)
458 }
459 err = a.FirebaseNotifier.Blast(ctx, tokens, &payload)
460 if err != nil {
461 errors.WriteHTTPInternalServerError(w, "unable to blast notifications", err)
462 return
463 }
464 w.WriteHeader(http.StatusNoContent)
465 })
466
467 router.PUT("/settings/:id", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
468 w.Header().Set("Access-Control-Allow-Origin", "*")
469 w.Header().Set("Access-Control-Allow-Methods", "PUT")
470 w.Header().Set("Access-Control-Allow-Headers", "Content-Type")
471
472 id := p.ByName("id")
473 if id == "" {
474 errors.WriteHTTPBadRequest(w, "id required", nil)
475 return
476 }
477
478 var ident model.Identity
479 if err := json.NewDecoder(r.Body).Decode(&ident); err != nil {
480 errors.WriteHTTPBadRequest(w, "invalid request body", err)
481 return
482 }
483 ident.ID = id
484
485 if err := a.Model.UpdateIdentity(&ident); err != nil {
486 errors.WriteHTTPInternalServerError(w, "unable to update settings", err)
487 return
488 }
489
490 w.WriteHeader(http.StatusNoContent)
491 })
492
493 router.POST("/replay/:streamKey", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
494 key := p.ByName("streamKey")
495 if key == "" {
496 errors.WriteHTTPBadRequest(w, "streamKey required", nil)
497 return
498 }
499 mediaSigner, err := a.MakeMediaSigner(ctx, key)
500 if err != nil {
501 errors.WriteHTTPUnauthorized(w, "invalid authorization key", err)
502 return
503 }
504 pc, err := rtcrec.NewReplayPeerConnection(ctx, r.Body)
505 if err != nil {
506 errors.WriteHTTPInternalServerError(w, "unable to create replay peer connection", err)
507 return
508 }
509 answer, err := a.MediaManager.WebRTCIngest(ctx, &webrtc.SessionDescription{SDP: "placeholder"}, mediaSigner, pc, make(chan error, 1))
510 if err != nil {
511 errors.WriteHTTPInternalServerError(w, "unable to ingest web rtc", err)
512 return
513 }
514 w.WriteHeader(200)
515 if _, err := w.Write([]byte(answer.SDP)); err != nil {
516 errors.WriteHTTPInternalServerError(w, "unable to write response", err)
517 log.Error(ctx, "error writing response", "error", err)
518 }
519 })
520
521 router.GET("/clip/:did/clip.mp4", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
522 did := p.ByName("did")
523 if did == "" {
524 errors.WriteHTTPBadRequest(w, "did required", nil)
525 return
526 }
527 user, err := a.NormalizeUser(ctx, did)
528 if err != nil {
529 errors.WriteHTTPBadRequest(w, "invalid user", err)
530 return
531 }
532 secsStr := r.URL.Query().Get("secs")
533 secs := 60 // Default to 60 seconds
534 if secsStr != "" {
535 parsedSecs, err := strconv.Atoi(secsStr)
536 if err != nil {
537 errors.WriteHTTPBadRequest(w, "invalid secs parameter", err)
538 return
539 }
540 secs = parsedSecs
541 }
542 after := time.Now().Add(-time.Duration(secs) * time.Second)
543 w.Header().Set("Content-Type", "video/mp4")
544 err = media.ClipUser(ctx, a.Model, a.CLI, user, w, nil, &after)
545 if err != nil {
546 errors.WriteHTTPInternalServerError(w, "unable to clip user", err)
547 return
548 }
549 })
550
551 handler := sloghttp.Recovery(router)
552 if log.Level(4) {
553 handler = sloghttp.New(slog.Default())(handler)
554 }
555 return handler, nil
556}
557
558func (a *StreamplaceAPI) keyToUser(ctx context.Context, key string) (string, error) {
559 payload, err := base64.URLEncoding.DecodeString(key)
560 if err != nil {
561 return "", err
562 }
563 signed, err := a.Signer.Verify(payload)
564 if err != nil {
565 return "", err
566 }
567 _, ok := signed.Data().(*v0.StreamKey)
568 if !ok {
569 return "", fmt.Errorf("got signed data but it wasn't a stream key")
570 }
571 return strings.ToLower(signed.Signer()), nil
572}