Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

at eli/packaging 241 lines 7.4 kB view raw
1package media 2 3import ( 4 "context" 5 "fmt" 6 "time" 7 8 "github.com/google/uuid" 9 "github.com/pion/webrtc/v4" 10 "github.com/pion/webrtc/v4/pkg/media" 11 "golang.org/x/sync/errgroup" 12 "stream.place/streamplace/pkg/bus" 13 "stream.place/streamplace/pkg/log" 14 "stream.place/streamplace/pkg/spmetrics" 15) 16 17// This function remains in scope for the duration of a single users' playback 18func (mm *MediaManager) WebRTCPlayback2(ctx context.Context, user string, rendition string, offer *webrtc.SessionDescription) (*webrtc.SessionDescription, error) { 19 uu, err := uuid.NewV7() 20 if err != nil { 21 return nil, err 22 } 23 ctx = log.WithLogValues(ctx, "webrtcID", uu.String()) 24 ctx = log.WithLogValues(ctx, "mediafunc", "WebRTCPlayback") 25 26 // Create a new RTCPeerConnection 27 peerConnection, err := mm.webrtcAPI.NewPeerConnection(mm.webrtcConfig) 28 if err != nil { 29 return nil, fmt.Errorf("failed to create WebRTC peer connection: %w", err) 30 } 31 32 videoTrack, err := webrtc.NewTrackLocalStaticSample(webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeH264}, "video", "pion") 33 if err != nil { 34 return nil, fmt.Errorf("failed to create video track: %w", err) 35 } 36 videoRTPSender, err := peerConnection.AddTrack(videoTrack) 37 if err != nil { 38 return nil, fmt.Errorf("failed to add video track to peer connection: %w", err) 39 } 40 41 audioTrack, err := webrtc.NewTrackLocalStaticSample(webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeOpus}, "audio", "pion") 42 if err != nil { 43 return nil, fmt.Errorf("failed to create audio track: %w", err) 44 } 45 audioRTPSender, err := peerConnection.AddTrack(audioTrack) 46 if err != nil { 47 return nil, fmt.Errorf("failed to add audio track to peer connection: %w", err) 48 } 49 50 // Set the remote SessionDescription 51 if err = peerConnection.SetRemoteDescription(*offer); err != nil { 52 return nil, fmt.Errorf("failed to set remote description: %w", err) 53 } 54 55 // Create answer 56 answer, err := peerConnection.CreateAnswer(nil) 57 if err != nil { 58 return nil, fmt.Errorf("failed to create answer: %w", err) 59 } 60 61 // Sets the LocalDescription, and starts our UDP listeners 62 if err = peerConnection.SetLocalDescription(answer); err != nil { 63 return nil, fmt.Errorf("failed to set local description: %w", err) 64 } 65 66 // Create channel that is blocked until ICE Gathering is complete 67 gatherComplete := webrtc.GatheringCompletePromise(peerConnection) 68 69 // Setup complete! Now we boot up streaming in the background while returning the SDP offer to the user. 70 71 go func() { 72 ctx, cancel := context.WithCancel(ctx) 73 defer cancel() 74 75 latency := time.Duration(0) 76 77 packetQueue := make(chan *bus.PacketizedSegment, 1024) 78 go func() { 79 segChan := mm.bus.SubscribeSegmentBuf(ctx, user, rendition, 2) 80 defer mm.bus.UnsubscribeSegment(ctx, user, rendition, segChan) 81 for { 82 select { 83 case <-ctx.Done(): 84 log.Debug(ctx, "exiting segment reader") 85 return 86 case file := <-segChan.C: 87 log.Debug(ctx, "got segment", "file", file.Filepath) 88 latency += file.PacketizedData.Duration 89 packetQueue <- file.PacketizedData 90 } 91 } 92 }() 93 94 go func() { 95 go func() { 96 <-ctx.Done() 97 if cErr := peerConnection.Close(); cErr != nil { 98 log.Log(ctx, "cannot close peerConnection: %v\n", cErr) 99 } 100 }() 101 102 var scalar float64 = 1 103 104 for { 105 select { 106 case <-ctx.Done(): 107 return 108 case packet := <-packetQueue: 109 latency -= packet.Duration 110 scalar = getPlaybackRate(latency) 111 log.Debug(ctx, "playback latency", "latency", latency, "scalar", scalar) 112 var videoDur time.Duration 113 var audioDur time.Duration 114 if len(packet.Video) > 0 { 115 videoDur = packet.Duration / time.Duration(len(packet.Video)) 116 } 117 if len(packet.Audio) > 0 { 118 audioDur = packet.Duration / time.Duration(len(packet.Audio)) 119 } 120 g, _ := errgroup.WithContext(ctx) 121 122 if videoDur > 0 { 123 g.Go(func() error { 124 ticker := time.NewTicker(time.Duration(float64(videoDur) * (1 / scalar))) 125 defer ticker.Stop() 126 for _, video := range packet.Video { 127 // log.Log(ctx, "writing video sample", "duration", videoDur) 128 err := videoTrack.WriteSample(media.Sample{Data: video, Duration: videoDur}) 129 if err != nil { 130 return fmt.Errorf("failed to write video sample: %w", err) 131 } 132 133 select { 134 case <-ctx.Done(): 135 return nil 136 case <-ticker.C: 137 continue 138 } 139 } 140 return nil 141 }) 142 } else { 143 log.Warn(ctx, "no video samples to write") 144 } 145 if audioDur > 0 { 146 g.Go(func() error { 147 ticker := time.NewTicker(time.Duration(float64(audioDur) * (1 / scalar))) 148 defer ticker.Stop() 149 for _, audio := range packet.Audio { 150 err := audioTrack.WriteSample(media.Sample{Data: audio, Duration: audioDur}) 151 if err != nil { 152 return fmt.Errorf("failed to write audio sample: %w", err) 153 } 154 select { 155 case <-ctx.Done(): 156 return nil 157 case <-ticker.C: 158 continue 159 } 160 } 161 return nil 162 }) 163 164 if err := g.Wait(); err != nil { 165 log.Error(ctx, "failed to write samples", "error", err) 166 cancel() 167 } 168 } else { 169 log.Warn(ctx, "no audio samples to write") 170 } 171 } 172 } 173 }() 174 175 spmetrics.ViewerInc(user) 176 defer spmetrics.ViewerDec(user) 177 178 go func() { 179 rtcpBuf := make([]byte, 1500) 180 for { 181 if _, _, rtcpErr := videoRTPSender.Read(rtcpBuf); rtcpErr != nil { 182 return 183 } 184 } 185 }() 186 187 go func() { 188 rtcpBuf := make([]byte, 1500) 189 for { 190 if _, _, rtcpErr := audioRTPSender.Read(rtcpBuf); rtcpErr != nil { 191 return 192 } 193 } 194 }() 195 196 // Set the handler for ICE connection state 197 // This will notify you when the peer has connected/disconnected 198 peerConnection.OnICEConnectionStateChange(func(connectionState webrtc.ICEConnectionState) { 199 log.Log(ctx, "Connection State has changed", "state", connectionState.String()) 200 }) 201 202 // Set the handler for Peer connection state 203 // This will notify you when the peer has connected/disconnected 204 peerConnection.OnConnectionStateChange(func(s webrtc.PeerConnectionState) { 205 log.Log(ctx, "Peer Connection State has changed", "state", s.String()) 206 207 if s == webrtc.PeerConnectionStateFailed || s == webrtc.PeerConnectionStateClosed || s == webrtc.PeerConnectionStateDisconnected { 208 // Wait until PeerConnection has had no network activity for 30 seconds or another failure. It may be reconnected using an ICE Restart. 209 // Use webrtc.PeerConnectionStateDisconnected if you are interested in detecting faster timeout. 210 // Note that the PeerConnection may come back from PeerConnectionStateDisconnected. 211 log.Log(ctx, "Peer Connection has gone to failed, exiting") 212 cancel() 213 } 214 }) 215 216 <-ctx.Done() 217 218 log.Warn(ctx, "exiting playback") 219 220 }() 221 select { 222 case <-gatherComplete: 223 return peerConnection.LocalDescription(), nil 224 case <-ctx.Done(): 225 return nil, ctx.Err() 226 } 227} 228 229// getPlaybackRate returns a playback rate that eases from 1.0 to 1.5 between 7 and 60 seconds 230func getPlaybackRate(dur time.Duration) float64 { 231 switch { 232 case dur <= 7*time.Second: 233 return 1.0 234 case dur >= 60*time.Second: 235 return 1.5 236 default: 237 // Linear interpolation between (7,1.0) and (60,1.5) 238 progress := (float64(dur) - float64(7*time.Second)) / (float64(60*time.Second) - float64(7*time.Second)) 239 return 1.0 + (0.5 * progress) 240 } 241}