Skip to content

Commit 9cf2406

Browse files
authored
Move network stats marking into InboundPipeline (elastic#54908)
This is a follow-up to elastic#48263. It moves the inbound stats tracking inside of the InboundPipeline.
1 parent 9569a8e commit 9cf2406

File tree

12 files changed

+154
-58
lines changed

12 files changed

+154
-58
lines changed

modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4MessageChannelHandler.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.elasticsearch.common.bytes.ReleasableBytesReference;
3232
import org.elasticsearch.common.lease.Releasables;
3333
import org.elasticsearch.common.util.PageCacheRecycler;
34+
import org.elasticsearch.threadpool.ThreadPool;
3435
import org.elasticsearch.transport.InboundPipeline;
3536
import org.elasticsearch.transport.Transports;
3637

@@ -53,7 +54,9 @@ final class Netty4MessageChannelHandler extends ChannelDuplexHandler {
5354

5455
Netty4MessageChannelHandler(PageCacheRecycler recycler, Netty4Transport transport) {
5556
this.transport = transport;
56-
this.pipeline = new InboundPipeline(transport.getVersion(), recycler, transport::inboundMessage, transport::inboundDecodeException);
57+
final ThreadPool threadPool = transport.getThreadPool();
58+
this.pipeline = new InboundPipeline(transport.getVersion(), transport.getStatsTracker(), recycler, threadPool::relativeTimeInMillis,
59+
transport::inboundMessage, transport::inboundDecodeException);
5760
}
5861

5962
@Override

plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/TcpReadWriteHandler.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.elasticsearch.nio.BytesWriteHandler;
3030
import org.elasticsearch.nio.InboundChannelBuffer;
3131
import org.elasticsearch.nio.Page;
32+
import org.elasticsearch.threadpool.ThreadPool;
3233
import org.elasticsearch.transport.InboundPipeline;
3334
import org.elasticsearch.transport.TcpTransport;
3435

@@ -41,7 +42,9 @@ public class TcpReadWriteHandler extends BytesWriteHandler {
4142

4243
public TcpReadWriteHandler(NioTcpChannel channel, PageCacheRecycler recycler, TcpTransport transport) {
4344
this.channel = channel;
44-
this.pipeline = new InboundPipeline(transport.getVersion(), recycler, transport::inboundMessage, transport::inboundDecodeException);
45+
final ThreadPool threadPool = transport.getThreadPool();
46+
this.pipeline = new InboundPipeline(transport.getVersion(), transport.getStatsTracker(), recycler, threadPool::relativeTimeInMillis,
47+
transport::inboundMessage, transport::inboundDecodeException);
4548
}
4649

4750
@Override

server/src/main/java/org/elasticsearch/transport/InboundHandler.java

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
2929
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
3030
import org.elasticsearch.common.io.stream.StreamInput;
31-
import org.elasticsearch.common.metrics.MeanMetric;
3231
import org.elasticsearch.common.transport.TransportAddress;
3332
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
3433
import org.elasticsearch.common.util.concurrent.ThreadContext;
@@ -45,7 +44,6 @@ public class InboundHandler {
4544

4645
private static final Logger logger = LogManager.getLogger(InboundHandler.class);
4746

48-
private final MeanMetric readBytesMetric = new MeanMetric();
4947
private final ThreadPool threadPool;
5048
private final OutboundHandler outboundHandler;
5149
private final NamedWriteableRegistry namedWriteableRegistry;
@@ -83,10 +81,6 @@ final Transport.ResponseHandlers getResponseHandlers() {
8381
return responseHandlers;
8482
}
8583

86-
MeanMetric getReadBytes() {
87-
return readBytesMetric;
88-
}
89-
9084
void setMessageListener(TransportMessageListener listener) {
9185
if (messageListener == TransportMessageListener.NOOP_LISTENER) {
9286
messageListener = listener;
@@ -100,19 +94,12 @@ void inboundMessage(TcpChannel channel, InboundMessage message) throws Exception
10094
TransportLogger.logInboundMessage(channel, message);
10195

10296
if (message.isPing()) {
103-
readBytesMetric.inc(TcpHeader.BYTES_REQUIRED_FOR_MESSAGE_SIZE);
10497
keepAlive.receiveKeepAlive(channel);
10598
} else {
106-
readBytesMetric.inc(message.getHeader().getNetworkMessageSize() + TcpHeader.BYTES_REQUIRED_FOR_MESSAGE_SIZE);
10799
messageReceived(message, channel);
108100
}
109101
}
110102

111-
void handleDecodeException(TcpChannel channel, Header header) {
112-
channel.getChannelStats().markAccessed(threadPool.relativeTimeInMillis());
113-
readBytesMetric.inc(header.getNetworkMessageSize() + TcpHeader.BYTES_REQUIRED_FOR_MESSAGE_SIZE);
114-
}
115-
116103
private void messageReceived(InboundMessage message, TcpChannel channel) throws IOException {
117104
final InetSocketAddress remoteAddress = channel.getRemoteAddress();
118105
final Header header = message.getHeader();

server/src/main/java/org/elasticsearch/transport/InboundPipeline.java

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,27 +31,34 @@
3131
import java.util.ArrayDeque;
3232
import java.util.ArrayList;
3333
import java.util.function.BiConsumer;
34+
import java.util.function.LongSupplier;
3435

3536
public class InboundPipeline implements Releasable {
3637

3738
private static final ThreadLocal<ArrayList<Object>> fragmentList = ThreadLocal.withInitial(ArrayList::new);
3839
private static final InboundMessage PING_MESSAGE = new InboundMessage(null, true);
3940

41+
private final LongSupplier relativeTimeInMillis;
42+
private final StatsTracker statsTracker;
4043
private final InboundDecoder decoder;
4144
private final InboundAggregator aggregator;
4245
private final BiConsumer<TcpChannel, InboundMessage> messageHandler;
4346
private final BiConsumer<TcpChannel, Tuple<Header, Exception>> errorHandler;
4447
private ArrayDeque<ReleasableBytesReference> pending = new ArrayDeque<>(2);
4548
private boolean isClosed = false;
4649

47-
public InboundPipeline(Version version, PageCacheRecycler recycler, BiConsumer<TcpChannel, InboundMessage> messageHandler,
50+
public InboundPipeline(Version version, StatsTracker statsTracker, PageCacheRecycler recycler, LongSupplier relativeTimeInMillis,
51+
BiConsumer<TcpChannel, InboundMessage> messageHandler,
4852
BiConsumer<TcpChannel, Tuple<Header, Exception>> errorHandler) {
49-
this(new InboundDecoder(version, recycler), new InboundAggregator(), messageHandler, errorHandler);
53+
this(statsTracker, relativeTimeInMillis, new InboundDecoder(version, recycler), new InboundAggregator(), messageHandler,
54+
errorHandler);
5055
}
5156

52-
private InboundPipeline(InboundDecoder decoder, InboundAggregator aggregator,
53-
BiConsumer<TcpChannel, InboundMessage> messageHandler,
57+
private InboundPipeline(StatsTracker statsTracker, LongSupplier relativeTimeInMillis, InboundDecoder decoder,
58+
InboundAggregator aggregator, BiConsumer<TcpChannel, InboundMessage> messageHandler,
5459
BiConsumer<TcpChannel, Tuple<Header, Exception>> errorHandler) {
60+
this.relativeTimeInMillis = relativeTimeInMillis;
61+
this.statsTracker = statsTracker;
5562
this.decoder = decoder;
5663
this.aggregator = aggregator;
5764
this.messageHandler = messageHandler;
@@ -67,6 +74,8 @@ public void close() {
6774
}
6875

6976
public void handleBytes(TcpChannel channel, ReleasableBytesReference reference) throws IOException {
77+
channel.getChannelStats().markAccessed(relativeTimeInMillis.getAsLong());
78+
statsTracker.markBytesRead(reference.length());
7079
pending.add(reference.retain());
7180

7281
final ArrayList<Object> fragments = fragmentList.get();
@@ -116,12 +125,14 @@ private void forwardFragments(TcpChannel channel, ArrayList<Object> fragments) t
116125
} else if (fragment == InboundDecoder.END_CONTENT) {
117126
assert aggregator.isAggregating();
118127
try (InboundMessage aggregated = aggregator.finishAggregation()) {
128+
statsTracker.markMessageReceived();
119129
messageHandler.accept(channel, aggregated);
120130
}
121131
} else if (fragment instanceof Exception) {
122132
final Header header;
123133
if (aggregator.isAggregating()) {
124134
header = aggregator.cancelAggregation();
135+
statsTracker.markMessageReceived();
125136
} else {
126137
header = null;
127138
}

server/src/main/java/org/elasticsearch/transport/OutboundHandler.java

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
3232
import org.elasticsearch.common.lease.Releasable;
3333
import org.elasticsearch.common.lease.Releasables;
34-
import org.elasticsearch.common.metrics.MeanMetric;
3534
import org.elasticsearch.common.network.CloseableChannel;
3635
import org.elasticsearch.common.transport.NetworkExceptionHelper;
3736
import org.elasticsearch.common.transport.TransportAddress;
@@ -46,19 +45,20 @@ final class OutboundHandler {
4645

4746
private static final Logger logger = LogManager.getLogger(OutboundHandler.class);
4847

49-
private final MeanMetric transmittedBytesMetric = new MeanMetric();
50-
5148
private final String nodeName;
5249
private final Version version;
5350
private final String[] features;
51+
private final StatsTracker statsTracker;
5452
private final ThreadPool threadPool;
5553
private final BigArrays bigArrays;
5654
private volatile TransportMessageListener messageListener = TransportMessageListener.NOOP_LISTENER;
5755

58-
OutboundHandler(String nodeName, Version version, String[] features, ThreadPool threadPool, BigArrays bigArrays) {
56+
OutboundHandler(String nodeName, Version version, String[] features, StatsTracker statsTracker, ThreadPool threadPool,
57+
BigArrays bigArrays) {
5958
this.nodeName = nodeName;
6059
this.version = version;
6160
this.features = features;
61+
this.statsTracker = statsTracker;
6262
this.threadPool = threadPool;
6363
this.bigArrays = bigArrays;
6464
}
@@ -137,10 +137,6 @@ private void internalSend(TcpChannel channel, SendContext sendContext) throws IO
137137

138138
}
139139

140-
MeanMetric getTransmittedBytes() {
141-
return transmittedBytesMetric;
142-
}
143-
144140
void setMessageListener(TransportMessageListener listener) {
145141
if (messageListener == TransportMessageListener.NOOP_LISTENER) {
146142
messageListener = listener;
@@ -209,7 +205,7 @@ public BytesReference get() throws IOException {
209205
@Override
210206
protected void innerOnResponse(Void v) {
211207
assert messageSize != -1 : "If onResponse is being called, the message should have been serialized";
212-
transmittedBytesMetric.inc(messageSize);
208+
statsTracker.markBytesWritten(messageSize);
213209
closeAndCallback(() -> listener.onResponse(v));
214210
}
215211

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.transport;
21+
22+
import org.elasticsearch.common.metrics.MeanMetric;
23+
24+
import java.util.concurrent.atomic.LongAdder;
25+
26+
public class StatsTracker {
27+
28+
private final LongAdder bytesRead = new LongAdder();
29+
private final LongAdder messagesReceived = new LongAdder();
30+
private final MeanMetric writeBytesMetric = new MeanMetric();
31+
32+
public void markBytesRead(long bytesReceived) {
33+
bytesRead.add(bytesReceived);
34+
}
35+
36+
public void markMessageReceived() {
37+
messagesReceived.increment();
38+
}
39+
40+
public void markBytesWritten(long bytesWritten) {
41+
writeBytesMetric.inc(bytesWritten);
42+
}
43+
44+
public long getBytesRead() {
45+
return bytesRead.sum();
46+
}
47+
48+
public long getMessagesReceived() {
49+
return messagesReceived.sum();
50+
}
51+
52+
53+
public MeanMetric getWriteBytes() {
54+
return writeBytesMetric;
55+
}
56+
57+
public long getBytesWritten() {
58+
return writeBytesMetric.sum();
59+
}
60+
61+
public long getMessagesSent() {
62+
return writeBytesMetric.count();
63+
}
64+
}

server/src/main/java/org/elasticsearch/transport/TcpTransport.java

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,8 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
104104
private static final int BYTES_NEEDED_FOR_MESSAGE_SIZE = TcpHeader.MARKER_BYTES_SIZE + TcpHeader.MESSAGE_LENGTH_SIZE;
105105
private static final long THIRTY_PER_HEAP_SIZE = (long) (JvmInfo.jvmInfo().getMem().getHeapMax().getBytes() * 0.3);
106106

107+
final StatsTracker statsTracker = new StatsTracker();
108+
107109
// this limit is per-address
108110
private static final int LIMIT_LOCAL_PORTS_COUNT = 6;
109111

@@ -153,7 +155,7 @@ public TcpTransport(Settings settings, Version version, ThreadPool threadPool, P
153155
}
154156
BigArrays bigArrays = new BigArrays(pageCacheRecycler, circuitBreakerService, CircuitBreaker.IN_FLIGHT_REQUESTS);
155157

156-
this.outboundHandler = new OutboundHandler(nodeName, version, features, threadPool, bigArrays);
158+
this.outboundHandler = new OutboundHandler(nodeName, version, features, statsTracker, threadPool, bigArrays);
157159
this.handshaker = new TransportHandshaker(version, threadPool,
158160
(node, channel, requestId, v) -> outboundHandler.sendRequest(node, channel, requestId,
159161
TransportHandshaker.HANDSHAKE_ACTION_NAME, new TransportHandshaker.HandshakeRequest(version),
@@ -169,6 +171,14 @@ public Version getVersion() {
169171
return version;
170172
}
171173

174+
public StatsTracker getStatsTracker() {
175+
return statsTracker;
176+
}
177+
178+
public ThreadPool getThreadPool() {
179+
return threadPool;
180+
}
181+
172182
@Override
173183
protected void doStart() {
174184
}
@@ -685,9 +695,6 @@ public void inboundMessage(TcpChannel channel, InboundMessage message) {
685695
}
686696

687697
public void inboundDecodeException(TcpChannel channel, Tuple<Header, Exception> tuple) {
688-
// Need to call inbound handler to mark bytes received. This should eventually be unnecessary as the
689-
// stats marking will move into the pipeline.
690-
inboundHandler.handleDecodeException(channel, tuple.v1());
691698
onException(channel, tuple.v2());
692699
}
693700

@@ -836,10 +843,12 @@ private void ensureOpen() {
836843

837844
@Override
838845
public final TransportStats getStats() {
839-
MeanMetric transmittedBytes = outboundHandler.getTransmittedBytes();
840-
MeanMetric readBytes = inboundHandler.getReadBytes();
841-
return new TransportStats(acceptedChannels.size(), readBytes.count(), readBytes.sum(), transmittedBytes.count(),
842-
transmittedBytes.sum());
846+
final MeanMetric writeBytesMetric = statsTracker.getWriteBytes();
847+
final long bytesWritten = statsTracker.getBytesWritten();
848+
final long messagesSent = statsTracker.getMessagesSent();
849+
final long messagesReceived = statsTracker.getMessagesReceived();
850+
final long bytesRead = statsTracker.getBytesRead();
851+
return new TransportStats(acceptedChannels.size(), messagesReceived, bytesRead, messagesSent, bytesWritten);
843852
}
844853

845854
/**

server/src/test/java/org/elasticsearch/transport/InboundHandlerTests.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,8 @@ public void setUp() throws Exception {
6060
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList());
6161
TransportHandshaker handshaker = new TransportHandshaker(version, threadPool, (n, c, r, v) -> {}, (v, f, c, r, r_id) -> {});
6262
TransportKeepAlive keepAlive = new TransportKeepAlive(threadPool, TcpChannel::sendMessage);
63-
OutboundHandler outboundHandler = new OutboundHandler("node", version, new String[0], threadPool, BigArrays.NON_RECYCLING_INSTANCE);
63+
OutboundHandler outboundHandler = new OutboundHandler("node", version, new String[0], new StatsTracker(), threadPool,
64+
BigArrays.NON_RECYCLING_INSTANCE);
6465
final NoneCircuitBreakerService breaker = new NoneCircuitBreakerService();
6566
handler = new InboundHandler(threadPool, outboundHandler, namedWriteableRegistry, breaker, handshaker, keepAlive);
6667
}
@@ -78,8 +79,6 @@ public void testPing() throws Exception {
7879
handler.registerRequestHandler(registry);
7980

8081
handler.inboundMessage(channel, new InboundMessage(null, true));
81-
assertEquals(1, handler.getReadBytes().count());
82-
assertEquals(6, handler.getReadBytes().sum());
8382
if (channel.isServerChannel()) {
8483
BytesReference ping = channel.getMessageCaptor().get();
8584
assertEquals('E', ping.get(0));

0 commit comments

Comments
 (0)