Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

tests: working RTCRec tests

+38 -53
+3 -5
pkg/media/rtcrec_test.go
··· 2 2 3 3 import ( 4 4 "context" 5 - "errors" 6 5 "os" 7 6 "testing" 8 7 ··· 26 25 name: "IntermittentTracks", 27 26 fatalErrors: false, 28 27 fixture: getFixture("intermittent-tracks.cbor"), 29 - expectedSegments: 2, 28 + expectedSegments: 1, 30 29 }, 31 30 { 32 31 name: "SegmentConvergenceIssues", ··· 90 89 require.NoError(t, err) 91 90 // fmt.Println(answer.SDP) 92 91 pipelineError := <-done 93 - if err != nil && !errors.Is(err, ErrPeerConnectionClosed) { 94 - require.NoError(t, pipelineError) 95 - } 92 + require.ErrorIs(t, pipelineError, context.Canceled) 93 + 96 94 // the segment getting ingested is ever so slightly after the done, which doesn't matter except in tests, just do a backoff for checking 97 95 require.Equal(t, testCase.expectedSegments, segCount) 98 96 ticker := backoff.NewTicker(backoff.NewExponentialBackOff())
+35 -48
pkg/media/webrtc_ingest.go
··· 2 2 3 3 import ( 4 4 "context" 5 - "errors" 6 5 "fmt" 7 6 "strings" 8 7 "time" ··· 15 14 "stream.place/streamplace/pkg/log" 16 15 "stream.place/streamplace/pkg/rtcrec" 17 16 ) 18 - 19 - var ErrPeerConnectionClosed = errors.New("peer connection closed") 20 17 21 18 // This function remains in scope for the duration of a single users' playback 22 19 func (mm *MediaManager) WebRTCIngest(ctx context.Context, offer *webrtc.SessionDescription, signer MediaSigner, peerConnection rtcrec.PeerConnection, done chan error) (*webrtc.SessionDescription, error) { ··· 130 127 131 128 // Setup complete! Now we boot up streaming in the background while returning the SDP offer to the user. 132 129 go func() { 133 - pipelineErrorChan := make(chan error) 130 + busErrorChan := make(chan error) 131 + go func() { 132 + err := HandleBusMessages(ctx, pipeline) 133 + if err != nil { 134 + log.Log(ctx, "pipeline error", "error", err) 135 + } 136 + busErrorChan <- err 137 + cancel() 138 + }() 139 + 134 140 defer cancel() 141 + defer func() { done <- <-busErrorChan }() 135 142 136 143 go func() { 137 144 ticker := time.NewTicker(time.Second * 1) ··· 144 151 log.Debug(ctx, "pipeline state", "state", state) 145 152 } 146 153 } 147 - }() 148 - 149 - go func() { 150 - pipelineErrorChan <- HandleBusMessages(ctx, pipeline) 151 154 }() 152 155 153 156 // subscription to bus messages for key revocation ··· 166 169 err = pipeline.SetState(gst.StatePlaying) 167 170 if err != nil { 168 171 log.Log(ctx, "failed to set pipeline state", "error", err) 169 - return 172 + cancel() 170 173 } 171 174 172 175 // Set the handler for ICE connection state ··· 180 183 peerConnection.OnConnectionStateChange(func(s webrtc.PeerConnectionState) { 181 184 log.Log(ctx, "Peer Connection State has changed", "state", s.String()) 182 185 183 - if s == webrtc.PeerConnectionStateFailed || s == webrtc.PeerConnectionStateDisconnected || s == webrtc.PeerConnectionStateClosed { 186 + if s == webrtc.PeerConnectionStateFailed || s == webrtc.PeerConnectionStateDisconnected { 184 187 // Wait until PeerConnection has had no network activity for 30 seconds or another failure. It may be reconnected using an ICE Restart. 185 188 // Use webrtc.PeerConnectionStateDisconnected if you are interested in detecting faster timeout. 186 189 // Note that the PeerConnection may come back from PeerConnectionStateDisconnected. 187 190 log.Log(ctx, "Peer Connection has ended, exiting", "state", s.String()) 188 - pipeline.Error(ErrPeerConnectionClosed.Error(), ErrPeerConnectionClosed) 191 + cancel() 189 192 } 190 193 }) 191 194 ··· 206 209 rtcpSendErr := peerConnection.WriteRTCP([]rtcp.Packet{&rtcp.PictureLossIndication{MediaSSRC: uint32(track.SSRC())}}) 207 210 if rtcpSendErr != nil { 208 211 log.Log(ctx, "failed to send rtcp packet", "error", rtcpSendErr) 209 - pipeline.Error(fmt.Sprintf("failed to send rtcp packet: %s", rtcpSendErr.Error()), nil) 212 + cancel() 210 213 return 211 214 } 212 215 } ··· 220 223 buf := make([]byte, 1400) 221 224 for { 222 225 i, _, readErr := track.Read(buf) 223 - if ctx.Err() != nil { 226 + if readErr != nil { 227 + log.Log(ctx, "failed to read track", "error", readErr) 228 + cancel() 224 229 return 225 230 } 226 - if readErr != nil { 227 - log.Log(ctx, "failed to read track", "error", readErr) 228 - err := fmt.Errorf("failed to read track: %s", readErr.Error()) 229 - pipeline.Error(err.Error(), err) 231 + if ctx.Err() != nil { 230 232 return 231 233 } 232 234 if !videoFirst { ··· 241 243 ret := videoSrc.PushBuffer(gbuf) 242 244 if ret != gst.FlowOK { 243 245 log.Log(ctx, "failed to push buffer", "error", ret) 244 - err := fmt.Errorf("failed to push buffer: %s", ret.String()) 245 - pipeline.Error(err.Error(), err) 246 + cancel() 246 247 return 247 248 } 248 249 // state := pipeline.GetCurrentState() ··· 261 262 buf := make([]byte, 1400) 262 263 for { 263 264 i, _, readErr := track.Read(buf) 264 - if ctx.Err() != nil { 265 + if readErr != nil { 266 + log.Log(ctx, "failed to read track", "error", readErr) 267 + cancel() 265 268 return 266 269 } 267 - if readErr != nil { 268 - log.Log(ctx, "failed to read track", "error", readErr) 269 - err := fmt.Errorf("failed to read track: %s", readErr.Error()) 270 - pipeline.Error(err.Error(), err) 270 + if ctx.Err() != nil { 271 271 return 272 272 } 273 273 if !audioFirst { ··· 281 281 ret := audioSrc.PushBuffer(gbuf) 282 282 if ret != gst.FlowOK { 283 283 log.Log(ctx, "failed to push buffer", "error", ret) 284 - err := fmt.Errorf("failed to push buffer: %s", ret.String()) 285 - pipeline.Error(err.Error(), err) 284 + cancel() 286 285 return 287 286 } 288 287 // state := pipeline.GetCurrentState() ··· 295 294 } 296 295 }) 297 296 298 - defer func() { 299 - err = pipeline.Remove(signerElem) 300 - if err != nil { 301 - log.Log(ctx, "failed to remove signer element from pipeline", "error", err) 302 - } 303 - if err := pipeline.BlockSetState(gst.StateNull); err != nil { 304 - log.Log(ctx, "failed to set pipeline state to null", "error", err) 305 - } 297 + <-ctx.Done() 306 298 307 - if err := audioSrcElem.SetState(gst.StateNull); err != nil { 308 - log.Log(ctx, "failed to set audioSrcElem state to null", "error", err) 309 - } 299 + if err := pipeline.BlockSetState(gst.StateNull); err != nil { 300 + log.Log(ctx, "failed to set pipeline state to null", "error", err) 301 + } 310 302 311 - if err := videoSrcElem.SetState(gst.StateNull); err != nil { 312 - log.Log(ctx, "failed to set videoSrcElem state to null", "error", err) 313 - } 314 - err = signerElem.BlockSetState(gst.StateNull) 315 - if err != nil { 316 - log.Log(ctx, "failed to set signer element state to null", "error", err) 317 - } 318 - signerElem = nil 319 - }() 303 + if err := audioSrcElem.SetState(gst.StateNull); err != nil { 304 + log.Log(ctx, "failed to set audioSrcElem state to null", "error", err) 305 + } 320 306 321 - pipelineErr := <-pipelineErrorChan 307 + if err := videoSrcElem.SetState(gst.StateNull); err != nil { 308 + log.Log(ctx, "failed to set videoSrcElem state to null", "error", err) 309 + } 322 310 323 311 log.Log(ctx, "webrtc ingest pipeline done") 324 312 325 - done <- pipelineErr 326 313 }() 327 314 select { 328 315 case <-gatherComplete: