diff --git a/Tests/AsyncHTTPClientTests/HTTP1ClientChannelHandlerTests.swift b/Tests/AsyncHTTPClientTests/HTTP1ClientChannelHandlerTests.swift index 195508149..5fa41d7a5 100644 --- a/Tests/AsyncHTTPClientTests/HTTP1ClientChannelHandlerTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTP1ClientChannelHandlerTests.swift @@ -465,18 +465,3 @@ class ReadEventHitHandler: ChannelOutboundHandler { context.read() } } - -class MockConnectionDelegate: HTTP1ConnectionDelegate { - private(set) var hitConnectionReleased = 0 - private(set) var hitConnectionClosed = 0 - - init() {} - - func http1ConnectionReleased(_: HTTP1Connection) { - self.hitConnectionReleased += 1 - } - - func http1ConnectionClosed(_: HTTP1Connection) { - self.hitConnectionClosed += 1 - } -} diff --git a/Tests/AsyncHTTPClientTests/HTTP1ConnectionTests+XCTest.swift b/Tests/AsyncHTTPClientTests/HTTP1ConnectionTests+XCTest.swift index 8ff56e3e4..05844a517 100644 --- a/Tests/AsyncHTTPClientTests/HTTP1ConnectionTests+XCTest.swift +++ b/Tests/AsyncHTTPClientTests/HTTP1ConnectionTests+XCTest.swift @@ -30,6 +30,8 @@ extension HTTP1ConnectionTests { ("testCreateNewConnectionFailureClosedIO", testCreateNewConnectionFailureClosedIO), ("testGETRequest", testGETRequest), ("testConnectionClosesOnCloseHeader", testConnectionClosesOnCloseHeader), + ("testConnectionClosesOnRandomlyAppearingCloseHeader", testConnectionClosesOnRandomlyAppearingCloseHeader), + ("testConnectionClosesAfterTheRequestWithoutHavingSentAnCloseHeader", testConnectionClosesAfterTheRequestWithoutHavingSentAnCloseHeader), ] } } diff --git a/Tests/AsyncHTTPClientTests/HTTP1ConnectionTests.swift b/Tests/AsyncHTTPClientTests/HTTP1ConnectionTests.swift index a992d4390..9881707aa 100644 --- a/Tests/AsyncHTTPClientTests/HTTP1ConnectionTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTP1ConnectionTests.swift @@ -14,6 +14,7 @@ @testable import AsyncHTTPClient import Logging +import NIOConcurrencyHelpers import NIOCore import NIOEmbedded import NIOHTTP1 @@ -193,8 +194,61 @@ class HTTP1ConnectionTests: XCTestCase { let eventLoop = eventLoopGroup.next() defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) } + let httpBin = HTTPBin(handlerFactory: { _ in SuddenlySendsCloseHeaderChannelHandler(closeOnRequest: 1) }) + + var maybeChannel: Channel? + + XCTAssertNoThrow(maybeChannel = try ClientBootstrap(group: eventLoop).connect(host: "localhost", port: httpBin.port).wait()) + let connectionDelegate = MockConnectionDelegate() + let logger = Logger(label: "test") + var maybeConnection: HTTP1Connection? + XCTAssertNoThrow(maybeConnection = try eventLoop.submit { try HTTP1Connection.start( + channel: XCTUnwrap(maybeChannel), + connectionID: 0, + delegate: connectionDelegate, + configuration: .init(), + logger: logger + ) }.wait()) + guard let connection = maybeConnection else { return XCTFail("Expected to have a connection here") } + + var maybeRequest: HTTPClient.Request? + XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "http://localhost:\(httpBin.port)/")) + guard let request = maybeRequest else { return XCTFail("Expected to be able to create a request") } + + let delegate = ResponseAccumulator(request: request) + var maybeRequestBag: RequestBag? + XCTAssertNoThrow(maybeRequestBag = try RequestBag( + request: request, + eventLoopPreference: .delegate(on: eventLoopGroup.next()), + task: .init(eventLoop: eventLoopGroup.next(), logger: logger), + redirectHandler: nil, + connectionDeadline: .now() + .seconds(30), + idleReadTimeout: nil, + delegate: delegate + )) + guard let requestBag = maybeRequestBag else { return XCTFail("Expected to be able to create a request bag") } + + connection.executeRequest(requestBag) + + var response: HTTPClient.Response? + XCTAssertNoThrow(response = try requestBag.task.futureResult.wait()) + XCTAssertEqual(response?.status, .ok) + XCTAssertEqual(connectionDelegate.hitConnectionReleased, 0) + XCTAssertNoThrow(try XCTUnwrap(maybeChannel).closeFuture.wait()) + XCTAssertEqual(connectionDelegate.hitConnectionClosed, 1) + + // we need to wait a small amount of time to see the connection close on the server + try! eventLoop.scheduleTask(in: .milliseconds(200)) {}.futureResult.wait() + XCTAssertEqual(httpBin.activeConnections, 0) + } + + func testConnectionClosesOnRandomlyAppearingCloseHeader() { + let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1) + let eventLoop = eventLoopGroup.next() + defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) } + let closeOnRequest = (30...100).randomElement()! - let httpBin = HTTPBin(handlerFactory: { _ in SuddenlySendsCloseHeaderChannel(closeOnRequest: closeOnRequest) }) + let httpBin = HTTPBin(handlerFactory: { _ in SuddenlySendsCloseHeaderChannelHandler(closeOnRequest: closeOnRequest) }) var maybeChannel: Channel? @@ -216,7 +270,7 @@ class HTTP1ConnectionTests: XCTestCase { counter += 1 var maybeRequest: HTTPClient.Request? - XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "http://localhost/")) + XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "http://localhost:\(httpBin.port)/")) guard let request = maybeRequest else { return XCTFail("Expected to be able to create a request") } let delegate = ResponseAccumulator(request: request) @@ -235,23 +289,80 @@ class HTTP1ConnectionTests: XCTestCase { connection.executeRequest(requestBag) var response: HTTPClient.Response? - if counter <= closeOnRequest { - XCTAssertNoThrow(response = try requestBag.task.futureResult.wait()) - XCTAssertEqual(response?.status, .ok) - - if response?.headers.first(name: "connection") == "close" { - XCTAssertEqual(closeOnRequest, counter) - XCTAssertEqual(maybeChannel?.isActive, false) - } - } else { - // io on close channel leads to error - XCTAssertThrowsError(try requestBag.task.futureResult.wait()) { - XCTAssertEqual($0 as? ChannelError, .ioOnClosedChannel) - } + XCTAssertNoThrow(response = try requestBag.task.futureResult.wait()) + XCTAssertEqual(response?.status, .ok) + if response?.headers.first(name: "connection") == "close" { break // the loop + } else { + XCTAssertEqual(httpBin.activeConnections, 1) + XCTAssertEqual(connectionDelegate.hitConnectionReleased, counter) } } + + XCTAssertNoThrow(try XCTUnwrap(maybeChannel).closeFuture.wait()) + XCTAssertEqual(connectionDelegate.hitConnectionClosed, 1) + XCTAssertFalse(try XCTUnwrap(maybeChannel).isActive) + + XCTAssertEqual(counter, closeOnRequest) + XCTAssertEqual(connectionDelegate.hitConnectionClosed, 1) + XCTAssertEqual(connectionDelegate.hitConnectionReleased, counter - 1, + "If a close header is received connection release is not triggered.") + + // we need to wait a small amount of time to see the connection close on the server + try! eventLoop.scheduleTask(in: .milliseconds(200)) {}.futureResult.wait() + XCTAssertEqual(httpBin.activeConnections, 0) + } + + func testConnectionClosesAfterTheRequestWithoutHavingSentAnCloseHeader() { + let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1) + let eventLoop = eventLoopGroup.next() + defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) } + + let httpBin = HTTPBin(handlerFactory: { _ in AfterRequestCloseConnectionChannelHandler() }) + + var maybeChannel: Channel? + + XCTAssertNoThrow(maybeChannel = try ClientBootstrap(group: eventLoop).connect(host: "localhost", port: httpBin.port).wait()) + let connectionDelegate = MockConnectionDelegate() + let logger = Logger(label: "test") + var maybeConnection: HTTP1Connection? + XCTAssertNoThrow(maybeConnection = try eventLoop.submit { try HTTP1Connection.start( + channel: XCTUnwrap(maybeChannel), + connectionID: 0, + delegate: connectionDelegate, + configuration: .init(), + logger: logger + ) }.wait()) + guard let connection = maybeConnection else { return XCTFail("Expected to have a connection here") } + + var maybeRequest: HTTPClient.Request? + XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "http://localhost:\(httpBin.port)/")) + guard let request = maybeRequest else { return XCTFail("Expected to be able to create a request") } + + let delegate = ResponseAccumulator(request: request) + var maybeRequestBag: RequestBag? + XCTAssertNoThrow(maybeRequestBag = try RequestBag( + request: request, + eventLoopPreference: .delegate(on: eventLoopGroup.next()), + task: .init(eventLoop: eventLoopGroup.next(), logger: logger), + redirectHandler: nil, + connectionDeadline: .now() + .seconds(30), + idleReadTimeout: nil, + delegate: delegate + )) + guard let requestBag = maybeRequestBag else { return XCTFail("Expected to be able to create a request bag") } + + connection.executeRequest(requestBag) + + var response: HTTPClient.Response? + XCTAssertNoThrow(response = try requestBag.task.futureResult.wait()) + XCTAssertEqual(response?.status, .ok) + XCTAssertEqual(connectionDelegate.hitConnectionReleased, 1) + + XCTAssertNoThrow(try XCTUnwrap(maybeChannel).closeFuture.wait()) + XCTAssertEqual(connectionDelegate.hitConnectionClosed, 1) + XCTAssertEqual(httpBin.activeConnections, 0) } } @@ -268,7 +379,8 @@ class MockHTTP1ConnectionDelegate: HTTP1ConnectionDelegate { } } -class SuddenlySendsCloseHeaderChannel: ChannelInboundHandler { +/// A channel handler that sends a connection close header but does not close the connection. +class SuddenlySendsCloseHeaderChannelHandler: ChannelInboundHandler { typealias InboundIn = HTTPServerRequestPart typealias OutboundOut = HTTPServerResponsePart @@ -302,3 +414,58 @@ class SuddenlySendsCloseHeaderChannel: ChannelInboundHandler { } } } + +/// A channel handler that closes a connection after a successful request +class AfterRequestCloseConnectionChannelHandler: ChannelInboundHandler { + typealias InboundIn = HTTPServerRequestPart + typealias OutboundOut = HTTPServerResponsePart + + init() {} + + func channelRead(context: ChannelHandlerContext, data: NIOAny) { + switch self.unwrapInboundIn(data) { + case .head(let head): + XCTAssertTrue(head.headers.contains(name: "host")) + XCTAssertEqual(head.method, .GET) + case .body: + break + case .end: + context.write(self.wrapOutboundOut(.head(.init(version: .http1_1, status: .ok))), promise: nil) + context.write(self.wrapOutboundOut(.end(nil)), promise: nil) + context.flush() + + context.eventLoop.scheduleTask(in: .milliseconds(20)) { + context.close(promise: nil) + } + } + } +} + +class MockConnectionDelegate: HTTP1ConnectionDelegate { + private var lock = Lock() + + private var _hitConnectionReleased = 0 + private var _hitConnectionClosed = 0 + + var hitConnectionReleased: Int { + self.lock.withLock { self._hitConnectionReleased } + } + + var hitConnectionClosed: Int { + self.lock.withLock { self._hitConnectionClosed } + } + + init() {} + + func http1ConnectionReleased(_: HTTP1Connection) { + self.lock.withLockVoid { + self._hitConnectionReleased += 1 + } + } + + func http1ConnectionClosed(_: HTTP1Connection) { + self.lock.withLockVoid { + self._hitConnectionClosed += 1 + } + } +}