Skip to content

Commit 81c7a32

Browse files
glbrnttLukasa
authored andcommitted
Adopt the coalescing writer for servers (grpc#1546)
Motivation: In grpc#1357 we introduced a message frames which coalesces writes into a single buffer in a write-flush cycle to reduce the number of emitted DATA frames. This PR adopts those changes for the server. Modifications: - Adjust the server state machine and handler to use the coalescing writer Results: Small messages are coalesced in a flush cycle within a stream. Co-authored-by: Cory Benfield <[email protected]>
1 parent 612a404 commit 81c7a32

File tree

4 files changed

+145
-83
lines changed

4 files changed

+145
-83
lines changed

Sources/GRPC/HTTP2ToRawGRPCServerCodec.swift

+23-14
Original file line numberDiff line numberDiff line change
@@ -181,13 +181,26 @@ internal final class HTTP2ToRawGRPCServerCodec: ChannelInboundHandler, GRPCServe
181181
self.isReading = false
182182

183183
if self.flushPending {
184+
self.deliverPendingResponses()
184185
self.flushPending = false
185186
context.flush()
186187
}
187188

188189
context.fireChannelReadComplete()
189190
}
190191

192+
private func deliverPendingResponses() {
193+
while let (result, promise) = self.state.nextResponse() {
194+
switch result {
195+
case let .success(buffer):
196+
let payload = HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer)))
197+
self.context.write(self.wrapOutboundOut(payload), promise: promise)
198+
case let .failure(error):
199+
promise?.fail(error)
200+
}
201+
}
202+
}
203+
191204
/// Called when the pipeline has finished configuring.
192205
private func configured() {
193206
switch self.state.pipelineConfigured() {
@@ -288,23 +301,14 @@ internal final class HTTP2ToRawGRPCServerCodec: ChannelInboundHandler, GRPCServe
288301
metadata: MessageMetadata,
289302
promise: EventLoopPromise<Void>?
290303
) {
291-
let writeBuffer = self.state.send(
304+
let result = self.state.send(
292305
buffer: buffer,
293-
allocator: self.context.channel.allocator,
294-
compress: metadata.compress
306+
compress: metadata.compress,
307+
promise: promise
295308
)
296309

297-
switch writeBuffer {
298-
case let .success((buffer, maybeBuffer)):
299-
if let actuallyBuffer = maybeBuffer {
300-
let payload1 = HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer)))
301-
self.context.write(self.wrapOutboundOut(payload1), promise: nil)
302-
let payload2 = HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(actuallyBuffer)))
303-
self.context.write(self.wrapOutboundOut(payload2), promise: promise)
304-
} else {
305-
let payload = HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer)))
306-
self.context.write(self.wrapOutboundOut(payload), promise: promise)
307-
}
310+
switch result {
311+
case .success:
308312
if metadata.flush {
309313
self.markFlushPoint()
310314
}
@@ -319,6 +323,9 @@ internal final class HTTP2ToRawGRPCServerCodec: ChannelInboundHandler, GRPCServe
319323
trailers: HPACKHeaders,
320324
promise: EventLoopPromise<Void>?
321325
) {
326+
// About to end the stream: send any pending responses.
327+
self.deliverPendingResponses()
328+
322329
switch self.state.send(status: status, trailers: trailers) {
323330
case let .sendTrailers(trailers):
324331
self.sendTrailers(trailers, promise: promise)
@@ -349,6 +356,8 @@ internal final class HTTP2ToRawGRPCServerCodec: ChannelInboundHandler, GRPCServe
349356
if self.isReading {
350357
self.flushPending = true
351358
} else {
359+
// About to flush: send any pending responses.
360+
self.deliverPendingResponses()
352361
self.flushPending = false
353362
self.context.flush()
354363
}

Sources/GRPC/HTTP2ToRawGRPCStateMachine.swift

+59-53
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ extension HTTP2ToRawGRPCStateMachine {
5353
var reader: LengthPrefixedMessageReader
5454

5555
/// A length prefixed message writer for response messages.
56-
var writer: LengthPrefixedMessageWriter
56+
var writer: CoalescingLengthPrefixedMessageWriter
5757

5858
/// The content type of the RPC.
5959
var contentType: ContentType
@@ -78,7 +78,7 @@ extension HTTP2ToRawGRPCStateMachine {
7878
var reader: LengthPrefixedMessageReader
7979

8080
/// A length prefixed message writer for response messages.
81-
var writer: LengthPrefixedMessageWriter
81+
var writer: CoalescingLengthPrefixedMessageWriter
8282

8383
/// The content type of the RPC.
8484
var contentType: ContentType
@@ -113,7 +113,7 @@ extension HTTP2ToRawGRPCStateMachine {
113113
var reader: LengthPrefixedMessageReader
114114

115115
/// A length prefixed message writer for response messages.
116-
var writer: LengthPrefixedMessageWriter
116+
var writer: CoalescingLengthPrefixedMessageWriter
117117

118118
/// Whether to normalize user-provided metadata.
119119
var normalizeHeaders: Bool
@@ -130,7 +130,7 @@ extension HTTP2ToRawGRPCStateMachine {
130130
var reader: LengthPrefixedMessageReader
131131

132132
/// A length prefixed message writer for response messages.
133-
var writer: LengthPrefixedMessageWriter
133+
var writer: CoalescingLengthPrefixedMessageWriter
134134

135135
/// Whether to normalize user-provided metadata.
136136
var normalizeHeaders: Bool
@@ -502,8 +502,8 @@ extension HTTP2ToRawGRPCStateMachine.State {
502502
from headers: HPACKHeaders,
503503
encoding: ServerMessageEncoding,
504504
allocator: ByteBufferAllocator
505-
) -> (LengthPrefixedMessageWriter, String?) {
506-
let writer: LengthPrefixedMessageWriter
505+
) -> (CoalescingLengthPrefixedMessageWriter, String?) {
506+
let writer: CoalescingLengthPrefixedMessageWriter
507507
let responseEncoding: String?
508508

509509
switch encoding {
@@ -519,12 +519,12 @@ extension HTTP2ToRawGRPCStateMachine.State {
519519
configuration.enabledAlgorithms.contains($0)
520520
}
521521

522-
writer = LengthPrefixedMessageWriter(compression: algorithm, allocator: allocator)
522+
writer = .init(compression: algorithm, allocator: allocator)
523523
responseEncoding = algorithm?.name
524524

525525
case .disabled:
526526
// The server doesn't have compression enabled.
527-
writer = LengthPrefixedMessageWriter(compression: .none, allocator: allocator)
527+
writer = .init(compression: .none, allocator: allocator)
528528
responseEncoding = nil
529529
}
530530

@@ -623,44 +623,23 @@ extension HTTP2ToRawGRPCStateMachine.RequestClosedResponseIdleState {
623623

624624
// MARK: - Send Data
625625

626-
extension HTTP2ToRawGRPCStateMachine {
627-
static func writeGRPCFramedMessage(
628-
_ buffer: ByteBuffer,
629-
compress: Bool,
630-
writer: inout LengthPrefixedMessageWriter
631-
) -> Result<(ByteBuffer, ByteBuffer?), Error> {
632-
do {
633-
let buffers = try writer.write(buffer: buffer, compressed: compress)
634-
return .success(buffers)
635-
} catch {
636-
return .failure(error)
637-
}
638-
}
639-
}
640-
641626
extension HTTP2ToRawGRPCStateMachine.RequestOpenResponseOpenState {
642627
mutating func send(
643628
buffer: ByteBuffer,
644-
compress: Bool
645-
) -> Result<(ByteBuffer, ByteBuffer?), Error> {
646-
return HTTP2ToRawGRPCStateMachine.writeGRPCFramedMessage(
647-
buffer,
648-
compress: compress,
649-
writer: &self.writer
650-
)
629+
compress: Bool,
630+
promise: EventLoopPromise<Void>?
631+
) {
632+
self.writer.append(buffer: buffer, compress: compress, promise: promise)
651633
}
652634
}
653635

654636
extension HTTP2ToRawGRPCStateMachine.RequestClosedResponseOpenState {
655637
mutating func send(
656638
buffer: ByteBuffer,
657-
compress: Bool
658-
) -> Result<(ByteBuffer, ByteBuffer?), Error> {
659-
return HTTP2ToRawGRPCStateMachine.writeGRPCFramedMessage(
660-
buffer,
661-
compress: compress,
662-
writer: &self.writer
663-
)
639+
compress: Bool,
640+
promise: EventLoopPromise<Void>?
641+
) {
642+
self.writer.append(buffer: buffer, compress: compress, promise: promise)
664643
}
665644
}
666645

@@ -879,10 +858,14 @@ extension HTTP2ToRawGRPCStateMachine {
879858
/// Send a response buffer.
880859
mutating func send(
881860
buffer: ByteBuffer,
882-
allocator: ByteBufferAllocator,
883-
compress: Bool
884-
) -> Result<(ByteBuffer, ByteBuffer?), Error> {
885-
self.state.send(buffer: buffer, allocator: allocator, compress: compress)
861+
compress: Bool,
862+
promise: EventLoopPromise<Void>?
863+
) -> Result<Void, Error> {
864+
self.state.send(buffer: buffer, compress: compress, promise: promise)
865+
}
866+
867+
mutating func nextResponse() -> (Result<ByteBuffer, Error>, EventLoopPromise<Void>?)? {
868+
self.state.nextResponse()
886869
}
887870

888871
/// Send status and trailers.
@@ -1070,9 +1053,9 @@ extension HTTP2ToRawGRPCStateMachine.State {
10701053

10711054
mutating func send(
10721055
buffer: ByteBuffer,
1073-
allocator: ByteBufferAllocator,
1074-
compress: Bool
1075-
) -> Result<(ByteBuffer, ByteBuffer?), Error> {
1056+
compress: Bool,
1057+
promise: EventLoopPromise<Void>?
1058+
) -> Result<Void, Error> {
10761059
switch self {
10771060
case .requestIdleResponseIdle:
10781061
preconditionFailure("Invalid state: the request stream is still closed")
@@ -1083,24 +1066,47 @@ extension HTTP2ToRawGRPCStateMachine.State {
10831066
return .failure(error)
10841067

10851068
case var .requestOpenResponseOpen(state):
1086-
let result = state.send(
1087-
buffer: buffer,
1088-
compress: compress
1089-
)
1069+
self = .requestClosedResponseClosed
1070+
state.send(buffer: buffer, compress: compress, promise: promise)
1071+
self = .requestOpenResponseOpen(state)
1072+
return .success(())
1073+
1074+
case var .requestClosedResponseOpen(state):
1075+
self = .requestClosedResponseClosed
1076+
state.send(buffer: buffer, compress: compress, promise: promise)
1077+
self = .requestClosedResponseOpen(state)
1078+
return .success(())
1079+
1080+
case .requestOpenResponseClosed,
1081+
.requestClosedResponseClosed:
1082+
return .failure(GRPCError.AlreadyComplete())
1083+
}
1084+
}
1085+
1086+
mutating func nextResponse() -> (Result<ByteBuffer, Error>, EventLoopPromise<Void>?)? {
1087+
switch self {
1088+
case .requestIdleResponseIdle:
1089+
preconditionFailure("Invalid state: the request stream is still closed")
1090+
1091+
case .requestOpenResponseIdle,
1092+
.requestClosedResponseIdle:
1093+
return nil
1094+
1095+
case var .requestOpenResponseOpen(state):
1096+
self = .requestClosedResponseClosed
1097+
let result = state.writer.next()
10901098
self = .requestOpenResponseOpen(state)
10911099
return result
10921100

10931101
case var .requestClosedResponseOpen(state):
1094-
let result = state.send(
1095-
buffer: buffer,
1096-
compress: compress
1097-
)
1102+
self = .requestClosedResponseClosed
1103+
let result = state.writer.next()
10981104
self = .requestClosedResponseOpen(state)
10991105
return result
11001106

11011107
case .requestOpenResponseClosed,
11021108
.requestClosedResponseClosed:
1103-
return .failure(GRPCError.AlreadyComplete())
1109+
return nil
11041110
}
11051111
}
11061112

Tests/GRPCTests/CompressionTests.swift

+16-8
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import EchoImplementation
1717
import EchoModel
1818
import GRPC
19+
import NIOConcurrencyHelpers
1920
import NIOCore
2021
import NIOHPACK
2122
import NIOPosix
@@ -268,21 +269,27 @@ class MessageCompressionTests: GRPCTestCase {
268269

269270
func testDecompressionLimitIsRespectedByClientForStreamingCall() throws {
270271
try self.setupServer(encoding: .enabled(.init(decompressionLimit: .absolute(2048))))
271-
self
272-
.setupClient(encoding: .enabled(.init(
273-
forRequests: .gzip,
274-
decompressionLimit: .absolute(1024)
275-
)))
272+
self.setupClient(
273+
encoding: .enabled(.init(forRequests: .gzip, decompressionLimit: .absolute(1024)))
274+
)
275+
276+
let responsePromise = self.group.next().makePromise(of: Echo_EchoResponse.self)
277+
let lock = NIOLock()
278+
var responseCount = 0
276279

277-
var responses: [Echo_EchoResponse] = []
278280
let update = self.echo.update {
279-
responses.append($0)
281+
lock.withLock {
282+
responseCount += 1
283+
}
284+
responsePromise.succeed($0)
280285
}
281286

282287
let status = self.expectation(description: "received status")
283288

284289
// Smaller than limit.
285290
update.sendMessage(.with { $0.text = "foo" }, promise: nil)
291+
XCTAssertNoThrow(try responsePromise.futureResult.wait())
292+
286293
// Should be just over the limit.
287294
update.sendMessage(.with { $0.text = String(repeating: "x", count: 1024) }, promise: nil)
288295
update.sendEnd(promise: nil)
@@ -292,7 +299,8 @@ class MessageCompressionTests: GRPCTestCase {
292299
}.assertEqual(.resourceExhausted, fulfill: status)
293300

294301
self.wait(for: [status], timeout: self.defaultTimeout)
295-
XCTAssertEqual(responses.count, 1)
302+
let receivedResponses = lock.withLock { responseCount }
303+
XCTAssertEqual(receivedResponses, 1)
296304
}
297305

298306
func testIdentityCompressionIsntCompression() throws {

0 commit comments

Comments
 (0)