Live video on the AT Protocol
1package media
2
3import (
4 "bytes"
5 "context"
6 "fmt"
7 "io"
8 "os"
9 "testing"
10
11 "github.com/go-gst/go-gst/gst"
12 "github.com/go-gst/go-gst/gst/app"
13 "github.com/stretchr/testify/require"
14 "golang.org/x/sync/errgroup"
15 "stream.place/streamplace/pkg/bus"
16 "stream.place/streamplace/pkg/log"
17)
18
19func TestConcatDemuxBin(t *testing.T) {
20 withNoGSTLeaks(t, func() {
21 g, _ := errgroup.WithContext(context.Background())
22 for range streamplaceTestCount {
23 g.Go(func() error {
24 return innerTestConcatDemuxBin(t)
25 })
26 }
27 err := g.Wait()
28 require.NoError(t, err)
29 })
30}
31
32func innerTestConcatDemuxBin(t *testing.T) error {
33 ctx := log.WithDebugValue(context.Background(), map[string]map[string]int{"func": {"ConcatStream": 9, "TestConcat2": 9, "SegDemuxBin": 9}})
34 ctx = log.WithLogValues(ctx, "func", "TestConcat2")
35
36 pipeline, err := gst.NewPipeline("TestConcat2")
37 if err != nil {
38 return fmt.Errorf("failed to create pipeline: %w", err)
39 }
40
41 ctx, cancel := context.WithCancel(ctx)
42 defer cancel()
43
44 errCh := make(chan error)
45 go func() {
46 err := HandleBusMessages(ctx, pipeline)
47 errCh <- err
48 }()
49
50 filename := getFixture("sample-segment.mp4")
51 inputFile, err := os.Open(filename)
52 if err != nil {
53 return fmt.Errorf("failed to open fixture file: %w", err)
54 }
55 defer inputFile.Close()
56
57 bs, err := io.ReadAll(inputFile)
58 if err != nil {
59 return fmt.Errorf("failed to read fixture file: %w", err)
60 }
61
62 testSeg := &bus.Seg{
63 Data: bs,
64 Filepath: filename,
65 }
66
67 concatBin, err := ConcatDemuxBin(ctx, testSeg, true)
68 if err != nil {
69 return fmt.Errorf("failed to create concat bin: %w", err)
70 }
71
72 err = pipeline.Add(concatBin.Element)
73 if err != nil {
74 return fmt.Errorf("failed to add concat bin to pipeline: %w", err)
75 }
76
77 videoPad := concatBin.GetStaticPad("video_0")
78 if videoPad == nil {
79 return fmt.Errorf("video pad not found")
80 }
81
82 audioPad := concatBin.GetStaticPad("audio_0")
83 if audioPad == nil {
84 return fmt.Errorf("audio pad not found")
85 }
86
87 videoAppSink, err := gst.NewElementWithProperties("appsink", map[string]interface{}{
88 "name": "videoappsink",
89 "sync": false,
90 })
91 if err != nil {
92 return fmt.Errorf("failed to create video appsink: %w", err)
93 }
94
95 err = pipeline.Add(videoAppSink)
96 if err != nil {
97 return fmt.Errorf("failed to add video appsink to pipeline: %w", err)
98 }
99
100 videoAppSinkPadSink := videoAppSink.GetStaticPad("sink")
101 if videoAppSinkPadSink == nil {
102 return fmt.Errorf("video appsink pad not found")
103 }
104
105 audioAppSink, err := gst.NewElementWithProperties("appsink", map[string]interface{}{
106 "name": "audioappsink",
107 "sync": false,
108 })
109 if err != nil {
110 return fmt.Errorf("failed to create audio appsink: %w", err)
111 }
112
113 err = pipeline.Add(audioAppSink)
114 if err != nil {
115 return fmt.Errorf("failed to add audio appsink to pipeline: %w", err)
116 }
117
118 audioAppSinkPadSink := audioAppSink.GetStaticPad("sink")
119 if audioAppSinkPadSink == nil {
120 return fmt.Errorf("audio appsink pad not found")
121 }
122
123 ok := videoPad.Link(videoAppSinkPadSink)
124 if ok != gst.PadLinkOK {
125 return fmt.Errorf("failed to link video pad: %v", ok)
126 }
127
128 ok = audioPad.Link(audioAppSinkPadSink)
129 if ok != gst.PadLinkOK {
130 return fmt.Errorf("failed to link audio pad: %v", ok)
131 }
132
133 videoBuf := bytes.Buffer{}
134 audioBuf := bytes.Buffer{}
135
136 videoappsink := app.SinkFromElement(videoAppSink)
137 videoappsink.SetCallbacks(&app.SinkCallbacks{
138 NewSampleFunc: WriterNewSample(ctx, &videoBuf),
139 })
140
141 audioappsink := app.SinkFromElement(audioAppSink)
142 audioappsink.SetCallbacks(&app.SinkCallbacks{
143 NewSampleFunc: WriterNewSample(ctx, &audioBuf),
144 })
145
146 // Start the pipeline
147 err = pipeline.SetState(gst.StatePlaying)
148 if err != nil {
149 return fmt.Errorf("failed to set pipeline to playing state: %w", err)
150 }
151
152 defer func() {
153 if err != nil {
154 t.Errorf("bus handler error: %v", err)
155 }
156 err = pipeline.BlockSetState(gst.StateNull)
157 if err != nil {
158 t.Errorf("failed to set pipeline to null state: %v", err)
159 }
160 require.Equal(t, 987291, videoBuf.Len())
161 require.Equal(t, 6440, audioBuf.Len())
162 }()
163
164 return <-errCh
165}