Skip to content

Commit 7f5e29d

Browse files
sohaibiftikharnik9000
authored andcommitted
HLREST: add reindex API (#32679)
Adds the reindex API to the high level REST client.
1 parent 353112a commit 7f5e29d

File tree

27 files changed

+1643
-67
lines changed

27 files changed

+1643
-67
lines changed

client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java

+16
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@
106106
import org.elasticsearch.common.xcontent.XContentType;
107107
import org.elasticsearch.index.VersionType;
108108
import org.elasticsearch.index.rankeval.RankEvalRequest;
109+
import org.elasticsearch.index.reindex.ReindexRequest;
109110
import org.elasticsearch.protocol.xpack.XPackInfoRequest;
110111
import org.elasticsearch.protocol.xpack.XPackUsageRequest;
111112
import org.elasticsearch.protocol.xpack.license.DeleteLicenseRequest;
@@ -820,6 +821,21 @@ static Request clusterHealth(ClusterHealthRequest healthRequest) {
820821
return request;
821822
}
822823

824+
static Request reindex(ReindexRequest reindexRequest) throws IOException {
825+
String endpoint = new EndpointBuilder().addPathPart("_reindex").build();
826+
Request request = new Request(HttpPost.METHOD_NAME, endpoint);
827+
Params params = new Params(request)
828+
.withRefresh(reindexRequest.isRefresh())
829+
.withTimeout(reindexRequest.getTimeout())
830+
.withWaitForActiveShards(reindexRequest.getWaitForActiveShards());
831+
832+
if (reindexRequest.getScrollTime() != null) {
833+
params.putParam("scroll", reindexRequest.getScrollTime());
834+
}
835+
request.setEntity(createEntity(reindexRequest, REQUEST_BODY_CONTENT_TYPE));
836+
return request;
837+
}
838+
823839
static Request rollover(RolloverRequest rolloverRequest) throws IOException {
824840
String endpoint = new EndpointBuilder().addPathPart(rolloverRequest.getAlias()).addPathPartAsIs("_rollover")
825841
.addPathPart(rolloverRequest.getNewIndexName()).build();

client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java

+29
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,8 @@
6464
import org.elasticsearch.common.xcontent.XContentType;
6565
import org.elasticsearch.index.rankeval.RankEvalRequest;
6666
import org.elasticsearch.index.rankeval.RankEvalResponse;
67+
import org.elasticsearch.index.reindex.BulkByScrollResponse;
68+
import org.elasticsearch.index.reindex.ReindexRequest;
6769
import org.elasticsearch.plugins.spi.NamedXContentProvider;
6870
import org.elasticsearch.rest.BytesRestResponse;
6971
import org.elasticsearch.rest.RestStatus;
@@ -395,6 +397,33 @@ public final void bulkAsync(BulkRequest bulkRequest, RequestOptions options, Act
395397
performRequestAsyncAndParseEntity(bulkRequest, RequestConverters::bulk, options, BulkResponse::fromXContent, listener, emptySet());
396398
}
397399

400+
/**
401+
* Executes a reindex request.
402+
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html">Reindex API on elastic.co</a>
403+
* @param reindexRequest the request
404+
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
405+
* @return the response
406+
* @throws IOException in case there is a problem sending the request or parsing back the response
407+
*/
408+
public final BulkByScrollResponse reindex(ReindexRequest reindexRequest, RequestOptions options) throws IOException {
409+
return performRequestAndParseEntity(
410+
reindexRequest, RequestConverters::reindex, options, BulkByScrollResponse::fromXContent, emptySet()
411+
);
412+
}
413+
414+
/**
415+
* Asynchronously executes a reindex request.
416+
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html">Reindex API on elastic.co</a>
417+
* @param reindexRequest the request
418+
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
419+
* @param listener the listener to be notified upon request completion
420+
*/
421+
public final void reindexAsync(ReindexRequest reindexRequest, RequestOptions options, ActionListener<BulkByScrollResponse> listener) {
422+
performRequestAsyncAndParseEntity(
423+
reindexRequest, RequestConverters::reindex, options, BulkByScrollResponse::fromXContent, listener, emptySet()
424+
);
425+
}
426+
398427
/**
399428
* Pings the remote Elasticsearch cluster and returns true if the ping succeeded, false otherwise
400429
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized

client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java

+67
Original file line numberDiff line numberDiff line change
@@ -41,12 +41,16 @@
4141
import org.elasticsearch.action.update.UpdateResponse;
4242
import org.elasticsearch.common.Strings;
4343
import org.elasticsearch.common.bytes.BytesReference;
44+
import org.elasticsearch.common.settings.Settings;
4445
import org.elasticsearch.common.unit.ByteSizeUnit;
4546
import org.elasticsearch.common.unit.ByteSizeValue;
4647
import org.elasticsearch.common.xcontent.XContentBuilder;
4748
import org.elasticsearch.common.xcontent.XContentType;
4849
import org.elasticsearch.index.VersionType;
4950
import org.elasticsearch.index.get.GetResult;
51+
import org.elasticsearch.index.query.IdsQueryBuilder;
52+
import org.elasticsearch.index.reindex.BulkByScrollResponse;
53+
import org.elasticsearch.index.reindex.ReindexRequest;
5054
import org.elasticsearch.rest.RestStatus;
5155
import org.elasticsearch.script.Script;
5256
import org.elasticsearch.script.ScriptType;
@@ -624,6 +628,69 @@ public void testBulk() throws IOException {
624628
validateBulkResponses(nbItems, errors, bulkResponse, bulkRequest);
625629
}
626630

631+
public void testReindex() throws IOException {
632+
final String sourceIndex = "source1";
633+
final String destinationIndex = "dest";
634+
{
635+
// Prepare
636+
Settings settings = Settings.builder()
637+
.put("number_of_shards", 1)
638+
.put("number_of_replicas", 0)
639+
.build();
640+
createIndex(sourceIndex, settings);
641+
createIndex(destinationIndex, settings);
642+
assertEquals(
643+
RestStatus.OK,
644+
highLevelClient().bulk(
645+
new BulkRequest()
646+
.add(new IndexRequest(sourceIndex, "type", "1")
647+
.source(Collections.singletonMap("foo", "bar"), XContentType.JSON))
648+
.add(new IndexRequest(sourceIndex, "type", "2")
649+
.source(Collections.singletonMap("foo2", "bar2"), XContentType.JSON))
650+
.setRefreshPolicy(RefreshPolicy.IMMEDIATE),
651+
RequestOptions.DEFAULT
652+
).status()
653+
);
654+
}
655+
{
656+
// test1: create one doc in dest
657+
ReindexRequest reindexRequest = new ReindexRequest();
658+
reindexRequest.setSourceIndices(sourceIndex);
659+
reindexRequest.setDestIndex(destinationIndex);
660+
reindexRequest.setSourceQuery(new IdsQueryBuilder().addIds("1").types("type"));
661+
reindexRequest.setRefresh(true);
662+
BulkByScrollResponse bulkResponse = execute(reindexRequest, highLevelClient()::reindex, highLevelClient()::reindexAsync);
663+
assertEquals(1, bulkResponse.getCreated());
664+
assertEquals(1, bulkResponse.getTotal());
665+
assertEquals(0, bulkResponse.getDeleted());
666+
assertEquals(0, bulkResponse.getNoops());
667+
assertEquals(0, bulkResponse.getVersionConflicts());
668+
assertEquals(1, bulkResponse.getBatches());
669+
assertTrue(bulkResponse.getTook().getMillis() > 0);
670+
assertEquals(1, bulkResponse.getBatches());
671+
assertEquals(0, bulkResponse.getBulkFailures().size());
672+
assertEquals(0, bulkResponse.getSearchFailures().size());
673+
}
674+
{
675+
// test2: create 1 and update 1
676+
ReindexRequest reindexRequest = new ReindexRequest();
677+
reindexRequest.setSourceIndices(sourceIndex);
678+
reindexRequest.setDestIndex(destinationIndex);
679+
BulkByScrollResponse bulkResponse = execute(reindexRequest, highLevelClient()::reindex, highLevelClient()::reindexAsync);
680+
assertEquals(1, bulkResponse.getCreated());
681+
assertEquals(2, bulkResponse.getTotal());
682+
assertEquals(1, bulkResponse.getUpdated());
683+
assertEquals(0, bulkResponse.getDeleted());
684+
assertEquals(0, bulkResponse.getNoops());
685+
assertEquals(0, bulkResponse.getVersionConflicts());
686+
assertEquals(1, bulkResponse.getBatches());
687+
assertTrue(bulkResponse.getTook().getMillis() > 0);
688+
assertEquals(1, bulkResponse.getBatches());
689+
assertEquals(0, bulkResponse.getBulkFailures().size());
690+
assertEquals(0, bulkResponse.getSearchFailures().size());
691+
}
692+
}
693+
627694
public void testBulkProcessorIntegration() throws IOException {
628695
int nbItems = randomIntBetween(10, 100);
629696
boolean[] errors = new boolean[nbItems];

client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java

+63
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@
116116
import org.elasticsearch.common.xcontent.XContentHelper;
117117
import org.elasticsearch.common.xcontent.XContentParser;
118118
import org.elasticsearch.common.xcontent.XContentType;
119+
import org.elasticsearch.common.xcontent.json.JsonXContent;
119120
import org.elasticsearch.index.RandomCreateIndexGenerator;
120121
import org.elasticsearch.index.VersionType;
121122
import org.elasticsearch.index.query.QueryBuilder;
@@ -126,6 +127,8 @@
126127
import org.elasticsearch.index.rankeval.RankEvalSpec;
127128
import org.elasticsearch.index.rankeval.RatedRequest;
128129
import org.elasticsearch.index.rankeval.RestRankEvalAction;
130+
import org.elasticsearch.index.reindex.ReindexRequest;
131+
import org.elasticsearch.index.reindex.RemoteInfo;
129132
import org.elasticsearch.protocol.xpack.XPackInfoRequest;
130133
import org.elasticsearch.protocol.xpack.migration.IndexUpgradeInfoRequest;
131134
import org.elasticsearch.protocol.xpack.watcher.DeleteWatchRequest;
@@ -172,13 +175,15 @@
172175
import java.util.function.Supplier;
173176
import java.util.stream.Collectors;
174177

178+
import static java.util.Collections.emptyMap;
175179
import static java.util.Collections.singletonMap;
176180
import static org.elasticsearch.client.RequestConverters.REQUEST_BODY_CONTENT_TYPE;
177181
import static org.elasticsearch.client.RequestConverters.enforceSameContentType;
178182
import static org.elasticsearch.index.RandomCreateIndexGenerator.randomAliases;
179183
import static org.elasticsearch.index.RandomCreateIndexGenerator.randomCreateIndexRequest;
180184
import static org.elasticsearch.index.RandomCreateIndexGenerator.randomIndexSettings;
181185
import static org.elasticsearch.index.alias.RandomAliasActionsGenerator.randomAliasAction;
186+
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
182187
import static org.elasticsearch.search.RandomSearchRequestGenerator.randomSearchRequest;
183188
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertToXContentEquivalent;
184189
import static org.hamcrest.CoreMatchers.equalTo;
@@ -407,6 +412,64 @@ public void testUpdateAliases() throws IOException {
407412
assertToXContentBody(indicesAliasesRequest, request.getEntity());
408413
}
409414

415+
public void testReindex() throws IOException {
416+
ReindexRequest reindexRequest = new ReindexRequest();
417+
reindexRequest.setSourceIndices("source_idx");
418+
reindexRequest.setDestIndex("dest_idx");
419+
Map<String, String> expectedParams = new HashMap<>();
420+
if (randomBoolean()) {
421+
XContentBuilder builder = JsonXContent.contentBuilder().prettyPrint();
422+
RemoteInfo remoteInfo = new RemoteInfo("http", "remote-host", 9200, null,
423+
BytesReference.bytes(matchAllQuery().toXContent(builder, ToXContent.EMPTY_PARAMS)),
424+
"user",
425+
"pass",
426+
emptyMap(),
427+
RemoteInfo.DEFAULT_SOCKET_TIMEOUT,
428+
RemoteInfo.DEFAULT_CONNECT_TIMEOUT
429+
);
430+
reindexRequest.setRemoteInfo(remoteInfo);
431+
}
432+
if (randomBoolean()) {
433+
reindexRequest.setSourceDocTypes("doc", "tweet");
434+
}
435+
if (randomBoolean()) {
436+
reindexRequest.setSourceBatchSize(randomInt(100));
437+
}
438+
if (randomBoolean()) {
439+
reindexRequest.setDestDocType("tweet_and_doc");
440+
}
441+
if (randomBoolean()) {
442+
reindexRequest.setDestOpType("create");
443+
}
444+
if (randomBoolean()) {
445+
reindexRequest.setDestPipeline("my_pipeline");
446+
}
447+
if (randomBoolean()) {
448+
reindexRequest.setDestRouting("=cat");
449+
}
450+
if (randomBoolean()) {
451+
reindexRequest.setSize(randomIntBetween(100, 1000));
452+
}
453+
if (randomBoolean()) {
454+
reindexRequest.setAbortOnVersionConflict(false);
455+
}
456+
if (randomBoolean()) {
457+
String ts = randomTimeValue();
458+
reindexRequest.setScroll(TimeValue.parseTimeValue(ts, "scroll"));
459+
}
460+
if (reindexRequest.getRemoteInfo() == null && randomBoolean()) {
461+
reindexRequest.setSourceQuery(new TermQueryBuilder("foo", "fooval"));
462+
}
463+
setRandomTimeout(reindexRequest::setTimeout, ReplicationRequest.DEFAULT_TIMEOUT, expectedParams);
464+
setRandomWaitForActiveShards(reindexRequest::setWaitForActiveShards, ActiveShardCount.DEFAULT, expectedParams);
465+
expectedParams.put("scroll", reindexRequest.getScrollTime().getStringRep());
466+
Request request = RequestConverters.reindex(reindexRequest);
467+
assertEquals("/_reindex", request.getEndpoint());
468+
assertEquals(HttpPost.METHOD_NAME, request.getMethod());
469+
assertEquals(expectedParams, request.getParameters());
470+
assertToXContentBody(reindexRequest, request.getEntity());
471+
}
472+
410473
public void testPutMapping() throws IOException {
411474
PutMappingRequest putMappingRequest = new PutMappingRequest();
412475

client/rest-high-level/src/test/java/org/elasticsearch/client/RestHighLevelClientTests.java

-1
Original file line numberDiff line numberDiff line change
@@ -660,7 +660,6 @@ public void testApiNamingConventions() throws Exception {
660660
"indices.put_alias",
661661
"mtermvectors",
662662
"put_script",
663-
"reindex",
664663
"reindex_rethrottle",
665664
"render_search_template",
666665
"scripts_painless_execute",

0 commit comments

Comments
 (0)