// Live video on the AT Protocol
1package media
2
3import (
4 "context"
5 "fmt"
6 "strings"
7 "time"
8
9 "github.com/go-gst/go-gst/gst"
10 "github.com/go-gst/go-gst/gst/app"
11 "github.com/google/uuid"
12 "github.com/pion/webrtc/v4"
13 "github.com/pion/webrtc/v4/pkg/media"
14 "stream.place/streamplace/pkg/bus"
15 "stream.place/streamplace/pkg/log"
16)
17
18// we have a bug that prevents us from correctly probing video durations
19// a lot of the time. so when we don't have them we use the last duration
20// that we had, and when we don't have that we use a default duration
21var DefaultDuration = time.Duration(32 * time.Millisecond)
22
// WebRTCPlayback negotiates a WebRTC playback session for a single viewer of
// the given user's stream at the given rendition. It builds a GStreamer
// pipeline (segment concat -> h264parse/opusparse -> appsinks), forwards the
// parsed H.264/Opus samples onto pion WebRTC tracks, and returns the local SDP
// answer once ICE gathering completes. The goroutines started here remain in
// scope for the duration of a single user's playback and exit when ctx is
// cancelled (pipeline error, EOS, peer-connection failure, or caller cancel).
func (mm *MediaManager) WebRTCPlayback(ctx context.Context, user string, rendition string, offer *webrtc.SessionDescription) (*webrtc.SessionDescription, error) {
	// Per-session UUID so every log line from this playback can be correlated.
	uu, err := uuid.NewV7()
	if err != nil {
		return nil, err
	}
	ctx = log.WithLogValues(ctx, "webrtcID", uu.String())
	ctx = log.WithLogValues(ctx, "mediafunc", "WebRTCPlayback")
	// cancel is called from several places below (bus errors, appsink EOS,
	// peer-connection failure) to tear the whole session down.
	ctx, cancel := context.WithCancel(ctx) //nolint:all

	// Two independent branches, each ending in an appsink we drain manually:
	// byte-stream H.264 video and Opus audio.
	pipelineSlice := []string{
		"h264parse name=videoparse ! video/x-h264,stream-format=byte-stream ! appsink name=videoappsink",
		"opusparse name=audioparse ! appsink name=audioappsink",
	}

	pipeline, err := gst.NewPipelineFromString(strings.Join(pipelineSlice, "\n"))
	if err != nil {
		return nil, fmt.Errorf("failed to create GStreamer pipeline: %w", err) //nolint:all
	}

	// Buffered stage between the segment bus and the unbuffered channel the
	// concat bin consumes, so a briefly slow pipeline doesn't stall the bus.
	segBuffer := make(chan *bus.Seg, 1024)
	go func() {
		segChan := mm.bus.SubscribeSegment(ctx, user, rendition)
		defer mm.bus.UnsubscribeSegment(ctx, user, rendition, segChan)
		for {
			select {
			case <-ctx.Done():
				log.Debug(ctx, "exiting segment reader")
				return
			case file := <-segChan.C:
				log.Debug(ctx, "got segment", "file", file.Filepath)
				// NOTE(review): if segBuffer fills (1024 pending segments)
				// this send blocks the bus reader — presumably intended
				// back-pressure; confirm.
				segBuffer <- file
			}
		}
	}()

	// Drain segBuffer into segCh, staying cancellable on both the receive and
	// the send side.
	segCh := make(chan *bus.Seg)
	go func() {
		for {
			select {
			case <-ctx.Done():
				log.Debug(ctx, "exiting segment reader")
				return
			case seg := <-segBuffer:
				select {
				case <-ctx.Done():
					return
				case segCh <- seg:
				}
			}
		}
	}()

	// The concat bin stitches the incoming segment files into one continuous
	// source exposing video_0/audio_0 pads (linked below).
	concatBin, err := ConcatBin(ctx, segCh, true)
	if err != nil {
		return nil, fmt.Errorf("failed to create concat bin: %w", err)
	}

	err = pipeline.Add(concatBin.Element)
	if err != nil {
		return nil, fmt.Errorf("failed to add concat bin to pipeline: %w", err)
	}

	videoPad := concatBin.GetStaticPad("video_0")
	if videoPad == nil {
		return nil, fmt.Errorf("video pad not found")
	}

	audioPad := concatBin.GetStaticPad("audio_0")
	if audioPad == nil {
		return nil, fmt.Errorf("audio pad not found")
	}

	// queuePadVideo := outputQueue.GetRequestPad("src_%u")
	// if queuePadVideo == nil {
	// return nil, fmt.Errorf("failed to get queue video pad")
	// }
	// queuePadAudio := outputQueue.GetRequestPad("src_%u")
	// if queuePadAudio == nil {
	// return nil, fmt.Errorf("failed to get queue audio pad")
	// }

	// Link concat's video_0 pad into h264parse's sink pad.
	videoParse, err := pipeline.GetElementByName("videoparse")
	if err != nil {
		return nil, fmt.Errorf("failed to get video sink element from pipeline: %w", err)
	}
	videoParsePad := videoParse.GetStaticPad("sink")
	if videoParsePad == nil {
		return nil, fmt.Errorf("video parse pad not found")
	}
	linked := videoPad.Link(videoParsePad)
	if linked != gst.PadLinkOK {
		return nil, fmt.Errorf("failed to link video pad to video parse pad: %v", linked)
	}

	// Link concat's audio_0 pad into opusparse's sink pad.
	audioParse, err := pipeline.GetElementByName("audioparse")
	if err != nil {
		return nil, fmt.Errorf("failed to get audio parse element from pipeline: %w", err)
	}
	audioParsePad := audioParse.GetStaticPad("sink")
	if audioParsePad == nil {
		return nil, fmt.Errorf("audio parse pad not found")
	}
	linked = audioPad.Link(audioParsePad)
	if linked != gst.PadLinkOK {
		return nil, fmt.Errorf("failed to link audio pad to audio parse pad: %v", linked)
	}

	videoappsinkele, err := pipeline.GetElementByName("videoappsink")
	if err != nil {
		return nil, fmt.Errorf("failed to get video sink element from pipeline: %w", err)
	}

	audioappsinkele, err := pipeline.GetElementByName("audioappsink")
	if err != nil {
		return nil, fmt.Errorf("failed to get audio sink element from pipeline: %w", err)
	}

	// Create a new RTCPeerConnection
	peerConnection, err := mm.webrtcAPI.NewPeerConnection(mm.webrtcConfig)
	if err != nil {
		return nil, fmt.Errorf("failed to create WebRTC peer connection: %w", err)
	}
	// Close the peer connection whenever the session context ends.
	go func() {
		<-ctx.Done()
		if cErr := peerConnection.Close(); cErr != nil {
			// NOTE(review): printf-style format string passed to a structured
			// logger; other call sites use key/value pairs — confirm intent.
			log.Log(ctx, "cannot close peerConnection: %v\n", cErr)
		}
	}()

	// One outbound track per media type; samples are pushed from the appsink
	// callbacks registered further down.
	videoTrack, err := webrtc.NewTrackLocalStaticSample(webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeH264}, "video", "pion")
	if err != nil {
		return nil, fmt.Errorf("failed to create video track: %w", err)
	}
	videoRTPSender, err := peerConnection.AddTrack(videoTrack)
	if err != nil {
		return nil, fmt.Errorf("failed to add video track to peer connection: %w", err)
	}

	audioTrack, err := webrtc.NewTrackLocalStaticSample(webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeOpus}, "audio", "pion")
	if err != nil {
		return nil, fmt.Errorf("failed to create audio track: %w", err)
	}
	audioRTPSender, err := peerConnection.AddTrack(audioTrack)
	if err != nil {
		return nil, fmt.Errorf("failed to add audio track to peer connection: %w", err)
	}

	// Set the remote SessionDescription
	if err = peerConnection.SetRemoteDescription(*offer); err != nil {
		return nil, fmt.Errorf("failed to set remote description: %w", err)
	}

	// Create answer
	answer, err := peerConnection.CreateAnswer(nil)
	if err != nil {
		return nil, fmt.Errorf("failed to create answer: %w", err)
	}

	// Sets the LocalDescription, and starts our UDP listeners
	if err = peerConnection.SetLocalDescription(answer); err != nil {
		return nil, fmt.Errorf("failed to set local description: %w", err)
	}

	// Create channel that is blocked until ICE Gathering is complete
	gatherComplete := webrtc.GatheringCompletePromise(peerConnection)

	// Setup complete! Now we boot up streaming in the background while returning the SDP offer to the user.

	// Debug loop: log the pipeline state once per second until teardown.
	go func() {
		// NOTE(review): ticker.Stop() is never called; the goroutine exits on
		// ctx.Done() so this is minor, but Stop on exit would be tidier.
		ticker := time.NewTicker(time.Second * 1)
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				state := pipeline.GetCurrentState()
				log.Debug(ctx, "pipeline state", "state", state)
			}
		}
	}()

	// Last known video frame duration, seeded with DefaultDuration (see the
	// comment on that var) and updated whenever a buffer carries a real one.
	// NOTE(review): written and read from the video appsink callback without
	// synchronization — confirm the callback is only invoked from a single
	// streaming thread.
	var lastVideoDuration = &DefaultDuration

	go func() {
		// Pump GStreamer bus messages; any pipeline error ends the session.
		go func() {
			if err := HandleBusMessages(ctx, pipeline); err != nil {
				log.Log(ctx, "pipeline error", "error", err)
			}
			cancel()
		}()

		// Video path: pull each H.264 buffer and hand it to the WebRTC track.
		videoappsink := app.SinkFromElement(videoappsinkele)
		videoappsink.SetCallbacks(&app.SinkCallbacks{
			NewSampleFunc: func(sink *app.Sink) gst.FlowReturn {
				sample := sink.PullSample()
				if sample == nil {
					return gst.FlowEOS
				}

				buffer := sample.GetBuffer()
				if buffer == nil {
					return gst.FlowError
				}

				// NOTE(review): unlike the audio path below, these bytes are
				// NOT copied before WriteSample; presumably safe because the
				// write happens before the deferred Unmap — confirm pion does
				// not retain the slice.
				samples := buffer.Map(gst.MapRead).Bytes()
				defer buffer.Unmap()
				clockTime := buffer.Duration()
				dur := clockTime.AsDuration()
				mediaSample := media.Sample{Data: samples}
				// Duration fallback chain: buffer duration -> last known
				// duration -> drop the sample (see DefaultDuration comment).
				if dur != nil {
					mediaSample.Duration = *dur
					lastVideoDuration = dur
				} else if lastVideoDuration != nil {
					// log.Log(ctx, "no video duration, using last duration", "lastVideoDuration", lastVideoDuration)
					mediaSample.Duration = *lastVideoDuration
				} else {
					log.Log(ctx, "no video duration", "samples", len(samples))
					// cancel()
					return gst.FlowOK
				}

				if err := videoTrack.WriteSample(mediaSample); err != nil {
					log.Log(ctx, "failed to write video sample", "error", err)
					cancel()
				}

				return gst.FlowOK
			},
			EOSFunc: func(sink *app.Sink) {
				log.Warn(ctx, "videoappsink EOSFunc")
				cancel()
			},
		})

		// Audio path: same shape as video, but the mapped bytes are copied
		// and there is no last-duration fallback; a missing duration drops
		// the sample, and a write error does not end the session.
		audioappsink := app.SinkFromElement(audioappsinkele)
		audioappsink.SetCallbacks(&app.SinkCallbacks{
			NewSampleFunc: func(sink *app.Sink) gst.FlowReturn {
				sample := sink.PullSample()
				if sample == nil {
					return gst.FlowEOS
				}

				buffer := sample.GetBuffer()
				if buffer == nil {
					return gst.FlowError
				}

				samples := buffer.Map(gst.MapRead).Bytes()
				defer buffer.Unmap()

				// Copy out of the mapped GStreamer buffer before it is
				// unmapped.
				b2 := make([]byte, len(samples))
				copy(b2, samples)

				clockTime := buffer.Duration()
				dur := clockTime.AsDuration()
				mediaSample := media.Sample{Data: b2}
				if dur != nil {
					mediaSample.Duration = *dur
				} else {
					log.Log(ctx, "no audio duration", "samples", len(b2))
					// cancel()
					return gst.FlowOK
				}
				if err := audioTrack.WriteSample(mediaSample); err != nil {
					log.Log(ctx, "failed to write audio sample", "error", err)
					return gst.FlowOK
				}

				return gst.FlowOK
			},
			EOSFunc: func(sink *app.Sink) {
				log.Warn(ctx, "audioappsink EOSFunc")
				cancel()
			},
		})

		// Start the pipeline
		err := pipeline.SetState(gst.StatePlaying)
		if err != nil {
			// NOTE(review): message says "null" but this sets PLAYING —
			// looks like a copy/paste from the teardown path below.
			log.Log(ctx, "failed to set pipeline state to null", "error", err)
		}
		mm.IncrementViewerCount(user, "webrtc")
		defer mm.DecrementViewerCount(user, "webrtc")

		// Drain incoming RTCP for each sender so the read buffer does not
		// fill up (packets are discarded).
		go func() {
			rtcpBuf := make([]byte, 1500)
			for {
				if _, _, rtcpErr := videoRTPSender.Read(rtcpBuf); rtcpErr != nil {
					return
				}
			}
		}()

		go func() {
			rtcpBuf := make([]byte, 1500)
			for {
				if _, _, rtcpErr := audioRTPSender.Read(rtcpBuf); rtcpErr != nil {
					return
				}
			}
		}()

		// Set the handler for ICE connection state
		// This will notify you when the peer has connected/disconnected
		peerConnection.OnICEConnectionStateChange(func(connectionState webrtc.ICEConnectionState) {
			log.Log(ctx, "Connection State has changed", "state", connectionState.String())
		})

		// Set the handler for Peer connection state
		// This will notify you when the peer has connected/disconnected
		peerConnection.OnConnectionStateChange(func(s webrtc.PeerConnectionState) {
			log.Log(ctx, "Peer Connection State has changed", "state", s.String())

			if s == webrtc.PeerConnectionStateFailed || s == webrtc.PeerConnectionStateClosed || s == webrtc.PeerConnectionStateDisconnected {
				// Wait until PeerConnection has had no network activity for 30 seconds or another failure. It may be reconnected using an ICE Restart.
				// Use webrtc.PeerConnectionStateDisconnected if you are interested in detecting faster timeout.
				// Note that the PeerConnection may come back from PeerConnectionStateDisconnected.
				log.Log(ctx, "Peer Connection has gone to failed, exiting")
				cancel()
			}
		})

		// Block until the session ends, then tear down the pipeline and
		// detach the appsink callbacks.
		<-ctx.Done()

		log.Warn(ctx, "setting playback pipeline state to null")
		err = pipeline.BlockSetState(gst.StateNull)
		if err != nil {
			log.Log(ctx, "failed to set pipeline state to null", "error", err)
		}

		videoappsink.SetCallbacks(&app.SinkCallbacks{})
		err = videoappsinkele.SetState(gst.StateNull)
		if err != nil {
			log.Log(ctx, "failed to set videoappsinkele state to null", "error", err)
		}

		audioappsink.SetCallbacks(&app.SinkCallbacks{})
		err = audioappsinkele.SetState(gst.StateNull)
		if err != nil {
			log.Log(ctx, "failed to set audioappsinkele state to null", "error", err)
		}

		log.Warn(ctx, "exiting playback")

	}()
	// Hand the SDP answer back to the caller once ICE gathering finishes;
	// streaming continues in the goroutines above.
	select {
	case <-gatherComplete:
		return peerConnection.LocalDescription(), nil
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}