@@ -93,7 +93,9 @@ struct HTTPRequestStateMachine {
93
93
94
94
case sendRequestHead( HTTPRequestHead , startBody: Bool )
95
95
case sendBodyPart( IOData )
96
- case sendRequestEnd
96
+ /// If the server has replied, with a status of 200...300 before all data was sent, a request is considered succeeded,
97
+ /// as soon as we wrote the request end onto the wire. In this case the succeedRequest property is set.
98
+ case sendRequestEnd( succeedRequest: FinalStreamAction ? )
97
99
98
100
case pauseRequestBodyStream
99
101
case resumeRequestBodyStream
@@ -111,11 +113,9 @@ struct HTTPRequestStateMachine {
111
113
private var state : State = . initialized
112
114
113
115
private var isChannelWritable : Bool
114
- private let idleReadTimeout : TimeAmount ?
115
116
116
- init ( isChannelWritable: Bool , idleReadTimeout : TimeAmount ? ) {
117
+ init ( isChannelWritable: Bool ) {
117
118
self . isChannelWritable = isChannelWritable
118
- self . idleReadTimeout = idleReadTimeout
119
119
}
120
120
121
121
mutating func startRequest( head: HTTPRequestHead , metadata: RequestFramingMetadata ) -> Action {
@@ -301,8 +301,7 @@ struct HTTPRequestStateMachine {
301
301
switch self . state {
302
302
case . initialized,
303
303
. waitForChannelToBecomeWritable,
304
- . running( . endSent, _) ,
305
- . finished:
304
+ . running( . endSent, _) :
306
305
preconditionFailure ( " A request body stream end is only expected if we are in state request streaming. Invalid state: \( self . state) " )
307
306
308
307
case . running( . streaming( let expectedBodyLength, let sentBodyBytes, _) , . waitingForHead) :
@@ -313,7 +312,7 @@ struct HTTPRequestStateMachine {
313
312
}
314
313
315
314
self . state = . running( . endSent, . waitingForHead)
316
- return . sendRequestEnd
315
+ return . sendRequestEnd( succeedRequest : nil )
317
316
318
317
case . running( . streaming( let expectedBodyLength, let sentBodyBytes, _) , . receivingBody( let head, let streamState) ) :
319
318
assert ( head. status. code < 300 )
@@ -325,7 +324,7 @@ struct HTTPRequestStateMachine {
325
324
}
326
325
327
326
self . state = . running( . endSent, . receivingBody( head, streamState) )
328
- return . sendRequestEnd
327
+ return . sendRequestEnd( succeedRequest : nil )
329
328
330
329
case . running( . streaming( let expectedBodyLength, let sentBodyBytes, _) , . endReceived) :
331
330
if let expected = expectedBodyLength, expected != sentBodyBytes {
@@ -335,10 +334,22 @@ struct HTTPRequestStateMachine {
335
334
}
336
335
337
336
self . state = . finished
338
- return . succeedRequest( . none)
337
+ return . sendRequestEnd ( succeedRequest: . some ( . none) )
339
338
340
339
case . failed:
341
340
return . wait
341
+
342
+ case . finished:
343
+ // A request may be finished, before we have send all parts. This might be the case if
344
+ // the server responded with an HTTP status code that is equal or larger to 300
345
+ // (Redirection, Client Error or Server Error). In those cases we pause the request body
346
+ // stream as soon as we have received the response head and we succeed the request as
347
+ // when response end is received. This may mean, that we succeed a request, even though
348
+ // we have not sent all it's body parts.
349
+
350
+ // We may still receive something, here because of potential race conditions with the
351
+ // producing thread.
352
+ return . wait
342
353
}
343
354
}
344
355
@@ -376,23 +387,23 @@ struct HTTPRequestStateMachine {
376
387
// MARK: - Response
377
388
378
389
mutating func receivedHTTPResponseHead( _ head: HTTPResponseHead ) -> Action {
390
+ guard head. status. code >= 200 else {
391
+ // we ignore any leading 1xx headers... No state change needed.
392
+ return . wait
393
+ }
394
+
379
395
switch self . state {
380
396
case . initialized, . waitForChannelToBecomeWritable:
381
397
preconditionFailure ( " How can we receive a response head before sending a request head ourselves " )
382
398
383
399
case . running( . streaming( let expectedBodyLength, let sentBodyBytes, producer: . paused) , . waitingForHead) :
384
400
self . state = . running(
385
- . streaming( expectedBodyLength: expectedBodyLength, sentBodyBytes: sentBodyBytes, producer: . producing ) ,
401
+ . streaming( expectedBodyLength: expectedBodyLength, sentBodyBytes: sentBodyBytes, producer: . paused ) ,
386
402
. receivingBody( head, . downstreamIsConsuming( readPending: false ) )
387
403
)
388
404
return . forwardResponseHead( head, pauseRequestBodyStream: false )
389
405
390
406
case . running( . streaming( let expectedBodyLength, let sentBodyBytes, producer: . producing) , . waitingForHead) :
391
- guard head. status. code >= 200 else {
392
- // we ignore any leading 1xx headers... No state change needed.
393
- return . wait
394
- }
395
-
396
407
if head. status. code >= 300 {
397
408
self . state = . running(
398
409
. streaming( expectedBodyLength: expectedBodyLength, sentBodyBytes: sentBodyBytes, producer: . paused) ,
@@ -450,12 +461,25 @@ struct HTTPRequestStateMachine {
450
461
case . running( _, . waitingForHead) :
451
462
preconditionFailure ( " How can we receive a response end, if we haven't a received a head. Invalid state: \( self . state) " )
452
463
453
- case . running( . streaming( let expectedBodyLength, let sentBodyBytes, let producerState) , . receivingBody( let head, _ ) ) where head. status. code < 300 :
464
+ case . running( . streaming( let expectedBodyLength, let sentBodyBytes, let producerState) , . receivingBody( let head, let consumerState ) ) where head. status. code < 300 :
454
465
self . state = . running(
455
466
. streaming( expectedBodyLength: expectedBodyLength, sentBodyBytes: sentBodyBytes, producer: producerState) ,
456
467
. endReceived
457
468
)
458
- return . wait
469
+
470
+ switch consumerState {
471
+ case . downstreamHasDemand, . downstreamIsConsuming( readPending: false ) :
472
+ return . wait
473
+ case . downstreamIsConsuming( readPending: true ) :
474
+ // If we have a received a read event before, we must ensure that the read event
475
+ // eventually gets onto the channel pipeline again. The end of the request gives
476
+ // us an opportunity for this clean up task.
477
+ // It is very unlikely that we can see this in the real world. If we have swallowed
478
+ // a read event we don't expect to receive further data from the channel incl.
479
+ // response ends.
480
+
481
+ return . read
482
+ }
459
483
460
484
case . running( . streaming( _, _, let producerState) , . receivingBody( let head, _) ) :
461
485
assert ( head. status. code >= 300 )
0 commit comments