// Fork of hey-api/openapi-ts, maintained because some additional features are needed.
// This file is auto-generated by @hey-api/openapi-ts
2
3import type { Config } from './types.gen';
4
/**
 * Options accepted by `createSseClient`.
 *
 * Combines the standard `RequestInit` fields (with `method` taken from
 * `Config` instead of `RequestInit`) with SSE-specific callbacks and retry
 * settings.
 *
 * @typeParam TData Type of the `data` payload delivered to `onSseEvent`.
 */
export type ServerSentEventsOptions<TData = unknown> = Omit<
  RequestInit,
  'method'
> &
  Pick<Config, 'method' | 'responseTransformer' | 'responseValidator'> & {
    /**
     * Fetch API implementation. You can use this option to provide a custom
     * fetch instance.
     *
     * @default globalThis.fetch
     */
    fetch?: typeof fetch;
    /**
     * Implementing clients can call request interceptors inside this hook.
     *
     * When provided, the `Request` it resolves to is the one handed to fetch.
     */
    onRequest?: (url: string, init: RequestInit) => Promise<Request>;
    /**
     * Callback invoked when a network or parsing error occurs during streaming.
     *
     * This option applies only if the endpoint returns a stream of events.
     *
     * @param error The error that occurred.
     */
    onSseError?: (error: unknown) => void;
    /**
     * Callback invoked when an event is streamed from the server.
     *
     * This option applies only if the endpoint returns a stream of events.
     *
     * @param event Event streamed from the server.
     * @returns Nothing (void).
     */
    onSseEvent?: (event: StreamEvent<TData>) => void;
    /**
     * Already-serialized request body. `createSseClient` sends this value as
     * the `body` of the outgoing request.
     */
    serializedBody?: RequestInit['body'];
    /**
     * Default retry delay in milliseconds.
     *
     * This option applies only if the endpoint returns a stream of events.
     *
     * @default 3000
     */
    sseDefaultRetryDelay?: number;
    /**
     * Maximum number of retry attempts before giving up.
     *
     * When omitted, `createSseClient` keeps retrying without an attempt limit.
     */
    sseMaxRetryAttempts?: number;
    /**
     * Maximum retry delay in milliseconds.
     *
     * Applies only when exponential backoff is used.
     *
     * This option applies only if the endpoint returns a stream of events.
     *
     * @default 30000
     */
    sseMaxRetryDelay?: number;
    /**
     * Optional sleep function for retry backoff.
     *
     * Defaults to using `setTimeout`.
     */
    sseSleepFn?: (ms: number) => Promise<void>;
    /**
     * URL the SSE request is sent to.
     */
    url: string;
  };
69
/**
 * A single event parsed from the SSE stream, as passed to `onSseEvent`.
 *
 * @typeParam TData Type of the event payload.
 */
export interface StreamEvent<TData = unknown> {
  /**
   * Event payload. `createSseClient` passes the JSON-parsed value when the
   * joined `data:` lines are valid JSON, the raw joined string otherwise, and
   * `undefined` when the event block contained no `data:` line.
   */
  data: TData;
  /**
   * Value of the block's `event:` field, if one was present.
   */
  event?: string;
  /**
   * Most recent `id:` value seen on the stream. It is remembered across
   * events, so it may come from an earlier event block.
   */
  id?: string;
  /**
   * Retry delay in milliseconds in effect when the event was dispatched
   * (the configured default unless the server sent a valid `retry:` field).
   */
  retry?: number;
}
76
/**
 * Return value of `createSseClient`.
 *
 * @typeParam TData Declared response type of the endpoint.
 * @typeParam TReturn Value type the generator returns when it finishes.
 * @typeParam TNext Value type accepted by the generator's `next()`.
 */
export type ServerSentEventsResult<
  TData = unknown,
  TReturn = void,
  TNext = unknown,
> = {
  /**
   * Async generator over the streamed event payloads.
   *
   * When `TData` is an object type (`Record<string, unknown>`), each yielded
   * item is typed as the union of its property value types; otherwise items
   * are typed as `TData` itself.
   */
  stream: AsyncGenerator<
    TData extends Record<string, unknown> ? TData[keyof TData] : TData,
    TReturn,
    TNext
  >;
};
88
/**
 * Creates a Server-Sent Events client for a single endpoint.
 *
 * The returned `stream` is an async generator; no request is made until it is
 * first iterated. Each event block received from the server is reported to
 * `onSseEvent`, and blocks that contain at least one `data:` line are also
 * yielded. The joined data is JSON-parsed when possible (falling back to the
 * raw string); `responseValidator` and `responseTransformer` run only on
 * successfully parsed JSON.
 *
 * Any error (failed request, non-OK status, missing body, read error,
 * validator/transformer failure) is passed to `onSseError` and followed by a
 * reconnect with exponential backoff, sending `Last-Event-ID` when an `id:`
 * field has been seen. The generator ends when the response completes
 * normally, when the signal is aborted, or once `sseMaxRetryAttempts` is
 * reached.
 *
 * NOTE(review): the options parameter uses `ServerSentEventsOptions` with its
 * default type argument, so `onSseEvent` is typed against
 * `StreamEvent<unknown>` rather than `StreamEvent<TData>`.
 *
 * @param options Request, callback and retry settings; see
 * `ServerSentEventsOptions`.
 * @returns An object whose `stream` yields the event payloads.
 */
