Batch individual queries into a single request
jsr.io/@mary/batch-fetch
typescript
jsr
/** a value that may or may not be wrapped in a promise */
type Promisable<T> = T | Promise<T>;

/** identifies a resource */
export type ResourceId = string | number;
5
/** internal state for one in-flight batch of queries */
type BatchedFetchMap<Id extends ResourceId, Resource, Query = Id> = {
  /** grouping key (from the `key` option); queries with different keys never share a batch */
  key: string | number | undefined;
  /** timer for batch execution; reset every time a query joins the batch */
  timeout: ReturnType<typeof setTimeout> | undefined;
  /** controls the lifecycle of this batch; aborted when the batch finishes or is abandoned */
  controller: AbortController;
  /** a map of pending queries, keyed by resource id for deduplication */
  pending: Map<
    Id,
    {
      /** original query descriptor (the first one seen for this id) */
      query: Query;
      /** promise that will resolve with the resource */
      deferred: PromiseWithResolvers<Resource>;
      /** whether we have a consumer without an abort signal */
      passive: boolean;
      /** amount of consumers with an abort signal */
      signals: number;
    }
  >;
};
28
/** options shared by every batched fetcher */
type BaseOptions<Id extends ResourceId, Resource, Query = Id> = {
  /**
   * maximum number of queries that can be included in one request
   */
  limit: number;

  /**
   * how long to wait (in milliseconds) for new queries to be collected
   * before a request
   * @default 125
   */
  timeout?: number;

  /**
   * performs the request
   * @param queries queries passed for this batch
   * @param signal abort signal
   * @returns array of resources
   */
  fetch: (queries: Query[], signal: AbortSignal) => Promisable<Resource[]>;

  /**
   * optional function for separating queries into different batches
   * @param query query descriptor
   * @returns batch grouping key
   */
  key?: (query: Query) => string | number;

  /**
   * function that takes in the resource's identifier, used to associate
   * resources with the queries
   * @param resource resource
   * @returns resource identifier
   */
  idFromResource: (resource: Resource) => Id;
};
64
/** options for batch fetching */
export type BatchedFetchOptions<Id extends ResourceId, Resource, Query = Id> =
  & BaseOptions<Id, Resource, Query>
  & (Query extends Id ? {
    /**
     * function that takes in the resource identifier from the query descriptor,
     * used for deduplication and resource matching.
     * optional in this branch: when the query is itself an identifier, it
     * defaults to the identity function.
     * @param query query descriptor
     * @returns resource identifier
     */
    idFromQuery?: (query: Query) => Id;
  }
  : {
    /**
     * function that takes in the resource identifier from the query descriptor,
     * used for deduplication and resource matching.
     * required in this branch because the query descriptor is not itself an
     * identifier.
     * @param query query descriptor
     * @returns resource identifier
     */
    idFromQuery: (query: Query) => Id;
  });
86
/**
 * error thrown when a resource wasn't returned in the response;
 * this applies to APIs that handle nonexistent data by omitting it
 * entirely from the results.
 */
export class ResourceMissingError extends Error {
  override readonly name = 'ResourceMissingError';
}
91
92const identity = <T>(value: T): T => value;
93
/**
 * creates a function that batches individual queries into one single request.
 *
 * queries made while a batch is open (within the `timeout` window, sharing the
 * same grouping key, and below `limit`) are collected and handed to `fetch`
 * together. duplicate queries (same id) share a single pending entry.
 * @param options configurations
 * @returns a function that you can use to request for a query.
 */
