Fork of hey-api/openapi-ts, maintained because I need some additional features.
at main 243 lines 7.2 kB view raw
// This file is auto-generated by @hey-api/openapi-ts

import type { Config } from './types.gen'

/**
 * Options accepted by `createSseClient`: the standard `RequestInit` fields
 * (except `method`, which comes from `Config`) plus SSE-specific hooks and
 * retry settings.
 */
export type ServerSentEventsOptions<TData = unknown> = Omit<RequestInit, 'method'> &
  Pick<Config, 'method' | 'responseTransformer' | 'responseValidator'> & {
    /**
     * Fetch API implementation. You can use this option to provide a custom
     * fetch instance.
     *
     * @default globalThis.fetch
     */
    fetch?: typeof fetch
    /**
     * Implementing clients can call request interceptors inside this hook.
     * When provided, the `Request` it resolves to is the one that gets fetched.
     */
    onRequest?: (url: string, init: RequestInit) => Promise<Request>
    /**
     * Callback invoked when a network or parsing error occurs during streaming.
     *
     * This option applies only if the endpoint returns a stream of events.
     *
     * @param error The error that occurred.
     */
    onSseError?: (error: unknown) => void
    /**
     * Callback invoked when an event is streamed from the server.
     *
     * This option applies only if the endpoint returns a stream of events.
     *
     * @param event Event streamed from the server.
     * @returns Nothing (void).
     */
    onSseEvent?: (event: StreamEvent<TData>) => void
    /**
     * Already-serialized request body; sent as the `body` of the request.
     */
    serializedBody?: RequestInit['body']
    /**
     * Default retry delay in milliseconds.
     *
     * This option applies only if the endpoint returns a stream of events.
     *
     * @default 3000
     */
    sseDefaultRetryDelay?: number
    /**
     * Maximum number of retry attempts before giving up.
     */
    sseMaxRetryAttempts?: number
    /**
     * Maximum retry delay in milliseconds.
     *
     * Applies only when exponential backoff is used.
     *
     * This option applies only if the endpoint returns a stream of events.
     *
     * @default 30000
     */
    sseMaxRetryDelay?: number
    /**
     * Optional sleep function for retry backoff.
     *
     * Defaults to using `setTimeout`.
     */
    sseSleepFn?: (ms: number) => Promise<void>
    /**
     * URL the event stream is requested from.
     */
    url: string
  }

/**
 * A single parsed server-sent event as passed to `onSseEvent`.
 */
export interface StreamEvent<TData = unknown> {
  data: TData
  event?: string
  id?: string
  retry?: number
}

/**
 * Result of `createSseClient`: an async generator over the data of the
 * received events.
 */
export type ServerSentEventsResult<TData = unknown, TReturn = void, TNext = unknown> = {
  stream: AsyncGenerator<
    TData extends Record<string, unknown> ? TData[keyof TData] : TData,
    TReturn,
    TNext
  >
}

/**
 * Opens a Server-Sent Events connection and exposes the received events as an
 * async generator.
 *
 * The request is issued lazily, once the generator is iterated. If the request
 * fails, the response is not OK or has no body, or handling an event throws
 * (for example in `responseValidator`), `onSseError` is called and the
 * connection is retried with exponential backoff until `sseMaxRetryAttempts`
 * is reached or the signal is aborted. Once an `id:` field has been received,
 * reconnects send it as the `Last-Event-ID` header. The generator finishes
 * without reconnecting when the server ends the stream normally.
 *
 * The joined `data:` lines of each event are JSON-parsed when possible (then
 * passed through `responseValidator` and `responseTransformer`); otherwise the
 * raw string is used. `onSseEvent` is called for every event block, and the
 * generator yields the data of every block that has at least one `data:` line.
 *
 * @returns An object whose `stream` property is the async generator.
 */
