Skip to content

Commit 2a33538

Browse files
authored
Big arrays sliced from netty buffers (long) (#91641)
Based on #90745 but for longs. This should allow aggregations down the road to read long values directly from netty buffer, rather than copying it from the netty buffer. Relates to #89437
1 parent 60d1e1f commit 2a33538

File tree

13 files changed

+217
-31
lines changed

13 files changed

+217
-31
lines changed

server/src/main/java/org/elasticsearch/common/bytes/BytesArray.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,11 @@ public int getIntLE(int index) {
137137
return ByteUtils.readIntLE(bytes, offset + index);
138138
}
139139

140+
@Override
141+
public long getLongLE(int index) {
142+
return ByteUtils.readLongLE(bytes, offset + index);
143+
}
144+
140145
@Override
141146
public double getDoubleLE(int index) {
142147
return ByteUtils.readDoubleLE(bytes, offset + index);

server/src/main/java/org/elasticsearch/common/bytes/CompositeBytesReference.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -238,6 +238,18 @@ public int getIntLE(int index) {
238238
return super.getIntLE(index);
239239
}
240240

241+
@Override
242+
public long getLongLE(int index) {
243+
int i = getOffsetIndex(index);
244+
int idx = index - offsets[i];
245+
int end = idx + 8;
246+
BytesReference wholeLongsLivesHere = references[i];
247+
if (end <= wholeLongsLivesHere.length()) {
248+
return wholeLongsLivesHere.getLongLE(idx);
249+
}
250+
return super.getLongLE(index);
251+
}
252+
241253
@Override
242254
public double getDoubleLE(int index) {
243255
int i = getOffsetIndex(index);

server/src/main/java/org/elasticsearch/common/util/BigArrays.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -261,6 +261,13 @@ public void set(long index, byte[] buf, int offset, int len) {
261261
assert index >= 0 && index < size();
262262
System.arraycopy(buf, offset << 3, array, (int) index << 3, len << 3);
263263
}
264+
265+
@Override
266+
public void writeTo(StreamOutput out) throws IOException {
267+
int size = Math.toIntExact(size()) * Long.BYTES;
268+
out.writeVInt(size);
269+
out.write(array, 0, size);
270+
}
264271
}
265272

266273
private static class ByteArrayAsDoubleArrayWrapper extends AbstractArrayWrapper implements DoubleArray {

server/src/main/java/org/elasticsearch/common/util/BigDoubleArray.java

Lines changed: 2 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import java.nio.ByteOrder;
1919
import java.util.Arrays;
2020

21+
import static org.elasticsearch.common.util.BigLongArray.writePages;
2122
import static org.elasticsearch.common.util.PageCacheRecycler.DOUBLE_PAGE_SIZE;
2223

2324
/**
@@ -128,18 +129,6 @@ public void set(long index, byte[] buf, int offset, int len) {
128129

129130
@Override
130131
public void writeTo(StreamOutput out) throws IOException {
131-
int size = (int) this.size;
132-
out.writeVInt(size * Double.BYTES);
133-
int lastPageEnd = size % DOUBLE_PAGE_SIZE;
134-
if (lastPageEnd == 0) {
135-
for (byte[] page : pages) {
136-
out.write(page);
137-
}
138-
return;
139-
}
140-
for (int i = 0; i < pages.length - 1; i++) {
141-
out.write(pages[i]);
142-
}
143-
out.write(pages[pages.length - 1], 0, lastPageEnd * Double.BYTES);
132+
writePages(out, Math.toIntExact(size), pages, Double.BYTES, DOUBLE_PAGE_SIZE);
144133
}
145134
}

server/src/main/java/org/elasticsearch/common/util/BigIntArray.java

Lines changed: 2 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import java.nio.ByteOrder;
1919
import java.util.Arrays;
2020

21+
import static org.elasticsearch.common.util.BigLongArray.writePages;
2122
import static org.elasticsearch.common.util.PageCacheRecycler.INT_PAGE_SIZE;
2223

2324
/**
@@ -43,22 +44,7 @@ final class BigIntArray extends AbstractBigArray implements IntArray {
4344

4445
@Override
4546
public void writeTo(StreamOutput out) throws IOException {
46-
if (size > Integer.MAX_VALUE / Integer.BYTES) {
47-
throw new IllegalArgumentException();
48-
}
49-
int intSize = (int) size;
50-
out.writeVInt(intSize * Integer.BYTES);
51-
int lastPageEnd = intSize % INT_PAGE_SIZE;
52-
if (lastPageEnd == 0) {
53-
for (byte[] page : pages) {
54-
out.write(page);
55-
}
56-
return;
57-
}
58-
for (int i = 0; i < pages.length - 1; i++) {
59-
out.write(pages[i]);
60-
}
61-
out.write(pages[pages.length - 1], 0, lastPageEnd * Integer.BYTES);
47+
writePages(out, (int) size, pages, Integer.BYTES, INT_PAGE_SIZE);
6248
}
6349

6450
@Override

server/src/main/java/org/elasticsearch/common/util/BigLongArray.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,9 @@
1010

1111
import org.apache.lucene.util.ArrayUtil;
1212
import org.apache.lucene.util.RamUsageEstimator;
13+
import org.elasticsearch.common.io.stream.StreamOutput;
1314

15+
import java.io.IOException;
1416
import java.lang.invoke.MethodHandles;
1517
import java.lang.invoke.VarHandle;
1618
import java.nio.ByteOrder;
@@ -126,4 +128,24 @@ public static long estimateRamBytes(final long size) {
126128
public void set(long index, byte[] buf, int offset, int len) {
127129
set(index, buf, offset, len, pages, 3);
128130
}
131+
132+
@Override
133+
public void writeTo(StreamOutput out) throws IOException {
134+
writePages(out, Math.toIntExact(size), pages, Long.BYTES, LONG_PAGE_SIZE);
135+
}
136+
137+
static void writePages(StreamOutput out, int size, byte[][] pages, int bytesPerValue, int pageSize) throws IOException {
138+
out.writeVInt(size * bytesPerValue);
139+
int lastPageEnd = size % pageSize;
140+
if (lastPageEnd == 0) {
141+
for (byte[] page : pages) {
142+
out.write(page);
143+
}
144+
return;
145+
}
146+
for (int i = 0; i < pages.length - 1; i++) {
147+
out.write(pages[i]);
148+
}
149+
out.write(pages[pages.length - 1], 0, lastPageEnd * bytesPerValue);
150+
}
129151
}

server/src/main/java/org/elasticsearch/common/util/LongArray.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,19 @@
88

99
package org.elasticsearch.common.util;
1010

11+
import org.elasticsearch.common.io.stream.StreamInput;
12+
import org.elasticsearch.common.io.stream.Writeable;
13+
14+
import java.io.IOException;
15+
1116
/**
1217
* Abstraction of an array of long values.
1318
*/
14-
public interface LongArray extends BigArray {
19+
public interface LongArray extends BigArray, Writeable {
20+
21+
static LongArray readFrom(StreamInput in) throws IOException {
22+
return new ReleasableLongArray(in);
23+
}
1524

1625
/**
1726
* Get an element given its index.
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
package org.elasticsearch.common.util;
10+
11+
import org.apache.lucene.util.RamUsageEstimator;
12+
import org.elasticsearch.common.bytes.ReleasableBytesReference;
13+
import org.elasticsearch.common.io.stream.StreamInput;
14+
import org.elasticsearch.common.io.stream.StreamOutput;
15+
16+
import java.io.IOException;
17+
18+
public class ReleasableLongArray implements LongArray {
19+
20+
private static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(ReleasableLongArray.class);
21+
22+
private final ReleasableBytesReference ref;
23+
24+
ReleasableLongArray(StreamInput in) throws IOException {
25+
this.ref = in.readReleasableBytesReference();
26+
}
27+
28+
@Override
29+
public void writeTo(StreamOutput out) throws IOException {
30+
out.writeBytesReference(ref);
31+
}
32+
33+
@Override
34+
public long size() {
35+
return ref.length() / Long.BYTES;
36+
}
37+
38+
@Override
39+
public long get(long index) {
40+
if (index > Integer.MAX_VALUE / Long.BYTES) {
41+
// We can't serialize messages longer than 2gb anyway
42+
throw new ArrayIndexOutOfBoundsException();
43+
}
44+
return ref.getLongLE((int) index * Long.BYTES);
45+
}
46+
47+
@Override
48+
public long set(long index, long value) {
49+
throw new UnsupportedOperationException();
50+
}
51+
52+
@Override
53+
public long increment(long index, long inc) {
54+
throw new UnsupportedOperationException();
55+
}
56+
57+
@Override
58+
public void fill(long fromIndex, long toIndex, long value) {
59+
throw new UnsupportedOperationException();
60+
}
61+
62+
@Override
63+
public void set(long index, byte[] buf, int offset, int len) {
64+
throw new UnsupportedOperationException();
65+
}
66+
67+
@Override
68+
public long ramBytesUsed() {
69+
/*
70+
* If we return the size of the buffer that we've sliced
71+
* we're likely to double count things.
72+
*/
73+
return SHALLOW_SIZE;
74+
}
75+
76+
@Override
77+
public void close() {
78+
ref.decRef();
79+
}
80+
}

server/src/test/java/org/elasticsearch/common/bytes/BytesArrayTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ public void testGetLongLE() {
8383
assertThat(ref.getLongLE(0), equalTo(888L));
8484
assertThat(ref.getLongLE(8), equalTo(Long.MAX_VALUE));
8585
Exception e = expectThrows(ArrayIndexOutOfBoundsException.class, () -> ref.getLongLE(9));
86-
assertThat(e.getMessage(), equalTo("Index 16 out of bounds for length 16"));
86+
assertThat(e.getMessage(), equalTo("Index 9 out of bounds for length 9"));
8787
}
8888

8989
public void testGetDoubleLE() {

server/src/test/java/org/elasticsearch/common/bytes/CompositeBytesReferenceTests.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.elasticsearch.common.Randomness;
1414
import org.elasticsearch.common.io.stream.BytesStreamOutput;
1515
import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
16+
import org.elasticsearch.common.util.ByteUtils;
1617
import org.hamcrest.Matchers;
1718

1819
import java.io.IOException;
@@ -190,4 +191,29 @@ public void testGetDoubleLE() {
190191
// We can assert the exception message if -XX:-OmitStackTraceInFastThrow is set in gradle test task.
191192
expectThrows(IndexOutOfBoundsException.class, () -> comp.getDoubleLE(17));
192193
}
194+
195+
public void testGetLongLE() {
196+
// first long = 2, second long = 44, third long = 512
197+
// tag::noformat
198+
byte[] data = new byte[] {
199+
0x2, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
200+
0x2C, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
201+
0x0, 0x2, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0};
202+
// end::noformat
203+
204+
byte[] d = new byte[8];
205+
ByteUtils.writeLongLE(2, d, 0);
206+
207+
List<BytesReference> refs = new ArrayList<>();
208+
int bytesPerChunk = randomFrom(4, 16);
209+
for (int offset = 0; offset < data.length; offset += bytesPerChunk) {
210+
int length = Math.min(bytesPerChunk, data.length - offset);
211+
refs.add(new BytesArray(data, offset, length));
212+
}
213+
BytesReference comp = CompositeBytesReference.of(refs.toArray(BytesReference[]::new));
214+
assertThat(comp.getLongLE(0), equalTo(2L));
215+
assertThat(comp.getLongLE(8), equalTo(44L));
216+
assertThat(comp.getLongLE(16), equalTo(512L));
217+
expectThrows(IndexOutOfBoundsException.class, () -> comp.getLongLE(17));
218+
}
193219
}

server/src/test/java/org/elasticsearch/common/bytes/ReleasableBytesReferenceStreamInputTests.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import org.elasticsearch.common.util.BigArrays;
1515
import org.elasticsearch.common.util.DoubleArray;
1616
import org.elasticsearch.common.util.IntArray;
17+
import org.elasticsearch.common.util.LongArray;
1718
import org.junit.After;
1819

1920
import java.io.IOException;
@@ -100,4 +101,23 @@ public void testBigDoubleArrayLivesAfterReleasableIsDecremented() throws IOExcep
100101
assertThat(ref.hasReferences(), equalTo(false));
101102
}
102103

104+
public void testBigLongArrayLivesAfterReleasableIsDecremented() throws IOException {
105+
LongArray testData = BigArrays.NON_RECYCLING_INSTANCE.newLongArray(1, false);
106+
testData.set(0, 1);
107+
108+
BytesStreamOutput out = new BytesStreamOutput();
109+
testData.writeTo(out);
110+
111+
ReleasableBytesReference ref = ReleasableBytesReference.wrap(out.bytes());
112+
113+
try (LongArray in = LongArray.readFrom(ref.streamInput())) {
114+
ref.decRef();
115+
assertThat(ref.hasReferences(), equalTo(true));
116+
117+
assertThat(in.size(), equalTo(testData.size()));
118+
assertThat(in.get(0), equalTo(1L));
119+
}
120+
assertThat(ref.hasReferences(), equalTo(false));
121+
}
122+
103123
}

server/src/test/java/org/elasticsearch/common/io/stream/AbstractStreamTests.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.elasticsearch.common.util.BigArrays;
1717
import org.elasticsearch.common.util.DoubleArray;
1818
import org.elasticsearch.common.util.IntArray;
19+
import org.elasticsearch.common.util.LongArray;
1920
import org.elasticsearch.common.util.PageCacheRecycler;
2021
import org.elasticsearch.common.util.set.Sets;
2122
import org.elasticsearch.core.CheckedConsumer;
@@ -271,6 +272,31 @@ private void assertBigDoubleArray(int size) throws IOException {
271272
}
272273
}
273274

