diff --git a/Tests/AsyncHTTPClientTests/HTTPClientTestUtils.swift b/Tests/AsyncHTTPClientTests/HTTPClientTestUtils.swift index 5e75d5cc6..db4e54564 100644 --- a/Tests/AsyncHTTPClientTests/HTTPClientTestUtils.swift +++ b/Tests/AsyncHTTPClientTests/HTTPClientTestUtils.swift @@ -1149,23 +1149,157 @@ struct CollectEverythingLogHandler: LogHandler { } } +/// A ``HTTPClientResponseDelegate`` that buffers the incoming response parts for the consumer. The consumer can +/// consume the bytes by calling ``next()`` on the delegate. +/// +/// The sole purpose of this class is to enable straight-line stream tests. +class ResponseStreamDelegate: HTTPClientResponseDelegate { + typealias Response = Void + + enum State { + /// The delegate is in the idle state. There are no http response parts to be buffered + /// and the consumer did not signal a demand. Transitions to all other states are allowed. + case idle + /// The consumer has signaled a demand for more bytes, but none where available. Can + /// transition to `.idle` (when new bytes arrive), `.finished` (when the stream finishes or fails) + case waitingForBytes(EventLoopPromise) + /// The consumer has signaled no further demand but bytes keep arriving. Valid transitions + /// to `.idle` (when bytes are consumed), `.finished` (when bytes are consumed, and the + /// stream has ended), `.failed` (if an error is forwarded) + case buffering(ByteBuffer, done: Bool) + /// Stores an error for consumption. Valid transitions are: `.finished`, when the error was consumed. + case failed(Error) + /// The stream has finished and all bytes or errors where consumed. + case finished + } + + let eventLoop: EventLoop + private var state: State = .idle + + init(eventLoop: EventLoop) { + self.eventLoop = eventLoop + } + + func next() -> EventLoopFuture { + if self.eventLoop.inEventLoop { + return self.next0() + } else { + return self.eventLoop.flatSubmit { + self.next0() + } + } + } + + private func next0() -> EventLoopFuture { + switch self.state { + case .idle: + let promise = self.eventLoop.makePromise(of: ByteBuffer?.self) + self.state = .waitingForBytes(promise) + return promise.futureResult + + case .buffering(let byteBuffer, done: false): + self.state = .idle + return self.eventLoop.makeSucceededFuture(byteBuffer) + + case .buffering(let byteBuffer, done: true): + self.state = .finished + return self.eventLoop.makeSucceededFuture(byteBuffer) + + case .waitingForBytes: + preconditionFailure("Don't call `.next` twice") + + case .failed(let error): + self.state = .finished + return self.eventLoop.makeFailedFuture(error) + + case .finished: + return self.eventLoop.makeSucceededFuture(nil) + } + } + + // MARK: HTTPClientResponseDelegate + + func didSendRequestHead(task: HTTPClient.Task, _ head: HTTPRequestHead) { + self.eventLoop.preconditionInEventLoop() + } + + func didSendRequestPart(task: HTTPClient.Task, _ part: IOData) { + self.eventLoop.preconditionInEventLoop() + } + + func didSendRequest(task: HTTPClient.Task) { + self.eventLoop.preconditionInEventLoop() + } + + func didReceiveHead(task: HTTPClient.Task, _ head: HTTPResponseHead) -> EventLoopFuture { + self.eventLoop.preconditionInEventLoop() + return task.eventLoop.makeSucceededVoidFuture() + } + + func didReceiveBodyPart(task: HTTPClient.Task, _ buffer: ByteBuffer) -> EventLoopFuture { + self.eventLoop.preconditionInEventLoop() + + switch self.state { + case .idle: + self.state = .buffering(buffer, done: false) + case .waitingForBytes(let promise): + self.state = .idle + promise.succeed(buffer) + case .buffering(var byteBuffer, done: false): + var buffer = buffer + byteBuffer.writeBuffer(&buffer) + self.state = .buffering(byteBuffer, done: false) + case .buffering(_, done: true), .finished, .failed: + preconditionFailure("Invalid state: \(self.state)") + } + + return task.eventLoop.makeSucceededVoidFuture() + } + + func didReceiveError(task: HTTPClient.Task, _ error: Error) { + self.eventLoop.preconditionInEventLoop() + + switch self.state { + case .idle: + self.state = .failed(error) + case .waitingForBytes(let promise): + self.state = .finished + promise.fail(error) + case .buffering(_, done: false): + self.state = .failed(error) + case .buffering(_, done: true), .finished, .failed: + preconditionFailure("Invalid state: \(self.state)") + } + } + + func didFinishRequest(task: HTTPClient.Task) throws { + self.eventLoop.preconditionInEventLoop() + + switch self.state { + case .idle: + self.state = .finished + case .waitingForBytes(let promise): + self.state = .finished + promise.succeed(nil) + case .buffering(let byteBuffer, done: false): + self.state = .buffering(byteBuffer, done: true) + case .buffering(_, done: true), .finished, .failed: + preconditionFailure("Invalid state: \(self.state)") + } + } +} + class HTTPEchoHandler: ChannelInboundHandler { typealias InboundIn = HTTPServerRequestPart typealias OutboundOut = HTTPServerResponsePart - var promises: CircularBuffer> = CircularBuffer() - func channelRead(context: ChannelHandlerContext, data: NIOAny) { let request = self.unwrapInboundIn(data) switch request { case .head: - context.writeAndFlush(self.wrapOutboundOut(.head(.init(version: .init(major: 1, minor: 1), status: .ok))), promise: nil) + context.writeAndFlush(self.wrapOutboundOut(.head(.init(version: .http1_1, status: .ok))), promise: nil) case .body(let bytes): - context.writeAndFlush(self.wrapOutboundOut(.body(.byteBuffer(bytes)))).whenSuccess { - if let promise = self.promises.popFirst() { - promise.succeed(()) - } - } + context.writeAndFlush(self.wrapOutboundOut(.body(.byteBuffer(bytes))), promise: nil) case .end: context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil) context.close(promise: nil) diff --git a/Tests/AsyncHTTPClientTests/HTTPClientTests.swift b/Tests/AsyncHTTPClientTests/HTTPClientTests.swift index 490860b1a..f37510778 100644 --- a/Tests/AsyncHTTPClientTests/HTTPClientTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTPClientTests.swift @@ -2815,40 +2815,67 @@ class HTTPClientTests: XCTestCase { XCTAssertEqual(result, .success, "we never closed the connection!") } - func testBiDirectionalStreaming() throws { - let handler = HTTPEchoHandler() + // In this test, we test that a request can continue to stream its body after the response head, + // was received. The client sends a number to the server and waits for the server to echo the + // number. Once the client receives the echoed number, it will continue with the next number. + // The client and server ping/pong 30 times. + func testBiDirectionalStreaming() { + let httpBin = HTTPBin(.http1_1(ssl: false, compress: false)) { _ in HTTPEchoHandler() } + defer { XCTAssertNoThrow(try httpBin.shutdown()) } - let server = try ServerBootstrap(group: self.serverGroup) - .serverChannelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1) - .childChannelInitializer { channel in - channel.pipeline.configureHTTPServerPipeline().flatMap { - channel.pipeline.addHandler(handler) - } - } - .bind(host: "localhost", port: 0) - .wait() + let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 2) + defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) } + let writeEL = eventLoopGroup.next() + let delegateEL = eventLoopGroup.next() - defer { - server.close(promise: nil) - } + let httpClient = HTTPClient(eventLoopGroupProvider: .shared(eventLoopGroup)) + defer { XCTAssertNoThrow(try httpClient.syncShutdown()) } + + let delegate = ResponseStreamDelegate(eventLoop: delegateEL) let body: HTTPClient.Body = .stream { writer in - let promise = self.clientGroup.next().makePromise(of: Void.self) - handler.promises.append(promise) - return writer.write(.byteBuffer(ByteBuffer(string: "hello"))).flatMap { - promise.futureResult - }.flatMap { - let promise = self.clientGroup.next().makePromise(of: Void.self) - handler.promises.append(promise) - return writer.write(.byteBuffer(ByteBuffer(string: "hello2"))).flatMap { - promise.futureResult + let finalPromise = writeEL.makePromise(of: Void.self) + + func writeLoop(_ writer: HTTPClient.Body.StreamWriter, index: Int) { + // always invoke from the wrong el to test thread safety + writeEL.preconditionInEventLoop() + + if index >= 30 { + return finalPromise.succeed(()) + } + + let sent = ByteBuffer(integer: index) + writer.write(.byteBuffer(sent)).flatMap { () -> EventLoopFuture in + // ensure, that the writer dispatches back to the expected delegate el. + delegateEL.preconditionInEventLoop() + return delegate.next() + }.whenComplete { result in + switch result { + case .success(let returned): + XCTAssertEqual(returned, sent) + + writeEL.execute { + writeLoop(writer, index: index + 1) + } + + case .failure(let error): + finalPromise.fail(error) + } } } + + writeEL.execute { + writeLoop(writer, index: 0) + } + + return finalPromise.futureResult } - let future = self.defaultClient.execute(url: "http://localhost:\(server.localAddress!.port!)", body: body) + let request = try! HTTPClient.Request(url: "http://localhost:\(httpBin.port)", body: body) + let future = httpClient.execute(request: request, delegate: delegate, eventLoop: .delegate(on: delegateEL)) XCTAssertNoThrow(try future.wait()) + XCTAssertNil(try delegate.next().wait()) } func testSynchronousHandshakeErrorReporting() throws {