Skip to content

Always access Task’s connection and cancelled property through a lock #385

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 1 addition & 5 deletions Sources/AsyncHTTPClient/HTTPClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -579,11 +579,7 @@ public class HTTPClient {

task.setConnection(connection)

let isCancelled = task.lock.withLock {
task.cancelled
}

if !isCancelled {
if !task.isCancelled {
return channel.writeAndFlush(request).flatMapError { _ in
// At this point the `TaskHandler` will already be present
// to handle the failure and pass it to the `promise`
Expand Down
54 changes: 36 additions & 18 deletions Sources/AsyncHTTPClient/HTTPHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -633,17 +633,27 @@ extension HTTPClient {

let promise: EventLoopPromise<Response>
var completion: EventLoopFuture<Void>
var connection: Connection?
var cancelled: Bool
let lock: Lock
private let lock = Lock()
// protected by lock
private var _connection: Connection?
// protected by lock
private var _cancelled: Bool = false
let logger: Logger // We are okay to store the logger here because a Task is for only one request.

var isCancelled: Bool {
self.lock.withLock { self._cancelled }
}

/// The connection the request is scheduled on.
/// This is used for tests only.
var connection: Connection? {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like this is only used in tests: can we mark it clearly for that purpose?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will do!

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I wasn't clear: I meant changing the name to var _testOnly_connection

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is used on L715, maybe make it private and add an internal _testOnly_connection per Cory's suggestion?

self.lock.withLock { self._connection }
}

init(eventLoop: EventLoop, logger: Logger) {
self.eventLoop = eventLoop
self.promise = eventLoop.makePromise()
self.completion = self.promise.futureResult.map { _ in }
self.cancelled = false
self.lock = Lock()
self.logger = logger
}

Expand All @@ -669,24 +679,24 @@ extension HTTPClient {
/// Cancels the request execution.
public func cancel() {
let channel: Channel? = self.lock.withLock {
if !self.cancelled {
self.cancelled = true
return self.connection?.channel
if !self._cancelled {
self._cancelled = true
return self._connection?.channel
} else {
return nil
}
}
channel?.triggerUserOutboundEvent(TaskCancelEvent(), promise: nil)
}

@discardableResult
func setConnection(_ connection: Connection) -> Connection {
return self.lock.withLock {
self.connection = connection
if self.cancelled {
connection.channel.triggerUserOutboundEvent(TaskCancelEvent(), promise: nil)
}
return connection
func setConnection(_ connection: Connection) {
let cancelled = self.lock.withLock { () -> Bool in
self._connection = connection
return self._cancelled
}

if cancelled {
connection.channel.triggerUserOutboundEvent(TaskCancelEvent(), promise: nil)
}
}

Expand All @@ -702,7 +712,9 @@ extension HTTPClient {

func fail<Delegate: HTTPClientResponseDelegate>(with error: Error,
delegateType: Delegate.Type) {
if let connection = self.connection {
let maybeConnection = self.connection

if let connection = maybeConnection {
self.releaseAssociatedConnection(delegateType: delegateType, closing: true)
.whenSuccess {
self.promise.fail(error)
Expand All @@ -716,7 +728,13 @@ extension HTTPClient {

func releaseAssociatedConnection<Delegate: HTTPClientResponseDelegate>(delegateType: Delegate.Type,
closing: Bool) -> EventLoopFuture<Void> {
if let connection = self.connection {
let maybeConnection = self.lock.withLock { () -> Connection? in
let connection = self._connection
self._connection = nil
return connection
}

if let connection = maybeConnection {
// remove read timeout handler
return connection.removeHandler(IdleStateHandler.self).flatMap {
connection.removeHandler(TaskHandler<Delegate>.self)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ extension HTTPClientInternalTests {
("testRequestFinishesAfterRedirectIfServerRespondsBeforeClientFinishes", testRequestFinishesAfterRedirectIfServerRespondsBeforeClientFinishes),
("testProxyStreaming", testProxyStreaming),
("testProxyStreamingFailure", testProxyStreamingFailure),
("testUploadStreamingBackpressure", testUploadStreamingBackpressure),
("testDownloadStreamingBackpressure", testDownloadStreamingBackpressure),
("testRequestURITrailingSlash", testRequestURITrailingSlash),
("testChannelAndDelegateOnDifferentEventLoops", testChannelAndDelegateOnDifferentEventLoops),
("testResponseConnectionCloseGet", testResponseConnectionCloseGet),
Expand Down
82 changes: 59 additions & 23 deletions Tests/AsyncHTTPClientTests/HTTPClientInternalTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
//===----------------------------------------------------------------------===//

@testable import AsyncHTTPClient
import Logging
import NIO
import NIOConcurrencyHelpers
import NIOHTTP1
Expand Down Expand Up @@ -323,21 +324,25 @@ class HTTPClientInternalTests: XCTestCase {
// of 4 bytes. This will guarantee that if we see first byte of the message, other
// bytes a ready to be read as well. This will allow us to test if subsequent reads
// are waiting for backpressure promise.
func testUploadStreamingBackpressure() throws {
func testDownloadStreamingBackpressure() throws {
class BackpressureTestDelegate: HTTPClientResponseDelegate {
typealias Response = Void

var _reads = 0
let lock: Lock
let backpressurePromise: EventLoopPromise<Void>
let channel: Channel

let optionsApplied: EventLoopPromise<Void>
let messageReceived: EventLoopPromise<Void>
let backpressureFuture: EventLoopFuture<Void>
let firstBodyPartReceived: EventLoopPromise<Void>

init(eventLoop: EventLoop) {
init(channel: Channel, writeBodyPromise: EventLoopPromise<Void>, writeEndFuture: EventLoopFuture<Void>) {
self.lock = Lock()
self.backpressurePromise = eventLoop.makePromise()
self.optionsApplied = eventLoop.makePromise()
self.messageReceived = eventLoop.makePromise()

self.channel = channel
self.optionsApplied = writeBodyPromise
self.backpressureFuture = writeEndFuture
self.firstBodyPartReceived = channel.eventLoop.makePromise()
}

var reads: Int {
Expand All @@ -348,8 +353,8 @@ class HTTPClientInternalTests: XCTestCase {

func didReceiveHead(task: HTTPClient.Task<Void>, _ head: HTTPResponseHead) -> EventLoopFuture<Void> {
// This is to force NIO to send only 1 byte at a time.
let future = task.connection!.channel.setOption(ChannelOptions.maxMessagesPerRead, value: 1).flatMap {
task.connection!.channel.setOption(ChannelOptions.recvAllocator, value: FixedSizeRecvByteBufferAllocator(capacity: 1))
let future = self.channel.setOption(ChannelOptions.maxMessagesPerRead, value: 1).flatMap {
self.channel.setOption(ChannelOptions.recvAllocator, value: FixedSizeRecvByteBufferAllocator(capacity: 1))
}
future.cascade(to: self.optionsApplied)
return future
Expand All @@ -361,8 +366,8 @@ class HTTPClientInternalTests: XCTestCase {
self._reads += 1
}
// We need to notify the test when first byte of the message is arrived.
self.messageReceived.succeed(())
return self.backpressurePromise.futureResult
self.firstBodyPartReceived.succeed(())
return self.self.backpressureFuture
}

func didFinishRequest(task: HTTPClient.Task<Response>) throws {}
Expand Down Expand Up @@ -403,37 +408,68 @@ class HTTPClientInternalTests: XCTestCase {

// cannot test with NIOTS as `maxMessagesPerRead` is not supported
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
let httpClient = HTTPClient(eventLoopGroupProvider: .shared(eventLoopGroup))
let delegate = BackpressureTestDelegate(eventLoop: httpClient.eventLoopGroup.next())
let eventLoop = eventLoopGroup.next()
let writeBodyPromise = eventLoop.makePromise(of: Void.self)
let writeEndPromise = eventLoop.makePromise(of: Void.self)
let httpBin = HTTPBin { _ in
WriteAfterFutureSucceedsHandler(
bodyFuture: delegate.optionsApplied.futureResult,
endFuture: delegate.backpressurePromise.futureResult
bodyFuture: writeBodyPromise.futureResult,
endFuture: writeEndPromise.futureResult
)
}

defer {
XCTAssertNoThrow(try httpClient.syncShutdown(requiresCleanClose: true))
XCTAssertNoThrow(try httpBin.shutdown())
XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully())
}

let request = try Request(url: "http://localhost:\(httpBin.port)/custom")

let requestFuture = httpClient.execute(request: request, delegate: delegate).futureResult
let logger = Logger(label: "test-connection")

let clientFactory = HTTPConnectionPool.ConnectionFactory(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why has this dropped down to be so low-level?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would really like to not expose the connection on the task in the future. In order to have control over the channel, we need to create it ourselves here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is only one other test remaining that accesses the task.connection:

testWeCanActuallyExactlySetTheEventLoops

We should be able to remove task.connection from testWeCanActuallyExactlySetTheEventLoops, once the new connection pool has landed. We would test only the delegate calls, since all connection eventLoop requirements can be tested with unit tests and will be tested with unit tests.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, can we encapsulate this new code in a function that clearly explains what it does? The code here is very aware of implementation details of AHC, which suggests the test as a whole needs rewriting, but in the near term I'd be ok with just hiding the implementation details in a function.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is still open.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bump. 😄

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, ignored it for now, since I think we should be able to just remove this test, once other PRs have landed. The functionality will be tested by unit tests.

key: .init(request),
tlsConfiguration: nil,
clientConfiguration: .init(),
sslContextCache: .init()
)
var maybeChannel: Channel?
XCTAssertNoThrow(maybeChannel = try clientFactory.makeHTTP1Channel(
connectionID: 1,
deadline: .now() + .seconds(10),
eventLoop: eventLoopGroup.next(),
logger: logger
).wait())

guard let channel = maybeChannel else { return XCTFail("Expected to have a channel at this point") }

let delegate = BackpressureTestDelegate(
channel: channel,
writeBodyPromise: writeBodyPromise,
writeEndFuture: writeEndPromise.futureResult
)
let task = HTTPClient.Task<BackpressureTestDelegate.Response>(eventLoop: eventLoop, logger: logger)

let taskHandler = TaskHandler(task: task,
kind: request.kind,
delegate: delegate,
redirectHandler: nil,
ignoreUncleanSSLShutdown: true,
logger: logger)

XCTAssertNoThrow(try channel.pipeline.addHandler(taskHandler).wait())
XCTAssertNoThrow(try channel.writeAndFlush(request).wait())

// We need to wait for channel options that limit NIO to sending only one byte at a time.
try delegate.optionsApplied.futureResult.wait()
XCTAssertNoThrow(try delegate.optionsApplied.futureResult.wait())

// Send 4 bytes, but only one should be received until the backpressure promise is succeeded.

// Now we wait until message is delivered to client channel pipeline
try delegate.messageReceived.futureResult.wait()
XCTAssertNoThrow(try delegate.firstBodyPartReceived.futureResult.wait())
XCTAssertEqual(delegate.reads, 1)

// Succeed the backpressure promise.
delegate.backpressurePromise.succeed(())
try requestFuture.wait()
writeEndPromise.succeed(())
XCTAssertNoThrow(try task.futureResult.wait())

// At this point all other bytes should be delivered.
XCTAssertEqual(delegate.reads, 4)
Expand Down