Skip to content

Commit 4dc899c

Browse files
sohaibiftikharnik9000
authored andcommitted
HLREST: add reindex API (#32679)
Adds the reindex API to the high level REST client.
1 parent 5856d45 commit 4dc899c

File tree

27 files changed

+1648
-72
lines changed

27 files changed

+1648
-72
lines changed

client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java

+19-3
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@
107107
import org.elasticsearch.common.xcontent.XContentType;
108108
import org.elasticsearch.index.VersionType;
109109
import org.elasticsearch.index.rankeval.RankEvalRequest;
110+
import org.elasticsearch.index.reindex.ReindexRequest;
110111
import org.elasticsearch.protocol.xpack.XPackInfoRequest;
111112
import org.elasticsearch.protocol.xpack.XPackUsageRequest;
112113
import org.elasticsearch.protocol.xpack.license.DeleteLicenseRequest;
@@ -832,6 +833,21 @@ static Request clusterHealth(ClusterHealthRequest healthRequest) {
832833
return request;
833834
}
834835

836+
static Request reindex(ReindexRequest reindexRequest) throws IOException {
837+
String endpoint = new EndpointBuilder().addPathPart("_reindex").build();
838+
Request request = new Request(HttpPost.METHOD_NAME, endpoint);
839+
Params params = new Params(request)
840+
.withRefresh(reindexRequest.isRefresh())
841+
.withTimeout(reindexRequest.getTimeout())
842+
.withWaitForActiveShards(reindexRequest.getWaitForActiveShards(), ActiveShardCount.DEFAULT);
843+
844+
if (reindexRequest.getScrollTime() != null) {
845+
params.putParam("scroll", reindexRequest.getScrollTime());
846+
}
847+
request.setEntity(createEntity(reindexRequest, REQUEST_BODY_CONTENT_TYPE));
848+
return request;
849+
}
850+
835851
static Request rollover(RolloverRequest rolloverRequest) throws IOException {
836852
String endpoint = new EndpointBuilder().addPathPart(rolloverRequest.getAlias()).addPathPartAsIs("_rollover")
837853
.addPathPart(rolloverRequest.getNewIndexName()).build();
@@ -1140,10 +1156,10 @@ static Request xPackInfo(XPackInfoRequest infoRequest) {
11401156
static Request xPackGraphExplore(GraphExploreRequest exploreRequest) throws IOException {
11411157
String endpoint = endpoint(exploreRequest.indices(), exploreRequest.types(), "_xpack/graph/_explore");
11421158
Request request = new Request(HttpGet.METHOD_NAME, endpoint);
1143-
request.setEntity(createEntity(exploreRequest, REQUEST_BODY_CONTENT_TYPE));
1159+
request.setEntity(createEntity(exploreRequest, REQUEST_BODY_CONTENT_TYPE));
11441160
return request;
1145-
}
1146-
1161+
}
1162+
11471163
static Request xPackWatcherPutWatch(PutWatchRequest putWatchRequest) {
11481164
String endpoint = new EndpointBuilder()
11491165
.addPathPartAsIs("_xpack")

client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java

+31-2
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,8 @@
6565
import org.elasticsearch.common.xcontent.XContentType;
6666
import org.elasticsearch.index.rankeval.RankEvalRequest;
6767
import org.elasticsearch.index.rankeval.RankEvalResponse;
68+
import org.elasticsearch.index.reindex.BulkByScrollResponse;
69+
import org.elasticsearch.index.reindex.ReindexRequest;
6870
import org.elasticsearch.plugins.spi.NamedXContentProvider;
6971
import org.elasticsearch.rest.BytesRestResponse;
7072
import org.elasticsearch.rest.RestStatus;
@@ -323,7 +325,7 @@ public final XPackClient xpack() {
323325
* Watcher APIs on elastic.co</a> for more information.
324326
*/
325327
public WatcherClient watcher() { return watcherClient; }
326-
328+
327329
/**
328330
* Provides methods for accessing the Elastic Licensed Graph explore API that
329331
* is shipped with the default distribution of Elasticsearch. All of
@@ -332,7 +334,7 @@ public final XPackClient xpack() {
332334
* See the <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/graph-explore-api.html">
333335
* Graph API on elastic.co</a> for more information.
334336
*/
335-
public GraphClient graph() { return graphClient; }
337+
public GraphClient graph() { return graphClient; }
336338

337339
/**
338340
* Provides methods for accessing the Elastic Licensed Licensing APIs that
@@ -415,6 +417,33 @@ public final void bulkAsync(BulkRequest bulkRequest, ActionListener<BulkResponse
415417
performRequestAsyncAndParseEntity(bulkRequest, RequestConverters::bulk, BulkResponse::fromXContent, listener, emptySet(), headers);
416418
}
417419

420+
/**
421+
* Executes a reindex request.
422+
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html">Reindex API on elastic.co</a>
423+
* @param reindexRequest the request
424+
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
425+
* @return the response
426+
* @throws IOException in case there is a problem sending the request or parsing back the response
427+
*/
428+
public final BulkByScrollResponse reindex(ReindexRequest reindexRequest, RequestOptions options) throws IOException {
429+
return performRequestAndParseEntity(
430+
reindexRequest, RequestConverters::reindex, options, BulkByScrollResponse::fromXContent, emptySet()
431+
);
432+
}
433+
434+
/**
435+
* Asynchronously executes a reindex request.
436+
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html">Reindex API on elastic.co</a>
437+
* @param reindexRequest the request
438+
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
439+
* @param listener the listener to be notified upon request completion
440+
*/
441+
public final void reindexAsync(ReindexRequest reindexRequest, RequestOptions options, ActionListener<BulkByScrollResponse> listener) {
442+
performRequestAsyncAndParseEntity(
443+
reindexRequest, RequestConverters::reindex, options, BulkByScrollResponse::fromXContent, listener, emptySet()
444+
);
445+
}
446+
418447
/**
419448
* Pings the remote Elasticsearch cluster and returns true if the ping succeeded, false otherwise
420449
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized

client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java

+67
Original file line numberDiff line numberDiff line change
@@ -41,12 +41,16 @@
4141
import org.elasticsearch.action.update.UpdateResponse;
4242
import org.elasticsearch.common.Strings;
4343
import org.elasticsearch.common.bytes.BytesReference;
44+
import org.elasticsearch.common.settings.Settings;
4445
import org.elasticsearch.common.unit.ByteSizeUnit;
4546
import org.elasticsearch.common.unit.ByteSizeValue;
4647
import org.elasticsearch.common.xcontent.XContentBuilder;
4748
import org.elasticsearch.common.xcontent.XContentType;
4849
import org.elasticsearch.index.VersionType;
4950
import org.elasticsearch.index.get.GetResult;
51+
import org.elasticsearch.index.query.IdsQueryBuilder;
52+
import org.elasticsearch.index.reindex.BulkByScrollResponse;
53+
import org.elasticsearch.index.reindex.ReindexRequest;
5054
import org.elasticsearch.rest.RestStatus;
5155
import org.elasticsearch.script.Script;
5256
import org.elasticsearch.script.ScriptType;
@@ -689,6 +693,69 @@ public void testBulk() throws IOException {
689693
validateBulkResponses(nbItems, errors, bulkResponse, bulkRequest);
690694
}
691695

696+
public void testReindex() throws IOException {
697+
final String sourceIndex = "source1";
698+
final String destinationIndex = "dest";
699+
{
700+
// Prepare
701+
Settings settings = Settings.builder()
702+
.put("number_of_shards", 1)
703+
.put("number_of_replicas", 0)
704+
.build();
705+
createIndex(sourceIndex, settings);
706+
createIndex(destinationIndex, settings);
707+
assertEquals(
708+
RestStatus.OK,
709+
highLevelClient().bulk(
710+
new BulkRequest()
711+
.add(new IndexRequest(sourceIndex, "type", "1")
712+
.source(Collections.singletonMap("foo", "bar"), XContentType.JSON))
713+
.add(new IndexRequest(sourceIndex, "type", "2")
714+
.source(Collections.singletonMap("foo2", "bar2"), XContentType.JSON))
715+
.setRefreshPolicy(RefreshPolicy.IMMEDIATE),
716+
RequestOptions.DEFAULT
717+
).status()
718+
);
719+
}
720+
{
721+
// test1: create one doc in dest
722+
ReindexRequest reindexRequest = new ReindexRequest();
723+
reindexRequest.setSourceIndices(sourceIndex);
724+
reindexRequest.setDestIndex(destinationIndex);
725+
reindexRequest.setSourceQuery(new IdsQueryBuilder().addIds("1").types("type"));
726+
reindexRequest.setRefresh(true);
727+
BulkByScrollResponse bulkResponse = execute(reindexRequest, highLevelClient()::reindex, highLevelClient()::reindexAsync);
728+
assertEquals(1, bulkResponse.getCreated());
729+
assertEquals(1, bulkResponse.getTotal());
730+
assertEquals(0, bulkResponse.getDeleted());
731+
assertEquals(0, bulkResponse.getNoops());
732+
assertEquals(0, bulkResponse.getVersionConflicts());
733+
assertEquals(1, bulkResponse.getBatches());
734+
assertTrue(bulkResponse.getTook().getMillis() > 0);
735+
assertEquals(1, bulkResponse.getBatches());
736+
assertEquals(0, bulkResponse.getBulkFailures().size());
737+
assertEquals(0, bulkResponse.getSearchFailures().size());
738+
}
739+
{
740+
// test2: create 1 and update 1
741+
ReindexRequest reindexRequest = new ReindexRequest();
742+
reindexRequest.setSourceIndices(sourceIndex);
743+
reindexRequest.setDestIndex(destinationIndex);
744+
BulkByScrollResponse bulkResponse = execute(reindexRequest, highLevelClient()::reindex, highLevelClient()::reindexAsync);
745+
assertEquals(1, bulkResponse.getCreated());
746+
assertEquals(2, bulkResponse.getTotal());
747+
assertEquals(1, bulkResponse.getUpdated());
748+
assertEquals(0, bulkResponse.getDeleted());
749+
assertEquals(0, bulkResponse.getNoops());
750+
assertEquals(0, bulkResponse.getVersionConflicts());
751+
assertEquals(1, bulkResponse.getBatches());
752+
assertTrue(bulkResponse.getTook().getMillis() > 0);
753+
assertEquals(1, bulkResponse.getBatches());
754+
assertEquals(0, bulkResponse.getBulkFailures().size());
755+
assertEquals(0, bulkResponse.getSearchFailures().size());
756+
}
757+
}
758+
692759
public void testBulkProcessorIntegration() throws IOException {
693760
int nbItems = randomIntBetween(10, 100);
694761
boolean[] errors = new boolean[nbItems];

client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java

+63
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@
117117
import org.elasticsearch.common.xcontent.XContentHelper;
118118
import org.elasticsearch.common.xcontent.XContentParser;
119119
import org.elasticsearch.common.xcontent.XContentType;
120+
import org.elasticsearch.common.xcontent.json.JsonXContent;
120121
import org.elasticsearch.index.RandomCreateIndexGenerator;
121122
import org.elasticsearch.index.VersionType;
122123
import org.elasticsearch.index.query.QueryBuilder;
@@ -127,6 +128,8 @@
127128
import org.elasticsearch.index.rankeval.RankEvalSpec;
128129
import org.elasticsearch.index.rankeval.RatedRequest;
129130
import org.elasticsearch.index.rankeval.RestRankEvalAction;
131+
import org.elasticsearch.index.reindex.ReindexRequest;
132+
import org.elasticsearch.index.reindex.RemoteInfo;
130133
import org.elasticsearch.protocol.xpack.XPackInfoRequest;
131134
import org.elasticsearch.protocol.xpack.migration.IndexUpgradeInfoRequest;
132135
import org.elasticsearch.protocol.xpack.watcher.DeleteWatchRequest;
@@ -173,13 +176,15 @@
173176
import java.util.function.Supplier;
174177
import java.util.stream.Collectors;
175178

179+
import static java.util.Collections.emptyMap;
176180
import static java.util.Collections.singletonMap;
177181
import static org.elasticsearch.client.RequestConverters.REQUEST_BODY_CONTENT_TYPE;
178182
import static org.elasticsearch.client.RequestConverters.enforceSameContentType;
179183
import static org.elasticsearch.index.RandomCreateIndexGenerator.randomAliases;
180184
import static org.elasticsearch.index.RandomCreateIndexGenerator.randomCreateIndexRequest;
181185
import static org.elasticsearch.index.RandomCreateIndexGenerator.randomIndexSettings;
182186
import static org.elasticsearch.index.alias.RandomAliasActionsGenerator.randomAliasAction;
187+
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
183188
import static org.elasticsearch.search.RandomSearchRequestGenerator.randomSearchRequest;
184189
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertToXContentEquivalent;
185190
import static org.hamcrest.CoreMatchers.equalTo;
@@ -417,6 +422,64 @@ public void testUpdateAliases() throws IOException {
417422
assertToXContentBody(indicesAliasesRequest, request.getEntity());
418423
}
419424

425+
public void testReindex() throws IOException {
426+
ReindexRequest reindexRequest = new ReindexRequest();
427+
reindexRequest.setSourceIndices("source_idx");
428+
reindexRequest.setDestIndex("dest_idx");
429+
Map<String, String> expectedParams = new HashMap<>();
430+
if (randomBoolean()) {
431+
XContentBuilder builder = JsonXContent.contentBuilder().prettyPrint();
432+
RemoteInfo remoteInfo = new RemoteInfo("http", "remote-host", 9200, null,
433+
BytesReference.bytes(matchAllQuery().toXContent(builder, ToXContent.EMPTY_PARAMS)),
434+
"user",
435+
"pass",
436+
emptyMap(),
437+
RemoteInfo.DEFAULT_SOCKET_TIMEOUT,
438+
RemoteInfo.DEFAULT_CONNECT_TIMEOUT
439+
);
440+
reindexRequest.setRemoteInfo(remoteInfo);
441+
}
442+
if (randomBoolean()) {
443+
reindexRequest.setSourceDocTypes("doc", "tweet");
444+
}
445+
if (randomBoolean()) {
446+
reindexRequest.setSourceBatchSize(randomInt(100));
447+
}
448+
if (randomBoolean()) {
449+
reindexRequest.setDestDocType("tweet_and_doc");
450+
}
451+
if (randomBoolean()) {
452+
reindexRequest.setDestOpType("create");
453+
}
454+
if (randomBoolean()) {
455+
reindexRequest.setDestPipeline("my_pipeline");
456+
}
457+
if (randomBoolean()) {
458+
reindexRequest.setDestRouting("=cat");
459+
}
460+
if (randomBoolean()) {
461+
reindexRequest.setSize(randomIntBetween(100, 1000));
462+
}
463+
if (randomBoolean()) {
464+
reindexRequest.setAbortOnVersionConflict(false);
465+
}
466+
if (randomBoolean()) {
467+
String ts = randomTimeValue();
468+
reindexRequest.setScroll(TimeValue.parseTimeValue(ts, "scroll"));
469+
}
470+
if (reindexRequest.getRemoteInfo() == null && randomBoolean()) {
471+
reindexRequest.setSourceQuery(new TermQueryBuilder("foo", "fooval"));
472+
}
473+
setRandomTimeout(reindexRequest::setTimeout, ReplicationRequest.DEFAULT_TIMEOUT, expectedParams);
474+
setRandomWaitForActiveShards(reindexRequest::setWaitForActiveShards, ActiveShardCount.DEFAULT, expectedParams);
475+
expectedParams.put("scroll", reindexRequest.getScrollTime().getStringRep());
476+
Request request = RequestConverters.reindex(reindexRequest);
477+
assertEquals("/_reindex", request.getEndpoint());
478+
assertEquals(HttpPost.METHOD_NAME, request.getMethod());
479+
assertEquals(expectedParams, request.getParameters());
480+
assertToXContentBody(reindexRequest, request.getEntity());
481+
}
482+
420483
public void testPutMapping() throws IOException {
421484
PutMappingRequest putMappingRequest = new PutMappingRequest();
422485

client/rest-high-level/src/test/java/org/elasticsearch/client/RestHighLevelClientTests.java

-1
Original file line numberDiff line numberDiff line change
@@ -678,7 +678,6 @@ public void testApiNamingConventions() throws Exception {
678678
"indices.put_alias",
679679
"mtermvectors",
680680
"put_script",
681-
"reindex",
682681
"reindex_rethrottle",
683682
"render_search_template",
684683
"scripts_painless_execute",

0 commit comments

Comments
 (0)