Skip to content

Close idle pool connections #170

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 18 commits into from
Mar 3, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 44 additions & 3 deletions Sources/AsyncHTTPClient/ConnectionPool.swift
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,36 @@ final class ConnectionPool {
fileprivate var closePromise: EventLoopPromise<Void>

var closeFuture: EventLoopFuture<Void>

func removeIdleConnectionHandlersForLease() -> EventLoopFuture<Connection> {
return self.channel.eventLoop.flatSubmit {
self.removeHandler(IdleStateHandler.self).flatMap { () -> EventLoopFuture<Bool> in
self.channel.pipeline.handler(type: IdlePoolConnectionHandler.self).flatMap { idleHandler in
self.channel.pipeline.removeHandler(idleHandler).flatMapError { _ in
self.channel.eventLoop.makeSucceededFuture(())
}.map {
idleHandler.hasNotSentClose && self.channel.isActive
}
}.flatMapError { error in
// These handlers are only added on connection release, they are not added
// when a connection is made to be instantly leased, so we ignore this error
if let channelError = error as? ChannelPipelineError, channelError == .notFound {
return self.channel.eventLoop.makeSucceededFuture(self.channel.isActive)
} else {
return self.channel.eventLoop.makeFailedFuture(error)
}
}
}.flatMap { channelIsUsable in
if channelIsUsable {
return self.channel.eventLoop.makeSucceededFuture(self)
} else {
return self.channel.eventLoop.makeFailedFuture(InactiveChannelError())
}
}
}
}

struct InactiveChannelError: Error {}
}

