Skip to content

[HTTP1Connection] Add download streaming backpressure test #435

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ extension HTTP1ConnectionTests {
("testConnectionClosesOnCloseHeader", testConnectionClosesOnCloseHeader),
("testConnectionClosesOnRandomlyAppearingCloseHeader", testConnectionClosesOnRandomlyAppearingCloseHeader),
("testConnectionClosesAfterTheRequestWithoutHavingSentAnCloseHeader", testConnectionClosesAfterTheRequestWithoutHavingSentAnCloseHeader),
("testDownloadStreamingBackpressure", testDownloadStreamingBackpressure),
]
}
}
148 changes: 148 additions & 0 deletions Tests/AsyncHTTPClientTests/HTTP1ConnectionTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,154 @@ class HTTP1ConnectionTests: XCTestCase {
XCTAssertEqual(connectionDelegate.hitConnectionClosed, 1)
XCTAssertEqual(httpBin.activeConnections, 0)
}

// In order to test backpressure we need to make sure that reads will not happen
// until the backpressure promise is succeeded. Since we cannot guarantee when
// messages will be delivered to a client pipeline and we need this test to be
// fast (no waiting for arbitrary amounts of time), we do the following.
// First, we enforce NIO to send us only 1 byte at a time. Then we send a message
// of 4 bytes. This will guarantee that if we see first byte of the message, other
// bytes a ready to be read as well. This will allow us to test if subsequent reads
// are waiting for backpressure promise.
func testDownloadStreamingBackpressure() {
class BackpressureTestDelegate: HTTPClientResponseDelegate {
typealias Response = Void

var _reads = 0
var _channel: Channel?

let lock: Lock
let backpressurePromise: EventLoopPromise<Void>
let messageReceived: EventLoopPromise<Void>

init(eventLoop: EventLoop) {
self.lock = Lock()
self.backpressurePromise = eventLoop.makePromise()
self.messageReceived = eventLoop.makePromise()
}

var reads: Int {
return self.lock.withLock {
self._reads
}
}

func willExecuteOnChannel(_ channel: Channel) {
self.lock.withLockVoid {
self._channel = channel
}
}

func didReceiveHead(task: HTTPClient.Task<Void>, _ head: HTTPResponseHead) -> EventLoopFuture<Void> {
return task.futureResult.eventLoop.makeSucceededVoidFuture()
}

func didReceiveBodyPart(task: HTTPClient.Task<Response>, _ buffer: ByteBuffer) -> EventLoopFuture<Void> {
// We count a number of reads received.
self.lock.withLockVoid {
self._reads += 1
}
// We need to notify the test when first byte of the message is arrived.
self.messageReceived.succeed(())
return self.backpressurePromise.futureResult
}

func didFinishRequest(task: HTTPClient.Task<Response>) throws {}
}

final class WriteAfterFutureSucceedsHandler: ChannelInboundHandler {
typealias InboundIn = HTTPServerRequestPart
typealias OutboundOut = HTTPServerResponsePart

let endFuture: EventLoopFuture<Void>

init(endFuture: EventLoopFuture<Void>) {
self.endFuture = endFuture
}

func channelRead(context: ChannelHandlerContext, data: NIOAny) {
switch self.unwrapInboundIn(data) {
case .head:
let head = HTTPResponseHead(version: HTTPVersion(major: 1, minor: 1), status: .ok)
context.writeAndFlush(wrapOutboundOut(.head(head)), promise: nil)
case .body:
// ignore
break
case .end:
let buffer = context.channel.allocator.buffer(string: "1234")
context.writeAndFlush(self.wrapOutboundOut(.body(.byteBuffer(buffer))), promise: nil)

self.endFuture.hop(to: context.eventLoop).whenSuccess {
context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil)
}
}
}
}

let logger = Logger(label: "test")

// cannot test with NIOTS as `maxMessagesPerRead` is not supported
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) }
let requestEventLoop = eventLoopGroup.next()
let backpressureDelegate = BackpressureTestDelegate(eventLoop: requestEventLoop)

let httpBin = HTTPBin { _ in
WriteAfterFutureSucceedsHandler(
endFuture: backpressureDelegate.backpressurePromise.futureResult
)
}
defer { XCTAssertNoThrow(try httpBin.shutdown()) }

var maybeChannel: Channel?
XCTAssertNoThrow(maybeChannel = try ClientBootstrap(group: eventLoopGroup)
.channelOption(ChannelOptions.maxMessagesPerRead, value: 1)
.channelOption(ChannelOptions.recvAllocator, value: FixedSizeRecvByteBufferAllocator(capacity: 1))
.connect(host: "localhost", port: httpBin.port)
.wait())
guard let channel = maybeChannel else { return XCTFail("Expected to have a channel at this point") }
let connectionDelegate = MockConnectionDelegate()
var maybeConnection: HTTP1Connection?
XCTAssertNoThrow(maybeConnection = try channel.eventLoop.submit { try HTTP1Connection.start(
channel: channel,
connectionID: 0,
delegate: connectionDelegate,
configuration: .init(),
logger: logger
) }.wait())
guard let connection = maybeConnection else { return XCTFail("Expected to have a connection at this point") }

var maybeRequestBag: RequestBag<BackpressureTestDelegate>?

XCTAssertNoThrow(maybeRequestBag = try RequestBag(
request: HTTPClient.Request(url: "http://localhost:\(httpBin.port)/custom"),
eventLoopPreference: .delegate(on: requestEventLoop),
task: .init(eventLoop: requestEventLoop, logger: logger),
redirectHandler: nil,
connectionDeadline: .now() + .seconds(30),
requestOptions: .forTests(),
delegate: backpressureDelegate
))
guard let requestBag = maybeRequestBag else { return XCTFail("Expected to be able to create a request bag") }
backpressureDelegate.willExecuteOnChannel(connection.channel)

connection.executeRequest(requestBag)

let requestFuture = requestBag.task.futureResult

// Send 4 bytes, but only one should be received until the backpressure promise is succeeded.

// Now we wait until message is delivered to client channel pipeline
XCTAssertNoThrow(try backpressureDelegate.messageReceived.futureResult.wait())
XCTAssertEqual(backpressureDelegate.reads, 1)

// Succeed the backpressure promise.
backpressureDelegate.backpressurePromise.succeed(())
XCTAssertNoThrow(try requestFuture.wait())

// At this point all other bytes should be delivered.
XCTAssertEqual(backpressureDelegate.reads, 4)
}
}

class MockHTTP1ConnectionDelegate: HTTP1ConnectionDelegate {
Expand Down