Skip to content

refactor pool #192

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 38 commits into from
May 18, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
18a5bd6
refactor pool
artemredkin Apr 3, 2020
8da635a
revert test
artemredkin Apr 3, 2020
110dc2a
review fixes - move callouts out of locks
artemredkin Apr 3, 2020
6c51cc6
restore some asserts
artemredkin Apr 3, 2020
d5830a8
remove duplicate test
artemredkin Apr 3, 2020
4b48322
Merge branch 'master' into refactor_pool
artemredkin Apr 6, 2020
5bc7c36
add empty provider removal
artemredkin Apr 16, 2020
de8a139
Merge branch 'refactor_pool' of github.com:swift-server/async-http-cl…
artemredkin Apr 16, 2020
8188902
fix warning and a race
artemredkin Apr 16, 2020
2be80a2
Merge branch 'master' into refactor_pool
artemredkin Apr 16, 2020
addd0cd
Merge branch 'refactor_pool' of github.com:swift-server/async-http-cl…
artemredkin Apr 16, 2020
1bf32d6
review fix: add missing lock
artemredkin Apr 26, 2020
e77fdb9
add waiter for provider deletion
artemredkin Apr 26, 2020
ef484e3
make provider close more robust
artemredkin Apr 26, 2020
f58c791
Merge branch 'master' into refactor_pool
artemredkin Apr 26, 2020
7663dee
small optimization
artemredkin Apr 26, 2020
c19705d
unoptimize
artemredkin Apr 26, 2020
4e3e884
refactor pool and add tests
artemredkin May 3, 2020
c4e3987
fix formatting and update linuxmain
artemredkin May 5, 2020
8b6d6d0
make state fields private
artemredkin May 5, 2020
54704e4
remove all state from connection to provider state
artemredkin May 12, 2020
e6ff088
generate tests and fix format
artemredkin May 12, 2020
7655bf5
remove temp code
artemredkin May 12, 2020
f4bfb33
add debug output for failing test
artemredkin May 13, 2020
68d1f97
fix formatting
artemredkin May 13, 2020
5a46a2a
add test debug to test it
artemredkin May 13, 2020
2436d33
even more debug logging
artemredkin May 13, 2020
000e3ad
execute post-connect actions on channel event loop and remove debug o…
artemredkin May 14, 2020
e6f58c6
Merge branch 'master' into refactor_pool
artemredkin May 14, 2020
13fbd6d
make http compressor part of the static channel setup
artemredkin May 14, 2020
30c5936
properly wait on channel setup complete
artemredkin May 15, 2020
66d641a
move setup success to after request sent
artemredkin May 15, 2020
4a9866b
fix test http server crash
artemredkin May 15, 2020
b8c51a8
review fixes
artemredkin May 15, 2020
aa2d279
Merge branch 'master' into refactor_pool
artemredkin May 15, 2020
551c1f8
review fixes
artemredkin May 18, 2020
e8fb614
review fix
artemredkin May 18, 2020
554b71c
fix compilation
artemredkin May 18, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
914 changes: 370 additions & 544 deletions Sources/AsyncHTTPClient/ConnectionPool.swift

Large diffs are not rendered by default.

335 changes: 335 additions & 0 deletions Sources/AsyncHTTPClient/ConnectionsState.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,335 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the AsyncHTTPClient open source project
//
// Copyright (c) 2019-2020 Apple Inc. and the AsyncHTTPClient project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import NIO

