Skip to content

Commit 44ccf67

Browse files
authored
Simplify TcpTransport interface by reducing send code to a single send method (#19223)
Due to some optimization on the netty layer we had quite some code / cruft added to the TcpTransport to allow for those optimizations. After cleaning up BytesReference we can now move this optimization into TcpTransport and have a simple send method on the implementation layer instead. This commit adds a CompositeBytesReference that also allows message headers to be written separately which simplify the header code as well since no skips are needed anymore.
1 parent a00a54e commit 44ccf67

File tree

12 files changed

+612
-381
lines changed

12 files changed

+612
-381
lines changed

core/src/main/java/org/elasticsearch/common/bytes/BytesArray.java

Lines changed: 7 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,7 @@ public final class BytesArray extends BytesReference {
3535
private final int length;
3636

3737
public BytesArray(String bytes) {
38-
BytesRef bytesRef = new BytesRef(bytes);
39-
this.bytes = bytesRef.bytes;
40-
this.offset = bytesRef.offset;
41-
this.length = bytesRef.length;
38+
this(new BytesRef(bytes));
4239
}
4340

4441
public BytesArray(BytesRef bytesRef) {
@@ -47,21 +44,15 @@ public BytesArray(BytesRef bytesRef) {
4744

4845
public BytesArray(BytesRef bytesRef, boolean deepCopy) {
4946
if (deepCopy) {
50-
BytesRef copy = BytesRef.deepCopyOf(bytesRef);
51-
bytes = copy.bytes;
52-
offset = copy.offset;
53-
length = copy.length;
54-
} else {
55-
bytes = bytesRef.bytes;
56-
offset = bytesRef.offset;
57-
length = bytesRef.length;
47+
bytesRef = BytesRef.deepCopyOf(bytesRef);
5848
}
49+
bytes = bytesRef.bytes;
50+
offset = bytesRef.offset;
51+
length = bytesRef.length;
5952
}
6053

6154
public BytesArray(byte[] bytes) {
62-
this.bytes = bytes;
63-
this.offset = 0;
64-
this.length = bytes.length;
55+
this(bytes, 0, bytes.length);
6556
}
6657

6758
public BytesArray(byte[] bytes, int offset, int length) {
@@ -105,4 +96,5 @@ public BytesRef toBytesRef() {
10596
public long ramBytesUsed() {
10697
return bytes.length;
10798
}
99+
108100
}

core/src/main/java/org/elasticsearch/common/bytes/BytesReference.java

Lines changed: 72 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.elasticsearch.common.io.stream.StreamInput;
2525

2626
import java.io.IOException;
27+
import java.io.InputStream;
2728
import java.io.OutputStream;
2829
import java.util.function.ToIntBiFunction;
2930

@@ -52,9 +53,8 @@ public abstract class BytesReference implements Accountable, Comparable<BytesRef
5253
/**
5354
* A stream input of the bytes.
5455
*/
55-
public StreamInput streamInput() {
56-
BytesRef ref = toBytesRef();
57-
return StreamInput.wrap(ref.bytes, ref.offset, ref.length);
56+
public StreamInput streamInput() throws IOException {
57+
return new MarkSupportingStreamInputWrapper(this);
5858
}
5959

6060
/**
@@ -208,4 +208,73 @@ private static void advance(final BytesRef ref, final int length) {
208208
ref.length -= length;
209209
ref.offset += length;
210210
}
211+
212+
/**
213+
* Instead of adding the complexity of {@link InputStream#reset()} etc to the actual impl
214+
* this wrapper builds it on top of the BytesReferenceStreamInput which is much simpler
215+
* that way.
216+
*/
217+
private static final class MarkSupportingStreamInputWrapper extends StreamInput {
218+
private final BytesReference reference;
219+
private BytesReferenceStreamInput input;
220+
private int mark = 0;
221+
222+
private MarkSupportingStreamInputWrapper(BytesReference reference) throws IOException {
223+
this.reference = reference;
224+
this.input = new BytesReferenceStreamInput(reference.iterator(), reference.length());
225+
}
226+
227+
@Override
228+
public byte readByte() throws IOException {
229+
return input.readByte();
230+
}
231+
232+
@Override
233+
public void readBytes(byte[] b, int offset, int len) throws IOException {
234+
input.readBytes(b, offset, len);
235+
}
236+
237+
@Override
238+
public int read(byte[] b, int off, int len) throws IOException {
239+
return input.read(b, off, len);
240+
}
241+
242+
@Override
243+
public void close() throws IOException {
244+
input.close();
245+
}
246+
247+
@Override
248+
public int read() throws IOException {
249+
return input.read();
250+
}
251+
252+
@Override
253+
public int available() throws IOException {
254+
return input.available();
255+
}
256+
257+
@Override
258+
public void reset() throws IOException {
259+
input = new BytesReferenceStreamInput(reference.iterator(), reference.length());
260+
input.skip(mark);
261+
}
262+
263+
@Override
264+
public boolean markSupported() {
265+
return true;
266+
}
267+
268+
@Override
269+
public void mark(int readLimit) {
270+
// readLimit is optional it only guarantees that the stream remembers data upto this limit but it can remember more
271+
// which we do in our case
272+
this.mark = input.getOffset();
273+
}
274+
275+
@Override
276+
public long skip(long n) throws IOException {
277+
return input.skip(n);
278+
}
279+
}
211280
}
Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.elasticsearch.common.bytes;
20+
21+
import org.apache.lucene.util.BytesRef;
22+
import org.apache.lucene.util.BytesRefIterator;
23+
import org.elasticsearch.common.io.stream.StreamInput;
24+
25+
import java.io.EOFException;
26+
import java.io.IOException;
27+
28+
/**
29+
* A StreamInput that reads off a {@link BytesRefIterator}. This is used to provide
30+
* generic stream access to {@link BytesReference} instances without materializing the
31+
* underlying bytes reference.
32+
*/
33+
final class BytesReferenceStreamInput extends StreamInput {
34+
private final BytesRefIterator iterator;
35+
private int sliceOffset;
36+
private BytesRef slice;
37+
private final int length; // the total size of the stream
38+
private int offset; // the current position of the stream
39+
40+
public BytesReferenceStreamInput(BytesRefIterator iterator, final int length) throws IOException {
41+
this.iterator = iterator;
42+
this.slice = iterator.next();
43+
this.length = length;
44+
this.offset = 0;
45+
this.sliceOffset = 0;
46+
}
47+
48+
@Override
49+
public byte readByte() throws IOException {
50+
if (offset >= length) {
51+
throw new EOFException();
52+
}
53+
maybeNextSlice();
54+
byte b = slice.bytes[slice.offset + (sliceOffset++)];
55+
offset++;
56+
return b;
57+
}
58+
59+
private void maybeNextSlice() throws IOException {
60+
while (sliceOffset == slice.length) {
61+
slice = iterator.next();
62+
sliceOffset = 0;
63+
if (slice == null) {
64+
throw new EOFException();
65+
}
66+
}
67+
}
68+
69+
@Override
70+
public void readBytes(byte[] b, int bOffset, int len) throws IOException {
71+
if (offset + len > length) {
72+
throw new IndexOutOfBoundsException("Cannot read " + len + " bytes from stream with length " + length + " at offset " + offset);
73+
}
74+
read(b, bOffset, len);
75+
}
76+
77+
@Override
78+
public int read() throws IOException {
79+
if (offset >= length) {
80+
return -1;
81+
}
82+
return Byte.toUnsignedInt(readByte());
83+
}
84+
85+
@Override
86+
public int read(final byte[] b, final int bOffset, final int len) throws IOException {
87+
if (offset >= length) {
88+
return -1;
89+
}
90+
final int numBytesToCopy = Math.min(len, length - offset);
91+
int remaining = numBytesToCopy; // copy the full length or the remaining part
92+
int destOffset = bOffset;
93+
while (remaining > 0) {
94+
maybeNextSlice();
95+
final int currentLen = Math.min(remaining, slice.length - sliceOffset);
96+
assert currentLen > 0 : "length has to be > 0 to make progress but was: " + currentLen;
97+
System.arraycopy(slice.bytes, slice.offset + sliceOffset, b, destOffset, currentLen);
98+
destOffset += currentLen;
99+
remaining -= currentLen;
100+
sliceOffset += currentLen;
101+
offset += currentLen;
102+
assert remaining >= 0 : "remaining: " + remaining;
103+
}
104+
return numBytesToCopy;
105+
}
106+
107+
@Override
108+
public void close() throws IOException {
109+
// do nothing
110+
}
111+
112+
@Override
113+
public int available() throws IOException {
114+
return length - offset;
115+
}
116+
117+
@Override
118+
public long skip(long n) throws IOException {
119+
final int skip = (int) Math.min(Integer.MAX_VALUE, n);
120+
final int numBytesSkipped = Math.min(skip, length - offset);
121+
int remaining = numBytesSkipped;
122+
while (remaining > 0) {
123+
maybeNextSlice();
124+
int currentLen = Math.min(remaining, slice.length - (slice.offset + sliceOffset));
125+
remaining -= currentLen;
126+
sliceOffset += currentLen;
127+
offset += currentLen;
128+
assert remaining >= 0 : "remaining: " + remaining;
129+
}
130+
return numBytesSkipped;
131+
}
132+
133+
int getOffset() {
134+
return offset;
135+
}
136+
}

0 commit comments

Comments
 (0)