Skip to content

Commit 6486cdf

Browse files
sohaibiftikharnik9000
authored andcommitted
Changed ReindexRequest to use Writeable.Reader (#32401)
-- This is a pre-stage for adding the reindex API to the REST high-level-client -- Follows the pattern set in #26315
1 parent 0359cef commit 6486cdf

File tree

7 files changed

+39
-30
lines changed

7 files changed

+39
-30
lines changed

modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportDeleteByQueryAction.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ public class TransportDeleteByQueryAction extends HandledTransportAction<DeleteB
4343
public TransportDeleteByQueryAction(Settings settings, ThreadPool threadPool, ActionFilters actionFilters,
4444
IndexNameExpressionResolver resolver, Client client, TransportService transportService,
4545
ScriptService scriptService, ClusterService clusterService) {
46-
super(settings, DeleteByQueryAction.NAME, threadPool, transportService, actionFilters, resolver, DeleteByQueryRequest::new);
46+
super(settings, DeleteByQueryAction.NAME, threadPool, transportService, actionFilters, DeleteByQueryRequest::new, resolver);
4747
this.client = client;
4848
this.scriptService = scriptService;
4949
this.clusterService = clusterService;

modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportReindexAction.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -102,8 +102,7 @@ public class TransportReindexAction extends HandledTransportAction<ReindexReques
102102
public TransportReindexAction(Settings settings, ThreadPool threadPool, ActionFilters actionFilters,
103103
IndexNameExpressionResolver indexNameExpressionResolver, ClusterService clusterService, ScriptService scriptService,
104104
AutoCreateIndex autoCreateIndex, Client client, TransportService transportService) {
105-
super(settings, ReindexAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver,
106-
ReindexRequest::new);
105+
super(settings, ReindexAction.NAME, threadPool, transportService, actionFilters, ReindexRequest::new, indexNameExpressionResolver);
107106
this.clusterService = clusterService;
108107
this.scriptService = scriptService;
109108
this.autoCreateIndex = autoCreateIndex;

modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportUpdateByQueryAction.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ public TransportUpdateByQueryAction(Settings settings, ThreadPool threadPool, Ac
5656
IndexNameExpressionResolver indexNameExpressionResolver, Client client, TransportService transportService,
5757
ScriptService scriptService, ClusterService clusterService) {
5858
super(settings, UpdateByQueryAction.NAME, threadPool, transportService, actionFilters,
59-
indexNameExpressionResolver, UpdateByQueryRequest::new);
59+
UpdateByQueryRequest::new, indexNameExpressionResolver);
6060
this.client = client;
6161
this.scriptService = scriptService;
6262
this.clusterService = clusterService;

modules/reindex/src/test/java/org/elasticsearch/index/reindex/RoundTripTests.java

+15-20
Original file line numberDiff line numberDiff line change
@@ -67,19 +67,17 @@ public void testReindexRequest() throws IOException {
6767
new RemoteInfo(randomAlphaOfLength(5), randomAlphaOfLength(5), port, null,
6868
query, username, password, headers, socketTimeout, connectTimeout));
6969
}
70-
ReindexRequest tripped = new ReindexRequest();
71-
roundTrip(reindex, tripped);
70+
ReindexRequest tripped = new ReindexRequest(toInputByteStream(reindex));
7271
assertRequestEquals(reindex, tripped);
7372

7473
// Try slices=auto with a version that doesn't support it, which should fail
7574
reindex.setSlices(AbstractBulkByScrollRequest.AUTO_SLICES);
76-
Exception e = expectThrows(IllegalArgumentException.class, () -> roundTrip(Version.V_6_0_0_alpha1, reindex, null));
75+
Exception e = expectThrows(IllegalArgumentException.class, () -> toInputByteStream(Version.V_6_0_0_alpha1, reindex));
7776
assertEquals("Slices set as \"auto\" are not supported before version [6.1.0]. Found version [6.0.0-alpha1]", e.getMessage());
7877

7978
// Try regular slices with a version that doesn't support slices=auto, which should succeed
80-
tripped = new ReindexRequest();
8179
reindex.setSlices(between(1, Integer.MAX_VALUE));
82-
roundTrip(Version.V_6_0_0_alpha1, reindex, tripped);
80+
tripped = new ReindexRequest(toInputByteStream(reindex));
8381
assertRequestEquals(Version.V_6_0_0_alpha1, reindex, tripped);
8482
}
8583

@@ -89,40 +87,36 @@ public void testUpdateByQueryRequest() throws IOException {
8987
if (randomBoolean()) {
9088
update.setPipeline(randomAlphaOfLength(5));
9189
}
92-
UpdateByQueryRequest tripped = new UpdateByQueryRequest();
93-
roundTrip(update, tripped);
90+
UpdateByQueryRequest tripped = new UpdateByQueryRequest(toInputByteStream(update));
9491
assertRequestEquals(update, tripped);
9592
assertEquals(update.getPipeline(), tripped.getPipeline());
9693

9794
// Try slices=auto with a version that doesn't support it, which should fail
9895
update.setSlices(AbstractBulkByScrollRequest.AUTO_SLICES);
99-
Exception e = expectThrows(IllegalArgumentException.class, () -> roundTrip(Version.V_6_0_0_alpha1, update, null));
96+
Exception e = expectThrows(IllegalArgumentException.class, () -> toInputByteStream(Version.V_6_0_0_alpha1, update));
10097
assertEquals("Slices set as \"auto\" are not supported before version [6.1.0]. Found version [6.0.0-alpha1]", e.getMessage());
10198

10299
// Try regular slices with a version that doesn't support slices=auto, which should succeed
103-
tripped = new UpdateByQueryRequest();
104100
update.setSlices(between(1, Integer.MAX_VALUE));
105-
roundTrip(Version.V_6_0_0_alpha1, update, tripped);
101+
tripped = new UpdateByQueryRequest(toInputByteStream(update));
106102
assertRequestEquals(update, tripped);
107103
assertEquals(update.getPipeline(), tripped.getPipeline());
108104
}
109105

110106
public void testDeleteByQueryRequest() throws IOException {
111107
DeleteByQueryRequest delete = new DeleteByQueryRequest(new SearchRequest());
112108
randomRequest(delete);
113-
DeleteByQueryRequest tripped = new DeleteByQueryRequest();
114-
roundTrip(delete, tripped);
109+
DeleteByQueryRequest tripped = new DeleteByQueryRequest(toInputByteStream(delete));
115110
assertRequestEquals(delete, tripped);
116111

117112
// Try slices=auto with a version that doesn't support it, which should fail
118113
delete.setSlices(AbstractBulkByScrollRequest.AUTO_SLICES);
119-
Exception e = expectThrows(IllegalArgumentException.class, () -> roundTrip(Version.V_6_0_0_alpha1, delete, null));
114+
Exception e = expectThrows(IllegalArgumentException.class, () -> toInputByteStream(Version.V_6_0_0_alpha1, delete));
120115
assertEquals("Slices set as \"auto\" are not supported before version [6.1.0]. Found version [6.0.0-alpha1]", e.getMessage());
121116

122117
// Try regular slices with a version that doesn't support slices=auto, which should succeed
123-
tripped = new DeleteByQueryRequest();
124118
delete.setSlices(between(1, Integer.MAX_VALUE));
125-
roundTrip(Version.V_6_0_0_alpha1, delete, tripped);
119+
tripped = new DeleteByQueryRequest(toInputByteStream(delete));
126120
assertRequestEquals(delete, tripped);
127121
}
128122

@@ -198,23 +192,24 @@ public void testRethrottleRequest() throws IOException {
198192
request.setTaskId(new TaskId(randomAlphaOfLength(5), randomLong()));
199193
}
200194
RethrottleRequest tripped = new RethrottleRequest();
201-
roundTrip(request, tripped);
195+
// We use readFrom here because Rethrottle does not support the Writeable.Reader interface
196+
tripped.readFrom(toInputByteStream(request));
202197
assertEquals(request.getRequestsPerSecond(), tripped.getRequestsPerSecond(), 0.00001);
203198
assertArrayEquals(request.getActions(), tripped.getActions());
204199
assertEquals(request.getTaskId(), tripped.getTaskId());
205200
}
206201

