Skip to content

[7.x][ML] Add reason to DataFrameAnalyticsTask setFailed log message #52707

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ public final class Messages {
public static final String DATA_FRAME_ANALYTICS_AUDIT_STARTED = "Started analytics";
public static final String DATA_FRAME_ANALYTICS_AUDIT_STOPPED = "Stopped analytics";
public static final String DATA_FRAME_ANALYTICS_AUDIT_DELETED = "Deleted analytics";
public static final String DATA_FRAME_ANALYTICS_AUDIT_UPDATED_STATE = "Successfully updated analytics task state to [{0}]";
public static final String DATA_FRAME_ANALYTICS_AUDIT_UPDATED_STATE_WITH_REASON =
"Updated analytics task state to [{0}] with reason [{1}]";
public static final String DATA_FRAME_ANALYTICS_AUDIT_ESTIMATED_MEMORY_USAGE = "Estimated memory usage for this analytics to be [{0}]";
public static final String DATA_FRAME_ANALYTICS_AUDIT_CREATING_DEST_INDEX = "Creating destination index [{0}]";
public static final String DATA_FRAME_ANALYTICS_AUDIT_REUSING_DEST_INDEX = "Using existing destination index [{0}]";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,12 +93,12 @@ public void execute(DataFrameAnalyticsTask task, DataFrameAnalyticsState current
executeJobInMiddleOfReindexing(task, config);
break;
default:
task.updateState(DataFrameAnalyticsState.FAILED, "Cannot execute analytics task [" + config.getId() +
task.setFailed("Cannot execute analytics task [" + config.getId() +
"] as it is in unknown state [" + currentState + "]. Must be one of [STARTED, REINDEXING, ANALYZING]");
}

},
error -> task.updateState(DataFrameAnalyticsState.FAILED, error.getMessage())
error -> task.setFailed(error.getMessage())
);

// Retrieve configuration
Expand All @@ -122,21 +122,21 @@ private void executeStartingJob(DataFrameAnalyticsTask task, DataFrameAnalyticsC
case FIRST_TIME:
task.updatePersistentTaskState(reindexingState, ActionListener.wrap(
updatedTask -> reindexDataframeAndStartAnalysis(task, config),
error -> task.updateState(DataFrameAnalyticsState.FAILED, error.getMessage())
error -> task.setFailed(error.getMessage())
));
break;
case RESUMING_REINDEXING:
task.updatePersistentTaskState(reindexingState, ActionListener.wrap(
updatedTask -> executeJobInMiddleOfReindexing(task, config),
error -> task.updateState(DataFrameAnalyticsState.FAILED, error.getMessage())
error -> task.setFailed(error.getMessage())
));
break;
case RESUMING_ANALYZING:
startAnalytics(task, config);
break;
case FINISHED:
default:
task.updateState(DataFrameAnalyticsState.FAILED, "Unexpected starting state [" + startingState + "]");
task.setFailed("Unexpected starting state [" + startingState + "]");
}
}

Expand All @@ -151,7 +151,7 @@ private void executeJobInMiddleOfReindexing(DataFrameAnalyticsTask task, DataFra
if (ExceptionsHelper.unwrapCause(e) instanceof IndexNotFoundException) {
reindexDataframeAndStartAnalysis(task, config);
} else {
task.updateState(DataFrameAnalyticsState.FAILED, e.getMessage());
task.setFailed(e.getMessage());
}
}
));
Expand All @@ -178,7 +178,7 @@ private void reindexDataframeAndStartAnalysis(DataFrameAnalyticsTask task, DataF
Messages.getMessage(Messages.DATA_FRAME_ANALYTICS_AUDIT_FINISHED_REINDEXING, config.getDest().getIndex()));
startAnalytics(task, config);
},
error -> task.updateState(DataFrameAnalyticsState.FAILED, error.getMessage())
error -> task.setFailed(error.getMessage())
);

// Reindex
Expand Down Expand Up @@ -244,12 +244,12 @@ private void startAnalytics(DataFrameAnalyticsTask task, DataFrameAnalyticsConfi
if (ExceptionsHelper.unwrapCause(error) instanceof ResourceNotFoundException) {
// Task has stopped
} else {
task.updateState(DataFrameAnalyticsState.FAILED, error.getMessage());
task.setFailed(error.getMessage());
}
}
));
},
error -> task.updateState(DataFrameAnalyticsState.FAILED, error.getMessage())
error -> task.setFailed(error.getMessage())
);

