Skip to content

Commit 59f53dd

Browse files
committed
Fixing HLRC parsing of CancelTasks response (elastic#45414)
1 parent 529ebea commit 59f53dd

17 files changed

+1284
-46
lines changed

client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
import org.elasticsearch.client.core.TermVectorsRequest;
5454
import org.elasticsearch.client.indices.AnalyzeRequest;
5555
import org.elasticsearch.client.security.RefreshPolicy;
56+
import org.elasticsearch.client.tasks.TaskId;
5657
import org.elasticsearch.cluster.health.ClusterHealthStatus;
5758
import org.elasticsearch.common.Nullable;
5859
import org.elasticsearch.common.Priority;
@@ -80,7 +81,6 @@
8081
import org.elasticsearch.script.mustache.MultiSearchTemplateRequest;
8182
import org.elasticsearch.script.mustache.SearchTemplateRequest;
8283
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
83-
import org.elasticsearch.tasks.TaskId;
8484

8585
import java.io.ByteArrayOutputStream;
8686
import java.io.IOException;
@@ -1042,6 +1042,20 @@ Params withActions(String[] actions) {
10421042
return this;
10431043
}
10441044

1045+
Params withTaskId(org.elasticsearch.tasks.TaskId taskId) {
1046+
if (taskId != null && taskId.isSet()) {
1047+
return putParam("task_id", taskId.toString());
1048+
}
1049+
return this;
1050+
}
1051+
1052+
Params withParentTaskId(org.elasticsearch.tasks.TaskId parentTaskId) {
1053+
if (parentTaskId != null && parentTaskId.isSet()) {
1054+
return putParam("parent_task_id", parentTaskId.toString());
1055+
}
1056+
return this;
1057+
}
1058+
10451059
Params withTaskId(TaskId taskId) {
10461060
if (taskId != null && taskId.isSet()) {
10471061
return putParam("task_id", taskId.toString());

client/rest-high-level/src/main/java/org/elasticsearch/client/TasksClient.java

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,10 @@
2020
package org.elasticsearch.client;
2121

2222
import org.elasticsearch.action.ActionListener;
23-
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
24-
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse;
2523
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
2624
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
25+
import org.elasticsearch.client.tasks.CancelTasksRequest;
26+
import org.elasticsearch.client.tasks.CancelTasksResponse;
2727
import org.elasticsearch.client.tasks.GetTaskRequest;
2828
import org.elasticsearch.client.tasks.GetTaskResponse;
2929

@@ -102,17 +102,7 @@ public Cancellable getAsync(GetTaskRequest request, RequestOptions options,
102102
GetTaskResponse::fromXContent, listener);
103103
}
104104

105-
/**
106-
* Cancel one or more cluster tasks using the Task Management API.
107-
*
108-
* See
109-
* <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/tasks.html"> Task Management API on elastic.co</a>
110-
* @param cancelTasksRequest the request
111-
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
112-
* @return the response
113-
* @throws IOException in case there is a problem sending the request or parsing back the response
114-
*
115-
*/
105+
116106
public CancelTasksResponse cancel(CancelTasksRequest cancelTasksRequest, RequestOptions options ) throws IOException {
117107
return restHighLevelClient.performRequestAndParseEntity(
118108
cancelTasksRequest,
@@ -135,6 +125,7 @@ public CancelTasksResponse cancel(CancelTasksRequest cancelTasksRequest, Request
135125
*/
136126
public Cancellable cancelAsync(CancelTasksRequest cancelTasksRequest, RequestOptions options,
137127
ActionListener<CancelTasksResponse> listener) {
128+
138129
return restHighLevelClient.performRequestAsyncAndParseEntity(
139130
cancelTasksRequest,
140131
TasksRequestConverters::cancelTasks,

client/rest-high-level/src/main/java/org/elasticsearch/client/TasksRequestConverters.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,9 @@
2121

2222
import org.apache.http.client.methods.HttpGet;
2323
import org.apache.http.client.methods.HttpPost;
24-
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
2524
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
2625
import org.elasticsearch.client.RequestConverters.EndpointBuilder;
26+
import org.elasticsearch.client.tasks.CancelTasksRequest;
2727
import org.elasticsearch.client.tasks.GetTaskRequest;
2828

2929
final class TasksRequestConverters {
@@ -70,5 +70,5 @@ static Request getTask(GetTaskRequest getTaskRequest) {
7070
request.addParameters(params.asMap());
7171
return request;
7272
}
73-
73+
7474
}
Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.elasticsearch.client.tasks;
20+
21+
import org.elasticsearch.client.Validatable;
22+
import org.elasticsearch.common.unit.TimeValue;
23+
24+
import java.util.Arrays;
25+
import java.util.Objects;
26+
27+
public class CancelTasksRequest implements Validatable {
28+
29+
public static final String[] EMPTY_ARRAY = new String[0];
30+
public static final String[] ALL_ACTIONS = EMPTY_ARRAY;
31+
public static final String[] ALL_NODES = EMPTY_ARRAY;
32+
private String[] nodes = ALL_NODES;
33+
private TimeValue timeout;
34+
private String[] actions = ALL_ACTIONS;
35+
private TaskId parentTaskId = TaskId.EMPTY_TASK_ID;
36+
private TaskId taskId = TaskId.EMPTY_TASK_ID;
37+
private String reason = "";
38+
39+
public final CancelTasksRequest setNodes(String... nodes) {
40+
this.nodes = nodes;
41+
return this;
42+
}
43+
44+
public String[] getNodes() {
45+
return nodes;
46+
}
47+
48+
public CancelTasksRequest setTimeout(TimeValue timeout) {
49+
this.timeout = timeout;
50+
return this;
51+
}
52+
53+
public final CancelTasksRequest setTimeout(String timeout) {
54+
this.timeout = TimeValue.parseTimeValue(timeout, null, getClass().getSimpleName() + ".timeout");
55+
return this;
56+
}
57+
58+
public TimeValue getTimeout() {
59+
return timeout;
60+
}
61+
62+
public CancelTasksRequest setActions(String... actions) {
63+
this.actions = actions;
64+
return this;
65+
}
66+
67+
public String[] getActions() {
68+
return actions;
69+
}
70+
71+
public CancelTasksRequest setParentTaskId(TaskId parentTaskId) {
72+
this.parentTaskId = parentTaskId;
73+
return this;
74+
}
75+
76+
public TaskId getParentTaskId() {
77+
return parentTaskId;
78+
}
79+
80+
public CancelTasksRequest setTaskId(TaskId taskId) {
81+
this.taskId = taskId;
82+
return this;
83+
}
84+
85+
public TaskId getTaskId() {
86+
return taskId;
87+
}
88+
89+
90+
public String getReason() {
91+
return reason;
92+
}
93+
94+
public void setReason(String reason) {
95+
this.reason = reason;
96+
}
97+
98+
@Override
99+
public boolean equals(Object o) {
100+
if (this == o) return true;
101+
if (!(o instanceof CancelTasksRequest)) return false;
102+
CancelTasksRequest that = (CancelTasksRequest) o;
103+
return Arrays.equals(getNodes(), that.getNodes()) &&
104+
Objects.equals(getTimeout(), that.getTimeout()) &&
105+
Arrays.equals(getActions(), that.getActions()) &&
106+
Objects.equals(getParentTaskId(), that.getParentTaskId()) &&
107+
Objects.equals(getTaskId(), that.getTaskId()) &&
108+
Objects.equals(getReason(), that.getReason());
109+
}
110+
111+
@Override
112+
public int hashCode() {
113+
int result = Objects.hash(getTimeout(), getParentTaskId(), getTaskId(), getReason());
114+
result = 31 * result + Arrays.hashCode(getNodes());
115+
result = 31 * result + Arrays.hashCode(getActions());
116+
return result;
117+
}
118+
119+
@Override
120+
public String toString() {
121+
return "CancelTasksRequest{" +
122+
"nodes=" + Arrays.toString(nodes) +
123+
", timeout=" + timeout +
124+
", actions=" + Arrays.toString(actions) +
125+
", parentTaskId=" + parentTaskId +
126+
", taskId=" + taskId +
127+
", reason='" + reason + '\'' +
128+
'}';
129+
}
130+
}
Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.elasticsearch.client.tasks;
20+
21+
import java.io.IOException;
22+
import java.util.ArrayList;
23+
import java.util.Collections;
24+
import java.util.HashMap;
25+
import java.util.List;
26+
import java.util.Map;
27+
import java.util.Objects;
28+
import java.util.stream.Collectors;
29+
30+
import org.elasticsearch.common.ParseField;
31+
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
32+
import org.elasticsearch.common.xcontent.XContentParser;
33+
34+
import static java.util.stream.Collectors.groupingBy;
35+
import static java.util.stream.Collectors.toList;
36+
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;
37+
38+
public class CancelTasksResponse {
39+
40+
private final NodesInfoData nodesInfoData;
41+
private final List<TaskOperationFailure> taskFailures = new ArrayList<>();
42+
private final List<ElasticsearchException> nodeFailures = new ArrayList<>();
43+
private final List<TaskInfo> tasks = new ArrayList<>();
44+
private final List<TaskGroup> taskGroups = new ArrayList<>();
45+
46+
public CancelTasksResponse(NodesInfoData nodesInfoData,
47+
List<TaskOperationFailure> taskFailures,
48+
List<ElasticsearchException> nodeFailures) {
49+
this.nodesInfoData = nodesInfoData;
50+
if (taskFailures!= null){
51+
this.taskFailures.addAll(taskFailures);
52+
}
53+
if (nodeFailures!=null) {
54+
this.nodeFailures.addAll(nodeFailures);
55+
}
56+
this.tasks.addAll(nodesInfoData
57+
.getNodesInfoData()
58+
.stream()
59+
.flatMap(nodeData -> nodeData.getTasks().stream())
60+
.collect(toList())
61+
);
62+
63+
this.taskGroups.addAll(buildTaskGroups());
64+
}
65+
66+
private List<TaskGroup> buildTaskGroups() {
67+
Map<TaskId, TaskGroup.Builder> taskGroups = new HashMap<>();
68+
List<TaskGroup.Builder> topLevelTasks = new ArrayList<>();
69+
// First populate all tasks
70+
for (TaskInfo taskInfo : this.tasks) {
71+
taskGroups.put(taskInfo.getTaskId(), TaskGroup.builder(taskInfo));
72+
}
73+
74+
// Now go through all task group builders and add children to their parents
75+
for (TaskGroup.Builder taskGroup : taskGroups.values()) {
76+
TaskId parentTaskId = taskGroup.getTaskInfo().getParentTaskId();
77+
if (parentTaskId.isSet()) {
78+
TaskGroup.Builder parentTask = taskGroups.get(parentTaskId);
79+
if (parentTask != null) {
80+
// we found parent in the list of tasks - add it to the parent list
81+
parentTask.addGroup(taskGroup);
82+
} else {
83+
// we got zombie or the parent was filtered out - add it to the top task list
84+
topLevelTasks.add(taskGroup);
85+
}
86+
} else {
87+
// top level task - add it to the top task list
88+
topLevelTasks.add(taskGroup);
89+
}
90+
}
91+
return Collections.unmodifiableList(topLevelTasks.stream().map(TaskGroup.Builder::build).collect(Collectors.toList()));
92+
}
93+
94+
public NodesInfoData getNodesInfoData() {
95+
return nodesInfoData;
96+
}
97+
98+
public List<TaskInfo> getTasks() {
99+
return tasks;
100+
}
101+
102+
public Map<String, List<TaskInfo>> getPerNodeTasks() {
103+
return getTasks()
104+
.stream()
105+
.collect(groupingBy(TaskInfo::getNodeId));
106+
}
107+
108+
public List<TaskOperationFailure> getTaskFailures() {
109+
return taskFailures;
110+
}
111+
112+
public List<ElasticsearchException> getNodeFailures() {
113+
return nodeFailures;
114+
}
115+
116+
public List<TaskGroup> getTaskGroups() {
117+
return taskGroups;
118+
}
119+
120+
@Override
121+
public boolean equals(Object o) {
122+
if (this == o) return true;
123+
if (!(o instanceof CancelTasksResponse)) return false;
124+
CancelTasksResponse response = (CancelTasksResponse) o;
125+
return getNodesInfoData().equals(response.getNodesInfoData()) &&
126+
Objects.equals(getTaskFailures(), response.getTaskFailures()) &&
127+
Objects.equals(getNodeFailures(), response.getNodeFailures()) &&
128+
Objects.equals(getTasks(), response.getTasks()) &&
129+
Objects.equals(getTaskGroups(), response.getTaskGroups());
130+
}
131+
132+
@Override
133+
public int hashCode() {
134+
return Objects.hash(getNodesInfoData(), getTaskFailures(), getNodeFailures(), getTasks(), getTaskGroups());
135+
}
136+
137+
public static CancelTasksResponse fromXContent(final XContentParser parser) throws IOException {
138+
return PARSER.parse(parser, null);
139+
}
140+
141+
@Override
142+
public String toString() {
143+
return "CancelTasksResponse{" +
144+
"nodesInfoData=" + nodesInfoData +
145+
", taskFailures=" + taskFailures +
146+
", nodeFailures=" + nodeFailures +
147+
", tasks=" + tasks +
148+
", taskGroups=" + taskGroups +
149+
'}';
150+
}
151+
152+
private static ConstructingObjectParser<CancelTasksResponse, Void> PARSER;
153+
154+
static {
155+
ConstructingObjectParser<CancelTasksResponse, Void> parser = new ConstructingObjectParser<>("cancel_tasks_response", true,
156+
constructingObjects -> {
157+
int i = 0;
158+
@SuppressWarnings("unchecked")
159+
List<TaskOperationFailure> tasksFailures = (List<TaskOperationFailure>) constructingObjects[i++];
160+
@SuppressWarnings("unchecked")
161+
List<ElasticsearchException> nodeFailures = (List<ElasticsearchException>) constructingObjects[i++];
162+
NodesInfoData nodesInfoData = (NodesInfoData) constructingObjects[i];
163+
return new CancelTasksResponse(nodesInfoData, tasksFailures, nodeFailures);
164+
});
165+
166+
parser.declareObjectArray(optionalConstructorArg(), (p, c) ->
167+
TaskOperationFailure.fromXContent(p), new ParseField("task_failures"));
168+
parser.declareObjectArray(optionalConstructorArg(), (p, c) ->
169+
ElasticsearchException.fromXContent(p), new ParseField("node_failures"));
170+
parser.declareObject(optionalConstructorArg(), NodesInfoData.PARSER, new ParseField("nodes"));
171+
PARSER = parser;
172+
}
173+
}

0 commit comments

Comments
 (0)