Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

at eli/chat-fixes 376 lines 11 kB view raw
1package media 2 3import ( 4 "context" 5 "fmt" 6 "strings" 7 "time" 8 9 "github.com/go-gst/go-gst/gst" 10 "github.com/go-gst/go-gst/gst/app" 11 "github.com/google/uuid" 12 "github.com/pion/webrtc/v4" 13 "github.com/pion/webrtc/v4/pkg/media" 14 "stream.place/streamplace/pkg/bus" 15 "stream.place/streamplace/pkg/log" 16 "stream.place/streamplace/pkg/spmetrics" 17) 18 19// we have a bug that prevents us from correctly probing video durations 20// a lot of the time. so when we don't have them we use the last duration 21// that we had, and when we don't have that we use a default duration 22var DefaultDuration = time.Duration(32 * time.Millisecond) 23 24// This function remains in scope for the duration of a single users' playback 25func (mm *MediaManager) WebRTCPlayback(ctx context.Context, user string, rendition string, offer *webrtc.SessionDescription) (*webrtc.SessionDescription, error) { 26 uu, err := uuid.NewV7() 27 if err != nil { 28 return nil, err 29 } 30 ctx = log.WithLogValues(ctx, "webrtcID", uu.String()) 31 ctx = log.WithLogValues(ctx, "mediafunc", "WebRTCPlayback") 32 ctx, cancel := context.WithCancel(ctx) //nolint:all 33 34 pipelineSlice := []string{ 35 "h264parse name=videoparse ! video/x-h264,stream-format=byte-stream ! appsink name=videoappsink", 36 "opusparse name=audioparse ! appsink name=audioappsink", 37 } 38 39 pipeline, err := gst.NewPipelineFromString(strings.Join(pipelineSlice, "\n")) 40 if err != nil { 41 return nil, fmt.Errorf("failed to create GStreamer pipeline: %w", err) //nolint:all 42 } 43 44 segBuffer := make(chan *bus.Seg, 1024) 45 go func() { 46 segChan := mm.bus.SubscribeSegment(ctx, user, rendition) 47 defer mm.bus.UnsubscribeSegment(ctx, user, rendition, segChan) 48 for { 49 select { 50 case <-ctx.Done(): 51 log.Debug(ctx, "exiting segment reader") 52 return 53 case file := <-segChan.C: 54 log.Debug(ctx, "got segment", "file", file.Filepath) 55 segBuffer <- file 56 } 57 } 58 }() 59 60 segCh := make(chan *bus.Seg) 61 go func() { 62 for { 63 select { 64 case <-ctx.Done(): 65 log.Debug(ctx, "exiting segment reader") 66 return 67 case seg := <-segBuffer: 68 select { 69 case <-ctx.Done(): 70 return 71 case segCh <- seg: 72 } 73 } 74 } 75 }() 76 77 concatBin, err := ConcatBin(ctx, segCh) 78 if err != nil { 79 return nil, fmt.Errorf("failed to create concat bin: %w", err) 80 } 81 82 err = pipeline.Add(concatBin.Element) 83 if err != nil { 84 return nil, fmt.Errorf("failed to add concat bin to pipeline: %w", err) 85 } 86 87 videoPad := concatBin.GetStaticPad("video_0") 88 if videoPad == nil { 89 return nil, fmt.Errorf("video pad not found") 90 } 91 92 audioPad := concatBin.GetStaticPad("audio_0") 93 if audioPad == nil { 94 return nil, fmt.Errorf("audio pad not found") 95 } 96 97 // queuePadVideo := outputQueue.GetRequestPad("src_%u") 98 // if queuePadVideo == nil { 99 // return nil, fmt.Errorf("failed to get queue video pad") 100 // } 101 // queuePadAudio := outputQueue.GetRequestPad("src_%u") 102 // if queuePadAudio == nil { 103 // return nil, fmt.Errorf("failed to get queue audio pad") 104 // } 105 106 videoParse, err := pipeline.GetElementByName("videoparse") 107 if err != nil { 108 return nil, fmt.Errorf("failed to get video sink element from pipeline: %w", err) 109 } 110 videoParsePad := videoParse.GetStaticPad("sink") 111 if videoParsePad == nil { 112 return nil, fmt.Errorf("video parse pad not found") 113 } 114 linked := videoPad.Link(videoParsePad) 115 if linked != gst.PadLinkOK { 116 return nil, fmt.Errorf("failed to link video pad to video parse pad: %v", linked) 117 } 118 119 audioParse, err := pipeline.GetElementByName("audioparse") 120 if err != nil { 121 return nil, fmt.Errorf("failed to get audio parse element from pipeline: %w", err) 122 } 123 audioParsePad := audioParse.GetStaticPad("sink") 124 if audioParsePad == nil { 125 return nil, fmt.Errorf("audio parse pad not found") 126 } 127 linked = audioPad.Link(audioParsePad) 128 if linked != gst.PadLinkOK { 129 return nil, fmt.Errorf("failed to link audio pad to audio parse pad: %v", linked) 130 } 131 132 videoappsinkele, err := pipeline.GetElementByName("videoappsink") 133 if err != nil { 134 return nil, fmt.Errorf("failed to get video sink element from pipeline: %w", err) 135 } 136 137 audioappsinkele, err := pipeline.GetElementByName("audioappsink") 138 if err != nil { 139 return nil, fmt.Errorf("failed to get audio sink element from pipeline: %w", err) 140 } 141 142 // Create a new RTCPeerConnection 143 peerConnection, err := mm.webrtcAPI.NewPeerConnection(mm.webrtcConfig) 144 if err != nil { 145 return nil, fmt.Errorf("failed to create WebRTC peer connection: %w", err) 146 } 147 go func() { 148 <-ctx.Done() 149 if cErr := peerConnection.Close(); cErr != nil { 150 log.Log(ctx, "cannot close peerConnection: %v\n", cErr) 151 } 152 }() 153 154 videoTrack, err := webrtc.NewTrackLocalStaticSample(webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeH264}, "video", "pion") 155 if err != nil { 156 return nil, fmt.Errorf("failed to create video track: %w", err) 157 } 158 videoRTPSender, err := peerConnection.AddTrack(videoTrack) 159 if err != nil { 160 return nil, fmt.Errorf("failed to add video track to peer connection: %w", err) 161 } 162 163 audioTrack, err := webrtc.NewTrackLocalStaticSample(webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeOpus}, "audio", "pion") 164 if err != nil { 165 return nil, fmt.Errorf("failed to create audio track: %w", err) 166 } 167 audioRTPSender, err := peerConnection.AddTrack(audioTrack) 168 if err != nil { 169 return nil, fmt.Errorf("failed to add audio track to peer connection: %w", err) 170 } 171 172 // Set the remote SessionDescription 173 if err = peerConnection.SetRemoteDescription(*offer); err != nil { 174 return nil, fmt.Errorf("failed to set remote description: %w", err) 175 } 176 177 // Create answer 178 answer, err := peerConnection.CreateAnswer(nil) 179 if err != nil { 180 return nil, fmt.Errorf("failed to create answer: %w", err) 181 } 182 183 // Sets the LocalDescription, and starts our UDP listeners 184 if err = peerConnection.SetLocalDescription(answer); err != nil { 185 return nil, fmt.Errorf("failed to set local description: %w", err) 186 } 187 188 // Create channel that is blocked until ICE Gathering is complete 189 gatherComplete := webrtc.GatheringCompletePromise(peerConnection) 190 191 // Setup complete! Now we boot up streaming in the background while returning the SDP offer to the user. 192 193 go func() { 194 ticker := time.NewTicker(time.Second * 1) 195 for { 196 select { 197 case <-ctx.Done(): 198 return 199 case <-ticker.C: 200 state := pipeline.GetCurrentState() 201 log.Debug(ctx, "pipeline state", "state", state) 202 } 203 } 204 }() 205 206 var lastVideoDuration = &DefaultDuration 207 208 go func() { 209 go func() { 210 if err := HandleBusMessages(ctx, pipeline); err != nil { 211 log.Log(ctx, "pipeline error", "error", err) 212 } 213 cancel() 214 }() 215 216 videoappsink := app.SinkFromElement(videoappsinkele) 217 videoappsink.SetCallbacks(&app.SinkCallbacks{ 218 NewSampleFunc: func(sink *app.Sink) gst.FlowReturn { 219 sample := sink.PullSample() 220 if sample == nil { 221 return gst.FlowEOS 222 } 223 224 buffer := sample.GetBuffer() 225 if buffer == nil { 226 return gst.FlowError 227 } 228 229 samples := buffer.Map(gst.MapRead).Bytes() 230 defer buffer.Unmap() 231 clockTime := buffer.Duration() 232 dur := clockTime.AsDuration() 233 mediaSample := media.Sample{Data: samples} 234 if dur != nil { 235 mediaSample.Duration = *dur 236 lastVideoDuration = dur 237 } else if lastVideoDuration != nil { 238 // log.Log(ctx, "no video duration, using last duration", "lastVideoDuration", lastVideoDuration) 239 mediaSample.Duration = *lastVideoDuration 240 } else { 241 log.Log(ctx, "no video duration", "samples", len(samples)) 242 // cancel() 243 return gst.FlowOK 244 } 245 246 if err := videoTrack.WriteSample(mediaSample); err != nil { 247 log.Log(ctx, "failed to write video sample", "error", err) 248 cancel() 249 } 250 251 return gst.FlowOK 252 }, 253 EOSFunc: func(sink *app.Sink) { 254 log.Warn(ctx, "videoappsink EOSFunc") 255 cancel() 256 }, 257 }) 258 259 audioappsink := app.SinkFromElement(audioappsinkele) 260 audioappsink.SetCallbacks(&app.SinkCallbacks{ 261 NewSampleFunc: func(sink *app.Sink) gst.FlowReturn { 262 sample := sink.PullSample() 263 if sample == nil { 264 return gst.FlowEOS 265 } 266 267 buffer := sample.GetBuffer() 268 if buffer == nil { 269 return gst.FlowError 270 } 271 272 samples := buffer.Map(gst.MapRead).Bytes() 273 defer buffer.Unmap() 274 275 b2 := make([]byte, len(samples)) 276 copy(b2, samples) 277 278 clockTime := buffer.Duration() 279 dur := clockTime.AsDuration() 280 mediaSample := media.Sample{Data: b2} 281 if dur != nil { 282 mediaSample.Duration = *dur 283 } else { 284 log.Log(ctx, "no audio duration", "samples", len(b2)) 285 // cancel() 286 return gst.FlowOK 287 } 288 if err := audioTrack.WriteSample(mediaSample); err != nil { 289 log.Log(ctx, "failed to write audio sample", "error", err) 290 return gst.FlowOK 291 } 292 293 return gst.FlowOK 294 }, 295 EOSFunc: func(sink *app.Sink) { 296 log.Warn(ctx, "audioappsink EOSFunc") 297 cancel() 298 }, 299 }) 300 301 // Start the pipeline 302 err := pipeline.SetState(gst.StatePlaying) 303 if err != nil { 304 log.Log(ctx, "failed to set pipeline state to null", "error", err) 305 } 306 spmetrics.ViewerInc(user) 307 defer spmetrics.ViewerDec(user) 308 309 go func() { 310 rtcpBuf := make([]byte, 1500) 311 for { 312 if _, _, rtcpErr := videoRTPSender.Read(rtcpBuf); rtcpErr != nil { 313 return 314 } 315 } 316 }() 317 318 go func() { 319 rtcpBuf := make([]byte, 1500) 320 for { 321 if _, _, rtcpErr := audioRTPSender.Read(rtcpBuf); rtcpErr != nil { 322 return 323 } 324 } 325 }() 326 327 // Set the handler for ICE connection state 328 // This will notify you when the peer has connected/disconnected 329 peerConnection.OnICEConnectionStateChange(func(connectionState webrtc.ICEConnectionState) { 330 log.Log(ctx, "Connection State has changed", "state", connectionState.String()) 331 }) 332 333 // Set the handler for Peer connection state 334 // This will notify you when the peer has connected/disconnected 335 peerConnection.OnConnectionStateChange(func(s webrtc.PeerConnectionState) { 336 log.Log(ctx, "Peer Connection State has changed", "state", s.String()) 337 338 if s == webrtc.PeerConnectionStateFailed || s == webrtc.PeerConnectionStateClosed || s == webrtc.PeerConnectionStateDisconnected { 339 // Wait until PeerConnection has had no network activity for 30 seconds or another failure. It may be reconnected using an ICE Restart. 340 // Use webrtc.PeerConnectionStateDisconnected if you are interested in detecting faster timeout. 341 // Note that the PeerConnection may come back from PeerConnectionStateDisconnected. 342 log.Log(ctx, "Peer Connection has gone to failed, exiting") 343 cancel() 344 } 345 }) 346 347 <-ctx.Done() 348 349 log.Warn(ctx, "setting playback pipeline state to null") 350 err = pipeline.BlockSetState(gst.StateNull) 351 if err != nil { 352 log.Log(ctx, "failed to set pipeline state to null", "error", err) 353 } 354 355 videoappsink.SetCallbacks(&app.SinkCallbacks{}) 356 err = videoappsinkele.SetState(gst.StateNull) 357 if err != nil { 358 log.Log(ctx, "failed to set videoappsinkele state to null", "error", err) 359 } 360 361 audioappsink.SetCallbacks(&app.SinkCallbacks{}) 362 err = audioappsinkele.SetState(gst.StateNull) 363 if err != nil { 364 log.Log(ctx, "failed to set audioappsinkele state to null", "error", err) 365 } 366 367 log.Warn(ctx, "exiting playback") 368 369 }() 370 select { 371 case <-gatherComplete: 372 return peerConnection.LocalDescription(), nil 373 case <-ctx.Done(): 374 return nil, ctx.Err() 375 } 376}