From 8e4093c2335b3945f6ee465cdee9889e86b5bba6 Mon Sep 17 00:00:00 2001 From: Fabian Fett Date: Thu, 9 Sep 2021 16:10:12 +0200 Subject: [PATCH 1/3] Add HTTPConnectionPool --- .../ConnectionPool/HTTPConnectionPool.swift | 369 +++++++++++++++++- .../HTTPClientTestUtils.swift | 20 +- .../HTTPConnectionPoolTests+XCTest.swift | 36 ++ .../HTTPConnectionPoolTests.swift | 358 +++++++++++++++++ Tests/LinuxMain.swift | 1 + 5 files changed, 777 insertions(+), 7 deletions(-) create mode 100644 Tests/AsyncHTTPClientTests/HTTPConnectionPoolTests+XCTest.swift create mode 100644 Tests/AsyncHTTPClientTests/HTTPConnectionPoolTests.swift diff --git a/Sources/AsyncHTTPClient/ConnectionPool/HTTPConnectionPool.swift b/Sources/AsyncHTTPClient/ConnectionPool/HTTPConnectionPool.swift index ba8c7ae62..0f6339687 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/HTTPConnectionPool.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/HTTPConnectionPool.swift @@ -12,9 +12,16 @@ // //===----------------------------------------------------------------------===// +import Logging +import NIOConcurrencyHelpers import NIOCore +import NIOSSL -enum HTTPConnectionPool { +protocol HTTPConnectionPoolDelegate { + func connectionPoolDidShutdown(_ pool: HTTPConnectionPool, unclean: Bool) +} + +class HTTPConnectionPool { struct Connection: Hashable { typealias ID = Int @@ -121,6 +128,364 @@ enum HTTPConnectionPool { } } } + + let stateLock = Lock() + private var _state: StateMachine { + didSet { + self.logger.trace("Connection Pool State changed", metadata: [ + "key": "\(self.key)", + "state": "\(self._state)", + ]) + } + } + + let timerLock = Lock() + private var _requestTimer = [Request.ID: Scheduled]() + private var _idleTimer = [Connection.ID: Scheduled]() + private var _backoffTimer = [Connection.ID: Scheduled]() + + let key: ConnectionPool.Key + var logger: Logger + + let eventLoopGroup: EventLoopGroup + let connectionFactory: ConnectionFactory + let clientConfiguration: HTTPClient.Configuration + let idleConnectionTimeout: TimeAmount + + let delegate: HTTPConnectionPoolDelegate + + init(eventLoopGroup: EventLoopGroup, + sslContextCache: SSLContextCache, + tlsConfiguration: TLSConfiguration?, + clientConfiguration: HTTPClient.Configuration, + key: ConnectionPool.Key, + delegate: HTTPConnectionPoolDelegate, + idGenerator: Connection.ID.Generator, + backgroundActivityLogger logger: Logger) { + self.eventLoopGroup = eventLoopGroup + self.connectionFactory = ConnectionFactory( + key: key, + tlsConfiguration: tlsConfiguration, + clientConfiguration: clientConfiguration, + sslContextCache: sslContextCache + ) + self.clientConfiguration = clientConfiguration + self.key = key + self.delegate = delegate + self.logger = logger + + self.idleConnectionTimeout = clientConfiguration.connectionPool.idleTimeout + + self._state = StateMachine( + eventLoopGroup: eventLoopGroup, + idGenerator: idGenerator, + maximumConcurrentHTTP1Connections: 8 + ) + } + + func executeRequest(_ request: HTTPSchedulableRequest) { + let action = self.stateLock.withLock { () -> StateMachine.Action in + self._state.executeRequest(.init(request)) + } + self.run(action: action) + } + + func shutdown() { + let action = self.stateLock.withLock { () -> StateMachine.Action in + self._state.shutdown() + } + self.run(action: action) + } + + func run(action: StateMachine.Action) { + self.runConnectionAction(action.connection) + self.runRequestAction(action.request) + } + + func runConnectionAction(_ action: StateMachine.ConnectionAction) { + switch action { + case .createConnection(let connectionID, let eventLoop): + self.createConnection(connectionID, on: eventLoop) + + case .scheduleBackoffTimer(let connectionID, let backoff, on: let eventLoop): + self.scheduleConnectionStartBackoffTimer(connectionID, backoff, on: eventLoop) + + case .scheduleTimeoutTimer(let connectionID, on: let eventLoop): + self.scheduleIdleTimerForConnection(connectionID, on: eventLoop) + + case .cancelTimeoutTimer(let connectionID): + self.cancelIdleTimerForConnection(connectionID) + + case .closeConnection(let connection, isShutdown: let isShutdown): + // we are not interested in the close future... + _ = connection.close() + + if case .yes(let unclean) = isShutdown { + self.delegate.connectionPoolDidShutdown(self, unclean: unclean) + } + + case .cleanupConnections(let cleanupContext, isShutdown: let isShutdown): + for connection in cleanupContext.close { + _ = connection.close() + } + + for connection in cleanupContext.cancel { + _ = connection.close() + } + + for connectionID in cleanupContext.connectBackoff { + self.cancelConnectionStartBackoffTimer(connectionID) + } + + if case .yes(let unclean) = isShutdown { + self.delegate.connectionPoolDidShutdown(self, unclean: unclean) + } + + case .none: + break + } + } + + func runRequestAction(_ action: StateMachine.RequestAction) { + switch action { + case .executeRequest(let request, let connection, cancelTimeout: let cancelTimeout): + connection.executeRequest(request.req) + if cancelTimeout { + self.cancelRequestTimeout(request.id) + } + + case .executeRequestsAndCancelTimeouts(let requests, let connection): + for request in requests { + connection.executeRequest(request.req) + self.cancelRequestTimeout(request.id) + } + + case .failRequest(let request, let error, cancelTimeout: let cancelTimeout): + if cancelTimeout { + self.cancelRequestTimeout(request.id) + } + request.req.fail(error) + + case .failRequestsAndCancelTimeouts(let requests, let error): + for request in requests { + self.cancelRequestTimeout(request.id) + request.req.fail(error) + } + + case .scheduleRequestTimeout(let request, on: let eventLoop): + self.scheduleRequestTimeout(request, on: eventLoop) + + case .cancelRequestTimeout(let requestID): + self.cancelRequestTimeout(requestID) + + case .none: + break + } + } + + // MARK: Run actions + + private func createConnection(_ connectionID: Connection.ID, on eventLoop: EventLoop) { + self.connectionFactory.makeConnection( + for: self, + connectionID: connectionID, + http1ConnectionDelegate: self, + http2ConnectionDelegate: self, + deadline: .now() + (self.clientConfiguration.timeout.connect ?? .seconds(30)), + eventLoop: eventLoop, + logger: self.logger + ) + } + + private func scheduleRequestTimeout(_ request: Request, on eventLoop: EventLoop) { + let requestID = request.id + let scheduled = eventLoop.scheduleTask(deadline: request.connectionDeadline) { + // The timer has fired. Now we need to do a couple of things: + // + // 1. Remove ourselves from the timer dictionary to not leak any data. If our + // waiter entry still exist, we need to tell the state machine, that we want + // to fail the request. + + let timeout = self.timerLock.withLock { + self._requestTimer.removeValue(forKey: requestID) != nil + } + + // 2. If the entry did not exists anymore, we can assume that the request was + // scheduled on another connection. The timer still fired anyhow because of a + // race. In such a situation we don't need to do anything. + guard timeout else { return } + + // 3. Tell the state machine about the timeout + let action = self.stateLock.withLock { + self._state.timeoutRequest(requestID) + } + + self.run(action: action) + } + + self.timerLock.withLockVoid { + assert(self._requestTimer[requestID] == nil) + self._requestTimer[requestID] = scheduled + } + + request.req.requestWasQueued(self) + } + + private func cancelRequestTimeout(_ id: Request.ID) { + let scheduled = self.timerLock.withLock { + self._requestTimer.removeValue(forKey: id) + } + + scheduled?.cancel() + } + + private func scheduleIdleTimerForConnection(_ connectionID: Connection.ID, on eventLoop: EventLoop) { + let scheduled = eventLoop.scheduleTask(in: self.idleConnectionTimeout) { + // there might be a race between a cancelTimer call and the triggering + // of this scheduled task. both want to acquire the lock + let timerExisted = self.timerLock.withLock { + self._idleTimer.removeValue(forKey: connectionID) != nil + } + + guard timerExisted else { return } + + let action = self.stateLock.withLock { + self._state.connectionIdleTimeout(connectionID) + } + self.run(action: action) + } + + self.timerLock.withLock { + assert(self._idleTimer[connectionID] == nil) + self._idleTimer[connectionID] = scheduled + } + } + + private func cancelIdleTimerForConnection(_ connectionID: Connection.ID) { + let cancelTimer = self.timerLock.withLock { + self._idleTimer.removeValue(forKey: connectionID) + } + + cancelTimer?.cancel() + } + + private func scheduleConnectionStartBackoffTimer( + _ connectionID: Connection.ID, + _ timeAmount: TimeAmount, + on eventLoop: EventLoop + ) { + let scheduled = eventLoop.scheduleTask(in: timeAmount) { + // there might be a race between a backoffTimer and the pool shutting down. + let timerExisted = self.timerLock.withLock { + self._backoffTimer.removeValue(forKey: connectionID) != nil + } + + guard timerExisted else { return } + + let action = self.stateLock.withLock { + self._state.connectionCreationBackoffDone(connectionID) + } + self.run(action: action) + } + + self.timerLock.withLock { + assert(self._backoffTimer[connectionID] == nil) + self._backoffTimer[connectionID] = scheduled + } + } + + private func cancelConnectionStartBackoffTimer(_ connectionID: Connection.ID) { + let backoffTimer = self.timerLock.withLock { + self._backoffTimer[connectionID] + } + + backoffTimer?.cancel() + } +} + +// MARK: - Protocol methods - + +extension HTTPConnectionPool: HTTPConnectionRequester { + func http1ConnectionCreated(_ connection: HTTP1Connection) { + let action = self.stateLock.withLock { + self._state.newHTTP1ConnectionCreated(.http1_1(connection)) + } + self.run(action: action) + } + + func http2ConnectionCreated(_ connection: HTTP2Connection, maximumStreams: Int) { + preconditionFailure("Did not expect http/2 connections right now.") +// let action = self.stateLock.withLock { () -> StateMachine.Action in +// if let settings = connection.settings { +// return self._state.newHTTP2ConnectionCreated(.http2(connection), settings: settings) +// } else { +// // immidiate connection closure before we can register with state machine +// // is the only reason we don't have settings +// struct ImmidiateConnectionClose: Error {} +// return self._state.failedToCreateNewConnection(ImmidiateConnectionClose(), connectionID: connection.id) +// } +// } +// self.run(action: action) + } + + func failedToCreateHTTPConnection(_ connectionID: HTTPConnectionPool.Connection.ID, error: Error) { + let action = self.stateLock.withLock { + self._state.failedToCreateNewConnection(error, connectionID: connectionID) + } + self.run(action: action) + } +} + +extension HTTPConnectionPool: HTTP1ConnectionDelegate { + func http1ConnectionClosed(_ connection: HTTP1Connection) { + let action = self.stateLock.withLock { + self._state.connectionClosed(connection.id) + } + self.run(action: action) + } + + func http1ConnectionReleased(_ connection: HTTP1Connection) { + let action = self.stateLock.withLock { + self._state.http1ConnectionReleased(connection.id) + } + self.run(action: action) + } +} + +extension HTTPConnectionPool: HTTP2ConnectionDelegate { + func http2Connection(_ connection: HTTP2Connection, newMaxStreamSetting: Int) { + // ignore for now + } + + func http2ConnectionGoAwayReceived(_: HTTP2Connection) { + // ignore for now + } + + func http2ConnectionClosed(_: HTTP2Connection) { + // ignore for now +// let action = self.stateLock.withLock { +// self._state.connectionClosed(connection.id) +// } +// self.run(action: action) + } + + func http2ConnectionStreamClosed(_ connection: HTTP2Connection, availableStreams: Int) { + // ignore for now +// let action = self.stateLock.withLock { +// self._state.http2ConnectionStreamClosed(connection.id, availableStreams: availableStreams) +// } +// self.run(action: action) + } +} + +extension HTTPConnectionPool: HTTPRequestScheduler { + func cancelRequest(_ request: HTTPSchedulableRequest) { + let requestID = Request(request).id + let action = self.stateLock.withLock { + self._state.cancelRequest(requestID) + } + self.run(action: action) + } } extension HTTPConnectionPool { @@ -156,7 +521,7 @@ extension HTTPConnectionPool { self.req.preferredEventLoop } - var connectionDeadline: NIODeadline? { + var connectionDeadline: NIODeadline { self.req.connectionDeadline } diff --git a/Tests/AsyncHTTPClientTests/HTTPClientTestUtils.swift b/Tests/AsyncHTTPClientTests/HTTPClientTestUtils.swift index 0789575e3..db5d871fc 100644 --- a/Tests/AsyncHTTPClientTests/HTTPClientTestUtils.swift +++ b/Tests/AsyncHTTPClientTests/HTTPClientTestUtils.swift @@ -306,11 +306,15 @@ internal final class HTTPBin where let group = MultiThreadedEventLoopGroup(numberOfThreads: 1) - private let activeConnCounterHandler: CountActiveConnectionsHandler + private let activeConnCounterHandler: ConnectionsCountHandler var activeConnections: Int { return self.activeConnCounterHandler.currentlyActiveConnections } + var createdConnections: Int { + return self.activeConnCounterHandler.createdConnections + } + var port: Int { return Int(self.serverChannel.localAddress!.port!) } @@ -343,7 +347,7 @@ internal final class HTTPBin where socketAddress = try! SocketAddress(unixDomainSocketPath: path) } - self.activeConnCounterHandler = CountActiveConnectionsHandler() + self.activeConnCounterHandler = ConnectionsCountHandler() let connectionIDAtomic = NIOAtomic.makeAtomic(value: 0) @@ -908,19 +912,25 @@ internal final class HTTPBinHandler: ChannelInboundHandler { } } -final class CountActiveConnectionsHandler: ChannelInboundHandler { +final class ConnectionsCountHandler: ChannelInboundHandler { typealias InboundIn = Channel private let activeConns = NIOAtomic.makeAtomic(value: 0) + private let createdConns = NIOAtomic.makeAtomic(value: 0) + + var createdConnections: Int { + self.createdConns.load() + } - public var currentlyActiveConnections: Int { - return self.activeConns.load() + var currentlyActiveConnections: Int { + self.activeConns.load() } func channelRead(context: ChannelHandlerContext, data: NIOAny) { let channel = self.unwrapInboundIn(data) _ = self.activeConns.add(1) + _ = self.createdConns.add(1) channel.closeFuture.whenComplete { _ in _ = self.activeConns.sub(1) } diff --git a/Tests/AsyncHTTPClientTests/HTTPConnectionPoolTests+XCTest.swift b/Tests/AsyncHTTPClientTests/HTTPConnectionPoolTests+XCTest.swift new file mode 100644 index 000000000..da3bb5db7 --- /dev/null +++ b/Tests/AsyncHTTPClientTests/HTTPConnectionPoolTests+XCTest.swift @@ -0,0 +1,36 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the AsyncHTTPClient open source project +// +// Copyright (c) 2018-2019 Apple Inc. and the AsyncHTTPClient project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// +// +// HTTPConnectionPoolTests+XCTest.swift +// +import XCTest + +/// +/// NOTE: This file was generated by generate_linux_tests.rb +/// +/// Do NOT edit this file directly as it will be regenerated automatically when needed. +/// + +extension HTTPConnectionPoolTests { + static var allTests: [(String, (HTTPConnectionPoolTests) -> () throws -> Void)] { + return [ + ("testOnlyOneConnectionIsUsedForSubSequentRequests", testOnlyOneConnectionIsUsedForSubSequentRequests), + ("testConnectionsForEventLoopRequirementsAreClosed", testConnectionsForEventLoopRequirementsAreClosed), + ("testConnectionPoolGrowsToMaxConcurrentConnections", testConnectionPoolGrowsToMaxConcurrentConnections), + ("testConnectionCreationIsRetriedUntilRequestIsFailed", testConnectionCreationIsRetriedUntilRequestIsFailed), + ("testConnectionCreationIsRetriedUntilPoolIsShutdown", testConnectionCreationIsRetriedUntilPoolIsShutdown), + ("testConnectionCreationIsRetriedUntilRequestIsCancelled", testConnectionCreationIsRetriedUntilRequestIsCancelled), + ] + } +} diff --git a/Tests/AsyncHTTPClientTests/HTTPConnectionPoolTests.swift b/Tests/AsyncHTTPClientTests/HTTPConnectionPoolTests.swift new file mode 100644 index 000000000..dd13284b1 --- /dev/null +++ b/Tests/AsyncHTTPClientTests/HTTPConnectionPoolTests.swift @@ -0,0 +1,358 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the AsyncHTTPClient open source project +// +// Copyright (c) 2021 Apple Inc. and the AsyncHTTPClient project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +@testable import AsyncHTTPClient +import Logging +import NIOCore +import NIOPosix +import XCTest + +class HTTPConnectionPoolTests: XCTestCase { + func testOnlyOneConnectionIsUsedForSubSequentRequests() { + let httpBin = HTTPBin() + defer { XCTAssertNoThrow(try httpBin.shutdown()) } + let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1) + let eventLoop = eventLoopGroup.next() + defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) } + + let request = try! HTTPClient.Request(url: "http://localhost:\(httpBin.port)") + let poolDelegate = TestDelegate(eventLoop: eventLoop) + + let pool = HTTPConnectionPool( + eventLoopGroup: eventLoopGroup, + sslContextCache: .init(), + tlsConfiguration: .none, + clientConfiguration: .init(), + key: .init(request), + delegate: poolDelegate, + idGenerator: .init(), + backgroundActivityLogger: .init(label: "test") + ) + defer { + pool.shutdown() + XCTAssertNoThrow(try poolDelegate.future.wait()) + XCTAssertNoThrow(try eventLoop.scheduleTask(in: .seconds(1)) {}.futureResult.wait()) + XCTAssertEqual(httpBin.activeConnections, 0) + } + + XCTAssertEqual(httpBin.createdConnections, 0) + + for _ in 0..<10 { + var maybeRequest: HTTPClient.Request? + var maybeRequestBag: RequestBag? + XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "https://localhost:\(httpBin.port)")) + XCTAssertNoThrow(maybeRequestBag = try RequestBag( + request: XCTUnwrap(maybeRequest), + eventLoopPreference: .indifferent, + task: .init(eventLoop: eventLoop, logger: .init(label: "test")), + redirectHandler: nil, + connectionDeadline: .distantFuture, + idleReadTimeout: nil, + delegate: ResponseAccumulator(request: XCTUnwrap(maybeRequest)) + )) + + guard let requestBag = maybeRequestBag else { return XCTFail("Expected to get a request") } + + pool.executeRequest(requestBag) + + XCTAssertNoThrow(try requestBag.task.futureResult.wait()) + XCTAssertEqual(httpBin.activeConnections, 1) + XCTAssertEqual(httpBin.createdConnections, 1) + } + } + + func testConnectionsForEventLoopRequirementsAreClosed() { + let httpBin = HTTPBin() + defer { XCTAssertNoThrow(try httpBin.shutdown()) } + let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 2) + let eventLoop = eventLoopGroup.next() + defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) } + + let request = try! HTTPClient.Request(url: "http://localhost:\(httpBin.port)") + let poolDelegate = TestDelegate(eventLoop: eventLoop) + + let pool = HTTPConnectionPool( + eventLoopGroup: eventLoopGroup, + sslContextCache: .init(), + tlsConfiguration: .none, + clientConfiguration: .init(), + key: .init(request), + delegate: poolDelegate, + idGenerator: .init(), + backgroundActivityLogger: .init(label: "test") + ) + defer { + pool.shutdown() + XCTAssertNoThrow(try poolDelegate.future.wait()) + XCTAssertNoThrow(try eventLoop.scheduleTask(in: .milliseconds(100)) {}.futureResult.wait()) + XCTAssertEqual(httpBin.activeConnections, 0) + XCTAssertEqual(httpBin.createdConnections, 10) + } + + XCTAssertEqual(httpBin.createdConnections, 0) + + for i in 0..<10 { + var maybeRequest: HTTPClient.Request? + var maybeRequestBag: RequestBag? + XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "https://localhost:\(httpBin.port)")) + XCTAssertNoThrow(maybeRequestBag = try RequestBag( + request: XCTUnwrap(maybeRequest), + eventLoopPreference: .init(.testOnly_exact(channelOn: eventLoopGroup.next(), delegateOn: eventLoopGroup.next())), + task: .init(eventLoop: eventLoop, logger: .init(label: "test")), + redirectHandler: nil, + connectionDeadline: .distantFuture, + idleReadTimeout: nil, + delegate: ResponseAccumulator(request: XCTUnwrap(maybeRequest)) + )) + + guard let requestBag = maybeRequestBag else { return XCTFail("Expected to get a request") } + + pool.executeRequest(requestBag) + XCTAssertNoThrow(try requestBag.task.futureResult.wait()) + XCTAssertEqual(httpBin.createdConnections, i + 1) + } + } + + func testConnectionPoolGrowsToMaxConcurrentConnections() { + let httpBin = HTTPBin() + let maxConnections = 8 + defer { XCTAssertNoThrow(try httpBin.shutdown()) } + let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 2) + let eventLoop = eventLoopGroup.next() + defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) } + + let request = try! HTTPClient.Request(url: "http://localhost:\(httpBin.port)") + let poolDelegate = TestDelegate(eventLoop: eventLoop) + + let pool = HTTPConnectionPool( + eventLoopGroup: eventLoopGroup, + sslContextCache: .init(), + tlsConfiguration: .none, + clientConfiguration: .init(connectionPool: .init(idleTimeout: .milliseconds(500))), + key: .init(request), + delegate: poolDelegate, + idGenerator: .init(), + backgroundActivityLogger: .init(label: "test") + ) + defer { + pool.shutdown() + XCTAssertNoThrow(try poolDelegate.future.wait()) + + XCTAssertEqual(httpBin.activeConnections, 0) + XCTAssertEqual(httpBin.createdConnections, maxConnections) + } + + XCTAssertEqual(httpBin.createdConnections, 0) + + var tasks = [EventLoopFuture]() + + for _ in 0..<1000 { + var maybeRequest: HTTPClient.Request? + var maybeRequestBag: RequestBag? + XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "https://localhost:\(httpBin.port)")) + XCTAssertNoThrow(maybeRequestBag = try RequestBag( + request: XCTUnwrap(maybeRequest), + eventLoopPreference: .indifferent, + task: .init(eventLoop: eventLoopGroup.next(), logger: .init(label: "test")), + redirectHandler: nil, + connectionDeadline: .distantFuture, + idleReadTimeout: nil, + delegate: ResponseAccumulator(request: XCTUnwrap(maybeRequest)) + )) + + guard let requestBag = maybeRequestBag else { return XCTFail("Expected to get a request") } + + pool.executeRequest(requestBag) + tasks.append(requestBag.task.futureResult) + } + + XCTAssertNoThrow(try EventLoopFuture.whenAllSucceed(tasks, on: eventLoopGroup.next()).wait()) + XCTAssertEqual(httpBin.activeConnections, maxConnections) + XCTAssertNoThrow(try eventLoop.scheduleTask(in: .milliseconds(600)) {}.futureResult.wait()) + XCTAssertEqual(httpBin.activeConnections, 0) + } + + func testConnectionCreationIsRetriedUntilRequestIsFailed() { + let httpBin = HTTPBin(proxy: .simulate(authorization: "abc123")) + defer { XCTAssertNoThrow(try httpBin.shutdown()) } + let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 2) + let eventLoop = eventLoopGroup.next() + defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) } + + let request = try! HTTPClient.Request(url: "http://localhost:9000") + let poolDelegate = TestDelegate(eventLoop: eventLoop) + + let pool = HTTPConnectionPool( + eventLoopGroup: eventLoopGroup, + sslContextCache: .init(), + tlsConfiguration: .none, + clientConfiguration: .init( + proxy: .init(host: "localhost", port: httpBin.port, type: .http(.basic(credentials: "invalid"))) + ), + key: .init(request), + delegate: poolDelegate, + idGenerator: .init(), + backgroundActivityLogger: .init(label: "test") + ) + defer { + pool.shutdown() + XCTAssertNoThrow(try poolDelegate.future.wait()) + } + + XCTAssertEqual(httpBin.createdConnections, 0) + + var maybeRequest: HTTPClient.Request? + var maybeRequestBag: RequestBag? + XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "https://localhost:\(httpBin.port)")) + XCTAssertNoThrow(maybeRequestBag = try RequestBag( + request: XCTUnwrap(maybeRequest), + eventLoopPreference: .indifferent, + task: .init(eventLoop: eventLoopGroup.next(), logger: .init(label: "test")), + redirectHandler: nil, + connectionDeadline: .now() + .seconds(5), + idleReadTimeout: nil, + delegate: ResponseAccumulator(request: XCTUnwrap(maybeRequest)) + )) + + guard let requestBag = maybeRequestBag else { return XCTFail("Expected to get a request") } + + pool.executeRequest(requestBag) + XCTAssertThrowsError(try requestBag.task.futureResult.wait()) { + XCTAssertEqual($0 as? HTTPClientError, .getConnectionFromPoolTimeout) + } + XCTAssertGreaterThanOrEqual(httpBin.createdConnections, 8) + XCTAssertEqual(httpBin.activeConnections, 0) + } + + func testConnectionCreationIsRetriedUntilPoolIsShutdown() { + let httpBin = HTTPBin(proxy: .simulate(authorization: "abc123")) + defer { XCTAssertNoThrow(try httpBin.shutdown()) } + let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 2) + let eventLoop = eventLoopGroup.next() + defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) } + + let request = try! HTTPClient.Request(url: "http://localhost:9000") + let poolDelegate = TestDelegate(eventLoop: eventLoop) + + let pool = HTTPConnectionPool( + eventLoopGroup: eventLoopGroup, + sslContextCache: .init(), + tlsConfiguration: .none, + clientConfiguration: .init( + proxy: .init(host: "localhost", port: httpBin.port, type: .http(.basic(credentials: "invalid"))) + ), + key: .init(request), + delegate: poolDelegate, + idGenerator: .init(), + backgroundActivityLogger: .init(label: "test") + ) + + XCTAssertEqual(httpBin.createdConnections, 0) + + var maybeRequest: HTTPClient.Request? + var maybeRequestBag: RequestBag? + XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "https://localhost:\(httpBin.port)")) + XCTAssertNoThrow(maybeRequestBag = try RequestBag( + request: XCTUnwrap(maybeRequest), + eventLoopPreference: .indifferent, + task: .init(eventLoop: eventLoopGroup.next(), logger: .init(label: "test")), + redirectHandler: nil, + connectionDeadline: .now() + .seconds(5), + idleReadTimeout: nil, + delegate: ResponseAccumulator(request: XCTUnwrap(maybeRequest)) + )) + + guard let requestBag = maybeRequestBag else { return XCTFail("Expected to get a request") } + + pool.executeRequest(requestBag) + XCTAssertNoThrow(try eventLoop.scheduleTask(in: .seconds(2)) {}.futureResult.wait()) + pool.shutdown() + + XCTAssertThrowsError(try requestBag.task.futureResult.wait()) { + XCTAssertEqual($0 as? HTTPClientError, .cancelled) + } + XCTAssertGreaterThanOrEqual(httpBin.createdConnections, 3) + XCTAssertNoThrow(try poolDelegate.future.wait()) + XCTAssertEqual(httpBin.activeConnections, 0) + } + + func testConnectionCreationIsRetriedUntilRequestIsCancelled() { + let httpBin = HTTPBin(proxy: .simulate(authorization: "abc123")) + defer { XCTAssertNoThrow(try httpBin.shutdown()) } + let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 2) + let eventLoop = eventLoopGroup.next() + defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) } + + let request = try! HTTPClient.Request(url: "http://localhost:9000") + let poolDelegate = TestDelegate(eventLoop: eventLoop) + + let pool = HTTPConnectionPool( + eventLoopGroup: eventLoopGroup, + sslContextCache: .init(), + tlsConfiguration: .none, + clientConfiguration: .init( + proxy: .init(host: "localhost", port: httpBin.port, type: .http(.basic(credentials: "invalid"))) + ), + key: .init(request), + delegate: poolDelegate, + idGenerator: .init(), + backgroundActivityLogger: .init(label: "test") + ) + defer { + pool.shutdown() + XCTAssertNoThrow(try poolDelegate.future.wait()) + } + + XCTAssertEqual(httpBin.createdConnections, 0) + + var maybeRequest: HTTPClient.Request? + var maybeRequestBag: RequestBag? + XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "https://localhost:\(httpBin.port)")) + XCTAssertNoThrow(maybeRequestBag = try RequestBag( + request: XCTUnwrap(maybeRequest), + eventLoopPreference: .indifferent, + task: .init(eventLoop: eventLoopGroup.next(), logger: .init(label: "test")), + redirectHandler: nil, + connectionDeadline: .now() + .seconds(5), + idleReadTimeout: nil, + delegate: ResponseAccumulator(request: XCTUnwrap(maybeRequest)) + )) + + guard let requestBag = maybeRequestBag else { return XCTFail("Expected to get a request") } + + pool.executeRequest(requestBag) + XCTAssertNoThrow(try eventLoop.scheduleTask(in: .milliseconds(100)) {}.futureResult.wait()) + requestBag.cancel() + + XCTAssertThrowsError(try requestBag.task.futureResult.wait()) { + XCTAssertEqual($0 as? HTTPClientError, .cancelled) + } + XCTAssertGreaterThanOrEqual(httpBin.createdConnections, 1) + } +} + +class TestDelegate: HTTPConnectionPoolDelegate { + private let promise: EventLoopPromise + var future: EventLoopFuture { + self.promise.futureResult + } + + init(eventLoop: EventLoop) { + self.promise = eventLoop.makePromise(of: Bool.self) + } + + func connectionPoolDidShutdown(_ pool: HTTPConnectionPool, unclean: Bool) { + self.promise.succeed(unclean) + } +} diff --git a/Tests/LinuxMain.swift b/Tests/LinuxMain.swift index a30d560a9..efa4ffd01 100644 --- a/Tests/LinuxMain.swift +++ b/Tests/LinuxMain.swift @@ -40,6 +40,7 @@ import XCTest testCase(HTTPClientNIOTSTests.allTests), testCase(HTTPClientSOCKSTests.allTests), testCase(HTTPClientTests.allTests), + testCase(HTTPConnectionPoolTests.allTests), testCase(HTTPConnectionPool_FactoryTests.allTests), testCase(HTTPConnectionPool_HTTP1ConnectionsTests.allTests), testCase(HTTPConnectionPool_HTTP1StateMachineTests.allTests), From ff70dd2ac4e9b9399b72123de2060200d571e80e Mon Sep 17 00:00:00 2001 From: Fabian Fett Date: Thu, 9 Sep 2021 18:32:51 +0200 Subject: [PATCH 2/3] Code review Co-authored-by: George Barnett --- .../HTTP1.1/HTTP1Connection.swift | 8 +- .../HTTP2/HTTP2Connection.swift | 8 +- .../ConnectionPool/HTTPConnectionPool.swift | 80 +++++++++++-------- Sources/AsyncHTTPClient/HTTPClient.swift | 2 +- 4 files changed, 61 insertions(+), 37 deletions(-) diff --git a/Sources/AsyncHTTPClient/ConnectionPool/HTTP1.1/HTTP1Connection.swift b/Sources/AsyncHTTPClient/ConnectionPool/HTTP1.1/HTTP1Connection.swift index 8268a598b..4c47eb2cc 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/HTTP1.1/HTTP1Connection.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/HTTP1.1/HTTP1Connection.swift @@ -79,8 +79,14 @@ final class HTTP1Connection { self.channel.triggerUserOutboundEvent(HTTPConnectionEvent.shutdownRequested, promise: nil) } + func close(promise: EventLoopPromise?) { + return self.channel.close(mode: .all, promise: promise) + } + func close() -> EventLoopFuture { - return self.channel.close() + let promise = self.channel.eventLoop.makePromise(of: Void.self) + self.close(promise: promise) + return promise.futureResult } func taskCompleted() { diff --git a/Sources/AsyncHTTPClient/ConnectionPool/HTTP2/HTTP2Connection.swift b/Sources/AsyncHTTPClient/ConnectionPool/HTTP2/HTTP2Connection.swift index 70c64105a..3761c646e 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/HTTP2/HTTP2Connection.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/HTTP2/HTTP2Connection.swift @@ -144,8 +144,14 @@ final class HTTP2Connection { } } + func close(promise: EventLoopPromise?) { + return self.channel.close(mode: .all, promise: promise) + } + func close() -> EventLoopFuture { - self.channel.close() + let promise = self.channel.eventLoop.makePromise(of: Void.self) + self.close(promise: promise) + return promise.futureResult } private func start() -> EventLoopFuture { diff --git a/Sources/AsyncHTTPClient/ConnectionPool/HTTPConnectionPool.swift b/Sources/AsyncHTTPClient/ConnectionPool/HTTPConnectionPool.swift index 0f6339687..aafb6b814 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/HTTPConnectionPool.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/HTTPConnectionPool.swift @@ -21,7 +21,7 @@ protocol HTTPConnectionPoolDelegate { func connectionPoolDidShutdown(_ pool: HTTPConnectionPool, unclean: Bool) } -class HTTPConnectionPool { +final class HTTPConnectionPool { struct Connection: Hashable { typealias ID = Int @@ -92,14 +92,14 @@ class HTTPConnectionPool { /// Closes the connection without cancelling running requests. Use this when you are sure, that the /// connection is currently idle. - fileprivate func close() -> EventLoopFuture { + fileprivate func close(promise: EventLoopPromise?) { switch self._ref { case .http1_1(let connection): - return connection.close() + return connection.close(promise: promise) case .http2(let connection): - return connection.close() - case .__testOnly_connection(_, let eventLoop): - return eventLoop.makeSucceededFuture(()) + return connection.close(promise: promise) + case .__testOnly_connection: + promise?.succeed(()) } } @@ -139,18 +139,20 @@ class HTTPConnectionPool { } } - let timerLock = Lock() + static let fallbackConnectTimeout: TimeAmount = .seconds(30) + + private let timerLock = Lock() private var _requestTimer = [Request.ID: Scheduled]() private var _idleTimer = [Connection.ID: Scheduled]() private var _backoffTimer = [Connection.ID: Scheduled]() - let key: ConnectionPool.Key - var logger: Logger + private let key: ConnectionPool.Key + private var logger: Logger - let eventLoopGroup: EventLoopGroup - let connectionFactory: ConnectionFactory - let clientConfiguration: HTTPClient.Configuration - let idleConnectionTimeout: TimeAmount + private let eventLoopGroup: EventLoopGroup + private let connectionFactory: ConnectionFactory + private let clientConfiguration: HTTPClient.Configuration + private let idleConnectionTimeout: TimeAmount let delegate: HTTPConnectionPoolDelegate @@ -197,12 +199,14 @@ class HTTPConnectionPool { self.run(action: action) } - func run(action: StateMachine.Action) { + // MARK: Run actions + + private func run(action: StateMachine.Action) { self.runConnectionAction(action.connection) self.runRequestAction(action.request) } - func runConnectionAction(_ action: StateMachine.ConnectionAction) { + private func runConnectionAction(_ action: StateMachine.ConnectionAction) { switch action { case .createConnection(let connectionID, let eventLoop): self.createConnection(connectionID, on: eventLoop) @@ -218,7 +222,7 @@ class HTTPConnectionPool { case .closeConnection(let connection, isShutdown: let isShutdown): // we are not interested in the close future... - _ = connection.close() + connection.close(promise: nil) if case .yes(let unclean) = isShutdown { self.delegate.connectionPoolDidShutdown(self, unclean: unclean) @@ -226,11 +230,11 @@ class HTTPConnectionPool { case .cleanupConnections(let cleanupContext, isShutdown: let isShutdown): for connection in cleanupContext.close { - _ = connection.close() + connection.close(promise: nil) } for connection in cleanupContext.cancel { - _ = connection.close() + connection.close(promise: nil) } for connectionID in cleanupContext.connectBackoff { @@ -246,7 +250,11 @@ class HTTPConnectionPool { } } - func runRequestAction(_ action: StateMachine.RequestAction) { + private func runRequestAction(_ action: StateMachine.RequestAction) { + // The order of execution fail/execute request vs cancelling the request timeout timer does + // not matter in the actions here. The actions don't cause any side effects that will be + // reported back to the state machine and are not dependent on each other. + switch action { case .executeRequest(let request, let connection, cancelTimeout: let cancelTimeout): connection.executeRequest(request.req) @@ -255,10 +263,8 @@ class HTTPConnectionPool { } case .executeRequestsAndCancelTimeouts(let requests, let connection): - for request in requests { - connection.executeRequest(request.req) - self.cancelRequestTimeout(request.id) - } + requests.forEach { connection.executeRequest($0.req) } + self.cancelRequestTimeouts(requests) case .failRequest(let request, let error, cancelTimeout: let cancelTimeout): if cancelTimeout { @@ -267,10 +273,8 @@ class HTTPConnectionPool { request.req.fail(error) case .failRequestsAndCancelTimeouts(let requests, let error): - for request in requests { - self.cancelRequestTimeout(request.id) - request.req.fail(error) - } + requests.forEach { $0.req.fail(error) } + self.cancelRequestTimeouts(requests) case .scheduleRequestTimeout(let request, on: let eventLoop): self.scheduleRequestTimeout(request, on: eventLoop) @@ -283,15 +287,15 @@ class HTTPConnectionPool { } } - // MARK: Run actions - private func createConnection(_ connectionID: Connection.ID, on eventLoop: EventLoop) { + // Even though this function is called make it actually creates/establishes a connection. + // TBD: Should we rename it? To what? self.connectionFactory.makeConnection( for: self, connectionID: connectionID, http1ConnectionDelegate: self, http2ConnectionDelegate: self, - deadline: .now() + (self.clientConfiguration.timeout.connect ?? .seconds(30)), + deadline: .now() + (self.clientConfiguration.timeout.connect ?? Self.fallbackConnectTimeout), eventLoop: eventLoop, logger: self.logger ) @@ -303,17 +307,16 @@ class HTTPConnectionPool { // The timer has fired. Now we need to do a couple of things: // // 1. Remove ourselves from the timer dictionary to not leak any data. If our - // waiter entry still exist, we need to tell the state machine, that we want + // waiter entry still exists, we need to tell the state machine, that we want // to fail the request. - - let timeout = self.timerLock.withLock { + let timeoutFired = self.timerLock.withLock { self._requestTimer.removeValue(forKey: requestID) != nil } // 2. If the entry did not exists anymore, we can assume that the request was // scheduled on another connection. The timer still fired anyhow because of a // race. In such a situation we don't need to do anything. - guard timeout else { return } + guard timeoutFired else { return } // 3. Tell the state machine about the timeout let action = self.stateLock.withLock { @@ -339,6 +342,15 @@ class HTTPConnectionPool { scheduled?.cancel() } + private func cancelRequestTimeouts(_ requests: [Request]) { + let scheduled = self.timerLock.withLock { + requests.compactMap { + self._requestTimer.removeValue(forKey: $0.id) + } + } + scheduled.forEach { $0.cancel() } + } + private func scheduleIdleTimerForConnection(_ connectionID: Connection.ID, on eventLoop: EventLoop) { let scheduled = eventLoop.scheduleTask(in: self.idleConnectionTimeout) { // there might be a race between a cancelTimer call and the triggering diff --git a/Sources/AsyncHTTPClient/HTTPClient.swift b/Sources/AsyncHTTPClient/HTTPClient.swift index 478f62a6a..596bfc57b 100644 --- a/Sources/AsyncHTTPClient/HTTPClient.swift +++ b/Sources/AsyncHTTPClient/HTTPClient.swift @@ -829,7 +829,7 @@ public class HTTPClient { extension HTTPClient.Configuration { /// Timeout configuration. public struct Timeout { - /// Specifies connect timeout. + /// Specifies connect timeout. If no connect timeout is given, a default 30 seconds timeout will applied. public var connect: TimeAmount? /// Specifies read timeout. public var read: TimeAmount? From a3a844eca94fb22b7716a74eb18ff279bf8e1ad9 Mon Sep 17 00:00:00 2001 From: Fabian Fett Date: Fri, 10 Sep 2021 11:06:28 +0200 Subject: [PATCH 3/3] Code review --- .../ConnectionPool/HTTPConnectionPool.swift | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/Sources/AsyncHTTPClient/ConnectionPool/HTTPConnectionPool.swift b/Sources/AsyncHTTPClient/ConnectionPool/HTTPConnectionPool.swift index aafb6b814..3a076b688 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/HTTPConnectionPool.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/HTTPConnectionPool.swift @@ -129,7 +129,7 @@ final class HTTPConnectionPool { } } - let stateLock = Lock() + private let stateLock = Lock() private var _state: StateMachine { didSet { self.logger.trace("Connection Pool State changed", metadata: [ @@ -139,7 +139,7 @@ final class HTTPConnectionPool { } } - static let fallbackConnectTimeout: TimeAmount = .seconds(30) + private static let fallbackConnectTimeout: TimeAmount = .seconds(30) private let timerLock = Lock() private var _requestTimer = [Request.ID: Scheduled]() @@ -257,14 +257,14 @@ final class HTTPConnectionPool { switch action { case .executeRequest(let request, let connection, cancelTimeout: let cancelTimeout): - connection.executeRequest(request.req) if cancelTimeout { self.cancelRequestTimeout(request.id) } + connection.executeRequest(request.req) case .executeRequestsAndCancelTimeouts(let requests, let connection): - requests.forEach { connection.executeRequest($0.req) } self.cancelRequestTimeouts(requests) + requests.forEach { connection.executeRequest($0.req) } case .failRequest(let request, let error, cancelTimeout: let cancelTimeout): if cancelTimeout { @@ -273,8 +273,8 @@ final class HTTPConnectionPool { request.req.fail(error) case .failRequestsAndCancelTimeouts(let requests, let error): - requests.forEach { $0.req.fail(error) } self.cancelRequestTimeouts(requests) + requests.forEach { $0.req.fail(error) } case .scheduleRequestTimeout(let request, on: let eventLoop): self.scheduleRequestTimeout(request, on: eventLoop)