Live video on the AT Protocol
at next 583 lines 16 kB view raw
1package media 2 3import ( 4 "context" 5 "fmt" 6 "io" 7 "strings" 8 "time" 9 10 "github.com/go-gst/go-gst/gst" 11 "github.com/go-gst/go-gst/gst/app" 12 "golang.org/x/sync/errgroup" 13 "stream.place/streamplace/pkg/log" 14) 15 16// MP4ToMPEGTS converts an MP4 file with H264 video and Opus audio to an MPEG-TS file 17// It reads from the provided reader and writes the converted MPEG-TS to the writer. 18// The conversion is optimized for speed. 19func MP4ToMPEGTS(ctx context.Context, input io.Reader, output io.Writer) (int64, error) { 20 ctx = log.WithLogValues(ctx, "func", "MP4ToMPEGTS") 21 pipelineStr := strings.Join([]string{ 22 "appsrc name=appsrc ! qtdemux name=demux", 23 "mpegtsmux name=mux ! appsink name=appsink sync=false", 24 "demux.video_0 ! h264parse ! video/x-h264,stream-format=byte-stream ! queue name=videoqueue", 25 "demux.audio_0 ! opusdec name=audioparse ! audioresample ! audiorate ! fdkaacenc name=audioenc ! queue name=audioqueue", 26 }, " ") 27 28 pipeline, err := gst.NewPipelineFromString(pipelineStr) 29 if err != nil { 30 return 0, err 31 } 32 33 mux, err := pipeline.GetElementByName("mux") 34 if err != nil { 35 return 0, err 36 } 37 muxVideoSinkPad := mux.GetRequestPad("sink_%d") 38 if muxVideoSinkPad == nil { 39 return 0, fmt.Errorf("failed to get video sink pad") 40 } 41 muxAudioSinkPad := mux.GetRequestPad("sink_%d") 42 if muxAudioSinkPad == nil { 43 return 0, fmt.Errorf("failed to get audio sink pad") 44 } 45 videoQueue, err := pipeline.GetElementByName("videoqueue") 46 if err != nil { 47 return 0, err 48 } 49 audioQueue, err := pipeline.GetElementByName("audioqueue") 50 if err != nil { 51 return 0, err 52 } 53 videoQueueSrcPad := videoQueue.GetStaticPad("src") 54 if videoQueueSrcPad == nil { 55 return 0, fmt.Errorf("failed to get video queue source pad") 56 } 57 audioQueueSrcPad := audioQueue.GetStaticPad("src") 58 if audioQueueSrcPad == nil { 59 return 0, fmt.Errorf("failed to get audio queue source pad") 60 } 61 62 ok := videoQueueSrcPad.Link(muxVideoSinkPad) 63 if ok != gst.PadLinkOK { 64 return 0, fmt.Errorf("failed to link video queue source pad to mux video sink pad: %v", ok) 65 } 66 ok = audioQueueSrcPad.Link(muxAudioSinkPad) 67 if ok != gst.PadLinkOK { 68 return 0, fmt.Errorf("failed to link audio queue source pad to mux audio sink pad: %v", ok) 69 } 70 71 // Get elements 72 appsrc, err := pipeline.GetElementByName("appsrc") 73 if err != nil { 74 return 0, err 75 } 76 appsink, err := pipeline.GetElementByName("appsink") 77 if err != nil { 78 return 0, err 79 } 80 81 source := app.SrcFromElement(appsrc) 82 sink := app.SinkFromElement(appsink) 83 84 // Set up source callbacks 85 source.SetCallbacks(&app.SourceCallbacks{ 86 NeedDataFunc: ReaderNeedDataIncremental(ctx, input), 87 EnoughDataFunc: func(self *app.Source) { 88 // Nothing to do here 89 }, 90 SeekDataFunc: func(self *app.Source, offset uint64) bool { 91 return false // We don't support seeking 92 }, 93 }) 94 95 // Set up sink callbacks 96 sink.SetCallbacks(&app.SinkCallbacks{ 97 NewSampleFunc: WriterNewSample(ctx, output), 98 NewPrerollFunc: func(self *app.Sink) gst.FlowReturn { 99 return gst.FlowOK 100 }, 101 }) 102 103 ctx, cancel := context.WithCancel(ctx) 104 defer func() { 105 cancel() 106 // Clean up 107 err = pipeline.SetState(gst.StateNull) 108 if err != nil { 109 log.Error(ctx, "failed to set pipeline state to null", "error", err) 110 } 111 }() 112 113 go func() { 114 select { 115 case <-ctx.Done(): 116 return 117 case <-time.After(time.Second * 10): 118 log.Debug(ctx, "pipeline is taking too long to start, cancelling") 119 err := fmt.Errorf("pipeline is taking too long to start, cancelling") 120 pipeline.Error(err.Error(), err) 121 } 122 }() 123 124 // Handle bus messages in a separate goroutine 125 errCh := make(chan error) 126 go func() { 127 err := HandleBusMessages(ctx, pipeline) 128 cancel() 129 errCh <- err 130 close(errCh) 131 }() 132 133 // Start the pipeline 134 err = pipeline.SetState(gst.StatePlaying) 135 if err != nil { 136 return 0, fmt.Errorf("failed to set pipeline state to playing: %w", err) 137 } 138 139 var durOk bool 140 var dur int64 141 busErr := <-errCh 142 143 if busErr == nil { 144 durOk, dur = pipeline.QueryDuration(gst.FormatTime) 145 if !durOk { 146 return 0, fmt.Errorf("failed to query duration") 147 } 148 } 149 150 return dur, busErr 151} 152 153// MPEGTSToMP4 converts an MPEG-TS file with H264 video and Opus audio to an MP4 file. 154// It reads from the provided reader and writes the converted MP4 to the writer. 155func MPEGTSToMP4(ctx context.Context, input io.Reader, output io.Writer) error { 156 ctx = log.WithLogValues(ctx, "func", "MPEGTSToMP4") 157 pipelineStr := strings.Join([]string{ 158 "appsrc name=appsrc ! tsdemux name=demux", 159 "mp4mux name=mux ! appsink sync=false name=appsink", 160 "demux.video_0_0100 ! h264parse ! video/x-h264,stream-format=avc ! queue name=videoqueue", 161 "demux.audio_0_0101 ! opusdec ! opusenc ! queue name=audioqueue", 162 }, " ") 163 164 pipeline, err := gst.NewPipelineFromString(pipelineStr) 165 if err != nil { 166 return err 167 } 168 169 mux, err := pipeline.GetElementByName("mux") 170 if err != nil { 171 return err 172 } 173 muxVideoSinkPad := mux.GetRequestPad("video_%u") 174 if muxVideoSinkPad == nil { 175 return fmt.Errorf("failed to get video sink pad") 176 } 177 muxAudioSinkPad := mux.GetRequestPad("audio_%u") 178 if muxAudioSinkPad == nil { 179 return fmt.Errorf("failed to get audio sink pad") 180 } 181 videoQueue, err := pipeline.GetElementByName("videoqueue") 182 if err != nil { 183 return err 184 } 185 audioQueue, err := pipeline.GetElementByName("audioqueue") 186 if err != nil { 187 return err 188 } 189 videoQueueSrcPad := videoQueue.GetStaticPad("src") 190 if videoQueueSrcPad == nil { 191 return fmt.Errorf("failed to get video queue source pad") 192 } 193 audioQueueSrcPad := audioQueue.GetStaticPad("src") 194 if audioQueueSrcPad == nil { 195 return fmt.Errorf("failed to get audio queue source pad") 196 } 197 198 ok := videoQueueSrcPad.Link(muxVideoSinkPad) 199 if ok != gst.PadLinkOK { 200 return fmt.Errorf("failed to link video queue source pad to mux video sink pad: %v", ok) 201 } 202 ok = audioQueueSrcPad.Link(muxAudioSinkPad) 203 if ok != gst.PadLinkOK { 204 return fmt.Errorf("failed to link audio queue source pad to mux audio sink pad: %v", ok) 205 } 206 207 // Get elements 208 appsrc, err := pipeline.GetElementByName("appsrc") 209 if err != nil { 210 return err 211 } 212 appsink, err := pipeline.GetElementByName("appsink") 213 if err != nil { 214 return err 215 } 216 217 source := app.SrcFromElement(appsrc) 218 sink := app.SinkFromElement(appsink) 219 220 // Set up source callbacks 221 source.SetCallbacks(&app.SourceCallbacks{ 222 NeedDataFunc: ReaderNeedDataIncremental(ctx, input), 223 EnoughDataFunc: func(self *app.Source) { 224 // Nothing to do here 225 }, 226 SeekDataFunc: func(self *app.Source, offset uint64) bool { 227 return false // We don't support seeking 228 }, 229 }) 230 231 // Set up sink callbacks 232 sink.SetCallbacks(&app.SinkCallbacks{ 233 NewSampleFunc: WriterNewSample(ctx, output), 234 NewPrerollFunc: func(self *app.Sink) gst.FlowReturn { 235 return gst.FlowOK 236 }, 237 }) 238 239 ctx, cancel := context.WithCancel(ctx) 240 defer cancel() 241 242 busErr := make(chan error) 243 go func() { 244 err := HandleBusMessages(ctx, pipeline) 245 busErr <- err 246 }() 247 248 err = pipeline.SetState(gst.StatePlaying) 249 if err != nil { 250 return fmt.Errorf("failed to set pipeline state to playing: %w", err) 251 } 252 253 defer func() { 254 err = pipeline.SetState(gst.StateNull) 255 if err != nil { 256 log.Error(ctx, "failed to set pipeline state to null", "error", err) 257 } 258 }() 259 260 return <-busErr 261} 262 263// Splits out video into MPEG-TS and audio into MP4 (to be recombined after transcoding) 264func MP4ToMPEGTSVideoMP4Audio(ctx context.Context, input io.Reader, videoOutput io.Writer, audioOutput io.Writer) error { 265 ctx = log.WithLogValues(ctx, "func", "MP4ToMPEGTSVideoMP4Audio") 266 pipelineStr := strings.Join([]string{ 267 "appsrc name=appsrc ! qtdemux name=demux", 268 "mpegtsmux name=videomux ! appsink name=videoappsink sync=false", 269 "mp4mux name=audiomux ! appsink name=audioappsink sync=false", 270 "demux.video_0 ! h264parse ! video/x-h264,stream-format=byte-stream ! queue name=videoqueue", 271 "demux.audio_0 ! opusparse ! queue name=audioqueue", 272 }, " ") 273 274 pipeline, err := gst.NewPipelineFromString(pipelineStr) 275 if err != nil { 276 return err 277 } 278 279 videomux, err := pipeline.GetElementByName("videomux") 280 if err != nil { 281 return err 282 } 283 muxVideoSinkPad := videomux.GetRequestPad("sink_%d") 284 if muxVideoSinkPad == nil { 285 return fmt.Errorf("failed to get video sink pad") 286 } 287 288 audiomux, err := pipeline.GetElementByName("audiomux") 289 if err != nil { 290 return err 291 } 292 muxAudioSinkPad := audiomux.GetRequestPad("audio_%u") 293 if muxAudioSinkPad == nil { 294 return fmt.Errorf("failed to get audio sink pad") 295 } 296 297 videoQueue, err := pipeline.GetElementByName("videoqueue") 298 if err != nil { 299 return err 300 } 301 audioQueue, err := pipeline.GetElementByName("audioqueue") 302 if err != nil { 303 return err 304 } 305 306 videoQueueSrcPad := videoQueue.GetStaticPad("src") 307 if videoQueueSrcPad == nil { 308 return fmt.Errorf("failed to get video queue source pad") 309 } 310 audioQueueSrcPad := audioQueue.GetStaticPad("src") 311 if audioQueueSrcPad == nil { 312 return fmt.Errorf("failed to get audio queue source pad") 313 } 314 315 ok := videoQueueSrcPad.Link(muxVideoSinkPad) 316 if ok != gst.PadLinkOK { 317 return fmt.Errorf("failed to link video queue source pad to mux video sink pad: %v", ok) 318 } 319 ok = audioQueueSrcPad.Link(muxAudioSinkPad) 320 if ok != gst.PadLinkOK { 321 return fmt.Errorf("failed to link audio queue source pad to mux audio sink pad: %v", ok) 322 } 323 324 // Get elements 325 appsrc, err := pipeline.GetElementByName("appsrc") 326 if err != nil { 327 return err 328 } 329 videoappsink, err := pipeline.GetElementByName("videoappsink") 330 if err != nil { 331 return err 332 } 333 audioappsink, err := pipeline.GetElementByName("audioappsink") 334 if err != nil { 335 return err 336 } 337 338 source := app.SrcFromElement(appsrc) 339 videoSink := app.SinkFromElement(videoappsink) 340 audioSink := app.SinkFromElement(audioappsink) 341 342 // Set up source callbacks 343 source.SetCallbacks(&app.SourceCallbacks{ 344 NeedDataFunc: ReaderNeedDataIncremental(ctx, input), 345 EnoughDataFunc: func(self *app.Source) { 346 // Nothing to do here 347 }, 348 SeekDataFunc: func(self *app.Source, offset uint64) bool { 349 return false // We don't support seeking 350 }, 351 }) 352 353 // Set up sink callbacks 354 videoSink.SetCallbacks(&app.SinkCallbacks{ 355 NewSampleFunc: WriterNewSample(ctx, videoOutput), 356 NewPrerollFunc: func(self *app.Sink) gst.FlowReturn { 357 return gst.FlowOK 358 }, 359 }) 360 361 // Set up sink callbacks 362 audioSink.SetCallbacks(&app.SinkCallbacks{ 363 NewSampleFunc: WriterNewSample(ctx, audioOutput), 364 NewPrerollFunc: func(self *app.Sink) gst.FlowReturn { 365 return gst.FlowOK 366 }, 367 }) 368 369 ctx, cancel := context.WithCancel(ctx) 370 defer cancel() 371 372 // Handle bus messages in a separate goroutine 373 g, ctx := errgroup.WithContext(ctx) 374 g.Go(func() error { 375 err = HandleBusMessages(ctx, pipeline) 376 cancel() 377 return err 378 }) 379 380 // Start the pipeline 381 err = pipeline.SetState(gst.StatePlaying) 382 if err != nil { 383 return fmt.Errorf("failed to set pipeline state to playing: %w", err) 384 } 385 386 // Wait for the pipeline to finish or context to be canceled 387 <-ctx.Done() 388 389 // Clean up 390 err = pipeline.SetState(gst.StateNull) 391 if err != nil { 392 return fmt.Errorf("failed to set pipeline state to null: %w", err) 393 } 394 395 return g.Wait() 396} 397 398// Joins video and audio back together from MPEG-TS and MP4 (from transcoding) 399func MPEGTSVideoMP4AudioToMP4(ctx context.Context, videoInput io.Reader, audioInput io.Reader, output io.Writer) error { 400 pipelineStr := strings.Join([]string{ 401 "appsrc name=videoappsrc ! tsdemux name=videodemux", 402 "appsrc name=audioappsrc ! qtdemux name=audiodemux", 403 "mp4mux name=mux ! appsink name=appsink sync=false", 404 "h264parse name=videoparse ! video/x-h264,stream-format=avc ! queue name=videoqueue", 405 "audiodemux.audio_0 ! opusparse ! queue name=audioqueue", 406 }, " ") 407 408 pipeline, err := gst.NewPipelineFromString(pipelineStr) 409 if err != nil { 410 return err 411 } 412 413 mux, err := pipeline.GetElementByName("mux") 414 if err != nil { 415 return err 416 } 417 muxVideoSinkPad := mux.GetRequestPad("video_%u") 418 if muxVideoSinkPad == nil { 419 return fmt.Errorf("failed to get video sink pad") 420 } 421 muxAudioSinkPad := mux.GetRequestPad("audio_%u") 422 if muxAudioSinkPad == nil { 423 return fmt.Errorf("failed to get audio sink pad") 424 } 425 426 videoQueue, err := pipeline.GetElementByName("videoqueue") 427 if err != nil { 428 return err 429 } 430 audioQueue, err := pipeline.GetElementByName("audioqueue") 431 if err != nil { 432 return err 433 } 434 435 videoQueueSrcPad := videoQueue.GetStaticPad("src") 436 if videoQueueSrcPad == nil { 437 return fmt.Errorf("failed to get video queue source pad") 438 } 439 audioQueueSrcPad := audioQueue.GetStaticPad("src") 440 if audioQueueSrcPad == nil { 441 return fmt.Errorf("failed to get audio queue source pad") 442 } 443 444 ok := videoQueueSrcPad.Link(muxVideoSinkPad) 445 if ok != gst.PadLinkOK { 446 return fmt.Errorf("failed to link video queue source pad to mux video sink pad: %v", ok) 447 } 448 ok = audioQueueSrcPad.Link(muxAudioSinkPad) 449 if ok != gst.PadLinkOK { 450 return fmt.Errorf("failed to link audio queue source pad to mux audio sink pad: %v", ok) 451 } 452 453 videodemux, err := pipeline.GetElementByName("videodemux") 454 if err != nil { 455 return err 456 } 457 videoparse, err := pipeline.GetElementByName("videoparse") 458 if err != nil { 459 return err 460 } 461 videoParseSinkPad := videoparse.GetStaticPad("sink") 462 if videoParseSinkPad == nil { 463 return fmt.Errorf("failed to get video parse sink pad") 464 } 465 466 // Get elements 467 videoappsrc, err := pipeline.GetElementByName("videoappsrc") 468 if err != nil { 469 return err 470 } 471 audioappsrc, err := pipeline.GetElementByName("audioappsrc") 472 if err != nil { 473 return err 474 } 475 appsink, err := pipeline.GetElementByName("appsink") 476 if err != nil { 477 return err 478 } 479 480 videoSource := app.SrcFromElement(videoappsrc) 481 audioSource := app.SrcFromElement(audioappsrc) 482 sink := app.SinkFromElement(appsink) 483 484 // Set up source callbacks 485 videoSource.SetCallbacks(&app.SourceCallbacks{ 486 NeedDataFunc: ReaderNeedDataIncremental(ctx, videoInput), 487 EnoughDataFunc: func(self *app.Source) { 488 // Nothing to do here 489 }, 490 SeekDataFunc: func(self *app.Source, offset uint64) bool { 491 return false // We don't support seeking 492 }, 493 }) 494 495 audioSource.SetCallbacks(&app.SourceCallbacks{ 496 NeedDataFunc: ReaderNeedDataIncremental(ctx, audioInput), 497 EnoughDataFunc: func(self *app.Source) { 498 // Nothing to do here 499 }, 500 SeekDataFunc: func(self *app.Source, offset uint64) bool { 501 return false // We don't support seeking 502 }, 503 }) 504 505 wroteAnything := false 506 507 // Set up sink callbacks 508 sink.SetCallbacks(&app.SinkCallbacks{ 509 NewSampleFunc: func(sink *app.Sink) gst.FlowReturn { 510 sample := sink.PullSample() 511 if sample == nil { 512 return gst.FlowOK 513 } 514 515 // Retrieve the buffer from the sample. 516 buffer := sample.GetBuffer() 517 bs := buffer.Map(gst.MapRead).Bytes() 518 defer buffer.Unmap() 519 520 _, err := output.Write(bs) 521 522 if err != nil { 523 log.Error(ctx, "error writing to output", "error", err) 524 return gst.FlowError 525 } 526 527 wroteAnything = true 528 529 return gst.FlowOK 530 }, 531 NewPrerollFunc: func(self *app.Sink) gst.FlowReturn { 532 return gst.FlowOK 533 }, 534 }) 535 536 ctx, cancel := context.WithCancel(ctx) 537 defer cancel() 538 539 onPadAdded := func(element *gst.Element, pad *gst.Pad) { 540 if pad.GetDirection() == gst.PadDirectionSource { 541 ok := pad.Link(videoParseSinkPad) 542 if ok != gst.PadLinkOK { 543 log.Error(ctx, "failed to link video parse sink pad to video demux pad", "error", ok) 544 cancel() 545 } 546 } 547 } 548 if _, err := videodemux.Connect("pad-added", onPadAdded); err != nil { 549 return fmt.Errorf("failed connect pad-added handler: %w", err) 550 } 551 552 errCh := make(chan error) 553 go func() { 554 err = HandleBusMessages(ctx, pipeline) 555 cancel() 556 errCh <- err 557 }() 558 559 // Start the pipeline 560 err = pipeline.SetState(gst.StatePlaying) 561 if err != nil { 562 return fmt.Errorf("failed to set pipeline state to playing: %w", err) 563 } 564 565 defer func() { 566 err = pipeline.SetState(gst.StateNull) 567 if err != nil { 568 log.Error(ctx, "failed to set pipeline state to null", "error", err) 569 } 570 videoParseSinkPad = nil 571 }() 572 573 err = <-errCh 574 if err != nil { 575 return fmt.Errorf("pipeline error: %w", err) 576 } 577 578 if !wroteAnything { 579 return fmt.Errorf("no data written to output") 580 } 581 582 return nil 583}