Batch individual queries into a single request — jsr.io/@mary/batch-fetch
typescript jsr
at trunk 7.2 kB view raw
type Promisable<T> = T | Promise<T>;

/** identifies a resource */
export type ResourceId = string | number;

/** batch grouping key, `undefined` when no `key` function was provided */
type BatchKey = string | number | undefined;

/**
 * a promise together with the functions that settle it; same shape as the
 * object returned by ES2024's `Promise.withResolvers()`
 */
type Deferred<T> = {
	promise: Promise<T>;
	resolve: (value: T) => void;
	reject: (reason?: unknown) => void;
};

/** bookkeeping for one (deduplicated) query within a batch */
type PendingQuery<Resource, Query> = {
	/** original query descriptor */
	query: Query;
	/** promise that will resolve with the resource */
	deferred: Deferred<Resource>;
	/** whether we have a consumer without an abort signal */
	passive: boolean;
	/** amount of consumers with an abort signal */
	signals: number;
};

type BatchedFetchMap<Id extends ResourceId, Resource, Query = Id> = {
	/** grouping key */
	key: BatchKey;
	/** timer for batch execution */
	timeout: ReturnType<typeof setTimeout> | undefined;
	/** controls the lifecycle of this batch; aborted once the batch is done or abandoned */
	controller: AbortController;
	/** pending queries, keyed by resource identifier */
	pending: Map<Id, PendingQuery<Resource, Query>>;
};

type BaseOptions<Id extends ResourceId, Resource, Query = Id> = {
	/**
	 * maximum number of distinct queries that can be included in one request
	 */
	limit: number;

	/**
	 * how long to wait (in milliseconds) for new queries to be collected before
	 * a request is made; the wait restarts whenever a query is added to the batch
	 * @default 125
	 */
	timeout?: number;

	/**
	 * performs the request
	 * @param queries queries passed for this batch
	 * @param signal abort signal, aborted if every consumer of this batch has
	 *   aborted
	 * @returns array of resources; queries without a matching resource in this
	 *   array are rejected with `ResourceMissingError`
	 */
	fetch: (queries: Query[], signal: AbortSignal) => Promisable<Resource[]>;

	/**
	 * optional function for separating queries into different batches; queries
	 * with different keys are never sent in the same request
	 * @param query query descriptor
	 * @returns batch grouping key
	 */
	key?: (query: Query) => string | number;

	/**
	 * function that returns the identifier of a resource, used to associate the
	 * resources in a response with the queries that requested them
	 * @param resource resource
	 * @returns resource identifier
	 */
	idFromResource: (resource: Resource) => Id;
};

/** options for batch fetching */
export type BatchedFetchOptions<Id extends ResourceId, Resource, Query = Id> =
	& BaseOptions<Id, Resource, Query>
	& (Query extends Id ? {
			/**
			 * function that extracts the resource identifier from the query descriptor,
			 * used for deduplication and resource matching. when omitted, the query
			 * itself is used as the identifier.
			 * @param query query descriptor
			 * @returns resource identifier
			 */
			idFromQuery?: (query: Query) => Id;
		}
		: {
			/**
			 * function that extracts the resource identifier from the query descriptor,
			 * used for deduplication and resource matching.
			 * @param query query descriptor
			 * @returns resource identifier
			 */
			idFromQuery: (query: Query) => Id;
		});

/** error thrown when a resource wasn't returned in the response */
export class ResourceMissingError extends Error {
	override readonly name = 'ResourceMissingError';
}

const identity = <T>(value: T): T => value;

/**
 * creates a deferred promise. behaves like `Promise.withResolvers()`, which is an
 * ES2024 addition that older runtimes (e.g. Node.js before 22) do not provide.
 */
const createDeferred = <T>(): Deferred<T> => {
	let resolve!: (value: T) => void;
	let reject!: (reason?: unknown) => void;

	// the executor runs synchronously, so both functions are assigned before return
	const promise = new Promise<T>((res, rej) => {
		resolve = res;
		reject = rej;
	});

	return { promise, resolve, reject };
};

/** registers a new query within a batch, with no consumers attached yet */
const addPendingQuery = <Id extends ResourceId, Resource, Query>(
	batch: BatchedFetchMap<Id, Resource, Query>,
	id: Id,
	query: Query,
): PendingQuery<Resource, Query> => {
	const meta: PendingQuery<Resource, Query> = {
		query,
		deferred: createDeferred<Resource>(),
		passive: false,
		signals: 0,
	};

	batch.pending.set(id, meta);
	return meta;
};

/**
 * creates a function that batches individual queries into one single request.
 *
 * queries are collected per grouping key, and a batch is dispatched once no new
 * query has been added to it for `timeout` milliseconds. queries that resolve to
 * the same identifier share a single entry (and result) within a batch.
 *
 * @param options configurations
 * @returns a function that you can use to request for a query. its promise
 *   rejects with the signal's reason if the provided signal is aborted, with
 *   `ResourceMissingError` if the response lacked the resource, or with whatever
 *   `fetch` threw. it throws synchronously if the signal is already aborted.
 */
