batch individual queries into one single request jsr.io/@mary/batch-fetch
typescript jsr

initial commit

mary.my.id 8396fcdc

+5
.vscode/extensions.json
··· 1 + { 2 + "recommendations": [ 3 + "denoland.vscode-deno" 4 + ] 5 + }
+4
.vscode/settings.json
··· 1 + { 2 + "editor.defaultFormatter": "denoland.vscode-deno", 3 + "deno.enable": true 4 + }
+28
LICENSE
··· 1 + BSD 3-Clause License 2 + 3 + Copyright (c) 2025, Mary 4 + 5 + Redistribution and use in source and binary forms, with or without 6 + modification, are permitted provided that the following conditions are met: 7 + 8 + 1. Redistributions of source code must retain the above copyright notice, this 9 + list of conditions and the following disclaimer. 10 + 11 + 2. Redistributions in binary form must reproduce the above copyright notice, 12 + this list of conditions and the following disclaimer in the documentation 13 + and/or other materials provided with the distribution. 14 + 15 + 3. Neither the name of the copyright holder nor the names of its 16 + contributors may be used to endorse or promote products derived from 17 + this software without specific prior written permission. 18 + 19 + THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 20 + AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 21 + IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE 22 + DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE 23 + FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 24 + DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR 25 + SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 26 + CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, 27 + OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 28 + OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+35
README.md
··· 1 + # batch-fetch 2 + 3 + utility for batching individual queries into one single request. 4 + 5 + ```ts 6 + type UserId = ReturnType<typeof crypto.randomUUID>; 7 + interface User { 8 + id: UserId; 9 + name: string; 10 + } 11 + 12 + const fetchUser = createBatchedFetch<UserId, User>({ 13 + limit: 50, 14 + async fetch(userIds, signal) { 15 + const url = new URL(`/api/users`, location.origin); 16 + for (const id of userIds) { 17 + url.searchParams.append('ids', id); 18 + } 19 + 20 + const response = await fetch(url, { signal }); 21 + 22 + return await response.json() as unknown as User[]; 23 + }, 24 + idFromResource: (user) => user.id, 25 + }); 26 + 27 + // these individual queries will be batched into one 28 + { 29 + const p1 = fetchUser('c83e7271-c865-4eae-8c6a-d6f21acb17c1'); 30 + const p2 = fetchUser('306e38a2-bea6-4bf7-80ea-21822aa24c40'); 31 + 32 + const [user1, user2] = await Promise.all([p1, p2]); 33 + // ... 34 + } 35 + ```
+16
deno.json
··· 1 + { 2 + "name": "@mary/batch-fetch", 3 + "version": "0.1.0", 4 + "license": "BSD-3-Clause", 5 + "exports": "./lib/mod.ts", 6 + "fmt": { 7 + "useTabs": true, 8 + "indentWidth": 2, 9 + "lineWidth": 110, 10 + "semiColons": true, 11 + "singleQuote": true 12 + }, 13 + "publish": { 14 + "include": ["lib/", "LICENSE", "README.md", "deno.json"] 15 + } 16 + }
+270
lib/mod.ts
··· 1 + type Promisable<T> = T | Promise<T>; 2 + 3 + /** identifies a resource */ 4 + export type ResourceId = string | number; 5 + 6 + type BatchedFetchMap<Id extends ResourceId, Resource, Query = Id> = { 7 + /** grouping key */ 8 + key: string | number | undefined; 9 + /** timer for batch execution */ 10 + timeout: ReturnType<typeof setTimeout> | undefined; 11 + /** controls the lifecycle of this batch */ 12 + controller: AbortController; 13 + /** a map of pending queries */ 14 + pending: Map< 15 + Id, 16 + { 17 + /** original query descriptor */ 18 + query: Query; 19 + /** promise that will resolve with the resource */ 20 + deferred: PromiseWithResolvers<Resource>; 21 + /** whether we have a consumer without an abort signal */ 22 + passive: boolean; 23 + /** amount of consumers with an abort signal */ 24 + signals: number; 25 + } 26 + >; 27 + }; 28 + 29 + type BaseOptions<Id extends ResourceId, Resource, Query = Id> = { 30 + /** 31 + * maximum number of queries that can be included in one request 32 + */ 33 + limit: number; 34 + 35 + /** 36 + * how long to wait for new queries to be collected before a request 37 + * @default 125 38 + */ 39 + timeout?: number; 40 + 41 + /** 42 + * performs the request 43 + * @param queries queries passed for this batch 44 + * @param signal abort signal 45 + * @returns array of resources 46 + */ 47 + fetch: (queries: Query[], signal: AbortSignal) => Promisable<Resource[]>; 48 + 49 + /** 50 + * optional function for separating queries into different batch 51 + * @param query query descriptor 52 + * @returns batch grouping key 53 + */ 54 + key?: (query: Query) => string | number; 55 + 56 + /** 57 + * function that takes in the resource's identifier, used to associate 58 + * resources with the queries 59 + * @param resource resource 60 + * @returns resource identifier 61 + */ 62 + idFromResource: (resource: Resource) => Id; 63 + }; 64 + 65 + /** options for batch fetching */ 66 + export type BatchedFetchOptions<Id extends ResourceId, Resource, Query = Id> = 67 + & BaseOptions<Id, Resource, Query> 68 + & (Query extends Id ? { 69 + /** 70 + * function that takes in the resource identifier from the query descriptor, 71 + * used for deduplication and resource matching. 72 + * @param query query descriptor 73 + * @returns resource identifier 74 + */ 75 + idFromQuery?: (query: Query) => Id; 76 + } 77 + : { 78 + /** 79 + * function that takes in the resource identifier from the query descriptor, 80 + * used for deduplication and resource matching. 81 + * @param query query descriptor 82 + * @returns resource identifier 83 + */ 84 + idFromQuery: (query: Query) => Id; 85 + }); 86 + 87 + /** error thrown when a resource wasn't returned in the response */ 88 + export class ResourceMissingError extends Error { 89 + override readonly name = 'ResourceMissingError'; 90 + } 91 + 92 + const identity = <T>(value: T): T => value; 93 + 94 + /** 95 + * creates a function that batches individual queries into one single request. 96 + * @param options configurations 97 + * @returns a function that you can use to request for a query. 98 + */ 99 + /*#__NO_SIDE_EFFECTS__*/ 100 + export const createBatchedFetch = <Id extends ResourceId, Resource, Query = Id>( 101 + options: BatchedFetchOptions<Id, Resource, Query>, 102 + ): (query: Query, signal?: AbortSignal) => Promise<Resource> => { 103 + const { 104 + limit, 105 + timeout = 125, 106 + fetch, 107 + key: _key, 108 + idFromQuery = identity, 109 + idFromResource, 110 + } = options; 111 + 112 + /** current active batch */ 113 + let curr: BatchedFetchMap<Id, Resource, Query> | undefined; 114 + 115 + return (query: Query, signal?: AbortSignal): Promise<Resource> => { 116 + // throw early if provided signal is already aborted 117 + signal?.throwIfAborted(); 118 + 119 + const id = idFromQuery(query); 120 + const key = _key?.(query); 121 + 122 + // create a new batch if: 123 + // - we don't have a batch currently waiting 124 + // - the current batch has already reached the limit 125 + // - batch key doesn't match 126 + let batch = curr; 127 + if (batch === undefined || batch.pending.size >= limit || batch.key !== key) { 128 + batch = curr = { 129 + key, 130 + timeout: undefined, 131 + controller: new AbortController(), 132 + pending: new Map(), 133 + }; 134 + } 135 + 136 + let meta = batch.pending.get(id); 137 + if (meta === undefined) { 138 + meta = { 139 + query: query, 140 + deferred: Promise.withResolvers(), 141 + passive: false, 142 + signals: 0, 143 + }; 144 + 145 + batch.pending.set(id, meta); 146 + } 147 + 148 + let promise = meta.deferred.promise; 149 + 150 + if (signal === undefined) { 151 + // this consumer provided no signal, so we can't consider this query for 152 + // removal if a different consumer has aborted theirs. 153 + meta.passive = true; 154 + } else { 155 + // we need the returned promise to resolve early if the signal is aborted. 156 + // so we'll race it with this deferred that will only throw. 157 + const def = Promise.withResolvers<never>(); 158 + promise = Promise.race([promise, def.promise]); 159 + 160 + // make this signal count 161 + meta.signals++; 162 + 163 + signal.addEventListener( 164 + 'abort', 165 + () => { 166 + // immediately reject this consumer's promise 167 + def.reject(signal.reason); 168 + 169 + // decrement the count 170 + meta.signals--; 171 + 172 + // return early, have the query remain in batch if: 173 + // - we have passive consumers waiting on this query 174 + // - there are still other consumers with an abort signal waiting 175 + if (meta.passive || meta.signals > 0) { 176 + return; 177 + } 178 + 179 + // no more consumers care about this query, remove from batch 180 + batch.pending.delete(id); 181 + 182 + // return early, have the batch continue execution if we still need 183 + // to process other queries. 184 + if (batch.pending.size > 0) { 185 + return; 186 + } 187 + 188 + // batch is empty, clean up completely 189 + batch.controller.abort(); 190 + clearTimeout(batch.timeout); 191 + 192 + if (curr === batch) { 193 + curr = undefined; 194 + } 195 + }, 196 + { 197 + once: true, 198 + signal: batch.controller.signal, 199 + }, 200 + ); 201 + } 202 + 203 + { 204 + // reset the execution timer 205 + clearTimeout(batch.timeout); 206 + 207 + batch.timeout = setTimeout(() => { 208 + if (curr === batch) { 209 + curr = undefined; 210 + } 211 + 212 + perform(batch, fetch, idFromResource); 213 + }, timeout); 214 + } 215 + 216 + return promise; 217 + }; 218 + }; 219 + 220 + const perform = async <Id extends ResourceId, Resource, Query = Id>( 221 + map: BatchedFetchMap<Id, Resource, Query>, 222 + fetch: (queries: Query[], signal: AbortSignal) => Promisable<Resource[]>, 223 + idFromResource: (data: Resource) => Id, 224 + ) => { 225 + const signal = map.controller.signal; 226 + if (signal.aborted) { 227 + return; 228 + } 229 + 230 + const pending = map.pending; 231 + if (pending.size === 0) { 232 + // theoretically this should only be empty if the whole-batch signal is 233 + // aborted, but better be safe. 234 + return; 235 + } 236 + 237 + let errored = false; 238 + 239 + try { 240 + const queries = Array.from(pending.values(), (meta) => meta.query); 241 + const dataset = await fetch(queries, signal); 242 + 243 + for (const data of dataset) { 244 + const id = idFromResource(data); 245 + const meta = pending.get(id); 246 + 247 + meta?.deferred.resolve(data); 248 + } 249 + } catch (error) { 250 + errored = true; 251 + 252 + for (const meta of pending.values()) { 253 + meta.deferred.reject(error); 254 + } 255 + } finally { 256 + if (!errored) { 257 + // we've succeeded! we're iterating the pending map again to boot 258 + // unresolved promises, else they'll end up waiting forever. 259 + // 260 + // this should only apply for scenarios where the caller/API handles 261 + // nonexistent data by omitting it entirely from the results. 262 + for (const meta of pending.values()) { 263 + meta.deferred.reject(new ResourceMissingError()); 264 + } 265 + } 266 + } 267 + 268 + // abort the controller to clean up event listeners to upstream signals 269 + map.controller.abort(); 270 + };