Skip to content

Commit da5da25

Browse files
authored
Fix bi directional streaming test (#405)
The bi directional streaming test writes and reads from an echo server.
1 parent 44efb94 commit da5da25

File tree

2 files changed

+193
-32
lines changed

2 files changed

+193
-32
lines changed

Diff for: Tests/AsyncHTTPClientTests/HTTPClientTestUtils.swift

+142-8
Original file line numberDiff line numberDiff line change
@@ -1149,23 +1149,157 @@ struct CollectEverythingLogHandler: LogHandler {
11491149
}
11501150
}
11511151

1152+
/// A ``HTTPClientResponseDelegate`` that buffers the incoming response parts for the consumer. The consumer can
1153+
/// consume the bytes by calling ``next()`` on the delegate.
1154+
///
1155+
/// The sole purpose of this class is to enable straight-line stream tests.
1156+
class ResponseStreamDelegate: HTTPClientResponseDelegate {
1157+
typealias Response = Void
1158+
1159+
enum State {
1160+
/// The delegate is in the idle state. There are no http response parts to be buffered
1161+
/// and the consumer did not signal a demand. Transitions to all other states are allowed.
1162+
case idle
1163+
/// The consumer has signaled a demand for more bytes, but none where available. Can
1164+
/// transition to `.idle` (when new bytes arrive), `.finished` (when the stream finishes or fails)
1165+
case waitingForBytes(EventLoopPromise<ByteBuffer?>)
1166+
/// The consumer has signaled no further demand but bytes keep arriving. Valid transitions
1167+
/// to `.idle` (when bytes are consumed), `.finished` (when bytes are consumed, and the
1168+
/// stream has ended), `.failed` (if an error is forwarded)
1169+
case buffering(ByteBuffer, done: Bool)
1170+
/// Stores an error for consumption. Valid transitions are: `.finished`, when the error was consumed.
1171+
case failed(Error)
1172+
/// The stream has finished and all bytes or errors where consumed.
1173+
case finished
1174+
}
1175+
1176+
let eventLoop: EventLoop
1177+
private var state: State = .idle
1178+
1179+
init(eventLoop: EventLoop) {
1180+
self.eventLoop = eventLoop
1181+
}
1182+
1183+
func next() -> EventLoopFuture<ByteBuffer?> {
1184+
if self.eventLoop.inEventLoop {
1185+
return self.next0()
1186+
} else {
1187+
return self.eventLoop.flatSubmit {
1188+
self.next0()
1189+
}
1190+
}
1191+
}
1192+
1193+
private func next0() -> EventLoopFuture<ByteBuffer?> {
1194+
switch self.state {
1195+
case .idle:
1196+
let promise = self.eventLoop.makePromise(of: ByteBuffer?.self)
1197+
self.state = .waitingForBytes(promise)
1198+
return promise.futureResult
1199+
1200+
case .buffering(let byteBuffer, done: false):
1201+
self.state = .idle
1202+
return self.eventLoop.makeSucceededFuture(byteBuffer)
1203+
1204+
case .buffering(let byteBuffer, done: true):
1205+
self.state = .finished
1206+
return self.eventLoop.makeSucceededFuture(byteBuffer)
1207+
1208+
case .waitingForBytes:
1209+
preconditionFailure("Don't call `.next` twice")
1210+
1211+
case .failed(let error):
1212+
self.state = .finished
1213+
return self.eventLoop.makeFailedFuture(error)
1214+
1215+
case .finished:
1216+
return self.eventLoop.makeSucceededFuture(nil)
1217+
}
1218+
}
1219+
1220+
// MARK: HTTPClientResponseDelegate
1221+
1222+
func didSendRequestHead(task: HTTPClient.Task<Response>, _ head: HTTPRequestHead) {
1223+
self.eventLoop.preconditionInEventLoop()
1224+
}
1225+
1226+
func didSendRequestPart(task: HTTPClient.Task<Response>, _ part: IOData) {
1227+
self.eventLoop.preconditionInEventLoop()
1228+
}
1229+
1230+
func didSendRequest(task: HTTPClient.Task<Response>) {
1231+
self.eventLoop.preconditionInEventLoop()
1232+
}
1233+
1234+
func didReceiveHead(task: HTTPClient.Task<Response>, _ head: HTTPResponseHead) -> EventLoopFuture<Void> {
1235+
self.eventLoop.preconditionInEventLoop()
1236+
return task.eventLoop.makeSucceededVoidFuture()
1237+
}
1238+
1239+
func didReceiveBodyPart(task: HTTPClient.Task<Response>, _ buffer: ByteBuffer) -> EventLoopFuture<Void> {
1240+
self.eventLoop.preconditionInEventLoop()
1241+
1242+
switch self.state {
1243+
case .idle:
1244+
self.state = .buffering(buffer, done: false)
1245+
case .waitingForBytes(let promise):
1246+
self.state = .idle
1247+
promise.succeed(buffer)
1248+
case .buffering(var byteBuffer, done: false):
1249+
var buffer = buffer
1250+
byteBuffer.writeBuffer(&buffer)
1251+
self.state = .buffering(byteBuffer, done: false)
1252+
case .buffering(_, done: true), .finished, .failed:
1253+
preconditionFailure("Invalid state: \(self.state)")
1254+
}
1255+
1256+
return task.eventLoop.makeSucceededVoidFuture()
1257+
}
1258+
1259+
func didReceiveError(task: HTTPClient.Task<Response>, _ error: Error) {
1260+
self.eventLoop.preconditionInEventLoop()
1261+
1262+
switch self.state {
1263+
case .idle:
1264+
self.state = .failed(error)
1265+
case .waitingForBytes(let promise):
1266+
self.state = .finished
1267+
promise.fail(error)
1268+
case .buffering(_, done: false):
1269+
self.state = .failed(error)
1270+
case .buffering(_, done: true), .finished, .failed:
1271+
preconditionFailure("Invalid state: \(self.state)")
1272+
}
1273+
}
1274+
1275+
func didFinishRequest(task: HTTPClient.Task<Response>) throws {
1276+
self.eventLoop.preconditionInEventLoop()
1277+
1278+
switch self.state {
1279+
case .idle:
1280+
self.state = .finished
1281+
case .waitingForBytes(let promise):
1282+
self.state = .finished
1283+
promise.succeed(nil)
1284+
case .buffering(let byteBuffer, done: false):
1285+
self.state = .buffering(byteBuffer, done: true)
1286+
case .buffering(_, done: true), .finished, .failed:
1287+
preconditionFailure("Invalid state: \(self.state)")
1288+
}
1289+
}
1290+
}
1291+
11521292
class HTTPEchoHandler: ChannelInboundHandler {
11531293
typealias InboundIn = HTTPServerRequestPart
11541294
typealias OutboundOut = HTTPServerResponsePart
11551295

1156-
var promises: CircularBuffer<EventLoopPromise<Void>> = CircularBuffer()
1157-
11581296
func channelRead(context: ChannelHandlerContext, data: NIOAny) {
11591297
let request = self.unwrapInboundIn(data)
11601298
switch request {
11611299
case .head:
1162-
context.writeAndFlush(self.wrapOutboundOut(.head(.init(version: .init(major: 1, minor: 1), status: .ok))), promise: nil)
1300+
context.writeAndFlush(self.wrapOutboundOut(.head(.init(version: .http1_1, status: .ok))), promise: nil)
11631301
case .body(let bytes):
1164-
context.writeAndFlush(self.wrapOutboundOut(.body(.byteBuffer(bytes)))).whenSuccess {
1165-
if let promise = self.promises.popFirst() {
1166-
promise.succeed(())
1167-
}
1168-
}
1302+
context.writeAndFlush(self.wrapOutboundOut(.body(.byteBuffer(bytes))), promise: nil)
11691303
case .end:
11701304
context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil)
11711305
context.close(promise: nil)

Diff for: Tests/AsyncHTTPClientTests/HTTPClientTests.swift

+51-24
Original file line numberDiff line numberDiff line change
@@ -2815,40 +2815,67 @@ class HTTPClientTests: XCTestCase {
28152815
XCTAssertEqual(result, .success, "we never closed the connection!")
28162816
}
28172817

2818-
func testBiDirectionalStreaming() throws {
2819-
let handler = HTTPEchoHandler()
2818+
// In this test, we test that a request can continue to stream its body after the response head,
2819+
// was received. The client sends a number to the server and waits for the server to echo the
2820+
// number. Once the client receives the echoed number, it will continue with the next number.
2821+
// The client and server ping/pong 30 times.
2822+
func testBiDirectionalStreaming() {
2823+
let httpBin = HTTPBin(.http1_1(ssl: false, compress: false)) { _ in HTTPEchoHandler() }
2824+
defer { XCTAssertNoThrow(try httpBin.shutdown()) }
28202825

2821-
let server = try ServerBootstrap(group: self.serverGroup)
2822-
.serverChannelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1)
2823-
.childChannelInitializer { channel in
2824-
channel.pipeline.configureHTTPServerPipeline().flatMap {
2825-
channel.pipeline.addHandler(handler)
2826-
}
2827-
}
2828-
.bind(host: "localhost", port: 0)
2829-
.wait()
2826+
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 2)
2827+
defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) }
2828+
let writeEL = eventLoopGroup.next()
2829+
let delegateEL = eventLoopGroup.next()
28302830

