Skip to content

All internal connection flow should be executed when shutting down #268

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Jun 22, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 12 additions & 12 deletions Sources/AsyncHTTPClient/ConnectionPool.swift
Original file line number Diff line number Diff line change
Expand Up @@ -374,15 +374,6 @@ class HTTP1ConnectionProvider {
"ahc-action": "\(action)"])
connection.channel.close(promise: nil)
self.execute(action, logger: logger)
case .cancel(let connection, let close):
logger.trace("cancelling connection",
metadata: ["ahc-connection": "\(connection)",
"ahc-close": "\(close)"])
connection.cancel().whenComplete { _ in
if close {
self.closeAndDelete()
}
}
case .fail(let waiter, let error):
logger.debug("failing connection for waiter",
metadata: ["ahc-waiter": "\(waiter)",
Expand Down Expand Up @@ -428,7 +419,15 @@ class HTTP1ConnectionProvider {
}
return self.state.offer(connection: connection)
}
waiter.promise.succeed(connection)

switch action {
case .closeAnd:
// This happens when client was shut down during connect
connection.channel.close(promise: nil)
waiter.promise.fail(HTTPClientError.cancelled)
default:
waiter.promise.succeed(connection)
}
case .failure(let error):
logger.debug("connection attempt failed",
metadata: ["ahc-error": "\(error)"])
Expand All @@ -437,6 +436,7 @@ class HTTP1ConnectionProvider {
}
waiter.promise.fail(error)
}

waiter.setupComplete.whenComplete { _ in
self.execute(action, logger: logger)
}
Expand All @@ -456,7 +456,7 @@ class HTTP1ConnectionProvider {
// Since both `.park` and `.deleteProvider` are terminal in terms of execution,
// we can execute them immediately
self.execute(action, logger: logger)
case .cancel, .closeAnd, .create, .fail, .lease, .parkAnd, .replace:
case .closeAnd, .create, .fail, .lease, .parkAnd, .replace:
// This is needed to start a new stack, otherwise, since this is called on a previous
// future completion handler chain, it will be growing indefinitely until the connection is closed.
// We might revisit this when https://github.com/apple/swift-nio/issues/970 is resolved.
Expand Down Expand Up @@ -493,7 +493,7 @@ class HTTP1ConnectionProvider {
$0.promise.fail(HTTPClientError.cancelled)
}

if available.isEmpty, leased.isEmpty {
if available.isEmpty, leased.isEmpty, clean {
self.closePromise.succeed(())
return self.closePromise.futureResult.map { clean }
}
Expand Down
18 changes: 13 additions & 5 deletions Sources/AsyncHTTPClient/ConnectionsState.swift
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ extension HTTP1ConnectionProvider {
case park(Connection)
case none
case fail(Waiter, Error)
case cancel(Connection, Bool)
indirect case closeAnd(Connection, Action)
indirect case parkAnd(Connection, Action)
}
Expand Down Expand Up @@ -209,8 +208,9 @@ extension HTTP1ConnectionProvider {
case .active:
self.leasedConnections.insert(ConnectionKey(connection))
return .none
case .closed: // This can happen when we close the client while connections was being estableshed
return .cancel(connection, self.isEmpty)
case .closed: // This can happen when we close the client while connections was being established
self.openedConnectionsCount -= 1
return .closeAnd(connection, self.processNextWaiter())
}
}

Expand All @@ -233,7 +233,9 @@ extension HTTP1ConnectionProvider {
// user calls `syncShutdown` before we received an error from the bootstrap. In this scenario,
// pool will be `.closed` but connection will be still in the process of being established/failed,
// so then this process finishes, it will get to this point.
return .none
// We need to call `processNextWaiter` to finish deleting provider from the pool.
self.openedConnectionsCount -= 1
return self.processNextWaiter()
}
}

Expand Down Expand Up @@ -273,7 +275,13 @@ extension HTTP1ConnectionProvider {

return .closeAnd(connection, self.processNextWaiter())
case .closed:
return .none
// This situation can happen when we call close, state changes, but before we call `close` on all
// available connections, in this case we should close this connection and, potentially,
// delete the provider
self.openedConnectionsCount -= 1
self.availableConnections.removeAll { $0 === connection }

return .closeAnd(connection, self.processNextWaiter())
}
}

Expand Down
4 changes: 4 additions & 0 deletions Tests/AsyncHTTPClientTests/ConnectionPoolTests+XCTest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ extension ConnectionPoolTests {
("testConnectionRemoteCloseRelease", testConnectionRemoteCloseRelease),
("testConnectionTimeoutRelease", testConnectionTimeoutRelease),
("testAcquireAvailableBecomesUnavailable", testAcquireAvailableBecomesUnavailable),
("testShutdownOnPendingAndSuccess", testShutdownOnPendingAndSuccess),
("testShutdownOnPendingAndError", testShutdownOnPendingAndError),
("testShutdownTimeout", testShutdownTimeout),
("testShutdownRemoteClosed", testShutdownRemoteClosed),
]
}
}
162 changes: 162 additions & 0 deletions Tests/AsyncHTTPClientTests/ConnectionPoolTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -1391,6 +1391,168 @@ class ConnectionPoolTests: XCTestCase {
}
}

