Skip to content

Commit aec3fee

Browse files
authored
All internal connection flow should be executed when shutting down (#268)
1 parent c097c17 commit aec3fee

File tree

4 files changed

+176
-17
lines changed

4 files changed

+176
-17
lines changed

Diff for: Sources/AsyncHTTPClient/ConnectionPool.swift

+14-12
Original file line numberDiff line numberDiff line change
@@ -374,15 +374,6 @@ class HTTP1ConnectionProvider {
374374
"ahc-action": "\(action)"])
375375
connection.channel.close(promise: nil)
376376
self.execute(action, logger: logger)
377-
case .cancel(let connection, let close):
378-
logger.trace("cancelling connection",
379-
metadata: ["ahc-connection": "\(connection)",
380-
"ahc-close": "\(close)"])
381-
connection.cancel().whenComplete { _ in
382-
if close {
383-
self.closeAndDelete()
384-
}
385-
}
386377
case .fail(let waiter, let error):
387378
logger.debug("failing connection for waiter",
388379
metadata: ["ahc-waiter": "\(waiter)",
@@ -428,7 +419,17 @@ class HTTP1ConnectionProvider {
428419
}
429420
return self.state.offer(connection: connection)
430421
}
431-
waiter.promise.succeed(connection)
422+
423+
switch action {
424+
case .closeAnd:
425+
// This happens when client was shut down during connect
426+
logger.trace("connection cancelled due to client shutdown",
427+
metadata: ["ahc-connection": "\(channel)"])
428+
connection.channel.close(promise: nil)
429+
waiter.promise.fail(HTTPClientError.cancelled)
430+
default:
431+
waiter.promise.succeed(connection)
432+
}
432433
case .failure(let error):
433434
logger.debug("connection attempt failed",
434435
metadata: ["ahc-error": "\(error)"])
@@ -437,6 +438,7 @@ class HTTP1ConnectionProvider {
437438
}
438439
waiter.promise.fail(error)
439440
}
441+
440442
waiter.setupComplete.whenComplete { _ in
441443
self.execute(action, logger: logger)
442444
}
@@ -456,7 +458,7 @@ class HTTP1ConnectionProvider {
456458
// Since both `.park` and `.deleteProvider` are terminal in terms of execution,
457459
// we can execute them immediately
458460
self.execute(action, logger: logger)
459-
case .cancel, .closeAnd, .create, .fail, .lease, .parkAnd, .replace:
461+
case .closeAnd, .create, .fail, .lease, .parkAnd, .replace:
460462
// This is needed to start a new stack, otherwise, since this is called on a previous
461463
// future completion handler chain, it will be growing indefinitely until the connection is closed.
462464
// We might revisit this when https://github.com/apple/swift-nio/issues/970 is resolved.
@@ -493,7 +495,7 @@ class HTTP1ConnectionProvider {
493495
$0.promise.fail(HTTPClientError.cancelled)
494496
}
495497

496-
if available.isEmpty, leased.isEmpty {
498+
if available.isEmpty, leased.isEmpty, clean {
497499
self.closePromise.succeed(())
498500
return self.closePromise.futureResult.map { clean }
499501
}

Diff for: Sources/AsyncHTTPClient/ConnectionsState.swift

+13-5
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ extension HTTP1ConnectionProvider {
2323
case park(Connection)
2424
case none
2525
case fail(Waiter, Error)
26-
case cancel(Connection, Bool)
2726
indirect case closeAnd(Connection, Action)
2827
indirect case parkAnd(Connection, Action)
2928
}
@@ -209,8 +208,9 @@ extension HTTP1ConnectionProvider {
209208
case .active:
210209
self.leasedConnections.insert(ConnectionKey(connection))
211210
return .none
212-
case .closed: // This can happen when we close the client while connections was being estableshed
213-
return .cancel(connection, self.isEmpty)
211+
case .closed: // This can happen when we close the client while connections was being established
212+
self.openedConnectionsCount -= 1
213+
return .closeAnd(connection, self.processNextWaiter())
214214
}
215215
}
216216

@@ -233,7 +233,9 @@ extension HTTP1ConnectionProvider {
233233
// user calls `syncShutdown` before we received an error from the bootstrap. In this scenario,
234234
// pool will be `.closed` but connection will be still in the process of being established/failed,
235235
// so then this process finishes, it will get to this point.
236-
return .none
236+
// We need to call `processNextWaiter` to finish deleting provider from the pool.
237+
self.openedConnectionsCount -= 1
238+
return self.processNextWaiter()
237239
}
238240
}
239241

