Skip to content

Fix bi directional streaming test #405

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 142 additions & 8 deletions Tests/AsyncHTTPClientTests/HTTPClientTestUtils.swift
Original file line number Diff line number Diff line change
Expand Up @@ -1149,23 +1149,157 @@ struct CollectEverythingLogHandler: LogHandler {
}
}

/// A ``HTTPClientResponseDelegate`` that buffers the incoming response parts for the consumer. The consumer can
/// consume the bytes by calling ``next()`` on the delegate.
///
/// The sole purpose of this class is to enable straight-line stream tests.
class ResponseStreamDelegate: HTTPClientResponseDelegate {
typealias Response = Void

enum State {
/// The delegate is in the idle state. There are no http response parts to be buffered
/// and the consumer did not signal a demand. Transitions to all other states are allowed.
case idle
/// The consumer has signaled a demand for more bytes, but none where available. Can
/// transition to `.idle` (when new bytes arrive), `.finished` (when the stream finishes or fails)
case waitingForBytes(EventLoopPromise<ByteBuffer?>)
/// The consumer has signaled no further demand but bytes keep arriving. Valid transitions
/// to `.idle` (when bytes are consumed), `.finished` (when bytes are consumed, and the
/// stream has ended), `.failed` (if an error is forwarded)
case buffering(ByteBuffer, done: Bool)
/// Stores an error for consumption. Valid transitions are: `.finished`, when the error was consumed.
case failed(Error)
/// The stream has finished and all bytes or errors where consumed.
case finished
}

let eventLoop: EventLoop
private var state: State = .idle

init(eventLoop: EventLoop) {
self.eventLoop = eventLoop
}

func next() -> EventLoopFuture<ByteBuffer?> {
if self.eventLoop.inEventLoop {
return self.next0()
} else {
return self.eventLoop.flatSubmit {
self.next0()
}
}
}

private func next0() -> EventLoopFuture<ByteBuffer?> {
switch self.state {
case .idle:
let promise = self.eventLoop.makePromise(of: ByteBuffer?.self)
self.state = .waitingForBytes(promise)
return promise.futureResult

case .buffering(let byteBuffer, done: false):
self.state = .idle
return self.eventLoop.makeSucceededFuture(byteBuffer)

case .buffering(let byteBuffer, done: true):
self.state = .finished
return self.eventLoop.makeSucceededFuture(byteBuffer)

case .waitingForBytes:
preconditionFailure("Don't call `.next` twice")

case .failed(let error):
self.state = .finished
return self.eventLoop.makeFailedFuture(error)

case .finished:
return self.eventLoop.makeSucceededFuture(nil)
}
}

// MARK: HTTPClientResponseDelegate

func didSendRequestHead(task: HTTPClient.Task<Response>, _ head: HTTPRequestHead) {
self.eventLoop.preconditionInEventLoop()
}

func didSendRequestPart(task: HTTPClient.Task<Response>, _ part: IOData) {
self.eventLoop.preconditionInEventLoop()
}

func didSendRequest(task: HTTPClient.Task<Response>) {
self.eventLoop.preconditionInEventLoop()
}

func didReceiveHead(task: HTTPClient.Task<Response>, _ head: HTTPResponseHead) -> EventLoopFuture<Void> {
self.eventLoop.preconditionInEventLoop()
return task.eventLoop.makeSucceededVoidFuture()
}

func didReceiveBodyPart(task: HTTPClient.Task<Response>, _ buffer: ByteBuffer) -> EventLoopFuture<Void> {
self.eventLoop.preconditionInEventLoop()

switch self.state {
case .idle:
self.state = .buffering(buffer, done: false)
case .waitingForBytes(let promise):
self.state = .idle
promise.succeed(buffer)
case .buffering(var byteBuffer, done: false):
var buffer = buffer
byteBuffer.writeBuffer(&buffer)
self.state = .buffering(byteBuffer, done: false)
case .buffering(_, done: true), .finished, .failed:
preconditionFailure("Invalid state: \(self.state)")
}

return task.eventLoop.makeSucceededVoidFuture()
}

func didReceiveError(task: HTTPClient.Task<Response>, _ error: Error) {
self.eventLoop.preconditionInEventLoop()

switch self.state {
case .idle:
self.state = .failed(error)
case .waitingForBytes(let promise):
self.state = .finished
promise.fail(error)
case .buffering(_, done: false):
self.state = .failed(error)
case .buffering(_, done: true), .finished, .failed:
preconditionFailure("Invalid state: \(self.state)")
}
}

func didFinishRequest(task: HTTPClient.Task<Response>) throws {
self.eventLoop.preconditionInEventLoop()

switch self.state {
case .idle:
self.state = .finished
case .waitingForBytes(let promise):
self.state = .finished
promise.succeed(nil)
case .buffering(let byteBuffer, done: false):
self.state = .buffering(byteBuffer, done: true)
case .buffering(_, done: true), .finished, .failed:
preconditionFailure("Invalid state: \(self.state)")
}
}
}

