Skip to content

Commit 044a4e1

Browse files
authored
[ML] Add reason to DataFrameAnalyticsTask setFailed log message (#52659) (#52707)
1 parent 5e48811 commit 044a4e1

File tree

5 files changed

+23
-22
lines changed

5 files changed

+23
-22
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,8 @@ public final class Messages {
6060
public static final String DATA_FRAME_ANALYTICS_AUDIT_STARTED = "Started analytics";
6161
public static final String DATA_FRAME_ANALYTICS_AUDIT_STOPPED = "Stopped analytics";
6262
public static final String DATA_FRAME_ANALYTICS_AUDIT_DELETED = "Deleted analytics";
63-
public static final String DATA_FRAME_ANALYTICS_AUDIT_UPDATED_STATE = "Successfully updated analytics task state to [{0}]";
63+
public static final String DATA_FRAME_ANALYTICS_AUDIT_UPDATED_STATE_WITH_REASON =
64+
"Updated analytics task state to [{0}] with reason [{1}]";
6465
public static final String DATA_FRAME_ANALYTICS_AUDIT_ESTIMATED_MEMORY_USAGE = "Estimated memory usage for this analytics to be [{0}]";
6566
public static final String DATA_FRAME_ANALYTICS_AUDIT_CREATING_DEST_INDEX = "Creating destination index [{0}]";
6667
public static final String DATA_FRAME_ANALYTICS_AUDIT_REUSING_DEST_INDEX = "Using existing destination index [{0}]";

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java

+9-9
Original file line numberDiff line numberDiff line change
@@ -93,12 +93,12 @@ public void execute(DataFrameAnalyticsTask task, DataFrameAnalyticsState current
9393
executeJobInMiddleOfReindexing(task, config);
9494
break;
9595
default:
96-
task.updateState(DataFrameAnalyticsState.FAILED, "Cannot execute analytics task [" + config.getId() +
96+
task.setFailed("Cannot execute analytics task [" + config.getId() +
9797
"] as it is in unknown state [" + currentState + "]. Must be one of [STARTED, REINDEXING, ANALYZING]");
9898
}
9999

100100
},
101-
error -> task.updateState(DataFrameAnalyticsState.FAILED, error.getMessage())
101+
error -> task.setFailed(error.getMessage())
102102
);
103103

104104
// Retrieve configuration
@@ -122,21 +122,21 @@ private void executeStartingJob(DataFrameAnalyticsTask task, DataFrameAnalyticsC
122122
case FIRST_TIME:
123123
task.updatePersistentTaskState(reindexingState, ActionListener.wrap(
124124
updatedTask -> reindexDataframeAndStartAnalysis(task, config),
125-
error -> task.updateState(DataFrameAnalyticsState.FAILED, error.getMessage())
125+
error -> task.setFailed(error.getMessage())
126126
));
127127
break;
128128
case RESUMING_REINDEXING:
129129
task.updatePersistentTaskState(reindexingState, ActionListener.wrap(
130130
updatedTask -> executeJobInMiddleOfReindexing(task, config),
131-
error -> task.updateState(DataFrameAnalyticsState.FAILED, error.getMessage())
131+
error -> task.setFailed(error.getMessage())
132132
));
133133
break;
134134
case RESUMING_ANALYZING:
135135
startAnalytics(task, config);
136136
break;
137137
case FINISHED:
138138
default:
139-
task.updateState(DataFrameAnalyticsState.FAILED, "Unexpected starting state [" + startingState + "]");
139+
task.setFailed("Unexpected starting state [" + startingState + "]");
140140
}
141141
}
142142

@@ -151,7 +151,7 @@ private void executeJobInMiddleOfReindexing(DataFrameAnalyticsTask task, DataFra
151151
if (ExceptionsHelper.unwrapCause(e) instanceof IndexNotFoundException) {
152152
reindexDataframeAndStartAnalysis(task, config);
153153
} else {
154-
task.updateState(DataFrameAnalyticsState.FAILED, e.getMessage());
154+
task.setFailed(e.getMessage());
155155
}
156156
}
157157
));
@@ -178,7 +178,7 @@ private void reindexDataframeAndStartAnalysis(DataFrameAnalyticsTask task, DataF
178178
Messages.getMessage(Messages.DATA_FRAME_ANALYTICS_AUDIT_FINISHED_REINDEXING, config.getDest().getIndex()));
179179
startAnalytics(task, config);
180180
},
181-
error -> task.updateState(DataFrameAnalyticsState.FAILED, error.getMessage())
181+
error -> task.setFailed(error.getMessage())
182182
);
183183

184184
// Reindex
@@ -244,12 +244,12 @@ private void startAnalytics(DataFrameAnalyticsTask task, DataFrameAnalyticsConfi
244244
if (ExceptionsHelper.unwrapCause(error) instanceof ResourceNotFoundException) {
245245
// Task has stopped
246246
} else {
247-
task.updateState(DataFrameAnalyticsState.FAILED, error.getMessage());
247+
task.setFailed(error.getMessage());
248248
}
249249
}
250250
));
251251
},
252-
error -> task.updateState(DataFrameAnalyticsState.FAILED, error.getMessage())
252+
error -> task.setFailed(error.getMessage())
253253
);
254254