export const createSseClient = <TData = unknown>({
  onRequest,
  onSseError,
  onSseEvent,
  responseTransformer,
  responseValidator,
  sseDefaultRetryDelay,
  sseMaxRetryAttempts,
  sseMaxRetryDelay,
  sseSleepFn,
  url,
  ...options
}: ServerSentEventsOptions): ServerSentEventsResult<TData> => {
  let lastEventId: string | undefined;

  const sleep =
    sseSleepFn ??
    ((ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms)));

  // Per the SSE format, only a single U+0020 after the colon is a separator;
  // any further whitespace belongs to the field value.
  const fieldValue = (line: string, prefix: string): string => {
    const value = line.slice(prefix.length);
    return value.startsWith(' ') ? value.slice(1) : value;
  };

  const createStream = async function* () {
    let retryDelay: number = sseDefaultRetryDelay ?? 3000;
    let attempt = 0;
    const signal = options.signal ?? new AbortController().signal;

    while (true) {
      if (signal.aborted) break;

      attempt++;

      // Always copy: setting Last-Event-ID below must not mutate a Headers
      // instance owned by the caller.
      const headers = new Headers(options.headers);

      if (lastEventId !== undefined) {
        headers.set('Last-Event-ID', lastEventId);
      }

      try {
        const requestInit: RequestInit = {
          redirect: 'follow',
          ...options,
          body: options.serializedBody,
          headers,
          signal,
        };
        let request = new Request(url, requestInit);
        if (onRequest) {
          request = await onRequest(url, requestInit);
        }
        // fetch must be assigned here, otherwise it would throw the error:
        // TypeError: Failed to execute 'fetch' on 'Window': Illegal invocation
        const _fetch = options.fetch ?? globalThis.fetch;
        const response = await _fetch(request);

        if (!response.ok)
          throw new Error(
            `SSE failed: ${response.status} ${response.statusText}`,
          );

        if (!response.body) throw new Error('No body in SSE response');

        const reader = response.body
          .pipeThrough(new TextDecoderStream())
          .getReader();

        let buffer = '';
        // True when the previous chunk ended with a CR that was held back
        // because it may be the first half of a CRLF split across chunks.
        let pendingCr = false;

        // `cancel()` reports failure through its promise, so the rejection
        // has to be handled there; the try/catch only guards a sync throw.
        const cancelReader = () => {
          try {
            reader.cancel().catch(() => {
              // noop
            });
          } catch {
            // noop
          }
        };

        signal.addEventListener('abort', cancelReader);

        try {
          while (true) {
            const { done, value } = await reader.read();

            let text = (pendingCr ? '\r' : '') + (value ?? '');
            // At end of stream nothing can follow, so a held-back CR is a
            // complete line ending and is flushed instead of held again.
            pendingCr = !done && text.endsWith('\r');
            if (pendingCr) text = text.slice(0, -1);

            // Normalize line endings: CRLF -> LF and lone CR -> LF
            buffer += text.replace(/\r\n?/g, '\n');

            const chunks = buffer.split('\n\n');
            // The last piece is an incomplete event; keep it for later.
            buffer = chunks.pop() ?? '';

            for (const chunk of chunks) {
              const dataLines: Array<string> = [];
              let eventName: string | undefined;

              for (const line of chunk.split('\n')) {
                if (line.startsWith('data:')) {
                  dataLines.push(fieldValue(line, 'data:'));
                } else if (line.startsWith('event:')) {
                  eventName = fieldValue(line, 'event:');
                } else if (line.startsWith('id:')) {
                  lastEventId = fieldValue(line, 'id:');
                } else if (line.startsWith('retry:')) {
                  const parsed = Number.parseInt(
                    fieldValue(line, 'retry:'),
                    10,
                  );
                  if (!Number.isNaN(parsed)) {
                    retryDelay = parsed;
                  }
                }
              }

              let data: unknown;
              let parsedJson = false;

              if (dataLines.length) {
                const rawData = dataLines.join('\n');
                try {
                  data = JSON.parse(rawData);
                  parsedJson = true;
                } catch {
                  // Not JSON: deliver the raw text as-is.
                  data = rawData;
                }
              }

              if (parsedJson) {
                if (responseValidator) {
                  await responseValidator(data);
                }

                if (responseTransformer) {
                  data = await responseTransformer(data);
                }
              }

              onSseEvent?.({
                data,
                event: eventName,
                id: lastEventId,
                retry: retryDelay,
              });

              if (dataLines.length) {
                // The declared yield type is derived from TData and cannot be
                // checked against runtime-parsed data.
                // eslint-disable-next-line @typescript-eslint/no-explicit-any
                yield data as any;
              }
            }

            if (done) break;
          }
        } finally {
          signal.removeEventListener('abort', cancelReader);
          // Cancel before releasing so that an early consumer exit or an
          // error-triggered retry does not leave the connection open. This
          // is a no-op when the stream has already finished.
          cancelReader();
          reader.releaseLock();
        }

        break; // exit loop on normal completion
      } catch (error) {
        // connection failed or aborted; report, then retry after a delay
        onSseError?.(error);

        if (
          sseMaxRetryAttempts !== undefined &&
          attempt >= sseMaxRetryAttempts
        ) {
          break; // stop after firing error
        }

        // Once aborted the loop would exit on its next pass anyway; do not
        // make the consumer wait out a backoff first.
        if (signal.aborted) break;

        // exponential backoff: double the delay each attempt, capped at
        // `sseMaxRetryDelay` (30s by default)
        const backoff = Math.min(
          retryDelay * 2 ** (attempt - 1),
          sseMaxRetryDelay ?? 30000,
        );
        await sleep(backoff);
      }
    }
  };

  const stream = createStream();

  return { stream };
};