class HTTPEchoHandler: ChannelInboundHandler {
typealias InboundIn = HTTPServerRequestPart
typealias OutboundOut = HTTPServerResponsePart

var promises: CircularBuffer<EventLoopPromise<Void>> = CircularBuffer()

func channelRead(context: ChannelHandlerContext, data: NIOAny) {
let request = self.unwrapInboundIn(data)
switch request {
case .head:
context.writeAndFlush(self.wrapOutboundOut(.head(.init(version: .init(major: 1, minor: 1), status: .ok))), promise: nil)
context.writeAndFlush(self.wrapOutboundOut(.head(.init(version: .http1_1, status: .ok))), promise: nil)
case .body(let bytes):
context.writeAndFlush(self.wrapOutboundOut(.body(.byteBuffer(bytes)))).whenSuccess {
if let promise = self.promises.popFirst() {
promise.succeed(())
}
}
context.writeAndFlush(self.wrapOutboundOut(.body(.byteBuffer(bytes))), promise: nil)
case .end:
context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil)
context.close(promise: nil)
Expand Down
75 changes: 51 additions & 24 deletions Tests/AsyncHTTPClientTests/HTTPClientTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2815,40 +2815,67 @@ class HTTPClientTests: XCTestCase {
XCTAssertEqual(result, .success, "we never closed the connection!")
}

func testBiDirectionalStreaming() throws {
let handler = HTTPEchoHandler()
// In this test, we test that a request can continue to stream its body after the response head,
// was received. The client sends a number to the server and waits for the server to echo the
// number. Once the client receives the echoed number, it will continue with the next number.
// The client and server ping/pong 30 times.
func testBiDirectionalStreaming() {
let httpBin = HTTPBin(.http1_1(ssl: false, compress: false)) { _ in HTTPEchoHandler() }
defer { XCTAssertNoThrow(try httpBin.shutdown()) }

let server = try ServerBootstrap(group: self.serverGroup)
.serverChannelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1)
.childChannelInitializer { channel in
channel.pipeline.configureHTTPServerPipeline().flatMap {
channel.pipeline.addHandler(handler)
}
}
.bind(host: "localhost", port: 0)
.wait()
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 2)
defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) }
let writeEL = eventLoopGroup.next()
let delegateEL = eventLoopGroup.next()

defer {
server.close(promise: nil)
}
let httpClient = HTTPClient(eventLoopGroupProvider: .shared(eventLoopGroup))
defer { XCTAssertNoThrow(try httpClient.syncShutdown()) }

let delegate = ResponseStreamDelegate(eventLoop: delegateEL)

let body: HTTPClient.Body = .stream { writer in
let promise = self.clientGroup.next().makePromise(of: Void.self)
handler.promises.append(promise)
return writer.write(.byteBuffer(ByteBuffer(string: "hello"))).flatMap {
promise.futureResult
}.flatMap {
let promise = self.clientGroup.next().makePromise(of: Void.self)
handler.promises.append(promise)
return writer.write(.byteBuffer(ByteBuffer(string: "hello2"))).flatMap {
promise.futureResult
let finalPromise = writeEL.makePromise(of: Void.self)

func writeLoop(_ writer: HTTPClient.Body.StreamWriter, index: Int) {
// always invoke from the wrong el to test thread safety
writeEL.preconditionInEventLoop()

if index >= 30 {
return finalPromise.succeed(())
}

let sent = ByteBuffer(integer: index)
writer.write(.byteBuffer(sent)).flatMap { () -> EventLoopFuture<ByteBuffer?> in
// ensure, that the writer dispatches back to the expected delegate el.
delegateEL.preconditionInEventLoop()
return delegate.next()
}.whenComplete { result in
switch result {
case .success(let returned):
XCTAssertEqual(returned, sent)

writeEL.execute {
writeLoop(writer, index: index + 1)
}

case .failure(let error):
finalPromise.fail(error)
}
}
}

writeEL.execute {
writeLoop(writer, index: 0)
}

return finalPromise.futureResult
}

let future = self.defaultClient.execute(url: "http://localhost:\(server.localAddress!.port!)", body: body)
let request = try! HTTPClient.Request(url: "http://localhost:\(httpBin.port)", body: body)
let future = httpClient.execute(request: request, delegate: delegate, eventLoop: .delegate(on: delegateEL))

XCTAssertNoThrow(try future.wait())
XCTAssertNil(try delegate.next().wait())
}

func testSynchronousHandshakeErrorReporting() throws {
Expand Down