Skip to content

Commit c93cac7

Browse files
committed
Code review
1 parent 068235c commit c93cac7

File tree

2 files changed

+52
-38
lines changed

2 files changed

+52
-38
lines changed

Diff for: Sources/AsyncHTTPClient/ConnectionPool/HTTP1.1/HTTP1ClientChannelHandler.swift

+45-36
Original file line numberDiff line numberDiff line change
@@ -21,27 +21,30 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
2121
typealias OutboundOut = HTTPClientRequestPart
2222
typealias InboundIn = HTTPClientResponsePart
2323

24-
var channelContext: ChannelHandlerContext!
25-
26-
var state: HTTP1ConnectionStateMachine = .init() {
24+
private var state: HTTP1ConnectionStateMachine = .init() {
2725
didSet {
28-
self.channelContext.eventLoop.assertInEventLoop()
26+
self.eventLoop.assertInEventLoop()
2927

3028
self.logger.trace("Connection state did change", metadata: [
3129
"state": "\(String(describing: self.state))",
3230
])
3331
}
3432
}
3533

34+
/// while we are in a channel pipeline, this context can be used.
35+
private var channelContext: ChannelHandlerContext?
36+
3637
/// the currently executing request
3738
private var request: HTTPExecutingRequest?
3839
private var idleReadTimeoutTimer: Scheduled<Void>?
3940

4041
let connection: HTTP1Connection
4142
let logger: Logger
43+
let eventLoop: EventLoop
4244

43-
init(connection: HTTP1Connection, logger: Logger) {
45+
init(connection: HTTP1Connection, eventLoop: EventLoop, logger: Logger) {
4446
self.connection = connection
47+
self.eventLoop = eventLoop
4548
self.logger = logger
4649
}
4750

@@ -54,6 +57,10 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
5457
}
5558
}
5659

60+
func handlerRemoved(context: ChannelHandlerContext) {
61+
self.channelContext = nil
62+
}
63+
5764
// MARK: Channel Inbound Handler
5865

5966
func channelActive(context: ChannelHandlerContext) {
@@ -91,13 +98,10 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
9198
self.run(action, context: context)
9299
}
93100

94-
func close(context: ChannelHandlerContext, mode: CloseMode, promise: EventLoopPromise<Void>?) {
95-
context.close(mode: mode, promise: promise)
96-
}
97-
98101
func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
99102
self.logger.trace("New request to execute")
100103

104+
assert(self.request == nil, "Only write to the ChannelHandler if you are sure, it is idle!")
101105
let req = self.unwrapOutboundIn(data)
102106
self.request = req
103107

@@ -135,28 +139,28 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
135139

136140
// MARK: - Run Actions
137141

