Live video on the AT Protocol
1package media
2
3import (
4 "bytes"
5 "context"
6 "fmt"
7 "io"
8 "os"
9 "testing"
10 "time"
11
12 "github.com/go-gst/go-gst/gst"
13 "github.com/go-gst/go-gst/gst/app"
14 "github.com/google/uuid"
15 "github.com/stretchr/testify/require"
16 "go.uber.org/goleak"
17 "golang.org/x/sync/errgroup"
18 "stream.place/streamplace/pkg/gstinit"
19 "stream.place/streamplace/pkg/log"
20 "stream.place/streamplace/pkg/media/segchanman"
21)
22
23func TestConcatBin(t *testing.T) {
24 gstinit.InitGST()
25 before := getLeakCount(t)
26 defer checkGStreamerLeaks(t, before)
27 ignore := goleak.IgnoreCurrent()
28 defer goleak.VerifyNone(t, ignore)
29
30 g, _ := errgroup.WithContext(context.Background())
31 for range streamplaceTestCount {
32 g.Go(func() error {
33 return innerTestConcatBin(t)
34 })
35 }
36 err := g.Wait()
37 require.NoError(t, err)
38}
39
40// This function remains in scope for the duration of a single users' playback
41func innerTestConcatBin(t *testing.T) error {
42 ctx := log.WithDebugValue(context.Background(), map[string]map[string]int{"func": {"ConcatStream": 9, "ConcatBin": 9, "SegDemuxBin": 9}})
43 tag := os.Getenv("TEST_TAG")
44 uuid, _ := uuid.NewV7()
45 uuidStr := uuid.String()
46 if tag != "" {
47 ctx = log.WithLogValues(ctx, "tag", tag)
48 uuidStr = fmt.Sprintf("%s-%s", tag, uuidStr)
49 }
50 ctx = log.WithLogValues(ctx, "func", "ConcatBin", "uuid", uuidStr)
51
52 pipeline, err := gst.NewPipeline("TestConcatBin")
53 if err != nil {
54 return fmt.Errorf("failed to create pipeline: %w", err)
55 }
56
57 ctx, cancel := context.WithCancel(ctx)
58
59 errCh := make(chan error)
60 go func() {
61 err := HandleBusMessages(ctx, pipeline)
62 cancel()
63 errCh <- err
64 close(errCh)
65 }()
66
67 defer func() {
68 cancel()
69 err := <-errCh
70 require.NoError(t, err, fmt.Sprintf("uuid: %s", uuidStr))
71 err = pipeline.BlockSetState(gst.StateNull)
72 require.NoError(t, err, fmt.Sprintf("uuid: %s", uuidStr))
73 }()
74
75 filename := getFixture("sample-segment.mp4")
76 inputFile, err := os.Open(filename)
77 if err != nil {
78 return fmt.Errorf("failed to open fixture file: %w", err)
79 }
80 defer inputFile.Close()
81
82 bs, err := io.ReadAll(inputFile)
83 if err != nil {
84 return fmt.Errorf("failed to read fixture file: %w", err)
85 }
86
87 testSegs := []*segchanman.Seg{}
88 for range 5 {
89 testSegs = append(testSegs, &segchanman.Seg{
90 Data: bs,
91 Filepath: filename,
92 })
93 }
94
95 segCh := make(chan *segchanman.Seg)
96 go func() {
97 for _, seg := range testSegs {
98 segCh <- seg
99 }
100 close(segCh)
101 }()
102
103 concatBin, err := ConcatBin(ctx, segCh)
104 if err != nil {
105 return fmt.Errorf("failed to create concat bin: %w", err)
106 }
107
108 err = pipeline.Add(concatBin.Element)
109 if err != nil {
110 return fmt.Errorf("failed to add concat bin to pipeline: %w", err)
111 }
112
113 videoPad := concatBin.GetStaticPad("video_0")
114 if videoPad == nil {
115 return fmt.Errorf("video pad not found")
116 }
117
118 audioPad := concatBin.GetStaticPad("audio_0")
119 if audioPad == nil {
120 return fmt.Errorf("audio pad not found")
121 }
122
123 videoAppSink, err := gst.NewElementWithProperties("appsink", map[string]interface{}{
124 "name": "videoappsink",
125 "sync": false,
126 })
127 if err != nil {
128 return fmt.Errorf("failed to create video appsink: %w", err)
129 }
130
131 err = pipeline.Add(videoAppSink)
132 if err != nil {
133 return fmt.Errorf("failed to add video appsink to pipeline: %w", err)
134 }
135
136 videoAppSinkPadSink := videoAppSink.GetStaticPad("sink")
137 if videoAppSinkPadSink == nil {
138 return fmt.Errorf("video appsink pad not found")
139 }
140
141 audioAppSink, err := gst.NewElementWithProperties("appsink", map[string]interface{}{
142 "name": "audioappsink",
143 "sync": false,
144 })
145 if err != nil {
146 return fmt.Errorf("failed to create audio appsink: %w", err)
147 }
148
149 err = pipeline.Add(audioAppSink)
150 if err != nil {
151 return fmt.Errorf("failed to add audio appsink to pipeline: %w", err)
152 }
153
154 audioAppSinkPadSink := audioAppSink.GetStaticPad("sink")
155 if audioAppSinkPadSink == nil {
156 return fmt.Errorf("audio appsink pad not found")
157 }
158
159 ok := videoPad.Link(videoAppSinkPadSink)
160 if ok != gst.PadLinkOK {
161 return fmt.Errorf("failed to link video pad: %v", ok)
162 }
163
164 ok = audioPad.Link(audioAppSinkPadSink)
165 if ok != gst.PadLinkOK {
166 return fmt.Errorf("failed to link audio pad: %v", ok)
167 }
168
169 videoBuf := bytes.Buffer{}
170 audioBuf := bytes.Buffer{}
171
172 videoappsink := app.SinkFromElement(videoAppSink)
173 videoappsink.SetCallbacks(&app.SinkCallbacks{
174 NewSampleFunc: WriterNewSample(ctx, &videoBuf),
175 })
176
177 audioappsink := app.SinkFromElement(audioAppSink)
178 audioappsink.SetCallbacks(&app.SinkCallbacks{
179 NewSampleFunc: WriterNewSample(ctx, &audioBuf),
180 })
181
182 // Start the pipeline
183 err = pipeline.SetState(gst.StatePlaying)
184 if err != nil {
185 return fmt.Errorf("failed to set pipeline to playing state: %w", err)
186 }
187
188 // Start a goroutine to print buffer sizes
189 go func() {
190 for {
191 select {
192 case <-ctx.Done():
193 return
194 case <-time.After(1 * time.Second):
195 log.Debug(ctx, "buffer sizes",
196 "videoBuf", videoBuf.Len(),
197 "audioBuf", audioBuf.Len())
198 }
199 }
200 }()
201
202 <-ctx.Done()
203
204 time.Sleep(5 * time.Second)
205
206 padIdleCh := make(chan struct{})
207
208 padIdle := func(pad *gst.Pad, info *gst.PadProbeInfo) gst.PadProbeReturn {
209 log.Debug(ctx, "pad-idle", "name", pad.GetName(), "direction", pad.GetDirection())
210 go func() {
211 padIdleCh <- struct{}{}
212 }()
213 return gst.PadProbeRemove
214 }
215
216 videoAppSinkPadSink.AddProbe(gst.PadProbeTypeIdle, padIdle)
217 audioAppSinkPadSink.AddProbe(gst.PadProbeTypeIdle, padIdle)
218
219 <-padIdleCh
220 <-padIdleCh
221
222 require.Equal(t, 4936240, videoBuf.Len(), fmt.Sprintf("uuid: %s", uuidStr))
223 require.Equal(t, 32200, audioBuf.Len(), fmt.Sprintf("uuid: %s", uuidStr))
224
225 return <-errCh
226}