@@ -21,6 +21,12 @@ extension HTTPConnectionPool {
21
21
typealias RequestAction = HTTPConnectionPool . StateMachine . RequestAction
22
22
typealias ConnectionAction = HTTPConnectionPool . StateMachine . ConnectionAction
23
23
24
+ private enum State : Equatable {
25
+ case running
26
+ case shuttingDown( unclean: Bool )
27
+ case shutDown
28
+ }
29
+
24
30
private var lastConnectFailure : Error ?
25
31
private var failedConsecutiveConnectionAttempts = 0
26
32
@@ -31,6 +37,8 @@ extension HTTPConnectionPool {
31
37
32
38
private let idGenerator : Connection . ID . Generator
33
39
40
+ private var state : State = . running
41
+
34
42
init (
35
43
idGenerator: Connection . ID . Generator
36
44
) {
@@ -68,10 +76,24 @@ extension HTTPConnectionPool {
68
76
}
69
77
70
78
mutating func executeRequest( _ request: Request ) -> Action {
71
- if let eventLoop = request. requiredEventLoop {
72
- return self . executeRequest ( request, onRequired: eventLoop)
73
- } else {
74
- return self . executeRequest ( request, onPreferred: request. preferredEventLoop)
79
+ switch self . state {
80
+ case . running:
81
+ if let eventLoop = request. requiredEventLoop {
82
+ return self . executeRequest ( request, onRequired: eventLoop)
83
+ } else {
84
+ return self . executeRequest ( request, onPreferred: request. preferredEventLoop)
85
+ }
86
+ case . shutDown, . shuttingDown:
87
+ // it is fairly unlikely that this condition is met, since the ConnectionPoolManager
88
+ // also fails new requests immediately, if it is shutting down. However there might
89
+ // be race conditions in which a request passes through a running connection pool
90
+ // manager, but hits a connection pool that is already shutting down.
91
+ //
92
+ // (Order in one lock does not guarantee order in the next lock!)
93
+ return . init(
94
+ request: . failRequest( request, HTTPClientError . alreadyShutdown, cancelTimeout: false ) ,
95
+ connection: . none
96
+ )
75
97
}
76
98
}
77
99
@@ -149,36 +171,57 @@ extension HTTPConnectionPool {
149
171
at index: Int ,
150
172
context: HTTP2Connections . AvailableConnectionContext
151
173
) -> Action {
152
- // We prioritise requests with a required event loop over those without a requirement.
153
- // This can cause starvation for request without a required event loop.
154
- // We should come up with a better algorithm in the future.
155
-
156
- var requestsToExecute = self . requests. popFirst ( max: context. availableStreams, for: context. eventLoop)
157
- let remainingAvailableStreams = context. availableStreams - requestsToExecute. count
158
- // use the remaining available streams for requests without a required event loop
159
- requestsToExecute += self . requests. popFirst ( max: remainingAvailableStreams, for: nil )
160
- let connection = self . connections. leaseStreams ( at: index, count: requestsToExecute. count)
161
-
162
- let requestAction = { ( ) -> RequestAction in
163
- if requestsToExecute. isEmpty {
164
- return . none
165
- } else {
166
- return . executeRequestsAndCancelTimeouts( requestsToExecute, connection)
167
- }
168
- } ( )
174
+ switch self . state {
175
+ case . running:
176
+ // We prioritise requests with a required event loop over those without a requirement.
177
+ // This can cause starvation for request without a required event loop.
178
+ // We should come up with a better algorithm in the future.
179
+
180
+ var requestsToExecute = self . requests. popFirst ( max: context. availableStreams, for: context. eventLoop)
181
+ let remainingAvailableStreams = context. availableStreams - requestsToExecute. count
182
+ // use the remaining available streams for requests without a required event loop
183
+ requestsToExecute += self . requests. popFirst ( max: remainingAvailableStreams, for: nil )
184
+ let connection = self . connections. leaseStreams ( at: index, count: requestsToExecute. count)
185
+
186
+ let requestAction = { ( ) -> RequestAction in
187
+ if requestsToExecute. isEmpty {
188
+ return . none
189
+ } else {
190
+ return . executeRequestsAndCancelTimeouts( requestsToExecute, connection)
191
+ }
192
+ } ( )
193
+
194
+ let connectionAction = { ( ) -> ConnectionAction in
195
+ if context. isIdle, requestsToExecute. isEmpty {
196
+ return . scheduleTimeoutTimer( connection. id, on: context. eventLoop)
197
+ } else {
198
+ return . none
199
+ }
200
+ } ( )
169
201
170
- let connectionAction = { ( ) -> ConnectionAction in
171
- if context. isIdle, requestsToExecute. isEmpty {
172
- return . scheduleTimeoutTimer( connection. id, on: context. eventLoop)
173
- } else {
202
+ return . init(
203
+ request: requestAction,
204
+ connection: connectionAction
205
+ )
206
+ case . shuttingDown( let unclean) :
207
+ guard context. isIdle else {
174
208
return . none
175
209
}
176
- } ( )
177
210
178
- return . init(
179
- request: requestAction,
180
- connection: connectionAction
181
- )
211
+ let connection = self . connections. closeConnection ( at: index)
212
+ if self . connections. isEmpty {
213
+ return . init(
214
+ request: . none,
215
+ connection: . closeConnection( connection, isShutdown: . yes( unclean: unclean) )
216
+ )
217
+ }
218
+ return . init(
219
+ request: . none,
220
+ connection: . closeConnection( connection, isShutdown: . no)
221
+ )
222
+ case . shutDown:
223
+ preconditionFailure ( " It the pool is already shutdown, all connections must have been torn down. " )
224
+ }
182
225
}
183
226
184
227
mutating func newHTTP2MaxConcurrentStreamsReceived( _ connectionID: Connection . ID , newMaxStreams: Int ) -> Action {
@@ -199,32 +242,53 @@ extension HTTPConnectionPool {
199
242
}
200
243
201
244
private mutating func nextActionForFailedConnection( at index: Int , on eventLoop: EventLoop ) -> Action {
202
- let hasPendingRequest = !self . requests. isEmpty ( for: eventLoop) || !self . requests. isEmpty ( for: nil )
203
- guard hasPendingRequest else {
204
- return . none
205
- }
245
+ switch self . state {
246
+ case . running:
247
+ let hasPendingRequest = !self . requests. isEmpty ( for: eventLoop) || !self . requests. isEmpty ( for: nil )
248
+ guard hasPendingRequest else {
249
+ return . none
250
+ }
206
251
207
- let ( newConnectionID, previousEventLoop) = self . connections. createNewConnectionByReplacingClosedConnection ( at: index)
208
- precondition ( previousEventLoop === eventLoop)
252
+ let ( newConnectionID, previousEventLoop) = self . connections. createNewConnectionByReplacingClosedConnection ( at: index)
253
+ precondition ( previousEventLoop === eventLoop)
209
254
210
- return . init(
211
- request: . none,
212
- connection: . createConnection( newConnectionID, on: eventLoop)
213
- )
255
+ return . init(
256
+ request: . none,
257
+ connection: . createConnection( newConnectionID, on: eventLoop)
258
+ )
259
+ case . shuttingDown( let unclean) :
260
+ assert ( self . requests. isEmpty)
261
+ self . connections. removeConnection ( at: index)
262
+ if self . connections. isEmpty {
263
+ return . init(
264
+ request: . none,
265
+ connection: . cleanupConnections( . init( ) , isShutdown: . yes( unclean: unclean) )
266
+ )
267
+ }
268
+ return . none
269
+
270
+ case . shutDown:
271
+ preconditionFailure ( " If the pool is already shutdown, all connections must have been torn down. " )
272
+ }
214
273
}
215
274
216
275
private mutating func nextActionForClosingConnection( on eventLoop: EventLoop ) -> Action {
217
- let hasPendingRequest = !self . requests. isEmpty ( for: eventLoop) || !self . requests. isEmpty ( for: nil )
218
- guard hasPendingRequest else {
219
- return . none
220
- }
276
+ switch self . state {
277
+ case . running:
278
+ let hasPendingRequest = !self . requests. isEmpty ( for: eventLoop) || !self . requests. isEmpty ( for: nil )
279
+ guard hasPendingRequest else {
280
+ return . none
281
+ }
221
282
222
- let newConnectionID = self . connections. createNewConnection ( on: eventLoop)
283
+ let newConnectionID = self . connections. createNewConnection ( on: eventLoop)
223
284
224
- return . init(
225
- request: . none,
226
- connection: . createConnection( newConnectionID, on: eventLoop)
227
- )
285
+ return . init(
286
+ request: . none,
287
+ connection: . createConnection( newConnectionID, on: eventLoop)
288
+ )
289
+ case . shutDown, . shuttingDown:
290
+ return . none
291
+ }
228
292
}
229
293
230
294
mutating func http2ConnectionStreamClosed( _ connectionID: Connection . ID ) -> Action {
@@ -248,7 +312,7 @@ extension HTTPConnectionPool {
248
312
guard let ( index, context) = self . connections. failConnection ( connectionID) else {
249
313
preconditionFailure ( " Backing off a connection that is unknown to us? " )
250
314
}
251
- return nextActionForFailedConnection ( at: index, on: context. eventLoop)
315
+ return self . nextActionForFailedConnection ( at: index, on: context. eventLoop)
252
316
}
253
317
254
318
mutating func timeoutRequest( _ requestID: Request . ID ) -> Action {
@@ -289,8 +353,13 @@ extension HTTPConnectionPool {
289
353
290
354
mutating func connectionIdleTimeout( _ connectionID: Connection . ID ) -> Action {
291
355
guard let connection = connections. closeConnectionIfIdle ( connectionID) else {
356
+ // because of a race this connection (connection close runs against trigger of timeout)
357
+ // was already removed from the state machine.
292
358
return . none
293
359
}
360
+
361
+ precondition ( self . state == . running, " If we are shutting down, we must not have any idle connections " )
362
+
294
363
return . init(
295
364
request: . none,
296
365
connection: . closeConnection( connection, isShutdown: . no)
@@ -329,8 +398,10 @@ extension HTTPConnectionPool {
329
398
let unclean = !( cleanupContext. cancel. isEmpty && waitingRequests. isEmpty)
330
399
if self . connections. isEmpty {
331
400
isShutdown = . yes( unclean: unclean)
401
+ self . state = . shutDown
332
402
} else {
333
403
isShutdown = . no
404
+ self . state = . shuttingDown( unclean: unclean)
334
405
}
335
406
return . init(
336
407
request: requestAction,
0 commit comments