Skip to content

Reorganize o/e/c/a/s/ShardStateAction.java #15456

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Dec 16, 2015
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,11 @@
package org.elasticsearch.cluster.action.shard;

import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.cluster.*;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateTaskConfig;
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
import org.elasticsearch.cluster.ClusterStateTaskListener;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RoutingService;
Expand All @@ -37,19 +41,23 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
import org.elasticsearch.transport.EmptyTransportResponseHandler;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestHandler;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import static org.elasticsearch.cluster.routing.ShardRouting.readShardRoutingEntry;

/**
*
*/
public class ShardStateAction extends AbstractComponent {

public class ShardStateAction extends AbstractComponent {
public static final String SHARD_STARTED_ACTION_NAME = "internal:cluster/shard/started";
public static final String SHARD_FAILED_ACTION_NAME = "internal:cluster/shard/failure";

Expand Down Expand Up @@ -97,52 +105,26 @@ private void innerShardFailed(final ShardRouting shardRouting, final String inde
options = TransportRequestOptions.builder().withTimeout(timeout).build();
}
transportService.sendRequest(masterNode,
SHARD_FAILED_ACTION_NAME, shardRoutingEntry, options, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
@Override
public void handleResponse(TransportResponse.Empty response) {
listener.onSuccess();
}

@Override
public void handleException(TransportException exp) {
logger.warn("failed to send failed shard to {}", exp, masterNode);
listener.onShardFailedFailure(masterNode, exp);
}
});
}

public void shardStarted(final ShardRouting shardRouting, String indexUUID, final String reason) {
DiscoveryNode masterNode = clusterService.state().nodes().masterNode();
if (masterNode == null) {
logger.warn("{} can't send shard started for {}, no master known.", shardRouting.shardId(), shardRouting);
return;
}
shardStarted(shardRouting, indexUUID, reason, masterNode);
}

public void shardStarted(final ShardRouting shardRouting, String indexUUID, final String reason, final DiscoveryNode masterNode) {
ShardRoutingEntry shardRoutingEntry = new ShardRoutingEntry(shardRouting, indexUUID, reason, null);
logger.debug("{} sending shard started for {}", shardRoutingEntry.shardRouting.shardId(), shardRoutingEntry);
transportService.sendRequest(masterNode,
SHARD_STARTED_ACTION_NAME, new ShardRoutingEntry(shardRouting, indexUUID, reason, null), new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
@Override
public void handleException(TransportException exp) {
logger.warn("failed to send shard started to [{}]", exp, masterNode);
}
SHARD_FAILED_ACTION_NAME, shardRoutingEntry, options, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
@Override
public void handleResponse(TransportResponse.Empty response) {
listener.onSuccess();
}

});
@Override
public void handleException(TransportException exp) {
logger.warn("failed to send failed shard to {}", exp, masterNode);
listener.onShardFailedFailure(masterNode, exp);
}
});
}

private final ShardFailedClusterStateHandler shardFailedClusterStateHandler = new ShardFailedClusterStateHandler();

