Skip to content

optimize number of buffer allocations #11879

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 16 commits into from
Feb 14, 2025
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 22 additions & 4 deletions core/src/main/java/io/grpc/internal/MessageFramer.java
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ void deliverFrame(
// effectively final. Can only be set once.
private int maxOutboundMessageSize = NO_MAX_OUTBOUND_MESSAGE_SIZE;
private WritableBuffer buffer;
/**
* if > 0 - the number of bytes to allocate for the current known-length message.
*/
private int knownLengthPendingAllocation;
private Compressor compressor = Codec.Identity.NONE;
private boolean messageCompression = true;
private final OutputStreamAdapter outputStreamAdapter = new OutputStreamAdapter();
Expand Down Expand Up @@ -222,13 +226,25 @@ private int writeKnownLengthUncompressed(InputStream message, int messageLength)
headerScratch.put(UNCOMPRESSED).putInt(messageLength);
// Allocate the initial buffer chunk based on frame header + payload length.
// Note that the allocator may allocate a buffer larger or smaller than this length
knownLengthPendingAllocation = HEADER_LENGTH + messageLength;
if (buffer == null) {
buffer = bufferAllocator.allocate(headerScratch.position() + messageLength);
buffer = allocateKnownLength();
}
writeRaw(headerScratch.array(), 0, headerScratch.position());
return writeToOutputStream(message, outputStreamAdapter);
}

/**
* Allocate buffer according to {@link #knownLengthPendingAllocation} which is decremented after
* that.
*/
private WritableBuffer allocateKnownLength() {
WritableBuffer newBuffer = bufferAllocator.allocateKnownLength(knownLengthPendingAllocation);
knownLengthPendingAllocation -= Math.min(knownLengthPendingAllocation,
newBuffer.writableBytes());
return newBuffer;
}

