forked from swift-server/async-http-client
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathHTTPConnectionPool+RequestQueue.swift
207 lines (184 loc) · 7.79 KB
/
HTTPConnectionPool+RequestQueue.swift
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
//===----------------------------------------------------------------------===//
//
// This source file is part of the AsyncHTTPClient open source project
//
// Copyright (c) 2021 Apple Inc. and the AsyncHTTPClient project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
import NIOCore
/// Wraps an `EventLoop` so that it can be used as a `Dictionary` key.
///
/// Two wrappers compare equal when they reference the very same event loop
/// instance (`===`); the hash value is derived from the event loop's `id`.
private struct HashableEventLoop: Hashable {
    /// The wrapped event loop.
    let eventLoop: EventLoop

    /// Creates a wrapper around `eventLoop`.
    init(_ eventLoop: EventLoop) {
        self.eventLoop = eventLoop
    }

    /// Identity comparison of the wrapped event loops.
    static func == (lhs: HashableEventLoop, rhs: HashableEventLoop) -> Bool {
        return lhs.eventLoop === rhs.eventLoop
    }

    /// Feeds the wrapped event loop's `id` into `hasher`.
    func hash(into hasher: inout Hasher) {
        hasher.combine(self.eventLoop.id)
    }
}
extension HTTPConnectionPool {
    /// A struct to store all queued requests.
    ///
    /// Requests that carry a `requiredEventLoop` are stored in a dedicated queue per event loop
    /// (keyed by the event loop's `id`). All other requests share one general purpose queue.
    struct RequestQueue {
        /// Queue for requests without a required event loop.
        private var generalPurposeQueue: CircularBuffer<Request>
        /// Queues for requests with a required event loop, keyed by the id of that event loop.
        /// A queue stays in the dictionary after its last request was removed, so it may be empty.
        private var eventLoopQueues: [EventLoopID: CircularBuffer<Request>]

        /// Creates an empty request queue.
        init() {
            self.generalPurposeQueue = CircularBuffer(initialCapacity: 32)
            self.eventLoopQueues = [:]
        }

        /// The total number of queued requests across the general purpose queue and all event loop queues.
        var count: Int {
            self.eventLoopQueues.values.reduce(self.generalPurposeQueue.count) { $0 + $1.count }
        }

        /// `true` if neither the general purpose queue nor any event loop queue holds a request.
        var isEmpty: Bool {
            // Short-circuits on the first non-empty queue instead of summing up all counts.
            self.generalPurposeQueue.isEmpty && self.eventLoopQueues.values.allSatisfy { $0.isEmpty }
        }

        /// The number of queued requests without a required event loop.
        var generalPurposeCount: Int {
            self.generalPurposeQueue.count
        }

        /// - Parameter eventLoop: the required event loop to look up
        /// - Returns: the number of queued requests that require `eventLoop`; `0` if there is no queue for it
        func count(for eventLoop: EventLoop) -> Int {
            self.eventLoopQueues[eventLoop.id]?.count ?? 0
        }

        /// - Parameter eventLoop: the required event loop to look up, or `nil` for the general purpose queue
        /// - Returns: `true` if the selected queue holds no request or does not exist
        func isEmpty(for eventLoop: EventLoop?) -> Bool {
            guard let eventLoop = eventLoop else {
                return self.generalPurposeQueue.isEmpty
            }
            return self.eventLoopQueues[eventLoop.id]?.isEmpty ?? true
        }

        /// Appends `request` to the queue of its `requiredEventLoop`, or to the general purpose
        /// queue if it has none.
        /// - Parameter request: the request to enqueue
        /// - Returns: the id of the enqueued request
        @discardableResult
        mutating func push(_ request: Request) -> Request.ID {
            if let eventLoop = request.requiredEventLoop {
                // This is the only place where an event loop queue is created.
                self.eventLoopQueues[eventLoop.id, default: CircularBuffer(initialCapacity: 32)].append(request)
            } else {
                self.generalPurposeQueue.append(request)
            }
            return request.id
        }

        /// Removes and returns the first request of the selected queue.
        /// - Parameter eventLoop: the required event loop whose queue is used, or `nil` for the general purpose queue
        /// - Returns: the removed request, or `nil` if the selected queue is empty or does not exist
        mutating func popFirst(for eventLoop: EventLoop? = nil) -> Request? {
            guard let eventLoop = eventLoop else {
                return self.generalPurposeQueue.popFirst()
            }
            // Optional chaining mutates an existing queue in place and does not
            // insert an empty queue for event loops that never had a request.
            return self.eventLoopQueues[eventLoop.id]?.popFirst()
        }

