Skip to content

Add HTTP2Connection #401

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Jul 22, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import NIO
import NIOHTTP1
import NIOHTTP2

class HTTP2ClientRequestHandler: ChannelDuplexHandler {
final class HTTP2ClientRequestHandler: ChannelDuplexHandler {
typealias OutboundIn = HTTPExecutableRequest
typealias OutboundOut = HTTPClientRequestPart
typealias InboundIn = HTTPClientResponsePart
Expand All @@ -35,10 +35,8 @@ class HTTP2ClientRequestHandler: ChannelDuplexHandler {

private var request: HTTPExecutableRequest? {
didSet {
if let newRequest = self.request {
if let idleReadTimeout = newRequest.idleReadTimeout {
self.idleReadTimeoutStateMachine = .init(timeAmount: idleReadTimeout)
}
if let newRequest = self.request, let idleReadTimeout = newRequest.idleReadTimeout {
self.idleReadTimeoutStateMachine = .init(timeAmount: idleReadTimeout)
} else {
self.idleReadTimeoutStateMachine = nil
}
Expand Down Expand Up @@ -88,7 +86,7 @@ class HTTP2ClientRequestHandler: ChannelDuplexHandler {
self.runTimeoutAction(timeoutAction, context: context)
}

let action = self.state.channelRead(self.unwrapInboundIn(data))
let action = self.state.channelRead(httpPart)
self.run(action, context: context)
}

Expand All @@ -108,6 +106,8 @@ class HTTP2ClientRequestHandler: ChannelDuplexHandler {
let request = self.unwrapOutboundIn(data)
self.request = request

request.willExecuteRequest(self)

let action = self.state.startRequest(
head: request.requestHead,
metadata: request.requestFramingMetadata
Expand All @@ -120,6 +120,16 @@ class HTTP2ClientRequestHandler: ChannelDuplexHandler {
self.run(action, context: context)
}

func triggerUserOutboundEvent(context: ChannelHandlerContext, event: Any, promise: EventLoopPromise<Void>?) {
switch event {
case HTTPConnectionEvent.cancelRequest:
let action = self.state.requestCancelled()
self.run(action, context: context)
default:
context.fireUserInboundEventTriggered(event)
}
}

// MARK: - Private Methods -

// MARK: Run Actions
Expand Down
137 changes: 72 additions & 65 deletions Sources/AsyncHTTPClient/ConnectionPool/HTTP2/HTTP2Connection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import NIO
import NIOHTTP2

protocol HTTP2ConnectionDelegate {
func http2Connection(_: HTTP2Connection, newMaxStreamSetting: Int)
func http2ConnectionStreamClosed(_: HTTP2Connection, availableStreams: Int)
func http2ConnectionGoAwayReceived(_: HTTP2Connection)
func http2ConnectionClosed(_: HTTP2Connection)
Expand All @@ -26,7 +27,7 @@ struct HTTP2PushNotSupportedError: Error {}

struct HTTP2ReceivedGoAwayBeforeSettingsError: Error {}

class HTTP2Connection {
final class HTTP2Connection {
let channel: Channel
let multiplexer: HTTP2StreamMultiplexer
let logger: Logger
Expand All @@ -37,12 +38,12 @@ class HTTP2Connection {
enum State {
case initialized
case starting(EventLoopPromise<Void>)
case active(HTTP2Settings)
case active(maxStreams: Int)
case closing
case closed
}

/// A structure to store a Channel in a Set.
/// A structure to store a http/2 stream channel in a set.
private struct ChannelBox: Hashable {
struct ID: Hashable {
private let id: ObjectIdentifier
Expand Down Expand Up @@ -71,16 +72,6 @@ class HTTP2Connection {
}
}

var settings: HTTP2Settings? {
self.channel.eventLoop.assertInEventLoop()
switch self.state {
case .initialized, .starting, .closing, .closed:
return nil
case .active(let settings):
return settings
}
}

private var state: State

/// We use this channel set to remember, which open streams we need to inform that
Expand All @@ -89,13 +80,14 @@ class HTTP2Connection {
private var openStreams = Set<ChannelBox>()
let id: HTTPConnectionPool.Connection.ID

var closeFuture: EventLoopFuture<Void> {
self.channel.closeFuture
}

init(channel: Channel,
connectionID: HTTPConnectionPool.Connection.ID,
delegate: HTTP2ConnectionDelegate,
logger: Logger) {
precondition(channel.isActive)
channel.eventLoop.preconditionInEventLoop()

self.channel = channel
self.id = connectionID
self.logger = logger
Expand All @@ -106,7 +98,6 @@ class HTTP2Connection {
outboundBufferSizeHighWatermark: 8196,
outboundBufferSizeLowWatermark: 4092,
inboundStreamInitializer: { (channel) -> EventLoopFuture<Void> in

channel.eventLoop.makeFailedFuture(HTTP2PushNotSupportedError())
}
)
Expand All @@ -116,7 +107,7 @@ class HTTP2Connection {

deinit {
guard case .closed = self.state else {
preconditionFailure("")
preconditionFailure("Connection must be closed, before we can deinit it")
}
}

Expand All @@ -136,20 +127,25 @@ class HTTP2Connection {

self.multiplexer.createStreamChannel(promise: createStreamChannelPromise) { channel -> EventLoopFuture<Void> in
do {
// We only support http/2 over an https connection – using the Application-Layer
// Protocol Negotiation (ALPN). For this reason it is save to fix this to `.https`.
let translate = HTTP2FramePayloadToHTTP1ClientCodec(httpProtocol: .https)
let handler = HTTP2ClientRequestHandler(eventLoop: channel.eventLoop)

try channel.pipeline.syncOperations.addHandler(translate)
try channel.pipeline.syncOperations.addHandler(handler)
channel.write(request, promise: nil)

// We must add the new channel to the list of open channels BEFORE we write the
// request to it. In case of an error, we are sure that the channel was added
// before.
let box = ChannelBox(channel)
self.openStreams.insert(box)
self.channel.closeFuture.whenComplete { _ in
self.openStreams.remove(box)
}

return channel.eventLoop.makeSucceededFuture(Void())
channel.write(request, promise: nil)
return channel.eventLoop.makeSucceededVoidFuture()
} catch {
return channel.eventLoop.makeFailedFuture(error)
}
Expand All @@ -174,50 +170,6 @@ class HTTP2Connection {
self.channel.close()
}

func http2SettingsReceived(_ settings: HTTP2Settings) {
self.channel.eventLoop.assertInEventLoop()

switch self.state {
case .initialized, .closed:
preconditionFailure("Invalid state: \(self.state)")
case .starting(let promise):
self.state = .active(settings)
promise.succeed(())
case .active:
self.state = .active(settings)
case .closing:
// ignore. we only wait for all connections to be closed anyway.
break
}
}

func http2GoAwayReceived() {
self.channel.eventLoop.assertInEventLoop()

switch self.state {
case .initialized, .closed:
preconditionFailure("Invalid state: \(self.state)")

case .starting(let promise):
self.state = .closing
promise.fail(HTTP2ReceivedGoAwayBeforeSettingsError())

case .active:
self.state = .closing
self.delegate.http2ConnectionGoAwayReceived(self)

case .closing:
// we are already closing. Nothing new
break
}
}

func http2StreamClosed(availableStreams: Int) {
self.channel.eventLoop.assertInEventLoop()

self.delegate.http2ConnectionStreamClosed(self, availableStreams: availableStreams)
}

private func start() -> EventLoopFuture<Void> {
self.channel.eventLoop.assertInEventLoop()

Expand All @@ -238,7 +190,7 @@ class HTTP2Connection {
let sync = self.channel.pipeline.syncOperations

let http2Handler = NIOHTTP2Handler(mode: .client, initialSettings: nioDefaultSettings)
let idleHandler = HTTP2IdleHandler(connection: self, logger: self.logger)
let idleHandler = HTTP2IdleHandler(delegate: self, logger: self.logger)

try sync.addHandler(http2Handler, position: .last)
try sync.addHandler(idleHandler, position: .last)
Expand All @@ -256,8 +208,63 @@ class HTTP2Connection {

self.state = .closing

// inform all open streams, that the currently running request should be cancelled.
self.openStreams.forEach { box in
box.channel.triggerUserOutboundEvent(HTTPConnectionEvent.cancelRequest, promise: nil)
}

// inform the idle connection handler, that connection should be closed, once all streams
// are closed.
self.channel.triggerUserOutboundEvent(HTTPConnectionEvent.closeConnection, promise: nil)
}
}

extension HTTP2Connection: HTTP2IdleHandlerDelegate {
func http2SettingsReceived(maxStreams: Int) {
self.channel.eventLoop.assertInEventLoop()

switch self.state {
case .initialized:
preconditionFailure("Invalid state: \(self.state)")

case .starting(let promise):
self.state = .active(maxStreams: maxStreams)
promise.succeed(())

case .active:
self.state = .active(maxStreams: maxStreams)
self.delegate.http2Connection(self, newMaxStreamSetting: maxStreams)

case .closing, .closed:
// ignore. we only wait for all connections to be closed anyway.
break
}
}

func http2GoAwayReceived() {
self.channel.eventLoop.assertInEventLoop()

switch self.state {
case .initialized:
preconditionFailure("Invalid state: \(self.state)")

case .starting(let promise):
self.state = .closing
promise.fail(HTTP2ReceivedGoAwayBeforeSettingsError())

case .active:
self.state = .closing
self.delegate.http2ConnectionGoAwayReceived(self)

case .closing, .closed:
// we are already closing. Nothing new
break
}
}

func http2StreamClosed(availableStreams: Int) {
self.channel.eventLoop.assertInEventLoop()

self.delegate.http2ConnectionStreamClosed(self, availableStreams: availableStreams)
}
}
Loading