Skip to content

Commit 50b2980

Browse files
glbrnttpinlin168
authored andcommitted
Avoid copies of large payloads (grpc#1529)
Motivation: Messages are prefixed with a 5-byte header. Currently messages of all sizes are written into a new buffer with their header. For smaller payloads this is good: we avoid the extra allocations associated with creating HTTP/2 frames. For large payloads the cost of the copy outweighs the cost of extra allocations. Modifications: - For messages larger than 8KB emit an extra HTTP/2 DATA frame containing just the message header. Result: Better performance for large payloads.
1 parent dc78cdf commit 50b2980

10 files changed

+250
-256
lines changed

.github/workflows/ci.yaml

+4-4
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ jobs:
5656
include:
5757
- image: swiftlang/swift:nightly-focal
5858
env:
59-
MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_10_requests: 428000
59+
MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_10_requests: 392000
6060
MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_1_request: 176000
6161
MAX_ALLOCS_ALLOWED_embedded_server_bidi_1k_rpcs_10_small_requests: 112000
6262
MAX_ALLOCS_ALLOWED_embedded_server_bidi_1k_rpcs_1_small_request: 67000
@@ -66,7 +66,7 @@ jobs:
6666
MAX_ALLOCS_ALLOWED_unary_1k_ping_pong_interceptors_server: 181000
6767
- image: swift:5.7-jammy
6868
env:
69-
MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_10_requests: 428000
69+
MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_10_requests: 392000
7070
MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_1_request: 176000
7171
MAX_ALLOCS_ALLOWED_embedded_server_bidi_1k_rpcs_10_small_requests: 112000
7272
MAX_ALLOCS_ALLOWED_embedded_server_bidi_1k_rpcs_1_small_request: 67000
@@ -76,7 +76,7 @@ jobs:
7676
MAX_ALLOCS_ALLOWED_unary_1k_ping_pong_interceptors_server: 181000
7777
- image: swift:5.6-focal
7878
env:
79-
MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_10_requests: 429000
79+
MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_10_requests: 393000
8080
MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_1_request: 177000
8181
MAX_ALLOCS_ALLOWED_embedded_server_bidi_1k_rpcs_10_small_requests: 112000
8282
MAX_ALLOCS_ALLOWED_embedded_server_bidi_1k_rpcs_1_small_request: 67000
@@ -86,7 +86,7 @@ jobs:
8686
MAX_ALLOCS_ALLOWED_unary_1k_ping_pong_interceptors_server: 182000
8787
- image: swift:5.5-focal
8888
env:
89-
MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_10_requests: 459000
89+
MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_10_requests: 423000
9090
MAX_ALLOCS_ALLOWED_bidi_1k_rpcs_1_request: 189000
9191
MAX_ALLOCS_ALLOWED_embedded_server_bidi_1k_rpcs_10_small_requests: 112000
9292
MAX_ALLOCS_ALLOWED_embedded_server_bidi_1k_rpcs_1_small_request: 67000

Sources/GRPC/GRPCClientChannelHandler.swift

+20-7
Original file line numberDiff line numberDiff line change
@@ -502,7 +502,10 @@ extension GRPCClientChannelHandler: ChannelOutboundHandler {
502502
switch self.unwrapOutboundIn(data) {
503503
case let .head(requestHead):
504504
// Feed the request into the state machine:
505-
switch self.stateMachine.sendRequestHeaders(requestHead: requestHead) {
505+
switch self.stateMachine.sendRequestHeaders(
506+
requestHead: requestHead,
507+
allocator: context.channel.allocator
508+
) {
506509
case let .success(headers):
507510
// We're clear to write some headers. Create an appropriate frame and write it.
508511
let framePayload = HTTP2Frame.FramePayload.headers(.init(headers: headers))
@@ -526,19 +529,29 @@ extension GRPCClientChannelHandler: ChannelOutboundHandler {
526529
// Feed the request message into the state machine:
527530
let result = self.stateMachine.sendRequest(
528531
request.message,
529-
compressed: request.compressed,
530-
allocator: context.channel.allocator
532+
compressed: request.compressed
531533
)
532534
switch result {
533-
case let .success(buffer):
534-
// We're clear to send a message; wrap it up in an HTTP/2 frame.
535-
let framePayload = HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer)))
535+
case let .success((buffer, maybeBuffer)):
536+
let frame1 = HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer)))
536537
self.logger.trace("writing HTTP2 frame", metadata: [
537538
MetadataKey.h2Payload: "DATA",
538539
MetadataKey.h2DataBytes: "\(buffer.readableBytes)",
539540
MetadataKey.h2EndStream: "false",
540541
])
541-
context.write(self.wrapOutboundOut(framePayload), promise: promise)
542+
// If there's a second buffer, attach the promise to the second write.
543+
let promise1 = maybeBuffer == nil ? promise : nil
544+
context.write(self.wrapOutboundOut(frame1), promise: promise1)
545+
546+
if let actuallyBuffer = maybeBuffer {
547+
let frame2 = HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(actuallyBuffer)))
548+
self.logger.trace("writing HTTP2 frame", metadata: [
549+
MetadataKey.h2Payload: "DATA",
550+
MetadataKey.h2DataBytes: "\(actuallyBuffer.readableBytes)",
551+
MetadataKey.h2EndStream: "false",
552+
])
553+
context.write(self.wrapOutboundOut(frame2), promise: promise)
554+
}
542555

