Skip to content

Commit 7b732f9

Browse files
Add native subscription support for coroutine Flows
Implement a SubscriptionExecutionStrategy that allows for `Flow`s and `Publisher`s to be returned from graphql schema elements, and can be processed as a `Flow` by subscription consumers. Relax restrictions that look for `Publisher`s to also allow `Flow`s. Fixes ExpediaGroup#358
1 parent acf8866 commit 7b732f9

File tree

6 files changed

+419
-0
lines changed

6 files changed

+419
-0
lines changed

graphql-kotlin-schema-generator/build.gradle.kts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,12 @@ val classGraphVersion: String by project
44
val graphQLJavaVersion: String by project
55
val jacksonVersion: String by project
66
val kotlinVersion: String by project
7+
val kotlinCoroutinesVersion: String by project
78
val rxjavaVersion: String by project
89

910
dependencies {
1011
api("com.graphql-java:graphql-java:$graphQLJavaVersion")
12+
api("org.jetbrains.kotlinx:kotlinx-coroutines-reactive:$kotlinCoroutinesVersion")
1113
api("com.fasterxml.jackson.module:jackson-module-kotlin:$jacksonVersion")
1214
implementation(kotlin("reflect", kotlinVersion))
1315
implementation("io.github.classgraph:classgraph:$classGraphVersion")
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
/*
2+
* Copyright 2020 Expedia, Inc
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.expediagroup.graphql.execution
18+
19+
import graphql.AssertException
20+
import graphql.ExecutionResult
21+
import graphql.ExecutionResultImpl
22+
import graphql.execution.DataFetcherExceptionHandler
23+
import graphql.execution.ExecutionContext
24+
import graphql.execution.ExecutionStrategy
25+
import graphql.execution.ExecutionStrategyParameters
26+
import graphql.execution.FetchedValue
27+
import graphql.execution.SimpleDataFetcherExceptionHandler
28+
import graphql.execution.SubscriptionExecutionStrategy
29+
import graphql.execution.reactive.CompletionStageMappingPublisher
30+
import kotlinx.coroutines.flow.Flow
31+
import kotlinx.coroutines.flow.map
32+
import kotlinx.coroutines.future.await
33+
import kotlinx.coroutines.reactive.asFlow
34+
import org.reactivestreams.Publisher
35+
import java.util.Collections
36+
import java.util.concurrent.CompletableFuture
37+
38+
/**
39+
* [SubscriptionExecutionStrategy] replacement that returns an [ExecutionResult]
40+
* that is a [Flow] instead of a [Publisher], and allows schema subscription functions
41+
* to return either a [Flow] or a [Publisher].
42+
*
43+
* Note this implementation is mostly a java->kotlin copy of [SubscriptionExecutionStrategy],
44+
* with [CompletionStageMappingPublisher] replaced by a [Flow] mapping, and [Flow] allowed
45+
* as an additional return type. Any [Publisher]s returned will be converted to [Flow]s,
46+
* which may lose meaningful context information, so users are encouraged to create and
47+
* consume [Flow]s directly (see https://github.com/Kotlin/kotlinx.coroutines/issues/1825
48+
* https://github.com/Kotlin/kotlinx.coroutines/issues/1860 for some examples of lost context)
49+
*/
50+
class FlowSubscriptionExecutionStrategy(dfe: DataFetcherExceptionHandler) : ExecutionStrategy(dfe) {
51+
constructor() : this(SimpleDataFetcherExceptionHandler())
52+
53+
override fun execute(
54+
executionContext: ExecutionContext,
55+
parameters: ExecutionStrategyParameters
56+
): CompletableFuture<ExecutionResult> {
57+
58+
val sourceEventStream = createSourceEventStream(executionContext, parameters)
59+
60+
//
61+
// when the upstream source event stream completes, subscribe to it and wire in our adapter
62+
return sourceEventStream.thenApply { sourceFlow ->
63+
if (sourceFlow == null) {
64+
ExecutionResultImpl(null, executionContext.errors)
65+
} else {
66+
val returnFlow = sourceFlow.map {
67+
executeSubscriptionEvent(executionContext, parameters, it).await()
68+
}
69+
ExecutionResultImpl(returnFlow, executionContext.errors)
70+
}
71+
}
72+
}
73+
74+
/*
75+
https://github.com/facebook/graphql/blob/master/spec/Section%206%20--%20Execution.md
76+
77+
CreateSourceEventStream(subscription, schema, variableValues, initialValue):
78+
79+
Let {subscriptionType} be the root Subscription type in {schema}.
80+
Assert: {subscriptionType} is an Object type.
81+
Let {selectionSet} be the top level Selection Set in {subscription}.
82+
Let {rootField} be the first top level field in {selectionSet}.
83+
Let {argumentValues} be the result of {CoerceArgumentValues(subscriptionType, rootField, variableValues)}.
84+
Let {fieldStream} be the result of running {ResolveFieldEventStream(subscriptionType, initialValue, rootField, argumentValues)}.
85+
Return {fieldStream}.
86+
*/
87+
private fun createSourceEventStream(
88+
executionContext: ExecutionContext,
89+
parameters: ExecutionStrategyParameters
90+
): CompletableFuture<Flow<*>> {
91+
val newParameters = firstFieldOfSubscriptionSelection(parameters)
92+
93+
val fieldFetched = fetchField(executionContext, newParameters)
94+
return fieldFetched.thenApply { fetchedValue ->
95+
val flow = when (val publisherOrFlow = fetchedValue.fetchedValue) {
96+
null -> null
97+
is Publisher<*> -> publisherOrFlow.asFlow()
98+
is Flow<*> -> publisherOrFlow
99+
else -> throw AssertException(
100+
"You data fetcher must return a Flow of events when using graphql subscriptions"
101+
)
102+
}
103+
flow
104+
}
105+
}
106+
107+
/*
108+
ExecuteSubscriptionEvent(subscription, schema, variableValues, initialValue):
109+
110+
Let {subscriptionType} be the root Subscription type in {schema}.
111+
Assert: {subscriptionType} is an Object type.
112+
Let {selectionSet} be the top level Selection Set in {subscription}.
113+
Let {data} be the result of running {ExecuteSelectionSet(selectionSet, subscriptionType, initialValue, variableValues)} normally (allowing parallelization).
114+
Let {errors} be any field errors produced while executing the selection set.
115+
Return an unordered map containing {data} and {errors}.
116+
117+
Note: The {ExecuteSubscriptionEvent()} algorithm is intentionally similar to {ExecuteQuery()} since this is how each event result is produced.
118+
*/
119+
120+
private fun executeSubscriptionEvent(
121+
executionContext: ExecutionContext,
122+
parameters: ExecutionStrategyParameters,
123+
eventPayload: Any?
124+
): CompletableFuture<ExecutionResult> {
125+
val newExecutionContext = executionContext.transform { builder -> builder.root(eventPayload) }
126+
127+
val newParameters = firstFieldOfSubscriptionSelection(parameters)
128+
val fetchedValue = FetchedValue.newFetchedValue().fetchedValue(eventPayload)
129+
.rawFetchedValue(eventPayload)
130+
.localContext(parameters.localContext)
131+
.build()
132+
return completeField(newExecutionContext, newParameters, fetchedValue).fieldValue
133+
.thenApply { executionResult -> wrapWithRootFieldName(newParameters, executionResult) }
134+
}
135+
136+
private fun wrapWithRootFieldName(
137+
parameters: ExecutionStrategyParameters,
138+
executionResult: ExecutionResult
139+
): ExecutionResult {
140+
val rootFieldName = getRootFieldName(parameters)
141+
return ExecutionResultImpl(
142+
Collections.singletonMap<String, Any>(rootFieldName, executionResult.getData<Any>()),
143+
executionResult.errors
144+
)
145+
}
146+
147+
private fun getRootFieldName(parameters: ExecutionStrategyParameters): String {
148+
val rootField = parameters.field.singleField
149+
return if (rootField.alias != null) rootField.alias else rootField.name
150+
}
151+
152+
private fun firstFieldOfSubscriptionSelection(
153+
parameters: ExecutionStrategyParameters
154+
): ExecutionStrategyParameters {
155+
val fields = parameters.fields
156+
val firstField = fields.getSubField(fields.keys[0])
157+
158+
val fieldPath = parameters.path.segment(ExecutionStrategy.mkNameForPath(firstField.singleField))
159+
return parameters.transform { builder -> builder.field(firstField).path(fieldPath) }
160+
}
161+
}

graphql-kotlin-schema-generator/src/main/kotlin/com/expediagroup/graphql/generator/types/utils/functionReturnTypes.kt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package com.expediagroup.graphql.generator.types.utils
1919
import com.expediagroup.graphql.generator.extensions.getTypeOfFirstArgument
2020
import com.expediagroup.graphql.generator.extensions.isSubclassOf
2121
import graphql.execution.DataFetcherResult
22+
import kotlinx.coroutines.flow.Flow
2223
import org.reactivestreams.Publisher
2324
import java.util.concurrent.CompletableFuture
2425
import kotlin.reflect.KType
@@ -41,6 +42,7 @@ import kotlin.reflect.KType
4142
internal fun getWrappedReturnType(returnType: KType): KType {
4243
return when {
4344
returnType.isSubclassOf(Publisher::class) -> returnType.getTypeOfFirstArgument()
45+
returnType.isSubclassOf(Flow::class) -> returnType.getTypeOfFirstArgument()
4446
returnType.isSubclassOf(DataFetcherResult::class) -> returnType.getTypeOfFirstArgument()
4547
returnType.isSubclassOf(CompletableFuture::class) -> {
4648
val wrappedType = returnType.getTypeOfFirstArgument()
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package com.expediagroup.graphql.hooks
2+
3+
import com.expediagroup.graphql.generator.extensions.isSubclassOf
4+
import kotlinx.coroutines.flow.Flow
5+
import org.reactivestreams.Publisher
6+
import kotlin.reflect.KClass
7+
import kotlin.reflect.KFunction
8+
9+
/**
10+
* Subclassable [SchemaGeneratorHooks] implementation that supports
11+
* subscriptions that return either [Flow]s or [Publisher]s
12+
*/
13+
open class FlowSubscriptionSchemaGeneratorHooks : SchemaGeneratorHooks {
14+
/**
15+
* Allow for [Flow] subscription types
16+
*/
17+
override fun isValidSubscriptionReturnType(kClass: KClass<*>, function: KFunction<*>): Boolean {
18+
return function.returnType.isSubclassOf(Flow::class) || super.isValidSubscriptionReturnType(kClass, function)
19+
}
20+
}

0 commit comments

Comments
 (0)