Live video on the AT Protocol
1package media
2
3import (
4 "context"
5 "fmt"
6 "io"
7 "strings"
8 "time"
9
10 "github.com/go-gst/go-gst/gst"
11 "github.com/go-gst/go-gst/gst/app"
12 "golang.org/x/sync/errgroup"
13 "stream.place/streamplace/pkg/log"
14)
15
16// MP4ToMPEGTS converts an MP4 file with H264 video and Opus audio to an MPEG-TS file
17// It reads from the provided reader and writes the converted MPEG-TS to the writer.
18// The conversion is optimized for speed.
19func MP4ToMPEGTS(ctx context.Context, input io.Reader, output io.Writer) (int64, error) {
20 ctx = log.WithLogValues(ctx, "func", "MP4ToMPEGTS")
21 pipelineStr := strings.Join([]string{
22 "appsrc name=appsrc ! qtdemux name=demux",
23 "mpegtsmux name=mux ! appsink name=appsink sync=false",
24 "demux.video_0 ! h264parse ! video/x-h264,stream-format=byte-stream ! queue name=videoqueue",
25 "demux.audio_0 ! opusdec name=audioparse ! audioresample ! audiorate ! fdkaacenc name=audioenc ! queue name=audioqueue",
26 }, " ")
27
28 pipeline, err := gst.NewPipelineFromString(pipelineStr)
29 if err != nil {
30 return 0, err
31 }
32
33 mux, err := pipeline.GetElementByName("mux")
34 if err != nil {
35 return 0, err
36 }
37 muxVideoSinkPad := mux.GetRequestPad("sink_%d")
38 if muxVideoSinkPad == nil {
39 return 0, fmt.Errorf("failed to get video sink pad")
40 }
41 muxAudioSinkPad := mux.GetRequestPad("sink_%d")
42 if muxAudioSinkPad == nil {
43 return 0, fmt.Errorf("failed to get audio sink pad")
44 }
45 videoQueue, err := pipeline.GetElementByName("videoqueue")
46 if err != nil {
47 return 0, err
48 }
49 audioQueue, err := pipeline.GetElementByName("audioqueue")
50 if err != nil {
51 return 0, err
52 }
53 videoQueueSrcPad := videoQueue.GetStaticPad("src")
54 if videoQueueSrcPad == nil {
55 return 0, fmt.Errorf("failed to get video queue source pad")
56 }
57 audioQueueSrcPad := audioQueue.GetStaticPad("src")
58 if audioQueueSrcPad == nil {
59 return 0, fmt.Errorf("failed to get audio queue source pad")
60 }
61
62 ok := videoQueueSrcPad.Link(muxVideoSinkPad)
63 if ok != gst.PadLinkOK {
64 return 0, fmt.Errorf("failed to link video queue source pad to mux video sink pad: %v", ok)
65 }
66 ok = audioQueueSrcPad.Link(muxAudioSinkPad)
67 if ok != gst.PadLinkOK {
68 return 0, fmt.Errorf("failed to link audio queue source pad to mux audio sink pad: %v", ok)
69 }
70
71 // Get elements
72 appsrc, err := pipeline.GetElementByName("appsrc")
73 if err != nil {
74 return 0, err
75 }
76 appsink, err := pipeline.GetElementByName("appsink")
77 if err != nil {
78 return 0, err
79 }
80
81 source := app.SrcFromElement(appsrc)
82 sink := app.SinkFromElement(appsink)
83
84 // Set up source callbacks
85 source.SetCallbacks(&app.SourceCallbacks{
86 NeedDataFunc: ReaderNeedDataIncremental(ctx, input),
87 EnoughDataFunc: func(self *app.Source) {
88 // Nothing to do here
89 },
90 SeekDataFunc: func(self *app.Source, offset uint64) bool {
91 return false // We don't support seeking
92 },
93 })
94
95 // Set up sink callbacks
96 sink.SetCallbacks(&app.SinkCallbacks{
97 NewSampleFunc: WriterNewSample(ctx, output),
98 NewPrerollFunc: func(self *app.Sink) gst.FlowReturn {
99 return gst.FlowOK
100 },
101 })
102
103 ctx, cancel := context.WithCancel(ctx)
104 defer func() {
105 cancel()
106 // Clean up
107 err = pipeline.SetState(gst.StateNull)
108 if err != nil {
109 log.Error(ctx, "failed to set pipeline state to null", "error", err)
110 }
111 }()
112
113 go func() {
114 select {
115 case <-ctx.Done():
116 return
117 case <-time.After(time.Second * 10):
118 log.Debug(ctx, "pipeline is taking too long to start, cancelling")
119 err := fmt.Errorf("pipeline is taking too long to start, cancelling")
120 pipeline.Error(err.Error(), err)
121 }
122 }()
123
124 // Handle bus messages in a separate goroutine
125 errCh := make(chan error)
126 go func() {
127 err := HandleBusMessages(ctx, pipeline)
128 cancel()
129 errCh <- err
130 close(errCh)
131 }()
132
133 // Start the pipeline
134 err = pipeline.SetState(gst.StatePlaying)
135 if err != nil {
136 return 0, fmt.Errorf("failed to set pipeline state to playing: %w", err)
137 }
138
139 var durOk bool
140 var dur int64
141 busErr := <-errCh
142
143 if busErr == nil {
144 durOk, dur = pipeline.QueryDuration(gst.FormatTime)
145 if !durOk {
146 return 0, fmt.Errorf("failed to query duration")
147 }
148 }
149
150 return dur, busErr
151}
152
153// MPEGTSToMP4 converts an MPEG-TS file with H264 video and Opus audio to an MP4 file.
154// It reads from the provided reader and writes the converted MP4 to the writer.
155func MPEGTSToMP4(ctx context.Context, input io.Reader, output io.Writer) error {
156 ctx = log.WithLogValues(ctx, "func", "MPEGTSToMP4")
157 pipelineStr := strings.Join([]string{
158 "appsrc name=appsrc ! tsdemux name=demux",
159 "mp4mux name=mux ! appsink sync=false name=appsink",
160 "demux.video_0_0100 ! h264parse ! video/x-h264,stream-format=avc ! queue name=videoqueue",
161 "demux.audio_0_0101 ! opusdec ! opusenc ! queue name=audioqueue",
162 }, " ")
163
164 pipeline, err := gst.NewPipelineFromString(pipelineStr)
165 if err != nil {
166 return err
167 }
168
169 mux, err := pipeline.GetElementByName("mux")
170 if err != nil {
171 return err
172 }
173 muxVideoSinkPad := mux.GetRequestPad("video_%u")
174 if muxVideoSinkPad == nil {
175 return fmt.Errorf("failed to get video sink pad")
176 }
177 muxAudioSinkPad := mux.GetRequestPad("audio_%u")
178 if muxAudioSinkPad == nil {
179 return fmt.Errorf("failed to get audio sink pad")
180 }
181 videoQueue, err := pipeline.GetElementByName("videoqueue")
182 if err != nil {
183 return err
184 }
185 audioQueue, err := pipeline.GetElementByName("audioqueue")
186 if err != nil {
187 return err
188 }
189 videoQueueSrcPad := videoQueue.GetStaticPad("src")
190 if videoQueueSrcPad == nil {
191 return fmt.Errorf("failed to get video queue source pad")
192 }
193 audioQueueSrcPad := audioQueue.GetStaticPad("src")
194 if audioQueueSrcPad == nil {
195 return fmt.Errorf("failed to get audio queue source pad")
196 }
197
198 ok := videoQueueSrcPad.Link(muxVideoSinkPad)
199 if ok != gst.PadLinkOK {
200 return fmt.Errorf("failed to link video queue source pad to mux video sink pad: %v", ok)
201 }
202 ok = audioQueueSrcPad.Link(muxAudioSinkPad)
203 if ok != gst.PadLinkOK {
204 return fmt.Errorf("failed to link audio queue source pad to mux audio sink pad: %v", ok)
205 }
206
207 // Get elements
208 appsrc, err := pipeline.GetElementByName("appsrc")
209 if err != nil {
210 return err
211 }
212 appsink, err := pipeline.GetElementByName("appsink")
213 if err != nil {
214 return err
215 }
216
217 source := app.SrcFromElement(appsrc)
218 sink := app.SinkFromElement(appsink)
219
220 // Set up source callbacks
221 source.SetCallbacks(&app.SourceCallbacks{
222 NeedDataFunc: ReaderNeedDataIncremental(ctx, input),
223 EnoughDataFunc: func(self *app.Source) {
224 // Nothing to do here
225 },
226 SeekDataFunc: func(self *app.Source, offset uint64) bool {
227 return false // We don't support seeking
228 },
229 })
230
231 // Set up sink callbacks
232 sink.SetCallbacks(&app.SinkCallbacks{
233 NewSampleFunc: WriterNewSample(ctx, output),
234 NewPrerollFunc: func(self *app.Sink) gst.FlowReturn {
235 return gst.FlowOK
236 },
237 })
238
239 ctx, cancel := context.WithCancel(ctx)
240 defer cancel()
241
242 // Handle bus messages in a separate goroutine
243 g, ctx := errgroup.WithContext(ctx)
244 g.Go(func() error {
245 HandleBusMessages(ctx, pipeline)
246 cancel()
247 return nil
248 })
249
250 // Start the pipeline
251 err = pipeline.SetState(gst.StatePlaying)
252 if err != nil {
253 return fmt.Errorf("failed to set pipeline state to playing: %w", err)
254 }
255
256 // Wait for the pipeline to finish or context to be canceled
257 <-ctx.Done()
258
259 // durOk, dur := pipeline.QueryDuration(gst.FormatTime)
260 // if !durOk {
261 // return fmt.Errorf("failed to query duration")
262 // }
263
264 // Clean up
265 err = pipeline.SetState(gst.StateNull)
266 if err != nil {
267 return fmt.Errorf("failed to set pipeline state to null: %w", err)
268 }
269
270 return nil
271}
272
273// Splits out video into MPEG-TS and audio into MP4 (to be recombined after transcoding)
274func MP4ToMPEGTSVideoMP4Audio(ctx context.Context, input io.Reader, videoOutput io.Writer, audioOutput io.Writer) error {
275 ctx = log.WithLogValues(ctx, "func", "MP4ToMPEGTSVideoMP4Audio")
276 pipelineStr := strings.Join([]string{
277 "appsrc name=appsrc ! qtdemux name=demux",
278 "mpegtsmux name=videomux ! appsink name=videoappsink sync=false",
279 "mp4mux name=audiomux ! appsink name=audioappsink sync=false",
280 "demux.video_0 ! h264parse ! video/x-h264,stream-format=byte-stream ! queue name=videoqueue",
281 "demux.audio_0 ! opusparse ! queue name=audioqueue",
282 }, " ")
283
284 pipeline, err := gst.NewPipelineFromString(pipelineStr)
285 if err != nil {
286 return err
287 }
288
289 videomux, err := pipeline.GetElementByName("videomux")
290 if err != nil {
291 return err
292 }
293 muxVideoSinkPad := videomux.GetRequestPad("sink_%d")
294 if muxVideoSinkPad == nil {
295 return fmt.Errorf("failed to get video sink pad")
296 }
297
298 audiomux, err := pipeline.GetElementByName("audiomux")
299 if err != nil {
300 return err
301 }
302 muxAudioSinkPad := audiomux.GetRequestPad("audio_%u")
303 if muxAudioSinkPad == nil {
304 return fmt.Errorf("failed to get audio sink pad")
305 }
306
307 videoQueue, err := pipeline.GetElementByName("videoqueue")
308 if err != nil {
309 return err
310 }
311 audioQueue, err := pipeline.GetElementByName("audioqueue")
312 if err != nil {
313 return err
314 }
315
316 videoQueueSrcPad := videoQueue.GetStaticPad("src")
317 if videoQueueSrcPad == nil {
318 return fmt.Errorf("failed to get video queue source pad")
319 }
320 audioQueueSrcPad := audioQueue.GetStaticPad("src")
321 if audioQueueSrcPad == nil {
322 return fmt.Errorf("failed to get audio queue source pad")
323 }
324
325 ok := videoQueueSrcPad.Link(muxVideoSinkPad)
326 if ok != gst.PadLinkOK {
327 return fmt.Errorf("failed to link video queue source pad to mux video sink pad: %v", ok)
328 }
329 ok = audioQueueSrcPad.Link(muxAudioSinkPad)
330 if ok != gst.PadLinkOK {
331 return fmt.Errorf("failed to link audio queue source pad to mux audio sink pad: %v", ok)
332 }
333
334 // Get elements
335 appsrc, err := pipeline.GetElementByName("appsrc")
336 if err != nil {
337 return err
338 }
339 videoappsink, err := pipeline.GetElementByName("videoappsink")
340 if err != nil {
341 return err
342 }
343 audioappsink, err := pipeline.GetElementByName("audioappsink")
344 if err != nil {
345 return err
346 }
347
348 source := app.SrcFromElement(appsrc)
349 videoSink := app.SinkFromElement(videoappsink)
350 audioSink := app.SinkFromElement(audioappsink)
351
352 // Set up source callbacks
353 source.SetCallbacks(&app.SourceCallbacks{
354 NeedDataFunc: ReaderNeedDataIncremental(ctx, input),
355 EnoughDataFunc: func(self *app.Source) {
356 // Nothing to do here
357 },
358 SeekDataFunc: func(self *app.Source, offset uint64) bool {
359 return false // We don't support seeking
360 },
361 })
362
363 // Set up sink callbacks
364 videoSink.SetCallbacks(&app.SinkCallbacks{
365 NewSampleFunc: WriterNewSample(ctx, videoOutput),
366 NewPrerollFunc: func(self *app.Sink) gst.FlowReturn {
367 return gst.FlowOK
368 },
369 })
370
371 // Set up sink callbacks
372 audioSink.SetCallbacks(&app.SinkCallbacks{
373 NewSampleFunc: WriterNewSample(ctx, audioOutput),
374 NewPrerollFunc: func(self *app.Sink) gst.FlowReturn {
375 return gst.FlowOK
376 },
377 })
378
379 ctx, cancel := context.WithCancel(ctx)
380 defer cancel()
381
382 // Handle bus messages in a separate goroutine
383 g, ctx := errgroup.WithContext(ctx)
384 g.Go(func() error {
385 err = HandleBusMessages(ctx, pipeline)
386 cancel()
387 return err
388 })
389
390 // Start the pipeline
391 err = pipeline.SetState(gst.StatePlaying)
392 if err != nil {
393 return fmt.Errorf("failed to set pipeline state to playing: %w", err)
394 }
395
396 // Wait for the pipeline to finish or context to be canceled
397 <-ctx.Done()
398
399 // Clean up
400 err = pipeline.SetState(gst.StateNull)
401 if err != nil {
402 return fmt.Errorf("failed to set pipeline state to null: %w", err)
403 }
404
405 return g.Wait()
406}
407
408// Joins video and audio back together from MPEG-TS and MP4 (from transcoding)
409func MPEGTSVideoMP4AudioToMP4(ctx context.Context, videoInput io.Reader, audioInput io.Reader, output io.Writer) error {
410 pipelineStr := strings.Join([]string{
411 "appsrc name=videoappsrc ! tsdemux name=videodemux",
412 "appsrc name=audioappsrc ! qtdemux name=audiodemux",
413 "mp4mux name=mux ! appsink name=appsink sync=false",
414 "h264parse name=videoparse ! video/x-h264,stream-format=avc ! queue name=videoqueue",
415 "audiodemux.audio_0 ! opusparse ! queue name=audioqueue",
416 }, " ")
417
418 pipeline, err := gst.NewPipelineFromString(pipelineStr)
419 if err != nil {
420 return err
421 }
422
423 mux, err := pipeline.GetElementByName("mux")
424 if err != nil {
425 return err
426 }
427 muxVideoSinkPad := mux.GetRequestPad("video_%u")
428 if muxVideoSinkPad == nil {
429 return fmt.Errorf("failed to get video sink pad")
430 }
431 muxAudioSinkPad := mux.GetRequestPad("audio_%u")
432 if muxAudioSinkPad == nil {
433 return fmt.Errorf("failed to get audio sink pad")
434 }
435
436 videoQueue, err := pipeline.GetElementByName("videoqueue")
437 if err != nil {
438 return err
439 }
440 audioQueue, err := pipeline.GetElementByName("audioqueue")
441 if err != nil {
442 return err
443 }
444
445 videoQueueSrcPad := videoQueue.GetStaticPad("src")
446 if videoQueueSrcPad == nil {
447 return fmt.Errorf("failed to get video queue source pad")
448 }
449 audioQueueSrcPad := audioQueue.GetStaticPad("src")
450 if audioQueueSrcPad == nil {
451 return fmt.Errorf("failed to get audio queue source pad")
452 }
453
454 ok := videoQueueSrcPad.Link(muxVideoSinkPad)
455 if ok != gst.PadLinkOK {
456 return fmt.Errorf("failed to link video queue source pad to mux video sink pad: %v", ok)
457 }
458 ok = audioQueueSrcPad.Link(muxAudioSinkPad)
459 if ok != gst.PadLinkOK {
460 return fmt.Errorf("failed to link audio queue source pad to mux audio sink pad: %v", ok)
461 }
462
463 videodemux, err := pipeline.GetElementByName("videodemux")
464 if err != nil {
465 return err
466 }
467 videoparse, err := pipeline.GetElementByName("videoparse")
468 if err != nil {
469 return err
470 }
471 videoParseSinkPad := videoparse.GetStaticPad("sink")
472 if videoParseSinkPad == nil {
473 return fmt.Errorf("failed to get video parse sink pad")
474 }
475
476 // Get elements
477 videoappsrc, err := pipeline.GetElementByName("videoappsrc")
478 if err != nil {
479 return err
480 }
481 audioappsrc, err := pipeline.GetElementByName("audioappsrc")
482 if err != nil {
483 return err
484 }
485 appsink, err := pipeline.GetElementByName("appsink")
486 if err != nil {
487 return err
488 }
489
490 videoSource := app.SrcFromElement(videoappsrc)
491 audioSource := app.SrcFromElement(audioappsrc)
492 sink := app.SinkFromElement(appsink)
493
494 // Set up source callbacks
495 videoSource.SetCallbacks(&app.SourceCallbacks{
496 NeedDataFunc: ReaderNeedDataIncremental(ctx, videoInput),
497 EnoughDataFunc: func(self *app.Source) {
498 // Nothing to do here
499 },
500 SeekDataFunc: func(self *app.Source, offset uint64) bool {
501 return false // We don't support seeking
502 },
503 })
504
505 audioSource.SetCallbacks(&app.SourceCallbacks{
506 NeedDataFunc: ReaderNeedDataIncremental(ctx, audioInput),
507 EnoughDataFunc: func(self *app.Source) {
508 // Nothing to do here
509 },
510 SeekDataFunc: func(self *app.Source, offset uint64) bool {
511 return false // We don't support seeking
512 },
513 })
514
515 // Set up sink callbacks
516 sink.SetCallbacks(&app.SinkCallbacks{
517 NewSampleFunc: WriterNewSample(ctx, output),
518 NewPrerollFunc: func(self *app.Sink) gst.FlowReturn {
519 return gst.FlowOK
520 },
521 })
522
523 ctx, cancel := context.WithCancel(ctx)
524 defer cancel()
525
526 onPadAdded := func(element *gst.Element, pad *gst.Pad) {
527 if pad.GetDirection() == gst.PadDirectionSource {
528 ok := pad.Link(videoParseSinkPad)
529 defer func() { videoParseSinkPad = nil }()
530 if ok != gst.PadLinkOK {
531 log.Error(ctx, "failed to link video parse sink pad to video demux pad", "error", ok)
532 cancel()
533 }
534 }
535 }
536 videodemux.Connect("pad-added", onPadAdded)
537
538 // Handle bus messages in a separate goroutine
539 g, ctx := errgroup.WithContext(ctx)
540 g.Go(func() error {
541 err = HandleBusMessages(ctx, pipeline)
542 cancel()
543 return err
544 })
545
546 // Start the pipeline
547 err = pipeline.SetState(gst.StatePlaying)
548 if err != nil {
549 return fmt.Errorf("failed to set pipeline state to playing: %w", err)
550 }
551
552 // Wait for the pipeline to finish or context to be canceled
553 <-ctx.Done()
554
555 // Clean up
556 err = pipeline.SetState(gst.StateNull)
557 if err != nil {
558 return fmt.Errorf("failed to set pipeline state to null: %w", err)
559 }
560
561 return g.Wait()
562}