@@ -21,27 +21,30 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
21
21
typealias OutboundOut = HTTPClientRequestPart
22
22
typealias InboundIn = HTTPClientResponsePart
23
23
24
- var channelContext : ChannelHandlerContext !
25
-
26
- var state : HTTP1ConnectionStateMachine = . init( ) {
24
+ private var state : HTTP1ConnectionStateMachine = . init( ) {
27
25
didSet {
28
- self . channelContext . eventLoop. assertInEventLoop ( )
26
+ self . eventLoop. assertInEventLoop ( )
29
27
30
28
self . logger. trace ( " Connection state did change " , metadata: [
31
29
" state " : " \( String ( describing: self . state) ) " ,
32
30
] )
33
31
}
34
32
}
35
33
34
+ /// while we are in a channel pipeline, this context can be used.
35
+ private var channelContext : ChannelHandlerContext ?
36
+
36
37
/// the currently executing request
37
38
private var request : HTTPExecutingRequest ?
38
39
private var idleReadTimeoutTimer : Scheduled < Void > ?
39
40
40
41
let connection : HTTP1Connection
41
42
let logger : Logger
43
+ let eventLoop : EventLoop
42
44
43
- init ( connection: HTTP1Connection , logger: Logger ) {
45
+ init ( connection: HTTP1Connection , eventLoop : EventLoop , logger: Logger ) {
44
46
self . connection = connection
47
+ self . eventLoop = eventLoop
45
48
self . logger = logger
46
49
}
47
50
@@ -54,6 +57,10 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
54
57
}
55
58
}
56
59
60
+ func handlerRemoved( context: ChannelHandlerContext ) {
61
+ self . channelContext = nil
62
+ }
63
+
57
64
// MARK: Channel Inbound Handler
58
65
59
66
func channelActive( context: ChannelHandlerContext ) {
@@ -91,13 +98,10 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
91
98
self . run ( action, context: context)
92
99
}
93
100
94
- func close( context: ChannelHandlerContext , mode: CloseMode , promise: EventLoopPromise < Void > ? ) {
95
- context. close ( mode: mode, promise: promise)
96
- }
97
-
98
101
func write( context: ChannelHandlerContext , data: NIOAny , promise: EventLoopPromise < Void > ? ) {
99
102
self . logger. trace ( " New request to execute " )
100
103
104
+ assert ( self . request == nil , " Only write to the ChannelHandler if you are sure, it is idle! " )
101
105
let req = self . unwrapOutboundIn ( data)
102
106
self . request = req
103
107
@@ -135,28 +139,28 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
135
139
136
140
// MARK: - Run Actions
137
141
138
- func run( _ action: HTTP1ConnectionStateMachine . Action , context: ChannelHandlerContext ) {
142
+ private func run( _ action: HTTP1ConnectionStateMachine . Action , context: ChannelHandlerContext ) {
139
143
switch action {
140
144
case . sendRequestHead( let head, startBody: let startBody) :
141
145
if startBody {
142
- context. write ( wrapOutboundOut ( . head( head) ) , promise: nil )
146
+ context. write ( self . wrapOutboundOut ( . head( head) ) , promise: nil )
143
147
context. flush ( )
144
148
145
149
self . request!. requestHeadSent ( )
146
150
self . request!. resumeRequestBodyStream ( )
147
151
} else {
148
- context. write ( wrapOutboundOut ( . head( head) ) , promise: nil )
149
- context. write ( wrapOutboundOut ( . end( nil ) ) , promise: nil )
152
+ context. write ( self . wrapOutboundOut ( . head( head) ) , promise: nil )
153
+ context. write ( self . wrapOutboundOut ( . end( nil ) ) , promise: nil )
150
154
context. flush ( )
151
155
152
156
self . request!. requestHeadSent ( )
153
157
}
154
158
155
159
case . sendBodyPart( let part) :
156
- context. writeAndFlush ( wrapOutboundOut ( . body( part) ) , promise: nil )
160
+ context. writeAndFlush ( self . wrapOutboundOut ( . body( part) ) , promise: nil )
157
161
158
162
case . sendRequestEnd:
159
- context. writeAndFlush ( wrapOutboundOut ( . end( nil ) ) , promise: nil )
163
+ context. writeAndFlush ( self . wrapOutboundOut ( . end( nil ) ) , promise: nil )
160
164
161
165
case . pauseRequestBodyStream:
162
166
self . request!. pauseRequestBodyStream ( )
@@ -219,6 +223,7 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
219
223
oldRequest. succeedRequest ( buffer)
220
224
221
225
case . failRequest( let error, let finalAction) :
226
+ // see comment in the `succeedRequest` case.
222
227
let oldRequest = self . request!
223
228
self . request = nil
224
229
@@ -262,84 +267,88 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
262
267
// MARK: Private HTTPRequestExecutor
263
268
264
269
private func writeRequestBodyPart0( _ data: IOData , request: HTTPExecutingRequest ) {
265
- guard self . request === request else {
266
- // very likely we got threading issues here...
270
+ guard self . request === request, let context = self . channelContext else {
271
+ // Because the HTTPExecutingRequest may run in a different thread to our eventLoop,
272
+ // calls from the HTTPExecutingRequest to our ChannelHandler may arrive here after
273
+ // the request has been popped by the state machine or the ChannelHandler has been
274
+ // removed from the Channel pipeline. This is a normal threading issue, noone has
275
+ // screwed up.
267
276
return
268
277
}
269
278
270
279
let action = self . state. requestStreamPartReceived ( data)
271
- self . run ( action, context: self . channelContext )
280
+ self . run ( action, context: context )
272
281
}
273
282
274
283
private func finishRequestBodyStream0( _ request: HTTPExecutingRequest ) {
275
- guard self . request === request else {
276
- // very likely we got threading issues here...
284
+ guard self . request === request, let context = self . channelContext else {
285
+ // See code comment in `writeRequestBodyPart0`
277
286
return
278
287
}
279
288
280
289
let action = self . state. requestStreamFinished ( )
281
- self . run ( action, context: self . channelContext )
290
+ self . run ( action, context: context )
282
291
}
283
292
284
293
private func demandResponseBodyStream0( _ request: HTTPExecutingRequest ) {
285
- guard self . request === request else {
286
- // very likely we got threading issues here...
294
+ guard self . request === request, let context = self . channelContext else {
295
+ // See code comment in `writeRequestBodyPart0`
287
296
return
288
297
}
289
298
290
299
self . logger. trace ( " Downstream requests more response body data " )
291
300
292
301
let action = self . state. demandMoreResponseBodyParts ( )
293
- self . run ( action, context: self . channelContext )
302
+ self . run ( action, context: context )
294
303
}
295
304
296
- func cancelRequest0( _ request: HTTPExecutingRequest ) {
297
- guard self . request === request else {
298
- // very likely we got threading issues here...
305
+ private func cancelRequest0( _ request: HTTPExecutingRequest ) {
306
+ guard self . request === request, let context = self . channelContext else {
307
+ // See code comment in `writeRequestBodyPart0`
299
308
return
300
309
}
301
310
302
311
let action = self . state. requestCancelled ( closeConnection: true )
303
- self . run ( action, context: self . channelContext )
312
+ self . run ( action, context: context )
304
313
}
305
314
}
306
315
307
316
extension HTTP1ClientChannelHandler : HTTPRequestExecutor {
308
317
func writeRequestBodyPart( _ data: IOData , request: HTTPExecutingRequest ) {
309
- if self . channelContext . eventLoop. inEventLoop {
318
+ if self . eventLoop. inEventLoop {
310
319
self . writeRequestBodyPart0 ( data, request: request)
311
320
} else {
312
- self . channelContext . eventLoop. execute {
321
+ self . eventLoop. execute {
313
322
self . writeRequestBodyPart0 ( data, request: request)
314
323
}
315
324
}
316
325
}
317
326
318
327
func finishRequestBodyStream( _ request: HTTPExecutingRequest ) {
319
- if self . channelContext . eventLoop. inEventLoop {
328
+ if self . eventLoop. inEventLoop {
320
329
self . finishRequestBodyStream0 ( request)
321
330
} else {
322
- self . channelContext . eventLoop. execute {
331
+ self . eventLoop. execute {
323
332
self . finishRequestBodyStream0 ( request)
324
333
}
325
334
}
326
335
}
327
336
328
337
func demandResponseBodyStream( _ request: HTTPExecutingRequest ) {
329
- if self . channelContext . eventLoop. inEventLoop {
338
+ if self . eventLoop. inEventLoop {
330
339
self . demandResponseBodyStream0 ( request)
331
340
} else {
332
- self . channelContext . eventLoop. execute {
341
+ self . eventLoop. execute {
333
342
self . demandResponseBodyStream0 ( request)
334
343
}
335
344
}
336
345
}
337
346
338
347
func cancelRequest( _ request: HTTPExecutingRequest ) {
339
- if self . channelContext . eventLoop. inEventLoop {
348
+ if self . eventLoop. inEventLoop {
340
349
self . cancelRequest0 ( request)
341
350
} else {
342
- self . channelContext . eventLoop. execute {
351
+ self . eventLoop. execute {
343
352
self . cancelRequest0 ( request)
344
353
}
345
354
}
0 commit comments