Skip to content

Commit 564f522

Browse files
committed
Avoid copies of large payloads
Motivation: Messages are prefixed with a 5-byte header. Currently messages of all sizes are written into a new buffer with their header. For smaller payloads this is good: we avoid the extra allocations associated with creating HTTP/2 frames. For large payloads the cost of the copy outweighs the cost of extra allocations. Modifications: - For messages larger than 8KB emit an extra HTTP/2 DATA frame containing just the message header. Result: Better performance for large payloads.
1 parent e49a331 commit 564f522

8 files changed

+130
-126
lines changed

Sources/GRPC/GRPCClientChannelHandler.swift

+15-4
Original file line numberDiff line numberDiff line change
@@ -530,15 +530,26 @@ extension GRPCClientChannelHandler: ChannelOutboundHandler {
530530
allocator: context.channel.allocator
531531
)
532532
switch result {
533-
case let .success(buffer):
534-
// We're clear to send a message; wrap it up in an HTTP/2 frame.
535-
let framePayload = HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer)))
533+
case let .success((buffer, maybeBuffer)):
534+
let frame1 = HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer)))
536535
self.logger.trace("writing HTTP2 frame", metadata: [
537536
MetadataKey.h2Payload: "DATA",
538537
MetadataKey.h2DataBytes: "\(buffer.readableBytes)",
539538
MetadataKey.h2EndStream: "false",
540539
])
541-
context.write(self.wrapOutboundOut(framePayload), promise: promise)
540+
// If there's a second buffer, attach the promise to the second write.
541+
let promise1 = maybeBuffer == nil ? promise : nil
542+
context.write(self.wrapOutboundOut(frame1), promise: promise1)
543+
544+
if let maybeBuffer = maybeBuffer {
545+
let frame2 = HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(maybeBuffer)))
546+
self.logger.trace("writing HTTP2 frame", metadata: [
547+
MetadataKey.h2Payload: "DATA",
548+
MetadataKey.h2DataBytes: "\(maybeBuffer.readableBytes)",
549+
MetadataKey.h2EndStream: "false",
550+
])
551+
context.write(self.wrapOutboundOut(frame2), promise: promise)
552+
}
542553

543554
case let .failure(writeError):
544555
switch writeError {

Sources/GRPC/GRPCClientStateMachine.swift

+3-3
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,7 @@ struct GRPCClientStateMachine {
197197
_ message: ByteBuffer,
198198
compressed: Bool,
199199
allocator: ByteBufferAllocator
200-
) -> Result<ByteBuffer, MessageWriteError> {
200+
) -> Result<(ByteBuffer, ByteBuffer?), MessageWriteError> {
201201
return self.withStateAvoidingCoWs { state in
202202
state.sendRequest(message, compressed: compressed, allocator: allocator)
203203
}
@@ -392,8 +392,8 @@ extension GRPCClientStateMachine.State {
392392
_ message: ByteBuffer,
393393
compressed: Bool,
394394
allocator: ByteBufferAllocator
395-
) -> Result<ByteBuffer, MessageWriteError> {
396-
let result: Result<ByteBuffer, MessageWriteError>
395+
) -> Result<(ByteBuffer, ByteBuffer?), MessageWriteError> {
396+
let result: Result<(ByteBuffer, ByteBuffer?), MessageWriteError>
397397

398398
switch self {
399399
case .clientActiveServerIdle(var writeState, let pendingReadState):

Sources/GRPC/HTTP2ToRawGRPCServerCodec.swift

+10-3
Original file line numberDiff line numberDiff line change
@@ -295,9 +295,16 @@ internal final class HTTP2ToRawGRPCServerCodec: ChannelInboundHandler, GRPCServe
295295
)
296296

297297
switch writeBuffer {
298-
case let .success(buffer):
299-
let payload = HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer)))
300-
self.context.write(self.wrapOutboundOut(payload), promise: promise)
298+
case let .success((buffer, maybeBuffer)):
299+
if let maybeBuffer = maybeBuffer {
300+
let payload1 = HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer)))
301+
self.context.write(self.wrapOutboundOut(payload1), promise: nil)
302+
let payload2 = HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(maybeBuffer)))
303+
self.context.write(self.wrapOutboundOut(payload2), promise: promise)
304+
} else {
305+
let payload = HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer)))
306+
self.context.write(self.wrapOutboundOut(payload), promise: promise)
307+
}
301308
if metadata.flush {
302309
self.markFlushPoint()
303310
}

