diff --git a/graphql-kotlin-schema-generator/build.gradle.kts b/graphql-kotlin-schema-generator/build.gradle.kts index f8f46d1d11..5ffa6c1937 100644 --- a/graphql-kotlin-schema-generator/build.gradle.kts +++ b/graphql-kotlin-schema-generator/build.gradle.kts @@ -4,10 +4,12 @@ val classGraphVersion: String by project val graphQLJavaVersion: String by project val jacksonVersion: String by project val kotlinVersion: String by project +val kotlinCoroutinesVersion: String by project val rxjavaVersion: String by project dependencies { api("com.graphql-java:graphql-java:$graphQLJavaVersion") + api("org.jetbrains.kotlinx:kotlinx-coroutines-reactive:$kotlinCoroutinesVersion") api("com.fasterxml.jackson.module:jackson-module-kotlin:$jacksonVersion") implementation(kotlin("reflect", kotlinVersion)) implementation("io.github.classgraph:classgraph:$classGraphVersion") diff --git a/graphql-kotlin-schema-generator/src/main/kotlin/com/expediagroup/graphql/execution/FlowSubscriptionExecutionStrategy.kt b/graphql-kotlin-schema-generator/src/main/kotlin/com/expediagroup/graphql/execution/FlowSubscriptionExecutionStrategy.kt new file mode 100644 index 0000000000..68020db188 --- /dev/null +++ b/graphql-kotlin-schema-generator/src/main/kotlin/com/expediagroup/graphql/execution/FlowSubscriptionExecutionStrategy.kt @@ -0,0 +1,157 @@ +/* + * Copyright 2020 Expedia, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.expediagroup.graphql.execution + +import graphql.ExecutionResult +import graphql.ExecutionResultImpl +import graphql.execution.DataFetcherExceptionHandler +import graphql.execution.ExecutionContext +import graphql.execution.ExecutionStrategy +import graphql.execution.ExecutionStrategyParameters +import graphql.execution.FetchedValue +import graphql.execution.SimpleDataFetcherExceptionHandler +import graphql.execution.SubscriptionExecutionStrategy +import graphql.execution.reactive.CompletionStageMappingPublisher +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.map +import kotlinx.coroutines.future.await +import kotlinx.coroutines.reactive.asFlow +import org.reactivestreams.Publisher +import java.util.Collections +import java.util.concurrent.CompletableFuture + +/** + * [SubscriptionExecutionStrategy] replacement that returns an [ExecutionResult] + * that is a [Flow] instead of a [Publisher], and allows schema subscription functions + * to return either a [Flow] or a [Publisher]. + * + * Note this implementation is mostly a java->kotlin copy of [SubscriptionExecutionStrategy], + * with [CompletionStageMappingPublisher] replaced by a [Flow] mapping, and [Flow] allowed + * as an additional return type. Any [Publisher]s returned will be converted to [Flow]s, + * which may lose meaningful context information, so users are encouraged to create and + * consume [Flow]s directly (see https://github.com/Kotlin/kotlinx.coroutines/issues/1825 + * https://github.com/Kotlin/kotlinx.coroutines/issues/1860 for some examples of lost context) + */ +class FlowSubscriptionExecutionStrategy(dfe: DataFetcherExceptionHandler) : ExecutionStrategy(dfe) { + constructor() : this(SimpleDataFetcherExceptionHandler()) + + override fun execute( + executionContext: ExecutionContext, + parameters: ExecutionStrategyParameters + ): CompletableFuture { + + val sourceEventStream = createSourceEventStream(executionContext, parameters) + + // + // when the upstream source event stream completes, subscribe to it and wire in our adapter + return sourceEventStream.thenApply { sourceFlow -> + if (sourceFlow == null) { + ExecutionResultImpl(null, executionContext.errors) + } else { + val returnFlow = sourceFlow.map { + executeSubscriptionEvent(executionContext, parameters, it).await() + } + ExecutionResultImpl(returnFlow, executionContext.errors) + } + } + } + + /* + https://github.com/facebook/graphql/blob/master/spec/Section%206%20--%20Execution.md + + CreateSourceEventStream(subscription, schema, variableValues, initialValue): + + Let {subscriptionType} be the root Subscription type in {schema}. + Assert: {subscriptionType} is an Object type. + Let {selectionSet} be the top level Selection Set in {subscription}. + Let {rootField} be the first top level field in {selectionSet}. + Let {argumentValues} be the result of {CoerceArgumentValues(subscriptionType, rootField, variableValues)}. + Let {fieldStream} be the result of running {ResolveFieldEventStream(subscriptionType, initialValue, rootField, argumentValues)}. + Return {fieldStream}. + */ + private fun createSourceEventStream( + executionContext: ExecutionContext, + parameters: ExecutionStrategyParameters + ): CompletableFuture> { + val newParameters = firstFieldOfSubscriptionSelection(parameters) + + val fieldFetched = fetchField(executionContext, newParameters) + return fieldFetched.thenApply { fetchedValue -> + val flow = when (val publisherOrFlow = fetchedValue.fetchedValue) { + is Publisher<*> -> publisherOrFlow.asFlow() + is Flow<*> -> publisherOrFlow + else -> null + } + flow + } + } + + /* + ExecuteSubscriptionEvent(subscription, schema, variableValues, initialValue): + + Let {subscriptionType} be the root Subscription type in {schema}. + Assert: {subscriptionType} is an Object type. + Let {selectionSet} be the top level Selection Set in {subscription}. + Let {data} be the result of running {ExecuteSelectionSet(selectionSet, subscriptionType, initialValue, variableValues)} normally (allowing parallelization). + Let {errors} be any field errors produced while executing the selection set. + Return an unordered map containing {data} and {errors}. + + Note: The {ExecuteSubscriptionEvent()} algorithm is intentionally similar to {ExecuteQuery()} since this is how each event result is produced. + */ + + private fun executeSubscriptionEvent( + executionContext: ExecutionContext, + parameters: ExecutionStrategyParameters, + eventPayload: Any? + ): CompletableFuture { + val newExecutionContext = executionContext.transform { builder -> builder.root(eventPayload) } + + val newParameters = firstFieldOfSubscriptionSelection(parameters) + val fetchedValue = FetchedValue.newFetchedValue().fetchedValue(eventPayload) + .rawFetchedValue(eventPayload) + .localContext(parameters.localContext) + .build() + return completeField(newExecutionContext, newParameters, fetchedValue).fieldValue + .thenApply { executionResult -> wrapWithRootFieldName(newParameters, executionResult) } + } + + private fun wrapWithRootFieldName( + parameters: ExecutionStrategyParameters, + executionResult: ExecutionResult + ): ExecutionResult { + val rootFieldName = getRootFieldName(parameters) + return ExecutionResultImpl( + Collections.singletonMap(rootFieldName, executionResult.getData()), + executionResult.errors + ) + } + + private fun getRootFieldName(parameters: ExecutionStrategyParameters): String { + val rootField = parameters.field.singleField + return if (rootField.alias != null) rootField.alias else rootField.name + } + + private fun firstFieldOfSubscriptionSelection( + parameters: ExecutionStrategyParameters + ): ExecutionStrategyParameters { + val fields = parameters.fields + val firstField = fields.getSubField(fields.keys[0]) + + val fieldPath = parameters.path.segment(ExecutionStrategy.mkNameForPath(firstField.singleField)) + return parameters.transform { builder -> builder.field(firstField).path(fieldPath) } + } +} diff --git a/graphql-kotlin-schema-generator/src/main/kotlin/com/expediagroup/graphql/hooks/FlowSubscriptionSchemaGeneratorHooks.kt b/graphql-kotlin-schema-generator/src/main/kotlin/com/expediagroup/graphql/hooks/FlowSubscriptionSchemaGeneratorHooks.kt new file mode 100644 index 0000000000..dbfc2bfc32 --- /dev/null +++ b/graphql-kotlin-schema-generator/src/main/kotlin/com/expediagroup/graphql/hooks/FlowSubscriptionSchemaGeneratorHooks.kt @@ -0,0 +1,32 @@ +package com.expediagroup.graphql.hooks + +import com.expediagroup.graphql.generator.extensions.getTypeOfFirstArgument +import com.expediagroup.graphql.generator.extensions.isSubclassOf +import kotlinx.coroutines.flow.Flow +import org.reactivestreams.Publisher +import kotlin.reflect.KClass +import kotlin.reflect.KFunction +import kotlin.reflect.KType + +/** + * Subclassable [SchemaGeneratorHooks] implementation that supports + * subscriptions that return either [Flow]s or [Publisher]s + */ +open class FlowSubscriptionSchemaGeneratorHooks : SchemaGeneratorHooks { + /** + * Unwrap a [Flow] to its argument type + */ + override fun willResolveMonad(type: KType): KType { + return when { + type.isSubclassOf(Flow::class) -> type.getTypeOfFirstArgument() + else -> super.willResolveMonad(type) + } + } + + /** + * Allow for [Flow] subscription types + */ + override fun isValidSubscriptionReturnType(kClass: KClass<*>, function: KFunction<*>): Boolean { + return function.returnType.isSubclassOf(Flow::class) || super.isValidSubscriptionReturnType(kClass, function) + } +} diff --git a/graphql-kotlin-schema-generator/src/test/kotlin/com/expediagroup/graphql/execution/FlowSubscriptionExecutionStrategyTest.kt b/graphql-kotlin-schema-generator/src/test/kotlin/com/expediagroup/graphql/execution/FlowSubscriptionExecutionStrategyTest.kt new file mode 100644 index 0000000000..ee86415c77 --- /dev/null +++ b/graphql-kotlin-schema-generator/src/test/kotlin/com/expediagroup/graphql/execution/FlowSubscriptionExecutionStrategyTest.kt @@ -0,0 +1,212 @@ +/* + * Copyright 2020 Expedia, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.expediagroup.graphql.execution + +import com.expediagroup.graphql.SchemaGeneratorConfig +import com.expediagroup.graphql.TopLevelObject +import com.expediagroup.graphql.exceptions.GraphQLKotlinException +import com.expediagroup.graphql.hooks.FlowSubscriptionSchemaGeneratorHooks +import com.expediagroup.graphql.toSchema +import graphql.ExecutionInput +import graphql.ExecutionResult +import graphql.GraphQL +import graphql.GraphQLError +import graphql.GraphqlErrorBuilder +import graphql.schema.GraphQLSchema +import kotlinx.coroutines.InternalCoroutinesApi +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.catch +import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.flow.onEach +import kotlinx.coroutines.reactive.asPublisher +import kotlinx.coroutines.runBlocking +import org.junit.jupiter.api.Test +import org.reactivestreams.Publisher +import kotlin.test.assertEquals +import kotlin.test.assertNull +import kotlin.test.assertTrue + +@InternalCoroutinesApi +class FlowSubscriptionExecutionStrategyTest { + + private val testSchema: GraphQLSchema = toSchema( + config = SchemaGeneratorConfig( + supportedPackages = listOf("com.expediagroup.graphql.spring.execution"), + hooks = FlowSubscriptionSchemaGeneratorHooks() + ), + queries = listOf(TopLevelObject(BasicQuery())), + mutations = listOf(TopLevelObject(BasicQuery())), + subscriptions = listOf(TopLevelObject(FlowSubscription())) + ) + private val testGraphQL: GraphQL = GraphQL.newGraphQL(testSchema).subscriptionExecutionStrategy(FlowSubscriptionExecutionStrategy()).build() + + @Test + fun `verify subscription to flow`() = runBlocking { + val request = ExecutionInput.newExecutionInput().query("subscription { ticker }").build() + val response = testGraphQL.execute(request) + val flow = response.getData>() + val list = mutableListOf() + flow.collect { + list.add(it.getData>().getValue("ticker")) + } + assertEquals(5, list.size) + for (i in list.indices) { + assertEquals(i + 1, list[i]) + } + } + + @Test + fun `verify subscription to publisher`() = runBlocking { + val request = ExecutionInput.newExecutionInput().query("subscription { publisherTicker }").build() + val response = testGraphQL.execute(request) + val flow = response.getData>() + val list = mutableListOf() + flow.collect { + list.add(it.getData>().getValue("publisherTicker")) + } + assertEquals(5, list.size) + for (i in list.indices) { + assertEquals(i + 1, list[i]) + } + } + + @Test + fun `verify subscription to flow with context`() = runBlocking { + val request = ExecutionInput.newExecutionInput() + .query("subscription { contextualTicker }") + .context(SubscriptionContext("junitHandler")) + .build() + val response = testGraphQL.execute(request) + val flow = response.getData>() + val list = mutableListOf() + flow.collect { + val contextValue = it.getData>().getValue("contextualTicker") + assertTrue(contextValue.startsWith("junitHandler:")) + list.add(contextValue.substringAfter("junitHandler:").toInt()) + } + assertEquals(5, list.size) + for (i in list.indices) { + assertEquals(i + 1, list[i]) + } + } + + @Test + fun `verify subscription to failing flow`() = runBlocking { + val request = ExecutionInput.newExecutionInput().query("subscription { alwaysThrows }").build() + val response = testGraphQL.execute(request) + val flow = response.getData>() + val errors = mutableListOf() + val results = mutableListOf() + flow.onEach { + val dataMap = it.getData>() + if (dataMap != null) { + results.add(dataMap.getValue("alwaysThrows")) + } + errors.addAll(it.errors) + }.catch { + errors.add(GraphqlErrorBuilder.newError().message(it.message).build()) + }.collect() + assertEquals(2, results.size) + for (i in results.indices) { + assertEquals(i + 1, results[i]) + } + assertEquals(1, errors.size) + assertEquals("JUNIT subscription failure", errors[0].message) + } + + @Test + fun `verify subscription to exploding flow`() = runBlocking { + val request = ExecutionInput.newExecutionInput().query("subscription { throwsFast }").build() + val response = testGraphQL.execute(request) + val flow = response.getData>() + val errors = response.errors + assertNull(flow) + assertEquals(1, errors.size) + assertEquals("JUNIT flow failure", errors[0].message.substringAfter(" : ")) + } + + @Test + fun `verify subscription alias`() = runBlocking { + val request = ExecutionInput.newExecutionInput().query("subscription { t: ticker }").build() + val response = testGraphQL.execute(request) + val flow = response.getData>() + val list = mutableListOf() + flow.collect { + list.add(it.getData>().getValue("t")) + } + assertEquals(5, list.size) + for (i in list.indices) { + assertEquals(i + 1, list[i]) + } + } + + // GraphQL spec requires at least single query to be present as Query type is needed to run introspection queries + // see: https://github.com/graphql/graphql-spec/issues/490 and https://github.com/graphql/graphql-spec/issues/568 + class BasicQuery { + @Suppress("Detekt.FunctionOnlyReturningConstant") + fun query(): String = "hello" + } + + class FlowSubscription { + fun ticker(): Flow { + return flow { + for (i in 1..5) { + delay(100) + emit(i) + } + } + } + + fun publisherTicker(): Publisher { + return flow { + for (i in 1..5) { + delay(100) + emit(i) + } + }.asPublisher() + } + + fun throwsFast(): Flow { + throw GraphQLKotlinException("JUNIT flow failure") + } + + fun alwaysThrows(): Flow { + return flow { + for (i in 1..5) { + if (i > 2) { + throw GraphQLKotlinException("JUNIT subscription failure") + } + delay(100) + emit(i) + } + } + } + + fun contextualTicker(context: SubscriptionContext): Flow { + return flow { + for (i in 1..5) { + delay(100) + emit("${context.value}:$i") + } + } + } + } + + data class SubscriptionContext(val value: String) : GraphQLContext +} diff --git a/graphql-kotlin-schema-generator/src/test/kotlin/com/expediagroup/graphql/hooks/SchemaGeneratorHooksTest.kt b/graphql-kotlin-schema-generator/src/test/kotlin/com/expediagroup/graphql/hooks/SchemaGeneratorHooksTest.kt index c5226d896f..4dadab9c75 100644 --- a/graphql-kotlin-schema-generator/src/test/kotlin/com/expediagroup/graphql/hooks/SchemaGeneratorHooksTest.kt +++ b/graphql-kotlin-schema-generator/src/test/kotlin/com/expediagroup/graphql/hooks/SchemaGeneratorHooksTest.kt @@ -21,6 +21,7 @@ import com.expediagroup.graphql.annotations.GraphQLIgnore import com.expediagroup.graphql.exceptions.EmptyInputObjectTypeException import com.expediagroup.graphql.exceptions.EmptyInterfaceTypeException import com.expediagroup.graphql.exceptions.EmptyObjectTypeException +import com.expediagroup.graphql.extensions.deepName import com.expediagroup.graphql.generator.extensions.getSimpleName import com.expediagroup.graphql.getTestSchemaConfigWithHooks import com.expediagroup.graphql.test.utils.graphqlUUIDType @@ -32,8 +33,11 @@ import graphql.schema.GraphQLObjectType import graphql.schema.GraphQLScalarType import graphql.schema.GraphQLSchema import graphql.schema.GraphQLType +import kotlinx.coroutines.flow.flowOf +import kotlinx.coroutines.reactive.asPublisher import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertThrows +import org.reactivestreams.Publisher import java.util.UUID import kotlin.random.Random import kotlin.reflect.KClass @@ -246,6 +250,22 @@ class SchemaGeneratorHooksTest { assertEquals("Hijacked Description", query.description) } + @Test + fun `noop subscription basics`() { + for (hooks in listOf(NoopSchemaGeneratorHooks, FlowSubscriptionSchemaGeneratorHooks())) { + val schema = toSchema( + queries = listOf(TopLevelObject(TestQuery())), + mutations = listOf(TopLevelObject(TestQuery())), + subscriptions = listOf(TopLevelObject(TestSubscription())), + config = getTestSchemaConfigWithHooks(hooks) + ) + assertTrue(hooks.isValidSubscriptionReturnType(Publisher::class, TestSubscription::subscription)) + val topLevelSub = schema.getObjectType("Subscription") + val sub = topLevelSub.getFieldDefinition("subscription") + assertEquals("SomeData!", sub.type.deepName) + } + } + @Test fun `willResolveMonad returns basic type`() { val hooks = NoopSchemaGeneratorHooks @@ -284,6 +304,10 @@ class SchemaGeneratorHooksTest { fun query(): SomeData = SomeData("someData", 0) } + class TestSubscription { + fun subscription(): Publisher = flowOf(SomeData("someData", 0)).asPublisher() + } + class CustomTypesQuery { fun uuid(): UUID = UUID.randomUUID() }