Skip to content

Commit 4e98158

Browse files
authored
[ML] Prefer cluster state config to index documents (#36014)
Flipping the change in #35940
1 parent 4d6f556 commit 4e98158

File tree

12 files changed

+307
-274
lines changed

12 files changed

+307
-274
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,8 @@ public final class Messages {
9191
public static final String JOB_AUDIT_MEMORY_STATUS_HARD_LIMIT = "Job memory status changed to hard_limit at {0}; adjust the " +
9292
"analysis_limits.model_memory_limit setting to ensure all data is analyzed";
9393

94+
public static final String JOB_CANNOT_CLOSE_BECAUSE_DATAFEED = "cannot close job datafeed [{0}] hasn''t been stopped";
95+
9496
public static final String JOB_CONFIG_CATEGORIZATION_FILTERS_CONTAINS_DUPLICATES = "categorization_filters contain duplicates";
9597
public static final String JOB_CONFIG_CATEGORIZATION_FILTERS_CONTAINS_EMPTY =
9698
"categorization_filters are not allowed to contain empty strings";

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,11 @@
2828
import org.elasticsearch.tasks.Task;
2929
import org.elasticsearch.threadpool.ThreadPool;
3030
import org.elasticsearch.transport.TransportService;
31+
import org.elasticsearch.xpack.core.ml.MlMetadata;
3132
import org.elasticsearch.xpack.core.ml.MlTasks;
3233
import org.elasticsearch.xpack.core.ml.action.CloseJobAction;
3334
import org.elasticsearch.xpack.core.ml.action.FinalizeJobExecutionAction;
35+
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
3436
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
3537
import org.elasticsearch.xpack.core.ml.job.config.JobState;
3638
import org.elasticsearch.xpack.core.ml.job.config.JobTaskState;
@@ -46,12 +48,13 @@
4648
import java.util.Collection;
4749
import java.util.HashSet;
4850
import java.util.List;
51+
import java.util.Optional;
4952
import java.util.Set;
5053
import java.util.concurrent.atomic.AtomicInteger;
5154
import java.util.stream.Collectors;
5255

53-
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
5456
import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
57+
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
5558

5659
public class TransportCloseJobAction extends TransportTasksAction<TransportOpenJobAction.JobTask, CloseJobAction.Request,
5760
CloseJobAction.Response, CloseJobAction.Response> {
@@ -112,7 +115,7 @@ protected void doExecute(Task task, CloseJobAction.Request request, ActionListen
112115
PersistentTasksCustomMetaData tasksMetaData = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
113116
jobManager.expandJobIds(request.getJobId(), request.allowNoJobs(), ActionListener.wrap(
114117
expandedJobIds -> {
115-
validate(expandedJobIds, request.isForce(), tasksMetaData, ActionListener.wrap(
118+
validate(expandedJobIds, request.isForce(), MlMetadata.getMlMetadata(state), tasksMetaData, ActionListener.wrap(
116119
response -> {
117120
request.setOpenJobIds(response.openJobIds.toArray(new String[0]));
118121
if (response.openJobIds.isEmpty() && response.closingJobIds.isEmpty()) {
@@ -173,13 +176,14 @@ class OpenAndClosingIds {
173176
*
174177
* @param expandedJobIds The job ids
175178
* @param forceClose Force close the job(s)
179+
* @param mlMetadata The ML metadata for un-migrated jobs
176180
* @param tasksMetaData Persistent tasks
177181
* @param listener Resolved job Ids listener
178182
*/
179-
void validate(Collection<String> expandedJobIds, boolean forceClose, PersistentTasksCustomMetaData tasksMetaData,
180-
ActionListener<OpenAndClosingIds> listener) {
183+
void validate(Collection<String> expandedJobIds, boolean forceClose, MlMetadata mlMetadata,
184+
PersistentTasksCustomMetaData tasksMetaData, ActionListener<OpenAndClosingIds> listener) {
181185

182-
checkDatafeedsHaveStopped(expandedJobIds, tasksMetaData, ActionListener.wrap(
186+
checkDatafeedsHaveStopped(expandedJobIds, tasksMetaData, mlMetadata, ActionListener.wrap(
183187
response -> {
184188
OpenAndClosingIds ids = new OpenAndClosingIds();
185189
List<String> failedJobs = new ArrayList<>();
@@ -209,14 +213,27 @@ void validate(Collection<String> expandedJobIds, boolean forceClose, PersistentT
209213
}
210214

211215
void checkDatafeedsHaveStopped(Collection<String> jobIds, PersistentTasksCustomMetaData tasksMetaData,
212-
ActionListener<Boolean> listener) {
216+
MlMetadata mlMetadata, ActionListener<Boolean> listener) {
217+
218+
for (String jobId: jobIds) {
219+
Optional<DatafeedConfig> datafeed = mlMetadata.getDatafeedByJobId(jobId);
220+
if (datafeed.isPresent()) {
221+
DatafeedState datafeedState = MlTasks.getDatafeedState(datafeed.get().getId(), tasksMetaData);
222+
if (datafeedState != DatafeedState.STOPPED) {
223+
listener.onFailure(
224+
ExceptionsHelper.conflictStatusException(
225+
Messages.getMessage(Messages.JOB_CANNOT_CLOSE_BECAUSE_DATAFEED, datafeed.get().getId())));
226+
return;
227+
}
228+
}
229+
}
213230
datafeedConfigProvider.findDatafeedsForJobIds(jobIds, ActionListener.wrap(
214231
datafeedIds -> {
215232
for (String datafeedId : datafeedIds) {
216233
DatafeedState datafeedState = MlTasks.getDatafeedState(datafeedId, tasksMetaData);
217234
if (datafeedState != DatafeedState.STOPPED) {
218235
listener.onFailure(ExceptionsHelper.conflictStatusException(
219-
"cannot close job datafeed [{}] hasn't been stopped", datafeedId));
236+
Messages.getMessage(Messages.JOB_CANNOT_CLOSE_BECAUSE_DATAFEED, datafeedId)));
220237
return;
221238
}
222239
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteDatafeedAction.java

Lines changed: 9 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -131,21 +131,15 @@ private void deleteDatafeedConfig(DeleteDatafeedAction.Request request, ClusterS
131131
return;
132132
}
133133

134-
135-
datafeedConfigProvider.deleteDatafeedConfig(request.getDatafeedId(), ActionListener.wrap(
136-
deleteResponse -> listener.onResponse(new AcknowledgedResponse(true)),
137-
e -> {
138-
if (e.getClass() == ResourceNotFoundException.class) {
139-
// is the datafeed in the clusterstate
140-
MlMetadata mlMetadata = MlMetadata.getMlMetadata(state);
141-
if (mlMetadata.getDatafeed(request.getDatafeedId()) != null) {
142-
deleteDatafeedFromMetadata(request, listener);
143-
return;
144-
}
145-
}
146-
listener.onFailure(e);
147-
}
148-
));
134+
MlMetadata mlMetadata = MlMetadata.getMlMetadata(state);
135+
if (mlMetadata.getDatafeed(request.getDatafeedId()) != null) {
136+
deleteDatafeedFromMetadata(request, listener);
137+
} else {
138+
datafeedConfigProvider.deleteDatafeedConfig(request.getDatafeedId(), ActionListener.wrap(
139+
deleteResponse -> listener.onResponse(new AcknowledgedResponse(true)),
140+
listener::onFailure
141+
));
142+
}
149143
}
150144

151145
private void deleteDatafeedFromMetadata(DeleteDatafeedAction.Request request, ActionListener<AcknowledgedResponse> listener) {

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportFinalizeJobExecutionAction.java

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,6 @@ protected AcknowledgedResponse newResponse() {
7373
@Override
7474
protected void masterOperation(FinalizeJobExecutionAction.Request request, ClusterState state,
7575
ActionListener<AcknowledgedResponse> listener) {
76-
7776
MlMetadata mlMetadata = MlMetadata.getMlMetadata(state);
7877
Set<String> jobsInClusterState = Arrays.stream(request.getJobIds())
7978
.filter(id -> mlMetadata.getJobs().containsKey(id))
@@ -83,14 +82,19 @@ protected void masterOperation(FinalizeJobExecutionAction.Request request, Clust
8382
finalizeIndexJobs(Arrays.asList(request.getJobIds()), listener);
8483
} else {
8584
ActionListener<AcknowledgedResponse> finalizeClusterStateJobsListener = ActionListener.wrap(
86-
ack -> finalizeClusterStateJobs(jobsInClusterState, listener),
85+
ack -> {
86+
Set<String> jobsInIndex = new HashSet<>(Arrays.asList(request.getJobIds()));
87+
jobsInIndex.removeAll(jobsInClusterState);
88+
if (jobsInIndex.isEmpty()) {
89+
listener.onResponse(ack);
90+
} else {
91+
finalizeIndexJobs(jobsInIndex, listener);
92+
}
93+
},
8794
listener::onFailure
8895
);
8996

90-
Set<String> jobsInIndex = new HashSet<>(Arrays.asList(request.getJobIds()));
91-
jobsInIndex.removeAll(jobsInClusterState);
92-
93-
finalizeIndexJobs(jobsInIndex, finalizeClusterStateJobsListener);
97+
finalizeClusterStateJobs(jobsInClusterState, finalizeClusterStateJobsListener);
9498
}
9599
}
96100

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java

Lines changed: 37 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
import org.elasticsearch.ElasticsearchException;
1212
import org.elasticsearch.ElasticsearchStatusException;
1313
import org.elasticsearch.ResourceAlreadyExistsException;
14-
import org.elasticsearch.ResourceNotFoundException;
1514
import org.elasticsearch.Version;
1615
import org.elasticsearch.action.ActionListener;
1716
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingAction;
@@ -693,53 +692,47 @@ public void onTimeout(TimeValue timeout) {
693692

694693
private void clearJobFinishedTime(String jobId, ActionListener<AcknowledgedResponse> listener) {
695694

696-
JobUpdate update = new JobUpdate.Builder(jobId).setClearFinishTime(true).build();
695+
boolean jobIsInClusterState = ClusterStateJobUpdate.jobIsInClusterState(clusterService.state(), jobId);
696+
if (jobIsInClusterState) {
697+
clusterService.submitStateUpdateTask("clearing-job-finish-time-for-" + jobId, new ClusterStateUpdateTask() {
698+
@Override
699+
public ClusterState execute(ClusterState currentState) {
700+
MlMetadata mlMetadata = MlMetadata.getMlMetadata(currentState);
701+
MlMetadata.Builder mlMetadataBuilder = new MlMetadata.Builder(mlMetadata);
702+
Job.Builder jobBuilder = new Job.Builder(mlMetadata.getJobs().get(jobId));
703+
jobBuilder.setFinishedTime(null);
704+
705+
mlMetadataBuilder.putJob(jobBuilder.build(), true);
706+
ClusterState.Builder builder = ClusterState.builder(currentState);
707+
return builder.metaData(new MetaData.Builder(currentState.metaData())
708+
.putCustom(MlMetadata.TYPE, mlMetadataBuilder.build()))
709+
.build();
710+
}
697711

698-
jobConfigProvider.updateJob(jobId, update, null, ActionListener.wrap(
699-
job -> listener.onResponse(new AcknowledgedResponse(true)),
700-
e -> {
701-
if (e.getClass() == ResourceNotFoundException.class) {
702-
// Maybe the config is in the clusterstate
703-
if (ClusterStateJobUpdate.jobIsInClusterState(clusterService.state(), jobId)) {
704-
clearJobFinishedTimeClusterState(jobId, listener);
705-
return;
706-
}
707-
}
708-
logger.error("[" + jobId + "] Failed to clear finished_time", e);
709-
// Not a critical error so continue
712+
@Override
713+
public void onFailure(String source, Exception e) {
714+
logger.error("[" + jobId + "] Failed to clear finished_time; source [" + source + "]", e);
710715
listener.onResponse(new AcknowledgedResponse(true));
711716
}
712-
));
713-
}
714-
715-
private void clearJobFinishedTimeClusterState(String jobId, ActionListener<AcknowledgedResponse> listener) {
716-
clusterService.submitStateUpdateTask("clearing-job-finish-time-for-" + jobId, new ClusterStateUpdateTask() {
717-
@Override
718-
public ClusterState execute(ClusterState currentState) {
719-
MlMetadata mlMetadata = MlMetadata.getMlMetadata(currentState);
720-
MlMetadata.Builder mlMetadataBuilder = new MlMetadata.Builder(mlMetadata);
721-
Job.Builder jobBuilder = new Job.Builder(mlMetadata.getJobs().get(jobId));
722-
jobBuilder.setFinishedTime(null);
723-
724-
mlMetadataBuilder.putJob(jobBuilder.build(), true);
725-
ClusterState.Builder builder = ClusterState.builder(currentState);
726-
return builder.metaData(new MetaData.Builder(currentState.metaData())
727-
.putCustom(MlMetadata.TYPE, mlMetadataBuilder.build()))
728-
.build();
729-
}
730-
731-
@Override
732-
public void onFailure(String source, Exception e) {
733-
logger.error("[" + jobId + "] Failed to clear finished_time; source [" + source + "]", e);
734-
listener.onResponse(new AcknowledgedResponse(true));
735-
}
736717

737-
@Override
738-
public void clusterStateProcessed(String source, ClusterState oldState,
739-
ClusterState newState) {
740-
listener.onResponse(new AcknowledgedResponse(true));
741-
}
742-
});
718+
@Override
719+
public void clusterStateProcessed(String source, ClusterState oldState,
720+
ClusterState newState) {
721+
listener.onResponse(new AcknowledgedResponse(true));
722+
}
723+
});
724+
} else {
725+
JobUpdate update = new JobUpdate.Builder(jobId).setClearFinishTime(true).build();
726+
727+
jobConfigProvider.updateJob(jobId, update, null, ActionListener.wrap(
728+
job -> listener.onResponse(new AcknowledgedResponse(true)),
729+
e -> {
730+
logger.error("[" + jobId + "] Failed to clear finished_time", e);
731+
// Not a critical error so continue
732+
listener.onResponse(new AcknowledgedResponse(true));
733+
}
734+
));
735+
}
743736
}
744737

745738
private void cancelJobStart(PersistentTasksCustomMetaData.PersistentTask<OpenJobAction.JobParams> persistentTask, Exception exception,

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateDatafeedAction.java

Lines changed: 14 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
*/
66
package org.elasticsearch.xpack.ml.action;
77

8-
import org.elasticsearch.ResourceNotFoundException;
98
import org.elasticsearch.action.ActionListener;
109
import org.elasticsearch.action.support.ActionFilters;
1110
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
@@ -70,6 +69,19 @@ protected PutDatafeedAction.Response newResponse() {
7069
protected void masterOperation(UpdateDatafeedAction.Request request, ClusterState state,
7170
ActionListener<PutDatafeedAction.Response> listener) throws Exception {
7271

72+
MlMetadata mlMetadata = MlMetadata.getMlMetadata(state);
73+
boolean datafeedConfigIsInClusterState = mlMetadata.getDatafeed(request.getUpdate().getId()) != null;
74+
if (datafeedConfigIsInClusterState) {
75+
updateDatafeedInClusterState(request, listener);
76+
} else {
77+
updateDatafeedInIndex(request, state, listener);
78+
}
79+
}
80+
81+
private void updateDatafeedInIndex(UpdateDatafeedAction.Request request, ClusterState state,
82+
ActionListener<PutDatafeedAction.Response> listener) throws Exception {
83+
final Map<String, String> headers = threadPool.getThreadContext().getHeaders();
84+
7385
// Check datafeed is stopped
7486
PersistentTasksCustomMetaData tasks = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
7587
if (MlTasks.getDatafeedTask(request.getUpdate().getId(), tasks) != null) {
@@ -79,30 +91,14 @@ protected void masterOperation(UpdateDatafeedAction.Request request, ClusterStat
7991
return;
8092
}
8193

82-
updateDatafeedInIndex(request, state, listener);
83-
}
84-
85-
private void updateDatafeedInIndex(UpdateDatafeedAction.Request request, ClusterState state,
86-
ActionListener<PutDatafeedAction.Response> listener) throws Exception {
87-
final Map<String, String> headers = threadPool.getThreadContext().getHeaders();
8894
String datafeedId = request.getUpdate().getId();
8995

9096
CheckedConsumer<Boolean, Exception> updateConsumer = ok -> {
9197
datafeedConfigProvider.updateDatefeedConfig(request.getUpdate().getId(), request.getUpdate(), headers,
9298
jobConfigProvider::validateDatafeedJob,
9399
ActionListener.wrap(
94100
updatedConfig -> listener.onResponse(new PutDatafeedAction.Response(updatedConfig)),
95-
e -> {
96-
if (e.getClass() == ResourceNotFoundException.class) {
97-
// try the clusterstate
98-
MlMetadata mlMetadata = MlMetadata.getMlMetadata(state);
99-
if (mlMetadata.getDatafeed(request.getUpdate().getId()) != null) {
100-
updateDatafeedInClusterState(request, listener);
101-
return;
102-
}
103-
}
104-
listener.onFailure(e);
105-
}
101+
listener::onFailure
106102
));
107103
};
108104

0 commit comments

Comments
 (0)