Skip to content

Commit 1f5b633

Browse files
authored
Add a RequestQueue for the ConnectionPool (#412)
1 parent 6af7c8c commit 1f5b633

8 files changed

+351
-162
lines changed

Diff for: Sources/AsyncHTTPClient/ConnectionPool/HTTPConnectionPool+Waiter.swift

-57
This file was deleted.

Diff for: Sources/AsyncHTTPClient/ConnectionPool/HTTPConnectionPool.swift

+40
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,46 @@ enum HTTPConnectionPool {
123123
}
124124
}
125125

126+
extension HTTPConnectionPool {
127+
/// This is a wrapper that we use inside the connection pool state machine to ensure that
128+
/// the actual request can not be accessed at any time. Further it exposes all that is needed within
129+
/// the state machine. A request ID and the `EventLoop` requirement.
130+
struct Request {
131+
struct ID: Hashable {
132+
let objectIdentifier: ObjectIdentifier
133+
let eventLoopID: EventLoopID?
134+
135+
fileprivate init(_ request: HTTPSchedulableRequest, eventLoopRequirement eventLoopID: EventLoopID?) {
136+
self.objectIdentifier = ObjectIdentifier(request)
137+
self.eventLoopID = eventLoopID
138+
}
139+
}
140+
141+
fileprivate let req: HTTPSchedulableRequest
142+
143+
init(_ request: HTTPSchedulableRequest) {
144+
self.req = request
145+
}
146+
147+
var id: HTTPConnectionPool.Request.ID {
148+
HTTPConnectionPool.Request.ID(self.req, eventLoopRequirement: self.requiredEventLoop?.id)
149+
}
150+
151+
var requiredEventLoop: EventLoop? {
152+
switch self.req.eventLoopPreference.preference {
153+
case .indifferent, .delegate:
154+
return nil
155+
case .delegateAndChannel(on: let eventLoop), .testOnly_exact(channelOn: let eventLoop, delegateOn: _):
156+
return eventLoop
157+
}
158+
}
159+
160+
func __testOnly_wrapped_request() -> HTTPSchedulableRequest {
161+
self.req
162+
}
163+
}
164+
}
165+
126166
struct EventLoopID: Hashable {
127167
private var id: Identifier
128168

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the AsyncHTTPClient open source project
4+
//
5+
// Copyright (c) 2021 Apple Inc. and the AsyncHTTPClient project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
import NIOCore
16+
17+
extension HTTPConnectionPool {
18+
/// A struct to store all queued requests.
19+
struct RequestQueue {
20+
private var generalPurposeQueue: CircularBuffer<Request>
21+
private var eventLoopQueues: [EventLoopID: CircularBuffer<Request>]
22+
23+
init() {
24+
self.generalPurposeQueue = CircularBuffer(initialCapacity: 32)
25+
self.eventLoopQueues = [:]
26+
}
27+
28+
var count: Int {
29+
self.generalPurposeQueue.count + self.eventLoopQueues.reduce(0) { $0 + $1.value.count }
30+
}
31+
32+
var isEmpty: Bool {
33+
self.count == 0
34+
}
35+
36+
func count(for eventLoop: EventLoop?) -> Int {
37+
if let eventLoop = eventLoop {
38+
return self.withEventLoopQueueIfAvailable(for: eventLoop.id) { $0.count } ?? 0
39+
}
40+
return self.generalPurposeQueue.count
41+
}
42+
43+
func isEmpty(for eventLoop: EventLoop?) -> Bool {
44+
if let eventLoop = eventLoop {
45+
return self.withEventLoopQueueIfAvailable(for: eventLoop.id) { $0.isEmpty } ?? true
46+
}
47+
return self.generalPurposeQueue.isEmpty
48+
}
49+
50+
@discardableResult
51+
mutating func push(_ request: Request) -> Request.ID {
52+
if let eventLoop = request.requiredEventLoop {
53+
self.withEventLoopQueue(for: eventLoop.id) { queue in
54+
queue.append(request)
55+
}
56+
} else {
57+
self.generalPurposeQueue.append(request)
58+
}
59+
return request.id
60+
}
61+
62+
mutating func popFirst(for eventLoop: EventLoop? = nil) -> Request? {
63+
if let eventLoop = eventLoop {
64+
return self.withEventLoopQueue(for: eventLoop.id) { queue in
65+
queue.popFirst()
66+
}
67+
} else {
68+
return self.generalPurposeQueue.popFirst()
69+
}
70+
}
71+
72+
mutating func remove(_ requestID: Request.ID) -> Request? {
73+
if let eventLoopID = requestID.eventLoopID {
74+
return self.withEventLoopQueue(for: eventLoopID) { queue in
75+
guard let index = queue.firstIndex(where: { $0.id == requestID }) else {
76+
return nil
77+
}
78+
return queue.remove(at: index)
79+
}
80+
} else {
81+
if let index = self.generalPurposeQueue.firstIndex(where: { $0.id == requestID }) {
82+
// TBD: This is slow. Do we maybe want something more sophisticated here?
83+
return self.generalPurposeQueue.remove(at: index)
84+
}
85+
return nil
86+
}
87+
}
88+
89+
mutating func removeAll() -> [Request] {
90+
var result = [Request]()
91+
result = self.eventLoopQueues.flatMap { $0.value }
92+
result.append(contentsOf: self.generalPurposeQueue)
93+
94+
self.eventLoopQueues.removeAll()
95+
self.generalPurposeQueue.removeAll()
96+
return result
97+
}
98+
99+
private mutating func withEventLoopQueue<Result>(
100+
for eventLoopID: EventLoopID,
101+
_ closure: (inout CircularBuffer<Request>) -> Result
102+
) -> Result {
103+
if self.eventLoopQueues[eventLoopID] == nil {
104+
self.eventLoopQueues[eventLoopID] = CircularBuffer(initialCapacity: 32)
105+
}
106+
return closure(&self.eventLoopQueues[eventLoopID]!)
107+
}
108+
109+
private func withEventLoopQueueIfAvailable<Result>(
110+
for eventLoopID: EventLoopID,
111+
_ closure: (CircularBuffer<Request>) -> Result
112+
) -> Result? {
113+
if let queue = self.eventLoopQueues[eventLoopID] {
114+
return closure(queue)
115+
}
116+
return nil
117+
}
118+
}
119+
}

Diff for: Tests/AsyncHTTPClientTests/HTTPConnectionPool+WaiterTests+XCTest.swift renamed to Tests/AsyncHTTPClientTests/HTTPConnectionPool+RequestQueueTests+XCTest.swift

+4-5
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
//
1313
//===----------------------------------------------------------------------===//
1414
//
15-
// HTTPConnectionPool+WaiterTests+XCTest.swift
15+
// HTTPConnectionPool+RequestQueueTests+XCTest.swift
1616
//
1717
import XCTest
1818

@@ -22,11 +22,10 @@ import XCTest
2222
/// Do NOT edit this file directly as it will be regenerated automatically when needed.
2323
///
2424

25-
extension HTTPConnectionPool_WaiterTests {
26-
static var allTests: [(String, (HTTPConnectionPool_WaiterTests) -> () throws -> Void)] {
25+
extension HTTPConnectionPool_RequestQueueTests {
26+
static var allTests: [(String, (HTTPConnectionPool_RequestQueueTests) -> () throws -> Void)] {
2727
return [
28-
("testCanBeRunIfEventLoopIsSpecified", testCanBeRunIfEventLoopIsSpecified),
29-
("testCanBeRunIfNoEventLoopIsSpecified", testCanBeRunIfNoEventLoopIsSpecified),
28+
("testCountAndIsEmptyWorks", testCountAndIsEmptyWorks),
3029
]
3130
}
3231
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the AsyncHTTPClient open source project
4+
//
5+
// Copyright (c) 2021 Apple Inc. and the AsyncHTTPClient project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
@testable import AsyncHTTPClient
16+
import Logging
17+
import NIOCore
18+
import NIOEmbedded
19+
import NIOHTTP1
20+
import XCTest
21+
22+
class HTTPConnectionPool_RequestQueueTests: XCTestCase {
23+
func testCountAndIsEmptyWorks() {
24+
var queue = HTTPConnectionPool.RequestQueue()
25+
XCTAssertTrue(queue.isEmpty)
26+
XCTAssertEqual(queue.count, 0)
27+
let req1 = MockScheduledRequest(eventLoopPreference: .indifferent)
28+
let req1ID = queue.push(.init(req1))
29+
XCTAssertFalse(queue.isEmpty)
30+
XCTAssertFalse(queue.isEmpty(for: nil))
31+
XCTAssertEqual(queue.count, 1)
32+
XCTAssertEqual(queue.count(for: nil), 1)
33+
34+
let req2 = MockScheduledRequest(eventLoopPreference: .indifferent)
35+
let req2ID = queue.push(.init(req2))
36+
XCTAssertEqual(queue.count, 2)
37+
38+
XCTAssert(queue.popFirst()?.__testOnly_wrapped_request() === req1)
39+
XCTAssertEqual(queue.count, 1)
40+
XCTAssertFalse(queue.isEmpty)
41+
XCTAssert(queue.remove(req2ID)?.__testOnly_wrapped_request() === req2)
42+
XCTAssertNil(queue.remove(req1ID))
43+
XCTAssertEqual(queue.count, 0)
44+
XCTAssertTrue(queue.isEmpty)
45+
46+
let eventLoop = EmbeddedEventLoop()
47+
48+
XCTAssertTrue(queue.isEmpty(for: eventLoop))
49+
XCTAssertEqual(queue.count(for: eventLoop), 0)
50+
let req3 = MockScheduledRequest(eventLoopPreference: .delegateAndChannel(on: eventLoop))
51+
let req3ID = queue.push(.init(req3))
52+
XCTAssertFalse(queue.isEmpty(for: eventLoop))
53+
XCTAssertEqual(queue.count(for: eventLoop), 1)
54+
XCTAssertFalse(queue.isEmpty)
55+
XCTAssertEqual(queue.count, 1)
56+
XCTAssert(queue.popFirst(for: eventLoop)?.__testOnly_wrapped_request() === req3)
57+
XCTAssertNil(queue.remove(req3ID))
58+
XCTAssertTrue(queue.isEmpty(for: eventLoop))
59+
XCTAssertEqual(queue.count(for: eventLoop), 0)
60+
XCTAssertTrue(queue.isEmpty)
61+
XCTAssertEqual(queue.count, 0)
62+
63+
let req4 = MockScheduledRequest(eventLoopPreference: .delegateAndChannel(on: eventLoop))
64+
let req4ID = queue.push(.init(req4))
65+
XCTAssert(queue.remove(req4ID)?.__testOnly_wrapped_request() === req4)
66+
67+
let req5 = MockScheduledRequest(eventLoopPreference: .indifferent)
68+
queue.push(.init(req5))
69+
let req6 = MockScheduledRequest(eventLoopPreference: .delegateAndChannel(on: eventLoop))
70+
queue.push(.init(req6))
71+
let all = queue.removeAll()
72+
let testSet = all.map { $0.__testOnly_wrapped_request() }
73+
XCTAssertEqual(testSet.count, 2)
74+
XCTAssertTrue(testSet.contains(where: { $0 === req5 }))
75+
XCTAssertTrue(testSet.contains(where: { $0 === req6 }))
76+
XCTAssertFalse(testSet.contains(where: { $0 === req4 }))
77+
XCTAssertTrue(queue.isEmpty(for: eventLoop))
78+
XCTAssertEqual(queue.count(for: eventLoop), 0)
79+
XCTAssertTrue(queue.isEmpty)
80+
XCTAssertEqual(queue.count, 0)
81+
}
82+
}
83+
84+
private class MockScheduledRequest: HTTPSchedulableRequest {
85+
init(eventLoopPreference: HTTPClient.EventLoopPreference) {
86+
self.eventLoopPreference = eventLoopPreference
87+
}
88+
89+
var logger: Logger { preconditionFailure("Unimplemented") }
90+
var connectionDeadline: NIODeadline { preconditionFailure("Unimplemented") }
91+
let eventLoopPreference: HTTPClient.EventLoopPreference
92+
93+
func requestWasQueued(_: HTTPRequestScheduler) {
94+
preconditionFailure("Unimplemented")
95+
}
96+
97+
func fail(_: Error) {
98+
preconditionFailure("Unimplemented")
99+
}
100+
101+
// MARK: HTTPExecutableRequest
102+
103+
var requestHead: HTTPRequestHead { preconditionFailure("Unimplemented") }
104+
var requestFramingMetadata: RequestFramingMetadata { preconditionFailure("Unimplemented") }
105+
var idleReadTimeout: TimeAmount? { preconditionFailure("Unimplemented") }
106+
107+
func willExecuteRequest(_: HTTPRequestExecutor) {
108+
preconditionFailure("Unimplemented")
109+
}
110+
111+
func requestHeadSent() {
112+
preconditionFailure("Unimplemented")
113+
}
114+
115+
func resumeRequestBodyStream() {
116+
preconditionFailure("Unimplemented")
117+
}
118+
119+
func pauseRequestBodyStream() {
120+
preconditionFailure("Unimplemented")
121+
}
122+
123+
func receiveResponseHead(_: HTTPResponseHead) {
124+
preconditionFailure("Unimplemented")
125+
}
126+
127+
func receiveResponseBodyParts(_: CircularBuffer<ByteBuffer>) {
128+
preconditionFailure("Unimplemented")
129+
}
130+
131+
func succeedRequest(_: CircularBuffer<ByteBuffer>?) {
132+
preconditionFailure("Unimplemented")
133+
}
134+
}

0 commit comments

Comments
 (0)