Live video on the AT Protocol
at next 311 lines 8.3 kB view raw
1package media 2 3import ( 4 "bufio" 5 "bytes" 6 "context" 7 "encoding/json" 8 "fmt" 9 "io" 10 "strconv" 11 "strings" 12 13 "github.com/go-gst/go-gst/gst" 14 "github.com/go-gst/go-gst/gst/app" 15 "go.opentelemetry.io/otel" 16 "stream.place/streamplace/pkg/localdb" 17 "stream.place/streamplace/pkg/log" 18) 19 20func padProbeEmpty(_ *gst.Pad, _ *gst.PadProbeInfo) gst.PadProbeReturn { 21 return gst.PadProbeOK 22} 23 24func ParseSegmentMediaData(ctx context.Context, mp4bs []byte) (*localdb.SegmentMediaData, error) { 25 ctx, span := otel.Tracer("signer").Start(ctx, "ParseSegmentMediaData") 26 defer span.End() 27 ctx = log.WithLogValues(ctx, "GStreamerFunc", "ParseSegmentMediaData") 28 ctx, cancel := context.WithCancel(ctx) 29 defer cancel() 30 pipelineSlice := []string{ 31 "appsrc name=appsrc ! qtdemux name=demux", 32 "demux.video_0 ! queue ! tee name=videotee", 33 "videotee. ! queue ! h2642json ! appsink sync=false name=jsonappsink", 34 "videotee. ! queue ! appsink sync=false name=videoappsink", 35 "demux.audio_0 ! queue ! opusparse name=audioparse ! appsink sync=false name=audioappsink", 36 } 37 38 pipeline, err := gst.NewPipelineFromString(strings.Join(pipelineSlice, "\n")) 39 if err != nil { 40 return nil, fmt.Errorf("error creating SegmentMetadata pipeline: %w", err) 41 } 42 43 var videoMetadata *localdb.SegmentMediadataVideo 44 var audioMetadata *localdb.SegmentMediadataAudio 45 46 appsrc, err := pipeline.GetElementByName("appsrc") 47 if err != nil { 48 return nil, fmt.Errorf("error creating SegmentMetadata pipeline: %w", err) 49 } 50 51 src := app.SrcFromElement(appsrc) 52 src.SetCallbacks(&app.SourceCallbacks{ 53 NeedDataFunc: ReaderNeedDataIncremental(ctx, bytes.NewReader(mp4bs)), 54 }) 55 56 foundSomeAudio := false 57 audioSinkElem, err := pipeline.GetElementByName("audioappsink") 58 if err != nil { 59 return nil, fmt.Errorf("error creating SegmentMetadata pipeline: %w", err) 60 } 61 audioSink := app.SinkFromElement(audioSinkElem) 62 if audioSink == nil { 63 return nil, fmt.Errorf("error creating SegmentMetadata pipeline: %w", err) 64 } 65 audioSink.SetCallbacks(&app.SinkCallbacks{ 66 NewSampleFunc: ParseSegmentMediaDataSinkNewSampleFunc(ctx, &foundSomeAudio), 67 }) 68 69 foundSomeVideo := false 70 videoSinkElem, err := pipeline.GetElementByName("videoappsink") 71 if err != nil { 72 return nil, fmt.Errorf("error creating SegmentMetadata pipeline: %w", err) 73 } 74 videoSink := app.SinkFromElement(videoSinkElem) 75 if videoSink == nil { 76 return nil, fmt.Errorf("error creating SegmentMetadata pipeline: %w", err) 77 } 78 videoSink.SetCallbacks(&app.SinkCallbacks{ 79 NewSampleFunc: ParseSegmentMediaDataSinkNewSampleFunc(ctx, &foundSomeVideo), 80 }) 81 padsAdded := 0 82 83 var padProbe func(pad *gst.Pad, info *gst.PadProbeInfo) gst.PadProbeReturn 84 padProbe = func(pad *gst.Pad, info *gst.PadProbeInfo) gst.PadProbeReturn { 85 if info.GetEvent().Type() != gst.EventTypeEOS { 86 return gst.PadProbeOK 87 } 88 if padsAdded != 2 { 89 err := fmt.Errorf("expected 2 tracks in input, got %d", padsAdded) 90 pipeline.Error(err.Error(), err) 91 } 92 padProbe = padProbeEmpty 93 return gst.PadProbeRemove 94 } 95 96 outerPadProbe := func(pad *gst.Pad, info *gst.PadProbeInfo) gst.PadProbeReturn { 97 return padProbe(pad, info) 98 } 99 100 onPadAdded := func(element *gst.Element, pad *gst.Pad) { 101 padsAdded += 1 102 caps := pad.GetCurrentCaps() 103 if caps == nil { 104 log.Warn(ctx, "Unable to get pad caps") 105 cancel() 106 return 107 } 108 109 pad.AddProbe(gst.PadProbeTypeEventBoth, outerPadProbe) 110 111 structure := caps.GetStructureAt(0) 112 if structure == nil { 113 log.Warn(ctx, "Unable to get structure from caps") 114 cancel() 115 return 116 } 117 118 name := structure.Name() 119 120 if name[:5] == "video" { 121 videoMetadata = &localdb.SegmentMediadataVideo{} 122 // Get some common video properties 123 widthVal, _ := structure.GetValue("width") 124 heightVal, _ := structure.GetValue("height") 125 126 width, ok := widthVal.(int) 127 if ok { 128 videoMetadata.Width = width 129 } 130 height, ok := heightVal.(int) 131 if ok { 132 videoMetadata.Height = height 133 } 134 framerateVal, _ := structure.GetValue("framerate") 135 framerateStr := fmt.Sprintf("%v", framerateVal) 136 parts := strings.Split(framerateStr, "/") 137 num := 0 138 den := 0 139 if len(parts) == 2 { 140 num, _ = strconv.Atoi(parts[0]) 141 den, _ = strconv.Atoi(parts[1]) 142 } 143 if num != 0 && den != 0 { 144 videoMetadata.FPSNum = num 145 videoMetadata.FPSDen = den 146 } 147 } 148 149 if name[:5] == "audio" { 150 audioMetadata = &localdb.SegmentMediadataAudio{} 151 // Get some common audio properties 152 rateVal, _ := structure.GetValue("rate") 153 channelsVal, _ := structure.GetValue("channels") 154 155 rate, ok := rateVal.(int) 156 if ok { 157 audioMetadata.Rate = rate 158 } 159 channels, ok := channelsVal.(int) 160 if ok { 161 audioMetadata.Channels = channels 162 } 163 } 164 165 // if videoMetadata != nil && audioMetadata != nil { 166 // cancel() 167 // } 168 } 169 170 demux, err := pipeline.GetElementByName("demux") 171 if err != nil { 172 return nil, fmt.Errorf("error creating SegmentMetadata pipeline: %w", err) 173 } 174 _, err = demux.Connect("pad-added", onPadAdded) 175 if err != nil { 176 return nil, fmt.Errorf("error connecting pad-add: %w", err) 177 } 178 179 jsonSinkElem, err := pipeline.GetElementByName("jsonappsink") 180 if err != nil { 181 return nil, fmt.Errorf("failed to get videoappsink element: %w", err) 182 } 183 jsonSink := app.SinkFromElement(jsonSinkElem) 184 if jsonSink == nil { 185 return nil, fmt.Errorf("failed to get videoappsink element: %w", err) 186 } 187 188 hasBFrames := false 189 190 r, w := io.Pipe() 191 bufW := bufio.NewWriter(w) 192 decoder := json.NewDecoder(r) 193 194 decodeErr := make(chan error) 195 go func() { 196 for { 197 var obj map[string]any 198 err := decoder.Decode(&obj) 199 if err == io.EOF { 200 decodeErr <- nil 201 break // End of stream 202 } 203 if err != nil { 204 decodeErr <- err 205 break 206 } 207 // https://github.com/GStreamer/gstreamer/blob/68fa54c7616b93d5b7cc5febaa388546fcd617e0/subprojects/gst-plugins-bad/ext/codec2json/gsth2642json.c#L836 208 header, ok := obj["slice header"].(map[string]any) 209 if !ok { 210 continue 211 } 212 // https://github.com/GStreamer/gstreamer/blob/68fa54c7616b93d5b7cc5febaa388546fcd617e0/subprojects/gst-plugins-bad/ext/codec2json/gsth2642json.c#L622 213 flag, ok := header["direct spatial mv pred flag"].(bool) 214 if ok && flag { 215 hasBFrames = true 216 } 217 } 218 close(decodeErr) 219 }() 220 221 jsonSink.SetCallbacks(&app.SinkCallbacks{ 222 NewSampleFunc: func(sink *app.Sink) gst.FlowReturn { 223 sample := sink.PullSample() 224 if sample == nil { 225 return gst.FlowOK 226 } 227 228 buf := sample.GetBuffer().Bytes() 229 _, err := bufW.Write(buf) 230 if err != nil { 231 log.Error(ctx, "failed to write to buffer", "error", err) 232 return gst.FlowError 233 } 234 235 return gst.FlowOK 236 }, 237 }) 238 239 go func() { 240 if err := HandleBusMessages(ctx, pipeline); err != nil { 241 log.Log(ctx, "pipeline error", "error", err) 242 } 243 cancel() 244 }() 245 246 // Start the pipeline 247 if err := pipeline.SetState(gst.StatePlaying); err != nil { 248 return nil, err 249 } 250 251 defer func() { 252 if err := pipeline.BlockSetState(gst.StateNull); err != nil { 253 log.Error(ctx, "error setting pipeline state to null", "error", err) 254 } 255 }() 256 257 <-ctx.Done() 258 259 err = w.Close() 260 if err != nil { 261 return nil, fmt.Errorf("error closing writer: %w", err) 262 } 263 264 err = <-decodeErr 265 if err != nil { 266 return nil, fmt.Errorf("error decoding JSON object: %w", err) 267 } 268 269 if videoMetadata == nil || !foundSomeVideo { 270 return nil, fmt.Errorf("no video in segment") 271 } 272 if audioMetadata == nil || !foundSomeAudio { 273 return nil, fmt.Errorf("no audio in segment") 274 } 275 276 videoMetadata.BFrames = hasBFrames 277 278 meta := &localdb.SegmentMediaData{ 279 Video: []*localdb.SegmentMediadataVideo{videoMetadata}, 280 Audio: []*localdb.SegmentMediadataAudio{audioMetadata}, 281 } 282 283 ok, dur := pipeline.QueryDuration(gst.FormatTime) 284 if !ok { 285 return nil, fmt.Errorf("error getting duration") 286 } else { 287 meta.Duration = dur 288 } 289 290 return meta, nil 291} 292 293func ParseSegmentMediaDataSinkNewSampleFunc(ctx context.Context, foundThisTrack *bool) func(sink *app.Sink) gst.FlowReturn { 294 return func(sink *app.Sink) gst.FlowReturn { 295 sample := sink.PullSample() 296 if sample == nil { 297 return gst.FlowOK 298 } 299 buf := sample.GetBuffer() 300 if buf == nil { 301 return gst.FlowError 302 } 303 dur := buf.Duration().AsDuration() 304 if dur != nil && *dur > 0 { 305 *foundThisTrack = true 306 } else { 307 log.Warn(ctx, "no duration found for track", "track", sink.GetName()) 308 } 309 return gst.FlowOK 310 } 311}