Live video on the AT Protocol
at eli/docker-deployment-docs 194 lines 5.1 kB view raw
1package media 2 3import ( 4 "context" 5 "fmt" 6 "strings" 7 "time" 8 9 "github.com/go-gst/go-gst/gst" 10 "github.com/go-gst/go-gst/gst/app" 11 "stream.place/streamplace/pkg/bus" 12 "stream.place/streamplace/pkg/log" 13) 14 15// take in a segment and return a bunch of packets suitable for webrtc 16func Packetize(ctx context.Context, seg *bus.Seg) (*bus.PacketizedSegment, error) { 17 18 ctx, cancel := context.WithTimeout(ctx, 10*time.Second) 19 defer cancel() 20 21 pipelineSlice := []string{ 22 "h264parse name=videoparse ! video/x-h264,stream-format=byte-stream ! appsink sync=false name=videoappsink", 23 "opusparse name=audioparse ! appsink sync=false name=audioappsink", 24 } 25 26 pipeline, err := gst.NewPipelineFromString(strings.Join(pipelineSlice, "\n")) 27 if err != nil { 28 return nil, fmt.Errorf("failed to create GStreamer pipeline: %w", err) //nolint:all 29 } 30 31 demuxBin, err := ConcatDemuxBin(ctx, seg) 32 if err != nil { 33 return nil, fmt.Errorf("failed to create concat bin: %w", err) 34 } 35 36 err = pipeline.Add(demuxBin.Element) 37 if err != nil { 38 return nil, fmt.Errorf("failed to add demux bin to bin: %w", err) 39 } 40 41 demuxBinPadVideoSrc := demuxBin.GetStaticPad("video_0") 42 if demuxBinPadVideoSrc == nil { 43 return nil, fmt.Errorf("failed to get demux bin video src pad") 44 } 45 46 demuxBinPadAudioSrc := demuxBin.GetStaticPad("audio_0") 47 if demuxBinPadAudioSrc == nil { 48 return nil, fmt.Errorf("failed to get demux bin audio src pad") 49 } 50 51 videoParse, err := pipeline.GetElementByName("videoparse") 52 if err != nil { 53 return nil, fmt.Errorf("failed to get video parse element: %w", err) 54 } 55 56 audioParse, err := pipeline.GetElementByName("audioparse") 57 if err != nil { 58 return nil, fmt.Errorf("failed to get audio parse element: %w", err) 59 } 60 61 linked := demuxBinPadVideoSrc.Link(videoParse.GetStaticPad("sink")) 62 if linked != gst.PadLinkOK { 63 return nil, fmt.Errorf("failed to link demux bin video src pad to video parse element: %v", linked) 64 } 65 66 linked = demuxBinPadAudioSrc.Link(audioParse.GetStaticPad("sink")) 67 if linked != gst.PadLinkOK { 68 return nil, fmt.Errorf("failed to link demux bin audio src pad to audio parse element: %v", linked) 69 } 70 71 videoSink, err := pipeline.GetElementByName("videoappsink") 72 if err != nil { 73 return nil, fmt.Errorf("failed to get video appsink element: %w", err) 74 } 75 if videoSink == nil { 76 return nil, fmt.Errorf("failed to get video appsink element") 77 } 78 79 audioSink, err := pipeline.GetElementByName("audioappsink") 80 if err != nil { 81 return nil, fmt.Errorf("failed to get audio appsink element: %w", err) 82 } 83 if audioSink == nil { 84 return nil, fmt.Errorf("failed to get audio appsink element") 85 } 86 87 videoOutput := [][]byte{} 88 audioOutput := [][]byte{} 89 // eosCh := make(chan struct{}) 90 91 videoappsink := app.SinkFromElement(videoSink) 92 videoappsink.SetCallbacks(&app.SinkCallbacks{ 93 NewSampleFunc: func(sink *app.Sink) gst.FlowReturn { 94 sample := sink.PullSample() 95 if sample == nil { 96 return gst.FlowEOS 97 } 98 99 buffer := sample.GetBuffer() 100 if buffer == nil { 101 return gst.FlowError 102 } 103 104 samples := buffer.Bytes() 105 106 videoOutput = append(videoOutput, samples) 107 108 // clockTime := buffer.Duration() 109 // dur := clockTime.AsDuration() 110 // if dur != nil { 111 // log.Log(ctx, "video duration", "duration", *dur) 112 // } else { 113 // log.Error(ctx, "no video duration", "samples", len(samples)) 114 // } 115 116 return gst.FlowOK 117 }, 118 EOSFunc: func(sink *app.Sink) { 119 log.Debug(ctx, "videoappsink EOSFunc") 120 }, 121 }) 122 123 segDur := time.Duration(0) 124 125 audioappsink := app.SinkFromElement(audioSink) 126 audioappsink.SetCallbacks(&app.SinkCallbacks{ 127 NewSampleFunc: func(sink *app.Sink) gst.FlowReturn { 128 sample := sink.PullSample() 129 if sample == nil { 130 log.Warn(ctx, "audioappsink NewSampleFunc EOS") 131 return gst.FlowEOS 132 } 133 134 buffer := sample.GetBuffer() 135 if buffer == nil { 136 return gst.FlowError 137 } 138 139 samples := buffer.Bytes() 140 141 audioOutput = append(audioOutput, samples) 142 143 clockTime := buffer.Duration() 144 dur := clockTime.AsDuration() 145 if dur != nil { 146 segDur += *dur 147 } else { 148 log.Error(ctx, "no audio duration", "samples", len(samples)) 149 return gst.FlowError 150 } 151 152 return gst.FlowOK 153 }, 154 EOSFunc: func(sink *app.Sink) { 155 log.Debug(ctx, "audioappsink EOSFunc") 156 }, 157 }) 158 159 busErr := make(chan error) 160 go func() { 161 err := HandleBusMessages(ctx, pipeline) 162 if err != nil { 163 log.Log(ctx, "pipeline error", "error", err) 164 } 165 busErr <- err 166 }() 167 168 err = pipeline.SetState(gst.StatePlaying) 169 if err != nil { 170 return nil, fmt.Errorf("failed to set pipeline to playing state: %w", err) 171 } 172 173 defer func() { 174 err = pipeline.SetState(gst.StateNull) 175 if err != nil { 176 log.Error(ctx, "failed to set pipeline to null state", "error", err) 177 } 178 err = pipeline.Remove(demuxBin.Element) 179 if err != nil { 180 log.Error(ctx, "failed to remove demux bin from bin", "error", err) 181 } 182 }() 183 184 err = <-busErr 185 if err != nil { 186 return nil, fmt.Errorf("packetize pipeline error filename=%s, error=%w", seg.Filepath, err) 187 } 188 189 return &bus.PacketizedSegment{ 190 Video: videoOutput, 191 Audio: audioOutput, 192 Duration: segDur, 193 }, nil 194}