Live video on the AT Protocol

segment_conv: add timeout, failing test case

+39 -25
+12
pkg/media/segment_conv.go
··· 5 "fmt" 6 "io" 7 "strings" 8 9 "github.com/go-gst/go-gst/gst" 10 "github.com/go-gst/go-gst/gst/app" ··· 106 err = pipeline.SetState(gst.StateNull) 107 if err != nil { 108 log.Error(ctx, "failed to set pipeline state to null", "error", err) 109 } 110 }() 111
··· 5 "fmt" 6 "io" 7 "strings" 8 + "time" 9 10 "github.com/go-gst/go-gst/gst" 11 "github.com/go-gst/go-gst/gst/app" ··· 107 err = pipeline.SetState(gst.StateNull) 108 if err != nil { 109 log.Error(ctx, "failed to set pipeline state to null", "error", err) 110 + } 111 + }() 112 + 113 + go func() { 114 + select { 115 + case <-ctx.Done(): 116 + return 117 + case <-time.After(time.Second * 10): 118 + log.Debug(ctx, "pipeline is taking too long to start, cancelling") 119 + err := fmt.Errorf("pipeline is taking too long to start, cancelling") 120 + pipeline.Error(err.Error(), err) 121 } 122 }() 123
+27 -25
pkg/media/segment_conv_test.go
··· 19 before := getLeakCount(t) 20 defer checkGStreamerLeaks(t, before) 21 22 - // Open input file 23 - inputFile, err := os.Open(getFixture("sample-segment.mp4")) 24 - require.NoError(t, err) 25 - defer inputFile.Close() 26 27 - // Create a buffer for output 28 - buf := bytes.Buffer{} 29 30 - bs, err := io.ReadAll(inputFile) 31 - require.NoError(t, err) 32 - // Create temporary output file 33 34 - g, _ := errgroup.WithContext(context.Background()) 35 - for i := 0; i < streamplaceTestCount; i++ { 36 - g.Go(func() error { 37 - _, err := MP4ToMPEGTS(context.Background(), bytes.NewReader(bs), &buf) 38 - return err 39 - }) 40 - } 41 - err = g.Wait() 42 - require.NoError(t, err) 43 - // Convert MPEG-TS to MP4 44 45 - // Convert MP4 to MPEG-TS 46 - dur, err := MP4ToMPEGTS(context.Background(), bytes.NewReader(bs), &buf) 47 - require.NoError(t, err) 48 - require.Greater(t, dur, int64(0), "Duration should be greater than 0") 49 50 - // Verify buffer has content 51 - require.Greater(t, buf.Len(), 0, "Output buffer should not be empty") 52 } 53 54 func TestMP4ToMPEGTSInvalid(t *testing.T) {
··· 19 before := getLeakCount(t) 20 defer checkGStreamerLeaks(t, before) 21 22 + for _, file := range []string{"sample-segment.mp4", "short-video.mp4"} { 23 + // Open input file 24 + inputFile, err := os.Open(getFixture(file)) 25 + require.NoError(t, err) 26 + defer inputFile.Close() 27 28 + // Create a buffer for output 29 + buf := bytes.Buffer{} 30 31 + bs, err := io.ReadAll(inputFile) 32 + require.NoError(t, err) 33 + // Create temporary output file 34 35 + g, _ := errgroup.WithContext(context.Background()) 36 + for i := 0; i < streamplaceTestCount; i++ { 37 + g.Go(func() error { 38 + _, err := MP4ToMPEGTS(context.Background(), bytes.NewReader(bs), &buf) 39 + return err 40 + }) 41 + } 42 + err = g.Wait() 43 + require.NoError(t, err) 44 + // Convert MPEG-TS to MP4 45 46 + // Convert MP4 to MPEG-TS 47 + dur, err := MP4ToMPEGTS(context.Background(), bytes.NewReader(bs), &buf) 48 + require.NoError(t, err) 49 + require.Greater(t, dur, int64(0), "Duration should be greater than 0") 50 51 + // Verify buffer has content 52 + require.Greater(t, buf.Len(), 0, "Output buffer should not be empty") 53 + } 54 } 55 56 func TestMP4ToMPEGTSInvalid(t *testing.T) {
test/fixtures/short-video.mp4

This is a binary file and will not be displayed.