Skip to content

Commit 2408ef4

Browse files
authored
Merge branch 'main' into ff-better-error-messages
2 parents db395e4 + 75b716e commit 2408ef4

19 files changed

+1393
-63
lines changed

Diff for: Sources/AsyncHTTPClient/ConnectionPool/HTTP1.1/HTTP1Connection.swift

+7-1
Original file line numberDiff line numberDiff line change
@@ -79,8 +79,14 @@ final class HTTP1Connection {
7979
self.channel.triggerUserOutboundEvent(HTTPConnectionEvent.shutdownRequested, promise: nil)
8080
}
8181

82+
func close(promise: EventLoopPromise<Void>?) {
83+
return self.channel.close(mode: .all, promise: promise)
84+
}
85+
8286
func close() -> EventLoopFuture<Void> {
83-
return self.channel.close()
87+
let promise = self.channel.eventLoop.makePromise(of: Void.self)
88+
self.close(promise: promise)
89+
return promise.futureResult
8490
}
8591

8692
func taskCompleted() {

Diff for: Sources/AsyncHTTPClient/ConnectionPool/HTTP2/HTTP2Connection.swift

+7-1
Original file line numberDiff line numberDiff line change
@@ -144,8 +144,14 @@ final class HTTP2Connection {
144144
}
145145
}
146146

147+
func close(promise: EventLoopPromise<Void>?) {
148+
return self.channel.close(mode: .all, promise: promise)
149+
}
150+
147151
func close() -> EventLoopFuture<Void> {
148-
self.channel.close()
152+
let promise = self.channel.eventLoop.makePromise(of: Void.self)
153+
self.close(promise: promise)
154+
return promise.futureResult
149155
}
150156

151157
private func start() -> EventLoopFuture<Void> {

Diff for: Sources/AsyncHTTPClient/ConnectionPool/HTTPConnectionPool+Manager.swift

+147
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,154 @@
1212
//
1313
//===----------------------------------------------------------------------===//
1414

15+
import Logging
16+
import NIO
1517
import NIOConcurrencyHelpers
18+
import NIOHTTP1
19+
20+
extension HTTPConnectionPool {
21+
final class Manager {
22+
private typealias Key = ConnectionPool.Key
23+
24+
private enum State {
25+
case active
26+
case shuttingDown(promise: EventLoopPromise<Bool>?, unclean: Bool)
27+
case shutDown
28+
}
29+
30+
private let eventLoopGroup: EventLoopGroup
31+
private let configuration: HTTPClient.Configuration
32+
private let connectionIDGenerator = Connection.ID.globalGenerator
33+
private let logger: Logger
34+
35+
private var state: State = .active
36+
private var _pools: [Key: HTTPConnectionPool] = [:]
37+
private let lock = Lock()
38+
39+
private let sslContextCache = SSLContextCache()
40+
41+
init(eventLoopGroup: EventLoopGroup,
42+
configuration: HTTPClient.Configuration,
43+
backgroundActivityLogger logger: Logger) {
44+
self.eventLoopGroup = eventLoopGroup
45+
self.configuration = configuration
46+
self.logger = logger
47+
}
48+
49+
func executeRequest(_ request: HTTPSchedulableRequest) {
50+
let poolKey = request.poolKey
51+
let poolResult = self.lock.withLock { () -> Result<HTTPConnectionPool, HTTPClientError> in
52+
switch self.state {
53+
case .active:
54+
if let pool = self._pools[poolKey] {
55+
return .success(pool)
56+
}
57+
58+
let pool = HTTPConnectionPool(
59+
eventLoopGroup: self.eventLoopGroup,
60+
sslContextCache: self.sslContextCache,
61+
tlsConfiguration: request.tlsConfiguration,
62+
clientConfiguration: self.configuration,
63+
key: poolKey,
64+
delegate: self,
65+
idGenerator: self.connectionIDGenerator,
66+
backgroundActivityLogger: self.logger
67+
)
68+
self._pools[poolKey] = pool
69+
return .success(pool)
70+
71+
case .shuttingDown, .shutDown:
72+
return .failure(HTTPClientError.alreadyShutdown)
73+
}
74+
}
75+
76+
switch poolResult {
77+
case .success(let pool):
78+
pool.executeRequest(request)
79+
case .failure(let error):
80+
request.fail(error)
81+
}
82+
}
83+
84+
/// Shutdown the connection pool manager. You **must** shutdown the pool manager, since it leak otherwise.
85+
///
86+
/// - Parameter promise: An `EventLoopPromise` that is succeeded once all connections pools are shutdown.
87+
/// - Returns: An EventLoopFuture that is succeeded once the pool is shutdown. The bool indicates if the
88+
/// shutdown was unclean.
89+
func shutdown(promise: EventLoopPromise<Bool>?) {
90+
enum ShutdownAction {
91+
case done(EventLoopPromise<Bool>?)
92+
case shutdown([Key: HTTPConnectionPool])
93+
}
94+
95+
let action = self.lock.withLock { () -> ShutdownAction in
96+
switch self.state {
97+
case .active:
98+
// If there aren't any pools, we can mark the pool as shut down right away.
99+
if self._pools.isEmpty {
100+
self.state = .shutDown
101+
return .done(promise)
102+
} else {
103+
// this promise will be succeeded once all connection pools are shutdown
104+
self.state = .shuttingDown(promise: promise, unclean: false)
105+
return .shutdown(self._pools)
106+
}
107+
108+
case .shuttingDown, .shutDown:
109+
preconditionFailure("PoolManager already shutdown")
110+
}
111+
}
112+
113+
// if no pools are returned, the manager is already shutdown completely. Inform the
114+
// delegate. This is a very clean shutdown...
115+
switch action {
116+
case .done(let promise):
117+
promise?.succeed(false)
118+
119+
case .shutdown(let pools):
120+
pools.values.forEach { pool in
121+
pool.shutdown()
122+
}
123+
}
124+
}
125+
}
126+
}
127+
128+
extension HTTPConnectionPool.Manager: HTTPConnectionPoolDelegate {
129+
func connectionPoolDidShutdown(_ pool: HTTPConnectionPool, unclean: Bool) {
130+
enum CloseAction {
131+
case close(EventLoopPromise<Bool>?, unclean: Bool)
132+
case wait
133+
}
134+
135+
let closeAction = self.lock.withLock { () -> CloseAction in
136+
switch self.state {
137+
case .active, .shutDown:
138+
preconditionFailure("Why are pools shutting down, if the manager did not give a signal")
139+
140+
case .shuttingDown(let promise, let soFarUnclean):
141+
guard self._pools.removeValue(forKey: pool.key) === pool else {
142+
preconditionFailure("Expected that the pool was created by this manager and is known for this reason.")
143+
}
144+
145+
if self._pools.isEmpty {
146+
self.state = .shutDown
147+
return .close(promise, unclean: soFarUnclean || unclean)
148+
} else {
149+
self.state = .shuttingDown(promise: promise, unclean: soFarUnclean || unclean)
150+
return .wait
151+
}
152+
}
153+
}
154+
155+
switch closeAction {
156+
case .close(let promise, unclean: let unclean):
157+
promise?.succeed(unclean)
158+
case .wait:
159+
break
160+
}
161+
}
162+
}
16163

17164
extension HTTPConnectionPool.Connection.ID {
18165
static var globalGenerator = Generator()

0 commit comments

Comments
 (0)