Skip to content

Commit 780128d

Browse files
committed
add defer/stream support for subscriptions (#7)
1 parent bd091bf commit 780128d

File tree

4 files changed

+340
-14
lines changed

4 files changed

+340
-14
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
import { expect } from 'chai';
2+
import { describe, it } from 'mocha';
3+
4+
import flattenAsyncIterator from '../flattenAsyncIterator';
5+
6+
describe('flattenAsyncIterator', () => {
7+
it('does not modify an already flat async generator', async () => {
8+
async function* source() {
9+
yield 1;
10+
yield 2;
11+
yield 3;
12+
}
13+
14+
const result = flattenAsyncIterator(source());
15+
16+
expect(await result.next()).to.deep.equal({ value: 1, done: false });
17+
expect(await result.next()).to.deep.equal({ value: 2, done: false });
18+
expect(await result.next()).to.deep.equal({ value: 3, done: false });
19+
expect(await result.next()).to.deep.equal({
20+
value: undefined,
21+
done: true,
22+
});
23+
});
24+
25+
it('does not modify an already flat async iterator', async () => {
26+
const items = [1, 2, 3];
27+
28+
const iterator: any = {
29+
[Symbol.asyncIterator]() {
30+
return this;
31+
},
32+
next() {
33+
return Promise.resolve({
34+
done: items.length === 0,
35+
value: items.shift(),
36+
});
37+
},
38+
};
39+
40+
const result = flattenAsyncIterator(iterator);
41+
42+
expect(await result.next()).to.deep.equal({ value: 1, done: false });
43+
expect(await result.next()).to.deep.equal({ value: 2, done: false });
44+
expect(await result.next()).to.deep.equal({ value: 3, done: false });
45+
expect(await result.next()).to.deep.equal({
46+
value: undefined,
47+
done: true,
48+
});
49+
});
50+
51+
it('flatten nested async generators', async () => {
52+
async function* source() {
53+
yield 1;
54+
yield 2;
55+
yield (async function* (): AsyncGenerator<number, void, void> {
56+
yield 2.1;
57+
yield 2.2;
58+
})();
59+
yield 3;
60+
}
61+
62+
const doubles = flattenAsyncIterator(source());
63+
64+
const result = [];
65+
for await (const x of doubles) {
66+
result.push(x);
67+
}
68+
expect(result).to.deep.equal([1, 2, 2.1, 2.2, 3]);
69+
});
70+
71+
it('allows returning early from a nested async generator', async () => {
72+
async function* source() {
73+
yield 1;
74+
yield 2;
75+
yield (async function* (): AsyncGenerator<number, void, void> {
76+
yield 2.1;
77+
// istanbul ignore next (Shouldn't be reached)
78+
yield 2.2;
79+
})();
80+
// istanbul ignore next (Shouldn't be reached)
81+
yield 3;
82+
}
83+
84+
const doubles = flattenAsyncIterator(source());
85+
86+
expect(await doubles.next()).to.deep.equal({ value: 1, done: false });
87+
expect(await doubles.next()).to.deep.equal({ value: 2, done: false });
88+
expect(await doubles.next()).to.deep.equal({ value: 2.1, done: false });
89+
90+
// Early return
91+
expect(await doubles.return()).to.deep.equal({
92+
value: undefined,
93+
done: true,
94+
});
95+
96+
// Subsequent next calls
97+
expect(await doubles.next()).to.deep.equal({
98+
value: undefined,
99+
done: true,
100+
});
101+
expect(await doubles.next()).to.deep.equal({
102+
value: undefined,
103+
done: true,
104+
});
105+
});
106+
107+
it('allows throwing errors from a nested async generator', async () => {
108+
async function* source() {
109+
yield 1;
110+
yield 2;
111+
yield (async function* (): AsyncGenerator<number, void, void> {
112+
yield 2.1;
113+
// istanbul ignore next (Shouldn't be reached)
114+
yield 2.2;
115+
})();
116+
// istanbul ignore next (Shouldn't be reached)
117+
yield 3;
118+
}
119+
120+
const doubles = flattenAsyncIterator(source());
121+
122+
expect(await doubles.next()).to.deep.equal({ value: 1, done: false });
123+
expect(await doubles.next()).to.deep.equal({ value: 2, done: false });
124+
expect(await doubles.next()).to.deep.equal({ value: 2.1, done: false });
125+
126+
// Throw error
127+
let caughtError;
128+
try {
129+
await doubles.throw('ouch');
130+
} catch (e) {
131+
caughtError = e;
132+
}
133+
expect(caughtError).to.equal('ouch');
134+
});
135+
});

src/subscription/__tests__/subscribe-test.js

+147
Original file line numberDiff line numberDiff line change
@@ -668,6 +668,153 @@ describe('Subscription Publish Phase', () => {
668668
});
669669
});
670670

