@@ -231,10 +231,12 @@ public void testCloseUnassignedFailedJobAndStopUnassignedStoppingDatafeed() thro
231
231
PostDataAction .Response postDataResponse = client ().execute (PostDataAction .INSTANCE , postDataRequest ).actionGet ();
232
232
assertEquals (1L , postDataResponse .getDataCounts ().getInputRecordCount ());
233
233
234
- // Confirm the job state is now failed
235
- jobStatsRequest = new GetJobsStatsAction .Request (jobId );
236
- jobStatsResponse = client ().execute (GetJobsStatsAction .INSTANCE , jobStatsRequest ).actionGet ();
237
- assertEquals (JobState .FAILED , jobStatsResponse .getResponse ().results ().get (0 ).getState ());
234
+ // Confirm the job state is now failed - this may take a while to update in cluster state
235
+ assertBusy (() -> {
236
+ GetJobsStatsAction .Request jobStatsRequest2 = new GetJobsStatsAction .Request (jobId );
237
+ GetJobsStatsAction .Response jobStatsResponse2 = client ().execute (GetJobsStatsAction .INSTANCE , jobStatsRequest2 ).actionGet ();
238
+ assertEquals (JobState .FAILED , jobStatsResponse2 .getResponse ().results ().get (0 ).getState ());
239
+ });
238
240
239
241
// It's impossible to reliably get the datafeed into a stopping state at the point when the ML node is removed from the cluster
240
242
// using externally accessible actions. The only way this situation could occur in reality is through extremely unfortunate
@@ -248,11 +250,13 @@ public void testCloseUnassignedFailedJobAndStopUnassignedStoppingDatafeed() thro
248
250
client ().execute (UpdatePersistentTaskStatusAction .INSTANCE , updatePersistentTaskStatusRequest ).actionGet ();
249
251
assertNotNull (updatePersistentTaskStatusResponse .getTask ());
250
252
251
- // Confirm the datafeed state is now stopping
252
- GetDatafeedsStatsAction .Request datafeedStatsRequest = new GetDatafeedsStatsAction .Request (datafeedId );
253
- GetDatafeedsStatsAction .Response datafeedStatsResponse =
254
- client ().execute (GetDatafeedsStatsAction .INSTANCE , datafeedStatsRequest ).actionGet ();
255
- assertEquals (DatafeedState .STOPPING , datafeedStatsResponse .getResponse ().results ().get (0 ).getDatafeedState ());
253
+ // Confirm the datafeed state is now stopping - this may take a while to update in cluster state
254
+ assertBusy (() -> {
255
+ GetDatafeedsStatsAction .Request datafeedStatsRequest = new GetDatafeedsStatsAction .Request (datafeedId );
256
+ GetDatafeedsStatsAction .Response datafeedStatsResponse =
257
+ client ().execute (GetDatafeedsStatsAction .INSTANCE , datafeedStatsRequest ).actionGet ();
258
+ assertEquals (DatafeedState .STOPPING , datafeedStatsResponse .getResponse ().results ().get (0 ).getDatafeedState ());
259
+ });
256
260
257
261
// Stop the node running the failed job/stopping datafeed
258
262
ensureGreen (); // replicas must be assigned, otherwise we could lose a whole index
@@ -265,10 +269,12 @@ public void testCloseUnassignedFailedJobAndStopUnassignedStoppingDatafeed() thro
265
269
StopDatafeedAction .Response stopDatafeedResponse = client ().execute (StopDatafeedAction .INSTANCE , stopDatafeedRequest ).actionGet ();
266
270
assertTrue (stopDatafeedResponse .isStopped ());
267
271
268
- // Confirm the datafeed state is now stopped
269
- datafeedStatsRequest = new GetDatafeedsStatsAction .Request (datafeedId );
270
- datafeedStatsResponse = client ().execute (GetDatafeedsStatsAction .INSTANCE , datafeedStatsRequest ).actionGet ();
271
- assertEquals (DatafeedState .STOPPED , datafeedStatsResponse .getResponse ().results ().get (0 ).getDatafeedState ());
272
+ // Confirm the datafeed state is now stopped - shouldn't need a busy check here as
273
+ // the stop endpoint shouldn't return until its effects are externally visible
274
+ GetDatafeedsStatsAction .Request datafeedStatsRequest2 = new GetDatafeedsStatsAction .Request (datafeedId );
275
+ GetDatafeedsStatsAction .Response datafeedStatsResponse2 =
276
+ client ().execute (GetDatafeedsStatsAction .INSTANCE , datafeedStatsRequest2 ).actionGet ();
277
+ assertEquals (DatafeedState .STOPPED , datafeedStatsResponse2 .getResponse ().results ().get (0 ).getDatafeedState ());
272
278
273
279
// We should be allowed to force stop the unassigned failed job
274
280
CloseJobAction .Request closeJobRequest = new CloseJobAction .Request (jobId );
0 commit comments