Skip to content

Commit ad55bdd

Browse files
[7.x][ML] Avoid NPE when node load is calculated on job assignment (elastic#49186)
This commit fixes an NPE reported in elastic#49150. Investigating that problem revealed that we never added proper state handling for data frame analytics tasks. In this commit we improve the `MlTasks.getDataFrameAnalyticsState` method to properly handle null tasks, tasks with a null state, and stale tasks. Closes elastic#49150. Backport of elastic#49186.
1 parent 09a9ec4 commit ad55bdd

File tree

4 files changed

+186
-9
lines changed

4 files changed

+186
-9
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlTasks.java

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -143,14 +143,31 @@ public static DatafeedState getDatafeedState(String datafeedId, @Nullable Persis
143143

144144
public static DataFrameAnalyticsState getDataFrameAnalyticsState(String analyticsId, @Nullable PersistentTasksCustomMetaData tasks) {
145145
PersistentTasksCustomMetaData.PersistentTask<?> task = getDataFrameAnalyticsTask(analyticsId, tasks);
146-
if (task != null) {
147-
DataFrameAnalyticsTaskState taskState = (DataFrameAnalyticsTaskState) task.getState();
148-
if (taskState == null) {
146+
return getDataFrameAnalyticsState(task);
147+
}
148+
149+
public static DataFrameAnalyticsState getDataFrameAnalyticsState(@Nullable PersistentTasksCustomMetaData.PersistentTask<?> task) {
150+
if (task == null) {
151+
return DataFrameAnalyticsState.STOPPED;
152+
}
153+
DataFrameAnalyticsTaskState taskState = (DataFrameAnalyticsTaskState) task.getState();
154+
if (taskState == null) {
155+
return DataFrameAnalyticsState.STARTING;
156+
}
157+
158+
DataFrameAnalyticsState state = taskState.getState();
159+
if (taskState.isStatusStale(task)) {
160+
if (state == DataFrameAnalyticsState.STOPPING) {
161+
// previous executor node failed while the job was stopping - it won't
162+
// be restarted on another node, so consider it STOPPED for reassignment purposes
163+
return DataFrameAnalyticsState.STOPPED;
164+
}
165+
if (state != DataFrameAnalyticsState.FAILED) {
166+
// we are relocating at the moment
149167
return DataFrameAnalyticsState.STARTING;
150168
}
151-
return taskState.getState();
152169
}
153-
return DataFrameAnalyticsState.STOPPED;
170+
return state;
154171
}
155172

156173
/**

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/MlTasksTests.java

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,18 +13,23 @@
1313
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
1414
import org.elasticsearch.test.ESTestCase;
1515
import org.elasticsearch.xpack.core.ml.action.OpenJobAction;
16+
import org.elasticsearch.xpack.core.ml.action.StartDataFrameAnalyticsAction;
1617
import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction;
1718
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
19+
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;
20+
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsTaskState;
1821
import org.elasticsearch.xpack.core.ml.job.config.JobState;
1922
import org.elasticsearch.xpack.core.ml.job.config.JobTaskState;
2023

2124
import java.net.InetAddress;
25+
import java.util.Collections;
2226

2327
import static org.hamcrest.Matchers.containsInAnyOrder;
2428
import static org.hamcrest.Matchers.empty;
2529
import static org.hamcrest.Matchers.equalTo;
2630

2731
public class MlTasksTests extends ESTestCase {
32+
2833
public void testGetJobState() {
2934
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
3035
// A missing task is a closed job
@@ -168,4 +173,133 @@ public void testDataFrameAnalyticsTaskIds() {
168173
assertThat(taskId, equalTo("data_frame_analytics-foo"));
169174
assertThat(MlTasks.dataFrameAnalyticsIdFromTaskId(taskId), equalTo("foo"));
170175
}
176+
177+
public void testGetDataFrameAnalyticsState_GivenNullTask() {
178+
DataFrameAnalyticsState state = MlTasks.getDataFrameAnalyticsState(null);
179+
assertThat(state, equalTo(DataFrameAnalyticsState.STOPPED));
180+
}
181+
182+
public void testGetDataFrameAnalyticsState_GivenTaskWithNullState() {
183+
String jobId = "foo";
184+
PersistentTasksCustomMetaData.PersistentTask<?> task = createDataFrameAnalyticsTask(jobId, "test_node", null, false);
185+
186+
DataFrameAnalyticsState state = MlTasks.getDataFrameAnalyticsState(task);
187+
188+
assertThat(state, equalTo(DataFrameAnalyticsState.STARTING));
189+
}
190+
191+
public void testGetDataFrameAnalyticsState_GivenTaskWithStartedState() {
192+
String jobId = "foo";
193+
PersistentTasksCustomMetaData.PersistentTask<?> task = createDataFrameAnalyticsTask(jobId, "test_node",
194+
DataFrameAnalyticsState.STARTED, false);
195+
196+
DataFrameAnalyticsState state = MlTasks.getDataFrameAnalyticsState(task);
197+
198+
assertThat(state, equalTo(DataFrameAnalyticsState.STARTED));
199+
}
200+
201+
public void testGetDataFrameAnalyticsState_GivenStaleTaskWithStartedState() {
202+
String jobId = "foo";
203+
PersistentTasksCustomMetaData.PersistentTask<?> task = createDataFrameAnalyticsTask(jobId, "test_node",
204+
DataFrameAnalyticsState.STARTED, true);
205+
206+
DataFrameAnalyticsState state = MlTasks.getDataFrameAnalyticsState(task);
207+
208+
assertThat(state, equalTo(DataFrameAnalyticsState.STARTING));
209+
}
210+
211+
public void testGetDataFrameAnalyticsState_GivenTaskWithReindexingState() {
212+
String jobId = "foo";
213+
PersistentTasksCustomMetaData.PersistentTask<?> task = createDataFrameAnalyticsTask(jobId, "test_node",
214+
DataFrameAnalyticsState.REINDEXING, false);
215+
216+
DataFrameAnalyticsState state = MlTasks.getDataFrameAnalyticsState(task);
217+
218+
assertThat(state, equalTo(DataFrameAnalyticsState.REINDEXING));
219+
}
220+
221+
public void testGetDataFrameAnalyticsState_GivenStaleTaskWithReindexingState() {
222+
String jobId = "foo";
223+
PersistentTasksCustomMetaData.PersistentTask<?> task = createDataFrameAnalyticsTask(jobId, "test_node",
224+
DataFrameAnalyticsState.REINDEXING, true);
225+
226+
DataFrameAnalyticsState state = MlTasks.getDataFrameAnalyticsState(task);
227+
228+
assertThat(state, equalTo(DataFrameAnalyticsState.STARTING));
229+
}
230+
231+
public void testGetDataFrameAnalyticsState_GivenTaskWithAnalyzingState() {
232+
String jobId = "foo";
233+
PersistentTasksCustomMetaData.PersistentTask<?> task = createDataFrameAnalyticsTask(jobId, "test_node",
234+
DataFrameAnalyticsState.ANALYZING, false);
235+
236+
DataFrameAnalyticsState state = MlTasks.getDataFrameAnalyticsState(task);
237+
238+
assertThat(state, equalTo(DataFrameAnalyticsState.ANALYZING));
239+
}
240+
241+
public void testGetDataFrameAnalyticsState_GivenStaleTaskWithAnalyzingState() {
242+
String jobId = "foo";
243+
PersistentTasksCustomMetaData.PersistentTask<?> task = createDataFrameAnalyticsTask(jobId, "test_node",
244+
DataFrameAnalyticsState.ANALYZING, true);
245+
246+
DataFrameAnalyticsState state = MlTasks.getDataFrameAnalyticsState(task);
247+
248+
assertThat(state, equalTo(DataFrameAnalyticsState.STARTING));
249+
}
250+
251+
public void testGetDataFrameAnalyticsState_GivenTaskWithStoppingState() {
252+
String jobId = "foo";
253+
PersistentTasksCustomMetaData.PersistentTask<?> task = createDataFrameAnalyticsTask(jobId, "test_node",
254+
DataFrameAnalyticsState.STOPPING, false);
255+
256+
DataFrameAnalyticsState state = MlTasks.getDataFrameAnalyticsState(task);
257+
258+
assertThat(state, equalTo(DataFrameAnalyticsState.STOPPING));
259+
}
260+
261+
public void testGetDataFrameAnalyticsState_GivenStaleTaskWithStoppingState() {
262+
String jobId = "foo";
263+
PersistentTasksCustomMetaData.PersistentTask<?> task = createDataFrameAnalyticsTask(jobId, "test_node",
264+
DataFrameAnalyticsState.STOPPING, true);
265+
266+
DataFrameAnalyticsState state = MlTasks.getDataFrameAnalyticsState(task);
267+
268+
assertThat(state, equalTo(DataFrameAnalyticsState.STOPPED));
269+
}
270+
271+
public void testGetDataFrameAnalyticsState_GivenTaskWithFailedState() {
272+
String jobId = "foo";
273+
PersistentTasksCustomMetaData.PersistentTask<?> task = createDataFrameAnalyticsTask(jobId, "test_node",
274+
DataFrameAnalyticsState.FAILED, false);
275+
276+
DataFrameAnalyticsState state = MlTasks.getDataFrameAnalyticsState(task);
277+
278+
assertThat(state, equalTo(DataFrameAnalyticsState.FAILED));
279+
}
280+
281+
public void testGetDataFrameAnalyticsState_GivenStaleTaskWithFailedState() {
282+
String jobId = "foo";
283+
PersistentTasksCustomMetaData.PersistentTask<?> task = createDataFrameAnalyticsTask(jobId, "test_node",
284+
DataFrameAnalyticsState.FAILED, true);
285+
286+
DataFrameAnalyticsState state = MlTasks.getDataFrameAnalyticsState(task);
287+
288+
assertThat(state, equalTo(DataFrameAnalyticsState.FAILED));
289+
}
290+
291+
private static PersistentTasksCustomMetaData.PersistentTask<?> createDataFrameAnalyticsTask(String jobId, String nodeId,
292+
DataFrameAnalyticsState state,
293+
boolean isStale) {
294+
PersistentTasksCustomMetaData.Builder builder = PersistentTasksCustomMetaData.builder();
295+
builder.addTask(MlTasks.dataFrameAnalyticsTaskId(jobId), MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME,
296+
new StartDataFrameAnalyticsAction.TaskParams(jobId, Version.CURRENT, Collections.emptyList(), false),
297+
new PersistentTasksCustomMetaData.Assignment(nodeId, "test assignment"));
298+
if (state != null) {
299+
builder.updateTaskState(MlTasks.dataFrameAnalyticsTaskId(jobId),
300+
new DataFrameAnalyticsTaskState(state, builder.getLastAllocationId() - (isStale ? 1 : 0), null));
301+
}
302+
PersistentTasksCustomMetaData tasks = builder.build();
303+
return tasks.getTask(MlTasks.dataFrameAnalyticsTaskId(jobId));
304+
}
171305
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobNodeSelector.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
import org.elasticsearch.xpack.core.ml.action.OpenJobAction;
1717
import org.elasticsearch.xpack.core.ml.action.StartDataFrameAnalyticsAction;
1818
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;
19-
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsTaskState;
2019
import org.elasticsearch.xpack.core.ml.job.config.JobState;
2120
import org.elasticsearch.xpack.ml.MachineLearning;
2221
import org.elasticsearch.xpack.ml.process.MlMemoryTracker;
@@ -269,7 +268,7 @@ private CurrentLoad calculateCurrentLoadForNode(DiscoveryNode node, PersistentTa
269268
Collection<PersistentTasksCustomMetaData.PersistentTask<?>> assignedAnalyticsTasks = persistentTasks.findTasks(
270269
MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME, task -> node.getId().equals(task.getExecutorNode()));
271270
for (PersistentTasksCustomMetaData.PersistentTask<?> assignedTask : assignedAnalyticsTasks) {
272-
DataFrameAnalyticsState dataFrameAnalyticsState = ((DataFrameAnalyticsTaskState) assignedTask.getState()).getState();
271+
DataFrameAnalyticsState dataFrameAnalyticsState = MlTasks.getDataFrameAnalyticsState(assignedTask);
273272

274273
// Don't count stopped and failed df-analytics tasks as they don't consume native memory
275274
if (dataFrameAnalyticsState.isAnyOf(DataFrameAnalyticsState.STOPPED, DataFrameAnalyticsState.FAILED) == false) {

x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobNodeSelectorTests.java

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,27 @@ public void testSelectLeastLoadedMlNodeForAnomalyDetectorJob_maxCapacityMemoryLi
200200
+ currentlyRunningJobMemory + "], estimated memory required for this job [" + JOB_MEMORY_REQUIREMENT.getBytes() + "]"));
201201
}
202202

203+
public void testSelectLeastLoadedMlNodeForDataFrameAnalyticsJob_givenTaskHasNullState() {
204+
int numNodes = randomIntBetween(1, 10);
205+
int maxRunningJobsPerNode = 10;
206+
int maxMachineMemoryPercent = 30;
207+
208+
Map<String, String> nodeAttr = new HashMap<>();
209+
nodeAttr.put(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR, Integer.toString(maxRunningJobsPerNode));
210+
nodeAttr.put(MachineLearning.MACHINE_MEMORY_NODE_ATTR, "-1");
211+
212+
ClusterState.Builder cs = fillNodesWithRunningJobs(nodeAttr, numNodes, 1, JobState.OPENED, null);
213+
214+
String dataFrameAnalyticsId = "data_frame_analytics_id_new";
215+
216+
JobNodeSelector jobNodeSelector = new JobNodeSelector(cs.build(), dataFrameAnalyticsId,
217+
MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME, memoryTracker, 0,
218+
node -> TransportStartDataFrameAnalyticsAction.TaskExecutor.nodeFilter(node, dataFrameAnalyticsId));
219+
PersistentTasksCustomMetaData.Assignment result =
220+
jobNodeSelector.selectNode(maxRunningJobsPerNode, 2, maxMachineMemoryPercent, isMemoryTrackerRecentlyRefreshed);
221+
assertNotNull(result.getExecutorNode());
222+
}
223+
203224
public void testSelectLeastLoadedMlNodeForAnomalyDetectorJob_firstJobTooBigMemoryLimiting() {
204225
int numNodes = randomIntBetween(1, 10);
205226
int maxRunningJobsPerNode = randomIntBetween(1, 100);
@@ -579,6 +600,12 @@ public void testConsiderLazyAssignmentWithLazyNodes() {
579600

580601
private ClusterState.Builder fillNodesWithRunningJobs(Map<String, String> nodeAttr, int numNodes, int numRunningJobsPerNode) {
581602

603+
return fillNodesWithRunningJobs(nodeAttr, numNodes, numRunningJobsPerNode, JobState.OPENED, DataFrameAnalyticsState.STARTED);
604+
}
605+
606+
private ClusterState.Builder fillNodesWithRunningJobs(Map<String, String> nodeAttr, int numNodes, int numRunningJobsPerNode,
607+
JobState anomalyDetectionJobState, DataFrameAnalyticsState dfAnalyticsJobState) {
608+
582609
DiscoveryNodes.Builder nodes = DiscoveryNodes.builder();
583610
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
584611
String[] jobIds = new String[numNodes * numRunningJobsPerNode];
@@ -591,10 +618,10 @@ private ClusterState.Builder fillNodesWithRunningJobs(Map<String, String> nodeAt
591618
// Both anomaly detector jobs and data frame analytics jobs should count towards the limit
592619
if (randomBoolean()) {
593620
jobIds[id] = "job_id" + id;
594-
TransportOpenJobActionTests.addJobTask(jobIds[id], nodeId, JobState.OPENED, tasksBuilder);
621+
TransportOpenJobActionTests.addJobTask(jobIds[id], nodeId, anomalyDetectionJobState, tasksBuilder);
595622
} else {
596623
jobIds[id] = "data_frame_analytics_id" + id;
597-
addDataFrameAnalyticsJobTask(jobIds[id], nodeId, DataFrameAnalyticsState.STARTED, tasksBuilder);
624+
addDataFrameAnalyticsJobTask(jobIds[id], nodeId, dfAnalyticsJobState, tasksBuilder);
598625
}
599626
}
600627
}

0 commit comments

Comments
 (0)