@@ -92,39 +92,45 @@ extension RequestBag.StateMachine {
92
92
}
93
93
}
94
94
95
- enum StartProducingAction {
96
- case startWriter( HTTPClient . Body . StreamWriter , body: HTTPClient . Body )
95
+ enum ResumeProducingAction {
96
+ case startWriter
97
+ case succeedBackpressurePromise( EventLoopPromise < Void > ? )
97
98
case none
98
99
}
99
100
100
- mutating func startRequestBodyStream(
101
- _ body: HTTPClient . Body ? ,
102
- writeMethod: @escaping ( IOData ) -> EventLoopFuture < Void >
103
- ) -> StartProducingAction {
101
+ mutating func resumeRequestBodyStream( ) -> ResumeProducingAction {
104
102
switch self . state {
103
+ case . initialized, . queued:
104
+ preconditionFailure ( " A request stream can only be resumed, if the request was started " )
105
+
105
106
case . executing( let executor, . initialized, . initialized) :
106
- guard let body = body else {
107
- preconditionFailure ( " Did not expect to get a call to `startRequestBodyStream`, if there is no body. " )
108
- }
107
+ self . state = . executing( executor, . producing, . initialized)
108
+ return . startWriter
109
109
110
- let streamWriter = HTTPClient . Body. StreamWriter { part -> EventLoopFuture < Void > in
111
- writeMethod ( part)
112
- }
110
+ case . executing( _, . producing, _) :
111
+ preconditionFailure ( " Expected that resume is only called when if we were paused before " )
113
112
114
- self . state = . executing( executor, . producing, . initialized)
113
+ case . executing( let executor, . paused( let promise) , let responseState) :
114
+ self . state = . executing( executor, . producing, responseState)
115
+ return . succeedBackpressurePromise( promise)
115
116
116
- return . startWriter( streamWriter, body: body)
117
+ case . executing( _, . finished, _) :
118
+ // the channels writability changed to writable after we have forwarded all the
119
+ // request bytes. Can be ignored.
120
+ return . none
117
121
118
- case . finished( error: . some) :
122
+ case . executing( _, . initialized, . buffering) , . executing( _, . initialized, . waitingForRemote) :
123
+ preconditionFailure ( " Invalid states: Response can not be received before request " )
124
+
125
+ case . redirected:
126
+ // if we are redirected, we should cancel our request body stream anyway
119
127
return . none
120
128
121
- case . initialized,
122
- . queued,
123
- . executing( _, _, _) ,
124
- . finished( error: . none) ,
125
- . redirected,
126
- . modifying:
127
- preconditionFailure ( " Expected the state to be either initialized or failed. Invalid state: \( self . state) " )
129
+ case . finished:
130
+ preconditionFailure ( " Invalid state " )
131
+
132
+ case . modifying:
133
+ preconditionFailure ( " Invalid state " )
128
134
}
129
135
}
130
136
@@ -156,37 +162,6 @@ extension RequestBag.StateMachine {
156
162
}
157
163
}
158
164
159
- mutating func resumeRequestBodyStream( ) -> EventLoopPromise < Void > ? {
160
- switch self . state {
161
- case . initialized, . queued:
162
- preconditionFailure ( " A request stream can only be resumed, if the request was started " )
163
- case . executing( let executor, let requestState, let responseState) :
164
- switch requestState {
165
- case . initialized:
166
- preconditionFailure ( " Request stream must be started before it can be paused " )
167
- case . producing:
168
- preconditionFailure ( " Expected that resume is only called when if we were paused before " )
169
- case . paused( let promise) :
170
- self . state = . executing( executor, . producing, responseState)
171
- return promise
172
- case . finished:
173
- // the channels writability changed to writable after we have forwarded all the
174
- // request bytes. Can be ignored.
175
- return nil
176
- }
177
-
178
- case . redirected:
179
- // if we are redirected, we should cancel our request body stream anyway
180
- return nil
181
-
182
- case . finished:
183
- preconditionFailure ( " Invalid state " )
184
-
185
- case . modifying:
186
- preconditionFailure ( " Invalid state " )
187
- }
188
- }
189
-
190
165
enum WriteAction {
191
166
case write( IOData , HTTPRequestExecutor , EventLoopFuture < Void > )
192
167
@@ -195,10 +170,6 @@ extension RequestBag.StateMachine {
195
170
}
196
171
197
172
mutating func writeNextRequestPart( _ part: IOData , taskEventLoop: EventLoop ) -> WriteAction {
198
- // this method is invoked with bodyPart and returns a future that signals that
199
- // more data can be send.
200
- // it may be invoked on any eventLoop
201
-
202
173
switch self . state {
203
174
case . initialized, . queued:
204
175
preconditionFailure ( " Invalid state: \( self . state) " )
@@ -210,6 +181,8 @@ extension RequestBag.StateMachine {
210
181
return . write( part, executor, taskEventLoop. makeSucceededFuture ( ( ) ) )
211
182
212
183
case . paused( . none) :
184
+ // backpressure is signaled to the writer using unfulfilled futures. if there
185
+ // is no existing, unfulfilled promise, let's create a new one
213
186
let promise = taskEventLoop. makePromise ( of: Void . self)
214
187
self . state = . executing( executor, . paused( promise) , responseState)
215
188
return . write( part, executor, promise. futureResult)
@@ -459,7 +432,6 @@ extension RequestBag.StateMachine {
459
432
return . consume( byteBuffer)
460
433
}
461
434
462
- // buffer is empty, wait for more
463
435
self . state = . finished( error: nil )
464
436
return . finishStream
465
437
0 commit comments