|
69 | 69 | import java.util.TreeMap;
|
70 | 70 | import java.util.concurrent.CountDownLatch;
|
71 | 71 | import java.util.concurrent.TimeUnit;
|
| 72 | +import java.util.concurrent.atomic.AtomicBoolean; |
72 | 73 | import java.util.concurrent.atomic.AtomicLong;
|
73 | 74 | import java.util.function.BiFunction;
|
74 | 75 | import java.util.function.Function;
|
@@ -295,6 +296,84 @@ public void testRunStateChangePolicyWithNextStep() throws Exception {
|
295 | 296 | threadPool.shutdownNow();
|
296 | 297 | }
|
297 | 298 |
|
| 299 | + public void testRunPeriodicPolicyWithFailureToReadPolicy() throws Exception { |
| 300 | + doTestRunPolicyWithFailureToReadPolicy(false, true); |
| 301 | + } |
| 302 | + |
| 303 | + public void testRunStateChangePolicyWithFailureToReadPolicy() throws Exception { |
| 304 | + doTestRunPolicyWithFailureToReadPolicy(false, false); |
| 305 | + } |
| 306 | + |
| 307 | + public void testRunAsyncActionPolicyWithFailureToReadPolicy() throws Exception { |
| 308 | + doTestRunPolicyWithFailureToReadPolicy(true, false); |
| 309 | + } |
| 310 | + |
| 311 | + public void doTestRunPolicyWithFailureToReadPolicy(boolean asyncAction, boolean periodicAction) throws Exception { |
| 312 | + String policyName = "foo"; |
| 313 | + StepKey stepKey = new StepKey("phase", "action", "cluster_state_action_step"); |
| 314 | + StepKey nextStepKey = new StepKey("phase", "action", "next_cluster_state_action_step"); |
| 315 | + MockClusterStateActionStep step = new MockClusterStateActionStep(stepKey, nextStepKey); |
| 316 | + MockClusterStateActionStep nextStep = new MockClusterStateActionStep(nextStepKey, null); |
| 317 | + MockPolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step); |
| 318 | + AtomicBoolean resolved = new AtomicBoolean(false); |
| 319 | + stepRegistry.setResolver((i, k) -> { |
| 320 | + resolved.set(true); |
| 321 | + throw new IllegalArgumentException("fake failure retrieving step"); |
| 322 | + }); |
| 323 | + ThreadPool threadPool = new TestThreadPool("name"); |
| 324 | + LifecycleExecutionState les = LifecycleExecutionState.builder() |
| 325 | + .setPhase("phase") |
| 326 | + .setAction("action") |
| 327 | + .setStep("cluster_state_action_step") |
| 328 | + .build(); |
| 329 | + IndexMetaData indexMetaData = IndexMetaData.builder("test") |
| 330 | + .settings(Settings.builder() |
| 331 | + .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) |
| 332 | + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) |
| 333 | + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) |
| 334 | + .put(LifecycleSettings.LIFECYCLE_NAME, policyName)) |
| 335 | + .putCustom(LifecycleExecutionState.ILM_CUSTOM_METADATA_KEY, les.asMap()) |
| 336 | + .build(); |
| 337 | + ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool); |
| 338 | + DiscoveryNode node = clusterService.localNode(); |
| 339 | + IndexLifecycleMetadata ilm = new IndexLifecycleMetadata(Collections.emptyMap(), OperationMode.RUNNING); |
| 340 | + ClusterState state = ClusterState.builder(new ClusterName("cluster")) |
| 341 | + .metaData(MetaData.builder() |
| 342 | + .put(indexMetaData, true) |
| 343 | + .putCustom(IndexLifecycleMetadata.TYPE, ilm)) |
| 344 | + .nodes(DiscoveryNodes.builder() |
| 345 | + .add(node) |
| 346 | + .masterNodeId(node.getId()) |
| 347 | + .localNodeId(node.getId())) |
| 348 | + .build(); |
| 349 | + ClusterServiceUtils.setState(clusterService, state); |
| 350 | + long stepTime = randomLong(); |
| 351 | + IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, threadPool, () -> stepTime); |
| 352 | + |
| 353 | + ClusterState before = clusterService.state(); |
| 354 | + if (asyncAction) { |
| 355 | + runner.maybeRunAsyncAction(before, indexMetaData, policyName, stepKey); |
| 356 | + } else if (periodicAction) { |
| 357 | + runner.runPeriodicStep(policyName, indexMetaData); |
| 358 | + } else { |
| 359 | + runner.runPolicyAfterStateChange(policyName, indexMetaData); |
| 360 | + } |
| 361 | + |
| 362 | + // The cluster state can take a few extra milliseconds to update after the steps are executed |
| 363 | + assertBusy(() -> assertNotEquals(before, clusterService.state())); |
| 364 | + LifecycleExecutionState newExecutionState = LifecycleExecutionState |
| 365 | + .fromIndexMetadata(clusterService.state().metaData().index(indexMetaData.getIndex())); |
| 366 | + assertThat(newExecutionState.getPhase(), equalTo("phase")); |
| 367 | + assertThat(newExecutionState.getAction(), equalTo("action")); |
| 368 | + assertThat(newExecutionState.getStep(), equalTo("cluster_state_action_step")); |
| 369 | + assertThat(step.getExecuteCount(), equalTo(0L)); |
| 370 | + assertThat(nextStep.getExecuteCount(), equalTo(0L)); |
| 371 | + assertThat(newExecutionState.getStepInfo(), |
| 372 | + containsString("{\"type\":\"illegal_argument_exception\",\"reason\":\"fake failure retrieving step\"}")); |
| 373 | + clusterService.close(); |
| 374 | + threadPool.shutdownNow(); |
| 375 | + } |
| 376 | + |
298 | 377 | public void testRunAsyncActionDoesNotRun() {
|
299 | 378 | String policyName = "foo";
|
300 | 379 | StepKey stepKey = new StepKey("phase", "action", "async_action_step");
|
|
0 commit comments