Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit be5e4bd

Browse files
committedJun 18, 2021
Started work on h2
1 parent 6150977 commit be5e4bd

11 files changed

+1078
-508
lines changed
 

‎Sources/AsyncHTTPClient/ConnectionPool/HTTPConnectionPool+Factory.swift

+52
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,58 @@ extension HTTPConnectionPool {
4646
}
4747

4848
extension HTTPConnectionPool.ConnectionFactory {
49+
50+
func makeConnection(for pool: HTTPConnectionPool, connectionID: HTTPConnectionPool.Connection.ID, eventLoop: EventLoop, logger: Logger) {
51+
var logger = logger
52+
logger[metadataKey: "ahc-connection"] = "\(connectionID)"
53+
54+
let future: EventLoopFuture<(Channel, HTTPVersion)>
55+
56+
if self.key.scheme.isProxyable, let proxy = self.clientConfiguration.proxy {
57+
future = self.makeHTTPProxyChannel(proxy, connectionID: connectionID, eventLoop: eventLoop, logger: logger)
58+
} else {
59+
future = self.makeChannel(eventLoop: eventLoop, logger: logger)
60+
}
61+
62+
future.whenComplete { result in
63+
do {
64+
switch result {
65+
case .success(let (channel, .http1_0)), .success(let (channel, .http1_1)):
66+
let connection = try HTTP1Connection(
67+
channel: channel,
68+
connectionID: connectionID,
69+
configuration: self.clientConfiguration,
70+
delegate: pool,
71+
logger: logger
72+
)
73+
pool.http1ConnectionCreated(connection)
74+
case .success(let (channel, .http2)):
75+
let http2Connection = try HTTP2Connection(
76+
channel: channel,
77+
connectionID: connectionID,
78+
delegate: pool,
79+
logger: logger
80+
)
81+
82+
http2Connection.readyToAcceptConnectionsFuture.whenComplete { result in
83+
switch result {
84+
case .success:
85+
pool.http2ConnectionCreated(http2Connection)
86+
case .failure(let error):
87+
pool.failedToCreateHTTPConnection(connectionID, error: error)
88+
}
89+
}
90+
case .failure(let error):
91+
throw error
92+
default:
93+
preconditionFailure("Unexpected new http version")
94+
}
95+
} catch {
96+
pool.failedToCreateHTTPConnection(connectionID, error: error)
97+
}
98+
}
99+
}
100+
49101
func makeBestChannel(connectionID: HTTPConnectionPool.Connection.ID, eventLoop: EventLoop, logger: Logger) -> EventLoopFuture<(Channel, HTTPVersion)> {
50102
if self.key.scheme.isProxyable, let proxy = self.clientConfiguration.proxy {
51103
return self.makeHTTPProxyChannel(proxy, connectionID: connectionID, eventLoop: eventLoop, logger: logger)

‎Sources/AsyncHTTPClient/ConnectionPool/HTTPConnectionPool+Manager.swift

+152
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,159 @@
1212
//
1313
//===----------------------------------------------------------------------===//
1414

15+
import Logging
16+
import NIO
1517
import NIOConcurrencyHelpers
18+
import NIOHTTP1
19+
20+
protocol HTTPConnectionPoolManagerDelegate: AnyObject {
21+
func httpConnectionPoolManagerDidShutdown(_: HTTPConnectionPool.Manager, unclean: Bool)
22+
}
23+
24+
extension HTTPConnectionPool {
25+
final class Manager {
26+
private typealias Key = ConnectionPool.Key
27+
28+
private var _pools: [Key: HTTPConnectionPool] = [:]
29+
private let lock = Lock()
30+
31+
private let sslContextCache = SSLContextCache()
32+
33+
enum State {
34+
case active
35+
case shuttingDown(unclean: Bool)
36+
case shutDown
37+
}
38+
39+
let eventLoopGroup: EventLoopGroup
40+
let configuration: HTTPClient.Configuration
41+
let connectionIDGenerator = Connection.ID.globalGenerator
42+
let logger: Logger
43+
44+
/// A delegate to inform about the pools managers shutdown
45+
///
46+
/// NOTE: Normally we create retain cycles in SwiftNIO code that we break on shutdown. However we wan't to inform
47+
/// users that they must call `shutdown` on their AsyncHTTPClient. The best way to make them aware is with
48+
/// a `preconditionFailure` in the HTTPClient's `deinit`. If we create a retain cycle here, the
49+
/// `HTTPClient`'s `deinit` can never be reached. Instead the `HTTPClient` would leak.
50+
///
51+
/// The delegate is not thread safe at all. This only works if the HTTPClient sets itself as a delegate in its own
52+
/// init.
53+
weak var delegate: HTTPConnectionPoolManagerDelegate?
54+
55+
private var state: State = .active
56+
57+
init(eventLoopGroup: EventLoopGroup,
58+
configuration: HTTPClient.Configuration,
59+
backgroundActivityLogger logger: Logger) {
60+
self.eventLoopGroup = eventLoopGroup
61+
self.configuration = configuration
62+
self.logger = logger
63+
}
64+
65+
deinit {
66+
guard case .shutDown = self.state else {
67+
preconditionFailure("Manager must be shutdown before deinit")
68+
}
69+
}
70+
71+
func execute(request: HTTPRequestTask) {
72+
let key = Key(request.request)
73+
74+
let poolResult = self.lock.withLock { () -> Result<HTTPConnectionPool, HTTPClientError> in
75+
guard case .active = self.state else {
76+
return .failure(HTTPClientError.alreadyShutdown)
77+
}
78+
79+
if let pool = self._pools[key] {
80+
return .success(pool)
81+
}
82+
83+
let pool = HTTPConnectionPool(
84+
eventLoopGroup: self.eventLoopGroup,
85+
sslContextCache: self.sslContextCache,
86+
tlsConfiguration: request.request.tlsConfiguration,
87+
clientConfiguration: self.configuration,
88+
key: key,
89+
delegate: self,
90+
idGenerator: self.connectionIDGenerator,
91+
logger: self.logger
92+
)
93+
self._pools[key] = pool
94+
return .success(pool)
95+
}
96+
97+
switch poolResult {
98+
case .success(let pool):
99+
pool.execute(request: request)
100+
case .failure(let error):
101+
request.fail(error)
102+
}
103+
}
104+
105+
func shutdown() {
106+
let pools = self.lock.withLock { () -> [Key: HTTPConnectionPool] in
107+
guard case .active = self.state else {
108+
preconditionFailure("PoolManager already shutdown")
109+
}
110+
111+
// If there aren't any pools, we can mark the pool as shut down right away.
112+
if self._pools.isEmpty {
113+
self.state = .shutDown
114+
} else {
115+
self.state = .shuttingDown(unclean: false)
116+
}
117+
118+
return self._pools
119+
}
120+
121+
// if no pools are returned, the manager is already shutdown completely. Inform the
122+
// delegate. This is a very clean shutdown...
123+
if pools.isEmpty {
124+
self.delegate?.httpConnectionPoolManagerDidShutdown(self, unclean: false)
125+
return
126+
}
127+
128+
pools.values.forEach { pool in
129+
pool.shutdown()
130+
}
131+
}
132+
}
133+
}
134+
135+
extension HTTPConnectionPool.Manager: HTTPConnectionPoolDelegate {
136+
enum CloseAction {
137+
case close(unclean: Bool)
138+
case wait
139+
}
140+
141+
func connectionPoolDidShutdown(_ pool: HTTPConnectionPool, unclean: Bool) {
142+
let closeAction = self.lock.withLock { () -> CloseAction in
143+
guard case .shuttingDown(let soFarUnclean) = self.state else {
144+
preconditionFailure("Why are pools shutting down, if the manager did not give a signal")
145+
}
146+
147+
guard self._pools.removeValue(forKey: pool.key) === pool else {
148+
preconditionFailure("Expected that the pool was ")
149+
}
150+
151+
if self._pools.isEmpty {
152+
self.state = .shutDown
153+
return .close(unclean: soFarUnclean || unclean)
154+
} else {
155+
self.state = .shuttingDown(unclean: soFarUnclean || unclean)
156+
return .wait
157+
}
158+
}
159+
160+
switch closeAction {
161+
case .close(unclean: let unclean):
162+
self.delegate?.httpConnectionPoolManagerDidShutdown(self, unclean: unclean)
163+
case .wait:
164+
break
165+
}
166+
}
167+
}
16168

17169
extension HTTPConnectionPool.Connection.ID {
18170
static var globalGenerator = Generator()

‎Sources/AsyncHTTPClient/ConnectionPool/HTTPConnectionPool.swift

+336-2
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,23 @@
1212
//
1313
//===----------------------------------------------------------------------===//
1414

15+
import Logging
1516
import NIO
17+
import NIOConcurrencyHelpers
18+
import NIOHTTP1
19+
import NIOSSL
20+
import NIOTLS
21+
import NIOTransportServices
22+
#if canImport(Network)
23+
import Network
24+
import Security
25+
#endif
1626

17-
enum HTTPConnectionPool {
18-
27+
protocol HTTPConnectionPoolDelegate {
28+
func connectionPoolDidShutdown(_ pool: HTTPConnectionPool, unclean: Bool)
29+
}
30+
31+
class HTTPConnectionPool {
1932
struct Connection: Equatable {
2033
typealias ID = Int
2134

@@ -145,6 +158,327 @@ enum HTTPConnectionPool {
145158
}
146159
}
147160

161+
let stateLock = Lock()
162+
private var _state: StateMachine {
163+
didSet {
164+
self.logger.trace("Connection Pool State changed", metadata: [
165+
"key": "\(self.key)",
166+
"state": "\(self._state)",
167+
])
168+
}
169+
}
170+
171+
let timerLock = Lock()
172+
private var _waiters = [Waiter.ID: Scheduled<Void>]()
173+
private var _timer = [Connection.ID: Scheduled<Void>]()
174+
175+
let key: ConnectionPool.Key
176+
var logger: Logger
177+
178+
let eventLoopGroup: EventLoopGroup
179+
let connectionFactory: ConnectionFactory
180+
let idleConnectionTimeout: TimeAmount
181+
182+
let delegate: HTTPConnectionPoolDelegate
183+
184+
init(eventLoopGroup: EventLoopGroup,
185+
sslContextCache: SSLContextCache,
186+
tlsConfiguration: TLSConfiguration?,
187+
clientConfiguration: HTTPClient.Configuration,
188+
key: ConnectionPool.Key,
189+
delegate: HTTPConnectionPoolDelegate,
190+
idGenerator: Connection.ID.Generator,
191+
logger: Logger) {
192+
self.eventLoopGroup = eventLoopGroup
193+
self.connectionFactory = ConnectionFactory(
194+
key: key,
195+
tlsConfiguration: tlsConfiguration,
196+
clientConfiguration: clientConfiguration,
197+
sslContextCache: sslContextCache
198+
)
199+
self.key = key
200+
self.delegate = delegate
201+
self.logger = logger
202+
203+
self.idleConnectionTimeout = clientConfiguration.connectionPool.idleTimeout
204+
205+
self._state = StateMachine(
206+
eventLoopGroup: eventLoopGroup,
207+
idGenerator: idGenerator,
208+
maximumConcurrentHTTP1Connections: 8
209+
)
210+
}
211+
212+
func execute(request: HTTPRequestTask) {
213+
let (eventLoop, required) = request.resolveEventLoop()
214+
215+
let action = self.stateLock.withLock { () -> StateMachine.Action in
216+
self._state.executeTask(request, onPreffered: eventLoop, required: required)
217+
}
218+
self.run(action: action)
219+
}
220+
221+
func shutdown() {
222+
let action = self.stateLock.withLock { () -> StateMachine.Action in
223+
self._state.shutdown()
224+
}
225+
self.run(action: action)
226+
}
227+
228+
func run(action: StateMachine.Action) {
229+
self.run(connectionAction: action.connection)
230+
self.run(taskAction: action.task)
231+
}
232+
233+
func run(connectionAction: StateMachine.ConnectionAction) {
234+
switch connectionAction {
235+
case .createConnection(let connectionID, let eventLoop):
236+
self.createConnection(connectionID, on: eventLoop)
237+
238+
case .scheduleTimeoutTimer(let connectionID):
239+
self.scheduleTimerForConnection(connectionID)
240+
241+
case .cancelTimeoutTimer(let connectionID):
242+
self.cancelTimerForConnection(connectionID)
243+
244+
case .replaceConnection(let oldConnection, with: let newConnectionID, on: let eventLoop):
245+
oldConnection.close()
246+
self.createConnection(newConnectionID, on: eventLoop)
247+
248+
case .closeConnection(let connection, isShutdown: let isShutdown):
249+
connection.close()
250+
251+
if case .yes(let unclean) = isShutdown {
252+
self.delegate.connectionPoolDidShutdown(self, unclean: unclean)
253+
}
254+
255+
case .cleanupConnection(let close, let cancel, isShutdown: let isShutdown):
256+
for connection in close {
257+
connection.close()
258+
}
259+
260+
for connection in cancel {
261+
connection.cancel()
262+
}
263+
264+
if case .yes(let unclean) = isShutdown {
265+
self.delegate.connectionPoolDidShutdown(self, unclean: unclean)
266+
}
267+
268+
case .none:
269+
break
270+
}
271+
}
272+
273+
func run(taskAction: StateMachine.TaskAction) {
274+
switch taskAction {
275+
case .executeTask(let request, let connection, let waiterID):
276+
connection.execute(request: request)
277+
if let waiterID = waiterID {
278+
self.cancelWaiterTimeout(waiterID)
279+
}
280+
281+
case .executeTasks(let requests, let connection):
282+
for (request, waiterID) in requests {
283+
connection.execute(request: request)
284+
if let waiterID = waiterID {
285+
self.cancelWaiterTimeout(waiterID)
286+
}
287+
}
288+
289+
case .failTask(let request, let error, cancelWaiter: let waiterID):
290+
request.fail(error)
291+
292+
if let waiterID = waiterID {
293+
self.cancelWaiterTimeout(waiterID)
294+
}
295+
296+
case .failTasks(let requests, let error):
297+
for (request, waiterID) in requests {
298+
request.fail(error)
299+
300+
if let waiterID = waiterID {
301+
self.cancelWaiterTimeout(waiterID)
302+
}
303+
}
304+
305+
case .scheduleWaiterTimeout(let waiterID, let task, on: let eventLoop):
306+
self.scheduleWaiterTimeout(waiterID, task, on: eventLoop)
307+
308+
case .cancelWaiterTimeout(let waiterID):
309+
self.cancelWaiterTimeout(waiterID)
310+
311+
case .none:
312+
break
313+
}
314+
}
315+
316+
// MARK: Run actions
317+
318+
func createConnection(_ connectionID: Connection.ID, on eventLoop: EventLoop) {
319+
self.connectionFactory.makeConnection(
320+
for: self,
321+
connectionID: connectionID,
322+
eventLoop: eventLoop,
323+
logger: self.logger
324+
)
325+
}
326+
327+
func scheduleWaiterTimeout(_ id: Waiter.ID, _ task: HTTPRequestTask, on eventLoop: EventLoop) {
328+
let deadline = task.connectionDeadline
329+
let scheduled = eventLoop.scheduleTask(deadline: deadline) {
330+
// The timer has fired. Now we need to do a couple of things:
331+
//
332+
// 1. Remove ourselfes from the timer dictionary to not leak any data. If our
333+
// waiter entry still exist, we need to tell the state machine, that we want
334+
// to fail the request.
335+
336+
let timeout = self.timerLock.withLock {
337+
self._waiters.removeValue(forKey: id) != nil
338+
}
339+
340+
// 2. If the entry did not exists anymore, we can assume that the request was
341+
// scheduled on another connection. The timer still fired anyhow because of a
342+
// race. In such a situation we don't need to do anything.
343+
guard timeout else { return }
344+
345+
// 3. Tell the state machine about the time
346+
let action = self.stateLock.withLock {
347+
self._state.waiterTimeout(id)
348+
}
349+
350+
self.run(action: action)
351+
}
352+
353+
self.timerLock.withLockVoid {
354+
precondition(self._waiters[id] == nil)
355+
self._waiters[id] = scheduled
356+
}
357+
358+
task.requestWasQueued(self)
359+
}
360+
361+
func cancelWaiterTimeout(_ id: Waiter.ID) {
362+
let scheduled = self.timerLock.withLock {
363+
self._waiters.removeValue(forKey: id)
364+
}
365+
366+
scheduled?.cancel()
367+
}
368+
369+
func scheduleTimerForConnection(_ connectionID: Connection.ID) {
370+
assert(self._timer[connectionID] == nil)
371+
372+
let scheduled = self.eventLoopGroup.next().scheduleTask(in: self.idleConnectionTimeout) {
373+
// there might be a race between a cancelTimer call and the triggering
374+
// of this scheduled task. both want to acquire the lock
375+
self.stateLock.withLockVoid {
376+
guard self._timer.removeValue(forKey: connectionID) != nil else {
377+
// a cancel method has potentially won
378+
return
379+
}
380+
381+
let action = self._state.connectionTimeout(connectionID)
382+
self.run(action: action)
383+
}
384+
}
385+
386+
self._timer[connectionID] = scheduled
387+
}
388+
389+
func cancelTimerForConnection(_ connectionID: Connection.ID) {
390+
guard let cancelTimer = self._timer.removeValue(forKey: connectionID) else {
391+
return
392+
}
393+
394+
cancelTimer.cancel()
395+
}
396+
}
397+
398+
extension HTTPConnectionPool {
399+
func http1ConnectionCreated(_ connection: HTTP1Connection) {
400+
let action = self.stateLock.withLock {
401+
self._state.newHTTP1ConnectionCreated(.http1_1(connection))
402+
}
403+
self.run(action: action)
404+
}
405+
406+
func http2ConnectionCreated(_ connection: HTTP2Connection) {
407+
let action = self.stateLock.withLock { () -> StateMachine.Action in
408+
if let settings = connection.settings {
409+
return self._state.newHTTP2ConnectionCreated(.http2(connection), settings: settings)
410+
} else {
411+
// immidiate connection closure before we can register with state machine
412+
// is the only reason we don't have settings
413+
struct ImmidiateConnectionClose: Error {}
414+
return self._state.failedToCreateNewConnection(ImmidiateConnectionClose(), connectionID: connection.id)
415+
}
416+
}
417+
self.run(action: action)
418+
}
419+
420+
func failedToCreateHTTPConnection(_ connectionID: Connection.ID, error: Error) {
421+
let action = self.stateLock.withLock {
422+
self._state.failedToCreateNewConnection(error, connectionID: connectionID)
423+
}
424+
self.run(action: action)
425+
}
426+
}
427+
428+
extension HTTPConnectionPool: HTTP1ConnectionDelegate {
429+
func http1ConnectionClosed(_ connection: HTTP1Connection) {
430+
let action = self.stateLock.withLock {
431+
self._state.connectionClosed(connection.id)
432+
}
433+
self.run(action: action)
434+
}
435+
436+
func http1ConnectionReleased(_ connection: HTTP1Connection) {
437+
let action = self.stateLock.withLock {
438+
self._state.http1ConnectionReleased(connection.id)
439+
}
440+
self.run(action: action)
441+
}
442+
}
443+
444+
extension HTTPConnectionPool: HTTP2ConnectionDelegate {
445+
func http2ConnectionClosed(_ connection: HTTP2Connection) {
446+
self.stateLock.withLock {
447+
let action = self._state.connectionClosed(connection.id)
448+
self.run(action: action)
449+
}
450+
}
451+
452+
func http2ConnectionStreamClosed(_ connection: HTTP2Connection, availableStreams: Int) {
453+
self.stateLock.withLock {
454+
let action = self._state.http2ConnectionStreamClosed(connection.id, availableStreams: availableStreams)
455+
self.run(action: action)
456+
}
457+
}
458+
}
459+
460+
extension HTTPConnectionPool: HTTP1RequestQueuer {
461+
func cancelRequest(task: HTTPRequestTask) {
462+
let waiterID = Waiter.ID(task)
463+
let action = self.stateLock.withLock {
464+
self._state.cancelWaiter(waiterID)
465+
}
466+
467+
self.run(action: action)
468+
}
469+
}
470+
471+
extension HTTPRequestTask {
472+
fileprivate func resolveEventLoop() -> (EventLoop, Bool) {
473+
switch self.eventLoopPreference.preference {
474+
case .indifferent:
475+
return (self.eventLoop, false)
476+
case .delegate(let el):
477+
return (el, false)
478+
case .delegateAndChannel(let el), .testOnly_exact(let el, _):
479+
return (el, true)
480+
}
481+
}
148482
}
149483

150484
struct EventLoopID: Hashable {

‎Sources/AsyncHTTPClient/HTTPClient.swift

+53-94
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ public class HTTPClient {
6666
public let eventLoopGroup: EventLoopGroup
6767
let eventLoopGroupProvider: EventLoopGroupProvider
6868
let configuration: Configuration
69-
let pool: ConnectionPool
69+
let poolManager: HTTPConnectionPool.Manager
7070
var state: State
7171
private let stateLock = Lock()
7272

@@ -108,14 +108,20 @@ public class HTTPClient {
108108
#endif
109109
}
110110
self.configuration = configuration
111-
self.pool = ConnectionPool(configuration: configuration,
112-
backgroundActivityLogger: backgroundActivityLogger)
111+
self.poolManager = HTTPConnectionPool.Manager(
112+
eventLoopGroup: self.eventLoopGroup,
113+
configuration: self.configuration,
114+
backgroundActivityLogger: backgroundActivityLogger
115+
)
113116
self.state = .upAndRunning
117+
118+
self.poolManager.delegate = self
114119
}
115120

116121
deinit {
117-
assert(self.pool.count == 0)
118-
assert(self.state == .shutDown, "Client not shut down before the deinit. Please call client.syncShutdown() when no longer needed.")
122+
guard case .shutDown = self.state else {
123+
preconditionFailure("Client not shut down before the deinit. Please call client.syncShutdown() when no longer needed.")
124+
}
119125
}
120126

121127
/// Shuts down the client and `EventLoopGroup` if it was created by the client.
@@ -189,36 +195,17 @@ public class HTTPClient {
189195
private func shutdown(requiresCleanClose: Bool, queue: DispatchQueue, _ callback: @escaping (Error?) -> Void) {
190196
do {
191197
try self.stateLock.withLock {
192-
if self.state != .upAndRunning {
198+
guard case .upAndRunning = self.state else {
193199
throw HTTPClientError.alreadyShutdown
194200
}
195-
self.state = .shuttingDown
201+
self.state = .shuttingDown(requiresCleanClose: requiresCleanClose, callback: callback)
196202
}
197203
} catch {
198204
callback(error)
199205
return
200206
}
201207

202-
self.pool.close(on: self.eventLoopGroup.next()).whenComplete { result in
203-
var closeError: Error?
204-
switch result {
205-
case .failure(let error):
206-
closeError = error
207-
case .success(let cleanShutdown):
208-
if !cleanShutdown, requiresCleanClose {
209-
closeError = HTTPClientError.uncleanShutdown
210-
}
211-
212-
self.shutdownEventLoop(queue: queue) { eventLoopError in
213-
// we prioritise .uncleanShutdown here
214-
if let error = closeError {
215-
callback(error)
216-
} else {
217-
callback(eventLoopError)
218-
}
219-
}
220-
}
221-
}
208+
self.poolManager.shutdown()
222209
}
223210

224211
/// Execute `GET` request using specified URL.
@@ -490,7 +477,7 @@ public class HTTPClient {
490477
let taskEL: EventLoop
491478
switch eventLoopPreference.preference {
492479
case .indifferent:
493-
taskEL = self.pool.associatedEventLoop(for: ConnectionPool.Key(request)) ?? self.eventLoopGroup.next()
480+
taskEL = self.eventLoopGroup.next()
494481
case .delegate(on: let eventLoop):
495482
precondition(self.eventLoopGroup.makeIterator().contains { $0 === eventLoop }, "Provided EventLoop must be part of clients EventLoopGroup.")
496483
taskEL = eventLoop
@@ -538,77 +525,30 @@ public class HTTPClient {
538525
}
539526

540527
let task = Task<Delegate.Response>(eventLoop: taskEL, logger: logger)
541-
let setupComplete = taskEL.makePromise(of: Void.self)
542-
let connection = self.pool.getConnection(request,
543-
preference: eventLoopPreference,
544-
taskEventLoop: taskEL,
545-
deadline: deadline,
546-
setupComplete: setupComplete.futureResult,
547-
logger: logger)
548-
549-
let taskHandler = TaskHandler(task: task,
550-
kind: request.kind,
551-
delegate: delegate,
552-
redirectHandler: redirectHandler,
553-
ignoreUncleanSSLShutdown: self.configuration.ignoreUncleanSSLShutdown,
554-
logger: logger)
555-
556-
connection.flatMap { connection -> EventLoopFuture<Void> in
557-
logger.debug("got connection for request",
558-
metadata: ["ahc-connection": "\(connection)",
559-
"ahc-request": "\(request.method) \(request.url)",
560-
"ahc-channel-el": "\(connection.channel.eventLoop)",
561-
"ahc-task-el": "\(taskEL)"])
562-
563-
let channel = connection.channel
564-
565-
func prepareChannelForTask0() -> EventLoopFuture<Void> {
566-
do {
567-
let syncPipelineOperations = channel.pipeline.syncOperations
568-
569-
if let timeout = self.resolve(timeout: self.configuration.timeout.read, deadline: deadline) {
570-
try syncPipelineOperations.addHandler(IdleStateHandler(readTimeout: timeout))
571-
}
572-
573-
try syncPipelineOperations.addHandler(taskHandler)
574-
} catch {
575-
connection.release(closing: true, logger: logger)
576-
return channel.eventLoop.makeFailedFuture(error)
577-
}
578-
579-
task.setConnection(connection)
580528

581-
let isCancelled = task.lock.withLock {
582-
task.cancelled
583-
}
584-
585-
if !isCancelled {
586-
return channel.writeAndFlush(request).flatMapError { _ in
587-
// At this point the `TaskHandler` will already be present
588-
// to handle the failure and pass it to the `promise`
589-
channel.eventLoop.makeSucceededVoidFuture()
590-
}
591-
} else {
592-
return channel.eventLoop.makeSucceededVoidFuture()
593-
}
529+
let requestBag = RequestBag(
530+
request: request,
531+
eventLoopPreference: eventLoopPreference,
532+
task: task,
533+
redirectHandler: redirectHandler,
534+
connectionDeadline: .now() + (self.configuration.timeout.connect ?? .seconds(10)),
535+
idleReadTimeout: self.configuration.timeout.read,
536+
delegate: delegate
537+
)
538+
539+
var deadlineSchedule: Scheduled<Void>?
540+
if let deadline = deadline {
541+
deadlineSchedule = taskEL.scheduleTask(deadline: deadline) {
542+
requestBag.fail(HTTPClientError.deadlineExceeded)
594543
}
595544

596-
if channel.eventLoop.inEventLoop {
597-
return prepareChannelForTask0()
598-
} else {
599-
return channel.eventLoop.flatSubmit {
600-
return prepareChannelForTask0()
601-
}
602-
}
603-
}.always { _ in
604-
setupComplete.succeed(())
605-
}.whenFailure { error in
606-
taskHandler.callOutToDelegateFireAndForget { task in
607-
delegate.didReceiveError(task: task, error)
545+
task.promise.futureResult.whenComplete { _ in
546+
deadlineSchedule?.cancel()
608547
}
609-
task.promise.fail(error)
610548
}
611549

550+
self.poolManager.execute(request: requestBag)
551+
612552
return task
613553
}
614554

@@ -815,7 +755,7 @@ public class HTTPClient {
815755

816756
enum State {
817757
case upAndRunning
818-
case shuttingDown
758+
case shuttingDown(requiresCleanClose: Bool, callback: (Error?) -> Void)
819759
case shutDown
820760
}
821761
}
@@ -882,6 +822,22 @@ extension HTTPClient.Configuration {
882822
}
883823
}
884824

825+
extension HTTPClient: HTTPConnectionPoolManagerDelegate {
826+
func httpConnectionPoolManagerDidShutdown(_: HTTPConnectionPool.Manager, unclean: Bool) {
827+
let (callback, error) = self.stateLock.withLock { () -> ((Error?) -> Void, Error?) in
828+
guard case .shuttingDown(let requiresClean, callback: let callback) = self.state else {
829+
preconditionFailure("Why did the pool manager shut down, if it was not instructed to")
830+
}
831+
832+
self.state = .shutDown
833+
let error: Error? = (requiresClean && unclean) ? HTTPClientError.uncleanShutdown : nil
834+
return (callback, error)
835+
}
836+
837+
callback(error)
838+
}
839+
}
840+
885841
/// Possible client errors.
886842
public struct HTTPClientError: Error, Equatable, CustomStringConvertible {
887843
private enum Code: Equatable {
@@ -909,6 +865,7 @@ public struct HTTPClientError: Error, Equatable, CustomStringConvertible {
909865
case incompatibleHeaders
910866
case connectTimeout
911867
case getConnectionFromPoolTimeout
868+
case deadlineExceeded
912869
}
913870

914871
private var code: Code
@@ -973,4 +930,6 @@ public struct HTTPClientError: Error, Equatable, CustomStringConvertible {
973930
/// - A connection could not be created within the timout period.
974931
/// - Tasks are not processed fast enough on the existing connections, to process all waiters in time
975932
public static let getConnectionFromPoolTimeout = HTTPClientError(code: .getConnectionFromPoolTimeout)
933+
/// The request deadline was exceeded. The request was cancelled because of this.
934+
public static let deadlineExceeded = HTTPClientError(code: .deadlineExceeded)
976935
}

‎Sources/AsyncHTTPClient/HTTPHandler.swift

+32-58
Original file line numberDiff line numberDiff line change
@@ -636,18 +636,33 @@ extension HTTPClient {
636636
public let eventLoop: EventLoop
637637

638638
let promise: EventLoopPromise<Response>
639-
var completion: EventLoopFuture<Void>
640-
var connection: Connection?
641-
var cancelled: Bool
642-
let lock: Lock
639+
640+
var connection: HTTPConnectionPool.Connection? {
641+
self.lock.withLock { self._connection }
642+
}
643+
644+
var isCancelled: Bool {
645+
self.lock.withLock { self._isCancelled }
646+
}
647+
648+
var taskDelegate: HTTPClientTaskDelegate? {
649+
get {
650+
self.lock.withLock { self._taskDelegate }
651+
}
652+
set {
653+
self.lock.withLock { self._taskDelegate = newValue }
654+
}
655+
}
656+
657+
private var _connection: HTTPConnectionPool.Connection?
658+
private var _isCancelled: Bool = false
659+
private var _taskDelegate: HTTPClientTaskDelegate?
660+
private let lock = Lock()
643661
let logger: Logger // We are okay to store the logger here because a Task is for only one request.
644662

645663
init(eventLoop: EventLoop, logger: Logger) {
646664
self.eventLoop = eventLoop
647665
self.promise = eventLoop.makePromise()
648-
self.completion = self.promise.futureResult.map { _ in }
649-
self.cancelled = false
650-
self.lock = Lock()
651666
self.logger = logger
652667
}
653668

@@ -672,69 +687,30 @@ extension HTTPClient {
672687

673688
/// Cancels the request execution.
674689
public func cancel() {
675-
let channel: Channel? = self.lock.withLock {
676-
if !self.cancelled {
677-
self.cancelled = true
678-
return self.connection?.channel
679-
} else {
680-
return nil
681-
}
690+
let taskDelegate = self.lock.withLock { () -> HTTPClientTaskDelegate? in
691+
self._isCancelled = true
692+
return self._taskDelegate
682693
}
683-
channel?.triggerUserOutboundEvent(TaskCancelEvent(), promise: nil)
694+
695+
taskDelegate?.cancel()
684696
}
685697

686-
@discardableResult
687-
func setConnection(_ connection: Connection) -> Connection {
698+
func setConnection(_ connection: HTTPConnectionPool.Connection) {
688699
return self.lock.withLock {
689-
self.connection = connection
690-
if self.cancelled {
691-
connection.channel.triggerUserOutboundEvent(TaskCancelEvent(), promise: nil)
692-
}
693-
return connection
700+
self._connection = connection
694701
}
695702
}
696703

697704
func succeed<Delegate: HTTPClientResponseDelegate>(promise: EventLoopPromise<Response>?,
698705
with value: Response,
699706
delegateType: Delegate.Type,
700707
closing: Bool) {
701-
self.releaseAssociatedConnection(delegateType: delegateType,
702-
closing: closing).whenSuccess {
703-
promise?.succeed(value)
704-
}
708+
promise?.succeed(value)
705709
}
706710

707711
func fail<Delegate: HTTPClientResponseDelegate>(with error: Error,
708712
delegateType: Delegate.Type) {
709-
if let connection = self.connection {
710-
self.releaseAssociatedConnection(delegateType: delegateType, closing: true)
711-
.whenSuccess {
712-
self.promise.fail(error)
713-
connection.channel.close(promise: nil)
714-
}
715-
} else {
716-
// this is used in tests where we don't want to bootstrap the whole connection pool
717-
self.promise.fail(error)
718-
}
719-
}
720-
721-
func releaseAssociatedConnection<Delegate: HTTPClientResponseDelegate>(delegateType: Delegate.Type,
722-
closing: Bool) -> EventLoopFuture<Void> {
723-
if let connection = self.connection {
724-
// remove read timeout handler
725-
return connection.removeHandler(IdleStateHandler.self).flatMap {
726-
connection.removeHandler(TaskHandler<Delegate>.self)
727-
}.map {
728-
connection.release(closing: closing, logger: self.logger)
729-
}.flatMapError { error in
730-
fatalError("Couldn't remove taskHandler: \(error)")
731-
}
732-
} else {
733-
// TODO: This seems only reached in some internal unit test
734-
// Maybe there could be a better handling in the future to make
735-
// it an error outside of testing contexts
736-
return self.eventLoop.makeSucceededFuture(())
737-
}
713+
self.promise.fail(error)
738714
}
739715
}
740716
}
@@ -1071,9 +1047,7 @@ extension TaskHandler: ChannelDuplexHandler {
10711047
break
10721048
case .redirected(let head, let redirectURL):
10731049
self.state = .endOrError
1074-
self.task.releaseAssociatedConnection(delegateType: Delegate.self, closing: self.closing).whenSuccess {
1075-
self.redirectHandler?.redirect(status: head.status, to: redirectURL, promise: self.task.promise)
1076-
}
1050+
self.redirectHandler?.redirect(status: head.status, to: redirectURL, promise: self.task.promise)
10771051
default:
10781052
self.state = .bufferedEnd
10791053
self.handleReadForDelegate(response, context: context)

‎Sources/AsyncHTTPClient/RequestBag.swift

+5-5
Original file line numberDiff line numberDiff line change
@@ -86,14 +86,14 @@ final class RequestBag<Delegate: HTTPClientResponseDelegate>: HTTPRequestTask {
8686
self.idleReadTimeout = idleReadTimeout
8787
self.delegate = delegate
8888

89-
// self.task.taskDelegate = self
90-
// self.task.futureResult.whenComplete { _ in
91-
// self.task.taskDelegate = nil
92-
// }
89+
self.task.taskDelegate = self
90+
self.task.futureResult.whenComplete { _ in
91+
self.task.taskDelegate = nil
92+
}
9393
}
9494

9595
func willBeExecutedOnConnection(_ connection: HTTPConnectionPool.Connection) {
96-
// self.task.setConnection(connection)
96+
self.task.setConnection(connection)
9797
}
9898

9999
func requestWasQueued(_ queuer: HTTP1RequestQueuer) {

‎Tests/AsyncHTTPClientTests/HTTPClientInternalTests.swift

+323-315
Large diffs are not rendered by default.

‎Tests/AsyncHTTPClientTests/HTTPClientNIOTSTests.swift

+1-1
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ class HTTPClientNIOTSTests: XCTestCase {
8989
XCTAssertNoThrow(try httpBin.shutdown())
9090

9191
XCTAssertThrowsError(try httpClient.get(url: "https://localhost:\(port)/get").wait()) { error in
92-
XCTAssertEqual(.connectTimeout(.milliseconds(100)), error as? ChannelError)
92+
XCTAssertEqual(.connectTimeout, error as? HTTPClientError)
9393
}
9494
}
9595

‎Tests/AsyncHTTPClientTests/HTTPClientTests.swift

+19-33
Original file line numberDiff line numberDiff line change
@@ -300,7 +300,7 @@ class HTTPClientTests: XCTestCase {
300300
XCTAssertEqual(.ok, response.status)
301301
}
302302

303-
func testGetHttpsWithIP() throws {
303+
func testGetHttpsWithIP() {
304304
let localHTTPBin = HTTPBin(ssl: true)
305305
let localClient = HTTPClient(eventLoopGroupProvider: .shared(self.clientGroup),
306306
configuration: HTTPClient.Configuration(certificateVerification: .none))
@@ -309,11 +309,12 @@ class HTTPClientTests: XCTestCase {
309309
XCTAssertNoThrow(try localHTTPBin.shutdown())
310310
}
311311

312-
let response = try localClient.get(url: "https://127.0.0.1:\(localHTTPBin.port)/get").wait()
313-
XCTAssertEqual(.ok, response.status)
312+
var response: HTTPClient.Response?
313+
XCTAssertNoThrow(response = try localClient.get(url: "https://127.0.0.1:\(localHTTPBin.port)/get").wait())
314+
XCTAssertEqual(response?.status, .ok)
314315
}
315316

316-
func testGetHTTPSWorksOnMTELGWithIP() throws {
317+
func testGetHTTPSWorksOnMTELGWithIP() {
317318
// Same test as above but this one will use NIO on Sockets even on Apple platforms, just to make sure
318319
// this works.
319320
let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
@@ -328,8 +329,9 @@ class HTTPClientTests: XCTestCase {
328329
XCTAssertNoThrow(try localHTTPBin.shutdown())
329330
}
330331

331-
let response = try localClient.get(url: "https://127.0.0.1:\(localHTTPBin.port)/get").wait()
332-
XCTAssertEqual(.ok, response.status)
332+
var response: HTTPClient.Response?
333+
XCTAssertNoThrow(response = try localClient.get(url: "https://127.0.0.1:\(localHTTPBin.port)/get").wait())
334+
XCTAssertEqual(response?.status, .ok)
333335
}
334336

335337
func testPostHttps() throws {
@@ -572,13 +574,11 @@ class HTTPClientTests: XCTestCase {
572574
}
573575

574576
XCTAssertThrowsError(try localClient.get(url: self.defaultHTTPBinURLPrefix + "wait").wait(), "Should fail") { error in
575-
guard case let error = error as? HTTPClientError, error == .readTimeout else {
576-
return XCTFail("Should fail with readTimeout")
577-
}
577+
XCTAssertEqual(error as? HTTPClientError, .readTimeout)
578578
}
579579
}
580580

581-
func testConnectTimeout() throws {
581+
func testConnectTimeout() {
582582
let httpClient = HTTPClient(eventLoopGroupProvider: .shared(self.clientGroup),
583583
configuration: .init(timeout: .init(connect: .milliseconds(100), read: .milliseconds(150))))
584584

@@ -588,15 +588,13 @@ class HTTPClientTests: XCTestCase {
588588

589589
// This must throw as 198.51.100.254 is reserved for documentation only
590590
XCTAssertThrowsError(try httpClient.get(url: "http://198.51.100.254:65535/get").wait()) { error in
591-
XCTAssertEqual(.connectTimeout(.milliseconds(100)), error as? ChannelError)
591+
XCTAssertEqual(error as? HTTPClientError, .connectTimeout)
592592
}
593593
}
594594

595595
func testDeadline() throws {
596596
XCTAssertThrowsError(try self.defaultClient.get(url: self.defaultHTTPBinURLPrefix + "wait", deadline: .now() + .milliseconds(150)).wait(), "Should fail") { error in
597-
guard case let error = error as? HTTPClientError, error == .readTimeout else {
598-
return XCTFail("Should fail with readTimeout")
599-
}
597+
XCTAssertEqual(error as? HTTPClientError, .deadlineExceeded)
600598
}
601599
}
602600

@@ -610,9 +608,7 @@ class HTTPClientTests: XCTestCase {
610608
}
611609

612610
XCTAssertThrowsError(try task.wait(), "Should fail") { error in
613-
guard case let error = error as? HTTPClientError, error == .cancelled else {
614-
return XCTFail("Should fail with cancelled")
615-
}
611+
XCTAssertEqual(error as? HTTPClientError, .cancelled)
616612
}
617613
}
618614

@@ -1839,18 +1835,8 @@ class HTTPClientTests: XCTestCase {
18391835
XCTAssertNoThrow(try localClient.syncShutdown())
18401836
}
18411837

1842-
XCTAssertThrowsError(try localClient.get(url: "http://localhost:\(port)").wait()) { error in
1843-
if isTestingNIOTS() {
1844-
guard case ChannelError.connectTimeout = error else {
1845-
XCTFail("Unexpected error: \(error)")
1846-
return
1847-
}
1848-
} else {
1849-
guard error is NIOConnectionError else {
1850-
XCTFail("Unexpected error: \(error)")
1851-
return
1852-
}
1853-
}
1838+
XCTAssertThrowsError(try localClient.get(url: "http://localhost:\(port)").wait()) {
1839+
XCTAssertEqual($0 as? HTTPClientError, .connectTimeout)
18541840
}
18551841
}
18561842

@@ -2506,8 +2492,8 @@ class HTTPClientTests: XCTestCase {
25062492
let delegate = TestDelegate()
25072493

25082494
XCTAssertThrowsError(try httpClient.execute(request: request, delegate: delegate).wait()) { error in
2509-
XCTAssertEqual(.connectTimeout(.milliseconds(10)), error as? ChannelError)
2510-
XCTAssertEqual(.connectTimeout(.milliseconds(10)), delegate.error as? ChannelError)
2495+
XCTAssertEqual(.connectTimeout, error as? HTTPClientError)
2496+
XCTAssertEqual(.connectTimeout, delegate.error as? HTTPClientError)
25112497
}
25122498
}
25132499

@@ -2728,7 +2714,7 @@ class HTTPClientTests: XCTestCase {
27282714

27292715
XCTAssertThrowsError(try task.wait()) { error in
27302716
if isTestingNIOTS() {
2731-
XCTAssertEqual(error as? ChannelError, .connectTimeout(.milliseconds(100)))
2717+
XCTAssertEqual(error as? HTTPClientError, .connectTimeout)
27322718
} else {
27332719
switch error as? NIOSSLError {
27342720
case .some(.handshakeFailed(.sslError(_))): break
@@ -2775,7 +2761,7 @@ class HTTPClientTests: XCTestCase {
27752761

27762762
XCTAssertThrowsError(try task.wait()) { error in
27772763
if isTestingNIOTS() {
2778-
XCTAssertEqual(error as? ChannelError, .connectTimeout(.milliseconds(200)))
2764+
XCTAssertEqual(error as? HTTPClientError, .connectTimeout)
27792765
} else {
27802766
switch error as? NIOSSLError {
27812767
case .some(.handshakeFailed(.sslError(_))): break
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the AsyncHTTPClient open source project
4+
//
5+
// Copyright (c) 2021 Apple Inc. and the AsyncHTTPClient project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
@testable import AsyncHTTPClient
16+
import Logging
17+
import NIO
18+
import NIOHTTP1
19+
import XCTest
20+
21+
class HTTPRequestTaskTests: XCTestCase {
22+
func testWrapperOfAwesomeness() {
23+
// let elg = MultiThreadedEventLoopGroup(numberOfThreads: 2)
24+
// let channelEventLoop = elg.next()
25+
// let delegateEventLoop = elg.next()
26+
// let logger = Logger(label: "test")
27+
//
28+
// let request = try! HTTPClient.Request(
29+
// url: "http://localhost/",
30+
// method: .POST, headers: HTTPHeaders([("content-length", "4")]),
31+
// body: .stream({ writer -> EventLoopFuture<Void> in
32+
// func recursive(count: UInt8, promise: EventLoopPromise<Void>) {
33+
// writer.write(.byteBuffer(ByteBuffer(bytes: [count]))).whenComplete { result in
34+
// switch result {
35+
// case .failure(let error):
36+
// XCTFail("Unexpected error: \(error)")
37+
// case .success:
38+
// guard count < 4 else {
39+
// return promise.succeed(())
40+
// }
41+
// recursive(count: count + 1, promise: promise)
42+
// }
43+
// }
44+
// }
45+
//
46+
// let promise = channelEventLoop.makePromise(of: Void.self)
47+
// recursive(count: 0, promise: promise)
48+
// return promise.futureResult
49+
// }))
50+
//
51+
//
52+
// let task = HTTPClient.Task<HTTPClient.Response>(eventLoop: channelEventLoop, logger: logger)
53+
//
54+
// let wrapper = RequestBag(
55+
// request: request,
56+
// eventLoopPreference: .delegate(on: delegateEventLoop),
57+
// task: task,
58+
// connectionDeadline: .now() + .seconds(60),
59+
// delegate: MockRequestDelegate())
60+
//
61+
// XCTAssertNoThrow(try task.futureResult.wait())
62+
}
63+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the AsyncHTTPClient open source project
4+
//
5+
// Copyright (c) 2021 Apple Inc. and the AsyncHTTPClient project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
import AsyncHTTPClient
16+
import NIO
17+
import NIOHTTP1
18+
import XCTest
19+
20+
class MockRequestDelegate: HTTPClientResponseDelegate {
21+
typealias Response = HTTPClient.Response
22+
23+
private(set) var didSendRequestHeadCount = 0
24+
private(set) var didSendRequestPartCount = 0
25+
private(set) var didSendRequestCount = 0
26+
27+
func didSendRequestHead(task: HTTPClient.Task<HTTPClient.Response>, _ head: HTTPRequestHead) {
28+
self.didSendRequestHeadCount += 1
29+
}
30+
31+
func didSendRequestPart(task: HTTPClient.Task<HTTPClient.Response>, _ part: IOData) {
32+
self.didSendRequestPartCount += 1
33+
}
34+
35+
func didSendRequest(task: HTTPClient.Task<HTTPClient.Response>) {
36+
self.didSendRequestCount += 1
37+
}
38+
39+
func didFinishRequest(task: HTTPClient.Task<HTTPClient.Response>) throws -> HTTPClient.Response {
40+
HTTPClient.Response(host: "localhost", status: .ok, version: .http1_1, headers: .init(), body: nil)
41+
}
42+
}

0 commit comments

Comments
 (0)
Please sign in to comment.