Skip to content

Commit 403d1ff

Browse files
Optimize FilterStreamInput for Network Reads (#52395)
When `FilterStreamInput` wraps a Netty `ByteBuf` based stream it did not forward the bulk primitive reads to the delegate. These are optimized on the delegate but if they're not forwarded then the delegate will be called e.g. 4 times to read an `int`. This happens for essentially all network reads prior to this change because they all run from a `NamedWritableAwareStreamInput`. This also required optimising `BufferedChecksumStreamInput` individually to use bulk reads from the buffer because it implicitly assumed that the filter stream input wouldn't override any of the bulk operations.
1 parent 9d6eab2 commit 403d1ff

File tree

2 files changed

+39
-0
lines changed

2 files changed

+39
-0
lines changed

server/src/main/java/org/elasticsearch/common/io/stream/FilterStreamInput.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,21 @@ public void readBytes(byte[] b, int offset, int len) throws IOException {
4545
delegate.readBytes(b, offset, len);
4646
}
4747

48+
@Override
49+
public short readShort() throws IOException {
50+
return delegate.readShort();
51+
}
52+
53+
@Override
54+
public int readInt() throws IOException {
55+
return delegate.readInt();
56+
}
57+
58+
@Override
59+
public long readLong() throws IOException {
60+
return delegate.readLong();
61+
}
62+
4863
@Override
4964
public void reset() throws IOException {
5065
delegate.reset();

server/src/main/java/org/elasticsearch/index/translog/BufferedChecksumStreamInput.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,30 @@ public void readBytes(byte[] b, int offset, int len) throws IOException {
7070
digest.update(b, offset, len);
7171
}
7272

73+
private static final ThreadLocal<byte[]> buffer = ThreadLocal.withInitial(() -> new byte[8]);
74+
75+
@Override
76+
public short readShort() throws IOException {
77+
final byte[] buf = buffer.get();
78+
readBytes(buf, 0, 2);
79+
return (short) (((buf[0] & 0xFF) << 8) | (buf[1] & 0xFF));
80+
}
81+
82+
@Override
83+
public int readInt() throws IOException {
84+
final byte[] buf = buffer.get();
85+
readBytes(buf, 0, 4);
86+
return ((buf[0] & 0xFF) << 24) | ((buf[1] & 0xFF) << 16) | ((buf[2] & 0xFF) << 8) | (buf[3] & 0xFF);
87+
}
88+
89+
@Override
90+
public long readLong() throws IOException {
91+
final byte[] buf = buffer.get();
92+
readBytes(buf, 0, 8);
93+
return (((long) (((buf[0] & 0xFF) << 24) | ((buf[1] & 0xFF) << 16) | ((buf[2] & 0xFF) << 8) | (buf[3] & 0xFF))) << 32)
94+
| ((((buf[4] & 0xFF) << 24) | ((buf[5] & 0xFF) << 16) | ((buf[6] & 0xFF) << 8) | (buf[7] & 0xFF)) & 0xFFFFFFFFL);
95+
}
96+
7397
@Override
7498
public void reset() throws IOException {
7599
delegate.reset();

0 commit comments

Comments
 (0)