@@ -37,6 +37,17 @@ class HTTP2ClientTests: XCTestCase {
37
37
)
38
38
}
39
39
40
+ func makeClientWithActiveHTTP2Connection< RequestHandler> (
41
+ to bin: HTTPBin < RequestHandler >
42
+ ) -> HTTPClient {
43
+ let client = self . makeDefaultHTTPClient ( )
44
+ var response : HTTPClient . Response ?
45
+ XCTAssertNoThrow ( response = try client. get ( url: " https://localhost: \( bin. port) /get " ) . wait ( ) )
46
+ XCTAssertEqual ( . ok, response? . status)
47
+ XCTAssertEqual ( response? . version, . http2)
48
+ return client
49
+ }
50
+
40
51
func testSimpleGet( ) {
41
52
let bin = HTTPBin ( . http2( compress: false ) )
42
53
defer { XCTAssertNoThrow ( try bin. shutdown ( ) ) }
@@ -67,7 +78,7 @@ class HTTP2ClientTests: XCTestCase {
67
78
func testConcurrentRequestsFromDifferentThreads( ) {
68
79
let bin = HTTPBin ( . http2( compress: false ) )
69
80
defer { XCTAssertNoThrow ( try bin. shutdown ( ) ) }
70
- let client = self . makeDefaultHTTPClient ( )
81
+ let client = self . makeClientWithActiveHTTP2Connection ( to : bin )
71
82
defer { XCTAssertNoThrow ( try client. syncShutdown ( ) ) }
72
83
let numberOfWorkers = 20
73
84
let numberOfRequestsPerWorkers = 20
@@ -92,7 +103,7 @@ class HTTP2ClientTests: XCTestCase {
92
103
93
104
for _ in 0 ..< numberOfRequestsPerWorkers {
94
105
var response : HTTPClient . Response ?
95
- XCTAssertNoThrow ( response = try client. get ( url: url ) . wait ( ) )
106
+ XCTAssertNoThrow ( response = try client. get ( url: " https://localhost: \( bin . port ) /get " ) . wait ( ) )
96
107
97
108
XCTAssertEqual ( . ok, response? . status)
98
109
XCTAssertEqual ( response? . version, . http2)
@@ -187,4 +198,167 @@ class HTTP2ClientTests: XCTestCase {
187
198
// all workers should be running, let's wait for them to finish
188
199
allDone. wait ( )
189
200
}
201
+
202
+ func testUncleanShutdownCancelsExecutingAndQueuedTasks( ) {
203
+ let bin = HTTPBin ( . http2( compress: false ) )
204
+ defer { XCTAssertNoThrow ( try bin. shutdown ( ) ) }
205
+ let client = self . makeClientWithActiveHTTP2Connection ( to: bin)
206
+
207
+ // start 20 requests which are guaranteed to never get any response
208
+ // 10 of them will executed and the other 10 will be queued
209
+ // because HTTPBin has a default `maxConcurrentStreams` limit of 10
210
+ let responses = ( 0 ..< 20 ) . map { _ in
211
+ client. get ( url: " https://localhost: \( bin. port) /wait " )
212
+ }
213
+
214
+ XCTAssertNoThrow ( try client. syncShutdown ( ) )
215
+
216
+ var results : [ Result < HTTPClient . Response , Error > ] = [ ]
217
+ XCTAssertNoThrow ( results = try EventLoopFuture
218
+ . whenAllComplete ( responses, on: client. eventLoopGroup. next ( ) )
219
+ . timeout ( after: . seconds( 2 ) )
220
+ . wait ( ) )
221
+
222
+ for result in results {
223
+ switch result {
224
+ case . success:
225
+ XCTFail ( " Shouldn't succeed " )
226
+ case . failure( let error) :
227
+ if let clientError = error as? HTTPClientError , clientError == . cancelled {
228
+ continue
229
+ } else {
230
+ XCTFail ( " Unexpected error: \( error) " )
231
+ }
232
+ }
233
+ }
234
+ }
235
+
236
+ func testCancelingRunningRequest( ) {
237
+ let bin = HTTPBin ( . http2( compress: false ) )
238
+ defer { XCTAssertNoThrow ( try bin. shutdown ( ) ) }
239
+ let client = self . makeClientWithActiveHTTP2Connection ( to: bin)
240
+ defer { XCTAssertNoThrow ( try client. syncShutdown ( ) ) }
241
+
242
+ var maybeRequest : HTTPClient . Request ?
243
+ XCTAssertNoThrow ( maybeRequest = try HTTPClient . Request ( url: " https://localhost: \( bin. port) /sendheaderandwait " ) )
244
+ guard let request = maybeRequest else { return }
245
+
246
+ var task : HTTPClient . Task < TestHTTPDelegate . Response > !
247
+ let delegate = TestHTTPDelegate ( )
248
+ delegate. stateDidChangeCallback = { state in
249
+ guard case . head = state else { return }
250
+ // request is definitely running because we just received a head from the server
251
+ task. cancel ( )
252
+ }
253
+ task = client. execute (
254
+ request: request,
255
+ delegate: delegate
256
+ )
257
+
258
+ XCTAssertThrowsError ( try task. futureResult. timeout ( after: . seconds( 2 ) ) . wait ( ) , " Should fail " ) { error in
259
+ guard case let error = error as? HTTPClientError , error == . cancelled else {
260
+ return XCTFail ( " Should fail with cancelled " )
261
+ }
262
+ }
263
+ }
264
+
265
+ func testStressCancelingRunningRequestFromDifferentThreads( ) {
266
+ let bin = HTTPBin ( . http2( compress: false ) )
267
+ defer { XCTAssertNoThrow ( try bin. shutdown ( ) ) }
268
+ let client = self . makeClientWithActiveHTTP2Connection ( to: bin)
269
+ defer { XCTAssertNoThrow ( try client. syncShutdown ( ) ) }
270
+ let cancelPool = MultiThreadedEventLoopGroup ( numberOfThreads: 10 )
271
+ defer { XCTAssertNoThrow ( try cancelPool. syncShutdownGracefully ( ) ) }
272
+
273
+ var maybeRequest : HTTPClient . Request ?
274
+ XCTAssertNoThrow ( maybeRequest = try HTTPClient . Request ( url: " https://localhost: \( bin. port) /sendheaderandwait " ) )
275
+ guard let request = maybeRequest else { return }
276
+
277
+ let tasks = ( 0 ..< 100 ) . map { _ -> HTTPClient . Task < TestHTTPDelegate . Response > in
278
+ var task : HTTPClient . Task < TestHTTPDelegate . Response > !
279
+ let delegate = TestHTTPDelegate ( )
280
+ delegate. stateDidChangeCallback = { state in
281
+ guard case . head = state else { return }
282
+ // request is definitely running because we just received a head from the server
283
+ cancelPool. next ( ) . execute {
284
+ // canceling from a different thread
285
+ task. cancel ( )
286
+ }
287
+ }
288
+ task = client. execute (
289
+ request: request,
290
+ delegate: delegate
291
+ )
292
+ return task
293
+ }
294
+
295
+ for task in tasks {
296
+ XCTAssertThrowsError ( try task. futureResult. timeout ( after: . seconds( 2 ) ) . wait ( ) , " Should fail " ) { error in
297
+ guard case let error = error as? HTTPClientError , error == . cancelled else {
298
+ return XCTFail ( " Should fail with cancelled " )
299
+ }
300
+ }
301
+ }
302
+ }
303
+
304
+ func testPlatformConnectErrorIsForwardedOnTimeout( ) {
305
+ let bin = HTTPBin ( . http2( compress: false ) )
306
+
307
+ let clientGroup = MultiThreadedEventLoopGroup ( numberOfThreads: 2 )
308
+ let el1 = clientGroup. next ( )
309
+ let el2 = clientGroup. next ( )
310
+ defer { XCTAssertNoThrow ( try clientGroup. syncShutdownGracefully ( ) ) }
311
+ var tlsConfig = TLSConfiguration . makeClientConfiguration ( )
312
+ tlsConfig. certificateVerification = . none
313
+ let client = HTTPClient (
314
+ eventLoopGroupProvider: . shared( clientGroup) ,
315
+ configuration: HTTPClient . Configuration (
316
+ tlsConfiguration: tlsConfig,
317
+ timeout: . init( connect: . milliseconds( 1000 ) ) ,
318
+ httpVersion: . automatic
319
+ ) ,
320
+ backgroundActivityLogger: Logger ( label: " HTTPClient " , factory: StreamLogHandler . standardOutput ( label: ) )
321
+ )
322
+ defer { XCTAssertNoThrow ( try client. syncShutdown ( ) ) }
323
+
324
+ var maybeRequest1 : HTTPClient . Request ?
325
+ XCTAssertNoThrow ( maybeRequest1 = try HTTPClient . Request ( url: " https://localhost: \( bin. port) /get " ) )
326
+ guard let request1 = maybeRequest1 else { return }
327
+
328
+ let task1 = client. execute ( request: request1, delegate: ResponseAccumulator ( request: request1) , eventLoop: . delegateAndChannel( on: el1) )
329
+ var response1 : ResponseAccumulator . Response ?
330
+ XCTAssertNoThrow ( response1 = try task1. wait ( ) )
331
+
332
+ XCTAssertEqual ( . ok, response1? . status)
333
+ XCTAssertEqual ( response1? . version, . http2)
334
+ let serverPort = bin. port
335
+ XCTAssertNoThrow ( try bin. shutdown ( ) )
336
+ // client is now in HTTP/2 state and the HTTPBin is closed
337
+ // start a new server on the old port which closes all connections immediately
338
+ let serverGroup = MultiThreadedEventLoopGroup ( numberOfThreads: 1 )
339
+ defer { XCTAssertNoThrow ( try serverGroup. syncShutdownGracefully ( ) ) }
340
+ var maybeServer : Channel ?
341
+ XCTAssertNoThrow ( maybeServer = try ServerBootstrap ( group: serverGroup)
342
+ . serverChannelOption ( ChannelOptions . socketOption ( . so_reuseaddr) , value: 1 )
343
+ . childChannelInitializer { channel in
344
+ channel. close ( )
345
+ }
346
+ . childChannelOption ( ChannelOptions . socketOption ( . so_reuseaddr) , value: 1 )
347
+ . bind ( host: " 0.0.0.0 " , port: serverPort)
348
+ . wait ( ) )
349
+ guard let server = maybeServer else { return }
350
+ defer { XCTAssertNoThrow ( try server. close ( ) . wait ( ) ) }
351
+
352
+ var maybeRequest2 : HTTPClient . Request ?
353
+ XCTAssertNoThrow ( maybeRequest2 = try HTTPClient . Request ( url: " https://localhost: \( serverPort) / " ) )
354
+ guard let request2 = maybeRequest2 else { return }
355
+
356
+ let task2 = client. execute ( request: request2, delegate: ResponseAccumulator ( request: request2) , eventLoop: . delegateAndChannel( on: el2) )
357
+ XCTAssertThrowsError ( try task2. wait ( ) ) { error in
358
+ XCTAssertNil (
359
+ error as? HTTPClientError ,
360
+ " error should be some platform specific error that the connection is closed/reset by the other side "
361
+ )
362
+ }
363
+ }
190
364
}
0 commit comments