Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

at issue-784 165 lines 4.0 kB view raw
1package media 2 3import ( 4 "bytes" 5 "context" 6 "fmt" 7 "io" 8 "os" 9 "testing" 10 11 "github.com/go-gst/go-gst/gst" 12 "github.com/go-gst/go-gst/gst/app" 13 "github.com/stretchr/testify/require" 14 "golang.org/x/sync/errgroup" 15 "stream.place/streamplace/pkg/bus" 16 "stream.place/streamplace/pkg/log" 17) 18 19func TestConcatDemuxBin(t *testing.T) { 20 withNoGSTLeaks(t, func() { 21 g, _ := errgroup.WithContext(context.Background()) 22 for range streamplaceTestCount { 23 g.Go(func() error { 24 return innerTestConcatDemuxBin(t) 25 }) 26 } 27 err := g.Wait() 28 require.NoError(t, err) 29 }) 30} 31 32func innerTestConcatDemuxBin(t *testing.T) error { 33 ctx := log.WithDebugValue(context.Background(), map[string]map[string]int{"func": {"ConcatStream": 9, "TestConcat2": 9, "SegDemuxBin": 9}}) 34 ctx = log.WithLogValues(ctx, "func", "TestConcat2") 35 36 pipeline, err := gst.NewPipeline("TestConcat2") 37 if err != nil { 38 return fmt.Errorf("failed to create pipeline: %w", err) 39 } 40 41 ctx, cancel := context.WithCancel(ctx) 42 defer cancel() 43 44 errCh := make(chan error) 45 go func() { 46 err := HandleBusMessages(ctx, pipeline) 47 errCh <- err 48 }() 49 50 filename := getFixture("sample-segment.mp4") 51 inputFile, err := os.Open(filename) 52 if err != nil { 53 return fmt.Errorf("failed to open fixture file: %w", err) 54 } 55 defer inputFile.Close() 56 57 bs, err := io.ReadAll(inputFile) 58 if err != nil { 59 return fmt.Errorf("failed to read fixture file: %w", err) 60 } 61 62 testSeg := &bus.Seg{ 63 Data: bs, 64 Filepath: filename, 65 } 66 67 concatBin, err := ConcatDemuxBin(ctx, testSeg, true) 68 if err != nil { 69 return fmt.Errorf("failed to create concat bin: %w", err) 70 } 71 72 err = pipeline.Add(concatBin.Element) 73 if err != nil { 74 return fmt.Errorf("failed to add concat bin to pipeline: %w", err) 75 } 76 77 videoPad := concatBin.GetStaticPad("video_0") 78 if videoPad == nil { 79 return fmt.Errorf("video pad not found") 80 } 81 82 audioPad := concatBin.GetStaticPad("audio_0") 83 if audioPad == nil { 84 return fmt.Errorf("audio pad not found") 85 } 86 87 videoAppSink, err := gst.NewElementWithProperties("appsink", map[string]interface{}{ 88 "name": "videoappsink", 89 "sync": false, 90 }) 91 if err != nil { 92 return fmt.Errorf("failed to create video appsink: %w", err) 93 } 94 95 err = pipeline.Add(videoAppSink) 96 if err != nil { 97 return fmt.Errorf("failed to add video appsink to pipeline: %w", err) 98 } 99 100 videoAppSinkPadSink := videoAppSink.GetStaticPad("sink") 101 if videoAppSinkPadSink == nil { 102 return fmt.Errorf("video appsink pad not found") 103 } 104 105 audioAppSink, err := gst.NewElementWithProperties("appsink", map[string]interface{}{ 106 "name": "audioappsink", 107 "sync": false, 108 }) 109 if err != nil { 110 return fmt.Errorf("failed to create audio appsink: %w", err) 111 } 112 113 err = pipeline.Add(audioAppSink) 114 if err != nil { 115 return fmt.Errorf("failed to add audio appsink to pipeline: %w", err) 116 } 117 118 audioAppSinkPadSink := audioAppSink.GetStaticPad("sink") 119 if audioAppSinkPadSink == nil { 120 return fmt.Errorf("audio appsink pad not found") 121 } 122 123 ok := videoPad.Link(videoAppSinkPadSink) 124 if ok != gst.PadLinkOK { 125 return fmt.Errorf("failed to link video pad: %v", ok) 126 } 127 128 ok = audioPad.Link(audioAppSinkPadSink) 129 if ok != gst.PadLinkOK { 130 return fmt.Errorf("failed to link audio pad: %v", ok) 131 } 132 133 videoBuf := bytes.Buffer{} 134 audioBuf := bytes.Buffer{} 135 136 videoappsink := app.SinkFromElement(videoAppSink) 137 videoappsink.SetCallbacks(&app.SinkCallbacks{ 138 NewSampleFunc: WriterNewSample(ctx, &videoBuf), 139 }) 140 141 audioappsink := app.SinkFromElement(audioAppSink) 142 audioappsink.SetCallbacks(&app.SinkCallbacks{ 143 NewSampleFunc: WriterNewSample(ctx, &audioBuf), 144 }) 145 146 // Start the pipeline 147 err = pipeline.SetState(gst.StatePlaying) 148 if err != nil { 149 return fmt.Errorf("failed to set pipeline to playing state: %w", err) 150 } 151 152 defer func() { 153 if err != nil { 154 t.Errorf("bus handler error: %v", err) 155 } 156 err = pipeline.BlockSetState(gst.StateNull) 157 if err != nil { 158 t.Errorf("failed to set pipeline to null state: %v", err) 159 } 160 require.Equal(t, 987291, videoBuf.Len()) 161 require.Equal(t, 6440, audioBuf.Len()) 162 }() 163 164 return <-errCh 165}