Live video on the AT Protocol
1package media
2
3import (
4 "bytes"
5 "context"
6 "fmt"
7 "io"
8 "os"
9 "strings"
10 "time"
11
12 "github.com/go-gst/go-gst/gst"
13 "github.com/go-gst/go-gst/gst/app"
14 "stream.place/streamplace/pkg/config"
15 "stream.place/streamplace/pkg/log"
16)
17
18// For testing. Normally, We don't want to stop the pipeline upon a
19// segmentation error because we want to keep the stream alive. Lots
20// of weird invalid data coming in from WebRTC connections on phones.
21// Better we drop one weird segment than force the stream to restart.
22// But for tests, we want (sometimes) to know if there's a problem.
23var FatalSegmentationErrors = false
24
25// element that takes the input stream, muxes to mp4, and signs the result
26func SegmentElem(ctx context.Context, cli *config.CLI, streamer string, doH264Parse bool, cb func(ctx context.Context, buf []byte, now int64) error) (*gst.Element, error) {
27 // elem, err := gst.NewElement("splitmuxsink name=splitter async-finalize=true sink-factory=appsink muxer-factory=matroskamux max-size-bytes=1")
28 elem, err := gst.NewElementWithProperties("splitmuxsink", map[string]any{
29 "name": "signer",
30 "async-finalize": true,
31 "sink-factory": "appsink",
32 "muxer-factory": "mp4mux",
33 "max-size-bytes": 1,
34 })
35 if err != nil {
36 return nil, err
37 }
38
39 p := elem.GetRequestPad("video")
40 if p == nil {
41 return nil, fmt.Errorf("failed to get video pad")
42 }
43 p = elem.GetRequestPad("audio_%u")
44 if p == nil {
45 return nil, fmt.Errorf("failed to get audio pad")
46 }
47
48 resetTimer := make(chan struct{})
49
50 go func() {
51 for {
52 select {
53 case <-ctx.Done():
54 return
55 case <-resetTimer:
56 continue
57 case <-time.After(time.Second * 30):
58 log.Warn(ctx, "no new segment for 30 seconds")
59 elem.ErrorMessage(gst.DomainCore, gst.CoreErrorFailed, "No new segment for 30 seconds", "No new segment for 30 seconds (debug)")
60 return
61 }
62 }
63 }()
64
65 // we didn't need faststart but i'm leaving this commented here in case
66 // you want to change any other muxer properties in the future
67
68 _, err = elem.Connect("muxer-added", func(split, muxEle *gst.Element) {
69 err := muxEle.SetProperty("presentation-time", false)
70 if err != nil {
71 panic("error setting presentation-time to false: " + err.Error())
72 }
73 err = muxEle.SetProperty("interleave-bytes", InterleaveBytes)
74 if err != nil {
75 panic("error setting interleave-bytes" + err.Error())
76 }
77 err = muxEle.SetProperty("interleave-time", InterleaveTime)
78 if err != nil {
79 panic("error setting interleave-time" + err.Error())
80 }
81 err = muxEle.SetProperty("faststart", true)
82 if err != nil {
83 panic("error setting faststart" + err.Error())
84 }
85 err = muxEle.SetProperty("movie-timescale", uint(60000))
86 if err != nil {
87 panic("error setting movie-timescale" + err.Error())
88 }
89 err = muxEle.SetProperty("trak-timescale", uint(60000))
90 if err != nil {
91 panic("error setting trak-timescale" + err.Error())
92 }
93 })
94 if err != nil {
95 return nil, fmt.Errorf("failed to connect muxer-added handler: %w", err)
96 }
97
98 // channel to make sure data is emitted in order
99 var ch chan struct{}
100
101 _, err = elem.Connect("sink-added", func(split, sinkEle *gst.Element) {
102 previousSegCh := ch
103 mySegCh := make(chan struct{}, 1)
104 ch = mySegCh
105 buf := &bytes.Buffer{}
106 err := sinkEle.SetProperty("sync", false)
107 if err != nil {
108 panic("error setting sync to false: " + err.Error())
109 }
110 appsink := app.SinkFromElement(sinkEle)
111 if appsink == nil {
112 panic("appsink should not be nil")
113 }
114
115 appsink.SetCallbacks(&app.SinkCallbacks{
116 NewSampleFunc: WriterNewSample(ctx, buf),
117 EOSFunc: func(sink *app.Sink) {
118 // ctx, span := otel.Tracer("signer").Start(ctx, "SegmentAndSignElem", trace.WithAttributes(
119 // attribute.String("streamer", ms.Streamer()),
120 // ))
121 // defer span.End()
122 now := time.Now().UnixMilli()
123 bs := buf.Bytes()
124
125 if previousSegCh != nil {
126 <-previousSegCh
127 }
128 resetTimer <- struct{}{}
129 convergeAndSign := func() error {
130 convergedBs, err := ConvergeSegment(ctx, cli, bs, now, streamer, doH264Parse)
131 if err != nil {
132 log.Error(ctx, "error converging segment", "error", err)
133 } else {
134 bs = convergedBs
135 }
136 log.Debug(ctx, "signing segment", "size", len(bs))
137 err = cb(ctx, bs, now)
138 if err != nil {
139 return fmt.Errorf("error signing segment: %w", err)
140 }
141 return nil
142 }
143 err := func() error {
144 convergeDone := make(chan error)
145 go func() {
146 convergeDone <- convergeAndSign()
147 }()
148 select {
149 case <-ctx.Done():
150 return ctx.Err()
151 case err := <-convergeDone:
152 return err
153 case <-time.After(time.Second * 3):
154 go func() {
155 err = cli.DataFileWrite([]string{"debug-recordings", streamer, fmt.Sprintf("converge-timeout-%d.mp4", now)}, bytes.NewReader(bs), true)
156 if err != nil {
157 log.Error(ctx, "error writing debug recording", "error", err)
158 }
159 }()
160 return fmt.Errorf("timeout converging segment")
161 }
162 }()
163 close(mySegCh)
164 if err != nil {
165 log.Error(ctx, "error in segmenter", "error", err)
166 if FatalSegmentationErrors {
167 sink.ErrorMessage(gst.DomainCore, gst.CoreErrorFailed, "error in segmenter", err.Error())
168 return
169 }
170 }
171 },
172 })
173 })
174 if err != nil {
175 return nil, fmt.Errorf("failed to connect sink-added handler: %w", err)
176 }
177
178 return elem, nil
179}
180
181func (mm *MediaManager) SegmentAndSignElem(ctx context.Context, ms MediaSigner) (*gst.Element, error) {
182 return SegmentElem(ctx, mm.cli, ms.Streamer(), false, func(ctx context.Context, bs []byte, now int64) error {
183 if mm.cli.SmearAudio {
184 smearedBuf := &bytes.Buffer{}
185 err := RewriteAudioTimestamps(ctx, mm.cli, bytes.NewReader(bs), smearedBuf, true)
186 if err != nil {
187 return fmt.Errorf("error smearing audio timestamps: %w", err)
188 }
189 bs = smearedBuf.Bytes()
190 }
191 signedBs, err := ms.SignMP4(ctx, bytes.NewReader(bs), now)
192 if err != nil {
193 return fmt.Errorf("error calling SignMP4: %w", err)
194 }
195 log.Debug(ctx, "signed segment", "size", len(signedBs))
196 err = mm.ValidateMP4(ctx, bytes.NewReader(signedBs), true)
197 if err != nil {
198 mm.cli.DumpDebugSegment(ctx, "just-signed-segment.mp4", bytes.NewReader(signedBs))
199 return fmt.Errorf("error validating just-signed segment: %w", err)
200 }
201 return nil
202 })
203}
204
205func SegmentFileUnsigned(ctx context.Context, cli *config.CLI, streamer string, input string, ch chan *SplitSegment) error {
206 fd, err := os.OpenFile(input, os.O_RDONLY, 0644)
207 log.Log(ctx, "reading file", "file", input)
208 if err != nil {
209 return fmt.Errorf("failed to read file: %w", err)
210 }
211 defer fd.Close()
212 return SegmentUnsigned(ctx, cli, streamer, fd, false, ch)
213}
214
215func SegmentUnsigned(ctx context.Context, cli *config.CLI, streamer string, input io.Reader, doH264Parse bool, ch chan *SplitSegment) error {
216 ctx, cancel := context.WithCancel(ctx)
217 defer cancel()
218 pipelineSlice := []string{
219 "appsrc name=appsrc ! qtdemux name=demux",
220 "demux. ! queue ! h264parse name=videoparse disable-passthrough=true config-interval=0",
221 "demux. ! queue ! opusparse name=audioparse",
222 }
223 pipeline, err := gst.NewPipelineFromString(strings.Join(pipelineSlice, "\n"))
224 if err != nil {
225 return fmt.Errorf("error creating MKVIngest pipeline: %w", err)
226 }
227
228 srcele, err := pipeline.GetElementByName("appsrc")
229 if err != nil {
230 return err
231 }
232 src := app.SrcFromElement(srcele)
233 src.SetCallbacks(&app.SourceCallbacks{
234 NeedDataFunc: ReaderNeedDataIncremental(ctx, input),
235 })
236 videoParseEle, err := pipeline.GetElementByName("videoparse")
237 if err != nil {
238 return err
239 }
240
241 segmenter, err := SegmentElem(ctx, cli, streamer, doH264Parse, func(ctx context.Context, buf []byte, now int64) error {
242 ch <- &SplitSegment{
243 Filename: fmt.Sprintf("%d.mp4", now),
244 Data: buf,
245 }
246 return nil
247 })
248 if err != nil {
249 return err
250 }
251
252 err = pipeline.Add(segmenter)
253 if err != nil {
254 return err
255 }
256 err = videoParseEle.Link(segmenter)
257 if err != nil {
258 return err
259 }
260 audioparse, err := pipeline.GetElementByName("audioparse")
261 if err != nil {
262 return err
263 }
264 err = audioparse.Link(segmenter)
265 if err != nil {
266 return err
267 }
268
269 busErr := make(chan error)
270 go func() {
271 err := HandleBusMessages(ctx, pipeline)
272 cancel()
273 busErr <- err
274 }()
275
276 err = pipeline.SetState(gst.StatePlaying)
277 if err != nil {
278 return err
279 }
280
281 defer func() {
282 err := pipeline.SetState(gst.StateNull)
283 if err != nil {
284 log.Error(ctx, "error setting pipeline to null state", "error", err)
285 }
286 }()
287
288 err = <-busErr
289 if err != nil {
290 return err
291 }
292
293 return nil
294}