Skip to content

Commit 973ce28

Browse files
committed
Improve MockTcpTransport memory usage (elastic#35402) (elastic#35418) (elastic#35425)
The MockTcpTransport is not friendly in regards to memory usage. It must allocate multiple byte arrays for every message. This improves the memory situation by failing fast if the message is improperly formatted. Additionally, it uses reusable big arrays for at least half of the allocated byte arrays.
1 parent 4232e77 commit 973ce28

File tree

2 files changed

+20
-14
lines changed

2 files changed

+20
-14
lines changed

server/src/main/java/org/elasticsearch/transport/TcpTransport.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
198198
protected final ScheduledPing scheduledPing;
199199
private final TimeValue pingSchedule;
200200
protected final ThreadPool threadPool;
201-
private final BigArrays bigArrays;
201+
protected final BigArrays bigArrays;
202202
protected final NetworkService networkService;
203203
protected final Set<ProfileSettings> profileSettings;
204204

test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java

Lines changed: 19 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,15 @@
2121
import org.elasticsearch.core.internal.io.IOUtils;
2222
import org.elasticsearch.Version;
2323
import org.elasticsearch.action.ActionListener;
24+
import org.elasticsearch.Version;
25+
import org.elasticsearch.action.ActionListener;
26+
import org.elasticsearch.cli.SuppressForbidden;
2427
import org.elasticsearch.cluster.node.DiscoveryNode;
2528
import org.elasticsearch.common.bytes.BytesReference;
2629
import org.elasticsearch.common.io.stream.BytesStreamOutput;
2730
import org.elasticsearch.common.io.stream.InputStreamStreamInput;
2831
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
32+
import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
2933
import org.elasticsearch.common.io.stream.StreamInput;
3034
import org.elasticsearch.common.network.NetworkService;
3135
import org.elasticsearch.common.settings.Settings;
@@ -35,6 +39,7 @@
3539
import org.elasticsearch.common.util.CancellableThreads;
3640
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
3741
import org.elasticsearch.common.util.concurrent.EsExecutors;
42+
import org.elasticsearch.core.internal.io.IOUtils;
3843
import org.elasticsearch.indices.breaker.CircuitBreakerService;
3944
import org.elasticsearch.mocksocket.MockServerSocket;
4045
import org.elasticsearch.mocksocket.MockSocket;
@@ -153,19 +158,20 @@ private void readMessage(MockChannel mockChannel, StreamInput input) throws IOEx
153158
if (msgSize == -1) {
154159
socket.getOutputStream().flush();
155160
} else {
156-
BytesStreamOutput output = new BytesStreamOutput();
157-
final byte[] buffer = new byte[msgSize];
158-
input.readFully(buffer);
159-
output.write(minimalHeader);
160-
output.writeInt(msgSize);
161-
output.write(buffer);
162-
final BytesReference bytes = output.bytes();
163-
if (TcpTransport.validateMessageHeader(bytes)) {
164-
InetSocketAddress remoteAddress = (InetSocketAddress) socket.getRemoteSocketAddress();
165-
messageReceived(bytes.slice(TcpHeader.MARKER_BYTES_SIZE + TcpHeader.MESSAGE_LENGTH_SIZE, msgSize),
166-
mockChannel, mockChannel.profile, remoteAddress, msgSize);
167-
} else {
168-
// ping message - we just drop all stuff
161+
try (BytesStreamOutput output = new ReleasableBytesStreamOutput(msgSize, bigArrays)) {
162+
final byte[] buffer = new byte[msgSize];
163+
input.readFully(buffer);
164+
output.write(minimalHeader);
165+
output.writeInt(msgSize);
166+
output.write(buffer);
167+
final BytesReference bytes = output.bytes();
168+
if (TcpTransport.validateMessageHeader(bytes)) {
169+
InetSocketAddress remoteAddress = (InetSocketAddress) socket.getRemoteSocketAddress();
170+
messageReceived(bytes.slice(TcpHeader.MARKER_BYTES_SIZE + TcpHeader.MESSAGE_LENGTH_SIZE, msgSize),
171+
mockChannel, mockChannel.profile, remoteAddress, msgSize);
172+
} else {
173+
// ping message - we just drop all stuff
174+
}
169175
}
170176
}
171177
}

0 commit comments

Comments
 (0)