Live video on the AT Protocol
1package media
2
3import (
4 "context"
5 "fmt"
6 "io"
7 "strings"
8 "time"
9
10 "github.com/go-gst/go-gst/gst"
11 "github.com/go-gst/go-gst/gst/app"
12 "golang.org/x/sync/errgroup"
13 "stream.place/streamplace/pkg/log"
14)
15
16// MP4ToMPEGTS converts an MP4 file with H264 video and Opus audio to an MPEG-TS file
17// It reads from the provided reader and writes the converted MPEG-TS to the writer.
18// The conversion is optimized for speed.
19func MP4ToMPEGTS(ctx context.Context, input io.Reader, output io.Writer) (int64, error) {
20 ctx = log.WithLogValues(ctx, "func", "MP4ToMPEGTS")
21 pipelineStr := strings.Join([]string{
22 "appsrc name=appsrc ! qtdemux name=demux",
23 "mpegtsmux name=mux ! appsink name=appsink sync=false",
24 "demux.video_0 ! h264parse ! video/x-h264,stream-format=byte-stream ! queue name=videoqueue",
25 "demux.audio_0 ! opusdec name=audioparse ! audioresample ! audiorate ! fdkaacenc name=audioenc ! queue name=audioqueue",
26 }, " ")
27
28 pipeline, err := gst.NewPipelineFromString(pipelineStr)
29 if err != nil {
30 return 0, err
31 }
32
33 mux, err := pipeline.GetElementByName("mux")
34 if err != nil {
35 return 0, err
36 }
37 muxVideoSinkPad := mux.GetRequestPad("sink_%d")
38 if muxVideoSinkPad == nil {
39 return 0, fmt.Errorf("failed to get video sink pad")
40 }
41 muxAudioSinkPad := mux.GetRequestPad("sink_%d")
42 if muxAudioSinkPad == nil {
43 return 0, fmt.Errorf("failed to get audio sink pad")
44 }
45 videoQueue, err := pipeline.GetElementByName("videoqueue")
46 if err != nil {
47 return 0, err
48 }
49 audioQueue, err := pipeline.GetElementByName("audioqueue")
50 if err != nil {
51 return 0, err
52 }
53 videoQueueSrcPad := videoQueue.GetStaticPad("src")
54 if videoQueueSrcPad == nil {
55 return 0, fmt.Errorf("failed to get video queue source pad")
56 }
57 audioQueueSrcPad := audioQueue.GetStaticPad("src")
58 if audioQueueSrcPad == nil {
59 return 0, fmt.Errorf("failed to get audio queue source pad")
60 }
61
62 ok := videoQueueSrcPad.Link(muxVideoSinkPad)
63 if ok != gst.PadLinkOK {
64 return 0, fmt.Errorf("failed to link video queue source pad to mux video sink pad: %v", ok)
65 }
66 ok = audioQueueSrcPad.Link(muxAudioSinkPad)
67 if ok != gst.PadLinkOK {
68 return 0, fmt.Errorf("failed to link audio queue source pad to mux audio sink pad: %v", ok)
69 }
70
71 // Get elements
72 appsrc, err := pipeline.GetElementByName("appsrc")
73 if err != nil {
74 return 0, err
75 }
76 appsink, err := pipeline.GetElementByName("appsink")
77 if err != nil {
78 return 0, err
79 }
80
81 source := app.SrcFromElement(appsrc)
82 sink := app.SinkFromElement(appsink)
83
84 // Set up source callbacks
85 source.SetCallbacks(&app.SourceCallbacks{
86 NeedDataFunc: ReaderNeedDataIncremental(ctx, input),
87 EnoughDataFunc: func(self *app.Source) {
88 // Nothing to do here
89 },
90 SeekDataFunc: func(self *app.Source, offset uint64) bool {
91 return false // We don't support seeking
92 },
93 })
94
95 // Set up sink callbacks
96 sink.SetCallbacks(&app.SinkCallbacks{
97 NewSampleFunc: WriterNewSample(ctx, output),
98 NewPrerollFunc: func(self *app.Sink) gst.FlowReturn {
99 return gst.FlowOK
100 },
101 })
102
103 ctx, cancel := context.WithCancel(ctx)
104 defer func() {
105 cancel()
106 // Clean up
107 err = pipeline.SetState(gst.StateNull)
108 if err != nil {
109 log.Error(ctx, "failed to set pipeline state to null", "error", err)
110 }
111 }()
112
113 go func() {
114 select {
115 case <-ctx.Done():
116 return
117 case <-time.After(time.Second * 10):
118 log.Debug(ctx, "pipeline is taking too long to start, cancelling")
119 err := fmt.Errorf("pipeline is taking too long to start, cancelling")
120 pipeline.Error(err.Error(), err)
121 }
122 }()
123
124 // Handle bus messages in a separate goroutine
125 errCh := make(chan error)
126 go func() {
127 err := HandleBusMessages(ctx, pipeline)
128 cancel()
129 errCh <- err
130 close(errCh)
131 }()
132
133 // Start the pipeline
134 err = pipeline.SetState(gst.StatePlaying)
135 if err != nil {
136 return 0, fmt.Errorf("failed to set pipeline state to playing: %w", err)
137 }
138
139 var durOk bool
140 var dur int64
141 busErr := <-errCh
142
143 if busErr == nil {
144 durOk, dur = pipeline.QueryDuration(gst.FormatTime)
145 if !durOk {
146 return 0, fmt.Errorf("failed to query duration")
147 }
148 }
149
150 return dur, busErr
151}
152
153// MPEGTSToMP4 converts an MPEG-TS file with H264 video and Opus audio to an MP4 file.
154// It reads from the provided reader and writes the converted MP4 to the writer.
155func MPEGTSToMP4(ctx context.Context, input io.Reader, output io.Writer) error {
156 ctx = log.WithLogValues(ctx, "func", "MPEGTSToMP4")
157 pipelineStr := strings.Join([]string{
158 "appsrc name=appsrc ! tsdemux name=demux",
159 "mp4mux name=mux ! appsink sync=false name=appsink",
160 "demux.video_0_0100 ! h264parse ! video/x-h264,stream-format=avc ! queue name=videoqueue",
161 "demux.audio_0_0101 ! opusdec ! opusenc ! queue name=audioqueue",
162 }, " ")
163
164 pipeline, err := gst.NewPipelineFromString(pipelineStr)
165 if err != nil {
166 return err
167 }
168
169 mux, err := pipeline.GetElementByName("mux")
170 if err != nil {
171 return err
172 }
173 muxVideoSinkPad := mux.GetRequestPad("video_%u")
174 if muxVideoSinkPad == nil {
175 return fmt.Errorf("failed to get video sink pad")
176 }
177 muxAudioSinkPad := mux.GetRequestPad("audio_%u")
178 if muxAudioSinkPad == nil {
179 return fmt.Errorf("failed to get audio sink pad")
180 }
181 videoQueue, err := pipeline.GetElementByName("videoqueue")
182 if err != nil {
183 return err
184 }
185 audioQueue, err := pipeline.GetElementByName("audioqueue")
186 if err != nil {
187 return err
188 }
189 videoQueueSrcPad := videoQueue.GetStaticPad("src")
190 if videoQueueSrcPad == nil {
191 return fmt.Errorf("failed to get video queue source pad")
192 }
193 audioQueueSrcPad := audioQueue.GetStaticPad("src")
194 if audioQueueSrcPad == nil {
195 return fmt.Errorf("failed to get audio queue source pad")
196 }
197
198 ok := videoQueueSrcPad.Link(muxVideoSinkPad)
199 if ok != gst.PadLinkOK {
200 return fmt.Errorf("failed to link video queue source pad to mux video sink pad: %v", ok)
201 }
202 ok = audioQueueSrcPad.Link(muxAudioSinkPad)
203 if ok != gst.PadLinkOK {
204 return fmt.Errorf("failed to link audio queue source pad to mux audio sink pad: %v", ok)
205 }
206
207 // Get elements
208 appsrc, err := pipeline.GetElementByName("appsrc")
209 if err != nil {
210 return err
211 }
212 appsink, err := pipeline.GetElementByName("appsink")
213 if err != nil {
214 return err
215 }
216
217 source := app.SrcFromElement(appsrc)
218 sink := app.SinkFromElement(appsink)
219
220 // Set up source callbacks
221 source.SetCallbacks(&app.SourceCallbacks{
222 NeedDataFunc: ReaderNeedDataIncremental(ctx, input),
223 EnoughDataFunc: func(self *app.Source) {
224 // Nothing to do here
225 },
226 SeekDataFunc: func(self *app.Source, offset uint64) bool {
227 return false // We don't support seeking
228 },
229 })
230
231 // Set up sink callbacks
232 sink.SetCallbacks(&app.SinkCallbacks{
233 NewSampleFunc: WriterNewSample(ctx, output),
234 NewPrerollFunc: func(self *app.Sink) gst.FlowReturn {
235 return gst.FlowOK
236 },
237 })
238
239 ctx, cancel := context.WithCancel(ctx)
240 defer cancel()
241
242 busErr := make(chan error)
243 go func() {
244 err := HandleBusMessages(ctx, pipeline)
245 busErr <- err
246 }()
247
248 err = pipeline.SetState(gst.StatePlaying)
249 if err != nil {
250 return fmt.Errorf("failed to set pipeline state to playing: %w", err)
251 }
252
253 defer func() {
254 err = pipeline.SetState(gst.StateNull)
255 if err != nil {
256 log.Error(ctx, "failed to set pipeline state to null", "error", err)
257 }
258 }()
259
260 return <-busErr
261}
262
263// Splits out video into MPEG-TS and audio into MP4 (to be recombined after transcoding)
264func MP4ToMPEGTSVideoMP4Audio(ctx context.Context, input io.Reader, videoOutput io.Writer, audioOutput io.Writer) error {
265 ctx = log.WithLogValues(ctx, "func", "MP4ToMPEGTSVideoMP4Audio")
266 pipelineStr := strings.Join([]string{
267 "appsrc name=appsrc ! qtdemux name=demux",
268 "mpegtsmux name=videomux ! appsink name=videoappsink sync=false",
269 "mp4mux name=audiomux ! appsink name=audioappsink sync=false",
270 "demux.video_0 ! h264parse ! video/x-h264,stream-format=byte-stream ! queue name=videoqueue",
271 "demux.audio_0 ! opusparse ! queue name=audioqueue",
272 }, " ")
273
274 pipeline, err := gst.NewPipelineFromString(pipelineStr)
275 if err != nil {
276 return err
277 }
278
279 videomux, err := pipeline.GetElementByName("videomux")
280 if err != nil {
281 return err
282 }
283 muxVideoSinkPad := videomux.GetRequestPad("sink_%d")
284 if muxVideoSinkPad == nil {
285 return fmt.Errorf("failed to get video sink pad")
286 }
287
288 audiomux, err := pipeline.GetElementByName("audiomux")
289 if err != nil {
290 return err
291 }
292 muxAudioSinkPad := audiomux.GetRequestPad("audio_%u")
293 if muxAudioSinkPad == nil {
294 return fmt.Errorf("failed to get audio sink pad")
295 }
296
297 videoQueue, err := pipeline.GetElementByName("videoqueue")
298 if err != nil {
299 return err
300 }
301 audioQueue, err := pipeline.GetElementByName("audioqueue")
302 if err != nil {
303 return err
304 }
305
306 videoQueueSrcPad := videoQueue.GetStaticPad("src")
307 if videoQueueSrcPad == nil {
308 return fmt.Errorf("failed to get video queue source pad")
309 }
310 audioQueueSrcPad := audioQueue.GetStaticPad("src")
311 if audioQueueSrcPad == nil {
312 return fmt.Errorf("failed to get audio queue source pad")
313 }
314
315 ok := videoQueueSrcPad.Link(muxVideoSinkPad)
316 if ok != gst.PadLinkOK {
317 return fmt.Errorf("failed to link video queue source pad to mux video sink pad: %v", ok)
318 }
319 ok = audioQueueSrcPad.Link(muxAudioSinkPad)
320 if ok != gst.PadLinkOK {
321 return fmt.Errorf("failed to link audio queue source pad to mux audio sink pad: %v", ok)
322 }
323
324 // Get elements
325 appsrc, err := pipeline.GetElementByName("appsrc")
326 if err != nil {
327 return err
328 }
329 videoappsink, err := pipeline.GetElementByName("videoappsink")
330 if err != nil {
331 return err
332 }
333 audioappsink, err := pipeline.GetElementByName("audioappsink")
334 if err != nil {
335 return err
336 }
337
338 source := app.SrcFromElement(appsrc)
339 videoSink := app.SinkFromElement(videoappsink)
340 audioSink := app.SinkFromElement(audioappsink)
341
342 // Set up source callbacks
343 source.SetCallbacks(&app.SourceCallbacks{
344 NeedDataFunc: ReaderNeedDataIncremental(ctx, input),
345 EnoughDataFunc: func(self *app.Source) {
346 // Nothing to do here
347 },
348 SeekDataFunc: func(self *app.Source, offset uint64) bool {
349 return false // We don't support seeking
350 },
351 })
352
353 // Set up sink callbacks
354 videoSink.SetCallbacks(&app.SinkCallbacks{
355 NewSampleFunc: WriterNewSample(ctx, videoOutput),
356 NewPrerollFunc: func(self *app.Sink) gst.FlowReturn {
357 return gst.FlowOK
358 },
359 })
360
361 // Set up sink callbacks
362 audioSink.SetCallbacks(&app.SinkCallbacks{
363 NewSampleFunc: WriterNewSample(ctx, audioOutput),
364 NewPrerollFunc: func(self *app.Sink) gst.FlowReturn {
365 return gst.FlowOK
366 },
367 })
368
369 ctx, cancel := context.WithCancel(ctx)
370 defer cancel()
371
372 // Handle bus messages in a separate goroutine
373 g, ctx := errgroup.WithContext(ctx)
374 g.Go(func() error {
375 err = HandleBusMessages(ctx, pipeline)
376 cancel()
377 return err
378 })
379
380 // Start the pipeline
381 err = pipeline.SetState(gst.StatePlaying)
382 if err != nil {
383 return fmt.Errorf("failed to set pipeline state to playing: %w", err)
384 }
385
386 // Wait for the pipeline to finish or context to be canceled
387 <-ctx.Done()
388
389 // Clean up
390 err = pipeline.SetState(gst.StateNull)
391 if err != nil {
392 return fmt.Errorf("failed to set pipeline state to null: %w", err)
393 }
394
395 return g.Wait()
396}
397
398// Joins video and audio back together from MPEG-TS and MP4 (from transcoding)
399func MPEGTSVideoMP4AudioToMP4(ctx context.Context, videoInput io.Reader, audioInput io.Reader, output io.Writer) error {
400 pipelineStr := strings.Join([]string{
401 "appsrc name=videoappsrc ! tsdemux name=videodemux",
402 "appsrc name=audioappsrc ! qtdemux name=audiodemux",
403 "mp4mux name=mux ! appsink name=appsink sync=false",
404 "h264parse name=videoparse ! video/x-h264,stream-format=avc ! queue name=videoqueue",
405 "audiodemux.audio_0 ! opusparse ! queue name=audioqueue",
406 }, " ")
407
408 pipeline, err := gst.NewPipelineFromString(pipelineStr)
409 if err != nil {
410 return err
411 }
412
413 mux, err := pipeline.GetElementByName("mux")
414 if err != nil {
415 return err
416 }
417 muxVideoSinkPad := mux.GetRequestPad("video_%u")
418 if muxVideoSinkPad == nil {
419 return fmt.Errorf("failed to get video sink pad")
420 }
421 muxAudioSinkPad := mux.GetRequestPad("audio_%u")
422 if muxAudioSinkPad == nil {
423 return fmt.Errorf("failed to get audio sink pad")
424 }
425
426 videoQueue, err := pipeline.GetElementByName("videoqueue")
427 if err != nil {
428 return err
429 }
430 audioQueue, err := pipeline.GetElementByName("audioqueue")
431 if err != nil {
432 return err
433 }
434
435 videoQueueSrcPad := videoQueue.GetStaticPad("src")
436 if videoQueueSrcPad == nil {
437 return fmt.Errorf("failed to get video queue source pad")
438 }
439 audioQueueSrcPad := audioQueue.GetStaticPad("src")
440 if audioQueueSrcPad == nil {
441 return fmt.Errorf("failed to get audio queue source pad")
442 }
443
444 ok := videoQueueSrcPad.Link(muxVideoSinkPad)
445 if ok != gst.PadLinkOK {
446 return fmt.Errorf("failed to link video queue source pad to mux video sink pad: %v", ok)
447 }
448 ok = audioQueueSrcPad.Link(muxAudioSinkPad)
449 if ok != gst.PadLinkOK {
450 return fmt.Errorf("failed to link audio queue source pad to mux audio sink pad: %v", ok)
451 }
452
453 videodemux, err := pipeline.GetElementByName("videodemux")
454 if err != nil {
455 return err
456 }
457 videoparse, err := pipeline.GetElementByName("videoparse")
458 if err != nil {
459 return err
460 }
461 videoParseSinkPad := videoparse.GetStaticPad("sink")
462 if videoParseSinkPad == nil {
463 return fmt.Errorf("failed to get video parse sink pad")
464 }
465
466 // Get elements
467 videoappsrc, err := pipeline.GetElementByName("videoappsrc")
468 if err != nil {
469 return err
470 }
471 audioappsrc, err := pipeline.GetElementByName("audioappsrc")
472 if err != nil {
473 return err
474 }
475 appsink, err := pipeline.GetElementByName("appsink")
476 if err != nil {
477 return err
478 }
479
480 videoSource := app.SrcFromElement(videoappsrc)
481 audioSource := app.SrcFromElement(audioappsrc)
482 sink := app.SinkFromElement(appsink)
483
484 // Set up source callbacks
485 videoSource.SetCallbacks(&app.SourceCallbacks{
486 NeedDataFunc: ReaderNeedDataIncremental(ctx, videoInput),
487 EnoughDataFunc: func(self *app.Source) {
488 // Nothing to do here
489 },
490 SeekDataFunc: func(self *app.Source, offset uint64) bool {
491 return false // We don't support seeking
492 },
493 })
494
495 audioSource.SetCallbacks(&app.SourceCallbacks{
496 NeedDataFunc: ReaderNeedDataIncremental(ctx, audioInput),
497 EnoughDataFunc: func(self *app.Source) {
498 // Nothing to do here
499 },
500 SeekDataFunc: func(self *app.Source, offset uint64) bool {
501 return false // We don't support seeking
502 },
503 })
504
505 wroteAnything := false
506
507 // Set up sink callbacks
508 sink.SetCallbacks(&app.SinkCallbacks{
509 NewSampleFunc: func(sink *app.Sink) gst.FlowReturn {
510 sample := sink.PullSample()
511 if sample == nil {
512 return gst.FlowOK
513 }
514
515 // Retrieve the buffer from the sample.
516 buffer := sample.GetBuffer()
517 bs := buffer.Map(gst.MapRead).Bytes()
518 defer buffer.Unmap()
519
520 _, err := output.Write(bs)
521
522 if err != nil {
523 log.Error(ctx, "error writing to output", "error", err)
524 return gst.FlowError
525 }
526
527 wroteAnything = true
528
529 return gst.FlowOK
530 },
531 NewPrerollFunc: func(self *app.Sink) gst.FlowReturn {
532 return gst.FlowOK
533 },
534 })
535
536 ctx, cancel := context.WithCancel(ctx)
537 defer cancel()
538
539 onPadAdded := func(element *gst.Element, pad *gst.Pad) {
540 if pad.GetDirection() == gst.PadDirectionSource {
541 ok := pad.Link(videoParseSinkPad)
542 if ok != gst.PadLinkOK {
543 log.Error(ctx, "failed to link video parse sink pad to video demux pad", "error", ok)
544 cancel()
545 }
546 }
547 }
548 if _, err := videodemux.Connect("pad-added", onPadAdded); err != nil {
549 return fmt.Errorf("failed connect pad-added handler: %w", err)
550 }
551
552 errCh := make(chan error)
553 go func() {
554 err = HandleBusMessages(ctx, pipeline)
555 cancel()
556 errCh <- err
557 }()
558
559 // Start the pipeline
560 err = pipeline.SetState(gst.StatePlaying)
561 if err != nil {
562 return fmt.Errorf("failed to set pipeline state to playing: %w", err)
563 }
564
565 defer func() {
566 err = pipeline.SetState(gst.StateNull)
567 if err != nil {
568 log.Error(ctx, "failed to set pipeline state to null", "error", err)
569 }
570 videoParseSinkPad = nil
571 }()
572
573 err = <-errCh
574 if err != nil {
575 return fmt.Errorf("pipeline error: %w", err)
576 }
577
578 if !wroteAnything {
579 return fmt.Errorf("no data written to output")
580 }
581
582 return nil
583}