Skip to content

Commit 8173580

Browse files
committed
Adopt the coalescing writer for clients
Motivation: In grpc#1357 we introduced a message frames which coalesces writes into a single buffer in a write-flush cycle to reduce the number of emitted DATA frames. This PR adopts those changes for the client. Modifications: - Adjust the client state machine to use the coalescing writer Results: Small messages are coalesced in a flush cycle within a stream.
1 parent e710507 commit 8173580

5 files changed

+146
-70
lines changed

Sources/GRPC/GRPCClientChannelHandler.swift

+53-23
Original file line numberDiff line numberDiff line change
@@ -529,29 +529,13 @@ extension GRPCClientChannelHandler: ChannelOutboundHandler {
529529
// Feed the request message into the state machine:
530530
let result = self.stateMachine.sendRequest(
531531
request.message,
532-
compressed: request.compressed
532+
compressed: request.compressed,
533+
promise: promise
533534
)
534-
switch result {
535-
case let .success((buffer, maybeBuffer)):
536-
let frame1 = HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer)))
537-
self.logger.trace("writing HTTP2 frame", metadata: [
538-
MetadataKey.h2Payload: "DATA",
539-
MetadataKey.h2DataBytes: "\(buffer.readableBytes)",
540-
MetadataKey.h2EndStream: "false",
541-
])
542-
// If there's a second buffer, attach the promise to the second write.
543-
let promise1 = maybeBuffer == nil ? promise : nil
544-
context.write(self.wrapOutboundOut(frame1), promise: promise1)
545535

