diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java index 462f0886774c6..47d26f0b887fb 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java @@ -148,21 +148,15 @@ public void groupExists(String groupId, ActionListener listener) { } public void jobExists(String jobId, ActionListener listener) { - jobConfigProvider.jobExists(jobId, false, ActionListener.wrap( - jobFound -> { - if (jobFound) { - listener.onResponse(Boolean.TRUE); - } else { - // Look in the clusterstate for the job config - if (MlMetadata.getMlMetadata(clusterService.state()).getJobs().containsKey(jobId)) { - listener.onResponse(Boolean.TRUE); - } else { - listener.onFailure(ExceptionsHelper.missingJobException(jobId)); - } - } - }, - listener::onFailure - )); + if (MlMetadata.getMlMetadata(clusterService.state()).getJobs().containsKey(jobId)) { + listener.onResponse(Boolean.TRUE); + } else { + // check the index + jobConfigProvider.jobExists(jobId, true, ActionListener.wrap( + jobFound -> listener.onResponse(jobFound), + listener::onFailure + )); + } } /** @@ -173,33 +167,14 @@ public void jobExists(String jobId, ActionListener listener) { * a ResourceNotFoundException is returned */ public void getJob(String jobId, ActionListener jobListener) { - jobConfigProvider.getJob(jobId, ActionListener.wrap( - r -> jobListener.onResponse(r.build()), // TODO JIndex we shouldn't be building the job here - e -> { - if (e instanceof ResourceNotFoundException) { - // Try to get the job from the cluster state - getJobFromClusterState(jobId, jobListener); - } else { - jobListener.onFailure(e); - } - } - )); - } - - /** - * Read a job from the cluster state. - * The job is returned on the same thread even though a listener is used. - * - * @param jobId the jobId - * @param jobListener the Job listener. If no job matches {@code jobId} - * a ResourceNotFoundException is returned - */ - private void getJobFromClusterState(String jobId, ActionListener jobListener) { Job job = MlMetadata.getMlMetadata(clusterService.state()).getJobs().get(jobId); - if (job == null) { - jobListener.onFailure(ExceptionsHelper.missingJobException(jobId)); - } else { + if (job != null) { jobListener.onResponse(job); + } else { + jobConfigProvider.getJob(jobId, ActionListener.wrap( + r -> jobListener.onResponse(r.build()), // TODO JIndex we shouldn't be building the job here + jobListener::onFailure + )); } } @@ -366,6 +341,22 @@ public void putJob(PutJobAction.Request request, AnalysisRegistry analysisRegist return; } + // Check the job id is not the same as a group Id + if (currentMlMetadata.isGroupOrJob(job.getId())) { + actionListener.onFailure(new + ResourceAlreadyExistsException(Messages.getMessage(Messages.JOB_AND_GROUP_NAMES_MUST_BE_UNIQUE, job.getId()))); + return; + } + + // and that the new job's groups are not job Ids + for (String group : job.getGroups()) { + if (currentMlMetadata.getJobs().containsKey(group)) { + actionListener.onFailure(new + ResourceAlreadyExistsException(Messages.getMessage(Messages.JOB_AND_GROUP_NAMES_MUST_BE_UNIQUE, group))); + return; + } + } + ActionListener putJobListener = new ActionListener() { @Override public void onResponse(Boolean indicesCreated) { @@ -446,6 +437,35 @@ public void onFailure(Exception e) { public void updateJob(UpdateJobAction.Request request, ActionListener actionListener) { MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterService.state()); + + if (request.getJobUpdate().getGroups() != null && request.getJobUpdate().getGroups().isEmpty() == false) { + + // check the new groups are not job Ids + for (String group : request.getJobUpdate().getGroups()) { + if (mlMetadata.getJobs().containsKey(group)) { + actionListener.onFailure(new ResourceAlreadyExistsException( + Messages.getMessage(Messages.JOB_AND_GROUP_NAMES_MUST_BE_UNIQUE, group))); + } + } + + jobConfigProvider.jobIdMatches(request.getJobUpdate().getGroups(), ActionListener.wrap( + matchingIds -> { + if (matchingIds.isEmpty()) { + updateJobPostInitialChecks(request, mlMetadata, actionListener); + } else { + actionListener.onFailure(new ResourceAlreadyExistsException( + Messages.getMessage(Messages.JOB_AND_GROUP_NAMES_MUST_BE_UNIQUE, matchingIds.get(0)))); + } + }, + actionListener::onFailure + )); + } else { + updateJobPostInitialChecks(request, mlMetadata, actionListener); + } + } + + private void updateJobPostInitialChecks(UpdateJobAction.Request request, MlMetadata mlMetadata, + ActionListener actionListener) { if (ClusterStateJobUpdate.jobIsInMlMetadata(mlMetadata, request.getJobId())) { updateJobClusterState(request, actionListener); } else { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java index d3a4071f79dba..484008aa2b6a8 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java @@ -44,6 +44,7 @@ import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; +import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.job.categorization.CategorizationAnalyzerTests; import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider; @@ -497,7 +498,7 @@ public void testPutJob_AddsCreateTime() throws IOException { MockClientBuilder mockClientBuilder = new MockClientBuilder("jobmanager-test"); JobManager jobManager = createJobManager(mockClientBuilder.build()); - PutJobAction.Request putJobRequest = new PutJobAction.Request(createJob()); + PutJobAction.Request putJobRequest = new PutJobAction.Request(createJobFoo()); doAnswer(invocation -> { AckedClusterStateUpdateTask task = (AckedClusterStateUpdateTask) invocation.getArguments()[1]; @@ -544,7 +545,7 @@ public void testJobExists_GivenMissingJob() { doAnswer(invocationOnMock -> { ActionListener listener = (ActionListener) invocationOnMock.getArguments()[2]; - listener.onResponse(false); + listener.onFailure(ExceptionsHelper.missingJobException("non-job")); return null; }).when(jobConfigProvider).jobExists(anyString(), anyBoolean(), any()); @@ -579,18 +580,43 @@ public void testJobExists_GivenJobIsInClusterState() { Collections.singleton(MachineLearningField.MAX_MODEL_MEMORY_LIMIT)); when(clusterService.getClusterSettings()).thenReturn(clusterSettings); + + JobManager jobManager = new JobManager(environment, environment.settings(), jobResultsProvider, clusterService, + auditor, threadPool, mock(Client.class), updateJobProcessNotifier, jobConfigProvider); + + AtomicBoolean jobExistsHolder = new AtomicBoolean(); + AtomicReference exceptionHolder = new AtomicReference<>(); + jobManager.jobExists("cs-job", ActionListener.wrap( + jobExistsHolder::set, + exceptionHolder::set + )); + + assertTrue(jobExistsHolder.get()); + assertNull(exceptionHolder.get()); + verify(jobConfigProvider, never()).jobExists(anyString(), anyBoolean(), any()); + } + + public void testJobExists_GivenJobIsInIndex() { + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); + when(clusterService.state()).thenReturn(clusterState); + + ClusterSettings clusterSettings = new ClusterSettings(environment.settings(), + Collections.singleton(MachineLearningField.MAX_MODEL_MEMORY_LIMIT)); + when(clusterService.getClusterSettings()).thenReturn(clusterSettings); + + JobConfigProvider jobConfigProvider = mock(JobConfigProvider.class); doAnswer(invocationOnMock -> { ActionListener listener = (ActionListener) invocationOnMock.getArguments()[2]; - listener.onResponse(false); + listener.onResponse(true); return null; - }).when(jobConfigProvider).jobExists(anyString(), anyBoolean(), any()); + }).when(jobConfigProvider).jobExists(eq("index-job"), anyBoolean(), any()); JobManager jobManager = new JobManager(environment, environment.settings(), jobResultsProvider, clusterService, auditor, threadPool, mock(Client.class), updateJobProcessNotifier, jobConfigProvider); AtomicBoolean jobExistsHolder = new AtomicBoolean(); AtomicReference exceptionHolder = new AtomicReference<>(); - jobManager.jobExists("cs-job", ActionListener.wrap( + jobManager.jobExists("index-job", ActionListener.wrap( jobExistsHolder::set, exceptionHolder::set )); @@ -603,7 +629,7 @@ public void testPutJob_ThrowsIfJobExistsInClusterState() throws IOException { MockClientBuilder mockClientBuilder = new MockClientBuilder("jobmanager-test"); JobManager jobManager = createJobManager(mockClientBuilder.build()); - PutJobAction.Request putJobRequest = new PutJobAction.Request(createJob()); + PutJobAction.Request putJobRequest = new PutJobAction.Request(createJobFoo()); MlMetadata.Builder mlMetadata = new MlMetadata.Builder(); mlMetadata.putJob(buildJobBuilder("foo").build(), false); @@ -623,6 +649,54 @@ public void onFailure(Exception e) { }); } + public void testPutJob_ThrowsIfIdIsTheSameAsAGroup() throws IOException { + MockClientBuilder mockClientBuilder = new MockClientBuilder("jobmanager-test"); + JobManager jobManager = createJobManager(mockClientBuilder.build()); + + + MlMetadata.Builder mlMetadata = new MlMetadata.Builder(); + Job.Builder jobBuilder = buildJobBuilder("job-with-group-foo"); + jobBuilder.setGroups(Collections.singletonList("foo")); + mlMetadata.putJob(jobBuilder.build(), false); + ClusterState clusterState = ClusterState.builder(new ClusterName("name")) + .metaData(MetaData.builder().putCustom(MlMetadata.TYPE, mlMetadata.build())).build(); + + // job id cannot be a group + PutJobAction.Request putJobRequest = new PutJobAction.Request(createJobFoo()); + jobManager.putJob(putJobRequest, analysisRegistry, clusterState, new ActionListener() { + @Override + public void onResponse(PutJobAction.Response response) { + fail("should have got an error"); + } + + @Override + public void onFailure(Exception e) { + assertTrue(e instanceof ResourceAlreadyExistsException); + assertEquals("job and group names must be unique but job [foo] and group [foo] have the same name", e.getMessage()); + } + }); + + // the job's groups cannot be job Ids + jobBuilder = buildJobBuilder("job-with-clashing-group-name"); + jobBuilder.setCreateTime(null); + jobBuilder.setGroups(Collections.singletonList("job-with-group-foo")); + putJobRequest = new PutJobAction.Request(jobBuilder); + + jobManager.putJob(putJobRequest, analysisRegistry, clusterState, new ActionListener() { + @Override + public void onResponse(PutJobAction.Response response) { + fail("should have got an error"); + } + + @Override + public void onFailure(Exception e) { + assertTrue(e instanceof ResourceAlreadyExistsException); + assertEquals("job and group names must be unique but job [job-with-group-foo] and " + + "group [job-with-group-foo] have the same name", e.getMessage()); + } + }); + } + public void testNotifyFilterChangedGivenNoop() { MlFilter filter = MlFilter.builder("my_filter").build(); MockClientBuilder mockClientBuilder = new MockClientBuilder("jobmanager-test"); @@ -900,7 +974,7 @@ public void testRevertSnapshot_GivenJobInClusterState() { verify(jobConfigProvider, never()).updateJob(any(), any(), any(), any()); } - private Job.Builder createJob() { + private Job.Builder createJobFoo() { Detector.Builder d1 = new Detector.Builder("info_content", "domain"); d1.setOverFieldName("client"); AnalysisConfig.Builder ac = new AnalysisConfig.Builder(Collections.singletonList(d1.build())); diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/jobs_crud.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/jobs_crud.yml index 285ebfc1cf9e5..f65406a25cabe 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/jobs_crud.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/jobs_crud.yml @@ -397,6 +397,28 @@ "description":"Can't update all description" } + - do: + xpack.ml.put_job: + job_id: job-crud-update-group-name-clash + body: > + { + "analysis_config" : { + "bucket_span": "1h", + "detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}] + }, + "data_description" : { + } + } + + - do: + catch: "/job and group names must be unique/" + xpack.ml.update_job: + job_id: jobs-crud-update-job + body: > + { + "groups": ["job-crud-update-group-name-clash"] + } + --- "Test cannot decrease model_memory_limit below current usage": - skip: