Live video on the AT Protocol
1package api
2
3import (
4 "context"
5 "encoding/base64"
6 "encoding/json"
7 "fmt"
8 "io"
9 "log/slog"
10 "net/http"
11 "net/http/pprof"
12 "os"
13 "path/filepath"
14 "regexp"
15 "runtime"
16 rtpprof "runtime/pprof"
17 "strconv"
18 "strings"
19 "time"
20
21 "github.com/julienschmidt/httprouter"
22 "github.com/pion/webrtc/v4"
23 "github.com/prometheus/client_golang/prometheus/promhttp"
24 sloghttp "github.com/samber/slog-http"
25 "stream.place/streamplace/pkg/errors"
26 "stream.place/streamplace/pkg/log"
27 "stream.place/streamplace/pkg/media"
28 "stream.place/streamplace/pkg/mist/mistconfig"
29 "stream.place/streamplace/pkg/mist/misttriggers"
30 "stream.place/streamplace/pkg/model"
31 notificationpkg "stream.place/streamplace/pkg/notifications"
32 "stream.place/streamplace/pkg/rtcrec"
33 v0 "stream.place/streamplace/pkg/schema/v0"
34)
35
36func (a *StreamplaceAPI) ServeInternalHTTP(ctx context.Context) error {
37 handler, err := a.InternalHandler(ctx)
38 if err != nil {
39 return err
40 }
41 return a.ServerWithShutdown(ctx, handler, func(s *http.Server) error {
42 s.Addr = a.CLI.HTTPInternalAddr
43 log.Log(ctx, "http server starting", "addr", s.Addr)
44 return s.ListenAndServe()
45 })
46}
47
48// lightweight way to authenticate push requests to ourself
49var mkvRE *regexp.Regexp
50
51func init() {
52 mkvRE = regexp.MustCompile(`^\d+\.mkv$`)
53}
54
55func (a *StreamplaceAPI) InternalHandler(ctx context.Context) (http.Handler, error) {
56 router := httprouter.New()
57 broker := misttriggers.NewTriggerBroker()
58
59 broker.OnPushRewrite(func(ctx context.Context, payload *misttriggers.PushRewritePayload) (string, error) {
60 log.Log(ctx, "got push out start", "streamName", payload.StreamName, "url", payload.URL.String())
61 // Extract the last part of the URL path
62 urlPath := payload.URL.Path
63 parts := strings.Split(urlPath, "/")
64 lastPart := ""
65 if len(parts) > 0 {
66 lastPart = parts[len(parts)-1]
67 }
68 mediaSigner, err := a.MakeMediaSigner(ctx, lastPart)
69 if err != nil {
70 return "", err
71 }
72
73 ms := time.Now().UnixMilli()
74 out := fmt.Sprintf("%s+%s_%d", mistconfig.StreamName, mediaSigner.Streamer(), ms)
75 a.SignerCacheMu.Lock()
76 a.SignerCache[mediaSigner.Streamer()] = mediaSigner
77 a.SignerCacheMu.Unlock()
78 log.Log(ctx, "added key to cache", "mist-stream", out, "streamer", mediaSigner.Streamer())
79
80 return out, nil
81 })
82 triggerCollection := misttriggers.NewMistCallbackHandlersCollection(a.CLI, broker)
83 router.POST("/mist-trigger", triggerCollection.Trigger())
84 router.HandlerFunc("GET", "/healthz", a.HandleHealthz(ctx))
85
86 // Add pprof handlers
87 router.HandlerFunc("GET", "/debug/pprof/", pprof.Index)
88 router.HandlerFunc("GET", "/debug/pprof/cmdline", pprof.Cmdline)
89 router.HandlerFunc("GET", "/debug/pprof/profile", pprof.Profile)
90 router.HandlerFunc("GET", "/debug/pprof/symbol", pprof.Symbol)
91 router.HandlerFunc("GET", "/debug/pprof/trace", pprof.Trace)
92 router.Handler("GET", "/debug/pprof/goroutine", pprof.Handler("goroutine"))
93 router.Handler("GET", "/debug/pprof/heap", pprof.Handler("heap"))
94 router.Handler("GET", "/debug/pprof/threadcreate", pprof.Handler("threadcreate"))
95 router.Handler("GET", "/debug/pprof/block", pprof.Handler("block"))
96 router.Handler("GET", "/debug/pprof/allocs", pprof.Handler("allocs"))
97 router.Handler("GET", "/debug/pprof/mutex", pprof.Handler("mutex"))
98
99 router.POST("/gc", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
100 runtime.GC()
101 w.WriteHeader(204)
102 })
103
104 router.Handler("GET", "/metrics", promhttp.Handler())
105
106 router.GET("/playback/:user/:rendition/concat", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
107 user := p.ByName("user")
108 if user == "" {
109 errors.WriteHTTPBadRequest(w, "user required", nil)
110 return
111 }
112 rendition := p.ByName("rendition")
113 if rendition == "" {
114 errors.WriteHTTPBadRequest(w, "rendition required", nil)
115 return
116 }
117 user, err := a.NormalizeUser(ctx, user)
118 if err != nil {
119 errors.WriteHTTPBadRequest(w, "invalid user", err)
120 return
121 }
122 w.Header().Set("content-type", "text/plain")
123 fmt.Fprintf(w, "ffconcat version 1.0\n")
124 // intermittent reports that you need two here to make things work properly? shouldn't matter.
125 for i := 0; i < 2; i += 1 {
126 fmt.Fprintf(w, "file '%s/playback/%s/%s/latest.mp4'\n", a.CLI.OwnInternalURL(), user, rendition)
127 }
128 })
129
130 router.GET("/playback/:user/:rendition/latest.mp4", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
131 user := p.ByName("user")
132 if user == "" {
133 errors.WriteHTTPBadRequest(w, "user required", nil)
134 return
135 }
136 user, err := a.NormalizeUser(ctx, user)
137 if err != nil {
138 errors.WriteHTTPBadRequest(w, "invalid user", err)
139 return
140 }
141 rendition := p.ByName("rendition")
142 if rendition == "" {
143 errors.WriteHTTPBadRequest(w, "rendition required", nil)
144 return
145 }
146 segChan := a.Bus.SubscribeSegment(ctx, user, rendition)
147 defer a.Bus.UnsubscribeSegment(ctx, user, rendition, segChan)
148 seg := <-segChan.C
149 base := filepath.Base(seg.Filepath)
150 w.Header().Set("Location", fmt.Sprintf("%s/playback/%s/%s/segment/%s\n", a.CLI.OwnInternalURL(), user, rendition, base))
151 w.WriteHeader(301)
152 })
153
154 router.GET("/playback/:user/:rendition/segment/:file", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
155 user := p.ByName("user")
156 if user == "" {
157 errors.WriteHTTPBadRequest(w, "user required", nil)
158 return
159 }
160 user, err := a.NormalizeUser(ctx, user)
161 if err != nil {
162 errors.WriteHTTPBadRequest(w, "invalid user", err)
163 return
164 }
165 file := p.ByName("file")
166 if file == "" {
167 errors.WriteHTTPBadRequest(w, "file required", nil)
168 return
169 }
170 fullpath, err := a.CLI.SegmentFilePath(user, file)
171 if err != nil {
172 errors.WriteHTTPBadRequest(w, "badly formatted request", err)
173 return
174 }
175 http.ServeFile(w, r, fullpath)
176 })
177
178 router.HEAD("/playback/:user/:rendition/stream.mkv", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
179 user := p.ByName("user")
180 if user == "" {
181 errors.WriteHTTPBadRequest(w, "user required", nil)
182 return
183 }
184 w.Header().Set("Content-Type", "video/x-matroska")
185 w.Header().Set("Transfer-Encoding", "chunked")
186 w.WriteHeader(200)
187 })
188
189 router.POST("/http-pipe/:uuid", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
190 uu := p.ByName("uuid")
191 if uu == "" {
192 errors.WriteHTTPBadRequest(w, "uuid required", nil)
193 return
194 }
195 pr := a.MediaManager.GetHTTPPipeWriter(uu)
196 if pr == nil {
197 errors.WriteHTTPNotFound(w, "http-pipe not found", nil)
198 return
199 }
200 if _, err := io.Copy(pr, r.Body); err != nil {
201 errors.WriteHTTPInternalServerError(w, "failed to copy response", nil)
202 }
203 })
204
205 // self-destruct code, useful for dumping goroutines on windows
206 router.POST("/abort", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
207 if err := rtpprof.Lookup("goroutine").WriteTo(os.Stderr, 2); err != nil {
208 log.Log(ctx, "error writing rtpprof", "error", err)
209 }
210 log.Log(ctx, "got POST /abort, self-destructing")
211 os.Exit(1)
212 })
213
214 handleIncomingStream := func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
215 key := p.ByName("key")
216 log.Log(ctx, "stream start")
217
218 var mediaSigner media.MediaSigner
219 var ok bool
220 var err error
221 parts := strings.Split(key, "_")
222
223 if len(parts) == 2 {
224 a.SignerCacheMu.Lock()
225 mediaSigner, ok = a.SignerCache[parts[0]]
226 a.SignerCacheMu.Unlock()
227 if !ok {
228 log.Error(ctx, "couldn't find key in cache", "part", parts[0], "key", key)
229 errors.WriteHTTPUnauthorized(w, "invalid authorization key", nil)
230 return
231 }
232 } else {
233 mediaSigner, err = a.MakeMediaSigner(ctx, key)
234 if err != nil {
235 errors.WriteHTTPUnauthorized(w, "invalid authorization key", err)
236 return
237 }
238 }
239
240 err = a.MediaManager.MKVIngest(ctx, r.Body, mediaSigner)
241
242 if err != nil {
243 log.Log(ctx, "stream error", "error", err)
244 errors.WriteHTTPInternalServerError(w, "stream error", err)
245 return
246 }
247 log.Log(ctx, "stream success", "url", r.URL.String())
248 }
249
250 // route to accept an incoming mkv stream from OBS, segment it, and push the segments back to this HTTP handler
251 router.POST("/live/:key", handleIncomingStream)
252 router.PUT("/live/:key", handleIncomingStream)
253
254 router.GET("/player-report/:id", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
255 id := p.ByName("id")
256 if id == "" {
257 errors.WriteHTTPBadRequest(w, "id required", nil)
258 return
259 }
260 events, err := a.Model.PlayerReport(id)
261 if err != nil {
262 errors.WriteHTTPBadRequest(w, err.Error(), err)
263 return
264 }
265 bs, err := json.Marshal(events)
266 if err != nil {
267 errors.WriteHTTPInternalServerError(w, "unable to marhsal json", err)
268 return
269 }
270 if _, err := w.Write(bs); err != nil {
271 log.Error(ctx, "error writing response", "error", err)
272 }
273 })
274
275 router.GET("/segment/:id", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
276 id := p.ByName("id")
277 if id == "" {
278 errors.WriteHTTPBadRequest(w, "id required", nil)
279 return
280 }
281 segment, err := a.Model.GetSegment(id)
282 if err != nil {
283 errors.WriteHTTPBadRequest(w, err.Error(), err)
284 return
285 }
286 if segment == nil {
287 errors.WriteHTTPNotFound(w, "segment not found", nil)
288 return
289 }
290 spSeg, err := segment.ToStreamplaceSegment()
291 if err != nil {
292 errors.WriteHTTPInternalServerError(w, "unable to convert segment to streamplace segment", err)
293 return
294 }
295 bs, err := json.Marshal(spSeg)
296 if err != nil {
297 errors.WriteHTTPInternalServerError(w, "unable to marhsal json", err)
298 return
299 }
300 if _, err := w.Write(bs); err != nil {
301 log.Error(ctx, "error writing response", "error", err)
302 }
303 })
304
305 router.DELETE("/player-events", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
306 err := a.Model.ClearPlayerEvents()
307 if err != nil {
308 errors.WriteHTTPInternalServerError(w, "unable to delete player events", err)
309 return
310 }
311 w.WriteHeader(204)
312 })
313
314 router.GET("/settings", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
315 w.Header().Set("Access-Control-Allow-Origin", "*")
316 w.Header().Set("Access-Control-Allow-Methods", "GET")
317 w.Header().Set("Access-Control-Allow-Headers", "Content-Type")
318
319 id := a.Signer.Hex()
320
321 ident, err := a.Model.GetIdentity(id)
322 if err != nil {
323 errors.WriteHTTPInternalServerError(w, "unable to get settings", err)
324 return
325 }
326
327 bs, err := json.Marshal(ident)
328 if err != nil {
329 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err)
330 return
331 }
332 if _, err := w.Write(bs); err != nil {
333 log.Error(ctx, "error writing response", "error", err)
334 }
335 })
336
337 router.GET("/followers/:user", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
338 user := p.ByName("user")
339 if user == "" {
340 errors.WriteHTTPBadRequest(w, "user required", nil)
341 return
342 }
343
344 followers, err := a.Model.GetUserFollowers(ctx, user)
345 if err != nil {
346 errors.WriteHTTPInternalServerError(w, "unable to get followers", err)
347 return
348 }
349 bs, err := json.Marshal(followers)
350 if err != nil {
351 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err)
352 return
353 }
354 if _, err := w.Write(bs); err != nil {
355 log.Error(ctx, "error writing response", "error", err)
356 }
357 })
358
359 router.GET("/following/:user", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
360 user := p.ByName("user")
361 if user == "" {
362 errors.WriteHTTPBadRequest(w, "user required", nil)
363 return
364 }
365
366 followers, err := a.Model.GetUserFollowing(ctx, user)
367 if err != nil {
368 errors.WriteHTTPInternalServerError(w, "unable to get followers", err)
369 return
370 }
371 bs, err := json.Marshal(followers)
372 if err != nil {
373 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err)
374 return
375 }
376 if _, err := w.Write(bs); err != nil {
377 log.Error(ctx, "error writing response", "error", err)
378 }
379 })
380
381 router.GET("/notifications", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
382 notifications, err := a.StatefulDB.ListNotifications()
383 if err != nil {
384 errors.WriteHTTPInternalServerError(w, "unable to get notifications", err)
385 return
386 }
387 bs, err := json.Marshal(notifications)
388 if err != nil {
389 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err)
390 return
391 }
392 if _, err := w.Write(bs); err != nil {
393 log.Error(ctx, "error writing response", "error", err)
394 }
395 })
396
397 router.GET("/chat-posts", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
398 posts, err := a.Model.ListFeedPosts()
399 if err != nil {
400 errors.WriteHTTPInternalServerError(w, "unable to get chat posts", err)
401 return
402 }
403 bs, err := json.Marshal(posts)
404 if err != nil {
405 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err)
406 return
407 }
408 if _, err := w.Write(bs); err != nil {
409 log.Error(ctx, "error writing response", "error", err)
410 }
411 })
412
413 router.GET("/chat/:uri", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
414 uri := p.ByName("uri")
415 if uri == "" {
416 errors.WriteHTTPBadRequest(w, "uri required", nil)
417 return
418 }
419 msg, err := a.Model.GetChatMessage(uri)
420 if err != nil {
421 errors.WriteHTTPInternalServerError(w, "unable to get chat posts", err)
422 return
423 }
424 spmsg, err := msg.ToStreamplaceMessageView()
425 if err != nil {
426 errors.WriteHTTPInternalServerError(w, "unable to convert chat message to streamplace message view", err)
427 return
428 }
429 bs, err := json.Marshal(spmsg)
430 if err != nil {
431 errors.WriteHTTPInternalServerError(w, "unable to marshal json", err)
432 return
433 }
434 if _, err := w.Write(bs); err != nil {
435 log.Error(ctx, "error writing response", "error", err)
436 }
437 })
438
439 router.GET("/oauth-sessions", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
440 sessions, err := a.StatefulDB.ListOAuthSessions()
441 if err != nil {
442 errors.WriteHTTPInternalServerError(w, "unable to get oauth sessions", err)
443 return
444 }
445 bs, err := json.Marshal(sessions)
446 if err != nil {
447 errors.WriteHTTPInternalServerError(w, "unable to marshal oauth sessions", err)
448 return
449 }
450 if _, err := w.Write(bs); err != nil {
451 log.Error(ctx, "error writing response", "error", err)
452 }
453 })
454
455 router.POST("/notification-blast", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
456 var payload notificationpkg.NotificationBlast
457 if err := json.NewDecoder(r.Body).Decode(&payload); err != nil {
458 errors.WriteHTTPBadRequest(w, "invalid request body", err)
459 return
460 }
461 notifications, err := a.StatefulDB.ListNotifications()
462 if err != nil {
463 errors.WriteHTTPInternalServerError(w, "unable to get notifications", err)
464 return
465 }
466 if a.FirebaseNotifier == nil {
467 errors.WriteHTTPInternalServerError(w, "firebase notifier not initialized", nil)
468 return
469 }
470 tokens := []string{}
471 for _, not := range notifications {
472 tokens = append(tokens, not.Token)
473 }
474 err = a.FirebaseNotifier.Blast(ctx, tokens, &payload)
475 if err != nil {
476 errors.WriteHTTPInternalServerError(w, "unable to blast notifications", err)
477 return
478 }
479 w.WriteHeader(http.StatusNoContent)
480 })
481
482 router.PUT("/settings/:id", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
483 w.Header().Set("Access-Control-Allow-Origin", "*")
484 w.Header().Set("Access-Control-Allow-Methods", "PUT")
485 w.Header().Set("Access-Control-Allow-Headers", "Content-Type")
486
487 id := p.ByName("id")
488 if id == "" {
489 errors.WriteHTTPBadRequest(w, "id required", nil)
490 return
491 }
492
493 var ident model.Identity
494 if err := json.NewDecoder(r.Body).Decode(&ident); err != nil {
495 errors.WriteHTTPBadRequest(w, "invalid request body", err)
496 return
497 }
498 ident.ID = id
499
500 if err := a.Model.UpdateIdentity(&ident); err != nil {
501 errors.WriteHTTPInternalServerError(w, "unable to update settings", err)
502 return
503 }
504
505 w.WriteHeader(http.StatusNoContent)
506 })
507
508 router.POST("/replay/:streamKey", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
509 key := p.ByName("streamKey")
510 if key == "" {
511 errors.WriteHTTPBadRequest(w, "streamKey required", nil)
512 return
513 }
514 mediaSigner, err := a.MakeMediaSigner(ctx, key)
515 if err != nil {
516 errors.WriteHTTPUnauthorized(w, "invalid authorization key", err)
517 return
518 }
519 pc, err := rtcrec.NewReplayPeerConnection(ctx, r.Body)
520 if err != nil {
521 errors.WriteHTTPInternalServerError(w, "unable to create replay peer connection", err)
522 return
523 }
524 answer, err := a.MediaManager.WebRTCIngest(ctx, &webrtc.SessionDescription{SDP: "placeholder"}, mediaSigner, pc, make(chan struct{}))
525 if err != nil {
526 errors.WriteHTTPInternalServerError(w, "unable to ingest web rtc", err)
527 return
528 }
529 w.WriteHeader(200)
530 if _, err := w.Write([]byte(answer.SDP)); err != nil {
531 errors.WriteHTTPInternalServerError(w, "unable to write response", err)
532 log.Error(ctx, "error writing response", "error", err)
533 }
534 })
535
536 router.GET("/clip/:did/clip.mp4", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
537 did := p.ByName("did")
538 if did == "" {
539 errors.WriteHTTPBadRequest(w, "did required", nil)
540 return
541 }
542 user, err := a.NormalizeUser(ctx, did)
543 if err != nil {
544 errors.WriteHTTPBadRequest(w, "invalid user", err)
545 return
546 }
547 secsStr := r.URL.Query().Get("secs")
548 secs := 60 // Default to 60 seconds
549 if secsStr != "" {
550 parsedSecs, err := strconv.Atoi(secsStr)
551 if err != nil {
552 errors.WriteHTTPBadRequest(w, "invalid secs parameter", err)
553 return
554 }
555 secs = parsedSecs
556 }
557 after := time.Now().Add(-time.Duration(secs) * time.Second)
558 w.Header().Set("Content-Type", "video/mp4")
559 err = media.ClipUser(ctx, a.Model, a.CLI, user, w, nil, &after)
560 if err != nil {
561 errors.WriteHTTPInternalServerError(w, "unable to clip user", err)
562 return
563 }
564 })
565
566 handler := sloghttp.Recovery(router)
567 if log.Level(4) {
568 handler = sloghttp.New(slog.Default())(handler)
569 }
570 return handler, nil
571}
572
573func (a *StreamplaceAPI) keyToUser(ctx context.Context, key string) (string, error) {
574 payload, err := base64.URLEncoding.DecodeString(key)
575 if err != nil {
576 return "", err
577 }
578 signed, err := a.Signer.Verify(payload)
579 if err != nil {
580 return "", err
581 }
582 _, ok := signed.Data().(*v0.StreamKey)
583 if !ok {
584 return "", fmt.Errorf("got signed data but it wasn't a stream key")
585 }
586 return strings.ToLower(signed.Signer()), nil
587}