Skip to content

Commit 9cdd1dc

Browse files
Trevörweissi
Trevör
andauthored
Close idle pool connections (#170)
* Close idle pool connections Motivation: Pooled connections should close at some point (see #168) Changes: - Add new poolingTimeout property to HTTPClient.Configuration, it's default value is .seconds(60), it can be set to nil if one wishes to disable this timeout. - Add relevant unit test Co-authored-by: Johannes Weiss <[email protected]>
1 parent a794d30 commit 9cdd1dc

8 files changed

+294
-7
lines changed

Diff for: Sources/AsyncHTTPClient/ConnectionPool.swift

+44-3
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,36 @@ final class ConnectionPool {
224224
fileprivate var closePromise: EventLoopPromise<Void>
225225

226226
var closeFuture: EventLoopFuture<Void>
227+
228+
func removeIdleConnectionHandlersForLease() -> EventLoopFuture<Connection> {
229+
return self.channel.eventLoop.flatSubmit {
230+
self.removeHandler(IdleStateHandler.self).flatMap { () -> EventLoopFuture<Bool> in
231+
self.channel.pipeline.handler(type: IdlePoolConnectionHandler.self).flatMap { idleHandler in
232+
self.channel.pipeline.removeHandler(idleHandler).flatMapError { _ in
233+
self.channel.eventLoop.makeSucceededFuture(())
234+
}.map {
235+
idleHandler.hasNotSentClose && self.channel.isActive
236+
}
237+
}.flatMapError { error in
238+
// These handlers are only added on connection release, they are not added
239+
// when a connection is made to be instantly leased, so we ignore this error
240+
if let channelError = error as? ChannelPipelineError, channelError == .notFound {
241+
return self.channel.eventLoop.makeSucceededFuture(self.channel.isActive)
242+
} else {
243+
return self.channel.eventLoop.makeFailedFuture(error)
244+
}
245+
}
246+
}.flatMap { channelIsUsable in
247+
if channelIsUsable {
248+
return self.channel.eventLoop.makeSucceededFuture(self)
249+
} else {
250+
return self.channel.eventLoop.makeFailedFuture(InactiveChannelError())
251+
}
252+
}
253+
}
254+
}
255+
256+
struct InactiveChannelError: Error {}
227257
}
228258

229259
/// A connection provider of `HTTP/1.1` connections with a given `Key` (host, scheme, port)
@@ -294,7 +324,14 @@ final class ConnectionPool {
294324
let action = self.stateLock.withLock { self.state.connectionAction(for: preference) }
295325
switch action {
296326
case .leaseConnection(let connection):
297-
return connection.channel.eventLoop.makeSucceededFuture(connection)
327+
return connection.removeIdleConnectionHandlersForLease().flatMapError { _ in
328+
connection.closeFuture.flatMap { // We ensure close actions are run first
329+
let defaultEventLoop = self.stateLock.withLock {
330+
self.state.defaultEventLoop
331+
}
332+
return self.makeConnection(on: preference.bestEventLoop ?? defaultEventLoop)
333+
}
334+
}
298335
case .makeConnection(let eventLoop):
299336
return self.makeConnection(on: eventLoop)
300337
case .leaseFutureConnection(let futureConnection):
@@ -453,7 +490,7 @@ final class ConnectionPool {
453490

454491
fileprivate struct State {
455492
/// The default `EventLoop` to use for this `HTTP1ConnectionProvider`
456-
private let defaultEventLoop: EventLoop
493+
let defaultEventLoop: EventLoop
457494

458495
/// The maximum number of connections to a certain (host, scheme, port) tuple.
459496
private let maximumConcurrentConnections: Int = 8
@@ -476,7 +513,11 @@ final class ConnectionPool {
476513

477514
fileprivate var activity: Activity = .opened
478515

479-
fileprivate var pending: Int = 0
516+
fileprivate var pending: Int = 0 {
517+
didSet {
518+
assert(self.pending >= 0)
519+
}
520+
}
480521

481522
private let parentPool: ConnectionPool
482523

Diff for: Sources/AsyncHTTPClient/HTTPClient.swift

+54-2
Original file line numberDiff line numberDiff line change
@@ -305,7 +305,7 @@ public class HTTPClient {
305305
redirectHandler = nil
306306
}
307307

308-
let task = Task<Delegate.Response>(eventLoop: taskEL)
308+
let task = Task<Delegate.Response>(eventLoop: taskEL, poolingTimeout: self.configuration.maximumAllowedIdleTimeInConnectionPool)
309309
self.stateLock.withLock {
310310
self.tasks[task.id] = task
311311
}
@@ -322,7 +322,6 @@ public class HTTPClient {
322322
connection.flatMap { connection -> EventLoopFuture<Void> in
323323
let channel = connection.channel
324324
let addedFuture: EventLoopFuture<Void>
325-
326325
switch self.configuration.decompression {
327326
case .disabled:
328327
addedFuture = channel.eventLoop.makeSucceededFuture(())
@@ -408,6 +407,8 @@ public class HTTPClient {
408407
public var redirectConfiguration: RedirectConfiguration
409408
/// Default client timeout, defaults to no timeouts.
410409
public var timeout: Timeout
410+
/// Timeout of pooled connections
411+
public var maximumAllowedIdleTimeInConnectionPool: TimeAmount?
411412
/// Upstream proxy, defaults to no proxy.
412413
public var proxy: Proxy?
413414
/// Enables automatic body decompression. Supported algorithms are gzip and deflate.
@@ -418,30 +419,68 @@ public class HTTPClient {
418419
public init(tlsConfiguration: TLSConfiguration? = nil,
419420
redirectConfiguration: RedirectConfiguration? = nil,
420421
timeout: Timeout = Timeout(),
422+
maximumAllowedIdleTimeInConnectionPool: TimeAmount,
421423
proxy: Proxy? = nil,
422424
ignoreUncleanSSLShutdown: Bool = false,
423425
decompression: Decompression = .disabled) {
424426
self.tlsConfiguration = tlsConfiguration
425427
self.redirectConfiguration = redirectConfiguration ?? RedirectConfiguration()
426428
self.timeout = timeout
429+
self.maximumAllowedIdleTimeInConnectionPool = maximumAllowedIdleTimeInConnectionPool
427430
self.proxy = proxy
428431
self.ignoreUncleanSSLShutdown = ignoreUncleanSSLShutdown
429432
self.decompression = decompression
430433
}
431434

435+
public init(tlsConfiguration: TLSConfiguration? = nil,
436+
redirectConfiguration: RedirectConfiguration? = nil,
437+
timeout: Timeout = Timeout(),
438+
proxy: Proxy? = nil,
439+
ignoreUncleanSSLShutdown: Bool = false,
440+
decompression: Decompression = .disabled) {
441+
self.init(
442+
tlsConfiguration: tlsConfiguration,
443+
redirectConfiguration: redirectConfiguration,
444+
timeout: timeout,
445+
maximumAllowedIdleTimeInConnectionPool: .seconds(60),
446+
proxy: proxy,
447+
ignoreUncleanSSLShutdown: ignoreUncleanSSLShutdown,
448+
decompression: decompression
449+
)
450+
}
451+
432452
public init(certificateVerification: CertificateVerification,
433453
redirectConfiguration: RedirectConfiguration? = nil,
434454
timeout: Timeout = Timeout(),
455+
maximumAllowedIdleTimeInConnectionPool: TimeAmount = .seconds(60),
435456
proxy: Proxy? = nil,
436457
ignoreUncleanSSLShutdown: Bool = false,
437458
decompression: Decompression = .disabled) {
438459
self.tlsConfiguration = TLSConfiguration.forClient(certificateVerification: certificateVerification)
439460
self.redirectConfiguration = redirectConfiguration ?? RedirectConfiguration()
440461
self.timeout = timeout
462+
self.maximumAllowedIdleTimeInConnectionPool = maximumAllowedIdleTimeInConnectionPool
441463
self.proxy = proxy
442464
self.ignoreUncleanSSLShutdown = ignoreUncleanSSLShutdown
443465
self.decompression = decompression
444466
}
467+
468+
public init(certificateVerification: CertificateVerification,
469+
redirectConfiguration: RedirectConfiguration? = nil,
470+
timeout: Timeout = Timeout(),
471+
proxy: Proxy? = nil,
472+
ignoreUncleanSSLShutdown: Bool = false,
473+
decompression: Decompression = .disabled) {
474+
self.init(
475+
certificateVerification: certificateVerification,
476+
redirectConfiguration: redirectConfiguration,
477+
timeout: timeout,
478+
maximumAllowedIdleTimeInConnectionPool: .seconds(60),
479+
proxy: proxy,
480+
ignoreUncleanSSLShutdown: ignoreUncleanSSLShutdown,
481+
decompression: decompression
482+
)
483+
}
445484
}
446485

447486
/// Specifies how `EventLoopGroup` will be created and establishes lifecycle ownership.
@@ -490,6 +529,19 @@ public class HTTPClient {
490529
public static func delegateAndChannel(on eventLoop: EventLoop) -> EventLoopPreference {
491530
return EventLoopPreference(.delegateAndChannel(on: eventLoop))
492531
}
532+
533+
var bestEventLoop: EventLoop? {
534+
switch self.preference {
535+
case .delegate(on: let el):
536+
return el
537+
case .delegateAndChannel(on: let el):
538+
return el
539+
case .testOnly_exact(channelOn: let el, delegateOn: _):
540+
return el
541+
case .indifferent:
542+
return nil
543+
}
544+
}
493545
}
494546

495547
/// Specifies decompression settings.

Diff for: Sources/AsyncHTTPClient/HTTPHandler.swift

+34-1
Original file line numberDiff line numberDiff line change
@@ -497,13 +497,15 @@ extension HTTPClient {
497497
var cancelled: Bool
498498
let lock: Lock
499499
let id = UUID()
500+
let poolingTimeout: TimeAmount?
500501

501-
init(eventLoop: EventLoop) {
502+
init(eventLoop: EventLoop, poolingTimeout: TimeAmount? = nil) {
502503
self.eventLoop = eventLoop
503504
self.promise = eventLoop.makePromise()
504505
self.completion = self.promise.futureResult.map { _ in }
505506
self.cancelled = false
506507
self.lock = Lock()
508+
self.poolingTimeout = poolingTimeout
507509
}
508510

509511
static func failedTask(eventLoop: EventLoop, error: Error) -> Task<Response> {
@@ -571,6 +573,19 @@ extension HTTPClient {
571573
connection.removeHandler(IdleStateHandler.self)
572574
}.flatMap {
573575
connection.removeHandler(TaskHandler<Delegate>.self)
576+
}.flatMap {
577+
let idlePoolConnectionHandler = IdlePoolConnectionHandler()
578+
return connection.channel.pipeline.addHandler(idlePoolConnectionHandler, position: .last).flatMap {
579+
connection.channel.pipeline.addHandler(IdleStateHandler(writeTimeout: self.poolingTimeout), position: .before(idlePoolConnectionHandler))
580+
}
581+
}.flatMapError { error in
582+
if let error = error as? ChannelError, error == .ioOnClosedChannel {
583+
// We may get this error if channel is released because it is
584+
// closed, it is safe to ignore it
585+
return connection.channel.eventLoop.makeSucceededFuture(())
586+
} else {
587+
return connection.channel.eventLoop.makeFailedFuture(error)
588+
}
574589
}.map {
575590
connection.release()
576591
}.flatMapError { error in
@@ -1008,3 +1023,21 @@ internal struct RedirectHandler<ResponseType> {
10081023
}
10091024
}
10101025
}
1026+
1027+
class IdlePoolConnectionHandler: ChannelInboundHandler, RemovableChannelHandler {
1028+
typealias InboundIn = NIOAny
1029+
1030+
let _hasNotSentClose: NIOAtomic<Bool> = .makeAtomic(value: true)
1031+
var hasNotSentClose: Bool {
1032+
return self._hasNotSentClose.load()
1033+
}
1034+
1035+
func userInboundEventTriggered(context: ChannelHandlerContext, event: Any) {
1036+
if let idleEvent = event as? IdleStateHandler.IdleStateEvent, idleEvent == .write {
1037+
self._hasNotSentClose.store(false)
1038+
context.close(promise: nil)
1039+
} else {
1040+
context.fireUserInboundEventTriggered(event)
1041+
}
1042+
}
1043+
}

Diff for: Tests/AsyncHTTPClientTests/HTTPClientInternalTests+XCTest.swift

+1
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ extension HTTPClientInternalTests {
3636
("testResponseConnectionCloseGet", testResponseConnectionCloseGet),
3737
("testWeNoticeRemoteClosuresEvenWhenConnectionIsIdleInPool", testWeNoticeRemoteClosuresEvenWhenConnectionIsIdleInPool),
3838
("testWeTolerateConnectionsGoingAwayWhilstPoolIsShuttingDown", testWeTolerateConnectionsGoingAwayWhilstPoolIsShuttingDown),
39+
("testRaceBetweenAsynchronousCloseAndChannelUsabilityDetection", testRaceBetweenAsynchronousCloseAndChannelUsabilityDetection),
3940
]
4041
}
4142
}

Diff for: Tests/AsyncHTTPClientTests/HTTPClientInternalTests.swift

+102
Original file line numberDiff line numberDiff line change
@@ -623,4 +623,106 @@ class HTTPClientInternalTests: XCTestCase {
623623
}
624624
XCTAssertNoThrow(try client.syncShutdown())
625625
}
626+
627+
func testRaceBetweenAsynchronousCloseAndChannelUsabilityDetection() {
628+
final class DelayChannelCloseUntilToldHandler: ChannelOutboundHandler {
629+
typealias OutboundIn = Any
630+
631+
enum State {
632+
case idling
633+
case delayedClose
634+
case closeDone
635+
}
636+
637+
var state: State = .idling
638+
let doTheCloseNowFuture: EventLoopFuture<Void>
639+
let sawTheClosePromise: EventLoopPromise<Void>
640+
641+
init(doTheCloseNowFuture: EventLoopFuture<Void>,
642+
sawTheClosePromise: EventLoopPromise<Void>) {
643+
self.doTheCloseNowFuture = doTheCloseNowFuture
644+
self.sawTheClosePromise = sawTheClosePromise
645+
}
646+
647+
func handlerRemoved(context: ChannelHandlerContext) {
648+
XCTAssertEqual(.closeDone, self.state)
649+
}
650+
651+
func close(context: ChannelHandlerContext, mode: CloseMode, promise: EventLoopPromise<Void>?) {
652+
XCTAssertEqual(.idling, self.state)
653+
self.state = .delayedClose
654+
self.sawTheClosePromise.succeed(())
655+
// let's hold the close until the future's complete
656+
self.doTheCloseNowFuture.whenSuccess {
657+
context.close(mode: mode).map {
658+
XCTAssertEqual(.delayedClose, self.state)
659+
self.state = .closeDone
660+
}.cascade(to: promise)
661+
}
662+
}
663+
}
664+
665+
let web = HTTPBin()
666+
defer {
667+
XCTAssertNoThrow(try web.shutdown())
668+
}
669+
670+
let client = HTTPClient(eventLoopGroupProvider: .createNew)
671+
defer {
672+
XCTAssertNoThrow(try client.syncShutdown())
673+
}
674+
675+
let req = try! HTTPClient.Request(url: "http://localhost:\(web.serverChannel.localAddress!.port!)/get",
676+
method: .GET,
677+
body: nil)
678+
679+
// Let's start by getting a connection so we can mess with the Channel :).
680+
var maybeConnection: ConnectionPool.Connection?
681+
XCTAssertNoThrow(try maybeConnection = client.pool.getConnection(for: req,
682+
preference: .indifferent,
683+
on: client.eventLoopGroup.next(),
684+
deadline: nil).wait())
685+
guard let connection = maybeConnection else {
686+
XCTFail("couldn't make connection")
687+
return
688+
}
689+
690+
let channel = connection.channel
691+
let doActualCloseNowPromise = channel.eventLoop.makePromise(of: Void.self)
692+
let sawTheClosePromise = channel.eventLoop.makePromise(of: Void.self)
693+
694+
XCTAssertNoThrow(try channel.pipeline.addHandler(DelayChannelCloseUntilToldHandler(doTheCloseNowFuture: doActualCloseNowPromise.futureResult,
695+
sawTheClosePromise: sawTheClosePromise),
696+
position: .first).wait())
697+
client.pool.release(connection)
698+
699+
XCTAssertNoThrow(try client.execute(request: req).wait())
700+
701+
// Now, let's pretend the timeout happened
702+
channel.pipeline.fireUserInboundEventTriggered(IdleStateHandler.IdleStateEvent.write)
703+
704+
// The Channel's closure should have already been initialised now but still, let's make sure the close
705+
// was initiated
706+
XCTAssertNoThrow(try sawTheClosePromise.futureResult.wait())
707+
// The Channel should still be active though because we delayed the close through our handler above.
708+
XCTAssertTrue(channel.isActive)
709+
710+
// When asking for a connection again, we should _not_ get the same one back because we did most of the close,
711+
// similar to what the SSLHandler would do.
712+
let connection2Future = client.pool.getConnection(for: req,
713+
preference: .indifferent,
714+
on: client.eventLoopGroup.next(),
715+
deadline: nil)
716+
doActualCloseNowPromise.succeed(())
717+
718+
XCTAssertNoThrow(try maybeConnection = connection2Future.wait())
719+
guard let connection2 = maybeConnection else {
720+
XCTFail("couldn't get second connection")
721+
return
722+
}
723+
724+
XCTAssert(connection !== connection2)
725+
client.pool.release(connection2)
726+
XCTAssertTrue(connection2.channel.isActive)
727+
}
626728
}

0 commit comments

Comments
 (0)