Skip to content

ILM wait for active shards on rolled index in a separate step #50718

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
f6290e4
ILM wait for active shards on rolled index in a separate step
andreidan Jan 7, 2020
fccfee5
Add license header
andreidan Jan 7, 2020
456ba48
Fix RolloverActionTests to reflect the new step
andreidan Jan 8, 2020
292a8e6
WaitForActiveShardsStep uses the alias index
andreidan Jan 15, 2020
b2673bd
Fix integratino test
andreidan Jan 15, 2020
77364de
Merge branch 'master' into ilm-rollover-wait-for-active-shards
andreidan Jan 15, 2020
0a98bf0
Merge branch 'master' into ilm-rollover-wait-for-active-shards
elasticmachine Jan 15, 2020
4842f39
Don't wait for active shards when rolling over in ILM
andreidan Jan 15, 2020
eb8f95f
Fix TransportPutLifecycleActionTests
andreidan Jan 15, 2020
a4615f8
Merge branch 'master' into ilm-rollover-wait-for-active-shards
elasticmachine Jan 15, 2020
115a131
Comment to clarify why rollover doens't wait for active shards
andreidan Jan 16, 2020
0924222
Guard against the index having been deleted while executing a policy
andreidan Jan 16, 2020
bf09bbc
Return a meaningful shards state message
andreidan Jan 16, 2020
c33a595
Merge branch 'master' into ilm-rollover-wait-for-active-shards
elasticmachine Jan 16, 2020
6225c04
Merge branch 'master' into ilm-rollover-wait-for-active-shards
elasticmachine Jan 16, 2020
a27e1cc
Merge branch 'master' into ilm-rollover-wait-for-active-shards
elasticmachine Jan 16, 2020
a838925
Drop unused getters
andreidan Jan 16, 2020
9f9b842
Find rolled index by finding the max counter in the name.
andreidan Jan 17, 2020
c3f506e
Escape < and > in javadoc
andreidan Jan 17, 2020
7679143
Skip WaitForActiveShardsStep when lifecycle complete is set
andreidan Jan 17, 2020
3a1cd87
Merge branch 'master' into ilm-rollover-wait-for-active-shards
elasticmachine Jan 17, 2020
b838f3a
Use lower case and drop . in error messages
andreidan Jan 17, 2020
4449853
Mark vars as final
andreidan Jan 20, 2020
64c9f06
Add explicit error handling to parseIndexNameCounter
andreidan Jan 20, 2020
9322665
Remove Parser
andreidan Jan 20, 2020
36e07fc
Add Info object to report various step progress messages
andreidan Jan 20, 2020
c0ecf31
Make constructors default visible
andreidan Jan 20, 2020
f4f2c84
Merge branch 'master' into ilm-rollover-wait-for-active-shards
elasticmachine Jan 20, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.common.ParseField;
Expand Down Expand Up @@ -159,6 +160,13 @@ public void dryRun(boolean dryRun) {
this.dryRun = dryRun;
}

/**
* Sets the wait for active shards configuration for the rolled index that gets created.
*/
public void setWaitForActiveShards(ActiveShardCount waitForActiveShards) {
createIndexRequest.waitForActiveShards(waitForActiveShards);
}

