Skip to content

Commit bef8878

Browse files
authored
ChannelRead because of closing connection: Remove preconditions (#430)
### Motivation NIO may send `channelRead` events without a handlers requesting more data with a `context.read()` invocation. This happens if the remote has closed the connection and NIO wants to inform the handlers as soon as possible. ### Changes - Don't `precondition` on `channelRead` events anymore. - Close channel if we received an http end without a `context.read()` invocation
1 parent 7c9662d commit bef8878

File tree

4 files changed

+124
-21
lines changed

4 files changed

+124
-21
lines changed

Diff for: Sources/AsyncHTTPClient/ConnectionPool/HTTPRequestStateMachine+Demand.swift

+41-12
Original file line numberDiff line numberDiff line change
@@ -51,13 +51,31 @@ extension HTTPRequestStateMachine {
5151
buffer.append(body)
5252
self.state = .waitingForBytes(buffer)
5353

54-
case .waitingForRead,
55-
.waitingForDemand,
56-
.waitingForReadOrDemand:
57-
preconditionFailure("How can we receive a body part, after a channelReadComplete, but no read has been forwarded yet. Invalid state: \(self.state)")
58-
5954
case .modifying:
6055
preconditionFailure("Invalid state: \(self.state)")
56+
57+
// For all the following cases, please note:
58+
// Normally these code paths should never be hit. However there is one way to trigger
59+
// this:
60+
//
61+
// If the server decides to close a connection, NIO will forward all outstanding
62+
// `channelRead`s without waiting for a next `context.read` call. For this reason we
63+
// might receive further bytes, when we don't expect them here.
64+
65+
case .waitingForRead(var buffer):
66+
self.state = .modifying
67+
buffer.append(body)
68+
self.state = .waitingForRead(buffer)
69+
70+
case .waitingForDemand(var buffer):
71+
self.state = .modifying
72+
buffer.append(body)
73+
self.state = .waitingForDemand(buffer)
74+
75+
case .waitingForReadOrDemand(var buffer):
76+
self.state = .modifying
77+
buffer.append(body)
78+
self.state = .waitingForReadOrDemand(buffer)
6179
}
6280
}
6381

@@ -134,15 +152,26 @@ extension HTTPRequestStateMachine {
134152
}
135153
}
136154