543556
case let .failure(writeError):
544557
switch writeError {

Sources/GRPC/GRPCClientStateMachine.swift

+17-14
Original file line numberDiff line numberDiff line change
@@ -165,10 +165,11 @@ struct GRPCClientStateMachine {
165165
///
166166
/// - Parameter requestHead: The client request head for the RPC.
167167
mutating func sendRequestHeaders(
168-
requestHead: _GRPCRequestHead
168+
requestHead: _GRPCRequestHead,
169+
allocator: ByteBufferAllocator
169170
) -> Result<HPACKHeaders, SendRequestHeadersError> {
170171
return self.withStateAvoidingCoWs { state in
171-
state.sendRequestHeaders(requestHead: requestHead)
172+
state.sendRequestHeaders(requestHead: requestHead, allocator: allocator)
172173
}
173174
}
174175

@@ -195,11 +196,10 @@ struct GRPCClientStateMachine {
195196
/// request will be written.
196197
mutating func sendRequest(
197198
_ message: ByteBuffer,
198-
compressed: Bool,
199-
allocator: ByteBufferAllocator
200-
) -> Result<ByteBuffer, MessageWriteError> {
199+
compressed: Bool
200+
) -> Result<(ByteBuffer, ByteBuffer?), MessageWriteError> {
201201
return self.withStateAvoidingCoWs { state in
202-
state.sendRequest(message, compressed: compressed, allocator: allocator)
202+
state.sendRequest(message, compressed: compressed)
203203
}
204204
}
205205

@@ -351,7 +351,8 @@ struct GRPCClientStateMachine {
351351
extension GRPCClientStateMachine.State {
352352
/// See `GRPCClientStateMachine.sendRequestHeaders(requestHead:)`.
353353
mutating func sendRequestHeaders(
354-
requestHead: _GRPCRequestHead
354+
requestHead: _GRPCRequestHead,
355+
allocator: ByteBufferAllocator
355356
) -> Result<HPACKHeaders, SendRequestHeadersError> {
356357
let result: Result<HPACKHeaders, SendRequestHeadersError>
357358

@@ -369,7 +370,10 @@ extension GRPCClientStateMachine.State {
369370
result = .success(headers)
370371

371372
self = .clientActiveServerIdle(
372-
writeState: pendingWriteState.makeWriteState(messageEncoding: requestHead.encoding),
373+
writeState: pendingWriteState.makeWriteState(
374+
messageEncoding: requestHead.encoding,
375+
allocator: allocator
376+
),
373377
pendingReadState: .init(arity: responseArity, messageEncoding: requestHead.encoding)
374378
)
375379

@@ -390,18 +394,17 @@ extension GRPCClientStateMachine.State {
390394
/// See `GRPCClientStateMachine.sendRequest(_:allocator:)`.
391395
mutating func sendRequest(
392396
_ message: ByteBuffer,
393-
compressed: Bool,
394-
allocator: ByteBufferAllocator
395-
) -> Result<ByteBuffer, MessageWriteError> {
396-
let result: Result<ByteBuffer, MessageWriteError>
397+
compressed: Bool
398+
) -> Result<(ByteBuffer, ByteBuffer?), MessageWriteError> {
399+
let result: Result<(ByteBuffer, ByteBuffer?), MessageWriteError>
397400

398401
switch self {
399402
case .clientActiveServerIdle(var writeState, let pendingReadState):
400-
result = writeState.write(message, compressed: compressed, allocator: allocator)
403+
result = writeState.write(message, compressed: compressed)
401404
self = .clientActiveServerIdle(writeState: writeState, pendingReadState: pendingReadState)
402405

403406
case .clientActiveServerActive(var writeState, let readState):
404-
result = writeState.write(message, compressed: compressed, allocator: allocator)
407+
result = writeState.write(message, compressed: compressed)
405408
self = .clientActiveServerActive(writeState: writeState, readState: readState)
406409

407410
case .clientClosedServerIdle,

Sources/GRPC/HTTP2ToRawGRPCServerCodec.swift

+10-3
Original file line numberDiff line numberDiff line change
@@ -295,9 +295,16 @@ internal final class HTTP2ToRawGRPCServerCodec: ChannelInboundHandler, GRPCServe
295295
)
296296

297297
switch writeBuffer {
298-
case let .success(buffer):
299-
let payload = HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer)))
300-
self.context.write(self.wrapOutboundOut(payload), promise: promise)
298+
case let .success((buffer, maybeBuffer)):
299+
if let actuallyBuffer = maybeBuffer {
300+
let payload1 = HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer)))
301+
self.context.write(self.wrapOutboundOut(payload1), promise: nil)
302+
let payload2 = HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(actuallyBuffer)))
303+
self.context.write(self.wrapOutboundOut(payload2), promise: promise)
304+
} else {
305+
let payload = HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer)))
306+
self.context.write(self.wrapOutboundOut(payload), promise: promise)
307+
}
301308
if metadata.flush {
302309
self.markFlushPoint()
303310
}