138-
func run(_ action: HTTP1ConnectionStateMachine.Action, context: ChannelHandlerContext) {
142+
private func run(_ action: HTTP1ConnectionStateMachine.Action, context: ChannelHandlerContext) {
139143
switch action {
140144
case .sendRequestHead(let head, startBody: let startBody):
141145
if startBody {
142-
context.write(wrapOutboundOut(.head(head)), promise: nil)
146+
context.write(self.wrapOutboundOut(.head(head)), promise: nil)
143147
context.flush()
144148

145149
self.request!.requestHeadSent()
146150
self.request!.resumeRequestBodyStream()
147151
} else {
148-
context.write(wrapOutboundOut(.head(head)), promise: nil)
149-
context.write(wrapOutboundOut(.end(nil)), promise: nil)
152+
context.write(self.wrapOutboundOut(.head(head)), promise: nil)
153+
context.write(self.wrapOutboundOut(.end(nil)), promise: nil)
150154
context.flush()
151155

152156
self.request!.requestHeadSent()
153157
}
154158

155159
case .sendBodyPart(let part):
156-
context.writeAndFlush(wrapOutboundOut(.body(part)), promise: nil)
160+
context.writeAndFlush(self.wrapOutboundOut(.body(part)), promise: nil)
157161

158162
case .sendRequestEnd:
159-
context.writeAndFlush(wrapOutboundOut(.end(nil)), promise: nil)
163+
context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil)
160164

161165
case .pauseRequestBodyStream:
162166
self.request!.pauseRequestBodyStream()
@@ -219,6 +223,7 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
219223
oldRequest.succeedRequest(buffer)
220224

221225
case .failRequest(let error, let finalAction):
226+
// see comment in the `succeedRequest` case.
222227
let oldRequest = self.request!
223228
self.request = nil
224229

@@ -262,84 +267,88 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
262267
// MARK: Private HTTPRequestExecutor
263268

264269
private func writeRequestBodyPart0(_ data: IOData, request: HTTPExecutingRequest) {
265-
guard self.request === request else {
266-
// very likely we got threading issues here...
270+
guard self.request === request, let context = self.channelContext else {
271+
// Because the HTTPExecutingRequest may run in a different thread to our eventLoop,
272+
// calls from the HTTPExecutingRequest to our ChannelHandler may arrive here after
273+
// the request has been popped by the state machine or the ChannelHandler has been
274+
// removed from the Channel pipeline. This is a normal threading issue, noone has
275+
// screwed up.
267276
return
268277
}
269278

270279
let action = self.state.requestStreamPartReceived(data)
271-
self.run(action, context: self.channelContext)
280+
self.run(action, context: context)
272281
}
273282

274283
private func finishRequestBodyStream0(_ request: HTTPExecutingRequest) {
275-
guard self.request === request else {
276-
// very likely we got threading issues here...
284+
guard self.request === request, let context = self.channelContext else {
285+
// See code comment in `writeRequestBodyPart0`
277286
return
278287
}
279288

280289
let action = self.state.requestStreamFinished()
281-
self.run(action, context: self.channelContext)
290+
self.run(action, context: context)
282291
}
283292

284293
private func demandResponseBodyStream0(_ request: HTTPExecutingRequest) {
285-
guard self.request === request else {
286-
// very likely we got threading issues here...
294+
guard self.request === request, let context = self.channelContext else {
295+
// See code comment in `writeRequestBodyPart0`
287296
return
288297
}
289298

290299
self.logger.trace("Downstream requests more response body data")
291300

292301
let action = self.state.demandMoreResponseBodyParts()
293-
self.run(action, context: self.channelContext)
302+
self.run(action, context: context)
294303
}
295304

296-
func cancelRequest0(_ request: HTTPExecutingRequest) {
297-
guard self.request === request else {
298-
// very likely we got threading issues here...
305+
private func cancelRequest0(_ request: HTTPExecutingRequest) {
306+
guard self.request === request, let context = self.channelContext else {
307+
// See code comment in `writeRequestBodyPart0`
299308
return
300309
}
301310

302311
let action = self.state.requestCancelled(closeConnection: true)
303-
self.run(action, context: self.channelContext)
312+
self.run(action, context: context)
304313
}
305314
}
306315

307316
extension HTTP1ClientChannelHandler: HTTPRequestExecutor {
308317
func writeRequestBodyPart(_ data: IOData, request: HTTPExecutingRequest) {
309-
if self.channelContext.eventLoop.inEventLoop {
318+
if self.eventLoop.inEventLoop {
310319
self.writeRequestBodyPart0(data, request: request)
311320
} else {
312-
self.channelContext.eventLoop.execute {
321+
self.eventLoop.execute {
313322
self.writeRequestBodyPart0(data, request: request)
314323
}
315324
}
316325
}
317326

318327
func finishRequestBodyStream(_ request: HTTPExecutingRequest) {
319-
if self.channelContext.eventLoop.inEventLoop {
328+
if self.eventLoop.inEventLoop {
320329
self.finishRequestBodyStream0(request)
321330
} else {
322-
self.channelContext.eventLoop.execute {
331+
self.eventLoop.execute {
323332
self.finishRequestBodyStream0(request)
324333
}
325334
}
326335
}
327336

328337
func demandResponseBodyStream(_ request: HTTPExecutingRequest) {
329-
if self.channelContext.eventLoop.inEventLoop {
338+
if self.eventLoop.inEventLoop {
330339
self.demandResponseBodyStream0(request)
331340
} else {
332-
self.channelContext.eventLoop.execute {
341+
self.eventLoop.execute {
333342
self.demandResponseBodyStream0(request)
334343
}
335344
}
336345
}
337346

338347
func cancelRequest(_ request: HTTPExecutingRequest) {
339-
if self.channelContext.eventLoop.inEventLoop {
348+
if self.eventLoop.inEventLoop {
340349
self.cancelRequest0(request)
341350
} else {
342-
self.channelContext.eventLoop.execute {
351+
self.eventLoop.execute {
343352
self.cancelRequest0(request)
344353
}
345354
}

Diff for: Sources/AsyncHTTPClient/ConnectionPool/HTTP1.1/HTTP1Connection.swift

+7-2
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,8 @@ protocol HTTP1ConnectionDelegate {
2525
final class HTTP1Connection {
2626
let channel: Channel
2727

28-
/// the connection pool that created the connection
28+
/// the connection's delegate, that will be informed about connection close and connection release
29+
/// (ready to run next request).
2930
let delegate: HTTP1ConnectionDelegate
3031

3132
enum State {
@@ -61,7 +62,11 @@ final class HTTP1Connection {
6162
try sync.addHandler(decompressHandler)
6263
}
6364

64-
let channelHandler = HTTP1ClientChannelHandler(connection: self, logger: logger)
65+
let channelHandler = HTTP1ClientChannelHandler(
66+
connection: self,
67+
eventLoop: channel.eventLoop,
68+
logger: logger
69+
)
6570
try sync.addHandler(channelHandler)
6671

6772
// with this we create an intended retain cycle...

0 commit comments

Comments
 (0)