Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

at eli/too-many-keyframes 241 lines 5.9 kB view raw
1package rtcrec 2 3import ( 4 "context" 5 "fmt" 6 "os" 7 "time" 8 9 "github.com/pion/rtcp" 10 "github.com/pion/webrtc/v4" 11 "stream.place/streamplace/pkg/aqtime" 12 "stream.place/streamplace/pkg/config" 13 "stream.place/streamplace/pkg/log" 14) 15 16type RecordingPeerConnection struct { 17 enabled bool 18 pionpc *webrtc.PeerConnection 19 file *os.File 20 stream *RecorderStream 21} 22 23func NewRecordingPeerConnection(ctx context.Context, cli config.CLI, user string, pionpc *webrtc.PeerConnection, enabled bool) (PeerConnection, error) { 24 if !enabled { 25 return &RecordingPeerConnection{ 26 pionpc: pionpc, 27 enabled: enabled, 28 }, nil 29 } 30 aqt := aqtime.FromTime(time.Now()) 31 f, err := cli.DataFileCreate([]string{"debug-recordings", user, fmt.Sprintf("%s.rtcrec.cbor", aqt.FileSafeString())}, true) 32 if err != nil { 33 return nil, fmt.Errorf("failed to create data file: %w", err) 34 } 35 log.Log(ctx, "logging webrtc session to file", "file", f.Name()) 36 stream, err := MakeWebRTCEncoder(f) 37 if err != nil { 38 return nil, fmt.Errorf("failed to create recorder stream: %w", err) 39 } 40 return &RecordingPeerConnection{ 41 pionpc: pionpc, 42 file: f, 43 stream: stream, 44 enabled: enabled, 45 }, nil 46} 47 48func (pc *RecordingPeerConnection) Do(f func()) { 49 if pc.enabled { 50 go f() 51 } 52} 53 54func (pc *RecordingPeerConnection) Close() error { 55 pc.Do(func() { 56 // This is sloppy but there might be other goroutines still writing so let's chill for a sec 57 time.Sleep(10 * time.Second) 58 pc.file.Close() 59 }) 60 return pc.pionpc.Close() 61} 62 63func (pc *RecordingPeerConnection) CreateAnswer(options *webrtc.AnswerOptions) (webrtc.SessionDescription, error) { 64 now := time.Now() 65 ret, err := pc.pionpc.CreateAnswer(options) 66 if err != nil { 67 return ret, err 68 } 69 pc.Do(func() { 70 pc.stream.Event(WebRTCEvent{ 71 CreateAnswer: &CreateAnswer{ 72 SDPAnswer: ret.SDP, 73 }, 74 Time: now, 75 }) 76 }) 77 return ret, nil 78} 79 80func (pc *RecordingPeerConnection) SetLocalDescription(desc webrtc.SessionDescription) error { 81 now := time.Now() 82 pc.Do(func() { 83 pc.stream.Event(WebRTCEvent{ 84 SetRemoteDescription: &SetRemoteDescription{ 85 SDPRemoteDescription: desc.SDP, 86 }, 87 Time: now, 88 }) 89 }) 90 return pc.pionpc.SetLocalDescription(desc) 91} 92 93func (pc *RecordingPeerConnection) SetRemoteDescription(desc webrtc.SessionDescription) error { 94 now := time.Now() 95 pc.Do(func() { 96 pc.stream.Event(WebRTCEvent{ 97 SetRemoteDescription: &SetRemoteDescription{ 98 SDPRemoteDescription: desc.SDP, 99 }, 100 Time: now, 101 }) 102 }) 103 return pc.pionpc.SetRemoteDescription(desc) 104} 105 106func (pc *RecordingPeerConnection) LocalDescription() *webrtc.SessionDescription { 107 now := time.Now() 108 desc := pc.pionpc.LocalDescription() 109 pc.Do(func() { 110 pc.stream.Event(WebRTCEvent{ 111 LocalDescription: &LocalDescription{ 112 SDPLocalDescription: pc.pionpc.LocalDescription().SDP, 113 }, 114 Time: now, 115 }) 116 }) 117 return desc 118} 119 120// func (pc *RecorderPeerConnection) RemoteDescription() *webrtc.SessionDescription { 121// return pc.pionpc.RemoteDescription() 122// } 123 124func (pc *RecordingPeerConnection) OnICEConnectionStateChange(f func(webrtc.ICEConnectionState)) { 125 pc.pionpc.OnICEConnectionStateChange(func(state webrtc.ICEConnectionState) { 126 now := time.Now() 127 pc.Do(func() { 128 pc.stream.Event(WebRTCEvent{ 129 ICEConnectionStateChange: &ICEConnectionStateChange{ 130 ICEConnectionState: state, 131 }, 132 Time: now, 133 }) 134 }) 135 f(state) 136 }) 137} 138 139func (pc *RecordingPeerConnection) OnConnectionStateChange(f func(webrtc.PeerConnectionState)) { 140 pc.pionpc.OnConnectionStateChange(func(state webrtc.PeerConnectionState) { 141 now := time.Now() 142 pc.Do(func() { 143 pc.stream.Event(WebRTCEvent{ 144 ConnectionStateChange: &ConnectionStateChange{ 145 ConnectionState: state, 146 }, 147 Time: now, 148 }) 149 }) 150 f(state) 151 }) 152} 153 154func (pc *RecordingPeerConnection) OnTrack(f func(TrackRemote, RTPReceiver)) { 155 pc.pionpc.OnTrack(func(track *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) { 156 now := time.Now() 157 wrappedTrack := &RecordingTrackRemote{track: track, stream: pc.stream, pc: pc} 158 id := track.ID() 159 kind := track.Kind() 160 ssrc := track.SSRC() 161 payloadType := track.PayloadType() 162 streamID := track.StreamID() 163 msid := track.Msid() 164 rid := track.RID() 165 pc.Do(func() { 166 pc.stream.Event(WebRTCEvent{ 167 Track: &Track{ 168 ID: id, 169 Kind: kind, 170 SSRC: ssrc, 171 PayloadType: payloadType, 172 StreamID: streamID, 173 Msid: msid, 174 RID: rid, 175 }, 176 Time: now, 177 }) 178 }) 179 f(wrappedTrack, receiver) 180 }) 181} 182 183func (pc *RecordingPeerConnection) WriteRTCP(pkts []rtcp.Packet) error { 184 return pc.pionpc.WriteRTCP(pkts) 185} 186 187func (pc *RecordingPeerConnection) AddTransceiverFromKind(kind webrtc.RTPCodecType, init ...webrtc.RTPTransceiverInit) (RTPTransceiver, error) { 188 now := time.Now() 189 ret, err := pc.pionpc.AddTransceiverFromKind(kind, init...) 190 pc.Do(func() { 191 pc.stream.Event(WebRTCEvent{ 192 AddTransceiverFromKind: &AddTransceiverFromKind{ 193 Kind: kind, 194 }, 195 Time: now, 196 }) 197 }) 198 return ret, err 199} 200 201func (pc *RecordingPeerConnection) ICEGatheringState() webrtc.ICEGatheringState { 202 now := time.Now() 203 state := pc.pionpc.ICEGatheringState() 204 pc.Do(func() { 205 pc.stream.Event(WebRTCEvent{ 206 ICEGatheringState: &ICEGatheringState{ 207 State: state, 208 }, 209 Time: now, 210 }) 211 }) 212 return state 213} 214 215func (pc *RecordingPeerConnection) OnDataChannel(f func(*webrtc.DataChannel)) { 216 pc.pionpc.OnDataChannel(func(dc *webrtc.DataChannel) { 217 now := time.Now() 218 pc.Do(func() { 219 pc.stream.Event(WebRTCEvent{ 220 DataChannel: &DataChannel{ 221 Label: dc.Label(), 222 }, 223 Time: now, 224 }) 225 }) 226 f(dc) 227 }) 228} 229 230func (pc *RecordingPeerConnection) OnNegotiationNeeded(f func()) { 231 pc.pionpc.OnNegotiationNeeded(func() { 232 now := time.Now() 233 pc.Do(func() { 234 pc.stream.Event(WebRTCEvent{ 235 NegotiationNeeded: &NegotiationNeeded{}, 236 Time: now, 237 }) 238 }) 239 f() 240 }) 241}