// MARK: - Shutdown tests

func testShutdownOnPendingAndSuccess() {
var state = HTTP1ConnectionProvider.ConnectionsState(eventLoop: self.eventLoop)

XCTAssertTrue(state.enqueue())

let connectionPromise = self.eventLoop.makePromise(of: Connection.self)
let setupPromise = self.eventLoop.makePromise(of: Void.self)
let waiter = HTTP1ConnectionProvider.Waiter(promise: connectionPromise, setupComplete: setupPromise.futureResult, preference: .indifferent)
var action = state.acquire(waiter: waiter)

switch action {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: you can write this a bit shorter as

guard case .create = action else {
    XCTFail("unexpected action \(action)")
    return
}

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

case .create:
// expected
break
default:
XCTFail("Unexpected action: \(action)")
}

let snapshot = state.testsOnly_getInternalState()
XCTAssertEqual(snapshot.openedConnectionsCount, 1)

if let (waiters, available, leased, clean) = state.close() {
XCTAssertTrue(waiters.isEmpty)
XCTAssertTrue(available.isEmpty)
XCTAssertTrue(leased.isEmpty)
XCTAssertFalse(clean)
} else {
XCTFail("Expecting snapshot")
}

let connection = Connection(channel: EmbeddedChannel(), provider: self.http1ConnectionProvider)

action = state.offer(connection: connection)
switch action {
case .closeAnd(_, let next):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isn't this

guard case .closeAnd(_, .closeProvider) = action else {
    XCTFail("unexpected action \(action)")
    return
}

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

switch next {
case .closeProvider:
// expected
break
default:
XCTFail("Unexpected action: \(action)")
}
default:
XCTFail("Unexpected action: \(action)")
}

connectionPromise.fail(TempError())
setupPromise.succeed(())
}

func testShutdownOnPendingAndError() {
var state = HTTP1ConnectionProvider.ConnectionsState(eventLoop: self.eventLoop)

XCTAssertTrue(state.enqueue())

let connectionPromise = self.eventLoop.makePromise(of: Connection.self)
let setupPromise = self.eventLoop.makePromise(of: Void.self)
let waiter = HTTP1ConnectionProvider.Waiter(promise: connectionPromise, setupComplete: setupPromise.futureResult, preference: .indifferent)
var action = state.acquire(waiter: waiter)

switch action {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: this could be a guard too I think

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

case .create:
// expected
break
default:
XCTFail("Unexpected action: \(action)")
}

let snapshot = state.testsOnly_getInternalState()
XCTAssertEqual(snapshot.openedConnectionsCount, 1)

if let (waiters, available, leased, clean) = state.close() {
XCTAssertTrue(waiters.isEmpty)
XCTAssertTrue(available.isEmpty)
XCTAssertTrue(leased.isEmpty)
XCTAssertFalse(clean)
} else {
XCTFail("Expecting snapshot")
}

action = state.connectFailed()
switch action {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this too

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

case .closeProvider:
// expected
break
default:
XCTFail("Unexpected action: \(action)")
}

connectionPromise.fail(TempError())
setupPromise.succeed(())
}

func testShutdownTimeout() {
var state = HTTP1ConnectionProvider.ConnectionsState(eventLoop: self.eventLoop)

var snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState()

let connection = Connection(channel: EmbeddedChannel(), provider: self.http1ConnectionProvider)
snapshot.availableConnections.append(connection)
snapshot.openedConnectionsCount = 1

state.testsOnly_setInternalState(snapshot)

if let (waiters, available, leased, clean) = state.close() {
XCTAssertTrue(waiters.isEmpty)
XCTAssertFalse(available.isEmpty)
XCTAssertTrue(leased.isEmpty)
XCTAssertTrue(clean)
} else {
XCTFail("Expecting snapshot")
}

let action = state.timeout(connection: connection)
switch action {
case .closeAnd(_, let next):
switch next {
case .closeProvider:
// expected
break
default:
XCTFail("Unexpected action: \(action)")
}
default:
XCTFail("Unexpected action: \(action)")
}
}

func testShutdownRemoteClosed() {
var state = HTTP1ConnectionProvider.ConnectionsState(eventLoop: self.eventLoop)

var snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState()

let connection = Connection(channel: EmbeddedChannel(), provider: self.http1ConnectionProvider)
snapshot.availableConnections.append(connection)
snapshot.openedConnectionsCount = 1

state.testsOnly_setInternalState(snapshot)

if let (waiters, available, leased, clean) = state.close() {
XCTAssertTrue(waiters.isEmpty)
XCTAssertFalse(available.isEmpty)
XCTAssertTrue(leased.isEmpty)
XCTAssertTrue(clean)
} else {
XCTFail("Expecting snapshot")
}

let action = state.remoteClosed(connection: connection)
switch action {
case .closeProvider:
// expected
break
default:
XCTFail("Unexpected action: \(action)")
}
}

// MARK: - Helpers

override func setUp() {
XCTAssertNil(self.eventLoop)
XCTAssertNil(self.http1ConnectionProvider)
Expand Down