Skip to content

ILM use Priority.IMMEDIATE for stop ILM cluster update #54909

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.elasticsearch.cluster.LocalNodeMasterListener;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.Lifecycle.State;
import org.elasticsearch.common.settings.Settings;
Expand Down Expand Up @@ -363,7 +364,13 @@ assert isClusterServiceStoppedOrClosed() : "close is called by closing the plugi
}

public void submitOperationModeUpdate(OperationMode mode) {
clusterService.submitStateUpdateTask("ilm_operation_mode_update", OperationModeUpdateTask.ilmMode(mode));
OperationModeUpdateTask ilmOperationModeUpdateTask;
if (mode == OperationMode.STOPPING || mode == OperationMode.STOPPED) {
ilmOperationModeUpdateTask = OperationModeUpdateTask.ilmMode(Priority.IMMEDIATE, mode);
} else {
ilmOperationModeUpdateTask = OperationModeUpdateTask.ilmMode(Priority.NORMAL, mode);
}
clusterService.submitStateUpdateTask("ilm_operation_mode_update {OperationMode " + mode.name() + "}", ilmOperationModeUpdateTask);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,28 +11,40 @@
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.xpack.core.ilm.OperationMode;
import org.elasticsearch.common.Priority;
import org.elasticsearch.xpack.core.ilm.IndexLifecycleMetadata;
import org.elasticsearch.xpack.core.ilm.OperationMode;
import org.elasticsearch.xpack.core.slm.SnapshotLifecycleMetadata;

/**
* This task updates the operation mode state for ILM.
*
* As stopping ILM proved to be an action we want to sometimes take in order to allow clusters to stabilise when under heavy load this
* task might run at {@link Priority#IMMEDIATE} priority so please make sure to keep this task as lightweight as possible.
*/
public class OperationModeUpdateTask extends ClusterStateUpdateTask {
private static final Logger logger = LogManager.getLogger(OperationModeUpdateTask.class);
@Nullable
private final OperationMode ilmMode;
@Nullable
private final OperationMode slmMode;

private OperationModeUpdateTask(OperationMode ilmMode, OperationMode slmMode) {
private OperationModeUpdateTask(Priority priority, OperationMode ilmMode, OperationMode slmMode) {
super(priority);
this.ilmMode = ilmMode;
this.slmMode = slmMode;
}

public static OperationModeUpdateTask ilmMode(OperationMode mode) {
return new OperationModeUpdateTask(mode, null);
return ilmMode(Priority.NORMAL, mode);
}

public static OperationModeUpdateTask ilmMode(Priority priority, OperationMode mode) {
return new OperationModeUpdateTask(priority, mode, null);
}

public static OperationModeUpdateTask slmMode(OperationMode mode) {
return new OperationModeUpdateTask(null, mode);
return new OperationModeUpdateTask(Priority.NORMAL, null, mode);
}

OperationMode getILMOperationMode() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.tasks.Task;
Expand Down Expand Up @@ -50,7 +51,8 @@ protected AcknowledgedResponse read(StreamInput in) throws IOException {
@Override
protected void masterOperation(Task task, StopILMRequest request, ClusterState state, ActionListener<AcknowledgedResponse> listener) {
clusterService.submitStateUpdateTask("ilm_operation_mode_update",
new AckedClusterStateUpdateTask<AcknowledgedResponse>(request, listener) {
new AckedClusterStateUpdateTask<AcknowledgedResponse>(Priority.IMMEDIATE, request, listener) {

@Override
public ClusterState execute(ClusterState currentState) {
return (OperationModeUpdateTask.ilmMode(OperationMode.STOPPING)).execute(currentState);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.component.Lifecycle.State;
import org.elasticsearch.common.settings.ClusterSettings;
Expand All @@ -40,8 +41,10 @@
import org.elasticsearch.xpack.core.ilm.ShrinkStep;
import org.elasticsearch.xpack.core.ilm.Step;
import org.elasticsearch.xpack.core.scheduler.SchedulerEngine;
import org.hamcrest.Description;
import org.junit.After;
import org.junit.Before;
import org.mockito.ArgumentMatcher;
import org.mockito.Mockito;

import java.time.Clock;
Expand All @@ -62,9 +65,11 @@
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.argThat;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class IndexLifecycleServiceTests extends ESTestCase {
Expand Down Expand Up @@ -242,7 +247,8 @@ private void verifyCanStopWithStep(String stoppableStep) {
doAnswer(invocationOnMock -> {
changedOperationMode.set(true);
return null;
}).when(clusterService).submitStateUpdateTask(eq("ilm_operation_mode_update"), any(OperationModeUpdateTask.class));
}).when(clusterService).submitStateUpdateTask(eq("ilm_operation_mode_update {OperationMode STOPPED}"),
any(OperationModeUpdateTask.class));
indexLifecycleService.applyClusterState(event);
indexLifecycleService.triggerPolicies(currentState, true);
assertTrue(changedOperationMode.get());
Expand Down Expand Up @@ -294,7 +300,8 @@ public void testRequestedStopOnSafeAction() {
assertThat(task.getILMOperationMode(), equalTo(OperationMode.STOPPED));
moveToMaintenance.set(true);
return null;
}).when(clusterService).submitStateUpdateTask(eq("ilm_operation_mode_update"), any(OperationModeUpdateTask.class));
}).when(clusterService).submitStateUpdateTask(eq("ilm_operation_mode_update {OperationMode STOPPED}"),
any(OperationModeUpdateTask.class));

indexLifecycleService.applyClusterState(event);
indexLifecycleService.triggerPolicies(currentState, randomBoolean());
Expand All @@ -310,6 +317,40 @@ public void testExceptionStillProcessesOtherIndicesOnMaster() {
doTestExceptionStillProcessesOtherIndices(true);
}

public void testOperationModeUpdateTaskPriority() {
indexLifecycleService.submitOperationModeUpdate(OperationMode.STOPPING);
verifyOperationModeUpdateTaskPriority(OperationMode.STOPPING, Priority.IMMEDIATE);
indexLifecycleService.submitOperationModeUpdate(OperationMode.STOPPED);
verifyOperationModeUpdateTaskPriority(OperationMode.STOPPED, Priority.IMMEDIATE);
indexLifecycleService.submitOperationModeUpdate(OperationMode.RUNNING);
verifyOperationModeUpdateTaskPriority(OperationMode.RUNNING, Priority.NORMAL);
}

private void verifyOperationModeUpdateTaskPriority(OperationMode mode, Priority expectedPriority) {
verify(clusterService).submitStateUpdateTask(
Mockito.eq("ilm_operation_mode_update {OperationMode " + mode.name() +"}"),
argThat(new ArgumentMatcher<OperationModeUpdateTask>() {

Priority actualPriority = null;

@Override
public boolean matches(Object argument) {
if (argument instanceof OperationModeUpdateTask == false) {
return false;
}
actualPriority = ((OperationModeUpdateTask) argument).priority();
return actualPriority == expectedPriority;
}

@Override
public void describeTo(Description description) {
description.appendText("the cluster state update task priority must be "+ expectedPriority+" but got: ")
.appendText(actualPriority.name());
}
})
);
}

@SuppressWarnings("unchecked")
public void doTestExceptionStillProcessesOtherIndices(boolean useOnMaster) {
String policy1 = randomAlphaOfLengthBetween(1, 20);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.ilm.action;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ilm.StopILMRequest;
import org.elasticsearch.xpack.core.ilm.action.StopILMAction;
import org.hamcrest.Description;
import org.mockito.ArgumentMatcher;

import static java.util.Collections.emptyMap;
import static org.mockito.Matchers.argThat;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

public class TransportStopILMActionTests extends ESTestCase {

private static final ActionListener<AcknowledgedResponse> EMPTY_LISTENER = new ActionListener<>() {
@Override
public void onResponse(AcknowledgedResponse response) {

}

@Override
public void onFailure(Exception e) {

}
};

@SuppressWarnings("unchecked")
public void testStopILMClusterStatePriorityIsImmediate() {
ClusterService clusterService = mock(ClusterService.class);

TransportStopILMAction transportStopILMAction = new TransportStopILMAction(mock(TransportService.class),
clusterService, mock(ThreadPool.class), mock(ActionFilters.class), mock(IndexNameExpressionResolver.class));
Task task = new Task(randomLong(), "transport", StopILMAction.NAME, "description",
new TaskId(randomLong() + ":" + randomLong()), emptyMap());
StopILMRequest request = new StopILMRequest();
transportStopILMAction.masterOperation(task, request, ClusterState.EMPTY_STATE, EMPTY_LISTENER);

verify(clusterService).submitStateUpdateTask(
eq("ilm_operation_mode_update"),
argThat(new ArgumentMatcher<AckedClusterStateUpdateTask<AcknowledgedResponse>>() {

Priority actualPriority = null;

@Override
public boolean matches(Object argument) {
if (argument instanceof AckedClusterStateUpdateTask == false) {
return false;
}
actualPriority = ((AckedClusterStateUpdateTask<AcknowledgedResponse>) argument).priority();
return actualPriority == Priority.IMMEDIATE;
}

@Override
public void describeTo(Description description) {
description.appendText("the cluster state update task priority must be URGENT but got: ")
.appendText(actualPriority.name());
}
})
);
}

}