Skip to content

Commit 1cac6ba

Browse files
authored
[ML] Job in Index: Convert get calendar events to index docs (#34710)
1 parent a646f8e commit 1cac6ba

File tree

5 files changed

+77
-29
lines changed

5 files changed

+77
-29
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlTasks.java

+7-2
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.elasticsearch.xpack.core.ml.job.config.JobState;
1313
import org.elasticsearch.xpack.core.ml.job.config.JobTaskState;
1414

15+
import java.util.Collections;
1516
import java.util.List;
1617
import java.util.Set;
1718
import java.util.stream.Collectors;
@@ -83,10 +84,14 @@ public static DatafeedState getDatafeedState(String datafeedId, @Nullable Persis
8384
* All anomaly detector jobs are returned regardless of the status of the
8485
* task (OPEN, CLOSED, FAILED etc).
8586
*
86-
* @param tasks Persistent tasks
87+
* @param tasks Persistent tasks. If null an empty set is returned.
8788
* @return The job Ids of anomaly detector job tasks
8889
*/
89-
public static Set<String> openJobIds(PersistentTasksCustomMetaData tasks) {
90+
public static Set<String> openJobIds(@Nullable PersistentTasksCustomMetaData tasks) {
91+
if (tasks == null) {
92+
return Collections.emptySet();
93+
}
94+
9095
return tasks.findTasks(JOB_TASK_NAME, task -> true)
9196
.stream()
9297
.map(t -> t.getId().substring(JOB_TASK_ID_PREFIX.length()))

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/MlTasksTests.java

+8
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,10 @@ public void testGetJobState() {
3131
assertEquals(JobState.OPENED, MlTasks.getJobState("foo", tasksBuilder.build()));
3232
}
3333

34+
public void testGetJobState_GivenNull() {
35+
assertEquals(JobState.CLOSED, MlTasks.getJobState("foo", null));
36+
}
37+
3438
public void testGetDatefeedState() {
3539
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
3640
// A missing task is a stopped datafeed
@@ -83,6 +87,10 @@ public void testOpenJobIds() {
8387
assertThat(MlTasks.openJobIds(tasksBuilder.build()), containsInAnyOrder("foo-1", "bar"));
8488
}
8589

90+
public void testOpenJobIds_GivenNull() {
91+
assertThat(MlTasks.openJobIds(null), empty());
92+
}
93+
8694
public void testTaskExistsForJob() {
8795
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
8896
assertFalse(MlTasks.taskExistsForJob("job-1", tasksBuilder.build()));

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetCalendarEventsAction.java

+25-26
Original file line numberDiff line numberDiff line change
@@ -8,41 +8,38 @@
88
import org.elasticsearch.action.ActionListener;
99
import org.elasticsearch.action.support.ActionFilters;
1010
import org.elasticsearch.action.support.HandledTransportAction;
11-
import org.elasticsearch.cluster.ClusterState;
1211
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
13-
import org.elasticsearch.cluster.service.ClusterService;
1412
import org.elasticsearch.common.inject.Inject;
1513
import org.elasticsearch.common.settings.Settings;
1614
import org.elasticsearch.threadpool.ThreadPool;
1715
import org.elasticsearch.transport.TransportService;
18-
import org.elasticsearch.xpack.core.ml.MlMetadata;
1916
import org.elasticsearch.xpack.core.ml.action.GetCalendarEventsAction;
2017
import org.elasticsearch.xpack.core.ml.action.GetCalendarsAction;
2118
import org.elasticsearch.xpack.core.ml.action.util.QueryPage;
2219
import org.elasticsearch.xpack.core.ml.calendars.ScheduledEvent;
2320
import org.elasticsearch.xpack.core.ml.job.config.Job;
24-
import org.elasticsearch.xpack.ml.job.persistence.ScheduledEventsQueryBuilder;
2521
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
22+
import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider;
2623
import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
24+
import org.elasticsearch.xpack.ml.job.persistence.ScheduledEventsQueryBuilder;
2725

2826
import java.util.Collections;
29-
import java.util.List;
3027

3128
public class TransportGetCalendarEventsAction extends HandledTransportAction<GetCalendarEventsAction.Request,
3229
GetCalendarEventsAction.Response> {
3330

3431
private final JobResultsProvider jobResultsProvider;
35-
private final ClusterService clusterService;
32+
private final JobConfigProvider jobConfigProvider;
3633

3734
@Inject
3835
public TransportGetCalendarEventsAction(Settings settings, ThreadPool threadPool,
3936
TransportService transportService, ActionFilters actionFilters,
4037
IndexNameExpressionResolver indexNameExpressionResolver,
41-
ClusterService clusterService, JobResultsProvider jobResultsProvider) {
38+
JobResultsProvider jobResultsProvider, JobConfigProvider jobConfigProvider) {
4239
super(settings, GetCalendarEventsAction.NAME, threadPool, transportService, actionFilters,
4340
indexNameExpressionResolver, GetCalendarEventsAction.Request::new);
4441
this.jobResultsProvider = jobResultsProvider;
45-
this.clusterService = clusterService;
42+
this.jobConfigProvider = jobConfigProvider;
4643
}
4744

4845
@Override
@@ -68,26 +65,28 @@ protected void doExecute(GetCalendarEventsAction.Request request,
6865
);
6966

7067
if (request.getJobId() != null) {
71-
ClusterState state = clusterService.state();
72-
MlMetadata currentMlMetadata = MlMetadata.getMlMetadata(state);
73-
74-
List<String> jobGroups;
75-
String requestId = request.getJobId();
7668

77-
Job job = currentMlMetadata.getJobs().get(request.getJobId());
78-
if (job == null) {
79-
// Check if the requested id is a job group
80-
if (currentMlMetadata.isGroupOrJob(request.getJobId()) == false) {
81-
listener.onFailure(ExceptionsHelper.missingJobException(request.getJobId()));
82-
return;
83-
}
84-
jobGroups = Collections.singletonList(request.getJobId());
85-
requestId = null;
86-
} else {
87-
jobGroups = job.getGroups();
88-
}
69+
jobConfigProvider.getJob(request.getJobId(), ActionListener.wrap(
70+
jobBuiler -> {
71+
Job job = jobBuiler.build();
72+
jobResultsProvider.scheduledEventsForJob(request.getJobId(), job.getGroups(), query, eventsListener);
8973

90-
jobResultsProvider.scheduledEventsForJob(requestId, jobGroups, query, eventsListener);
74+
},
75+
jobNotFound -> {
76+
// is the request Id a group?
77+
jobConfigProvider.groupExists(request.getJobId(), ActionListener.wrap(
78+
groupExists -> {
79+
if (groupExists) {
80+
jobResultsProvider.scheduledEventsForJob(
81+
null, Collections.singletonList(request.getJobId()), query, eventsListener);
82+
} else {
83+
listener.onFailure(ExceptionsHelper.missingJobException(request.getJobId()));
84+
}
85+
},
86+
listener::onFailure
87+
));
88+
}
89+
));
9190
} else {
9291
jobResultsProvider.scheduledEvents(query, eventsListener);
9392
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java

+35
Original file line numberDiff line numberDiff line change
@@ -631,6 +631,41 @@ public void expandGroupIds(List<String> groupIds, ActionListener<Set<String>> l
631631
, client::search);
632632
}
633633

634+
/**
635+
* Check if a group exists, that is there exists a job that is a member of
636+
* the group. If there are one or more jobs that define the group then
637+
* the listener responds with true else false.
638+
*
639+
* @param groupId The group Id
640+
* @param listener Returns true, false or a failure
641+
*/
642+
public void groupExists(String groupId, ActionListener<Boolean> listener) {
643+
BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder();
644+
boolQueryBuilder.filter(new TermQueryBuilder(Job.JOB_TYPE.getPreferredName(), Job.ANOMALY_DETECTOR_JOB_TYPE));
645+
boolQueryBuilder.filter(new TermQueryBuilder(Job.GROUPS.getPreferredName(), groupId));
646+
647+
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder()
648+
.query(boolQueryBuilder);
649+
sourceBuilder.fetchSource(false);
650+
651+
SearchRequest searchRequest = client.prepareSearch(AnomalyDetectorsIndex.configIndexName())
652+
.setSize(0)
653+
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
654+
.setSource(sourceBuilder).request();
655+
656+
executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, searchRequest,
657+
ActionListener.<SearchResponse>wrap(
658+
response -> {
659+
listener.onResponse(response.getHits().totalHits > 0);
660+
},
661+
listener::onFailure)
662+
, client::search);
663+
}
664+
665+
/**
666+
* Find jobs with custom rules defined.
667+
* @param listener Jobs listener
668+
*/
634669
public void findJobsWithCustomRules(ActionListener<List<Job>> listener) {
635670
String customRulesPath = Strings.collectionToDelimitedString(Arrays.asList(Job.ANALYSIS_CONFIG.getPreferredName(),
636671
AnalysisConfig.DETECTORS.getPreferredName(), Detector.CUSTOM_RULES_FIELD.getPreferredName()), ".");

x-pack/plugin/src/test/java/org/elasticsearch/xpack/test/rest/XPackRestIT.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,8 @@ private void waitForTemplates() throws Exception {
8585
List<String> templates = new ArrayList<>();
8686
templates.addAll(Arrays.asList(AuditorField.NOTIFICATIONS_INDEX, MlMetaIndex.INDEX_NAME,
8787
AnomalyDetectorsIndex.jobStateIndexName(),
88-
AnomalyDetectorsIndex.jobResultsIndexPrefix()));
88+
AnomalyDetectorsIndex.jobResultsIndexPrefix(),
89+
AnomalyDetectorsIndex.configIndexName()));
8990

9091
for (String template : templates) {
9192
awaitCallApi("indices.exists_template", singletonMap("name", template), emptyList(),

0 commit comments

Comments
 (0)