@@ -364,6 +364,154 @@ class HTTP1ConnectionTests: XCTestCase {
364
364
XCTAssertEqual ( connectionDelegate. hitConnectionClosed, 1 )
365
365
XCTAssertEqual ( httpBin. activeConnections, 0 )
366
366
}
367
+
368
+ // In order to test backpressure we need to make sure that reads will not happen
369
+ // until the backpressure promise is succeeded. Since we cannot guarantee when
370
+ // messages will be delivered to a client pipeline and we need this test to be
371
+ // fast (no waiting for arbitrary amounts of time), we do the following.
372
+ // First, we enforce NIO to send us only 1 byte at a time. Then we send a message
373
+ // of 4 bytes. This will guarantee that if we see first byte of the message, other
374
+ // bytes a ready to be read as well. This will allow us to test if subsequent reads
375
+ // are waiting for backpressure promise.
376
+ func testDownloadStreamingBackpressure( ) {
377
+ class BackpressureTestDelegate : HTTPClientResponseDelegate {
378
+ typealias Response = Void
379
+
380
+ var _reads = 0
381
+ var _channel : Channel ?
382
+
383
+ let lock : Lock
384
+ let backpressurePromise : EventLoopPromise < Void >
385
+ let messageReceived : EventLoopPromise < Void >
386
+
387
+ init ( eventLoop: EventLoop ) {
388
+ self . lock = Lock ( )
389
+ self . backpressurePromise = eventLoop. makePromise ( )
390
+ self . messageReceived = eventLoop. makePromise ( )
391
+ }
392
+
393
+ var reads : Int {
394
+ return self . lock. withLock {
395
+ self . _reads
396
+ }
397
+ }
398
+
399
+ func willExecuteOnChannel( _ channel: Channel ) {
400
+ self . lock. withLockVoid {
401
+ self . _channel = channel
402
+ }
403
+ }
404
+
405
+ func didReceiveHead( task: HTTPClient . Task < Void > , _ head: HTTPResponseHead ) -> EventLoopFuture < Void > {
406
+ return task. futureResult. eventLoop. makeSucceededVoidFuture ( )
407
+ }
408
+
409
+ func didReceiveBodyPart( task: HTTPClient . Task < Response > , _ buffer: ByteBuffer ) -> EventLoopFuture < Void > {
410
+ // We count a number of reads received.
411
+ self . lock. withLockVoid {
412
+ self . _reads += 1
413
+ }
414
+ // We need to notify the test when first byte of the message is arrived.
415
+ self . messageReceived. succeed ( ( ) )
416
+ return self . backpressurePromise. futureResult
417
+ }
418
+
419
+ func didFinishRequest( task: HTTPClient . Task < Response > ) throws { }
420
+ }
421
+
422
+ final class WriteAfterFutureSucceedsHandler : ChannelInboundHandler {
423
+ typealias InboundIn = HTTPServerRequestPart
424
+ typealias OutboundOut = HTTPServerResponsePart
425
+
426
+ let endFuture : EventLoopFuture < Void >
427
+
428
+ init ( endFuture: EventLoopFuture < Void > ) {
429
+ self . endFuture = endFuture
430
+ }
431
+
432
+ func channelRead( context: ChannelHandlerContext , data: NIOAny ) {
433
+ switch self . unwrapInboundIn ( data) {
434
+ case . head:
435
+ let head = HTTPResponseHead ( version: HTTPVersion ( major: 1 , minor: 1 ) , status: . ok)
436
+ context. writeAndFlush ( wrapOutboundOut ( . head( head) ) , promise: nil )
437
+ case . body:
438
+ // ignore
439
+ break
440
+ case . end:
441
+ let buffer = context. channel. allocator. buffer ( string: " 1234 " )
442
+ context. writeAndFlush ( self . wrapOutboundOut ( . body( . byteBuffer( buffer) ) ) , promise: nil )
443
+
444
+ self . endFuture. hop ( to: context. eventLoop) . whenSuccess {
445
+ context. writeAndFlush ( self . wrapOutboundOut ( . end( nil ) ) , promise: nil )
446
+ }
447
+ }
448
+ }
449
+ }
450
+
451
+ let logger = Logger ( label: " test " )
452
+
453
+ // cannot test with NIOTS as `maxMessagesPerRead` is not supported
454
+ let eventLoopGroup = MultiThreadedEventLoopGroup ( numberOfThreads: 1 )
455
+ defer { XCTAssertNoThrow ( try eventLoopGroup. syncShutdownGracefully ( ) ) }
456
+ let requestEventLoop = eventLoopGroup. next ( )
457
+ let backpressureDelegate = BackpressureTestDelegate ( eventLoop: requestEventLoop)
458
+
459
+ let httpBin = HTTPBin { _ in
460
+ WriteAfterFutureSucceedsHandler (
461
+ endFuture: backpressureDelegate. backpressurePromise. futureResult
462
+ )
463
+ }
464
+ defer { XCTAssertNoThrow ( try httpBin. shutdown ( ) ) }
465
+
466
+ var maybeChannel : Channel ?
467
+ XCTAssertNoThrow ( maybeChannel = try ClientBootstrap ( group: eventLoopGroup)
468
+ . channelOption ( ChannelOptions . maxMessagesPerRead, value: 1 )
469
+ . channelOption ( ChannelOptions . recvAllocator, value: FixedSizeRecvByteBufferAllocator ( capacity: 1 ) )
470
+ . connect ( host: " localhost " , port: httpBin. port)
471
+ . wait ( ) )
472
+ guard let channel = maybeChannel else { return XCTFail ( " Expected to have a channel at this point " ) }
473
+ let connectionDelegate = MockConnectionDelegate ( )
474
+ var maybeConnection : HTTP1Connection ?
475
+ XCTAssertNoThrow ( maybeConnection = try channel. eventLoop. submit { try HTTP1Connection . start (
476
+ channel: channel,
477
+ connectionID: 0 ,
478
+ delegate: connectionDelegate,
479
+ configuration: . init( ) ,
480
+ logger: logger
481
+ ) } . wait ( ) )
482
+ guard let connection = maybeConnection else { return XCTFail ( " Expected to have a connection at this point " ) }
483
+
484
+ var maybeRequestBag : RequestBag < BackpressureTestDelegate > ?
485
+
486
+ XCTAssertNoThrow ( maybeRequestBag = try RequestBag (
487
+ request: HTTPClient . Request ( url: " http://localhost: \( httpBin. port) /custom " ) ,
488
+ eventLoopPreference: . delegate( on: requestEventLoop) ,
489
+ task: . init( eventLoop: requestEventLoop, logger: logger) ,
490
+ redirectHandler: nil ,
491
+ connectionDeadline: . now( ) + . seconds( 30 ) ,
492
+ requestOptions: . forTests( ) ,
493
+ delegate: backpressureDelegate
494
+ ) )
495
+ guard let requestBag = maybeRequestBag else { return XCTFail ( " Expected to be able to create a request bag " ) }
496
+ backpressureDelegate. willExecuteOnChannel ( connection. channel)
497
+
498
+ connection. executeRequest ( requestBag)
499
+
500
+ let requestFuture = requestBag. task. futureResult
501
+
502
+ // Send 4 bytes, but only one should be received until the backpressure promise is succeeded.
503
+
504
+ // Now we wait until message is delivered to client channel pipeline
505
+ XCTAssertNoThrow ( try backpressureDelegate. messageReceived. futureResult. wait ( ) )
506
+ XCTAssertEqual ( backpressureDelegate. reads, 1 )
507
+
508
+ // Succeed the backpressure promise.
509
+ backpressureDelegate. backpressurePromise. succeed ( ( ) )
510
+ XCTAssertNoThrow ( try requestFuture. wait ( ) )
511
+
512
+ // At this point all other bytes should be delivered.
513
+ XCTAssertEqual ( backpressureDelegate. reads, 4 )
514
+ }
367
515
}
368
516
369
517
class MockHTTP1ConnectionDelegate : HTTP1ConnectionDelegate {
0 commit comments