diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlTasks.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlTasks.java index 46685001153d7..5ba04fcc4087c 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlTasks.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlTasks.java @@ -12,6 +12,7 @@ import org.elasticsearch.xpack.core.ml.job.config.JobState; import org.elasticsearch.xpack.core.ml.job.config.JobTaskState; +import java.util.Collections; import java.util.List; import java.util.Set; import java.util.stream.Collectors; @@ -83,10 +84,14 @@ public static DatafeedState getDatafeedState(String datafeedId, @Nullable Persis * All anomaly detector jobs are returned regardless of the status of the * task (OPEN, CLOSED, FAILED etc). * - * @param tasks Persistent tasks + * @param tasks Persistent tasks. If null an empty set is returned. * @return The job Ids of anomaly detector job tasks */ - public static Set openJobIds(PersistentTasksCustomMetaData tasks) { + public static Set openJobIds(@Nullable PersistentTasksCustomMetaData tasks) { + if (tasks == null) { + return Collections.emptySet(); + } + return tasks.findTasks(JOB_TASK_NAME, task -> true) .stream() .map(t -> t.getId().substring(JOB_TASK_ID_PREFIX.length())) diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/MlTasksTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/MlTasksTests.java index e2db7c3a30951..c3579fe4173b8 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/MlTasksTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/MlTasksTests.java @@ -31,6 +31,10 @@ public void testGetJobState() { assertEquals(JobState.OPENED, MlTasks.getJobState("foo", tasksBuilder.build())); } + public void testGetJobState_GivenNull() { + assertEquals(JobState.CLOSED, MlTasks.getJobState("foo", null)); + } + public void testGetDatefeedState() { PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); // A missing task is a stopped datafeed @@ -83,6 +87,10 @@ public void testOpenJobIds() { assertThat(MlTasks.openJobIds(tasksBuilder.build()), containsInAnyOrder("foo-1", "bar")); } + public void testOpenJobIds_GivenNull() { + assertThat(MlTasks.openJobIds(null), empty()); + } + public void testTaskExistsForJob() { PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); assertFalse(MlTasks.taskExistsForJob("job-1", tasksBuilder.build())); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetCalendarEventsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetCalendarEventsAction.java index 18a507a77250e..b37b25fcacfbc 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetCalendarEventsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetCalendarEventsAction.java @@ -8,41 +8,38 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; -import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; -import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.action.GetCalendarEventsAction; import org.elasticsearch.xpack.core.ml.action.GetCalendarsAction; import org.elasticsearch.xpack.core.ml.action.util.QueryPage; import org.elasticsearch.xpack.core.ml.calendars.ScheduledEvent; import org.elasticsearch.xpack.core.ml.job.config.Job; -import org.elasticsearch.xpack.ml.job.persistence.ScheduledEventsQueryBuilder; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; +import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider; import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; +import org.elasticsearch.xpack.ml.job.persistence.ScheduledEventsQueryBuilder; import java.util.Collections; -import java.util.List; public class TransportGetCalendarEventsAction extends HandledTransportAction { private final JobResultsProvider jobResultsProvider; - private final ClusterService clusterService; + private final JobConfigProvider jobConfigProvider; @Inject public TransportGetCalendarEventsAction(Settings settings, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, - ClusterService clusterService, JobResultsProvider jobResultsProvider) { + JobResultsProvider jobResultsProvider, JobConfigProvider jobConfigProvider) { super(settings, GetCalendarEventsAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, GetCalendarEventsAction.Request::new); this.jobResultsProvider = jobResultsProvider; - this.clusterService = clusterService; + this.jobConfigProvider = jobConfigProvider; } @Override @@ -68,26 +65,28 @@ protected void doExecute(GetCalendarEventsAction.Request request, ); if (request.getJobId() != null) { - ClusterState state = clusterService.state(); - MlMetadata currentMlMetadata = MlMetadata.getMlMetadata(state); - - List jobGroups; - String requestId = request.getJobId(); - Job job = currentMlMetadata.getJobs().get(request.getJobId()); - if (job == null) { - // Check if the requested id is a job group - if (currentMlMetadata.isGroupOrJob(request.getJobId()) == false) { - listener.onFailure(ExceptionsHelper.missingJobException(request.getJobId())); - return; - } - jobGroups = Collections.singletonList(request.getJobId()); - requestId = null; - } else { - jobGroups = job.getGroups(); - } + jobConfigProvider.getJob(request.getJobId(), ActionListener.wrap( + jobBuiler -> { + Job job = jobBuiler.build(); + jobResultsProvider.scheduledEventsForJob(request.getJobId(), job.getGroups(), query, eventsListener); - jobResultsProvider.scheduledEventsForJob(requestId, jobGroups, query, eventsListener); + }, + jobNotFound -> { + // is the request Id a group? + jobConfigProvider.groupExists(request.getJobId(), ActionListener.wrap( + groupExists -> { + if (groupExists) { + jobResultsProvider.scheduledEventsForJob( + null, Collections.singletonList(request.getJobId()), query, eventsListener); + } else { + listener.onFailure(ExceptionsHelper.missingJobException(request.getJobId())); + } + }, + listener::onFailure + )); + } + )); } else { jobResultsProvider.scheduledEvents(query, eventsListener); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java index 88041d2cbc1e1..3007f9241221c 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java @@ -631,6 +631,41 @@ public void expandGroupIds(List groupIds, ActionListener> l , client::search); } + /** + * Check if a group exists, that is there exists a job that is a member of + * the group. If there are one or more jobs that define the group then + * the listener responds with true else false. + * + * @param groupId The group Id + * @param listener Returns true, false or a failure + */ + public void groupExists(String groupId, ActionListener listener) { + BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder(); + boolQueryBuilder.filter(new TermQueryBuilder(Job.JOB_TYPE.getPreferredName(), Job.ANOMALY_DETECTOR_JOB_TYPE)); + boolQueryBuilder.filter(new TermQueryBuilder(Job.GROUPS.getPreferredName(), groupId)); + + SearchSourceBuilder sourceBuilder = new SearchSourceBuilder() + .query(boolQueryBuilder); + sourceBuilder.fetchSource(false); + + SearchRequest searchRequest = client.prepareSearch(AnomalyDetectorsIndex.configIndexName()) + .setSize(0) + .setIndicesOptions(IndicesOptions.lenientExpandOpen()) + .setSource(sourceBuilder).request(); + + executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, searchRequest, + ActionListener.wrap( + response -> { + listener.onResponse(response.getHits().totalHits > 0); + }, + listener::onFailure) + , client::search); + } + + /** + * Find jobs with custom rules defined. + * @param listener Jobs listener + */ public void findJobsWithCustomRules(ActionListener> listener) { String customRulesPath = Strings.collectionToDelimitedString(Arrays.asList(Job.ANALYSIS_CONFIG.getPreferredName(), AnalysisConfig.DETECTORS.getPreferredName(), Detector.CUSTOM_RULES_FIELD.getPreferredName()), "."); diff --git a/x-pack/plugin/src/test/java/org/elasticsearch/xpack/test/rest/XPackRestIT.java b/x-pack/plugin/src/test/java/org/elasticsearch/xpack/test/rest/XPackRestIT.java index 5ddf1b0e8fcba..9d62119f3d471 100644 --- a/x-pack/plugin/src/test/java/org/elasticsearch/xpack/test/rest/XPackRestIT.java +++ b/x-pack/plugin/src/test/java/org/elasticsearch/xpack/test/rest/XPackRestIT.java @@ -85,7 +85,8 @@ private void waitForTemplates() throws Exception { List templates = new ArrayList<>(); templates.addAll(Arrays.asList(AuditorField.NOTIFICATIONS_INDEX, MlMetaIndex.INDEX_NAME, AnomalyDetectorsIndex.jobStateIndexName(), - AnomalyDetectorsIndex.jobResultsIndexPrefix())); + AnomalyDetectorsIndex.jobResultsIndexPrefix(), + AnomalyDetectorsIndex.configIndexName())); for (String template : templates) { awaitCallApi("indices.exists_template", singletonMap("name", template), emptyList(),