/**
* Write a message that has been serialized to a sequence of buffers.
*/
Expand All @@ -243,7 +259,7 @@ private void writeBufferChain(BufferChainOutputStream bufferChain, boolean compr
}
headerScratch.clear();
headerScratch.put(compressed ? COMPRESSED : UNCOMPRESSED).putInt(messageLength);
WritableBuffer writeableHeader = bufferAllocator.allocate(HEADER_LENGTH);
WritableBuffer writeableHeader = bufferAllocator.allocateKnownLength(HEADER_LENGTH);
writeableHeader.write(headerScratch.array(), 0, headerScratch.position());
if (messageLength == 0) {
// the payload had 0 length so make the header the current buffer.
Expand Down Expand Up @@ -289,7 +305,9 @@ private void writeRaw(byte[] b, int off, int len) {
}
if (buffer == null) {
// Request a buffer allocation using the message length as a hint.
buffer = bufferAllocator.allocate(len);
buffer = knownLengthPendingAllocation > 0
? allocateKnownLength()
: bufferAllocator.allocate(len);
}
int toWrite = min(len, buffer.writableBytes());
buffer.write(b, off, toWrite);
Expand Down Expand Up @@ -397,7 +415,7 @@ private final class BufferChainOutputStream extends OutputStream {
* {@link #write(byte[], int, int)}.
*/
@Override
public void write(int b) throws IOException {
public void write(int b) {
if (current != null && current.writableBytes() > 0) {
current.write((byte)b);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,12 @@ public interface WritableBufferAllocator {
* free to return a buffer with a greater or lesser capacity.
*/
WritableBuffer allocate(int capacityHint);

/**
* Request a new {@link WritableBuffer} with the given {@code capacityHint}. This method is
* similar to {@link #allocate(int)}, but there is no need to allocate greater capacity.
*/
default WritableBuffer allocateKnownLength(int capacityHint) {
return allocate(capacityHint);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.protobuf.ByteString;
import io.grpc.CallOptions;
import io.grpc.Channel;
Expand Down Expand Up @@ -53,8 +55,6 @@
import io.grpc.testing.integration.TestServiceGrpc.TestServiceBlockingStub;
import io.grpc.testing.integration.TransportCompressionTest.Fzip;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
Expand Down Expand Up @@ -146,25 +146,16 @@ public void tearDown() {
* Parameters for test.
*/
@Parameters
public static Collection<Object[]> params() {
boolean[] bools = new boolean[]{false, true};
List<Object[]> combos = new ArrayList<>(64);
for (boolean enableClientMessageCompression : bools) {
for (boolean clientAcceptEncoding : bools) {
for (boolean clientEncoding : bools) {
for (boolean enableServerMessageCompression : bools) {
for (boolean serverAcceptEncoding : bools) {
for (boolean serverEncoding : bools) {
combos.add(new Object[] {
enableClientMessageCompression, clientAcceptEncoding, clientEncoding,
enableServerMessageCompression, serverAcceptEncoding, serverEncoding});
}
}
}
}
}
}
return combos;
public static Iterable<Object[]> params() {
List<Boolean> bools = Lists.newArrayList(false, true);
return Iterables.transform(Lists.cartesianProduct(
bools, // enableClientMessageCompression
bools, // clientAcceptEncoding
bools, // clientEncoding
bools, // enableServerMessageCompression
bools, // serverAcceptEncoding
bools // serverEncoding
), List::toArray);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@
*/
class NettyWritableBufferAllocator implements WritableBufferAllocator {

// Use 4k as our minimum buffer size.
private static final int MIN_BUFFER = 4 * 1024;

// Set the maximum buffer size to 1MB.
private static final int MAX_BUFFER = 1024 * 1024;

Expand All @@ -44,6 +47,11 @@ class NettyWritableBufferAllocator implements WritableBufferAllocator {

@Override
public WritableBuffer allocate(int capacityHint) {
return allocateKnownLength(Math.max(MIN_BUFFER, capacityHint));
}

@Override
public WritableBuffer allocateKnownLength(int capacityHint) {
capacityHint = Math.min(MAX_BUFFER, capacityHint);
return new NettyWritableBuffer(allocator.buffer(capacityHint, capacityHint));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ public void writeFrameFutureFailedShouldCancelRpc() {
// Verify that failed SendGrpcFrameCommand results in immediate CancelClientStreamCommand.
inOrder.verify(writeQueue).enqueue(any(CancelClientStreamCommand.class), eq(true));
// Verify that any other failures do not produce another CancelClientStreamCommand in the queue.
inOrder.verify(writeQueue, atLeast(1)).enqueue(any(SendGrpcFrameCommand.class), eq(false));
inOrder.verify(writeQueue, atLeast(0)).enqueue(any(SendGrpcFrameCommand.class), eq(false));
inOrder.verify(writeQueue).enqueue(any(SendGrpcFrameCommand.class), eq(true));
inOrder.verifyNoMoreInteractions();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ public void writeFrameFutureFailedShouldCancelRpc() {
// Verify that failed SendGrpcFrameCommand results in immediate CancelServerStreamCommand.
inOrder.verify(writeQueue).enqueue(any(CancelServerStreamCommand.class), eq(true));
// Verify that any other failures do not produce another CancelServerStreamCommand in the queue.
inOrder.verify(writeQueue, atLeast(1)).enqueue(any(SendGrpcFrameCommand.class), eq(false));
inOrder.verify(writeQueue, atLeast(0)).enqueue(any(SendGrpcFrameCommand.class), eq(false));
inOrder.verify(writeQueue).enqueue(any(SendGrpcFrameCommand.class), eq(true));
inOrder.verifyNoMoreInteractions();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.grpc.internal.WritableBuffer;
import io.grpc.internal.WritableBufferAllocator;
import okio.Buffer;
import okio.Segment;

/**
* The default allocator for {@link OkHttpWritableBuffer}s used by the OkHttp transport. OkHttp
Expand All @@ -27,9 +28,6 @@
*/
class OkHttpWritableBufferAllocator implements WritableBufferAllocator {

// Use 4k as our minimum buffer size.
private static final int MIN_BUFFER = 4096;

// Set the maximum buffer size to 1MB
private static final int MAX_BUFFER = 1024 * 1024;

Expand All @@ -45,7 +43,13 @@ class OkHttpWritableBufferAllocator implements WritableBufferAllocator {
*/
@Override
public WritableBuffer allocate(int capacityHint) {
capacityHint = Math.min(MAX_BUFFER, Math.max(MIN_BUFFER, capacityHint));
// okio buffer uses fixed size Segments, round capacityHint up
return allocateKnownLength((capacityHint + Segment.SIZE - 1) / Segment.SIZE * Segment.SIZE);
}

@Override
public WritableBuffer allocateKnownLength(int capacityHint) {
capacityHint = Math.min(MAX_BUFFER, capacityHint);
return new OkHttpWritableBuffer(new Buffer(), capacityHint);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.grpc.internal.WritableBuffer;
import io.grpc.internal.WritableBufferAllocator;
import io.grpc.internal.WritableBufferAllocatorTestBase;
import okio.Segment;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
Expand All @@ -42,7 +43,7 @@ protected WritableBufferAllocator allocator() {
public void testCapacity() {
WritableBuffer buffer = allocator().allocate(4096);
assertEquals(0, buffer.readableBytes());
assertEquals(4096, buffer.writableBytes());
assertEquals(Segment.SIZE, buffer.writableBytes());
}

@Test
Expand All @@ -54,8 +55,8 @@ public void testInitialCapacityHasMaximum() {

@Test
public void testIsExactBelowMaxCapacity() {
WritableBuffer buffer = allocator().allocate(4097);
WritableBuffer buffer = allocator().allocate(Segment.SIZE + 1);
assertEquals(0, buffer.readableBytes());
assertEquals(4097, buffer.writableBytes());
assertEquals(Segment.SIZE * 2, buffer.writableBytes());
}
}
17 changes: 16 additions & 1 deletion servlet/src/main/java/io/grpc/servlet/ServletServerStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import io.grpc.internal.TransportFrameUtil;
import io.grpc.internal.TransportTracer;
import io.grpc.internal.WritableBuffer;
import io.grpc.internal.WritableBufferAllocator;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
Expand Down Expand Up @@ -72,7 +73,7 @@ final class ServletServerStream extends AbstractServerStream {
Attributes attributes,
String authority,
InternalLogId logId) throws IOException {
super(ByteArrayWritableBuffer::new, statsTraceCtx);
super(ALLOCATOR, statsTraceCtx);
transportState =
new ServletTransportState(maxInboundMessageSize, statsTraceCtx, new TransportTracer());
this.attributes = attributes;
Expand Down Expand Up @@ -161,6 +162,20 @@ public void deframeFailed(Throwable cause) {
}
}

private static final WritableBufferAllocator ALLOCATOR = new WritableBufferAllocator() {
private static final int MIN_BUFFER = 4096;

@Override
public WritableBuffer allocate(int capacityHint) {
return new ByteArrayWritableBuffer(max(MIN_BUFFER, capacityHint));
}

@Override
public WritableBuffer allocateKnownLength(int capacityHint) {
return new ByteArrayWritableBuffer(capacityHint);
}
};

private static final class ByteArrayWritableBuffer implements WritableBuffer {

private final int capacity;
Expand Down