Live video on the AT Protocol
at eli/fix-postgres-locking 221 lines 5.3 kB view raw
1package media 2 3import ( 4 "bytes" 5 "context" 6 "fmt" 7 "io" 8 "os" 9 "testing" 10 "time" 11 12 "github.com/go-gst/go-gst/gst" 13 "github.com/go-gst/go-gst/gst/app" 14 "github.com/google/uuid" 15 "github.com/stretchr/testify/require" 16 "golang.org/x/sync/errgroup" 17 "stream.place/streamplace/pkg/bus" 18 "stream.place/streamplace/pkg/log" 19) 20 21func TestConcatBin(t *testing.T) { 22 withNoGSTLeaks(t, func() { 23 24 g, _ := errgroup.WithContext(context.Background()) 25 for range streamplaceTestCount { 26 g.Go(func() error { 27 return innerTestConcatBin(t) 28 }) 29 } 30 err := g.Wait() 31 require.NoError(t, err) 32 }) 33} 34 35// This function remains in scope for the duration of a single users' playback 36func innerTestConcatBin(t *testing.T) error { 37 ctx := log.WithDebugValue(context.Background(), map[string]map[string]int{"func": {"ConcatStream": 9, "ConcatBin": 9, "SegDemuxBin": 9}}) 38 tag := os.Getenv("TEST_TAG") 39 uuid, _ := uuid.NewV7() 40 uuidStr := uuid.String() 41 if tag != "" { 42 ctx = log.WithLogValues(ctx, "tag", tag) 43 uuidStr = fmt.Sprintf("%s-%s", tag, uuidStr) 44 } 45 ctx = log.WithLogValues(ctx, "func", "ConcatBin", "uuid", uuidStr) 46 47 pipeline, err := gst.NewPipeline("TestConcatBin") 48 if err != nil { 49 return fmt.Errorf("failed to create pipeline: %w", err) 50 } 51 52 ctx, cancel := context.WithCancel(ctx) 53 54 errCh := make(chan error) 55 go func() { 56 err := HandleBusMessages(ctx, pipeline) 57 cancel() 58 errCh <- err 59 close(errCh) 60 }() 61 62 defer func() { 63 cancel() 64 err := <-errCh 65 require.NoError(t, err, fmt.Sprintf("uuid: %s", uuidStr)) 66 err = pipeline.BlockSetState(gst.StateNull) 67 require.NoError(t, err, fmt.Sprintf("uuid: %s", uuidStr)) 68 }() 69 70 filename := getFixture("sample-segment.mp4") 71 inputFile, err := os.Open(filename) 72 if err != nil { 73 return fmt.Errorf("failed to open fixture file: %w", err) 74 } 75 defer inputFile.Close() 76 77 bs, err := io.ReadAll(inputFile) 78 if err != nil { 79 return fmt.Errorf("failed to read fixture file: %w", err) 80 } 81 82 testSegs := []*bus.Seg{} 83 for range 5 { 84 testSegs = append(testSegs, &bus.Seg{ 85 Data: bs, 86 Filepath: filename, 87 }) 88 } 89 90 segCh := make(chan *bus.Seg) 91 go func() { 92 for _, seg := range testSegs { 93 segCh <- seg 94 } 95 close(segCh) 96 }() 97 98 concatBin, err := ConcatBin(ctx, segCh) 99 if err != nil { 100 return fmt.Errorf("failed to create concat bin: %w", err) 101 } 102 103 err = pipeline.Add(concatBin.Element) 104 if err != nil { 105 return fmt.Errorf("failed to add concat bin to pipeline: %w", err) 106 } 107 108 videoPad := concatBin.GetStaticPad("video_0") 109 if videoPad == nil { 110 return fmt.Errorf("video pad not found") 111 } 112 113 audioPad := concatBin.GetStaticPad("audio_0") 114 if audioPad == nil { 115 return fmt.Errorf("audio pad not found") 116 } 117 118 videoAppSink, err := gst.NewElementWithProperties("appsink", map[string]interface{}{ 119 "name": "videoappsink", 120 "sync": false, 121 }) 122 if err != nil { 123 return fmt.Errorf("failed to create video appsink: %w", err) 124 } 125 126 err = pipeline.Add(videoAppSink) 127 if err != nil { 128 return fmt.Errorf("failed to add video appsink to pipeline: %w", err) 129 } 130 131 videoAppSinkPadSink := videoAppSink.GetStaticPad("sink") 132 if videoAppSinkPadSink == nil { 133 return fmt.Errorf("video appsink pad not found") 134 } 135 136 audioAppSink, err := gst.NewElementWithProperties("appsink", map[string]interface{}{ 137 "name": "audioappsink", 138 "sync": false, 139 }) 140 if err != nil { 141 return fmt.Errorf("failed to create audio appsink: %w", err) 142 } 143 144 err = pipeline.Add(audioAppSink) 145 if err != nil { 146 return fmt.Errorf("failed to add audio appsink to pipeline: %w", err) 147 } 148 149 audioAppSinkPadSink := audioAppSink.GetStaticPad("sink") 150 if audioAppSinkPadSink == nil { 151 return fmt.Errorf("audio appsink pad not found") 152 } 153 154 ok := videoPad.Link(videoAppSinkPadSink) 155 if ok != gst.PadLinkOK { 156 return fmt.Errorf("failed to link video pad: %v", ok) 157 } 158 159 ok = audioPad.Link(audioAppSinkPadSink) 160 if ok != gst.PadLinkOK { 161 return fmt.Errorf("failed to link audio pad: %v", ok) 162 } 163 164 videoBuf := bytes.Buffer{} 165 audioBuf := bytes.Buffer{} 166 167 videoappsink := app.SinkFromElement(videoAppSink) 168 videoappsink.SetCallbacks(&app.SinkCallbacks{ 169 NewSampleFunc: WriterNewSample(ctx, &videoBuf), 170 }) 171 172 audioappsink := app.SinkFromElement(audioAppSink) 173 audioappsink.SetCallbacks(&app.SinkCallbacks{ 174 NewSampleFunc: WriterNewSample(ctx, &audioBuf), 175 }) 176 177 // Start the pipeline 178 err = pipeline.SetState(gst.StatePlaying) 179 if err != nil { 180 return fmt.Errorf("failed to set pipeline to playing state: %w", err) 181 } 182 183 // Start a goroutine to print buffer sizes 184 go func() { 185 for { 186 select { 187 case <-ctx.Done(): 188 return 189 case <-time.After(1 * time.Second): 190 log.Debug(ctx, "buffer sizes", 191 "videoBuf", videoBuf.Len(), 192 "audioBuf", audioBuf.Len()) 193 } 194 } 195 }() 196 197 <-ctx.Done() 198 199 time.Sleep(5 * time.Second) 200 201 padIdleCh := make(chan struct{}) 202 203 padIdle := func(pad *gst.Pad, info *gst.PadProbeInfo) gst.PadProbeReturn { 204 log.Debug(ctx, "pad-idle", "name", pad.GetName(), "direction", pad.GetDirection()) 205 go func() { 206 padIdleCh <- struct{}{} 207 }() 208 return gst.PadProbeRemove 209 } 210 211 videoAppSinkPadSink.AddProbe(gst.PadProbeTypeIdle, padIdle) 212 audioAppSinkPadSink.AddProbe(gst.PadProbeTypeIdle, padIdle) 213 214 <-padIdleCh 215 <-padIdleCh 216 217 require.Equal(t, 4936240, videoBuf.Len(), fmt.Sprintf("uuid: %s", uuidStr)) 218 require.Equal(t, 32200, audioBuf.Len(), fmt.Sprintf("uuid: %s", uuidStr)) 219 220 return <-errCh 221}