671+
it('produces additional payloads for subscriptions with @defer', async () => {
672+
const pubsub = new SimplePubSub();
673+
const subscription = await createSubscription(
674+
pubsub,
675+
emailSchema,
676+
parse(`
677+
subscription ($priority: Int = 0) {
678+
importantEmail(priority: $priority) {
679+
email {
680+
from
681+
subject
682+
}
683+
... @defer {
684+
inbox {
685+
unread
686+
total
687+
}
688+
}
689+
}
690+
}
691+
`),
692+
);
693+
invariant(isAsyncIterable(subscription));
694+
// Wait for the next subscription payload.
695+
const payload = subscription.next();
696+
697+
// A new email arrives!
698+
expect(
699+
pubsub.emit({
700+
701+
subject: 'Alright',
702+
message: 'Tests are good',
703+
unread: true,
704+
}),
705+
).to.equal(true);
706+
707+
// The previously waited on payload now has a value.
708+
expect(await payload).to.deep.equal({
709+
done: false,
710+
value: {
711+
data: {
712+
importantEmail: {
713+
email: {
714+
715+
subject: 'Alright',
716+
},
717+
},
718+
},
719+
hasNext: true,
720+
},
721+
});
722+
723+
// Wait for the next payload from @defer
724+
expect(await subscription.next()).to.deep.equal({
725+
done: false,
726+
value: {
727+
data: {
728+
inbox: {
729+
unread: 1,
730+
total: 2,
731+
},
732+
},
733+
path: ['importantEmail'],
734+
hasNext: false,
735+
},
736+
});
737+
738+
// Another new email arrives, after all incrementally delivered payloads are received.
739+
expect(
740+
pubsub.emit({
741+
742+
subject: 'Tools',
743+
message: 'I <3 making things',
744+
unread: true,
745+
}),
746+
).to.equal(true);
747+
748+
// The next waited on payload will have a value.
749+
expect(await subscription.next()).to.deep.equal({
750+
done: false,
751+
value: {
752+
data: {
753+
importantEmail: {
754+
email: {
755+
756+
subject: 'Tools',
757+
},
758+
},
759+
},
760+
hasNext: true,
761+
},
762+
});
763+
764+
// Another new email arrives, before the incrementally delivered payloads from the last email was received.
765+
expect(
766+
pubsub.emit({
767+
768+
subject: 'Important',
769+
message: 'Read me please',
770+
unread: true,
771+
}),
772+
).to.equal(true);
773+
774+
// Deferred payload from previous event is received.
775+
expect(await subscription.next()).to.deep.equal({
776+
done: false,
777+
value: {
778+
data: {
779+
inbox: {
780+
unread: 2,
781+
total: 3,
782+
},
783+
},
784+
path: ['importantEmail'],
785+
hasNext: false,
786+
},
787+
});
788+
789+
// Next payload from last event
790+
expect(await subscription.next()).to.deep.equal({
791+
done: false,
792+
value: {
793+
data: {
794+
importantEmail: {
795+
email: {
796+
797+
subject: 'Important',
798+
},
799+
},
800+
},
801+
hasNext: true,
802+
},
803+
});
804+
805+
// The client disconnects before the deferred payload is consumed.
806+
expect(await subscription.return()).to.deep.equal({
807+
done: true,
808+
value: undefined,
809+
});
810+
811+
// Awaiting a subscription after closing it results in completed results.
812+
expect(await subscription.next()).to.deep.equal({
813+
done: true,
814+
value: undefined,
815+
});
816+
});
817+
671818
it('produces a payload when there are multiple events', async () => {
672819
const pubsub = new SimplePubSub();
673820
const subscription = await createSubscription(pubsub);
+49
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
import { SYMBOL_ASYNC_ITERATOR } from '../polyfills/symbols';
2+
3+
import isAsyncIterable from '../jsutils/isAsyncIterable';
4+
5+
/**
6+
* Given an AsyncIterable that could potentially yield other async iterators,
7+
* flatten all yielded results into a single AsyncIterable
8+
*/
9+
export default function flattenAsyncIterator<T>(
10+
iterable: AsyncGenerator<AsyncGenerator<T, void, void> | T, void, void>,
11+
): AsyncGenerator<T, void, void> {
12+
// $FlowFixMe[prop-missing]
13+
const iteratorMethod = iterable[SYMBOL_ASYNC_ITERATOR];
14+
const iterator: any = iteratorMethod.call(iterable);
15+
let iteratorStack: Array<AsyncGenerator<T, void, void>> = [iterator];
16+
17+
function next(): Promise<IteratorResult<T, void>> {
18+
const currentIterator = iteratorStack[0];
19+
if (!currentIterator) {
20+
return Promise.resolve({ value: undefined, done: true });
21+
}
22+
return currentIterator.next().then((result) => {
23+
if (result.done) {
24+
iteratorStack.shift();
25+
return next();
26+
} else if (isAsyncIterable(result.value)) {
27+
const childIteratorMethod = result.value[SYMBOL_ASYNC_ITERATOR];
28+
const childIterator: any = childIteratorMethod.call(result.value);
29+
iteratorStack.unshift(childIterator);
30+
return next();
31+
}
32+
return result;
33+
});
34+
}
35+
return ({
36+
next,
37+
return() {
38+
iteratorStack = [];
39+
return iterator.return();
40+
},
41+
throw(error?: mixed): Promise<IteratorResult<T, void>> {
42+
iteratorStack = [];
43+
return iterator.throw(error);
44+
},
45+
[SYMBOL_ASYNC_ITERATOR]() {
46+
return this;
47+
},
48+
}: $FlowFixMe);
49+
}

src/subscription/subscribe.js

+9-14
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import type { GraphQLFieldResolver } from '../type/definition';
2424
import { getOperationRootType } from '../utilities/getOperationRootType';
2525

2626
import mapAsyncIterator from './mapAsyncIterator';
27+
import flattenAsyncIterator from './flattenAsyncIterator';
2728

2829
export type SubscriptionArgs = {|
2930
schema: GraphQLSchema,
@@ -140,8 +141,8 @@ function subscribeImpl(
140141
// the GraphQL specification. The `execute` function provides the
141142
// "ExecuteSubscriptionEvent" algorithm, as it is nearly identical to the
142143
// "ExecuteQuery" algorithm, for which `execute` is also used.
143-
const mapSourceToResponse = (payload) => {
144-
const executionResult = execute({
144+
const mapSourceToResponse = (payload) =>
145+
execute({
145146
schema,
146147
document,
147148
rootValue: payload,
@@ -150,24 +151,18 @@ function subscribeImpl(
150151
operationName,
151152
fieldResolver,
152153
});
153-
/* istanbul ignore if - TODO: implement support for defer/stream in subscriptions */
154-
if (isAsyncIterable(executionResult)) {
155-
throw new Error(
156-
'TODO: implement support for defer/stream in subscriptions',
157-
);
158-
}
159-
return executionResult;
160-
};
161154

162155
// Resolve the Source Stream, then map every source value to a
163156
// ExecutionResult value as described above.
164157
return sourcePromise.then((resultOrStream) =>
165158
// Note: Flow can't refine isAsyncIterable, so explicit casts are used.
166159
isAsyncIterable(resultOrStream)
167-
? mapAsyncIterator(
168-
resultOrStream,
169-
mapSourceToResponse,
170-
reportGraphQLError,
160+
? flattenAsyncIterator(
161+
mapAsyncIterator(
162+
resultOrStream,
163+
mapSourceToResponse,
164+
reportGraphQLError,
165+
),
171166
)
172167
: ((resultOrStream: any): ExecutionResult),
173168
);

0 commit comments

Comments
 (0)