Skip to content

Commit 5112101

Browse files
DaveCTurnerGurkan Kaymak
authored and
Gurkan Kaymak
committed
Use Writeable for TransportReplAction derivatives (elastic#40894)
Relates elastic#34389
1 parent dec3702 commit 5112101

32 files changed

+254
-245
lines changed

server/src/main/java/org/elasticsearch/action/DocWriteRequest.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -222,13 +222,9 @@ static DocWriteRequest<?> readDocumentRequest(StreamInput in) throws IOException
222222
byte type = in.readByte();
223223
DocWriteRequest<?> docWriteRequest;
224224
if (type == 0) {
225-
IndexRequest indexRequest = new IndexRequest();
226-
indexRequest.readFrom(in);
227-
docWriteRequest = indexRequest;
225+
docWriteRequest = new IndexRequest(in);
228226
} else if (type == 1) {
229-
DeleteRequest deleteRequest = new DeleteRequest();
230-
deleteRequest.readFrom(in);
231-
docWriteRequest = deleteRequest;
227+
docWriteRequest = new DeleteRequest(in);
232228
} else if (type == 2) {
233229
UpdateRequest updateRequest = new UpdateRequest();
234230
updateRequest.readFrom(in);

server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -136,9 +136,11 @@ public void markShardCopyAsStaleIfNeeded(final ShardId shardId, final String all
136136

137137
public static class ShardRequest extends ReplicationRequest<ShardRequest> {
138138

139-
private ClusterBlock clusterBlock;
139+
private final ClusterBlock clusterBlock;
140140

141-
ShardRequest(){
141+
ShardRequest(StreamInput in) throws IOException {
142+
super(in);
143+
clusterBlock = new ClusterBlock(in);
142144
}
143145

144146
public ShardRequest(final ShardId shardId, final ClusterBlock clusterBlock, final TaskId parentTaskId) {
@@ -153,9 +155,8 @@ public String toString() {
153155
}
154156

155157
@Override
156-
public void readFrom(final StreamInput in) throws IOException {
157-
super.readFrom(in);
158-
clusterBlock = new ClusterBlock(in);
158+
public void readFrom(final StreamInput in) {
159+
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
159160
}
160161

161162
@Override

server/src/main/java/org/elasticsearch/action/admin/indices/flush/FlushRequest.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,12 @@ public FlushRequest(String... indices) {
5252
super(indices);
5353
}
5454

55+
public FlushRequest(StreamInput in) throws IOException {
56+
super(in);
57+
force = in.readBoolean();
58+
waitIfOngoing = in.readBoolean();
59+
}
60+
5561
/**
5662
* Returns {@code true} iff a flush should block
5763
* if a another flush operation is already running. Otherwise {@code false}
@@ -103,9 +109,7 @@ public void writeTo(StreamOutput out) throws IOException {
103109

104110
@Override
105111
public void readFrom(StreamInput in) throws IOException {
106-
super.readFrom(in);
107-
force = in.readBoolean();
108-
waitIfOngoing = in.readBoolean();
112+
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
109113
}
110114

111115
@Override

server/src/main/java/org/elasticsearch/action/admin/indices/flush/ShardFlushRequest.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,15 +29,17 @@
2929

3030
public class ShardFlushRequest extends ReplicationRequest<ShardFlushRequest> {
3131

32-
private FlushRequest request = new FlushRequest();
32+
private final FlushRequest request;
3333

3434
public ShardFlushRequest(FlushRequest request, ShardId shardId) {
3535
super(shardId);
3636
this.request = request;
3737
this.waitForActiveShards = ActiveShardCount.NONE; // don't wait for any active shards before proceeding, by default
3838
}
3939

40-
public ShardFlushRequest() {
40+
public ShardFlushRequest(StreamInput in) throws IOException {
41+
super(in);
42+
request = new FlushRequest(in);
4143
}
4244

4345
FlushRequest getRequest() {
@@ -46,8 +48,7 @@ FlushRequest getRequest() {
4648

4749
@Override
4850
public void readFrom(StreamInput in) throws IOException {
49-
super.readFrom(in);
50-
request.readFrom(in);
51+
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
5152
}
5253

5354
@Override

server/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ protected PrimaryResult<ShardFlushRequest, ReplicationResponse> shardOperationOn
5555
IndexShard primary) {
5656
primary.flush(shardRequest.getRequest());
5757
logger.trace("{} flush request executed on primary", primary.shardId());
58-
return new PrimaryResult<ShardFlushRequest, ReplicationResponse>(shardRequest, new ReplicationResponse());
58+
return new PrimaryResult<>(shardRequest, new ReplicationResponse());
5959
}
6060

6161
@Override

server/src/main/java/org/elasticsearch/action/admin/indices/refresh/RefreshRequest.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@
2020
package org.elasticsearch.action.admin.indices.refresh;
2121

2222
import org.elasticsearch.action.support.broadcast.BroadcastRequest;
23+
import org.elasticsearch.common.io.stream.StreamInput;
24+
25+
import java.io.IOException;
2326

2427
/**
2528
* A refresh request making all operations performed since the last refresh available for search. The (near) real-time
@@ -35,4 +38,8 @@ public class RefreshRequest extends BroadcastRequest<RefreshRequest> {
3538
public RefreshRequest(String... indices) {
3639
super(indices);
3740
}
41+
42+
public RefreshRequest(StreamInput in) throws IOException {
43+
super(in);
44+
}
3845
}

server/src/main/java/org/elasticsearch/action/bulk/BulkShardRequest.java

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,14 @@ public class BulkShardRequest extends ReplicatedWriteRequest<BulkShardRequest> {
3333

3434
private BulkItemRequest[] items;
3535

36-
public BulkShardRequest() {
36+
public BulkShardRequest(StreamInput in) throws IOException {
37+
super(in);
38+
items = new BulkItemRequest[in.readVInt()];
39+
for (int i = 0; i < items.length; i++) {
40+
if (in.readBoolean()) {
41+
items[i] = BulkItemRequest.readBulkItem(in);
42+
}
43+
}
3744
}
3845

3946
public BulkShardRequest(ShardId shardId, RefreshPolicy refreshPolicy, BulkItemRequest[] items) {
@@ -60,7 +67,7 @@ public String[] indices() {
6067
indices.add(item.index());
6168
}
6269
}
63-
return indices.toArray(new String[indices.size()]);
70+
return indices.toArray(new String[0]);
6471
}
6572

6673
@Override
@@ -78,14 +85,8 @@ public void writeTo(StreamOutput out) throws IOException {
7885
}
7986

8087
@Override
81-
public void readFrom(StreamInput in) throws IOException {
82-
super.readFrom(in);
83-
items = new BulkItemRequest[in.readVInt()];
84-
for (int i = 0; i < items.length; i++) {
85-
if (in.readBoolean()) {
86-
items[i] = BulkItemRequest.readBulkItem(in);
87-
}
88-
}
88+
public void readFrom(StreamInput in) {
89+
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
8990
}
9091

9192
@Override

server/src/main/java/org/elasticsearch/action/bulk/TransportSingleItemBulkWriteAction.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,10 @@
2828
import org.elasticsearch.action.support.WriteResponse;
2929
import org.elasticsearch.action.support.replication.ReplicatedWriteRequest;
3030
import org.elasticsearch.action.support.replication.ReplicationResponse;
31+
import org.elasticsearch.common.io.stream.Writeable;
3132
import org.elasticsearch.tasks.Task;
3233
import org.elasticsearch.transport.TransportService;
3334

34-
import java.util.function.Supplier;
35-
3635
/** use transport bulk action directly */
3736
@Deprecated
3837
public abstract class TransportSingleItemBulkWriteAction<
@@ -43,8 +42,8 @@ public abstract class TransportSingleItemBulkWriteAction<
4342
private final TransportBulkAction bulkAction;
4443

4544
protected TransportSingleItemBulkWriteAction(String actionName, TransportService transportService, ActionFilters actionFilters,
46-
Supplier<Request> request, TransportBulkAction bulkAction) {
47-
super(actionName, transportService, actionFilters, request);
45+
Writeable.Reader<Request> requestReader, TransportBulkAction bulkAction) {
46+
super(actionName, transportService, actionFilters, requestReader);
4847
this.bulkAction = bulkAction;
4948
}
5049

server/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@
5252
public class DeleteRequest extends ReplicatedWriteRequest<DeleteRequest>
5353
implements DocWriteRequest<DeleteRequest>, CompositeIndicesRequest {
5454

55+
private static final ShardId NO_SHARD_ID = null;
56+
5557
// Set to null initially so we can know to override in bulk requests that have a default type.
5658
private String type;
5759
private String id;
@@ -62,14 +64,27 @@ public class DeleteRequest extends ReplicatedWriteRequest<DeleteRequest>
6264
private long ifSeqNo = UNASSIGNED_SEQ_NO;
6365
private long ifPrimaryTerm = UNASSIGNED_PRIMARY_TERM;
6466

67+
public DeleteRequest(StreamInput in) throws IOException {
68+
super(in);
69+
type = in.readString();
70+
id = in.readString();
71+
routing = in.readOptionalString();
72+
version = in.readLong();
73+
versionType = VersionType.fromValue(in.readByte());
74+
ifSeqNo = in.readZLong();
75+
ifPrimaryTerm = in.readVLong();
76+
}
77+
6578
public DeleteRequest() {
79+
super(NO_SHARD_ID);
6680
}
6781

6882
/**
6983
* Constructs a new delete request against the specified index. The {@link #type(String)} and {@link #id(String)}
7084
* must be set.
7185
*/
7286
public DeleteRequest(String index) {
87+
super(NO_SHARD_ID);
7388
this.index = index;
7489
}
7590

@@ -84,6 +99,7 @@ public DeleteRequest(String index) {
8499
*/
85100
@Deprecated
86101
public DeleteRequest(String index, String type, String id) {
102+
super(NO_SHARD_ID);
87103
this.index = index;
88104
this.type = type;
89105
this.id = id;
@@ -96,6 +112,7 @@ public DeleteRequest(String index, String type, String id) {
96112
* @param id The id of the document
97113
*/
98114
public DeleteRequest(String index, String id) {
115+
super(NO_SHARD_ID);
99116
this.index = index;
100117
this.id = id;
101118
}
@@ -273,15 +290,8 @@ public OpType opType() {
273290
}
274291

275292
@Override
276-
public void readFrom(StreamInput in) throws IOException {
277-
super.readFrom(in);
278-
type = in.readString();
279-
id = in.readString();
280-
routing = in.readOptionalString();
281-
version = in.readLong();
282-
versionType = VersionType.fromValue(in.readByte());
283-
ifSeqNo = in.readZLong();
284-
ifPrimaryTerm = in.readVLong();
293+
public void readFrom(StreamInput in) {
294+
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
285295
}
286296

287297
@Override
@@ -302,14 +312,4 @@ public void writeTo(StreamOutput out) throws IOException {
302312
public String toString() {
303313
return "delete {[" + index + "][" + type() + "][" + id + "]}";
304314
}
305-
306-
/**
307-
* Override this method from ReplicationAction, this is where we are storing our state in the request object (which we really shouldn't
308-
* do). Once the transport client goes away we can move away from making this available, but in the meantime this is dangerous to set or
309-
* use because the DeleteRequest object will always be wrapped in a bulk request envelope, which is where this *should* be set.
310-
*/
311-
@Override
312-
public DeleteRequest setShardId(ShardId shardId) {
313-
throw new UnsupportedOperationException("shard id should never be set on DeleteRequest");
314-
}
315315
}

server/src/main/java/org/elasticsearch/action/index/IndexRequest.java

Lines changed: 28 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,8 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
8383
*/
8484
static final int MAX_SOURCE_LENGTH_IN_TOSTRING = 2048;
8585

86+
private static final ShardId NO_SHARD_ID = null;
87+
8688
// Set to null initially so we can know to override in bulk requests that have a default type.
8789
private String type;
8890
private String id;
@@ -112,15 +114,37 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
112114
private long ifSeqNo = UNASSIGNED_SEQ_NO;
113115
private long ifPrimaryTerm = UNASSIGNED_PRIMARY_TERM;
114116

117+
public IndexRequest(StreamInput in) throws IOException {
118+
super(in);
119+
type = in.readOptionalString();
120+
id = in.readOptionalString();
121+
routing = in.readOptionalString();
122+
source = in.readBytesReference();
123+
opType = OpType.fromId(in.readByte());
124+
version = in.readLong();
125+
versionType = VersionType.fromValue(in.readByte());
126+
pipeline = in.readOptionalString();
127+
isRetry = in.readBoolean();
128+
autoGeneratedTimestamp = in.readLong();
129+
if (in.readBoolean()) {
130+
contentType = in.readEnum(XContentType.class);
131+
} else {
132+
contentType = null;
133+
}
134+
ifSeqNo = in.readZLong();
135+
ifPrimaryTerm = in.readVLong();
136+
}
115137

116138
public IndexRequest() {
139+
super(NO_SHARD_ID);
117140
}
118141

119142
/**
120143
* Constructs a new index request against the specific index. The {@link #type(String)}
121144
* {@link #source(byte[], XContentType)} must be set.
122145
*/
123146
public IndexRequest(String index) {
147+
super(NO_SHARD_ID);
124148
this.index = index;
125149
}
126150

@@ -131,6 +155,7 @@ public IndexRequest(String index) {
131155
*/
132156
@Deprecated
133157
public IndexRequest(String index, String type) {
158+
super(NO_SHARD_ID);
134159
this.index = index;
135160
this.type = type;
136161
}
@@ -146,6 +171,7 @@ public IndexRequest(String index, String type) {
146171
*/
147172
@Deprecated
148173
public IndexRequest(String index, String type, String id) {
174+
super(NO_SHARD_ID);
149175
this.index = index;
150176
this.type = type;
151177
this.id = id;
@@ -593,25 +619,8 @@ public void resolveRouting(MetaData metaData) {
593619
}
594620

595621
@Override
596-
public void readFrom(StreamInput in) throws IOException {
597-
super.readFrom(in);
598-
type = in.readOptionalString();
599-
id = in.readOptionalString();
600-
routing = in.readOptionalString();
601-
source = in.readBytesReference();
602-
opType = OpType.fromId(in.readByte());
603-
version = in.readLong();
604-
versionType = VersionType.fromValue(in.readByte());
605-
pipeline = in.readOptionalString();
606-
isRetry = in.readBoolean();
607-
autoGeneratedTimestamp = in.readLong();
608-
if (in.readBoolean()) {
609-
contentType = in.readEnum(XContentType.class);
610-
} else {
611-
contentType = null;
612-
}
613-
ifSeqNo = in.readZLong();
614-
ifPrimaryTerm = in.readVLong();
622+
public void readFrom(StreamInput in) {
623+
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
615624
}
616625

617626
@Override
@@ -675,15 +684,4 @@ public void onRetry() {
675684
public long getAutoGeneratedTimestamp() {
676685
return autoGeneratedTimestamp;
677686
}
678-
679-
/**
680-
* Override this method from ReplicationAction, this is where we are storing our state in the request object (which we really shouldn't
681-
* do). Once the transport client goes away we can move away from making this available, but in the meantime this is dangerous to set or
682-
* use because the IndexRequest object will always be wrapped in a bulk request envelope, which is where this *should* be set.
683-
*/
684-
@Override
685-
public IndexRequest setShardId(ShardId shardId) {
686-
throw new UnsupportedOperationException("shard id should never be set on IndexRequest");
687-
}
688-
689687
}

0 commit comments

Comments
 (0)