-
Notifications
You must be signed in to change notification settings - Fork 125
Make the ResponseAccumulator Sendable #838
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -538,8 +538,12 @@ public final class ResponseAccumulator: HTTPClientResponseDelegate { | |
} | ||
} | ||
|
||
var history = [HTTPClient.RequestResponse]() | ||
var state = State.idle | ||
private struct MutableState: Sendable { | ||
var history = [HTTPClient.RequestResponse]() | ||
var state = State.idle | ||
} | ||
|
||
private let state: NIOLockedValueBox<MutableState> | ||
let requestMethod: HTTPMethod | ||
let requestHost: String | ||
|
||
|
@@ -573,107 +577,118 @@ public final class ResponseAccumulator: HTTPClientResponseDelegate { | |
self.requestMethod = request.method | ||
self.requestHost = request.host | ||
self.maxBodySize = maxBodySize | ||
self.state = NIOLockedValueBox(MutableState()) | ||
} | ||
|
||
public func didVisitURL( | ||
task: HTTPClient.Task<HTTPClient.Response>, | ||
_ request: HTTPClient.Request, | ||
_ head: HTTPResponseHead | ||
) { | ||
self.history.append(.init(request: request, responseHead: head)) | ||
self.state.withLockedValue { | ||
$0.history.append(.init(request: request, responseHead: head)) | ||
} | ||
} | ||
|
||
public func didReceiveHead(task: HTTPClient.Task<Response>, _ head: HTTPResponseHead) -> EventLoopFuture<Void> { | ||
switch self.state { | ||
case .idle: | ||
if self.requestMethod != .HEAD, | ||
let contentLength = head.headers.first(name: "Content-Length"), | ||
let announcedBodySize = Int(contentLength), | ||
announcedBodySize > self.maxBodySize | ||
{ | ||
let error = ResponseTooBigError(maxBodySize: maxBodySize) | ||
self.state = .error(error) | ||
return task.eventLoop.makeFailedFuture(error) | ||
} | ||
self.state.withLockedValue { | ||
switch $0.state { | ||
case .idle: | ||
if self.requestMethod != .HEAD, | ||
let contentLength = head.headers.first(name: "Content-Length"), | ||
let announcedBodySize = Int(contentLength), | ||
announcedBodySize > self.maxBodySize | ||
{ | ||
let error = ResponseTooBigError(maxBodySize: maxBodySize) | ||
$0.state = .error(error) | ||
return task.eventLoop.makeFailedFuture(error) | ||
} | ||
|
||
self.state = .head(head) | ||
case .head: | ||
preconditionFailure("head already set") | ||
case .body: | ||
preconditionFailure("no head received before body") | ||
case .end: | ||
preconditionFailure("request already processed") | ||
case .error: | ||
break | ||
$0.state = .head(head) | ||
case .head: | ||
preconditionFailure("head already set") | ||
case .body: | ||
preconditionFailure("no head received before body") | ||
case .end: | ||
preconditionFailure("request already processed") | ||
case .error: | ||
break | ||
} | ||
return task.eventLoop.makeSucceededFuture(()) | ||
} | ||
return task.eventLoop.makeSucceededFuture(()) | ||
} | ||
|
||
public func didReceiveBodyPart(task: HTTPClient.Task<Response>, _ part: ByteBuffer) -> EventLoopFuture<Void> { | ||
switch self.state { | ||
case .idle: | ||
preconditionFailure("no head received before body") | ||
case .head(let head): | ||
guard part.readableBytes <= self.maxBodySize else { | ||
let error = ResponseTooBigError(maxBodySize: self.maxBodySize) | ||
self.state = .error(error) | ||
return task.eventLoop.makeFailedFuture(error) | ||
} | ||
self.state = .body(head, part) | ||
case .body(let head, var body): | ||
let newBufferSize = body.writerIndex + part.readableBytes | ||
guard newBufferSize <= self.maxBodySize else { | ||
let error = ResponseTooBigError(maxBodySize: self.maxBodySize) | ||
self.state = .error(error) | ||
return task.eventLoop.makeFailedFuture(error) | ||
} | ||
self.state.withLockedValue { | ||
switch $0.state { | ||
case .idle: | ||
preconditionFailure("no head received before body") | ||
case .head(let head): | ||
guard part.readableBytes <= self.maxBodySize else { | ||
let error = ResponseTooBigError(maxBodySize: self.maxBodySize) | ||
$0.state = .error(error) | ||
return task.eventLoop.makeFailedFuture(error) | ||
} | ||
$0.state = .body(head, part) | ||
case .body(let head, var body): | ||
let newBufferSize = body.writerIndex + part.readableBytes | ||
guard newBufferSize <= self.maxBodySize else { | ||
let error = ResponseTooBigError(maxBodySize: self.maxBodySize) | ||
$0.state = .error(error) | ||
return task.eventLoop.makeFailedFuture(error) | ||
} | ||
|
||
// The compiler can't prove that `self.state` is dead here (and it kinda isn't, there's | ||
// a cross-module call in the way) so we need to drop the original reference to `body` in | ||
// `self.state` or we'll get a CoW. To fix that we temporarily set the state to `.end` (which | ||
// has no associated data). We'll fix it at the bottom of this block. | ||
self.state = .end | ||
var part = part | ||
body.writeBuffer(&part) | ||
self.state = .body(head, body) | ||
case .end: | ||
preconditionFailure("request already processed") | ||
case .error: | ||
break | ||
// The compiler can't prove that `self.state` is dead here (and it kinda isn't, there's | ||
// a cross-module call in the way) so we need to drop the original reference to `body` in | ||
// `self.state` or we'll get a CoW. To fix that we temporarily set the state to `.end` (which | ||
// has no associated data). We'll fix it at the bottom of this block. | ||
$0.state = .end | ||
var part = part | ||
body.writeBuffer(&part) | ||
$0.state = .body(head, body) | ||
case .end: | ||
preconditionFailure("request already processed") | ||
case .error: | ||
break | ||
} | ||
return task.eventLoop.makeSucceededFuture(()) | ||
} | ||
return task.eventLoop.makeSucceededFuture(()) | ||
} | ||
|
||
public func didReceiveError(task: HTTPClient.Task<Response>, _ error: Error) { | ||
self.state = .error(error) | ||
self.state.withLockedValue { | ||
$0.state = .error(error) | ||
} | ||
} | ||
|
||
public func didFinishRequest(task: HTTPClient.Task<Response>) throws -> Response { | ||
switch self.state { | ||
case .idle: | ||
preconditionFailure("no head received before end") | ||
case .head(let head): | ||
return Response( | ||
host: self.requestHost, | ||
status: head.status, | ||
version: head.version, | ||
headers: head.headers, | ||
body: nil, | ||
history: self.history | ||
) | ||
case .body(let head, let body): | ||
return Response( | ||
host: self.requestHost, | ||
status: head.status, | ||
version: head.version, | ||
headers: head.headers, | ||
body: body, | ||
history: self.history | ||
) | ||
case .end: | ||
preconditionFailure("request already processed") | ||
case .error(let error): | ||
throw error | ||
try self.state.withLockedValue { | ||
switch $0.state { | ||
case .idle: | ||
preconditionFailure("no head received before end") | ||
case .head(let head): | ||
return Response( | ||
host: self.requestHost, | ||
status: head.status, | ||
version: head.version, | ||
headers: head.headers, | ||
body: nil, | ||
history: $0.history | ||
) | ||
case .body(let head, let body): | ||
return Response( | ||
host: self.requestHost, | ||
status: head.status, | ||
version: head.version, | ||
headers: head.headers, | ||
body: body, | ||
history: $0.history | ||
) | ||
case .end: | ||
preconditionFailure("request already processed") | ||
case .error(let error): | ||
throw error | ||
} | ||
} | ||
} | ||
} | ||
|
@@ -709,8 +724,9 @@ public final class ResponseAccumulator: HTTPClientResponseDelegate { | |
/// released together with the `HTTPTaskHandler` when channel is closed. | ||
/// Users of the library are not required to keep a reference to the | ||
/// object that implements this protocol, but may do so if needed. | ||
public protocol HTTPClientResponseDelegate: AnyObject { | ||
associatedtype Response | ||
@preconcurrency | ||
public protocol HTTPClientResponseDelegate: AnyObject, Sendable { | ||
associatedtype Response: Sendable | ||
|
||
/// Called when the request head is sent. Will be called once. | ||
/// | ||
|
@@ -885,7 +901,7 @@ extension URL { | |
} | ||
} | ||
|
||
protocol HTTPClientTaskDelegate { | ||
protocol HTTPClientTaskDelegate: Sendable { | ||
func fail(_ error: Error) | ||
} | ||
|
||
|
@@ -894,30 +910,35 @@ extension HTTPClient { | |
/// | ||
/// Will be created by the library and could be used for obtaining | ||
/// `EventLoopFuture<Response>` of the execution or cancellation of the execution. | ||
public final class Task<Response> { | ||
@preconcurrency | ||
public final class Task<Response: Sendable> { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is the unchecked Sendable still needed for this? Can it be checked? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also, how sure are we that this constraint is needed? |
||
/// The `EventLoop` the delegate will be executed on. | ||
public let eventLoop: EventLoop | ||
/// The `Logger` used by the `Task` for logging. | ||
public let logger: Logger // We are okay to store the logger here because a Task is for only one request. | ||
|
||
let promise: EventLoopPromise<Response> | ||
|
||
struct State: Sendable { | ||
var isCancelled: Bool | ||
var taskDelegate: HTTPClientTaskDelegate? | ||
} | ||
|
||
private let state: NIOLockedValueBox<State> | ||
|
||
var isCancelled: Bool { | ||
self.lock.withLock { self._isCancelled } | ||
self.state.withLockedValue { $0.isCancelled } | ||
} | ||
|
||
var taskDelegate: HTTPClientTaskDelegate? { | ||
get { | ||
self.lock.withLock { self._taskDelegate } | ||
self.state.withLockedValue { $0.taskDelegate } | ||
} | ||
set { | ||
self.lock.withLock { self._taskDelegate = newValue } | ||
self.state.withLockedValue { $0.taskDelegate = newValue } | ||
} | ||
} | ||
|
||
private var _isCancelled: Bool = false | ||
private var _taskDelegate: HTTPClientTaskDelegate? | ||
private let lock = NIOLock() | ||
private let makeOrGetFileIOThreadPool: () -> NIOThreadPool | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Looks like this needs an |
||
|
||
/// The shared thread pool of a ``HTTPClient`` used for file IO. It is lazily created on first access. | ||
|
@@ -930,6 +951,7 @@ extension HTTPClient { | |
self.promise = eventLoop.makePromise() | ||
self.logger = logger | ||
self.makeOrGetFileIOThreadPool = makeOrGetFileIOThreadPool | ||
self.state = NIOLockedValueBox(State(isCancelled: false, taskDelegate: nil)) | ||
} | ||
|
||
static func failedTask( | ||
|
@@ -957,7 +979,8 @@ extension HTTPClient { | |
/// - returns: The value of ``futureResult`` when it completes. | ||
/// - throws: The error value of ``futureResult`` if it errors. | ||
@available(*, noasync, message: "wait() can block indefinitely, prefer get()", renamed: "get()") | ||
public func wait() throws -> Response { | ||
@preconcurrency | ||
public func wait() throws -> Response where Response: Sendable { | ||
try self.promise.futureResult.wait() | ||
} | ||
|
||
|
@@ -968,7 +991,8 @@ extension HTTPClient { | |
/// - returns: The value of ``futureResult`` when it completes. | ||
/// - throws: The error value of ``futureResult`` if it errors. | ||
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) | ||
public func get() async throws -> Response { | ||
@preconcurrency | ||
public func get() async throws -> Response where Response: Sendable { | ||
Lukasa marked this conversation as resolved.
Show resolved
Hide resolved
|
||
try await self.promise.futureResult.get() | ||
} | ||
|
||
|
@@ -985,9 +1009,9 @@ extension HTTPClient { | |
/// | ||
/// - Parameter error: the error that is used to fail the promise | ||
public func fail(reason error: Error) { | ||
let taskDelegate = self.lock.withLock { () -> HTTPClientTaskDelegate? in | ||
self._isCancelled = true | ||
return self._taskDelegate | ||
let taskDelegate = self.state.withLockedValue { state in | ||
state.isCancelled = true | ||
return state.taskDelegate | ||
} | ||
|
||
taskDelegate?.fail(error) | ||
|
@@ -1017,7 +1041,7 @@ internal struct TaskCancelEvent {} | |
|
||
// MARK: - RedirectHandler | ||
|
||
internal struct RedirectHandler<ResponseType> { | ||
internal struct RedirectHandler<ResponseType: Sendable> { | ||
let request: HTTPClient.Request | ||
let redirectState: RedirectState | ||
let execute: (HTTPClient.Request, RedirectState) -> HTTPClient.Task<ResponseType> | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can I suggest that we pull this computation out of the lock?