Skip to content

Use synchronous pipeline hops to remove windows. #346

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ let package = Package(
.library(name: "AsyncHTTPClient", targets: ["AsyncHTTPClient"]),
],
dependencies: [
.package(url: "https://github.com/apple/swift-nio.git", from: "2.19.0"),
.package(url: "https://github.com/apple/swift-nio.git", from: "2.27.0"),
.package(url: "https://github.com/apple/swift-nio-ssl.git", from: "2.8.0"),
.package(url: "https://github.com/apple/swift-nio-extras.git", from: "1.3.0"),
.package(url: "https://github.com/apple/swift-nio-transport-services.git", from: "1.5.1"),
Expand Down
64 changes: 40 additions & 24 deletions Sources/AsyncHTTPClient/HTTPClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -561,16 +561,21 @@ public class HTTPClient {
"ahc-task-el": "\(taskEL)"])

let channel = connection.channel
let future: EventLoopFuture<Void>
if let timeout = self.resolve(timeout: self.configuration.timeout.read, deadline: deadline) {
future = channel.pipeline.addHandler(IdleStateHandler(readTimeout: timeout))
} else {
future = channel.eventLoop.makeSucceededFuture(())
}

return future.flatMap {
return channel.pipeline.addHandler(taskHandler)
}.flatMap {
func prepareChannelForTask0() -> EventLoopFuture<Void> {
do {
let syncPipelineOperations = channel.pipeline.syncOperations

if let timeout = self.resolve(timeout: self.configuration.timeout.read, deadline: deadline) {
try syncPipelineOperations.addHandler(IdleStateHandler(readTimeout: timeout))
}

try syncPipelineOperations.addHandler(taskHandler)
} catch {
connection.release(closing: true, logger: logger)
return channel.eventLoop.makeFailedFuture(error)
}

task.setConnection(connection)

let isCancelled = task.lock.withLock {
Expand All @@ -581,14 +586,19 @@ public class HTTPClient {
return channel.writeAndFlush(request).flatMapError { _ in
// At this point the `TaskHandler` will already be present
// to handle the failure and pass it to the `promise`
channel.eventLoop.makeSucceededFuture(())
channel.eventLoop.makeSucceededVoidFuture()
}
} else {
return channel.eventLoop.makeSucceededFuture(())
return channel.eventLoop.makeSucceededVoidFuture()
}
}

if channel.eventLoop.inEventLoop {
return prepareChannelForTask0()
} else {
return channel.eventLoop.flatSubmit {
return prepareChannelForTask0()
}
}.flatMapError { error in
connection.release(closing: true, logger: logger)
return channel.eventLoop.makeFailedFuture(error)
}
}.always { _ in
setupComplete.succeed(())
Expand Down Expand Up @@ -873,7 +883,7 @@ extension HTTPClient.Configuration {
}

extension ChannelPipeline {
func addProxyHandler(host: String, port: Int, authorization: HTTPClient.Authorization?) -> EventLoopFuture<Void> {
func syncAddProxyHandler(host: String, port: Int, authorization: HTTPClient.Authorization?) throws {
let encoder = HTTPRequestEncoder()
let decoder = ByteToMessageHandler(HTTPResponseDecoder(leftOverBytesStrategy: .forwardBytes))
let handler = HTTPClientProxyHandler(host: host, port: port, authorization: authorization) { channel in
Expand All @@ -883,28 +893,34 @@ extension ChannelPipeline {
channel.pipeline.removeHandler(decoder)
}
}
return addHandlers([encoder, decoder, handler])

let sync = self.syncOperations
try sync.addHandler(encoder)
try sync.addHandler(decoder)
try sync.addHandler(handler)
}

func addSSLHandlerIfNeeded(for key: ConnectionPool.Key, tlsConfiguration: TLSConfiguration?, addSSLClient: Bool, handshakePromise: EventLoopPromise<Void>) {
func syncAddSSLHandlerIfNeeded(for key: ConnectionPool.Key, tlsConfiguration: TLSConfiguration?, addSSLClient: Bool, handshakePromise: EventLoopPromise<Void>) {
guard key.scheme.requiresTLS else {
handshakePromise.succeed(())
return
}

do {
let handlers: [ChannelHandler]
let synchronousPipelineView = self.syncOperations

// We add the TLSEventsHandler first so that it's always in the pipeline before any other TLS handler we add.
let eventsHandler = TLSEventsHandler(completionPromise: handshakePromise)
try synchronousPipelineView.addHandler(eventsHandler)

if addSSLClient {
let tlsConfiguration = tlsConfiguration ?? TLSConfiguration.forClient()
let context = try NIOSSLContext(configuration: tlsConfiguration)
handlers = [
try synchronousPipelineView.addHandler(
try NIOSSLClientHandler(context: context, serverHostname: (key.host.isIPAddress || key.host.isEmpty) ? nil : key.host),
TLSEventsHandler(completionPromise: handshakePromise),
]
} else {
handlers = [TLSEventsHandler(completionPromise: handshakePromise)]
position: .before(eventsHandler)
)
}
self.addHandlers(handlers).cascadeFailure(to: handshakePromise)
} catch {
handshakePromise.fail(error)
}
Expand Down
37 changes: 19 additions & 18 deletions Sources/AsyncHTTPClient/Utils.swift
Original file line number Diff line number Diff line change
Expand Up @@ -128,14 +128,14 @@ extension NIOClientTCPBootstrap {
return try self.makeBootstrap(on: eventLoop, host: host, requiresTLS: requiresTLS, configuration: configuration)
.channelOption(ChannelOptions.socket(SocketOptionLevel(IPPROTO_TCP), TCP_NODELAY), value: 1)
.channelInitializer { channel in
let channelAddedFuture: EventLoopFuture<Void>
switch configuration.proxy {
case .none:
channelAddedFuture = eventLoop.makeSucceededFuture(())
case .some:
channelAddedFuture = channel.pipeline.addProxyHandler(host: host, port: port, authorization: configuration.proxy?.authorization)
do {
if let proxy = configuration.proxy {
try channel.pipeline.syncAddProxyHandler(host: host, port: port, authorization: proxy.authorization)
}
return channel.eventLoop.makeSucceededVoidFuture()
} catch {
return channel.eventLoop.makeFailedFuture(error)
}
return channelAddedFuture
}
}

Expand Down Expand Up @@ -165,27 +165,28 @@ extension NIOClientTCPBootstrap {
let requiresSSLHandler = configuration.proxy != nil && key.scheme.requiresTLS
let handshakePromise = channel.eventLoop.makePromise(of: Void.self)

channel.pipeline.addSSLHandlerIfNeeded(for: key, tlsConfiguration: configuration.tlsConfiguration, addSSLClient: requiresSSLHandler, handshakePromise: handshakePromise)
channel.pipeline.syncAddSSLHandlerIfNeeded(for: key, tlsConfiguration: configuration.tlsConfiguration, addSSLClient: requiresSSLHandler, handshakePromise: handshakePromise)

return handshakePromise.futureResult.flatMapThrowing {
let syncOperations = channel.pipeline.syncOperations

try syncOperations.addHTTPClientHandlers(leftOverBytesStrategy: .forwardBytes)

return handshakePromise.futureResult.flatMap {
channel.pipeline.addHTTPClientHandlers(leftOverBytesStrategy: .forwardBytes)
}.flatMap {
#if canImport(Network)
if #available(OSX 10.14, iOS 12.0, tvOS 12.0, watchOS 6.0, *), bootstrap.underlyingBootstrap is NIOTSConnectionBootstrap {
return channel.pipeline.addHandler(HTTPClient.NWErrorHandler(), position: .first)
try syncOperations.addHandler(HTTPClient.NWErrorHandler(), position: .first)
}
#endif
return channel.eventLoop.makeSucceededFuture(())
}.flatMap {

switch configuration.decompression {
case .disabled:
return channel.eventLoop.makeSucceededFuture(())
()
case .enabled(let limit):
let decompressHandler = NIOHTTPResponseDecompressor(limit: limit)
return channel.pipeline.addHandler(decompressHandler)
try syncOperations.addHandler(decompressHandler)
}
}.map {
channel

return channel
}
}.flatMapError { error in
#if canImport(Network)
Expand Down