Live video on the AT Protocol
1package media
2
3import (
4 "context"
5 "fmt"
6 "time"
7
8 "github.com/google/uuid"
9 "github.com/pion/webrtc/v4"
10 "github.com/pion/webrtc/v4/pkg/media"
11 "golang.org/x/sync/errgroup"
12 "stream.place/streamplace/pkg/bus"
13 "stream.place/streamplace/pkg/log"
14 "stream.place/streamplace/pkg/spmetrics"
15)
16
17// This function remains in scope for the duration of a single users' playback
18func (mm *MediaManager) WebRTCPlayback2(ctx context.Context, user string, rendition string, offer *webrtc.SessionDescription) (*webrtc.SessionDescription, error) {
19 uu, err := uuid.NewV7()
20 if err != nil {
21 return nil, err
22 }
23 ctx = log.WithLogValues(ctx, "webrtcID", uu.String())
24 ctx = log.WithLogValues(ctx, "mediafunc", "WebRTCPlayback")
25
26 // Create a new RTCPeerConnection
27 peerConnection, err := mm.webrtcAPI.NewPeerConnection(mm.webrtcConfig)
28 if err != nil {
29 return nil, fmt.Errorf("failed to create WebRTC peer connection: %w", err)
30 }
31
32 videoTrack, err := webrtc.NewTrackLocalStaticSample(webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeH264}, "video", "pion")
33 if err != nil {
34 return nil, fmt.Errorf("failed to create video track: %w", err)
35 }
36 videoRTPSender, err := peerConnection.AddTrack(videoTrack)
37 if err != nil {
38 return nil, fmt.Errorf("failed to add video track to peer connection: %w", err)
39 }
40
41 audioTrack, err := webrtc.NewTrackLocalStaticSample(webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeOpus}, "audio", "pion")
42 if err != nil {
43 return nil, fmt.Errorf("failed to create audio track: %w", err)
44 }
45 audioRTPSender, err := peerConnection.AddTrack(audioTrack)
46 if err != nil {
47 return nil, fmt.Errorf("failed to add audio track to peer connection: %w", err)
48 }
49
50 // Set the remote SessionDescription
51 if err = peerConnection.SetRemoteDescription(*offer); err != nil {
52 return nil, fmt.Errorf("failed to set remote description: %w", err)
53 }
54
55 // Create answer
56 answer, err := peerConnection.CreateAnswer(nil)
57 if err != nil {
58 return nil, fmt.Errorf("failed to create answer: %w", err)
59 }
60
61 // Sets the LocalDescription, and starts our UDP listeners
62 if err = peerConnection.SetLocalDescription(answer); err != nil {
63 return nil, fmt.Errorf("failed to set local description: %w", err)
64 }
65
66 // Create channel that is blocked until ICE Gathering is complete
67 gatherComplete := webrtc.GatheringCompletePromise(peerConnection)
68
69 // Setup complete! Now we boot up streaming in the background while returning the SDP offer to the user.
70
71 go func() {
72 ctx, cancel := context.WithCancel(ctx)
73 defer cancel()
74
75 latency := time.Duration(0)
76
77 packetQueue := make(chan *bus.PacketizedSegment, 1024)
78 go func() {
79 segChan := mm.bus.SubscribeSegmentBuf(ctx, user, rendition, 2)
80 defer mm.bus.UnsubscribeSegment(ctx, user, rendition, segChan)
81 for {
82 select {
83 case <-ctx.Done():
84 log.Debug(ctx, "exiting segment reader")
85 return
86 case file := <-segChan.C:
87 log.Debug(ctx, "got segment", "file", file.Filepath)
88 latency += file.PacketizedData.Duration
89 packetQueue <- file.PacketizedData
90 }
91 }
92 }()
93
94 go func() {
95 go func() {
96 <-ctx.Done()
97 if cErr := peerConnection.Close(); cErr != nil {
98 log.Log(ctx, "cannot close peerConnection: %v\n", cErr)
99 }
100 }()
101
102 var scalar float64 = 1
103
104 for {
105 select {
106 case <-ctx.Done():
107 return
108 case packet := <-packetQueue:
109 latency -= packet.Duration
110 scalar = getPlaybackRate(latency)
111 log.Debug(ctx, "playback latency", "latency", latency, "scalar", scalar)
112 var videoDur time.Duration
113 var audioDur time.Duration
114 if len(packet.Video) > 0 {
115 videoDur = packet.Duration / time.Duration(len(packet.Video))
116 }
117 if len(packet.Audio) > 0 {
118 audioDur = packet.Duration / time.Duration(len(packet.Audio))
119 }
120 g, _ := errgroup.WithContext(ctx)
121
122 if videoDur > 0 {
123 g.Go(func() error {
124 ticker := time.NewTicker(time.Duration(float64(videoDur) * (1 / scalar)))
125 defer ticker.Stop()
126 for _, video := range packet.Video {
127 // log.Log(ctx, "writing video sample", "duration", videoDur)
128 err := videoTrack.WriteSample(media.Sample{Data: video, Duration: videoDur})
129 if err != nil {
130 return fmt.Errorf("failed to write video sample: %w", err)
131 }
132
133 select {
134 case <-ctx.Done():
135 return nil
136 case <-ticker.C:
137 continue
138 }
139 }
140 return nil
141 })
142 } else {
143 log.Warn(ctx, "no video samples to write")
144 }
145 if audioDur > 0 {
146 g.Go(func() error {
147 ticker := time.NewTicker(time.Duration(float64(audioDur) * (1 / scalar)))
148 defer ticker.Stop()
149 for _, audio := range packet.Audio {
150 err := audioTrack.WriteSample(media.Sample{Data: audio, Duration: audioDur})
151 if err != nil {
152 return fmt.Errorf("failed to write audio sample: %w", err)
153 }
154 select {
155 case <-ctx.Done():
156 return nil
157 case <-ticker.C:
158 continue
159 }
160 }
161 return nil
162 })
163
164 if err := g.Wait(); err != nil {
165 log.Error(ctx, "failed to write samples", "error", err)
166 cancel()
167 }
168 } else {
169 log.Warn(ctx, "no audio samples to write")
170 }
171 }
172 }
173 }()
174
175 spmetrics.ViewerInc(user)
176 defer spmetrics.ViewerDec(user)
177
178 go func() {
179 rtcpBuf := make([]byte, 1500)
180 for {
181 if _, _, rtcpErr := videoRTPSender.Read(rtcpBuf); rtcpErr != nil {
182 return
183 }
184 }
185 }()
186
187 go func() {
188 rtcpBuf := make([]byte, 1500)
189 for {
190 if _, _, rtcpErr := audioRTPSender.Read(rtcpBuf); rtcpErr != nil {
191 return
192 }
193 }
194 }()
195
196 // Set the handler for ICE connection state
197 // This will notify you when the peer has connected/disconnected
198 peerConnection.OnICEConnectionStateChange(func(connectionState webrtc.ICEConnectionState) {
199 log.Log(ctx, "Connection State has changed", "state", connectionState.String())
200 })
201
202 // Set the handler for Peer connection state
203 // This will notify you when the peer has connected/disconnected
204 peerConnection.OnConnectionStateChange(func(s webrtc.PeerConnectionState) {
205 log.Log(ctx, "Peer Connection State has changed", "state", s.String())
206
207 if s == webrtc.PeerConnectionStateFailed || s == webrtc.PeerConnectionStateClosed || s == webrtc.PeerConnectionStateDisconnected {
208 // Wait until PeerConnection has had no network activity for 30 seconds or another failure. It may be reconnected using an ICE Restart.
209 // Use webrtc.PeerConnectionStateDisconnected if you are interested in detecting faster timeout.
210 // Note that the PeerConnection may come back from PeerConnectionStateDisconnected.
211 log.Log(ctx, "Peer Connection has gone to failed, exiting")
212 cancel()
213 }
214 })
215
216 <-ctx.Done()
217
218 log.Warn(ctx, "exiting playback")
219
220 }()
221 select {
222 case <-gatherComplete:
223 return peerConnection.LocalDescription(), nil
224 case <-ctx.Done():
225 return nil, ctx.Err()
226 }
227}
228
229// getPlaybackRate returns a playback rate that eases from 1.0 to 1.5 between 7 and 60 seconds
230func getPlaybackRate(dur time.Duration) float64 {
231 switch {
232 case dur <= 7*time.Second:
233 return 1.0
234 case dur >= 60*time.Second:
235 return 1.5
236 default:
237 // Linear interpolation between (7,1.0) and (60,1.5)
238 progress := (float64(dur) - float64(7*time.Second)) / (float64(60*time.Second) - float64(7*time.Second))
239 return 1.0 + (0.5 * progress)
240 }
241}