Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

at eli/too-many-keyframes 320 lines 10 kB view raw
1package media 2 3import ( 4 "context" 5 "fmt" 6 "strings" 7 "time" 8 9 "github.com/go-gst/go-gst/gst" 10 "github.com/go-gst/go-gst/gst/app" 11 "github.com/google/uuid" 12 "github.com/pion/rtcp" 13 "github.com/pion/webrtc/v4" 14 "stream.place/streamplace/pkg/log" 15 "stream.place/streamplace/pkg/rtcrec" 16) 17 18// This function remains in scope for the duration of a single users' playback 19func (mm *MediaManager) WebRTCIngest(ctx context.Context, offer *webrtc.SessionDescription, signer MediaSigner, peerConnection rtcrec.PeerConnection, done chan error) (*webrtc.SessionDescription, error) { 20 uu, err := uuid.NewV7() 21 if err != nil { 22 return nil, err 23 } 24 25 ctx = log.WithLogValues(ctx, "webrtcID", uu.String(), "mediafunc", "WebRTCIngest", "streamer", signer.Streamer()) 26 27 // Allow us to receive 1 audio track, and 1 video track 28 if _, err = peerConnection.AddTransceiverFromKind(webrtc.RTPCodecTypeAudio); err != nil { 29 return nil, fmt.Errorf("failed to add audio transceiver: %w", err) 30 } else if _, err = peerConnection.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo); err != nil { 31 return nil, fmt.Errorf("failed to add video transceiver: %w", err) 32 } 33 34 pipelineSlice := []string{ 35 "multiqueue name=queue", 36 "appsrc format=time is-live=true do-timestamp=true name=videosrc ! capsfilter caps=application/x-rtp ! rtph264depay ! capsfilter caps=video/x-h264,stream-format=byte-stream,alignment=nal ! h264parse disable-passthrough=true config-interval=-1 ! h264timestamper ! identity ! queue.sink_0", 37 "appsrc format=time do-timestamp=true name=audiosrc ! capsfilter caps=application/x-rtp,media=audio,encoding-name=OPUS,payload=111 ! rtpopusdepay ! opusparse ! queue.sink_1", 38 } 39 40 pipeline, err := gst.NewPipelineFromString(strings.Join(pipelineSlice, "\n")) 41 if err != nil { 42 return nil, fmt.Errorf("failed to create GStreamer pipeline: %w", err) 43 } 44 45 queue, err := pipeline.GetElementByName("queue") 46 if err != nil { 47 return nil, fmt.Errorf("failed to get queue element from pipeline: %w", err) 48 } 49 50 // err = queue.Link(signerElem) 51 // if err != nil { 52 // return nil, fmt.Errorf("failed to link queue to signer element: %w", err) 53 // } 54 videoSrcPads, err := queue.GetSrcPads() 55 if err != nil { 56 return nil, fmt.Errorf("failed to get videoSrcPads from queue: %w", err) 57 } 58 if len(videoSrcPads) != 2 { 59 return nil, fmt.Errorf("failed to get videoSrcPads from queue") 60 } 61 videoSrcPad := videoSrcPads[0] 62 audioSrcPad := videoSrcPads[1] 63 64 videoSrcElem, err := pipeline.GetElementByName("videosrc") 65 if err != nil { 66 return nil, fmt.Errorf("failed to get videoSrcElem element from pipeline: %w", err) 67 } 68 videoSrc := app.SrcFromElement(videoSrcElem) 69 70 audioSrcElem, err := pipeline.GetElementByName("audiosrc") 71 if err != nil { 72 return nil, fmt.Errorf("failed to get audioSrcElem element from pipeline: %w", err) 73 } 74 audioSrc := app.SrcFromElement(audioSrcElem) 75 76 // Set the remote SessionDescription 77 if err = peerConnection.SetRemoteDescription(*offer); err != nil { 78 return nil, fmt.Errorf("failed to set remote description: %w", err) 79 } 80 81 // Create answer 82 answer, err := peerConnection.CreateAnswer(nil) 83 if err != nil { 84 return nil, fmt.Errorf("failed to create answer: %w", err) 85 } 86 87 // Sets the LocalDescription, and starts our UDP listeners 88 if err = peerConnection.SetLocalDescription(answer); err != nil { 89 return nil, fmt.Errorf("failed to set local description: %w", err) 90 } 91 92 // Create channel that is blocked until ICE Gathering is complete 93 gatherComplete := rtcrec.GatheringCompletePromise(peerConnection) 94 95 ctx, cancel := context.WithCancel(ctx) 96 signerElem, err := mm.SegmentAndSignElem(ctx, signer) 97 if err != nil { 98 cancel() 99 return nil, fmt.Errorf("failed create signer element: %w", err) 100 } 101 err = pipeline.Add(signerElem) 102 if err != nil { 103 cancel() 104 return nil, fmt.Errorf("failed to add signer element to pipeline: %w", err) 105 } 106 signerElemPads, err := signerElem.GetPads() 107 if err != nil { 108 cancel() 109 return nil, fmt.Errorf("failed to get signerElemPads from signer element: %w", err) 110 } 111 if len(signerElemPads) != 2 { 112 cancel() 113 return nil, fmt.Errorf("failed to get signerElemPads from signer element") 114 } 115 signerElemVideoPad := signerElemPads[0] 116 signerElemAudioPad := signerElemPads[1] 117 linked := videoSrcPad.Link(signerElemVideoPad) 118 if linked != gst.PadLinkOK { 119 cancel() 120 return nil, fmt.Errorf("failed to link videoSrcPad to signerElemVideoPad") 121 } 122 linked = audioSrcPad.Link(signerElemAudioPad) 123 if linked != gst.PadLinkOK { 124 cancel() 125 return nil, fmt.Errorf("failed to link audioSrcPad to signerElemAudioPad") 126 } 127 128 // Setup complete! Now we boot up streaming in the background while returning the SDP offer to the user. 129 go func() { 130 busErrorChan := make(chan error) 131 go func() { 132 err := HandleBusMessages(ctx, pipeline) 133 if err != nil { 134 log.Log(ctx, "pipeline error", "error", err) 135 } 136 cancel() 137 busErrorChan <- err 138 }() 139 140 defer cancel() 141 defer func() { done <- <-busErrorChan }() 142 143 go func() { 144 ticker := time.NewTicker(time.Second * 1) 145 for { 146 select { 147 case <-ctx.Done(): 148 return 149 case <-ticker.C: 150 state := pipeline.GetCurrentState() 151 log.Debug(ctx, "pipeline state", "state", state) 152 } 153 } 154 }() 155 156 // subscription to bus messages for key revocation 157 go mm.HandleKeyRevocation(ctx, signer, pipeline) 158 159 go func() { 160 <-ctx.Done() 161 if cErr := peerConnection.Close(); cErr != nil { 162 log.Log(ctx, "cannot close peerConnection: %v\n", cErr) 163 } 164 }() 165 166 log.Debug(ctx, "starting pipeline") 167 168 // Start the pipeline 169 err = pipeline.SetState(gst.StatePlaying) 170 if err != nil { 171 log.Log(ctx, "failed to set pipeline state", "error", err) 172 cancel() 173 } 174 175 // Set the handler for ICE connection state 176 // This will notify you when the peer has connected/disconnected 177 peerConnection.OnICEConnectionStateChange(func(connectionState webrtc.ICEConnectionState) { 178 log.Log(ctx, "Connection State has changed", "state", connectionState.String()) 179 }) 180 181 // Set the handler for Peer connection state 182 // This will notify you when the peer has connected/disconnected 183 peerConnection.OnConnectionStateChange(func(s webrtc.PeerConnectionState) { 184 log.Log(ctx, "Peer Connection State has changed", "state", s.String()) 185 186 if s == webrtc.PeerConnectionStateFailed || s == webrtc.PeerConnectionStateDisconnected { 187 // Wait until PeerConnection has had no network activity for 30 seconds or another failure. It may be reconnected using an ICE Restart. 188 // Use webrtc.PeerConnectionStateDisconnected if you are interested in detecting faster timeout. 189 // Note that the PeerConnection may come back from PeerConnectionStateDisconnected. 190 log.Log(ctx, "Peer Connection has ended, exiting", "state", s.String()) 191 cancel() 192 } 193 }) 194 195 videoFirst := false 196 audioFirst := false 197 198 peerConnection.OnTrack(func(track rtcrec.TrackRemote, _ rtcrec.RTPReceiver) { 199 log.Warn(ctx, "OnTrack", "kind", track.Kind()) 200 if track.Kind() == webrtc.RTPCodecTypeVideo { 201 // Send a PLI on an interval so that the publisher is pushing a keyframe every rtcpPLIInterval 202 go func() { 203 ticker := time.NewTicker(time.Second * 1) 204 for { 205 select { 206 case <-ctx.Done(): 207 return 208 case <-ticker.C: 209 rtcpSendErr := peerConnection.WriteRTCP([]rtcp.Packet{&rtcp.PictureLossIndication{MediaSSRC: uint32(track.SSRC())}}) 210 if rtcpSendErr != nil { 211 log.Log(ctx, "failed to send rtcp packet", "error", rtcpSendErr) 212 cancel() 213 return 214 } 215 } 216 } 217 }() 218 219 codecName := strings.Split(track.Codec().MimeType, "/")[1] 220 log.Log(ctx, "Track has started", "payloadType", track.PayloadType(), "codecName", codecName) 221 222 // appSrc := pipelineForCodec(track, codecName) 223 buf := make([]byte, 1400) 224 for { 225 i, _, readErr := track.Read(buf) 226 if readErr != nil { 227 log.Log(ctx, "failed to read track", "error", readErr) 228 videoSrc.EndStream() 229 return 230 } 231 // if ctx.Err() != nil { 232 // return 233 // } 234 if !videoFirst { 235 videoFirst = true 236 log.Debug(ctx, "got video data", "len", len(buf[:i])) 237 } 238 239 gbuf := gst.NewBufferWithSize(int64(len(buf[:i]))) 240 gbuf.Map(gst.MapWrite).WriteData(buf[:i]) 241 gbuf.Unmap() 242 243 ret := videoSrc.PushBuffer(gbuf) 244 if ret != gst.FlowOK { 245 log.Log(ctx, "failed to push buffer", "error", ret) 246 cancel() 247 return 248 } 249 // state := pipeline.GetCurrentState() 250 // if state != gst.StatePlaying { 251 // log.Warn(ctx, "pipeline state is not playing, consider running with GST_DEBUG=*:5 to find out why", "state", state) 252 // cancel() 253 // return 254 // } 255 } 256 } 257 if track.Kind() == webrtc.RTPCodecTypeAudio { 258 259 codecName := strings.Split(track.Codec().MimeType, "/")[1] 260 log.Log(ctx, "Track has started", "payloadType", track.PayloadType(), "codecName", codecName) 261 262 buf := make([]byte, 1400) 263 for { 264 i, _, readErr := track.Read(buf) 265 if readErr != nil { 266 log.Log(ctx, "failed to read track", "error", readErr) 267 audioSrc.EndStream() 268 return 269 } 270 // if ctx.Err() != nil { 271 // return 272 // } 273 if !audioFirst { 274 audioFirst = true 275 log.Debug(ctx, "got audio data", "len", len(buf[:i])) 276 } 277 278 gbuf := gst.NewBufferWithSize(int64(len(buf[:i]))) 279 gbuf.Map(gst.MapWrite).WriteData(buf[:i]) 280 gbuf.Unmap() 281 ret := audioSrc.PushBuffer(gbuf) 282 if ret != gst.FlowOK { 283 log.Log(ctx, "failed to push buffer", "error", ret) 284 cancel() 285 return 286 } 287 // state := pipeline.GetCurrentState() 288 // if state != gst.StatePlaying { 289 // log.Warn(ctx, "pipeline state is not playing, consider running with GST_DEBUG=*:5 to find out why", "state", state) 290 // cancel() 291 // return 292 // } 293 } 294 } 295 }) 296 297 <-ctx.Done() 298 299 if err := pipeline.BlockSetState(gst.StateNull); err != nil { 300 log.Log(ctx, "failed to set pipeline state to null", "error", err) 301 } 302 303 if err := audioSrcElem.SetState(gst.StateNull); err != nil { 304 log.Log(ctx, "failed to set audioSrcElem state to null", "error", err) 305 } 306 307 if err := videoSrcElem.SetState(gst.StateNull); err != nil { 308 log.Log(ctx, "failed to set videoSrcElem state to null", "error", err) 309 } 310 311 log.Log(ctx, "webrtc ingest pipeline done") 312 313 }() 314 select { 315 case <-gatherComplete: 316 return peerConnection.LocalDescription(), nil 317 case <-ctx.Done(): 318 return nil, ctx.Err() 319 } 320}