Skip to content

Commit e8b7027

Browse files
authored
Remove Throwable usage from transport modules (elastic#30845)
Currently nio and netty modules use the CompletableFuture class for managing listeners. This is unfortunate as that class accepts Throwable. This commit adds a class CompletableContext that wraps the CompletableFuture but does not accept Throwable. This allows the modification of netty and nio logic to no longer handle Throwable.
1 parent 5a97423 commit e8b7027

File tree

26 files changed

+127
-67
lines changed

26 files changed

+127
-67
lines changed
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.common.concurrent;
21+
22+
import java.util.concurrent.CompletableFuture;
23+
import java.util.function.BiConsumer;
24+
25+
/**
26+
* A thread-safe completable context that allows listeners to be attached. This class relies on the
27+
* {@link CompletableFuture} for the concurrency logic. However, it does not accept {@link Throwable} as
28+
* an exceptional result. This allows attaching listeners that only handle {@link Exception}.
29+
*
30+
* @param <T> the result type
31+
*/
32+
public class CompletableContext<T> {
33+
34+
private final CompletableFuture<T> completableFuture = new CompletableFuture<>();
35+
36+
public void addListener(BiConsumer<T, ? super Exception> listener) {
37+
BiConsumer<T, Throwable> castThrowable = (v, t) -> {
38+
if (t == null) {
39+
listener.accept(v, null);
40+
} else {
41+
assert !(t instanceof Error) : "Cannot be error";
42+
listener.accept(v, (Exception) t);
43+
}
44+
};
45+
completableFuture.whenComplete(castThrowable);
46+
}
47+
48+
public boolean isDone() {
49+
return completableFuture.isDone();
50+
}
51+
52+
public boolean isCompletedExceptionally() {
53+
return completableFuture.isCompletedExceptionally();
54+
}
55+
56+
public boolean completeExceptionally(Exception ex) {
57+
return completableFuture.completeExceptionally(ex);
58+
}
59+
60+
public boolean complete(T value) {
61+
return completableFuture.complete(value);
62+
}
63+
}

libs/elasticsearch-nio/build.gradle

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ publishing {
3333
}
3434

3535
dependencies {
36+
compile "org.elasticsearch:elasticsearch-core:${version}"
37+
3638
testCompile "com.carrotsearch.randomizedtesting:randomizedtesting-runner:${versions.randomizedrunner}"
3739
testCompile "junit:junit:${versions.junit}"
3840
testCompile "org.hamcrest:hamcrest-all:${versions.hamcrest}"

libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/BytesWriteHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ public abstract class BytesWriteHandler implements ReadWriteHandler {
2828

2929
private static final List<FlushOperation> EMPTY_LIST = Collections.emptyList();
3030

31-
public WriteOperation createWriteOperation(SocketChannelContext context, Object message, BiConsumer<Void, Throwable> listener) {
31+
public WriteOperation createWriteOperation(SocketChannelContext context, Object message, BiConsumer<Void, Exception> listener) {
3232
assert message instanceof ByteBuffer[] : "This channel only supports messages that are of type: " + ByteBuffer[].class
3333
+ ". Found type: " + message.getClass() + ".";
3434
return new FlushReadyWrite(context, (ByteBuffer[]) message, listener);

libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/ChannelContext.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,12 @@
1919

2020
package org.elasticsearch.nio;
2121

22+
import org.elasticsearch.common.concurrent.CompletableContext;
23+
2224
import java.io.IOException;
2325
import java.nio.channels.NetworkChannel;
2426
import java.nio.channels.SelectableChannel;
2527
import java.nio.channels.SelectionKey;
26-
import java.util.concurrent.CompletableFuture;
2728
import java.util.function.BiConsumer;
2829
import java.util.function.Consumer;
2930

@@ -37,7 +38,7 @@ public abstract class ChannelContext<S extends SelectableChannel & NetworkChanne
3738

3839
protected final S rawChannel;
3940
private final Consumer<Exception> exceptionHandler;
40-
private final CompletableFuture<Void> closeContext = new CompletableFuture<>();
41+
private final CompletableContext<Void> closeContext = new CompletableContext<>();
4142
private volatile SelectionKey selectionKey;
4243

4344
ChannelContext(S rawChannel, Consumer<Exception> exceptionHandler) {
@@ -81,8 +82,8 @@ public void closeFromSelector() throws IOException {
8182
*
8283
* @param listener to be called
8384
*/
84-
public void addCloseListener(BiConsumer<Void, Throwable> listener) {
85-
closeContext.whenComplete(listener);
85+
public void addCloseListener(BiConsumer<Void, Exception> listener) {
86+
closeContext.addListener(listener);
8687
}
8788

8889
public boolean isOpen() {

libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/FlushOperation.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,13 @@
2525

2626
public class FlushOperation {
2727

28-
private final BiConsumer<Void, Throwable> listener;
28+
private final BiConsumer<Void, Exception> listener;
2929
private final ByteBuffer[] buffers;
3030
private final int[] offsets;
3131
private final int length;
3232
private int internalIndex;
3333

34-
public FlushOperation(ByteBuffer[] buffers, BiConsumer<Void, Throwable> listener) {
34+
public FlushOperation(ByteBuffer[] buffers, BiConsumer<Void, Exception> listener) {
3535
this.listener = listener;
3636
this.buffers = buffers;
3737
this.offsets = new int[buffers.length];
@@ -44,7 +44,7 @@ public FlushOperation(ByteBuffer[] buffers, BiConsumer<Void, Throwable> listener
4444
length = offset;
4545
}
4646

47-
public BiConsumer<Void, Throwable> getListener() {
47+
public BiConsumer<Void, Exception> getListener() {
4848
return listener;
4949
}
5050

libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/FlushReadyWrite.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ public class FlushReadyWrite extends FlushOperation implements WriteOperation {
2727
private final SocketChannelContext channelContext;
2828
private final ByteBuffer[] buffers;
2929

30-
FlushReadyWrite(SocketChannelContext channelContext, ByteBuffer[] buffers, BiConsumer<Void, Throwable> listener) {
30+
FlushReadyWrite(SocketChannelContext channelContext, ByteBuffer[] buffers, BiConsumer<Void, Exception> listener) {
3131
super(buffers, listener);
3232
this.channelContext = channelContext;
3333
this.buffers = buffers;

libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/NioChannel.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ public InetSocketAddress getLocalAddress() {
5353
*
5454
* @param listener to be called at close
5555
*/
56-
public void addCloseListener(BiConsumer<Void, Throwable> listener) {
56+
public void addCloseListener(BiConsumer<Void, Exception> listener) {
5757
getContext().addCloseListener(listener);
5858
}
5959

libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/NioSocketChannel.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ public InetSocketAddress getRemoteAddress() {
6060
return remoteAddress;
6161
}
6262

63-
public void addConnectListener(BiConsumer<Void, Throwable> listener) {
63+
public void addConnectListener(BiConsumer<Void, Exception> listener) {
6464
context.addConnectListener(listener);
6565
}
6666

libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/ReadWriteHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ public interface ReadWriteHandler {
3838
* @param listener the listener to be called when the message is sent
3939
* @return the write operation to be queued
4040
*/
41-
WriteOperation createWriteOperation(SocketChannelContext context, Object message, BiConsumer<Void, Throwable> listener);
41+
WriteOperation createWriteOperation(SocketChannelContext context, Object message, BiConsumer<Void, Exception> listener);
4242

4343
/**
4444
* This method is called on the event loop thread. It should serialize a write operation object to bytes

libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/SocketChannelContext.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.elasticsearch.nio;
2121

22+
import org.elasticsearch.common.concurrent.CompletableContext;
2223
import org.elasticsearch.nio.utils.ExceptionsHelper;
2324

2425
import java.io.IOException;
@@ -27,7 +28,6 @@
2728
import java.nio.channels.SocketChannel;
2829
import java.util.ArrayList;
2930
import java.util.LinkedList;
30-
import java.util.concurrent.CompletableFuture;
3131
import java.util.concurrent.atomic.AtomicBoolean;
3232
import java.util.function.BiConsumer;
3333
import java.util.function.Consumer;
@@ -48,7 +48,7 @@ public abstract class SocketChannelContext extends ChannelContext<SocketChannel>
4848
protected final AtomicBoolean isClosing = new AtomicBoolean(false);
4949
private final ReadWriteHandler readWriteHandler;
5050
private final SocketSelector selector;
51-
private final CompletableFuture<Void> connectContext = new CompletableFuture<>();
51+
private final CompletableContext<Void> connectContext = new CompletableContext<>();
5252
private final LinkedList<FlushOperation> pendingFlushes = new LinkedList<>();
5353
private boolean ioException;
5454
private boolean peerClosed;
@@ -73,8 +73,8 @@ public NioSocketChannel getChannel() {
7373
return channel;
7474
}
7575

76-
public void addConnectListener(BiConsumer<Void, Throwable> listener) {
77-
connectContext.whenComplete(listener);
76+
public void addConnectListener(BiConsumer<Void, Exception> listener) {
77+
connectContext.addListener(listener);
7878
}
7979

8080
public boolean isConnectComplete() {
@@ -121,7 +121,7 @@ public boolean connect() throws IOException {
121121
return isConnected;
122122
}
123123

124-
public void sendMessage(Object message, BiConsumer<Void, Throwable> listener) {
124+
public void sendMessage(Object message, BiConsumer<Void, Exception> listener) {
125125
if (isClosing.get()) {
126126
listener.accept(null, new ClosedChannelException());
127127
return;

libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/SocketSelector.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ public void queueWriteInChannelBuffer(WriteOperation writeOperation) {
138138
* @param listener to be executed
139139
* @param value to provide to listener
140140
*/
141-
public <V> void executeListener(BiConsumer<V, Throwable> listener, V value) {
141+
public <V> void executeListener(BiConsumer<V, Exception> listener, V value) {
142142
assertOnSelectorThread();
143143
try {
144144
listener.accept(value, null);
@@ -154,7 +154,7 @@ public <V> void executeListener(BiConsumer<V, Throwable> listener, V value) {
154154
* @param listener to be executed
155155
* @param exception to provide to listener
156156
*/
157-
public <V> void executeFailedListener(BiConsumer<V, Throwable> listener, Exception exception) {
157+
public <V> void executeFailedListener(BiConsumer<V, Exception> listener, Exception exception) {
158158
assertOnSelectorThread();
159159
try {
160160
listener.accept(null, exception);

libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/WriteOperation.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
*/
2828
public interface WriteOperation {
2929

30-
BiConsumer<Void, Throwable> getListener();
30+
BiConsumer<Void, Exception> getListener();
3131

3232
SocketChannelContext getChannel();
3333

libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/BytesChannelContextTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ public class BytesChannelContextTests extends ESTestCase {
4545
private BytesChannelContext context;
4646
private InboundChannelBuffer channelBuffer;
4747
private SocketSelector selector;
48-
private BiConsumer<Void, Throwable> listener;
48+
private BiConsumer<Void, Exception> listener;
4949
private int messageLength;
5050

5151
@Before
@@ -191,7 +191,7 @@ public void testPartialFlush() throws IOException {
191191
public void testMultipleWritesPartialFlushes() throws IOException {
192192
assertFalse(context.readyForFlush());
193193

194-
BiConsumer<Void, Throwable> listener2 = mock(BiConsumer.class);
194+
BiConsumer<Void, Exception> listener2 = mock(BiConsumer.class);
195195
FlushReadyWrite flushOperation1 = mock(FlushReadyWrite.class);
196196
FlushReadyWrite flushOperation2 = mock(FlushReadyWrite.class);
197197
when(flushOperation1.getBuffersToWrite()).thenReturn(new ByteBuffer[0]);

libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/ChannelContextTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ public void testCloseException() throws IOException {
8383
if (t == null) {
8484
throw new AssertionError("Close should not fail");
8585
} else {
86-
exception.set((Exception) t);
86+
exception.set(t);
8787
}
8888
});
8989

libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/FlushOperationTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131

3232
public class FlushOperationTests extends ESTestCase {
3333

34-
private BiConsumer<Void, Throwable> listener;
34+
private BiConsumer<Void, Exception> listener;
3535

3636
@Before
3737
@SuppressWarnings("unchecked")

libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/SocketChannelContextTests.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ public class SocketChannelContextTests extends ESTestCase {
5050
private TestSocketChannelContext context;
5151
private Consumer<Exception> exceptionHandler;
5252
private NioSocketChannel channel;
53-
private BiConsumer<Void, Throwable> listener;
53+
private BiConsumer<Void, Exception> listener;
5454
private SocketSelector selector;
5555
private ReadWriteHandler readWriteHandler;
5656

@@ -125,7 +125,7 @@ public void testConnectFails() throws IOException {
125125
if (t == null) {
126126
throw new AssertionError("Connection should not succeed");
127127
} else {
128-
exception.set((Exception) t);
128+
exception.set(t);
129129
}
130130
});
131131

@@ -206,7 +206,7 @@ public void testFlushOpsClearedOnClose() throws Exception {
206206

207207
ByteBuffer[] buffer = {ByteBuffer.allocate(10)};
208208
WriteOperation writeOperation = mock(WriteOperation.class);
209-
BiConsumer<Void, Throwable> listener2 = mock(BiConsumer.class);
209+
BiConsumer<Void, Exception> listener2 = mock(BiConsumer.class);
210210
when(readWriteHandler.writeToBytes(writeOperation)).thenReturn(Arrays.asList(new FlushOperation(buffer, listener),
211211
new FlushOperation(buffer, listener2)));
212212
context.queueWriteOperation(writeOperation);
@@ -232,7 +232,7 @@ public void testWillPollForFlushOpsToClose() throws Exception {
232232

233233

234234
ByteBuffer[] buffer = {ByteBuffer.allocate(10)};
235-
BiConsumer<Void, Throwable> listener2 = mock(BiConsumer.class);
235+
BiConsumer<Void, Exception> listener2 = mock(BiConsumer.class);
236236

237237
assertFalse(context.readyForFlush());
238238
when(channel.isOpen()).thenReturn(true);

libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/SocketSelectorTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ public class SocketSelectorTests extends ESTestCase {
5050
private NioSocketChannel channel;
5151
private TestSelectionKey selectionKey;
5252
private SocketChannelContext channelContext;
53-
private BiConsumer<Void, Throwable> listener;
53+
private BiConsumer<Void, Exception> listener;
5454
private ByteBuffer[] buffers = {ByteBuffer.allocate(1)};
5555
private Selector rawSelector;
5656

modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/NettyTcpChannel.java

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,26 +20,21 @@
2020
package org.elasticsearch.transport.netty4;
2121

2222
import io.netty.channel.Channel;
23-
import io.netty.channel.ChannelFuture;
2423
import io.netty.channel.ChannelOption;
2524
import io.netty.channel.ChannelPromise;
26-
import org.apache.logging.log4j.message.ParameterizedMessage;
27-
import org.apache.logging.log4j.util.Supplier;
28-
import org.elasticsearch.ElasticsearchException;
2925
import org.elasticsearch.action.ActionListener;
3026
import org.elasticsearch.common.bytes.BytesReference;
27+
import org.elasticsearch.common.concurrent.CompletableContext;
3128
import org.elasticsearch.transport.TcpChannel;
3229
import org.elasticsearch.transport.TransportException;
3330

3431
import java.net.InetSocketAddress;
35-
import java.nio.channels.ClosedSelectorException;
36-
import java.util.concurrent.CompletableFuture;
3732

3833
public class NettyTcpChannel implements TcpChannel {
3934

4035
private final Channel channel;
4136
private final String profile;
42-
private final CompletableFuture<Void> closeContext = new CompletableFuture<>();
37+
private final CompletableContext<Void> closeContext = new CompletableContext<>();
4338

4439
NettyTcpChannel(Channel channel, String profile) {
4540
this.channel = channel;
@@ -51,9 +46,9 @@ public class NettyTcpChannel implements TcpChannel {
5146
Throwable cause = f.cause();
5247
if (cause instanceof Error) {
5348
Netty4Utils.maybeDie(cause);
54-
closeContext.completeExceptionally(cause);
49+
closeContext.completeExceptionally(new Exception(cause));
5550
} else {
56-
closeContext.completeExceptionally(cause);
51+
closeContext.completeExceptionally((Exception) cause);
5752
}
5853
}
5954
});
@@ -71,7 +66,7 @@ public String getProfile() {
7166

7267
@Override
7368
public void addCloseListener(ActionListener<Void> listener) {
74-
closeContext.whenComplete(ActionListener.toBiConsumer(listener));
69+
closeContext.addListener(ActionListener.toBiConsumer(listener));
7570
}
7671

7772
@Override

plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/HttpReadWriteHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ public int consumeReads(InboundChannelBuffer channelBuffer) throws IOException {
9696
}
9797

9898
@Override
99-
public WriteOperation createWriteOperation(SocketChannelContext context, Object message, BiConsumer<Void, Throwable> listener) {
99+
public WriteOperation createWriteOperation(SocketChannelContext context, Object message, BiConsumer<Void, Exception> listener) {
100100
assert message instanceof NioHttpResponse : "This channel only supports messages that are of type: "
101101
+ NioHttpResponse.class + ". Found type: " + message.getClass() + ".";
102102
return new HttpWriteOperation(context, (NioHttpResponse) message, listener);

0 commit comments

Comments
 (0)