Live video on the AT Protocol
1package media
2
3import (
4 "bytes"
5 "context"
6 "fmt"
7 "io"
8 "os"
9 "testing"
10
11 "github.com/go-gst/go-gst/gst"
12 "github.com/go-gst/go-gst/gst/app"
13 "github.com/stretchr/testify/require"
14 "golang.org/x/sync/errgroup"
15 "stream.place/streamplace/pkg/bus"
16 "stream.place/streamplace/pkg/log"
17)
18
19func TestConcatDemuxBin(t *testing.T) {
20 withNoGSTLeaks(t, func() {
21 g, _ := errgroup.WithContext(context.Background())
22 for range streamplaceTestCount {
23 g.Go(func() error {
24 return innerTestConcatDemuxBin(t)
25 })
26 }
27 err := g.Wait()
28 require.NoError(t, err)
29 })
30}
31
32// This function remains in scope for the duration of a single users' playback
33func innerTestConcatDemuxBin(t *testing.T) error {
34 ctx := log.WithDebugValue(context.Background(), map[string]map[string]int{"func": {"ConcatStream": 9, "TestConcat2": 9, "SegDemuxBin": 9}})
35 ctx = log.WithLogValues(ctx, "func", "TestConcat2")
36
37 pipeline, err := gst.NewPipeline("TestConcat2")
38 if err != nil {
39 return fmt.Errorf("failed to create pipeline: %w", err)
40 }
41
42 ctx, cancel := context.WithCancel(ctx)
43 defer cancel()
44
45 errCh := make(chan error)
46 go func() {
47 err := HandleBusMessages(ctx, pipeline)
48 errCh <- err
49 }()
50
51 filename := getFixture("sample-segment.mp4")
52 inputFile, err := os.Open(filename)
53 if err != nil {
54 return fmt.Errorf("failed to open fixture file: %w", err)
55 }
56 defer inputFile.Close()
57
58 bs, err := io.ReadAll(inputFile)
59 if err != nil {
60 return fmt.Errorf("failed to read fixture file: %w", err)
61 }
62
63 testSeg := &bus.Seg{
64 Data: bs,
65 Filepath: filename,
66 }
67
68 concatBin, err := ConcatDemuxBin(ctx, testSeg)
69 if err != nil {
70 return fmt.Errorf("failed to create concat bin: %w", err)
71 }
72
73 err = pipeline.Add(concatBin.Element)
74 if err != nil {
75 return fmt.Errorf("failed to add concat bin to pipeline: %w", err)
76 }
77
78 videoPad := concatBin.GetStaticPad("video_0")
79 if videoPad == nil {
80 return fmt.Errorf("video pad not found")
81 }
82
83 audioPad := concatBin.GetStaticPad("audio_0")
84 if audioPad == nil {
85 return fmt.Errorf("audio pad not found")
86 }
87
88 videoAppSink, err := gst.NewElementWithProperties("appsink", map[string]interface{}{
89 "name": "videoappsink",
90 "sync": false,
91 })
92 if err != nil {
93 return fmt.Errorf("failed to create video appsink: %w", err)
94 }
95
96 err = pipeline.Add(videoAppSink)
97 if err != nil {
98 return fmt.Errorf("failed to add video appsink to pipeline: %w", err)
99 }
100
101 videoAppSinkPadSink := videoAppSink.GetStaticPad("sink")
102 if videoAppSinkPadSink == nil {
103 return fmt.Errorf("video appsink pad not found")
104 }
105
106 audioAppSink, err := gst.NewElementWithProperties("appsink", map[string]interface{}{
107 "name": "audioappsink",
108 "sync": false,
109 })
110 if err != nil {
111 return fmt.Errorf("failed to create audio appsink: %w", err)
112 }
113
114 err = pipeline.Add(audioAppSink)
115 if err != nil {
116 return fmt.Errorf("failed to add audio appsink to pipeline: %w", err)
117 }
118
119 audioAppSinkPadSink := audioAppSink.GetStaticPad("sink")
120 if audioAppSinkPadSink == nil {
121 return fmt.Errorf("audio appsink pad not found")
122 }
123
124 ok := videoPad.Link(videoAppSinkPadSink)
125 if ok != gst.PadLinkOK {
126 return fmt.Errorf("failed to link video pad: %v", ok)
127 }
128
129 ok = audioPad.Link(audioAppSinkPadSink)
130 if ok != gst.PadLinkOK {
131 return fmt.Errorf("failed to link audio pad: %v", ok)
132 }
133
134 videoBuf := bytes.Buffer{}
135 audioBuf := bytes.Buffer{}
136
137 videoappsink := app.SinkFromElement(videoAppSink)
138 videoappsink.SetCallbacks(&app.SinkCallbacks{
139 NewSampleFunc: WriterNewSample(ctx, &videoBuf),
140 })
141
142 audioappsink := app.SinkFromElement(audioAppSink)
143 audioappsink.SetCallbacks(&app.SinkCallbacks{
144 NewSampleFunc: WriterNewSample(ctx, &audioBuf),
145 })
146
147 // Start the pipeline
148 err = pipeline.SetState(gst.StatePlaying)
149 if err != nil {
150 return fmt.Errorf("failed to set pipeline to playing state: %w", err)
151 }
152
153 defer func() {
154 if err != nil {
155 t.Errorf("bus handler error: %v", err)
156 }
157 err = pipeline.BlockSetState(gst.StateNull)
158 if err != nil {
159 t.Errorf("failed to set pipeline to null state: %v", err)
160 }
161 require.Equal(t, 987248, videoBuf.Len())
162 require.Equal(t, 6440, audioBuf.Len())
163 }()
164
165 return <-errCh
166}