/*#__NO_SIDE_EFFECTS__*/
export const createBatchedFetch = <Id extends ResourceId, Resource, Query = Id>(
  options: BatchedFetchOptions<Id, Resource, Query>,
): (query: Query, signal?: AbortSignal) => Promise<Resource> => {
  const {
    limit,
    timeout = 125,
    fetch,
    key: _key,
    // when `Query extends Id`, the query itself serves as the identifier
    idFromQuery = identity,
    idFromResource,
  } = options;

  /** current active batch */
  let curr: BatchedFetchMap<Id, Resource, Query> | undefined;

  return (query: Query, signal?: AbortSignal): Promise<Resource> => {
    // throw early if provided signal is already aborted
    signal?.throwIfAborted();

    const id = idFromQuery(query);
    const key = _key?.(query);

    // create a new batch if:
    // - we don't have a batch currently waiting
    // - the current batch has already reached the limit
    // - batch key doesn't match
    // note: a superseded batch is not cancelled; it still executes on its
    // own (already scheduled) timer.
    let batch = curr;
    if (batch === undefined || batch.pending.size >= limit || batch.key !== key) {
      batch = curr = {
        key,
        timeout: undefined,
        controller: new AbortController(),
        pending: new Map(),
      };
    }

    // dedupe: reuse the pending entry if this id was already queried
    let meta = batch.pending.get(id);
    if (meta === undefined) {
      meta = {
        query: query,
        deferred: Promise.withResolvers(),
        passive: false,
        signals: 0,
      };

      batch.pending.set(id, meta);
    }

    let promise = meta.deferred.promise;

    if (signal === undefined) {
      // this consumer provided no signal, so we can't consider this query for
      // removal if a different consumer has aborted theirs.
      meta.passive = true;
    } else {
      // we need the returned promise to resolve early if the signal is aborted.
      // so we'll race it with this deferred that will only throw.
      const def = Promise.withResolvers<never>();
      promise = Promise.race([promise, def.promise]);

      // make this signal count
      meta.signals++;

      signal.addEventListener(
        'abort',
        () => {
          // immediately reject this consumer's promise
          def.reject(signal.reason);

          // decrement the count
          meta.signals--;

          // return early, have the query remain in batch if:
          // - we have passive consumers waiting on this query
          // - there are still other consumers with an abort signal waiting
          if (meta.passive || meta.signals > 0) {
            return;
          }

          // no more consumers care about this query, remove from batch
          batch.pending.delete(id);

          // return early, have the batch continue execution if we still need
          // to process other queries.
          if (batch.pending.size > 0) {
            return;
          }

          // batch is empty, clean up completely: aborting the controller
          // detaches every listener registered against upstream signals,
          // and clearing the timer prevents a pointless execution.
          batch.controller.abort();
          clearTimeout(batch.timeout);

          if (curr === batch) {
            curr = undefined;
          }
        },
        {
          // listener is one-shot, and is removed automatically once the
          // batch's own controller aborts (on completion or abandonment)
          once: true,
          signal: batch.controller.signal,
        },
      );
    }

    {
      // reset the execution timer; the batch runs only after `timeout` ms
      // pass without any new query joining it
      clearTimeout(batch.timeout);

      batch.timeout = setTimeout(() => {
        // stop accepting new queries into this batch before executing
        if (curr === batch) {
          curr = undefined;
        }

        perform(batch, fetch, idFromResource);
      }, timeout);
    }

    return promise;
  };
};
219
220const perform = async <Id extends ResourceId, Resource, Query = Id>(
221 map: BatchedFetchMap<Id, Resource, Query>,
222 fetch: (queries: Query[], signal: AbortSignal) => Promisable<Resource[]>,
223 idFromResource: (data: Resource) => Id,
224) => {
225 const signal = map.controller.signal;
226 if (signal.aborted) {
227 return;
228 }
229
230 const pending = map.pending;
231 if (pending.size === 0) {
232 // theoretically this should only be empty if the whole-batch signal is
233 // aborted, but better be safe.
234 return;
235 }
236
237 let errored = false;
238
239 try {
240 const queries = Array.from(pending.values(), (meta) => meta.query);
241 const dataset = await fetch(queries, signal);
242
243 for (const data of dataset) {
244 const id = idFromResource(data);
245 const meta = pending.get(id);
246
247 meta?.deferred.resolve(data);
248 }
249 } catch (error) {
250 errored = true;
251
252 for (const meta of pending.values()) {
253 meta.deferred.reject(error);
254 }
255 } finally {
256 if (!errored) {
257 // we've succeeded! we're iterating the pending map again to boot
258 // unresolved promises, else they'll end up waiting forever.
259 //
260 // this should only apply for scenarios where the caller/API handles
261 // nonexistent data by omitting it entirely from the results.
262 for (const meta of pending.values()) {
263 meta.deferred.reject(new ResourceMissingError());
264 }
265 }
266 }
267
268 // abort the controller to clean up event listeners to upstream signals
269 map.controller.abort();
270};