Live video on the AT Protocol
1package rtcrec
2
3import (
4 "fmt"
5 "io"
6 "sort"
7 "sync"
8 "time"
9
10 "github.com/fxamacker/cbor/v2"
11 "github.com/pion/webrtc/v4"
12)
13
14type WebRTCEventDecoder struct {
15 dec *cbor.Decoder
16}
17
18func Opts() (cbor.EncMode, error) {
19 opts := cbor.CoreDetEncOptions()
20 opts.Time = cbor.TimeRFC3339Nano
21 em, err := opts.EncMode()
22 if err != nil {
23 return nil, fmt.Errorf("failed to create encoder mode: %w", err)
24 }
25 return em, nil
26}
27
28func MakeWebRTCDecoder(r io.Reader) (*WebRTCEventDecoder, error) {
29 dec := cbor.NewDecoder(r)
30 return &WebRTCEventDecoder{dec: dec}, nil
31}
32
33func (d *WebRTCEventDecoder) Next() (*WebRTCEvent, error) {
34 var ev WebRTCEvent
35 err := d.dec.Decode(&ev)
36 if err != nil {
37 return nil, err
38 }
39 return &ev, err
40}
41
42type WebRTCEventGroup struct {
43 Events map[string][]*WebRTCEvent
44 Tracks map[webrtc.SSRC]map[string][]*WebRTCEvent
45 FirstTime time.Time
46 EventMutex sync.Mutex
47}
48
// Event type names. ReadAllEvents uses these strings as the keys of
// WebRTCEventGroup.Events and of the per-SSRC maps in WebRTCEventGroup.Tracks.
const (
	// Events that ReadAllEvents files under Events only.
	EventTypeOffer                  = "Offer"
	EventTypeCreateAnswer           = "CreateAnswer"
	EventTypeSetRemoteDescription   = "SetRemoteDescription"
	EventTypeSetLocalDescription    = "SetLocalDescription"
	EventTypeLocalDescription       = "LocalDescription"
	EventTypeICEConnectionState     = "ICEConnectionStateChange"
	EventTypeConnectionState        = "ConnectionStateChange"
	EventTypeTrack                  = "Track"
	EventTypeAddTransceiverFromKind = "AddTransceiverFromKind"
	EventTypeICEGatheringState      = "ICEGatheringState"
	EventTypeDataChannel            = "DataChannel"
	EventTypeNegotiationNeeded      = "NegotiationNeeded"

	// Per-track events; ReadAllEvents additionally files these under
	// Tracks, keyed by the SSRC carried in the event.
	EventTypeTrackRead        = "TrackRead"
	EventTypeTrackCodec       = "TrackCodec"
	EventTypeTrackKind        = "TrackKind"
	EventTypeTrackPayloadType = "TrackPayloadType"
	EventTypeTrackSSRC        = "TrackSSRC"

	// EventTypeUnknown names an event with none of the known payloads set.
	EventTypeUnknown = "Unknown"
)
69
70// ReadAllEvents reads all WebRTC events from a CBOR reader and organizes them by type.
71// Returns a map where keys are event types and values are slices of events of that type.
72func ReadAllEvents(r io.Reader) (*WebRTCEventGroup, error) {
73 dec, err := MakeWebRTCDecoder(r)
74 if err != nil {
75 return nil, err
76 }
77
78 eventList := []*WebRTCEvent{}
79 events := make(map[string][]*WebRTCEvent)
80 tracks := make(map[webrtc.SSRC]map[string][]*WebRTCEvent)
81 var firstTime time.Time
82 for {
83 ev, err := dec.Next()
84 if err == io.EOF {
85 break
86 }
87 if err != nil {
88 return nil, err
89 }
90 eventList = append(eventList, ev)
91 }
92
93 sort.Slice(eventList, func(i, j int) bool {
94 return eventList[i].Time.Before(eventList[j].Time)
95 })
96
97 for _, ev := range eventList {
98 if firstTime.IsZero() {
99 firstTime = ev.Time
100 }
101
102 // Determine the event type based on which field is non-nil
103 var eventType string
104 var trackSSRC *webrtc.SSRC
105 switch {
106 case ev.Offer != nil:
107 eventType = EventTypeOffer
108 case ev.CreateAnswer != nil:
109 eventType = EventTypeCreateAnswer
110 case ev.SetRemoteDescription != nil:
111 eventType = EventTypeSetRemoteDescription
112 case ev.SetLocalDescription != nil:
113 eventType = EventTypeSetLocalDescription
114 case ev.LocalDescription != nil:
115 eventType = EventTypeLocalDescription
116 case ev.ICEConnectionStateChange != nil:
117 eventType = EventTypeICEConnectionState
118 case ev.ConnectionStateChange != nil:
119 eventType = EventTypeConnectionState
120 case ev.Track != nil:
121 eventType = EventTypeTrack
122 case ev.AddTransceiverFromKind != nil:
123 eventType = EventTypeAddTransceiverFromKind
124 case ev.ICEGatheringState != nil:
125 eventType = EventTypeICEGatheringState
126 case ev.DataChannel != nil:
127 eventType = EventTypeDataChannel
128 case ev.NegotiationNeeded != nil:
129 eventType = EventTypeNegotiationNeeded
130
131 case ev.TrackRead != nil:
132 trackSSRC = &ev.TrackRead.SSRC
133 eventType = EventTypeTrackRead
134 case ev.TrackCodec != nil:
135 trackSSRC = &ev.TrackCodec.SSRC
136 eventType = EventTypeTrackCodec
137 case ev.TrackKind != nil:
138 trackSSRC = &ev.TrackKind.SSRC
139 eventType = EventTypeTrackKind
140 case ev.TrackPayloadType != nil:
141 trackSSRC = &ev.TrackPayloadType.SSRC
142 eventType = EventTypeTrackPayloadType
143 case ev.TrackSSRC != nil:
144 trackSSRC = &ev.TrackSSRC.SSRC
145 eventType = EventTypeTrackSSRC
146 default:
147 eventType = EventTypeUnknown
148 panic(fmt.Sprintf("unknown event type: %+v", ev))
149 }
150
151 if trackSSRC != nil {
152 if tracks[*trackSSRC] == nil {
153 tracks[*trackSSRC] = make(map[string][]*WebRTCEvent)
154 }
155 if tracks[*trackSSRC][eventType] == nil {
156 tracks[*trackSSRC][eventType] = []*WebRTCEvent{}
157 }
158 tracks[*trackSSRC][eventType] = append(tracks[*trackSSRC][eventType], ev)
159 }
160
161 if events[eventType] == nil {
162 events[eventType] = []*WebRTCEvent{}
163 }
164
165 events[eventType] = append(events[eventType], ev)
166 }
167
168 return &WebRTCEventGroup{
169 Events: events,
170 Tracks: tracks,
171 FirstTime: firstTime,
172 }, nil
173}
174
175func (g *WebRTCEventGroup) Peek(eventType string) *WebRTCEvent {
176 if g.Events[eventType] == nil {
177 panic(fmt.Sprintf("no events of type %s", eventType))
178 }
179 if len(g.Events[eventType]) == 0 {
180 return nil
181 }
182 return g.Events[eventType][0]
183}
184
185func (g *WebRTCEventGroup) PeekTrack(ssrc webrtc.SSRC, eventType string) *WebRTCEvent {
186 if g.Tracks[ssrc] == nil {
187 panic(fmt.Sprintf("no tracks for ssrc %d", ssrc))
188 }
189 if g.Tracks[ssrc][eventType] == nil {
190 panic(fmt.Sprintf("no events of type %s for ssrc %d", eventType, ssrc))
191 }
192 if len(g.Tracks[ssrc][eventType]) == 0 {
193 return nil
194 }
195 return g.Tracks[ssrc][eventType][0]
196}
197
198func (g *WebRTCEventGroup) Next(eventType string) *WebRTCEvent {
199 g.EventMutex.Lock()
200 defer g.EventMutex.Unlock()
201 if g.Events[eventType] == nil {
202 panic(fmt.Sprintf("no events of type %s", eventType))
203 }
204 if len(g.Events[eventType]) == 0 {
205 return nil
206 }
207 ev := g.Events[eventType][0]
208 g.Events[eventType] = g.Events[eventType][1:]
209 return ev
210}
211
212func (g *WebRTCEventGroup) NextTrack(ssrc webrtc.SSRC, eventType string) *WebRTCEvent {
213 if g.Tracks[ssrc] == nil {
214 panic(fmt.Sprintf("no tracks for ssrc %d", ssrc))
215 }
216 if g.Tracks[ssrc][eventType] == nil {
217 panic(fmt.Sprintf("no events of type %s for ssrc %d", eventType, ssrc))
218 }
219 if len(g.Tracks[ssrc][eventType]) == 0 {
220 return nil
221 }
222 ev := g.Tracks[ssrc][eventType][0]
223 g.Tracks[ssrc][eventType] = g.Tracks[ssrc][eventType][1:]
224 return ev
225}