Skip to content

Always clear read idle timeout at the end of a request #455

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Oct 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
private var idleReadTimeoutStateMachine: IdleReadStateMachine?
private var idleReadTimeoutTimer: Scheduled<Void>?

/// Cancelling a task in NIO does *not* guarantee that the task will not execute under certain race conditions.
/// We therefore give each timer an ID and increase the ID every time we reset or cancel it.
/// We check in the task if the timer ID has changed in the meantime and do not execute any action if has changed.
private var currentIdleReadTimeoutTimerID: Int = 0

private let backgroundLogger: Logger
private var logger: Logger

Expand Down Expand Up @@ -253,6 +258,7 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {

let oldRequest = self.request!
self.request = nil
self.runTimeoutAction(.clearIdleReadTimeoutTimer, context: context)

switch finalAction {
case .close:
Expand All @@ -271,6 +277,7 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
// see comment in the `succeedRequest` case.
let oldRequest = self.request!
self.request = nil
self.runTimeoutAction(.clearIdleReadTimeoutTimer, context: context)

switch finalAction {
case .close:
Expand All @@ -292,7 +299,9 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
case .startIdleReadTimeoutTimer(let timeAmount):
assert(self.idleReadTimeoutTimer == nil, "Expected there is no timeout timer so far.")

let timerID = self.currentIdleReadTimeoutTimerID
self.idleReadTimeoutTimer = self.eventLoop.scheduleTask(in: timeAmount) {
guard self.currentIdleReadTimeoutTimerID == timerID else { return }
let action = self.state.idleReadTimeoutTriggered()
self.run(action, context: context)
}
Expand All @@ -302,17 +311,19 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
oldTimer.cancel()
}

self.currentIdleReadTimeoutTimerID &+= 1
let timerID = self.currentIdleReadTimeoutTimerID
self.idleReadTimeoutTimer = self.eventLoop.scheduleTask(in: timeAmount) {
guard self.currentIdleReadTimeoutTimerID == timerID else { return }
let action = self.state.idleReadTimeoutTriggered()
self.run(action, context: context)
}

case .clearIdleReadTimeoutTimer:
if let oldTimer = self.idleReadTimeoutTimer {
self.idleReadTimeoutTimer = nil
self.currentIdleReadTimeoutTimerID &+= 1
oldTimer.cancel()
}

case .none:
break
}
Expand Down Expand Up @@ -465,7 +476,7 @@ struct IdleReadStateMachine {
return .resetIdleReadTimeoutTimer(self.timeAmount)
case .end:
self.state = .responseEndReceived
return .clearIdleReadTimeoutTimer
return .none
}

case .responseEndReceived:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,11 +196,13 @@ final class HTTP2ClientRequestHandler: ChannelDuplexHandler {
case .failRequest(let error, let finalAction):
self.request!.fail(error)
self.request = nil
self.runTimeoutAction(.clearIdleReadTimeoutTimer, context: context)
self.runFinalAction(finalAction, context: context)

case .succeedRequest(let finalAction, let finalParts):
self.request!.succeedRequest(finalParts)
self.request = nil
self.runTimeoutAction(.clearIdleReadTimeoutTimer, context: context)
self.runFinalAction(finalAction, context: context)
}
}
Expand All @@ -224,6 +226,7 @@ final class HTTP2ClientRequestHandler: ChannelDuplexHandler {
assert(self.idleReadTimeoutTimer == nil, "Expected there is no timeout timer so far.")

self.idleReadTimeoutTimer = self.eventLoop.scheduleTask(in: timeAmount) {
guard self.idleReadTimeoutTimer != nil else { return }
let action = self.state.idleReadTimeoutTriggered()
self.run(action, context: context)
}
Expand All @@ -234,10 +237,10 @@ final class HTTP2ClientRequestHandler: ChannelDuplexHandler {
}

self.idleReadTimeoutTimer = self.eventLoop.scheduleTask(in: timeAmount) {
guard self.idleReadTimeoutTimer != nil else { return }
let action = self.state.idleReadTimeoutTriggered()
self.run(action, context: context)
}

case .clearIdleReadTimeoutTimer:
if let oldTimer = self.idleReadTimeoutTimer {
self.idleReadTimeoutTimer = nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ extension HTTP1ClientChannelHandlerTests {
("testWriteBackpressure", testWriteBackpressure),
("testClientHandlerCancelsRequestIfWeWantToShutdown", testClientHandlerCancelsRequestIfWeWantToShutdown),
("testIdleReadTimeout", testIdleReadTimeout),
("testIdleReadTimeoutIsCanceledIfRequestIsCanceled", testIdleReadTimeoutIsCanceledIfRequestIsCanceled),
("testFailHTTPRequestWithContentLengthBecauseOfChannelInactiveWaitingForDemand", testFailHTTPRequestWithContentLengthBecauseOfChannelInactiveWaitingForDemand),
]
}
Expand Down
50 changes: 50 additions & 0 deletions Tests/AsyncHTTPClientTests/HTTP1ClientChannelHandlerTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,56 @@ class HTTP1ClientChannelHandlerTests: XCTestCase {
}
}

func testIdleReadTimeoutIsCanceledIfRequestIsCanceled() {
let embedded = EmbeddedChannel()
var maybeTestUtils: HTTP1TestTools?
XCTAssertNoThrow(maybeTestUtils = try embedded.setupHTTP1Connection())
guard let testUtils = maybeTestUtils else { return XCTFail("Expected connection setup works") }

var maybeRequest: HTTPClient.Request?
XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "http://localhost/"))
guard let request = maybeRequest else { return XCTFail("Expected to be able to create a request") }

