diff --git a/Sources/AsyncHTTPClient/ConnectionPool/HTTPConnectionPool+Waiter.swift b/Sources/AsyncHTTPClient/ConnectionPool/HTTPConnectionPool+Waiter.swift deleted file mode 100644 index d21d06d8d..000000000 --- a/Sources/AsyncHTTPClient/ConnectionPool/HTTPConnectionPool+Waiter.swift +++ /dev/null @@ -1,57 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the AsyncHTTPClient open source project -// -// Copyright (c) 2021 Apple Inc. and the AsyncHTTPClient project authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// - -import NIOCore - -extension HTTPConnectionPool { - struct RequestID: Hashable { - private let objectIdentifier: ObjectIdentifier - - init(_ request: HTTPSchedulableRequest) { - self.objectIdentifier = ObjectIdentifier(request) - } - } - - struct Waiter { - var requestID: RequestID { - RequestID(self.request) - } - - var request: HTTPSchedulableRequest - - private var eventLoopRequirement: EventLoop? { - switch self.request.eventLoopPreference.preference { - case .delegateAndChannel(on: let eventLoop), - .testOnly_exact(channelOn: let eventLoop, delegateOn: _): - return eventLoop - case .delegate(on: _), - .indifferent: - return nil - } - } - - init(request: HTTPSchedulableRequest) { - self.request = request - } - - func canBeRun(on option: EventLoop) -> Bool { - guard let requirement = self.eventLoopRequirement else { - // if no requirement exists we can run on any EventLoop - return true - } - - return requirement === option - } - } -} diff --git a/Sources/AsyncHTTPClient/ConnectionPool/HTTPConnectionPool.swift b/Sources/AsyncHTTPClient/ConnectionPool/HTTPConnectionPool.swift index 5d15f6b16..c9206765c 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/HTTPConnectionPool.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/HTTPConnectionPool.swift @@ -123,6 +123,46 @@ enum HTTPConnectionPool { } } +extension HTTPConnectionPool { + /// This is a wrapper that we use inside the connection pool state machine to ensure that + /// the actual request can not be accessed at any time. Further it exposes all that is needed within + /// the state machine. A request ID and the `EventLoop` requirement. + struct Request { + struct ID: Hashable { + let objectIdentifier: ObjectIdentifier + let eventLoopID: EventLoopID? + + fileprivate init(_ request: HTTPSchedulableRequest, eventLoopRequirement eventLoopID: EventLoopID?) { + self.objectIdentifier = ObjectIdentifier(request) + self.eventLoopID = eventLoopID + } + } + + fileprivate let req: HTTPSchedulableRequest + + init(_ request: HTTPSchedulableRequest) { + self.req = request + } + + var id: HTTPConnectionPool.Request.ID { + HTTPConnectionPool.Request.ID(self.req, eventLoopRequirement: self.requiredEventLoop?.id) + } + + var requiredEventLoop: EventLoop? { + switch self.req.eventLoopPreference.preference { + case .indifferent, .delegate: + return nil + case .delegateAndChannel(on: let eventLoop), .testOnly_exact(channelOn: let eventLoop, delegateOn: _): + return eventLoop + } + } + + func __testOnly_wrapped_request() -> HTTPSchedulableRequest { + self.req + } + } +} + struct EventLoopID: Hashable { private var id: Identifier diff --git a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+RequestQueue.swift b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+RequestQueue.swift new file mode 100644 index 000000000..ca7af66fb --- /dev/null +++ b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+RequestQueue.swift @@ -0,0 +1,119 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the AsyncHTTPClient open source project +// +// Copyright (c) 2021 Apple Inc. and the AsyncHTTPClient project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import NIOCore + +extension HTTPConnectionPool { + /// A struct to store all queued requests. + struct RequestQueue { + private var generalPurposeQueue: CircularBuffer + private var eventLoopQueues: [EventLoopID: CircularBuffer] + + init() { + self.generalPurposeQueue = CircularBuffer(initialCapacity: 32) + self.eventLoopQueues = [:] + } + + var count: Int { + self.generalPurposeQueue.count + self.eventLoopQueues.reduce(0) { $0 + $1.value.count } + } + + var isEmpty: Bool { + self.count == 0 + } + + func count(for eventLoop: EventLoop?) -> Int { + if let eventLoop = eventLoop { + return self.withEventLoopQueueIfAvailable(for: eventLoop.id) { $0.count } ?? 0 + } + return self.generalPurposeQueue.count + } + + func isEmpty(for eventLoop: EventLoop?) -> Bool { + if let eventLoop = eventLoop { + return self.withEventLoopQueueIfAvailable(for: eventLoop.id) { $0.isEmpty } ?? true + } + return self.generalPurposeQueue.isEmpty + } + + @discardableResult + mutating func push(_ request: Request) -> Request.ID { + if let eventLoop = request.requiredEventLoop { + self.withEventLoopQueue(for: eventLoop.id) { queue in + queue.append(request) + } + } else { + self.generalPurposeQueue.append(request) + } + return request.id + } + + mutating func popFirst(for eventLoop: EventLoop? = nil) -> Request? { + if let eventLoop = eventLoop { + return self.withEventLoopQueue(for: eventLoop.id) { queue in + queue.popFirst() + } + } else { + return self.generalPurposeQueue.popFirst() + } + } + + mutating func remove(_ requestID: Request.ID) -> Request? { + if let eventLoopID = requestID.eventLoopID { + return self.withEventLoopQueue(for: eventLoopID) { queue in + guard let index = queue.firstIndex(where: { $0.id == requestID }) else { + return nil + } + return queue.remove(at: index) + } + } else { + if let index = self.generalPurposeQueue.firstIndex(where: { $0.id == requestID }) { + // TBD: This is slow. Do we maybe want something more sophisticated here? + return self.generalPurposeQueue.remove(at: index) + } + return nil + } + } + + mutating func removeAll() -> [Request] { + var result = [Request]() + result = self.eventLoopQueues.flatMap { $0.value } + result.append(contentsOf: self.generalPurposeQueue) + + self.eventLoopQueues.removeAll() + self.generalPurposeQueue.removeAll() + return result + } + + private mutating func withEventLoopQueue( + for eventLoopID: EventLoopID, + _ closure: (inout CircularBuffer) -> Result + ) -> Result { + if self.eventLoopQueues[eventLoopID] == nil { + self.eventLoopQueues[eventLoopID] = CircularBuffer(initialCapacity: 32) + } + return closure(&self.eventLoopQueues[eventLoopID]!) + } + + private func withEventLoopQueueIfAvailable( + for eventLoopID: EventLoopID, + _ closure: (CircularBuffer) -> Result + ) -> Result? { + if let queue = self.eventLoopQueues[eventLoopID] { + return closure(queue) + } + return nil + } + } +} diff --git a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+WaiterTests+XCTest.swift b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+RequestQueueTests+XCTest.swift similarity index 66% rename from Tests/AsyncHTTPClientTests/HTTPConnectionPool+WaiterTests+XCTest.swift rename to Tests/AsyncHTTPClientTests/HTTPConnectionPool+RequestQueueTests+XCTest.swift index 22ac2329d..2511ba267 100644 --- a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+WaiterTests+XCTest.swift +++ b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+RequestQueueTests+XCTest.swift @@ -12,7 +12,7 @@ // //===----------------------------------------------------------------------===// // -// HTTPConnectionPool+WaiterTests+XCTest.swift +// HTTPConnectionPool+RequestQueueTests+XCTest.swift // import XCTest @@ -22,11 +22,10 @@ import XCTest /// Do NOT edit this file directly as it will be regenerated automatically when needed. /// -extension HTTPConnectionPool_WaiterTests { - static var allTests: [(String, (HTTPConnectionPool_WaiterTests) -> () throws -> Void)] { +extension HTTPConnectionPool_RequestQueueTests { + static var allTests: [(String, (HTTPConnectionPool_RequestQueueTests) -> () throws -> Void)] { return [ - ("testCanBeRunIfEventLoopIsSpecified", testCanBeRunIfEventLoopIsSpecified), - ("testCanBeRunIfNoEventLoopIsSpecified", testCanBeRunIfNoEventLoopIsSpecified), + ("testCountAndIsEmptyWorks", testCountAndIsEmptyWorks), ] } } diff --git a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+RequestQueueTests.swift b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+RequestQueueTests.swift new file mode 100644 index 000000000..500bcf296 --- /dev/null +++ b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+RequestQueueTests.swift @@ -0,0 +1,134 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the AsyncHTTPClient open source project +// +// Copyright (c) 2021 Apple Inc. and the AsyncHTTPClient project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +@testable import AsyncHTTPClient +import Logging +import NIOCore +import NIOEmbedded +import NIOHTTP1 +import XCTest + +class HTTPConnectionPool_RequestQueueTests: XCTestCase { + func testCountAndIsEmptyWorks() { + var queue = HTTPConnectionPool.RequestQueue() + XCTAssertTrue(queue.isEmpty) + XCTAssertEqual(queue.count, 0) + let req1 = MockScheduledRequest(eventLoopPreference: .indifferent) + let req1ID = queue.push(.init(req1)) + XCTAssertFalse(queue.isEmpty) + XCTAssertFalse(queue.isEmpty(for: nil)) + XCTAssertEqual(queue.count, 1) + XCTAssertEqual(queue.count(for: nil), 1) + + let req2 = MockScheduledRequest(eventLoopPreference: .indifferent) + let req2ID = queue.push(.init(req2)) + XCTAssertEqual(queue.count, 2) + + XCTAssert(queue.popFirst()?.__testOnly_wrapped_request() === req1) + XCTAssertEqual(queue.count, 1) + XCTAssertFalse(queue.isEmpty) + XCTAssert(queue.remove(req2ID)?.__testOnly_wrapped_request() === req2) + XCTAssertNil(queue.remove(req1ID)) + XCTAssertEqual(queue.count, 0) + XCTAssertTrue(queue.isEmpty) + + let eventLoop = EmbeddedEventLoop() + + XCTAssertTrue(queue.isEmpty(for: eventLoop)) + XCTAssertEqual(queue.count(for: eventLoop), 0) + let req3 = MockScheduledRequest(eventLoopPreference: .delegateAndChannel(on: eventLoop)) + let req3ID = queue.push(.init(req3)) + XCTAssertFalse(queue.isEmpty(for: eventLoop)) + XCTAssertEqual(queue.count(for: eventLoop), 1) + XCTAssertFalse(queue.isEmpty) + XCTAssertEqual(queue.count, 1) + XCTAssert(queue.popFirst(for: eventLoop)?.__testOnly_wrapped_request() === req3) + XCTAssertNil(queue.remove(req3ID)) + XCTAssertTrue(queue.isEmpty(for: eventLoop)) + XCTAssertEqual(queue.count(for: eventLoop), 0) + XCTAssertTrue(queue.isEmpty) + XCTAssertEqual(queue.count, 0) + + let req4 = MockScheduledRequest(eventLoopPreference: .delegateAndChannel(on: eventLoop)) + let req4ID = queue.push(.init(req4)) + XCTAssert(queue.remove(req4ID)?.__testOnly_wrapped_request() === req4) + + let req5 = MockScheduledRequest(eventLoopPreference: .indifferent) + queue.push(.init(req5)) + let req6 = MockScheduledRequest(eventLoopPreference: .delegateAndChannel(on: eventLoop)) + queue.push(.init(req6)) + let all = queue.removeAll() + let testSet = all.map { $0.__testOnly_wrapped_request() } + XCTAssertEqual(testSet.count, 2) + XCTAssertTrue(testSet.contains(where: { $0 === req5 })) + XCTAssertTrue(testSet.contains(where: { $0 === req6 })) + XCTAssertFalse(testSet.contains(where: { $0 === req4 })) + XCTAssertTrue(queue.isEmpty(for: eventLoop)) + XCTAssertEqual(queue.count(for: eventLoop), 0) + XCTAssertTrue(queue.isEmpty) + XCTAssertEqual(queue.count, 0) + } +} + +private class MockScheduledRequest: HTTPSchedulableRequest { + init(eventLoopPreference: HTTPClient.EventLoopPreference) { + self.eventLoopPreference = eventLoopPreference + } + + var logger: Logger { preconditionFailure("Unimplemented") } + var connectionDeadline: NIODeadline { preconditionFailure("Unimplemented") } + let eventLoopPreference: HTTPClient.EventLoopPreference + + func requestWasQueued(_: HTTPRequestScheduler) { + preconditionFailure("Unimplemented") + } + + func fail(_: Error) { + preconditionFailure("Unimplemented") + } + + // MARK: HTTPExecutableRequest + + var requestHead: HTTPRequestHead { preconditionFailure("Unimplemented") } + var requestFramingMetadata: RequestFramingMetadata { preconditionFailure("Unimplemented") } + var idleReadTimeout: TimeAmount? { preconditionFailure("Unimplemented") } + + func willExecuteRequest(_: HTTPRequestExecutor) { + preconditionFailure("Unimplemented") + } + + func requestHeadSent() { + preconditionFailure("Unimplemented") + } + + func resumeRequestBodyStream() { + preconditionFailure("Unimplemented") + } + + func pauseRequestBodyStream() { + preconditionFailure("Unimplemented") + } + + func receiveResponseHead(_: HTTPResponseHead) { + preconditionFailure("Unimplemented") + } + + func receiveResponseBodyParts(_: CircularBuffer) { + preconditionFailure("Unimplemented") + } + + func succeedRequest(_: CircularBuffer?) { + preconditionFailure("Unimplemented") + } +} diff --git a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+StateTestUtils.swift b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+StateTestUtils.swift new file mode 100644 index 000000000..0b1a1b8a4 --- /dev/null +++ b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+StateTestUtils.swift @@ -0,0 +1,53 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the AsyncHTTPClient open source project +// +// Copyright (c) 2021 Apple Inc. and the AsyncHTTPClient project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import Dispatch +import NIOConcurrencyHelpers +import NIOCore +import NIOEmbedded + +/// An `EventLoopGroup` of `EmbeddedEventLoop`s. +final class EmbeddedEventLoopGroup: EventLoopGroup { + private let loops: [EmbeddedEventLoop] + private let index = NIOAtomic.makeAtomic(value: 0) + + internal init(loops: Int) { + self.loops = (0.. EventLoop { + let index: Int = self.index.add(1) + return self.loops[index % self.loops.count] + } + + internal func makeIterator() -> EventLoopIterator { + return EventLoopIterator(self.loops) + } + + internal func shutdownGracefully(queue: DispatchQueue, _ callback: @escaping (Error?) -> Void) { + var shutdownError: Error? + + for loop in self.loops { + loop.shutdownGracefully(queue: queue) { error in + if let error = error { + shutdownError = error + } + } + } + + queue.sync { + callback(shutdownError) + } + } +} diff --git a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+WaiterTests.swift b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+WaiterTests.swift deleted file mode 100644 index 69f43dfb7..000000000 --- a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+WaiterTests.swift +++ /dev/null @@ -1,99 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the AsyncHTTPClient open source project -// -// Copyright (c) 2021 Apple Inc. and the AsyncHTTPClient project authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// - -@testable import AsyncHTTPClient -import Logging -import NIOCore -import NIOHTTP1 -import NIOPosix -import XCTest - -class HTTPConnectionPool_WaiterTests: XCTestCase { - func testCanBeRunIfEventLoopIsSpecified() { - let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 2) - - let theRightEL = eventLoopGroup.next() - let theFalseEL = eventLoopGroup.next() - - let mockRequest = MockScheduledRequest(eventLoopPreference: .init(.testOnly_exact(channelOn: theRightEL, delegateOn: theFalseEL))) - - let waiter = HTTPConnectionPool.Waiter(request: mockRequest) - - XCTAssertTrue(waiter.canBeRun(on: theRightEL)) - XCTAssertFalse(waiter.canBeRun(on: theFalseEL)) - } - - func testCanBeRunIfNoEventLoopIsSpecified() { - let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 2) - - let mockRequest = MockScheduledRequest(eventLoopPreference: .indifferent) - let waiter = HTTPConnectionPool.Waiter(request: mockRequest) - - for el in eventLoopGroup.makeIterator() { - XCTAssertTrue(waiter.canBeRun(on: el)) - } - } -} - -private class MockScheduledRequest: HTTPSchedulableRequest { - init(eventLoopPreference: HTTPClient.EventLoopPreference) { - self.eventLoopPreference = eventLoopPreference - } - - var logger: Logger { preconditionFailure("Unimplemented") } - var connectionDeadline: NIODeadline { preconditionFailure("Unimplemented") } - let eventLoopPreference: HTTPClient.EventLoopPreference - - func requestWasQueued(_: HTTPRequestScheduler) { - preconditionFailure("Unimplemented") - } - - func fail(_: Error) { - preconditionFailure("Unimplemented") - } - - // MARK: HTTPExecutableRequest - - var requestHead: HTTPRequestHead { preconditionFailure("Unimplemented") } - var requestFramingMetadata: RequestFramingMetadata { preconditionFailure("Unimplemented") } - var idleReadTimeout: TimeAmount? { preconditionFailure("Unimplemented") } - - func willExecuteRequest(_: HTTPRequestExecutor) { - preconditionFailure("Unimplemented") - } - - func requestHeadSent() { - preconditionFailure("Unimplemented") - } - - func resumeRequestBodyStream() { - preconditionFailure("Unimplemented") - } - - func pauseRequestBodyStream() { - preconditionFailure("Unimplemented") - } - - func receiveResponseHead(_: HTTPResponseHead) { - preconditionFailure("Unimplemented") - } - - func receiveResponseBodyParts(_: CircularBuffer) { - preconditionFailure("Unimplemented") - } - - func succeedRequest(_: CircularBuffer?) { - preconditionFailure("Unimplemented") - } -} diff --git a/Tests/LinuxMain.swift b/Tests/LinuxMain.swift index b31e0d29d..8c189d15a 100644 --- a/Tests/LinuxMain.swift +++ b/Tests/LinuxMain.swift @@ -41,7 +41,7 @@ import XCTest testCase(HTTPClientSOCKSTests.allTests), testCase(HTTPClientTests.allTests), testCase(HTTPConnectionPool_FactoryTests.allTests), - testCase(HTTPConnectionPool_WaiterTests.allTests), + testCase(HTTPConnectionPool_RequestQueueTests.allTests), testCase(HTTPRequestStateMachineTests.allTests), testCase(LRUCacheTests.allTests), testCase(RequestBagTests.allTests),