Skip to content

Commit 183b5d6

Browse files
Add native subscription support for coroutine Flows
Implement a SubscriptionExecutionStrategy that allows for `Flow`s and `Publisher`s to be returned from graphql schema elements, and can be processed as a `Flow` by subscription consumers. Relax restrictions that look for `Publisher`s to also allow `Flow`s. Fixes #358
1 parent b129244 commit 183b5d6

File tree

6 files changed

+352
-2
lines changed

6 files changed

+352
-2
lines changed

graphql-kotlin-schema-generator/build.gradle.kts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ dependencies {
1111
api("com.graphql-java:graphql-java:$graphQLJavaVersion")
1212
// TODO change below from api to implementation?
1313
api("org.jetbrains.kotlinx:kotlinx-coroutines-jdk8:$kotlinCoroutinesVersion")
14+
api("org.jetbrains.kotlinx:kotlinx-coroutines-reactive:$kotlinCoroutinesVersion")
1415
api("org.jetbrains.kotlin:kotlin-reflect:$kotlinVersion")
1516
api("io.github.classgraph:classgraph:$classGraphVersion")
1617
api("com.fasterxml.jackson.module:jackson-module-kotlin:$jacksonVersion")

graphql-kotlin-schema-generator/src/main/kotlin/com/expediagroup/graphql/exceptions/InvalidSubscriptionTypeException.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import kotlin.reflect.KFunction
2222

2323
class InvalidSubscriptionTypeException(kClass: KClass<*>, kFunction: KFunction<*>? = null) :
2424
GraphQLKotlinException(
25-
"Schema requires all subscriptions to be public and return a type of Publisher. " +
25+
"Schema requires all subscriptions to be public and return a type of Publisher or Flow. " +
2626
"${kClass.simpleName} has ${kClass.visibility} visibility modifier. " +
2727
if (kFunction != null) "The function return type is ${kFunction.returnType.getSimpleName()}" else ""
2828
)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
/*
2+
* Copyright 2020 Expedia, Inc
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.expediagroup.graphql.execution
18+
19+
import graphql.AssertException
20+
import graphql.ExecutionResult
21+
import graphql.ExecutionResultImpl
22+
import graphql.execution.DataFetcherExceptionHandler
23+
import graphql.execution.ExecutionContext
24+
import graphql.execution.ExecutionStrategy
25+
import graphql.execution.ExecutionStrategyParameters
26+
import graphql.execution.FetchedValue
27+
import graphql.execution.SimpleDataFetcherExceptionHandler
28+
import graphql.execution.SubscriptionExecutionStrategy
29+
import kotlinx.coroutines.flow.Flow
30+
import kotlinx.coroutines.flow.map
31+
import kotlinx.coroutines.future.await
32+
import kotlinx.coroutines.reactive.asFlow
33+
import org.reactivestreams.Publisher
34+
import java.util.Collections
35+
import java.util.concurrent.CompletableFuture
36+
37+
/**
38+
* [SubscriptionExecutionStrategy] replacement that returns an [ExecutionResult]
39+
* that is a [Flow] instead of a [Publisher], and allows schema subscription functions
40+
* to return either a [Flow] or a [Publisher].
41+
*/
42+
class FlowSubscriptionExecutionStrategy(dfe: DataFetcherExceptionHandler) : ExecutionStrategy(dfe) {
43+
constructor() : this(SimpleDataFetcherExceptionHandler())
44+
45+
override fun execute(
46+
executionContext: ExecutionContext,
47+
parameters: ExecutionStrategyParameters
48+
): CompletableFuture<ExecutionResult> {
49+
50+
val sourceEventStream = createSourceEventStream(executionContext, parameters)
51+
52+
//
53+
// when the upstream source event stream completes, subscribe to it and wire in our adapter
54+
return sourceEventStream.thenApply { sourceFlow ->
55+
if (sourceFlow == null) {
56+
ExecutionResultImpl(null, executionContext.errors)
57+
} else {
58+
val returnFlow = sourceFlow.map {
59+
executeSubscriptionEvent(executionContext, parameters, it).await()
60+
}
61+
ExecutionResultImpl(returnFlow, executionContext.errors)
62+
}
63+
}
64+
}
65+
66+
/*
67+
https://github.com/facebook/graphql/blob/master/spec/Section%206%20--%20Execution.md
68+
69+
CreateSourceEventStream(subscription, schema, variableValues, initialValue):
70+
71+
Let {subscriptionType} be the root Subscription type in {schema}.
72+
Assert: {subscriptionType} is an Object type.
73+
Let {selectionSet} be the top level Selection Set in {subscription}.
74+
Let {rootField} be the first top level field in {selectionSet}.
75+
Let {argumentValues} be the result of {CoerceArgumentValues(subscriptionType, rootField, variableValues)}.
76+
Let {fieldStream} be the result of running {ResolveFieldEventStream(subscriptionType, initialValue, rootField, argumentValues)}.
77+
Return {fieldStream}.
78+
*/
79+
private fun createSourceEventStream(
80+
executionContext: ExecutionContext,
81+
parameters: ExecutionStrategyParameters
82+
): CompletableFuture<Flow<*>> {
83+
val newParameters = firstFieldOfSubscriptionSelection(parameters)
84+
85+
val fieldFetched = fetchField(executionContext, newParameters)
86+
return fieldFetched.thenApply { fetchedValue ->
87+
val flow = when (val publisherOrFlow = fetchedValue.fetchedValue) {
88+
null -> null
89+
is Publisher<*> -> publisherOrFlow.asFlow()
90+
is Flow<*> -> publisherOrFlow
91+
else -> throw AssertException(
92+
"You data fetcher must return a Flow of events when using graphql subscriptions"
93+
)
94+
}
95+
flow
96+
}
97+
}
98+
99+
/*
100+
ExecuteSubscriptionEvent(subscription, schema, variableValues, initialValue):
101+
102+
Let {subscriptionType} be the root Subscription type in {schema}.
103+
Assert: {subscriptionType} is an Object type.
104+
Let {selectionSet} be the top level Selection Set in {subscription}.
105+
Let {data} be the result of running {ExecuteSelectionSet(selectionSet, subscriptionType, initialValue, variableValues)} normally (allowing parallelization).
106+
Let {errors} be any field errors produced while executing the selection set.
107+
Return an unordered map containing {data} and {errors}.
108+
109+
Note: The {ExecuteSubscriptionEvent()} algorithm is intentionally similar to {ExecuteQuery()} since this is how each event result is produced.
110+
*/
111+
112+
private fun executeSubscriptionEvent(
113+
executionContext: ExecutionContext,
114+
parameters: ExecutionStrategyParameters,
115+
eventPayload: Any?
116+
): CompletableFuture<ExecutionResult> {
117+
val newExecutionContext = executionContext.transform { builder -> builder.root(eventPayload) }
118+
119+
val newParameters = firstFieldOfSubscriptionSelection(parameters)
120+
val fetchedValue = FetchedValue.newFetchedValue().fetchedValue(eventPayload)
121+
.rawFetchedValue(eventPayload)
122+
.localContext(parameters.localContext)
123+
.build()
124+
return completeField(newExecutionContext, newParameters, fetchedValue).fieldValue
125+
.thenApply { executionResult -> wrapWithRootFieldName(newParameters, executionResult) }
126+
}
127+
128+
private fun wrapWithRootFieldName(
129+
parameters: ExecutionStrategyParameters,
130+
executionResult: ExecutionResult
131+
): ExecutionResult {
132+
val rootFieldName = getRootFieldName(parameters)
133+
return ExecutionResultImpl(
134+
Collections.singletonMap<String, Any>(rootFieldName, executionResult.getData<Any>()),
135+
executionResult.errors
136+
)
137+
}
138+
139+
private fun getRootFieldName(parameters: ExecutionStrategyParameters): String {
140+
val rootField = parameters.field.singleField
141+
return if (rootField.alias != null) rootField.alias else rootField.name
142+
}
143+
144+
private fun firstFieldOfSubscriptionSelection(
145+
parameters: ExecutionStrategyParameters
146+
): ExecutionStrategyParameters {
147+
val fields = parameters.fields
148+
val firstField = fields.getSubField(fields.keys[0])
149+
150+
val fieldPath = parameters.path.segment(ExecutionStrategy.mkNameForPath(firstField.singleField))
151+
return parameters.transform { builder -> builder.field(firstField).path(fieldPath) }
152+
}
153+
}

graphql-kotlin-schema-generator/src/main/kotlin/com/expediagroup/graphql/generator/types/generateSubscription.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import com.expediagroup.graphql.generator.extensions.getValidFunctions
2323
import com.expediagroup.graphql.generator.extensions.isNotPublic
2424
import com.expediagroup.graphql.generator.extensions.isSubclassOf
2525
import graphql.schema.GraphQLObjectType
26+
import kotlinx.coroutines.flow.Flow
2627
import org.reactivestreams.Publisher
2728

2829
internal fun generateSubscriptions(generator: SchemaGenerator, subscriptions: List<TopLevelObject>): GraphQLObjectType? {
@@ -40,7 +41,7 @@ internal fun generateSubscriptions(generator: SchemaGenerator, subscriptions: Li
4041

4142
subscription.kClass.getValidFunctions(generator.config.hooks)
4243
.forEach {
43-
if (it.returnType.isSubclassOf(Publisher::class).not()) {
44+
if (it.returnType.isSubclassOf(Publisher::class).or(it.returnType.isSubclassOf(Flow::class)).not()) {
4445
throw InvalidSubscriptionTypeException(subscription.kClass, it)
4546
}
4647

graphql-kotlin-schema-generator/src/main/kotlin/com/expediagroup/graphql/generator/types/utils/functionReturnTypes.kt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package com.expediagroup.graphql.generator.types.utils
1919
import com.expediagroup.graphql.generator.extensions.getTypeOfFirstArgument
2020
import com.expediagroup.graphql.generator.extensions.isSubclassOf
2121
import graphql.execution.DataFetcherResult
22+
import kotlinx.coroutines.flow.Flow
2223
import org.reactivestreams.Publisher
2324
import java.util.concurrent.CompletableFuture
2425
import kotlin.reflect.KType
@@ -41,6 +42,7 @@ import kotlin.reflect.KType
4142
internal fun getWrappedReturnType(returnType: KType): KType {
4243
return when {
4344
returnType.isSubclassOf(Publisher::class) -> returnType.getTypeOfFirstArgument()
45+
returnType.isSubclassOf(Flow::class) -> returnType.getTypeOfFirstArgument()
4446
returnType.isSubclassOf(DataFetcherResult::class) -> returnType.getTypeOfFirstArgument()
4547
returnType.isSubclassOf(CompletableFuture::class) -> {
4648
val wrappedType = returnType.getTypeOfFirstArgument()
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,193 @@
1+
/*
2+
* Copyright 2020 Expedia, Inc
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.expediagroup.graphql.execution
18+
19+
import com.expediagroup.graphql.SchemaGeneratorConfig
20+
import com.expediagroup.graphql.TopLevelObject
21+
import com.expediagroup.graphql.annotations.GraphQLContext
22+
import com.expediagroup.graphql.exceptions.GraphQLKotlinException
23+
import com.expediagroup.graphql.toSchema
24+
import graphql.ExecutionInput
25+
import graphql.ExecutionResult
26+
import graphql.GraphQL
27+
import graphql.GraphQLError
28+
import graphql.GraphqlErrorBuilder
29+
import graphql.schema.GraphQLSchema
30+
import kotlinx.coroutines.InternalCoroutinesApi
31+
import kotlinx.coroutines.delay
32+
import kotlinx.coroutines.flow.Flow
33+
import kotlinx.coroutines.flow.catch
34+
import kotlinx.coroutines.flow.collect
35+
import kotlinx.coroutines.flow.flow
36+
import kotlinx.coroutines.flow.onEach
37+
import kotlinx.coroutines.reactive.asPublisher
38+
import kotlinx.coroutines.runBlocking
39+
import org.junit.jupiter.api.Test
40+
import org.reactivestreams.Publisher
41+
import kotlin.test.assertEquals
42+
import kotlin.test.assertNull
43+
import kotlin.test.assertTrue
44+
45+
@InternalCoroutinesApi
46+
class FlowSubscriptionExecutionStrategyTest {
47+
48+
private val testSchema: GraphQLSchema = toSchema(
49+
config = SchemaGeneratorConfig(supportedPackages = listOf("com.expediagroup.graphql.spring.execution")),
50+
queries = listOf(TopLevelObject(BasicQuery())),
51+
subscriptions = listOf(TopLevelObject(FlowSubscription()))
52+
)
53+
private val testGraphQL: GraphQL = GraphQL.newGraphQL(testSchema).subscriptionExecutionStrategy(FlowSubscriptionExecutionStrategy()).build()
54+
55+
@Test
56+
fun `verify subscription to flow`() = runBlocking {
57+
val request = ExecutionInput.newExecutionInput().query("subscription { ticker }").build()
58+
val response = testGraphQL.execute(request)
59+
val flow = response.getData<Flow<ExecutionResult>>()
60+
val list = mutableListOf<Int>()
61+
flow.collect {
62+
list.add(it.getData<Map<String, Int>>().getValue("ticker"))
63+
}
64+
assertEquals(5, list.size)
65+
for (i in list.indices) {
66+
assertEquals(i + 1, list[i])
67+
}
68+
}
69+
70+
@Test
71+
fun `verify subscription to publisher`() = runBlocking {
72+
val request = ExecutionInput.newExecutionInput().query("subscription { publisherTicker }").build()
73+
val response = testGraphQL.execute(request)
74+
val flow = response.getData<Flow<ExecutionResult>>()
75+
val list = mutableListOf<Int>()
76+
flow.collect {
77+
list.add(it.getData<Map<String, Int>>().getValue("publisherTicker"))
78+
}
79+
assertEquals(5, list.size)
80+
for (i in list.indices) {
81+
assertEquals(i + 1, list[i])
82+
}
83+
}
84+
85+
@Test
86+
fun `verify subscription to flow with context`() = runBlocking {
87+
val request = ExecutionInput.newExecutionInput()
88+
.query("subscription { contextualTicker }")
89+
.context(SubscriptionContext("junitHandler"))
90+
.build()
91+
val response = testGraphQL.execute(request)
92+
val flow = response.getData<Flow<ExecutionResult>>()
93+
val list = mutableListOf<Int>()
94+
flow.collect {
95+
val contextValue = it.getData<Map<String, String>>().getValue("contextualTicker")
96+
assertTrue(contextValue.startsWith("junitHandler:"))
97+
list.add(contextValue.substringAfter("junitHandler:").toInt())
98+
}
99+
assertEquals(5, list.size)
100+
for (i in list.indices) {
101+
assertEquals(i + 1, list[i])
102+
}
103+
}
104+
105+
@Test
106+
fun `verify subscription to failing flow`() = runBlocking {
107+
val request = ExecutionInput.newExecutionInput().query("subscription { alwaysThrows }").build()
108+
val response = testGraphQL.execute(request)
109+
val flow = response.getData<Flow<ExecutionResult>>()
110+
val errors = mutableListOf<GraphQLError>()
111+
val results = mutableListOf<Int>()
112+
flow.onEach {
113+
val dataMap = it.getData<Map<String, Int>>()
114+
if (dataMap != null) {
115+
results.add(dataMap.getValue("alwaysThrows"))
116+
}
117+
errors.addAll(it.errors)
118+
}.catch {
119+
errors.add(GraphqlErrorBuilder.newError().message(it.message).build())
120+
}.collect()
121+
assertEquals(2, results.size)
122+
for (i in results.indices) {
123+
assertEquals(i + 1, results[i])
124+
}
125+
assertEquals(1, errors.size)
126+
assertEquals("JUNIT subscription failure", errors[0].message)
127+
}
128+
129+
@Test
130+
fun `verify subscription to exploding flow`() = runBlocking {
131+
val request = ExecutionInput.newExecutionInput().query("subscription { throwsFast }").build()
132+
val response = testGraphQL.execute(request)
133+
val flow = response.getData<Flow<ExecutionResult>>()
134+
val errors = response.errors
135+
assertNull(flow)
136+
assertEquals(1, errors.size)
137+
assertEquals("JUNIT flow failure", errors[0].message.substringAfter(" : "))
138+
}
139+
140+
// GraphQL spec requires at least single query to be present as Query type is needed to run introspection queries
141+
// see: https://github.com/graphql/graphql-spec/issues/490 and https://github.com/graphql/graphql-spec/issues/568
142+
class BasicQuery {
143+
@Suppress("Detekt.FunctionOnlyReturningConstant")
144+
fun query(): String = "hello"
145+
}
146+
147+
class FlowSubscription {
148+
fun ticker(): Flow<Int> {
149+
return flow {
150+
for (i in 1..5) {
151+
delay(100)
152+
emit(i)
153+
}
154+
}
155+
}
156+
157+
fun publisherTicker(): Publisher<Int> {
158+
return flow {
159+
for (i in 1..5) {
160+
delay(100)
161+
emit(i)
162+
}
163+
}.asPublisher()
164+
}
165+
166+
fun throwsFast(): Flow<Int> {
167+
throw GraphQLKotlinException("JUNIT flow failure")
168+
}
169+
170+
fun alwaysThrows(): Flow<Int> {
171+
return flow {
172+
for (i in 1..5) {
173+
if (i > 2) {
174+
throw GraphQLKotlinException("JUNIT subscription failure")
175+
}
176+
delay(100)
177+
emit(i)
178+
}
179+
}
180+
}
181+
182+
fun contextualTicker(@GraphQLContext context: SubscriptionContext): Flow<String> {
183+
return flow {
184+
for (i in 1..5) {
185+
delay(100)
186+
emit("${context.value}:$i")
187+
}
188+
}
189+
}
190+
}
191+
192+
data class SubscriptionContext(val value: String)
193+
}

0 commit comments

Comments
 (0)