Skip to content

Commit 11935cd

Browse files
authored
Replace Streamable w/ Writeable in BaseTasksResponse and subclasses (elastic#36176)
This commit replaces usages of Streamable with Writeable for the BaseTasksResponse / TransportTasksAction classes and subclasses of these classes. Note that where possible response fields were made final. Relates to elastic#34389
1 parent 59b0900 commit 11935cd

File tree

55 files changed

+362
-541
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

55 files changed

+362
-541
lines changed

modules/reindex/src/main/java/org/elasticsearch/index/reindex/RethrottleAction.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import org.elasticsearch.action.Action;
2323
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
24+
import org.elasticsearch.common.io.stream.Writeable;
2425

2526
public class RethrottleAction extends Action<ListTasksResponse> {
2627
public static final RethrottleAction INSTANCE = new RethrottleAction();
@@ -32,6 +33,11 @@ private RethrottleAction() {
3233

3334
@Override
3435
public ListTasksResponse newResponse() {
35-
return new ListTasksResponse();
36+
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
37+
}
38+
39+
@Override
40+
public Writeable.Reader<ListTasksResponse> getResponseReader() {
41+
return ListTasksResponse::new;
3642
}
3743
}

modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportRethrottleAction.java

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -29,13 +29,11 @@
2929
import org.elasticsearch.client.Client;
3030
import org.elasticsearch.cluster.service.ClusterService;
3131
import org.elasticsearch.common.inject.Inject;
32-
import org.elasticsearch.common.io.stream.StreamInput;
3332
import org.elasticsearch.tasks.TaskId;
3433
import org.elasticsearch.tasks.TaskInfo;
3534
import org.elasticsearch.threadpool.ThreadPool;
3635
import org.elasticsearch.transport.TransportService;
3736

38-
import java.io.IOException;
3937
import java.util.List;
4038

4139
public class TransportRethrottleAction extends TransportTasksAction<BulkByScrollTask, RethrottleRequest, ListTasksResponse, TaskInfo> {
@@ -45,7 +43,7 @@ public class TransportRethrottleAction extends TransportTasksAction<BulkByScroll
4543
public TransportRethrottleAction(ClusterService clusterService, TransportService transportService,
4644
ActionFilters actionFilters, Client client) {
4745
super(RethrottleAction.NAME, clusterService, transportService, actionFilters,
48-
RethrottleRequest::new, ListTasksResponse::new, ThreadPool.Names.MANAGEMENT);
46+
RethrottleRequest::new, ListTasksResponse::new, TaskInfo::new, ThreadPool.Names.MANAGEMENT);
4947
this.client = client;
5048
}
5149

@@ -101,11 +99,6 @@ private static void rethrottleChildTask(Logger logger, String localNodeId, BulkB
10199
listener.onResponse(task.taskInfo(localNodeId, true));
102100
}
103101

104-
@Override
105-
protected TaskInfo readTaskResponse(StreamInput in) throws IOException {
106-
return new TaskInfo(in);
107-
}
108-
109102
@Override
110103
protected ListTasksResponse newResponse(RethrottleRequest request, List<TaskInfo> tasks,
111104
List<TaskOperationFailure> taskOperationFailures, List<FailedNodeException> failedNodeExceptions) {

server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/cancel/CancelTasksAction.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
package org.elasticsearch.action.admin.cluster.node.tasks.cancel;
2121

2222
import org.elasticsearch.action.Action;
23+
import org.elasticsearch.common.io.stream.Writeable;
2324

2425
/**
2526
* Action for cancelling running tasks
@@ -35,6 +36,11 @@ private CancelTasksAction() {
3536

3637
@Override
3738
public CancelTasksResponse newResponse() {
38-
return new CancelTasksResponse();
39+
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
40+
}
41+
42+
@Override
43+
public Writeable.Reader<CancelTasksResponse> getResponseReader() {
44+
return CancelTasksResponse::new;
3945
}
4046
}

server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/cancel/CancelTasksResponse.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.elasticsearch.action.TaskOperationFailure;
2424
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
2525
import org.elasticsearch.common.Strings;
26+
import org.elasticsearch.common.io.stream.StreamInput;
2627
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
2728
import org.elasticsearch.common.xcontent.XContentBuilder;
2829
import org.elasticsearch.common.xcontent.XContentParser;
@@ -40,7 +41,8 @@ public class CancelTasksResponse extends ListTasksResponse {
4041
private static final ConstructingObjectParser<CancelTasksResponse, Void> PARSER =
4142
setupParser("cancel_tasks_response", CancelTasksResponse::new);
4243

43-
public CancelTasksResponse() {
44+
public CancelTasksResponse(StreamInput in) throws IOException {
45+
super(in);
4446
}
4547

4648
public CancelTasksResponse(List<TaskInfo> tasks, List<TaskOperationFailure> taskFailures, List<? extends ElasticsearchException>

server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/cancel/TransportCancelTasksAction.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ public class TransportCancelTasksAction extends TransportTasksAction<Cancellable
6464
@Inject
6565
public TransportCancelTasksAction(ClusterService clusterService, TransportService transportService, ActionFilters actionFilters) {
6666
super(CancelTasksAction.NAME, clusterService, transportService, actionFilters,
67-
CancelTasksRequest::new, CancelTasksResponse::new, ThreadPool.Names.MANAGEMENT);
67+
CancelTasksRequest::new, CancelTasksResponse::new, TaskInfo::new, ThreadPool.Names.MANAGEMENT);
6868
transportService.registerRequestHandler(BAN_PARENT_ACTION_NAME, ThreadPool.Names.SAME, BanParentTaskRequest::new,
6969
new BanParentRequestHandler());
7070
}
@@ -75,11 +75,6 @@ protected CancelTasksResponse newResponse(CancelTasksRequest request, List<TaskI
7575
return new CancelTasksResponse(tasks, taskOperationFailures, failedNodeExceptions);
7676
}
7777

78-
@Override
79-
protected TaskInfo readTaskResponse(StreamInput in) throws IOException {
80-
return new TaskInfo(in);
81-
}
82-
8378
protected void processTasks(CancelTasksRequest request, Consumer<CancellableTask> operation) {
8479
if (request.getTaskId().isSet()) {
8580
// we are only checking one task, we can optimize it

server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/list/ListTasksAction.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
package org.elasticsearch.action.admin.cluster.node.tasks.list;
2121

2222
import org.elasticsearch.action.Action;
23+
import org.elasticsearch.common.io.stream.Writeable;
2324

2425
/**
2526
* Action for retrieving a list of currently running tasks
@@ -35,6 +36,11 @@ private ListTasksAction() {
3536

3637
@Override
3738
public ListTasksResponse newResponse() {
38-
return new ListTasksResponse();
39+
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
40+
}
41+
42+
@Override
43+
public Writeable.Reader<ListTasksResponse> getResponseReader() {
44+
return ListTasksResponse::new;
3945
}
4046
}

server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/list/ListTasksResponse.java

Lines changed: 11 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -52,22 +52,28 @@
5252
public class ListTasksResponse extends BaseTasksResponse implements ToXContentObject {
5353
private static final String TASKS = "tasks";
5454

55-
private List<TaskInfo> tasks;
55+
private final List<TaskInfo> tasks;
5656

5757
private Map<String, List<TaskInfo>> perNodeTasks;
5858

5959
private List<TaskGroup> groups;
6060

61-
public ListTasksResponse() {
62-
this(null, null, null);
63-
}
64-
6561
public ListTasksResponse(List<TaskInfo> tasks, List<TaskOperationFailure> taskFailures,
6662
List<? extends ElasticsearchException> nodeFailures) {
6763
super(taskFailures, nodeFailures);
6864
this.tasks = tasks == null ? Collections.emptyList() : Collections.unmodifiableList(new ArrayList<>(tasks));
6965
}
7066

67+
public ListTasksResponse(StreamInput in) throws IOException {
68+
super(in);
69+
tasks = Collections.unmodifiableList(in.readList(TaskInfo::new));
70+
}
71+
72+
@Override
73+
public void writeTo(StreamOutput out) throws IOException {
74+
super.writeTo(out);
75+
out.writeList(tasks);
76+
}
7177

7278
protected static <T> ConstructingObjectParser<T, Void> setupParser(String name,
7379
TriFunction<
@@ -96,18 +102,6 @@ protected static <T> ConstructingObjectParser<T, Void> setupParser(String name,
96102
private static final ConstructingObjectParser<ListTasksResponse, Void> PARSER =
97103
setupParser("list_tasks_response", ListTasksResponse::new);
98104

99-
@Override
100-
public void readFrom(StreamInput in) throws IOException {
101-
super.readFrom(in);
102-
tasks = Collections.unmodifiableList(in.readList(TaskInfo::new));
103-
}
104-
105-
@Override
106-
public void writeTo(StreamOutput out) throws IOException {
107-
super.writeTo(out);
108-
out.writeList(tasks);
109-
}
110-
111105
/**
112106
* Returns the list of tasks by node
113107
*/

server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/list/TransportListTasksAction.java

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,12 @@
2626
import org.elasticsearch.action.support.tasks.TransportTasksAction;
2727
import org.elasticsearch.cluster.service.ClusterService;
2828
import org.elasticsearch.common.inject.Inject;
29-
import org.elasticsearch.common.io.stream.StreamInput;
3029
import org.elasticsearch.common.unit.TimeValue;
3130
import org.elasticsearch.tasks.Task;
3231
import org.elasticsearch.tasks.TaskInfo;
3332
import org.elasticsearch.threadpool.ThreadPool;
3433
import org.elasticsearch.transport.TransportService;
3534

36-
import java.io.IOException;
3735
import java.util.List;
3836
import java.util.function.Consumer;
3937

@@ -52,7 +50,7 @@ public static long waitForCompletionTimeout(TimeValue timeout) {
5250
@Inject
5351
public TransportListTasksAction(ClusterService clusterService, TransportService transportService, ActionFilters actionFilters) {
5452
super(ListTasksAction.NAME, clusterService, transportService, actionFilters,
55-
ListTasksRequest::new, ListTasksResponse::new, ThreadPool.Names.MANAGEMENT);
53+
ListTasksRequest::new, ListTasksResponse::new, TaskInfo::new, ThreadPool.Names.MANAGEMENT);
5654
}
5755

5856
@Override
@@ -61,11 +59,6 @@ protected ListTasksResponse newResponse(ListTasksRequest request, List<TaskInfo>
6159
return new ListTasksResponse(tasks, taskOperationFailures, failedNodeExceptions);
6260
}
6361

64-
@Override
65-
protected TaskInfo readTaskResponse(StreamInput in) throws IOException {
66-
return new TaskInfo(in);
67-
}
68-
6962
@Override
7063
protected void taskOperation(ListTasksRequest request, Task task, ActionListener<TaskInfo> listener) {
7164
listener.onResponse(task.taskInfo(clusterService.localNode().getId(), request.getDetailed()));

server/src/main/java/org/elasticsearch/action/support/tasks/BaseTasksResponse.java

Lines changed: 27 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -55,34 +55,8 @@ public BaseTasksResponse(List<TaskOperationFailure> taskFailures, List<? extends
5555
this.nodeFailures = nodeFailures == null ? Collections.emptyList() : Collections.unmodifiableList(new ArrayList<>(nodeFailures));
5656
}
5757

58-
/**
59-
* The list of task failures exception.
60-
*/
61-
public List<TaskOperationFailure> getTaskFailures() {
62-
return taskFailures;
63-
}
64-
65-
/**
66-
* The list of node failures exception.
67-
*/
68-
public List<ElasticsearchException> getNodeFailures() {
69-
return nodeFailures;
70-
}
71-
72-
/**
73-
* Rethrow task failures if there are any.
74-
*/
75-
public void rethrowFailures(String operationName) {
76-
rethrowAndSuppress(Stream.concat(
77-
getNodeFailures().stream(),
78-
getTaskFailures().stream().map(f -> new ElasticsearchException(
79-
"{} of [{}] failed", f.getCause(), operationName, new TaskId(f.getNodeId(), f.getTaskId()))))
80-
.collect(toList()));
81-
}
82-
83-
@Override
84-
public void readFrom(StreamInput in) throws IOException {
85-
super.readFrom(in);
58+
public BaseTasksResponse(StreamInput in) throws IOException {
59+
super(in);
8660
int size = in.readVInt();
8761
List<TaskOperationFailure> taskFailures = new ArrayList<>(size);
8862
for (int i = 0; i < size; i++) {
@@ -110,6 +84,31 @@ public void writeTo(StreamOutput out) throws IOException {
11084
}
11185
}
11286

87+
/**
88+
* The list of task failures exception.
89+
*/
90+
public List<TaskOperationFailure> getTaskFailures() {
91+
return taskFailures;
92+
}
93+
94+
/**
95+
* The list of node failures exception.
96+
*/
97+
public List<ElasticsearchException> getNodeFailures() {
98+
return nodeFailures;
99+
}
100+
101+
/**
102+
* Rethrow task failures if there are any.
103+
*/
104+
public void rethrowFailures(String operationName) {
105+
rethrowAndSuppress(Stream.concat(
106+
getNodeFailures().stream(),
107+
getTaskFailures().stream().map(f -> new ElasticsearchException(
108+
"{} of [{}] failed", f.getCause(), operationName, new TaskId(f.getNodeId(), f.getTaskId()))))
109+
.collect(toList()));
110+
}
111+
113112
protected void toXContentCommon(XContentBuilder builder, ToXContent.Params params) throws IOException {
114113
if (getTaskFailures() != null && getTaskFailures().size() > 0) {
115114
builder.startArray(TASK_FAILURES);

server/src/main/java/org/elasticsearch/action/support/tasks/TransportTasksAction.java

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,6 @@
5555
import java.util.concurrent.atomic.AtomicInteger;
5656
import java.util.concurrent.atomic.AtomicReferenceArray;
5757
import java.util.function.Consumer;
58-
import java.util.function.Supplier;
5958

6059
import static java.util.Collections.emptyList;
6160

@@ -71,20 +70,23 @@ public abstract class TransportTasksAction<
7170

7271
protected final ClusterService clusterService;
7372
protected final TransportService transportService;
74-
protected final Writeable.Reader<TasksRequest> requestSupplier;
75-
protected final Supplier<TasksResponse> responseSupplier;
73+
protected final Writeable.Reader<TasksRequest> requestReader;
74+
protected final Writeable.Reader<TasksResponse> responsesReader;
75+
protected final Writeable.Reader<TaskResponse> responseReader;
7676

7777
protected final String transportNodeAction;
7878

7979
protected TransportTasksAction(String actionName, ClusterService clusterService, TransportService transportService,
80-
ActionFilters actionFilters, Writeable.Reader<TasksRequest> requestSupplier,
81-
Supplier<TasksResponse> responseSupplier, String nodeExecutor) {
82-
super(actionName, transportService, actionFilters, requestSupplier);
80+
ActionFilters actionFilters, Writeable.Reader<TasksRequest> requestReader,
81+
Writeable.Reader<TasksResponse> responsesReader, Writeable.Reader<TaskResponse> responseReader,
82+
String nodeExecutor) {
83+
super(actionName, transportService, actionFilters, requestReader);
8384
this.clusterService = clusterService;
8485
this.transportService = transportService;
8586
this.transportNodeAction = actionName + "[n]";
86-
this.requestSupplier = requestSupplier;
87-
this.responseSupplier = responseSupplier;
87+
this.requestReader = requestReader;
88+
this.responsesReader = responsesReader;
89+
this.responseReader = responseReader;
8890

8991
transportService.registerRequestHandler(transportNodeAction, nodeExecutor, NodeTaskRequest::new, new NodeTransportHandler());
9092
}
@@ -205,8 +207,6 @@ protected TasksResponse newResponse(TasksRequest request, AtomicReferenceArray r
205207
return newResponse(request, tasks, taskOperationFailures, failedNodeExceptions);
206208
}
207209

208-
protected abstract TaskResponse readTaskResponse(StreamInput in) throws IOException;
209-
210210
/**
211211
* Perform the required operation on the task. It is OK start an asynchronous operation or to throw an exception but not both.
212212
*/
@@ -364,7 +364,7 @@ private class NodeTaskRequest extends TransportRequest {
364364

365365
protected NodeTaskRequest(StreamInput in) throws IOException {
366366
super(in);
367-
this.tasksRequest = requestSupplier.read(in);
367+
this.tasksRequest = requestReader.read(in);
368368
}
369369

370370
@Override
@@ -411,7 +411,7 @@ public void readFrom(StreamInput in) throws IOException {
411411
int resultsSize = in.readVInt();
412412
results = new ArrayList<>(resultsSize);
413413
for (; resultsSize > 0; resultsSize--) {
414-
final TaskResponse result = in.readBoolean() ? readTaskResponse(in) : null;
414+
final TaskResponse result = in.readBoolean() ? responseReader.read(in) : null;
415415
results.add(result);
416416
}
417417
if (in.readBoolean()) {

0 commit comments

Comments
 (0)