Skip to content

Commit 2806cd8

Browse files
Add native subscription support for coroutine Flows (#629)
Implement a SubscriptionExecutionStrategy that allows for `Flow`s and `Publisher`s to be returned from graphql schema elements, and can be processed as a `Flow` by subscription consumers. Relax restrictions that look for `Publisher`s to also allow `Flow`s. Fixes #358
1 parent acf8866 commit 2806cd8

File tree

5 files changed

+427
-0
lines changed

5 files changed

+427
-0
lines changed

graphql-kotlin-schema-generator/build.gradle.kts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,12 @@ val classGraphVersion: String by project
44
val graphQLJavaVersion: String by project
55
val jacksonVersion: String by project
66
val kotlinVersion: String by project
7+
val kotlinCoroutinesVersion: String by project
78
val rxjavaVersion: String by project
89

910
dependencies {
1011
api("com.graphql-java:graphql-java:$graphQLJavaVersion")
12+
api("org.jetbrains.kotlinx:kotlinx-coroutines-reactive:$kotlinCoroutinesVersion")
1113
api("com.fasterxml.jackson.module:jackson-module-kotlin:$jacksonVersion")
1214
implementation(kotlin("reflect", kotlinVersion))
1315
implementation("io.github.classgraph:classgraph:$classGraphVersion")
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
/*
2+
* Copyright 2020 Expedia, Inc
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.expediagroup.graphql.execution
18+
19+
import graphql.ExecutionResult
20+
import graphql.ExecutionResultImpl
21+
import graphql.execution.DataFetcherExceptionHandler
22+
import graphql.execution.ExecutionContext
23+
import graphql.execution.ExecutionStrategy
24+
import graphql.execution.ExecutionStrategyParameters
25+
import graphql.execution.FetchedValue
26+
import graphql.execution.SimpleDataFetcherExceptionHandler
27+
import graphql.execution.SubscriptionExecutionStrategy
28+
import graphql.execution.reactive.CompletionStageMappingPublisher
29+
import kotlinx.coroutines.flow.Flow
30+
import kotlinx.coroutines.flow.map
31+
import kotlinx.coroutines.future.await
32+
import kotlinx.coroutines.reactive.asFlow
33+
import org.reactivestreams.Publisher
34+
import java.util.Collections
35+
import java.util.concurrent.CompletableFuture
36+
37+
/**
38+
* [SubscriptionExecutionStrategy] replacement that returns an [ExecutionResult]
39+
* that is a [Flow] instead of a [Publisher], and allows schema subscription functions
40+
* to return either a [Flow] or a [Publisher].
41+
*
42+
* Note this implementation is mostly a java->kotlin copy of [SubscriptionExecutionStrategy],
43+
* with [CompletionStageMappingPublisher] replaced by a [Flow] mapping, and [Flow] allowed
44+
* as an additional return type. Any [Publisher]s returned will be converted to [Flow]s,
45+
* which may lose meaningful context information, so users are encouraged to create and
46+
* consume [Flow]s directly (see https://github.com/Kotlin/kotlinx.coroutines/issues/1825
47+
* https://github.com/Kotlin/kotlinx.coroutines/issues/1860 for some examples of lost context)
48+
*/
49+
class FlowSubscriptionExecutionStrategy(dfe: DataFetcherExceptionHandler) : ExecutionStrategy(dfe) {
50+
constructor() : this(SimpleDataFetcherExceptionHandler())
51+
52+
override fun execute(
53+
executionContext: ExecutionContext,
54+
parameters: ExecutionStrategyParameters
55+
): CompletableFuture<ExecutionResult> {
56+
57+
val sourceEventStream = createSourceEventStream(executionContext, parameters)
58+
59+
//
60+
// when the upstream source event stream completes, subscribe to it and wire in our adapter
61+
return sourceEventStream.thenApply { sourceFlow ->
62+
if (sourceFlow == null) {
63+
ExecutionResultImpl(null, executionContext.errors)
64+
} else {
65+
val returnFlow = sourceFlow.map {
66+
executeSubscriptionEvent(executionContext, parameters, it).await()
67+
}
68+
ExecutionResultImpl(returnFlow, executionContext.errors)
69+
}
70+
}
71+
}
72+
73+
/*
74+
https://github.com/facebook/graphql/blob/master/spec/Section%206%20--%20Execution.md
75+
76+
CreateSourceEventStream(subscription, schema, variableValues, initialValue):
77+
78+
Let {subscriptionType} be the root Subscription type in {schema}.
79+
Assert: {subscriptionType} is an Object type.
80+
Let {selectionSet} be the top level Selection Set in {subscription}.
81+
Let {rootField} be the first top level field in {selectionSet}.
82+
Let {argumentValues} be the result of {CoerceArgumentValues(subscriptionType, rootField, variableValues)}.
83+
Let {fieldStream} be the result of running {ResolveFieldEventStream(subscriptionType, initialValue, rootField, argumentValues)}.
84+
Return {fieldStream}.
85+
*/
86+
private fun createSourceEventStream(
87+
executionContext: ExecutionContext,
88+
parameters: ExecutionStrategyParameters
89+
): CompletableFuture<Flow<*>> {
90+
val newParameters = firstFieldOfSubscriptionSelection(parameters)
91+
92+
val fieldFetched = fetchField(executionContext, newParameters)
93+
return fieldFetched.thenApply { fetchedValue ->
94+
val flow = when (val publisherOrFlow = fetchedValue.fetchedValue) {
95+
is Publisher<*> -> publisherOrFlow.asFlow()
96+
is Flow<*> -> publisherOrFlow
97+
else -> null
98+
}
99+
flow
100+
}
101+
}
102+
103+
/*
104+
ExecuteSubscriptionEvent(subscription, schema, variableValues, initialValue):
105+
106+
Let {subscriptionType} be the root Subscription type in {schema}.
107+
Assert: {subscriptionType} is an Object type.
108+
Let {selectionSet} be the top level Selection Set in {subscription}.
109+
Let {data} be the result of running {ExecuteSelectionSet(selectionSet, subscriptionType, initialValue, variableValues)} normally (allowing parallelization).
110+
Let {errors} be any field errors produced while executing the selection set.
111+
Return an unordered map containing {data} and {errors}.
112+
113+
Note: The {ExecuteSubscriptionEvent()} algorithm is intentionally similar to {ExecuteQuery()} since this is how each event result is produced.
114+
*/
115+
116+
private fun executeSubscriptionEvent(
117+
executionContext: ExecutionContext,
118+
parameters: ExecutionStrategyParameters,
119+
eventPayload: Any?
120+
): CompletableFuture<ExecutionResult> {
121+
val newExecutionContext = executionContext.transform { builder -> builder.root(eventPayload) }
122+
123+
val newParameters = firstFieldOfSubscriptionSelection(parameters)
124+
val fetchedValue = FetchedValue.newFetchedValue().fetchedValue(eventPayload)
125+
.rawFetchedValue(eventPayload)
126+
.localContext(parameters.localContext)
127+
.build()
128+
return completeField(newExecutionContext, newParameters, fetchedValue).fieldValue
129+
.thenApply { executionResult -> wrapWithRootFieldName(newParameters, executionResult) }
130+
}
131+
132+
private fun wrapWithRootFieldName(
133+
parameters: ExecutionStrategyParameters,
134+
executionResult: ExecutionResult
135+
): ExecutionResult {
136+
val rootFieldName = getRootFieldName(parameters)
137+
return ExecutionResultImpl(
138+
Collections.singletonMap<String, Any>(rootFieldName, executionResult.getData<Any>()),
139+
executionResult.errors
140+
)
141+
}
142+
143+
private fun getRootFieldName(parameters: ExecutionStrategyParameters): String {
144+
val rootField = parameters.field.singleField
145+
return if (rootField.alias != null) rootField.alias else rootField.name
146+
}
147+
148+
private fun firstFieldOfSubscriptionSelection(
149+
parameters: ExecutionStrategyParameters
150+
): ExecutionStrategyParameters {
151+
val fields = parameters.fields
152+
val firstField = fields.getSubField(fields.keys[0])
153+
154+
val fieldPath = parameters.path.segment(ExecutionStrategy.mkNameForPath(firstField.singleField))
155+
return parameters.transform { builder -> builder.field(firstField).path(fieldPath) }
156+
}
157+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package com.expediagroup.graphql.hooks
2+
3+
import com.expediagroup.graphql.generator.extensions.getTypeOfFirstArgument
4+
import com.expediagroup.graphql.generator.extensions.isSubclassOf
5+
import kotlinx.coroutines.flow.Flow
6+
import org.reactivestreams.Publisher
7+
import kotlin.reflect.KClass
8+
import kotlin.reflect.KFunction
9+
import kotlin.reflect.KType
10+
11+
/**
12+
* Subclassable [SchemaGeneratorHooks] implementation that supports
13+
* subscriptions that return either [Flow]s or [Publisher]s
14+
*/
15+
open class FlowSubscriptionSchemaGeneratorHooks : SchemaGeneratorHooks {
16+
/**
17+
* Unwrap a [Flow] to its argument type
18+
*/
19+
override fun willResolveMonad(type: KType): KType {
20+
return when {
21+
type.isSubclassOf(Flow::class) -> type.getTypeOfFirstArgument()
22+
else -> super.willResolveMonad(type)
23+
}
24+
}
25+
26+
/**
27+
* Allow for [Flow] subscription types
28+
*/
29+
override fun isValidSubscriptionReturnType(kClass: KClass<*>, function: KFunction<*>): Boolean {
30+
return function.returnType.isSubclassOf(Flow::class) || super.isValidSubscriptionReturnType(kClass, function)
31+
}
32+
}

0 commit comments

Comments
 (0)