Skip to content

Commit 28c237d

Browse files
committed
Add HTTP2Connection
1 parent 7311c0e commit 28c237d

File tree

4 files changed

+559
-0
lines changed

4 files changed

+559
-0
lines changed

Diff for: Package.swift

+1
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ let package = Package(
3434
dependencies: [
3535
.product(name: "NIO", package: "swift-nio"),
3636
.product(name: "NIOHTTP1", package: "swift-nio"),
37+
.product(name: "NIOHTTP2", package: "swift-nio-http2"),
3738
.product(name: "NIOSSL", package: "swift-nio-ssl"),
3839
.product(name: "NIOConcurrencyHelpers", package: "swift-nio"),
3940
.product(name: "NIOHTTPCompression", package: "swift-nio-extras"),
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,242 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the AsyncHTTPClient open source project
4+
//
5+
// Copyright (c) 2021 Apple Inc. and the AsyncHTTPClient project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
import Logging
16+
import NIO
17+
import NIOHTTP1
18+
import NIOHTTP2
19+
20+
class HTTP2ClientRequestHandler: ChannelDuplexHandler {
21+
typealias OutboundIn = HTTPExecutingRequest
22+
typealias OutboundOut = HTTPClientRequestPart
23+
typealias InboundIn = HTTPClientResponsePart
24+
25+
private let eventLoop: EventLoop
26+
27+
private(set) var channelContext: ChannelHandlerContext?
28+
private(set) var state: HTTPRequestStateMachine = .init(isChannelWritable: false)
29+
private(set) var request: HTTPExecutingRequest?
30+
31+
init(eventLoop: EventLoop) {
32+
self.eventLoop = eventLoop
33+
}
34+
35+
func handlerAdded(context: ChannelHandlerContext) {
36+
self.channelContext = context
37+
38+
let isWritable = context.channel.isActive && context.channel.isWritable
39+
let action = self.state.writabilityChanged(writable: isWritable)
40+
self.run(action, context: context)
41+
}
42+
43+
func handlerRemoved() {
44+
self.channelContext = nil
45+
}
46+
47+
func channelActive(context: ChannelHandlerContext) {
48+
let action = self.state.writabilityChanged(writable: context.channel.isWritable)
49+
self.run(action, context: context)
50+
}
51+
52+
func channelInactive(context: ChannelHandlerContext) {
53+
let action = self.state.channelInactive()
54+
self.run(action, context: context)
55+
}
56+
57+
func handlerRemoved(context: ChannelHandlerContext) {
58+
self.channelContext = nil
59+
}
60+
61+
func channelWritabilityChanged(context: ChannelHandlerContext) {
62+
let action = self.state.writabilityChanged(writable: context.channel.isWritable)
63+
self.run(action, context: context)
64+
}
65+
66+
func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
67+
let request = self.unwrapOutboundIn(data)
68+
self.request = request
69+
70+
let action = self.state.startRequest(
71+
head: request.requestHead,
72+
metadata: request.requestFramingMetadata
73+
)
74+
self.run(action, context: context)
75+
}
76+
77+
func read(context: ChannelHandlerContext) {
78+
let action = self.state.read()
79+
self.run(action, context: context)
80+
}
81+
82+
func channelRead(context: ChannelHandlerContext, data: NIOAny) {
83+
let action = self.state.channelRead(self.unwrapInboundIn(data))
84+
self.run(action, context: context)
85+
}
86+
87+
func errorCaught(context: ChannelHandlerContext, error: Error) {
88+
let action = self.state.errorHappened(error)
89+
self.run(action, context: context)
90+
}
91+
92+
// MARK: - Run Actions
93+
94+
func run(_ action: HTTPRequestStateMachine.Action, context: ChannelHandlerContext) {
95+
switch action {
96+
case .sendRequestHead(let head, let startBody):
97+
if startBody {
98+
context.writeAndFlush(self.wrapOutboundOut(.head(head)), promise: nil)
99+
self.request!.resumeRequestBodyStream()
100+
} else {
101+
context.writeAndFlush(self.wrapOutboundOut(.head(head)), promise: nil)
102+
context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil)
103+
}
104+
105+
case .pauseRequestBodyStream:
106+
self.request!.pauseRequestBodyStream()
107+
108+
case .sendBodyPart(let data):
109+
context.writeAndFlush(self.wrapOutboundOut(.body(data)), promise: nil)
110+
111+
case .sendRequestEnd:
112+
context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil)
113+
114+
case .read:
115+
context.read()
116+
117+
case .wait:
118+
break
119+
120+
case .resumeRequestBodyStream:
121+
self.request!.resumeRequestBodyStream()
122+
123+
case .forwardResponseHead(let head, pauseRequestBodyStream: let pauseRequestBodyStream):
124+
self.request!.receiveResponseHead(head)
125+
if pauseRequestBodyStream {
126+
self.request!.pauseRequestBodyStream()
127+
}
128+
129+
case .forwardResponseBodyParts(let parts):
130+
self.request!.receiveResponseBodyParts(parts)
131+
132+
case .failRequest(let error, let finalAction):
133+
self.request!.fail(error)
134+
self.request = nil
135+
self.runFinalAction(finalAction, context: context)
136+
137+
case .succeedRequest(let finalAction, let finalParts):
138+
self.request!.succeedRequest(finalParts)
139+
self.request = nil
140+
self.runFinalAction(finalAction, context: context)
141+
}
142+
}
143+
144+
// MARK: - Private Methods -
145+
146+
private func runFinalAction(_ action: HTTPRequestStateMachine.Action.FinalStreamAction, context: ChannelHandlerContext) {
147+
switch action {
148+
case .close:
149+
context.close(promise: nil)
150+
case .sendRequestEnd:
151+
context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil)
152+
case .none:
153+
break
154+
}
155+
}
156+
157+
private func writeRequestBodyPart0(_ data: IOData, request: HTTPExecutingRequest) {
158+
guard self.request === request, let context = self.channelContext else {
159+
// Because the HTTPExecutingRequest may run in a different thread to our eventLoop,
160+
// calls from the HTTPExecutingRequest to our ChannelHandler may arrive here after
161+
// the request has been popped by the state machine or the ChannelHandler has been
162+
// removed from the Channel pipeline. This is a normal threading issue, noone has
163+
// screwed up.
164+
return
165+
}
166+
167+
let action = self.state.requestStreamPartReceived(data)
168+
self.run(action, context: context)
169+
}
170+
171+
private func finishRequestBodyStream0(_ request: HTTPExecutingRequest) {
172+
guard self.request === request, let context = self.channelContext else {
173+
// See code comment in `writeRequestBodyPart0`
174+
return
175+
}
176+
177+
let action = self.state.requestStreamFinished()
178+
self.run(action, context: context)
179+
}
180+
181+
private func demandResponseBodyStream0(_ request: HTTPExecutingRequest) {
182+
guard self.request === request, let context = self.channelContext else {
183+
// See code comment in `writeRequestBodyPart0`
184+
return
185+
}
186+
187+
let action = self.state.demandMoreResponseBodyParts()
188+
self.run(action, context: context)
189+
}
190+
191+
private func cancelRequest0(_ request: HTTPExecutingRequest) {
192+
guard self.request === request, let context = self.channelContext else {
193+
// See code comment in `writeRequestBodyPart0`
194+
return
195+
}
196+
197+
let action = self.state.requestCancelled()
198+
self.run(action, context: context)
199+
}
200+
}
201+
202+
extension HTTP2ClientRequestHandler: HTTPRequestExecutor {
203+
func writeRequestBodyPart(_ data: IOData, request: HTTPExecutingRequest) {
204+
if self.eventLoop.inEventLoop {
205+
self.writeRequestBodyPart0(data, request: request)
206+
} else {
207+
self.eventLoop.execute {
208+
self.writeRequestBodyPart0(data, request: request)
209+
}
210+
}
211+
}
212+
213+
func finishRequestBodyStream(_ request: HTTPExecutingRequest) {
214+
if self.eventLoop.inEventLoop {
215+
self.finishRequestBodyStream0(request)
216+
} else {
217+
self.eventLoop.execute {
218+
self.finishRequestBodyStream0(request)
219+
}
220+
}
221+
}
222+
223+
func demandResponseBodyStream(_ request: HTTPExecutingRequest) {
224+
if self.eventLoop.inEventLoop {
225+
self.demandResponseBodyStream0(request)
226+
} else {
227+
self.eventLoop.execute {
228+
self.demandResponseBodyStream0(request)
229+
}
230+
}
231+
}
232+
233+
func cancelRequest(_ request: HTTPExecutingRequest) {
234+
if self.eventLoop.inEventLoop {
235+
self.cancelRequest0(request)
236+
} else {
237+
self.eventLoop.execute {
238+
self.cancelRequest0(request)
239+
}
240+
}
241+
}
242+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the AsyncHTTPClient open source project
4+
//
5+
// Copyright (c) 2021 Apple Inc. and the AsyncHTTPClient project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
import Logging
16+
import NIO
17+
import NIOHTTP2
18+
19+
protocol HTTP2ConnectionDelegate {
20+
func http2ConnectionStreamClosed(_: HTTP2Connection, availableStreams: Int)
21+
func http2ConnectionClosed(_: HTTP2Connection)
22+
}
23+
24+
class HTTP2Connection {
25+
let channel: Channel
26+
let multiplexer: HTTP2StreamMultiplexer
27+
let logger: Logger
28+
29+
/// the connection pool that created the connection
30+
let delegate: HTTP2ConnectionDelegate
31+
32+
enum State {
33+
case starting(EventLoopPromise<Void>)
34+
case active(HTTP2Settings)
35+
case closed
36+
}
37+
38+
var readyToAcceptConnectionsFuture: EventLoopFuture<Void>
39+
40+
var settings: HTTP2Settings? {
41+
self.channel.eventLoop.assertInEventLoop()
42+
switch self.state {
43+
case .starting:
44+
return nil
45+
case .active(let settings):
46+
return settings
47+
case .closed:
48+
return nil
49+
}
50+
}
51+
52+
private var state: State
53+
let id: HTTPConnectionPool.Connection.ID
54+
55+
init(channel: Channel,
56+
connectionID: HTTPConnectionPool.Connection.ID,
57+
delegate: HTTP2ConnectionDelegate,
58+
logger: Logger) throws {
59+
precondition(channel.isActive)
60+
channel.eventLoop.preconditionInEventLoop()
61+
62+
let readyToAcceptConnectionsPromise = channel.eventLoop.makePromise(of: Void.self)
63+
64+
self.channel = channel
65+
self.id = connectionID
66+
self.logger = logger
67+
self.multiplexer = HTTP2StreamMultiplexer(
68+
mode: .client,
69+
channel: channel,
70+
targetWindowSize: 65535,
71+
outboundBufferSizeHighWatermark: 8196,
72+
outboundBufferSizeLowWatermark: 4092,
73+
inboundStreamInitializer: { (channel) -> EventLoopFuture<Void> in
74+
struct HTTP2PushNotsupportedError: Error {}
75+
return channel.eventLoop.makeFailedFuture(HTTP2PushNotsupportedError())
76+
}
77+
)
78+
self.delegate = delegate
79+
self.state = .starting(readyToAcceptConnectionsPromise)
80+
self.readyToAcceptConnectionsFuture = readyToAcceptConnectionsPromise.futureResult
81+
82+
// 1. Modify channel pipeline and add http2 handlers
83+
let sync = channel.pipeline.syncOperations
84+
85+
let http2Handler = NIOHTTP2Handler(mode: .client, initialSettings: nioDefaultSettings)
86+
let idleHandler = HTTP2IdleHandler(connection: self, logger: self.logger)
87+
88+
try sync.addHandler(http2Handler, position: .last)
89+
try sync.addHandler(idleHandler, position: .last)
90+
try sync.addHandler(self.multiplexer, position: .last)
91+
92+
// 2. set properties
93+
94+
// with this we create an intended retain cycle...
95+
channel.closeFuture.whenComplete { _ in
96+
self.state = .closed
97+
self.delegate.http2ConnectionClosed(self)
98+
}
99+
}
100+
101+
func execute(request: HTTPExecutingRequest) {
102+
let createStreamChannelPromise = self.channel.eventLoop.makePromise(of: Channel.self)
103+
104+
self.multiplexer.createStreamChannel(promise: createStreamChannelPromise) { channel -> EventLoopFuture<Void> in
105+
do {
106+
let translate = HTTP2FramePayloadToHTTP1ClientCodec(httpProtocol: .https)
107+
let handler = HTTP2ClientRequestHandler(eventLoop: channel.eventLoop)
108+
109+
try channel.pipeline.syncOperations.addHandler(translate)
110+
try channel.pipeline.syncOperations.addHandler(handler)
111+
channel.write(request, promise: nil)
112+
return channel.eventLoop.makeSucceededFuture(Void())
113+
} catch {
114+
return channel.eventLoop.makeFailedFuture(error)
115+
}
116+
}
117+
118+
createStreamChannelPromise.futureResult.whenFailure { error in
119+
request.fail(error)
120+
}
121+
}
122+
123+
func close() -> EventLoopFuture<Void> {
124+
self.channel.close()
125+
}
126+
127+
func http2SettingsReceived(_ settings: HTTP2Settings) {
128+
self.channel.eventLoop.assertInEventLoop()
129+
130+
switch self.state {
131+
case .starting(let promise):
132+
self.state = .active(settings)
133+
promise.succeed(())
134+
case .active:
135+
self.state = .active(settings)
136+
case .closed:
137+
preconditionFailure("Invalid state")
138+
}
139+
}
140+
141+
func http2GoAwayReceived() {}
142+
143+
func http2StreamClosed(availableStreams: Int) {
144+
self.delegate.http2ConnectionStreamClosed(self, availableStreams: availableStreams)
145+
}
146+
}

0 commit comments

Comments
 (0)