Skip to content

Commit 6285fac

Browse files
authored
Service to migrate indices and ILM policies to data tiers (#73689)
This adds a service that migrates the indices and ILM policies away from custom node attribute allocation routing to data tiers. Optionally, it also deletes one legacy index template.
1 parent c9a6136 commit 6285fac

File tree

11 files changed

+1956
-349
lines changed

11 files changed

+1956
-349
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/DataTier.java

+19
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@
1818

1919
import java.util.Arrays;
2020
import java.util.HashSet;
21+
import java.util.List;
2122
import java.util.Set;
23+
import java.util.stream.Collectors;
2224

2325
/**
2426
* The {@code DataTier} class encapsulates the formalization of the "content",
@@ -40,6 +42,10 @@ public class DataTier {
4042
public static final Set<String> ALL_DATA_TIERS =
4143
new HashSet<>(Arrays.asList(DATA_CONTENT, DATA_HOT, DATA_WARM, DATA_COLD, DATA_FROZEN));
4244

45+
// Represents an ordered list of data tiers from frozen to hot (or slow to fast)
46+
private static final List<String> ORDERED_FROZEN_TO_HOT_TIERS =
47+
List.of(DataTier.DATA_FROZEN, DataTier.DATA_COLD, DataTier.DATA_WARM, DataTier.DATA_HOT);
48+
4349
/**
4450
* Returns true if the given tier name is a valid tier
4551
*/
@@ -51,6 +57,19 @@ public static boolean validTierName(String tierName) {
5157
DATA_FROZEN.equals(tierName);
5258
}
5359

60+
/**
61+
* Based on the provided target tier it will return a comma separated list of preferred tiers.
62+
* ie. if `data_cold` is the target tier, it will return `data_cold,data_warm,data_hot`
63+
* This is usually used in conjunction with {@link DataTierAllocationDecider#INDEX_ROUTING_PREFER_SETTING}
64+
*/
65+
public static String getPreferredTiersConfiguration(String targetTier) {
66+
int indexOfTargetTier = ORDERED_FROZEN_TO_HOT_TIERS.indexOf(targetTier);
67+
if (indexOfTargetTier == -1) {
68+
throw new IllegalArgumentException("invalid data tier [" + targetTier + "]");
69+
}
70+
return ORDERED_FROZEN_TO_HOT_TIERS.stream().skip(indexOfTargetTier).collect(Collectors.joining(","));
71+
}
72+
5473
/**
5574
* Returns true iff the given settings have a data tier setting configured
5675
*/

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/MigrateAction.java

+2-17
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,8 @@
2525
import java.io.IOException;
2626
import java.util.List;
2727
import java.util.Objects;
28-
import java.util.stream.Collectors;
28+
29+
import static org.elasticsearch.xpack.core.DataTier.getPreferredTiersConfiguration;
2930

3031
/**
3132
* A {@link LifecycleAction} which enables or disables the automatic migration of data between
@@ -37,9 +38,6 @@ public class MigrateAction implements LifecycleAction {
3738

3839
private static final Logger logger = LogManager.getLogger(MigrateAction.class);
3940
static final String CONDITIONAL_SKIP_MIGRATE_STEP = BranchingStep.NAME + "-check-skip-action";
40-
// Represents an ordered list of data tiers from frozen to hot (or slow to fast)
41-
private static final List<String> FROZEN_TO_HOT_TIERS =
42-
List.of(DataTier.DATA_FROZEN, DataTier.DATA_COLD, DataTier.DATA_WARM, DataTier.DATA_HOT);
4341

4442
private static final ConstructingObjectParser<MigrateAction, Void> PARSER = new ConstructingObjectParser<>(NAME,
4543
a -> new MigrateAction(a[0] == null ? true : (boolean) a[0]));
@@ -128,19 +126,6 @@ public List<Step> toSteps(Client client, String phase, StepKey nextStepKey) {
128126
}
129127
}
130128

131-
/**
132-
* Based on the provided target tier it will return a comma separated list of preferred tiers.
133-
* ie. if `data_cold` is the target tier, it will return `data_cold,data_warm,data_hot`
134-
* This is usually used in conjunction with {@link DataTierAllocationDecider#INDEX_ROUTING_PREFER_SETTING}
135-
*/
136-
static String getPreferredTiersConfiguration(String targetTier) {
137-
int indexOfTargetTier = FROZEN_TO_HOT_TIERS.indexOf(targetTier);
138-
if (indexOfTargetTier == -1) {
139-
throw new IllegalArgumentException("invalid data tier [" + targetTier + "]");
140-
}
141-
return FROZEN_TO_HOT_TIERS.stream().skip(indexOfTargetTier).collect(Collectors.joining(","));
142-
}
143-
144129
@Override
145130
public int hashCode() {
146131
return Objects.hash(enabled);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,245 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.core.ilm;
9+
10+
import org.apache.logging.log4j.LogManager;
11+
import org.apache.logging.log4j.Logger;
12+
import org.apache.logging.log4j.message.ParameterizedMessage;
13+
import org.elasticsearch.client.Client;
14+
import org.elasticsearch.cluster.ClusterState;
15+
import org.elasticsearch.cluster.metadata.IndexMetadata;
16+
import org.elasticsearch.cluster.metadata.Metadata;
17+
import org.elasticsearch.common.Strings;
18+
import org.elasticsearch.common.xcontent.DeprecationHandler;
19+
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
20+
import org.elasticsearch.common.xcontent.XContentParser;
21+
import org.elasticsearch.common.xcontent.json.JsonXContent;
22+
import org.elasticsearch.core.Nullable;
23+
24+
import java.util.ArrayList;
25+
import java.util.LinkedHashSet;
26+
import java.util.List;
27+
import java.util.Set;
28+
import java.util.Spliterators;
29+
import java.util.stream.Collectors;
30+
import java.util.stream.StreamSupport;
31+
32+
import static org.elasticsearch.xpack.core.ilm.LifecycleExecutionState.ILM_CUSTOM_METADATA_KEY;
33+
34+
/**
35+
* We cache the currently executing ILM phase in the index metadata so the ILM execution for managed indices is not irrecoverably
36+
* interrupted by a concurrent update policy that, say, would remove the current execution phase altogether.
37+
* <p>
38+
* This contains class contains a series of methods that help manage the cached ILM phase.
39+
*/
40+
public final class PhaseCacheManagement {
41+
42+
private static final Logger logger = LogManager.getLogger(PhaseCacheManagement.class);
43+
44+
private PhaseCacheManagement() {
45+
}
46+
47+
/**
48+
* Rereads the phase JSON for the given index, returning a new cluster state.
49+
*/
50+
public static ClusterState refreshPhaseDefinition(final ClusterState state, final String index,
51+
final LifecyclePolicyMetadata updatedPolicy) {
52+
final IndexMetadata idxMeta = state.metadata().index(index);
53+
Metadata.Builder metadataBuilder = Metadata.builder(state.metadata());
54+
refreshPhaseDefinition(metadataBuilder, idxMeta, updatedPolicy);
55+
return ClusterState.builder(state).metadata(metadataBuilder).build();
56+
}
57+
58+
/**
59+
* Rereads the phase JSON for the given index, and updates the provided metadata.
60+
*/
61+
public static void refreshPhaseDefinition(final Metadata.Builder metadataBuilder, final IndexMetadata idxMeta,
62+
final LifecyclePolicyMetadata updatedPolicy) {
63+
String index = idxMeta.getIndex().getName();
64+
assert eligibleToCheckForRefresh(idxMeta) : "index " + index + " is missing crucial information needed to refresh phase definition";
65+
66+
logger.trace("[{}] updating cached phase definition for policy [{}]", index, updatedPolicy.getName());
67+
LifecycleExecutionState currentExState = LifecycleExecutionState.fromIndexMetadata(idxMeta);
68+
69+
String currentPhase = currentExState.getPhase();
70+
PhaseExecutionInfo pei = new PhaseExecutionInfo(updatedPolicy.getName(),
71+
updatedPolicy.getPolicy().getPhases().get(currentPhase), updatedPolicy.getVersion(), updatedPolicy.getModifiedDate());
72+
73+
LifecycleExecutionState newExState = LifecycleExecutionState.builder(currentExState)
74+
.setPhaseDefinition(Strings.toString(pei, false, false))
75+
.build();
76+
77+
metadataBuilder.put(IndexMetadata.builder(idxMeta)
78+
.putCustom(ILM_CUSTOM_METADATA_KEY, newExState.asMap()));
79+
}
80+
81+
82+
/**
83+
* Ensure that we have the minimum amount of metadata necessary to check for cache phase
84+
* refresh. This includes:
85+
* - An execution state
86+
* - Existing phase definition JSON
87+
* - A current step key
88+
* - A current phase in the step key
89+
* - Not currently in the ERROR step
90+
*/
91+
public static boolean eligibleToCheckForRefresh(final IndexMetadata metadata) {
92+
LifecycleExecutionState executionState = LifecycleExecutionState.fromIndexMetadata(metadata);
93+
if (executionState == null || executionState.getPhaseDefinition() == null) {
94+
return false;
95+
}
96+
97+
Step.StepKey currentStepKey = LifecycleExecutionState.getCurrentStepKey(executionState);
98+
if (currentStepKey == null || currentStepKey.getPhase() == null) {
99+
return false;
100+
}
101+
102+
return ErrorStep.NAME.equals(currentStepKey.getName()) == false;
103+
}
104+
105+
/**
106+
* For the given new policy, returns a new cluster with all updateable indices' phase JSON refreshed.
107+
*/
108+
public static ClusterState updateIndicesForPolicy(final ClusterState state, final NamedXContentRegistry xContentRegistry,
109+
final Client client, final LifecyclePolicy oldPolicy,
110+
final LifecyclePolicyMetadata newPolicy) {
111+
Metadata.Builder mb = Metadata.builder(state.metadata());
112+
if (updateIndicesForPolicy(mb, state, xContentRegistry, client, oldPolicy, newPolicy)) {
113+
return ClusterState.builder(state).metadata(mb).build();
114+
}
115+
return state;
116+
}
117+
118+
/**
119+
* For the given new policy, update the provided metadata to reflect the refreshed phase JSON for all updateable indices.
120+
* Returns true if any indices were updated and false otherwise.
121+
* Users of this API should consider the returned value and only create a new {@link ClusterState} if `true` is returned.
122+
*/
123+
public static boolean updateIndicesForPolicy(final Metadata.Builder mb, final ClusterState currentState,
124+
final NamedXContentRegistry xContentRegistry, final Client client,
125+
final LifecyclePolicy oldPolicy, final LifecyclePolicyMetadata newPolicy) {
126+
assert oldPolicy.getName().equals(newPolicy.getName()) : "expected both policies to have the same id but they were: [" +
127+
oldPolicy.getName() + "] vs. [" + newPolicy.getName() + "]";
128+
129+
// No need to update anything if the policies are identical in contents
130+
if (oldPolicy.equals(newPolicy.getPolicy())) {
131+
logger.debug("policy [{}] is unchanged and no phase definition refresh is needed", oldPolicy.getName());
132+
return false;
133+
}
134+
135+
final List<IndexMetadata> indicesThatCanBeUpdated =
136+
StreamSupport.stream(Spliterators.spliteratorUnknownSize(currentState.metadata().indices().valuesIt(), 0), false)
137+
.filter(meta -> newPolicy.getName().equals(LifecycleSettings.LIFECYCLE_NAME_SETTING.get(meta.getSettings())))
138+
.filter(meta -> isIndexPhaseDefinitionUpdatable(xContentRegistry, client, meta, newPolicy.getPolicy()))
139+
.collect(Collectors.toList());
140+
141+
final List<String> refreshedIndices = new ArrayList<>(indicesThatCanBeUpdated.size());
142+
for (IndexMetadata index : indicesThatCanBeUpdated) {
143+
try {
144+
refreshPhaseDefinition(mb, index, newPolicy);
145+
refreshedIndices.add(index.getIndex().getName());
146+
} catch (Exception e) {
147+
logger.warn(new ParameterizedMessage("[{}] unable to refresh phase definition for updated policy [{}]",
148+
index, newPolicy.getName()), e);
149+
}
150+
}
151+
logger.debug("refreshed policy [{}] phase definition for [{}] indices", newPolicy.getName(), refreshedIndices.size());
152+
return refreshedIndices.size() > 0;
153+
}
154+
155+
/**
156+
* Returns 'true' if the index's cached phase JSON can be safely reread, 'false' otherwise.
157+
*/
158+
public static boolean isIndexPhaseDefinitionUpdatable(final NamedXContentRegistry xContentRegistry, final Client client,
159+
final IndexMetadata metadata, final LifecyclePolicy newPolicy) {
160+
final String index = metadata.getIndex().getName();
161+
if (eligibleToCheckForRefresh(metadata) == false) {
162+
logger.debug("[{}] does not contain enough information to check for eligibility of refreshing phase", index);
163+
return false;
164+
}
165+
final String policyId = newPolicy.getName();
166+
167+
final LifecycleExecutionState executionState = LifecycleExecutionState.fromIndexMetadata(metadata);
168+
final Step.StepKey currentStepKey = LifecycleExecutionState.getCurrentStepKey(executionState);
169+
final String currentPhase = currentStepKey.getPhase();
170+
171+
final Set<Step.StepKey> newStepKeys = newPolicy.toSteps(client).stream()
172+
.map(Step::getKey)
173+
.collect(Collectors.toCollection(LinkedHashSet::new));
174+
175+
if (newStepKeys.contains(currentStepKey) == false) {
176+
// The index is on a step that doesn't exist in the new policy, we
177+
// can't safely re-read the JSON
178+
logger.debug("[{}] updated policy [{}] does not contain the current step key [{}], so the policy phase will not be refreshed",
179+
index, policyId, currentStepKey);
180+
return false;
181+
}
182+
183+
final String phaseDef = executionState.getPhaseDefinition();
184+
final Set<Step.StepKey> oldStepKeys = readStepKeys(xContentRegistry, client, phaseDef, currentPhase);
185+
if (oldStepKeys == null) {
186+
logger.debug("[{}] unable to parse phase definition for cached policy [{}], policy phase will not be refreshed",
187+
index, policyId);
188+
return false;
189+
}
190+
191+
final Set<Step.StepKey> oldPhaseStepKeys = oldStepKeys.stream()
192+
.filter(sk -> currentPhase.equals(sk.getPhase()))
193+
.collect(Collectors.toCollection(LinkedHashSet::new));
194+
195+
final PhaseExecutionInfo phaseExecutionInfo = new PhaseExecutionInfo(policyId, newPolicy.getPhases().get(currentPhase), 1L, 1L);
196+
final String peiJson = Strings.toString(phaseExecutionInfo);
197+
198+
final Set<Step.StepKey> newPhaseStepKeys = readStepKeys(xContentRegistry, client, peiJson, currentPhase);
199+
if (newPhaseStepKeys == null) {
200+
logger.debug(new ParameterizedMessage("[{}] unable to parse phase definition for policy [{}] " +
201+
"to determine if it could be refreshed", index, policyId));
202+
return false;
203+
}
204+
205+
if (newPhaseStepKeys.equals(oldPhaseStepKeys)) {
206+
// The new and old phase have the same stepkeys for this current phase, so we can
207+
// refresh the definition because we know it won't change the execution flow.
208+
logger.debug("[{}] updated policy [{}] contains the same phase step keys and can be refreshed", index, policyId);
209+
return true;
210+
} else {
211+
logger.debug("[{}] updated policy [{}] has different phase step keys and will NOT refresh phase " +
212+
"definition as it differs too greatly. old: {}, new: {}",
213+
index, policyId, oldPhaseStepKeys, newPhaseStepKeys);
214+
return false;
215+
}
216+
}
217+
218+
/**
219+
* Parse the {@code phaseDef} phase definition to get the stepkeys for the given phase.
220+
* If there is an error parsing or if the phase definition is missing the required
221+
* information, returns null.
222+
*/
223+
@Nullable
224+
static Set<Step.StepKey> readStepKeys(final NamedXContentRegistry xContentRegistry, final Client client,
225+
final String phaseDef, final String currentPhase) {
226+
final PhaseExecutionInfo phaseExecutionInfo;
227+
try (XContentParser parser = JsonXContent.jsonXContent.createParser(xContentRegistry,
228+
DeprecationHandler.THROW_UNSUPPORTED_OPERATION, phaseDef)) {
229+
phaseExecutionInfo = PhaseExecutionInfo.parse(parser, currentPhase);
230+
} catch (Exception e) {
231+
logger.trace(new ParameterizedMessage("exception reading step keys checking for refreshability, phase definition: {}",
232+
phaseDef), e);
233+
return null;
234+
}
235+
236+
if (phaseExecutionInfo == null || phaseExecutionInfo.getPhase() == null) {
237+
return null;
238+
}
239+
240+
return phaseExecutionInfo.getPhase().getActions().values().stream()
241+
.flatMap(a -> a.toSteps(client, phaseExecutionInfo.getPhase().getName(), null).stream())
242+
.map(Step::getKey)
243+
.collect(Collectors.toCollection(LinkedHashSet::new));
244+
}
245+
}

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/DataTierTests.java

+13
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,11 @@
2626
import java.util.concurrent.atomic.AtomicInteger;
2727
import java.util.stream.StreamSupport;
2828

29+
import static org.elasticsearch.xpack.core.DataTier.DATA_COLD;
30+
import static org.elasticsearch.xpack.core.DataTier.DATA_HOT;
31+
import static org.elasticsearch.xpack.core.DataTier.DATA_WARM;
32+
import static org.elasticsearch.xpack.core.DataTier.getPreferredTiersConfiguration;
33+
import static org.hamcrest.CoreMatchers.is;
2934
import static org.hamcrest.Matchers.arrayContainingInAnyOrder;
3035
import static org.hamcrest.Matchers.hasItem;
3136
import static org.hamcrest.Matchers.not;
@@ -110,6 +115,14 @@ public void testDataRoleDoesNotImplyTieredDataRoles() {
110115
assertThat(node.getRoles(), not(hasItem(DiscoveryNodeRole.DATA_COLD_NODE_ROLE)));
111116
}
112117

118+
public void testGetPreferredTiersConfiguration() {
119+
assertThat(getPreferredTiersConfiguration(DATA_HOT), is(DATA_HOT));
120+
assertThat(getPreferredTiersConfiguration(DATA_WARM), is(DATA_WARM + "," + DATA_HOT));
121+
assertThat(getPreferredTiersConfiguration(DATA_COLD), is(DATA_COLD + "," + DATA_WARM + "," + DATA_HOT));
122+
IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> getPreferredTiersConfiguration("no_tier"));
123+
assertThat(exception.getMessage(), is("invalid data tier [no_tier]"));
124+
}
125+
113126
private static DiscoveryNodes buildDiscoveryNodes() {
114127
int numNodes = randomIntBetween(3, 15);
115128
DiscoveryNodes.Builder discoBuilder = DiscoveryNodes.builder();

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/MigrateActionTests.java

-9
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import static org.elasticsearch.xpack.core.DataTier.DATA_COLD;
2424
import static org.elasticsearch.xpack.core.DataTier.DATA_HOT;
2525
import static org.elasticsearch.xpack.core.DataTier.DATA_WARM;
26-
import static org.elasticsearch.xpack.core.ilm.MigrateAction.getPreferredTiersConfiguration;
2726
import static org.elasticsearch.xpack.core.ilm.TimeseriesLifecycleType.COLD_PHASE;
2827
import static org.elasticsearch.xpack.core.ilm.TimeseriesLifecycleType.DELETE_PHASE;
2928
import static org.elasticsearch.xpack.core.ilm.TimeseriesLifecycleType.HOT_PHASE;
@@ -83,14 +82,6 @@ public void testToSteps() {
8382
}
8483
}
8584

86-
public void testGetPreferredTiersConfiguration() {
87-
assertThat(getPreferredTiersConfiguration(DATA_HOT), is(DATA_HOT));
88-
assertThat(getPreferredTiersConfiguration(DATA_WARM), is(DATA_WARM + "," + DATA_HOT));
89-
assertThat(getPreferredTiersConfiguration(DATA_COLD), is(DATA_COLD + "," + DATA_WARM + "," + DATA_HOT));
90-
IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> getPreferredTiersConfiguration("no_tier"));
91-
assertThat(exception.getMessage(), is("invalid data tier [no_tier]"));
92-
}
93-
9485
public void testMigrateActionsConfiguresTierPreference() {
9586
StepKey nextStepKey = new StepKey(randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10),
9687
randomAlphaOfLengthBetween(1, 10));

0 commit comments

Comments
 (0)