Skip to content

Commit 59944a0

Browse files
authored
[ML] Handle failed datafeed in MlDistributedFailureIT (#52631)
1 parent cd069a8 commit 59944a0

File tree

1 file changed

+10
-3
lines changed

1 file changed

+10
-3
lines changed

x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlDistributedFailureIT.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -245,20 +245,27 @@ public void testCloseUnassignedFailedJobAndStopUnassignedStoppingDatafeed() thro
245245
// stopping.
246246
PersistentTasksCustomMetaData tasks = clusterService().state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
247247
PersistentTasksCustomMetaData.PersistentTask<?> task = MlTasks.getDatafeedTask(datafeedId, tasks);
248+
249+
// It is possible that the datafeed has already detected the job failure and
250+
// terminated itself. If this happens there is no persistent task to stop
251+
assumeFalse("The datafeed task is null most likely because the datafeed detected the job had failed. " +
252+
"This is expected to happen extremely rarely but the test cannot continue in these circumstances.", task == null);
253+
248254
UpdatePersistentTaskStatusAction.Request updatePersistentTaskStatusRequest =
249-
new UpdatePersistentTaskStatusAction.Request(task.getId(), task.getAllocationId(), DatafeedState.STOPPING);
255+
new UpdatePersistentTaskStatusAction.Request(task.getId(), task.getAllocationId(), DatafeedState.STOPPING);
250256
PersistentTaskResponse updatePersistentTaskStatusResponse =
251-
client().execute(UpdatePersistentTaskStatusAction.INSTANCE, updatePersistentTaskStatusRequest).actionGet();
257+
client().execute(UpdatePersistentTaskStatusAction.INSTANCE, updatePersistentTaskStatusRequest).actionGet();
252258
assertNotNull(updatePersistentTaskStatusResponse.getTask());
253259

254260
// Confirm the datafeed state is now stopping - this may take a while to update in cluster state
255261
assertBusy(() -> {
256262
GetDatafeedsStatsAction.Request datafeedStatsRequest = new GetDatafeedsStatsAction.Request(datafeedId);
257263
GetDatafeedsStatsAction.Response datafeedStatsResponse =
258-
client().execute(GetDatafeedsStatsAction.INSTANCE, datafeedStatsRequest).actionGet();
264+
client().execute(GetDatafeedsStatsAction.INSTANCE, datafeedStatsRequest).actionGet();
259265
assertEquals(DatafeedState.STOPPING, datafeedStatsResponse.getResponse().results().get(0).getDatafeedState());
260266
});
261267

268+
262269
// Stop the node running the failed job/stopping datafeed
263270
ensureGreen(); // replicas must be assigned, otherwise we could lose a whole index
264271
internalCluster().stopRandomNode(settings -> jobNode.getName().equals(settings.get("node.name")));

0 commit comments

Comments
 (0)