Live video on the AT Protocol
1package rtcrec
2
3import (
4 "context"
5 "fmt"
6 "io"
7 "time"
8
9 "github.com/pion/rtcp"
10 "github.com/pion/webrtc/v4"
11 "stream.place/streamplace/pkg/log"
12)
13
14type ReplayPeerConnection struct {
15 startTime time.Time
16 group *WebRTCEventGroup
17 ctx context.Context
18}
19
20func NewReplayPeerConnection(ctx context.Context, r io.Reader) (PeerConnection, error) {
21 group, err := ReadAllEvents(r)
22 if err != nil {
23 return nil, fmt.Errorf("failed to create web rtc decoder: %w", err)
24 }
25
26 return &ReplayPeerConnection{
27 startTime: time.Now(),
28 group: group,
29 ctx: context.Background(),
30 }, nil
31}
32
33func (pc *ReplayPeerConnection) wait(label string, t time.Time) <-chan time.Time {
34 now := time.Now()
35 historicalDiff := t.Sub(pc.group.FirstTime)
36 currentDiff := time.Since(pc.startTime)
37 diff := historicalDiff - currentDiff
38 log.Debug(pc.ctx, "waiting for event", "event", label, "diff", diff, "t", t, "first", pc.group.FirstTime, "startTime", pc.startTime, "now", now)
39 return time.After(diff)
40}
41
42func (pc *ReplayPeerConnection) Close() error {
43 // todo: implement stopping here
44 return nil
45}
46
47func (pc *ReplayPeerConnection) CreateAnswer(options *webrtc.AnswerOptions) (webrtc.SessionDescription, error) {
48 ev := pc.group.Peek(EventTypeCreateAnswer)
49 if ev == nil {
50 return webrtc.SessionDescription{}, fmt.Errorf("no create answer event found")
51 }
52 return webrtc.SessionDescription{SDP: ev.CreateAnswer.SDPAnswer}, nil
53}
54
55func (pc *ReplayPeerConnection) SetLocalDescription(desc webrtc.SessionDescription) error {
56 return nil
57}
58
59func (pc *ReplayPeerConnection) SetRemoteDescription(desc webrtc.SessionDescription) error {
60 return nil
61}
62
63func (pc *ReplayPeerConnection) LocalDescription() *webrtc.SessionDescription {
64 ev := pc.group.Peek(EventTypeLocalDescription)
65 if ev == nil {
66 return nil
67 }
68 return &webrtc.SessionDescription{SDP: ev.LocalDescription.SDPLocalDescription}
69}
70
71// func (pc *ReplayPeerConnection) RemoteDescription() *webrtc.SessionDescription {
72// return pc.pionpc.RemoteDescription()
73// }
74
75func (pc *ReplayPeerConnection) OnICEConnectionStateChange(f func(webrtc.ICEConnectionState)) {
76 go func() {
77 for {
78 ev := pc.group.Next(EventTypeICEConnectionState)
79 if ev == nil {
80 return
81 }
82 select {
83 case <-pc.wait("OnICEConnectionStateChange", ev.Time):
84 f(ev.ICEConnectionStateChange.ICEConnectionState)
85 case <-pc.ctx.Done():
86 return
87 }
88 }
89 }()
90}
91
92func (pc *ReplayPeerConnection) OnConnectionStateChange(f func(webrtc.PeerConnectionState)) {
93 go func() {
94 for {
95 ev := pc.group.Next(EventTypeConnectionState)
96 if ev == nil {
97 return
98 }
99 select {
100 case <-pc.wait("OnConnectionStateChange", ev.Time):
101 f(ev.ConnectionStateChange.ConnectionState)
102 case <-pc.ctx.Done():
103 return
104 }
105 }
106 }()
107}
108
109func (pc *ReplayPeerConnection) OnTrack(f func(TrackRemote, RTPReceiver)) {
110 go func() {
111 for {
112 ev := pc.group.Next(EventTypeTrack)
113 if ev == nil {
114 return
115 }
116 select {
117 case <-pc.wait("OnTrack", ev.Time):
118 track := &ReplayTrackRemote{
119 ssrc: ev.Track.SSRC,
120 trackEvent: ev,
121 events: pc.group.Tracks[ev.Track.SSRC],
122 pc: pc,
123 }
124 go func() {
125 f(track, nil)
126 }()
127 case <-pc.ctx.Done():
128 return
129 }
130 }
131 }()
132}
133
134func (pc *ReplayPeerConnection) WriteRTCP(pkts []rtcp.Packet) error {
135 return nil
136}
137
138func (pc *ReplayPeerConnection) AddTransceiverFromKind(kind webrtc.RTPCodecType, init ...webrtc.RTPTransceiverInit) (RTPTransceiver, error) {
139 return nil, nil
140}
141
142func (pc *ReplayPeerConnection) ICEGatheringState() webrtc.ICEGatheringState {
143 ev := pc.group.Peek(EventTypeICEGatheringState)
144 if ev == nil {
145 return webrtc.ICEGatheringStateNew
146 }
147 return ev.ICEGatheringState.State
148}