Live video on the AT Protocol
1package media
2
3import (
4 "bytes"
5 "context"
6 "fmt"
7 "io"
8 "os"
9 "testing"
10 "time"
11
12 "github.com/go-gst/go-gst/gst"
13 "github.com/go-gst/go-gst/gst/app"
14 "github.com/google/uuid"
15 "github.com/stretchr/testify/require"
16 "golang.org/x/sync/errgroup"
17 "stream.place/streamplace/pkg/bus"
18 "stream.place/streamplace/pkg/log"
19)
20
21func TestConcatBin(t *testing.T) {
22 withNoGSTLeaks(t, func() {
23
24 g, _ := errgroup.WithContext(context.Background())
25 for range streamplaceTestCount {
26 g.Go(func() error {
27 return innerTestConcatBin(t)
28 })
29 }
30 err := g.Wait()
31 require.NoError(t, err)
32 })
33}
34
35// This function remains in scope for the duration of a single users' playback
36func innerTestConcatBin(t *testing.T) error {
37 ctx := log.WithDebugValue(context.Background(), map[string]map[string]int{"func": {"ConcatStream": 9, "ConcatBin": 9, "SegDemuxBin": 9}})
38 tag := os.Getenv("TEST_TAG")
39 uuid, _ := uuid.NewV7()
40 uuidStr := uuid.String()
41 if tag != "" {
42 ctx = log.WithLogValues(ctx, "tag", tag)
43 uuidStr = fmt.Sprintf("%s-%s", tag, uuidStr)
44 }
45 ctx = log.WithLogValues(ctx, "func", "ConcatBin", "uuid", uuidStr)
46
47 pipeline, err := gst.NewPipeline("TestConcatBin")
48 if err != nil {
49 return fmt.Errorf("failed to create pipeline: %w", err)
50 }
51
52 ctx, cancel := context.WithCancel(ctx)
53
54 errCh := make(chan error)
55 go func() {
56 err := HandleBusMessages(ctx, pipeline)
57 cancel()
58 errCh <- err
59 close(errCh)
60 }()
61
62 defer func() {
63 cancel()
64 err := <-errCh
65 require.NoError(t, err, fmt.Sprintf("uuid: %s", uuidStr))
66 err = pipeline.BlockSetState(gst.StateNull)
67 require.NoError(t, err, fmt.Sprintf("uuid: %s", uuidStr))
68 }()
69
70 filename := getFixture("sample-segment.mp4")
71 inputFile, err := os.Open(filename)
72 if err != nil {
73 return fmt.Errorf("failed to open fixture file: %w", err)
74 }
75 defer inputFile.Close()
76
77 bs, err := io.ReadAll(inputFile)
78 if err != nil {
79 return fmt.Errorf("failed to read fixture file: %w", err)
80 }
81
82 testSegs := []*bus.Seg{}
83 for range 5 {
84 testSegs = append(testSegs, &bus.Seg{
85 Data: bs,
86 Filepath: filename,
87 })
88 }
89
90 segCh := make(chan *bus.Seg)
91 go func() {
92 for _, seg := range testSegs {
93 segCh <- seg
94 }
95 close(segCh)
96 }()
97
98 concatBin, err := ConcatBin(ctx, segCh)
99 if err != nil {
100 return fmt.Errorf("failed to create concat bin: %w", err)
101 }
102
103 err = pipeline.Add(concatBin.Element)
104 if err != nil {
105 return fmt.Errorf("failed to add concat bin to pipeline: %w", err)
106 }
107
108 videoPad := concatBin.GetStaticPad("video_0")
109 if videoPad == nil {
110 return fmt.Errorf("video pad not found")
111 }
112
113 audioPad := concatBin.GetStaticPad("audio_0")
114 if audioPad == nil {
115 return fmt.Errorf("audio pad not found")
116 }
117
118 videoAppSink, err := gst.NewElementWithProperties("appsink", map[string]interface{}{
119 "name": "videoappsink",
120 "sync": false,
121 })
122 if err != nil {
123 return fmt.Errorf("failed to create video appsink: %w", err)
124 }
125
126 err = pipeline.Add(videoAppSink)
127 if err != nil {
128 return fmt.Errorf("failed to add video appsink to pipeline: %w", err)
129 }
130
131 videoAppSinkPadSink := videoAppSink.GetStaticPad("sink")
132 if videoAppSinkPadSink == nil {
133 return fmt.Errorf("video appsink pad not found")
134 }
135
136 audioAppSink, err := gst.NewElementWithProperties("appsink", map[string]interface{}{
137 "name": "audioappsink",
138 "sync": false,
139 })
140 if err != nil {
141 return fmt.Errorf("failed to create audio appsink: %w", err)
142 }
143
144 err = pipeline.Add(audioAppSink)
145 if err != nil {
146 return fmt.Errorf("failed to add audio appsink to pipeline: %w", err)
147 }
148
149 audioAppSinkPadSink := audioAppSink.GetStaticPad("sink")
150 if audioAppSinkPadSink == nil {
151 return fmt.Errorf("audio appsink pad not found")
152 }
153
154 ok := videoPad.Link(videoAppSinkPadSink)
155 if ok != gst.PadLinkOK {
156 return fmt.Errorf("failed to link video pad: %v", ok)
157 }
158
159 ok = audioPad.Link(audioAppSinkPadSink)
160 if ok != gst.PadLinkOK {
161 return fmt.Errorf("failed to link audio pad: %v", ok)
162 }
163
164 videoBuf := bytes.Buffer{}
165 audioBuf := bytes.Buffer{}
166
167 videoappsink := app.SinkFromElement(videoAppSink)
168 videoappsink.SetCallbacks(&app.SinkCallbacks{
169 NewSampleFunc: WriterNewSample(ctx, &videoBuf),
170 })
171
172 audioappsink := app.SinkFromElement(audioAppSink)
173 audioappsink.SetCallbacks(&app.SinkCallbacks{
174 NewSampleFunc: WriterNewSample(ctx, &audioBuf),
175 })
176
177 // Start the pipeline
178 err = pipeline.SetState(gst.StatePlaying)
179 if err != nil {
180 return fmt.Errorf("failed to set pipeline to playing state: %w", err)
181 }
182
183 // Start a goroutine to print buffer sizes
184 go func() {
185 for {
186 select {
187 case <-ctx.Done():
188 return
189 case <-time.After(1 * time.Second):
190 log.Debug(ctx, "buffer sizes",
191 "videoBuf", videoBuf.Len(),
192 "audioBuf", audioBuf.Len())
193 }
194 }
195 }()
196
197 <-ctx.Done()
198
199 time.Sleep(5 * time.Second)
200
201 padIdleCh := make(chan struct{})
202
203 padIdle := func(pad *gst.Pad, info *gst.PadProbeInfo) gst.PadProbeReturn {
204 log.Debug(ctx, "pad-idle", "name", pad.GetName(), "direction", pad.GetDirection())
205 go func() {
206 padIdleCh <- struct{}{}
207 }()
208 return gst.PadProbeRemove
209 }
210
211 videoAppSinkPadSink.AddProbe(gst.PadProbeTypeIdle, padIdle)
212 audioAppSinkPadSink.AddProbe(gst.PadProbeTypeIdle, padIdle)
213
214 <-padIdleCh
215 <-padIdleCh
216
217 require.Equal(t, 4936240, videoBuf.Len(), fmt.Sprintf("uuid: %s", uuidStr))
218 require.Equal(t, 32200, audioBuf.Len(), fmt.Sprintf("uuid: %s", uuidStr))
219
220 return <-errCh
221}