Live video on the AT Protocol
1package media
2
3import (
4 "context"
5 "fmt"
6 "strings"
7 "time"
8
9 "github.com/go-gst/go-gst/gst"
10 "github.com/go-gst/go-gst/gst/app"
11 "github.com/google/uuid"
12 "github.com/pion/rtcp"
13 "github.com/pion/webrtc/v4"
14 "stream.place/streamplace/pkg/log"
15 "stream.place/streamplace/pkg/rtcrec"
16)
17
18// This function remains in scope for the duration of a single users' playback
19func (mm *MediaManager) WebRTCIngest(ctx context.Context, offer *webrtc.SessionDescription, signer MediaSigner, peerConnection rtcrec.PeerConnection, done chan error) (*webrtc.SessionDescription, error) {
20 uu, err := uuid.NewV7()
21 if err != nil {
22 return nil, err
23 }
24
25 ctx = log.WithLogValues(ctx, "webrtcID", uu.String(), "mediafunc", "WebRTCIngest", "streamer", signer.Streamer())
26
27 // Allow us to receive 1 audio track, and 1 video track
28 if _, err = peerConnection.AddTransceiverFromKind(webrtc.RTPCodecTypeAudio); err != nil {
29 return nil, fmt.Errorf("failed to add audio transceiver: %w", err)
30 } else if _, err = peerConnection.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo); err != nil {
31 return nil, fmt.Errorf("failed to add video transceiver: %w", err)
32 }
33
34 pipelineSlice := []string{
35 "multiqueue name=queue",
36 "appsrc format=time is-live=true do-timestamp=true name=videosrc ! capsfilter caps=application/x-rtp ! rtph264depay ! capsfilter caps=video/x-h264,stream-format=byte-stream,alignment=nal ! h264parse disable-passthrough=true config-interval=-1 ! h264timestamper ! identity ! queue.sink_0",
37 "appsrc format=time do-timestamp=true name=audiosrc ! capsfilter caps=application/x-rtp,media=audio,encoding-name=OPUS,payload=111 ! rtpopusdepay ! opusparse ! queue.sink_1",
38 }
39
40 pipeline, err := gst.NewPipelineFromString(strings.Join(pipelineSlice, "\n"))
41 if err != nil {
42 return nil, fmt.Errorf("failed to create GStreamer pipeline: %w", err)
43 }
44
45 queue, err := pipeline.GetElementByName("queue")
46 if err != nil {
47 return nil, fmt.Errorf("failed to get queue element from pipeline: %w", err)
48 }
49
50 // err = queue.Link(signerElem)
51 // if err != nil {
52 // return nil, fmt.Errorf("failed to link queue to signer element: %w", err)
53 // }
54 videoSrcPads, err := queue.GetSrcPads()
55 if err != nil {
56 return nil, fmt.Errorf("failed to get videoSrcPads from queue: %w", err)
57 }
58 if len(videoSrcPads) != 2 {
59 return nil, fmt.Errorf("failed to get videoSrcPads from queue")
60 }
61 videoSrcPad := videoSrcPads[0]
62 audioSrcPad := videoSrcPads[1]
63
64 videoSrcElem, err := pipeline.GetElementByName("videosrc")
65 if err != nil {
66 return nil, fmt.Errorf("failed to get videoSrcElem element from pipeline: %w", err)
67 }
68 videoSrc := app.SrcFromElement(videoSrcElem)
69
70 audioSrcElem, err := pipeline.GetElementByName("audiosrc")
71 if err != nil {
72 return nil, fmt.Errorf("failed to get audioSrcElem element from pipeline: %w", err)
73 }
74 audioSrc := app.SrcFromElement(audioSrcElem)
75
76 // Set the remote SessionDescription
77 if err = peerConnection.SetRemoteDescription(*offer); err != nil {
78 return nil, fmt.Errorf("failed to set remote description: %w", err)
79 }
80
81 // Create answer
82 answer, err := peerConnection.CreateAnswer(nil)
83 if err != nil {
84 return nil, fmt.Errorf("failed to create answer: %w", err)
85 }
86
87 // Sets the LocalDescription, and starts our UDP listeners
88 if err = peerConnection.SetLocalDescription(answer); err != nil {
89 return nil, fmt.Errorf("failed to set local description: %w", err)
90 }
91
92 // Create channel that is blocked until ICE Gathering is complete
93 gatherComplete := rtcrec.GatheringCompletePromise(peerConnection)
94
95 ctx, cancel := context.WithCancel(ctx)
96 signerElem, err := mm.SegmentAndSignElem(ctx, signer)
97 if err != nil {
98 cancel()
99 return nil, fmt.Errorf("failed create signer element: %w", err)
100 }
101 err = pipeline.Add(signerElem)
102 if err != nil {
103 cancel()
104 return nil, fmt.Errorf("failed to add signer element to pipeline: %w", err)
105 }
106 signerElemPads, err := signerElem.GetPads()
107 if err != nil {
108 cancel()
109 return nil, fmt.Errorf("failed to get signerElemPads from signer element: %w", err)
110 }
111 if len(signerElemPads) != 2 {
112 cancel()
113 return nil, fmt.Errorf("failed to get signerElemPads from signer element")
114 }
115 signerElemVideoPad := signerElemPads[0]
116 signerElemAudioPad := signerElemPads[1]
117 linked := videoSrcPad.Link(signerElemVideoPad)
118 if linked != gst.PadLinkOK {
119 cancel()
120 return nil, fmt.Errorf("failed to link videoSrcPad to signerElemVideoPad")
121 }
122 linked = audioSrcPad.Link(signerElemAudioPad)
123 if linked != gst.PadLinkOK {
124 cancel()
125 return nil, fmt.Errorf("failed to link audioSrcPad to signerElemAudioPad")
126 }
127
128 // Setup complete! Now we boot up streaming in the background while returning the SDP offer to the user.
129 go func() {
130 busErrorChan := make(chan error)
131 go func() {
132 err := HandleBusMessages(ctx, pipeline)
133 if err != nil {
134 log.Log(ctx, "pipeline error", "error", err)
135 }
136 cancel()
137 busErrorChan <- err
138 }()
139
140 defer cancel()
141 defer func() { done <- <-busErrorChan }()
142
143 go func() {
144 ticker := time.NewTicker(time.Second * 1)
145 for {
146 select {
147 case <-ctx.Done():
148 return
149 case <-ticker.C:
150 state := pipeline.GetCurrentState()
151 log.Debug(ctx, "pipeline state", "state", state)
152 }
153 }
154 }()
155
156 // subscription to bus messages for key revocation
157 go mm.HandleKeyRevocation(ctx, signer, pipeline)
158
159 go func() {
160 <-ctx.Done()
161 if cErr := peerConnection.Close(); cErr != nil {
162 log.Log(ctx, "cannot close peerConnection: %v\n", cErr)
163 }
164 }()
165
166 log.Debug(ctx, "starting pipeline")
167
168 // Start the pipeline
169 err = pipeline.SetState(gst.StatePlaying)
170 if err != nil {
171 log.Log(ctx, "failed to set pipeline state", "error", err)
172 cancel()
173 }
174
175 // Set the handler for ICE connection state
176 // This will notify you when the peer has connected/disconnected
177 peerConnection.OnICEConnectionStateChange(func(connectionState webrtc.ICEConnectionState) {
178 log.Log(ctx, "Connection State has changed", "state", connectionState.String())
179 })
180
181 // Set the handler for Peer connection state
182 // This will notify you when the peer has connected/disconnected
183 peerConnection.OnConnectionStateChange(func(s webrtc.PeerConnectionState) {
184 log.Log(ctx, "Peer Connection State has changed", "state", s.String())
185
186 if s == webrtc.PeerConnectionStateFailed || s == webrtc.PeerConnectionStateDisconnected {
187 // Wait until PeerConnection has had no network activity for 30 seconds or another failure. It may be reconnected using an ICE Restart.
188 // Use webrtc.PeerConnectionStateDisconnected if you are interested in detecting faster timeout.
189 // Note that the PeerConnection may come back from PeerConnectionStateDisconnected.
190 log.Log(ctx, "Peer Connection has ended, exiting", "state", s.String())
191 cancel()
192 }
193 })
194
195 videoFirst := false
196 audioFirst := false
197
198 peerConnection.OnTrack(func(track rtcrec.TrackRemote, _ rtcrec.RTPReceiver) {
199 log.Warn(ctx, "OnTrack", "kind", track.Kind())
200 if track.Kind() == webrtc.RTPCodecTypeVideo {
201 // Send a PLI on an interval so that the publisher is pushing a keyframe every rtcpPLIInterval
202 go func() {
203 ticker := time.NewTicker(time.Second * 1)
204 for {
205 select {
206 case <-ctx.Done():
207 return
208 case <-ticker.C:
209 rtcpSendErr := peerConnection.WriteRTCP([]rtcp.Packet{&rtcp.PictureLossIndication{MediaSSRC: uint32(track.SSRC())}})
210 if rtcpSendErr != nil {
211 log.Log(ctx, "failed to send rtcp packet", "error", rtcpSendErr)
212 cancel()
213 return
214 }
215 }
216 }
217 }()
218
219 codecName := strings.Split(track.Codec().MimeType, "/")[1]
220 log.Log(ctx, "Track has started", "payloadType", track.PayloadType(), "codecName", codecName)
221
222 // appSrc := pipelineForCodec(track, codecName)
223 buf := make([]byte, 1400)
224 for {
225 i, _, readErr := track.Read(buf)
226 if readErr != nil {
227 log.Log(ctx, "failed to read track", "error", readErr)
228 videoSrc.EndStream()
229 return
230 }
231 // if ctx.Err() != nil {
232 // return
233 // }
234 if !videoFirst {
235 videoFirst = true
236 log.Debug(ctx, "got video data", "len", len(buf[:i]))
237 }
238
239 gbuf := gst.NewBufferWithSize(int64(len(buf[:i])))
240 gbuf.Map(gst.MapWrite).WriteData(buf[:i])
241 gbuf.Unmap()
242
243 ret := videoSrc.PushBuffer(gbuf)
244 if ret != gst.FlowOK {
245 log.Log(ctx, "failed to push buffer", "error", ret)
246 cancel()
247 return
248 }
249 // state := pipeline.GetCurrentState()
250 // if state != gst.StatePlaying {
251 // log.Warn(ctx, "pipeline state is not playing, consider running with GST_DEBUG=*:5 to find out why", "state", state)
252 // cancel()
253 // return
254 // }
255 }
256 }
257 if track.Kind() == webrtc.RTPCodecTypeAudio {
258
259 codecName := strings.Split(track.Codec().MimeType, "/")[1]
260 log.Log(ctx, "Track has started", "payloadType", track.PayloadType(), "codecName", codecName)
261
262 buf := make([]byte, 1400)
263 for {
264 i, _, readErr := track.Read(buf)
265 if readErr != nil {
266 log.Log(ctx, "failed to read track", "error", readErr)
267 audioSrc.EndStream()
268 return
269 }
270 // if ctx.Err() != nil {
271 // return
272 // }
273 if !audioFirst {
274 audioFirst = true
275 log.Debug(ctx, "got audio data", "len", len(buf[:i]))
276 }
277
278 gbuf := gst.NewBufferWithSize(int64(len(buf[:i])))
279 gbuf.Map(gst.MapWrite).WriteData(buf[:i])
280 gbuf.Unmap()
281 ret := audioSrc.PushBuffer(gbuf)
282 if ret != gst.FlowOK {
283 log.Log(ctx, "failed to push buffer", "error", ret)
284 cancel()
285 return
286 }
287 // state := pipeline.GetCurrentState()
288 // if state != gst.StatePlaying {
289 // log.Warn(ctx, "pipeline state is not playing, consider running with GST_DEBUG=*:5 to find out why", "state", state)
290 // cancel()
291 // return
292 // }
293 }
294 }
295 })
296
297 <-ctx.Done()
298
299 if err := pipeline.BlockSetState(gst.StateNull); err != nil {
300 log.Log(ctx, "failed to set pipeline state to null", "error", err)
301 }
302
303 if err := audioSrcElem.SetState(gst.StateNull); err != nil {
304 log.Log(ctx, "failed to set audioSrcElem state to null", "error", err)
305 }
306
307 if err := videoSrcElem.SetState(gst.StateNull); err != nil {
308 log.Log(ctx, "failed to set videoSrcElem state to null", "error", err)
309 }
310
311 log.Log(ctx, "webrtc ingest pipeline done")
312
313 }()
314 select {
315 case <-gatherComplete:
316 return peerConnection.LocalDescription(), nil
317 case <-ctx.Done():
318 return nil, ctx.Err()
319 }
320}