Skip to content

Commit c2789df

Browse files
Abstract out FlowSubscriptionExecutionStrategy
Put common methods in a base class and add an implementation for a NativeFlowSubscriptionExecutionStrategy that returns Flows instead of Publishers.
1 parent 245b457 commit c2789df

File tree

4 files changed

+526
-151
lines changed

4 files changed

+526
-151
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,192 @@
1+
/*
2+
* Copyright 2020 Expedia, Inc
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.expediagroup.graphql.generator.execution
18+
19+
import graphql.ExecutionResult
20+
import graphql.ExecutionResultImpl
21+
import graphql.execution.DataFetcherExceptionHandler
22+
import graphql.execution.ExecutionContext
23+
import graphql.execution.ExecutionStepInfo
24+
import graphql.execution.ExecutionStrategy
25+
import graphql.execution.ExecutionStrategyParameters
26+
import graphql.execution.SimpleDataFetcherExceptionHandler
27+
import graphql.execution.SubscriptionExecutionStrategy
28+
import graphql.execution.instrumentation.parameters.InstrumentationExecutionParameters
29+
import graphql.execution.instrumentation.parameters.InstrumentationExecutionStrategyParameters
30+
import graphql.execution.instrumentation.parameters.InstrumentationFieldParameters
31+
import graphql.schema.GraphQLObjectType
32+
import kotlinx.coroutines.flow.Flow
33+
import org.reactivestreams.Publisher
34+
import java.util.Collections
35+
import java.util.concurrent.CompletableFuture
36+
37+
/**
38+
* Abstract [SubscriptionExecutionStrategy] replacement that and allows schema subscription
39+
* functions to return either a [Flow] or a [Publisher].
40+
*
41+
* Note this implementation is mostly a java->kotlin copy of [SubscriptionExecutionStrategy],
42+
* with updated [createSourceEventStream] that supports [Flow] and [Publisher]. Any returned
43+
* [Flow]s/[Publisher]s will be automatically converted to corresponding [T] by the
44+
* implementing subclass.
45+
*/
46+
abstract class BaseFlowSubscriptionExecutionStrategy<T>(dfe: DataFetcherExceptionHandler) : ExecutionStrategy(dfe) {
47+
constructor() : this(SimpleDataFetcherExceptionHandler())
48+
49+
/**
50+
* Convert a schema returned [Flow] or [Publisher] to the supported [T] variant for this implementation.
51+
*/
52+
abstract fun convertToSupportedFlow(publisherOrFlow: Any?): T?
53+
54+
/**
55+
* Returns a function that takes a [Flow] or [Publisher], subscribes to it, and returns a
56+
* corresponding [ExecutionResult]
57+
*/
58+
abstract fun getSubscriberAdapter(executionContext: ExecutionContext, parameters: ExecutionStrategyParameters): (T?) -> ExecutionResult
59+
60+
override fun execute(
61+
executionContext: ExecutionContext,
62+
parameters: ExecutionStrategyParameters
63+
): CompletableFuture<ExecutionResult> {
64+
65+
val instrumentation = executionContext.instrumentation
66+
val instrumentationParameters = InstrumentationExecutionStrategyParameters(executionContext, parameters)
67+
val executionStrategyCtx = instrumentation.beginExecutionStrategy(instrumentationParameters)
68+
69+
val sourceEventStream = createSourceEventStream(executionContext, parameters)
70+
71+
//
72+
// when the upstream source event stream completes, subscribe to it and wire in our adapter
73+
val overallResult: CompletableFuture<ExecutionResult> = sourceEventStream.thenApply(getSubscriberAdapter(executionContext, parameters))
74+
75+
// dispatched the subscription query
76+
executionStrategyCtx.onDispatched(overallResult)
77+
overallResult.whenComplete(executionStrategyCtx::onCompleted)
78+
79+
return overallResult
80+
}
81+
/*
82+
https://github.com/facebook/graphql/blob/master/spec/Section%206%20--%20Execution.md
83+
84+
CreateSourceEventStream(subscription, schema, variableValues, initialValue):
85+
86+
Let {subscriptionType} be the root Subscription type in {schema}.
87+
Assert: {subscriptionType} is an Object type.
88+
Let {selectionSet} be the top level Selection Set in {subscription}.
89+
Let {rootField} be the first top level field in {selectionSet}.
90+
Let {argumentValues} be the result of {CoerceArgumentValues(subscriptionType, rootField, variableValues)}.
91+
Let {fieldStream} be the result of running {ResolveFieldEventStream(subscriptionType, initialValue, rootField, argumentValues)}.
92+
Return {fieldStream}.
93+
*/
94+
private fun createSourceEventStream(
95+
executionContext: ExecutionContext,
96+
parameters: ExecutionStrategyParameters
97+
): CompletableFuture<T?> {
98+
val newParameters = firstFieldOfSubscriptionSelection(parameters)
99+
100+
val fieldFetched = fetchField(executionContext, newParameters)
101+
return fieldFetched.thenApply { fetchedValue ->
102+
convertToSupportedFlow(fetchedValue.fetchedValue)
103+
}
104+
}
105+
106+
/*
107+
ExecuteSubscriptionEvent(subscription, schema, variableValues, initialValue):
108+
109+
Let {subscriptionType} be the root Subscription type in {schema}.
110+
Assert: {subscriptionType} is an Object type.
111+
Let {selectionSet} be the top level Selection Set in {subscription}.
112+
Let {data} be the result of running {ExecuteSelectionSet(selectionSet, subscriptionType, initialValue, variableValues)} normally (allowing parallelization).
113+
Let {errors} be any field errors produced while executing the selection set.
114+
Return an unordered map containing {data} and {errors}.
115+
116+
Note: The {ExecuteSubscriptionEvent()} algorithm is intentionally similar to {ExecuteQuery()} since this is how each event result is produced.
117+
*/
118+
protected fun executeSubscriptionEvent(
119+
executionContext: ExecutionContext,
120+
parameters: ExecutionStrategyParameters,
121+
eventPayload: Any?
122+
): CompletableFuture<ExecutionResult> {
123+
val instrumentation = executionContext.instrumentation
124+
125+
val newExecutionContext = executionContext.transform { builder ->
126+
builder
127+
.root(eventPayload)
128+
.resetErrors()
129+
}
130+
val newParameters = firstFieldOfSubscriptionSelection(parameters)
131+
val subscribedFieldStepInfo = createSubscribedFieldStepInfo(executionContext, newParameters)
132+
133+
val i13nFieldParameters = InstrumentationFieldParameters(executionContext) { subscribedFieldStepInfo }
134+
val subscribedFieldCtx = instrumentation.beginSubscribedFieldEvent(i13nFieldParameters)
135+
136+
val fetchedValue = unboxPossibleDataFetcherResult(newExecutionContext, parameters, eventPayload)
137+
138+
val fieldValueInfo = completeField(newExecutionContext, newParameters, fetchedValue)
139+
val overallResult = fieldValueInfo
140+
.fieldValue
141+
.thenApply { executionResult -> wrapWithRootFieldName(newParameters, executionResult) }
142+
143+
// dispatch instrumentation so they can know about each subscription event
144+
subscribedFieldCtx.onDispatched(overallResult)
145+
overallResult.whenComplete(subscribedFieldCtx::onCompleted)
146+
147+
// allow them to instrument each ER should they want to
148+
val i13ExecutionParameters = InstrumentationExecutionParameters(
149+
executionContext.executionInput, executionContext.graphQLSchema, executionContext.instrumentationState
150+
)
151+
152+
return overallResult.thenCompose { executionResult ->
153+
instrumentation.instrumentExecutionResult(executionResult, i13ExecutionParameters)
154+
}
155+
}
156+
157+
private fun wrapWithRootFieldName(
158+
parameters: ExecutionStrategyParameters,
159+
executionResult: ExecutionResult
160+
): ExecutionResult {
161+
val rootFieldName = getRootFieldName(parameters)
162+
return ExecutionResultImpl(
163+
Collections.singletonMap<String, Any>(rootFieldName, executionResult.getData<Any>()),
164+
executionResult.errors
165+
)
166+
}
167+
168+
private fun getRootFieldName(parameters: ExecutionStrategyParameters): String {
169+
val rootField = parameters.field.singleField
170+
return if (rootField.alias != null) rootField.alias else rootField.name
171+
}
172+
173+
private fun firstFieldOfSubscriptionSelection(
174+
parameters: ExecutionStrategyParameters
175+
): ExecutionStrategyParameters {
176+
val fields = parameters.fields
177+
val firstField = fields.getSubField(fields.keys[0])
178+
179+
val fieldPath = parameters.path.segment(mkNameForPath(firstField.singleField))
180+
return parameters.transform { builder -> builder.field(firstField).path(fieldPath) }
181+
}
182+
183+
private fun createSubscribedFieldStepInfo(
184+
executionContext: ExecutionContext,
185+
parameters: ExecutionStrategyParameters
186+
): ExecutionStepInfo {
187+
val field = parameters.field.singleField
188+
val parentType = parameters.executionStepInfo.unwrappedNonNullType as GraphQLObjectType
189+
val fieldDef = getFieldDef(executionContext.graphQLSchema, parentType, field)
190+
return createExecutionStepInfo(executionContext, parameters, fieldDef, parentType)
191+
}
192+
}

