Skip to content

Avoid copies of large payloads #1529

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Dec 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ jobs:
include:
- image: swiftlang/swift:nightly-focal
env:
MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_10_requests: 428000
MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_10_requests: 392000
MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_1_request: 176000
MAX_ALLOCS_ALLOWED_embedded_server_bidi_1k_rpcs_10_small_requests: 112000
MAX_ALLOCS_ALLOWED_embedded_server_bidi_1k_rpcs_1_small_request: 67000
Expand All @@ -66,7 +66,7 @@ jobs:
MAX_ALLOCS_ALLOWED_unary_1k_ping_pong_interceptors_server: 181000
- image: swift:5.7-jammy
env:
MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_10_requests: 428000
MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_10_requests: 392000
MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_1_request: 176000
MAX_ALLOCS_ALLOWED_embedded_server_bidi_1k_rpcs_10_small_requests: 112000
MAX_ALLOCS_ALLOWED_embedded_server_bidi_1k_rpcs_1_small_request: 67000
Expand All @@ -76,7 +76,7 @@ jobs:
MAX_ALLOCS_ALLOWED_unary_1k_ping_pong_interceptors_server: 181000
- image: swift:5.6-focal
env:
MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_10_requests: 429000
MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_10_requests: 393000
MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_1_request: 177000
MAX_ALLOCS_ALLOWED_embedded_server_bidi_1k_rpcs_10_small_requests: 112000
MAX_ALLOCS_ALLOWED_embedded_server_bidi_1k_rpcs_1_small_request: 67000
Expand All @@ -86,7 +86,7 @@ jobs:
MAX_ALLOCS_ALLOWED_unary_1k_ping_pong_interceptors_server: 182000
- image: swift:5.5-focal
env:
MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_10_requests: 459000
MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_10_requests: 423000
MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_1_request: 189000
MAX_ALLOCS_ALLOWED_embedded_server_bidi_1k_rpcs_10_small_requests: 112000
MAX_ALLOCS_ALLOWED_embedded_server_bidi_1k_rpcs_1_small_request: 67000
Expand Down
27 changes: 20 additions & 7 deletions Sources/GRPC/GRPCClientChannelHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,10 @@ extension GRPCClientChannelHandler: ChannelOutboundHandler {
switch self.unwrapOutboundIn(data) {
case let .head(requestHead):
// Feed the request into the state machine:
switch self.stateMachine.sendRequestHeaders(requestHead: requestHead) {
switch self.stateMachine.sendRequestHeaders(
requestHead: requestHead,
allocator: context.channel.allocator
) {
case let .success(headers):
// We're clear to write some headers. Create an appropriate frame and write it.
let framePayload = HTTP2Frame.FramePayload.headers(.init(headers: headers))
Expand All @@ -526,19 +529,29 @@ extension GRPCClientChannelHandler: ChannelOutboundHandler {
// Feed the request message into the state machine:
let result = self.stateMachine.sendRequest(
request.message,
compressed: request.compressed,
allocator: context.channel.allocator
compressed: request.compressed
)
switch result {
case let .success(buffer):
// We're clear to send a message; wrap it up in an HTTP/2 frame.
let framePayload = HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer)))
case let .success((buffer, maybeBuffer)):
let frame1 = HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer)))
self.logger.trace("writing HTTP2 frame", metadata: [
MetadataKey.h2Payload: "DATA",
MetadataKey.h2DataBytes: "\(buffer.readableBytes)",
MetadataKey.h2EndStream: "false",
])
context.write(self.wrapOutboundOut(framePayload), promise: promise)
// If there's a second buffer, attach the promise to the second write.
let promise1 = maybeBuffer == nil ? promise : nil
context.write(self.wrapOutboundOut(frame1), promise: promise1)

if let actuallyBuffer = maybeBuffer {
let frame2 = HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(actuallyBuffer)))
self.logger.trace("writing HTTP2 frame", metadata: [
MetadataKey.h2Payload: "DATA",
MetadataKey.h2DataBytes: "\(actuallyBuffer.readableBytes)",
MetadataKey.h2EndStream: "false",
])
context.write(self.wrapOutboundOut(frame2), promise: promise)
}

