Live video on the AT Protocol
1package media
2
3import (
4 "bytes"
5 "context"
6 "fmt"
7 "time"
8
9 "github.com/go-gst/go-gst/gst"
10 "github.com/go-gst/go-gst/gst/app"
11 "go.opentelemetry.io/otel"
12 "go.opentelemetry.io/otel/attribute"
13 "go.opentelemetry.io/otel/trace"
14 "stream.place/streamplace/pkg/globalerror"
15 "stream.place/streamplace/pkg/log"
16)
17
18// element that takes the input stream, muxes to mp4, and signs the result
19func (mm *MediaManager) SegmentAndSignElem(ctx context.Context, ms MediaSigner) (*gst.Element, error) {
20 // elem, err := gst.NewElement("splitmuxsink name=splitter async-finalize=true sink-factory=appsink muxer-factory=matroskamux max-size-bytes=1")
21 elem, err := gst.NewElementWithProperties("splitmuxsink", map[string]any{
22 "name": "signer",
23 "async-finalize": true,
24 "sink-factory": "appsink",
25 "muxer-factory": "mp4mux",
26 "max-size-bytes": 1,
27 })
28 if err != nil {
29 return nil, err
30 }
31
32 p := elem.GetRequestPad("video")
33 if p == nil {
34 return nil, fmt.Errorf("failed to get video pad")
35 }
36 p = elem.GetRequestPad("audio_%u")
37 if p == nil {
38 return nil, fmt.Errorf("failed to get audio pad")
39 }
40
41 resetTimer := make(chan struct{})
42
43 go func() {
44 for {
45 select {
46 case <-ctx.Done():
47 return
48 case <-resetTimer:
49 continue
50 case <-time.After(time.Second * 30):
51 log.Warn(ctx, "no new segment for 30 seconds")
52 elem.ErrorMessage(gst.DomainCore, gst.CoreErrorFailed, "No new segment for 30 seconds", "No new segment for 30 seconds (debug)")
53 return
54 }
55 }
56 }()
57
58 _, err = elem.Connect("sink-added", func(split, sinkEle *gst.Element) {
59 buf := &bytes.Buffer{}
60 appsink := app.SinkFromElement(sinkEle)
61 if appsink == nil {
62 panic("appsink should not be nil")
63 }
64
65 appsink.SetCallbacks(&app.SinkCallbacks{
66 NewSampleFunc: WriterNewSample(ctx, buf),
67 EOSFunc: func(sink *app.Sink) {
68 ctx, span := otel.Tracer("signer").Start(ctx, "SegmentAndSignElem", trace.WithAttributes(
69 attribute.String("streamer", ms.Streamer()),
70 ))
71 defer span.End()
72 resetTimer <- struct{}{}
73 now := time.Now().UnixMilli()
74 bs := buf.Bytes()
75 if mm.cli.SmearAudio {
76 smearedBuf := &bytes.Buffer{}
77 err := SmearAudioTimestamps(ctx, bytes.NewReader(buf.Bytes()), smearedBuf)
78 if err != nil {
79 log.Error(ctx, "error smearing audio timestamps", "error", err)
80 return
81 }
82 bs = smearedBuf.Bytes()
83 }
84 bs, err := ms.SignMP4(ctx, bytes.NewReader(bs), now)
85 if err != nil {
86 log.Error(ctx, "error signing segment", "error", err)
87 return
88 }
89
90 err = mm.ValidateMP4(ctx, bytes.NewReader(bs), true)
91 if err != nil {
92 log.Error(ctx, "error validating segment", "error", err)
93 globalerror.GlobalError(err)
94 // We don't want to stop the pipeline here because we want to keep the stream
95 // alive. Lots of weird invalid data coming in from WebRTC connections on
96 // phones. Better we drop one weird segment than force the stream to restart
97 return
98 }
99 },
100 })
101 })
102 if err != nil {
103 return nil, fmt.Errorf("failed to connect sink-added handler: %w", err)
104 }
105
106 return elem, nil
107}