Skip to content

[HTTP2] More Integration Tests #471

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Nov 11, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Tests/AsyncHTTPClientTests/HTTP2ClientTests+XCTest.swift
Original file line number Diff line number Diff line change
@@ -29,6 +29,10 @@ extension HTTP2ClientTests {
("testConcurrentRequests", testConcurrentRequests),
("testConcurrentRequestsFromDifferentThreads", testConcurrentRequestsFromDifferentThreads),
("testConcurrentRequestsWorkWithRequiredEventLoop", testConcurrentRequestsWorkWithRequiredEventLoop),
("testUncleanShutdownCancelsExecutingAndQueuedTasks", testUncleanShutdownCancelsExecutingAndQueuedTasks),
("testCancelingRunningRequest", testCancelingRunningRequest),
("testStressCancelingRunningRequestFromDifferentThreads", testStressCancelingRunningRequestFromDifferentThreads),
("testPlatformConnectErrorIsForwardedOnTimeout", testPlatformConnectErrorIsForwardedOnTimeout),
]
}
}
210 changes: 207 additions & 3 deletions Tests/AsyncHTTPClientTests/HTTP2ClientTests.swift
Original file line number Diff line number Diff line change
@@ -19,16 +19,19 @@
#endif
import Logging
import NIOCore
import NIOHTTP1
import NIOPosix
import NIOSSL
import XCTest

