Skip to content

Prevent RestoreService from Triggering Redundant CS Updates (#71812) #71820

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 19, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 59 additions & 76 deletions server/src/main/java/org/elasticsearch/snapshots/RestoreService.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,6 @@
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateApplier;
import org.elasticsearch.cluster.ClusterStateTaskConfig;
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
import org.elasticsearch.cluster.ClusterStateTaskListener;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.RestoreInProgress;
import org.elasticsearch.cluster.RestoreInProgress.ShardRestoreStatus;
Expand Down Expand Up @@ -123,7 +120,7 @@
* method, which detects that shard should be restored from snapshot rather than recovered from gateway by looking
* at the {@link ShardRouting#recoverySource()} property.
* <p>
* At the end of the successful restore process {@code RestoreService} calls {@link #cleanupRestoreState(ClusterChangedEvent)},
* At the end of the successful restore process {@code RestoreService} calls {@link #removeCompletedRestoresFromClusterState()},
* which removes {@link RestoreInProgress} when all shards are completed. In case of
* restore failure a normal recovery fail-over process kicks in.
*/
Expand Down Expand Up @@ -176,8 +173,6 @@ public class RestoreService implements ClusterStateApplier {

private volatile boolean refreshRepositoryUuidOnRestore;

private static final CleanRestoreStateTaskExecutor cleanRestoreStateTaskExecutor = new CleanRestoreStateTaskExecutor();

public RestoreService(
ClusterService clusterService,
RepositoriesService repositoriesService,
Expand Down Expand Up @@ -1039,74 +1034,6 @@ public RestoreInProgress applyChanges(final RestoreInProgress oldRestore) {
}
}

public static RestoreInProgress.Entry restoreInProgress(ClusterState state, String restoreUUID) {
return state.custom(RestoreInProgress.TYPE, RestoreInProgress.EMPTY).get(restoreUUID);
}

static class CleanRestoreStateTaskExecutor implements ClusterStateTaskExecutor<CleanRestoreStateTaskExecutor.Task>,
ClusterStateTaskListener {

static class Task {
final String uuid;

Task(String uuid) {
this.uuid = uuid;
}

@Override
public String toString() {
return "clean restore state for restore " + uuid;
}
}

@Override
public ClusterTasksResult<Task> execute(final ClusterState currentState, final List<Task> tasks) {
final ClusterTasksResult.Builder<Task> resultBuilder = ClusterTasksResult.<Task>builder().successes(tasks);
Set<String> completedRestores = tasks.stream().map(e -> e.uuid).collect(Collectors.toSet());
RestoreInProgress.Builder restoreInProgressBuilder = new RestoreInProgress.Builder();
boolean changed = false;
for (RestoreInProgress.Entry entry : currentState.custom(RestoreInProgress.TYPE, RestoreInProgress.EMPTY)) {
if (completedRestores.contains(entry.uuid())) {
changed = true;
} else {
restoreInProgressBuilder.add(entry);
}
}
if (changed == false) {
return resultBuilder.build(currentState);
}
ImmutableOpenMap.Builder<String, ClusterState.Custom> builder = ImmutableOpenMap.builder(currentState.getCustoms());
builder.put(RestoreInProgress.TYPE, restoreInProgressBuilder.build());
ImmutableOpenMap<String, ClusterState.Custom> customs = builder.build();
return resultBuilder.build(ClusterState.builder(currentState).customs(customs).build());
}

@Override
public void onFailure(final String source, final Exception e) {
logger.error(() -> new ParameterizedMessage("unexpected failure during [{}]", source), e);
}

@Override
public void onNoLongerMaster(String source) {
logger.debug("no longer master while processing restore state update [{}]", source);
}

}

private void cleanupRestoreState(ClusterChangedEvent event) {
for (RestoreInProgress.Entry entry : event.state().custom(RestoreInProgress.TYPE, RestoreInProgress.EMPTY)) {
if (entry.state().completed()) {
assert completed(entry.shards()) : "state says completed but restore entries are not";
clusterService.submitStateUpdateTask(
"clean up snapshot restore state",
new CleanRestoreStateTaskExecutor.Task(entry.uuid()),
ClusterStateTaskConfig.build(Priority.URGENT),
cleanRestoreStateTaskExecutor,
cleanRestoreStateTaskExecutor);
}
}
}

private static RestoreInProgress.State overallState(RestoreInProgress.State nonCompletedState,
ImmutableOpenMap<ShardId, RestoreInProgress.ShardRestoreStatus> shards) {
boolean hasFailed = false;
Expand Down Expand Up @@ -1230,13 +1157,69 @@ public static Set<Index> restoringIndices(final ClusterState currentState, final
return indices;
}

public static RestoreInProgress.Entry restoreInProgress(ClusterState state, String restoreUUID) {
return state.custom(RestoreInProgress.TYPE, RestoreInProgress.EMPTY).get(restoreUUID);
}

/**
* Set to true if {@link #removeCompletedRestoresFromClusterState()} already has an in-flight state update running that will clean up
* all completed restores from the cluster state.
*/
private volatile boolean cleanupInProgress = false;

// run a cluster state update that removes all completed restores from the cluster state
private void removeCompletedRestoresFromClusterState() {
clusterService.submitStateUpdateTask("clean up snapshot restore status", new ClusterStateUpdateTask(Priority.URGENT) {
@Override
public ClusterState execute(ClusterState currentState) {
RestoreInProgress.Builder restoreInProgressBuilder = new RestoreInProgress.Builder();
boolean changed = false;
for (RestoreInProgress.Entry entry : currentState.custom(RestoreInProgress.TYPE, RestoreInProgress.EMPTY)) {
if (entry.state().completed()) {
changed = true;
} else {
restoreInProgressBuilder.add(entry);
}
}
return changed == false ? currentState : ClusterState.builder(currentState).putCustom(
RestoreInProgress.TYPE, restoreInProgressBuilder.build()).build();
}

@Override
public void onFailure(final String source, final Exception e) {
cleanupInProgress = false;
logger.warn(() -> new ParameterizedMessage("failed to remove completed restores from cluster state"), e);
}

@Override
public void onNoLongerMaster(String source) {
cleanupInProgress = false;
logger.debug("no longer master while removing completed restores from cluster state");
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
cleanupInProgress = false;
}
});
}

@Override
public void applyClusterState(ClusterChangedEvent event) {
try {
if (event.localNodeMaster()) {
cleanupRestoreState(event);
if (event.localNodeMaster() && cleanupInProgress == false) {
for (RestoreInProgress.Entry entry : event.state().custom(RestoreInProgress.TYPE, RestoreInProgress.EMPTY)) {
if (entry.state().completed()) {
assert completed(entry.shards()) : "state says completed but restore entries are not";
removeCompletedRestoresFromClusterState();
cleanupInProgress = true;
// the above method cleans up all completed restores, no need to keep looping
break;
}
}
}
} catch (Exception t) {
assert false : t;
logger.warn("Failed to update restore state ", t);
}
}
Expand Down