From 3239c9ff9bc5a7c1dfeee4f4c50fb9bf840a2ded Mon Sep 17 00:00:00 2001 From: Artem Redkin Date: Fri, 12 Jun 2020 17:26:11 +0100 Subject: [PATCH 1/3] refactor provider close and pending flows --- Sources/AsyncHTTPClient/ConnectionPool.swift | 4 +- .../AsyncHTTPClient/ConnectionsState.swift | 4 +- .../ConnectionPoolTests.swift | 51 ++++++++++++------- 3 files changed, 36 insertions(+), 23 deletions(-) diff --git a/Sources/AsyncHTTPClient/ConnectionPool.swift b/Sources/AsyncHTTPClient/ConnectionPool.swift index 46e0b6109..c0395efe8 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool.swift @@ -80,12 +80,12 @@ final class ConnectionPool { if let existing = self.providers[key], existing.enqueue() { return existing } else { - // Connection provider will be created with `pending = 1` let provider = HTTP1ConnectionProvider(key: key, eventLoop: taskEventLoop, configuration: self.configuration, pool: self, backgroundActivityLogger: self.backgroundActivityLogger) + _ = provider.enqueue() self.providers[key] = provider return provider } @@ -263,8 +263,6 @@ struct ConnectionKey: Hashable { /// of concurrent requests as it has built-in politeness regarding the maximum number /// of concurrent requests to the server. class HTTP1ConnectionProvider { - struct ProviderClosedError: Error {} - /// The client configuration used to bootstrap new requests private let configuration: HTTPClient.Configuration diff --git a/Sources/AsyncHTTPClient/ConnectionsState.swift b/Sources/AsyncHTTPClient/ConnectionsState.swift index 7d82771be..9962d5dd5 100644 --- a/Sources/AsyncHTTPClient/ConnectionsState.swift +++ b/Sources/AsyncHTTPClient/ConnectionsState.swift @@ -64,7 +64,7 @@ extension HTTP1ConnectionProvider { private var openedConnectionsCount: Int = 0 /// Number of enqueued requests, used to track if it is safe to delete the provider. - private var pending: Int = 1 + private var pending: Int = 0 init(maximumConcurrentConnections: Int = 8, eventLoop: EventLoop) { self.maximumConcurrentConnections = maximumConcurrentConnections @@ -148,7 +148,7 @@ extension HTTP1ConnectionProvider { return .none } case .closed: - return .fail(waiter, ProviderClosedError()) + return .fail(waiter, HTTPClientError.alreadyShutdown) } } diff --git a/Tests/AsyncHTTPClientTests/ConnectionPoolTests.swift b/Tests/AsyncHTTPClientTests/ConnectionPoolTests.swift index 16ab7c8f9..10a597564 100644 --- a/Tests/AsyncHTTPClientTests/ConnectionPoolTests.swift +++ b/Tests/AsyncHTTPClientTests/ConnectionPoolTests.swift @@ -36,7 +36,7 @@ class ConnectionPoolTests: XCTestCase { XCTAssertEqual(0, snapshot.availableConnections.count) XCTAssertEqual(0, snapshot.leasedConnections.count) XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(1, snapshot.pending) + XCTAssertEqual(0, snapshot.pending) XCTAssertEqual(0, snapshot.openedConnectionsCount) XCTAssertTrue(state.enqueue()) @@ -45,7 +45,7 @@ class ConnectionPoolTests: XCTestCase { XCTAssertEqual(0, snapshot.availableConnections.count) XCTAssertEqual(0, snapshot.leasedConnections.count) XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(2, snapshot.pending) + XCTAssertEqual(1, snapshot.pending) XCTAssertEqual(0, snapshot.openedConnectionsCount) } @@ -58,9 +58,10 @@ class ConnectionPoolTests: XCTestCase { XCTAssertEqual(0, snapshot.availableConnections.count) XCTAssertEqual(0, snapshot.leasedConnections.count) XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(1, snapshot.pending) + XCTAssertEqual(0, snapshot.pending) XCTAssertEqual(0, snapshot.openedConnectionsCount) + XCTAssertTrue(state.enqueue()) let action = state.acquire(waiter: .init(promise: self.eventLoop.makePromise(), setupComplete: self.eventLoop.makeSucceededFuture(()), preference: .indifferent)) switch action { case .create(let waiter): @@ -92,9 +93,11 @@ class ConnectionPoolTests: XCTestCase { XCTAssertEqual(1, snapshot.availableConnections.count) XCTAssertEqual(0, snapshot.leasedConnections.count) XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(1, snapshot.pending) + XCTAssertEqual(0, snapshot.pending) XCTAssertEqual(1, snapshot.openedConnectionsCount) + XCTAssertTrue(self.http1ConnectionProvider.enqueue()) + let action = self.http1ConnectionProvider.state.acquire(waiter: .init(promise: self.eventLoop.makePromise(), setupComplete: self.eventLoop.makeSucceededFuture(()), preference: .indifferent)) switch action { case .lease(let connection, let waiter): @@ -127,9 +130,11 @@ class ConnectionPoolTests: XCTestCase { XCTAssertEqual(0, snapshot.availableConnections.count) XCTAssertEqual(0, snapshot.leasedConnections.count) XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(1, snapshot.pending) + XCTAssertEqual(0, snapshot.pending) XCTAssertEqual(8, snapshot.openedConnectionsCount) + XCTAssertTrue(self.http1ConnectionProvider.enqueue()) + let action = self.http1ConnectionProvider.state.acquire(waiter: .init(promise: self.eventLoop.makePromise(), setupComplete: self.eventLoop.makeSucceededFuture(()), preference: .indifferent)) switch action { case .none: @@ -159,9 +164,11 @@ class ConnectionPoolTests: XCTestCase { XCTAssertEqual(0, snapshot.availableConnections.count) XCTAssertEqual(0, snapshot.leasedConnections.count) XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(1, snapshot.pending) + XCTAssertEqual(0, snapshot.pending) XCTAssertEqual(0, snapshot.openedConnectionsCount) + XCTAssertTrue(state.enqueue()) + let action = state.acquire(waiter: .init(promise: self.eventLoop.makePromise(), setupComplete: self.eventLoop.makeSucceededFuture(()), preference: .delegateAndChannel(on: self.eventLoop))) switch action { case .create(let waiter): @@ -192,9 +199,11 @@ class ConnectionPoolTests: XCTestCase { XCTAssertEqual(1, snapshot.availableConnections.count) XCTAssertEqual(0, snapshot.leasedConnections.count) XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(1, snapshot.pending) + XCTAssertEqual(0, snapshot.pending) XCTAssertEqual(1, snapshot.openedConnectionsCount) + XCTAssertTrue(self.http1ConnectionProvider.enqueue()) + let action = self.http1ConnectionProvider.state.acquire(waiter: .init(promise: channel.eventLoop.makePromise(), setupComplete: channel.eventLoop.makeSucceededFuture(()), preference: .delegateAndChannel(on: channel.eventLoop))) switch action { case .lease(let connection, let waiter): @@ -231,9 +240,11 @@ class ConnectionPoolTests: XCTestCase { XCTAssertEqual(1, snapshot.availableConnections.count) XCTAssertEqual(0, snapshot.leasedConnections.count) XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(1, snapshot.pending) + XCTAssertEqual(0, snapshot.pending) XCTAssertEqual(8, snapshot.openedConnectionsCount) + XCTAssertTrue(self.http1ConnectionProvider.enqueue()) + let action = self.http1ConnectionProvider.state.acquire(waiter: .init(promise: self.eventLoop.makePromise(), setupComplete: self.eventLoop.makeSucceededFuture(()), preference: .delegateAndChannel(on: self.eventLoop))) switch action { case .replace(_, let waiter): @@ -266,9 +277,11 @@ class ConnectionPoolTests: XCTestCase { XCTAssertEqual(0, snapshot.availableConnections.count) XCTAssertEqual(0, snapshot.leasedConnections.count) XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(1, snapshot.pending) + XCTAssertEqual(0, snapshot.pending) XCTAssertEqual(8, snapshot.openedConnectionsCount) + XCTAssertTrue(self.http1ConnectionProvider.enqueue()) + let action = self.http1ConnectionProvider.state.acquire(waiter: .init(promise: self.eventLoop.makePromise(), setupComplete: self.eventLoop.makeSucceededFuture(()), preference: .delegateAndChannel(on: self.eventLoop))) switch action { case .none: @@ -1177,7 +1190,7 @@ class ConnectionPoolTests: XCTestCase { XCTAssertEqual(0, snapshot.availableConnections.count) XCTAssertEqual(1, snapshot.leasedConnections.count) XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(1, snapshot.pending) + XCTAssertEqual(0, snapshot.pending) XCTAssertEqual(1, snapshot.openedConnectionsCount) connection.release(closing: false, logger: HTTPClient.loggingDisabled) @@ -1187,7 +1200,7 @@ class ConnectionPoolTests: XCTestCase { XCTAssertEqual(1, snapshot.availableConnections.count) XCTAssertEqual(0, snapshot.leasedConnections.count) XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(1, snapshot.pending) + XCTAssertEqual(0, snapshot.pending) XCTAssertEqual(1, snapshot.openedConnectionsCount) // cleanup @@ -1211,7 +1224,7 @@ class ConnectionPoolTests: XCTestCase { XCTAssertEqual(0, snapshot.availableConnections.count) XCTAssertEqual(1, snapshot.leasedConnections.count) XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(1, snapshot.pending) + XCTAssertEqual(0, snapshot.pending) XCTAssertEqual(1, snapshot.openedConnectionsCount) connection.release(closing: true, logger: HTTPClient.loggingDisabled) @@ -1220,7 +1233,7 @@ class ConnectionPoolTests: XCTestCase { XCTAssertEqual(0, snapshot.availableConnections.count) XCTAssertEqual(0, snapshot.leasedConnections.count) XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(1, snapshot.pending) + XCTAssertEqual(0, snapshot.pending) XCTAssertEqual(0, snapshot.openedConnectionsCount) // cleanup @@ -1244,7 +1257,7 @@ class ConnectionPoolTests: XCTestCase { XCTAssertEqual(1, snapshot.availableConnections.count) XCTAssertEqual(0, snapshot.leasedConnections.count) XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(1, snapshot.pending) + XCTAssertEqual(0, snapshot.pending) XCTAssertEqual(1, snapshot.openedConnectionsCount) connection.remoteClosed(logger: HTTPClient.loggingDisabled) @@ -1253,7 +1266,7 @@ class ConnectionPoolTests: XCTestCase { XCTAssertEqual(0, snapshot.availableConnections.count) XCTAssertEqual(0, snapshot.leasedConnections.count) XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(1, snapshot.pending) + XCTAssertEqual(0, snapshot.pending) XCTAssertEqual(0, snapshot.openedConnectionsCount) // cleanup @@ -1277,7 +1290,7 @@ class ConnectionPoolTests: XCTestCase { XCTAssertEqual(1, snapshot.availableConnections.count) XCTAssertEqual(0, snapshot.leasedConnections.count) XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(1, snapshot.pending) + XCTAssertEqual(0, snapshot.pending) XCTAssertEqual(1, snapshot.openedConnectionsCount) connection.timeout(logger: HTTPClient.loggingDisabled) @@ -1286,7 +1299,7 @@ class ConnectionPoolTests: XCTestCase { XCTAssertEqual(0, snapshot.availableConnections.count) XCTAssertEqual(0, snapshot.leasedConnections.count) XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(1, snapshot.pending) + XCTAssertEqual(0, snapshot.pending) XCTAssertEqual(0, snapshot.openedConnectionsCount) // cleanup @@ -1309,9 +1322,11 @@ class ConnectionPoolTests: XCTestCase { XCTAssertEqual(1, snapshot.availableConnections.count) XCTAssertEqual(0, snapshot.leasedConnections.count) XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(1, snapshot.pending) + XCTAssertEqual(0, snapshot.pending) XCTAssertEqual(1, snapshot.openedConnectionsCount) + XCTAssertTrue(self.http1ConnectionProvider.enqueue()) + let action = self.http1ConnectionProvider.state.acquire(waiter: .init(promise: self.eventLoop.makePromise(), setupComplete: self.eventLoop.makeSucceededFuture(()), preference: .indifferent)) switch action { case .lease(let connection, let waiter): From a63485e503eed9596e2ddbe581dfa0b3fecaa9d2 Mon Sep 17 00:00:00 2001 From: Artem Redkin Date: Fri, 12 Jun 2020 20:56:17 +0100 Subject: [PATCH 2/3] review fixes --- Sources/AsyncHTTPClient/ConnectionPool.swift | 12 +- .../ConnectionPoolTests.swift | 179 +++++++++--------- 2 files changed, 102 insertions(+), 89 deletions(-) diff --git a/Sources/AsyncHTTPClient/ConnectionPool.swift b/Sources/AsyncHTTPClient/ConnectionPool.swift index c0395efe8..4a9d20174 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool.swift @@ -85,7 +85,8 @@ final class ConnectionPool { configuration: self.configuration, pool: self, backgroundActivityLogger: self.backgroundActivityLogger) - _ = provider.enqueue() + let enqueued = provider.enqueue() + assert(enqueued) self.providers[key] = provider return provider } @@ -316,7 +317,7 @@ class HTTP1ConnectionProvider { self.state.assertInvariants() } - private func execute(_ action: Action, logger: Logger) { + func execute(_ action: Action, logger: Logger) { switch action { case .lease(let connection, let waiter): // if connection is became inactive, we create a new one. @@ -488,10 +489,17 @@ class HTTP1ConnectionProvider { func close() -> EventLoopFuture { if let (waiters, available, leased, clean) = self.lock.withLock({ self.state.close() }) { + print(waiters.count, available.count, leased.count, clean) + waiters.forEach { $0.promise.fail(HTTPClientError.cancelled) } + if available.isEmpty, leased.isEmpty { + self.closePromise.succeed(()) + return self.closePromise.futureResult.map { clean } + } + EventLoopFuture.andAllComplete(leased.map { $0.cancel() }, on: self.eventLoop).flatMap { _ in EventLoopFuture.andAllComplete(available.map { $0.close() }, on: self.eventLoop) }.whenFailure { error in diff --git a/Tests/AsyncHTTPClientTests/ConnectionPoolTests.swift b/Tests/AsyncHTTPClientTests/ConnectionPoolTests.swift index 10a597564..0b7f94dee 100644 --- a/Tests/AsyncHTTPClientTests/ConnectionPoolTests.swift +++ b/Tests/AsyncHTTPClientTests/ConnectionPoolTests.swift @@ -110,11 +110,10 @@ class ConnectionPoolTests: XCTestCase { XCTAssertEqual(0, snapshot.pending) XCTAssertEqual(1, snapshot.openedConnectionsCount) - // cleanup, since we don't call release + // cleanup // this cleanup code needs to go and use HTTP1ConnectionProvider's API instead // (https://github.com/swift-server/async-http-client/issues/234) - snapshot.openedConnectionsCount = 0 - self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) + self.http1ConnectionProvider.release(connection: connection, closing: true, logger: HTTPClient.loggingDisabled) default: XCTFail("Unexpected action: \(action)") } @@ -216,11 +215,10 @@ class ConnectionPoolTests: XCTestCase { XCTAssertEqual(0, snapshot.pending) XCTAssertEqual(1, snapshot.openedConnectionsCount) - // cleanup, since we don't call release + // cleanup // this cleanup code needs to go and use HTTP1ConnectionProvider's API instead // (https://github.com/swift-server/async-http-client/issues/234) - snapshot.openedConnectionsCount = 0 - self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) + self.http1ConnectionProvider.release(connection: connection, closing: true, logger: HTTPClient.loggingDisabled) default: XCTFail("Unexpected action: \(action)") } @@ -320,7 +318,6 @@ class ConnectionPoolTests: XCTestCase { default: XCTFail("Unexpected action: \(action)") } - print(state.testsOnly_getInternalState()) } // MARK: - Release Tests @@ -351,15 +348,14 @@ class ConnectionPoolTests: XCTestCase { XCTAssertEqual(0, snapshot.waiters.count) XCTAssertEqual(0, snapshot.pending) XCTAssertEqual(1, snapshot.openedConnectionsCount) - - // cleanup - // this cleanup code needs to go and use HTTP1ConnectionProvider's API instead - // (https://github.com/swift-server/async-http-client/issues/234) - snapshot.openedConnectionsCount = 0 - self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) default: XCTFail("Unexpected action: \(action)") } + + // cleanup + // this cleanup code needs to go and use HTTP1ConnectionProvider's API instead + // (https://github.com/swift-server/async-http-client/issues/234) + connection.remoteClosed(logger: HTTPClient.loggingDisabled) } func testReleaseAliveButClosingConnectionEmptyQueue() throws { @@ -392,6 +388,11 @@ class ConnectionPoolTests: XCTestCase { default: XCTFail("Unexpected action: \(action)") } + + // cleanup + // this cleanup code needs to go and use HTTP1ConnectionProvider's API instead + // (https://github.com/swift-server/async-http-client/issues/234) + self.http1ConnectionProvider.execute(action, logger: HTTPClient.loggingDisabled) } func testReleaseInactiveConnectionEmptyQueue() throws { @@ -421,9 +422,15 @@ class ConnectionPoolTests: XCTestCase { XCTAssertEqual(0, snapshot.waiters.count) XCTAssertEqual(0, snapshot.pending) XCTAssertEqual(0, snapshot.openedConnectionsCount) + default: XCTFail("Unexpected action: \(action)") } + + // cleanup + // this cleanup code needs to go and use HTTP1ConnectionProvider's API instead + // (https://github.com/swift-server/async-http-client/issues/234) + self.http1ConnectionProvider.execute(action, logger: HTTPClient.loggingDisabled) } func testReleaseInactiveConnectionEmptyQueueHasConnections() throws { @@ -455,15 +462,14 @@ class ConnectionPoolTests: XCTestCase { XCTAssertEqual(0, snapshot.waiters.count) XCTAssertEqual(0, snapshot.pending) XCTAssertEqual(1, snapshot.openedConnectionsCount) - - // cleanup - // this cleanup code needs to go and use HTTP1ConnectionProvider's API instead - // (https://github.com/swift-server/async-http-client/issues/234) - snapshot.openedConnectionsCount = 0 - self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) default: XCTFail("Unexpected action: \(action)") } + + // cleanup + // this cleanup code needs to go and use HTTP1ConnectionProvider's API instead + // (https://github.com/swift-server/async-http-client/issues/234) + connection.remoteClosed(logger: HTTPClient.loggingDisabled) } func testReleaseAliveConnectionHasWaiter() throws { @@ -488,8 +494,8 @@ class ConnectionPoolTests: XCTestCase { let action = self.http1ConnectionProvider.state.release(connection: connection, closing: false) switch action { case .lease(let connection, let waiter): - // XCTAssertTrue(connection.isInUse) snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() + XCTAssertTrue(snapshot.leasedConnections.contains(ConnectionKey(connection))) XCTAssertEqual(0, snapshot.availableConnections.count) XCTAssertEqual(1, snapshot.leasedConnections.count) XCTAssertEqual(0, snapshot.waiters.count) @@ -500,8 +506,7 @@ class ConnectionPoolTests: XCTestCase { // this cleanup code needs to go and use HTTP1ConnectionProvider's API instead // (https://github.com/swift-server/async-http-client/issues/234) waiter.promise.succeed(connection) - snapshot.openedConnectionsCount = 0 - self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) + self.http1ConnectionProvider.release(connection: connection, closing: true, logger: HTTPClient.loggingDisabled) default: XCTFail("Unexpected action: \(action)") } @@ -539,9 +544,8 @@ class ConnectionPoolTests: XCTestCase { // cleanup // this cleanup code needs to go and use HTTP1ConnectionProvider's API instead // (https://github.com/swift-server/async-http-client/issues/234) - waiter.promise.fail(TempError()) - snapshot.openedConnectionsCount = 0 - self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) + // simulate create -> use -> release cycle + self.http1ConnectionProvider.connect(.failure(TempError()), waiter: waiter, logger: HTTPClient.loggingDisabled) default: XCTFail("Unexpected action: \(action)") } @@ -580,9 +584,10 @@ class ConnectionPoolTests: XCTestCase { XCTAssertEqual(1, snapshot.openedConnectionsCount) // cleanup + // this cleanup code needs to go and use HTTP1ConnectionProvider's API instead + // (https://github.com/swift-server/async-http-client/issues/234) waiter.promise.succeed(connection) - snapshot.openedConnectionsCount = 0 - self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) + self.http1ConnectionProvider.release(connection: connection, closing: true, logger: HTTPClient.loggingDisabled) default: XCTFail("Unexpected action: \(action)") } @@ -621,9 +626,10 @@ class ConnectionPoolTests: XCTestCase { XCTAssertEqual(1, snapshot.openedConnectionsCount) // cleanup + // this cleanup code needs to go and use HTTP1ConnectionProvider's API instead + // (https://github.com/swift-server/async-http-client/issues/234) waiter.promise.succeed(connection) - snapshot.openedConnectionsCount = 0 - self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) + self.http1ConnectionProvider.release(connection: connection, closing: true, logger: HTTPClient.loggingDisabled) default: XCTFail("Unexpected action: \(action)") } @@ -665,9 +671,11 @@ class ConnectionPoolTests: XCTestCase { XCTAssertEqual(2, snapshot.openedConnectionsCount) // cleanup - waiter.promise.succeed(connection) - snapshot.openedConnectionsCount = 0 - self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) + // this cleanup code needs to go and use HTTP1ConnectionProvider's API instead + // (https://github.com/swift-server/async-http-client/issues/234) + // simulate create -> use -> release cycle + self.http1ConnectionProvider.connect(.failure(TempError()), waiter: waiter, logger: HTTPClient.loggingDisabled) + connection.remoteClosed(logger: HTTPClient.loggingDisabled) default: XCTFail("Unexpected action: \(action)") } @@ -710,8 +718,8 @@ class ConnectionPoolTests: XCTestCase { // cleanup waiter.promise.succeed(replacement) - snapshot.openedConnectionsCount = 0 - self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) + connection.remoteClosed(logger: HTTPClient.loggingDisabled) + self.http1ConnectionProvider.release(connection: replacement, closing: true, logger: HTTPClient.loggingDisabled) default: XCTFail("Unexpected action: \(action)") } @@ -752,9 +760,14 @@ class ConnectionPoolTests: XCTestCase { XCTAssertEqual(8, snapshot.openedConnectionsCount) // cleanup + // this cleanup code needs to go and use HTTP1ConnectionProvider's API instead + // (https://github.com/swift-server/async-http-client/issues/234) waiter.promise.fail(TempError()) - snapshot.openedConnectionsCount = 0 + snapshot.openedConnectionsCount = 2 self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) + + snapshot.availableConnections.forEach { $0.remoteClosed(logger: HTTPClient.loggingDisabled) } + self.http1ConnectionProvider.release(connection: connection, closing: true, logger: HTTPClient.loggingDisabled) default: XCTFail("Unexpected action: \(action)") } @@ -796,9 +809,10 @@ class ConnectionPoolTests: XCTestCase { XCTAssertEqual(1, snapshot.openedConnectionsCount) // cleanup + // this cleanup code needs to go and use HTTP1ConnectionProvider's API instead + // (https://github.com/swift-server/async-http-client/issues/234) waiter.promise.succeed(connection) - snapshot.openedConnectionsCount = 0 - self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) + self.http1ConnectionProvider.release(connection: connection, closing: true, logger: HTTPClient.loggingDisabled) default: XCTFail("Unexpected action: \(action)") } @@ -838,9 +852,10 @@ class ConnectionPoolTests: XCTestCase { XCTAssertEqual(2, snapshot.openedConnectionsCount) // cleanup - waiter.promise.fail(TempError()) - snapshot.openedConnectionsCount = 0 - self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) + // this cleanup code needs to go and use HTTP1ConnectionProvider's API instead + // (https://github.com/swift-server/async-http-client/issues/234)s + self.http1ConnectionProvider.connect(.failure(TempError()), waiter: waiter, logger: HTTPClient.loggingDisabled) + snapshot.availableConnections.forEach { $0.remoteClosed(logger: HTTPClient.loggingDisabled) } default: XCTFail("Unexpected action: \(action)") } @@ -872,6 +887,11 @@ class ConnectionPoolTests: XCTestCase { default: XCTFail("Unexpected action: \(action)") } + + // cleanup + // this cleanup code needs to go and use HTTP1ConnectionProvider's API instead + // (https://github.com/swift-server/async-http-client/issues/234) + self.http1ConnectionProvider.execute(action, logger: HTTPClient.loggingDisabled) } func testNextWaiterEmptyQueueHasConnections() throws { @@ -905,10 +925,7 @@ class ConnectionPoolTests: XCTestCase { // cleanup // this cleanup code needs to go and use HTTP1ConnectionProvider's API instead // (https://github.com/swift-server/async-http-client/issues/234) - XCTAssertNoThrow(try available.close().wait()) - snapshot.availableConnections.removeAll() - snapshot.openedConnectionsCount = 0 - self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) + available.remoteClosed(logger: HTTPClient.loggingDisabled) default: XCTFail("Unexpected action: \(action)") } @@ -948,9 +965,8 @@ class ConnectionPoolTests: XCTestCase { // cleanup // this cleanup code needs to go and use HTTP1ConnectionProvider's API instead // (https://github.com/swift-server/async-http-client/issues/234) - waiter.promise.succeed(connection) - snapshot.openedConnectionsCount = 0 - self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) + waiter.promise.fail(TempError()) + self.http1ConnectionProvider.release(connection: connection, closing: true, logger: HTTPClient.loggingDisabled) default: XCTFail("Unexpected action: \(action)") } @@ -979,7 +995,7 @@ class ConnectionPoolTests: XCTestCase { let action = self.http1ConnectionProvider.state.processNextWaiter() switch action { case .lease(let connection, let waiter): - var snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() + let snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() XCTAssertTrue(snapshot.leasedConnections.contains(ConnectionKey(connection))) XCTAssertEqual(0, snapshot.availableConnections.count) XCTAssertEqual(1, snapshot.leasedConnections.count) @@ -990,9 +1006,8 @@ class ConnectionPoolTests: XCTestCase { // cleanup // this cleanup code needs to go and use HTTP1ConnectionProvider's API instead // (https://github.com/swift-server/async-http-client/issues/234) - waiter.promise.succeed(connection) - snapshot.openedConnectionsCount = 0 - self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) + waiter.promise.fail(TempError()) + self.http1ConnectionProvider.release(connection: connection, closing: true, logger: HTTPClient.loggingDisabled) default: XCTFail("Unexpected action: \(action)") } @@ -1031,9 +1046,9 @@ class ConnectionPoolTests: XCTestCase { // cleanup // this cleanup code needs to go and use HTTP1ConnectionProvider's API instead // (https://github.com/swift-server/async-http-client/issues/234) - waiter.promise.fail(TempError()) - snapshot.openedConnectionsCount = 0 - self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) + // simulate create -> use -> release cycle + self.http1ConnectionProvider.connect(.failure(TempError()), waiter: waiter, logger: HTTPClient.loggingDisabled) + available.remoteClosed(logger: HTTPClient.loggingDisabled) default: XCTFail("Unexpected action: \(action)") } @@ -1071,6 +1086,11 @@ class ConnectionPoolTests: XCTestCase { default: XCTFail("Unexpected action: \(action)") } + + // cleanup + // this cleanup code needs to go and use HTTP1ConnectionProvider's API instead + // (https://github.com/swift-server/async-http-client/issues/234) + self.http1ConnectionProvider.release(connection: connection, closing: true, logger: HTTPClient.loggingDisabled) } func testTimeoutAvailableConnection() throws { @@ -1109,6 +1129,11 @@ class ConnectionPoolTests: XCTestCase { default: XCTFail("Unexpected action: \(action)") } + + // cleanup + // this cleanup code needs to go and use HTTP1ConnectionProvider's API instead + // (https://github.com/swift-server/async-http-client/issues/234) + self.http1ConnectionProvider.execute(action, logger: HTTPClient.loggingDisabled) } func testRemoteClosedLeasedConnection() throws { @@ -1141,6 +1166,11 @@ class ConnectionPoolTests: XCTestCase { default: XCTFail("Unexpected action: \(action)") } + + // cleanup + // this cleanup code needs to go and use HTTP1ConnectionProvider's API instead + // (https://github.com/swift-server/async-http-client/issues/234) + self.http1ConnectionProvider.release(connection: connection, closing: true, logger: HTTPClient.loggingDisabled) } func testRemoteClosedAvailableConnection() throws { @@ -1173,6 +1203,11 @@ class ConnectionPoolTests: XCTestCase { default: XCTFail("Unexpected action: \(action)") } + + // cleanup + // this cleanup code needs to go and use HTTP1ConnectionProvider's API instead + // (https://github.com/swift-server/async-http-client/issues/234) + self.http1ConnectionProvider.execute(action, logger: HTTPClient.loggingDisabled) } // MARK: - Connection Tests @@ -1206,8 +1241,7 @@ class ConnectionPoolTests: XCTestCase { // cleanup // this cleanup code needs to go and use HTTP1ConnectionProvider's API instead // (https://github.com/swift-server/async-http-client/issues/234) - snapshot.pending = 0 - self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) + connection.remoteClosed(logger: HTTPClient.loggingDisabled) } func testConnectionReleaseInactive() throws { @@ -1235,12 +1269,6 @@ class ConnectionPoolTests: XCTestCase { XCTAssertEqual(0, snapshot.waiters.count) XCTAssertEqual(0, snapshot.pending) XCTAssertEqual(0, snapshot.openedConnectionsCount) - - // cleanup - // this cleanup code needs to go and use HTTP1ConnectionProvider's API instead - // (https://github.com/swift-server/async-http-client/issues/234) - snapshot.pending = 0 - self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) } func testConnectionRemoteCloseRelease() throws { @@ -1268,12 +1296,6 @@ class ConnectionPoolTests: XCTestCase { XCTAssertEqual(0, snapshot.waiters.count) XCTAssertEqual(0, snapshot.pending) XCTAssertEqual(0, snapshot.openedConnectionsCount) - - // cleanup - // this cleanup code needs to go and use HTTP1ConnectionProvider's API instead - // (https://github.com/swift-server/async-http-client/issues/234) - snapshot.pending = 0 - self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) } func testConnectionTimeoutRelease() throws { @@ -1301,12 +1323,6 @@ class ConnectionPoolTests: XCTestCase { XCTAssertEqual(0, snapshot.waiters.count) XCTAssertEqual(0, snapshot.pending) XCTAssertEqual(0, snapshot.openedConnectionsCount) - - // cleanup - // this cleanup code needs to go and use HTTP1ConnectionProvider's API instead - // (https://github.com/swift-server/async-http-client/issues/234) - snapshot.pending = 0 - self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) } func testAcquireAvailableBecomesUnavailable() throws { @@ -1354,13 +1370,11 @@ class ConnectionPoolTests: XCTestCase { XCTAssertEqual(0, snapshot.pending) XCTAssertEqual(1, snapshot.openedConnectionsCount) - waiter.promise.succeed(connection) - // cleanup // this cleanup code needs to go and use HTTP1ConnectionProvider's API instead // (https://github.com/swift-server/async-http-client/issues/234) - snapshot.openedConnectionsCount = 0 - self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) + waiter.promise.fail(TempError()) + self.http1ConnectionProvider.release(connection: connection, closing: true, logger: HTTPClient.loggingDisabled) default: XCTFail("Unexpected action: \(action)") } @@ -1381,15 +1395,6 @@ class ConnectionPoolTests: XCTestCase { override func tearDown() { XCTAssertNotNil(self.eventLoop) XCTAssertNotNil(self.http1ConnectionProvider) - /* BEGIN workaround for #232, this whole block is to be replaced by the commented out line below */ - // not closing the provider here (https://github.com/swift-server/async-http-client/issues/232) - var state = self.http1ConnectionProvider.state.testsOnly_getInternalState() - if state.pending == 1, state.waiters.isEmpty, state.leasedConnections.isEmpty, state.openedConnectionsCount == 0 { - state.pending = 0 - self.http1ConnectionProvider.state.testsOnly_setInternalState(state) - } - self.http1ConnectionProvider.closePromise.succeed(()) - /* END workaround for #232 */ XCTAssertNoThrow(try self.http1ConnectionProvider.close().wait()) XCTAssertNoThrow(try self.eventLoop.syncShutdownGracefully()) self.eventLoop = nil From 87da072e779798ccfdd1f4d0e13fb2e721fa153f Mon Sep 17 00:00:00 2001 From: Artem Redkin Date: Fri, 12 Jun 2020 21:01:04 +0100 Subject: [PATCH 3/3] remove debug output --- Sources/AsyncHTTPClient/ConnectionPool.swift | 2 -- 1 file changed, 2 deletions(-) diff --git a/Sources/AsyncHTTPClient/ConnectionPool.swift b/Sources/AsyncHTTPClient/ConnectionPool.swift index 4a9d20174..f18a30a98 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool.swift @@ -489,8 +489,6 @@ class HTTP1ConnectionProvider { func close() -> EventLoopFuture { if let (waiters, available, leased, clean) = self.lock.withLock({ self.state.close() }) { - print(waiters.count, available.count, leased.count, clean) - waiters.forEach { $0.promise.fail(HTTPClientError.cancelled) }