@@ -2051,4 +2051,59 @@ class HTTPClientTests: XCTestCase {
2051
2051
2052
2052
XCTAssertNoThrow ( try future. wait ( ) )
2053
2053
}
2054
+
2055
+ func testBackpressue( ) {
2056
+ class BackpressureResponseDelegate : HTTPClientResponseDelegate {
2057
+ typealias Response = Void
2058
+ var count = 0
2059
+ var processingBodyPart = false
2060
+ var didntWait = false
2061
+ var lock = Lock ( )
2062
+
2063
+ init ( ) { }
2064
+
2065
+ func didReceiveHead( task: HTTPClient . Task < Response > , _ head: HTTPResponseHead ) -> EventLoopFuture < Void > {
2066
+ return task. eventLoop. makeSucceededFuture ( ( ) )
2067
+ }
2068
+
2069
+ func didReceiveBodyPart( task: HTTPClient . Task < Response > , _ part: ByteBuffer ) -> EventLoopFuture < Void > {
2070
+ lock. withLock {
2071
+ // if processingBodyPart is true then previous body part is still being processed
2072
+ // XCTAssertEqual doesn't work here so store result to test later
2073
+ if processingBodyPart == true {
2074
+ didntWait = true
2075
+ }
2076
+ processingBodyPart = true
2077
+ count += 1
2078
+ }
2079
+ // wait one second before returning a successful future
2080
+ return task. eventLoop. scheduleTask ( in: . milliseconds( 1000 ) ) {
2081
+ self . lock. withLock {
2082
+ self . processingBodyPart = false
2083
+ self . count -= 1
2084
+ }
2085
+ } . futureResult
2086
+ }
2087
+
2088
+ func didReceiveError( task: HTTPClient . Task < Response > , _ error: Error ) { }
2089
+ func didFinishRequest( task: HTTPClient . Task < Response > ) throws { }
2090
+ }
2091
+
2092
+ let elg = MultiThreadedEventLoopGroup ( numberOfThreads: 5 )
2093
+ let client = HTTPClient ( eventLoopGroupProvider: . shared( elg) )
2094
+ defer {
2095
+ XCTAssertNoThrow ( try client. syncShutdown ( ) )
2096
+ XCTAssertNoThrow ( try elg. syncShutdownGracefully ( ) )
2097
+ }
2098
+
2099
+ let data = Data ( count: 65273 )
2100
+ let backpressureResponseDelegate = BackpressureResponseDelegate ( )
2101
+ guard let request = try ? HTTPClient . Request ( url: self . defaultHTTPBinURLPrefix + " get " , body: . data( data) ) else {
2102
+ XCTFail ( " Failed to init Request " )
2103
+ return
2104
+ }
2105
+ XCTAssertNoThrow ( try client. execute ( request: request, delegate: backpressureResponseDelegate) . wait ( ) )
2106
+ XCTAssertEqual ( backpressureResponseDelegate. didntWait, false )
2107
+ XCTAssertEqual ( backpressureResponseDelegate. count, 0 )
2108
+ }
2054
2109
}
0 commit comments