Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

at v0.7.26 563 lines 16 kB view raw
1package media 2 3import ( 4 "context" 5 "fmt" 6 "io" 7 "strings" 8 "time" 9 10 "github.com/go-gst/go-gst/gst" 11 "github.com/go-gst/go-gst/gst/app" 12 "golang.org/x/sync/errgroup" 13 "stream.place/streamplace/pkg/log" 14) 15 16// MP4ToMPEGTS converts an MP4 file with H264 video and Opus audio to an MPEG-TS file 17// It reads from the provided reader and writes the converted MPEG-TS to the writer. 18// The conversion is optimized for speed. 19func MP4ToMPEGTS(ctx context.Context, input io.Reader, output io.Writer) (int64, error) { 20 ctx = log.WithLogValues(ctx, "func", "MP4ToMPEGTS") 21 pipelineStr := strings.Join([]string{ 22 "appsrc name=appsrc ! qtdemux name=demux", 23 "mpegtsmux name=mux ! appsink name=appsink sync=false", 24 "demux.video_0 ! h264parse ! video/x-h264,stream-format=byte-stream ! queue name=videoqueue", 25 "demux.audio_0 ! opusdec name=audioparse ! audioresample ! audiorate ! fdkaacenc name=audioenc ! queue name=audioqueue", 26 }, " ") 27 28 pipeline, err := gst.NewPipelineFromString(pipelineStr) 29 if err != nil { 30 return 0, err 31 } 32 33 mux, err := pipeline.GetElementByName("mux") 34 if err != nil { 35 return 0, err 36 } 37 muxVideoSinkPad := mux.GetRequestPad("sink_%d") 38 if muxVideoSinkPad == nil { 39 return 0, fmt.Errorf("failed to get video sink pad") 40 } 41 muxAudioSinkPad := mux.GetRequestPad("sink_%d") 42 if muxAudioSinkPad == nil { 43 return 0, fmt.Errorf("failed to get audio sink pad") 44 } 45 videoQueue, err := pipeline.GetElementByName("videoqueue") 46 if err != nil { 47 return 0, err 48 } 49 audioQueue, err := pipeline.GetElementByName("audioqueue") 50 if err != nil { 51 return 0, err 52 } 53 videoQueueSrcPad := videoQueue.GetStaticPad("src") 54 if videoQueueSrcPad == nil { 55 return 0, fmt.Errorf("failed to get video queue source pad") 56 } 57 audioQueueSrcPad := audioQueue.GetStaticPad("src") 58 if audioQueueSrcPad == nil { 59 return 0, fmt.Errorf("failed to get audio queue source pad") 60 } 61 62 ok := videoQueueSrcPad.Link(muxVideoSinkPad) 63 if ok != gst.PadLinkOK { 64 return 0, fmt.Errorf("failed to link video queue source pad to mux video sink pad: %v", ok) 65 } 66 ok = audioQueueSrcPad.Link(muxAudioSinkPad) 67 if ok != gst.PadLinkOK { 68 return 0, fmt.Errorf("failed to link audio queue source pad to mux audio sink pad: %v", ok) 69 } 70 71 // Get elements 72 appsrc, err := pipeline.GetElementByName("appsrc") 73 if err != nil { 74 return 0, err 75 } 76 appsink, err := pipeline.GetElementByName("appsink") 77 if err != nil { 78 return 0, err 79 } 80 81 source := app.SrcFromElement(appsrc) 82 sink := app.SinkFromElement(appsink) 83 84 // Set up source callbacks 85 source.SetCallbacks(&app.SourceCallbacks{ 86 NeedDataFunc: ReaderNeedDataIncremental(ctx, input), 87 EnoughDataFunc: func(self *app.Source) { 88 // Nothing to do here 89 }, 90 SeekDataFunc: func(self *app.Source, offset uint64) bool { 91 return false // We don't support seeking 92 }, 93 }) 94 95 // Set up sink callbacks 96 sink.SetCallbacks(&app.SinkCallbacks{ 97 NewSampleFunc: WriterNewSample(ctx, output), 98 NewPrerollFunc: func(self *app.Sink) gst.FlowReturn { 99 return gst.FlowOK 100 }, 101 }) 102 103 ctx, cancel := context.WithCancel(ctx) 104 defer func() { 105 cancel() 106 // Clean up 107 err = pipeline.SetState(gst.StateNull) 108 if err != nil { 109 log.Error(ctx, "failed to set pipeline state to null", "error", err) 110 } 111 }() 112 113 go func() { 114 select { 115 case <-ctx.Done(): 116 return 117 case <-time.After(time.Second * 10): 118 log.Debug(ctx, "pipeline is taking too long to start, cancelling") 119 err := fmt.Errorf("pipeline is taking too long to start, cancelling") 120 pipeline.Error(err.Error(), err) 121 } 122 }() 123 124 // Handle bus messages in a separate goroutine 125 errCh := make(chan error) 126 go func() { 127 err := HandleBusMessages(ctx, pipeline) 128 cancel() 129 errCh <- err 130 close(errCh) 131 }() 132 133 // Start the pipeline 134 err = pipeline.SetState(gst.StatePlaying) 135 if err != nil { 136 return 0, fmt.Errorf("failed to set pipeline state to playing: %w", err) 137 } 138 139 var durOk bool 140 var dur int64 141 busErr := <-errCh 142 143 if busErr == nil { 144 durOk, dur = pipeline.QueryDuration(gst.FormatTime) 145 if !durOk { 146 return 0, fmt.Errorf("failed to query duration") 147 } 148 } 149 150 return dur, busErr 151} 152 153// MPEGTSToMP4 converts an MPEG-TS file with H264 video and Opus audio to an MP4 file. 154// It reads from the provided reader and writes the converted MP4 to the writer. 155func MPEGTSToMP4(ctx context.Context, input io.Reader, output io.Writer) error { 156 ctx = log.WithLogValues(ctx, "func", "MPEGTSToMP4") 157 pipelineStr := strings.Join([]string{ 158 "appsrc name=appsrc ! tsdemux name=demux", 159 "mp4mux name=mux ! appsink sync=false name=appsink", 160 "demux.video_0_0100 ! h264parse ! video/x-h264,stream-format=avc ! queue name=videoqueue", 161 "demux.audio_0_0101 ! opusdec ! opusenc ! queue name=audioqueue", 162 }, " ") 163 164 pipeline, err := gst.NewPipelineFromString(pipelineStr) 165 if err != nil { 166 return err 167 } 168 169 mux, err := pipeline.GetElementByName("mux") 170 if err != nil { 171 return err 172 } 173 muxVideoSinkPad := mux.GetRequestPad("video_%u") 174 if muxVideoSinkPad == nil { 175 return fmt.Errorf("failed to get video sink pad") 176 } 177 muxAudioSinkPad := mux.GetRequestPad("audio_%u") 178 if muxAudioSinkPad == nil { 179 return fmt.Errorf("failed to get audio sink pad") 180 } 181 videoQueue, err := pipeline.GetElementByName("videoqueue") 182 if err != nil { 183 return err 184 } 185 audioQueue, err := pipeline.GetElementByName("audioqueue") 186 if err != nil { 187 return err 188 } 189 videoQueueSrcPad := videoQueue.GetStaticPad("src") 190 if videoQueueSrcPad == nil { 191 return fmt.Errorf("failed to get video queue source pad") 192 } 193 audioQueueSrcPad := audioQueue.GetStaticPad("src") 194 if audioQueueSrcPad == nil { 195 return fmt.Errorf("failed to get audio queue source pad") 196 } 197 198 ok := videoQueueSrcPad.Link(muxVideoSinkPad) 199 if ok != gst.PadLinkOK { 200 return fmt.Errorf("failed to link video queue source pad to mux video sink pad: %v", ok) 201 } 202 ok = audioQueueSrcPad.Link(muxAudioSinkPad) 203 if ok != gst.PadLinkOK { 204 return fmt.Errorf("failed to link audio queue source pad to mux audio sink pad: %v", ok) 205 } 206 207 // Get elements 208 appsrc, err := pipeline.GetElementByName("appsrc") 209 if err != nil { 210 return err 211 } 212 appsink, err := pipeline.GetElementByName("appsink") 213 if err != nil { 214 return err 215 } 216 217 source := app.SrcFromElement(appsrc) 218 sink := app.SinkFromElement(appsink) 219 220 // Set up source callbacks 221 source.SetCallbacks(&app.SourceCallbacks{ 222 NeedDataFunc: ReaderNeedDataIncremental(ctx, input), 223 EnoughDataFunc: func(self *app.Source) { 224 // Nothing to do here 225 }, 226 SeekDataFunc: func(self *app.Source, offset uint64) bool { 227 return false // We don't support seeking 228 }, 229 }) 230 231 // Set up sink callbacks 232 sink.SetCallbacks(&app.SinkCallbacks{ 233 NewSampleFunc: WriterNewSample(ctx, output), 234 NewPrerollFunc: func(self *app.Sink) gst.FlowReturn { 235 return gst.FlowOK 236 }, 237 }) 238 239 ctx, cancel := context.WithCancel(ctx) 240 defer cancel() 241 242 // Handle bus messages in a separate goroutine 243 g, ctx := errgroup.WithContext(ctx) 244 g.Go(func() error { 245 if err := HandleBusMessages(ctx, pipeline); err != nil { 246 log.Log(ctx, "pipeline error", "error", err) 247 } 248 cancel() 249 return nil 250 }) 251 252 // Start the pipeline 253 err = pipeline.SetState(gst.StatePlaying) 254 if err != nil { 255 return fmt.Errorf("failed to set pipeline state to playing: %w", err) 256 } 257 258 // Wait for the pipeline to finish or context to be canceled 259 <-ctx.Done() 260 261 // durOk, dur := pipeline.QueryDuration(gst.FormatTime) 262 // if !durOk { 263 // return fmt.Errorf("failed to query duration") 264 // } 265 266 // Clean up 267 err = pipeline.SetState(gst.StateNull) 268 if err != nil { 269 return fmt.Errorf("failed to set pipeline state to null: %w", err) 270 } 271 272 return nil 273} 274 275// Splits out video into MPEG-TS and audio into MP4 (to be recombined after transcoding) 276func MP4ToMPEGTSVideoMP4Audio(ctx context.Context, input io.Reader, videoOutput io.Writer, audioOutput io.Writer) error { 277 ctx = log.WithLogValues(ctx, "func", "MP4ToMPEGTSVideoMP4Audio") 278 pipelineStr := strings.Join([]string{ 279 "appsrc name=appsrc ! qtdemux name=demux", 280 "mpegtsmux name=videomux ! appsink name=videoappsink sync=false", 281 "mp4mux name=audiomux ! appsink name=audioappsink sync=false", 282 "demux.video_0 ! h264parse ! video/x-h264,stream-format=byte-stream ! queue name=videoqueue", 283 "demux.audio_0 ! opusparse ! queue name=audioqueue", 284 }, " ") 285 286 pipeline, err := gst.NewPipelineFromString(pipelineStr) 287 if err != nil { 288 return err 289 } 290 291 videomux, err := pipeline.GetElementByName("videomux") 292 if err != nil { 293 return err 294 } 295 muxVideoSinkPad := videomux.GetRequestPad("sink_%d") 296 if muxVideoSinkPad == nil { 297 return fmt.Errorf("failed to get video sink pad") 298 } 299 300 audiomux, err := pipeline.GetElementByName("audiomux") 301 if err != nil { 302 return err 303 } 304 muxAudioSinkPad := audiomux.GetRequestPad("audio_%u") 305 if muxAudioSinkPad == nil { 306 return fmt.Errorf("failed to get audio sink pad") 307 } 308 309 videoQueue, err := pipeline.GetElementByName("videoqueue") 310 if err != nil { 311 return err 312 } 313 audioQueue, err := pipeline.GetElementByName("audioqueue") 314 if err != nil { 315 return err 316 } 317 318 videoQueueSrcPad := videoQueue.GetStaticPad("src") 319 if videoQueueSrcPad == nil { 320 return fmt.Errorf("failed to get video queue source pad") 321 } 322 audioQueueSrcPad := audioQueue.GetStaticPad("src") 323 if audioQueueSrcPad == nil { 324 return fmt.Errorf("failed to get audio queue source pad") 325 } 326 327 ok := videoQueueSrcPad.Link(muxVideoSinkPad) 328 if ok != gst.PadLinkOK { 329 return fmt.Errorf("failed to link video queue source pad to mux video sink pad: %v", ok) 330 } 331 ok = audioQueueSrcPad.Link(muxAudioSinkPad) 332 if ok != gst.PadLinkOK { 333 return fmt.Errorf("failed to link audio queue source pad to mux audio sink pad: %v", ok) 334 } 335 336 // Get elements 337 appsrc, err := pipeline.GetElementByName("appsrc") 338 if err != nil { 339 return err 340 } 341 videoappsink, err := pipeline.GetElementByName("videoappsink") 342 if err != nil { 343 return err 344 } 345 audioappsink, err := pipeline.GetElementByName("audioappsink") 346 if err != nil { 347 return err 348 } 349 350 source := app.SrcFromElement(appsrc) 351 videoSink := app.SinkFromElement(videoappsink) 352 audioSink := app.SinkFromElement(audioappsink) 353 354 // Set up source callbacks 355 source.SetCallbacks(&app.SourceCallbacks{ 356 NeedDataFunc: ReaderNeedDataIncremental(ctx, input), 357 EnoughDataFunc: func(self *app.Source) { 358 // Nothing to do here 359 }, 360 SeekDataFunc: func(self *app.Source, offset uint64) bool { 361 return false // We don't support seeking 362 }, 363 }) 364 365 // Set up sink callbacks 366 videoSink.SetCallbacks(&app.SinkCallbacks{ 367 NewSampleFunc: WriterNewSample(ctx, videoOutput), 368 NewPrerollFunc: func(self *app.Sink) gst.FlowReturn { 369 return gst.FlowOK 370 }, 371 }) 372 373 // Set up sink callbacks 374 audioSink.SetCallbacks(&app.SinkCallbacks{ 375 NewSampleFunc: WriterNewSample(ctx, audioOutput), 376 NewPrerollFunc: func(self *app.Sink) gst.FlowReturn { 377 return gst.FlowOK 378 }, 379 }) 380 381 ctx, cancel := context.WithCancel(ctx) 382 defer cancel() 383 384 // Handle bus messages in a separate goroutine 385 g, ctx := errgroup.WithContext(ctx) 386 g.Go(func() error { 387 err = HandleBusMessages(ctx, pipeline) 388 cancel() 389 return err 390 }) 391 392 // Start the pipeline 393 err = pipeline.SetState(gst.StatePlaying) 394 if err != nil { 395 return fmt.Errorf("failed to set pipeline state to playing: %w", err) 396 } 397 398 // Wait for the pipeline to finish or context to be canceled 399 <-ctx.Done() 400 401 // Clean up 402 err = pipeline.SetState(gst.StateNull) 403 if err != nil { 404 return fmt.Errorf("failed to set pipeline state to null: %w", err) 405 } 406 407 return g.Wait() 408} 409 410// Joins video and audio back together from MPEG-TS and MP4 (from transcoding) 411func MPEGTSVideoMP4AudioToMP4(ctx context.Context, videoInput io.Reader, audioInput io.Reader, output io.Writer) error { 412 pipelineStr := strings.Join([]string{ 413 "appsrc name=videoappsrc ! tsdemux name=videodemux", 414 "appsrc name=audioappsrc ! qtdemux name=audiodemux", 415 "mp4mux name=mux ! appsink name=appsink sync=false", 416 "h264parse name=videoparse ! video/x-h264,stream-format=avc ! queue name=videoqueue", 417 "audiodemux.audio_0 ! opusparse ! queue name=audioqueue", 418 }, " ") 419 420 pipeline, err := gst.NewPipelineFromString(pipelineStr) 421 if err != nil { 422 return err 423 } 424 425 mux, err := pipeline.GetElementByName("mux") 426 if err != nil { 427 return err 428 } 429 muxVideoSinkPad := mux.GetRequestPad("video_%u") 430 if muxVideoSinkPad == nil { 431 return fmt.Errorf("failed to get video sink pad") 432 } 433 muxAudioSinkPad := mux.GetRequestPad("audio_%u") 434 if muxAudioSinkPad == nil { 435 return fmt.Errorf("failed to get audio sink pad") 436 } 437 438 videoQueue, err := pipeline.GetElementByName("videoqueue") 439 if err != nil { 440 return err 441 } 442 audioQueue, err := pipeline.GetElementByName("audioqueue") 443 if err != nil { 444 return err 445 } 446 447 videoQueueSrcPad := videoQueue.GetStaticPad("src") 448 if videoQueueSrcPad == nil { 449 return fmt.Errorf("failed to get video queue source pad") 450 } 451 audioQueueSrcPad := audioQueue.GetStaticPad("src") 452 if audioQueueSrcPad == nil { 453 return fmt.Errorf("failed to get audio queue source pad") 454 } 455 456 ok := videoQueueSrcPad.Link(muxVideoSinkPad) 457 if ok != gst.PadLinkOK { 458 return fmt.Errorf("failed to link video queue source pad to mux video sink pad: %v", ok) 459 } 460 ok = audioQueueSrcPad.Link(muxAudioSinkPad) 461 if ok != gst.PadLinkOK { 462 return fmt.Errorf("failed to link audio queue source pad to mux audio sink pad: %v", ok) 463 } 464 465 videodemux, err := pipeline.GetElementByName("videodemux") 466 if err != nil { 467 return err 468 } 469 videoparse, err := pipeline.GetElementByName("videoparse") 470 if err != nil { 471 return err 472 } 473 videoParseSinkPad := videoparse.GetStaticPad("sink") 474 if videoParseSinkPad == nil { 475 return fmt.Errorf("failed to get video parse sink pad") 476 } 477 478 // Get elements 479 videoappsrc, err := pipeline.GetElementByName("videoappsrc") 480 if err != nil { 481 return err 482 } 483 audioappsrc, err := pipeline.GetElementByName("audioappsrc") 484 if err != nil { 485 return err 486 } 487 appsink, err := pipeline.GetElementByName("appsink") 488 if err != nil { 489 return err 490 } 491 492 videoSource := app.SrcFromElement(videoappsrc) 493 audioSource := app.SrcFromElement(audioappsrc) 494 sink := app.SinkFromElement(appsink) 495 496 // Set up source callbacks 497 videoSource.SetCallbacks(&app.SourceCallbacks{ 498 NeedDataFunc: ReaderNeedDataIncremental(ctx, videoInput), 499 EnoughDataFunc: func(self *app.Source) { 500 // Nothing to do here 501 }, 502 SeekDataFunc: func(self *app.Source, offset uint64) bool { 503 return false // We don't support seeking 504 }, 505 }) 506 507 audioSource.SetCallbacks(&app.SourceCallbacks{ 508 NeedDataFunc: ReaderNeedDataIncremental(ctx, audioInput), 509 EnoughDataFunc: func(self *app.Source) { 510 // Nothing to do here 511 }, 512 SeekDataFunc: func(self *app.Source, offset uint64) bool { 513 return false // We don't support seeking 514 }, 515 }) 516 517 // Set up sink callbacks 518 sink.SetCallbacks(&app.SinkCallbacks{ 519 NewSampleFunc: WriterNewSample(ctx, output), 520 NewPrerollFunc: func(self *app.Sink) gst.FlowReturn { 521 return gst.FlowOK 522 }, 523 }) 524 525 ctx, cancel := context.WithCancel(ctx) 526 defer cancel() 527 528 onPadAdded := func(element *gst.Element, pad *gst.Pad) { 529 if pad.GetDirection() == gst.PadDirectionSource { 530 ok := pad.Link(videoParseSinkPad) 531 if ok != gst.PadLinkOK { 532 log.Error(ctx, "failed to link video parse sink pad to video demux pad", "error", ok) 533 cancel() 534 } 535 } 536 } 537 if _, err := videodemux.Connect("pad-added", onPadAdded); err != nil { 538 return fmt.Errorf("failed connect pad-added handler: %w", err) 539 } 540 541 errCh := make(chan error) 542 go func() { 543 err = HandleBusMessages(ctx, pipeline) 544 cancel() 545 errCh <- err 546 }() 547 548 // Start the pipeline 549 err = pipeline.SetState(gst.StatePlaying) 550 if err != nil { 551 return fmt.Errorf("failed to set pipeline state to playing: %w", err) 552 } 553 554 defer func() { 555 err = pipeline.SetState(gst.StateNull) 556 if err != nil { 557 log.Error(ctx, "failed to set pipeline state to null", "error", err) 558 } 559 videoParseSinkPad = nil 560 }() 561 562 return <-errCh 563}