Skip to content

Subscription #77

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 37 commits into from
Apr 5, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
8c7cb18
Adds RxSwift package
Feb 17, 2021
96edc49
Adds execution helper method from JS reference
Feb 17, 2021
377b321
Includes subscription into GraphQL definition file
Feb 17, 2021
7a4abcf
Subscription attempt with custom AsyncIterable
Feb 17, 2021
681e213
Adds subscription result and test stub
Feb 18, 2021
220713b
Implements some testing... getting closer
Feb 19, 2021
a52bed8
Fixes Observable<Any> type issues with testing
Feb 19, 2021
50dc3a0
Adds subscription resolver nil handling
Feb 19, 2021
ec6132f
Fixes test schema. Test subscriptions are working!
Feb 20, 2021
7f3f60a
refactors helpers, adds more tests
Feb 20, 2021
8e27509
bug fix to extract incorrect names
Feb 20, 2021
07d2409
Minor refactor of event resolver for clarity
Feb 22, 2021
82f3c21
Simplifies MapSourceToResponseEvent futures
Feb 22, 2021
e789328
Changes observer to resolve to a future
Feb 22, 2021
db299f7
Makes location equatable for better error checking
Feb 22, 2021
f6b5eae
Improves error message for incorrect arg types
Feb 22, 2021
f020990
Improves resolver return type issue message
Feb 22, 2021
aa9d7f8
Adds remaining graphql-js subscription tests
Feb 22, 2021
bff1618
Refactored to support multiple subscribers
Feb 22, 2021
8040a87
Removes unused parameters
Feb 22, 2021
f6b9207
Refactors tests to be simpler and more logical
Feb 22, 2021
7adab89
Cleans up definition file changes
Feb 23, 2021
aee8841
Adds handling for multiple errors
Feb 23, 2021
83632df
Adds justification for forced unwrapping
Feb 23, 2021
471791d
Updates documentation to reflect implementation
Feb 23, 2021
292091a
Removes unnecessary function
Feb 23, 2021
c6da6b8
Adds primary subscription entry point function
Feb 23, 2021
367e5a1
Removes thread deadlocking comment
Feb 23, 2021
baba1bc
Adds testing of subscription arguments
Feb 23, 2021
6728e31
Adds test for observable type checking
Feb 24, 2021
fbe96f6
Minor final cleanup
Mar 2, 2021
5f18287
Merge branch 'master' into subscription
paulofaria Mar 8, 2021
f1d9ece
Adds wrapper class for Observables, generalizes
Mar 10, 2021
33130f3
Adds convenience methods and passing tests
Mar 10, 2021
9da26da
All subscription tests passing
Mar 10, 2021
e4beac6
Modifies EventStream permissions for package overrides
Mar 10, 2021
bee0675
Moves all RxSwift dependency to GraphQLRxSwift
Mar 11, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ let package = Package(
name: "GraphQL",
dependencies: [
.product(name: "NIO", package: "swift-nio"),
.product(name: "Runtime", package: "Runtime"),
.product(name: "Runtime", package: "Runtime")
]
),
.testTarget(name: "GraphQLTests", dependencies: ["GraphQL"]),
Expand Down
76 changes: 76 additions & 0 deletions Sources/GraphQL/GraphQL.swift
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,19 @@ public struct GraphQLResult : Equatable, Codable, CustomStringConvertible {
}
}

/// SubscriptionResult wraps the observable and error data returned by the subscribe request.
public struct SubscriptionResult {
public let stream: SubscriptionEventStream?
public let errors: [GraphQLError]

public init(stream: SubscriptionEventStream? = nil, errors: [GraphQLError] = []) {
self.stream = stream
self.errors = errors
}
}
/// SubscriptionObservable represents an event stream of fully resolved GraphQL subscription results. Subscribers can be added to this stream.
public typealias SubscriptionEventStream = EventStream<Future<GraphQLResult>>

/// This is the primary entry point function for fulfilling GraphQL operations
/// by parsing, validating, and executing a GraphQL document along side a
/// GraphQL schema.
Expand Down Expand Up @@ -151,3 +164,66 @@ public func graphql<Retrieval: PersistedQueryRetrieval>(
)
}
}

