Live video on the AT Protocol
1package media
2
3import (
4 "context"
5 "fmt"
6 "testing"
7
8 "github.com/go-gst/go-gst/gst"
9 "github.com/go-gst/go-gst/gst/app"
10 "github.com/stretchr/testify/require"
11 "golang.org/x/sync/errgroup"
12 "stream.place/streamplace/pkg/log"
13)
14
15func TestBusHandlerCleanup(t *testing.T) {
16 withNoGSTLeaks(t, func() {
17
18 g, ctx := errgroup.WithContext(context.Background())
19 ctx = log.WithDebugValue(ctx, map[string]map[string]int{"func": {"TestBusHandler": 9}})
20 for i := range streamplaceTestCount {
21 g.Go(func() error {
22 err := testBusHandlerCleanupInner(ctx, i)
23 if err == nil {
24 return fmt.Errorf("expected error")
25 }
26 return nil
27 })
28 }
29 err := g.Wait()
30 require.NoError(t, err)
31 })
32}
33
34func testBusHandlerCleanupInner(ctx context.Context, i int) error {
35 ctx = log.WithLogValues(ctx, "func", "TestBusHandler")
36 ctx, cancel := context.WithCancel(ctx)
37 defer cancel()
38
39 pipeline, err := gst.NewPipeline(fmt.Sprintf("TestBusHandler-%d", i))
40 if err != nil {
41 return err
42 }
43
44 busDone := make(chan struct{})
45 go func() {
46 _ = HandleBusMessages(ctx, pipeline)
47 busDone <- struct{}{}
48 cancel()
49 }()
50
51 defer func() {
52 cancel()
53 <-busDone
54 err = pipeline.SetState(gst.StateNull)
55 if err != nil {
56 panic(fmt.Sprintf("failed to set state to null: %s", err))
57 }
58 }()
59
60 fileSrc, err := gst.NewElementWithProperties("filesrc", map[string]any{
61 "location": getFixture("5sec.mp4"),
62 })
63 if err != nil {
64 return err
65 }
66 err = pipeline.Add(fileSrc)
67 if err != nil {
68 return err
69 }
70
71 demux, err := gst.NewElementWithProperties("qtdemux", map[string]any{
72 "name": fmt.Sprintf("TestBusHandler-qtdemux-%d", i),
73 })
74 if err != nil {
75 return err
76 }
77 err = pipeline.Add(demux)
78 if err != nil {
79 return err
80 }
81
82 appSink, err := gst.NewElementWithProperties("appsink", map[string]any{
83 "name": fmt.Sprintf("TestBusHandler-appsink-%d", i),
84 "sync": false,
85 })
86 if err != nil {
87 return err
88 }
89 err = pipeline.Add(appSink)
90 if err != nil {
91 return err
92 }
93
94 sink := app.SinkFromElement(appSink)
95 sink.SetCallbacks(&app.SinkCallbacks{
96 NewSampleFunc: WriterNewSample(ctx, nil),
97 })
98
99 return fmt.Errorf("test error")
100
101}