Skip to content

Commit a646f8e

Browse files
authored
[ML] Job in index: Get datafeed and job stats from index (#34645)
1 parent 6d36bb8 commit a646f8e

File tree

8 files changed

+135
-99
lines changed

8 files changed

+135
-99
lines changed

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ protected void doExecute(Task task, CloseJobAction.Request request, ActionListen
107107
*/
108108

109109
PersistentTasksCustomMetaData tasksMetaData = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
110-
jobConfigProvider.expandJobsIds(request.getJobId(), request.allowNoJobs(), ActionListener.wrap(
110+
jobConfigProvider.expandJobsIds(request.getJobId(), request.allowNoJobs(), true, ActionListener.wrap(
111111
expandedJobIds -> {
112112
validate(expandedJobIds, request.isForce(), tasksMetaData, ActionListener.wrap(
113113
response -> {

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDatafeedsStatsAction.java

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -16,30 +16,33 @@
1616
import org.elasticsearch.cluster.service.ClusterService;
1717
import org.elasticsearch.common.inject.Inject;
1818
import org.elasticsearch.common.settings.Settings;
19+
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
1920
import org.elasticsearch.threadpool.ThreadPool;
2021
import org.elasticsearch.transport.TransportService;
21-
import org.elasticsearch.xpack.core.ml.MlMetadata;
2222
import org.elasticsearch.xpack.core.ml.MlTasks;
2323
import org.elasticsearch.xpack.core.ml.action.GetDatafeedsStatsAction;
2424
import org.elasticsearch.xpack.core.ml.action.util.QueryPage;
2525
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
2626
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
27-
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
27+
import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;
2828

2929
import java.util.List;
30-
import java.util.Set;
3130
import java.util.stream.Collectors;
3231

3332
public class TransportGetDatafeedsStatsAction extends TransportMasterNodeReadAction<GetDatafeedsStatsAction.Request,
3433
GetDatafeedsStatsAction.Response> {
3534

35+
private final DatafeedConfigProvider datafeedConfigProvider;
36+
3637
@Inject
3738
public TransportGetDatafeedsStatsAction(Settings settings, TransportService transportService,
3839
ClusterService clusterService, ThreadPool threadPool,
3940
ActionFilters actionFilters,
40-
IndexNameExpressionResolver indexNameExpressionResolver) {
41+
IndexNameExpressionResolver indexNameExpressionResolver,
42+
DatafeedConfigProvider datafeedConfigProvider) {
4143
super(settings, GetDatafeedsStatsAction.NAME, transportService, clusterService, threadPool, actionFilters,
4244
indexNameExpressionResolver, GetDatafeedsStatsAction.Request::new);
45+
this.datafeedConfigProvider = datafeedConfigProvider;
4346
}
4447

4548
@Override
@@ -57,16 +60,18 @@ protected void masterOperation(GetDatafeedsStatsAction.Request request, ClusterS
5760
ActionListener<GetDatafeedsStatsAction.Response> listener) throws Exception {
5861
logger.debug("Get stats for datafeed '{}'", request.getDatafeedId());
5962

60-
MlMetadata mlMetadata = MlMetadata.getMlMetadata(state);
61-
Set<String> expandedDatafeedIds = mlMetadata.expandDatafeedIds(request.getDatafeedId(), request.allowNoDatafeeds());
62-
63-
PersistentTasksCustomMetaData tasksInProgress = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
64-
List<GetDatafeedsStatsAction.Response.DatafeedStats> results = expandedDatafeedIds.stream()
65-
.map(datafeedId -> getDatafeedStats(datafeedId, state, tasksInProgress))
66-
.collect(Collectors.toList());
67-
QueryPage<GetDatafeedsStatsAction.Response.DatafeedStats> statsPage = new QueryPage<>(results, results.size(),
68-
DatafeedConfig.RESULTS_FIELD);
69-
listener.onResponse(new GetDatafeedsStatsAction.Response(statsPage));
63+
datafeedConfigProvider.expandDatafeedIds(request.getDatafeedId(), request.allowNoDatafeeds(), ActionListener.wrap(
64+
expandedDatafeedIds -> {
65+
PersistentTasksCustomMetaData tasksInProgress = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
66+
List<GetDatafeedsStatsAction.Response.DatafeedStats> results = expandedDatafeedIds.stream()
67+
.map(datafeedId -> getDatafeedStats(datafeedId, state, tasksInProgress))
68+
.collect(Collectors.toList());
69+
QueryPage<GetDatafeedsStatsAction.Response.DatafeedStats> statsPage = new QueryPage<>(results, results.size(),
70+
DatafeedConfig.RESULTS_FIELD);
71+
listener.onResponse(new GetDatafeedsStatsAction.Response(statsPage));
72+
},
73+
listener::onFailure
74+
));
7075
}
7176

7277
private static GetDatafeedsStatsAction.Response.DatafeedStats getDatafeedStats(

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsAction.java

Lines changed: 29 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,10 @@
2020
import org.elasticsearch.common.settings.Settings;
2121
import org.elasticsearch.common.unit.TimeValue;
2222
import org.elasticsearch.common.util.concurrent.AtomicArray;
23+
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
2324
import org.elasticsearch.tasks.Task;
2425
import org.elasticsearch.threadpool.ThreadPool;
2526
import org.elasticsearch.transport.TransportService;
26-
import org.elasticsearch.xpack.core.ml.MlMetadata;
2727
import org.elasticsearch.xpack.core.ml.MlTasks;
2828
import org.elasticsearch.xpack.core.ml.action.GetJobsStatsAction;
2929
import org.elasticsearch.xpack.core.ml.action.util.QueryPage;
@@ -32,7 +32,7 @@
3232
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
3333
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats;
3434
import org.elasticsearch.xpack.core.ml.stats.ForecastStats;
35-
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
35+
import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider;
3636
import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
3737
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
3838

@@ -54,28 +54,37 @@ public class TransportGetJobsStatsAction extends TransportTasksAction<TransportO
5454
private final ClusterService clusterService;
5555
private final AutodetectProcessManager processManager;
5656
private final JobResultsProvider jobResultsProvider;
57+
private final JobConfigProvider jobConfigProvider;
5758

5859
@Inject
5960
public TransportGetJobsStatsAction(Settings settings, TransportService transportService, ThreadPool threadPool,
6061
ActionFilters actionFilters, ClusterService clusterService,
6162
IndexNameExpressionResolver indexNameExpressionResolver,
62-
AutodetectProcessManager processManager, JobResultsProvider jobResultsProvider) {
63+
AutodetectProcessManager processManager, JobResultsProvider jobResultsProvider,
64+
JobConfigProvider jobConfigProvider) {
6365
super(settings, GetJobsStatsAction.NAME, threadPool, clusterService, transportService, actionFilters,
6466
indexNameExpressionResolver, GetJobsStatsAction.Request::new, GetJobsStatsAction.Response::new,
6567
ThreadPool.Names.MANAGEMENT);
6668
this.clusterService = clusterService;
6769
this.processManager = processManager;
6870
this.jobResultsProvider = jobResultsProvider;
71+
this.jobConfigProvider = jobConfigProvider;
6972
}
7073

7174
@Override
72-
protected void doExecute(Task task, GetJobsStatsAction.Request request, ActionListener<GetJobsStatsAction.Response> listener) {
73-
MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterService.state());
74-
request.setExpandedJobsIds(new ArrayList<>(mlMetadata.expandJobIds(request.getJobId(), request.allowNoJobs())));
75-
ActionListener<GetJobsStatsAction.Response> finalListener = listener;
76-
listener = ActionListener.wrap(response -> gatherStatsForClosedJobs(mlMetadata,
77-
request, response, finalListener), listener::onFailure);
78-
super.doExecute(task, request, listener);
75+
protected void doExecute(Task task, GetJobsStatsAction.Request request, ActionListener<GetJobsStatsAction.Response> finalListener) {
76+
77+
jobConfigProvider.expandJobsIds(request.getJobId(), request.allowNoJobs(), true, ActionListener.wrap(
78+
expandedIds -> {
79+
request.setExpandedJobsIds(new ArrayList<>(expandedIds));
80+
ActionListener<GetJobsStatsAction.Response> jobStatsListener = ActionListener.wrap(
81+
response -> gatherStatsForClosedJobs(request, response, finalListener),
82+
finalListener::onFailure
83+
);
84+
super.doExecute(task, request, jobStatsListener);
85+
},
86+
finalListener::onFailure
87+
));
7988
}
8089

8190
@Override
@@ -123,21 +132,20 @@ protected void taskOperation(GetJobsStatsAction.Request request, TransportOpenJo
123132

124133
// Up until now we gathered the stats for jobs that were open,
125134
// This method will fetch the stats for missing jobs, that was stored in the jobs index
126-
void gatherStatsForClosedJobs(MlMetadata mlMetadata, GetJobsStatsAction.Request request, GetJobsStatsAction.Response response,
135+
void gatherStatsForClosedJobs(GetJobsStatsAction.Request request, GetJobsStatsAction.Response response,
127136
ActionListener<GetJobsStatsAction.Response> listener) {
128-
List<String> jobIds = determineNonDeletedJobIdsWithoutLiveStats(mlMetadata,
129-
request.getExpandedJobsIds(), response.getResponse().results());
130-
if (jobIds.isEmpty()) {
137+
List<String> closedJobIds = determineJobIdsWithoutLiveStats(request.getExpandedJobsIds(), response.getResponse().results());
138+
if (closedJobIds.isEmpty()) {
131139
listener.onResponse(response);
132140
return;
133141
}
134142

135-
AtomicInteger counter = new AtomicInteger(jobIds.size());
136-
AtomicArray<GetJobsStatsAction.Response.JobStats> jobStats = new AtomicArray<>(jobIds.size());
143+
AtomicInteger counter = new AtomicInteger(closedJobIds.size());
144+
AtomicArray<GetJobsStatsAction.Response.JobStats> jobStats = new AtomicArray<>(closedJobIds.size());
137145
PersistentTasksCustomMetaData tasks = clusterService.state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
138-
for (int i = 0; i < jobIds.size(); i++) {
146+
for (int i = 0; i < closedJobIds.size(); i++) {
139147
int slot = i;
140-
String jobId = jobIds.get(i);
148+
String jobId = closedJobIds.get(i);
141149
gatherForecastStats(jobId, forecastStats -> {
142150
gatherDataCountsAndModelSizeStats(jobId, (dataCounts, modelSizeStats) -> {
143151
JobState jobState = MlTasks.getJobState(jobId, tasks);
@@ -180,11 +188,9 @@ static TimeValue durationToTimeValue(Optional<Duration> duration) {
180188
}
181189
}
182190

183-
static List<String> determineNonDeletedJobIdsWithoutLiveStats(MlMetadata mlMetadata,
184-
List<String> requestedJobIds,
185-
List<GetJobsStatsAction.Response.JobStats> stats) {
191+
static List<String> determineJobIdsWithoutLiveStats(List<String> requestedJobIds,
192+
List<GetJobsStatsAction.Response.JobStats> stats) {
186193
Set<String> excludeJobIds = stats.stream().map(GetJobsStatsAction.Response.JobStats::getJobId).collect(Collectors.toSet());
187-
return requestedJobIds.stream().filter(jobId -> !excludeJobIds.contains(jobId) &&
188-
!mlMetadata.isJobDeleting(jobId)).collect(Collectors.toList());
194+
return requestedJobIds.stream().filter(jobId -> !excludeJobIds.contains(jobId)).collect(Collectors.toList());
189195
}
190196
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ private void getJobFromClusterState(String jobId, ActionListener<Job> jobListene
172172
public void expandJobs(String expression, boolean allowNoJobs, ActionListener<QueryPage<Job>> jobsListener) {
173173
Map<String, Job> clusterStateJobs = expandJobsFromClusterState(expression, allowNoJobs, clusterService.state());
174174

175-
jobConfigProvider.expandJobs(expression, allowNoJobs, ActionListener.wrap(
175+
jobConfigProvider.expandJobs(expression, allowNoJobs, false, ActionListener.wrap(
176176
jobBuilders -> {
177177
// Check for duplicate jobs
178178
for (Job.Builder jb : jobBuilders) {

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import org.elasticsearch.index.engine.DocumentMissingException;
4848
import org.elasticsearch.index.engine.VersionConflictEngineException;
4949
import org.elasticsearch.index.query.BoolQueryBuilder;
50+
import org.elasticsearch.index.query.ExistsQueryBuilder;
5051
import org.elasticsearch.index.query.QueryBuilder;
5152
import org.elasticsearch.index.query.QueryBuilders;
5253
import org.elasticsearch.index.query.TermQueryBuilder;
@@ -490,11 +491,12 @@ public void markJobAsDeleting(String jobId, ActionListener<Boolean> listener) {
490491
* @param allowNoJobs if {@code false}, an error is thrown when no name matches the {@code expression}.
491492
* This only applies to wild card expressions, if {@code expression} is not a
492493
* wildcard then setting this true will not suppress the exception
494+
* @param excludeDeleting If true exclude jobs marked as deleting
493495
* @param listener The expanded job Ids listener
494496
*/
495-
public void expandJobsIds(String expression, boolean allowNoJobs, ActionListener<Set<String>> listener) {
497+
public void expandJobsIds(String expression, boolean allowNoJobs, boolean excludeDeleting, ActionListener<Set<String>> listener) {
496498
String [] tokens = ExpandedIdsMatcher.tokenizeExpression(expression);
497-
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(buildQuery(tokens));
499+
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(buildQuery(tokens, excludeDeleting));
498500
sourceBuilder.sort(Job.ID.getPreferredName());
499501
sourceBuilder.fetchSource(false);
500502
sourceBuilder.docValueField(Job.ID.getPreferredName());
@@ -536,21 +538,22 @@ public void expandJobsIds(String expression, boolean allowNoJobs, ActionListener
536538
}
537539

538540
/**
539-
* The same logic as {@link #expandJobsIds(String, boolean, ActionListener)} but
541+
* The same logic as {@link #expandJobsIds(String, boolean, boolean, ActionListener)} but
540542
* the full anomaly detector job configuration is returned.
541543
*
542-
* See {@link #expandJobsIds(String, boolean, ActionListener)}
544+
* See {@link #expandJobsIds(String, boolean, boolean, ActionListener)}
543545
*
544546
* @param expression the expression to resolve
545547
* @param allowNoJobs if {@code false}, an error is thrown when no name matches the {@code expression}.
546548
* This only applies to wild card expressions, if {@code expression} is not a
547549
* wildcard then setting this true will not suppress the exception
550+
* @param excludeDeleting If true exclude jobs marked as deleting
548551
* @param listener The expanded jobs listener
549552
*/
550553
// NORELEASE jobs should be paged or have a mechanism to return all jobs if there are many of them
551-
public void expandJobs(String expression, boolean allowNoJobs, ActionListener<List<Job.Builder>> listener) {
554+
public void expandJobs(String expression, boolean allowNoJobs, boolean excludeDeleting, ActionListener<List<Job.Builder>> listener) {
552555
String [] tokens = ExpandedIdsMatcher.tokenizeExpression(expression);
553-
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(buildQuery(tokens));
556+
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(buildQuery(tokens, excludeDeleting));
554557
sourceBuilder.sort(Job.ID.getPreferredName());
555558

556559
SearchRequest searchRequest = client.prepareSearch(AnomalyDetectorsIndex.configIndexName())
@@ -595,7 +598,7 @@ public void expandJobs(String expression, boolean allowNoJobs, ActionListener<Li
595598

596599
/**
597600
* Expands the list of job group Ids to the set of jobs which are members of the groups.
598-
* Unlike {@link #expandJobsIds(String, boolean, ActionListener)} it is not an error
601+
* Unlike {@link #expandJobsIds(String, boolean, boolean, ActionListener)} it is not an error
599602
* if a group Id does not exist.
600603
* Wildcard expansion of group Ids is not supported.
601604
*
@@ -699,9 +702,9 @@ private Job.Builder parseJobLenientlyFromSource(BytesReference source) throws IO
699702
}
700703
}
701704

702-
private QueryBuilder buildQuery(String [] tokens) {
705+
private QueryBuilder buildQuery(String [] tokens, boolean excludeDeleting) {
703706
QueryBuilder jobQuery = new TermQueryBuilder(Job.JOB_TYPE.getPreferredName(), Job.ANOMALY_DETECTOR_JOB_TYPE);
704-
if (Strings.isAllOrWildcard(tokens)) {
707+
if (Strings.isAllOrWildcard(tokens) && excludeDeleting == false) {
705708
// match all
706709
return jobQuery;
707710
}
@@ -710,6 +713,16 @@ private QueryBuilder buildQuery(String [] tokens) {
710713
boolQueryBuilder.filter(jobQuery);
711714
BoolQueryBuilder shouldQueries = new BoolQueryBuilder();
712715

716+
if (excludeDeleting) {
717+
// field exists only when the job is marked as deleting
718+
shouldQueries.mustNot(new ExistsQueryBuilder(Job.DELETING.getPreferredName()));
719+
720+
if (Strings.isAllOrWildcard(tokens)) {
721+
boolQueryBuilder.filter(shouldQueries);
722+
return boolQueryBuilder;
723+
}
724+
}
725+
713726
List<String> terms = new ArrayList<>();
714727
for (String token : tokens) {
715728
if (Regex.isSimpleMatchPattern(token)) {

x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportCloseJobActionTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -288,11 +288,11 @@ private void mockDatafeedConfigFindDatafeeds(Set<String> datafeedIds) {
288288

289289
private void mockJobConfigProviderExpandIds(Set<String> expandedIds) {
290290
doAnswer(invocation -> {
291-
ActionListener<Set<String>> listener = (ActionListener<Set<String>>) invocation.getArguments()[2];
291+
ActionListener<Set<String>> listener = (ActionListener<Set<String>>) invocation.getArguments()[3];
292292
listener.onResponse(expandedIds);
293293

294294
return null;
295-
}).when(jobConfigProvider).expandJobsIds(any(), anyBoolean(), any(ActionListener.class));
295+
}).when(jobConfigProvider).expandJobsIds(any(), anyBoolean(), anyBoolean(), any(ActionListener.class));
296296
}
297297

298298
}

0 commit comments

Comments
 (0)