Skip to content

Commit 6b410df

Browse files
authored
ILM: add support for rolling over data streams (#57295)
As the datastream information is stored in the `ClusterState.Metadata` we exposed the `Metadata` to the `AsyncWaitStep#evaluateCondition` method in order for the steps to be able to identify when a managed index is part of a DataStream. If a managed index is part of a DataStream the rollover target is the DataStream name and the highest generation index is the write index (ie. the rolled index).
1 parent 9e6b2a6 commit 6b410df

28 files changed

+820
-449
lines changed

client/rest-high-level/src/test/java/org/elasticsearch/client/IndicesClientIT.java

-6
Original file line numberDiff line numberDiff line change
@@ -1245,12 +1245,6 @@ public void testIndexPutSettings() throws IOException {
12451245
+ "reason=final index setting [index.number_of_shards], not updateable"));
12461246
}
12471247

1248-
@SuppressWarnings("unchecked")
1249-
private Map<String, Object> getIndexSettingsAsMap(String index) throws IOException {
1250-
Map<String, Object> indexSettings = getIndexSettings(index);
1251-
return (Map<String, Object>)((Map<String, Object>) indexSettings.get(index)).get("settings");
1252-
}
1253-
12541248
public void testIndexPutSettingNonExistent() throws IOException {
12551249

12561250
String index = "index";

qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java

-6
Original file line numberDiff line numberDiff line change
@@ -731,10 +731,4 @@ public void testSoftDeletesDisabledWarning() throws Exception {
731731
ensureGreen(indexName);
732732
indexDocs(indexName, randomInt(100), randomInt(100));
733733
}
734-
735-
@SuppressWarnings("unchecked")
736-
private Map<String, Object> getIndexSettingsAsMap(String index) throws IOException {
737-
Map<String, Object> indexSettings = getIndexSettings(index);
738-
return (Map<String, Object>)((Map<String, Object>) indexSettings.get(index)).get("settings");
739-
}
740734
}

test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java

+6
Original file line numberDiff line numberDiff line change
@@ -1102,6 +1102,12 @@ protected static Map<String, Object> getIndexSettings(String index) throws IOExc
11021102
}
11031103
}
11041104

