fork of hey-api/openapi-ts because I need some additional things
1// This file is auto-generated by @hey-api/openapi-ts
2
3import type { Config } from './types.gen';
4
5export type ServerSentEventsOptions<TData = unknown> = Omit<RequestInit, 'method'> &
6 Pick<Config, 'method' | 'responseTransformer' | 'responseValidator'> & {
7 /**
8 * Fetch API implementation. You can use this option to provide a custom
9 * fetch instance.
10 *
11 * @default globalThis.fetch
12 */
13 fetch?: typeof fetch;
14 /**
15 * Implementing clients can call request interceptors inside this hook.
16 */
17 onRequest?: (url: string, init: RequestInit) => Promise<Request>;
18 /**
19 * Callback invoked when a network or parsing error occurs during streaming.
20 *
21 * This option applies only if the endpoint returns a stream of events.
22 *
23 * @param error The error that occurred.
24 */
25 onSseError?: (error: unknown) => void;
26 /**
27 * Callback invoked when an event is streamed from the server.
28 *
29 * This option applies only if the endpoint returns a stream of events.
30 *
31 * @param event Event streamed from the server.
32 * @returns Nothing (void).
33 */
34 onSseEvent?: (event: StreamEvent<TData>) => void;
35 serializedBody?: RequestInit['body'];
36 /**
37 * Default retry delay in milliseconds.
38 *
39 * This option applies only if the endpoint returns a stream of events.
40 *
41 * @default 3000
42 */
43 sseDefaultRetryDelay?: number;
44 /**
45 * Maximum number of retry attempts before giving up.
46 */
47 sseMaxRetryAttempts?: number;
48 /**
49 * Maximum retry delay in milliseconds.
50 *
51 * Applies only when exponential backoff is used.
52 *
53 * This option applies only if the endpoint returns a stream of events.
54 *
55 * @default 30000
56 */
57 sseMaxRetryDelay?: number;
58 /**
59 * Optional sleep function for retry backoff.
60 *
61 * Defaults to using `setTimeout`.
62 */
63 sseSleepFn?: (ms: number) => Promise<void>;
64 url: string;
65 };
66
67export interface StreamEvent<TData = unknown> {
68 data: TData;
69 event?: string;
70 id?: string;
71 retry?: number;
72}
73
74export type ServerSentEventsResult<TData = unknown, TReturn = void, TNext = unknown> = {
75 stream: AsyncGenerator<
76 TData extends Record<string, unknown> ? TData[keyof TData] : TData,
77 TReturn,
78 TNext
79 >;
80};
81
82export const createSseClient = <TData = unknown>({
83 onRequest,
84 onSseError,
85 onSseEvent,
86 responseTransformer,
87 responseValidator,
88 sseDefaultRetryDelay,
89 sseMaxRetryAttempts,
90 sseMaxRetryDelay,
91 sseSleepFn,
92 url,
93 ...options
94}: ServerSentEventsOptions): ServerSentEventsResult<TData> => {
95 let lastEventId: string | undefined;
96
97 const sleep = sseSleepFn ?? ((ms: number) => new Promise((resolve) => setTimeout(resolve, ms)));
98
99 const createStream = async function* () {
100 let retryDelay: number = sseDefaultRetryDelay ?? 3000;
101 let attempt = 0;
102 const signal = options.signal ?? new AbortController().signal;
103
104 while (true) {
105 if (signal.aborted) break;
106
107 attempt++;
108
109 const headers =
110 options.headers instanceof Headers
111 ? options.headers
112 : new Headers(options.headers as Record<string, string> | undefined);
113
114 if (lastEventId !== undefined) {
115 headers.set('Last-Event-ID', lastEventId);
116 }
117
118 try {
119 const requestInit: RequestInit = {
120 redirect: 'follow',
121 ...options,
122 body: options.serializedBody,
123 headers,
124 signal,
125 };
126 let request = new Request(url, requestInit);
127 if (onRequest) {
128 request = await onRequest(url, requestInit);
129 }
130 // fetch must be assigned here, otherwise it would throw the error:
131 // TypeError: Failed to execute 'fetch' on 'Window': Illegal invocation
132 const _fetch = options.fetch ?? globalThis.fetch;
133 const response = await _fetch(request);
134
135 if (!response.ok) throw new Error(`SSE failed: ${response.status} ${response.statusText}`);
136
137 if (!response.body) throw new Error('No body in SSE response');
138
139 const reader = response.body.pipeThrough(new TextDecoderStream()).getReader();
140
141 let buffer = '';
142
143 const abortHandler = () => {
144 try {
145 reader.cancel();
146 } catch {
147 // noop
148 }
149 };
150
151 signal.addEventListener('abort', abortHandler);
152
153 try {
154 while (true) {
155 const { done, value } = await reader.read();
156 if (done) break;
157 buffer += value;
158 // Normalize line endings: CRLF -> LF, then CR -> LF
159 buffer = buffer.replace(/\r\n/g, '\n').replace(/\r/g, '\n');
160
161 const chunks = buffer.split('\n\n');
162 buffer = chunks.pop() ?? '';
163
164 for (const chunk of chunks) {
165 const lines = chunk.split('\n');
166 const dataLines: Array<string> = [];
167 let eventName: string | undefined;
168
169 for (const line of lines) {
170 if (line.startsWith('data:')) {
171 dataLines.push(line.replace(/^data:\s*/, ''));
172 } else if (line.startsWith('event:')) {
173 eventName = line.replace(/^event:\s*/, '');
174 } else if (line.startsWith('id:')) {
175 lastEventId = line.replace(/^id:\s*/, '');
176 } else if (line.startsWith('retry:')) {
177 const parsed = Number.parseInt(line.replace(/^retry:\s*/, ''), 10);
178 if (!Number.isNaN(parsed)) {
179 retryDelay = parsed;
180 }
181 }
182 }
183
184 let data: unknown;
185 let parsedJson = false;
186
187 if (dataLines.length) {
188 const rawData = dataLines.join('\n');
189 try {
190 data = JSON.parse(rawData);
191 parsedJson = true;
192 } catch {
193 data = rawData;
194 }
195 }
196
197 if (parsedJson) {
198 if (responseValidator) {
199 await responseValidator(data);
200 }
201
202 if (responseTransformer) {
203 data = await responseTransformer(data);
204 }
205 }
206
207 onSseEvent?.({
208 data,
209 event: eventName,
210 id: lastEventId,
211 retry: retryDelay,
212 });
213
214 if (dataLines.length) {
215 yield data as any;
216 }
217 }
218 }
219 } finally {
220 signal.removeEventListener('abort', abortHandler);
221 reader.releaseLock();
222 }
223
224 break; // exit loop on normal completion
225 } catch (error) {
226 // connection failed or aborted; retry after delay
227 onSseError?.(error);
228
229 if (sseMaxRetryAttempts !== undefined && attempt >= sseMaxRetryAttempts) {
230 break; // stop after firing error
231 }
232
233 // exponential backoff: double retry each attempt, cap at 30s
234 const backoff = Math.min(retryDelay * 2 ** (attempt - 1), sseMaxRetryDelay ?? 30000);
235 await sleep(backoff);
236 }
237 }
238 };
239
240 const stream = createStream();
241
242 return { stream };
243};