2831-
defer {
2832-
server.close(promise: nil)
2833-
}
2831+
let httpClient = HTTPClient(eventLoopGroupProvider: .shared(eventLoopGroup))
2832+
defer { XCTAssertNoThrow(try httpClient.syncShutdown()) }
2833+
2834+
let delegate = ResponseStreamDelegate(eventLoop: delegateEL)
28342835

28352836
let body: HTTPClient.Body = .stream { writer in
2836-
let promise = self.clientGroup.next().makePromise(of: Void.self)
2837-
handler.promises.append(promise)
2838-
return writer.write(.byteBuffer(ByteBuffer(string: "hello"))).flatMap {
2839-
promise.futureResult
2840-
}.flatMap {
2841-
let promise = self.clientGroup.next().makePromise(of: Void.self)
2842-
handler.promises.append(promise)
2843-
return writer.write(.byteBuffer(ByteBuffer(string: "hello2"))).flatMap {
2844-
promise.futureResult
2837+
let finalPromise = writeEL.makePromise(of: Void.self)
2838+
2839+
func writeLoop(_ writer: HTTPClient.Body.StreamWriter, index: Int) {
2840+
// always invoke from the wrong el to test thread safety
2841+
writeEL.preconditionInEventLoop()
2842+
2843+
if index >= 30 {
2844+
return finalPromise.succeed(())
2845+
}
2846+
2847+
let sent = ByteBuffer(integer: index)
2848+
writer.write(.byteBuffer(sent)).flatMap { () -> EventLoopFuture<ByteBuffer?> in
2849+
// ensure, that the writer dispatches back to the expected delegate el.
2850+
delegateEL.preconditionInEventLoop()
2851+
return delegate.next()
2852+
}.whenComplete { result in
2853+
switch result {
2854+
case .success(let returned):
2855+
XCTAssertEqual(returned, sent)
2856+
2857+
writeEL.execute {
2858+
writeLoop(writer, index: index + 1)
2859+
}
2860+
2861+
case .failure(let error):
2862+
finalPromise.fail(error)
2863+
}
28452864
}
28462865
}
2866+
2867+
writeEL.execute {
2868+
writeLoop(writer, index: 0)
2869+
}
2870+
2871+
return finalPromise.futureResult
28472872
}
28482873

2849-
let future = self.defaultClient.execute(url: "http://localhost:\(server.localAddress!.port!)", body: body)
2874+
let request = try! HTTPClient.Request(url: "http://localhost:\(httpBin.port)", body: body)
2875+
let future = httpClient.execute(request: request, delegate: delegate, eventLoop: .delegate(on: delegateEL))
28502876

28512877
XCTAssertNoThrow(try future.wait())
2878+
XCTAssertNil(try delegate.next().wait())
28522879
}
28532880

28542881
func testSynchronousHandshakeErrorReporting() throws {

0 commit comments

Comments
 (0)