Live video on the AT Protocol
1import { usePlayerStore } from "@streamplace/components";
2import {
3 createStreamKeyRecord,
4 selectStoredKey,
5} from "features/bluesky/blueskySlice";
6import { useEffect, useRef, useState } from "react";
7import { useAppDispatch, useAppSelector } from "store/hooks";
8import { RTCPeerConnection, RTCSessionDescription } from "./webrtc-primitives";
9
/**
 * React hook that opens a receive-only peer connection (one video and one
 * audio transceiver) and negotiates it against `endpoint` via
 * `negotiateConnectionWithClientOffer`.
 *
 * The connection is recreated whenever `endpoint` changes and closed on
 * unmount.
 *
 * @param endpoint URL the SDP offer is POSTed to.
 * @returns A tuple `[mediaStream, stuck]`:
 *   - `mediaStream` is the first stream of the most recent `track` event, or
 *     `null` if none has arrived yet.
 *   - `stuck` is `true` when the connection is closed/failed, or when neither
 *     the inbound video frame counter nor the inbound audio packet timestamp
 *     has changed for more than two seconds. Because the "last change"
 *     timestamp starts at 0, the hook reports `stuck` until the first media
 *     statistics change is observed.
 */
export default function useWebRTC(
  endpoint: string,
): [MediaStream | null, boolean] {
  /** How often inbound RTP statistics are sampled. */
  const STATS_POLL_INTERVAL_MS = 200;
  /** How long the statistics may stay unchanged before reporting `stuck`. */
  const STUCK_AFTER_MS = 2000;

  const [mediaStream, setMediaStream] = useState<MediaStream | null>(null);
  const [stuck, setStuck] = useState<boolean>(false);

  // Wall-clock time (ms) at which inbound media statistics last changed.
  const lastChange = useRef<number>(0);

  useEffect(() => {
    // Set by the cleanup function so late async work does not touch state
    // that belongs to a connection which has already been torn down.
    let disposed = false;

    const peerConnection = new RTCPeerConnection({
      bundlePolicy: "max-bundle",
    });
    peerConnection.addTransceiver("video", {
      direction: "recvonly",
    });
    peerConnection.addTransceiver("audio", {
      direction: "recvonly",
    });

    peerConnection.addEventListener("track", (event) => {
      if (!event.track) {
        return;
      }
      // A track may arrive without an associated stream; keep the state
      // within its declared `MediaStream | null` type in that case.
      setMediaStream(event.streams[0] ?? null);
    });

    peerConnection.addEventListener("connectionstatechange", () => {
      const state = peerConnection.connectionState;
      console.log("connection state change", state);
      // Treat a failed connection like a closed one so callers are told
      // right away instead of waiting for the stats timeout below.
      if (state === "closed" || state === "failed") {
        setStuck(true);
      }
    });

    peerConnection.addEventListener("negotiationneeded", () => {
      // Fire-and-forget, but never leave the rejection unhandled.
      negotiateConnectionWithClientOffer(peerConnection, endpoint).catch(
        (e: unknown) => {
          console.error("webrtc negotiation failed", e);
        },
      );
    });

    let lastFramesReceived = 0;
    let lastAudioPacketTimestamp = 0;

    /** Records that media is flowing right now. */
    const markProgress = () => {
      lastChange.current = Date.now();
      setStuck(false);
    };

    const handle = setInterval(async () => {
      try {
        const stats = await peerConnection.getStats();
        if (disposed) {
          return;
        }
        stats.forEach((stat) => {
          if (stat.type !== "inbound-rtp") {
            return;
          }
          const mediaType = stat.mediaType /* web */ ?? stat.kind; /* native */
          if (mediaType === "audio") {
            const packetTimestamp = stat.lastPacketReceivedTimestamp;
            if (lastAudioPacketTimestamp !== packetTimestamp) {
              lastAudioPacketTimestamp = packetTimestamp;
              markProgress();
            }
          } else if (mediaType === "video") {
            const framesReceived = stat.framesReceived;
            if (lastFramesReceived !== framesReceived) {
              lastFramesReceived = framesReceived;
              markProgress();
            }
          }
        });
      } catch (e) {
        if (disposed) {
          return;
        }
        // Keep polling; the staleness check below still applies.
        console.error("failed to read webrtc stats", e);
      }
      if (Date.now() - lastChange.current > STUCK_AFTER_MS) {
        setStuck(true);
      }
    }, STATS_POLL_INTERVAL_MS);

    return () => {
      disposed = true;
      clearInterval(handle);
      peerConnection.close();
    };
  }, [endpoint]);

  return [mediaStream, stuck];
}
84
/**
 * Runs the SDP offer/answer exchange for a peer connection.
 *
 * 1. Creates the client's SDP offer and installs it as the local description.
 * 2. Waits for ICE gathering via `waitToCompleteICEGathering`.
 * 3. POSTs the offer to `endpoint` and, on HTTP 201, applies the response
 *    body as the remote answer.
 *
 * While the connection is not closed, an unsuccessful attempt (non-201
 * status or a thrown error) is logged and retried after five seconds.
 *
 * SDP describes what kind of media we can send and how the server and client communicate.
 *
 * https://developer.mozilla.org/en-US/docs/Glossary/SDP
 * https://www.ietf.org/archive/id/draft-ietf-wish-whip-01.html#name-protocol-operation
 *
 * @param peerConnection Connection to negotiate.
 * @param endpoint URL the SDP offer is POSTed to.
 * @param bearerToken Optional token forwarded to `postSDPOffer`.
 * @returns The `Location` header of the 201 response (`null` when absent),
 *   or `undefined` when the connection was closed before an answer could be
 *   applied.
 * @throws Error when no local description is available after ICE gathering.
 */
