Skip to content

During ML maintenance, reset jobs in the reset state without a corresponding task. #106062

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Mar 8, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/106062.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 106062
summary: "During ML maintenance, reset jobs in the reset state without a corresponding\
\ task"
area: Machine Learning
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,10 @@ public boolean isDeleting() {
return deleting;
}

public boolean isResetting() {
return blocked != null && Blocked.Reason.RESET.equals(blocked.getReason());
}

public boolean allowLazyOpen() {
return allowLazyOpen;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.action.admin.cluster.node.tasks.list.TransportListTasksAction;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.ClusterName;
Expand All @@ -27,12 +29,15 @@
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.action.DeleteExpiredDataAction;
import org.elasticsearch.xpack.core.ml.action.DeleteJobAction;
import org.elasticsearch.xpack.core.ml.action.GetJobsAction;
import org.elasticsearch.xpack.core.ml.action.ResetJobAction;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.ml.utils.TypedChainTaskExecutor;

Expand All @@ -42,6 +47,8 @@
import java.util.Objects;
import java.util.Random;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;

import static java.util.stream.Collectors.toList;
Expand Down Expand Up @@ -206,24 +213,34 @@ private void triggerTasks() {
}

private void triggerAnomalyDetectionMaintenance() {
// Step 3: Log any error that could have happened
// Step 4: Log any error that could have happened
ActionListener<AcknowledgedResponse> finalListener = ActionListener.wrap(
unused -> {},
e -> logger.error("An error occurred during [ML] maintenance tasks execution", e)
);

// Step 2: Delete expired data
// Step 3: Delete expired data
ActionListener<AcknowledgedResponse> deleteJobsListener = ActionListener.wrap(
unused -> triggerDeleteExpiredDataTask(finalListener),
e -> {
logger.info("[ML] maintenance task: triggerDeleteJobsInStateDeletingWithoutDeletionTask failed", e);
// Note: Steps 1 and 2 are independent of each other and step 2 is executed even if step 1 failed.
logger.info("[ML] maintenance task: triggerResetJobsInStateResetWithoutResetTask failed", e);
// Note: Steps 1-3 are independent, so continue upon errors.
triggerDeleteExpiredDataTask(finalListener);
}
);

// Step 1: Delete jobs that are in deleting state
triggerDeleteJobsInStateDeletingWithoutDeletionTask(deleteJobsListener);
// Step 2: Reset jobs that are in resetting state without task
ActionListener<AcknowledgedResponse> resetJobsListener = ActionListener.wrap(
unused -> triggerResetJobsInStateResetWithoutResetTask(deleteJobsListener),
e -> {
logger.info("[ML] maintenance task: triggerDeleteJobsInStateDeletingWithoutDeletionTask failed", e);
// Note: Steps 1-3 are independent, so continue upon errors.
triggerResetJobsInStateResetWithoutResetTask(deleteJobsListener);
}
);

// Step 1: Delete jobs that are in deleting state without task
triggerDeleteJobsInStateDeletingWithoutDeletionTask(resetJobsListener);
}

private void triggerDataFrameAnalyticsMaintenance() {
Expand Down Expand Up @@ -257,73 +274,111 @@ private void triggerDeleteExpiredDataTask(ActionListener<AcknowledgedResponse> f

// Visible for testing
public void triggerDeleteJobsInStateDeletingWithoutDeletionTask(ActionListener<AcknowledgedResponse> finalListener) {
SetOnce<Set<String>> jobsInStateDeletingHolder = new SetOnce<>();

ActionListener<List<Tuple<DeleteJobAction.Request, AcknowledgedResponse>>> deleteJobsActionListener = finalListener
.delegateFailureAndWrap((delegate, deleteJobsResponses) -> {
List<String> jobIds = deleteJobsResponses.stream()
.filter(t -> t.v2().isAcknowledged() == false)
.map(Tuple::v1)
.map(DeleteJobAction.Request::getJobId)
.collect(toList());
triggerJobsInStateWithoutMatchingTask(
"triggerDeleteJobsInStateDeletingWithoutDeletionTask",
Job::isDeleting,
DeleteJobAction.NAME,
taskInfo -> stripPrefixOrNull(taskInfo.description(), DeleteJobAction.DELETION_TASK_DESCRIPTION_PREFIX),
DeleteJobAction.INSTANCE,
DeleteJobAction.Request::new,
finalListener
);
}

public void triggerResetJobsInStateResetWithoutResetTask(ActionListener<AcknowledgedResponse> finalListener) {
triggerJobsInStateWithoutMatchingTask(
"triggerResetJobsInStateResetWithoutResetTask",
Job::isResetting,
ResetJobAction.NAME,
taskInfo -> stripPrefixOrNull(taskInfo.description(), MlTasks.JOB_TASK_ID_PREFIX),
ResetJobAction.INSTANCE,
ResetJobAction.Request::new,
finalListener
);
}

/**
* @return If the string starts with the prefix, this returns the string without the prefix.
* Otherwise, this return null.
*/
private static String stripPrefixOrNull(String str, String prefix) {
return str == null || str.startsWith(prefix) == false ? null : str.substring(prefix.length());
}

/**
* Executes a request for each job in a state, while missing the corresponding task. This
* usually indicates the node originally executing the task has died, so retry the request.
*
* @param maintenanceTaskName Name of ML maintenance task; used only for logging.
* @param jobFilter Predicate for filtering the jobs.
* @param taskActionName Action name of the tasks corresponding to the jobs.
* @param jobIdExtractor Function to extract the job ID from the task info (in order to match to the job).
* @param actionType Action type of the request that should be (re)executed.
* @param requestCreator Function to create the request from the job ID.
* @param finalListener Listener that captures the final response.
*/
private void triggerJobsInStateWithoutMatchingTask(
String maintenanceTaskName,
Predicate<Job> jobFilter,
String taskActionName,
Function<TaskInfo, String> jobIdExtractor,
ActionType<AcknowledgedResponse> actionType,
Function<String, AcknowledgedRequest<?>> requestCreator,
ActionListener<AcknowledgedResponse> finalListener
) {
SetOnce<Set<String>> jobsInStateHolder = new SetOnce<>();

ActionListener<List<Tuple<String, AcknowledgedResponse>>> jobsActionListener = finalListener.delegateFailureAndWrap(
(delegate, jobsResponses) -> {
List<String> jobIds = jobsResponses.stream().filter(t -> t.v2().isAcknowledged() == false).map(Tuple::v1).collect(toList());
if (jobIds.isEmpty()) {
logger.info("Successfully completed [ML] maintenance task: triggerDeleteJobsInStateDeletingWithoutDeletionTask");
logger.info("Successfully completed [ML] maintenance task: {}", maintenanceTaskName);
} else {
logger.info("The following ML jobs could not be deleted: [" + String.join(",", jobIds) + "]");
logger.info("[ML] maintenance task {} failed for jobs: {}", maintenanceTaskName, jobIds);
}
delegate.onResponse(AcknowledgedResponse.TRUE);
});
}
);

ActionListener<ListTasksResponse> listTasksActionListener = ActionListener.wrap(listTasksResponse -> {
Set<String> jobsInStateDeleting = jobsInStateDeletingHolder.get();
Set<String> jobsWithDeletionTask = listTasksResponse.getTasks()
.stream()
.filter(t -> t.description() != null)
.filter(t -> t.description().startsWith(DeleteJobAction.DELETION_TASK_DESCRIPTION_PREFIX))
.map(t -> t.description().substring(DeleteJobAction.DELETION_TASK_DESCRIPTION_PREFIX.length()))
.collect(toSet());
Set<String> jobsInStateDeletingWithoutDeletionTask = Sets.difference(jobsInStateDeleting, jobsWithDeletionTask);
if (jobsInStateDeletingWithoutDeletionTask.isEmpty()) {
Set<String> jobsInState = jobsInStateHolder.get();
Set<String> jobsWithTask = listTasksResponse.getTasks().stream().map(jobIdExtractor).filter(Objects::nonNull).collect(toSet());
Set<String> jobsInStateWithoutTask = Sets.difference(jobsInState, jobsWithTask);
if (jobsInStateWithoutTask.isEmpty()) {
finalListener.onResponse(AcknowledgedResponse.TRUE);
return;
}
TypedChainTaskExecutor<Tuple<DeleteJobAction.Request, AcknowledgedResponse>> chainTaskExecutor = new TypedChainTaskExecutor<>(
TypedChainTaskExecutor<Tuple<String, AcknowledgedResponse>> chainTaskExecutor = new TypedChainTaskExecutor<>(
EsExecutors.DIRECT_EXECUTOR_SERVICE,
Predicates.always(),
Predicates.always()
);
for (String jobId : jobsInStateDeletingWithoutDeletionTask) {
DeleteJobAction.Request request = new DeleteJobAction.Request(jobId);
for (String jobId : jobsInStateWithoutTask) {
chainTaskExecutor.add(
listener -> executeAsyncWithOrigin(
client,
ML_ORIGIN,
DeleteJobAction.INSTANCE,
request,
listener.delegateFailureAndWrap((l, response) -> l.onResponse(Tuple.tuple(request, response)))
actionType,
requestCreator.apply(jobId),
listener.delegateFailureAndWrap((l, response) -> l.onResponse(Tuple.tuple(jobId, response)))
)
);
}
chainTaskExecutor.execute(deleteJobsActionListener);
chainTaskExecutor.execute(jobsActionListener);
}, finalListener::onFailure);

ActionListener<GetJobsAction.Response> getJobsActionListener = ActionListener.wrap(getJobsResponse -> {
Set<String> jobsInStateDeleting = getJobsResponse.getResponse()
.results()
.stream()
.filter(Job::isDeleting)
.map(Job::getId)
.collect(toSet());
if (jobsInStateDeleting.isEmpty()) {
Set<String> jobsInState = getJobsResponse.getResponse().results().stream().filter(jobFilter).map(Job::getId).collect(toSet());
if (jobsInState.isEmpty()) {
finalListener.onResponse(AcknowledgedResponse.TRUE);
return;
}
jobsInStateDeletingHolder.set(jobsInStateDeleting);
jobsInStateHolder.set(jobsInState);
executeAsyncWithOrigin(
client,
ML_ORIGIN,
TransportListTasksAction.TYPE,
new ListTasksRequest().setActions(DeleteJobAction.NAME),
new ListTasksRequest().setActions(taskActionName),
listTasksActionListener
);
}, finalListener::onFailure);
Expand Down
Loading