diff --git a/Tests/AsyncHTTPClientTests/HTTPClientTests+XCTest.swift b/Tests/AsyncHTTPClientTests/HTTPClientTests+XCTest.swift index a2189df40..238d7cc7c 100644 --- a/Tests/AsyncHTTPClientTests/HTTPClientTests+XCTest.swift +++ b/Tests/AsyncHTTPClientTests/HTTPClientTests+XCTest.swift @@ -61,6 +61,7 @@ extension HTTPClientTests { ("testDecompressionLimit", testDecompressionLimit), ("testLoopDetectionRedirectLimit", testLoopDetectionRedirectLimit), ("testCountRedirectLimit", testCountRedirectLimit), + ("testMultipleConcurrentRequests", testMultipleConcurrentRequests), ("testWorksWith500Error", testWorksWith500Error), ("testWorksWithHTTP10Response", testWorksWithHTTP10Response), ("testWorksWhenServerClosesConnectionAfterReceivingRequest", testWorksWhenServerClosesConnectionAfterReceivingRequest), diff --git a/Tests/AsyncHTTPClientTests/HTTPClientTests.swift b/Tests/AsyncHTTPClientTests/HTTPClientTests.swift index b7e9aa382..a022b4a40 100644 --- a/Tests/AsyncHTTPClientTests/HTTPClientTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTPClientTests.swift @@ -673,6 +673,66 @@ class HTTPClientTests: XCTestCase { } } + func testMultipleConcurrentRequests() throws { + let numberOfRequestsPerThread = 100 + let numberOfParallelWorkers = 5 + + final class HTTPServer: ChannelInboundHandler { + typealias InboundIn = HTTPServerRequestPart + typealias OutboundOut = HTTPServerResponsePart + + func channelRead(context: ChannelHandlerContext, data: NIOAny) { + if case .end = self.unwrapInboundIn(data) { + let responseHead = HTTPServerResponsePart.head(.init(version: .init(major: 1, minor: 1), + status: .ok)) + context.write(self.wrapOutboundOut(responseHead), promise: nil) + context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil) + } + } + } + + let group = MultiThreadedEventLoopGroup(numberOfThreads: 2) + defer { + XCTAssertNoThrow(try group.syncShutdownGracefully()) + } + + var server: Channel? + XCTAssertNoThrow(server = try ServerBootstrap(group: group) + .serverChannelOption(ChannelOptions.socket(.init(SOL_SOCKET), .init(SO_REUSEADDR)), value: 1) + .serverChannelOption(ChannelOptions.backlog, value: .init(numberOfParallelWorkers)) + .childChannelInitializer { channel in + channel.pipeline.configureHTTPServerPipeline(withPipeliningAssistance: false, + withServerUpgrade: nil, + withErrorHandling: false).flatMap { + channel.pipeline.addHandler(HTTPServer()) + } + } + .bind(to: .init(ipAddress: "127.0.0.1", port: 0)) + .wait()) + defer { + XCTAssertNoThrow(try server?.close().wait()) + } + + let httpClient = HTTPClient(eventLoopGroupProvider: .shared(group)) + defer { + XCTAssertNoThrow(try httpClient.syncShutdown()) + } + + let g = DispatchGroup() + for workerID in 0..