export async function negotiateConnectionWithClientOffer(
  peerConnection: RTCPeerConnection,
  endpoint: string,
  bearerToken?: string,
) {
  /** https://developer.mozilla.org/en-US/docs/Web/API/RTCPeerConnection/createOffer */
  const initialOffer = await peerConnection.createOffer({
    offerToReceiveAudio: true,
    offerToReceiveVideo: true,
  });
  /** https://developer.mozilla.org/en-US/docs/Web/API/RTCPeerConnection/setLocalDescription */
  await peerConnection.setLocalDescription(initialOffer);

  /** The local description as it stands once ICE gathering has finished. */
  const gatheredOffer = await waitToCompleteICEGathering(peerConnection);
  if (!gatheredOffer) {
    throw Error("failed to gather ICE candidates for offer");
  }

  /** Keep trying for as long as the connection has not been closed. */
  while (peerConnection.connectionState !== "closed") {
    try {
      const response = await postSDPOffer(
        endpoint,
        gatheredOffer.sdp,
        bearerToken,
      );
      switch (response.status) {
        case 201: {
          /** The body is the server's SDP answer to our offer. */
          const answerSDP = await response.text();
          // The connection may have been closed while the request was in
          // flight; the cast undoes the narrowing from the loop condition.
          if ((peerConnection.connectionState as string) === "closed") {
            return;
          }
          await peerConnection.setRemoteDescription(
            new RTCSessionDescription({ type: "answer", sdp: answerSDP }),
          );
          return response.headers.get("Location");
        }
        case 405:
          console.log(
            "Remember to update the URL passed into the WHIP or WHEP client",
          );
          break;
        default:
          console.error(await response.text());
      }
    } catch (e) {
      console.error(`posting sdp offer failed: ${e}`);
    }

    /** Limit reconnection attempts to at-most once every 5 seconds */
    await new Promise((wake) => setTimeout(wake, 5000));
  }
}
152
/**
 * POSTs an SDP document to `endpoint` as `application/sdp` using a CORS
 * request.
 *
 * @param endpoint URL to send the request to.
 * @param data SDP text used as the request body.
 * @param bearerToken When truthy, sent as `Authorization: Bearer <token>`.
 * @returns The `fetch` response.
 */
async function postSDPOffer(
  endpoint: string,
  data: string,
  bearerToken?: string,
) {
  const requestHeaders: Record<string, string> = {
    "content-type": "application/sdp",
  };
  if (bearerToken) {
    requestHeaders.Authorization = `Bearer ${bearerToken}`;
  }

  const response = await fetch(endpoint, {
    method: "POST",
    mode: "cors",
    headers: requestHeaders,
    body: data,
  });
  return response;
}
168
/**
 * Waits until ICE gathering on `peerConnection` is complete, or until one
 * second has passed, and then yields the connection's current local
 * description.
 *
 * - If gathering is already complete on entry, resolves immediately.
 * - The state-change listener and the timer are both released as soon as
 *   either of them fires, so repeated negotiations do not accumulate
 *   listeners on the connection.
 * - If the timeout fires while the connection is closed, the promise is
 *   intentionally left pending so callers are not resumed on a dead
 *   connection.
 *
 * https://www.ietf.org/archive/id/draft-ietf-wish-whip-01.html#section-4.1
 * https://developer.mozilla.org/en-US/docs/Web/API/RTCPeerConnection/iceGatheringState
 * https://developer.mozilla.org/en-US/docs/Web/API/RTCPeerConnection/icegatheringstatechange_event
 *
 * @param peerConnection Connection whose local description is awaited.
 * @returns The local description, which may be `null` if none is set.
 */