        /// removes up to `max` requests from the queue for the given `eventLoop` and returns them.
        /// - Parameters:
        ///   - max: maximum number of requests to pop
        ///   - eventLoop: required event loop of the request, or `nil` for the general purpose queue
        /// - Returns: requests for the given `eventLoop`; empty if there is no queue for it
        mutating func popFirst(max: Int, for eventLoop: EventLoop? = nil) -> [Request] {
            guard let eventLoop = eventLoop else {
                return self.generalPurposeQueue.popFirst(max: max)
            }
            // See `popFirst(for:)`: never create a queue just to pop from it.
            return self.eventLoopQueues[eventLoop.id]?.popFirst(max: max) ?? []
        }

        /// Removes the request with the given id from the queue it was stored in.
        ///
        /// The event loop queue is searched if `requestID.eventLoopID` is set, the general
        /// purpose queue otherwise.
        /// - Parameter requestID: the id of the request to remove
        /// - Returns: the removed request, or `nil` if no request with this id is queued
        mutating func remove(_ requestID: Request.ID) -> Request? {
            if let eventLoopID = requestID.eventLoopID {
                guard let index = self.eventLoopQueues[eventLoopID]?.firstIndex(where: { $0.id == requestID }) else {
                    return nil
                }
                return self.eventLoopQueues[eventLoopID]?.remove(at: index)
            }

            guard let index = self.generalPurposeQueue.firstIndex(where: { $0.id == requestID }) else {
                return nil
            }
            // TBD: This is slow. Do we maybe want something more sophisticated here?
            return self.generalPurposeQueue.remove(at: index)
        }

        /// Empties all queues.
        /// - Returns: all previously queued requests; requests with a required event loop come first,
        ///   followed by the requests of the general purpose queue
        mutating func removeAll() -> [Request] {
            var result: [Request] = self.eventLoopQueues.values.flatMap { $0 }
            result.append(contentsOf: self.generalPurposeQueue)
            self.eventLoopQueues.removeAll()
            self.generalPurposeQueue.removeAll()
            return result
        }

        /// - Returns: event loops with at least one request with a required event loop
        func eventLoopsWithPendingRequests() -> [EventLoop] {
            self.eventLoopQueues.compactMap {
                // all requests in `eventLoopQueues` are guaranteed to have a `requiredEventLoop`
                // however, a queue can be empty
                $0.value.first?.requiredEventLoop!
            }
        }

        /// - Returns: request count for requests with required event loop, grouped by required event loop without any particular order
        func requestCountGroupedByRequiredEventLoop() -> [(EventLoop, Int)] {
            self.eventLoopQueues.values.compactMap { requests -> (EventLoop, Int)? in
                // all requests in `eventLoopQueues` are guaranteed to have a `requiredEventLoop`,
                // however, a queue can be empty
                guard let requiredEventLoop = requests.first?.requiredEventLoop! else {
                    return nil
                }
                return (requiredEventLoop, requests.count)
            }
        }

        /// - Returns: request count with **no** required event loop, grouped by preferred event loop and ordered descending by number of requests
        func generalPurposeRequestCountGroupedByPreferredEventLoop() -> [(EventLoop, Int)] {
            // Count one per request and let the dictionary sum up the counts per preferred event loop.
            let requestCountPerEventLoop = Dictionary(
                self.generalPurposeQueue.lazy.map { request in
                    (HashableEventLoop(request.preferredEventLoop), 1)
                },
                uniquingKeysWith: +
            )
            return requestCountPerEventLoop.lazy
                .map { ($0.key.eventLoop, $0.value) }
                .sorted { lhs, rhs in
                    lhs.1 > rhs.1
                }
        }
    }
}
extension CircularBuffer {
    /// Takes up to `max` elements off the front of the `CircularBuffer`
    /// and hands them back as an array.
    ///
    /// If the buffer holds fewer than `max` elements, all of them are removed.
    /// Calling this method may invalidate any existing indices for use with this
    /// `CircularBuffer`.
    ///
    /// - Parameter max: The upper bound for the number of elements to remove.
    ///   `max` must be greater than or equal to zero.
    /// - Returns: the removed elements, in their original order
    ///
    /// - Complexity: O(*k*), where *k* is the number of elements removed.
    fileprivate mutating func popFirst(max: Int) -> [Element] {
        precondition(max >= 0)
        // `prefix` already caps the length at the number of stored elements.
        let removed = Array(self.prefix(max))
        self.removeFirst(removed.count)
        return removed
    }
}