Skip to content

Commit 7b3dd30

Browse files
authored
[ML] Update ML results mappings on process start (#37706)
This change moves the update to the results index mappings from the open job action to the code that starts the autodetect process. When a rolling upgrade is performed we need to update the mappings for already-open jobs that are reassigned from an old version node to a new version node, but the open job action is not called in this case. Closes #37607
1 parent b3f9bec commit 7b3dd30

File tree

7 files changed

+336
-212
lines changed

7 files changed

+336
-212
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/ElasticsearchMappings.java

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,24 @@
55
*/
66
package org.elasticsearch.xpack.core.ml.job.persistence;
77

8+
import org.apache.logging.log4j.LogManager;
9+
import org.apache.logging.log4j.Logger;
10+
import org.apache.logging.log4j.message.ParameterizedMessage;
11+
import org.elasticsearch.ElasticsearchException;
812
import org.elasticsearch.Version;
13+
import org.elasticsearch.action.ActionListener;
14+
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingAction;
15+
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
16+
import org.elasticsearch.client.Client;
17+
import org.elasticsearch.cluster.ClusterState;
18+
import org.elasticsearch.cluster.metadata.AliasOrIndex;
19+
import org.elasticsearch.cluster.metadata.IndexMetaData;
20+
import org.elasticsearch.cluster.metadata.MappingMetaData;
21+
import org.elasticsearch.common.CheckedSupplier;
22+
import org.elasticsearch.common.collect.ImmutableOpenMap;
923
import org.elasticsearch.common.xcontent.XContentBuilder;
24+
import org.elasticsearch.index.Index;
25+
import org.elasticsearch.plugins.MapperPlugin;
1026
import org.elasticsearch.xpack.core.ml.datafeed.ChunkingConfig;
1127
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
1228
import org.elasticsearch.xpack.core.ml.datafeed.DelayedDataCheckConfig;
@@ -38,10 +54,16 @@
3854
import org.elasticsearch.xpack.core.ml.notifications.AuditMessage;
3955

4056
import java.io.IOException;
57+
import java.util.ArrayList;
58+
import java.util.Arrays;
4159
import java.util.Collection;
4260
import java.util.Collections;
61+
import java.util.List;
62+
import java.util.Map;
4363

4464
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
65+
import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
66+
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
4567

4668
/**
4769
* Static methods to create Elasticsearch index mappings for the autodetect
@@ -107,6 +129,8 @@ public class ElasticsearchMappings {
107129

108130
static final String RAW = "raw";
109131

132+
private static final Logger logger = LogManager.getLogger(ElasticsearchMappings.class);
133+
110134
private ElasticsearchMappings() {
111135
}
112136

@@ -964,4 +988,94 @@ public static XContentBuilder auditMessageMapping() throws IOException {
964988
.endObject()
965989
.endObject();
966990
}
991+
992+
static String[] mappingRequiresUpdate(ClusterState state, String[] concreteIndices, Version minVersion) throws IOException {
993+
List<String> indicesToUpdate = new ArrayList<>();
994+
995+
ImmutableOpenMap<String, ImmutableOpenMap<String, MappingMetaData>> currentMapping = state.metaData().findMappings(concreteIndices,
996+
new String[] {DOC_TYPE}, MapperPlugin.NOOP_FIELD_FILTER);
997+
998+
for (String index : concreteIndices) {
999+
ImmutableOpenMap<String, MappingMetaData> innerMap = currentMapping.get(index);
1000+
if (innerMap != null) {
1001+
MappingMetaData metaData = innerMap.get(DOC_TYPE);
1002+
try {
1003+
@SuppressWarnings("unchecked")
1004+
Map<String, Object> meta = (Map<String, Object>) metaData.sourceAsMap().get("_meta");
1005+
if (meta != null) {
1006+
String versionString = (String) meta.get("version");
1007+
if (versionString == null) {
1008+
logger.info("Version of mappings for [{}] not found, recreating", index);
1009+
indicesToUpdate.add(index);
1010+
continue;
1011+
}
1012+
1013+
Version mappingVersion = Version.fromString(versionString);
1014+
1015+
if (mappingVersion.onOrAfter(minVersion)) {
1016+
continue;
1017+
} else {
1018+
logger.info("Mappings for [{}] are outdated [{}], updating it[{}].", index, mappingVersion, Version.CURRENT);
1019+
indicesToUpdate.add(index);
1020+
continue;
1021+
}
1022+
} else {
1023+
logger.info("Version of mappings for [{}] not found, recreating", index);
1024+
indicesToUpdate.add(index);
1025+
continue;
1026+
}
1027+
} catch (Exception e) {
1028+
logger.error(new ParameterizedMessage("Failed to retrieve mapping version for [{}], recreating", index), e);
1029+
indicesToUpdate.add(index);
1030+
continue;
1031+
}
1032+
} else {
1033+
logger.info("No mappings found for [{}], recreating", index);
1034+
indicesToUpdate.add(index);
1035+
}
1036+
}
1037+
return indicesToUpdate.toArray(new String[indicesToUpdate.size()]);
1038+
}
1039+
1040+
public static void addDocMappingIfMissing(String alias, CheckedSupplier<XContentBuilder, IOException> mappingSupplier,
1041+
Client client, ClusterState state, ActionListener<Boolean> listener) {
1042+
AliasOrIndex aliasOrIndex = state.metaData().getAliasAndIndexLookup().get(alias);
1043+
if (aliasOrIndex == null) {
1044+
// The index has never been created yet
1045+
listener.onResponse(true);
1046+
return;
1047+
}
1048+
String[] concreteIndices = aliasOrIndex.getIndices().stream().map(IndexMetaData::getIndex).map(Index::getName)
1049+
.toArray(String[]::new);
1050+
1051+
String[] indicesThatRequireAnUpdate;
1052+
try {
1053+
indicesThatRequireAnUpdate = mappingRequiresUpdate(state, concreteIndices, Version.CURRENT);
1054+
} catch (IOException e) {
1055+
listener.onFailure(e);
1056+
return;
1057+
}
1058+
1059+
if (indicesThatRequireAnUpdate.length > 0) {
1060+
try (XContentBuilder mapping = mappingSupplier.get()) {
1061+
PutMappingRequest putMappingRequest = new PutMappingRequest(indicesThatRequireAnUpdate);
1062+
putMappingRequest.type(DOC_TYPE);
1063+
putMappingRequest.source(mapping);
1064+
executeAsyncWithOrigin(client, ML_ORIGIN, PutMappingAction.INSTANCE, putMappingRequest,
1065+
ActionListener.wrap(response -> {
1066+
if (response.isAcknowledged()) {
1067+
listener.onResponse(true);
1068+
} else {
1069+
listener.onFailure(new ElasticsearchException("Attempt to put missing mapping in indices "
1070+
+ Arrays.toString(indicesThatRequireAnUpdate) + " was not acknowledged"));
1071+
}
1072+
}, listener::onFailure));
1073+
} catch (IOException e) {
1074+
listener.onFailure(e);
1075+
}
1076+
} else {
1077+
logger.trace("Mappings are up to date.");
1078+
listener.onResponse(true);
1079+
}
1080+
}
9671081
}

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/persistence/ElasticsearchMappingsTests.java

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,18 @@
99
import com.fasterxml.jackson.core.JsonParseException;
1010
import com.fasterxml.jackson.core.JsonParser;
1111
import com.fasterxml.jackson.core.JsonToken;
12+
import org.elasticsearch.Version;
13+
import org.elasticsearch.cluster.ClusterName;
14+
import org.elasticsearch.cluster.ClusterState;
15+
import org.elasticsearch.cluster.metadata.IndexMetaData;
16+
import org.elasticsearch.cluster.metadata.MappingMetaData;
17+
import org.elasticsearch.cluster.metadata.MetaData;
1218
import org.elasticsearch.common.Strings;
19+
import org.elasticsearch.common.settings.Settings;
1320
import org.elasticsearch.common.xcontent.XContentBuilder;
1421
import org.elasticsearch.common.xcontent.XContentParser;
1522
import org.elasticsearch.test.ESTestCase;
23+
import org.elasticsearch.test.VersionUtils;
1624
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
1725
import org.elasticsearch.xpack.core.ml.job.config.Job;
1826
import org.elasticsearch.xpack.core.ml.job.config.ModelPlotConfig;
@@ -30,6 +38,8 @@
3038
import java.io.IOException;
3139
import java.nio.charset.StandardCharsets;
3240
import java.util.Arrays;
41+
import java.util.Collections;
42+
import java.util.HashMap;
3343
import java.util.HashSet;
3444
import java.util.List;
3545
import java.util.Map;
@@ -128,6 +138,96 @@ public void testTermFieldMapping() throws IOException {
128138
assertNull(instanceMapping);
129139
}
130140

141+
142+
public void testMappingRequiresUpdateNoMapping() throws IOException {
143+
ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name"));
144+
ClusterState cs = csBuilder.build();
145+
String[] indices = new String[] { "no_index" };
146+
147+
assertArrayEquals(new String[] { "no_index" }, ElasticsearchMappings.mappingRequiresUpdate(cs, indices, Version.CURRENT));
148+
}
149+
150+
public void testMappingRequiresUpdateNullMapping() throws IOException {
151+
ClusterState cs = getClusterStateWithMappingsWithMetaData(Collections.singletonMap("null_mapping", null));
152+
String[] indices = new String[] { "null_index" };
153+
assertArrayEquals(indices, ElasticsearchMappings.mappingRequiresUpdate(cs, indices, Version.CURRENT));
154+
}
155+
156+
public void testMappingRequiresUpdateNoVersion() throws IOException {
157+
ClusterState cs = getClusterStateWithMappingsWithMetaData(Collections.singletonMap("no_version_field", "NO_VERSION_FIELD"));
158+
String[] indices = new String[] { "no_version_field" };
159+
assertArrayEquals(indices, ElasticsearchMappings.mappingRequiresUpdate(cs, indices, Version.CURRENT));
160+
}
161+
162+
public void testMappingRequiresUpdateRecentMappingVersion() throws IOException {
163+
ClusterState cs = getClusterStateWithMappingsWithMetaData(Collections.singletonMap("version_current", Version.CURRENT.toString()));
164+
String[] indices = new String[] { "version_current" };
165+
assertArrayEquals(new String[] {}, ElasticsearchMappings.mappingRequiresUpdate(cs, indices, Version.CURRENT));
166+
}
167+
168+
public void testMappingRequiresUpdateMaliciousMappingVersion() throws IOException {
169+
ClusterState cs = getClusterStateWithMappingsWithMetaData(
170+
Collections.singletonMap("version_current", Collections.singletonMap("nested", "1.0")));
171+
String[] indices = new String[] { "version_nested" };
172+
assertArrayEquals(indices, ElasticsearchMappings.mappingRequiresUpdate(cs, indices, Version.CURRENT));
173+
}
174+
175+
public void testMappingRequiresUpdateBogusMappingVersion() throws IOException {
176+
ClusterState cs = getClusterStateWithMappingsWithMetaData(Collections.singletonMap("version_bogus", "0.0"));
177+
String[] indices = new String[] { "version_bogus" };
178+
assertArrayEquals(indices, ElasticsearchMappings.mappingRequiresUpdate(cs, indices, Version.CURRENT));
179+
}
180+
181+
public void testMappingRequiresUpdateNewerMappingVersion() throws IOException {
182+
ClusterState cs = getClusterStateWithMappingsWithMetaData(Collections.singletonMap("version_newer", Version.CURRENT));
183+
String[] indices = new String[] { "version_newer" };
184+
assertArrayEquals(new String[] {}, ElasticsearchMappings.mappingRequiresUpdate(cs, indices, VersionUtils.getPreviousVersion()));
185+
}
186+
187+
public void testMappingRequiresUpdateNewerMappingVersionMinor() throws IOException {
188+
ClusterState cs = getClusterStateWithMappingsWithMetaData(Collections.singletonMap("version_newer_minor", Version.CURRENT));
189+
String[] indices = new String[] { "version_newer_minor" };
190+
assertArrayEquals(new String[] {},
191+
ElasticsearchMappings.mappingRequiresUpdate(cs, indices, VersionUtils.getPreviousMinorVersion()));
192+
}
193+
194+
195+
private ClusterState getClusterStateWithMappingsWithMetaData(Map<String, Object> namesAndVersions) throws IOException {
196+
MetaData.Builder metaDataBuilder = MetaData.builder();
197+
198+
for (Map.Entry<String, Object> entry : namesAndVersions.entrySet()) {
199+
200+
String indexName = entry.getKey();
201+
Object version = entry.getValue();
202+
203+
IndexMetaData.Builder indexMetaData = IndexMetaData.builder(indexName);
204+
indexMetaData.settings(Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
205+
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0));
206+
207+
Map<String, Object> mapping = new HashMap<>();
208+
Map<String, Object> properties = new HashMap<>();
209+
for (int i = 0; i < 10; i++) {
210+
properties.put("field" + i, Collections.singletonMap("type", "string"));
211+
}
212+
mapping.put("properties", properties);
213+
214+
Map<String, Object> meta = new HashMap<>();
215+
if (version != null && version.equals("NO_VERSION_FIELD") == false) {
216+
meta.put("version", version);
217+
}
218+
mapping.put("_meta", meta);
219+
220+
indexMetaData.putMapping(new MappingMetaData(ElasticsearchMappings.DOC_TYPE, mapping));
221+
222+
metaDataBuilder.put(indexMetaData);
223+
}
224+
MetaData metaData = metaDataBuilder.build();
225+
226+
ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name"));
227+
csBuilder.metaData(metaData);
228+
return csBuilder.build();
229+
}
230+
131231
private Set<String> collectResultsDocFieldNames() throws IOException {
132232
// Only the mappings for the results index should be added below. Do NOT add mappings for other indexes here.
133233
return collectFieldNames(ElasticsearchMappings.resultsMapping());

0 commit comments

Comments
 (0)