async function waitToCompleteICEGathering(peerConnection: RTCPeerConnection) {
  return new Promise<RTCSessionDescription | null>((resolve) => {
    // No event will be dispatched for a state that has already been reached.
    if (peerConnection.iceGatheringState === "complete") {
      resolve(peerConnection.localDescription);
      return;
    }

    const onGatheringStateChange = () => {
      if (peerConnection.iceGatheringState !== "complete") {
        return;
      }
      release();
      resolve(peerConnection.localDescription);
    };

    /** Wait at most 1 second for ICE gathering. */
    const timer = setTimeout(() => {
      release();
      if (peerConnection.connectionState === "closed") {
        return;
      }
      resolve(peerConnection.localDescription);
    }, 1000);

    /** Drops the timer and the listener once the wait is over. */
    function release() {
      clearTimeout(timer);
      peerConnection.removeEventListener(
        "icegatheringstatechange",
        onGatheringStateChange,
      );
    }

    peerConnection.addEventListener(
      "icegatheringstatechange",
      onGatheringStateChange,
    );
  });
}
193
/**
 * React hook that publishes a local `MediaStream` over a peer connection
 * negotiated against `endpoint`.
 *
 * - If the Redux store holds no stream key, `createStreamKeyRecord` is
 *   dispatched with `{ store: true }`.
 * - Once both a media stream and a stored key are available, a peer
 *   connection is created, every track of the stream is added to it, and the
 *   stored private key is passed to `negotiateConnectionWithClientOffer` as
 *   the bearer token.
 * - Each connection state change is forwarded to the player store through
 *   `setIngestConnectionState`. A `"failed"` state bumps an internal retry
 *   timestamp, which tears the connection down and builds a new one.
 * - The connection is closed when any of `endpoint`, the media stream, the
 *   stored key or the retry timestamp changes, and on unmount.
 *
 * @param endpoint URL the SDP offer is POSTed to.
 * @returns A tuple `[mediaStream, setMediaStream]`; pass a stream to start
 *   publishing it, or `null` to stop.
 */
export function useWebRTCIngest({
  endpoint,
}: {
  endpoint: string;
}): [MediaStream | null, (mediaStream: MediaStream | null) => void] {
  const [mediaStream, setMediaStream] = useState<MediaStream | null>(null);
  const setIngestConnectionState = usePlayerStore(
    (x) => x.setIngestConnectionState,
  );
  const dispatch = useAppDispatch();
  const storedKey = useAppSelector(selectStoredKey)?.privateKey;

  // Changing this value re-runs the connection effect; it is only ever set
  // when the connection reports "failed".
  const [retryTime, setRetryTime] = useState<number>(0);

  // Make sure a stream key exists before attempting to publish.
  useEffect(() => {
    if (storedKey) {
      return;
    }
    dispatch(createStreamKeyRecord({ store: true }));
  }, [storedKey]);

  useEffect(() => {
    if (!mediaStream || !storedKey) {
      return;
    }

    console.log("creating peer connection");
    const peerConnection = new RTCPeerConnection({
      bundlePolicy: "max-bundle",
    });

    for (const track of mediaStream.getTracks()) {
      console.log(
        "adding track",
        track.kind,
        track.label,
        track.enabled,
        track.readyState,
      );
      peerConnection.addTrack(track, mediaStream);
    }

    peerConnection.addEventListener("connectionstatechange", () => {
      const state = peerConnection.connectionState;
      setIngestConnectionState(state);
      console.log("connection state change", state);
      if (state === "failed") {
        setRetryTime(Date.now());
      }
    });

    peerConnection.addEventListener("negotiationneeded", () => {
      // Fire-and-forget, but never leave the rejection unhandled.
      negotiateConnectionWithClientOffer(
        peerConnection,
        endpoint,
        storedKey,
      ).catch((e: unknown) => {
        console.error("webrtc ingest negotiation failed", e);
      });
    });

    peerConnection.addEventListener("track", (ev) => {
      console.log(ev);
    });

    return () => {
      peerConnection.close();
    };
  }, [endpoint, mediaStream, storedKey, retryTime]);

  return [mediaStream, setMediaStream];
}