Skip to content

Commit 7b4da91

Browse files
committed
Add HTTPConnectionPool
1 parent 6eda4da commit 7b4da91

File tree

6 files changed

+779
-10
lines changed

6 files changed

+779
-10
lines changed

Diff for: Sources/AsyncHTTPClient/ConnectionPool/HTTPConnectionPool.swift

+367-2
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,16 @@
1212
//
1313
//===----------------------------------------------------------------------===//
1414

15+
import Logging
16+
import NIOConcurrencyHelpers
1517
import NIOCore
18+
import NIOSSL
1619

17-
enum HTTPConnectionPool {
20+
protocol HTTPConnectionPoolDelegate {
21+
func connectionPoolDidShutdown(_ pool: HTTPConnectionPool, unclean: Bool)
22+
}
23+
24+
class HTTPConnectionPool {
1825
struct Connection: Hashable {
1926
typealias ID = Int
2027

@@ -121,6 +128,364 @@ enum HTTPConnectionPool {
121128
}
122129
}
123130
}
131+
132+
let stateLock = Lock()
133+
private var _state: StateMachine {
134+
didSet {
135+
self.logger.trace("Connection Pool State changed", metadata: [
136+
"key": "\(self.key)",
137+
"state": "\(self._state)",
138+
])
139+
}
140+
}
141+
142+
let timerLock = Lock()
143+
private var _requestTimer = [Request.ID: Scheduled<Void>]()
144+
private var _idleTimer = [Connection.ID: Scheduled<Void>]()
145+
private var _backoffTimer = [Connection.ID: Scheduled<Void>]()
146+
147+
let key: ConnectionPool.Key
148+
var logger: Logger
149+
150+
let eventLoopGroup: EventLoopGroup
151+
let connectionFactory: ConnectionFactory
152+
let clientConfiguration: HTTPClient.Configuration
153+
let idleConnectionTimeout: TimeAmount
154+
155+
let delegate: HTTPConnectionPoolDelegate
156+
157+
init(eventLoopGroup: EventLoopGroup,
158+
sslContextCache: SSLContextCache,
159+
tlsConfiguration: TLSConfiguration?,
160+
clientConfiguration: HTTPClient.Configuration,
161+
key: ConnectionPool.Key,
162+
delegate: HTTPConnectionPoolDelegate,
163+
idGenerator: Connection.ID.Generator,
164+
backgroundActivityLogger logger: Logger) {
165+
self.eventLoopGroup = eventLoopGroup
166+
self.connectionFactory = ConnectionFactory(
167+
key: key,
168+
tlsConfiguration: tlsConfiguration,
169+
clientConfiguration: clientConfiguration,
170+
sslContextCache: sslContextCache
171+
)
172+
self.clientConfiguration = clientConfiguration
173+
self.key = key
174+
self.delegate = delegate
175+
self.logger = logger
176+
177+
self.idleConnectionTimeout = clientConfiguration.connectionPool.idleTimeout
178+
179+
self._state = StateMachine(
180+
eventLoopGroup: eventLoopGroup,
181+
idGenerator: idGenerator,
182+
maximumConcurrentHTTP1Connections: 8
183+
)
184+
}
185+
186+
func executeRequest(_ request: HTTPSchedulableRequest) {
187+
let action = self.stateLock.withLock { () -> StateMachine.Action in
188+
self._state.executeRequest(.init(request))
189+
}
190+
self.run(action: action)
191+
}
192+
193+
func shutdown() {
194+
let action = self.stateLock.withLock { () -> StateMachine.Action in
195+
self._state.shutdown()
196+
}
197+
self.run(action: action)
198+
}
199+
200+
func run(action: StateMachine.Action) {
201+
self.runConnectionAction(action.connection)
202+
self.runRequestAction(action.request)
203+
}
204+
205+
func runConnectionAction(_ action: StateMachine.ConnectionAction) {
206+
switch action {
207+
case .createConnection(let connectionID, let eventLoop):
208+
self.createConnection(connectionID, on: eventLoop)
209+
210+
case .scheduleBackoffTimer(let connectionID, let backoff, on: let eventLoop):
211+
self.scheduleConnectionStartBackoffTimer(connectionID, backoff, on: eventLoop)
212+
213+
case .scheduleTimeoutTimer(let connectionID, on: let eventLoop):
214+
self.scheduleIdleTimerForConnection(connectionID, on: eventLoop)
215+
216+
case .cancelTimeoutTimer(let connectionID):
217+
self.cancelIdleTimerForConnection(connectionID)
218+
219+
case .closeConnection(let connection, isShutdown: let isShutdown):
220+
// we are not interested in the close future...
221+
_ = connection.close()
222+
223+
if case .yes(let unclean) = isShutdown {
224+
self.delegate.connectionPoolDidShutdown(self, unclean: unclean)
225+
}
226+
227+
case .cleanupConnections(let cleanupContext, isShutdown: let isShutdown):
228+
for connection in cleanupContext.close {
229+
_ = connection.close()
230+
}
231+
232+
for connection in cleanupContext.cancel {
233+
_ = connection.close()
234+
}
235+
236+
for connectionID in cleanupContext.connectBackoff {
237+
self.cancelConnectionStartBackoffTimer(connectionID)
238+
}
239+
240+
if case .yes(let unclean) = isShutdown {
241+
self.delegate.connectionPoolDidShutdown(self, unclean: unclean)
242+
}
243+
244+
case .none:
245+
break
246+
}
247+
}
248+
249+
func runRequestAction(_ action: StateMachine.RequestAction) {
250+
switch action {
251+
case .executeRequest(let request, let connection, cancelTimeout: let cancelTimeout):
252+
connection.executeRequest(request.req)
253+
if cancelTimeout {
254+
self.cancelRequestTimeout(request.id)
255+
}
256+
257+
case .executeRequestsAndCancelTimeouts(let requests, let connection):
258+
for request in requests {
259+
connection.executeRequest(request.req)
260+
self.cancelRequestTimeout(request.id)
261+
}
262+
263+
case .failRequest(let request, let error, cancelTimeout: let cancelTimeout):
264+
if cancelTimeout {
265+
self.cancelRequestTimeout(request.id)
266+
}
267+
request.req.fail(error)
268+
269+
case .failRequestsAndCancelTimeouts(let requests, let error):
270+
for request in requests {
271+
self.cancelRequestTimeout(request.id)
272+
request.req.fail(error)
273+
}
274+
275+
case .scheduleRequestTimeout(let request, on: let eventLoop):
276+
self.scheduleRequestTimeout(request, on: eventLoop)
277+
278+
case .cancelRequestTimeout(let requestID):
279+
self.cancelRequestTimeout(requestID)
280+
281+
case .none:
282+
break
283+
}
284+
}
285+
286+
// MARK: Run actions
287+
288+
private func createConnection(_ connectionID: Connection.ID, on eventLoop: EventLoop) {
289+
self.connectionFactory.makeConnection(
290+
for: self,
291+
connectionID: connectionID,
292+
http1ConnectionDelegate: self,
293+
http2ConnectionDelegate: self,
294+
deadline: .now() + (self.clientConfiguration.timeout.connect ?? .seconds(30)),
295+
eventLoop: eventLoop,
296+
logger: self.logger
297+
)
298+
}
299+
300+
private func scheduleRequestTimeout(_ request: Request, on eventLoop: EventLoop) {
301+
let requestID = request.id
302+
let scheduled = eventLoop.scheduleTask(deadline: request.connectionDeadline) {
303+
// The timer has fired. Now we need to do a couple of things:
304+
//
305+
// 1. Remove ourselves from the timer dictionary to not leak any data. If our
306+
// waiter entry still exist, we need to tell the state machine, that we want
307+
// to fail the request.
308+
309+
let timeout = self.timerLock.withLock {
310+
self._requestTimer.removeValue(forKey: requestID) != nil
311+
}
312+
313+
// 2. If the entry did not exists anymore, we can assume that the request was
314+
// scheduled on another connection. The timer still fired anyhow because of a
315+
// race. In such a situation we don't need to do anything.
316+
guard timeout else { return }
317+
318+
// 3. Tell the state machine about the timeout
319+
let action = self.stateLock.withLock {
320+
self._state.timeoutRequest(requestID)
321+
}
322+
323+
self.run(action: action)
324+
}
325+
326+
self.timerLock.withLockVoid {
327+
assert(self._requestTimer[requestID] == nil)
328+
self._requestTimer[requestID] = scheduled
329+
}
330+
331+
request.req.requestWasQueued(self)
332+
}
333+
334+
private func cancelRequestTimeout(_ id: Request.ID) {
335+
let scheduled = self.timerLock.withLock {
336+
self._requestTimer.removeValue(forKey: id)
337+
}
338+
339+
scheduled?.cancel()
340+
}
341+
342+
private func scheduleIdleTimerForConnection(_ connectionID: Connection.ID, on eventLoop: EventLoop) {
343+
let scheduled = eventLoop.scheduleTask(in: self.idleConnectionTimeout) {
344+
// there might be a race between a cancelTimer call and the triggering
345+
// of this scheduled task. both want to acquire the lock
346+
let timerExisted = self.timerLock.withLock {
347+
self._idleTimer.removeValue(forKey: connectionID) != nil
348+
}
349+
350+
guard timerExisted else { return }
351+
352+
let action = self.stateLock.withLock {
353+
self._state.connectionIdleTimeout(connectionID)
354+
}
355+
self.run(action: action)
356+
}
357+
358+
self.timerLock.withLock {
359+
assert(self._idleTimer[connectionID] == nil)
360+
self._idleTimer[connectionID] = scheduled
361+
}
362+
}
363+
364+
private func cancelIdleTimerForConnection(_ connectionID: Connection.ID) {
365+
let cancelTimer = self.timerLock.withLock {
366+
self._idleTimer.removeValue(forKey: connectionID)
367+
}
368+
369+
cancelTimer?.cancel()
370+
}
371+
372+
private func scheduleConnectionStartBackoffTimer(
373+
_ connectionID: Connection.ID,
374+
_ timeAmount: TimeAmount,
375+
on eventLoop: EventLoop
376+
) {
377+
let scheduled = eventLoop.scheduleTask(in: timeAmount) {
378+
// there might be a race between a backoffTimer and the pool shutting down.
379+
let timerExisted = self.timerLock.withLock {
380+
self._backoffTimer.removeValue(forKey: connectionID) != nil
381+
}
382+
383+
guard timerExisted else { return }
384+
385+
let action = self.stateLock.withLock {
386+
self._state.connectionCreationBackoffDone(connectionID)
387+
}
388+
self.run(action: action)
389+
}
390+
391+
self.timerLock.withLock {
392+
assert(self._backoffTimer[connectionID] == nil)
393+
self._backoffTimer[connectionID] = scheduled
394+
}
395+
}
396+
397+
private func cancelConnectionStartBackoffTimer(_ connectionID: Connection.ID) {
398+
let backoffTimer = self.timerLock.withLock {
399+
self._backoffTimer[connectionID]
400+
}
401+
402+
backoffTimer?.cancel()
403+
}
404+
}
405+
406+
// MARK: - Protocol methods -
407+
408+
extension HTTPConnectionPool: HTTPConnectionRequester {
409+
func http1ConnectionCreated(_ connection: HTTP1Connection) {
410+
let action = self.stateLock.withLock {
411+
self._state.newHTTP1ConnectionCreated(.http1_1(connection))
412+
}
413+
self.run(action: action)
414+
}
415+
416+
func http2ConnectionCreated(_ connection: HTTP2Connection, maximumStreams: Int) {
417+
preconditionFailure("Did not expect http/2 connections right now.")
418+
// let action = self.stateLock.withLock { () -> StateMachine.Action in
419+
// if let settings = connection.settings {
420+
// return self._state.newHTTP2ConnectionCreated(.http2(connection), settings: settings)
421+
// } else {
422+
// // immidiate connection closure before we can register with state machine
423+
// // is the only reason we don't have settings
424+
// struct ImmidiateConnectionClose: Error {}
425+
// return self._state.failedToCreateNewConnection(ImmidiateConnectionClose(), connectionID: connection.id)
426+
// }
427+
// }
428+
// self.run(action: action)
429+
}
430+
431+
func failedToCreateHTTPConnection(_ connectionID: HTTPConnectionPool.Connection.ID, error: Error) {
432+
let action = self.stateLock.withLock {
433+
self._state.failedToCreateNewConnection(error, connectionID: connectionID)
434+
}
435+
self.run(action: action)
436+
}
437+
}
438+
439+
extension HTTPConnectionPool: HTTP1ConnectionDelegate {
440+
func http1ConnectionClosed(_ connection: HTTP1Connection) {
441+
let action = self.stateLock.withLock {
442+
self._state.connectionClosed(connection.id)
443+
}
444+
self.run(action: action)
445+
}
446+
447+
func http1ConnectionReleased(_ connection: HTTP1Connection) {
448+
let action = self.stateLock.withLock {
449+
self._state.http1ConnectionReleased(connection.id)
450+
}
451+
self.run(action: action)
452+
}
453+
}
454+
455+
extension HTTPConnectionPool: HTTP2ConnectionDelegate {
456+
func http2Connection(_ connection: HTTP2Connection, newMaxStreamSetting: Int) {
457+
// ignore for now
458+
}
459+
460+
func http2ConnectionGoAwayReceived(_: HTTP2Connection) {
461+
// ignore for now
462+
}
463+
464+
func http2ConnectionClosed(_: HTTP2Connection) {
465+
// ignore for now
466+
// let action = self.stateLock.withLock {
467+
// self._state.connectionClosed(connection.id)
468+
// }
469+
// self.run(action: action)
470+
}
471+
472+
func http2ConnectionStreamClosed(_ connection: HTTP2Connection, availableStreams: Int) {
473+
// ignore for now
474+
// let action = self.stateLock.withLock {
475+
// self._state.http2ConnectionStreamClosed(connection.id, availableStreams: availableStreams)
476+
// }
477+
// self.run(action: action)
478+
}
479+
}
480+
481+
extension HTTPConnectionPool: HTTPRequestScheduler {
482+
func cancelRequest(_ request: HTTPSchedulableRequest) {
483+
let requestID = Request(request).id
484+
let action = self.stateLock.withLock {
485+
self._state.cancelRequest(requestID)
486+
}
487+
self.run(action: action)
488+
}
124489
}
125490

126491
extension HTTPConnectionPool {
@@ -156,7 +521,7 @@ extension HTTPConnectionPool {
156521
self.req.preferredEventLoop
157522
}
158523

159-
var connectionDeadline: NIODeadline? {
524+
var connectionDeadline: NIODeadline {
160525
self.req.connectionDeadline
161526
}
162527

0 commit comments

Comments
 (0)