Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

at natb/finally-fix-reloading 172 lines 4.2 kB view raw
1package media 2 3import ( 4 "bytes" 5 "context" 6 "fmt" 7 "io" 8 "os" 9 "testing" 10 11 "github.com/go-gst/go-gst/gst" 12 "github.com/go-gst/go-gst/gst/app" 13 "github.com/stretchr/testify/require" 14 "golang.org/x/sync/errgroup" 15 "stream.place/streamplace/pkg/bus" 16 "stream.place/streamplace/pkg/log" 17) 18 19func TestConcatDemuxBin(t *testing.T) { 20 withNoGSTLeaks(t, func() { 21 g, _ := errgroup.WithContext(context.Background()) 22 for range streamplaceTestCount { 23 g.Go(func() error { 24 return innerTestConcatDemuxBin(t) 25 }) 26 } 27 err := g.Wait() 28 require.NoError(t, err) 29 }) 30} 31 32// This function remains in scope for the duration of a single users' playback 33func innerTestConcatDemuxBin(t *testing.T) error { 34 ctx := log.WithDebugValue(context.Background(), map[string]map[string]int{"func": {"ConcatStream": 9, "TestConcat2": 9, "SegDemuxBin": 9}}) 35 ctx = log.WithLogValues(ctx, "func", "TestConcat2") 36 37 pipeline, err := gst.NewPipeline("TestConcat2") 38 if err != nil { 39 return fmt.Errorf("failed to create pipeline: %w", err) 40 } 41 42 ctx, cancel := context.WithCancel(ctx) 43 44 errCh := make(chan error) 45 go func() { 46 err := HandleBusMessages(ctx, pipeline) 47 cancel() 48 errCh <- err 49 close(errCh) 50 }() 51 52 defer func() { 53 cancel() 54 err := <-errCh 55 if err != nil { 56 t.Errorf("bus handler error: %v", err) 57 } 58 err = pipeline.BlockSetState(gst.StateNull) 59 if err != nil { 60 t.Errorf("failed to set pipeline to null state: %v", err) 61 } 62 }() 63 64 filename := getFixture("sample-segment.mp4") 65 inputFile, err := os.Open(filename) 66 if err != nil { 67 return fmt.Errorf("failed to open fixture file: %w", err) 68 } 69 defer inputFile.Close() 70 71 bs, err := io.ReadAll(inputFile) 72 if err != nil { 73 return fmt.Errorf("failed to read fixture file: %w", err) 74 } 75 76 testSeg := &bus.Seg{ 77 Data: bs, 78 Filepath: filename, 79 } 80 81 concatBin, err := ConcatDemuxBin(ctx, testSeg) 82 if err != nil { 83 return fmt.Errorf("failed to create concat bin: %w", err) 84 } 85 86 err = pipeline.Add(concatBin.Element) 87 if err != nil { 88 return fmt.Errorf("failed to add concat bin to pipeline: %w", err) 89 } 90 91 videoPad := concatBin.GetStaticPad("video_0") 92 if videoPad == nil { 93 return fmt.Errorf("video pad not found") 94 } 95 96 audioPad := concatBin.GetStaticPad("audio_0") 97 if audioPad == nil { 98 return fmt.Errorf("audio pad not found") 99 } 100 101 videoAppSink, err := gst.NewElementWithProperties("appsink", map[string]interface{}{ 102 "name": "videoappsink", 103 "sync": false, 104 }) 105 if err != nil { 106 return fmt.Errorf("failed to create video appsink: %w", err) 107 } 108 109 err = pipeline.Add(videoAppSink) 110 if err != nil { 111 return fmt.Errorf("failed to add video appsink to pipeline: %w", err) 112 } 113 114 videoAppSinkPadSink := videoAppSink.GetStaticPad("sink") 115 if videoAppSinkPadSink == nil { 116 return fmt.Errorf("video appsink pad not found") 117 } 118 119 audioAppSink, err := gst.NewElementWithProperties("appsink", map[string]interface{}{ 120 "name": "audioappsink", 121 "sync": false, 122 }) 123 if err != nil { 124 return fmt.Errorf("failed to create audio appsink: %w", err) 125 } 126 127 err = pipeline.Add(audioAppSink) 128 if err != nil { 129 return fmt.Errorf("failed to add audio appsink to pipeline: %w", err) 130 } 131 132 audioAppSinkPadSink := audioAppSink.GetStaticPad("sink") 133 if audioAppSinkPadSink == nil { 134 return fmt.Errorf("audio appsink pad not found") 135 } 136 137 ok := videoPad.Link(videoAppSinkPadSink) 138 if ok != gst.PadLinkOK { 139 return fmt.Errorf("failed to link video pad: %v", ok) 140 } 141 142 ok = audioPad.Link(audioAppSinkPadSink) 143 if ok != gst.PadLinkOK { 144 return fmt.Errorf("failed to link audio pad: %v", ok) 145 } 146 147 videoBuf := bytes.Buffer{} 148 audioBuf := bytes.Buffer{} 149 150 videoappsink := app.SinkFromElement(videoAppSink) 151 videoappsink.SetCallbacks(&app.SinkCallbacks{ 152 NewSampleFunc: WriterNewSample(ctx, &videoBuf), 153 }) 154 155 audioappsink := app.SinkFromElement(audioAppSink) 156 audioappsink.SetCallbacks(&app.SinkCallbacks{ 157 NewSampleFunc: WriterNewSample(ctx, &audioBuf), 158 }) 159 160 // Start the pipeline 161 err = pipeline.SetState(gst.StatePlaying) 162 if err != nil { 163 return fmt.Errorf("failed to set pipeline to playing state: %w", err) 164 } 165 166 <-ctx.Done() 167 168 require.Equal(t, 987248, videoBuf.Len()) 169 require.Equal(t, 6440, audioBuf.Len()) 170 171 return <-errCh 172}