Skip to content

Commit 9270134

Browse files
authored
Read multiple TLS packets in one read call (#41820)
This is related to #27260. Currently we have a single read buffer that is no larger than a single TLS packet. This prevents us from reading multiple TLS packets in a single socket read call. This commit modifies our TLS work to support reading similar to the plaintext case. The data will be copied to a (potentially) recycled TLS packet-sized buffer for interaction with the SSLEngine.
1 parent 228d23d commit 9270134

File tree

15 files changed

+407
-374
lines changed

15 files changed

+407
-374
lines changed

libs/nio/src/main/java/org/elasticsearch/nio/InboundChannelBuffer.java

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
import java.util.Iterator;
2828
import java.util.List;
2929
import java.util.concurrent.atomic.AtomicBoolean;
30-
import java.util.function.Supplier;
30+
import java.util.function.IntFunction;
3131

3232
/**
3333
* This is a channel byte buffer composed internally of 16kb pages. When an entire message has been read
@@ -37,30 +37,27 @@
3737
*/
3838
public final class InboundChannelBuffer implements AutoCloseable {
3939

40-
private static final int PAGE_SIZE = 1 << 14;
40+
public static final int PAGE_SIZE = 1 << 14;
4141
private static final int PAGE_MASK = PAGE_SIZE - 1;
4242
private static final int PAGE_SHIFT = Integer.numberOfTrailingZeros(PAGE_SIZE);
4343
private static final ByteBuffer[] EMPTY_BYTE_BUFFER_ARRAY = new ByteBuffer[0];
4444
private static final Page[] EMPTY_BYTE_PAGE_ARRAY = new Page[0];
4545

46-
47-
private final ArrayDeque<Page> pages;
48-
private final Supplier<Page> pageSupplier;
46+
private final IntFunction<Page> pageAllocator;
47+
private final ArrayDeque<Page> pages = new ArrayDeque<>();
4948
private final AtomicBoolean isClosed = new AtomicBoolean(false);
5049

5150
private long capacity = 0;
5251
private long internalIndex = 0;
5352
// The offset is an int as it is the offset of where the bytes begin in the first buffer
5453
private int offset = 0;
5554

56-
public InboundChannelBuffer(Supplier<Page> pageSupplier) {
57-
this.pageSupplier = pageSupplier;
58-
this.pages = new ArrayDeque<>();
59-
this.capacity = PAGE_SIZE * pages.size();
55+
public InboundChannelBuffer(IntFunction<Page> pageAllocator) {
56+
this.pageAllocator = pageAllocator;
6057
}
6158

6259
public static InboundChannelBuffer allocatingInstance() {
63-
return new InboundChannelBuffer(() -> new Page(ByteBuffer.allocate(PAGE_SIZE), () -> {}));
60+
return new InboundChannelBuffer((n) -> new Page(ByteBuffer.allocate(n), () -> {}));
6461
}
6562

6663
@Override
@@ -87,7 +84,7 @@ public void ensureCapacity(long requiredCapacity) {
8784
int numPages = numPages(requiredCapacity + offset);
8885
int pagesToAdd = numPages - pages.size();
8986
for (int i = 0; i < pagesToAdd; i++) {
90-
Page page = pageSupplier.get();
87+
Page page = pageAllocator.apply(PAGE_SIZE);
9188
pages.addLast(page);
9289
}
9390
capacity += pagesToAdd * PAGE_SIZE;

libs/nio/src/main/java/org/elasticsearch/nio/SocketChannelContext.java

Lines changed: 3 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
package org.elasticsearch.nio;
2121

2222
import org.elasticsearch.common.concurrent.CompletableContext;
23+
import org.elasticsearch.nio.utils.ByteBufferUtils;
2324
import org.elasticsearch.nio.utils.ExceptionsHelper;
2425

2526
import java.io.IOException;
@@ -249,26 +250,6 @@ protected void setCloseNow() {
249250
// data that is copied to the buffer for a write, but not successfully flushed immediately, must be
250251
// copied again on the next call.
251252

252-
protected int readFromChannel(ByteBuffer buffer) throws IOException {
253-
ByteBuffer ioBuffer = getSelector().getIoBuffer();
254-
ioBuffer.limit(Math.min(buffer.remaining(), ioBuffer.limit()));
255-
int bytesRead;
256-
try {
257-
bytesRead = rawChannel.read(ioBuffer);
258-
} catch (IOException e) {
259-
closeNow = true;
260-
throw e;
261-
}
262-
if (bytesRead < 0) {
263-
closeNow = true;
264-
return 0;
265-
} else {
266-
ioBuffer.flip();
267-
buffer.put(ioBuffer);
268-
return bytesRead;
269-
}
270-
}
271-
272253
protected int readFromChannel(InboundChannelBuffer channelBuffer) throws IOException {
273254
ByteBuffer ioBuffer = getSelector().getIoBuffer();
274255
int bytesRead;
@@ -288,7 +269,7 @@ protected int readFromChannel(InboundChannelBuffer channelBuffer) throws IOExcep
288269
int j = 0;
289270
while (j < buffers.length && ioBuffer.remaining() > 0) {
290271
ByteBuffer buffer = buffers[j++];
291-
copyBytes(ioBuffer, buffer);
272+
ByteBufferUtils.copyBytes(ioBuffer, buffer);
292273
}
293274
channelBuffer.incrementIndex(bytesRead);
294275
return bytesRead;
@@ -299,24 +280,6 @@ protected int readFromChannel(InboundChannelBuffer channelBuffer) throws IOExcep
299280
// copying.
300281
private final int WRITE_LIMIT = 1 << 16;
301282

302-
protected int flushToChannel(ByteBuffer buffer) throws IOException {
303-
int initialPosition = buffer.position();
304-
ByteBuffer ioBuffer = getSelector().getIoBuffer();
305-
ioBuffer.limit(Math.min(WRITE_LIMIT, ioBuffer.limit()));
306-
copyBytes(buffer, ioBuffer);
307-
ioBuffer.flip();
308-
int bytesWritten;
309-
try {
310-
bytesWritten = rawChannel.write(ioBuffer);
311-
} catch (IOException e) {
312-
closeNow = true;
313-
buffer.position(initialPosition);
314-
throw e;
315-
}
316-
buffer.position(initialPosition + bytesWritten);
317-
return bytesWritten;
318-
}
319-
320283
protected int flushToChannel(FlushOperation flushOperation) throws IOException {
321284
ByteBuffer ioBuffer = getSelector().getIoBuffer();
322285

@@ -325,12 +288,8 @@ protected int flushToChannel(FlushOperation flushOperation) throws IOException {
325288
while (continueFlush) {
326289
ioBuffer.clear();
327290
ioBuffer.limit(Math.min(WRITE_LIMIT, ioBuffer.limit()));
328-
int j = 0;
329291
ByteBuffer[] buffers = flushOperation.getBuffersToWrite(WRITE_LIMIT);
330-
while (j < buffers.length && ioBuffer.remaining() > 0) {
331-
ByteBuffer buffer = buffers[j++];
332-
copyBytes(buffer, ioBuffer);
333-
}
292+
ByteBufferUtils.copyBytes(buffers, ioBuffer);
334293
ioBuffer.flip();
335294
int bytesFlushed;
336295
try {
@@ -345,12 +304,4 @@ protected int flushToChannel(FlushOperation flushOperation) throws IOException {
345304
}
346305
return totalBytesFlushed;
347306
}
348-
349-
private void copyBytes(ByteBuffer from, ByteBuffer to) {
350-
int nBytesToCopy = Math.min(to.remaining(), from.remaining());
351-
int initialLimit = from.limit();
352-
from.limit(from.position() + nBytesToCopy);
353-
to.put(from);
354-
from.limit(initialLimit);
355-
}
356307
}
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.nio.utils;
21+
22+
import java.nio.ByteBuffer;
23+
24+
public final class ByteBufferUtils {
25+
26+
private ByteBufferUtils() {}
27+
28+
/**
29+
* Copies bytes from the array of byte buffers into the destination buffer. The number of bytes copied is
30+
* limited by the bytes available to copy and the space remaining in the destination byte buffer.
31+
*
32+
* @param source byte buffers to copy from
33+
* @param destination byte buffer to copy to
34+
*
35+
* @return number of bytes copied
36+
*/
37+
public static long copyBytes(ByteBuffer[] source, ByteBuffer destination) {
38+
long bytesCopied = 0;
39+
for (int i = 0; i < source.length && destination.hasRemaining(); i++) {
40+
ByteBuffer buffer = source[i];
41+
bytesCopied += copyBytes(buffer, destination);
42+
}
43+
return bytesCopied;
44+
}
45+
46+
/**
47+
* Copies bytes from source byte buffer into the destination buffer. The number of bytes copied is
48+
* limited by the bytes available to copy and the space remaining in the destination byte buffer.
49+
*
50+
* @param source byte buffer to copy from
51+
* @param destination byte buffer to copy to
52+
*
53+
* @return number of bytes copied
54+
*/
55+
public static int copyBytes(ByteBuffer source, ByteBuffer destination) {
56+
int nBytesToCopy = Math.min(destination.remaining(), source.remaining());
57+
int initialLimit = source.limit();
58+
source.limit(source.position() + nBytesToCopy);
59+
destination.put(source);
60+
source.limit(initialLimit);
61+
return nBytesToCopy;
62+
}
63+
}

0 commit comments

Comments
 (0)