@@ -368,11 +368,49 @@ class HTTPClientInternalTests: XCTestCase {
368
368
func didFinishRequest( task: HTTPClient . Task < Response > ) throws { }
369
369
}
370
370
371
+ final class WriteAfterFutureSucceedsHandler : ChannelInboundHandler {
372
+ typealias InboundIn = HTTPServerRequestPart
373
+ typealias OutboundOut = HTTPServerResponsePart
374
+
375
+ let bodyFuture : EventLoopFuture < Void >
376
+ let endFuture : EventLoopFuture < Void >
377
+
378
+ init ( bodyFuture: EventLoopFuture < Void > , endFuture: EventLoopFuture < Void > ) {
379
+ self . bodyFuture = bodyFuture
380
+ self . endFuture = endFuture
381
+ }
382
+
383
+ func channelRead( context: ChannelHandlerContext , data: NIOAny ) {
384
+ switch self . unwrapInboundIn ( data) {
385
+ case . head:
386
+ let head = HTTPResponseHead ( version: HTTPVersion ( major: 1 , minor: 1 ) , status: . ok)
387
+ context. writeAndFlush ( wrapOutboundOut ( . head( head) ) , promise: nil )
388
+ case . body:
389
+ // ignore
390
+ break
391
+ case . end:
392
+ self . bodyFuture. hop ( to: context. eventLoop) . whenSuccess {
393
+ let buffer = context. channel. allocator. buffer ( string: " 1234 " )
394
+ context. writeAndFlush ( self . wrapOutboundOut ( . body( . byteBuffer( buffer) ) ) , promise: nil )
395
+ }
396
+
397
+ self . endFuture. hop ( to: context. eventLoop) . whenSuccess {
398
+ context. writeAndFlush ( self . wrapOutboundOut ( . end( nil ) ) , promise: nil )
399
+ }
400
+ }
401
+ }
402
+ }
403
+
371
404
// cannot test with NIOTS as `maxMessagesPerRead` is not supported
372
405
let eventLoopGroup = MultiThreadedEventLoopGroup ( numberOfThreads: 1 )
373
406
let httpClient = HTTPClient ( eventLoopGroupProvider: . shared( eventLoopGroup) )
374
- let promise = httpClient. eventLoopGroup. next ( ) . makePromise ( of: Channel . self)
375
- let httpBin = HTTPBin ( channelPromise: promise)
407
+ let delegate = BackpressureTestDelegate ( eventLoop: httpClient. eventLoopGroup. next ( ) )
408
+ let httpBin = HTTPBin { _ in
409
+ WriteAfterFutureSucceedsHandler (
410
+ bodyFuture: delegate. optionsApplied. futureResult,
411
+ endFuture: delegate. backpressurePromise. futureResult
412
+ )
413
+ }
376
414
377
415
defer {
378
416
XCTAssertNoThrow ( try httpClient. syncShutdown ( requiresCleanClose: true ) )
@@ -381,27 +419,21 @@ class HTTPClientInternalTests: XCTestCase {
381
419
}
382
420
383
421
let request = try Request ( url: " http://localhost: \( httpBin. port) /custom " )
384
- let delegate = BackpressureTestDelegate ( eventLoop: httpClient. eventLoopGroup. next ( ) )
385
- let future = httpClient. execute ( request: request, delegate: delegate) . futureResult
386
422
387
- let channel = try promise . futureResult . wait ( )
423
+ let requestFuture = httpClient . execute ( request : request , delegate : delegate ) . futureResult
388
424
389
425
// We need to wait for channel options that limit NIO to sending only one byte at a time.
390
426
try delegate. optionsApplied. futureResult. wait ( )
391
427
392
428
// Send 4 bytes, but only one should be received until the backpressure promise is succeeded.
393
- let buffer = channel. allocator. buffer ( string: " 1234 " )
394
- try channel. writeAndFlush ( HTTPServerResponsePart . body ( . byteBuffer( buffer) ) ) . wait ( )
395
429
396
430
// Now we wait until message is delivered to client channel pipeline
397
431
try delegate. messageReceived. futureResult. wait ( )
398
432
XCTAssertEqual ( delegate. reads, 1 )
399
433
400
434
// Succeed the backpressure promise.
401
435
delegate. backpressurePromise. succeed ( ( ) )
402
-
403
- try channel. writeAndFlush ( HTTPServerResponsePart . end ( nil ) ) . wait ( )
404
- try future. wait ( )
436
+ try requestFuture. wait ( )
405
437
406
438
// At this point all other bytes should be delivered.
407
439
XCTAssertEqual ( delegate. reads, 4 )
@@ -602,7 +634,7 @@ class HTTPClientInternalTests: XCTestCase {
602
634
}
603
635
604
636
func testResponseConnectionCloseGet( ) throws {
605
- let httpBin = HTTPBin ( ssl : false )
637
+ let httpBin = HTTPBin ( . http1_1 ( ) )
606
638
let httpClient = HTTPClient ( eventLoopGroupProvider: . shared( self . clientGroup) ,
607
639
configuration: HTTPClient . Configuration ( certificateVerification: . none) )
608
640
defer {
@@ -756,14 +788,14 @@ class HTTPClientInternalTests: XCTestCase {
756
788
struct NoChannelError : Error { }
757
789
758
790
let client = HTTPClient ( eventLoopGroupProvider: . shared( self . clientGroup) )
759
- var maybeServersAndChannels : [ ( HTTPBin , Channel ) ] ?
791
+ var maybeServersAndChannels : [ ( HTTPBin < HTTPBinHandler > , Channel ) ] ?
760
792
XCTAssertNoThrow ( maybeServersAndChannels = try ( 0 ..< 10 ) . map { _ in
761
793
let web = HTTPBin ( )
762
794
defer {
763
795
XCTAssertNoThrow ( try web. shutdown ( ) )
764
796
}
765
797
766
- let req = try ! HTTPClient . Request ( url: " http://localhost: \( web. serverChannel . localAddress! . port! ) /get " ,
798
+ let req = try ! HTTPClient . Request ( url: " http://localhost: \( web. port) /get " ,
767
799
method: . GET,
768
800
body: nil )
769
801
var maybeConnection : Connection ?
@@ -847,7 +879,7 @@ class HTTPClientInternalTests: XCTestCase {
847
879
XCTAssertNoThrow ( try client. syncShutdown ( ) )
848
880
}
849
881
850
- let req = try ! HTTPClient . Request ( url: " http://localhost: \( web. serverChannel . localAddress! . port! ) /get " ,
882
+ let req = try ! HTTPClient . Request ( url: " http://localhost: \( web. port) /get " ,
851
883
method: . GET,
852
884
body: nil )
853
885
@@ -1083,7 +1115,7 @@ class HTTPClientInternalTests: XCTestCase {
1083
1115
let el1 = elg. next ( )
1084
1116
let el2 = elg. next ( )
1085
1117
1086
- let httpBin = HTTPBin ( refusesConnections : true )
1118
+ let httpBin = HTTPBin ( . refuse )
1087
1119
let client = HTTPClient ( eventLoopGroupProvider: . shared( elg) )
1088
1120
1089
1121
defer {
0 commit comments