Live video on the AT Protocol
at eli/postgres 214 lines 5.4 kB view raw
1package media 2 3import ( 4 "bytes" 5 "context" 6 "fmt" 7 "strconv" 8 "strings" 9 10 "github.com/go-gst/go-gst/gst" 11 "github.com/go-gst/go-gst/gst/app" 12 "go.opentelemetry.io/otel" 13 "stream.place/streamplace/pkg/log" 14 "stream.place/streamplace/pkg/model" 15) 16 17func ParseSegmentMediaData(ctx context.Context, mp4bs []byte) (*model.SegmentMediaData, error) { 18 ctx, span := otel.Tracer("signer").Start(ctx, "ParseSegmentMediaData") 19 defer span.End() 20 ctx = log.WithLogValues(ctx, "GStreamerFunc", "ParseSegmentMediaData") 21 ctx, cancel := context.WithCancel(ctx) 22 defer cancel() 23 pipelineSlice := []string{ 24 "appsrc name=appsrc ! qtdemux name=demux", 25 "demux.video_0 ! queue ! h264parse name=videoparse disable-passthrough=true config-interval=-1 ! h264timestamper ! appsink sync=false name=videoappsink", 26 "demux.audio_0 ! queue ! opusparse name=audioparse ! appsink sync=false name=audioappsink", 27 } 28 29 pipeline, err := gst.NewPipelineFromString(strings.Join(pipelineSlice, "\n")) 30 if err != nil { 31 return nil, fmt.Errorf("error creating SegmentMetadata pipeline: %w", err) 32 } 33 34 var videoMetadata *model.SegmentMediadataVideo 35 var audioMetadata *model.SegmentMediadataAudio 36 37 appsrc, err := pipeline.GetElementByName("appsrc") 38 if err != nil { 39 return nil, fmt.Errorf("error creating SegmentMetadata pipeline: %w", err) 40 } 41 42 src := app.SrcFromElement(appsrc) 43 src.SetCallbacks(&app.SourceCallbacks{ 44 NeedDataFunc: ReaderNeedData(ctx, bytes.NewReader(mp4bs)), 45 }) 46 47 onPadAdded := func(element *gst.Element, pad *gst.Pad) { 48 caps := pad.GetCurrentCaps() 49 if caps == nil { 50 log.Warn(ctx, "Unable to get pad caps") 51 cancel() 52 return 53 } 54 55 structure := caps.GetStructureAt(0) 56 if structure == nil { 57 log.Warn(ctx, "Unable to get structure from caps") 58 cancel() 59 return 60 } 61 62 name := structure.Name() 63 64 if name[:5] == "video" { 65 videoMetadata = &model.SegmentMediadataVideo{} 66 // Get some common video properties 67 widthVal, _ := structure.GetValue("width") 68 heightVal, _ := structure.GetValue("height") 69 70 width, ok := widthVal.(int) 71 if ok { 72 videoMetadata.Width = width 73 } 74 height, ok := heightVal.(int) 75 if ok { 76 videoMetadata.Height = height 77 } 78 framerateVal, _ := structure.GetValue("framerate") 79 framerateStr := fmt.Sprintf("%v", framerateVal) 80 parts := strings.Split(framerateStr, "/") 81 num := 0 82 den := 0 83 if len(parts) == 2 { 84 num, _ = strconv.Atoi(parts[0]) 85 den, _ = strconv.Atoi(parts[1]) 86 } 87 if num != 0 && den != 0 { 88 videoMetadata.FPSNum = num 89 videoMetadata.FPSDen = den 90 } 91 } 92 93 if name[:5] == "audio" { 94 audioMetadata = &model.SegmentMediadataAudio{} 95 // Get some common audio properties 96 rateVal, _ := structure.GetValue("rate") 97 channelsVal, _ := structure.GetValue("channels") 98 99 rate, ok := rateVal.(int) 100 if ok { 101 audioMetadata.Rate = rate 102 } 103 channels, ok := channelsVal.(int) 104 if ok { 105 audioMetadata.Channels = channels 106 } 107 } 108 109 // if videoMetadata != nil && audioMetadata != nil { 110 // cancel() 111 // } 112 } 113 114 demux, err := pipeline.GetElementByName("demux") 115 if err != nil { 116 return nil, fmt.Errorf("error creating SegmentMetadata pipeline: %w", err) 117 } 118 _, err = demux.Connect("pad-added", onPadAdded) 119 if err != nil { 120 return nil, fmt.Errorf("error connecting pad-add: %w", err) 121 } 122 123 audioSinkElem, err := pipeline.GetElementByName("audioappsink") 124 if err != nil { 125 return nil, fmt.Errorf("failed to get audioappsink element: %w", err) 126 } 127 audioSink := app.SinkFromElement(audioSinkElem) 128 if audioSink == nil { 129 return nil, fmt.Errorf("failed to get audioappsink element: %w", err) 130 } 131 132 audioSink.SetCallbacks(&app.SinkCallbacks{ 133 NewSampleFunc: func(sink *app.Sink) gst.FlowReturn { 134 sample := sink.PullSample() 135 if sample == nil { 136 return gst.FlowOK 137 } 138 139 return gst.FlowOK 140 }, 141 }) 142 143 videoSinkElem, err := pipeline.GetElementByName("videoappsink") 144 if err != nil { 145 return nil, fmt.Errorf("failed to get videoappsink element: %w", err) 146 } 147 videoSink := app.SinkFromElement(videoSinkElem) 148 if videoSink == nil { 149 return nil, fmt.Errorf("failed to get videoappsink element: %w", err) 150 } 151 152 hasBFrames := false 153 videoSink.SetCallbacks(&app.SinkCallbacks{ 154 NewSampleFunc: func(sink *app.Sink) gst.FlowReturn { 155 sample := sink.PullSample() 156 if sample == nil { 157 return gst.FlowOK 158 } 159 160 buf := sample.GetBuffer() 161 pts := buf.PresentationTimestamp().String() 162 dts := buf.DecodingTimestamp().String() 163 164 if pts != dts { 165 hasBFrames = true 166 } 167 168 return gst.FlowOK 169 }, 170 }) 171 172 go func() { 173 if err := HandleBusMessages(ctx, pipeline); err != nil { 174 log.Log(ctx, "pipeline error", "error", err) 175 } 176 cancel() 177 }() 178 179 // Start the pipeline 180 if err := pipeline.SetState(gst.StatePlaying); err != nil { 181 return nil, err 182 } 183 184 defer func() { 185 if err := pipeline.BlockSetState(gst.StateNull); err != nil { 186 log.Error(ctx, "error setting pipeline state to null", "error", err) 187 } 188 }() 189 190 <-ctx.Done() 191 192 if videoMetadata == nil { 193 return nil, fmt.Errorf("no video metadata") 194 } 195 if audioMetadata == nil { 196 return nil, fmt.Errorf("no audio metadata") 197 } 198 199 videoMetadata.BFrames = hasBFrames 200 201 meta := &model.SegmentMediaData{ 202 Video: []*model.SegmentMediadataVideo{videoMetadata}, 203 Audio: []*model.SegmentMediadataAudio{audioMetadata}, 204 } 205 206 ok, dur := pipeline.QueryDuration(gst.FormatTime) 207 if !ok { 208 return nil, fmt.Errorf("error getting duration") 209 } else { 210 meta.Duration = dur 211 } 212 213 return meta, nil 214}