Live video on the AT Protocol
at eli/node-22 103 lines 2.2 kB view raw
1package media 2 3import ( 4 "context" 5 "fmt" 6 "testing" 7 8 "github.com/go-gst/go-gst/gst" 9 "github.com/go-gst/go-gst/gst/app" 10 "github.com/stretchr/testify/require" 11 "go.uber.org/goleak" 12 "golang.org/x/sync/errgroup" 13 "stream.place/streamplace/pkg/log" 14) 15 16func TestBusHandlerCleanup(t *testing.T) { 17 ignore := goleak.IgnoreCurrent() 18 defer goleak.VerifyNone(t, ignore) 19 before := getLeakCount(t) 20 defer checkGStreamerLeaks(t, before) 21 22 g, ctx := errgroup.WithContext(context.Background()) 23 ctx = log.WithDebugValue(ctx, map[string]map[string]int{"func": {"TestBusHandler": 9}}) 24 for i := 0; i < streamplaceTestCount; i++ { 25 g.Go(func() error { 26 err := testBusHandlerCleanupInner(ctx, i) 27 if err == nil { 28 return fmt.Errorf("expected error") 29 } 30 return nil 31 }) 32 } 33 err := g.Wait() 34 require.NoError(t, err) 35} 36 37func testBusHandlerCleanupInner(ctx context.Context, i int) error { 38 ctx = log.WithLogValues(ctx, "func", "TestBusHandler") 39 ctx, cancel := context.WithCancel(ctx) 40 41 pipeline, err := gst.NewPipeline(fmt.Sprintf("TestBusHandler-%d", i)) 42 if err != nil { 43 return err 44 } 45 46 busDone := make(chan struct{}) 47 go func() { 48 _ = HandleBusMessages(ctx, pipeline) 49 busDone <- struct{}{} 50 cancel() 51 }() 52 53 defer func() { 54 cancel() 55 <-busDone 56 err = pipeline.SetState(gst.StateNull) 57 if err != nil { 58 panic(fmt.Sprintf("failed to set state to null: %s", err)) 59 } 60 }() 61 62 fileSrc, err := gst.NewElementWithProperties("filesrc", map[string]any{ 63 "location": getFixture("5sec.mp4"), 64 }) 65 if err != nil { 66 return err 67 } 68 err = pipeline.Add(fileSrc) 69 if err != nil { 70 return err 71 } 72 73 demux, err := gst.NewElementWithProperties("qtdemux", map[string]any{ 74 "name": fmt.Sprintf("TestBusHandler-qtdemux-%d", i), 75 }) 76 if err != nil { 77 return err 78 } 79 err = pipeline.Add(demux) 80 if err != nil { 81 return err 82 } 83 84 appSink, err := gst.NewElementWithProperties("appsink", map[string]any{ 85 "name": fmt.Sprintf("TestBusHandler-appsink-%d", i), 86 "sync": false, 87 }) 88 if err != nil { 89 return err 90 } 91 err = pipeline.Add(appSink) 92 if err != nil { 93 return err 94 } 95 96 sink := app.SinkFromElement(appSink) 97 sink.SetCallbacks(&app.SinkCallbacks{ 98 NewSampleFunc: WriterNewSample(ctx, nil), 99 }) 100 101 return fmt.Errorf("test error") 102 103}