@@ -273,7 +275,13 @@ extension HTTP1ConnectionProvider {
273275

274276
return .closeAnd(connection, self.processNextWaiter())
275277
case .closed:
276-
return .none
278+
// This situation can happen when we call close, state changes, but before we call `close` on all
279+
// available connections, in this case we should close this connection and, potentially,
280+
// delete the provider
281+
self.openedConnectionsCount -= 1
282+
self.availableConnections.removeAll { $0 === connection }
283+
284+
return .closeAnd(connection, self.processNextWaiter())
277285
}
278286
}
279287

Diff for: Tests/AsyncHTTPClientTests/ConnectionPoolTests+XCTest.swift

+4
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,10 @@ extension ConnectionPoolTests {
6262
("testConnectionRemoteCloseRelease", testConnectionRemoteCloseRelease),
6363
("testConnectionTimeoutRelease", testConnectionTimeoutRelease),
6464
("testAcquireAvailableBecomesUnavailable", testAcquireAvailableBecomesUnavailable),
65+
("testShutdownOnPendingAndSuccess", testShutdownOnPendingAndSuccess),
66+
("testShutdownOnPendingAndError", testShutdownOnPendingAndError),
67+
("testShutdownTimeout", testShutdownTimeout),
68+
("testShutdownRemoteClosed", testShutdownRemoteClosed),
6569
]
6670
}
6771
}

Diff for: Tests/AsyncHTTPClientTests/ConnectionPoolTests.swift

+145
Original file line numberDiff line numberDiff line change
@@ -1391,6 +1391,151 @@ class ConnectionPoolTests: XCTestCase {
13911391
}
13921392
}
13931393

