Skip to content

refactor pool #192

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 38 commits into from
May 18, 2020
Merged
Show file tree
Hide file tree
Changes from 33 commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
18a5bd6
refactor pool
artemredkin Apr 3, 2020
8da635a
revert test
artemredkin Apr 3, 2020
110dc2a
review fixes - move callouts out of locks
artemredkin Apr 3, 2020
6c51cc6
restore some asserts
artemredkin Apr 3, 2020
d5830a8
remove duplicate test
artemredkin Apr 3, 2020
4b48322
Merge branch 'master' into refactor_pool
artemredkin Apr 6, 2020
5bc7c36
add empty provider removal
artemredkin Apr 16, 2020
de8a139
Merge branch 'refactor_pool' of github.com:swift-server/async-http-cl…
artemredkin Apr 16, 2020
8188902
fix warning and a race
artemredkin Apr 16, 2020
2be80a2
Merge branch 'master' into refactor_pool
artemredkin Apr 16, 2020
addd0cd
Merge branch 'refactor_pool' of github.com:swift-server/async-http-cl…
artemredkin Apr 16, 2020
1bf32d6
review fix: add missing lock
artemredkin Apr 26, 2020
e77fdb9
add waiter for provider deletion
artemredkin Apr 26, 2020
ef484e3
make provider close more robust
artemredkin Apr 26, 2020
f58c791
Merge branch 'master' into refactor_pool
artemredkin Apr 26, 2020
7663dee
small optimization
artemredkin Apr 26, 2020
c19705d
unoptimize
artemredkin Apr 26, 2020
4e3e884
refactor pool and add tests
artemredkin May 3, 2020
c4e3987
fix formatting and update linuxmain
artemredkin May 5, 2020
8b6d6d0
make state fields private
artemredkin May 5, 2020
54704e4
remove all state from connection to provider state
artemredkin May 12, 2020
e6ff088
generate tests and fix format
artemredkin May 12, 2020
7655bf5
remove temp code
artemredkin May 12, 2020
f4bfb33
add debug output for failing test
artemredkin May 13, 2020
68d1f97
fix formatting
artemredkin May 13, 2020
5a46a2a
add test debug to test it
artemredkin May 13, 2020
2436d33
even more debug logging
artemredkin May 13, 2020
000e3ad
execute post-connect actions on channel event loop and remove debug o…
artemredkin May 14, 2020
e6f58c6
Merge branch 'master' into refactor_pool
artemredkin May 14, 2020
13fbd6d
make http compressor part of the static channel setup
artemredkin May 14, 2020
30c5936
properly wait on channel setup complete
artemredkin May 15, 2020
66d641a
move setup success to after request sent
artemredkin May 15, 2020
4a9866b
fix test http server crash
artemredkin May 15, 2020
b8c51a8
review fixes
artemredkin May 15, 2020
aa2d279
Merge branch 'master' into refactor_pool
artemredkin May 15, 2020
551c1f8
review fixes
artemredkin May 18, 2020
e8fb614
review fix
artemredkin May 18, 2020
554b71c
fix compilation
artemredkin May 18, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,200 changes: 685 additions & 515 deletions Sources/AsyncHTTPClient/ConnectionPool.swift

Large diffs are not rendered by default.

