Skip to content

Commit 6bf742d

Browse files
authored
Fix EsAbortPolicy to conform to API (#29075)
The rejected execution handler API says that rejectedExecution(Runnable, ThreadPoolExecutor) throws a RejectedExecutionException if the task must be rejected due to capacity on the executor. We do throw something that smells like a RejectedExecutionException (it is named EsRejectedExecutionException) yet we violate the API because EsRejectedExecutionException is not a RejectedExecutionException. This has caused problems before where we try to catch RejectedExecution when invoking rejectedExecution but this causes EsRejectedExecutionException to go uncaught. This commit addresses this by modifying EsRejectedExecutionException to extend RejectedExecutionException.
1 parent 9b41917 commit 6bf742d

File tree

9 files changed

+52
-29
lines changed

9 files changed

+52
-29
lines changed

modules/reindex/src/test/java/org/elasticsearch/index/reindex/AsyncBulkByScrollActionTests.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,7 @@
115115
import static org.hamcrest.Matchers.equalTo;
116116
import static org.hamcrest.Matchers.greaterThan;
117117
import static org.hamcrest.Matchers.hasSize;
118+
import static org.hamcrest.Matchers.hasToString;
118119
import static org.hamcrest.Matchers.instanceOf;
119120
import static org.hamcrest.Matchers.lessThanOrEqualTo;
120121

@@ -330,7 +331,8 @@ public ScheduledFuture<?> schedule(TimeValue delay, String name, Runnable comman
330331
ScrollableHitSource.Response response = new ScrollableHitSource.Response(false, emptyList(), 0, emptyList(), null);
331332
simulateScrollResponse(new DummyAsyncBulkByScrollAction(), timeValueNanos(System.nanoTime()), 10, response);
332333
ExecutionException e = expectThrows(ExecutionException.class, () -> listener.get());
333-
assertThat(e.getMessage(), equalTo("EsRejectedExecutionException[test]"));
334+
assertThat(e.getCause(), instanceOf(EsRejectedExecutionException.class));
335+
assertThat(e.getCause(), hasToString(containsString("test")));
334336
assertThat(client.scrollsCleared, contains(scrollId));
335337

336338
// When the task is rejected we don't increment the throttled timer

server/src/main/java/org/elasticsearch/ElasticsearchException.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -827,8 +827,7 @@ private enum ElasticsearchExceptionHandle {
827827
org.elasticsearch.indices.IndexTemplateMissingException::new, 57, UNKNOWN_VERSION_ADDED),
828828
SEND_REQUEST_TRANSPORT_EXCEPTION(org.elasticsearch.transport.SendRequestTransportException.class,
829829
org.elasticsearch.transport.SendRequestTransportException::new, 58, UNKNOWN_VERSION_ADDED),
830-
ES_REJECTED_EXECUTION_EXCEPTION(org.elasticsearch.common.util.concurrent.EsRejectedExecutionException.class,
831-
org.elasticsearch.common.util.concurrent.EsRejectedExecutionException::new, 59, UNKNOWN_VERSION_ADDED),
830+
// 59 used to be EsRejectedExecutionException
832831
// 60 used to be for EarlyTerminationException
833832
// 61 used to be for RoutingValidationException
834833
NOT_SERIALIZABLE_EXCEPTION_WRAPPER(org.elasticsearch.common.io.stream.NotSerializableExceptionWrapper.class,

server/src/main/java/org/elasticsearch/ExceptionsHelper.java

+3
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.elasticsearch.action.ShardOperationFailedException;
2727
import org.elasticsearch.common.Nullable;
2828
import org.elasticsearch.common.logging.Loggers;
29+
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
2930
import org.elasticsearch.index.Index;
3031
import org.elasticsearch.rest.RestStatus;
3132

@@ -67,6 +68,8 @@ public static RestStatus status(Throwable t) {
6768
return ((ElasticsearchException) t).status();
6869
} else if (t instanceof IllegalArgumentException) {
6970
return RestStatus.BAD_REQUEST;
71+
} else if (t instanceof EsRejectedExecutionException) {
72+
return RestStatus.TOO_MANY_REQUESTS;
7073
}
7174
}
7275
return RestStatus.INTERNAL_SERVER_ERROR;

server/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java

+11
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.elasticsearch.common.bytes.BytesReference;
3737
import org.elasticsearch.common.geo.GeoPoint;
3838
import org.elasticsearch.common.text.Text;
39+
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
3940
import org.joda.time.DateTime;
4041
import org.joda.time.DateTimeZone;
4142

@@ -747,6 +748,13 @@ public <T extends Exception> T readException() throws IOException {
747748
switch (key) {
748749
case 0:
749750
final int ord = readVInt();
751+
// TODO: remove the if branch when master is bumped to 8.0.0
752+
assert Version.CURRENT.major < 8;
753+
if (ord == 59) {
754+
final ElasticsearchException ex = new ElasticsearchException(this);
755+
final boolean isExecutorShutdown = readBoolean();
756+
return (T) new EsRejectedExecutionException(ex.getMessage(), isExecutorShutdown);
757+
}
750758
return (T) ElasticsearchException.readException(this, ord);
751759
case 1:
752760
String msg1 = readOptionalString();
@@ -831,6 +839,9 @@ public <T extends Exception> T readException() throws IOException {
831839
return (T) readStackTrace(new InterruptedException(readOptionalString()), this);
832840
case 17:
833841
return (T) readStackTrace(new IOException(readOptionalString(), readException()), this);
842+
case 18:
843+
final boolean isExecutorShutdown = readBoolean();
844+
return (T) readStackTrace(new EsRejectedExecutionException(readOptionalString(), isExecutorShutdown), this);
834845
default:
835846
throw new IOException("no such exception for id: " + key);
836847
}

server/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java

+22-2
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.elasticsearch.common.geo.GeoPoint;
3636
import org.elasticsearch.common.io.stream.Writeable.Writer;
3737
import org.elasticsearch.common.text.Text;
38+
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
3839
import org.joda.time.DateTimeZone;
3940
import org.joda.time.ReadableInstant;
4041

@@ -852,8 +853,28 @@ public void writeException(Throwable throwable) throws IOException {
852853
writeCause = false;
853854
} else if (throwable instanceof IOException) {
854855
writeVInt(17);
856+
} else if (throwable instanceof EsRejectedExecutionException) {
857+
// TODO: remove the if branch when master is bumped to 8.0.0
858+
assert Version.CURRENT.major < 8;
859+
if (version.before(Version.V_7_0_0_alpha1)) {
860+
/*
861+
* This is a backwards compatibility layer when speaking to nodes that still treated EsRejectedExceutionException as an
862+
* instance of ElasticsearchException. As such, we serialize this in a way that the receiving node would read this as an
863+
* EsRejectedExecutionException.
864+
*/
865+
final ElasticsearchException ex = new ElasticsearchException(throwable.getMessage());
866+
writeVInt(0);
867+
writeVInt(59);
868+
ex.writeTo(this);
869+
writeBoolean(((EsRejectedExecutionException) throwable).isExecutorShutdown());
870+
return;
871+
} else {
872+
writeVInt(18);
873+
writeBoolean(((EsRejectedExecutionException) throwable).isExecutorShutdown());
874+
writeCause = false;
875+
}
855876
} else {
856-
ElasticsearchException ex;
877+
final ElasticsearchException ex;
857878
if (throwable instanceof ElasticsearchException && ElasticsearchException.isRegistered(throwable.getClass(), version)) {
858879
ex = (ElasticsearchException) throwable;
859880
} else {
@@ -863,7 +884,6 @@ public void writeException(Throwable throwable) throws IOException {
863884
writeVInt(ElasticsearchException.getId(ex.getClass()));
864885
ex.writeTo(this);
865886
return;
866-
867887
}
868888
if (writeMessage) {
869889
writeOptionalString(throwable.getMessage());

server/src/main/java/org/elasticsearch/common/util/concurrent/EsRejectedExecutionException.java

+2-23
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,9 @@
1919

2020
package org.elasticsearch.common.util.concurrent;
2121

22-
import org.elasticsearch.ElasticsearchException;
23-
import org.elasticsearch.common.io.stream.StreamInput;
24-
import org.elasticsearch.common.io.stream.StreamOutput;
25-
import org.elasticsearch.rest.RestStatus;
22+
import java.util.concurrent.RejectedExecutionException;
2623

27-
import java.io.IOException;
28-
29-
public class EsRejectedExecutionException extends ElasticsearchException {
24+
public class EsRejectedExecutionException extends RejectedExecutionException {
3025

3126
private final boolean isExecutorShutdown;
3227

@@ -43,22 +38,6 @@ public EsRejectedExecutionException() {
4338
this(null, false);
4439
}
4540

46-
@Override
47-
public RestStatus status() {
48-
return RestStatus.TOO_MANY_REQUESTS;
49-
}
50-
51-
public EsRejectedExecutionException(StreamInput in) throws IOException{
52-
super(in);
53-
isExecutorShutdown = in.readBoolean();
54-
}
55-
56-
@Override
57-
public void writeTo(StreamOutput out) throws IOException {
58-
super.writeTo(out);
59-
out.writeBoolean(isExecutorShutdown);
60-
}
61-
6241
/**
6342
* Checks if the thread pool that rejected the execution was terminated
6443
* shortly after the rejection. Its possible that this returns false and the

server/src/test/java/org/elasticsearch/ElasticsearchExceptionTests.java

+2
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.elasticsearch.common.bytes.BytesArray;
3636
import org.elasticsearch.common.bytes.BytesReference;
3737
import org.elasticsearch.common.collect.Tuple;
38+
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
3839
import org.elasticsearch.common.xcontent.ToXContent;
3940
import org.elasticsearch.common.xcontent.XContent;
4041
import org.elasticsearch.common.xcontent.XContentBuilder;
@@ -76,6 +77,7 @@
7677
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertToXContentEquivalent;
7778
import static org.hamcrest.CoreMatchers.hasItem;
7879
import static org.hamcrest.CoreMatchers.hasItems;
80+
import static org.hamcrest.Matchers.arrayWithSize;
7981
import static org.hamcrest.Matchers.equalTo;
8082
import static org.hamcrest.Matchers.hasSize;
8183
import static org.hamcrest.Matchers.startsWith;

server/src/test/java/org/elasticsearch/ExceptionSerializationTests.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -728,7 +728,7 @@ public void testIds() {
728728
ids.put(56, org.elasticsearch.common.settings.SettingsException.class);
729729
ids.put(57, org.elasticsearch.indices.IndexTemplateMissingException.class);
730730
ids.put(58, org.elasticsearch.transport.SendRequestTransportException.class);
731-
ids.put(59, org.elasticsearch.common.util.concurrent.EsRejectedExecutionException.class);
731+
ids.put(59, null); // weas EsRejectedExecutionException, which is no longer an instance of ElasticsearchException
732732
ids.put(60, null); // EarlyTerminationException was removed in 6.0
733733
ids.put(61, null); // RoutingValidationException was removed in 5.0
734734
ids.put(62, org.elasticsearch.common.io.stream.NotSerializableExceptionWrapper.class);

server/src/test/java/org/elasticsearch/ExceptionsHelperTests.java

+7
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
package org.elasticsearch;
2121

2222
import org.apache.commons.codec.DecoderException;
23+
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
24+
import org.elasticsearch.rest.RestStatus;
2325
import org.elasticsearch.test.ESTestCase;
2426

2527
import java.util.Optional;
@@ -84,4 +86,9 @@ private void assertError(final Throwable cause, final Error error) {
8486
assertThat(maybeError.get(), equalTo(error));
8587
}
8688

89+
public void testStatus() {
90+
assertThat(ExceptionsHelper.status(new IllegalArgumentException("illegal")), equalTo(RestStatus.BAD_REQUEST));
91+
assertThat(ExceptionsHelper.status(new EsRejectedExecutionException("rejected")), equalTo(RestStatus.TOO_MANY_REQUESTS));
92+
}
93+
8794
}

0 commit comments

Comments
 (0)