Skip to content

All internal connection flow should be executed when shutting down #268

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Jun 22, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 14 additions & 12 deletions Sources/AsyncHTTPClient/ConnectionPool.swift
Original file line number Diff line number Diff line change
@@ -374,15 +374,6 @@ class HTTP1ConnectionProvider {
"ahc-action": "\(action)"])
connection.channel.close(promise: nil)
self.execute(action, logger: logger)
case .cancel(let connection, let close):
logger.trace("cancelling connection",
metadata: ["ahc-connection": "\(connection)",
"ahc-close": "\(close)"])
connection.cancel().whenComplete { _ in
if close {
self.closeAndDelete()
}
}
case .fail(let waiter, let error):
logger.debug("failing connection for waiter",
metadata: ["ahc-waiter": "\(waiter)",
@@ -428,7 +419,17 @@ class HTTP1ConnectionProvider {
}
return self.state.offer(connection: connection)
}
waiter.promise.succeed(connection)

switch action {
case .closeAnd:
// This happens when client was shut down during connect
logger.trace("connection cancelled due to client shutdown",
metadata: ["ahc-connection": "\(channel)"])
connection.channel.close(promise: nil)
waiter.promise.fail(HTTPClientError.cancelled)
default:
waiter.promise.succeed(connection)
}
case .failure(let error):
logger.debug("connection attempt failed",
metadata: ["ahc-error": "\(error)"])
@@ -437,6 +438,7 @@ class HTTP1ConnectionProvider {
}
waiter.promise.fail(error)
}

waiter.setupComplete.whenComplete { _ in
self.execute(action, logger: logger)
}
@@ -456,7 +458,7 @@ class HTTP1ConnectionProvider {
// Since both `.park` and `.deleteProvider` are terminal in terms of execution,
// we can execute them immediately
self.execute(action, logger: logger)
case .cancel, .closeAnd, .create, .fail, .lease, .parkAnd, .replace:
case .closeAnd, .create, .fail, .lease, .parkAnd, .replace:
// This is needed to start a new stack, otherwise, since this is called on a previous
// future completion handler chain, it will be growing indefinitely until the connection is closed.
// We might revisit this when https://github.com/apple/swift-nio/issues/970 is resolved.
@@ -493,7 +495,7 @@ class HTTP1ConnectionProvider {
$0.promise.fail(HTTPClientError.cancelled)
}

if available.isEmpty, leased.isEmpty {
if available.isEmpty, leased.isEmpty, clean {
self.closePromise.succeed(())
return self.closePromise.futureResult.map { clean }
}
18 changes: 13 additions & 5 deletions Sources/AsyncHTTPClient/ConnectionsState.swift
Original file line number Diff line number Diff line change
@@ -23,7 +23,6 @@ extension HTTP1ConnectionProvider {
case park(Connection)
case none
case fail(Waiter, Error)
case cancel(Connection, Bool)
indirect case closeAnd(Connection, Action)
indirect case parkAnd(Connection, Action)
}
@@ -209,8 +208,9 @@ extension HTTP1ConnectionProvider {
case .active:
self.leasedConnections.insert(ConnectionKey(connection))
return .none
case .closed: // This can happen when we close the client while connections was being estableshed
return .cancel(connection, self.isEmpty)
case .closed: // This can happen when we close the client while connections was being established
self.openedConnectionsCount -= 1
return .closeAnd(connection, self.processNextWaiter())
}
}

@@ -233,7 +233,9 @@ extension HTTP1ConnectionProvider {
// user calls `syncShutdown` before we received an error from the bootstrap. In this scenario,
// pool will be `.closed` but connection will be still in the process of being established/failed,
// so then this process finishes, it will get to this point.
return .none
// We need to call `processNextWaiter` to finish deleting provider from the pool.
self.openedConnectionsCount -= 1
return self.processNextWaiter()
}
}

@@ -273,7 +275,13 @@ extension HTTP1ConnectionProvider {

return .closeAnd(connection, self.processNextWaiter())
case .closed:
return .none
// This situation can happen when we call close, state changes, but before we call `close` on all
// available connections, in this case we should close this connection and, potentially,
// delete the provider
self.openedConnectionsCount -= 1
self.availableConnections.removeAll { $0 === connection }

return .closeAnd(connection, self.processNextWaiter())
}
}

4 changes: 4 additions & 0 deletions Tests/AsyncHTTPClientTests/ConnectionPoolTests+XCTest.swift
Original file line number Diff line number Diff line change
@@ -62,6 +62,10 @@ extension ConnectionPoolTests {
("testConnectionRemoteCloseRelease", testConnectionRemoteCloseRelease),
("testConnectionTimeoutRelease", testConnectionTimeoutRelease),
("testAcquireAvailableBecomesUnavailable", testAcquireAvailableBecomesUnavailable),
("testShutdownOnPendingAndSuccess", testShutdownOnPendingAndSuccess),
("testShutdownOnPendingAndError", testShutdownOnPendingAndError),
("testShutdownTimeout", testShutdownTimeout),
("testShutdownRemoteClosed", testShutdownRemoteClosed),
]
}
}
145 changes: 145 additions & 0 deletions Tests/AsyncHTTPClientTests/ConnectionPoolTests.swift
Original file line number Diff line number Diff line change
@@ -1391,6 +1391,151 @@ class ConnectionPoolTests: XCTestCase {
}
}

