-
Notifications
You must be signed in to change notification settings - Fork 122
/
Copy pathHTTPConnectionPool+HTTP1Connections.swift
793 lines (699 loc) · 34.2 KB
/
HTTPConnectionPool+HTTP1Connections.swift
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
//===----------------------------------------------------------------------===//
//
// This source file is part of the AsyncHTTPClient open source project
//
// Copyright (c) 2021 Apple Inc. and the AsyncHTTPClient project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
import NIOCore
extension HTTPConnectionPool {
/// Represents the state of a single HTTP/1.1 connection
private struct HTTP1ConnectionState {
enum State {
/// the connection is creating a connection. Valid transitions are to: .backingOff, .idle, and .closed
case starting
/// the connection is waiting to retry the establishing a connection. Valid transitions are to: .closed.
/// This means, the connection can be removed from the connections without cancelling external
/// state. The connection state can then be replaced by a new one.
case backingOff
/// the connection is idle for a new request. Valid transitions to: .leased and .closed
case idle(Connection, since: NIODeadline)
/// the connection is leased and running for a request. Valid transitions to: .idle and .closed
case leased(Connection)
/// the connection is closed. final state.
case closed
}
private var state: State
let connectionID: Connection.ID
let eventLoop: EventLoop
init(connectionID: Connection.ID, eventLoop: EventLoop) {
self.connectionID = connectionID
self.eventLoop = eventLoop
self.state = .starting
}
var isConnecting: Bool {
switch self.state {
case .starting:
return true
case .backingOff, .closed, .idle, .leased:
return false
}
}
var isBackingOff: Bool {
switch self.state {
case .backingOff:
return true
case .starting, .closed, .idle, .leased:
return false
}
}
var isIdle: Bool {
switch self.state {
case .idle:
return true
case .backingOff, .starting, .leased, .closed:
return false
}
}
var canOrWillBeAbleToExecuteRequests: Bool {
switch self.state {
case .leased, .backingOff, .idle, .starting:
return true
case .closed:
return false
}
}
var isLeased: Bool {
switch self.state {
case .leased:
return true
case .backingOff, .starting, .idle, .closed:
return false
}
}
var idleSince: NIODeadline? {
switch self.state {
case .idle(_, since: let idleSince):
return idleSince
case .backingOff, .starting, .leased, .closed:
return nil
}
}
var isClosed: Bool {
switch self.state {
case .closed:
return true
case .idle, .starting, .leased, .backingOff:
return false
}
}
mutating func connected(_ connection: Connection) {
switch self.state {
case .starting:
self.state = .idle(connection, since: .now())
case .backingOff, .idle, .leased, .closed:
preconditionFailure("Invalid state: \(self.state)")
}
}
/// The connection failed to start
mutating func failedToConnect() {
switch self.state {
case .starting:
self.state = .backingOff
case .backingOff, .idle, .leased, .closed:
preconditionFailure("Invalid state: \(self.state)")
}
}
mutating func lease() -> Connection {
switch self.state {
case .idle(let connection, since: _):
self.state = .leased(connection)
return connection
case .backingOff, .starting, .leased, .closed:
preconditionFailure("Invalid state: \(self.state)")
}
}
mutating func release() {
switch self.state {
case .leased(let connection):
self.state = .idle(connection, since: .now())
case .backingOff, .starting, .idle, .closed:
preconditionFailure("Invalid state: \(self.state)")
}
}
mutating func close() -> Connection {
switch self.state {
case .idle(let connection, since: _):
self.state = .closed
return connection
case .backingOff, .starting, .leased, .closed:
preconditionFailure("Invalid state: \(self.state)")
}
}
mutating func fail() {
switch self.state {
case .starting, .backingOff, .idle, .leased:
self.state = .closed
case .closed:
preconditionFailure("Invalid state: \(self.state)")
}
}
enum CleanupAction {
case removeConnection
case keepConnection
}
/// Cleanup the current connection for shutdown.
///
/// This method is called, when the connections shall shutdown. Depending on the state
/// the connection is in, it adds itself to one of the arrays that are used to signal shutdown
/// intent to the underlying connections. Connections that are backing off can be easily
/// dropped (since, we only need to cancel the backoff timer), connections that are leased
/// need to be cancelled (notifying the `ChannelHandler` that we want to cancel the
/// running request), connections that are idle can be closed right away. Sadly we can't
/// cancel connection starts right now. For this reason we need to wait for them to succeed
/// or fail until we finalize the shutdown.
///
/// - Parameter context: A cleanup context to add the connection to based on its state.
/// - Returns: A cleanup action indicating if the connection can be removed from the
/// connection list.
func cleanup(_ context: inout CleanupContext) -> CleanupAction {
switch self.state {
case .backingOff:
context.connectBackoff.append(self.connectionID)
return .removeConnection
case .starting:
return .keepConnection
case .idle(let connection, since: _):
context.close.append(connection)
return .removeConnection
case .leased(let connection):
context.cancel.append(connection)
return .keepConnection
case .closed:
preconditionFailure("Unexpected state: Did not expect to have connections with this state in the state machine: \(self.state)")
}
}
enum MigrateAction {
case removeConnection
case keepConnection
}
func migrateToHTTP2(_ context: inout HTTP1Connections.HTTP1ToHTTP2MigrationContext) -> MigrateAction {
switch self.state {
case .starting:
context.starting.append((self.connectionID, self.eventLoop))
return .removeConnection
case .backingOff:
context.backingOff.append((self.connectionID, self.eventLoop))
return .removeConnection
case .idle(let connection, since: _):
// Idle connections can be removed right away
context.close.append(connection)
return .removeConnection
case .leased:
return .keepConnection
case .closed:
preconditionFailure("Unexpected state: Did not expect to have connections with this state in the state machine: \(self.state)")
}
}
}
/// A structure to hold the currently active HTTP/1.1 connections.
///
/// The general purpose connection pool (pool for requests that don't have special `EventLoop`
/// requirements) will grow up until `maximumConcurrentConnections`. If requests have
/// special `EventLoop` requirements overflow connections might be opened.
///
/// All connections live in the same `connections` array. In the front are the general purpose
/// connections. In the back (starting with the `overflowIndex`) are the connections for
/// requests with special needs.
struct HTTP1Connections {
/// The maximum number of connections in the general purpose pool.
private let maximumConcurrentConnections: Int
/// A connectionID generator.
private let generator: Connection.ID.Generator
/// The connections states
private var connections: [HTTP1ConnectionState]
/// The index after which you will find the connections for requests with `EventLoop`
/// requirements in `connections`.
private var overflowIndex: Array<HTTP1ConnectionState>.Index
init(maximumConcurrentConnections: Int, generator: Connection.ID.Generator) {
self.connections = []
self.connections.reserveCapacity(maximumConcurrentConnections)
self.overflowIndex = self.connections.endIndex
self.maximumConcurrentConnections = maximumConcurrentConnections
self.generator = generator
}
var stats: Stats {
var stats = Stats()
// all additions here can be unchecked, since we will have at max self.connections.count
// which itself is an Int. For this reason we will never overflow.
for connectionState in self.connections {
if connectionState.isConnecting {
stats.connecting &+= 1
} else if connectionState.isBackingOff {
stats.backingOff &+= 1
} else if connectionState.isLeased {
stats.leased &+= 1
} else if connectionState.isIdle {
stats.idle &+= 1
}
}
return stats
}
var isEmpty: Bool {
self.connections.isEmpty
}
var canGrow: Bool {
self.overflowIndex < self.maximumConcurrentConnections
}
var startingGeneralPurposeConnections: Int {
var connecting = 0
for connectionState in self.connections[0..<self.overflowIndex] {
if connectionState.isConnecting || connectionState.isBackingOff {
// connecting can't be greater than self.connections.count so it can't overflow.
// For this reason we can save the bounds check.
connecting &+= 1
}
}
return connecting
}
private var maximumAdditionalGeneralPurposeConnections: Int {
self.maximumConcurrentConnections - (self.overflowIndex - 1)
}
/// Is there at least one connection that is able to run requests
var hasActiveConnections: Bool {
self.connections.contains(where: { $0.isIdle || $0.isLeased })
}
func startingEventLoopConnections(on eventLoop: EventLoop) -> Int {
return self.connections[self.overflowIndex..<self.connections.endIndex].reduce(into: 0) { count, connection in
guard connection.eventLoop === eventLoop else { return }
if connection.isConnecting || connection.isBackingOff {
count &+= 1
}
}
}
// MARK: - Mutations -
/// A connection's use. Did it serve in the pool or was it specialized for an `EventLoop`?
enum ConnectionUse {
case generalPurpose
case eventLoop(EventLoop)
}
/// Information around an idle connection.
struct IdleConnectionContext {
/// The `EventLoop` the connection runs on.
var eventLoop: EventLoop
/// The connection's use. Either general purpose or for requests with `EventLoop`
/// requirements.
var use: ConnectionUse
}
/// Information around the failed/closed connection.
struct FailedConnectionContext {
/// The eventLoop the connection ran on.
var eventLoop: EventLoop
/// The failed connection's use.
var use: ConnectionUse
/// Connections that we start up for this use-case
var connectionsStartingForUseCase: Int
}
struct HTTP1ToHTTP2MigrationContext {
var backingOff: [(Connection.ID, EventLoop)] = []
var starting: [(Connection.ID, EventLoop)] = []
var close: [Connection] = []
}
// MARK: Connection creation
mutating func createNewConnection(on eventLoop: EventLoop) -> Connection.ID {
precondition(self.canGrow)
let connection = HTTP1ConnectionState(connectionID: self.generator.next(), eventLoop: eventLoop)
self.connections.insert(connection, at: self.overflowIndex)
self.overflowIndex = self.connections.index(after: self.overflowIndex)
return connection.connectionID
}
mutating func createNewOverflowConnection(on eventLoop: EventLoop) -> Connection.ID {
let connection = HTTP1ConnectionState(connectionID: self.generator.next(), eventLoop: eventLoop)
self.connections.append(connection)
return connection.connectionID
}
/// A new HTTP/1.1 connection was established.
///
/// This will put the connection into the idle state.
///
/// - Parameter connection: The new established connection.
/// - Returns: An index and an IdleConnectionContext to determine the next action for the now idle connection.
/// Call ``leaseConnection(at:)`` or ``closeConnection(at:)`` with the supplied index after
/// this.
mutating func newHTTP1ConnectionEstablished(_ connection: Connection) -> (Int, IdleConnectionContext) {
guard let index = self.connections.firstIndex(where: { $0.connectionID == connection.id }) else {
preconditionFailure("There is a new connection that we didn't request!")
}
precondition(connection.eventLoop === self.connections[index].eventLoop, "Expected the new connection to be on EL")
self.connections[index].connected(connection)
let context = self.generateIdleConnectionContextForConnection(at: index)
return (index, context)
}
/// Move the HTTP1ConnectionState to backingOff.
///
/// - Parameter connectionID: The connectionID of the failed connection attempt
/// - Returns: The eventLoop on which to schedule the backoff timer
mutating func backoffNextConnectionAttempt(_ connectionID: Connection.ID) -> EventLoop {
guard let index = self.connections.firstIndex(where: { $0.connectionID == connectionID }) else {
preconditionFailure("We tried to create a new connection that we know nothing about?")
}
self.connections[index].failedToConnect()
return self.connections[index].eventLoop
}
// MARK: Leasing and releasing
/// Lease a connection on the preferred EventLoop
///
/// If no connection is available on the preferred EventLoop, a connection on
/// another eventLoop might be returned, if one is available.
///
/// - Parameter eventLoop: The preferred EventLoop for the request
/// - Returns: A connection to execute a request on.
mutating func leaseConnection(onPreferred eventLoop: EventLoop) -> Connection? {
guard let index = self.findIdleConnection(onPreferred: eventLoop) else {
return nil
}
return self.connections[index].lease()
}
/// Lease a connection on the required EventLoop
///
/// If no connection is available on the required EventLoop nil is returned.
///
/// - Parameter eventLoop: The required EventLoop for the request
/// - Returns: A connection to execute a request on.
mutating func leaseConnection(onRequired eventLoop: EventLoop) -> Connection? {
guard let index = self.findIdleConnection(onRequired: eventLoop) else {
return nil
}
return self.connections[index].lease()
}
mutating func leaseConnection(at index: Int) -> Connection {
self.connections[index].lease()
}
func parkConnection(at index: Int) -> (Connection.ID, EventLoop) {
precondition(self.connections[index].isIdle)
return (self.connections[index].connectionID, self.connections[index].eventLoop)
}
/// A new HTTP/1.1 connection was released.
///
/// This will put the position into the idle state.
///
/// - Parameter connectionID: The released connection's id.
/// - Returns: An index and an IdleConnectionContext to determine the next action for the now idle connection.
/// Call ``leaseConnection(at:)`` or ``closeConnection(at:)`` with the supplied index after
/// this. If you want to park the connection no further call is required.
mutating func releaseConnection(_ connectionID: Connection.ID) -> (Int, IdleConnectionContext) {
guard let index = self.connections.firstIndex(where: { $0.connectionID == connectionID }) else {
preconditionFailure("A connection that we don't know was released? Something is very wrong...")
}
self.connections[index].release()
let context = self.generateIdleConnectionContextForConnection(at: index)
return (index, context)
}
// MARK: Connection close/removal
/// Closes the connection at the given index. This will also remove the connection right away.
mutating func closeConnection(at index: Int) -> Connection {
if index < self.overflowIndex {
self.overflowIndex = self.connections.index(before: self.overflowIndex)
}
var connectionState = self.connections.remove(at: index)
return connectionState.close()
}
mutating func removeConnection(at index: Int) {
precondition(self.connections[index].isClosed)
if index < self.overflowIndex {
self.overflowIndex = self.connections.index(before: self.overflowIndex)
}
self.connections.remove(at: index)
}
mutating func closeConnectionIfIdle(_ connectionID: Connection.ID) -> Connection? {
guard let index = self.connections.firstIndex(where: { $0.connectionID == connectionID }) else {
// because of a race this connection (connection close runs against trigger of timeout)
// was already removed from the state machine.
return nil
}
guard self.connections[index].isIdle else {
// connection is not idle anymore, we may have just leased it for a request
return nil
}
return self.closeConnection(at: index)
}
mutating func replaceConnection(at index: Int) -> (Connection.ID, EventLoop) {
precondition(self.connections[index].isClosed)
let newConnection = HTTP1ConnectionState(
connectionID: self.generator.next(),
eventLoop: self.connections[index].eventLoop
)
self.connections[index] = newConnection
return (newConnection.connectionID, newConnection.eventLoop)
}
// MARK: Connection failure
/// Fail a connection. Call this method, if a connection suddenly closed, did not startup correctly,
/// or the backoff time is done.
///
/// This will put the position into the closed state.
///
/// - Parameter connectionID: The failed connection's id.
/// - Returns: An optional index and an IdleConnectionContext to determine the next action for the closed connection.
/// You must call ``removeConnection(at:)`` or ``replaceConnection(at:)`` with the
/// supplied index after this. If nil is returned the connection was closed by the state machine and was
/// therefore already removed.
mutating func failConnection(_ connectionID: Connection.ID) -> (Int, FailedConnectionContext)? {
guard let index = self.connections.firstIndex(where: { $0.connectionID == connectionID }) else {
return nil
}
let use: ConnectionUse
self.connections[index].fail()
let eventLoop = self.connections[index].eventLoop
let starting: Int
if index < self.overflowIndex {
use = .generalPurpose
starting = self.startingGeneralPurposeConnections
} else {
use = .eventLoop(eventLoop)
starting = self.startingEventLoopConnections(on: eventLoop)
}
let context = FailedConnectionContext(
eventLoop: eventLoop,
use: use,
connectionsStartingForUseCase: starting
)
return (index, context)
}
// MARK: Migration
mutating func migrateToHTTP2() -> HTTP1ToHTTP2MigrationContext {
var migrationContext = HTTP1ToHTTP2MigrationContext()
let initialOverflowIndex = self.overflowIndex
self.connections = self.connections.enumerated().compactMap { index, connectionState in
switch connectionState.migrateToHTTP2(&migrationContext) {
case .removeConnection:
// If the connection has an index smaller than the previous overflow index,
// we deal with a general purpose connection.
// For this reason we need to decrement the overflow index.
if index < initialOverflowIndex {
self.overflowIndex = self.connections.index(before: self.overflowIndex)
}
return nil
case .keepConnection:
return connectionState
}
}
return migrationContext
}
/// We only handle starting and backing off connection here.
/// All already running connections must be handled by the enclosing state machine.
/// - Parameters:
/// - starting: starting HTTP connections from previous state machine
/// - backingOff: backing off HTTP connections from previous state machine
mutating func migrateFromHTTP2(
starting: [(Connection.ID, EventLoop)],
backingOff: [(Connection.ID, EventLoop)]
) {
for (connectionID, eventLoop) in starting {
let newConnection = HTTP1ConnectionState(connectionID: connectionID, eventLoop: eventLoop)
self.connections.insert(newConnection, at: self.overflowIndex)
/// If we can grow, we mark the connection as a general purpose connection.
/// Otherwise, it will be an overflow connection which is only used once for requests with a required event loop
if self.canGrow {
self.overflowIndex = self.connections.index(after: self.overflowIndex)
}
}
for (connectionID, eventLoop) in backingOff {
var backingOffConnection = HTTP1ConnectionState(connectionID: connectionID, eventLoop: eventLoop)
// TODO: Maybe we want to add a static init for backing off connections to HTTP1ConnectionState
backingOffConnection.failedToConnect()
self.connections.insert(backingOffConnection, at: self.overflowIndex)
/// If we can grow, we mark the connection as a general purpose connection.
/// Otherwise, it will be an overflow connection which is only used once for requests with a required event loop
if self.canGrow {
self.overflowIndex = self.connections.index(after: self.overflowIndex)
}
}
}
/// We will create new connections for each `requiredEventLoopOfPendingRequests`
/// In addition, we also create more general purpose connections if we do not have enough to execute
/// all requests on the given `preferredEventLoopsOfPendingGeneralPurposeRequests`
/// until we reach `maximumConcurrentConnections`
/// - Parameters:
/// - requiredEventLoopsForPendingRequests:
/// event loops for which we have requests with a required event loop.
/// Duplicates are not allowed.
/// - generalPurposeRequestCountPerPreferredEventLoop:
/// request count with no required event loop,
/// grouped by preferred event loop and ordered descending by number of requests
/// - Returns: new connections that must be created
mutating func createConnectionsAfterMigrationIfNeeded(
requiredEventLoopOfPendingRequests: [(EventLoop, Int)],
generalPurposeRequestCountGroupedByPreferredEventLoop: [(EventLoop, Int)]
) -> [(Connection.ID, EventLoop)] {
// create new connections for requests with a required event loop
// we may already start connections for those requests and do not want to start to many
let startingRequiredEventLoopConnectionCount = Dictionary(
self.connections[self.overflowIndex..<self.connections.endIndex].lazy.map {
($0.eventLoop.id, 1)
},
uniquingKeysWith: +
)
var connectionToCreate = requiredEventLoopOfPendingRequests
.flatMap { eventLoop, requestCount -> [(Connection.ID, EventLoop)] in
// We need a connection for each queued request with a required event loop.
// Therefore, we look how many request we have queued for a given `eventLoop` and
// how many connections we are already starting on the given `eventLoop`.
// If we have not enough, we will create additional connections to have at least
// on connection per request.
let connectionsToStart = requestCount - startingRequiredEventLoopConnectionCount[eventLoop.id, default: 0]
return stride(from: 0, to: connectionsToStart, by: 1).lazy.map { _ in
(self.createNewOverflowConnection(on: eventLoop), eventLoop)
}
}
// create new connections for requests without a required event loop
// TODO: improve algorithm to create connections uniformly across all preferred event loops
// while paying attention to the number of queued request per event loop
// Currently we start by creating new connections on the event loop with the most queued
// requests. If we have created enough connections to cover all requests for the first
// event loop we will continue with the event loop with the second most queued requests
// and so on and so forth. The `generalPurposeRequestCountGroupedByPreferredEventLoop`
// array is already ordered so we can just iterate over it without sorting by request count.
let newGeneralPurposeConnections: [(Connection.ID, EventLoop)] = generalPurposeRequestCountGroupedByPreferredEventLoop
// we do not want to allocated intermediate arrays.
.lazy
// we flatten the grouped list of event loops by lazily repeating the event loop
// for each request.
// As a result we get one event loop per request (`[EventLoop]`).
.flatMap { eventLoop, requestCount in
repeatElement(eventLoop, count: requestCount)
}
// we may already start connections and do not want to start too many
.dropLast(self.startingGeneralPurposeConnections)
// we need to respect the used defined `maximumConcurrentConnections`
.prefix(self.maximumAdditionalGeneralPurposeConnections)
// we now create a connection for each remaining event loop
.map { eventLoop in
(self.createNewConnection(on: eventLoop), eventLoop)
}
connectionToCreate.append(contentsOf: newGeneralPurposeConnections)
return connectionToCreate
}
// MARK: Shutdown
mutating func shutdown() -> CleanupContext {
var cleanupContext = CleanupContext()
let initialOverflowIndex = self.overflowIndex
self.connections = self.connections.enumerated().compactMap { index, connectionState in
switch connectionState.cleanup(&cleanupContext) {
case .removeConnection:
// If the connection has an index smaller than the previous overflow index,
// we deal with a general purpose connection.
// For this reason we need to decrement the overflow index.
if index < initialOverflowIndex {
self.overflowIndex = self.connections.index(before: self.overflowIndex)
}
return nil
case .keepConnection:
return connectionState
}
}
return cleanupContext
}
// MARK: - Private functions -
private func generateIdleConnectionContextForConnection(at index: Int) -> IdleConnectionContext {
precondition(self.connections[index].isIdle)
let eventLoop = self.connections[index].eventLoop
let use: ConnectionUse
if index < self.overflowIndex {
use = .generalPurpose
} else {
use = .eventLoop(eventLoop)
}
return IdleConnectionContext(eventLoop: eventLoop, use: use)
}
private func findIdleConnection(onPreferred preferredEL: EventLoop) -> Int? {
var eventLoopMatch: (Int, NIODeadline)?
var goodMatch: (Int, NIODeadline)?
// To find an appropriate connection we iterate all existing connections.
// While we do this we try to find the best fitting connection for our request.
//
// A perfect match, runs on the same eventLoop and has been idle the shortest amount
// of time (returned the most recently).
//
// An okay match is not on the same eventLoop, and has been idle for the shortest
// time.
for (index, conn) in self.connections.enumerated() {
guard let connReturn = conn.idleSince else {
continue
}
if conn.eventLoop === preferredEL {
switch eventLoopMatch {
case .none:
eventLoopMatch = (index, connReturn)
case .some((_, let existingMatchReturn)) where connReturn > existingMatchReturn:
eventLoopMatch = (index, connReturn)
default:
break
}
} else {
switch goodMatch {
case .none:
goodMatch = (index, connReturn)
case .some((_, let existingMatchReturn)):
// We don't require a specific eventLoop. For this reason we want to pick a
// matching eventLoop that has been idle the shortest.
if connReturn > existingMatchReturn {
goodMatch = (index, connReturn)
}
}
}
}
if let (index, _) = eventLoopMatch {
return index
}
if let (index, _) = goodMatch {
return index
}
return nil
}
func findIdleConnection(onRequired requiredEL: EventLoop) -> Int? {
var match: (Int, NIODeadline)?
// To find an appropriate connection we iterate all existing connections.
// While we do this we try to find the best fitting connection for our request.
//
// A match, runs on the same eventLoop and has been idle the shortest amount of time.
for (index, conn) in self.connections.enumerated() {
// 1. Ensure we are on the correct EL.
guard conn.eventLoop === requiredEL else {
continue
}
// 2. Ensure the connection is idle
guard let connReturn = conn.idleSince else {
continue
}
switch match {
case .none:
match = (index, connReturn)
case .some((_, let existingMatchReturn)) where connReturn > existingMatchReturn:
// the currently iterated eventLoop has been idle for a shorter amount than
// the current best match.
match = (index, connReturn)
default:
// the currently iterated eventLoop has been idle for a longer amount than
// the current best match. We continue the iteration.
continue
}
}
if let (index, _) = match {
return index
}
return nil
}
struct Stats {
var idle: Int = 0
var leased: Int = 0
var connecting: Int = 0
var backingOff: Int = 0
}
}
}