// Fork of hey-api/openapi-ts, maintained because some additional features are needed.
// This file is auto-generated by @hey-api/openapi-ts
2
3import type { Config } from './types.gen'
4
/**
 * Options accepted by `createSseClient`.
 *
 * Combines the standard `RequestInit` fields (except `method`, which comes
 * from `Config`) with the SSE-specific hooks and retry settings below.
 *
 * @typeParam TData Type of the `data` payload delivered to `onSseEvent`.
 */
export type ServerSentEventsOptions<TData = unknown> = Omit<RequestInit, 'method'> &
  Pick<Config, 'method' | 'responseTransformer' | 'responseValidator'> & {
    /**
     * Fetch API implementation. You can use this option to provide a custom
     * fetch instance.
     *
     * @default globalThis.fetch
     */
    fetch?: typeof fetch
    /**
     * Implementing clients can call request interceptors inside this hook.
     *
     * When provided, the `Request` it resolves to is the one that gets sent.
     *
     * @param url The target URL.
     * @param init The request init assembled for this connection attempt.
     * @returns The request to send.
     */
    onRequest?: (url: string, init: RequestInit) => Promise<Request>
    /**
     * Callback invoked when a network or parsing error occurs during streaming.
     *
     * This option applies only if the endpoint returns a stream of events.
     *
     * @param error The error that occurred.
     */
    onSseError?: (error: unknown) => void
    /**
     * Callback invoked when an event is streamed from the server.
     *
     * This option applies only if the endpoint returns a stream of events.
     *
     * @param event Event streamed from the server.
     * @returns Nothing (void).
     */
    onSseEvent?: (event: StreamEvent<TData>) => void
    /**
     * Already-serialized request body. It is sent as the `body` of the
     * underlying request.
     */
    serializedBody?: RequestInit['body']
    /**
     * Default retry delay in milliseconds.
     *
     * This option applies only if the endpoint returns a stream of events.
     *
     * @default 3000
     */
    sseDefaultRetryDelay?: number
    /**
     * Maximum number of retry attempts before giving up.
     *
     * When omitted, failed connections are retried without an attempt limit.
     */
    sseMaxRetryAttempts?: number
    /**
     * Maximum retry delay in milliseconds.
     *
     * Applies only when exponential backoff is used.
     *
     * This option applies only if the endpoint returns a stream of events.
     *
     * @default 30000
     */
    sseMaxRetryDelay?: number
    /**
     * Optional sleep function for retry backoff.
     *
     * Defaults to using `setTimeout`.
     */
    sseSleepFn?: (ms: number) => Promise<void>
    /**
     * URL of the event stream endpoint.
     */
    url: string
  }
66
/**
 * A single event as reported to the `onSseEvent` callback.
 *
 * @typeParam TData Type of the event payload.
 */
export interface StreamEvent<TData = unknown> {
  /** Payload assembled from the event's `data:` lines. */
  data: TData
  /** Value of the `event:` field, when the event carried one. */
  event?: string
  /** Most recent `id:` value seen on the stream, if any. */
  id?: string
  /** Retry delay in milliseconds in effect when the event was dispatched. */
  retry?: number
}
73
/**
 * Value returned by `createSseClient`.
 *
 * @typeParam TData Declared response type. When it is a record type, the
 * stream yields the union of its property value types; otherwise it yields
 * `TData` itself.
 * @typeParam TReturn Type of the generator's return value.
 * @typeParam TNext Type accepted by the generator's `next()`.
 */
export type ServerSentEventsResult<TData = unknown, TReturn = void, TNext = unknown> = {
  /** Async generator producing the payload of each streamed event. */
  stream: AsyncGenerator<
    TData extends Record<string, unknown> ? TData[keyof TData] : TData,
    TReturn,
    TNext
  >
}
81
/**
 * Creates a Server-Sent Events client for a single endpoint.
 *
 * The returned `stream` is a lazy async generator: nothing is requested until
 * it is iterated. Each connection attempt sends the request (adding a
 * `Last-Event-ID` header once an event id has been seen), parses the
 * `text/event-stream` body and yields the payload of every event that has
 * `data:` lines. Payloads that parse as JSON are passed through
 * `responseValidator` and `responseTransformer`; anything else is yielded as
 * the raw string.
 *
 * If the connection fails, `onSseError` is called and the request is retried
 * with exponential backoff until `sseMaxRetryAttempts` is reached (no limit
 * when omitted). The stream ends when the response body completes or when
 * `options.signal` is aborted.
 *
 * @param options See {@link ServerSentEventsOptions}.
 * @returns An object whose `stream` property is the async generator.
 */
