Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

at eli/revert-dev-env 317 lines 10 kB view raw
1package media 2 3import ( 4 "context" 5 "fmt" 6 "strings" 7 "time" 8 9 "github.com/go-gst/go-gst/gst" 10 "github.com/go-gst/go-gst/gst/app" 11 "github.com/google/uuid" 12 "github.com/pion/rtcp" 13 "github.com/pion/webrtc/v4" 14 "stream.place/streamplace/pkg/log" 15 "stream.place/streamplace/pkg/rtcrec" 16) 17 18// This function remains in scope for the duration of a single users' playback 19func (mm *MediaManager) WebRTCIngest(ctx context.Context, offer *webrtc.SessionDescription, signer MediaSigner, peerConnection rtcrec.PeerConnection, done chan struct{}) (*webrtc.SessionDescription, error) { 20 uu, err := uuid.NewV7() 21 if err != nil { 22 return nil, err 23 } 24 25 ctx = log.WithLogValues(ctx, "webrtcID", uu.String(), "mediafunc", "WebRTCIngest", "streamer", signer.Streamer()) 26 27 // Allow us to receive 1 audio track, and 1 video track 28 if _, err = peerConnection.AddTransceiverFromKind(webrtc.RTPCodecTypeAudio); err != nil { 29 return nil, fmt.Errorf("failed to add audio transceiver: %w", err) 30 } else if _, err = peerConnection.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo); err != nil { 31 return nil, fmt.Errorf("failed to add video transceiver: %w", err) 32 } 33 34 pipelineSlice := []string{ 35 "multiqueue name=queue", 36 "appsrc format=time is-live=true do-timestamp=true name=videosrc ! capsfilter caps=application/x-rtp ! rtph264depay ! capsfilter caps=video/x-h264,stream-format=byte-stream,alignment=nal ! h264parse disable-passthrough=true config-interval=-1 ! h264timestamper ! identity ! queue.sink_0", 37 "appsrc format=time do-timestamp=true name=audiosrc ! capsfilter caps=application/x-rtp,media=audio,encoding-name=OPUS,payload=111 ! rtpopusdepay ! opusparse ! queue.sink_1", 38 } 39 40 pipeline, err := gst.NewPipelineFromString(strings.Join(pipelineSlice, "\n")) 41 if err != nil { 42 return nil, fmt.Errorf("failed to create GStreamer pipeline: %w", err) 43 } 44 45 queue, err := pipeline.GetElementByName("queue") 46 if err != nil { 47 return nil, fmt.Errorf("failed to get queue element from pipeline: %w", err) 48 } 49 50 // err = queue.Link(signerElem) 51 // if err != nil { 52 // return nil, fmt.Errorf("failed to link queue to signer element: %w", err) 53 // } 54 videoSrcPads, err := queue.GetSrcPads() 55 if err != nil { 56 return nil, fmt.Errorf("failed to get videoSrcPads from queue: %w", err) 57 } 58 if len(videoSrcPads) != 2 { 59 return nil, fmt.Errorf("failed to get videoSrcPads from queue") 60 } 61 videoSrcPad := videoSrcPads[0] 62 audioSrcPad := videoSrcPads[1] 63 64 videoSrcElem, err := pipeline.GetElementByName("videosrc") 65 if err != nil { 66 return nil, fmt.Errorf("failed to get videoSrcElem element from pipeline: %w", err) 67 } 68 videoSrc := app.SrcFromElement(videoSrcElem) 69 70 audioSrcElem, err := pipeline.GetElementByName("audiosrc") 71 if err != nil { 72 return nil, fmt.Errorf("failed to get audioSrcElem element from pipeline: %w", err) 73 } 74 audioSrc := app.SrcFromElement(audioSrcElem) 75 76 // Set the remote SessionDescription 77 if err = peerConnection.SetRemoteDescription(*offer); err != nil { 78 return nil, fmt.Errorf("failed to set remote description: %w", err) 79 } 80 81 // Create answer 82 answer, err := peerConnection.CreateAnswer(nil) 83 if err != nil { 84 return nil, fmt.Errorf("failed to create answer: %w", err) 85 } 86 87 // Sets the LocalDescription, and starts our UDP listeners 88 if err = peerConnection.SetLocalDescription(answer); err != nil { 89 return nil, fmt.Errorf("failed to set local description: %w", err) 90 } 91 92 // Create channel that is blocked until ICE Gathering is complete 93 gatherComplete := rtcrec.GatheringCompletePromise(peerConnection) 94 95 ctx, cancel := context.WithCancel(ctx) 96 signerElem, err := mm.SegmentAndSignElem(ctx, signer) 97 if err != nil { 98 cancel() 99 return nil, fmt.Errorf("failed create signer element: %w", err) 100 } 101 err = pipeline.Add(signerElem) 102 if err != nil { 103 cancel() 104 return nil, fmt.Errorf("failed to add signer element to pipeline: %w", err) 105 } 106 signerElemPads, err := signerElem.GetPads() 107 if err != nil { 108 cancel() 109 return nil, fmt.Errorf("failed to get signerElemPads from signer element: %w", err) 110 } 111 if len(signerElemPads) != 2 { 112 cancel() 113 return nil, fmt.Errorf("failed to get signerElemPads from signer element") 114 } 115 signerElemVideoPad := signerElemPads[0] 116 signerElemAudioPad := signerElemPads[1] 117 linked := videoSrcPad.Link(signerElemVideoPad) 118 if linked != gst.PadLinkOK { 119 cancel() 120 return nil, fmt.Errorf("failed to link videoSrcPad to signerElemVideoPad") 121 } 122 linked = audioSrcPad.Link(signerElemAudioPad) 123 if linked != gst.PadLinkOK { 124 cancel() 125 return nil, fmt.Errorf("failed to link audioSrcPad to signerElemAudioPad") 126 } 127 128 // Setup complete! Now we boot up streaming in the background while returning the SDP offer to the user. 129 go func() { 130 defer cancel() 131 defer func() { close(done) }() 132 133 go func() { 134 ticker := time.NewTicker(time.Second * 1) 135 for { 136 select { 137 case <-ctx.Done(): 138 return 139 case <-ticker.C: 140 state := pipeline.GetCurrentState() 141 log.Debug(ctx, "pipeline state", "state", state) 142 } 143 } 144 }() 145 146 go func() { 147 if err := HandleBusMessages(ctx, pipeline); err != nil { 148 log.Log(ctx, "pipeline error", "error", err) 149 } 150 cancel() 151 }() 152 153 // subscription to bus messages for key revocation 154 go mm.HandleKeyRevocation(ctx, signer, pipeline) 155 156 go func() { 157 <-ctx.Done() 158 if cErr := peerConnection.Close(); cErr != nil { 159 log.Log(ctx, "cannot close peerConnection: %v\n", cErr) 160 } 161 }() 162 163 log.Debug(ctx, "starting pipeline") 164 165 // Start the pipeline 166 err = pipeline.SetState(gst.StatePlaying) 167 if err != nil { 168 log.Log(ctx, "failed to set pipeline state", "error", err) 169 cancel() 170 } 171 172 // Set the handler for ICE connection state 173 // This will notify you when the peer has connected/disconnected 174 peerConnection.OnICEConnectionStateChange(func(connectionState webrtc.ICEConnectionState) { 175 log.Log(ctx, "Connection State has changed", "state", connectionState.String()) 176 }) 177 178 // Set the handler for Peer connection state 179 // This will notify you when the peer has connected/disconnected 180 peerConnection.OnConnectionStateChange(func(s webrtc.PeerConnectionState) { 181 log.Log(ctx, "Peer Connection State has changed", "state", s.String()) 182 183 if s == webrtc.PeerConnectionStateFailed || s == webrtc.PeerConnectionStateDisconnected { 184 // Wait until PeerConnection has had no network activity for 30 seconds or another failure. It may be reconnected using an ICE Restart. 185 // Use webrtc.PeerConnectionStateDisconnected if you are interested in detecting faster timeout. 186 // Note that the PeerConnection may come back from PeerConnectionStateDisconnected. 187 log.Log(ctx, "Peer Connection has ended, exiting", "state", s.String()) 188 cancel() 189 } 190 }) 191 192 videoFirst := false 193 audioFirst := false 194 195 peerConnection.OnTrack(func(track rtcrec.TrackRemote, _ rtcrec.RTPReceiver) { 196 log.Warn(ctx, "OnTrack", "kind", track.Kind()) 197 if track.Kind() == webrtc.RTPCodecTypeVideo { 198 // Send a PLI on an interval so that the publisher is pushing a keyframe every rtcpPLIInterval 199 go func() { 200 ticker := time.NewTicker(time.Second * 1) 201 for { 202 select { 203 case <-ctx.Done(): 204 return 205 case <-ticker.C: 206 rtcpSendErr := peerConnection.WriteRTCP([]rtcp.Packet{&rtcp.PictureLossIndication{MediaSSRC: uint32(track.SSRC())}}) 207 if rtcpSendErr != nil { 208 log.Log(ctx, "failed to send rtcp packet", "error", rtcpSendErr) 209 cancel() 210 return 211 } 212 } 213 } 214 }() 215 216 codecName := strings.Split(track.Codec().MimeType, "/")[1] 217 log.Log(ctx, "Track has started", "payloadType", track.PayloadType(), "codecName", codecName) 218 219 // appSrc := pipelineForCodec(track, codecName) 220 buf := make([]byte, 1400) 221 for { 222 i, _, readErr := track.Read(buf) 223 if readErr != nil { 224 log.Log(ctx, "failed to read track", "error", readErr) 225 cancel() 226 return 227 } 228 if ctx.Err() != nil { 229 return 230 } 231 if !videoFirst { 232 videoFirst = true 233 log.Debug(ctx, "got video data", "len", len(buf[:i])) 234 } 235 236 gbuf := gst.NewBufferWithSize(int64(len(buf[:i]))) 237 gbuf.Map(gst.MapWrite).WriteData(buf[:i]) 238 gbuf.Unmap() 239 240 ret := videoSrc.PushBuffer(gbuf) 241 if ret != gst.FlowOK { 242 log.Log(ctx, "failed to push buffer", "error", ret) 243 cancel() 244 return 245 } 246 // state := pipeline.GetCurrentState() 247 // if state != gst.StatePlaying { 248 // log.Warn(ctx, "pipeline state is not playing, consider running with GST_DEBUG=*:5 to find out why", "state", state) 249 // cancel() 250 // return 251 // } 252 } 253 } 254 if track.Kind() == webrtc.RTPCodecTypeAudio { 255 256 codecName := strings.Split(track.Codec().MimeType, "/")[1] 257 log.Log(ctx, "Track has started", "payloadType", track.PayloadType(), "codecName", codecName) 258 259 buf := make([]byte, 1400) 260 for { 261 i, _, readErr := track.Read(buf) 262 if readErr != nil { 263 log.Log(ctx, "failed to read track", "error", readErr) 264 cancel() 265 return 266 } 267 if ctx.Err() != nil { 268 return 269 } 270 if !audioFirst { 271 audioFirst = true 272 log.Debug(ctx, "got audio data", "len", len(buf[:i])) 273 } 274 275 gbuf := gst.NewBufferWithSize(int64(len(buf[:i]))) 276 gbuf.Map(gst.MapWrite).WriteData(buf[:i]) 277 gbuf.Unmap() 278 ret := audioSrc.PushBuffer(gbuf) 279 if ret != gst.FlowOK { 280 log.Log(ctx, "failed to push buffer", "error", ret) 281 cancel() 282 return 283 } 284 // state := pipeline.GetCurrentState() 285 // if state != gst.StatePlaying { 286 // log.Warn(ctx, "pipeline state is not playing, consider running with GST_DEBUG=*:5 to find out why", "state", state) 287 // cancel() 288 // return 289 // } 290 } 291 } 292 }) 293 294 <-ctx.Done() 295 296 if err := pipeline.BlockSetState(gst.StateNull); err != nil { 297 log.Log(ctx, "failed to set pipeline state to null", "error", err) 298 } 299 300 if err := audioSrcElem.SetState(gst.StateNull); err != nil { 301 log.Log(ctx, "failed to set audioSrcElem state to null", "error", err) 302 } 303 304 if err := videoSrcElem.SetState(gst.StateNull); err != nil { 305 log.Log(ctx, "failed to set videoSrcElem state to null", "error", err) 306 } 307 308 log.Log(ctx, "webrtc ingest pipeline done") 309 310 }() 311 select { 312 case <-gatherComplete: 313 return peerConnection.LocalDescription(), nil 314 case <-ctx.Done(): 315 return nil, ctx.Err() 316 } 317}