Skip to content

Commit a194dea

Browse files
committed
Fix bi directional streaming test
1 parent 44efb94 commit a194dea

File tree

2 files changed

+178
-32
lines changed

2 files changed

+178
-32
lines changed

Diff for: Tests/AsyncHTTPClientTests/HTTPClientTestUtils.swift

+129-8
Original file line numberDiff line numberDiff line change
@@ -1149,23 +1149,144 @@ struct CollectEverythingLogHandler: LogHandler {
11491149
}
11501150
}
11511151

1152+
class StreamDelegate: HTTPClientResponseDelegate {
1153+
typealias Response = Void
1154+
1155+
enum State {
1156+
case idle
1157+
case waitingForBytes(EventLoopPromise<ByteBuffer?>)
1158+
case buffering(ByteBuffer, done: Bool)
1159+
case failed(Error)
1160+
case finished
1161+
}
1162+
1163+
let eventLoop: EventLoop
1164+
private var state: State = .idle
1165+
1166+
init(eventLoop: EventLoop) {
1167+
self.eventLoop = eventLoop
1168+
}
1169+
1170+
func next() -> EventLoopFuture<ByteBuffer?> {
1171+
if self.eventLoop.inEventLoop {
1172+
return self.next0()
1173+
} else {
1174+
return self.eventLoop.flatSubmit {
1175+
self.next0()
1176+
}
1177+
}
1178+
}
1179+
1180+
private func next0() -> EventLoopFuture<ByteBuffer?> {
1181+
switch self.state {
1182+
case .idle:
1183+
let promise = self.eventLoop.makePromise(of: ByteBuffer?.self)
1184+
self.state = .waitingForBytes(promise)
1185+
return promise.futureResult
1186+
1187+
case .buffering(let byteBuffer, done: false):
1188+
self.state = .idle
1189+
return self.eventLoop.makeSucceededFuture(byteBuffer)
1190+
1191+
case .buffering(let byteBuffer, done: true):
1192+
self.state = .finished
1193+
return self.eventLoop.makeSucceededFuture(byteBuffer)
1194+
1195+
case .waitingForBytes:
1196+
preconditionFailure("Don't call `.next` twice")
1197+
1198+
case .failed(let error):
1199+
self.state = .finished
1200+
return self.eventLoop.makeFailedFuture(error)
1201+
1202+
case .finished:
1203+
return self.eventLoop.makeSucceededFuture(nil)
1204+
}
1205+
}
1206+
1207+
// MARK: HTTPClientResponseDelegate
1208+
1209+
func didSendRequestHead(task: HTTPClient.Task<Response>, _ head: HTTPRequestHead) {
1210+
XCTAssert(self.eventLoop.inEventLoop)
1211+
}
1212+
1213+
func didSendRequestPart(task: HTTPClient.Task<Response>, _ part: IOData) {
1214+
XCTAssert(self.eventLoop.inEventLoop)
1215+
}
1216+
1217+
func didSendRequest(task: HTTPClient.Task<Response>) {
1218+
XCTAssert(self.eventLoop.inEventLoop)
1219+
}
1220+
1221+
func didReceiveHead(task: HTTPClient.Task<Response>, _ head: HTTPResponseHead) -> EventLoopFuture<Void> {
1222+
XCTAssert(self.eventLoop.inEventLoop)
1223+
return task.eventLoop.makeSucceededVoidFuture()
1224+
}
1225+
1226+
func didReceiveBodyPart(task: HTTPClient.Task<Response>, _ buffer: ByteBuffer) -> EventLoopFuture<Void> {
1227+
XCTAssert(self.eventLoop.inEventLoop)
1228+
1229+
switch self.state {
1230+
case .idle:
1231+
self.state = .buffering(buffer, done: false)
1232+
case .waitingForBytes(let promise):
1233+
self.state = .idle
1234+
promise.succeed(buffer)
1235+
case .buffering(var byteBuffer, done: false):
1236+
var buffer = buffer
1237+
byteBuffer.writeBuffer(&buffer)
1238+
self.state = .buffering(byteBuffer, done: false)
1239+
case .buffering(_, done: true), .finished, .failed:
1240+
preconditionFailure("Invalid state: \(self.state)")
1241+
}
1242+
1243+
return task.eventLoop.makeSucceededVoidFuture()
1244+
}
1245+
1246+
func didReceiveError(task: HTTPClient.Task<Response>, _ error: Error) {
1247+
XCTAssert(self.eventLoop.inEventLoop)
1248+
1249+
switch self.state {
1250+
case .idle:
1251+
self.state = .failed(error)
1252+
case .waitingForBytes(let promise):
1253+
self.state = .finished
1254+
promise.fail(error)
1255+
case .buffering(_, done: false):
1256+
self.state = .failed(error)
1257+
case .buffering(_, done: true), .finished, .failed:
1258+
preconditionFailure("Invalid state: \(self.state)")
1259+
}
1260+
}
1261+
1262+
func didFinishRequest(task: HTTPClient.Task<Response>) throws {
1263+
XCTAssert(self.eventLoop.inEventLoop)
1264+
1265+
switch self.state {
1266+
case .idle:
1267+
self.state = .finished
1268+
case .waitingForBytes(let promise):
1269+
self.state = .finished
1270+
promise.succeed(nil)
1271+
case .buffering(let byteBuffer, done: false):
1272+
self.state = .buffering(byteBuffer, done: true)
1273+
case .buffering(_, done: true), .finished, .failed:
1274+
preconditionFailure("Invalid state: \(self.state)")
1275+
}
1276+
}
1277+
}
1278+
11521279
class HTTPEchoHandler: ChannelInboundHandler {
11531280
typealias InboundIn = HTTPServerRequestPart
11541281
typealias OutboundOut = HTTPServerResponsePart
11551282

1156-
var promises: CircularBuffer<EventLoopPromise<Void>> = CircularBuffer()
1157-
11581283
func channelRead(context: ChannelHandlerContext, data: NIOAny) {
11591284
let request = self.unwrapInboundIn(data)
11601285
switch request {
11611286
case .head:
1162-
context.writeAndFlush(self.wrapOutboundOut(.head(.init(version: .init(major: 1, minor: 1), status: .ok))), promise: nil)
1287+
context.writeAndFlush(self.wrapOutboundOut(.head(.init(version: .http1_1, status: .ok))), promise: nil)
11631288
case .body(let bytes):
1164-
context.writeAndFlush(self.wrapOutboundOut(.body(.byteBuffer(bytes)))).whenSuccess {
1165-
if let promise = self.promises.popFirst() {
1166-
promise.succeed(())
1167-
}
1168-
}
1289+
context.writeAndFlush(self.wrapOutboundOut(.body(.byteBuffer(bytes))), promise: nil)
11691290
case .end:
11701291
context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil)
11711292
context.close(promise: nil)

Diff for: Tests/AsyncHTTPClientTests/HTTPClientTests.swift

+49-24
Original file line numberDiff line numberDiff line change
@@ -2815,40 +2815,65 @@ class HTTPClientTests: XCTestCase {
28152815
XCTAssertEqual(result, .success, "we never closed the connection!")
28162816
}
28172817

2818-
func testBiDirectionalStreaming() throws {
2819-
let handler = HTTPEchoHandler()
2818+
// In this test, we test that a request can continue to stream its body after the response head,
2819+
// was received. The client sends a number to the server and waits for the server to echo the
2820+
// number. Once the client receives the echoed number, it will continue with the next number.
2821+
// The client and server ping/pong 30 times.
2822+
func testBiDirectionalStreaming() {
2823+
let httpBin = HTTPBin(.http1_1(ssl: false, compress: false)) { _ in HTTPEchoHandler() }
2824+
defer { XCTAssertNoThrow(try httpBin.shutdown()) }
28202825

2821-
let server = try ServerBootstrap(group: self.serverGroup)
2822-
.serverChannelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1)
2823-
.childChannelInitializer { channel in
2824-
channel.pipeline.configureHTTPServerPipeline().flatMap {
2825-
channel.pipeline.addHandler(handler)
2826-
}
2827-
}
2828-
.bind(host: "localhost", port: 0)
2829-
.wait()
2826+
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 2)
2827+
defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) }
2828+
let writeEL = eventLoopGroup.next()
2829+
let delegateEL = eventLoopGroup.next()
28302830