ActionListener<RefreshResponse> refreshListener = ActionListener.wrap(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,17 +177,20 @@ private void cancelReindexingTask(String reason, TimeValue timeout) {
}
}

public void updateState(DataFrameAnalyticsState state, @Nullable String reason) {
DataFrameAnalyticsTaskState newTaskState = new DataFrameAnalyticsTaskState(state, getAllocationId(), reason);
public void setFailed(String reason) {
DataFrameAnalyticsTaskState newTaskState = new DataFrameAnalyticsTaskState(DataFrameAnalyticsState.FAILED,
getAllocationId(), reason);
updatePersistentTaskState(
newTaskState,
ActionListener.wrap(
updatedTask -> {
auditor.info(getParams().getId(), Messages.getMessage(Messages.DATA_FRAME_ANALYTICS_AUDIT_UPDATED_STATE, state));
LOGGER.info("[{}] Successfully update task state to [{}]", getParams().getId(), state);
String message = Messages.getMessage(Messages.DATA_FRAME_ANALYTICS_AUDIT_UPDATED_STATE_WITH_REASON,
DataFrameAnalyticsState.FAILED, reason);
auditor.info(getParams().getId(), message);
LOGGER.info("[{}] {}", getParams().getId(), message);
},
e -> LOGGER.error(new ParameterizedMessage("[{}] Could not update task state to [{}] with reason [{}]",
getParams().getId(), state, reason), e)
getParams().getId(), DataFrameAnalyticsState.FAILED, reason), e)
)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;
import org.elasticsearch.xpack.core.ml.dataframe.analyses.DataFrameAnalysis;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
Expand Down Expand Up @@ -110,8 +109,7 @@ public void runJob(DataFrameAnalyticsTask task, DataFrameAnalyticsConfig config,
return;
}
if (processContextByAllocation.putIfAbsent(task.getAllocationId(), processContext) != null) {
task.updateState(
DataFrameAnalyticsState.FAILED, "[" + config.getId() + "] Could not create process as one already exists");
task.setFailed("[" + config.getId() + "] Could not create process as one already exists");
return;
}
}
Expand Down Expand Up @@ -193,7 +191,7 @@ private void processData(DataFrameAnalyticsTask task, ProcessContext processCont
task.markAsCompleted();
} else {
LOGGER.error("[{}] Marking task failed; {}", config.getId(), processContext.getFailureReason());
task.updateState(DataFrameAnalyticsState.FAILED, processContext.getFailureReason());
task.setFailed(processContext.getFailureReason());
// Note: setFailed only updates the task state to FAILED together with the reason; we are deliberately not marking the task itself as failed here, as we want the user to be able to inspect the failure reason.
}
}
Expand Down Expand Up @@ -265,7 +263,7 @@ private void restoreState(DataFrameAnalyticsTask task, DataFrameAnalyticsConfig
process.restoreState(state);
} catch (Exception e) {
LOGGER.error(new ParameterizedMessage("[{}] Failed to restore state", process.getConfig().jobId()), e);
task.updateState(DataFrameAnalyticsState.FAILED, "Failed to restore state: " + e.getMessage());
task.setFailed("Failed to restore state: " + e.getMessage());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfigTests;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;
import org.elasticsearch.xpack.core.ml.dataframe.analyses.OutlierDetectionTests;
import org.elasticsearch.xpack.ml.dataframe.DataFrameAnalyticsTask;
import org.elasticsearch.xpack.ml.dataframe.extractor.DataFrameDataExtractor;
Expand Down Expand Up @@ -131,7 +130,7 @@ public void testRunJob_ProcessContextAlreadyExists() {
inOrder.verify(task).getStatsHolder();
inOrder.verify(task).isStopping();
inOrder.verify(task).getAllocationId();
inOrder.verify(task).updateState(DataFrameAnalyticsState.FAILED, "[config-id] Could not create process as one already exists");
inOrder.verify(task).setFailed("[config-id] Could not create process as one already exists");
verifyNoMoreInteractions(task);
}

Expand Down