case let .failure(writeError):
switch writeError {
Expand Down
31 changes: 17 additions & 14 deletions Sources/GRPC/GRPCClientStateMachine.swift
Original file line number Diff line number Diff line change
Expand Up @@ -165,10 +165,11 @@ struct GRPCClientStateMachine {
///
/// - Parameter requestHead: The client request head for the RPC.
mutating func sendRequestHeaders(
requestHead: _GRPCRequestHead
requestHead: _GRPCRequestHead,
allocator: ByteBufferAllocator
) -> Result<HPACKHeaders, SendRequestHeadersError> {
return self.withStateAvoidingCoWs { state in
state.sendRequestHeaders(requestHead: requestHead)
state.sendRequestHeaders(requestHead: requestHead, allocator: allocator)
}
}

Expand All @@ -195,11 +196,10 @@ struct GRPCClientStateMachine {
/// request will be written.
mutating func sendRequest(
_ message: ByteBuffer,
compressed: Bool,
allocator: ByteBufferAllocator
) -> Result<ByteBuffer, MessageWriteError> {
compressed: Bool
) -> Result<(ByteBuffer, ByteBuffer?), MessageWriteError> {
return self.withStateAvoidingCoWs { state in
state.sendRequest(message, compressed: compressed, allocator: allocator)
state.sendRequest(message, compressed: compressed)
}
}

Expand Down Expand Up @@ -351,7 +351,8 @@ struct GRPCClientStateMachine {
extension GRPCClientStateMachine.State {
/// See `GRPCClientStateMachine.sendRequestHeaders(requestHead:)`.
mutating func sendRequestHeaders(
requestHead: _GRPCRequestHead
requestHead: _GRPCRequestHead,
allocator: ByteBufferAllocator
) -> Result<HPACKHeaders, SendRequestHeadersError> {
let result: Result<HPACKHeaders, SendRequestHeadersError>

Expand All @@ -369,7 +370,10 @@ extension GRPCClientStateMachine.State {
result = .success(headers)

self = .clientActiveServerIdle(
writeState: pendingWriteState.makeWriteState(messageEncoding: requestHead.encoding),
writeState: pendingWriteState.makeWriteState(
messageEncoding: requestHead.encoding,
allocator: allocator
),
pendingReadState: .init(arity: responseArity, messageEncoding: requestHead.encoding)
)

Expand All @@ -390,18 +394,17 @@ extension GRPCClientStateMachine.State {
/// See `GRPCClientStateMachine.sendRequest(_:allocator:)`.
mutating func sendRequest(
_ message: ByteBuffer,
compressed: Bool,
allocator: ByteBufferAllocator
) -> Result<ByteBuffer, MessageWriteError> {
let result: Result<ByteBuffer, MessageWriteError>
compressed: Bool
) -> Result<(ByteBuffer, ByteBuffer?), MessageWriteError> {
let result: Result<(ByteBuffer, ByteBuffer?), MessageWriteError>

switch self {
case .clientActiveServerIdle(var writeState, let pendingReadState):
result = writeState.write(message, compressed: compressed, allocator: allocator)
result = writeState.write(message, compressed: compressed)
self = .clientActiveServerIdle(writeState: writeState, pendingReadState: pendingReadState)

case .clientActiveServerActive(var writeState, let readState):
result = writeState.write(message, compressed: compressed, allocator: allocator)
result = writeState.write(message, compressed: compressed)
self = .clientActiveServerActive(writeState: writeState, readState: readState)

case .clientClosedServerIdle,
Expand Down
13 changes: 10 additions & 3 deletions Sources/GRPC/HTTP2ToRawGRPCServerCodec.swift
Original file line number Diff line number Diff line change
Expand Up @@ -295,9 +295,16 @@ internal final class HTTP2ToRawGRPCServerCodec: ChannelInboundHandler, GRPCServe
)

switch writeBuffer {
case let .success(buffer):
let payload = HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer)))
self.context.write(self.wrapOutboundOut(payload), promise: promise)
case let .success((buffer, maybeBuffer)):
if let actuallyBuffer = maybeBuffer {
let payload1 = HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer)))
self.context.write(self.wrapOutboundOut(payload1), promise: nil)
let payload2 = HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(actuallyBuffer)))
self.context.write(self.wrapOutboundOut(payload2), promise: promise)
} else {
let payload = HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer)))
self.context.write(self.wrapOutboundOut(payload), promise: promise)
}
if metadata.flush {
self.markFlushPoint()
}
Expand Down
64 changes: 34 additions & 30 deletions Sources/GRPC/HTTP2ToRawGRPCStateMachine.swift
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,11 @@ extension HTTP2ToRawGRPCStateMachine.State {
}

// Figure out which encoding we should use for responses.
let (writer, responseEncoding) = self.extractResponseEncoding(from: headers, encoding: encoding)
let (writer, responseEncoding) = self.extractResponseEncoding(
from: headers,
encoding: encoding,
allocator: allocator
)

