diff --git a/Sources/AsyncHTTPClient/ConnectionPool.swift b/Sources/AsyncHTTPClient/ConnectionPool.swift index f18a30a98..ce648469c 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool.swift @@ -374,15 +374,6 @@ class HTTP1ConnectionProvider { "ahc-action": "\(action)"]) connection.channel.close(promise: nil) self.execute(action, logger: logger) - case .cancel(let connection, let close): - logger.trace("cancelling connection", - metadata: ["ahc-connection": "\(connection)", - "ahc-close": "\(close)"]) - connection.cancel().whenComplete { _ in - if close { - self.closeAndDelete() - } - } case .fail(let waiter, let error): logger.debug("failing connection for waiter", metadata: ["ahc-waiter": "\(waiter)", @@ -428,7 +419,17 @@ class HTTP1ConnectionProvider { } return self.state.offer(connection: connection) } - waiter.promise.succeed(connection) + + switch action { + case .closeAnd: + // This happens when client was shut down during connect + logger.trace("connection cancelled due to client shutdown", + metadata: ["ahc-connection": "\(channel)"]) + connection.channel.close(promise: nil) + waiter.promise.fail(HTTPClientError.cancelled) + default: + waiter.promise.succeed(connection) + } case .failure(let error): logger.debug("connection attempt failed", metadata: ["ahc-error": "\(error)"]) @@ -437,6 +438,7 @@ class HTTP1ConnectionProvider { } waiter.promise.fail(error) } + waiter.setupComplete.whenComplete { _ in self.execute(action, logger: logger) } @@ -456,7 +458,7 @@ class HTTP1ConnectionProvider { // Since both `.park` and `.deleteProvider` are terminal in terms of execution, // we can execute them immediately self.execute(action, logger: logger) - case .cancel, .closeAnd, .create, .fail, .lease, .parkAnd, .replace: + case .closeAnd, .create, .fail, .lease, .parkAnd, .replace: // This is needed to start a new stack, otherwise, since this is called on a previous // future completion handler chain, it will be growing indefinitely until the connection is closed. // We might revisit this when https://github.com/apple/swift-nio/issues/970 is resolved. @@ -493,7 +495,7 @@ class HTTP1ConnectionProvider { $0.promise.fail(HTTPClientError.cancelled) } - if available.isEmpty, leased.isEmpty { + if available.isEmpty, leased.isEmpty, clean { self.closePromise.succeed(()) return self.closePromise.futureResult.map { clean } } diff --git a/Sources/AsyncHTTPClient/ConnectionsState.swift b/Sources/AsyncHTTPClient/ConnectionsState.swift index 0c5f908e2..d99416a49 100644 --- a/Sources/AsyncHTTPClient/ConnectionsState.swift +++ b/Sources/AsyncHTTPClient/ConnectionsState.swift @@ -23,7 +23,6 @@ extension HTTP1ConnectionProvider { case park(Connection) case none case fail(Waiter, Error) - case cancel(Connection, Bool) indirect case closeAnd(Connection, Action) indirect case parkAnd(Connection, Action) } @@ -209,8 +208,9 @@ extension HTTP1ConnectionProvider { case .active: self.leasedConnections.insert(ConnectionKey(connection)) return .none - case .closed: // This can happen when we close the client while connections was being estableshed - return .cancel(connection, self.isEmpty) + case .closed: // This can happen when we close the client while connections was being established + self.openedConnectionsCount -= 1 + return .closeAnd(connection, self.processNextWaiter()) } } @@ -233,7 +233,9 @@ extension HTTP1ConnectionProvider { // user calls `syncShutdown` before we received an error from the bootstrap. In this scenario, // pool will be `.closed` but connection will be still in the process of being established/failed, // so then this process finishes, it will get to this point. - return .none + // We need to call `processNextWaiter` to finish deleting provider from the pool. + self.openedConnectionsCount -= 1 + return self.processNextWaiter() } } @@ -273,7 +275,13 @@ extension HTTP1ConnectionProvider { return .closeAnd(connection, self.processNextWaiter()) case .closed: - return .none + // This situation can happen when we call close, state changes, but before we call `close` on all + // available connections, in this case we should close this connection and, potentially, + // delete the provider + self.openedConnectionsCount -= 1 + self.availableConnections.removeAll { $0 === connection } + + return .closeAnd(connection, self.processNextWaiter()) } } diff --git a/Tests/AsyncHTTPClientTests/ConnectionPoolTests+XCTest.swift b/Tests/AsyncHTTPClientTests/ConnectionPoolTests+XCTest.swift index b751ec1ce..04cfa6421 100644 --- a/Tests/AsyncHTTPClientTests/ConnectionPoolTests+XCTest.swift +++ b/Tests/AsyncHTTPClientTests/ConnectionPoolTests+XCTest.swift @@ -62,6 +62,10 @@ extension ConnectionPoolTests { ("testConnectionRemoteCloseRelease", testConnectionRemoteCloseRelease), ("testConnectionTimeoutRelease", testConnectionTimeoutRelease), ("testAcquireAvailableBecomesUnavailable", testAcquireAvailableBecomesUnavailable), + ("testShutdownOnPendingAndSuccess", testShutdownOnPendingAndSuccess), + ("testShutdownOnPendingAndError", testShutdownOnPendingAndError), + ("testShutdownTimeout", testShutdownTimeout), + ("testShutdownRemoteClosed", testShutdownRemoteClosed), ] } } diff --git a/Tests/AsyncHTTPClientTests/ConnectionPoolTests.swift b/Tests/AsyncHTTPClientTests/ConnectionPoolTests.swift index bacc04dd8..aee407b28 100644 --- a/Tests/AsyncHTTPClientTests/ConnectionPoolTests.swift +++ b/Tests/AsyncHTTPClientTests/ConnectionPoolTests.swift @@ -1391,6 +1391,151 @@ class ConnectionPoolTests: XCTestCase { } } + // MARK: - Shutdown tests + + func testShutdownOnPendingAndSuccess() { + var state = HTTP1ConnectionProvider.ConnectionsState(eventLoop: self.eventLoop) + + XCTAssertTrue(state.enqueue()) + + let connectionPromise = self.eventLoop.makePromise(of: Connection.self) + let setupPromise = self.eventLoop.makePromise(of: Void.self) + let waiter = HTTP1ConnectionProvider.Waiter(promise: connectionPromise, setupComplete: setupPromise.futureResult, preference: .indifferent) + var action = state.acquire(waiter: waiter) + + guard case .create = action else { + XCTFail("unexpected action \(action)") + return + } + + let snapshot = state.testsOnly_getInternalState() + XCTAssertEqual(snapshot.openedConnectionsCount, 1) + + if let (waiters, available, leased, clean) = state.close() { + XCTAssertTrue(waiters.isEmpty) + XCTAssertTrue(available.isEmpty) + XCTAssertTrue(leased.isEmpty) + XCTAssertFalse(clean) + } else { + XCTFail("Expecting snapshot") + } + + let connection = Connection(channel: EmbeddedChannel(), provider: self.http1ConnectionProvider) + + action = state.offer(connection: connection) + guard case .closeAnd(_, .closeProvider) = action else { + XCTFail("unexpected action \(action)") + return + } + + connectionPromise.fail(TempError()) + setupPromise.succeed(()) + } + + func testShutdownOnPendingAndError() { + var state = HTTP1ConnectionProvider.ConnectionsState(eventLoop: self.eventLoop) + + XCTAssertTrue(state.enqueue()) + + let connectionPromise = self.eventLoop.makePromise(of: Connection.self) + let setupPromise = self.eventLoop.makePromise(of: Void.self) + let waiter = HTTP1ConnectionProvider.Waiter(promise: connectionPromise, setupComplete: setupPromise.futureResult, preference: .indifferent) + var action = state.acquire(waiter: waiter) + + guard case .create = action else { + XCTFail("unexpected action \(action)") + return + } + + let snapshot = state.testsOnly_getInternalState() + XCTAssertEqual(snapshot.openedConnectionsCount, 1) + + if let (waiters, available, leased, clean) = state.close() { + XCTAssertTrue(waiters.isEmpty) + XCTAssertTrue(available.isEmpty) + XCTAssertTrue(leased.isEmpty) + XCTAssertFalse(clean) + } else { + XCTFail("Expecting snapshot") + } + + action = state.connectFailed() + guard case .closeProvider = action else { + XCTFail("unexpected action \(action)") + return + } + + connectionPromise.fail(TempError()) + setupPromise.succeed(()) + } + + func testShutdownTimeout() { + var state = HTTP1ConnectionProvider.ConnectionsState(eventLoop: self.eventLoop) + + var snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() + + let connection = Connection(channel: EmbeddedChannel(), provider: self.http1ConnectionProvider) + snapshot.availableConnections.append(connection) + snapshot.openedConnectionsCount = 1 + + state.testsOnly_setInternalState(snapshot) + + if let (waiters, available, leased, clean) = state.close() { + XCTAssertTrue(waiters.isEmpty) + XCTAssertFalse(available.isEmpty) + XCTAssertTrue(leased.isEmpty) + XCTAssertTrue(clean) + } else { + XCTFail("Expecting snapshot") + } + + let action = state.timeout(connection: connection) + switch action { + case .closeAnd(_, let next): + switch next { + case .closeProvider: + // expected + break + default: + XCTFail("Unexpected action: \(action)") + } + default: + XCTFail("Unexpected action: \(action)") + } + } + + func testShutdownRemoteClosed() { + var state = HTTP1ConnectionProvider.ConnectionsState(eventLoop: self.eventLoop) + + var snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() + + let connection = Connection(channel: EmbeddedChannel(), provider: self.http1ConnectionProvider) + snapshot.availableConnections.append(connection) + snapshot.openedConnectionsCount = 1 + + state.testsOnly_setInternalState(snapshot) + + if let (waiters, available, leased, clean) = state.close() { + XCTAssertTrue(waiters.isEmpty) + XCTAssertFalse(available.isEmpty) + XCTAssertTrue(leased.isEmpty) + XCTAssertTrue(clean) + } else { + XCTFail("Expecting snapshot") + } + + let action = state.remoteClosed(connection: connection) + switch action { + case .closeProvider: + // expected + break + default: + XCTFail("Unexpected action: \(action)") + } + } + + // MARK: - Helpers + override func setUp() { XCTAssertNil(self.eventLoop) XCTAssertNil(self.http1ConnectionProvider)