1394+
// MARK: - Shutdown tests
1395+
1396+
func testShutdownOnPendingAndSuccess() {
1397+
var state = HTTP1ConnectionProvider.ConnectionsState(eventLoop: self.eventLoop)
1398+
1399+
XCTAssertTrue(state.enqueue())
1400+
1401+
let connectionPromise = self.eventLoop.makePromise(of: Connection.self)
1402+
let setupPromise = self.eventLoop.makePromise(of: Void.self)
1403+
let waiter = HTTP1ConnectionProvider.Waiter(promise: connectionPromise, setupComplete: setupPromise.futureResult, preference: .indifferent)
1404+
var action = state.acquire(waiter: waiter)
1405+
1406+
guard case .create = action else {
1407+
XCTFail("unexpected action \(action)")
1408+
return
1409+
}
1410+
1411+
let snapshot = state.testsOnly_getInternalState()
1412+
XCTAssertEqual(snapshot.openedConnectionsCount, 1)
1413+
1414+
if let (waiters, available, leased, clean) = state.close() {
1415+
XCTAssertTrue(waiters.isEmpty)
1416+
XCTAssertTrue(available.isEmpty)
1417+
XCTAssertTrue(leased.isEmpty)
1418+
XCTAssertFalse(clean)
1419+
} else {
1420+
XCTFail("Expecting snapshot")
1421+
}
1422+
1423+
let connection = Connection(channel: EmbeddedChannel(), provider: self.http1ConnectionProvider)
1424+
1425+
action = state.offer(connection: connection)
1426+
guard case .closeAnd(_, .closeProvider) = action else {
1427+
XCTFail("unexpected action \(action)")
1428+
return
1429+
}
1430+
1431+
connectionPromise.fail(TempError())
1432+
setupPromise.succeed(())
1433+
}
1434+
1435+
func testShutdownOnPendingAndError() {
1436+
var state = HTTP1ConnectionProvider.ConnectionsState(eventLoop: self.eventLoop)
1437+
1438+
XCTAssertTrue(state.enqueue())
1439+
1440+
let connectionPromise = self.eventLoop.makePromise(of: Connection.self)
1441+
let setupPromise = self.eventLoop.makePromise(of: Void.self)
1442+
let waiter = HTTP1ConnectionProvider.Waiter(promise: connectionPromise, setupComplete: setupPromise.futureResult, preference: .indifferent)
1443+
var action = state.acquire(waiter: waiter)
1444+
1445+
guard case .create = action else {
1446+
XCTFail("unexpected action \(action)")
1447+
return
1448+
}
1449+
1450+
let snapshot = state.testsOnly_getInternalState()
1451+
XCTAssertEqual(snapshot.openedConnectionsCount, 1)
1452+
1453+
if let (waiters, available, leased, clean) = state.close() {
1454+
XCTAssertTrue(waiters.isEmpty)
1455+
XCTAssertTrue(available.isEmpty)
1456+
XCTAssertTrue(leased.isEmpty)
1457+
XCTAssertFalse(clean)
1458+
} else {
1459+
XCTFail("Expecting snapshot")
1460+
}
1461+
1462+
action = state.connectFailed()
1463+
guard case .closeProvider = action else {
1464+
XCTFail("unexpected action \(action)")
1465+
return
1466+
}
1467+
1468+
connectionPromise.fail(TempError())
1469+
setupPromise.succeed(())
1470+
}
1471+
1472+
func testShutdownTimeout() {
1473+
var state = HTTP1ConnectionProvider.ConnectionsState(eventLoop: self.eventLoop)
1474+
1475+
var snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState()
1476+
1477+
let connection = Connection(channel: EmbeddedChannel(), provider: self.http1ConnectionProvider)
1478+
snapshot.availableConnections.append(connection)
1479+
snapshot.openedConnectionsCount = 1
1480+
1481+
state.testsOnly_setInternalState(snapshot)
1482+
1483+
if let (waiters, available, leased, clean) = state.close() {
1484+
XCTAssertTrue(waiters.isEmpty)
1485+
XCTAssertFalse(available.isEmpty)
1486+
XCTAssertTrue(leased.isEmpty)
1487+
XCTAssertTrue(clean)
1488+
} else {
1489+
XCTFail("Expecting snapshot")
1490+
}
1491+
1492+
let action = state.timeout(connection: connection)
1493+
switch action {
1494+
case .closeAnd(_, let next):
1495+
switch next {
1496+
case .closeProvider:
1497+
// expected
1498+
break
1499+
default:
1500+
XCTFail("Unexpected action: \(action)")
1501+
}
1502+
default:
1503+
XCTFail("Unexpected action: \(action)")
1504+
}
1505+
}
1506+
1507+
func testShutdownRemoteClosed() {
1508+
var state = HTTP1ConnectionProvider.ConnectionsState(eventLoop: self.eventLoop)
1509+
1510+
var snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState()
1511+
1512+
let connection = Connection(channel: EmbeddedChannel(), provider: self.http1ConnectionProvider)
1513+
snapshot.availableConnections.append(connection)
1514+
snapshot.openedConnectionsCount = 1
1515+
1516+
state.testsOnly_setInternalState(snapshot)
1517+
1518+
if let (waiters, available, leased, clean) = state.close() {
1519+
XCTAssertTrue(waiters.isEmpty)
1520+
XCTAssertFalse(available.isEmpty)
1521+
XCTAssertTrue(leased.isEmpty)
1522+
XCTAssertTrue(clean)
1523+
} else {
1524+
XCTFail("Expecting snapshot")
1525+
}
1526+
1527+
let action = state.remoteClosed(connection: connection)
1528+
switch action {
1529+
case .closeProvider:
1530+
// expected
1531+
break
1532+
default:
1533+
XCTFail("Unexpected action: \(action)")
1534+
}
1535+
}
1536+
1537+
// MARK: - Helpers
1538+
13941539
override func setUp() {
13951540
XCTAssertNil(self.eventLoop)
13961541
XCTAssertNil(self.http1ConnectionProvider)

0 commit comments

Comments
 (0)