-
Notifications
You must be signed in to change notification settings - Fork 123
Add HTTPRequestStateMachine
#386
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add HTTPRequestStateMachine
#386
Conversation
} | ||
|
||
fileprivate enum RequestState { | ||
enum ExpectedBody { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is entirely unused.
Sources/AsyncHTTPClient/ConnectionPool/HTTPRequestStateMachine.swift
Outdated
Show resolved
Hide resolved
Sources/AsyncHTTPClient/ConnectionPool/HTTPRequestStateMachine.swift
Outdated
Show resolved
Hide resolved
Sources/AsyncHTTPClient/ConnectionPool/HTTPRequestStateMachine.swift
Outdated
Show resolved
Hide resolved
Sources/AsyncHTTPClient/ConnectionPool/HTTPRequestStateMachine.swift
Outdated
Show resolved
Hide resolved
return .forwardResponseBodyPart(body, resetReadTimeoutTimer: self.idleReadTimeout) | ||
|
||
case .running(_, .endReceived), .finished: | ||
preconditionFailure("How can we sucessfully finish the request, before having received a head") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This comment seems out of place.
preconditionFailure("How can we receive a response head before sending a request head ourselves") | ||
|
||
case .running(_, .initialized): | ||
preconditionFailure("How can we receive a response body, if we haven't a received a head") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We're receiving an end
, technically.
|
||
case .running(.streaming, .receivingBody(let streamState)): | ||
preconditionFailure("Unimplemented") | ||
#warning("@Fabian: We received response end, before sending our own request's end.") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We've discussed this.
} | ||
|
||
if case .endReceived = responseState { | ||
preconditionFailure("Invalid state: If we have received everything, we must not schedule further timeout timers") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems a bit harsh: it's possible we failed to cancel the timer in time.
Co-authored-by: Cory Benfield <[email protected]>
// oh the channel reports... we should slow down producing... | ||
XCTAssertEqual(state.writabilityChanged(writable: false), .pauseRequestBodyStream) | ||
|
||
// but we issued a .produceMoreRequestBodyData before... Thus, we must accept more produced |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't follow this, we're no longer writable but can still receive parts on the request stream?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, this might happen because of race conditions. If we receive channelNotWritable
in thread A and we need to communicate it to thread B, B may have already created new data. For this reason, we need to accept more data even though we asked the producer to pause.
} | ||
|
||
enum Action { | ||
enum NextMessageToSend { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This doesn't appear to be used
case endReceived | ||
} | ||
|
||
enum Action { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FWIW, in the past I've found it slightly awkward when debugging code which relies on a generic Action
(rather than one action type per function on the state machine) since it's not clear at the call site what actions may occur as a result of prodding the state machine. I.e. writabilityChanged
can only result in three of the 12 defined actions (wait
, resumeRequestBodyStream
and pauseRequestBodyStream
) but that isn't clear when calling writabilityChanged
. This is entirely personal preference though.
} | ||
|
||
mutating func readEventCaught() -> Action { | ||
return .read |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we unconditionally read? Does this not just apply to the running
state?
|
||
mutating func start() -> Action { | ||
guard case .initialized = self.state else { | ||
preconditionFailure("Invalid state") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A comment indicating that start()
must be called first and exactly once would be useful.
Sources/AsyncHTTPClient/ConnectionPool/HTTPRequestStateMachine.swift
Outdated
Show resolved
Hide resolved
Sources/AsyncHTTPClient/ConnectionPool/HTTPRequestStateMachine.swift
Outdated
Show resolved
Hide resolved
case stream | ||
} | ||
|
||
enum ProducerControlState: Equatable { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not just make this type :String
, and you can avoid the CustomStringConvertible
below?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Bump
Sources/AsyncHTTPClient/ConnectionPool/HTTPRequestStateMachine.swift
Outdated
Show resolved
Hide resolved
Sources/AsyncHTTPClient/ConnectionPool/HTTPRequestStateMachine.swift
Outdated
Show resolved
Hide resolved
255da7a
to
b899969
Compare
b899969
to
7e5ad26
Compare
Sources/AsyncHTTPClient/ConnectionPool/HTTPRequestStateMachine.swift
Outdated
Show resolved
Hide resolved
Sources/AsyncHTTPClient/ConnectionPool/HTTPRequestStateMachine.swift
Outdated
Show resolved
Hide resolved
|
||
/// The request is streaming its request body. `expectedBodyLength` has a value, if the request header contained | ||
/// a `"content-length"` header field. It is the request header contained a `"transfer-encoding" = "chunked"` | ||
/// header field. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The second sentence here is something I don't understand: can you rewrite it?
Sources/AsyncHTTPClient/ConnectionPool/HTTPRequestStateMachine.swift
Outdated
Show resolved
Hide resolved
case waitingForHead | ||
/// A response head has been received and we are ready to consume more data of the wire | ||
case receivingBody(HTTPResponseHead, ConsumerControlState) | ||
/// A response end has been received and we are ready to consume more data of the wire |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should be saying "ready to consume more data off the wire" here?
self.state = .running(requestState, .receivingBody(responseHead, .downstreamIsConsuming(readPending: true))) | ||
return .wait | ||
case .running(let requestState, .receivingBody(let responseHead, .downstreamHasDemand)): | ||
self.state = .running(requestState, .receivingBody(responseHead, .downstreamHasDemand)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's no state change here. Is that intentional?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. Added a comment.
// situations in which the producer might not have received the plea to pause yet. | ||
|
||
if let expected = expectedBodyLength { | ||
if sentBodyBytes + part.readableBytes > expected { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These two if statements can be collapsed.
clearReadTimeoutTimer = true | ||
} | ||
|
||
return .failRequest(error, .close, clearReadTimeoutTimer: clearReadTimeoutTimer) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there any reason this state machine has to keep track of whether we need to clear the read timeout timer? Presumably someone has set the timer, and they know if they have one or not, so should unconditionally clear it.
} | ||
|
||
self.state = .finished | ||
return .succeedRequest(.none, clearReadTimeoutTimer: false) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I don't know why the read timeout isn't getting cleared here. I think whether the read timeout is cleared or reset should not be the responsibility of this state machine: the read timeout should get set when either we've sent our .end
or received our first .head
, and from that point on it shouldn't be our problem. It adds too many responsibilities to this object.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Alternatively, if you really want it to be composed into this state machine, factor it out into a little state machine of its own and give it an interface that you can call from here. It'll make it much easier to validate its correctness. But honestly I think it's overcomplicating this object.
let finalAction: Action.FinalStreamAction | ||
switch streamState { | ||
case .downstreamIsConsuming(readPending: true): | ||
finalAction = .read |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I do not understand why the consumer has any control over what we do with the HTTP connection at this point.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure I get your point here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why does our finalAction
depend on what the consumer is doing with the data?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The finalAction
depends on whether backpressure was used. If we receive the response end and we have a read event buffered, I think we should make sure this read event is passed forward. This way we ensure that the channel is in a "normal" state (no buffered read event), once we start the next request. I see it as a cleanup job.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nothing substantive from me. The comments were helpful though, thanks for adding them all!
Sources/AsyncHTTPClient/ConnectionPool/HTTPRequestStateMachine.swift
Outdated
Show resolved
Hide resolved
Co-authored-by: Cory Benfield <[email protected]> Co-authored-by: George Barnett <[email protected]>
case stream | ||
} | ||
|
||
enum ProducerControlState: Equatable { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Bump
cd55229
to
660ad20
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, I think I'm happy with this as it stands.
This PR adds an explicit
HTTPRequestStateMachine
. It is intended to be used in two instances in the future:HTTP1ConnectionStateMachine
, when the connection is in a request.HTTP2RequestHandler
that operates on a single http2 stream created by the multiplexer.It remains to be seen, if we want to handle buffer body parts for backpressure directly in the
HTTPExecutingRequest
(see also #384) or directly theHTTPRequestStateMachine
.