Live video on the AT Protocol
1package rtcrec
2
3import (
4 "context"
5 "fmt"
6 "os"
7 "time"
8
9 "github.com/pion/rtcp"
10 "github.com/pion/webrtc/v4"
11 "stream.place/streamplace/pkg/aqtime"
12 "stream.place/streamplace/pkg/config"
13 "stream.place/streamplace/pkg/log"
14)
15
16type RecordingPeerConnection struct {
17 enabled bool
18 pionpc *webrtc.PeerConnection
19 file *os.File
20 stream *RecorderStream
21}
22
23func NewRecordingPeerConnection(ctx context.Context, cli config.CLI, user string, pionpc *webrtc.PeerConnection, enabled bool) (PeerConnection, error) {
24 if !enabled {
25 return &RecordingPeerConnection{
26 pionpc: pionpc,
27 enabled: enabled,
28 }, nil
29 }
30 aqt := aqtime.FromTime(time.Now())
31 f, err := cli.DataFileCreate([]string{"debug-recordings", user, fmt.Sprintf("%s.rtcrec.cbor", aqt.FileSafeString())}, true)
32 if err != nil {
33 return nil, fmt.Errorf("failed to create data file: %w", err)
34 }
35 log.Log(ctx, "logging webrtc session to file", "file", f.Name())
36 stream, err := MakeWebRTCEncoder(f)
37 if err != nil {
38 return nil, fmt.Errorf("failed to create recorder stream: %w", err)
39 }
40 return &RecordingPeerConnection{
41 pionpc: pionpc,
42 file: f,
43 stream: stream,
44 enabled: enabled,
45 }, nil
46}
47
48func (pc *RecordingPeerConnection) Do(f func()) {
49 if pc.enabled {
50 go f()
51 }
52}
53
54func (pc *RecordingPeerConnection) Close() error {
55 pc.Do(func() {
56 // This is sloppy but there might be other goroutines still writing so let's chill for a sec
57 time.Sleep(10 * time.Second)
58 pc.file.Close()
59 })
60 return pc.pionpc.Close()
61}
62
63func (pc *RecordingPeerConnection) CreateAnswer(options *webrtc.AnswerOptions) (webrtc.SessionDescription, error) {
64 now := time.Now()
65 ret, err := pc.pionpc.CreateAnswer(options)
66 if err != nil {
67 return ret, err
68 }
69 pc.Do(func() {
70 pc.stream.Event(WebRTCEvent{
71 CreateAnswer: &CreateAnswer{
72 SDPAnswer: ret.SDP,
73 },
74 Time: now,
75 })
76 })
77 return ret, nil
78}
79
80func (pc *RecordingPeerConnection) SetLocalDescription(desc webrtc.SessionDescription) error {
81 now := time.Now()
82 pc.Do(func() {
83 pc.stream.Event(WebRTCEvent{
84 SetRemoteDescription: &SetRemoteDescription{
85 SDPRemoteDescription: desc.SDP,
86 },
87 Time: now,
88 })
89 })
90 return pc.pionpc.SetLocalDescription(desc)
91}
92
93func (pc *RecordingPeerConnection) SetRemoteDescription(desc webrtc.SessionDescription) error {
94 now := time.Now()
95 pc.Do(func() {
96 pc.stream.Event(WebRTCEvent{
97 SetRemoteDescription: &SetRemoteDescription{
98 SDPRemoteDescription: desc.SDP,
99 },
100 Time: now,
101 })
102 })
103 return pc.pionpc.SetRemoteDescription(desc)
104}
105
106func (pc *RecordingPeerConnection) LocalDescription() *webrtc.SessionDescription {
107 now := time.Now()
108 desc := pc.pionpc.LocalDescription()
109 pc.Do(func() {
110 pc.stream.Event(WebRTCEvent{
111 LocalDescription: &LocalDescription{
112 SDPLocalDescription: pc.pionpc.LocalDescription().SDP,
113 },
114 Time: now,
115 })
116 })
117 return desc
118}
119
120// func (pc *RecorderPeerConnection) RemoteDescription() *webrtc.SessionDescription {
121// return pc.pionpc.RemoteDescription()
122// }
123
124func (pc *RecordingPeerConnection) OnICEConnectionStateChange(f func(webrtc.ICEConnectionState)) {
125 pc.pionpc.OnICEConnectionStateChange(func(state webrtc.ICEConnectionState) {
126 now := time.Now()
127 pc.Do(func() {
128 pc.stream.Event(WebRTCEvent{
129 ICEConnectionStateChange: &ICEConnectionStateChange{
130 ICEConnectionState: state,
131 },
132 Time: now,
133 })
134 })
135 f(state)
136 })
137}
138
139func (pc *RecordingPeerConnection) OnConnectionStateChange(f func(webrtc.PeerConnectionState)) {
140 pc.pionpc.OnConnectionStateChange(func(state webrtc.PeerConnectionState) {
141 now := time.Now()
142 pc.Do(func() {
143 pc.stream.Event(WebRTCEvent{
144 ConnectionStateChange: &ConnectionStateChange{
145 ConnectionState: state,
146 },
147 Time: now,
148 })
149 })
150 f(state)
151 })
152}
153
154func (pc *RecordingPeerConnection) OnTrack(f func(TrackRemote, RTPReceiver)) {
155 pc.pionpc.OnTrack(func(track *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) {
156 now := time.Now()
157 wrappedTrack := &RecordingTrackRemote{track: track, stream: pc.stream, pc: pc}
158 id := track.ID()
159 kind := track.Kind()
160 ssrc := track.SSRC()
161 payloadType := track.PayloadType()
162 streamID := track.StreamID()
163 msid := track.Msid()
164 rid := track.RID()
165 pc.Do(func() {
166 pc.stream.Event(WebRTCEvent{
167 Track: &Track{
168 ID: id,
169 Kind: kind,
170 SSRC: ssrc,
171 PayloadType: payloadType,
172 StreamID: streamID,
173 Msid: msid,
174 RID: rid,
175 },
176 Time: now,
177 })
178 })
179 f(wrappedTrack, receiver)
180 })
181}
182
183func (pc *RecordingPeerConnection) WriteRTCP(pkts []rtcp.Packet) error {
184 return pc.pionpc.WriteRTCP(pkts)
185}
186
187func (pc *RecordingPeerConnection) AddTransceiverFromKind(kind webrtc.RTPCodecType, init ...webrtc.RTPTransceiverInit) (RTPTransceiver, error) {
188 now := time.Now()
189 ret, err := pc.pionpc.AddTransceiverFromKind(kind, init...)
190 pc.Do(func() {
191 pc.stream.Event(WebRTCEvent{
192 AddTransceiverFromKind: &AddTransceiverFromKind{
193 Kind: kind,
194 },
195 Time: now,
196 })
197 })
198 return ret, err
199}
200
201func (pc *RecordingPeerConnection) ICEGatheringState() webrtc.ICEGatheringState {
202 now := time.Now()
203 state := pc.pionpc.ICEGatheringState()
204 pc.Do(func() {
205 pc.stream.Event(WebRTCEvent{
206 ICEGatheringState: &ICEGatheringState{
207 State: state,
208 },
209 Time: now,
210 })
211 })
212 return state
213}
214
215func (pc *RecordingPeerConnection) OnDataChannel(f func(*webrtc.DataChannel)) {
216 pc.pionpc.OnDataChannel(func(dc *webrtc.DataChannel) {
217 now := time.Now()
218 pc.Do(func() {
219 pc.stream.Event(WebRTCEvent{
220 DataChannel: &DataChannel{
221 Label: dc.Label(),
222 },
223 Time: now,
224 })
225 })
226 f(dc)
227 })
228}
229
230func (pc *RecordingPeerConnection) OnNegotiationNeeded(f func()) {
231 pc.pionpc.OnNegotiationNeeded(func() {
232 now := time.Now()
233 pc.Do(func() {
234 pc.stream.Event(WebRTCEvent{
235 NegotiationNeeded: &NegotiationNeeded{},
236 Time: now,
237 })
238 })
239 f()
240 })
241}