Skip to content

Commit d72757f

Browse files
committed
Fix EsAbortPolicy to conform to API (#29075)
The rejected execution handler API says that rejectedExecution(Runnable, ThreadPoolExecutor) throws a RejectedExecutionException if the task must be rejected due to capacity on the executor. We do throw something that smells like a RejectedExecutionException (it is named EsRejectedExecutionException) yet we violate the API because EsRejectedExecutionException is not a RejectedExecutionException. This has caused problems before where we try to catch RejectedExecution when invoking rejectedExecution but this causes EsRejectedExecutionException to go uncaught. This commit addresses this by modifying EsRejectedExecutionException to extend RejectedExecutionException.
1 parent 2f2f7ed commit d72757f

File tree

9 files changed

+48
-29
lines changed

9 files changed

+48
-29
lines changed

modules/reindex/src/test/java/org/elasticsearch/index/reindex/AsyncBulkByScrollActionTests.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,7 @@
115115
import static org.hamcrest.Matchers.equalTo;
116116
import static org.hamcrest.Matchers.greaterThan;
117117
import static org.hamcrest.Matchers.hasSize;
118+
import static org.hamcrest.Matchers.hasToString;
118119
import static org.hamcrest.Matchers.instanceOf;
119120
import static org.hamcrest.Matchers.lessThanOrEqualTo;
120121

@@ -330,7 +331,8 @@ public ScheduledFuture<?> schedule(TimeValue delay, String name, Runnable comman
330331
ScrollableHitSource.Response response = new ScrollableHitSource.Response(false, emptyList(), 0, emptyList(), null);
331332
simulateScrollResponse(new DummyAsyncBulkByScrollAction(), timeValueNanos(System.nanoTime()), 10, response);
332333
ExecutionException e = expectThrows(ExecutionException.class, () -> listener.get());
333-
assertThat(e.getMessage(), equalTo("EsRejectedExecutionException[test]"));
334+
assertThat(e.getCause(), instanceOf(EsRejectedExecutionException.class));
335+
assertThat(e.getCause(), hasToString(containsString("test")));
334336
assertThat(client.scrollsCleared, contains(scrollId));
335337

336338
// When the task is rejected we don't increment the throttled timer

server/src/main/java/org/elasticsearch/ElasticsearchException.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -828,8 +828,7 @@ private enum ElasticsearchExceptionHandle {
828828
org.elasticsearch.indices.IndexTemplateMissingException::new, 57, UNKNOWN_VERSION_ADDED),
829829
SEND_REQUEST_TRANSPORT_EXCEPTION(org.elasticsearch.transport.SendRequestTransportException.class,
830830
org.elasticsearch.transport.SendRequestTransportException::new, 58, UNKNOWN_VERSION_ADDED),
831-
ES_REJECTED_EXECUTION_EXCEPTION(org.elasticsearch.common.util.concurrent.EsRejectedExecutionException.class,
832-
org.elasticsearch.common.util.concurrent.EsRejectedExecutionException::new, 59, UNKNOWN_VERSION_ADDED),
831+
// 59 used to be EsRejectedExecutionException
833832
// 60 used to be for EarlyTerminationException
834833
// 61 used to be for RoutingValidationException
835834
NOT_SERIALIZABLE_EXCEPTION_WRAPPER(org.elasticsearch.common.io.stream.NotSerializableExceptionWrapper.class,

server/src/main/java/org/elasticsearch/ExceptionsHelper.java

+3
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.elasticsearch.action.ShardOperationFailedException;
2727
import org.elasticsearch.common.Nullable;
2828
import org.elasticsearch.common.logging.Loggers;
29+
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
2930
import org.elasticsearch.index.Index;
3031
import org.elasticsearch.rest.RestStatus;
3132

@@ -67,6 +68,8 @@ public static RestStatus status(Throwable t) {
6768
return ((ElasticsearchException) t).status();
6869
} else if (t instanceof IllegalArgumentException) {
6970
return RestStatus.BAD_REQUEST;
71+
} else if (t instanceof EsRejectedExecutionException) {
72+
return RestStatus.TOO_MANY_REQUESTS;
7073
}
7174
}
7275
return RestStatus.INTERNAL_SERVER_ERROR;

server/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java

+9
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.elasticsearch.common.bytes.BytesReference;
3737
import org.elasticsearch.common.geo.GeoPoint;
3838
import org.elasticsearch.common.text.Text;
39+
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
3940
import org.joda.time.DateTime;
4041
import org.joda.time.DateTimeZone;
4142

@@ -747,6 +748,11 @@ public <T extends Exception> T readException() throws IOException {
747748
switch (key) {
748749
case 0:
749750
final int ord = readVInt();
751+
if (ord == 59) {
752+
final ElasticsearchException ex = new ElasticsearchException(this);
753+
final boolean isExecutorShutdown = readBoolean();
754+
return (T) new EsRejectedExecutionException(ex.getMessage(), isExecutorShutdown);
755+
}
750756
return (T) ElasticsearchException.readException(this, ord);
751757
case 1:
752758
String msg1 = readOptionalString();
@@ -831,6 +837,9 @@ public <T extends Exception> T readException() throws IOException {
831837
return (T) readStackTrace(new InterruptedException(readOptionalString()), this);
832838
case 17:
833839
return (T) readStackTrace(new IOException(readOptionalString(), readException()), this);
840+
case 18:
841+
final boolean isExecutorShutdown = readBoolean();
842+
return (T) readStackTrace(new EsRejectedExecutionException(readOptionalString(), isExecutorShutdown), this);
834843
default:
835844
throw new IOException("no such exception for id: " + key);
836845
}

server/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java

+20-2
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.elasticsearch.common.geo.GeoPoint;
3636
import org.elasticsearch.common.io.stream.Writeable.Writer;
3737
import org.elasticsearch.common.text.Text;
38+
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
3839
import org.joda.time.DateTimeZone;
3940
import org.joda.time.ReadableInstant;
4041

@@ -852,8 +853,26 @@ public void writeException(Throwable throwable) throws IOException {
852853
writeCause = false;
853854
} else if (throwable instanceof IOException) {
854855
writeVInt(17);
856+
} else if (throwable instanceof EsRejectedExecutionException) {
857+
if (version.before(Version.V_6_3_0)) {
858+
/*
859+
* This is a backwards compatibility layer when speaking to nodes that still treated EsRejectedExceutionException as an
860+
* instance of ElasticsearchException. As such, we serialize this in a way that the receiving node would read this as an
861+
* EsRejectedExecutionException.
862+
*/
863+
final ElasticsearchException ex = new ElasticsearchException(throwable.getMessage());
864+
writeVInt(0);
865+
writeVInt(59);
866+
ex.writeTo(this);
867+
writeBoolean(((EsRejectedExecutionException) throwable).isExecutorShutdown());
868+
return;
869+
} else {
870+
writeVInt(18);
871+
writeBoolean(((EsRejectedExecutionException) throwable).isExecutorShutdown());
872+
writeCause = false;
873+
}
855874
} else {
856-
ElasticsearchException ex;
875+
final ElasticsearchException ex;
857876
if (throwable instanceof ElasticsearchException && ElasticsearchException.isRegistered(throwable.getClass(), version)) {
858877
ex = (ElasticsearchException) throwable;
859878
} else {
@@ -863,7 +882,6 @@ public void writeException(Throwable throwable) throws IOException {
863882
writeVInt(ElasticsearchException.getId(ex.getClass()));
864883
ex.writeTo(this);
865884
return;
866-
867885
}
868886
if (writeMessage) {
869887
writeOptionalString(throwable.getMessage());

server/src/main/java/org/elasticsearch/common/util/concurrent/EsRejectedExecutionException.java

+2-23
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,9 @@
1919

2020
package org.elasticsearch.common.util.concurrent;
2121

22-
import org.elasticsearch.ElasticsearchException;
23-
import org.elasticsearch.common.io.stream.StreamInput;
24-
import org.elasticsearch.common.io.stream.StreamOutput;
25-
import org.elasticsearch.rest.RestStatus;
22+
import java.util.concurrent.RejectedExecutionException;
2623

27-
import java.io.IOException;
28-
29-
public class EsRejectedExecutionException extends ElasticsearchException {
24+
public class EsRejectedExecutionException extends RejectedExecutionException {
3025

3126
private final boolean isExecutorShutdown;
3227

@@ -43,22 +38,6 @@ public EsRejectedExecutionException() {
4338
this(null, false);
4439
}
4540

46-
@Override
47-
public RestStatus status() {
48-
return RestStatus.TOO_MANY_REQUESTS;
49-
}
50-
51-
public EsRejectedExecutionException(StreamInput in) throws IOException{
52-
super(in);
53-
isExecutorShutdown = in.readBoolean();
54-
}
55-
56-
@Override
57-
public void writeTo(StreamOutput out) throws IOException {
58-
super.writeTo(out);
59-
out.writeBoolean(isExecutorShutdown);
60-
}
61-
6241
/**
6342
* Checks if the thread pool that rejected the execution was terminated
6443
* shortly after the rejection. Its possible that this returns false and the

server/src/test/java/org/elasticsearch/ElasticsearchExceptionTests.java

+2
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.elasticsearch.common.bytes.BytesArray;
3636
import org.elasticsearch.common.bytes.BytesReference;
3737
import org.elasticsearch.common.collect.Tuple;
38+
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
3839
import org.elasticsearch.common.xcontent.ToXContent;
3940
import org.elasticsearch.common.xcontent.XContent;
4041
import org.elasticsearch.common.xcontent.XContentBuilder;
@@ -76,6 +77,7 @@
7677
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertToXContentEquivalent;
7778
import static org.hamcrest.CoreMatchers.hasItem;
7879
import static org.hamcrest.CoreMatchers.hasItems;
80+
import static org.hamcrest.Matchers.arrayWithSize;
7981
import static org.hamcrest.Matchers.equalTo;
8082
import static org.hamcrest.Matchers.hasSize;
8183
import static org.hamcrest.Matchers.startsWith;

server/src/test/java/org/elasticsearch/ExceptionSerializationTests.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -728,7 +728,7 @@ public void testIds() {
728728
ids.put(56, org.elasticsearch.common.settings.SettingsException.class);
729729
ids.put(57, org.elasticsearch.indices.IndexTemplateMissingException.class);
730730
ids.put(58, org.elasticsearch.transport.SendRequestTransportException.class);
731-
ids.put(59, org.elasticsearch.common.util.concurrent.EsRejectedExecutionException.class);
731+
ids.put(59, null); // weas EsRejectedExecutionException, which is no longer an instance of ElasticsearchException
732732
ids.put(60, null); // EarlyTerminationException was removed in 6.0
733733
ids.put(61, null); // RoutingValidationException was removed in 5.0
734734
ids.put(62, org.elasticsearch.common.io.stream.NotSerializableExceptionWrapper.class);

server/src/test/java/org/elasticsearch/ExceptionsHelperTests.java

+7
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
package org.elasticsearch;
2121

2222
import org.apache.commons.codec.DecoderException;
23+
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
24+
import org.elasticsearch.rest.RestStatus;
2325
import org.elasticsearch.test.ESTestCase;
2426

2527
import java.util.Optional;
@@ -84,4 +86,9 @@ private void assertError(final Throwable cause, final Error error) {
8486
assertThat(maybeError.get(), equalTo(error));
8587
}
8688

89+
public void testStatus() {
90+
assertThat(ExceptionsHelper.status(new IllegalArgumentException("illegal")), equalTo(RestStatus.BAD_REQUEST));
91+
assertThat(ExceptionsHelper.status(new EsRejectedExecutionException("rejected")), equalTo(RestStatus.TOO_MANY_REQUESTS));
92+
}
93+
8794
}

0 commit comments

Comments
 (0)