Skip to content

Avoid sending duplicate remote failed shard requests #31313

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Jun 18, 2018
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.ClusterStateTaskConfig;
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
import org.elasticsearch.cluster.ClusterStateTaskListener;
import org.elasticsearch.cluster.MasterNodeChangePredicate;
import org.elasticsearch.cluster.NotMasterException;
Expand All @@ -48,6 +48,7 @@
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.node.NodeClosedException;
Expand All @@ -68,7 +69,9 @@
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Predicate;

public class ShardStateAction extends AbstractComponent {
Expand All @@ -80,6 +83,10 @@ public class ShardStateAction extends AbstractComponent {
private final ClusterService clusterService;
private final ThreadPool threadPool;

// a list of shards that failed during replication
// we keep track of these shards in order to avoid sending duplicate failed shard requests for a single failing shard.
private final ConcurrentMap<FailedShardEntry, CompositeListener> remoteFailedShardsCache = ConcurrentCollections.newConcurrentMap();

@Inject
public ShardStateAction(Settings settings, ClusterService clusterService, TransportService transportService,
AllocationService allocationService, RoutingService routingService, ThreadPool threadPool) {
Expand Down Expand Up @@ -146,8 +153,35 @@ private static boolean isMasterChannelException(TransportException exp) {
*/
public void remoteShardFailed(final ShardId shardId, String allocationId, long primaryTerm, boolean markAsStale, final String message, @Nullable final Exception failure, Listener listener) {
assert primaryTerm > 0L : "primary term should be strictly positive";
FailedShardEntry shardEntry = new FailedShardEntry(shardId, allocationId, primaryTerm, message, failure, markAsStale);
sendShardAction(SHARD_FAILED_ACTION_NAME, clusterService.state(), shardEntry, listener);
final FailedShardEntry shardEntry = new FailedShardEntry(shardId, allocationId, primaryTerm, message, failure, markAsStale);
final CompositeListener compositeListener = new CompositeListener(listener);
final CompositeListener existingListener = remoteFailedShardsCache.putIfAbsent(shardEntry, compositeListener);
if (existingListener == null) {
sendShardAction(SHARD_FAILED_ACTION_NAME, clusterService.state(), shardEntry, new Listener() {
@Override
public void onSuccess() {
try {
compositeListener.onSuccess();
} finally {
remoteFailedShardsCache.remove(shardEntry);
}
}
@Override
public void onFailure(Exception e) {
try {
compositeListener.onFailure(e);
} finally {
remoteFailedShardsCache.remove(shardEntry);
}
}
});
} else {
existingListener.addListener(listener);
}
}

int remoteShardFailedCacheSize() {
return remoteFailedShardsCache.size();
}

/**
Expand Down Expand Up @@ -414,6 +448,23 @@ public String toString() {
components.add("markAsStale [" + markAsStale + "]");
return String.join(", ", components);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
FailedShardEntry that = (FailedShardEntry) o;
// Exclude message and exception from equals and hashCode
return Objects.equals(this.shardId, that.shardId) &&
Objects.equals(this.allocationId, that.allocationId) &&
primaryTerm == that.primaryTerm &&
markAsStale == that.markAsStale;
}

@Override
public int hashCode() {
return Objects.hash(shardId, allocationId, primaryTerm, markAsStale);
}
}

public void shardStarted(final ShardRouting shardRouting, final String message, Listener listener) {
Expand Down Expand Up @@ -585,6 +636,74 @@ default void onFailure(final Exception e) {

}

/**
* A composite listener that allows registering multiple listeners dynamically.
*/
static final class CompositeListener implements Listener {
private boolean isNotified = false;
private Exception failure = null;
private final List<Listener> listeners = new ArrayList<>();

CompositeListener(Listener listener) {
listeners.add(listener);
}

void addListener(Listener listener) {
final boolean ready;
synchronized (this) {
ready = this.isNotified;
if (ready == false) {
listeners.add(listener);
}
}
if (ready) {
if (failure != null) {
listener.onFailure(failure);
} else {
listener.onSuccess();
}
}
}

private void onCompleted(Exception failure) {
final List<Listener> listeners;
synchronized (this) {
this.failure = failure;
listeners = this.listeners;
this.isNotified = true;
}
Exception firstException = null;
for (Listener listener : listeners) {
try {
if (failure != null) {
listener.onFailure(failure);
} else {
listener.onSuccess();
}
} catch (Exception innerEx) {
if (firstException == null) {
firstException = innerEx;
} else {
firstException.addSuppressed(innerEx);
}
}
}
if (firstException != null) {
throw new ElasticsearchException("failed to notify listener", firstException);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

where is this exception going to end up?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's an interesting question. It will be reported as unhandled by the transport threads. The same thing should happen for the current implementation.

}
}

@Override
public void onSuccess() {
onCompleted(null);
}

@Override
public void onFailure(Exception failure) {
onCompleted(failure);
}
}

public static class NoLongerPrimaryShardException extends ElasticsearchException {

public NoLongerPrimaryShardException(ShardId shardId, String msg) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@
import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.apache.lucene.index.CorruptIndexException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.action.shard.ShardStateAction.FailedShardEntry;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
import org.elasticsearch.cluster.ESAllocationTestCase;
import org.elasticsearch.cluster.action.shard.ShardStateAction.FailedShardEntry;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNodes;
Expand All @@ -52,10 +52,10 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

Expand Down Expand Up @@ -131,9 +131,14 @@ ClusterState applyFailedShards(ClusterState currentState, List<FailedShard> fail
tasks.addAll(failingTasks);
tasks.addAll(nonExistentTasks);
ClusterStateTaskExecutor.ClusterTasksResult<FailedShardEntry> result = failingExecutor.execute(currentState, tasks);
Map<FailedShardEntry, ClusterStateTaskExecutor.TaskResult> taskResultMap =
failingTasks.stream().collect(Collectors.toMap(Function.identity(), task -> ClusterStateTaskExecutor.TaskResult.failure(new RuntimeException("simulated applyFailedShards failure"))));
taskResultMap.putAll(nonExistentTasks.stream().collect(Collectors.toMap(Function.identity(), task -> ClusterStateTaskExecutor.TaskResult.success())));
Map<FailedShardEntry, ClusterStateTaskExecutor.TaskResult> taskResultMap = new IdentityHashMap<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's the reason for turning this into a identity map?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Previously we did not override FailedShardEntry, but now we do. I used an identity map because we verify that the size of the resultMap equals the number of tasks.

I replaced these maps with a list. I think it's clearer now.

for (FailedShardEntry failingTask : failingTasks) {
taskResultMap.put(failingTask,
ClusterStateTaskExecutor.TaskResult.failure(new RuntimeException("simulated applyFailedShards failure")));
}
for (FailedShardEntry nonExistentTask : nonExistentTasks) {
taskResultMap.put(nonExistentTask, ClusterStateTaskExecutor.TaskResult.success());
}
assertTaskResults(taskResultMap, result, currentState, false);
}

Expand All @@ -147,12 +152,13 @@ public void testIllegalShardFailureRequests() throws Exception {
tasks.add(new FailedShardEntry(failingTask.shardId, failingTask.allocationId,
randomIntBetween(1, (int) primaryTerm - 1), failingTask.message, failingTask.failure, randomBoolean()));
}
Map<FailedShardEntry, ClusterStateTaskExecutor.TaskResult> taskResultMap =
tasks.stream().collect(Collectors.toMap(
Function.identity(),
task -> ClusterStateTaskExecutor.TaskResult.failure(new ShardStateAction.NoLongerPrimaryShardException(task.shardId,
Map<FailedShardEntry, ClusterStateTaskExecutor.TaskResult> taskResultMap = new IdentityHashMap<>();
for (FailedShardEntry task : tasks) {
taskResultMap.put(task,
ClusterStateTaskExecutor.TaskResult.failure(new ShardStateAction.NoLongerPrimaryShardException(task.shardId,
"primary term [" + task.primaryTerm + "] did not match current primary term [" +
currentState.metaData().index(task.shardId.getIndex()).primaryTerm(task.shardId.id()) + "]"))));
currentState.metaData().index(task.shardId.getIndex()).primaryTerm(task.shardId.id()) + "]")));
}
ClusterStateTaskExecutor.ClusterTasksResult<FailedShardEntry> result = executor.execute(currentState, tasks);
assertTaskResults(taskResultMap, result, currentState, false);
}
Expand Down Expand Up @@ -251,8 +257,10 @@ private static void assertTasksSuccessful(
ClusterState clusterState,
boolean clusterStateChanged
) {
Map<ShardStateAction.FailedShardEntry, ClusterStateTaskExecutor.TaskResult> taskResultMap =
tasks.stream().collect(Collectors.toMap(Function.identity(), task -> ClusterStateTaskExecutor.TaskResult.success()));
Map<ShardStateAction.FailedShardEntry, ClusterStateTaskExecutor.TaskResult> taskResultMap = new IdentityHashMap<>();
for (FailedShardEntry task : tasks) {
taskResultMap.put(task, ClusterStateTaskExecutor.TaskResult.success());
}
assertTaskResults(taskResultMap, result, clusterState, clusterStateChanged);
}

Expand Down
Loading