Skip to content

Commit d5ba480

Browse files
committed
Add HTTP2Connection
1 parent ed44283 commit d5ba480

File tree

4 files changed

+543
-0
lines changed

4 files changed

+543
-0
lines changed

Diff for: Package.swift

+1
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ let package = Package(
3434
dependencies: [
3535
.product(name: "NIO", package: "swift-nio"),
3636
.product(name: "NIOHTTP1", package: "swift-nio"),
37+
.product(name: "NIOHTTP2", package: "swift-nio-http2"),
3738
.product(name: "NIOSSL", package: "swift-nio-ssl"),
3839
.product(name: "NIOConcurrencyHelpers", package: "swift-nio"),
3940
.product(name: "NIOHTTPCompression", package: "swift-nio-extras"),
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,226 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the AsyncHTTPClient open source project
4+
//
5+
// Copyright (c) 2021 Apple Inc. and the AsyncHTTPClient project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
import Logging
16+
import NIO
17+
import NIOHTTP1
18+
import NIOHTTP2
19+
20+
class HTTP2ClientRequestHandler: ChannelDuplexHandler {
21+
typealias OutboundIn = HTTPExecutingRequest
22+
typealias OutboundOut = HTTPClientRequestPart
23+
typealias InboundIn = HTTPClientResponsePart
24+
25+
private(set) var channelContext: ChannelHandlerContext!
26+
private(set) var state: HTTPRequestStateMachine = .init(isChannelWritable: false)
27+
private(set) var request: HTTPExecutingRequest?
28+
29+
init() {}
30+
31+
func channelActive(context: ChannelHandlerContext) {
32+
let action = self.state.writabilityChanged(writable: context.channel.isWritable)
33+
self.run(action, context: context)
34+
}
35+
36+
func channelInactive(context: ChannelHandlerContext) {
37+
let action = self.state.channelInactive()
38+
self.run(action, context: context)
39+
}
40+
41+
func handlerAdded(context: ChannelHandlerContext) {
42+
self.channelContext = context
43+
44+
let isWritable = context.channel.isActive && context.channel.isWritable
45+
let action = self.state.writabilityChanged(writable: isWritable)
46+
self.run(action, context: context)
47+
}
48+
49+
func handlerRemoved(context: ChannelHandlerContext) {
50+
self.channelContext = nil
51+
}
52+
53+
func channelWritabilityChanged(context: ChannelHandlerContext) {
54+
let action = self.state.writabilityChanged(writable: context.channel.isWritable)
55+
self.run(action, context: context)
56+
}
57+
58+
func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
59+
let request = self.unwrapOutboundIn(data)
60+
self.request = request
61+
62+
let action = self.state.startRequest(
63+
head: request.requestHead,
64+
metadata: request.requestFramingMetadata
65+
)
66+
self.run(action, context: context)
67+
}
68+
69+
func read(context: ChannelHandlerContext) {
70+
let action = self.state.read()
71+
self.run(action, context: context)
72+
}
73+
74+
func channelRead(context: ChannelHandlerContext, data: NIOAny) {
75+
let action = self.state.channelRead(self.unwrapInboundIn(data))
76+
self.run(action, context: context)
77+
}
78+
79+
func errorCaught(context: ChannelHandlerContext, error: Error) {
80+
let action = self.state.errorHappened(error)
81+
self.run(action, context: context)
82+
}
83+
84+
// MARK: - Run Actions
85+
86+
func run(_ action: HTTPRequestStateMachine.Action, context: ChannelHandlerContext) {
87+
switch action {
88+
case .sendRequestHead(let head, let startBody):
89+
if startBody {
90+
context.writeAndFlush(self.wrapOutboundOut(.head(head)), promise: nil)
91+
self.request!.resumeRequestBodyStream()
92+
} else {
93+
context.writeAndFlush(self.wrapOutboundOut(.head(head)), promise: nil)
94+
context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil)
95+
}
96+
97+
case .pauseRequestBodyStream:
98+
self.request!.pauseRequestBodyStream()
99+
100+
case .sendBodyPart(let data):
101+
context.writeAndFlush(self.wrapOutboundOut(.body(data)), promise: nil)
102+
103+
case .sendRequestEnd:
104+
context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil)
105+
106+
case .read:
107+
context.read()
108+
109+
case .wait:
110+
break
111+
112+
case .resumeRequestBodyStream:
113+
self.request!.resumeRequestBodyStream()
114+
115+
case .forwardResponseHead(let head, pauseRequestBodyStream: let pauseRequestBodyStream):
116+
self.request!.receiveResponseHead(head)
117+
if pauseRequestBodyStream {
118+
self.request!.pauseRequestBodyStream()
119+
}
120+
121+
case .forwardResponseBodyParts(let parts):
122+
self.request!.receiveResponseBodyParts(parts)
123+
124+
case .failRequest(let error, let finalAction):
125+
self.request!.fail(error)
126+
self.request = nil
127+
self.runFinalAction(finalAction, context: context)
128+
129+
case .succeedRequest(let finalAction, let finalParts):
130+
self.request!.succeedRequest(finalParts)
131+
self.request = nil
132+
self.runFinalAction(finalAction, context: context)
133+
}
134+
}
135+
136+
// MARK: - Private Methods -
137+
138+
private func runFinalAction(_ action: HTTPRequestStateMachine.Action.FinalStreamAction, context: ChannelHandlerContext) {
139+
switch action {
140+
case .close:
141+
context.close(promise: nil)
142+
case .sendRequestEnd:
143+
context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil)
144+
case .none:
145+
break
146+
}
147+
}
148+
149+
private func writeRequestBodyPart0(_ data: IOData, request: HTTPExecutingRequest) {
150+
guard self.request === request else {
151+
return
152+
}
153+
154+
let action = self.state.requestStreamPartReceived(data)
155+
self.run(action, context: self.channelContext)
156+
}
157+
158+
private func finishRequestBodyStream0(_ request: HTTPExecutingRequest) {
159+
guard self.request === request else {
160+
return
161+
}
162+
163+
let action = self.state.requestStreamFinished()
164+
self.run(action, context: self.channelContext)
165+
}
166+
167+
private func demandResponseBodyStream0(_ request: HTTPExecutingRequest) {
168+
guard self.request === request else {
169+
return
170+
}
171+
172+
let action = self.state.demandMoreResponseBodyParts()
173+
self.run(action, context: self.channelContext)
174+
}
175+
176+
private func cancelRequest0(_ request: HTTPExecutingRequest) {
177+
guard self.request === request else {
178+
return
179+
}
180+
181+
let action = self.state.requestCancelled()
182+
self.run(action, context: self.channelContext)
183+
}
184+
}
185+
186+
extension HTTP2ClientRequestHandler: HTTPRequestExecutor {
187+
func writeRequestBodyPart(_ data: IOData, request: HTTPExecutingRequest) {
188+
if self.channelContext.eventLoop.inEventLoop {
189+
self.writeRequestBodyPart0(data, request: request)
190+
} else {
191+
self.channelContext.eventLoop.execute {
192+
self.writeRequestBodyPart0(data, request: request)
193+
}
194+
}
195+
}
196+
197+
func finishRequestBodyStream(_ request: HTTPExecutingRequest) {
198+
if self.channelContext.eventLoop.inEventLoop {
199+
self.finishRequestBodyStream0(request)
200+
} else {
201+
self.channelContext.eventLoop.execute {
202+
self.finishRequestBodyStream0(request)
203+
}
204+
}
205+
}
206+
207+
func demandResponseBodyStream(_ request: HTTPExecutingRequest) {
208+
if self.channelContext.eventLoop.inEventLoop {
209+
self.demandResponseBodyStream0(request)
210+
} else {
211+
self.channelContext.eventLoop.execute {
212+
self.demandResponseBodyStream0(request)
213+
}
214+
}
215+
}
216+
217+
func cancelRequest(_ request: HTTPExecutingRequest) {
218+
if self.channelContext.eventLoop.inEventLoop {
219+
self.cancelRequest0(request)
220+
} else {
221+
self.channelContext.eventLoop.execute {
222+
self.cancelRequest0(request)
223+
}
224+
}
225+
}
226+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the AsyncHTTPClient open source project
4+
//
5+
// Copyright (c) 2021 Apple Inc. and the AsyncHTTPClient project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
import Logging
16+
import NIO
17+
import NIOHTTP2
18+
19+
protocol HTTP2ConnectionDelegate {
20+
func http2ConnectionStreamClosed(_: HTTP2Connection, availableStreams: Int)
21+
func http2ConnectionClosed(_: HTTP2Connection)
22+
}
23+
24+
class HTTP2Connection {
25+
let channel: Channel
26+
let multiplexer: HTTP2StreamMultiplexer
27+
let logger: Logger
28+
29+
/// the connection pool that created the connection
30+
let delegate: HTTP2ConnectionDelegate
31+
32+
enum State {
33+
case starting(EventLoopPromise<Void>)
34+
case active(HTTP2Settings)
35+
case closed
36+
}
37+
38+
var readyToAcceptConnectionsFuture: EventLoopFuture<Void>
39+
40+
var settings: HTTP2Settings? {
41+
self.channel.eventLoop.assertInEventLoop()
42+
switch self.state {
43+
case .starting:
44+
return nil
45+
case .active(let settings):
46+
return settings
47+
case .closed:
48+
return nil
49+
}
50+
}
51+
52+
private var state: State
53+
let id: HTTPConnectionPool.Connection.ID
54+
55+
init(channel: Channel,
56+
connectionID: HTTPConnectionPool.Connection.ID,
57+
delegate: HTTP2ConnectionDelegate,
58+
logger: Logger) throws {
59+
precondition(channel.isActive)
60+
channel.eventLoop.preconditionInEventLoop()
61+
62+
let readyToAcceptConnectionsPromise = channel.eventLoop.makePromise(of: Void.self)
63+
64+
self.channel = channel
65+
self.id = connectionID
66+
self.logger = logger
67+
self.multiplexer = HTTP2StreamMultiplexer(
68+
mode: .client,
69+
channel: channel,
70+
targetWindowSize: 65535,
71+
outboundBufferSizeHighWatermark: 8196,
72+
outboundBufferSizeLowWatermark: 4092,
73+
inboundStreamInitializer: { (channel) -> EventLoopFuture<Void> in
74+
struct HTTP2PushNotsupportedError: Error {}
75+
return channel.eventLoop.makeFailedFuture(HTTP2PushNotsupportedError())
76+
}
77+
)
78+
self.delegate = delegate
79+
self.state = .starting(readyToAcceptConnectionsPromise)
80+
self.readyToAcceptConnectionsFuture = readyToAcceptConnectionsPromise.futureResult
81+
82+
// 1. Modify channel pipeline and add http2 handlers
83+
let sync = channel.pipeline.syncOperations
84+
85+
let http2Handler = NIOHTTP2Handler(mode: .client, initialSettings: nioDefaultSettings)
86+
let idleHandler = HTTP2IdleHandler(connection: self, logger: self.logger)
87+
88+
try sync.addHandler(http2Handler, position: .last)
89+
try sync.addHandler(idleHandler, position: .last)
90+
try sync.addHandler(self.multiplexer, position: .last)
91+
92+
// 2. set properties
93+
94+
// with this we create an intended retain cycle...
95+
channel.closeFuture.whenComplete { _ in
96+
self.state = .closed
97+
self.delegate.http2ConnectionClosed(self)
98+
}
99+
}
100+
101+
func execute(request: HTTPExecutingRequest) {
102+
let createStreamChannelPromise = self.channel.eventLoop.makePromise(of: Channel.self)
103+
104+
self.multiplexer.createStreamChannel(promise: createStreamChannelPromise) { channel -> EventLoopFuture<Void> in
105+
do {
106+
let translate = HTTP2FramePayloadToHTTP1ClientCodec(httpProtocol: .https)
107+
let handler = HTTP2ClientRequestHandler()
108+
109+
try channel.pipeline.syncOperations.addHandler(translate)
110+
try channel.pipeline.syncOperations.addHandler(handler)
111+
channel.write(request, promise: nil)
112+
return channel.eventLoop.makeSucceededFuture(Void())
113+
} catch {
114+
return channel.eventLoop.makeFailedFuture(error)
115+
}
116+
}
117+
118+
createStreamChannelPromise.futureResult.whenFailure { error in
119+
request.fail(error)
120+
}
121+
}
122+
123+
func close() -> EventLoopFuture<Void> {
124+
self.channel.close()
125+
}
126+
127+
func http2SettingsReceived(_ settings: HTTP2Settings) {
128+
self.channel.eventLoop.assertInEventLoop()
129+
130+
switch self.state {
131+
case .starting(let promise):
132+
self.state = .active(settings)
133+
promise.succeed(())
134+
case .active:
135+
self.state = .active(settings)
136+
case .closed:
137+
preconditionFailure("Invalid state")
138+
}
139+
}
140+
141+
func http2GoAwayReceived() {}
142+
143+
func http2StreamClosed(availableStreams: Int) {
144+
self.delegate.http2ConnectionStreamClosed(self, availableStreams: availableStreams)
145+
}
146+
}

0 commit comments

Comments
 (0)