/**
* Adds condition to check if the index is at least <code>age</code> old
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,17 +139,19 @@ public List<Step> toSteps(Client client, String phase, Step.StepKey nextStepKey)

StepKey waitForRolloverReadyStepKey = new StepKey(phase, NAME, WaitForRolloverReadyStep.NAME);
StepKey rolloverStepKey = new StepKey(phase, NAME, RolloverStep.NAME);
StepKey waitForActiveShardsKey = new StepKey(phase, NAME, WaitForActiveShardsStep.NAME);
StepKey updateDateStepKey = new StepKey(phase, NAME, UpdateRolloverLifecycleDateStep.NAME);
StepKey setIndexingCompleteStepKey = new StepKey(phase, NAME, INDEXING_COMPLETE_STEP_NAME);

WaitForRolloverReadyStep waitForRolloverReadyStep = new WaitForRolloverReadyStep(waitForRolloverReadyStepKey, rolloverStepKey,
client, maxSize, maxAge, maxDocs);
RolloverStep rolloverStep = new RolloverStep(rolloverStepKey, updateDateStepKey, client);
RolloverStep rolloverStep = new RolloverStep(rolloverStepKey, waitForActiveShardsKey, client);
WaitForActiveShardsStep waitForActiveShardsStep = new WaitForActiveShardsStep(waitForActiveShardsKey, updateDateStepKey);
UpdateRolloverLifecycleDateStep updateDateStep = new UpdateRolloverLifecycleDateStep(updateDateStepKey, setIndexingCompleteStepKey,
System::currentTimeMillis);
UpdateSettingsStep setIndexingCompleteStep = new UpdateSettingsStep(setIndexingCompleteStepKey, nextStepKey,
client, indexingComplete);
return Arrays.asList(waitForRolloverReadyStep, rolloverStep, updateDateStep, setIndexingCompleteStep);
return Arrays.asList(waitForRolloverReadyStep, rolloverStep, waitForActiveShardsStep, updateDateStep, setIndexingCompleteStep);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
Expand Down Expand Up @@ -70,6 +71,7 @@ public void performAction(IndexMetaData indexMetaData, ClusterState currentClust

// Calling rollover with no conditions will always roll over the index
RolloverRequest rolloverRequest = new RolloverRequest(rolloverAlias, null);
rolloverRequest.setWaitForActiveShards(ActiveShardCount.NONE);
getClient().admin().indices().rolloverIndex(rolloverRequest,
ActionListener.wrap(response -> {
assert response.isRolledOver() : "the only way this rollover call should fail is with an exception";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.ilm;

import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.AliasOrIndex;
import org.elasticsearch.cluster.metadata.AliasOrIndex.Alias;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.Strings;
import org.elasticsearch.index.Index;

import java.util.List;

/**
* After we performed the index rollover we wait for the the configured number of shards for the rolled over index (ie. newly created
* index) to become available.
*/
public class WaitForActiveShardsStep extends ClusterStateWaitStep {

public static final String NAME = "wait-for-active-shards";

WaitForActiveShardsStep(StepKey key, StepKey nextStepKey) {
super(key, nextStepKey);
}

@Override
public boolean isRetryable() {
return true;
}

@Override
public Result isConditionMet(Index index, ClusterState clusterState) {
IndexMetaData originalIndexMeta = clusterState.metaData().index(index);
String rolloverAlias = RolloverAction.LIFECYCLE_ROLLOVER_ALIAS_SETTING.get(originalIndexMeta.getSettings());
if (Strings.isNullOrEmpty(rolloverAlias)) {
throw new IllegalStateException("setting [" + RolloverAction.LIFECYCLE_ROLLOVER_ALIAS
+ "] is not set on index [" + originalIndexMeta.getIndex().getName() + "]");
}

AliasOrIndex aliasOrIndex = clusterState.metaData().getAliasAndIndexLookup().get(rolloverAlias);
assert aliasOrIndex.isAlias() : rolloverAlias + " must be an alias but it is an index";

Alias alias = (Alias) aliasOrIndex;
IndexMetaData aliasWriteIndex = alias.getWriteIndex();
String rolledIndexName;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
String rolledIndexName;
final String rolledIndexName;

String waitForActiveShardsSettingValue;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
String waitForActiveShardsSettingValue;
final String waitForActiveShardsSettingValue;

if (aliasWriteIndex != null) {
rolledIndexName = aliasWriteIndex.getIndex().getName();
waitForActiveShardsSettingValue = aliasWriteIndex.getSettings().get("index.write.wait_for_active_shards");
} else {
// if the rollover was not performed on a write index alias, the alias will be moved to the new index and it will be the only
// index this alias points to
List<IndexMetaData> indices = alias.getIndices();
assert indices.size() == 1 : "when performing rollover on alias with is_write_index = false the alias must point to only " +
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this assertion doesn't stand in a CCR environment

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

coming up with the fix by parsing the number from the index name and finding the rolled index by "max number"

"one index";
IndexMetaData indexMetaData = indices.get(0);
rolledIndexName = indexMetaData.getIndex().getName();
waitForActiveShardsSettingValue = indexMetaData.getSettings().get("index.write.wait_for_active_shards");
}

ActiveShardCount activeShardCount = ActiveShardCount.parseString(waitForActiveShardsSettingValue);
return new Result(activeShardCount.enoughShardsActive(clusterState, rolledIndexName), null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,28 +77,32 @@ public void testToSteps() {
RolloverAction action = createTestInstance();
String phase = randomAlphaOfLengthBetween(1, 10);
StepKey nextStepKey = new StepKey(randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10),
randomAlphaOfLengthBetween(1, 10));
randomAlphaOfLengthBetween(1, 10));
List<Step> steps = action.toSteps(null, phase, nextStepKey);
assertNotNull(steps);
assertEquals(4, steps.size());
assertEquals(5, steps.size());
StepKey expectedFirstStepKey = new StepKey(phase, RolloverAction.NAME, WaitForRolloverReadyStep.NAME);
StepKey expectedSecondStepKey = new StepKey(phase, RolloverAction.NAME, RolloverStep.NAME);
StepKey expectedThirdStepKey = new StepKey(phase, RolloverAction.NAME, UpdateRolloverLifecycleDateStep.NAME);
StepKey expectedFourthStepKey = new StepKey(phase, RolloverAction.NAME, RolloverAction.INDEXING_COMPLETE_STEP_NAME);
StepKey expectedThirdStepKey = new StepKey(phase, RolloverAction.NAME, WaitForActiveShardsStep.NAME);
StepKey expectedFourthStepKey = new StepKey(phase, RolloverAction.NAME, UpdateRolloverLifecycleDateStep.NAME);
StepKey expectedFifthStepKey = new StepKey(phase, RolloverAction.NAME, RolloverAction.INDEXING_COMPLETE_STEP_NAME);
WaitForRolloverReadyStep firstStep = (WaitForRolloverReadyStep) steps.get(0);
RolloverStep secondStep = (RolloverStep) steps.get(1);
UpdateRolloverLifecycleDateStep thirdStep = (UpdateRolloverLifecycleDateStep) steps.get(2);
UpdateSettingsStep fourthStep = (UpdateSettingsStep) steps.get(3);
WaitForActiveShardsStep thirdStep = (WaitForActiveShardsStep) steps.get(2);
UpdateRolloverLifecycleDateStep fourthStep = (UpdateRolloverLifecycleDateStep) steps.get(3);
UpdateSettingsStep fifthStep = (UpdateSettingsStep) steps.get(4);
assertEquals(expectedFirstStepKey, firstStep.getKey());
assertEquals(expectedSecondStepKey, secondStep.getKey());
assertEquals(expectedThirdStepKey, thirdStep.getKey());
assertEquals(expectedFourthStepKey, fourthStep.getKey());
assertEquals(expectedFifthStepKey, fifthStep.getKey());
assertEquals(secondStep.getKey(), firstStep.getNextStepKey());
assertEquals(thirdStep.getKey(), secondStep.getNextStepKey());
assertEquals(fourthStep.getKey(), thirdStep.getNextStepKey());
assertEquals(fifthStep.getKey(), fourthStep.getNextStepKey());
assertEquals(action.getMaxSize(), firstStep.getMaxSize());
assertEquals(action.getMaxAge(), firstStep.getMaxAge());
assertEquals(action.getMaxDocs(), firstStep.getMaxDocs());
assertEquals(nextStepKey, fourthStep.getNextStepKey());
assertEquals(nextStepKey, fifthStep.getNextStepKey());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.ilm;

import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.AliasMetaData;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.xpack.core.ilm.Step.StepKey;

import static org.hamcrest.Matchers.is;

public class WaitForActiveShardsTests extends AbstractStepTestCase<WaitForActiveShardsStep> {

@Override
public WaitForActiveShardsStep createRandomInstance() {
StepKey stepKey = randomStepKey();
StepKey nextStepKey = randomStepKey();

return new WaitForActiveShardsStep(stepKey, nextStepKey);
}

@Override
public WaitForActiveShardsStep mutateInstance(WaitForActiveShardsStep instance) {
StepKey key = instance.getKey();
StepKey nextKey = instance.getNextStepKey();

switch (between(0, 1)) {
case 0:
key = new StepKey(key.getPhase(), key.getAction(), key.getName() + randomAlphaOfLength(5));
break;
case 1:
nextKey = new StepKey(key.getPhase(), key.getAction(), key.getName() + randomAlphaOfLength(5));
break;
default:
throw new AssertionError("Illegal randomisation branch");
}

return new WaitForActiveShardsStep(key, nextKey);
}

@Override
public WaitForActiveShardsStep copyInstance(WaitForActiveShardsStep instance) {
return new WaitForActiveShardsStep(instance.getKey(), instance.getNextStepKey());
}

public void testIsConditionMetThrowsExceptionWhenRolloverAliasIsNotSet() {
String alias = randomAlphaOfLength(5);
IndexMetaData indexMetaData = IndexMetaData.builder(randomAlphaOfLength(10))
.putAlias(AliasMetaData.builder(alias))
.settings(settings(Version.CURRENT))
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
.metaData(MetaData.builder().put(indexMetaData, true).build())
.build();

try {
createRandomInstance().isConditionMet(indexMetaData.getIndex(), clusterState);
fail("expected the invocation to fail");
} catch (IllegalStateException e) {
assertThat(e.getMessage(), is("setting [" + RolloverAction.LIFECYCLE_ROLLOVER_ALIAS
+ "] is not set on index [" + indexMetaData.getIndex().getName() + "]"));
}
}

public void testResultEvaluatedOnWriteIndexAliasWhenExists() {
String alias = randomAlphaOfLength(5);
IndexMetaData originalIndex = IndexMetaData.builder("index-000000")
.putAlias(AliasMetaData.builder(alias).writeIndex(false))
.settings(settings(Version.CURRENT).put(RolloverAction.LIFECYCLE_ROLLOVER_ALIAS, alias))
.numberOfShards(1)
.numberOfReplicas(randomIntBetween(0, 5))
.build();
IndexMetaData rolledIndex = IndexMetaData.builder("index-000001")
.putAlias(AliasMetaData.builder(alias).writeIndex(true))
.settings(settings(Version.CURRENT)
.put(RolloverAction.LIFECYCLE_ROLLOVER_ALIAS, alias)
.put("index.write.wait_for_active_shards", "all")
)
.numberOfShards(1)
.numberOfReplicas(1)
.build();
IndexRoutingTable.Builder routingTable = new IndexRoutingTable.Builder(rolledIndex.getIndex());
routingTable.addShard(TestShardRouting.newShardRouting(rolledIndex.getIndex().getName(), 0, "node", null, true,
ShardRoutingState.STARTED));
routingTable.addShard(TestShardRouting.newShardRouting(rolledIndex.getIndex().getName(), 0, "node2", null, false,
ShardRoutingState.STARTED));
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
.metaData(MetaData.builder().put(originalIndex, true)
.put(rolledIndex, true)
.build())
.routingTable(RoutingTable.builder().add(routingTable.build()).build())
.build();

assertThat("the rolled index has both the primary and the replica shards started so the condition should be met",
createRandomInstance().isConditionMet(originalIndex.getIndex(), clusterState).isComplete(), is(true));
}

public void testResultEvaluatedOnOnlyIndexTheAliasPointsToIfWriteIndexIsNull() {
String alias = randomAlphaOfLength(5);
IndexMetaData originalIndex = IndexMetaData.builder("index-000000")
.settings(settings(Version.CURRENT).put(RolloverAction.LIFECYCLE_ROLLOVER_ALIAS, alias))
.numberOfShards(1)
.numberOfReplicas(randomIntBetween(0, 5))
.build();
IndexMetaData rolledIndex = IndexMetaData.builder("index-000001")
.putAlias(AliasMetaData.builder(alias).writeIndex(false))
.settings(settings(Version.CURRENT)
.put(RolloverAction.LIFECYCLE_ROLLOVER_ALIAS, alias)
.put("index.write.wait_for_active_shards", "all")
)
.numberOfShards(1)
.numberOfReplicas(1)
.build();
IndexRoutingTable.Builder routingTable = new IndexRoutingTable.Builder(rolledIndex.getIndex());
routingTable.addShard(TestShardRouting.newShardRouting(rolledIndex.getIndex().getName(), 0, "node", null, true,
ShardRoutingState.STARTED));
routingTable.addShard(TestShardRouting.newShardRouting(rolledIndex.getIndex().getName(), 0, "node2", null, false,
ShardRoutingState.STARTED));
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
.metaData(MetaData.builder().put(originalIndex, true)
.put(rolledIndex, true)
.build())
.routingTable(RoutingTable.builder().add(routingTable.build()).build())
.build();

assertThat("the index the alias is pointing to has both the primary and the replica shards started so the condition should be" +
" met", createRandomInstance().isConditionMet(originalIndex.getIndex(), clusterState).isComplete(), is(true));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.elasticsearch.xpack.core.ilm.Step.StepKey;
import org.elasticsearch.xpack.core.ilm.TerminalPolicyStep;
import org.elasticsearch.xpack.core.ilm.UpdateRolloverLifecycleDateStep;
import org.elasticsearch.xpack.core.ilm.WaitForActiveShardsStep;
import org.elasticsearch.xpack.core.ilm.WaitForRolloverReadyStep;
import org.elasticsearch.xpack.core.ilm.WaitForSnapshotAction;
import org.hamcrest.Matchers;
Expand Down Expand Up @@ -1200,6 +1201,41 @@ public void testUpdateRolloverLifecycleDateStepRetriesWhenRolloverInfoIsMissing(
assertBusy(() -> assertThat(getStepKeyForIndex(index), equalTo(TerminalPolicyStep.KEY)));
}

public void testWaitForActiveShardsStep() throws Exception {
String originalIndex = index + "-000001";
String secondIndex = index + "-000002";
createIndexWithSettings(originalIndex, Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
.put(RolloverAction.LIFECYCLE_ROLLOVER_ALIAS, "alias"),
true);

// create policy
createNewSingletonPolicy("hot", new RolloverAction(null, null, 1L));
// update policy on index
updatePolicy(originalIndex, policy);
Request createIndexTemplate = new Request("PUT", "_template/rolling_indexes");
createIndexTemplate.setJsonEntity("{" +
"\"index_patterns\": [\""+ index + "-*\"], \n" +
" \"settings\": {\n" +
" \"number_of_shards\": 1,\n" +
" \"number_of_replicas\": 142,\n" +
" \"index.write.wait_for_active_shards\": \"all\"\n" +
" }\n" +
"}");
client().performRequest(createIndexTemplate);

// index document to trigger rollover
index(client(), originalIndex, "_id", "foo", "bar");
assertBusy(() -> assertTrue(indexExists(secondIndex)));

assertBusy(() -> assertThat(getStepKeyForIndex(originalIndex).getName(), equalTo(WaitForActiveShardsStep.NAME)));

// reset the number of replicas to 0 so that the second index wait for active shard condition can be met
updateIndexSettings(secondIndex, Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0));

assertBusy(() -> assertThat(getStepKeyForIndex(originalIndex), equalTo(TerminalPolicyStep.KEY)));
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/50353")
public void testHistoryIsWrittenWithSuccess() throws Exception {
String index = "success-index";
Expand Down
Loading