// MARK: - Shutdown tests

func testShutdownOnPendingAndSuccess() {
var state = HTTP1ConnectionProvider.ConnectionsState(eventLoop: self.eventLoop)

XCTAssertTrue(state.enqueue())

let connectionPromise = self.eventLoop.makePromise(of: Connection.self)
let setupPromise = self.eventLoop.makePromise(of: Void.self)
let waiter = HTTP1ConnectionProvider.Waiter(promise: connectionPromise, setupComplete: setupPromise.futureResult, preference: .indifferent)
var action = state.acquire(waiter: waiter)

guard case .create = action else {
XCTFail("unexpected action \(action)")
return
}

let snapshot = state.testsOnly_getInternalState()
XCTAssertEqual(snapshot.openedConnectionsCount, 1)

if let (waiters, available, leased, clean) = state.close() {
XCTAssertTrue(waiters.isEmpty)
XCTAssertTrue(available.isEmpty)
XCTAssertTrue(leased.isEmpty)
XCTAssertFalse(clean)
} else {
XCTFail("Expecting snapshot")
}

let connection = Connection(channel: EmbeddedChannel(), provider: self.http1ConnectionProvider)

action = state.offer(connection: connection)
guard case .closeAnd(_, .closeProvider) = action else {
XCTFail("unexpected action \(action)")
return
}

connectionPromise.fail(TempError())
setupPromise.succeed(())
}

func testShutdownOnPendingAndError() {
var state = HTTP1ConnectionProvider.ConnectionsState(eventLoop: self.eventLoop)

XCTAssertTrue(state.enqueue())

let connectionPromise = self.eventLoop.makePromise(of: Connection.self)
let setupPromise = self.eventLoop.makePromise(of: Void.self)
let waiter = HTTP1ConnectionProvider.Waiter(promise: connectionPromise, setupComplete: setupPromise.futureResult, preference: .indifferent)
var action = state.acquire(waiter: waiter)

guard case .create = action else {
XCTFail("unexpected action \(action)")
return
}

let snapshot = state.testsOnly_getInternalState()
XCTAssertEqual(snapshot.openedConnectionsCount, 1)

if let (waiters, available, leased, clean) = state.close() {
XCTAssertTrue(waiters.isEmpty)
XCTAssertTrue(available.isEmpty)
XCTAssertTrue(leased.isEmpty)
XCTAssertFalse(clean)
} else {
XCTFail("Expecting snapshot")
}

action = state.connectFailed()
guard case .closeProvider = action else {
XCTFail("unexpected action \(action)")
return
}

connectionPromise.fail(TempError())
setupPromise.succeed(())
}

func testShutdownTimeout() {
var state = HTTP1ConnectionProvider.ConnectionsState(eventLoop: self.eventLoop)

var snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState()

let connection = Connection(channel: EmbeddedChannel(), provider: self.http1ConnectionProvider)
snapshot.availableConnections.append(connection)
snapshot.openedConnectionsCount = 1

state.testsOnly_setInternalState(snapshot)

if let (waiters, available, leased, clean) = state.close() {
XCTAssertTrue(waiters.isEmpty)
XCTAssertFalse(available.isEmpty)
XCTAssertTrue(leased.isEmpty)
XCTAssertTrue(clean)
} else {
XCTFail("Expecting snapshot")
}

let action = state.timeout(connection: connection)
switch action {
case .closeAnd(_, let next):
switch next {
case .closeProvider:
// expected
break
default:
XCTFail("Unexpected action: \(action)")
}
default:
XCTFail("Unexpected action: \(action)")
}
}

func testShutdownRemoteClosed() {
var state = HTTP1ConnectionProvider.ConnectionsState(eventLoop: self.eventLoop)

var snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState()

let connection = Connection(channel: EmbeddedChannel(), provider: self.http1ConnectionProvider)
snapshot.availableConnections.append(connection)
snapshot.openedConnectionsCount = 1

state.testsOnly_setInternalState(snapshot)

if let (waiters, available, leased, clean) = state.close() {
XCTAssertTrue(waiters.isEmpty)
XCTAssertFalse(available.isEmpty)
XCTAssertTrue(leased.isEmpty)
XCTAssertTrue(clean)
} else {
XCTFail("Expecting snapshot")
}

let action = state.remoteClosed(connection: connection)
switch action {
case .closeProvider:
// expected
break
default:
XCTFail("Unexpected action: \(action)")
}
}

// MARK: - Helpers

override func setUp() {
XCTAssertNil(self.eventLoop)
XCTAssertNil(self.http1ConnectionProvider)