1105+
@SuppressWarnings("unchecked")
1106+
protected Map<String, Object> getIndexSettingsAsMap(String index) throws IOException {
1107+
Map<String, Object> indexSettings = getIndexSettings(index);
1108+
return (Map<String, Object>)((Map<String, Object>) indexSettings.get(index)).get("settings");
1109+
}
1110+
11051111
protected static boolean indexExists(String index) throws IOException {
11061112
Response response = client().performRequest(new Request("HEAD", "/" + index));
11071113
return RestStatus.OK.getStatus() == response.getStatusLine().getStatusCode();

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/AsyncWaitStep.java

+4-3
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,15 @@
66
package org.elasticsearch.xpack.core.ilm;
77

88
import org.elasticsearch.client.Client;
9-
import org.elasticsearch.cluster.metadata.IndexMetadata;
9+
import org.elasticsearch.cluster.metadata.Metadata;
1010
import org.elasticsearch.common.unit.TimeValue;
1111
import org.elasticsearch.common.xcontent.ToXContentObject;
12+
import org.elasticsearch.index.Index;
1213

1314
/**
1415
* A step which will be called periodically, waiting for some condition to become true.
1516
* Called asynchronously, as the condition may take time to check.
16-
*
17+
* <p>
1718
* If checking something based on the current cluster state which does not take time to check, use {@link ClusterStateWaitStep}.
1819
*/
1920
public abstract class AsyncWaitStep extends Step {
@@ -29,7 +30,7 @@ protected Client getClient() {
2930
return client;
3031
}
3132

32-
public abstract void evaluateCondition(IndexMetadata indexMetadata, Listener listener, TimeValue masterTimeout);
33+
public abstract void evaluateCondition(Metadata metadata, Index index, Listener listener, TimeValue masterTimeout);
3334

3435
public interface Listener {
3536

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/RolloverStep.java

+29-19
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.elasticsearch.client.Client;
1414
import org.elasticsearch.cluster.ClusterState;
1515
import org.elasticsearch.cluster.ClusterStateObserver;
16+
import org.elasticsearch.cluster.metadata.IndexAbstraction;
1617
import org.elasticsearch.cluster.metadata.IndexMetadata;
1718
import org.elasticsearch.common.Strings;
1819

@@ -39,38 +40,47 @@ public boolean isRetryable() {
3940
@Override
4041
public void performAction(IndexMetadata indexMetadata, ClusterState currentClusterState,
4142
ClusterStateObserver observer, Listener listener) {
43+
String indexName = indexMetadata.getIndex().getName();
4244
boolean indexingComplete = LifecycleSettings.LIFECYCLE_INDEXING_COMPLETE_SETTING.get(indexMetadata.getSettings());
4345
if (indexingComplete) {
4446
logger.trace(indexMetadata.getIndex() + " has lifecycle complete set, skipping " + RolloverStep.NAME);
4547
listener.onResponse(true);
4648
return;
4749
}
50+
IndexAbstraction indexAbstraction = currentClusterState.metadata().getIndicesLookup().get(indexName);
51+
assert indexAbstraction != null : "expected the index " + indexName + " to exist in the lookup but it didn't";
52+
final String rolloverTarget;
53+
if (indexAbstraction.getParentDataStream() != null) {
54+
rolloverTarget = indexAbstraction.getParentDataStream().getName();
55+
} else {
56+
String rolloverAlias = RolloverAction.LIFECYCLE_ROLLOVER_ALIAS_SETTING.get(indexMetadata.getSettings());
4857

49-
String rolloverAlias = RolloverAction.LIFECYCLE_ROLLOVER_ALIAS_SETTING.get(indexMetadata.getSettings());
58+
if (Strings.isNullOrEmpty(rolloverAlias)) {
59+
listener.onFailure(new IllegalArgumentException(String.format(Locale.ROOT,
60+
"setting [%s] for index [%s] is empty or not defined, it must be set to the name of the alias pointing to the group " +
61+
"of indices being rolled over", RolloverAction.LIFECYCLE_ROLLOVER_ALIAS, indexName)));
62+
return;
63+
}
5064

51-
if (Strings.isNullOrEmpty(rolloverAlias)) {
52-
listener.onFailure(new IllegalArgumentException(String.format(Locale.ROOT,
53-
"setting [%s] for index [%s] is empty or not defined", RolloverAction.LIFECYCLE_ROLLOVER_ALIAS,
54-
indexMetadata.getIndex().getName())));
55-
return;
56-
}
65+
if (indexMetadata.getRolloverInfos().get(rolloverAlias) != null) {
66+
logger.info("index [{}] was already rolled over for alias [{}], not attempting to roll over again",
67+
indexName, rolloverAlias);
68+
listener.onResponse(true);
69+
return;
70+
}
5771

58-
if (indexMetadata.getRolloverInfos().get(rolloverAlias) != null) {
59-
logger.info("index [{}] was already rolled over for alias [{}], not attempting to roll over again",
60-
indexMetadata.getIndex().getName(), rolloverAlias);
61-
listener.onResponse(true);
62-
return;
63-
}
72+
if (indexMetadata.getAliases().containsKey(rolloverAlias) == false) {
73+
listener.onFailure(new IllegalArgumentException(String.format(Locale.ROOT,
74+
"%s [%s] does not point to index [%s]", RolloverAction.LIFECYCLE_ROLLOVER_ALIAS, rolloverAlias,
75+
indexName)));
76+
return;
77+
}
6478

65-
if (indexMetadata.getAliases().containsKey(rolloverAlias) == false) {
66-
listener.onFailure(new IllegalArgumentException(String.format(Locale.ROOT,
67-
"%s [%s] does not point to index [%s]", RolloverAction.LIFECYCLE_ROLLOVER_ALIAS, rolloverAlias,
68-
indexMetadata.getIndex().getName())));
69-
return;
79+
rolloverTarget = rolloverAlias;
7080
}
7181

7282
// Calling rollover with no conditions will always roll over the index
73-
RolloverRequest rolloverRequest = new RolloverRequest(rolloverAlias, null)
83+
RolloverRequest rolloverRequest = new RolloverRequest(rolloverTarget, null)
7484
.masterNodeTimeout(getMasterTimeout(currentClusterState));
7585
// We don't wait for active shards when we perform the rollover because the
7686
// {@link org.elasticsearch.xpack.core.ilm.WaitForActiveShardsStep} step will do so

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/SegmentCountStep.java

+7-6
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,15 @@
1313
import org.elasticsearch.action.admin.indices.segments.ShardSegments;
1414
import org.elasticsearch.action.support.DefaultShardOperationFailedException;
1515
import org.elasticsearch.client.Client;
16-
import org.elasticsearch.cluster.metadata.IndexMetadata;
16+
import org.elasticsearch.cluster.metadata.Metadata;
1717
import org.elasticsearch.cluster.routing.ShardRouting;
1818
import org.elasticsearch.common.ParseField;
1919
import org.elasticsearch.common.Strings;
2020
import org.elasticsearch.common.unit.TimeValue;
2121
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
2222
import org.elasticsearch.common.xcontent.ToXContentObject;
2323
import org.elasticsearch.common.xcontent.XContentBuilder;
24+
import org.elasticsearch.index.Index;
2425

2526
import java.io.IOException;
2627
import java.util.Arrays;
@@ -49,16 +50,16 @@ public int getMaxNumSegments() {
4950
}
5051

5152
@Override
52-
public void evaluateCondition(IndexMetadata indexMetadata, Listener listener, TimeValue masterTimeout) {
53-
getClient().admin().indices().segments(new IndicesSegmentsRequest(indexMetadata.getIndex().getName()),
53+
public void evaluateCondition(Metadata metadata, Index index, Listener listener, TimeValue masterTimeout) {
54+
getClient().admin().indices().segments(new IndicesSegmentsRequest(index.getName()),
5455
ActionListener.wrap(response -> {
55-
IndexSegments idxSegments = response.getIndices().get(indexMetadata.getIndex().getName());
56+
IndexSegments idxSegments = response.getIndices().get(index.getName());
5657
if (idxSegments == null || (response.getShardFailures() != null && response.getShardFailures().length > 0)) {
5758
final DefaultShardOperationFailedException[] failures = response.getShardFailures();
5859
logger.info("[{}] retrieval of segment counts after force merge did not succeed, " +
5960
"there were {} shard failures. " +
6061
"failures: {}",
61-
indexMetadata.getIndex().getName(),
62+
index.getName(),
6263
response.getFailedShards(),
6364
failures == null ? "n/a" : Strings.collectionToDelimitedString(Arrays.stream(failures)
6465
.map(Strings::toString)
@@ -73,7 +74,7 @@ public void evaluateCondition(IndexMetadata indexMetadata, Listener listener, Ti
7374
Map<ShardRouting, Integer> unmergedShardCounts = unmergedShards.stream()
7475
.collect(Collectors.toMap(ShardSegments::getShardRouting, ss -> ss.getSegments().size()));
7576
logger.info("[{}] best effort force merge to [{}] segments did not succeed for {} shards: {}",
76-
indexMetadata.getIndex().getName(), maxNumSegments, unmergedShards.size(), unmergedShardCounts);
77+
index.getName(), maxNumSegments, unmergedShards.size(), unmergedShardCounts);
7778
}
7879
// Force merging is best effort, so always return true that the condition has been met.
7980
listener.onResponse(true, new Info(unmergedShards.size()));

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/UpdateRolloverLifecycleDateStep.java

+23-9
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import org.apache.logging.log4j.Logger;
1010
import org.elasticsearch.action.admin.indices.rollover.RolloverInfo;
1111
import org.elasticsearch.cluster.ClusterState;
12+
import org.elasticsearch.cluster.metadata.IndexAbstraction;
1213
import org.elasticsearch.cluster.metadata.IndexMetadata;
1314
import org.elasticsearch.cluster.metadata.Metadata;
1415
import org.elasticsearch.common.Strings;
@@ -52,16 +53,11 @@ public ClusterState performAction(Index index, ClusterState currentState) {
5253
// so just use the current time.
5354
newIndexTime = fallbackTimeSupplier.getAsLong();
5455
} else {
55-
// find the newly created index from the rollover and fetch its index.creation_date
56-
String rolloverAlias = RolloverAction.LIFECYCLE_ROLLOVER_ALIAS_SETTING.get(indexMetadata.getSettings());
57-
if (Strings.isNullOrEmpty(rolloverAlias)) {
58-
throw new IllegalStateException("setting [" + RolloverAction.LIFECYCLE_ROLLOVER_ALIAS
59-
+ "] is not set on index [" + indexMetadata.getIndex().getName() + "]");
60-
}
61-
RolloverInfo rolloverInfo = indexMetadata.getRolloverInfos().get(rolloverAlias);
56+
final String rolloverTarget = getRolloverTarget(index, currentState);
57+
RolloverInfo rolloverInfo = indexMetadata.getRolloverInfos().get(rolloverTarget);
6258
if (rolloverInfo == null) {
63-
throw new IllegalStateException("no rollover info found for [" + indexMetadata.getIndex().getName() + "] with alias [" +
64-
rolloverAlias + "], the index has not yet rolled over with that alias");
59+
throw new IllegalStateException("no rollover info found for [" + indexMetadata.getIndex().getName() +
60+
"] with rollover target [" + rolloverTarget + "], the index has not yet rolled over with that target");
6561
}
6662
newIndexTime = rolloverInfo.getTime();
6763
}
@@ -76,6 +72,24 @@ public ClusterState performAction(Index index, ClusterState currentState) {
7672
.put(newIndexMetadata)).build();
7773
}
7874

75+
private static String getRolloverTarget(Index index, ClusterState currentState) {
76+
IndexAbstraction indexAbstraction = currentState.metadata().getIndicesLookup().get(index.getName());
77+
final String rolloverTarget;
78+
if (indexAbstraction.getParentDataStream() != null) {
79+
rolloverTarget = indexAbstraction.getParentDataStream().getName();
80+
} else {
81+
// find the newly created index from the rollover and fetch its index.creation_date
82+
IndexMetadata indexMetadata = currentState.metadata().index(index);
83+
String rolloverAlias = RolloverAction.LIFECYCLE_ROLLOVER_ALIAS_SETTING.get(indexMetadata.getSettings());
84+
if (Strings.isNullOrEmpty(rolloverAlias)) {
85+
throw new IllegalStateException("setting [" + RolloverAction.LIFECYCLE_ROLLOVER_ALIAS
86+
+ "] is not set on index [" + indexMetadata.getIndex().getName() + "]");
87+
}
88+
rolloverTarget = rolloverAlias;
89+
}
90+
return rolloverTarget;
91+
}
92+
7993
@Override
8094
public int hashCode() {
8195
return super.hashCode();

0 commit comments

Comments
 (0)