From 94653dfba2e41678729a4985bd0ce14b209137b8 Mon Sep 17 00:00:00 2001 From: Cory Benfield Date: Mon, 15 Mar 2021 13:26:17 +0000 Subject: [PATCH] Use synchronous pipeline hops to remove windows. Motivation: There is an awkward timing window in the TLSEventsHandler flow where it is possible for the NIOSSLClientHandler to fail the handshake on handlerAdded. If this happens, the TLSEventsHandler will not be in the pipeline, and so the handshake failure error will be lost and we'll get a generic one instead. This window can be resolved without performance penalty if we use the new synchronous pipeline operations view to add the two handlers backwards. If this is done then we can ensure that the TLSEventsHandler is always in the pipeline before the NIOSSLClientHandler, and so there is no risk of event loss. While I'm here, AHC does a lot of pipeline modification. This has led to lengthy future chains with lots of event loop hops for no particularly good reason. I've therefore replaced all pipeline operations with their synchronous counterparts. All but one sequence was happening on the correct event loop, and for the one that may not I've added a fast-path dispatch that should tolerate being on the wrong one. The result is cleaner, more linear code that also reduces the allocations and event loop hops. Modifications: - Use synchronous pipeline operations everywhere - Change the order of adding TLSEventsHandler and NIOSSLClientHandler Result: Faster, safer, fewer timing windows. --- Package.swift | 2 +- Sources/AsyncHTTPClient/HTTPClient.swift | 64 +++++++++++++++--------- Sources/AsyncHTTPClient/Utils.swift | 37 +++++++------- 3 files changed, 60 insertions(+), 43 deletions(-) diff --git a/Package.swift b/Package.swift index 261d3f019..01912ec09 100644 --- a/Package.swift +++ b/Package.swift @@ -21,7 +21,7 @@ let package = Package( .library(name: "AsyncHTTPClient", targets: ["AsyncHTTPClient"]), ], dependencies: [ - .package(url: "https://github.com/apple/swift-nio.git", from: "2.19.0"), + .package(url: "https://github.com/apple/swift-nio.git", from: "2.27.0"), .package(url: "https://github.com/apple/swift-nio-ssl.git", from: "2.8.0"), .package(url: "https://github.com/apple/swift-nio-extras.git", from: "1.3.0"), .package(url: "https://github.com/apple/swift-nio-transport-services.git", from: "1.5.1"), diff --git a/Sources/AsyncHTTPClient/HTTPClient.swift b/Sources/AsyncHTTPClient/HTTPClient.swift index 409d16cfe..10ff09a8f 100644 --- a/Sources/AsyncHTTPClient/HTTPClient.swift +++ b/Sources/AsyncHTTPClient/HTTPClient.swift @@ -561,16 +561,21 @@ public class HTTPClient { "ahc-task-el": "\(taskEL)"]) let channel = connection.channel - let future: EventLoopFuture - if let timeout = self.resolve(timeout: self.configuration.timeout.read, deadline: deadline) { - future = channel.pipeline.addHandler(IdleStateHandler(readTimeout: timeout)) - } else { - future = channel.eventLoop.makeSucceededFuture(()) - } - return future.flatMap { - return channel.pipeline.addHandler(taskHandler) - }.flatMap { + func prepareChannelForTask0() -> EventLoopFuture { + do { + let syncPipelineOperations = channel.pipeline.syncOperations + + if let timeout = self.resolve(timeout: self.configuration.timeout.read, deadline: deadline) { + try syncPipelineOperations.addHandler(IdleStateHandler(readTimeout: timeout)) + } + + try syncPipelineOperations.addHandler(taskHandler) + } catch { + connection.release(closing: true, logger: logger) + return channel.eventLoop.makeFailedFuture(error) + } + task.setConnection(connection) let isCancelled = task.lock.withLock { @@ -581,14 +586,19 @@ public class HTTPClient { return channel.writeAndFlush(request).flatMapError { _ in // At this point the `TaskHandler` will already be present // to handle the failure and pass it to the `promise` - channel.eventLoop.makeSucceededFuture(()) + channel.eventLoop.makeSucceededVoidFuture() } } else { - return channel.eventLoop.makeSucceededFuture(()) + return channel.eventLoop.makeSucceededVoidFuture() + } + } + + if channel.eventLoop.inEventLoop { + return prepareChannelForTask0() + } else { + return channel.eventLoop.flatSubmit { + return prepareChannelForTask0() } - }.flatMapError { error in - connection.release(closing: true, logger: logger) - return channel.eventLoop.makeFailedFuture(error) } }.always { _ in setupComplete.succeed(()) @@ -873,7 +883,7 @@ extension HTTPClient.Configuration { } extension ChannelPipeline { - func addProxyHandler(host: String, port: Int, authorization: HTTPClient.Authorization?) -> EventLoopFuture { + func syncAddProxyHandler(host: String, port: Int, authorization: HTTPClient.Authorization?) throws { let encoder = HTTPRequestEncoder() let decoder = ByteToMessageHandler(HTTPResponseDecoder(leftOverBytesStrategy: .forwardBytes)) let handler = HTTPClientProxyHandler(host: host, port: port, authorization: authorization) { channel in @@ -883,28 +893,34 @@ extension ChannelPipeline { channel.pipeline.removeHandler(decoder) } } - return addHandlers([encoder, decoder, handler]) + + let sync = self.syncOperations + try sync.addHandler(encoder) + try sync.addHandler(decoder) + try sync.addHandler(handler) } - func addSSLHandlerIfNeeded(for key: ConnectionPool.Key, tlsConfiguration: TLSConfiguration?, addSSLClient: Bool, handshakePromise: EventLoopPromise) { + func syncAddSSLHandlerIfNeeded(for key: ConnectionPool.Key, tlsConfiguration: TLSConfiguration?, addSSLClient: Bool, handshakePromise: EventLoopPromise) { guard key.scheme.requiresTLS else { handshakePromise.succeed(()) return } do { - let handlers: [ChannelHandler] + let synchronousPipelineView = self.syncOperations + + // We add the TLSEventsHandler first so that it's always in the pipeline before any other TLS handler we add. + let eventsHandler = TLSEventsHandler(completionPromise: handshakePromise) + try synchronousPipelineView.addHandler(eventsHandler) + if addSSLClient { let tlsConfiguration = tlsConfiguration ?? TLSConfiguration.forClient() let context = try NIOSSLContext(configuration: tlsConfiguration) - handlers = [ + try synchronousPipelineView.addHandler( try NIOSSLClientHandler(context: context, serverHostname: (key.host.isIPAddress || key.host.isEmpty) ? nil : key.host), - TLSEventsHandler(completionPromise: handshakePromise), - ] - } else { - handlers = [TLSEventsHandler(completionPromise: handshakePromise)] + position: .before(eventsHandler) + ) } - self.addHandlers(handlers).cascadeFailure(to: handshakePromise) } catch { handshakePromise.fail(error) } diff --git a/Sources/AsyncHTTPClient/Utils.swift b/Sources/AsyncHTTPClient/Utils.swift index 30fbd8107..2f1bf0b40 100644 --- a/Sources/AsyncHTTPClient/Utils.swift +++ b/Sources/AsyncHTTPClient/Utils.swift @@ -128,14 +128,14 @@ extension NIOClientTCPBootstrap { return try self.makeBootstrap(on: eventLoop, host: host, requiresTLS: requiresTLS, configuration: configuration) .channelOption(ChannelOptions.socket(SocketOptionLevel(IPPROTO_TCP), TCP_NODELAY), value: 1) .channelInitializer { channel in - let channelAddedFuture: EventLoopFuture - switch configuration.proxy { - case .none: - channelAddedFuture = eventLoop.makeSucceededFuture(()) - case .some: - channelAddedFuture = channel.pipeline.addProxyHandler(host: host, port: port, authorization: configuration.proxy?.authorization) + do { + if let proxy = configuration.proxy { + try channel.pipeline.syncAddProxyHandler(host: host, port: port, authorization: proxy.authorization) + } + return channel.eventLoop.makeSucceededVoidFuture() + } catch { + return channel.eventLoop.makeFailedFuture(error) } - return channelAddedFuture } } @@ -165,27 +165,28 @@ extension NIOClientTCPBootstrap { let requiresSSLHandler = configuration.proxy != nil && key.scheme.requiresTLS let handshakePromise = channel.eventLoop.makePromise(of: Void.self) - channel.pipeline.addSSLHandlerIfNeeded(for: key, tlsConfiguration: configuration.tlsConfiguration, addSSLClient: requiresSSLHandler, handshakePromise: handshakePromise) + channel.pipeline.syncAddSSLHandlerIfNeeded(for: key, tlsConfiguration: configuration.tlsConfiguration, addSSLClient: requiresSSLHandler, handshakePromise: handshakePromise) + + return handshakePromise.futureResult.flatMapThrowing { + let syncOperations = channel.pipeline.syncOperations + + try syncOperations.addHTTPClientHandlers(leftOverBytesStrategy: .forwardBytes) - return handshakePromise.futureResult.flatMap { - channel.pipeline.addHTTPClientHandlers(leftOverBytesStrategy: .forwardBytes) - }.flatMap { #if canImport(Network) if #available(OSX 10.14, iOS 12.0, tvOS 12.0, watchOS 6.0, *), bootstrap.underlyingBootstrap is NIOTSConnectionBootstrap { - return channel.pipeline.addHandler(HTTPClient.NWErrorHandler(), position: .first) + try syncOperations.addHandler(HTTPClient.NWErrorHandler(), position: .first) } #endif - return channel.eventLoop.makeSucceededFuture(()) - }.flatMap { + switch configuration.decompression { case .disabled: - return channel.eventLoop.makeSucceededFuture(()) + () case .enabled(let limit): let decompressHandler = NIOHTTPResponseDecompressor(limit: limit) - return channel.pipeline.addHandler(decompressHandler) + try syncOperations.addHandler(decompressHandler) } - }.map { - channel + + return channel } }.flatMapError { error in #if canImport(Network)