private void handleShardFailureOnMaster(final ShardRoutingEntry shardRoutingEntry) {
logger.warn("{} received shard failed for {}", shardRoutingEntry.failure, shardRoutingEntry.shardRouting.shardId(), shardRoutingEntry);
clusterService.submitStateUpdateTask(
"shard-failed (" + shardRoutingEntry.shardRouting + "), message [" + shardRoutingEntry.message + "]",
shardRoutingEntry,
ClusterStateTaskConfig.build(Priority.HIGH),
shardFailedClusterStateHandler,
shardFailedClusterStateHandler);
private class ShardFailedTransportHandler implements TransportRequestHandler<ShardRoutingEntry> {
@Override
public void messageReceived(ShardRoutingEntry request, TransportChannel channel) throws Exception {
handleShardFailureOnMaster(request);
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
}

class ShardFailedClusterStateHandler implements ClusterStateTaskExecutor<ShardRoutingEntry>, ClusterStateTaskListener {
Expand All @@ -168,10 +150,10 @@ public BatchResult<ShardRoutingEntry> execute(ClusterState currentState, List<Sh

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
if (oldState != newState && newState.getRoutingNodes().unassigned().size() > 0) {
logger.trace("unassigned shards after shard failures. scheduling a reroute.");
routingService.reroute("unassigned shards after shard failures, scheduling a reroute");
}
if (oldState != newState && newState.getRoutingNodes().unassigned().size() > 0) {
logger.trace("unassigned shards after shard failures. scheduling a reroute.");
routingService.reroute("unassigned shards after shard failures, scheduling a reroute");
}
}

@Override
Expand All @@ -180,18 +162,45 @@ public void onFailure(String source, Throwable t) {
}
}

private final ShardStartedClusterStateHandler shardStartedClusterStateHandler =
new ShardStartedClusterStateHandler();

private void shardStartedOnMaster(final ShardRoutingEntry shardRoutingEntry) {
logger.debug("received shard started for {}", shardRoutingEntry);
private final ShardFailedClusterStateHandler shardFailedClusterStateHandler = new ShardFailedClusterStateHandler();

private void handleShardFailureOnMaster(final ShardRoutingEntry shardRoutingEntry) {
logger.warn("{} received shard failed for {}", shardRoutingEntry.failure, shardRoutingEntry.shardRouting.shardId(), shardRoutingEntry);
clusterService.submitStateUpdateTask(
"shard-started (" + shardRoutingEntry.shardRouting + "), reason [" + shardRoutingEntry.message + "]",
shardRoutingEntry,
ClusterStateTaskConfig.build(Priority.URGENT),
shardStartedClusterStateHandler,
shardStartedClusterStateHandler);
"shard-failed (" + shardRoutingEntry.shardRouting + "), message [" + shardRoutingEntry.message + "]",
shardRoutingEntry,
ClusterStateTaskConfig.build(Priority.HIGH),
shardFailedClusterStateHandler,
shardFailedClusterStateHandler);
}

public void shardStarted(final ShardRouting shardRouting, String indexUUID, final String reason) {
DiscoveryNode masterNode = clusterService.state().nodes().masterNode();
if (masterNode == null) {
logger.warn("{} can't send shard started for {}, no master known.", shardRouting.shardId(), shardRouting);
return;
}
shardStarted(shardRouting, indexUUID, reason, masterNode);
}

public void shardStarted(final ShardRouting shardRouting, String indexUUID, final String reason, final DiscoveryNode masterNode) {
ShardRoutingEntry shardRoutingEntry = new ShardRoutingEntry(shardRouting, indexUUID, reason, null);
logger.debug("{} sending shard started for {}", shardRoutingEntry.shardRouting.shardId(), shardRoutingEntry);
transportService.sendRequest(masterNode,
SHARD_STARTED_ACTION_NAME, new ShardRoutingEntry(shardRouting, indexUUID, reason, null), new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
@Override
public void handleException(TransportException exp) {
logger.warn("failed to send shard started to [{}]", exp, masterNode);
}
});
}

class ShardStartedTransportHandler implements TransportRequestHandler<ShardRoutingEntry> {
@Override
public void messageReceived(ShardRoutingEntry request, TransportChannel channel) throws Exception {
handleShardStartedOnMaster(request);
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
}

class ShardStartedClusterStateHandler implements ClusterStateTaskExecutor<ShardRoutingEntry>, ClusterStateTaskListener {
Expand Down Expand Up @@ -223,26 +232,20 @@ public void onFailure(String source, Throwable t) {
}
}

private class ShardFailedTransportHandler implements TransportRequestHandler<ShardRoutingEntry> {
private final ShardStartedClusterStateHandler shardStartedClusterStateHandler = new ShardStartedClusterStateHandler();

@Override
public void messageReceived(ShardRoutingEntry request, TransportChannel channel) throws Exception {
handleShardFailureOnMaster(request);
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
}

class ShardStartedTransportHandler implements TransportRequestHandler<ShardRoutingEntry> {
private void handleShardStartedOnMaster(final ShardRoutingEntry shardRoutingEntry) {
logger.debug("received shard started for {}", shardRoutingEntry);

@Override
public void messageReceived(ShardRoutingEntry request, TransportChannel channel) throws Exception {
shardStartedOnMaster(request);
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
clusterService.submitStateUpdateTask(
"shard-started (" + shardRoutingEntry.shardRouting + "), reason [" + shardRoutingEntry.message + "]",
shardRoutingEntry,
ClusterStateTaskConfig.build(Priority.URGENT),
shardStartedClusterStateHandler,
shardStartedClusterStateHandler);
}

public static class ShardRoutingEntry extends TransportRequest {

ShardRouting shardRouting;
String indexUUID = IndexMetaData.INDEX_UUID_NA_VALUE;
String message;
Expand Down Expand Up @@ -283,8 +286,13 @@ public String toString() {
}

public interface Listener {
default void onSuccess() {}
default void onShardFailedNoMaster() {}
default void onShardFailedFailure(final DiscoveryNode master, final TransportException e) {}
default void onSuccess() {
}

default void onShardFailedNoMaster() {
}

default void onShardFailedFailure(final DiscoveryNode master, final TransportException e) {
}
}
}