Skip to content

Commit 457a92f

Browse files
authored
ILM wait for active shards on rolled index in a separate step (#50718)
After we rollover the index we wait for the configured number of shards for the rolled index to become active (based on the index.write.wait_for_active_shards setting which might be present in a template, or otherwise in the default case, for the primaries to become active). This wait might be long due to disk watermarks being tripped, replicas not being able to spring to life due to cluster nodes reconfiguration and others and, the RolloverStep might not complete successfully due to this inherent transient situation, albeit the rolled index having been created.
1 parent 6da66c6 commit 457a92f

File tree

8 files changed

+538
-9
lines changed

8 files changed

+538
-9
lines changed

server/src/main/java/org/elasticsearch/action/admin/indices/rollover/RolloverRequest.java

+8
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.elasticsearch.action.ActionRequestValidationException;
2222
import org.elasticsearch.action.IndicesRequest;
2323
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
24+
import org.elasticsearch.action.support.ActiveShardCount;
2425
import org.elasticsearch.action.support.IndicesOptions;
2526
import org.elasticsearch.action.support.master.AcknowledgedRequest;
2627
import org.elasticsearch.common.ParseField;
@@ -159,6 +160,13 @@ public void dryRun(boolean dryRun) {
159160
this.dryRun = dryRun;
160161
}
161162

163+
/**
164+
* Sets the wait for active shards configuration for the rolled index that gets created.
165+
*/
166+
public void setWaitForActiveShards(ActiveShardCount waitForActiveShards) {
167+
createIndexRequest.waitForActiveShards(waitForActiveShards);
168+
}
169+
162170
/**
163171
* Adds condition to check if the index is at least <code>age</code> old
164172
*/

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/RolloverAction.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -139,17 +139,19 @@ public List<Step> toSteps(Client client, String phase, Step.StepKey nextStepKey)
139139

140140
StepKey waitForRolloverReadyStepKey = new StepKey(phase, NAME, WaitForRolloverReadyStep.NAME);
141141
StepKey rolloverStepKey = new StepKey(phase, NAME, RolloverStep.NAME);
142+
StepKey waitForActiveShardsKey = new StepKey(phase, NAME, WaitForActiveShardsStep.NAME);
142143
StepKey updateDateStepKey = new StepKey(phase, NAME, UpdateRolloverLifecycleDateStep.NAME);
143144
StepKey setIndexingCompleteStepKey = new StepKey(phase, NAME, INDEXING_COMPLETE_STEP_NAME);
144145

145146
WaitForRolloverReadyStep waitForRolloverReadyStep = new WaitForRolloverReadyStep(waitForRolloverReadyStepKey, rolloverStepKey,
146147
client, maxSize, maxAge, maxDocs);
147-
RolloverStep rolloverStep = new RolloverStep(rolloverStepKey, updateDateStepKey, client);
148+
RolloverStep rolloverStep = new RolloverStep(rolloverStepKey, waitForActiveShardsKey, client);
149+
WaitForActiveShardsStep waitForActiveShardsStep = new WaitForActiveShardsStep(waitForActiveShardsKey, updateDateStepKey);
148150
UpdateRolloverLifecycleDateStep updateDateStep = new UpdateRolloverLifecycleDateStep(updateDateStepKey, setIndexingCompleteStepKey,
149151
System::currentTimeMillis);
150152
UpdateSettingsStep setIndexingCompleteStep = new UpdateSettingsStep(setIndexingCompleteStepKey, nextStepKey,
151153
client, indexingComplete);
152-
return Arrays.asList(waitForRolloverReadyStep, rolloverStep, updateDateStep, setIndexingCompleteStep);
154+
return Arrays.asList(waitForRolloverReadyStep, rolloverStep, waitForActiveShardsStep, updateDateStep, setIndexingCompleteStep);
153155
}
154156

155157
@Override

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/RolloverStep.java