export const createSseClient = <TData = unknown>({
  onRequest,
  onSseError,
  onSseEvent,
  responseTransformer,
  responseValidator,
  sseDefaultRetryDelay,
  sseMaxRetryAttempts,
  sseMaxRetryDelay,
  sseSleepFn,
  url,
  ...options
}: ServerSentEventsOptions): ServerSentEventsResult<TData> => {
  let lastEventId: string | undefined

  const sleep =
    sseSleepFn ?? ((ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms)))

  // Value of an SSE field line: everything after the field prefix, minus at
  // most ONE leading space (any further whitespace is part of the value).
  const fieldValue = (line: string, fieldPrefix: string): string => {
    const value = line.slice(fieldPrefix.length)
    return value.startsWith(' ') ? value.slice(1) : value
  }

  const createStream = async function* () {
    let retryDelay: number = sseDefaultRetryDelay ?? 3000
    let attempt = 0
    const signal = options.signal ?? new AbortController().signal

    while (true) {
      if (signal.aborted) break

      attempt++

      // Copy the headers so that adding Last-Event-ID never mutates a Headers
      // instance owned by the caller.
      const headers = new Headers(options.headers)

      if (lastEventId !== undefined) {
        headers.set('Last-Event-ID', lastEventId)
      }

      try {
        const requestInit: RequestInit = {
          redirect: 'follow',
          ...options,
          body: options.serializedBody,
          headers,
          signal
        }
        // Only build a Request ourselves when no hook supplies one
        const request = onRequest
          ? await onRequest(url, requestInit)
          : new Request(url, requestInit)
        // fetch must be assigned here, otherwise it would throw the error:
        // TypeError: Failed to execute 'fetch' on 'Window': Illegal invocation
        const _fetch = options.fetch ?? globalThis.fetch
        const response = await _fetch(request)

        if (!response.ok) throw new Error(`SSE failed: ${response.status} ${response.statusText}`)

        if (!response.body) throw new Error('No body in SSE response')

        const reader = response.body.pipeThrough(new TextDecoderStream()).getReader()

        let buffer = ''
        // True when the previous chunk ended in CR: it may be the first half of
        // a CRLF pair, so it is held back until the next chunk (or the end of
        // the stream) shows how to interpret it.
        let pendingCr = false

        const cancelReader = () => {
          try {
            // cancel() reports failure through its promise, not by throwing
            reader.cancel().catch(() => {
              // noop
            })
          } catch {
            // noop
          }
        }

        signal.addEventListener('abort', cancelReader)

        try {
          let streamEnded = false

          while (!streamEnded) {
            const { done, value } = await reader.read()

            if (done) {
              streamEnded = true
              if (!pendingCr) break
              // A CR held back at the very end of the stream is a line ending
              pendingCr = false
              buffer += '\n'
            } else {
              let text = pendingCr ? `\r${value}` : value
              pendingCr = text.endsWith('\r')
              if (pendingCr) text = text.slice(0, -1)
              // Normalize line endings: CRLF -> LF and lone CR -> LF
              buffer += text.replace(/\r\n?/g, '\n')
            }

            // Events are separated by a blank line; the last piece is an
            // incomplete event that stays in the buffer.
            const chunks = buffer.split('\n\n')
            buffer = chunks.pop() ?? ''

            for (const chunk of chunks) {
              const dataLines: Array<string> = []
              let eventName: string | undefined

              for (const line of chunk.split('\n')) {
                if (line.startsWith('data:')) {
                  dataLines.push(fieldValue(line, 'data:'))
                } else if (line.startsWith('event:')) {
                  eventName = fieldValue(line, 'event:')
                } else if (line.startsWith('id:')) {
                  lastEventId = fieldValue(line, 'id:')
                } else if (line.startsWith('retry:')) {
                  const parsed = Number.parseInt(fieldValue(line, 'retry:'), 10)
                  if (!Number.isNaN(parsed)) {
                    retryDelay = parsed
                  }
                }
              }

              let data: unknown
              let parsedJson = false

              if (dataLines.length) {
                const rawData = dataLines.join('\n')
                try {
                  data = JSON.parse(rawData)
                  parsedJson = true
                } catch {
                  // not JSON: hand the raw text to the consumer
                  data = rawData
                }
              }

              // Validation and transformation only apply to JSON payloads
              if (parsedJson) {
                if (responseValidator) {
                  await responseValidator(data)
                }

                if (responseTransformer) {
                  data = await responseTransformer(data)
                }
              }

              onSseEvent?.({
                data,
                event: eventName,
                id: lastEventId,
                retry: retryDelay
              })

              if (dataLines.length) {
                // The generator's element type is fixed by the declared return
                // type of createSseClient; the payload itself is untyped here.
                // eslint-disable-next-line @typescript-eslint/no-explicit-any
                yield data as any
              }
            }
          }
        } finally {
          signal.removeEventListener('abort', cancelReader)
          // Close the connection when the consumer stops iterating early or an
          // error interrupts processing; a no-op once the stream has ended.
          cancelReader()
          reader.releaseLock()
        }

        break // exit loop on normal completion
      } catch (error) {
        // connection failed or aborted
        onSseError?.(error)

        // No retry follows an abort, so do not wait out the backoff first
        if (signal.aborted) break

        if (sseMaxRetryAttempts !== undefined && attempt >= sseMaxRetryAttempts) {
          break // stop after firing error
        }

        // exponential backoff: double the delay each attempt, capped at
        // sseMaxRetryDelay (30s by default)
        const backoff = Math.min(retryDelay * 2 ** (attempt - 1), sseMaxRetryDelay ?? 30000)
        await sleep(backoff)
      }
    }
  }

  const stream = createStream()

  return { stream }
}