@@ -364,6 +364,171 @@ class HTTP1ConnectionTests: XCTestCase {
364
364
XCTAssertEqual ( connectionDelegate. hitConnectionClosed, 1 )
365
365
XCTAssertEqual ( httpBin. activeConnections, 0 )
366
366
}
367
+
368
+ // In order to test backpressure we need to make sure that reads will not happen
369
+ // until the backpressure promise is succeeded. Since we cannot guarantee when
370
+ // messages will be delivered to a client pipeline and we need this test to be
371
+ // fast (no waiting for arbitrary amounts of time), we do the following.
372
+ // First, we enforce NIO to send us only 1 byte at a time. Then we send a message
373
+ // of 4 bytes. This will guarantee that if we see first byte of the message, other
374
+ // bytes a ready to be read as well. This will allow us to test if subsequent reads
375
+ // are waiting for backpressure promise.
376
+ func testDownloadStreamingBackpressure( ) {
377
+ class BackpressureTestDelegate : HTTPClientResponseDelegate {
378
+ typealias Response = Void
379
+
380
+ var _reads = 0
381
+ var _channel : Channel ?
382
+
383
+ let lock : Lock
384
+ let backpressurePromise : EventLoopPromise < Void >
385
+ let optionsApplied : EventLoopPromise < Void >
386
+ let messageReceived : EventLoopPromise < Void >
387
+
388
+ init ( eventLoop: EventLoop ) {
389
+ self . lock = Lock ( )
390
+ self . backpressurePromise = eventLoop. makePromise ( )
391
+ self . optionsApplied = eventLoop. makePromise ( )
392
+ self . messageReceived = eventLoop. makePromise ( )
393
+ }
394
+
395
+ var reads : Int {
396
+ return self . lock. withLock {
397
+ self . _reads
398
+ }
399
+ }
400
+
401
+ func willExecuteOnChannel( _ channel: Channel ) {
402
+ self . lock. withLockVoid {
403
+ self . _channel = channel
404
+ }
405
+ }
406
+
407
+ func didReceiveHead( task: HTTPClient . Task < Void > , _ head: HTTPResponseHead ) -> EventLoopFuture < Void > {
408
+ // This is to force NIO to send only 1 byte at a time.
409
+ guard let channel = self . lock. withLock ( { self . _channel } ) else {
410
+ preconditionFailure ( " Expected to have a channel at this point " )
411
+ }
412
+
413
+ let future = channel. setOption ( ChannelOptions . maxMessagesPerRead, value: 1 ) . flatMap {
414
+ channel. setOption ( ChannelOptions . recvAllocator, value: FixedSizeRecvByteBufferAllocator ( capacity: 1 ) )
415
+ }
416
+ future. cascade ( to: self . optionsApplied)
417
+ return future
418
+ }
419
+
420
+ func didReceiveBodyPart( task: HTTPClient . Task < Response > , _ buffer: ByteBuffer ) -> EventLoopFuture < Void > {
421
+ // We count a number of reads received.
422
+ self . lock. withLockVoid {
423
+ self . _reads += 1
424
+ }
425
+ // We need to notify the test when first byte of the message is arrived.
426
+ self . messageReceived. succeed ( ( ) )
427
+ return self . backpressurePromise. futureResult
428
+ }
429
+
430
+ func didFinishRequest( task: HTTPClient . Task < Response > ) throws { }
431
+ }
432
+
433
+ final class WriteAfterFutureSucceedsHandler : ChannelInboundHandler {
434
+ typealias InboundIn = HTTPServerRequestPart
435
+ typealias OutboundOut = HTTPServerResponsePart
436
+
437
+ let bodyFuture : EventLoopFuture < Void >
438
+ let endFuture : EventLoopFuture < Void >
439
+
440
+ init ( bodyFuture: EventLoopFuture < Void > , endFuture: EventLoopFuture < Void > ) {
441
+ self . bodyFuture = bodyFuture
442
+ self . endFuture = endFuture
443
+ }
444
+
445
+ func channelRead( context: ChannelHandlerContext , data: NIOAny ) {
446
+ switch self . unwrapInboundIn ( data) {
447
+ case . head:
448
+ let head = HTTPResponseHead ( version: HTTPVersion ( major: 1 , minor: 1 ) , status: . ok)
449
+ context. writeAndFlush ( wrapOutboundOut ( . head( head) ) , promise: nil )
450
+ case . body:
451
+ // ignore
452
+ break
453
+ case . end:
454
+ self . bodyFuture. hop ( to: context. eventLoop) . whenSuccess {
455
+ let buffer = context. channel. allocator. buffer ( string: " 1234 " )
456
+ context. writeAndFlush ( self . wrapOutboundOut ( . body( . byteBuffer( buffer) ) ) , promise: nil )
457
+ }
458
+
459
+ self . endFuture. hop ( to: context. eventLoop) . whenSuccess {
460
+ context. writeAndFlush ( self . wrapOutboundOut ( . end( nil ) ) , promise: nil )
461
+ }
462
+ }
463
+ }
464
+ }
465
+
466
+ let logger = Logger ( label: " test " )
467
+
468
+ // cannot test with NIOTS as `maxMessagesPerRead` is not supported
469
+ let eventLoopGroup = MultiThreadedEventLoopGroup ( numberOfThreads: 1 )
470
+ defer { XCTAssertNoThrow ( try eventLoopGroup. syncShutdownGracefully ( ) ) }
471
+ let requestEventLoop = eventLoopGroup. next ( )
472
+ let backpressureDelegate = BackpressureTestDelegate ( eventLoop: requestEventLoop)
473
+
474
+ let httpBin = HTTPBin { _ in
475
+ WriteAfterFutureSucceedsHandler (
476
+ bodyFuture: backpressureDelegate. optionsApplied. futureResult,
477
+ endFuture: backpressureDelegate. backpressurePromise. futureResult
478
+ )
479
+ }
480
+ defer { XCTAssertNoThrow ( try httpBin. shutdown ( ) ) }
481
+
482
+ var maybeChannel : Channel ?
483
+ XCTAssertNoThrow ( maybeChannel = try ClientBootstrap ( group: eventLoopGroup)
484
+ . connect ( host: " localhost " , port: httpBin. port)
485
+ . wait ( ) )
486
+ guard let channel = maybeChannel else { return XCTFail ( " Expected to have a channel at this point " ) }
487
+ let connectionDelegate = MockConnectionDelegate ( )
488
+ var maybeConnection : HTTP1Connection ?
489
+ XCTAssertNoThrow ( maybeConnection = try channel. eventLoop. submit { try HTTP1Connection . start (
490
+ channel: channel,
491
+ connectionID: 0 ,
492
+ delegate: connectionDelegate,
493
+ configuration: . init( ) ,
494
+ logger: logger
495
+ ) } . wait ( ) )
496
+ guard let connection = maybeConnection else { return XCTFail ( " Expected to have a connection at this point " ) }
497
+
498
+ var maybeRequestBag : RequestBag < BackpressureTestDelegate > ?
499
+
500
+ XCTAssertNoThrow ( maybeRequestBag = try RequestBag (
501
+ request: HTTPClient . Request ( url: " http://localhost: \( httpBin. port) /custom " ) ,
502
+ eventLoopPreference: . delegate( on: requestEventLoop) ,
503
+ task: . init( eventLoop: requestEventLoop, logger: logger) ,
504
+ redirectHandler: nil ,
505
+ connectionDeadline: . now( ) + . seconds( 30 ) ,
506
+ requestOptions: . forTests( ) ,
507
+ delegate: backpressureDelegate
508
+ ) )
509
+ guard let requestBag = maybeRequestBag else { return XCTFail ( " Expected to be able to create a request bag " ) }
510
+ backpressureDelegate. willExecuteOnChannel ( connection. channel)
511
+
512
+ connection. executeRequest ( requestBag)
513
+
514
+ let requestFuture = requestBag. task. futureResult
515
+
516
+ // We need to wait for channel options that limit NIO to sending only one byte at a time.
517
+ XCTAssertNoThrow ( try backpressureDelegate. optionsApplied. futureResult. wait ( ) )
518
+
519
+ // Send 4 bytes, but only one should be received until the backpressure promise is succeeded.
520
+
521
+ // Now we wait until message is delivered to client channel pipeline
522
+ XCTAssertNoThrow ( try backpressureDelegate. messageReceived. futureResult. wait ( ) )
523
+ XCTAssertEqual ( backpressureDelegate. reads, 1 )
524
+
525
+ // Succeed the backpressure promise.
526
+ backpressureDelegate. backpressurePromise. succeed ( ( ) )
527
+ XCTAssertNoThrow ( try requestFuture. wait ( ) )
528
+
529
+ // At this point all other bytes should be delivered.
530
+ XCTAssertEqual ( backpressureDelegate. reads, 4 )
531
+ }
367
532
}
368
533
369
534
class MockHTTP1ConnectionDelegate : HTTP1ConnectionDelegate {
0 commit comments