|
72 | 72 | import java.util.TreeMap;
|
73 | 73 | import java.util.concurrent.CountDownLatch;
|
74 | 74 | import java.util.concurrent.TimeUnit;
|
| 75 | +import java.util.concurrent.atomic.AtomicBoolean; |
75 | 76 | import java.util.concurrent.atomic.AtomicLong;
|
76 | 77 | import java.util.function.BiFunction;
|
77 | 78 | import java.util.function.Function;
|
@@ -356,6 +357,84 @@ public void testRunStateChangePolicyWithNextStep() throws Exception {
|
356 | 357 | threadPool.shutdownNow();
|
357 | 358 | }
|
358 | 359 |
|
| 360 | + public void testRunPeriodicPolicyWithFailureToReadPolicy() throws Exception { |
| 361 | + doTestRunPolicyWithFailureToReadPolicy(false, true); |
| 362 | + } |
| 363 | + |
| 364 | + public void testRunStateChangePolicyWithFailureToReadPolicy() throws Exception { |
| 365 | + doTestRunPolicyWithFailureToReadPolicy(false, false); |
| 366 | + } |
| 367 | + |
| 368 | + public void testRunAsyncActionPolicyWithFailureToReadPolicy() throws Exception { |
| 369 | + doTestRunPolicyWithFailureToReadPolicy(true, false); |
| 370 | + } |
| 371 | + |
| 372 | + public void doTestRunPolicyWithFailureToReadPolicy(boolean asyncAction, boolean periodicAction) throws Exception { |
| 373 | + String policyName = "foo"; |
| 374 | + StepKey stepKey = new StepKey("phase", "action", "cluster_state_action_step"); |
| 375 | + StepKey nextStepKey = new StepKey("phase", "action", "next_cluster_state_action_step"); |
| 376 | + MockClusterStateActionStep step = new MockClusterStateActionStep(stepKey, nextStepKey); |
| 377 | + MockClusterStateActionStep nextStep = new MockClusterStateActionStep(nextStepKey, null); |
| 378 | + MockPolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step); |
| 379 | + AtomicBoolean resolved = new AtomicBoolean(false); |
| 380 | + stepRegistry.setResolver((i, k) -> { |
| 381 | + resolved.set(true); |
| 382 | + throw new IllegalArgumentException("fake failure retrieving step"); |
| 383 | + }); |
| 384 | + ThreadPool threadPool = new TestThreadPool("name"); |
| 385 | + LifecycleExecutionState les = LifecycleExecutionState.builder() |
| 386 | + .setPhase("phase") |
| 387 | + .setAction("action") |
| 388 | + .setStep("cluster_state_action_step") |
| 389 | + .build(); |
| 390 | + IndexMetaData indexMetaData = IndexMetaData.builder("test") |
| 391 | + .settings(Settings.builder() |
| 392 | + .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) |
| 393 | + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) |
| 394 | + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) |
| 395 | + .put(LifecycleSettings.LIFECYCLE_NAME, policyName)) |
| 396 | + .putCustom(LifecycleExecutionState.ILM_CUSTOM_METADATA_KEY, les.asMap()) |
| 397 | + .build(); |
| 398 | + ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool); |
| 399 | + DiscoveryNode node = clusterService.localNode(); |
| 400 | + IndexLifecycleMetadata ilm = new IndexLifecycleMetadata(Collections.emptyMap(), OperationMode.RUNNING); |
| 401 | + ClusterState state = ClusterState.builder(new ClusterName("cluster")) |
| 402 | + .metaData(MetaData.builder() |
| 403 | + .put(indexMetaData, true) |
| 404 | + .putCustom(IndexLifecycleMetadata.TYPE, ilm)) |
| 405 | + .nodes(DiscoveryNodes.builder() |
| 406 | + .add(node) |
| 407 | + .masterNodeId(node.getId()) |
| 408 | + .localNodeId(node.getId())) |
| 409 | + .build(); |
| 410 | + ClusterServiceUtils.setState(clusterService, state); |
| 411 | + long stepTime = randomLong(); |
| 412 | + IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, threadPool, () -> stepTime); |
| 413 | + |
| 414 | + ClusterState before = clusterService.state(); |
| 415 | + if (asyncAction) { |
| 416 | + runner.maybeRunAsyncAction(before, indexMetaData, policyName, stepKey); |
| 417 | + } else if (periodicAction) { |
| 418 | + runner.runPeriodicStep(policyName, indexMetaData); |
| 419 | + } else { |
| 420 | + runner.runPolicyAfterStateChange(policyName, indexMetaData); |
| 421 | + } |
| 422 | + |
| 423 | + // The cluster state can take a few extra milliseconds to update after the steps are executed |
| 424 | + assertBusy(() -> assertNotEquals(before, clusterService.state())); |
| 425 | + LifecycleExecutionState newExecutionState = LifecycleExecutionState |
| 426 | + .fromIndexMetadata(clusterService.state().metaData().index(indexMetaData.getIndex())); |
| 427 | + assertThat(newExecutionState.getPhase(), equalTo("phase")); |
| 428 | + assertThat(newExecutionState.getAction(), equalTo("action")); |
| 429 | + assertThat(newExecutionState.getStep(), equalTo("cluster_state_action_step")); |
| 430 | + assertThat(step.getExecuteCount(), equalTo(0L)); |
| 431 | + assertThat(nextStep.getExecuteCount(), equalTo(0L)); |
| 432 | + assertThat(newExecutionState.getStepInfo(), |
| 433 | + containsString("{\"type\":\"illegal_argument_exception\",\"reason\":\"fake failure retrieving step\"}")); |
| 434 | + clusterService.close(); |
| 435 | + threadPool.shutdownNow(); |
| 436 | + } |
| 437 | + |
359 | 438 | public void testRunAsyncActionDoesNotRun() {
|
360 | 439 | String policyName = "foo";
|
361 | 440 | StepKey stepKey = new StepKey("phase", "action", "async_action_step");
|
|
0 commit comments