Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

at natb/screen-sharing 166 lines 4.1 kB view raw
1package media 2 3import ( 4 "bytes" 5 "context" 6 "fmt" 7 "io" 8 "os" 9 "testing" 10 11 "github.com/go-gst/go-gst/gst" 12 "github.com/go-gst/go-gst/gst/app" 13 "github.com/stretchr/testify/require" 14 "golang.org/x/sync/errgroup" 15 "stream.place/streamplace/pkg/bus" 16 "stream.place/streamplace/pkg/log" 17) 18 19func TestConcatDemuxBin(t *testing.T) { 20 withNoGSTLeaks(t, func() { 21 g, _ := errgroup.WithContext(context.Background()) 22 for range streamplaceTestCount { 23 g.Go(func() error { 24 return innerTestConcatDemuxBin(t) 25 }) 26 } 27 err := g.Wait() 28 require.NoError(t, err) 29 }) 30} 31 32// This function remains in scope for the duration of a single users' playback 33func innerTestConcatDemuxBin(t *testing.T) error { 34 ctx := log.WithDebugValue(context.Background(), map[string]map[string]int{"func": {"ConcatStream": 9, "TestConcat2": 9, "SegDemuxBin": 9}}) 35 ctx = log.WithLogValues(ctx, "func", "TestConcat2") 36 37 pipeline, err := gst.NewPipeline("TestConcat2") 38 if err != nil { 39 return fmt.Errorf("failed to create pipeline: %w", err) 40 } 41 42 ctx, cancel := context.WithCancel(ctx) 43 defer cancel() 44 45 errCh := make(chan error) 46 go func() { 47 err := HandleBusMessages(ctx, pipeline) 48 errCh <- err 49 }() 50 51 filename := getFixture("sample-segment.mp4") 52 inputFile, err := os.Open(filename) 53 if err != nil { 54 return fmt.Errorf("failed to open fixture file: %w", err) 55 } 56 defer inputFile.Close() 57 58 bs, err := io.ReadAll(inputFile) 59 if err != nil { 60 return fmt.Errorf("failed to read fixture file: %w", err) 61 } 62 63 testSeg := &bus.Seg{ 64 Data: bs, 65 Filepath: filename, 66 } 67 68 concatBin, err := ConcatDemuxBin(ctx, testSeg) 69 if err != nil { 70 return fmt.Errorf("failed to create concat bin: %w", err) 71 } 72 73 err = pipeline.Add(concatBin.Element) 74 if err != nil { 75 return fmt.Errorf("failed to add concat bin to pipeline: %w", err) 76 } 77 78 videoPad := concatBin.GetStaticPad("video_0") 79 if videoPad == nil { 80 return fmt.Errorf("video pad not found") 81 } 82 83 audioPad := concatBin.GetStaticPad("audio_0") 84 if audioPad == nil { 85 return fmt.Errorf("audio pad not found") 86 } 87 88 videoAppSink, err := gst.NewElementWithProperties("appsink", map[string]interface{}{ 89 "name": "videoappsink", 90 "sync": false, 91 }) 92 if err != nil { 93 return fmt.Errorf("failed to create video appsink: %w", err) 94 } 95 96 err = pipeline.Add(videoAppSink) 97 if err != nil { 98 return fmt.Errorf("failed to add video appsink to pipeline: %w", err) 99 } 100 101 videoAppSinkPadSink := videoAppSink.GetStaticPad("sink") 102 if videoAppSinkPadSink == nil { 103 return fmt.Errorf("video appsink pad not found") 104 } 105 106 audioAppSink, err := gst.NewElementWithProperties("appsink", map[string]interface{}{ 107 "name": "audioappsink", 108 "sync": false, 109 }) 110 if err != nil { 111 return fmt.Errorf("failed to create audio appsink: %w", err) 112 } 113 114 err = pipeline.Add(audioAppSink) 115 if err != nil { 116 return fmt.Errorf("failed to add audio appsink to pipeline: %w", err) 117 } 118 119 audioAppSinkPadSink := audioAppSink.GetStaticPad("sink") 120 if audioAppSinkPadSink == nil { 121 return fmt.Errorf("audio appsink pad not found") 122 } 123 124 ok := videoPad.Link(videoAppSinkPadSink) 125 if ok != gst.PadLinkOK { 126 return fmt.Errorf("failed to link video pad: %v", ok) 127 } 128 129 ok = audioPad.Link(audioAppSinkPadSink) 130 if ok != gst.PadLinkOK { 131 return fmt.Errorf("failed to link audio pad: %v", ok) 132 } 133 134 videoBuf := bytes.Buffer{} 135 audioBuf := bytes.Buffer{} 136 137 videoappsink := app.SinkFromElement(videoAppSink) 138 videoappsink.SetCallbacks(&app.SinkCallbacks{ 139 NewSampleFunc: WriterNewSample(ctx, &videoBuf), 140 }) 141 142 audioappsink := app.SinkFromElement(audioAppSink) 143 audioappsink.SetCallbacks(&app.SinkCallbacks{ 144 NewSampleFunc: WriterNewSample(ctx, &audioBuf), 145 }) 146 147 // Start the pipeline 148 err = pipeline.SetState(gst.StatePlaying) 149 if err != nil { 150 return fmt.Errorf("failed to set pipeline to playing state: %w", err) 151 } 152 153 defer func() { 154 if err != nil { 155 t.Errorf("bus handler error: %v", err) 156 } 157 err = pipeline.BlockSetState(gst.StateNull) 158 if err != nil { 159 t.Errorf("failed to set pipeline to null state: %v", err) 160 } 161 require.Equal(t, 987248, videoBuf.Len()) 162 require.Equal(t, 6440, audioBuf.Len()) 163 }() 164 165 return <-errCh 166}