export const createSseClient = <TData = unknown>({
  onRequest,
  onSseError,
  onSseEvent,
  responseTransformer,
  responseValidator,
  sseDefaultRetryDelay,
  sseMaxRetryAttempts,
  sseMaxRetryDelay,
  sseSleepFn,
  url,
  ...options
}: ServerSentEventsOptions): ServerSentEventsResult<TData> => {
  let lastEventId: string | undefined

  const sleep = sseSleepFn ?? ((ms: number) => new Promise((resolve) => setTimeout(resolve, ms)))

  // Returns the value of a `field:value` line. Per the SSE format only a
  // single leading space after the colon is dropped; any further whitespace
  // belongs to the value.
  const fieldValue = (line: string, field: string): string => {
    const value = line.slice(field.length + 1)
    return value.startsWith(' ') ? value.slice(1) : value
  }

  const createStream = async function* () {
    let retryDelay: number = sseDefaultRetryDelay ?? 3000
    let attempt = 0
    const signal = options.signal ?? new AbortController().signal

    while (true) {
      if (signal.aborted) break

      attempt++

      // Always copy, so a caller-supplied Headers instance is never mutated
      // by the Last-Event-ID bookkeeping below.
      const headers = new Headers(options.headers as HeadersInit | undefined)

      if (lastEventId !== undefined) {
        headers.set('Last-Event-ID', lastEventId)
      }

      try {
        const requestInit: RequestInit = {
          redirect: 'follow',
          ...options,
          body: options.serializedBody,
          headers,
          signal
        }
        // Only build a Request ourselves when no hook supplies one.
        const request = onRequest
          ? await onRequest(url, requestInit)
          : new Request(url, requestInit)
        // fetch must be assigned here, otherwise it would throw the error:
        // TypeError: Failed to execute 'fetch' on 'Window': Illegal invocation
        const _fetch = options.fetch ?? globalThis.fetch
        const response = await _fetch(request)

        if (!response.ok) throw new Error(`SSE failed: ${response.status} ${response.statusText}`)

        if (!response.body) throw new Error('No body in SSE response')

        const reader = response.body.pipeThrough(new TextDecoderStream()).getReader()

        let buffer = ''

        // Best-effort cancellation. `cancel()` reports failure through the
        // returned promise, so the rejection has to be handled there; the
        // surrounding try only guards non-standard stream implementations.
        const cancelReader = () => {
          try {
            reader.cancel().catch(() => {
              // noop
            })
          } catch {
            // noop
          }
        }

        signal.addEventListener('abort', cancelReader)

        try {
          while (true) {
            const { done, value } = await reader.read()
            if (!done) buffer += value

            // A trailing CR may be the first half of a CRLF whose LF arrives
            // in the next chunk. Hold it back until more input (or the end
            // of the stream) shows which it is; normalizing it right away
            // would turn a split CRLF into a blank line.
            let heldCr = ''
            if (!done && buffer.endsWith('\r')) {
              heldCr = '\r'
              buffer = buffer.slice(0, -1)
            }

            // Normalize line endings: CRLF -> LF, then CR -> LF
            buffer = buffer.replace(/\r\n/g, '\n').replace(/\r/g, '\n')

            // Events are separated by a blank line; the last piece is an
            // incomplete event and stays in the buffer.
            const chunks = buffer.split('\n\n')
            buffer = (chunks.pop() ?? '') + heldCr

            for (const chunk of chunks) {
              const dataLines: Array<string> = []
              let eventName: string | undefined

              for (const line of chunk.split('\n')) {
                if (line.startsWith('data:')) {
                  dataLines.push(fieldValue(line, 'data'))
                } else if (line.startsWith('event:')) {
                  eventName = fieldValue(line, 'event')
                } else if (line.startsWith('id:')) {
                  lastEventId = fieldValue(line, 'id')
                } else if (line.startsWith('retry:')) {
                  // Accept only plain digit strings, so negative or partly
                  // numeric values cannot end up as the reconnection delay.
                  const rawRetry = line.slice('retry:'.length).trim()
                  if (/^\d+$/.test(rawRetry)) {
                    retryDelay = Number.parseInt(rawRetry, 10)
                  }
                }
              }

              let data: unknown
              let parsedJson = false

              if (dataLines.length) {
                const rawData = dataLines.join('\n')
                try {
                  data = JSON.parse(rawData)
                  parsedJson = true
                } catch {
                  // not JSON; hand the raw text to the consumer
                  data = rawData
                }
              }

              // Validation and transformation only make sense for JSON payloads.
              if (parsedJson) {
                if (responseValidator) {
                  await responseValidator(data)
                }

                if (responseTransformer) {
                  data = await responseTransformer(data)
                }
              }

              onSseEvent?.({
                data,
                event: eventName,
                id: lastEventId,
                retry: retryDelay
              })

              if (dataLines.length) {
                yield data
              }
            }

            if (done) break
          }
        } finally {
          signal.removeEventListener('abort', cancelReader)
          // Also reached when the consumer stops iterating early; cancel so
          // the underlying connection does not stay open.
          cancelReader()
          reader.releaseLock()
        }

        break // exit loop on normal completion
      } catch (error) {
        // connection failed or aborted
        onSseError?.(error)

        // An aborted signal ends the stream; do not wait out a backoff first.
        if (signal.aborted) break

        if (sseMaxRetryAttempts !== undefined && attempt >= sseMaxRetryAttempts) {
          break // stop after firing error
        }

        // exponential backoff: double the delay each attempt, capped at
        // sseMaxRetryDelay (30s by default)
        const backoff = Math.min(retryDelay * 2 ** (attempt - 1), sseMaxRetryDelay ?? 30000)
        await sleep(backoff)
      }
    }
  }

  // The generator yields `unknown`; the payload type is the caller's claim.
  const stream = createStream() as ServerSentEventsResult<TData>['stream']

  return { stream }
}