Skip to content

Commit 9e36add

Browse files
committed
Propogate version in reindex from remote search (elastic#42412)
This is related to elastic#31908. In order to use the external version in a reindex from remote request, the search request must be configured to request the version (as it is not returned by default). This commit modifies the search request to request the version. Additionally, it modifies our current reindex from remote tests to randomly use the external version_type.
1 parent 2de919e commit 9e36add

File tree

5 files changed

+81
-41
lines changed

5 files changed

+81
-41
lines changed

modules/reindex/build.gradle

+1-1
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ forbiddenPatterns {
7777
exclude '**/*.p12'
7878
}
7979

80-
// Support for testing reindex-from-remote against old Elaticsearch versions
80+
// Support for testing reindex-from-remote against old Elasticsearch versions
8181
configurations {
8282
oldesFixture
8383
es2

modules/reindex/src/main/java/org/elasticsearch/index/reindex/remote/RemoteRequestBuilders.java

+6-7
Original file line numberDiff line numberDiff line change
@@ -84,14 +84,13 @@ static Request initialSearch(SearchRequest searchRequest, BytesReference query,
8484
request.addParameter("scroll", keepAlive.getStringRep());
8585
}
8686
request.addParameter("size", Integer.toString(searchRequest.source().size()));
87-
if (searchRequest.source().version() == null || searchRequest.source().version() == true) {
88-
/*
89-
* Passing `null` here just add the `version` request parameter
90-
* without any value. This way of requesting the version works
91-
* for all supported versions of Elasticsearch.
92-
*/
93-
request.addParameter("version", null);
87+
88+
if (searchRequest.source().version() == null || searchRequest.source().version() == false) {
89+
request.addParameter("version", Boolean.FALSE.toString());
90+
} else {
91+
request.addParameter("version", Boolean.TRUE.toString());
9492
}
93+
9594
if (searchRequest.source().sorts() != null) {
9695
boolean useScan = false;
9796
// Detect if we should use search_type=scan rather than a sort

modules/reindex/src/test/java/org/elasticsearch/index/reindex/ManyDocumentsIT.java

+28-11
Original file line numberDiff line numberDiff line change
@@ -73,18 +73,35 @@ public void testReindexFromRemote() throws IOException {
7373
Map<?, ?> http = (Map<?, ?>) nodeInfo.get("http");
7474
String remote = "http://"+ http.get("publish_address");
7575
Request request = new Request("POST", "/_reindex");
76-
request.setJsonEntity(
76+
if (randomBoolean()) {
77+
request.setJsonEntity(
7778
"{\n" +
78-
" \"source\":{\n" +
79-
" \"index\":\"test\",\n" +
80-
" \"remote\":{\n" +
81-
" \"host\":\"" + remote + "\"\n" +
82-
" }\n" +
83-
" }\n," +
84-
" \"dest\":{\n" +
85-
" \"index\":\"des\"\n" +
86-
" }\n" +
87-
"}");
79+
" \"source\":{\n" +
80+
" \"index\":\"test\",\n" +
81+
" \"remote\":{\n" +
82+
" \"host\":\"" + remote + "\"\n" +
83+
" }\n" +
84+
" }\n," +
85+
" \"dest\":{\n" +
86+
" \"index\":\"des\"\n" +
87+
" }\n" +
88+
"}");
89+
} else {
90+
// Test with external version_type
91+
request.setJsonEntity(
92+
"{\n" +
93+
" \"source\":{\n" +
94+
" \"index\":\"test\",\n" +
95+
" \"remote\":{\n" +
96+
" \"host\":\"" + remote + "\"\n" +
97+
" }\n" +
98+
" }\n," +
99+
" \"dest\":{\n" +
100+
" \"index\":\"des\",\n" +
101+
" \"version_type\": \"external\"\n" +
102+
" }\n" +
103+
"}");
104+
}
88105
Map<String, Object> response = entityAsMap(client().performRequest(request));
89106
assertThat(response, hasEntry("total", count));
90107
assertThat(response, hasEntry("created", count));

modules/reindex/src/test/java/org/elasticsearch/index/reindex/remote/ReindexFromOldRemoteIT.java

+31-12
Original file line numberDiff line numberDiff line change
@@ -56,19 +56,38 @@ private void oldEsTestCase(String portPropertyName, String requestsPerSecond) th
5656
}
5757

5858
Request reindex = new Request("POST", "/_reindex");
59-
reindex.setJsonEntity(
59+
if (randomBoolean()) {
60+
// Reindex using the external version_type
61+
reindex.setJsonEntity(
6062
"{\n"
61-
+ " \"source\":{\n"
62-
+ " \"index\": \"test\",\n"
63-
+ " \"size\": 1,\n"
64-
+ " \"remote\": {\n"
65-
+ " \"host\": \"http://127.0.0.1:" + oldEsPort + "\"\n"
66-
+ " }\n"
67-
+ " },\n"
68-
+ " \"dest\": {\n"
69-
+ " \"index\": \"test\"\n"
70-
+ " }\n"
71-
+ "}");
63+
+ " \"source\":{\n"
64+
+ " \"index\": \"test\",\n"
65+
+ " \"size\": 1,\n"
66+
+ " \"remote\": {\n"
67+
+ " \"host\": \"http://127.0.0.1:" + oldEsPort + "\"\n"
68+
+ " }\n"
69+
+ " },\n"
70+
+ " \"dest\": {\n"
71+
+ " \"index\": \"test\",\n"
72+
+ " \"version_type\": \"external\"\n"
73+
+ " }\n"
74+
+ "}");
75+
} else {
76+
// Reindex using the default internal version_type
77+
reindex.setJsonEntity(
78+
"{\n"
79+
+ " \"source\":{\n"
80+
+ " \"index\": \"test\",\n"
81+
+ " \"size\": 1,\n"
82+
+ " \"remote\": {\n"
83+
+ " \"host\": \"http://127.0.0.1:" + oldEsPort + "\"\n"
84+
+ " }\n"
85+
+ " },\n"
86+
+ " \"dest\": {\n"
87+
+ " \"index\": \"test\"\n"
88+
+ " }\n"
89+
+ "}");
90+
}
7291
reindex.addParameter("refresh", "true");
7392
reindex.addParameter("pretty", "true");
7493
if (requestsPerSecond != null) {

modules/reindex/src/test/java/org/elasticsearch/index/reindex/remote/RemoteRequestBuildersTests.java

+15-10
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ public void testInitialSearchParamsFields() {
141141
// Test request without any fields
142142
Version remoteVersion = Version.fromId(between(2000099, Version.CURRENT.id));
143143
assertThat(initialSearch(searchRequest, query, remoteVersion).getParameters(),
144-
not(either(hasKey("stored_fields")).or(hasKey("fields"))));
144+
not(either(hasKey("stored_fields")).or(hasKey("fields"))));
145145

146146
// Test stored_fields for versions that support it
147147
searchRequest = new SearchRequest().source(new SearchSourceBuilder());
@@ -162,14 +162,14 @@ public void testInitialSearchParamsFields() {
162162
searchRequest.source().storedField("_source").storedField("_id");
163163
remoteVersion = Version.fromId(between(0, 2000099 - 1));
164164
assertThat(initialSearch(searchRequest, query, remoteVersion).getParameters(),
165-
hasEntry("fields", "_source,_id,_parent,_routing,_ttl"));
165+
hasEntry("fields", "_source,_id,_parent,_routing,_ttl"));
166166

167167
// But only versions before 1.0 force _source to be in the list
168168
searchRequest = new SearchRequest().source(new SearchSourceBuilder());
169169
searchRequest.source().storedField("_id");
170170
remoteVersion = Version.fromId(between(1000099, 2000099 - 1));
171171
assertThat(initialSearch(searchRequest, query, remoteVersion).getParameters(),
172-
hasEntry("fields", "_id,_parent,_routing,_ttl"));
172+
hasEntry("fields", "_id,_parent,_routing,_ttl"));
173173
}
174174

175175
public void testInitialSearchParamsMisc() {
@@ -189,7 +189,7 @@ public void testInitialSearchParamsMisc() {
189189
fetchVersion = randomBoolean();
190190
searchRequest.source().version(fetchVersion);
191191
}
192-
192+
193193
Map<String, String> params = initialSearch(searchRequest, query, remoteVersion).getParameters();
194194

195195
if (scroll == null) {
@@ -198,7 +198,12 @@ public void testInitialSearchParamsMisc() {
198198
assertScroll(remoteVersion, params, scroll);
199199
}
200200
assertThat(params, hasEntry("size", Integer.toString(size)));
201-
assertThat(params, fetchVersion == null || fetchVersion == true ? hasEntry("version", null) : not(hasEntry("version", null)));
201+
if (fetchVersion != null) {
202+
assertThat(params, fetchVersion ? hasEntry("version", Boolean.TRUE.toString()) :
203+
hasEntry("version", Boolean.FALSE.toString()));
204+
} else {
205+
assertThat(params, hasEntry("version", Boolean.FALSE.toString()));
206+
}
202207
}
203208

204209
private void assertScroll(Version remoteVersion, Map<String, String> params, TimeValue requested) {
@@ -225,22 +230,22 @@ public void testInitialSearchEntity() throws IOException {
225230
assertEquals(ContentType.APPLICATION_JSON.toString(), entity.getContentType().getValue());
226231
if (remoteVersion.onOrAfter(Version.fromId(1000099))) {
227232
assertEquals("{\"query\":" + query + ",\"_source\":true}",
228-
Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8)));
233+
Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8)));
229234
} else {
230235
assertEquals("{\"query\":" + query + "}",
231-
Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8)));
236+
Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8)));
232237
}
233238

