Skip to content

Expose master timeout for ILM actions #51130

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 20 commits into from
Jan 23, 2020
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ToXContentObject;

/**
Expand All @@ -28,11 +29,21 @@ protected Client getClient() {
return client;
}

public abstract void evaluateCondition(IndexMetaData indexMetaData, Listener listener);
public void evaluateCondition(Settings settings, IndexMetaData indexMetaData, Listener listener){
evaluateCondition(indexMetaData, listener);
}

public void evaluateCondition(IndexMetaData indexMetaData, Listener listener){
try {
throw new UnsupportedOperationException();
} catch (UnsupportedOperationException e) {
listener.onFailure(e);
}
}

public interface Listener {

void onResponse(boolean conditionMet, ToXContentObject infomationContext);
void onResponse(boolean conditionMet, ToXContentObject informationContext);

void onFailure(Exception e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ void performDuringNoSnapshot(IndexMetaData indexMetaData, ClusterState currentCl
}

if (indexMetaData.getState() == IndexMetaData.State.OPEN) {
CloseIndexRequest closeIndexRequest = new CloseIndexRequest(followerIndex);
CloseIndexRequest closeIndexRequest = new CloseIndexRequest(followerIndex)
.masterNodeTimeout(getMasterTimeout(currentClusterState));
getClient().admin().indices().close(closeIndexRequest, ActionListener.wrap(
r -> {
assert r.isAcknowledged() : "close index response is not acknowledged";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public DeleteStep(StepKey key, StepKey nextStepKey, Client client) {
@Override
public void performDuringNoSnapshot(IndexMetaData indexMetaData, ClusterState currentState, Listener listener) {
getClient().admin().indices()
.delete(new DeleteIndexRequest(indexMetaData.getIndex().getName()),
.delete(new DeleteIndexRequest(indexMetaData.getIndex().getName()).masterNodeTimeout(getMasterTimeout(currentState)),
ActionListener.wrap(response -> listener.onResponse(true), listener::onFailure));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public FreezeStep(StepKey key, StepKey nextStepKey, Client client) {
@Override
public void performDuringNoSnapshot(IndexMetaData indexMetaData, ClusterState currentState, Listener listener) {
getClient().admin().indices().execute(FreezeIndexAction.INSTANCE,
new FreezeRequest(indexMetaData.getIndex().getName()),
new FreezeRequest(indexMetaData.getIndex().getName()).masterNodeTimeout(getMasterTimeout(currentState)),
ActionListener.wrap(response -> listener.onResponse(true), listener::onFailure));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@ public class LifecycleSettings {
public static final String LIFECYCLE_ORIGINATION_DATE = "index.lifecycle.origination_date";
public static final String LIFECYCLE_PARSE_ORIGINATION_DATE = "index.lifecycle.parse_origination_date";
public static final String LIFECYCLE_HISTORY_INDEX_ENABLED = "index.lifecycle.history_index_enabled";
public static final String LIFECYCLE_STEP_MASTER_TIMEOUT = "index.lifecycle.step.master_timeout";

public static final String SLM_HISTORY_INDEX_ENABLED = "slm.history_index_enabled";
public static final String SLM_RETENTION_SCHEDULE = "slm.retention_schedule";
public static final String SLM_RETENTION_DURATION = "slm.retention_duration";


public static final Setting<TimeValue> LIFECYCLE_POLL_INTERVAL_SETTING = Setting.timeSetting(LIFECYCLE_POLL_INTERVAL,
TimeValue.timeValueMinutes(10), TimeValue.timeValueSeconds(1), Setting.Property.Dynamic, Setting.Property.NodeScope);
public static final Setting<String> LIFECYCLE_NAME_SETTING = Setting.simpleString(LIFECYCLE_NAME,
Expand All @@ -38,7 +38,9 @@ public class LifecycleSettings {
false, Setting.Property.Dynamic, Setting.Property.IndexScope);
public static final Setting<Boolean> LIFECYCLE_HISTORY_INDEX_ENABLED_SETTING = Setting.boolSetting(LIFECYCLE_HISTORY_INDEX_ENABLED,
true, Setting.Property.NodeScope);

public static final Setting<TimeValue> LIFECYCLE_STEP_MASTER_TIMEOUT_SETTING =
Setting.positiveTimeSetting(LIFECYCLE_STEP_MASTER_TIMEOUT, TimeValue.timeValueSeconds(30), Setting.Property.Dynamic,
Setting.Property.NodeScope);

public static final Setting<Boolean> SLM_HISTORY_INDEX_ENABLED_SETTING = Setting.boolSetting(SLM_HISTORY_INDEX_ENABLED, true,
Setting.Property.NodeScope);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ final class OpenFollowerIndexStep extends AsyncActionStep {
public void performAction(IndexMetaData indexMetaData, ClusterState currentClusterState,
ClusterStateObserver observer, Listener listener) {
if (indexMetaData.getState() == IndexMetaData.State.CLOSE) {
OpenIndexRequest request = new OpenIndexRequest(indexMetaData.getIndex().getName());
OpenIndexRequest request = new OpenIndexRequest(indexMetaData.getIndex().getName())
.masterNodeTimeout(getMasterTimeout(currentClusterState));
getClient().admin().indices().open(request, ActionListener.wrap(
r -> {
assert r.isAcknowledged() : "open index response is not acknowledged";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ public void performAction(IndexMetaData indexMetaData, ClusterState currentClust
}

// Calling rollover with no conditions will always roll over the index
RolloverRequest rolloverRequest = new RolloverRequest(rolloverAlias, null);
RolloverRequest rolloverRequest = new RolloverRequest(rolloverAlias, null)
.masterNodeTimeout(getMasterTimeout(currentClusterState));
getClient().admin().indices().rolloverIndex(rolloverRequest,
ActionListener.wrap(response -> {
assert response.isRolledOver() : "the only way this rollover call should fail is with an exception";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ public void performAction(IndexMetaData indexMetaData, ClusterState clusterState
Settings settings = Settings.builder()
.put(IndexMetaData.INDEX_ROUTING_REQUIRE_GROUP_SETTING.getKey() + "_id", nodeId.get()).build();
UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(indexMetaData.getIndex().getName())
.masterNodeTimeout(getMasterTimeout(clusterState))
.settings(settings);
getClient().admin().indices().updateSettings(updateSettingsRequest,
ActionListener.wrap(response -> listener.onResponse(true), listener::onFailure));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public void performDuringNoSnapshot(IndexMetaData indexMetaData, ClusterState cu
// get target shrink index
String targetIndexName = shrunkIndexPrefix + index;
IndicesAliasesRequest aliasesRequest = new IndicesAliasesRequest()
.masterNodeTimeout(getMasterTimeout(currentState))
.addAliasAction(IndicesAliasesRequest.AliasActions.removeIndex().index(index))
.addAliasAction(IndicesAliasesRequest.AliasActions.add().index(targetIndexName).alias(index));
// copy over other aliases from original index
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ public void performAction(IndexMetaData indexMetaData, ClusterState currentState
.build();

String shrunkenIndexName = shrunkIndexPrefix + indexMetaData.getIndex().getName();
ResizeRequest resizeRequest = new ResizeRequest(shrunkenIndexName, indexMetaData.getIndex().getName());
ResizeRequest resizeRequest = new ResizeRequest(shrunkenIndexName, indexMetaData.getIndex().getName())
.masterNodeTimeout(getMasterTimeout(currentState));
resizeRequest.getTargetIndexRequest().settings(relevantTargetSettings);

getClient().admin().indices().resizeIndex(resizeRequest, ActionListener.wrap(response -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@
*/
package org.elasticsearch.xpack.core.ilm;

