Skip to content

Commit d2d6167

Browse files
authored
Propogate version in reindex from remote search (#42412)
This is related to #31908. In order to use the external version in a reindex from remote request, the search request must be configured to request the version (as it is not returned by default). This commit modifies the search request to request the version. Additionally, it modifies our current reindex from remote tests to randomly use the external version_type.
1 parent 35b6239 commit d2d6167

File tree

5 files changed

+81
-41
lines changed

5 files changed

+81
-41
lines changed

modules/reindex/build.gradle

+1-1
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ forbiddenPatterns {
7777
exclude '**/*.p12'
7878
}
7979

80-
// Support for testing reindex-from-remote against old Elaticsearch versions
80+
// Support for testing reindex-from-remote against old Elasticsearch versions
8181
configurations {
8282
oldesFixture
8383
es2

modules/reindex/src/main/java/org/elasticsearch/index/reindex/remote/RemoteRequestBuilders.java

+6-7
Original file line numberDiff line numberDiff line change
@@ -74,14 +74,13 @@ static Request initialSearch(SearchRequest searchRequest, BytesReference query,
7474
request.addParameter("scroll", keepAlive.getStringRep());
7575
}
7676
request.addParameter("size", Integer.toString(searchRequest.source().size()));
77-
if (searchRequest.source().version() == null || searchRequest.source().version() == true) {
78-
/*
79-
* Passing `null` here just add the `version` request parameter
80-
* without any value. This way of requesting the version works
81-
* for all supported versions of Elasticsearch.
82-
*/
83-
request.addParameter("version", null);
77+
78+
if (searchRequest.source().version() == null || searchRequest.source().version() == false) {
79+
request.addParameter("version", Boolean.FALSE.toString());
80+
} else {
81+
request.addParameter("version", Boolean.TRUE.toString());
8482
}
83+
8584
if (searchRequest.source().sorts() != null) {
8685
boolean useScan = false;
8786
// Detect if we should use search_type=scan rather than a sort

modules/reindex/src/test/java/org/elasticsearch/index/reindex/ManyDocumentsIT.java

+28-11
Original file line numberDiff line numberDiff line change
@@ -73,18 +73,35 @@ public void testReindexFromRemote() throws IOException {
7373
Map<?, ?> http = (Map<?, ?>) nodeInfo.get("http");
7474
String remote = "http://"+ http.get("publish_address");
7575
Request request = new Request("POST", "/_reindex");
76-
request.setJsonEntity(
76+
if (randomBoolean()) {
77+
request.setJsonEntity(
7778
"{\n" +
78-
" \"source\":{\n" +
79-
" \"index\":\"test\",\n" +
80-
" \"remote\":{\n" +
81-
" \"host\":\"" + remote + "\"\n" +
82-
" }\n" +
83-
" }\n," +
84-
" \"dest\":{\n" +
85-
" \"index\":\"des\"\n" +
86-
" }\n" +
87-
"}");
79+
" \"source\":{\n" +
80+
" \"index\":\"test\",\n" +
81+
" \"remote\":{\n" +
82+
" \"host\":\"" + remote + "\"\n" +
83+
" }\n" +
84+
" }\n," +
85+
" \"dest\":{\n" +
86+
" \"index\":\"des\"\n" +
87+
" }\n" +
88+
"}");
89+
} else {
90+
// Test with external version_type
91+
request.setJsonEntity(
92+
"{\n" +
93+
" \"source\":{\n" +
94+
" \"index\":\"test\",\n" +
95+
" \"remote\":{\n" +
96+
" \"host\":\"" + remote + "\"\n" +
97+
" }\n" +
98+
" }\n," +
99+
" \"dest\":{\n" +
100+
" \"index\":\"des\",\n" +
101+
" \"version_type\": \"external\"\n" +
102+
" }\n" +
103+
"}");
104+
}
88105
Map<String, Object> response = entityAsMap(client().performRequest(request));
89106
assertThat(response, hasEntry("total", count));
90107
assertThat(response, hasEntry("created", count));

modules/reindex/src/test/java/org/elasticsearch/index/reindex/remote/ReindexFromOldRemoteIT.java

+31-12
Original file line numberDiff line numberDiff line change
@@ -56,19 +56,38 @@ private void oldEsTestCase(String portPropertyName, String requestsPerSecond) th
5656
}
5757

5858
Request reindex = new Request("POST", "/_reindex");
59-
reindex.setJsonEntity(
59+
if (randomBoolean()) {
60+
// Reindex using the external version_type
61+
reindex.setJsonEntity(
6062
"{\n"
61-
+ " \"source\":{\n"
62-
+ " \"index\": \"test\",\n"
63-
+ " \"size\": 1,\n"
64-
+ " \"remote\": {\n"
65-
+ " \"host\": \"http://127.0.0.1:" + oldEsPort + "\"\n"
66-
+ " }\n"
67-
+ " },\n"
68-
+ " \"dest\": {\n"
69-
+ " \"index\": \"test\"\n"
70-
+ " }\n"
71-
+ "}");
63+
+ " \"source\":{\n"
64+
+ " \"index\": \"test\",\n"
65+
+ " \"size\": 1,\n"
66+
+ " \"remote\": {\n"
67+
+ " \"host\": \"http://127.0.0.1:" + oldEsPort + "\"\n"
68+
+ " }\n"
69+
+ " },\n"
70+
+ " \"dest\": {\n"
71+
+ " \"index\": \"test\",\n"
72+
+ " \"version_type\": \"external\"\n"
73+
+ " }\n"
74+
+ "}");
75+
} else {
76+
// Reindex using the default internal version_type
77+
reindex.setJsonEntity(
78+
"{\n"
79+
+ " \"source\":{\n"
80+
+ " \"index\": \"test\",\n"
81+
+ " \"size\": 1,\n"
82+
+ " \"remote\": {\n"
83+
+ " \"host\": \"http://127.0.0.1:" + oldEsPort + "\"\n"
84+
+ " }\n"
85+
+ " },\n"
86+
+ " \"dest\": {\n"
87+
+ " \"index\": \"test\"\n"
88+
+ " }\n"
89+
+ "}");
90+
}
7291
reindex.addParameter("refresh", "true");
7392
reindex.addParameter("pretty", "true");
7493
if (requestsPerSecond != null) {

modules/reindex/src/test/java/org/elasticsearch/index/reindex/remote/RemoteRequestBuildersTests.java

+15-10
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ public void testInitialSearchParamsFields() {
113113
// Test request without any fields
114114
Version remoteVersion = Version.fromId(between(2000099, Version.CURRENT.id));
115115
assertThat(initialSearch(searchRequest, query, remoteVersion).getParameters(),
116-
not(either(hasKey("stored_fields")).or(hasKey("fields"))));
116+
not(either(hasKey("stored_fields")).or(hasKey("fields"))));
117117

118118
// Test stored_fields for versions that support it
119119
searchRequest = new SearchRequest().source(new SearchSourceBuilder());
@@ -134,14 +134,14 @@ public void testInitialSearchParamsFields() {
134134
searchRequest.source().storedField("_source").storedField("_id");
135135
remoteVersion = Version.fromId(between(0, 2000099 - 1));
136136
assertThat(initialSearch(searchRequest, query, remoteVersion).getParameters(),
137-
hasEntry("fields", "_source,_id,_parent,_routing,_ttl"));
137+
hasEntry("fields", "_source,_id,_parent,_routing,_ttl"));
138138

139139
// But only versions before 1.0 force _source to be in the list
140140
searchRequest = new SearchRequest().source(new SearchSourceBuilder());
141141
searchRequest.source().storedField("_id");
142142
remoteVersion = Version.fromId(between(1000099, 2000099 - 1));
143143
assertThat(initialSearch(searchRequest, query, remoteVersion).getParameters(),
144-
hasEntry("fields", "_id,_parent,_routing,_ttl"));
144+
hasEntry("fields", "_id,_parent,_routing,_ttl"));
145145
}
146146

147147
public void testInitialSearchParamsMisc() {
@@ -161,7 +161,7 @@ public void testInitialSearchParamsMisc() {
161161
fetchVersion = randomBoolean();
162162
searchRequest.source().version(fetchVersion);
163163
}
164-
164+
165165
Map<String, String> params = initialSearch(searchRequest, query, remoteVersion).getParameters();
166166

167167
if (scroll == null) {
@@ -170,7 +170,12 @@ public void testInitialSearchParamsMisc() {
170170
assertScroll(remoteVersion, params, scroll);
171171
}
172172
assertThat(params, hasEntry("size", Integer.toString(size)));
173-
assertThat(params, fetchVersion == null || fetchVersion == true ? hasEntry("version", null) : not(hasEntry("version", null)));
173+
if (fetchVersion != null) {
174+
assertThat(params, fetchVersion ? hasEntry("version", Boolean.TRUE.toString()) :
175+
hasEntry("version", Boolean.FALSE.toString()));
176+
} else {
177+
assertThat(params, hasEntry("version", Boolean.FALSE.toString()));
178+
}
174179
}
175180

176181
private void assertScroll(Version remoteVersion, Map<String, String> params, TimeValue requested) {
@@ -197,22 +202,22 @@ public void testInitialSearchEntity() throws IOException {
197202
assertEquals(ContentType.APPLICATION_JSON.toString(), entity.getContentType().getValue());
198203
if (remoteVersion.onOrAfter(Version.fromId(1000099))) {
199204
assertEquals("{\"query\":" + query + ",\"_source\":true}",
200-
Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8)));
205+
Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8)));
201206
} else {
202207
assertEquals("{\"query\":" + query + "}",
203-
Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8)));
208+
Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8)));
204209
}
205210