546-
if let actuallyBuffer = maybeBuffer {
547-
let frame2 = HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(actuallyBuffer)))
548-
self.logger.trace("writing HTTP2 frame", metadata: [
549-
MetadataKey.h2Payload: "DATA",
550-
MetadataKey.h2DataBytes: "\(actuallyBuffer.readableBytes)",
551-
MetadataKey.h2EndStream: "false",
552-
])
553-
context.write(self.wrapOutboundOut(frame2), promise: promise)
554-
}
536+
switch result {
537+
case .success:
538+
()
555539

556540
case let .failure(writeError):
557541
switch writeError {
@@ -572,13 +556,35 @@ extension GRPCClientChannelHandler: ChannelOutboundHandler {
572556
}
573557

574558
case .end:
559+
// About to send end: write any outbound messages first.
560+
while let (result, promise) = self.stateMachine.nextRequest() {
561+
switch result {
562+
case let .success(buffer):
563+
let framePayload: HTTP2Frame.FramePayload = .data(
564+
.init(data: .byteBuffer(buffer), endStream: false)
565+
)
566+
567+
self.logger.trace("writing HTTP2 frame", metadata: [
568+
MetadataKey.h2Payload: "DATA",
569+
MetadataKey.h2DataBytes: "\(buffer.readableBytes)",
570+
MetadataKey.h2EndStream: "false",
571+
])
572+
context.write(self.wrapOutboundOut(framePayload), promise: promise)
573+
574+
case let .failure(error):
575+
promise?.fail(error)
576+
}
577+
}
578+
575579
// Okay: can we close the request stream?
576580
switch self.stateMachine.sendEndOfRequestStream() {
577581
case .success:
578582
// We can. Send an empty DATA frame with end-stream set.
579583
let empty = context.channel.allocator.buffer(capacity: 0)
580-
let framePayload = HTTP2Frame.FramePayload
581-
.data(.init(data: .byteBuffer(empty), endStream: true))
584+
let framePayload: HTTP2Frame.FramePayload = .data(
585+
.init(data: .byteBuffer(empty), endStream: true)
586+
)
587+
582588
self.logger.trace("writing HTTP2 frame", metadata: [
583589
MetadataKey.h2Payload: "DATA",
584590
MetadataKey.h2DataBytes: "0",
@@ -605,4 +611,28 @@ extension GRPCClientChannelHandler: ChannelOutboundHandler {
605611
}
606612
}
607613
}
614+
615+
func flush(context: ChannelHandlerContext) {
616+
// Drain any requests.
617+
while let (result, promise) = self.stateMachine.nextRequest() {
618+
switch result {
619+
case let .success(buffer):
620+
let framePayload: HTTP2Frame.FramePayload = .data(
621+
.init(data: .byteBuffer(buffer), endStream: false)
622+
)
623+
624+
self.logger.trace("writing HTTP2 frame", metadata: [
625+
MetadataKey.h2Payload: "DATA",
626+
MetadataKey.h2DataBytes: "\(buffer.readableBytes)",
627+
MetadataKey.h2EndStream: "false",
628+
])
629+
context.write(self.wrapOutboundOut(framePayload), promise: promise)
630+
631+
case let .failure(error):
632+
promise?.fail(error)
633+
}
634+
}
635+
636+
context.flush()
637+
}
608638
}

Sources/GRPC/GRPCClientStateMachine.swift

+41-8
Original file line numberDiff line numberDiff line change
@@ -196,13 +196,18 @@ struct GRPCClientStateMachine {
196196
/// request will be written.
197197
mutating func sendRequest(
198198
_ message: ByteBuffer,
199-
compressed: Bool
200-
) -> Result<(ByteBuffer, ByteBuffer?), MessageWriteError> {
199+
compressed: Bool,
200+
promise: EventLoopPromise<Void>? = nil
201+
) -> Result<Void, MessageWriteError> {
201202
return self.withStateAvoidingCoWs { state in
202-
state.sendRequest(message, compressed: compressed)
203+
state.sendRequest(message, compressed: compressed, promise: promise)
203204
}
204205
}
205206

207+
mutating func nextRequest() -> (Result<ByteBuffer, MessageWriteError>, EventLoopPromise<Void>?)? {
208+
return self.state.nextRequest()
209+
}
210+
206211
/// Closes the request stream.
207212
///
208213
/// The client must be streaming requests in order to terminate the request stream. Valid
@@ -394,18 +399,21 @@ extension GRPCClientStateMachine.State {
394399
/// See `GRPCClientStateMachine.sendRequest(_:allocator:)`.
395400
mutating func sendRequest(
396401
_ message: ByteBuffer,
397-
compressed: Bool
398-
) -> Result<(ByteBuffer, ByteBuffer?), MessageWriteError> {
399-
let result: Result<(ByteBuffer, ByteBuffer?), MessageWriteError>
402+
compressed: Bool,
403+
promise: EventLoopPromise<Void>?
404+
) -> Result<Void, MessageWriteError> {
405+
let result: Result<Void, MessageWriteError>
400406

401407
switch self {
402408
case .clientActiveServerIdle(var writeState, let pendingReadState):
403-
result = writeState.write(message, compressed: compressed)
409+
let result = writeState.write(message, compressed: compressed, promise: promise)
404410
self = .clientActiveServerIdle(writeState: writeState, pendingReadState: pendingReadState)
411+
return result
405412

406413
case .clientActiveServerActive(var writeState, let readState):
407-
result = writeState.write(message, compressed: compressed)
414+
let result = writeState.write(message, compressed: compressed, promise: promise)
408415
self = .clientActiveServerActive(writeState: writeState, readState: readState)
416+
return result
409417

410418
case .clientClosedServerIdle,
411419
.clientClosedServerActive,
@@ -422,6 +430,31 @@ extension GRPCClientStateMachine.State {
422430
return result
423431
}
424432

433+
mutating func nextRequest() -> (Result<ByteBuffer, MessageWriteError>, EventLoopPromise<Void>?)? {
434+
switch self {
435+
case .clientActiveServerIdle(var writeState, let pendingReadState):
436+
self = .modifying
437+
let result = writeState.next()
438+
self = .clientActiveServerIdle(writeState: writeState, pendingReadState: pendingReadState)
439+
return result
440+
441+
case .clientActiveServerActive(var writeState, let readState):
442+
self = .modifying
443+
let result = writeState.next()
444+
self = .clientActiveServerActive(writeState: writeState, readState: readState)
445+
return result
446+
447+
case .clientIdleServerIdle,
448+
.clientClosedServerIdle,
449+
.clientClosedServerActive,
450+
.clientClosedServerClosed:
451+
return nil
452+
453+
case .modifying:
454+
preconditionFailure("State left as 'modifying'")
455+
}
456+
}
457+
425458
/// See `GRPCClientStateMachine.sendEndOfRequestStream()`.
426459
mutating func sendEndOfRequestStream() -> Result<Void, SendEndOfRequestStreamError> {
427460
let result: Result<Void, SendEndOfRequestStreamError>

Sources/GRPC/LengthPrefixedMessageWriter.swift

+1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
import Foundation
1717
import NIOCore
18+
import NIOHPACK
1819

1920
internal struct LengthPrefixedMessageWriter {
2021
static let metadataLength = 5

Sources/GRPC/ReadWriteStates.swift

+40-29
Original file line numberDiff line numberDiff line change
@@ -42,50 +42,61 @@ struct PendingWriteState {
4242
compression = nil
4343
}
4444

45-
let writer = LengthPrefixedMessageWriter(compression: compression, allocator: allocator)
46-
return .writing(self.arity, self.contentType, writer)
45+
let writer = CoalescingLengthPrefixedMessageWriter(
46+
compression: compression,
47+
allocator: allocator
48+
)
49+
return .init(arity: self.arity, contentType: self.contentType, writer: writer)
4750
}
4851
}
4952

5053
/// The write state of a stream.
51-
enum WriteState {
52-
/// Writing may be attempted using the given writer.
53-
case writing(MessageArity, ContentType, LengthPrefixedMessageWriter)
54-
55-
/// Writing may not be attempted: either a write previously failed or it is not valid for any
56-
/// more messages to be written.
57-
case notWriting
54+
struct WriteState {
55+
private var arity: MessageArity
56+
private var contentType: ContentType
57+
private var writer: CoalescingLengthPrefixedMessageWriter
58+
private var canWrite: Bool
59+
60+
init(
61+
arity: MessageArity,
62+
contentType: ContentType,
63+
writer: CoalescingLengthPrefixedMessageWriter
64+
) {
65+
self.arity = arity
66+
self.contentType = contentType
67+
self.writer = writer
68+
self.canWrite = true
69+
}
5870

5971
/// Writes a message into a buffer using the `writer`.
6072
///
6173
/// - Parameter message: The `Message` to write.
6274
mutating func write(
6375
_ message: ByteBuffer,
64-
compressed: Bool
65-
) -> Result<(ByteBuffer, ByteBuffer?), MessageWriteError> {
66-
switch self {
67-
case .notWriting:
76+
compressed: Bool,
77+
promise: EventLoopPromise<Void>?
78+
) -> Result<Void, MessageWriteError> {
79+
guard self.canWrite else {
6880
return .failure(.cardinalityViolation)
81+
}
6982

70-
case .writing(let writeArity, let contentType, var writer):
71-
self = .notWriting
72-
let buffers: (ByteBuffer, ByteBuffer?)
83+
self.writer.append(buffer: message, compress: compressed, promise: promise)
7384

74-
do {
75-
buffers = try writer.write(buffer: message, compressed: compressed)
76-
} catch {
77-
self = .notWriting
78-
return .failure(.serializationFailed)
79-
}
85+
switch self.arity {
86+
case .one:
87+
self.canWrite = false
88+
case .many:
89+
()
90+
}
8091

81-
// If we only expect to write one message then we're no longer writable.
82-
if case .one = writeArity {
83-
self = .notWriting
84-
} else {
85-
self = .writing(writeArity, contentType, writer)
86-
}
92+
return .success(())
93+
}
8794

88-
return .success(buffers)
95+
mutating func next() -> (Result<ByteBuffer, MessageWriteError>, EventLoopPromise<Void>?)? {
96+
if let next = self.writer.next() {
97+
return (next.0.mapError { _ in .serializationFailed }, next.1)
98+
} else {
99+
return nil
89100
}
90101
}
91102
}

Tests/GRPCTests/GRPCClientStateMachineTests.swift

+11-10
Original file line numberDiff line numberDiff line change
@@ -154,14 +154,7 @@ extension GRPCClientStateMachineTests {
154154
stateMachine.sendRequest(
155155
ByteBuffer(string: request),
156156
compressed: false
157-
).assertSuccess { buffers in
158-
var buffer = buffers.0
159-
XCTAssertNil(buffers.1)
160-
// Remove the length and compression flag prefix.
161-
buffer.moveReaderIndex(forwardBy: 5)
162-
let data = buffer.readString(length: buffer.readableBytes)!
163-
XCTAssertEqual(request, data)
164-
}
157+
).assertSuccess()
165158
}
166159

167160
func testSendRequestFromIdle() {
@@ -1299,10 +1292,18 @@ extension PendingWriteState {
12991292

13001293
extension WriteState {
13011294
static func one() -> WriteState {
1302-
return .writing(.one, .protobuf, LengthPrefixedMessageWriter(compression: .none))
1295+
return .init(
1296+
arity: .one,
1297+
contentType: .protobuf,
1298+
writer: .init(compression: .none, allocator: .init())
1299+
)
13031300
}
13041301

13051302
static func many() -> WriteState {
1306-
return .writing(.many, .protobuf, LengthPrefixedMessageWriter(compression: .none))
1303+
return .init(
1304+
arity: .many,
1305+
contentType: .protobuf,
1306+
writer: .init(compression: .none, allocator: .init())
1307+
)
13071308
}
13081309
}

0 commit comments

Comments
 (0)