generator/graphql-kotlin-schema-generator/src/main/kotlin/com/expediagroup/graphql/generator/execution/FlowSubscriptionExecutionStrategy.kt

Lines changed: 12 additions & 151 deletions
Original file line numberDiff line numberDiff line change
@@ -20,49 +20,34 @@ import graphql.ExecutionResult
2020
import graphql.ExecutionResultImpl
2121
import graphql.execution.DataFetcherExceptionHandler
2222
import graphql.execution.ExecutionContext
23-
import graphql.execution.ExecutionStepInfo
24-
import graphql.execution.ExecutionStrategy
2523
import graphql.execution.ExecutionStrategyParameters
2624
import graphql.execution.SimpleDataFetcherExceptionHandler
2725
import graphql.execution.SubscriptionExecutionStrategy
28-
import graphql.execution.instrumentation.parameters.InstrumentationExecutionParameters
29-
import graphql.execution.instrumentation.parameters.InstrumentationExecutionStrategyParameters
30-
import graphql.execution.instrumentation.parameters.InstrumentationFieldParameters
3126
import graphql.execution.reactive.CompletionStageMappingPublisher
32-
import graphql.schema.GraphQLObjectType
3327
import kotlinx.coroutines.flow.Flow
3428
import kotlinx.coroutines.reactive.asPublisher
3529
import org.reactivestreams.Publisher
36-
import java.util.Collections
37-
import java.util.concurrent.CompletableFuture
3830
import java.util.concurrent.CompletionStage
3931
import java.util.function.Function
4032