extension HTTP1ConnectionProvider {
enum Action {
case lease(Connection, Waiter)
case create(Waiter)
case replace(Connection, Waiter)
case closeProvider
case park(Connection)
case none
case fail(Waiter, Error)
case cancel(Connection, Bool)
indirect case closeAnd(Connection, Action)
indirect case parkAnd(Connection, Action)
}

struct ConnectionsState {
enum State {
case active
case closed
}

struct Snapshot {
var state: State
var availableConnections: CircularBuffer<Connection>
var leasedConnections: Set<ConnectionKey>
var waiters: CircularBuffer<Waiter>
var openedConnectionsCount: Int
var pending: Int
}

let maximumConcurrentConnections: Int
let eventLoop: EventLoop

private var state: State = .active

/// Opened connections that are available.
private var availableConnections: CircularBuffer<Connection> = .init(initialCapacity: 8)

/// Opened connections that are leased to the user.
private var leasedConnections: Set<ConnectionKey> = .init()

/// Consumers that weren't able to get a new connection without exceeding
/// `maximumConcurrentConnections` get a `Future<Connection>`
/// whose associated promise is stored in `Waiter`. The promise is completed
/// as soon as possible by the provider, in FIFO order.
private var waiters: CircularBuffer<Waiter> = .init(initialCapacity: 8)

/// Number of opened or opening connections, used to keep track of all connections and enforcing `maximumConcurrentConnections` limit.
private var openedConnectionsCount: Int = 0

/// Number of enqueued requests, used to track if it is safe to delete the provider.
private var pending: Int = 1

init(maximumConcurrentConnections: Int = 8, eventLoop: EventLoop) {
self.maximumConcurrentConnections = maximumConcurrentConnections
self.eventLoop = eventLoop
}

func testsOnly_getInternalState() -> Snapshot {
return Snapshot(state: self.state, availableConnections: self.availableConnections, leasedConnections: self.leasedConnections, waiters: self.waiters, openedConnectionsCount: self.openedConnectionsCount, pending: self.pending)
}

mutating func testsOnly_setInternalState(_ snapshot: Snapshot) {
self.state = snapshot.state
self.availableConnections = snapshot.availableConnections
self.leasedConnections = snapshot.leasedConnections
self.waiters = snapshot.waiters
self.openedConnectionsCount = snapshot.openedConnectionsCount
self.pending = snapshot.pending
}

func assertInvariants() {
assert(self.waiters.isEmpty)
assert(self.availableConnections.isEmpty)
assert(self.leasedConnections.isEmpty)
assert(self.openedConnectionsCount == 0)
assert(self.pending == 0)
}

mutating func enqueue() -> Bool {
switch self.state {
case .active:
self.pending += 1
return true
case .closed:
return false
}
}

private var hasCapacity: Bool {
return self.openedConnectionsCount < self.maximumConcurrentConnections
}

private var isEmpty: Bool {
return self.openedConnectionsCount == 0 && self.pending == 0
}

mutating func acquire(waiter: Waiter) -> Action {
switch self.state {
case .active:
self.pending -= 1

let (eventLoop, required) = self.resolvePreference(waiter.preference)
if required {
// If there is an opened connection on the same EL - use it
if let found = self.availableConnections.firstIndex(where: { $0.channel.eventLoop === eventLoop }) {
let connection = self.availableConnections.remove(at: found)
self.leasedConnections.insert(ConnectionKey(connection))
return .lease(connection, waiter)
}

// If we can create additional connection, create
if self.hasCapacity {
self.openedConnectionsCount += 1
return .create(waiter)
}

// If we cannot create additional connection, but there is one in the pool, replace it
if let connection = self.availableConnections.popFirst() {
return .replace(connection, waiter)
}

self.waiters.append(waiter)
return .none
} else if let connection = self.availableConnections.popFirst() {
self.leasedConnections.insert(ConnectionKey(connection))
return .lease(connection, waiter)
} else if self.hasCapacity {
self.openedConnectionsCount += 1
return .create(waiter)
} else {
self.waiters.append(waiter)
return .none
}
case .closed:
return .fail(waiter, ProviderClosedError())
}
}

mutating func release(connection: Connection, closing: Bool) -> Action {
switch self.state {
case .active:
assert(self.leasedConnections.contains(ConnectionKey(connection)))

if connection.isActiveEstimation, !closing { // If connection is alive, we can offer it to a next waiter
if let waiter = self.waiters.popFirst() {
let (eventLoop, required) = self.resolvePreference(waiter.preference)

// If returned connection is on same EL or we do not require special EL - lease it
if connection.channel.eventLoop === eventLoop || !required {
return .lease(connection, waiter)
}

// If there is an opened connection on the same loop, lease it and park returned
if let found = self.availableConnections.firstIndex(where: { $0.channel.eventLoop === eventLoop }) {
self.leasedConnections.remove(ConnectionKey(connection))
let replacement = self.availableConnections.swap(at: found, with: connection)
self.leasedConnections.insert(ConnectionKey(replacement))
return .parkAnd(connection, .lease(replacement, waiter))
}

// If we can create new connection - do it
if self.hasCapacity {
self.leasedConnections.remove(ConnectionKey(connection))
self.availableConnections.append(connection)
self.openedConnectionsCount += 1
return .parkAnd(connection, .create(waiter))
}

// If we cannot create new connections, we will have to replace returned connection with a new one on the required loop
return .replace(connection, waiter)
} else { // or park, if there are no waiters
self.leasedConnections.remove(ConnectionKey(connection))
self.availableConnections.append(connection)
return .park(connection)
}
} else { // if connection is not alive, we delete it and process the next waiter
// this connections is now gone, we will either create new connection or do nothing
self.openedConnectionsCount -= 1
self.leasedConnections.remove(ConnectionKey(connection))

return self.processNextWaiter()
}
case .closed:
self.openedConnectionsCount -= 1
self.leasedConnections.remove(ConnectionKey(connection))

return self.processNextWaiter()
}
}

mutating func offer(connection: Connection) -> Action {
switch self.state {
case .active:
self.leasedConnections.insert(ConnectionKey(connection))
return .none
case .closed: // This can happen when we close the client while connections was being estableshed
return .cancel(connection, self.isEmpty)
}
}

mutating func drop(connection: Connection) {
switch self.state {
case .active:
self.leasedConnections.remove(ConnectionKey(connection))
case .closed:
assertionFailure("should not happen")
}
}

mutating func connectFailed() -> Action {
switch self.state {
case .active:
self.openedConnectionsCount -= 1
return self.processNextWaiter()
case .closed:
assertionFailure("should not happen")
return .none
}
}

mutating func remoteClosed(connection: Connection) -> Action {
switch self.state {
case .active:
// Connection can be closed remotely while we wait for `.lease` action to complete.
// If this happens when connections is leased, we do not remove it from leased connections,
// it will be done when a new replacement will be ready for it.
if self.leasedConnections.contains(ConnectionKey(connection)) {
return .none
}

// If this connection is not in use, the have to release it as well
self.openedConnectionsCount -= 1
self.availableConnections.removeAll { $0 === connection }

return self.processNextWaiter()
case .closed:
self.openedConnectionsCount -= 1
return self.processNextWaiter()
}
}

mutating func timeout(connection: Connection) -> Action {
switch self.state {
case .active:
// We can get timeout and inUse = true when we decided to lease the connection, but this action is not executed yet.
// In this case we can ignore timeout notification.
if self.leasedConnections.contains(ConnectionKey(connection)) {
return .none
}

// If connection was not in use, we release it from the pool, increasing available capacity
self.openedConnectionsCount -= 1
self.availableConnections.removeAll { $0 === connection }

return .closeAnd(connection, self.processNextWaiter())
case .closed:
return .none
}
}

mutating func processNextWaiter() -> Action {
if let waiter = self.waiters.popFirst() {
let (eventLoop, required) = self.resolvePreference(waiter.preference)

// If specific EL is required, we have only two options - find open one or create a new one
if required, let found = self.availableConnections.firstIndex(where: { $0.channel.eventLoop === eventLoop }) {
let connection = self.availableConnections.remove(at: found)
self.leasedConnections.insert(ConnectionKey(connection))
return .lease(connection, waiter)
} else if !required, let connection = self.availableConnections.popFirst() {
self.leasedConnections.insert(ConnectionKey(connection))
return .lease(connection, waiter)
} else {
self.openedConnectionsCount += 1
return .create(waiter)
}
}

// if capacity is at max and the are no waiters and no in-flight requests for connection, we are closing this provider
if self.isEmpty {
// deactivate and remove
self.state = .closed
return .closeProvider
}

return .none
}

mutating func close() -> (CircularBuffer<Waiter>, CircularBuffer<Connection>, Set<ConnectionKey>, Bool)? {
switch self.state {
case .active:
let waiters = self.waiters
self.waiters.removeAll()

let available = self.availableConnections
self.availableConnections.removeAll()

let leased = self.leasedConnections

self.state = .closed

return (waiters, available, leased, self.openedConnectionsCount - available.count == 0)
case .closed:
return nil
}
}

private func resolvePreference(_ preference: HTTPClient.EventLoopPreference) -> (EventLoop, Bool) {
switch preference.preference {
case .indifferent:
return (self.eventLoop, false)
case .delegate(let el):
return (el, false)
case .delegateAndChannel(let el), .testOnly_exact(let el, _):
return (el, true)
}
}
}
}
Loading