diff --git a/x-pack/plugin/ccr/qa/rest/src/test/java/org/elasticsearch/xpack/ccr/CcrRestIT.java b/x-pack/plugin/ccr/qa/rest/src/test/java/org/elasticsearch/xpack/ccr/CcrRestIT.java index 45998433d33a6..b0225fc5bf97e 100644 --- a/x-pack/plugin/ccr/qa/rest/src/test/java/org/elasticsearch/xpack/ccr/CcrRestIT.java +++ b/x-pack/plugin/ccr/qa/rest/src/test/java/org/elasticsearch/xpack/ccr/CcrRestIT.java @@ -12,6 +12,7 @@ import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.test.rest.yaml.ClientYamlTestCandidate; import org.elasticsearch.test.rest.yaml.ESClientYamlSuiteTestCase; +import org.elasticsearch.xpack.ccr.action.ShardChangesAction; import org.elasticsearch.xpack.test.rest.XPackRestTestHelper; import org.junit.After; @@ -36,7 +37,7 @@ protected Settings restClientSettings() { @After public void cleanup() throws Exception { - XPackRestTestHelper.waitForPendingTasks(adminClient()); + XPackRestTestHelper.waitForPendingTasks(adminClient(), taskName -> taskName.startsWith(ShardChangesAction.NAME)); } } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/test/rest/XPackRestTestHelper.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/test/rest/XPackRestTestHelper.java index 5e9fd4a386b64..da2002fd4b564 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/test/rest/XPackRestTestHelper.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/test/rest/XPackRestTestHelper.java @@ -14,6 +14,7 @@ import org.elasticsearch.client.Response; import org.elasticsearch.client.ResponseException; import org.elasticsearch.client.RestClient; +import org.elasticsearch.common.CheckedRunnable; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.test.ESTestCase; @@ -29,7 +30,9 @@ import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Predicate; +import static org.elasticsearch.test.ESTestCase.assertBusy; import static org.junit.Assert.assertEquals; public final class XPackRestTestHelper { @@ -84,34 +87,54 @@ public static void waitForMlTemplates(RestClient client) throws InterruptedExcep } /** - * Waits for pending tasks to complete + * Wait for outstanding tasks to complete. The specified admin client is used to check the outstanding tasks and this is done using + * {@link ESTestCase#assertBusy(CheckedRunnable)} to give a chance to any outstanding tasks to complete. + * + * @param adminClient the admin client + * @throws Exception if an exception is thrown while checking the outstanding tasks */ - public static void waitForPendingTasks(RestClient adminClient) throws Exception { - ESTestCase.assertBusy(() -> { + public static void waitForPendingTasks(final RestClient adminClient) throws Exception { + waitForPendingTasks(adminClient, taskName -> false); + } + + /** + * Wait for outstanding tasks to complete. The specified admin client is used to check the outstanding tasks and this is done using + * {@link ESTestCase#assertBusy(CheckedRunnable)} to give a chance to any outstanding tasks to complete. The specified filter is used + * to filter out outstanding tasks that are expected to be there. + * + * @param adminClient the admin client + * @param taskFilter predicate used to filter tasks that are expected to be there + * @throws Exception if an exception is thrown while checking the outstanding tasks + */ + public static void waitForPendingTasks(final RestClient adminClient, final Predicate taskFilter) throws Exception { + assertBusy(() -> { try { - Request request = new Request("GET", "/_cat/tasks"); + final Request request = new Request("GET", "/_cat/tasks"); request.addParameter("detailed", "true"); - Response response = adminClient.performRequest(request); - // Check to see if there are tasks still active. We exclude the - // list tasks - // actions tasks form this otherwise we will always fail + final Response response = adminClient.performRequest(request); + /* + * Check to see if there are outstanding tasks; we exclude the list task itself, and any expected outstanding tasks using + * the specified task filter. + */ if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) { try (BufferedReader responseReader = new BufferedReader( new InputStreamReader(response.getEntity().getContent(), StandardCharsets.UTF_8))) { int activeTasks = 0; String line; - StringBuilder tasksListString = new StringBuilder(); + final StringBuilder tasksListString = new StringBuilder(); while ((line = responseReader.readLine()) != null) { - if (line.startsWith(ListTasksAction.NAME) == false) { - activeTasks++; - tasksListString.append(line); - tasksListString.append('\n'); + final String taskName = line.split("\\s+")[0]; + if (taskName.startsWith(ListTasksAction.NAME) || taskFilter.test(taskName)) { + continue; } + activeTasks++; + tasksListString.append(line); + tasksListString.append('\n'); } assertEquals(activeTasks + " active tasks found:\n" + tasksListString, 0, activeTasks); } } - } catch (IOException e) { + } catch (final IOException e) { throw new AssertionError("Error getting active tasks list", e); } });