Live video on the AT Protocol
at eli/docker-deployment-docs 238 lines 6.0 kB view raw
1package media 2 3import ( 4 "bufio" 5 "bytes" 6 "context" 7 "encoding/json" 8 "fmt" 9 "io" 10 "strconv" 11 "strings" 12 13 "github.com/go-gst/go-gst/gst" 14 "github.com/go-gst/go-gst/gst/app" 15 "go.opentelemetry.io/otel" 16 "stream.place/streamplace/pkg/log" 17 "stream.place/streamplace/pkg/model" 18) 19 20func ParseSegmentMediaData(ctx context.Context, mp4bs []byte) (*model.SegmentMediaData, error) { 21 ctx, span := otel.Tracer("signer").Start(ctx, "ParseSegmentMediaData") 22 defer span.End() 23 ctx = log.WithLogValues(ctx, "GStreamerFunc", "ParseSegmentMediaData") 24 ctx, cancel := context.WithCancel(ctx) 25 defer cancel() 26 pipelineSlice := []string{ 27 "appsrc name=appsrc ! qtdemux name=demux", 28 "demux.video_0 ! queue ! h264parse name=videoparse disable-passthrough=true config-interval=-1 ! h2642json ! appsink sync=false name=jsonappsink", 29 "demux.audio_0 ! queue ! opusparse name=audioparse ! fakesink sync=false", 30 } 31 32 pipeline, err := gst.NewPipelineFromString(strings.Join(pipelineSlice, "\n")) 33 if err != nil { 34 return nil, fmt.Errorf("error creating SegmentMetadata pipeline: %w", err) 35 } 36 37 var videoMetadata *model.SegmentMediadataVideo 38 var audioMetadata *model.SegmentMediadataAudio 39 40 appsrc, err := pipeline.GetElementByName("appsrc") 41 if err != nil { 42 return nil, fmt.Errorf("error creating SegmentMetadata pipeline: %w", err) 43 } 44 45 src := app.SrcFromElement(appsrc) 46 src.SetCallbacks(&app.SourceCallbacks{ 47 NeedDataFunc: ReaderNeedData(ctx, bytes.NewReader(mp4bs)), 48 }) 49 50 onPadAdded := func(element *gst.Element, pad *gst.Pad) { 51 caps := pad.GetCurrentCaps() 52 if caps == nil { 53 log.Warn(ctx, "Unable to get pad caps") 54 cancel() 55 return 56 } 57 58 structure := caps.GetStructureAt(0) 59 if structure == nil { 60 log.Warn(ctx, "Unable to get structure from caps") 61 cancel() 62 return 63 } 64 65 name := structure.Name() 66 67 if name[:5] == "video" { 68 videoMetadata = &model.SegmentMediadataVideo{} 69 // Get some common video properties 70 widthVal, _ := structure.GetValue("width") 71 heightVal, _ := structure.GetValue("height") 72 73 width, ok := widthVal.(int) 74 if ok { 75 videoMetadata.Width = width 76 } 77 height, ok := heightVal.(int) 78 if ok { 79 videoMetadata.Height = height 80 } 81 framerateVal, _ := structure.GetValue("framerate") 82 framerateStr := fmt.Sprintf("%v", framerateVal) 83 parts := strings.Split(framerateStr, "/") 84 num := 0 85 den := 0 86 if len(parts) == 2 { 87 num, _ = strconv.Atoi(parts[0]) 88 den, _ = strconv.Atoi(parts[1]) 89 } 90 if num != 0 && den != 0 { 91 videoMetadata.FPSNum = num 92 videoMetadata.FPSDen = den 93 } 94 } 95 96 if name[:5] == "audio" { 97 audioMetadata = &model.SegmentMediadataAudio{} 98 // Get some common audio properties 99 rateVal, _ := structure.GetValue("rate") 100 channelsVal, _ := structure.GetValue("channels") 101 102 rate, ok := rateVal.(int) 103 if ok { 104 audioMetadata.Rate = rate 105 } 106 channels, ok := channelsVal.(int) 107 if ok { 108 audioMetadata.Channels = channels 109 } 110 } 111 112 // if videoMetadata != nil && audioMetadata != nil { 113 // cancel() 114 // } 115 } 116 117 demux, err := pipeline.GetElementByName("demux") 118 if err != nil { 119 return nil, fmt.Errorf("error creating SegmentMetadata pipeline: %w", err) 120 } 121 _, err = demux.Connect("pad-added", onPadAdded) 122 if err != nil { 123 return nil, fmt.Errorf("error connecting pad-add: %w", err) 124 } 125 126 jsonSinkElem, err := pipeline.GetElementByName("jsonappsink") 127 if err != nil { 128 return nil, fmt.Errorf("failed to get videoappsink element: %w", err) 129 } 130 jsonSink := app.SinkFromElement(jsonSinkElem) 131 if jsonSink == nil { 132 return nil, fmt.Errorf("failed to get videoappsink element: %w", err) 133 } 134 135 hasBFrames := false 136 137 r, w := io.Pipe() 138 bufW := bufio.NewWriter(w) 139 decoder := json.NewDecoder(r) 140 141 decodeErr := make(chan error) 142 go func() { 143 for { 144 var obj map[string]any 145 err := decoder.Decode(&obj) 146 if err == io.EOF { 147 decodeErr <- nil 148 break // End of stream 149 } 150 if err != nil { 151 decodeErr <- err 152 break 153 } 154 // https://github.com/GStreamer/gstreamer/blob/68fa54c7616b93d5b7cc5febaa388546fcd617e0/subprojects/gst-plugins-bad/ext/codec2json/gsth2642json.c#L836 155 header, ok := obj["slice header"].(map[string]any) 156 if !ok { 157 continue 158 } 159 // https://github.com/GStreamer/gstreamer/blob/68fa54c7616b93d5b7cc5febaa388546fcd617e0/subprojects/gst-plugins-bad/ext/codec2json/gsth2642json.c#L622 160 flag, ok := header["direct spatial mv pred flag"].(bool) 161 if ok && flag { 162 hasBFrames = true 163 } 164 } 165 close(decodeErr) 166 }() 167 168 jsonSink.SetCallbacks(&app.SinkCallbacks{ 169 NewSampleFunc: func(sink *app.Sink) gst.FlowReturn { 170 sample := sink.PullSample() 171 if sample == nil { 172 return gst.FlowOK 173 } 174 175 buf := sample.GetBuffer().Bytes() 176 _, err := bufW.Write(buf) 177 if err != nil { 178 log.Error(ctx, "failed to write to buffer", "error", err) 179 return gst.FlowError 180 } 181 182 return gst.FlowOK 183 }, 184 }) 185 186 go func() { 187 if err := HandleBusMessages(ctx, pipeline); err != nil { 188 log.Log(ctx, "pipeline error", "error", err) 189 } 190 cancel() 191 }() 192 193 // Start the pipeline 194 if err := pipeline.SetState(gst.StatePlaying); err != nil { 195 return nil, err 196 } 197 198 defer func() { 199 if err := pipeline.BlockSetState(gst.StateNull); err != nil { 200 log.Error(ctx, "error setting pipeline state to null", "error", err) 201 } 202 }() 203 204 <-ctx.Done() 205 206 err = w.Close() 207 if err != nil { 208 return nil, fmt.Errorf("error closing writer: %w", err) 209 } 210 211 err = <-decodeErr 212 if err != nil { 213 return nil, fmt.Errorf("error decoding JSON object: %w", err) 214 } 215 216 if videoMetadata == nil { 217 return nil, fmt.Errorf("no video metadata") 218 } 219 if audioMetadata == nil { 220 return nil, fmt.Errorf("no audio metadata") 221 } 222 223 videoMetadata.BFrames = hasBFrames 224 225 meta := &model.SegmentMediaData{ 226 Video: []*model.SegmentMediadataVideo{videoMetadata}, 227 Audio: []*model.SegmentMediadataAudio{audioMetadata}, 228 } 229 230 ok, dur := pipeline.QueryDuration(gst.FormatTime) 231 if !ok { 232 return nil, fmt.Errorf("error getting duration") 233 } else { 234 meta.Duration = dur 235 } 236 237 return meta, nil 238}