Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

at v0.9.2 375 lines 11 kB view raw
1package media 2 3import ( 4 "context" 5 "fmt" 6 "strings" 7 "time" 8 9 "github.com/go-gst/go-gst/gst" 10 "github.com/go-gst/go-gst/gst/app" 11 "github.com/google/uuid" 12 "github.com/pion/webrtc/v4" 13 "github.com/pion/webrtc/v4/pkg/media" 14 "stream.place/streamplace/pkg/bus" 15 "stream.place/streamplace/pkg/log" 16) 17 18// we have a bug that prevents us from correctly probing video durations 19// a lot of the time. so when we don't have them we use the last duration 20// that we had, and when we don't have that we use a default duration 21var DefaultDuration = time.Duration(32 * time.Millisecond) 22 23// This function remains in scope for the duration of a single users' playback 24func (mm *MediaManager) WebRTCPlayback(ctx context.Context, user string, rendition string, offer *webrtc.SessionDescription) (*webrtc.SessionDescription, error) { 25 uu, err := uuid.NewV7() 26 if err != nil { 27 return nil, err 28 } 29 ctx = log.WithLogValues(ctx, "webrtcID", uu.String()) 30 ctx = log.WithLogValues(ctx, "mediafunc", "WebRTCPlayback") 31 ctx, cancel := context.WithCancel(ctx) //nolint:all 32 33 pipelineSlice := []string{ 34 "h264parse name=videoparse ! video/x-h264,stream-format=byte-stream ! appsink name=videoappsink", 35 "opusparse name=audioparse ! appsink name=audioappsink", 36 } 37 38 pipeline, err := gst.NewPipelineFromString(strings.Join(pipelineSlice, "\n")) 39 if err != nil { 40 return nil, fmt.Errorf("failed to create GStreamer pipeline: %w", err) //nolint:all 41 } 42 43 segBuffer := make(chan *bus.Seg, 1024) 44 go func() { 45 segChan := mm.bus.SubscribeSegment(ctx, user, rendition) 46 defer mm.bus.UnsubscribeSegment(ctx, user, rendition, segChan) 47 for { 48 select { 49 case <-ctx.Done(): 50 log.Debug(ctx, "exiting segment reader") 51 return 52 case file := <-segChan.C: 53 log.Debug(ctx, "got segment", "file", file.Filepath) 54 segBuffer <- file 55 } 56 } 57 }() 58 59 segCh := make(chan *bus.Seg) 60 go func() { 61 for { 62 select { 63 case <-ctx.Done(): 64 log.Debug(ctx, "exiting segment reader") 65 return 66 case seg := <-segBuffer: 67 select { 68 case <-ctx.Done(): 69 return 70 case segCh <- seg: 71 } 72 } 73 } 74 }() 75 76 concatBin, err := ConcatBin(ctx, segCh, true) 77 if err != nil { 78 return nil, fmt.Errorf("failed to create concat bin: %w", err) 79 } 80 81 err = pipeline.Add(concatBin.Element) 82 if err != nil { 83 return nil, fmt.Errorf("failed to add concat bin to pipeline: %w", err) 84 } 85 86 videoPad := concatBin.GetStaticPad("video_0") 87 if videoPad == nil { 88 return nil, fmt.Errorf("video pad not found") 89 } 90 91 audioPad := concatBin.GetStaticPad("audio_0") 92 if audioPad == nil { 93 return nil, fmt.Errorf("audio pad not found") 94 } 95 96 // queuePadVideo := outputQueue.GetRequestPad("src_%u") 97 // if queuePadVideo == nil { 98 // return nil, fmt.Errorf("failed to get queue video pad") 99 // } 100 // queuePadAudio := outputQueue.GetRequestPad("src_%u") 101 // if queuePadAudio == nil { 102 // return nil, fmt.Errorf("failed to get queue audio pad") 103 // } 104 105 videoParse, err := pipeline.GetElementByName("videoparse") 106 if err != nil { 107 return nil, fmt.Errorf("failed to get video sink element from pipeline: %w", err) 108 } 109 videoParsePad := videoParse.GetStaticPad("sink") 110 if videoParsePad == nil { 111 return nil, fmt.Errorf("video parse pad not found") 112 } 113 linked := videoPad.Link(videoParsePad) 114 if linked != gst.PadLinkOK { 115 return nil, fmt.Errorf("failed to link video pad to video parse pad: %v", linked) 116 } 117 118 audioParse, err := pipeline.GetElementByName("audioparse") 119 if err != nil { 120 return nil, fmt.Errorf("failed to get audio parse element from pipeline: %w", err) 121 } 122 audioParsePad := audioParse.GetStaticPad("sink") 123 if audioParsePad == nil { 124 return nil, fmt.Errorf("audio parse pad not found") 125 } 126 linked = audioPad.Link(audioParsePad) 127 if linked != gst.PadLinkOK { 128 return nil, fmt.Errorf("failed to link audio pad to audio parse pad: %v", linked) 129 } 130 131 videoappsinkele, err := pipeline.GetElementByName("videoappsink") 132 if err != nil { 133 return nil, fmt.Errorf("failed to get video sink element from pipeline: %w", err) 134 } 135 136 audioappsinkele, err := pipeline.GetElementByName("audioappsink") 137 if err != nil { 138 return nil, fmt.Errorf("failed to get audio sink element from pipeline: %w", err) 139 } 140 141 // Create a new RTCPeerConnection 142 peerConnection, err := mm.webrtcAPI.NewPeerConnection(mm.webrtcConfig) 143 if err != nil { 144 return nil, fmt.Errorf("failed to create WebRTC peer connection: %w", err) 145 } 146 go func() { 147 <-ctx.Done() 148 if cErr := peerConnection.Close(); cErr != nil { 149 log.Log(ctx, "cannot close peerConnection: %v\n", cErr) 150 } 151 }() 152 153 videoTrack, err := webrtc.NewTrackLocalStaticSample(webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeH264}, "video", "pion") 154 if err != nil { 155 return nil, fmt.Errorf("failed to create video track: %w", err) 156 } 157 videoRTPSender, err := peerConnection.AddTrack(videoTrack) 158 if err != nil { 159 return nil, fmt.Errorf("failed to add video track to peer connection: %w", err) 160 } 161 162 audioTrack, err := webrtc.NewTrackLocalStaticSample(webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeOpus}, "audio", "pion") 163 if err != nil { 164 return nil, fmt.Errorf("failed to create audio track: %w", err) 165 } 166 audioRTPSender, err := peerConnection.AddTrack(audioTrack) 167 if err != nil { 168 return nil, fmt.Errorf("failed to add audio track to peer connection: %w", err) 169 } 170 171 // Set the remote SessionDescription 172 if err = peerConnection.SetRemoteDescription(*offer); err != nil { 173 return nil, fmt.Errorf("failed to set remote description: %w", err) 174 } 175 176 // Create answer 177 answer, err := peerConnection.CreateAnswer(nil) 178 if err != nil { 179 return nil, fmt.Errorf("failed to create answer: %w", err) 180 } 181 182 // Sets the LocalDescription, and starts our UDP listeners 183 if err = peerConnection.SetLocalDescription(answer); err != nil { 184 return nil, fmt.Errorf("failed to set local description: %w", err) 185 } 186 187 // Create channel that is blocked until ICE Gathering is complete 188 gatherComplete := webrtc.GatheringCompletePromise(peerConnection) 189 190 // Setup complete! Now we boot up streaming in the background while returning the SDP offer to the user. 191 192 go func() { 193 ticker := time.NewTicker(time.Second * 1) 194 for { 195 select { 196 case <-ctx.Done(): 197 return 198 case <-ticker.C: 199 state := pipeline.GetCurrentState() 200 log.Debug(ctx, "pipeline state", "state", state) 201 } 202 } 203 }() 204 205 var lastVideoDuration = &DefaultDuration 206 207 go func() { 208 go func() { 209 if err := HandleBusMessages(ctx, pipeline); err != nil { 210 log.Log(ctx, "pipeline error", "error", err) 211 } 212 cancel() 213 }() 214 215 videoappsink := app.SinkFromElement(videoappsinkele) 216 videoappsink.SetCallbacks(&app.SinkCallbacks{ 217 NewSampleFunc: func(sink *app.Sink) gst.FlowReturn { 218 sample := sink.PullSample() 219 if sample == nil { 220 return gst.FlowEOS 221 } 222 223 buffer := sample.GetBuffer() 224 if buffer == nil { 225 return gst.FlowError 226 } 227 228 samples := buffer.Map(gst.MapRead).Bytes() 229 defer buffer.Unmap() 230 clockTime := buffer.Duration() 231 dur := clockTime.AsDuration() 232 mediaSample := media.Sample{Data: samples} 233 if dur != nil { 234 mediaSample.Duration = *dur 235 lastVideoDuration = dur 236 } else if lastVideoDuration != nil { 237 // log.Log(ctx, "no video duration, using last duration", "lastVideoDuration", lastVideoDuration) 238 mediaSample.Duration = *lastVideoDuration 239 } else { 240 log.Log(ctx, "no video duration", "samples", len(samples)) 241 // cancel() 242 return gst.FlowOK 243 } 244 245 if err := videoTrack.WriteSample(mediaSample); err != nil { 246 log.Log(ctx, "failed to write video sample", "error", err) 247 cancel() 248 } 249 250 return gst.FlowOK 251 }, 252 EOSFunc: func(sink *app.Sink) { 253 log.Warn(ctx, "videoappsink EOSFunc") 254 cancel() 255 }, 256 }) 257 258 audioappsink := app.SinkFromElement(audioappsinkele) 259 audioappsink.SetCallbacks(&app.SinkCallbacks{ 260 NewSampleFunc: func(sink *app.Sink) gst.FlowReturn { 261 sample := sink.PullSample() 262 if sample == nil { 263 return gst.FlowEOS 264 } 265 266 buffer := sample.GetBuffer() 267 if buffer == nil { 268 return gst.FlowError 269 } 270 271 samples := buffer.Map(gst.MapRead).Bytes() 272 defer buffer.Unmap() 273 274 b2 := make([]byte, len(samples)) 275 copy(b2, samples) 276 277 clockTime := buffer.Duration() 278 dur := clockTime.AsDuration() 279 mediaSample := media.Sample{Data: b2} 280 if dur != nil { 281 mediaSample.Duration = *dur 282 } else { 283 log.Log(ctx, "no audio duration", "samples", len(b2)) 284 // cancel() 285 return gst.FlowOK 286 } 287 if err := audioTrack.WriteSample(mediaSample); err != nil { 288 log.Log(ctx, "failed to write audio sample", "error", err) 289 return gst.FlowOK 290 } 291 292 return gst.FlowOK 293 }, 294 EOSFunc: func(sink *app.Sink) { 295 log.Warn(ctx, "audioappsink EOSFunc") 296 cancel() 297 }, 298 }) 299 300 // Start the pipeline 301 err := pipeline.SetState(gst.StatePlaying) 302 if err != nil { 303 log.Log(ctx, "failed to set pipeline state to null", "error", err) 304 } 305 mm.IncrementViewerCount(user, "webrtc") 306 defer mm.DecrementViewerCount(user, "webrtc") 307 308 go func() { 309 rtcpBuf := make([]byte, 1500) 310 for { 311 if _, _, rtcpErr := videoRTPSender.Read(rtcpBuf); rtcpErr != nil { 312 return 313 } 314 } 315 }() 316 317 go func() { 318 rtcpBuf := make([]byte, 1500) 319 for { 320 if _, _, rtcpErr := audioRTPSender.Read(rtcpBuf); rtcpErr != nil { 321 return 322 } 323 } 324 }() 325 326 // Set the handler for ICE connection state 327 // This will notify you when the peer has connected/disconnected 328 peerConnection.OnICEConnectionStateChange(func(connectionState webrtc.ICEConnectionState) { 329 log.Log(ctx, "Connection State has changed", "state", connectionState.String()) 330 }) 331 332 // Set the handler for Peer connection state 333 // This will notify you when the peer has connected/disconnected 334 peerConnection.OnConnectionStateChange(func(s webrtc.PeerConnectionState) { 335 log.Log(ctx, "Peer Connection State has changed", "state", s.String()) 336 337 if s == webrtc.PeerConnectionStateFailed || s == webrtc.PeerConnectionStateClosed || s == webrtc.PeerConnectionStateDisconnected { 338 // Wait until PeerConnection has had no network activity for 30 seconds or another failure. It may be reconnected using an ICE Restart. 339 // Use webrtc.PeerConnectionStateDisconnected if you are interested in detecting faster timeout. 340 // Note that the PeerConnection may come back from PeerConnectionStateDisconnected. 341 log.Log(ctx, "Peer Connection has gone to failed, exiting") 342 cancel() 343 } 344 }) 345 346 <-ctx.Done() 347 348 log.Warn(ctx, "setting playback pipeline state to null") 349 err = pipeline.BlockSetState(gst.StateNull) 350 if err != nil { 351 log.Log(ctx, "failed to set pipeline state to null", "error", err) 352 } 353 354 videoappsink.SetCallbacks(&app.SinkCallbacks{}) 355 err = videoappsinkele.SetState(gst.StateNull) 356 if err != nil { 357 log.Log(ctx, "failed to set videoappsinkele state to null", "error", err) 358 } 359 360 audioappsink.SetCallbacks(&app.SinkCallbacks{}) 361 err = audioappsinkele.SetState(gst.StateNull) 362 if err != nil { 363 log.Log(ctx, "failed to set audioappsinkele state to null", "error", err) 364 } 365 366 log.Warn(ctx, "exiting playback") 367 368 }() 369 select { 370 case <-gatherComplete: 371 return peerConnection.LocalDescription(), nil 372 case <-ctx.Done(): 373 return nil, ctx.Err() 374 } 375}