Live video on the AT Protocol
1package media
2
3import (
4 "context"
5 "fmt"
6 "time"
7
8 "github.com/google/uuid"
9 "github.com/pion/webrtc/v4"
10 "github.com/pion/webrtc/v4/pkg/media"
11 "golang.org/x/sync/errgroup"
12 "stream.place/streamplace/pkg/bus"
13 "stream.place/streamplace/pkg/log"
14)
15
16// This function remains in scope for the duration of a single users' playback
17func (mm *MediaManager) WebRTCPlayback2(ctx context.Context, user string, rendition string, offer *webrtc.SessionDescription) (*webrtc.SessionDescription, error) {
18 uu, err := uuid.NewV7()
19 if err != nil {
20 return nil, err
21 }
22 ctx = log.WithLogValues(ctx, "webrtcID", uu.String())
23 ctx = log.WithLogValues(ctx, "mediafunc", "WebRTCPlayback")
24
25 // Create a new RTCPeerConnection
26 peerConnection, err := mm.webrtcAPI.NewPeerConnection(mm.webrtcConfig)
27 if err != nil {
28 return nil, fmt.Errorf("failed to create WebRTC peer connection: %w", err)
29 }
30
31 videoTrack, err := webrtc.NewTrackLocalStaticSample(webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeH264}, "video", "pion")
32 if err != nil {
33 return nil, fmt.Errorf("failed to create video track: %w", err)
34 }
35 videoRTPSender, err := peerConnection.AddTrack(videoTrack)
36 if err != nil {
37 return nil, fmt.Errorf("failed to add video track to peer connection: %w", err)
38 }
39
40 audioTrack, err := webrtc.NewTrackLocalStaticSample(webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeOpus}, "audio", "pion")
41 if err != nil {
42 return nil, fmt.Errorf("failed to create audio track: %w", err)
43 }
44 audioRTPSender, err := peerConnection.AddTrack(audioTrack)
45 if err != nil {
46 return nil, fmt.Errorf("failed to add audio track to peer connection: %w", err)
47 }
48
49 close := func() {
50 if cErr := peerConnection.Close(); cErr != nil {
51 log.Log(ctx, "cannot close peerConnection: %v\n", cErr)
52 }
53 }
54
55 // Set the remote SessionDescription
56 if err = peerConnection.SetRemoteDescription(*offer); err != nil {
57 close()
58 return nil, fmt.Errorf("failed to set remote description: %w", err)
59 }
60
61 // Create answer
62 answer, err := peerConnection.CreateAnswer(nil)
63 if err != nil {
64 close()
65 return nil, fmt.Errorf("failed to create answer: %w", err)
66 }
67
68 // Sets the LocalDescription, and starts our UDP listeners
69 if err = peerConnection.SetLocalDescription(answer); err != nil {
70 close()
71 return nil, fmt.Errorf("failed to set local description: %w", err)
72 }
73
74 // Create channel that is blocked until ICE Gathering is complete
75 gatherComplete := webrtc.GatheringCompletePromise(peerConnection)
76
77 // Setup complete! Now we boot up streaming in the background while returning the SDP offer to the user.
78
79 go func() {
80 ctx, cancel := context.WithCancel(ctx)
81 defer cancel()
82
83 latency := time.Duration(0)
84
85 packetQueue := make(chan *bus.PacketizedSegment, 1024)
86 go func() {
87 segChan := mm.bus.SubscribeSegmentBuf(ctx, user, rendition, 2)
88 defer mm.bus.UnsubscribeSegment(ctx, user, rendition, segChan)
89 for {
90 select {
91 case <-ctx.Done():
92 log.Debug(ctx, "exiting segment reader")
93 return
94 case file := <-segChan.C:
95 log.Debug(ctx, "got segment", "file", file.Filepath)
96 latency += file.PacketizedData.Duration
97 packetQueue <- file.PacketizedData
98 }
99 }
100 }()
101
102 go func() {
103 go func() {
104 <-ctx.Done()
105 if cErr := peerConnection.Close(); cErr != nil {
106 log.Log(ctx, "cannot close peerConnection: %v\n", cErr)
107 }
108 }()
109
110 var scalar float64 = 1
111
112 for {
113 select {
114 case <-ctx.Done():
115 return
116 case packet := <-packetQueue:
117 latency -= packet.Duration
118 scalar = getPlaybackRate(latency)
119 log.Debug(ctx, "playback latency", "latency", latency, "scalar", scalar)
120 var videoDur time.Duration
121 var audioDur time.Duration
122 if len(packet.Video) > 0 {
123 videoDur = packet.Duration / time.Duration(len(packet.Video))
124 }
125 if len(packet.Audio) > 0 {
126 audioDur = packet.Duration / time.Duration(len(packet.Audio))
127 }
128 g, _ := errgroup.WithContext(ctx)
129
130 if videoDur > 0 {
131 g.Go(func() error {
132 ticker := time.NewTicker(time.Duration(float64(videoDur) * (1 / scalar)))
133 defer ticker.Stop()
134 for _, video := range packet.Video {
135 // log.Log(ctx, "writing video sample", "duration", videoDur)
136 err := videoTrack.WriteSample(media.Sample{Data: video, Duration: videoDur})
137 if err != nil {
138 return fmt.Errorf("failed to write video sample: %w", err)
139 }
140
141 select {
142 case <-ctx.Done():
143 return nil
144 case <-ticker.C:
145 continue
146 }
147 }
148 return nil
149 })
150 } else {
151 log.Warn(ctx, "no video samples to write")
152 }
153 if audioDur > 0 {
154 g.Go(func() error {
155 ticker := time.NewTicker(time.Duration(float64(audioDur) * (1 / scalar)))
156 defer ticker.Stop()
157 for _, audio := range packet.Audio {
158 err := audioTrack.WriteSample(media.Sample{Data: audio, Duration: audioDur})
159 if err != nil {
160 return fmt.Errorf("failed to write audio sample: %w", err)
161 }
162 select {
163 case <-ctx.Done():
164 return nil
165 case <-ticker.C:
166 continue
167 }
168 }
169 return nil
170 })
171
172 if err := g.Wait(); err != nil {
173 log.Error(ctx, "failed to write samples", "error", err)
174 cancel()
175 }
176 } else {
177 log.Warn(ctx, "no audio samples to write")
178 }
179 }
180 }
181 }()
182
183 mm.IncrementViewerCount(user, "webrtc")
184 defer mm.DecrementViewerCount(user, "webrtc")
185
186 go func() {
187 rtcpBuf := make([]byte, 1500)
188 for {
189 if _, _, rtcpErr := videoRTPSender.Read(rtcpBuf); rtcpErr != nil {
190 return
191 }
192 }
193 }()
194
195 go func() {
196 rtcpBuf := make([]byte, 1500)
197 for {
198 if _, _, rtcpErr := audioRTPSender.Read(rtcpBuf); rtcpErr != nil {
199 return
200 }
201 }
202 }()
203
204 // Set the handler for ICE connection state
205 // This will notify you when the peer has connected/disconnected
206 peerConnection.OnICEConnectionStateChange(func(connectionState webrtc.ICEConnectionState) {
207 log.Log(ctx, "Connection State has changed", "state", connectionState.String())
208 })
209
210 // Set the handler for Peer connection state
211 // This will notify you when the peer has connected/disconnected
212 peerConnection.OnConnectionStateChange(func(s webrtc.PeerConnectionState) {
213 log.Log(ctx, "Peer Connection State has changed", "state", s.String())
214
215 if s == webrtc.PeerConnectionStateFailed || s == webrtc.PeerConnectionStateClosed || s == webrtc.PeerConnectionStateDisconnected {
216 // Wait until PeerConnection has had no network activity for 30 seconds or another failure. It may be reconnected using an ICE Restart.
217 // Use webrtc.PeerConnectionStateDisconnected if you are interested in detecting faster timeout.
218 // Note that the PeerConnection may come back from PeerConnectionStateDisconnected.
219 log.Log(ctx, "Peer Connection has gone to failed, exiting")
220 cancel()
221 }
222 })
223
224 <-ctx.Done()
225
226 log.Warn(ctx, "exiting playback")
227
228 }()
229 select {
230 case <-gatherComplete:
231 return peerConnection.LocalDescription(), nil
232 case <-ctx.Done():
233 return nil, ctx.Err()
234 }
235}
236
237// getPlaybackRate returns a playback rate that eases from 1.0 to 1.5 between 7 and 60 seconds
238func getPlaybackRate(dur time.Duration) float64 {
239 switch {
240 case dur <= 7*time.Second:
241 return 1.0
242 case dur >= 60*time.Second:
243 return 1.5
244 default:
245 // Linear interpolation between (7,1.0) and (60,1.5)
246 progress := (float64(dur) - float64(7*time.Second)) / (float64(60*time.Second) - float64(7*time.Second))
247 return 1.0 + (0.5 * progress)
248 }
249}