1import type { Source } from 'wonka';
2import { pipe, filter, takeUntil, mergeMap, merge, make } from 'wonka';
3
4import type {
5 GraphQLSchema,
6 GraphQLFieldResolver,
7 GraphQLTypeResolver,
8 ExecutionArgs,
9 SubscriptionArgs,
10} from 'graphql';
11import { execute, subscribe, Kind } from 'graphql';
12
13import type {
14 Exchange,
15 ExecutionResult,
16 Operation,
17 OperationResult,
18} from '@urql/core';
19import { makeResult, makeErrorResult, mergeResultPatch } from '@urql/core';
20
21/** Input parameters for the {@link executeExchange}.
22 * @see {@link ExecutionArgs} which this interface mirrors. */
23export interface ExecuteExchangeArgs {
24 /** GraphQL Schema definition that `Operation`s are execute against. */
25 schema: GraphQLSchema;
26 /** Context object or a factory function creating a `context` object.
27 *
28 * @remarks
29 * The `context` that is passed to the `schema` may either be passed
30 * or created from an incoming `Operation`, which also allows it to
31 * be recreated per `Operation`.
32 */
33 context?: ((operation: Operation) => any) | any;
34 rootValue?: any;
35 fieldResolver?: GraphQLFieldResolver<any, any>;
36 typeResolver?: GraphQLTypeResolver<any, any>;
37 subscribeFieldResolver?: GraphQLFieldResolver<any, any>;
38}
39
40type ExecuteParams = ExecutionArgs | SubscriptionArgs;
41
42const asyncIterator =
43 typeof Symbol !== 'undefined' ? Symbol.asyncIterator : null;
44
45const makeExecuteSource = (
46 operation: Operation,
47 _args: ExecuteParams
48): Source<OperationResult> => {
49 return make<OperationResult>(observer => {
50 let iterator: AsyncIterator<ExecutionResult>;
51 let ended = false;
52
53 Promise.resolve()
54 .then(async () => ({ ..._args, contextValue: await _args.contextValue }))
55 .then(args => {
56 if (ended) return;
57 if (operation.kind === 'subscription') {
58 return subscribe(args) as any;
59 }
60 return execute(args) as any;
61 })
62 .then((result: ExecutionResult | AsyncIterable<ExecutionResult>) => {
63 if (ended || !result) {
64 return;
65 } else if (!asyncIterator || !result[asyncIterator]) {
66 observer.next(makeResult(operation, result as ExecutionResult));
67 return;
68 }
69 iterator = result[asyncIterator!]();
70 let prevResult: OperationResult | null = null;
71
72 function next({
73 done,
74 value,
75 }: {
76 done?: boolean;
77 value: ExecutionResult;
78 }) {
79 if (value) {
80 observer.next(
81 (prevResult = prevResult
82 ? mergeResultPatch(prevResult, value)
83 : makeResult(operation, value))
84 );
85 }
86
87 if (!done && !ended) {
88 return iterator.next().then(next);
89 }
90 }
91
92 return iterator.next().then(next);
93 })
94 .then(() => {
95 observer.complete();
96 })
97 .catch(error => {
98 observer.next(makeErrorResult(operation, error));
99 observer.complete();
100 });
101
102 return () => {
103 if (iterator && iterator.return) iterator.return();
104 ended = true;
105 };
106 });
107};
108
109/** Exchange factory that executes operations against a GraphQL schema.
110 *
111 * @param options - A {@link ExecuteExchangeArgs} configuration object.
112 * @returns the created execute {@link Exchange}.
113 *
114 * @remarks
115 * The `executeExchange` executes GraphQL operations against the `schema`
116 * that it’s passed. As such, its options mirror the options that GraphQL.js’
117 * {@link execute} function accepts.
118 */
119export const executeExchange =
120 (options: ExecuteExchangeArgs): Exchange =>
121 ({ forward }) => {
122 return operations$ => {
123 const executedOps$ = pipe(
124 operations$,
125 filter((operation: Operation) => {
126 return (
127 operation.kind === 'query' ||
128 operation.kind === 'mutation' ||
129 operation.kind === 'subscription'
130 );
131 }),
132 mergeMap((operation: Operation) => {
133 const { key } = operation;
134 const teardown$ = pipe(
135 operations$,
136 filter(op => op.kind === 'teardown' && op.key === key)
137 );
138
139 const contextValue =
140 typeof options.context === 'function'
141 ? options.context(operation)
142 : options.context;
143
144 // Filter undefined values from variables before calling execute()
145 // to support default values within directives.
146 const variableValues = Object.create(null);
147 if (operation.variables) {
148 for (const key in operation.variables) {
149 if (operation.variables[key] !== undefined) {
150 variableValues[key] = operation.variables[key];
151 }
152 }
153 }
154
155 let operationName: string | undefined;
156 for (const node of operation.query.definitions) {
157 if (node.kind === Kind.OPERATION_DEFINITION) {
158 operationName = node.name ? node.name.value : undefined;
159 break;
160 }
161 }
162
163 return pipe(
164 makeExecuteSource(operation, {
165 schema: options.schema,
166 document: operation.query,
167 rootValue: options.rootValue,
168 contextValue,
169 variableValues,
170 operationName,
171 fieldResolver: options.fieldResolver,
172 typeResolver: options.typeResolver,
173 subscribeFieldResolver: options.subscribeFieldResolver,
174 }),
175 takeUntil(teardown$)
176 );
177 })
178 );
179
180 const forwardedOps$ = pipe(
181 operations$,
182 filter(operation => operation.kind === 'teardown'),
183 forward
184 );
185
186 return merge([executedOps$, forwardedOps$]);
187 };
188 };