207-
private void roundTrip(Streamable example, Streamable empty) throws IOException {
208-
roundTrip(Version.CURRENT, example, empty);
202+
private StreamInput toInputByteStream(Streamable example) throws IOException {
203+
return toInputByteStream(Version.CURRENT, example);
209204
}
210205

211-
private void roundTrip(Version version, Streamable example, Streamable empty) throws IOException {
206+
private StreamInput toInputByteStream(Version version, Streamable example) throws IOException {
212207
BytesStreamOutput out = new BytesStreamOutput();
213208
out.setVersion(version);
214209
example.writeTo(out);
215210
StreamInput in = out.bytes().streamInput();
216211
in.setVersion(version);
217-
empty.readFrom(in);
212+
return in;
218213
}
219214

220215
private Script randomScript() {

server/src/main/java/org/elasticsearch/index/reindex/DeleteByQueryRequest.java

+7
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,11 @@
2323
import org.elasticsearch.action.IndicesRequest;
2424
import org.elasticsearch.action.search.SearchRequest;
2525
import org.elasticsearch.action.support.IndicesOptions;
26+
import org.elasticsearch.common.io.stream.StreamInput;
2627
import org.elasticsearch.tasks.TaskId;
2728

29+
import java.io.IOException;
30+
2831
import static org.elasticsearch.action.ValidateActions.addValidationError;
2932

3033
/**
@@ -53,6 +56,10 @@ public DeleteByQueryRequest(SearchRequest search) {
5356
this(search, true);
5457
}
5558

59+
public DeleteByQueryRequest(StreamInput in) throws IOException {
60+
super.readFrom(in);
61+
}
62+
5663
private DeleteByQueryRequest(SearchRequest search, boolean setDefaults) {
5764
super(search, setDefaults);
5865
// Delete-By-Query does not require the source

server/src/main/java/org/elasticsearch/index/reindex/ReindexRequest.java

+8-4
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,13 @@ private ReindexRequest(SearchRequest search, IndexRequest destination, boolean s
5959
this.destination = destination;
6060
}
6161

62+
public ReindexRequest(StreamInput in) throws IOException {
63+
super.readFrom(in);
64+
destination = new IndexRequest();
65+
destination.readFrom(in);
66+
remoteInfo = in.readOptionalWriteable(RemoteInfo::new);
67+
}
68+
6269
@Override
6370
protected ReindexRequest self() {
6471
return this;
@@ -135,10 +142,7 @@ public ReindexRequest forSlice(TaskId slicingTask, SearchRequest slice, int tota
135142

136143
@Override
137144
public void readFrom(StreamInput in) throws IOException {
138-
super.readFrom(in);
139-
destination = new IndexRequest();
140-
destination.readFrom(in);
141-
remoteInfo = in.readOptionalWriteable(RemoteInfo::new);
145+
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
142146
}
143147

144148
@Override

server/src/main/java/org/elasticsearch/index/reindex/UpdateByQueryRequest.java

+6-2
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,11 @@ public UpdateByQueryRequest(SearchRequest search) {
4747
this(search, true);
4848
}
4949

50+
public UpdateByQueryRequest(StreamInput in) throws IOException {
51+
super.readFrom(in);
52+
pipeline = in.readOptionalString();
53+
}
54+
5055
private UpdateByQueryRequest(SearchRequest search, boolean setDefaults) {
5156
super(search, setDefaults);
5257
}
@@ -108,8 +113,7 @@ public IndicesOptions indicesOptions() {
108113

109114
@Override
110115
public void readFrom(StreamInput in) throws IOException {
111-
super.readFrom(in);
112-
pipeline = in.readOptionalString();
116+
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
113117
}
114118

115119
@Override

0 commit comments

Comments
 (0)