Sources/GRPC/HTTP2ToRawGRPCStateMachine.swift

+34-30
Original file line numberDiff line numberDiff line change
@@ -303,7 +303,11 @@ extension HTTP2ToRawGRPCStateMachine.State {
303303
}
304304

305305
// Figure out which encoding we should use for responses.
306-
let (writer, responseEncoding) = self.extractResponseEncoding(from: headers, encoding: encoding)
306+
let (writer, responseEncoding) = self.extractResponseEncoding(
307+
from: headers,
308+
encoding: encoding,
309+
allocator: allocator
310+
)
307311

308312
// Parse the path, and create a call handler.
309313
guard let path = headers.first(name: ":path") else {
@@ -516,7 +520,8 @@ extension HTTP2ToRawGRPCStateMachine.State {
516520
/// - Returns: A message writer and the response encoding header to send back to the client.
517521
private func extractResponseEncoding(
518522
from headers: HPACKHeaders,
519-
encoding: ServerMessageEncoding
523+
encoding: ServerMessageEncoding,
524+
allocator: ByteBufferAllocator
520525
) -> (LengthPrefixedMessageWriter, String?) {
521526
let writer: LengthPrefixedMessageWriter
522527
let responseEncoding: String?
@@ -534,12 +539,12 @@ extension HTTP2ToRawGRPCStateMachine.State {
534539
configuration.enabledAlgorithms.contains($0)
535540
}
536541

537-
writer = LengthPrefixedMessageWriter(compression: algorithm)
542+
writer = LengthPrefixedMessageWriter(compression: algorithm, allocator: allocator)
538543
responseEncoding = algorithm?.name
539544

540545
case .disabled:
541546
// The server doesn't have compression enabled.
542-
writer = LengthPrefixedMessageWriter(compression: .none)
547+
writer = LengthPrefixedMessageWriter(compression: .none, allocator: allocator)
543548
responseEncoding = nil
544549
}
545550

@@ -642,44 +647,39 @@ extension HTTP2ToRawGRPCStateMachine {
642647
static func writeGRPCFramedMessage(
643648
_ buffer: ByteBuffer,
644649
compress: Bool,
645-
allocator: ByteBufferAllocator,
646-
writer: LengthPrefixedMessageWriter
647-
) -> Result<ByteBuffer, Error> {
650+
writer: inout LengthPrefixedMessageWriter
651+
) -> Result<(ByteBuffer, ByteBuffer?), Error> {
648652
do {
649-
let prefixed = try writer.write(buffer: buffer, allocator: allocator, compressed: compress)
650-
return .success(prefixed)
653+
let buffers = try writer.write(buffer: buffer, compressed: compress)
654+
return .success(buffers)
651655
} catch {
652656
return .failure(error)
653657
}
654658
}
655659
}
656660

657661
extension HTTP2ToRawGRPCStateMachine.RequestOpenResponseOpenState {
658-
func send(
662+
mutating func send(
659663
buffer: ByteBuffer,
660-
allocator: ByteBufferAllocator,
661664
compress: Bool
662-
) -> Result<ByteBuffer, Error> {
665+
) -> Result<(ByteBuffer, ByteBuffer?), Error> {
663666
return HTTP2ToRawGRPCStateMachine.writeGRPCFramedMessage(
664667
buffer,
665668
compress: compress,
666-
allocator: allocator,
667-
writer: self.writer
669+
writer: &self.writer
668670
)
669671
}
670672
}
671673

672674
extension HTTP2ToRawGRPCStateMachine.RequestClosedResponseOpenState {
673-
func send(
675+
mutating func send(
674676
buffer: ByteBuffer,
675-
allocator: ByteBufferAllocator,
676677
compress: Bool
677-
) -> Result<ByteBuffer, Error> {
678+
) -> Result<(ByteBuffer, ByteBuffer?), Error> {
678679
return HTTP2ToRawGRPCStateMachine.writeGRPCFramedMessage(
679680
buffer,
680681
compress: compress,
681-
allocator: allocator,
682-
writer: self.writer
682+
writer: &self.writer
683683
)
684684
}
685685
}
@@ -903,12 +903,14 @@ extension HTTP2ToRawGRPCStateMachine {
903903
}
904904

905905
/// Send a response buffer.
906-
func send(
906+
mutating func send(
907907
buffer: ByteBuffer,
908908
allocator: ByteBufferAllocator,
909909
compress: Bool
910-
) -> Result<ByteBuffer, Error> {
911-
return self.state.send(buffer: buffer, allocator: allocator, compress: compress)
910+
) -> Result<(ByteBuffer, ByteBuffer?), Error> {
911+
return self.withStateAvoidingCoWs { state in
912+
state.send(buffer: buffer, allocator: allocator, compress: compress)
913+
}
912914
}
913915

914916
/// Send status and trailers.
@@ -1115,11 +1117,11 @@ extension HTTP2ToRawGRPCStateMachine.State {
11151117
}
11161118
}
11171119

1118-
func send(
1120+
mutating func send(
11191121
buffer: ByteBuffer,
11201122
allocator: ByteBufferAllocator,
11211123
compress: Bool
1122-
) -> Result<ByteBuffer, Error> {
1124+
) -> Result<(ByteBuffer, ByteBuffer?), Error> {
11231125
switch self {
11241126
case .requestIdleResponseIdle:
11251127
preconditionFailure("Invalid state: the request stream is still closed")
@@ -1129,19 +1131,21 @@ extension HTTP2ToRawGRPCStateMachine.State {
11291131
let error = GRPCError.InvalidState("Response headers must be sent before response message")
11301132
return .failure(error)
11311133

1132-
case let .requestOpenResponseOpen(state):
1133-
return state.send(
1134+
case var .requestOpenResponseOpen(state):
1135+
let result = state.send(
11341136
buffer: buffer,
1135-
allocator: allocator,
11361137
compress: compress
11371138
)
1139+
self = .requestOpenResponseOpen(state)
1140+
return result
11381141

1139-
case let .requestClosedResponseOpen(state):
1140-
return state.send(
1142+
case var .requestClosedResponseOpen(state):
1143+
let result = state.send(
11411144
buffer: buffer,
1142-
allocator: allocator,
11431145
compress: compress
11441146
)
1147+
self = .requestClosedResponseOpen(state)
1148+
return result
11451149

11461150
case .requestOpenResponseClosed,
11471151
.requestClosedResponseClosed:

0 commit comments

Comments
 (0)