Skip to content

Commit 4757b89

Browse files
authored
During ML maintenance, reset jobs in the reset state without a corresponding task. (#106062) (#106125)
* During ML maintenance, reset jobs in the reset state without a corresponding task. * Update docs/changelog/106062.yaml * Fix race condition in MlDailyMaintenanceServiceTests * Fix log level
1 parent 321c4e1 commit 4757b89

File tree

4 files changed

+218
-105
lines changed

4 files changed

+218
-105
lines changed

docs/changelog/106062.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 106062
2+
summary: "During ML maintenance, reset jobs in the reset state without a corresponding\
3+
\ task"
4+
area: Machine Learning
5+
type: bug
6+
issues: []

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Job.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -485,6 +485,10 @@ public boolean isDeleting() {
485485
return deleting;
486486
}
487487

488+
public boolean isResetting() {
489+
return blocked != null && Blocked.Reason.RESET.equals(blocked.getReason());
490+
}
491+
488492
public boolean allowLazyOpen() {
489493
return allowLazyOpen;
490494
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceService.java

Lines changed: 99 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,11 @@
1010
import org.apache.logging.log4j.Logger;
1111
import org.apache.lucene.util.SetOnce;
1212
import org.elasticsearch.action.ActionListener;
13+
import org.elasticsearch.action.ActionType;
1314
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
1415
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
1516
import org.elasticsearch.action.admin.cluster.node.tasks.list.TransportListTasksAction;
17+
import org.elasticsearch.action.support.master.AcknowledgedRequest;
1618
import org.elasticsearch.action.support.master.AcknowledgedResponse;
1719
import org.elasticsearch.client.internal.Client;
1820
import org.elasticsearch.cluster.ClusterName;
@@ -26,12 +28,15 @@
2628
import org.elasticsearch.core.TimeValue;
2729
import org.elasticsearch.core.Tuple;
2830
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
31+
import org.elasticsearch.tasks.TaskInfo;
2932
import org.elasticsearch.threadpool.Scheduler;
3033
import org.elasticsearch.threadpool.ThreadPool;
3134
import org.elasticsearch.xpack.core.ml.MlMetadata;
35+
import org.elasticsearch.xpack.core.ml.MlTasks;
3236
import org.elasticsearch.xpack.core.ml.action.DeleteExpiredDataAction;
3337
import org.elasticsearch.xpack.core.ml.action.DeleteJobAction;
3438
import org.elasticsearch.xpack.core.ml.action.GetJobsAction;
39+
import org.elasticsearch.xpack.core.ml.action.ResetJobAction;
3540
import org.elasticsearch.xpack.core.ml.job.config.Job;
3641
import org.elasticsearch.xpack.ml.utils.TypedChainTaskExecutor;
3742

@@ -41,6 +46,8 @@
4146
import java.util.Objects;
4247
import java.util.Random;
4348
import java.util.Set;
49+
import java.util.function.Function;
50+
import java.util.function.Predicate;
4451
import java.util.function.Supplier;
4552

4653
import static java.util.stream.Collectors.toList;
@@ -205,24 +212,34 @@ private void triggerTasks() {
205212
}
206213

207214
private void triggerAnomalyDetectionMaintenance() {
208-
// Step 3: Log any error that could have happened
215+
// Step 4: Log any error that could have happened
209216
ActionListener<AcknowledgedResponse> finalListener = ActionListener.wrap(
210217
unused -> {},
211-
e -> logger.error("An error occurred during [ML] maintenance tasks execution", e)
218+
e -> logger.warn("An error occurred during [ML] maintenance tasks execution", e)
212219
);
213220

214-
// Step 2: Delete expired data
221+
// Step 3: Delete expired data
215222
ActionListener<AcknowledgedResponse> deleteJobsListener = ActionListener.wrap(
216223
unused -> triggerDeleteExpiredDataTask(finalListener),
217224
e -> {
218-
logger.info("[ML] maintenance task: triggerDeleteJobsInStateDeletingWithoutDeletionTask failed", e);
219-
// Note: Steps 1 and 2 are independent of each other and step 2 is executed even if step 1 failed.
225+
logger.warn("[ML] maintenance task: triggerResetJobsInStateResetWithoutResetTask failed", e);
226+
// Note: Steps 1-3 are independent, so continue upon errors.
220227
triggerDeleteExpiredDataTask(finalListener);
221228
}
222229
);
223230

224-
// Step 1: Delete jobs that are in deleting state
225-
triggerDeleteJobsInStateDeletingWithoutDeletionTask(deleteJobsListener);
231+
// Step 2: Reset jobs that are in resetting state without task
232+
ActionListener<AcknowledgedResponse> resetJobsListener = ActionListener.wrap(
233+
unused -> triggerResetJobsInStateResetWithoutResetTask(deleteJobsListener),
234+
e -> {
235+
logger.warn("[ML] maintenance task: triggerDeleteJobsInStateDeletingWithoutDeletionTask failed", e);
236+
// Note: Steps 1-3 are independent, so continue upon errors.
237+
triggerResetJobsInStateResetWithoutResetTask(deleteJobsListener);
238+
}
239+
);
240+
241+
// Step 1: Delete jobs that are in deleting state without task
242+
triggerDeleteJobsInStateDeletingWithoutDeletionTask(resetJobsListener);
226243
}
227244

228245
private void triggerDataFrameAnalyticsMaintenance() {
@@ -256,73 +273,111 @@ private void triggerDeleteExpiredDataTask(ActionListener<AcknowledgedResponse> f
256273

257274
// Visible for testing
258275
public void triggerDeleteJobsInStateDeletingWithoutDeletionTask(ActionListener<AcknowledgedResponse> finalListener) {
259-
SetOnce<Set<String>> jobsInStateDeletingHolder = new SetOnce<>();
260-
261-
ActionListener<List<Tuple<DeleteJobAction.Request, AcknowledgedResponse>>> deleteJobsActionListener = finalListener
262-
.delegateFailureAndWrap((delegate, deleteJobsResponses) -> {
263-
List<String> jobIds = deleteJobsResponses.stream()
264-
.filter(t -> t.v2().isAcknowledged() == false)
265-
.map(Tuple::v1)
266-
.map(DeleteJobAction.Request::getJobId)
267-
.collect(toList());
276+
triggerJobsInStateWithoutMatchingTask(
277+
"triggerDeleteJobsInStateDeletingWithoutDeletionTask",
278+
Job::isDeleting,
279+
DeleteJobAction.NAME,
280+
taskInfo -> stripPrefixOrNull(taskInfo.description(), DeleteJobAction.DELETION_TASK_DESCRIPTION_PREFIX),
281+
DeleteJobAction.INSTANCE,
282+
DeleteJobAction.Request::new,
283+
finalListener
284+
);
285+
}
286+
287+
public void triggerResetJobsInStateResetWithoutResetTask(ActionListener<AcknowledgedResponse> finalListener) {
288+
triggerJobsInStateWithoutMatchingTask(
289+
"triggerResetJobsInStateResetWithoutResetTask",
290+
Job::isResetting,
291+
ResetJobAction.NAME,
292+
taskInfo -> stripPrefixOrNull(taskInfo.description(), MlTasks.JOB_TASK_ID_PREFIX),
293+
ResetJobAction.INSTANCE,
294+
ResetJobAction.Request::new,
295+
finalListener
296+
);
297+
}
298+
299+
/**
300+
* @return If the string starts with the prefix, this returns the string without the prefix.
301+
* Otherwise, this return null.
302+
*/
303+
private static String stripPrefixOrNull(String str, String prefix) {
304+
return str == null || str.startsWith(prefix) == false ? null : str.substring(prefix.length());
305+
}
306+
307+
/**
308+
* Executes a request for each job in a state, while missing the corresponding task. This
309+
* usually indicates the node originally executing the task has died, so retry the request.
310+
*
311+
* @param maintenanceTaskName Name of ML maintenance task; used only for logging.
312+
* @param jobFilter Predicate for filtering the jobs.
313+
* @param taskActionName Action name of the tasks corresponding to the jobs.
314+
* @param jobIdExtractor Function to extract the job ID from the task info (in order to match to the job).
315+
* @param actionType Action type of the request that should be (re)executed.
316+
* @param requestCreator Function to create the request from the job ID.
317+
* @param finalListener Listener that captures the final response.
318+
*/
319+
private void triggerJobsInStateWithoutMatchingTask(
320+
String maintenanceTaskName,
321+
Predicate<Job> jobFilter,
322+
String taskActionName,
323+
Function<TaskInfo, String> jobIdExtractor,
324+
ActionType<AcknowledgedResponse> actionType,
325+
Function<String, AcknowledgedRequest<?>> requestCreator,
326+
ActionListener<AcknowledgedResponse> finalListener
327+
) {
328+
SetOnce<Set<String>> jobsInStateHolder = new SetOnce<>();
329+
330+
ActionListener<List<Tuple<String, AcknowledgedResponse>>> jobsActionListener = finalListener.delegateFailureAndWrap(
331+
(delegate, jobsResponses) -> {
332+
List<String> jobIds = jobsResponses.stream().filter(t -> t.v2().isAcknowledged() == false).map(Tuple::v1).collect(toList());
268333
if (jobIds.isEmpty()) {
269-
logger.info("Successfully completed [ML] maintenance task: triggerDeleteJobsInStateDeletingWithoutDeletionTask");
334+
logger.info("Successfully completed [ML] maintenance task: {}", maintenanceTaskName);
270335
} else {
271-
logger.info("The following ML jobs could not be deleted: [" + String.join(",", jobIds) + "]");
336+
logger.info("[ML] maintenance task {} failed for jobs: {}", maintenanceTaskName, jobIds);
272337
}
273338
delegate.onResponse(AcknowledgedResponse.TRUE);
274-
});
339+
}
340+
);
275341

276342
ActionListener<ListTasksResponse> listTasksActionListener = ActionListener.wrap(listTasksResponse -> {
277-
Set<String> jobsInStateDeleting = jobsInStateDeletingHolder.get();
278-
Set<String> jobsWithDeletionTask = listTasksResponse.getTasks()
279-
.stream()
280-
.filter(t -> t.description() != null)
281-
.filter(t -> t.description().startsWith(DeleteJobAction.DELETION_TASK_DESCRIPTION_PREFIX))
282-
.map(t -> t.description().substring(DeleteJobAction.DELETION_TASK_DESCRIPTION_PREFIX.length()))
283-
.collect(toSet());
284-
Set<String> jobsInStateDeletingWithoutDeletionTask = Sets.difference(jobsInStateDeleting, jobsWithDeletionTask);
285-
if (jobsInStateDeletingWithoutDeletionTask.isEmpty()) {
343+
Set<String> jobsInState = jobsInStateHolder.get();
344+
Set<String> jobsWithTask = listTasksResponse.getTasks().stream().map(jobIdExtractor).filter(Objects::nonNull).collect(toSet());
345+
Set<String> jobsInStateWithoutTask = Sets.difference(jobsInState, jobsWithTask);
346+
if (jobsInStateWithoutTask.isEmpty()) {
286347
finalListener.onResponse(AcknowledgedResponse.TRUE);
287348
return;
288349
}
289-
TypedChainTaskExecutor<Tuple<DeleteJobAction.Request, AcknowledgedResponse>> chainTaskExecutor = new TypedChainTaskExecutor<>(
350+
TypedChainTaskExecutor<Tuple<String, AcknowledgedResponse>> chainTaskExecutor = new TypedChainTaskExecutor<>(
290351
EsExecutors.DIRECT_EXECUTOR_SERVICE,
291352
unused -> true,
292353
unused -> true
293354
);
294-
for (String jobId : jobsInStateDeletingWithoutDeletionTask) {
295-
DeleteJobAction.Request request = new DeleteJobAction.Request(jobId);
355+
for (String jobId : jobsInStateWithoutTask) {
296356
chainTaskExecutor.add(
297357
listener -> executeAsyncWithOrigin(
298358
client,
299359
ML_ORIGIN,
300-
DeleteJobAction.INSTANCE,
301-
request,
302-
listener.delegateFailureAndWrap((l, response) -> l.onResponse(Tuple.tuple(request, response)))
360+
actionType,
361+
requestCreator.apply(jobId),
362+
listener.delegateFailureAndWrap((l, response) -> l.onResponse(Tuple.tuple(jobId, response)))
303363
)
304364
);
305365
}
306-
chainTaskExecutor.execute(deleteJobsActionListener);
366+
chainTaskExecutor.execute(jobsActionListener);
307367
}, finalListener::onFailure);
308368

309369
ActionListener<GetJobsAction.Response> getJobsActionListener = ActionListener.wrap(getJobsResponse -> {
310-
Set<String> jobsInStateDeleting = getJobsResponse.getResponse()
311-
.results()
312-
.stream()
313-
.filter(Job::isDeleting)
314-
.map(Job::getId)
315-
.collect(toSet());
316-
if (jobsInStateDeleting.isEmpty()) {
370+
Set<String> jobsInState = getJobsResponse.getResponse().results().stream().filter(jobFilter).map(Job::getId).collect(toSet());
371+
if (jobsInState.isEmpty()) {
317372
finalListener.onResponse(AcknowledgedResponse.TRUE);
318373
return;
319374
}
320-
jobsInStateDeletingHolder.set(jobsInStateDeleting);
375+
jobsInStateHolder.set(jobsInState);
321376
executeAsyncWithOrigin(
322377
client,
323378
ML_ORIGIN,
324379
TransportListTasksAction.TYPE,
325-
new ListTasksRequest().setActions(DeleteJobAction.NAME),
380+
new ListTasksRequest().setActions(taskActionName),
326381
listTasksActionListener
327382
);
328383
}, finalListener::onFailure);

0 commit comments

Comments
 (0)