let delegate = ResponseBackpressureDelegate(eventLoop: embedded.eventLoop)
var maybeRequestBag: RequestBag<ResponseBackpressureDelegate>?
XCTAssertNoThrow(maybeRequestBag = try RequestBag(
request: request,
eventLoopPreference: .delegate(on: embedded.eventLoop),
task: .init(eventLoop: embedded.eventLoop, logger: testUtils.logger),
redirectHandler: nil,
connectionDeadline: .now() + .seconds(30),
requestOptions: .forTests(idleReadTimeout: .milliseconds(200)),
delegate: delegate
))
guard let requestBag = maybeRequestBag else { return XCTFail("Expected to be able to create a request bag") }

testUtils.connection.executeRequest(requestBag)

XCTAssertNoThrow(try embedded.receiveHeadAndVerify {
XCTAssertEqual($0.method, .GET)
XCTAssertEqual($0.uri, "/")
XCTAssertEqual($0.headers.first(name: "host"), "localhost")
})
XCTAssertNoThrow(try embedded.receiveEnd())

let responseHead = HTTPResponseHead(version: .http1_1, status: .ok, headers: HTTPHeaders([("content-length", "12")]))

XCTAssertEqual(testUtils.readEventHandler.readHitCounter, 0)
embedded.read()
XCTAssertEqual(testUtils.readEventHandler.readHitCounter, 1)
XCTAssertNoThrow(try embedded.writeInbound(HTTPClientResponsePart.head(responseHead)))

// canceling the request
requestBag.cancel()
XCTAssertThrowsError(try requestBag.task.futureResult.wait()) {
XCTAssertEqual($0 as? HTTPClientError, .cancelled)
}

// the idle read timeout should be cleared because we canceled the request
// therefore advancing the time should not trigger a crash
embedded.embeddedEventLoop.advanceTime(by: .milliseconds(250))
}

func testFailHTTPRequestWithContentLengthBecauseOfChannelInactiveWaitingForDemand() {
let embedded = EmbeddedChannel()
var maybeTestUtils: HTTP1TestTools?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ extension HTTP2ClientRequestHandlerTests {
("testResponseBackpressure", testResponseBackpressure),
("testWriteBackpressure", testWriteBackpressure),
("testIdleReadTimeout", testIdleReadTimeout),
("testIdleReadTimeoutIsCanceledIfRequestIsCanceled", testIdleReadTimeoutIsCanceledIfRequestIsCanceled),
]
}
}
52 changes: 52 additions & 0 deletions Tests/AsyncHTTPClientTests/HTTP2ClientRequestHandlerTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -233,4 +233,56 @@ class HTTP2ClientRequestHandlerTests: XCTestCase {
XCTAssertEqual($0 as? HTTPClientError, .readTimeout)
}
}

func testIdleReadTimeoutIsCanceledIfRequestIsCanceled() {
let embedded = EmbeddedChannel()
let readEventHandler = ReadEventHitHandler()
let requestHandler = HTTP2ClientRequestHandler(eventLoop: embedded.eventLoop)
XCTAssertNoThrow(try embedded.pipeline.syncOperations.addHandlers([readEventHandler, requestHandler]))
XCTAssertNoThrow(try embedded.connect(to: .makeAddressResolvingHost("localhost", port: 0)).wait())
let logger = Logger(label: "test")

var maybeRequest: HTTPClient.Request?
XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "http://localhost/"))
guard let request = maybeRequest else { return XCTFail("Expected to be able to create a request") }

let delegate = ResponseBackpressureDelegate(eventLoop: embedded.eventLoop)
var maybeRequestBag: RequestBag<ResponseBackpressureDelegate>?
XCTAssertNoThrow(maybeRequestBag = try RequestBag(
request: request,
eventLoopPreference: .delegate(on: embedded.eventLoop),
task: .init(eventLoop: embedded.eventLoop, logger: logger),
redirectHandler: nil,
connectionDeadline: .now() + .seconds(30),
requestOptions: .forTests(idleReadTimeout: .milliseconds(200)),
delegate: delegate
))
guard let requestBag = maybeRequestBag else { return XCTFail("Expected to be able to create a request bag") }

embedded.write(requestBag, promise: nil)

XCTAssertNoThrow(try embedded.receiveHeadAndVerify {
XCTAssertEqual($0.method, .GET)
XCTAssertEqual($0.uri, "/")
XCTAssertEqual($0.headers.first(name: "host"), "localhost")
})
XCTAssertNoThrow(try embedded.receiveEnd())

let responseHead = HTTPResponseHead(version: .http1_1, status: .ok, headers: HTTPHeaders([("content-length", "12")]))

XCTAssertEqual(readEventHandler.readHitCounter, 0)
embedded.read()
XCTAssertEqual(readEventHandler.readHitCounter, 1)
XCTAssertNoThrow(try embedded.writeInbound(HTTPClientResponsePart.head(responseHead)))

// canceling the request
requestBag.cancel()
XCTAssertThrowsError(try requestBag.task.futureResult.wait()) {
XCTAssertEqual($0 as? HTTPClientError, .cancelled)
}

// the idle read timeout should be cleared because we canceled the request
// therefore advancing the time should not trigger a crash
embedded.embeddedEventLoop.advanceTime(by: .milliseconds(250))
}
}