275+
public void testSmallBigLongArray() throws IOException {
276+
assertBigLongArray(between(0, PageCacheRecycler.DOUBLE_PAGE_SIZE));
277+
}
278+
279+
public void testLargeBigLongArray() throws IOException {
280+
assertBigLongArray(between(PageCacheRecycler.DOUBLE_PAGE_SIZE, 10000));
281+
}
282+
283+
private void assertBigLongArray(int size) throws IOException {
284+
LongArray testData = BigArrays.NON_RECYCLING_INSTANCE.newLongArray(size, false);
285+
for (int i = 0; i < size; i++) {
286+
testData.set(i, randomLong());
287+
}
288+
289+
BytesStreamOutput out = new BytesStreamOutput();
290+
testData.writeTo(out);
291+
292+
try (LongArray in = LongArray.readFrom(getStreamInput(out.bytes()))) {
293+
assertThat(in.size(), equalTo(testData.size()));
294+
for (int i = 0; i < size; i++) {
295+
assertThat(in.get(i), equalTo(testData.get(i)));
296+
}
297+
}
298+
}
299+
274300
public void testCollection() throws IOException {
275301
class FooBar implements Writeable {
276302

test/framework/src/main/java/org/elasticsearch/common/util/MockBigArrays.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -513,6 +513,10 @@ public Collection<Accountable> getChildResources() {
513513
return Collections.singleton(Accountables.namedAccountable("delegate", in));
514514
}
515515

516+
@Override
517+
public void writeTo(StreamOutput out) throws IOException {
518+
in.writeTo(out);
519+
}
516520
}
517521

518522
private class FloatArrayWrapper extends AbstractArrayWrapper implements FloatArray {

0 commit comments

Comments
 (0)