Skip to content

Commit 44467df

Browse files
committed
Merge pull request #15456 from jasontedor/reorganize-shard-state-action
Reorganize o/e/c/a/s/ShardStateAction.java
2 parents 4597a22 + 36bd845 commit 44467df

File tree

1 file changed

+89
-81
lines changed

1 file changed

+89
-81
lines changed

core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java

+89-81
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,11 @@
2020
package org.elasticsearch.cluster.action.shard;
2121

2222
import org.elasticsearch.ExceptionsHelper;
23-
import org.elasticsearch.cluster.*;
23+
import org.elasticsearch.cluster.ClusterService;
24+
import org.elasticsearch.cluster.ClusterState;
25+
import org.elasticsearch.cluster.ClusterStateTaskConfig;
26+
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
27+
import org.elasticsearch.cluster.ClusterStateTaskListener;
2428
import org.elasticsearch.cluster.metadata.IndexMetaData;
2529
import org.elasticsearch.cluster.node.DiscoveryNode;
2630
import org.elasticsearch.cluster.routing.RoutingService;
@@ -37,19 +41,23 @@
3741
import org.elasticsearch.common.settings.Settings;
3842
import org.elasticsearch.common.unit.TimeValue;
3943
import org.elasticsearch.threadpool.ThreadPool;
40-
import org.elasticsearch.transport.*;
44+
import org.elasticsearch.transport.EmptyTransportResponseHandler;
45+
import org.elasticsearch.transport.TransportChannel;
46+
import org.elasticsearch.transport.TransportException;
47+
import org.elasticsearch.transport.TransportRequest;
48+
import org.elasticsearch.transport.TransportRequestHandler;
49+
import org.elasticsearch.transport.TransportRequestOptions;
50+
import org.elasticsearch.transport.TransportResponse;
51+
import org.elasticsearch.transport.TransportService;
4152

4253
import java.io.IOException;
4354
import java.util.ArrayList;
4455
import java.util.List;
4556

4657
import static org.elasticsearch.cluster.routing.ShardRouting.readShardRoutingEntry;
4758

48-
/**
49-
*
50-
*/
51-
public class ShardStateAction extends AbstractComponent {
5259

60+
public class ShardStateAction extends AbstractComponent {
5361
public static final String SHARD_STARTED_ACTION_NAME = "internal:cluster/shard/started";
5462
public static final String SHARD_FAILED_ACTION_NAME = "internal:cluster/shard/failure";
5563

@@ -97,52 +105,26 @@ private void innerShardFailed(final ShardRouting shardRouting, final String inde
97105
options = TransportRequestOptions.builder().withTimeout(timeout).build();
98106
}
99107
transportService.sendRequest(masterNode,
100-
SHARD_FAILED_ACTION_NAME, shardRoutingEntry, options, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
101-
@Override
102-
public void handleResponse(TransportResponse.Empty response) {
103-
listener.onSuccess();
104-
}
105-
106-
@Override
107-
public void handleException(TransportException exp) {
108-
logger.warn("failed to send failed shard to {}", exp, masterNode);
109-
listener.onShardFailedFailure(masterNode, exp);
110-
}
111-
});
112-
}
113-
114-
public void shardStarted(final ShardRouting shardRouting, String indexUUID, final String reason) {
115-
DiscoveryNode masterNode = clusterService.state().nodes().masterNode();
116-
if (masterNode == null) {
117-
logger.warn("{} can't send shard started for {}, no master known.", shardRouting.shardId(), shardRouting);
118-
return;
119-
}
120-
shardStarted(shardRouting, indexUUID, reason, masterNode);
121-
}
122-
123-
public void shardStarted(final ShardRouting shardRouting, String indexUUID, final String reason, final DiscoveryNode masterNode) {
124-
ShardRoutingEntry shardRoutingEntry = new ShardRoutingEntry(shardRouting, indexUUID, reason, null);
125-
logger.debug("{} sending shard started for {}", shardRoutingEntry.shardRouting.shardId(), shardRoutingEntry);
126-
transportService.sendRequest(masterNode,
127-
SHARD_STARTED_ACTION_NAME, new ShardRoutingEntry(shardRouting, indexUUID, reason, null), new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
128-
@Override
129-
public void handleException(TransportException exp) {
130-
logger.warn("failed to send shard started to [{}]", exp, masterNode);
131-
}
108+
SHARD_FAILED_ACTION_NAME, shardRoutingEntry, options, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
109+
@Override
110+
public void handleResponse(TransportResponse.Empty response) {
111+
listener.onSuccess();
112+
}
132113

133-
});
114+
@Override
115+
public void handleException(TransportException exp) {
116+
logger.warn("failed to send failed shard to {}", exp, masterNode);
117+
listener.onShardFailedFailure(masterNode, exp);
118+
}
119+
});
134120
}
135121

