// Live video on the AT Protocol
1package media
2
3import (
4 "context"
5 "fmt"
6 "strings"
7 "time"
8
9 "github.com/go-gst/go-gst/gst"
10 "github.com/go-gst/go-gst/gst/app"
11 "github.com/google/uuid"
12 "github.com/pion/webrtc/v4"
13 "github.com/pion/webrtc/v4/pkg/media"
14 "stream.place/streamplace/pkg/bus"
15 "stream.place/streamplace/pkg/log"
16 "stream.place/streamplace/pkg/spmetrics"
17)
18
19// we have a bug that prevents us from correctly probing video durations
20// a lot of the time. so when we don't have them we use the last duration
21// that we had, and when we don't have that we use a default duration
22var DefaultDuration = time.Duration(32 * time.Millisecond)
23
24// This function remains in scope for the duration of a single users' playback
25func (mm *MediaManager) WebRTCPlayback(ctx context.Context, user string, rendition string, offer *webrtc.SessionDescription) (*webrtc.SessionDescription, error) {
26 uu, err := uuid.NewV7()
27 if err != nil {
28 return nil, err
29 }
30 ctx = log.WithLogValues(ctx, "webrtcID", uu.String())
31 ctx = log.WithLogValues(ctx, "mediafunc", "WebRTCPlayback")
32 ctx, cancel := context.WithCancel(ctx) //nolint:all
33
34 pipelineSlice := []string{
35 "h264parse name=videoparse ! video/x-h264,stream-format=byte-stream ! appsink name=videoappsink",
36 "opusparse name=audioparse ! appsink name=audioappsink",
37 }
38
39 pipeline, err := gst.NewPipelineFromString(strings.Join(pipelineSlice, "\n"))
40 if err != nil {
41 return nil, fmt.Errorf("failed to create GStreamer pipeline: %w", err) //nolint:all
42 }
43
44 segBuffer := make(chan *bus.Seg, 1024)
45 go func() {
46 segChan := mm.bus.SubscribeSegment(ctx, user, rendition)
47 defer mm.bus.UnsubscribeSegment(ctx, user, rendition, segChan)
48 for {
49 select {
50 case <-ctx.Done():
51 log.Debug(ctx, "exiting segment reader")
52 return
53 case file := <-segChan.C:
54 log.Debug(ctx, "got segment", "file", file.Filepath)
55 segBuffer <- file
56 }
57 }
58 }()
59
60 segCh := make(chan *bus.Seg)
61 go func() {
62 for {
63 select {
64 case <-ctx.Done():
65 log.Debug(ctx, "exiting segment reader")
66 return
67 case seg := <-segBuffer:
68 select {
69 case <-ctx.Done():
70 return
71 case segCh <- seg:
72 }
73 }
74 }
75 }()
76
77 concatBin, err := ConcatBin(ctx, segCh)
78 if err != nil {
79 return nil, fmt.Errorf("failed to create concat bin: %w", err)
80 }
81
82 err = pipeline.Add(concatBin.Element)
83 if err != nil {
84 return nil, fmt.Errorf("failed to add concat bin to pipeline: %w", err)
85 }
86
87 videoPad := concatBin.GetStaticPad("video_0")
88 if videoPad == nil {
89 return nil, fmt.Errorf("video pad not found")
90 }
91
92 audioPad := concatBin.GetStaticPad("audio_0")
93 if audioPad == nil {
94 return nil, fmt.Errorf("audio pad not found")
95 }
96
97 // queuePadVideo := outputQueue.GetRequestPad("src_%u")
98 // if queuePadVideo == nil {
99 // return nil, fmt.Errorf("failed to get queue video pad")
100 // }
101 // queuePadAudio := outputQueue.GetRequestPad("src_%u")
102 // if queuePadAudio == nil {
103 // return nil, fmt.Errorf("failed to get queue audio pad")
104 // }
105
106 videoParse, err := pipeline.GetElementByName("videoparse")
107 if err != nil {
108 return nil, fmt.Errorf("failed to get video sink element from pipeline: %w", err)
109 }
110 videoParsePad := videoParse.GetStaticPad("sink")
111 if videoParsePad == nil {
112 return nil, fmt.Errorf("video parse pad not found")
113 }
114 linked := videoPad.Link(videoParsePad)
115 if linked != gst.PadLinkOK {
116 return nil, fmt.Errorf("failed to link video pad to video parse pad: %v", linked)
117 }
118
119 audioParse, err := pipeline.GetElementByName("audioparse")
120 if err != nil {
121 return nil, fmt.Errorf("failed to get audio parse element from pipeline: %w", err)
122 }
123 audioParsePad := audioParse.GetStaticPad("sink")
124 if audioParsePad == nil {
125 return nil, fmt.Errorf("audio parse pad not found")
126 }
127 linked = audioPad.Link(audioParsePad)
128 if linked != gst.PadLinkOK {
129 return nil, fmt.Errorf("failed to link audio pad to audio parse pad: %v", linked)
130 }
131
132 videoappsinkele, err := pipeline.GetElementByName("videoappsink")
133 if err != nil {
134 return nil, fmt.Errorf("failed to get video sink element from pipeline: %w", err)
135 }
136
137 audioappsinkele, err := pipeline.GetElementByName("audioappsink")
138 if err != nil {
139 return nil, fmt.Errorf("failed to get audio sink element from pipeline: %w", err)
140 }
141
142 // Create a new RTCPeerConnection
143 peerConnection, err := mm.webrtcAPI.NewPeerConnection(mm.webrtcConfig)
144 if err != nil {
145 return nil, fmt.Errorf("failed to create WebRTC peer connection: %w", err)
146 }
147 go func() {
148 <-ctx.Done()
149 if cErr := peerConnection.Close(); cErr != nil {
150 log.Log(ctx, "cannot close peerConnection: %v\n", cErr)
151 }
152 }()
153
154 videoTrack, err := webrtc.NewTrackLocalStaticSample(webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeH264}, "video", "pion")
155 if err != nil {
156 return nil, fmt.Errorf("failed to create video track: %w", err)
157 }
158 videoRTPSender, err := peerConnection.AddTrack(videoTrack)
159 if err != nil {
160 return nil, fmt.Errorf("failed to add video track to peer connection: %w", err)
161 }
162
163 audioTrack, err := webrtc.NewTrackLocalStaticSample(webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeOpus}, "audio", "pion")
164 if err != nil {
165 return nil, fmt.Errorf("failed to create audio track: %w", err)
166 }
167 audioRTPSender, err := peerConnection.AddTrack(audioTrack)
168 if err != nil {
169 return nil, fmt.Errorf("failed to add audio track to peer connection: %w", err)
170 }
171
172 // Set the remote SessionDescription
173 if err = peerConnection.SetRemoteDescription(*offer); err != nil {
174 return nil, fmt.Errorf("failed to set remote description: %w", err)
175 }
176
177 // Create answer
178 answer, err := peerConnection.CreateAnswer(nil)
179 if err != nil {
180 return nil, fmt.Errorf("failed to create answer: %w", err)
181 }
182
183 // Sets the LocalDescription, and starts our UDP listeners
184 if err = peerConnection.SetLocalDescription(answer); err != nil {
185 return nil, fmt.Errorf("failed to set local description: %w", err)
186 }
187
188 // Create channel that is blocked until ICE Gathering is complete
189 gatherComplete := webrtc.GatheringCompletePromise(peerConnection)
190
191 // Setup complete! Now we boot up streaming in the background while returning the SDP offer to the user.
192
193 go func() {
194 ticker := time.NewTicker(time.Second * 1)
195 for {
196 select {
197 case <-ctx.Done():
198 return
199 case <-ticker.C:
200 state := pipeline.GetCurrentState()
201 log.Debug(ctx, "pipeline state", "state", state)
202 }
203 }
204 }()
205
206 var lastVideoDuration = &DefaultDuration
207
208 go func() {
209 go func() {
210 if err := HandleBusMessages(ctx, pipeline); err != nil {
211 log.Log(ctx, "pipeline error", "error", err)
212 }
213 cancel()
214 }()
215
216 videoappsink := app.SinkFromElement(videoappsinkele)
217 videoappsink.SetCallbacks(&app.SinkCallbacks{
218 NewSampleFunc: func(sink *app.Sink) gst.FlowReturn {
219 sample := sink.PullSample()
220 if sample == nil {
221 return gst.FlowEOS
222 }
223
224 buffer := sample.GetBuffer()
225 if buffer == nil {
226 return gst.FlowError
227 }
228
229 samples := buffer.Map(gst.MapRead).Bytes()
230 defer buffer.Unmap()
231 clockTime := buffer.Duration()
232 dur := clockTime.AsDuration()
233 mediaSample := media.Sample{Data: samples}
234 if dur != nil {
235 mediaSample.Duration = *dur
236 lastVideoDuration = dur
237 } else if lastVideoDuration != nil {
238 // log.Log(ctx, "no video duration, using last duration", "lastVideoDuration", lastVideoDuration)
239 mediaSample.Duration = *lastVideoDuration
240 } else {
241 log.Log(ctx, "no video duration", "samples", len(samples))
242 // cancel()
243 return gst.FlowOK
244 }
245
246 if err := videoTrack.WriteSample(mediaSample); err != nil {
247 log.Log(ctx, "failed to write video sample", "error", err)
248 cancel()
249 }
250
251 return gst.FlowOK
252 },
253 EOSFunc: func(sink *app.Sink) {
254 log.Warn(ctx, "videoappsink EOSFunc")
255 cancel()
256 },
257 })
258
259 audioappsink := app.SinkFromElement(audioappsinkele)
260 audioappsink.SetCallbacks(&app.SinkCallbacks{
261 NewSampleFunc: func(sink *app.Sink) gst.FlowReturn {
262 sample := sink.PullSample()
263 if sample == nil {
264 return gst.FlowEOS
265 }
266
267 buffer := sample.GetBuffer()
268 if buffer == nil {
269 return gst.FlowError
270 }
271
272 samples := buffer.Map(gst.MapRead).Bytes()
273 defer buffer.Unmap()
274
275 b2 := make([]byte, len(samples))
276 copy(b2, samples)
277
278 clockTime := buffer.Duration()
279 dur := clockTime.AsDuration()
280 mediaSample := media.Sample{Data: b2}
281 if dur != nil {
282 mediaSample.Duration = *dur
283 } else {
284 log.Log(ctx, "no audio duration", "samples", len(b2))
285 // cancel()
286 return gst.FlowOK
287 }
288 if err := audioTrack.WriteSample(mediaSample); err != nil {
289 log.Log(ctx, "failed to write audio sample", "error", err)
290 return gst.FlowOK
291 }
292
293 return gst.FlowOK
294 },
295 EOSFunc: func(sink *app.Sink) {
296 log.Warn(ctx, "audioappsink EOSFunc")
297 cancel()
298 },
299 })
300
301 // Start the pipeline
302 err := pipeline.SetState(gst.StatePlaying)
303 if err != nil {
304 log.Log(ctx, "failed to set pipeline state to null", "error", err)
305 }
306 spmetrics.ViewerInc(user)
307 defer spmetrics.ViewerDec(user)
308
309 go func() {
310 rtcpBuf := make([]byte, 1500)
311 for {
312 if _, _, rtcpErr := videoRTPSender.Read(rtcpBuf); rtcpErr != nil {
313 return
314 }
315 }
316 }()
317
318 go func() {
319 rtcpBuf := make([]byte, 1500)
320 for {
321 if _, _, rtcpErr := audioRTPSender.Read(rtcpBuf); rtcpErr != nil {
322 return
323 }
324 }
325 }()
326
327 // Set the handler for ICE connection state
328 // This will notify you when the peer has connected/disconnected
329 peerConnection.OnICEConnectionStateChange(func(connectionState webrtc.ICEConnectionState) {
330 log.Log(ctx, "Connection State has changed", "state", connectionState.String())
331 })
332
333 // Set the handler for Peer connection state
334 // This will notify you when the peer has connected/disconnected
335 peerConnection.OnConnectionStateChange(func(s webrtc.PeerConnectionState) {
336 log.Log(ctx, "Peer Connection State has changed", "state", s.String())
337
338 if s == webrtc.PeerConnectionStateFailed || s == webrtc.PeerConnectionStateClosed || s == webrtc.PeerConnectionStateDisconnected {
339 // Wait until PeerConnection has had no network activity for 30 seconds or another failure. It may be reconnected using an ICE Restart.
340 // Use webrtc.PeerConnectionStateDisconnected if you are interested in detecting faster timeout.
341 // Note that the PeerConnection may come back from PeerConnectionStateDisconnected.
342 log.Log(ctx, "Peer Connection has gone to failed, exiting")
343 cancel()
344 }
345 })
346
347 <-ctx.Done()
348
349 log.Warn(ctx, "setting playback pipeline state to null")
350 err = pipeline.BlockSetState(gst.StateNull)
351 if err != nil {
352 log.Log(ctx, "failed to set pipeline state to null", "error", err)
353 }
354
355 videoappsink.SetCallbacks(&app.SinkCallbacks{})
356 err = videoappsinkele.SetState(gst.StateNull)
357 if err != nil {
358 log.Log(ctx, "failed to set videoappsinkele state to null", "error", err)
359 }
360
361 audioappsink.SetCallbacks(&app.SinkCallbacks{})
362 err = audioappsinkele.SetState(gst.StateNull)
363 if err != nil {
364 log.Log(ctx, "failed to set audioappsinkele state to null", "error", err)
365 }
366
367 log.Warn(ctx, "exiting playback")
368
369 }()
370 select {
371 case <-gatherComplete:
372 return peerConnection.LocalDescription(), nil
373 case <-ctx.Done():
374 return nil, ctx.Err()
375 }
376}