96 changes: 34 additions & 62 deletions Sources/AsyncHTTPClient/HTTPClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ public class HTTPClient {
let configuration: Configuration
let pool: ConnectionPool
var state: State
private var tasks = [UUID: TaskProtocol]()
private let stateLock = Lock()

/// Create an `HTTPClient` with specified `EventLoopGroup` provider and configuration.
Expand Down Expand Up @@ -136,14 +135,6 @@ public class HTTPClient {
self.shutdown(requiresCleanClose: false, queue: queue, callback)
}

private func cancelTasks(_ tasks: Dictionary<UUID, TaskProtocol>.Values) -> EventLoopFuture<Void> {
for task in tasks {
task.cancel()
}

return EventLoopFuture.andAllComplete(tasks.map { $0.completion }, on: self.eventLoopGroup.next())
}

private func shutdownEventLoop(queue: DispatchQueue, _ callback: @escaping (Error?) -> Void) {
self.stateLock.withLock {
switch self.eventLoopGroupProvider {
Expand All @@ -163,37 +154,34 @@ public class HTTPClient {
}

private func shutdown(requiresCleanClose: Bool, queue: DispatchQueue, _ callback: @escaping (Error?) -> Void) {
let result: Result<Dictionary<UUID, TaskProtocol>.Values, Error> = self.stateLock.withLock {
if self.state != .upAndRunning {
return .failure(HTTPClientError.alreadyShutdown)
} else {
do {
try self.stateLock.withLock {
if self.state != .upAndRunning {
throw HTTPClientError.alreadyShutdown
}
self.state = .shuttingDown
return .success(self.tasks.values)
}
} catch {
callback(error)
return
}

switch result {
case .failure(let error):
callback(error)
case .success(let tasks):
self.pool.prepareForClose(on: self.eventLoopGroup.next()).whenComplete { _ in
var closeError: Error?
if !tasks.isEmpty, requiresCleanClose {
self.pool.close(on: self.eventLoopGroup.next()).whenComplete { result in
var closeError: Error?
switch result {
case .failure(let error):
closeError = error
case .success(let cleanShutdown):
if !cleanShutdown, requiresCleanClose {
closeError = HTTPClientError.uncleanShutdown
}

// we ignore errors here
self.cancelTasks(tasks).whenComplete { _ in
// we ignore errors here
self.pool.close(on: self.eventLoopGroup.next()).whenComplete { _ in
self.shutdownEventLoop(queue: queue) { eventLoopError in
// we prioritise .uncleanShutdown here
if let error = closeError {
callback(error)
} else {
callback(eventLoopError)
}
}
self.shutdownEventLoop(queue: queue) { eventLoopError in
// we prioritise .uncleanShutdown here
if let error = closeError {
callback(error)
} else {
callback(eventLoopError)
}
}
}
Expand Down Expand Up @@ -361,38 +349,20 @@ public class HTTPClient {
redirectHandler = nil
}

let task = Task<Delegate.Response>(eventLoop: taskEL, poolingTimeout: self.configuration.maximumAllowedIdleTimeInConnectionPool)
self.stateLock.withLock {
self.tasks[task.id] = task
}
let promise = task.promise

promise.futureResult.whenComplete { _ in
self.stateLock.withLock {
self.tasks[task.id] = nil
}
}

let connection = self.pool.getConnection(for: request, preference: eventLoopPreference, on: taskEL, deadline: deadline)
let task = Task<Delegate.Response>(eventLoop: taskEL)
let setupComplete = taskEL.makePromise(of: Void.self)
let connection = self.pool.getConnection(for: request, preference: eventLoopPreference, on: taskEL, deadline: deadline, setupComplete: setupComplete.futureResult)

connection.flatMap { connection -> EventLoopFuture<Void> in
let channel = connection.channel
let addedFuture: EventLoopFuture<Void>
switch self.configuration.decompression {
case .disabled:
addedFuture = channel.eventLoop.makeSucceededFuture(())
case .enabled(let limit):
let decompressHandler = NIOHTTPResponseDecompressor(limit: limit)
addedFuture = channel.pipeline.addHandler(decompressHandler)
let future: EventLoopFuture<Void>
if let timeout = self.resolve(timeout: self.configuration.timeout.read, deadline: deadline) {
future = channel.pipeline.addHandler(IdleStateHandler(readTimeout: timeout))
} else {
future = channel.eventLoop.makeSucceededFuture(())
}

return addedFuture.flatMap {
if let timeout = self.resolve(timeout: self.configuration.timeout.read, deadline: deadline) {
return channel.pipeline.addHandler(IdleStateHandler(readTimeout: timeout))
} else {
return channel.eventLoop.makeSucceededFuture(())
}
}.flatMap {
return future.flatMap {
let taskHandler = TaskHandler(task: task,
kind: request.kind,
delegate: delegate,
Expand All @@ -416,10 +386,12 @@ public class HTTPClient {
return channel.eventLoop.makeSucceededFuture(())
}
}.flatMapError { error in
connection.release()
connection.release(closing: true)
return channel.eventLoop.makeFailedFuture(error)
}
}.cascadeFailure(to: promise)
}.always { _ in
setupComplete.succeed(())
}.cascadeFailure(to: task.promise)

return task
}
Expand Down
98 changes: 28 additions & 70 deletions Sources/AsyncHTTPClient/HTTPHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -485,25 +485,22 @@ extension URL {
extension HTTPClient {
/// Response execution context. Will be created by the library and could be used for obtaining
/// `EventLoopFuture<Response>` of the execution or cancellation of the execution.
public final class Task<Response>: TaskProtocol {
public final class Task<Response> {
/// The `EventLoop` the delegate will be executed on.
public let eventLoop: EventLoop

let promise: EventLoopPromise<Response>
var completion: EventLoopFuture<Void>
var connection: ConnectionPool.Connection?
var connection: Connection?
var cancelled: Bool
let lock: Lock
let id = UUID()
let poolingTimeout: TimeAmount?

init(eventLoop: EventLoop, poolingTimeout: TimeAmount? = nil) {
init(eventLoop: EventLoop) {
self.eventLoop = eventLoop
self.promise = eventLoop.makePromise()
self.completion = self.promise.futureResult.map { _ in }
self.cancelled = false
self.lock = Lock()
self.poolingTimeout = poolingTimeout
}

static func failedTask(eventLoop: EventLoop, error: Error) -> Task<Response> {
Expand All @@ -528,8 +525,8 @@ extension HTTPClient {
/// Cancels the request execution.
public func cancel() {
let channel: Channel? = self.lock.withLock {
if !cancelled {
cancelled = true
if !self.cancelled {
self.cancelled = true
return self.connection?.channel
} else {
return nil
Expand All @@ -539,7 +536,7 @@ extension HTTPClient {
}

@discardableResult
func setConnection(_ connection: ConnectionPool.Connection) -> ConnectionPool.Connection {
func setConnection(_ connection: Connection) -> Connection {
return self.lock.withLock {
self.connection = connection
if self.cancelled {
Expand All @@ -549,47 +546,32 @@ extension HTTPClient {
}
}

func succeed<Delegate: HTTPClientResponseDelegate>(promise: EventLoopPromise<Response>?, with value: Response, delegateType: Delegate.Type) {
self.releaseAssociatedConnection(delegateType: delegateType).whenSuccess {
func succeed<Delegate: HTTPClientResponseDelegate>(promise: EventLoopPromise<Response>?, with value: Response, delegateType: Delegate.Type, closing: Bool) {
self.releaseAssociatedConnection(delegateType: delegateType, closing: closing).whenSuccess {
promise?.succeed(value)
}
}

func fail<Delegate: HTTPClientResponseDelegate>(with error: Error, delegateType: Delegate.Type) {
if let connection = self.connection {
connection.close().whenComplete { _ in
self.releaseAssociatedConnection(delegateType: delegateType).whenComplete { _ in
connection.channel.close(promise: nil)
self.releaseAssociatedConnection(delegateType: delegateType, closing: true)
.whenSuccess {
self.promise.fail(error)
}
}
}
}

func releaseAssociatedConnection<Delegate: HTTPClientResponseDelegate>(delegateType: Delegate.Type) -> EventLoopFuture<Void> {
func releaseAssociatedConnection<Delegate: HTTPClientResponseDelegate>(delegateType: Delegate.Type, closing: Bool) -> EventLoopFuture<Void> {
if let connection = self.connection {
return connection.removeHandler(NIOHTTPResponseDecompressor.self).flatMap {
connection.removeHandler(IdleStateHandler.self)
}.flatMap {
// remove read timeout handler
return connection.removeHandler(IdleStateHandler.self).flatMap {
connection.removeHandler(TaskHandler<Delegate>.self)
}.flatMap {
let idlePoolConnectionHandler = IdlePoolConnectionHandler()
return connection.channel.pipeline.addHandler(idlePoolConnectionHandler, position: .last).flatMap {
connection.channel.pipeline.addHandler(IdleStateHandler(writeTimeout: self.poolingTimeout), position: .before(idlePoolConnectionHandler))
}
}.flatMapError { error in
if let error = error as? ChannelError, error == .ioOnClosedChannel {
// We may get this error if channel is released because it is
// closed, it is safe to ignore it
return connection.channel.eventLoop.makeSucceededFuture(())
} else {
return connection.channel.eventLoop.makeFailedFuture(error)
}
}.map {
connection.release()
connection.release(closing: closing)
}.flatMapError { error in
fatalError("Couldn't remove taskHandler: \(error)")
}

} else {
// TODO: This seems only reached in some internal unit test
// Maybe there could be a better handling in the future to make
Expand All @@ -602,12 +584,6 @@ extension HTTPClient {

internal struct TaskCancelEvent {}

internal protocol TaskProtocol {
func cancel()
var id: UUID { get }
var completion: EventLoopFuture<Void> { get }
}

// MARK: - TaskHandler

internal class TaskHandler<Delegate: HTTPClientResponseDelegate>: RemovableChannelHandler {
Expand All @@ -628,6 +604,7 @@ internal class TaskHandler<Delegate: HTTPClientResponseDelegate>: RemovableChann
var state: State = .idle
var pendingRead = false
var mayRead = true
var closing = false
let kind: HTTPClient.Request.Kind

init(task: HTTPClient.Task<Delegate.Response>,
Expand Down Expand Up @@ -695,7 +672,7 @@ extension TaskHandler {
do {
let result = try body(self.task)

self.task.succeed(promise: promise, with: result, delegateType: Delegate.self)
self.task.succeed(promise: promise, with: result, delegateType: Delegate.self, closing: self.closing)
} catch {
self.task.fail(with: error, delegateType: Delegate.self)
}
Expand Down Expand Up @@ -727,7 +704,7 @@ extension TaskHandler: ChannelDuplexHandler {

func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
self.state = .idle
let request = unwrapOutboundIn(data)
let request = self.unwrapOutboundIn(data)

let uri: String
switch (self.kind, request.url.baseURL) {
Expand Down Expand Up @@ -772,8 +749,13 @@ extension TaskHandler: ChannelDuplexHandler {
self.callOutToDelegateFireAndForget(self.delegate.didSendRequest)
}.flatMapErrorThrowing { error in
context.eventLoop.assertInEventLoop()
self.state = .end
self.failTaskAndNotifyDelegate(error: error, self.delegate.didReceiveError)
switch self.state {
case .end:
break
default:
self.state = .end
self.failTaskAndNotifyDelegate(error: error, self.delegate.didReceiveError)
}
throw error
}.cascade(to: promise)
}
Expand Down Expand Up @@ -805,16 +787,10 @@ extension TaskHandler: ChannelDuplexHandler {
switch response {
case .head(let head):
if !head.isKeepAlive {
self.task.lock.withLock {
if let connection = self.task.connection {
connection.isClosing = true
} else {
preconditionFailure("There should always be a connection at this point")
}
}
self.closing = true
}

if let redirectURL = redirectHandler?.redirectTarget(status: head.status, headers: head.headers) {
if let redirectURL = self.redirectHandler?.redirectTarget(status: head.status, headers: head.headers) {
self.state = .redirected(head, redirectURL)
} else {
self.state = .head
Expand All @@ -840,7 +816,7 @@ extension TaskHandler: ChannelDuplexHandler {
switch self.state {
case .redirected(let head, let redirectURL):
self.state = .end
self.task.releaseAssociatedConnection(delegateType: Delegate.self).whenSuccess {
self.task.releaseAssociatedConnection(delegateType: Delegate.self, closing: self.closing).whenSuccess {
self.redirectHandler?.redirect(status: head.status, to: redirectURL, promise: self.task.promise)
}
default:
Expand Down Expand Up @@ -1021,21 +997,3 @@ internal struct RedirectHandler<ResponseType> {
}
}
}

class IdlePoolConnectionHandler: ChannelInboundHandler, RemovableChannelHandler {
typealias InboundIn = NIOAny

let _hasNotSentClose: NIOAtomic<Bool> = .makeAtomic(value: true)
var hasNotSentClose: Bool {
return self._hasNotSentClose.load()
}

func userInboundEventTriggered(context: ChannelHandlerContext, event: Any) {
if let idleEvent = event as? IdleStateHandler.IdleStateEvent, idleEvent == .write {
self._hasNotSentClose.store(false)
context.close(promise: nil)
} else {
context.fireUserInboundEventTriggered(event)
}
}
}
24 changes: 1 addition & 23 deletions Sources/AsyncHTTPClient/Utils.swift
Original file line number Diff line number Diff line change
Expand Up @@ -135,29 +135,7 @@ extension NIOClientTCPBootstrap {
}
}

extension CircularBuffer {
@discardableResult
mutating func swapWithFirstAndRemove(at index: Index) -> Element? {
precondition(index >= self.startIndex && index < self.endIndex)
if !self.isEmpty {
self.swapAt(self.startIndex, index)
return self.removeFirst()
} else {
return nil
}
}

@discardableResult
mutating func swapWithFirstAndRemove(where predicate: (Element) throws -> Bool) rethrows -> Element? {
if let existingIndex = try self.firstIndex(where: predicate) {
return self.swapWithFirstAndRemove(at: existingIndex)
} else {
return nil
}
}
}

extension ConnectionPool.Connection {
extension Connection {
func removeHandler<Handler: RemovableChannelHandler>(_ type: Handler.Type) -> EventLoopFuture<Void> {
return self.channel.pipeline.handler(type: type).flatMap { handler in
self.channel.pipeline.removeHandler(handler)
Expand Down
Loading