Skip to content

Commit 4af1b1a

Browse files
committed
Code review part 3
1 parent 0bac419 commit 4af1b1a

File tree

3 files changed

+54
-30
lines changed

3 files changed

+54
-30
lines changed

Diff for: Sources/AsyncHTTPClient/ConnectionPool/HTTP2/HTTP2ClientRequestHandler.swift

+4-6
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ import NIO
1717
import NIOHTTP1
1818
import NIOHTTP2
1919

20-
class HTTP2ClientRequestHandler: ChannelDuplexHandler {
20+
final class HTTP2ClientRequestHandler: ChannelDuplexHandler {
2121
typealias OutboundIn = HTTPExecutableRequest
2222
typealias OutboundOut = HTTPClientRequestPart
2323
typealias InboundIn = HTTPClientResponsePart
@@ -35,10 +35,8 @@ class HTTP2ClientRequestHandler: ChannelDuplexHandler {
3535

3636
private var request: HTTPExecutableRequest? {
3737
didSet {
38-
if let newRequest = self.request {
39-
if let idleReadTimeout = newRequest.idleReadTimeout {
40-
self.idleReadTimeoutStateMachine = .init(timeAmount: idleReadTimeout)
41-
}
38+
if let newRequest = self.request, let idleReadTimeout = newRequest.idleReadTimeout {
39+
self.idleReadTimeoutStateMachine = .init(timeAmount: idleReadTimeout)
4240
} else {
4341
self.idleReadTimeoutStateMachine = nil
4442
}
@@ -88,7 +86,7 @@ class HTTP2ClientRequestHandler: ChannelDuplexHandler {
8886
self.runTimeoutAction(timeoutAction, context: context)
8987
}
9088

91-
let action = self.state.channelRead(self.unwrapInboundIn(data))
89+
let action = self.state.channelRead(httpPart)
9290
self.run(action, context: context)
9391
}
9492

Diff for: Sources/AsyncHTTPClient/ConnectionPool/HTTP2/HTTP2Connection.swift

+15-6
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ struct HTTP2PushNotSupportedError: Error {}
2626

2727
struct HTTP2ReceivedGoAwayBeforeSettingsError: Error {}
2828

29-
class HTTP2Connection {
29+
final class HTTP2Connection {
3030
let channel: Channel
3131
let multiplexer: HTTP2StreamMultiplexer
3232
let logger: Logger
@@ -42,7 +42,7 @@ class HTTP2Connection {
4242
case closed
4343
}
4444

45-
/// A structure to store a Channel in a Set.
45+
/// A structure to store a http/2 stream channel in a set.
4646
private struct ChannelBox: Hashable {
4747
struct ID: Hashable {
4848
private let id: ObjectIdentifier
@@ -106,7 +106,6 @@ class HTTP2Connection {
106106
outboundBufferSizeHighWatermark: 8196,
107107
outboundBufferSizeLowWatermark: 4092,
108108
inboundStreamInitializer: { (channel) -> EventLoopFuture<Void> in
109-
110109
channel.eventLoop.makeFailedFuture(HTTP2PushNotSupportedError())
111110
}
112111
)
@@ -116,7 +115,7 @@ class HTTP2Connection {
116115

117116
deinit {
118117
guard case .closed = self.state else {
119-
preconditionFailure("")
118+
preconditionFailure("Connection must be closed, before we can deinit it")
120119
}
121120
}
122121

@@ -136,20 +135,25 @@ class HTTP2Connection {
136135

137136
self.multiplexer.createStreamChannel(promise: createStreamChannelPromise) { channel -> EventLoopFuture<Void> in
138137
do {
138+
// We only support http/2 over an https connection – using the Application-Layer
139+
// Protocol Negotiation (ALPN). For this reason it is save to fix this to `.https`.
139140
let translate = HTTP2FramePayloadToHTTP1ClientCodec(httpProtocol: .https)
140141
let handler = HTTP2ClientRequestHandler(eventLoop: channel.eventLoop)
141142

142143
try channel.pipeline.syncOperations.addHandler(translate)
143144
try channel.pipeline.syncOperations.addHandler(handler)
144-
channel.write(request, promise: nil)
145145

146+
// We must add the new channel to the list of open channels BEFORE we write the
147+
// request to it. In case of an error, we are sure that the channel was added
148+
// before.
146149
let box = ChannelBox(channel)
147150
self.openStreams.insert(box)
148151
self.channel.closeFuture.whenComplete { _ in
149152
self.openStreams.remove(box)
150153
}
151154

152-
return channel.eventLoop.makeSucceededFuture(Void())
155+
channel.write(request, promise: nil)
156+
return channel.eventLoop.makeSucceededVoidFuture()
153157
} catch {
154158
return channel.eventLoop.makeFailedFuture(error)
155159
}
@@ -256,8 +260,13 @@ class HTTP2Connection {
256260

257261
self.state = .closing
258262

263+
// inform all open streams, that the currently running request should be cancelled.
259264
self.openStreams.forEach { box in
260265
box.channel.triggerUserOutboundEvent(HTTPConnectionEvent.cancelRequest, promise: nil)
261266
}
267+
268+
// inform the idle connection handler, that connection should be closed, once all streams
269+
// are closed.
270+
self.channel.triggerUserOutboundEvent(HTTPConnectionEvent.closeConnection, promise: nil)
262271
}
263272
}

Diff for: Sources/AsyncHTTPClient/ConnectionPool/HTTP2/HTTP2IdleHandler.swift

+35-18
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,8 @@ import Logging
1616
import NIO
1717
import NIOHTTP2
1818

19-
internal final class HTTP2IdleHandler: ChannelDuplexHandler {
19+
// This is a `ChannelDuplexHandler` since we need to intercept outgoing user events.
20+
final class HTTP2IdleHandler: ChannelDuplexHandler {
2021
typealias InboundIn = HTTP2Frame
2122
typealias InboundOut = HTTP2Frame
2223
typealias OutboundIn = HTTP2Frame
@@ -25,7 +26,7 @@ internal final class HTTP2IdleHandler: ChannelDuplexHandler {
2526
let logger: Logger
2627
let connection: HTTP2Connection
2728

28-
var state: StateMachine = .init()
29+
private var state: StateMachine = .init()
2930

3031
init(connection: HTTP2Connection, logger: Logger) {
3132
self.connection = connection
@@ -66,7 +67,7 @@ internal final class HTTP2IdleHandler: ChannelDuplexHandler {
6667
context.fireChannelRead(data)
6768
}
6869

69-
func userInboundEventTriggered(context: ChannelHandlerContext, event: Any) {
70+
func triggerUserOutboundEvent(context: ChannelHandlerContext, event: Any, promise: EventLoopPromise<Void>?) {
7071
switch event {
7172
case HTTPConnectionEvent.closeConnection:
7273
let action = self.state.closeEventReceived()
@@ -140,6 +141,9 @@ extension HTTP2IdleHandler {
140141
preconditionFailure("Invalid state: \(self.state)")
141142

142143
case .connected:
144+
// a settings frame might have multiple entries for `maxConcurrentStreams`. We are
145+
// only interested in the last value! If no `maxConcurrentStreams` is set, we assume
146+
// the http/2 default of 100.
143147
let maxStreams = settings.last(where: { $0.parameter == .maxConcurrentStreams })?.value ?? 100
144148
self.state = .active(openStreams: 0, maxStreams: maxStreams)
145149
return .notifyConnectionNewSettings(settings)
@@ -197,28 +201,41 @@ extension HTTP2IdleHandler {
197201
}
198202

199203
mutating func streamCreated() -> Action {
200-
guard case .active(var openStreams, let maxStreams) = self.state else {
201-
preconditionFailure("Invalid state")
202-
}
204+
switch self.state {
205+
case .active(var openStreams, let maxStreams):
206+
openStreams += 1
207+
self.state = .active(openStreams: openStreams, maxStreams: maxStreams)
208+
return .nothing
203209

204-
openStreams += 1
205-
assert(openStreams <= maxStreams)
210+
case .closing(var openStreams, let maxStreams):
211+
openStreams += 1
212+
self.state = .active(openStreams: openStreams, maxStreams: maxStreams)
213+
return .nothing
206214

207-
self.state = .active(openStreams: openStreams, maxStreams: maxStreams)
208-
return .nothing
215+
case .initialized, .connected, .closed:
216+
preconditionFailure("Invalid state: \(self.state)")
217+
}
209218
}
210219

211220
mutating func streamClosed() -> Action {
212-
guard case .active(var openStreams, let maxStreams) = self.state else {
213-
preconditionFailure("Invalid state")
214-
// TODO: What happens, if we received a go away?!??!
215-
}
221+
switch self.state {
222+
case .active(var openStreams, let maxStreams):
223+
openStreams -= 1
224+
self.state = .active(openStreams: openStreams, maxStreams: maxStreams)
225+
return .notifyConnectionStreamClosed(currentlyAvailable: maxStreams - openStreams)
216226

217-
openStreams -= 1
218-
assert(openStreams >= 0)
227+
case .closing(var openStreams, let maxStreams):
228+
openStreams -= 1
229+
if openStreams == 0 {
230+
self.state = .closed
231+
return .close
232+
}
233+
self.state = .closing(openStreams: openStreams, maxStreams: maxStreams)
234+
return .nothing
219235

220-
self.state = .active(openStreams: openStreams, maxStreams: maxStreams)
221-
return .notifyConnectionStreamClosed(currentlyAvailable: maxStreams - openStreams)
236+
case .initialized, .connected, .closed:
237+
preconditionFailure("Invalid state: \(self.state)")
238+
}
222239
}
223240
}
224241
}

0 commit comments

Comments
 (0)