Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

at v0.9.3 221 lines 5.3 kB view raw
1package media 2 3import ( 4 "bytes" 5 "context" 6 "fmt" 7 "io" 8 "os" 9 "testing" 10 "time" 11 12 "github.com/go-gst/go-gst/gst" 13 "github.com/go-gst/go-gst/gst/app" 14 "github.com/google/uuid" 15 "github.com/stretchr/testify/require" 16 "golang.org/x/sync/errgroup" 17 "stream.place/streamplace/pkg/bus" 18 "stream.place/streamplace/pkg/log" 19) 20 21func TestConcatBin(t *testing.T) { 22 withNoGSTLeaks(t, func() { 23 24 g, _ := errgroup.WithContext(context.Background()) 25 for range streamplaceTestCount { 26 g.Go(func() error { 27 return innerTestConcatBin(t) 28 }) 29 } 30 err := g.Wait() 31 require.NoError(t, err) 32 }) 33} 34 35// This function remains in scope for the duration of a single users' playback 36func innerTestConcatBin(t *testing.T) error { 37 ctx := log.WithDebugValue(context.Background(), map[string]map[string]int{"func": {"ConcatStream": 9, "ConcatBin": 9, "SegDemuxBin": 9}}) 38 tag := os.Getenv("TEST_TAG") 39 uuid, _ := uuid.NewV7() 40 uuidStr := uuid.String() 41 if tag != "" { 42 ctx = log.WithLogValues(ctx, "tag", tag) 43 uuidStr = fmt.Sprintf("%s-%s", tag, uuidStr) 44 } 45 ctx = log.WithLogValues(ctx, "func", "ConcatBin", "uuid", uuidStr) 46 47 pipeline, err := gst.NewPipeline("TestConcatBin") 48 if err != nil { 49 return fmt.Errorf("failed to create pipeline: %w", err) 50 } 51 52 ctx, cancel := context.WithCancel(ctx) 53 54 errCh := make(chan error) 55 go func() { 56 err := HandleBusMessages(ctx, pipeline) 57 cancel() 58 errCh <- err 59 close(errCh) 60 }() 61 62 defer func() { 63 cancel() 64 err := <-errCh 65 require.NoError(t, err, fmt.Sprintf("uuid: %s", uuidStr)) 66 err = pipeline.BlockSetState(gst.StateNull) 67 require.NoError(t, err, fmt.Sprintf("uuid: %s", uuidStr)) 68 }() 69 70 filename := getFixture("sample-segment.mp4") 71 inputFile, err := os.Open(filename) 72 if err != nil { 73 return fmt.Errorf("failed to open fixture file: %w", err) 74 } 75 defer inputFile.Close() 76 77 bs, err := io.ReadAll(inputFile) 78 if err != nil { 79 return fmt.Errorf("failed to read fixture file: %w", err) 80 } 81 82 testSegs := []*bus.Seg{} 83 for range 5 { 84 testSegs = append(testSegs, &bus.Seg{ 85 Data: bs, 86 Filepath: filename, 87 }) 88 } 89 90 segCh := make(chan *bus.Seg) 91 go func() { 92 for _, seg := range testSegs { 93 segCh <- seg 94 } 95 close(segCh) 96 }() 97 98 concatBin, err := ConcatBin(ctx, segCh, true) 99 if err != nil { 100 return fmt.Errorf("failed to create concat bin: %w", err) 101 } 102 103 err = pipeline.Add(concatBin.Element) 104 if err != nil { 105 return fmt.Errorf("failed to add concat bin to pipeline: %w", err) 106 } 107 108 videoPad := concatBin.GetStaticPad("video_0") 109 if videoPad == nil { 110 return fmt.Errorf("video pad not found") 111 } 112 113 audioPad := concatBin.GetStaticPad("audio_0") 114 if audioPad == nil { 115 return fmt.Errorf("audio pad not found") 116 } 117 118 videoAppSink, err := gst.NewElementWithProperties("appsink", map[string]interface{}{ 119 "name": "videoappsink", 120 "sync": false, 121 }) 122 if err != nil { 123 return fmt.Errorf("failed to create video appsink: %w", err) 124 } 125 126 err = pipeline.Add(videoAppSink) 127 if err != nil { 128 return fmt.Errorf("failed to add video appsink to pipeline: %w", err) 129 } 130 131 videoAppSinkPadSink := videoAppSink.GetStaticPad("sink") 132 if videoAppSinkPadSink == nil { 133 return fmt.Errorf("video appsink pad not found") 134 } 135 136 audioAppSink, err := gst.NewElementWithProperties("appsink", map[string]interface{}{ 137 "name": "audioappsink", 138 "sync": false, 139 }) 140 if err != nil { 141 return fmt.Errorf("failed to create audio appsink: %w", err) 142 } 143 144 err = pipeline.Add(audioAppSink) 145 if err != nil { 146 return fmt.Errorf("failed to add audio appsink to pipeline: %w", err) 147 } 148 149 audioAppSinkPadSink := audioAppSink.GetStaticPad("sink") 150 if audioAppSinkPadSink == nil { 151 return fmt.Errorf("audio appsink pad not found") 152 } 153 154 ok := videoPad.Link(videoAppSinkPadSink) 155 if ok != gst.PadLinkOK { 156 return fmt.Errorf("failed to link video pad: %v", ok) 157 } 158 159 ok = audioPad.Link(audioAppSinkPadSink) 160 if ok != gst.PadLinkOK { 161 return fmt.Errorf("failed to link audio pad: %v", ok) 162 } 163 164 videoBuf := bytes.Buffer{} 165 audioBuf := bytes.Buffer{} 166 167 videoappsink := app.SinkFromElement(videoAppSink) 168 videoappsink.SetCallbacks(&app.SinkCallbacks{ 169 NewSampleFunc: WriterNewSample(ctx, &videoBuf), 170 }) 171 172 audioappsink := app.SinkFromElement(audioAppSink) 173 audioappsink.SetCallbacks(&app.SinkCallbacks{ 174 NewSampleFunc: WriterNewSample(ctx, &audioBuf), 175 }) 176 177 // Start the pipeline 178 err = pipeline.SetState(gst.StatePlaying) 179 if err != nil { 180 return fmt.Errorf("failed to set pipeline to playing state: %w", err) 181 } 182 183 // Start a goroutine to print buffer sizes 184 go func() { 185 for { 186 select { 187 case <-ctx.Done(): 188 return 189 case <-time.After(1 * time.Second): 190 log.Debug(ctx, "buffer sizes", 191 "videoBuf", videoBuf.Len(), 192 "audioBuf", audioBuf.Len()) 193 } 194 } 195 }() 196 197 <-ctx.Done() 198 199 time.Sleep(5 * time.Second) 200 201 padIdleCh := make(chan struct{}) 202 203 padIdle := func(pad *gst.Pad, info *gst.PadProbeInfo) gst.PadProbeReturn { 204 log.Debug(ctx, "pad-idle", "name", pad.GetName(), "direction", pad.GetDirection()) 205 go func() { 206 padIdleCh <- struct{}{} 207 }() 208 return gst.PadProbeRemove 209 } 210 211 videoAppSinkPadSink.AddProbe(gst.PadProbeTypeIdle, padIdle) 212 audioAppSinkPadSink.AddProbe(gst.PadProbeTypeIdle, padIdle) 213 214 <-padIdleCh 215 <-padIdleCh 216 217 require.Equal(t, 4936455, videoBuf.Len(), fmt.Sprintf("uuid: %s", uuidStr)) 218 require.Equal(t, 32200, audioBuf.Len(), fmt.Sprintf("uuid: %s", uuidStr)) 219 220 return <-errCh 221}