255255
ActionListener<RefreshResponse> refreshListener = ActionListener.wrap(

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsTask.java

+8-5
Original file line numberDiff line numberDiff line change
@@ -177,17 +177,20 @@ private void cancelReindexingTask(String reason, TimeValue timeout) {
177177
}
178178
}
179179

180-
public void updateState(DataFrameAnalyticsState state, @Nullable String reason) {
181-
DataFrameAnalyticsTaskState newTaskState = new DataFrameAnalyticsTaskState(state, getAllocationId(), reason);
180+
public void setFailed(String reason) {
181+
DataFrameAnalyticsTaskState newTaskState = new DataFrameAnalyticsTaskState(DataFrameAnalyticsState.FAILED,
182+
getAllocationId(), reason);
182183
updatePersistentTaskState(
183184
newTaskState,
184185
ActionListener.wrap(
185186
updatedTask -> {
186-
auditor.info(getParams().getId(), Messages.getMessage(Messages.DATA_FRAME_ANALYTICS_AUDIT_UPDATED_STATE, state));
187-
LOGGER.info("[{}] Successfully update task state to [{}]", getParams().getId(), state);
187+
String message = Messages.getMessage(Messages.DATA_FRAME_ANALYTICS_AUDIT_UPDATED_STATE_WITH_REASON,
188+
DataFrameAnalyticsState.FAILED, reason);
189+
auditor.info(getParams().getId(), message);
190+
LOGGER.info("[{}] {}", getParams().getId(), message);
188191
},
189192
e -> LOGGER.error(new ParameterizedMessage("[{}] Could not update task state to [{}] with reason [{}]",
190-
getParams().getId(), state, reason), e)
193+
getParams().getId(), DataFrameAnalyticsState.FAILED, reason), e)
191194
)
192195
);
193196
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java

+3-5
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import org.elasticsearch.threadpool.ThreadPool;
2424
import org.elasticsearch.xpack.core.ClientHelper;
2525
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
26-
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;
2726
import org.elasticsearch.xpack.core.ml.dataframe.analyses.DataFrameAnalysis;
2827
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
2928
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
@@ -110,8 +109,7 @@ public void runJob(DataFrameAnalyticsTask task, DataFrameAnalyticsConfig config,
110109
return;
111110
}
112111
if (processContextByAllocation.putIfAbsent(task.getAllocationId(), processContext) != null) {
113-
task.updateState(
114-
DataFrameAnalyticsState.FAILED, "[" + config.getId() + "] Could not create process as one already exists");
112+
task.setFailed("[" + config.getId() + "] Could not create process as one already exists");
115113
return;
116114
}
117115
}
@@ -193,7 +191,7 @@ private void processData(DataFrameAnalyticsTask task, ProcessContext processCont
193191
task.markAsCompleted();
194192
} else {
195193
LOGGER.error("[{}] Marking task failed; {}", config.getId(), processContext.getFailureReason());
196-
task.updateState(DataFrameAnalyticsState.FAILED, processContext.getFailureReason());
194+
task.setFailed(processContext.getFailureReason());
197195
// Note: We are not marking the task as failed here as we want the user to be able to inspect the failure reason.
198196
}
199197
}
@@ -265,7 +263,7 @@ private void restoreState(DataFrameAnalyticsTask task, DataFrameAnalyticsConfig
265263
process.restoreState(state);
266264
} catch (Exception e) {
267265
LOGGER.error(new ParameterizedMessage("[{}] Failed to restore state", process.getConfig().jobId()), e);
268-
task.updateState(DataFrameAnalyticsState.FAILED, "Failed to restore state: " + e.getMessage());
266+
task.setFailed("Failed to restore state: " + e.getMessage());
269267
}
270268
}
271269

x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManagerTests.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
import org.elasticsearch.threadpool.ThreadPool;
1515
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
1616
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfigTests;
17-
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;
1817
import org.elasticsearch.xpack.core.ml.dataframe.analyses.OutlierDetectionTests;
1918
import org.elasticsearch.xpack.ml.dataframe.DataFrameAnalyticsTask;
2019
import org.elasticsearch.xpack.ml.dataframe.extractor.DataFrameDataExtractor;
@@ -131,7 +130,7 @@ public void testRunJob_ProcessContextAlreadyExists() {
131130
inOrder.verify(task).getStatsHolder();
132131
inOrder.verify(task).isStopping();
133132
inOrder.verify(task).getAllocationId();
134-
inOrder.verify(task).updateState(DataFrameAnalyticsState.FAILED, "[config-id] Could not create process as one already exists");
133+
inOrder.verify(task).setFailed("[config-id] Could not create process as one already exists");
135134
verifyNoMoreInteractions(task);
136135
}
137136

0 commit comments

Comments
 (0)