Skip to content

Buffer channelReads in ResponseStreamState until the next channelReadComplete #388

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jul 7, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the AsyncHTTPClient open source project
//
// Copyright (c) 2021 Apple Inc. and the AsyncHTTPClient project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import NIO

extension HTTPRequestStateMachine {
/// A sub state for receiving a response events. Stores whether the consumer has either signaled demand and whether the
/// channel has issued `read` events.
struct ResponseStreamState {
private enum State {
/// The state machines expects further writes to `channelRead`. The writes are appended to the buffer.
case waitingForBytes(CircularBuffer<ByteBuffer>)
/// The state machines expects a call to `demandMoreResponseBodyParts` or `read`. The buffer is
/// empty. It is preserved for performance reasons.
case waitingForReadOrDemand(CircularBuffer<ByteBuffer>)
/// The state machines expects a call to `read`. The buffer is empty. It is preserved for performance reasons.
case waitingForRead(CircularBuffer<ByteBuffer>)
/// The state machines expects a call to `demandMoreResponseBodyParts`. The buffer is empty. It is
/// preserved for performance reasons.
case waitingForDemand(CircularBuffer<ByteBuffer>)

case modifying
}

enum Action {
case read
case wait
}

private var state: State

init() {
self.state = .waitingForBytes(CircularBuffer(initialCapacity: 16))
}

mutating func receivedBodyPart(_ body: ByteBuffer) {
switch self.state {
case .waitingForBytes(var buffer):
self.state = .modifying
buffer.append(body)
self.state = .waitingForBytes(buffer)

case .waitingForRead,
.waitingForDemand,
.waitingForReadOrDemand:
preconditionFailure("How can we receive a body part, after a channelReadComplete, but no read has been forwarded yet. Invalid state: \(self.state)")

case .modifying:
preconditionFailure("Invalid state: \(self.state)")
}
}

mutating func channelReadComplete() -> CircularBuffer<ByteBuffer>? {
switch self.state {
case .waitingForBytes(let buffer):
if buffer.isEmpty {
self.state = .waitingForRead(buffer)
return nil
} else {
var newBuffer = buffer
newBuffer.removeAll(keepingCapacity: true)
self.state = .waitingForReadOrDemand(newBuffer)
return buffer
}

case .waitingForRead,
.waitingForDemand,
.waitingForReadOrDemand:
preconditionFailure("How can we receive a body part, after a channelReadComplete, but no read has been forwarded yet. Invalid state: \(self.state)")

case .modifying:
preconditionFailure("Invalid state: \(self.state)")
}
}

mutating func demandMoreResponseBodyParts() -> Action {
switch self.state {
case .waitingForDemand(let buffer):
self.state = .waitingForBytes(buffer)
return .read

case .waitingForReadOrDemand(let buffer):
self.state = .waitingForRead(buffer)
return .wait

case .waitingForRead:
// if we are `waitingForRead`, no action needs to be taken. Demand was already signalled
// once we receive the next `read`, we will forward it, right away
return .wait

case .waitingForBytes:
// if we are `.waitingForBytes`, no action needs to be taken. As soon as we receive
// the next channelReadComplete we will forward all buffered data
return .wait

case .modifying:
preconditionFailure("Invalid state: \(self.state)")
}
}

mutating func read() -> Action {
switch self.state {
case .waitingForBytes:
// This should never happen. But we don't want to precondition this behavior. Let's just
// pass the read event on
return .read

case .waitingForReadOrDemand(let buffer):
self.state = .waitingForDemand(buffer)
return .wait

case .waitingForRead(let buffer):
self.state = .waitingForBytes(buffer)
return .read

case .waitingForDemand:
// we have already received a read event. We will issue it as soon as we received demand
// from the consumer
return .wait

case .modifying:
preconditionFailure("Invalid state: \(self.state)")
}
}

mutating func end() -> CircularBuffer<ByteBuffer> {
switch self.state {
case .waitingForBytes(let buffer):
return buffer

case .waitingForReadOrDemand,
.waitingForRead,
.waitingForDemand:
preconditionFailure("How can we receive a body end, after a channelReadComplete, but no read has been forwarded yet. Invalid state: \(self.state)")

case .modifying:
preconditionFailure("Invalid state: \(self.state)")
}
}
}
}
Loading