Skip to content

Cleanup: Connection cancel -> shutdown #404

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jul 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {

func triggerUserOutboundEvent(context: ChannelHandlerContext, event: Any, promise: EventLoopPromise<Void>?) {
switch event {
case HTTPConnectionEvent.cancelRequest:
case HTTPConnectionEvent.shutdownRequested:
self.logger.trace("User outbound event triggered: Cancel request for connection close")
let action = self.state.requestCancelled(closeConnection: true)
self.run(action, context: context)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ final class HTTP1Connection {
return connection
}

func execute(request: HTTPExecutableRequest) {
func executeRequest(_ request: HTTPExecutableRequest) {
if self.channel.eventLoop.inEventLoop {
self.execute0(request: request)
} else {
Expand All @@ -75,8 +75,8 @@ final class HTTP1Connection {
}
}

func cancel() {
self.channel.triggerUserOutboundEvent(HTTPConnectionEvent.cancelRequest, promise: nil)
func shutdown() {
self.channel.triggerUserOutboundEvent(HTTPConnectionEvent.shutdownRequested, promise: nil)
}

func close() -> EventLoopFuture<Void> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ final class HTTP2ClientRequestHandler: ChannelDuplexHandler {

func triggerUserOutboundEvent(context: ChannelHandlerContext, event: Any, promise: EventLoopPromise<Void>?) {
switch event {
case HTTPConnectionEvent.cancelRequest:
case HTTPConnectionEvent.shutdownRequested:
let action = self.state.requestCancelled()
self.run(action, context: context)
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,12 +241,12 @@ final class HTTP2Connection {

// inform all open streams, that the currently running request should be cancelled.
self.openStreams.forEach { box in
box.channel.triggerUserOutboundEvent(HTTPConnectionEvent.cancelRequest, promise: nil)
box.channel.triggerUserOutboundEvent(HTTPConnectionEvent.shutdownRequested, promise: nil)
}

// inform the idle connection handler, that connection should be closed, once all streams
// are closed.
self.channel.triggerUserOutboundEvent(HTTPConnectionEvent.closeConnection, promise: nil)
self.channel.triggerUserOutboundEvent(HTTPConnectionEvent.shutdownRequested, promise: nil)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ final class HTTP2IdleHandler<Delegate: HTTP2IdleHandlerDelegate>: ChannelDuplexH

func triggerUserOutboundEvent(context: ChannelHandlerContext, event: Any, promise: EventLoopPromise<Void>?) {
switch event {
case HTTPConnectionEvent.closeConnection:
case HTTPConnectionEvent.shutdownRequested:
let action = self.state.closeEventReceived()
self.run(action, context: context)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,5 @@
//===----------------------------------------------------------------------===//

enum HTTPConnectionEvent {
case cancelRequest
case closeConnection
case shutdownRequested
}
84 changes: 49 additions & 35 deletions Sources/AsyncHTTPClient/ConnectionPool/HTTPConnectionPool.swift
Original file line number Diff line number Diff line change
Expand Up @@ -18,89 +18,103 @@ enum HTTPConnectionPool {
struct Connection: Hashable {
typealias ID = Int

// PLEASE NOTE:
// The HTTP/1.1 connection code here is commented out, for a sad and simple reason: We
// don't have a HTTP1Connection yet. As soon as the HTTP1Connection has landed
// (https://github.com/swift-server/async-http-client/pull/400) we will enable
// HTTP1Connections here. Landing the connection box now enables us to already review the
// ConnectionPool StateMachines.

private enum Reference {
// case http1_1(HTTP1Connection)

case http1_1(HTTP1Connection)
case http2(HTTP2Connection)
case __testOnly_connection(ID, EventLoop)
}

private let _ref: Reference

// fileprivate static func http1_1(_ conn: HTTP1Connection) -> Self {
// Connection(_ref: .http1_1(conn))
// }
fileprivate static func http1_1(_ conn: HTTP1Connection) -> Self {
Connection(_ref: .http1_1(conn))
}

fileprivate static func http2(_ conn: HTTP2Connection) -> Self {
Connection(_ref: .http2(conn))
}

static func __testOnly_connection(id: ID, eventLoop: EventLoop) -> Self {
Connection(_ref: .__testOnly_connection(id, eventLoop))
}

var id: ID {
switch self._ref {
// case .http1_1(let connection):
// return connection.id
case .http1_1(let connection):
return connection.id
case .http2(let connection):
return connection.id
case .__testOnly_connection(let id, _):
return id
}
}

var eventLoop: EventLoop {
switch self._ref {
// case .http1_1(let connection):
// return connection.channel.eventLoop
case .http1_1(let connection):
return connection.channel.eventLoop
case .http2(let connection):
return connection.channel.eventLoop
case .__testOnly_connection(_, let eventLoop):
return eventLoop
}
}

@discardableResult
fileprivate func close() -> EventLoopFuture<Void> {
fileprivate func executeRequest(_ request: HTTPExecutableRequest) {
switch self._ref {
// case .http1_1(let connection):
// return connection.close()

case .__testOnly_connection(_, let eventLoop):
return eventLoop.makeSucceededFuture(())
case .http1_1(let connection):
return connection.executeRequest(request)
case .http2(let connection):
return connection.executeRequest(request)
case .__testOnly_connection:
break
}
}

fileprivate func execute(request: HTTPExecutableRequest) {
/// Shutdown cancels any running requests on the connection and then closes the connection
fileprivate func shutdown() {
switch self._ref {
// case .http1_1(let connection):
// return connection.execute(request: request)
case .http1_1(let connection):
return connection.shutdown()
case .http2(let connection):
return connection.shutdown()
case .__testOnly_connection:
break
}
}

fileprivate func cancel() {
/// Closes the connection without cancelling running requests. Use this when you are sure, that the
/// connection is currently idle.
fileprivate func close() -> EventLoopFuture<Void> {
switch self._ref {
// case .http1_1(let connection):
// return connection.cancel()
case .__testOnly_connection:
break
case .http1_1(let connection):
return connection.close()
case .http2(let connection):
return connection.close()
case .__testOnly_connection(_, let eventLoop):
return eventLoop.makeSucceededFuture(())
}
}

static func == (lhs: HTTPConnectionPool.Connection, rhs: HTTPConnectionPool.Connection) -> Bool {
switch (lhs._ref, rhs._ref) {
// case (.http1_1(let lhsConn), .http1_1(let rhsConn)):
// return lhsConn === rhsConn
case (.http1_1(let lhsConn), .http1_1(let rhsConn)):
return lhsConn.id == rhsConn.id
case (.http2(let lhsConn), .http2(let rhsConn)):
return lhsConn.id == rhsConn.id
case (.__testOnly_connection(let lhsID, let lhsEventLoop), .__testOnly_connection(let rhsID, let rhsEventLoop)):
return lhsID == rhsID && lhsEventLoop === rhsEventLoop
// default:
// return false
default:
return false
}
}

func hash(into hasher: inout Hasher) {
switch self._ref {
case .http1_1(let conn):
hasher.combine(conn.id)
case .http2(let conn):
hasher.combine(conn.id)
case .__testOnly_connection(let id, let eventLoop):
hasher.combine(id)
hasher.combine(eventLoop.id)
Expand Down
10 changes: 5 additions & 5 deletions Tests/AsyncHTTPClientTests/HTTP1ClientChannelHandlerTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class HTTP1ClientChannelHandlerTests: XCTestCase {
))
guard let requestBag = maybeRequestBag else { return XCTFail("Expected to be able to create a request bag") }

testUtils.connection.execute(request: requestBag)
testUtils.connection.executeRequest(requestBag)

XCTAssertNoThrow(try embedded.receiveHeadAndVerify {
XCTAssertEqual($0.method, .GET)
Expand Down Expand Up @@ -134,7 +134,7 @@ class HTTP1ClientChannelHandlerTests: XCTestCase {
embedded.isWritable = false
testWriter.writabilityChanged(false)
embedded.pipeline.fireChannelWritabilityChanged()
testUtils.connection.execute(request: requestBag)
testUtils.connection.executeRequest(requestBag)

XCTAssertEqual(try embedded.readOutbound(as: HTTPClientRequestPart.self), .none)

Expand Down Expand Up @@ -211,7 +211,7 @@ class HTTP1ClientChannelHandlerTests: XCTestCase {
))
guard let requestBag = maybeRequestBag else { return XCTFail("Expected to be able to create a request bag") }

testUtils.connection.execute(request: requestBag)
testUtils.connection.executeRequest(requestBag)

XCTAssertNoThrow(try embedded.receiveHeadAndVerify {
XCTAssertEqual($0.method, .GET)
Expand All @@ -223,7 +223,7 @@ class HTTP1ClientChannelHandlerTests: XCTestCase {
XCTAssertTrue(embedded.isActive)
XCTAssertEqual(testUtils.connectionDelegate.hitConnectionClosed, 0)
XCTAssertEqual(testUtils.connectionDelegate.hitConnectionReleased, 0)
testUtils.connection.cancel()
testUtils.connection.shutdown()
XCTAssertFalse(embedded.isActive)
embedded.embeddedEventLoop.run()
XCTAssertEqual(testUtils.connectionDelegate.hitConnectionClosed, 1)
Expand Down Expand Up @@ -257,7 +257,7 @@ class HTTP1ClientChannelHandlerTests: XCTestCase {
))
guard let requestBag = maybeRequestBag else { return XCTFail("Expected to be able to create a request bag") }

testUtils.connection.execute(request: requestBag)
testUtils.connection.executeRequest(requestBag)

XCTAssertNoThrow(try embedded.receiveHeadAndVerify {
XCTAssertEqual($0.method, .GET)
Expand Down
4 changes: 2 additions & 2 deletions Tests/AsyncHTTPClientTests/HTTP1ConnectionTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ class HTTP1ConnectionTests: XCTestCase {
delegate: ResponseAccumulator(request: request)
))
guard let requestBag = maybeRequestBag else { return XCTFail("Expected to be able to create a request bag.") }
connection.execute(request: requestBag)
connection.executeRequest(requestBag)

XCTAssertNoThrow(try server.receiveHeadAndVerify { head in
XCTAssertEqual(head.method, .POST)
Expand Down Expand Up @@ -230,7 +230,7 @@ class HTTP1ConnectionTests: XCTestCase {
))
guard let requestBag = maybeRequestBag else { return XCTFail("Expected to be able to create a request bag") }

connection.execute(request: requestBag)
connection.executeRequest(requestBag)

var response: HTTPClient.Response?
if counter <= closeOnRequest {
Expand Down
6 changes: 3 additions & 3 deletions Tests/AsyncHTTPClientTests/HTTP2IdleHandlerTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ class HTTP2IdleHandlerTests: XCTestCase {
XCTAssertNoThrow(try embedded.connect(to: .makeAddressResolvingHost("localhost", port: 0)).wait())

XCTAssertTrue(embedded.isActive)
embedded.pipeline.triggerUserOutboundEvent(HTTPConnectionEvent.closeConnection, promise: nil)
embedded.pipeline.triggerUserOutboundEvent(HTTPConnectionEvent.shutdownRequested, promise: nil)
XCTAssertFalse(embedded.isActive)
}

Expand All @@ -143,7 +143,7 @@ class HTTP2IdleHandlerTests: XCTestCase {
XCTAssertEqual(delegate.maxStreams, 10)

XCTAssertTrue(embedded.isActive)
embedded.pipeline.triggerUserOutboundEvent(HTTPConnectionEvent.closeConnection, promise: nil)
embedded.pipeline.triggerUserOutboundEvent(HTTPConnectionEvent.shutdownRequested, promise: nil)
XCTAssertFalse(embedded.isActive)
}

Expand All @@ -167,7 +167,7 @@ class HTTP2IdleHandlerTests: XCTestCase {
openStreams.insert(streamID)
}

embedded.pipeline.triggerUserOutboundEvent(HTTPConnectionEvent.closeConnection, promise: nil)
embedded.pipeline.triggerUserOutboundEvent(HTTPConnectionEvent.shutdownRequested, promise: nil)
XCTAssertTrue(embedded.isActive)

while let streamID = openStreams.randomElement() {
Expand Down