Commit a9aee26

fabianfett, glbrntt, and Lukasa authored
Add HTTPScheduledRequest and HTTPExecutingRequest (#384)
- Adding four new protocols:
  - `HTTPExecutingRequest`: a protocol representing an HTTP task that is executed on a ChannelHandler.
  - `HTTPScheduledRequest`: a protocol representing an HTTP task that is scheduled for execution, but in a waiting state (example: waiting for an idle connection in the connection pool).
  - `HTTPRequestExecutor`: a protocol that can be used from the `HTTPExecutingRequest`, abstracting away functionality that will normally be implemented by a `ChannelHandler`.
  - `HTTPRequestScheduler`: a protocol that can be used from the `HTTPScheduledRequest`, abstracting away functionality that will normally be implemented by a `ConnectionPool`.
- An implementation of `HTTPExecutingRequest` and `HTTPScheduledRequest` called `RequestBag`. It implements our current API using the new protocols.

Co-authored-by: George Barnett <[email protected]>
Co-authored-by: Cory Benfield <[email protected]>
1 parent fcb0f21 commit a9aee26

9 files changed, +1720 -1 lines changed
@@ -0,0 +1,244 @@
+//===----------------------------------------------------------------------===//
+//
+// This source file is part of the AsyncHTTPClient open source project
+//
+// Copyright (c) 2021 Apple Inc. and the AsyncHTTPClient project authors
+// Licensed under Apache License v2.0
+//
+// See LICENSE.txt for license information
+// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+//===----------------------------------------------------------------------===//
+
+import Logging
+import NIO
+import NIOHTTP1
+
+/// # Protocol Overview
+///
+/// To support different public request APIs we abstract the actual request implementations behind
+/// protocols. During the lifetime of a request, a request must conform to different protocols
+/// depending on which state it is in.
+///
+/// Generally there are two main states in a request's lifetime:
+///
+///   1. **The request is scheduled to be run.**
+///      In this state the HTTP client tries to acquire a connection for the request, and the request
+///      may need to wait for a connection.
+///   2. **The request is executing.**
+///      In this state the request was written to a NIO channel. A NIO channel handler (abstracted
+///      by the `HTTPRequestExecutor` protocol) writes the request's bytes onto the wire and
+///      dispatches the HTTP response bytes back to the request.
+///
+///
+/// ## Request is scheduled
+///
+/// When the `HTTPClient` sends an HTTP request, it will use its `HTTPConnectionPool.Manager` to
+/// determine the `HTTPConnectionPool` to run the request on. After an `HTTPConnectionPool` has been
+/// found for the request, the request will be executed on this connection pool. Since the HTTP
+/// request implements the `HTTPScheduledRequest` protocol, the HTTP connection pool can communicate
+/// with the request. The `HTTPConnectionPool` implements the `HTTPRequestScheduler` protocol.
+///
+///   1. The `HTTPConnectionPool` tries to find an idle connection for the request based on its
+///      `eventLoopPreference`.
+///
+///   2. If an idle connection is available to the request, the request will be passed to the
+///      connection right away. In this case the `HTTPConnectionPool` will only use the
+///      `HTTPScheduledRequest`'s `eventLoopPreference` property. No other methods will be called.
+///
+///   3. If no idle connection is available to the request, the request will be queued for execution:
+///      - The `HTTPConnectionPool` will inform the request that it is queued for execution by
+///        calling `requestWasQueued(_: HTTPRequestScheduler)`. The request must store a reference
+///        to the `HTTPRequestScheduler`. The request must call `cancelRequest(self)` on the
+///        scheduler if the request is cancelled while waiting for execution.
+///      - The `HTTPConnectionPool` will create a connection deadline based on the
+///        `HTTPScheduledRequest`'s `connectionDeadline` property. If no connection to execute the
+///        request on is found before this deadline, the request will be failed.
+///      - The `HTTPConnectionPool` will call `fail(_: Error)` on the `HTTPScheduledRequest` to
+///        inform the request about having overrun the `connectionDeadline`.
+///
+///
+/// ## Request is executing
+///
+/// After the `HTTPConnectionPool` has identified a connection for the request to be run on, it will
+/// execute the request on this connection. (Implementation detail: This happens by writing the
+/// `HTTPExecutingRequest` to a `NIO.Channel`. We expect the last handler in the `ChannelPipeline`
+/// to have an `OutboundIn` type of `HTTPExecutingRequest`. Further we expect that the handler
+/// also conforms to the protocol `HTTPRequestExecutor` to allow communication of the request with
+/// the executor/`ChannelHandler`).
+///
+/// The request execution will work as follows:
+///
+///   1. The request executor will call `willExecuteRequest(_: HTTPRequestExecutor)` on the
+///      request. The request is expected to keep a reference to the `HTTPRequestExecutor` that was
+///      passed to the request for further communication.
+///   2. The request sending is started by the executor accessing the `HTTPExecutingRequest`'s
+///      property `requestHead: HTTPRequestHead`. Based on the `requestHead` the executor can
+///      determine if the request has a body (Is a "content-length" or "transfer-encoding"
+///      header present?).
+///   3. The executor will write the request's header into the Channel. If no body is present, the
+///      executor will also write a request end into the Channel. After this the executor will call
+///      `requestHeadSent()`.
+///   4. If the request has a body, the request executor will ask the request for body data by
+///      calling `resumeRequestBodyStream()`. The request is expected to call
+///      `writeRequestBodyPart(_: IOData, request: HTTPExecutingRequest)` on the executor with body
+///      data.
+///      - The executor can signal backpressure to the request by calling
+///        `pauseRequestBodyStream()`. In this case the request is expected to stop calling
+///        `writeRequestBodyPart(_: IOData, request: HTTPExecutingRequest)`. However, because of race
+///        conditions the executor is prepared to process more data, even though it asked the
+///        request to pause.
+///      - Once the executor is able to send more data, it will notify the request by calling
+///        `resumeRequestBodyStream()` on the request.
+///      - The request shall call `finishRequestBodyStream(_: HTTPExecutingRequest)` on the executor
+///        to signal that the request body was sent.
+///   5. Once the executor receives an HTTP response from the Channel, it will forward the HTTP
+///      response head to the `HTTPExecutingRequest` by calling `receiveResponseHead(_:)` on it.
+///      - The executor will forward all the response body parts it receives in a single read to
+///        the `HTTPExecutingRequest` without any buffering by calling
+///        `receiveResponseBodyParts(_: CircularBuffer<ByteBuffer>)` right away. It is the task's
+///        job to buffer the response parts for user consumption.
+///      - Once the executor has finished a read, it will not schedule another read until the
+///        request calls `demandResponseBodyStream(_: HTTPExecutingRequest)` on the executor.
+///      - Once the executor has received the response's end, it will forward this message by
+///        calling `succeedRequest(_: CircularBuffer<ByteBuffer>?)` on the `HTTPExecutingRequest`.
+///   6. If a channel error occurs during the execution of the request, or if the channel becomes
+///      inactive, the executor will notify the request by calling `fail(_ error: Error)` on it.
+///   7. If the request is cancelled while it is executing on the executor, it must call
+///      `cancelRequest(_: HTTPExecutingRequest)` on the executor.
+///
+///
+/// ## Further notes
+///
+/// - These protocols make no guarantees about thread safety at all. It is the implementation's job
+///   to ensure thread safety.
+/// - However, all calls to the `HTTPRequestScheduler` and `HTTPRequestExecutor` require that the
+///   invoking request is passed along. This helps the scheduler and executor in race conditions.
+///   Example:
+///   - The executor may have received an error on thread A that it passes along to the request.
+///     After having passed on the error, the executor considers the request done and releases
+///     the request's reference.
+///   - The request may issue a call to `writeRequestBodyPart(_: IOData, request: HTTPExecutingRequest)`
+///     on thread B at the same moment the request error above occurred. For this reason it may
+///     happen that the executor receives the invocation of `writeRequestBodyPart` after it has
+///     failed the request.
+///   Passing along the request's reference helps the executor and scheduler verify their internal
+///   state.
+
+/// A handle to the request scheduler.
+///
+/// Use this handle to cancel the request while it is waiting for a free connection to execute on.
+/// This protocol is only intended to be implemented by the `HTTPConnectionPool`.
+protocol HTTPRequestScheduler {
+    /// Informs the task queuer that a request has been cancelled.
+    func cancelRequest(_: HTTPScheduledRequest)
+}
+
+/// An abstraction over a request that we want to send. A request may need to communicate with its request
+/// queuer and executor. The client's methods will be called synchronously on an `EventLoop` by the
+/// executor. For this reason it is very important that the implementation of these functions never blocks.
+protocol HTTPScheduledRequest: AnyObject {
+    /// The task's logger
+    var logger: Logger { get }
+
+    /// A connection to run this task on needs to be found before this deadline!
+    var connectionDeadline: NIODeadline { get }
+
+    /// The task's `EventLoop` preference
+    var eventLoopPreference: HTTPClient.EventLoopPreference { get }
+
+    /// Informs the task that it was queued for execution.
+    ///
+    /// This happens if all available connections are currently in use.
+    func requestWasQueued(_: HTTPRequestScheduler)
+
+    /// Fails the queued request with an error.
+    func fail(_ error: Error)
+}
+
+/// A handle to the request executor.
+///
+/// This protocol is implemented by the `HTTP1ClientChannelHandler`.
+protocol HTTPRequestExecutor {
+    /// Writes a body part into the channel pipeline
+    ///
+    /// This method may be **called on any thread**. The executor needs to ensure thread safety.
+    func writeRequestBodyPart(_: IOData, request: HTTPExecutingRequest)
+
+    /// Signals that the request body stream has finished
+    ///
+    /// This method may be **called on any thread**. The executor needs to ensure thread safety.
+    func finishRequestBodyStream(_ task: HTTPExecutingRequest)
+
+    /// Signals that more bytes from the response body stream can be consumed.
+    ///
+    /// The request executor will call `receiveResponseBodyParts(_ buffer: CircularBuffer<ByteBuffer>)`
+    /// with more data after this call.
+    ///
+    /// This method may be **called on any thread**. The executor needs to ensure thread safety.
+    func demandResponseBodyStream(_ task: HTTPExecutingRequest)
+
+    /// Signals that the request has been cancelled.
+    ///
+    /// This method may be **called on any thread**. The executor needs to ensure thread safety.
+    func cancelRequest(_ task: HTTPExecutingRequest)
+}
+
+protocol HTTPExecutingRequest: AnyObject {
+    /// The request's head.
+    ///
+    /// Based on the content of the request head the task executor will call `resumeRequestBodyStream`
+    /// after `requestHeadSent` was called.
+    var requestHead: HTTPRequestHead { get }
+
+    /// The maximal `TimeAmount` that is allowed to pass between `channelRead`s from the Channel.
+    var idleReadTimeout: TimeAmount? { get }
+
+    /// Will be called by the ChannelHandler to indicate that the request is going to be sent.
+    ///
+    /// This will be called on the Channel's EventLoop. Do **not block** during your execution! If the
+    /// request is cancelled after the `willExecuteRequest` method was called, the executing
+    /// request must call `executor.cancelRequest(self)` to stop request execution.
+    func willExecuteRequest(_: HTTPRequestExecutor)
+
+    /// Will be called by the ChannelHandler to indicate that the request head has been sent.
+    ///
+    /// This will be called on the Channel's EventLoop. Do **not block** during your execution!
+    func requestHeadSent()
+
+    /// Start or resume request body streaming
+    ///
+    /// This will be called on the Channel's EventLoop. Do **not block** during your execution!
+    func resumeRequestBodyStream()
+
+    /// Pause request streaming
+    ///
+    /// This will be called on the Channel's EventLoop. Do **not block** during your execution!
+    func pauseRequestBodyStream()
+
+    /// Receive a response head.
+    ///
+    /// Please note that `receiveResponseHead` and `receiveResponseBodyParts` may
+    /// be called in quick succession. It is the task's job to buffer those events for the user. Once all
+    /// buffered data has been consumed the task must call `executor.demandResponseBodyStream`
+    /// to ask for more data.
+    func receiveResponseHead(_ head: HTTPResponseHead)
+
+    /// Receive response body stream parts.
+    ///
+    /// Please note that `receiveResponseHead` and `receiveResponseBodyParts` may
+    /// be called in quick succession. It is the task's job to buffer those events for the user. Once all
+    /// buffered data has been consumed the task must call `executor.demandResponseBodyStream`
+    /// to ask for more data.
+    func receiveResponseBodyParts(_ buffer: CircularBuffer<ByteBuffer>)
+
+    /// Succeeds the executing request. The executor will not call any further methods on the request after this method.
+    ///
+    /// - Parameter buffer: The remaining response body parts that were received before the request end
+    func succeedRequest(_ buffer: CircularBuffer<ByteBuffer>?)
+
+    /// Fails the executing request with an error.
+    func fail(_ error: Error)
+}
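The scheduled state described in the doc comment above can be illustrated with a small, dependency-free sketch. It is not the `HTTPConnectionPool` implementation: `SketchScheduler`, `SketchScheduledRequest`, `ToyPool`, `ToyRequest`, and both error types are hypothetical stand-ins, the deadline is triggered by hand instead of by a timer, and thread safety is ignored. It only shows the handshake: an idle connection is used without any callback, otherwise the request is queued, told about its scheduler, and can later cancel itself or be failed by the pool.

```swift
// Hypothetical stand-ins that mirror the shape of HTTPRequestScheduler and
// HTTPScheduledRequest, reduced to what this sketch needs.
protocol SketchScheduler: AnyObject {
    func cancelRequest(_ request: SketchScheduledRequest)
}

protocol SketchScheduledRequest: AnyObject {
    func requestWasQueued(_ scheduler: SketchScheduler)
    func fail(_ error: Error)
}

struct SketchCancelled: Error {}
struct SketchDeadlineExceeded: Error {}

// A toy pool: a counter of idle connections plus a wait queue.
final class ToyPool: SketchScheduler {
    private(set) var idleConnections: Int
    private(set) var queue: [SketchScheduledRequest] = []
    private(set) var started = 0

    init(idleConnections: Int) { self.idleConnections = idleConnections }

    func execute(_ request: SketchScheduledRequest) {
        if idleConnections > 0 {
            // Idle connection available: hand the request over right away,
            // without calling any method on the request.
            idleConnections -= 1
            started += 1
        } else {
            // No idle connection: queue the request and tell it who queued it.
            queue.append(request)
            request.requestWasQueued(self)
        }
    }

    // Called by a queued request that was cancelled by its user.
    func cancelRequest(_ request: SketchScheduledRequest) {
        // Identity check: a request that is no longer queued is ignored.
        guard let index = queue.firstIndex(where: { $0 === request }) else { return }
        queue.remove(at: index)
    }

    // Stand-in for the connection deadline firing (no timer in this sketch).
    func deadlineExceeded(for request: SketchScheduledRequest) {
        guard let index = queue.firstIndex(where: { $0 === request }) else { return }
        queue.remove(at: index)
        request.fail(SketchDeadlineExceeded())
    }
}

final class ToyRequest: SketchScheduledRequest {
    private var scheduler: SketchScheduler?
    private(set) var error: Error?

    func requestWasQueued(_ scheduler: SketchScheduler) {
        // Keep the scheduler so that cancel() can reach it later.
        self.scheduler = scheduler
    }

    func fail(_ error: Error) {
        self.error = error
        self.scheduler = nil
    }

    func cancel() {
        scheduler?.cancelRequest(self)
        fail(SketchCancelled())
    }
}
```

With one idle connection, the first request passed to `execute` starts immediately and the second one is queued; calling `cancel()` on the queued request removes it from the pool's queue again.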

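The request body backpressure and the "pass the invoking request along" rule from the doc comment can be sketched in the same spirit. Again, everything here is an assumption made for illustration: `SketchExecutor`, `SketchExecutingRequest`, `ToyExecutor`, and `ToyUpload` are hypothetical, body parts are plain byte arrays instead of `IOData`, the high watermark and `flush()` stand in for channel writability, and the code is single threaded.

```swift
// Hypothetical stand-ins for HTTPRequestExecutor and HTTPExecutingRequest.
protocol SketchExecutor: AnyObject {
    func writeRequestBodyPart(_ part: [UInt8], request: SketchExecutingRequest)
    func finishRequestBodyStream(_ request: SketchExecutingRequest)
}

protocol SketchExecutingRequest: AnyObject {
    func willExecuteRequest(_ executor: SketchExecutor)
    func resumeRequestBodyStream()
    func pauseRequestBodyStream()
}

final class ToyExecutor: SketchExecutor {
    private var current: SketchExecutingRequest?
    private var pending = 0 // bytes accepted since the last flush
    private let highWatermark: Int
    private(set) var written: [[UInt8]] = []
    private(set) var finished = false
    private(set) var ignoredWrites = 0

    init(highWatermark: Int) { self.highWatermark = highWatermark }

    func run(_ request: SketchExecutingRequest) {
        current = request
        request.willExecuteRequest(self)
        request.resumeRequestBodyStream()
    }

    func writeRequestBodyPart(_ part: [UInt8], request: SketchExecutingRequest) {
        // The passed-along request lets the executor detect stale calls that
        // arrive after it has already given up on that request.
        guard let current = current, current === request else {
            ignoredWrites += 1
            return
        }
        written.append(part)
        pending += part.count
        if pending >= highWatermark { request.pauseRequestBodyStream() }
    }

    func finishRequestBodyStream(_ request: SketchExecutingRequest) {
        guard let current = current, current === request else { return }
        finished = true
    }

    // Stand-in for the channel becoming writable again.
    func flush() {
        pending = 0
        current?.resumeRequestBodyStream()
    }

    // Stand-in for a channel error: the executor drops its request reference.
    func failCurrent() { current = nil }
}

final class ToyUpload: SketchExecutingRequest {
    private var executor: SketchExecutor?
    private var parts: [[UInt8]]
    private var paused = true
    private var didFinish = false

    init(parts: [[UInt8]]) { self.parts = parts }

    func willExecuteRequest(_ executor: SketchExecutor) { self.executor = executor }

    func pauseRequestBodyStream() { paused = true }

    func resumeRequestBodyStream() {
        guard let executor = executor else { return }
        paused = false
        // Write until the executor asks for a pause or the body is exhausted.
        while !paused, !parts.isEmpty {
            executor.writeRequestBodyPart(parts.removeFirst(), request: self)
        }
        if parts.isEmpty, !didFinish {
            didFinish = true
            executor.finishRequestBodyStream(self)
        }
    }
}
```

The `===` guard in `writeRequestBodyPart` is the point of the "Further notes" section: once the executor has released its reference, a late write from the request is recognised as stale and dropped instead of corrupting the executor's state.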
Diff for: Sources/AsyncHTTPClient/HTTPClient.swift

+5
@@ -923,6 +923,7 @@ public struct HTTPClientError: Error, Equatable, CustomStringConvertible {
         case httpProxyHandshakeTimeout
         case tlsHandshakeTimeout
         case serverOfferedUnsupportedApplicationProtocol(String)
+        case requestStreamCancelled
     }
 
     private var code: Code
@@ -991,4 +992,8 @@ public struct HTTPClientError: Error, Equatable, CustomStringConvertible {
     public static func serverOfferedUnsupportedApplicationProtocol(_ proto: String) -> HTTPClientError {
         return HTTPClientError(code: .serverOfferedUnsupportedApplicationProtocol(proto))
     }
+
+    /// The remote server responded with a status code >= 300 before the full request was sent. The request stream
+    /// was therefore cancelled.
+    public static let requestStreamCancelled = HTTPClientError(code: .requestStreamCancelled)
 }
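The hunks above extend a pattern in which a public error struct wraps a private `Code` enum and exposes each case as a static member. A minimal sketch of that pattern, assuming a stand-in type `SketchClientError` with only three of the cases and a made-up `description`, shows how a caller could test for the new `requestStreamCancelled` value through `Equatable`. The helper `isRequestStreamCancelled` is hypothetical and not part of the library.

```swift
// Stand-in for the pattern used by HTTPClientError: the enum stays private,
// so new cases can be added without breaking callers that switch over errors.
struct SketchClientError: Error, Equatable, CustomStringConvertible {
    private enum Code: Equatable {
        case tlsHandshakeTimeout
        case serverOfferedUnsupportedApplicationProtocol(String)
        case requestStreamCancelled
    }

    private var code: Code

    private init(code: Code) { self.code = code }

    // Assumed formatting, chosen for this sketch only.
    var description: String { "SketchClientError.\(String(describing: code))" }

    static let tlsHandshakeTimeout = SketchClientError(code: .tlsHandshakeTimeout)
    static let requestStreamCancelled = SketchClientError(code: .requestStreamCancelled)

    static func serverOfferedUnsupportedApplicationProtocol(_ proto: String) -> SketchClientError {
        SketchClientError(code: .serverOfferedUnsupportedApplicationProtocol(proto))
    }
}

// Hypothetical helper: callers compare against the static member instead of
// pattern matching on the hidden enum.
func isRequestStreamCancelled(_ error: Error) -> Bool {
    guard let clientError = error as? SketchClientError else { return false }
    return clientError == SketchClientError.requestStreamCancelled
}
```

Because equality is synthesized from the wrapped code, two values built with the same associated string compare equal, while values with different strings do not.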

Diff for: Sources/AsyncHTTPClient/HTTPHandler.swift

+4
@@ -624,6 +624,10 @@ extension URL {
     }
 }
 
+protocol HTTPClientTaskDelegate {
+    func cancel()
+}
+
 extension HTTPClient {
     /// Response execution context. Will be created by the library and could be used for obtaining
     /// `EventLoopFuture<Response>` of the execution or cancellation of the execution.
