Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

improvements to gstreamer memory handling (#83)

* gstreamer: move SetState(null) to end of functions

* media: split out webrtc into two files

* .gitignore: oom

* media: polish to avoid memory leaks

* media: do more memory cleanup

* media: implement proper gstreamer leak testing

* thumbnail: add queue to remove leak

* thumbnail: polish up leak tester

* media: move leak test to its own file

* leaks: fix debug string

* concat: well it's not always 6 but it's better!

* gstreamer: misc memory improvements

* test: fix gstreamer leaks count

* gstreamer: try without .clear()

* build: statically link libgstcoretracers.a

* build: flip off mediaparser leak detector

* build: make test on linux

Changelog: feature

authored by

Eli Mallon and committed by
GitHub
7d01e30f 39408841

+1234 -622
+1 -1
.github/workflows/build.yaml
··· 21 21 - name: make node 22 22 run: | 23 23 sudo apt install -y podman 24 - make in-container IN_CONTAINER_CMD="make" 24 + make in-container IN_CONTAINER_CMD="make && make test && make check" 25 25 26 26 linux-arm64: 27 27 name: linux-arm64
+1
.gitignore
··· 17 17 *.log 18 18 *.heap 19 19 /api 20 + oom
+2 -1
Makefile
··· 252 252 -D "gst-plugins-ugly:gpl=enabled" \ 253 253 -D "x264:asm=enabled" \ 254 254 -D "gstreamer-full:gst-full=enabled" \ 255 - -D "gstreamer-full:gst-full-plugins=libgstopusparse.a;libgstcodectimestamper.a;libgstrtp.a;libgstaudioresample.a;libgstlibav.a;libgstmatroska.a;libgstmultifile.a;libgstjpeg.a;libgstaudiotestsrc.a;libgstaudioconvert.a;libgstaudioparsers.a;libgstfdkaac.a;libgstisomp4.a;libgstapp.a;libgstvideoconvertscale.a;libgstvideobox.a;libgstvideorate.a;libgstpng.a;libgstcompositor.a;libgstaudiorate.a;libgstx264.a;libgstopus.a;libgstvideotestsrc.a;libgstvideoparsersbad.a;libgstaudioparsers.a;libgstmpegtsmux.a;libgstmpegtsdemux.a;libgstplayback.a;libgsttypefindfunctions.a" \ 255 + -D "gstreamer-full:gst-full-plugins=libgstopusparse.a;libgstcodectimestamper.a;libgstrtp.a;libgstaudioresample.a;libgstlibav.a;libgstmatroska.a;libgstmultifile.a;libgstjpeg.a;libgstaudiotestsrc.a;libgstaudioconvert.a;libgstaudioparsers.a;libgstfdkaac.a;libgstisomp4.a;libgstapp.a;libgstvideoconvertscale.a;libgstvideobox.a;libgstvideorate.a;libgstpng.a;libgstcompositor.a;libgstaudiorate.a;libgstx264.a;libgstopus.a;libgstvideotestsrc.a;libgstvideoparsersbad.a;libgstaudioparsers.a;libgstmpegtsmux.a;libgstmpegtsdemux.a;libgstplayback.a;libgsttypefindfunctions.a;libgstcoretracers.a" \ 256 256 -D "gstreamer-full:gst-full-libraries=gstreamer-controller-1.0,gstreamer-plugins-base-1.0,gstreamer-pbutils-1.0" \ 257 257 -D "gstreamer-full:gst-full-elements=coreelements:concat,filesrc,filesink,queue,queue2,multiqueue,typefind,tee,capsfilter,fakesink,identity" \ 258 258 -D "gstreamer-full:bad=enabled" \ ··· 263 263 -D "gstreamer-full:gst-full-typefind-functions=" \ 264 264 -D "gstreamer-full:glib_assert=false" \ 265 265 -D "gstreamer:glib_assert=false" \ 266 + -D "gstreamer:coretracers=enabled" \ 266 267 -D "gst-plugins-good:glib_assert=false" \ 267 268 -D "gst-plugins-bad:glib_assert=false" \ 268 269 -D "gst-plugins-base:glib_assert=false" \
+1
go.mod
··· 93 93 github.com/Microsoft/go-winio v0.6.2 // indirect 94 94 github.com/ProtonMail/go-crypto v1.0.0 // indirect 95 95 github.com/RussellLuo/slidingwindow v0.0.0-20200528002341-535bb99d338b // indirect 96 + github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d // indirect 96 97 github.com/agnivade/levenshtein v1.2.0 // indirect 97 98 github.com/beorn7/perks v1.0.1 // indirect 98 99 github.com/bits-and-blooms/bitset v1.10.0 // indirect
+2
go.sum
··· 45 45 github.com/StackExchange/wmi v1.2.1/go.mod h1:rcmrprowKIVzvc+NUiLncP2uuArMWLCbu9SBzvHz7e8= 46 46 github.com/VictoriaMetrics/fastcache v1.12.2 h1:N0y9ASrJ0F6h0QaC3o6uJb3NIZ9VKLjCM7NQbSmF7WI= 47 47 github.com/VictoriaMetrics/fastcache v1.12.2/go.mod h1:AmC+Nzz1+3G2eCPapF6UcsnkThDcMsQicp4xDukwJYI= 48 + github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d h1:licZJFw2RwpHMqeKTCYkitsPqHNxTmd4SNR5r94FGM8= 49 + github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d/go.mod h1:asat636LX7Bqt5lYEZ27JNDcqxfjdBQuJ/MM4CN/Lzo= 48 50 github.com/agnivade/levenshtein v1.2.0 h1:U9L4IOT0Y3i0TIlUIDJ7rVUziKi/zPbrJGaFrtYH3SY= 49 51 github.com/agnivade/levenshtein v1.2.0/go.mod h1:QVVI16kDrtSuwcpd0p1+xMC6Z/VfhtCyDIjcwga4/DU= 50 52 github.com/alexbrainman/goissue34681 v0.0.0-20191006012335-3fc7a47baff5 h1:iW0a5ljuFxkLGPNem5Ui+KBjFJzKg4Fv2fnxe4dvzpM=
+8 -1
pkg/cmd/streamplace.go
··· 14 14 "strconv" 15 15 "strings" 16 16 "syscall" 17 + "time" 17 18 18 19 "golang.org/x/term" 19 20 "stream.place/streamplace/pkg/aqhttp" ··· 144 145 fs.StringVar(&cli.RelayHost, "relay-host", "wss://bsky.network", "websocket url for relay firehose") 145 146 fs.Bool("insecure", false, "DEPRECATED, does nothing.") 146 147 fs.StringVar(&cli.Color, "color", "", "'true' to enable colorized logging, 'false' to disable") 148 + fs.BoolVar(&cli.Thumbnail, "thumbnail", true, "enable thumbnail generation") 147 149 148 150 version := fs.Bool("version", false, "print version and exit") 149 151 ··· 386 388 387 389 if cli.WHIPTest != "" { 388 390 group.Go(func() error { 389 - return WHIP(strings.Split(cli.WHIPTest, " ")) 391 + err := WHIP(strings.Split(cli.WHIPTest, " ")) 392 + log.Warn(ctx, "WHIP test complete, sleeping for 3 seconds and shutting down gstreamer") 393 + time.Sleep(time.Second * 3) 394 + // gst.Deinit() 395 + log.Warn(ctx, "gst deinit complete, exiting") 396 + return err 390 397 }) 391 398 } 392 399
+25 -8
pkg/cmd/whip.go
··· 165 165 } 166 166 } 167 167 }) 168 + go func() { 169 + <-ctx.Done() 170 + if conn.peerConnection != nil { 171 + conn.peerConnection.Close() 172 + } 173 + }() 168 174 return nil 169 175 }) 170 176 } ··· 179 185 defer ticker.Stop() 180 186 181 187 for { 182 - <-ticker.C 183 - for i, duration := range accumulators { 184 - trackType := "video" 185 - if i == 1 { 186 - trackType = "audio" 188 + select { 189 + case <-ctx.Done(): 190 + return 191 + case <-ticker.C: 192 + for i, duration := range accumulators { 193 + trackType := "video" 194 + if i == 1 { 195 + trackType = "audio" 196 + } 197 + target := startTime.Add(time.Duration(accumulators[i])) 198 + diff := time.Since(target) 199 + log.Debug(ctx, "elapsed duration", "track", trackType, "duration", duration, "diff", diff) 187 200 } 188 - target := startTime.Add(time.Duration(accumulators[i])) 189 - diff := time.Since(target) 190 - log.Debug(ctx, "elapsed duration", "track", trackType, "duration", duration, "diff", diff) 191 201 } 192 202 } 193 203 }() ··· 278 288 return err 279 289 } 280 290 } 291 + 292 + <-ctx.Done() 293 + err = pipeline.BlockSetState(gst.StateNull) 294 + if err != nil { 295 + return err 296 + } 297 + 281 298 select { 282 299 case err := <-errCh: 283 300 return err
+1
pkg/config/config.go
··· 86 86 Color string 87 87 LivepeerGatewayURL string 88 88 WHIPTest string 89 + Thumbnail bool 89 90 90 91 dataDirFlags []*string 91 92 }
+9 -7
pkg/director/stream_session.go
··· 120 120 121 121 ss.bus.Publish(spseg.Creator, spseg) 122 122 123 - go func() { 124 - err := ss.Thumbnail(ctx, spseg.Creator, not) 125 - if err != nil { 126 - log.Error(ctx, "could not create thumbnail", "error", err) 127 - } 128 - }() 123 + if ss.cli.Thumbnail { 124 + go func() { 125 + err := ss.Thumbnail(ctx, spseg.Creator, not) 126 + if err != nil { 127 + log.Error(ctx, "could not create thumbnail", "error", err) 128 + } 129 + }() 130 + } 129 131 130 132 if ss.cli.LivepeerGatewayURL != "" { 131 133 go func() { ··· 167 169 return err 168 170 } 169 171 defer fd.Close() 170 - err = ss.mm.Thumbnail(ctx, r, fd) 172 + err = media.Thumbnail(ctx, r, fd) 171 173 if err != nil { 172 174 return err 173 175 }
+57 -7
pkg/media/concat.go
··· 53 53 return nil, nil, fmt.Errorf("failed to get input queue audio src pad") 54 54 } 55 55 56 + go func() { 57 + <-ctx.Done() 58 + inputQueue.SetState(gst.StateNull) 59 + inputQueue = nil 60 + inputQueuePadVideoSink = nil 61 + inputQueuePadVideoSrc = nil 62 + inputQueuePadAudioSink = nil 63 + inputQueuePadAudioSrc = nil 64 + }() 65 + 56 66 // streamsynchronizer 57 67 streamsynchronizer, err := gst.NewElementWithProperties("streamsynchronizer", map[string]any{}) 58 68 if err != nil { 59 69 return nil, nil, fmt.Errorf("failed to create streamsynchronizer element: %w", err) 60 70 } 71 + go func() { 72 + <-ctx.Done() 73 + streamsynchronizer.SetState(gst.StateNull) 74 + streamsynchronizer = nil 75 + }() 61 76 err = pipeline.Add(streamsynchronizer) 62 77 if err != nil { 63 78 return nil, nil, fmt.Errorf("failed to add streamsynchronizer to pipeline: %w", err) ··· 80 95 } 81 96 82 97 // output multiqueue 83 - outputQueue, err := gst.NewElementWithProperties("multiqueue", map[string]any{}) 98 + outputQueue, err := gst.NewElementWithProperties("multiqueue", map[string]any{ 99 + "name": "concat-output-queue", 100 + }) 84 101 if err != nil { 85 102 return nil, nil, fmt.Errorf("failed to create multiqueue element: %w", err) 86 103 } ··· 96 113 if outputQueuePadAudioSink == nil { 97 114 return nil, nil, fmt.Errorf("failed to get output queue audio sink pad") 98 115 } 116 + go func() { 117 + <-ctx.Done() 118 + outputQueue.SetState(gst.StateNull) 119 + outputQueue = nil 120 + outputQueuePadVideoSink = nil 121 + outputQueuePadAudioSink = nil 122 + }() 99 123 100 124 // linking 101 125 ··· 136 160 log.Debug(ctx, "got segment", "file", file.Filepath) 137 161 allFiles <- file.Data 138 162 if len(file.Data) == 0 { 139 - log.Warn(ctx, "no more segments") 163 + log.Warn(ctx, "no more segments, stopping segment reader") 140 164 return 141 165 } 142 166 } 143 167 } 144 168 }() 145 169 170 + segCount := 0 171 + 146 172 // nextFile is the primary loop that pops off a file, creates new demuxer elements for it, 147 173 // and pushes into the pipeline 148 174 var nextFile func() 149 175 nextFile = func() { 176 + mySegCount := segCount 177 + segCount += 1 178 + segDone := make(chan struct{}) 179 + log.Debug(ctx, "moving to next file", "segCount", mySegCount) 150 180 pr, pw := io.Pipe() 151 181 go func() { 152 182 select { ··· 156 186 return 157 187 case bs := <-allFiles: 158 188 if len(bs) == 0 { 159 - log.Warn(ctx, "no more segments") 189 + log.Warn(ctx, "no more segments, ending stream") 190 + pr.Close() 191 + pw.Close() 160 192 cancel() 161 193 return 162 194 } ··· 170 202 } 171 203 }() 172 204 173 - demux, err := gst.NewElementWithProperties("qtdemux", map[string]any{}) 205 + demux, err := gst.NewElementWithProperties("qtdemux", map[string]any{ 206 + "name": fmt.Sprintf("concat-demux-%d", mySegCount), 207 + }) 174 208 if err != nil { 175 209 log.Error(ctx, "failed to create demux element", "error", err) 176 210 cancel() ··· 228 262 if count == 0 { 229 263 // don't keep going if our context is done 230 264 if ctx.Err() == nil { 231 - nextFile() 265 + go nextFile() 266 + segDone <- struct{}{} 232 267 } 268 + } else { 269 + log.Debug(ctx, "demux has more pads, waiting for them to close") 233 270 } 234 271 return gst.PadProbeRemove 235 272 }) ··· 242 279 } 243 280 244 281 appsrc, err := gst.NewElementWithProperties("appsrc", map[string]any{ 282 + "name": fmt.Sprintf("concat-appsrc-%d", mySegCount), 245 283 "is-live": true, 246 284 }) 247 285 if err != nil { ··· 290 328 cancel() 291 329 return 292 330 } 293 - log.Debug(ctx, "EOF, ending stream", "length", read) 331 + log.Debug(ctx, "EOF, ending segment", "length", read) 294 332 done() 295 333 return 296 334 } else { ··· 309 347 self.PushBuffer(buffer) 310 348 311 349 if uint(read) < length { 312 - log.Debug(ctx, "short write, ending stream", "length", read) 350 + log.Debug(ctx, "short write, ending segment", "length", read) 313 351 done() 314 352 } 315 353 }, ··· 340 378 cancel() 341 379 return 342 380 } 381 + 382 + select { 383 + case <-ctx.Done(): 384 + case <-segDone: 385 + } 386 + 387 + log.Debug(ctx, "ending segment") 388 + demux.SetState(gst.StateNull) 389 + src.SetCallbacks(&app.SourceCallbacks{}) 390 + appsrc.SetState(gst.StateNull) 391 + pr.Close() 392 + pw.Close() 343 393 } 344 394 345 395 // fire it up!
+215
pkg/media/concat_test.go
··· 1 + package media 2 + 3 + import ( 4 + "context" 5 + "fmt" 6 + "io" 7 + "math/rand" 8 + "os" 9 + "strconv" 10 + "testing" 11 + 12 + "github.com/go-gst/go-gst/gst" 13 + "github.com/go-gst/go-gst/gst/app" 14 + "github.com/stretchr/testify/require" 15 + "go.uber.org/goleak" 16 + "stream.place/streamplace/pkg/gstinit" 17 + "stream.place/streamplace/pkg/log" 18 + "stream.place/streamplace/pkg/media/segchanman" 19 + ) 20 + 21 + type TestConcatStreamer struct { 22 + fileName string 23 + data []byte 24 + count int 25 + } 26 + 27 + func (t *TestConcatStreamer) SubscribeSegment(ctx context.Context, user string, rendition string) <-chan *segchanman.Seg { 28 + if len(t.data) == 0 { 29 + panic("test file empty") 30 + } 31 + ch := make(chan *segchanman.Seg) 32 + go func() { 33 + if t.count == 5 { 34 + ch <- &segchanman.Seg{ 35 + Data: nil, 36 + Filepath: "", 37 + } 38 + } else { 39 + fmt.Println("writing segment " + strconv.Itoa(t.count) + " with random number " + strconv.Itoa(rand.Intn(100))) 40 + ch <- &segchanman.Seg{ 41 + Data: t.data, 42 + Filepath: t.fileName, 43 + } 44 + t.count += 1 45 + } 46 + }() 47 + return ch 48 + } 49 + 50 + func (t *TestConcatStreamer) UnsubscribeSegment(ctx context.Context, user string, rendition string, ch <-chan *segchanman.Seg) { 51 + } 52 + 53 + func TestConcat(t *testing.T) { 54 + gstinit.InitGST() 55 + // before := getLeakCount(t) 56 + // defer checkGStreamerLeaks(t, before) 57 + ignore := goleak.IgnoreCurrent() 58 + defer goleak.VerifyNone(t, ignore) 59 + 60 + innnerTestConcat(t) 61 + after := getLeakCount(t) 62 + if after != 6 { 63 + fmt.Println("leaks", after) 64 + } 65 + } 66 + 67 + // This function remains in scope for the duration of a single users' playback 68 + func innnerTestConcat(t *testing.T) { 69 + 70 + ctx := log.WithDebugValue(context.Background(), map[string]map[string]int{"func": {"ConcatStream": 9, "TestConcat": 9}}) 71 + ctx = log.WithLogValues(ctx, "func", "TestConcat") 72 + ctx, cancel := context.WithCancel(ctx) 73 + // defer cancel() 74 + 75 + pipeline, err := gst.NewPipeline("TestConcat") 76 + require.NoError(t, err) 77 + 78 + go func() { 79 + HandleBusMessages(ctx, pipeline) 80 + cancel() 81 + }() 82 + 83 + filename := getFixture("sample-segment.mp4") 84 + inputFile, err := os.Open(filename) 85 + require.NoError(t, err) 86 + bs, err := io.ReadAll(inputFile) 87 + require.NoError(t, err) 88 + tcs := &TestConcatStreamer{ 89 + fileName: getFixture("sample-segment.mp4"), 90 + data: bs, 91 + } 92 + 93 + outputQueue, done, err := ConcatStream(ctx, pipeline, "fakeuser", "fakerendition", tcs) 94 + require.NoError(t, err) 95 + 96 + go func() { 97 + select { 98 + case <-ctx.Done(): 99 + return 100 + case <-done: 101 + cancel() 102 + } 103 + }() 104 + 105 + videoPad := outputQueue.GetStaticPad("src_0") 106 + require.NotNil(t, videoPad) 107 + 108 + audioPad := outputQueue.GetStaticPad("src_1") 109 + require.NotNil(t, audioPad) 110 + 111 + videoAppSink, err := gst.NewElementWithProperties("appsink", map[string]interface{}{ 112 + "name": "videoappsink", 113 + "sync": false, 114 + "wait-on-eos": false, 115 + }) 116 + require.NoError(t, err) 117 + err = pipeline.Add(videoAppSink) 118 + require.NoError(t, err) 119 + 120 + videoAppSinkPadSink := videoAppSink.GetStaticPad("sink") 121 + require.NotNil(t, videoAppSinkPadSink) 122 + 123 + audioAppSink, err := gst.NewElementWithProperties("appsink", map[string]interface{}{ 124 + "name": "audioappsink", 125 + "sync": false, 126 + "wait-on-eos": false, 127 + }) 128 + require.NoError(t, err) 129 + err = pipeline.Add(audioAppSink) 130 + require.NoError(t, err) 131 + 132 + audioAppSinkPadSink := audioAppSink.GetStaticPad("sink") 133 + require.NotNil(t, audioAppSinkPadSink) 134 + 135 + ok := videoPad.Link(videoAppSinkPadSink) 136 + require.Equal(t, gst.PadLinkOK, ok) 137 + 138 + ok = audioPad.Link(audioAppSinkPadSink) 139 + require.Equal(t, gst.PadLinkOK, ok) 140 + 141 + videoTotalBytes := 0 142 + audioTotalBytes := 0 143 + 144 + videoappsink := app.SinkFromElement(videoAppSink) 145 + videoappsink.SetCallbacks(&app.SinkCallbacks{ 146 + NewSampleFunc: func(sink *app.Sink) gst.FlowReturn { 147 + sample := sink.PullSample() 148 + if sample == nil { 149 + return gst.FlowEOS 150 + } 151 + 152 + buffer := sample.GetBuffer() 153 + if buffer == nil { 154 + return gst.FlowError 155 + } 156 + 157 + samples := buffer.Map(gst.MapRead).Bytes() 158 + defer buffer.Unmap() 159 + 160 + videoTotalBytes += len(samples) 161 + 162 + return gst.FlowOK 163 + }, 164 + EOSFunc: func(sink *app.Sink) { 165 + log.Warn(ctx, "videoappsink EOSFunc") 166 + cancel() 167 + }, 168 + }) 169 + 170 + audioappsink := app.SinkFromElement(audioAppSink) 171 + audioappsink.SetCallbacks(&app.SinkCallbacks{ 172 + NewSampleFunc: func(sink *app.Sink) gst.FlowReturn { 173 + sample := sink.PullSample() 174 + if sample == nil { 175 + return gst.FlowEOS 176 + } 177 + 178 + buffer := sample.GetBuffer() 179 + if buffer == nil { 180 + return gst.FlowError 181 + } 182 + 183 + samples := buffer.Map(gst.MapRead).Bytes() 184 + defer buffer.Unmap() 185 + 186 + audioTotalBytes += len(samples) 187 + 188 + return gst.FlowOK 189 + }, 190 + EOSFunc: func(sink *app.Sink) { 191 + log.Warn(ctx, "audioappsink EOSFunc") 192 + cancel() 193 + }, 194 + }) 195 + 196 + // Start the pipeline 197 + 198 + err = pipeline.SetState(gst.StatePlaying) 199 + require.NoError(t, err) 200 + 201 + <-ctx.Done() 202 + 203 + err = pipeline.BlockSetState(gst.StateNull) 204 + require.NoError(t, err) 205 + pipeline.Remove(videoAppSink) 206 + pipeline.Remove(audioAppSink) 207 + videoAppSink.SetState(gst.StateNull) 208 + audioAppSink.SetState(gst.StateNull) 209 + videoappsink.SetCallbacks(&app.SinkCallbacks{}) 210 + audioappsink.SetCallbacks(&app.SinkCallbacks{}) 211 + pipeline.Clear() 212 + 213 + require.Greater(t, videoTotalBytes, 1000000) 214 + require.Greater(t, audioTotalBytes, 40000) 215 + }
+58 -22
pkg/media/gstreamer.go
··· 76 76 77 77 // Retrieve the buffer from the sample. 78 78 buffer := sample.GetBuffer() 79 + bs := buffer.Map(gst.MapRead).Bytes() 80 + defer buffer.Unmap() 79 81 80 - _, err := io.Copy(output, buffer.Reader()) 82 + _, err := output.Write(bs) 81 83 82 84 if err != nil { 83 85 panic(err) ··· 144 146 145 147 // basic test to make sure gstreamer functionality is working 146 148 func SelfTest(ctx context.Context) error { 149 + ctx = log.WithLogValues(ctx, "mediafunc", "SelfTest") 147 150 ctx, cancel := context.WithTimeout(ctx, 5*time.Second) 148 151 defer cancel() 149 152 f, err := test.Files.Open("fixtures/sample-segment.mp4") 150 153 if err != nil { 151 - return err 154 + return fmt.Errorf("failed to open test file: %w", err) 152 155 } 153 156 defer f.Close() 154 157 bs, err := io.ReadAll(f) 155 158 if err != nil { 156 - return err 159 + return fmt.Errorf("failed to read test file: %w", err) 157 160 } 158 161 159 - pipeline, err := gst.NewPipelineFromString("appsrc name=src ! appsink name=sink") 162 + pipeline, err := gst.NewPipeline("self-test") 160 163 if err != nil { 161 - return err 164 + return fmt.Errorf("failed to create pipeline: %w", err) 162 165 } 163 166 164 - srcele, err := pipeline.GetElementByName("src") 167 + srcele, err := gst.NewElementWithProperties("appsrc", map[string]interface{}{ 168 + "name": "self-test-src", 169 + }) 165 170 if err != nil { 166 - return err 171 + return fmt.Errorf("failed to create appsrc element: %w", err) 167 172 } 168 - if srcele == nil { 169 - return fmt.Errorf("srcele not found") 173 + err = pipeline.Add(srcele) 174 + if err != nil { 175 + return fmt.Errorf("failed to add appsrc to pipeline: %w", err) 176 + } 177 + 178 + sinkele, err := gst.NewElementWithProperties("appsink", map[string]interface{}{ 179 + "name": "self-test-sink", 180 + }) 181 + if err != nil { 182 + return fmt.Errorf("failed to create appsink element: %w", err) 183 + } 184 + err = pipeline.Add(sinkele) 185 + if err != nil { 186 + return fmt.Errorf("failed to add appsink to pipeline: %w", err) 187 + } 188 + 189 + err = srcele.Link(sinkele) 190 + if err != nil { 191 + return fmt.Errorf("failed to link appsrc to appsink: %w", err) 170 192 } 193 + 194 + // pipeline, err := gst.NewPipelineFromString("appsrc name=src ! appsink name=sink") 195 + // if err != nil { 196 + // return err 197 + // } 198 + 171 199 src := app.SrcFromElement(srcele) 172 200 src.SetCallbacks(&app.SourceCallbacks{ 173 201 NeedDataFunc: func(self *app.Source, _ uint) { ··· 175 203 buffer.Map(gst.MapWrite).WriteData(bs) 176 204 defer buffer.Unmap() 177 205 self.PushBuffer(buffer) 206 + log.Debug(ctx, "ending stream") 178 207 self.EndStream() 179 208 }, 180 209 }) 181 210 182 - mainLoop := glib.NewMainLoop(glib.MainContextDefault(), false) 183 - 184 211 output := &bytes.Buffer{} 185 - sinkele, err := pipeline.GetElementByName("sink") 212 + 186 213 if err != nil { 187 - return err 214 + return fmt.Errorf("unexpected error: %w", err) 188 215 } 189 - if sinkele == nil { 190 - return fmt.Errorf("sinkele not found") 191 - } 216 + 192 217 appsink := app.SinkFromElement(sinkele) 193 218 appsink.SetCallbacks(&app.SinkCallbacks{ 194 219 NewSampleFunc: func(sink *app.Sink) gst.FlowReturn { ··· 210 235 return gst.FlowOK 211 236 }, 212 237 EOSFunc: func(sink *app.Sink) { 238 + log.Debug(ctx, "EOSFunc") 213 239 cancel() 214 240 }, 215 241 }) 216 242 217 - // Start the pipeline 218 - pipeline.SetState(gst.StatePlaying) 219 - 220 243 go func() { 221 - <-ctx.Done() 222 - mainLoop.Quit() 244 + HandleBusMessages(ctx, pipeline) 245 + cancel() 223 246 }() 224 247 225 - mainLoop.Run() 248 + // Start the pipeline 249 + log.Debug(ctx, "setting pipeline to playing state") 250 + err = pipeline.SetState(gst.StatePlaying) 251 + if err != nil { 252 + return fmt.Errorf("failed to set pipeline to playing state: %w", err) 253 + } 254 + 255 + <-ctx.Done() 226 256 227 257 if len(output.Bytes()) < 1 { 228 258 return fmt.Errorf("got a zero-byte buffer from SelfTest") 229 259 } 260 + 261 + err = pipeline.BlockSetState(gst.StateNull) 262 + if err != nil { 263 + return fmt.Errorf("failed to set pipeline to null state: %w", err) 264 + } 265 + 230 266 return nil 231 267 } 232 268
+136
pkg/media/leak_test.go
··· 1 + package media 2 + 3 + import ( 4 + "bufio" 5 + "fmt" 6 + "os" 7 + "path/filepath" 8 + "regexp" 9 + "runtime" 10 + "runtime/debug" 11 + "strings" 12 + "sync" 13 + "syscall" 14 + "testing" 15 + "time" 16 + 17 + "github.com/acarl005/stripansi" 18 + "github.com/stretchr/testify/require" 19 + ) 20 + 21 + const GST_DEBUG_NEEDED = "leaks:9,GST_TRACER:9" 22 + const LEAK_LINE = "GST_TRACER :0:: object-alive" 23 + 24 + var LEAK_DONE_REGEX = regexp.MustCompile(`listed\s+(\d+)\s+alive\s+objects`) 25 + 26 + var LeakReport = []string{} 27 + var LeakReportMutex sync.Mutex 28 + var LeakDoneCh = make(chan struct{}) 29 + 30 + func TestMain(m *testing.M) { 31 + gstDebug := os.Getenv("GST_DEBUG") 32 + if gstDebug == "" { 33 + gstDebug = GST_DEBUG_NEEDED 34 + } else { 35 + gstDebug = fmt.Sprintf("%s,%s", gstDebug, GST_DEBUG_NEEDED) 36 + } 37 + os.Setenv("GST_DEBUG", gstDebug) 38 + os.Setenv("GST_TRACERS", "leaks") 39 + os.Setenv("GST_LEAKS_TRACER_SIG", "1") 40 + debug.SetGCPercent(5) 41 + 42 + f, err := os.MkdirTemp("", "") 43 + if err != nil { 44 + panic(err) 45 + } 46 + fName := filepath.Join(f, "leak.log") 47 + err = syscall.Mkfifo(fName, 0640) 48 + if err != nil { 49 + panic(err) 50 + } 51 + os.Setenv("GST_DEBUG_FILE", fName) 52 + 53 + go func() { 54 + pipe, err := os.OpenFile(fName, os.O_RDONLY, 0640) 55 + if err != nil { 56 + panic(err) 57 + } 58 + defer pipe.Close() 59 + // Read and print each line from FD 60 + scanner := bufio.NewScanner(pipe) 61 + for scanner.Scan() { 62 + line := scanner.Text() 63 + fmt.Println(line) 64 + line = stripansi.Strip(line) 65 + if strings.Contains(line, LEAK_LINE) { 66 + LeakReportMutex.Lock() 67 + LeakReport = append(LeakReport, line) 68 + LeakReportMutex.Unlock() 69 + } else if LEAK_DONE_REGEX.MatchString(line) { 70 + LeakDoneCh <- struct{}{} 71 + } else { 72 + continue 73 + } 74 + } 75 + if err := scanner.Err(); err != nil { 76 + panic(err) 77 + } 78 + }() 79 + os.Exit(m.Run()) 80 + } 81 + 82 + func getLeakCount(t *testing.T) int { 83 + process, err := os.FindProcess(os.Getpid()) 84 + LeakReportMutex.Lock() 85 + LeakReport = []string{} 86 + LeakReportMutex.Unlock() 87 + 88 + ch := make(chan struct{}) 89 + done := false 90 + 91 + go func() { 92 + thing := &[]byte{} 93 + runtime.SetFinalizer(thing, func(thing *[]byte) { 94 + done = true 95 + ch <- struct{}{} 96 + }) 97 + }() 98 + 99 + go func() { 100 + runtime.GC() 101 + runtime.GC() 102 + for { 103 + if done { 104 + break 105 + } 106 + runtime.GC() 107 + runtime.GC() 108 + time.Sleep(500 * time.Millisecond) 109 + } 110 + }() 111 + 112 + <-ch 113 + time.Sleep(1 * time.Second) 114 + 115 + err = process.Signal(os.Signal(syscall.SIGUSR1)) 116 + require.NoError(t, err) 117 + 118 + <-LeakDoneCh 119 + 120 + LeakReportMutex.Lock() 121 + after := len(LeakReport) 122 + LeakReportMutex.Unlock() 123 + return after 124 + } 125 + 126 + func checkGStreamerLeaks(t *testing.T, expected int) { 127 + leaks := getLeakCount(t) 128 + if leaks > expected { 129 + LeakReportMutex.Lock() 130 + for _, l := range LeakReport { 131 + fmt.Println(l) 132 + } 133 + LeakReportMutex.Unlock() 134 + } 135 + require.Equal(t, expected, len(LeakReport), "Leaks found") 136 + }
+2 -2
pkg/media/media_data_parser.go
··· 13 13 "stream.place/streamplace/pkg/model" 14 14 ) 15 15 16 - func (mm *MediaManager) ParseSegmentMediaData(ctx context.Context, mp4bs []byte) (*model.SegmentMediaData, error) { 16 + func ParseSegmentMediaData(ctx context.Context, mp4bs []byte) (*model.SegmentMediaData, error) { 17 17 ctx = log.WithLogValues(ctx, "GStreamerFunc", "ParseSegmentMediaData") 18 18 ctx, cancel := context.WithCancel(ctx) 19 19 defer cancel() 20 20 pipelineSlice := []string{ 21 - "appsrc name=appsrc ! qtdemux name=demux ! fakesink", 21 + "appsrc name=appsrc ! qtdemux name=demux ! fakesink sync=false", 22 22 } 23 23 24 24 pipeline, err := gst.NewPipelineFromString(strings.Join(pipelineSlice, "\n"))
+41
pkg/media/media_data_parser_test.go
··· 1 + package media 2 + 3 + import ( 4 + "context" 5 + "fmt" 6 + "io" 7 + "os" 8 + "testing" 9 + 10 + "github.com/stretchr/testify/require" 11 + "go.uber.org/goleak" 12 + "stream.place/streamplace/pkg/gstinit" 13 + "stream.place/streamplace/pkg/log" 14 + ) 15 + 16 + func TestMediaDataParser(t *testing.T) { 17 + gstinit.InitGST() 18 + before := getLeakCount(t) 19 + // defer checkGStreamerLeaks(t, before+1) 20 + defer func() { 21 + after := getLeakCount(t) 22 + if after > before { 23 + fmt.Printf("detected %d leaks", after-before) 24 + } 25 + }() 26 + ignore := goleak.IgnoreCurrent() 27 + defer goleak.VerifyNone(t, ignore) 28 + 29 + // Open input file 30 + inputFile, err := os.Open(getFixture("sample-segment.mp4")) 31 + require.NoError(t, err) 32 + defer inputFile.Close() 33 + bs, err := io.ReadAll(inputFile) 34 + require.NoError(t, err) 35 + 36 + ctx := log.WithDebugValue(context.Background(), map[string]map[string]int{"GStreamerFunc": {"ParseSegmentMediaData": 9}}) 37 + mediaData, err := ParseSegmentMediaData(ctx, bs) 38 + require.NoError(t, err) 39 + require.NotNil(t, mediaData) 40 + require.Greater(t, mediaData.Duration, int64(0), "Video duration should not be empty") 41 + }
+1
pkg/media/segmenter.go
··· 57 57 if appsink == nil { 58 58 panic("appsink should not be nil") 59 59 } 60 + 60 61 appsink.SetCallbacks(&app.SinkCallbacks{ 61 62 NewSampleFunc: WriterNewSample(ctx, buf), 62 63 EOSFunc: func(sink *app.Sink) {
+2 -2
pkg/media/thumbnail.go
··· 11 11 "stream.place/streamplace/pkg/log" 12 12 ) 13 13 14 - func (mm *MediaManager) Thumbnail(ctx context.Context, r io.Reader, w io.Writer) error { 14 + func Thumbnail(ctx context.Context, r io.Reader, w io.Writer) error { 15 15 ctx = log.WithLogValues(ctx, "function", "Thumbnail") 16 16 ctx, cancel := context.WithCancel(ctx) 17 17 defer cancel() 18 18 19 19 pipelineSlice := []string{ 20 - "appsrc name=appsrc ! qtdemux ! decodebin ! videoconvert ! videoscale ! video/x-raw,width=[1,720],height=[1,720],pixel-aspect-ratio=1/1 ! pngenc snapshot=true ! appsink name=appsink", 20 + "appsrc name=appsrc ! qtdemux name=demux ! decodebin ! videoconvert ! videoscale ! capsfilter name=capsfilter caps=video/x-raw,width=[1,1280],height=[1,720],pixel-aspect-ratio=1/1 ! queue ! pngenc snapshot=true ! appsink name=appsink", 21 21 } 22 22 23 23 pipeline, err := gst.NewPipelineFromString(strings.Join(pipelineSlice, "\n"))
+33
pkg/media/thumbnail_test.go
··· 1 + package media 2 + 3 + import ( 4 + "bytes" 5 + "context" 6 + "os" 7 + "testing" 8 + 9 + "github.com/stretchr/testify/require" 10 + "go.uber.org/goleak" 11 + "stream.place/streamplace/pkg/gstinit" 12 + "stream.place/streamplace/pkg/log" 13 + ) 14 + 15 + func TestThumbnail(t *testing.T) { 16 + gstinit.InitGST() 17 + before := getLeakCount(t) 18 + defer checkGStreamerLeaks(t, before+1) 19 + ignore := goleak.IgnoreCurrent() 20 + defer goleak.VerifyNone(t, ignore) 21 + 22 + // Open input file 23 + inputFile, err := os.Open(getFixture("sample-segment.mp4")) 24 + require.NoError(t, err) 25 + defer inputFile.Close() 26 + 27 + thumbnail := bytes.Buffer{} 28 + ctx := log.WithDebugValue(context.Background(), map[string]map[string]int{"function": {"Thumbnail": 9}}) 29 + err = Thumbnail(ctx, inputFile, &thumbnail) 30 + require.NoError(t, err) 31 + require.NotNil(t, thumbnail) 32 + require.Greater(t, thumbnail.Len(), 0, "Thumbnail buffer should not be empty") 33 + }
+1 -1
pkg/media/validate.go
··· 37 37 if err != nil { 38 38 return err 39 39 } 40 - mediaData, err := mm.ParseSegmentMediaData(ctx, buf) 40 + mediaData, err := ParseSegmentMediaData(ctx, buf) 41 41 if err != nil { 42 42 return err 43 43 }
-570
pkg/media/webrtc.go
··· 1 - package media 2 - 3 - import ( 4 - "context" 5 - "fmt" 6 - "strings" 7 - "time" 8 - 9 - "github.com/go-gst/go-gst/gst" 10 - "github.com/go-gst/go-gst/gst/app" 11 - "github.com/google/uuid" 12 - "github.com/pion/rtcp" 13 - "github.com/pion/webrtc/v4" 14 - "github.com/pion/webrtc/v4/pkg/media" 15 - "stream.place/streamplace/pkg/log" 16 - "stream.place/streamplace/pkg/spmetrics" 17 - ) 18 - 19 - // we have a bug that prevents us from correctly probing video durations 20 - // a lot of the time. so when we don't have them we use the last duration 21 - // that we had, and when we don't have that we use a default duration 22 - var DEFAULT_DURATION = time.Duration(32 * time.Millisecond) 23 - 24 - // This function remains in scope for the duration of a single users' playback 25 - func (mm *MediaManager) WebRTCPlayback(ctx context.Context, user string, rendition string, offer *webrtc.SessionDescription) (*webrtc.SessionDescription, error) { 26 - uu, err := uuid.NewV7() 27 - if err != nil { 28 - return nil, err 29 - } 30 - ctx = log.WithLogValues(ctx, "webrtcID", uu.String()) 31 - ctx, cancel := context.WithCancel(ctx) 32 - 33 - ctx = log.WithLogValues(ctx, "mediafunc", "WebRTCPlayback") 34 - 35 - pipelineSlice := []string{ 36 - "h264parse name=videoparse ! video/x-h264,stream-format=byte-stream ! appsink name=videoappsink", 37 - "opusparse name=audioparse ! appsink name=audioappsink", 38 - } 39 - 40 - pipeline, err := gst.NewPipelineFromString(strings.Join(pipelineSlice, "\n")) 41 - if err != nil { 42 - return nil, fmt.Errorf("failed to create GStreamer pipeline: %w", err) 43 - } 44 - 45 - go func() { 46 - HandleBusMessages(ctx, pipeline) 47 - cancel() 48 - }() 49 - 50 - outputQueue, done, err := ConcatStream(ctx, pipeline, user, rendition, mm) 51 - if err != nil { 52 - return nil, fmt.Errorf("failed to get output queue: %w", err) 53 - } 54 - go func() { 55 - select { 56 - case <-ctx.Done(): 57 - return 58 - case <-done: 59 - cancel() 60 - } 61 - }() 62 - // queuePadVideo := outputQueue.GetRequestPad("src_%u") 63 - // if queuePadVideo == nil { 64 - // return nil, fmt.Errorf("failed to get queue video pad") 65 - // } 66 - // queuePadAudio := outputQueue.GetRequestPad("src_%u") 67 - // if queuePadAudio == nil { 68 - // return nil, fmt.Errorf("failed to get queue audio pad") 69 - // } 70 - 71 - videoParse, err := pipeline.GetElementByName("videoparse") 72 - if err != nil { 73 - return nil, fmt.Errorf("failed to get video sink element from pipeline: %w", err) 74 - } 75 - err = outputQueue.Link(videoParse) 76 - if err != nil { 77 - return nil, fmt.Errorf("failed to link output queue to video parse: %w", err) 78 - } 79 - 80 - audioParse, err := pipeline.GetElementByName("audioparse") 81 - if err != nil { 82 - return nil, fmt.Errorf("failed to get audio parse element from pipeline: %w", err) 83 - } 84 - err = outputQueue.Link(audioParse) 85 - if err != nil { 86 - return nil, fmt.Errorf("failed to link output queue to audio parse: %w", err) 87 - } 88 - 89 - videoappsinkele, err := pipeline.GetElementByName("videoappsink") 90 - if err != nil { 91 - return nil, fmt.Errorf("failed to get video sink element from pipeline: %w", err) 92 - } 93 - 94 - audioappsinkele, err := pipeline.GetElementByName("audioappsink") 95 - if err != nil { 96 - return nil, fmt.Errorf("failed to get audio sink element from pipeline: %w", err) 97 - } 98 - 99 - // Create a new RTCPeerConnection 100 - peerConnection, err := mm.webrtcAPI.NewPeerConnection(mm.webrtcConfig) 101 - if err != nil { 102 - return nil, fmt.Errorf("failed to create WebRTC peer connection: %w", err) 103 - } 104 - go func() { 105 - <-ctx.Done() 106 - if cErr := peerConnection.Close(); cErr != nil { 107 - log.Log(ctx, "cannot close peerConnection: %v\n", cErr) 108 - } 109 - }() 110 - 111 - videoTrack, err := webrtc.NewTrackLocalStaticSample(webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeH264}, "video", "pion") 112 - if err != nil { 113 - return nil, fmt.Errorf("failed to create video track: %w", err) 114 - } 115 - videoRTPSender, err := peerConnection.AddTrack(videoTrack) 116 - if err != nil { 117 - return nil, fmt.Errorf("failed to add video track to peer connection: %w", err) 118 - } 119 - 120 - audioTrack, err := webrtc.NewTrackLocalStaticSample(webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeOpus}, "audio", "pion") 121 - if err != nil { 122 - return nil, fmt.Errorf("failed to create audio track: %w", err) 123 - } 124 - audioRTPSender, err := peerConnection.AddTrack(audioTrack) 125 - if err != nil { 126 - return nil, fmt.Errorf("failed to add audio track to peer connection: %w", err) 127 - } 128 - 129 - // Set the remote SessionDescription 130 - if err = peerConnection.SetRemoteDescription(*offer); err != nil { 131 - return nil, fmt.Errorf("failed to set remote description: %w", err) 132 - } 133 - 134 - // Create answer 135 - answer, err := peerConnection.CreateAnswer(nil) 136 - if err != nil { 137 - return nil, fmt.Errorf("failed to create answer: %w", err) 138 - } 139 - 140 - // Sets the LocalDescription, and starts our UDP listeners 141 - if err = peerConnection.SetLocalDescription(answer); err != nil { 142 - return nil, fmt.Errorf("failed to set local description: %w", err) 143 - } 144 - 145 - // Create channel that is blocked until ICE Gathering is complete 146 - gatherComplete := webrtc.GatheringCompletePromise(peerConnection) 147 - 148 - // Setup complete! Now we boot up streaming in the background while returning the SDP offer to the user. 149 - 150 - go func() { 151 - <-ctx.Done() 152 - pipeline.BlockSetState(gst.StateNull) 153 - }() 154 - 155 - go func() { 156 - ticker := time.NewTicker(time.Second * 1) 157 - for { 158 - select { 159 - case <-ctx.Done(): 160 - return 161 - case <-ticker.C: 162 - state := pipeline.GetCurrentState() 163 - log.Debug(ctx, "pipeline state", "state", state) 164 - } 165 - } 166 - }() 167 - 168 - var lastVideoDuration = &DEFAULT_DURATION 169 - 170 - go func() { 171 - 172 - videoappsink := app.SinkFromElement(videoappsinkele) 173 - videoappsink.SetCallbacks(&app.SinkCallbacks{ 174 - NewSampleFunc: func(sink *app.Sink) gst.FlowReturn { 175 - sample := sink.PullSample() 176 - if sample == nil { 177 - return gst.FlowEOS 178 - } 179 - 180 - buffer := sample.GetBuffer() 181 - if buffer == nil { 182 - return gst.FlowError 183 - } 184 - 185 - samples := buffer.Map(gst.MapRead).Bytes() 186 - defer buffer.Unmap() 187 - clockTime := buffer.Duration() 188 - dur := clockTime.AsDuration() 189 - mediaSample := media.Sample{Data: samples} 190 - if dur != nil { 191 - mediaSample.Duration = *dur 192 - lastVideoDuration = dur 193 - } else if lastVideoDuration != nil { 194 - mediaSample.Duration = *lastVideoDuration 195 - } else { 196 - log.Log(ctx, "no video duration", "samples", len(samples)) 197 - // cancel() 198 - return gst.FlowOK 199 - } 200 - 201 - if err := videoTrack.WriteSample(mediaSample); err != nil { 202 - log.Log(ctx, "failed to write video sample", "error", err) 203 - cancel() 204 - } 205 - 206 - return gst.FlowOK 207 - }, 208 - EOSFunc: func(sink *app.Sink) { 209 - log.Warn(ctx, "videoappsink EOSFunc") 210 - cancel() 211 - }, 212 - }) 213 - 214 - audioappsink := app.SinkFromElement(audioappsinkele) 215 - audioappsink.SetCallbacks(&app.SinkCallbacks{ 216 - NewSampleFunc: func(sink *app.Sink) gst.FlowReturn { 217 - sample := sink.PullSample() 218 - if sample == nil { 219 - return gst.FlowEOS 220 - } 221 - 222 - buffer := sample.GetBuffer() 223 - if buffer == nil { 224 - return gst.FlowError 225 - } 226 - 227 - samples := buffer.Map(gst.MapRead).Bytes() 228 - defer buffer.Unmap() 229 - 230 - clockTime := buffer.Duration() 231 - dur := clockTime.AsDuration() 232 - mediaSample := media.Sample{Data: samples} 233 - if dur != nil { 234 - mediaSample.Duration = *dur 235 - } else { 236 - log.Log(ctx, "no audio duration", "samples", len(samples)) 237 - // cancel() 238 - return gst.FlowOK 239 - } 240 - if err := audioTrack.WriteSample(mediaSample); err != nil { 241 - log.Log(ctx, "failed to write audio sample", "error", err) 242 - return gst.FlowOK 243 - } 244 - 245 - return gst.FlowOK 246 - }, 247 - EOSFunc: func(sink *app.Sink) { 248 - log.Warn(ctx, "audioappsink EOSFunc") 249 - cancel() 250 - }, 251 - }) 252 - 253 - // Start the pipeline 254 - pipeline.SetState(gst.StatePlaying) 255 - spmetrics.ViewerInc(user) 256 - defer spmetrics.ViewerDec(user) 257 - 258 - go func() { 259 - rtcpBuf := make([]byte, 1500) 260 - for { 261 - if _, _, rtcpErr := videoRTPSender.Read(rtcpBuf); rtcpErr != nil { 262 - return 263 - } 264 - } 265 - }() 266 - 267 - go func() { 268 - rtcpBuf := make([]byte, 1500) 269 - for { 270 - if _, _, rtcpErr := audioRTPSender.Read(rtcpBuf); rtcpErr != nil { 271 - return 272 - } 273 - } 274 - }() 275 - 276 - // Set the handler for ICE connection state 277 - // This will notify you when the peer has connected/disconnected 278 - peerConnection.OnICEConnectionStateChange(func(connectionState webrtc.ICEConnectionState) { 279 - log.Log(ctx, "Connection State has changed", "state", connectionState.String()) 280 - if connectionState == webrtc.ICEConnectionStateConnected { 281 - // iceConnectedCtxCancel() 282 - } 283 - }) 284 - 285 - // Set the handler for Peer connection state 286 - // This will notify you when the peer has connected/disconnected 287 - peerConnection.OnConnectionStateChange(func(s webrtc.PeerConnectionState) { 288 - log.Log(ctx, "Peer Connection State has changed", "state", s.String()) 289 - 290 - if s == webrtc.PeerConnectionStateFailed || s == webrtc.PeerConnectionStateClosed { 291 - // Wait until PeerConnection has had no network activity for 30 seconds or another failure. It may be reconnected using an ICE Restart. 292 - // Use webrtc.PeerConnectionStateDisconnected if you are interested in detecting faster timeout. 293 - // Note that the PeerConnection may come back from PeerConnectionStateDisconnected. 294 - log.Log(ctx, "Peer Connection has gone to failed, exiting") 295 - cancel() 296 - } 297 - }) 298 - 299 - <-ctx.Done() 300 - }() 301 - select { 302 - case <-gatherComplete: 303 - return peerConnection.LocalDescription(), nil 304 - case <-ctx.Done(): 305 - return nil, ctx.Err() 306 - } 307 - } 308 - 309 - // This function remains in scope for the duration of a single users' playback 310 - func (mm *MediaManager) WebRTCIngest(ctx context.Context, offer *webrtc.SessionDescription, signer MediaSigner) (*webrtc.SessionDescription, error) { 311 - uu, err := uuid.NewV7() 312 - if err != nil { 313 - return nil, err 314 - } 315 - 316 - ctx, cancel := context.WithCancel(ctx) 317 - ctx = log.WithLogValues(ctx, "webrtcID", uu.String(), "mediafunc", "WebRTCIngest") 318 - 319 - // Setup the codecs you want to use. 320 - // We'll use a VP8 and Opus but you can also define your own 321 - 322 - // Create a new RTCPeerConnection 323 - peerConnection, err := mm.webrtcAPI.NewPeerConnection(mm.webrtcConfig) 324 - if err != nil { 325 - return nil, fmt.Errorf("failed to create WebRTC peer connection: %w", err) 326 - } 327 - 328 - // Allow us to receive 1 audio track, and 1 video track 329 - if _, err = peerConnection.AddTransceiverFromKind(webrtc.RTPCodecTypeAudio); err != nil { 330 - return nil, fmt.Errorf("failed to add audio transceiver: %w", err) 331 - } else if _, err = peerConnection.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo); err != nil { 332 - return nil, fmt.Errorf("failed to add video transceiver: %w", err) 333 - } 334 - 335 - pipelineSlice := []string{ 336 - "multiqueue name=queue", 337 - "appsrc format=time is-live=true do-timestamp=true name=videosrc ! capsfilter caps=application/x-rtp ! rtph264depay ! capsfilter caps=video/x-h264,stream-format=byte-stream,alignment=nal ! h264parse ! h264timestamper ! identity ! queue.sink_0", 338 - "appsrc format=time is-live=true do-timestamp=true name=audiosrc ! capsfilter caps=application/x-rtp,media=audio,encoding-name=OPUS,payload=111 ! rtpopusdepay ! opusdec use-inband-fec=true ! audiorate ! opusenc ! queue.sink_1", 339 - } 340 - 341 - pipeline, err := gst.NewPipelineFromString(strings.Join(pipelineSlice, "\n")) 342 - if err != nil { 343 - return nil, fmt.Errorf("failed to create GStreamer pipeline: %w", err) 344 - } 345 - 346 - go func() { 347 - HandleBusMessages(ctx, pipeline) 348 - cancel() 349 - }() 350 - 351 - queue, err := pipeline.GetElementByName("queue") 352 - if err != nil { 353 - return nil, fmt.Errorf("failed to get queue element from pipeline: %w", err) 354 - } 355 - 356 - signerElem, err := mm.SegmentAndSignElem(ctx, signer) 357 - if err != nil { 358 - return nil, fmt.Errorf("failed create signer element: %w", err) 359 - } 360 - err = pipeline.Add(signerElem) 361 - if err != nil { 362 - return nil, fmt.Errorf("failed to add signer element to pipeline: %w", err) 363 - } 364 - 365 - // err = queue.Link(signerElem) 366 - // if err != nil { 367 - // return nil, fmt.Errorf("failed to link queue to signer element: %w", err) 368 - // } 369 - videoSrcPads, err := queue.GetSrcPads() 370 - if err != nil { 371 - return nil, fmt.Errorf("failed to get videoSrcPads from queue: %w", err) 372 - } 373 - if len(videoSrcPads) != 2 { 374 - return nil, fmt.Errorf("failed to get videoSrcPads from queue") 375 - } 376 - videoSrcPad := videoSrcPads[0] 377 - audioSrcPad := videoSrcPads[1] 378 - 379 - signerElemPads, err := signerElem.GetPads() 380 - if err != nil { 381 - return nil, fmt.Errorf("failed to get signerElemPads from signer element: %w", err) 382 - } 383 - if len(signerElemPads) != 2 { 384 - return nil, fmt.Errorf("failed to get signerElemPads from signer element") 385 - } 386 - signerElemVideoPad := signerElemPads[0] 387 - signerElemAudioPad := signerElemPads[1] 388 - videoSrcPad.Link(signerElemVideoPad) 389 - audioSrcPad.Link(signerElemAudioPad) 390 - 391 - videoSrcElem, err := pipeline.GetElementByName("videosrc") 392 - if err != nil { 393 - return nil, fmt.Errorf("failed to get videoSrcElem element from pipeline: %w", err) 394 - } 395 - videoSrc := app.SrcFromElement(videoSrcElem) 396 - 397 - audioSrcElem, err := pipeline.GetElementByName("audiosrc") 398 - if err != nil { 399 - return nil, fmt.Errorf("failed to get audioSrcElem element from pipeline: %w", err) 400 - } 401 - audioSrc := app.SrcFromElement(audioSrcElem) 402 - 403 - go func() { 404 - <-ctx.Done() 405 - pipeline.BlockSetState(gst.StateNull) 406 - }() 407 - 408 - go func() { 409 - <-ctx.Done() 410 - if cErr := peerConnection.Close(); cErr != nil { 411 - log.Log(ctx, "cannot close peerConnection: %v\n", cErr) 412 - } 413 - }() 414 - 415 - // Set the remote SessionDescription 416 - if err = peerConnection.SetRemoteDescription(*offer); err != nil { 417 - return nil, fmt.Errorf("failed to set remote description: %w", err) 418 - } 419 - 420 - // Create answer 421 - answer, err := peerConnection.CreateAnswer(nil) 422 - if err != nil { 423 - return nil, fmt.Errorf("failed to create answer: %w", err) 424 - } 425 - 426 - // Sets the LocalDescription, and starts our UDP listeners 427 - if err = peerConnection.SetLocalDescription(answer); err != nil { 428 - return nil, fmt.Errorf("failed to set local description: %w", err) 429 - } 430 - 431 - // Create channel that is blocked until ICE Gathering is complete 432 - gatherComplete := webrtc.GatheringCompletePromise(peerConnection) 433 - 434 - go func() { 435 - ticker := time.NewTicker(time.Second * 1) 436 - for { 437 - select { 438 - case <-ctx.Done(): 439 - return 440 - case <-ticker.C: 441 - state := pipeline.GetCurrentState() 442 - log.Debug(ctx, "pipeline state", "state", state) 443 - } 444 - } 445 - }() 446 - // Setup complete! Now we boot up streaming in the background while returning the SDP offer to the user. 447 - 448 - go func() { 449 - log.Debug(ctx, "starting pipeline") 450 - 451 - // Start the pipeline 452 - err = pipeline.SetState(gst.StatePlaying) 453 - if err != nil { 454 - log.Log(ctx, "failed to set pipeline state", "error", err) 455 - cancel() 456 - } 457 - 458 - // Set the handler for ICE connection state 459 - // This will notify you when the peer has connected/disconnected 460 - peerConnection.OnICEConnectionStateChange(func(connectionState webrtc.ICEConnectionState) { 461 - log.Log(ctx, "Connection State has changed", "state", connectionState.String()) 462 - if connectionState == webrtc.ICEConnectionStateConnected { 463 - // iceConnectedCtxCancel() 464 - } 465 - }) 466 - 467 - // Set the handler for Peer connection state 468 - // This will notify you when the peer has connected/disconnected 469 - peerConnection.OnConnectionStateChange(func(s webrtc.PeerConnectionState) { 470 - log.Log(ctx, "Peer Connection State has changed", "state", s.String()) 471 - 472 - if s == webrtc.PeerConnectionStateFailed || s == webrtc.PeerConnectionStateDisconnected { 473 - // Wait until PeerConnection has had no network activity for 30 seconds or another failure. It may be reconnected using an ICE Restart. 474 - // Use webrtc.PeerConnectionStateDisconnected if you are interested in detecting faster timeout. 475 - // Note that the PeerConnection may come back from PeerConnectionStateDisconnected. 476 - log.Log(ctx, "Peer Connection has ended, exiting", "state", s.String()) 477 - cancel() 478 - } 479 - }) 480 - 481 - log.Warn(ctx, "setting OnTrack") 482 - peerConnection.OnTrack(func(track *webrtc.TrackRemote, _ *webrtc.RTPReceiver) { 483 - log.Warn(ctx, "OnTrack") 484 - if track.Kind() == webrtc.RTPCodecTypeVideo { 485 - // Send a PLI on an interval so that the publisher is pushing a keyframe every rtcpPLIInterval 486 - go func() { 487 - ticker := time.NewTicker(time.Second * 5) 488 - for { 489 - select { 490 - case <-ctx.Done(): 491 - return 492 - case <-ticker.C: 493 - rtcpSendErr := peerConnection.WriteRTCP([]rtcp.Packet{&rtcp.PictureLossIndication{MediaSSRC: uint32(track.SSRC())}}) 494 - if rtcpSendErr != nil { 495 - log.Log(ctx, "failed to send rtcp packet", "error", rtcpSendErr) 496 - cancel() 497 - return 498 - } 499 - } 500 - } 501 - }() 502 - 503 - codecName := strings.Split(track.Codec().RTPCodecCapability.MimeType, "/")[1] 504 - log.Log(ctx, "Track has started", "payloadType", track.PayloadType(), "codecName", codecName) 505 - 506 - // appSrc := pipelineForCodec(track, codecName) 507 - buf := make([]byte, 1400) 508 - for { 509 - i, _, readErr := track.Read(buf) 510 - if readErr != nil { 511 - log.Log(ctx, "failed to read track", "error", readErr) 512 - cancel() 513 - return 514 - } 515 - log.Debug(ctx, "read video track", "bytes", i) 516 - 517 - ret := videoSrc.PushBuffer(gst.NewBufferFromBytes(buf[:i])) 518 - if ret != gst.FlowOK { 519 - log.Log(ctx, "failed to push buffer", "error", ret) 520 - cancel() 521 - return 522 - } 523 - // state := pipeline.GetCurrentState() 524 - // if state != gst.StatePlaying { 525 - // log.Warn(ctx, "pipeline state is not playing, consider running with GST_DEBUG=*:5 to find out why", "state", state) 526 - // cancel() 527 - // return 528 - // } 529 - } 530 - } 531 - if track.Kind() == webrtc.RTPCodecTypeAudio { 532 - 533 - codecName := strings.Split(track.Codec().RTPCodecCapability.MimeType, "/")[1] 534 - log.Log(ctx, "Track has started", "payloadType", track.PayloadType(), "codecName", codecName) 535 - 536 - buf := make([]byte, 1400) 537 - for { 538 - i, _, readErr := track.Read(buf) 539 - if readErr != nil { 540 - log.Log(ctx, "failed to read track", "error", readErr) 541 - cancel() 542 - return 543 - } 544 - // log.Log(ctx, "read audio track", "bytes", i) 545 - 546 - ret := audioSrc.PushBuffer(gst.NewBufferFromBytes(buf[:i])) 547 - if ret != gst.FlowOK { 548 - log.Log(ctx, "failed to push buffer", "error", ret) 549 - cancel() 550 - return 551 - } 552 - // state := pipeline.GetCurrentState() 553 - // if state != gst.StatePlaying { 554 - // log.Warn(ctx, "pipeline state is not playing, consider running with GST_DEBUG=*:5 to find out why", "state", state) 555 - // cancel() 556 - // return 557 - // } 558 - } 559 - } 560 - }) 561 - 562 - <-ctx.Done() 563 - }() 564 - select { 565 - case <-gatherComplete: 566 - return peerConnection.LocalDescription(), nil 567 - case <-ctx.Done(): 568 - return nil, ctx.Err() 569 - } 570 - }
+311
pkg/media/webrtc_ingest.go
··· 1 + package media 2 + 3 + import ( 4 + "context" 5 + "fmt" 6 + "strings" 7 + "time" 8 + 9 + "github.com/go-gst/go-gst/gst" 10 + "github.com/go-gst/go-gst/gst/app" 11 + "github.com/google/uuid" 12 + "github.com/pion/rtcp" 13 + "github.com/pion/webrtc/v4" 14 + "stream.place/streamplace/pkg/log" 15 + ) 16 + 17 + // This function remains in scope for the duration of a single users' playback 18 + func (mm *MediaManager) WebRTCIngest(ctx context.Context, offer *webrtc.SessionDescription, signer MediaSigner) (*webrtc.SessionDescription, error) { 19 + uu, err := uuid.NewV7() 20 + if err != nil { 21 + return nil, err 22 + } 23 + 24 + ctx, cancel := context.WithCancel(ctx) 25 + ctx = log.WithLogValues(ctx, "webrtcID", uu.String(), "mediafunc", "WebRTCIngest") 26 + 27 + // Setup the codecs you want to use. 28 + // We'll use a VP8 and Opus but you can also define your own 29 + 30 + // Create a new RTCPeerConnection 31 + peerConnection, err := mm.webrtcAPI.NewPeerConnection(mm.webrtcConfig) 32 + if err != nil { 33 + return nil, fmt.Errorf("failed to create WebRTC peer connection: %w", err) 34 + } 35 + 36 + // Allow us to receive 1 audio track, and 1 video track 37 + if _, err = peerConnection.AddTransceiverFromKind(webrtc.RTPCodecTypeAudio); err != nil { 38 + return nil, fmt.Errorf("failed to add audio transceiver: %w", err) 39 + } else if _, err = peerConnection.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo); err != nil { 40 + return nil, fmt.Errorf("failed to add video transceiver: %w", err) 41 + } 42 + 43 + pipelineSlice := []string{ 44 + "multiqueue name=queue", 45 + "appsrc format=time is-live=true do-timestamp=true name=videosrc ! capsfilter caps=application/x-rtp ! rtph264depay ! capsfilter caps=video/x-h264,stream-format=byte-stream,alignment=nal ! h264parse ! h264timestamper ! identity ! queue.sink_0", 46 + "appsrc format=time is-live=true do-timestamp=true name=audiosrc ! capsfilter caps=application/x-rtp,media=audio,encoding-name=OPUS,payload=111 ! rtpopusdepay ! opusdec use-inband-fec=true ! audiorate ! opusenc ! queue.sink_1", 47 + } 48 + 49 + pipeline, err := gst.NewPipelineFromString(strings.Join(pipelineSlice, "\n")) 50 + if err != nil { 51 + return nil, fmt.Errorf("failed to create GStreamer pipeline: %w", err) 52 + } 53 + 54 + go func() { 55 + HandleBusMessages(ctx, pipeline) 56 + cancel() 57 + }() 58 + 59 + queue, err := pipeline.GetElementByName("queue") 60 + if err != nil { 61 + return nil, fmt.Errorf("failed to get queue element from pipeline: %w", err) 62 + } 63 + 64 + signerElem, err := mm.SegmentAndSignElem(ctx, signer) 65 + if err != nil { 66 + return nil, fmt.Errorf("failed create signer element: %w", err) 67 + } 68 + err = pipeline.Add(signerElem) 69 + if err != nil { 70 + return nil, fmt.Errorf("failed to add signer element to pipeline: %w", err) 71 + } 72 + 73 + // err = queue.Link(signerElem) 74 + // if err != nil { 75 + // return nil, fmt.Errorf("failed to link queue to signer element: %w", err) 76 + // } 77 + videoSrcPads, err := queue.GetSrcPads() 78 + if err != nil { 79 + return nil, fmt.Errorf("failed to get videoSrcPads from queue: %w", err) 80 + } 81 + if len(videoSrcPads) != 2 { 82 + return nil, fmt.Errorf("failed to get videoSrcPads from queue") 83 + } 84 + videoSrcPad := videoSrcPads[0] 85 + audioSrcPad := videoSrcPads[1] 86 + 87 + signerElemPads, err := signerElem.GetPads() 88 + if err != nil { 89 + return nil, fmt.Errorf("failed to get signerElemPads from signer element: %w", err) 90 + } 91 + if len(signerElemPads) != 2 { 92 + return nil, fmt.Errorf("failed to get signerElemPads from signer element") 93 + } 94 + signerElemVideoPad := signerElemPads[0] 95 + signerElemAudioPad := signerElemPads[1] 96 + videoSrcPad.Link(signerElemVideoPad) 97 + audioSrcPad.Link(signerElemAudioPad) 98 + 99 + videoSrcElem, err := pipeline.GetElementByName("videosrc") 100 + if err != nil { 101 + return nil, fmt.Errorf("failed to get videoSrcElem element from pipeline: %w", err) 102 + } 103 + videoSrc := app.SrcFromElement(videoSrcElem) 104 + 105 + audioSrcElem, err := pipeline.GetElementByName("audiosrc") 106 + if err != nil { 107 + return nil, fmt.Errorf("failed to get audioSrcElem element from pipeline: %w", err) 108 + } 109 + audioSrc := app.SrcFromElement(audioSrcElem) 110 + 111 + go func() { 112 + <-ctx.Done() 113 + if cErr := peerConnection.Close(); cErr != nil { 114 + log.Log(ctx, "cannot close peerConnection: %v\n", cErr) 115 + } 116 + }() 117 + 118 + // Set the remote SessionDescription 119 + if err = peerConnection.SetRemoteDescription(*offer); err != nil { 120 + return nil, fmt.Errorf("failed to set remote description: %w", err) 121 + } 122 + 123 + // Create answer 124 + answer, err := peerConnection.CreateAnswer(nil) 125 + if err != nil { 126 + return nil, fmt.Errorf("failed to create answer: %w", err) 127 + } 128 + 129 + // Sets the LocalDescription, and starts our UDP listeners 130 + if err = peerConnection.SetLocalDescription(answer); err != nil { 131 + return nil, fmt.Errorf("failed to set local description: %w", err) 132 + } 133 + 134 + // Create channel that is blocked until ICE Gathering is complete 135 + gatherComplete := webrtc.GatheringCompletePromise(peerConnection) 136 + 137 + go func() { 138 + ticker := time.NewTicker(time.Second * 1) 139 + for { 140 + select { 141 + case <-ctx.Done(): 142 + return 143 + case <-ticker.C: 144 + state := pipeline.GetCurrentState() 145 + log.Debug(ctx, "pipeline state", "state", state) 146 + } 147 + } 148 + }() 149 + // Setup complete! Now we boot up streaming in the background while returning the SDP offer to the user. 150 + 151 + go func() { 152 + log.Debug(ctx, "starting pipeline") 153 + 154 + // Start the pipeline 155 + err = pipeline.SetState(gst.StatePlaying) 156 + if err != nil { 157 + log.Log(ctx, "failed to set pipeline state", "error", err) 158 + cancel() 159 + } 160 + 161 + // Set the handler for ICE connection state 162 + // This will notify you when the peer has connected/disconnected 163 + peerConnection.OnICEConnectionStateChange(func(connectionState webrtc.ICEConnectionState) { 164 + log.Log(ctx, "Connection State has changed", "state", connectionState.String()) 165 + if connectionState == webrtc.ICEConnectionStateConnected { 166 + // iceConnectedCtxCancel() 167 + } 168 + }) 169 + 170 + // Set the handler for Peer connection state 171 + // This will notify you when the peer has connected/disconnected 172 + peerConnection.OnConnectionStateChange(func(s webrtc.PeerConnectionState) { 173 + log.Log(ctx, "Peer Connection State has changed", "state", s.String()) 174 + 175 + if s == webrtc.PeerConnectionStateFailed || s == webrtc.PeerConnectionStateDisconnected { 176 + // Wait until PeerConnection has had no network activity for 30 seconds or another failure. It may be reconnected using an ICE Restart. 177 + // Use webrtc.PeerConnectionStateDisconnected if you are interested in detecting faster timeout. 178 + // Note that the PeerConnection may come back from PeerConnectionStateDisconnected. 179 + log.Log(ctx, "Peer Connection has ended, exiting", "state", s.String()) 180 + cancel() 181 + } 182 + }) 183 + 184 + videoFirst := false 185 + audioFirst := false 186 + 187 + log.Warn(ctx, "setting OnTrack") 188 + peerConnection.OnTrack(func(track *webrtc.TrackRemote, _ *webrtc.RTPReceiver) { 189 + log.Warn(ctx, "OnTrack") 190 + if track.Kind() == webrtc.RTPCodecTypeVideo { 191 + // Send a PLI on an interval so that the publisher is pushing a keyframe every rtcpPLIInterval 192 + go func() { 193 + ticker := time.NewTicker(time.Second * 5) 194 + for { 195 + select { 196 + case <-ctx.Done(): 197 + return 198 + case <-ticker.C: 199 + rtcpSendErr := peerConnection.WriteRTCP([]rtcp.Packet{&rtcp.PictureLossIndication{MediaSSRC: uint32(track.SSRC())}}) 200 + if rtcpSendErr != nil { 201 + log.Log(ctx, "failed to send rtcp packet", "error", rtcpSendErr) 202 + cancel() 203 + return 204 + } 205 + } 206 + } 207 + }() 208 + 209 + codecName := strings.Split(track.Codec().RTPCodecCapability.MimeType, "/")[1] 210 + log.Log(ctx, "Track has started", "payloadType", track.PayloadType(), "codecName", codecName) 211 + 212 + // appSrc := pipelineForCodec(track, codecName) 213 + buf := make([]byte, 1400) 214 + for { 215 + i, _, readErr := track.Read(buf) 216 + if readErr != nil { 217 + log.Log(ctx, "failed to read track", "error", readErr) 218 + cancel() 219 + return 220 + } 221 + if ctx.Err() != nil { 222 + return 223 + } 224 + if !videoFirst { 225 + videoFirst = true 226 + log.Debug(ctx, "got video data", "len", len(buf[:i])) 227 + } 228 + 229 + gbuf := gst.NewBufferWithSize(int64(len(buf[:i]))) 230 + gbuf.Map(gst.MapWrite).WriteData(buf[:i]) 231 + defer gbuf.Unmap() 232 + 233 + ret := videoSrc.PushBuffer(gbuf) 234 + if ret != gst.FlowOK { 235 + log.Log(ctx, "failed to push buffer", "error", ret) 236 + cancel() 237 + return 238 + } 239 + // state := pipeline.GetCurrentState() 240 + // if state != gst.StatePlaying { 241 + // log.Warn(ctx, "pipeline state is not playing, consider running with GST_DEBUG=*:5 to find out why", "state", state) 242 + // cancel() 243 + // return 244 + // } 245 + } 246 + } 247 + if track.Kind() == webrtc.RTPCodecTypeAudio { 248 + 249 + codecName := strings.Split(track.Codec().RTPCodecCapability.MimeType, "/")[1] 250 + log.Log(ctx, "Track has started", "payloadType", track.PayloadType(), "codecName", codecName) 251 + 252 + buf := make([]byte, 1400) 253 + for { 254 + i, _, readErr := track.Read(buf) 255 + if readErr != nil { 256 + log.Log(ctx, "failed to read track", "error", readErr) 257 + cancel() 258 + return 259 + } 260 + if ctx.Err() != nil { 261 + return 262 + } 263 + if !audioFirst { 264 + audioFirst = true 265 + log.Debug(ctx, "got audio data", "len", len(buf[:i])) 266 + } 267 + 268 + gbuf := gst.NewBufferWithSize(int64(len(buf[:i]))) 269 + gbuf.Map(gst.MapWrite).WriteData(buf[:i]) 270 + defer gbuf.Unmap() 271 + ret := audioSrc.PushBuffer(gbuf) 272 + if ret != gst.FlowOK { 273 + log.Log(ctx, "failed to push buffer", "error", ret) 274 + cancel() 275 + return 276 + } 277 + // state := pipeline.GetCurrentState() 278 + // if state != gst.StatePlaying { 279 + // log.Warn(ctx, "pipeline state is not playing, consider running with GST_DEBUG=*:5 to find out why", "state", state) 280 + // cancel() 281 + // return 282 + // } 283 + } 284 + } 285 + }) 286 + 287 + <-ctx.Done() 288 + 289 + err = pipeline.BlockSetState(gst.StateNull) 290 + if err != nil { 291 + log.Log(ctx, "failed to set pipeline state to null", "error", err) 292 + } 293 + 294 + audioSrcElem.SetState(gst.StateNull) 295 + if err != nil { 296 + log.Log(ctx, "failed to set audioSrcElem state to null", "error", err) 297 + } 298 + 299 + videoSrcElem.SetState(gst.StateNull) 300 + if err != nil { 301 + log.Log(ctx, "failed to set videoSrcElem state to null", "error", err) 302 + } 303 + 304 + }() 305 + select { 306 + case <-gatherComplete: 307 + return peerConnection.LocalDescription(), nil 308 + case <-ctx.Done(): 309 + return nil, ctx.Err() 310 + } 311 + }
+327
pkg/media/webrtc_playback.go
··· 1 + package media 2 + 3 + import ( 4 + "context" 5 + "fmt" 6 + "strings" 7 + "time" 8 + 9 + "github.com/go-gst/go-gst/gst" 10 + "github.com/go-gst/go-gst/gst/app" 11 + "github.com/google/uuid" 12 + "github.com/pion/webrtc/v4" 13 + "github.com/pion/webrtc/v4/pkg/media" 14 + "stream.place/streamplace/pkg/log" 15 + "stream.place/streamplace/pkg/spmetrics" 16 + ) 17 + 18 + // we have a bug that prevents us from correctly probing video durations 19 + // a lot of the time. so when we don't have them we use the last duration 20 + // that we had, and when we don't have that we use a default duration 21 + var DEFAULT_DURATION = time.Duration(32 * time.Millisecond) 22 + 23 + // This function remains in scope for the duration of a single users' playback 24 + func (mm *MediaManager) WebRTCPlayback(ctx context.Context, user string, rendition string, offer *webrtc.SessionDescription) (*webrtc.SessionDescription, error) { 25 + uu, err := uuid.NewV7() 26 + if err != nil { 27 + return nil, err 28 + } 29 + ctx = log.WithLogValues(ctx, "webrtcID", uu.String()) 30 + ctx, cancel := context.WithCancel(ctx) 31 + 32 + ctx = log.WithLogValues(ctx, "mediafunc", "WebRTCPlayback") 33 + 34 + pipelineSlice := []string{ 35 + "h264parse name=videoparse ! video/x-h264,stream-format=byte-stream ! appsink name=videoappsink", 36 + "opusparse name=audioparse ! appsink name=audioappsink", 37 + } 38 + 39 + pipeline, err := gst.NewPipelineFromString(strings.Join(pipelineSlice, "\n")) 40 + if err != nil { 41 + return nil, fmt.Errorf("failed to create GStreamer pipeline: %w", err) 42 + } 43 + 44 + go func() { 45 + HandleBusMessages(ctx, pipeline) 46 + cancel() 47 + }() 48 + 49 + outputQueue, done, err := ConcatStream(ctx, pipeline, user, rendition, mm) 50 + if err != nil { 51 + return nil, fmt.Errorf("failed to get output queue: %w", err) 52 + } 53 + go func() { 54 + select { 55 + case <-ctx.Done(): 56 + return 57 + case <-done: 58 + cancel() 59 + } 60 + }() 61 + // queuePadVideo := outputQueue.GetRequestPad("src_%u") 62 + // if queuePadVideo == nil { 63 + // return nil, fmt.Errorf("failed to get queue video pad") 64 + // } 65 + // queuePadAudio := outputQueue.GetRequestPad("src_%u") 66 + // if queuePadAudio == nil { 67 + // return nil, fmt.Errorf("failed to get queue audio pad") 68 + // } 69 + 70 + videoParse, err := pipeline.GetElementByName("videoparse") 71 + if err != nil { 72 + return nil, fmt.Errorf("failed to get video sink element from pipeline: %w", err) 73 + } 74 + err = outputQueue.Link(videoParse) 75 + if err != nil { 76 + return nil, fmt.Errorf("failed to link output queue to video parse: %w", err) 77 + } 78 + 79 + audioParse, err := pipeline.GetElementByName("audioparse") 80 + if err != nil { 81 + return nil, fmt.Errorf("failed to get audio parse element from pipeline: %w", err) 82 + } 83 + err = outputQueue.Link(audioParse) 84 + if err != nil { 85 + return nil, fmt.Errorf("failed to link output queue to audio parse: %w", err) 86 + } 87 + 88 + videoappsinkele, err := pipeline.GetElementByName("videoappsink") 89 + if err != nil { 90 + return nil, fmt.Errorf("failed to get video sink element from pipeline: %w", err) 91 + } 92 + 93 + audioappsinkele, err := pipeline.GetElementByName("audioappsink") 94 + if err != nil { 95 + return nil, fmt.Errorf("failed to get audio sink element from pipeline: %w", err) 96 + } 97 + 98 + // Create a new RTCPeerConnection 99 + peerConnection, err := mm.webrtcAPI.NewPeerConnection(mm.webrtcConfig) 100 + if err != nil { 101 + return nil, fmt.Errorf("failed to create WebRTC peer connection: %w", err) 102 + } 103 + go func() { 104 + <-ctx.Done() 105 + if cErr := peerConnection.Close(); cErr != nil { 106 + log.Log(ctx, "cannot close peerConnection: %v\n", cErr) 107 + } 108 + }() 109 + 110 + videoTrack, err := webrtc.NewTrackLocalStaticSample(webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeH264}, "video", "pion") 111 + if err != nil { 112 + return nil, fmt.Errorf("failed to create video track: %w", err) 113 + } 114 + videoRTPSender, err := peerConnection.AddTrack(videoTrack) 115 + if err != nil { 116 + return nil, fmt.Errorf("failed to add video track to peer connection: %w", err) 117 + } 118 + 119 + audioTrack, err := webrtc.NewTrackLocalStaticSample(webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeOpus}, "audio", "pion") 120 + if err != nil { 121 + return nil, fmt.Errorf("failed to create audio track: %w", err) 122 + } 123 + audioRTPSender, err := peerConnection.AddTrack(audioTrack) 124 + if err != nil { 125 + return nil, fmt.Errorf("failed to add audio track to peer connection: %w", err) 126 + } 127 + 128 + // Set the remote SessionDescription 129 + if err = peerConnection.SetRemoteDescription(*offer); err != nil { 130 + return nil, fmt.Errorf("failed to set remote description: %w", err) 131 + } 132 + 133 + // Create answer 134 + answer, err := peerConnection.CreateAnswer(nil) 135 + if err != nil { 136 + return nil, fmt.Errorf("failed to create answer: %w", err) 137 + } 138 + 139 + // Sets the LocalDescription, and starts our UDP listeners 140 + if err = peerConnection.SetLocalDescription(answer); err != nil { 141 + return nil, fmt.Errorf("failed to set local description: %w", err) 142 + } 143 + 144 + // Create channel that is blocked until ICE Gathering is complete 145 + gatherComplete := webrtc.GatheringCompletePromise(peerConnection) 146 + 147 + // Setup complete! Now we boot up streaming in the background while returning the SDP offer to the user. 148 + 149 + go func() { 150 + ticker := time.NewTicker(time.Second * 1) 151 + for { 152 + select { 153 + case <-ctx.Done(): 154 + return 155 + case <-ticker.C: 156 + state := pipeline.GetCurrentState() 157 + log.Debug(ctx, "pipeline state", "state", state) 158 + } 159 + } 160 + }() 161 + 162 + var lastVideoDuration = &DEFAULT_DURATION 163 + 164 + go func() { 165 + 166 + videoappsink := app.SinkFromElement(videoappsinkele) 167 + videoappsink.SetCallbacks(&app.SinkCallbacks{ 168 + NewSampleFunc: func(sink *app.Sink) gst.FlowReturn { 169 + sample := sink.PullSample() 170 + if sample == nil { 171 + return gst.FlowEOS 172 + } 173 + 174 + buffer := sample.GetBuffer() 175 + if buffer == nil { 176 + return gst.FlowError 177 + } 178 + 179 + samples := buffer.Map(gst.MapRead).Bytes() 180 + defer buffer.Unmap() 181 + b2 := make([]byte, len(samples)) 182 + copy(b2, samples) 183 + clockTime := buffer.Duration() 184 + dur := clockTime.AsDuration() 185 + mediaSample := media.Sample{Data: b2} 186 + if dur != nil { 187 + mediaSample.Duration = *dur 188 + lastVideoDuration = dur 189 + } else if lastVideoDuration != nil { 190 + mediaSample.Duration = *lastVideoDuration 191 + } else { 192 + log.Log(ctx, "no video duration", "samples", len(b2)) 193 + // cancel() 194 + return gst.FlowOK 195 + } 196 + 197 + if err := videoTrack.WriteSample(mediaSample); err != nil { 198 + log.Log(ctx, "failed to write video sample", "error", err) 199 + cancel() 200 + } 201 + 202 + return gst.FlowOK 203 + }, 204 + EOSFunc: func(sink *app.Sink) { 205 + log.Warn(ctx, "videoappsink EOSFunc") 206 + cancel() 207 + }, 208 + }) 209 + 210 + audioappsink := app.SinkFromElement(audioappsinkele) 211 + audioappsink.SetCallbacks(&app.SinkCallbacks{ 212 + NewSampleFunc: func(sink *app.Sink) gst.FlowReturn { 213 + sample := sink.PullSample() 214 + if sample == nil { 215 + return gst.FlowEOS 216 + } 217 + 218 + buffer := sample.GetBuffer() 219 + if buffer == nil { 220 + return gst.FlowError 221 + } 222 + 223 + samples := buffer.Map(gst.MapRead).Bytes() 224 + defer buffer.Unmap() 225 + 226 + b2 := make([]byte, len(samples)) 227 + copy(b2, samples) 228 + 229 + clockTime := buffer.Duration() 230 + dur := clockTime.AsDuration() 231 + mediaSample := media.Sample{Data: b2} 232 + if dur != nil { 233 + mediaSample.Duration = *dur 234 + } else { 235 + log.Log(ctx, "no audio duration", "samples", len(b2)) 236 + // cancel() 237 + return gst.FlowOK 238 + } 239 + if err := audioTrack.WriteSample(mediaSample); err != nil { 240 + log.Log(ctx, "failed to write audio sample", "error", err) 241 + return gst.FlowOK 242 + } 243 + 244 + return gst.FlowOK 245 + }, 246 + EOSFunc: func(sink *app.Sink) { 247 + log.Warn(ctx, "audioappsink EOSFunc") 248 + cancel() 249 + }, 250 + }) 251 + 252 + // Start the pipeline 253 + pipeline.SetState(gst.StatePlaying) 254 + spmetrics.ViewerInc(user) 255 + defer spmetrics.ViewerDec(user) 256 + 257 + go func() { 258 + rtcpBuf := make([]byte, 1500) 259 + for { 260 + if _, _, rtcpErr := videoRTPSender.Read(rtcpBuf); rtcpErr != nil { 261 + return 262 + } 263 + } 264 + }() 265 + 266 + go func() { 267 + rtcpBuf := make([]byte, 1500) 268 + for { 269 + if _, _, rtcpErr := audioRTPSender.Read(rtcpBuf); rtcpErr != nil { 270 + return 271 + } 272 + } 273 + }() 274 + 275 + // Set the handler for ICE connection state 276 + // This will notify you when the peer has connected/disconnected 277 + peerConnection.OnICEConnectionStateChange(func(connectionState webrtc.ICEConnectionState) { 278 + log.Log(ctx, "Connection State has changed", "state", connectionState.String()) 279 + if connectionState == webrtc.ICEConnectionStateConnected { 280 + // iceConnectedCtxCancel() 281 + } 282 + }) 283 + 284 + // Set the handler for Peer connection state 285 + // This will notify you when the peer has connected/disconnected 286 + peerConnection.OnConnectionStateChange(func(s webrtc.PeerConnectionState) { 287 + log.Log(ctx, "Peer Connection State has changed", "state", s.String()) 288 + 289 + if s == webrtc.PeerConnectionStateFailed || s == webrtc.PeerConnectionStateClosed || s == webrtc.PeerConnectionStateDisconnected { 290 + // Wait until PeerConnection has had no network activity for 30 seconds or another failure. It may be reconnected using an ICE Restart. 291 + // Use webrtc.PeerConnectionStateDisconnected if you are interested in detecting faster timeout. 292 + // Note that the PeerConnection may come back from PeerConnectionStateDisconnected. 293 + log.Log(ctx, "Peer Connection has gone to failed, exiting") 294 + cancel() 295 + } 296 + }) 297 + 298 + <-ctx.Done() 299 + 300 + log.Warn(ctx, "setting playback pipeline state to null") 301 + err = pipeline.BlockSetState(gst.StateNull) 302 + if err != nil { 303 + log.Log(ctx, "failed to set pipeline state to null", "error", err) 304 + } 305 + 306 + videoappsink.SetCallbacks(&app.SinkCallbacks{}) 307 + err = videoappsinkele.SetState(gst.StateNull) 308 + if err != nil { 309 + log.Log(ctx, "failed to set videoappsinkele state to null", "error", err) 310 + } 311 + 312 + audioappsink.SetCallbacks(&app.SinkCallbacks{}) 313 + err = audioappsinkele.SetState(gst.StateNull) 314 + if err != nil { 315 + log.Log(ctx, "failed to set audioappsinkele state to null", "error", err) 316 + } 317 + 318 + log.Warn(ctx, "exiting playback") 319 + 320 + }() 321 + select { 322 + case <-gatherComplete: 323 + return peerConnection.LocalDescription(), nil 324 + case <-ctx.Done(): 325 + return nil, ctx.Err() 326 + } 327 + }