
Commit 4d6e037

[7.x][ML] Extract creation of DFA field extractor into a factory (#49315) (#49329)
This commit moves the async calls required to retrieve the components that make up `ExtractedFieldsDetector` out of `DataFrameDataExtractorFactory` and into a dedicated `ExtractedFieldsDetectorFactory` class. A few more refactorings are performed:

- The detector no longer needs the results field; instead, it knows whether to use it based on whether the task is restarting.
- Whether the task is restarting is now passed through more accurately.
- Validation of fields that have a cardinality limit is now performed in the detector, after the respective cardinalities have been retrieved.

Backport of #49315
1 parent 543f5f4 commit 4d6e037
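
For orientation, the extraction check's new call shape, pieced together from the hunks below (a sketch, not standalone code: `client`, `startContext`, `toValidateDestEmptyListener` and `finalListener` belong to the surrounding `getStartContext` method):

    // Sketch assembled from this commit's hunks (TransportStartDataFrameAnalyticsAction).
    boolean isTaskRestarting = startContext.startingState != DataFrameAnalyticsTask.StartingState.FIRST_TIME;
    new ExtractedFieldsDetectorFactory(client).createFromSource(
        startContext.config,
        isTaskRestarting,
        ActionListener.wrap(
            extractedFieldsDetector -> toValidateDestEmptyListener.onResponse(startContext),
            finalListener::onFailure));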

File tree

6 files changed: +339 −298 lines changed


x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportEstimateMemoryUsageAction.java

Lines changed: 1 addition & 0 deletions

@@ -81,6 +81,7 @@ private void doEstimateMemoryUsage(String taskId,
         DataFrameDataExtractorFactory.createForSourceIndices(
             client,
             taskId,
+            true, // We are not interested in first-time run validations here
             request.getConfig(),
             ActionListener.wrap(
                 dataExtractorFactory -> {
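
Note the new third argument. Together with the call in `validateSourceIndexHasRows` below, it suggests `createForSourceIndices` now takes a boolean for whether the task is restarting, between the task id and the config. The declaration itself is not part of this diff, so the following is an inferred sketch only:

    // Hypothetical declaration inferred from the two call sites in this commit;
    // the parameter name isTaskRestarting is assumed from the second call site.
    public static void createForSourceIndices(Client client,
                                              String taskId,
                                              boolean isTaskRestarting,
                                              DataFrameAnalyticsConfig config,
                                              ActionListener<DataFrameDataExtractorFactory> listener) {
        // ... asynchronously builds the extractor factory ...
    }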

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java

Lines changed: 47 additions & 33 deletions

@@ -64,6 +64,7 @@
 import org.elasticsearch.xpack.ml.dataframe.MappingsMerger;
 import org.elasticsearch.xpack.ml.dataframe.SourceDestValidator;
 import org.elasticsearch.xpack.ml.dataframe.extractor.DataFrameDataExtractorFactory;
+import org.elasticsearch.xpack.ml.dataframe.extractor.ExtractedFieldsDetectorFactory;
 import org.elasticsearch.xpack.ml.dataframe.persistence.DataFrameAnalyticsConfigProvider;
 import org.elasticsearch.xpack.ml.job.JobNodeSelector;
 import org.elasticsearch.xpack.ml.notifications.DataFrameAnalyticsAuditor;
@@ -228,33 +229,7 @@ private void getStartContext(String id, ActionListener<StartContext> finalListener) {
 
         // Step 6. Validate that there are analyzable data in the source index
         ActionListener<StartContext> validateMappingsMergeListener = ActionListener.wrap(
-            startContext -> DataFrameDataExtractorFactory.createForSourceIndices(client,
-                "validate_source_index_has_rows-" + id,
-                startContext.config,
-                ActionListener.wrap(
-                    dataFrameDataExtractorFactory ->
-                        dataFrameDataExtractorFactory
-                            .newExtractor(false)
-                            .collectDataSummaryAsync(ActionListener.wrap(
-                                dataSummary -> {
-                                    if (dataSummary.rows == 0) {
-                                        finalListener.onFailure(ExceptionsHelper.badRequestException(
-                                            "Unable to start {} as no documents in the source indices [{}] contained all the fields "
-                                                + "selected for analysis. If you are relying on automatic field selection then there are "
-                                                + "currently mapped fields that do not exist in any indexed documents, and you will have "
-                                                + "to switch to explicit field selection and include only fields that exist in indexed "
-                                                + "documents.",
-                                            id, Strings.arrayToCommaDelimitedString(startContext.config.getSource().getIndex())
-                                        ));
-                                    } else {
-                                        finalListener.onResponse(startContext);
-                                    }
-                                },
-                                finalListener::onFailure
-                            )),
-                    finalListener::onFailure
-                ))
-            ,
+            startContext -> validateSourceIndexHasRows(startContext, finalListener),
             finalListener::onFailure
         );
 
@@ -269,9 +244,7 @@ private void getStartContext(String id, ActionListener<StartContext> finalListener) {
         // Step 4. Validate dest index is empty if task is starting for first time
         ActionListener<StartContext> toValidateDestEmptyListener = ActionListener.wrap(
             startContext -> {
-                DataFrameAnalyticsTask.StartingState startingState = DataFrameAnalyticsTask.determineStartingState(
-                    startContext.config.getId(), startContext.progressOnStart);
-                switch (startingState) {
+                switch (startContext.startingState) {
                     case FIRST_TIME:
                         checkDestIndexIsEmptyIfExists(startContext, toValidateMappingsListener);
                         break;
@@ -285,7 +258,7 @@ private void getStartContext(String id, ActionListener<StartContext> finalListener) {
                             "Cannot start because the job has already finished"));
                         break;
                     default:
-                        finalListener.onFailure(ExceptionsHelper.serverError("Unexpected starting state " + startingState));
+                        finalListener.onFailure(ExceptionsHelper.serverError("Unexpected starting state " + startContext.startingState));
                         break;
                 }
             },
@@ -295,9 +268,16 @@ private void getStartContext(String id, ActionListener<StartContext> finalListener) {
         // Step 3. Validate source and dest; check data extraction is possible
         ActionListener<StartContext> startContextListener = ActionListener.wrap(
             startContext -> {
+                // Validate the query parses
+                startContext.config.getSource().getParsedQuery();
+
+                // Validate source/dest are valid
                 new SourceDestValidator(clusterService.state(), indexNameExpressionResolver).check(startContext.config);
-                DataFrameDataExtractorFactory.validateConfigAndSourceIndex(client, startContext.config, ActionListener.wrap(
-                    config -> toValidateDestEmptyListener.onResponse(startContext), finalListener::onFailure));
+
+                // Validate extraction is possible
+                boolean isTaskRestarting = startContext.startingState != DataFrameAnalyticsTask.StartingState.FIRST_TIME;
+                new ExtractedFieldsDetectorFactory(client).createFromSource(startContext.config, isTaskRestarting, ActionListener.wrap(
+                    extractedFieldsDetector -> toValidateDestEmptyListener.onResponse(startContext), finalListener::onFailure));
             },
             finalListener::onFailure
         );
@@ -313,6 +293,38 @@ private void getStartContext(String id, ActionListener<StartContext> finalListener) {
         configProvider.get(id, getConfigListener);
     }
 
+    private void validateSourceIndexHasRows(StartContext startContext, ActionListener<StartContext> listener) {
+        boolean isTaskRestarting = startContext.startingState != DataFrameAnalyticsTask.StartingState.FIRST_TIME;
+        DataFrameDataExtractorFactory.createForSourceIndices(client,
+            "validate_source_index_has_rows-" + startContext.config.getId(),
+            isTaskRestarting,
+            startContext.config,
+            ActionListener.wrap(
+                dataFrameDataExtractorFactory ->
+                    dataFrameDataExtractorFactory
+                        .newExtractor(false)
+                        .collectDataSummaryAsync(ActionListener.wrap(
+                            dataSummary -> {
+                                if (dataSummary.rows == 0) {
+                                    listener.onFailure(ExceptionsHelper.badRequestException(
+                                        "Unable to start {} as no documents in the source indices [{}] contained all the fields "
+                                            + "selected for analysis. If you are relying on automatic field selection then there are "
+                                            + "currently mapped fields that do not exist in any indexed documents, and you will have "
+                                            + "to switch to explicit field selection and include only fields that exist in indexed "
+                                            + "documents.",
+                                        startContext.config.getId(),
+                                        Strings.arrayToCommaDelimitedString(startContext.config.getSource().getIndex())
+                                    ));
+                                } else {
+                                    listener.onResponse(startContext);
+                                }
+                            },
+                            listener::onFailure
+                        )),
+                listener::onFailure
+            ));
+    }
+
     private void getProgress(DataFrameAnalyticsConfig config, ActionListener<List<PhaseProgress>> listener) {
         GetDataFrameAnalyticsStatsAction.Request getStatsRequest = new GetDataFrameAnalyticsStatsAction.Request(config.getId());
         executeAsyncWithOrigin(client, ML_ORIGIN, GetDataFrameAnalyticsStatsAction.INSTANCE, getStatsRequest, ActionListener.wrap(
@@ -389,10 +401,12 @@ public void onTimeout(TimeValue timeout) {
     private static class StartContext {
         private final DataFrameAnalyticsConfig config;
         private final List<PhaseProgress> progressOnStart;
+        private final DataFrameAnalyticsTask.StartingState startingState;
 
         private StartContext(DataFrameAnalyticsConfig config, List<PhaseProgress> progressOnStart) {
            this.config = config;
            this.progressOnStart = progressOnStart;
+           this.startingState = DataFrameAnalyticsTask.determineStartingState(config.getId(), progressOnStart);
         }
     }
 
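With the `StartContext` change above, the starting state is computed once at construction time and reused by Steps 3 and 4, rather than being re-derived from `progressOnStart` at each step. A minimal sketch of the resulting branch, limited to the cases visible in this diff:

    // Sketch: startingState is derived once in the constructor and branched on later.
    StartContext startContext = new StartContext(config, progressOnStart);
    switch (startContext.startingState) {
        case FIRST_TIME:
            checkDestIndexIsEmptyIfExists(startContext, toValidateMappingsListener);
            break;
        // ... resuming and finished cases elided, see the hunks above ...
        default:
            finalListener.onFailure(ExceptionsHelper.serverError(
                "Unexpected starting state " + startContext.startingState));
            break;
    }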