class HTTP2ClientTests: XCTestCase {
func makeDefaultHTTPClient() -> HTTPClient {
func makeDefaultHTTPClient(
eventLoopGroupProvider: HTTPClient.EventLoopGroupProvider = .createNew
) -> HTTPClient {
var tlsConfig = TLSConfiguration.makeClientConfiguration()
tlsConfig.certificateVerification = .none
return HTTPClient(
eventLoopGroupProvider: .createNew,
eventLoopGroupProvider: eventLoopGroupProvider,
configuration: HTTPClient.Configuration(
tlsConfiguration: tlsConfig,
httpVersion: .automatic
@@ -37,6 +40,18 @@ class HTTP2ClientTests: XCTestCase {
)
}

func makeClientWithActiveHTTP2Connection<RequestHandler>(
to bin: HTTPBin<RequestHandler>,
eventLoopGroupProvider: HTTPClient.EventLoopGroupProvider = .createNew
) -> HTTPClient {
let client = self.makeDefaultHTTPClient(eventLoopGroupProvider: eventLoopGroupProvider)
var response: HTTPClient.Response?
XCTAssertNoThrow(response = try client.get(url: "https://localhost:\(bin.port)/get").wait())
XCTAssertEqual(.ok, response?.status)
XCTAssertEqual(response?.version, .http2)
return client
}

func testSimpleGet() {
let bin = HTTPBin(.http2(compress: false))
defer { XCTAssertNoThrow(try bin.shutdown()) }
@@ -92,7 +107,7 @@ class HTTP2ClientTests: XCTestCase {

for _ in 0..<numberOfRequestsPerWorkers {
var response: HTTPClient.Response?
XCTAssertNoThrow(response = try client.get(url: url).wait())
XCTAssertNoThrow(response = try client.get(url: "https://localhost:\(bin.port)/get").wait())

XCTAssertEqual(.ok, response?.status)
XCTAssertEqual(response?.version, .http2)
@@ -187,4 +202,193 @@ class HTTP2ClientTests: XCTestCase {
// all workers should be running, let's wait for them to finish
allDone.wait()
}

func testUncleanShutdownCancelsExecutingAndQueuedTasks() {
let bin = HTTPBin(.http2(compress: false))
defer { XCTAssertNoThrow(try bin.shutdown()) }
let clientGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
defer { XCTAssertNoThrow(try clientGroup.syncShutdownGracefully()) }
// we need an active connection to guarantee that requests are executed immediately
// without waiting for connection establishment
let client = self.makeClientWithActiveHTTP2Connection(to: bin, eventLoopGroupProvider: .shared(clientGroup))

// start 20 requests which are guaranteed to never get any response
// 10 of them will executed and the other 10 will be queued
// because HTTPBin has a default `maxConcurrentStreams` limit of 10
let responses = (0..<20).map { _ in
client.get(url: "https://localhost:\(bin.port)/wait")
}

XCTAssertNoThrow(try client.syncShutdown())

var results: [Result<HTTPClient.Response, Error>] = []
XCTAssertNoThrow(results = try EventLoopFuture
.whenAllComplete(responses, on: clientGroup.next())
.timeout(after: .seconds(2))
.wait())

for result in results {
switch result {
case .success:
XCTFail("Shouldn't succeed")
case .failure(let error):
XCTAssertEqual(error as? HTTPClientError, .cancelled)
}
}
}

func testCancelingRunningRequest() {
let bin = HTTPBin(.http2(compress: false)) { _ in SendHeaderAndWaitChannelHandler() }
defer { XCTAssertNoThrow(try bin.shutdown()) }
let client = self.makeDefaultHTTPClient()
defer { XCTAssertNoThrow(try client.syncShutdown()) }

var maybeRequest: HTTPClient.Request?
XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "https://localhost:\(bin.port)"))
guard let request = maybeRequest else { return }

var task: HTTPClient.Task<Void>!
let delegate = HeadReceivedCallback { _ in
// request is definitely running because we just received a head from the server
task.cancel()
}
task = client.execute(
request: request,
delegate: delegate
)

XCTAssertThrowsError(try task.futureResult.timeout(after: .seconds(2)).wait()) {
XCTAssertEqual($0 as? HTTPClientError, .cancelled)
}
}

func testStressCancelingRunningRequestFromDifferentThreads() {
let bin = HTTPBin(.http2(compress: false)) { _ in SendHeaderAndWaitChannelHandler() }
defer { XCTAssertNoThrow(try bin.shutdown()) }
let client = self.makeDefaultHTTPClient()
defer { XCTAssertNoThrow(try client.syncShutdown()) }
let cancelPool = MultiThreadedEventLoopGroup(numberOfThreads: 10)
defer { XCTAssertNoThrow(try cancelPool.syncShutdownGracefully()) }

var maybeRequest: HTTPClient.Request?
XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "https://localhost:\(bin.port)"))
guard let request = maybeRequest else { return }

let tasks = (0..<100).map { _ -> HTTPClient.Task<TestHTTPDelegate.Response> in
var task: HTTPClient.Task<Void>!
let delegate = HeadReceivedCallback { _ in
// request is definitely running because we just received a head from the server
cancelPool.next().execute {
// canceling from a different thread
task.cancel()
}
}
task = client.execute(
request: request,
delegate: delegate
)
return task
}

for task in tasks {
XCTAssertThrowsError(try task.futureResult.timeout(after: .seconds(2)).wait()) {
XCTAssertEqual($0 as? HTTPClientError, .cancelled)
}
}
}

func testPlatformConnectErrorIsForwardedOnTimeout() {
let bin = HTTPBin(.http2(compress: false))
let clientGroup = MultiThreadedEventLoopGroup(numberOfThreads: 2)
let el1 = clientGroup.next()
let el2 = clientGroup.next()
defer { XCTAssertNoThrow(try clientGroup.syncShutdownGracefully()) }
var tlsConfig = TLSConfiguration.makeClientConfiguration()
tlsConfig.certificateVerification = .none
let client = HTTPClient(
eventLoopGroupProvider: .shared(clientGroup),
configuration: HTTPClient.Configuration(
tlsConfiguration: tlsConfig,
timeout: .init(connect: .milliseconds(1000)),
httpVersion: .automatic
),
backgroundActivityLogger: Logger(label: "HTTPClient", factory: StreamLogHandler.standardOutput(label:))
)
defer { XCTAssertNoThrow(try client.syncShutdown()) }

var maybeRequest1: HTTPClient.Request?
XCTAssertNoThrow(maybeRequest1 = try HTTPClient.Request(url: "https://localhost:\(bin.port)/get"))
guard let request1 = maybeRequest1 else { return }

let task1 = client.execute(request: request1, delegate: ResponseAccumulator(request: request1), eventLoop: .delegateAndChannel(on: el1))
var response1: ResponseAccumulator.Response?
XCTAssertNoThrow(response1 = try task1.wait())

XCTAssertEqual(.ok, response1?.status)
XCTAssertEqual(response1?.version, .http2)
let serverPort = bin.port
XCTAssertNoThrow(try bin.shutdown())
// client is now in HTTP/2 state and the HTTPBin is closed
// start a new server on the old port which closes all connections immediately
let serverGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
defer { XCTAssertNoThrow(try serverGroup.syncShutdownGracefully()) }
var maybeServer: Channel?
XCTAssertNoThrow(maybeServer = try ServerBootstrap(group: serverGroup)
.serverChannelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1)
.childChannelInitializer { channel in
channel.close()
}
.childChannelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1)
.bind(host: "0.0.0.0", port: serverPort)
.wait())
guard let server = maybeServer else { return }
defer { XCTAssertNoThrow(try server.close().wait()) }

var maybeRequest2: HTTPClient.Request?
XCTAssertNoThrow(maybeRequest2 = try HTTPClient.Request(url: "https://localhost:\(serverPort)/"))
guard let request2 = maybeRequest2 else { return }

let task2 = client.execute(request: request2, delegate: ResponseAccumulator(request: request2), eventLoop: .delegateAndChannel(on: el2))
XCTAssertThrowsError(try task2.wait()) { error in
XCTAssertNil(
error as? HTTPClientError,
"error should be some platform specific error that the connection is closed/reset by the other side"
)
}
}
}

private final class HeadReceivedCallback: HTTPClientResponseDelegate {
typealias Response = Void
private let didReceiveHeadCallback: (HTTPResponseHead) -> Void
init(didReceiveHead: @escaping (HTTPResponseHead) -> Void) {
self.didReceiveHeadCallback = didReceiveHead
}

func didReceiveHead(task: HTTPClient.Task<Void>, _ head: HTTPResponseHead) -> EventLoopFuture<Void> {
self.didReceiveHeadCallback(head)
return task.eventLoop.makeSucceededVoidFuture()
}

func didFinishRequest(task: HTTPClient.Task<Void>) throws {}
}

/// sends some headers and waits indefinitely afterwards
private final class SendHeaderAndWaitChannelHandler: ChannelInboundHandler {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh we should use this one to test idleReadTimeout as well. can be another pr.

typealias InboundIn = HTTPServerRequestPart
typealias OutboundOut = HTTPServerResponsePart

func channelRead(context: ChannelHandlerContext, data: NIOAny) {
let requestPart = self.unwrapInboundIn(data)
switch requestPart {
case .head:
context.writeAndFlush(self.wrapOutboundOut(.head(HTTPResponseHead(
version: HTTPVersion(major: 1, minor: 1),
status: .ok
))
), promise: nil)
case .body, .end:
return
}
}
}