Skip to content

[ML] Job in Index: Convert get calendar events to index docs #34710

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Oct 23, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.elasticsearch.xpack.core.ml.job.config.JobState;
import org.elasticsearch.xpack.core.ml.job.config.JobTaskState;

import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -83,10 +84,14 @@ public static DatafeedState getDatafeedState(String datafeedId, @Nullable Persis
* All anomaly detector jobs are returned regardless of the status of the
* task (OPEN, CLOSED, FAILED etc).
*
* @param tasks Persistent tasks
* @param tasks Persistent tasks. If null an empty set is returned.
* @return The job Ids of anomaly detector job tasks
*/
public static Set<String> openJobIds(PersistentTasksCustomMetaData tasks) {
public static Set<String> openJobIds(@Nullable PersistentTasksCustomMetaData tasks) {
if (tasks == null) {
return Collections.emptySet();
}

return tasks.findTasks(JOB_TASK_NAME, task -> true)
.stream()
.map(t -> t.getId().substring(JOB_TASK_ID_PREFIX.length()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ public void testGetJobState() {
assertEquals(JobState.OPENED, MlTasks.getJobState("foo", tasksBuilder.build()));
}

public void testGetJobState_GivenNull() {
assertEquals(JobState.CLOSED, MlTasks.getJobState("foo", null));
}

public void testGetDatefeedState() {
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
// A missing task is a stopped datafeed
Expand Down Expand Up @@ -83,6 +87,10 @@ public void testOpenJobIds() {
assertThat(MlTasks.openJobIds(tasksBuilder.build()), containsInAnyOrder("foo-1", "bar"));
}

public void testOpenJobIds_GivenNull() {
assertThat(MlTasks.openJobIds(null), empty());
}

public void testTaskExistsForJob() {
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
assertFalse(MlTasks.taskExistsForJob("job-1", tasksBuilder.build()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,41 +8,38 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.action.GetCalendarEventsAction;
import org.elasticsearch.xpack.core.ml.action.GetCalendarsAction;
import org.elasticsearch.xpack.core.ml.action.util.QueryPage;
import org.elasticsearch.xpack.core.ml.calendars.ScheduledEvent;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.persistence.ScheduledEventsQueryBuilder;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
import org.elasticsearch.xpack.ml.job.persistence.ScheduledEventsQueryBuilder;

import java.util.Collections;
import java.util.List;

public class TransportGetCalendarEventsAction extends HandledTransportAction<GetCalendarEventsAction.Request,
GetCalendarEventsAction.Response> {

private final JobResultsProvider jobResultsProvider;
private final ClusterService clusterService;
private final JobConfigProvider jobConfigProvider;

@Inject
public TransportGetCalendarEventsAction(Settings settings, ThreadPool threadPool,
TransportService transportService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
ClusterService clusterService, JobResultsProvider jobResultsProvider) {
JobResultsProvider jobResultsProvider, JobConfigProvider jobConfigProvider) {
super(settings, GetCalendarEventsAction.NAME, threadPool, transportService, actionFilters,
indexNameExpressionResolver, GetCalendarEventsAction.Request::new);
this.jobResultsProvider = jobResultsProvider;
this.clusterService = clusterService;
this.jobConfigProvider = jobConfigProvider;
}

@Override
Expand All @@ -68,26 +65,28 @@ protected void doExecute(GetCalendarEventsAction.Request request,
);

if (request.getJobId() != null) {
ClusterState state = clusterService.state();
MlMetadata currentMlMetadata = MlMetadata.getMlMetadata(state);

List<String> jobGroups;
String requestId = request.getJobId();

Job job = currentMlMetadata.getJobs().get(request.getJobId());
if (job == null) {
// Check if the requested id is a job group
if (currentMlMetadata.isGroupOrJob(request.getJobId()) == false) {
listener.onFailure(ExceptionsHelper.missingJobException(request.getJobId()));
return;
}
jobGroups = Collections.singletonList(request.getJobId());
requestId = null;
} else {
jobGroups = job.getGroups();
}
jobConfigProvider.getJob(request.getJobId(), ActionListener.wrap(
jobBuiler -> {
Job job = jobBuiler.build();
jobResultsProvider.scheduledEventsForJob(request.getJobId(), job.getGroups(), query, eventsListener);

jobResultsProvider.scheduledEventsForJob(requestId, jobGroups, query, eventsListener);
},
jobNotFound -> {
// is the request Id a group?
jobConfigProvider.groupExists(request.getJobId(), ActionListener.wrap(
groupExists -> {
if (groupExists) {
jobResultsProvider.scheduledEventsForJob(
null, Collections.singletonList(request.getJobId()), query, eventsListener);
} else {
listener.onFailure(ExceptionsHelper.missingJobException(request.getJobId()));
}
},
listener::onFailure
));
}
));
} else {
jobResultsProvider.scheduledEvents(query, eventsListener);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -631,6 +631,41 @@ public void expandGroupIds(List<String> groupIds, ActionListener<Set<String>> l
, client::search);
}

/**
* Check if a group exists, that is there exists a job that is a member of
* the group. If there are one or more jobs that define the group then
* the listener responds with true else false.
*
* @param groupId The group Id
* @param listener Returns true, false or a failure
*/
public void groupExists(String groupId, ActionListener<Boolean> listener) {
Copy link
Member

@benwtrent benwtrent Oct 22, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that for consistency with jobExists the listener should get Boolean.FALSE if the group does not exist and it is up to the caller to know if they want to error on false or not. Giving the listener an error here does not seem right as no error actually occurred.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes that provides a better interface for users I pushed the change.

BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder();
boolQueryBuilder.filter(new TermQueryBuilder(Job.JOB_TYPE.getPreferredName(), Job.ANOMALY_DETECTOR_JOB_TYPE));
boolQueryBuilder.filter(new TermQueryBuilder(Job.GROUPS.getPreferredName(), groupId));

SearchSourceBuilder sourceBuilder = new SearchSourceBuilder()
.query(boolQueryBuilder);
sourceBuilder.fetchSource(false);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you should make .size(0) on the search as we don't really care about any of the hits, just the total number of them.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍


SearchRequest searchRequest = client.prepareSearch(AnomalyDetectorsIndex.configIndexName())
.setSize(0)
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
.setSource(sourceBuilder).request();

executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, searchRequest,
ActionListener.<SearchResponse>wrap(
response -> {
listener.onResponse(response.getHits().totalHits > 0);
},
listener::onFailure)
, client::search);
}

/**
* Find jobs with custom rules defined.
* @param listener Jobs listener
*/
public void findJobsWithCustomRules(ActionListener<List<Job>> listener) {
String customRulesPath = Strings.collectionToDelimitedString(Arrays.asList(Job.ANALYSIS_CONFIG.getPreferredName(),
AnalysisConfig.DETECTORS.getPreferredName(), Detector.CUSTOM_RULES_FIELD.getPreferredName()), ".");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ private void waitForTemplates() throws Exception {
List<String> templates = new ArrayList<>();
templates.addAll(Arrays.asList(AuditorField.NOTIFICATIONS_INDEX, MlMetaIndex.INDEX_NAME,
AnomalyDetectorsIndex.jobStateIndexName(),
AnomalyDetectorsIndex.jobResultsIndexPrefix()));
AnomalyDetectorsIndex.jobResultsIndexPrefix(),
AnomalyDetectorsIndex.configIndexName()));

for (String template : templates) {
awaitCallApi("indices.exists_template", singletonMap("name", template), emptyList(),
Expand Down