234239
// Source filtering is included if set up
235-
searchRequest.source().fetchSource(new String[] {"in1", "in2"}, new String[] {"out"});
240+
searchRequest.source().fetchSource(new String[]{"in1", "in2"}, new String[]{"out"});
236241
entity = initialSearch(searchRequest, new BytesArray(query), remoteVersion).getEntity();
237242
assertEquals(ContentType.APPLICATION_JSON.toString(), entity.getContentType().getValue());
238243
assertEquals("{\"query\":" + query + ",\"_source\":{\"includes\":[\"in1\",\"in2\"],\"excludes\":[\"out\"]}}",
239-
Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8)));
244+
Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8)));
240245

241246
// Invalid XContent fails
242247
RuntimeException e = expectThrows(RuntimeException.class,
243-
() -> initialSearch(searchRequest, new BytesArray("{}, \"trailing\": {}"), remoteVersion));
248+
() -> initialSearch(searchRequest, new BytesArray("{}, \"trailing\": {}"), remoteVersion));
244249
assertThat(e.getCause().getMessage(), containsString("Unexpected character (',' (code 44))"));
245250
e = expectThrows(RuntimeException.class, () -> initialSearch(searchRequest, new BytesArray("{"), remoteVersion));
246251
assertThat(e.getCause().getMessage(), containsString("Unexpected end-of-input"));

0 commit comments

Comments
 (0)