Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

at natb/analytics 249 lines 7.5 kB view raw
1package media 2 3import ( 4 "context" 5 "fmt" 6 "time" 7 8 "github.com/google/uuid" 9 "github.com/pion/webrtc/v4" 10 "github.com/pion/webrtc/v4/pkg/media" 11 "golang.org/x/sync/errgroup" 12 "stream.place/streamplace/pkg/bus" 13 "stream.place/streamplace/pkg/log" 14) 15 16// This function remains in scope for the duration of a single users' playback 17func (mm *MediaManager) WebRTCPlayback2(ctx context.Context, user string, rendition string, offer *webrtc.SessionDescription) (*webrtc.SessionDescription, error) { 18 uu, err := uuid.NewV7() 19 if err != nil { 20 return nil, err 21 } 22 ctx = log.WithLogValues(ctx, "webrtcID", uu.String()) 23 ctx = log.WithLogValues(ctx, "mediafunc", "WebRTCPlayback") 24 25 // Create a new RTCPeerConnection 26 peerConnection, err := mm.webrtcAPI.NewPeerConnection(mm.webrtcConfig) 27 if err != nil { 28 return nil, fmt.Errorf("failed to create WebRTC peer connection: %w", err) 29 } 30 31 videoTrack, err := webrtc.NewTrackLocalStaticSample(webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeH264}, "video", "pion") 32 if err != nil { 33 return nil, fmt.Errorf("failed to create video track: %w", err) 34 } 35 videoRTPSender, err := peerConnection.AddTrack(videoTrack) 36 if err != nil { 37 return nil, fmt.Errorf("failed to add video track to peer connection: %w", err) 38 } 39 40 audioTrack, err := webrtc.NewTrackLocalStaticSample(webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeOpus}, "audio", "pion") 41 if err != nil { 42 return nil, fmt.Errorf("failed to create audio track: %w", err) 43 } 44 audioRTPSender, err := peerConnection.AddTrack(audioTrack) 45 if err != nil { 46 return nil, fmt.Errorf("failed to add audio track to peer connection: %w", err) 47 } 48 49 close := func() { 50 if cErr := peerConnection.Close(); cErr != nil { 51 log.Log(ctx, "cannot close peerConnection: %v\n", cErr) 52 } 53 } 54 55 // Set the remote SessionDescription 56 if err = peerConnection.SetRemoteDescription(*offer); err != nil { 57 close() 58 return nil, fmt.Errorf("failed to set remote description: %w", err) 59 } 60 61 // Create answer 62 answer, err := peerConnection.CreateAnswer(nil) 63 if err != nil { 64 close() 65 return nil, fmt.Errorf("failed to create answer: %w", err) 66 } 67 68 // Sets the LocalDescription, and starts our UDP listeners 69 if err = peerConnection.SetLocalDescription(answer); err != nil { 70 close() 71 return nil, fmt.Errorf("failed to set local description: %w", err) 72 } 73 74 // Create channel that is blocked until ICE Gathering is complete 75 gatherComplete := webrtc.GatheringCompletePromise(peerConnection) 76 77 // Setup complete! Now we boot up streaming in the background while returning the SDP offer to the user. 78 79 go func() { 80 ctx, cancel := context.WithCancel(ctx) 81 defer cancel() 82 83 latency := time.Duration(0) 84 85 packetQueue := make(chan *bus.PacketizedSegment, 1024) 86 go func() { 87 segChan := mm.bus.SubscribeSegmentBuf(ctx, user, rendition, 2) 88 defer mm.bus.UnsubscribeSegment(ctx, user, rendition, segChan) 89 for { 90 select { 91 case <-ctx.Done(): 92 log.Debug(ctx, "exiting segment reader") 93 return 94 case file := <-segChan.C: 95 log.Debug(ctx, "got segment", "file", file.Filepath) 96 latency += file.PacketizedData.Duration 97 packetQueue <- file.PacketizedData 98 } 99 } 100 }() 101 102 go func() { 103 go func() { 104 <-ctx.Done() 105 if cErr := peerConnection.Close(); cErr != nil { 106 log.Log(ctx, "cannot close peerConnection: %v\n", cErr) 107 } 108 }() 109 110 var scalar float64 = 1 111 112 for { 113 select { 114 case <-ctx.Done(): 115 return 116 case packet := <-packetQueue: 117 latency -= packet.Duration 118 scalar = getPlaybackRate(latency) 119 log.Debug(ctx, "playback latency", "latency", latency, "scalar", scalar) 120 var videoDur time.Duration 121 var audioDur time.Duration 122 if len(packet.Video) > 0 { 123 videoDur = packet.Duration / time.Duration(len(packet.Video)) 124 } 125 if len(packet.Audio) > 0 { 126 audioDur = packet.Duration / time.Duration(len(packet.Audio)) 127 } 128 g, _ := errgroup.WithContext(ctx) 129 130 if videoDur > 0 { 131 g.Go(func() error { 132 ticker := time.NewTicker(time.Duration(float64(videoDur) * (1 / scalar))) 133 defer ticker.Stop() 134 for _, video := range packet.Video { 135 // log.Log(ctx, "writing video sample", "duration", videoDur) 136 err := videoTrack.WriteSample(media.Sample{Data: video, Duration: videoDur}) 137 if err != nil { 138 return fmt.Errorf("failed to write video sample: %w", err) 139 } 140 141 select { 142 case <-ctx.Done(): 143 return nil 144 case <-ticker.C: 145 continue 146 } 147 } 148 return nil 149 }) 150 } else { 151 log.Warn(ctx, "no video samples to write") 152 } 153 if audioDur > 0 { 154 g.Go(func() error { 155 ticker := time.NewTicker(time.Duration(float64(audioDur) * (1 / scalar))) 156 defer ticker.Stop() 157 for _, audio := range packet.Audio { 158 err := audioTrack.WriteSample(media.Sample{Data: audio, Duration: audioDur}) 159 if err != nil { 160 return fmt.Errorf("failed to write audio sample: %w", err) 161 } 162 select { 163 case <-ctx.Done(): 164 return nil 165 case <-ticker.C: 166 continue 167 } 168 } 169 return nil 170 }) 171 172 if err := g.Wait(); err != nil { 173 log.Error(ctx, "failed to write samples", "error", err) 174 cancel() 175 } 176 } else { 177 log.Warn(ctx, "no audio samples to write") 178 } 179 } 180 } 181 }() 182 183 mm.IncrementViewerCount(user, "webrtc") 184 defer mm.DecrementViewerCount(user, "webrtc") 185 186 go func() { 187 rtcpBuf := make([]byte, 1500) 188 for { 189 if _, _, rtcpErr := videoRTPSender.Read(rtcpBuf); rtcpErr != nil { 190 return 191 } 192 } 193 }() 194 195 go func() { 196 rtcpBuf := make([]byte, 1500) 197 for { 198 if _, _, rtcpErr := audioRTPSender.Read(rtcpBuf); rtcpErr != nil { 199 return 200 } 201 } 202 }() 203 204 // Set the handler for ICE connection state 205 // This will notify you when the peer has connected/disconnected 206 peerConnection.OnICEConnectionStateChange(func(connectionState webrtc.ICEConnectionState) { 207 log.Log(ctx, "Connection State has changed", "state", connectionState.String()) 208 }) 209 210 // Set the handler for Peer connection state 211 // This will notify you when the peer has connected/disconnected 212 peerConnection.OnConnectionStateChange(func(s webrtc.PeerConnectionState) { 213 log.Log(ctx, "Peer Connection State has changed", "state", s.String()) 214 215 if s == webrtc.PeerConnectionStateFailed || s == webrtc.PeerConnectionStateClosed || s == webrtc.PeerConnectionStateDisconnected { 216 // Wait until PeerConnection has had no network activity for 30 seconds or another failure. It may be reconnected using an ICE Restart. 217 // Use webrtc.PeerConnectionStateDisconnected if you are interested in detecting faster timeout. 218 // Note that the PeerConnection may come back from PeerConnectionStateDisconnected. 219 log.Log(ctx, "Peer Connection has gone to failed, exiting") 220 cancel() 221 } 222 }) 223 224 <-ctx.Done() 225 226 log.Warn(ctx, "exiting playback") 227 228 }() 229 select { 230 case <-gatherComplete: 231 return peerConnection.LocalDescription(), nil 232 case <-ctx.Done(): 233 return nil, ctx.Err() 234 } 235} 236 237// getPlaybackRate returns a playback rate that eases from 1.0 to 1.5 between 7 and 60 seconds 238func getPlaybackRate(dur time.Duration) float64 { 239 switch { 240 case dur <= 7*time.Second: 241 return 1.0 242 case dur >= 60*time.Second: 243 return 1.5 244 default: 245 // Linear interpolation between (7,1.0) and (60,1.5) 246 progress := (float64(dur) - float64(7*time.Second)) / (float64(60*time.Second) - float64(7*time.Second)) 247 return 1.0 + (0.5 * progress) 248 } 249}