136-
private final ShardFailedClusterStateHandler shardFailedClusterStateHandler = new ShardFailedClusterStateHandler();
137-
138-
private void handleShardFailureOnMaster(final ShardRoutingEntry shardRoutingEntry) {
139-
logger.warn("{} received shard failed for {}", shardRoutingEntry.failure, shardRoutingEntry.shardRouting.shardId(), shardRoutingEntry);
140-
clusterService.submitStateUpdateTask(
141-
"shard-failed (" + shardRoutingEntry.shardRouting + "), message [" + shardRoutingEntry.message + "]",
142-
shardRoutingEntry,
143-
ClusterStateTaskConfig.build(Priority.HIGH),
144-
shardFailedClusterStateHandler,
145-
shardFailedClusterStateHandler);
122+
private class ShardFailedTransportHandler implements TransportRequestHandler<ShardRoutingEntry> {
123+
@Override
124+
public void messageReceived(ShardRoutingEntry request, TransportChannel channel) throws Exception {
125+
handleShardFailureOnMaster(request);
126+
channel.sendResponse(TransportResponse.Empty.INSTANCE);
127+
}
146128
}
147129

148130
class ShardFailedClusterStateHandler implements ClusterStateTaskExecutor<ShardRoutingEntry>, ClusterStateTaskListener {
@@ -168,10 +150,10 @@ public BatchResult<ShardRoutingEntry> execute(ClusterState currentState, List<Sh
168150

169151
@Override
170152
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
171-
if (oldState != newState && newState.getRoutingNodes().unassigned().size() > 0) {
172-
logger.trace("unassigned shards after shard failures. scheduling a reroute.");
173-
routingService.reroute("unassigned shards after shard failures, scheduling a reroute");
174-
}
153+
if (oldState != newState && newState.getRoutingNodes().unassigned().size() > 0) {
154+
logger.trace("unassigned shards after shard failures. scheduling a reroute.");
155+
routingService.reroute("unassigned shards after shard failures, scheduling a reroute");
156+
}
175157
}
176158

177159
@Override
@@ -180,18 +162,45 @@ public void onFailure(String source, Throwable t) {
180162
}
181163
}
182164

183-
private final ShardStartedClusterStateHandler shardStartedClusterStateHandler =
184-
new ShardStartedClusterStateHandler();
185-
186-
private void shardStartedOnMaster(final ShardRoutingEntry shardRoutingEntry) {
187-
logger.debug("received shard started for {}", shardRoutingEntry);
165+
private final ShardFailedClusterStateHandler shardFailedClusterStateHandler = new ShardFailedClusterStateHandler();
188166

167+
private void handleShardFailureOnMaster(final ShardRoutingEntry shardRoutingEntry) {
168+
logger.warn("{} received shard failed for {}", shardRoutingEntry.failure, shardRoutingEntry.shardRouting.shardId(), shardRoutingEntry);
189169
clusterService.submitStateUpdateTask(
190-
"shard-started (" + shardRoutingEntry.shardRouting + "), reason [" + shardRoutingEntry.message + "]",
191-
shardRoutingEntry,
192-
ClusterStateTaskConfig.build(Priority.URGENT),
193-
shardStartedClusterStateHandler,
194-
shardStartedClusterStateHandler);
170+
"shard-failed (" + shardRoutingEntry.shardRouting + "), message [" + shardRoutingEntry.message + "]",
171+
shardRoutingEntry,
172+
ClusterStateTaskConfig.build(Priority.HIGH),
173+
shardFailedClusterStateHandler,
174+
shardFailedClusterStateHandler);
175+
}
176+
177+
public void shardStarted(final ShardRouting shardRouting, String indexUUID, final String reason) {
178+
DiscoveryNode masterNode = clusterService.state().nodes().masterNode();
179+
if (masterNode == null) {
180+
logger.warn("{} can't send shard started for {}, no master known.", shardRouting.shardId(), shardRouting);
181+
return;
182+
}
183+
shardStarted(shardRouting, indexUUID, reason, masterNode);
184+
}
185+
186+
public void shardStarted(final ShardRouting shardRouting, String indexUUID, final String reason, final DiscoveryNode masterNode) {
187+
ShardRoutingEntry shardRoutingEntry = new ShardRoutingEntry(shardRouting, indexUUID, reason, null);
188+
logger.debug("{} sending shard started for {}", shardRoutingEntry.shardRouting.shardId(), shardRoutingEntry);
189+
transportService.sendRequest(masterNode,
190+
SHARD_STARTED_ACTION_NAME, new ShardRoutingEntry(shardRouting, indexUUID, reason, null), new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
191+
@Override
192+
public void handleException(TransportException exp) {
193+
logger.warn("failed to send shard started to [{}]", exp, masterNode);
194+
}
195+
});
196+
}
197+
198+
class ShardStartedTransportHandler implements TransportRequestHandler<ShardRoutingEntry> {
199+
@Override
200+
public void messageReceived(ShardRoutingEntry request, TransportChannel channel) throws Exception {
201+
handleShardStartedOnMaster(request);
202+
channel.sendResponse(TransportResponse.Empty.INSTANCE);
203+
}
195204
}
196205

