Skip to content

Commit dfc4539

Browse files
authored
Speed up writeVInt (elastic#62345)
This speeds up `StreamOutput#writeVInt` quite a bit, which is nice because it is *very* commonly called when serializing aggregations. Well, when serializing anything. All "collections" serialize their size as a vint. Anyway, I was examining the serialization speeds of `StringTerms` and this saves about 30% of the write time for that. I expect it'll be useful in other places.
1 parent 86660b0 commit dfc4539

File tree

4 files changed

+120
-5
lines changed

4 files changed

+120
-5
lines changed

benchmarks/README.md

-1
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,6 @@ cd fcml*
7878
make
7979
cd example/hsdis
8080
make
81-
cp .libs/libhsdis.so.0.0.0
8281
sudo cp .libs/libhsdis.so.0.0.0 /usr/lib/jvm/java-14-adoptopenjdk/lib/hsdis-amd64.so
8382
```
8483

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.benchmark.search.aggregations.bucket.terms;
21+
22+
import org.apache.lucene.util.BytesRef;
23+
import org.elasticsearch.common.io.stream.DelayableWriteable;
24+
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
25+
import org.elasticsearch.search.DocValueFormat;
26+
import org.elasticsearch.search.aggregations.BucketOrder;
27+
import org.elasticsearch.search.aggregations.InternalAggregation;
28+
import org.elasticsearch.search.aggregations.InternalAggregations;
29+
import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
30+
import org.openjdk.jmh.annotations.Benchmark;
31+
import org.openjdk.jmh.annotations.BenchmarkMode;
32+
import org.openjdk.jmh.annotations.Fork;
33+
import org.openjdk.jmh.annotations.Measurement;
34+
import org.openjdk.jmh.annotations.Mode;
35+
import org.openjdk.jmh.annotations.OutputTimeUnit;
36+
import org.openjdk.jmh.annotations.Param;
37+
import org.openjdk.jmh.annotations.Scope;
38+
import org.openjdk.jmh.annotations.Setup;
39+
import org.openjdk.jmh.annotations.State;
40+
import org.openjdk.jmh.annotations.Warmup;
41+
42+
import java.util.ArrayList;
43+
import java.util.List;
44+
import java.util.concurrent.TimeUnit;
45+
46+
@Fork(2)
47+
@Warmup(iterations = 10)
48+
@Measurement(iterations = 5)
49+
@BenchmarkMode(Mode.AverageTime)
50+
@OutputTimeUnit(TimeUnit.MILLISECONDS)
51+
@State(Scope.Benchmark)
52+
public class StringTermsSerializationBenchmark {
53+
private static final NamedWriteableRegistry REGISTRY = new NamedWriteableRegistry(
54+
List.of(new NamedWriteableRegistry.Entry(InternalAggregation.class, StringTerms.NAME, StringTerms::new))
55+
);
56+
@Param(value = { "1000" })
57+
private int buckets;
58+
59+
private DelayableWriteable<InternalAggregations> results;
60+
61+
@Setup
62+
public void initResults() {
63+
results = DelayableWriteable.referencing(InternalAggregations.from(List.of(newTerms(true))));
64+
}
65+
66+
private StringTerms newTerms(boolean withNested) {
67+
List<StringTerms.Bucket> resultBuckets = new ArrayList<>(buckets);
68+
for (int i = 0; i < buckets; i++) {
69+
InternalAggregations inner = withNested ? InternalAggregations.from(List.of(newTerms(false))) : InternalAggregations.EMPTY;
70+
resultBuckets.add(new StringTerms.Bucket(new BytesRef("test" + i), i, inner, false, 0, DocValueFormat.RAW));
71+
}
72+
return new StringTerms(
73+
"test",
74+
BucketOrder.key(true),
75+
BucketOrder.key(true),
76+
buckets,
77+
1,
78+
null,
79+
DocValueFormat.RAW,
80+
buckets,
81+
false,
82+
100000,
83+
resultBuckets,
84+
0
85+
);
86+
}
87+
88+
@Benchmark
89+
public DelayableWriteable<InternalAggregations> serialize() {
90+
return results.asSerialized(InternalAggregations::readFrom, REGISTRY);
91+
}
92+
}

server/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java

+17-3
Original file line numberDiff line numberDiff line change
@@ -218,12 +218,26 @@ public void writeInt(int i) throws IOException {
218218
* using {@link #writeInt}
219219
*/
220220
public void writeVInt(int i) throws IOException {
221-
final byte[] buffer = scratch.get();
221+
/*
222+
* Shortcut writing single byte because it is very, very common and
223+
* can skip grabbing the scratch buffer. This is marginally slower
224+
* than hand unrolling the entire encoding loop but hand unrolling
225+
* the encoding loop blows out the method size so it can't be inlined.
226+
* In that case benchmarks of the method itself are faster but
227+
* benchmarks of methods that use this method are slower.
228+
* This is philosophically in line with vint in general - it biases
229+
* towards being simple and fast for smaller numbers.
230+
*/
231+
if (Integer.numberOfLeadingZeros(i) >= 25) {
232+
writeByte((byte) i);
233+
return;
234+
}
235+
byte[] buffer = scratch.get();
222236
int index = 0;
223-
while ((i & ~0x7F) != 0) {
237+
do {
224238
buffer[index++] = ((byte) ((i & 0x7f) | 0x80));
225239
i >>>= 7;
226-
}
240+
} while ((i & ~0x7F) != 0);
227241
buffer[index++] = ((byte) i);
228242
writeBytes(buffer, 0, index);
229243
}

server/src/test/java/org/elasticsearch/common/io/stream/BytesStreamsTests.java

+11-1
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@
6161
import static org.hamcrest.Matchers.sameInstance;
6262

6363
/**
64-
* Tests for {@link BytesStreamOutput} paging behaviour.
64+
* Tests for {@link StreamOutput}.
6565
*/
6666
public class BytesStreamsTests extends ESTestCase {
6767
public void testEmpty() throws Exception {
@@ -827,6 +827,16 @@ public void testVInt() throws IOException {
827827
final int value = randomInt();
828828
BytesStreamOutput output = new BytesStreamOutput();
829829
output.writeVInt(value);
830+
831+
BytesStreamOutput simple = new BytesStreamOutput();
832+
int i = value;
833+
while ((i & ~0x7F) != 0) {
834+
simple.writeByte(((byte) ((i & 0x7f) | 0x80)));
835+
i >>>= 7;
836+
}
837+
simple.writeByte((byte) i);
838+
assertEquals(simple.bytes().toBytesRef().toString(), output.bytes().toBytesRef().toString());
839+
830840
StreamInput input = output.bytes().streamInput();
831841
assertEquals(value, input.readVInt());
832842
}

0 commit comments

Comments
 (0)