Skip to content

Commit c4ed1cd

Browse files
author
Christoph Büscher
authored
Rest High Level client: Add List Tasks (#29546)
This change adds a `listTasks` method to the high level java ClusterClient which allows listing running tasks through the task management API. Related to #27205
1 parent 554fad0 commit c4ed1cd

File tree

19 files changed

+806
-128
lines changed

19 files changed

+806
-128
lines changed

client/rest-high-level/src/main/java/org/elasticsearch/client/ClusterClient.java

+24
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121

2222
import org.apache.http.Header;
2323
import org.elasticsearch.action.ActionListener;
24+
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
25+
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
2426
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
2527
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
2628

@@ -63,4 +65,26 @@ public void putSettingsAsync(ClusterUpdateSettingsRequest clusterUpdateSettingsR
6365
restHighLevelClient.performRequestAsyncAndParseEntity(clusterUpdateSettingsRequest, RequestConverters::clusterPutSettings,
6466
ClusterUpdateSettingsResponse::fromXContent, listener, emptySet(), headers);
6567
}
68+
69+
/**
70+
* Get current tasks using the Task Management API
71+
* <p>
72+
* See
73+
* <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/tasks.html"> Task Management API on elastic.co</a>
74+
*/
75+
public ListTasksResponse listTasks(ListTasksRequest request, Header... headers) throws IOException {
76+
return restHighLevelClient.performRequestAndParseEntity(request, RequestConverters::listTasks, ListTasksResponse::fromXContent,
77+
emptySet(), headers);
78+
}
79+
80+
/**
81+
* Asynchronously get current tasks using the Task Management API
82+
* <p>
83+
* See
84+
* <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/tasks.html"> Task Management API on elastic.co</a>
85+
*/
86+
public void listTasksAsync(ListTasksRequest request, ActionListener<ListTasksResponse> listener, Header... headers) {
87+
restHighLevelClient.performRequestAsyncAndParseEntity(request, RequestConverters::listTasks, ListTasksResponse::fromXContent,
88+
listener, emptySet(), headers);
89+
}
6690
}

client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java

+60
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.apache.http.entity.ContentType;
3030
import org.apache.lucene.util.BytesRef;
3131
import org.elasticsearch.action.DocWriteRequest;
32+
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
3233
import org.elasticsearch.action.admin.cluster.repositories.get.GetRepositoriesRequest;
3334
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
3435
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
@@ -79,6 +80,7 @@
7980
import org.elasticsearch.index.rankeval.RankEvalRequest;
8081
import org.elasticsearch.rest.action.search.RestSearchAction;
8182
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
83+
import org.elasticsearch.tasks.TaskId;
8284

8385
import java.io.ByteArrayOutputStream;
8486
import java.io.IOException;
@@ -580,6 +582,22 @@ static Request clusterPutSettings(ClusterUpdateSettingsRequest clusterUpdateSett
580582
return request;
581583
}
582584

585+
static Request listTasks(ListTasksRequest listTaskRequest) {
586+
if (listTaskRequest.getTaskId() != null && listTaskRequest.getTaskId().isSet()) {
587+
throw new IllegalArgumentException("TaskId cannot be used for list tasks request");
588+
}
589+
Request request = new Request(HttpGet.METHOD_NAME, "/_tasks");
590+
Params params = new Params(request);
591+
params.withTimeout(listTaskRequest.getTimeout())
592+
.withDetailed(listTaskRequest.getDetailed())
593+
.withWaitForCompletion(listTaskRequest.getWaitForCompletion())
594+
.withParentTaskId(listTaskRequest.getParentTaskId())
595+
.withNodes(listTaskRequest.getNodes())
596+
.withActions(listTaskRequest.getActions())
597+
.putParam("group_by", "none");
598+
return request;
599+
}
600+
583601
static Request rollover(RolloverRequest rolloverRequest) throws IOException {
584602
String endpoint = new EndpointBuilder().addPathPart(rolloverRequest.getAlias()).addPathPartAsIs("_rollover")
585603
.addPathPart(rolloverRequest.getNewIndexName()).build();
@@ -880,6 +898,48 @@ Params withPreserveExisting(boolean preserveExisting) {
880898
}
881899
return this;
882900
}
901+
902+
Params withDetailed(boolean detailed) {
903+
if (detailed) {
904+
return putParam("detailed", Boolean.TRUE.toString());
905+
}
906+
return this;
907+
}
908+
909+
Params withWaitForCompletion(boolean waitForCompletion) {
910+
if (waitForCompletion) {
911+
return putParam("wait_for_completion", Boolean.TRUE.toString());
912+
}
913+
return this;
914+
}
915+
916+
Params withNodes(String[] nodes) {
917+
if (nodes != null && nodes.length > 0) {
918+
return putParam("nodes", String.join(",", nodes));
919+
}
920+
return this;
921+
}
922+
923+
Params withActions(String[] actions) {
924+
if (actions != null && actions.length > 0) {
925+
return putParam("actions", String.join(",", actions));
926+
}
927+
return this;
928+
}
929+
930+
Params withParentTaskId(TaskId parentTaskId) {
931+
if (parentTaskId != null && parentTaskId.isSet()) {
932+
return putParam("parent_task_id", parentTaskId.toString());
933+
}
934+
return this;
935+
}
936+
937+
Params withVerify(boolean verify) {
938+
if (verify) {
939+
return putParam("verify", Boolean.TRUE.toString());
940+
}
941+
return this;
942+
}
883943
}
884944

