Skip to content

Commit e967f9a

Browse files
authored
Add HTTP1Connection (#400)
1 parent 79db46a commit e967f9a

13 files changed

+1629
-4
lines changed

Diff for: Sources/AsyncHTTPClient/ConnectionPool/HTTP1.1/HTTP1ClientChannelHandler.swift

+470
Large diffs are not rendered by default.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the AsyncHTTPClient open source project
4+
//
5+
// Copyright (c) 2021 Apple Inc. and the AsyncHTTPClient project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
import Logging
16+
import NIO
17+
import NIOHTTP1
18+
import NIOHTTPCompression
19+
20+
protocol HTTP1ConnectionDelegate {
21+
func http1ConnectionReleased(_: HTTP1Connection)
22+
func http1ConnectionClosed(_: HTTP1Connection)
23+
}
24+
25+
final class HTTP1Connection {
26+
let channel: Channel
27+
28+
/// the connection's delegate, that will be informed about connection close and connection release
29+
/// (ready to run next request).
30+
let delegate: HTTP1ConnectionDelegate
31+
32+
enum State {
33+
case initialized
34+
case active
35+
case closed
36+
}
37+
38+
private var state: State = .initialized
39+
40+
let id: HTTPConnectionPool.Connection.ID
41+
42+
init(channel: Channel,
43+
connectionID: HTTPConnectionPool.Connection.ID,
44+
delegate: HTTP1ConnectionDelegate) {
45+
self.channel = channel
46+
self.id = connectionID
47+
self.delegate = delegate
48+
}
49+
50+
deinit {
51+
guard case .closed = self.state else {
52+
preconditionFailure("Connection must be closed, before we can deinit it")
53+
}
54+
}
55+
56+
static func start(
57+
channel: Channel,
58+
connectionID: HTTPConnectionPool.Connection.ID,
59+
delegate: HTTP1ConnectionDelegate,
60+
configuration: HTTPClient.Configuration,
61+
logger: Logger
62+
) throws -> HTTP1Connection {
63+
let connection = HTTP1Connection(channel: channel, connectionID: connectionID, delegate: delegate)
64+
try connection.start(configuration: configuration, logger: logger)
65+
return connection
66+
}
67+
68+
func execute(request: HTTPExecutableRequest) {
69+
if self.channel.eventLoop.inEventLoop {
70+
self.execute0(request: request)
71+
} else {
72+
self.channel.eventLoop.execute {
73+
self.execute0(request: request)
74+
}
75+
}
76+
}
77+
78+
func cancel() {
79+
self.channel.triggerUserOutboundEvent(HTTPConnectionEvent.cancelRequest, promise: nil)
80+
}
81+
82+
func close() -> EventLoopFuture<Void> {
83+
return self.channel.close()
84+
}
85+
86+
func taskCompleted() {
87+
self.delegate.http1ConnectionReleased(self)
88+
}
89+
90+
private func execute0(request: HTTPExecutableRequest) {
91+
guard self.channel.isActive else {
92+
return request.fail(ChannelError.ioOnClosedChannel)
93+
}
94+
95+
self.channel.write(request, promise: nil)
96+
}
97+
98+
private func start(configuration: HTTPClient.Configuration, logger: Logger) throws {
99+
self.channel.eventLoop.assertInEventLoop()
100+
101+
guard case .initialized = self.state else {
102+
preconditionFailure("Connection must be initialized, to start it")
103+
}
104+
105+
self.state = .active
106+
self.channel.closeFuture.whenComplete { _ in
107+
self.state = .closed
108+
self.delegate.http1ConnectionClosed(self)
109+
}
110+
111+
do {
112+
let sync = self.channel.pipeline.syncOperations
113+
try sync.addHTTPClientHandlers()
114+
115+
if case .enabled(let limit) = configuration.decompression {
116+
let decompressHandler = NIOHTTPResponseDecompressor(limit: limit)
117+
try sync.addHandler(decompressHandler)
118+
}
119+
120+
let channelHandler = HTTP1ClientChannelHandler(
121+
connection: self,
122+
eventLoop: channel.eventLoop,
123+
logger: logger
124+
)
125+
126+
try sync.addHandler(channelHandler)
127+
} catch {
128+
self.channel.close(mode: .all, promise: nil)
129+
throw error
130+
}
131+
}
132+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the AsyncHTTPClient open source project
4+
//
5+
// Copyright (c) 2021 Apple Inc. and the AsyncHTTPClient project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
enum HTTPConnectionEvent {
16+
case cancelRequest
17+
}

Diff for: Sources/AsyncHTTPClient/ConnectionPool/HTTPExecutableRequest.swift

+4-1
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ import NIOHTTP1
120120
/// - The executor may have received an error in thread A that it passes along to the request.
121121
/// After having passed on the error, the executor considers the request done and releases
122122
/// the request's reference.
123-
/// - The request may issue a call to `writeRequestBodyPart(_: IOData, task: HTTPExecutingRequest)`
123+
/// - The request may issue a call to `writeRequestBodyPart(_: IOData, task: HTTPExecutableRequest)`
124124
/// on thread B in the same moment the request error above occurred. For this reason it may
125125
/// happen that the executor receives, the invocation of `writeRequestBodyPart` after it has
126126
/// failed the request.
@@ -187,6 +187,9 @@ protocol HTTPRequestExecutor {
187187
}
188188

189189
protocol HTTPExecutableRequest: AnyObject {
190+
/// The request's logger
191+
var logger: Logger { get }
192+
190193
/// The request's head.
191194
///
192195
/// The HTTP request head, that shall be sent. The HTTPRequestExecutor **will not** run any validation

Diff for: Sources/AsyncHTTPClient/RequestBag+StateMachine.swift

+3-3
Original file line numberDiff line numberDiff line change
@@ -497,9 +497,9 @@ extension RequestBag.StateMachine {
497497
case .executing(let executor, let requestState, .buffering(_, next: .eof)):
498498
self.state = .executing(executor, requestState, .buffering(.init(), next: .error(error)))
499499
return .cancelExecutor(executor)
500-
case .executing(let executor, let requestState, .buffering(_, next: .askExecutorForMore)):
501-
self.state = .executing(executor, requestState, .buffering(.init(), next: .error(error)))
502-
return .cancelExecutor(executor)
500+
case .executing(let executor, _, .buffering(_, next: .askExecutorForMore)):
501+
self.state = .finished(error: error)
502+
return .failTask(nil, executor)
503503
case .executing(let executor, _, .buffering(_, next: .error(_))):
504504
// this would override another error, let's keep the first one
505505
return .cancelExecutor(executor)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the AsyncHTTPClient open source project
4+
//
5+
// Copyright (c) 2021 Apple Inc. and the AsyncHTTPClient project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
@testable import AsyncHTTPClient
16+
import Logging
17+
import NIO
18+
import NIOHTTP1
19+
20+
extension EmbeddedChannel {
21+
public func receiveHeadAndVerify(_ verify: (HTTPRequestHead) throws -> Void = { _ in }) throws {
22+
let part = try self.readOutbound(as: HTTPClientRequestPart.self)
23+
switch part {
24+
case .head(let head):
25+
try verify(head)
26+
case .body, .end:
27+
throw HTTP1EmbeddedChannelError(reason: "Expected .head but got '\(part!)'")
28+
case .none:
29+
throw HTTP1EmbeddedChannelError(reason: "Nothing in buffer")
30+
}
31+
}
32+
33+
public func receiveBodyAndVerify(_ verify: (IOData) throws -> Void = { _ in }) throws {
34+
let part = try self.readOutbound(as: HTTPClientRequestPart.self)
35+
switch part {
36+
case .body(let iodata):
37+
try verify(iodata)
38+
case .head, .end:
39+
throw HTTP1EmbeddedChannelError(reason: "Expected .head but got '\(part!)'")
40+
case .none:
41+
throw HTTP1EmbeddedChannelError(reason: "Nothing in buffer")
42+
}
43+
}
44+
45+
public func receiveEnd() throws {
46+
let part = try self.readOutbound(as: HTTPClientRequestPart.self)
47+
switch part {
48+
case .end:
49+
break
50+
case .head, .body:
51+
throw HTTP1EmbeddedChannelError(reason: "Expected .head but got '\(part!)'")
52+
case .none:
53+
throw HTTP1EmbeddedChannelError(reason: "Nothing in buffer")
54+
}
55+
}
56+
}
57+
58+
struct HTTP1TestTools {
59+
let connection: HTTP1Connection
60+
let connectionDelegate: MockConnectionDelegate
61+
let readEventHandler: ReadEventHitHandler
62+
let logger: Logger
63+
}
64+
65+
extension EmbeddedChannel {
66+
func setupHTTP1Connection() throws -> HTTP1TestTools {
67+
let logger = Logger(label: "test")
68+
let readEventHandler = ReadEventHitHandler()
69+
70+
try self.pipeline.syncOperations.addHandler(readEventHandler)
71+
try self.connect(to: .makeAddressResolvingHost("localhost", port: 0)).wait()
72+
73+
let connectionDelegate = MockConnectionDelegate()
74+
let connection = try HTTP1Connection.start(
75+
channel: self,
76+
connectionID: 1,
77+
delegate: connectionDelegate,
78+
configuration: .init(),
79+
logger: logger
80+
)
81+
82+
// remove HTTP client encoder and decoder
83+
84+
let decoder = try self.pipeline.syncOperations.handler(type: ByteToMessageHandler<HTTPResponseDecoder>.self)
85+
let encoder = try self.pipeline.syncOperations.handler(type: HTTPRequestEncoder.self)
86+
87+
let removeDecoderFuture = self.pipeline.removeHandler(decoder)
88+
let removeEncoderFuture = self.pipeline.removeHandler(encoder)
89+
90+
self.embeddedEventLoop.run()
91+
92+
try removeDecoderFuture.wait()
93+
try removeEncoderFuture.wait()
94+
95+
return .init(
96+
connection: connection,
97+
connectionDelegate: connectionDelegate,
98+
readEventHandler: readEventHandler,
99+
logger: logger
100+
)
101+
}
102+
}
103+
104+
public struct HTTP1EmbeddedChannelError: Error, Hashable, CustomStringConvertible {
105+
public var reason: String
106+
107+
public init(reason: String) {
108+
self.reason = reason
109+
}
110+
111+
public var description: String {
112+
return self.reason
113+
}
114+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the AsyncHTTPClient open source project
4+
//
5+
// Copyright (c) 2018-2019 Apple Inc. and the AsyncHTTPClient project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
//
15+
// HTTP1ClientChannelHandlerTests+XCTest.swift
16+
//
17+
import XCTest
18+
19+
///
20+
/// NOTE: This file was generated by generate_linux_tests.rb
21+
///
22+
/// Do NOT edit this file directly as it will be regenerated automatically when needed.
23+
///
24+
25+
extension HTTP1ClientChannelHandlerTests {
26+
static var allTests: [(String, (HTTP1ClientChannelHandlerTests) -> () throws -> Void)] {
27+
return [
28+
("testResponseBackpressure", testResponseBackpressure),
29+
("testWriteBackpressure", testWriteBackpressure),
30+
("testClientHandlerCancelsRequestIfWeWantToShutdown", testClientHandlerCancelsRequestIfWeWantToShutdown),
31+
("testIdleReadTimeout", testIdleReadTimeout),
32+
]
33+
}
34+
}

0 commit comments

Comments
 (0)