Live video on the AT Protocol
1package media
2
3import (
4 "bytes"
5 "context"
6 "fmt"
7 "strconv"
8 "strings"
9
10 "github.com/go-gst/go-gst/gst"
11 "github.com/go-gst/go-gst/gst/app"
12 "go.opentelemetry.io/otel"
13 "stream.place/streamplace/pkg/log"
14 "stream.place/streamplace/pkg/model"
15)
16
17func ParseSegmentMediaData(ctx context.Context, mp4bs []byte) (*model.SegmentMediaData, error) {
18 ctx, span := otel.Tracer("signer").Start(ctx, "ParseSegmentMediaData")
19 defer span.End()
20 ctx = log.WithLogValues(ctx, "GStreamerFunc", "ParseSegmentMediaData")
21 ctx, cancel := context.WithCancel(ctx)
22 defer cancel()
23 pipelineSlice := []string{
24 "appsrc name=appsrc ! qtdemux name=demux",
25 "demux.video_0 ! queue ! h264parse name=videoparse disable-passthrough=true config-interval=-1 ! h264timestamper ! appsink sync=false name=videoappsink",
26 "demux.audio_0 ! queue ! opusparse name=audioparse ! appsink sync=false name=audioappsink",
27 }
28
29 pipeline, err := gst.NewPipelineFromString(strings.Join(pipelineSlice, "\n"))
30 if err != nil {
31 return nil, fmt.Errorf("error creating SegmentMetadata pipeline: %w", err)
32 }
33
34 var videoMetadata *model.SegmentMediadataVideo
35 var audioMetadata *model.SegmentMediadataAudio
36
37 appsrc, err := pipeline.GetElementByName("appsrc")
38 if err != nil {
39 return nil, fmt.Errorf("error creating SegmentMetadata pipeline: %w", err)
40 }
41
42 src := app.SrcFromElement(appsrc)
43 src.SetCallbacks(&app.SourceCallbacks{
44 NeedDataFunc: ReaderNeedData(ctx, bytes.NewReader(mp4bs)),
45 })
46
47 onPadAdded := func(element *gst.Element, pad *gst.Pad) {
48 caps := pad.GetCurrentCaps()
49 if caps == nil {
50 log.Warn(ctx, "Unable to get pad caps")
51 cancel()
52 return
53 }
54
55 structure := caps.GetStructureAt(0)
56 if structure == nil {
57 log.Warn(ctx, "Unable to get structure from caps")
58 cancel()
59 return
60 }
61
62 name := structure.Name()
63
64 if name[:5] == "video" {
65 videoMetadata = &model.SegmentMediadataVideo{}
66 // Get some common video properties
67 widthVal, _ := structure.GetValue("width")
68 heightVal, _ := structure.GetValue("height")
69
70 width, ok := widthVal.(int)
71 if ok {
72 videoMetadata.Width = width
73 }
74 height, ok := heightVal.(int)
75 if ok {
76 videoMetadata.Height = height
77 }
78 framerateVal, _ := structure.GetValue("framerate")
79 framerateStr := fmt.Sprintf("%v", framerateVal)
80 parts := strings.Split(framerateStr, "/")
81 num := 0
82 den := 0
83 if len(parts) == 2 {
84 num, _ = strconv.Atoi(parts[0])
85 den, _ = strconv.Atoi(parts[1])
86 }
87 if num != 0 && den != 0 {
88 videoMetadata.FPSNum = num
89 videoMetadata.FPSDen = den
90 }
91 }
92
93 if name[:5] == "audio" {
94 audioMetadata = &model.SegmentMediadataAudio{}
95 // Get some common audio properties
96 rateVal, _ := structure.GetValue("rate")
97 channelsVal, _ := structure.GetValue("channels")
98
99 rate, ok := rateVal.(int)
100 if ok {
101 audioMetadata.Rate = rate
102 }
103 channels, ok := channelsVal.(int)
104 if ok {
105 audioMetadata.Channels = channels
106 }
107 }
108
109 // if videoMetadata != nil && audioMetadata != nil {
110 // cancel()
111 // }
112 }
113
114 demux, err := pipeline.GetElementByName("demux")
115 if err != nil {
116 return nil, fmt.Errorf("error creating SegmentMetadata pipeline: %w", err)
117 }
118 _, err = demux.Connect("pad-added", onPadAdded)
119 if err != nil {
120 return nil, fmt.Errorf("error connecting pad-add: %w", err)
121 }
122
123 audioSinkElem, err := pipeline.GetElementByName("audioappsink")
124 if err != nil {
125 return nil, fmt.Errorf("failed to get audioappsink element: %w", err)
126 }
127 audioSink := app.SinkFromElement(audioSinkElem)
128 if audioSink == nil {
129 return nil, fmt.Errorf("failed to get audioappsink element: %w", err)
130 }
131
132 audioSink.SetCallbacks(&app.SinkCallbacks{
133 NewSampleFunc: func(sink *app.Sink) gst.FlowReturn {
134 sample := sink.PullSample()
135 if sample == nil {
136 return gst.FlowOK
137 }
138
139 return gst.FlowOK
140 },
141 })
142
143 videoSinkElem, err := pipeline.GetElementByName("videoappsink")
144 if err != nil {
145 return nil, fmt.Errorf("failed to get videoappsink element: %w", err)
146 }
147 videoSink := app.SinkFromElement(videoSinkElem)
148 if videoSink == nil {
149 return nil, fmt.Errorf("failed to get videoappsink element: %w", err)
150 }
151
152 hasBFrames := false
153 videoSink.SetCallbacks(&app.SinkCallbacks{
154 NewSampleFunc: func(sink *app.Sink) gst.FlowReturn {
155 sample := sink.PullSample()
156 if sample == nil {
157 return gst.FlowOK
158 }
159
160 buf := sample.GetBuffer()
161 pts := buf.PresentationTimestamp().String()
162 dts := buf.DecodingTimestamp().String()
163
164 if pts != dts {
165 hasBFrames = true
166 }
167
168 return gst.FlowOK
169 },
170 })
171
172 go func() {
173 if err := HandleBusMessages(ctx, pipeline); err != nil {
174 log.Log(ctx, "pipeline error", "error", err)
175 }
176 cancel()
177 }()
178
179 // Start the pipeline
180 if err := pipeline.SetState(gst.StatePlaying); err != nil {
181 return nil, err
182 }
183
184 defer func() {
185 if err := pipeline.BlockSetState(gst.StateNull); err != nil {
186 log.Error(ctx, "error setting pipeline state to null", "error", err)
187 }
188 }()
189
190 <-ctx.Done()
191
192 if videoMetadata == nil {
193 return nil, fmt.Errorf("no video metadata")
194 }
195 if audioMetadata == nil {
196 return nil, fmt.Errorf("no audio metadata")
197 }
198
199 videoMetadata.BFrames = hasBFrames
200
201 meta := &model.SegmentMediaData{
202 Video: []*model.SegmentMediadataVideo{videoMetadata},
203 Audio: []*model.SegmentMediadataAudio{audioMetadata},
204 }
205
206 ok, dur := pipeline.QueryDuration(gst.FormatTime)
207 if !ok {
208 return nil, fmt.Errorf("error getting duration")
209 } else {
210 meta.Duration = dur
211 }
212
213 return meta, nil
214}