
Commit 847ff91

Introduce resizable inbound byte buffer (#27551)
This is related to #27563. In order to interface with Java NIO, we must have buffers that are compatible with ByteBuffer. This commit introduces a basic ByteBufferReference to make it easy to transfer bytes off the wire into the application. Additionally, it introduces an InboundChannelBuffer. This is a buffer that can internally expand as more space is needed. It is designed to be integrated with a page recycler so that it can internally reuse pages. The final piece is moving all of the indexing work for writing bytes to a channel into the WriteOperation.
1 parent f47d759 commit 847ff91
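
To see how these pieces are meant to fit together on the inbound path, here is a rough sketch of the hand-off from the wire to the application. The glue class, the explicit message-length framing, and the direct use of a SocketChannel are assumptions for illustration only; the InboundChannelBuffer and ByteBufferReference calls are the ones added in this commit.

import org.elasticsearch.common.bytes.ByteBufferReference;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.transport.nio.InboundChannelBuffer;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

// Hypothetical glue code, not part of the diff: read inbound bytes into the paged buffer and,
// once messageLength bytes have accumulated, hand them to the application and release them.
public final class InboundFlowSketch {

    public static BytesReference readAndConsume(SocketChannel channel, InboundChannelBuffer buffer,
                                                int messageLength) throws IOException {
        buffer.ensureCapacity(buffer.getIndex() + messageLength);                // grow in whole 16kb pages
        ByteBuffer[] writeTargets = buffer.sliceBuffersFrom(buffer.getIndex());  // free space at the tail
        long read = channel.read(writeTargets);                                  // scattering read off the wire
        if (read > 0) {
            buffer.incrementIndex(read);
        }
        if (buffer.getIndex() < messageLength) {
            return null;                                                         // message not complete yet
        }
        ByteBuffer[] message = buffer.sliceBuffersTo(messageLength);             // consumed region of the buffer
        BytesReference reference = new ByteBufferReference(message[0]);          // first page only, for brevity
        buffer.release(messageLength);                                           // drop the bytes from the head
        return reference;
    }
}

In this commit the pages are plain heap buffers, so the returned reference stays valid after release(); once the buffer is backed by a page recycler, as the commit message anticipates, the bytes would need to be copied out (for example via toBytesRef()) before the pages are released.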

File tree

16 files changed: +601, -433 lines

ByteBufferReference.java (new file): 89 additions & 0 deletions
@@ -0,0 +1,89 @@
/*
 * Licensed to Elasticsearch under one or more contributor
 * license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright
 * ownership. Elasticsearch licenses this file to you under
 * the Apache License, Version 2.0 (the "License"); you may
 * not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.elasticsearch.common.bytes;

import org.apache.lucene.util.BytesRef;

import java.nio.ByteBuffer;

/**
 * This is a {@link BytesReference} backed by a {@link ByteBuffer}. The byte buffer can either be a heap or
 * direct byte buffer. The reference is composed of the space between the {@link ByteBuffer#position()} and
 * {@link ByteBuffer#limit()} at construction time. If the position or limit of the underlying byte buffer is
 * changed, those changes will not be reflected in this reference. However, modifying the limit or position
 * of the underlying byte buffer is not recommended as those can be used during {@link ByteBuffer#get()}
 * bounds checks. Use {@link ByteBuffer#duplicate()} at creation time if you plan on modifying the markers of
 * the underlying byte buffer. Any changes to the underlying data in the byte buffer will be reflected.
 */
public class ByteBufferReference extends BytesReference {

    private final ByteBuffer buffer;
    private final int offset;
    private final int length;

    public ByteBufferReference(ByteBuffer buffer) {
        this.buffer = buffer;
        this.offset = buffer.position();
        this.length = buffer.remaining();
    }

    @Override
    public byte get(int index) {
        return buffer.get(index + offset);
    }

    @Override
    public int length() {
        return length;
    }

    @Override
    public BytesReference slice(int from, int length) {
        if (from < 0 || (from + length) > this.length) {
            throw new IndexOutOfBoundsException("can't slice a buffer with length [" + this.length + "], with slice parameters from ["
                + from + "], length [" + length + "]");
        }
        ByteBuffer newByteBuffer = buffer.duplicate();
        newByteBuffer.position(offset + from);
        newByteBuffer.limit(offset + from + length);
        return new ByteBufferReference(newByteBuffer);
    }

    /**
     * This will return a bytes ref composed of the bytes. If this is a direct byte buffer, the bytes will
     * have to be copied.
     *
     * @return the bytes ref
     */
    @Override
    public BytesRef toBytesRef() {
        if (buffer.hasArray()) {
            return new BytesRef(buffer.array(), buffer.arrayOffset() + offset, length);
        }
        final byte[] copy = new byte[length];
        // the relative bulk get reads from the buffer's position, so use a duplicate positioned at the offset
        ByteBuffer dup = buffer.duplicate();
        dup.position(offset);
        dup.get(copy, 0, length);
        return new BytesRef(copy);
    }

    @Override
    public long ramBytesUsed() {
        return buffer.capacity();
    }
}
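
As a quick illustration of the marker semantics described in the class javadoc, the following hypothetical snippet (not part of the commit; it only exercises the API above) shows that the reference covers the region between the buffer's position and limit at construction time and that all indices are relative to that captured position:

import org.elasticsearch.common.bytes.ByteBufferReference;
import org.elasticsearch.common.bytes.BytesReference;

import java.nio.ByteBuffer;

public final class ByteBufferReferenceExample {

    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.wrap(new byte[]{10, 20, 30, 40, 50});
        buffer.position(1);                                  // skip the first byte
        buffer.limit(4);                                     // exclude the last byte

        BytesReference reference = new ByteBufferReference(buffer);
        System.out.println(reference.length());              // 3 -> bytes 20, 30, 40
        System.out.println(reference.get(0));                // 20 (index 0 maps to the captured position)

        BytesReference slice = reference.slice(1, 2);        // view over bytes 30, 40
        System.out.println(slice.get(0));                    // 30
        System.out.println(reference.toBytesRef().length);   // 3
    }
}

Because the backing buffer here is heap-based, toBytesRef() shares the underlying array; a direct buffer would be copied instead, as noted in the method's javadoc.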
ByteBufferReferenceTests.java (new file): 44 additions & 0 deletions
@@ -0,0 +1,44 @@
/*
 * Licensed to Elasticsearch under one or more contributor
 * license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright
 * ownership. Elasticsearch licenses this file to you under
 * the Apache License, Version 2.0 (the "License"); you may
 * not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.elasticsearch.common.bytes;

import java.io.IOException;
import java.nio.ByteBuffer;

public class ByteBufferReferenceTests extends AbstractBytesReferenceTestCase {

    private void initializeBytes(byte[] bytes) {
        for (int i = 0; i < bytes.length; ++i) {
            bytes[i] = (byte) i;
        }
    }

    @Override
    protected BytesReference newBytesReference(int length) throws IOException {
        return newBytesReferenceWithOffsetOfZero(length);
    }

    @Override
    protected BytesReference newBytesReferenceWithOffsetOfZero(int length) throws IOException {
        byte[] bytes = new byte[length];
        initializeBytes(bytes);
        return new ByteBufferReference(ByteBuffer.wrap(bytes));
    }
}
InboundChannelBuffer.java (new file): 204 additions & 0 deletions
@@ -0,0 +1,204 @@
/*
 * Licensed to Elasticsearch under one or more contributor
 * license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright
 * ownership. Elasticsearch licenses this file to you under
 * the Apache License, Version 2.0 (the "License"); you may
 * not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.elasticsearch.transport.nio;

import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.function.Supplier;

/**
 * This is a channel byte buffer composed internally of 16kb pages. When an entire message has been read
 * and consumed, the {@link #release(long)} method releases the bytes from the head of the buffer and closes
 * the pages internally. If more space is needed at the end of the buffer, {@link #ensureCapacity(long)} can
 * be called and the buffer will expand using the supplier provided.
 */
public final class InboundChannelBuffer {

    private static final int PAGE_SIZE = 1 << 14;
    private static final int PAGE_MASK = PAGE_SIZE - 1;
    private static final int PAGE_SHIFT = Integer.numberOfTrailingZeros(PAGE_SIZE);
    private static final ByteBuffer[] EMPTY_BYTE_BUFFER_ARRAY = new ByteBuffer[0];

    private final ArrayDeque<ByteBuffer> pages;
    private final Supplier<ByteBuffer> pageSupplier;

    private long capacity = 0;
    private long internalIndex = 0;
    // The offset is an int as it is the offset of where the bytes begin in the first buffer
    private int offset = 0;

    public InboundChannelBuffer() {
        this(() -> ByteBuffer.wrap(new byte[PAGE_SIZE]));
    }

    private InboundChannelBuffer(Supplier<ByteBuffer> pageSupplier) {
        this.pageSupplier = pageSupplier;
        this.pages = new ArrayDeque<>();
        this.capacity = PAGE_SIZE * pages.size();
        ensureCapacity(PAGE_SIZE);
    }

    public void ensureCapacity(long requiredCapacity) {
        if (capacity < requiredCapacity) {
            int numPages = numPages(requiredCapacity + offset);
            int pagesToAdd = numPages - pages.size();
            for (int i = 0; i < pagesToAdd; i++) {
                pages.addLast(pageSupplier.get());
            }
            capacity += pagesToAdd * PAGE_SIZE;
        }
    }

    /**
     * This method will release bytes from the head of this buffer. If you release bytes past the current
     * index, the index is truncated to zero.
     *
     * @param bytesToRelease number of bytes to drop
     */
    public void release(long bytesToRelease) {
        if (bytesToRelease > capacity) {
            throw new IllegalArgumentException("Releasing more bytes [" + bytesToRelease + "] than buffer capacity [" + capacity + "].");
        }

        int pagesToRelease = pageIndex(offset + bytesToRelease);
        for (int i = 0; i < pagesToRelease; i++) {
            pages.removeFirst();
        }
        capacity -= bytesToRelease;
        internalIndex = Math.max(internalIndex - bytesToRelease, 0);
        offset = indexInPage(bytesToRelease + offset);
    }

    /**
     * This method will return an array of {@link ByteBuffer} representing the bytes from the beginning of
     * this buffer up through the index argument that was passed. The buffers will be duplicates of the
     * internal buffers, so any modifications to the markers {@link ByteBuffer#position()},
     * {@link ByteBuffer#limit()}, etc. will not modify this class.
     *
     * @param to the index to slice up to
     * @return the byte buffers
     */
    public ByteBuffer[] sliceBuffersTo(long to) {
        if (to > capacity) {
            throw new IndexOutOfBoundsException("can't slice a channel buffer with capacity [" + capacity +
                "], with slice parameters to [" + to + "]");
        } else if (to == 0) {
            return EMPTY_BYTE_BUFFER_ARRAY;
        }
        long indexWithOffset = to + offset;
        int pageCount = pageIndex(indexWithOffset);
        int finalLimit = indexInPage(indexWithOffset);
        if (finalLimit != 0) {
            pageCount += 1;
        }

        ByteBuffer[] buffers = new ByteBuffer[pageCount];
        Iterator<ByteBuffer> pageIterator = pages.iterator();
        ByteBuffer firstBuffer = pageIterator.next().duplicate();
        firstBuffer.position(firstBuffer.position() + offset);
        buffers[0] = firstBuffer;
        for (int i = 1; i < buffers.length; i++) {
            buffers[i] = pageIterator.next().duplicate();
        }
        if (finalLimit != 0) {
            buffers[buffers.length - 1].limit(finalLimit);
        }

        return buffers;
    }

    /**
     * This method will return an array of {@link ByteBuffer} representing the bytes from the index passed
     * through the end of this buffer. The buffers will be duplicates of the internal buffers, so any
     * modifications to the markers {@link ByteBuffer#position()}, {@link ByteBuffer#limit()}, etc. will not
     * modify this class.
     *
     * @param from the index to slice from
     * @return the byte buffers
     */
    public ByteBuffer[] sliceBuffersFrom(long from) {
        if (from > capacity) {
            throw new IndexOutOfBoundsException("can't slice a channel buffer with capacity [" + capacity +
                "], with slice parameters from [" + from + "]");
        } else if (from == capacity) {
            return EMPTY_BYTE_BUFFER_ARRAY;
        }
        long indexWithOffset = from + offset;

        int pageIndex = pageIndex(indexWithOffset);
        int indexInPage = indexInPage(indexWithOffset);

        ByteBuffer[] buffers = new ByteBuffer[pages.size() - pageIndex];
        Iterator<ByteBuffer> pageIterator = pages.descendingIterator();
        for (int i = buffers.length - 1; i > 0; --i) {
            buffers[i] = pageIterator.next().duplicate();
        }
        ByteBuffer firstPostIndexBuffer = pageIterator.next().duplicate();
        firstPostIndexBuffer.position(firstPostIndexBuffer.position() + indexInPage);
        buffers[0] = firstPostIndexBuffer;

        return buffers;
    }

    public void incrementIndex(long delta) {
        if (delta < 0) {
            throw new IllegalArgumentException("Cannot increment an index with a negative delta [" + delta + "]");
        }

        long newIndex = delta + internalIndex;
        if (newIndex > capacity) {
            throw new IllegalArgumentException("Cannot increment an index [" + internalIndex + "] with a delta [" + delta +
                "] that will result in a new index [" + newIndex + "] that is greater than the capacity [" + capacity + "].");
        }
        internalIndex = newIndex;
    }

    public long getIndex() {
        return internalIndex;
    }

    public long getCapacity() {
        return capacity;
    }

    public long getRemaining() {
        long remaining = capacity - internalIndex;
        assert remaining >= 0 : "The remaining [" + remaining + "] number of bytes should not be less than zero.";
        return remaining;
    }

    private int numPages(long capacity) {
        final long numPages = (capacity + PAGE_MASK) >>> PAGE_SHIFT;
        if (numPages > Integer.MAX_VALUE) {
            throw new IllegalArgumentException("pageSize=" + (PAGE_MASK + 1) + " is too small for such a capacity: " + capacity);
        }
        return (int) numPages;
    }

    private int pageIndex(long index) {
        return (int) (index >>> PAGE_SHIFT);
    }

    private int indexInPage(long index) {
        return (int) (index & PAGE_MASK);
    }
}
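
To make the paging arithmetic concrete, here is a small hypothetical walk-through (the demo class and the numbers are chosen for illustration; only the InboundChannelBuffer API comes from the diff). Capacity grows in whole 16kb (16,384-byte) pages, slices are duplicates of the internal pages, and release() drops bytes from the head while tracking the new offset inside the first remaining page.

import org.elasticsearch.transport.nio.InboundChannelBuffer;

import java.nio.ByteBuffer;

public final class InboundChannelBufferDemo {

    public static void main(String[] args) {
        InboundChannelBuffer buffer = new InboundChannelBuffer();  // starts with a single 16kb page
        System.out.println(buffer.getCapacity());                  // 16384

        buffer.ensureCapacity(40_000);                             // needs ceil(40000 / 16384) = 3 pages
        System.out.println(buffer.getCapacity());                  // 49152

        ByteBuffer[] tail = buffer.sliceBuffersFrom(20_000);       // index 20000 falls in the 2nd page
        System.out.println(tail.length);                           // 2 (the 2nd and 3rd pages)

        buffer.incrementIndex(20_000);                             // pretend 20,000 bytes were written
        buffer.release(20_000);                                    // drops one whole page from the head
        System.out.println(buffer.getCapacity());                  // 29152 (49152 - 20000)
        System.out.println(buffer.getIndex());                     // 0 (the index moves back with the released bytes)
    }
}

The bit operations in numPages, pageIndex and indexInPage rely on PAGE_SIZE being a power of two: shifting by PAGE_SHIFT divides by the page size and masking with PAGE_MASK takes the remainder, so index 20,000 lands at offset 3,616 within page 1.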

0 commit comments
