Live video on the AT Protocol
at eli/fix-gitlab 566 lines 16 kB view raw
1package media 2 3import ( 4 "context" 5 "fmt" 6 "io" 7 "strings" 8 "time" 9 10 "github.com/go-gst/go-gst/gst" 11 "github.com/go-gst/go-gst/gst/app" 12 "golang.org/x/sync/errgroup" 13 "stream.place/streamplace/pkg/log" 14) 15 16// MP4ToMPEGTS converts an MP4 file with H264 video and Opus audio to an MPEG-TS file 17// It reads from the provided reader and writes the converted MPEG-TS to the writer. 18// The conversion is optimized for speed. 19func MP4ToMPEGTS(ctx context.Context, input io.Reader, output io.Writer) (int64, error) { 20 ctx = log.WithLogValues(ctx, "func", "MP4ToMPEGTS") 21 pipelineStr := strings.Join([]string{ 22 "appsrc name=appsrc ! qtdemux name=demux", 23 "mpegtsmux name=mux ! appsink name=appsink sync=false", 24 "demux.video_0 ! h264parse ! video/x-h264,stream-format=byte-stream ! queue name=videoqueue", 25 "demux.audio_0 ! opusdec name=audioparse ! audioresample ! audiorate ! fdkaacenc name=audioenc ! queue name=audioqueue", 26 }, " ") 27 28 pipeline, err := gst.NewPipelineFromString(pipelineStr) 29 if err != nil { 30 return 0, err 31 } 32 33 mux, err := pipeline.GetElementByName("mux") 34 if err != nil { 35 return 0, err 36 } 37 muxVideoSinkPad := mux.GetRequestPad("sink_%d") 38 if muxVideoSinkPad == nil { 39 return 0, fmt.Errorf("failed to get video sink pad") 40 } 41 muxAudioSinkPad := mux.GetRequestPad("sink_%d") 42 if muxAudioSinkPad == nil { 43 return 0, fmt.Errorf("failed to get audio sink pad") 44 } 45 videoQueue, err := pipeline.GetElementByName("videoqueue") 46 if err != nil { 47 return 0, err 48 } 49 audioQueue, err := pipeline.GetElementByName("audioqueue") 50 if err != nil { 51 return 0, err 52 } 53 videoQueueSrcPad := videoQueue.GetStaticPad("src") 54 if videoQueueSrcPad == nil { 55 return 0, fmt.Errorf("failed to get video queue source pad") 56 } 57 audioQueueSrcPad := audioQueue.GetStaticPad("src") 58 if audioQueueSrcPad == nil { 59 return 0, fmt.Errorf("failed to get audio queue source pad") 60 } 61 62 ok := videoQueueSrcPad.Link(muxVideoSinkPad) 63 if ok != gst.PadLinkOK { 64 return 0, fmt.Errorf("failed to link video queue source pad to mux video sink pad: %v", ok) 65 } 66 ok = audioQueueSrcPad.Link(muxAudioSinkPad) 67 if ok != gst.PadLinkOK { 68 return 0, fmt.Errorf("failed to link audio queue source pad to mux audio sink pad: %v", ok) 69 } 70 71 // Get elements 72 appsrc, err := pipeline.GetElementByName("appsrc") 73 if err != nil { 74 return 0, err 75 } 76 appsink, err := pipeline.GetElementByName("appsink") 77 if err != nil { 78 return 0, err 79 } 80 81 source := app.SrcFromElement(appsrc) 82 sink := app.SinkFromElement(appsink) 83 84 // Set up source callbacks 85 source.SetCallbacks(&app.SourceCallbacks{ 86 NeedDataFunc: ReaderNeedDataIncremental(ctx, input), 87 EnoughDataFunc: func(self *app.Source) { 88 // Nothing to do here 89 }, 90 SeekDataFunc: func(self *app.Source, offset uint64) bool { 91 return false // We don't support seeking 92 }, 93 }) 94 95 // Set up sink callbacks 96 sink.SetCallbacks(&app.SinkCallbacks{ 97 NewSampleFunc: WriterNewSample(ctx, output), 98 NewPrerollFunc: func(self *app.Sink) gst.FlowReturn { 99 return gst.FlowOK 100 }, 101 }) 102 103 ctx, cancel := context.WithCancel(ctx) 104 defer func() { 105 cancel() 106 // Clean up 107 err = pipeline.SetState(gst.StateNull) 108 if err != nil { 109 log.Error(ctx, "failed to set pipeline state to null", "error", err) 110 } 111 }() 112 113 go func() { 114 select { 115 case <-ctx.Done(): 116 return 117 case <-time.After(time.Second * 10): 118 log.Debug(ctx, "pipeline is taking too long to start, cancelling") 119 err := fmt.Errorf("pipeline is taking too long to start, cancelling") 120 pipeline.Error(err.Error(), err) 121 } 122 }() 123 124 // Handle bus messages in a separate goroutine 125 errCh := make(chan error) 126 go func() { 127 err := HandleBusMessages(ctx, pipeline) 128 cancel() 129 errCh <- err 130 close(errCh) 131 }() 132 133 // Start the pipeline 134 err = pipeline.SetState(gst.StatePlaying) 135 if err != nil { 136 return 0, fmt.Errorf("failed to set pipeline state to playing: %w", err) 137 } 138 139 var durOk bool 140 var dur int64 141 busErr := <-errCh 142 143 if busErr == nil { 144 durOk, dur = pipeline.QueryDuration(gst.FormatTime) 145 if !durOk { 146 return 0, fmt.Errorf("failed to query duration") 147 } 148 } 149 150 return dur, busErr 151} 152 153// MPEGTSToMP4 converts an MPEG-TS file with H264 video and Opus audio to an MP4 file. 154// It reads from the provided reader and writes the converted MP4 to the writer. 155func MPEGTSToMP4(ctx context.Context, input io.Reader, output io.Writer) error { 156 ctx = log.WithLogValues(ctx, "func", "MPEGTSToMP4") 157 pipelineStr := strings.Join([]string{ 158 "appsrc name=appsrc ! tsdemux name=demux", 159 "mp4mux name=mux ! appsink sync=false name=appsink", 160 "demux.video_0_0100 ! h264parse ! video/x-h264,stream-format=avc ! queue name=videoqueue", 161 "demux.audio_0_0101 ! opusdec ! opusenc ! queue name=audioqueue", 162 }, " ") 163 164 pipeline, err := gst.NewPipelineFromString(pipelineStr) 165 if err != nil { 166 return err 167 } 168 169 mux, err := pipeline.GetElementByName("mux") 170 if err != nil { 171 return err 172 } 173 muxVideoSinkPad := mux.GetRequestPad("video_%u") 174 if muxVideoSinkPad == nil { 175 return fmt.Errorf("failed to get video sink pad") 176 } 177 muxAudioSinkPad := mux.GetRequestPad("audio_%u") 178 if muxAudioSinkPad == nil { 179 return fmt.Errorf("failed to get audio sink pad") 180 } 181 videoQueue, err := pipeline.GetElementByName("videoqueue") 182 if err != nil { 183 return err 184 } 185 audioQueue, err := pipeline.GetElementByName("audioqueue") 186 if err != nil { 187 return err 188 } 189 videoQueueSrcPad := videoQueue.GetStaticPad("src") 190 if videoQueueSrcPad == nil { 191 return fmt.Errorf("failed to get video queue source pad") 192 } 193 audioQueueSrcPad := audioQueue.GetStaticPad("src") 194 if audioQueueSrcPad == nil { 195 return fmt.Errorf("failed to get audio queue source pad") 196 } 197 198 ok := videoQueueSrcPad.Link(muxVideoSinkPad) 199 if ok != gst.PadLinkOK { 200 return fmt.Errorf("failed to link video queue source pad to mux video sink pad: %v", ok) 201 } 202 ok = audioQueueSrcPad.Link(muxAudioSinkPad) 203 if ok != gst.PadLinkOK { 204 return fmt.Errorf("failed to link audio queue source pad to mux audio sink pad: %v", ok) 205 } 206 207 // Get elements 208 appsrc, err := pipeline.GetElementByName("appsrc") 209 if err != nil { 210 return err 211 } 212 appsink, err := pipeline.GetElementByName("appsink") 213 if err != nil { 214 return err 215 } 216 217 source := app.SrcFromElement(appsrc) 218 sink := app.SinkFromElement(appsink) 219 220 // Set up source callbacks 221 source.SetCallbacks(&app.SourceCallbacks{ 222 NeedDataFunc: ReaderNeedDataIncremental(ctx, input), 223 EnoughDataFunc: func(self *app.Source) { 224 // Nothing to do here 225 }, 226 SeekDataFunc: func(self *app.Source, offset uint64) bool { 227 return false // We don't support seeking 228 }, 229 }) 230 231 // Set up sink callbacks 232 sink.SetCallbacks(&app.SinkCallbacks{ 233 NewSampleFunc: WriterNewSample(ctx, output), 234 NewPrerollFunc: func(self *app.Sink) gst.FlowReturn { 235 return gst.FlowOK 236 }, 237 }) 238 239 ctx, cancel := context.WithCancel(ctx) 240 defer cancel() 241 242 // Handle bus messages in a separate goroutine 243 g, ctx := errgroup.WithContext(ctx) 244 g.Go(func() error { 245 if err := HandleBusMessages(ctx, pipeline); err != nil { 246 log.Log(ctx, "pipeline error", "error", err) 247 } 248 cancel() 249 return nil 250 }) 251 252 // Start the pipeline 253 err = pipeline.SetState(gst.StatePlaying) 254 if err != nil { 255 return fmt.Errorf("failed to set pipeline state to playing: %w", err) 256 } 257 258 // Wait for the pipeline to finish or context to be canceled 259 <-ctx.Done() 260 261 // durOk, dur := pipeline.QueryDuration(gst.FormatTime) 262 // if !durOk { 263 // return fmt.Errorf("failed to query duration") 264 // } 265 266 // Clean up 267 err = pipeline.SetState(gst.StateNull) 268 if err != nil { 269 return fmt.Errorf("failed to set pipeline state to null: %w", err) 270 } 271 272 return nil 273} 274 275// Splits out video into MPEG-TS and audio into MP4 (to be recombined after transcoding) 276func MP4ToMPEGTSVideoMP4Audio(ctx context.Context, input io.Reader, videoOutput io.Writer, audioOutput io.Writer) error { 277 ctx = log.WithLogValues(ctx, "func", "MP4ToMPEGTSVideoMP4Audio") 278 pipelineStr := strings.Join([]string{ 279 "appsrc name=appsrc ! qtdemux name=demux", 280 "mpegtsmux name=videomux ! appsink name=videoappsink sync=false", 281 "mp4mux name=audiomux ! appsink name=audioappsink sync=false", 282 "demux.video_0 ! h264parse ! video/x-h264,stream-format=byte-stream ! queue name=videoqueue", 283 "demux.audio_0 ! opusparse ! queue name=audioqueue", 284 }, " ") 285 286 pipeline, err := gst.NewPipelineFromString(pipelineStr) 287 if err != nil { 288 return err 289 } 290 291 videomux, err := pipeline.GetElementByName("videomux") 292 if err != nil { 293 return err 294 } 295 muxVideoSinkPad := videomux.GetRequestPad("sink_%d") 296 if muxVideoSinkPad == nil { 297 return fmt.Errorf("failed to get video sink pad") 298 } 299 300 audiomux, err := pipeline.GetElementByName("audiomux") 301 if err != nil { 302 return err 303 } 304 muxAudioSinkPad := audiomux.GetRequestPad("audio_%u") 305 if muxAudioSinkPad == nil { 306 return fmt.Errorf("failed to get audio sink pad") 307 } 308 309 videoQueue, err := pipeline.GetElementByName("videoqueue") 310 if err != nil { 311 return err 312 } 313 audioQueue, err := pipeline.GetElementByName("audioqueue") 314 if err != nil { 315 return err 316 } 317 318 videoQueueSrcPad := videoQueue.GetStaticPad("src") 319 if videoQueueSrcPad == nil { 320 return fmt.Errorf("failed to get video queue source pad") 321 } 322 audioQueueSrcPad := audioQueue.GetStaticPad("src") 323 if audioQueueSrcPad == nil { 324 return fmt.Errorf("failed to get audio queue source pad") 325 } 326 327 ok := videoQueueSrcPad.Link(muxVideoSinkPad) 328 if ok != gst.PadLinkOK { 329 return fmt.Errorf("failed to link video queue source pad to mux video sink pad: %v", ok) 330 } 331 ok = audioQueueSrcPad.Link(muxAudioSinkPad) 332 if ok != gst.PadLinkOK { 333 return fmt.Errorf("failed to link audio queue source pad to mux audio sink pad: %v", ok) 334 } 335 336 // Get elements 337 appsrc, err := pipeline.GetElementByName("appsrc") 338 if err != nil { 339 return err 340 } 341 videoappsink, err := pipeline.GetElementByName("videoappsink") 342 if err != nil { 343 return err 344 } 345 audioappsink, err := pipeline.GetElementByName("audioappsink") 346 if err != nil { 347 return err 348 } 349 350 source := app.SrcFromElement(appsrc) 351 videoSink := app.SinkFromElement(videoappsink) 352 audioSink := app.SinkFromElement(audioappsink) 353 354 // Set up source callbacks 355 source.SetCallbacks(&app.SourceCallbacks{ 356 NeedDataFunc: ReaderNeedDataIncremental(ctx, input), 357 EnoughDataFunc: func(self *app.Source) { 358 // Nothing to do here 359 }, 360 SeekDataFunc: func(self *app.Source, offset uint64) bool { 361 return false // We don't support seeking 362 }, 363 }) 364 365 // Set up sink callbacks 366 videoSink.SetCallbacks(&app.SinkCallbacks{ 367 NewSampleFunc: WriterNewSample(ctx, videoOutput), 368 NewPrerollFunc: func(self *app.Sink) gst.FlowReturn { 369 return gst.FlowOK 370 }, 371 }) 372 373 // Set up sink callbacks 374 audioSink.SetCallbacks(&app.SinkCallbacks{ 375 NewSampleFunc: WriterNewSample(ctx, audioOutput), 376 NewPrerollFunc: func(self *app.Sink) gst.FlowReturn { 377 return gst.FlowOK 378 }, 379 }) 380 381 ctx, cancel := context.WithCancel(ctx) 382 defer cancel() 383 384 // Handle bus messages in a separate goroutine 385 g, ctx := errgroup.WithContext(ctx) 386 g.Go(func() error { 387 err = HandleBusMessages(ctx, pipeline) 388 cancel() 389 return err 390 }) 391 392 // Start the pipeline 393 err = pipeline.SetState(gst.StatePlaying) 394 if err != nil { 395 return fmt.Errorf("failed to set pipeline state to playing: %w", err) 396 } 397 398 // Wait for the pipeline to finish or context to be canceled 399 <-ctx.Done() 400 401 // Clean up 402 err = pipeline.SetState(gst.StateNull) 403 if err != nil { 404 return fmt.Errorf("failed to set pipeline state to null: %w", err) 405 } 406 407 return g.Wait() 408} 409 410// Joins video and audio back together from MPEG-TS and MP4 (from transcoding) 411func MPEGTSVideoMP4AudioToMP4(ctx context.Context, videoInput io.Reader, audioInput io.Reader, output io.Writer) error { 412 pipelineStr := strings.Join([]string{ 413 "appsrc name=videoappsrc ! tsdemux name=videodemux", 414 "appsrc name=audioappsrc ! qtdemux name=audiodemux", 415 "mp4mux name=mux ! appsink name=appsink sync=false", 416 "h264parse name=videoparse ! video/x-h264,stream-format=avc ! queue name=videoqueue", 417 "audiodemux.audio_0 ! opusparse ! queue name=audioqueue", 418 }, " ") 419 420 pipeline, err := gst.NewPipelineFromString(pipelineStr) 421 if err != nil { 422 return err 423 } 424 425 mux, err := pipeline.GetElementByName("mux") 426 if err != nil { 427 return err 428 } 429 muxVideoSinkPad := mux.GetRequestPad("video_%u") 430 if muxVideoSinkPad == nil { 431 return fmt.Errorf("failed to get video sink pad") 432 } 433 muxAudioSinkPad := mux.GetRequestPad("audio_%u") 434 if muxAudioSinkPad == nil { 435 return fmt.Errorf("failed to get audio sink pad") 436 } 437 438 videoQueue, err := pipeline.GetElementByName("videoqueue") 439 if err != nil { 440 return err 441 } 442 audioQueue, err := pipeline.GetElementByName("audioqueue") 443 if err != nil { 444 return err 445 } 446 447 videoQueueSrcPad := videoQueue.GetStaticPad("src") 448 if videoQueueSrcPad == nil { 449 return fmt.Errorf("failed to get video queue source pad") 450 } 451 audioQueueSrcPad := audioQueue.GetStaticPad("src") 452 if audioQueueSrcPad == nil { 453 return fmt.Errorf("failed to get audio queue source pad") 454 } 455 456 ok := videoQueueSrcPad.Link(muxVideoSinkPad) 457 if ok != gst.PadLinkOK { 458 return fmt.Errorf("failed to link video queue source pad to mux video sink pad: %v", ok) 459 } 460 ok = audioQueueSrcPad.Link(muxAudioSinkPad) 461 if ok != gst.PadLinkOK { 462 return fmt.Errorf("failed to link audio queue source pad to mux audio sink pad: %v", ok) 463 } 464 465 videodemux, err := pipeline.GetElementByName("videodemux") 466 if err != nil { 467 return err 468 } 469 videoparse, err := pipeline.GetElementByName("videoparse") 470 if err != nil { 471 return err 472 } 473 videoParseSinkPad := videoparse.GetStaticPad("sink") 474 if videoParseSinkPad == nil { 475 return fmt.Errorf("failed to get video parse sink pad") 476 } 477 478 // Get elements 479 videoappsrc, err := pipeline.GetElementByName("videoappsrc") 480 if err != nil { 481 return err 482 } 483 audioappsrc, err := pipeline.GetElementByName("audioappsrc") 484 if err != nil { 485 return err 486 } 487 appsink, err := pipeline.GetElementByName("appsink") 488 if err != nil { 489 return err 490 } 491 492 videoSource := app.SrcFromElement(videoappsrc) 493 audioSource := app.SrcFromElement(audioappsrc) 494 sink := app.SinkFromElement(appsink) 495 496 // Set up source callbacks 497 videoSource.SetCallbacks(&app.SourceCallbacks{ 498 NeedDataFunc: ReaderNeedDataIncremental(ctx, videoInput), 499 EnoughDataFunc: func(self *app.Source) { 500 // Nothing to do here 501 }, 502 SeekDataFunc: func(self *app.Source, offset uint64) bool { 503 return false // We don't support seeking 504 }, 505 }) 506 507 audioSource.SetCallbacks(&app.SourceCallbacks{ 508 NeedDataFunc: ReaderNeedDataIncremental(ctx, audioInput), 509 EnoughDataFunc: func(self *app.Source) { 510 // Nothing to do here 511 }, 512 SeekDataFunc: func(self *app.Source, offset uint64) bool { 513 return false // We don't support seeking 514 }, 515 }) 516 517 // Set up sink callbacks 518 sink.SetCallbacks(&app.SinkCallbacks{ 519 NewSampleFunc: WriterNewSample(ctx, output), 520 NewPrerollFunc: func(self *app.Sink) gst.FlowReturn { 521 return gst.FlowOK 522 }, 523 }) 524 525 ctx, cancel := context.WithCancel(ctx) 526 defer cancel() 527 528 onPadAdded := func(element *gst.Element, pad *gst.Pad) { 529 if pad.GetDirection() == gst.PadDirectionSource { 530 ok := pad.Link(videoParseSinkPad) 531 defer func() { videoParseSinkPad = nil }() 532 if ok != gst.PadLinkOK { 533 log.Error(ctx, "failed to link video parse sink pad to video demux pad", "error", ok) 534 cancel() 535 } 536 } 537 } 538 if _, err := videodemux.Connect("pad-added", onPadAdded); err != nil { 539 return fmt.Errorf("failed connect pad-added handler: %w", err) 540 } 541 542 // Handle bus messages in a separate goroutine 543 g, ctx := errgroup.WithContext(ctx) 544 g.Go(func() error { 545 err = HandleBusMessages(ctx, pipeline) 546 cancel() 547 return err 548 }) 549 550 // Start the pipeline 551 err = pipeline.SetState(gst.StatePlaying) 552 if err != nil { 553 return fmt.Errorf("failed to set pipeline state to playing: %w", err) 554 } 555 556 // Wait for the pipeline to finish or context to be canceled 557 <-ctx.Done() 558 559 // Clean up 560 err = pipeline.SetState(gst.StateNull) 561 if err != nil { 562 return fmt.Errorf("failed to set pipeline state to null: %w", err) 563 } 564 565 return g.Wait() 566}