Live video on the AT Protocol
1package media
2
3import (
4 "bytes"
5 "context"
6 "fmt"
7 "io"
8 "os"
9 "strings"
10 "time"
11
12 "github.com/go-gst/go-gst/gst"
13 "github.com/go-gst/go-gst/gst/app"
14 "stream.place/streamplace/pkg/config"
15 "stream.place/streamplace/pkg/log"
16)
17
18// For testing. Normally, We don't want to stop the pipeline upon a
19// segmentation error because we want to keep the stream alive. Lots
20// of weird invalid data coming in from WebRTC connections on phones.
21// Better we drop one weird segment than force the stream to restart.
22// But for tests, we want (sometimes) to know if there's a problem.
23var FatalSegmentationErrors = false
24
25// element that takes the input stream, muxes to mp4, and signs the result
26func SegmentElem(ctx context.Context, cli *config.CLI, streamer string, doH264Parse bool, cb func(ctx context.Context, buf []byte, now int64) error) (*gst.Element, error) {
27 // elem, err := gst.NewElement("splitmuxsink name=splitter async-finalize=true sink-factory=appsink muxer-factory=matroskamux max-size-bytes=1")
28 elem, err := gst.NewElementWithProperties("splitmuxsink", map[string]any{
29 "name": "signer",
30 "async-finalize": true,
31 "sink-factory": "appsink",
32 "muxer-factory": "mp4mux",
33 "max-size-bytes": 1,
34 })
35 if err != nil {
36 return nil, err
37 }
38
39 p := elem.GetRequestPad("video")
40 if p == nil {
41 return nil, fmt.Errorf("failed to get video pad")
42 }
43 p = elem.GetRequestPad("audio_%u")
44 if p == nil {
45 return nil, fmt.Errorf("failed to get audio pad")
46 }
47
48 resetTimer := make(chan struct{})
49
50 go func() {
51 for {
52 select {
53 case <-ctx.Done():
54 return
55 case <-resetTimer:
56 continue
57 case <-time.After(time.Second * 30):
58 log.Warn(ctx, "no new segment for 30 seconds")
59 elem.ErrorMessage(gst.DomainCore, gst.CoreErrorFailed, "No new segment for 30 seconds", "No new segment for 30 seconds (debug)")
60 return
61 }
62 }
63 }()
64
65 // we didn't need faststart but i'm leaving this commented here in case
66 // you want to change any other muxer properties in the future
67
68 _, err = elem.Connect("muxer-added", func(split, muxEle *gst.Element) {
69 err := muxEle.SetProperty("presentation-time", false)
70 if err != nil {
71 panic("error setting presentation-time to false: " + err.Error())
72 }
73 err = muxEle.SetProperty("interleave-bytes", InterleaveBytes)
74 if err != nil {
75 panic("error setting interleave-bytes" + err.Error())
76 }
77 err = muxEle.SetProperty("interleave-time", InterleaveTime)
78 if err != nil {
79 panic("error setting interleave-time" + err.Error())
80 }
81 err = muxEle.SetProperty("faststart", true)
82 if err != nil {
83 panic("error setting faststart" + err.Error())
84 }
85 err = muxEle.SetProperty("movie-timescale", uint(60000))
86 if err != nil {
87 panic("error setting movie-timescale" + err.Error())
88 }
89 err = muxEle.SetProperty("trak-timescale", uint(60000))
90 if err != nil {
91 panic("error setting trak-timescale" + err.Error())
92 }
93 })
94 if err != nil {
95 return nil, fmt.Errorf("failed to connect muxer-added handler: %w", err)
96 }
97
98 // channel to make sure data is emitted in order
99 var ch chan struct{}
100
101 _, err = elem.Connect("sink-added", func(split, sinkEle *gst.Element) {
102 previousSegCh := ch
103 mySegCh := make(chan struct{}, 1)
104 ch = mySegCh
105 buf := &bytes.Buffer{}
106 err := sinkEle.SetProperty("sync", false)
107 if err != nil {
108 panic("error setting sync to false: " + err.Error())
109 }
110 appsink := app.SinkFromElement(sinkEle)
111 if appsink == nil {
112 panic("appsink should not be nil")
113 }
114
115 appsink.SetCallbacks(&app.SinkCallbacks{
116 NewSampleFunc: WriterNewSample(ctx, buf),
117 EOSFunc: func(sink *app.Sink) {
118 // ctx, span := otel.Tracer("signer").Start(ctx, "SegmentAndSignElem", trace.WithAttributes(
119 // attribute.String("streamer", ms.Streamer()),
120 // ))
121 // defer span.End()
122 now := time.Now().UnixMilli()
123 resetTimer <- struct{}{}
124 bs := buf.Bytes()
125
126 if previousSegCh != nil {
127 <-previousSegCh
128 }
129 err := func() error {
130 bs, err := ConvergeSegment(ctx, cli, bs, now, streamer, doH264Parse)
131 if err != nil {
132 return fmt.Errorf("error converging segment: %w", err)
133 }
134 log.Debug(ctx, "signing segment", "size", len(bs))
135 err = cb(ctx, bs, now)
136 if err != nil {
137 return fmt.Errorf("error signing segment: %w", err)
138 }
139 return nil
140 }()
141 close(mySegCh)
142 if err != nil {
143 log.Error(ctx, "error in segmenter", "error", err)
144 if FatalSegmentationErrors {
145 sink.ErrorMessage(gst.DomainCore, gst.CoreErrorFailed, "error in segmenter", err.Error())
146 return
147 }
148 }
149 },
150 })
151 })
152 if err != nil {
153 return nil, fmt.Errorf("failed to connect sink-added handler: %w", err)
154 }
155
156 return elem, nil
157}
158
159func (mm *MediaManager) SegmentAndSignElem(ctx context.Context, ms MediaSigner) (*gst.Element, error) {
160 return SegmentElem(ctx, mm.cli, ms.Streamer(), false, func(ctx context.Context, bs []byte, now int64) error {
161 if mm.cli.SmearAudio {
162 smearedBuf := &bytes.Buffer{}
163 err := RewriteAudioTimestamps(ctx, mm.cli, bytes.NewReader(bs), smearedBuf, true)
164 if err != nil {
165 return fmt.Errorf("error smearing audio timestamps: %w", err)
166 }
167 bs = smearedBuf.Bytes()
168 }
169 signedBs, err := ms.SignMP4(ctx, bytes.NewReader(bs), now)
170 if err != nil {
171 return fmt.Errorf("error calling SignMP4: %w", err)
172 }
173 log.Debug(ctx, "signed segment", "size", len(signedBs))
174 err = mm.ValidateMP4(ctx, bytes.NewReader(signedBs), true)
175 if err != nil {
176 mm.cli.DumpDebugSegment(ctx, "just-signed-segment.mp4", bytes.NewReader(signedBs))
177 return fmt.Errorf("error validating just-signed segment: %w", err)
178 }
179 return nil
180 })
181}
182
183func SegmentFileUnsigned(ctx context.Context, cli *config.CLI, streamer string, input string, ch chan *SplitSegment) error {
184 fd, err := os.OpenFile(input, os.O_RDONLY, 0644)
185 log.Log(ctx, "reading file", "file", input)
186 if err != nil {
187 return fmt.Errorf("failed to read file: %w", err)
188 }
189 defer fd.Close()
190 return SegmentUnsigned(ctx, cli, streamer, fd, false, ch)
191}
192
193func SegmentUnsigned(ctx context.Context, cli *config.CLI, streamer string, input io.Reader, doH264Parse bool, ch chan *SplitSegment) error {
194 ctx, cancel := context.WithCancel(ctx)
195 defer cancel()
196 pipelineSlice := []string{
197 "appsrc name=appsrc ! qtdemux name=demux",
198 "demux. ! queue ! h264parse name=videoparse disable-passthrough=true config-interval=0",
199 "demux. ! queue ! opusparse name=audioparse",
200 }
201 pipeline, err := gst.NewPipelineFromString(strings.Join(pipelineSlice, "\n"))
202 if err != nil {
203 return fmt.Errorf("error creating MKVIngest pipeline: %w", err)
204 }
205
206 srcele, err := pipeline.GetElementByName("appsrc")
207 if err != nil {
208 return err
209 }
210 src := app.SrcFromElement(srcele)
211 src.SetCallbacks(&app.SourceCallbacks{
212 NeedDataFunc: ReaderNeedDataIncremental(ctx, input),
213 })
214 videoParseEle, err := pipeline.GetElementByName("videoparse")
215 if err != nil {
216 return err
217 }
218
219 segmenter, err := SegmentElem(ctx, cli, streamer, doH264Parse, func(ctx context.Context, buf []byte, now int64) error {
220 ch <- &SplitSegment{
221 Filename: fmt.Sprintf("%d.mp4", now),
222 Data: buf,
223 }
224 return nil
225 })
226 if err != nil {
227 return err
228 }
229
230 err = pipeline.Add(segmenter)
231 if err != nil {
232 return err
233 }
234 err = videoParseEle.Link(segmenter)
235 if err != nil {
236 return err
237 }
238 audioparse, err := pipeline.GetElementByName("audioparse")
239 if err != nil {
240 return err
241 }
242 err = audioparse.Link(segmenter)
243 if err != nil {
244 return err
245 }
246
247 busErr := make(chan error)
248 go func() {
249 err := HandleBusMessages(ctx, pipeline)
250 cancel()
251 busErr <- err
252 }()
253
254 err = pipeline.SetState(gst.StatePlaying)
255 if err != nil {
256 return err
257 }
258
259 defer func() {
260 err := pipeline.SetState(gst.StateNull)
261 if err != nil {
262 log.Error(ctx, "error setting pipeline to null state", "error", err)
263 }
264 }()
265
266 err = <-busErr
267 if err != nil {
268 return err
269 }
270
271 return nil
272}