/*#__NO_SIDE_EFFECTS__*/
export const createBatchedFetch = <Id extends ResourceId, Resource, Query = Id>(
	options: BatchedFetchOptions<Id, Resource, Query>,
): (query: Query, signal?: AbortSignal) => Promise<Resource> => {
	const {
		limit,
		timeout = 125,
		fetch,
		key: _key,
		idFromQuery = identity,
		idFromResource,
	} = options;

	/** batches that are still collecting queries, at most one per grouping key */
	const batches = new Map<BatchKey, BatchedFetchMap<Id, Resource, Query>>();

	/** stops a batch from accepting new queries, if it still is */
	const release = (batch: BatchedFetchMap<Id, Resource, Query>): void => {
		if (batches.get(batch.key) === batch) {
			batches.delete(batch.key);
		}
	};

	/** returns the collecting batch that `id` should join, creating one if needed */
	const acquire = (key: BatchKey, id: Id): BatchedFetchMap<Id, Resource, Query> => {
		const existing = batches.get(key);

		// an existing batch is reusable if it still has room, or if it already
		// holds this identifier (joining an existing entry doesn't grow the batch)
		if (existing !== undefined && (existing.pending.size < limit || existing.pending.has(id))) {
			return existing;
		}

		// a full batch being replaced here keeps its own timer and still dispatches
		const created: BatchedFetchMap<Id, Resource, Query> = {
			key,
			timeout: undefined,
			controller: new AbortController(),
			pending: new Map(),
		};

		batches.set(key, created);
		return created;
	};

	return (query: Query, signal?: AbortSignal): Promise<Resource> => {
		// throw early if provided signal is already aborted
		signal?.throwIfAborted();

		const id = idFromQuery(query);
		const key = _key?.(query);

		const batch = acquire(key, id);
		const meta = batch.pending.get(id) ?? addPendingQuery(batch, id, query);

		let promise = meta.deferred.promise;

		if (signal === undefined) {
			// this consumer provided no signal, so we can't consider this query for
			// removal if a different consumer has aborted theirs.
			meta.passive = true;
		} else {
			// we need the returned promise to settle early if the signal is aborted,
			// so we'll race it against a deferred that will only ever reject.
			const abortion = createDeferred<never>();
			promise = Promise.race([promise, abortion.promise]);

			// make this signal count
			meta.signals++;

			signal.addEventListener(
				'abort',
				() => {
					// immediately reject this consumer's promise
					abortion.reject(signal.reason);
					meta.signals--;

					// keep the query in the batch if:
					// - we have passive consumers waiting on this query
					// - there are still other consumers with an abort signal waiting
					if (meta.passive || meta.signals > 0) {
						return;
					}

					// no more consumers care about this query, remove from batch
					batch.pending.delete(id);

					// keep the batch going if it still has other queries to process
					if (batch.pending.size > 0) {
						return;
					}

					// batch is empty: abort an in-flight request (or cancel the
					// pending timer) and stop accepting queries into it
					batch.controller.abort();
					clearTimeout(batch.timeout);
					release(batch);
				},
				{
					once: true,
					// detaches this listener once the batch has finished or been abandoned
					signal: batch.controller.signal,
				},
			);
		}

		// (re)start the execution timer, the batch is dispatched once it goes idle
		clearTimeout(batch.timeout);
		batch.timeout = setTimeout(() => {
			release(batch);
			void perform(batch, fetch, idFromResource);
		}, timeout);

		return promise;
	};
};

/**
 * sends a batch's queries through `fetch` and settles every pending query's
 * promise with the outcome.
 * @param map the batch to perform
 * @param fetch performs the request
 * @param idFromResource maps a returned resource back to its identifier
 */
const perform = async <Id extends ResourceId, Resource, Query = Id>(
	map: BatchedFetchMap<Id, Resource, Query>,
	fetch: (queries: Query[], signal: AbortSignal) => Promisable<Resource[]>,
	idFromResource: (data: Resource) => Id,
): Promise<void> => {
	const signal = map.controller.signal;
	const pending = map.pending;

	// nothing to do if every consumer has already abandoned this batch. the
	// pending map should only be empty in that case, but better be safe.
	if (signal.aborted || pending.size === 0) {
		return;
	}

	try {
		const queries = Array.from(pending.values(), (meta) => meta.query);
		const dataset = await fetch(queries, signal);

		for (const data of dataset) {
			pending.get(idFromResource(data))?.deferred.resolve(data);
		}

		// reject whatever wasn't resolved above, else those promises would end up
		// waiting forever. this applies when the API handles nonexistent data by
		// omitting it from the results. rejecting an already-resolved promise is
		// a no-op, so entries resolved above are unaffected.
		for (const [id, meta] of pending) {
			meta.deferred.reject(new ResourceMissingError(`resource ${id} was not returned in the response`));
		}
	} catch (error) {
		for (const meta of pending.values()) {
			meta.deferred.reject(error);
		}
	} finally {
		// abort the controller to clean up event listeners on upstream signals
		map.controller.abort();
	}
};