Live video on the AT Protocol
1package media
2
3import (
4 "bufio"
5 "bytes"
6 "context"
7 "encoding/json"
8 "fmt"
9 "io"
10 "strconv"
11 "strings"
12
13 "github.com/go-gst/go-gst/gst"
14 "github.com/go-gst/go-gst/gst/app"
15 "go.opentelemetry.io/otel"
16 "stream.place/streamplace/pkg/log"
17 "stream.place/streamplace/pkg/model"
18)
19
20func ParseSegmentMediaData(ctx context.Context, mp4bs []byte) (*model.SegmentMediaData, error) {
21 ctx, span := otel.Tracer("signer").Start(ctx, "ParseSegmentMediaData")
22 defer span.End()
23 ctx = log.WithLogValues(ctx, "GStreamerFunc", "ParseSegmentMediaData")
24 ctx, cancel := context.WithCancel(ctx)
25 defer cancel()
26 pipelineSlice := []string{
27 "appsrc name=appsrc ! qtdemux name=demux",
28 "demux.video_0 ! queue ! h264parse name=videoparse disable-passthrough=true config-interval=-1 ! h2642json ! appsink sync=false name=jsonappsink",
29 "demux.audio_0 ! queue ! opusparse name=audioparse ! fakesink sync=false",
30 }
31
32 pipeline, err := gst.NewPipelineFromString(strings.Join(pipelineSlice, "\n"))
33 if err != nil {
34 return nil, fmt.Errorf("error creating SegmentMetadata pipeline: %w", err)
35 }
36
37 var videoMetadata *model.SegmentMediadataVideo
38 var audioMetadata *model.SegmentMediadataAudio
39
40 appsrc, err := pipeline.GetElementByName("appsrc")
41 if err != nil {
42 return nil, fmt.Errorf("error creating SegmentMetadata pipeline: %w", err)
43 }
44
45 src := app.SrcFromElement(appsrc)
46 src.SetCallbacks(&app.SourceCallbacks{
47 NeedDataFunc: ReaderNeedData(ctx, bytes.NewReader(mp4bs)),
48 })
49
50 onPadAdded := func(element *gst.Element, pad *gst.Pad) {
51 caps := pad.GetCurrentCaps()
52 if caps == nil {
53 log.Warn(ctx, "Unable to get pad caps")
54 cancel()
55 return
56 }
57
58 structure := caps.GetStructureAt(0)
59 if structure == nil {
60 log.Warn(ctx, "Unable to get structure from caps")
61 cancel()
62 return
63 }
64
65 name := structure.Name()
66
67 if name[:5] == "video" {
68 videoMetadata = &model.SegmentMediadataVideo{}
69 // Get some common video properties
70 widthVal, _ := structure.GetValue("width")
71 heightVal, _ := structure.GetValue("height")
72
73 width, ok := widthVal.(int)
74 if ok {
75 videoMetadata.Width = width
76 }
77 height, ok := heightVal.(int)
78 if ok {
79 videoMetadata.Height = height
80 }
81 framerateVal, _ := structure.GetValue("framerate")
82 framerateStr := fmt.Sprintf("%v", framerateVal)
83 parts := strings.Split(framerateStr, "/")
84 num := 0
85 den := 0
86 if len(parts) == 2 {
87 num, _ = strconv.Atoi(parts[0])
88 den, _ = strconv.Atoi(parts[1])
89 }
90 if num != 0 && den != 0 {
91 videoMetadata.FPSNum = num
92 videoMetadata.FPSDen = den
93 }
94 }
95
96 if name[:5] == "audio" {
97 audioMetadata = &model.SegmentMediadataAudio{}
98 // Get some common audio properties
99 rateVal, _ := structure.GetValue("rate")
100 channelsVal, _ := structure.GetValue("channels")
101
102 rate, ok := rateVal.(int)
103 if ok {
104 audioMetadata.Rate = rate
105 }
106 channels, ok := channelsVal.(int)
107 if ok {
108 audioMetadata.Channels = channels
109 }
110 }
111
112 // if videoMetadata != nil && audioMetadata != nil {
113 // cancel()
114 // }
115 }
116
117 demux, err := pipeline.GetElementByName("demux")
118 if err != nil {
119 return nil, fmt.Errorf("error creating SegmentMetadata pipeline: %w", err)
120 }
121 _, err = demux.Connect("pad-added", onPadAdded)
122 if err != nil {
123 return nil, fmt.Errorf("error connecting pad-add: %w", err)
124 }
125
126 jsonSinkElem, err := pipeline.GetElementByName("jsonappsink")
127 if err != nil {
128 return nil, fmt.Errorf("failed to get videoappsink element: %w", err)
129 }
130 jsonSink := app.SinkFromElement(jsonSinkElem)
131 if jsonSink == nil {
132 return nil, fmt.Errorf("failed to get videoappsink element: %w", err)
133 }
134
135 hasBFrames := false
136
137 r, w := io.Pipe()
138 bufW := bufio.NewWriter(w)
139 decoder := json.NewDecoder(r)
140
141 decodeErr := make(chan error)
142 go func() {
143 for {
144 var obj map[string]any
145 err := decoder.Decode(&obj)
146 if err == io.EOF {
147 decodeErr <- nil
148 break // End of stream
149 }
150 if err != nil {
151 decodeErr <- err
152 break
153 }
154 // https://github.com/GStreamer/gstreamer/blob/68fa54c7616b93d5b7cc5febaa388546fcd617e0/subprojects/gst-plugins-bad/ext/codec2json/gsth2642json.c#L836
155 header, ok := obj["slice header"].(map[string]any)
156 if !ok {
157 continue
158 }
159 // https://github.com/GStreamer/gstreamer/blob/68fa54c7616b93d5b7cc5febaa388546fcd617e0/subprojects/gst-plugins-bad/ext/codec2json/gsth2642json.c#L622
160 flag, ok := header["direct spatial mv pred flag"].(bool)
161 if ok && flag {
162 hasBFrames = true
163 }
164 }
165 close(decodeErr)
166 }()
167
168 jsonSink.SetCallbacks(&app.SinkCallbacks{
169 NewSampleFunc: func(sink *app.Sink) gst.FlowReturn {
170 sample := sink.PullSample()
171 if sample == nil {
172 return gst.FlowOK
173 }
174
175 buf := sample.GetBuffer().Bytes()
176 _, err := bufW.Write(buf)
177 if err != nil {
178 log.Error(ctx, "failed to write to buffer", "error", err)
179 return gst.FlowError
180 }
181
182 return gst.FlowOK
183 },
184 })
185
186 go func() {
187 if err := HandleBusMessages(ctx, pipeline); err != nil {
188 log.Log(ctx, "pipeline error", "error", err)
189 }
190 cancel()
191 }()
192
193 // Start the pipeline
194 if err := pipeline.SetState(gst.StatePlaying); err != nil {
195 return nil, err
196 }
197
198 defer func() {
199 if err := pipeline.BlockSetState(gst.StateNull); err != nil {
200 log.Error(ctx, "error setting pipeline state to null", "error", err)
201 }
202 }()
203
204 <-ctx.Done()
205
206 err = w.Close()
207 if err != nil {
208 return nil, fmt.Errorf("error closing writer: %w", err)
209 }
210
211 err = <-decodeErr
212 if err != nil {
213 return nil, fmt.Errorf("error decoding JSON object: %w", err)
214 }
215
216 if videoMetadata == nil {
217 return nil, fmt.Errorf("no video metadata")
218 }
219 if audioMetadata == nil {
220 return nil, fmt.Errorf("no audio metadata")
221 }
222
223 videoMetadata.BFrames = hasBFrames
224
225 meta := &model.SegmentMediaData{
226 Video: []*model.SegmentMediadataVideo{videoMetadata},
227 Audio: []*model.SegmentMediadataAudio{audioMetadata},
228 }
229
230 ok, dur := pipeline.QueryDuration(gst.FormatTime)
231 if !ok {
232 return nil, fmt.Errorf("error getting duration")
233 } else {
234 meta.Duration = dur
235 }
236
237 return meta, nil
238}