/// A connection provider of `HTTP/1.1` connections with a given `Key` (host, scheme, port)
Expand Down Expand Up @@ -294,7 +324,14 @@ final class ConnectionPool {
let action = self.stateLock.withLock { self.state.connectionAction(for: preference) }
switch action {
case .leaseConnection(let connection):
return connection.channel.eventLoop.makeSucceededFuture(connection)
return connection.removeIdleConnectionHandlersForLease().flatMapError { _ in
connection.closeFuture.flatMap { // We ensure close actions are run first
let defaultEventLoop = self.stateLock.withLock {
self.state.defaultEventLoop
}
return self.makeConnection(on: preference.bestEventLoop ?? defaultEventLoop)
}
}
case .makeConnection(let eventLoop):
return self.makeConnection(on: eventLoop)
case .leaseFutureConnection(let futureConnection):
Expand Down Expand Up @@ -453,7 +490,7 @@ final class ConnectionPool {

fileprivate struct State {
/// The default `EventLoop` to use for this `HTTP1ConnectionProvider`
private let defaultEventLoop: EventLoop
let defaultEventLoop: EventLoop

/// The maximum number of connections to a certain (host, scheme, port) tuple.
private let maximumConcurrentConnections: Int = 8
Expand All @@ -476,7 +513,11 @@ final class ConnectionPool {

fileprivate var activity: Activity = .opened

fileprivate var pending: Int = 0
fileprivate var pending: Int = 0 {
didSet {
assert(self.pending >= 0)
}
}

private let parentPool: ConnectionPool

Expand Down
56 changes: 54 additions & 2 deletions Sources/AsyncHTTPClient/HTTPClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ public class HTTPClient {
redirectHandler = nil
}

let task = Task<Delegate.Response>(eventLoop: taskEL)
let task = Task<Delegate.Response>(eventLoop: taskEL, poolingTimeout: self.configuration.maximumAllowedIdleTimeInConnectionPool)
self.stateLock.withLock {
self.tasks[task.id] = task
}
Expand All @@ -322,7 +322,6 @@ public class HTTPClient {
connection.flatMap { connection -> EventLoopFuture<Void> in
let channel = connection.channel
let addedFuture: EventLoopFuture<Void>

switch self.configuration.decompression {
case .disabled:
addedFuture = channel.eventLoop.makeSucceededFuture(())
Expand Down Expand Up @@ -408,6 +407,8 @@ public class HTTPClient {
public var redirectConfiguration: RedirectConfiguration
/// Default client timeout, defaults to no timeouts.
public var timeout: Timeout
/// Timeout of pooled connections
public var maximumAllowedIdleTimeInConnectionPool: TimeAmount?
/// Upstream proxy, defaults to no proxy.
public var proxy: Proxy?
/// Enables automatic body decompression. Supported algorithms are gzip and deflate.
Expand All @@ -418,30 +419,68 @@ public class HTTPClient {
public init(tlsConfiguration: TLSConfiguration? = nil,
redirectConfiguration: RedirectConfiguration? = nil,
timeout: Timeout = Timeout(),
maximumAllowedIdleTimeInConnectionPool: TimeAmount,
proxy: Proxy? = nil,
ignoreUncleanSSLShutdown: Bool = false,
decompression: Decompression = .disabled) {
self.tlsConfiguration = tlsConfiguration
self.redirectConfiguration = redirectConfiguration ?? RedirectConfiguration()
self.timeout = timeout
self.maximumAllowedIdleTimeInConnectionPool = maximumAllowedIdleTimeInConnectionPool
self.proxy = proxy
self.ignoreUncleanSSLShutdown = ignoreUncleanSSLShutdown
self.decompression = decompression
}

public init(tlsConfiguration: TLSConfiguration? = nil,
redirectConfiguration: RedirectConfiguration? = nil,
timeout: Timeout = Timeout(),
proxy: Proxy? = nil,
ignoreUncleanSSLShutdown: Bool = false,
decompression: Decompression = .disabled) {
self.init(
tlsConfiguration: tlsConfiguration,
redirectConfiguration: redirectConfiguration,
timeout: timeout,
maximumAllowedIdleTimeInConnectionPool: .seconds(60),
proxy: proxy,
ignoreUncleanSSLShutdown: ignoreUncleanSSLShutdown,
decompression: decompression
)
}

public init(certificateVerification: CertificateVerification,
redirectConfiguration: RedirectConfiguration? = nil,
timeout: Timeout = Timeout(),
maximumAllowedIdleTimeInConnectionPool: TimeAmount = .seconds(60),
proxy: Proxy? = nil,
ignoreUncleanSSLShutdown: Bool = false,
decompression: Decompression = .disabled) {
self.tlsConfiguration = TLSConfiguration.forClient(certificateVerification: certificateVerification)
self.redirectConfiguration = redirectConfiguration ?? RedirectConfiguration()
self.timeout = timeout
self.maximumAllowedIdleTimeInConnectionPool = maximumAllowedIdleTimeInConnectionPool
self.proxy = proxy
self.ignoreUncleanSSLShutdown = ignoreUncleanSSLShutdown
self.decompression = decompression
}

public init(certificateVerification: CertificateVerification,
redirectConfiguration: RedirectConfiguration? = nil,
timeout: Timeout = Timeout(),
proxy: Proxy? = nil,
ignoreUncleanSSLShutdown: Bool = false,
decompression: Decompression = .disabled) {
self.init(
certificateVerification: certificateVerification,
redirectConfiguration: redirectConfiguration,
timeout: timeout,
maximumAllowedIdleTimeInConnectionPool: .seconds(60),
proxy: proxy,
ignoreUncleanSSLShutdown: ignoreUncleanSSLShutdown,
decompression: decompression
)
}
}

/// Specifies how `EventLoopGroup` will be created and establishes lifecycle ownership.
Expand Down Expand Up @@ -490,6 +529,19 @@ public class HTTPClient {
public static func delegateAndChannel(on eventLoop: EventLoop) -> EventLoopPreference {
return EventLoopPreference(.delegateAndChannel(on: eventLoop))
}

var bestEventLoop: EventLoop? {
switch self.preference {
case .delegate(on: let el):
return el
case .delegateAndChannel(on: let el):
return el
case .testOnly_exact(channelOn: let el, delegateOn: _):
return el
case .indifferent:
return nil
}
}
}

/// Specifies decompression settings.
Expand Down
35 changes: 34 additions & 1 deletion Sources/AsyncHTTPClient/HTTPHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -497,13 +497,15 @@ extension HTTPClient {
var cancelled: Bool
let lock: Lock
let id = UUID()
let poolingTimeout: TimeAmount?

init(eventLoop: EventLoop) {
init(eventLoop: EventLoop, poolingTimeout: TimeAmount? = nil) {
self.eventLoop = eventLoop
self.promise = eventLoop.makePromise()
self.completion = self.promise.futureResult.map { _ in }
self.cancelled = false
self.lock = Lock()
self.poolingTimeout = poolingTimeout
}

static func failedTask(eventLoop: EventLoop, error: Error) -> Task<Response> {
Expand Down Expand Up @@ -571,6 +573,19 @@ extension HTTPClient {
connection.removeHandler(IdleStateHandler.self)
}.flatMap {
connection.removeHandler(TaskHandler<Delegate>.self)
}.flatMap {
let idlePoolConnectionHandler = IdlePoolConnectionHandler()
return connection.channel.pipeline.addHandler(idlePoolConnectionHandler, position: .last).flatMap {
connection.channel.pipeline.addHandler(IdleStateHandler(writeTimeout: self.poolingTimeout), position: .before(idlePoolConnectionHandler))
}
}.flatMapError { error in
if let error = error as? ChannelError, error == .ioOnClosedChannel {
// We may get this error if channel is released because it is
// closed, it is safe to ignore it
return connection.channel.eventLoop.makeSucceededFuture(())
} else {
return connection.channel.eventLoop.makeFailedFuture(error)
}
}.map {
connection.release()
}.flatMapError { error in
Expand Down Expand Up @@ -1008,3 +1023,21 @@ internal struct RedirectHandler<ResponseType> {
}
}
}

class IdlePoolConnectionHandler: ChannelInboundHandler, RemovableChannelHandler {
typealias InboundIn = NIOAny

let _hasNotSentClose: NIOAtomic<Bool> = .makeAtomic(value: true)
var hasNotSentClose: Bool {
return self._hasNotSentClose.load()
}

func userInboundEventTriggered(context: ChannelHandlerContext, event: Any) {
if let idleEvent = event as? IdleStateHandler.IdleStateEvent, idleEvent == .write {
self._hasNotSentClose.store(false)
context.close(promise: nil)
} else {
context.fireUserInboundEventTriggered(event)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ extension HTTPClientInternalTests {
("testResponseConnectionCloseGet", testResponseConnectionCloseGet),
("testWeNoticeRemoteClosuresEvenWhenConnectionIsIdleInPool", testWeNoticeRemoteClosuresEvenWhenConnectionIsIdleInPool),
("testWeTolerateConnectionsGoingAwayWhilstPoolIsShuttingDown", testWeTolerateConnectionsGoingAwayWhilstPoolIsShuttingDown),
("testRaceBetweenAsynchronousCloseAndChannelUsabilityDetection", testRaceBetweenAsynchronousCloseAndChannelUsabilityDetection),
]
}
}
102 changes: 102 additions & 0 deletions Tests/AsyncHTTPClientTests/HTTPClientInternalTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -623,4 +623,106 @@ class HTTPClientInternalTests: XCTestCase {
}
XCTAssertNoThrow(try client.syncShutdown())
}

func testRaceBetweenAsynchronousCloseAndChannelUsabilityDetection() {
final class DelayChannelCloseUntilToldHandler: ChannelOutboundHandler {
typealias OutboundIn = Any

enum State {
case idling
case delayedClose
case closeDone
}

var state: State = .idling
let doTheCloseNowFuture: EventLoopFuture<Void>
let sawTheClosePromise: EventLoopPromise<Void>

init(doTheCloseNowFuture: EventLoopFuture<Void>,
sawTheClosePromise: EventLoopPromise<Void>) {
self.doTheCloseNowFuture = doTheCloseNowFuture
self.sawTheClosePromise = sawTheClosePromise
}

func handlerRemoved(context: ChannelHandlerContext) {
XCTAssertEqual(.closeDone, self.state)
}

func close(context: ChannelHandlerContext, mode: CloseMode, promise: EventLoopPromise<Void>?) {
XCTAssertEqual(.idling, self.state)
self.state = .delayedClose
self.sawTheClosePromise.succeed(())
// let's hold the close until the future's complete
self.doTheCloseNowFuture.whenSuccess {
context.close(mode: mode).map {
XCTAssertEqual(.delayedClose, self.state)
self.state = .closeDone
}.cascade(to: promise)
}
}
}

let web = HTTPBin()
defer {
XCTAssertNoThrow(try web.shutdown())
}

let client = HTTPClient(eventLoopGroupProvider: .createNew)
defer {
XCTAssertNoThrow(try client.syncShutdown())
}

let req = try! HTTPClient.Request(url: "http://localhost:\(web.serverChannel.localAddress!.port!)/get",
method: .GET,
body: nil)

// Let's start by getting a connection so we can mess with the Channel :).
var maybeConnection: ConnectionPool.Connection?
XCTAssertNoThrow(try maybeConnection = client.pool.getConnection(for: req,
preference: .indifferent,
on: client.eventLoopGroup.next(),
deadline: nil).wait())
guard let connection = maybeConnection else {
XCTFail("couldn't make connection")
return
}

let channel = connection.channel
let doActualCloseNowPromise = channel.eventLoop.makePromise(of: Void.self)
let sawTheClosePromise = channel.eventLoop.makePromise(of: Void.self)

XCTAssertNoThrow(try channel.pipeline.addHandler(DelayChannelCloseUntilToldHandler(doTheCloseNowFuture: doActualCloseNowPromise.futureResult,
sawTheClosePromise: sawTheClosePromise),
position: .first).wait())
client.pool.release(connection)

XCTAssertNoThrow(try client.execute(request: req).wait())

// Now, let's pretend the timeout happened
channel.pipeline.fireUserInboundEventTriggered(IdleStateHandler.IdleStateEvent.write)

// The Channel's closure should have already been initialised now but still, let's make sure the close
// was initiated
XCTAssertNoThrow(try sawTheClosePromise.futureResult.wait())
// The Channel should still be active though because we delayed the close through our handler above.
XCTAssertTrue(channel.isActive)

// When asking for a connection again, we should _not_ get the same one back because we did most of the close,
// similar to what the SSLHandler would do.
let connection2Future = client.pool.getConnection(for: req,
preference: .indifferent,
on: client.eventLoopGroup.next(),
deadline: nil)
doActualCloseNowPromise.succeed(())

XCTAssertNoThrow(try maybeConnection = connection2Future.wait())
guard let connection2 = maybeConnection else {
XCTFail("couldn't get second connection")
return
}

XCTAssert(connection !== connection2)
client.pool.release(connection2)
XCTAssertTrue(connection2.channel.isActive)
}
}
Loading