Skip to content

Commit ef4ecb1

Browse files
authored
Reindex: Use request flavored methods (#30317)
Use the new request flavored methods for the low level rest client introduced in #29623 in reindex.
1 parent 1b22477 commit ef4ecb1

File tree

3 files changed

+93
-94
lines changed

3 files changed

+93
-94
lines changed

modules/reindex/src/main/java/org/elasticsearch/index/reindex/remote/RemoteRequestBuilders.java

Lines changed: 35 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,14 @@
1919

2020
package org.elasticsearch.index.reindex.remote;
2121

22-
import org.apache.http.HttpEntity;
2322
import org.apache.http.entity.ByteArrayEntity;
2423
import org.apache.http.entity.ContentType;
2524
import org.apache.http.entity.StringEntity;
2625
import org.apache.lucene.util.BytesRef;
2726
import org.elasticsearch.ElasticsearchException;
2827
import org.elasticsearch.Version;
2928
import org.elasticsearch.action.search.SearchRequest;
29+
import org.elasticsearch.client.Request;
3030
import org.elasticsearch.common.Strings;
3131
import org.elasticsearch.common.bytes.BytesReference;
3232
import org.elasticsearch.common.unit.TimeValue;
@@ -40,33 +40,27 @@
4040
import org.elasticsearch.search.sort.SortBuilder;
4141

4242
import java.io.IOException;
43-
import java.util.HashMap;
44-
import java.util.Map;
4543

46-
import static java.util.Collections.singletonMap;
4744
import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
4845

4946
/**
5047
* Builds requests for remote version of Elasticsearch. Note that unlike most of the
5148
* rest of Elasticsearch this file needs to be compatible with very old versions of
52-
* Elasticsearch. Thus is often uses identifiers for versions like {@code 2000099}
49+
* Elasticsearch. Thus it often uses identifiers for versions like {@code 2000099}
5350
* for {@code 2.0.0-alpha1}. Do not drop support for features from this file just
5451
* because the version constants have been removed.
5552
*/
5653
final class RemoteRequestBuilders {
5754
private RemoteRequestBuilders() {}
5855

59-
static String initialSearchPath(SearchRequest searchRequest) {
56+
static Request initialSearch(SearchRequest searchRequest, BytesReference query, Version remoteVersion) {
6057
// It is nasty to build paths with StringBuilder but we'll be careful....
6158
StringBuilder path = new StringBuilder("/");
6259
addIndexesOrTypes(path, "Index", searchRequest.indices());
6360
addIndexesOrTypes(path, "Type", searchRequest.types());
6461
path.append("_search");
65-
return path.toString();
66-
}
62+
Request request = new Request("POST", path.toString());
6763

68-
static Map<String, String> initialSearchParams(SearchRequest searchRequest, Version remoteVersion) {
69-
Map<String, String> params = new HashMap<>();
7064
if (searchRequest.scroll() != null) {
7165
TimeValue keepAlive = searchRequest.scroll().keepAlive();
7266
if (remoteVersion.before(Version.V_5_0_0)) {
@@ -75,16 +69,16 @@ static Map<String, String> initialSearchParams(SearchRequest searchRequest, Vers
7569
* timeout seems safer than less. */
7670
keepAlive = timeValueMillis((long) Math.ceil(keepAlive.millisFrac()));
7771
}
78-
params.put("scroll", keepAlive.getStringRep());
72+
request.addParameter("scroll", keepAlive.getStringRep());
7973
}
80-
params.put("size", Integer.toString(searchRequest.source().size()));
74+
request.addParameter("size", Integer.toString(searchRequest.source().size()));
8175
if (searchRequest.source().version() == null || searchRequest.source().version() == true) {
8276
/*
8377
* Passing `null` here just add the `version` request parameter
8478
* without any value. This way of requesting the version works
8579
* for all supported versions of Elasticsearch.
8680
*/
87-
params.put("version", null);
81+
request.addParameter("version", null);
8882
}
8983
if (searchRequest.source().sorts() != null) {
9084
boolean useScan = false;
@@ -101,13 +95,13 @@ static Map<String, String> initialSearchParams(SearchRequest searchRequest, Vers
10195
}
10296
}
10397
if (useScan) {
104-
params.put("search_type", "scan");
98+
request.addParameter("search_type", "scan");
10599
} else {
106100
StringBuilder sorts = new StringBuilder(sortToUri(searchRequest.source().sorts().get(0)));
107101
for (int i = 1; i < searchRequest.source().sorts().size(); i++) {
108102
sorts.append(',').append(sortToUri(searchRequest.source().sorts().get(i)));
109103
}
110-
params.put("sort", sorts.toString());
104+
request.addParameter("sort", sorts.toString());
111105
}
112106
}
113107
if (remoteVersion.before(Version.fromId(2000099))) {
@@ -126,20 +120,18 @@ static Map<String, String> initialSearchParams(SearchRequest searchRequest, Vers
126120
fields.append(',').append(searchRequest.source().storedFields().fieldNames().get(i));
127121
}
128122
String storedFieldsParamName = remoteVersion.before(Version.V_5_0_0_alpha4) ? "fields" : "stored_fields";
129-
params.put(storedFieldsParamName, fields.toString());
123+
request.addParameter(storedFieldsParamName, fields.toString());
130124
}
131-
return params;
132-
}
133125

134-
static HttpEntity initialSearchEntity(SearchRequest searchRequest, BytesReference query, Version remoteVersion) {
135126
// EMPTY is safe here because we're not calling namedObject
136127
try (XContentBuilder entity = JsonXContent.contentBuilder();
137128
XContentParser queryParser = XContentHelper
138129
.createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, query)) {
139130
entity.startObject();
140131

141132
entity.field("query"); {
142-
/* We're intentionally a bit paranoid here - copying the query as xcontent rather than writing a raw field. We don't want
133+
/* We're intentionally a bit paranoid here - copying the query
134+
* as xcontent rather than writing a raw field. We don't want
143135
* poorly written queries to escape. Ever. */
144136
entity.copyCurrentStructure(queryParser);
145137
XContentParser.Token shouldBeEof = queryParser.nextToken();
@@ -160,10 +152,11 @@ static HttpEntity initialSearchEntity(SearchRequest searchRequest, BytesReferenc
160152

161153
entity.endObject();
162154
BytesRef bytes = BytesReference.bytes(entity).toBytesRef();
163-
return new ByteArrayEntity(bytes.bytes, bytes.offset, bytes.length, ContentType.APPLICATION_JSON);
155+
request.setEntity(new ByteArrayEntity(bytes.bytes, bytes.offset, bytes.length, ContentType.APPLICATION_JSON));
164156
} catch (IOException e) {
165157
throw new ElasticsearchException("unexpected error building entity", e);
166158
}
159+
return request;
167160
}
168161

169162
private static void addIndexesOrTypes(StringBuilder path, String name, String[] indicesOrTypes) {
@@ -193,45 +186,50 @@ private static String sortToUri(SortBuilder<?> sort) {
193186
throw new IllegalArgumentException("Unsupported sort [" + sort + "]");
194187
}
195188

196-
static String scrollPath() {
197-
return "/_search/scroll";
198-
}
189+
static Request scroll(String scroll, TimeValue keepAlive, Version remoteVersion) {
190+
Request request = new Request("POST", "/_search/scroll");
199191

200-
static Map<String, String> scrollParams(TimeValue keepAlive, Version remoteVersion) {
201192
if (remoteVersion.before(Version.V_5_0_0)) {
202193
/* Versions of Elasticsearch before 5.0 couldn't parse nanos or micros
203194
* so we toss out that resolution, rounding up so we shouldn't end up
204195
* with 0s. */
205196
keepAlive = timeValueMillis((long) Math.ceil(keepAlive.millisFrac()));
206197
}
207-
return singletonMap("scroll", keepAlive.getStringRep());
208-
}
198+
request.addParameter("scroll", keepAlive.getStringRep());
209199

210-
static HttpEntity scrollEntity(String scroll, Version remoteVersion) {
211200
if (remoteVersion.before(Version.fromId(2000099))) {
212201
// Versions before 2.0.0 extract the plain scroll_id from the body
213-
return new StringEntity(scroll, ContentType.TEXT_PLAIN);
202+
request.setEntity(new StringEntity(scroll, ContentType.TEXT_PLAIN));
203+
return request;
214204
}
205+
215206
try (XContentBuilder entity = JsonXContent.contentBuilder()) {
216-
return new StringEntity(Strings.toString(entity.startObject()
217-
.field("scroll_id", scroll)
218-
.endObject()), ContentType.APPLICATION_JSON);
207+
entity.startObject()
208+
.field("scroll_id", scroll)
209+
.endObject();
210+
request.setEntity(new StringEntity(Strings.toString(entity), ContentType.APPLICATION_JSON));
219211
} catch (IOException e) {
220212
throw new ElasticsearchException("failed to build scroll entity", e);
221213
}
214+
return request;
222215
}
223216

224-
static HttpEntity clearScrollEntity(String scroll, Version remoteVersion) {
217+
static Request clearScroll(String scroll, Version remoteVersion) {
218+
Request request = new Request("DELETE", "/_search/scroll");
219+
225220
if (remoteVersion.before(Version.fromId(2000099))) {
226221
// Versions before 2.0.0 extract the plain scroll_id from the body
227-
return new StringEntity(scroll, ContentType.TEXT_PLAIN);
222+
request.setEntity(new StringEntity(scroll, ContentType.TEXT_PLAIN));
223+
return request;
228224
}
229225
try (XContentBuilder entity = JsonXContent.contentBuilder()) {
230-
return new StringEntity(Strings.toString(entity.startObject()
231-
.array("scroll_id", scroll)
232-
.endObject()), ContentType.APPLICATION_JSON);
226+
entity.startObject()
227+
.array("scroll_id", scroll)
228+
.endObject();
229+
request.setEntity(new StringEntity(Strings.toString(entity), ContentType.APPLICATION_JSON));
233230
} catch (IOException e) {
234231
throw new ElasticsearchException("failed to build clear scroll entity", e);
235232
}
233+
return request;
236234
}
237235
}

modules/reindex/src/main/java/org/elasticsearch/index/reindex/remote/RemoteScrollableHitSource.java

Lines changed: 11 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -30,43 +30,34 @@
3030
import org.elasticsearch.ElasticsearchStatusException;
3131
import org.elasticsearch.Version;
3232
import org.elasticsearch.action.bulk.BackoffPolicy;
33-
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
34-
import org.elasticsearch.common.xcontent.XContentParseException;
3533
import org.elasticsearch.index.reindex.ScrollableHitSource;
3634
import org.elasticsearch.action.search.SearchRequest;
35+
import org.elasticsearch.client.Request;
3736
import org.elasticsearch.client.ResponseException;
3837
import org.elasticsearch.client.ResponseListener;
3938
import org.elasticsearch.client.RestClient;
4039
import org.elasticsearch.common.Nullable;
41-
import org.elasticsearch.common.ParsingException;
4240
import org.elasticsearch.common.Strings;
4341
import org.elasticsearch.common.bytes.BytesReference;
4442
import org.elasticsearch.common.unit.TimeValue;
4543
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
4644
import org.elasticsearch.common.util.concurrent.ThreadContext;
45+
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
4746
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
4847
import org.elasticsearch.common.xcontent.XContentParser;
48+
import org.elasticsearch.common.xcontent.XContentParseException;
4949
import org.elasticsearch.common.xcontent.XContentType;
5050
import org.elasticsearch.rest.RestStatus;
5151
import org.elasticsearch.threadpool.ThreadPool;
5252

5353
import java.io.IOException;
5454
import java.io.InputStream;
5555
import java.util.Iterator;
56-
import java.util.Map;
5756
import java.util.function.BiFunction;
5857
import java.util.function.Consumer;
5958

60-
import static java.util.Collections.emptyMap;
6159
import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
6260
import static org.elasticsearch.common.unit.TimeValue.timeValueNanos;
63-
import static org.elasticsearch.index.reindex.remote.RemoteRequestBuilders.clearScrollEntity;
64-
import static org.elasticsearch.index.reindex.remote.RemoteRequestBuilders.initialSearchEntity;
65-
import static org.elasticsearch.index.reindex.remote.RemoteRequestBuilders.initialSearchParams;
66-
import static org.elasticsearch.index.reindex.remote.RemoteRequestBuilders.initialSearchPath;
67-
import static org.elasticsearch.index.reindex.remote.RemoteRequestBuilders.scrollEntity;
68-
import static org.elasticsearch.index.reindex.remote.RemoteRequestBuilders.scrollParams;
69-
import static org.elasticsearch.index.reindex.remote.RemoteRequestBuilders.scrollPath;
7061
import static org.elasticsearch.index.reindex.remote.RemoteResponseParsers.MAIN_ACTION_PARSER;
7162
import static org.elasticsearch.index.reindex.remote.RemoteResponseParsers.RESPONSE_PARSER;
7263

@@ -88,13 +79,13 @@ public RemoteScrollableHitSource(Logger logger, BackoffPolicy backoffPolicy, Thr
8879
protected void doStart(Consumer<? super Response> onResponse) {
8980
lookupRemoteVersion(version -> {
9081
remoteVersion = version;
91-
execute("POST", initialSearchPath(searchRequest), initialSearchParams(searchRequest, version),
92-
initialSearchEntity(searchRequest, query, remoteVersion), RESPONSE_PARSER, r -> onStartResponse(onResponse, r));
82+
execute(RemoteRequestBuilders.initialSearch(searchRequest, query, remoteVersion),
83+
RESPONSE_PARSER, r -> onStartResponse(onResponse, r));
9384
});
9485
}
9586

9687
void lookupRemoteVersion(Consumer<Version> onVersion) {
97-
execute("GET", "", emptyMap(), null, MAIN_ACTION_PARSER, onVersion);
88+
execute(new Request("GET", ""), MAIN_ACTION_PARSER, onVersion);
9889
}
9990

10091
private void onStartResponse(Consumer<? super Response> onResponse, Response response) {
@@ -108,15 +99,13 @@ private void onStartResponse(Consumer<? super Response> onResponse, Response res
10899

109100
@Override
110101
protected void doStartNextScroll(String scrollId, TimeValue extraKeepAlive, Consumer<? super Response> onResponse) {
111-
Map<String, String> scrollParams = scrollParams(
112-
timeValueNanos(searchRequest.scroll().keepAlive().nanos() + extraKeepAlive.nanos()),
113-
remoteVersion);
114-
execute("POST", scrollPath(), scrollParams, scrollEntity(scrollId, remoteVersion), RESPONSE_PARSER, onResponse);
102+
TimeValue keepAlive = timeValueNanos(searchRequest.scroll().keepAlive().nanos() + extraKeepAlive.nanos());
103+
execute(RemoteRequestBuilders.scroll(scrollId, keepAlive, remoteVersion), RESPONSE_PARSER, onResponse);
115104
}
116105

117106
@Override
118107
protected void clearScroll(String scrollId, Runnable onCompletion) {
119-
client.performRequestAsync("DELETE", scrollPath(), emptyMap(), clearScrollEntity(scrollId, remoteVersion), new ResponseListener() {
108+
client.performRequestAsync(RemoteRequestBuilders.clearScroll(scrollId, remoteVersion), new ResponseListener() {
120109
@Override
121110
public void onSuccess(org.elasticsearch.client.Response response) {
122111
logger.debug("Successfully cleared [{}]", scrollId);
@@ -162,7 +151,7 @@ protected void cleanup(Runnable onCompletion) {
162151
});
163152
}
164153

165-
private <T> void execute(String method, String uri, Map<String, String> params, HttpEntity entity,
154+
private <T> void execute(Request request,
166155
BiFunction<XContentParser, XContentType, T> parser, Consumer<? super T> listener) {
167156
// Preserve the thread context so headers survive after the call
168157
java.util.function.Supplier<ThreadContext.StoredContext> contextSupplier = threadPool.getThreadContext().newRestorableContext(true);
@@ -171,7 +160,7 @@ class RetryHelper extends AbstractRunnable {
171160

172161
@Override
173162
protected void doRun() throws Exception {
174-
client.performRequestAsync(method, uri, params, entity, new ResponseListener() {
163+
client.performRequestAsync(request, new ResponseListener() {
175164
@Override
176165
public void onSuccess(org.elasticsearch.client.Response response) {
177166
// Restore the thread context to get the precious headers

0 commit comments

Comments
 (0)