|
24 | 24 | import org.elasticsearch.cluster.ClusterChangedEvent;
|
25 | 25 | import org.elasticsearch.cluster.ClusterState;
|
26 | 26 | import org.elasticsearch.cluster.ClusterStateApplier;
|
27 |
| -import org.elasticsearch.cluster.ClusterStateTaskConfig; |
28 |
| -import org.elasticsearch.cluster.ClusterStateTaskExecutor; |
29 |
| -import org.elasticsearch.cluster.ClusterStateTaskListener; |
30 | 27 | import org.elasticsearch.cluster.ClusterStateUpdateTask;
|
31 | 28 | import org.elasticsearch.cluster.RestoreInProgress;
|
32 | 29 | import org.elasticsearch.cluster.RestoreInProgress.ShardRestoreStatus;
|
|
123 | 120 | * method, which detects that shard should be restored from snapshot rather than recovered from gateway by looking
|
124 | 121 | * at the {@link ShardRouting#recoverySource()} property.
|
125 | 122 | * <p>
|
126 |
| - * At the end of the successful restore process {@code RestoreService} calls {@link #cleanupRestoreState(ClusterChangedEvent)}, |
| 123 | + * At the end of the successful restore process {@code RestoreService} calls {@link #removeCompletedRestoresFromClusterState()}, |
127 | 124 | * which removes {@link RestoreInProgress} when all shards are completed. In case of
|
128 | 125 | * restore failure a normal recovery fail-over process kicks in.
|
129 | 126 | */
|
@@ -176,8 +173,6 @@ public class RestoreService implements ClusterStateApplier {
|
176 | 173 |
|
177 | 174 | private volatile boolean refreshRepositoryUuidOnRestore;
|
178 | 175 |
|
179 |
| - private static final CleanRestoreStateTaskExecutor cleanRestoreStateTaskExecutor = new CleanRestoreStateTaskExecutor(); |
180 |
| - |
181 | 176 | public RestoreService(
|
182 | 177 | ClusterService clusterService,
|
183 | 178 | RepositoriesService repositoriesService,
|
@@ -1039,74 +1034,6 @@ public RestoreInProgress applyChanges(final RestoreInProgress oldRestore) {
|
1039 | 1034 | }
|
1040 | 1035 | }
|
1041 | 1036 |
|
1042 |
| - public static RestoreInProgress.Entry restoreInProgress(ClusterState state, String restoreUUID) { |
1043 |
| - return state.custom(RestoreInProgress.TYPE, RestoreInProgress.EMPTY).get(restoreUUID); |
1044 |
| - } |
1045 |
| - |
1046 |
| - static class CleanRestoreStateTaskExecutor implements ClusterStateTaskExecutor<CleanRestoreStateTaskExecutor.Task>, |
1047 |
| - ClusterStateTaskListener { |
1048 |
| - |
1049 |
| - static class Task { |
1050 |
| - final String uuid; |
1051 |
| - |
1052 |
| - Task(String uuid) { |
1053 |
| - this.uuid = uuid; |
1054 |
| - } |
1055 |
| - |
1056 |
| - @Override |
1057 |
| - public String toString() { |
1058 |
| - return "clean restore state for restore " + uuid; |
1059 |
| - } |
1060 |
| - } |
1061 |
| - |
1062 |
| - @Override |
1063 |
| - public ClusterTasksResult<Task> execute(final ClusterState currentState, final List<Task> tasks) { |
1064 |
| - final ClusterTasksResult.Builder<Task> resultBuilder = ClusterTasksResult.<Task>builder().successes(tasks); |
1065 |
| - Set<String> completedRestores = tasks.stream().map(e -> e.uuid).collect(Collectors.toSet()); |
1066 |
| - RestoreInProgress.Builder restoreInProgressBuilder = new RestoreInProgress.Builder(); |
1067 |
| - boolean changed = false; |
1068 |
| - for (RestoreInProgress.Entry entry : currentState.custom(RestoreInProgress.TYPE, RestoreInProgress.EMPTY)) { |
1069 |
| - if (completedRestores.contains(entry.uuid())) { |
1070 |
| - changed = true; |
1071 |
| - } else { |
1072 |
| - restoreInProgressBuilder.add(entry); |
1073 |
| - } |
1074 |
| - } |
1075 |
| - if (changed == false) { |
1076 |
| - return resultBuilder.build(currentState); |
1077 |
| - } |
1078 |
| - ImmutableOpenMap.Builder<String, ClusterState.Custom> builder = ImmutableOpenMap.builder(currentState.getCustoms()); |
1079 |
| - builder.put(RestoreInProgress.TYPE, restoreInProgressBuilder.build()); |
1080 |
| - ImmutableOpenMap<String, ClusterState.Custom> customs = builder.build(); |
1081 |
| - return resultBuilder.build(ClusterState.builder(currentState).customs(customs).build()); |
1082 |
| - } |
1083 |
| - |
1084 |
| - @Override |
1085 |
| - public void onFailure(final String source, final Exception e) { |
1086 |
| - logger.error(() -> new ParameterizedMessage("unexpected failure during [{}]", source), e); |
1087 |
| - } |
1088 |
| - |
1089 |
| - @Override |
1090 |
| - public void onNoLongerMaster(String source) { |
1091 |
| - logger.debug("no longer master while processing restore state update [{}]", source); |
1092 |
| - } |
1093 |
| - |
1094 |
| - } |
1095 |
| - |
1096 |
| - private void cleanupRestoreState(ClusterChangedEvent event) { |
1097 |
| - for (RestoreInProgress.Entry entry : event.state().custom(RestoreInProgress.TYPE, RestoreInProgress.EMPTY)) { |
1098 |
| - if (entry.state().completed()) { |
1099 |
| - assert completed(entry.shards()) : "state says completed but restore entries are not"; |
1100 |
| - clusterService.submitStateUpdateTask( |
1101 |
| - "clean up snapshot restore state", |
1102 |
| - new CleanRestoreStateTaskExecutor.Task(entry.uuid()), |
1103 |
| - ClusterStateTaskConfig.build(Priority.URGENT), |
1104 |
| - cleanRestoreStateTaskExecutor, |
1105 |
| - cleanRestoreStateTaskExecutor); |
1106 |
| - } |
1107 |
| - } |
1108 |
| - } |
1109 |
| - |
1110 | 1037 | private static RestoreInProgress.State overallState(RestoreInProgress.State nonCompletedState,
|
1111 | 1038 | ImmutableOpenMap<ShardId, RestoreInProgress.ShardRestoreStatus> shards) {
|
1112 | 1039 | boolean hasFailed = false;
|
@@ -1230,13 +1157,69 @@ public static Set<Index> restoringIndices(final ClusterState currentState, final
|
1230 | 1157 | return indices;
|
1231 | 1158 | }
|
1232 | 1159 |
|
| 1160 | + public static RestoreInProgress.Entry restoreInProgress(ClusterState state, String restoreUUID) { |
| 1161 | + return state.custom(RestoreInProgress.TYPE, RestoreInProgress.EMPTY).get(restoreUUID); |
| 1162 | + } |
| 1163 | + |
| 1164 | + /** |
| 1165 | + * Set to true if {@link #removeCompletedRestoresFromClusterState()} already has an in-flight state update running that will clean up |
| 1166 | + * all completed restores from the cluster state. |
| 1167 | + */ |
| 1168 | + private volatile boolean cleanupInProgress = false; |
| 1169 | + |
| 1170 | + // run a cluster state update that removes all completed restores from the cluster state |
| 1171 | + private void removeCompletedRestoresFromClusterState() { |
| 1172 | + clusterService.submitStateUpdateTask("clean up snapshot restore status", new ClusterStateUpdateTask(Priority.URGENT) { |
| 1173 | + @Override |
| 1174 | + public ClusterState execute(ClusterState currentState) { |
| 1175 | + RestoreInProgress.Builder restoreInProgressBuilder = new RestoreInProgress.Builder(); |
| 1176 | + boolean changed = false; |
| 1177 | + for (RestoreInProgress.Entry entry : currentState.custom(RestoreInProgress.TYPE, RestoreInProgress.EMPTY)) { |
| 1178 | + if (entry.state().completed()) { |
| 1179 | + changed = true; |
| 1180 | + } else { |
| 1181 | + restoreInProgressBuilder.add(entry); |
| 1182 | + } |
| 1183 | + } |
| 1184 | + return changed == false ? currentState : ClusterState.builder(currentState).putCustom( |
| 1185 | + RestoreInProgress.TYPE, restoreInProgressBuilder.build()).build(); |
| 1186 | + } |
| 1187 | + |
| 1188 | + @Override |
| 1189 | + public void onFailure(final String source, final Exception e) { |
| 1190 | + cleanupInProgress = false; |
| 1191 | + logger.warn(() -> new ParameterizedMessage("failed to remove completed restores from cluster state"), e); |
| 1192 | + } |
| 1193 | + |
| 1194 | + @Override |
| 1195 | + public void onNoLongerMaster(String source) { |
| 1196 | + cleanupInProgress = false; |
| 1197 | + logger.debug("no longer master while removing completed restores from cluster state"); |
| 1198 | + } |
| 1199 | + |
| 1200 | + @Override |
| 1201 | + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { |
| 1202 | + cleanupInProgress = false; |
| 1203 | + } |
| 1204 | + }); |
| 1205 | + } |
| 1206 | + |
1233 | 1207 | @Override
|
1234 | 1208 | public void applyClusterState(ClusterChangedEvent event) {
|
1235 | 1209 | try {
|
1236 |
| - if (event.localNodeMaster()) { |
1237 |
| - cleanupRestoreState(event); |
| 1210 | + if (event.localNodeMaster() && cleanupInProgress == false) { |
| 1211 | + for (RestoreInProgress.Entry entry : event.state().custom(RestoreInProgress.TYPE, RestoreInProgress.EMPTY)) { |
| 1212 | + if (entry.state().completed()) { |
| 1213 | + assert completed(entry.shards()) : "state says completed but restore entries are not"; |
| 1214 | + removeCompletedRestoresFromClusterState(); |
| 1215 | + cleanupInProgress = true; |
| 1216 | + // the above method cleans up all completed restores, no need to keep looping |
| 1217 | + break; |
| 1218 | + } |
| 1219 | + } |
1238 | 1220 | }
|
1239 | 1221 | } catch (Exception t) {
|
| 1222 | + assert false : t; |
1240 | 1223 | logger.warn("Failed to update restore state ", t);
|
1241 | 1224 | }
|
1242 | 1225 | }
|
|
0 commit comments