From f73202f6ccde3ed640f36160271a5e4c3919795f Mon Sep 17 00:00:00 2001 From: Sohaib Iftikhar Date: Thu, 26 Jul 2018 16:23:41 +0200 Subject: [PATCH 1/3] Changed Reindex request to use Writeable.Reader --- .../index/reindex/TransportReindexAction.java | 3 ++- .../elasticsearch/index/reindex/ReindexRequest.java | 12 ++++++++---- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportReindexAction.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportReindexAction.java index e54b5f50ae674..1d80c28b58568 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportReindexAction.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportReindexAction.java @@ -39,6 +39,7 @@ import org.elasticsearch.action.bulk.BulkItemResponse.Failure; import org.elasticsearch.client.RestClientBuilder; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.xcontent.DeprecationHandler; import org.elasticsearch.index.reindex.ScrollableHitSource.SearchFailure; import org.elasticsearch.action.index.IndexRequest; @@ -104,7 +105,7 @@ public class TransportReindexAction extends HandledTransportAction)ReindexRequest::new); this.threadPool = threadPool; this.clusterService = clusterService; this.scriptService = scriptService; diff --git a/server/src/main/java/org/elasticsearch/index/reindex/ReindexRequest.java b/server/src/main/java/org/elasticsearch/index/reindex/ReindexRequest.java index 276c455915323..e45d039edaeae 100644 --- a/server/src/main/java/org/elasticsearch/index/reindex/ReindexRequest.java +++ b/server/src/main/java/org/elasticsearch/index/reindex/ReindexRequest.java @@ -59,6 +59,13 @@ private ReindexRequest(SearchRequest search, IndexRequest destination, boolean s this.destination = destination; } + public ReindexRequest(StreamInput in) throws IOException { + super.readFrom(in); + destination = new IndexRequest(); + destination.readFrom(in); + remoteInfo = in.readOptionalWriteable(RemoteInfo::new); + } + @Override protected ReindexRequest self() { return this; @@ -135,10 +142,7 @@ public ReindexRequest forSlice(TaskId slicingTask, SearchRequest slice, int tota @Override public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - destination = new IndexRequest(); - destination.readFrom(in); - remoteInfo = in.readOptionalWriteable(RemoteInfo::new); + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); } @Override From c9906ef8d374830e6e6753ce9f958043c32c3d4d Mon Sep 17 00:00:00 2001 From: Sohaib Iftikhar Date: Thu, 26 Jul 2018 19:37:53 +0200 Subject: [PATCH 2/3] fixed tests. todo: do the change for update/delete by query requests as well --- .../index/reindex/RoundTripTests.java | 21 ++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RoundTripTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RoundTripTests.java index 2dc4b59e8d9f9..dea4f8dcc3b12 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RoundTripTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RoundTripTests.java @@ -67,19 +67,17 @@ public void testReindexRequest() throws IOException { new RemoteInfo(randomAlphaOfLength(5), randomAlphaOfLength(5), port, null, query, username, password, headers, socketTimeout, connectTimeout)); } - ReindexRequest tripped = new ReindexRequest(); - roundTrip(reindex, tripped); + ReindexRequest tripped = new ReindexRequest(toInputByteStream(reindex)); assertRequestEquals(reindex, tripped); // Try slices=auto with a version that doesn't support it, which should fail reindex.setSlices(AbstractBulkByScrollRequest.AUTO_SLICES); - Exception e = expectThrows(IllegalArgumentException.class, () -> roundTrip(Version.V_6_0_0_alpha1, reindex, null)); + Exception e = expectThrows(IllegalArgumentException.class, () -> toInputByteStream(Version.V_6_0_0_alpha1, reindex)); assertEquals("Slices set as \"auto\" are not supported before version [6.1.0]. Found version [6.0.0-alpha1]", e.getMessage()); // Try regular slices with a version that doesn't support slices=auto, which should succeed - tripped = new ReindexRequest(); reindex.setSlices(between(1, Integer.MAX_VALUE)); - roundTrip(Version.V_6_0_0_alpha1, reindex, tripped); + tripped = new ReindexRequest(toInputByteStream(reindex)); assertRequestEquals(Version.V_6_0_0_alpha1, reindex, tripped); } @@ -217,6 +215,19 @@ private void roundTrip(Version version, Streamable example, Streamable empty) th empty.readFrom(in); } + private StreamInput toInputByteStream(Streamable example) throws IOException { + return toInputByteStream(Version.CURRENT, example); + } + + private StreamInput toInputByteStream(Version version, Streamable example) throws IOException { + BytesStreamOutput out = new BytesStreamOutput(); + out.setVersion(version); + example.writeTo(out); + StreamInput in = out.bytes().streamInput(); + in.setVersion(version); + return in; + } + private Script randomScript() { ScriptType type = randomFrom(ScriptType.values()); String lang = random().nextBoolean() ? Script.DEFAULT_SCRIPT_LANG : randomSimpleString(random()); From 948cbfb42495416270c4968c6a13c7259d9e648c Mon Sep 17 00:00:00 2001 From: Sohaib Iftikhar Date: Fri, 27 Jul 2018 00:46:12 +0200 Subject: [PATCH 3/3] Added Writeable.Reader support for {Delete,Update}ByQueryRequest --- .../reindex/TransportDeleteByQueryAction.java | 4 +-- .../reindex/TransportUpdateByQueryAction.java | 4 +-- .../index/reindex/RoundTripTests.java | 32 +++++-------------- .../index/reindex/DeleteByQueryRequest.java | 7 ++++ .../index/reindex/UpdateByQueryRequest.java | 8 +++-- 5 files changed, 25 insertions(+), 30 deletions(-) diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportDeleteByQueryAction.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportDeleteByQueryAction.java index c1defe56adc6f..706f2c0b8f8f1 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportDeleteByQueryAction.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportDeleteByQueryAction.java @@ -27,13 +27,13 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.script.ScriptService; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; -import java.util.function.Supplier; public class TransportDeleteByQueryAction extends HandledTransportAction { @@ -46,7 +46,7 @@ public class TransportDeleteByQueryAction extends HandledTransportAction) DeleteByQueryRequest::new); + (Writeable.Reader) DeleteByQueryRequest::new); this.threadPool = threadPool; this.client = client; this.scriptService = scriptService; diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportUpdateByQueryAction.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportUpdateByQueryAction.java index 34ae3fdd0c62f..00d14822ba091 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportUpdateByQueryAction.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportUpdateByQueryAction.java @@ -29,6 +29,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.mapper.IdFieldMapper; @@ -43,7 +44,6 @@ import java.util.Map; import java.util.function.BiFunction; -import java.util.function.Supplier; public class TransportUpdateByQueryAction extends HandledTransportAction { @@ -56,7 +56,7 @@ public class TransportUpdateByQueryAction extends HandledTransportAction) UpdateByQueryRequest::new); + (Writeable.Reader) UpdateByQueryRequest::new); this.threadPool = threadPool; this.client = client; this.scriptService = scriptService; diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RoundTripTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RoundTripTests.java index dea4f8dcc3b12..97809c9bc8dc3 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RoundTripTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RoundTripTests.java @@ -87,20 +87,18 @@ public void testUpdateByQueryRequest() throws IOException { if (randomBoolean()) { update.setPipeline(randomAlphaOfLength(5)); } - UpdateByQueryRequest tripped = new UpdateByQueryRequest(); - roundTrip(update, tripped); + UpdateByQueryRequest tripped = new UpdateByQueryRequest(toInputByteStream(update)); assertRequestEquals(update, tripped); assertEquals(update.getPipeline(), tripped.getPipeline()); // Try slices=auto with a version that doesn't support it, which should fail update.setSlices(AbstractBulkByScrollRequest.AUTO_SLICES); - Exception e = expectThrows(IllegalArgumentException.class, () -> roundTrip(Version.V_6_0_0_alpha1, update, null)); + Exception e = expectThrows(IllegalArgumentException.class, () -> toInputByteStream(Version.V_6_0_0_alpha1, update)); assertEquals("Slices set as \"auto\" are not supported before version [6.1.0]. Found version [6.0.0-alpha1]", e.getMessage()); // Try regular slices with a version that doesn't support slices=auto, which should succeed - tripped = new UpdateByQueryRequest(); update.setSlices(between(1, Integer.MAX_VALUE)); - roundTrip(Version.V_6_0_0_alpha1, update, tripped); + tripped = new UpdateByQueryRequest(toInputByteStream(update)); assertRequestEquals(update, tripped); assertEquals(update.getPipeline(), tripped.getPipeline()); } @@ -108,19 +106,17 @@ public void testUpdateByQueryRequest() throws IOException { public void testDeleteByQueryRequest() throws IOException { DeleteByQueryRequest delete = new DeleteByQueryRequest(new SearchRequest()); randomRequest(delete); - DeleteByQueryRequest tripped = new DeleteByQueryRequest(); - roundTrip(delete, tripped); + DeleteByQueryRequest tripped = new DeleteByQueryRequest(toInputByteStream(delete)); assertRequestEquals(delete, tripped); // Try slices=auto with a version that doesn't support it, which should fail delete.setSlices(AbstractBulkByScrollRequest.AUTO_SLICES); - Exception e = expectThrows(IllegalArgumentException.class, () -> roundTrip(Version.V_6_0_0_alpha1, delete, null)); + Exception e = expectThrows(IllegalArgumentException.class, () -> toInputByteStream(Version.V_6_0_0_alpha1, delete)); assertEquals("Slices set as \"auto\" are not supported before version [6.1.0]. Found version [6.0.0-alpha1]", e.getMessage()); // Try regular slices with a version that doesn't support slices=auto, which should succeed - tripped = new DeleteByQueryRequest(); delete.setSlices(between(1, Integer.MAX_VALUE)); - roundTrip(Version.V_6_0_0_alpha1, delete, tripped); + tripped = new DeleteByQueryRequest(toInputByteStream(delete)); assertRequestEquals(delete, tripped); } @@ -196,25 +192,13 @@ public void testRethrottleRequest() throws IOException { request.setTaskId(new TaskId(randomAlphaOfLength(5), randomLong())); } RethrottleRequest tripped = new RethrottleRequest(); - roundTrip(request, tripped); + // We use readFrom here because Rethrottle does not support the Writeable.Reader interface + tripped.readFrom(toInputByteStream(request)); assertEquals(request.getRequestsPerSecond(), tripped.getRequestsPerSecond(), 0.00001); assertArrayEquals(request.getActions(), tripped.getActions()); assertEquals(request.getTaskId(), tripped.getTaskId()); } - private void roundTrip(Streamable example, Streamable empty) throws IOException { - roundTrip(Version.CURRENT, example, empty); - } - - private void roundTrip(Version version, Streamable example, Streamable empty) throws IOException { - BytesStreamOutput out = new BytesStreamOutput(); - out.setVersion(version); - example.writeTo(out); - StreamInput in = out.bytes().streamInput(); - in.setVersion(version); - empty.readFrom(in); - } - private StreamInput toInputByteStream(Streamable example) throws IOException { return toInputByteStream(Version.CURRENT, example); } diff --git a/server/src/main/java/org/elasticsearch/index/reindex/DeleteByQueryRequest.java b/server/src/main/java/org/elasticsearch/index/reindex/DeleteByQueryRequest.java index aa8543175d9f0..f848e8722c719 100644 --- a/server/src/main/java/org/elasticsearch/index/reindex/DeleteByQueryRequest.java +++ b/server/src/main/java/org/elasticsearch/index/reindex/DeleteByQueryRequest.java @@ -23,8 +23,11 @@ import org.elasticsearch.action.IndicesRequest; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.tasks.TaskId; +import java.io.IOException; + import static org.elasticsearch.action.ValidateActions.addValidationError; /** @@ -53,6 +56,10 @@ public DeleteByQueryRequest(SearchRequest search) { this(search, true); } + public DeleteByQueryRequest(StreamInput in) throws IOException { + super.readFrom(in); + } + private DeleteByQueryRequest(SearchRequest search, boolean setDefaults) { super(search, setDefaults); // Delete-By-Query does not require the source diff --git a/server/src/main/java/org/elasticsearch/index/reindex/UpdateByQueryRequest.java b/server/src/main/java/org/elasticsearch/index/reindex/UpdateByQueryRequest.java index 2be0e46dbcedf..eb4fd59a7bc5f 100644 --- a/server/src/main/java/org/elasticsearch/index/reindex/UpdateByQueryRequest.java +++ b/server/src/main/java/org/elasticsearch/index/reindex/UpdateByQueryRequest.java @@ -47,6 +47,11 @@ public UpdateByQueryRequest(SearchRequest search) { this(search, true); } + public UpdateByQueryRequest(StreamInput in) throws IOException { + super.readFrom(in); + pipeline = in.readOptionalString(); + } + private UpdateByQueryRequest(SearchRequest search, boolean setDefaults) { super(search, setDefaults); } @@ -108,8 +113,7 @@ public IndicesOptions indicesOptions() { @Override public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - pipeline = in.readOptionalString(); + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); } @Override