Live video on the AT Protocol
1package media
2
3import (
4 "context"
5 "fmt"
6 "testing"
7
8 "github.com/go-gst/go-gst/gst"
9 "github.com/go-gst/go-gst/gst/app"
10 "github.com/stretchr/testify/require"
11 "go.uber.org/goleak"
12 "golang.org/x/sync/errgroup"
13 "stream.place/streamplace/pkg/log"
14)
15
16func TestBusHandlerCleanup(t *testing.T) {
17 ignore := goleak.IgnoreCurrent()
18 defer goleak.VerifyNone(t, ignore)
19 before := getLeakCount(t)
20 defer checkGStreamerLeaks(t, before)
21
22 g, ctx := errgroup.WithContext(context.Background())
23 ctx = log.WithDebugValue(ctx, map[string]map[string]int{"func": {"TestBusHandler": 9}})
24 for i := range streamplaceTestCount {
25 g.Go(func() error {
26 err := testBusHandlerCleanupInner(ctx, i)
27 if err == nil {
28 return fmt.Errorf("expected error")
29 }
30 return nil
31 })
32 }
33 err := g.Wait()
34 require.NoError(t, err)
35}
36
37func testBusHandlerCleanupInner(ctx context.Context, i int) error {
38 ctx = log.WithLogValues(ctx, "func", "TestBusHandler")
39 ctx, cancel := context.WithCancel(ctx)
40 defer cancel()
41
42 pipeline, err := gst.NewPipeline(fmt.Sprintf("TestBusHandler-%d", i))
43 if err != nil {
44 return err
45 }
46
47 busDone := make(chan struct{})
48 go func() {
49 _ = HandleBusMessages(ctx, pipeline)
50 busDone <- struct{}{}
51 cancel()
52 }()
53
54 defer func() {
55 cancel()
56 <-busDone
57 err = pipeline.SetState(gst.StateNull)
58 if err != nil {
59 panic(fmt.Sprintf("failed to set state to null: %s", err))
60 }
61 }()
62
63 fileSrc, err := gst.NewElementWithProperties("filesrc", map[string]any{
64 "location": getFixture("5sec.mp4"),
65 })
66 if err != nil {
67 return err
68 }
69 err = pipeline.Add(fileSrc)
70 if err != nil {
71 return err
72 }
73
74 demux, err := gst.NewElementWithProperties("qtdemux", map[string]any{
75 "name": fmt.Sprintf("TestBusHandler-qtdemux-%d", i),
76 })
77 if err != nil {
78 return err
79 }
80 err = pipeline.Add(demux)
81 if err != nil {
82 return err
83 }
84
85 appSink, err := gst.NewElementWithProperties("appsink", map[string]any{
86 "name": fmt.Sprintf("TestBusHandler-appsink-%d", i),
87 "sync": false,
88 })
89 if err != nil {
90 return err
91 }
92 err = pipeline.Add(appSink)
93 if err != nil {
94 return err
95 }
96
97 sink := app.SinkFromElement(appSink)
98 sink.SetCallbacks(&app.SinkCallbacks{
99 NewSampleFunc: WriterNewSample(ctx, nil),
100 })
101
102 return fmt.Errorf("test error")
103
104}