Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

at eli/fix-thumbnail-explosion 199 lines 5.1 kB view raw
1package media 2 3import ( 4 "context" 5 "fmt" 6 "strings" 7 "time" 8 9 "github.com/go-gst/go-gst/gst" 10 "github.com/go-gst/go-gst/gst/app" 11 "stream.place/streamplace/pkg/bus" 12 "stream.place/streamplace/pkg/log" 13) 14 15// take in a segment and return a bunch of packets suitable for webrtc 16func Packetize(ctx context.Context, seg *bus.Seg) (*bus.PacketizedSegment, error) { 17 18 pipelineSlice := []string{ 19 "h264parse name=videoparse ! video/x-h264,stream-format=byte-stream ! appsink sync=false name=videoappsink", 20 "opusparse name=audioparse ! appsink sync=false name=audioappsink", 21 } 22 23 pipeline, err := gst.NewPipelineFromString(strings.Join(pipelineSlice, "\n")) 24 if err != nil { 25 return nil, fmt.Errorf("failed to create GStreamer pipeline: %w", err) //nolint:all 26 } 27 28 demuxBin, err := ConcatDemuxBin(ctx, seg) 29 if err != nil { 30 return nil, fmt.Errorf("failed to create concat bin: %w", err) 31 } 32 33 err = pipeline.Add(demuxBin.Element) 34 if err != nil { 35 return nil, fmt.Errorf("failed to add demux bin to bin: %w", err) 36 } 37 38 demuxBinPadVideoSrc := demuxBin.GetStaticPad("video_0") 39 if demuxBinPadVideoSrc == nil { 40 return nil, fmt.Errorf("failed to get demux bin video src pad") 41 } 42 43 demuxBinPadAudioSrc := demuxBin.GetStaticPad("audio_0") 44 if demuxBinPadAudioSrc == nil { 45 return nil, fmt.Errorf("failed to get demux bin audio src pad") 46 } 47 48 videoParse, err := pipeline.GetElementByName("videoparse") 49 if err != nil { 50 return nil, fmt.Errorf("failed to get video parse element: %w", err) 51 } 52 53 audioParse, err := pipeline.GetElementByName("audioparse") 54 if err != nil { 55 return nil, fmt.Errorf("failed to get audio parse element: %w", err) 56 } 57 58 linked := demuxBinPadVideoSrc.Link(videoParse.GetStaticPad("sink")) 59 if linked != gst.PadLinkOK { 60 return nil, fmt.Errorf("failed to link demux bin video src pad to video parse element: %v", linked) 61 } 62 63 linked = demuxBinPadAudioSrc.Link(audioParse.GetStaticPad("sink")) 64 if linked != gst.PadLinkOK { 65 return nil, fmt.Errorf("failed to link demux bin audio src pad to audio parse element: %v", linked) 66 } 67 68 videoSink, err := pipeline.GetElementByName("videoappsink") 69 if err != nil { 70 return nil, fmt.Errorf("failed to get video appsink element: %w", err) 71 } 72 if videoSink == nil { 73 return nil, fmt.Errorf("failed to get video appsink element") 74 } 75 76 audioSink, err := pipeline.GetElementByName("audioappsink") 77 if err != nil { 78 return nil, fmt.Errorf("failed to get audio appsink element: %w", err) 79 } 80 if audioSink == nil { 81 return nil, fmt.Errorf("failed to get audio appsink element") 82 } 83 84 videoOutput := [][]byte{} 85 audioOutput := [][]byte{} 86 // eosCh := make(chan struct{}) 87 88 videoappsink := app.SinkFromElement(videoSink) 89 videoappsink.SetCallbacks(&app.SinkCallbacks{ 90 NewSampleFunc: func(sink *app.Sink) gst.FlowReturn { 91 sample := sink.PullSample() 92 if sample == nil { 93 return gst.FlowEOS 94 } 95 96 buffer := sample.GetBuffer() 97 if buffer == nil { 98 return gst.FlowError 99 } 100 101 samples := buffer.Map(gst.MapRead).Bytes() 102 defer buffer.Unmap() 103 104 videoOutput = append(videoOutput, samples) 105 106 // clockTime := buffer.Duration() 107 // dur := clockTime.AsDuration() 108 // if dur != nil { 109 // log.Log(ctx, "video duration", "duration", *dur) 110 // } else { 111 // log.Error(ctx, "no video duration", "samples", len(samples)) 112 // } 113 114 return gst.FlowOK 115 }, 116 EOSFunc: func(sink *app.Sink) { 117 log.Debug(ctx, "videoappsink EOSFunc") 118 // go func() { 119 // eosCh <- struct{}{} 120 // }() 121 }, 122 }) 123 124 segDur := time.Duration(0) 125 126 audioappsink := app.SinkFromElement(audioSink) 127 audioappsink.SetCallbacks(&app.SinkCallbacks{ 128 NewSampleFunc: func(sink *app.Sink) gst.FlowReturn { 129 sample := sink.PullSample() 130 if sample == nil { 131 return gst.FlowEOS 132 } 133 134 buffer := sample.GetBuffer() 135 if buffer == nil { 136 return gst.FlowError 137 } 138 139 samples := buffer.Map(gst.MapRead).Bytes() 140 defer buffer.Unmap() 141 142 audioOutput = append(audioOutput, samples) 143 144 clockTime := buffer.Duration() 145 dur := clockTime.AsDuration() 146 if dur != nil { 147 segDur += *dur 148 } else { 149 log.Log(ctx, "no audio duration", "samples", len(samples)) 150 err := fmt.Errorf("no audio duration") 151 pipeline.Error(err.Error(), err) 152 return gst.FlowError 153 } 154 155 return gst.FlowOK 156 }, 157 EOSFunc: func(sink *app.Sink) { 158 log.Debug(ctx, "audioappsink EOSFunc") 159 // go func() { 160 // eosCh <- struct{}{} 161 // }() 162 }, 163 }) 164 165 busErr := make(chan error) 166 go func() { 167 err := HandleBusMessages(ctx, pipeline) 168 if err != nil { 169 log.Log(ctx, "pipeline error", "error", err) 170 } 171 busErr <- err 172 }() 173 174 err = pipeline.SetState(gst.StatePlaying) 175 if err != nil { 176 return nil, fmt.Errorf("failed to set pipeline to playing state: %w", err) 177 } 178 179 defer func() { 180 err = pipeline.SetState(gst.StateNull) 181 if err != nil { 182 log.Error(ctx, "failed to set pipeline to null state", "error", err) 183 } 184 }() 185 186 // <-eosCh 187 // <-eosCh 188 189 err = <-busErr 190 if err != nil { 191 return nil, fmt.Errorf("pipeline error: %w", err) 192 } 193 194 return &bus.PacketizedSegment{ 195 Video: videoOutput, 196 Audio: audioOutput, 197 Duration: segDur, 198 }, nil 199}