Skip to content

Commit b9e6497

Browse files
committed
Channel creation refactored
1 parent 711622b commit b9e6497

17 files changed

+1064
-538
lines changed

Diff for: Package.swift

+1-1
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ let package = Package(
2323
dependencies: [
2424
.package(url: "https://github.com/apple/swift-nio.git", from: "2.29.0"),
2525
.package(url: "https://github.com/apple/swift-nio-ssl.git", from: "2.13.0"),
26-
.package(url: "https://github.com/apple/swift-nio-extras.git", from: "1.9.1"),
26+
.package(url: "https://github.com/fabianfett/swift-nio-extras.git", .branch("ff-socks-client-removes-itself")),
2727
.package(url: "https://github.com/apple/swift-nio-transport-services.git", from: "1.5.1"),
2828
.package(url: "https://github.com/apple/swift-log.git", from: "1.4.0"),
2929
],

Diff for: Sources/AsyncHTTPClient/ConnectionPool.swift

+32-6
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,9 @@ final class ConnectionPool {
8686
let provider = HTTP1ConnectionProvider(key: key,
8787
eventLoop: taskEventLoop,
8888
configuration: key.config(overriding: self.configuration),
89+
tlsConfiguration: request.tlsConfiguration,
8990
pool: self,
91+
sslContextCache: self.sslContextCache,
9092
backgroundActivityLogger: self.backgroundActivityLogger)
9193
let enqueued = provider.enqueue()
9294
assert(enqueued)
@@ -213,6 +215,8 @@ class HTTP1ConnectionProvider {
213215

214216
private let backgroundActivityLogger: Logger
215217

218+
private let factory: HTTPConnectionPool.ConnectionFactory
219+
216220
/// Creates a new `HTTP1ConnectionProvider`
217221
///
218222
/// - parameters:
@@ -225,7 +229,9 @@ class HTTP1ConnectionProvider {
225229
init(key: ConnectionPool.Key,
226230
eventLoop: EventLoop,
227231
configuration: HTTPClient.Configuration,
232+
tlsConfiguration: TLSConfiguration?,
228233
pool: ConnectionPool,
234+
sslContextCache: SSLContextCache,
229235
backgroundActivityLogger: Logger) {
230236
self.eventLoop = eventLoop
231237
self.configuration = configuration
@@ -234,6 +240,13 @@ class HTTP1ConnectionProvider {
234240
self.closePromise = eventLoop.makePromise()
235241
self.state = .init(eventLoop: eventLoop)
236242
self.backgroundActivityLogger = backgroundActivityLogger
243+
244+
self.factory = HTTPConnectionPool.ConnectionFactory(
245+
key: self.key,
246+
tlsConfiguration: tlsConfiguration ?? configuration.tlsConfiguration ?? .forClient(),
247+
clientConfiguration: self.configuration,
248+
sslContextCache: sslContextCache
249+
)
237250
}
238251

239252
deinit {
@@ -440,12 +453,25 @@ class HTTP1ConnectionProvider {
440453

441454
private func makeChannel(preference: HTTPClient.EventLoopPreference,
442455
logger: Logger) -> EventLoopFuture<Channel> {
443-
return NIOClientTCPBootstrap.makeHTTP1Channel(destination: self.key,
444-
eventLoop: self.eventLoop,
445-
configuration: self.configuration,
446-
sslContextCache: self.pool.sslContextCache,
447-
preference: preference,
448-
logger: logger)
456+
let connectionID = HTTPConnectionPool.Connection.ID.globalGenerator.next()
457+
let eventLoop = preference.bestEventLoop ?? self.eventLoop
458+
return self.factory.makeBestChannel(connectionID: connectionID, eventLoop: eventLoop, logger: logger).flatMapThrowing {
459+
(channel, _) -> Channel in
460+
461+
// add the http1.1 channel handlers
462+
let syncOperations = channel.pipeline.syncOperations
463+
try syncOperations.addHTTPClientHandlers(leftOverBytesStrategy: .forwardBytes)
464+
465+
switch self.configuration.decompression {
466+
case .disabled:
467+
()
468+
case .enabled(let limit):
469+
let decompressHandler = NIOHTTPResponseDecompressor(limit: limit)
470+
try syncOperations.addHandler(decompressHandler)
471+
}
472+
473+
return channel
474+
}
449475
}
450476

451477
/// A `Waiter` represents a request that waits for a connection when none is
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the AsyncHTTPClient open source project
4+
//
5+
// Copyright (c) 2021 Apple Inc. and the AsyncHTTPClient project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
import NIO
16+
import NIOHTTP1
17+
18+
final class HTTP1ProxyConnectHandler: ChannelDuplexHandler, RemovableChannelHandler {
19+
typealias OutboundIn = Never
20+
typealias OutboundOut = HTTPClientRequestPart
21+
typealias InboundIn = HTTPClientResponsePart
22+
23+
enum State {
24+
case initialized(EventLoopPromise<Void>)
25+
case connectSent(EventLoopPromise<Void>)
26+
case headReceived(EventLoopPromise<Void>)
27+
case failed(Error)
28+
case completed
29+
}
30+
31+
private var state: State
32+
33+
let targetHost: String
34+
let targetPort: Int
35+
let proxyAuthorization: HTTPClient.Authorization?
36+
37+
init(targetHost: String,
38+
targetPort: Int,
39+
proxyAuthorization: HTTPClient.Authorization?,
40+
connectPromise: EventLoopPromise<Void>) {
41+
self.targetHost = targetHost
42+
self.targetPort = targetPort
43+
self.proxyAuthorization = proxyAuthorization
44+
45+
self.state = .initialized(connectPromise)
46+
}
47+
48+
func handlerAdded(context: ChannelHandlerContext) {
49+
precondition(context.channel.isActive, "Expected to be added to an active channel")
50+
51+
self.sendConnect(context: context)
52+
}
53+
54+
func handlerRemoved(context: ChannelHandlerContext) {
55+
switch self.state {
56+
case .failed, .completed:
57+
break
58+
case .initialized, .connectSent, .headReceived:
59+
preconditionFailure("Removing the handler, while connecting seems wrong")
60+
}
61+
}
62+
63+
func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
64+
preconditionFailure("We don't support outgoing traffic during HTTP Proxy update.")
65+
}
66+
67+
func channelRead(context: ChannelHandlerContext, data: NIOAny) {
68+
switch self.unwrapInboundIn(data) {
69+
case .head(let head):
70+
guard case .connectSent(let promise) = self.state else {
71+
preconditionFailure("HTTPDecoder should throw an error, if we have not send a request")
72+
}
73+
74+
switch head.status.code {
75+
case 200..<300:
76+
// Any 2xx (Successful) response indicates that the sender (and all
77+
// inbound proxies) will switch to tunnel mode immediately after the
78+
// blank line that concludes the successful response's header section
79+
self.state = .headReceived(promise)
80+
case 407:
81+
let error = HTTPClientError.proxyAuthenticationRequired
82+
self.state = .failed(error)
83+
context.close(promise: nil)
84+
promise.fail(error)
85+
default:
86+
// Any response other than a successful response
87+
// indicates that the tunnel has not yet been formed and that the
88+
// connection remains governed by HTTP.
89+
let error = HTTPClientError.invalidProxyResponse
90+
self.state = .failed(error)
91+
context.close(promise: nil)
92+
promise.fail(error)
93+
}
94+
case .body:
95+
switch self.state {
96+
case .headReceived(let promise):
97+
// we don't expect a body
98+
let error = HTTPClientError.invalidProxyResponse
99+
self.state = .failed(error)
100+
context.close(promise: nil)
101+
promise.fail(error)
102+
case .failed:
103+
// ran into an error before... ignore this one
104+
break
105+
case .completed, .connectSent, .initialized:
106+
preconditionFailure("Invalid state")
107+
}
108+
109+
case .end:
110+
switch self.state {
111+
case .headReceived(let promise):
112+
self.state = .completed
113+
promise.succeed(())
114+
case .failed:
115+
// ran into an error before... ignore this one
116+
break
117+
case .initialized, .connectSent, .completed:
118+
preconditionFailure("Invalid state")
119+
}
120+
}
121+
}
122+
123+
func sendConnect(context: ChannelHandlerContext) {
124+
guard case .initialized(let promise) = self.state else {
125+
preconditionFailure("Invalid state")
126+
}
127+
128+
self.state = .connectSent(promise)
129+
130+
var head = HTTPRequestHead(
131+
version: .init(major: 1, minor: 1),
132+
method: .CONNECT,
133+
uri: "\(self.targetHost):\(self.targetPort)"
134+
)
135+
head.headers.add(name: "proxy-connection", value: "keep-alive")
136+
if let authorization = self.proxyAuthorization {
137+
head.headers.add(name: "proxy-authorization", value: authorization.headerValue)
138+
}
139+
context.write(self.wrapOutboundOut(.head(head)), promise: nil)
140+
context.write(self.wrapOutboundOut(.end(nil)), promise: nil)
141+
context.flush()
142+
}
143+
}

0 commit comments

Comments
 (0)