Live video on the AT Protocol
1package media
2
import (
	"bufio"
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"strconv"
	"strings"
	"sync"

	"github.com/go-gst/go-gst/gst"
	"github.com/go-gst/go-gst/gst/app"
	"go.opentelemetry.io/otel"
	"stream.place/streamplace/pkg/localdb"
	"stream.place/streamplace/pkg/log"
)
19
20func padProbeEmpty(_ *gst.Pad, _ *gst.PadProbeInfo) gst.PadProbeReturn {
21 return gst.PadProbeOK
22}
23
24func ParseSegmentMediaData(ctx context.Context, mp4bs []byte) (*localdb.SegmentMediaData, error) {
25 ctx, span := otel.Tracer("signer").Start(ctx, "ParseSegmentMediaData")
26 defer span.End()
27 ctx = log.WithLogValues(ctx, "GStreamerFunc", "ParseSegmentMediaData")
28 ctx, cancel := context.WithCancel(ctx)
29 defer cancel()
30 pipelineSlice := []string{
31 "appsrc name=appsrc ! qtdemux name=demux",
32 "demux.video_0 ! queue ! tee name=videotee",
33 "videotee. ! queue ! h2642json ! appsink sync=false name=jsonappsink",
34 "videotee. ! queue ! appsink sync=false name=videoappsink",
35 "demux.audio_0 ! queue ! opusparse name=audioparse ! appsink sync=false name=audioappsink",
36 }
37
38 pipeline, err := gst.NewPipelineFromString(strings.Join(pipelineSlice, "\n"))
39 if err != nil {
40 return nil, fmt.Errorf("error creating SegmentMetadata pipeline: %w", err)
41 }
42
43 var videoMetadata *localdb.SegmentMediadataVideo
44 var audioMetadata *localdb.SegmentMediadataAudio
45
46 appsrc, err := pipeline.GetElementByName("appsrc")
47 if err != nil {
48 return nil, fmt.Errorf("error creating SegmentMetadata pipeline: %w", err)
49 }
50
51 src := app.SrcFromElement(appsrc)
52 src.SetCallbacks(&app.SourceCallbacks{
53 NeedDataFunc: ReaderNeedDataIncremental(ctx, bytes.NewReader(mp4bs)),
54 })
55
56 foundSomeAudio := false
57 audioSinkElem, err := pipeline.GetElementByName("audioappsink")
58 if err != nil {
59 return nil, fmt.Errorf("error creating SegmentMetadata pipeline: %w", err)
60 }
61 audioSink := app.SinkFromElement(audioSinkElem)
62 if audioSink == nil {
63 return nil, fmt.Errorf("error creating SegmentMetadata pipeline: %w", err)
64 }
65 audioSink.SetCallbacks(&app.SinkCallbacks{
66 NewSampleFunc: ParseSegmentMediaDataSinkNewSampleFunc(ctx, &foundSomeAudio),
67 })
68
69 foundSomeVideo := false
70 videoSinkElem, err := pipeline.GetElementByName("videoappsink")
71 if err != nil {
72 return nil, fmt.Errorf("error creating SegmentMetadata pipeline: %w", err)
73 }
74 videoSink := app.SinkFromElement(videoSinkElem)
75 if videoSink == nil {
76 return nil, fmt.Errorf("error creating SegmentMetadata pipeline: %w", err)
77 }
78 videoSink.SetCallbacks(&app.SinkCallbacks{
79 NewSampleFunc: ParseSegmentMediaDataSinkNewSampleFunc(ctx, &foundSomeVideo),
80 })
81 padsAdded := 0
82
83 var padProbe func(pad *gst.Pad, info *gst.PadProbeInfo) gst.PadProbeReturn
84 padProbe = func(pad *gst.Pad, info *gst.PadProbeInfo) gst.PadProbeReturn {
85 if info.GetEvent().Type() != gst.EventTypeEOS {
86 return gst.PadProbeOK
87 }
88 if padsAdded != 2 {
89 err := fmt.Errorf("expected 2 tracks in input, got %d", padsAdded)
90 pipeline.Error(err.Error(), err)
91 }
92 padProbe = padProbeEmpty
93 return gst.PadProbeRemove
94 }
95
96 outerPadProbe := func(pad *gst.Pad, info *gst.PadProbeInfo) gst.PadProbeReturn {
97 return padProbe(pad, info)
98 }
99
100 onPadAdded := func(element *gst.Element, pad *gst.Pad) {
101 padsAdded += 1
102 caps := pad.GetCurrentCaps()
103 if caps == nil {
104 log.Warn(ctx, "Unable to get pad caps")
105 cancel()
106 return
107 }
108
109 pad.AddProbe(gst.PadProbeTypeEventBoth, outerPadProbe)
110
111 structure := caps.GetStructureAt(0)
112 if structure == nil {
113 log.Warn(ctx, "Unable to get structure from caps")
114 cancel()
115 return
116 }
117
118 name := structure.Name()
119
120 if name[:5] == "video" {
121 videoMetadata = &localdb.SegmentMediadataVideo{}
122 // Get some common video properties
123 widthVal, _ := structure.GetValue("width")
124 heightVal, _ := structure.GetValue("height")
125
126 width, ok := widthVal.(int)
127 if ok {
128 videoMetadata.Width = width
129 }
130 height, ok := heightVal.(int)
131 if ok {
132 videoMetadata.Height = height
133 }
134 framerateVal, _ := structure.GetValue("framerate")
135 framerateStr := fmt.Sprintf("%v", framerateVal)
136 parts := strings.Split(framerateStr, "/")
137 num := 0
138 den := 0
139 if len(parts) == 2 {
140 num, _ = strconv.Atoi(parts[0])
141 den, _ = strconv.Atoi(parts[1])
142 }
143 if num != 0 && den != 0 {
144 videoMetadata.FPSNum = num
145 videoMetadata.FPSDen = den
146 }
147 }
148
149 if name[:5] == "audio" {
150 audioMetadata = &localdb.SegmentMediadataAudio{}
151 // Get some common audio properties
152 rateVal, _ := structure.GetValue("rate")
153 channelsVal, _ := structure.GetValue("channels")
154
155 rate, ok := rateVal.(int)
156 if ok {
157 audioMetadata.Rate = rate
158 }
159 channels, ok := channelsVal.(int)
160 if ok {
161 audioMetadata.Channels = channels
162 }
163 }
164
165 // if videoMetadata != nil && audioMetadata != nil {
166 // cancel()
167 // }
168 }
169
170 demux, err := pipeline.GetElementByName("demux")
171 if err != nil {
172 return nil, fmt.Errorf("error creating SegmentMetadata pipeline: %w", err)
173 }
174 _, err = demux.Connect("pad-added", onPadAdded)
175 if err != nil {
176 return nil, fmt.Errorf("error connecting pad-add: %w", err)
177 }
178
179 jsonSinkElem, err := pipeline.GetElementByName("jsonappsink")
180 if err != nil {
181 return nil, fmt.Errorf("failed to get videoappsink element: %w", err)
182 }
183 jsonSink := app.SinkFromElement(jsonSinkElem)
184 if jsonSink == nil {
185 return nil, fmt.Errorf("failed to get videoappsink element: %w", err)
186 }
187
188 hasBFrames := false
189
190 r, w := io.Pipe()
191 bufW := bufio.NewWriter(w)
192 decoder := json.NewDecoder(r)
193
194 decodeErr := make(chan error)
195 go func() {
196 for {
197 var obj map[string]any
198 err := decoder.Decode(&obj)
199 if err == io.EOF {
200 decodeErr <- nil
201 break // End of stream
202 }
203 if err != nil {
204 decodeErr <- err
205 break
206 }
207 // https://github.com/GStreamer/gstreamer/blob/68fa54c7616b93d5b7cc5febaa388546fcd617e0/subprojects/gst-plugins-bad/ext/codec2json/gsth2642json.c#L836
208 header, ok := obj["slice header"].(map[string]any)
209 if !ok {
210 continue
211 }
212 // https://github.com/GStreamer/gstreamer/blob/68fa54c7616b93d5b7cc5febaa388546fcd617e0/subprojects/gst-plugins-bad/ext/codec2json/gsth2642json.c#L622
213 flag, ok := header["direct spatial mv pred flag"].(bool)
214 if ok && flag {
215 hasBFrames = true
216 }
217 }
218 close(decodeErr)
219 }()
220
221 jsonSink.SetCallbacks(&app.SinkCallbacks{
222 NewSampleFunc: func(sink *app.Sink) gst.FlowReturn {
223 sample := sink.PullSample()
224 if sample == nil {
225 return gst.FlowOK
226 }
227
228 buf := sample.GetBuffer().Bytes()
229 _, err := bufW.Write(buf)
230 if err != nil {
231 log.Error(ctx, "failed to write to buffer", "error", err)
232 return gst.FlowError
233 }
234
235 return gst.FlowOK
236 },
237 })
238
239 go func() {
240 if err := HandleBusMessages(ctx, pipeline); err != nil {
241 log.Log(ctx, "pipeline error", "error", err)
242 }
243 cancel()
244 }()
245
246 // Start the pipeline
247 if err := pipeline.SetState(gst.StatePlaying); err != nil {
248 return nil, err
249 }
250
251 defer func() {
252 if err := pipeline.BlockSetState(gst.StateNull); err != nil {
253 log.Error(ctx, "error setting pipeline state to null", "error", err)
254 }
255 }()
256
257 <-ctx.Done()
258
259 err = w.Close()
260 if err != nil {
261 return nil, fmt.Errorf("error closing writer: %w", err)
262 }
263
264 err = <-decodeErr
265 if err != nil {
266 return nil, fmt.Errorf("error decoding JSON object: %w", err)
267 }
268
269 if videoMetadata == nil || !foundSomeVideo {
270 return nil, fmt.Errorf("no video in segment")
271 }
272 if audioMetadata == nil || !foundSomeAudio {
273 return nil, fmt.Errorf("no audio in segment")
274 }
275
276 videoMetadata.BFrames = hasBFrames
277
278 meta := &localdb.SegmentMediaData{
279 Video: []*localdb.SegmentMediadataVideo{videoMetadata},
280 Audio: []*localdb.SegmentMediadataAudio{audioMetadata},
281 }
282
283 ok, dur := pipeline.QueryDuration(gst.FormatTime)
284 if !ok {
285 return nil, fmt.Errorf("error getting duration")
286 } else {
287 meta.Duration = dur
288 }
289
290 return meta, nil
291}
292
293func ParseSegmentMediaDataSinkNewSampleFunc(ctx context.Context, foundThisTrack *bool) func(sink *app.Sink) gst.FlowReturn {
294 return func(sink *app.Sink) gst.FlowReturn {
295 sample := sink.PullSample()
296 if sample == nil {
297 return gst.FlowOK
298 }
299 buf := sample.GetBuffer()
300 if buf == nil {
301 return gst.FlowError
302 }
303 dur := buf.Duration().AsDuration()
304 if dur != nil && *dur > 0 {
305 *foundThisTrack = true
306 } else {
307 log.Warn(ctx, "no duration found for track", "track", sink.GetName())
308 }
309 return gst.FlowOK
310 }
311}