Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

at eli/database-resync 226 lines 5.5 kB view raw
1package media 2 3import ( 4 "bytes" 5 "context" 6 "fmt" 7 "io" 8 "os" 9 "testing" 10 "time" 11 12 "github.com/go-gst/go-gst/gst" 13 "github.com/go-gst/go-gst/gst/app" 14 "github.com/google/uuid" 15 "github.com/stretchr/testify/require" 16 "go.uber.org/goleak" 17 "golang.org/x/sync/errgroup" 18 "stream.place/streamplace/pkg/gstinit" 19 "stream.place/streamplace/pkg/log" 20 "stream.place/streamplace/pkg/media/segchanman" 21) 22 23func TestConcatBin(t *testing.T) { 24 gstinit.InitGST() 25 before := getLeakCount(t) 26 defer checkGStreamerLeaks(t, before) 27 ignore := goleak.IgnoreCurrent() 28 defer goleak.VerifyNone(t, ignore) 29 30 g, _ := errgroup.WithContext(context.Background()) 31 for range streamplaceTestCount { 32 g.Go(func() error { 33 return innerTestConcatBin(t) 34 }) 35 } 36 err := g.Wait() 37 require.NoError(t, err) 38} 39 40// This function remains in scope for the duration of a single users' playback 41func innerTestConcatBin(t *testing.T) error { 42 ctx := log.WithDebugValue(context.Background(), map[string]map[string]int{"func": {"ConcatStream": 9, "ConcatBin": 9, "SegDemuxBin": 9}}) 43 tag := os.Getenv("TEST_TAG") 44 uuid, _ := uuid.NewV7() 45 uuidStr := uuid.String() 46 if tag != "" { 47 ctx = log.WithLogValues(ctx, "tag", tag) 48 uuidStr = fmt.Sprintf("%s-%s", tag, uuidStr) 49 } 50 ctx = log.WithLogValues(ctx, "func", "ConcatBin", "uuid", uuidStr) 51 52 pipeline, err := gst.NewPipeline("TestConcatBin") 53 if err != nil { 54 return fmt.Errorf("failed to create pipeline: %w", err) 55 } 56 57 ctx, cancel := context.WithCancel(ctx) 58 59 errCh := make(chan error) 60 go func() { 61 err := HandleBusMessages(ctx, pipeline) 62 cancel() 63 errCh <- err 64 close(errCh) 65 }() 66 67 defer func() { 68 cancel() 69 err := <-errCh 70 require.NoError(t, err, fmt.Sprintf("uuid: %s", uuidStr)) 71 err = pipeline.BlockSetState(gst.StateNull) 72 require.NoError(t, err, fmt.Sprintf("uuid: %s", uuidStr)) 73 }() 74 75 filename := getFixture("sample-segment.mp4") 76 inputFile, err := os.Open(filename) 77 if err != nil { 78 return fmt.Errorf("failed to open fixture file: %w", err) 79 } 80 defer inputFile.Close() 81 82 bs, err := io.ReadAll(inputFile) 83 if err != nil { 84 return fmt.Errorf("failed to read fixture file: %w", err) 85 } 86 87 testSegs := []*segchanman.Seg{} 88 for range 5 { 89 testSegs = append(testSegs, &segchanman.Seg{ 90 Data: bs, 91 Filepath: filename, 92 }) 93 } 94 95 segCh := make(chan *segchanman.Seg) 96 go func() { 97 for _, seg := range testSegs { 98 segCh <- seg 99 } 100 close(segCh) 101 }() 102 103 concatBin, err := ConcatBin(ctx, segCh) 104 if err != nil { 105 return fmt.Errorf("failed to create concat bin: %w", err) 106 } 107 108 err = pipeline.Add(concatBin.Element) 109 if err != nil { 110 return fmt.Errorf("failed to add concat bin to pipeline: %w", err) 111 } 112 113 videoPad := concatBin.GetStaticPad("video_0") 114 if videoPad == nil { 115 return fmt.Errorf("video pad not found") 116 } 117 118 audioPad := concatBin.GetStaticPad("audio_0") 119 if audioPad == nil { 120 return fmt.Errorf("audio pad not found") 121 } 122 123 videoAppSink, err := gst.NewElementWithProperties("appsink", map[string]interface{}{ 124 "name": "videoappsink", 125 "sync": false, 126 }) 127 if err != nil { 128 return fmt.Errorf("failed to create video appsink: %w", err) 129 } 130 131 err = pipeline.Add(videoAppSink) 132 if err != nil { 133 return fmt.Errorf("failed to add video appsink to pipeline: %w", err) 134 } 135 136 videoAppSinkPadSink := videoAppSink.GetStaticPad("sink") 137 if videoAppSinkPadSink == nil { 138 return fmt.Errorf("video appsink pad not found") 139 } 140 141 audioAppSink, err := gst.NewElementWithProperties("appsink", map[string]interface{}{ 142 "name": "audioappsink", 143 "sync": false, 144 }) 145 if err != nil { 146 return fmt.Errorf("failed to create audio appsink: %w", err) 147 } 148 149 err = pipeline.Add(audioAppSink) 150 if err != nil { 151 return fmt.Errorf("failed to add audio appsink to pipeline: %w", err) 152 } 153 154 audioAppSinkPadSink := audioAppSink.GetStaticPad("sink") 155 if audioAppSinkPadSink == nil { 156 return fmt.Errorf("audio appsink pad not found") 157 } 158 159 ok := videoPad.Link(videoAppSinkPadSink) 160 if ok != gst.PadLinkOK { 161 return fmt.Errorf("failed to link video pad: %v", ok) 162 } 163 164 ok = audioPad.Link(audioAppSinkPadSink) 165 if ok != gst.PadLinkOK { 166 return fmt.Errorf("failed to link audio pad: %v", ok) 167 } 168 169 videoBuf := bytes.Buffer{} 170 audioBuf := bytes.Buffer{} 171 172 videoappsink := app.SinkFromElement(videoAppSink) 173 videoappsink.SetCallbacks(&app.SinkCallbacks{ 174 NewSampleFunc: WriterNewSample(ctx, &videoBuf), 175 }) 176 177 audioappsink := app.SinkFromElement(audioAppSink) 178 audioappsink.SetCallbacks(&app.SinkCallbacks{ 179 NewSampleFunc: WriterNewSample(ctx, &audioBuf), 180 }) 181 182 // Start the pipeline 183 err = pipeline.SetState(gst.StatePlaying) 184 if err != nil { 185 return fmt.Errorf("failed to set pipeline to playing state: %w", err) 186 } 187 188 // Start a goroutine to print buffer sizes 189 go func() { 190 for { 191 select { 192 case <-ctx.Done(): 193 return 194 case <-time.After(1 * time.Second): 195 log.Debug(ctx, "buffer sizes", 196 "videoBuf", videoBuf.Len(), 197 "audioBuf", audioBuf.Len()) 198 } 199 } 200 }() 201 202 <-ctx.Done() 203 204 time.Sleep(5 * time.Second) 205 206 padIdleCh := make(chan struct{}) 207 208 padIdle := func(pad *gst.Pad, info *gst.PadProbeInfo) gst.PadProbeReturn { 209 log.Debug(ctx, "pad-idle", "name", pad.GetName(), "direction", pad.GetDirection()) 210 go func() { 211 padIdleCh <- struct{}{} 212 }() 213 return gst.PadProbeRemove 214 } 215 216 videoAppSinkPadSink.AddProbe(gst.PadProbeTypeIdle, padIdle) 217 audioAppSinkPadSink.AddProbe(gst.PadProbeTypeIdle, padIdle) 218 219 <-padIdleCh 220 <-padIdleCh 221 222 require.Equal(t, 4936240, videoBuf.Len(), fmt.Sprintf("uuid: %s", uuidStr)) 223 require.Equal(t, 32200, audioBuf.Len(), fmt.Sprintf("uuid: %s", uuidStr)) 224 225 return <-errCh 226}