forked from swift-server/async-http-client
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathHTTPConnectionPool+HTTP2Connections.swift
690 lines (605 loc) · 30.5 KB
/
HTTPConnectionPool+HTTP2Connections.swift
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
//===----------------------------------------------------------------------===//
//
// This source file is part of the AsyncHTTPClient open source project
//
// Copyright (c) 2021 Apple Inc. and the AsyncHTTPClient project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
import NIOCore
extension HTTPConnectionPool {
private struct HTTP2ConnectionState {
/// The lifecycle states of a single HTTP/2 connection tracked by the pool.
private enum State {
/// The pool is establishing the connection. Valid transitions are to: `.backingOff`, `.active` and `.closed`.
case starting
/// The connection attempt failed and the connection is waiting for a retry. The only valid transition is to `.closed`.
/// This state is never moved back to `.starting`; a retry needs a freshly created connection state.
case backingOff
/// The connection is established and is able to run requests. `usedStreams` of at most `maxStreams` streams
/// are currently leased; `lastIdle` is the deadline captured when the connection last had no leased streams.
/// Valid transitions are to: `.draining` and `.closed`.
case active(Connection, maxStreams: Int, usedStreams: Int, lastIdle: NIODeadline)
/// The connection is still running its leased requests, but no new requests must be scheduled on it.
/// The connection stays `.draining` (e.g. if another GOAWAY arrives) until it transitions to `.closed`.
case draining(Connection, maxStreams: Int, usedStreams: Int)
/// The connection is closed.
case closed
}
/// `true` while the connection is not established yet: it is either being started
/// or waiting in backoff after a failed attempt.
var isStartingOrBackingOff: Bool {
    let notYetEstablished: Bool
    switch self.state {
    case .active, .draining, .closed:
        notYetEstablished = false
    case .starting, .backingOff:
        notYetEstablished = true
    }
    return notYetEstablished
}
/// `true` if the connection can run requests now (`.active`) or may be able to
/// later (`.starting`, `.backingOff`). Draining and closed connections return `false`.
var canOrWillBeAbleToExecuteRequests: Bool {
    switch self.state {
    case .draining, .closed:
        return false
    case .active, .starting, .backingOff:
        return true
    }
}
/// `true` if the connection is established and in the `.active` state. An active connection
/// can potentially execute requests, provided not all of its streams are leased.
var isActive: Bool {
    if case .active = self.state {
        return true
    }
    return false
}
/// `true` if a request can be scheduled on the connection right now, i.e. the connection
/// is `.active` and has fewer leased streams than its stream limit.
var isAvailable: Bool {
    switch self.state {
    case .starting, .backingOff, .draining, .closed:
        return false
    case .active(_, let streamLimit, let leasedStreams, _):
        return streamLimit > leasedStreams
    }
}
/// `true` if the connection is `.active` and no stream is currently leased.
/// Every idle connection is available, but not every available connection is idle.
var isIdle: Bool {
    switch self.state {
    case .active(_, _, 0, _):
        return true
    case .active, .starting, .backingOff, .draining, .closed:
        return false
    }
}
/// `true` once the connection has reached the `.closed` state.
var isClosed: Bool {
    if case .closed = self.state {
        return true
    }
    return false
}
/// The current lifecycle state. It is private, so it only changes through the mutating methods of this type.
private var state: State
/// The event loop this connection state was created for.
let eventLoop: EventLoop
/// The identifier of the connection tracked by this state.
let connectionID: Connection.ID
/// Marks the connection as established. Must be called after the connection was created successfully.
///
/// The connection moves from `.starting` to `.active` with no leased streams.
/// - Parameters:
///   - conn: the established HTTP/2 connection
///   - maxStreams: max concurrent streams setting received from the server
/// - Returns: number of available streams which can be leased
/// - Precondition: the connection must be in the `.starting` state
mutating func connected(_ conn: Connection, maxStreams: Int) -> Int {
    guard case .starting = self.state else {
        preconditionFailure("Invalid state: \(self.state)")
    }
    // A freshly established connection has no leased streams, so it is idle as of now.
    self.state = .active(conn, maxStreams: maxStreams, usedStreams: 0, lastIdle: .now())
    return maxStreams
}
/// Stores a new stream limit. Must be called after receiving new HTTP/2 settings from the server.
/// - Parameters:
///   - maxStreams: max concurrent streams setting received from the server
/// - Returns: number of available streams which can be leased. Always `0` for a draining connection.
/// - Precondition: the connection must be in the `.active` or `.draining` state
mutating func newMaxConcurrentStreams(_ maxStreams: Int) -> Int {
    switch self.state {
    case .active(let connection, _, let leasedStreams, let idleSince):
        self.state = .active(connection, maxStreams: maxStreams, usedStreams: leasedStreams, lastIdle: idleSince)
        // The new limit may be lower than the number of already leased streams; never report a negative count.
        let remaining = maxStreams - leasedStreams
        return remaining > 0 ? remaining : 0
    case .draining(let connection, _, let leasedStreams):
        self.state = .draining(connection, maxStreams: maxStreams, usedStreams: leasedStreams)
        return 0
    case .starting, .backingOff, .closed:
        preconditionFailure("Invalid state for updating max concurrent streams: \(self.state)")
    }
}
/// Moves an `.active` connection to `.draining`, keeping its stream counters.
/// A connection that is already draining is left unchanged.
/// - Returns: the event loop of the underlying connection
/// - Precondition: the connection must be in the `.active` or `.draining` state
mutating func goAwayReceived() -> EventLoop {
    switch self.state {
    case .draining(let connection, _, _):
        // we could potentially receive another go away while we drain all active streams and we just ignore it
        return connection.eventLoop
    case .active(let connection, let streamLimit, let leasedStreams, _):
        self.state = .draining(connection, maxStreams: streamLimit, usedStreams: leasedStreams)
        return connection.eventLoop
    case .starting, .backingOff, .closed:
        preconditionFailure("Invalid state for draining a connection: \(self.state)")
    }
}
/// The connection failed to start: moves the state from `.starting` to `.backingOff`.
/// - Precondition: the connection must be in the `.starting` state
mutating func failedToConnect() {
    guard case .starting = self.state else {
        preconditionFailure("Invalid state: \(self.state)")
    }
    self.state = .backingOff
}
/// Moves the connection to `.closed` from any other state.
/// - Precondition: the connection must not already be `.closed`
mutating func fail() {
    if case .closed = self.state {
        preconditionFailure("Invalid state: \(self.state)")
    }
    self.state = .closed
}
/// Leases `count` streams on the connection.
/// - Parameter count: number of streams to lease
/// - Returns: the connection to run the requests on
/// - Precondition: the connection must be `.active` and the resulting number of leased
///   streams must not exceed the stream limit
mutating func lease(_ count: Int) -> Connection {
    guard case .active(let connection, let streamLimit, let leasedBefore, let idleSince) = self.state else {
        preconditionFailure("Invalid state for leasing a stream: \(self.state)")
    }
    let leasedAfter = leasedBefore + count
    precondition(leasedAfter <= streamLimit, "tried to lease a connection which is not available")
    self.state = .active(connection, maxStreams: streamLimit, usedStreams: leasedAfter, lastIdle: idleSince)
    return connection
}
/// Releases one leased stream. Must be called after a request has finished, so the stream
/// can be used again for a new request.
/// - Returns: number of available streams which can be leased. Always `0` for a draining connection.
/// - Precondition: the connection must be `.active` or `.draining` and have at least one leased stream
mutating func release() -> Int {
    switch self.state {
    case .draining(let connection, let streamLimit, let leasedBefore):
        precondition(leasedBefore > 0, "we cannot release more streams than we have leased")
        self.state = .draining(connection, maxStreams: streamLimit, usedStreams: leasedBefore &- 1)
        return 0
    case .active(let connection, let streamLimit, let leasedBefore, let previousIdle):
        precondition(leasedBefore > 0, "we cannot release more streams than we have leased")
        let leasedAfter = leasedBefore &- 1
        // Releasing the last stream makes the connection idle; remember when that happened.
        let idleSince: NIODeadline = leasedAfter == 0 ? .now() : previousIdle
        self.state = .active(connection, maxStreams: streamLimit, usedStreams: leasedAfter, lastIdle: idleSince)
        return max(streamLimit &- leasedAfter, 0)
    case .starting, .backingOff, .closed:
        preconditionFailure("Invalid state: \(self.state)")
    }
}
/// Closes an idle connection by moving it to the `.closed` state.
/// - Returns: the connection that should be closed
/// - Precondition: the connection must be `.active` without any leased streams
mutating func close() -> Connection {
    guard case .active(let connection, _, 0, _) = self.state else {
        preconditionFailure("Invalid state for closing a connection: \(self.state)")
    }
    self.state = .closed
    return connection
}
/// The result of `cleanup(_:)`: tells the caller whether the connection state
/// can be removed from the connection list.
enum CleanupAction {
/// The connection state can be removed from the connection list.
case removeConnection
/// The connection state must stay in the connection list.
case keepConnection
}
/// Prepares the connection for a pool shutdown.
///
/// Called when all connections shall shut down. Depending on its state, the connection adds
/// itself to one of the arrays in `context` that signal the shutdown intent:
///  - backing off connections are recorded in `connectBackoff` and can be removed
///  - idle connections are recorded in `close` and can be removed
///  - connections with leased streams and draining connections are recorded in `cancel`
///    and are kept
///  - starting connections are kept without being recorded. Sadly we can't cancel connection
///    starts right now, so we need to wait for them to succeed or fail before the shutdown
///    can be finalized.
///
/// - Parameter context: A cleanup context to add the connection to based on its state.
/// - Returns: A cleanup action indicating if the connection can be removed from the
///            connection list.
/// - Precondition: the connection must not be `.closed`
func cleanup(_ context: inout CleanupContext) -> CleanupAction {
    switch self.state {
    case .closed:
        preconditionFailure("Unexpected state for cleanup: Did not expect to have closed connections in the state machine.")
    case .starting:
        return .keepConnection
    case .backingOff:
        context.connectBackoff.append(self.connectionID)
        return .removeConnection
    case .draining(let connection, _, _):
        context.cancel.append(connection)
        return .keepConnection
    case .active(let connection, _, let leasedStreams, _):
        precondition(leasedStreams >= 0)
        guard leasedStreams == 0 else {
            // requests are still running on this connection
            context.cancel.append(connection)
            return .keepConnection
        }
        context.close.append(connection)
        return .removeConnection
    }
}
/// Adds this connection's contribution to the given statistics.
///
/// Closed connections contribute nothing.
/// - Parameter stats: the statistics to accumulate into
func addStats(into stats: inout HTTP2Connections.Stats) {
    switch self.state {
    case .closed:
        break
    case .starting:
        stats.startingConnections &+= 1
    case .backingOff:
        stats.backingOffConnections &+= 1
    case .draining(_, _, let leasedStreams):
        stats.drainingConnections &+= 1
        stats.leasedStreams += leasedStreams
        precondition(leasedStreams >= 0)
    case .active(_, let streamLimit, let leasedStreams, _):
        // The limit may have been lowered below the leased count; never add a negative amount.
        let freeStreams = streamLimit - leasedStreams
        stats.availableStreams += (freeStreams > 0 ? freeStreams : 0)
        stats.leasedStreams += leasedStreams
        stats.availableConnections &+= 1
        precondition(leasedStreams >= 0)
        if leasedStreams == 0 {
            stats.idleConnections &+= 1
        }
    }
}
/// The result of `migrateToHTTP1(context:)`: tells the caller whether the connection
/// state was handed over to the migration context or has to stay where it is.
enum MigrateAction {
/// The connection was added to the migration context (starting, backing off or idle).
case removeConnection
/// The connection was not added to the migration context (it has leased streams or is draining).
case keepConnection
}
/// Hands the connection over to the given HTTP/1 migration context if possible.
///
/// Starting and backing off connections are recorded with their ID and event loop, idle
/// connections are recorded in `close`. Connections with leased streams and draining
/// connections are not recorded.
/// - Parameter context: the migration context to add the connection to based on its state
/// - Returns: `.removeConnection` if the connection was recorded in `context`, `.keepConnection` otherwise
/// - Precondition: the connection must not be `.closed`
func migrateToHTTP1(
    context: inout HTTP2Connections.HTTP2ToHTTP1MigrationContext
) -> MigrateAction {
    switch self.state {
    case .closed:
        preconditionFailure("Unexpected state: Did not expect to have connections with this state in the state machine: \(self.state)")
    case .draining:
        return .keepConnection
    case .starting:
        context.starting.append((self.connectionID, self.eventLoop))
        return .removeConnection
    case .backingOff:
        context.backingOff.append((self.connectionID, self.eventLoop))
        return .removeConnection
    case .active(let connection, _, let leasedStreams, _):
        precondition(leasedStreams >= 0)
        guard leasedStreams == 0 else {
            return .keepConnection
        }
        context.close.append(connection)
        return .removeConnection
    }
}
/// Creates the state for a new connection. The connection begins in the `.starting` state.
/// - Parameters:
///   - connectionID: identifier of the connection
///   - eventLoop: event loop the connection is created on
init(connectionID: Connection.ID, eventLoop: EventLoop) {
    self.state = .starting
    self.eventLoop = eventLoop
    self.connectionID = connectionID
}
}
struct HTTP2Connections {
/// Generates the connection IDs for connections created by this struct.
private let generator: Connection.ID.Generator
/// The states of all connections currently tracked by this struct.
private var connections: [HTTP2ConnectionState]
/// `true` if no connection is tracked at all.
var isEmpty: Bool {
    return self.connections.isEmpty
}
/// Statistics accumulated over all tracked connections.
var stats: Stats {
    var accumulated = Stats()
    for connection in self.connections {
        connection.addStats(into: &accumulated)
    }
    return accumulated
}
/// Creates an empty set of HTTP/2 connections.
/// - Parameter generator: generator for the IDs of newly created connections
init(generator: Connection.ID.Generator) {
    self.connections = []
    self.generator = generator
}
// MARK: Migration
/// Takes over connections from a previous HTTP/1 state machine.
///
/// We only handle starting and backing off connections here.
/// All already running connections must be handled by the enclosing state machine.
/// - Parameters:
///   - starting: starting HTTP connections from previous state machine
///   - backingOff: backing off HTTP connections from previous state machine
mutating func migrateFromHTTP1(
    starting: [(Connection.ID, EventLoop)],
    backingOff: [(Connection.ID, EventLoop)]
) {
    // New connection states begin in the starting state, so these can be appended as they are.
    self.connections.append(
        contentsOf: starting.map { entry in
            HTTP2ConnectionState(connectionID: entry.0, eventLoop: entry.1)
        }
    )

    self.connections.append(
        contentsOf: backingOff.map { entry -> HTTP2ConnectionState in
            var state = HTTP2ConnectionState(connectionID: entry.0, eventLoop: entry.1)
            // TODO: Maybe we want to add a static init for backing off connections to HTTP2ConnectionState
            state.failedToConnect()
            return state
        }
    )
}
/// We will create new connections for `requiredEventLoopsOfPendingRequests`
/// if we do not already have a connection that can or will be able to execute requests on the given event loop.
/// - Parameters:
///   - requiredEventLoopsOfPendingRequests: event loops for which we have requests with a required event loop. Duplicates are not allowed.
/// - Returns: new connections that need to be created
mutating func createConnectionsAfterMigrationIfNeeded(
    requiredEventLoopsOfPendingRequests: [EventLoop]
) -> [(Connection.ID, EventLoop)] {
    // Event loops that are already covered. This is computed once, before any connection is
    // created below, which is why duplicates in the input are not allowed.
    let coveredEventLoopIDs = Set(
        self.connections.lazy
            .filter { state in state.canOrWillBeAbleToExecuteRequests }
            .map { state in state.eventLoop.id }
    )

    // create new connections for requests with a required event loop
    var createdConnections: [(Connection.ID, EventLoop)] = []
    for eventLoop in requiredEventLoopsOfPendingRequests where !coveredEventLoopIDs.contains(eventLoop.id) {
        let connectionID = self.createNewConnection(on: eventLoop)
        createdConnections.append((connectionID, eventLoop))
    }
    return createdConnections
}
/// Collects the connections that are handed over when migrating from HTTP/2 to HTTP/1.
struct HTTP2ToHTTP1MigrationContext {
/// ID and event loop of every connection that was backing off.
var backingOff: [(Connection.ID, EventLoop)] = []
/// ID and event loop of every connection that was starting.
var starting: [(Connection.ID, EventLoop)] = []
/// Established connections that had no leased streams.
var close: [Connection] = []
}
/// Migrates all connections that can be handed over to an HTTP/1 state machine.
///
/// Starting, backing off and idle connections are recorded in the returned context and
/// removed from this struct. Connections that still have leased streams, as well as draining
/// connections, are not recorded in the context and therefore stay in this struct.
/// - Returns: the context holding all connections that were removed from this struct
mutating func migrateToHTTP1() -> HTTP2ToHTTP1MigrationContext {
    var context = HTTP2ToHTTP1MigrationContext()
    // `removeAll(where:)` removes the elements for which the closure returns `true`, so a
    // connection that was handed over to the context (`.removeConnection`) must map to `true`.
    // This is the same mapping `shutdown()` uses.
    self.connections.removeAll(where: { connectionState in
        switch connectionState.migrateToHTTP1(context: &context) {
        case .removeConnection:
            return true
        case .keepConnection:
            return false
        }
    })
    return context
}
// MARK: Connection creation
/// true if one or more connections are active
var hasActiveConnections: Bool {
    for connection in self.connections where connection.isActive {
        return true
    }
    return false
}
/// Used in general purpose connection scenarios to check whether at least one connection
/// exists that can or will be able to execute requests, or whether a new one should be created.
var hasConnectionThatCanOrWillBeAbleToExecuteRequests: Bool {
    for connection in self.connections where connection.canOrWillBeAbleToExecuteRequests {
        return true
    }
    return false
}
/// Used in eventLoop scenarios: does at least one usable connection exist for this eventLoop,
/// or should we create a new one?
/// - Parameter eventLoop: connection `EventLoop` to search for
/// - Returns: true if at least one connection is starting, backing off or active for the given `eventLoop`
func hasConnectionThatCanOrWillBeAbleToExecuteRequests(for eventLoop: EventLoop) -> Bool {
    for connection in self.connections where connection.eventLoop === eventLoop {
        if connection.canOrWillBeAbleToExecuteRequests {
            return true
        }
    }
    return false
}
/// Starts tracking a new connection, in the starting state, on the given event loop.
/// - Parameter eventLoop: the event loop the new connection shall be created on
/// - Returns: the ID generated for the new connection
mutating func createNewConnection(on eventLoop: EventLoop) -> Connection.ID {
    assert(
        !self.hasConnectionThatCanOrWillBeAbleToExecuteRequests(for: eventLoop),
        "we should not create more than one connection per event loop"
    )
    let newConnectionID = self.generator.next()
    self.connections.append(HTTP2ConnectionState(connectionID: newConnectionID, eventLoop: eventLoop))
    return newConnectionID
}
/// A new HTTP/2 connection was established.
///
/// This will put the connection into the idle state.
///
/// - Parameters:
///   - connection: The new established connection.
///   - maxConcurrentStreams: The max concurrent streams setting of the new connection.
/// - Returns: An index and an ``EstablishedConnectionContext`` to determine the next action for the now idle connection.
///            Call ``leaseStreams(at:count:)`` or ``closeConnection(at:)`` with the supplied index after
///            this.
/// - Precondition: The connection must have been requested before and must run on the event loop it was requested for.
mutating func newHTTP2ConnectionEstablished(_ connection: Connection, maxConcurrentStreams: Int) -> (Int, EstablishedConnectionContext) {
    let position = self.connections.firstIndex(where: { state in state.connectionID == connection.id })
    guard let index = position else {
        preconditionFailure("There is a new connection that we didn't request!")
    }
    precondition(connection.eventLoop === self.connections[index].eventLoop, "Expected the new connection to be on EL")

    let streamsToLease = self.connections[index].connected(connection, maxStreams: maxConcurrentStreams)
    // `isIdle` is read after the transition, so it describes the now established connection.
    let nowIdle = self.connections[index].isIdle
    return (
        index,
        EstablishedConnectionContext(
            availableStreams: streamsToLease,
            eventLoop: connection.eventLoop,
            isIdle: nowIdle,
            connectionID: connection.id
        )
    )
}
/// Move the connection state to backingOff.
///
/// - Parameter connectionID: The connectionID of the failed connection attempt
/// - Returns: The eventLoop on which to schedule the backoff timer
/// - Precondition: connection needs to be known and in the `.starting` state
mutating func backoffNextConnectionAttempt(_ connectionID: Connection.ID) -> EventLoop {
    let position = self.connections.firstIndex(where: { state in state.connectionID == connectionID })
    guard let index = position else {
        preconditionFailure("We tried to create a new connection that we know nothing about?")
    }
    self.connections[index].failedToConnect()
    let backoffEventLoop = self.connections[index].eventLoop
    return backoffEventLoop
}
// MARK: Connection lifecycle events
/// Sets the connection with the given `connectionID` to the draining state.
/// - Parameter connectionID: ID of the connection that received the GOAWAY
/// - Returns: a ``GoAwayContext`` carrying the `EventLoop` to create a new connection on if applicable
/// - Precondition: connection with given `connectionID` must be either `.active` or already in the `.draining` state
mutating func goAwayReceived(_ connectionID: Connection.ID) -> GoAwayContext {
    let position = self.connections.firstIndex(where: { state in state.connectionID == connectionID })
    guard let index = position else {
        preconditionFailure("go away recieved for a connection that does not exists")
    }
    return GoAwayContext(eventLoop: self.connections[index].goAwayReceived())
}
/// Update the maximum number of concurrent streams for the given connection.
/// - Parameters:
///   - connectionID: The connectionID for which we received new settings
///   - newMaxStreams: new maximum concurrent streams
/// - Returns: index of the connection and new number of available streams in the `EstablishedConnectionContext`
/// - Precondition: Connections must be in the `.active` or `.draining` state.
mutating func newHTTP2MaxConcurrentStreamsReceived(
    _ connectionID: Connection.ID,
    newMaxStreams: Int
) -> (Int, EstablishedConnectionContext) {
    let position = self.connections.firstIndex(where: { state in state.connectionID == connectionID })
    guard let index = position else {
        preconditionFailure("We tried to update the maximum number of concurrent streams for a connection that does not exists")
    }

    let streamsToLease = self.connections[index].newMaxConcurrentStreams(newMaxStreams)
    let updated = self.connections[index]
    return (
        index,
        EstablishedConnectionContext(
            availableStreams: streamsToLease,
            eventLoop: updated.eventLoop,
            isIdle: updated.isIdle,
            connectionID: connectionID
        )
    )
}
// MARK: Leasing and releasing
/// Leases a single stream, preferring a connection on the given event loop.
/// - Parameter eventLoop: the preferred event loop
/// - Returns: the connection and lease context, or `nil` if no connection is available
mutating func leaseStream(onPreferred eventLoop: EventLoop) -> (Connection, LeasedStreamContext)? {
    if let index = self.findAvailableConnection(onPreferred: eventLoop) {
        return self.leaseStreams(at: index, count: 1)
    }
    return nil
}
/// Tries to find an available connection on the preferred `eventLoop`. If it can't find one
/// with the given `eventLoop`, it returns the first available connection.
/// - Returns: index of the found connection, or `nil` if no connection is available
private func findAvailableConnection(onPreferred eventLoop: EventLoop) -> Int? {
    var firstAvailable: Int?
    for index in self.connections.indices where self.connections[index].isAvailable {
        if self.connections[index].eventLoop === eventLoop {
            // a connection on the preferred event loop always wins
            return index
        }
        if firstAvailable == nil {
            firstAvailable = index
        }
    }
    return firstAvailable
}
/// Leases a single stream on a connection that runs on the given event loop.
/// - Parameter eventLoop: the required event loop
/// - Returns: the connection and lease context, or `nil` if no connection on `eventLoop` is available
mutating func leaseStream(onRequired eventLoop: EventLoop) -> (Connection, LeasedStreamContext)? {
    if let index = self.findAvailableConnection(onRequired: eventLoop) {
        return self.leaseStreams(at: index, count: 1)
    }
    return nil
}
/// Tries to find an available connection on the required `eventLoop`.
/// - Returns: index of the first matching connection, or `nil` if there is none
private func findAvailableConnection(onRequired eventLoop: EventLoop) -> Int? {
    for index in self.connections.indices {
        let candidate = self.connections[index]
        if candidate.eventLoop === eventLoop && candidate.isAvailable {
            return index
        }
    }
    return nil
}
/// Lease `count` streams on the connection at `index`, e.g. after connection establishment.
/// - Parameters:
///   - index: index of the connection you got by calling `newHTTP2ConnectionEstablished(_:maxConcurrentStreams:)`
///   - count: number of streams you want to lease. You get the current available streams from the `EstablishedConnectionContext` which `newHTTP2ConnectionEstablished(_:maxConcurrentStreams:)` returns
/// - Returns: connection to execute `count` requests on
/// - precondition: `index` needs to be valid. `count` must be greater than or equal to *1* and not exceed the number of available streams.
mutating func leaseStreams(at index: Int, count: Int) -> (Connection, LeasedStreamContext) {
    precondition(count >= 1, "stream lease count must be greater than or equal to 1")
    // Capture the idle flag before leasing; afterwards the connection is no longer idle.
    let wasIdleBeforeLease = self.connections[index].isIdle
    let leasedConnection = self.connections[index].lease(count)
    return (leasedConnection, LeasedStreamContext(wasIdle: wasIdleBeforeLease))
}
/// Releases one stream of the connection with the given ID.
/// - Parameter connectionID: ID of the connection on which a request has finished
/// - Returns: index of the connection and its state after the release in the `EstablishedConnectionContext`
/// - Precondition: the connection must be known
mutating func releaseStream(_ connectionID: Connection.ID) -> (Int, EstablishedConnectionContext) {
    let position = self.connections.firstIndex(where: { state in state.connectionID == connectionID })
    guard let index = position else {
        preconditionFailure("We tried to release a connection we do not know anything about")
    }

    let streamsToLease = self.connections[index].release()
    let updated = self.connections[index]
    return (
        index,
        EstablishedConnectionContext(
            availableStreams: streamsToLease,
            eventLoop: updated.eventLoop,
            isIdle: updated.isIdle,
            connectionID: connectionID
        )
    )
}
// MARK: Connection close/removal
/// Closes the connection at the given index. This will also remove the connection right away.
/// - Parameter index: index of the connection which we get from `releaseStream(_:)`
/// - Returns: closed and removed connection
mutating func closeConnection(at index: Int) -> Connection {
    let closedConnection = self.connections[index].close()
    // the state is `.closed` now, which `removeConnection(at:)` requires
    self.removeConnection(at: index)
    return closedConnection
}
/// Removes a closed connection.
/// - Parameter index: index of the connection which we get from `failConnection(_:)`
/// - Precondition: connection must be closed
mutating func removeConnection(at index: Int) {
    let isClosed = self.connections[index].isClosed
    precondition(isClosed, "We tried to remove a connection which is not closed")
    self.connections.remove(at: index)
}
/// Closes and removes the connection with the given ID if it is still known and idle.
/// - Parameter connectionID: ID of the connection to close
/// - Returns: the closed and removed connection, or `nil` if nothing was closed
mutating func closeConnectionIfIdle(_ connectionID: Connection.ID) -> Connection? {
    // Nothing is closed in two cases:
    //  - because of a race (connection close runs against trigger of timeout) this
    //    connection was already removed from the state machine.
    //  - the connection is not idle anymore, we may have just leased it for a request.
    guard
        let index = self.connections.firstIndex(where: { state in state.connectionID == connectionID }),
        self.connections[index].isIdle
    else {
        return nil
    }
    return self.closeConnection(at: index)
}
/// Replaces a closed connection by creating a new starting connection on the same event loop.
/// - Parameter index: index of the connection which we got from `failConnection(_:)`
/// - Returns: ID and event loop of the new connection
/// - Precondition: connection must be closed
mutating func createNewConnectionByReplacingClosedConnection(at index: Int) -> (Connection.ID, EventLoop) {
    precondition(self.connections[index].isClosed)

    let replacementID = self.generator.next()
    let replacementEventLoop = self.connections[index].eventLoop
    let replacement = HTTP2ConnectionState(connectionID: replacementID, eventLoop: replacementEventLoop)

    self.connections[index] = replacement
    return (replacement.connectionID, replacement.eventLoop)
}
/// Marks the connection with the given ID as closed.
/// - Parameter connectionID: ID of the failed or closed connection
/// - Returns: index of the connection and a ``FailedConnectionContext``, or `nil` if the connection is unknown
mutating func failConnection(_ connectionID: Connection.ID) -> (Int, FailedConnectionContext)? {
    let position = self.connections.firstIndex(where: { state in state.connectionID == connectionID })
    guard let index = position else {
        // When a connection close is initiated by the connection pool (e.g. because the connection was idle for too long), the connection will
        // still report its close to the state machine and then to us. In those cases we must ignore the event.
        return nil
    }

    self.connections[index].fail()
    return (index, FailedConnectionContext(eventLoop: self.connections[index].eventLoop))
}
/// Prepares all connections for shutdown.
///
/// Every connection adds itself to the returned context based on its state. Connections
/// whose cleanup reports `.removeConnection` are removed, all others are kept.
/// - Returns: the context describing what has to be done with the connections
mutating func shutdown() -> CleanupContext {
    var context = CleanupContext()
    self.connections.removeAll { state in
        switch state.cleanup(&context) {
        case .keepConnection:
            return false
        case .removeConnection:
            return true
        }
    }
    return context
}
// MARK: Result structs
/// Information around a connection which is either in the .active or .draining state.
/// Returned together with the connection's index after the connection's state changed.
struct EstablishedConnectionContext {
/// number of streams which can be leased
var availableStreams: Int
/// The eventLoop the connection is running on.
var eventLoop: EventLoop
/// true if the connection is active and no stream is leased
var isIdle: Bool
/// id of the connection
var connectionID: Connection.ID
}
/// Information around a lease, returned by `leaseStreams(at:count:)`.
struct LeasedStreamContext {
/// true if the connection was idle before leasing the stream
var wasIdle: Bool
}
/// Information around a connection that was moved to the draining state, returned by `goAwayReceived(_:)`.
struct GoAwayContext {
/// The eventLoop the connection is running on.
var eventLoop: EventLoop
}
/// Information around the failed/closed connection, returned by `failConnection(_:)`.
struct FailedConnectionContext {
/// The eventLoop the connection ran on.
var eventLoop: EventLoop
}
/// Counters accumulated over all tracked connections. Closed connections are not counted.
struct Stats: Equatable {
/// number of connections in the starting state
var startingConnections: Int = 0
/// number of connections in the backing off state
var backingOffConnections: Int = 0
/// number of active connections without any leased stream
var idleConnections: Int = 0
/// number of connections in the active state, regardless of how many streams are leased
var availableConnections: Int = 0
/// number of connections in the draining state
var drainingConnections: Int = 0
/// number of leased streams on active and draining connections
var leasedStreams: Int = 0
/// number of streams that can still be leased on active connections
var availableStreams: Int = 0
}
}
}