Live video on the AT Protocol
1package media
2
3import (
4 "context"
5 "fmt"
6 "strings"
7 "time"
8
9 "github.com/go-gst/go-gst/gst"
10 "github.com/go-gst/go-gst/gst/app"
11 "stream.place/streamplace/pkg/bus"
12 "stream.place/streamplace/pkg/log"
13)
14
15// take in a segment and return a bunch of packets suitable for webrtc
16func Packetize(ctx context.Context, seg *bus.Seg) (*bus.PacketizedSegment, error) {
17
18 pipelineSlice := []string{
19 "h264parse name=videoparse ! video/x-h264,stream-format=byte-stream ! appsink sync=false name=videoappsink",
20 "opusparse name=audioparse ! appsink sync=false name=audioappsink",
21 }
22
23 pipeline, err := gst.NewPipelineFromString(strings.Join(pipelineSlice, "\n"))
24 if err != nil {
25 return nil, fmt.Errorf("failed to create GStreamer pipeline: %w", err) //nolint:all
26 }
27
28 demuxBin, err := ConcatDemuxBin(ctx, seg)
29 if err != nil {
30 return nil, fmt.Errorf("failed to create concat bin: %w", err)
31 }
32
33 err = pipeline.Add(demuxBin.Element)
34 if err != nil {
35 return nil, fmt.Errorf("failed to add demux bin to bin: %w", err)
36 }
37
38 demuxBinPadVideoSrc := demuxBin.GetStaticPad("video_0")
39 if demuxBinPadVideoSrc == nil {
40 return nil, fmt.Errorf("failed to get demux bin video src pad")
41 }
42
43 demuxBinPadAudioSrc := demuxBin.GetStaticPad("audio_0")
44 if demuxBinPadAudioSrc == nil {
45 return nil, fmt.Errorf("failed to get demux bin audio src pad")
46 }
47
48 videoParse, err := pipeline.GetElementByName("videoparse")
49 if err != nil {
50 return nil, fmt.Errorf("failed to get video parse element: %w", err)
51 }
52
53 audioParse, err := pipeline.GetElementByName("audioparse")
54 if err != nil {
55 return nil, fmt.Errorf("failed to get audio parse element: %w", err)
56 }
57
58 linked := demuxBinPadVideoSrc.Link(videoParse.GetStaticPad("sink"))
59 if linked != gst.PadLinkOK {
60 return nil, fmt.Errorf("failed to link demux bin video src pad to video parse element: %v", linked)
61 }
62
63 linked = demuxBinPadAudioSrc.Link(audioParse.GetStaticPad("sink"))
64 if linked != gst.PadLinkOK {
65 return nil, fmt.Errorf("failed to link demux bin audio src pad to audio parse element: %v", linked)
66 }
67
68 videoSink, err := pipeline.GetElementByName("videoappsink")
69 if err != nil {
70 return nil, fmt.Errorf("failed to get video appsink element: %w", err)
71 }
72 if videoSink == nil {
73 return nil, fmt.Errorf("failed to get video appsink element")
74 }
75
76 audioSink, err := pipeline.GetElementByName("audioappsink")
77 if err != nil {
78 return nil, fmt.Errorf("failed to get audio appsink element: %w", err)
79 }
80 if audioSink == nil {
81 return nil, fmt.Errorf("failed to get audio appsink element")
82 }
83
84 videoOutput := [][]byte{}
85 audioOutput := [][]byte{}
86 // eosCh := make(chan struct{})
87
88 videoappsink := app.SinkFromElement(videoSink)
89 videoappsink.SetCallbacks(&app.SinkCallbacks{
90 NewSampleFunc: func(sink *app.Sink) gst.FlowReturn {
91 sample := sink.PullSample()
92 if sample == nil {
93 return gst.FlowEOS
94 }
95
96 buffer := sample.GetBuffer()
97 if buffer == nil {
98 return gst.FlowError
99 }
100
101 samples := buffer.Map(gst.MapRead).Bytes()
102 defer buffer.Unmap()
103
104 videoOutput = append(videoOutput, samples)
105
106 // clockTime := buffer.Duration()
107 // dur := clockTime.AsDuration()
108 // if dur != nil {
109 // log.Log(ctx, "video duration", "duration", *dur)
110 // } else {
111 // log.Error(ctx, "no video duration", "samples", len(samples))
112 // }
113
114 return gst.FlowOK
115 },
116 EOSFunc: func(sink *app.Sink) {
117 log.Debug(ctx, "videoappsink EOSFunc")
118 // go func() {
119 // eosCh <- struct{}{}
120 // }()
121 },
122 })
123
124 segDur := time.Duration(0)
125
126 audioappsink := app.SinkFromElement(audioSink)
127 audioappsink.SetCallbacks(&app.SinkCallbacks{
128 NewSampleFunc: func(sink *app.Sink) gst.FlowReturn {
129 sample := sink.PullSample()
130 if sample == nil {
131 return gst.FlowEOS
132 }
133
134 buffer := sample.GetBuffer()
135 if buffer == nil {
136 return gst.FlowError
137 }
138
139 samples := buffer.Map(gst.MapRead).Bytes()
140 defer buffer.Unmap()
141
142 audioOutput = append(audioOutput, samples)
143
144 clockTime := buffer.Duration()
145 dur := clockTime.AsDuration()
146 if dur != nil {
147 segDur += *dur
148 } else {
149 log.Log(ctx, "no audio duration", "samples", len(samples))
150 err := fmt.Errorf("no audio duration")
151 pipeline.Error(err.Error(), err)
152 return gst.FlowError
153 }
154
155 return gst.FlowOK
156 },
157 EOSFunc: func(sink *app.Sink) {
158 log.Debug(ctx, "audioappsink EOSFunc")
159 // go func() {
160 // eosCh <- struct{}{}
161 // }()
162 },
163 })
164
165 busErr := make(chan error)
166 go func() {
167 err := HandleBusMessages(ctx, pipeline)
168 if err != nil {
169 log.Log(ctx, "pipeline error", "error", err)
170 }
171 busErr <- err
172 }()
173
174 err = pipeline.SetState(gst.StatePlaying)
175 if err != nil {
176 return nil, fmt.Errorf("failed to set pipeline to playing state: %w", err)
177 }
178
179 defer func() {
180 err = pipeline.SetState(gst.StateNull)
181 if err != nil {
182 log.Error(ctx, "failed to set pipeline to null state", "error", err)
183 }
184 }()
185
186 // <-eosCh
187 // <-eosCh
188
189 err = <-busErr
190 if err != nil {
191 return nil, fmt.Errorf("pipeline error: %w", err)
192 }
193
194 return &bus.PacketizedSegment{
195 Video: videoOutput,
196 Audio: audioOutput,
197 Duration: segDur,
198 }, nil
199}