fork
Configure Feed
Select the types of activity you want to include in your feed.
Live video on the AT Protocol
fork
Configure Feed
Select the types of activity you want to include in your feed.
1package media
2
3import (
4 "bufio"
5 "bytes"
6 "context"
7 "encoding/json"
8 "fmt"
9 "io"
10 "strconv"
11 "strings"
12
13 "github.com/go-gst/go-gst/gst"
14 "github.com/go-gst/go-gst/gst/app"
15 "go.opentelemetry.io/otel"
16 "stream.place/streamplace/pkg/log"
17 "stream.place/streamplace/pkg/model"
18)
19
20func padProbeEmpty(_ *gst.Pad, _ *gst.PadProbeInfo) gst.PadProbeReturn {
21 return gst.PadProbeOK
22}
23
24func ParseSegmentMediaData(ctx context.Context, mp4bs []byte) (*model.SegmentMediaData, error) {
25 ctx, span := otel.Tracer("signer").Start(ctx, "ParseSegmentMediaData")
26 defer span.End()
27 ctx = log.WithLogValues(ctx, "GStreamerFunc", "ParseSegmentMediaData")
28 ctx, cancel := context.WithCancel(ctx)
29 defer cancel()
30 pipelineSlice := []string{
31 "appsrc name=appsrc ! qtdemux name=demux",
32 "demux.video_0 ! queue ! tee name=videotee",
33 "videotee. ! queue ! h2642json ! appsink sync=false name=jsonappsink",
34 "videotee. ! queue ! appsink sync=false name=videoappsink",
35 "demux.audio_0 ! queue ! opusparse name=audioparse ! appsink sync=false name=audioappsink",
36 }
37
38 pipeline, err := gst.NewPipelineFromString(strings.Join(pipelineSlice, "\n"))
39 if err != nil {
40 return nil, fmt.Errorf("error creating SegmentMetadata pipeline: %w", err)
41 }
42
43 var videoMetadata *model.SegmentMediadataVideo
44 var audioMetadata *model.SegmentMediadataAudio
45
46 appsrc, err := pipeline.GetElementByName("appsrc")
47 if err != nil {
48 return nil, fmt.Errorf("error creating SegmentMetadata pipeline: %w", err)
49 }
50
51 src := app.SrcFromElement(appsrc)
52 src.SetCallbacks(&app.SourceCallbacks{
53 NeedDataFunc: ReaderNeedDataIncremental(ctx, bytes.NewReader(mp4bs)),
54 })
55
56 foundSomeAudio := false
57 audioSinkElem, err := pipeline.GetElementByName("audioappsink")
58 if err != nil {
59 return nil, fmt.Errorf("error creating SegmentMetadata pipeline: %w", err)
60 }
61 audioSink := app.SinkFromElement(audioSinkElem)
62 if audioSink == nil {
63 return nil, fmt.Errorf("error creating SegmentMetadata pipeline: %w", err)
64 }
65 audioSink.SetCallbacks(&app.SinkCallbacks{
66 NewSampleFunc: ParseSegmentMediaDataSinkNewSampleFunc(ctx, &foundSomeAudio),
67 })
68
69 foundSomeVideo := false
70 videoSinkElem, err := pipeline.GetElementByName("videoappsink")
71 if err != nil {
72 return nil, fmt.Errorf("error creating SegmentMetadata pipeline: %w", err)
73 }
74 videoSink := app.SinkFromElement(videoSinkElem)
75 if videoSink == nil {
76 return nil, fmt.Errorf("error creating SegmentMetadata pipeline: %w", err)
77 }
78 videoSink.SetCallbacks(&app.SinkCallbacks{
79 NewSampleFunc: ParseSegmentMediaDataSinkNewSampleFunc(ctx, &foundSomeVideo),
80 })
81 padsAdded := 0
82
83 var padProbe func(pad *gst.Pad, info *gst.PadProbeInfo) gst.PadProbeReturn
84 padProbe = func(pad *gst.Pad, info *gst.PadProbeInfo) gst.PadProbeReturn {
85 if info.GetEvent().Type() != gst.EventTypeEOS {
86 return gst.PadProbeOK
87 }
88 if padsAdded != 2 {
89 err := fmt.Errorf("expected 2 tracks in input, got %d", padsAdded)
90 pipeline.Error(err.Error(), err)
91 }
92 padProbe = padProbeEmpty
93 return gst.PadProbeRemove
94 }
95
96 outerPadProbe := func(pad *gst.Pad, info *gst.PadProbeInfo) gst.PadProbeReturn {
97 return padProbe(pad, info)
98 }
99
100 onPadAdded := func(element *gst.Element, pad *gst.Pad) {
101 padsAdded += 1
102 caps := pad.GetCurrentCaps()
103 if caps == nil {
104 log.Warn(ctx, "Unable to get pad caps")
105 cancel()
106 return
107 }
108
109 pad.AddProbe(gst.PadProbeTypeEventBoth, outerPadProbe)
110
111 structure := caps.GetStructureAt(0)
112 if structure == nil {
113 log.Warn(ctx, "Unable to get structure from caps")
114 cancel()
115 return
116 }
117
118 name := structure.Name()
119
120 if name[:5] == "video" {
121 videoMetadata = &model.SegmentMediadataVideo{}
122 // Get some common video properties
123 widthVal, _ := structure.GetValue("width")
124 heightVal, _ := structure.GetValue("height")
125
126 width, ok := widthVal.(int)
127 if ok {
128 videoMetadata.Width = width
129 }
130 height, ok := heightVal.(int)
131 if ok {
132 videoMetadata.Height = height
133 }
134 framerateVal, _ := structure.GetValue("framerate")
135 framerateStr := fmt.Sprintf("%v", framerateVal)
136 parts := strings.Split(framerateStr, "/")
137 num := 0
138 den := 0
139 if len(parts) == 2 {
140 num, _ = strconv.Atoi(parts[0])
141 den, _ = strconv.Atoi(parts[1])
142 }
143 if num != 0 && den != 0 {
144 videoMetadata.FPSNum = num
145 videoMetadata.FPSDen = den
146 }
147 }
148
149 if name[:5] == "audio" {
150 audioMetadata = &model.SegmentMediadataAudio{}
151 // Get some common audio properties
152 rateVal, _ := structure.GetValue("rate")
153 channelsVal, _ := structure.GetValue("channels")
154
155 rate, ok := rateVal.(int)
156 if ok {
157 audioMetadata.Rate = rate
158 }
159 channels, ok := channelsVal.(int)
160 if ok {
161 audioMetadata.Channels = channels
162 }
163 }
164
165 // if videoMetadata != nil && audioMetadata != nil {
166 // cancel()
167 // }
168 }
169
170 demux, err := pipeline.GetElementByName("demux")
171 if err != nil {
172 return nil, fmt.Errorf("error creating SegmentMetadata pipeline: %w", err)
173 }
174 _, err = demux.Connect("pad-added", onPadAdded)
175 if err != nil {
176 return nil, fmt.Errorf("error connecting pad-add: %w", err)
177 }
178
179 jsonSinkElem, err := pipeline.GetElementByName("jsonappsink")
180 if err != nil {
181 return nil, fmt.Errorf("failed to get videoappsink element: %w", err)
182 }
183 jsonSink := app.SinkFromElement(jsonSinkElem)
184 if jsonSink == nil {
185 return nil, fmt.Errorf("failed to get videoappsink element: %w", err)
186 }
187
188 hasBFrames := false
189
190 r, w := io.Pipe()
191 bufW := bufio.NewWriter(w)
192 decoder := json.NewDecoder(r)
193
194 decodeErr := make(chan error)
195 go func() {
196 for {
197 var obj map[string]any
198 err := decoder.Decode(&obj)
199 if err == io.EOF {
200 decodeErr <- nil
201 break // End of stream
202 }
203 if err != nil {
204 decodeErr <- err
205 break
206 }
207 // https://github.com/GStreamer/gstreamer/blob/68fa54c7616b93d5b7cc5febaa388546fcd617e0/subprojects/gst-plugins-bad/ext/codec2json/gsth2642json.c#L836
208 header, ok := obj["slice header"].(map[string]any)
209 if !ok {
210 continue
211 }
212 // https://github.com/GStreamer/gstreamer/blob/68fa54c7616b93d5b7cc5febaa388546fcd617e0/subprojects/gst-plugins-bad/ext/codec2json/gsth2642json.c#L622
213 flag, ok := header["direct spatial mv pred flag"].(bool)
214 if ok && flag {
215 hasBFrames = true
216 }
217 }
218 close(decodeErr)
219 }()
220
221 jsonSink.SetCallbacks(&app.SinkCallbacks{
222 NewSampleFunc: func(sink *app.Sink) gst.FlowReturn {
223 sample := sink.PullSample()
224 if sample == nil {
225 return gst.FlowOK
226 }
227
228 buf := sample.GetBuffer().Bytes()
229 _, err := bufW.Write(buf)
230 if err != nil {
231 log.Error(ctx, "failed to write to buffer", "error", err)
232 return gst.FlowError
233 }
234
235 return gst.FlowOK
236 },
237 })
238
239 go func() {
240 if err := HandleBusMessages(ctx, pipeline); err != nil {
241 log.Log(ctx, "pipeline error", "error", err)
242 }
243 cancel()
244 }()
245
246 // Start the pipeline
247 if err := pipeline.SetState(gst.StatePlaying); err != nil {
248 return nil, err
249 }
250
251 defer func() {
252 if err := pipeline.BlockSetState(gst.StateNull); err != nil {
253 log.Error(ctx, "error setting pipeline state to null", "error", err)
254 }
255 }()
256
257 <-ctx.Done()
258
259 err = w.Close()
260 if err != nil {
261 return nil, fmt.Errorf("error closing writer: %w", err)
262 }
263
264 err = <-decodeErr
265 if err != nil {
266 return nil, fmt.Errorf("error decoding JSON object: %w", err)
267 }
268
269 if videoMetadata == nil || !foundSomeVideo {
270 return nil, fmt.Errorf("no video in segment")
271 }
272 if audioMetadata == nil || !foundSomeAudio {
273 return nil, fmt.Errorf("no audio in segment")
274 }
275
276 videoMetadata.BFrames = hasBFrames
277
278 meta := &model.SegmentMediaData{
279 Video: []*model.SegmentMediadataVideo{videoMetadata},
280 Audio: []*model.SegmentMediadataAudio{audioMetadata},
281 }
282
283 ok, dur := pipeline.QueryDuration(gst.FormatTime)
284 if !ok {
285 return nil, fmt.Errorf("error getting duration")
286 } else {
287 meta.Duration = dur
288 }
289
290 return meta, nil
291}
292
293func ParseSegmentMediaDataSinkNewSampleFunc(ctx context.Context, foundThisTrack *bool) func(sink *app.Sink) gst.FlowReturn {
294 return func(sink *app.Sink) gst.FlowReturn {
295 sample := sink.PullSample()
296 if sample == nil {
297 return gst.FlowOK
298 }
299 buf := sample.GetBuffer()
300 if buf == nil {
301 return gst.FlowError
302 }
303 dur := buf.Duration().AsDuration()
304 if dur != nil && *dur > 0 {
305 *foundThisTrack = true
306 } else {
307 log.Warn(ctx, "no duration found for track", "track", sink.GetName())
308 }
309 return gst.FlowOK
310 }
311}