Skip to content

Refactor Channel creation #377

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 18 commits into from
Jun 28, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 22 additions & 7 deletions Sources/AsyncHTTPClient/ConnectionPool.swift
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import Logging
import NIO
import NIOConcurrencyHelpers
import NIOHTTP1
import NIOHTTPCompression
import NIOSSL
import NIOTLS
import NIOTransportServices
Expand Down Expand Up @@ -86,7 +85,9 @@ final class ConnectionPool {
let provider = HTTP1ConnectionProvider(key: key,
eventLoop: taskEventLoop,
configuration: key.config(overriding: self.configuration),
tlsConfiguration: request.tlsConfiguration,
pool: self,
sslContextCache: self.sslContextCache,
backgroundActivityLogger: self.backgroundActivityLogger)
let enqueued = provider.enqueue()
assert(enqueued)
Expand Down Expand Up @@ -213,6 +214,8 @@ class HTTP1ConnectionProvider {

private let backgroundActivityLogger: Logger

private let factory: HTTPConnectionPool.ConnectionFactory

/// Creates a new `HTTP1ConnectionProvider`
///
/// - parameters:
Expand All @@ -225,7 +228,9 @@ class HTTP1ConnectionProvider {
init(key: ConnectionPool.Key,
eventLoop: EventLoop,
configuration: HTTPClient.Configuration,
tlsConfiguration: TLSConfiguration?,
pool: ConnectionPool,
sslContextCache: SSLContextCache,
backgroundActivityLogger: Logger) {
self.eventLoop = eventLoop
self.configuration = configuration
Expand All @@ -234,6 +239,13 @@ class HTTP1ConnectionProvider {
self.closePromise = eventLoop.makePromise()
self.state = .init(eventLoop: eventLoop)
self.backgroundActivityLogger = backgroundActivityLogger

self.factory = HTTPConnectionPool.ConnectionFactory(
key: self.key,
tlsConfiguration: tlsConfiguration,
clientConfiguration: self.configuration,
sslContextCache: sslContextCache
)
}

deinit {
Expand Down Expand Up @@ -440,12 +452,15 @@ class HTTP1ConnectionProvider {

private func makeChannel(preference: HTTPClient.EventLoopPreference,
logger: Logger) -> EventLoopFuture<Channel> {
return NIOClientTCPBootstrap.makeHTTP1Channel(destination: self.key,
eventLoop: self.eventLoop,
configuration: self.configuration,
sslContextCache: self.pool.sslContextCache,
preference: preference,
logger: logger)
let connectionID = HTTPConnectionPool.Connection.ID.globalGenerator.next()
let eventLoop = preference.bestEventLoop ?? self.eventLoop
let deadline = .now() + self.configuration.timeout.connectionCreationTimeout
return self.factory.makeHTTP1Channel(
connectionID: connectionID,
deadline: deadline,
eventLoop: eventLoop,
logger: logger
)
}

/// A `Waiter` represents a request that waits for a connection when none is
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the AsyncHTTPClient open source project
//
// Copyright (c) 2021 Apple Inc. and the AsyncHTTPClient project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import NIO
import NIOHTTP1

final class HTTP1ProxyConnectHandler: ChannelDuplexHandler, RemovableChannelHandler {
typealias OutboundIn = Never
typealias OutboundOut = HTTPClientRequestPart
typealias InboundIn = HTTPClientResponsePart

enum State {
// transitions to `.connectSent` or `.failed`
case initialized
// transitions to `.headReceived` or `.failed`
case connectSent(Scheduled<Void>)
// transitions to `.completed` or `.failed`
case headReceived(Scheduled<Void>)
// final error state
case failed(Error)
// final success state
case completed
}

private var state: State = .initialized

private let targetHost: String
private let targetPort: Int
private let proxyAuthorization: HTTPClient.Authorization?
private let deadline: NIODeadline

private var proxyEstablishedPromise: EventLoopPromise<Void>?
var proxyEstablishedFuture: EventLoopFuture<Void>? {
return self.proxyEstablishedPromise?.futureResult
}

init(targetHost: String,
targetPort: Int,
proxyAuthorization: HTTPClient.Authorization?,
deadline: NIODeadline) {
self.targetHost = targetHost
self.targetPort = targetPort
self.proxyAuthorization = proxyAuthorization
self.deadline = deadline
}

func handlerAdded(context: ChannelHandlerContext) {
self.proxyEstablishedPromise = context.eventLoop.makePromise(of: Void.self)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

!

self.sendConnect(context: context)
}

func handlerRemoved(context: ChannelHandlerContext) {
switch self.state {
case .failed, .completed:
break
case .initialized, .connectSent, .headReceived:
struct NoResult: Error {}
self.state = .failed(NoResult())
self.proxyEstablishedPromise?.fail(NoResult())
}
}

func channelActive(context: ChannelHandlerContext) {
self.sendConnect(context: context)
}

func channelInactive(context: ChannelHandlerContext) {
switch self.state {
case .initialized:
preconditionFailure("How can we receive a channelInactive before a channelActive?")
case .connectSent(let timeout), .headReceived(let timeout):
timeout.cancel()
self.failWithError(HTTPClientError.remoteConnectionClosed, context: context, closeConnection: false)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change

case .failed, .completed:
break
}
}

func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
preconditionFailure("We don't support outgoing traffic during HTTP Proxy update.")
}

func channelRead(context: ChannelHandlerContext, data: NIOAny) {
switch self.unwrapInboundIn(data) {
case .head(let head):
self.handleHTTPHeadReceived(head, context: context)
case .body:
self.handleHTTPBodyReceived(context: context)
case .end:
self.handleHTTPEndReceived(context: context)
}
}

private func sendConnect(context: ChannelHandlerContext) {
guard case .initialized = self.state else {
// we might run into this handler twice, once in handlerAdded and once in channelActive.
return
}

let timeout = context.eventLoop.scheduleTask(deadline: self.deadline) {
switch self.state {
case .initialized:
preconditionFailure("How can we have a scheduled timeout, if the connection is not even up?")

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change

case .connectSent, .headReceived:
self.failWithError(HTTPClientError.httpProxyHandshakeTimeout, context: context)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change

case .failed, .completed:
break
}
}

self.state = .connectSent(timeout)

var head = HTTPRequestHead(
version: .init(major: 1, minor: 1),
method: .CONNECT,
uri: "\(self.targetHost):\(self.targetPort)"
)
if let authorization = self.proxyAuthorization {
head.headers.replaceOrAdd(name: "proxy-authorization", value: authorization.headerValue)
}
context.write(self.wrapOutboundOut(.head(head)), promise: nil)
context.write(self.wrapOutboundOut(.end(nil)), promise: nil)
context.flush()
}

private func handleHTTPHeadReceived(_ head: HTTPResponseHead, context: ChannelHandlerContext) {
guard case .connectSent(let scheduled) = self.state else {
preconditionFailure("HTTPDecoder should throw an error, if we have not send a request")
}

switch head.status.code {
case 200..<300:
// Any 2xx (Successful) response indicates that the sender (and all
// inbound proxies) will switch to tunnel mode immediately after the
// blank line that concludes the successful response's header section
self.state = .headReceived(scheduled)
case 407:
self.failWithError(HTTPClientError.proxyAuthenticationRequired, context: context)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change

default:
// Any response other than a successful response indicates that the tunnel
// has not yet been formed and that the connection remains governed by HTTP.
self.failWithError(HTTPClientError.invalidProxyResponse, context: context)
}
}

private func handleHTTPBodyReceived(context: ChannelHandlerContext) {
switch self.state {
case .headReceived(let timeout):
timeout.cancel()
// we don't expect a body
self.failWithError(HTTPClientError.invalidProxyResponse, context: context)
case .failed:
// ran into an error before... ignore this one
break
case .completed, .connectSent, .initialized:
preconditionFailure("Invalid state: \(self.state)")
}
}

private func handleHTTPEndReceived(context: ChannelHandlerContext) {
switch self.state {
case .headReceived(let timeout):
timeout.cancel()
self.state = .completed
self.proxyEstablishedPromise?.succeed(())

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change

case .failed:
// ran into an error before... ignore this one
break
case .initialized, .connectSent, .completed:
preconditionFailure("Invalid state: \(self.state)")
}
}

private func failWithError(_ error: Error, context: ChannelHandlerContext, closeConnection: Bool = true) {
self.state = .failed(error)
self.proxyEstablishedPromise?.fail(error)
context.fireErrorCaught(error)
if closeConnection {
context.close(mode: .all, promise: nil)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the AsyncHTTPClient open source project
//
// Copyright (c) 2021 Apple Inc. and the AsyncHTTPClient project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import NIO
import NIOSOCKS

final class SOCKSEventsHandler: ChannelInboundHandler, RemovableChannelHandler {
typealias InboundIn = NIOAny

enum State {
// transitions to channelActive or failed
case initialized
// transitions to socksEstablished or failed
case channelActive(Scheduled<Void>)
// final success state
case socksEstablished
// final success state
case failed(Error)
}

private var socksEstablishedPromise: EventLoopPromise<Void>?
var socksEstablishedFuture: EventLoopFuture<Void>? {
return self.socksEstablishedPromise?.futureResult
}

private let deadline: NIODeadline
private var state: State = .initialized

init(deadline: NIODeadline) {
self.deadline = deadline
}

func handlerAdded(context: ChannelHandlerContext) {
self.socksEstablishedPromise = context.eventLoop.makePromise(of: Void.self)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change

if context.channel.isActive {
self.connectionStarted(context: context)
}
}

func handlerRemoved(context: ChannelHandlerContext) {
struct NoResult: Error {}
self.socksEstablishedPromise!.fail(NoResult())
}

func channelActive(context: ChannelHandlerContext) {
self.connectionStarted(context: context)
}

func userInboundEventTriggered(context: ChannelHandlerContext, event: Any) {
guard event is SOCKSProxyEstablishedEvent else {
return context.fireUserInboundEventTriggered(event)
}

switch self.state {
case .initialized:
preconditionFailure("How can we establish a SOCKS connection, if we are not connected?")
case .socksEstablished:
preconditionFailure("`SOCKSProxyEstablishedEvent` must only be fired once.")
case .channelActive(let scheduled):
self.state = .socksEstablished
scheduled.cancel()
self.socksEstablishedPromise?.succeed(())
context.fireUserInboundEventTriggered(event)
case .failed:
// potentially a race with the timeout...
break
}
}

func errorCaught(context: ChannelHandlerContext, error: Error) {
switch self.state {
case .initialized:
self.state = .failed(error)
self.socksEstablishedPromise?.fail(error)
case .channelActive(let scheduled):
scheduled.cancel()
self.state = .failed(error)
self.socksEstablishedPromise?.fail(error)
case .socksEstablished, .failed:
break
}
context.fireErrorCaught(error)
}

private func connectionStarted(context: ChannelHandlerContext) {
guard case .initialized = self.state else {
return
}

let scheduled = context.eventLoop.scheduleTask(deadline: self.deadline) {
switch self.state {
case .initialized, .channelActive:
// close the connection, if the handshake timed out
context.close(mode: .all, promise: nil)
let error = HTTPClientError.socksHandshakeTimeout
self.state = .failed(error)
self.socksEstablishedPromise?.fail(error)
case .failed, .socksEstablished:
break
}
}

self.state = .channelActive(scheduled)
}
}
Loading