// Parse the path, and create a call handler.
guard let path = headers.first(name: ":path") else {
Expand Down Expand Up @@ -516,7 +520,8 @@ extension HTTP2ToRawGRPCStateMachine.State {
/// - Returns: A message writer and the response encoding header to send back to the client.
private func extractResponseEncoding(
from headers: HPACKHeaders,
encoding: ServerMessageEncoding
encoding: ServerMessageEncoding,
allocator: ByteBufferAllocator
) -> (LengthPrefixedMessageWriter, String?) {
let writer: LengthPrefixedMessageWriter
let responseEncoding: String?
Expand All @@ -534,12 +539,12 @@ extension HTTP2ToRawGRPCStateMachine.State {
configuration.enabledAlgorithms.contains($0)
}

writer = LengthPrefixedMessageWriter(compression: algorithm)
writer = LengthPrefixedMessageWriter(compression: algorithm, allocator: allocator)
responseEncoding = algorithm?.name

case .disabled:
// The server doesn't have compression enabled.
writer = LengthPrefixedMessageWriter(compression: .none)
writer = LengthPrefixedMessageWriter(compression: .none, allocator: allocator)
responseEncoding = nil
}

Expand Down Expand Up @@ -642,44 +647,39 @@ extension HTTP2ToRawGRPCStateMachine {
static func writeGRPCFramedMessage(
_ buffer: ByteBuffer,
compress: Bool,
allocator: ByteBufferAllocator,
writer: LengthPrefixedMessageWriter
) -> Result<ByteBuffer, Error> {
writer: inout LengthPrefixedMessageWriter
) -> Result<(ByteBuffer, ByteBuffer?), Error> {
do {
let prefixed = try writer.write(buffer: buffer, allocator: allocator, compressed: compress)
return .success(prefixed)
let buffers = try writer.write(buffer: buffer, compressed: compress)
return .success(buffers)
} catch {
return .failure(error)
}
}
}

extension HTTP2ToRawGRPCStateMachine.RequestOpenResponseOpenState {
func send(
mutating func send(
buffer: ByteBuffer,
allocator: ByteBufferAllocator,
compress: Bool
) -> Result<ByteBuffer, Error> {
) -> Result<(ByteBuffer, ByteBuffer?), Error> {
return HTTP2ToRawGRPCStateMachine.writeGRPCFramedMessage(
buffer,
compress: compress,
allocator: allocator,
writer: self.writer
writer: &self.writer
)
}
}

extension HTTP2ToRawGRPCStateMachine.RequestClosedResponseOpenState {
func send(
mutating func send(
buffer: ByteBuffer,
allocator: ByteBufferAllocator,
compress: Bool
) -> Result<ByteBuffer, Error> {
) -> Result<(ByteBuffer, ByteBuffer?), Error> {
return HTTP2ToRawGRPCStateMachine.writeGRPCFramedMessage(
buffer,
compress: compress,
allocator: allocator,
writer: self.writer
writer: &self.writer
)
}
}
Expand Down Expand Up @@ -903,12 +903,14 @@ extension HTTP2ToRawGRPCStateMachine {
}

/// Send a response buffer.
func send(
mutating func send(
buffer: ByteBuffer,
allocator: ByteBufferAllocator,
compress: Bool
) -> Result<ByteBuffer, Error> {
return self.state.send(buffer: buffer, allocator: allocator, compress: compress)
) -> Result<(ByteBuffer, ByteBuffer?), Error> {
return self.withStateAvoidingCoWs { state in
state.send(buffer: buffer, allocator: allocator, compress: compress)
}
}

/// Send status and trailers.
Expand Down Expand Up @@ -1115,11 +1117,11 @@ extension HTTP2ToRawGRPCStateMachine.State {
}
}

func send(
mutating func send(
buffer: ByteBuffer,
allocator: ByteBufferAllocator,
compress: Bool
) -> Result<ByteBuffer, Error> {
) -> Result<(ByteBuffer, ByteBuffer?), Error> {
switch self {
case .requestIdleResponseIdle:
preconditionFailure("Invalid state: the request stream is still closed")
Expand All @@ -1129,19 +1131,21 @@ extension HTTP2ToRawGRPCStateMachine.State {
let error = GRPCError.InvalidState("Response headers must be sent before response message")
return .failure(error)

case let .requestOpenResponseOpen(state):
return state.send(
case var .requestOpenResponseOpen(state):
let result = state.send(
buffer: buffer,
allocator: allocator,
compress: compress
)
self = .requestOpenResponseOpen(state)
return result

case let .requestClosedResponseOpen(state):
return state.send(
case var .requestClosedResponseOpen(state):
let result = state.send(
buffer: buffer,
allocator: allocator,
compress: compress
)
self = .requestClosedResponseOpen(state)
return result

case .requestOpenResponseClosed,
.requestClosedResponseClosed:
Expand Down
Loading