diff --git a/Sources/AsyncHTTPClient/ConnectionPool/HTTPRequestStateMachine+Demand.swift b/Sources/AsyncHTTPClient/ConnectionPool/HTTPRequestStateMachine+Demand.swift index 8a061bfa3..d20c94d02 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/HTTPRequestStateMachine+Demand.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/HTTPRequestStateMachine+Demand.swift @@ -51,13 +51,31 @@ extension HTTPRequestStateMachine { buffer.append(body) self.state = .waitingForBytes(buffer) - case .waitingForRead, - .waitingForDemand, - .waitingForReadOrDemand: - preconditionFailure("How can we receive a body part, after a channelReadComplete, but no read has been forwarded yet. Invalid state: \(self.state)") - case .modifying: preconditionFailure("Invalid state: \(self.state)") + + // For all the following cases, please note: + // Normally these code paths should never be hit. However there is one way to trigger + // this: + // + // If the server decides to close a connection, NIO will forward all outstanding + // `channelRead`s without waiting for a next `context.read` call. For this reason we + // might receive further bytes, when we don't expect them here. + + case .waitingForRead(var buffer): + self.state = .modifying + buffer.append(body) + self.state = .waitingForRead(buffer) + + case .waitingForDemand(var buffer): + self.state = .modifying + buffer.append(body) + self.state = .waitingForDemand(buffer) + + case .waitingForReadOrDemand(var buffer): + self.state = .modifying + buffer.append(body) + self.state = .waitingForReadOrDemand(buffer) } } @@ -134,15 +152,26 @@ extension HTTPRequestStateMachine { } } - mutating func end() -> CircularBuffer { + enum ConnectionAction { + case none + case close + } + + mutating func end() -> (CircularBuffer, ConnectionAction) { switch self.state { case .waitingForBytes(let buffer): - return buffer - - case .waitingForReadOrDemand, - .waitingForRead, - .waitingForDemand: - preconditionFailure("How can we receive a body end, after a channelReadComplete, but no read has been forwarded yet. Invalid state: \(self.state)") + return (buffer, .none) + + case .waitingForReadOrDemand(let buffer), + .waitingForRead(let buffer), + .waitingForDemand(let buffer): + // Normally this code path should never be hit. However there is one way to trigger + // this: + // + // If the server decides to close a connection, NIO will forward all outstanding + // `channelRead`s without waiting for a next `context.read` call. For this reason we + // might receive a call to `end()`, when we don't expect it here. + return (buffer, .close) case .modifying: preconditionFailure("Invalid state: \(self.state)") diff --git a/Sources/AsyncHTTPClient/ConnectionPool/HTTPRequestStateMachine.swift b/Sources/AsyncHTTPClient/ConnectionPool/HTTPRequestStateMachine.swift index be10e9418..34d2ec433 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/HTTPRequestStateMachine.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/HTTPRequestStateMachine.swift @@ -523,12 +523,22 @@ struct HTTPRequestStateMachine { where head.status.code < 300: return self.avoidingStateMachineCoW { state -> Action in - let remainingBuffer = responseStreamState.end() - state = .running( - .streaming(expectedBodyLength: expectedBodyLength, sentBodyBytes: sentBodyBytes, producer: producerState), - .endReceived - ) - return .forwardResponseBodyParts(remainingBuffer) + let (remainingBuffer, connectionAction) = responseStreamState.end() + switch connectionAction { + case .none: + state = .running( + .streaming(expectedBodyLength: expectedBodyLength, sentBodyBytes: sentBodyBytes, producer: producerState), + .endReceived + ) + return .forwardResponseBodyParts(remainingBuffer) + case .close: + // If we receive a `.close` as a connectionAction from the responseStreamState + // this means, that the response end was signaled by a connection close. Since + // the request is still uploading, we will not be able to finish the upload. For + // this reason we can fail the request here. + state = .failed(HTTPClientError.remoteConnectionClosed) + return .failRequest(HTTPClientError.remoteConnectionClosed, .close) + } } case .running(.streaming(_, _, let producerState), .receivingBody(let head, var responseStreamState)): @@ -536,16 +546,23 @@ struct HTTPRequestStateMachine { assert(producerState == .paused, "Expected to have paused the request body stream, when the head was received. Invalid state: \(self.state)") return self.avoidingStateMachineCoW { state -> Action in - let remainingBuffer = responseStreamState.end() + // We can ignore the connectionAction from the responseStreamState, since the + // connection should be closed anyway. + let (remainingBuffer, _) = responseStreamState.end() state = .finished return .succeedRequest(.close, remainingBuffer) } case .running(.endSent, .receivingBody(_, var responseStreamState)): return self.avoidingStateMachineCoW { state -> Action in - let remainingBuffer = responseStreamState.end() + let (remainingBuffer, action) = responseStreamState.end() state = .finished - return .succeedRequest(.none, remainingBuffer) + switch action { + case .none: + return .succeedRequest(.none, remainingBuffer) + case .close: + return .succeedRequest(.close, remainingBuffer) + } } case .running(_, .endReceived), .finished: diff --git a/Tests/AsyncHTTPClientTests/HTTPRequestStateMachineTests+XCTest.swift b/Tests/AsyncHTTPClientTests/HTTPRequestStateMachineTests+XCTest.swift index 7fef14658..d600092ff 100644 --- a/Tests/AsyncHTTPClientTests/HTTPRequestStateMachineTests+XCTest.swift +++ b/Tests/AsyncHTTPClientTests/HTTPRequestStateMachineTests+XCTest.swift @@ -50,6 +50,9 @@ extension HTTPRequestStateMachineTests { ("testReadTimeoutThatFiresToLateIsIgnored", testReadTimeoutThatFiresToLateIsIgnored), ("testCancellationThatIsInvokedToLateIsIgnored", testCancellationThatIsInvokedToLateIsIgnored), ("testErrorWhileRunningARequestClosesTheStream", testErrorWhileRunningARequestClosesTheStream), + ("testCanReadHTTP1_0ResponseWithoutBody", testCanReadHTTP1_0ResponseWithoutBody), + ("testCanReadHTTP1_0ResponseWithBody", testCanReadHTTP1_0ResponseWithBody), + ("testFailHTTP1_0RequestThatIsStillUploading", testFailHTTP1_0RequestThatIsStillUploading), ] } } diff --git a/Tests/AsyncHTTPClientTests/HTTPRequestStateMachineTests.swift b/Tests/AsyncHTTPClientTests/HTTPRequestStateMachineTests.swift index b465c3484..3a11cfd43 100644 --- a/Tests/AsyncHTTPClientTests/HTTPRequestStateMachineTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTPRequestStateMachineTests.swift @@ -451,6 +451,60 @@ class HTTPRequestStateMachineTests: XCTestCase { XCTAssertEqual(state.errorHappened(NIOSSLError.uncleanShutdown), .failRequest(NIOSSLError.uncleanShutdown, .close)) XCTAssertEqual(state.requestCancelled(), .wait, "A cancellation that happens to late is ignored") } + + func testCanReadHTTP1_0ResponseWithoutBody() { + var state = HTTPRequestStateMachine(isChannelWritable: true) + let requestHead = HTTPRequestHead(version: .http1_1, method: .GET, uri: "/") + let metadata = RequestFramingMetadata(connectionClose: false, body: .none) + XCTAssertEqual(state.startRequest(head: requestHead, metadata: metadata), .sendRequestHead(requestHead, startBody: false)) + + let responseHead = HTTPResponseHead(version: .http1_0, status: .internalServerError) + XCTAssertEqual(state.channelRead(.head(responseHead)), .forwardResponseHead(responseHead, pauseRequestBodyStream: false)) + XCTAssertEqual(state.demandMoreResponseBodyParts(), .wait) + XCTAssertEqual(state.channelReadComplete(), .wait) + XCTAssertEqual(state.read(), .read) + XCTAssertEqual(state.channelReadComplete(), .wait) + XCTAssertEqual(state.channelRead(.end(nil)), .succeedRequest(.close, [])) + XCTAssertEqual(state.channelInactive(), .wait) + } + + func testCanReadHTTP1_0ResponseWithBody() { + var state = HTTPRequestStateMachine(isChannelWritable: true) + let requestHead = HTTPRequestHead(version: .http1_1, method: .GET, uri: "/") + let metadata = RequestFramingMetadata(connectionClose: false, body: .none) + XCTAssertEqual(state.startRequest(head: requestHead, metadata: metadata), .sendRequestHead(requestHead, startBody: false)) + + let responseHead = HTTPResponseHead(version: .http1_0, status: .internalServerError) + let body = ByteBuffer(string: "foo bar") + XCTAssertEqual(state.channelRead(.head(responseHead)), .forwardResponseHead(responseHead, pauseRequestBodyStream: false)) + XCTAssertEqual(state.demandMoreResponseBodyParts(), .wait) + XCTAssertEqual(state.channelReadComplete(), .wait) + XCTAssertEqual(state.read(), .read) + XCTAssertEqual(state.channelReadComplete(), .wait) + XCTAssertEqual(state.channelRead(.body(body)), .wait) + XCTAssertEqual(state.channelRead(.end(nil)), .succeedRequest(.close, [body])) + XCTAssertEqual(state.channelInactive(), .wait) + } + + func testFailHTTP1_0RequestThatIsStillUploading() { + var state = HTTPRequestStateMachine(isChannelWritable: true) + let requestHead = HTTPRequestHead(version: .http1_1, method: .POST, uri: "/") + let metadata = RequestFramingMetadata(connectionClose: false, body: .stream) + XCTAssertEqual(state.startRequest(head: requestHead, metadata: metadata), .sendRequestHead(requestHead, startBody: true)) + + let part1: ByteBuffer = .init(string: "foo") + XCTAssertEqual(state.requestStreamPartReceived(.byteBuffer(part1)), .sendBodyPart(.byteBuffer(part1))) + let responseHead = HTTPResponseHead(version: .http1_0, status: .ok) + let body = ByteBuffer(string: "foo bar") + XCTAssertEqual(state.channelRead(.head(responseHead)), .forwardResponseHead(responseHead, pauseRequestBodyStream: false)) + XCTAssertEqual(state.demandMoreResponseBodyParts(), .wait) + XCTAssertEqual(state.channelReadComplete(), .wait) + XCTAssertEqual(state.read(), .read) + XCTAssertEqual(state.channelReadComplete(), .wait) + XCTAssertEqual(state.channelRead(.body(body)), .wait) + XCTAssertEqual(state.channelRead(.end(nil)), .failRequest(HTTPClientError.remoteConnectionClosed, .close)) + XCTAssertEqual(state.channelInactive(), .wait) + } } extension HTTPRequestStateMachine.Action: Equatable {