Live video on the AT Protocol
at eli/github-skip-darwin 104 lines 2.2 kB view raw
1package media 2 3import ( 4 "context" 5 "fmt" 6 "testing" 7 8 "github.com/go-gst/go-gst/gst" 9 "github.com/go-gst/go-gst/gst/app" 10 "github.com/stretchr/testify/require" 11 "go.uber.org/goleak" 12 "golang.org/x/sync/errgroup" 13 "stream.place/streamplace/pkg/log" 14) 15 16func TestBusHandlerCleanup(t *testing.T) { 17 ignore := goleak.IgnoreCurrent() 18 defer goleak.VerifyNone(t, ignore) 19 before := getLeakCount(t) 20 defer checkGStreamerLeaks(t, before) 21 22 g, ctx := errgroup.WithContext(context.Background()) 23 ctx = log.WithDebugValue(ctx, map[string]map[string]int{"func": {"TestBusHandler": 9}}) 24 for i := range streamplaceTestCount { 25 g.Go(func() error { 26 err := testBusHandlerCleanupInner(ctx, i) 27 if err == nil { 28 return fmt.Errorf("expected error") 29 } 30 return nil 31 }) 32 } 33 err := g.Wait() 34 require.NoError(t, err) 35} 36 37func testBusHandlerCleanupInner(ctx context.Context, i int) error { 38 ctx = log.WithLogValues(ctx, "func", "TestBusHandler") 39 ctx, cancel := context.WithCancel(ctx) 40 defer cancel() 41 42 pipeline, err := gst.NewPipeline(fmt.Sprintf("TestBusHandler-%d", i)) 43 if err != nil { 44 return err 45 } 46 47 busDone := make(chan struct{}) 48 go func() { 49 _ = HandleBusMessages(ctx, pipeline) 50 busDone <- struct{}{} 51 cancel() 52 }() 53 54 defer func() { 55 cancel() 56 <-busDone 57 err = pipeline.SetState(gst.StateNull) 58 if err != nil { 59 panic(fmt.Sprintf("failed to set state to null: %s", err)) 60 } 61 }() 62 63 fileSrc, err := gst.NewElementWithProperties("filesrc", map[string]any{ 64 "location": getFixture("5sec.mp4"), 65 }) 66 if err != nil { 67 return err 68 } 69 err = pipeline.Add(fileSrc) 70 if err != nil { 71 return err 72 } 73 74 demux, err := gst.NewElementWithProperties("qtdemux", map[string]any{ 75 "name": fmt.Sprintf("TestBusHandler-qtdemux-%d", i), 76 }) 77 if err != nil { 78 return err 79 } 80 err = pipeline.Add(demux) 81 if err != nil { 82 return err 83 } 84 85 appSink, err := gst.NewElementWithProperties("appsink", map[string]any{ 86 "name": fmt.Sprintf("TestBusHandler-appsink-%d", i), 87 "sync": false, 88 }) 89 if err != nil { 90 return err 91 } 92 err = pipeline.Add(appSink) 93 if err != nil { 94 return err 95 } 96 97 sink := app.SinkFromElement(appSink) 98 sink.SetCallbacks(&app.SinkCallbacks{ 99 NewSampleFunc: WriterNewSample(ctx, nil), 100 }) 101 102 return fmt.Errorf("test error") 103 104}