// Live video on the AT Protocol
1package media
2
import (
	"context"
	"fmt"
	"sync/atomic"
	"time"

	"github.com/google/uuid"
	"github.com/pion/webrtc/v4"
	"github.com/pion/webrtc/v4/pkg/media"
	"golang.org/x/sync/errgroup"
	"stream.place/streamplace/pkg/bus"
	"stream.place/streamplace/pkg/log"
	"stream.place/streamplace/pkg/spmetrics"
)
16
17// This function remains in scope for the duration of a single users' playback
18func (mm *MediaManager) WebRTCPlayback2(ctx context.Context, user string, rendition string, offer *webrtc.SessionDescription) (*webrtc.SessionDescription, error) {
19 uu, err := uuid.NewV7()
20 if err != nil {
21 return nil, err
22 }
23 ctx = log.WithLogValues(ctx, "webrtcID", uu.String())
24 ctx = log.WithLogValues(ctx, "mediafunc", "WebRTCPlayback")
25
26 // Create a new RTCPeerConnection
27 peerConnection, err := mm.webrtcAPI.NewPeerConnection(mm.webrtcConfig)
28 if err != nil {
29 return nil, fmt.Errorf("failed to create WebRTC peer connection: %w", err)
30 }
31
32 videoTrack, err := webrtc.NewTrackLocalStaticSample(webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeH264}, "video", "pion")
33 if err != nil {
34 return nil, fmt.Errorf("failed to create video track: %w", err)
35 }
36 videoRTPSender, err := peerConnection.AddTrack(videoTrack)
37 if err != nil {
38 return nil, fmt.Errorf("failed to add video track to peer connection: %w", err)
39 }
40
41 audioTrack, err := webrtc.NewTrackLocalStaticSample(webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeOpus}, "audio", "pion")
42 if err != nil {
43 return nil, fmt.Errorf("failed to create audio track: %w", err)
44 }
45 audioRTPSender, err := peerConnection.AddTrack(audioTrack)
46 if err != nil {
47 return nil, fmt.Errorf("failed to add audio track to peer connection: %w", err)
48 }
49
50 close := func() {
51 if cErr := peerConnection.Close(); cErr != nil {
52 log.Log(ctx, "cannot close peerConnection: %v\n", cErr)
53 }
54 }
55
56 // Set the remote SessionDescription
57 if err = peerConnection.SetRemoteDescription(*offer); err != nil {
58 close()
59 return nil, fmt.Errorf("failed to set remote description: %w", err)
60 }
61
62 // Create answer
63 answer, err := peerConnection.CreateAnswer(nil)
64 if err != nil {
65 close()
66 return nil, fmt.Errorf("failed to create answer: %w", err)
67 }
68
69 // Sets the LocalDescription, and starts our UDP listeners
70 if err = peerConnection.SetLocalDescription(answer); err != nil {
71 close()
72 return nil, fmt.Errorf("failed to set local description: %w", err)
73 }
74
75 // Create channel that is blocked until ICE Gathering is complete
76 gatherComplete := webrtc.GatheringCompletePromise(peerConnection)
77
78 // Setup complete! Now we boot up streaming in the background while returning the SDP offer to the user.
79
80 go func() {
81 ctx, cancel := context.WithCancel(ctx)
82 defer cancel()
83
84 latency := time.Duration(0)
85
86 packetQueue := make(chan *bus.PacketizedSegment, 1024)
87 go func() {
88 segChan := mm.bus.SubscribeSegmentBuf(ctx, user, rendition, 2)
89 defer mm.bus.UnsubscribeSegment(ctx, user, rendition, segChan)
90 for {
91 select {
92 case <-ctx.Done():
93 log.Debug(ctx, "exiting segment reader")
94 return
95 case file := <-segChan.C:
96 log.Debug(ctx, "got segment", "file", file.Filepath)
97 latency += file.PacketizedData.Duration
98 packetQueue <- file.PacketizedData
99 }
100 }
101 }()
102
103 go func() {
104 go func() {
105 <-ctx.Done()
106 if cErr := peerConnection.Close(); cErr != nil {
107 log.Log(ctx, "cannot close peerConnection: %v\n", cErr)
108 }
109 }()
110
111 var scalar float64 = 1
112
113 for {
114 select {
115 case <-ctx.Done():
116 return
117 case packet := <-packetQueue:
118 latency -= packet.Duration
119 scalar = getPlaybackRate(latency)
120 log.Debug(ctx, "playback latency", "latency", latency, "scalar", scalar)
121 var videoDur time.Duration
122 var audioDur time.Duration
123 if len(packet.Video) > 0 {
124 videoDur = packet.Duration / time.Duration(len(packet.Video))
125 }
126 if len(packet.Audio) > 0 {
127 audioDur = packet.Duration / time.Duration(len(packet.Audio))
128 }
129 g, _ := errgroup.WithContext(ctx)
130
131 if videoDur > 0 {
132 g.Go(func() error {
133 ticker := time.NewTicker(time.Duration(float64(videoDur) * (1 / scalar)))
134 defer ticker.Stop()
135 for _, video := range packet.Video {
136 // log.Log(ctx, "writing video sample", "duration", videoDur)
137 err := videoTrack.WriteSample(media.Sample{Data: video, Duration: videoDur})
138 if err != nil {
139 return fmt.Errorf("failed to write video sample: %w", err)
140 }
141
142 select {
143 case <-ctx.Done():
144 return nil
145 case <-ticker.C:
146 continue
147 }
148 }
149 return nil
150 })
151 } else {
152 log.Warn(ctx, "no video samples to write")
153 }
154 if audioDur > 0 {
155 g.Go(func() error {
156 ticker := time.NewTicker(time.Duration(float64(audioDur) * (1 / scalar)))
157 defer ticker.Stop()
158 for _, audio := range packet.Audio {
159 err := audioTrack.WriteSample(media.Sample{Data: audio, Duration: audioDur})
160 if err != nil {
161 return fmt.Errorf("failed to write audio sample: %w", err)
162 }
163 select {
164 case <-ctx.Done():
165 return nil
166 case <-ticker.C:
167 continue
168 }
169 }
170 return nil
171 })
172
173 if err := g.Wait(); err != nil {
174 log.Error(ctx, "failed to write samples", "error", err)
175 cancel()
176 }
177 } else {
178 log.Warn(ctx, "no audio samples to write")
179 }
180 }
181 }
182 }()
183
184 spmetrics.ViewerInc(user)
185 defer spmetrics.ViewerDec(user)
186
187 go func() {
188 rtcpBuf := make([]byte, 1500)
189 for {
190 if _, _, rtcpErr := videoRTPSender.Read(rtcpBuf); rtcpErr != nil {
191 return
192 }
193 }
194 }()
195
196 go func() {
197 rtcpBuf := make([]byte, 1500)
198 for {
199 if _, _, rtcpErr := audioRTPSender.Read(rtcpBuf); rtcpErr != nil {
200 return
201 }
202 }
203 }()
204
205 // Set the handler for ICE connection state
206 // This will notify you when the peer has connected/disconnected
207 peerConnection.OnICEConnectionStateChange(func(connectionState webrtc.ICEConnectionState) {
208 log.Log(ctx, "Connection State has changed", "state", connectionState.String())
209 })
210
211 // Set the handler for Peer connection state
212 // This will notify you when the peer has connected/disconnected
213 peerConnection.OnConnectionStateChange(func(s webrtc.PeerConnectionState) {
214 log.Log(ctx, "Peer Connection State has changed", "state", s.String())
215
216 if s == webrtc.PeerConnectionStateFailed || s == webrtc.PeerConnectionStateClosed || s == webrtc.PeerConnectionStateDisconnected {
217 // Wait until PeerConnection has had no network activity for 30 seconds or another failure. It may be reconnected using an ICE Restart.
218 // Use webrtc.PeerConnectionStateDisconnected if you are interested in detecting faster timeout.
219 // Note that the PeerConnection may come back from PeerConnectionStateDisconnected.
220 log.Log(ctx, "Peer Connection has gone to failed, exiting")
221 cancel()
222 }
223 })
224
225 <-ctx.Done()
226
227 log.Warn(ctx, "exiting playback")
228
229 }()
230 select {
231 case <-gatherComplete:
232 return peerConnection.LocalDescription(), nil
233 case <-ctx.Done():
234 return nil, ctx.Err()
235 }
236}
237
// getPlaybackRate converts a duration into a playback speed multiplier.
// Durations of 7 seconds or less map to 1.0, durations of 60 seconds or more
// map to 1.5, and anything in between is placed on the straight line joining
// those two points.
func getPlaybackRate(dur time.Duration) float64 {
	const (
		rampStart = 7 * time.Second
		rampEnd   = 60 * time.Second
	)

	if dur <= rampStart {
		return 1.0
	}
	if dur >= rampEnd {
		return 1.5
	}

	// Fraction of the way from rampStart to rampEnd, in the open interval (0, 1).
	span := float64(rampEnd) - float64(rampStart)
	fraction := (float64(dur) - float64(rampStart)) / span
	return 1.0 + (0.5 * fraction)
}