diff --git a/server/src/main/java/org/elasticsearch/tasks/TaskResultsService.java b/server/src/main/java/org/elasticsearch/tasks/TaskResultsService.java index 86f2e1f0deef0..661d09407cab8 100644 --- a/server/src/main/java/org/elasticsearch/tasks/TaskResultsService.java +++ b/server/src/main/java/org/elasticsearch/tasks/TaskResultsService.java @@ -55,8 +55,8 @@ import java.util.Iterator; import java.util.Map; -import static org.elasticsearch.common.unit.TimeValue.timeValueMillis; import static org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskAction.TASKS_ORIGIN; +import static org.elasticsearch.common.unit.TimeValue.timeValueMillis; /** * Service that can store task results. @@ -73,7 +73,7 @@ public class TaskResultsService { public static final String TASK_RESULT_MAPPING_VERSION_META_FIELD = "version"; - public static final int TASK_RESULT_MAPPING_VERSION = 2; + public static final int TASK_RESULT_MAPPING_VERSION = 3; /** * The backoff policy to use when saving a task result fails. The total wait diff --git a/server/src/main/resources/org/elasticsearch/tasks/task-index-mapping.json b/server/src/main/resources/org/elasticsearch/tasks/task-index-mapping.json index 435e6c5759cbb..b8ef2dcd56216 100644 --- a/server/src/main/resources/org/elasticsearch/tasks/task-index-mapping.json +++ b/server/src/main/resources/org/elasticsearch/tasks/task-index-mapping.json @@ -19,7 +19,7 @@ "id": { "type": "long" }, - "parent_id": { + "parent_task_id": { "type": "keyword" }, "node": { diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java index b010564c1b9b4..252b97a978669 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java @@ -138,14 +138,14 @@ public void testTaskCounts() { } public void testMasterNodeOperationTasks() { - registerTaskManageListeners(ClusterHealthAction.NAME); + registerTaskManagerListeners(ClusterHealthAction.NAME); // First run the health on the master node - should produce only one task on the master node internalCluster().masterClient().admin().cluster().prepareHealth().get(); assertEquals(1, numberOfEvents(ClusterHealthAction.NAME, Tuple::v1)); // counting only registration events assertEquals(1, numberOfEvents(ClusterHealthAction.NAME, event -> event.v1() == false)); // counting only unregistration events - resetTaskManageListeners(ClusterHealthAction.NAME); + resetTaskManagerListeners(ClusterHealthAction.NAME); // Now run the health on a non-master node - should produce one task on master and one task on another node internalCluster().nonMasterClient().admin().cluster().prepareHealth().get(); @@ -162,8 +162,8 @@ public void testMasterNodeOperationTasks() { } public void testTransportReplicationAllShardsTasks() { - registerTaskManageListeners(ValidateQueryAction.NAME); // main task - registerTaskManageListeners(ValidateQueryAction.NAME + "[s]"); // shard + registerTaskManagerListeners(ValidateQueryAction.NAME); // main task + registerTaskManagerListeners(ValidateQueryAction.NAME + "[s]"); // shard // level // tasks createIndex("test"); @@ -181,8 +181,8 @@ public void testTransportReplicationAllShardsTasks() { } public void testTransportBroadcastByNodeTasks() { - registerTaskManageListeners(UpgradeAction.NAME); // main task - registerTaskManageListeners(UpgradeAction.NAME + "[n]"); // node level tasks + registerTaskManagerListeners(UpgradeAction.NAME); // main task + registerTaskManagerListeners(UpgradeAction.NAME + "[n]"); // node level tasks createIndex("test"); ensureGreen("test"); // Make sure all shards are allocated client().admin().indices().prepareUpgrade("test").get(); @@ -197,8 +197,8 @@ public void testTransportBroadcastByNodeTasks() { } public void testTransportReplicationSingleShardTasks() { - registerTaskManageListeners(ValidateQueryAction.NAME); // main task - registerTaskManageListeners(ValidateQueryAction.NAME + "[s]"); // shard level tasks + registerTaskManagerListeners(ValidateQueryAction.NAME); // main task + registerTaskManagerListeners(ValidateQueryAction.NAME + "[s]"); // shard level tasks createIndex("test"); ensureGreen("test"); // Make sure all shards are allocated client().admin().indices().prepareValidateQuery("test").get(); @@ -213,9 +213,9 @@ public void testTransportReplicationSingleShardTasks() { public void testTransportBroadcastReplicationTasks() { - registerTaskManageListeners(RefreshAction.NAME); // main task - registerTaskManageListeners(RefreshAction.NAME + "[s]"); // shard level tasks - registerTaskManageListeners(RefreshAction.NAME + "[s][*]"); // primary and replica shard tasks + registerTaskManagerListeners(RefreshAction.NAME); // main task + registerTaskManagerListeners(RefreshAction.NAME + "[s]"); // shard level tasks + registerTaskManagerListeners(RefreshAction.NAME + "[s][*]"); // primary and replica shard tasks createIndex("test"); ensureGreen("test"); // Make sure all shards are allocated client().admin().indices().prepareRefresh("test").get(); @@ -287,10 +287,10 @@ public void testTransportBroadcastReplicationTasks() { } public void testTransportBulkTasks() { - registerTaskManageListeners(BulkAction.NAME); // main task - registerTaskManageListeners(BulkAction.NAME + "[s]"); // shard task - registerTaskManageListeners(BulkAction.NAME + "[s][p]"); // shard task on primary - registerTaskManageListeners(BulkAction.NAME + "[s][r]"); // shard task on replica + registerTaskManagerListeners(BulkAction.NAME); // main task + registerTaskManagerListeners(BulkAction.NAME + "[s]"); // shard task + registerTaskManagerListeners(BulkAction.NAME + "[s][p]"); // shard task on primary + registerTaskManagerListeners(BulkAction.NAME + "[s][r]"); // shard task on replica createIndex("test"); ensureGreen("test"); // Make sure all shards are allocated to catch replication tasks // ensures the mapping is available on all nodes so we won't retry the request (in case replicas don't have the right mapping). @@ -340,10 +340,9 @@ public void testTransportBulkTasks() { assertParentTask(findEvents(BulkAction.NAME + "[s][r]", Tuple::v1), shardTask); } - public void testSearchTaskDescriptions() { - registerTaskManageListeners(SearchAction.NAME); // main task - registerTaskManageListeners(SearchAction.NAME + "[*]"); // shard task + registerTaskManagerListeners(SearchAction.NAME); // main task + registerTaskManagerListeners(SearchAction.NAME + "[*]"); // shard task createIndex("test"); ensureGreen("test"); // Make sure all shards are allocated to catch replication tasks client().prepareIndex("test", "doc", "test_id").setSource("{\"foo\": \"bar\"}", XContentType.JSON) @@ -489,8 +488,9 @@ public void waitForTaskCompletion(Task task) { public void testTasksCancellation() throws Exception { // Start blocking test task // Get real client (the plugin is not registered on transport nodes) - ActionFuture future = new TestTaskPlugin.NodesRequestBuilder(client(), - TestTaskPlugin.TestTaskAction.INSTANCE).execute(); + TestTaskPlugin.NodesRequest request = new TestTaskPlugin.NodesRequest("test"); + ActionFuture future = client().execute(TestTaskPlugin.TestTaskAction.INSTANCE, request); + logger.info("--> started test tasks"); // Wait for the task to start on all nodes @@ -511,8 +511,8 @@ public void testTasksCancellation() throws Exception { public void testTasksUnblocking() throws Exception { // Start blocking test task - ActionFuture future = - new TestTaskPlugin.NodesRequestBuilder(client(), TestTaskPlugin.TestTaskAction.INSTANCE).execute(); + TestTaskPlugin.NodesRequest request = new TestTaskPlugin.NodesRequest("test"); + ActionFuture future = client().execute(TestTaskPlugin.TestTaskAction.INSTANCE, request); // Wait for the task to start on all nodes assertBusy(() -> assertEquals(internalCluster().size(), client().admin().cluster().prepareListTasks().setActions(TestTaskPlugin.TestTaskAction.NAME + "[n]").get().getTasks().size())); @@ -575,8 +575,9 @@ public void testGetTaskWaitForCompletionWithStoringResult() throws Exception { private void waitForCompletionTestCase(boolean storeResult, Function> wait, Consumer validator) throws Exception { // Start blocking test task - ActionFuture future = new TestTaskPlugin.NodesRequestBuilder(client(), - TestTaskPlugin.TestTaskAction.INSTANCE).setShouldStoreResult(storeResult).execute(); + TestTaskPlugin.NodesRequest request = new TestTaskPlugin.NodesRequest("test"); + request.setShouldStoreResult(storeResult); + ActionFuture future = client().execute(TestTaskPlugin.TestTaskAction.INSTANCE, request); ActionFuture waitResponseFuture; TaskId taskId; @@ -649,8 +650,8 @@ public void testGetTaskWaitForTimeout() throws Exception { */ private void waitForTimeoutTestCase(Function> wait) throws Exception { // Start blocking test task - ActionFuture future = new TestTaskPlugin.NodesRequestBuilder(client(), - TestTaskPlugin.TestTaskAction.INSTANCE).execute(); + TestTaskPlugin.NodesRequest request = new TestTaskPlugin.NodesRequest("test"); + ActionFuture future = client().execute(TestTaskPlugin.TestTaskAction.INSTANCE, request); try { TaskId taskId = waitForTestTaskStartOnAllNodes(); @@ -717,12 +718,17 @@ public void testTasksWaitForAllTask() throws Exception { assertThat(response.getTasks().size(), greaterThanOrEqualTo(1)); } - public void testTaskStoringSuccesfulResult() throws Exception { - registerTaskManageListeners(TestTaskPlugin.TestTaskAction.NAME); // we need this to get task id of the process + public void testTaskStoringSuccessfulResult() throws Exception { + registerTaskManagerListeners(TestTaskPlugin.TestTaskAction.NAME); // we need this to get task id of the process // Start non-blocking test task - new TestTaskPlugin.NodesRequestBuilder(client(), TestTaskPlugin.TestTaskAction.INSTANCE) - .setShouldStoreResult(true).setShouldBlock(false).get(); + TestTaskPlugin.NodesRequest request = new TestTaskPlugin.NodesRequest("test"); + request.setShouldStoreResult(true); + request.setShouldBlock(false); + TaskId parentTaskId = new TaskId("parent_node", randomLong()); + request.setParentTask(parentTaskId); + + client().execute(TestTaskPlugin.TestTaskAction.INSTANCE, request).get(); List events = findEvents(TestTaskPlugin.TestTaskAction.NAME, Tuple::v1); @@ -736,6 +742,7 @@ public void testTaskStoringSuccesfulResult() throws Exception { assertNull(taskResult.getError()); assertEquals(taskInfo.getTaskId(), taskResult.getTask().getTaskId()); + assertEquals(taskInfo.getParentTaskId(), taskResult.getTask().getParentTaskId()); assertEquals(taskInfo.getType(), taskResult.getTask().getType()); assertEquals(taskInfo.getAction(), taskResult.getTask().getAction()); assertEquals(taskInfo.getDescription(), taskResult.getTask().getDescription()); @@ -764,14 +771,16 @@ public void testTaskStoringSuccesfulResult() throws Exception { } public void testTaskStoringFailureResult() throws Exception { - registerTaskManageListeners(TestTaskPlugin.TestTaskAction.NAME); // we need this to get task id of the process + registerTaskManagerListeners(TestTaskPlugin.TestTaskAction.NAME); // we need this to get task id of the process + + TestTaskPlugin.NodesRequest request = new TestTaskPlugin.NodesRequest("test"); + request.setShouldFail(true); + request.setShouldStoreResult(true); + request.setShouldBlock(false); // Start non-blocking test task that should fail assertThrows( - new TestTaskPlugin.NodesRequestBuilder(client(), TestTaskPlugin.TestTaskAction.INSTANCE) - .setShouldFail(true) - .setShouldStoreResult(true) - .setShouldBlock(false), + client().execute(TestTaskPlugin.TestTaskAction.INSTANCE, request), IllegalStateException.class ); @@ -852,7 +861,7 @@ public void tearDown() throws Exception { /** * Registers recording task event listeners with the given action mask on all nodes */ - private void registerTaskManageListeners(String actionMasks) { + private void registerTaskManagerListeners(String actionMasks) { for (String nodeName : internalCluster().getNodeNames()) { DiscoveryNode node = internalCluster().getInstance(ClusterService.class, nodeName).localNode(); RecordingTaskManagerListener listener = new RecordingTaskManagerListener(node.getId(), actionMasks.split(",")); @@ -865,7 +874,7 @@ private void registerTaskManageListeners(String actionMasks) { /** * Resets all recording task event listeners with the given action mask on all nodes */ - private void resetTaskManageListeners(String actionMasks) { + private void resetTaskManagerListeners(String actionMasks) { for (Map.Entry, RecordingTaskManagerListener> entry : listeners.entrySet()) { if (actionMasks == null || entry.getKey().v2().equals(actionMasks)) { entry.getValue().reset(); @@ -919,11 +928,12 @@ private void assertParentTask(TaskInfo task, TaskInfo parentTask) { assertEquals(parentTask.getId(), task.getParentTaskId().getId()); } - private ResourceNotFoundException expectNotFound(ThrowingRunnable r) { + private void expectNotFound(ThrowingRunnable r) { Exception e = expectThrows(Exception.class, r); ResourceNotFoundException notFound = (ResourceNotFoundException) ExceptionsHelper.unwrap(e, ResourceNotFoundException.class); - if (notFound == null) throw new RuntimeException("Expected ResourceNotFoundException", e); - return notFound; + if (notFound == null) { + throw new AssertionError("Expected " + ResourceNotFoundException.class.getSimpleName(), e); + } } /** diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TestTaskPlugin.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TestTaskPlugin.java index 8797dd3962c82..5888e81d03ceb 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TestTaskPlugin.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TestTaskPlugin.java @@ -31,7 +31,6 @@ import org.elasticsearch.action.support.nodes.BaseNodeResponse; import org.elasticsearch.action.support.nodes.BaseNodesRequest; import org.elasticsearch.action.support.nodes.BaseNodesResponse; -import org.elasticsearch.action.support.nodes.NodesOperationRequestBuilder; import org.elasticsearch.action.support.nodes.TransportNodesAction; import org.elasticsearch.action.support.tasks.BaseTasksRequest; import org.elasticsearch.action.support.tasks.BaseTasksResponse; @@ -138,11 +137,11 @@ public NodeResponse(DiscoveryNode node) { public static class NodesResponse extends BaseNodesResponse implements ToXContentFragment { - public NodesResponse(StreamInput in) throws IOException { + NodesResponse(StreamInput in) throws IOException { super(in); } - public NodesResponse(ClusterName clusterName, List nodes, List failures) { + NodesResponse(ClusterName clusterName, List nodes, List failures) { super(clusterName, nodes, failures); } @@ -168,8 +167,8 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws } public static class NodeRequest extends BaseNodeRequest { - protected String requestName; - protected boolean shouldBlock; + protected final String requestName; + protected final boolean shouldBlock; public NodeRequest(StreamInput in) throws IOException { super(in); @@ -214,7 +213,7 @@ public static class NodesRequest extends BaseNodesRequest { shouldFail = in.readBoolean(); } - public NodesRequest(String requestName, String... nodesIds) { + NodesRequest(String requestName, String... nodesIds) { super(nodesIds); this.requestName = requestName; } @@ -330,37 +329,13 @@ private TestTaskAction() { } } - public static class NodesRequestBuilder extends NodesOperationRequestBuilder { - - protected NodesRequestBuilder(ElasticsearchClient client, ActionType action) { - super(client, action, new NodesRequest("test")); - } - - - public NodesRequestBuilder setShouldStoreResult(boolean shouldStoreResult) { - request().setShouldStoreResult(shouldStoreResult); - return this; - } - - public NodesRequestBuilder setShouldBlock(boolean shouldBlock) { - request().setShouldBlock(shouldBlock); - return this; - } - - public NodesRequestBuilder setShouldFail(boolean shouldFail) { - request().setShouldFail(shouldFail); - return this; - } - } - - public static class UnblockTestTaskResponse implements Writeable { - public UnblockTestTaskResponse() { + UnblockTestTaskResponse() { } - public UnblockTestTaskResponse(StreamInput in) { + UnblockTestTaskResponse(StreamInput in) { } @Override @@ -387,13 +362,13 @@ public static class UnblockTestTasksResponse extends BaseTasksResponse { private List tasks; - public UnblockTestTasksResponse(List tasks, List taskFailures, List tasks, List taskFailures, List nodeFailures) { super(taskFailures, nodeFailures); this.tasks = tasks == null ? Collections.emptyList() : List.copyOf(tasks); } - public UnblockTestTasksResponse(StreamInput in) throws IOException { + UnblockTestTasksResponse(StreamInput in) throws IOException { super(in); int taskCount = in.readVInt(); List builder = new ArrayList<>();