Skip to content

Commit 0ca375e

Browse files
authored
Remove dedicated SSL network write buffer (elastic#41283)
This is related to elastic#27260. Currently for the SSLDriver we allocate a dedicated network write buffer and encrypt the data into that buffer one buffer at a time. This requires constantly switching between encrypting and flushing. This commit adds a dedicated outbound buffer for SSL operations that will internally allocate new packet sized buffers as they are need (for writing encrypted data). This allows us to totally encrypt an operation before writing it to the network. Eventually it can be hooked up to buffer recycling.
1 parent 3567b79 commit 0ca375e

File tree

22 files changed

+480
-329
lines changed

22 files changed

+480
-329
lines changed

libs/nio/src/main/java/org/elasticsearch/nio/FlushOperation.java

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525

2626
public class FlushOperation {
2727

28+
private static final ByteBuffer[] EMPTY_ARRAY = new ByteBuffer[0];
29+
2830
private final BiConsumer<Void, Exception> listener;
2931
private final ByteBuffer[] buffers;
3032
private final int[] offsets;
@@ -61,19 +63,38 @@ public void incrementIndex(int delta) {
6163
}
6264

6365
public ByteBuffer[] getBuffersToWrite() {
66+
return getBuffersToWrite(length);
67+
}
68+
69+
public ByteBuffer[] getBuffersToWrite(int maxBytes) {
6470
final int index = Arrays.binarySearch(offsets, internalIndex);
65-
int offsetIndex = index < 0 ? (-(index + 1)) - 1 : index;
71+
final int offsetIndex = index < 0 ? (-(index + 1)) - 1 : index;
72+
final int finalIndex = Arrays.binarySearch(offsets, Math.min(internalIndex + maxBytes, length));
73+
final int finalOffsetIndex = finalIndex < 0 ? (-(finalIndex + 1)) - 1 : finalIndex;
6674

67-
ByteBuffer[] postIndexBuffers = new ByteBuffer[buffers.length - offsetIndex];
75+
int nBuffers = (finalOffsetIndex - offsetIndex) + 1;
6876

77+
int firstBufferPosition = internalIndex - offsets[offsetIndex];
6978
ByteBuffer firstBuffer = buffers[offsetIndex].duplicate();
70-
firstBuffer.position(internalIndex - offsets[offsetIndex]);
79+
firstBuffer.position(firstBufferPosition);
80+
if (nBuffers == 1 && firstBuffer.remaining() == 0) {
81+
return EMPTY_ARRAY;
82+
}
83+
84+
ByteBuffer[] postIndexBuffers = new ByteBuffer[nBuffers];
7185
postIndexBuffers[0] = firstBuffer;
86+
int finalOffset = offsetIndex + nBuffers;
87+
int nBytes = firstBuffer.remaining();
7288
int j = 1;
73-
for (int i = (offsetIndex + 1); i < buffers.length; ++i) {
74-
postIndexBuffers[j++] = buffers[i].duplicate();
89+
for (int i = (offsetIndex + 1); i < finalOffset; ++i) {
90+
ByteBuffer buffer = buffers[i].duplicate();
91+
nBytes += buffer.remaining();
92+
postIndexBuffers[j++] = buffer;
7593
}
7694

95+
int excessBytes = Math.max(0, nBytes - maxBytes);
96+
ByteBuffer lastBuffer = postIndexBuffers[postIndexBuffers.length - 1];
97+
lastBuffer.limit(lastBuffer.limit() - excessBytes);
7798
return postIndexBuffers;
7899
}
79100
}

libs/nio/src/main/java/org/elasticsearch/nio/FlushReadyWrite.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ public class FlushReadyWrite extends FlushOperation implements WriteOperation {
2727
private final SocketChannelContext channelContext;
2828
private final ByteBuffer[] buffers;
2929

30-
FlushReadyWrite(SocketChannelContext channelContext, ByteBuffer[] buffers, BiConsumer<Void, Exception> listener) {
30+
public FlushReadyWrite(SocketChannelContext channelContext, ByteBuffer[] buffers, BiConsumer<Void, Exception> listener) {
3131
super(buffers, listener);
3232
this.channelContext = channelContext;
3333
this.buffers = buffers;

libs/nio/src/main/java/org/elasticsearch/nio/InboundChannelBuffer.java

Lines changed: 6 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
package org.elasticsearch.nio;
2121

22-
import org.elasticsearch.common.util.concurrent.AbstractRefCounted;
2322
import org.elasticsearch.nio.utils.ExceptionsHelper;
2423

2524
import java.nio.ByteBuffer;
@@ -140,11 +139,11 @@ public ByteBuffer[] sliceBuffersTo(long to) {
140139

141140
ByteBuffer[] buffers = new ByteBuffer[pageCount];
142141
Iterator<Page> pageIterator = pages.iterator();
143-
ByteBuffer firstBuffer = pageIterator.next().byteBuffer.duplicate();
142+
ByteBuffer firstBuffer = pageIterator.next().byteBuffer().duplicate();
144143
firstBuffer.position(firstBuffer.position() + offset);
145144
buffers[0] = firstBuffer;
146145
for (int i = 1; i < buffers.length; i++) {
147-
buffers[i] = pageIterator.next().byteBuffer.duplicate();
146+
buffers[i] = pageIterator.next().byteBuffer().duplicate();
148147
}
149148
if (finalLimit != 0) {
150149
buffers[buffers.length - 1].limit(finalLimit);
@@ -180,14 +179,14 @@ public Page[] sliceAndRetainPagesTo(long to) {
180179
Page[] pages = new Page[pageCount];
181180
Iterator<Page> pageIterator = this.pages.iterator();
182181
Page firstPage = pageIterator.next().duplicate();
183-
ByteBuffer firstBuffer = firstPage.byteBuffer;
182+
ByteBuffer firstBuffer = firstPage.byteBuffer();
184183
firstBuffer.position(firstBuffer.position() + offset);
185184
pages[0] = firstPage;
186185
for (int i = 1; i < pages.length; i++) {
187186
pages[i] = pageIterator.next().duplicate();
188187
}
189188
if (finalLimit != 0) {
190-
pages[pages.length - 1].byteBuffer.limit(finalLimit);
189+
pages[pages.length - 1].byteBuffer().limit(finalLimit);
191190
}
192191

193192
return pages;
@@ -217,9 +216,9 @@ public ByteBuffer[] sliceBuffersFrom(long from) {
217216
ByteBuffer[] buffers = new ByteBuffer[pages.size() - pageIndex];
218217
Iterator<Page> pageIterator = pages.descendingIterator();
219218
for (int i = buffers.length - 1; i > 0; --i) {
220-
buffers[i] = pageIterator.next().byteBuffer.duplicate();
219+
buffers[i] = pageIterator.next().byteBuffer().duplicate();
221220
}
222-
ByteBuffer firstPostIndexBuffer = pageIterator.next().byteBuffer.duplicate();
221+
ByteBuffer firstPostIndexBuffer = pageIterator.next().byteBuffer().duplicate();
223222
firstPostIndexBuffer.position(firstPostIndexBuffer.position() + indexInPage);
224223
buffers[0] = firstPostIndexBuffer;
225224

@@ -268,53 +267,4 @@ private int pageIndex(long index) {
268267
private int indexInPage(long index) {
269268
return (int) (index & PAGE_MASK);
270269
}
271-
272-
public static class Page implements AutoCloseable {
273-
274-
private final ByteBuffer byteBuffer;
275-
// This is reference counted as some implementations want to retain the byte pages by calling
276-
// sliceAndRetainPagesTo. With reference counting we can increment the reference count, return the
277-
// pages, and safely close them when this channel buffer is done with them. The reference count
278-
// would be 1 at that point, meaning that the pages will remain until the implementation closes
279-
// theirs.
280-
private final RefCountedCloseable refCountedCloseable;
281-
282-
public Page(ByteBuffer byteBuffer, Runnable closeable) {
283-
this(byteBuffer, new RefCountedCloseable(closeable));
284-
}
285-
286-
private Page(ByteBuffer byteBuffer, RefCountedCloseable refCountedCloseable) {
287-
this.byteBuffer = byteBuffer;
288-
this.refCountedCloseable = refCountedCloseable;
289-
}
290-
291-
private Page duplicate() {
292-
refCountedCloseable.incRef();
293-
return new Page(byteBuffer.duplicate(), refCountedCloseable);
294-
}
295-
296-
public ByteBuffer getByteBuffer() {
297-
return byteBuffer;
298-
}
299-
300-
@Override
301-
public void close() {
302-
refCountedCloseable.decRef();
303-
}
304-
305-
private static class RefCountedCloseable extends AbstractRefCounted {
306-
307-
private final Runnable closeable;
308-
309-
private RefCountedCloseable(Runnable closeable) {
310-
super("byte array page");
311-
this.closeable = closeable;
312-
}
313-
314-
@Override
315-
protected void closeInternal() {
316-
closeable.run();
317-
}
318-
}
319-
}
320270
}
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.nio;
21+
22+
import org.elasticsearch.common.util.concurrent.AbstractRefCounted;
23+
24+
import java.io.Closeable;
25+
import java.nio.ByteBuffer;
26+
27+
public class Page implements Closeable {
28+
29+
private final ByteBuffer byteBuffer;
30+
// This is reference counted as some implementations want to retain the byte pages by calling
31+
// duplicate. With reference counting we can increment the reference count, return a new page,
32+
// and safely close the pages independently. The closeable will not be called until each page is
33+
// released.
34+
private final RefCountedCloseable refCountedCloseable;
35+
36+
public Page(ByteBuffer byteBuffer) {
37+
this(byteBuffer, () -> {});
38+
}
39+
40+
public Page(ByteBuffer byteBuffer, Runnable closeable) {
41+
this(byteBuffer, new RefCountedCloseable(closeable));
42+
}
43+
44+
private Page(ByteBuffer byteBuffer, RefCountedCloseable refCountedCloseable) {
45+
this.byteBuffer = byteBuffer;
46+
this.refCountedCloseable = refCountedCloseable;
47+
}
48+
49+
/**
50+
* Duplicates this page and increments the reference count. The new page must be closed independently
51+
* of the original page.
52+
*
53+
* @return the new page
54+
*/
55+
public Page duplicate() {
56+
refCountedCloseable.incRef();
57+
return new Page(byteBuffer.duplicate(), refCountedCloseable);
58+
}
59+
60+
/**
61+
* Returns the {@link ByteBuffer} for this page. Modifications to the limits, positions, etc of the
62+
* buffer will also mutate this page. Call {@link ByteBuffer#duplicate()} to avoid mutating the page.
63+
*
64+
* @return the byte buffer
65+
*/
66+
public ByteBuffer byteBuffer() {
67+
return byteBuffer;
68+
}
69+
70+
@Override
71+
public void close() {
72+
refCountedCloseable.decRef();
73+
}
74+
75+
private static class RefCountedCloseable extends AbstractRefCounted {
76+
77+
private final Runnable closeable;
78+
79+
private RefCountedCloseable(Runnable closeable) {
80+
super("byte array page");
81+
this.closeable = closeable;
82+
}
83+
84+
@Override
85+
protected void closeInternal() {
86+
closeable.run();
87+
}
88+
}
89+
}

libs/nio/src/main/java/org/elasticsearch/nio/SocketChannelContext.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -325,7 +325,7 @@ protected int flushToChannel(FlushOperation flushOperation) throws IOException {
325325
ioBuffer.clear();
326326
ioBuffer.limit(Math.min(WRITE_LIMIT, ioBuffer.limit()));
327327
int j = 0;
328-
ByteBuffer[] buffers = flushOperation.getBuffersToWrite();
328+
ByteBuffer[] buffers = flushOperation.getBuffersToWrite(WRITE_LIMIT);
329329
while (j < buffers.length && ioBuffer.remaining() > 0) {
330330
ByteBuffer buffer = buffers[j++];
331331
copyBytes(buffer, ioBuffer);

libs/nio/src/test/java/org/elasticsearch/nio/BytesChannelContextTests.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import java.util.function.Consumer;
3232

3333
import static org.mockito.Matchers.any;
34+
import static org.mockito.Matchers.anyInt;
3435
import static org.mockito.Matchers.eq;
3536
import static org.mockito.Mockito.mock;
3637
import static org.mockito.Mockito.times;
@@ -168,7 +169,7 @@ public void testQueuedWriteIsFlushedInFlushCall() throws Exception {
168169

169170
assertTrue(context.readyForFlush());
170171

171-
when(flushOperation.getBuffersToWrite()).thenReturn(buffers);
172+
when(flushOperation.getBuffersToWrite(anyInt())).thenReturn(buffers);
172173
when(flushOperation.isFullyFlushed()).thenReturn(false, true);
173174
when(flushOperation.getListener()).thenReturn(listener);
174175
context.flushChannel();
@@ -187,7 +188,7 @@ public void testPartialFlush() throws IOException {
187188
assertTrue(context.readyForFlush());
188189

189190
when(flushOperation.isFullyFlushed()).thenReturn(false);
190-
when(flushOperation.getBuffersToWrite()).thenReturn(new ByteBuffer[] {ByteBuffer.allocate(3)});
191+
when(flushOperation.getBuffersToWrite(anyInt())).thenReturn(new ByteBuffer[] {ByteBuffer.allocate(3)});
191192
context.flushChannel();
192193

193194
verify(listener, times(0)).accept(null, null);
@@ -201,8 +202,8 @@ public void testMultipleWritesPartialFlushes() throws IOException {
201202
BiConsumer<Void, Exception> listener2 = mock(BiConsumer.class);
202203
FlushReadyWrite flushOperation1 = mock(FlushReadyWrite.class);
203204
FlushReadyWrite flushOperation2 = mock(FlushReadyWrite.class);
204-
when(flushOperation1.getBuffersToWrite()).thenReturn(new ByteBuffer[] {ByteBuffer.allocate(3)});
205-
when(flushOperation2.getBuffersToWrite()).thenReturn(new ByteBuffer[] {ByteBuffer.allocate(3)});
205+
when(flushOperation1.getBuffersToWrite(anyInt())).thenReturn(new ByteBuffer[] {ByteBuffer.allocate(3)});
206+
when(flushOperation2.getBuffersToWrite(anyInt())).thenReturn(new ByteBuffer[] {ByteBuffer.allocate(3)});
206207
when(flushOperation1.getListener()).thenReturn(listener);
207208
when(flushOperation2.getListener()).thenReturn(listener2);
208209

@@ -237,7 +238,7 @@ public void testWhenIOExceptionThrownListenerIsCalled() throws IOException {
237238
assertTrue(context.readyForFlush());
238239

239240
IOException exception = new IOException();
240-
when(flushOperation.getBuffersToWrite()).thenReturn(buffers);
241+
when(flushOperation.getBuffersToWrite(anyInt())).thenReturn(buffers);
241242
when(rawChannel.write(any(ByteBuffer.class))).thenThrow(exception);
242243
when(flushOperation.getListener()).thenReturn(listener);
243244
expectThrows(IOException.class, () -> context.flushChannel());
@@ -252,7 +253,7 @@ public void testWriteIOExceptionMeansChannelReadyToClose() throws IOException {
252253
context.queueWriteOperation(flushOperation);
253254

254255
IOException exception = new IOException();
255-
when(flushOperation.getBuffersToWrite()).thenReturn(buffers);
256+
when(flushOperation.getBuffersToWrite(anyInt())).thenReturn(buffers);
256257
when(rawChannel.write(any(ByteBuffer.class))).thenThrow(exception);
257258

258259
assertFalse(context.selectorShouldClose());

libs/nio/src/test/java/org/elasticsearch/nio/FlushOperationTests.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,29 +65,45 @@ public void testMultipleFlushesWithCompositeBuffer() throws IOException {
6565
ByteBuffer[] byteBuffers = writeOp.getBuffersToWrite();
6666
assertEquals(3, byteBuffers.length);
6767
assertEquals(5, byteBuffers[0].remaining());
68+
ByteBuffer[] byteBuffersWithLimit = writeOp.getBuffersToWrite(10);
69+
assertEquals(2, byteBuffersWithLimit.length);
70+
assertEquals(5, byteBuffersWithLimit[0].remaining());
71+
assertEquals(5, byteBuffersWithLimit[1].remaining());
6872

6973
writeOp.incrementIndex(5);
7074
assertFalse(writeOp.isFullyFlushed());
7175
byteBuffers = writeOp.getBuffersToWrite();
7276
assertEquals(2, byteBuffers.length);
7377
assertEquals(15, byteBuffers[0].remaining());
78+
assertEquals(3, byteBuffers[1].remaining());
79+
byteBuffersWithLimit = writeOp.getBuffersToWrite(10);
80+
assertEquals(1, byteBuffersWithLimit.length);
81+
assertEquals(10, byteBuffersWithLimit[0].remaining());
7482

7583
writeOp.incrementIndex(2);
7684
assertFalse(writeOp.isFullyFlushed());
7785
byteBuffers = writeOp.getBuffersToWrite();
7886
assertEquals(2, byteBuffers.length);
7987
assertEquals(13, byteBuffers[0].remaining());
88+
assertEquals(3, byteBuffers[1].remaining());
89+
byteBuffersWithLimit = writeOp.getBuffersToWrite(10);
90+
assertEquals(1, byteBuffersWithLimit.length);
91+
assertEquals(10, byteBuffersWithLimit[0].remaining());
8092

8193
writeOp.incrementIndex(15);
8294
assertFalse(writeOp.isFullyFlushed());
8395
byteBuffers = writeOp.getBuffersToWrite();
8496
assertEquals(1, byteBuffers.length);
8597
assertEquals(1, byteBuffers[0].remaining());
98+
byteBuffersWithLimit = writeOp.getBuffersToWrite(10);
99+
assertEquals(1, byteBuffersWithLimit.length);
100+
assertEquals(1, byteBuffersWithLimit[0].remaining());
86101

87102
writeOp.incrementIndex(1);
88103
assertTrue(writeOp.isFullyFlushed());
89104
byteBuffers = writeOp.getBuffersToWrite();
90-
assertEquals(1, byteBuffers.length);
91-
assertEquals(0, byteBuffers[0].remaining());
105+
assertEquals(0, byteBuffers.length);
106+
byteBuffersWithLimit = writeOp.getBuffersToWrite(10);
107+
assertEquals(0, byteBuffersWithLimit.length);
92108
}
93109
}

0 commit comments

Comments
 (0)