/// This is the primary entry point function for fulfilling GraphQL subscription
/// operations by parsing, validating, and executing a GraphQL subscription
/// document along side a GraphQL schema.
///
/// More sophisticated GraphQL servers, such as those which persist queries,
/// may wish to separate the validation and execution phases to a static time
/// tooling step, and a server runtime step.
///
/// - parameter queryStrategy: The field execution strategy to use for query requests
/// - parameter mutationStrategy: The field execution strategy to use for mutation requests
/// - parameter subscriptionStrategy: The field execution strategy to use for subscription requests
/// - parameter instrumentation: The instrumentation implementation to call during the parsing, validating, execution, and field resolution stages.
/// - parameter schema: The GraphQL type system to use when validating and executing a query.
/// - parameter request: A GraphQL language formatted string representing the requested operation.
/// - parameter rootValue: The value provided as the first argument to resolver functions on the top level type (e.g. the query object type).
/// - parameter contextValue: A context value provided to all resolver functions
/// - parameter variableValues: A mapping of variable name to runtime value to use for all variables defined in the `request`.
/// - parameter operationName: The name of the operation to use if `request` contains multiple possible operations. Can be omitted if `request` contains only one operation.
///
/// - throws: throws GraphQLError if an error occurs while parsing the `request`.
///
/// - returns: returns a SubscriptionResult containing the subscription observable inside the key `observable` and any validation or execution errors inside the key `errors`. The
/// value of `observable` might be `null` if, for example, the query is invalid. It's not possible to have both `observable` and `errors`. The observable payloads are
/// GraphQLResults which contain the result of the query inside the key `data` and any validation or execution errors inside the key `errors`. The value of `data` might be `null`.
/// It's possible to have both `data` and `errors` if an error occurs only in a specific field. If that happens the value of that field will be `null` and there
/// will be an error inside `errors` specifying the reason for the failure and the path of the failed field.
public func graphqlSubscribe(
queryStrategy: QueryFieldExecutionStrategy = SerialFieldExecutionStrategy(),
mutationStrategy: MutationFieldExecutionStrategy = SerialFieldExecutionStrategy(),
subscriptionStrategy: SubscriptionFieldExecutionStrategy = SerialFieldExecutionStrategy(),
instrumentation: Instrumentation = NoOpInstrumentation,
schema: GraphQLSchema,
request: String,
rootValue: Any = Void(),
context: Any = Void(),
eventLoopGroup: EventLoopGroup,
variableValues: [String: Map] = [:],
operationName: String? = nil
) throws -> Future<SubscriptionResult> {

let source = Source(body: request, name: "GraphQL Subscription request")
let documentAST = try parse(instrumentation: instrumentation, source: source)
let validationErrors = validate(instrumentation: instrumentation, schema: schema, ast: documentAST)

guard validationErrors.isEmpty else {
return eventLoopGroup.next().makeSucceededFuture(SubscriptionResult(errors: validationErrors))
}

return subscribe(
queryStrategy: queryStrategy,
mutationStrategy: mutationStrategy,
subscriptionStrategy: subscriptionStrategy,
instrumentation: instrumentation,
schema: schema,
documentAST: documentAST,
rootValue: rootValue,
context: context,
eventLoopGroup: eventLoopGroup,
variableValues: variableValues,
operationName: operationName
)
}
2 changes: 1 addition & 1 deletion Sources/GraphQL/Language/Location.swift
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import Foundation

