|
| 1 | +import Dispatch |
| 2 | +import Runtime |
| 3 | +import NIO |
| 4 | + |
| 5 | +/** |
| 6 | + * Implements the "Subscribe" algorithm described in the GraphQL specification. |
| 7 | + * |
| 8 | + * Returns a future which resolves to a SubscriptionResult containing either |
| 9 | + * a SubscriptionObservable (if successful), or GraphQLErrors (error). |
| 10 | + * |
| 11 | + * If the client-provided arguments to this function do not result in a |
| 12 | + * compliant subscription, the future will resolve to a |
| 13 | + * SubscriptionResult containing `errors` and no `observable`. |
| 14 | + * |
| 15 | + * If the source stream could not be created due to faulty subscription |
| 16 | + * resolver logic or underlying systems, the future will resolve to a |
| 17 | + * SubscriptionResult containing `errors` and no `observable`. |
| 18 | + * |
| 19 | + * If the operation succeeded, the future will resolve to a SubscriptionResult, |
| 20 | + * containing an `observable` which yields a stream of GraphQLResults |
| 21 | + * representing the response stream. |
| 22 | + * |
| 23 | + * Accepts either an object with named arguments, or individual arguments. |
| 24 | + */ |
| 25 | +func subscribe( |
| 26 | + queryStrategy: QueryFieldExecutionStrategy, |
| 27 | + mutationStrategy: MutationFieldExecutionStrategy, |
| 28 | + subscriptionStrategy: SubscriptionFieldExecutionStrategy, |
| 29 | + instrumentation: Instrumentation, |
| 30 | + schema: GraphQLSchema, |
| 31 | + documentAST: Document, |
| 32 | + rootValue: Any, |
| 33 | + context: Any, |
| 34 | + eventLoopGroup: EventLoopGroup, |
| 35 | + variableValues: [String: Map] = [:], |
| 36 | + operationName: String? = nil |
| 37 | +) -> EventLoopFuture<SubscriptionResult> { |
| 38 | + |
| 39 | + let sourceFuture = createSourceEventStream( |
| 40 | + queryStrategy: queryStrategy, |
| 41 | + mutationStrategy: mutationStrategy, |
| 42 | + subscriptionStrategy: subscriptionStrategy, |
| 43 | + instrumentation: instrumentation, |
| 44 | + schema: schema, |
| 45 | + documentAST: documentAST, |
| 46 | + rootValue: rootValue, |
| 47 | + context: context, |
| 48 | + eventLoopGroup: eventLoopGroup, |
| 49 | + variableValues: variableValues, |
| 50 | + operationName: operationName |
| 51 | + ) |
| 52 | + |
| 53 | + return sourceFuture.map{ sourceResult -> SubscriptionResult in |
| 54 | + if let sourceStream = sourceResult.stream { |
| 55 | + let subscriptionStream = sourceStream.map { eventPayload -> Future<GraphQLResult> in |
| 56 | + |
| 57 | + // For each payload yielded from a subscription, map it over the normal |
| 58 | + // GraphQL `execute` function, with `payload` as the rootValue. |
| 59 | + // This implements the "MapSourceToResponseEvent" algorithm described in |
| 60 | + // the GraphQL specification. The `execute` function provides the |
| 61 | + // "ExecuteSubscriptionEvent" algorithm, as it is nearly identical to the |
| 62 | + // "ExecuteQuery" algorithm, for which `execute` is also used. |
| 63 | + return execute( |
| 64 | + queryStrategy: queryStrategy, |
| 65 | + mutationStrategy: mutationStrategy, |
| 66 | + subscriptionStrategy: subscriptionStrategy, |
| 67 | + instrumentation: instrumentation, |
| 68 | + schema: schema, |
| 69 | + documentAST: documentAST, |
| 70 | + rootValue: eventPayload, |
| 71 | + context: context, |
| 72 | + eventLoopGroup: eventLoopGroup, |
| 73 | + variableValues: variableValues, |
| 74 | + operationName: operationName |
| 75 | + ) |
| 76 | + } |
| 77 | + return SubscriptionResult(stream: subscriptionStream, errors: sourceResult.errors) |
| 78 | + } else { |
| 79 | + return SubscriptionResult(errors: sourceResult.errors) |
| 80 | + } |
| 81 | + } |
| 82 | +} |
| 83 | + |
| 84 | +/** |
| 85 | + * Implements the "CreateSourceEventStream" algorithm described in the |
| 86 | + * GraphQL specification, resolving the subscription source event stream. |
| 87 | + * |
| 88 | + * Returns a Future which resolves to a SourceEventStreamResult, containing |
| 89 | + * either an Observable (if successful) or GraphQLErrors (error). |
| 90 | + * |
| 91 | + * If the client-provided arguments to this function do not result in a |
| 92 | + * compliant subscription, the future will resolve to a |
| 93 | + * SourceEventStreamResult containing `errors` and no `observable`. |
| 94 | + * |
| 95 | + * If the source stream could not be created due to faulty subscription |
| 96 | + * resolver logic or underlying systems, the future will resolve to a |
| 97 | + * SourceEventStreamResult containing `errors` and no `observable`. |
| 98 | + * |
| 99 | + * If the operation succeeded, the future will resolve to a SubscriptionResult, |
| 100 | + * containing an `observable` which yields a stream of event objects |
| 101 | + * returned by the subscription resolver. |
| 102 | + * |
| 103 | + * A Source Event Stream represents a sequence of events, each of which triggers |
| 104 | + * a GraphQL execution for that event. |
| 105 | + * |
| 106 | + * This may be useful when hosting the stateful subscription service in a |
| 107 | + * different process or machine than the stateless GraphQL execution engine, |
| 108 | + * or otherwise separating these two steps. For more on this, see the |
| 109 | + * "Supporting Subscriptions at Scale" information in the GraphQL specification. |
| 110 | + */ |
| 111 | +func createSourceEventStream( |
| 112 | + queryStrategy: QueryFieldExecutionStrategy, |
| 113 | + mutationStrategy: MutationFieldExecutionStrategy, |
| 114 | + subscriptionStrategy: SubscriptionFieldExecutionStrategy, |
| 115 | + instrumentation: Instrumentation, |
| 116 | + schema: GraphQLSchema, |
| 117 | + documentAST: Document, |
| 118 | + rootValue: Any, |
| 119 | + context: Any, |
| 120 | + eventLoopGroup: EventLoopGroup, |
| 121 | + variableValues: [String: Map] = [:], |
| 122 | + operationName: String? = nil |
| 123 | +) -> EventLoopFuture<SourceEventStreamResult> { |
| 124 | + |
| 125 | + let executeStarted = instrumentation.now |
| 126 | + |
| 127 | + do { |
| 128 | + // If a valid context cannot be created due to incorrect arguments, |
| 129 | + // this will throw an error. |
| 130 | + let exeContext = try buildExecutionContext( |
| 131 | + queryStrategy: queryStrategy, |
| 132 | + mutationStrategy: mutationStrategy, |
| 133 | + subscriptionStrategy: subscriptionStrategy, |
| 134 | + instrumentation: instrumentation, |
| 135 | + schema: schema, |
| 136 | + documentAST: documentAST, |
| 137 | + rootValue: rootValue, |
| 138 | + context: context, |
| 139 | + eventLoopGroup: eventLoopGroup, |
| 140 | + rawVariableValues: variableValues, |
| 141 | + operationName: operationName |
| 142 | + ) |
| 143 | + return try executeSubscription(context: exeContext, eventLoopGroup: eventLoopGroup) |
| 144 | + } catch let error as GraphQLError { |
| 145 | + instrumentation.operationExecution( |
| 146 | + processId: processId(), |
| 147 | + threadId: threadId(), |
| 148 | + started: executeStarted, |
| 149 | + finished: instrumentation.now, |
| 150 | + schema: schema, |
| 151 | + document: documentAST, |
| 152 | + rootValue: rootValue, |
| 153 | + eventLoopGroup: eventLoopGroup, |
| 154 | + variableValues: variableValues, |
| 155 | + operation: nil, |
| 156 | + errors: [error], |
| 157 | + result: nil |
| 158 | + ) |
| 159 | + |
| 160 | + return eventLoopGroup.next().makeSucceededFuture(SourceEventStreamResult(errors: [error])) |
| 161 | + } catch { |
| 162 | + return eventLoopGroup.next().makeSucceededFuture(SourceEventStreamResult(errors: [GraphQLError(error)])) |
| 163 | + } |
| 164 | +} |
| 165 | + |
| 166 | +func executeSubscription( |
| 167 | + context: ExecutionContext, |
| 168 | + eventLoopGroup: EventLoopGroup |
| 169 | +) throws -> EventLoopFuture<SourceEventStreamResult> { |
| 170 | + |
| 171 | + // Get the first node |
| 172 | + let type = try getOperationRootType(schema: context.schema, operation: context.operation) |
| 173 | + var inputFields: [String:[Field]] = [:] |
| 174 | + var visitedFragmentNames: [String:Bool] = [:] |
| 175 | + let fields = try collectFields( |
| 176 | + exeContext: context, |
| 177 | + runtimeType: type, |
| 178 | + selectionSet: context.operation.selectionSet, |
| 179 | + fields: &inputFields, |
| 180 | + visitedFragmentNames: &visitedFragmentNames |
| 181 | + ) |
| 182 | + // If query is valid, fields is guaranteed to have at least 1 member |
| 183 | + let responseName = fields.keys.first! |
| 184 | + let fieldNodes = fields[responseName]! |
| 185 | + let fieldNode = fieldNodes.first! |
| 186 | + |
| 187 | + guard let fieldDef = getFieldDef(schema: context.schema, parentType: type, fieldAST: fieldNode) else { |
| 188 | + throw GraphQLError( |
| 189 | + message: "The subscription field '\(fieldNode.name.value)' is not defined.", |
| 190 | + nodes: fieldNodes |
| 191 | + ) |
| 192 | + } |
| 193 | + |
| 194 | + // Implements the "ResolveFieldEventStream" algorithm from GraphQL specification. |
| 195 | + // It differs from "ResolveFieldValue" due to providing a different `resolveFn`. |
| 196 | + |
| 197 | + // Build a map of arguments from the field.arguments AST, using the |
| 198 | + // variables scope to fulfill any variable references. |
| 199 | + let args = try getArgumentValues(argDefs: fieldDef.args, argASTs: fieldNode.arguments, variableValues: context.variableValues) |
| 200 | + |
| 201 | + // The resolve function's optional third argument is a context value that |
| 202 | + // is provided to every resolve function within an execution. It is commonly |
| 203 | + // used to represent an authenticated user, or request-specific caches. |
| 204 | + let contextValue = context.context |
| 205 | + |
| 206 | + // The resolve function's optional fourth argument is a collection of |
| 207 | + // information about the current execution state. |
| 208 | + let path = IndexPath.init().appending(fieldNode.name.value) |
| 209 | + let info = GraphQLResolveInfo.init( |
| 210 | + fieldName: fieldDef.name, |
| 211 | + fieldASTs: fieldNodes, |
| 212 | + returnType: fieldDef.type, |
| 213 | + parentType: type, |
| 214 | + path: path, |
| 215 | + schema: context.schema, |
| 216 | + fragments: context.fragments, |
| 217 | + rootValue: context.rootValue, |
| 218 | + operation: context.operation, |
| 219 | + variableValues: context.variableValues |
| 220 | + ) |
| 221 | + |
| 222 | + // Call the `subscribe()` resolver or the default resolver to produce an |
| 223 | + // Observable yielding raw payloads. |
| 224 | + let resolve = fieldDef.subscribe ?? defaultResolve |
| 225 | + |
| 226 | + // Get the resolve func, regardless of if its result is normal |
| 227 | + // or abrupt (error). |
| 228 | + let resolvedFutureOrError = resolveOrError( |
| 229 | + resolve: resolve, |
| 230 | + source: context.rootValue, |
| 231 | + args: args, |
| 232 | + context: contextValue, |
| 233 | + eventLoopGroup: eventLoopGroup, |
| 234 | + info: info |
| 235 | + ) |
| 236 | + |
| 237 | + let resolvedFuture:Future<Any?> |
| 238 | + switch resolvedFutureOrError { |
| 239 | + case let .failure(error): |
| 240 | + if let graphQLError = error as? GraphQLError { |
| 241 | + throw graphQLError |
| 242 | + } else { |
| 243 | + throw GraphQLError(error) |
| 244 | + } |
| 245 | + case let .success(success): |
| 246 | + resolvedFuture = success |
| 247 | + } |
| 248 | + return resolvedFuture.map { resolved -> SourceEventStreamResult in |
| 249 | + if !context.errors.isEmpty { |
| 250 | + return SourceEventStreamResult(errors: context.errors) |
| 251 | + } else if let error = resolved as? GraphQLError { |
| 252 | + return SourceEventStreamResult(errors: [error]) |
| 253 | + } else if let stream = resolved as? EventStream<Any> { |
| 254 | + return SourceEventStreamResult(stream: stream) |
| 255 | + } else if resolved == nil { |
| 256 | + return SourceEventStreamResult(errors: [ |
| 257 | + GraphQLError(message: "Resolved subscription was nil") |
| 258 | + ]) |
| 259 | + } else { |
| 260 | + let resolvedObj = resolved as AnyObject |
| 261 | + return SourceEventStreamResult(errors: [ |
| 262 | + GraphQLError( |
| 263 | + message: "Subscription field resolver must return EventStream<Any>. Received: '\(resolvedObj)'" |
| 264 | + ) |
| 265 | + ]) |
| 266 | + } |
| 267 | + } |
| 268 | +} |
| 269 | + |
| 270 | +// Subscription resolvers MUST return observables that are declared as 'Any' due to Swift not having covariant generic support for type |
| 271 | +// checking. Normal resolvers for subscription fields should handle type casting, same as resolvers for query fields. |
| 272 | +struct SourceEventStreamResult { |
| 273 | + public let stream: EventStream<Any>? |
| 274 | + public let errors: [GraphQLError] |
| 275 | + |
| 276 | + public init(stream: EventStream<Any>? = nil, errors: [GraphQLError] = []) { |
| 277 | + self.stream = stream |
| 278 | + self.errors = errors |
| 279 | + } |
| 280 | +} |
0 commit comments