Skip to content

[ML] JIndex: Job exists and get job should read cluster state first. #36305

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -148,21 +148,15 @@ public void groupExists(String groupId, ActionListener<Boolean> listener) {
}

public void jobExists(String jobId, ActionListener<Boolean> listener) {
jobConfigProvider.jobExists(jobId, false, ActionListener.wrap(
jobFound -> {
if (jobFound) {
listener.onResponse(Boolean.TRUE);
} else {
// Look in the clusterstate for the job config
if (MlMetadata.getMlMetadata(clusterService.state()).getJobs().containsKey(jobId)) {
listener.onResponse(Boolean.TRUE);
} else {
listener.onFailure(ExceptionsHelper.missingJobException(jobId));
}
}
},
listener::onFailure
));
if (MlMetadata.getMlMetadata(clusterService.state()).getJobs().containsKey(jobId)) {
listener.onResponse(Boolean.TRUE);
} else {
// check the index
jobConfigProvider.jobExists(jobId, true, ActionListener.wrap(
jobFound -> listener.onResponse(jobFound),
listener::onFailure
));
}
}

/**
Expand All @@ -173,33 +167,14 @@ public void jobExists(String jobId, ActionListener<Boolean> listener) {
* a ResourceNotFoundException is returned
*/
public void getJob(String jobId, ActionListener<Job> jobListener) {
jobConfigProvider.getJob(jobId, ActionListener.wrap(
r -> jobListener.onResponse(r.build()), // TODO JIndex we shouldn't be building the job here
e -> {
if (e instanceof ResourceNotFoundException) {
// Try to get the job from the cluster state
getJobFromClusterState(jobId, jobListener);
} else {
jobListener.onFailure(e);
}
}
));
}

/**
* Read a job from the cluster state.
* The job is returned on the same thread even though a listener is used.
*
* @param jobId the jobId
* @param jobListener the Job listener. If no job matches {@code jobId}
* a ResourceNotFoundException is returned
*/
private void getJobFromClusterState(String jobId, ActionListener<Job> jobListener) {
Job job = MlMetadata.getMlMetadata(clusterService.state()).getJobs().get(jobId);
if (job == null) {
jobListener.onFailure(ExceptionsHelper.missingJobException(jobId));
} else {
if (job != null) {
jobListener.onResponse(job);
} else {
jobConfigProvider.getJob(jobId, ActionListener.wrap(
r -> jobListener.onResponse(r.build()), // TODO JIndex we shouldn't be building the job here
jobListener::onFailure
));
}
}

Expand Down Expand Up @@ -366,6 +341,22 @@ public void putJob(PutJobAction.Request request, AnalysisRegistry analysisRegist
return;
}

// Check the job id is not the same as a group Id
if (currentMlMetadata.isGroupOrJob(job.getId())) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we also validating this when the job is written to the index?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are 2 checks here

  1. that the job id is not a group
  2. that any groups the job has are not job Ids

I realised the second was missing so I pushed another commit with that. Previously those checks were done in the constructor of MlMetadata which further made me realise I need to perform the same check on job update so I made that change also.

actionListener.onFailure(new
ResourceAlreadyExistsException(Messages.getMessage(Messages.JOB_AND_GROUP_NAMES_MUST_BE_UNIQUE, job.getId())));
return;
}

// and that the new job's groups are not job Ids
for (String group : job.getGroups()) {
if (currentMlMetadata.getJobs().containsKey(group)) {
actionListener.onFailure(new
ResourceAlreadyExistsException(Messages.getMessage(Messages.JOB_AND_GROUP_NAMES_MUST_BE_UNIQUE, group)));
return;
}
}