4133
/**
4234
* [SubscriptionExecutionStrategy] replacement that and allows schema subscription functions
43-
* to return either a [Flow] or a [Publisher].
44-
*
45-
* Note this implementation is mostly a java->kotlin copy of [SubscriptionExecutionStrategy],
46-
* with updated [createSourceEventStream] that supports [Flow] and [Publisher]. Any returned
47-
* [Flow]s will be automatically converted to corresponding [Publisher].
35+
* to return either a [Flow] or a [Publisher], and converts [Flow]s to [Publisher]s.
4836
*/
49-
class FlowSubscriptionExecutionStrategy(dfe: DataFetcherExceptionHandler) : ExecutionStrategy(dfe) {
37+
class FlowSubscriptionExecutionStrategy(dfe: DataFetcherExceptionHandler) : BaseFlowSubscriptionExecutionStrategy<Publisher<out Any>>(dfe) {
5038
constructor() : this(SimpleDataFetcherExceptionHandler())
5139

52-
override fun execute(
53-
executionContext: ExecutionContext,
54-
parameters: ExecutionStrategyParameters
55-
): CompletableFuture<ExecutionResult> {
56-
57-
val instrumentation = executionContext.instrumentation
58-
val instrumentationParameters = InstrumentationExecutionStrategyParameters(executionContext, parameters)
59-
val executionStrategyCtx = instrumentation.beginExecutionStrategy(instrumentationParameters)
60-
61-
val sourceEventStream = createSourceEventStream(executionContext, parameters)
40+
override fun convertToSupportedFlow(publisherOrFlow: Any?): Publisher<out Any>? {
41+
return when (publisherOrFlow) {
42+
is Publisher<*> -> publisherOrFlow
43+
// below explicit cast is required due to the type erasure and Kotlin declaration-site variance vs Java use-site variance
44+
is Flow<*> -> (publisherOrFlow as? Flow<Any>)?.asPublisher()
45+
else -> null
46+
}
47+
}
6248

63-
//
64-
// when the upstream source event stream completes, subscribe to it and wire in our adapter
65-
val overallResult: CompletableFuture<ExecutionResult> = sourceEventStream.thenApply { publisher ->
49+
override fun getSubscriberAdapter(executionContext: ExecutionContext, parameters: ExecutionStrategyParameters): (Publisher<out Any>?) -> ExecutionResult {
50+
return { publisher ->
6651
if (publisher == null) {
6752
ExecutionResultImpl(null, executionContext.errors)
6853
} else {
@@ -78,129 +63,5 @@ class FlowSubscriptionExecutionStrategy(dfe: DataFetcherExceptionHandler) : Exec
7863
ExecutionResultImpl(mapSourceToResponse, executionContext.errors)
7964
}
8065
}
81-
82-
// dispatched the subscription query
83-
executionStrategyCtx.onDispatched(overallResult)
84-
overallResult.whenComplete(executionStrategyCtx::onCompleted)
85-
86-
return overallResult
87-
}
88-
89-
/*
90-
https://github.com/facebook/graphql/blob/master/spec/Section%206%20--%20Execution.md
91-
92-
CreateSourceEventStream(subscription, schema, variableValues, initialValue):
93-
94-
Let {subscriptionType} be the root Subscription type in {schema}.
95-
Assert: {subscriptionType} is an Object type.
96-
Let {selectionSet} be the top level Selection Set in {subscription}.
97-
Let {rootField} be the first top level field in {selectionSet}.
98-
Let {argumentValues} be the result of {CoerceArgumentValues(subscriptionType, rootField, variableValues)}.
99-
Let {fieldStream} be the result of running {ResolveFieldEventStream(subscriptionType, initialValue, rootField, argumentValues)}.
100-
Return {fieldStream}.
101-
*/
102-
private fun createSourceEventStream(
103-
executionContext: ExecutionContext,
104-
parameters: ExecutionStrategyParameters
105-
): CompletableFuture<Publisher<out Any>?> {
106-
val newParameters = firstFieldOfSubscriptionSelection(parameters)
107-
108-
val fieldFetched = fetchField(executionContext, newParameters)
109-
return fieldFetched.thenApply { fetchedValue ->
110-
val publisher = when (val publisherOrFlow: Any? = fetchedValue.fetchedValue) {
111-
is Publisher<*> -> publisherOrFlow
112-
// below explicit cast is required due to the type erasure and Kotlin declaration-site variance vs Java use-site variance
113-
is Flow<*> -> (publisherOrFlow as? Flow<Any>)?.asPublisher()
114-
else -> null
115-
}
116-
publisher
117-
}
118-
}
119-
120-
/*
121-
ExecuteSubscriptionEvent(subscription, schema, variableValues, initialValue):
122-
123-
Let {subscriptionType} be the root Subscription type in {schema}.
124-
Assert: {subscriptionType} is an Object type.
125-
Let {selectionSet} be the top level Selection Set in {subscription}.
126-
Let {data} be the result of running {ExecuteSelectionSet(selectionSet, subscriptionType, initialValue, variableValues)} normally (allowing parallelization).
127-
Let {errors} be any field errors produced while executing the selection set.
128-
Return an unordered map containing {data} and {errors}.
129-
130-
Note: The {ExecuteSubscriptionEvent()} algorithm is intentionally similar to {ExecuteQuery()} since this is how each event result is produced.
131-
*/
132-
private fun executeSubscriptionEvent(
133-
executionContext: ExecutionContext,
134-
parameters: ExecutionStrategyParameters,
135-
eventPayload: Any?
136-
): CompletableFuture<ExecutionResult> {
137-
val instrumentation = executionContext.instrumentation
138-
139-
val newExecutionContext = executionContext.transform { builder ->
140-
builder
141-
.root(eventPayload)
142-
.resetErrors()
143-
}
144-
val newParameters = firstFieldOfSubscriptionSelection(parameters)
145-
val subscribedFieldStepInfo = createSubscribedFieldStepInfo(executionContext, newParameters)
146-
147-
val i13nFieldParameters = InstrumentationFieldParameters(executionContext) { subscribedFieldStepInfo }
148-
val subscribedFieldCtx = instrumentation.beginSubscribedFieldEvent(i13nFieldParameters)
149-
150-
val fetchedValue = unboxPossibleDataFetcherResult(newExecutionContext, parameters, eventPayload)
151-
152-
val fieldValueInfo = completeField(newExecutionContext, newParameters, fetchedValue)
153-
val overallResult = fieldValueInfo
154-
.fieldValue
155-
.thenApply { executionResult -> wrapWithRootFieldName(newParameters, executionResult) }
156-
157-
// dispatch instrumentation so they can know about each subscription event
158-
subscribedFieldCtx.onDispatched(overallResult)
159-
overallResult.whenComplete(subscribedFieldCtx::onCompleted)
160-
161-
// allow them to instrument each ER should they want to
162-
val i13ExecutionParameters = InstrumentationExecutionParameters(
163-
executionContext.executionInput, executionContext.graphQLSchema, executionContext.instrumentationState
164-
)
165-
166-
return overallResult.thenCompose { executionResult ->
167-
instrumentation.instrumentExecutionResult(executionResult, i13ExecutionParameters)
168-
}
169-
}
170-
171-
private fun wrapWithRootFieldName(
172-
parameters: ExecutionStrategyParameters,
173-
executionResult: ExecutionResult
174-
): ExecutionResult {
175-
val rootFieldName = getRootFieldName(parameters)
176-
return ExecutionResultImpl(
177-
Collections.singletonMap<String, Any>(rootFieldName, executionResult.getData<Any>()),
178-
executionResult.errors
179-
)
180-
}
181-
182-
private fun getRootFieldName(parameters: ExecutionStrategyParameters): String {
183-
val rootField = parameters.field.singleField
184-
return if (rootField.alias != null) rootField.alias else rootField.name
185-
}
186-
187-
private fun firstFieldOfSubscriptionSelection(
188-
parameters: ExecutionStrategyParameters
189-
): ExecutionStrategyParameters {
190-
val fields = parameters.fields
191-
val firstField = fields.getSubField(fields.keys[0])
192-
193-
val fieldPath = parameters.path.segment(mkNameForPath(firstField.singleField))
194-
return parameters.transform { builder -> builder.field(firstField).path(fieldPath) }
195-
}
196-
197-
private fun createSubscribedFieldStepInfo(
198-
executionContext: ExecutionContext,
199-
parameters: ExecutionStrategyParameters
200-
): ExecutionStepInfo {
201-
val field = parameters.field.singleField
202-
val parentType = parameters.executionStepInfo.unwrappedNonNullType as GraphQLObjectType
203-
val fieldDef = getFieldDef(executionContext.graphQLSchema, parentType, field)
204-
return createExecutionStepInfo(executionContext, parameters, fieldDef, parentType)
20566
}
20667
}

0 commit comments

Comments
 (0)