Skip to content

Commit ffcd1e1

Browse files
authored
Fixes double-release of a connection. (#295)
Motivation: TaskHandler unconditionally releases it's connection on error, this can lead to double release. This issue actually indicates a more general issue where handler continues to handle errors even after its state is `.endOrError`. We need to fix this by ignoring all subsequent errors. Modifications: 1. Check state before calling out delegate and pool 2. Replace all error callouts with call to `errorCaught` Result: Fixes #294
1 parent 1432843 commit ffcd1e1

File tree

5 files changed

+84
-29
lines changed

5 files changed

+84
-29
lines changed

Diff for: Sources/AsyncHTTPClient/HTTPHandler.swift

+18-29
Original file line numberDiff line numberDiff line change
@@ -814,9 +814,8 @@ extension TaskHandler: ChannelDuplexHandler {
814814
do {
815815
try headers.validate(method: request.method, body: request.body)
816816
} catch {
817+
self.errorCaught(context: context, error: error)
817818
promise?.fail(error)
818-
self.failTaskAndNotifyDelegate(error: error, self.delegate.didReceiveError)
819-
self.state = .endOrError
820819
return
821820
}
822821

@@ -843,9 +842,8 @@ extension TaskHandler: ChannelDuplexHandler {
843842
self.state = .bodySent
844843
context.eventLoop.assertInEventLoop()
845844
if let expectedBodyLength = self.expectedBodyLength, expectedBodyLength != self.actualBodyLength {
846-
self.state = .endOrError
847845
let error = HTTPClientError.bodyLengthMismatch
848-
self.failTaskAndNotifyDelegate(error: error, self.delegate.didReceiveError)
846+
self.errorCaught(context: context, error: error)
849847
return context.eventLoop.makeFailedFuture(error)
850848
}
851849
return context.writeAndFlush(self.wrapOutboundOut(.end(nil)))
@@ -855,13 +853,7 @@ extension TaskHandler: ChannelDuplexHandler {
855853
self.callOutToDelegateFireAndForget(self.delegate.didSendRequest)
856854
}.flatMapErrorThrowing { error in
857855
context.eventLoop.assertInEventLoop()
858-
switch self.state {
859-
case .endOrError:
860-
break
861-
default:
862-
self.state = .endOrError
863-
self.failTaskAndNotifyDelegate(error: error, self.delegate.didReceiveError)
864-
}
856+
self.errorCaught(context: context, error: error)
865857
throw error
866858
}.cascade(to: promise)
867859
}
@@ -906,17 +898,15 @@ extension TaskHandler: ChannelDuplexHandler {
906898
case .idle:
907899
if let limit = self.expectedBodyLength, self.actualBodyLength + part.readableBytes > limit {
908900
let error = HTTPClientError.bodyLengthMismatch
909-
self.state = .endOrError
910-
self.failTaskAndNotifyDelegate(error: error, self.delegate.didReceiveError)
901+
self.errorCaught(context: context, error: error)
911902
promise.fail(error)
912903
return
913904
}
914905
self.actualBodyLength += part.readableBytes
915906
context.writeAndFlush(self.wrapOutboundOut(.body(part)), promise: promise)
916907
default:
917908
let error = HTTPClientError.writeAfterRequestSent
918-
self.state = .endOrError
919-
self.failTaskAndNotifyDelegate(error: error, self.delegate.didReceiveError)
909+
self.errorCaught(context: context, error: error)
920910
promise.fail(error)
921911
}
922912
}
@@ -983,26 +973,21 @@ extension TaskHandler: ChannelDuplexHandler {
983973
context.read()
984974
}
985975
case .failure(let error):
986-
self.state = .endOrError
987-
self.failTaskAndNotifyDelegate(error: error, self.delegate.didReceiveError)
976+
self.errorCaught(context: context, error: error)
988977
}
989978
}
990979

991980
func userInboundEventTriggered(context: ChannelHandlerContext, event: Any) {
992981
if (event as? IdleStateHandler.IdleStateEvent) == .read {
993-
self.state = .endOrError
994-
let error = HTTPClientError.readTimeout
995-
self.failTaskAndNotifyDelegate(error: error, self.delegate.didReceiveError)
982+
self.errorCaught(context: context, error: HTTPClientError.readTimeout)
996983
} else {
997984
context.fireUserInboundEventTriggered(event)
998985
}
999986
}
1000987

1001988
func triggerUserOutboundEvent(context: ChannelHandlerContext, event: Any, promise: EventLoopPromise<Void>?) {
1002989
if (event as? TaskCancelEvent) != nil {
1003-
self.state = .endOrError
1004-
let error = HTTPClientError.cancelled
1005-
self.failTaskAndNotifyDelegate(error: error, self.delegate.didReceiveError)
990+
self.errorCaught(context: context, error: HTTPClientError.cancelled)
1006991
promise?.succeed(())
1007992
} else {
1008993
context.triggerUserOutboundEvent(event, promise: promise)
@@ -1014,9 +999,7 @@ extension TaskHandler: ChannelDuplexHandler {
1014999
case .endOrError:
10151000
break
10161001
case .body, .head, .idle, .redirected, .sent, .bodySent:
1017-
self.state = .endOrError
1018-
let error = HTTPClientError.remoteConnectionClosed
1019-
self.failTaskAndNotifyDelegate(error: error, self.delegate.didReceiveError)
1002+
self.errorCaught(context: context, error: HTTPClientError.remoteConnectionClosed)
10201003
}
10211004
context.fireChannelInactive()
10221005
}
@@ -1038,14 +1021,20 @@ extension TaskHandler: ChannelDuplexHandler {
10381021
self.failTaskAndNotifyDelegate(error: error, self.delegate.didReceiveError)
10391022
}
10401023
default:
1041-
self.state = .endOrError
1042-
self.failTaskAndNotifyDelegate(error: error, self.delegate.didReceiveError)
1024+
switch self.state {
1025+
case .idle, .bodySent, .sent, .head, .redirected, .body:
1026+
self.state = .endOrError
1027+
self.failTaskAndNotifyDelegate(error: error, self.delegate.didReceiveError)
1028+
case .endOrError:
1029+
// error was already handled
1030+
break
1031+
}
10431032
}
10441033
}
10451034

