Skip to content

Commit e8c3951

Browse files
committed
[ML] Job in index: Get datafeed and job stats from index (elastic#34645)
1 parent 040da13 commit e8c3951

File tree

8 files changed

+136
-101
lines changed

8 files changed

+136
-101
lines changed

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ protected void doExecute(Task task, CloseJobAction.Request request, ActionListen
107107
*/
108108

109109
PersistentTasksCustomMetaData tasksMetaData = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
110-
jobConfigProvider.expandJobsIds(request.getJobId(), request.allowNoJobs(), ActionListener.wrap(
110+
jobConfigProvider.expandJobsIds(request.getJobId(), request.allowNoJobs(), true, ActionListener.wrap(
111111
expandedJobIds -> {
112112
validate(expandedJobIds, request.isForce(), tasksMetaData, ActionListener.wrap(
113113
response -> {

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDatafeedsStatsAction.java

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -16,29 +16,32 @@
1616
import org.elasticsearch.cluster.service.ClusterService;
1717
import org.elasticsearch.common.inject.Inject;
1818
import org.elasticsearch.common.settings.Settings;
19+
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
1920
import org.elasticsearch.threadpool.ThreadPool;
2021
import org.elasticsearch.transport.TransportService;
21-
import org.elasticsearch.xpack.core.ml.MlMetadata;
2222
import org.elasticsearch.xpack.core.ml.MlTasks;
2323
import org.elasticsearch.xpack.core.ml.action.GetDatafeedsStatsAction;
2424
import org.elasticsearch.xpack.core.ml.action.util.QueryPage;
2525
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
2626
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
27-
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
27+
import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;
2828

2929
import java.util.List;
30-
import java.util.Set;
3130
import java.util.stream.Collectors;
3231

3332
public class TransportGetDatafeedsStatsAction extends TransportMasterNodeReadAction<GetDatafeedsStatsAction.Request,
3433
GetDatafeedsStatsAction.Response> {
3534

35+
private final DatafeedConfigProvider datafeedConfigProvider;
36+
3637
@Inject
3738
public TransportGetDatafeedsStatsAction(Settings settings, TransportService transportService, ClusterService clusterService,
3839
ThreadPool threadPool, ActionFilters actionFilters,
39-
IndexNameExpressionResolver indexNameExpressionResolver) {
40+
IndexNameExpressionResolver indexNameExpressionResolver,
41+
DatafeedConfigProvider datafeedConfigProvider) {
4042
super(settings, GetDatafeedsStatsAction.NAME, transportService, clusterService, threadPool, actionFilters,
4143
GetDatafeedsStatsAction.Request::new, indexNameExpressionResolver);
44+
this.datafeedConfigProvider = datafeedConfigProvider;
4245
}
4346

4447
@Override
@@ -56,16 +59,18 @@ protected void masterOperation(GetDatafeedsStatsAction.Request request, ClusterS
5659
ActionListener<GetDatafeedsStatsAction.Response> listener) throws Exception {
5760
logger.debug("Get stats for datafeed '{}'", request.getDatafeedId());
5861

59-
MlMetadata mlMetadata = MlMetadata.getMlMetadata(state);
60-
Set<String> expandedDatafeedIds = mlMetadata.expandDatafeedIds(request.getDatafeedId(), request.allowNoDatafeeds());
61-
62-
PersistentTasksCustomMetaData tasksInProgress = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
63-
List<GetDatafeedsStatsAction.Response.DatafeedStats> results = expandedDatafeedIds.stream()
64-
.map(datafeedId -> getDatafeedStats(datafeedId, state, tasksInProgress))
65-
.collect(Collectors.toList());
66-
QueryPage<GetDatafeedsStatsAction.Response.DatafeedStats> statsPage = new QueryPage<>(results, results.size(),
67-
DatafeedConfig.RESULTS_FIELD);
68-
listener.onResponse(new GetDatafeedsStatsAction.Response(statsPage));
62+
datafeedConfigProvider.expandDatafeedIds(request.getDatafeedId(), request.allowNoDatafeeds(), ActionListener.wrap(
63+
expandedDatafeedIds -> {
64+
PersistentTasksCustomMetaData tasksInProgress = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
65+
List<GetDatafeedsStatsAction.Response.DatafeedStats> results = expandedDatafeedIds.stream()
66+
.map(datafeedId -> getDatafeedStats(datafeedId, state, tasksInProgress))
67+
.collect(Collectors.toList());
68+
QueryPage<GetDatafeedsStatsAction.Response.DatafeedStats> statsPage = new QueryPage<>(results, results.size(),
69+
DatafeedConfig.RESULTS_FIELD);
70+
listener.onResponse(new GetDatafeedsStatsAction.Response(statsPage));
71+
},
72+
listener::onFailure
73+
));
6974
}
7075

7176
private static GetDatafeedsStatsAction.Response.DatafeedStats getDatafeedStats(String datafeedId, ClusterState state,

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsAction.java

Lines changed: 30 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,10 @@
1919
import org.elasticsearch.common.settings.Settings;
2020
import org.elasticsearch.common.unit.TimeValue;
2121
import org.elasticsearch.common.util.concurrent.AtomicArray;
22+
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
2223
import org.elasticsearch.tasks.Task;
2324
import org.elasticsearch.threadpool.ThreadPool;
2425
import org.elasticsearch.transport.TransportService;
25-
import org.elasticsearch.xpack.core.ml.MlMetadata;
2626
import org.elasticsearch.xpack.core.ml.MlTasks;
2727
import org.elasticsearch.xpack.core.ml.action.GetJobsStatsAction;
2828
import org.elasticsearch.xpack.core.ml.action.util.QueryPage;
@@ -31,7 +31,7 @@
3131
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
3232
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats;
3333
import org.elasticsearch.xpack.core.ml.stats.ForecastStats;
34-
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
34+
import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider;
3535
import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
3636
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
3737

@@ -53,27 +53,35 @@ public class TransportGetJobsStatsAction extends TransportTasksAction<TransportO
5353
private final ClusterService clusterService;
5454
private final AutodetectProcessManager processManager;
5555
private final JobResultsProvider jobResultsProvider;
56+
private final JobConfigProvider jobConfigProvider;
5657

5758
@Inject
5859
public TransportGetJobsStatsAction(Settings settings, TransportService transportService,
5960
ActionFilters actionFilters, ClusterService clusterService,
60-
AutodetectProcessManager processManager, JobResultsProvider jobResultsProvider) {
61+
AutodetectProcessManager processManager, JobResultsProvider jobResultsProvider,
62+
JobConfigProvider jobConfigProvider) {
6163
super(settings, GetJobsStatsAction.NAME, clusterService, transportService, actionFilters,
62-
GetJobsStatsAction.Request::new, GetJobsStatsAction.Response::new,
63-
ThreadPool.Names.MANAGEMENT);
64+
GetJobsStatsAction.Request::new, GetJobsStatsAction.Response::new, ThreadPool.Names.MANAGEMENT);
6465
this.clusterService = clusterService;
6566
this.processManager = processManager;
6667
this.jobResultsProvider = jobResultsProvider;
68+
this.jobConfigProvider = jobConfigProvider;
6769
}
6870

6971
@Override
70-
protected void doExecute(Task task, GetJobsStatsAction.Request request, ActionListener<GetJobsStatsAction.Response> listener) {
71-
MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterService.state());
72-
request.setExpandedJobsIds(new ArrayList<>(mlMetadata.expandJobIds(request.getJobId(), request.allowNoJobs())));
73-
ActionListener<GetJobsStatsAction.Response> finalListener = listener;
74-
listener = ActionListener.wrap(response -> gatherStatsForClosedJobs(mlMetadata,
75-
request, response, finalListener), listener::onFailure);
76-
super.doExecute(task, request, listener);
72+
protected void doExecute(Task task, GetJobsStatsAction.Request request, ActionListener<GetJobsStatsAction.Response> finalListener) {
73+
74+
jobConfigProvider.expandJobsIds(request.getJobId(), request.allowNoJobs(), true, ActionListener.wrap(
75+
expandedIds -> {
76+
request.setExpandedJobsIds(new ArrayList<>(expandedIds));
77+
ActionListener<GetJobsStatsAction.Response> jobStatsListener = ActionListener.wrap(
78+
response -> gatherStatsForClosedJobs(request, response, finalListener),
79+
finalListener::onFailure
80+
);
81+
super.doExecute(task, request, jobStatsListener);
82+
},
83+
finalListener::onFailure
84+
));
7785
}
7886

7987
@Override
@@ -121,21 +129,20 @@ protected void taskOperation(GetJobsStatsAction.Request request, TransportOpenJo
121129

122130
// Up until now we gathered the stats for jobs that were open,
123131
// This method will fetch the stats for missing jobs, that was stored in the jobs index
124-
void gatherStatsForClosedJobs(MlMetadata mlMetadata, GetJobsStatsAction.Request request, GetJobsStatsAction.Response response,
132+
void gatherStatsForClosedJobs(GetJobsStatsAction.Request request, GetJobsStatsAction.Response response,
125133
ActionListener<GetJobsStatsAction.Response> listener) {
126-
List<String> jobIds = determineNonDeletedJobIdsWithoutLiveStats(mlMetadata,
127-
request.getExpandedJobsIds(), response.getResponse().results());
128-
if (jobIds.isEmpty()) {
134+
List<String> closedJobIds = determineJobIdsWithoutLiveStats(request.getExpandedJobsIds(), response.getResponse().results());
135+
if (closedJobIds.isEmpty()) {
129136
listener.onResponse(response);
130137
return;
131138
}
132139

133-
AtomicInteger counter = new AtomicInteger(jobIds.size());
134-
AtomicArray<GetJobsStatsAction.Response.JobStats> jobStats = new AtomicArray<>(jobIds.size());
140+
AtomicInteger counter = new AtomicInteger(closedJobIds.size());
141+
AtomicArray<GetJobsStatsAction.Response.JobStats> jobStats = new AtomicArray<>(closedJobIds.size());
135142
PersistentTasksCustomMetaData tasks = clusterService.state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
136-
for (int i = 0; i < jobIds.size(); i++) {
143+
for (int i = 0; i < closedJobIds.size(); i++) {
137144
int slot = i;
138-
String jobId = jobIds.get(i);
145+
String jobId = closedJobIds.get(i);
139146
gatherForecastStats(jobId, forecastStats -> {
140147
gatherDataCountsAndModelSizeStats(jobId, (dataCounts, modelSizeStats) -> {
141148
JobState jobState = MlTasks.getJobState(jobId, tasks);
@@ -178,11 +185,9 @@ static TimeValue durationToTimeValue(Optional<Duration> duration) {
178185
}
179186
}
180187

181-
static List<String> determineNonDeletedJobIdsWithoutLiveStats(MlMetadata mlMetadata,
182-
List<String> requestedJobIds,
183-
List<GetJobsStatsAction.Response.JobStats> stats) {
188+
static List<String> determineJobIdsWithoutLiveStats(List<String> requestedJobIds,
189+
List<GetJobsStatsAction.Response.JobStats> stats) {
184190
Set<String> excludeJobIds = stats.stream().map(GetJobsStatsAction.Response.JobStats::getJobId).collect(Collectors.toSet());
185-
return requestedJobIds.stream().filter(jobId -> !excludeJobIds.contains(jobId) &&
186-
!mlMetadata.isJobDeleting(jobId)).collect(Collectors.toList());
191+
return requestedJobIds.stream().filter(jobId -> !excludeJobIds.contains(jobId)).collect(Collectors.toList());
187192
}
188193
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ private void getJobFromClusterState(String jobId, ActionListener<Job> jobListene
172172
public void expandJobs(String expression, boolean allowNoJobs, ActionListener<QueryPage<Job>> jobsListener) {
173173
Map<String, Job> clusterStateJobs = expandJobsFromClusterState(expression, allowNoJobs, clusterService.state());
174174

175-
jobConfigProvider.expandJobs(expression, allowNoJobs, ActionListener.wrap(
175+
jobConfigProvider.expandJobs(expression, allowNoJobs, false, ActionListener.wrap(
176176
jobBuilders -> {
177177
// Check for duplicate jobs
178178
for (Job.Builder jb : jobBuilders) {

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import org.elasticsearch.index.engine.DocumentMissingException;
4848
import org.elasticsearch.index.engine.VersionConflictEngineException;
4949
import org.elasticsearch.index.query.BoolQueryBuilder;
50+
import org.elasticsearch.index.query.ExistsQueryBuilder;
5051
import org.elasticsearch.index.query.QueryBuilder;
5152
import org.elasticsearch.index.query.QueryBuilders;
5253
import org.elasticsearch.index.query.TermQueryBuilder;
@@ -489,11 +490,12 @@ public void markJobAsDeleting(String jobId, ActionListener<Boolean> listener) {
489490
* @param allowNoJobs if {@code false}, an error is thrown when no name matches the {@code expression}.
490491
* This only applies to wild card expressions, if {@code expression} is not a
491492
* wildcard then setting this true will not suppress the exception
493+
* @param excludeDeleting If true exclude jobs marked as deleting
492494
* @param listener The expanded job Ids listener
493495
*/
494-
public void expandJobsIds(String expression, boolean allowNoJobs, ActionListener<Set<String>> listener) {
496+
public void expandJobsIds(String expression, boolean allowNoJobs, boolean excludeDeleting, ActionListener<Set<String>> listener) {
495497
String [] tokens = ExpandedIdsMatcher.tokenizeExpression(expression);
496-
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(buildQuery(tokens));
498+
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(buildQuery(tokens, excludeDeleting));
497499
sourceBuilder.sort(Job.ID.getPreferredName());
498500
sourceBuilder.fetchSource(false);
499501
sourceBuilder.docValueField(Job.ID.getPreferredName());
@@ -535,21 +537,22 @@ public void expandJobsIds(String expression, boolean allowNoJobs, ActionListener
535537
}
536538

537539
/**
538-
* The same logic as {@link #expandJobsIds(String, boolean, ActionListener)} but
540+
* The same logic as {@link #expandJobsIds(String, boolean, boolean, ActionListener)} but
539541
* the full anomaly detector job configuration is returned.
540542
*
541-
* See {@link #expandJobsIds(String, boolean, ActionListener)}
543+
* See {@link #expandJobsIds(String, boolean, boolean, ActionListener)}
542544
*
543545
* @param expression the expression to resolve
544546
* @param allowNoJobs if {@code false}, an error is thrown when no name matches the {@code expression}.
545547
* This only applies to wild card expressions, if {@code expression} is not a
546548
* wildcard then setting this true will not suppress the exception
549+
* @param excludeDeleting If true exclude jobs marked as deleting
547550
* @param listener The expanded jobs listener
548551
*/
549552
// NORELEASE jobs should be paged or have a mechanism to return all jobs if there are many of them
550-
public void expandJobs(String expression, boolean allowNoJobs, ActionListener<List<Job.Builder>> listener) {
553+
public void expandJobs(String expression, boolean allowNoJobs, boolean excludeDeleting, ActionListener<List<Job.Builder>> listener) {
551554
String [] tokens = ExpandedIdsMatcher.tokenizeExpression(expression);
552-
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(buildQuery(tokens));
555+
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(buildQuery(tokens, excludeDeleting));
553556
sourceBuilder.sort(Job.ID.getPreferredName());
554557

555558
SearchRequest searchRequest = client.prepareSearch(AnomalyDetectorsIndex.configIndexName())
@@ -594,7 +597,7 @@ public void expandJobs(String expression, boolean allowNoJobs, ActionListener<Li
594597

595598
/**
596599
* Expands the list of job group Ids to the set of jobs which are members of the groups.
597-
* Unlike {@link #expandJobsIds(String, boolean, ActionListener)} it is not an error
600+
* Unlike {@link #expandJobsIds(String, boolean, boolean, ActionListener)} it is not an error
598601
* if a group Id does not exist.
599602
* Wildcard expansion of group Ids is not supported.
600603
*
@@ -698,9 +701,9 @@ private Job.Builder parseJobLenientlyFromSource(BytesReference source) throws IO
698701
}
699702
}
700703

701-
private QueryBuilder buildQuery(String [] tokens) {
704+
private QueryBuilder buildQuery(String [] tokens, boolean excludeDeleting) {
702705
QueryBuilder jobQuery = new TermQueryBuilder(Job.JOB_TYPE.getPreferredName(), Job.ANOMALY_DETECTOR_JOB_TYPE);
703-
if (Strings.isAllOrWildcard(tokens)) {
706+
if (Strings.isAllOrWildcard(tokens) && excludeDeleting == false) {
704707
// match all
705708
return jobQuery;
706709
}
@@ -709,6 +712,16 @@ private QueryBuilder buildQuery(String [] tokens) {
709712
boolQueryBuilder.filter(jobQuery);
710713
BoolQueryBuilder shouldQueries = new BoolQueryBuilder();
711714

715+
if (excludeDeleting) {
716+
// field exists only when the job is marked as deleting
717+
shouldQueries.mustNot(new ExistsQueryBuilder(Job.DELETING.getPreferredName()));
718+
719+
if (Strings.isAllOrWildcard(tokens)) {
720+
boolQueryBuilder.filter(shouldQueries);
721+
return boolQueryBuilder;
722+
}
723+
}
724+
712725
List<String> terms = new ArrayList<>();
713726
for (String token : tokens) {
714727
if (Regex.isSimpleMatchPattern(token)) {

x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportCloseJobActionTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -288,11 +288,11 @@ private void mockDatafeedConfigFindDatafeeds(Set<String> datafeedIds) {
288288

289289
private void mockJobConfigProviderExpandIds(Set<String> expandedIds) {
290290
doAnswer(invocation -> {
291-
ActionListener<Set<String>> listener = (ActionListener<Set<String>>) invocation.getArguments()[2];
291+
ActionListener<Set<String>> listener = (ActionListener<Set<String>>) invocation.getArguments()[3];
292292
listener.onResponse(expandedIds);
293293

294294
return null;
295-
}).when(jobConfigProvider).expandJobsIds(any(), anyBoolean(), any(ActionListener.class));
295+
}).when(jobConfigProvider).expandJobsIds(any(), anyBoolean(), anyBoolean(), any(ActionListener.class));
296296
}
297297

298298
}

0 commit comments

Comments
 (0)