Live video on the AT Protocol
1package media
2
3import (
4 "context"
5 "fmt"
6 "testing"
7
8 "github.com/go-gst/go-gst/gst"
9 "github.com/go-gst/go-gst/gst/app"
10 "github.com/stretchr/testify/require"
11 "go.uber.org/goleak"
12 "golang.org/x/sync/errgroup"
13 "stream.place/streamplace/pkg/log"
14)
15
16func TestBusHandlerCleanup(t *testing.T) {
17 ignore := goleak.IgnoreCurrent()
18 defer goleak.VerifyNone(t, ignore)
19 before := getLeakCount(t)
20 defer checkGStreamerLeaks(t, before)
21
22 g, ctx := errgroup.WithContext(context.Background())
23 ctx = log.WithDebugValue(ctx, map[string]map[string]int{"func": {"TestBusHandler": 9}})
24 for i := 0; i < streamplaceTestCount; i++ {
25 g.Go(func() error {
26 err := testBusHandlerCleanupInner(ctx, i)
27 if err == nil {
28 return fmt.Errorf("expected error")
29 }
30 return nil
31 })
32 }
33 err := g.Wait()
34 require.NoError(t, err)
35}
36
37func testBusHandlerCleanupInner(ctx context.Context, i int) error {
38 ctx = log.WithLogValues(ctx, "func", "TestBusHandler")
39 ctx, cancel := context.WithCancel(ctx)
40
41 pipeline, err := gst.NewPipeline(fmt.Sprintf("TestBusHandler-%d", i))
42 if err != nil {
43 return err
44 }
45
46 busDone := make(chan struct{})
47 go func() {
48 _ = HandleBusMessages(ctx, pipeline)
49 busDone <- struct{}{}
50 cancel()
51 }()
52
53 defer func() {
54 cancel()
55 <-busDone
56 err = pipeline.SetState(gst.StateNull)
57 if err != nil {
58 panic(fmt.Sprintf("failed to set state to null: %s", err))
59 }
60 }()
61
62 fileSrc, err := gst.NewElementWithProperties("filesrc", map[string]any{
63 "location": getFixture("5sec.mp4"),
64 })
65 if err != nil {
66 return err
67 }
68 err = pipeline.Add(fileSrc)
69 if err != nil {
70 return err
71 }
72
73 demux, err := gst.NewElementWithProperties("qtdemux", map[string]any{
74 "name": fmt.Sprintf("TestBusHandler-qtdemux-%d", i),
75 })
76 if err != nil {
77 return err
78 }
79 err = pipeline.Add(demux)
80 if err != nil {
81 return err
82 }
83
84 appSink, err := gst.NewElementWithProperties("appsink", map[string]any{
85 "name": fmt.Sprintf("TestBusHandler-appsink-%d", i),
86 "sync": false,
87 })
88 if err != nil {
89 return err
90 }
91 err = pipeline.Add(appSink)
92 if err != nil {
93 return err
94 }
95
96 sink := app.SinkFromElement(appSink)
97 sink.SetCallbacks(&app.SinkCallbacks{
98 NewSampleFunc: WriterNewSample(ctx, nil),
99 })
100
101 return fmt.Errorf("test error")
102
103}