@@ -31,6 +31,10 @@ extension RequestBag {
31
31
fileprivate enum State {
32
32
case initialized
33
33
case queued( HTTPRequestScheduler )
34
+ /// if the deadline was exceeded while in the `.queued(_:)` state,
35
+ /// we wait until the request pool fails the request with a potential more descriptive error message,
36
+ /// if a connection failure has occured while the request was queued.
37
+ case deadlineExceededWhileQueued
34
38
case executing( HTTPRequestExecutor , RequestStreamState , ResponseStreamState )
35
39
case finished( error: Error ? )
36
40
case redirected( HTTPRequestExecutor , Int , HTTPResponseHead , URL )
@@ -90,13 +94,23 @@ extension RequestBag.StateMachine {
90
94
self . state = . queued( scheduler)
91
95
}
92
96
93
- mutating func willExecuteRequest( _ executor: HTTPRequestExecutor ) -> Bool {
97
+ enum WillExecuteRequestAction {
98
+ case cancelExecuter( HTTPRequestExecutor )
99
+ case failTaskAndCancelExecutor( Error , HTTPRequestExecutor )
100
+ case none
101
+ }
102
+
103
+ mutating func willExecuteRequest( _ executor: HTTPRequestExecutor ) -> WillExecuteRequestAction {
94
104
switch self . state {
95
105
case . initialized, . queued:
96
106
self . state = . executing( executor, . initialized, . initialized)
97
- return true
107
+ return . none
108
+ case . deadlineExceededWhileQueued:
109
+ let error : Error = HTTPClientError . deadlineExceeded
110
+ self . state = . finished( error: error)
111
+ return . failTaskAndCancelExecutor( error, executor)
98
112
case . finished( error: . some) :
99
- return false
113
+ return . cancelExecuter ( executor )
100
114
case . executing, . redirected, . finished( error: . none) , . modifying:
101
115
preconditionFailure ( " Invalid state: \( self . state) " )
102
116
}
@@ -110,7 +124,7 @@ extension RequestBag.StateMachine {
110
124
111
125
mutating func resumeRequestBodyStream( ) -> ResumeProducingAction {
112
126
switch self . state {
113
- case . initialized, . queued:
127
+ case . initialized, . queued, . deadlineExceededWhileQueued :
114
128
preconditionFailure ( " A request stream can only be resumed, if the request was started " )
115
129
116
130
case . executing( let executor, . initialized, . initialized) :
@@ -150,7 +164,7 @@ extension RequestBag.StateMachine {
150
164
151
165
mutating func pauseRequestBodyStream( ) {
152
166
switch self . state {
153
- case . initialized, . queued:
167
+ case . initialized, . queued, . deadlineExceededWhileQueued :
154
168
preconditionFailure ( " A request stream can only be paused, if the request was started " )
155
169
case . executing( let executor, let requestState, let responseState) :
156
170
switch requestState {
@@ -185,7 +199,7 @@ extension RequestBag.StateMachine {
185
199
186
200
mutating func writeNextRequestPart( _ part: IOData , taskEventLoop: EventLoop ) -> WriteAction {
187
201
switch self . state {
188
- case . initialized, . queued:
202
+ case . initialized, . queued, . deadlineExceededWhileQueued :
189
203
preconditionFailure ( " Invalid state: \( self . state) " )
190
204
case . executing( let executor, let requestState, let responseState) :
191
205
switch requestState {
@@ -231,7 +245,7 @@ extension RequestBag.StateMachine {
231
245
232
246
mutating func finishRequestBodyStream( _ result: Result < Void , Error > ) -> FinishAction {
233
247
switch self . state {
234
- case . initialized, . queued:
248
+ case . initialized, . queued, . deadlineExceededWhileQueued :
235
249
preconditionFailure ( " Invalid state: \( self . state) " )
236
250
case . executing( let executor, let requestState, let responseState) :
237
251
switch requestState {
@@ -282,7 +296,7 @@ extension RequestBag.StateMachine {
282
296
/// - Returns: Whether the response should be forwarded to the delegate. Will be `false` if the request follows a redirect.
283
297
mutating func receiveResponseHead( _ head: HTTPResponseHead ) -> ReceiveResponseHeadAction {
284
298
switch self . state {
285
- case . initialized, . queued:
299
+ case . initialized, . queued, . deadlineExceededWhileQueued :
286
300
preconditionFailure ( " How can we receive a response, if the request hasn't started yet. " )
287
301
case . executing( let executor, let requestState, let responseState) :
288
302
guard case . initialized = responseState else {
@@ -328,7 +342,7 @@ extension RequestBag.StateMachine {
328
342
329
343
mutating func receiveResponseBodyParts( _ buffer: CircularBuffer < ByteBuffer > ) -> ReceiveResponseBodyAction {
330
344
switch self . state {
331
- case . initialized, . queued:
345
+ case . initialized, . queued, . deadlineExceededWhileQueued :
332
346
preconditionFailure ( " How can we receive a response body part, if the request hasn't started yet. " )
333
347
case . executing( _, _, . initialized) :
334
348
preconditionFailure ( " If we receive a response body, we must have received a head before " )
@@ -385,7 +399,7 @@ extension RequestBag.StateMachine {
385
399
386
400
mutating func succeedRequest( _ newChunks: CircularBuffer < ByteBuffer > ? ) -> ReceiveResponseEndAction {
387
401
switch self . state {
388
- case . initialized, . queued:
402
+ case . initialized, . queued, . deadlineExceededWhileQueued :
389
403
preconditionFailure ( " How can we receive a response body part, if the request hasn't started yet. " )
390
404
case . executing( _, _, . initialized) :
391
405
preconditionFailure ( " If we receive a response body, we must have received a head before " )
@@ -447,7 +461,7 @@ extension RequestBag.StateMachine {
447
461
448
462
private mutating func failWithConsumptionError( _ error: Error ) -> ConsumeAction {
449
463
switch self . state {
450
- case . initialized, . queued:
464
+ case . initialized, . queued, . deadlineExceededWhileQueued :
451
465
preconditionFailure ( " Invalid state: \( self . state) " )
452
466
case . executing( _, _, . initialized) :
453
467
preconditionFailure ( " Invalid state: Must have received response head, before this method is called for the first time " )
@@ -482,7 +496,7 @@ extension RequestBag.StateMachine {
482
496
483
497
private mutating func consumeMoreBodyData( ) -> ConsumeAction {
484
498
switch self . state {
485
- case . initialized, . queued:
499
+ case . initialized, . queued, . deadlineExceededWhileQueued :
486
500
preconditionFailure ( " Invalid state: \( self . state) " )
487
501
488
502
case . executing( _, _, . initialized) :
@@ -532,8 +546,33 @@ extension RequestBag.StateMachine {
532
546
}
533
547
}
534
548
549
+ enum DeadlineExceededAction {
550
+ case cancelScheduler( HTTPRequestScheduler ? )
551
+ case fail( FailAction )
552
+ }
553
+
554
+ mutating func deadlineExceeded( ) -> DeadlineExceededAction {
555
+ switch self . state {
556
+ case . queued( let queuer) :
557
+ /// We do not fail the request immediately because we want to give the scheduler a chance of throwing a better error message
558
+ /// We therefore depend on the scheduler failing the request after we cancel the request.
559
+ self . state = . deadlineExceededWhileQueued
560
+ return . cancelScheduler( queuer)
561
+
562
+ case . initialized,
563
+ . deadlineExceededWhileQueued,
564
+ . executing,
565
+ . finished,
566
+ . redirected,
567
+ . modifying:
568
+ /// if we are not in the queued state, we can fail early by just calling down to `self.fail(_:)`
569
+ /// which does the appropriate state transition for us.
570
+ return . fail( self . fail ( HTTPClientError . deadlineExceeded) )
571
+ }
572
+ }
573
+
535
574
enum FailAction {
536
- case failTask( HTTPRequestScheduler ? , HTTPRequestExecutor ? )
575
+ case failTask( Error , HTTPRequestScheduler ? , HTTPRequestExecutor ? )
537
576
case cancelExecutor( HTTPRequestExecutor )
538
577
case none
539
578
}
@@ -542,31 +581,39 @@ extension RequestBag.StateMachine {
542
581
switch self . state {
543
582
case . initialized:
544
583
self . state = . finished( error: error)
545
- return . failTask( nil , nil )
584
+ return . failTask( error , nil , nil )
546
585
case . queued( let queuer) :
547
586
self . state = . finished( error: error)
548
- return . failTask( queuer, nil )
587
+ return . failTask( error , queuer, nil )
549
588
case . executing( let executor, let requestState, . buffering( _, next: . eof) ) :
550
589
self . state = . executing( executor, requestState, . buffering( . init( ) , next: . error( error) ) )
551
590
return . cancelExecutor( executor)
552
591
case . executing( let executor, _, . buffering( _, next: . askExecutorForMore) ) :
553
592
self . state = . finished( error: error)
554
- return . failTask( nil , executor)
593
+ return . failTask( error , nil , executor)
555
594
case . executing( let executor, _, . buffering( _, next: . error( _) ) ) :
556
595
// this would override another error, let's keep the first one
557
596
return . cancelExecutor( executor)
558
597
case . executing( let executor, _, . initialized) :
559
598
self . state = . finished( error: error)
560
- return . failTask( nil , executor)
599
+ return . failTask( error , nil , executor)
561
600
case . executing( let executor, _, . waitingForRemote) :
562
601
self . state = . finished( error: error)
563
- return . failTask( nil , executor)
602
+ return . failTask( error , nil , executor)
564
603
case . redirected:
565
604
self . state = . finished( error: error)
566
- return . failTask( nil , nil )
605
+ return . failTask( error , nil , nil )
567
606
case . finished( . none) :
568
607
// An error occurred after the request has finished. Ignore...
569
608
return . none
609
+ case . deadlineExceededWhileQueued:
610
+ // if we just get a `HTTPClientError.cancelled` we can use the original cancellation reason
611
+ // to give a more descriptive error to the user.
612
+ if ( error as? HTTPClientError ) == . cancelled {
613
+ return . failTask( HTTPClientError . deadlineExceeded, nil , nil )
614
+ }
615
+ // otherwise we already had an intermediate connection error which we should present to the user instead
616
+ return . failTask( error, nil , nil )
570
617
case . finished( . some( _) ) :
571
618
// this might happen, if the stream consumer has failed... let's just drop the data
572
619
return . none
0 commit comments