import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
Expand All @@ -22,6 +25,7 @@
* Represents one part of the execution of a {@link LifecycleAction}.
*/
public abstract class Step {

private final StepKey key;
private final StepKey nextStepKey;

Expand Down Expand Up @@ -60,14 +64,21 @@ public boolean equals(Object obj) {
}
Step other = (Step) obj;
return Objects.equals(key, other.key) &&
Objects.equals(nextStepKey, other.nextStepKey);
Objects.equals(nextStepKey, other.nextStepKey);
}

@Override
public String toString() {
return key + " => " + nextStepKey;
}

protected TimeValue getMasterTimeout(ClusterState clusterState){
if(clusterState == null){
return LifecycleSettings.LIFECYCLE_STEP_MASTER_TIMEOUT_SETTING.get(Settings.EMPTY);
}
return LifecycleSettings.LIFECYCLE_STEP_MASTER_TIMEOUT_SETTING.get(clusterState.metaData().settings());
}

public static final class StepKey implements Writeable, ToXContentObject {
private final String phase;
private final String action;
Expand All @@ -78,6 +89,7 @@ public static final class StepKey implements Writeable, ToXContentObject {
public static final ParseField NAME_FIELD = new ParseField("name");
private static final ConstructingObjectParser<StepKey, Void> PARSER =
new ConstructingObjectParser<>("stepkey", a -> new StepKey((String) a[0], (String) a[1], (String) a[2]));

static {
PARSER.declareString(ConstructingObjectParser.constructorArg(), PHASE_FIELD);
PARSER.declareString(ConstructingObjectParser.constructorArg(), ACTION_FIELD);
Expand Down Expand Up @@ -134,8 +146,8 @@ public boolean equals(Object obj) {
}
StepKey other = (StepKey) obj;
return Objects.equals(phase, other.phase) &&
Objects.equals(action, other.action) &&
Objects.equals(name, other.name);
Objects.equals(action, other.action) &&
Objects.equals(name, other.name);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ public UpdateSettingsStep(StepKey key, StepKey nextStepKey, Client client, Setti

@Override
public void performAction(IndexMetaData indexMetaData, ClusterState currentState, ClusterStateObserver observer, Listener listener) {
UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(indexMetaData.getIndex().getName()).settings(settings);
UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(indexMetaData.getIndex().getName())
.masterNodeTimeout(getMasterTimeout(currentState))
.settings(settings);
getClient().admin().indices().updateSettings(updateSettingsRequest,
ActionListener.wrap(response -> listener.onResponse(true), listener::onFailure));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContentObject;
Expand Down Expand Up @@ -49,6 +50,11 @@ public boolean isRetryable() {

@Override
public void evaluateCondition(IndexMetaData indexMetaData, Listener listener) {
evaluateCondition(Settings.EMPTY, indexMetaData, listener);
}

@Override
public void evaluateCondition(Settings settings, IndexMetaData indexMetaData, Listener listener) {
String rolloverAlias = RolloverAction.LIFECYCLE_ROLLOVER_ALIAS_SETTING.get(indexMetaData.getSettings());

if (Strings.isNullOrEmpty(rolloverAlias)) {
Expand Down Expand Up @@ -113,7 +119,8 @@ public void evaluateCondition(IndexMetaData indexMetaData, Listener listener) {
"index [%s] is not the write index for alias [%s]", indexMetaData.getIndex().getName(), rolloverAlias)));
}

RolloverRequest rolloverRequest = new RolloverRequest(rolloverAlias, null);
RolloverRequest rolloverRequest = new RolloverRequest(rolloverAlias, null)
.masterNodeTimeout(LifecycleSettings.LIFECYCLE_STEP_MASTER_TIMEOUT_SETTING.get(settings));
rolloverRequest.dryRun(true);
if (maxAge != null) {
rolloverRequest.addMaxIndexAgeCondition(maxAge);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.ilm;

import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.client.AdminClient;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.IndicesAdminClient;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.junit.Before;
import org.mockito.Mockito;
import org.mockito.stubbing.Stubber;

import static org.elasticsearch.xpack.core.ilm.LifecycleSettings.LIFECYCLE_STEP_MASTER_TIMEOUT;
import static org.hamcrest.Matchers.equalTo;

public abstract class AbstractStepMasterTimeoutTestCase<T extends AsyncActionStep> extends AbstractStepTestCase<T> {

protected Client client;
protected AdminClient adminClient;
protected IndicesAdminClient indicesClient;

@Before
public void setup() {
client = Mockito.mock(Client.class);
adminClient = Mockito.mock(AdminClient.class);
Mockito.when(client.admin()).thenReturn(adminClient);
indicesClient = Mockito.mock(IndicesAdminClient.class);
Mockito.when(adminClient.indices()).thenReturn(indicesClient);
}

public void testMasterTimeout() {
checkMasterTimeout(TimeValue.timeValueSeconds(30),
ClusterState.builder(ClusterName.DEFAULT).metaData(MetaData.builder().build()).build());
checkMasterTimeout(TimeValue.timeValueSeconds(10),
ClusterState.builder(ClusterName.DEFAULT)
.metaData(MetaData.builder()
.persistentSettings(Settings.builder().put(LIFECYCLE_STEP_MASTER_TIMEOUT, "10s").build())
.build())
.build());
}

@SuppressWarnings("rawtypes")
private void checkMasterTimeout(TimeValue timeValue, ClusterState currentClusterState) {
IndexMetaData indexMetadata = getIndexMetaData();

Stubber checkTimeout = Mockito.doAnswer(invocation -> {
for(Object argument: invocation.getArguments()){
if(argument instanceof MasterNodeRequest)
{
@SuppressWarnings("rawtypes") MasterNodeRequest<?> request = (MasterNodeRequest) argument;
assertThat(request.masterNodeTimeout(), equalTo(timeValue));
}
}
return null;
});
mockRequestCall(checkTimeout);
T step = createRandomInstance();
step.performAction(indexMetadata, currentClusterState, null, new AsyncActionStep.Listener() {
@Override
public void onResponse(boolean complete) {

}

@Override
public void onFailure(Exception e) {

}
});
}

protected abstract IndexMetaData getIndexMetaData();

protected abstract void mockRequestCall(Stubber checkTimeout);
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.elasticsearch.client.IndicesAdminClient;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.mockito.Mockito;
import org.mockito.stubbing.Stubber;

import java.util.Collections;

Expand All @@ -23,15 +24,25 @@
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.sameInstance;

public class CloseFollowerIndexStepTests extends AbstractStepTestCase<CloseFollowerIndexStep> {
public class CloseFollowerIndexStepTests extends AbstractStepMasterTimeoutTestCase<CloseFollowerIndexStep> {

public void testCloseFollowingIndex() {
IndexMetaData indexMetadata = IndexMetaData.builder("follower-index")
@Override
protected IndexMetaData getIndexMetaData() {
return IndexMetaData.builder("follower-index")
.settings(settings(Version.CURRENT).put(LifecycleSettings.LIFECYCLE_INDEXING_COMPLETE, "true"))
.putCustom(CCR_METADATA_KEY, Collections.emptyMap())
.numberOfShards(1)
.numberOfReplicas(0)
.build();
}

@Override
protected void mockRequestCall(Stubber checkTimeout) {
checkTimeout.when(indicesClient).close(Mockito.any(), Mockito.any());
}

public void testCloseFollowingIndex() {
IndexMetaData indexMetadata = getIndexMetaData();

Client client = Mockito.mock(Client.class);
AdminClient adminClient = Mockito.mock(AdminClient.class);
Expand Down Expand Up @@ -67,12 +78,7 @@ public void onFailure(Exception e) {
}

public void testCloseFollowingIndexFailed() {
IndexMetaData indexMetadata = IndexMetaData.builder("follower-index")
.settings(settings(Version.CURRENT).put(LifecycleSettings.LIFECYCLE_INDEXING_COMPLETE, "true"))
.putCustom(CCR_METADATA_KEY, Collections.emptyMap())
.numberOfShards(1)
.numberOfReplicas(0)
.build();
IndexMetaData indexMetadata = getIndexMetaData();

// Mock pause follow api call:
Client client = Mockito.mock(Client.class);
Expand Down Expand Up @@ -138,7 +144,7 @@ public void onFailure(Exception e) {
protected CloseFollowerIndexStep createRandomInstance() {
Step.StepKey stepKey = randomStepKey();
Step.StepKey nextStepKey = randomStepKey();
return new CloseFollowerIndexStep(stepKey, nextStepKey, Mockito.mock(Client.class));
return new CloseFollowerIndexStep(stepKey, nextStepKey, client);
}

@Override
Expand Down
Loading