diff --git a/Tests/AsyncHTTPClientTests/HTTP1ConnectionTests+XCTest.swift b/Tests/AsyncHTTPClientTests/HTTP1ConnectionTests+XCTest.swift index 05844a517..35c510e4f 100644 --- a/Tests/AsyncHTTPClientTests/HTTP1ConnectionTests+XCTest.swift +++ b/Tests/AsyncHTTPClientTests/HTTP1ConnectionTests+XCTest.swift @@ -32,6 +32,7 @@ extension HTTP1ConnectionTests { ("testConnectionClosesOnCloseHeader", testConnectionClosesOnCloseHeader), ("testConnectionClosesOnRandomlyAppearingCloseHeader", testConnectionClosesOnRandomlyAppearingCloseHeader), ("testConnectionClosesAfterTheRequestWithoutHavingSentAnCloseHeader", testConnectionClosesAfterTheRequestWithoutHavingSentAnCloseHeader), + ("testDownloadStreamingBackpressure", testDownloadStreamingBackpressure), ] } } diff --git a/Tests/AsyncHTTPClientTests/HTTP1ConnectionTests.swift b/Tests/AsyncHTTPClientTests/HTTP1ConnectionTests.swift index bb99454d6..6fc8f3e94 100644 --- a/Tests/AsyncHTTPClientTests/HTTP1ConnectionTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTP1ConnectionTests.swift @@ -364,6 +364,154 @@ class HTTP1ConnectionTests: XCTestCase { XCTAssertEqual(connectionDelegate.hitConnectionClosed, 1) XCTAssertEqual(httpBin.activeConnections, 0) } + + // In order to test backpressure we need to make sure that reads will not happen + // until the backpressure promise is succeeded. Since we cannot guarantee when + // messages will be delivered to a client pipeline and we need this test to be + // fast (no waiting for arbitrary amounts of time), we do the following. + // First, we enforce NIO to send us only 1 byte at a time. Then we send a message + // of 4 bytes. This will guarantee that if we see first byte of the message, other + // bytes a ready to be read as well. This will allow us to test if subsequent reads + // are waiting for backpressure promise. + func testDownloadStreamingBackpressure() { + class BackpressureTestDelegate: HTTPClientResponseDelegate { + typealias Response = Void + + var _reads = 0 + var _channel: Channel? + + let lock: Lock + let backpressurePromise: EventLoopPromise<Void> + let messageReceived: EventLoopPromise<Void> + + init(eventLoop: EventLoop) { + self.lock = Lock() + self.backpressurePromise = eventLoop.makePromise() + self.messageReceived = eventLoop.makePromise() + } + + var reads: Int { + return self.lock.withLock { + self._reads + } + } + + func willExecuteOnChannel(_ channel: Channel) { + self.lock.withLockVoid { + self._channel = channel + } + } + + func didReceiveHead(task: HTTPClient.Task<Void>, _ head: HTTPResponseHead) -> EventLoopFuture<Void> { + return task.futureResult.eventLoop.makeSucceededVoidFuture() + } + + func didReceiveBodyPart(task: HTTPClient.Task<Response>, _ buffer: ByteBuffer) -> EventLoopFuture<Void> { + // We count a number of reads received. + self.lock.withLockVoid { + self._reads += 1 + } + // We need to notify the test when first byte of the message is arrived. + self.messageReceived.succeed(()) + return self.backpressurePromise.futureResult + } + + func didFinishRequest(task: HTTPClient.Task<Response>) throws {} + } + + final class WriteAfterFutureSucceedsHandler: ChannelInboundHandler { + typealias InboundIn = HTTPServerRequestPart + typealias OutboundOut = HTTPServerResponsePart + + let endFuture: EventLoopFuture<Void> + + init(endFuture: EventLoopFuture<Void>) { + self.endFuture = endFuture + } + + func channelRead(context: ChannelHandlerContext, data: NIOAny) { + switch self.unwrapInboundIn(data) { + case .head: + let head = HTTPResponseHead(version: HTTPVersion(major: 1, minor: 1), status: .ok) + context.writeAndFlush(wrapOutboundOut(.head(head)), promise: nil) + case .body: + // ignore + break + case .end: + let buffer = context.channel.allocator.buffer(string: "1234") + context.writeAndFlush(self.wrapOutboundOut(.body(.byteBuffer(buffer))), promise: nil) + + self.endFuture.hop(to: context.eventLoop).whenSuccess { + context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil) + } + } + } + } + + let logger = Logger(label: "test") + + // cannot test with NIOTS as `maxMessagesPerRead` is not supported + let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1) + defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) } + let requestEventLoop = eventLoopGroup.next() + let backpressureDelegate = BackpressureTestDelegate(eventLoop: requestEventLoop) + + let httpBin = HTTPBin { _ in + WriteAfterFutureSucceedsHandler( + endFuture: backpressureDelegate.backpressurePromise.futureResult + ) + } + defer { XCTAssertNoThrow(try httpBin.shutdown()) } + + var maybeChannel: Channel? + XCTAssertNoThrow(maybeChannel = try ClientBootstrap(group: eventLoopGroup) + .channelOption(ChannelOptions.maxMessagesPerRead, value: 1) + .channelOption(ChannelOptions.recvAllocator, value: FixedSizeRecvByteBufferAllocator(capacity: 1)) + .connect(host: "localhost", port: httpBin.port) + .wait()) + guard let channel = maybeChannel else { return XCTFail("Expected to have a channel at this point") } + let connectionDelegate = MockConnectionDelegate() + var maybeConnection: HTTP1Connection? + XCTAssertNoThrow(maybeConnection = try channel.eventLoop.submit { try HTTP1Connection.start( + channel: channel, + connectionID: 0, + delegate: connectionDelegate, + configuration: .init(), + logger: logger + ) }.wait()) + guard let connection = maybeConnection else { return XCTFail("Expected to have a connection at this point") } + + var maybeRequestBag: RequestBag<BackpressureTestDelegate>? + + XCTAssertNoThrow(maybeRequestBag = try RequestBag( + request: HTTPClient.Request(url: "http://localhost:\(httpBin.port)/custom"), + eventLoopPreference: .delegate(on: requestEventLoop), + task: .init(eventLoop: requestEventLoop, logger: logger), + redirectHandler: nil, + connectionDeadline: .now() + .seconds(30), + requestOptions: .forTests(), + delegate: backpressureDelegate + )) + guard let requestBag = maybeRequestBag else { return XCTFail("Expected to be able to create a request bag") } + backpressureDelegate.willExecuteOnChannel(connection.channel) + + connection.executeRequest(requestBag) + + let requestFuture = requestBag.task.futureResult + + // Send 4 bytes, but only one should be received until the backpressure promise is succeeded. + + // Now we wait until message is delivered to client channel pipeline + XCTAssertNoThrow(try backpressureDelegate.messageReceived.futureResult.wait()) + XCTAssertEqual(backpressureDelegate.reads, 1) + + // Succeed the backpressure promise. + backpressureDelegate.backpressurePromise.succeed(()) + XCTAssertNoThrow(try requestFuture.wait()) + + // At this point all other bytes should be delivered. + XCTAssertEqual(backpressureDelegate.reads, 4) + } } class MockHTTP1ConnectionDelegate: HTTP1ConnectionDelegate {