+4
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import org.apache.logging.log4j.Logger;
1010
import org.elasticsearch.action.ActionListener;
1111
import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
12+
import org.elasticsearch.action.support.ActiveShardCount;
1213
import org.elasticsearch.client.Client;
1314
import org.elasticsearch.cluster.ClusterState;
1415
import org.elasticsearch.cluster.ClusterStateObserver;
@@ -70,6 +71,9 @@ public void performAction(IndexMetaData indexMetaData, ClusterState currentClust
7071

7172
// Calling rollover with no conditions will always roll over the index
7273
RolloverRequest rolloverRequest = new RolloverRequest(rolloverAlias, null);
74+
// We don't wait for active shards when we perform the rollover because the
75+
// {@link org.elasticsearch.xpack.core.ilm.WaitForActiveShardsStep} step will do so
76+
rolloverRequest.setWaitForActiveShards(ActiveShardCount.NONE);
7377
getClient().admin().indices().rolloverIndex(rolloverRequest,
7478
ActionListener.wrap(response -> {
7579
assert response.isRolledOver() : "the only way this rollover call should fail is with an exception";
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,232 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
package org.elasticsearch.xpack.core.ilm;
7+
8+
import com.carrotsearch.hppc.cursors.IntObjectCursor;
9+
import org.apache.logging.log4j.LogManager;
10+
import org.apache.logging.log4j.Logger;
11+
import org.elasticsearch.action.support.ActiveShardCount;
12+
import org.elasticsearch.cluster.ClusterState;
13+
import org.elasticsearch.cluster.metadata.AliasOrIndex;
14+
import org.elasticsearch.cluster.metadata.AliasOrIndex.Alias;
15+
import org.elasticsearch.cluster.metadata.IndexMetaData;
16+
import org.elasticsearch.cluster.routing.IndexRoutingTable;
17+
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
18+
import org.elasticsearch.common.ParseField;
19+
import org.elasticsearch.common.Strings;
20+
import org.elasticsearch.common.xcontent.ToXContentObject;
21+
import org.elasticsearch.common.xcontent.XContentBuilder;
22+
import org.elasticsearch.index.Index;
23+
24+
import java.io.IOException;
25+
import java.util.List;
26+
import java.util.Locale;
27+
import java.util.Objects;
28+
29+
/**
30+
* After we performed the index rollover we wait for the the configured number of shards for the rolled over index (ie. newly created
31+
* index) to become available.
32+
*/
33+
public class WaitForActiveShardsStep extends ClusterStateWaitStep {
34+
35+
public static final String NAME = "wait-for-active-shards";
36+
37+
private static final Logger logger = LogManager.getLogger(WaitForActiveShardsStep.class);
38+
39+
WaitForActiveShardsStep(StepKey key, StepKey nextStepKey) {
40+
super(key, nextStepKey);
41+
}
42+
43+
@Override
44+
public boolean isRetryable() {
45+
return true;
46+
}
47+
48+
@Override
49+
public Result isConditionMet(Index index, ClusterState clusterState) {
50+
IndexMetaData originalIndexMeta = clusterState.metaData().index(index);
51+
52+
if (originalIndexMeta == null) {
53+
String errorMessage = String.format(Locale.ROOT, "[%s] lifecycle action for index [%s] executed but index no longer exists",
54+
getKey().getAction(), index.getName());
55+
// Index must have been since deleted
56+
logger.debug(errorMessage);
57+
return new Result(false, new Info(errorMessage));
58+
}
59+
60+
boolean indexingComplete = LifecycleSettings.LIFECYCLE_INDEXING_COMPLETE_SETTING.get(originalIndexMeta.getSettings());
61+
if (indexingComplete) {
62+
String message = String.format(Locale.ROOT, "index [%s] has lifecycle complete set, skipping [%s]",
63+
originalIndexMeta.getIndex().getName(), WaitForActiveShardsStep.NAME);
64+
logger.trace(message);
65+
return new Result(true, new Info(message));
66+
}
67+
68+
String rolloverAlias = RolloverAction.LIFECYCLE_ROLLOVER_ALIAS_SETTING.get(originalIndexMeta.getSettings());
69+
if (Strings.isNullOrEmpty(rolloverAlias)) {
70+
throw new IllegalStateException("setting [" + RolloverAction.LIFECYCLE_ROLLOVER_ALIAS
71+
+ "] is not set on index [" + originalIndexMeta.getIndex().getName() + "]");
72+
}
73+
74+
AliasOrIndex aliasOrIndex = clusterState.metaData().getAliasAndIndexLookup().get(rolloverAlias);
75+
assert aliasOrIndex.isAlias() : rolloverAlias + " must be an alias but it is an index";
76+
77+
Alias alias = (Alias) aliasOrIndex;
78+
IndexMetaData aliasWriteIndex = alias.getWriteIndex();
79+
final String rolledIndexName;
80+
final String waitForActiveShardsSettingValue;
81+
if (aliasWriteIndex != null) {
82+
rolledIndexName = aliasWriteIndex.getIndex().getName();
83+
waitForActiveShardsSettingValue = aliasWriteIndex.getSettings().get("index.write.wait_for_active_shards");
84+
} else {
85+
List<IndexMetaData> indices = alias.getIndices();
86+
int maxIndexCounter = -1;
87+
IndexMetaData rolledIndexMeta = null;
88+
for (IndexMetaData indexMetaData : indices) {
89+
int indexNameCounter = parseIndexNameCounter(indexMetaData.getIndex().getName());
90+
if (maxIndexCounter < indexNameCounter) {
91+
maxIndexCounter = indexNameCounter;
92+
rolledIndexMeta = indexMetaData;
93+
}
94+
}
95+
if (rolledIndexMeta == null) {
96+
String errorMessage = String.format(Locale.ROOT,
97+
"unable to find the index that was rolled over from [%s] as part of lifecycle action [%s]", index.getName(),
98+
getKey().getAction());
99+
100+
// Index must have been since deleted
101+
logger.debug(errorMessage);
102+
return new Result(false, new Info(errorMessage));
103+
}
104+
rolledIndexName = rolledIndexMeta.getIndex().getName();
105+
waitForActiveShardsSettingValue = rolledIndexMeta.getSettings().get("index.write.wait_for_active_shards");
106+
}
107+
108+
ActiveShardCount activeShardCount = ActiveShardCount.parseString(waitForActiveShardsSettingValue);
109+
boolean enoughShardsActive = activeShardCount.enoughShardsActive(clusterState, rolledIndexName);
110+
111+
IndexRoutingTable indexRoutingTable = clusterState.routingTable().index(rolledIndexName);
112+
int currentActiveShards = 0;
113+
for (final IntObjectCursor<IndexShardRoutingTable> shardRouting : indexRoutingTable.getShards()) {
114+
currentActiveShards += shardRouting.value.activeShards().size();
115+
}
116+
return new Result(enoughShardsActive, new ActiveShardsInfo(currentActiveShards, activeShardCount.toString(), enoughShardsActive));
117+
}
118+
119+
/**
120+
* Parses the number from the rolled over index name. It also supports the date-math format (ie. index name is wrapped in &lt; and &gt;)
121+
* <p>
122+
* Eg.
123+
* <p>
124+
* - For "logs-000002" it'll return 2
125+
* - For "&lt;logs-{now/d}-3&gt;" it'll return 3
126+
*/
127+
static int parseIndexNameCounter(String indexName) {
128+
int numberIndex = indexName.lastIndexOf("-");
129+
if (numberIndex == -1) {
130+
throw new IllegalArgumentException("no - separator found in index name [" + indexName + "]");
131+
}
132+
try {
133+
return Integer.parseInt(indexName.substring(numberIndex + 1, indexName.endsWith(">") ? indexName.length() - 1 :
134+
indexName.length()));
135+
} catch (NumberFormatException e) {
136+
throw new IllegalArgumentException("unable to parse the index name [" + indexName + "] to extract the counter", e);
137+
}
138+
}
139+
140+
static final class ActiveShardsInfo implements ToXContentObject {
141+
142+
private final long currentActiveShardsCount;
143+
private final String targetActiveShardsCount;
144+
private final boolean enoughShardsActive;
145+
private final String message;
146+
147+
static final ParseField CURRENT_ACTIVE_SHARDS_COUNT = new ParseField("current_active_shards_count");
148+
static final ParseField TARGET_ACTIVE_SHARDS_COUNT = new ParseField("target_active_shards_count");
149+
static final ParseField ENOUGH_SHARDS_ACTIVE = new ParseField("enough_shards_active");
150+
static final ParseField MESSAGE = new ParseField("message");
151+
152+
ActiveShardsInfo(long currentActiveShardsCount, String targetActiveShardsCount, boolean enoughShardsActive) {
153+
this.currentActiveShardsCount = currentActiveShardsCount;
154+
this.targetActiveShardsCount = targetActiveShardsCount;
155+
this.enoughShardsActive = enoughShardsActive;
156+
157+
if (enoughShardsActive) {
158+
message = "the target of [" + targetActiveShardsCount + "] are active. Don't need to wait anymore";
159+
} else {
160+
message = "waiting for [" + targetActiveShardsCount + "] shards to become active, but only [" + currentActiveShardsCount +
161+
"] are active";
162+
}
163+
}
164+
165+
@Override
166+
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
167+
builder.startObject();
168+
builder.field(MESSAGE.getPreferredName(), message);
169+
builder.field(CURRENT_ACTIVE_SHARDS_COUNT.getPreferredName(), currentActiveShardsCount);
170+
builder.field(TARGET_ACTIVE_SHARDS_COUNT.getPreferredName(), targetActiveShardsCount);
171+
builder.field(ENOUGH_SHARDS_ACTIVE.getPreferredName(), enoughShardsActive);
172+
builder.endObject();
173+
return builder;
174+
}
175+
176+
@Override
177+
public boolean equals(Object o) {
178+
if (this == o) {
179+
return true;
180+
}
181+
if (o == null || getClass() != o.getClass()) {
182+
return false;
183+
}
184+
ActiveShardsInfo info = (ActiveShardsInfo) o;
185+
return currentActiveShardsCount == info.currentActiveShardsCount &&
186+
enoughShardsActive == info.enoughShardsActive &&
187+
Objects.equals(targetActiveShardsCount, info.targetActiveShardsCount) &&
188+
Objects.equals(message, info.message);
189+
}
190+
191+
@Override
192+
public int hashCode() {
193+
return Objects.hash(currentActiveShardsCount, targetActiveShardsCount, enoughShardsActive, message);
194+
}
195+
}
196+
197+
static final class Info implements ToXContentObject {
198+
199+
private final String message;
200+
201+
static final ParseField MESSAGE = new ParseField("message");
202+
203+
Info(String message) {
204+
this.message = message;
205+
}
206+
207+
@Override
208+
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
209+
builder.startObject();
210+
builder.field(MESSAGE.getPreferredName(), message);
211+
builder.endObject();
212+
return builder;
213+
}
214+
215+
@Override
216+
public boolean equals(Object o) {
217+
if (this == o) {
218+
return true;
219+
}
220+
if (o == null || getClass() != o.getClass()) {
221+
return false;
222+
}
223+
Info info = (Info) o;
224+
return Objects.equals(message, info.message);
225+
}
226+
227+
@Override
228+
public int hashCode() {
229+
return Objects.hash(message);
230+
}
231+
}
232+
}

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/RolloverActionTests.java

+11-7
Original file line numberDiff line numberDiff line change
@@ -77,28 +77,32 @@ public void testToSteps() {
7777
RolloverAction action = createTestInstance();
7878
String phase = randomAlphaOfLengthBetween(1, 10);
7979
StepKey nextStepKey = new StepKey(randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10),
80-
randomAlphaOfLengthBetween(1, 10));
80+
randomAlphaOfLengthBetween(1, 10));
8181
List<Step> steps = action.toSteps(null, phase, nextStepKey);
8282
assertNotNull(steps);
83-
assertEquals(4, steps.size());
83+
assertEquals(5, steps.size());
8484
StepKey expectedFirstStepKey = new StepKey(phase, RolloverAction.NAME, WaitForRolloverReadyStep.NAME);
8585
StepKey expectedSecondStepKey = new StepKey(phase, RolloverAction.NAME, RolloverStep.NAME);
86-
StepKey expectedThirdStepKey = new StepKey(phase, RolloverAction.NAME, UpdateRolloverLifecycleDateStep.NAME);
87-
StepKey expectedFourthStepKey = new StepKey(phase, RolloverAction.NAME, RolloverAction.INDEXING_COMPLETE_STEP_NAME);
86+
StepKey expectedThirdStepKey = new StepKey(phase, RolloverAction.NAME, WaitForActiveShardsStep.NAME);
87+
StepKey expectedFourthStepKey = new StepKey(phase, RolloverAction.NAME, UpdateRolloverLifecycleDateStep.NAME);
88+
StepKey expectedFifthStepKey = new StepKey(phase, RolloverAction.NAME, RolloverAction.INDEXING_COMPLETE_STEP_NAME);
8889
WaitForRolloverReadyStep firstStep = (WaitForRolloverReadyStep) steps.get(0);
8990
RolloverStep secondStep = (RolloverStep) steps.get(1);
90-
UpdateRolloverLifecycleDateStep thirdStep = (UpdateRolloverLifecycleDateStep) steps.get(2);
91-
UpdateSettingsStep fourthStep = (UpdateSettingsStep) steps.get(3);
91+
WaitForActiveShardsStep thirdStep = (WaitForActiveShardsStep) steps.get(2);
92+
UpdateRolloverLifecycleDateStep fourthStep = (UpdateRolloverLifecycleDateStep) steps.get(3);
93+
UpdateSettingsStep fifthStep = (UpdateSettingsStep) steps.get(4);
9294
assertEquals(expectedFirstStepKey, firstStep.getKey());
9395
assertEquals(expectedSecondStepKey, secondStep.getKey());
9496
assertEquals(expectedThirdStepKey, thirdStep.getKey());
9597
assertEquals(expectedFourthStepKey, fourthStep.getKey());
98+
assertEquals(expectedFifthStepKey, fifthStep.getKey());
9699
assertEquals(secondStep.getKey(), firstStep.getNextStepKey());
97100
assertEquals(thirdStep.getKey(), secondStep.getNextStepKey());
98101
assertEquals(fourthStep.getKey(), thirdStep.getNextStepKey());
102+
assertEquals(fifthStep.getKey(), fourthStep.getNextStepKey());
99103
assertEquals(action.getMaxSize(), firstStep.getMaxSize());
100104
assertEquals(action.getMaxAge(), firstStep.getMaxAge());
101105
assertEquals(action.getMaxDocs(), firstStep.getMaxDocs());
102-
assertEquals(nextStepKey, fourthStep.getNextStepKey());
106+
assertEquals(nextStepKey, fifthStep.getNextStepKey());
103107
}
104108
}

0 commit comments

Comments
 (0)