@@ -66,7 +66,7 @@ final class RequestBagTests: XCTestCase {
66
66
)
67
67
XCTAssert ( bag. task. eventLoop === embeddedEventLoop)
68
68
69
- let executor = MockRequestExecutor ( )
69
+ let executor = MockRequestExecutor ( pauseRequestBodyPartStreamAfterASingleWrite : true )
70
70
71
71
bag. willExecuteRequest ( executor)
72
72
@@ -294,6 +294,71 @@ final class RequestBagTests: XCTestCase {
294
294
XCTAssertEqual ( $0 as? HTTPClientError , . cancelled)
295
295
}
296
296
}
297
+
298
+ func testHTTPUploadIsCancelledEvenThoughRequestSucceeds( ) {
299
+ let embeddedEventLoop = EmbeddedEventLoop ( )
300
+ defer { XCTAssertNoThrow ( try embeddedEventLoop. syncShutdownGracefully ( ) ) }
301
+ let logger = Logger ( label: " test " )
302
+
303
+ var maybeRequest : HTTPClient . Request ?
304
+ let writeSecondPartPromise = embeddedEventLoop. makePromise ( of: Void . self)
305
+
306
+ XCTAssertNoThrow ( maybeRequest = try HTTPClient . Request (
307
+ url: " https://swift.org " ,
308
+ method: . POST,
309
+ headers: [ " content-length " : " 12 " ] ,
310
+ body: . stream( length: 12 ) { writer -> EventLoopFuture < Void > in
311
+ var firstWriteSuccess = false
312
+ return writer. write ( . byteBuffer( . init( bytes: 0 ... 3 ) ) ) . flatMap { _ in
313
+ firstWriteSuccess = true
314
+
315
+ return writeSecondPartPromise. futureResult
316
+ } . flatMap {
317
+ return writer. write ( . byteBuffer( . init( bytes: 4 ... 7 ) ) )
318
+ } . always { result in
319
+ XCTAssertTrue ( firstWriteSuccess)
320
+
321
+ guard case . failure( let error) = result else {
322
+ return XCTFail ( " Expected the second write to fail " )
323
+ }
324
+ XCTAssertEqual ( error as? HTTPClientError , . requestStreamCancelled)
325
+ }
326
+ }
327
+ ) )
328
+ guard let request = maybeRequest else { return XCTFail ( " Expected to have a request " ) }
329
+
330
+ let delegate = UploadCountingDelegate ( eventLoop: embeddedEventLoop)
331
+ let bag = RequestBag (
332
+ request: request,
333
+ eventLoopPreference: . delegate( on: embeddedEventLoop) ,
334
+ task: . init( eventLoop: embeddedEventLoop, logger: logger) ,
335
+ redirectHandler: nil ,
336
+ connectionDeadline: . now( ) + . seconds( 30 ) ,
337
+ idleReadTimeout: nil ,
338
+ delegate: delegate
339
+ )
340
+
341
+ let executor = MockRequestExecutor ( )
342
+ bag. willExecuteRequest ( executor)
343
+
344
+ XCTAssertEqual ( delegate. hitDidSendRequestHead, 0 )
345
+ XCTAssertEqual ( delegate. hitDidSendRequest, 0 )
346
+ bag. requestHeadSent ( )
347
+ XCTAssertEqual ( delegate. hitDidSendRequestHead, 1 )
348
+ XCTAssertEqual ( delegate. hitDidSendRequest, 0 )
349
+
350
+ bag. resumeRequestBodyStream ( )
351
+ XCTAssertEqual ( executor. nextBodyPart ( ) , . body( . byteBuffer( . init( bytes: 0 ... 3 ) ) ) )
352
+ // receive a 301 response immediately.
353
+ bag. receiveResponseHead ( . init( version: . http1_1, status: . movedPermanently) )
354
+ bag. succeedRequest ( . init( ) )
355
+
356
+ // if we now write our second part of the response this should fail the backpressure promise
357
+ writeSecondPartPromise. succeed ( ( ) )
358
+
359
+ XCTAssertEqual ( delegate. receivedHead? . status, . movedPermanently)
360
+ XCTAssertNoThrow ( try bag. task. futureResult. wait ( ) )
361
+ }
297
362
}
298
363
299
364
class MockRequestExecutor : HTTPRequestExecutor {
@@ -302,11 +367,15 @@ class MockRequestExecutor: HTTPRequestExecutor {
302
367
case endOfStream
303
368
}
304
369
370
+ let pauseRequestBodyPartStreamAfterASingleWrite : Bool
371
+
305
372
private( set) var requestBodyParts = CircularBuffer < RequestParts > ( )
306
373
private( set) var isCancelled : Bool = false
307
374
private( set) var signalledDemandForResponseBody : Bool = false
308
375
309
- init ( ) { }
376
+ init ( pauseRequestBodyPartStreamAfterASingleWrite: Bool = false ) {
377
+ self . pauseRequestBodyPartStreamAfterASingleWrite = pauseRequestBodyPartStreamAfterASingleWrite
378
+ }
310
379
311
380
func nextBodyPart( ) -> RequestParts ? {
312
381
guard !self . requestBodyParts. isEmpty else { return nil }
@@ -321,7 +390,7 @@ class MockRequestExecutor: HTTPRequestExecutor {
321
390
// data is already scheduled. If we call pause here, once, after the second call new subsequent
322
391
// calls should not be scheduled.
323
392
func writeRequestBodyPart( _ part: IOData , request: HTTPExecutingRequest ) {
324
- if self . requestBodyParts. isEmpty {
393
+ if self . requestBodyParts. isEmpty, self . pauseRequestBodyPartStreamAfterASingleWrite {
325
394
request. pauseRequestBodyStream ( )
326
395
}
327
396
self . requestBodyParts. append ( . body( part) )
0 commit comments