885945
/**

client/rest-high-level/src/test/java/org/elasticsearch/client/ClusterClientIT.java

+31
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@
2020
package org.elasticsearch.client;
2121

2222
import org.elasticsearch.ElasticsearchException;
23+
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
24+
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
25+
import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskGroup;
2326
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
2427
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
2528
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
@@ -29,13 +32,16 @@
2932
import org.elasticsearch.common.xcontent.support.XContentMapValues;
3033
import org.elasticsearch.indices.recovery.RecoverySettings;
3134
import org.elasticsearch.rest.RestStatus;
35+
import org.elasticsearch.tasks.TaskInfo;
3236

3337
import java.io.IOException;
3438
import java.util.HashMap;
3539
import java.util.Map;
3640

41+
import static java.util.Collections.emptyList;
3742
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
3843
import static org.hamcrest.Matchers.equalTo;
44+
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
3945
import static org.hamcrest.Matchers.notNullValue;
4046
import static org.hamcrest.Matchers.nullValue;
4147

@@ -105,4 +111,29 @@ public void testClusterUpdateSettingNonExistent() {
105111
assertThat(exception.getMessage(), equalTo(
106112
"Elasticsearch exception [type=illegal_argument_exception, reason=transient setting [" + setting + "], not recognized]"));
107113
}
114+
115+
public void testListTasks() throws IOException {
116+
ListTasksRequest request = new ListTasksRequest();
117+
ListTasksResponse response = execute(request, highLevelClient().cluster()::listTasks, highLevelClient().cluster()::listTasksAsync);
118+
119+
assertThat(response, notNullValue());
120+
assertThat(response.getNodeFailures(), equalTo(emptyList()));
121+
assertThat(response.getTaskFailures(), equalTo(emptyList()));
122+
// It's possible that there are other tasks except 'cluster:monitor/tasks/lists[n]' and 'action":"cluster:monitor/tasks/lists'
123+
assertThat(response.getTasks().size(), greaterThanOrEqualTo(2));
124+
boolean listTasksFound = false;
125+
for (TaskGroup taskGroup : response.getTaskGroups()) {
126+
TaskInfo parent = taskGroup.getTaskInfo();
127+
if ("cluster:monitor/tasks/lists".equals(parent.getAction())) {
128+
assertThat(taskGroup.getChildTasks().size(), equalTo(1));
129+
TaskGroup childGroup = taskGroup.getChildTasks().iterator().next();
130+
assertThat(childGroup.getChildTasks().isEmpty(), equalTo(true));
131+
TaskInfo child = childGroup.getTaskInfo();
132+
assertThat(child.getAction(), equalTo("cluster:monitor/tasks/lists[n]"));
133+
assertThat(child.getParentTaskId(), equalTo(parent.getTaskId()));
134+
listTasksFound = true;
135+
}
136+
}
137+
assertTrue("List tasks were not found", listTasksFound);
138+
}
108139
}

0 commit comments

Comments
 (0)