Skip to content

Commit aaecaf5

Browse files
Optimize Bulk Message Parsing and Message Length Parsing (#39634) (#39730)
* Optimize Bulk Message Parsing and Message Length Parsing
* `findNextMarker` took almost 1ms per invocation during the PMC rally track
  * Fixed to be about an order of magnitude faster by using Netty's bulk `ByteBuf` search
* It is unnecessary to instantiate an object (the input stream wrapper) and throw it away just to read the `int` length from the message bytes
  * Fixed by adding a bulk `int` read to `BytesReference`
1 parent 4a3fa5a commit aaecaf5

File tree

8 files changed

+105
-24
lines changed

8 files changed

+105
-24
lines changed

modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/ByteBufBytesReference.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,17 @@ public byte get(int index) {
4545
return buffer.getByte(offset + index);
4646
}
4747

48+
@Override
49+
public int getInt(int index) {
50+
return buffer.getInt(offset + index);
51+
}
52+
53+
@Override
54+
public int indexOf(byte marker, int from) {
55+
final int start = offset + from;
56+
return buffer.forEachByte(start, length - start, value -> value != marker);
57+
}
58+
4859
@Override
4960
public int length() {
5061
return length;

plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/ByteBufUtils.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,17 @@ public byte get(int index) {
9090
return buffer.getByte(offset + index);
9191
}
9292

93+
@Override
94+
public int getInt(int index) {
95+
return buffer.getInt(offset + index);
96+
}
97+
98+
@Override
99+
public int indexOf(byte marker, int from) {
100+
final int start = offset + from;
101+
return buffer.forEachByte(start, length - start, value -> value != marker);
102+
}
103+
93104
@Override
94105
public int length() {
95106
return length;

server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -364,11 +364,10 @@ public BulkRequest add(BytesReference data, @Nullable String defaultIndex, @Null
364364
XContent xContent = xContentType.xContent();
365365
int line = 0;
366366
int from = 0;
367-
int length = data.length();
368367
byte marker = xContent.streamSeparator();
369368
boolean typesDeprecationLogged = false;
370369
while (true) {
371-
int nextMarker = findNextMarker(marker, from, data, length);
370+
int nextMarker = findNextMarker(marker, from, data);
372371
if (nextMarker == -1) {
373372
break;
374373
}
@@ -477,7 +476,7 @@ public BulkRequest add(BytesReference data, @Nullable String defaultIndex, @Null
477476
add(new DeleteRequest(index, type, id).routing(routing)
478477
.version(version).versionType(versionType).setIfSeqNo(ifSeqNo).setIfPrimaryTerm(ifPrimaryTerm), payload);
479478
} else {
480-
nextMarker = findNextMarker(marker, from, data, length);
479+
nextMarker = findNextMarker(marker, from, data);
481480
if (nextMarker == -1) {
482481
break;
483482
}
@@ -615,16 +614,16 @@ public String routing() {
615614
return globalRouting;
616615
}
617616

618-
private int findNextMarker(byte marker, int from, BytesReference data, int length) {
619-
for (int i = from; i < length; i++) {
620-
if (data.get(i) == marker) {
621-
return i;
622-
}
617+
private static int findNextMarker(byte marker, int from, BytesReference data) {
618+
final int res = data.indexOf(marker, from);
619+
if (res != -1) {
620+
assert res >= 0;
621+
return res;
623622
}
624-
if (from != length) {
623+
if (from != data.length()) {
625624
throw new IllegalArgumentException("The bulk request must be terminated by a newline [\n]");
626625
}
627-
return -1;
626+
return res;
628627
}
629628

630629
@Override

server/src/main/java/org/elasticsearch/action/search/MultiSearchRequest.java

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -177,10 +177,9 @@ public static void readMultiLineFormat(BytesReference data,
177177
NamedXContentRegistry registry,
178178
boolean allowExplicitIndex) throws IOException {
179179
int from = 0;
180-
int length = data.length();
181180
byte marker = xContent.streamSeparator();
182181
while (true) {
183-
int nextMarker = findNextMarker(marker, from, data, length);
182+
int nextMarker = findNextMarker(marker, from, data);
184183
if (nextMarker == -1) {
185184
break;
186185
}
@@ -261,7 +260,7 @@ public static void readMultiLineFormat(BytesReference data,
261260
// move pointers
262261
from = nextMarker + 1;
263262
// now for the body
264-
nextMarker = findNextMarker(marker, from, data, length);
263+
nextMarker = findNextMarker(marker, from, data);
265264
if (nextMarker == -1) {
266265
break;
267266
}
@@ -275,13 +274,13 @@ public static void readMultiLineFormat(BytesReference data,
275274
}
276275
}
277276

278-
private static int findNextMarker(byte marker, int from, BytesReference data, int length) {
279-
for (int i = from; i < length; i++) {
280-
if (data.get(i) == marker) {
281-
return i;
282-
}
277+
private static int findNextMarker(byte marker, int from, BytesReference data) {
278+
final int res = data.indexOf(marker, from);
279+
if (res != -1) {
280+
assert res >= 0;
281+
return res;
283282
}
284-
if (from != length) {
283+
if (from != data.length()) {
285284
throw new IllegalArgumentException("The msearch request must be terminated by a newline [\n]");
286285
}
287286
return -1;

server/src/main/java/org/elasticsearch/common/bytes/ByteBufferReference.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,11 @@ public byte get(int index) {
4646
return buffer.get(index);
4747
}
4848

49+
@Override
50+
public int getInt(int index) {
51+
return buffer.getInt(index);
52+
}
53+
4954
@Override
5055
public int length() {
5156
return length;

server/src/main/java/org/elasticsearch/common/bytes/BytesReference.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,29 @@ public static BytesReference bytes(XContentBuilder xContentBuilder) {
6060
*/
6161
public abstract byte get(int index);
6262

63+
/**
64+
* Returns the integer read from the 4 bytes (BE) starting at the given index.
65+
*/
66+
public int getInt(int index) {
67+
return (get(index) & 0xFF) << 24 | (get(index + 1) & 0xFF) << 16 | (get(index + 2) & 0xFF) << 8 | get(index + 3) & 0xFF;
68+
}
69+
70+
/**
71+
* Finds the index of the first occurrence of the given marker between within the given bounds.
72+
* @param marker marker byte to search
73+
* @param from lower bound for the index to check (inclusive)
74+
* @return first index of the marker or {@code -1} if not found
75+
*/
76+
public int indexOf(byte marker, int from) {
77+
final int to = length();
78+
for (int i = from; i < to; i++) {
79+
if (get(i) == marker) {
80+
return i;
81+
}
82+
}
83+
return -1;
84+
}
85+
6386
/**
6487
* The length.
6588
*/

server/src/main/java/org/elasticsearch/transport/TcpTransport.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -840,11 +840,7 @@ private static int readHeaderBuffer(BytesReference headerBuffer) throws IOExcept
840840
+ Integer.toHexString(headerBuffer.get(2) & 0xFF) + ","
841841
+ Integer.toHexString(headerBuffer.get(3) & 0xFF) + ")");
842842
}
843-
final int messageLength;
844-
try (StreamInput input = headerBuffer.streamInput()) {
845-
input.skip(TcpHeader.MARKER_BYTES_SIZE);
846-
messageLength = input.readInt();
847-
}
843+
final int messageLength = headerBuffer.getInt(TcpHeader.MARKER_BYTES_SIZE);
848844

849845
if (messageLength == TransportKeepAlive.PING_DATA_SIZE) {
850846
// This is a ping

test/framework/src/main/java/org/elasticsearch/common/bytes/AbstractBytesReferenceTestCase.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,14 @@
3434

3535
import java.io.EOFException;
3636
import java.io.IOException;
37+
import java.nio.ByteBuffer;
38+
import java.nio.ByteOrder;
39+
import java.nio.IntBuffer;
40+
import java.util.ArrayList;
3741
import java.util.Arrays;
42+
import java.util.HashMap;
43+
import java.util.List;
44+
import java.util.Map;
3845

3946
public abstract class AbstractBytesReferenceTestCase extends ESTestCase {
4047

@@ -648,4 +655,34 @@ public void testBasicEquals() {
648655
assertNotEquals(b1, b2);
649656
}
650657
}
658+
659+
public void testGetInt() throws IOException {
660+
final int count = randomIntBetween(1, 10);
661+
final BytesReference bytesReference = newBytesReference(count * Integer.BYTES);
662+
final BytesRef bytesRef = bytesReference.toBytesRef();
663+
final IntBuffer intBuffer =
664+
ByteBuffer.wrap(bytesRef.bytes, bytesRef.offset, bytesRef.length).order(ByteOrder.BIG_ENDIAN).asIntBuffer();
665+
for (int i = 0; i < count; ++i) {
666+
assertEquals(intBuffer.get(i), bytesReference.getInt(i * Integer.BYTES));
667+
}
668+
}
669+
670+
public void testIndexOf() throws IOException {
671+
final int size = randomIntBetween(0, 100);
672+
final BytesReference bytesReference = newBytesReference(size);
673+
final Map<Byte, List<Integer>> map = new HashMap<>();
674+
for (int i = 0; i < size; ++i) {
675+
final byte value = bytesReference.get(i);
676+
map.computeIfAbsent(value, v -> new ArrayList<>()).add(i);
677+
}
678+
map.forEach((value, positions) -> {
679+
for (int i = 0; i < positions.size(); i++) {
680+
final int pos = positions.get(i);
681+
final int from = i == 0 ? randomIntBetween(0, pos) : positions.get(i - 1) + 1;
682+
assertEquals(bytesReference.indexOf(value, from), pos);
683+
}
684+
});
685+
final byte missing = randomValueOtherThanMany(map::containsKey, ESTestCase::randomByte);
686+
assertEquals(-1, bytesReference.indexOf(missing, randomIntBetween(0, Math.max(0, size - 1))));
687+
}
651688
}

0 commit comments

Comments
 (0)