ActionListener<Boolean> putJobListener = new ActionListener<Boolean>() {
@Override
public void onResponse(Boolean indicesCreated) {
Expand Down Expand Up @@ -446,6 +437,35 @@ public void onFailure(Exception e) {

public void updateJob(UpdateJobAction.Request request, ActionListener<PutJobAction.Response> actionListener) {
MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterService.state());

if (request.getJobUpdate().getGroups() != null && request.getJobUpdate().getGroups().isEmpty() == false) {

// check the new groups are not job Ids
for (String group : request.getJobUpdate().getGroups()) {
if (mlMetadata.getJobs().containsKey(group)) {
actionListener.onFailure(new ResourceAlreadyExistsException(
Messages.getMessage(Messages.JOB_AND_GROUP_NAMES_MUST_BE_UNIQUE, group)));
}
}

jobConfigProvider.jobIdMatches(request.getJobUpdate().getGroups(), ActionListener.wrap(
matchingIds -> {
if (matchingIds.isEmpty()) {
updateJobPostInitialChecks(request, mlMetadata, actionListener);
} else {
actionListener.onFailure(new ResourceAlreadyExistsException(
Messages.getMessage(Messages.JOB_AND_GROUP_NAMES_MUST_BE_UNIQUE, matchingIds.get(0))));
}
},
actionListener::onFailure
));
} else {
updateJobPostInitialChecks(request, mlMetadata, actionListener);
}
}

private void updateJobPostInitialChecks(UpdateJobAction.Request request, MlMetadata mlMetadata,
ActionListener<PutJobAction.Response> actionListener) {
if (ClusterStateJobUpdate.jobIsInMlMetadata(mlMetadata, request.getJobId())) {
updateJobClusterState(request, actionListener);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.job.categorization.CategorizationAnalyzerTests;
import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider;
Expand Down Expand Up @@ -497,7 +498,7 @@ public void testPutJob_AddsCreateTime() throws IOException {
MockClientBuilder mockClientBuilder = new MockClientBuilder("jobmanager-test");
JobManager jobManager = createJobManager(mockClientBuilder.build());

PutJobAction.Request putJobRequest = new PutJobAction.Request(createJob());
PutJobAction.Request putJobRequest = new PutJobAction.Request(createJobFoo());

doAnswer(invocation -> {
AckedClusterStateUpdateTask<Boolean> task = (AckedClusterStateUpdateTask<Boolean>) invocation.getArguments()[1];
Expand Down Expand Up @@ -544,7 +545,7 @@ public void testJobExists_GivenMissingJob() {

doAnswer(invocationOnMock -> {
ActionListener listener = (ActionListener) invocationOnMock.getArguments()[2];
listener.onResponse(false);
listener.onFailure(ExceptionsHelper.missingJobException("non-job"));
return null;
}).when(jobConfigProvider).jobExists(anyString(), anyBoolean(), any());

Expand Down Expand Up @@ -579,18 +580,43 @@ public void testJobExists_GivenJobIsInClusterState() {
Collections.singleton(MachineLearningField.MAX_MODEL_MEMORY_LIMIT));
when(clusterService.getClusterSettings()).thenReturn(clusterSettings);


JobManager jobManager = new JobManager(environment, environment.settings(), jobResultsProvider, clusterService,
auditor, threadPool, mock(Client.class), updateJobProcessNotifier, jobConfigProvider);

AtomicBoolean jobExistsHolder = new AtomicBoolean();
AtomicReference<Exception> exceptionHolder = new AtomicReference<>();
jobManager.jobExists("cs-job", ActionListener.wrap(
jobExistsHolder::set,
exceptionHolder::set
));

assertTrue(jobExistsHolder.get());
assertNull(exceptionHolder.get());
verify(jobConfigProvider, never()).jobExists(anyString(), anyBoolean(), any());
}

public void testJobExists_GivenJobIsInIndex() {
ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build();
when(clusterService.state()).thenReturn(clusterState);

ClusterSettings clusterSettings = new ClusterSettings(environment.settings(),
Collections.singleton(MachineLearningField.MAX_MODEL_MEMORY_LIMIT));
when(clusterService.getClusterSettings()).thenReturn(clusterSettings);

JobConfigProvider jobConfigProvider = mock(JobConfigProvider.class);
doAnswer(invocationOnMock -> {
ActionListener listener = (ActionListener) invocationOnMock.getArguments()[2];
listener.onResponse(false);
listener.onResponse(true);
return null;
}).when(jobConfigProvider).jobExists(anyString(), anyBoolean(), any());
}).when(jobConfigProvider).jobExists(eq("index-job"), anyBoolean(), any());

JobManager jobManager = new JobManager(environment, environment.settings(), jobResultsProvider, clusterService,
auditor, threadPool, mock(Client.class), updateJobProcessNotifier, jobConfigProvider);

AtomicBoolean jobExistsHolder = new AtomicBoolean();
AtomicReference<Exception> exceptionHolder = new AtomicReference<>();
jobManager.jobExists("cs-job", ActionListener.wrap(
jobManager.jobExists("index-job", ActionListener.wrap(
jobExistsHolder::set,
exceptionHolder::set
));
Expand All @@ -603,7 +629,7 @@ public void testPutJob_ThrowsIfJobExistsInClusterState() throws IOException {
MockClientBuilder mockClientBuilder = new MockClientBuilder("jobmanager-test");
JobManager jobManager = createJobManager(mockClientBuilder.build());

PutJobAction.Request putJobRequest = new PutJobAction.Request(createJob());
PutJobAction.Request putJobRequest = new PutJobAction.Request(createJobFoo());

MlMetadata.Builder mlMetadata = new MlMetadata.Builder();
mlMetadata.putJob(buildJobBuilder("foo").build(), false);
Expand All @@ -623,6 +649,54 @@ public void onFailure(Exception e) {
});
}

public void testPutJob_ThrowsIfIdIsTheSameAsAGroup() throws IOException {
MockClientBuilder mockClientBuilder = new MockClientBuilder("jobmanager-test");
JobManager jobManager = createJobManager(mockClientBuilder.build());


MlMetadata.Builder mlMetadata = new MlMetadata.Builder();
Job.Builder jobBuilder = buildJobBuilder("job-with-group-foo");
jobBuilder.setGroups(Collections.singletonList("foo"));
mlMetadata.putJob(jobBuilder.build(), false);
ClusterState clusterState = ClusterState.builder(new ClusterName("name"))
.metaData(MetaData.builder().putCustom(MlMetadata.TYPE, mlMetadata.build())).build();

// job id cannot be a group
PutJobAction.Request putJobRequest = new PutJobAction.Request(createJobFoo());
jobManager.putJob(putJobRequest, analysisRegistry, clusterState, new ActionListener<PutJobAction.Response>() {
@Override
public void onResponse(PutJobAction.Response response) {
fail("should have got an error");
}

@Override
public void onFailure(Exception e) {
assertTrue(e instanceof ResourceAlreadyExistsException);
assertEquals("job and group names must be unique but job [foo] and group [foo] have the same name", e.getMessage());
}
});

// the job's groups cannot be job Ids
jobBuilder = buildJobBuilder("job-with-clashing-group-name");
jobBuilder.setCreateTime(null);
jobBuilder.setGroups(Collections.singletonList("job-with-group-foo"));
putJobRequest = new PutJobAction.Request(jobBuilder);

jobManager.putJob(putJobRequest, analysisRegistry, clusterState, new ActionListener<PutJobAction.Response>() {
@Override
public void onResponse(PutJobAction.Response response) {
fail("should have got an error");
}

@Override
public void onFailure(Exception e) {
assertTrue(e instanceof ResourceAlreadyExistsException);
assertEquals("job and group names must be unique but job [job-with-group-foo] and " +
"group [job-with-group-foo] have the same name", e.getMessage());
}
});
}

public void testNotifyFilterChangedGivenNoop() {
MlFilter filter = MlFilter.builder("my_filter").build();
MockClientBuilder mockClientBuilder = new MockClientBuilder("jobmanager-test");
Expand Down Expand Up @@ -900,7 +974,7 @@ public void testRevertSnapshot_GivenJobInClusterState() {
verify(jobConfigProvider, never()).updateJob(any(), any(), any(), any());
}

private Job.Builder createJob() {
private Job.Builder createJobFoo() {
Detector.Builder d1 = new Detector.Builder("info_content", "domain");
d1.setOverFieldName("client");
AnalysisConfig.Builder ac = new AnalysisConfig.Builder(Collections.singletonList(d1.build()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,28 @@
"description":"Can't update all description"
}

- do:
xpack.ml.put_job:
job_id: job-crud-update-group-name-clash
body: >
{
"analysis_config" : {
"bucket_span": "1h",
"detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}]
},
"data_description" : {
}
}

- do:
catch: "/job and group names must be unique/"
xpack.ml.update_job:
job_id: jobs-crud-update-job
body: >
{
"groups": ["job-crud-update-group-name-clash"]
}

---
"Test cannot decrease model_memory_limit below current usage":
- skip:
Expand Down