Live video on the AT Protocol
at eli/postgres 191 lines 5.0 kB view raw
1package media 2 3import ( 4 "context" 5 "fmt" 6 "strings" 7 "time" 8 9 "github.com/go-gst/go-gst/gst" 10 "github.com/go-gst/go-gst/gst/app" 11 "stream.place/streamplace/pkg/bus" 12 "stream.place/streamplace/pkg/log" 13) 14 15// take in a segment and return a bunch of packets suitable for webrtc 16func Packetize(ctx context.Context, seg *bus.Seg) (*bus.PacketizedSegment, error) { 17 18 pipelineSlice := []string{ 19 "h264parse name=videoparse ! video/x-h264,stream-format=byte-stream ! appsink sync=false name=videoappsink", 20 "opusparse name=audioparse ! appsink sync=false name=audioappsink", 21 } 22 23 pipeline, err := gst.NewPipelineFromString(strings.Join(pipelineSlice, "\n")) 24 if err != nil { 25 return nil, fmt.Errorf("failed to create GStreamer pipeline: %w", err) //nolint:all 26 } 27 28 demuxBin, err := ConcatDemuxBin(ctx, seg) 29 if err != nil { 30 return nil, fmt.Errorf("failed to create concat bin: %w", err) 31 } 32 33 err = pipeline.Add(demuxBin.Element) 34 if err != nil { 35 return nil, fmt.Errorf("failed to add demux bin to bin: %w", err) 36 } 37 38 demuxBinPadVideoSrc := demuxBin.GetStaticPad("video_0") 39 if demuxBinPadVideoSrc == nil { 40 return nil, fmt.Errorf("failed to get demux bin video src pad") 41 } 42 43 demuxBinPadAudioSrc := demuxBin.GetStaticPad("audio_0") 44 if demuxBinPadAudioSrc == nil { 45 return nil, fmt.Errorf("failed to get demux bin audio src pad") 46 } 47 48 videoParse, err := pipeline.GetElementByName("videoparse") 49 if err != nil { 50 return nil, fmt.Errorf("failed to get video parse element: %w", err) 51 } 52 53 audioParse, err := pipeline.GetElementByName("audioparse") 54 if err != nil { 55 return nil, fmt.Errorf("failed to get audio parse element: %w", err) 56 } 57 58 linked := demuxBinPadVideoSrc.Link(videoParse.GetStaticPad("sink")) 59 if linked != gst.PadLinkOK { 60 return nil, fmt.Errorf("failed to link demux bin video src pad to video parse element: %v", linked) 61 } 62 63 linked = demuxBinPadAudioSrc.Link(audioParse.GetStaticPad("sink")) 64 if linked != gst.PadLinkOK { 65 return nil, fmt.Errorf("failed to link demux bin audio src pad to audio parse element: %v", linked) 66 } 67 68 videoSink, err := pipeline.GetElementByName("videoappsink") 69 if err != nil { 70 return nil, fmt.Errorf("failed to get video appsink element: %w", err) 71 } 72 if videoSink == nil { 73 return nil, fmt.Errorf("failed to get video appsink element") 74 } 75 76 audioSink, err := pipeline.GetElementByName("audioappsink") 77 if err != nil { 78 return nil, fmt.Errorf("failed to get audio appsink element: %w", err) 79 } 80 if audioSink == nil { 81 return nil, fmt.Errorf("failed to get audio appsink element") 82 } 83 84 videoOutput := [][]byte{} 85 audioOutput := [][]byte{} 86 // eosCh := make(chan struct{}) 87 88 videoappsink := app.SinkFromElement(videoSink) 89 videoappsink.SetCallbacks(&app.SinkCallbacks{ 90 NewSampleFunc: func(sink *app.Sink) gst.FlowReturn { 91 sample := sink.PullSample() 92 if sample == nil { 93 return gst.FlowEOS 94 } 95 96 buffer := sample.GetBuffer() 97 if buffer == nil { 98 return gst.FlowError 99 } 100 101 samples := buffer.Bytes() 102 103 videoOutput = append(videoOutput, samples) 104 105 // clockTime := buffer.Duration() 106 // dur := clockTime.AsDuration() 107 // if dur != nil { 108 // log.Log(ctx, "video duration", "duration", *dur) 109 // } else { 110 // log.Error(ctx, "no video duration", "samples", len(samples)) 111 // } 112 113 return gst.FlowOK 114 }, 115 EOSFunc: func(sink *app.Sink) { 116 log.Debug(ctx, "videoappsink EOSFunc") 117 }, 118 }) 119 120 segDur := time.Duration(0) 121 122 audioappsink := app.SinkFromElement(audioSink) 123 audioappsink.SetCallbacks(&app.SinkCallbacks{ 124 NewSampleFunc: func(sink *app.Sink) gst.FlowReturn { 125 sample := sink.PullSample() 126 if sample == nil { 127 log.Warn(ctx, "audioappsink NewSampleFunc EOS") 128 return gst.FlowEOS 129 } 130 131 buffer := sample.GetBuffer() 132 if buffer == nil { 133 return gst.FlowError 134 } 135 136 samples := buffer.Bytes() 137 138 audioOutput = append(audioOutput, samples) 139 140 clockTime := buffer.Duration() 141 dur := clockTime.AsDuration() 142 if dur != nil { 143 segDur += *dur 144 } else { 145 log.Error(ctx, "no audio duration", "samples", len(samples)) 146 return gst.FlowError 147 } 148 149 return gst.FlowOK 150 }, 151 EOSFunc: func(sink *app.Sink) { 152 log.Debug(ctx, "audioappsink EOSFunc") 153 }, 154 }) 155 156 busErr := make(chan error) 157 go func() { 158 err := HandleBusMessages(ctx, pipeline) 159 if err != nil { 160 log.Log(ctx, "pipeline error", "error", err) 161 } 162 busErr <- err 163 }() 164 165 err = pipeline.SetState(gst.StatePlaying) 166 if err != nil { 167 return nil, fmt.Errorf("failed to set pipeline to playing state: %w", err) 168 } 169 170 defer func() { 171 err = pipeline.SetState(gst.StateNull) 172 if err != nil { 173 log.Error(ctx, "failed to set pipeline to null state", "error", err) 174 } 175 err = pipeline.Remove(demuxBin.Element) 176 if err != nil { 177 log.Error(ctx, "failed to remove demux bin from bin", "error", err) 178 } 179 }() 180 181 err = <-busErr 182 if err != nil { 183 return nil, fmt.Errorf("pipeline error: %w", err) 184 } 185 186 return &bus.PacketizedSegment{ 187 Video: videoOutput, 188 Audio: audioOutput, 189 Duration: segDur, 190 }, nil 191}