Skip to content

Commit deaf5b4

Browse files
committed
Better HTTP/2
1 parent b8a266c commit deaf5b4

File tree

4 files changed

+250
-61
lines changed

4 files changed

+250
-61
lines changed

Diff for: Sources/AsyncHTTPClient/ConnectionPool/HTTP2/HTTP2ClientRequestHandler.swift

+95-20
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,29 @@ class HTTP2ClientRequestHandler: ChannelDuplexHandler {
2424

2525
private let eventLoop: EventLoop
2626

27+
private var state: HTTPRequestStateMachine = .init(isChannelWritable: false) {
28+
didSet {
29+
self.eventLoop.assertInEventLoop()
30+
}
31+
}
32+
33+
/// while we are in a channel pipeline, this context can be used.
2734
private var channelContext: ChannelHandlerContext?
28-
private var state: HTTPRequestStateMachine = .init(isChannelWritable: false)
29-
private var request: HTTPExecutableRequest?
35+
36+
private var request: HTTPExecutableRequest? {
37+
didSet {
38+
if let newRequest = self.request {
39+
if let idleReadTimeout = newRequest.idleReadTimeout {
40+
self.idleReadTimeoutStateMachine = .init(timeAmount: idleReadTimeout)
41+
}
42+
} else {
43+
self.idleReadTimeoutStateMachine = nil
44+
}
45+
}
46+
}
47+
48+
private var idleReadTimeoutStateMachine: IdleReadStateMachine?
49+
private var idleReadTimeoutTimer: Scheduled<Void>?
3050

3151
init(eventLoop: EventLoop) {
3252
self.eventLoop = eventLoop
@@ -40,10 +60,12 @@ class HTTP2ClientRequestHandler: ChannelDuplexHandler {
4060
self.run(action, context: context)
4161
}
4262

43-
func handlerRemoved() {
63+
func handlerRemoved(context: ChannelHandlerContext) {
4464
self.channelContext = nil
4565
}
4666

67+
// MARK: Channel Inbound Handler
68+
4769
func channelActive(context: ChannelHandlerContext) {
4870
let action = self.state.writabilityChanged(writable: context.channel.isWritable)
4971
self.run(action, context: context)
@@ -54,15 +76,34 @@ class HTTP2ClientRequestHandler: ChannelDuplexHandler {
5476
self.run(action, context: context)
5577
}
5678

57-
func handlerRemoved(context: ChannelHandlerContext) {
58-
self.channelContext = nil
59-
}
60-
6179
func channelWritabilityChanged(context: ChannelHandlerContext) {
6280
let action = self.state.writabilityChanged(writable: context.channel.isWritable)
6381
self.run(action, context: context)
6482
}
6583

84+
func channelRead(context: ChannelHandlerContext, data: NIOAny) {
85+
let httpPart = self.unwrapInboundIn(data)
86+
87+
if let timeoutAction = self.idleReadTimeoutStateMachine?.channelRead(httpPart) {
88+
self.runTimeoutAction(timeoutAction, context: context)
89+
}
90+
91+
let action = self.state.channelRead(self.unwrapInboundIn(data))
92+
self.run(action, context: context)
93+
}
94+
95+
func channelReadComplete(context: ChannelHandlerContext) {
96+
let action = self.state.channelReadComplete()
97+
self.run(action, context: context)
98+
}
99+
100+
func errorCaught(context: ChannelHandlerContext, error: Error) {
101+
let action = self.state.errorHappened(error)
102+
self.run(action, context: context)
103+
}
104+
105+
// MARK: Channel Outbound Handler
106+
66107
func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
67108
let request = self.unwrapOutboundIn(data)
68109
self.request = request
@@ -79,19 +120,11 @@ class HTTP2ClientRequestHandler: ChannelDuplexHandler {
79120
self.run(action, context: context)
80121
}
81122

82-
func channelRead(context: ChannelHandlerContext, data: NIOAny) {
83-
let action = self.state.channelRead(self.unwrapInboundIn(data))
84-
self.run(action, context: context)
85-
}
86-
87-
func errorCaught(context: ChannelHandlerContext, error: Error) {
88-
let action = self.state.errorHappened(error)
89-
self.run(action, context: context)
90-
}
123+
// MARK: - Private Methods -
91124

92-
// MARK: - Run Actions
125+
// MARK: Run Actions
93126

94-
func run(_ action: HTTPRequestStateMachine.Action, context: ChannelHandlerContext) {
127+
private func run(_ action: HTTPRequestStateMachine.Action, context: ChannelHandlerContext) {
95128
switch action {
96129
case .sendRequestHead(let head, let startBody):
97130
if startBody {
@@ -102,7 +135,12 @@ class HTTP2ClientRequestHandler: ChannelDuplexHandler {
102135
context.write(self.wrapOutboundOut(.head(head)), promise: nil)
103136
context.write(self.wrapOutboundOut(.end(nil)), promise: nil)
104137
context.flush()
138+
105139
self.request!.requestHeadSent()
140+
141+
if let timeoutAction = self.idleReadTimeoutStateMachine?.requestEndSent() {
142+
self.runTimeoutAction(timeoutAction, context: context)
143+
}
106144
}
107145

108146
case .pauseRequestBodyStream:
@@ -114,6 +152,10 @@ class HTTP2ClientRequestHandler: ChannelDuplexHandler {
114152
case .sendRequestEnd:
115153
context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil)
116154

155+
if let timeoutAction = self.idleReadTimeoutStateMachine?.requestEndSent() {
156+
self.runTimeoutAction(timeoutAction, context: context)
157+
}
158+
117159
case .read:
118160
context.read()
119161

@@ -144,19 +186,52 @@ class HTTP2ClientRequestHandler: ChannelDuplexHandler {
144186
}
145187
}
146188

147-
// MARK: - Private Methods -
148-
149189
private func runFinalAction(_ action: HTTPRequestStateMachine.Action.FinalStreamAction, context: ChannelHandlerContext) {
150190
switch action {
151191
case .close:
152192
context.close(promise: nil)
193+
153194
case .sendRequestEnd:
154195
context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil)
196+
155197
case .none:
156198
break
157199
}
158200
}
159201

202+
private func runTimeoutAction(_ action: IdleReadStateMachine.Action, context: ChannelHandlerContext) {
203+
switch action {
204+
case .startIdleReadTimeoutTimer(let timeAmount):
205+
assert(self.idleReadTimeoutTimer == nil, "Expected there is no timeout timer so far.")
206+
207+
self.idleReadTimeoutTimer = self.eventLoop.scheduleTask(in: timeAmount) {
208+
let action = self.state.idleReadTimeoutTriggered()
209+
self.run(action, context: context)
210+
}
211+
212+
case .resetIdleReadTimeoutTimer(let timeAmount):
213+
if let oldTimer = self.idleReadTimeoutTimer {
214+
oldTimer.cancel()
215+
}
216+
217+
self.idleReadTimeoutTimer = self.eventLoop.scheduleTask(in: timeAmount) {
218+
let action = self.state.idleReadTimeoutTriggered()
219+
self.run(action, context: context)
220+
}
221+
222+
case .clearIdleReadTimeoutTimer:
223+
if let oldTimer = self.idleReadTimeoutTimer {
224+
self.idleReadTimeoutTimer = nil
225+
oldTimer.cancel()
226+
}
227+
228+
case .none:
229+
break
230+
}
231+
}
232+
233+
// MARK: Private HTTPRequestExecutor
234+
160235
private func writeRequestBodyPart0(_ data: IOData, request: HTTPExecutableRequest) {
161236
guard self.request === request, let context = self.channelContext else {
162237
// Because the HTTPExecutingRequest may run in a different thread to our eventLoop,

Diff for: Sources/AsyncHTTPClient/ConnectionPool/HTTP2/HTTP2Connection.swift

+85-16
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,35 @@ class HTTP2Connection {
4242
case closed
4343
}
4444

45+
/// A structure to store a Channel in a Set.
46+
private struct ChannelBox: Hashable {
47+
struct ID: Hashable {
48+
private let id: ObjectIdentifier
49+
50+
init(_ channel: Channel) {
51+
self.id = ObjectIdentifier(channel)
52+
}
53+
}
54+
55+
let channel: Channel
56+
57+
var id: ID {
58+
ID(self.channel)
59+
}
60+
61+
init(_ channel: Channel) {
62+
self.channel = channel
63+
}
64+
65+
static func == (lhs: Self, rhs: Self) -> Bool {
66+
lhs.channel === rhs.channel
67+
}
68+
69+
func hash(into hasher: inout Hasher) {
70+
hasher.combine(self.id)
71+
}
72+
}
73+
4574
var settings: HTTP2Settings? {
4675
self.channel.eventLoop.assertInEventLoop()
4776
switch self.state {
@@ -53,6 +82,11 @@ class HTTP2Connection {
5382
}
5483

5584
private var state: State
85+
86+
/// We use this channel set to remember, which open streams we need to inform that
87+
/// we want to close the connection. The channels shall than cancel their currently running
88+
/// request.
89+
private var openStreams = Set<ChannelBox>()
5690
let id: HTTPConnectionPool.Connection.ID
5791

5892
init(channel: Channel,
@@ -68,24 +102,24 @@ class HTTP2Connection {
68102
self.multiplexer = HTTP2StreamMultiplexer(
69103
mode: .client,
70104
channel: channel,
71-
targetWindowSize: 65535,
105+
targetWindowSize: 8 * 1024 * 1024, // 8mb
72106
outboundBufferSizeHighWatermark: 8196,
73107
outboundBufferSizeLowWatermark: 4092,
74108
inboundStreamInitializer: { (channel) -> EventLoopFuture<Void> in
75-
76-
return channel.eventLoop.makeFailedFuture(HTTP2PushNotSupportedError())
109+
110+
channel.eventLoop.makeFailedFuture(HTTP2PushNotSupportedError())
77111
}
78112
)
79113
self.delegate = delegate
80114
self.state = .initialized
81115
}
82-
116+
83117
deinit {
84118
guard case .closed = self.state else {
85119
preconditionFailure("")
86120
}
87121
}
88-
122+
89123
static func start(
90124
channel: Channel,
91125
connectionID: HTTPConnectionPool.Connection.ID,
@@ -94,7 +128,7 @@ class HTTP2Connection {
94128
logger: Logger
95129
) -> EventLoopFuture<HTTP2Connection> {
96130
let connection = HTTP2Connection(channel: channel, connectionID: connectionID, delegate: delegate, logger: logger)
97-
return connection.start().map{ _ in connection }
131+
return connection.start().map { _ in connection }
98132
}
99133

100134
func execute(request: HTTPExecutableRequest) {
@@ -108,6 +142,13 @@ class HTTP2Connection {
108142
try channel.pipeline.syncOperations.addHandler(translate)
109143
try channel.pipeline.syncOperations.addHandler(handler)
110144
channel.write(request, promise: nil)
145+
146+
let box = ChannelBox(channel)
147+
self.openStreams.insert(box)
148+
self.channel.closeFuture.whenComplete { _ in
149+
self.openStreams.remove(box)
150+
}
151+
111152
return channel.eventLoop.makeSucceededFuture(Void())
112153
} catch {
113154
return channel.eventLoop.makeFailedFuture(error)
@@ -119,6 +160,16 @@ class HTTP2Connection {
119160
}
120161
}
121162

163+
func cancel() {
164+
if self.channel.eventLoop.inEventLoop {
165+
self.cancel0()
166+
} else {
167+
self.channel.eventLoop.execute {
168+
self.cancel0()
169+
}
170+
}
171+
}
172+
122173
func close() -> EventLoopFuture<Void> {
123174
self.channel.close()
124175
}
@@ -146,37 +197,45 @@ class HTTP2Connection {
146197
switch self.state {
147198
case .initialized, .closed:
148199
preconditionFailure("Invalid state: \(self.state)")
149-
200+
150201
case .starting(let promise):
151202
self.state = .closing
152203
promise.fail(HTTP2ReceivedGoAwayBeforeSettingsError())
153-
204+
154205
case .active:
155206
self.state = .closing
156207
self.delegate.http2ConnectionGoAwayReceived(self)
157-
208+
158209
case .closing:
159210
// we are already closing. Nothing new
160211
break
161212
}
162213
}
163214

164215
func http2StreamClosed(availableStreams: Int) {
216+
self.channel.eventLoop.assertInEventLoop()
217+
165218
self.delegate.http2ConnectionStreamClosed(self, availableStreams: availableStreams)
166219
}
167-
220+
168221
private func start() -> EventLoopFuture<Void> {
169-
170-
let readyToAcceptConnectionsPromise = channel.eventLoop.makePromise(of: Void.self)
171-
222+
self.channel.eventLoop.assertInEventLoop()
223+
224+
let readyToAcceptConnectionsPromise = self.channel.eventLoop.makePromise(of: Void.self)
225+
172226
self.state = .starting(readyToAcceptConnectionsPromise)
173227
self.channel.closeFuture.whenComplete { _ in
174228
self.state = .closed
175229
self.delegate.http2ConnectionClosed(self)
176230
}
177-
231+
178232
do {
179-
let sync = channel.pipeline.syncOperations
233+
// We create and add the http handlers ourselves here, since we need to inject an
234+
// `HTTP2IdleHandler` between the `NIOHTTP2Handler` and the `HTTP2StreamMultiplexer`.
235+
// The purpose of the `HTTP2IdleHandler` is to count open streams in the multiplexer.
236+
// We use the HTTP2IdleHandler's information to notify our delegate, whether more work
237+
// can be scheduled on this connection.
238+
let sync = self.channel.pipeline.syncOperations
180239

181240
let http2Handler = NIOHTTP2Handler(mode: .client, initialSettings: nioDefaultSettings)
182241
let idleHandler = HTTP2IdleHandler(connection: self, logger: self.logger)
@@ -188,7 +247,17 @@ class HTTP2Connection {
188247
self.channel.close(mode: .all, promise: nil)
189248
readyToAcceptConnectionsPromise.fail(error)
190249
}
191-
250+
192251
return readyToAcceptConnectionsPromise.futureResult
193252
}
253+
254+
private func cancel0() {
255+
self.channel.eventLoop.assertInEventLoop()
256+
257+
self.state = .closing
258+
259+
self.openStreams.forEach { box in
260+
box.channel.triggerUserOutboundEvent(HTTPConnectionEvent.cancelRequest, promise: nil)
261+
}
262+
}
194263
}

0 commit comments

Comments
 (0)