Skip to content

Commit 179fe9c

Browse files
[7.x][ML] Delete dest index and reindex if incompatible (#62960) (#63050)
The data frame analytics results format changed in version `7.10.0`. If a job that had not completed is restarted after that change, its destination index may already have been created by an earlier version, in which case the index's mappings are not suitable for the new results format. This commit checks the version recorded in the destination index and deletes the index when that version is outdated. The job then continues by recreating the destination index and reindexing. Backport of #62960.
1 parent b099bfb commit 179fe9c

File tree

5 files changed

+225
-5
lines changed

5 files changed

+225
-5
lines changed

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,9 @@
2525
import org.elasticsearch.client.ParentTaskAssigningClient;
2626
import org.elasticsearch.client.node.NodeClient;
2727
import org.elasticsearch.cluster.ClusterState;
28+
import org.elasticsearch.cluster.metadata.IndexMetadata;
2829
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
30+
import org.elasticsearch.cluster.metadata.MappingMetadata;
2931
import org.elasticsearch.common.util.concurrent.ThreadContext;
3032
import org.elasticsearch.index.IndexNotFoundException;
3133
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
@@ -90,6 +92,28 @@ public void execute(DataFrameAnalyticsTask task, DataFrameAnalyticsState current
9092
// With config in hand, determine action to take
9193
ActionListener<DataFrameAnalyticsConfig> configListener = ActionListener.wrap(
9294
config -> {
95+
// Check if existing destination index is incompatible.
96+
// If it is, we delete it and start from reindexing.
97+
IndexMetadata destIndex = clusterState.getMetadata().index(config.getDest().getIndex());
98+
if (destIndex != null) {
99+
MappingMetadata destIndexMapping = clusterState.getMetadata().index(config.getDest().getIndex()).mapping();
100+
DestinationIndex.Metadata metadata = DestinationIndex.readMetadata(config.getId(), destIndexMapping);
101+
if (metadata.hasMetadata() && (metadata.isCompatible() == false)) {
102+
LOGGER.info("[{}] Destination index was created in version [{}] but minimum supported version is [{}]. " +
103+
"Deleting index and starting from scratch.", config.getId(), metadata.getVersion(),
104+
DestinationIndex.MIN_COMPATIBLE_VERSION);
105+
task.getStatsHolder().resetProgressTracker(config.getAnalysis().getProgressPhases(),
106+
config.getAnalysis().supportsInference());
107+
DataFrameAnalyticsTaskState reindexingState = new DataFrameAnalyticsTaskState(DataFrameAnalyticsState.REINDEXING,
108+
task.getAllocationId(), "destination index was out of date");
109+
task.updatePersistentTaskState(reindexingState, ActionListener.wrap(
110+
updatedTask -> executeJobInMiddleOfReindexing(task, config),
111+
task::setFailed
112+
));
113+
return;
114+
}
115+
}
116+
93117
task.getStatsHolder().adjustProgressTracker(config.getAnalysis().getProgressPhases(),
94118
config.getAnalysis().supportsInference());
95119

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DestinationIndex.java

Lines changed: 93 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@
55
*/
66
package org.elasticsearch.xpack.ml.dataframe;
77

8+
import org.apache.logging.log4j.LogManager;
9+
import org.apache.logging.log4j.Logger;
10+
import org.apache.logging.log4j.message.ParameterizedMessage;
811
import org.elasticsearch.Version;
912
import org.elasticsearch.action.ActionListener;
1013
import org.elasticsearch.action.admin.indices.create.CreateIndexAction;
@@ -45,6 +48,8 @@
4548
*/
4649
public final class DestinationIndex {
4750

51+
private static final Logger logger = LogManager.getLogger(DestinationIndex.class);
52+
4853
public static final String INCREMENTAL_ID = "ml__incremental_id";
4954

5055
/**
@@ -62,13 +67,22 @@ public final class DestinationIndex {
6267
private static final String PROPERTIES = "properties";
6368
private static final String META = "_meta";
6469

70+
private static final String DFA_CREATOR = "data-frame-analytics";
71+
6572
/**
6673
* We only preserve the most important settings.
6774
* If the user needs other settings on the destination index they
6875
* should create the destination index before starting the analytics.
6976
*/
7077
private static final String[] PRESERVED_SETTINGS = new String[] {"index.number_of_shards", "index.number_of_replicas"};
7178

79+
/**
80+
* This is the minimum compatible version of the destination index we can currently work with.
81+
* If the results mappings change in a way existing destination indices will fail to index
82+
* the results, this should be bumped accordingly.
83+
*/
84+
public static final Version MIN_COMPATIBLE_VERSION = Version.V_7_10_0;
85+
7286
private DestinationIndex() {}
7387

7488
/**
@@ -130,7 +144,7 @@ private static CreateIndexRequest createIndexRequest(Clock clock, DataFrameAnaly
130144
checkResultsFieldIsNotPresentInProperties(config, properties);
131145
properties.putAll(createAdditionalMappings(config, Collections.unmodifiableMap(properties)));
132146
Map<String, Object> metadata = getOrPutDefault(mappingsAsMap, META, HashMap::new);
133-
metadata.putAll(createMetadata(config.getId(), clock));
147+
metadata.putAll(createMetadata(config.getId(), clock, Version.CURRENT));
134148
return new CreateIndexRequest(destinationIndex, settings).mapping(type, mappingsAsMap);
135149
}
136150

@@ -173,12 +187,13 @@ private static Map<String, Object> createAdditionalMappings(DataFrameAnalyticsCo
173187
return properties;
174188
}
175189

176-
private static Map<String, Object> createMetadata(String analyticsId, Clock clock) {
190+
// Visible for testing
191+
static Map<String, Object> createMetadata(String analyticsId, Clock clock, Version version) {
177192
Map<String, Object> metadata = new HashMap<>();
178193
metadata.put(CREATION_DATE_MILLIS, clock.millis());
179-
metadata.put(CREATED_BY, "data-frame-analytics");
180-
Map<String, Version> versionMapping = new HashMap<>();
181-
versionMapping.put(CREATED, Version.CURRENT);
194+
metadata.put(CREATED_BY, DFA_CREATOR);
195+
Map<String, String> versionMapping = new HashMap<>();
196+
versionMapping.put(CREATED, version.toString());
182197
metadata.put(VERSION, versionMapping);
183198
metadata.put(ANALYTICS, analyticsId);
184199
return metadata;
@@ -234,4 +249,77 @@ private static void checkResultsFieldIsNotPresentInProperties(DataFrameAnalytics
234249
DataFrameAnalyticsDest.RESULTS_FIELD.getPreferredName());
235250
}
236251
}
252+
253+
@SuppressWarnings("unchecked")
254+
public static Metadata readMetadata(String jobId, MappingMetadata mappingMetadata) {
255+
Map<String, Object> mappings = mappingMetadata.getSourceAsMap();
256+
Map<String, Object> meta = (Map<String, Object>) mappings.get(META);
257+
if ((meta == null) || (DFA_CREATOR.equals(meta.get(CREATED_BY)) == false)) {
258+
return new NoMetadata();
259+
}
260+
return new DestMetadata(getVersion(jobId, meta));
261+
}
262+
263+
@SuppressWarnings("unchecked")
264+
private static Version getVersion(String jobId, Map<String, Object> meta) {
265+
try {
266+
Map<String, Object> version = (Map<String, Object>) meta.get(VERSION);
267+
String createdVersionString = (String) version.get(CREATED);
268+
return Version.fromString(createdVersionString);
269+
} catch (Exception e) {
270+
logger.error(new ParameterizedMessage("[{}] Could not retrieve destination index version", jobId), e);
271+
return null;
272+
}
273+
}
274+
275+
/**
 * View over the data frame analytics metadata of a destination index,
 * as returned by {@code readMetadata}.
 */
public interface Metadata {

    /**
     * @return {@code true} if the index carries metadata written by data frame analytics
     */
    boolean hasMetadata();

    /**
     * Only meaningful when {@link #hasMetadata()} is {@code true}; the
     * no-metadata implementation throws {@link UnsupportedOperationException}.
     *
     * @return whether the index can be used with the current results format
     */
    boolean isCompatible();

    /**
     * Only meaningful when {@link #hasMetadata()} is {@code true}; the
     * no-metadata implementation throws {@link UnsupportedOperationException}.
     *
     * @return the version the index was created with, as a string
     */
    String getVersion();
}
283+
284+
private static class NoMetadata implements Metadata {
285+
286+
@Override
287+
public boolean hasMetadata() {
288+
return false;
289+
}
290+
291+
@Override
292+
public boolean isCompatible() {
293+
throw new UnsupportedOperationException();
294+
}
295+
296+
@Override
297+
public String getVersion() {
298+
throw new UnsupportedOperationException();
299+
}
300+
}
301+
302+
private static class DestMetadata implements Metadata {
303+
304+
private final Version version;
305+
306+
private DestMetadata(Version version) {
307+
this.version = version;
308+
}
309+
310+
@Override
311+
public boolean hasMetadata() {
312+
return true;
313+
}
314+
315+
@Override
316+
public boolean isCompatible() {
317+
return version == null ? false : version.onOrAfter(MIN_COMPATIBLE_VERSION);
318+
}
319+
320+
@Override
321+
public String getVersion() {
322+
return version == null ? "unknown" : version.toString();
323+
}
324+
}
237325
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/stats/StatsHolder.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,11 @@ public void adjustProgressTracker(List<String> analysisPhases, boolean hasInfere
4040
progressTracker.updateReindexingProgress(reindexingProgressPercent < 100 ? 1 : reindexingProgressPercent);
4141
}
4242

43+
public void resetProgressTracker(List<String> analysisPhases, boolean hasInferencePhase) {
44+
progressTracker = ProgressTracker.fromZeroes(analysisPhases, hasInferencePhase);
45+
progressTracker.updateReindexingProgress(1);
46+
}
47+
4348
public ProgressTracker getProgressTracker() {
4449
return progressTracker;
4550
}

x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/DestinationIndexTests.java

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -368,6 +368,83 @@ public void testUpdateMappingsToDestIndex_ResultsFieldsExistsInSourceIndex() thr
368368
verifyZeroInteractions(client);
369369
}
370370

371+
public void testReadMetadata_GivenNoMeta() {
372+
Map<String, Object> mappings = new HashMap<>();
373+
MappingMetadata mappingMetadata = mock(MappingMetadata.class);
374+
when(mappingMetadata.getSourceAsMap()).thenReturn(mappings);
375+
376+
DestinationIndex.Metadata metadata = DestinationIndex.readMetadata("test_id", mappingMetadata);
377+
378+
assertThat(metadata.hasMetadata(), is(false));
379+
expectThrows(UnsupportedOperationException.class, () -> metadata.isCompatible());
380+
expectThrows(UnsupportedOperationException.class, () -> metadata.getVersion());
381+
}
382+
383+
public void testReadMetadata_GivenMetaWithoutCreatedTag() {
384+
Map<String, Object> mappings = new HashMap<>();
385+
mappings.put("_meta", Collections.emptyMap());
386+
MappingMetadata mappingMetadata = mock(MappingMetadata.class);
387+
when(mappingMetadata.getSourceAsMap()).thenReturn(mappings);
388+
389+
DestinationIndex.Metadata metadata = DestinationIndex.readMetadata("test_id", mappingMetadata);
390+
391+
assertThat(metadata.hasMetadata(), is(false));
392+
expectThrows(UnsupportedOperationException.class, () -> metadata.isCompatible());
393+
expectThrows(UnsupportedOperationException.class, () -> metadata.getVersion());
394+
}
395+
396+
public void testReadMetadata_GivenMetaNotCreatedByAnalytics() {
397+
Map<String, Object> mappings = new HashMap<>();
398+
mappings.put("_meta", Collections.singletonMap("created", "other"));
399+
MappingMetadata mappingMetadata = mock(MappingMetadata.class);
400+
when(mappingMetadata.getSourceAsMap()).thenReturn(mappings);
401+
402+
DestinationIndex.Metadata metadata = DestinationIndex.readMetadata("test_id", mappingMetadata);
403+
404+
assertThat(metadata.hasMetadata(), is(false));
405+
expectThrows(UnsupportedOperationException.class, () -> metadata.isCompatible());
406+
expectThrows(UnsupportedOperationException.class, () -> metadata.getVersion());
407+
}
408+
409+
public void testReadMetadata_GivenCurrentVersion() {
410+
Map<String, Object> mappings = new HashMap<>();
411+
mappings.put("_meta", DestinationIndex.createMetadata("test_id", Clock.systemUTC(), Version.CURRENT));
412+
MappingMetadata mappingMetadata = mock(MappingMetadata.class);
413+
when(mappingMetadata.getSourceAsMap()).thenReturn(mappings);
414+
415+
DestinationIndex.Metadata metadata = DestinationIndex.readMetadata("test_id", mappingMetadata);
416+
417+
assertThat(metadata.hasMetadata(), is(true));
418+
assertThat(metadata.isCompatible(), is(true));
419+
assertThat(metadata.getVersion(), equalTo(Version.CURRENT.toString()));
420+
}
421+
422+
public void testReadMetadata_GivenMinCompatibleVersion() {
423+
Map<String, Object> mappings = new HashMap<>();
424+
mappings.put("_meta", DestinationIndex.createMetadata("test_id", Clock.systemUTC(), DestinationIndex.MIN_COMPATIBLE_VERSION));
425+
MappingMetadata mappingMetadata = mock(MappingMetadata.class);
426+
when(mappingMetadata.getSourceAsMap()).thenReturn(mappings);
427+
428+
DestinationIndex.Metadata metadata = DestinationIndex.readMetadata("test_id", mappingMetadata);
429+
430+
assertThat(metadata.hasMetadata(), is(true));
431+
assertThat(metadata.isCompatible(), is(true));
432+
assertThat(metadata.getVersion(), equalTo(DestinationIndex.MIN_COMPATIBLE_VERSION.toString()));
433+
}
434+
435+
public void testReadMetadata_GivenIncompatibleVersion() {
436+
Map<String, Object> mappings = new HashMap<>();
437+
mappings.put("_meta", DestinationIndex.createMetadata("test_id", Clock.systemUTC(), Version.V_7_9_3));
438+
MappingMetadata mappingMetadata = mock(MappingMetadata.class);
439+
when(mappingMetadata.getSourceAsMap()).thenReturn(mappings);
440+
441+
DestinationIndex.Metadata metadata = DestinationIndex.readMetadata("test_id", mappingMetadata);
442+
443+
assertThat(metadata.hasMetadata(), is(true));
444+
assertThat(metadata.isCompatible(), is(false));
445+
assertThat(metadata.getVersion(), equalTo(Version.V_7_9_3.toString()));
446+
}
447+
371448
private static <Response> Answer<Response> callListenerOnResponse(Response response) {
372449
return invocationOnMock -> {
373450
@SuppressWarnings("unchecked")

x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/stats/StatsHolderTests.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,4 +96,30 @@ public void testAdjustProgressTracker_GivenReindexingProgressIncomplete() {
9696
assertThat(phaseProgresses.get(3).getProgressPercent(), equalTo(0));
9797
assertThat(phaseProgresses.get(4).getProgressPercent(), equalTo(0));
9898
}
99+
100+
public void testResetProgressTracker() {
101+
List<PhaseProgress> phases = Collections.unmodifiableList(
102+
Arrays.asList(
103+
new org.elasticsearch.xpack.core.ml.utils.PhaseProgress("reindexing", 100),
104+
new org.elasticsearch.xpack.core.ml.utils.PhaseProgress("loading_data", 20),
105+
new org.elasticsearch.xpack.core.ml.utils.PhaseProgress("a", 30),
106+
new org.elasticsearch.xpack.core.ml.utils.PhaseProgress("b", 40),
107+
new PhaseProgress("writing_results", 50)
108+
)
109+
);
110+
StatsHolder statsHolder = new StatsHolder(phases);
111+
112+
statsHolder.resetProgressTracker(Arrays.asList("a", "b"), false);
113+
114+
List<PhaseProgress> phaseProgresses = statsHolder.getProgressTracker().report();
115+
116+
assertThat(phaseProgresses.size(), equalTo(5));
117+
assertThat(phaseProgresses.stream().map(PhaseProgress::getPhase).collect(Collectors.toList()),
118+
contains("reindexing", "loading_data", "a", "b", "writing_results"));
119+
assertThat(phaseProgresses.get(0).getProgressPercent(), equalTo(1));
120+
assertThat(phaseProgresses.get(1).getProgressPercent(), equalTo(0));
121+
assertThat(phaseProgresses.get(2).getProgressPercent(), equalTo(0));
122+
assertThat(phaseProgresses.get(3).getProgressPercent(), equalTo(0));
123+
assertThat(phaseProgresses.get(4).getProgressPercent(), equalTo(0));
124+
}
99125
}

0 commit comments

Comments
 (0)