Sources/GRPC/HTTP2ToRawGRPCStateMachine.swift

+7-7
Original file line numberDiff line numberDiff line change
@@ -644,10 +644,10 @@ extension HTTP2ToRawGRPCStateMachine {
644644
compress: Bool,
645645
allocator: ByteBufferAllocator,
646646
writer: LengthPrefixedMessageWriter
647-
) -> Result<ByteBuffer, Error> {
647+
) -> Result<(ByteBuffer, ByteBuffer?), Error> {
648648
do {
649-
let prefixed = try writer.write(buffer: buffer, allocator: allocator, compressed: compress)
650-
return .success(prefixed)
649+
let buffers = try writer.write(buffer: buffer, allocator: allocator, compressed: compress)
650+
return .success(buffers)
651651
} catch {
652652
return .failure(error)
653653
}
@@ -659,7 +659,7 @@ extension HTTP2ToRawGRPCStateMachine.RequestOpenResponseOpenState {
659659
buffer: ByteBuffer,
660660
allocator: ByteBufferAllocator,
661661
compress: Bool
662-
) -> Result<ByteBuffer, Error> {
662+
) -> Result<(ByteBuffer, ByteBuffer?), Error> {
663663
return HTTP2ToRawGRPCStateMachine.writeGRPCFramedMessage(
664664
buffer,
665665
compress: compress,
@@ -674,7 +674,7 @@ extension HTTP2ToRawGRPCStateMachine.RequestClosedResponseOpenState {
674674
buffer: ByteBuffer,
675675
allocator: ByteBufferAllocator,
676676
compress: Bool
677-
) -> Result<ByteBuffer, Error> {
677+
) -> Result<(ByteBuffer, ByteBuffer?), Error> {
678678
return HTTP2ToRawGRPCStateMachine.writeGRPCFramedMessage(
679679
buffer,
680680
compress: compress,
@@ -907,7 +907,7 @@ extension HTTP2ToRawGRPCStateMachine {
907907
buffer: ByteBuffer,
908908
allocator: ByteBufferAllocator,
909909
compress: Bool
910-
) -> Result<ByteBuffer, Error> {
910+
) -> Result<(ByteBuffer, ByteBuffer?), Error> {
911911
return self.state.send(buffer: buffer, allocator: allocator, compress: compress)
912912
}
913913

@@ -1119,7 +1119,7 @@ extension HTTP2ToRawGRPCStateMachine.State {
11191119
buffer: ByteBuffer,
11201120
allocator: ByteBufferAllocator,
11211121
compress: Bool
1122-
) -> Result<ByteBuffer, Error> {
1122+
) -> Result<(ByteBuffer, ByteBuffer?), Error> {
11231123
switch self {
11241124
case .requestIdleResponseIdle:
11251125
preconditionFailure("Invalid state: the request stream is still closed")

Sources/GRPC/LengthPrefixedMessageWriter.swift

+29-86
Original file line numberDiff line numberDiff line change
@@ -86,99 +86,42 @@ internal struct LengthPrefixedMessageWriter {
8686
buffer: ByteBuffer,
8787
allocator: ByteBufferAllocator,
8888
compressed: Bool = true
89-
) throws -> ByteBuffer {
89+
) throws -> (ByteBuffer, ByteBuffer?) {
9090
if compressed, let compressor = self.compressor {
91-
return try self.compress(buffer: buffer, using: compressor, allocator: allocator)
92-
} else if buffer.readerIndex >= 5 {
93-
// We're not compressing and we have enough bytes before the reader index that we can write
94-
// over with the compression byte and length.
95-
var buffer = buffer
96-
97-
// Get the size of the message.
98-
let messageSize = buffer.readableBytes
99-
100-
// Move the reader index back 5 bytes. This is okay: we validated the `readerIndex` above.
101-
buffer.moveReaderIndex(to: buffer.readerIndex - 5)
102-
103-
// Fill in the compression byte and message length.
104-
buffer.setInteger(UInt8(0), at: buffer.readerIndex)
105-
buffer.setInteger(UInt32(messageSize), at: buffer.readerIndex + 1)
106-
107-
// The message bytes are already in place, we're done.
108-
return buffer
91+
let compressedAndFramedPayload = try self.compress(
92+
buffer: buffer,
93+
using: compressor,
94+
allocator: allocator
95+
)
96+
return (compressedAndFramedPayload, nil)
97+
} else if buffer.readableBytes > Self.singleBufferSizeLimit {
98+
// Buffer is larger than the limit for emitting a single buffer: create a second buffer
99+
// containing just the message header.
100+
var prefixed = allocator.buffer(capacity: 5)
101+
prefixed.writeMultipleIntegers(UInt8(0), UInt32(buffer.readableBytes))
102+
return (prefixed, buffer)
109103
} else {
110-
// We're not compressing and we don't have enough space before the message bytes passed in.
111-
// We need a new buffer.
112-
var lengthPrefixed = allocator.buffer(capacity: 5 + buffer.readableBytes)
113-
114-
// Write the compression byte.
115-
lengthPrefixed.writeInteger(UInt8(0))
116-
117-
// Write the message length.
118-
lengthPrefixed.writeInteger(UInt32(buffer.readableBytes))
119-
104+
// We're not compressing and the message is within our single buffer size limit.
105+
var lengthPrefixed = allocator.buffer(capacity: 5 &+ buffer.readableBytes)
106+
// Write the compression byte and message length.
107+
lengthPrefixed.writeMultipleIntegers(UInt8(0), UInt32(buffer.readableBytes))
120108
// Write the message.
121-
var buffer = buffer
122-
lengthPrefixed.writeBuffer(&buffer)
123-
124-
return lengthPrefixed
109+
lengthPrefixed.writeImmutableBuffer(buffer)
110+
return (lengthPrefixed, nil)
125111
}
126112
}
127113

128-
/// Writes the data into a `ByteBuffer` as a gRPC length-prefixed message.
114+
/// Message size above which we emit two buffers: one containing the header and one with the
115+
/// actual message bytes. At or below the limit we copy the message into a new buffer containing
116+
/// both the header and the message.
129117
///
130-
/// - Parameters:
131-
/// - payload: The payload to serialize and write.
132-
/// - buffer: The buffer to write the message into.
133-
/// - Returns: A `ByteBuffer` containing a gRPC length-prefixed message.
134-
/// - Precondition: `compression.supported` is `true`.
135-
/// - Note: See `LengthPrefixedMessageReader` for more details on the format.
136-
func write(
137-
_ payload: GRPCPayload,
138-
into buffer: inout ByteBuffer,
139-
compressed: Bool = true
140-
) throws {
141-
buffer.reserveCapacity(buffer.writerIndex + LengthPrefixedMessageWriter.metadataLength)
142-
143-
if compressed, let compressor = self.compressor {
144-
// Set the compression byte.
145-
buffer.writeInteger(UInt8(1))
146-
147-
// Leave a gap for the length, we'll set it in a moment.
148-
let payloadSizeIndex = buffer.writerIndex
149-
buffer.moveWriterIndex(forwardBy: MemoryLayout<UInt32>.size)
150-
151-
var messageBuf = ByteBufferAllocator().buffer(capacity: 0)
152-
try payload.serialize(into: &messageBuf)
153-
154-
// Compress the message.
155-
let bytesWritten = try compressor.deflate(&messageBuf, into: &buffer)
156-
157-
// Now fill in the message length.
158-
buffer.writePayloadLength(UInt32(bytesWritten), at: payloadSizeIndex)
159-
160-
// Finally, the compression context should be reset between messages.
161-
compressor.reset()
162-
} else {
163-
// We could be using 'identity' compression, but since the result is the same we'll just
164-
// say it isn't compressed.
165-
buffer.writeInteger(UInt8(0))
166-
167-
// Leave a gap for the length, we'll set it in a moment.
168-
let payloadSizeIndex = buffer.writerIndex
169-
buffer.moveWriterIndex(forwardBy: MemoryLayout<UInt32>.size)
170-
171-
let payloadPrefixedBytes = buffer.readableBytes
172-
// Writes the payload into the buffer
173-
try payload.serialize(into: &buffer)
174-
175-
// Calculates the Written bytes with respect to the prefixed ones
176-
let bytesWritten = buffer.readableBytes - payloadPrefixedBytes
177-
178-
// Write the message length.
179-
buffer.writePayloadLength(UInt32(bytesWritten), at: payloadSizeIndex)
180-
}
181-
}
118+
/// Using two buffers avoids expensive copies of large messages. For smaller messages the copy
119+
/// is cheaper than the additional allocations and overhead required to send an extra HTTP/2 DATA
120+
/// frame.
121+
///
122+
/// The value of 8192 was chosen empirically. We subtract the length of the message header
123+
/// as `ByteBuffer` reserve capacity in powers of two and want to avoid overallocating.
124+
private static let singleBufferSizeLimit = 8192 - 5
182125
}
183126

184127
extension ByteBuffer {

Sources/GRPC/ReadWriteStates.swift

+4-4
Original file line numberDiff line numberDiff line change
@@ -62,16 +62,16 @@ enum WriteState {
6262
_ message: ByteBuffer,
6363
compressed: Bool,
6464
allocator: ByteBufferAllocator
65-
) -> Result<ByteBuffer, MessageWriteError> {
65+
) -> Result<(ByteBuffer, ByteBuffer?), MessageWriteError> {
6666
switch self {
6767
case .notWriting:
6868
return .failure(.cardinalityViolation)
6969

7070
case let .writing(writeArity, contentType, writer):
71-
let buffer: ByteBuffer
71+
let buffers: (ByteBuffer, ByteBuffer?)
7272

7373
do {
74-
buffer = try writer.write(buffer: message, allocator: allocator, compressed: compressed)
74+
buffers = try writer.write(buffer: message, allocator: allocator, compressed: compressed)
7575
} catch {
7676
self = .notWriting
7777
return .failure(.serializationFailed)
@@ -84,7 +84,7 @@ enum WriteState {
8484
self = .writing(writeArity, contentType, writer)
8585
}
8686

87-
return .success(buffer)
87+
return .success(buffers)
8888
}
8989
}
9090
}

Tests/GRPCTests/GRPCClientStateMachineTests.swift

+30-15
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,16 @@ class GRPCClientStateMachineTests: GRPCTestCase {
3939
let buffer = self.allocator.buffer(string: message)
4040

4141
let writer = LengthPrefixedMessageWriter(compression: .none)
42-
return try writer.write(buffer: buffer, allocator: self.allocator, compressed: false)
42+
var (buffer1, buffer2) = try writer.write(
43+
buffer: buffer,
44+
allocator: self.allocator,
45+
compressed: false
46+
)
47+
48+
if var buffer2 = buffer2 {
49+
buffer1.writeBuffer(&buffer2)
50+
}
51+
return buffer1
4352
}
4453

4554
/// Writes a message into the given `buffer`.
@@ -148,8 +157,9 @@ extension GRPCClientStateMachineTests {
148157
ByteBuffer(string: request),
149158
compressed: false,
150159
allocator: self.allocator
151-
).assertSuccess { buffer in
152-
var buffer = buffer
160+
).assertSuccess { buffers in
161+
var buffer = buffers.0
162+
XCTAssertNil(buffers.1)
153163
// Remove the length and compression flag prefix.
154164
buffer.moveReaderIndex(forwardBy: 5)
155165
let data = buffer.readString(length: buffer.readableBytes)!
@@ -1149,13 +1159,14 @@ class ReadStateTests: GRPCTestCase {
11491159
// Write a message into the buffer:
11501160
let message = ByteBuffer(string: "Hello!")
11511161
let writer = LengthPrefixedMessageWriter(compression: .none)
1152-
var buffer = try writer.write(buffer: message, allocator: self.allocator)
1162+
var buffers = try writer.write(buffer: message, allocator: self.allocator)
1163+
XCTAssertNil(buffers.1)
11531164
// And some extra junk bytes:
11541165
let bytes: [UInt8] = [0x00]
1155-
buffer.writeBytes(bytes)
1166+
buffers.0.writeBytes(bytes)
11561167

11571168
var state: ReadState = .one()
1158-
state.readMessages(&buffer, maxLength: .max).assertFailure {
1169+
state.readMessages(&buffers.0, maxLength: .max).assertFailure {
11591170
XCTAssertEqual($0, .leftOverBytes)
11601171
}
11611172
state.assertNotReading()
@@ -1165,12 +1176,14 @@ class ReadStateTests: GRPCTestCase {
11651176
// Write a message into the buffer twice:
11661177
let message = ByteBuffer(string: "Hello!")
11671178
let writer = LengthPrefixedMessageWriter(compression: .none)
1168-
var buffer = try writer.write(buffer: message, allocator: self.allocator)
1169-
var second = try writer.write(buffer: message, allocator: self.allocator)
1170-
buffer.writeBuffer(&second)
1179+
var buffers1 = try writer.write(buffer: message, allocator: self.allocator)
1180+
var buffers2 = try writer.write(buffer: message, allocator: self.allocator)
1181+
XCTAssertNil(buffers1.1)
1182+
XCTAssertNil(buffers2.1)
1183+
buffers1.0.writeBuffer(&buffers2.0)
11711184

11721185
var state: ReadState = .one()
1173-
state.readMessages(&buffer, maxLength: .max).assertFailure {
1186+
state.readMessages(&buffers1.0, maxLength: .max).assertFailure {
11741187
XCTAssertEqual($0, .cardinalityViolation)
11751188
}
11761189
state.assertNotReading()
@@ -1180,7 +1193,8 @@ class ReadStateTests: GRPCTestCase {
11801193
// Write a message into the buffer twice:
11811194
let message = ByteBuffer(string: "Hello!")
11821195
let writer = LengthPrefixedMessageWriter(compression: .none)
1183-
var buffer = try writer.write(buffer: message, allocator: self.allocator)
1196+
var (buffer, other) = try writer.write(buffer: message, allocator: self.allocator)
1197+
XCTAssertNil(other)
11841198

11851199
var state: ReadState = .one()
11861200
state.readMessages(&buffer, maxLength: .max).assertSuccess {
@@ -1195,7 +1209,8 @@ class ReadStateTests: GRPCTestCase {
11951209
// Write a message into the buffer twice:
11961210
let message = ByteBuffer(string: "Hello!")
11971211
let writer = LengthPrefixedMessageWriter(compression: .none)
1198-
var buffer = try writer.write(buffer: message, allocator: self.allocator)
1212+
var (buffer, other) = try writer.write(buffer: message, allocator: self.allocator)
1213+
XCTAssertNil(other)
11991214

12001215
var state: ReadState = .many()
12011216
state.readMessages(&buffer, maxLength: .max).assertSuccess {
@@ -1211,9 +1226,9 @@ class ReadStateTests: GRPCTestCase {
12111226
let message = ByteBuffer(string: "Hello!")
12121227
let writer = LengthPrefixedMessageWriter(compression: .none)
12131228

1214-
var first = try writer.write(buffer: message, allocator: self.allocator)
1215-
var second = try writer.write(buffer: message, allocator: self.allocator)
1216-
var third = try writer.write(buffer: message, allocator: self.allocator)
1229+
var (first, _) = try writer.write(buffer: message, allocator: self.allocator)
1230+
var (second, _) = try writer.write(buffer: message, allocator: self.allocator)
1231+
var (third, _) = try writer.write(buffer: message, allocator: self.allocator)
12171232

12181233
first.writeBuffer(&second)
12191234
first.writeBuffer(&third)

0 commit comments

Comments
 (0)