Live video on the AT Protocol
1package media
2
3import (
4 "context"
5 "fmt"
6 "io"
7 "strings"
8 "time"
9
10 "github.com/go-gst/go-gst/gst"
11 "github.com/go-gst/go-gst/gst/app"
12 "golang.org/x/sync/errgroup"
13 "stream.place/streamplace/pkg/log"
14)
15
16// MP4ToMPEGTS converts an MP4 file with H264 video and Opus audio to an MPEG-TS file
17// It reads from the provided reader and writes the converted MPEG-TS to the writer.
18// The conversion is optimized for speed.
19func MP4ToMPEGTS(ctx context.Context, input io.Reader, output io.Writer) (int64, error) {
20 ctx = log.WithLogValues(ctx, "func", "MP4ToMPEGTS")
21 pipelineStr := strings.Join([]string{
22 "appsrc name=appsrc ! qtdemux name=demux",
23 "mpegtsmux name=mux ! appsink name=appsink sync=false",
24 "demux.video_0 ! h264parse ! video/x-h264,stream-format=byte-stream ! queue name=videoqueue",
25 "demux.audio_0 ! opusdec name=audioparse ! audioresample ! audiorate ! fdkaacenc name=audioenc ! queue name=audioqueue",
26 }, " ")
27
28 pipeline, err := gst.NewPipelineFromString(pipelineStr)
29 if err != nil {
30 return 0, err
31 }
32
33 mux, err := pipeline.GetElementByName("mux")
34 if err != nil {
35 return 0, err
36 }
37 muxVideoSinkPad := mux.GetRequestPad("sink_%d")
38 if muxVideoSinkPad == nil {
39 return 0, fmt.Errorf("failed to get video sink pad")
40 }
41 muxAudioSinkPad := mux.GetRequestPad("sink_%d")
42 if muxAudioSinkPad == nil {
43 return 0, fmt.Errorf("failed to get audio sink pad")
44 }
45 videoQueue, err := pipeline.GetElementByName("videoqueue")
46 if err != nil {
47 return 0, err
48 }
49 audioQueue, err := pipeline.GetElementByName("audioqueue")
50 if err != nil {
51 return 0, err
52 }
53 videoQueueSrcPad := videoQueue.GetStaticPad("src")
54 if videoQueueSrcPad == nil {
55 return 0, fmt.Errorf("failed to get video queue source pad")
56 }
57 audioQueueSrcPad := audioQueue.GetStaticPad("src")
58 if audioQueueSrcPad == nil {
59 return 0, fmt.Errorf("failed to get audio queue source pad")
60 }
61
62 ok := videoQueueSrcPad.Link(muxVideoSinkPad)
63 if ok != gst.PadLinkOK {
64 return 0, fmt.Errorf("failed to link video queue source pad to mux video sink pad: %v", ok)
65 }
66 ok = audioQueueSrcPad.Link(muxAudioSinkPad)
67 if ok != gst.PadLinkOK {
68 return 0, fmt.Errorf("failed to link audio queue source pad to mux audio sink pad: %v", ok)
69 }
70
71 // Get elements
72 appsrc, err := pipeline.GetElementByName("appsrc")
73 if err != nil {
74 return 0, err
75 }
76 appsink, err := pipeline.GetElementByName("appsink")
77 if err != nil {
78 return 0, err
79 }
80
81 source := app.SrcFromElement(appsrc)
82 sink := app.SinkFromElement(appsink)
83
84 // Set up source callbacks
85 source.SetCallbacks(&app.SourceCallbacks{
86 NeedDataFunc: ReaderNeedDataIncremental(ctx, input),
87 EnoughDataFunc: func(self *app.Source) {
88 // Nothing to do here
89 },
90 SeekDataFunc: func(self *app.Source, offset uint64) bool {
91 return false // We don't support seeking
92 },
93 })
94
95 // Set up sink callbacks
96 sink.SetCallbacks(&app.SinkCallbacks{
97 NewSampleFunc: WriterNewSample(ctx, output),
98 NewPrerollFunc: func(self *app.Sink) gst.FlowReturn {
99 return gst.FlowOK
100 },
101 })
102
103 ctx, cancel := context.WithCancel(ctx)
104 defer func() {
105 cancel()
106 // Clean up
107 err = pipeline.SetState(gst.StateNull)
108 if err != nil {
109 log.Error(ctx, "failed to set pipeline state to null", "error", err)
110 }
111 }()
112
113 go func() {
114 select {
115 case <-ctx.Done():
116 return
117 case <-time.After(time.Second * 10):
118 log.Debug(ctx, "pipeline is taking too long to start, cancelling")
119 err := fmt.Errorf("pipeline is taking too long to start, cancelling")
120 pipeline.Error(err.Error(), err)
121 }
122 }()
123
124 // Handle bus messages in a separate goroutine
125 errCh := make(chan error)
126 go func() {
127 err := HandleBusMessages(ctx, pipeline)
128 cancel()
129 errCh <- err
130 close(errCh)
131 }()
132
133 // Start the pipeline
134 err = pipeline.SetState(gst.StatePlaying)
135 if err != nil {
136 return 0, fmt.Errorf("failed to set pipeline state to playing: %w", err)
137 }
138
139 var durOk bool
140 var dur int64
141 busErr := <-errCh
142
143 if busErr == nil {
144 durOk, dur = pipeline.QueryDuration(gst.FormatTime)
145 if !durOk {
146 return 0, fmt.Errorf("failed to query duration")
147 }
148 }
149
150 return dur, busErr
151}
152
153// MPEGTSToMP4 converts an MPEG-TS file with H264 video and Opus audio to an MP4 file.
154// It reads from the provided reader and writes the converted MP4 to the writer.
155func MPEGTSToMP4(ctx context.Context, input io.Reader, output io.Writer) error {
156 ctx = log.WithLogValues(ctx, "func", "MPEGTSToMP4")
157 pipelineStr := strings.Join([]string{
158 "appsrc name=appsrc ! tsdemux name=demux",
159 "mp4mux name=mux ! appsink sync=false name=appsink",
160 "demux.video_0_0100 ! h264parse ! video/x-h264,stream-format=avc ! queue name=videoqueue",
161 "demux.audio_0_0101 ! opusdec ! opusenc ! queue name=audioqueue",
162 }, " ")
163
164 pipeline, err := gst.NewPipelineFromString(pipelineStr)
165 if err != nil {
166 return err
167 }
168
169 mux, err := pipeline.GetElementByName("mux")
170 if err != nil {
171 return err
172 }
173 muxVideoSinkPad := mux.GetRequestPad("video_%u")
174 if muxVideoSinkPad == nil {
175 return fmt.Errorf("failed to get video sink pad")
176 }
177 muxAudioSinkPad := mux.GetRequestPad("audio_%u")
178 if muxAudioSinkPad == nil {
179 return fmt.Errorf("failed to get audio sink pad")
180 }
181 videoQueue, err := pipeline.GetElementByName("videoqueue")
182 if err != nil {
183 return err
184 }
185 audioQueue, err := pipeline.GetElementByName("audioqueue")
186 if err != nil {
187 return err
188 }
189 videoQueueSrcPad := videoQueue.GetStaticPad("src")
190 if videoQueueSrcPad == nil {
191 return fmt.Errorf("failed to get video queue source pad")
192 }
193 audioQueueSrcPad := audioQueue.GetStaticPad("src")
194 if audioQueueSrcPad == nil {
195 return fmt.Errorf("failed to get audio queue source pad")
196 }
197
198 ok := videoQueueSrcPad.Link(muxVideoSinkPad)
199 if ok != gst.PadLinkOK {
200 return fmt.Errorf("failed to link video queue source pad to mux video sink pad: %v", ok)
201 }
202 ok = audioQueueSrcPad.Link(muxAudioSinkPad)
203 if ok != gst.PadLinkOK {
204 return fmt.Errorf("failed to link audio queue source pad to mux audio sink pad: %v", ok)
205 }
206
207 // Get elements
208 appsrc, err := pipeline.GetElementByName("appsrc")
209 if err != nil {
210 return err
211 }
212 appsink, err := pipeline.GetElementByName("appsink")
213 if err != nil {
214 return err
215 }
216
217 source := app.SrcFromElement(appsrc)
218 sink := app.SinkFromElement(appsink)
219
220 // Set up source callbacks
221 source.SetCallbacks(&app.SourceCallbacks{
222 NeedDataFunc: ReaderNeedDataIncremental(ctx, input),
223 EnoughDataFunc: func(self *app.Source) {
224 // Nothing to do here
225 },
226 SeekDataFunc: func(self *app.Source, offset uint64) bool {
227 return false // We don't support seeking
228 },
229 })
230
231 // Set up sink callbacks
232 sink.SetCallbacks(&app.SinkCallbacks{
233 NewSampleFunc: WriterNewSample(ctx, output),
234 NewPrerollFunc: func(self *app.Sink) gst.FlowReturn {
235 return gst.FlowOK
236 },
237 })
238
239 ctx, cancel := context.WithCancel(ctx)
240 defer cancel()
241
242 // Handle bus messages in a separate goroutine
243 g, ctx := errgroup.WithContext(ctx)
244 g.Go(func() error {
245 if err := HandleBusMessages(ctx, pipeline); err != nil {
246 log.Log(ctx, "pipeline error", "error", err)
247 }
248 cancel()
249 return nil
250 })
251
252 // Start the pipeline
253 err = pipeline.SetState(gst.StatePlaying)
254 if err != nil {
255 return fmt.Errorf("failed to set pipeline state to playing: %w", err)
256 }
257
258 // Wait for the pipeline to finish or context to be canceled
259 <-ctx.Done()
260
261 // durOk, dur := pipeline.QueryDuration(gst.FormatTime)
262 // if !durOk {
263 // return fmt.Errorf("failed to query duration")
264 // }
265
266 // Clean up
267 err = pipeline.SetState(gst.StateNull)
268 if err != nil {
269 return fmt.Errorf("failed to set pipeline state to null: %w", err)
270 }
271
272 return nil
273}
274
275// Splits out video into MPEG-TS and audio into MP4 (to be recombined after transcoding)
276func MP4ToMPEGTSVideoMP4Audio(ctx context.Context, input io.Reader, videoOutput io.Writer, audioOutput io.Writer) error {
277 ctx = log.WithLogValues(ctx, "func", "MP4ToMPEGTSVideoMP4Audio")
278 pipelineStr := strings.Join([]string{
279 "appsrc name=appsrc ! qtdemux name=demux",
280 "mpegtsmux name=videomux ! appsink name=videoappsink sync=false",
281 "mp4mux name=audiomux ! appsink name=audioappsink sync=false",
282 "demux.video_0 ! h264parse ! video/x-h264,stream-format=byte-stream ! queue name=videoqueue",
283 "demux.audio_0 ! opusparse ! queue name=audioqueue",
284 }, " ")
285
286 pipeline, err := gst.NewPipelineFromString(pipelineStr)
287 if err != nil {
288 return err
289 }
290
291 videomux, err := pipeline.GetElementByName("videomux")
292 if err != nil {
293 return err
294 }
295 muxVideoSinkPad := videomux.GetRequestPad("sink_%d")
296 if muxVideoSinkPad == nil {
297 return fmt.Errorf("failed to get video sink pad")
298 }
299
300 audiomux, err := pipeline.GetElementByName("audiomux")
301 if err != nil {
302 return err
303 }
304 muxAudioSinkPad := audiomux.GetRequestPad("audio_%u")
305 if muxAudioSinkPad == nil {
306 return fmt.Errorf("failed to get audio sink pad")
307 }
308
309 videoQueue, err := pipeline.GetElementByName("videoqueue")
310 if err != nil {
311 return err
312 }
313 audioQueue, err := pipeline.GetElementByName("audioqueue")
314 if err != nil {
315 return err
316 }
317
318 videoQueueSrcPad := videoQueue.GetStaticPad("src")
319 if videoQueueSrcPad == nil {
320 return fmt.Errorf("failed to get video queue source pad")
321 }
322 audioQueueSrcPad := audioQueue.GetStaticPad("src")
323 if audioQueueSrcPad == nil {
324 return fmt.Errorf("failed to get audio queue source pad")
325 }
326
327 ok := videoQueueSrcPad.Link(muxVideoSinkPad)
328 if ok != gst.PadLinkOK {
329 return fmt.Errorf("failed to link video queue source pad to mux video sink pad: %v", ok)
330 }
331 ok = audioQueueSrcPad.Link(muxAudioSinkPad)
332 if ok != gst.PadLinkOK {
333 return fmt.Errorf("failed to link audio queue source pad to mux audio sink pad: %v", ok)
334 }
335
336 // Get elements
337 appsrc, err := pipeline.GetElementByName("appsrc")
338 if err != nil {
339 return err
340 }
341 videoappsink, err := pipeline.GetElementByName("videoappsink")
342 if err != nil {
343 return err
344 }
345 audioappsink, err := pipeline.GetElementByName("audioappsink")
346 if err != nil {
347 return err
348 }
349
350 source := app.SrcFromElement(appsrc)
351 videoSink := app.SinkFromElement(videoappsink)
352 audioSink := app.SinkFromElement(audioappsink)
353
354 // Set up source callbacks
355 source.SetCallbacks(&app.SourceCallbacks{
356 NeedDataFunc: ReaderNeedDataIncremental(ctx, input),
357 EnoughDataFunc: func(self *app.Source) {
358 // Nothing to do here
359 },
360 SeekDataFunc: func(self *app.Source, offset uint64) bool {
361 return false // We don't support seeking
362 },
363 })
364
365 // Set up sink callbacks
366 videoSink.SetCallbacks(&app.SinkCallbacks{
367 NewSampleFunc: WriterNewSample(ctx, videoOutput),
368 NewPrerollFunc: func(self *app.Sink) gst.FlowReturn {
369 return gst.FlowOK
370 },
371 })
372
373 // Set up sink callbacks
374 audioSink.SetCallbacks(&app.SinkCallbacks{
375 NewSampleFunc: WriterNewSample(ctx, audioOutput),
376 NewPrerollFunc: func(self *app.Sink) gst.FlowReturn {
377 return gst.FlowOK
378 },
379 })
380
381 ctx, cancel := context.WithCancel(ctx)
382 defer cancel()
383
384 // Handle bus messages in a separate goroutine
385 g, ctx := errgroup.WithContext(ctx)
386 g.Go(func() error {
387 err = HandleBusMessages(ctx, pipeline)
388 cancel()
389 return err
390 })
391
392 // Start the pipeline
393 err = pipeline.SetState(gst.StatePlaying)
394 if err != nil {
395 return fmt.Errorf("failed to set pipeline state to playing: %w", err)
396 }
397
398 // Wait for the pipeline to finish or context to be canceled
399 <-ctx.Done()
400
401 // Clean up
402 err = pipeline.SetState(gst.StateNull)
403 if err != nil {
404 return fmt.Errorf("failed to set pipeline state to null: %w", err)
405 }
406
407 return g.Wait()
408}
409
410// Joins video and audio back together from MPEG-TS and MP4 (from transcoding)
411func MPEGTSVideoMP4AudioToMP4(ctx context.Context, videoInput io.Reader, audioInput io.Reader, output io.Writer) error {
412 pipelineStr := strings.Join([]string{
413 "appsrc name=videoappsrc ! tsdemux name=videodemux",
414 "appsrc name=audioappsrc ! qtdemux name=audiodemux",
415 "mp4mux name=mux ! appsink name=appsink sync=false",
416 "h264parse name=videoparse ! video/x-h264,stream-format=avc ! queue name=videoqueue",
417 "audiodemux.audio_0 ! opusparse ! queue name=audioqueue",
418 }, " ")
419
420 pipeline, err := gst.NewPipelineFromString(pipelineStr)
421 if err != nil {
422 return err
423 }
424
425 mux, err := pipeline.GetElementByName("mux")
426 if err != nil {
427 return err
428 }
429 muxVideoSinkPad := mux.GetRequestPad("video_%u")
430 if muxVideoSinkPad == nil {
431 return fmt.Errorf("failed to get video sink pad")
432 }
433 muxAudioSinkPad := mux.GetRequestPad("audio_%u")
434 if muxAudioSinkPad == nil {
435 return fmt.Errorf("failed to get audio sink pad")
436 }
437
438 videoQueue, err := pipeline.GetElementByName("videoqueue")
439 if err != nil {
440 return err
441 }
442 audioQueue, err := pipeline.GetElementByName("audioqueue")
443 if err != nil {
444 return err
445 }
446
447 videoQueueSrcPad := videoQueue.GetStaticPad("src")
448 if videoQueueSrcPad == nil {
449 return fmt.Errorf("failed to get video queue source pad")
450 }
451 audioQueueSrcPad := audioQueue.GetStaticPad("src")
452 if audioQueueSrcPad == nil {
453 return fmt.Errorf("failed to get audio queue source pad")
454 }
455
456 ok := videoQueueSrcPad.Link(muxVideoSinkPad)
457 if ok != gst.PadLinkOK {
458 return fmt.Errorf("failed to link video queue source pad to mux video sink pad: %v", ok)
459 }
460 ok = audioQueueSrcPad.Link(muxAudioSinkPad)
461 if ok != gst.PadLinkOK {
462 return fmt.Errorf("failed to link audio queue source pad to mux audio sink pad: %v", ok)
463 }
464
465 videodemux, err := pipeline.GetElementByName("videodemux")
466 if err != nil {
467 return err
468 }
469 videoparse, err := pipeline.GetElementByName("videoparse")
470 if err != nil {
471 return err
472 }
473 videoParseSinkPad := videoparse.GetStaticPad("sink")
474 if videoParseSinkPad == nil {
475 return fmt.Errorf("failed to get video parse sink pad")
476 }
477
478 // Get elements
479 videoappsrc, err := pipeline.GetElementByName("videoappsrc")
480 if err != nil {
481 return err
482 }
483 audioappsrc, err := pipeline.GetElementByName("audioappsrc")
484 if err != nil {
485 return err
486 }
487 appsink, err := pipeline.GetElementByName("appsink")
488 if err != nil {
489 return err
490 }
491
492 videoSource := app.SrcFromElement(videoappsrc)
493 audioSource := app.SrcFromElement(audioappsrc)
494 sink := app.SinkFromElement(appsink)
495
496 // Set up source callbacks
497 videoSource.SetCallbacks(&app.SourceCallbacks{
498 NeedDataFunc: ReaderNeedDataIncremental(ctx, videoInput),
499 EnoughDataFunc: func(self *app.Source) {
500 // Nothing to do here
501 },
502 SeekDataFunc: func(self *app.Source, offset uint64) bool {
503 return false // We don't support seeking
504 },
505 })
506
507 audioSource.SetCallbacks(&app.SourceCallbacks{
508 NeedDataFunc: ReaderNeedDataIncremental(ctx, audioInput),
509 EnoughDataFunc: func(self *app.Source) {
510 // Nothing to do here
511 },
512 SeekDataFunc: func(self *app.Source, offset uint64) bool {
513 return false // We don't support seeking
514 },
515 })
516
517 // Set up sink callbacks
518 sink.SetCallbacks(&app.SinkCallbacks{
519 NewSampleFunc: WriterNewSample(ctx, output),
520 NewPrerollFunc: func(self *app.Sink) gst.FlowReturn {
521 return gst.FlowOK
522 },
523 })
524
525 ctx, cancel := context.WithCancel(ctx)
526 defer cancel()
527
528 onPadAdded := func(element *gst.Element, pad *gst.Pad) {
529 if pad.GetDirection() == gst.PadDirectionSource {
530 ok := pad.Link(videoParseSinkPad)
531 if ok != gst.PadLinkOK {
532 log.Error(ctx, "failed to link video parse sink pad to video demux pad", "error", ok)
533 cancel()
534 }
535 }
536 }
537 if _, err := videodemux.Connect("pad-added", onPadAdded); err != nil {
538 return fmt.Errorf("failed connect pad-added handler: %w", err)
539 }
540
541 errCh := make(chan error)
542 go func() {
543 err = HandleBusMessages(ctx, pipeline)
544 cancel()
545 errCh <- err
546 }()
547
548 // Start the pipeline
549 err = pipeline.SetState(gst.StatePlaying)
550 if err != nil {
551 return fmt.Errorf("failed to set pipeline state to playing: %w", err)
552 }
553
554 defer func() {
555 err = pipeline.SetState(gst.StateNull)
556 if err != nil {
557 log.Error(ctx, "failed to set pipeline state to null", "error", err)
558 }
559 videoParseSinkPad = nil
560 }()
561
562 return <-errCh
563}