Live video on the AT Protocol
at eli/node-22 562 lines 16 kB view raw
1package media 2 3import ( 4 "context" 5 "fmt" 6 "io" 7 "strings" 8 "time" 9 10 "github.com/go-gst/go-gst/gst" 11 "github.com/go-gst/go-gst/gst/app" 12 "golang.org/x/sync/errgroup" 13 "stream.place/streamplace/pkg/log" 14) 15 16// MP4ToMPEGTS converts an MP4 file with H264 video and Opus audio to an MPEG-TS file 17// It reads from the provided reader and writes the converted MPEG-TS to the writer. 18// The conversion is optimized for speed. 19func MP4ToMPEGTS(ctx context.Context, input io.Reader, output io.Writer) (int64, error) { 20 ctx = log.WithLogValues(ctx, "func", "MP4ToMPEGTS") 21 pipelineStr := strings.Join([]string{ 22 "appsrc name=appsrc ! qtdemux name=demux", 23 "mpegtsmux name=mux ! appsink name=appsink sync=false", 24 "demux.video_0 ! h264parse ! video/x-h264,stream-format=byte-stream ! queue name=videoqueue", 25 "demux.audio_0 ! opusdec name=audioparse ! audioresample ! audiorate ! fdkaacenc name=audioenc ! queue name=audioqueue", 26 }, " ") 27 28 pipeline, err := gst.NewPipelineFromString(pipelineStr) 29 if err != nil { 30 return 0, err 31 } 32 33 mux, err := pipeline.GetElementByName("mux") 34 if err != nil { 35 return 0, err 36 } 37 muxVideoSinkPad := mux.GetRequestPad("sink_%d") 38 if muxVideoSinkPad == nil { 39 return 0, fmt.Errorf("failed to get video sink pad") 40 } 41 muxAudioSinkPad := mux.GetRequestPad("sink_%d") 42 if muxAudioSinkPad == nil { 43 return 0, fmt.Errorf("failed to get audio sink pad") 44 } 45 videoQueue, err := pipeline.GetElementByName("videoqueue") 46 if err != nil { 47 return 0, err 48 } 49 audioQueue, err := pipeline.GetElementByName("audioqueue") 50 if err != nil { 51 return 0, err 52 } 53 videoQueueSrcPad := videoQueue.GetStaticPad("src") 54 if videoQueueSrcPad == nil { 55 return 0, fmt.Errorf("failed to get video queue source pad") 56 } 57 audioQueueSrcPad := audioQueue.GetStaticPad("src") 58 if audioQueueSrcPad == nil { 59 return 0, fmt.Errorf("failed to get audio queue source pad") 60 } 61 62 ok := videoQueueSrcPad.Link(muxVideoSinkPad) 63 if ok != gst.PadLinkOK { 64 return 0, fmt.Errorf("failed to link video queue source pad to mux video sink pad: %v", ok) 65 } 66 ok = audioQueueSrcPad.Link(muxAudioSinkPad) 67 if ok != gst.PadLinkOK { 68 return 0, fmt.Errorf("failed to link audio queue source pad to mux audio sink pad: %v", ok) 69 } 70 71 // Get elements 72 appsrc, err := pipeline.GetElementByName("appsrc") 73 if err != nil { 74 return 0, err 75 } 76 appsink, err := pipeline.GetElementByName("appsink") 77 if err != nil { 78 return 0, err 79 } 80 81 source := app.SrcFromElement(appsrc) 82 sink := app.SinkFromElement(appsink) 83 84 // Set up source callbacks 85 source.SetCallbacks(&app.SourceCallbacks{ 86 NeedDataFunc: ReaderNeedDataIncremental(ctx, input), 87 EnoughDataFunc: func(self *app.Source) { 88 // Nothing to do here 89 }, 90 SeekDataFunc: func(self *app.Source, offset uint64) bool { 91 return false // We don't support seeking 92 }, 93 }) 94 95 // Set up sink callbacks 96 sink.SetCallbacks(&app.SinkCallbacks{ 97 NewSampleFunc: WriterNewSample(ctx, output), 98 NewPrerollFunc: func(self *app.Sink) gst.FlowReturn { 99 return gst.FlowOK 100 }, 101 }) 102 103 ctx, cancel := context.WithCancel(ctx) 104 defer func() { 105 cancel() 106 // Clean up 107 err = pipeline.SetState(gst.StateNull) 108 if err != nil { 109 log.Error(ctx, "failed to set pipeline state to null", "error", err) 110 } 111 }() 112 113 go func() { 114 select { 115 case <-ctx.Done(): 116 return 117 case <-time.After(time.Second * 10): 118 log.Debug(ctx, "pipeline is taking too long to start, cancelling") 119 err := fmt.Errorf("pipeline is taking too long to start, cancelling") 120 pipeline.Error(err.Error(), err) 121 } 122 }() 123 124 // Handle bus messages in a separate goroutine 125 errCh := make(chan error) 126 go func() { 127 err := HandleBusMessages(ctx, pipeline) 128 cancel() 129 errCh <- err 130 close(errCh) 131 }() 132 133 // Start the pipeline 134 err = pipeline.SetState(gst.StatePlaying) 135 if err != nil { 136 return 0, fmt.Errorf("failed to set pipeline state to playing: %w", err) 137 } 138 139 var durOk bool 140 var dur int64 141 busErr := <-errCh 142 143 if busErr == nil { 144 durOk, dur = pipeline.QueryDuration(gst.FormatTime) 145 if !durOk { 146 return 0, fmt.Errorf("failed to query duration") 147 } 148 } 149 150 return dur, busErr 151} 152 153// MPEGTSToMP4 converts an MPEG-TS file with H264 video and Opus audio to an MP4 file. 154// It reads from the provided reader and writes the converted MP4 to the writer. 155func MPEGTSToMP4(ctx context.Context, input io.Reader, output io.Writer) error { 156 ctx = log.WithLogValues(ctx, "func", "MPEGTSToMP4") 157 pipelineStr := strings.Join([]string{ 158 "appsrc name=appsrc ! tsdemux name=demux", 159 "mp4mux name=mux ! appsink sync=false name=appsink", 160 "demux.video_0_0100 ! h264parse ! video/x-h264,stream-format=avc ! queue name=videoqueue", 161 "demux.audio_0_0101 ! opusdec ! opusenc ! queue name=audioqueue", 162 }, " ") 163 164 pipeline, err := gst.NewPipelineFromString(pipelineStr) 165 if err != nil { 166 return err 167 } 168 169 mux, err := pipeline.GetElementByName("mux") 170 if err != nil { 171 return err 172 } 173 muxVideoSinkPad := mux.GetRequestPad("video_%u") 174 if muxVideoSinkPad == nil { 175 return fmt.Errorf("failed to get video sink pad") 176 } 177 muxAudioSinkPad := mux.GetRequestPad("audio_%u") 178 if muxAudioSinkPad == nil { 179 return fmt.Errorf("failed to get audio sink pad") 180 } 181 videoQueue, err := pipeline.GetElementByName("videoqueue") 182 if err != nil { 183 return err 184 } 185 audioQueue, err := pipeline.GetElementByName("audioqueue") 186 if err != nil { 187 return err 188 } 189 videoQueueSrcPad := videoQueue.GetStaticPad("src") 190 if videoQueueSrcPad == nil { 191 return fmt.Errorf("failed to get video queue source pad") 192 } 193 audioQueueSrcPad := audioQueue.GetStaticPad("src") 194 if audioQueueSrcPad == nil { 195 return fmt.Errorf("failed to get audio queue source pad") 196 } 197 198 ok := videoQueueSrcPad.Link(muxVideoSinkPad) 199 if ok != gst.PadLinkOK { 200 return fmt.Errorf("failed to link video queue source pad to mux video sink pad: %v", ok) 201 } 202 ok = audioQueueSrcPad.Link(muxAudioSinkPad) 203 if ok != gst.PadLinkOK { 204 return fmt.Errorf("failed to link audio queue source pad to mux audio sink pad: %v", ok) 205 } 206 207 // Get elements 208 appsrc, err := pipeline.GetElementByName("appsrc") 209 if err != nil { 210 return err 211 } 212 appsink, err := pipeline.GetElementByName("appsink") 213 if err != nil { 214 return err 215 } 216 217 source := app.SrcFromElement(appsrc) 218 sink := app.SinkFromElement(appsink) 219 220 // Set up source callbacks 221 source.SetCallbacks(&app.SourceCallbacks{ 222 NeedDataFunc: ReaderNeedDataIncremental(ctx, input), 223 EnoughDataFunc: func(self *app.Source) { 224 // Nothing to do here 225 }, 226 SeekDataFunc: func(self *app.Source, offset uint64) bool { 227 return false // We don't support seeking 228 }, 229 }) 230 231 // Set up sink callbacks 232 sink.SetCallbacks(&app.SinkCallbacks{ 233 NewSampleFunc: WriterNewSample(ctx, output), 234 NewPrerollFunc: func(self *app.Sink) gst.FlowReturn { 235 return gst.FlowOK 236 }, 237 }) 238 239 ctx, cancel := context.WithCancel(ctx) 240 defer cancel() 241 242 // Handle bus messages in a separate goroutine 243 g, ctx := errgroup.WithContext(ctx) 244 g.Go(func() error { 245 HandleBusMessages(ctx, pipeline) 246 cancel() 247 return nil 248 }) 249 250 // Start the pipeline 251 err = pipeline.SetState(gst.StatePlaying) 252 if err != nil { 253 return fmt.Errorf("failed to set pipeline state to playing: %w", err) 254 } 255 256 // Wait for the pipeline to finish or context to be canceled 257 <-ctx.Done() 258 259 // durOk, dur := pipeline.QueryDuration(gst.FormatTime) 260 // if !durOk { 261 // return fmt.Errorf("failed to query duration") 262 // } 263 264 // Clean up 265 err = pipeline.SetState(gst.StateNull) 266 if err != nil { 267 return fmt.Errorf("failed to set pipeline state to null: %w", err) 268 } 269 270 return nil 271} 272 273// Splits out video into MPEG-TS and audio into MP4 (to be recombined after transcoding) 274func MP4ToMPEGTSVideoMP4Audio(ctx context.Context, input io.Reader, videoOutput io.Writer, audioOutput io.Writer) error { 275 ctx = log.WithLogValues(ctx, "func", "MP4ToMPEGTSVideoMP4Audio") 276 pipelineStr := strings.Join([]string{ 277 "appsrc name=appsrc ! qtdemux name=demux", 278 "mpegtsmux name=videomux ! appsink name=videoappsink sync=false", 279 "mp4mux name=audiomux ! appsink name=audioappsink sync=false", 280 "demux.video_0 ! h264parse ! video/x-h264,stream-format=byte-stream ! queue name=videoqueue", 281 "demux.audio_0 ! opusparse ! queue name=audioqueue", 282 }, " ") 283 284 pipeline, err := gst.NewPipelineFromString(pipelineStr) 285 if err != nil { 286 return err 287 } 288 289 videomux, err := pipeline.GetElementByName("videomux") 290 if err != nil { 291 return err 292 } 293 muxVideoSinkPad := videomux.GetRequestPad("sink_%d") 294 if muxVideoSinkPad == nil { 295 return fmt.Errorf("failed to get video sink pad") 296 } 297 298 audiomux, err := pipeline.GetElementByName("audiomux") 299 if err != nil { 300 return err 301 } 302 muxAudioSinkPad := audiomux.GetRequestPad("audio_%u") 303 if muxAudioSinkPad == nil { 304 return fmt.Errorf("failed to get audio sink pad") 305 } 306 307 videoQueue, err := pipeline.GetElementByName("videoqueue") 308 if err != nil { 309 return err 310 } 311 audioQueue, err := pipeline.GetElementByName("audioqueue") 312 if err != nil { 313 return err 314 } 315 316 videoQueueSrcPad := videoQueue.GetStaticPad("src") 317 if videoQueueSrcPad == nil { 318 return fmt.Errorf("failed to get video queue source pad") 319 } 320 audioQueueSrcPad := audioQueue.GetStaticPad("src") 321 if audioQueueSrcPad == nil { 322 return fmt.Errorf("failed to get audio queue source pad") 323 } 324 325 ok := videoQueueSrcPad.Link(muxVideoSinkPad) 326 if ok != gst.PadLinkOK { 327 return fmt.Errorf("failed to link video queue source pad to mux video sink pad: %v", ok) 328 } 329 ok = audioQueueSrcPad.Link(muxAudioSinkPad) 330 if ok != gst.PadLinkOK { 331 return fmt.Errorf("failed to link audio queue source pad to mux audio sink pad: %v", ok) 332 } 333 334 // Get elements 335 appsrc, err := pipeline.GetElementByName("appsrc") 336 if err != nil { 337 return err 338 } 339 videoappsink, err := pipeline.GetElementByName("videoappsink") 340 if err != nil { 341 return err 342 } 343 audioappsink, err := pipeline.GetElementByName("audioappsink") 344 if err != nil { 345 return err 346 } 347 348 source := app.SrcFromElement(appsrc) 349 videoSink := app.SinkFromElement(videoappsink) 350 audioSink := app.SinkFromElement(audioappsink) 351 352 // Set up source callbacks 353 source.SetCallbacks(&app.SourceCallbacks{ 354 NeedDataFunc: ReaderNeedDataIncremental(ctx, input), 355 EnoughDataFunc: func(self *app.Source) { 356 // Nothing to do here 357 }, 358 SeekDataFunc: func(self *app.Source, offset uint64) bool { 359 return false // We don't support seeking 360 }, 361 }) 362 363 // Set up sink callbacks 364 videoSink.SetCallbacks(&app.SinkCallbacks{ 365 NewSampleFunc: WriterNewSample(ctx, videoOutput), 366 NewPrerollFunc: func(self *app.Sink) gst.FlowReturn { 367 return gst.FlowOK 368 }, 369 }) 370 371 // Set up sink callbacks 372 audioSink.SetCallbacks(&app.SinkCallbacks{ 373 NewSampleFunc: WriterNewSample(ctx, audioOutput), 374 NewPrerollFunc: func(self *app.Sink) gst.FlowReturn { 375 return gst.FlowOK 376 }, 377 }) 378 379 ctx, cancel := context.WithCancel(ctx) 380 defer cancel() 381 382 // Handle bus messages in a separate goroutine 383 g, ctx := errgroup.WithContext(ctx) 384 g.Go(func() error { 385 err = HandleBusMessages(ctx, pipeline) 386 cancel() 387 return err 388 }) 389 390 // Start the pipeline 391 err = pipeline.SetState(gst.StatePlaying) 392 if err != nil { 393 return fmt.Errorf("failed to set pipeline state to playing: %w", err) 394 } 395 396 // Wait for the pipeline to finish or context to be canceled 397 <-ctx.Done() 398 399 // Clean up 400 err = pipeline.SetState(gst.StateNull) 401 if err != nil { 402 return fmt.Errorf("failed to set pipeline state to null: %w", err) 403 } 404 405 return g.Wait() 406} 407 408// Joins video and audio back together from MPEG-TS and MP4 (from transcoding) 409func MPEGTSVideoMP4AudioToMP4(ctx context.Context, videoInput io.Reader, audioInput io.Reader, output io.Writer) error { 410 pipelineStr := strings.Join([]string{ 411 "appsrc name=videoappsrc ! tsdemux name=videodemux", 412 "appsrc name=audioappsrc ! qtdemux name=audiodemux", 413 "mp4mux name=mux ! appsink name=appsink sync=false", 414 "h264parse name=videoparse ! video/x-h264,stream-format=avc ! queue name=videoqueue", 415 "audiodemux.audio_0 ! opusparse ! queue name=audioqueue", 416 }, " ") 417 418 pipeline, err := gst.NewPipelineFromString(pipelineStr) 419 if err != nil { 420 return err 421 } 422 423 mux, err := pipeline.GetElementByName("mux") 424 if err != nil { 425 return err 426 } 427 muxVideoSinkPad := mux.GetRequestPad("video_%u") 428 if muxVideoSinkPad == nil { 429 return fmt.Errorf("failed to get video sink pad") 430 } 431 muxAudioSinkPad := mux.GetRequestPad("audio_%u") 432 if muxAudioSinkPad == nil { 433 return fmt.Errorf("failed to get audio sink pad") 434 } 435 436 videoQueue, err := pipeline.GetElementByName("videoqueue") 437 if err != nil { 438 return err 439 } 440 audioQueue, err := pipeline.GetElementByName("audioqueue") 441 if err != nil { 442 return err 443 } 444 445 videoQueueSrcPad := videoQueue.GetStaticPad("src") 446 if videoQueueSrcPad == nil { 447 return fmt.Errorf("failed to get video queue source pad") 448 } 449 audioQueueSrcPad := audioQueue.GetStaticPad("src") 450 if audioQueueSrcPad == nil { 451 return fmt.Errorf("failed to get audio queue source pad") 452 } 453 454 ok := videoQueueSrcPad.Link(muxVideoSinkPad) 455 if ok != gst.PadLinkOK { 456 return fmt.Errorf("failed to link video queue source pad to mux video sink pad: %v", ok) 457 } 458 ok = audioQueueSrcPad.Link(muxAudioSinkPad) 459 if ok != gst.PadLinkOK { 460 return fmt.Errorf("failed to link audio queue source pad to mux audio sink pad: %v", ok) 461 } 462 463 videodemux, err := pipeline.GetElementByName("videodemux") 464 if err != nil { 465 return err 466 } 467 videoparse, err := pipeline.GetElementByName("videoparse") 468 if err != nil { 469 return err 470 } 471 videoParseSinkPad := videoparse.GetStaticPad("sink") 472 if videoParseSinkPad == nil { 473 return fmt.Errorf("failed to get video parse sink pad") 474 } 475 476 // Get elements 477 videoappsrc, err := pipeline.GetElementByName("videoappsrc") 478 if err != nil { 479 return err 480 } 481 audioappsrc, err := pipeline.GetElementByName("audioappsrc") 482 if err != nil { 483 return err 484 } 485 appsink, err := pipeline.GetElementByName("appsink") 486 if err != nil { 487 return err 488 } 489 490 videoSource := app.SrcFromElement(videoappsrc) 491 audioSource := app.SrcFromElement(audioappsrc) 492 sink := app.SinkFromElement(appsink) 493 494 // Set up source callbacks 495 videoSource.SetCallbacks(&app.SourceCallbacks{ 496 NeedDataFunc: ReaderNeedDataIncremental(ctx, videoInput), 497 EnoughDataFunc: func(self *app.Source) { 498 // Nothing to do here 499 }, 500 SeekDataFunc: func(self *app.Source, offset uint64) bool { 501 return false // We don't support seeking 502 }, 503 }) 504 505 audioSource.SetCallbacks(&app.SourceCallbacks{ 506 NeedDataFunc: ReaderNeedDataIncremental(ctx, audioInput), 507 EnoughDataFunc: func(self *app.Source) { 508 // Nothing to do here 509 }, 510 SeekDataFunc: func(self *app.Source, offset uint64) bool { 511 return false // We don't support seeking 512 }, 513 }) 514 515 // Set up sink callbacks 516 sink.SetCallbacks(&app.SinkCallbacks{ 517 NewSampleFunc: WriterNewSample(ctx, output), 518 NewPrerollFunc: func(self *app.Sink) gst.FlowReturn { 519 return gst.FlowOK 520 }, 521 }) 522 523 ctx, cancel := context.WithCancel(ctx) 524 defer cancel() 525 526 onPadAdded := func(element *gst.Element, pad *gst.Pad) { 527 if pad.GetDirection() == gst.PadDirectionSource { 528 ok := pad.Link(videoParseSinkPad) 529 defer func() { videoParseSinkPad = nil }() 530 if ok != gst.PadLinkOK { 531 log.Error(ctx, "failed to link video parse sink pad to video demux pad", "error", ok) 532 cancel() 533 } 534 } 535 } 536 videodemux.Connect("pad-added", onPadAdded) 537 538 // Handle bus messages in a separate goroutine 539 g, ctx := errgroup.WithContext(ctx) 540 g.Go(func() error { 541 err = HandleBusMessages(ctx, pipeline) 542 cancel() 543 return err 544 }) 545 546 // Start the pipeline 547 err = pipeline.SetState(gst.StatePlaying) 548 if err != nil { 549 return fmt.Errorf("failed to set pipeline state to playing: %w", err) 550 } 551 552 // Wait for the pipeline to finish or context to be canceled 553 <-ctx.Done() 554 555 // Clean up 556 err = pipeline.SetState(gst.StateNull) 557 if err != nil { 558 return fmt.Errorf("failed to set pipeline state to null: %w", err) 559 } 560 561 return g.Wait() 562}