Skip to content

Commit 3ec3182

Browse files
authored
Service to migrate indices and ILM policies to data tiers (elastic#73689) (elastic#74287)
This adds a service that migrates the indices and ILM policies away from custom node attribute allocation routing to data tiers. Optionally, it also deletes one legacy index template. (cherry picked from commit 6285fac) Signed-off-by: Andrei Dan <[email protected]>
1 parent 04bca48 commit 3ec3182

File tree

11 files changed

+1965
-350
lines changed

11 files changed

+1965
-350
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/DataTier.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@
1818

1919
import java.util.Arrays;
2020
import java.util.HashSet;
21+
import java.util.List;
2122
import java.util.Set;
23+
import java.util.stream.Collectors;
2224

2325
/**
2426
* The {@code DataTier} class encapsulates the formalization of the "content",
@@ -40,6 +42,10 @@ public class DataTier {
4042
public static final Set<String> ALL_DATA_TIERS =
4143
new HashSet<>(Arrays.asList(DATA_CONTENT, DATA_HOT, DATA_WARM, DATA_COLD, DATA_FROZEN));
4244

45+
// Represents an ordered list of data tiers from frozen to hot (or slow to fast)
46+
private static final List<String> ORDERED_FROZEN_TO_HOT_TIERS =
47+
org.elasticsearch.core.List.of(DataTier.DATA_FROZEN, DataTier.DATA_COLD, DataTier.DATA_WARM, DataTier.DATA_HOT);
48+
4349
/**
4450
* Returns true if the given tier name is a valid tier
4551
*/
@@ -51,6 +57,19 @@ public static boolean validTierName(String tierName) {
5157
DATA_FROZEN.equals(tierName);
5258
}
5359

60+
/**
61+
* Based on the provided target tier it will return a comma separated list of preferred tiers.
62+
* ie. if `data_cold` is the target tier, it will return `data_cold,data_warm,data_hot`
63+
* This is usually used in conjunction with {@link DataTierAllocationDecider#INDEX_ROUTING_PREFER_SETTING}
64+
*/
65+
public static String getPreferredTiersConfiguration(String targetTier) {
66+
int indexOfTargetTier = ORDERED_FROZEN_TO_HOT_TIERS.indexOf(targetTier);
67+
if (indexOfTargetTier == -1) {
68+
throw new IllegalArgumentException("invalid data tier [" + targetTier + "]");
69+
}
70+
return ORDERED_FROZEN_TO_HOT_TIERS.stream().skip(indexOfTargetTier).collect(Collectors.joining(","));
71+
}
72+
5473
/**
5574
* Returns true iff the given settings have a data tier setting configured
5675
*/

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/MigrateAction.java

Lines changed: 2 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,8 @@
2525
import java.io.IOException;
2626
import java.util.List;
2727
import java.util.Objects;
28-
import java.util.stream.Collectors;
28+
29+
import static org.elasticsearch.xpack.core.DataTier.getPreferredTiersConfiguration;
2930

3031
/**
3132
* A {@link LifecycleAction} which enables or disables the automatic migration of data between
@@ -37,9 +38,6 @@ public class MigrateAction implements LifecycleAction {
3738

3839
private static final Logger logger = LogManager.getLogger(MigrateAction.class);
3940
static final String CONDITIONAL_SKIP_MIGRATE_STEP = BranchingStep.NAME + "-check-skip-action";
40-
// Represents an ordered list of data tiers from frozen to hot (or slow to fast)
41-
private static final List<String> FROZEN_TO_HOT_TIERS =
42-
org.elasticsearch.core.List.of(DataTier.DATA_FROZEN, DataTier.DATA_COLD, DataTier.DATA_WARM, DataTier.DATA_HOT);
4341

4442
private static final ConstructingObjectParser<MigrateAction, Void> PARSER = new ConstructingObjectParser<>(NAME,
4543
a -> new MigrateAction(a[0] == null ? true : (boolean) a[0]));
@@ -128,19 +126,6 @@ public List<Step> toSteps(Client client, String phase, StepKey nextStepKey) {
128126
}
129127
}
130128

131-
/**
132-
* Based on the provided target tier it will return a comma separated list of preferred tiers.
133-
* ie. if `data_cold` is the target tier, it will return `data_cold,data_warm,data_hot`
134-
* This is usually used in conjunction with {@link DataTierAllocationDecider#INDEX_ROUTING_PREFER_SETTING}
135-
*/
136-
static String getPreferredTiersConfiguration(String targetTier) {
137-
int indexOfTargetTier = FROZEN_TO_HOT_TIERS.indexOf(targetTier);
138-
if (indexOfTargetTier == -1) {
139-
throw new IllegalArgumentException("invalid data tier [" + targetTier + "]");
140-
}
141-
return FROZEN_TO_HOT_TIERS.stream().skip(indexOfTargetTier).collect(Collectors.joining(","));
142-
}
143-
144129
@Override
145130
public int hashCode() {
146131
return Objects.hash(enabled);
Lines changed: 245 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,245 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.core.ilm;
9+
10+
import org.apache.logging.log4j.LogManager;
11+
import org.apache.logging.log4j.Logger;
12+
import org.apache.logging.log4j.message.ParameterizedMessage;
13+
import org.elasticsearch.client.Client;
14+
import org.elasticsearch.cluster.ClusterState;
15+
import org.elasticsearch.cluster.metadata.IndexMetadata;
16+
import org.elasticsearch.cluster.metadata.Metadata;
17+
import org.elasticsearch.common.Strings;
18+
import org.elasticsearch.common.xcontent.DeprecationHandler;
19+
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
20+
import org.elasticsearch.common.xcontent.XContentParser;
21+
import org.elasticsearch.common.xcontent.json.JsonXContent;
22+
import org.elasticsearch.core.Nullable;
23+
24+
import java.util.ArrayList;
25+
import java.util.LinkedHashSet;
26+
import java.util.List;
27+
import java.util.Set;
28+
import java.util.Spliterators;
29+
import java.util.stream.Collectors;
30+
import java.util.stream.StreamSupport;
31+
32+
import static org.elasticsearch.xpack.core.ilm.LifecycleExecutionState.ILM_CUSTOM_METADATA_KEY;
33+
34+
/**
35+
* We cache the currently executing ILM phase in the index metadata so the ILM execution for managed indices is not irrecoverably
36+
* interrupted by a concurrent update policy that, say, would remove the current execution phase altogether.
37+
* <p>
38+
* This class contains a series of methods that help manage the cached ILM phase.
39+
*/
40+
public final class PhaseCacheManagement {
41+
42+
private static final Logger logger = LogManager.getLogger(PhaseCacheManagement.class);
43+
44+
private PhaseCacheManagement() {
45+
}
46+
47+
/**
48+
* Rereads the phase JSON for the given index, returning a new cluster state.
49+
*/
50+
public static ClusterState refreshPhaseDefinition(final ClusterState state, final String index,
51+
final LifecyclePolicyMetadata updatedPolicy) {
52+
final IndexMetadata idxMeta = state.metadata().index(index);
53+
Metadata.Builder metadataBuilder = Metadata.builder(state.metadata());
54+
refreshPhaseDefinition(metadataBuilder, idxMeta, updatedPolicy);
55+
return ClusterState.builder(state).metadata(metadataBuilder).build();
56+
}
57+
58+
/**
59+
* Rereads the phase JSON for the given index, and updates the provided metadata.
60+
*/
61+
public static void refreshPhaseDefinition(final Metadata.Builder metadataBuilder, final IndexMetadata idxMeta,
62+
final LifecyclePolicyMetadata updatedPolicy) {
63+
String index = idxMeta.getIndex().getName();
64+
assert eligibleToCheckForRefresh(idxMeta) : "index " + index + " is missing crucial information needed to refresh phase definition";
65+
66+
logger.trace("[{}] updating cached phase definition for policy [{}]", index, updatedPolicy.getName());
67+
LifecycleExecutionState currentExState = LifecycleExecutionState.fromIndexMetadata(idxMeta);
68+
69+
String currentPhase = currentExState.getPhase();
70+
PhaseExecutionInfo pei = new PhaseExecutionInfo(updatedPolicy.getName(),
71+
updatedPolicy.getPolicy().getPhases().get(currentPhase), updatedPolicy.getVersion(), updatedPolicy.getModifiedDate());
72+
73+
LifecycleExecutionState newExState = LifecycleExecutionState.builder(currentExState)
74+
.setPhaseDefinition(Strings.toString(pei, false, false))
75+
.build();
76+
77+
metadataBuilder.put(IndexMetadata.builder(idxMeta)
78+
.putCustom(ILM_CUSTOM_METADATA_KEY, newExState.asMap()));
79+
}
80+
81+
82+
/**
83+
* Ensure that we have the minimum amount of metadata necessary to check for cache phase
84+
* refresh. This includes:
85+
* - An execution state
86+
* - Existing phase definition JSON
87+
* - A current step key
88+
* - A current phase in the step key
89+
* - Not currently in the ERROR step
90+
*/
91+
public static boolean eligibleToCheckForRefresh(final IndexMetadata metadata) {
92+
LifecycleExecutionState executionState = LifecycleExecutionState.fromIndexMetadata(metadata);
93+
if (executionState == null || executionState.getPhaseDefinition() == null) {
94+
return false;
95+
}
96+
97+
Step.StepKey currentStepKey = LifecycleExecutionState.getCurrentStepKey(executionState);
98+
if (currentStepKey == null || currentStepKey.getPhase() == null) {
99+
return false;
100+
}
101+
102+
return ErrorStep.NAME.equals(currentStepKey.getName()) == false;
103+
}
104+
105+
/**
106+
* For the given new policy, returns a new cluster with all updateable indices' phase JSON refreshed.
107+
*/
108+
public static ClusterState updateIndicesForPolicy(final ClusterState state, final NamedXContentRegistry xContentRegistry,
109+
final Client client, final LifecyclePolicy oldPolicy,
110+
final LifecyclePolicyMetadata newPolicy) {
111+
Metadata.Builder mb = Metadata.builder(state.metadata());
112+
if (updateIndicesForPolicy(mb, state, xContentRegistry, client, oldPolicy, newPolicy)) {
113+
return ClusterState.builder(state).metadata(mb).build();
114+
}
115+
return state;
116+
}
117+
118+
/**
119+
* For the given new policy, update the provided metadata to reflect the refreshed phase JSON for all updateable indices.
120+
* Returns true if any indices were updated and false otherwise.
121+
* Users of this API should consider the returned value and only create a new {@link ClusterState} if `true` is returned.
122+
*/
123+
public static boolean updateIndicesForPolicy(final Metadata.Builder mb, final ClusterState currentState,
124+
final NamedXContentRegistry xContentRegistry, final Client client,
125+
final LifecyclePolicy oldPolicy, final LifecyclePolicyMetadata newPolicy) {
126+
assert oldPolicy.getName().equals(newPolicy.getName()) : "expected both policies to have the same id but they were: [" +
127+
oldPolicy.getName() + "] vs. [" + newPolicy.getName() + "]";
128+
129+
// No need to update anything if the policies are identical in contents
130+
if (oldPolicy.equals(newPolicy.getPolicy())) {
131+
logger.debug("policy [{}] is unchanged and no phase definition refresh is needed", oldPolicy.getName());
132+
return false;
133+
}
134+
135+
final List<IndexMetadata> indicesThatCanBeUpdated =
136+
StreamSupport.stream(Spliterators.spliteratorUnknownSize(currentState.metadata().indices().valuesIt(), 0), false)
137+
.filter(meta -> newPolicy.getName().equals(LifecycleSettings.LIFECYCLE_NAME_SETTING.get(meta.getSettings())))
138+
.filter(meta -> isIndexPhaseDefinitionUpdatable(xContentRegistry, client, meta, newPolicy.getPolicy()))
139+
.collect(Collectors.toList());
140+
141+
final List<String> refreshedIndices = new ArrayList<>(indicesThatCanBeUpdated.size());
142+
for (IndexMetadata index : indicesThatCanBeUpdated) {
143+
try {
144+
refreshPhaseDefinition(mb, index, newPolicy);
145+
refreshedIndices.add(index.getIndex().getName());
146+
} catch (Exception e) {
147+
logger.warn(new ParameterizedMessage("[{}] unable to refresh phase definition for updated policy [{}]",
148+
index, newPolicy.getName()), e);
149+
}
150+
}
151+
logger.debug("refreshed policy [{}] phase definition for [{}] indices", newPolicy.getName(), refreshedIndices.size());
152+
return refreshedIndices.size() > 0;
153+
}
154+
155+
/**
156+
* Returns 'true' if the index's cached phase JSON can be safely reread, 'false' otherwise.
157+
*/
158+
public static boolean isIndexPhaseDefinitionUpdatable(final NamedXContentRegistry xContentRegistry, final Client client,
159+
final IndexMetadata metadata, final LifecyclePolicy newPolicy) {
160+
final String index = metadata.getIndex().getName();
161+
if (eligibleToCheckForRefresh(metadata) == false) {
162+
logger.debug("[{}] does not contain enough information to check for eligibility of refreshing phase", index);
163+
return false;
164+
}
165+
final String policyId = newPolicy.getName();
166+
167+
final LifecycleExecutionState executionState = LifecycleExecutionState.fromIndexMetadata(metadata);
168+
final Step.StepKey currentStepKey = LifecycleExecutionState.getCurrentStepKey(executionState);
169+
final String currentPhase = currentStepKey.getPhase();
170+
171+
final Set<Step.StepKey> newStepKeys = newPolicy.toSteps(client).stream()
172+
.map(Step::getKey)
173+
.collect(Collectors.toCollection(LinkedHashSet::new));
174+
175+
if (newStepKeys.contains(currentStepKey) == false) {
176+
// The index is on a step that doesn't exist in the new policy, we
177+
// can't safely re-read the JSON
178+
logger.debug("[{}] updated policy [{}] does not contain the current step key [{}], so the policy phase will not be refreshed",
179+
index, policyId, currentStepKey);
180+
return false;
181+
}
182+
183+
final String phaseDef = executionState.getPhaseDefinition();
184+
final Set<Step.StepKey> oldStepKeys = readStepKeys(xContentRegistry, client, phaseDef, currentPhase);
185+
if (oldStepKeys == null) {
186+
logger.debug("[{}] unable to parse phase definition for cached policy [{}], policy phase will not be refreshed",
187+
index, policyId);
188+
return false;
189+
}
190+
191+
final Set<Step.StepKey> oldPhaseStepKeys = oldStepKeys.stream()
192+
.filter(sk -> currentPhase.equals(sk.getPhase()))
193+
.collect(Collectors.toCollection(LinkedHashSet::new));
194+
195+
final PhaseExecutionInfo phaseExecutionInfo = new PhaseExecutionInfo(policyId, newPolicy.getPhases().get(currentPhase), 1L, 1L);
196+
final String peiJson = Strings.toString(phaseExecutionInfo);
197+
198+
final Set<Step.StepKey> newPhaseStepKeys = readStepKeys(xContentRegistry, client, peiJson, currentPhase);
199+
if (newPhaseStepKeys == null) {
200+
logger.debug(new ParameterizedMessage("[{}] unable to parse phase definition for policy [{}] " +
201+
"to determine if it could be refreshed", index, policyId));
202+
return false;
203+
}
204+
205+
if (newPhaseStepKeys.equals(oldPhaseStepKeys)) {
206+
// The new and old phase have the same stepkeys for this current phase, so we can
207+
// refresh the definition because we know it won't change the execution flow.
208+
logger.debug("[{}] updated policy [{}] contains the same phase step keys and can be refreshed", index, policyId);
209+
return true;
210+
} else {
211+
logger.debug("[{}] updated policy [{}] has different phase step keys and will NOT refresh phase " +
212+
"definition as it differs too greatly. old: {}, new: {}",
213+
index, policyId, oldPhaseStepKeys, newPhaseStepKeys);
214+
return false;
215+
}
216+
}
217+
218+
/**
219+
* Parse the {@code phaseDef} phase definition to get the stepkeys for the given phase.
220+
* If there is an error parsing or if the phase definition is missing the required
221+
* information, returns null.
222+
*/
223+
@Nullable
224+
static Set<Step.StepKey> readStepKeys(final NamedXContentRegistry xContentRegistry, final Client client,
225+
final String phaseDef, final String currentPhase) {
226+
final PhaseExecutionInfo phaseExecutionInfo;
227+
try (XContentParser parser = JsonXContent.jsonXContent.createParser(xContentRegistry,
228+
DeprecationHandler.THROW_UNSUPPORTED_OPERATION, phaseDef)) {
229+
phaseExecutionInfo = PhaseExecutionInfo.parse(parser, currentPhase);
230+
} catch (Exception e) {
231+
logger.trace(new ParameterizedMessage("exception reading step keys checking for refreshability, phase definition: {}",
232+
phaseDef), e);
233+
return null;
234+
}
235+
236+
if (phaseExecutionInfo == null || phaseExecutionInfo.getPhase() == null) {
237+
return null;
238+
}
239+
240+
return phaseExecutionInfo.getPhase().getActions().values().stream()
241+
.flatMap(a -> a.toSteps(client, phaseExecutionInfo.getPhase().getName(), null).stream())
242+
.map(Step::getKey)
243+
.collect(Collectors.toCollection(LinkedHashSet::new));
244+
}
245+
}

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/DataTierTests.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,11 @@
2727
import java.util.concurrent.atomic.AtomicInteger;
2828
import java.util.stream.StreamSupport;
2929

30+
import static org.elasticsearch.xpack.core.DataTier.DATA_COLD;
31+
import static org.elasticsearch.xpack.core.DataTier.DATA_HOT;
32+
import static org.elasticsearch.xpack.core.DataTier.DATA_WARM;
33+
import static org.elasticsearch.xpack.core.DataTier.getPreferredTiersConfiguration;
34+
import static org.hamcrest.CoreMatchers.is;
3035
import static org.hamcrest.Matchers.arrayContainingInAnyOrder;
3136
import static org.hamcrest.Matchers.hasItem;
3237
import static org.hamcrest.Matchers.not;
@@ -131,6 +136,14 @@ public void testDisablingLegacyDataRoleDisablesTieredDataRoles() {
131136
assertSettingDeprecationsAndWarnings(new Setting<?>[]{DiscoveryNodeRole.DATA_ROLE.legacySetting()});
132137
}
133138

139+
public void testGetPreferredTiersConfiguration() {
140+
assertThat(getPreferredTiersConfiguration(DATA_HOT), is(DATA_HOT));
141+
assertThat(getPreferredTiersConfiguration(DATA_WARM), is(DATA_WARM + "," + DATA_HOT));
142+
assertThat(getPreferredTiersConfiguration(DATA_COLD), is(DATA_COLD + "," + DATA_WARM + "," + DATA_HOT));
143+
IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> getPreferredTiersConfiguration("no_tier"));
144+
assertThat(exception.getMessage(), is("invalid data tier [no_tier]"));
145+
}
146+
134147
private static DiscoveryNodes buildDiscoveryNodes() {
135148
int numNodes = randomIntBetween(3, 15);
136149
DiscoveryNodes.Builder discoBuilder = DiscoveryNodes.builder();

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/MigrateActionTests.java

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import static org.elasticsearch.xpack.core.DataTier.DATA_COLD;
2424
import static org.elasticsearch.xpack.core.DataTier.DATA_HOT;
2525
import static org.elasticsearch.xpack.core.DataTier.DATA_WARM;
26-
import static org.elasticsearch.xpack.core.ilm.MigrateAction.getPreferredTiersConfiguration;
2726
import static org.elasticsearch.xpack.core.ilm.TimeseriesLifecycleType.COLD_PHASE;
2827
import static org.elasticsearch.xpack.core.ilm.TimeseriesLifecycleType.DELETE_PHASE;
2928
import static org.elasticsearch.xpack.core.ilm.TimeseriesLifecycleType.HOT_PHASE;
@@ -83,14 +82,6 @@ public void testToSteps() {
8382
}
8483
}
8584

86-
public void testGetPreferredTiersConfiguration() {
87-
assertThat(getPreferredTiersConfiguration(DATA_HOT), is(DATA_HOT));
88-
assertThat(getPreferredTiersConfiguration(DATA_WARM), is(DATA_WARM + "," + DATA_HOT));
89-
assertThat(getPreferredTiersConfiguration(DATA_COLD), is(DATA_COLD + "," + DATA_WARM + "," + DATA_HOT));
90-
IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> getPreferredTiersConfiguration("no_tier"));
91-
assertThat(exception.getMessage(), is("invalid data tier [no_tier]"));
92-
}
93-
9485
public void testMigrateActionsConfiguresTierPreference() {
9586
StepKey nextStepKey = new StepKey(randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10),
9687
randomAlphaOfLengthBetween(1, 10));

0 commit comments

Comments
 (0)