Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

at v0.7.35 250 lines 7.5 kB view raw
1package media 2 3import ( 4 "context" 5 "fmt" 6 "time" 7 8 "github.com/google/uuid" 9 "github.com/pion/webrtc/v4" 10 "github.com/pion/webrtc/v4/pkg/media" 11 "golang.org/x/sync/errgroup" 12 "stream.place/streamplace/pkg/bus" 13 "stream.place/streamplace/pkg/log" 14 "stream.place/streamplace/pkg/spmetrics" 15) 16 17// This function remains in scope for the duration of a single users' playback 18func (mm *MediaManager) WebRTCPlayback2(ctx context.Context, user string, rendition string, offer *webrtc.SessionDescription) (*webrtc.SessionDescription, error) { 19 uu, err := uuid.NewV7() 20 if err != nil { 21 return nil, err 22 } 23 ctx = log.WithLogValues(ctx, "webrtcID", uu.String()) 24 ctx = log.WithLogValues(ctx, "mediafunc", "WebRTCPlayback") 25 26 // Create a new RTCPeerConnection 27 peerConnection, err := mm.webrtcAPI.NewPeerConnection(mm.webrtcConfig) 28 if err != nil { 29 return nil, fmt.Errorf("failed to create WebRTC peer connection: %w", err) 30 } 31 32 videoTrack, err := webrtc.NewTrackLocalStaticSample(webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeH264}, "video", "pion") 33 if err != nil { 34 return nil, fmt.Errorf("failed to create video track: %w", err) 35 } 36 videoRTPSender, err := peerConnection.AddTrack(videoTrack) 37 if err != nil { 38 return nil, fmt.Errorf("failed to add video track to peer connection: %w", err) 39 } 40 41 audioTrack, err := webrtc.NewTrackLocalStaticSample(webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeOpus}, "audio", "pion") 42 if err != nil { 43 return nil, fmt.Errorf("failed to create audio track: %w", err) 44 } 45 audioRTPSender, err := peerConnection.AddTrack(audioTrack) 46 if err != nil { 47 return nil, fmt.Errorf("failed to add audio track to peer connection: %w", err) 48 } 49 50 close := func() { 51 if cErr := peerConnection.Close(); cErr != nil { 52 log.Log(ctx, "cannot close peerConnection: %v\n", cErr) 53 } 54 } 55 56 // Set the remote SessionDescription 57 if err = peerConnection.SetRemoteDescription(*offer); err != nil { 58 close() 59 return nil, fmt.Errorf("failed to set remote description: %w", err) 60 } 61 62 // Create answer 63 answer, err := peerConnection.CreateAnswer(nil) 64 if err != nil { 65 close() 66 return nil, fmt.Errorf("failed to create answer: %w", err) 67 } 68 69 // Sets the LocalDescription, and starts our UDP listeners 70 if err = peerConnection.SetLocalDescription(answer); err != nil { 71 close() 72 return nil, fmt.Errorf("failed to set local description: %w", err) 73 } 74 75 // Create channel that is blocked until ICE Gathering is complete 76 gatherComplete := webrtc.GatheringCompletePromise(peerConnection) 77 78 // Setup complete! Now we boot up streaming in the background while returning the SDP offer to the user. 79 80 go func() { 81 ctx, cancel := context.WithCancel(ctx) 82 defer cancel() 83 84 latency := time.Duration(0) 85 86 packetQueue := make(chan *bus.PacketizedSegment, 1024) 87 go func() { 88 segChan := mm.bus.SubscribeSegmentBuf(ctx, user, rendition, 2) 89 defer mm.bus.UnsubscribeSegment(ctx, user, rendition, segChan) 90 for { 91 select { 92 case <-ctx.Done(): 93 log.Debug(ctx, "exiting segment reader") 94 return 95 case file := <-segChan.C: 96 log.Debug(ctx, "got segment", "file", file.Filepath) 97 latency += file.PacketizedData.Duration 98 packetQueue <- file.PacketizedData 99 } 100 } 101 }() 102 103 go func() { 104 go func() { 105 <-ctx.Done() 106 if cErr := peerConnection.Close(); cErr != nil { 107 log.Log(ctx, "cannot close peerConnection: %v\n", cErr) 108 } 109 }() 110 111 var scalar float64 = 1 112 113 for { 114 select { 115 case <-ctx.Done(): 116 return 117 case packet := <-packetQueue: 118 latency -= packet.Duration 119 scalar = getPlaybackRate(latency) 120 log.Debug(ctx, "playback latency", "latency", latency, "scalar", scalar) 121 var videoDur time.Duration 122 var audioDur time.Duration 123 if len(packet.Video) > 0 { 124 videoDur = packet.Duration / time.Duration(len(packet.Video)) 125 } 126 if len(packet.Audio) > 0 { 127 audioDur = packet.Duration / time.Duration(len(packet.Audio)) 128 } 129 g, _ := errgroup.WithContext(ctx) 130 131 if videoDur > 0 { 132 g.Go(func() error { 133 ticker := time.NewTicker(time.Duration(float64(videoDur) * (1 / scalar))) 134 defer ticker.Stop() 135 for _, video := range packet.Video { 136 // log.Log(ctx, "writing video sample", "duration", videoDur) 137 err := videoTrack.WriteSample(media.Sample{Data: video, Duration: videoDur}) 138 if err != nil { 139 return fmt.Errorf("failed to write video sample: %w", err) 140 } 141 142 select { 143 case <-ctx.Done(): 144 return nil 145 case <-ticker.C: 146 continue 147 } 148 } 149 return nil 150 }) 151 } else { 152 log.Warn(ctx, "no video samples to write") 153 } 154 if audioDur > 0 { 155 g.Go(func() error { 156 ticker := time.NewTicker(time.Duration(float64(audioDur) * (1 / scalar))) 157 defer ticker.Stop() 158 for _, audio := range packet.Audio { 159 err := audioTrack.WriteSample(media.Sample{Data: audio, Duration: audioDur}) 160 if err != nil { 161 return fmt.Errorf("failed to write audio sample: %w", err) 162 } 163 select { 164 case <-ctx.Done(): 165 return nil 166 case <-ticker.C: 167 continue 168 } 169 } 170 return nil 171 }) 172 173 if err := g.Wait(); err != nil { 174 log.Error(ctx, "failed to write samples", "error", err) 175 cancel() 176 } 177 } else { 178 log.Warn(ctx, "no audio samples to write") 179 } 180 } 181 } 182 }() 183 184 spmetrics.ViewerInc(user) 185 defer spmetrics.ViewerDec(user) 186 187 go func() { 188 rtcpBuf := make([]byte, 1500) 189 for { 190 if _, _, rtcpErr := videoRTPSender.Read(rtcpBuf); rtcpErr != nil { 191 return 192 } 193 } 194 }() 195 196 go func() { 197 rtcpBuf := make([]byte, 1500) 198 for { 199 if _, _, rtcpErr := audioRTPSender.Read(rtcpBuf); rtcpErr != nil { 200 return 201 } 202 } 203 }() 204 205 // Set the handler for ICE connection state 206 // This will notify you when the peer has connected/disconnected 207 peerConnection.OnICEConnectionStateChange(func(connectionState webrtc.ICEConnectionState) { 208 log.Log(ctx, "Connection State has changed", "state", connectionState.String()) 209 }) 210 211 // Set the handler for Peer connection state 212 // This will notify you when the peer has connected/disconnected 213 peerConnection.OnConnectionStateChange(func(s webrtc.PeerConnectionState) { 214 log.Log(ctx, "Peer Connection State has changed", "state", s.String()) 215 216 if s == webrtc.PeerConnectionStateFailed || s == webrtc.PeerConnectionStateClosed || s == webrtc.PeerConnectionStateDisconnected { 217 // Wait until PeerConnection has had no network activity for 30 seconds or another failure. It may be reconnected using an ICE Restart. 218 // Use webrtc.PeerConnectionStateDisconnected if you are interested in detecting faster timeout. 219 // Note that the PeerConnection may come back from PeerConnectionStateDisconnected. 220 log.Log(ctx, "Peer Connection has gone to failed, exiting") 221 cancel() 222 } 223 }) 224 225 <-ctx.Done() 226 227 log.Warn(ctx, "exiting playback") 228 229 }() 230 select { 231 case <-gatherComplete: 232 return peerConnection.LocalDescription(), nil 233 case <-ctx.Done(): 234 return nil, ctx.Err() 235 } 236} 237 238// getPlaybackRate returns a playback rate that eases from 1.0 to 1.5 between 7 and 60 seconds 239func getPlaybackRate(dur time.Duration) float64 { 240 switch { 241 case dur <= 7*time.Second: 242 return 1.0 243 case dur >= 60*time.Second: 244 return 1.5 245 default: 246 // Linear interpolation between (7,1.0) and (60,1.5) 247 progress := (float64(dur) - float64(7*time.Second)) / (float64(60*time.Second) - float64(7*time.Second)) 248 return 1.0 + (0.5 * progress) 249 } 250}