diff --git a/Sources/AsyncHTTPClient/ConnectionPool/HTTPConnectionPool.swift b/Sources/AsyncHTTPClient/ConnectionPool/HTTPConnectionPool.swift index ad2dd2476..caed79945 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/HTTPConnectionPool.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/HTTPConnectionPool.swift @@ -81,6 +81,7 @@ final class HTTPConnectionPool { } func shutdown() { + self.logger.debug("Shutting down connection pool") self.modifyStateAndRunActions { $0.shutdown() } } diff --git a/Sources/AsyncHTTPClient/HTTPClient.swift b/Sources/AsyncHTTPClient/HTTPClient.swift index 596bfc57b..3f81e5b74 100644 --- a/Sources/AsyncHTTPClient/HTTPClient.swift +++ b/Sources/AsyncHTTPClient/HTTPClient.swift @@ -68,7 +68,7 @@ public class HTTPClient { public let eventLoopGroup: EventLoopGroup let eventLoopGroupProvider: EventLoopGroupProvider let configuration: Configuration - let pool: ConnectionPool + let poolManager: HTTPConnectionPool.Manager var state: State private let stateLock = Lock() @@ -110,14 +110,18 @@ public class HTTPClient { #endif } self.configuration = configuration - self.pool = ConnectionPool(configuration: configuration, - backgroundActivityLogger: backgroundActivityLogger) + self.poolManager = HTTPConnectionPool.Manager( + eventLoopGroup: self.eventLoopGroup, + configuration: self.configuration, + backgroundActivityLogger: backgroundActivityLogger + ) self.state = .upAndRunning } deinit { - assert(self.pool.count == 0) - assert(self.state == .shutDown, "Client not shut down before the deinit. Please call client.syncShutdown() when no longer needed.") + guard case .shutDown = self.state else { + preconditionFailure("Client not shut down before the deinit. Please call client.syncShutdown() when no longer needed.") + } } /// Shuts down the client and `EventLoopGroup` if it was created by the client. @@ -175,14 +179,16 @@ public class HTTPClient { switch self.eventLoopGroupProvider { case .shared: self.state = .shutDown - callback(nil) + queue.async { + callback(nil) + } case .createNew: switch self.state { case .shuttingDown: self.state = .shutDown self.eventLoopGroup.shutdownGracefully(queue: queue, callback) case .shutDown, .upAndRunning: - assertionFailure("The only valid state at this point is \(State.shutDown)") + assertionFailure("The only valid state at this point is \(String(describing: State.shuttingDown))") } } } @@ -191,33 +197,35 @@ public class HTTPClient { private func shutdown(requiresCleanClose: Bool, queue: DispatchQueue, _ callback: @escaping (Error?) -> Void) { do { try self.stateLock.withLock { - if self.state != .upAndRunning { + guard case .upAndRunning = self.state else { throw HTTPClientError.alreadyShutdown } - self.state = .shuttingDown + self.state = .shuttingDown(requiresCleanClose: requiresCleanClose, callback: callback) } } catch { callback(error) return } - self.pool.close(on: self.eventLoopGroup.next()).whenComplete { result in - var closeError: Error? + let promise = self.eventLoopGroup.next().makePromise(of: Bool.self) + self.poolManager.shutdown(promise: promise) + promise.futureResult.whenComplete { result in switch result { - case .failure(let error): - closeError = error - case .success(let cleanShutdown): - if !cleanShutdown, requiresCleanClose { - closeError = HTTPClientError.uncleanShutdown + case .failure: + preconditionFailure("Shutting down the connection pool must not fail, ever.") + case .success(let unclean): + let (callback, uncleanError) = self.stateLock.withLock { () -> ((Error?) -> Void, Error?) in + guard case .shuttingDown(let requiresClean, callback: let callback) = self.state else { + preconditionFailure("Why did the pool manager shut down, if it was not instructed to") + } + + let error: Error? = (requiresClean && unclean) ? HTTPClientError.uncleanShutdown : nil + return (callback, error) } - self.shutdownEventLoop(queue: queue) { eventLoopError in - // we prioritise .uncleanShutdown here - if let error = closeError { - callback(error) - } else { - callback(eventLoopError) - } + self.shutdownEventLoop(queue: queue) { error in + let reportedError = error ?? uncleanError + callback(reportedError) } } } @@ -492,7 +500,7 @@ public class HTTPClient { let taskEL: EventLoop switch eventLoopPreference.preference { case .indifferent: - taskEL = self.pool.associatedEventLoop(for: ConnectionPool.Key(request)) ?? self.eventLoopGroup.next() + taskEL = self.eventLoopGroup.next() case .delegate(on: let eventLoop): precondition(self.eventLoopGroup.makeIterator().contains { $0 === eventLoop }, "Provided EventLoop must be part of clients EventLoopGroup.") taskEL = eventLoop @@ -540,75 +548,31 @@ public class HTTPClient { } let task = Task(eventLoop: taskEL, logger: logger) - let setupComplete = taskEL.makePromise(of: Void.self) - let connection = self.pool.getConnection(request, - preference: eventLoopPreference, - taskEventLoop: taskEL, - deadline: deadline, - setupComplete: setupComplete.futureResult, - logger: logger) - - let taskHandler = TaskHandler(task: task, - kind: request.kind, - delegate: delegate, - redirectHandler: redirectHandler, - ignoreUncleanSSLShutdown: self.configuration.ignoreUncleanSSLShutdown, - logger: logger) - - connection.flatMap { connection -> EventLoopFuture in - logger.debug("got connection for request", - metadata: ["ahc-connection": "\(connection)", - "ahc-request": "\(request.method) \(request.url)", - "ahc-channel-el": "\(connection.channel.eventLoop)", - "ahc-task-el": "\(taskEL)"]) - - let channel = connection.channel - - func prepareChannelForTask0() -> EventLoopFuture { - do { - let syncPipelineOperations = channel.pipeline.syncOperations - - if let timeout = self.resolve(timeout: self.configuration.timeout.read, deadline: deadline) { - try syncPipelineOperations.addHandler(IdleStateHandler(readTimeout: timeout)) - } - - try syncPipelineOperations.addHandler(taskHandler) - } catch { - connection.release(closing: true, logger: logger) - return channel.eventLoop.makeFailedFuture(error) - } - - task.setConnection(connection) + do { + let requestBag = try RequestBag( + request: request, + eventLoopPreference: eventLoopPreference, + task: task, + redirectHandler: redirectHandler, + connectionDeadline: .now() + (self.configuration.timeout.connect ?? .seconds(10)), + requestOptions: .fromClientConfiguration(self.configuration), + delegate: delegate + ) - let isCancelled = task.lock.withLock { - task.cancelled + var deadlineSchedule: Scheduled? + if let deadline = deadline { + deadlineSchedule = taskEL.scheduleTask(deadline: deadline) { + requestBag.fail(HTTPClientError.deadlineExceeded) } - if !isCancelled { - return channel.writeAndFlush(request).flatMapError { _ in - // At this point the `TaskHandler` will already be present - // to handle the failure and pass it to the `promise` - channel.eventLoop.makeSucceededVoidFuture() - } - } else { - return channel.eventLoop.makeSucceededVoidFuture() + task.promise.futureResult.whenComplete { _ in + deadlineSchedule?.cancel() } } - if channel.eventLoop.inEventLoop { - return prepareChannelForTask0() - } else { - return channel.eventLoop.flatSubmit { - return prepareChannelForTask0() - } - } - }.always { _ in - setupComplete.succeed(()) - }.whenFailure { error in - taskHandler.callOutToDelegateFireAndForget { task in - delegate.didReceiveError(task: task, error) - } - task.promise.fail(error) + self.poolManager.executeRequest(requestBag) + } catch { + task.fail(with: error, delegateType: Delegate.self) } return task @@ -821,7 +785,7 @@ public class HTTPClient { enum State { case upAndRunning - case shuttingDown + case shuttingDown(requiresCleanClose: Bool, callback: (Error?) -> Void) case shutDown } } @@ -926,6 +890,7 @@ public struct HTTPClientError: Error, Equatable, CustomStringConvertible { case serverOfferedUnsupportedApplicationProtocol(String) case requestStreamCancelled case getConnectionFromPoolTimeout + case deadlineExceeded } private var code: Code @@ -995,6 +960,9 @@ public struct HTTPClientError: Error, Equatable, CustomStringConvertible { return HTTPClientError(code: .serverOfferedUnsupportedApplicationProtocol(proto)) } + /// The request deadline was exceeded. The request was cancelled because of this. + public static let deadlineExceeded = HTTPClientError(code: .deadlineExceeded) + /// The remote server responded with a status code >= 300, before the full request was sent. The request stream /// was therefore cancelled public static let requestStreamCancelled = HTTPClientError(code: .requestStreamCancelled) diff --git a/Sources/AsyncHTTPClient/HTTPHandler.swift b/Sources/AsyncHTTPClient/HTTPHandler.swift index 4c6c01a16..4877d10e9 100644 --- a/Sources/AsyncHTTPClient/HTTPHandler.swift +++ b/Sources/AsyncHTTPClient/HTTPHandler.swift @@ -658,18 +658,28 @@ extension HTTPClient { public let eventLoop: EventLoop let promise: EventLoopPromise - var completion: EventLoopFuture - var connection: Connection? - var cancelled: Bool - let lock: Lock let logger: Logger // We are okay to store the logger here because a Task is for only one request. + var isCancelled: Bool { + self.lock.withLock { self._isCancelled } + } + + var taskDelegate: HTTPClientTaskDelegate? { + get { + self.lock.withLock { self._taskDelegate } + } + set { + self.lock.withLock { self._taskDelegate = newValue } + } + } + + private var _isCancelled: Bool = false + private var _taskDelegate: HTTPClientTaskDelegate? + private let lock = Lock() + init(eventLoop: EventLoop, logger: Logger) { self.eventLoop = eventLoop self.promise = eventLoop.makePromise() - self.completion = self.promise.futureResult.map { _ in } - self.cancelled = false - self.lock = Lock() self.logger = logger } @@ -694,69 +704,24 @@ extension HTTPClient { /// Cancels the request execution. public func cancel() { - let channel: Channel? = self.lock.withLock { - if !self.cancelled { - self.cancelled = true - return self.connection?.channel - } else { - return nil - } + let taskDelegate = self.lock.withLock { () -> HTTPClientTaskDelegate? in + self._isCancelled = true + return self._taskDelegate } - channel?.triggerUserOutboundEvent(TaskCancelEvent(), promise: nil) - } - @discardableResult - func setConnection(_ connection: Connection) -> Connection { - return self.lock.withLock { - self.connection = connection - if self.cancelled { - connection.channel.triggerUserOutboundEvent(TaskCancelEvent(), promise: nil) - } - return connection - } + taskDelegate?.cancel() } func succeed(promise: EventLoopPromise?, with value: Response, delegateType: Delegate.Type, closing: Bool) { - self.releaseAssociatedConnection(delegateType: delegateType, - closing: closing).whenSuccess { - promise?.succeed(value) - } + promise?.succeed(value) } func fail(with error: Error, delegateType: Delegate.Type) { - if let connection = self.connection { - self.releaseAssociatedConnection(delegateType: delegateType, closing: true) - .whenSuccess { - self.promise.fail(error) - connection.channel.close(promise: nil) - } - } else { - // this is used in tests where we don't want to bootstrap the whole connection pool - self.promise.fail(error) - } - } - - func releaseAssociatedConnection(delegateType: Delegate.Type, - closing: Bool) -> EventLoopFuture { - if let connection = self.connection { - // remove read timeout handler - return connection.removeHandler(IdleStateHandler.self).flatMap { - connection.removeHandler(TaskHandler.self) - }.map { - connection.release(closing: closing, logger: self.logger) - }.flatMapError { error in - fatalError("Couldn't remove taskHandler: \(error)") - } - } else { - // TODO: This seems only reached in some internal unit test - // Maybe there could be a better handling in the future to make - // it an error outside of testing contexts - return self.eventLoop.makeSucceededFuture(()) - } + self.promise.fail(error) } } } @@ -1076,9 +1041,7 @@ extension TaskHandler: ChannelDuplexHandler { break case .redirected(let head, let redirectURL): self.state = .endOrError - self.task.releaseAssociatedConnection(delegateType: Delegate.self, closing: self.closing).whenSuccess { - self.redirectHandler?.redirect(status: head.status, to: redirectURL, promise: self.task.promise) - } + self.redirectHandler?.redirect(status: head.status, to: redirectURL, promise: self.task.promise) default: self.state = .bufferedEnd self.handleReadForDelegate(response, context: context) diff --git a/Sources/AsyncHTTPClient/RequestBag.swift b/Sources/AsyncHTTPClient/RequestBag.swift index 939f46ee2..ed2571307 100644 --- a/Sources/AsyncHTTPClient/RequestBag.swift +++ b/Sources/AsyncHTTPClient/RequestBag.swift @@ -65,11 +65,10 @@ final class RequestBag { self.requestHead = head self.requestFramingMetadata = metadata - // TODO: comment in once we switch to using the Request bag in AHC -// self.task.taskDelegate = self -// self.task.futureResult.whenComplete { _ in -// self.task.taskDelegate = nil -// } + self.task.taskDelegate = self + self.task.futureResult.whenComplete { _ in + self.task.taskDelegate = nil + } } private func requestWasQueued0(_ scheduler: HTTPRequestScheduler) { @@ -113,7 +112,7 @@ final class RequestBag { self.writeNextRequestPart($0) } - body.stream(writer).whenComplete { + body.stream(writer).hop(to: self.eventLoop).whenComplete { self.finishRequestBodyStream($0) } @@ -142,7 +141,7 @@ final class RequestBag { } private func writeNextRequestPart0(_ part: IOData) -> EventLoopFuture { - self.task.eventLoop.assertInEventLoop() + self.eventLoop.assertInEventLoop() let action = self.state.writeNextRequestPart(part, taskEventLoop: self.task.eventLoop) diff --git a/Sources/AsyncHTTPClient/SSLContextCache.swift b/Sources/AsyncHTTPClient/SSLContextCache.swift index b4b9112ec..31ed106a0 100644 --- a/Sources/AsyncHTTPClient/SSLContextCache.swift +++ b/Sources/AsyncHTTPClient/SSLContextCache.swift @@ -34,12 +34,12 @@ extension SSLContextCache { } if let sslContext = sslContext { - logger.debug("found SSL context in cache", + logger.trace("found SSL context in cache", metadata: ["ahc-tls-config": "\(tlsConfiguration)"]) return eventLoop.makeSucceededFuture(sslContext) } - logger.debug("creating new SSL context", + logger.trace("creating new SSL context", metadata: ["ahc-tls-config": "\(tlsConfiguration)"]) let newSSLContext = self.offloadQueue.asyncWithFuture(eventLoop: eventLoop) { try NIOSSLContext(configuration: tlsConfiguration) diff --git a/Tests/AsyncHTTPClientTests/HTTPClientInternalTests+XCTest.swift b/Tests/AsyncHTTPClientTests/HTTPClientInternalTests+XCTest.swift index 64f691af9..dc9c1f701 100644 --- a/Tests/AsyncHTTPClientTests/HTTPClientInternalTests+XCTest.swift +++ b/Tests/AsyncHTTPClientTests/HTTPClientInternalTests+XCTest.swift @@ -34,17 +34,11 @@ extension HTTPClientInternalTests { ("testRequestFinishesAfterRedirectIfServerRespondsBeforeClientFinishes", testRequestFinishesAfterRedirectIfServerRespondsBeforeClientFinishes), ("testProxyStreaming", testProxyStreaming), ("testProxyStreamingFailure", testProxyStreamingFailure), - ("testUploadStreamingBackpressure", testUploadStreamingBackpressure), ("testRequestURITrailingSlash", testRequestURITrailingSlash), ("testChannelAndDelegateOnDifferentEventLoops", testChannelAndDelegateOnDifferentEventLoops), - ("testResponseConnectionCloseGet", testResponseConnectionCloseGet), - ("testWeNoticeRemoteClosuresEvenWhenConnectionIsIdleInPool", testWeNoticeRemoteClosuresEvenWhenConnectionIsIdleInPool), - ("testWeTolerateConnectionsGoingAwayWhilstPoolIsShuttingDown", testWeTolerateConnectionsGoingAwayWhilstPoolIsShuttingDown), - ("testRaceBetweenAsynchronousCloseAndChannelUsabilityDetection", testRaceBetweenAsynchronousCloseAndChannelUsabilityDetection), ("testResponseFutureIsOnCorrectEL", testResponseFutureIsOnCorrectEL), ("testUncleanCloseThrows", testUncleanCloseThrows), ("testUploadStreamingIsCalledOnTaskEL", testUploadStreamingIsCalledOnTaskEL), - ("testWeCanActuallyExactlySetTheEventLoops", testWeCanActuallyExactlySetTheEventLoops), ("testTaskPromiseBoundToEL", testTaskPromiseBoundToEL), ("testConnectErrorCalloutOnCorrectEL", testConnectErrorCalloutOnCorrectEL), ("testInternalRequestURI", testInternalRequestURI), diff --git a/Tests/AsyncHTTPClientTests/HTTPClientInternalTests.swift b/Tests/AsyncHTTPClientTests/HTTPClientInternalTests.swift index 5f9750b35..2c7230e58 100644 --- a/Tests/AsyncHTTPClientTests/HTTPClientInternalTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTPClientInternalTests.swift @@ -290,7 +290,7 @@ class HTTPClientInternalTests: XCTestCase { let httpBin = HTTPBin() let httpClient = HTTPClient(eventLoopGroupProvider: .shared(self.clientGroup)) defer { - XCTAssertNoThrow(try httpClient.syncShutdown(requiresCleanClose: true)) + XCTAssertNoThrow(try httpClient.syncShutdown()) XCTAssertNoThrow(try httpBin.shutdown()) } @@ -317,130 +317,6 @@ class HTTPClientInternalTests: XCTestCase { XCTAssertThrowsError(try httpClient.post(url: "http://localhost:\(httpBin.port)/post", body: body).wait()) } - // In order to test backpressure we need to make sure that reads will not happen - // until the backpressure promise is succeeded. Since we cannot guarantee when - // messages will be delivered to a client pipeline and we need this test to be - // fast (no waiting for arbitrary amounts of time), we do the following. - // First, we enforce NIO to send us only 1 byte at a time. Then we send a message - // of 4 bytes. This will guarantee that if we see first byte of the message, other - // bytes a ready to be read as well. This will allow us to test if subsequent reads - // are waiting for backpressure promise. - func testUploadStreamingBackpressure() throws { - class BackpressureTestDelegate: HTTPClientResponseDelegate { - typealias Response = Void - - var _reads = 0 - let lock: Lock - let backpressurePromise: EventLoopPromise - let optionsApplied: EventLoopPromise - let messageReceived: EventLoopPromise - - init(eventLoop: EventLoop) { - self.lock = Lock() - self.backpressurePromise = eventLoop.makePromise() - self.optionsApplied = eventLoop.makePromise() - self.messageReceived = eventLoop.makePromise() - } - - var reads: Int { - return self.lock.withLock { - self._reads - } - } - - func didReceiveHead(task: HTTPClient.Task, _ head: HTTPResponseHead) -> EventLoopFuture { - // This is to force NIO to send only 1 byte at a time. - let future = task.connection!.channel.setOption(ChannelOptions.maxMessagesPerRead, value: 1).flatMap { - task.connection!.channel.setOption(ChannelOptions.recvAllocator, value: FixedSizeRecvByteBufferAllocator(capacity: 1)) - } - future.cascade(to: self.optionsApplied) - return future - } - - func didReceiveBodyPart(task: HTTPClient.Task, _ buffer: ByteBuffer) -> EventLoopFuture { - // We count a number of reads received. - self.lock.withLockVoid { - self._reads += 1 - } - // We need to notify the test when first byte of the message is arrived. - self.messageReceived.succeed(()) - return self.backpressurePromise.futureResult - } - - func didFinishRequest(task: HTTPClient.Task) throws {} - } - - final class WriteAfterFutureSucceedsHandler: ChannelInboundHandler { - typealias InboundIn = HTTPServerRequestPart - typealias OutboundOut = HTTPServerResponsePart - - let bodyFuture: EventLoopFuture - let endFuture: EventLoopFuture - - init(bodyFuture: EventLoopFuture, endFuture: EventLoopFuture) { - self.bodyFuture = bodyFuture - self.endFuture = endFuture - } - - func channelRead(context: ChannelHandlerContext, data: NIOAny) { - switch self.unwrapInboundIn(data) { - case .head: - let head = HTTPResponseHead(version: HTTPVersion(major: 1, minor: 1), status: .ok) - context.writeAndFlush(wrapOutboundOut(.head(head)), promise: nil) - case .body: - // ignore - break - case .end: - self.bodyFuture.hop(to: context.eventLoop).whenSuccess { - let buffer = context.channel.allocator.buffer(string: "1234") - context.writeAndFlush(self.wrapOutboundOut(.body(.byteBuffer(buffer))), promise: nil) - } - - self.endFuture.hop(to: context.eventLoop).whenSuccess { - context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil) - } - } - } - } - - // cannot test with NIOTS as `maxMessagesPerRead` is not supported - let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1) - let httpClient = HTTPClient(eventLoopGroupProvider: .shared(eventLoopGroup)) - let delegate = BackpressureTestDelegate(eventLoop: httpClient.eventLoopGroup.next()) - let httpBin = HTTPBin { _ in - WriteAfterFutureSucceedsHandler( - bodyFuture: delegate.optionsApplied.futureResult, - endFuture: delegate.backpressurePromise.futureResult - ) - } - - defer { - XCTAssertNoThrow(try httpClient.syncShutdown(requiresCleanClose: true)) - XCTAssertNoThrow(try httpBin.shutdown()) - XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) - } - - let request = try Request(url: "http://localhost:\(httpBin.port)/custom") - - let requestFuture = httpClient.execute(request: request, delegate: delegate).futureResult - - // We need to wait for channel options that limit NIO to sending only one byte at a time. - try delegate.optionsApplied.futureResult.wait() - - // Send 4 bytes, but only one should be received until the backpressure promise is succeeded. - - // Now we wait until message is delivered to client channel pipeline - try delegate.messageReceived.futureResult.wait() - XCTAssertEqual(delegate.reads, 1) - - // Succeed the backpressure promise. - delegate.backpressurePromise.succeed(()) - try requestFuture.wait() - - // At this point all other bytes should be delivered. - XCTAssertEqual(delegate.reads, 4) - } - func testRequestURITrailingSlash() throws { let request1 = try Request(url: "https://someserver.com:8888/some/path?foo=bar#ref") XCTAssertEqual(request1.url.uri, "/some/path?foo=bar") @@ -635,316 +511,6 @@ class HTTPClientInternalTests: XCTestCase { } } - func testResponseConnectionCloseGet() throws { - let httpBin = HTTPBin(.http1_1()) - let httpClient = HTTPClient(eventLoopGroupProvider: .shared(self.clientGroup), - configuration: HTTPClient.Configuration(certificateVerification: .none)) - defer { - XCTAssertNoThrow(try httpClient.syncShutdown(requiresCleanClose: true)) - XCTAssertNoThrow(try httpBin.shutdown()) - } - - let req = try HTTPClient.Request(url: "http://localhost:\(httpBin.port)/get", - method: .GET, - headers: ["X-Send-Back-Header-Connection": "close"], body: nil) - _ = try! httpClient.execute(request: req).wait() - let el = httpClient.eventLoopGroup.next() - try! el.scheduleTask(in: .milliseconds(500)) { - XCTAssertEqual(httpClient.pool.count, 0) - }.futureResult.wait() - } - - func testWeNoticeRemoteClosuresEvenWhenConnectionIsIdleInPool() throws { - final class ServerThatRespondsThenJustCloses: ChannelInboundHandler { - typealias InboundIn = HTTPServerRequestPart - typealias OutboundOut = HTTPServerResponsePart - - let requestNumber: NIOAtomic - let connectionNumber: NIOAtomic - - init(requestNumber: NIOAtomic, connectionNumber: NIOAtomic) { - self.requestNumber = requestNumber - self.connectionNumber = connectionNumber - } - - func channelActive(context: ChannelHandlerContext) { - _ = self.connectionNumber.add(1) - } - - func channelRead(context: ChannelHandlerContext, data: NIOAny) { - let req = self.unwrapInboundIn(data) - - switch req { - case .head, .body: - () - case .end: - let last = self.requestNumber.add(1) - switch last { - case 0: - context.write(self.wrapOutboundOut(.head(.init(version: .init(major: 1, minor: 1), status: .ok))), - promise: nil) - context.writeAndFlush(self.wrapOutboundOut(.end(nil))).whenComplete { _ in - context.eventLoop.scheduleTask(in: .milliseconds(10)) { - context.close(promise: nil) - } - } - case 1: - context.write(self.wrapOutboundOut(.head(.init(version: .init(major: 1, minor: 1), status: .ok))), - promise: nil) - context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil) - default: - XCTFail("did not expect request \(last + 1)") - } - } - } - } - - final class ObserveWhenClosedHandler: ChannelInboundHandler { - typealias InboundIn = Any - - let channelInactivePromise: EventLoopPromise - - init(channelInactivePromise: EventLoopPromise) { - self.channelInactivePromise = channelInactivePromise - } - - func channelInactive(context: ChannelHandlerContext) { - context.fireChannelInactive() - self.channelInactivePromise.succeed(()) - } - } - - let group = MultiThreadedEventLoopGroup(numberOfThreads: 1) - defer { - XCTAssertNoThrow(try group.syncShutdownGracefully()) - } - let requestNumber = NIOAtomic.makeAtomic(value: 0) - let connectionNumber = NIOAtomic.makeAtomic(value: 0) - let sharedStateServerHandler = ServerThatRespondsThenJustCloses(requestNumber: requestNumber, - connectionNumber: connectionNumber) - var maybeServer: Channel? - XCTAssertNoThrow(maybeServer = try ServerBootstrap(group: group) - .serverChannelOption(ChannelOptions.socket(.init(SOL_SOCKET), .init(SO_REUSEADDR)), value: 1) - .childChannelInitializer { channel in - channel.pipeline.configureHTTPServerPipeline().flatMap { - // We're deliberately adding a handler which is shared between multiple channels. This is normally - // very verboten but this handler is specially crafted to tolerate this. - channel.pipeline.addHandler(sharedStateServerHandler) - } - } - .bind(host: "127.0.0.1", port: 0) - .wait()) - guard let server = maybeServer else { - XCTFail("couldn't create server") - return - } - defer { - XCTAssertNoThrow(try server.close().wait()) - } - - let url = "http://127.0.0.1:\(server.localAddress!.port!)" - let client = HTTPClient(eventLoopGroupProvider: .shared(self.clientGroup)) - defer { - XCTAssertNoThrow(try client.syncShutdown()) - } - - var maybeConnection: Connection? - // This is pretty evil but we literally just get hold of a connection to get to the channel to be able to - // observe when the server closing the connection is known to the client. - let el = group.next() - XCTAssertNoThrow(maybeConnection = try client.pool.getConnection(Request(url: url), - preference: .indifferent, - taskEventLoop: el, - deadline: nil, - setupComplete: el.makeSucceededFuture(()), - logger: HTTPClient.loggingDisabled).wait()) - guard let connection = maybeConnection else { - XCTFail("couldn't get connection") - return - } - - // And let's also give the connection back :). - try connection.channel.eventLoop.submit { - connection.release(closing: false, logger: HTTPClient.loggingDisabled) - }.wait() - - XCTAssertEqual(0, sharedStateServerHandler.requestNumber.load()) - XCTAssertEqual(1, client.pool.count) - XCTAssertTrue(connection.channel.isActive) - XCTAssertNoThrow(XCTAssertEqual(.ok, try client.get(url: url).wait().status)) - XCTAssertEqual(1, sharedStateServerHandler.connectionNumber.load()) - XCTAssertEqual(1, sharedStateServerHandler.requestNumber.load()) - - // We have received the first response and we know the remote end will now close the connection. - // Let's wait until we see the closure in the client's channel. - XCTAssertNoThrow(try connection.channel.closeFuture.wait()) - - // Now that we should have learned that the connection is dead, a subsequent request should work and use a new - // connection - XCTAssertNoThrow(XCTAssertEqual(.ok, try client.get(url: url).wait().status)) - XCTAssertEqual(2, sharedStateServerHandler.connectionNumber.load()) - XCTAssertEqual(2, sharedStateServerHandler.requestNumber.load()) - } - - func testWeTolerateConnectionsGoingAwayWhilstPoolIsShuttingDown() { - struct NoChannelError: Error {} - - let client = HTTPClient(eventLoopGroupProvider: .shared(self.clientGroup)) - var maybeServersAndChannels: [(HTTPBin, Channel)]? - XCTAssertNoThrow(maybeServersAndChannels = try (0..<10).map { _ in - let web = HTTPBin() - defer { - XCTAssertNoThrow(try web.shutdown()) - } - - let req = try! HTTPClient.Request(url: "http://localhost:\(web.port)/get", - method: .GET, - body: nil) - var maybeConnection: Connection? - let el = client.eventLoopGroup.next() - XCTAssertNoThrow(try maybeConnection = client.pool.getConnection(req, - preference: .indifferent, - taskEventLoop: el, - deadline: nil, - setupComplete: el.makeSucceededFuture(()), - logger: HTTPClient.loggingDisabled).wait()) - guard let connection = maybeConnection else { - XCTFail("couldn't make connection") - throw NoChannelError() - } - - let channel = connection.channel - try! channel.eventLoop.submit { - connection.release(closing: true, logger: HTTPClient.loggingDisabled) - }.wait() - return (web, channel) - }) - - guard let serversAndChannels = maybeServersAndChannels else { - XCTFail("couldn't open servers") - return - } - - DispatchQueue.global().async { - serversAndChannels.forEach { serverAndChannel in - serverAndChannel.1.close(promise: nil) - } - } - XCTAssertNoThrow(try client.syncShutdown()) - } - - func testRaceBetweenAsynchronousCloseAndChannelUsabilityDetection() { - final class DelayChannelCloseUntilToldHandler: ChannelOutboundHandler { - typealias OutboundIn = Any - - enum State { - case idling - case delayedClose - case closeDone - } - - var state: State = .idling - let doTheCloseNowFuture: EventLoopFuture - let sawTheClosePromise: EventLoopPromise - - init(doTheCloseNowFuture: EventLoopFuture, - sawTheClosePromise: EventLoopPromise) { - self.doTheCloseNowFuture = doTheCloseNowFuture - self.sawTheClosePromise = sawTheClosePromise - } - - func handlerRemoved(context: ChannelHandlerContext) { - XCTAssertEqual(.closeDone, self.state) - } - - func close(context: ChannelHandlerContext, mode: CloseMode, promise: EventLoopPromise?) { - XCTAssertEqual(.idling, self.state) - self.state = .delayedClose - self.sawTheClosePromise.succeed(()) - // let's hold the close until the future's complete - self.doTheCloseNowFuture.whenSuccess { - context.close(mode: mode).map { - XCTAssertEqual(.delayedClose, self.state) - self.state = .closeDone - }.cascade(to: promise) - } - } - } - - let web = HTTPBin() - defer { - XCTAssertNoThrow(try web.shutdown()) - } - - let client = HTTPClient(eventLoopGroupProvider: .shared(self.clientGroup)) - defer { - XCTAssertNoThrow(try client.syncShutdown()) - } - - let req = try! HTTPClient.Request(url: "http://localhost:\(web.port)/get", - method: .GET, - body: nil) - - // Let's start by getting a connection so we can mess with the Channel :). - var maybeConnection: Connection? - let el = client.eventLoopGroup.next() - XCTAssertNoThrow(try maybeConnection = client.pool.getConnection(req, - preference: .indifferent, - taskEventLoop: el, - deadline: nil, - setupComplete: el.makeSucceededFuture(()), - logger: HTTPClient.loggingDisabled).wait()) - guard let connection = maybeConnection else { - XCTFail("couldn't make connection") - return - } - - let channel = connection.channel - let doActualCloseNowPromise = channel.eventLoop.makePromise(of: Void.self) - let sawTheClosePromise = channel.eventLoop.makePromise(of: Void.self) - - XCTAssertNoThrow(try channel.pipeline.addHandler(DelayChannelCloseUntilToldHandler(doTheCloseNowFuture: doActualCloseNowPromise.futureResult, - sawTheClosePromise: sawTheClosePromise), - position: .first).wait()) - try! connection.channel.eventLoop.submit { - connection.release(closing: false, logger: HTTPClient.loggingDisabled) - }.wait() - - XCTAssertNoThrow(try client.execute(request: req).wait()) - - // Now, let's pretend the timeout happened - channel.pipeline.fireUserInboundEventTriggered(IdleStateHandler.IdleStateEvent.write) - - // The Channel's closure should have already been initialised now but still, let's make sure the close - // was initiated - XCTAssertNoThrow(try sawTheClosePromise.futureResult.wait()) - // The Channel should still be active though because we delayed the close through our handler above. - XCTAssertTrue(channel.isActive) - - // When asking for a connection again, we should _not_ get the same one back because we did most of the close, - // similar to what the SSLHandler would do. - let el2 = client.eventLoopGroup.next() - let connection2Future = client.pool.getConnection(req, - preference: .indifferent, - taskEventLoop: el2, - deadline: nil, - setupComplete: el2.makeSucceededFuture(()), - logger: HTTPClient.loggingDisabled) - doActualCloseNowPromise.succeed(()) - - XCTAssertNoThrow(try maybeConnection = connection2Future.wait()) - guard let connection2 = maybeConnection else { - XCTFail("couldn't get second connection") - return - } - - XCTAssert(connection !== connection2) - try! connection2.channel.eventLoop.submit { - connection2.release(closing: false, logger: HTTPClient.loggingDisabled) - }.wait() - XCTAssertTrue(connection2.channel.isActive) - } - func testResponseFutureIsOnCorrectEL() throws { let group = getDefaultEventLoopGroup(numberOfThreads: 4) defer { @@ -1031,48 +597,6 @@ class HTTPClientInternalTests: XCTestCase { XCTAssertNoThrow(try response.wait()) } - func testWeCanActuallyExactlySetTheEventLoops() throws { - let group = getDefaultEventLoopGroup(numberOfThreads: 3) - defer { - XCTAssertNoThrow(try group.syncShutdownGracefully()) - } - - let httpBin = HTTPBin() - let httpClient = HTTPClient(eventLoopGroupProvider: .shared(group)) - defer { - XCTAssertNoThrow(try httpClient.syncShutdown()) - XCTAssertNoThrow(try httpBin.shutdown()) - } - - let el1 = group.next() - let el2 = group.next() - XCTAssert(el1 !== el2) - - let taskPromise = group.next().makePromise(of: HTTPClient.Task.self) - let body: HTTPClient.Body = .stream(length: 8) { writer in - XCTAssert(el1.inEventLoop) - let buffer = ByteBuffer(string: "1234") - return writer.write(.byteBuffer(buffer)).flatMap { - XCTAssert(el1.inEventLoop) - let buffer = ByteBuffer(string: "4321") - return taskPromise.futureResult.map { (task: HTTPClient.Task) -> Void in - XCTAssertNotNil(task.connection) - XCTAssert(task.connection?.channel.eventLoop === el2) - }.flatMap { - writer.write(.byteBuffer(buffer)) - } - } - } - let request = try HTTPClient.Request(url: "http://localhost:\(httpBin.port)/post", method: .POST, body: body) - let response = httpClient.execute(request: request, - delegate: ResponseAccumulator(request: request), - eventLoop: HTTPClient.EventLoopPreference(.testOnly_exact(channelOn: el2, - delegateOn: el1))) - taskPromise.succeed(response) - XCTAssert(el1 === response.eventLoop) - XCTAssertNoThrow(try response.wait()) - } - func testTaskPromiseBoundToEL() throws { let elg = getDefaultEventLoopGroup(numberOfThreads: 2) let el1 = elg.next() diff --git a/Tests/AsyncHTTPClientTests/HTTPClientTests.swift b/Tests/AsyncHTTPClientTests/HTTPClientTests.swift index a7b343bc2..5cdc2ada1 100644 --- a/Tests/AsyncHTTPClientTests/HTTPClientTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTPClientTests.swift @@ -591,7 +591,7 @@ class HTTPClientTests: XCTestCase { func testDeadline() { XCTAssertThrowsError(try self.defaultClient.get(url: self.defaultHTTPBinURLPrefix + "wait", deadline: .now() + .milliseconds(150)).wait()) { - XCTAssertEqual($0 as? HTTPClientError, .readTimeout) + XCTAssertEqual($0 as? HTTPClientError, .deadlineExceeded) } } @@ -1098,14 +1098,16 @@ class HTTPClientTests: XCTestCase { XCTAssertNoThrow(try web.stop()) } let result = self.defaultClient.get(url: "http://localhost:\(web.serverPort)/foo") + XCTAssertNoThrow(try web.receiveHeadAndVerify { + XCTAssertEqual($0, HTTPRequestHead( + version: .init(major: 1, minor: 1), + method: .GET, + uri: "/foo", + headers: HTTPHeaders([("Host", "localhost:\(web.serverPort)")]) + )) + }) + XCTAssertNoThrow(try web.receiveEnd()) - XCTAssertNoThrow(XCTAssertEqual(.head(.init(version: .init(major: 1, minor: 1), - method: .GET, - uri: "/foo", - headers: HTTPHeaders([("Host", "localhost:\(web.serverPort)")]))), - try web.readInbound())) - XCTAssertNoThrow(XCTAssertEqual(.end(nil), - try web.readInbound())) XCTAssertNoThrow(try web.writeOutbound(.head(.init(version: .init(major: 1, minor: 0), status: .internalServerError)))) XCTAssertNoThrow(try web.writeOutbound(.end(nil))) @@ -2199,7 +2201,17 @@ class HTTPClientTests: XCTestCase { } }) XCTAssert(logsAfterReq1.contains { entry in - entry.message == "opening fresh connection (no connections to reuse available)" + // Since a new connection must be created first we expect that the request is queued + // and log message describing this is emitted. + entry.message == "Request was queued (waiting for a connection to become available)" + && entry.level == .debug + }) + XCTAssert(logsAfterReq1.contains { entry in + // After the new connection was created we expect a log message that describes that the + // request was scheduled on a connection. The connection id must be set from here on. + entry.message == "Request was scheduled on connection" + && entry.level == .debug + && entry.metadata["ahc-connection-id"] != nil }) XCTAssert(logsAfterReq2.allSatisfy { entry in @@ -2214,8 +2226,13 @@ class HTTPClientTests: XCTestCase { return false } }) + XCTAssertFalse(logsAfterReq2.contains { entry in + entry.message == "Request was queued (waiting for a connection to become available)" + }) XCTAssert(logsAfterReq2.contains { entry in - entry.message.starts(with: "leasing existing connection") + entry.message == "Request was scheduled on connection" + && entry.level == .debug + && entry.metadata["ahc-connection-id"] != nil }) XCTAssert(logsAfterReq3.allSatisfy { entry in @@ -2230,8 +2247,13 @@ class HTTPClientTests: XCTestCase { return false } }) + XCTAssertFalse(logsAfterReq3.contains { entry in + entry.message == "Request was queued (waiting for a connection to become available)" + }) XCTAssert(logsAfterReq3.contains { entry in - entry.message.starts(with: "leasing existing connection") + entry.message == "Request was scheduled on connection" + && entry.level == .debug + && entry.metadata["ahc-connection-id"] != nil }) } @@ -2271,7 +2293,7 @@ class HTTPClientTests: XCTestCase { logger: logger).wait()) XCTAssertEqual(0, logStore.allEntries.count) - XCTAssertEqual(0, self.backgroundLogStore.allEntries.count) + XCTAssertEqual(0, self.backgroundLogStore.allEntries.filter { $0.level >= .info }.count) // === Synthesized Socket Path Request XCTAssertNoThrow(try TemporaryFileHelpers.withTemporaryUnixDomainSocketPathName { path in @@ -2297,7 +2319,7 @@ class HTTPClientTests: XCTestCase { logger: logger).wait()) XCTAssertEqual(0, logStore.allEntries.count) - XCTAssertEqual(0, backgroundLogStore.allEntries.count) + XCTAssertEqual(0, backgroundLogStore.allEntries.filter { $0.level >= .info }.count) }) // === Synthesized Secure Socket Path Request @@ -2325,7 +2347,7 @@ class HTTPClientTests: XCTestCase { logger: logger).wait()) XCTAssertEqual(0, logStore.allEntries.count) - XCTAssertEqual(0, backgroundLogStore.allEntries.count) + XCTAssertEqual(0, backgroundLogStore.allEntries.filter { $0.level >= .info }.count) }) } @@ -2375,7 +2397,7 @@ class HTTPClientTests: XCTestCase { }.status)) // No background activity expected here. - XCTAssertEqual(0, self.backgroundLogStore.allEntries.count) + XCTAssertEqual(0, self.backgroundLogStore.allEntries.filter { $0.level >= .debug }.count) XCTAssertNoThrow(try TemporaryFileHelpers.withTemporaryUnixDomainSocketPathName { path in let backgroundLogStore = CollectEverythingLogHandler.LogStore() @@ -2397,7 +2419,7 @@ class HTTPClientTests: XCTestCase { }.status)) // No background activity expected here. - XCTAssertEqual(0, backgroundLogStore.allEntries.count) + XCTAssertEqual(0, backgroundLogStore.allEntries.filter { $0.level >= .debug }.count) }) XCTAssertNoThrow(try TemporaryFileHelpers.withTemporaryUnixDomainSocketPathName { path in @@ -2421,7 +2443,7 @@ class HTTPClientTests: XCTestCase { }.status)) // No background activity expected here. - XCTAssertEqual(0, backgroundLogStore.allEntries.count) + XCTAssertEqual(0, backgroundLogStore.allEntries.filter { $0.level >= .debug }.count) }) } @@ -2432,12 +2454,12 @@ class HTTPClientTests: XCTestCase { XCTAssertGreaterThanOrEqual(self.backgroundLogStore.allEntries.count, 0) XCTAssert(self.backgroundLogStore.allEntries.contains { entry in - entry.message == "closing provider" + entry.message == "Shutting down connection pool" }) XCTAssert(self.backgroundLogStore.allEntries.allSatisfy { entry in entry.metadata["ahc-request-id"] == nil && entry.metadata["ahc-request"] == nil && - entry.metadata["ahc-provider"] != nil + entry.metadata["ahc-pool-key"] != nil }) self.defaultClient = nil // so it doesn't get shut down again.