Skip to content

HTTPSchedulableRequest requires HTTPExecutableRequest #402

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 9, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -18,7 +18,7 @@ extension HTTPConnectionPool {
struct RequestID: Hashable {
private let objectIdentifier: ObjectIdentifier

init(_ request: HTTPScheduledRequest) {
init(_ request: HTTPSchedulableRequest) {
self.objectIdentifier = ObjectIdentifier(request)
}
}
@@ -28,7 +28,7 @@ extension HTTPConnectionPool {
RequestID(self.request)
}

var request: HTTPScheduledRequest
var request: HTTPSchedulableRequest

private var eventLoopRequirement: EventLoop? {
switch self.request.eventLoopPreference.preference {
@@ -41,7 +41,7 @@ extension HTTPConnectionPool {
}
}

init(request: HTTPScheduledRequest) {
init(request: HTTPSchedulableRequest) {
self.request = request
}

Original file line number Diff line number Diff line change
@@ -70,7 +70,7 @@ enum HTTPConnectionPool {
}
}

fileprivate func execute(request: HTTPExecutingRequest) {
fileprivate func execute(request: HTTPExecutableRequest) {
switch self._ref {
// case .http1_1(let connection):
// return connection.execute(request: request)
Original file line number Diff line number Diff line change
@@ -38,34 +38,34 @@ import NIOHTTP1
/// When the `HTTPClient` shall send an HTTP request, it will use its `HTTPConnectionPool.Manager` to
/// determine the `HTTPConnectionPool` to run the request on. After a `HTTPConnectionPool` has been
/// found for the request, the request will be executed on this connection pool. Since the HTTP
/// request implements the `HTTPScheduledRequest` protocol, the HTTP connection pool can communicate
/// request implements the `HTTPSchedulableRequest` protocol, the HTTP connection pool can communicate
/// with the request. The `HTTPConnectionPool` implements the `HTTPRequestScheduler` protocol.
///
/// 1. The `HTTPConnectionPool` tries to find an idle connection for the request based on its
/// `eventLoopPreference`.
///
/// 2. If an idle connection is available to the request, the request will be passed to the
/// connection right away. In this case the `HTTPConnectionPool` will only use the
/// `HTTPScheduledRequest`'s `eventLoopPreference` property. No other methods will be called.
/// `HTTPSchedulableRequest`'s `eventLoopPreference` property. No other methods will be called.
///
/// 3. If no idle connection is available to the request, the request will be queued for execution:
/// - The `HTTPConnectionPool` will inform the request that it is queued for execution by
/// calling: `requestWasQueued(_: HTTPRequestScheduler)`. The request must store a reference
/// to the `HTTPRequestScheduler`. The request must call `cancelRequest(self)` on the
/// scheduler, if the request was cancelled, while waiting for execution.
/// - The `HTTPConnectionPool` will create a connection deadline based on the
/// `HTTPScheduledRequest`'s `connectionDeadline` property. If a connection to execute the
/// `HTTPSchedulableRequest`'s `connectionDeadline` property. If a connection to execute the
/// request on, was not found before this deadline the request will be failed.
/// - The HTTPConnectionPool will call `fail(_: Error)` on the `HTTPScheduledRequest` to
/// - The HTTPConnectionPool will call `fail(_: Error)` on the `HTTPSchedulableRequest` to
/// inform the request about having overrun the `connectionDeadline`.
///
///
/// ## Request is executing
///
/// After the `HTTPConnectionPool` has identified a connection for the request to be run on, it will
/// execute the request on this connection. (Implementation detail: This happens by writing the
/// `HTTPExecutingRequest` to a `NIO.Channel`. We expect the last handler in the `ChannelPipeline`
/// to have an `OutboundIn` type of `HTTPExecutingRequest`. Further we expect that the handler
/// `HTTPExecutableRequest` to a `NIO.Channel`. We expect the last handler in the `ChannelPipeline`
/// to have an `OutboundIn` type of `HTTPExecutableRequest`. Further we expect that the handler
/// also conforms to the protocol `HTTPRequestExecutor` to allow communication of the request with
/// the executor/`ChannelHandler`).
///
@@ -74,7 +74,7 @@ import NIOHTTP1
/// 1. The request executor will call `willExecuteRequest(_: HTTPRequestExecutor)` on the
/// request. The request is expected to keep a reference to the `HTTPRequestExecutor` that was
/// passed to the request for further communication.
/// 2. The request sending is started by the executor accessing the `HTTPExecutingRequest`'s
/// 2. The request sending is started by the executor accessing the `HTTPExecutableRequest`'s
/// property `requestHead: HTTPRequestHead`. Based on the `requestHead` the executor can
/// determine if the request has a body (Is a "content-length" or "transfer-encoding"
/// header present?).
@@ -83,31 +83,31 @@ import NIOHTTP1
/// `requestHeadSent(_: HTTPRequestHead)`
/// 4. If the request has a body the request executor will, ask the request for body data, by
/// calling `startRequestBodyStream()`. The request is expected to call
/// `writeRequestBodyPart(_: IOData, task: HTTPExecutingRequest)` on the executor with body
/// `writeRequestBodyPart(_: IOData, task: HTTPExecutableRequest)` on the executor with body
/// data.
/// - The executor can signal backpressure to the request by calling
/// `pauseRequestBodyStream()`. In this case the request is expected to stop calling
/// `writeRequestBodyPart(_: IOData, task: HTTPExecutingRequest)`. However because of race
/// `writeRequestBodyPart(_: IOData, task: HTTPExecutableRequest)`. However because of race
/// conditions the executor is prepared to process more data, even though it asked the
/// request to pause.
/// - Once the executor is able to send more data, it will notify the request by calling
/// `resumeRequestBodyStream()` on the request.
/// - The request shall call `finishRequestBodyStream()` on the executor to signal that the
/// request body was sent.
/// 5. Once the executor receives a http response from the Channel, it will forward the http
/// response head to the `HTTPExecutingRequest` by calling `receiveResponseHead` on it.
/// response head to the `HTTPExecutableRequest` by calling `receiveResponseHead` on it.
/// - The executor will forward all the response body parts it receives in a single read to
/// the `HTTPExecutingRequest` without any buffering by calling
/// the `HTTPExecutableRequest` without any buffering by calling
/// `receiveResponseBodyPart(_ buffer: ByteBuffer)` right away. It is the task's job to
/// buffer the responses for user consumption.
/// - Once the executor has finished a read, it will not schedule another read, until the
/// request calls `demandResponseBodyStream(task: HTTPExecutingRequest)` on the executor.
/// request calls `demandResponseBodyStream(task: HTTPExecutableRequest)` on the executor.
/// - Once the executor has received the response's end, it will forward this message by
/// calling `receiveResponseEnd()` on the `HTTPExecutingRequest`.
/// calling `receiveResponseEnd()` on the `HTTPExecutableRequest`.
/// 6. If a channel error occurs during the execution of the request, or if the channel becomes
/// inactive the executor will notify the request by calling `fail(_ error: Error)` on it.
/// 7. If the request is cancelled, while it is executing on the executor, it must call
/// `cancelRequest(task: HTTPExecutingRequest)` on the executor.
/// `cancelRequest(task: HTTPExecutableRequest)` on the executor.
///
///
/// ## Further notes
@@ -133,13 +133,13 @@ import NIOHTTP1
/// This protocol is only intended to be implemented by the `HTTPConnectionPool`.
protocol HTTPRequestScheduler {
/// Informs the task queuer that a request has been cancelled.
func cancelRequest(_: HTTPScheduledRequest)
func cancelRequest(_: HTTPSchedulableRequest)
}

/// An abstraction over a request that we want to send. A request may need to communicate with its request
/// queuer and executor. The client's methods will be called synchronously on an `EventLoop` by the
/// executor. For this reason it is very important that the implementation of these functions never blocks.
protocol HTTPScheduledRequest: AnyObject {
protocol HTTPSchedulableRequest: HTTPExecutableRequest {
/// The task's logger
var logger: Logger { get }

@@ -165,28 +165,28 @@ protocol HTTPRequestExecutor {
/// Writes a body part into the channel pipeline
///
/// This method may be **called on any thread**. The executor needs to ensure thread safety.
func writeRequestBodyPart(_: IOData, request: HTTPExecutingRequest)
func writeRequestBodyPart(_: IOData, request: HTTPExecutableRequest)

/// Signals that the request body stream has finished
///
/// This method may be **called on any thread**. The executor needs to ensure thread safety.
func finishRequestBodyStream(_ task: HTTPExecutingRequest)
func finishRequestBodyStream(_ task: HTTPExecutableRequest)

/// Signals that more bytes from response body stream can be consumed.
///
/// The request executor will call `receiveResponseBodyPart(_ buffer: ByteBuffer)` with more data after
/// this call.
///
/// This method may be **called on any thread**. The executor needs to ensure thread safety.
func demandResponseBodyStream(_ task: HTTPExecutingRequest)
func demandResponseBodyStream(_ task: HTTPExecutableRequest)

/// Signals that the request has been cancelled.
///
/// This method may be **called on any thread**. The executor needs to ensure thread safety.
func cancelRequest(_ task: HTTPExecutingRequest)
func cancelRequest(_ task: HTTPExecutableRequest)
}

protocol HTTPExecutingRequest: AnyObject {
protocol HTTPExecutableRequest: AnyObject {
/// The request's head.
///
/// The HTTP request head, that shall be sent. The HTTPRequestExecutor **will not** run any validation
4 changes: 2 additions & 2 deletions Sources/AsyncHTTPClient/RequestBag.swift
Original file line number Diff line number Diff line change
@@ -312,7 +312,7 @@ final class RequestBag<Delegate: HTTPClientResponseDelegate> {
}
}

extension RequestBag: HTTPScheduledRequest {
extension RequestBag: HTTPSchedulableRequest {
func requestWasQueued(_ scheduler: HTTPRequestScheduler) {
if self.task.eventLoop.inEventLoop {
self.requestWasQueued0(scheduler)
@@ -334,7 +334,7 @@ extension RequestBag: HTTPScheduledRequest {
}
}

extension RequestBag: HTTPExecutingRequest {
extension RequestBag: HTTPExecutableRequest {
func willExecuteRequest(_ executor: HTTPRequestExecutor) {
if self.task.eventLoop.inEventLoop {
self.willExecuteRequest0(executor)
37 changes: 36 additions & 1 deletion Tests/AsyncHTTPClientTests/HTTPConnectionPool+WaiterTests.swift
Original file line number Diff line number Diff line change
@@ -15,6 +15,7 @@
@testable import AsyncHTTPClient
import Logging
import NIO
import NIOHTTP1
import XCTest

class HTTPConnectionPool_WaiterTests: XCTestCase {
@@ -44,7 +45,7 @@ class HTTPConnectionPool_WaiterTests: XCTestCase {
}
}

private class MockScheduledRequest: HTTPScheduledRequest {
private class MockScheduledRequest: HTTPSchedulableRequest {
init(eventLoopPreference: HTTPClient.EventLoopPreference) {
self.eventLoopPreference = eventLoopPreference
}
@@ -60,4 +61,38 @@ private class MockScheduledRequest: HTTPScheduledRequest {
func fail(_: Error) {
preconditionFailure("Unimplemented")
}

// MARK: HTTPExecutableRequest

var requestHead: HTTPRequestHead { preconditionFailure("Unimplemented") }
var requestFramingMetadata: RequestFramingMetadata { preconditionFailure("Unimplemented") }
var idleReadTimeout: TimeAmount? { preconditionFailure("Unimplemented") }

func willExecuteRequest(_: HTTPRequestExecutor) {
preconditionFailure("Unimplemented")
}

func requestHeadSent() {
preconditionFailure("Unimplemented")
}

func resumeRequestBodyStream() {
preconditionFailure("Unimplemented")
}

func pauseRequestBodyStream() {
preconditionFailure("Unimplemented")
}

func receiveResponseHead(_: HTTPResponseHead) {
preconditionFailure("Unimplemented")
}

func receiveResponseBodyParts(_: CircularBuffer<ByteBuffer>) {
preconditionFailure("Unimplemented")
}

func succeedRequest(_: CircularBuffer<ByteBuffer>?) {
preconditionFailure("Unimplemented")
}
}
10 changes: 5 additions & 5 deletions Tests/AsyncHTTPClientTests/RequestBagTests.swift
Original file line number Diff line number Diff line change
@@ -402,22 +402,22 @@ class MockRequestExecutor: HTTPRequestExecutor {
// this should always be called twice. When we receive the first call, the next call to produce
// data is already scheduled. If we call pause here, once, after the second call new subsequent
// calls should not be scheduled.
func writeRequestBodyPart(_ part: IOData, request: HTTPExecutingRequest) {
func writeRequestBodyPart(_ part: IOData, request: HTTPExecutableRequest) {
if self.requestBodyParts.isEmpty, self.pauseRequestBodyPartStreamAfterASingleWrite {
request.pauseRequestBodyStream()
}
self.requestBodyParts.append(.body(part))
}

func finishRequestBodyStream(_: HTTPExecutingRequest) {
func finishRequestBodyStream(_: HTTPExecutableRequest) {
self.requestBodyParts.append(.endOfStream)
}

func demandResponseBodyStream(_: HTTPExecutingRequest) {
func demandResponseBodyStream(_: HTTPExecutableRequest) {
self.signalledDemandForResponseBody = true
}

func cancelRequest(_: HTTPExecutingRequest) {
func cancelRequest(_: HTTPExecutableRequest) {
self.isCancelled = true
}
}
@@ -490,7 +490,7 @@ class MockTaskQueuer: HTTPRequestScheduler {

init() {}

func cancelRequest(_: HTTPScheduledRequest) {
func cancelRequest(_: HTTPSchedulableRequest) {
self.hitCancelCount += 1
}
}