@@ -18,7 +18,8 @@ import NIOCore
18
18
import NIOHTTP1
19
19
import NIOSSL
20
20
21
- final class RequestBag < Delegate: HTTPClientResponseDelegate > {
21
+ @preconcurrency
22
+ final class RequestBag < Delegate: HTTPClientResponseDelegate & Sendable > : Sendable {
22
23
/// Defends against the call stack getting too large when consuming body parts.
23
24
///
24
25
/// If the response body comes in lots of tiny chunks, we'll deliver those tiny chunks to users
@@ -35,16 +36,23 @@ final class RequestBag<Delegate: HTTPClientResponseDelegate> {
35
36
}
36
37
37
38
private let delegate : Delegate
38
- private var request : HTTPClient . Request
39
39
40
- // the request state is synchronized on the task eventLoop
41
- private var state : StateMachine
42
-
43
- // the consume body part stack depth is synchronized on the task event loop.
44
- private var consumeBodyPartStackDepth : Int
40
+ struct LoopBoundState : @unchecked Sendable {
41
+ // The 'StateMachine' *isn't* Sendable (it holds various objects which aren't). This type
42
+ // needs to be sendable so that we can construct a loop bound box off of the event loop
43
+ // to hold this state and then subsequently only access it from the event loop. This needs
44
+ // to happen so that the request bag can be constructed off of the event loop. If it's
45
+ // constructed on the event loop then there's a timing window between users issuing
46
+ // a request and calling shutdown where the underlying pool doesn't know about the request
47
+ // so the shutdown call may cancel it.
48
+ var request : HTTPClient . Request
49
+ var state : StateMachine
50
+ var consumeBodyPartStackDepth : Int
51
+ // if a redirect occurs, we store the task for it so we can propagate cancellation
52
+ var redirectTask : HTTPClient . Task < Delegate . Response > ? = nil
53
+ }
45
54
46
- // if a redirect occurs, we store the task for it so we can propagate cancellation
47
- private var redirectTask : HTTPClient . Task < Delegate . Response > ? = nil
55
+ private let loopBoundState : NIOLoopBoundBox < LoopBoundState >
48
56
49
57
// MARK: HTTPClientTask properties
50
58
@@ -61,6 +69,8 @@ final class RequestBag<Delegate: HTTPClientResponseDelegate> {
61
69
62
70
let eventLoopPreference : HTTPClient . EventLoopPreference
63
71
72
+ let tlsConfiguration : TLSConfiguration ?
73
+
64
74
init (
65
75
request: HTTPClient . Request ,
66
76
eventLoopPreference: HTTPClient . EventLoopPreference ,
@@ -73,9 +83,13 @@ final class RequestBag<Delegate: HTTPClientResponseDelegate> {
73
83
self . poolKey = . init( request, dnsOverride: requestOptions. dnsOverride)
74
84
self . eventLoopPreference = eventLoopPreference
75
85
self . task = task
76
- self . state = . init( redirectHandler: redirectHandler)
77
- self . consumeBodyPartStackDepth = 0
78
- self . request = request
86
+
87
+ let loopBoundState = LoopBoundState (
88
+ request: request,
89
+ state: StateMachine ( redirectHandler: redirectHandler) ,
90
+ consumeBodyPartStackDepth: 0
91
+ )
92
+ self . loopBoundState = NIOLoopBoundBox . makeBoxSendingValue ( loopBoundState, eventLoop: task. eventLoop)
79
93
self . connectionDeadline = connectionDeadline
80
94
self . requestOptions = requestOptions
81
95
self . delegate = delegate
@@ -84,6 +98,8 @@ final class RequestBag<Delegate: HTTPClientResponseDelegate> {
84
98
self . requestHead = head
85
99
self . requestFramingMetadata = metadata
86
100
101
+ self . tlsConfiguration = request. tlsConfiguration
102
+
87
103
self . task. taskDelegate = self
88
104
self . task. futureResult. whenComplete { _ in
89
105
self . task. taskDelegate = nil
@@ -92,16 +108,13 @@ final class RequestBag<Delegate: HTTPClientResponseDelegate> {
92
108
93
109
private func requestWasQueued0( _ scheduler: HTTPRequestScheduler ) {
94
110
self . logger. debug ( " Request was queued (waiting for a connection to become available) " )
95
-
96
- self . task. eventLoop. assertInEventLoop ( )
97
- self . state. requestWasQueued ( scheduler)
111
+ self . loopBoundState. value. state. requestWasQueued ( scheduler)
98
112
}
99
113
100
114
// MARK: - Request -
101
115
102
116
private func willExecuteRequest0( _ executor: HTTPRequestExecutor ) {
103
- self . task. eventLoop. assertInEventLoop ( )
104
- let action = self . state. willExecuteRequest ( executor)
117
+ let action = self . loopBoundState. value. state. willExecuteRequest ( executor)
105
118
switch action {
106
119
case . cancelExecuter( let executor) :
107
120
executor. cancelRequest ( self )
@@ -115,26 +128,22 @@ final class RequestBag<Delegate: HTTPClientResponseDelegate> {
115
128
}
116
129
117
130
private func requestHeadSent0( ) {
118
- self . task. eventLoop. assertInEventLoop ( )
119
-
120
131
self . delegate. didSendRequestHead ( task: self . task, self . requestHead)
121
132
122
- if self . request. body == nil {
133
+ if self . loopBoundState . value . request. body == nil {
123
134
self . delegate. didSendRequest ( task: self . task)
124
135
}
125
136
}
126
137
127
138
private func resumeRequestBodyStream0( ) {
128
- self . task. eventLoop. assertInEventLoop ( )
129
-
130
- let produceAction = self . state. resumeRequestBodyStream ( )
139
+ let produceAction = self . loopBoundState. value. state. resumeRequestBodyStream ( )
131
140
132
141
switch produceAction {
133
142
case . startWriter:
134
- guard let body = self . request. body else {
143
+ guard let body = self . loopBoundState . value . request. body else {
135
144
preconditionFailure ( " Expected to have a body, if the `HTTPRequestStateMachine` resume a request stream " )
136
145
}
137
- self . request. body = nil
146
+ self . loopBoundState . value . request. body = nil
138
147
139
148
let writer = HTTPClient . Body. StreamWriter {
140
149
self . writeNextRequestPart ( $0)
@@ -153,9 +162,7 @@ final class RequestBag<Delegate: HTTPClientResponseDelegate> {
153
162
}
154
163
155
164
private func pauseRequestBodyStream0( ) {
156
- self . task. eventLoop. assertInEventLoop ( )
157
-
158
- self . state. pauseRequestBodyStream ( )
165
+ self . loopBoundState. value. state. pauseRequestBodyStream ( )
159
166
}
160
167
161
168
private func writeNextRequestPart( _ part: IOData ) -> EventLoopFuture < Void > {
@@ -169,9 +176,7 @@ final class RequestBag<Delegate: HTTPClientResponseDelegate> {
169
176
}
170
177
171
178
private func writeNextRequestPart0( _ part: IOData ) -> EventLoopFuture < Void > {
172
- self . eventLoop. assertInEventLoop ( )
173
-
174
- let action = self . state. writeNextRequestPart ( part, taskEventLoop: self . task. eventLoop)
179
+ let action = self . loopBoundState. value. state. writeNextRequestPart ( part, taskEventLoop: self . task. eventLoop)
175
180
176
181
switch action {
177
182
case . failTask( let error) :
@@ -193,9 +198,7 @@ final class RequestBag<Delegate: HTTPClientResponseDelegate> {
193
198
}
194
199
195
200
private func finishRequestBodyStream( _ result: Result < Void , Error > ) {
196
- self . task. eventLoop. assertInEventLoop ( )
197
-
198
- let action = self . state. finishRequestBodyStream ( result)
201
+ let action = self . loopBoundState. value. state. finishRequestBodyStream ( result)
199
202
200
203
switch action {
201
204
case . none:
@@ -226,20 +229,22 @@ final class RequestBag<Delegate: HTTPClientResponseDelegate> {
226
229
// MARK: - Response -
227
230
228
231
private func receiveResponseHead0( _ head: HTTPResponseHead ) {
229
- self . task. eventLoop. assertInEventLoop ( )
230
-
231
- self . delegate. didVisitURL ( task: self . task, self . request, head)
232
+ self . delegate. didVisitURL ( task: self . task, self . loopBoundState. value. request, head)
232
233
233
234
// runs most likely on channel eventLoop
234
- switch self . state. receiveResponseHead ( head) {
235
+ switch self . loopBoundState . value . state. receiveResponseHead ( head) {
235
236
case . none:
236
237
break
237
238
238
239
case . signalBodyDemand( let executor) :
239
240
executor. demandResponseBodyStream ( self )
240
241
241
242
case . redirect( let executor, let handler, let head, let newURL) :
242
- self . redirectTask = handler. redirect ( status: head. status, to: newURL, promise: self . task. promise)
243
+ self . loopBoundState. value. redirectTask = handler. redirect (
244
+ status: head. status,
245
+ to: newURL,
246
+ promise: self . task. promise
247
+ )
243
248
executor. cancelRequest ( self )
244
249
245
250
case . forwardResponseHead( let head) :
@@ -253,17 +258,19 @@ final class RequestBag<Delegate: HTTPClientResponseDelegate> {
253
258
}
254
259
255
260
private func receiveResponseBodyParts0( _ buffer: CircularBuffer < ByteBuffer > ) {
256
- self . task. eventLoop. assertInEventLoop ( )
257
-
258
- switch self . state. receiveResponseBodyParts ( buffer) {
261
+ switch self . loopBoundState. value. state. receiveResponseBodyParts ( buffer) {
259
262
case . none:
260
263
break
261
264
262
265
case . signalBodyDemand( let executor) :
263
266
executor. demandResponseBodyStream ( self )
264
267
265
268
case . redirect( let executor, let handler, let head, let newURL) :
266
- self . redirectTask = handler. redirect ( status: head. status, to: newURL, promise: self . task. promise)
269
+ self . loopBoundState. value. redirectTask = handler. redirect (
270
+ status: head. status,
271
+ to: newURL,
272
+ promise: self . task. promise
273
+ )
267
274
executor. cancelRequest ( self )
268
275
269
276
case . forwardResponsePart( let part) :
@@ -277,8 +284,7 @@ final class RequestBag<Delegate: HTTPClientResponseDelegate> {
277
284
}
278
285
279
286
private func succeedRequest0( _ buffer: CircularBuffer < ByteBuffer > ? ) {
280
- self . task. eventLoop. assertInEventLoop ( )
281
- let action = self . state. succeedRequest ( buffer)
287
+ let action = self . loopBoundState. value. state. succeedRequest ( buffer)
282
288
283
289
switch action {
284
290
case . none:
@@ -299,13 +305,15 @@ final class RequestBag<Delegate: HTTPClientResponseDelegate> {
299
305
}
300
306
301
307
case . redirect( let handler, let head, let newURL) :
302
- self . redirectTask = handler. redirect ( status: head. status, to: newURL, promise: self . task. promise)
308
+ self . loopBoundState. value. redirectTask = handler. redirect (
309
+ status: head. status,
310
+ to: newURL,
311
+ promise: self . task. promise
312
+ )
303
313
}
304
314
}
305
315
306
316
private func consumeMoreBodyData0( resultOfPreviousConsume result: Result < Void , Error > ) {
307
- self . task. eventLoop. assertInEventLoop ( )
308
-
309
317
// We get defensive here about the maximum stack depth. It's possible for the `didReceiveBodyPart`
310
318
// future to be returned to us completed. If it is, we will recurse back into this method. To
311
319
// break that recursion we have a max stack depth which we increment and decrement in this method:
@@ -316,24 +324,27 @@ final class RequestBag<Delegate: HTTPClientResponseDelegate> {
316
324
// that risk ending up in this loop. That's because we don't need an accurate count: our limit is
317
325
// a best-effort target anyway, one stack frame here or there does not put us at risk. We're just
318
326
// trying to prevent ourselves looping out of control.
319
- self . consumeBodyPartStackDepth += 1
327
+ self . loopBoundState . value . consumeBodyPartStackDepth += 1
320
328
defer {
321
- self . consumeBodyPartStackDepth -= 1
322
- assert ( self . consumeBodyPartStackDepth >= 0 )
329
+ self . loopBoundState . value . consumeBodyPartStackDepth -= 1
330
+ assert ( self . loopBoundState . value . consumeBodyPartStackDepth >= 0 )
323
331
}
324
332
325
- let consumptionAction = self . state. consumeMoreBodyData ( resultOfPreviousConsume: result)
333
+ let consumptionAction = self . loopBoundState. value. state. consumeMoreBodyData (
334
+ resultOfPreviousConsume: result
335
+ )
326
336
327
337
switch consumptionAction {
328
338
case . consume( let byteBuffer) :
329
339
self . delegate. didReceiveBodyPart ( task: self . task, byteBuffer)
330
340
. hop ( to: self . task. eventLoop)
341
+ . assumeIsolated ( )
331
342
. whenComplete { result in
332
- if self . consumeBodyPartStackDepth < Self . maxConsumeBodyPartStackDepth {
343
+ if self . loopBoundState . value . consumeBodyPartStackDepth < Self . maxConsumeBodyPartStackDepth {
333
344
self . consumeMoreBodyData0 ( resultOfPreviousConsume: result)
334
345
} else {
335
346
// We need to unwind the stack, let's take a break.
336
- self . task. eventLoop. execute {
347
+ self . task. eventLoop. assumeIsolated ( ) . execute {
337
348
self . consumeMoreBodyData0 ( resultOfPreviousConsume: result)
338
349
}
339
350
}
@@ -344,7 +355,7 @@ final class RequestBag<Delegate: HTTPClientResponseDelegate> {
344
355
case . finishStream:
345
356
do {
346
357
let response = try self . delegate. didFinishRequest ( task: self . task)
347
- self . task. promise. succeed ( response)
358
+ self . task. promise. assumeIsolated ( ) . succeed ( response)
348
359
} catch {
349
360
self . task. promise. fail ( error)
350
361
}
@@ -358,13 +369,11 @@ final class RequestBag<Delegate: HTTPClientResponseDelegate> {
358
369
}
359
370
360
371
private func fail0( _ error: Error ) {
361
- self . task. eventLoop. assertInEventLoop ( )
362
-
363
- let action = self . state. fail ( error)
372
+ let action = self . loopBoundState. value. state. fail ( error)
364
373
365
374
self . executeFailAction0 ( action)
366
375
367
- self . redirectTask? . fail ( reason: error)
376
+ self . loopBoundState . value . redirectTask? . fail ( reason: error)
368
377
}
369
378
370
379
private func executeFailAction0( _ action: RequestBag < Delegate > . StateMachine . FailAction ) {
@@ -381,8 +390,7 @@ final class RequestBag<Delegate: HTTPClientResponseDelegate> {
381
390
}
382
391
383
392
func deadlineExceeded0( ) {
384
- self . task. eventLoop. assertInEventLoop ( )
385
- let action = self . state. deadlineExceeded ( )
393
+ let action = self . loopBoundState. value. state. deadlineExceeded ( )
386
394
387
395
switch action {
388
396
case . cancelScheduler( let scheduler) :
@@ -404,9 +412,6 @@ final class RequestBag<Delegate: HTTPClientResponseDelegate> {
404
412
}
405
413
406
414
extension RequestBag : HTTPSchedulableRequest , HTTPClientTaskDelegate {
407
- var tlsConfiguration : TLSConfiguration ? {
408
- self . request. tlsConfiguration
409
- }
410
415
411
416
func requestWasQueued( _ scheduler: HTTPRequestScheduler ) {
412
417
if self . task. eventLoop. inEventLoop {
0 commit comments