Skip to content

Commit f3f17a2

Browse files
committed
save buffer
1 parent 2fc86a5 commit f3f17a2

8 files changed

+148
-158
lines changed

Sources/GRPC/GRPCClientChannelHandler.swift

+5-3
Original file line numberDiff line numberDiff line change
@@ -502,7 +502,10 @@ extension GRPCClientChannelHandler: ChannelOutboundHandler {
502502
switch self.unwrapOutboundIn(data) {
503503
case let .head(requestHead):
504504
// Feed the request into the state machine:
505-
switch self.stateMachine.sendRequestHeaders(requestHead: requestHead) {
505+
switch self.stateMachine.sendRequestHeaders(
506+
requestHead: requestHead,
507+
allocator: context.channel.allocator
508+
) {
506509
case let .success(headers):
507510
// We're clear to write some headers. Create an appropriate frame and write it.
508511
let framePayload = HTTP2Frame.FramePayload.headers(.init(headers: headers))
@@ -526,8 +529,7 @@ extension GRPCClientChannelHandler: ChannelOutboundHandler {
526529
// Feed the request message into the state machine:
527530
let result = self.stateMachine.sendRequest(
528531
request.message,
529-
compressed: request.compressed,
530-
allocator: context.channel.allocator
532+
compressed: request.compressed
531533
)
532534
switch result {
533535
case let .success((buffer, maybeBuffer)):

Sources/GRPC/GRPCClientStateMachine.swift

+14-11
Original file line numberDiff line numberDiff line change
@@ -165,10 +165,11 @@ struct GRPCClientStateMachine {
165165
///
166166
/// - Parameter requestHead: The client request head for the RPC.
167167
mutating func sendRequestHeaders(
168-
requestHead: _GRPCRequestHead
168+
requestHead: _GRPCRequestHead,
169+
allocator: ByteBufferAllocator
169170
) -> Result<HPACKHeaders, SendRequestHeadersError> {
170171
return self.withStateAvoidingCoWs { state in
171-
state.sendRequestHeaders(requestHead: requestHead)
172+
state.sendRequestHeaders(requestHead: requestHead, allocator: allocator)
172173
}
173174
}
174175

@@ -195,11 +196,10 @@ struct GRPCClientStateMachine {
195196
/// request will be written.
196197
mutating func sendRequest(
197198
_ message: ByteBuffer,
198-
compressed: Bool,
199-
allocator: ByteBufferAllocator
199+
compressed: Bool
200200
) -> Result<(ByteBuffer, ByteBuffer?), MessageWriteError> {
201201
return self.withStateAvoidingCoWs { state in
202-
state.sendRequest(message, compressed: compressed, allocator: allocator)
202+
state.sendRequest(message, compressed: compressed)
203203
}
204204
}
205205

@@ -351,7 +351,8 @@ struct GRPCClientStateMachine {
351351
extension GRPCClientStateMachine.State {
352352
/// See `GRPCClientStateMachine.sendRequestHeaders(requestHead:)`.
353353
mutating func sendRequestHeaders(
354-
requestHead: _GRPCRequestHead
354+
requestHead: _GRPCRequestHead,
355+
allocator: ByteBufferAllocator
355356
) -> Result<HPACKHeaders, SendRequestHeadersError> {
356357
let result: Result<HPACKHeaders, SendRequestHeadersError>
357358

@@ -369,7 +370,10 @@ extension GRPCClientStateMachine.State {
369370
result = .success(headers)
370371

371372
self = .clientActiveServerIdle(
372-
writeState: pendingWriteState.makeWriteState(messageEncoding: requestHead.encoding),
373+
writeState: pendingWriteState.makeWriteState(
374+
messageEncoding: requestHead.encoding,
375+
allocator: allocator
376+
),
373377
pendingReadState: .init(arity: responseArity, messageEncoding: requestHead.encoding)
374378
)
375379

@@ -390,18 +394,17 @@ extension GRPCClientStateMachine.State {
390394
/// See `GRPCClientStateMachine.sendRequest(_:allocator:)`.
391395
mutating func sendRequest(
392396
_ message: ByteBuffer,
393-
compressed: Bool,
394-
allocator: ByteBufferAllocator
397+
compressed: Bool
395398
) -> Result<(ByteBuffer, ByteBuffer?), MessageWriteError> {
396399
let result: Result<(ByteBuffer, ByteBuffer?), MessageWriteError>
397400

398401
switch self {
399402
case .clientActiveServerIdle(var writeState, let pendingReadState):
400-
result = writeState.write(message, compressed: compressed, allocator: allocator)
403+
result = writeState.write(message, compressed: compressed)
401404
self = .clientActiveServerIdle(writeState: writeState, pendingReadState: pendingReadState)
402405

403406
case .clientActiveServerActive(var writeState, let readState):
404-
result = writeState.write(message, compressed: compressed, allocator: allocator)
407+
result = writeState.write(message, compressed: compressed)
405408
self = .clientActiveServerActive(writeState: writeState, readState: readState)
406409

407410
case .clientClosedServerIdle,

Sources/GRPC/HTTP2ToRawGRPCStateMachine.swift

+28-24
Original file line numberDiff line numberDiff line change
@@ -303,7 +303,11 @@ extension HTTP2ToRawGRPCStateMachine.State {
303303
}
304304

305305
// Figure out which encoding we should use for responses.
306-
let (writer, responseEncoding) = self.extractResponseEncoding(from: headers, encoding: encoding)
306+
let (writer, responseEncoding) = self.extractResponseEncoding(
307+
from: headers,
308+
encoding: encoding,
309+
allocator: allocator
310+
)
307311

308312
// Parse the path, and create a call handler.
309313
guard let path = headers.first(name: ":path") else {
@@ -516,7 +520,8 @@ extension HTTP2ToRawGRPCStateMachine.State {
516520
/// - Returns: A message writer and the response encoding header to send back to the client.
517521
private func extractResponseEncoding(
518522
from headers: HPACKHeaders,
519-
encoding: ServerMessageEncoding
523+
encoding: ServerMessageEncoding,
524+
allocator: ByteBufferAllocator
520525
) -> (LengthPrefixedMessageWriter, String?) {
521526
let writer: LengthPrefixedMessageWriter
522527
let responseEncoding: String?
@@ -534,12 +539,12 @@ extension HTTP2ToRawGRPCStateMachine.State {
534539
configuration.enabledAlgorithms.contains($0)
535540
}
536541

537-
writer = LengthPrefixedMessageWriter(compression: algorithm)
542+
writer = LengthPrefixedMessageWriter(compression: algorithm, allocator: allocator)
538543
responseEncoding = algorithm?.name
539544

540545
case .disabled:
541546
// The server doesn't have compression enabled.
542-
writer = LengthPrefixedMessageWriter(compression: .none)
547+
writer = LengthPrefixedMessageWriter(compression: .none, allocator: allocator)
543548
responseEncoding = nil
544549
}
545550

@@ -642,11 +647,10 @@ extension HTTP2ToRawGRPCStateMachine {
642647
static func writeGRPCFramedMessage(
643648
_ buffer: ByteBuffer,
644649
compress: Bool,
645-
allocator: ByteBufferAllocator,
646-
writer: LengthPrefixedMessageWriter
650+
writer: inout LengthPrefixedMessageWriter
647651
) -> Result<(ByteBuffer, ByteBuffer?), Error> {
648652
do {
649-
let buffers = try writer.write(buffer: buffer, allocator: allocator, compressed: compress)
653+
let buffers = try writer.write(buffer: buffer, compressed: compress)
650654
return .success(buffers)
651655
} catch {
652656
return .failure(error)
@@ -655,31 +659,27 @@ extension HTTP2ToRawGRPCStateMachine {
655659
}
656660

657661
extension HTTP2ToRawGRPCStateMachine.RequestOpenResponseOpenState {
658-
func send(
662+
mutating func send(
659663
buffer: ByteBuffer,
660-
allocator: ByteBufferAllocator,
661664
compress: Bool
662665
) -> Result<(ByteBuffer, ByteBuffer?), Error> {
663666
return HTTP2ToRawGRPCStateMachine.writeGRPCFramedMessage(
664667
buffer,
665668
compress: compress,
666-
allocator: allocator,
667-
writer: self.writer
669+
writer: &self.writer
668670
)
669671
}
670672
}
671673

672674
extension HTTP2ToRawGRPCStateMachine.RequestClosedResponseOpenState {
673-
func send(
675+
mutating func send(
674676
buffer: ByteBuffer,
675-
allocator: ByteBufferAllocator,
676677
compress: Bool
677678
) -> Result<(ByteBuffer, ByteBuffer?), Error> {
678679
return HTTP2ToRawGRPCStateMachine.writeGRPCFramedMessage(
679680
buffer,
680681
compress: compress,
681-
allocator: allocator,
682-
writer: self.writer
682+
writer: &self.writer
683683
)
684684
}
685685
}
@@ -903,12 +903,14 @@ extension HTTP2ToRawGRPCStateMachine {
903903
}
904904

905905
/// Send a response buffer.
906-
func send(
906+
mutating func send(
907907
buffer: ByteBuffer,
908908
allocator: ByteBufferAllocator,
909909
compress: Bool
910910
) -> Result<(ByteBuffer, ByteBuffer?), Error> {
911-
return self.state.send(buffer: buffer, allocator: allocator, compress: compress)
911+
return self.withStateAvoidingCoWs { state in
912+
state.send(buffer: buffer, allocator: allocator, compress: compress)
913+
}
912914
}
913915

914916
/// Send status and trailers.
@@ -1115,7 +1117,7 @@ extension HTTP2ToRawGRPCStateMachine.State {
11151117
}
11161118
}
11171119

1118-
func send(
1120+
mutating func send(
11191121
buffer: ByteBuffer,
11201122
allocator: ByteBufferAllocator,
11211123
compress: Bool
@@ -1129,19 +1131,21 @@ extension HTTP2ToRawGRPCStateMachine.State {
11291131
let error = GRPCError.InvalidState("Response headers must be sent before response message")
11301132
return .failure(error)
11311133

1132-
case let .requestOpenResponseOpen(state):
1133-
return state.send(
1134+
case var .requestOpenResponseOpen(state):
1135+
let result = state.send(
11341136
buffer: buffer,
1135-
allocator: allocator,
11361137
compress: compress
11371138
)
1139+
self = .requestOpenResponseOpen(state)
1140+
return result
11381141

1139-
case let .requestClosedResponseOpen(state):
1140-
return state.send(
1142+
case var .requestClosedResponseOpen(state):
1143+
let result = state.send(
11411144
buffer: buffer,
1142-
allocator: allocator,
11431145
compress: compress
11441146
)
1147+
self = .requestClosedResponseOpen(state)
1148+
return result
11451149

11461150
case .requestOpenResponseClosed,
11471151
.requestClosedResponseClosed:

Sources/GRPC/LengthPrefixedMessageWriter.swift

+24-30
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,13 @@ internal struct LengthPrefixedMessageWriter {
2828
return self.compression != nil
2929
}
3030

31-
init(compression: CompressionAlgorithm? = nil) {
31+
/// A scratch buffer that we encode messages into: if the buffer isn't held elsewhere then we
32+
/// can avoid having to allocate a new one.
33+
private var scratch: ByteBuffer
34+
35+
init(compression: CompressionAlgorithm? = nil, allocator: ByteBufferAllocator) {
3236
self.compression = compression
37+
self.scratch = allocator.buffer(capacity: 0)
3338

3439
switch self.compression?.algorithm {
3540
case .none, .some(.identity):
@@ -41,73 +46,62 @@ internal struct LengthPrefixedMessageWriter {
4146
}
4247
}
4348

44-
private func compress(
49+
private mutating func compress(
4550
buffer: ByteBuffer,
46-
using compressor: Zlib.Deflate,
47-
allocator: ByteBufferAllocator
51+
using compressor: Zlib.Deflate
4852
) throws -> ByteBuffer {
4953
// The compressor will allocate the correct size. For now the leading 5 bytes will do.
50-
var output = allocator.buffer(capacity: 5)
51-
54+
self.scratch.clear(minimumCapacity: 5)
5255
// Set the compression byte.
53-
output.writeInteger(UInt8(1))
54-
56+
self.scratch.writeInteger(UInt8(1))
5557
// Set the length to zero; we'll write the actual value in a moment.
56-
let payloadSizeIndex = output.writerIndex
57-
output.writeInteger(UInt32(0))
58+
let payloadSizeIndex = self.scratch.writerIndex
59+
self.scratch.writeInteger(UInt32(0))
5860

5961
let bytesWritten: Int
6062

6163
do {
6264
var buffer = buffer
63-
bytesWritten = try compressor.deflate(&buffer, into: &output)
65+
bytesWritten = try compressor.deflate(&buffer, into: &self.scratch)
6466
} catch {
6567
throw error
6668
}
6769

6870
// Now fill in the message length.
69-
output.writePayloadLength(UInt32(bytesWritten), at: payloadSizeIndex)
71+
self.scratch.writePayloadLength(UInt32(bytesWritten), at: payloadSizeIndex)
7072

7173
// Finally, the compression context should be reset between messages.
7274
compressor.reset()
7375

74-
return output
76+
return self.scratch
7577
}
7678

7779
/// Writes the readable bytes of `buffer` as a gRPC length-prefixed message.
7880
///
7981
/// - Parameters:
8082
/// - buffer: The bytes to compress and length-prefix.
81-
/// - allocator: A `ByteBufferAllocator`.
8283
/// - compressed: Whether the bytes should be compressed. This is ignored if not compression
8384
/// mechanism was configured on this writer.
8485
/// - Returns: A buffer containing the length prefixed bytes.
85-
func write(
86+
mutating func write(
8687
buffer: ByteBuffer,
87-
allocator: ByteBufferAllocator,
8888
compressed: Bool = true
8989
) throws -> (ByteBuffer, ByteBuffer?) {
9090
if compressed, let compressor = self.compressor {
91-
let compressedAndFramedPayload = try self.compress(
92-
buffer: buffer,
93-
using: compressor,
94-
allocator: allocator
95-
)
91+
let compressedAndFramedPayload = try self.compress(buffer: buffer, using: compressor)
9692
return (compressedAndFramedPayload, nil)
9793
} else if buffer.readableBytes > Self.singleBufferSizeLimit {
9894
// Buffer is larger than the limit for emitting a single buffer: create a second buffer
9995
// containing just the message header.
100-
var prefixed = allocator.buffer(capacity: 5)
101-
prefixed.writeMultipleIntegers(UInt8(0), UInt32(buffer.readableBytes))
102-
return (prefixed, buffer)
96+
self.scratch.clear(minimumCapacity: 5)
97+
self.scratch.writeMultipleIntegers(UInt8(0), UInt32(buffer.readableBytes))
98+
return (self.scratch, buffer)
10399
} else {
104100
// We're not compressing and the message is within our single buffer size limit.
105-
var lengthPrefixed = allocator.buffer(capacity: 5 &+ buffer.readableBytes)
106-
// Write the compression byte and message length.
107-
lengthPrefixed.writeMultipleIntegers(UInt8(0), UInt32(buffer.readableBytes))
108-
// Write the message.
109-
lengthPrefixed.writeImmutableBuffer(buffer)
110-
return (lengthPrefixed, nil)
101+
self.scratch.clear(minimumCapacity: 5 &+ buffer.readableBytes)
102+
self.scratch.writeMultipleIntegers(UInt8(0), UInt32(buffer.readableBytes))
103+
self.scratch.writeImmutableBuffer(buffer)
104+
return (self.scratch, nil)
111105
}
112106
}
113107

Sources/GRPC/ReadWriteStates.swift

+10-9
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,10 @@ struct PendingWriteState {
3030
/// The 'content-type' being written.
3131
var contentType: ContentType
3232

33-
func makeWriteState(messageEncoding: ClientMessageEncoding) -> WriteState {
33+
func makeWriteState(
34+
messageEncoding: ClientMessageEncoding,
35+
allocator: ByteBufferAllocator
36+
) -> WriteState {
3437
let compression: CompressionAlgorithm?
3538
switch messageEncoding {
3639
case let .enabled(configuration):
@@ -39,7 +42,7 @@ struct PendingWriteState {
3942
compression = nil
4043
}
4144

42-
let writer = LengthPrefixedMessageWriter(compression: compression)
45+
let writer = LengthPrefixedMessageWriter(compression: compression, allocator: allocator)
4346
return .writing(self.arity, self.contentType, writer)
4447
}
4548
}
@@ -53,25 +56,23 @@ enum WriteState {
5356
/// more messages to be written.
5457
case notWriting
5558

56-
/// Writes a message into a buffer using the `writer` and `allocator`.
59+
/// Writes a message into a buffer using the `writer`.
5760
///
5861
/// - Parameter message: The `Message` to write.
59-
/// - Parameter allocator: An allocator to provide a `ByteBuffer` into which the message will be
60-
/// written.
6162
mutating func write(
6263
_ message: ByteBuffer,
63-
compressed: Bool,
64-
allocator: ByteBufferAllocator
64+
compressed: Bool
6565
) -> Result<(ByteBuffer, ByteBuffer?), MessageWriteError> {
6666
switch self {
6767
case .notWriting:
6868
return .failure(.cardinalityViolation)
6969

70-
case let .writing(writeArity, contentType, writer):
70+
case .writing(let writeArity, let contentType, var writer):
71+
self = .notWriting
7172
let buffers: (ByteBuffer, ByteBuffer?)
7273

7374
do {
74-
buffers = try writer.write(buffer: message, allocator: allocator, compressed: compressed)
75+
buffers = try writer.write(buffer: message, compressed: compressed)
7576
} catch {
7677
self = .notWriting
7778
return .failure(.serializationFailed)

0 commit comments

Comments
 (0)