Live video on the AT Protocol
1package media
2
3import (
4 "context"
5 "fmt"
6 "strings"
7 "time"
8
9 "github.com/go-gst/go-gst/gst"
10 "github.com/go-gst/go-gst/gst/app"
11 "github.com/google/uuid"
12 "github.com/pion/rtcp"
13 "github.com/pion/webrtc/v4"
14 "stream.place/streamplace/pkg/log"
15 "stream.place/streamplace/pkg/rtcrec"
16)
17
18// This function remains in scope for the duration of a single users' playback
19func (mm *MediaManager) WebRTCIngest(ctx context.Context, offer *webrtc.SessionDescription, signer MediaSigner, peerConnection rtcrec.PeerConnection, done chan struct{}) (*webrtc.SessionDescription, error) {
20 uu, err := uuid.NewV7()
21 if err != nil {
22 return nil, err
23 }
24
25 ctx = log.WithLogValues(ctx, "webrtcID", uu.String(), "mediafunc", "WebRTCIngest", "streamer", signer.Streamer())
26
27 // Allow us to receive 1 audio track, and 1 video track
28 if _, err = peerConnection.AddTransceiverFromKind(webrtc.RTPCodecTypeAudio); err != nil {
29 return nil, fmt.Errorf("failed to add audio transceiver: %w", err)
30 } else if _, err = peerConnection.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo); err != nil {
31 return nil, fmt.Errorf("failed to add video transceiver: %w", err)
32 }
33
34 pipelineSlice := []string{
35 "multiqueue name=queue",
36 "appsrc format=time is-live=true do-timestamp=true name=videosrc ! capsfilter caps=application/x-rtp ! rtph264depay ! capsfilter caps=video/x-h264,stream-format=byte-stream,alignment=nal ! h264parse disable-passthrough=true config-interval=-1 ! h264timestamper ! identity ! queue.sink_0",
37 "appsrc format=time do-timestamp=true name=audiosrc ! capsfilter caps=application/x-rtp,media=audio,encoding-name=OPUS,payload=111 ! rtpopusdepay ! opusparse ! queue.sink_1",
38 }
39
40 pipeline, err := gst.NewPipelineFromString(strings.Join(pipelineSlice, "\n"))
41 if err != nil {
42 return nil, fmt.Errorf("failed to create GStreamer pipeline: %w", err)
43 }
44
45 queue, err := pipeline.GetElementByName("queue")
46 if err != nil {
47 return nil, fmt.Errorf("failed to get queue element from pipeline: %w", err)
48 }
49
50 // err = queue.Link(signerElem)
51 // if err != nil {
52 // return nil, fmt.Errorf("failed to link queue to signer element: %w", err)
53 // }
54 videoSrcPads, err := queue.GetSrcPads()
55 if err != nil {
56 return nil, fmt.Errorf("failed to get videoSrcPads from queue: %w", err)
57 }
58 if len(videoSrcPads) != 2 {
59 return nil, fmt.Errorf("failed to get videoSrcPads from queue")
60 }
61 videoSrcPad := videoSrcPads[0]
62 audioSrcPad := videoSrcPads[1]
63
64 videoSrcElem, err := pipeline.GetElementByName("videosrc")
65 if err != nil {
66 return nil, fmt.Errorf("failed to get videoSrcElem element from pipeline: %w", err)
67 }
68 videoSrc := app.SrcFromElement(videoSrcElem)
69
70 audioSrcElem, err := pipeline.GetElementByName("audiosrc")
71 if err != nil {
72 return nil, fmt.Errorf("failed to get audioSrcElem element from pipeline: %w", err)
73 }
74 audioSrc := app.SrcFromElement(audioSrcElem)
75
76 // Set the remote SessionDescription
77 if err = peerConnection.SetRemoteDescription(*offer); err != nil {
78 return nil, fmt.Errorf("failed to set remote description: %w", err)
79 }
80
81 // Create answer
82 answer, err := peerConnection.CreateAnswer(nil)
83 if err != nil {
84 return nil, fmt.Errorf("failed to create answer: %w", err)
85 }
86
87 // Sets the LocalDescription, and starts our UDP listeners
88 if err = peerConnection.SetLocalDescription(answer); err != nil {
89 return nil, fmt.Errorf("failed to set local description: %w", err)
90 }
91
92 // Create channel that is blocked until ICE Gathering is complete
93 gatherComplete := rtcrec.GatheringCompletePromise(peerConnection)
94
95 ctx, cancel := context.WithCancel(ctx)
96 signerElem, err := mm.SegmentAndSignElem(ctx, signer)
97 if err != nil {
98 cancel()
99 return nil, fmt.Errorf("failed create signer element: %w", err)
100 }
101 err = pipeline.Add(signerElem)
102 if err != nil {
103 cancel()
104 return nil, fmt.Errorf("failed to add signer element to pipeline: %w", err)
105 }
106 signerElemPads, err := signerElem.GetPads()
107 if err != nil {
108 cancel()
109 return nil, fmt.Errorf("failed to get signerElemPads from signer element: %w", err)
110 }
111 if len(signerElemPads) != 2 {
112 cancel()
113 return nil, fmt.Errorf("failed to get signerElemPads from signer element")
114 }
115 signerElemVideoPad := signerElemPads[0]
116 signerElemAudioPad := signerElemPads[1]
117 linked := videoSrcPad.Link(signerElemVideoPad)
118 if linked != gst.PadLinkOK {
119 cancel()
120 return nil, fmt.Errorf("failed to link videoSrcPad to signerElemVideoPad")
121 }
122 linked = audioSrcPad.Link(signerElemAudioPad)
123 if linked != gst.PadLinkOK {
124 cancel()
125 return nil, fmt.Errorf("failed to link audioSrcPad to signerElemAudioPad")
126 }
127
128 // Setup complete! Now we boot up streaming in the background while returning the SDP offer to the user.
129 go func() {
130 defer cancel()
131 defer func() { close(done) }()
132
133 go func() {
134 ticker := time.NewTicker(time.Second * 1)
135 for {
136 select {
137 case <-ctx.Done():
138 return
139 case <-ticker.C:
140 state := pipeline.GetCurrentState()
141 log.Debug(ctx, "pipeline state", "state", state)
142 }
143 }
144 }()
145
146 go func() {
147 if err := HandleBusMessages(ctx, pipeline); err != nil {
148 log.Log(ctx, "pipeline error", "error", err)
149 }
150 cancel()
151 }()
152
153 // subscription to bus messages for key revocation
154 go mm.HandleKeyRevocation(ctx, signer, pipeline)
155
156 go func() {
157 <-ctx.Done()
158 if cErr := peerConnection.Close(); cErr != nil {
159 log.Log(ctx, "cannot close peerConnection: %v\n", cErr)
160 }
161 }()
162
163 log.Debug(ctx, "starting pipeline")
164
165 // Start the pipeline
166 err = pipeline.SetState(gst.StatePlaying)
167 if err != nil {
168 log.Log(ctx, "failed to set pipeline state", "error", err)
169 cancel()
170 }
171
172 // Set the handler for ICE connection state
173 // This will notify you when the peer has connected/disconnected
174 peerConnection.OnICEConnectionStateChange(func(connectionState webrtc.ICEConnectionState) {
175 log.Log(ctx, "Connection State has changed", "state", connectionState.String())
176 })
177
178 // Set the handler for Peer connection state
179 // This will notify you when the peer has connected/disconnected
180 peerConnection.OnConnectionStateChange(func(s webrtc.PeerConnectionState) {
181 log.Log(ctx, "Peer Connection State has changed", "state", s.String())
182
183 if s == webrtc.PeerConnectionStateFailed || s == webrtc.PeerConnectionStateDisconnected {
184 // Wait until PeerConnection has had no network activity for 30 seconds or another failure. It may be reconnected using an ICE Restart.
185 // Use webrtc.PeerConnectionStateDisconnected if you are interested in detecting faster timeout.
186 // Note that the PeerConnection may come back from PeerConnectionStateDisconnected.
187 log.Log(ctx, "Peer Connection has ended, exiting", "state", s.String())
188 cancel()
189 }
190 })
191
192 videoFirst := false
193 audioFirst := false
194
195 peerConnection.OnTrack(func(track rtcrec.TrackRemote, _ rtcrec.RTPReceiver) {
196 log.Warn(ctx, "OnTrack", "kind", track.Kind())
197 if track.Kind() == webrtc.RTPCodecTypeVideo {
198 // Send a PLI on an interval so that the publisher is pushing a keyframe every rtcpPLIInterval
199 go func() {
200 ticker := time.NewTicker(time.Second * 1)
201 for {
202 select {
203 case <-ctx.Done():
204 return
205 case <-ticker.C:
206 rtcpSendErr := peerConnection.WriteRTCP([]rtcp.Packet{&rtcp.PictureLossIndication{MediaSSRC: uint32(track.SSRC())}})
207 if rtcpSendErr != nil {
208 log.Log(ctx, "failed to send rtcp packet", "error", rtcpSendErr)
209 cancel()
210 return
211 }
212 }
213 }
214 }()
215
216 codecName := strings.Split(track.Codec().MimeType, "/")[1]
217 log.Log(ctx, "Track has started", "payloadType", track.PayloadType(), "codecName", codecName)
218
219 // appSrc := pipelineForCodec(track, codecName)
220 buf := make([]byte, 1400)
221 for {
222 i, _, readErr := track.Read(buf)
223 if readErr != nil {
224 log.Log(ctx, "failed to read track", "error", readErr)
225 cancel()
226 return
227 }
228 if ctx.Err() != nil {
229 return
230 }
231 if !videoFirst {
232 videoFirst = true
233 log.Debug(ctx, "got video data", "len", len(buf[:i]))
234 }
235
236 gbuf := gst.NewBufferWithSize(int64(len(buf[:i])))
237 gbuf.Map(gst.MapWrite).WriteData(buf[:i])
238 gbuf.Unmap()
239
240 ret := videoSrc.PushBuffer(gbuf)
241 if ret != gst.FlowOK {
242 log.Log(ctx, "failed to push buffer", "error", ret)
243 cancel()
244 return
245 }
246 // state := pipeline.GetCurrentState()
247 // if state != gst.StatePlaying {
248 // log.Warn(ctx, "pipeline state is not playing, consider running with GST_DEBUG=*:5 to find out why", "state", state)
249 // cancel()
250 // return
251 // }
252 }
253 }
254 if track.Kind() == webrtc.RTPCodecTypeAudio {
255
256 codecName := strings.Split(track.Codec().MimeType, "/")[1]
257 log.Log(ctx, "Track has started", "payloadType", track.PayloadType(), "codecName", codecName)
258
259 buf := make([]byte, 1400)
260 for {
261 i, _, readErr := track.Read(buf)
262 if readErr != nil {
263 log.Log(ctx, "failed to read track", "error", readErr)
264 cancel()
265 return
266 }
267 if ctx.Err() != nil {
268 return
269 }
270 if !audioFirst {
271 audioFirst = true
272 log.Debug(ctx, "got audio data", "len", len(buf[:i]))
273 }
274
275 gbuf := gst.NewBufferWithSize(int64(len(buf[:i])))
276 gbuf.Map(gst.MapWrite).WriteData(buf[:i])
277 gbuf.Unmap()
278 ret := audioSrc.PushBuffer(gbuf)
279 if ret != gst.FlowOK {
280 log.Log(ctx, "failed to push buffer", "error", ret)
281 cancel()
282 return
283 }
284 // state := pipeline.GetCurrentState()
285 // if state != gst.StatePlaying {
286 // log.Warn(ctx, "pipeline state is not playing, consider running with GST_DEBUG=*:5 to find out why", "state", state)
287 // cancel()
288 // return
289 // }
290 }
291 }
292 })
293
294 <-ctx.Done()
295
296 if err := pipeline.BlockSetState(gst.StateNull); err != nil {
297 log.Log(ctx, "failed to set pipeline state to null", "error", err)
298 }
299
300 if err := audioSrcElem.SetState(gst.StateNull); err != nil {
301 log.Log(ctx, "failed to set audioSrcElem state to null", "error", err)
302 }
303
304 if err := videoSrcElem.SetState(gst.StateNull); err != nil {
305 log.Log(ctx, "failed to set videoSrcElem state to null", "error", err)
306 }
307
308 log.Log(ctx, "webrtc ingest pipeline done")
309
310 }()
311 select {
312 case <-gatherComplete:
313 return peerConnection.LocalDescription(), nil
314 case <-ctx.Done():
315 return nil, ctx.Err()
316 }
317}