Live video on the AT Protocol
at next 101 lines 2.1 kB view raw
1package media 2 3import ( 4 "context" 5 "fmt" 6 "testing" 7 8 "github.com/go-gst/go-gst/gst" 9 "github.com/go-gst/go-gst/gst/app" 10 "github.com/stretchr/testify/require" 11 "golang.org/x/sync/errgroup" 12 "stream.place/streamplace/pkg/log" 13) 14 15func TestBusHandlerCleanup(t *testing.T) { 16 withNoGSTLeaks(t, func() { 17 18 g, ctx := errgroup.WithContext(context.Background()) 19 ctx = log.WithDebugValue(ctx, map[string]map[string]int{"func": {"TestBusHandler": 9}}) 20 for i := range streamplaceTestCount { 21 g.Go(func() error { 22 err := testBusHandlerCleanupInner(ctx, i) 23 if err == nil { 24 return fmt.Errorf("expected error") 25 } 26 return nil 27 }) 28 } 29 err := g.Wait() 30 require.NoError(t, err) 31 }) 32} 33 34func testBusHandlerCleanupInner(ctx context.Context, i int) error { 35 ctx = log.WithLogValues(ctx, "func", "TestBusHandler") 36 ctx, cancel := context.WithCancel(ctx) 37 defer cancel() 38 39 pipeline, err := gst.NewPipeline(fmt.Sprintf("TestBusHandler-%d", i)) 40 if err != nil { 41 return err 42 } 43 44 busDone := make(chan struct{}) 45 go func() { 46 _ = HandleBusMessages(ctx, pipeline) 47 busDone <- struct{}{} 48 cancel() 49 }() 50 51 defer func() { 52 cancel() 53 <-busDone 54 err = pipeline.SetState(gst.StateNull) 55 if err != nil { 56 panic(fmt.Sprintf("failed to set state to null: %s", err)) 57 } 58 }() 59 60 fileSrc, err := gst.NewElementWithProperties("filesrc", map[string]any{ 61 "location": getFixture("5sec.mp4"), 62 }) 63 if err != nil { 64 return err 65 } 66 err = pipeline.Add(fileSrc) 67 if err != nil { 68 return err 69 } 70 71 demux, err := gst.NewElementWithProperties("qtdemux", map[string]any{ 72 "name": fmt.Sprintf("TestBusHandler-qtdemux-%d", i), 73 }) 74 if err != nil { 75 return err 76 } 77 err = pipeline.Add(demux) 78 if err != nil { 79 return err 80 } 81 82 appSink, err := gst.NewElementWithProperties("appsink", map[string]any{ 83 "name": fmt.Sprintf("TestBusHandler-appsink-%d", i), 84 "sync": false, 85 }) 86 if err != nil { 87 return err 88 } 89 err = pipeline.Add(appSink) 90 if err != nil { 91 return err 92 } 93 94 sink := app.SinkFromElement(appSink) 95 sink.SetCallbacks(&app.SinkCallbacks{ 96 NewSampleFunc: WriterNewSample(ctx, nil), 97 }) 98 99 return fmt.Errorf("test error") 100 101}