Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

at eli/standalone-oproxy 124 lines 2.7 kB view raw
1package media 2 3import ( 4 "bytes" 5 "context" 6 "fmt" 7 "io" 8 "os" 9 "strconv" 10 "testing" 11 12 "github.com/go-gst/go-gst/gst" 13 "github.com/go-gst/go-gst/gst/app" 14 "github.com/stretchr/testify/require" 15 "go.uber.org/goleak" 16 "golang.org/x/sync/errgroup" 17 "stream.place/streamplace/pkg/log" 18) 19 20var streamplaceTestCount = 50 21 22func init() { 23 testRunsStr := os.Getenv("STREAMPLACE_TEST_COUNT") 24 if testRunsStr != "" { 25 var err error 26 streamplaceTestCount, err = strconv.Atoi(testRunsStr) 27 if err != nil { 28 panic(fmt.Sprintf("STREAMPLACE_TEST_COUNT is not a number: %s", testRunsStr)) 29 } 30 } 31} 32 33func TestWriterNewSample(t *testing.T) { 34 ignore := goleak.IgnoreCurrent() 35 defer goleak.VerifyNone(t, ignore) 36 before := getLeakCount(t) 37 defer checkGStreamerLeaks(t, before) 38 filePath := getFixture("5sec.mp4") 39 fileInfo, err := os.Stat(filePath) 40 require.NoError(t, err) 41 fileSize := fileInfo.Size() 42 t.Logf("Test file size: %d bytes", fileSize) 43 g, ctx := errgroup.WithContext(context.Background()) 44 // ctx = log.WithDebugValue(ctx, map[string]map[string]int{"func": {"TestWriterNewSample": 9}}) 45 for i := 0; i < streamplaceTestCount; i++ { 46 g.Go(func() error { 47 bs := bytes.Buffer{} 48 err := writerNewSampleInner(ctx, i, &bs) 49 if err != nil { 50 return err 51 } 52 if bs.Len() != int(fileSize) { 53 return fmt.Errorf("expected %d bytes, got %d", fileSize, bs.Len()) 54 } 55 return nil 56 }) 57 } 58 err = g.Wait() 59 require.NoError(t, err) 60} 61 62func writerNewSampleInner(ctx context.Context, i int, w io.Writer) error { 63 ctx = log.WithLogValues(ctx, "func", "TestWriterNewSample") 64 ctx, cancel := context.WithCancel(ctx) 65 defer cancel() 66 67 pipeline, err := gst.NewPipeline(fmt.Sprintf("TestWriterNewSample-%d", i)) 68 if err != nil { 69 return err 70 } 71 72 fileSrc, err := gst.NewElementWithProperties("filesrc", map[string]any{ 73 "location": getFixture("5sec.mp4"), 74 }) 75 if err != nil { 76 return err 77 } 78 err = pipeline.Add(fileSrc) 79 if err != nil { 80 return err 81 } 82 83 var busErr error 84 go func() { 85 busErr = HandleBusMessages(ctx, pipeline) 86 cancel() 87 }() 88 89 appSink, err := gst.NewElementWithProperties("appsink", map[string]any{ 90 "name": fmt.Sprintf("TestWriterNewSample-appsink-%d", i), 91 "sync": false, 92 }) 93 if err != nil { 94 return err 95 } 96 err = pipeline.Add(appSink) 97 if err != nil { 98 return err 99 } 100 101 sink := app.SinkFromElement(appSink) 102 sink.SetCallbacks(&app.SinkCallbacks{ 103 NewSampleFunc: WriterNewSample(ctx, w), 104 }) 105 106 err = fileSrc.Link(appSink) 107 if err != nil { 108 return err 109 } 110 111 err = pipeline.SetState(gst.StatePlaying) 112 if err != nil { 113 return err 114 } 115 116 <-ctx.Done() 117 118 err = pipeline.SetState(gst.StateNull) 119 if err != nil { 120 return err 121 } 122 123 return busErr 124}