Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

at v0.8.9 148 lines 3.7 kB view raw
1package rtcrec 2 3import ( 4 "context" 5 "fmt" 6 "io" 7 "time" 8 9 "github.com/pion/rtcp" 10 "github.com/pion/webrtc/v4" 11 "stream.place/streamplace/pkg/log" 12) 13 14type ReplayPeerConnection struct { 15 startTime time.Time 16 group *WebRTCEventGroup 17 ctx context.Context 18} 19 20func NewReplayPeerConnection(ctx context.Context, r io.Reader) (PeerConnection, error) { 21 group, err := ReadAllEvents(r) 22 if err != nil { 23 return nil, fmt.Errorf("failed to create web rtc decoder: %w", err) 24 } 25 26 return &ReplayPeerConnection{ 27 startTime: time.Now(), 28 group: group, 29 ctx: context.Background(), 30 }, nil 31} 32 33func (pc *ReplayPeerConnection) wait(label string, t time.Time) <-chan time.Time { 34 now := time.Now() 35 historicalDiff := t.Sub(pc.group.FirstTime) 36 currentDiff := time.Since(pc.startTime) 37 diff := historicalDiff - currentDiff 38 log.Debug(pc.ctx, "waiting for event", "event", label, "diff", diff, "t", t, "first", pc.group.FirstTime, "startTime", pc.startTime, "now", now) 39 return time.After(diff) 40} 41 42func (pc *ReplayPeerConnection) Close() error { 43 // todo: implement stopping here 44 return nil 45} 46 47func (pc *ReplayPeerConnection) CreateAnswer(options *webrtc.AnswerOptions) (webrtc.SessionDescription, error) { 48 ev := pc.group.Peek(EventTypeCreateAnswer) 49 if ev == nil { 50 return webrtc.SessionDescription{}, fmt.Errorf("no create answer event found") 51 } 52 return webrtc.SessionDescription{SDP: ev.CreateAnswer.SDPAnswer}, nil 53} 54 55func (pc *ReplayPeerConnection) SetLocalDescription(desc webrtc.SessionDescription) error { 56 return nil 57} 58 59func (pc *ReplayPeerConnection) SetRemoteDescription(desc webrtc.SessionDescription) error { 60 return nil 61} 62 63func (pc *ReplayPeerConnection) LocalDescription() *webrtc.SessionDescription { 64 ev := pc.group.Peek(EventTypeLocalDescription) 65 if ev == nil { 66 return nil 67 } 68 return &webrtc.SessionDescription{SDP: ev.LocalDescription.SDPLocalDescription} 69} 70 71// func (pc *ReplayPeerConnection) RemoteDescription() *webrtc.SessionDescription { 72// return pc.pionpc.RemoteDescription() 73// } 74 75func (pc *ReplayPeerConnection) OnICEConnectionStateChange(f func(webrtc.ICEConnectionState)) { 76 go func() { 77 for { 78 ev := pc.group.Next(EventTypeICEConnectionState) 79 if ev == nil { 80 return 81 } 82 select { 83 case <-pc.wait("OnICEConnectionStateChange", ev.Time): 84 f(ev.ICEConnectionStateChange.ICEConnectionState) 85 case <-pc.ctx.Done(): 86 return 87 } 88 } 89 }() 90} 91 92func (pc *ReplayPeerConnection) OnConnectionStateChange(f func(webrtc.PeerConnectionState)) { 93 go func() { 94 for { 95 ev := pc.group.Next(EventTypeConnectionState) 96 if ev == nil { 97 return 98 } 99 select { 100 case <-pc.wait("OnConnectionStateChange", ev.Time): 101 f(ev.ConnectionStateChange.ConnectionState) 102 case <-pc.ctx.Done(): 103 return 104 } 105 } 106 }() 107} 108 109func (pc *ReplayPeerConnection) OnTrack(f func(TrackRemote, RTPReceiver)) { 110 go func() { 111 for { 112 ev := pc.group.Next(EventTypeTrack) 113 if ev == nil { 114 return 115 } 116 select { 117 case <-pc.wait("OnTrack", ev.Time): 118 track := &ReplayTrackRemote{ 119 ssrc: ev.Track.SSRC, 120 trackEvent: ev, 121 events: pc.group.Tracks[ev.Track.SSRC], 122 pc: pc, 123 } 124 go func() { 125 f(track, nil) 126 }() 127 case <-pc.ctx.Done(): 128 return 129 } 130 } 131 }() 132} 133 134func (pc *ReplayPeerConnection) WriteRTCP(pkts []rtcp.Packet) error { 135 return nil 136} 137 138func (pc *ReplayPeerConnection) AddTransceiverFromKind(kind webrtc.RTPCodecType, init ...webrtc.RTPTransceiverInit) (RTPTransceiver, error) { 139 return nil, nil 140} 141 142func (pc *ReplayPeerConnection) ICEGatheringState() webrtc.ICEGatheringState { 143 ev := pc.group.Peek(EventTypeICEGatheringState) 144 if ev == nil { 145 return webrtc.ICEGatheringStateNew 146 } 147 return ev.ICEGatheringState.State 148}