Skip to content

Commit d76029f

Browse files
committed
Extracting into extra state machine
1 parent acceb76 commit d76029f

File tree

2 files changed

+253
-93
lines changed

2 files changed

+253
-93
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the AsyncHTTPClient open source project
4+
//
5+
// Copyright (c) 2021 Apple Inc. and the AsyncHTTPClient project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
import NIO
16+
17+
extension HTTPRequestStateMachine {
18+
/// A sub state for receiving a response events. Stores whether the consumer has either signaled demand and whether the
19+
/// channel has issued `read` events.
20+
struct ResponseStreamState {
21+
private enum State {
22+
/// The state machines expects further writes to `channelRead`. The writes are appended to the buffer.
23+
case waitingForBytes(CircularBuffer<ByteBuffer>)
24+
/// The state machines expects a call to `demandMoreResponseBodyParts` or `read`. The buffer is
25+
/// empty. It is preserved for performance reasons.
26+
case waitingForReadOrDemand(CircularBuffer<ByteBuffer>)
27+
/// The state machines expects a call to `read`. The buffer is empty. It is preserved for performance reasons.
28+
case waitingForRead(CircularBuffer<ByteBuffer>)
29+
/// The state machines expects a call to `demandMoreResponseBodyParts`. The buffer is empty. It is
30+
/// preserved for performance reasons.
31+
case waitingForDemand(CircularBuffer<ByteBuffer>)
32+
33+
case modifying
34+
}
35+
36+
enum Action {
37+
case read
38+
case wait
39+
}
40+
41+
private var state: State
42+
43+
init(expectingBody: Bool) {
44+
self.state = .waitingForBytes(CircularBuffer(initialCapacity: expectingBody ? 16 : 0))
45+
}
46+
47+
mutating func receivedBodyPart(_ body: ByteBuffer) {
48+
switch self.state {
49+
case .waitingForBytes(var buffer):
50+
self.state = .modifying
51+
buffer.append(body)
52+
self.state = .waitingForBytes(buffer)
53+
54+
case .waitingForRead,
55+
.waitingForDemand,
56+
.waitingForReadOrDemand:
57+
preconditionFailure("How can we receive a body part, after a channelReadComplete, but no read has been forwarded yet. Invalid state: \(self.state)")
58+
59+
case .modifying:
60+
preconditionFailure("Invalid state: \(self.state)")
61+
}
62+
}
63+
64+
mutating func channelReadComplete() -> CircularBuffer<ByteBuffer> {
65+
switch self.state {
66+
case .waitingForBytes(let buffer):
67+
var newBuffer = buffer
68+
newBuffer.removeAll(keepingCapacity: true)
69+
self.state = .waitingForReadOrDemand(newBuffer)
70+
return buffer
71+
72+
case .waitingForRead,
73+
.waitingForDemand,
74+
.waitingForReadOrDemand:
75+
preconditionFailure("How can we receive a body part, after a channelReadComplete, but no read has been forwarded yet. Invalid state: \(self.state)")
76+
77+
case .modifying:
78+
preconditionFailure("Invalid state: \(self.state)")
79+
}
80+
}
81+
82+
mutating func demandMoreResponseBodyParts() -> Action {
83+
switch self.state {
84+
case .waitingForDemand(let buffer):
85+
self.state = .waitingForBytes(buffer)
86+
return .read
87+
88+
case .waitingForReadOrDemand(let buffer):
89+
self.state = .waitingForRead(buffer)
90+
return .wait
91+
92+
case .waitingForRead:
93+
// if we are `waitingForRead`, no action needs to be taken. Demand was already signalled
94+
// once we receive the next `read`, we will forward it, right away
95+
return .wait
96+
97+
case .waitingForBytes:
98+
// if we are `.waitingForBytes`, no action needs to be taken. As soon as we receive
99+
// the next channelReadComplete we will forward all buffered data
100+
return .wait
101+
102+
case .modifying:
103+
preconditionFailure("Invalid state: \(self.state)")
104+
}
105+
}
106+
107+
mutating func read() -> Action {
108+
switch self.state {
109+
case .waitingForBytes:
110+
// This should never happen. But we don't want to precondition this behavior. Let's just
111+
// pass the read event on
112+
return .read
113+
114+
case .waitingForReadOrDemand(let buffer):
115+
self.state = .waitingForDemand(buffer)
116+
return .wait
117+
118+
case .waitingForRead(let buffer):
119+
self.state = .waitingForBytes(buffer)
120+
return .read
121+
122+
case .waitingForDemand:
123+
// we have already received a read event. We will issue it as soon as we received demand
124+
// from the consumer
125+
return .wait
126+
127+
case .modifying:
128+
preconditionFailure("Invalid state: \(self.state)")
129+
}
130+
}
131+
132+
mutating func end() -> CircularBuffer<ByteBuffer> {
133+
switch self.state {
134+
case .waitingForBytes(let buffer):
135+
// This should never happen. But we don't want to precondition this behavior. Let's just
136+
// pass the read event on
137+
return buffer
138+
139+
case .waitingForReadOrDemand,
140+
.waitingForRead,
141+
.waitingForDemand:
142+
preconditionFailure("How can we receive a body end, after a channelReadComplete, but no read has been forwarded yet. Invalid state: \(self.state)")
143+
144+
case .modifying:
145+
preconditionFailure("Invalid state: \(self.state)")
146+
}
147+
}
148+
}
149+
}

0 commit comments

Comments
 (0)