Skip to content

Commit 7fd4b9d

Browse files
committed
[ML] Job in Index: Convert get calendar events to index docs (elastic#34710)
1 parent e8c3951 commit 7fd4b9d

File tree

5 files changed

+77
-29
lines changed

5 files changed

+77
-29
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlTasks.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.elasticsearch.xpack.core.ml.job.config.JobState;
1313
import org.elasticsearch.xpack.core.ml.job.config.JobTaskState;
1414

15+
import java.util.Collections;
1516
import java.util.List;
1617
import java.util.Set;
1718
import java.util.stream.Collectors;
@@ -83,10 +84,14 @@ public static DatafeedState getDatafeedState(String datafeedId, @Nullable Persis
8384
* All anomaly detector jobs are returned regardless of the status of the
8485
* task (OPEN, CLOSED, FAILED etc).
8586
*
86-
* @param tasks Persistent tasks
87+
* @param tasks Persistent tasks. If null an empty set is returned.
8788
* @return The job Ids of anomaly detector job tasks
8889
*/
89-
public static Set<String> openJobIds(PersistentTasksCustomMetaData tasks) {
90+
public static Set<String> openJobIds(@Nullable PersistentTasksCustomMetaData tasks) {
91+
if (tasks == null) {
92+
return Collections.emptySet();
93+
}
94+
9095
return tasks.findTasks(JOB_TASK_NAME, task -> true)
9196
.stream()
9297
.map(t -> t.getId().substring(JOB_TASK_ID_PREFIX.length()))

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/MlTasksTests.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,10 @@ public void testGetJobState() {
3131
assertEquals(JobState.OPENED, MlTasks.getJobState("foo", tasksBuilder.build()));
3232
}
3333

34+
public void testGetJobState_GivenNull() {
35+
assertEquals(JobState.CLOSED, MlTasks.getJobState("foo", null));
36+
}
37+
3438
public void testGetDatefeedState() {
3539
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
3640
// A missing task is a stopped datafeed
@@ -83,6 +87,10 @@ public void testOpenJobIds() {
8387
assertThat(MlTasks.openJobIds(tasksBuilder.build()), containsInAnyOrder("foo-1", "bar"));
8488
}
8589

90+
public void testOpenJobIds_GivenNull() {
91+
assertThat(MlTasks.openJobIds(null), empty());
92+
}
93+
8694
public void testTaskExistsForJob() {
8795
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
8896
assertFalse(MlTasks.taskExistsForJob("job-1", tasksBuilder.build()));

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetCalendarEventsAction.java

Lines changed: 25 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -8,40 +8,37 @@
88
import org.elasticsearch.action.ActionListener;
99
import org.elasticsearch.action.support.ActionFilters;
1010
import org.elasticsearch.action.support.HandledTransportAction;
11-
import org.elasticsearch.cluster.ClusterState;
12-
import org.elasticsearch.cluster.service.ClusterService;
1311
import org.elasticsearch.common.inject.Inject;
1412
import org.elasticsearch.common.settings.Settings;
1513
import org.elasticsearch.tasks.Task;
1614
import org.elasticsearch.transport.TransportService;
17-
import org.elasticsearch.xpack.core.ml.MlMetadata;
1815
import org.elasticsearch.xpack.core.ml.action.GetCalendarEventsAction;
1916
import org.elasticsearch.xpack.core.ml.action.GetCalendarsAction;
2017
import org.elasticsearch.xpack.core.ml.action.util.QueryPage;
2118
import org.elasticsearch.xpack.core.ml.calendars.ScheduledEvent;
2219
import org.elasticsearch.xpack.core.ml.job.config.Job;
2320
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
21+
import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider;
2422
import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
2523
import org.elasticsearch.xpack.ml.job.persistence.ScheduledEventsQueryBuilder;
2624

2725
import java.util.Collections;
28-
import java.util.List;
2926
import java.util.function.Supplier;
3027

3128
public class TransportGetCalendarEventsAction extends HandledTransportAction<GetCalendarEventsAction.Request,
3229
GetCalendarEventsAction.Response> {
3330

3431
private final JobResultsProvider jobResultsProvider;
35-
private final ClusterService clusterService;
32+
private final JobConfigProvider jobConfigProvider;
3633

3734
@Inject
3835
public TransportGetCalendarEventsAction(Settings settings, TransportService transportService,
39-
ActionFilters actionFilters, ClusterService clusterService,
40-
JobResultsProvider jobResultsProvider) {
36+
ActionFilters actionFilters, JobResultsProvider jobResultsProvider,
37+
JobConfigProvider jobConfigProvider) {
4138
super(settings, GetCalendarEventsAction.NAME, transportService, actionFilters,
4239
(Supplier<GetCalendarEventsAction.Request>) GetCalendarEventsAction.Request::new);
4340
this.jobResultsProvider = jobResultsProvider;
44-
this.clusterService = clusterService;
41+
this.jobConfigProvider = jobConfigProvider;
4542
}
4643

4744
@Override
@@ -67,26 +64,28 @@ protected void doExecute(Task task, GetCalendarEventsAction.Request request,
6764
);
6865

6966
if (request.getJobId() != null) {
70-
ClusterState state = clusterService.state();
71-
MlMetadata currentMlMetadata = MlMetadata.getMlMetadata(state);
7267

73-
List<String> jobGroups;
74-
String requestId = request.getJobId();
68+
jobConfigProvider.getJob(request.getJobId(), ActionListener.wrap(
69+
jobBuiler -> {
70+
Job job = jobBuiler.build();
71+
jobResultsProvider.scheduledEventsForJob(request.getJobId(), job.getGroups(), query, eventsListener);
7572

76-
Job job = currentMlMetadata.getJobs().get(request.getJobId());
77-
if (job == null) {
78-
// Check if the requested id is a job group
79-
if (currentMlMetadata.isGroupOrJob(request.getJobId()) == false) {
80-
listener.onFailure(ExceptionsHelper.missingJobException(request.getJobId()));
81-
return;
82-
}
83-
jobGroups = Collections.singletonList(request.getJobId());
84-
requestId = null;
85-
} else {
86-
jobGroups = job.getGroups();
87-
}
88-
89-
jobResultsProvider.scheduledEventsForJob(requestId, jobGroups, query, eventsListener);
73+
},
74+
jobNotFound -> {
75+
// is the request Id a group?
76+
jobConfigProvider.groupExists(request.getJobId(), ActionListener.wrap(
77+
groupExists -> {
78+
if (groupExists) {
79+
jobResultsProvider.scheduledEventsForJob(
80+
null, Collections.singletonList(request.getJobId()), query, eventsListener);
81+
} else {
82+
listener.onFailure(ExceptionsHelper.missingJobException(request.getJobId()));
83+
}
84+
},
85+
listener::onFailure
86+
));
87+
}
88+
));
9089
} else {
9190
jobResultsProvider.scheduledEvents(query, eventsListener);
9291
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -630,6 +630,41 @@ public void expandGroupIds(List<String> groupIds, ActionListener<Set<String>> l
630630
, client::search);
631631
}
632632

633+
/**
634+
* Check if a group exists, that is there exists a job that is a member of
635+
* the group. If there are one or more jobs that define the group then
636+
* the listener responds with true else false.
637+
*
638+
* @param groupId The group Id
639+
* @param listener Returns true, false or a failure
640+
*/
641+
public void groupExists(String groupId, ActionListener<Boolean> listener) {
642+
BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder();
643+
boolQueryBuilder.filter(new TermQueryBuilder(Job.JOB_TYPE.getPreferredName(), Job.ANOMALY_DETECTOR_JOB_TYPE));
644+
boolQueryBuilder.filter(new TermQueryBuilder(Job.GROUPS.getPreferredName(), groupId));
645+
646+
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder()
647+
.query(boolQueryBuilder);
648+
sourceBuilder.fetchSource(false);
649+
650+
SearchRequest searchRequest = client.prepareSearch(AnomalyDetectorsIndex.configIndexName())
651+
.setSize(0)
652+
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
653+
.setSource(sourceBuilder).request();
654+
655+
executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, searchRequest,
656+
ActionListener.<SearchResponse>wrap(
657+
response -> {
658+
listener.onResponse(response.getHits().totalHits > 0);
659+
},
660+
listener::onFailure)
661+
, client::search);
662+
}
663+
664+
/**
665+
* Find jobs with custom rules defined.
666+
* @param listener Jobs listener
667+
*/
633668
public void findJobsWithCustomRules(ActionListener<List<Job>> listener) {
634669
String customRulesPath = Strings.collectionToDelimitedString(Arrays.asList(Job.ANALYSIS_CONFIG.getPreferredName(),
635670
AnalysisConfig.DETECTORS.getPreferredName(), Detector.CUSTOM_RULES_FIELD.getPreferredName()), ".");

x-pack/plugin/src/test/java/org/elasticsearch/xpack/test/rest/XPackRestIT.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,8 @@ private void waitForTemplates() throws Exception {
8686
List<String> templates = new ArrayList<>();
8787
templates.addAll(Arrays.asList(AuditorField.NOTIFICATIONS_INDEX, MlMetaIndex.INDEX_NAME,
8888
AnomalyDetectorsIndex.jobStateIndexName(),
89-
AnomalyDetectorsIndex.jobResultsIndexPrefix()));
89+
AnomalyDetectorsIndex.jobResultsIndexPrefix(),
90+
AnomalyDetectorsIndex.configIndexName()));
9091

9192
for (String template : templates) {
9293
awaitCallApi("indices.exists_template", singletonMap("name", template), emptyList(),

0 commit comments

Comments
 (0)