137-
mutating func end() -> CircularBuffer<ByteBuffer> {
155+
enum ConnectionAction {
156+
case none
157+
case close
158+
}
159+
160+
mutating func end() -> (CircularBuffer<ByteBuffer>, ConnectionAction) {
138161
switch self.state {
139162
case .waitingForBytes(let buffer):
140-
return buffer
141-
142-
case .waitingForReadOrDemand,
143-
.waitingForRead,
144-
.waitingForDemand:
145-
preconditionFailure("How can we receive a body end, after a channelReadComplete, but no read has been forwarded yet. Invalid state: \(self.state)")
163+
return (buffer, .none)
164+
165+
case .waitingForReadOrDemand(let buffer),
166+
.waitingForRead(let buffer),
167+
.waitingForDemand(let buffer):
168+
// Normally this code path should never be hit. However there is one way to trigger
169+
// this:
170+
//
171+
// If the server decides to close a connection, NIO will forward all outstanding
172+
// `channelRead`s without waiting for a next `context.read` call. For this reason we
173+
// might receive a call to `end()`, when we don't expect it here.
174+
return (buffer, .close)
146175

147176
case .modifying:
148177
preconditionFailure("Invalid state: \(self.state)")

Diff for: Sources/AsyncHTTPClient/ConnectionPool/HTTPRequestStateMachine.swift

+26-9
Original file line numberDiff line numberDiff line change
@@ -523,29 +523,46 @@ struct HTTPRequestStateMachine {
523523
where head.status.code < 300:
524524

525525
return self.avoidingStateMachineCoW { state -> Action in
526-
let remainingBuffer = responseStreamState.end()
527-
state = .running(
528-
.streaming(expectedBodyLength: expectedBodyLength, sentBodyBytes: sentBodyBytes, producer: producerState),
529-
.endReceived
530-
)
531-
return .forwardResponseBodyParts(remainingBuffer)
526+
let (remainingBuffer, connectionAction) = responseStreamState.end()
527+
switch connectionAction {
528+
case .none:
529+
state = .running(
530+
.streaming(expectedBodyLength: expectedBodyLength, sentBodyBytes: sentBodyBytes, producer: producerState),
531+
.endReceived
532+
)
533+
return .forwardResponseBodyParts(remainingBuffer)
534+
case .close:
535+
// If we receive a `.close` as a connectionAction from the responseStreamState
536+
// this means, that the response end was signaled by a connection close. Since
537+
// the request is still uploading, we will not be able to finish the upload. For
538+
// this reason we can fail the request here.
539+
state = .failed(HTTPClientError.remoteConnectionClosed)
540+
return .failRequest(HTTPClientError.remoteConnectionClosed, .close)
541+
}
532542
}
533543

534544
case .running(.streaming(_, _, let producerState), .receivingBody(let head, var responseStreamState)):
535545
assert(head.status.code >= 300)
536546
assert(producerState == .paused, "Expected to have paused the request body stream, when the head was received. Invalid state: \(self.state)")
537547

538548
return self.avoidingStateMachineCoW { state -> Action in
539-
let remainingBuffer = responseStreamState.end()
549+
// We can ignore the connectionAction from the responseStreamState, since the
550+
// connection should be closed anyway.
551+
let (remainingBuffer, _) = responseStreamState.end()
540552
state = .finished
541553
return .succeedRequest(.close, remainingBuffer)
542554
}
543555

544556
case .running(.endSent, .receivingBody(_, var responseStreamState)):
545557
return self.avoidingStateMachineCoW { state -> Action in
546-
let remainingBuffer = responseStreamState.end()
558+
let (remainingBuffer, action) = responseStreamState.end()
547559
state = .finished
548-
return .succeedRequest(.none, remainingBuffer)
560+
switch action {
561+
case .none:
562+
return .succeedRequest(.none, remainingBuffer)
563+
case .close:
564+
return .succeedRequest(.close, remainingBuffer)
565+
}
549566
}
550567

551568
case .running(_, .endReceived), .finished:

Diff for: Tests/AsyncHTTPClientTests/HTTPRequestStateMachineTests+XCTest.swift

+3
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,9 @@ extension HTTPRequestStateMachineTests {
5050
("testReadTimeoutThatFiresToLateIsIgnored", testReadTimeoutThatFiresToLateIsIgnored),
5151
("testCancellationThatIsInvokedToLateIsIgnored", testCancellationThatIsInvokedToLateIsIgnored),
5252
("testErrorWhileRunningARequestClosesTheStream", testErrorWhileRunningARequestClosesTheStream),
53+
("testCanReadHTTP1_0ResponseWithoutBody", testCanReadHTTP1_0ResponseWithoutBody),
54+
("testCanReadHTTP1_0ResponseWithBody", testCanReadHTTP1_0ResponseWithBody),
55+
("testFailHTTP1_0RequestThatIsStillUploading", testFailHTTP1_0RequestThatIsStillUploading),
5356
]
5457
}
5558
}

Diff for: Tests/AsyncHTTPClientTests/HTTPRequestStateMachineTests.swift

+54
Original file line numberDiff line numberDiff line change
@@ -451,6 +451,60 @@ class HTTPRequestStateMachineTests: XCTestCase {
451451
XCTAssertEqual(state.errorHappened(NIOSSLError.uncleanShutdown), .failRequest(NIOSSLError.uncleanShutdown, .close))
452452
XCTAssertEqual(state.requestCancelled(), .wait, "A cancellation that happens to late is ignored")
453453
}
454+
455+
func testCanReadHTTP1_0ResponseWithoutBody() {
456+
var state = HTTPRequestStateMachine(isChannelWritable: true)
457+
let requestHead = HTTPRequestHead(version: .http1_1, method: .GET, uri: "/")
458+
let metadata = RequestFramingMetadata(connectionClose: false, body: .none)
459+
XCTAssertEqual(state.startRequest(head: requestHead, metadata: metadata), .sendRequestHead(requestHead, startBody: false))
460+
461+
let responseHead = HTTPResponseHead(version: .http1_0, status: .internalServerError)
462+
XCTAssertEqual(state.channelRead(.head(responseHead)), .forwardResponseHead(responseHead, pauseRequestBodyStream: false))
463+
XCTAssertEqual(state.demandMoreResponseBodyParts(), .wait)
464+
XCTAssertEqual(state.channelReadComplete(), .wait)
465+
XCTAssertEqual(state.read(), .read)
466+
XCTAssertEqual(state.channelReadComplete(), .wait)
467+
XCTAssertEqual(state.channelRead(.end(nil)), .succeedRequest(.close, []))
468+
XCTAssertEqual(state.channelInactive(), .wait)
469+
}
470+
471+
func testCanReadHTTP1_0ResponseWithBody() {
472+
var state = HTTPRequestStateMachine(isChannelWritable: true)
473+
let requestHead = HTTPRequestHead(version: .http1_1, method: .GET, uri: "/")
474+
let metadata = RequestFramingMetadata(connectionClose: false, body: .none)
475+
XCTAssertEqual(state.startRequest(head: requestHead, metadata: metadata), .sendRequestHead(requestHead, startBody: false))
476+
477+
let responseHead = HTTPResponseHead(version: .http1_0, status: .internalServerError)
478+
let body = ByteBuffer(string: "foo bar")
479+
XCTAssertEqual(state.channelRead(.head(responseHead)), .forwardResponseHead(responseHead, pauseRequestBodyStream: false))
480+
XCTAssertEqual(state.demandMoreResponseBodyParts(), .wait)
481+
XCTAssertEqual(state.channelReadComplete(), .wait)
482+
XCTAssertEqual(state.read(), .read)
483+
XCTAssertEqual(state.channelReadComplete(), .wait)
484+
XCTAssertEqual(state.channelRead(.body(body)), .wait)
485+
XCTAssertEqual(state.channelRead(.end(nil)), .succeedRequest(.close, [body]))
486+
XCTAssertEqual(state.channelInactive(), .wait)
487+
}
488+
489+
func testFailHTTP1_0RequestThatIsStillUploading() {
490+
var state = HTTPRequestStateMachine(isChannelWritable: true)
491+
let requestHead = HTTPRequestHead(version: .http1_1, method: .POST, uri: "/")
492+
let metadata = RequestFramingMetadata(connectionClose: false, body: .stream)
493+
XCTAssertEqual(state.startRequest(head: requestHead, metadata: metadata), .sendRequestHead(requestHead, startBody: true))
494+
495+
let part1: ByteBuffer = .init(string: "foo")
496+
XCTAssertEqual(state.requestStreamPartReceived(.byteBuffer(part1)), .sendBodyPart(.byteBuffer(part1)))
497+
let responseHead = HTTPResponseHead(version: .http1_0, status: .ok)
498+
let body = ByteBuffer(string: "foo bar")
499+
XCTAssertEqual(state.channelRead(.head(responseHead)), .forwardResponseHead(responseHead, pauseRequestBodyStream: false))
500+
XCTAssertEqual(state.demandMoreResponseBodyParts(), .wait)
501+
XCTAssertEqual(state.channelReadComplete(), .wait)
502+
XCTAssertEqual(state.read(), .read)
503+
XCTAssertEqual(state.channelReadComplete(), .wait)
504+
XCTAssertEqual(state.channelRead(.body(body)), .wait)
505+
XCTAssertEqual(state.channelRead(.end(nil)), .failRequest(HTTPClientError.remoteConnectionClosed, .close))
506+
XCTAssertEqual(state.channelInactive(), .wait)
507+
}
454508
}
455509

456510
extension HTTPRequestStateMachine.Action: Equatable {

0 commit comments

Comments
 (0)