206211
// Source filtering is included if set up
207-
searchRequest.source().fetchSource(new String[] {"in1", "in2"}, new String[] {"out"});
212+
searchRequest.source().fetchSource(new String[]{"in1", "in2"}, new String[]{"out"});
208213
entity = initialSearch(searchRequest, new BytesArray(query), remoteVersion).getEntity();
209214
assertEquals(ContentType.APPLICATION_JSON.toString(), entity.getContentType().getValue());
210215
assertEquals("{\"query\":" + query + ",\"_source\":{\"includes\":[\"in1\",\"in2\"],\"excludes\":[\"out\"]}}",
211-
Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8)));
216+
Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8)));
212217

213218
// Invalid XContent fails
214219
RuntimeException e = expectThrows(RuntimeException.class,
215-
() -> initialSearch(searchRequest, new BytesArray("{}, \"trailing\": {}"), remoteVersion));
220+
() -> initialSearch(searchRequest, new BytesArray("{}, \"trailing\": {}"), remoteVersion));
216221
assertThat(e.getCause().getMessage(), containsString("Unexpected character (',' (code 44))"));
217222
e = expectThrows(RuntimeException.class, () -> initialSearch(searchRequest, new BytesArray("{"), remoteVersion));
218223
assertThat(e.getCause().getMessage(), containsString("Unexpected end-of-input"));

0 commit comments

Comments
 (0)