10461035
func handlerAdded(context: ChannelHandlerContext) {
10471036
guard context.channel.isActive else {
1048-
self.failTaskAndNotifyDelegate(error: HTTPClientError.remoteConnectionClosed, self.delegate.didReceiveError)
1037+
self.errorCaught(context: context, error: HTTPClientError.remoteConnectionClosed)
10491038
return
10501039
}
10511040
}

Diff for: Tests/AsyncHTTPClientTests/HTTPClientInternalTests+XCTest.swift

+1
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ extension HTTPClientInternalTests {
4646
("testConnectErrorCalloutOnCorrectEL", testConnectErrorCalloutOnCorrectEL),
4747
("testInternalRequestURI", testInternalRequestURI),
4848
("testBodyPartStreamStateChangedBeforeNotification", testBodyPartStreamStateChangedBeforeNotification),
49+
("testHandlerDoubleError", testHandlerDoubleError),
4950
]
5051
}
5152
}

Diff for: Tests/AsyncHTTPClientTests/HTTPClientInternalTests.swift

+39
Original file line numberDiff line numberDiff line change
@@ -1080,4 +1080,43 @@ class HTTPClientInternalTests: XCTestCase {
10801080
XCTAssertNoThrow(try channel.readOutbound(as: HTTPClientRequestPart.self)) // .head
10811081
XCTAssertNoThrow(XCTAssertTrue(try channel.finish().isClean))
10821082
}
1083+
1084+
func testHandlerDoubleError() throws {
1085+
class ErrorCountingDelegate: HTTPClientResponseDelegate {
1086+
typealias Response = Void
1087+
1088+
var count = 0
1089+
1090+
func didReceiveError(task: HTTPClient.Task<Response>, _: Error) {
1091+
self.count += 1
1092+
}
1093+
1094+
func didFinishRequest(task: HTTPClient.Task<Void>) throws {
1095+
return ()
1096+
}
1097+
}
1098+
1099+
class SendTwoErrorsHandler: ChannelInboundHandler {
1100+
typealias InboundIn = Any
1101+
1102+
func handlerAdded(context: ChannelHandlerContext) {
1103+
context.fireErrorCaught(HTTPClientError.cancelled)
1104+
context.fireErrorCaught(HTTPClientError.cancelled)
1105+
}
1106+
}
1107+
1108+
let channel = EmbeddedChannel()
1109+
let task = Task<Void>(eventLoop: channel.eventLoop, logger: HTTPClient.loggingDisabled)
1110+
let delegate = ErrorCountingDelegate()
1111+
try channel.pipeline.addHandler(TaskHandler(task: task,
1112+
kind: .host,
1113+
delegate: delegate,
1114+
redirectHandler: nil,
1115+
ignoreUncleanSSLShutdown: false,
1116+
logger: HTTPClient.loggingDisabled)).wait()
1117+
1118+
try channel.pipeline.addHandler(SendTwoErrorsHandler()).wait()
1119+
1120+
XCTAssertEqual(delegate.count, 1)
1121+
}
10831122
}

Diff for: Tests/AsyncHTTPClientTests/HTTPClientTests+XCTest.swift

+1
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,7 @@ extension HTTPClientTests {
122122
("testContentLengthTooShortFails", testContentLengthTooShortFails),
123123
("testBodyUploadAfterEndFails", testBodyUploadAfterEndFails),
124124
("testNoBytesSentOverBodyLimit", testNoBytesSentOverBodyLimit),
125+
("testDoubleError", testDoubleError),
125126
]
126127
}
127128
}

Diff for: Tests/AsyncHTTPClientTests/HTTPClientTests.swift

+25
Original file line numberDiff line numberDiff line change
@@ -2602,4 +2602,29 @@ class HTTPClientTests: XCTestCase {
26022602

26032603
XCTAssertThrowsError(try future.wait())
26042604
}
2605+
2606+
func testDoubleError() throws {
2607+
// This is needed to that connection pool will not get into closed state when we release
2608+
// second connection.
2609+
_ = self.defaultClient.get(url: "http://localhost:\(self.defaultHTTPBin.port)/events/10/1")
2610+
2611+
var request = try HTTPClient.Request(url: "http://localhost:\(self.defaultHTTPBin.port)/wait", method: .POST)
2612+
request.body = .stream { writer in
2613+
// Start writing chunks so tha we will try to write after read timeout is thrown
2614+
for _ in 1...10 {
2615+
_ = writer.write(.byteBuffer(ByteBuffer(string: "1234")))
2616+
}
2617+
2618+
let promise = self.clientGroup.next().makePromise(of: Void.self)
2619+
self.clientGroup.next().scheduleTask(in: .milliseconds(3)) {
2620+
writer.write(.byteBuffer(ByteBuffer(string: "1234"))).cascade(to: promise)
2621+
}
2622+
2623+
return promise.futureResult
2624+
}
2625+
2626+
// We specify a deadline of 2 ms co that request will be timed out before all chunks are writtent,
2627+
// we need to verify that second error on write after timeout does not lead to double-release.
2628+
XCTAssertThrowsError(try self.defaultClient.execute(request: request, deadline: .now() + .milliseconds(2)).wait())
2629+
}
26052630
}

0 commit comments

Comments
 (0)