2831-
defer {
2832-
server.close(promise: nil)
2833-
}
2831+
let httpClient = HTTPClient(eventLoopGroupProvider: .shared(eventLoopGroup))
2832+
defer { XCTAssertNoThrow(try httpClient.syncShutdown()) }
2833+
2834+
let delegate = StreamDelegate(eventLoop: delegateEL)
28342835

28352836
let body: HTTPClient.Body = .stream { writer in
2836-
let promise = self.clientGroup.next().makePromise(of: Void.self)
2837-
handler.promises.append(promise)
2838-
return writer.write(.byteBuffer(ByteBuffer(string: "hello"))).flatMap {
2839-
promise.futureResult
2840-
}.flatMap {
2841-
let promise = self.clientGroup.next().makePromise(of: Void.self)
2842-
handler.promises.append(promise)
2843-
return writer.write(.byteBuffer(ByteBuffer(string: "hello2"))).flatMap {
2844-
promise.futureResult
2837+
let finalPromise = writeEL.makePromise(of: Void.self)
2838+
2839+
func writeLoop(_ writer: HTTPClient.Body.StreamWriter, index: Int) {
2840+
XCTAssert(writeEL.inEventLoop, "Always write from unexpected el")
2841+
2842+
if index >= 30 {
2843+
return finalPromise.succeed(())
2844+
}
2845+
2846+
let sent = ByteBuffer(integer: index)
2847+
writer.write(.byteBuffer(sent)).flatMap { () -> EventLoopFuture<ByteBuffer?> in
2848+
XCTAssert(delegateEL.inEventLoop, "Always dispatch back to delegate el")
2849+
return delegate.next()
2850+
}.whenComplete { result in
2851+
switch result {
2852+
case .success(let returned):
2853+
XCTAssertEqual(returned, sent)
2854+
2855+
writeEL.execute {
2856+
writeLoop(writer, index: index + 1)
2857+
}
2858+
2859+
case .failure(let error):
2860+
finalPromise.fail(error)
2861+
}
28452862
}
28462863
}
2864+
2865+
writeEL.execute {
2866+
writeLoop(writer, index: 0)
2867+
}
2868+
2869+
return finalPromise.futureResult
28472870
}
28482871

2849-
let future = self.defaultClient.execute(url: "http://localhost:\(server.localAddress!.port!)", body: body)
2872+
let request = try! HTTPClient.Request(url: "http://localhost:\(httpBin.port)", body: body)
2873+
let future = httpClient.execute(request: request, delegate: delegate, eventLoop: .delegate(on: delegateEL))
28502874

28512875
XCTAssertNoThrow(try future.wait())
2876+
XCTAssertNil(try delegate.next().wait())
28522877
}
28532878

28542879
func testSynchronousHandshakeErrorReporting() throws {

0 commit comments

Comments
 (0)