public struct SourceLocation : Codable {
public struct SourceLocation : Codable, Equatable {
public let line: Int
public let column: Int

Expand Down
8 changes: 8 additions & 0 deletions Sources/GraphQL/Subscription/EventStream.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
/// Abstract event stream class - Should be overridden for actual implementations
open class EventStream<Element> {
public init() { }
/// Template method for mapping an event stream to a new generic type - MUST be overridden by implementing types.
open func map<To>(_ closure: @escaping (Element) throws -> To) -> EventStream<To> {
fatalError("This function should be overridden by implementing classes")
}
}
280 changes: 280 additions & 0 deletions Sources/GraphQL/Subscription/Subscribe.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,280 @@
import Dispatch
import Runtime
import NIO

/**
* Implements the "Subscribe" algorithm described in the GraphQL specification.
*
* Returns a future which resolves to a SubscriptionResult containing either
* a SubscriptionObservable (if successful), or GraphQLErrors (error).
*
* If the client-provided arguments to this function do not result in a
* compliant subscription, the future will resolve to a
* SubscriptionResult containing `errors` and no `observable`.
*
* If the source stream could not be created due to faulty subscription
* resolver logic or underlying systems, the future will resolve to a
* SubscriptionResult containing `errors` and no `observable`.
*
* If the operation succeeded, the future will resolve to a SubscriptionResult,
* containing an `observable` which yields a stream of GraphQLResults
* representing the response stream.
*
* Accepts either an object with named arguments, or individual arguments.
*/
func subscribe(
queryStrategy: QueryFieldExecutionStrategy,
mutationStrategy: MutationFieldExecutionStrategy,
subscriptionStrategy: SubscriptionFieldExecutionStrategy,
instrumentation: Instrumentation,
schema: GraphQLSchema,
documentAST: Document,
rootValue: Any,
context: Any,
eventLoopGroup: EventLoopGroup,
variableValues: [String: Map] = [:],
operationName: String? = nil
) -> EventLoopFuture<SubscriptionResult> {

let sourceFuture = createSourceEventStream(
queryStrategy: queryStrategy,
mutationStrategy: mutationStrategy,
subscriptionStrategy: subscriptionStrategy,
instrumentation: instrumentation,
schema: schema,
documentAST: documentAST,
rootValue: rootValue,
context: context,
eventLoopGroup: eventLoopGroup,
variableValues: variableValues,
operationName: operationName
)

return sourceFuture.map{ sourceResult -> SubscriptionResult in
if let sourceStream = sourceResult.stream {
let subscriptionStream = sourceStream.map { eventPayload -> Future<GraphQLResult> in

// For each payload yielded from a subscription, map it over the normal
// GraphQL `execute` function, with `payload` as the rootValue.
// This implements the "MapSourceToResponseEvent" algorithm described in
// the GraphQL specification. The `execute` function provides the
// "ExecuteSubscriptionEvent" algorithm, as it is nearly identical to the
// "ExecuteQuery" algorithm, for which `execute` is also used.
return execute(
queryStrategy: queryStrategy,
mutationStrategy: mutationStrategy,
subscriptionStrategy: subscriptionStrategy,
instrumentation: instrumentation,
schema: schema,
documentAST: documentAST,
rootValue: eventPayload,
context: context,
eventLoopGroup: eventLoopGroup,
variableValues: variableValues,
operationName: operationName
)
}
return SubscriptionResult(stream: subscriptionStream, errors: sourceResult.errors)
} else {
return SubscriptionResult(errors: sourceResult.errors)
}
}
}

/**
* Implements the "CreateSourceEventStream" algorithm described in the
* GraphQL specification, resolving the subscription source event stream.
*
* Returns a Future which resolves to a SourceEventStreamResult, containing
* either an Observable (if successful) or GraphQLErrors (error).
*
* If the client-provided arguments to this function do not result in a
* compliant subscription, the future will resolve to a
* SourceEventStreamResult containing `errors` and no `observable`.
*
* If the source stream could not be created due to faulty subscription
* resolver logic or underlying systems, the future will resolve to a
* SourceEventStreamResult containing `errors` and no `observable`.
*
* If the operation succeeded, the future will resolve to a SubscriptionResult,
* containing an `observable` which yields a stream of event objects
* returned by the subscription resolver.
*
* A Source Event Stream represents a sequence of events, each of which triggers
* a GraphQL execution for that event.
*
* This may be useful when hosting the stateful subscription service in a
* different process or machine than the stateless GraphQL execution engine,
* or otherwise separating these two steps. For more on this, see the
* "Supporting Subscriptions at Scale" information in the GraphQL specification.
*/
func createSourceEventStream(
queryStrategy: QueryFieldExecutionStrategy,
mutationStrategy: MutationFieldExecutionStrategy,
subscriptionStrategy: SubscriptionFieldExecutionStrategy,
instrumentation: Instrumentation,
schema: GraphQLSchema,
documentAST: Document,
rootValue: Any,
context: Any,
eventLoopGroup: EventLoopGroup,
variableValues: [String: Map] = [:],
operationName: String? = nil
) -> EventLoopFuture<SourceEventStreamResult> {

let executeStarted = instrumentation.now

do {
// If a valid context cannot be created due to incorrect arguments,
// this will throw an error.
let exeContext = try buildExecutionContext(
queryStrategy: queryStrategy,
mutationStrategy: mutationStrategy,
subscriptionStrategy: subscriptionStrategy,
instrumentation: instrumentation,
schema: schema,
documentAST: documentAST,
rootValue: rootValue,
context: context,
eventLoopGroup: eventLoopGroup,
rawVariableValues: variableValues,
operationName: operationName
)
return try executeSubscription(context: exeContext, eventLoopGroup: eventLoopGroup)
} catch let error as GraphQLError {
instrumentation.operationExecution(
processId: processId(),
threadId: threadId(),
started: executeStarted,
finished: instrumentation.now,
schema: schema,
document: documentAST,
rootValue: rootValue,
eventLoopGroup: eventLoopGroup,
variableValues: variableValues,
operation: nil,
errors: [error],
result: nil
)

return eventLoopGroup.next().makeSucceededFuture(SourceEventStreamResult(errors: [error]))
} catch {
return eventLoopGroup.next().makeSucceededFuture(SourceEventStreamResult(errors: [GraphQLError(error)]))
}
}

func executeSubscription(
context: ExecutionContext,
eventLoopGroup: EventLoopGroup
) throws -> EventLoopFuture<SourceEventStreamResult> {

// Get the first node
let type = try getOperationRootType(schema: context.schema, operation: context.operation)
var inputFields: [String:[Field]] = [:]
var visitedFragmentNames: [String:Bool] = [:]
let fields = try collectFields(
exeContext: context,
runtimeType: type,
selectionSet: context.operation.selectionSet,
fields: &inputFields,
visitedFragmentNames: &visitedFragmentNames
)
// If query is valid, fields is guaranteed to have at least 1 member
let responseName = fields.keys.first!
let fieldNodes = fields[responseName]!
let fieldNode = fieldNodes.first!

guard let fieldDef = getFieldDef(schema: context.schema, parentType: type, fieldAST: fieldNode) else {
throw GraphQLError(
message: "The subscription field '\(fieldNode.name.value)' is not defined.",
nodes: fieldNodes
)
}

// Implements the "ResolveFieldEventStream" algorithm from GraphQL specification.
// It differs from "ResolveFieldValue" due to providing a different `resolveFn`.

// Build a map of arguments from the field.arguments AST, using the
// variables scope to fulfill any variable references.
let args = try getArgumentValues(argDefs: fieldDef.args, argASTs: fieldNode.arguments, variableValues: context.variableValues)

// The resolve function's optional third argument is a context value that
// is provided to every resolve function within an execution. It is commonly
// used to represent an authenticated user, or request-specific caches.
let contextValue = context.context

// The resolve function's optional fourth argument is a collection of
// information about the current execution state.
let path = IndexPath.init().appending(fieldNode.name.value)
let info = GraphQLResolveInfo.init(
fieldName: fieldDef.name,
fieldASTs: fieldNodes,
returnType: fieldDef.type,
parentType: type,
path: path,
schema: context.schema,
fragments: context.fragments,
rootValue: context.rootValue,
operation: context.operation,
variableValues: context.variableValues
)

// Call the `subscribe()` resolver or the default resolver to produce an
// Observable yielding raw payloads.
let resolve = fieldDef.subscribe ?? defaultResolve

// Get the resolve func, regardless of if its result is normal
// or abrupt (error).
let resolvedFutureOrError = resolveOrError(
resolve: resolve,
source: context.rootValue,
args: args,
context: contextValue,
eventLoopGroup: eventLoopGroup,
info: info
)

let resolvedFuture:Future<Any?>
switch resolvedFutureOrError {
case let .failure(error):
if let graphQLError = error as? GraphQLError {
throw graphQLError
} else {
throw GraphQLError(error)
}
case let .success(success):
resolvedFuture = success
}
return resolvedFuture.map { resolved -> SourceEventStreamResult in
if !context.errors.isEmpty {
return SourceEventStreamResult(errors: context.errors)
} else if let error = resolved as? GraphQLError {
return SourceEventStreamResult(errors: [error])
} else if let stream = resolved as? EventStream<Any> {
return SourceEventStreamResult(stream: stream)
} else if resolved == nil {
return SourceEventStreamResult(errors: [
GraphQLError(message: "Resolved subscription was nil")
])
} else {
let resolvedObj = resolved as AnyObject
return SourceEventStreamResult(errors: [
GraphQLError(
message: "Subscription field resolver must return EventStream<Any>. Received: '\(resolvedObj)'"
)
])
}
}
}

// Subscription resolvers MUST return observables that are declared as 'Any' due to Swift not having covariant generic support for type
// checking. Normal resolvers for subscription fields should handle type casting, same as resolvers for query fields.
struct SourceEventStreamResult {
public let stream: EventStream<Any>?
public let errors: [GraphQLError]

public init(stream: EventStream<Any>? = nil, errors: [GraphQLError] = []) {
self.stream = stream
self.errors = errors
}
}
Loading