Live video on the AT Protocol
1package media
2
3import (
4 "bytes"
5 "context"
6 "fmt"
7 "io"
8 "os"
9 "testing"
10
11 "github.com/go-gst/go-gst/gst"
12 "github.com/go-gst/go-gst/gst/app"
13 "github.com/stretchr/testify/require"
14 "golang.org/x/sync/errgroup"
15 "stream.place/streamplace/pkg/bus"
16 "stream.place/streamplace/pkg/log"
17)
18
19func TestConcatDemuxBin(t *testing.T) {
20 withNoGSTLeaks(t, func() {
21 g, _ := errgroup.WithContext(context.Background())
22 for range streamplaceTestCount {
23 g.Go(func() error {
24 return innerTestConcatDemuxBin(t)
25 })
26 }
27 err := g.Wait()
28 require.NoError(t, err)
29 })
30}
31
32// This function remains in scope for the duration of a single users' playback
33func innerTestConcatDemuxBin(t *testing.T) error {
34 ctx := log.WithDebugValue(context.Background(), map[string]map[string]int{"func": {"ConcatStream": 9, "TestConcat2": 9, "SegDemuxBin": 9}})
35 ctx = log.WithLogValues(ctx, "func", "TestConcat2")
36
37 pipeline, err := gst.NewPipeline("TestConcat2")
38 if err != nil {
39 return fmt.Errorf("failed to create pipeline: %w", err)
40 }
41
42 ctx, cancel := context.WithCancel(ctx)
43
44 errCh := make(chan error)
45 go func() {
46 err := HandleBusMessages(ctx, pipeline)
47 cancel()
48 errCh <- err
49 close(errCh)
50 }()
51
52 defer func() {
53 cancel()
54 err := <-errCh
55 if err != nil {
56 t.Errorf("bus handler error: %v", err)
57 }
58 err = pipeline.BlockSetState(gst.StateNull)
59 if err != nil {
60 t.Errorf("failed to set pipeline to null state: %v", err)
61 }
62 }()
63
64 filename := getFixture("sample-segment.mp4")
65 inputFile, err := os.Open(filename)
66 if err != nil {
67 return fmt.Errorf("failed to open fixture file: %w", err)
68 }
69 defer inputFile.Close()
70
71 bs, err := io.ReadAll(inputFile)
72 if err != nil {
73 return fmt.Errorf("failed to read fixture file: %w", err)
74 }
75
76 testSeg := &bus.Seg{
77 Data: bs,
78 Filepath: filename,
79 }
80
81 concatBin, err := ConcatDemuxBin(ctx, testSeg)
82 if err != nil {
83 return fmt.Errorf("failed to create concat bin: %w", err)
84 }
85
86 err = pipeline.Add(concatBin.Element)
87 if err != nil {
88 return fmt.Errorf("failed to add concat bin to pipeline: %w", err)
89 }
90
91 videoPad := concatBin.GetStaticPad("video_0")
92 if videoPad == nil {
93 return fmt.Errorf("video pad not found")
94 }
95
96 audioPad := concatBin.GetStaticPad("audio_0")
97 if audioPad == nil {
98 return fmt.Errorf("audio pad not found")
99 }
100
101 videoAppSink, err := gst.NewElementWithProperties("appsink", map[string]interface{}{
102 "name": "videoappsink",
103 "sync": false,
104 })
105 if err != nil {
106 return fmt.Errorf("failed to create video appsink: %w", err)
107 }
108
109 err = pipeline.Add(videoAppSink)
110 if err != nil {
111 return fmt.Errorf("failed to add video appsink to pipeline: %w", err)
112 }
113
114 videoAppSinkPadSink := videoAppSink.GetStaticPad("sink")
115 if videoAppSinkPadSink == nil {
116 return fmt.Errorf("video appsink pad not found")
117 }
118
119 audioAppSink, err := gst.NewElementWithProperties("appsink", map[string]interface{}{
120 "name": "audioappsink",
121 "sync": false,
122 })
123 if err != nil {
124 return fmt.Errorf("failed to create audio appsink: %w", err)
125 }
126
127 err = pipeline.Add(audioAppSink)
128 if err != nil {
129 return fmt.Errorf("failed to add audio appsink to pipeline: %w", err)
130 }
131
132 audioAppSinkPadSink := audioAppSink.GetStaticPad("sink")
133 if audioAppSinkPadSink == nil {
134 return fmt.Errorf("audio appsink pad not found")
135 }
136
137 ok := videoPad.Link(videoAppSinkPadSink)
138 if ok != gst.PadLinkOK {
139 return fmt.Errorf("failed to link video pad: %v", ok)
140 }
141
142 ok = audioPad.Link(audioAppSinkPadSink)
143 if ok != gst.PadLinkOK {
144 return fmt.Errorf("failed to link audio pad: %v", ok)
145 }
146
147 videoBuf := bytes.Buffer{}
148 audioBuf := bytes.Buffer{}
149
150 videoappsink := app.SinkFromElement(videoAppSink)
151 videoappsink.SetCallbacks(&app.SinkCallbacks{
152 NewSampleFunc: WriterNewSample(ctx, &videoBuf),
153 })
154
155 audioappsink := app.SinkFromElement(audioAppSink)
156 audioappsink.SetCallbacks(&app.SinkCallbacks{
157 NewSampleFunc: WriterNewSample(ctx, &audioBuf),
158 })
159
160 // Start the pipeline
161 err = pipeline.SetState(gst.StatePlaying)
162 if err != nil {
163 return fmt.Errorf("failed to set pipeline to playing state: %w", err)
164 }
165
166 <-ctx.Done()
167
168 require.Equal(t, 987248, videoBuf.Len())
169 require.Equal(t, 6440, audioBuf.Len())
170
171 return <-errCh
172}