Live video on the AT Protocol

Merge pull request #597 from streamplace/eli/better-bframe-detection

media: fix bframe detection

authored by

Eli Mallon and committed by
GitHub
bfda565f e948662a

+124 -50
+3 -2
Makefile
··· 180 180 -D "gst-plugins-ugly:gpl=enabled" \ 181 181 -D "x264:asm=enabled" \ 182 182 -D "gstreamer-full:gst-full=enabled" \ 183 - -D "gstreamer-full:gst-full-plugins=libgstopusparse.a;libgstcodectimestamper.a;libgstrtp.a;libgstaudioresample.a;libgstlibav.a;libgstmatroska.a;libgstmultifile.a;libgstjpeg.a;libgstaudiotestsrc.a;libgstaudioconvert.a;libgstaudioparsers.a;libgstfdkaac.a;libgstisomp4.a;libgstapp.a;libgstvideoconvertscale.a;libgstvideobox.a;libgstvideorate.a;libgstpng.a;libgstcompositor.a;libgstaudiorate.a;libgstx264.a;libgstopus.a;libgstvideotestsrc.a;libgstvideoparsersbad.a;libgstaudioparsers.a;libgstmpegtsmux.a;libgstmpegtsdemux.a;libgstplayback.a;libgsttypefindfunctions.a;libgstcoretracers.a" \ 183 + -D "gstreamer-full:gst-full-plugins=libgstopusparse.a;libgstcodectimestamper.a;libgstrtp.a;libgstaudioresample.a;libgstlibav.a;libgstmatroska.a;libgstmultifile.a;libgstjpeg.a;libgstaudiotestsrc.a;libgstaudioconvert.a;libgstaudioparsers.a;libgstfdkaac.a;libgstisomp4.a;libgstapp.a;libgstvideoconvertscale.a;libgstvideobox.a;libgstvideorate.a;libgstpng.a;libgstcompositor.a;libgstaudiorate.a;libgstx264.a;libgstopus.a;libgstvideotestsrc.a;libgstvideoparsersbad.a;libgstaudioparsers.a;libgstmpegtsmux.a;libgstmpegtsdemux.a;libgstplayback.a;libgsttypefindfunctions.a;libgstcoretracers.a;libgstcodec2json.a" \ 184 184 -D "gstreamer-full:gst-full-libraries=gstreamer-controller-1.0,gstreamer-plugins-base-1.0,gstreamer-pbutils-1.0" \ 185 185 -D "gstreamer-full:gst-full-elements=coreelements:concat,filesrc,filesink,queue,queue2,multiqueue,typefind,tee,capsfilter,fakesink,identity" \ 186 186 -D "gstreamer-full:bad=enabled" \ ··· 199 199 -D "glib:glib_assert=false" \ 200 200 -D "glib:glib_assert=false" \ 201 201 -D "gst-libav:glib_assert=false" \ 202 - -D "gst-plugins-good:adaptivedemux2=disabled" 202 + -D "gst-plugins-good:adaptivedemux2=disabled" \ 203 + -D "gst-plugins-bad:codec2json=enabled" 203 204 204 205 STATIC_OPTS = \ 205 206 $(BASE_OPTS) \
+6
pkg/config/config.go
··· 117 117 PLCURL string 118 118 SQLLogging bool 119 119 SentryDSN string 120 + LivepeerDebug bool 120 121 } 121 122 122 123 func (cli *CLI) NewFlagSet(name string) *flag.FlagSet { ··· 182 183 fs.StringVar(&cli.PLCURL, "plc-url", "https://plc.directory", "url of the plc directory") 183 184 fs.BoolVar(&cli.SQLLogging, "sql-logging", false, "enable sql logging") 184 185 fs.StringVar(&cli.SentryDSN, "sentry-dsn", "", "sentry dsn for error reporting") 186 + fs.BoolVar(&cli.LivepeerDebug, "livepeer-debug", false, "log livepeer segments to $SP_DATA_DIR/livepeer-debug") 185 187 186 188 lpFlags := flag.NewFlagSet("livepeer", flag.ContinueOnError) 187 189 _ = starter.NewLivepeerConfig(lpFlags) ··· 271 273 } 272 274 if cli.LivepeerGateway { 273 275 gatewayPath := cli.DataFilePath([]string{"livepeer", "gateway"}) 276 + err = fs.Set("livepeer.rtmp-addr", "127.0.0.1:0") 277 + if err != nil { 278 + return err 279 + } 274 280 err = fs.Set("livepeer.data-dir", gatewayPath) 275 281 if err != nil { 276 282 return err
+1 -1
pkg/director/stream_session.go
··· 381 381 382 382 if ss.lp == nil { 383 383 var err error 384 - ss.lp, err = livepeer.NewLivepeerSession(ctx, spseg.Creator, ss.cli.LivepeerGatewayURL) 384 + ss.lp, err = livepeer.NewLivepeerSession(ctx, ss.cli, spseg.Creator, ss.cli.LivepeerGatewayURL) 385 385 if err != nil { 386 386 return err 387 387 }
+40 -3
pkg/livepeer/livepeer.go
··· 10 10 "mime" 11 11 "mime/multipart" 12 12 "net/http" 13 + "os" 13 14 "strings" 14 15 "time" 15 16 16 17 "golang.org/x/net/context/ctxhttp" 17 18 "stream.place/streamplace/pkg/aqhttp" 19 + "stream.place/streamplace/pkg/config" 18 20 "stream.place/streamplace/pkg/log" 19 21 "stream.place/streamplace/pkg/media" 20 22 "stream.place/streamplace/pkg/renditions" ··· 29 31 Count int 30 32 GatewayURL string 31 33 Guard chan struct{} 34 + CLI *config.CLI 32 35 } 33 36 34 37 // borrowed from catalyst-api ··· 42 45 return string(res) 43 46 } 44 47 45 - func NewLivepeerSession(ctx context.Context, did string, gatewayURL string) (*LivepeerSession, error) { 46 - sessionID := RandomTrailer(8) 48 + func NewLivepeerSession(ctx context.Context, cli *config.CLI, did string, gatewayURL string) (*LivepeerSession, error) { 49 + sessionID := fmt.Sprintf("%s-%s", did, RandomTrailer(8)) 50 + sessionID = strings.ReplaceAll(sessionID, ":", "") 51 + sessionID = strings.ReplaceAll(sessionID, ".", "") 47 52 return &LivepeerSession{ 48 - SessionID: strings.ReplaceAll(fmt.Sprintf("%s-%s", did, sessionID), ":", ""), 53 + SessionID: sessionID, 49 54 Count: 0, 50 55 GatewayURL: gatewayURL, 51 56 Guard: make(chan struct{}, SegmentsInFlight), 57 + CLI: cli, 52 58 }, nil 53 59 } 54 60 ··· 107 113 req.Header.Set("Content-Resolution", fmt.Sprintf("%dx%d", width, height)) 108 114 req.Header.Set("Livepeer-Transcode-Configuration", string(bs)) 109 115 116 + if ls.CLI.LivepeerDebug { 117 + debugDir := ls.CLI.DataFilePath([]string{"livepeer-debug"}) 118 + err = os.MkdirAll(debugDir, 0755) 119 + if err != nil { 120 + return nil, fmt.Errorf("failed to create debug directory: %w", err) 121 + } 122 + debugFile := fmt.Sprintf("%s/livepeer-debug/%s-%06d-input.ts", ls.CLI.DataDir, sessionIDRen, seqNo) 123 + err = os.WriteFile(debugFile, tsSeg.Bytes(), 0644) 124 + if err != nil { 125 + return nil, fmt.Errorf("failed to write debug file: %w", err) 126 + } 127 + bs, err := json.MarshalIndent(req.Header, "", " ") 128 + if err != nil { 129 + return nil, fmt.Errorf("failed to marshal livepeer profile: %w", err) 130 + } 131 + configFile := fmt.Sprintf("%s/livepeer-debug/%s-%06d-config.json", ls.CLI.DataDir, sessionIDRen, seqNo) 132 + err = os.WriteFile(configFile, bs, 0644) 133 + if err != nil { 134 + return nil, fmt.Errorf("failed to write debug file: %w", err) 135 + } 136 + log.Log(ctx, "wrote debug file", "file", debugFile) 137 + } 138 + 110 139 resp, err := ctxhttp.Do(ctx, &aqhttp.Client, req) 111 140 if err != nil { 112 141 <-ls.Guard ··· 139 168 } 140 169 mp4Bs := bytes.Buffer{} 141 170 audioReader := bytes.NewReader(audioSeg.Bytes()) 171 + if ls.CLI.LivepeerDebug { 172 + debugFile := fmt.Sprintf("%s/livepeer-debug/%s-%06d-output-%s", ls.CLI.DataDir, sessionIDRen, seqNo, p.FileName()) 173 + err = os.WriteFile(debugFile, tsSeg.Bytes(), 0644) 174 + if err != nil { 175 + return nil, fmt.Errorf("failed to write debug file: %w", err) 176 + } 177 + log.Log(ctx, "wrote debug file", "file", debugFile) 178 + } 142 179 err = media.MPEGTSVideoMP4AudioToMP4(ctx, p, audioReader, &mp4Bs) 143 180 if err != nil { 144 181 return nil, fmt.Errorf("failed to convert ts to mp4: %w", err)
+56 -32
pkg/media/media_data_parser.go
··· 1 1 package media 2 2 3 3 import ( 4 + "bufio" 4 5 "bytes" 5 6 "context" 7 + "encoding/json" 6 8 "fmt" 9 + "io" 7 10 "strconv" 8 11 "strings" 9 12 ··· 22 25 defer cancel() 23 26 pipelineSlice := []string{ 24 27 "appsrc name=appsrc ! qtdemux name=demux", 25 - "demux.video_0 ! queue ! h264parse name=videoparse disable-passthrough=true config-interval=-1 ! h264timestamper ! appsink sync=false name=videoappsink", 26 - "demux.audio_0 ! queue ! opusparse name=audioparse ! appsink sync=false name=audioappsink", 28 + "demux.video_0 ! queue ! h264parse name=videoparse disable-passthrough=true config-interval=-1 ! h2642json ! appsink sync=false name=jsonappsink", 29 + "demux.audio_0 ! queue ! opusparse name=audioparse ! fakesink sync=false", 27 30 } 28 31 29 32 pipeline, err := gst.NewPipelineFromString(strings.Join(pipelineSlice, "\n")) ··· 120 123 return nil, fmt.Errorf("error connecting pad-add: %w", err) 121 124 } 122 125 123 - audioSinkElem, err := pipeline.GetElementByName("audioappsink") 124 - if err != nil { 125 - return nil, fmt.Errorf("failed to get audioappsink element: %w", err) 126 - } 127 - audioSink := app.SinkFromElement(audioSinkElem) 128 - if audioSink == nil { 129 - return nil, fmt.Errorf("failed to get audioappsink element: %w", err) 130 - } 131 - 132 - audioSink.SetCallbacks(&app.SinkCallbacks{ 133 - NewSampleFunc: func(sink *app.Sink) gst.FlowReturn { 134 - sample := sink.PullSample() 135 - if sample == nil { 136 - return gst.FlowOK 137 - } 138 - 139 - return gst.FlowOK 140 - }, 141 - }) 142 - 143 - videoSinkElem, err := pipeline.GetElementByName("videoappsink") 126 + jsonSinkElem, err := pipeline.GetElementByName("jsonappsink") 144 127 if err != nil { 145 128 return nil, fmt.Errorf("failed to get videoappsink element: %w", err) 146 129 } 147 - videoSink := app.SinkFromElement(videoSinkElem) 148 - if videoSink == nil { 130 + jsonSink := app.SinkFromElement(jsonSinkElem) 131 + if jsonSink == nil { 149 132 return nil, fmt.Errorf("failed to get videoappsink element: %w", err) 150 133 } 151 134 152 135 hasBFrames := false 153 - videoSink.SetCallbacks(&app.SinkCallbacks{ 136 + 137 + r, w := io.Pipe() 138 + bufW := bufio.NewWriter(w) 139 + decoder := json.NewDecoder(r) 140 + 141 + decodeErr := make(chan error) 142 + go func() { 143 + for { 144 + var obj map[string]any 145 + err := decoder.Decode(&obj) 146 + if err == io.EOF { 147 + decodeErr <- nil 148 + break // End of stream 149 + } 150 + if err != nil { 151 + decodeErr <- err 152 + break 153 + } 154 + // https://github.com/GStreamer/gstreamer/blob/68fa54c7616b93d5b7cc5febaa388546fcd617e0/subprojects/gst-plugins-bad/ext/codec2json/gsth2642json.c#L836 155 + header, ok := obj["slice header"].(map[string]any) 156 + if !ok { 157 + continue 158 + } 159 + // https://github.com/GStreamer/gstreamer/blob/68fa54c7616b93d5b7cc5febaa388546fcd617e0/subprojects/gst-plugins-bad/ext/codec2json/gsth2642json.c#L622 160 + flag, ok := header["direct spatial mv pred flag"].(bool) 161 + if ok && flag { 162 + hasBFrames = true 163 + } 164 + } 165 + close(decodeErr) 166 + }() 167 + 168 + jsonSink.SetCallbacks(&app.SinkCallbacks{ 154 169 NewSampleFunc: func(sink *app.Sink) gst.FlowReturn { 155 170 sample := sink.PullSample() 156 171 if sample == nil { 157 172 return gst.FlowOK 158 173 } 159 174 160 - buf := sample.GetBuffer() 161 - pts := buf.PresentationTimestamp().String() 162 - dts := buf.DecodingTimestamp().String() 163 - 164 - if pts != dts { 165 - hasBFrames = true 175 + buf := sample.GetBuffer().Bytes() 176 + _, err := bufW.Write(buf) 177 + if err != nil { 178 + log.Error(ctx, "failed to write to buffer", "error", err) 179 + return gst.FlowError 166 180 } 167 181 168 182 return gst.FlowOK ··· 188 202 }() 189 203 190 204 <-ctx.Done() 205 + 206 + err = w.Close() 207 + if err != nil { 208 + return nil, fmt.Errorf("error closing writer: %w", err) 209 + } 210 + 211 + err = <-decodeErr 212 + if err != nil { 213 + return nil, fmt.Errorf("error decoding JSON object: %w", err) 214 + } 191 215 192 216 if videoMetadata == nil { 193 217 return nil, fmt.Errorf("no video metadata")
+18 -12
pkg/media/media_data_parser_test.go
··· 12 12 ) 13 13 14 14 func TestMediaDataParser(t *testing.T) { 15 + segmentsWithoutBFrames := []string{ 16 + remote.RemoteFixture("d63d26050db9a60c0944b4c2e2b1d052c4350a2a8a877324c7b0b7e7a0c1ae27/bframe-false-positive.mp4"), 17 + getFixture("sample-segment.mp4"), 18 + } 15 19 withNoGSTLeaks(t, func() { 16 - // Open input file 17 - inputFile, err := os.Open(getFixture("sample-segment.mp4")) 18 - require.NoError(t, err) 19 - defer inputFile.Close() 20 - bs, err := io.ReadAll(inputFile) 21 - require.NoError(t, err) 20 + for _, segment := range segmentsWithoutBFrames { 21 + // Open input file 22 + inputFile, err := os.Open(segment) 23 + require.NoError(t, err) 24 + defer inputFile.Close() 25 + bs, err := io.ReadAll(inputFile) 26 + require.NoError(t, err) 22 27 23 - ctx := log.WithDebugValue(context.Background(), map[string]map[string]int{"GStreamerFunc": {"ParseSegmentMediaData": 9}}) 24 - mediaData, err := ParseSegmentMediaData(ctx, bs) 25 - require.NoError(t, err) 26 - require.NotNil(t, mediaData) 27 - require.False(t, mediaData.Video[0].BFrames, "Video should not have BFrames") 28 - require.Greater(t, mediaData.Duration, int64(0), "Video duration should not be empty") 28 + ctx := log.WithDebugValue(context.Background(), map[string]map[string]int{"GStreamerFunc": {"ParseSegmentMediaData": 9}}) 29 + mediaData, err := ParseSegmentMediaData(ctx, bs) 30 + require.NoError(t, err) 31 + require.NotNil(t, mediaData) 32 + require.False(t, mediaData.Video[0].BFrames, "Video should not have BFrames") 33 + require.Greater(t, mediaData.Duration, int64(0), "Video duration should not be empty") 34 + } 29 35 }) 30 36 } 31 37