// Live video on the AT Protocol
1package media
2
3import (
4 "bytes"
5 "context"
6 "fmt"
7 "io"
8 "os"
9 "testing"
10
11 "github.com/go-gst/go-gst/gst"
12 "github.com/go-gst/go-gst/gst/app"
13 "github.com/stretchr/testify/require"
14 "go.uber.org/goleak"
15 "golang.org/x/sync/errgroup"
16 "stream.place/streamplace/pkg/gstinit"
17 "stream.place/streamplace/pkg/log"
18 "stream.place/streamplace/pkg/media/segchanman"
19)
20
21func TestConcatDemuxBin(t *testing.T) {
22 gstinit.InitGST()
23 before := getLeakCount(t)
24 defer checkGStreamerLeaks(t, before)
25 ignore := goleak.IgnoreCurrent()
26 defer goleak.VerifyNone(t, ignore)
27
28 g, _ := errgroup.WithContext(context.Background())
29 for range streamplaceTestCount {
30 g.Go(func() error {
31 return innerTestConcatDemuxBin(t)
32 })
33 }
34 err := g.Wait()
35 require.NoError(t, err)
36}
37
38// This function remains in scope for the duration of a single users' playback
39func innerTestConcatDemuxBin(t *testing.T) error {
40 ctx := log.WithDebugValue(context.Background(), map[string]map[string]int{"func": {"ConcatStream": 9, "TestConcat2": 9, "SegDemuxBin": 9}})
41 ctx = log.WithLogValues(ctx, "func", "TestConcat2")
42
43 pipeline, err := gst.NewPipeline("TestConcat2")
44 if err != nil {
45 return fmt.Errorf("failed to create pipeline: %w", err)
46 }
47
48 ctx, cancel := context.WithCancel(ctx)
49
50 errCh := make(chan error)
51 go func() {
52 err := HandleBusMessages(ctx, pipeline)
53 cancel()
54 errCh <- err
55 close(errCh)
56 }()
57
58 defer func() {
59 cancel()
60 err := <-errCh
61 if err != nil {
62 t.Errorf("bus handler error: %v", err)
63 }
64 err = pipeline.BlockSetState(gst.StateNull)
65 if err != nil {
66 t.Errorf("failed to set pipeline to null state: %v", err)
67 }
68 }()
69
70 filename := getFixture("sample-segment.mp4")
71 inputFile, err := os.Open(filename)
72 if err != nil {
73 return fmt.Errorf("failed to open fixture file: %w", err)
74 }
75 defer inputFile.Close()
76
77 bs, err := io.ReadAll(inputFile)
78 if err != nil {
79 return fmt.Errorf("failed to read fixture file: %w", err)
80 }
81
82 testSeg := &segchanman.Seg{
83 Data: bs,
84 Filepath: filename,
85 }
86
87 concatBin, err := ConcatDemuxBin(ctx, testSeg)
88 if err != nil {
89 return fmt.Errorf("failed to create concat bin: %w", err)
90 }
91
92 err = pipeline.Add(concatBin.Element)
93 if err != nil {
94 return fmt.Errorf("failed to add concat bin to pipeline: %w", err)
95 }
96
97 videoPad := concatBin.GetStaticPad("video_0")
98 if videoPad == nil {
99 return fmt.Errorf("video pad not found")
100 }
101
102 audioPad := concatBin.GetStaticPad("audio_0")
103 if audioPad == nil {
104 return fmt.Errorf("audio pad not found")
105 }
106
107 videoAppSink, err := gst.NewElementWithProperties("appsink", map[string]interface{}{
108 "name": "videoappsink",
109 "sync": false,
110 })
111 if err != nil {
112 return fmt.Errorf("failed to create video appsink: %w", err)
113 }
114
115 err = pipeline.Add(videoAppSink)
116 if err != nil {
117 return fmt.Errorf("failed to add video appsink to pipeline: %w", err)
118 }
119
120 videoAppSinkPadSink := videoAppSink.GetStaticPad("sink")
121 if videoAppSinkPadSink == nil {
122 return fmt.Errorf("video appsink pad not found")
123 }
124
125 audioAppSink, err := gst.NewElementWithProperties("appsink", map[string]interface{}{
126 "name": "audioappsink",
127 "sync": false,
128 })
129 if err != nil {
130 return fmt.Errorf("failed to create audio appsink: %w", err)
131 }
132
133 err = pipeline.Add(audioAppSink)
134 if err != nil {
135 return fmt.Errorf("failed to add audio appsink to pipeline: %w", err)
136 }
137
138 audioAppSinkPadSink := audioAppSink.GetStaticPad("sink")
139 if audioAppSinkPadSink == nil {
140 return fmt.Errorf("audio appsink pad not found")
141 }
142
143 ok := videoPad.Link(videoAppSinkPadSink)
144 if ok != gst.PadLinkOK {
145 return fmt.Errorf("failed to link video pad: %v", ok)
146 }
147
148 ok = audioPad.Link(audioAppSinkPadSink)
149 if ok != gst.PadLinkOK {
150 return fmt.Errorf("failed to link audio pad: %v", ok)
151 }
152
153 videoBuf := bytes.Buffer{}
154 audioBuf := bytes.Buffer{}
155
156 videoappsink := app.SinkFromElement(videoAppSink)
157 videoappsink.SetCallbacks(&app.SinkCallbacks{
158 NewSampleFunc: WriterNewSample(ctx, &videoBuf),
159 })
160
161 audioappsink := app.SinkFromElement(audioAppSink)
162 audioappsink.SetCallbacks(&app.SinkCallbacks{
163 NewSampleFunc: WriterNewSample(ctx, &audioBuf),
164 })
165
166 // Start the pipeline
167 err = pipeline.SetState(gst.StatePlaying)
168 if err != nil {
169 return fmt.Errorf("failed to set pipeline to playing state: %w", err)
170 }
171
172 <-ctx.Done()
173
174 require.Equal(t, 987248, videoBuf.Len())
175 require.Equal(t, 6440, audioBuf.Len())
176
177 return <-errCh
178}