Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

at eli/concat-script 148 lines 3.7 kB view raw
1package rtcrec 2 3import ( 4 "context" 5 "fmt" 6 "io" 7 "time" 8 9 "github.com/pion/rtcp" 10 "github.com/pion/webrtc/v4" 11 "stream.place/streamplace/pkg/log" 12) 13 14type ReplayPeerConnection struct { 15 startTime time.Time 16 group *WebRTCEventGroup 17 ctx context.Context 18} 19 20func NewReplayPeerConnection(ctx context.Context, r io.Reader) (PeerConnection, error) { 21 group, err := ReadAllEvents(r) 22 if err != nil { 23 return nil, fmt.Errorf("failed to create web rtc decoder: %w", err) 24 } 25 26 return &ReplayPeerConnection{ 27 startTime: time.Now(), 28 group: group, 29 ctx: context.Background(), 30 }, nil 31} 32 33func (pc *ReplayPeerConnection) wait(label string, t time.Time) <-chan time.Time { 34 now := time.Now() 35 historicalDiff := t.Sub(pc.group.FirstTime) 36 currentDiff := time.Since(pc.startTime) 37 diff := historicalDiff - currentDiff 38 log.Debug(pc.ctx, "waiting for event", "event", label, "diff", diff, "t", t, "first", pc.group.FirstTime, "startTime", pc.startTime, "now", now) 39 return time.After(diff) 40} 41 42func (pc *ReplayPeerConnection) Close() error { 43 // todo: implement stopping here 44 return nil 45} 46 47func (pc *ReplayPeerConnection) CreateAnswer(options *webrtc.AnswerOptions) (webrtc.SessionDescription, error) { 48 ev := pc.group.Peek(EventTypeCreateAnswer) 49 if ev == nil { 50 return webrtc.SessionDescription{}, fmt.Errorf("no create answer event found") 51 } 52 return webrtc.SessionDescription{SDP: ev.CreateAnswer.SDPAnswer}, nil 53} 54 55func (pc *ReplayPeerConnection) SetLocalDescription(desc webrtc.SessionDescription) error { 56 return nil 57} 58 59func (pc *ReplayPeerConnection) SetRemoteDescription(desc webrtc.SessionDescription) error { 60 return nil 61} 62 63func (pc *ReplayPeerConnection) LocalDescription() *webrtc.SessionDescription { 64 ev := pc.group.Peek(EventTypeLocalDescription) 65 if ev == nil { 66 return nil 67 } 68 return &webrtc.SessionDescription{SDP: ev.LocalDescription.SDPLocalDescription} 69} 70 71// func (pc *ReplayPeerConnection) RemoteDescription() *webrtc.SessionDescription { 72// return pc.pionpc.RemoteDescription() 73// } 74 75func (pc *ReplayPeerConnection) OnICEConnectionStateChange(f func(webrtc.ICEConnectionState)) { 76 go func() { 77 for { 78 ev := pc.group.Next(EventTypeICEConnectionState) 79 if ev == nil { 80 return 81 } 82 select { 83 case <-pc.wait("OnICEConnectionStateChange", ev.Time): 84 f(ev.ICEConnectionStateChange.ICEConnectionState) 85 case <-pc.ctx.Done(): 86 return 87 } 88 } 89 }() 90} 91 92func (pc *ReplayPeerConnection) OnConnectionStateChange(f func(webrtc.PeerConnectionState)) { 93 go func() { 94 for { 95 ev := pc.group.Next(EventTypeConnectionState) 96 if ev == nil { 97 return 98 } 99 select { 100 case <-pc.wait("OnConnectionStateChange", ev.Time): 101 f(ev.ConnectionStateChange.ConnectionState) 102 case <-pc.ctx.Done(): 103 return 104 } 105 } 106 }() 107} 108 109func (pc *ReplayPeerConnection) OnTrack(f func(TrackRemote, RTPReceiver)) { 110 go func() { 111 for { 112 ev := pc.group.Next(EventTypeTrack) 113 if ev == nil { 114 return 115 } 116 select { 117 case <-pc.wait("OnTrack", ev.Time): 118 track := &ReplayTrackRemote{ 119 ssrc: ev.Track.SSRC, 120 trackEvent: ev, 121 events: pc.group.Tracks[ev.Track.SSRC], 122 pc: pc, 123 } 124 go func() { 125 f(track, nil) 126 }() 127 case <-pc.ctx.Done(): 128 return 129 } 130 } 131 }() 132} 133 134func (pc *ReplayPeerConnection) WriteRTCP(pkts []rtcp.Packet) error { 135 return nil 136} 137 138func (pc *ReplayPeerConnection) AddTransceiverFromKind(kind webrtc.RTPCodecType, init ...webrtc.RTPTransceiverInit) (RTPTransceiver, error) { 139 return nil, nil 140} 141 142func (pc *ReplayPeerConnection) ICEGatheringState() webrtc.ICEGatheringState { 143 ev := pc.group.Peek(EventTypeICEGatheringState) 144 if ev == nil { 145 return webrtc.ICEGatheringStateNew 146 } 147 return ev.ICEGatheringState.State 148}