fork of hey-api/openapi-ts because I need some additional things
at main 7.5 kB view raw
1// This file is auto-generated by @hey-api/openapi-ts 2 3import type { Config } from './types.gen'; 4 5export type ServerSentEventsOptions<TData = unknown> = Omit< 6 RequestInit, 7 'method' 8> & 9 Pick<Config, 'method' | 'responseTransformer' | 'responseValidator'> & { 10 /** 11 * Fetch API implementation. You can use this option to provide a custom 12 * fetch instance. 13 * 14 * @default globalThis.fetch 15 */ 16 fetch?: typeof fetch; 17 /** 18 * Implementing clients can call request interceptors inside this hook. 19 */ 20 onRequest?: (url: string, init: RequestInit) => Promise<Request>; 21 /** 22 * Callback invoked when a network or parsing error occurs during streaming. 23 * 24 * This option applies only if the endpoint returns a stream of events. 25 * 26 * @param error The error that occurred. 27 */ 28 onSseError?: (error: unknown) => void; 29 /** 30 * Callback invoked when an event is streamed from the server. 31 * 32 * This option applies only if the endpoint returns a stream of events. 33 * 34 * @param event Event streamed from the server. 35 * @returns Nothing (void). 36 */ 37 onSseEvent?: (event: StreamEvent<TData>) => void; 38 serializedBody?: RequestInit['body']; 39 /** 40 * Default retry delay in milliseconds. 41 * 42 * This option applies only if the endpoint returns a stream of events. 43 * 44 * @default 3000 45 */ 46 sseDefaultRetryDelay?: number; 47 /** 48 * Maximum number of retry attempts before giving up. 49 */ 50 sseMaxRetryAttempts?: number; 51 /** 52 * Maximum retry delay in milliseconds. 53 * 54 * Applies only when exponential backoff is used. 55 * 56 * This option applies only if the endpoint returns a stream of events. 57 * 58 * @default 30000 59 */ 60 sseMaxRetryDelay?: number; 61 /** 62 * Optional sleep function for retry backoff. 63 * 64 * Defaults to using `setTimeout`. 65 */ 66 sseSleepFn?: (ms: number) => Promise<void>; 67 url: string; 68 }; 69 70export interface StreamEvent<TData = unknown> { 71 data: TData; 72 event?: string; 73 id?: string; 74 retry?: number; 75} 76 77export type ServerSentEventsResult< 78 TData = unknown, 79 TReturn = void, 80 TNext = unknown, 81> = { 82 stream: AsyncGenerator< 83 TData extends Record<string, unknown> ? TData[keyof TData] : TData, 84 TReturn, 85 TNext 86 >; 87}; 88 89export const createSseClient = <TData = unknown>({ 90 onRequest, 91 onSseError, 92 onSseEvent, 93 responseTransformer, 94 responseValidator, 95 sseDefaultRetryDelay, 96 sseMaxRetryAttempts, 97 sseMaxRetryDelay, 98 sseSleepFn, 99 url, 100 ...options 101}: ServerSentEventsOptions): ServerSentEventsResult<TData> => { 102 let lastEventId: string | undefined; 103 104 const sleep = 105 sseSleepFn ?? 106 ((ms: number) => new Promise((resolve) => setTimeout(resolve, ms))); 107 108 const createStream = async function* () { 109 let retryDelay: number = sseDefaultRetryDelay ?? 3000; 110 let attempt = 0; 111 const signal = options.signal ?? new AbortController().signal; 112 113 while (true) { 114 if (signal.aborted) break; 115 116 attempt++; 117 118 const headers = 119 options.headers instanceof Headers 120 ? options.headers 121 : new Headers(options.headers as Record<string, string> | undefined); 122 123 if (lastEventId !== undefined) { 124 headers.set('Last-Event-ID', lastEventId); 125 } 126 127 try { 128 const requestInit: RequestInit = { 129 redirect: 'follow', 130 ...options, 131 body: options.serializedBody, 132 headers, 133 signal, 134 }; 135 let request = new Request(url, requestInit); 136 if (onRequest) { 137 request = await onRequest(url, requestInit); 138 } 139 // fetch must be assigned here, otherwise it would throw the error: 140 // TypeError: Failed to execute 'fetch' on 'Window': Illegal invocation 141 const _fetch = options.fetch ?? globalThis.fetch; 142 const response = await _fetch(request); 143 144 if (!response.ok) 145 throw new Error( 146 `SSE failed: ${response.status} ${response.statusText}`, 147 ); 148 149 if (!response.body) throw new Error('No body in SSE response'); 150 151 const reader = response.body 152 .pipeThrough(new TextDecoderStream()) 153 .getReader(); 154 155 let buffer = ''; 156 157 const abortHandler = () => { 158 try { 159 reader.cancel(); 160 } catch { 161 // noop 162 } 163 }; 164 165 signal.addEventListener('abort', abortHandler); 166 167 try { 168 while (true) { 169 const { done, value } = await reader.read(); 170 if (done) break; 171 buffer += value; 172 // Normalize line endings: CRLF -> LF, then CR -> LF 173 buffer = buffer.replace(/\r\n/g, '\n').replace(/\r/g, '\n'); 174 175 const chunks = buffer.split('\n\n'); 176 buffer = chunks.pop() ?? ''; 177 178 for (const chunk of chunks) { 179 const lines = chunk.split('\n'); 180 const dataLines: Array<string> = []; 181 let eventName: string | undefined; 182 183 for (const line of lines) { 184 if (line.startsWith('data:')) { 185 dataLines.push(line.replace(/^data:\s*/, '')); 186 } else if (line.startsWith('event:')) { 187 eventName = line.replace(/^event:\s*/, ''); 188 } else if (line.startsWith('id:')) { 189 lastEventId = line.replace(/^id:\s*/, ''); 190 } else if (line.startsWith('retry:')) { 191 const parsed = Number.parseInt( 192 line.replace(/^retry:\s*/, ''), 193 10, 194 ); 195 if (!Number.isNaN(parsed)) { 196 retryDelay = parsed; 197 } 198 } 199 } 200 201 let data: unknown; 202 let parsedJson = false; 203 204 if (dataLines.length) { 205 const rawData = dataLines.join('\n'); 206 try { 207 data = JSON.parse(rawData); 208 parsedJson = true; 209 } catch { 210 data = rawData; 211 } 212 } 213 214 if (parsedJson) { 215 if (responseValidator) { 216 await responseValidator(data); 217 } 218 219 if (responseTransformer) { 220 data = await responseTransformer(data); 221 } 222 } 223 224 onSseEvent?.({ 225 data, 226 event: eventName, 227 id: lastEventId, 228 retry: retryDelay, 229 }); 230 231 if (dataLines.length) { 232 yield data as any; 233 } 234 } 235 } 236 } finally { 237 signal.removeEventListener('abort', abortHandler); 238 reader.releaseLock(); 239 } 240 241 break; // exit loop on normal completion 242 } catch (error) { 243 // connection failed or aborted; retry after delay 244 onSseError?.(error); 245 246 if ( 247 sseMaxRetryAttempts !== undefined && 248 attempt >= sseMaxRetryAttempts 249 ) { 250 break; // stop after firing error 251 } 252 253 // exponential backoff: double retry each attempt, cap at 30s 254 const backoff = Math.min( 255 retryDelay * 2 ** (attempt - 1), 256 sseMaxRetryDelay ?? 30000, 257 ); 258 await sleep(backoff); 259 } 260 } 261 }; 262 263 const stream = createStream(); 264 265 return { stream }; 266};