Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

at v0.8.4 194 lines 5.1 kB view raw
1package media 2 3import ( 4 "context" 5 "fmt" 6 "strings" 7 "time" 8 9 "github.com/go-gst/go-gst/gst" 10 "github.com/go-gst/go-gst/gst/app" 11 "stream.place/streamplace/pkg/bus" 12 "stream.place/streamplace/pkg/log" 13) 14 15// take in a segment and return a bunch of packets suitable for webrtc 16func Packetize(ctx context.Context, seg *bus.Seg) (*bus.PacketizedSegment, error) { 17 18 ctx, cancel := context.WithTimeout(ctx, 10*time.Second) 19 defer cancel() 20 21 pipelineSlice := []string{ 22 "h264parse name=videoparse ! video/x-h264,stream-format=byte-stream ! appsink sync=false name=videoappsink", 23 "opusparse name=audioparse ! appsink sync=false name=audioappsink", 24 } 25 26 pipeline, err := gst.NewPipelineFromString(strings.Join(pipelineSlice, "\n")) 27 if err != nil { 28 return nil, fmt.Errorf("failed to create GStreamer pipeline: %w", err) //nolint:all 29 } 30 31 demuxBin, err := ConcatDemuxBin(ctx, seg) 32 if err != nil { 33 return nil, fmt.Errorf("failed to create concat bin: %w", err) 34 } 35 36 err = pipeline.Add(demuxBin.Element) 37 if err != nil { 38 return nil, fmt.Errorf("failed to add demux bin to bin: %w", err) 39 } 40 41 demuxBinPadVideoSrc := demuxBin.GetStaticPad("video_0") 42 if demuxBinPadVideoSrc == nil { 43 return nil, fmt.Errorf("failed to get demux bin video src pad") 44 } 45 46 demuxBinPadAudioSrc := demuxBin.GetStaticPad("audio_0") 47 if demuxBinPadAudioSrc == nil { 48 return nil, fmt.Errorf("failed to get demux bin audio src pad") 49 } 50 51 videoParse, err := pipeline.GetElementByName("videoparse") 52 if err != nil { 53 return nil, fmt.Errorf("failed to get video parse element: %w", err) 54 } 55 56 audioParse, err := pipeline.GetElementByName("audioparse") 57 if err != nil { 58 return nil, fmt.Errorf("failed to get audio parse element: %w", err) 59 } 60 61 linked := demuxBinPadVideoSrc.Link(videoParse.GetStaticPad("sink")) 62 if linked != gst.PadLinkOK { 63 return nil, fmt.Errorf("failed to link demux bin video src pad to video parse element: %v", linked) 64 } 65 66 linked = demuxBinPadAudioSrc.Link(audioParse.GetStaticPad("sink")) 67 if linked != gst.PadLinkOK { 68 return nil, fmt.Errorf("failed to link demux bin audio src pad to audio parse element: %v", linked) 69 } 70 71 videoSink, err := pipeline.GetElementByName("videoappsink") 72 if err != nil { 73 return nil, fmt.Errorf("failed to get video appsink element: %w", err) 74 } 75 if videoSink == nil { 76 return nil, fmt.Errorf("failed to get video appsink element") 77 } 78 79 audioSink, err := pipeline.GetElementByName("audioappsink") 80 if err != nil { 81 return nil, fmt.Errorf("failed to get audio appsink element: %w", err) 82 } 83 if audioSink == nil { 84 return nil, fmt.Errorf("failed to get audio appsink element") 85 } 86 87 videoOutput := [][]byte{} 88 audioOutput := [][]byte{} 89 // eosCh := make(chan struct{}) 90 91 videoappsink := app.SinkFromElement(videoSink) 92 videoappsink.SetCallbacks(&app.SinkCallbacks{ 93 NewSampleFunc: func(sink *app.Sink) gst.FlowReturn { 94 sample := sink.PullSample() 95 if sample == nil { 96 return gst.FlowEOS 97 } 98 99 buffer := sample.GetBuffer() 100 if buffer == nil { 101 return gst.FlowError 102 } 103 104 samples := buffer.Bytes() 105 106 videoOutput = append(videoOutput, samples) 107 108 // clockTime := buffer.Duration() 109 // dur := clockTime.AsDuration() 110 // if dur != nil { 111 // log.Log(ctx, "video duration", "duration", *dur) 112 // } else { 113 // log.Error(ctx, "no video duration", "samples", len(samples)) 114 // } 115 116 return gst.FlowOK 117 }, 118 EOSFunc: func(sink *app.Sink) { 119 log.Debug(ctx, "videoappsink EOSFunc") 120 }, 121 }) 122 123 segDur := time.Duration(0) 124 125 audioappsink := app.SinkFromElement(audioSink) 126 audioappsink.SetCallbacks(&app.SinkCallbacks{ 127 NewSampleFunc: func(sink *app.Sink) gst.FlowReturn { 128 sample := sink.PullSample() 129 if sample == nil { 130 log.Warn(ctx, "audioappsink NewSampleFunc EOS") 131 return gst.FlowEOS 132 } 133 134 buffer := sample.GetBuffer() 135 if buffer == nil { 136 return gst.FlowError 137 } 138 139 samples := buffer.Bytes() 140 141 audioOutput = append(audioOutput, samples) 142 143 clockTime := buffer.Duration() 144 dur := clockTime.AsDuration() 145 if dur != nil { 146 segDur += *dur 147 } else { 148 log.Error(ctx, "no audio duration", "samples", len(samples)) 149 return gst.FlowError 150 } 151 152 return gst.FlowOK 153 }, 154 EOSFunc: func(sink *app.Sink) { 155 log.Debug(ctx, "audioappsink EOSFunc") 156 }, 157 }) 158 159 busErr := make(chan error) 160 go func() { 161 err := HandleBusMessages(ctx, pipeline) 162 if err != nil { 163 log.Log(ctx, "pipeline error", "error", err) 164 } 165 busErr <- err 166 }() 167 168 err = pipeline.SetState(gst.StatePlaying) 169 if err != nil { 170 return nil, fmt.Errorf("failed to set pipeline to playing state: %w", err) 171 } 172 173 defer func() { 174 err = pipeline.SetState(gst.StateNull) 175 if err != nil { 176 log.Error(ctx, "failed to set pipeline to null state", "error", err) 177 } 178 err = pipeline.Remove(demuxBin.Element) 179 if err != nil { 180 log.Error(ctx, "failed to remove demux bin from bin", "error", err) 181 } 182 }() 183 184 err = <-busErr 185 if err != nil { 186 return nil, fmt.Errorf("packetize pipeline error filename=%s, error=%w", seg.Filepath, err) 187 } 188 189 return &bus.PacketizedSegment{ 190 Video: videoOutput, 191 Audio: audioOutput, 192 Duration: segDur, 193 }, nil 194}