Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

at eli/components 178 lines 4.4 kB view raw
1package media 2 3import ( 4 "bytes" 5 "context" 6 "fmt" 7 "io" 8 "os" 9 "testing" 10 11 "github.com/go-gst/go-gst/gst" 12 "github.com/go-gst/go-gst/gst/app" 13 "github.com/stretchr/testify/require" 14 "go.uber.org/goleak" 15 "golang.org/x/sync/errgroup" 16 "stream.place/streamplace/pkg/gstinit" 17 "stream.place/streamplace/pkg/log" 18 "stream.place/streamplace/pkg/media/segchanman" 19) 20 21func TestConcatDemuxBin(t *testing.T) { 22 gstinit.InitGST() 23 before := getLeakCount(t) 24 defer checkGStreamerLeaks(t, before) 25 ignore := goleak.IgnoreCurrent() 26 defer goleak.VerifyNone(t, ignore) 27 28 g, _ := errgroup.WithContext(context.Background()) 29 for range streamplaceTestCount { 30 g.Go(func() error { 31 return innerTestConcatDemuxBin(t) 32 }) 33 } 34 err := g.Wait() 35 require.NoError(t, err) 36} 37 38// This function remains in scope for the duration of a single users' playback 39func innerTestConcatDemuxBin(t *testing.T) error { 40 ctx := log.WithDebugValue(context.Background(), map[string]map[string]int{"func": {"ConcatStream": 9, "TestConcat2": 9, "SegDemuxBin": 9}}) 41 ctx = log.WithLogValues(ctx, "func", "TestConcat2") 42 43 pipeline, err := gst.NewPipeline("TestConcat2") 44 if err != nil { 45 return fmt.Errorf("failed to create pipeline: %w", err) 46 } 47 48 ctx, cancel := context.WithCancel(ctx) 49 50 errCh := make(chan error) 51 go func() { 52 err := HandleBusMessages(ctx, pipeline) 53 cancel() 54 errCh <- err 55 close(errCh) 56 }() 57 58 defer func() { 59 cancel() 60 err := <-errCh 61 if err != nil { 62 t.Errorf("bus handler error: %v", err) 63 } 64 err = pipeline.BlockSetState(gst.StateNull) 65 if err != nil { 66 t.Errorf("failed to set pipeline to null state: %v", err) 67 } 68 }() 69 70 filename := getFixture("sample-segment.mp4") 71 inputFile, err := os.Open(filename) 72 if err != nil { 73 return fmt.Errorf("failed to open fixture file: %w", err) 74 } 75 defer inputFile.Close() 76 77 bs, err := io.ReadAll(inputFile) 78 if err != nil { 79 return fmt.Errorf("failed to read fixture file: %w", err) 80 } 81 82 testSeg := &segchanman.Seg{ 83 Data: bs, 84 Filepath: filename, 85 } 86 87 concatBin, err := ConcatDemuxBin(ctx, testSeg) 88 if err != nil { 89 return fmt.Errorf("failed to create concat bin: %w", err) 90 } 91 92 err = pipeline.Add(concatBin.Element) 93 if err != nil { 94 return fmt.Errorf("failed to add concat bin to pipeline: %w", err) 95 } 96 97 videoPad := concatBin.GetStaticPad("video_0") 98 if videoPad == nil { 99 return fmt.Errorf("video pad not found") 100 } 101 102 audioPad := concatBin.GetStaticPad("audio_0") 103 if audioPad == nil { 104 return fmt.Errorf("audio pad not found") 105 } 106 107 videoAppSink, err := gst.NewElementWithProperties("appsink", map[string]interface{}{ 108 "name": "videoappsink", 109 "sync": false, 110 }) 111 if err != nil { 112 return fmt.Errorf("failed to create video appsink: %w", err) 113 } 114 115 err = pipeline.Add(videoAppSink) 116 if err != nil { 117 return fmt.Errorf("failed to add video appsink to pipeline: %w", err) 118 } 119 120 videoAppSinkPadSink := videoAppSink.GetStaticPad("sink") 121 if videoAppSinkPadSink == nil { 122 return fmt.Errorf("video appsink pad not found") 123 } 124 125 audioAppSink, err := gst.NewElementWithProperties("appsink", map[string]interface{}{ 126 "name": "audioappsink", 127 "sync": false, 128 }) 129 if err != nil { 130 return fmt.Errorf("failed to create audio appsink: %w", err) 131 } 132 133 err = pipeline.Add(audioAppSink) 134 if err != nil { 135 return fmt.Errorf("failed to add audio appsink to pipeline: %w", err) 136 } 137 138 audioAppSinkPadSink := audioAppSink.GetStaticPad("sink") 139 if audioAppSinkPadSink == nil { 140 return fmt.Errorf("audio appsink pad not found") 141 } 142 143 ok := videoPad.Link(videoAppSinkPadSink) 144 if ok != gst.PadLinkOK { 145 return fmt.Errorf("failed to link video pad: %v", ok) 146 } 147 148 ok = audioPad.Link(audioAppSinkPadSink) 149 if ok != gst.PadLinkOK { 150 return fmt.Errorf("failed to link audio pad: %v", ok) 151 } 152 153 videoBuf := bytes.Buffer{} 154 audioBuf := bytes.Buffer{} 155 156 videoappsink := app.SinkFromElement(videoAppSink) 157 videoappsink.SetCallbacks(&app.SinkCallbacks{ 158 NewSampleFunc: WriterNewSample(ctx, &videoBuf), 159 }) 160 161 audioappsink := app.SinkFromElement(audioAppSink) 162 audioappsink.SetCallbacks(&app.SinkCallbacks{ 163 NewSampleFunc: WriterNewSample(ctx, &audioBuf), 164 }) 165 166 // Start the pipeline 167 err = pipeline.SetState(gst.StatePlaying) 168 if err != nil { 169 return fmt.Errorf("failed to set pipeline to playing state: %w", err) 170 } 171 172 <-ctx.Done() 173 174 require.Equal(t, 987248, videoBuf.Len()) 175 require.Equal(t, 6440, audioBuf.Len()) 176 177 return <-errCh 178}