Live video on the AT Protocol
1package media
2
3import (
4 "bytes"
5 "context"
6 "fmt"
7 "io"
8 "os"
9 "strconv"
10 "testing"
11
12 "github.com/go-gst/go-gst/gst"
13 "github.com/go-gst/go-gst/gst/app"
14 "github.com/stretchr/testify/require"
15 "go.uber.org/goleak"
16 "golang.org/x/sync/errgroup"
17 "stream.place/streamplace/pkg/log"
18)
19
// streamplaceTestCount is the number of concurrent pipelines started by
// TestWriterNewSample. It defaults to 50 and may be overridden through the
// STREAMPLACE_TEST_COUNT environment variable (see init).
var streamplaceTestCount = 50

// init applies the STREAMPLACE_TEST_COUNT override. An unset or empty
// variable keeps the default; a value that strconv.Atoi rejects aborts
// start-up with a panic naming the offending value.
func init() {
	raw := os.Getenv("STREAMPLACE_TEST_COUNT")
	if raw == "" {
		return
	}
	count, err := strconv.Atoi(raw)
	if err != nil {
		panic(fmt.Sprintf("STREAMPLACE_TEST_COUNT is not a number: %s", raw))
	}
	streamplaceTestCount = count
}
32
33func TestWriterNewSample(t *testing.T) {
34 ignore := goleak.IgnoreCurrent()
35 defer goleak.VerifyNone(t, ignore)
36 before := getLeakCount(t)
37 defer checkGStreamerLeaks(t, before)
38 filePath := getFixture("5sec.mp4")
39 fileInfo, err := os.Stat(filePath)
40 require.NoError(t, err)
41 fileSize := fileInfo.Size()
42 t.Logf("Test file size: %d bytes", fileSize)
43 g, ctx := errgroup.WithContext(context.Background())
44 // ctx = log.WithDebugValue(ctx, map[string]map[string]int{"func": {"TestWriterNewSample": 9}})
45 for i := 0; i < streamplaceTestCount; i++ {
46 g.Go(func() error {
47 bs := bytes.Buffer{}
48 err := writerNewSampleInner(ctx, i, &bs)
49 if err != nil {
50 return err
51 }
52 if bs.Len() != int(fileSize) {
53 return fmt.Errorf("expected %d bytes, got %d", fileSize, bs.Len())
54 }
55 return nil
56 })
57 }
58 err = g.Wait()
59 require.NoError(t, err)
60}
61
62func writerNewSampleInner(ctx context.Context, i int, w io.Writer) error {
63 ctx = log.WithLogValues(ctx, "func", "TestWriterNewSample")
64 ctx, cancel := context.WithCancel(ctx)
65 defer cancel()
66
67 pipeline, err := gst.NewPipeline(fmt.Sprintf("TestWriterNewSample-%d", i))
68 if err != nil {
69 return err
70 }
71
72 fileSrc, err := gst.NewElementWithProperties("filesrc", map[string]any{
73 "location": getFixture("5sec.mp4"),
74 })
75 if err != nil {
76 return err
77 }
78 err = pipeline.Add(fileSrc)
79 if err != nil {
80 return err
81 }
82
83 var busErr error
84 go func() {
85 busErr = HandleBusMessages(ctx, pipeline)
86 cancel()
87 }()
88
89 appSink, err := gst.NewElementWithProperties("appsink", map[string]any{
90 "name": fmt.Sprintf("TestWriterNewSample-appsink-%d", i),
91 "sync": false,
92 })
93 if err != nil {
94 return err
95 }
96 err = pipeline.Add(appSink)
97 if err != nil {
98 return err
99 }
100
101 sink := app.SinkFromElement(appSink)
102 sink.SetCallbacks(&app.SinkCallbacks{
103 NewSampleFunc: WriterNewSample(ctx, w),
104 })
105
106 err = fileSrc.Link(appSink)
107 if err != nil {
108 return err
109 }
110
111 err = pipeline.SetState(gst.StatePlaying)
112 if err != nil {
113 return err
114 }
115
116 <-ctx.Done()
117
118 err = pipeline.SetState(gst.StateNull)
119 if err != nil {
120 return err
121 }
122
123 return busErr
124}