Skip to content

Commit cecb5f9

Browse files
authored
Update .ml-config mappings before indexing job, datafeed or df analytics config (#44216) (#44274)
1 parent ba01bbf commit cecb5f9

File tree

5 files changed

+165
-19
lines changed

5 files changed

+165
-19
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/ElasticsearchMappings.java

+8-4
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
import org.elasticsearch.cluster.metadata.AliasOrIndex;
1919
import org.elasticsearch.cluster.metadata.IndexMetaData;
2020
import org.elasticsearch.cluster.metadata.MappingMetaData;
21-
import org.elasticsearch.common.CheckedBiFunction;
21+
import org.elasticsearch.common.CheckedFunction;
2222
import org.elasticsearch.common.collect.ImmutableOpenMap;
2323
import org.elasticsearch.common.xcontent.XContentBuilder;
2424
import org.elasticsearch.index.Index;
@@ -139,9 +139,13 @@ private ElasticsearchMappings() {
139139
}
140140

141141
public static XContentBuilder configMapping() throws IOException {
142+
return configMapping(SINGLE_MAPPING_NAME);
143+
}
144+
145+
public static XContentBuilder configMapping(String mappingType) throws IOException {
142146
XContentBuilder builder = jsonBuilder();
143147
builder.startObject();
144-
builder.startObject(SINGLE_MAPPING_NAME);
148+
builder.startObject(mappingType);
145149
addMetaInformation(builder);
146150
addDefaultMapping(builder);
147151
builder.startObject(PROPERTIES);
@@ -1129,7 +1133,7 @@ static String[] mappingRequiresUpdate(ClusterState state, String[] concreteIndic
11291133
}
11301134

11311135
public static void addDocMappingIfMissing(String alias,
1132-
CheckedBiFunction<String, Collection<String>, XContentBuilder, IOException> mappingSupplier,
1136+
CheckedFunction<String, XContentBuilder, IOException> mappingSupplier,
11331137
Client client, ClusterState state, ActionListener<Boolean> listener) {
11341138
AliasOrIndex aliasOrIndex = state.metaData().getAliasAndIndexLookup().get(alias);
11351139
if (aliasOrIndex == null) {
@@ -1153,7 +1157,7 @@ public static void addDocMappingIfMissing(String alias,
11531157
IndexMetaData indexMetaData = state.metaData().index(indicesThatRequireAnUpdate[0]);
11541158
String mappingType = indexMetaData.mapping().type();
11551159

1156-
try (XContentBuilder mapping = mappingSupplier.apply(mappingType, Collections.emptyList())) {
1160+
try (XContentBuilder mapping = mappingSupplier.apply(mappingType)) {
11571161
PutMappingRequest putMappingRequest = new PutMappingRequest(indicesThatRequireAnUpdate);
11581162
putMappingRequest.type(mappingType);
11591163
putMappingRequest.source(mapping);

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/persistence/ElasticsearchMappingsTests.java

+65
Original file line numberDiff line numberDiff line change
@@ -10,18 +10,25 @@
1010
import com.fasterxml.jackson.core.JsonParser;
1111
import com.fasterxml.jackson.core.JsonToken;
1212
import org.elasticsearch.Version;
13+
import org.elasticsearch.action.ActionListener;
14+
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingAction;
15+
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
16+
import org.elasticsearch.action.support.master.AcknowledgedResponse;
17+
import org.elasticsearch.client.Client;
1318
import org.elasticsearch.cluster.ClusterName;
1419
import org.elasticsearch.cluster.ClusterState;
1520
import org.elasticsearch.cluster.metadata.IndexMetaData;
1621
import org.elasticsearch.cluster.metadata.MappingMetaData;
1722
import org.elasticsearch.cluster.metadata.MetaData;
1823
import org.elasticsearch.common.Strings;
1924
import org.elasticsearch.common.settings.Settings;
25+
import org.elasticsearch.common.util.concurrent.ThreadContext;
2026
import org.elasticsearch.common.xcontent.XContentBuilder;
2127
import org.elasticsearch.common.xcontent.XContentParser;
2228
import org.elasticsearch.index.get.GetResult;
2329
import org.elasticsearch.test.ESTestCase;
2430
import org.elasticsearch.test.VersionUtils;
31+
import org.elasticsearch.threadpool.ThreadPool;
2532
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
2633
import org.elasticsearch.xpack.core.ml.job.config.Job;
2734
import org.elasticsearch.xpack.core.ml.job.config.ModelPlotConfig;
@@ -34,6 +41,7 @@
3441
import org.elasticsearch.xpack.core.ml.job.results.CategoryDefinition;
3542
import org.elasticsearch.xpack.core.ml.job.results.ReservedFieldNames;
3643
import org.elasticsearch.xpack.core.ml.job.results.Result;
44+
import org.mockito.ArgumentCaptor;
3745

3846
import java.io.BufferedInputStream;
3947
import java.io.ByteArrayInputStream;
@@ -47,7 +55,16 @@
4755
import java.util.Map;
4856
import java.util.Set;
4957

58+
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
5059
import static org.elasticsearch.index.mapper.MapperService.SINGLE_MAPPING_NAME;
60+
import static org.hamcrest.Matchers.equalTo;
61+
import static org.mockito.Matchers.any;
62+
import static org.mockito.Matchers.eq;
63+
import static org.mockito.Mockito.doAnswer;
64+
import static org.mockito.Mockito.mock;
65+
import static org.mockito.Mockito.verify;
66+
import static org.mockito.Mockito.verifyNoMoreInteractions;
67+
import static org.mockito.Mockito.when;
5168

5269

5370
public class ElasticsearchMappingsTests extends ESTestCase {
@@ -205,6 +222,54 @@ public void testMappingRequiresUpdateNewerMappingVersionMinor() throws IOExcepti
205222
ElasticsearchMappings.mappingRequiresUpdate(cs, indices, VersionUtils.getPreviousMinorVersion()));
206223
}
207224

225+
public void testAddDocMappingIfMissing() throws IOException {
226+
ThreadPool threadPool = mock(ThreadPool.class);
227+
when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY));
228+
Client client = mock(Client.class);
229+
when(client.threadPool()).thenReturn(threadPool);
230+
doAnswer(
231+
invocationOnMock -> {
232+
ActionListener listener = (ActionListener) invocationOnMock.getArguments()[2];
233+
listener.onResponse(new AcknowledgedResponse(true));
234+
return null;
235+
})
236+
.when(client).execute(eq(PutMappingAction.INSTANCE), any(), any(ActionListener.class));
237+
238+
ClusterState clusterState = getClusterStateWithMappingsWithMetaData(Collections.singletonMap("index-name", "0.0"));
239+
ElasticsearchMappings.addDocMappingIfMissing(
240+
"index-name",
241+
ElasticsearchMappingsTests::fakeMapping,
242+
client,
243+
clusterState,
244+
ActionListener.wrap(
245+
ok -> assertTrue(ok),
246+
e -> fail(e.toString())
247+
)
248+
);
249+
250+
ArgumentCaptor<PutMappingRequest> requestCaptor = ArgumentCaptor.forClass(PutMappingRequest.class);
251+
verify(client).threadPool();
252+
verify(client).execute(eq(PutMappingAction.INSTANCE), requestCaptor.capture(), any(ActionListener.class));
253+
verifyNoMoreInteractions(client);
254+
255+
PutMappingRequest request = requestCaptor.getValue();
256+
assertThat(request.type(), equalTo("_doc"));
257+
assertThat(request.indices(), equalTo(new String[] { "index-name" }));
258+
assertThat(request.source(), equalTo("{\"_doc\":{\"properties\":{\"some-field\":{\"type\":\"long\"}}}}"));
259+
}
260+
261+
private static XContentBuilder fakeMapping(String mappingType) throws IOException {
262+
return jsonBuilder()
263+
.startObject()
264+
.startObject(mappingType)
265+
.startObject(ElasticsearchMappings.PROPERTIES)
266+
.startObject("some-field")
267+
.field(ElasticsearchMappings.TYPE, ElasticsearchMappings.LONG)
268+
.endObject()
269+
.endObject()
270+
.endObject()
271+
.endObject();
272+
}
208273

209274
private ClusterState getClusterStateWithMappingsWithMetaData(Map<String, Object> namesAndVersions) throws IOException {
210275
MetaData.Builder metaDataBuilder = MetaData.builder();

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutDataFrameAnalyticsAction.java

+41-6
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,15 @@
55
*/
66
package org.elasticsearch.xpack.ml.action;
77

8+
import org.apache.logging.log4j.LogManager;
9+
import org.apache.logging.log4j.Logger;
810
import org.elasticsearch.Version;
911
import org.elasticsearch.action.ActionListener;
12+
import org.elasticsearch.action.index.IndexResponse;
1013
import org.elasticsearch.action.support.ActionFilters;
1114
import org.elasticsearch.action.support.HandledTransportAction;
1215
import org.elasticsearch.client.Client;
16+
import org.elasticsearch.cluster.ClusterState;
1317
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
1418
import org.elasticsearch.cluster.service.ClusterService;
1519
import org.elasticsearch.common.Strings;
@@ -29,6 +33,8 @@
2933
import org.elasticsearch.xpack.core.ml.action.PutDataFrameAnalyticsAction;
3034
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
3135
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
36+
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
37+
import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
3238
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
3339
import org.elasticsearch.xpack.core.ml.utils.MlStrings;
3440
import org.elasticsearch.xpack.core.security.SecurityContext;
@@ -43,12 +49,15 @@
4349

4450
import java.io.IOException;
4551
import java.time.Instant;
52+
import java.util.Map;
4653
import java.util.Objects;
4754
import java.util.function.Supplier;
4855

4956
public class TransportPutDataFrameAnalyticsAction
5057
extends HandledTransportAction<PutDataFrameAnalyticsAction.Request, PutDataFrameAnalyticsAction.Response> {
5158

59+
private static final Logger logger = LogManager.getLogger(TransportPutDataFrameAnalyticsAction.class);
60+
5261
private final XPackLicenseState licenseState;
5362
private final DataFrameAnalyticsConfigProvider configProvider;
5463
private final ThreadPool threadPool;
@@ -97,6 +106,7 @@ protected void doExecute(Task task, PutDataFrameAnalyticsAction.Request request,
97106
.setCreateTime(Instant.now())
98107
.setVersion(Version.CURRENT)
99108
.build();
109+
100110
if (licenseState.isAuthAllowed()) {
101111
final String username = securityContext.getUser().principal();
102112
RoleDescriptor.IndicesPrivileges sourceIndexPrivileges = RoleDescriptor.IndicesPrivileges.builder()
@@ -120,9 +130,12 @@ protected void doExecute(Task task, PutDataFrameAnalyticsAction.Request request,
120130

121131
client.execute(HasPrivilegesAction.INSTANCE, privRequest, privResponseListener);
122132
} else {
123-
configProvider.put(memoryCappedConfig, threadPool.getThreadContext().getHeaders(), ActionListener.wrap(
124-
indexResponse -> listener.onResponse(new PutDataFrameAnalyticsAction.Response(memoryCappedConfig)),
125-
listener::onFailure
133+
updateDocMappingAndPutConfig(
134+
memoryCappedConfig,
135+
threadPool.getThreadContext().getHeaders(),
136+
ActionListener.wrap(
137+
indexResponse -> listener.onResponse(new PutDataFrameAnalyticsAction.Response(memoryCappedConfig)),
138+
listener::onFailure
126139
));
127140
}
128141
}
@@ -131,9 +144,12 @@ private void handlePrivsResponse(String username, DataFrameAnalyticsConfig memor
131144
HasPrivilegesResponse response,
132145
ActionListener<PutDataFrameAnalyticsAction.Response> listener) throws IOException {
133146
if (response.isCompleteMatch()) {
134-
configProvider.put(memoryCappedConfig, threadPool.getThreadContext().getHeaders(), ActionListener.wrap(
135-
indexResponse -> listener.onResponse(new PutDataFrameAnalyticsAction.Response(memoryCappedConfig)),
136-
listener::onFailure
147+
updateDocMappingAndPutConfig(
148+
memoryCappedConfig,
149+
threadPool.getThreadContext().getHeaders(),
150+
ActionListener.wrap(
151+
indexResponse -> listener.onResponse(new PutDataFrameAnalyticsAction.Response(memoryCappedConfig)),
152+
listener::onFailure
137153
));
138154
} else {
139155
XContentBuilder builder = JsonXContent.contentBuilder();
@@ -150,6 +166,25 @@ private void handlePrivsResponse(String username, DataFrameAnalyticsConfig memor
150166
}
151167
}
152168

169+
private void updateDocMappingAndPutConfig(DataFrameAnalyticsConfig config,
170+
Map<String, String> headers,
171+
ActionListener<IndexResponse> listener) {
172+
ClusterState clusterState = clusterService.state();
173+
if (clusterState == null) {
174+
logger.warn("Cannot update doc mapping because clusterState == null");
175+
configProvider.put(config, headers, listener);
176+
return;
177+
}
178+
ElasticsearchMappings.addDocMappingIfMissing(
179+
AnomalyDetectorsIndex.configIndexName(),
180+
ElasticsearchMappings::configMapping,
181+
client,
182+
clusterState,
183+
ActionListener.wrap(
184+
unused -> configProvider.put(config, headers, listener),
185+
listener::onFailure));
186+
}
187+
153188
private void validateConfig(DataFrameAnalyticsConfig config) {
154189
if (MlStrings.isValidId(config.getId()) == false) {
155190
throw ExceptionsHelper.badRequestException(Messages.getMessage(Messages.INVALID_ID, DataFrameAnalyticsConfig.ID,

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutDatafeedAction.java

+34-7
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
*/
66
package org.elasticsearch.xpack.ml.action;
77

8+
import org.apache.logging.log4j.LogManager;
9+
import org.apache.logging.log4j.Logger;
810
import org.elasticsearch.ElasticsearchException;
911
import org.elasticsearch.action.ActionListener;
1012
import org.elasticsearch.action.search.SearchAction;
@@ -36,6 +38,8 @@
3638
import org.elasticsearch.xpack.core.ml.MlMetadata;
3739
import org.elasticsearch.xpack.core.ml.action.PutDatafeedAction;
3840
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
41+
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
42+
import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
3943
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
4044
import org.elasticsearch.xpack.core.rollup.action.GetRollupIndexCapsAction;
4145
import org.elasticsearch.xpack.core.rollup.action.RollupSearchAction;
@@ -58,6 +62,8 @@
5862

5963
public class TransportPutDatafeedAction extends TransportMasterNodeAction<PutDatafeedAction.Request, PutDatafeedAction.Response> {
6064

65+
private static final Logger logger = LogManager.getLogger(TransportPutDatafeedAction.class);
66+
6167
private final XPackLicenseState licenseState;
6268
private final Client client;
6369
private final SecurityContext securityContext;
@@ -111,7 +117,7 @@ protected void masterOperation(PutDatafeedAction.Request request, ClusterState s
111117
.indices(indices);
112118

113119
ActionListener<HasPrivilegesResponse> privResponseListener = ActionListener.wrap(
114-
r -> handlePrivsResponse(username, request, r, listener),
120+
r -> handlePrivsResponse(username, request, r, state, listener),
115121
listener::onFailure);
116122

117123
ActionListener<GetRollupIndexCapsAction.Response> getRollupIndexCapsActionHandler = ActionListener.wrap(
@@ -145,15 +151,17 @@ protected void masterOperation(PutDatafeedAction.Request request, ClusterState s
145151
}
146152

147153
} else {
148-
putDatafeed(request, threadPool.getThreadContext().getHeaders(), listener);
154+
putDatafeed(request, threadPool.getThreadContext().getHeaders(), state, listener);
149155
}
150156
}
151157

152-
private void handlePrivsResponse(String username, PutDatafeedAction.Request request,
158+
private void handlePrivsResponse(String username,
159+
PutDatafeedAction.Request request,
153160
HasPrivilegesResponse response,
161+
ClusterState clusterState,
154162
ActionListener<PutDatafeedAction.Response> listener) throws IOException {
155163
if (response.isCompleteMatch()) {
156-
putDatafeed(request, threadPool.getThreadContext().getHeaders(), listener);
164+
putDatafeed(request, threadPool.getThreadContext().getHeaders(), clusterState, listener);
157165
} else {
158166
XContentBuilder builder = JsonXContent.contentBuilder();
159167
builder.startObject();
@@ -169,7 +177,9 @@ private void handlePrivsResponse(String username, PutDatafeedAction.Request requ
169177
}
170178
}
171179

172-
private void putDatafeed(PutDatafeedAction.Request request, Map<String, String> headers,
180+
private void putDatafeed(PutDatafeedAction.Request request,
181+
Map<String, String> headers,
182+
ClusterState clusterState,
173183
ActionListener<PutDatafeedAction.Response> listener) {
174184

175185
String datafeedId = request.getDatafeed().getId();
@@ -181,13 +191,30 @@ private void putDatafeed(PutDatafeedAction.Request request, Map<String, String>
181191
}
182192
DatafeedConfig.validateAggregations(request.getDatafeed().getParsedAggregations(xContentRegistry));
183193

184-
CheckedConsumer<Boolean, Exception> validationOk = ok -> {
185-
datafeedConfigProvider.putDatafeedConfig(request.getDatafeed(), headers, ActionListener.wrap(
194+
CheckedConsumer<Boolean, Exception> mappingsUpdated = ok -> {
195+
datafeedConfigProvider.putDatafeedConfig(
196+
request.getDatafeed(),
197+
headers,
198+
ActionListener.wrap(
186199
indexResponse -> listener.onResponse(new PutDatafeedAction.Response(request.getDatafeed())),
187200
listener::onFailure
188201
));
189202
};
190203

204+
CheckedConsumer<Boolean, Exception> validationOk = ok -> {
205+
if (clusterState == null) {
206+
logger.warn("Cannot update doc mapping because clusterState == null");
207+
mappingsUpdated.accept(false);
208+
return;
209+
}
210+
ElasticsearchMappings.addDocMappingIfMissing(
211+
AnomalyDetectorsIndex.configIndexName(),
212+
ElasticsearchMappings::configMapping,
213+
client,
214+
clusterState,
215+
ActionListener.wrap(mappingsUpdated, listener::onFailure));
216+
};
217+
191218
CheckedConsumer<Boolean, Exception> jobOk = ok ->
192219
jobConfigProvider.validateDatafeedJob(request.getDatafeed(), ActionListener.wrap(validationOk, listener::onFailure));
193220

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java

+17-2
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@
4545
import org.elasticsearch.xpack.core.ml.job.config.JobUpdate;
4646
import org.elasticsearch.xpack.core.ml.job.config.MlFilter;
4747
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
48+
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
49+
import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
4850
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats;
4951
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
5052
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
@@ -254,7 +256,7 @@ public void putJob(PutJobAction.Request request, AnalysisRegistry analysisRegist
254256

255257
ActionListener<Boolean> putJobListener = new ActionListener<Boolean>() {
256258
@Override
257-
public void onResponse(Boolean indicesCreated) {
259+
public void onResponse(Boolean mappingsUpdated) {
258260

259261
jobConfigProvider.putJob(job, ActionListener.wrap(
260262
response -> {
@@ -281,10 +283,23 @@ public void onFailure(Exception e) {
281283
}
282284
};
283285

286+
ActionListener<Boolean> addDocMappingsListener = ActionListener.wrap(
287+
indicesCreated -> {
288+
if (state == null) {
289+
logger.warn("Cannot update doc mapping because clusterState == null");
290+
putJobListener.onResponse(false);
291+
return;
292+
}
293+
ElasticsearchMappings.addDocMappingIfMissing(
294+
AnomalyDetectorsIndex.configIndexName(), ElasticsearchMappings::configMapping, client, state, putJobListener);
295+
},
296+
putJobListener::onFailure
297+
);
298+
284299
ActionListener<List<String>> checkForLeftOverDocs = ActionListener.wrap(
285300
matchedIds -> {
286301
if (matchedIds.isEmpty()) {
287-
jobResultsProvider.createJobResultIndex(job, state, putJobListener);
302+
jobResultsProvider.createJobResultIndex(job, state, addDocMappingsListener);
288303
} else {
289304
// A job has the same Id as one of the group names
290305
// error with the first in the list

0 commit comments

Comments
 (0)