Skip to content

Commit 89a09b9

Browse files
committed
Master should wait on cluster state publication when failing a shard
When a client sends a request to fail a shard to the master, the current behavior is that the master will submit the cluster state update task and then immediately send a successful response back to the client; additionally, if there are any failures while processing the cluster state update task to fail the shard, then the client will never be notified of these failures. This commit modifies the master behavior when handling requests to fail a shard. In particular, the master will now wait until successful publication of the cluster state update before notifying the request client that the shard is marked as failed; additionally, the client is now notified of any failures during the execution of the cluster state update task. Relates #14252
1 parent 44467df commit 89a09b9

File tree

1 file changed

+51
-22
lines changed

1 file changed

+51
-22
lines changed

core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java

+51-22
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.elasticsearch.cluster.ClusterStateTaskConfig;
2626
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
2727
import org.elasticsearch.cluster.ClusterStateTaskListener;
28+
import org.elasticsearch.cluster.NotMasterException;
2829
import org.elasticsearch.cluster.metadata.IndexMetaData;
2930
import org.elasticsearch.cluster.node.DiscoveryNode;
3031
import org.elasticsearch.cluster.routing.RoutingService;
@@ -53,6 +54,7 @@
5354
import java.io.IOException;
5455
import java.util.ArrayList;
5556
import java.util.List;
57+
import java.util.Locale;
5658

5759
import static org.elasticsearch.cluster.routing.ShardRouting.readShardRoutingEntry;
5860

@@ -113,7 +115,7 @@ public void handleResponse(TransportResponse.Empty response) {
113115

114116
@Override
115117
public void handleException(TransportException exp) {
116-
logger.warn("failed to send failed shard to {}", exp, masterNode);
118+
logger.warn("unexpected failure while sending request to [{}] to fail shard [{}]", exp, masterNode, shardRoutingEntry);
117119
listener.onShardFailedFailure(masterNode, exp);
118120
}
119121
});
@@ -122,22 +124,62 @@ public void handleException(TransportException exp) {
122124
private class ShardFailedTransportHandler implements TransportRequestHandler<ShardRoutingEntry> {
123125
@Override
124126
public void messageReceived(ShardRoutingEntry request, TransportChannel channel) throws Exception {
125-
handleShardFailureOnMaster(request);
126-
channel.sendResponse(TransportResponse.Empty.INSTANCE);
127+
handleShardFailureOnMaster(request, new ClusterStateTaskListener() {
128+
@Override
129+
public void onFailure(String source, Throwable t) {
130+
logger.error("unexpected failure while failing shard [{}]", t, request.shardRouting);
131+
try {
132+
channel.sendResponse(t);
133+
} catch (Throwable channelThrowable) {
134+
logger.warn("failed to send failure [{}] while failing shard [{}]", channelThrowable, t, request.shardRouting);
135+
}
136+
}
137+
138+
@Override
139+
public void onNoLongerMaster(String source) {
140+
logger.error("no longer master while failing shard [{}]", request.shardRouting);
141+
try {
142+
channel.sendResponse(new NotMasterException(source));
143+
} catch (Throwable channelThrowable) {
144+
logger.warn("failed to send no longer master while failing shard [{}]", channelThrowable, request.shardRouting);
145+
}
146+
}
147+
148+
@Override
149+
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
150+
try {
151+
int numberOfUnassignedShards = newState.getRoutingNodes().unassigned().size();
152+
if (oldState != newState && numberOfUnassignedShards > 0) {
153+
String reason = String.format(Locale.ROOT, "[%d] unassigned shards after failing shard [%s]", numberOfUnassignedShards, request.shardRouting);
154+
if (logger.isTraceEnabled()) {
155+
logger.trace(reason + ", scheduling a reroute");
156+
}
157+
routingService.reroute(reason);
158+
}
159+
} finally {
160+
try {
161+
channel.sendResponse(TransportResponse.Empty.INSTANCE);
162+
} catch (Throwable channelThrowable) {
163+
logger.warn("failed to send response while failing shard [{}]", channelThrowable, request.shardRouting);
164+
}
165+
}
166+
}
167+
}
168+
);
127169
}
128170
}
129171

130-
class ShardFailedClusterStateHandler implements ClusterStateTaskExecutor<ShardRoutingEntry>, ClusterStateTaskListener {
172+
class ShardFailedClusterStateHandler implements ClusterStateTaskExecutor<ShardRoutingEntry> {
131173
@Override
132174
public BatchResult<ShardRoutingEntry> execute(ClusterState currentState, List<ShardRoutingEntry> tasks) throws Exception {
133175
BatchResult.Builder<ShardRoutingEntry> batchResultBuilder = BatchResult.builder();
134-
List<FailedRerouteAllocation.FailedShard> shardRoutingsToBeApplied = new ArrayList<>(tasks.size());
176+
List<FailedRerouteAllocation.FailedShard> failedShards = new ArrayList<>(tasks.size());
135177
for (ShardRoutingEntry task : tasks) {
136-
shardRoutingsToBeApplied.add(new FailedRerouteAllocation.FailedShard(task.shardRouting, task.message, task.failure));
178+
failedShards.add(new FailedRerouteAllocation.FailedShard(task.shardRouting, task.message, task.failure));
137179
}
138180
ClusterState maybeUpdatedState = currentState;
139181
try {
140-
RoutingAllocation.Result result = allocationService.applyFailedShards(currentState, shardRoutingsToBeApplied);
182+
RoutingAllocation.Result result = allocationService.applyFailedShards(currentState, failedShards);
141183
if (result.changed()) {
142184
maybeUpdatedState = ClusterState.builder(currentState).routingResult(result).build();
143185
}
@@ -147,31 +189,18 @@ public BatchResult<ShardRoutingEntry> execute(ClusterState currentState, List<Sh
147189
}
148190
return batchResultBuilder.build(maybeUpdatedState);
149191
}
150-
151-
@Override
152-
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
153-
if (oldState != newState && newState.getRoutingNodes().unassigned().size() > 0) {
154-
logger.trace("unassigned shards after shard failures. scheduling a reroute.");
155-
routingService.reroute("unassigned shards after shard failures, scheduling a reroute");
156-
}
157-
}
158-
159-
@Override
160-
public void onFailure(String source, Throwable t) {
161-
logger.error("unexpected failure during [{}]", t, source);
162-
}
163192
}
164193

165194
private final ShardFailedClusterStateHandler shardFailedClusterStateHandler = new ShardFailedClusterStateHandler();
166195

167-
private void handleShardFailureOnMaster(final ShardRoutingEntry shardRoutingEntry) {
196+
private void handleShardFailureOnMaster(final ShardRoutingEntry shardRoutingEntry, ClusterStateTaskListener listener) {
168197
logger.warn("{} received shard failed for {}", shardRoutingEntry.failure, shardRoutingEntry.shardRouting.shardId(), shardRoutingEntry);
169198
clusterService.submitStateUpdateTask(
170199
"shard-failed (" + shardRoutingEntry.shardRouting + "), message [" + shardRoutingEntry.message + "]",
171200
shardRoutingEntry,
172201
ClusterStateTaskConfig.build(Priority.HIGH),
173202
shardFailedClusterStateHandler,
174-
shardFailedClusterStateHandler);
203+
listener);
175204
}
176205

177206
public void shardStarted(final ShardRouting shardRouting, String indexUUID, final String reason) {

0 commit comments

Comments
 (0)