Skip to content

Commit 4cc35c2

Browse files
committed
Remove parameterization from TcpTransport (elastic#27407)
This commit is a follow up to the work completed in elastic#27132. Essentially it transitions two more methods (sendMessage and getLocalAddress) from Transport to TcpChannel. With this change, there is no longer a need for TcpTransport to be aware of the specific type of channel a transport returns. So that class is no longer parameterized by channel type.
1 parent 5dfb432 commit 4cc35c2

File tree

16 files changed

+239
-206
lines changed

16 files changed

+239
-206
lines changed

core/src/main/java/org/elasticsearch/transport/TcpChannel.java

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,19 +19,19 @@
1919

2020
package org.elasticsearch.transport;
2121

22-
import org.apache.logging.log4j.Logger;
23-
import org.apache.logging.log4j.message.ParameterizedMessage;
2422
import org.elasticsearch.action.ActionFuture;
2523
import org.elasticsearch.action.ActionListener;
2624
import org.elasticsearch.action.support.PlainActionFuture;
2725
import org.elasticsearch.cluster.node.DiscoveryNode;
26+
import org.elasticsearch.common.bytes.BytesReference;
2827
import org.elasticsearch.common.lease.Releasable;
2928
import org.elasticsearch.common.lease.Releasables;
3029
import org.elasticsearch.common.unit.TimeValue;
3130

3231
import java.io.IOException;
32+
import java.net.InetSocketAddress;
33+
import java.net.SocketAddress;
3334
import java.util.ArrayList;
34-
import java.util.Collection;
3535
import java.util.Collections;
3636
import java.util.List;
3737
import java.util.concurrent.ExecutionException;
@@ -80,6 +80,22 @@ public interface TcpChannel extends Releasable {
8080
*/
8181
boolean isOpen();
8282

83+
/**
84+
* Returns the local address for this channel.
85+
*
86+
* @return the local address of this channel.
87+
*/
88+
InetSocketAddress getLocalAddress();
89+
90+
/**
91+
* Sends a tcp message to the channel. The listener will be executed once the send process has been
92+
* completed.
93+
*
94+
* @param reference to send to channel
95+
* @param listener to execute upon send completion
96+
*/
97+
void sendMessage(BytesReference reference, ActionListener<TcpChannel> listener);
98+
8399
/**
84100
* Closes the channel.
85101
*

core/src/main/java/org/elasticsearch/transport/TcpTransport.java

Lines changed: 58 additions & 68 deletions
Large diffs are not rendered by default.

core/src/main/java/org/elasticsearch/transport/TcpTransportChannel.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,18 +23,18 @@
2323
import java.io.IOException;
2424
import java.util.concurrent.atomic.AtomicBoolean;
2525

26-
public final class TcpTransportChannel<Channel extends TcpChannel> implements TransportChannel {
27-
private final TcpTransport<Channel> transport;
26+
public final class TcpTransportChannel implements TransportChannel {
27+
private final TcpTransport transport;
2828
private final Version version;
2929
private final String action;
3030
private final long requestId;
3131
private final String profileName;
3232
private final long reservedBytes;
3333
private final AtomicBoolean released = new AtomicBoolean();
3434
private final String channelType;
35-
private final Channel channel;
35+
private final TcpChannel channel;
3636

37-
TcpTransportChannel(TcpTransport<Channel> transport, Channel channel, String channelType, String action,
37+
TcpTransportChannel(TcpTransport transport, TcpChannel channel, String channelType, String action,
3838
long requestId, Version version, String profileName, long reservedBytes) {
3939
this.version = version;
4040
this.channel = channel;
@@ -97,7 +97,7 @@ public Version getVersion() {
9797
return version;
9898
}
9999

100-
public Channel getChannel() {
100+
public TcpChannel getChannel() {
101101
return channel;
102102
}
103103
}

core/src/test/java/org/elasticsearch/transport/TcpTransportTests.java

Lines changed: 47 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@
3939
import java.net.InetSocketAddress;
4040
import java.util.ArrayList;
4141
import java.util.concurrent.TimeUnit;
42-
import java.util.concurrent.atomic.AtomicBoolean;
4342
import java.util.concurrent.atomic.AtomicReference;
4443

4544
import static org.hamcrest.Matchers.equalTo;
@@ -172,57 +171,23 @@ public void testEnsureVersionCompatibility() {
172171

173172
public void testCompressRequest() throws IOException {
174173
final boolean compressed = randomBoolean();
175-
final AtomicBoolean called = new AtomicBoolean(false);
176174
Req request = new Req(randomRealisticUnicodeOfLengthBetween(10, 100));
177175
ThreadPool threadPool = new TestThreadPool(TcpTransportTests.class.getName());
178-
AtomicReference<IOException> exceptionReference = new AtomicReference<>();
176+
AtomicReference<BytesReference> messageCaptor = new AtomicReference<>();
179177
try {
180-
TcpTransport<FakeChannel> transport = new TcpTransport<FakeChannel>(
178+
TcpTransport transport = new TcpTransport(
181179
"test", Settings.builder().put("transport.tcp.compress", compressed).build(), threadPool,
182180
new BigArrays(Settings.EMPTY, null), null, null, null) {
183-
@Override
184-
protected InetSocketAddress getLocalAddress(FakeChannel o) {
185-
return null;
186-
}
187181

188182
@Override
189183
protected FakeChannel bind(String name, InetSocketAddress address) throws IOException {
190184
return null;
191185
}
192186

193-
@Override
194-
protected void sendMessage(FakeChannel o, BytesReference reference, ActionListener listener) {
195-
try {
196-
StreamInput streamIn = reference.streamInput();
197-
streamIn.skip(TcpHeader.MARKER_BYTES_SIZE);
198-
int len = streamIn.readInt();
199-
long requestId = streamIn.readLong();
200-
assertEquals(42, requestId);
201-
byte status = streamIn.readByte();
202-
Version version = Version.fromId(streamIn.readInt());
203-
assertEquals(Version.CURRENT, version);
204-
assertEquals(compressed, TransportStatus.isCompress(status));
205-
called.compareAndSet(false, true);
206-
if (compressed) {
207-
final int bytesConsumed = TcpHeader.HEADER_SIZE;
208-
streamIn = CompressorFactory.compressor(reference.slice(bytesConsumed, reference.length() - bytesConsumed))
209-
.streamInput(streamIn);
210-
}
211-
threadPool.getThreadContext().readHeaders(streamIn);
212-
assertEquals("foobar", streamIn.readString());
213-
Req readReq = new Req("");
214-
readReq.readFrom(streamIn);
215-
assertEquals(request.value, readReq.value);
216-
} catch (IOException e) {
217-
exceptionReference.set(e);
218-
}
219-
}
220-
221187
@Override
222188
protected FakeChannel initiateChannel(DiscoveryNode node, TimeValue connectTimeout,
223-
ActionListener<FakeChannel> connectListener) throws IOException {
224-
FakeChannel fakeChannel = new FakeChannel();
225-
return fakeChannel;
189+
ActionListener<TcpChannel> connectListener) throws IOException {
190+
return new FakeChannel(messageCaptor);
226191
}
227192

228193
@Override
@@ -233,25 +198,54 @@ public long getNumOpenServerConnections() {
233198
@Override
234199
public NodeChannels getConnection(DiscoveryNode node) {
235200
int numConnections = MockTcpTransport.LIGHT_PROFILE.getNumConnections();
236-
ArrayList<FakeChannel> fakeChannels = new ArrayList<>(numConnections);
201+
ArrayList<TcpChannel> fakeChannels = new ArrayList<>(numConnections);
237202
for (int i = 0; i < numConnections; ++i) {
238-
fakeChannels.add(new FakeChannel());
203+
fakeChannels.add(new FakeChannel(messageCaptor));
239204
}
240205
return new NodeChannels(node, fakeChannels, MockTcpTransport.LIGHT_PROFILE, Version.CURRENT);
241206
}
242207
};
208+
243209
DiscoveryNode node = new DiscoveryNode("foo", buildNewFakeTransportAddress(), Version.CURRENT);
244210
Transport.Connection connection = transport.getConnection(node);
245211
connection.sendRequest(42, "foobar", request, TransportRequestOptions.EMPTY);
246-
assertTrue(called.get());
247-
assertNull("IOException while sending message.", exceptionReference.get());
212+
213+
BytesReference reference = messageCaptor.get();
214+
assertNotNull(reference);
215+
216+
StreamInput streamIn = reference.streamInput();
217+
streamIn.skip(TcpHeader.MARKER_BYTES_SIZE);
218+
int len = streamIn.readInt();
219+
long requestId = streamIn.readLong();
220+
assertEquals(42, requestId);
221+
byte status = streamIn.readByte();
222+
Version version = Version.fromId(streamIn.readInt());
223+
assertEquals(Version.CURRENT, version);
224+
assertEquals(compressed, TransportStatus.isCompress(status));
225+
if (compressed) {
226+
final int bytesConsumed = TcpHeader.HEADER_SIZE;
227+
streamIn = CompressorFactory.compressor(reference.slice(bytesConsumed, reference.length() - bytesConsumed))
228+
.streamInput(streamIn);
229+
}
230+
threadPool.getThreadContext().readHeaders(streamIn);
231+
assertEquals("foobar", streamIn.readString());
232+
Req readReq = new Req("");
233+
readReq.readFrom(streamIn);
234+
assertEquals(request.value, readReq.value);
235+
248236
} finally {
249237
ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS);
250238
}
251239
}
252240

253241
private static final class FakeChannel implements TcpChannel {
254242

243+
private final AtomicReference<BytesReference> messageCaptor;
244+
245+
FakeChannel(AtomicReference<BytesReference> messageCaptor) {
246+
this.messageCaptor = messageCaptor;
247+
}
248+
255249
@Override
256250
public void close() {
257251
}
@@ -268,6 +262,16 @@ public void setSoLinger(int value) throws IOException {
268262
public boolean isOpen() {
269263
return false;
270264
}
265+
266+
@Override
267+
public InetSocketAddress getLocalAddress() {
268+
return null;
269+
}
270+
271+
@Override
272+
public void sendMessage(BytesReference reference, ActionListener<TcpChannel> listener) {
273+
messageCaptor.set(reference);
274+
}
271275
}
272276

273277
private static final class Req extends TransportRequest {

modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java

Lines changed: 3 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@
4242
import org.elasticsearch.action.ActionListener;
4343
import org.elasticsearch.cluster.node.DiscoveryNode;
4444
import org.elasticsearch.common.SuppressForbidden;
45-
import org.elasticsearch.common.bytes.BytesReference;
4645
import org.elasticsearch.common.collect.Tuple;
4746
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
4847
import org.elasticsearch.common.lease.Releasables;
@@ -57,6 +56,7 @@
5756
import org.elasticsearch.common.util.concurrent.EsExecutors;
5857
import org.elasticsearch.indices.breaker.CircuitBreakerService;
5958
import org.elasticsearch.threadpool.ThreadPool;
59+
import org.elasticsearch.transport.TcpChannel;
6060
import org.elasticsearch.transport.TcpTransport;
6161
import org.elasticsearch.transport.TransportRequestOptions;
6262

@@ -79,7 +79,7 @@
7979
* longer. Med is for the typical search / single doc index. And High for things like cluster state. Ping is reserved for
8080
* sending out ping requests to other nodes.
8181
*/
82-
public class Netty4Transport extends TcpTransport<NettyTcpChannel> {
82+
public class Netty4Transport extends TcpTransport {
8383

8484
static {
8585
Netty4Utils.setup();
@@ -249,7 +249,7 @@ public long getNumOpenServerConnections() {
249249
}
250250

251251
@Override
252-
protected NettyTcpChannel initiateChannel(DiscoveryNode node, TimeValue connectTimeout, ActionListener<NettyTcpChannel> listener)
252+
protected NettyTcpChannel initiateChannel(DiscoveryNode node, TimeValue connectTimeout, ActionListener<TcpChannel> listener)
253253
throws IOException {
254254
ChannelFuture channelFuture = bootstrap.connect(node.getAddress().address());
255255
Channel channel = channelFuture.channel();
@@ -279,28 +279,6 @@ protected NettyTcpChannel initiateChannel(DiscoveryNode node, TimeValue connectT
279279
return nettyChannel;
280280
}
281281

282-
@Override
283-
protected void sendMessage(NettyTcpChannel channel, BytesReference reference, ActionListener<NettyTcpChannel> listener) {
284-
final ChannelFuture future = channel.getLowLevelChannel().writeAndFlush(Netty4Utils.toByteBuf(reference));
285-
future.addListener(f -> {
286-
if (f.isSuccess()) {
287-
listener.onResponse(channel);
288-
} else {
289-
final Throwable cause = f.cause();
290-
Netty4Utils.maybeDie(cause);
291-
logger.warn((Supplier<?>) () ->
292-
new ParameterizedMessage("write and flush on the network layer failed (channel: {})", channel), cause);
293-
assert cause instanceof Exception;
294-
listener.onFailure((Exception) cause);
295-
}
296-
});
297-
}
298-
299-
@Override
300-
protected InetSocketAddress getLocalAddress(NettyTcpChannel channel) {
301-
return (InetSocketAddress) channel.getLowLevelChannel().localAddress();
302-
}
303-
304282
@Override
305283
protected NettyTcpChannel bind(String name, InetSocketAddress address) {
306284
Channel channel = serverBootstraps.get(name).bind(address).syncUninterruptibly().channel();

modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/NettyTcpChannel.java

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,15 @@
2020
package org.elasticsearch.transport.netty4;
2121

2222
import io.netty.channel.Channel;
23+
import io.netty.channel.ChannelFuture;
2324
import io.netty.channel.ChannelOption;
25+
import org.apache.logging.log4j.message.ParameterizedMessage;
26+
import org.apache.logging.log4j.util.Supplier;
2427
import org.elasticsearch.action.ActionListener;
28+
import org.elasticsearch.common.bytes.BytesReference;
2529
import org.elasticsearch.transport.TcpChannel;
2630

31+
import java.net.InetSocketAddress;
2732
import java.util.concurrent.CompletableFuture;
2833

2934
public class NettyTcpChannel implements TcpChannel {
@@ -48,10 +53,6 @@ public class NettyTcpChannel implements TcpChannel {
4853
});
4954
}
5055

51-
public Channel getLowLevelChannel() {
52-
return channel;
53-
}
54-
5556
@Override
5657
public void close() {
5758
channel.close();
@@ -71,4 +72,28 @@ public void setSoLinger(int value) {
7172
public boolean isOpen() {
7273
return channel.isOpen();
7374
}
75+
76+
@Override
77+
public InetSocketAddress getLocalAddress() {
78+
return (InetSocketAddress) channel.localAddress();
79+
}
80+
81+
@Override
82+
public void sendMessage(BytesReference reference, ActionListener<TcpChannel> listener) {
83+
final ChannelFuture future = channel.writeAndFlush(Netty4Utils.toByteBuf(reference));
84+
future.addListener(f -> {
85+
if (f.isSuccess()) {
86+
listener.onResponse(this);
87+
} else {
88+
final Throwable cause = f.cause();
89+
Netty4Utils.maybeDie(cause);
90+
assert cause instanceof Exception;
91+
listener.onFailure((Exception) cause);
92+
}
93+
});
94+
}
95+
96+
public Channel getLowLevelChannel() {
97+
return channel;
98+
}
7499
}

modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4TransportIT.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
3737
import org.elasticsearch.test.ESIntegTestCase.Scope;
3838
import org.elasticsearch.threadpool.ThreadPool;
39+
import org.elasticsearch.transport.TcpChannel;
3940
import org.elasticsearch.transport.TcpTransport;
4041
import org.elasticsearch.transport.Transport;
4142

@@ -108,7 +109,8 @@ public ExceptionThrowingNetty4Transport(
108109
super(settings, threadPool, networkService, bigArrays, namedWriteableRegistry, circuitBreakerService);
109110
}
110111

111-
protected String handleRequest(NettyTcpChannel channel, String profileName,
112+
@Override
113+
protected String handleRequest(TcpChannel channel, String profileName,
112114
StreamInput stream, long requestId, int messageLengthBytes, Version version,
113115
InetSocketAddress remoteAddress, byte status) throws IOException {
114116
String action = super.handleRequest(channel, profileName, stream, requestId, messageLengthBytes, version,

0 commit comments

Comments
 (0)