197206
class ShardStartedClusterStateHandler implements ClusterStateTaskExecutor<ShardRoutingEntry>, ClusterStateTaskListener {
@@ -223,26 +232,20 @@ public void onFailure(String source, Throwable t) {
223232
}
224233
}
225234

226-
private class ShardFailedTransportHandler implements TransportRequestHandler<ShardRoutingEntry> {
235+
private final ShardStartedClusterStateHandler shardStartedClusterStateHandler = new ShardStartedClusterStateHandler();
227236

228-
@Override
229-
public void messageReceived(ShardRoutingEntry request, TransportChannel channel) throws Exception {
230-
handleShardFailureOnMaster(request);
231-
channel.sendResponse(TransportResponse.Empty.INSTANCE);
232-
}
233-
}
234-
235-
class ShardStartedTransportHandler implements TransportRequestHandler<ShardRoutingEntry> {
237+
private void handleShardStartedOnMaster(final ShardRoutingEntry shardRoutingEntry) {
238+
logger.debug("received shard started for {}", shardRoutingEntry);
236239

237-
@Override
238-
public void messageReceived(ShardRoutingEntry request, TransportChannel channel) throws Exception {
239-
shardStartedOnMaster(request);
240-
channel.sendResponse(TransportResponse.Empty.INSTANCE);
241-
}
240+
clusterService.submitStateUpdateTask(
241+
"shard-started (" + shardRoutingEntry.shardRouting + "), reason [" + shardRoutingEntry.message + "]",
242+
shardRoutingEntry,
243+
ClusterStateTaskConfig.build(Priority.URGENT),
244+
shardStartedClusterStateHandler,
245+
shardStartedClusterStateHandler);
242246
}
243247

244248
public static class ShardRoutingEntry extends TransportRequest {
245-
246249
ShardRouting shardRouting;
247250
String indexUUID = IndexMetaData.INDEX_UUID_NA_VALUE;
248251
String message;
@@ -283,8 +286,13 @@ public String toString() {
283286
}
284287

285288
public interface Listener {
286-
default void onSuccess() {}
287-
default void onShardFailedNoMaster() {}
288-
default void onShardFailedFailure(final DiscoveryNode master, final TransportException e) {}
289+
default void onSuccess() {
290+
}
291+
292+
default void onShardFailedNoMaster() {
293+
}
294+
295+
default void onShardFailedFailure(final DiscoveryNode master, final TransportException e) {
296+
}
289297
}
290298
}

0 commit comments

Comments
 (0)