Skip to content

Commit d4538df

Browse files
authored
Improve exception handling on TransportMasterNodeAction (#29314)
We have seen exceptions bubble up to the uncaught exception handler. Checking the blocks can lead for example to IndexNotFoundException when the indices are resolved. In order to make TransportMasterNodeAction more resilient against such expected exceptions, this code change wraps the execution of doStart() into a try catch and informs the listener in case of failures.
1 parent 2dc546c commit d4538df

File tree

2 files changed

+98
-55
lines changed

2 files changed

+98
-55
lines changed

server/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java

+65-55
Original file line numberDiff line numberDiff line change
@@ -145,69 +145,79 @@ public void start() {
145145
}
146146

147147
protected void doStart(ClusterState clusterState) {
148-
final Predicate<ClusterState> masterChangePredicate = MasterNodeChangePredicate.build(clusterState);
149-
final DiscoveryNodes nodes = clusterState.nodes();
150-
if (nodes.isLocalNodeElectedMaster() || localExecute(request)) {
151-
// check for block, if blocked, retry, else, execute locally
152-
final ClusterBlockException blockException = checkBlock(request, clusterState);
153-
if (blockException != null) {
154-
if (!blockException.retryable()) {
155-
listener.onFailure(blockException);
156-
} else {
157-
logger.trace("can't execute due to a cluster block, retrying", blockException);
158-
retry(blockException, newState -> {
159-
ClusterBlockException newException = checkBlock(request, newState);
160-
return (newException == null || !newException.retryable());
161-
});
162-
}
163-
} else {
164-
ActionListener<Response> delegate = new ActionListener<Response>() {
165-
@Override
166-
public void onResponse(Response response) {
167-
listener.onResponse(response);
148+
try {
149+
final Predicate<ClusterState> masterChangePredicate = MasterNodeChangePredicate.build(clusterState);
150+
final DiscoveryNodes nodes = clusterState.nodes();
151+
if (nodes.isLocalNodeElectedMaster() || localExecute(request)) {
152+
// check for block, if blocked, retry, else, execute locally
153+
final ClusterBlockException blockException = checkBlock(request, clusterState);
154+
if (blockException != null) {
155+
if (!blockException.retryable()) {
156+
listener.onFailure(blockException);
157+
} else {
158+
logger.trace("can't execute due to a cluster block, retrying", blockException);
159+
retry(blockException, newState -> {
160+
try {
161+
ClusterBlockException newException = checkBlock(request, newState);
162+
return (newException == null || !newException.retryable());
163+
} catch (Exception e) {
164+
// accept state as block will be rechecked by doStart() and listener.onFailure() then called
165+
logger.trace("exception occurred during cluster block checking, accepting state", e);
166+
return true;
167+
}
168+
});
168169
}
170+
} else {
171+
ActionListener<Response> delegate = new ActionListener<Response>() {
172+
@Override
173+
public void onResponse(Response response) {
174+
listener.onResponse(response);
175+
}
169176

170-
@Override
171-
public void onFailure(Exception t) {
172-
if (t instanceof Discovery.FailedToCommitClusterStateException
177+
@Override
178+
public void onFailure(Exception t) {
179+
if (t instanceof Discovery.FailedToCommitClusterStateException
173180
|| (t instanceof NotMasterException)) {
174-
logger.debug(() -> new ParameterizedMessage("master could not publish cluster state or stepped down before publishing action [{}], scheduling a retry", actionName), t);
175-
retry(t, masterChangePredicate);
176-
} else {
177-
listener.onFailure(t);
181+
logger.debug(() -> new ParameterizedMessage("master could not publish cluster state or stepped down before publishing action [{}], scheduling a retry", actionName), t);
182+
retry(t, masterChangePredicate);
183+
} else {
184+
listener.onFailure(t);
185+
}
178186
}
179-
}
180-
};
181-
threadPool.executor(executor).execute(new ActionRunnable(delegate) {
182-
@Override
183-
protected void doRun() throws Exception {
184-
masterOperation(task, request, clusterState, delegate);
185-
}
186-
});
187-
}
188-
} else {
189-
if (nodes.getMasterNode() == null) {
190-
logger.debug("no known master node, scheduling a retry");
191-
retry(null, masterChangePredicate);
187+
};
188+
threadPool.executor(executor).execute(new ActionRunnable(delegate) {
189+
@Override
190+
protected void doRun() throws Exception {
191+
masterOperation(task, request, clusterState, delegate);
192+
}
193+
});
194+
}
192195
} else {
193-
DiscoveryNode masterNode = nodes.getMasterNode();
194-
final String actionName = getMasterActionName(masterNode);
195-
transportService.sendRequest(masterNode, actionName, request, new ActionListenerResponseHandler<Response>(listener,
196-
TransportMasterNodeAction.this::newResponse) {
197-
@Override
198-
public void handleException(final TransportException exp) {
199-
Throwable cause = exp.unwrapCause();
200-
if (cause instanceof ConnectTransportException) {
201-
// we want to retry here a bit to see if a new master is elected
202-
logger.debug("connection exception while trying to forward request with action name [{}] to master node [{}], scheduling a retry. Error: [{}]",
196+
if (nodes.getMasterNode() == null) {
197+
logger.debug("no known master node, scheduling a retry");
198+
retry(null, masterChangePredicate);
199+
} else {
200+
DiscoveryNode masterNode = nodes.getMasterNode();
201+
final String actionName = getMasterActionName(masterNode);
202+
transportService.sendRequest(masterNode, actionName, request, new ActionListenerResponseHandler<Response>(listener,
203+
TransportMasterNodeAction.this::newResponse) {
204+
@Override
205+
public void handleException(final TransportException exp) {
206+
Throwable cause = exp.unwrapCause();
207+
if (cause instanceof ConnectTransportException) {
208+
// we want to retry here a bit to see if a new master is elected
209+
logger.debug("connection exception while trying to forward request with action name [{}] to master node [{}], scheduling a retry. Error: [{}]",
203210
actionName, nodes.getMasterNode(), exp.getDetailedMessage());
204-
retry(cause, masterChangePredicate);
205-
} else {
206-
listener.onFailure(exp);
211+
retry(cause, masterChangePredicate);
212+
} else {
213+
listener.onFailure(exp);
214+
}
207215
}
208-
}
209-
});
216+
});
217+
}
210218
}
219+
} catch (Exception e) {
220+
listener.onFailure(e);
211221
}
212222
}
213223

server/src/test/java/org/elasticsearch/action/support/master/TransportMasterNodeActionTests.java

+33
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,39 @@ protected ClusterBlockException checkBlock(Request request, ClusterState state)
242242
}
243243
}
244244

245+
public void testCheckBlockThrowsException() throws InterruptedException {
246+
boolean throwExceptionOnRetry = randomBoolean();
247+
Request request = new Request().masterNodeTimeout(TimeValue.timeValueSeconds(60));
248+
PlainActionFuture<Response> listener = new PlainActionFuture<>();
249+
250+
ClusterBlock block = new ClusterBlock(1, "", true, true,
251+
false, randomFrom(RestStatus.values()), ClusterBlockLevel.ALL);
252+
ClusterState stateWithBlock = ClusterState.builder(ClusterStateCreationUtils.state(localNode, localNode, allNodes))
253+
.blocks(ClusterBlocks.builder().addGlobalBlock(block)).build();
254+
setState(clusterService, stateWithBlock);
255+
256+
new Action(Settings.EMPTY, "testAction", transportService, clusterService, threadPool) {
257+
@Override
258+
protected ClusterBlockException checkBlock(Request request, ClusterState state) {
259+
Set<ClusterBlock> blocks = state.blocks().global();
260+
if (throwExceptionOnRetry == false || blocks.isEmpty()) {
261+
throw new RuntimeException("checkBlock has thrown exception");
262+
}
263+
return new ClusterBlockException(blocks);
264+
265+
}
266+
}.execute(request, listener);
267+
268+
if (throwExceptionOnRetry == false) {
269+
assertListenerThrows("checkBlock has thrown exception", listener, RuntimeException.class);
270+
} else {
271+
assertFalse(listener.isDone());
272+
setState(clusterService, ClusterState.builder(ClusterStateCreationUtils.state(localNode, localNode, allNodes))
273+
.blocks(ClusterBlocks.EMPTY_CLUSTER_BLOCK).build());
274+
assertListenerThrows("checkBlock has thrown exception", listener, RuntimeException.class);
275+
}
276+
}
277+
245278
public void testForceLocalOperation() throws ExecutionException, InterruptedException {
246279
Request request = new Request();
247280
PlainActionFuture<Response> listener = new PlainActionFuture<>();

0 commit comments

Comments
 (0)